Documentation ¶
Overview ¶
nsq is the official Go package for https://github.com/bitly/nsq
It provides high-level Reader and Writer types to implement consumers and producers as well as low-level functions to communicate over the NSQ protocol.
Index ¶
- Constants
- Variables
- func ApiRequest(endpoint string) (*simplejson.Json, error)
- func IsValidChannelName(name string) bool
- func IsValidTopicName(name string) bool
- func NewDeadlineTransport(timeout time.Duration) *http.Transport
- func ReadResponse(r io.Reader) ([]byte, error)
- func UnpackResponse(response []byte) (int32, []byte, error)
- type AsyncHandler
- type Command
- func Finish(id MessageID) *Command
- func Identify(js map[string]interface{}) (*Command, error)
- func MultiPublish(topic string, bodies [][]byte) (*Command, error)
- func Nop() *Command
- func Ping() *Command
- func Publish(topic string, body []byte) *Command
- func Ready(count int) *Command
- func Register(topic string, channel string) *Command
- func Requeue(id MessageID, timeoutMs int) *Command
- func StartClose() *Command
- func Subscribe(topic string, channel string) *Command
- func Touch(id MessageID) *Command
- func UnRegister(topic string, channel string) *Command
- type FailedMessageLogger
- type FinishedMessage
- type Handler
- type Message
- type MessageID
- type Reader
- func (q *Reader) AddAsyncHandler(handler AsyncHandler)
- func (q *Reader) AddHandler(handler Handler)
- func (q *Reader) Configure(option string, value interface{}) error
- func (q *Reader) ConnectToLookupd(addr string) error
- func (q *Reader) ConnectToNSQ(addr string) error
- func (q *Reader) ConnectionMaxInFlight() int64
- func (q *Reader) IsStarved() bool
- func (q *Reader) MaxInFlight() int
- func (q *Reader) SetMaxBackoffDuration(duration time.Duration)
- func (q *Reader) SetMaxInFlight(maxInFlight int)
- func (q *Reader) Stop()
- type Writer
- func (w *Writer) MultiPublish(topic string, body [][]byte) (int32, []byte, error)
- func (w *Writer) MultiPublishAsync(topic string, body [][]byte, doneChan chan *WriterTransaction, ...) error
- func (w *Writer) Publish(topic string, body []byte) (int32, []byte, error)
- func (w *Writer) PublishAsync(topic string, body []byte, doneChan chan *WriterTransaction, ...) error
- func (w *Writer) Stop()
- func (w *Writer) String() string
- type WriterTransaction
Constants ¶
const ( // when successful FrameTypeResponse int32 = 0 // when an error occurred FrameTypeError int32 = 1 // when it's a serialized message FrameTypeMessage int32 = 2 )
const ( StateInit = iota StateDisconnected StateConnected StateSubscribed // close has started. responses are ok, but no new messages will be sent StateClosing )
const DefaultClientTimeout = 60 * time.Second
The amount of time nsqd will allow a client to idle, can be overriden
const MsgIdLength = 16
The number of bytes for a Message.Id
const VERSION = "0.3.4"
Variables ¶
var ErrAlreadyConnected = errors.New("already connected")
returned from ConnectToNSQ when already connected
var ErrLookupdAddressExists = errors.New("lookupd address already exists")
returned from ConnectToLookupd when given lookupd address exists already
var ErrNotConnected = errors.New("not connected")
returned when a publish command is made against a Writer that is not connected
var ErrOverMaxInFlight = errors.New("over configure max-inflight")
returned from updateRdy if over max-in-flight
var ErrStopped = errors.New("stopped")
returned when a publish command is made against a Writer that has been stopped
var MagicV1 = []byte(" V1")
var MagicV2 = []byte(" V2")
Functions ¶
func ApiRequest ¶
ApiRequest is a helper function to perform an HTTP request and parse our NSQ daemon's expected response format, with deadlines.
{"status_code":200, "status_txt":"OK", "data":{...}}
func IsValidChannelName ¶
IsValidChannelName checks a channel name for correctness
func IsValidTopicName ¶
IsValidTopicName checks a topic name for correctness
func NewDeadlineTransport ¶
A custom http.Transport with support for deadline timeouts
func ReadResponse ¶
ReadResponse is a client-side utility function to read from the supplied Reader according to the NSQ protocol spec:
[x][x][x][x][x][x][x][x]... | (int32) || (binary) | 4-byte || N-byte ------------------------... size data
func UnpackResponse ¶
UnpackResponse is a client-side utility function that unpacks serialized data according to NSQ protocol spec:
[x][x][x][x][x][x][x][x]... | (int32) || (binary) | 4-byte || N-byte ------------------------... frame ID data
Returns a triplicate of: frame type, data ([]byte), error
Types ¶
type AsyncHandler ¶
type AsyncHandler interface {
HandleMessage(message *Message, responseChan chan *FinishedMessage)
}
AsyncHandler is the asynchronous interface to Reader.
Implement this interface for handlers that wish to defer responding until later. This is particularly useful if you want to batch work together.
An AsyncHandler must send:
&FinishedMessage{messageID, requeueDelay, true|false}
To the supplied responseChan to indicate that a message is processed.
type Command ¶
Command represents a command from a client to an NSQ daemon
func Finish ¶
Finish creates a new Command to indiciate that a given message (by id) has been processed successfully
func Identify ¶
Identify creates a new Command to provide information about the client. After connecting, it is generally the first message sent.
The supplied map is marshaled into JSON to provide some flexibility for this command to evolve over time.
See http://bitly.github.io/nsq/clients/tcp_protocol_spec.html#identify for information on the supported options
func MultiPublish ¶
MultiPublish creates a new Command to write more than one message to a given topic. This is useful for high-throughput situations to avoid roundtrips and saturate the pipe.
func Nop ¶
func Nop() *Command
Nop creates a new Command that has no effect server side. Commonly used to respond to heartbeats
func Ping ¶
func Ping() *Command
Ping creates a new Command to keep-alive the state of all the announced topic/channels for a given client
func Ready ¶
Ready creates a new Command to specify the number of messages a client is willing to receive
func Requeue ¶
Requeue creates a new Command to indicate that a given message (by id) should be requeued after the given timeout (in ms) NOTE: a timeout of 0 indicates immediate requeue
func StartClose ¶
func StartClose() *Command
StartClose creates a new Command to indicate that the client would like to start a close cycle. nsqd will no longer send messages to a client in this state and the client is expected finish pending messages and close the connection
func UnRegister ¶
Unregister creates a new Command to remove a topic/channel for the connected nsqd
type FailedMessageLogger ¶
type FailedMessageLogger interface {
LogFailedMessage(message *Message)
}
FailedMessageLogger is an interface that can be implemented by handlers that wish to receive a callback when a message is deemed "failed" (i.e. the number of attempts exceeded the Reader specified MaxAttemptCount)
type FinishedMessage ¶
FinishedMessage is the data type used over responseChan in AsyncHandlers
type Handler ¶
Handler is the synchronous interface to Reader.
Implement this interface for handlers that return whether or not message processing completed successfully.
When the return value is nil Reader will automatically handle FINishing.
When the returned value is non-nil Reader will automatically handle REQueing.
type Message ¶
type Message struct { Id MessageID Body []byte Timestamp int64 Attempts uint16 // contains filtered or unexported fields }
Message is the fundamental data type containing the id, body, and metadata
func DecodeMessage ¶
DecodeMessage deseralizes data (as []byte) and creates a new Message
func NewMessage ¶
NewMessage creates a Message, initializes some metadata, and returns a pointer
func (*Message) EncodeBytes ¶
EncodeBytes serializes the message into a new, returned, []byte
func (*Message) Requeue ¶
Requeue sends a REQUEUE command to the nsqd which sent this message, using the supplied delay
type MessageID ¶
type MessageID [MsgIdLength]byte
type Reader ¶
type Reader struct { // 64bit atomic vars need to be first for proper alignment on 32bit platforms MessagesReceived uint64 // an atomic counter - # of messages received MessagesFinished uint64 // an atomic counter - # of messages FINished MessagesRequeued uint64 // an atomic counter - # of messages REQueued sync.RWMutex // basics TopicName string // name of topic to subscribe to ChannelName string // name of channel to subscribe to ShortIdentifier string // an identifier to send to nsqd when connecting (defaults: short hostname) LongIdentifier string // an identifier to send to nsqd when connecting (defaults: long hostname) VerboseLogging bool // enable verbose logging ExitChan chan int // read from this channel to block your main loop // network deadlines ReadTimeout time.Duration // the deadline set for network reads WriteTimeout time.Duration // the deadline set for network writes // lookupd LookupdPollInterval time.Duration // duration between polling lookupd for new connections LookupdPollJitter float64 // Maximum fractional amount of jitter to add to the lookupd pool loop. This helps evenly distribute requests even if multiple consumers restart at the same time. // requeue delays MaxRequeueDelay time.Duration // the maximum duration when REQueueing (for doubling of deferred requeue) DefaultRequeueDelay time.Duration // the default duration when REQueueing BackoffMultiplier time.Duration // the unit of time for calculating reader backoff // misc MaxAttemptCount uint16 // maximum number of times this reader will attempt to process a message LowRdyIdleTimeout time.Duration // the amount of time in seconds to wait for a message from a producer when in a state where RDY counts are re-distributed (ie. max_in_flight < num_producers) // transport layer security TLSv1 bool // negotiate enabling TLS TLSConfig *tls.Config // client TLS configuration // compression Deflate bool // negotiate enabling Deflate compression DeflateLevel int // the compression level to negotiate for Deflate Snappy bool // negotiate enabling Snappy compression SampleRate int32 // set the sampleRate of the client's messagePump (requires nsqd 0.2.25+) // contains filtered or unexported fields }
Reader is a high-level type to consume from NSQ.
A Reader instance is supplied handler(s) that will be executed concurrently via goroutines to handle processing the stream of messages consumed from the specified topic/channel. See: AsyncHandler and Handler for details on implementing those interfaces to create handlers.
If configured, it will poll nsqlookupd instances and handle connection (and reconnection) to any discovered nsqds.
func NewReader ¶
NewReader creates a new instance of Reader for the specified topic/channel
The returned Reader instance is setup with sane default values. To modify configuration, update the values on the returned instance before connecting.
func (*Reader) AddAsyncHandler ¶
func (q *Reader) AddAsyncHandler(handler AsyncHandler)
AddAsyncHandler adds an AsyncHandler for messages received by this Reader.
See AsyncHandler for details on implementing this interface.
It's ok to start more than one handler simultaneously, they are concurrently executed in goroutines.
func (*Reader) AddHandler ¶
AddHandler adds a Handler for messages received by this Reader.
See Handler for details on implementing this interface.
It's ok to start more than one handler simultaneously, they are concurrently executed in goroutines.
func (*Reader) Configure ¶
Configure takes an option as a string and a value as an interface and attempts to set the appropriate configuration option on the reader instance.
It attempts to coerce the value into the right format depending on the named option and the underlying type of the value passed in.
It returns an error for an invalid option or value.
func (*Reader) ConnectToLookupd ¶
ConnectToLookupd adds an nsqlookupd address to the list for this Reader instance.
If it is the first to be added, it initiates an HTTP request to discover nsqd producers for the configured topic.
A goroutine is spawned to handle continual polling.
func (*Reader) ConnectToNSQ ¶
ConnectToNSQ takes a nsqd address to connect directly to.
It is recommended to use ConnectToLookupd so that topics are discovered automatically. This method is useful when you want to connect to a single, local, instance.
func (*Reader) ConnectionMaxInFlight ¶
ConnectionMaxInFlight calculates the per-connection max-in-flight count.
This may change dynamically based on the number of connections to nsqd the Reader is responsible for.
func (*Reader) IsStarved ¶
IsStarved indicates whether any connections for this reader are blocked on processing before being able to receive more messages (ie. RDY count of 0 and not exiting)
func (*Reader) MaxInFlight ¶
MaxInFlight returns the configured maximum number of messages to allow in-flight.
func (*Reader) SetMaxBackoffDuration ¶
SetMaxBackoffDuration sets the maximum duration a connection will backoff from message processing
func (*Reader) SetMaxInFlight ¶
SetMaxInFlight sets the maximum number of messages this reader instance will allow in-flight.
If already connected, it updates the reader RDY state for each connection.
type Writer ¶
type Writer struct { net.Conn WriteTimeout time.Duration Addr string HeartbeatInterval time.Duration ShortIdentifier string LongIdentifier string // contains filtered or unexported fields }
Writer is a high-level type to publish to NSQ.
A Writer instance is 1:1 with a destination `nsqd` and will lazily connect to that instance (and re-connect) when Publish commands are executed.
func (*Writer) MultiPublish ¶
MultiPublish synchronously publishes a slice of message bodies to the specified topic, returning the response frameType, data, and error
func (*Writer) MultiPublishAsync ¶
func (w *Writer) MultiPublishAsync(topic string, body [][]byte, doneChan chan *WriterTransaction, args ...interface{}) error
MultiPublishAsync publishes a slice of message bodies to the specified topic but does not wait for the response from `nsqd`.
When the Writer eventually receives the response from `nsqd`, the supplied `doneChan` (if specified) will receive a `WriterTransaction` instance with the supplied variadic arguments (and the response `FrameType`, `Data`, and `Error`)
func (*Writer) Publish ¶
Publish synchronously publishes a message body to the specified topic, returning the response frameType, data, and error
func (*Writer) PublishAsync ¶
func (w *Writer) PublishAsync(topic string, body []byte, doneChan chan *WriterTransaction, args ...interface{}) error
PublishAsync publishes a message body to the specified topic but does not wait for the response from `nsqd`.
When the Writer eventually receives the response from `nsqd`, the supplied `doneChan` (if specified) will receive a `WriterTransaction` instance with the supplied variadic arguments (and the response `FrameType`, `Data`, and `Error`)
type WriterTransaction ¶
type WriterTransaction struct { FrameType int32 // the frame type received in response to the publish command Data []byte // the response data of the publish command Error error // the error (or nil) of the publish command Args []interface{} // the slice of variadic arguments passed to PublishAsync or MultiPublishAsync // contains filtered or unexported fields }
WriterTransaction is returned by the async publish methods to retrieve metadata about the command after the response is received.