architecture

package
v0.0.3 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: May 19, 2017 License: MIT Imports: 7 Imported by: 0

Documentation

Index

Constants

View Source
// Command name constants; each value is the lowercase, hyphenated spelling of
// its identifier.
//
// NOTE(review): only USE is declared with the CommandName type. Every other
// entry supplies its own expression without a type, so under Go's constant
// declaration rules it is an untyped string constant rather than a
// CommandName. Confirm whether that is intentional before relying on the type.
const (
	USE                  CommandName = "use"
	PUT                              = "put"
	WATCH                            = "watch"
	IGNORE                           = "ignore"
	RESERVE                          = "reserve"
	RESERVE_WITH_TIMEOUT             = "reserve-with-timeout"
	DELETE                           = "delete"
	RELEASE                          = "release"
	BURY                             = "bury"
	TOUCH                            = "touch"
)
View Source
// Message constants; each value is identical to its identifier.
// Presumably these are the error/status strings returned to clients — TODO
// confirm against Command.Reply.
//
// NOTE(review): only BAD_FORMAT is explicitly typed as string; the remaining
// entries are untyped string constants (they default to string wherever a
// type is required).
const (
	BAD_FORMAT      string = "BAD_FORMAT"
	UNKNOWN_COMMAND        = "UNKNOWN_COMMAND"
	NOT_IGNORED            = "NOT_IGNORED"
	NOT_FOUND              = "NOT_FOUND"
	EXPECTED_CRLF          = "EXPECTED_CRLF"
	JOB_TOO_BIG            = "JOB_TOO_BIG"
	TIMED_OUT              = "TIMED_OUT"
)
View Source
// MAX_JOBS_PER_ITERATION is an int constant equal to 20.
// Presumably the cap on jobs handled in one processing pass of a tube — TODO
// confirm against Tube.Process.
const MAX_JOBS_PER_ITERATION int = 20
View Source
// MAX_JOB_SIZE is an int64 constant equal to 2^16 (65536).
// Presumably the upper bound on a job body, paired with JOB_TOO_BIG — TODO confirm.
const MAX_JOB_SIZE int64 = 1 << 16
View Source
// NANO is the untyped integer constant 10^9.
// Presumably the number of nanoseconds in one second, used to convert between
// second and nanosecond timestamps — TODO confirm at the call sites.
const NANO = 1000000000
View Source
// QUEUE_FREQUENCY is the processing interval: work is processed every 20ms.
// TODO check why some clients get stuck when this is lower
const QUEUE_FREQUENCY time.Duration = 20 * time.Millisecond // process every 20ms

Variables

This section is empty.

Functions

This section is empty.

Types

type AwaitingClient

// AwaitingClient stores an awaiting client send channel for a tube.
type AwaitingClient struct {
	// SendChannel is a channel of Command values; presumably the channel
	// passed to NewAwaitingClient — TODO confirm who sends and who receives.
	SendChannel chan Command
	// Request is a Command; presumably the request passed to NewAwaitingClient.
	Request     Command
	// QueuedAt and Timeout are int64 values; their units are not visible
	// here — TODO confirm (seconds vs. nanoseconds).
	QueuedAt    int64
	Timeout     int64
	// contains filtered or unexported fields
}

AwaitingClient stores the send channel of a client that is waiting on a tube.

func NewAwaitingClient

func NewAwaitingClient(request Command, sendChannel chan Command) *AwaitingClient

func (*AwaitingClient) Dequeued

func (w *AwaitingClient) Dequeued()

func (*AwaitingClient) Enqueued

func (w *AwaitingClient) Enqueued()

func (*AwaitingClient) Id

func (w *AwaitingClient) Id() string

func (*AwaitingClient) Key

func (w *AwaitingClient) Key() int64

func (*AwaitingClient) Timeleft

func (w *AwaitingClient) Timeleft() int64

func (*AwaitingClient) Timestamp

func (w *AwaitingClient) Timestamp() int64

type Command

// Command holds a command's name, raw text, parameters, parse/reply state
// flags, an error and an associated Job.
type Command struct {
	// Name is the command name as a CommandName.
	Name           CommandName
	// RawCommand is a string; presumably the unparsed text handed to Parse —
	// TODO confirm.
	RawCommand     string
	// Params maps parameter names to string values.
	Params         map[string]string
	// WaitingForMore presumably signals that Parse expects further input
	// (e.g. the body line of a 'put') — TODO confirm.
	WaitingForMore bool
	// MoreToSend presumably signals that Reply has additional output
	// pending — TODO confirm.
	MoreToSend     bool
	// Err is an error value carried with the command.
	Err            error
	// Job is the Job value carried with the command.
	Job            Job
}

func NewCommand

func NewCommand() Command

func NewDefaultCommand

func NewDefaultCommand() Command

func (*Command) Copy

func (command *Command) Copy() Command

func (*Command) Parse

func (command *Command) Parse(rawCommand string) (bool, error)

Parse keeps track of the state of the command. It is called multiple times for commands such as 'put', where the command spans multiple lines.

func (*Command) Reply

func (command *Command) Reply() (bool, string)

type CommandName

// CommandName is the string type used for command names; it is the type of
// Command.Name and of the USE constant.
type CommandName string

type CommandParseOptions

// CommandParseOptions groups per-command parsing settings.
// NOTE(review): how these fields drive Command.Parse is not visible here; the
// field notes below are assumptions to confirm.
type CommandParseOptions struct {
	// ExpectedLength is an int; presumably the expected number of tokens or
	// parameters — TODO confirm.
	ExpectedLength int
	// WaitingForMore presumably marks commands that need a further line of
	// input — TODO confirm.
	WaitingForMore bool
	// Params is a list of strings; presumably the parameter names — TODO confirm.
	Params         []string
	// Name is the command name these options apply to.
	Name           CommandName
}

type CommandReplyOptions

// CommandReplyOptions groups per-command reply settings.
// NOTE(review): how these fields drive Command.Reply is not visible here; the
// field notes below are assumptions to confirm.
type CommandReplyOptions struct {
	// Result is a bool; presumably corresponds to the bool returned by
	// Command.Reply — TODO confirm.
	Result   bool
	// Message is a string; presumably the reply text — TODO confirm.
	Message  string
	// Param is a string; presumably a parameter to include in the reply —
	// TODO confirm.
	Param    string
	// UseJobID presumably selects whether the job id is included in the
	// reply — TODO confirm.
	UseJobID bool
}

type Job

// Job holds a job's priority, delay and TTR settings, the timestamps at which
// its delay and reservation started, and its payload.
type Job struct {
	// Pri is the job priority (ordering semantics are not visible here).
	Pri            int64
	Delay          int64 // time set as delay in seconds
	StartedDelayAt int64 // timestamp of when it was set to delayed
	StartedTTRAt   int64 // timestamp of when it was reserved
	TTR            int64 // time set as ttr in seconds
	// Bytes is an int64; presumably the length of Data in bytes — TODO confirm.
	Bytes          int64
	// Data is the job payload as a string.
	Data           string
	// contains filtered or unexported fields
}

func NewJob

func NewJob(id string, pri, delay, ttr, bytes int64, data string) *Job

func (*Job) Dequeued

func (job *Job) Dequeued()

func (*Job) Enqueued

func (job *Job) Enqueued()

func (*Job) Id

func (job *Job) Id() string

func (*Job) Key

func (job *Job) Key() int64

Key returns the proper key according to the present job state.

func (*Job) SetState

func (job *Job) SetState(state State) error

Job lifecycle (state transitions):

 put with delay               release with delay
----------------> [DELAYED] <------------.
                      |                   |
                      | (time passes)     |
                      |                   |
 put                  v     reserve       |       delete
-----------------> [READY] ---------> [RESERVED] --------> *poof*
                     ^  ^                |  |
                     |   \  release      |  |
                     |    `-------------'   |
                     |                      |
                     | kick                 |
                     |                      |
                     |       bury           |
                  [BURIED] <---------------'
                     |
                     |  delete
                      `--------> *poof*

func (*Job) State

func (job *Job) State() State

func (*Job) Timestamp

func (job *Job) Timestamp() int64

type PriorityQueue

// PriorityQueue is the interface that all backends should implement.
// It stores PriorityQueueItem values and supports lookup and removal by id.
type PriorityQueue interface {
	// prepare the queue for use (presumably must be called before any other
	// method — TODO confirm)
	Init()
	// queue item
	Enqueue(item PriorityQueueItem)
	// get the highest priority item without removing
	Peek() (item PriorityQueueItem)
	// remove item from beginning
	Dequeue() (item PriorityQueueItem)
	// find an item by id in the queue
	Find(id string) (item PriorityQueueItem)
	// delete an item and return it by id
	Delete(id string) PriorityQueueItem
	// size
	Size() int
}

PriorityQueue is the interface that all backends should implement. See backend/min_heap.go for an example.

type PriorityQueueItem

// PriorityQueueItem is a single item in the PriorityQueue.
// *AwaitingClient and *Job both declare this method set.
type PriorityQueueItem interface {
	// Key returns an int64 key; presumably the value the queue orders
	// items by — TODO confirm against the backend.
	Key() int64
	// Id returns the item's string id, as used by PriorityQueue.Find and
	// PriorityQueue.Delete.
	Id() string
	// Timestamp returns an int64 timestamp (units not visible here).
	Timestamp() int64
	// Enqueued and Dequeued are presumably hooks invoked by the queue when
	// the item is added or removed — TODO confirm.
	Enqueued()
	Dequeued()
}

PriorityQueueItem is a single item in the PriorityQueue. This interface helps in isolating details of backend items

type State

// State is the integer type of a job's state, as accepted by Job.SetState
// and returned by Job.State.
type State int
// Job states, numbered consecutively from 0 via iota.
const (
	READY    State = iota // = 0
	DELAYED               // = 1
	RESERVED              // = 2
	BURIED                // = 3
)

type Tube

// Tube represents a single tube (queue) in the beanstalkg server.
type Tube struct {
	// Name is the tube's name.
	Name                 string
	// Ready, Reserved, Delayed and Buried are separate priority queues;
	// presumably one per job State of the same name — TODO confirm.
	Ready                PriorityQueue
	Reserved             PriorityQueue
	Delayed              PriorityQueue
	Buried               PriorityQueue
	// AwaitingClients is a priority queue; presumably of *AwaitingClient
	// items waiting on this tube — TODO confirm.
	AwaitingClients      PriorityQueue
	// AwaitingTimedClients maps a string key to an *AwaitingClient;
	// presumably keyed by AwaitingClient.Id() and holding clients with a
	// timeout — TODO confirm.
	AwaitingTimedClients map[string]*AwaitingClient
}

Tube represents a single tube (queue) in the beanstalkg server.

func (*Tube) Process

func (tube *Tube) Process()

Process runs all the operations necessary for the upkeep of the tube. TODO: unit test.

func (*Tube) ProcessTimedClients

func (tube *Tube) ProcessTimedClients()

ProcessTimedClients reserves jobs for or times out the clients with a timeout

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL