nsq

package
v0.2.2 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Nov 18, 2015 License: MIT, MIT Imports: 27 Imported by: 0

README

go-nsq

Build Status GoDoc GitHub release

The official Go package for NSQ.

Docs

See godoc and the main repo apps directory for examples of clients built using this package.

Tests

Tests are run via ./test.sh (which requires nsqd and nsqlookupd to be installed).

Documentation

Overview

Package nsq is the official Go package for NSQ (http://nsq.io/)

It provides high-level Consumer and Producer types as well as low-level functions to communicate over the NSQ protocol

Index

Constants

View Source
const (
	FrameTypeResponse int32 = 0
	FrameTypeError    int32 = 1
	FrameTypeMessage  int32 = 2
)

frame types

View Source
const (
	StateInit = iota
	StateDisconnected
	StateConnected
	StateSubscribed
	// StateClosing means CLOSE has started...
	// (responses are ok, but no new messages will be sent)
	StateClosing
)

states

View Source
const MsgIDLength = 16

The number of bytes for a Message.ID

View Source
const VERSION = "1.0.5"

VERSION

Variables

View Source
var ErrAlreadyConnected = errors.New("already connected")

ErrAlreadyConnected is returned from ConnectToNSQD when already connected

View Source
var ErrClosing = errors.New("closing")

ErrClosing is returned when a connection is closing

View Source
var ErrNotConnected = errors.New("not connected")

ErrNotConnected is returned when a publish command is made against a Producer that is not connected

View Source
var ErrOverMaxInFlight = errors.New("over configure max-inflight")

ErrOverMaxInFlight is returned from Consumer if over max-in-flight

View Source
var ErrStopped = errors.New("stopped")

ErrStopped is returned when a publish command is made against a Producer that has been stopped

View Source
var MagicV1 = []byte("  V1")

MagicV1 is the initial identifier sent when connecting for V1 clients

View Source
var MagicV2 = []byte("  V2")

MagicV2 is the initial identifier sent when connecting for V2 clients

Functions

func IsValidChannelName

func IsValidChannelName(name string) bool

IsValidChannelName checks a channel name for correctness

func IsValidTopicName

func IsValidTopicName(name string) bool

IsValidTopicName checks a topic name for correctness

func ReadResponse

func ReadResponse(r io.Reader) ([]byte, error)

ReadResponse is a client-side utility function to read from the supplied Reader according to the NSQ protocol spec:

[x][x][x][x][x][x][x][x]...
|  (int32) || (binary)
|  4-byte  || N-byte
------------------------...
    size       data

func ReadUnpackedResponse

func ReadUnpackedResponse(r io.Reader) (int32, []byte, error)

ReadUnpackedResponse reads and parses data from the underlying TCP connection according to the NSQ TCP protocol spec and returns the frameType, data or error

func UnpackResponse

func UnpackResponse(response []byte) (int32, []byte, error)

UnpackResponse is a client-side utility function that unpacks serialized data according to NSQ protocol spec:

[x][x][x][x][x][x][x][x]...
|  (int32) || (binary)
|  4-byte  || N-byte
------------------------...
  frame ID     data

Returns a triplicate of: frame type, data ([]byte), error

Types

type AuthResponse

type AuthResponse struct {
	Identity        string `json:"identity"`
	IdentityUrl     string `json:"identity_url"`
	PermissionCount int64  `json:"permission_count"`
}

AuthResponse represents the metadata returned from an AUTH command to nsqd

type BackoffStrategy

type BackoffStrategy interface {
	Calculate(attempt int) time.Duration
}

BackoffStrategy defines a strategy for calculating the duration of time a consumer should backoff for a given attempt

type Command

type Command struct {
	Name   []byte
	Params [][]byte
	Body   []byte
}

Command represents a command from a client to an NSQ daemon

func Auth

func Auth(secret string) (*Command, error)

Auth sends credentials for authentication

After `Identify`, this is usually the first message sent, if auth is used.

func DeferredPublish

func DeferredPublish(topic string, delay time.Duration, body []byte) *Command

DeferredPublish creates a new Command to write a message to a given topic where the message will queue at the channel level until the timeout expires

func Finish

func Finish(id MessageID) *Command

Finish creates a new Command to indiciate that a given message (by id) has been processed successfully

func Identify

func Identify(js map[string]interface{}) (*Command, error)

Identify creates a new Command to provide information about the client. After connecting, it is generally the first message sent.

The supplied map is marshaled into JSON to provide some flexibility for this command to evolve over time.

See http://nsq.io/clients/tcp_protocol_spec.html#identify for information on the supported options

func MultiPublish

func MultiPublish(topic string, bodies [][]byte) (*Command, error)

MultiPublish creates a new Command to write more than one message to a given topic (useful for high-throughput situations to avoid roundtrips and saturate the pipe)

func Nop

func Nop() *Command

Nop creates a new Command that has no effect server side. Commonly used to respond to heartbeats

func Ping

func Ping() *Command

Ping creates a new Command to keep-alive the state of all the announced topic/channels for a given client

func Publish

func Publish(topic string, body []byte) *Command

Publish creates a new Command to write a message to a given topic

func Ready

func Ready(count int) *Command

Ready creates a new Command to specify the number of messages a client is willing to receive

func Register

func Register(topic string, channel string) *Command

Register creates a new Command to add a topic/channel for the connected nsqd

func Requeue

func Requeue(id MessageID, delay time.Duration) *Command

Requeue creates a new Command to indicate that a given message (by id) should be requeued after the given delay NOTE: a delay of 0 indicates immediate requeue

func StartClose

func StartClose() *Command

StartClose creates a new Command to indicate that the client would like to start a close cycle. nsqd will no longer send messages to a client in this state and the client is expected finish pending messages and close the connection

func Subscribe

func Subscribe(topic string, channel string) *Command

Subscribe creates a new Command to subscribe to the given topic/channel

func Touch

func Touch(id MessageID) *Command

Touch creates a new Command to reset the timeout for a given message (by id)

func UnRegister

func UnRegister(topic string, channel string) *Command

UnRegister creates a new Command to remove a topic/channel for the connected nsqd

func (*Command) String

func (c *Command) String() string

String returns the name and parameters of the Command

func (*Command) WriteTo

func (c *Command) WriteTo(w io.Writer) (int64, error)

WriteTo implements the WriterTo interface and serializes the Command to the supplied Writer.

It is suggested that the target Writer is buffered to avoid performing many system calls.

type Config

type Config struct {
	DialTimeout time.Duration `opt:"dial_timeout" default:"1s"`

	// Deadlines for network reads and writes
	ReadTimeout  time.Duration `opt:"read_timeout" min:"100ms" max:"5m" default:"60s"`
	WriteTimeout time.Duration `opt:"write_timeout" min:"100ms" max:"5m" default:"1s"`

	// LocalAddr is the local address to use when dialing an nsqd.
	// If empty, a local address is automatically chosen.
	LocalAddr net.Addr `opt:"local_addr"`

	// Duration between polling lookupd for new producers, and fractional jitter to add to
	// the lookupd pool loop. this helps evenly distribute requests even if multiple consumers
	// restart at the same time
	//
	// NOTE: when not using nsqlookupd, LookupdPollInterval represents the duration of time between
	// reconnection attempts
	LookupdPollInterval time.Duration `opt:"lookupd_poll_interval" min:"10ms" max:"5m" default:"60s"`
	LookupdPollJitter   float64       `opt:"lookupd_poll_jitter" min:"0" max:"1" default:"0.3"`

	// Maximum duration when REQueueing (for doubling of deferred requeue)
	MaxRequeueDelay     time.Duration `opt:"max_requeue_delay" min:"0" max:"60m" default:"15m"`
	DefaultRequeueDelay time.Duration `opt:"default_requeue_delay" min:"0" max:"60m" default:"90s"`

	// Backoff strategy, defaults to exponential backoff. Overwrite this to define alternative backoff algrithms.
	BackoffStrategy BackoffStrategy `opt:"backoff_strategy" default:"exponential"`
	// Maximum amount of time to backoff when processing fails 0 == no backoff
	MaxBackoffDuration time.Duration `opt:"max_backoff_duration" min:"0" max:"60m" default:"2m"`
	// Unit of time for calculating consumer backoff
	BackoffMultiplier time.Duration `opt:"backoff_multiplier" min:"0" max:"60m" default:"1s"`

	// Maximum number of times this consumer will attempt to process a message before giving up
	MaxAttempts uint16 `opt:"max_attempts" min:"0" max:"65535" default:"5"`

	// Duration to wait for a message from a producer when in a state where RDY
	// counts are re-distributed (ie. max_in_flight < num_producers)
	LowRdyIdleTimeout time.Duration `opt:"low_rdy_idle_timeout" min:"1s" max:"5m" default:"10s"`

	// Duration between redistributing max-in-flight to connections
	RDYRedistributeInterval time.Duration `opt:"rdy_redistribute_interval" min:"1ms" max:"5s" default:"5s"`

	// Identifiers sent to nsqd representing this client
	// UserAgent is in the spirit of HTTP (default: "<client_library_name>/<version>")
	ClientID  string `opt:"client_id"` // (defaults: short hostname)
	Hostname  string `opt:"hostname"`
	UserAgent string `opt:"user_agent"`

	// Duration of time between heartbeats. This must be less than ReadTimeout
	HeartbeatInterval time.Duration `opt:"heartbeat_interval" default:"30s"`
	// Integer percentage to sample the channel (requires nsqd 0.2.25+)
	SampleRate int32 `opt:"sample_rate" min:"0" max:"99"`

	// To set TLS config, use the following options:
	//
	// tls_v1 - Bool enable TLS negotiation
	// tls_root_ca_file - String path to file containing root CA
	// tls_insecure_skip_verify - Bool indicates whether this client should verify server certificates
	// tls_cert - String path to file containing public key for certificate
	// tls_key - String path to file containing private key for certificate
	// tls_min_version - String indicating the minimum version of tls acceptable ('ssl3.0', 'tls1.0', 'tls1.1', 'tls1.2')
	//
	TlsV1     bool        `opt:"tls_v1"`
	TlsConfig *tls.Config `opt:"tls_config"`

	// Compression Settings
	Deflate      bool `opt:"deflate"`
	DeflateLevel int  `opt:"deflate_level" min:"1" max:"9" default:"6"`
	Snappy       bool `opt:"snappy"`

	// Size of the buffer (in bytes) used by nsqd for buffering writes to this connection
	OutputBufferSize int64 `opt:"output_buffer_size" default:"16384"`
	// Timeout used by nsqd before flushing buffered writes (set to 0 to disable).
	//
	// WARNING: configuring clients with an extremely low
	// (< 25ms) output_buffer_timeout has a significant effect
	// on nsqd CPU usage (particularly with > 50 clients connected).
	OutputBufferTimeout time.Duration `opt:"output_buffer_timeout" default:"250ms"`

	// Maximum number of messages to allow in flight (concurrency knob)
	MaxInFlight int `opt:"max_in_flight" min:"0" default:"1"`

	// The server-side message timeout for messages delivered to this client
	MsgTimeout time.Duration `opt:"msg_timeout" min:"0"`

	// secret for nsqd authentication (requires nsqd 0.2.29+)
	AuthSecret string `opt:"auth_secret"`
	// contains filtered or unexported fields
}

Config is a struct of NSQ options

The only valid way to create a Config is via NewConfig, using a struct literal will panic. After Config is passed into a high-level type (like Consumer, Producer, etc.) the values are no longer mutable (they are copied).

Use Set(option string, value interface{}) as an alternate way to set parameters

func NewConfig

func NewConfig() *Config

NewConfig returns a new default nsq configuration.

This must be used to initialize Config structs. Values can be set directly, or through Config.Set()

func (*Config) Set

func (c *Config) Set(option string, value interface{}) error

Set takes an option as a string and a value as an interface and attempts to set the appropriate configuration option.

It attempts to coerce the value into the right format depending on the named option and the underlying type of the value passed in.

Calls to Set() that take a time.Duration as an argument can be input as:

"1000ms" (a string parsed by time.ParseDuration())
1000 (an integer interpreted as milliseconds)
1000*time.Millisecond (a literal time.Duration value)

Calls to Set() that take bool can be input as:

"true" (a string parsed by strconv.ParseBool())
true (a boolean)
1 (an int where 1 == true and 0 == false)

It returns an error for an invalid option or value.

func (*Config) Validate

func (c *Config) Validate() error

Validate checks that all values are within specified min/max ranges

type ConfigFlag

type ConfigFlag struct {
	*Config
}

ConfigFlag wraps a Config and implements the flag.Value interface

func (*ConfigFlag) Set

func (c *ConfigFlag) Set(opt string) (err error)

Set takes a comma separated value and follows the rules in Config.Set using the first field as the option key, and the second (if present) as the value

func (*ConfigFlag) String

func (c *ConfigFlag) String() string

String implements the flag.Value interface

type Conn

type Conn struct {
	// contains filtered or unexported fields
}

Conn represents a connection to nsqd

Conn exposes a set of callbacks for the various events that occur on a connection

func NewConn

func NewConn(addr string, config *Config, delegate ConnDelegate) *Conn

NewConn returns a new Conn instance

func (*Conn) Close

func (c *Conn) Close() error

Close idempotently initiates connection close

func (*Conn) Connect

func (c *Conn) Connect() (*IdentifyResponse, error)

Connect dials and bootstraps the nsqd connection (including IDENTIFY) and returns the IdentifyResponse

func (*Conn) Flush

func (c *Conn) Flush() error

Flush writes all buffered data to the underlying TCP connection

func (*Conn) IsClosing

func (c *Conn) IsClosing() bool

IsClosing indicates whether or not the connection is currently in the processing of gracefully closing

func (*Conn) LastMessageTime

func (c *Conn) LastMessageTime() time.Time

LastMessageTime returns a time.Time representing the time at which the last message was received

func (*Conn) LastRDY

func (c *Conn) LastRDY() int64

LastRDY returns the previously set RDY count

func (*Conn) MaxRDY

func (c *Conn) MaxRDY() int64

MaxRDY returns the nsqd negotiated maximum RDY count that it will accept for this connection

func (*Conn) RDY

func (c *Conn) RDY() int64

RDY returns the current RDY count

func (*Conn) Read

func (c *Conn) Read(p []byte) (int, error)

Read performs a deadlined read on the underlying TCP connection

func (*Conn) RemoteAddr

func (c *Conn) RemoteAddr() net.Addr

RemoteAddr returns the configured destination nsqd address

func (*Conn) SetLogger

func (c *Conn) SetLogger(l logger, lvl LogLevel, format string)

SetLogger assigns the logger to use as well as a level.

The format parameter is expected to be a printf compatible string with a single %s argument. This is useful if you want to provide additional context to the log messages that the connection will print, the default is '(%s)'.

The logger parameter is an interface that requires the following method to be implemented (such as the the stdlib log.Logger):

Output(calldepth int, s string)

func (*Conn) SetRDY

func (c *Conn) SetRDY(rdy int64)

SetRDY stores the specified RDY count

func (*Conn) String

func (c *Conn) String() string

String returns the fully-qualified address

func (*Conn) Write

func (c *Conn) Write(p []byte) (int, error)

Write performs a deadlined write on the underlying TCP connection

func (*Conn) WriteCommand

func (c *Conn) WriteCommand(cmd *Command) error

WriteCommand is a goroutine safe method to write a Command to this connection, and flush.

type ConnDelegate

type ConnDelegate interface {
	// OnResponse is called when the connection
	// receives a FrameTypeResponse from nsqd
	OnResponse(*Conn, []byte)

	// OnError is called when the connection
	// receives a FrameTypeError from nsqd
	OnError(*Conn, []byte)

	// OnMessage is called when the connection
	// receives a FrameTypeMessage from nsqd
	OnMessage(*Conn, *Message)

	// OnMessageFinished is called when the connection
	// handles a FIN command from a message handler
	OnMessageFinished(*Conn, *Message)

	// OnMessageRequeued is called when the connection
	// handles a REQ command from a message handler
	OnMessageRequeued(*Conn, *Message)

	// OnBackoff is called when the connection triggers a backoff state
	OnBackoff(*Conn)

	// OnContinue is called when the connection finishes a message without adjusting backoff state
	OnContinue(*Conn)

	// OnResume is called when the connection triggers a resume state
	OnResume(*Conn)

	// OnIOError is called when the connection experiences
	// a low-level TCP transport error
	OnIOError(*Conn, error)

	// OnHeartbeat is called when the connection
	// receives a heartbeat from nsqd
	OnHeartbeat(*Conn)

	// OnClose is called when the connection
	// closes, after all cleanup
	OnClose(*Conn)
}

ConnDelegate is an interface of methods that are used as callbacks in Conn

type Consumer

type Consumer struct {

	// read from this channel to block until consumer is cleanly stopped
	StopChan chan int
	// contains filtered or unexported fields
}

Consumer is a high-level type to consume from NSQ.

A Consumer instance is supplied a Handler that will be executed concurrently via goroutines to handle processing the stream of messages consumed from the specified topic/channel. See: Handler/HandlerFunc for details on implementing the interface to create handlers.

If configured, it will poll nsqlookupd instances and handle connection (and reconnection) to any discovered nsqds.

func NewConsumer

func NewConsumer(topic string, channel string, config *Config) (*Consumer, error)

NewConsumer creates a new instance of Consumer for the specified topic/channel

The only valid way to create a Config is via NewConfig, using a struct literal will panic. After Config is passed into NewConsumer the values are no longer mutable (they are copied).

func (*Consumer) AddConcurrentHandlers

func (r *Consumer) AddConcurrentHandlers(handler Handler, concurrency int)

AddConcurrentHandlers sets the Handler for messages received by this Consumer. It takes a second argument which indicates the number of goroutines to spawn for message handling.

This panics if called after connecting to NSQD or NSQ Lookupd

(see Handler or HandlerFunc for details on implementing this interface)

func (*Consumer) AddHandler

func (r *Consumer) AddHandler(handler Handler)

AddHandler sets the Handler for messages received by this Consumer. This can be called multiple times to add additional handlers. Handler will have a 1:1 ratio to message handling goroutines.

This panics if called after connecting to NSQD or NSQ Lookupd

(see Handler or HandlerFunc for details on implementing this interface)

func (*Consumer) ChangeMaxInFlight

func (r *Consumer) ChangeMaxInFlight(maxInFlight int)

ChangeMaxInFlight sets a new maximum number of messages this comsumer instance will allow in-flight, and updates all existing connections as appropriate.

For example, ChangeMaxInFlight(0) would pause message flow

If already connected, it updates the reader RDY state for each connection.

func (*Consumer) ConnectToNSQD

func (r *Consumer) ConnectToNSQD(addr string) error

ConnectToNSQD takes a nsqd address to connect directly to.

It is recommended to use ConnectToNSQLookupd so that topics are discovered automatically. This method is useful when you want to connect to a single, local, instance.

func (*Consumer) ConnectToNSQDs

func (r *Consumer) ConnectToNSQDs(addresses []string) error

ConnectToNSQDs takes multiple nsqd addresses to connect directly to.

It is recommended to use ConnectToNSQLookupd so that topics are discovered automatically. This method is useful when you want to connect to local instance.

func (*Consumer) ConnectToNSQLookupd

func (r *Consumer) ConnectToNSQLookupd(addr string) error

ConnectToNSQLookupd adds an nsqlookupd address to the list for this Consumer instance.

If it is the first to be added, it initiates an HTTP request to discover nsqd producers for the configured topic.

A goroutine is spawned to handle continual polling.

func (*Consumer) ConnectToNSQLookupds

func (r *Consumer) ConnectToNSQLookupds(addresses []string) error

ConnectToNSQLookupds adds multiple nsqlookupd address to the list for this Consumer instance.

If adding the first address it initiates an HTTP request to discover nsqd producers for the configured topic.

A goroutine is spawned to handle continual polling.

func (*Consumer) DisconnectFromNSQD

func (r *Consumer) DisconnectFromNSQD(addr string) error

DisconnectFromNSQD closes the connection to and removes the specified `nsqd` address from the list

func (*Consumer) DisconnectFromNSQLookupd

func (r *Consumer) DisconnectFromNSQLookupd(addr string) error

DisconnectFromNSQLookupd removes the specified `nsqlookupd` address from the list used for periodic discovery.

func (*Consumer) IsStarved

func (r *Consumer) IsStarved() bool

IsStarved indicates whether any connections for this consumer are blocked on processing before being able to receive more messages (ie. RDY count of 0 and not exiting)

func (*Consumer) SetBehaviorDelegate

func (r *Consumer) SetBehaviorDelegate(cb interface{})

SetBehaviorDelegate takes a type implementing one or more of the following interfaces that modify the behavior of the `Consumer`:

DiscoveryFilter

func (*Consumer) SetLogger

func (r *Consumer) SetLogger(l logger, lvl LogLevel)

SetLogger assigns the logger to use as well as a level

The logger parameter is an interface that requires the following method to be implemented (such as the the stdlib log.Logger):

Output(calldepth int, s string)

func (*Consumer) Stats

func (r *Consumer) Stats() *ConsumerStats

Stats retrieves the current connection and message statistics for a Consumer

func (*Consumer) Stop

func (r *Consumer) Stop()

Stop will initiate a graceful stop of the Consumer (permanent)

NOTE: receive on StopChan to block until this process completes

type ConsumerStats

type ConsumerStats struct {
	MessagesReceived uint64
	MessagesFinished uint64
	MessagesRequeued uint64
	Connections      int
}

ConsumerStats represents a snapshot of the state of a Consumer's connections and the messages it has seen

type DiscoveryFilter

type DiscoveryFilter interface {
	Filter([]string) []string
}

DiscoveryFilter is an interface accepted by `SetBehaviorDelegate()` for filtering the nsqds returned from discovery via nsqlookupd

type ErrIdentify

type ErrIdentify struct {
	Reason string
}

ErrIdentify is returned from Conn as part of the IDENTIFY handshake

func (ErrIdentify) Error

func (e ErrIdentify) Error() string

Error returns a stringified error

type ErrProtocol

type ErrProtocol struct {
	Reason string
}

ErrProtocol is returned from Producer when encountering an NSQ protocol level error

func (ErrProtocol) Error

func (e ErrProtocol) Error() string

Error returns a stringified error

type ExponentialStrategy

type ExponentialStrategy struct {
	// contains filtered or unexported fields
}

ExponentialStrategy implements an exponential backoff strategy (default)

func (*ExponentialStrategy) Calculate

func (s *ExponentialStrategy) Calculate(attempt int) time.Duration

Calculate returns a duration of time: 2 ^ attempt

type FailedMessageLogger

type FailedMessageLogger interface {
	LogFailedMessage(message *Message)
}

FailedMessageLogger is an interface that can be implemented by handlers that wish to receive a callback when a message is deemed "failed" (i.e. the number of attempts exceeded the Consumer specified MaxAttemptCount)

type FullJitterStrategy

type FullJitterStrategy struct {
	// contains filtered or unexported fields
}

FullJitterStrategy implements http://www.awsarchitectureblog.com/2015/03/backoff.html

func (*FullJitterStrategy) Calculate

func (s *FullJitterStrategy) Calculate(attempt int) time.Duration

Calculate returns a random duration of time [0, 2 ^ attempt]

type Handler

type Handler interface {
	HandleMessage(message *Message) error
}

Handler is the message processing interface for Consumer

Implement this interface for handlers that return whether or not message processing completed successfully.

When the return value is nil Consumer will automatically handle FINishing.

When the returned value is non-nil Consumer will automatically handle REQueing.

type HandlerFunc

type HandlerFunc func(message *Message) error

HandlerFunc is a convenience type to avoid having to declare a struct to implement the Handler interface, it can be used like this:

consumer.AddHandler(nsq.HandlerFunc(func(m *Message) error {
	// handle the message
}))

func (HandlerFunc) HandleMessage

func (h HandlerFunc) HandleMessage(m *Message) error

HandleMessage implements the Handler interface

type IdentifyResponse

type IdentifyResponse struct {
	MaxRdyCount  int64 `json:"max_rdy_count"`
	TLSv1        bool  `json:"tls_v1"`
	Deflate      bool  `json:"deflate"`
	Snappy       bool  `json:"snappy"`
	AuthRequired bool  `json:"auth_required"`
}

IdentifyResponse represents the metadata returned from an IDENTIFY command to nsqd

type LogLevel

type LogLevel int

LogLevel specifies the severity of a given log message

const (
	LogLevelDebug LogLevel = iota
	LogLevelInfo
	LogLevelWarning
	LogLevelError
)

Log levels

func (LogLevel) String

func (lvl LogLevel) String() string

String returns the string form for a given LogLevel

type Message

type Message struct {
	ID        MessageID
	Body      []byte
	Timestamp int64
	Attempts  uint16

	NSQDAddress string

	Delegate MessageDelegate
	// contains filtered or unexported fields
}

Message is the fundamental data type containing the id, body, and metadata

func DecodeMessage

func DecodeMessage(b []byte) (*Message, error)

DecodeMessage deseralizes data (as []byte) and creates a new Message

func NewMessage

func NewMessage(id MessageID, body []byte) *Message

NewMessage creates a Message, initializes some metadata, and returns a pointer

func (*Message) DisableAutoResponse

func (m *Message) DisableAutoResponse()

DisableAutoResponse disables the automatic response that would normally be sent when a handler.HandleMessage returns (FIN/REQ based on the error value returned).

This is useful if you want to batch, buffer, or asynchronously respond to messages.

func (*Message) Finish

func (m *Message) Finish()

Finish sends a FIN command to the nsqd which sent this message

func (*Message) HasResponded

func (m *Message) HasResponded() bool

HasResponded indicates whether or not this message has been responded to

func (*Message) IsAutoResponseDisabled

func (m *Message) IsAutoResponseDisabled() bool

IsAutoResponseDisabled indicates whether or not this message will be responded to automatically

func (*Message) Requeue

func (m *Message) Requeue(delay time.Duration)

Requeue sends a REQ command to the nsqd which sent this message, using the supplied delay.

A delay of -1 will automatically calculate based on the number of attempts and the configured default_requeue_delay

func (*Message) RequeueWithoutBackoff

func (m *Message) RequeueWithoutBackoff(delay time.Duration)

RequeueWithoutBackoff sends a REQ command to the nsqd which sent this message, using the supplied delay.

Notably, using this method to respond does not trigger a backoff event on the configured Delegate.

func (*Message) Touch

func (m *Message) Touch()

Touch sends a TOUCH command to the nsqd which sent this message

func (*Message) WriteTo

func (m *Message) WriteTo(w io.Writer) (int64, error)

WriteTo implements the WriterTo interface and serializes the message into the supplied producer.

It is suggested that the target Writer is buffered to avoid performing many system calls.

type MessageDelegate

type MessageDelegate interface {
	// OnFinish is called when the Finish() method
	// is triggered on the Message
	OnFinish(*Message)

	// OnRequeue is called when the Requeue() method
	// is triggered on the Message
	OnRequeue(m *Message, delay time.Duration, backoff bool)

	// OnTouch is called when the Touch() method
	// is triggered on the Message
	OnTouch(*Message)
}

MessageDelegate is an interface of methods that are used as callbacks in Message

type MessageID

type MessageID [MsgIDLength]byte

MessageID is the ASCII encoded hexadecimal message ID

type Producer

type Producer struct {
	// contains filtered or unexported fields
}

Producer is a high-level type to publish to NSQ.

A Producer instance is 1:1 with a destination `nsqd` and will lazily connect to that instance (and re-connect) when Publish commands are executed.

func NewProducer

func NewProducer(addr string, config *Config) (*Producer, error)

NewProducer returns an instance of Producer for the specified address

The only valid way to create a Config is via NewConfig, using a struct literal will panic. After Config is passed into NewProducer the values are no longer mutable (they are copied).

func (*Producer) DeferredPublish

func (w *Producer) DeferredPublish(topic string, delay time.Duration, body []byte) error

DeferredPublish synchronously publishes a message body to the specified topic where the message will queue at the channel level until the timeout expires, returning an error if publish failed

func (*Producer) DeferredPublishAsync

func (w *Producer) DeferredPublishAsync(topic string, delay time.Duration, body []byte,
	doneChan chan *ProducerTransaction, args ...interface{}) error

DeferredPublishAsync publishes a message body to the specified topic where the message will queue at the channel level until the timeout expires but does not wait for the response from `nsqd`.

When the Producer eventually receives the response from `nsqd`, the supplied `doneChan` (if specified) will receive a `ProducerTransaction` instance with the supplied variadic arguments and the response error if present

func (*Producer) MultiPublish

func (w *Producer) MultiPublish(topic string, body [][]byte) error

MultiPublish synchronously publishes a slice of message bodies to the specified topic, returning an error if publish failed

func (*Producer) MultiPublishAsync

func (w *Producer) MultiPublishAsync(topic string, body [][]byte, doneChan chan *ProducerTransaction,
	args ...interface{}) error

MultiPublishAsync publishes a slice of message bodies to the specified topic but does not wait for the response from `nsqd`.

When the Producer eventually receives the response from `nsqd`, the supplied `doneChan` (if specified) will receive a `ProducerTransaction` instance with the supplied variadic arguments and the response error if present

func (*Producer) Ping

func (w *Producer) Ping() error

Ping causes the Producer to connect to it's configured nsqd (if not already connected) and send a `Nop` command, returning any error that might occur.

This method can be used to verify that a newly-created Producer instance is configured correctly, rather than relying on the lazy "connect on Publish" behavior of a Producer.

func (*Producer) Publish

func (w *Producer) Publish(topic string, body []byte) error

Publish synchronously publishes a message body to the specified topic, returning an error if publish failed

func (*Producer) PublishAsync

func (w *Producer) PublishAsync(topic string, body []byte, doneChan chan *ProducerTransaction,
	args ...interface{}) error

PublishAsync publishes a message body to the specified topic but does not wait for the response from `nsqd`.

When the Producer eventually receives the response from `nsqd`, the supplied `doneChan` (if specified) will receive a `ProducerTransaction` instance with the supplied variadic arguments and the response error if present

func (*Producer) SetLogger

func (w *Producer) SetLogger(l logger, lvl LogLevel)

SetLogger assigns the logger to use as well as a level

The logger parameter is an interface that requires the following method to be implemented (such as the the stdlib log.Logger):

Output(calldepth int, s string)

func (*Producer) Stop

func (w *Producer) Stop()

Stop initiates a graceful stop of the Producer (permanent)

NOTE: this blocks until completion

func (*Producer) String

func (w *Producer) String() string

String returns the address of the Producer

type ProducerTransaction

type ProducerTransaction struct {
	Error error         // the error (or nil) of the publish command
	Args  []interface{} // the slice of variadic arguments passed to PublishAsync or MultiPublishAsync
	// contains filtered or unexported fields
}

ProducerTransaction is returned by the async publish methods to retrieve metadata about the command after the response is received.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL