Documentation ¶
Index ¶
- Constants
- func ExecuteJob(job *Job, tasks *map[string]interface{}) error
- func Launch(config *Config, tasks *map[string]interface{}) error
- func LogLevel() int
- func SetLogLevel(level int)
- type Config
- type Dispatcher
- type Gores
- func (gores *Gores) BlockPop(queues mapset.Set) (string, map[string]interface{}, error)
- func (gores *Gores) CurrentTime() int64
- func (gores *Gores) Decode(data []byte) (map[string]interface{}, error)
- func (gores *Gores) Encode(item interface{}) (string, error)
- func (gores *Gores) Enqueue(item map[string]interface{}) error
- func (gores *Gores) EnqueueAt(datetime int64, item interface{}) error
- func (gores *Gores) Info() (map[string]interface{}, error)
- func (gores *Gores) NextDelayedTimestamp() int64
- func (gores *Gores) NextItemForTimestamp(timestamp int64) map[string]interface{}
- func (gores *Gores) Pop(queue string) (map[string]interface{}, error)
- func (gores *Gores) Queues() []string
- func (gores *Gores) Size(queue string) (int64, error)
- func (gores *Gores) SizeOfQueue(key string) int64
- func (gores *Gores) Workers() []string
- type Job
- type Scheduler
- type Stat
- type Worker
- func (worker *Worker) All(gores *Gores) []*Worker
- func (worker *Worker) Exists(workerID string) int64
- func (worker *Worker) Find(workerID string, gores *Gores) *Worker
- func (worker *Worker) Gores() *Gores
- func (worker *Worker) PruneDeadWorkers() error
- func (worker *Worker) RegisterWorker() error
- func (worker *Worker) Size() int
- func (worker *Worker) Start(dispatcher *Dispatcher, tasks *map[string]interface{}) error
- func (worker *Worker) String() string
- func (worker *Worker) UnregisterWorker() error
- func (worker *Worker) WorkerPids() mapset.Set
Constants ¶
const ( Debug = 1 Trace = 2 )
Variables ¶
This section is empty.
Functions ¶
func ExecuteJob ¶
ExecuteJob executes the job, given the mapper of corresponding worker
func SetLogLevel ¶
func SetLogLevel(level int)
Types ¶
type Config ¶
type Config struct { // Authetication for Redis connection RedisURL string RedisPassword string // Maximum number of idle connections in the Redis pool RedisMaxIdle int // Redigo closes connection after it remains idle for this duration RedisIdleTimeout int // Conn blocks for this duration when trying to pop items from several queues from Redis BlpopMaxBlockTime int // Maximum number of workers needed for Gores MaxWorkers int // names of queues to fetch jobs from Queues []string // Dispatcher returns after it did not have jobs to dispatch for this duration DispatcherTimeout int // Worker returns after it did not have job to work on after this duration WorkerTimeout int }
Config contains the configuration parameters for running Gores
func InitConfig ¶
InitConfig creates new config instance based on the config.json file path
type Dispatcher ¶
type Dispatcher struct {
// contains filtered or unexported fields
}
Dispatcher represents the dispatcher between Redis server and workers
func NewDispatcher ¶
func NewDispatcher(gores *Gores, config *Config, queues mapset.Set) *Dispatcher
NewDispatcher creates Dispatcher instance
func (*Dispatcher) Start ¶
func (disp *Dispatcher) Start(tasks *map[string]interface{}) error
Start starts dispatching in fanout way
type Gores ¶
type Gores struct {
// contains filtered or unexported fields
}
Gores represents the main Gores object that stores all configurations and connection with Redis
func (*Gores) BlockPop ¶
BlockPop calls "BLPOP" command on Redis message queue "BLPOP" blocks for a configured time until a new job item is found and popped
func (*Gores) CurrentTime ¶
CurrentTime retruns the current unix timestamp
func (*Gores) NextDelayedTimestamp ¶
NextDelayedTimestamp returns the next delayed timestamps
func (*Gores) NextItemForTimestamp ¶
NextItemForTimestamp fetches item from delayed queue in Redis that has the given timestamp
func (*Gores) Pop ¶
Pop calls "LPOP" command on Redis message queue "LPOP" does not block even there is no item found
func (*Gores) SizeOfQueue ¶
SizeOfQueue return the size of any given queue on Redis
type Job ¶
type Job struct {
// contains filtered or unexported fields
}
Job represents a job that needs to be executed
func ReserveJob ¶
ReserveJob uses BLPOP command to fetch job from Redis
func (*Job) Processed ¶
func (job *Job) Processed()
Processed updates the state of job to be processed
type Scheduler ¶
type Scheduler struct {
// contains filtered or unexported fields
}
Scheduler represents a scheduler that schedule delayed and failed jobs in Gores
func NewScheduler ¶
NewScheduler initializes a new shceduler
func (*Scheduler) HandleDelayedItems ¶
func (sche *Scheduler) HandleDelayedItems()
HandleDelayedItems re-enqueue delayed or failed jobs back to redis
func (*Scheduler) NextDelayedTimestamps ¶
func (sche *Scheduler) NextDelayedTimestamps()
NextDelayedTimestamps fetches delayed jobs from Redis and place them into channel
func (*Scheduler) ScheduleShutdown ¶
func (sche *Scheduler) ScheduleShutdown()
ScheduleShutdown schedules the shutdown of sheduler
type Stat ¶
type Stat struct {
// contains filtered or unexported fields
}
Stat represents the statistic of a specific job queue in Redis
type Worker ¶
type Worker struct {
// contains filtered or unexported fields
}
Worker represents a object involved in Gores
func (*Worker) PruneDeadWorkers ¶
PruneDeadWorkers delets the worker information
func (*Worker) RegisterWorker ¶
RegisterWorker saves information about this worker on Redis
func (*Worker) Start ¶
func (worker *Worker) Start(dispatcher *Dispatcher, tasks *map[string]interface{}) error
Start starts the worker and start working on tasks
func (*Worker) UnregisterWorker ¶
UnregisterWorker delets all information related to this worker from Redis
func (*Worker) WorkerPids ¶
func (worker *Worker) WorkerPids() mapset.Set
WorkerPids returns a set of existing workers' ids