okq

package
v0.0.0-...-048e319 Latest
Warning: This package is not in the latest version of its module.
Published: Feb 11, 2016 License: MIT Imports: 7 Imported by: 7

Documentation

Overview

Package okq is a Go client for the okq persistent queue.

To import inside your package do:

import "github.com/mediocregopher/okq-go/okq"

Connecting

Use New to create a Client. This Client can know about multiple okq endpoints, and if it loses its connection it will attempt to reconnect to one of them chosen at random. In most cases it will only return an error if it can't connect to any of the endpoints at that moment.

cl := okq.New("127.0.0.1:4777", "127.0.0.1:4778")

Pushing to queues

All events in okq require a unique event id. This package will automatically generate a unique id if you use the standard Push method.

cl.Push("super-queue", "my awesome event", okq.Normal)

You can also supply your own id by using the PushEvent method. Remember though that the event id *must* be unique within that queue.

e := okq.Event{Queue: "super-queue", ID: "unique id", Contents: "my awesome event"}
cl.PushEvent(&e, okq.Normal)
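
Since PushEvent leaves the choice of id to the caller, something has to produce ids that will not collide within a queue. The sketch below is one possible approach, assuming a random 128-bit value is unique enough for your purposes; newEventID is a hypothetical helper, not part of this package, and it is not the generator that Push uses internally.

```go
package main

import (
	"crypto/rand"
	"encoding/hex"
	"fmt"
)

// newEventID returns 16 random bytes encoded as 32 hex characters.
// Hypothetical helper for illustration; it is not provided by okq.
func newEventID() (string, error) {
	b := make([]byte, 16)
	if _, err := rand.Read(b); err != nil {
		return "", err
	}
	return hex.EncodeToString(b), nil
}

func main() {
	id, err := newEventID()
	if err != nil {
		fmt.Println("could not generate id:", err)
		return
	}
	// The id would then be placed in the ID field of an okq.Event
	// before calling PushEvent.
	fmt.Println(len(id), id)
}
```

If events already carry a natural key (an order number, for example), using that as the id may be a better fit than a random value.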

Consuming from queues

You can turn any Client into a consumer by using the Consumer methods. These will block as they call the given function on incoming events, and only return upon an error or a manual stop.

Example of a consumer which should never quit:

fn := func(e *okq.Event) bool {
	log.Printf("event received on %s: %s", e.Queue, e.Contents)
	return true
}
for {
	err := cl.Consumer(fn, nil, "queue1", "queue2")
	log.Printf("error received from consumer: %s", err)
}

See the doc string for the Consumer method for more details.


Constants

const DefaultTimeout = 30 * time.Second

DefaultTimeout is used when reading from the socket.

Variables

var Debug bool

If true, turns on debug logging and agg support (see https://github.com/grooveshark/golib).

Functions

This section is empty.

Types

type Client

type Client struct {

	// Timeout to use for reads/writes to okq. This defaults to DefaultTimeout,
	// but can be overridden immediately after New is called
	Timeout time.Duration
	// contains filtered or unexported fields
}

Client is a client for the okq persistent queue. It can talk to a pool of okq instances and fail over from one to another if one loses connectivity.

func New

func New(addr ...string) *Client

New takes one or more okq endpoints (all in the same pool) and returns a client which will interact with them. The signature returns only a *Client, so a failure to connect to any of the given endpoints is reported by the client's methods rather than by New itself.

func (*Client) Close

func (c *Client) Close() error

Close closes all connections that this client currently has open

func (*Client) Consumer

func (c *Client) Consumer(
	fn ConsumerFunc, stopCh chan bool, queues ...string,
) error

Consumer turns a client into a consumer. It will register itself on the given queues, and call the ConsumerFunc on all events it comes across. If stopCh is non-nil and is closed this will return immediately (unless blocking on a QNOTIFY command, in which case it will return after that returns).

The ConsumerFunc is called synchronously, so if you wish to process events in parallel you'll have to create multiple connections to okq
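
One way to read the note about parallelism is to run one consumer per goroutine, each with its own connection. The sketch below shows only the goroutine bookkeeping, under the assumption that each worker would build its own client with New and then call Consumer on it. runWorkers and the work callback are hypothetical names, and the callback is a stand-in so the sketch runs without an okq server.

```go
package main

import (
	"fmt"
	"sync"
)

// runWorkers starts n goroutines and waits for all of them to return.
// Each call to work stands in for "create a dedicated client and block in
// its Consumer method". The returned slice holds one result per worker.
func runWorkers(n int, work func(id int) error) []error {
	errs := make([]error, n)
	var wg sync.WaitGroup
	for i := 0; i < n; i++ {
		wg.Add(1)
		go func(id int) {
			defer wg.Done()
			// Each goroutine writes only to its own slot, so no lock is needed.
			errs[id] = work(id)
		}(i)
	}
	wg.Wait()
	return errs
}

func main() {
	errs := runWorkers(3, func(id int) error {
		// In real use: cl := okq.New(...); return cl.Consumer(fn, stopCh, queues...)
		fmt.Printf("worker %d finished\n", id)
		return nil
	})
	fmt.Println(len(errs))
}
```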

func (*Client) ConsumerUnsafe

func (c *Client) ConsumerUnsafe(
	fn ConsumerFunc, stopCh chan bool, queues ...string,
) error

ConsumerUnsafe is the same as Consumer except that the given ConsumerFunc is called asynchronously and its return value doesn't matter (because no QACK is ever sent to the okq server)

func (*Client) PeekLast

func (c *Client) PeekLast(queue string) (*Event, error)

PeekLast returns the event most recently added to the queue, without actually removing it from the queue. Returns nil if the queue is empty

func (*Client) PeekNext

func (c *Client) PeekNext(queue string) (*Event, error)

PeekNext returns the next event which will be retrieved from the queue, without actually removing it from the queue. Returns nil if the queue is empty

func (*Client) Push

func (c *Client) Push(queue, contents string, f PushFlag) error

Push pushes an event with the given contents onto the queue. The event's ID will be an automatically generated UUID.

Normal event:

cl.Push("queue", "some event", okq.Normal)

High priority event:

cl.Push("queue", "some important event", okq.HighPriority)

Submit an event as fast as possible:

cl.Push("queue", "not that important event", okq.NoBlock)

func (*Client) PushEvent

func (c *Client) PushEvent(e *Event, f PushFlag) error

PushEvent pushes the given event onto its queue. The event's ID must be unique within that queue.

func (*Client) Status

func (c *Client) Status(queue ...string) ([]QueueStatus, error)

Status returns the statuses of the given queues, or the statuses of all the known queues if no queues are given

type ConsumerFunc

type ConsumerFunc func(*Event) bool

ConsumerFunc is passed into Consumer, and is used as a callback for incoming Events. It should return true if the event was processed successfully and false otherwise. If ConsumerUnsafe is being used, the return value is ignored.

type Event

type Event struct {
	Queue    string // The queue the event is coming from/going to
	ID       string // Unique id of this event
	Contents string // Arbitrary contents of the event
}

Event is a single event which can be read from or written to an okq instance

type PushFlag

type PushFlag int

PushFlag is passed into either of the Push commands to alter their behavior. You can bitwise-OR several of these together to combine their behavior.

const (
	// Normal is the expected behavior (call waits for event to be committed to
	// okq, normal priority)
	Normal PushFlag = 1 << iota

	// HighPriority causes the pushed event to be placed at the front of the
	// queue instead of the back
	HighPriority

	// NoBlock causes the server to not wait for the event to be committed
	// to disk before replying, it will reply as soon as it can and commit
	// asynchronously
	NoBlock
)
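
Because the constants are declared with 1 << iota, each flag occupies its own bit, which is what makes combining them with | work. The sketch below copies the declaration into a standalone program to show the resulting values; has is a hypothetical helper for testing bits and is not part of this package. How the server treats any particular combination is not shown here.

```go
package main

import "fmt"

// Local copy of the PushFlag declaration, for illustration only.
type PushFlag int

const (
	Normal       PushFlag = 1 << iota // 1
	HighPriority                      // 2
	NoBlock                           // 4
)

// has reports whether every bit in want is set in f.
func has(f, want PushFlag) bool { return f&want == want }

func main() {
	f := HighPriority | NoBlock
	// prints: 6 true true false
	fmt.Println(int(f), has(f, HighPriority), has(f, NoBlock), has(f, Normal))
}
```

With the real package the combined value would be written as okq.HighPriority|okq.NoBlock and passed as the last argument to Push or PushEvent.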

type QueueStatus

type QueueStatus struct {
	Name       string // Name of the queue
	Total      int64  // Total events in the queue, includes ones being processed
	Processing int64  // Number of events currently being processed
	Consumers  int64  // Number of connections registered as consumers for this queue
}

QueueStatus describes the current status for a single queue, as described by the QSTATUS command
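
Since Total includes events that are currently being processed, the number of events still waiting for a consumer can be derived by subtraction. The sketch below uses a local copy of the documented fields so it compiles on its own; waiting is a hypothetical helper, not part of this package.

```go
package main

import "fmt"

// QueueStatus mirrors the documented fields of okq.QueueStatus for this sketch.
type QueueStatus struct {
	Name       string
	Total      int64
	Processing int64
	Consumers  int64
}

// waiting returns how many events are in the queue but not being processed.
func waiting(s QueueStatus) int64 {
	return s.Total - s.Processing
}

func main() {
	s := QueueStatus{Name: "super-queue", Total: 10, Processing: 3, Consumers: 2}
	// prints: super-queue 7
	fmt.Println(s.Name, waiting(s))
}
```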
