futurama

package module
v0.0.0-...-1e6017f Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Feb 2, 2023 License: MIT Imports: 20 Imported by: 0

README

Futurama

Futurama is a mysql backed priority queue for scheduling delayed events, written in Go.

  • It allows you to execute events at specified time in future.
  • Different from background batch job targeted job queues, it is good at handling simple/small tasks (e.g. calling a http API after 30sec) under heavy production load.
  • It has a stateless design so that you can run multiple instances for load balancing or failover.
  • It is empowering one of top ranked MMO strategy game - War of Nations.
  • It is supposed to support multiple backends for storing events, for the time being only MySQL is officially implemented.

Requirements

Well tested with:

  • Go >= 1.3
  • MySQL >= 5.5

Basic usage

// create a queue
q := futurama.CreateQueue(config, map[string]TriggerInterface{
      triggerType: trigger,
      ...
})
q.Start()

// schedule an event
triggerTime := time.Now().Add(3 * time.Second)
evId := q.Create(triggerType, triggerTime, triggerParam)

// cancel the event
if we_want_to_cancel {
  q.Cancel(evId)
}

// stop the queue
q.Stop()
  • Create an event queue with futurama.CreateQueue.
    • 1st arg config provides access to MySQL .. etc.
    • 2nd arg provides triggers with a map of string -> TriggerInterface, these triggers are used to process events.
  • Calling q.Create() with triggerType(string), triggerTime(time.Time) and triggerParams(interface{}) will add an event to queue.
    • Later at triggerTime, a trigger associated with triggerType will be called.
  • q.Create() returns the id(string) of created event, id can be use for cancelling the event.
Config

By default, futurama connects to local MySQL server (host=127.0.0.1, port=3306). You can either create a default config and replace with customized values:

config := futurama.DefaultConfig()
config.Host = "example.com"
config.User = "dev"
config.Pass = "..."

q := futurama.CreateQueue(config, triggers)
...

or load config values from a json file - a Config object is json deserializable:

configFile := "queue.json"
config := futurama.DefaultConfig()
file, _ := ioutil.ReadFile(configFile)
json.Unmarshal(file, config)

q := futurama.CreateQueue(config, triggers)
...

inside queue.json:

{
  "mysql6": false,
  "username": "dev",
  "password": "...",
  "host": "example.com"
} // see config.go for more setting options 

NOTE: By enabling mysql6, scheduled time can be specified in millisecond (and futurama needs to actually connect to a MySQL server that supports DATETIME(6))

Triggers

A trigger can be any go struct that implements TriggerInterface (see interface.go)

type TriggerResult struct {
	Status      EventStatus
	TriggerTime time.Time
	Data        interface{}
}

type TriggerInterface interface {
 Trigger(ev *Event) *TriggerResult
}
  • ev *Event is created by q.Create(triggerType, triggerTime, triggerParam)
  • Trigger function is called at triggerTime, it can access triggerParam through ev.Data

Retry on failures(Backoff)

  • Events will be re-scheduled if Trigger function failed (return ``TriggerResult.Status = EventStatus_RETRY```)
  • Re-scheduled triggerTime is delayed upon failures by following exponential backoff
  • Max number of re-attempts is 18 by default, it can be configured by Config.SchedulerConfig.MaxRetry, after MaxRetry, the event will be forgotten (removed from DB) ...

Running test

go test -v -logtostderr

Documentation

Index

Constants

View Source
const (
	EventStatus_DEFAULT = 1 + iota
	EventStatus_OK
	EventStatus_CANCEL
	EventStatus_ERROR
	EventStatus_RETRY
)
View Source
const (
	SQL_TMPL_CREATE_DATABASE = `CREATE DATABASE IF NOT EXISTS %s`
	SQL_TMPL_CREATE_TABLE    = `` /* 340-byte string literal not displayed */

)
View Source
const SEQ_MASK_INT32 = 0x7fffffff

Variables

View Source
var (
	SQL_SAVE_EVENT             string
	SQL_DELETE_EVENT           string
	SQL_UPDATE_EVENT_STATUS    string
	SQL_UPDATE_EVENT_FOR_RETRY string

	SQL_RESET_DELAYED_EVENTS string
	SQL_DECLARE_OWNERSHIP    string
	SQL_SELECT_EVENTS        string
)
View Source
var Encoder = newEncoderPool()
View Source
var SchedulerDeps = &SchedulerDepsContainer{}

Functions

func TestOnly_ResetDb

func TestOnly_ResetDb(cfg *MySQLConfig)

For testing ONLY

Types

type CompareFunc

type CompareFunc func(a *HeapItem, b *HeapItem) bool

type Config

type Config struct {
	StatIntervalSec int `json:"stat_interval_sec"`
	SchedulerConfig
	MySQLConfig
}

func DefaultConfig

func DefaultConfig() *Config

type ConsumerInterface

type ConsumerInterface interface {
	Start()
	Stop()
	Events() <-chan []*Event
}

type EncoderPool

type EncoderPool struct {
	// contains filtered or unexported fields
}

func (*EncoderPool) Marshal

func (self *EncoderPool) Marshal(v interface{}) ([]byte, error)

type Event

type Event struct {
	Id          string
	TriggerType string
	TriggerTime time.Time
	Owner       string
	Attempts    int
	Status      EventStatus
	Created     time.Time
	Updated     time.Time
	Completed   time.Time
	Locked      time.Time
	Data        interface{}
	// contains filtered or unexported fields
}

func NewEvent

func NewEvent(triggerType string, triggerTime time.Time, data interface{}) *Event

func TestOnly_SelectEvents

func TestOnly_SelectEvents(cfg *MySQLConfig) []*Event

func (*Event) GetKey

func (self *Event) GetKey() string

func (*Event) Stop

func (self *Event) Stop()

func (*Event) String

func (self *Event) String() string

type EventStatus

type EventStatus uint32

func (EventStatus) String

func (self EventStatus) String() string

type Heap

type Heap struct {
	// contains filtered or unexported fields
}

func NewHeap

func NewHeap(reversed bool) *Heap

func (*Heap) Len

func (self *Heap) Len() int

func (*Heap) Less

func (self *Heap) Less(i, j int) bool

func (*Heap) Pop

func (self *Heap) Pop() interface{}

func (*Heap) Push

func (self *Heap) Push(x interface{})

func (*Heap) Swap

func (self *Heap) Swap(i, j int)

type HeapItem

type HeapItem struct {
	// public
	Value interface{}
	// contains filtered or unexported fields
}

func NewHeapItem

func NewHeapItem(priority int64, value interface{}) *HeapItem

type MySQLConfig

type MySQLConfig struct {
	MySQL6            bool   `json:"mysql6"`
	User              string `json:"username"`
	Pass              string `json:"password"`
	Host              string `json:"host"`
	Port              int    `json:"port"`
	DbName            string `json:"db_name"`
	TableName         string `json:"table_name"`
	MaxOpenConnection int    `json:"max_open_connection"`

	ConsumerName           string `json:"queue_name"`
	ConsumerLockTimeoutSec int    `json:"consumer_lock_timeout_sec"`
	ConsumerTimeWindowSec  int    `json:"consumer_time_window_sec"`
	ConsumerSelectLimit    int    `json:"consumer_select_limit"`
	ConsumerSleepMSec      int    `json:"consumer_sleep_msec"`
}

type MySQLConsumer

type MySQLConsumer struct {
	// contains filtered or unexported fields
}

func NewMySQLConsumer

func NewMySQLConsumer(cfg *Config, store *MySQLStore) *MySQLConsumer

func (*MySQLConsumer) Events

func (self *MySQLConsumer) Events() <-chan []*Event

func (*MySQLConsumer) GetStat

func (self *MySQLConsumer) GetStat(reset bool) map[string]interface{}

func (*MySQLConsumer) Start

func (self *MySQLConsumer) Start()

func (*MySQLConsumer) Stop

func (self *MySQLConsumer) Stop()

type MySQLStore

type MySQLStore struct {
	// contains filtered or unexported fields
}

func NewMySQLStore

func NewMySQLStore(cfg *Config) *MySQLStore

func (*MySQLStore) Cancel

func (self *MySQLStore) Cancel(evId string) error

func (*MySQLStore) Close

func (self *MySQLStore) Close()

func (*MySQLStore) GetDb

func (self *MySQLStore) GetDb() *sql.DB

func (*MySQLStore) GetStat

func (self *MySQLStore) GetStat(reset bool) map[string]interface{}

func (*MySQLStore) Open

func (self *MySQLStore) Open() error

func (*MySQLStore) Save

func (self *MySQLStore) Save(ev *Event) string

func (*MySQLStore) UpdateForRetry

func (self *MySQLStore) UpdateForRetry(ev *Event, retryParam interface{}) error

func (*MySQLStore) UpdateStatus

func (self *MySQLStore) UpdateStatus(evId string, status EventStatus) error

type NoTrigger

type NoTrigger struct{}

default trigger

func (*NoTrigger) Trigger

func (self *NoTrigger) Trigger(ev *Event) *TriggerResult

type PQ

type PQ struct {
	// contains filtered or unexported fields
}

func NewPQ

func NewPQ(reversed bool, maxSize int) *PQ

func (*PQ) Len

func (self *PQ) Len() int

func (*PQ) Lookup

func (self *PQ) Lookup(key string) int

func (*PQ) Pop

func (self *PQ) Pop() PQItem

func (*PQ) Push

func (self *PQ) Push(value PQItem, priority int64) (index int, poped PQItem)

meanings of returned values index >= 0, poped == nil: queue is not full, new item has been pushed index >= 0, poped != nil: queue is full, new item has been pushed, the item with lowest priority has been poped index < 0, poped == nil: not possible for PQ (only if there is a wrong usage/implementation e.g. we set max == 0 ...) index < 0, poped != nil: queue is full, new item has not been pushed because it has the lowest priority

func (*PQ) Remove

func (self *PQ) Remove(key string) PQItem

func (*PQ) Top

func (self *PQ) Top() PQItem

type PQItem

type PQItem interface {
	GetKey() string
}

type Queue

type Queue struct {
	QueueDepsContainer `inject:"inline"`
	// contains filtered or unexported fields
}

func CreateCustomQueue

func CreateCustomQueue(cfg *Config, triggers map[string]TriggerInterface) *Queue

func CreateQueue

func CreateQueue(cfg *Config, triggers map[string]TriggerInterface) (*Queue, error)

func (*Queue) Cancel

func (self *Queue) Cancel(evId string) error

func (*Queue) Create

func (self *Queue) Create(triggerType string, triggerTime time.Time, data interface{}) string

func (*Queue) GetStat

func (self *Queue) GetStat() map[string]interface{}

func (*Queue) Populate

func (self *Queue) Populate(store StoreInterface, consumer ConsumerInterface) (*Queue, error)

func (*Queue) Start

func (self *Queue) Start() error

func (*Queue) Stop

func (self *Queue) Stop()

type QueueDepsContainer

type QueueDepsContainer struct {
	Store    StoreInterface    `inject:""`
	Consumer ConsumerInterface `inject:""`
}

type Scheduler

type Scheduler struct {
	SchedulerDepsContainer `inject:"inline"`
	// contains filtered or unexported fields
}

func (*Scheduler) GetStat

func (self *Scheduler) GetStat(reset bool) map[string]interface{}

type SchedulerConfig

type SchedulerConfig struct {
	MaxScheduledEvents int `json:"max_scheduled_events"`
	MaxRetry           int `json:"max_retry"`
}

type SchedulerDepsContainer

type SchedulerDepsContainer struct {
	Store StoreInterface `inject:""`
}

type Seq32

type Seq32 uint32

func (*Seq32) Get

func (self *Seq32) Get() int32

func (*Seq32) Next

func (self *Seq32) Next() int32

func (*Seq32) Reset

func (self *Seq32) Reset()

type Stat

type Stat struct {
	// contains filtered or unexported fields
}

func NewStat

func NewStat(collectIntervalSec int) *Stat

func (*Stat) Add

func (self *Stat) Add(s StatInterface)

func (*Stat) GetStat

func (self *Stat) GetStat(reset bool) map[string]interface{}

func (*Stat) Start

func (self *Stat) Start()

func (*Stat) Stop

func (self *Stat) Stop()

type StatInterface

type StatInterface interface {
	GetStat(reset bool) map[string]interface{}
}

type StoreInterface

type StoreInterface interface {
	Open() error
	Close()
	Save(ev *Event) string
	Cancel(evId string) error
	UpdateStatus(evId string, status EventStatus) error
	UpdateForRetry(ev *Event, retryParam interface{}) error
}

type TriggerInterface

type TriggerInterface interface {
	Trigger(ev *Event) *TriggerResult
}

type TriggerResult

type TriggerResult struct {
	Status      EventStatus
	TriggerTime time.Time
	Data        interface{}
}

Directories

Path Synopsis
examples

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL