Documentation ¶
Overview ¶
Package gevent decouples domain events from the code that raises them and improves performance by moving work off the main path. Main usage scenarios:
1. Separating side business, such as system logging, from the main workflow.
2. Delayed events that check a result later instead of polling, such as checking the status of a payment.
3. Decoupling events between layers without cyclic calls.
4. Notifying downstream systems when a domain event happens, such as after a payment finishes.
Index ¶
- Variables
- func Dispatch(ctx context.Context, topic Topic, evt Event, dispatcher Dispatcher, ...) error
- func DispatchLocal(ctx context.Context, tpc Topic, evt Event) error
- func Local() *localDispatcher
- func LocalInit(options ...Option) *localDispatcher
- type ClientSimple
- func (s *ClientSimple) DLock(ctx context.Context, key, value string, expire time.Duration) bool
- func (s *ClientSimple) DUnlock(ctx context.Context, key, value string) bool
- func (s *ClientSimple) ZAdd(ctx context.Context, key string, score float64, evt interface{}) error
- func (s *ClientSimple) ZRangeByScore(ctx context.Context, key string, scoreTo float64, start int, stop int) ([]string, error)
- func (s *ClientSimple) ZRem(ctx context.Context, key string, score float64, evt interface{}) error
- type DelayedEvent
- type DispatchedEvent
- type DispatchedHeap
- type Dispatcher
- type DistributeLocker
- type Event
- type HandleFunc
- type Handler
- type Option
- type OptionMaxQueued
- type OptionMaxRetry
- type OptionParallelRoutines
- type OptionTaskMaxDuration
- type OptionTickerInterval
- type Options
- type RedisClient
- type Topic
Constants ¶
This section is empty.
Variables ¶
var (
	ErrDispatcherNotWorking        = errors.New("dispatcher is not working")
	ErrNoDispatcherSpecified       = errors.New("no dispatcher specified")
	ErrNoTopicSpecified            = errors.New("no topic specified")
	ErrNoEventSpecified            = errors.New("no Event specified")
	ErrNoHandlerFound              = errors.New("no handler found")
	ErrDuplicateInitialized        = errors.New("duplicate initialized dispatcher")
	ErrMaxQueuedEventsReached      = errors.New("max queued heap reached")
	ErrConcurrentModificationEvent = errors.New("concurrent modification event")
)
Functions ¶
func Dispatch ¶
func Dispatch(ctx context.Context, topic Topic, evt Event, dispatcher Dispatcher, dispatchers ...Dispatcher) error
Dispatch dispatches an event to a topic. More than one dispatcher can be passed to receive the event, so event handling should be idempotent.
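One way to keep handling idempotent is to remember which events have already been processed and skip repeats. The sketch below is an assumption-heavy illustration: onceByKey, paidEvent, and the use of an order ID as the deduplication key are all hypothetical and not part of the package, and a real service would keep the seen keys in shared storage rather than in memory.

```go
package main

import (
	"fmt"
	"sync"
)

// paidEvent is a hypothetical event carrying a natural deduplication key.
type paidEvent struct {
	OrderID string `json:"order_id"`
}

// onceByKey remembers handled keys so that repeated deliveries become no-ops.
type onceByKey struct {
	mu   sync.Mutex
	seen map[string]bool
}

func newOnceByKey() *onceByKey {
	return &onceByKey{seen: make(map[string]bool)}
}

// Do runs fn the first time key is seen and reports whether fn ran.
func (o *onceByKey) Do(key string, fn func()) bool {
	o.mu.Lock()
	defer o.mu.Unlock()
	if o.seen[key] {
		return false
	}
	o.seen[key] = true
	fn()
	return true
}

func main() {
	dedup := newOnceByKey()
	evt := paidEvent{OrderID: "order-1"}
	// The same event delivered twice, for example by two dispatchers.
	for i := 0; i < 2; i++ {
		ran := dedup.Do(evt.OrderID, func() { fmt.Println("notify downstream for", evt.OrderID) })
		fmt.Println("handled:", ran)
	}
}
```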
Types ¶
type ClientSimple ¶
ClientSimple is a Redis client built on redigo. It can be replaced by any other Redis client wrapper.
func (*ClientSimple) DUnlock ¶
func (s *ClientSimple) DUnlock(ctx context.Context, key, value string) bool
DUnlock releases the lock using a Lua script.
type DelayedEvent ¶
DelayedEvent marks an event whose handling should be delayed.
type DispatchedEvent ¶
type DispatchedEvent struct {
	EmitAt time.Time `json:"emit_at"`
	Event  Event     `json:"event"`
	Retry  int32     `json:"retry"`
}
DispatchedEvent is an event after it has been dispatched, carrying its emit time and the event body.
func (DispatchedEvent) JsonStable ¶
func (e DispatchedEvent) JsonStable() string
JsonStable converts the event to JSON with a stable key order by marshalling it twice, so that the same event always yields the same output.
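The "marshal twice" idea can be sketched as follows. This is not the package's implementation: stableJSON, payA, and payB are hypothetical names. The sketch relies on the fact that encoding/json writes map keys in sorted order, so decoding into generic values and encoding again removes any dependence on struct field order. One caveat of this approach is that numbers pass through float64, so very large integers can lose precision.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// stableJSON marshals v, decodes the result into generic values, and marshals
// again. The second pass emits object keys in sorted order.
func stableJSON(v interface{}) (string, error) {
	first, err := json.Marshal(v)
	if err != nil {
		return "", err
	}
	var generic interface{}
	if err := json.Unmarshal(first, &generic); err != nil {
		return "", err
	}
	second, err := json.Marshal(generic)
	if err != nil {
		return "", err
	}
	return string(second), nil
}

// payA and payB hold the same data but declare their fields in different order.
type payA struct {
	OrderID string `json:"order_id"`
	Amount  int    `json:"amount"`
}

type payB struct {
	Amount  int    `json:"amount"`
	OrderID string `json:"order_id"`
}

func main() {
	a, _ := stableJSON(payA{OrderID: "o-1", Amount: 5})
	b, _ := stableJSON(payB{Amount: 5, OrderID: "o-1"})
	fmt.Println(a)
	fmt.Println(a == b)
}
```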
type DispatchedHeap ¶
type DispatchedHeap []*DispatchedEvent
DispatchedHeap is a heap of events. Use heap.Push and heap.Pop to add and remove elements; the event whose emit time is nearest to the current time is popped first.
func (DispatchedHeap) Len ¶
func (h DispatchedHeap) Len() int
func (DispatchedHeap) Less ¶
func (h DispatchedHeap) Less(i, j int) bool
func (*DispatchedHeap) Pop ¶
func (h *DispatchedHeap) Pop() interface{}
func (*DispatchedHeap) Push ¶
func (h *DispatchedHeap) Push(x interface{})
func (DispatchedHeap) Swap ¶
func (h DispatchedHeap) Swap(i, j int)
type Dispatcher ¶
type Dispatcher interface {
	// Dispatch dispatches an event to the given topic
	Dispatch(ctx context.Context, tpc Topic, evt Event) error
	// Register registers a handle func for a topic
	Register(ctx context.Context, tpc Topic, handler HandleFunc) error
	// Remove removes a handle func from a topic
	Remove(ctx context.Context, tpc Topic, handler HandleFunc) bool
	// Stop stops the dispatcher
	Stop(ctx context.Context)
}
Dispatcher allows dispatching events, registering handlers, and removing handlers.
func NewRedis ¶
func NewRedis(client RedisClient, opts ...Option) Dispatcher
NewRedis creates a new Redis-backed redisDispatcher.
func NewRedisWithContext ¶
func NewRedisWithContext(ctx context.Context, client RedisClient, opts ...Option) Dispatcher
NewRedisWithContext creates a redisDispatcher bound to ctx; when ctx is cancelled, the redisDispatcher stops.
type DistributeLocker ¶
type DistributeLocker interface {
	// DLock acquires a distributed lock that guards event removal and avoids duplicate consumption
	DLock(ctx context.Context, key, value string, expiration time.Duration) bool
	// DUnlock removes the lock after finishing; it should use CAS or a Lua script
	DUnlock(ctx context.Context, key, value string) bool
}
DistributeLocker is a distributed lock held while an event is handled or removed, to avoid duplicate operations. Implementing it is recommended when the service is deployed as multiple instances.
type Event ¶
type Event interface{}
Event is an event to dispatch. Its fields should carry JSON struct tags, and handling it should be idempotent.
type HandleFunc ¶
HandleFunc is a plain function that handles an event.
func ToHandleFunc ¶
func ToHandleFunc(handler Handler) HandleFunc
ToHandleFunc converts a Handler into a HandleFunc so that it is easy to invoke.
type Handler ¶
Handler is an interface for handling an event; a struct can implement it. Convert it to a func with ToHandleFunc() when registering it with a localDispatcher.
type Option ¶
type Option interface{}
Option is a configuration option for the different dispatchers.
func WithMaxQueued ¶
func WithMaxRetry ¶
func WithParallelThreshold ¶
WithParallelThreshold sets the threshold for goroutines running in parallel within a topic; a value in [10, 40] is suggested.
func WithTaskMaxDuration ¶
func WithTickerInterval ¶
type OptionMaxQueued ¶
type OptionMaxQueued struct {
MaxQueued int64
}
OptionMaxQueued sets how many events may be queued; it applies mainly to the localDispatcher.
type OptionMaxRetry ¶
type OptionMaxRetry struct {
Retry int32
}
OptionMaxRetry sets the maximum number of retries when a handler fails.
type OptionParallelRoutines ¶
type OptionParallelRoutines struct {
Threshold int
}
OptionParallelRoutines sets how many goroutines are launched to handle each topic.
type OptionTaskMaxDuration ¶
OptionTaskMaxDuration sets the maximum duration a task may run.
type OptionTickerInterval ¶
OptionTickerInterval sets the interval between localDispatcher ticks.
type Options ¶
type Options struct {
	MaxQueuedEvents        int64
	TickerInterval         time.Duration
	ParallelThreshold      int
	MaxRetry               int32
	MaxTaskExecuteDuration time.Duration
}
Options is the parsed configuration for dispatchers.
type RedisClient ¶
type RedisClient interface {
	// ZAdd adds an event to a sorted set; the score should be the time at which the event will be emitted
	ZAdd(ctx context.Context, key string, score float64, evt interface{}) error
	// ZRem removes an event from a sorted set; match on both evt and score to avoid removing the wrong event
	ZRem(ctx context.Context, key string, score float64, evt interface{}) error
	// ZRangeByScore lists (stop - start) events whose scores are lower than scoreTo
	ZRangeByScore(ctx context.Context, key string, scoreTo float64, start, stop int) ([]string, error)
}
RedisClient is a Redis client wrapper used to add, remove, and range over events. It should preferably also implement gevent.DistributeLocker to control event handling and event removal.