consumer

package
v0.0.0-...-beb54cf Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: May 1, 2020 License: MIT Imports: 8 Imported by: 0

Documentation

Index

Constants

View Source
const (
	//Channel size to allocate. It is important for task implementation to send event
	//asynchronously to avoid blocking the execution thread
	TaskEventChannelSize = 1
)

Variables

View Source
var ErrTimeout = errors.New("timeout")

hard coded to avoid dependency on go-beanstalkd library only for one constant

Functions

This section is empty.

Types

type Configuration

type Configuration struct {
	//Waiting time for consumer reserve
	WaitForConsumerReserve time.Duration

	//Waiting time for quit signal timeout
	Heartbeat time.Duration

	ReleasePriority uint32
	ReleaseDelay    time.Duration
	BuryPriority    uint32
}

Configuration stores initialization data for worker server

func NewConfiguration

func NewConfiguration() *Configuration

type ConnectionHandler

type ConnectionHandler interface {
	Reserve(timeout time.Duration) (id uint64, body []byte, err error)
	Release(id uint64, pri uint32, delay time.Duration) error
	Delete(id uint64) error
	Bury(id uint64, pri uint32) error
	Touch(id uint64) error
	Put(body []byte, pri uint32, delay, ttr time.Duration) (id uint64, err error)
	ListTubes() ([]string, error)

	Close() error
}

type Consumer

type Consumer struct {
	*Configuration
	// contains filtered or unexported fields
}

Consumer stores configuration for consumer activation

func NewConsumer

func NewConsumer(config *Configuration, connectorHandler connector.Handler,
	connectionHandler ConnectionHandler) *Consumer

NewConsumer creates consumer instance with given Handler

func (*Consumer) Bury

func (con *Consumer) Bury(id uint64, pri uint32) error

func (*Consumer) Close

func (con *Consumer) Close() error

func (*Consumer) ConnectionHandler

func (con *Consumer) ConnectionHandler() ConnectionHandler

func (*Consumer) Delete

func (con *Consumer) Delete(id uint64) error

func (*Consumer) Init

func (con *Consumer) Init() error

func (*Consumer) ListTubes

func (con *Consumer) ListTubes() ([]string, error)

func (*Consumer) OnEndConsume

func (con *Consumer) OnEndConsume()

func (*Consumer) OnHeartbeat

func (con *Consumer) OnHeartbeat()

func (*Consumer) OnReserveTimeout

func (con *Consumer) OnReserveTimeout()

func (*Consumer) OnStartConsume

func (con *Consumer) OnStartConsume()

func (*Consumer) Release

func (con *Consumer) Release(id uint64, pri uint32, delay time.Duration) error

func (*Consumer) Reserve

func (con *Consumer) Reserve(timeout time.Duration) (id uint64, body []byte, err error)

func (*Consumer) SetEventHandler

func (con *Consumer) SetEventHandler(handler EventHandler)

func (*Consumer) SetTaskPayloadHandler

func (con *Consumer) SetTaskPayloadHandler(handler common.TaskPayloadHandler)

func (*Consumer) StartConsumer

func (con *Consumer) StartConsumer() error

StartConsumer starts consumer thread

func (*Consumer) StopConsumer

func (con *Consumer) StopConsumer()

StopConsumer stops consumer thread

func (*Consumer) TaskEventChannel

func (con *Consumer) TaskEventChannel() chan<- *common.TaskProcessEvent

func (*Consumer) Touch

func (con *Consumer) Touch(id uint64) error

type EventHandler

type EventHandler interface {
	OnStartConsume()
	OnEndConsume()

	OnReserveTimeout()
	OnHeartbeat()
}

type Handler

type Handler interface {
	Init() error
	Close() error

	ConnectionHandler() ConnectionHandler

	TaskEventChannel() chan<- *common.TaskProcessEvent

	SetEventHandler(handler EventHandler)
	SetTaskPayloadHandler(handler common.TaskPayloadHandler)

	StartConsumer() error
	StopConsumer()
}

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL