nsq

package
v0.0.0-...-65af2ff Latest
Warning

This package is not in the latest version of its module.

Published: Apr 9, 2014 License: AGPL-3.0 Imports: 26 Imported by: 0

README

go-nsq

go-nsq is the official Go package for NSQ.

The latest stable release is 0.3.4.

It provides high-level Reader and Writer types to implement consumers and producers as well as low-level functions to communicate over the NSQ protocol.

See the main repo apps directory for examples of clients built using this package.

Installing

$ go get github.com/bitly/go-nsq

Importing

import "github.com/bitly/go-nsq"

Docs

See godoc for pretty documentation or:

# in the go-nsq package directory
$ go doc

Documentation

Overview

nsq is the official Go package for https://github.com/bitly/nsq

It provides high-level Reader and Writer types to implement consumers and producers as well as low-level functions to communicate over the NSQ protocol.

Index

Constants

const (
	// when successful
	FrameTypeResponse int32 = 0
	// when an error occurred
	FrameTypeError int32 = 1
	// when it's a serialized message
	FrameTypeMessage int32 = 2
)
const (
	StateInit = iota
	StateDisconnected
	StateConnected
	StateSubscribed
	// close has started. responses are ok, but no new messages will be sent
	StateClosing
)
const DefaultClientTimeout = 60 * time.Second

The amount of time nsqd will allow a client to idle; can be overridden.

const MsgIdLength = 16

The number of bytes for a Message.Id

const VERSION = "0.3.4"

Variables

var ErrAlreadyConnected = errors.New("already connected")

returned from ConnectToNSQ when already connected

var ErrLookupdAddressExists = errors.New("lookupd address already exists")

returned from ConnectToLookupd when given lookupd address exists already

var ErrNotConnected = errors.New("not connected")

returned when a publish command is made against a Writer that is not connected

var ErrOverMaxInFlight = errors.New("over configure max-inflight")

returned from updateRdy if over max-in-flight

var ErrStopped = errors.New("stopped")

returned when a publish command is made against a Writer that has been stopped

var MagicV1 = []byte("  V1")
var MagicV2 = []byte("  V2")

Functions

func ApiRequest

func ApiRequest(endpoint string) (*simplejson.Json, error)

ApiRequest is a helper function to perform an HTTP request and parse our NSQ daemon's expected response format, with deadlines.

{"status_code":200, "status_txt":"OK", "data":{...}}
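As an illustration of the envelope above, here is a minimal sketch of decoding it with encoding/json alone. The envelope type, the parseEnvelope helper, and the "producers" key in the sample payload are hypothetical names chosen for this example; treating any status code other than 200 as an error is also an assumption, not a statement about what ApiRequest does internally.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// envelope mirrors the response shape shown above (hypothetical type).
type envelope struct {
	StatusCode int             `json:"status_code"`
	StatusTxt  string          `json:"status_txt"`
	Data       json.RawMessage `json:"data"`
}

// parseEnvelope decodes the envelope and returns the raw "data" member.
// Assumption: any status code other than 200 is reported as an error.
func parseEnvelope(body []byte) (json.RawMessage, error) {
	var e envelope
	if err := json.Unmarshal(body, &e); err != nil {
		return nil, err
	}
	if e.StatusCode != 200 {
		return nil, fmt.Errorf("response %d %s", e.StatusCode, e.StatusTxt)
	}
	return e.Data, nil
}

func main() {
	// The "producers" key is only sample data for this sketch.
	body := []byte(`{"status_code":200,"status_txt":"OK","data":{"producers":[]}}`)
	data, err := parseEnvelope(body)
	fmt.Println(string(data), err)
}
```

Keeping "data" as a json.RawMessage defers decoding of the payload to the caller, which is one way to handle endpoints that return different shapes.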

func IsValidChannelName

func IsValidChannelName(name string) bool

IsValidChannelName checks a channel name for correctness

func IsValidTopicName

func IsValidTopicName(name string) bool

IsValidTopicName checks a topic name for correctness

func NewDeadlineTransport

func NewDeadlineTransport(timeout time.Duration) *http.Transport

A custom http.Transport with support for deadline timeouts

func ReadResponse

func ReadResponse(r io.Reader) ([]byte, error)

ReadResponse is a client-side utility function to read from the supplied Reader according to the NSQ protocol spec:

[x][x][x][x][x][x][x][x]...
|  (int32) || (binary)
|  4-byte  || N-byte
------------------------...
    size       data
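The framing in the diagram can be sketched with the standard library alone. The readFrame and frame helpers below are hypothetical stand-ins written for this example, not the package's implementation, and they assume the size prefix is big-endian (network byte order).

```go
package main

import (
	"bytes"
	"encoding/binary"
	"fmt"
	"io"
)

// frame builds a size-prefixed frame for demonstration purposes.
func frame(data []byte) *bytes.Buffer {
	buf := new(bytes.Buffer)
	binary.Write(buf, binary.BigEndian, int32(len(data)))
	buf.Write(data)
	return buf
}

// readFrame reads a 4-byte size (assumed big-endian) and then exactly
// that many bytes of data from r.
func readFrame(r io.Reader) ([]byte, error) {
	var size int32
	if err := binary.Read(r, binary.BigEndian, &size); err != nil {
		return nil, err
	}
	if size < 0 {
		return nil, fmt.Errorf("negative frame size %d", size)
	}
	buf := make([]byte, size)
	if _, err := io.ReadFull(r, buf); err != nil {
		return nil, err
	}
	return buf, nil
}

func main() {
	data, err := readFrame(frame([]byte("OK")))
	fmt.Println(string(data), err)
}
```

io.ReadFull is used so that a short read surfaces as an error instead of a silently truncated frame.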

func UnpackResponse

func UnpackResponse(response []byte) (int32, []byte, error)

UnpackResponse is a client-side utility function that unpacks serialized data according to NSQ protocol spec:

[x][x][x][x][x][x][x][x]...
|  (int32) || (binary)
|  4-byte  || N-byte
------------------------...
  frame ID     data

Returns a triple of: frame type, data ([]byte), error
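A minimal sketch of the unpacking step, under the assumption that the frame ID is a big-endian int32. The unpackFrame helper is a hypothetical stand-in for this example and is not the package's code.

```go
package main

import (
	"encoding/binary"
	"errors"
	"fmt"
)

// unpackFrame splits an already-read frame into its 4-byte frame type
// (assumed big-endian) and the remaining data.
func unpackFrame(frame []byte) (int32, []byte, error) {
	if len(frame) < 4 {
		return -1, nil, errors.New("frame shorter than 4 bytes")
	}
	return int32(binary.BigEndian.Uint32(frame[:4])), frame[4:], nil
}

func main() {
	// Frame type 0 (FrameTypeResponse) followed by the payload "OK".
	frameType, data, err := unpackFrame([]byte{0, 0, 0, 0, 'O', 'K'})
	fmt.Println(frameType, string(data), err)
}
```

A caller would typically switch on the returned frame type against FrameTypeResponse, FrameTypeError, and FrameTypeMessage.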

Types

type AsyncHandler

type AsyncHandler interface {
	HandleMessage(message *Message, responseChan chan *FinishedMessage)
}

AsyncHandler is the asynchronous interface to Reader.

Implement this interface for handlers that wish to defer responding until later. This is particularly useful if you want to batch work together.

An AsyncHandler must send:

&FinishedMessage{messageID, requeueDelay, true|false}

To the supplied responseChan to indicate that a message is processed.
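To make the batching idea concrete, here is a sketch of a handler that defers its responses until a batch is full. So that it compiles on its own, MessageID, Message, and FinishedMessage are redeclared locally as simplified stand-ins for the package's types; batchHandler and its fields are hypothetical names.

```go
package main

import (
	"fmt"
	"sync"
)

// Local stand-ins mirroring the package's types (simplified).
type MessageID [16]byte

type Message struct {
	Id   MessageID
	Body []byte
}

type FinishedMessage struct {
	Id             MessageID
	RequeueDelayMs int
	Success        bool
}

// pending pairs a message with the channel its response must go to.
type pending struct {
	msg  *Message
	resp chan *FinishedMessage
}

// batchHandler buffers messages and responds only once size messages
// have accumulated.
type batchHandler struct {
	mu    sync.Mutex
	size  int
	batch []pending
}

func (h *batchHandler) HandleMessage(m *Message, responseChan chan *FinishedMessage) {
	h.mu.Lock()
	defer h.mu.Unlock()
	h.batch = append(h.batch, pending{m, responseChan})
	if len(h.batch) < h.size {
		return // defer responding until the batch is full
	}
	// ... process the whole batch here ...
	for _, p := range h.batch {
		p.resp <- &FinishedMessage{p.msg.Id, 0, true}
	}
	h.batch = h.batch[:0]
}

func main() {
	h := &batchHandler{size: 2}
	responses := make(chan *FinishedMessage, 2)
	h.HandleMessage(&Message{Id: MessageID{1}}, responses)
	h.HandleMessage(&Message{Id: MessageID{2}}, responses)
	fmt.Println(len(responses), "responses sent after the batch filled")
}
```

A real handler would also want to flush partial batches on a timer, since unanswered messages eventually time out on the nsqd side.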

type Command

type Command struct {
	Name   []byte
	Params [][]byte
	Body   []byte
}

Command represents a command from a client to an NSQ daemon

func Finish

func Finish(id MessageID) *Command

Finish creates a new Command to indicate that a given message (by id) has been processed successfully

func Identify

func Identify(js map[string]interface{}) (*Command, error)

Identify creates a new Command to provide information about the client. After connecting, it is generally the first message sent.

The supplied map is marshaled into JSON to provide some flexibility for this command to evolve over time.

See http://bitly.github.io/nsq/clients/tcp_protocol_spec.html#identify for information on the supported options

func MultiPublish

func MultiPublish(topic string, bodies [][]byte) (*Command, error)

MultiPublish creates a new Command to write more than one message to a given topic. This is useful for high-throughput situations to avoid roundtrips and saturate the pipe.

func Nop

func Nop() *Command

Nop creates a new Command that has no effect server side. Commonly used to respond to heartbeats

func Ping

func Ping() *Command

Ping creates a new Command to keep-alive the state of all the announced topic/channels for a given client

func Publish

func Publish(topic string, body []byte) *Command

Publish creates a new Command to write a message to a given topic

func Ready

func Ready(count int) *Command

Ready creates a new Command to specify the number of messages a client is willing to receive

func Register

func Register(topic string, channel string) *Command

Register creates a new Command to add a topic/channel for the connected nsqd

func Requeue

func Requeue(id MessageID, timeoutMs int) *Command

Requeue creates a new Command to indicate that a given message (by id) should be requeued after the given timeout (in ms).

NOTE: a timeout of 0 indicates immediate requeue

func StartClose

func StartClose() *Command

StartClose creates a new Command to indicate that the client would like to start a close cycle. nsqd will no longer send messages to a client in this state, and the client is expected to finish pending messages and close the connection

func Subscribe

func Subscribe(topic string, channel string) *Command

Subscribe creates a new Command to subscribe to the given topic/channel

func Touch

func Touch(id MessageID) *Command

Touch creates a new Command to reset the timeout for a given message (by id)

func UnRegister

func UnRegister(topic string, channel string) *Command

UnRegister creates a new Command to remove a topic/channel for the connected nsqd

func (*Command) String

func (c *Command) String() string

String returns the name and parameters of the Command

func (*Command) Write

func (c *Command) Write(w io.Writer) error

Write serializes the Command to the supplied Writer.

It is suggested that the target Writer is buffered to avoid performing many system calls.
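The following sketch shows one way such serialization can look, and why a buffered writer helps. It assumes the wire layout from the NSQ TCP protocol spec: the command name, space-separated parameters, a newline, and then an optional body prefixed with its 4-byte big-endian size. The command type and its methods are hypothetical stand-ins, not the package's implementation.

```go
package main

import (
	"bufio"
	"bytes"
	"encoding/binary"
	"fmt"
	"io"
)

// command is a local stand-in mirroring the Command struct above.
type command struct {
	Name   []byte
	Params [][]byte
	Body   []byte
}

// writeTo serializes the command through a buffered writer so that the
// many small writes reach w as a single flush.
func (c *command) writeTo(w io.Writer) error {
	bw := bufio.NewWriter(w)
	bw.Write(c.Name)
	for _, p := range c.Params {
		bw.WriteByte(' ')
		bw.Write(p)
	}
	bw.WriteByte('\n')
	if c.Body != nil {
		// Assumed layout: 4-byte big-endian size, then the body bytes.
		binary.Write(bw, binary.BigEndian, int32(len(c.Body)))
		bw.Write(c.Body)
	}
	// bufio.Writer remembers the first write error; Flush reports it.
	return bw.Flush()
}

// encode returns the serialized form as a byte slice.
func (c *command) encode() []byte {
	var buf bytes.Buffer
	c.writeTo(&buf)
	return buf.Bytes()
}

func main() {
	c := &command{
		Name:   []byte("PUB"),
		Params: [][]byte{[]byte("events")},
		Body:   []byte("hello"),
	}
	fmt.Printf("%q\n", c.encode())
}
```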

type FailedMessageLogger

type FailedMessageLogger interface {
	LogFailedMessage(message *Message)
}

FailedMessageLogger is an interface that can be implemented by handlers that wish to receive a callback when a message is deemed "failed" (i.e. the number of attempts exceeded the Reader-specified MaxAttemptCount)

type FinishedMessage

type FinishedMessage struct {
	Id             MessageID
	RequeueDelayMs int
	Success        bool
}

FinishedMessage is the data type used over responseChan in AsyncHandlers

type Handler

type Handler interface {
	HandleMessage(message *Message) error
}

Handler is the synchronous interface to Reader.

Implement this interface for handlers that return whether or not message processing completed successfully.

When the return value is nil Reader will automatically handle FINishing.

When the returned value is non-nil Reader will automatically handle REQueueing.
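A minimal sketch of a synchronous handler follows. Message and Handler are redeclared locally as simplified stand-ins so the example compiles without the package; printHandler is a hypothetical name. Returning nil corresponds to the FIN case and returning an error to the REQ case described above.

```go
package main

import (
	"errors"
	"fmt"
)

// Local stand-ins mirroring the package's types (simplified).
type Message struct {
	Body []byte
}

type Handler interface {
	HandleMessage(message *Message) error
}

// printHandler reports success for non-empty bodies and failure otherwise.
type printHandler struct{}

func (printHandler) HandleMessage(m *Message) error {
	if len(m.Body) == 0 {
		// A non-nil error asks for the message to be requeued.
		return errors.New("empty body")
	}
	fmt.Printf("got %q\n", m.Body)
	return nil // nil asks for the message to be finished
}

func main() {
	var h Handler = printHandler{}
	fmt.Println(h.HandleMessage(&Message{Body: []byte("hello")}))
	fmt.Println(h.HandleMessage(&Message{}))
}
```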

type Message

type Message struct {
	Id        MessageID
	Body      []byte
	Timestamp int64
	Attempts  uint16
	// contains filtered or unexported fields
}

Message is the fundamental data type containing the id, body, and metadata

func DecodeMessage

func DecodeMessage(byteBuf []byte) (*Message, error)

DecodeMessage deserializes data (as []byte) and creates a new Message
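For orientation, here is a sketch of decoding and encoding a message. It assumes the layout given in the NSQ TCP protocol spec: an 8-byte timestamp, a 2-byte attempt count, a 16-byte message ID, and then the body, with integers in big-endian order. The message type and the decode and encode helpers are hypothetical stand-ins, not the package's code.

```go
package main

import (
	"encoding/binary"
	"errors"
	"fmt"
)

const msgIDLength = 16

// message is a local stand-in for the package's Message type.
type message struct {
	Timestamp int64
	Attempts  uint16
	ID        [msgIDLength]byte
	Body      []byte
}

// header is the assumed fixed-size prefix: timestamp, attempts, ID.
const header = 8 + 2 + msgIDLength

// decode parses the assumed layout into a message.
func decode(b []byte) (*message, error) {
	if len(b) < header {
		return nil, errors.New("message too short")
	}
	m := &message{
		Timestamp: int64(binary.BigEndian.Uint64(b[:8])),
		Attempts:  binary.BigEndian.Uint16(b[8:10]),
		Body:      b[header:],
	}
	copy(m.ID[:], b[10:header])
	return m, nil
}

// encode is the inverse of decode.
func encode(m *message) []byte {
	b := make([]byte, 10, header+len(m.Body))
	binary.BigEndian.PutUint64(b[:8], uint64(m.Timestamp))
	binary.BigEndian.PutUint16(b[8:10], m.Attempts)
	b = append(b, m.ID[:]...)
	return append(b, m.Body...)
}

func main() {
	in := &message{Timestamp: 42, Attempts: 1, Body: []byte("hello")}
	out, err := decode(encode(in))
	fmt.Println(out.Timestamp, out.Attempts, string(out.Body), err)
}
```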

func NewMessage

func NewMessage(id MessageID, body []byte) *Message

NewMessage creates a Message, initializes some metadata, and returns a pointer

func (*Message) EncodeBytes

func (m *Message) EncodeBytes() ([]byte, error)

EncodeBytes serializes the message into a new, returned, []byte

func (*Message) Requeue

func (m *Message) Requeue(timeoutMs int)

Requeue sends a REQUEUE command to the nsqd which sent this message, using the supplied delay

func (*Message) Touch

func (m *Message) Touch()

Touch sends a TOUCH command to the nsqd which sent this message

func (*Message) Write

func (m *Message) Write(w io.Writer) error

Write serializes the message into the supplied writer.

It is suggested that the target Writer is buffered to avoid performing many system calls.

type MessageID

type MessageID [MsgIdLength]byte

type Reader

type Reader struct {
	// 64bit atomic vars need to be first for proper alignment on 32bit platforms
	MessagesReceived uint64 // an atomic counter - # of messages received
	MessagesFinished uint64 // an atomic counter - # of messages FINished
	MessagesRequeued uint64 // an atomic counter - # of messages REQueued

	sync.RWMutex

	// basics
	TopicName       string   // name of topic to subscribe to
	ChannelName     string   // name of channel to subscribe to
	ShortIdentifier string   // an identifier to send to nsqd when connecting (defaults: short hostname)
	LongIdentifier  string   // an identifier to send to nsqd when connecting (defaults: long hostname)
	VerboseLogging  bool     // enable verbose logging
	ExitChan        chan int // read from this channel to block your main loop

	// network deadlines
	ReadTimeout  time.Duration // the deadline set for network reads
	WriteTimeout time.Duration // the deadline set for network writes

	// lookupd
	LookupdPollInterval time.Duration // duration between polling lookupd for new connections
	LookupdPollJitter   float64       // Maximum fractional amount of jitter to add to the lookupd poll loop. This helps evenly distribute requests even if multiple consumers restart at the same time.

	// requeue delays
	MaxRequeueDelay     time.Duration // the maximum duration when REQueueing (for doubling of deferred requeue)
	DefaultRequeueDelay time.Duration // the default duration when REQueueing
	BackoffMultiplier   time.Duration // the unit of time for calculating reader backoff

	// misc
	MaxAttemptCount   uint16        // maximum number of times this reader will attempt to process a message
	LowRdyIdleTimeout time.Duration // the amount of time in seconds to wait for a message from a producer when in a state where RDY counts are re-distributed (ie. max_in_flight < num_producers)

	// transport layer security
	TLSv1     bool        // negotiate enabling TLS
	TLSConfig *tls.Config // client TLS configuration

	// compression
	Deflate      bool // negotiate enabling Deflate compression
	DeflateLevel int  // the compression level to negotiate for Deflate
	Snappy       bool // negotiate enabling Snappy compression

	SampleRate int32 // set the sampleRate of the client's messagePump (requires nsqd 0.2.25+)
	// contains filtered or unexported fields
}

Reader is a high-level type to consume from NSQ.

A Reader instance is supplied handler(s) that will be executed concurrently via goroutines to handle processing the stream of messages consumed from the specified topic/channel. See: AsyncHandler and Handler for details on implementing those interfaces to create handlers.

If configured, it will poll nsqlookupd instances and handle connection (and reconnection) to any discovered nsqds.

func NewReader

func NewReader(topic string, channel string) (*Reader, error)

NewReader creates a new instance of Reader for the specified topic/channel

The returned Reader instance is set up with sane default values. To modify configuration, update the values on the returned instance before connecting.

func (*Reader) AddAsyncHandler

func (q *Reader) AddAsyncHandler(handler AsyncHandler)

AddAsyncHandler adds an AsyncHandler for messages received by this Reader.

See AsyncHandler for details on implementing this interface.

It's ok to start more than one handler simultaneously; they are executed concurrently in goroutines.

func (*Reader) AddHandler

func (q *Reader) AddHandler(handler Handler)

AddHandler adds a Handler for messages received by this Reader.

See Handler for details on implementing this interface.

It's ok to start more than one handler simultaneously; they are executed concurrently in goroutines.

func (*Reader) Configure

func (q *Reader) Configure(option string, value interface{}) error

Configure takes an option as a string and a value as an interface and attempts to set the appropriate configuration option on the reader instance.

It attempts to coerce the value into the right format depending on the named option and the underlying type of the value passed in.

It returns an error for an invalid option or value.

func (*Reader) ConnectToLookupd

func (q *Reader) ConnectToLookupd(addr string) error

ConnectToLookupd adds an nsqlookupd address to the list for this Reader instance.

If it is the first to be added, it initiates an HTTP request to discover nsqd producers for the configured topic.

A goroutine is spawned to handle continual polling.

func (*Reader) ConnectToNSQ

func (q *Reader) ConnectToNSQ(addr string) error

ConnectToNSQ takes a nsqd address to connect directly to.

It is recommended to use ConnectToLookupd so that topics are discovered automatically. This method is useful when you want to connect to a single, local instance.

func (*Reader) ConnectionMaxInFlight

func (q *Reader) ConnectionMaxInFlight() int64

ConnectionMaxInFlight calculates the per-connection max-in-flight count.

This may change dynamically based on the number of connections to nsqd the Reader is responsible for.
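One plausible way to picture this calculation is an even split of the configured total across connections. The sketch below is an assumption made for illustration, not the package's formula; perConnMaxInFlight is a hypothetical helper, and the clamping to the range from 1 to the total is likewise assumed.

```go
package main

import "fmt"

// perConnMaxInFlight divides the total max-in-flight evenly across
// connections, never returning less than 1 (unless the total itself is
// lower) and never more than the total. Illustrative assumption only.
func perConnMaxInFlight(maxInFlight, numConns int) int64 {
	if numConns <= 0 {
		return int64(maxInFlight)
	}
	per := maxInFlight / numConns
	if per < 1 {
		per = 1
	}
	if per > maxInFlight {
		per = maxInFlight
	}
	return int64(per)
}

func main() {
	fmt.Println(perConnMaxInFlight(10, 4)) // 10 split across 4 connections
	fmt.Println(perConnMaxInFlight(2, 5))  // fewer slots than connections
}
```

Under this assumption the per-connection value shrinks as more nsqd connections are discovered, which matches the note that the result may change dynamically.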

func (*Reader) IsStarved

func (q *Reader) IsStarved() bool

IsStarved indicates whether any connections for this reader are blocked on processing before being able to receive more messages (i.e. RDY count of 0 and not exiting)

func (*Reader) MaxInFlight

func (q *Reader) MaxInFlight() int

MaxInFlight returns the configured maximum number of messages to allow in-flight.

func (*Reader) SetMaxBackoffDuration

func (q *Reader) SetMaxBackoffDuration(duration time.Duration)

SetMaxBackoffDuration sets the maximum duration a connection will backoff from message processing

func (*Reader) SetMaxInFlight

func (q *Reader) SetMaxInFlight(maxInFlight int)

SetMaxInFlight sets the maximum number of messages this reader instance will allow in-flight.

If already connected, it updates the reader RDY state for each connection.

func (*Reader) Stop

func (q *Reader) Stop()

Stop will gracefully stop the Reader

type Writer

type Writer struct {
	net.Conn

	WriteTimeout      time.Duration
	Addr              string
	HeartbeatInterval time.Duration
	ShortIdentifier   string
	LongIdentifier    string
	// contains filtered or unexported fields
}

Writer is a high-level type to publish to NSQ.

A Writer instance is 1:1 with a destination `nsqd` and will lazily connect to that instance (and re-connect) when Publish commands are executed.

func NewWriter

func NewWriter(addr string) *Writer

NewWriter returns an instance of Writer for the specified address

func (*Writer) MultiPublish

func (w *Writer) MultiPublish(topic string, body [][]byte) (int32, []byte, error)

MultiPublish synchronously publishes a slice of message bodies to the specified topic, returning the response frameType, data, and error

func (*Writer) MultiPublishAsync

func (w *Writer) MultiPublishAsync(topic string, body [][]byte, doneChan chan *WriterTransaction, args ...interface{}) error

MultiPublishAsync publishes a slice of message bodies to the specified topic but does not wait for the response from `nsqd`.

When the Writer eventually receives the response from `nsqd`, the supplied `doneChan` (if specified) will receive a `WriterTransaction` instance with the supplied variadic arguments (and the response `FrameType`, `Data`, and `Error`)

func (*Writer) Publish

func (w *Writer) Publish(topic string, body []byte) (int32, []byte, error)

Publish synchronously publishes a message body to the specified topic, returning the response frameType, data, and error

func (*Writer) PublishAsync

func (w *Writer) PublishAsync(topic string, body []byte, doneChan chan *WriterTransaction, args ...interface{}) error

PublishAsync publishes a message body to the specified topic but does not wait for the response from `nsqd`.

When the Writer eventually receives the response from `nsqd`, the supplied `doneChan` (if specified) will receive a `WriterTransaction` instance with the supplied variadic arguments (and the response `FrameType`, `Data`, and `Error`)
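The doneChan pattern can be sketched without a running nsqd. Here WriterTransaction is redeclared locally with only its exported fields, and fakePublishAsync is a hypothetical stand-in that mimics the shape of PublishAsync by reporting an OK response from a goroutine. The point is how the variadic arguments come back in Args so that a response can be matched to its request.

```go
package main

import "fmt"

// WriterTransaction is a local stand-in with the exported fields only.
type WriterTransaction struct {
	FrameType int32
	Data      []byte
	Error     error
	Args      []interface{}
}

// fakePublishAsync is a hypothetical stand-in for Writer.PublishAsync.
// It does no network I/O: it reports an OK response on doneChan from a
// goroutine and echoes the caller's variadic arguments back in Args.
func fakePublishAsync(topic string, body []byte, doneChan chan *WriterTransaction, args ...interface{}) error {
	go func() {
		if doneChan != nil {
			doneChan <- &WriterTransaction{FrameType: 0, Data: []byte("OK"), Args: args}
		}
	}()
	return nil
}

func main() {
	doneChan := make(chan *WriterTransaction, 2)
	fakePublishAsync("events", []byte("a"), doneChan, "request-1")
	fakePublishAsync("events", []byte("b"), doneChan, "request-2")

	// Responses may arrive in either order; Args identifies each one.
	for i := 0; i < 2; i++ {
		t := <-doneChan
		fmt.Println(t.Args[0], string(t.Data), t.Error)
	}
}
```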

func (*Writer) Stop

func (w *Writer) Stop()

Stop disconnects and permanently stops the Writer

func (*Writer) String

func (w *Writer) String() string

String returns the address of the Writer

type WriterTransaction

type WriterTransaction struct {
	FrameType int32         // the frame type received in response to the publish command
	Data      []byte        // the response data of the publish command
	Error     error         // the error (or nil) of the publish command
	Args      []interface{} // the slice of variadic arguments passed to PublishAsync or MultiPublishAsync
	// contains filtered or unexported fields
}

WriterTransaction is returned by the async publish methods to retrieve metadata about the command after the response is received.
