etcdq

package
v0.0.0-...-8f86345 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Mar 18, 2021 License: MIT Imports: 10 Imported by: 0

Documentation

Overview

*

  • @Author: hiram
  • @Date: 2021/3/18 13:40

*

  • @Author: hiram
  • @Date: 2021/3/18 13:41

*

  • @Author: hiram
  • @Date: 2021/3/18 13:41

*

  • @Author: hiram
  • @Date: 2021/3/18 13:42

*

  • @Author: hiram
  • @Date: 2021/3/18 13:43

*

  • @Author: hiram
  • @Date: 2021/3/18 13:43

*

  • @Author: hiram
  • @Date: 2021/3/18 13:44

*

  • @Author: hiram
  • @Date: 2021/3/18 13:44

Index

Constants

View Source
const (
	ResourcePrefix = "/resources/" // custom resource operation, generate events
	WatcherPrefix  = "/watchers/"  // global single watcher, watch events
	QueuePrefix    = "/queues/"    // central queue, storage events
	HandlerPrefix  = "/handlers/"  // handler's own events to handle
	LockPrefix     = "/locks"      // handler's registered lock
)
View Source
const (
	WatchSleep    = time.Millisecond * 100
	ScheduleSleep = time.Millisecond * 100
	HandlerSleep  = time.Millisecond * 100
	MonitorSleep  = time.Second * 2
	ErrorSleep    = time.Second * 5
	CtxTimeout    = time.Second * 10
)
View Source
const (
	MaxCallSendMsgSize = 512 * 1024 * 1024
	MaxCallRecvMsgSize = 512 * 1024 * 1024
	MaxTxnOps          = 32768
)

./etcd.exe --max-txn-ops="32768" --max-request-bytes="536870912"

Variables

View Source
var (
	Debug *log.Logger
	Info  *log.Logger
	Error *log.Logger
)
View Source
var Exists = struct{}{}

Functions

func Hash

func Hash(s string) int

func HostName

func HostName() string

func ObjectToString

func ObjectToString(object interface{}) string

func Pid

func Pid() string

func RunHandle

func RunHandle()

func RunWatch

func RunWatch()

func Stamp

func Stamp() int64

func StringToObject

func StringToObject(str string, object interface{})

Types

type Client

type Client interface {
	ReadItem(string) KV
	ReadItems(string) KVs
	WriteItem(string, string)
	DeleteItem(string)
	Atomic(writes KVs, deletes Ks) *v3.TxnResponse
	WatchPrefix(prefix string, startRevision int64, eventCh chan Events, stopCh chan struct{})
	SpinLock(name string, ttl int64) func() bool // A distributed lock, return func for lockAliveCheck
}

type Etcd

type Etcd struct {
	MaxTxnOps int
	// contains filtered or unexported fields
}

func NewEtcd

func NewEtcd(endPoint string, dialTimeout time.Duration) Etcd

func (*Etcd) Atomic

func (e *Etcd) Atomic(writes KVs, deletes Ks) *v3.TxnResponse

func (*Etcd) DeleteItem

func (e *Etcd) DeleteItem(key string)

func (*Etcd) ReadItem

func (e *Etcd) ReadItem(key string) KV

func (*Etcd) ReadItems

func (e *Etcd) ReadItems(key string) KVs

func (*Etcd) SpinLock

func (e *Etcd) SpinLock(name string, ttl int64) func() bool

func (*Etcd) WatchPrefix

func (e *Etcd) WatchPrefix(prefix string, startRevision int64, eventCh chan Events, stopCh chan struct{})

func (*Etcd) WriteItem

func (e *Etcd) WriteItem(key string, value string)

type Event

type Event struct {
	// contains filtered or unexported fields
}

type Event_EventType

type Event_EventType int32
const (
	PUT    Event_EventType = 0
	DELETE Event_EventType = 1
)

type Events

type Events []Event

type Handler

type Handler struct {
	// contains filtered or unexported fields
}

func NewHandler

func NewHandler(etcd *Etcd, resource string, handleFunc func(task KV) bool, mergeFunc func(taskPrefix string)) Handler

func (*Handler) Execute

func (h *Handler) Execute()

func (*Handler) GroupBy

func (h *Handler) GroupBy(tasks *KVs)

func (*Handler) Merge

func (h *Handler) Merge(prefix string)

func (*Handler) Receive

func (h *Handler) Receive() KVs

func (*Handler) Run

func (h *Handler) Run()

type KV

type KV struct {
	// contains filtered or unexported fields
}

type KVs

type KVs []KV

type Ks

type Ks []string

type Q

type Q interface {
	// contains filtered or unexported methods
}

type Queue

type Queue struct {
	// contains filtered or unexported fields
}

func NewQueue

func NewQueue(etcd *Etcd, name string) Queue

type QueueValue

type QueueValue struct {
	EventType Event_EventType
	Value     interface{}
}

type Schedule

type Schedule interface {
	InitPrevAliveHandlers(handlerPrefix string)
	Distribute(handlerPrefix string, queue Queue)
	Redistribute(handlerLockPrefix string, queue Queue)
}

type Scheduler

type Scheduler struct {
	// contains filtered or unexported fields
}

func NewScheduler

func NewScheduler(etcd *Etcd) Scheduler

func (*Scheduler) Distribute

func (s *Scheduler) Distribute(handlerPrefix string, queue Queue)

func (*Scheduler) InitPrevAliveHandlers

func (s *Scheduler) InitPrevAliveHandlers(handlerPrefix string)

func (*Scheduler) Redistribute

func (s *Scheduler) Redistribute(handlerLockPrefix string, queue Queue)

type Set

type Set struct {
	// contains filtered or unexported fields
}

func NewSet

func NewSet() Set

func (*Set) Add

func (s *Set) Add(item interface{})

func (*Set) Clear

func (s *Set) Clear()

func (*Set) Contains

func (s *Set) Contains(item interface{}) bool

func (*Set) Size

func (s *Set) Size() int

type SyncHandle

type SyncHandle interface {
	Receive() KVs            // receive events distributed from queue
	GroupBy(*KVs)            // group tasks by prefix
	Merge(taskPrefix string) // merge events of one item

	Execute() // execute tasks with WaitGroup
	// contains filtered or unexported methods
}

type Watch

type Watch interface {
	WatchPrefix(stopCh chan struct{})
	// contains filtered or unexported methods
}

type WatchScheduleGroup

type WatchScheduleGroup struct {
	// contains filtered or unexported fields
}

func NewWatchScheduleGroup

func NewWatchScheduleGroup(resource string, etcd *Etcd) WatchScheduleGroup

func (WatchScheduleGroup) Run

func (wsg WatchScheduleGroup) Run()

type Watcher

type Watcher struct {
	// contains filtered or unexported fields
}

func NewWatcher

func NewWatcher(etcd *Etcd, queue Queue, resource string) Watcher

func (*Watcher) WatchPrefix

func (w *Watcher) WatchPrefix(stopCh chan struct{})

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL