q

package module
v0.0.0-...-aba326a Latest
Warning

This package is not in the latest version of its module.

Published: Jun 26, 2017 License: Apache-2.0 Imports: 4 Imported by: 0

README

q

A toy in-memory queueing service with a lot of plumbing. q exposes an arbitrary number of in-memory FIFO queues via gRPC. Each queue supports add, peek, and pop operations. Queues may be limited in size or unbounded.

Both queues and messages may be tagged. Queue tags may be updated, but message tags (and messages in general) are immutable.

Of course, with all queues and their messages being stored in-memory, everything will be forgotten when the q process dies. :)

Components

The q service consists of three binaries:

  • q - The main logic. Serves a gRPC API on port 10002.
  • qrest - Serves a (mostly) automatically generated REST to gRPC gateway on port 80. See proto/q.swagger.json for the API spec.
  • qcli - A command-line gRPC client for q.

Metrics, logging, and management

q exposes Prometheus metrics via HTTP at /metrics on port 10003. We expose the count of total enqueued and consumed messages, tagged by queue ID. Total errors are also exposed, tagged by queue and error type. We only expose counts, not gauges, because counts don't lose meaning when downsampled in a timeseries.
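
The preference for counts over gauges can be illustrated with a small sketch. The sample values and the helper names (downsample, increase, peak) are invented for illustration and are not part of q. The sketch assumes the downsampled series still contains the first and last samples, which holds for the seven samples and step of three used here.

```go
package main

import "fmt"

// downsample keeps every k-th sample, starting with the first.
func downsample(samples []int, k int) []int {
	var out []int
	for i := 0; i < len(samples); i += k {
		out = append(out, samples[i])
	}
	return out
}

// increase returns how much a counter grew between its first and last sample.
func increase(counter []int) int {
	if len(counter) == 0 {
		return 0
	}
	return counter[len(counter)-1] - counter[0]
}

// peak returns the largest value seen in a gauge series.
func peak(gauge []int) int {
	m := 0
	for _, v := range gauge {
		if v > m {
			m = v
		}
	}
	return m
}

func main() {
	// Hypothetical scrapes of a cumulative counter and of a queue-depth gauge.
	enqueuedTotal := []int{0, 2, 5, 5, 9, 12, 12}
	depth := []int{0, 2, 5, 1, 4, 7, 0}

	// The counter's total increase survives downsampling.
	fmt.Println(increase(enqueuedTotal), increase(downsample(enqueuedTotal, 3)))
	// The gauge's peak does not: the spike falls between retained samples.
	fmt.Println(peak(depth), peak(downsample(depth, 3)))
}
```

In this example the counter reports the same growth before and after downsampling, while the gauge's spike is lost.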

qrest also exposes Prometheus metrics at /metrics on port 80, but only the process and Go runtime information Prometheus provides for free.

Both q and qrest provide terse JSON structured logs on stdout by default. Run them with the -d flag for debug logging in a more human friendly format.

q can be terminated immediately by hitting /quitquitquit on its metrics port. qrest can also be terminated by hitting /quitquitquit on its main port.
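
A termination endpoint like this could be wired up roughly as follows. This is a sketch, not q's actual handler: newManagementMux, probe, and the quit callback are hypothetical names, and what the real binaries do on shutdown is not shown in this document. The request is made in-process with net/http/httptest so the example runs without opening a port.

```go
package main

import (
	"fmt"
	"net/http"
	"net/http/httptest"
)

// newManagementMux returns a mux serving a /quitquitquit endpoint.
// quit is called for each request; a real server might cancel a context
// or exit the process here (an assumption, not q's documented behaviour).
func newManagementMux(quit func()) *http.ServeMux {
	mux := http.NewServeMux()
	mux.HandleFunc("/quitquitquit", func(w http.ResponseWriter, r *http.Request) {
		w.WriteHeader(http.StatusOK)
		quit()
	})
	return mux
}

// probe sends an in-process GET for path to h and returns the status code.
func probe(h http.Handler, path string) int {
	rec := httptest.NewRecorder()
	h.ServeHTTP(rec, httptest.NewRequest(http.MethodGet, path, nil))
	return rec.Code
}

func main() {
	stopped := false
	mux := newManagementMux(func() { stopped = true })

	code := probe(mux, "/quitquitquit")
	fmt.Println(code, stopped)
}
```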

Packages

q consists of the following packages. Refer to their GoDocs for API details:

  • q - Defines the core interfaces and types for the queue service.
  • q/e - Provides error types and handling.
  • q/boltdb - A (currently unused) BoltDB-backed implementation of q.Queue.
  • q/factory - A q.Factory implementation.
  • q/logging - Log emitting wrappers for q.Queue and q.Manager.
  • q/manager - Implementations of q.Manager.
  • q/memory - An in-memory implementation of q.Queue.
  • q/metrics - Metric emitting wrappers for q.Queue.
  • q/rpc - Implements gRPC API for q.
  • q/proto - Protocol buffer specification for the gRPC API and on-disk serialisation.
  • q/test/fixtures - Common fixtures used to test q.

Running

Kubernetes deployment configs are provided under kube/. Use minikube to run q locally:

# Install Minikube and deploy q
$ brew cask install minikube
$ minikube start
$ kubectl create namespace q
$ kubectl -n q create -f kube/deployment.yaml
$ kubectl -n q create -f kube/service.yaml

# Use kubectl to determine which node ports map to which internal ports.
$ kubectl -n q describe service q|grep Port
Type:                   NodePort
Port:                   grpc    10002/TCP
NodePort:               grpc    31051/TCP
Port:                   metrics 10003/TCP
NodePort:               metrics 31457/TCP
Port:                   rest    80/TCP
NodePort:               rest    30647/TCP

$ minikube service -n q q --url
http://192.168.99.101:31051  # gRPC is listening here.
http://192.168.99.101:31457  # Metrics are being served here.
http://192.168.99.101:30647  # The REST gateway is being served here.

# Use it!
$ docker pull negz/qcli
$ docker run negz/qcli /qcli -s 192.168.99.101:31051 new MEMORY 10 -t function="cubesat launcher"
{
  "queue": {
    "meta": {
      "id": "f9e0925d-bfaa-4e59-96ae-dd78a0bb751d",
      "created": "2017-06-25T23:01:08.008101953Z",
      "tags": [
        {
          "key": "function",
          "value": "cubesat launcher"
        }
      ]
    },
    "store": "MEMORY"
  }
}
$ echo "dove"|docker run negz/qcli /qcli -s 192.168.99.101:31051 add f9e0925d-bfaa-4e59-96ae-dd78a0bb751d -t size=3U
{
  "message": {
    "meta": {
      "id": "91432eee-e5ef-4f14-84ab-eb471e2e9d61",
      "created": "2017-06-25T23:05:44.628686920Z",
      "tags": [
        {
          "key": "size",
          "value": "3U"
        }
      ]
    }
  }
}
$ curl -s http://192.168.99.101:31457/metrics|grep queue
# HELP queue_messages_enqueued_total Number of queued messages.
# TYPE queue_messages_enqueued_total counter
queue_messages_enqueued_total{queue="f9e0925d-bfaa-4e59-96ae-dd78a0bb751d"} 1

Building

You'll need working Go and Docker installs. This project has been built and tested against Go 1.8. Clone the project and run the build script to compile the binaries and create Docker images:

$ mkdir -p ${GOPATH}/src/github.com/negz
$ cd ${GOPATH}/src/github.com/negz
$ git clone git@github.com:negz/q
$ cd q
$ scripts/build.sh

Testing

Take a look at the Travis CI project, or run scripts/test.sh. Note that test.sh assumes you've set up your environment per the build instructions.

Generated code

Any directory with a generate.go file contains code generated automatically by running go generate.

This code generation depends on tools that are not managed by dep. To regenerate code you'll need to:

$ go get golang.org/x/tools/cmd/stringer
$ go get github.com/gogo/protobuf/protoc-gen-gogoslick
$ go get github.com/grpc-ecosystem/grpc-gateway/protoc-gen-grpc-gateway
$ go get github.com/grpc-ecosystem/grpc-gateway/protoc-gen-swagger

Known issues

  • The NewMessage protocol buffer message is not included in the generated Swagger API docs. This makes it difficult to discover how to add new messages to a queue.

Documentation

Overview

Package q provides interfaces and types for use in a FIFO queue system.

Index

Constants

const Unbounded int = -1

Unbounded queues will accept messages until they exhaust available resources.

Variables

This section is empty.

Functions

This section is empty.

Types

type Error

type Error int

Error differentiates errors for metric collection purposes.

const (
	// UnknownError indicates an unknown error type.
	UnknownError Error = iota

	// Full indicates a queue was full.
	Full

	// NotFound indicates no messages were found in a queue.
	NotFound
)

func (Error) String

func (i Error) String() string
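
The String method is most likely produced by stringer (see Generated code), which maps each constant to its identifier. A hand-written approximation is sketched below; the errorNames table is a hypothetical stand-in for the generated lookup, and the fallback format for out-of-range values follows stringer's usual convention.

```go
package main

import (
	"fmt"
	"strconv"
)

// Error mirrors the q.Error type shown above.
type Error int

const (
	UnknownError Error = iota
	Full
	NotFound
)

// errorNames is a hand-written stand-in for the table stringer would generate.
var errorNames = [...]string{"UnknownError", "Full", "NotFound"}

// String returns the constant's name, or Error(n) for values without a name.
func (i Error) String() string {
	if i < 0 || int(i) >= len(errorNames) {
		return "Error(" + strconv.Itoa(int(i)) + ")"
	}
	return errorNames[i]
}

func main() {
	// fmt uses the String method because Error satisfies fmt.Stringer.
	fmt.Println(Full, NotFound, Error(7))
}
```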

type Factory

type Factory interface {
	New(s Store, limit int, t ...Tag) (Queue, error)
}

A Factory produces new queues with the requested store, limit, and tags.

type Manager

type Manager interface {
	Add(Queue) error                 // Add a new queue to the manager.
	Get(id uuid.UUID) (Queue, error) // Get an existing queue given its ID.
	Delete(id uuid.UUID) error       // Delete an existing queue given its ID.
	List() ([]Queue, error)          // List all existing queues.
}

A Manager manages a set of queues.
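
One way to satisfy an interface of this shape is a map guarded by a read-write mutex. The sketch below is an assumption-laden illustration rather than the q/manager implementation: IDs are plain strings instead of uuid.UUID so no third-party package is needed, Queue is cut down to its ID method, and memoryManager, namedQueue, and the two error values are hypothetical names.

```go
package main

import (
	"errors"
	"fmt"
	"sort"
	"sync"
)

// Queue is a cut-down stand-in for q.Queue; only the ID matters here.
type Queue interface {
	ID() string
}

// namedQueue is a hypothetical Queue whose only property is its ID.
type namedQueue string

func (n namedQueue) ID() string { return string(n) }

var (
	errExists   = errors.New("queue already exists")
	errNotFound = errors.New("queue not found")
)

// memoryManager indexes queues by ID behind a read-write mutex.
type memoryManager struct {
	mu     sync.RWMutex
	queues map[string]Queue
}

func newMemoryManager() *memoryManager {
	return &memoryManager{queues: make(map[string]Queue)}
}

// Add registers a queue. Rejecting duplicate IDs is a choice made for this sketch.
func (m *memoryManager) Add(q Queue) error {
	m.mu.Lock()
	defer m.mu.Unlock()
	if _, ok := m.queues[q.ID()]; ok {
		return errExists
	}
	m.queues[q.ID()] = q
	return nil
}

// Get returns the queue with the given ID, or errNotFound.
func (m *memoryManager) Get(id string) (Queue, error) {
	m.mu.RLock()
	defer m.mu.RUnlock()
	q, ok := m.queues[id]
	if !ok {
		return nil, errNotFound
	}
	return q, nil
}

// Delete removes the queue with the given ID, or returns errNotFound.
func (m *memoryManager) Delete(id string) error {
	m.mu.Lock()
	defer m.mu.Unlock()
	if _, ok := m.queues[id]; !ok {
		return errNotFound
	}
	delete(m.queues, id)
	return nil
}

// List returns all queues, sorted by ID so the order is deterministic.
func (m *memoryManager) List() ([]Queue, error) {
	m.mu.RLock()
	defer m.mu.RUnlock()
	out := make([]Queue, 0, len(m.queues))
	for _, q := range m.queues {
		out = append(out, q)
	}
	sort.Slice(out, func(i, j int) bool { return out[i].ID() < out[j].ID() })
	return out, nil
}

func main() {
	m := newMemoryManager()
	_ = m.Add(namedQueue("a"))
	_ = m.Add(namedQueue("b"))
	_ = m.Delete("a")
	queues, _ := m.List()
	fmt.Println(len(queues), queues[0].ID())
}
```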

type Message

type Message struct {
	*Metadata
	Payload []byte // The Payload of a Message is an arbitrary byte slice.
}

A Message represents an entry in a queue.

func NewMessage

func NewMessage(payload []byte, o ...Option) *Message

NewMessage creates a message from the supplied payload.

type Metadata

type Metadata struct {
	ID      uuid.UUID // ID is a globally unique identifier for a resource.
	Created time.Time // Created is the creation time of a resource.
	Tags    *Tags     // Tags are arbitrary key:value pairs associated with a resource.
}

Metadata is useful information associated with either queues or messages.

type Metrics

type Metrics interface {
	Enqueued(id uuid.UUID) // Enqueued increments the enqueued message count.
	Consumed(id uuid.UUID) // Consumed increments the consumed message count.
	// Error increments the count of errors encountered while queueing or consuming messages.
	Error(id uuid.UUID, t Error)
}

Metrics for a queue. We only expose counts, not gauges, because counts don't lose meaning when downsampled in a timeseries. See https://goo.gl/WTHgAq for details.
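
A minimal in-memory implementation of an interface like this might look as follows. It is a sketch only: the q/metrics package presumably backs these methods with Prometheus counters, whereas counterMetrics, its reader methods, and the use of string IDs in place of uuid.UUID are all assumptions made to keep the example free of third-party imports.

```go
package main

import (
	"fmt"
	"sync"
)

// Error mirrors the q.Error type used to label error counts.
type Error int

const (
	UnknownError Error = iota
	Full
	NotFound
)

// failureKey labels an error count by queue and error type.
type failureKey struct {
	queue string
	kind  Error
}

// counterMetrics holds monotonically increasing counts; nothing is ever decremented.
type counterMetrics struct {
	mu       sync.Mutex
	enqueued map[string]uint64
	consumed map[string]uint64
	failures map[failureKey]uint64
}

func newCounterMetrics() *counterMetrics {
	return &counterMetrics{
		enqueued: make(map[string]uint64),
		consumed: make(map[string]uint64),
		failures: make(map[failureKey]uint64),
	}
}

// Enqueued increments the enqueued message count for a queue.
func (c *counterMetrics) Enqueued(id string) {
	c.mu.Lock()
	defer c.mu.Unlock()
	c.enqueued[id]++
}

// Consumed increments the consumed message count for a queue.
func (c *counterMetrics) Consumed(id string) {
	c.mu.Lock()
	defer c.mu.Unlock()
	c.consumed[id]++
}

// Error increments the error count for a queue and error type.
func (c *counterMetrics) Error(id string, t Error) {
	c.mu.Lock()
	defer c.mu.Unlock()
	c.failures[failureKey{queue: id, kind: t}]++
}

// Totals returns the enqueued and consumed counts for a queue.
func (c *counterMetrics) Totals(id string) (enqueued, consumed uint64) {
	c.mu.Lock()
	defer c.mu.Unlock()
	return c.enqueued[id], c.consumed[id]
}

// ErrorTotal returns the error count for a queue and error type.
func (c *counterMetrics) ErrorTotal(id string, t Error) uint64 {
	c.mu.Lock()
	defer c.mu.Unlock()
	return c.failures[failureKey{queue: id, kind: t}]
}

func main() {
	m := newCounterMetrics()
	m.Enqueued("orders")
	m.Enqueued("orders")
	m.Consumed("orders")
	m.Error("orders", Full)
	e, c := m.Totals("orders")
	fmt.Println(e, c, m.ErrorTotal("orders", Full))
}
```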

type Option

type Option func(*Message)

An Option represents an optional argument to a new message.

func Tagged

func Tagged(t ...Tag) Option

Tagged applies the provided tags to a new message.
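
Option and Tagged follow Go's functional options pattern: NewMessage builds a message and then applies each option to it in order. The sketch below re-declares simplified versions of the types so it compiles on its own; in particular, Message here holds a plain []Tag rather than the embedded *Metadata of the real type, which is a simplification made for this example.

```go
package main

import "fmt"

// Tag is an arbitrary key:value pair.
type Tag struct {
	Key   string
	Value string
}

// Message is simplified for this sketch: tags live directly on the message.
type Message struct {
	Tags    []Tag
	Payload []byte
}

// Option mutates a message while it is being constructed.
type Option func(*Message)

// Tagged returns an Option that appends the supplied tags.
func Tagged(t ...Tag) Option {
	return func(m *Message) {
		m.Tags = append(m.Tags, t...)
	}
}

// NewMessage creates a message, then applies each option in order.
func NewMessage(payload []byte, o ...Option) *Message {
	m := &Message{Payload: payload}
	for _, opt := range o {
		opt(m)
	}
	return m
}

func main() {
	m := NewMessage([]byte("dove"), Tagged(Tag{Key: "size", Value: "3U"}))
	fmt.Println(string(m.Payload), len(m.Tags), m.Tags[0].Key, m.Tags[0].Value)
}
```

Because options are ordinary functions, new optional arguments can be added later without changing the NewMessage signature.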

type Queue

type Queue interface {
	ID() uuid.UUID           // ID is the globally unique identifier for this queue.
	Created() time.Time      // Created is the creation time of this queue.
	Tags() *Tags             // Tags are arbitrary key:value pairs associated with this queue.
	Store() Store            // Store indicates which backing store this queue uses.
	Add(*Message) error      // Add appends a message to this queue.
	Pop() (*Message, error)  // Pop consumes and returns the next message in the queue.
	Peek() (*Message, error) // Peek returns the next message in the queue without consuming it.
}

A Queue stores Messages for consumption by another process.
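
The add, peek, and pop semantics, together with the Unbounded limit, can be sketched with a linked list from the standard library. This is an illustration under assumptions, not the q/memory source: payloads are raw byte slices rather than *Message values, and memoryQueue, ErrFull, and ErrNotFound are hypothetical names.

```go
package main

import (
	"container/list"
	"errors"
	"fmt"
	"sync"
)

// Unbounded mirrors q.Unbounded: a limit of -1 disables the size check.
const Unbounded = -1

var (
	ErrFull     = errors.New("queue is full")
	ErrNotFound = errors.New("no messages in queue")
)

// memoryQueue is a FIFO queue backed by a doubly linked list.
type memoryQueue struct {
	mu       sync.Mutex
	limit    int
	messages *list.List
}

func newMemoryQueue(limit int) *memoryQueue {
	return &memoryQueue{limit: limit, messages: list.New()}
}

// Add appends a payload to the back of the queue unless the limit is reached.
func (q *memoryQueue) Add(payload []byte) error {
	q.mu.Lock()
	defer q.mu.Unlock()
	if q.limit != Unbounded && q.messages.Len() >= q.limit {
		return ErrFull
	}
	q.messages.PushBack(payload)
	return nil
}

// Peek returns the oldest payload without removing it.
func (q *memoryQueue) Peek() ([]byte, error) {
	q.mu.Lock()
	defer q.mu.Unlock()
	front := q.messages.Front()
	if front == nil {
		return nil, ErrNotFound
	}
	return front.Value.([]byte), nil
}

// Pop removes and returns the oldest payload.
func (q *memoryQueue) Pop() ([]byte, error) {
	q.mu.Lock()
	defer q.mu.Unlock()
	front := q.messages.Front()
	if front == nil {
		return nil, ErrNotFound
	}
	q.messages.Remove(front)
	return front.Value.([]byte), nil
}

func main() {
	q := newMemoryQueue(1)
	fmt.Println(q.Add([]byte("dove")))  // accepted
	fmt.Println(q.Add([]byte("flock"))) // rejected: limit of 1 reached
	p, _ := q.Pop()
	fmt.Println(string(p))
}
```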

type Store

type Store int

A Store is the type of backing store a queue uses.

const (
	// UnknownStore queues have an indeterminate backing store.
	UnknownStore Store = iota

	// Memory queues are in-memory. Their contents do not persist across process restarts.
	Memory

	// BoltDB queues are persisted to disk using a BoltDB store.
	BoltDB
)

func (Store) String

func (i Store) String() string

type Tag

type Tag struct {
	Key   string
	Value string
}

A Tag is an arbitrary key:value pair associated with a resource.

func (Tag) String

func (t Tag) String() string

String represents a tag as a string.

type Tags

type Tags struct {
	// contains filtered or unexported fields
}

Tags are a threadsafe set of tags. A single key may have multiple values.

func (*Tags) Add

func (t *Tags) Add(k, v string)

Add the supplied key value pair to a set of Tags.

func (*Tags) AddMap

func (t *Tags) AddMap(m map[string]string)

AddMap adds each key value pair in a map to a set of Tags.

func (*Tags) AddTag

func (t *Tags) AddTag(tag Tag)

AddTag adds the supplied Tag to a set of Tags.

func (*Tags) Contains

func (t *Tags) Contains(k, v string) bool

Contains indicates whether the given key value pair exists in a set of tags.

func (*Tags) ContainsTag

func (t *Tags) ContainsTag(tag Tag) bool

ContainsTag indicates whether the given tag exists in a set of tags.

func (*Tags) Get

func (t *Tags) Get() []Tag

Get all tags.

func (*Tags) Remove

func (t *Tags) Remove(k, v string)

Remove the supplied key value pair from a set of Tags.

func (*Tags) RemoveTag

func (t *Tags) RemoveTag(tag Tag)

RemoveTag removes the supplied tag from a set of Tags.
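
Since the fields of Tags are unexported, its internals are not visible here. One plausible design, sketched below purely as an assumption, is a set keyed by the whole Tag (so one key may carry several values) guarded by a read-write mutex. Only a subset of the methods is shown, and Get sorts its result for deterministic output, which the real type may not do.

```go
package main

import (
	"fmt"
	"sort"
	"sync"
)

// Tag is an arbitrary key:value pair.
type Tag struct {
	Key   string
	Value string
}

// Tags is a set of Tag values; keying on the whole Tag lets a key hold many values.
type Tags struct {
	mu  sync.RWMutex
	set map[Tag]struct{}
}

// Add inserts the key value pair. The zero Tags value is usable.
func (t *Tags) Add(k, v string) {
	t.mu.Lock()
	defer t.mu.Unlock()
	if t.set == nil {
		t.set = make(map[Tag]struct{})
	}
	t.set[Tag{Key: k, Value: v}] = struct{}{}
}

// Contains reports whether the key value pair is present.
func (t *Tags) Contains(k, v string) bool {
	t.mu.RLock()
	defer t.mu.RUnlock()
	_, ok := t.set[Tag{Key: k, Value: v}]
	return ok
}

// Remove deletes the key value pair if present.
func (t *Tags) Remove(k, v string) {
	t.mu.Lock()
	defer t.mu.Unlock()
	delete(t.set, Tag{Key: k, Value: v})
}

// Get returns all tags, sorted by key and then value.
func (t *Tags) Get() []Tag {
	t.mu.RLock()
	defer t.mu.RUnlock()
	out := make([]Tag, 0, len(t.set))
	for tag := range t.set {
		out = append(out, tag)
	}
	sort.Slice(out, func(i, j int) bool {
		if out[i].Key != out[j].Key {
			return out[i].Key < out[j].Key
		}
		return out[i].Value < out[j].Value
	})
	return out
}

func main() {
	var tags Tags
	tags.Add("function", "cubesat launcher")
	tags.Add("size", "3U")
	tags.Add("size", "6U") // same key, second value
	tags.Remove("size", "3U")
	fmt.Println(len(tags.Get()), tags.Contains("size", "6U"))
}
```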

Directories

Path Synopsis
Package bdb provides a FIFO queue backed by a BoltDB database.
cmd
q
e
Package e provides error types and handling.
Package factory provides a FIFO queue factory.
Package logging provides logging wrappers for queues and queue managers.
Package manager provides implementations of queue managers, which act as an index of various FIFO queues.
Package memory provides an in-memory FIFO queue backed by a linked list.
Package metrics provides implementations of the q.Metrics interface and a wrapper to expose metrics for any implementation of the q.Queue interface.
Package proto is a generated protocol buffer package.
Package rpc implements a gRPC server for the q queue service.
test
integration
Package integration is an integration test.
