Documentation ¶
Index ¶
Constants ¶
View Source
const ( USE CommandName = "use" PUT = "put" WATCH = "watch" IGNORE = "ignore" RESERVE = "reserve" RESERVE_WITH_TIMEOUT = "reserve-with-timeout" DELETE = "delete" RELEASE = "release" BURY = "bury" TOUCH = "touch" )
View Source
const ( BAD_FORMAT string = "BAD_FORMAT" UNKNOWN_COMMAND = "UNKNOWN_COMMAND" NOT_IGNORED = "NOT_IGNORED" NOT_FOUND = "NOT_FOUND" EXPECTED_CRLF = "EXPECTED_CRLF" JOB_TOO_BIG = "JOB_TOO_BIG" TIMED_OUT = "TIMED_OUT" )
View Source
const MAX_JOBS_PER_ITERATION int = 20
View Source
const MAX_JOB_SIZE int64 = 65536 // 2^16
View Source
const NANO = 1000000000
View Source
const QUEUE_FREQUENCY time.Duration = 20 * time.Millisecond // process every 20ms. TODO check why some clients get stuck when this is lower
Variables ¶
This section is empty.
Functions ¶
This section is empty.
Types ¶
type AwaitingClient ¶
type AwaitingClient struct { SendChannel chan Command Request Command QueuedAt int64 Timeout int64 // contains filtered or unexported fields }
AwaitingClient stores the send channel of a client that is waiting on a tube.
func NewAwaitingClient ¶
func NewAwaitingClient(request Command, sendChannel chan Command) *AwaitingClient
func (*AwaitingClient) Dequeued ¶
func (w *AwaitingClient) Dequeued()
func (*AwaitingClient) Enqueued ¶
func (w *AwaitingClient) Enqueued()
func (*AwaitingClient) Id ¶
func (w *AwaitingClient) Id() string
func (*AwaitingClient) Key ¶
func (w *AwaitingClient) Key() int64
func (*AwaitingClient) Timeleft ¶
func (w *AwaitingClient) Timeleft() int64
func (*AwaitingClient) Timestamp ¶
func (w *AwaitingClient) Timestamp() int64
type Command ¶
type Command struct { Name CommandName RawCommand string Params map[string]string WaitingForMore bool MoreToSend bool Err error Job Job }
func NewCommand ¶
func NewCommand() Command
func NewDefaultCommand ¶
func NewDefaultCommand() Command
type CommandName ¶
type CommandName string
type CommandParseOptions ¶
type CommandParseOptions struct { ExpectedLength int WaitingForMore bool Params []string Name CommandName }
type CommandReplyOptions ¶
type Job ¶
type Job struct { Pri int64 Delay int64 // time set as delay in seconds StartedDelayAt int64 // timestamp of when it was set to delayed StartedTTRAt int64 // timestamp of when it was reserved TTR int64 // time set as ttr in seconds Bytes int64 Data string // contains filtered or unexported fields }
func (*Job) SetState ¶
*
   put with delay               release with delay
  ----------------> [DELAYED] <------------.
                        |                   |
                        | (time passes)     |
                        |                   |
   put                  v     reserve       |       delete
  -----------------> [READY] ---------> [RESERVED] --------> *poof*
                       ^  ^                |  |
                       |   \  release      |  |
                       |    `-------------'   |
                       |                      |
                       | kick                 |
                       |                      |
                       |       bury           |
                    [BURIED] <---------------'
                       |
                       |  delete
                        `--------> *poof*
type PriorityQueue ¶
type PriorityQueue interface { Init() // queue item Enqueue(item PriorityQueueItem) // get the highest priority item without removing Peek() (item PriorityQueueItem) // remove item from beginning Dequeue() (item PriorityQueueItem) // find an item by id in the queue Find(id string) (item PriorityQueueItem) // delete an item and return it by id Delete(id string) PriorityQueueItem // size Size() int }
PriorityQueue is the interface that all backends should implement. See backend/min_heap.go for an example.
type PriorityQueueItem ¶
type PriorityQueueItem interface { Key() int64 Id() string Timestamp() int64 Enqueued() Dequeued() }
PriorityQueueItem is a single item in the PriorityQueue. This interface helps in isolating details of backend items.
type Tube ¶
type Tube struct { Name string Ready PriorityQueue Reserved PriorityQueue Delayed PriorityQueue Buried PriorityQueue AwaitingClients PriorityQueue AwaitingTimedClients map[string]*AwaitingClient }
Tube represents a single tube (queue) in the beanstalkg server.
func (*Tube) Process ¶
func (tube *Tube) Process()
Process runs all the necessary operations for upkeep of the tube. TODO: unit test.
func (*Tube) ProcessTimedClients ¶
func (tube *Tube) ProcessTimedClients()
ProcessTimedClients either reserves jobs for the clients that are waiting with a timeout or times those clients out.
Click to show internal directories.
Click to hide internal directories.