gevent

package module
v1.0.0 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Nov 8, 2021 License: Apache-2.0 Imports: 14 Imported by: 0

README

gevent imply go-event which tries to make event handling easier.

What does gevent want to do

  1. Async execute jobs safely without too many go routines.
  2. Support delayed events to execute.
  3. Support to config executing options

Main usage scenarios:

  1. Separate side business from main, like system log which should be async.
  2. Use delayed event to confirm order status or pay status.
  3. Decouple domain events to avoid cycling call.
  4. Notify downstream while domain event happening.

Attention

  1. Instant event may delay a few milliseconds while dispatching.
  2. Local events are not durable, use redis durable model for distribute systems.

TODO

  1. Support use rmq etc.
  2. Benchmark

Any question or suggestion, please let me know.

How to use:

    go get github.com/ltwonders/gevent

Example:

package main

import (
	"context"
	"log"
	"time"

	"github.com/alicebob/miniredis"
	"github.com/gomodule/redigo/redis"
	"github.com/ltwonders/gevent"
)

func main() {
	ctx := context.Background()
	localDispatcher := gevent.LocalInit()
	s, err0 := miniredis.Run()
	if err0 != nil {
		panic(err0)
	}
	defer s.Close()

	pool := &redis.Pool{
		MaxIdle: 2,
		Dial:    func() (redis.Conn, error) { return redis.Dial("tcp", s.Addr()) },
	}
	redisDispatcher := gevent.NewRedisWithContext(ctx, &gevent.ClientSimple{Pool: pool})

	//annoy func to handle event
	var annoyFunc = func(ctx context.Context, evt gevent.Event) error {
		log.Printf("instant event [%+v] start at %+v", evt, time.Now().Second())
		time.Sleep(2 * time.Second)
		log.Printf("instant event [%+v] finish at %+v", evt, time.Now().Second())
		return nil
	}
	if err1 := localDispatcher.Register(ctx, "instant", annoyFunc); nil != err1 {
		log.Printf("fail to register local handler")
	}

	type instantEvent struct {
		ID int
	}

	// dispatch events
	for i := 1; i <= 100; i++ {
		inst := &instantEvent{ID: i}
		if err := gevent.Dispatch(ctx, "instant", inst, localDispatcher, redisDispatcher); nil != err {
			log.Printf("dispatch failed: %s", err)
		}
	}
	if err2 := redisDispatcher.Register(ctx, "instant", annoyFunc); nil != err2 {
		log.Printf("fail to register redis handler")
	}

	time.Sleep(200 * time.Second)
}

more examples, see example package

Documentation

Overview

Package gevent is used for decoupling domain event and improving performance main usage scenarios: 1. side business separating from main workflow, like system log etc. 2. delayed event to check result except polling, like checking pay result status 3. de-couple layer event without cycling call 4. notify downstream while domain event happening,like after pay finished etc./*

Index

Constants

This section is empty.

Variables

View Source
var (
	ErrDispatcherNotWorking        = errors.New("dispatcher is not working")
	ErrNoDispatcherSpecified       = errors.New("no dispatcher specified")
	ErrNoTopicSpecified            = errors.New("no topic specified")
	ErrNoEventSpecified            = errors.New("no Event specified")
	ErrNoHandlerFound              = errors.New("no handler found")
	ErrDuplicateInitialized        = errors.New("duplicate initialized dispatcher")
	ErrMaxQueuedEventsReached      = errors.New("max queued heap reached")
	ErrConcurrentModificationEvent = errors.New("concurrent modification event")
)

Functions

func Dispatch

func Dispatch(ctx context.Context, topic Topic, evt Event, dispatcher Dispatcher, dispatchers ...Dispatcher) error

Dispatch dispatch a event to a topic, support multi dispatchers to receive events should keep idempotent for event handling

func DispatchLocal

func DispatchLocal(ctx context.Context, tpc Topic, evt Event) error

func Local

func Local() *localDispatcher

func LocalInit

func LocalInit(options ...Option) *localDispatcher

LocalInit create local localDispatcher as need, init will default options if no options pass

Types

type ClientSimple

type ClientSimple struct {
	Pool *redis.Pool
	sync.Mutex
}

ClientSimple a redis client uses redigo, can be replaced by other redis clientWrapper

func (*ClientSimple) DLock

func (s *ClientSimple) DLock(ctx context.Context, key, value string, expire time.Duration) bool

DLock lock with setnx or set px or set ex

func (*ClientSimple) DUnlock

func (s *ClientSimple) DUnlock(ctx context.Context, key, value string) bool

DUnlock unlock with lua script

func (*ClientSimple) ZAdd

func (s *ClientSimple) ZAdd(ctx context.Context, key string, score float64, evt interface{}) error

ZAdd use ZADD to add evt to key with score

func (*ClientSimple) ZRangeByScore

func (s *ClientSimple) ZRangeByScore(ctx context.Context, key string, scoreTo float64, start int, stop int) ([]string, error)

ZRangeByScore use ZRANGEBYSCORE list (stop - start) events from -inf to scoreTo, returns json-marshalled string slice

func (*ClientSimple) ZRem

func (s *ClientSimple) ZRem(ctx context.Context, key string, score float64, evt interface{}) error

ZRem use ZREM to remove evt with score from key

type DelayedEvent

type DelayedEvent interface {
	Delayed() time.Duration
	Event
}

DelayedEvent indicate event that needs be handled delay

type DispatchedEvent

type DispatchedEvent struct {
	EmitAt time.Time `json:"emit_at"`
	Event  Event     `json:"event"`
	Retry  int32     `json:"retry"`
}

DispatchedEvent event after dispatched, with emit time and event body

func (DispatchedEvent) JsonStable

func (e DispatchedEvent) JsonStable() string

JsonStable convert event to stable ordered json by marshalling event twice to keep the same json output

type DispatchedHeap

type DispatchedHeap []*DispatchedEvent

DispatchedHeap event heap, use heap.Push, heap.Pop to push and pop element the event happens nearly current time will be pop first

func (DispatchedHeap) Len

func (h DispatchedHeap) Len() int

func (DispatchedHeap) Less

func (h DispatchedHeap) Less(i, j int) bool

func (*DispatchedHeap) Pop

func (h *DispatchedHeap) Pop() interface{}

func (*DispatchedHeap) Push

func (h *DispatchedHeap) Push(x interface{})

func (DispatchedHeap) Swap

func (h DispatchedHeap) Swap(i, j int)

type Dispatcher

type Dispatcher interface {

	// Dispatch dispatch an event to special topic
	Dispatch(ctx context.Context, tpc Topic, evt Event) error

	// Register register a handle func
	Register(ctx context.Context, tpc Topic, handler HandleFunc) error

	// Remove remove a handle func
	Remove(ctx context.Context, tpc Topic, handler HandleFunc) bool

	// Stop stop localDispatcher
	Stop(ctx context.Context)
}

Dispatcher allow dispatching event, registering handler and remove handler

func NewRedis

func NewRedis(client RedisClient, opts ...Option) Dispatcher

NewRedis create a new redis redisDispatcher

func NewRedisWithContext

func NewRedisWithContext(ctx context.Context, client RedisClient, opts ...Option) Dispatcher

NewRedisWithContext create a redis redisDispatcher that relates to ctx, if ctx is cancelled, redisDispatcher will stop

type DistributeLocker

type DistributeLocker interface {

	//DLock a distribute lock to hold event removing and avoid duplicating consume
	DLock(ctx context.Context, key, value string, expiration time.Duration) bool

	//DUnlock remove lock after finishing, should use cas or lua script
	DUnlock(ctx context.Context, key, value string) bool
}

DistributeLocker a distribute locker will lock while handling or removing event to avoid duplicate operation it is better to be implemented if services are distribute deployed

type Event

type Event interface{}

Event indicate a event to dispatch, should marked with json marshall tag and be idempotent handling

type HandleFunc

type HandleFunc func(ctx context.Context, evt Event) error

HandleFunc simply func to handle func

func ToHandleFunc

func ToHandleFunc(handler Handler) HandleFunc

ToHandleFunc convert interface Handler to func for easy invoke

type Handler

type Handler interface {
	Handle(ctx context.Context, evt Event) error
}

Handler interface to handle a event, strut can implement this need convert to func use ToHandleFunc() while registering to a localDispatcher

type Option

type Option interface{}

Option option for different localDispatcher config

func WithMaxQueued

func WithMaxQueued(m int64) Option

func WithMaxRetry

func WithMaxRetry(retry int32) Option

func WithParallelThreshold

func WithParallelThreshold(threshold int) Option

WithParallelThreshold set go-routines threshold for parallel running in a topic, suggest between [10,40]

func WithTaskMaxDuration

func WithTaskMaxDuration(d time.Duration) Option

func WithTickerInterval

func WithTickerInterval(d time.Duration) Option

type OptionMaxQueued

type OptionMaxQueued struct {
	MaxQueued int64
}

OptionMaxQueued indicates how many events should be queued, especially valid for local localDispatcher

type OptionMaxRetry

type OptionMaxRetry struct {
	Retry int32
}

OptionMaxRetry indicates max retry count if handler failed

type OptionParallelRoutines

type OptionParallelRoutines struct {
	Threshold int
}

OptionParallelRoutines indicates routines will launch for handling each topic

type OptionTaskMaxDuration

type OptionTaskMaxDuration struct {
	MaxDuration time.Duration
}

OptionTaskMaxDuration indicates max duration which the task will execute

type OptionTickerInterval

type OptionTickerInterval struct {
	Interval time.Duration
}

OptionTickerInterval indicates interval for localDispatcher ticks

type Options

type Options struct {
	MaxQueuedEvents        int64
	TickerInterval         time.Duration
	ParallelThreshold      int
	MaxRetry               int32
	MaxTaskExecuteDuration time.Duration
}

Options parsed config for dispatchers

type RedisClient

type RedisClient interface {
	//ZAdd add event to a sorted set, the score should be the time that event will emit
	ZAdd(ctx context.Context, key string, score float64, evt interface{}) error
	//ZRem remove event from a sorted set, better to use evt and score to avoid remove the wrong event
	ZRem(ctx context.Context, key string, score float64, evt interface{}) error
	//ZRangeByScore list (stop - start) events while score are lower than scoreTo
	ZRangeByScore(ctx context.Context, key string, scoreTo float64, start, stop int) ([]string, error)
}

RedisClient Redis clientWrapper to add\rem\range events, better to also implements gevent.DistributeLocker to control event handling and event removing

type Topic

type Topic string

Topic each event has a topic to dispatch

func (Topic) String

func (t Topic) String() string

Directories

Path Synopsis

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL