okq

package module
v0.0.0-...-f31546d Latest Latest
Warning

This package is not in the latest version of its module.

Published: Jul 17, 2017 License: MIT Imports: 9 Imported by: 2

README

okq-go

This package is still under development, and its API may change at any time. That said, it is also used in production, so apart from API changes it should always be usable. The previous version is completely stable.

A go driver for the okq persistent queue.

okq uses the redis protocol and calling conventions as its interface, so any standard redis client can be used to interact with it. This package wraps a normal redis pool, however, to provide a more convenient interface. Specifically, it creates a simple interface for event consumers so they retrieve events through a channel rather than implementing that logic manually every time.

Usage

To get:

go get github.com/mediocregopher/okq-go.v2

To import:

import "github.com/mediocregopher/okq-go.v2"

Documentation

Overview

Package okq is a go client for the okq persistent queue.

To import inside your package do:

import "github.com/mediocregopher/okq-go.v2"

Connecting

Most of the time you'll want to use New to make a new Client. This will create a connection pool of the given size and use it for all operations. Clients are thread-safe.

cl, err := okq.New("127.0.0.1:4777", 10)

Pushing to queues

All events in okq require a unique event id. This package will automatically generate a unique id if you use the standard Push methods.

cl.Push("super-queue", "my awesome event", okq.Normal)

You can also create your own id by using the PushEvent methods. Remember though that the event id *must* be unique within that queue.

e := okq.Event{Queue: "super-queue", ID: "unique id", Contents: "my awesome event"}
cl.PushEvent(e, okq.Normal)

Consuming from queues

You can turn any Client into a consumer by using the Consumer methods. These return a channel which blocks until an error is hit or a manual stop occurs.

Example of a consumer which should never quit:

fn := func(ctx context.Context, e okq.Event) bool {
	log.Printf("event received on %s: %s", e.Queue, e.Contents)
	return true
}
for {
	errCh := cl.Consumer(context.Background(), fn, "queue1", "queue2")
	log.Printf("error received from consumer: %s", <-errCh)
}

See the doc comment on the Consumer method for more details.

Constants

This section is empty.

Variables

var DefaultTimeout = 30 * time.Second

DefaultTimeout is used as the default timeout for reading from the redis socket, and is used as the time to block per notify command for consumers. This is only relevant if using the New function.

Functions

func IsDup

func IsDup(err error) bool

IsDup returns true if the error given was returned from okq for the case of an event which was submitted being the duplicate of an existing one.
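When a push is retried, a duplicate error can mean the event was already accepted on an earlier attempt. The following is a minimal sketch of treating that case as success. It does not import okq: errDup and isDup are stand-ins for the okq error and okq.IsDup, and pushIdempotent and errOther are hypothetical names used only for illustration.

```go
package main

import (
	"errors"
	"fmt"
)

// errDup is a stand-in for the duplicate-event error returned by okq.
var errDup = errors.New("duplicate event")

// errOther is a stand-in for any unrelated failure.
var errOther = errors.New("connection refused")

// isDup is a stand-in for okq.IsDup.
func isDup(err error) bool { return errors.Is(err, errDup) }

// pushIdempotent runs push and treats a duplicate-event error as success.
// Any other error is returned unchanged.
func pushIdempotent(push func() error, dup func(error) bool) error {
	err := push()
	if err != nil && dup(err) {
		return nil
	}
	return err
}

func main() {
	// A duplicate is swallowed, an unrelated error is passed through.
	fmt.Println(pushIdempotent(func() error { return errDup }, isDup))
	fmt.Println(pushIdempotent(func() error { return errOther }, isDup))
}
```

With a real client, the push argument would wrap a call such as cl.PushEvent(e, okq.Normal) and the dup argument would be okq.IsDup.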

Types

type Client

type Client struct {
	Opts
	// contains filtered or unexported fields
}

Client is a client for the okq persistent queue which talks to a pool of okq instances.

All methods on Client are thread-safe.

func New

func New(addr string, numConns int) (*Client, error)

New takes in a redis address and creates a connection pool for it. DefaultTimeout will be used for NotifyTimeout.

func NewWithOpts

func NewWithOpts(o Opts) *Client

NewWithOpts returns a new initialized Client based on the given Opts. RedisPool is a required field in Opts.

func (*Client) Close

func (c *Client) Close() error

Close closes all connections that this client currently has pooled. It should only be called once all other commands and consumers are done running.

func (*Client) Consume

func (c *Client) Consume(opts ConsumerOpts) <-chan error

Consume turns a client into a consumer. It will register itself on the Queues, and call the Callback on all events it comes across. It returns a buffered error channel to which an error will be written when one is encountered. At that point Consume must be called again.

The Callback is called synchronously, so if you wish to process events in parallel you'll have to call Consume multiple times from multiple goroutines.
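Because the Callback runs synchronously, parallelism comes from running several consumers at once. The following is a minimal sketch of that pattern and is not part of this package. startFunc is a hypothetical stand-in for a call such as cl.Consume(opts), and runWorkers and demo are illustrative names. Each worker restarts its consumer after an error and stops once it receives context.Canceled.

```go
package main

import (
	"context"
	"errors"
	"fmt"
	"sync"
)

// startFunc is a hypothetical stand-in for starting one consumer, e.g. a
// call to cl.Consume(opts). It returns that consumer's error channel.
type startFunc func(ctx context.Context) <-chan error

// runWorkers runs n consumers in parallel. Each worker restarts its
// consumer whenever it reports an error, and exits once the consumer
// reports context.Canceled. It returns when every worker has exited.
func runWorkers(ctx context.Context, n int, start startFunc) {
	var wg sync.WaitGroup
	for i := 0; i < n; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			for {
				err := <-start(ctx)
				if errors.Is(err, context.Canceled) {
					return
				}
				// Any other error: the consumer stopped, so start it again.
			}
		}()
	}
	wg.Wait()
}

// demo drives runWorkers with a fake consumer: the first n starts fail
// with an error, every later start reports context.Canceled. It returns
// how many times a consumer was started.
func demo(n int) int {
	var mu sync.Mutex
	starts := 0
	fake := func(_ context.Context) <-chan error {
		mu.Lock()
		starts++
		fail := starts <= n
		mu.Unlock()

		ch := make(chan error, 1)
		if fail {
			ch <- errors.New("connection lost")
		} else {
			ch <- context.Canceled
		}
		return ch
	}
	runWorkers(context.Background(), n, fake)
	return starts
}

func main() {
	fmt.Println("consumer starts:", demo(3))
}
```

In real code a short pause before restarting a failed consumer is probably worthwhile, so that a persistent connection failure does not turn into a tight loop.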

func (*Client) Consumer

func (c *Client) Consumer(ctx context.Context, fn ConsumerFunc, queues ...string) <-chan error

Consumer turns a client into a consumer, and is a shortcut around Consume.

func (*Client) PeekLast

func (c *Client) PeekLast(queue string) (Event, error)

PeekLast returns the event most recently added to the queue, without actually removing it from the queue. It returns an empty Event (IsZero() == true) if the queue is empty.

func (*Client) PeekNext

func (c *Client) PeekNext(queue string) (Event, error)

PeekNext returns the next event which will be retrieved from the queue, without actually removing it from the queue. It returns an empty Event (IsZero() == true) if the queue is empty.

func (*Client) Push

func (c *Client) Push(queue, contents string, f PushFlag) error

Push pushes an event with the given contents onto the queue. The event's ID will be an automatically generated uuid

Normal event:

cl.Push("queue", "some event", okq.Normal)

High priority event:

cl.Push("queue", "some important event", okq.HighPriority)

Submit an event as fast as possible:

cl.Push("queue", "not that important event", okq.NoBlock)

Submit an important event, but do it as fast and unsafely as possible (this probably would never actually be wanted):

cl.Push("queue", "not that important event", okq.HighPriority | okq.NoBlock)

func (*Client) PushEvent

func (c *Client) PushEvent(e Event, f PushFlag) error

PushEvent pushes the given event onto its queue. The event's ID must be unique within that queue.

func (*Client) Status

func (c *Client) Status(queue ...string) ([]QueueStatus, error)

Status returns the statuses of the given queues, or the statuses of all the known queues if no queues are given

type ConsumerFunc

type ConsumerFunc func(context.Context, Event) bool

ConsumerFunc is passed into Consumer, and is used as a callback for incoming Events. It should return true if the event was processed successfully and false otherwise.

The Context will be canceled once the event's expiration has been reached (as set in the ExpireSeconds field of ConsumerOpts) or the base Context passed to Consume is canceled.
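Since the Context is canceled when the event expires, a long-running callback can watch it and stop early. The following is a minimal sketch under assumptions: Event is a local copy of the struct so the example compiles on its own, and slowCallback and process are hypothetical helpers that simulate work with a timer.

```go
package main

import (
	"context"
	"fmt"
	"time"
)

// Event mirrors the okq Event struct so this sketch compiles on its own.
type Event struct {
	Queue    string
	ID       string
	Contents string
}

// slowCallback returns a function with the same shape as ConsumerFunc.
// It simulates work lasting the given duration, and reports failure if
// the context is canceled before the work finishes.
func slowCallback(work time.Duration) func(context.Context, Event) bool {
	return func(ctx context.Context, _ Event) bool {
		select {
		case <-time.After(work):
			return true // work finished in time
		case <-ctx.Done():
			return false // expired or canceled: give up
		}
	}
}

// process calls the callback with a context that expires after timeoutMs
// milliseconds, imitating the expiry a consumer would apply.
func process(workMs, timeoutMs int) bool {
	timeout := time.Duration(timeoutMs) * time.Millisecond
	ctx, cancel := context.WithTimeout(context.Background(), timeout)
	defer cancel()

	fn := slowCallback(time.Duration(workMs) * time.Millisecond)
	return fn(ctx, Event{Queue: "queue1", ID: "id-1", Contents: "hello"})
}

func main() {
	fmt.Println("fast job ok:", process(1, 1000))
	fmt.Println("slow job ok:", process(1000, 1))
}
```

Real work would replace the timer with calls that accept the Context themselves, such as HTTP requests or database queries.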

type ConsumerOpts

type ConsumerOpts struct {
	// Required, the queues to consume
	Queues []string

	// Required, the callback to call for every consumed event
	Callback ConsumerFunc

	// Optional, if set this can be canceled to stop the consumer after it's
	// completed its current job. The consumer will write context.Canceled to
	// its error channel once it's done. This same context (if set) will be used
	// as the root Context for each call to Callback.
	Context context.Context

	// Optional (default 30), the number of seconds a consumed job has in order
	// to be completed (i.e. Callback finishes running). If the event is not
	// completed in time okq will put it back in the queue it came from,
	// regardless of the status of the consumer.
	//
	// If -1 then the expire is effectively infinity and the event is never put
	// back in the queue, regardless of what the Callback returns.
	//
	// This timeout, if not -1, is also used to cancel the Context passed to the
	// Callback
	ExpireSeconds int
}

ConsumerOpts are the set of parameters that an okq consumer can run with

type Event

type Event struct {
	Queue    string // The queue the event is coming from/going to
	ID       string // Unique id of this event
	Contents string // Arbitrary contents of the event
}

Event is a single event which can be read from or written to an okq instance

func (Event) IsZero

func (e Event) IsZero() bool

IsZero returns true if this is an empty Event (usually used as a return from an empty queue)

type Opts

type Opts struct {
	RedisPool

	// Defaults to DefaultTimeout. This indicates the time a consumer should
	// block on the connection waiting for new events. This should be equal to
	// the read timeout on the redis connections.
	NotifyTimeout time.Duration
}

Opts are various fields which can be used with NewWithOpts to create an okq client. Only RedisPool is required.

type PushFlag

type PushFlag int

PushFlag is passed into either of the Push commands to alter their behavior. You can bitwise-OR several of these together to combine their behavior.

const (
	// Normal is the expected behavior (call waits for event to be committed to
	// okq, normal priority)
	Normal PushFlag = 1 << iota

	// HighPriority causes the pushed event to be placed at the front of the
	// queue instead of the back
	HighPriority

	// NoBlock causes the server to not wait for the event to be committed to
	// disk before replying, it will reply as soon as it can and commit
	// asynchronously
	NoBlock
)
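Each flag occupies its own bit, so flags are combined with | and not with &. The following sketch copies the constant block above into a standalone program to show the difference. The has helper is hypothetical and not part of the package.

```go
package main

import "fmt"

// PushFlag and its constants are copied from the declaration above so the
// sketch compiles without importing okq.
type PushFlag int

const (
	Normal PushFlag = 1 << iota
	HighPriority
	NoBlock
)

// has reports whether every bit of want is set in f.
func has(f, want PushFlag) bool { return f&want == want }

func main() {
	both := HighPriority | NoBlock // sets both bits
	none := HighPriority & NoBlock // no bits in common, so nothing is set

	fmt.Println(has(both, HighPriority), has(both, NoBlock))
	fmt.Println(has(none, HighPriority), has(none, NoBlock))
}
```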

type QueueStatus

type QueueStatus struct {
	Name       string // Name of the queue
	Total      int64  // Total events in the queue, includes ones being processed
	Processing int64  // Number of events currently being processed
	Consumers  int64  // Number of connections registered as consumers for this queue
}

QueueStatus describes the current status for a single queue, as described by the QSTATUS command
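Because Total includes events that are currently being processed, the number still waiting is Total minus Processing. The following is a minimal sketch of that arithmetic. QueueStatus is a local copy of the struct, and waiting and unattended are hypothetical helpers, not part of the package.

```go
package main

import "fmt"

// QueueStatus mirrors the okq struct so this sketch compiles on its own.
type QueueStatus struct {
	Name       string
	Total      int64
	Processing int64
	Consumers  int64
}

// waiting returns the number of events not yet picked up by a consumer.
func waiting(s QueueStatus) int64 { return s.Total - s.Processing }

// unattended reports whether events are waiting while no consumer is
// registered for the queue.
func unattended(s QueueStatus) bool { return waiting(s) > 0 && s.Consumers == 0 }

func main() {
	s := QueueStatus{Name: "queue1", Total: 10, Processing: 3, Consumers: 2}
	fmt.Printf("%s: %d waiting, unattended=%v\n", s.Name, waiting(s), unattended(s))
}
```

With a real client, the values would come from the slice returned by cl.Status.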

type RedisPool

type RedisPool interface {
	Get() (*redis.Client, error)
	Put(*redis.Client)
	Empty()
}

RedisPool is an interface which is implemented by radix.v2's pool.Pool type, but can be easily implemented by other types if desired
