go-nsq: github.com/nsqio/go-nsq Index | Examples | Files

package nsq

import "github.com/nsqio/go-nsq"

Package nsq is the official Go package for NSQ (http://nsq.io/)

It provides high-level Consumer and Producer types as well as low-level functions to communicate over the NSQ protocol

Index

Examples

Package Files

api_request.go command.go config.go config_flag.go conn.go consumer.go delegates.go errors.go message.go producer.go protocol.go states.go version.go

Constants

const (
    FrameTypeResponse int32 = 0
    FrameTypeError    int32 = 1
    FrameTypeMessage  int32 = 2
)

frame types

const (
    StateInit = iota
    StateDisconnected
    StateConnected
)

states

const MsgIDLength = 16

The number of bytes for a Message.ID

const VERSION = "1.0.7"

VERSION

Variables

var ErrAlreadyConnected = errors.New("already connected")

ErrAlreadyConnected is returned from ConnectToNSQD when already connected

var ErrClosing = errors.New("closing")

ErrClosing is returned when a connection is closing

var ErrNotConnected = errors.New("not connected")

ErrNotConnected is returned when a publish command is made against a Producer that is not connected

var ErrOverMaxInFlight = errors.New("over configure max-inflight")

ErrOverMaxInFlight is returned from Consumer if over max-in-flight

var ErrStopped = errors.New("stopped")

ErrStopped is returned when a publish command is made against a Producer that has been stopped

var MagicV1 = []byte("  V1")

MagicV1 is the initial identifier sent when connecting for V1 clients

var MagicV2 = []byte("  V2")

MagicV2 is the initial identifier sent when connecting for V2 clients

func IsValidChannelName Uses

func IsValidChannelName(name string) bool

IsValidChannelName checks a channel name for correctness

func IsValidTopicName Uses

func IsValidTopicName(name string) bool

IsValidTopicName checks a topic name for correctness

func ReadResponse Uses

func ReadResponse(r io.Reader) ([]byte, error)

ReadResponse is a client-side utility function to read from the supplied Reader according to the NSQ protocol spec:

[x][x][x][x][x][x][x][x]...
|  (int32) || (binary)
|  4-byte  || N-byte
------------------------...
    size       data

func ReadUnpackedResponse Uses

func ReadUnpackedResponse(r io.Reader) (int32, []byte, error)

ReadUnpackedResponse reads and parses data from the underlying TCP connection according to the NSQ TCP protocol spec and returns the frameType, data or error

func UnpackResponse Uses

func UnpackResponse(response []byte) (int32, []byte, error)

UnpackResponse is a client-side utility function that unpacks serialized data according to NSQ protocol spec:

[x][x][x][x][x][x][x][x]...
|  (int32) || (binary)
|  4-byte  || N-byte
------------------------...
  frame ID     data

Returns a triplicate of: frame type, data ([]byte), error

type AuthResponse Uses

type AuthResponse struct {
    Identity        string `json:"identity"`
    IdentityUrl     string `json:"identity_url"`
    PermissionCount int64  `json:"permission_count"`
}

AuthResponse represents the metadata returned from an AUTH command to nsqd

type BackoffStrategy Uses

type BackoffStrategy interface {
    Calculate(attempt int) time.Duration
}

BackoffStrategy defines a strategy for calculating the duration of time a consumer should backoff for a given attempt

type Command Uses

type Command struct {
    Name   []byte
    Params [][]byte
    Body   []byte
}

Command represents a command from a client to an NSQ daemon

func Auth Uses

func Auth(secret string) (*Command, error)

Auth sends credentials for authentication

After `Identify`, this is usually the first message sent, if auth is used.

func DeferredPublish Uses

func DeferredPublish(topic string, delay time.Duration, body []byte) *Command

DeferredPublish creates a new Command to write a message to a given topic where the message will queue at the channel level until the timeout expires

func Finish Uses

func Finish(id MessageID) *Command

Finish creates a new Command to indiciate that a given message (by id) has been processed successfully

func Identify Uses

func Identify(js map[string]interface{}) (*Command, error)

Identify creates a new Command to provide information about the client. After connecting, it is generally the first message sent.

The supplied map is marshaled into JSON to provide some flexibility for this command to evolve over time.

See http://nsq.io/clients/tcp_protocol_spec.html#identify for information on the supported options

func MultiPublish Uses

func MultiPublish(topic string, bodies [][]byte) (*Command, error)

MultiPublish creates a new Command to write more than one message to a given topic (useful for high-throughput situations to avoid roundtrips and saturate the pipe)

func Nop Uses

func Nop() *Command

Nop creates a new Command that has no effect server side. Commonly used to respond to heartbeats

func Ping Uses

func Ping() *Command

Ping creates a new Command to keep-alive the state of all the announced topic/channels for a given client

func Publish Uses

func Publish(topic string, body []byte) *Command

Publish creates a new Command to write a message to a given topic

func Ready Uses

func Ready(count int) *Command

Ready creates a new Command to specify the number of messages a client is willing to receive

func Register Uses

func Register(topic string, channel string) *Command

Register creates a new Command to add a topic/channel for the connected nsqd

func Requeue Uses

func Requeue(id MessageID, delay time.Duration) *Command

Requeue creates a new Command to indicate that a given message (by id) should be requeued after the given delay NOTE: a delay of 0 indicates immediate requeue

func StartClose Uses

func StartClose() *Command

StartClose creates a new Command to indicate that the client would like to start a close cycle. nsqd will no longer send messages to a client in this state and the client is expected finish pending messages and close the connection

func Subscribe Uses

func Subscribe(topic string, channel string) *Command

Subscribe creates a new Command to subscribe to the given topic/channel

func Touch Uses

func Touch(id MessageID) *Command

Touch creates a new Command to reset the timeout for a given message (by id)

func UnRegister Uses

func UnRegister(topic string, channel string) *Command

UnRegister creates a new Command to remove a topic/channel for the connected nsqd

func (*Command) String Uses

func (c *Command) String() string

String returns the name and parameters of the Command

func (*Command) WriteTo Uses

func (c *Command) WriteTo(w io.Writer) (int64, error)

WriteTo implements the WriterTo interface and serializes the Command to the supplied Writer.

It is suggested that the target Writer is buffered to avoid performing many system calls.

type Config Uses

type Config struct {
    DialTimeout time.Duration `opt:"dial_timeout" default:"1s"`

    // Deadlines for network reads and writes
    ReadTimeout  time.Duration `opt:"read_timeout" min:"100ms" max:"5m" default:"60s"`
    WriteTimeout time.Duration `opt:"write_timeout" min:"100ms" max:"5m" default:"1s"`

    // LocalAddr is the local address to use when dialing an nsqd.
    // If empty, a local address is automatically chosen.
    LocalAddr net.Addr `opt:"local_addr"`

    // Duration between polling lookupd for new producers, and fractional jitter to add to
    // the lookupd pool loop. this helps evenly distribute requests even if multiple consumers
    // restart at the same time
    //
    // NOTE: when not using nsqlookupd, LookupdPollInterval represents the duration of time between
    // reconnection attempts
    LookupdPollInterval time.Duration `opt:"lookupd_poll_interval" min:"10ms" max:"5m" default:"60s"`
    LookupdPollJitter   float64       `opt:"lookupd_poll_jitter" min:"0" max:"1" default:"0.3"`

    // Maximum duration when REQueueing (for doubling of deferred requeue)
    MaxRequeueDelay     time.Duration `opt:"max_requeue_delay" min:"0" max:"60m" default:"15m"`
    DefaultRequeueDelay time.Duration `opt:"default_requeue_delay" min:"0" max:"60m" default:"90s"`

    // Backoff strategy, defaults to exponential backoff. Overwrite this to define alternative backoff algrithms.
    BackoffStrategy BackoffStrategy `opt:"backoff_strategy" default:"exponential"`
    // Maximum amount of time to backoff when processing fails 0 == no backoff
    MaxBackoffDuration time.Duration `opt:"max_backoff_duration" min:"0" max:"60m" default:"2m"`
    // Unit of time for calculating consumer backoff
    BackoffMultiplier time.Duration `opt:"backoff_multiplier" min:"0" max:"60m" default:"1s"`

    // Maximum number of times this consumer will attempt to process a message before giving up
    MaxAttempts uint16 `opt:"max_attempts" min:"0" max:"65535" default:"5"`

    // Duration to wait for a message from an nsqd when in a state where RDY
    // counts are re-distributed (e.g. max_in_flight < num_producers)
    LowRdyIdleTimeout time.Duration `opt:"low_rdy_idle_timeout" min:"1s" max:"5m" default:"10s"`
    // Duration to wait until redistributing RDY for an nsqd regardless of LowRdyIdleTimeout
    LowRdyTimeout time.Duration `opt:"low_rdy_timeout" min:"1s" max:"5m" default:"30s"`
    // Duration between redistributing max-in-flight to connections
    RDYRedistributeInterval time.Duration `opt:"rdy_redistribute_interval" min:"1ms" max:"5s" default:"5s"`

    // Identifiers sent to nsqd representing this client
    // UserAgent is in the spirit of HTTP (default: "<client_library_name>/<version>")
    ClientID  string `opt:"client_id"` // (defaults: short hostname)
    Hostname  string `opt:"hostname"`
    UserAgent string `opt:"user_agent"`

    // Duration of time between heartbeats. This must be less than ReadTimeout
    HeartbeatInterval time.Duration `opt:"heartbeat_interval" default:"30s"`
    // Integer percentage to sample the channel (requires nsqd 0.2.25+)
    SampleRate int32 `opt:"sample_rate" min:"0" max:"99"`

    // To set TLS config, use the following options:
    //
    // tls_v1 - Bool enable TLS negotiation
    // tls_root_ca_file - String path to file containing root CA
    // tls_insecure_skip_verify - Bool indicates whether this client should verify server certificates
    // tls_cert - String path to file containing public key for certificate
    // tls_key - String path to file containing private key for certificate
    // tls_min_version - String indicating the minimum version of tls acceptable ('ssl3.0', 'tls1.0', 'tls1.1', 'tls1.2')
    //
    TlsV1     bool        `opt:"tls_v1"`
    TlsConfig *tls.Config `opt:"tls_config"`

    // Compression Settings
    Deflate      bool `opt:"deflate"`
    DeflateLevel int  `opt:"deflate_level" min:"1" max:"9" default:"6"`
    Snappy       bool `opt:"snappy"`

    // Size of the buffer (in bytes) used by nsqd for buffering writes to this connection
    OutputBufferSize int64 `opt:"output_buffer_size" default:"16384"`
    // Timeout used by nsqd before flushing buffered writes (set to 0 to disable).
    //
    // WARNING: configuring clients with an extremely low
    // (< 25ms) output_buffer_timeout has a significant effect
    // on nsqd CPU usage (particularly with > 50 clients connected).
    OutputBufferTimeout time.Duration `opt:"output_buffer_timeout" default:"250ms"`

    // Maximum number of messages to allow in flight (concurrency knob)
    MaxInFlight int `opt:"max_in_flight" min:"0" default:"1"`

    // The server-side message timeout for messages delivered to this client
    MsgTimeout time.Duration `opt:"msg_timeout" min:"0"`

    // secret for nsqd authentication (requires nsqd 0.2.29+)
    AuthSecret string `opt:"auth_secret"`
    // contains filtered or unexported fields
}

Config is a struct of NSQ options

The only valid way to create a Config is via NewConfig, using a struct literal will panic. After Config is passed into a high-level type (like Consumer, Producer, etc.) the values are no longer mutable (they are copied).

Use Set(option string, value interface{}) as an alternate way to set parameters

func NewConfig Uses

func NewConfig() *Config

NewConfig returns a new default nsq configuration.

This must be used to initialize Config structs. Values can be set directly, or through Config.Set()

func (*Config) Set Uses

func (c *Config) Set(option string, value interface{}) error

Set takes an option as a string and a value as an interface and attempts to set the appropriate configuration option.

It attempts to coerce the value into the right format depending on the named option and the underlying type of the value passed in.

Calls to Set() that take a time.Duration as an argument can be input as:

"1000ms" (a string parsed by time.ParseDuration())
1000 (an integer interpreted as milliseconds)
1000*time.Millisecond (a literal time.Duration value)

Calls to Set() that take bool can be input as:

"true" (a string parsed by strconv.ParseBool())
true (a boolean)
1 (an int where 1 == true and 0 == false)

It returns an error for an invalid option or value.

func (*Config) Validate Uses

func (c *Config) Validate() error

Validate checks that all values are within specified min/max ranges

type ConfigFlag Uses

type ConfigFlag struct {
    Config *Config
}

ConfigFlag wraps a Config and implements the flag.Value interface

Code:

cfg := nsq.NewConfig()
flagSet := flag.NewFlagSet("", flag.ExitOnError)

flagSet.Var(&nsq.ConfigFlag{cfg}, "consumer-opt", "option to pass through to nsq.Consumer (may be given multiple times)")
flagSet.PrintDefaults()

err := flagSet.Parse([]string{
    "--consumer-opt=heartbeat_interval,1s",
    "--consumer-opt=max_attempts,10",
})
if err != nil {
    panic(err.Error())
}
println("HeartbeatInterval", cfg.HeartbeatInterval)
println("MaxAttempts", cfg.MaxAttempts)

func (*ConfigFlag) Set Uses

func (c *ConfigFlag) Set(opt string) (err error)

Set takes a comma separated value and follows the rules in Config.Set using the first field as the option key, and the second (if present) as the value

func (*ConfigFlag) String Uses

func (c *ConfigFlag) String() string

String implements the flag.Value interface

type Conn Uses

type Conn struct {
    // contains filtered or unexported fields
}

Conn represents a connection to nsqd

Conn exposes a set of callbacks for the various events that occur on a connection

func NewConn Uses

func NewConn(addr string, config *Config, delegate ConnDelegate) *Conn

NewConn returns a new Conn instance

func (*Conn) Close Uses

func (c *Conn) Close() error

Close idempotently initiates connection close

func (*Conn) Connect Uses

func (c *Conn) Connect() (*IdentifyResponse, error)

Connect dials and bootstraps the nsqd connection (including IDENTIFY) and returns the IdentifyResponse

func (*Conn) Flush Uses

func (c *Conn) Flush() error

Flush writes all buffered data to the underlying TCP connection

func (*Conn) IsClosing Uses

func (c *Conn) IsClosing() bool

IsClosing indicates whether or not the connection is currently in the processing of gracefully closing

func (*Conn) LastMessageTime Uses

func (c *Conn) LastMessageTime() time.Time

LastMessageTime returns a time.Time representing the time at which the last message was received

func (*Conn) LastRDY Uses

func (c *Conn) LastRDY() int64

LastRDY returns the previously set RDY count

func (*Conn) LastRdyTime Uses

func (c *Conn) LastRdyTime() time.Time

LastRdyTime returns the time of the last non-zero RDY update for this connection

func (*Conn) MaxRDY Uses

func (c *Conn) MaxRDY() int64

MaxRDY returns the nsqd negotiated maximum RDY count that it will accept for this connection

func (*Conn) RDY Uses

func (c *Conn) RDY() int64

RDY returns the current RDY count

func (*Conn) Read Uses

func (c *Conn) Read(p []byte) (int, error)

Read performs a deadlined read on the underlying TCP connection

func (*Conn) RemoteAddr Uses

func (c *Conn) RemoteAddr() net.Addr

RemoteAddr returns the configured destination nsqd address

func (*Conn) SetLogger Uses

func (c *Conn) SetLogger(l logger, lvl LogLevel, format string)

SetLogger assigns the logger to use as well as a level.

The format parameter is expected to be a printf compatible string with a single %s argument. This is useful if you want to provide additional context to the log messages that the connection will print, the default is '(%s)'.

The logger parameter is an interface that requires the following method to be implemented (such as the the stdlib log.Logger):

Output(calldepth int, s string)

func (*Conn) SetRDY Uses

func (c *Conn) SetRDY(rdy int64)

SetRDY stores the specified RDY count

func (*Conn) String Uses

func (c *Conn) String() string

String returns the fully-qualified address

func (*Conn) Write Uses

func (c *Conn) Write(p []byte) (int, error)

Write performs a deadlined write on the underlying TCP connection

func (*Conn) WriteCommand Uses

func (c *Conn) WriteCommand(cmd *Command) error

WriteCommand is a goroutine safe method to write a Command to this connection, and flush.

type ConnDelegate Uses

type ConnDelegate interface {
    // OnResponse is called when the connection
    // receives a FrameTypeResponse from nsqd
    OnResponse(*Conn, []byte)

    // OnError is called when the connection
    // receives a FrameTypeError from nsqd
    OnError(*Conn, []byte)

    // OnMessage is called when the connection
    // receives a FrameTypeMessage from nsqd
    OnMessage(*Conn, *Message)

    // OnMessageFinished is called when the connection
    // handles a FIN command from a message handler
    OnMessageFinished(*Conn, *Message)

    // OnMessageRequeued is called when the connection
    // handles a REQ command from a message handler
    OnMessageRequeued(*Conn, *Message)

    // OnBackoff is called when the connection triggers a backoff state
    OnBackoff(*Conn)

    // OnContinue is called when the connection finishes a message without adjusting backoff state
    OnContinue(*Conn)

    // OnResume is called when the connection triggers a resume state
    OnResume(*Conn)

    // OnIOError is called when the connection experiences
    // a low-level TCP transport error
    OnIOError(*Conn, error)

    // OnHeartbeat is called when the connection
    // receives a heartbeat from nsqd
    OnHeartbeat(*Conn)

    // OnClose is called when the connection
    // closes, after all cleanup
    OnClose(*Conn)
}

ConnDelegate is an interface of methods that are used as callbacks in Conn

type Consumer Uses

type Consumer struct {

    // read from this channel to block until consumer is cleanly stopped
    StopChan chan int
    // contains filtered or unexported fields
}

Consumer is a high-level type to consume from NSQ.

A Consumer instance is supplied a Handler that will be executed concurrently via goroutines to handle processing the stream of messages consumed from the specified topic/channel. See: Handler/HandlerFunc for details on implementing the interface to create handlers.

If configured, it will poll nsqlookupd instances and handle connection (and reconnection) to any discovered nsqds.

func NewConsumer Uses

func NewConsumer(topic string, channel string, config *Config) (*Consumer, error)

NewConsumer creates a new instance of Consumer for the specified topic/channel

The only valid way to create a Config is via NewConfig, using a struct literal will panic. After Config is passed into NewConsumer the values are no longer mutable (they are copied).

func (*Consumer) AddConcurrentHandlers Uses

func (r *Consumer) AddConcurrentHandlers(handler Handler, concurrency int)

AddConcurrentHandlers sets the Handler for messages received by this Consumer. It takes a second argument which indicates the number of goroutines to spawn for message handling.

This panics if called after connecting to NSQD or NSQ Lookupd

(see Handler or HandlerFunc for details on implementing this interface)

func (*Consumer) AddHandler Uses

func (r *Consumer) AddHandler(handler Handler)

AddHandler sets the Handler for messages received by this Consumer. This can be called multiple times to add additional handlers. Handler will have a 1:1 ratio to message handling goroutines.

This panics if called after connecting to NSQD or NSQ Lookupd

(see Handler or HandlerFunc for details on implementing this interface)

func (*Consumer) ChangeMaxInFlight Uses

func (r *Consumer) ChangeMaxInFlight(maxInFlight int)

ChangeMaxInFlight sets a new maximum number of messages this comsumer instance will allow in-flight, and updates all existing connections as appropriate.

For example, ChangeMaxInFlight(0) would pause message flow

If already connected, it updates the reader RDY state for each connection.

func (*Consumer) ConnectToNSQD Uses

func (r *Consumer) ConnectToNSQD(addr string) error

ConnectToNSQD takes a nsqd address to connect directly to.

It is recommended to use ConnectToNSQLookupd so that topics are discovered automatically. This method is useful when you want to connect to a single, local, instance.

func (*Consumer) ConnectToNSQDs Uses

func (r *Consumer) ConnectToNSQDs(addresses []string) error

ConnectToNSQDs takes multiple nsqd addresses to connect directly to.

It is recommended to use ConnectToNSQLookupd so that topics are discovered automatically. This method is useful when you want to connect to local instance.

func (*Consumer) ConnectToNSQLookupd Uses

func (r *Consumer) ConnectToNSQLookupd(addr string) error

ConnectToNSQLookupd adds an nsqlookupd address to the list for this Consumer instance.

If it is the first to be added, it initiates an HTTP request to discover nsqd producers for the configured topic.

A goroutine is spawned to handle continual polling.

func (*Consumer) ConnectToNSQLookupds Uses

func (r *Consumer) ConnectToNSQLookupds(addresses []string) error

ConnectToNSQLookupds adds multiple nsqlookupd address to the list for this Consumer instance.

If adding the first address it initiates an HTTP request to discover nsqd producers for the configured topic.

A goroutine is spawned to handle continual polling.

func (*Consumer) DisconnectFromNSQD Uses

func (r *Consumer) DisconnectFromNSQD(addr string) error

DisconnectFromNSQD closes the connection to and removes the specified `nsqd` address from the list

func (*Consumer) DisconnectFromNSQLookupd Uses

func (r *Consumer) DisconnectFromNSQLookupd(addr string) error

DisconnectFromNSQLookupd removes the specified `nsqlookupd` address from the list used for periodic discovery.

func (*Consumer) IsStarved Uses

func (r *Consumer) IsStarved() bool

IsStarved indicates whether any connections for this consumer are blocked on processing before being able to receive more messages (ie. RDY count of 0 and not exiting)

func (*Consumer) SetBehaviorDelegate Uses

func (r *Consumer) SetBehaviorDelegate(cb interface{})

SetBehaviorDelegate takes a type implementing one or more of the following interfaces that modify the behavior of the `Consumer`:

DiscoveryFilter

func (*Consumer) SetLogger Uses

func (r *Consumer) SetLogger(l logger, lvl LogLevel)

SetLogger assigns the logger to use as well as a level

The logger parameter is an interface that requires the following method to be implemented (such as the the stdlib log.Logger):

Output(calldepth int, s string)

func (*Consumer) Stats Uses

func (r *Consumer) Stats() *ConsumerStats

Stats retrieves the current connection and message statistics for a Consumer

func (*Consumer) Stop Uses

func (r *Consumer) Stop()

Stop will initiate a graceful stop of the Consumer (permanent)

NOTE: receive on StopChan to block until this process completes

type ConsumerStats Uses

type ConsumerStats struct {
    MessagesReceived uint64
    MessagesFinished uint64
    MessagesRequeued uint64
    Connections      int
}

ConsumerStats represents a snapshot of the state of a Consumer's connections and the messages it has seen

type DiscoveryFilter Uses

type DiscoveryFilter interface {
    Filter([]string) []string
}

DiscoveryFilter is an interface accepted by `SetBehaviorDelegate()` for filtering the nsqds returned from discovery via nsqlookupd

type ErrIdentify Uses

type ErrIdentify struct {
    Reason string
}

ErrIdentify is returned from Conn as part of the IDENTIFY handshake

func (ErrIdentify) Error Uses

func (e ErrIdentify) Error() string

Error returns a stringified error

type ErrProtocol Uses

type ErrProtocol struct {
    Reason string
}

ErrProtocol is returned from Producer when encountering an NSQ protocol level error

func (ErrProtocol) Error Uses

func (e ErrProtocol) Error() string

Error returns a stringified error

type ExponentialStrategy Uses

type ExponentialStrategy struct {
    // contains filtered or unexported fields
}

ExponentialStrategy implements an exponential backoff strategy (default)

func (*ExponentialStrategy) Calculate Uses

func (s *ExponentialStrategy) Calculate(attempt int) time.Duration

Calculate returns a duration of time: 2 ^ attempt

type FailedMessageLogger Uses

type FailedMessageLogger interface {
    LogFailedMessage(message *Message)
}

FailedMessageLogger is an interface that can be implemented by handlers that wish to receive a callback when a message is deemed "failed" (i.e. the number of attempts exceeded the Consumer specified MaxAttemptCount)

type FullJitterStrategy Uses

type FullJitterStrategy struct {
    // contains filtered or unexported fields
}

FullJitterStrategy implements http://www.awsarchitectureblog.com/2015/03/backoff.html

func (*FullJitterStrategy) Calculate Uses

func (s *FullJitterStrategy) Calculate(attempt int) time.Duration

Calculate returns a random duration of time [0, 2 ^ attempt]

type Handler Uses

type Handler interface {
    HandleMessage(message *Message) error
}

Handler is the message processing interface for Consumer

Implement this interface for handlers that return whether or not message processing completed successfully.

When the return value is nil Consumer will automatically handle FINishing.

When the returned value is non-nil Consumer will automatically handle REQueing.

type HandlerFunc Uses

type HandlerFunc func(message *Message) error

HandlerFunc is a convenience type to avoid having to declare a struct to implement the Handler interface, it can be used like this:

consumer.AddHandler(nsq.HandlerFunc(func(m *Message) error {
	// handle the message
}))

func (HandlerFunc) HandleMessage Uses

func (h HandlerFunc) HandleMessage(m *Message) error

HandleMessage implements the Handler interface

type IdentifyResponse Uses

type IdentifyResponse struct {
    MaxRdyCount  int64 `json:"max_rdy_count"`
    TLSv1        bool  `json:"tls_v1"`
    Deflate      bool  `json:"deflate"`
    Snappy       bool  `json:"snappy"`
    AuthRequired bool  `json:"auth_required"`
}

IdentifyResponse represents the metadata returned from an IDENTIFY command to nsqd

type LogLevel Uses

type LogLevel int

LogLevel specifies the severity of a given log message

const (
    LogLevelDebug LogLevel = iota
    LogLevelInfo
    LogLevelWarning
    LogLevelError
)

Log levels

func (LogLevel) String Uses

func (lvl LogLevel) String() string

String returns the string form for a given LogLevel

type Message Uses

type Message struct {
    ID        MessageID
    Body      []byte
    Timestamp int64
    Attempts  uint16

    NSQDAddress string

    Delegate MessageDelegate
    // contains filtered or unexported fields
}

Message is the fundamental data type containing the id, body, and metadata

func DecodeMessage Uses

func DecodeMessage(b []byte) (*Message, error)

DecodeMessage deserializes data (as []byte) and creates a new Message message format:

[x][x][x][x][x][x][x][x][x][x][x][x][x][x][x][x][x][x][x][x][x][x][x][x][x][x][x][x][x][x]...
|       (int64)        ||    ||      (hex string encoded in ASCII)           || (binary)
|       8-byte         ||    ||                 16-byte                      || N-byte
------------------------------------------------------------------------------------------...
  nanosecond timestamp    ^^                   message ID                       message body
                       (uint16)
                        2-byte
                       attempts

func NewMessage Uses

func NewMessage(id MessageID, body []byte) *Message

NewMessage creates a Message, initializes some metadata, and returns a pointer

func (*Message) DisableAutoResponse Uses

func (m *Message) DisableAutoResponse()

DisableAutoResponse disables the automatic response that would normally be sent when a handler.HandleMessage returns (FIN/REQ based on the error value returned).

This is useful if you want to batch, buffer, or asynchronously respond to messages.

func (*Message) Finish Uses

func (m *Message) Finish()

Finish sends a FIN command to the nsqd which sent this message

func (*Message) HasResponded Uses

func (m *Message) HasResponded() bool

HasResponded indicates whether or not this message has been responded to

func (*Message) IsAutoResponseDisabled Uses

func (m *Message) IsAutoResponseDisabled() bool

IsAutoResponseDisabled indicates whether or not this message will be responded to automatically

func (*Message) Requeue Uses

func (m *Message) Requeue(delay time.Duration)

Requeue sends a REQ command to the nsqd which sent this message, using the supplied delay.

A delay of -1 will automatically calculate based on the number of attempts and the configured default_requeue_delay

func (*Message) RequeueWithoutBackoff Uses

func (m *Message) RequeueWithoutBackoff(delay time.Duration)

RequeueWithoutBackoff sends a REQ command to the nsqd which sent this message, using the supplied delay.

Notably, using this method to respond does not trigger a backoff event on the configured Delegate.

func (*Message) Touch Uses

func (m *Message) Touch()

Touch sends a TOUCH command to the nsqd which sent this message

func (*Message) WriteTo Uses

func (m *Message) WriteTo(w io.Writer) (int64, error)

WriteTo implements the WriterTo interface and serializes the message into the supplied producer.

It is suggested that the target Writer is buffered to avoid performing many system calls.

type MessageDelegate Uses

type MessageDelegate interface {
    // OnFinish is called when the Finish() method
    // is triggered on the Message
    OnFinish(*Message)

    // OnRequeue is called when the Requeue() method
    // is triggered on the Message
    OnRequeue(m *Message, delay time.Duration, backoff bool)

    // OnTouch is called when the Touch() method
    // is triggered on the Message
    OnTouch(*Message)
}

MessageDelegate is an interface of methods that are used as callbacks in Message

type MessageID Uses

type MessageID [MsgIDLength]byte

MessageID is the ASCII encoded hexadecimal message ID

type Producer Uses

type Producer struct {
    // contains filtered or unexported fields
}

Producer is a high-level type to publish to NSQ.

A Producer instance is 1:1 with a destination `nsqd` and will lazily connect to that instance (and re-connect) when Publish commands are executed.

func NewProducer Uses

func NewProducer(addr string, config *Config) (*Producer, error)

NewProducer returns an instance of Producer for the specified address

The only valid way to create a Config is via NewConfig, using a struct literal will panic. After Config is passed into NewProducer the values are no longer mutable (they are copied).

func (*Producer) DeferredPublish Uses

func (w *Producer) DeferredPublish(topic string, delay time.Duration, body []byte) error

DeferredPublish synchronously publishes a message body to the specified topic where the message will queue at the channel level until the timeout expires, returning an error if publish failed

func (*Producer) DeferredPublishAsync Uses

func (w *Producer) DeferredPublishAsync(topic string, delay time.Duration, body []byte,
    doneChan chan *ProducerTransaction, args ...interface{}) error

DeferredPublishAsync publishes a message body to the specified topic where the message will queue at the channel level until the timeout expires but does not wait for the response from `nsqd`.

When the Producer eventually receives the response from `nsqd`, the supplied `doneChan` (if specified) will receive a `ProducerTransaction` instance with the supplied variadic arguments and the response error if present

func (*Producer) MultiPublish Uses

func (w *Producer) MultiPublish(topic string, body [][]byte) error

MultiPublish synchronously publishes a slice of message bodies to the specified topic, returning an error if publish failed

func (*Producer) MultiPublishAsync Uses

func (w *Producer) MultiPublishAsync(topic string, body [][]byte, doneChan chan *ProducerTransaction,
    args ...interface{}) error

MultiPublishAsync publishes a slice of message bodies to the specified topic but does not wait for the response from `nsqd`.

When the Producer eventually receives the response from `nsqd`, the supplied `doneChan` (if specified) will receive a `ProducerTransaction` instance with the supplied variadic arguments and the response error if present

func (*Producer) Ping Uses

func (w *Producer) Ping() error

Ping causes the Producer to connect to it's configured nsqd (if not already connected) and send a `Nop` command, returning any error that might occur.

This method can be used to verify that a newly-created Producer instance is configured correctly, rather than relying on the lazy "connect on Publish" behavior of a Producer.

func (*Producer) Publish Uses

func (w *Producer) Publish(topic string, body []byte) error

Publish synchronously publishes a message body to the specified topic, returning an error if publish failed

func (*Producer) PublishAsync Uses

func (w *Producer) PublishAsync(topic string, body []byte, doneChan chan *ProducerTransaction,
    args ...interface{}) error

PublishAsync publishes a message body to the specified topic but does not wait for the response from `nsqd`.

When the Producer eventually receives the response from `nsqd`, the supplied `doneChan` (if specified) will receive a `ProducerTransaction` instance with the supplied variadic arguments and the response error if present

func (*Producer) SetLogger Uses

func (w *Producer) SetLogger(l logger, lvl LogLevel)

SetLogger assigns the logger to use as well as a level

The logger parameter is an interface that requires the following method to be implemented (such as the the stdlib log.Logger):

Output(calldepth int, s string)

func (*Producer) Stop Uses

func (w *Producer) Stop()

Stop initiates a graceful stop of the Producer (permanent)

NOTE: this blocks until completion

func (*Producer) String Uses

func (w *Producer) String() string

String returns the address of the Producer

type ProducerTransaction Uses

type ProducerTransaction struct {
    Error error         // the error (or nil) of the publish command
    Args  []interface{} // the slice of variadic arguments passed to PublishAsync or MultiPublishAsync
    // contains filtered or unexported fields
}

ProducerTransaction is returned by the async publish methods to retrieve metadata about the command after the response is received.

Package nsq imports 27 packages (graph) and is imported by 359 packages. Updated 2019-07-15. Refresh now. Tools for package owners.