Documentation ¶
Overview ¶
Package okq is a go client for the okq persitent queue
To import inside your package do:
import "github.com/mediocregopher/okq-go.v2"
Connecting ¶
Most of the time you'll want to use New to make a new Client. This will create a connection pool of the size given, and use that for all operation. Client's are thread-safe.
cl, err := okq.New("127.0.0.1:4777", 10)
Pushing to queues ¶
All events in okq require a unique event id. This package will automatically generate a unique id if you use the standard Push methods.
cl.Push("super-queue", "my awesome event", okq.Normal)
You can also create your own id by using the PushEvent methods. Remember though that the event id *must* be unique within that queue.
e := okq.Event{"super-queue", "unique id", "my awesome event"} cl.PushEvent(&e, okq.Normal)
Consuming from queues ¶
You can turn any Client into a consumer by using the Consumer methods. These will return a channel which will block until an error is hit or a manual stop occurs
Example of a consumer which should never quit
fn := func(ctx context.Context, e okq.Event) bool { log.Printf("event received on %s: %s", e.Queue, e.Contents) return true } for { errCh := cl.Consumer(context.Background(), fn, nil, "queue1", "queue2") log.Printf("error received from consumer: %s", <-errCh) }
See the doc string for the Consumer method for more details
Index ¶
- Variables
- func IsDup(err error) bool
- type Client
- func (c *Client) Close() error
- func (c *Client) Consume(opts ConsumerOpts) <-chan error
- func (c *Client) Consumer(ctx context.Context, fn ConsumerFunc, queues ...string) <-chan error
- func (c *Client) PeekLast(queue string) (Event, error)
- func (c *Client) PeekNext(queue string) (Event, error)
- func (c *Client) Push(queue, contents string, f PushFlag) error
- func (c *Client) PushEvent(e Event, f PushFlag) error
- func (c *Client) Status(queue ...string) ([]QueueStatus, error)
- type ConsumerFunc
- type ConsumerOpts
- type Event
- type Opts
- type PushFlag
- type QueueStatus
- type RedisPool
Constants ¶
This section is empty.
Variables ¶
var DefaultTimeout = 30 * time.Second
DefaultTimeout is used as the default timeout for reading from the redis socket, and is used as the time to block per notify command for consumers. This is only relevant if using the the New function
Functions ¶
Types ¶
type Client ¶
type Client struct { Opts // contains filtered or unexported fields }
Client is a client for the okq persistent queue which talks to a pool of okq instances.
All methods on Client are thread-safe.
func New ¶
New takes in a redis address and creates a connection pool for it. DefaultTimeout will be used for NotifyTimout.
func NewWithOpts ¶
NewWithOpts returns a new initialized Client based on the given Opts. RedisPool is a required field in Opts.
func (*Client) Close ¶
Close closes all connections that this client currently has pooled. Should only be called once all other commands and consumers are done running
func (*Client) Consume ¶
func (c *Client) Consume(opts ConsumerOpts) <-chan error
Consume turns a client into a consumer. It will register itself on the Queues, and call the Callback on all events it comes across. It returns a buffered error channel to which an error will be written when one is come across. At that point Consumer must be called again.
The Callback is called synchronously, so if you wish to process events in parallel you'll have to call Consume multiple times from multiple go routines.
func (*Client) Consumer ¶
Consumer turns a client into a consumer, and is a shortcut around Consume.
func (*Client) PeekLast ¶
PeekLast returns the event most recently added to the queue, without actually removing it from the queue. Returns an empty Event (IsZero() == true) if the queue is empty
func (*Client) PeekNext ¶
PeekNext returns the next event which will be retrieved from the queue, without actually removing it from the queue. Returns an empty Event (IsZero() == true) if the queue is empty
func (*Client) Push ¶
Push pushes an event with the given contents onto the queue. The event's ID will be an automatically generated uuid
Normal event:
cl.Push("queue", "some event", okq.Normal)
High priority event:
cl.Push("queue", "some important event", okq.HighPriority)
Submit an event as fast as possible
cl.Push("queue", "not that important event", okq.NoBlock)
Submit an important event, but do it as fast and unsafely as possibly (this probably would never actually be wanted
cl.Push("queue", "not that important event", okq.HighPriority & okq.NoBlock)
type ConsumerFunc ¶
ConsumerFunc is passed into Consumer, and is used as a callback for incoming Events. It should return true if the event was processed successfully and false otherwise.
The Context will be canceled once the event's expire has been reached (as set in ConsumerOpts ExpireSeconds field) or the base Context passed to Consume is canceled.
type ConsumerOpts ¶
type ConsumerOpts struct { // Required, the queues to consume Queues []string // Required, the callback to call for every consumed event Callback ConsumerFunc // Optional, if set this can be canceled to stop the consumer after it's // completed its current job. The consumer will write context.Canceled to // its error channel once it's done. This same context (if set) will be used // as the root Context for each call to Callback. Context context.Context // Optional (default 30), the number of seconds a consumed job has in order // to be completed (i.e. Callback finishes running). If the event is not // completed in time okq will put it back in the queue it came from, // regardless of the status of the consumer. // // If -1 then the expire is effectively infinity and the event is never put // back in the queue, regardless of what the Callback returns. // // This timeout, if not -1, is also used to cancel the Context passed to the // Callback ExpireSeconds int }
ConsumerOpts are the set of parameters that an okq consumer can run with
type Event ¶
type Event struct { Queue string // The queue the event is coming from/going to ID string // Unique id of this event Contents string // Arbitrary contents of the event }
Event is a single event which can be read from or written to an okq instance
type Opts ¶
type Opts struct { RedisPool // Defaults to DefaultTimeout. This indicates the time a consumer should // block on the connection waiting for new events. This should be equal to // the read timeout on the redis connections. NotifyTimeout time.Duration }
Opts are various fields which can be used with NewWithOpts to create an okq client. Only RedisPool is required.
type PushFlag ¶
type PushFlag int
PushFlag is passed into either of the Push commands to alter their behavior. You can or multiple of these together to combine their behavior
const ( // Normal is the expected behavior (call waits for event to be committed to // okq, normal priority) Normal PushFlag = 1 << iota // HighPriority causes the pushed event to be placed at the front of the // queue instead of the back HighPriority // NoBlock causes the server to not wait for the event to be committed to // disk before replying, it will reply as soon as it can and commit // asynchronously NoBlock )
type QueueStatus ¶
type QueueStatus struct { Name string // Name of the queue Total int64 // Total events in the queue, includes ones being processed Processing int64 // Number of events currently being processed Consumers int64 // Number of connections registered as consumers for this queue }
QueueStatus describes the current status for a single queue, as described by the QSTATUS command