Documentation ¶
Overview ¶
Package okq is a go client for the okq persitent queue
To import inside your package do:
import "github.com/mediocregopher/okq-go/okq"
Connecting ¶
Use New to create a Client. This Client can have knowledge of multiple okq endpoints, and will attempt to reconnect at random if it loses connection. In most cases it will only return an error if it can't connect to any of the endpoints at that moment.
cl := okq.New("127.0.0.1:4777", "127.0.0.1:4778")
Pushing to queues ¶
All events in okq require a unique event id. This package will automatically generate a unique id if you use the standard Push methods.
cl.Push("super-queue", "my awesome event", okq.Normal)
You can also create your own id by using the PushEvent methods. Remember though that the event id *must* be unique within that queue.
e := okq.Event{"super-queue", "unique id", "my awesome event"} cl.PushEvent(&e, okq.Normal)
Consuming from queues ¶
You can turn any Client into a consumer by using the Consumer methods. These will block as they call the given function on incoming events, and only return upon an error or a manual stop.
Example of a consumer which should never quit
fn := func(e *okq.Event) bool { log.Printf("event received on %s: %s", e.Queue, e.Contents) return true } for { err := cl.Consumer(fn, nil, "queue1", "queue2") log.Printf("error received from consumer: %s", err) }
See the doc string for the Consumer method for more details
Index ¶
- Constants
- Variables
- type Client
- func (c *Client) Close() error
- func (c *Client) Consumer(fn ConsumerFunc, stopCh chan bool, queues ...string) error
- func (c *Client) ConsumerUnsafe(fn ConsumerFunc, stopCh chan bool, queues ...string) error
- func (c *Client) PeekLast(queue string) (*Event, error)
- func (c *Client) PeekNext(queue string) (*Event, error)
- func (c *Client) Push(queue, contents string, f PushFlag) error
- func (c *Client) PushEvent(e *Event, f PushFlag) error
- func (c *Client) Status(queue ...string) ([]QueueStatus, error)
- type ConsumerFunc
- type Event
- type PushFlag
- type QueueStatus
Constants ¶
const DefaultTimeout = 30 * time.Second
DefaultTimeout is used when reading from socket
Variables ¶
var Debug bool
If true turns on debug logging and agg support (see https://github.com/grooveshark/golib)
Functions ¶
This section is empty.
Types ¶
type Client ¶
type Client struct { // Timeout to use for reads/writes to okq. This defaults to DefaultTimeout, // but can be overwritten immediately after NewClient is called Timeout time.Duration // contains filtered or unexported fields }
Client is a client for the okq persistant queue. It can talk to a pool of okq instances and failover from one to the other if one loses connectivity
func New ¶
New takes one or more okq endpoints (all in the same pool) and returns a client which will interact with them. Returns an error if it can't connect to any of the given clients
func (*Client) Consumer ¶
func (c *Client) Consumer( fn ConsumerFunc, stopCh chan bool, queues ...string, ) error
Consumer turns a client into a consumer. It will register itself on the given queues, and call the ConsumerFunc on all events it comes across. If stopCh is non-nil and is closed this will return immediately (unless blocking on a QNOTIFY command, in which case it will return after that returns).
The ConsumerFunc is called synchronously, so if you wish to process events in parallel you'll have to create multiple connections to okq
func (*Client) ConsumerUnsafe ¶
func (c *Client) ConsumerUnsafe( fn ConsumerFunc, stopCh chan bool, queues ...string, ) error
ConsumerUnsafe is the same as Consumer except that the given ConsumerFunc is called asynchronously and its return value doesn't matter (because no QACK is ever sent to the okq server)
func (*Client) PeekLast ¶
PeekLast returns the event most recently added to the queue, without actually removing it from the queue. Returns nil if the queue is empty
func (*Client) PeekNext ¶
PeekNext returns the next event which will be retrieved from the queue, without actually removing it from the queue. Returns nil if the queue is empty
func (*Client) Push ¶
Push pushes an event with the given contents onto the queue. The event's ID will be an automatically generated uuid
Normal event:
cl.Push("queue", "some event", okq.Normal)
High priority event:
cl.Push("queue", "some important event", okq.HighPriority)
Submit an event as fast as possible
cl.Push("queue", "not that important event", okq.NoBlock)
type ConsumerFunc ¶
ConsumerFunc is passed into Consume, and is used as a callback for incoming Events. It should return true if the event was processed successfully and false otherwise. If ConsumerUnsafe is being used the return is ignored
type Event ¶
type Event struct { Queue string // The queue the event is coming from/going to ID string // Unique id of this event Contents string // Arbitrary contents of the event }
Event is a single event which can be read from or written to an okq instance
type PushFlag ¶
type PushFlag int
PushFlag is passed into either of the Push commands to alter their behavior. You can or multiple of these together to combine their behavior
const ( // Normal is the expected behavior (call waits for event to be committed to // okq, normal priority) Normal PushFlag = 1 << iota // HighPriority causes the pushed event to be placed at the front of the // queue instead of the back HighPriority // NoBlock causes set the server to not wait for the event to be committed // to disk before replying, it will reply as soon as it can and commit // asynchronously NoBlock )
type QueueStatus ¶
type QueueStatus struct { Name string // Name of the queue Total int64 // Total events in the queue, includes ones being processed Processing int64 // Number of events currently being processed Consumers int64 // Number of connections registered as consumers for this queue }
QueueStatus describes the current status for a single queue, as described by the QSTATUS command