bokchoy: github.com/thoas/bokchoy Index | Files | Directories

package bokchoy

import "github.com/thoas/bokchoy"

Index

Package Files

bokchoy.go broker.go broker_redis.go config.go constants.go consumer.go context.go errors.go handler.go options.go queue.go request.go serializer.go serializer_json.go server.go task.go terminal.go tracer.go utils.go

Constants

const (
    Version = "v0.2.0"
)

Variables

var (
    // ErrorCtxKey is the context.Context key to store
    // the recovered error from the middleware
    ErrorCtxKey = &contextKey{"Error"}

    // TaskCtxKey is the context.Context key to store
    // the task from the middleware
    TaskCtxKey = &contextKey{"Task"}

    // AfterRequestCtxKey is the context.Context key to store
    // functions to execute after the request
    AfterRequestCtxKey = &contextKey{"AfterRequest"}
)
var (
    // ErrAttributeError is returned when an attribute is not found.
    ErrAttributeError = fmt.Errorf("Attribute error")

    // ErrTaskCanceled is returned when a task is canceled.
    ErrTaskCanceled = fmt.Errorf("Task canceled")

    // ErrTaskNotFound is returned when a task is not found.
    ErrTaskNotFound = fmt.Errorf("Task not found")

    // ErrNoQueueToRun is returned when no queue has been found to run.
    ErrNoQueueToRun = fmt.Errorf("No queue to run")
)
var (
    // Normal colors
    ColorBlack   = Color{'\033', '[', '3', '0', 'm'}
    ColorRed     = Color{'\033', '[', '3', '1', 'm'}
    ColorGreen   = Color{'\033', '[', '3', '2', 'm'}
    ColorYellow  = Color{'\033', '[', '3', '3', 'm'}
    ColorBlue    = Color{'\033', '[', '3', '4', 'm'}
    ColorMagenta = Color{'\033', '[', '3', '5', 'm'}
    ColorCyan    = Color{'\033', '[', '3', '6', 'm'}
    ColorWhite   = Color{'\033', '[', '3', '7', 'm'}

    // Bright colors
    ColorBrightBlack   = Color{'\033', '[', '3', '0', ';', '1', 'm'}
    ColorBrightRed     = Color{'\033', '[', '3', '1', ';', '1', 'm'}
    ColorBrightGreen   = Color{'\033', '[', '3', '2', ';', '1', 'm'}
    ColorBrightYellow  = Color{'\033', '[', '3', '3', ';', '1', 'm'}
    ColorBrightBlue    = Color{'\033', '[', '3', '4', ';', '1', 'm'}
    ColorBrightMagenta = Color{'\033', '[', '3', '5', ';', '1', 'm'}
    ColorBrightCyan    = Color{'\033', '[', '3', '6', ';', '1', 'm'}
    ColorBrightWhite   = Color{'\033', '[', '3', '7', ';', '1', 'm'}

    ColorReset = Color{'\033', '[', '0', 'm'}
)
var DefaultTracer = NewLoggerTracer(logging.DefaultLogger)

DefaultTracer is the default tracer.

func GetContextError Uses

func GetContextError(ctx context.Context) error

GetContextError returns the in-context recovered error for a request.

func WithContextAfterRequestFunc Uses

func WithContextAfterRequestFunc(ctx context.Context, f AfterRequestFunc) context.Context

WithContextAfterRequestFunc registers a new function to be executed after the request

func WithContextError Uses

func WithContextError(ctx context.Context, err error) context.Context

WithContextError sets the in-context error for a request.

func WithContextTask Uses

func WithContextTask(ctx context.Context, task *Task) context.Context

WithContextTask sets the in-context task for a request.

type AfterRequestFunc Uses

type AfterRequestFunc func()

AfterRequestFunc is a function which will execute after the request

func GetContextAfterRequestFuncs Uses

func GetContextAfterRequestFuncs(ctx context.Context) []AfterRequestFunc

GetContextAfterRequestFuncs returns the registered functions which will execute after the request

type Bokchoy Uses

type Bokchoy struct {
    Serializer Serializer
    Logger     logging.Logger
    Tracer     Tracer
    // contains filtered or unexported fields
}

Bokchoy is the main object which stores all configuration, queues and broker.

func New Uses

func New(ctx context.Context, cfg Config, options ...Option) (*Bokchoy, error)

New initializes a new Bokchoy instance.

func (*Bokchoy) Empty Uses

func (b *Bokchoy) Empty(ctx context.Context) error

Empty empties initialized queues.

func (*Bokchoy) Flush Uses

func (b *Bokchoy) Flush() error

Flush flushes data of the entire system.

func (*Bokchoy) Handle Uses

func (b *Bokchoy) Handle(queueName string, sub Handler, options ...Option)

Handle registers a new handler to consume tasks for a queue.

func (*Bokchoy) HandleFunc Uses

func (b *Bokchoy) HandleFunc(queueName string, f HandlerFunc, options ...Option)

HandleFunc registers a new handler function to consume tasks for a queue.

func (*Bokchoy) Publish Uses

func (b *Bokchoy) Publish(ctx context.Context, queueName string, payload interface{}, options ...Option) (*Task, error)

Publish publishes a new payload to a queue.

func (*Bokchoy) Queue Uses

func (b *Bokchoy) Queue(name string) *Queue

Queue gets or creates a new queue.

func (*Bokchoy) QueueNames Uses

func (b *Bokchoy) QueueNames() []string

QueueNames returns the managed queue names.

func (*Bokchoy) Run Uses

func (b *Bokchoy) Run(ctx context.Context, options ...Option) error

Run runs the system and blocks the current goroutine.

func (*Bokchoy) ServerNames Uses

func (b *Bokchoy) ServerNames() []string

ServerNames returns the managed server names.

func (*Bokchoy) Stop Uses

func (b *Bokchoy) Stop(ctx context.Context)

Stop stops all queues and consumers.

func (*Bokchoy) Use Uses

func (b *Bokchoy) Use(sub ...func(Handler) Handler) *Bokchoy

Use appends a new middleware to the system.

type Broker Uses

type Broker interface {
    // Initialize initializes the broker.
    Initialize(context.Context) error

    // Ping pings the broker to ensure it's well connected.
    Ping() error

    // Get returns raw data stored in broker.
    Get(string) (map[string]interface{}, error)

    // Empty empties a queue.
    Empty(string) error

    // Flush flushes the entire broker.
    Flush() error

    // Count returns number of items from a queue name.
    Count(string) (int, error)

    // Set synchronizes the stored item.
    Set(string, map[string]interface{}, time.Duration) error

    // Publish publishes raw data.
    Publish(string, string, string, map[string]interface{}, time.Time) error

    // Consume returns an array of raw data.
    Consume(string, string, time.Time) ([]map[string]interface{}, error)
}

Broker is the common interface to define a Broker.

type BrokerConfig Uses

type BrokerConfig struct {
    Type  string
    Redis RedisConfig
}

BrokerConfig contains the broker configuration.

type Color Uses

type Color []byte

Color is a terminal color representation.

type ColorWriter Uses

type ColorWriter struct {
    *bytes.Buffer
    // contains filtered or unexported fields
}

ColorWriter is a bytes buffer with color.

func NewColorWriter Uses

func NewColorWriter(color Color) *ColorWriter

NewColorWriter initializes a new ColorWriter.

func (ColorWriter) WithColor Uses

func (c ColorWriter) WithColor(color Color) *ColorWriter

WithColor returns a new ColorWriter with a new color.

func (*ColorWriter) Write Uses

func (c *ColorWriter) Write(s string, args ...interface{})

Write writes an output to stdout. nolint: errcheck

type Config Uses

type Config struct {
    Queues     []QueueConfig
    Broker     BrokerConfig
    Serializer SerializerConfig
}

Config contains the main configuration to initialize Bokchoy.

type Handler Uses

type Handler interface {
    Handle(*Request) error
}

Handler is an interface to implement a task handler.

type HandlerFunc Uses

type HandlerFunc func(*Request) error

HandlerFunc is a handler to handle incoming tasks.

func (HandlerFunc) Handle Uses

func (s HandlerFunc) Handle(r *Request) error

Handle consumes the request.

type JSONSerializer Uses

type JSONSerializer struct {
}

func (JSONSerializer) Dumps Uses

func (s JSONSerializer) Dumps(v interface{}) ([]byte, error)

func (JSONSerializer) Loads Uses

func (s JSONSerializer) Loads(data []byte, v interface{}) error

func (JSONSerializer) String Uses

func (s JSONSerializer) String() string

type Option Uses

type Option func(opts *Options)

Option is an option unit.

func WithBroker Uses

func WithBroker(broker Broker) Option

WithBroker registers a new broker.

func WithConcurrency Uses

func WithConcurrency(concurrency int) Option

WithConcurrency defines the number of concurrent consumers.

func WithCountdown Uses

func WithCountdown(countdown time.Duration) Option

WithCountdown defines the countdown to launch a delayed task.

func WithDisableOutput Uses

func WithDisableOutput(disableOutput bool) Option

WithDisableOutput defines if the output (logo, queues information) should be disabled.

func WithInitialize Uses

func WithInitialize(initialize bool) Option

WithInitialize defines if the broker needs to be initialized.

func WithLogger Uses

func WithLogger(logger logging.Logger) Option

WithLogger defines the Logger.

func WithMaxRetries Uses

func WithMaxRetries(maxRetries int) Option

WithMaxRetries defines the number of maximum retries for a failed task.

func WithQueues Uses

func WithQueues(queues []string) Option

WithQueues allows overriding the queues to run.

func WithRetryIntervals Uses

func WithRetryIntervals(retryIntervals []time.Duration) Option

WithRetryIntervals defines the retry intervals for a failed task.

func WithSerializer Uses

func WithSerializer(serializer Serializer) Option

WithSerializer defines the Serializer.

func WithServers Uses

func WithServers(servers []Server) Option

WithServers registers new servers to be run.

func WithTTL Uses

func WithTTL(ttl time.Duration) Option

WithTTL defines the duration to keep the task in the broker.

func WithTimeout Uses

func WithTimeout(timeout time.Duration) Option

WithTimeout defines the timeout used to execute a task.

func WithTracer Uses

func WithTracer(tracer Tracer) Option

WithTracer defines the Tracer.

type Options Uses

type Options struct {
    Tracer         Tracer
    Logger         logging.Logger
    Concurrency    int
    MaxRetries     int
    TTL            time.Duration
    Countdown      *time.Duration
    Timeout        time.Duration
    RetryIntervals []time.Duration
    Serializer     Serializer
    Initialize     bool
    Queues         []string
    DisableOutput  bool
    Servers        []Server
    Broker         Broker
}

Options is the bokchoy options.

func (Options) RetryIntervalsDisplay Uses

func (o Options) RetryIntervalsDisplay() string

RetryIntervalsDisplay returns a string representation of the retry intervals.

type Queue Uses

type Queue struct {
    // contains filtered or unexported fields
}

Queue contains consumers to enqueue.

func (*Queue) Cancel Uses

func (q *Queue) Cancel(ctx context.Context, taskID string) (*Task, error)

Cancel cancels a task using its ID.

func (*Queue) Consume Uses

func (q *Queue) Consume(ctx context.Context) ([]*Task, error)

Consume returns an array of tasks.

func (*Queue) ConsumeDelayed Uses

func (q *Queue) ConsumeDelayed(ctx context.Context) ([]*Task, error)

ConsumeDelayed returns an array of delayed tasks.

func (*Queue) Consumer Uses

func (q *Queue) Consumer() *consumer

Consumer returns a random consumer.

func (*Queue) Count Uses

func (q *Queue) Count(ctx context.Context) (QueueStats, error)

Count returns statistics from the queue: direct (the number of waiting tasks), delayed (the number of waiting delayed tasks), and total (the total number of tasks).

func (Queue) DelayName Uses

func (q Queue) DelayName() string

DelayName returns the delayed queue name.

func (*Queue) Empty Uses

func (q *Queue) Empty(ctx context.Context) error

Empty empties queue.

func (*Queue) Get Uses

func (q *Queue) Get(ctx context.Context, taskID string) (*Task, error)

Get returns a task instance from the broker with its id.

func (*Queue) Handle Uses

func (q *Queue) Handle(sub Handler, options ...Option) *Queue

Handle registers a new handler to consume tasks.

func (*Queue) HandleFunc Uses

func (q *Queue) HandleFunc(f HandlerFunc, options ...Option) *Queue

HandleFunc registers a new handler function to consume tasks.

func (*Queue) HandleRequest Uses

func (q *Queue) HandleRequest(ctx context.Context, r *Request) error

HandleRequest handles a request synchronously with a consumer.

func (Queue) MarshalLogObject Uses

func (q Queue) MarshalLogObject(enc logging.ObjectEncoder) error

MarshalLogObject returns the log representation for the queue.

func (Queue) Name Uses

func (q Queue) Name() string

Name returns the queue name.

func (*Queue) NewTask Uses

func (q *Queue) NewTask(payload interface{}, options ...Option) *Task

NewTask returns a new task instance from payload and options.

func (*Queue) OnComplete Uses

func (q *Queue) OnComplete(sub Handler) *Queue

OnComplete registers a new handler to be executed when a task is completed.

func (*Queue) OnCompleteFunc Uses

func (q *Queue) OnCompleteFunc(f HandlerFunc) *Queue

OnCompleteFunc registers a new handler function to be executed when a task is completed.

func (*Queue) OnFailure Uses

func (q *Queue) OnFailure(sub Handler) *Queue

OnFailure registers a new handler to be executed when a task has failed.

func (*Queue) OnFailureFunc Uses

func (q *Queue) OnFailureFunc(f HandlerFunc) *Queue

OnFailureFunc registers a new handler function to be executed when a task has failed.

func (*Queue) OnStart Uses

func (q *Queue) OnStart(sub Handler) *Queue

OnStart registers a new handler to be executed when a task is started.

func (*Queue) OnStartFunc Uses

func (q *Queue) OnStartFunc(f HandlerFunc) *Queue

OnStartFunc registers a new handler function to be executed when a task is started.

func (*Queue) OnSuccess Uses

func (q *Queue) OnSuccess(sub Handler) *Queue

OnSuccess registers a new handler to be executed when a task has succeeded.

func (*Queue) OnSuccessFunc Uses

func (q *Queue) OnSuccessFunc(f HandlerFunc) *Queue

OnSuccessFunc registers a new handler function to be executed when a task has succeeded.

func (*Queue) Publish Uses

func (q *Queue) Publish(ctx context.Context, payload interface{}, options ...Option) (*Task, error)

Publish publishes a new payload to the queue.

func (*Queue) PublishTask Uses

func (q *Queue) PublishTask(ctx context.Context, task *Task) error

PublishTask publishes a new task to the queue.

func (*Queue) Save Uses

func (q *Queue) Save(ctx context.Context, task *Task) error

Save saves a task to the queue.

func (Queue) TaskKey Uses

func (q Queue) TaskKey(taskID string) string

TaskKey returns the task key prefixed by the queue name.

func (*Queue) Use Uses

func (q *Queue) Use(sub ...func(Handler) Handler) *Queue

Use appends a new handler middleware to the queue.

type QueueConfig Uses

type QueueConfig struct {
    Name string
}

QueueConfig contains queue information that should be initialized.

type QueueStats Uses

type QueueStats struct {
    Total   int
    Direct  int
    Delayed int
}

QueueStats is the statistics returned by a Queue.

type RedisBroker Uses

type RedisBroker struct {
    ClientType string
    Client     redisClient
    Prefix     string
    Logger     logging.Logger
    // contains filtered or unexported fields
}

RedisBroker is the redis broker.

func (*RedisBroker) Consume Uses

func (p *RedisBroker) Consume(name string, taskPrefix string, eta time.Time) ([]map[string]interface{}, error)

Consume returns an array of raw data.

func (*RedisBroker) Count Uses

func (p *RedisBroker) Count(queueName string) (int, error)

Count returns number of items from a queue name.

func (*RedisBroker) Empty Uses

func (p *RedisBroker) Empty(name string) error

Empty removes the redis key for a queue.

func (*RedisBroker) Flush Uses

func (p *RedisBroker) Flush() error

Flush flushes the entire redis database.

func (*RedisBroker) Get Uses

func (p *RedisBroker) Get(taskKey string) (map[string]interface{}, error)

Get returns stored raw data from task key.

func (*RedisBroker) Initialize Uses

func (p *RedisBroker) Initialize(ctx context.Context) error

Initialize initializes the redis broker.

func (RedisBroker) Ping Uses

func (p RedisBroker) Ping() error

Ping pings the redis broker to ensure it's well connected.

func (*RedisBroker) Publish Uses

func (p *RedisBroker) Publish(queueName string, taskPrefix string,
    taskID string, data map[string]interface{}, eta time.Time) error

Publish publishes raw data. It uses a hash to store the task itself and pushes the task ID to the list, or to a zset if the task is delayed.

func (*RedisBroker) Set Uses

func (p *RedisBroker) Set(taskKey string, data map[string]interface{}, expiration time.Duration) error

Set synchronizes the stored item in redis.

func (RedisBroker) String Uses

func (p RedisBroker) String() string

type RedisClientConfig Uses

type RedisClientConfig redis.Options

RedisClientConfig contains the redis client configuration.

type RedisClusterConfig Uses

type RedisClusterConfig redis.ClusterOptions

RedisClusterConfig contains the redis cluster configuration.

type RedisConfig Uses

type RedisConfig struct {
    Type     string
    Prefix   string
    Client   RedisClientConfig
    Cluster  RedisClusterConfig
    Sentinel RedisSentinelConfig
}

RedisConfig contains all redis configuration: client, sentinel (failover), cluster.

type RedisSentinelConfig Uses

type RedisSentinelConfig redis.FailoverOptions

RedisSentinelConfig contains the redis sentinel configuration.

type Request Uses

type Request struct {
    Task *Task
    // contains filtered or unexported fields
}

Request is the bokchoy Request which will be handled by a subscriber handler.

func (*Request) Context Uses

func (r *Request) Context() context.Context

Context returns the context attached to the Request.

func (Request) String Uses

func (r Request) String() string

String returns a string representation of a Request

func (*Request) WithContext Uses

func (r *Request) WithContext(ctx context.Context) *Request

WithContext creates a new Request with a context

type Serializer Uses

type Serializer interface {
    Dumps(interface{}) ([]byte, error)
    Loads([]byte, interface{}) error
}

Serializer defines an interface to implement a serializer.

type SerializerConfig Uses

type SerializerConfig struct {
    Type string
}

SerializerConfig contains a serializer configuration to store tasks.

type Server Uses

type Server interface {
    Start(context.Context) error
    Stop(context.Context)
}

type Task Uses

type Task struct {
    ID             string
    Name           string
    PublishedAt    time.Time
    StartedAt      time.Time
    ProcessedAt    time.Time
    Status         int
    OldStatus      int
    MaxRetries     int
    Payload        interface{}
    Result         interface{}
    Error          interface{}
    ExecTime       float64
    TTL            time.Duration
    Timeout        time.Duration
    ETA            time.Time
    RetryIntervals []time.Duration
}

Task is the model stored in a Queue.

func GetContextTask Uses

func GetContextTask(ctx context.Context) *Task

GetContextTask returns the in-context task for a request.

func NewTask Uses

func NewTask(name string, payload interface{}, options ...Option) *Task

NewTask initializes a new Task.

func TaskFromPayload Uses

func TaskFromPayload(data map[string]interface{}, serializer Serializer) (*Task, error)

TaskFromPayload returns a Task instance from raw data.

func (Task) ETADisplay Uses

func (t Task) ETADisplay() string

ETADisplay returns the string representation of the ETA.

func (*Task) Finished Uses

func (t *Task) Finished() bool

Finished returns if a task is finished or not.

func (*Task) IsStatusCanceled Uses

func (t *Task) IsStatusCanceled() bool

IsStatusCanceled returns if the task status is canceled.

func (*Task) IsStatusFailed Uses

func (t *Task) IsStatusFailed() bool

IsStatusFailed returns if the task status is failed.

func (*Task) IsStatusProcessing Uses

func (t *Task) IsStatusProcessing() bool

IsStatusProcessing returns if the task status is processing.

func (*Task) IsStatusSucceeded Uses

func (t *Task) IsStatusSucceeded() bool

IsStatusSucceeded returns if the task status is succeeded.

func (*Task) IsStatusWaiting Uses

func (t *Task) IsStatusWaiting() bool

IsStatusWaiting returns if the task status is waiting.

func (Task) Key Uses

func (t Task) Key() string

Key returns the task key.

func (*Task) MarkAsCanceled Uses

func (t *Task) MarkAsCanceled()

MarkAsCanceled marks a task as canceled.

func (*Task) MarkAsFailed Uses

func (t *Task) MarkAsFailed(err error)

MarkAsFailed marks a task as failed.

func (*Task) MarkAsProcessing Uses

func (t *Task) MarkAsProcessing()

MarkAsProcessing marks a task as processing.

func (*Task) MarkAsSucceeded Uses

func (t *Task) MarkAsSucceeded()

MarkAsSucceeded marks a task as succeeded.

func (Task) MarshalLogObject Uses

func (t Task) MarshalLogObject(enc logging.ObjectEncoder) error

MarshalLogObject returns the log representation for the task.

func (Task) RetryETA Uses

func (t Task) RetryETA() time.Time

RetryETA returns the next ETA.

func (Task) RetryIntervalsDisplay Uses

func (t Task) RetryIntervalsDisplay() string

RetryIntervalsDisplay returns the string representation of the retry intervals.

func (Task) Serialize Uses

func (t Task) Serialize(serializer Serializer) (map[string]interface{}, error)

Serialize serializes a Task to raw data.

func (Task) StatusDisplay Uses

func (t Task) StatusDisplay() string

StatusDisplay returns the status in human representation.

func (Task) String Uses

func (t Task) String() string

String returns the string representation of Task.

type Tracer Uses

type Tracer interface {
    Log(context.Context, string, error)
}

Tracer is a component used to trace errors.

func NewLoggerTracer Uses

func NewLoggerTracer(logger logging.Logger) Tracer

NewLoggerTracer initializes a new Tracer instance.

Directories

Path Synopsis
contrib/rpc
contrib/rpc/proto
contrib/sentry
examples
examples/crawler
examples/crawler/handler
examples/crawler/parser
examples/crawler/task
examples/custom-broker
examples/producer
examples/rpc
examples/sentry
examples/worker
logging
middleware

Package bokchoy imports 17 packages (graph) and is imported by 11 packages. Updated 2019-11-12. Refresh now. Tools for package owners.