gores

package
v0.0.0-...-cf0f666 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: May 25, 2017 License: MIT Imports: 12 Imported by: 0

Documentation

Index

Constants

View Source
const (
	// NOTE(review): presumably the levels accepted by SetLogLevel and reported
	// by LogLevel — confirm against the logging code, which is not visible here.
	Debug = 1
	Trace = 2
)

Variables

This section is empty.

Functions

func ExecuteJob

func ExecuteJob(job *Job, tasks *map[string]interface{}) error

ExecuteJob executes the job, given the mapper of the corresponding worker

func Launch

func Launch(config *Config, tasks *map[string]interface{}) error

Launch starts up the gores Dispatcher and Worker to do background work

func LogLevel

func LogLevel() int

func SetLogLevel

func SetLogLevel(level int)

Types

type Config

type Config struct {
	// Authentication for the Redis connection.
	RedisURL      string
	RedisPassword string

	// Maximum number of idle connections in the Redis pool.
	RedisMaxIdle int

	// Redigo closes a connection after it has remained idle for this duration.
	// NOTE(review): the time unit is not stated here — confirm where the pool is built.
	RedisIdleTimeout int

	// Conn blocks for this duration when trying to pop items from several queues in Redis.
	// NOTE(review): the time unit is not stated here — confirm against the BLPOP call site.
	BlpopMaxBlockTime int

	// Maximum number of workers needed for Gores.
	MaxWorkers int

	// Names of the queues to fetch jobs from.
	Queues []string

	// Dispatcher returns after it has had no jobs to dispatch for this duration.
	// NOTE(review): the time unit is not stated here — confirm against the dispatcher loop.
	DispatcherTimeout int

	// Worker returns after it has had no job to work on for this duration.
	// NOTE(review): the time unit is not stated here — confirm against the worker loop.
	WorkerTimeout int
}

Config contains the configuration parameters for running Gores

func InitConfig

func InitConfig(confPath string) (*Config, error)

InitConfig creates new config instance based on the config.json file path

type Dispatcher

type Dispatcher struct {
	// contains filtered or unexported fields
}

Dispatcher represents the dispatcher between Redis server and workers

func NewDispatcher

func NewDispatcher(gores *Gores, config *Config, queues mapset.Set) *Dispatcher

NewDispatcher creates Dispatcher instance

func (*Dispatcher) Start

func (disp *Dispatcher) Start(tasks *map[string]interface{}) error

Start starts dispatching in a fan-out way

type Gores

type Gores struct {
	// contains filtered or unexported fields
}

Gores represents the main Gores object that stores all configurations and connection with Redis

func NewGores

func NewGores(config *Config) *Gores

NewGores creates a new Gores instance given the pointer to config object

func (*Gores) BlockPop

func (gores *Gores) BlockPop(queues mapset.Set) (string, map[string]interface{}, error)

BlockPop calls the "BLPOP" command on the Redis message queue. "BLPOP" blocks for a configured time until a new job item is found and popped

func (*Gores) CurrentTime

func (gores *Gores) CurrentTime() int64

CurrentTime returns the current unix timestamp

func (*Gores) Decode

func (gores *Gores) Decode(data []byte) (map[string]interface{}, error)

Decode unmarshals byte array returned from Redis to a map instance

func (*Gores) Encode

func (gores *Gores) Encode(item interface{}) (string, error)

Encode marshals map instance to its string representation

func (*Gores) Enqueue

func (gores *Gores) Enqueue(item map[string]interface{}) error

Enqueue puts new job item to Redis message queue

func (*Gores) EnqueueAt

func (gores *Gores) EnqueueAt(datetime int64, item interface{}) error

EnqueueAt puts the job to Redis delayed queue for the given timestamp

func (*Gores) Info

func (gores *Gores) Info() (map[string]interface{}, error)

Info returns the information of the Redis queue

func (*Gores) NextDelayedTimestamp

func (gores *Gores) NextDelayedTimestamp() int64

NextDelayedTimestamp returns the next delayed timestamp

func (*Gores) NextItemForTimestamp

func (gores *Gores) NextItemForTimestamp(timestamp int64) map[string]interface{}

NextItemForTimestamp fetches item from delayed queue in Redis that has the given timestamp

func (*Gores) Pop

func (gores *Gores) Pop(queue string) (map[string]interface{}, error)

Pop calls the "LPOP" command on the Redis message queue. "LPOP" does not block even if no item is found

func (*Gores) Queues

func (gores *Gores) Queues() []string

Queues returns a slice of existing queues' names

func (*Gores) Size

func (gores *Gores) Size(queue string) (int64, error)

Size returns the size of the given message queue "gores:queue:%s" on Redis

func (*Gores) SizeOfQueue

func (gores *Gores) SizeOfQueue(key string) int64

SizeOfQueue returns the size of any given queue on Redis

func (*Gores) Workers

func (gores *Gores) Workers() []string

Workers returns a slice of existing worker names

type Job

type Job struct {
	// contains filtered or unexported fields
}

Job represents a job that needs to be executed

func NewJob

func NewJob(queue string, payload map[string]interface{}, gores *Gores) *Job

NewJob initializes a new job object

func ReserveJob

func ReserveJob(gores *Gores, queues mapset.Set) (*Job, error)

ReserveJob uses BLPOP command to fetch job from Redis

func (*Job) Failed

func (job *Job) Failed()

Failed updates the state of the job to be failed

func (*Job) Payload

func (job *Job) Payload() map[string]interface{}

Payload returns the payload map inside the job struct

func (*Job) Processed

func (job *Job) Processed()

Processed updates the state of job to be processed

func (*Job) Retry

func (job *Job) Retry(payload map[string]interface{}) bool

Retry enqueues the failed job back to Redis queue

func (*Job) String

func (job *Job) String() string

String returns the string representation of the job object

type Scheduler

type Scheduler struct {
	// contains filtered or unexported fields
}

Scheduler represents a scheduler that schedules delayed and failed jobs in Gores

func NewScheduler

func NewScheduler(config *Config) *Scheduler

NewScheduler initializes a new scheduler

func (*Scheduler) HandleDelayedItems

func (sche *Scheduler) HandleDelayedItems()

HandleDelayedItems re-enqueues delayed or failed jobs back to Redis

func (*Scheduler) NextDelayedTimestamps

func (sche *Scheduler) NextDelayedTimestamps()

NextDelayedTimestamps fetches delayed jobs from Redis and places them into a channel

func (*Scheduler) Run

func (sche *Scheduler) Run()

Run starts up the scheduler

func (*Scheduler) ScheduleShutdown

func (sche *Scheduler) ScheduleShutdown()

ScheduleShutdown schedules the shutdown of the scheduler

type Stat

type Stat struct {
	// contains filtered or unexported fields
}

Stat represents the statistic of a specific job queue in Redis

func NewStat

func NewStat(name string, gores *Gores) *Stat

NewStat initializes a new stat struct

func (*Stat) Clear

func (stat *Stat) Clear() int

Clear deletes the statistic about the queue

func (*Stat) Decr

func (stat *Stat) Decr() int

Decr decrements the count of the given queue key

func (*Stat) Get

func (stat *Stat) Get() int64

Get retrieves the statistic of the given queue

func (*Stat) Incr

func (stat *Stat) Incr() int

Incr increments the count of the given queue key

type Worker

type Worker struct {
	// contains filtered or unexported fields
}

Worker represents an object involved in Gores

func NewWorker

func NewWorker(config *Config, queues mapset.Set, goroutineID int) *Worker

NewWorker initializes a new worker

func (*Worker) All

func (worker *Worker) All(gores *Gores) []*Worker

All returns a slice of existing workers

func (*Worker) Exists

func (worker *Worker) Exists(workerID string) int64

Exists checks whether the worker with given id exists

func (*Worker) Find

func (worker *Worker) Find(workerID string, gores *Gores) *Worker

Find returns the worker with the given worker id

func (*Worker) Gores

func (worker *Worker) Gores() *Gores

Gores returns the pointer to embedded Gores object

func (*Worker) PruneDeadWorkers

func (worker *Worker) PruneDeadWorkers() error

PruneDeadWorkers deletes the worker information

func (*Worker) RegisterWorker

func (worker *Worker) RegisterWorker() error

RegisterWorker saves information about this worker on Redis

func (*Worker) Size

func (worker *Worker) Size() int

Size returns the total number of live workers

func (*Worker) Start

func (worker *Worker) Start(dispatcher *Dispatcher, tasks *map[string]interface{}) error

Start starts the worker and starts working on tasks

func (*Worker) String

func (worker *Worker) String() string

String returns the string representation of this worker

func (*Worker) UnregisterWorker

func (worker *Worker) UnregisterWorker() error

UnregisterWorker deletes all information related to this worker from Redis

func (*Worker) WorkerPids

func (worker *Worker) WorkerPids() mapset.Set

WorkerPids returns a set of existing workers' ids

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL