go-nats: github.com/nats-io/go-nats

package nats

import "github.com/nats-io/go-nats"

A Go client for the NATS messaging system (https://nats.io).

Package Files

context.go enc.go nats.go netchan.go parser.go timer.go

Constants

const (
    JSON_ENCODER    = "json"
    GOB_ENCODER     = "gob"
    DEFAULT_ENCODER = "default"
)

Indexed names into the registered encoders.

const (
    Version                 = "1.7.2"
    DefaultURL              = "nats://127.0.0.1:4222"
    DefaultPort             = 4222
    DefaultMaxReconnect     = 60
    DefaultReconnectWait    = 2 * time.Second
    DefaultTimeout          = 2 * time.Second
    DefaultPingInterval     = 2 * time.Minute
    DefaultMaxPingOut       = 2
    DefaultMaxChanLen       = 8192            // 8k
    DefaultReconnectBufSize = 8 * 1024 * 1024 // 8MB
    RequestChanLen          = 8
    DefaultDrainTimeout     = 30 * time.Second
    LangString              = "go"
)

Default Constants

const (
    // STALE_CONNECTION is for detection and proper handling of stale connections.
    STALE_CONNECTION = "stale connection"

    // PERMISSIONS_ERR is for when nats server subject authorization has failed.
    PERMISSIONS_ERR = "permissions violation"

    // AUTHORIZATION_ERR is for when nats server user authorization has failed.
    AUTHORIZATION_ERR = "authorization violation"
)
const (
    DISCONNECTED = Status(iota)
    CONNECTED
    CLOSED
    RECONNECTING
    CONNECTING
    DRAINING_SUBS
    DRAINING_PUBS
)
const (
    AsyncSubscription = SubscriptionType(iota)
    SyncSubscription
    ChanSubscription
    NilSubscription
)

The different types of subscriptions.

const (
    DefaultSubPendingMsgsLimit  = 65536
    DefaultSubPendingBytesLimit = 65536 * 1024
)

Pending Limits

const (
    OP_START = iota
    OP_PLUS
    OP_PLUS_O
    OP_PLUS_OK
    OP_MINUS
    OP_MINUS_E
    OP_MINUS_ER
    OP_MINUS_ERR
    OP_MINUS_ERR_SPC
    MINUS_ERR_ARG
    OP_M
    OP_MS
    OP_MSG
    OP_MSG_SPC
    MSG_ARG
    MSG_PAYLOAD
    MSG_END
    OP_P
    OP_PI
    OP_PIN
    OP_PING
    OP_PO
    OP_PON
    OP_PONG
    OP_I
    OP_IN
    OP_INF
    OP_INFO
    OP_INFO_SPC
    INFO_ARG
)
const (
    InboxPrefix = "_INBOX."
)

InboxPrefix is the prefix for all inbox subjects.

const MAX_CONTROL_LINE_SIZE = 1024

Variables

var (
    ErrConnectionClosed       = errors.New("nats: connection closed")
    ErrConnectionDraining     = errors.New("nats: connection draining")
    ErrDrainTimeout           = errors.New("nats: draining connection timed out")
    ErrConnectionReconnecting = errors.New("nats: connection reconnecting")
    ErrSecureConnRequired     = errors.New("nats: secure connection required")
    ErrSecureConnWanted       = errors.New("nats: secure connection not available")
    ErrBadSubscription        = errors.New("nats: invalid subscription")
    ErrTypeSubscription       = errors.New("nats: invalid subscription type")
    ErrBadSubject             = errors.New("nats: invalid subject")
    ErrSlowConsumer           = errors.New("nats: slow consumer, messages dropped")
    ErrTimeout                = errors.New("nats: timeout")
    ErrBadTimeout             = errors.New("nats: timeout invalid")
    ErrAuthorization          = errors.New("nats: authorization violation")
    ErrNoServers              = errors.New("nats: no servers available for connection")
    ErrJsonParse              = errors.New("nats: connect message, json parse error")
    ErrChanArg                = errors.New("nats: argument needs to be a channel type")
    ErrMaxPayload             = errors.New("nats: maximum payload exceeded")
    ErrMaxMessages            = errors.New("nats: maximum messages delivered")
    ErrSyncSubRequired        = errors.New("nats: illegal call on an async subscription")
    ErrMultipleTLSConfigs     = errors.New("nats: multiple tls.Configs not allowed")
    ErrNoInfoReceived         = errors.New("nats: protocol exception, INFO not received")
    ErrReconnectBufExceeded   = errors.New("nats: outbound buffer limit exceeded")
    ErrInvalidConnection      = errors.New("nats: invalid connection")
    ErrInvalidMsg             = errors.New("nats: invalid message or message nil")
    ErrInvalidArg             = errors.New("nats: invalid argument")
    ErrInvalidContext         = errors.New("nats: invalid context")
    ErrNoDeadlineContext      = errors.New("nats: context requires a deadline")
    ErrNoEchoNotSupported     = errors.New("nats: no echo option not supported by this server")
    ErrClientIDNotSupported   = errors.New("nats: client ID not supported by this server")
    ErrUserButNoSigCB         = errors.New("nats: user callback defined without a signature handler")
    ErrNkeyButNoSigCB         = errors.New("nats: nkey defined without a signature handler")
    ErrNoUserCB               = errors.New("nats: user callback not defined")
    ErrNkeyAndUser            = errors.New("nats: user callback and nkey defined")
    ErrNkeysNotSupported      = errors.New("nats: nkeys not supported by the server")
    ErrStaleConnection        = errors.New("nats: " + STALE_CONNECTION)
    ErrTokenAlreadySet        = errors.New("nats: token and token handler both set")
)

Errors

var DefaultOptions = GetDefaultOptions()

DEPRECATED: Use GetDefaultOptions() instead. DefaultOptions is not safe for use by multiple clients. For details see #308.

func NewInbox

func NewInbox() string

NewInbox will return an inbox string which can be used for directed replies from subscribers. These are guaranteed to be unique, but can be shared and subscribed to by others.

func RegisterEncoder

func RegisterEncoder(encType string, enc Encoder)

RegisterEncoder will register the encType with the given Encoder. Useful for customization.

type AuthTokenHandler

type AuthTokenHandler func() string

AuthTokenHandler is used to generate a new token.

type Conn

type Conn struct {
    // Keep all members for which we use atomic at the beginning of the
    // struct and make sure they are all 64bits (or use padding if necessary).
    // atomic.* functions crash on 32bit machines if operand is not aligned
    // at 64bit. See https://github.com/golang/go/issues/599
    Statistics

    // Opts holds the configuration of the Conn.
    // Modifying the configuration of a running Conn is a race.
    Opts Options
    // contains filtered or unexported fields
}

A Conn represents a bare connection to a nats-server. It can send and receive []byte payloads.

func Connect

func Connect(url string, options ...Option) (*Conn, error)

Connect will attempt to connect to the NATS system. The url can contain username and password credentials, e.g. nats://derek:pass@localhost:4222. Comma-separated lists of URLs are also supported, e.g. urlA, urlB. Options start with the defaults but can be overridden.

Shows different ways to create a Conn

Code:


nc, _ := nats.Connect(nats.DefaultURL)
nc.Close()

nc, _ = nats.Connect("nats://derek:secretpassword@demo.nats.io:4222")
nc.Close()

nc, _ = nats.Connect("tls://derek:secretpassword@demo.nats.io:4443")
nc.Close()

opts := nats.Options{
    AllowReconnect: true,
    MaxReconnect:   10,
    ReconnectWait:  5 * time.Second,
    Timeout:        1 * time.Second,
}

nc, _ = opts.Connect()
nc.Close()
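The URL forms accepted by Connect (embedded credentials and comma-separated lists) can be illustrated with the standard library alone. This is a minimal sketch, not the client's own parser; serverInfo and splitServers are hypothetical names introduced for the example.

```go
package main

import (
	"fmt"
	"net/url"
	"strings"
)

// serverInfo is a hypothetical helper type, not part of the nats package.
type serverInfo struct {
	Host     string
	User     string
	Password string
}

// splitServers breaks a comma-separated URL list into its entries and
// extracts any credentials embedded in each one.
func splitServers(list string) ([]serverInfo, error) {
	var out []serverInfo
	for _, raw := range strings.Split(list, ",") {
		u, err := url.Parse(strings.TrimSpace(raw))
		if err != nil {
			return nil, err
		}
		info := serverInfo{Host: u.Host}
		if u.User != nil {
			info.User = u.User.Username()
			info.Password, _ = u.User.Password()
		}
		out = append(out, info)
	}
	return out, nil
}

func main() {
	servers, err := splitServers("nats://derek:pass@localhost:4222, nats://localhost:4223")
	if err != nil {
		fmt.Println("parse error:", err)
		return
	}
	for _, s := range servers {
		fmt.Printf("host=%s user=%q\n", s.Host, s.User)
	}
}
```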

func (*Conn) AuthRequired

func (nc *Conn) AuthRequired() bool

AuthRequired reports whether the connected server requires authorization.

func (*Conn) Barrier

func (nc *Conn) Barrier(f func()) error

Barrier schedules the given function `f` to all registered asynchronous subscriptions. Only the last subscription to see this barrier will invoke the function. If no subscription is registered at the time of this call, `f()` is invoked right away. ErrConnectionClosed is returned if the connection is closed prior to the call.
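The "last subscription to see the barrier invokes the function" rule can be modeled with an atomic countdown. The sketch below is an assumption about one way to obtain that behavior, not the library's implementation; the barrier type and its methods are hypothetical.

```go
package main

import (
	"fmt"
	"sync/atomic"
)

// barrier is a hypothetical illustration type, not part of the nats package.
type barrier struct {
	refs int64  // subscriptions that have not yet reached the barrier
	f    func() // invoked by whichever subscription reaches it last
}

// newBarrier arms a barrier for n subscriptions. With no subscriptions
// there is nothing to wait for, so f runs immediately.
func newBarrier(n int, f func()) *barrier {
	b := &barrier{refs: int64(n), f: f}
	if n == 0 {
		f()
	}
	return b
}

// reached is called once by each subscription when it dequeues the barrier.
func (b *barrier) reached() {
	if atomic.AddInt64(&b.refs, -1) == 0 {
		b.f()
	}
}

func main() {
	calls := 0
	b := newBarrier(3, func() { calls++ })
	for i := 0; i < 3; i++ {
		b.reached()
		fmt.Printf("after subscription %d: calls=%d\n", i+1, calls)
	}
}
```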

func (*Conn) Buffered

func (nc *Conn) Buffered() (int, error)

Buffered will return the number of bytes buffered to be sent to the server. FIXME(dlc) take into account disconnected state.

func (*Conn) ChanQueueSubscribe

func (nc *Conn) ChanQueueSubscribe(subj, group string, ch chan *Msg) (*Subscription, error)

ChanQueueSubscribe will express interest in the given subject. All subscribers with the same queue name will form the queue group and only one member of the group will be selected to receive any given message, which will be placed on the channel. You should not close the channel until sub.Unsubscribe() has been called. Note: This is the same as QueueSubscribeSyncWithChan.

func (*Conn) ChanSubscribe

func (nc *Conn) ChanSubscribe(subj string, ch chan *Msg) (*Subscription, error)

ChanSubscribe will express interest in the given subject and place all messages received on the channel. You should not close the channel until sub.Unsubscribe() has been called.

func (*Conn) Close

func (nc *Conn) Close()

Close will close the connection to the server. This call will release all blocking calls, such as Flush() and NextMsg().

Code:

nc, _ := nats.Connect(nats.DefaultURL)
nc.Close()

func (*Conn) ConnectedAddr

func (nc *Conn) ConnectedAddr() string

ConnectedAddr returns the connected server's IP.

func (*Conn) ConnectedServerId

func (nc *Conn) ConnectedServerId() string

ConnectedServerId reports the connected server's ID.

func (*Conn) ConnectedUrl

func (nc *Conn) ConnectedUrl() string

ConnectedUrl reports the connected server's URL.

func (*Conn) DiscoveredServers

func (nc *Conn) DiscoveredServers() []string

DiscoveredServers returns only the server urls that have been discovered after a connection has been established. If authentication is enabled, use UserInfo or Token when connecting with these urls.

func (*Conn) Drain

func (nc *Conn) Drain() error

Drain will put a connection into a drain state. All subscriptions will immediately be put into a drain state. Upon completion, the publishers will be drained and can not publish any additional messages. Upon draining of the publishers, the connection will be closed. Use the ClosedHandler option to know when the connection has moved from draining to closed.

func (*Conn) Flush

func (nc *Conn) Flush() error

Flush will perform a round trip to the server and return when it receives the internal reply.

Code:

nc, _ := nats.Connect(nats.DefaultURL)
defer nc.Close()

msg := &nats.Msg{Subject: "foo", Reply: "bar", Data: []byte("Hello World!")}
for i := 0; i < 1000; i++ {
    nc.PublishMsg(msg)
}
err := nc.Flush()
if err == nil {
    // Everything has been processed by the server for nc *Conn.
}

func (*Conn) FlushTimeout

func (nc *Conn) FlushTimeout(timeout time.Duration) (err error)

FlushTimeout allows a Flush operation to have an associated timeout.

Code:

nc, _ := nats.Connect(nats.DefaultURL)
defer nc.Close()

msg := &nats.Msg{Subject: "foo", Reply: "bar", Data: []byte("Hello World!")}
for i := 0; i < 1000; i++ {
    nc.PublishMsg(msg)
}
// Only wait for up to 1 second for Flush
err := nc.FlushTimeout(1 * time.Second)
if err == nil {
    // Everything has been processed by the server for nc *Conn.
}

func (*Conn) FlushWithContext

func (nc *Conn) FlushWithContext(ctx context.Context) error

FlushWithContext will allow a context to control the duration of a Flush() call. This context should be non-nil and should have a deadline set. We will return an error if none is present.

func (*Conn) GetClientID

func (nc *Conn) GetClientID() (uint64, error)

GetClientID returns the client ID assigned by the server to which the client is currently connected. Note that the value may change if the client reconnects. This function returns ErrClientIDNotSupported if the server is of a version prior to 1.2.0.

func (*Conn) IsClosed

func (nc *Conn) IsClosed() bool

IsClosed tests if a Conn has been closed.

func (*Conn) IsConnected

func (nc *Conn) IsConnected() bool

IsConnected tests if a Conn is connected.

func (*Conn) IsDraining

func (nc *Conn) IsDraining() bool

IsDraining tests if a Conn is in the draining state.

func (*Conn) IsReconnecting

func (nc *Conn) IsReconnecting() bool

IsReconnecting tests if a Conn is reconnecting.

func (*Conn) LastError

func (nc *Conn) LastError() error

LastError reports the last error encountered via the connection. It can be used reliably within ClosedCB, for example, to find out why the connection was closed.

func (*Conn) MaxPayload

func (nc *Conn) MaxPayload() int64

MaxPayload returns the size limit that a message payload can have. This is set by the server configuration and delivered to the client upon connect.

func (*Conn) NewRespInbox

func (nc *Conn) NewRespInbox() string

NewRespInbox is the new format used for _INBOX.

func (*Conn) NumSubscriptions

func (nc *Conn) NumSubscriptions() int

NumSubscriptions returns the number of active subscriptions.

func (*Conn) Publish

func (nc *Conn) Publish(subj string, data []byte) error

Publish publishes the data argument to the given subject. The data argument is left untouched and needs to be correctly interpreted on the receiver.

Code:

nc, _ := nats.Connect(nats.DefaultURL)
defer nc.Close()

nc.Publish("foo", []byte("Hello World!"))

func (*Conn) PublishMsg

func (nc *Conn) PublishMsg(m *Msg) error

PublishMsg publishes the Msg structure, which includes the Subject, an optional Reply and an optional Data field.

Code:

nc, _ := nats.Connect(nats.DefaultURL)
defer nc.Close()

msg := &nats.Msg{Subject: "foo", Reply: "bar", Data: []byte("Hello World!")}
nc.PublishMsg(msg)

func (*Conn) PublishRequest

func (nc *Conn) PublishRequest(subj, reply string, data []byte) error

PublishRequest will perform a Publish() expecting a response on the reply subject. Use Request() for automatically waiting for a response inline.

func (*Conn) QueueSubscribe

func (nc *Conn) QueueSubscribe(subj, queue string, cb MsgHandler) (*Subscription, error)

QueueSubscribe creates an asynchronous queue subscriber on the given subject. All subscribers with the same queue name will form the queue group and only one member of the group will be selected to receive any given message asynchronously.

Code:

nc, _ := nats.Connect(nats.DefaultURL)
defer nc.Close()

received := 0

nc.QueueSubscribe("foo", "worker_group", func(_ *nats.Msg) {
    received++
})
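The queue group rule (each message goes to exactly one member) can be modeled in-process with goroutines sharing a single channel. This is only an analogy for the delivery semantics described above; in NATS the server selects the member, and runQueueGroup is a hypothetical name.

```go
package main

import (
	"fmt"
	"sync"
)

// runQueueGroup is a hypothetical simulation: the members share one channel,
// so each message is received by exactly one of them. It expects at least
// one member.
func runQueueGroup(members, messages int) []int {
	ch := make(chan int)
	counts := make([]int, members)
	var wg sync.WaitGroup
	for m := 0; m < members; m++ {
		wg.Add(1)
		go func(id int) {
			defer wg.Done()
			for range ch {
				counts[id]++ // each member only touches its own slot
			}
		}(m)
	}
	for i := 0; i < messages; i++ {
		ch <- i
	}
	close(ch)
	wg.Wait()
	return counts
}

func main() {
	counts := runQueueGroup(3, 100)
	total := 0
	for _, c := range counts {
		total += c
	}
	// The split between members varies from run to run; the total does not.
	fmt.Println("per member:", counts, "total:", total)
}
```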

func (*Conn) QueueSubscribeSync

func (nc *Conn) QueueSubscribeSync(subj, queue string) (*Subscription, error)

QueueSubscribeSync creates a synchronous queue subscriber on the given subject. All subscribers with the same queue name will form the queue group and only one member of the group will be selected to receive any given message synchronously using Subscription.NextMsg().

func (*Conn) QueueSubscribeSyncWithChan

func (nc *Conn) QueueSubscribeSyncWithChan(subj, queue string, ch chan *Msg) (*Subscription, error)

QueueSubscribeSyncWithChan will express interest in the given subject. All subscribers with the same queue name will form the queue group and only one member of the group will be selected to receive any given message, which will be placed on the channel. You should not close the channel until sub.Unsubscribe() has been called. Note: This is the same as ChanQueueSubscribe.

func (*Conn) Request

func (nc *Conn) Request(subj string, data []byte, timeout time.Duration) (*Msg, error)

Request will send a request payload and deliver the response message, or an error, including a timeout if no message was received properly.

Code:

nc, _ := nats.Connect(nats.DefaultURL)
defer nc.Close()

nc.Subscribe("foo", func(m *nats.Msg) {
    nc.Publish(m.Reply, []byte("I will help you"))
})
nc.Request("foo", []byte("help"), 50*time.Millisecond)

func (*Conn) RequestWithContext

func (nc *Conn) RequestWithContext(ctx context.Context, subj string, data []byte) (*Msg, error)

RequestWithContext takes a context, a subject and a payload in bytes, and sends a request expecting a single response.

func (*Conn) Servers

func (nc *Conn) Servers() []string

Servers returns the list of known server urls, including additional servers discovered after a connection has been established. If authentication is enabled, use UserInfo or Token when connecting with these urls.

func (*Conn) SetClosedHandler

func (nc *Conn) SetClosedHandler(cb ConnHandler)

SetClosedHandler will set the closed event handler.

func (*Conn) SetDisconnectHandler

func (nc *Conn) SetDisconnectHandler(dcb ConnHandler)

SetDisconnectHandler will set the disconnect event handler.

func (*Conn) SetDiscoveredServersHandler

func (nc *Conn) SetDiscoveredServersHandler(dscb ConnHandler)

SetDiscoveredServersHandler will set the discovered servers handler.

func (*Conn) SetErrorHandler

func (nc *Conn) SetErrorHandler(cb ErrHandler)

SetErrorHandler will set the async error handler.

func (*Conn) SetReconnectHandler

func (nc *Conn) SetReconnectHandler(rcb ConnHandler)

SetReconnectHandler will set the reconnect event handler.

func (*Conn) Stats

func (nc *Conn) Stats() Statistics

Stats will return a race safe copy of the Statistics section for the connection.

func (*Conn) Status

func (nc *Conn) Status() Status

Status returns the current state of the connection.

func (*Conn) Subscribe

func (nc *Conn) Subscribe(subj string, cb MsgHandler) (*Subscription, error)

Subscribe will express interest in the given subject. The subject can have wildcards (partial:*, full:>). Messages will be delivered to the associated MsgHandler.

This Example shows an asynchronous subscriber.

Code:

nc, _ := nats.Connect(nats.DefaultURL)
defer nc.Close()

nc.Subscribe("foo", func(m *nats.Msg) {
    fmt.Printf("Received a message: %s\n", string(m.Data))
})
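The two wildcard forms can be illustrated with a small matcher over dot-separated tokens. The sketch assumes the usual NATS semantics, where `*` stands for exactly one token and `>` stands for one or more trailing tokens; subjectMatches is a hypothetical helper, since matching is performed by the server and not by this package.

```go
package main

import (
	"fmt"
	"strings"
)

// subjectMatches reports whether subject matches pattern, where pattern may
// contain the partial wildcard "*" and the full wildcard ">".
func subjectMatches(pattern, subject string) bool {
	p := strings.Split(pattern, ".")
	s := strings.Split(subject, ".")
	for i, tok := range p {
		if tok == ">" {
			// The full wildcard must be the last pattern token and must
			// cover at least one subject token.
			return i == len(p)-1 && len(s) > i
		}
		if i >= len(s) {
			return false
		}
		if tok != "*" && tok != s[i] {
			return false
		}
	}
	return len(p) == len(s)
}

func main() {
	for _, c := range []struct{ pattern, subject string }{
		{"foo.*", "foo.bar"},
		{"foo.*", "foo.bar.baz"},
		{"foo.>", "foo.bar.baz"},
		{"foo.>", "foo"},
	} {
		fmt.Printf("%-6s vs %-12s -> %v\n", c.pattern, c.subject, subjectMatches(c.pattern, c.subject))
	}
}
```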

func (*Conn) SubscribeSync

func (nc *Conn) SubscribeSync(subj string) (*Subscription, error)

SubscribeSync will express interest in the given subject. Messages will be received synchronously using Subscription.NextMsg().

This Example shows a synchronous subscriber.

Code:

nc, _ := nats.Connect(nats.DefaultURL)
defer nc.Close()

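sub, _ := nc.SubscribeSync("foo")
// Wait up to one second for the next message.
m, err := sub.NextMsg(1 * time.Second)
if err == nil {
    fmt.Printf("Received a message: %s\n", string(m.Data))
} else {
    // On timeout err is nats.ErrTimeout; other errors are also possible.
    fmt.Printf("NextMsg failed: %v\n", err)
}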

func (*Conn) TLSRequired

func (nc *Conn) TLSRequired() bool

TLSRequired reports whether the connected server requires TLS connections.

type ConnHandler

type ConnHandler func(*Conn)

ConnHandler is used for asynchronous events such as disconnected and closed connections.

type CustomDialer

type CustomDialer interface {
    Dial(network, address string) (net.Conn, error)
}

CustomDialer can be used to specify any dialer, not necessarily a *net.Dialer.

type EncodedConn

type EncodedConn struct {
    Conn *Conn
    Enc  Encoder
}

EncodedConns are the preferred way to interface with NATS. They wrap a bare connection to a nats-server and have an extendable encoder system that will encode and decode messages from raw Go types.

func NewEncodedConn

func NewEncodedConn(c *Conn, encType string) (*EncodedConn, error)

NewEncodedConn will wrap an existing Connection and utilize the appropriate registered encoder.

Shows how to wrap a Conn into an EncodedConn

Code:

nc, _ := nats.Connect(nats.DefaultURL)
c, _ := nats.NewEncodedConn(nc, "json")
c.Close()

func (*EncodedConn) BindRecvChan

func (c *EncodedConn) BindRecvChan(subject string, channel interface{}) (*Subscription, error)

BindRecvChan binds a channel for receive operations from NATS.

BindRecvChan() allows binding of a Go channel to a nats subject for subscribe operations. The Encoder attached to the EncodedConn will be used for un-marshaling.

Code:

nc, _ := nats.Connect(nats.DefaultURL)
c, _ := nats.NewEncodedConn(nc, "json")
defer c.Close()

type person struct {
    Name    string
    Address string
    Age     int
}

ch := make(chan *person)
c.BindRecvChan("hello", ch)

me := &person{Name: "derek", Age: 22, Address: "85 Second St"}
c.Publish("hello", me)

// Receive the publish directly on a channel
who := <-ch

fmt.Printf("%v says hello!\n", who)

func (*EncodedConn) BindRecvQueueChan

func (c *EncodedConn) BindRecvQueueChan(subject, queue string, channel interface{}) (*Subscription, error)

BindRecvQueueChan binds a channel for queue-based receive operations from NATS.

func (*EncodedConn) BindSendChan

func (c *EncodedConn) BindSendChan(subject string, channel interface{}) error

BindSendChan binds a channel for send operations to NATS.

BindSendChan() allows binding of a Go channel to a nats subject for publish operations. The Encoder attached to the EncodedConn will be used for marshaling.

Code:

nc, _ := nats.Connect(nats.DefaultURL)
c, _ := nats.NewEncodedConn(nc, "json")
defer c.Close()

type person struct {
    Name    string
    Address string
    Age     int
}

ch := make(chan *person)
c.BindSendChan("hello", ch)

me := &person{Name: "derek", Age: 22, Address: "85 Second St"}
ch <- me

func (*EncodedConn) Close

func (c *EncodedConn) Close()

Close will close the connection to the server. This call will release all blocking calls, such as Flush(), etc.

func (*EncodedConn) Drain

func (c *EncodedConn) Drain() error

Drain will put a connection into a drain state. All subscriptions will immediately be put into a drain state. Upon completion, the publishers will be drained and can not publish any additional messages. Upon draining of the publishers, the connection will be closed. Use the ClosedHandler option to know when the connection has moved from draining to closed.

func (*EncodedConn) Flush

func (c *EncodedConn) Flush() error

Flush will perform a round trip to the server and return when it receives the internal reply.

func (*EncodedConn) FlushTimeout

func (c *EncodedConn) FlushTimeout(timeout time.Duration) (err error)

FlushTimeout allows a Flush operation to have an associated timeout.

func (*EncodedConn) LastError

func (c *EncodedConn) LastError() error

LastError reports the last error encountered via the Connection.

func (*EncodedConn) Publish

func (c *EncodedConn) Publish(subject string, v interface{}) error

Publish publishes the data argument to the given subject. The data argument will be encoded using the associated encoder.

EncodedConn can publish virtually anything just by passing it in. The encoder will be used to properly encode the raw Go type.

Code:

nc, _ := nats.Connect(nats.DefaultURL)
c, _ := nats.NewEncodedConn(nc, "json")
defer c.Close()

type person struct {
    Name    string
    Address string
    Age     int
}

me := &person{Name: "derek", Age: 22, Address: "85 Second St"}
c.Publish("hello", me)

func (*EncodedConn) PublishRequest

func (c *EncodedConn) PublishRequest(subject, reply string, v interface{}) error

PublishRequest will perform a Publish() expecting a response on the reply subject. Use Request() for automatically waiting for a response inline.

func (*EncodedConn) QueueSubscribe

func (c *EncodedConn) QueueSubscribe(subject, queue string, cb Handler) (*Subscription, error)

QueueSubscribe will create a queue subscription on the given subject and process incoming messages using the specified Handler. The Handler should be a func that matches one of the signatures listed in the description of the Handler type.

func (*EncodedConn) Request

func (c *EncodedConn) Request(subject string, v interface{}, vPtr interface{}, timeout time.Duration) error

Request will create an Inbox and perform a Request() call with the Inbox reply for the data v. A response will be decoded into vPtr.

func (*EncodedConn) RequestWithContext

func (c *EncodedConn) RequestWithContext(ctx context.Context, subject string, v interface{}, vPtr interface{}) error

RequestWithContext will create an Inbox and perform a Request using the provided cancellation context with the Inbox reply for the data v. A response will be decoded into vPtr.

func (*EncodedConn) Subscribe

func (c *EncodedConn) Subscribe(subject string, cb Handler) (*Subscription, error)

Subscribe will create a subscription on the given subject and process incoming messages using the specified Handler. The Handler should be a func that matches one of the signatures listed in the description of the Handler type.

EncodedConn's subscribers will automatically decode the wire data into the requested Go type using the Decode() method of the registered Encoder. The callback signature can also vary to include additional data, such as subject and reply subjects.

Code:

nc, _ := nats.Connect(nats.DefaultURL)
c, _ := nats.NewEncodedConn(nc, "json")
defer c.Close()

type person struct {
    Name    string
    Address string
    Age     int
}

c.Subscribe("hello", func(p *person) {
    fmt.Printf("Received a person! %+v\n", p)
})

c.Subscribe("hello", func(subj, reply string, p *person) {
    fmt.Printf("Received a person on subject %s! %+v\n", subj, p)
})

me := &person{Name: "derek", Age: 22, Address: "85 Second St"}
c.Publish("hello", me)

type Encoder

type Encoder interface {
    Encode(subject string, v interface{}) ([]byte, error)
    Decode(subject string, data []byte, vPtr interface{}) error
}

Encoder is the interface implemented by all registered encoders.

func EncoderForType

func EncoderForType(encType string) Encoder

EncoderForType will return the registered Encoder for the encType.
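To make the pairing of RegisterEncoder and EncoderForType concrete, here is a self-contained sketch of a custom encoder and a name-to-encoder registry. The encoder interface is a local copy of the documented one; textEncoder, registerEncoder and encoderForType are hypothetical stand-ins, and the mutex-guarded map is an assumption about how such a registry could be built.

```go
package main

import (
	"errors"
	"fmt"
	"sync"
)

// encoder mirrors the documented Encoder interface.
type encoder interface {
	Encode(subject string, v interface{}) ([]byte, error)
	Decode(subject string, data []byte, vPtr interface{}) error
}

// textEncoder is a hypothetical encoder that only handles strings.
type textEncoder struct{}

func (textEncoder) Encode(subject string, v interface{}) ([]byte, error) {
	s, ok := v.(string)
	if !ok {
		return nil, errors.New("textEncoder: value is not a string")
	}
	return []byte(s), nil
}

func (textEncoder) Decode(subject string, data []byte, vPtr interface{}) error {
	p, ok := vPtr.(*string)
	if !ok {
		return errors.New("textEncoder: target is not a *string")
	}
	*p = string(data)
	return nil
}

// registry is a hypothetical, mutex-guarded map from encoder name to encoder.
var (
	regMu    sync.Mutex
	registry = map[string]encoder{}
)

func registerEncoder(encType string, enc encoder) {
	regMu.Lock()
	defer regMu.Unlock()
	registry[encType] = enc
}

// encoderForType returns nil when no encoder is registered under encType.
func encoderForType(encType string) encoder {
	regMu.Lock()
	defer regMu.Unlock()
	return registry[encType]
}

func main() {
	registerEncoder("text", textEncoder{})
	enc := encoderForType("text")
	data, _ := enc.Encode("greetings", "hello")
	var out string
	if err := enc.Decode("greetings", data, &out); err != nil {
		fmt.Println("decode failed:", err)
		return
	}
	fmt.Println(out)
}
```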

type ErrHandler

type ErrHandler func(*Conn, *Subscription, error)

ErrHandler is used to process asynchronous errors encountered while processing inbound messages.

type Handler

type Handler interface{}

Handler is a specific callback used for Subscribe. It is generalized to an interface{}, but we will discover its format and arguments at runtime and perform the correct callback, including de-marshaling JSON strings back into the appropriate struct based on the signature of the Handler.

Handlers are expected to have one of four signatures.

type person struct {
	Name string `json:"name,omitempty"`
	Age  uint   `json:"age,omitempty"`
}

handler := func(m *Msg)
handler := func(p *person)
handler := func(subject string, p *person)
handler := func(subject, reply string, p *person)

These forms allow a callback to request a raw Msg pointer, in which case the message from the wire is passed through untouched, or to have a JSON representation unmarshaled into the given struct, e.g. person. There are also variants where the callback wants either the subject, or the subject and the reply subject.
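Discovering a handler's shape at runtime can be sketched with the reflect package. The dispatch function below is a hypothetical illustration that covers the three decoded forms (payload only, subject plus payload, subject plus reply plus payload) and assumes JSON encoding; the raw *Msg form is left out, and the library's actual dispatch code may differ.

```go
package main

import (
	"encoding/json"
	"errors"
	"fmt"
	"reflect"
)

type person struct {
	Name string `json:"name,omitempty"`
	Age  uint   `json:"age,omitempty"`
}

var stringType = reflect.TypeOf("")

// dispatch is a hypothetical helper: it inspects handler with reflection,
// decodes data into a fresh value of the handler's last argument type, and
// calls the handler with the leading string arguments its signature asks for.
func dispatch(handler interface{}, subject, reply string, data []byte) error {
	hv := reflect.ValueOf(handler)
	if hv.Kind() != reflect.Func {
		return errors.New("handler must be a func")
	}
	ht := hv.Type()
	n := ht.NumIn()
	if n < 1 || n > 3 {
		return errors.New("handler must take one to three arguments")
	}
	for i := 0; i < n-1; i++ {
		if ht.In(i) != stringType {
			return errors.New("leading handler arguments must be strings")
		}
	}
	argType := ht.In(n - 1)
	if argType.Kind() != reflect.Ptr {
		return errors.New("payload argument must be a pointer")
	}
	// Allocate a new value of the pointed-to type and decode into it.
	arg := reflect.New(argType.Elem())
	if err := json.Unmarshal(data, arg.Interface()); err != nil {
		return err
	}
	in := make([]reflect.Value, 0, n)
	if n >= 2 {
		in = append(in, reflect.ValueOf(subject))
	}
	if n == 3 {
		in = append(in, reflect.ValueOf(reply))
	}
	in = append(in, arg)
	hv.Call(in)
	return nil
}

func main() {
	data := []byte(`{"name":"derek","age":22}`)
	handlers := []interface{}{
		func(p *person) { fmt.Printf("payload only: %+v\n", *p) },
		func(subject string, p *person) { fmt.Printf("%s: %+v\n", subject, *p) },
		func(subject, reply string, p *person) { fmt.Printf("%s (reply %q): %+v\n", subject, reply, *p) },
	}
	for _, h := range handlers {
		if err := dispatch(h, "hello", "_INBOX.example", data); err != nil {
			fmt.Println("dispatch failed:", err)
		}
	}
}
```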

type Msg

type Msg struct {
    Subject string
    Reply   string
    Data    []byte
    Sub     *Subscription
    // contains filtered or unexported fields
}

Msg is a structure used by Subscribers and PublishMsg().

type MsgHandler

type MsgHandler func(msg *Msg)

MsgHandler is a callback function that processes messages delivered to asynchronous subscribers.

type Option

type Option func(*Options) error

Option is a function on the options for a connection.

func ClientCert

func ClientCert(certFile, keyFile string) Option

ClientCert is a helper option to provide the client certificate from a file. If Secure is not already set this will set it as well.

func ClosedHandler

func ClosedHandler(cb ConnHandler) Option

ClosedHandler is an Option to set the closed handler.

func Dialer

func Dialer(dialer *net.Dialer) Option

Dialer is an Option to set the dialer which will be used when attempting to establish a connection. DEPRECATED: Should use CustomDialer instead.

func DisconnectHandler

func DisconnectHandler(cb ConnHandler) Option

DisconnectHandler is an Option to set the disconnected handler.

func DiscoveredServersHandler

func DiscoveredServersHandler(cb ConnHandler) Option

DiscoveredServersHandler is an Option to set the new servers handler.

func DontRandomize

func DontRandomize() Option

DontRandomize is an Option to turn off randomizing the server pool.

func DrainTimeout

func DrainTimeout(t time.Duration) Option

DrainTimeout is an Option to set the timeout for draining a connection.

func ErrorHandler

func ErrorHandler(cb ErrHandler) Option

ErrorHandler is an Option to set the async error handler.

func FlusherTimeout

func FlusherTimeout(t time.Duration) Option

FlusherTimeout is an Option to set the write (and flush) timeout on a connection.

func MaxPingsOutstanding

func MaxPingsOutstanding(max int) Option

MaxPingsOutstanding is an Option to set the maximum number of ping requests that can go un-answered by the server before closing the connection.

func MaxReconnects

func MaxReconnects(max int) Option

MaxReconnects is an Option to set the maximum number of reconnect attempts.

func Name

func Name(name string) Option

Name is an Option to set the client name.

func Nkey

func Nkey(pubKey string, sigCB SignatureHandler) Option

Nkey will set the public Nkey and the signature callback to sign the server nonce.

func NkeyOptionFromSeed

func NkeyOptionFromSeed(seedFile string) (Option, error)

NkeyOptionFromSeed will load an nkey pair from a seed file. It will return the NKey Option and will handle signing of nonce challenges from the server. It will take care to not hold keys in memory and to wipe memory.

func NoEcho

func NoEcho() Option

NoEcho is an Option to turn off messages echoing back from a server. Note this is supported on servers >= version 1.2. Proto 1 or greater.

func NoReconnect

func NoReconnect() Option

NoReconnect is an Option to turn off reconnect behavior.

func PingInterval

func PingInterval(t time.Duration) Option

PingInterval is an Option to set the period for client ping commands.

func ReconnectBufSize

func ReconnectBufSize(size int) Option

ReconnectBufSize sets the buffer size of messages kept while busy reconnecting.

func ReconnectHandler

func ReconnectHandler(cb ConnHandler) Option

ReconnectHandler is an Option to set the reconnected handler.

func ReconnectWait

func ReconnectWait(t time.Duration) Option

ReconnectWait is an Option to set the wait time between reconnect attempts.

func RootCAs

func RootCAs(file ...string) Option

RootCAs is a helper option to provide the RootCAs pool from a list of filenames. If Secure is not already set this will set it as well.

func Secure

func Secure(tls ...*tls.Config) Option

Secure is an Option to enable TLS secure connections that skip server verification by default. Pass a TLS Configuration for proper TLS. NOTE: This should NOT be used in a production setting.

func SetCustomDialer

func SetCustomDialer(dialer CustomDialer) Option

SetCustomDialer is an Option to set a custom dialer which will be used when attempting to establish a connection. If both Dialer and CustomDialer are specified, CustomDialer takes precedence.

func SyncQueueLen

func SyncQueueLen(max int) Option

SyncQueueLen will set the maximum queue len for the internal channel used for SubscribeSync().

func Timeout

func Timeout(t time.Duration) Option

Timeout is an Option to set the timeout for Dial on a connection.

func Token

func Token(token string) Option

Token is an Option to set the token to use when a token is not included directly in the URLs and when a token handler is not provided.

func TokenHandler

func TokenHandler(cb AuthTokenHandler) Option

TokenHandler is an Option to set the token handler to use when a token is not included directly in the URLs and when a token is not set.

func UseOldRequestStyle

func UseOldRequestStyle() Option

UseOldRequestStyle is an Option to force usage of the old Request style.

func UserCredentials

func UserCredentials(userOrChainedFile string, seedFiles ...string) Option

UserCredentials is a convenience function that takes a filename for a user's JWT and a filename for the user's private Nkey seed.

func UserInfo

func UserInfo(user, password string) Option

UserInfo is an Option to set the username and password to use when not included directly in the URLs.

func UserJWT

func UserJWT(userCB UserJWTHandler, sigCB SignatureHandler) Option

UserJWT will set the callbacks to retrieve the user's JWT and the signature callback to sign the server nonce. This and the Nkey option are mutually exclusive.

type Options

type Options struct {

    // Url represents a single NATS server url to which the client
    // will be connecting. If the Servers option is also set, it
    // then becomes the first server in the Servers array.
    Url string

    // Servers is a configured set of servers which this client
    // will use when attempting to connect.
    Servers []string

    // NoRandomize configures whether we will randomize the
    // server pool.
    NoRandomize bool

    // NoEcho configures whether the server will echo back messages
    // that are sent on this connection if we also have matching subscriptions.
    // Note this is supported on servers >= version 1.2. Proto 1 or greater.
    NoEcho bool

    // Name is an optional name label which will be sent to the server
    // on CONNECT to identify the client.
    Name string

    // Verbose signals the server to send an OK ack for commands
    // successfully processed by the server.
    Verbose bool

    // Pedantic signals the server whether it should be doing further
    // validation of subjects.
    Pedantic bool

    // Secure enables TLS secure connections that skip server
    // verification by default. NOT RECOMMENDED.
    Secure bool

    // TLSConfig is a custom TLS configuration to use for secure
    // transports.
    TLSConfig *tls.Config

    // AllowReconnect enables reconnection logic to be used when we
    // encounter a disconnect from the current server.
    AllowReconnect bool

    // MaxReconnect sets the number of reconnect attempts that will be
    // tried before giving up. If negative, then it will never give up
    // trying to reconnect.
    MaxReconnect int

    // ReconnectWait sets the time to backoff after attempting a reconnect
    // to a server that we were already connected to previously.
    ReconnectWait time.Duration

    // Timeout sets the timeout for a Dial operation on a connection.
    Timeout time.Duration

    // DrainTimeout sets the timeout for a Drain Operation to complete.
    DrainTimeout time.Duration

    // FlusherTimeout is the maximum time to wait for write operations
    // to the underlying connection to complete (including the flusher loop).
    FlusherTimeout time.Duration

    // PingInterval is the period at which the client will be sending ping
    // commands to the server, disabled if 0 or negative.
    PingInterval time.Duration

    // MaxPingsOut is the maximum number of pending ping commands that can
    // be awaiting a response before raising an ErrStaleConnection error.
    MaxPingsOut int

    // ClosedCB sets the closed handler that is called when a client will
    // no longer be connected.
    ClosedCB ConnHandler

    // DisconnectedCB sets the disconnected handler that is called
    // whenever the connection is disconnected.
    DisconnectedCB ConnHandler

    // ReconnectedCB sets the reconnected handler called whenever
    // the connection is successfully reconnected.
    ReconnectedCB ConnHandler

    // DiscoveredServersCB sets the callback that is invoked whenever a new
    // server has joined the cluster.
    DiscoveredServersCB ConnHandler

    // AsyncErrorCB sets the async error handler (e.g. slow consumer errors)
    AsyncErrorCB ErrHandler

    // ReconnectBufSize is the size of the backing bufio during reconnect.
    // Once this has been exhausted publish operations will return an error.
    ReconnectBufSize int

    // SubChanLen is the size of the buffered channel used between the socket
    // Go routine and the message delivery for SyncSubscriptions.
    // NOTE: This does not affect AsyncSubscriptions which are
    // dictated by PendingLimits()
    SubChanLen int

    // UserJWT sets the callback handler that will fetch a user's JWT.
    UserJWT UserJWTHandler

    // Nkey sets the public nkey that will be used to authenticate
    // when connecting to the server. UserJWT and Nkey are mutually exclusive
    // and if defined, UserJWT will take precedence.
    Nkey string

    // SignatureCB designates the function used to sign the nonce
    // presented from the server.
    SignatureCB SignatureHandler

    // User sets the username to be used when connecting to the server.
    User string

    // Password sets the password to be used when connecting to a server.
    Password string

    // Token sets the token to be used when connecting to a server.
    Token string

    // TokenHandler designates the function used to generate the token to be used when connecting to a server.
    TokenHandler AuthTokenHandler

    // Dialer allows a custom net.Dialer when forming connections.
    // DEPRECATED: should use CustomDialer instead.
    Dialer *net.Dialer

    // CustomDialer allows specifying a custom dialer (not necessarily
    // a *net.Dialer).
    CustomDialer CustomDialer

    // UseOldRequestStyle forces the old method of Requests that utilize
    // a new Inbox and a new Subscription for each request.
    UseOldRequestStyle bool
}

Options can be used to create a customized connection.

func GetDefaultOptions

func GetDefaultOptions() Options

GetDefaultOptions returns default configuration options for the client.

func (Options) Connect

func (o Options) Connect() (*Conn, error)

Connect will attempt to connect to a NATS server with multiple options.

type SignatureHandler

type SignatureHandler func([]byte) ([]byte, error)

SignatureHandler is used to sign a nonce from the server while authenticating with nkeys. The user should sign the nonce and return the base64 encoded signature.

type Statistics

type Statistics struct {
    InMsgs     uint64
    OutMsgs    uint64
    InBytes    uint64
    OutBytes   uint64
    Reconnects uint64
}

Statistics tracks various stats received and sent on this connection, including message and byte counts.

type Status

type Status int

Status represents the state of the connection.

type Subscription

type Subscription struct {

    // Subject that represents this subscription. This can be different
    // than the received subject inside a Msg if this is a wildcard.
    Subject string

    // Optional queue group name. If present, all subscriptions with the
    // same name will form a distributed queue, and each message will
    // only be processed by one member of the group.
    Queue string
    // contains filtered or unexported fields
}

A Subscription represents interest in a given subject.

func (*Subscription) AutoUnsubscribe

func (s *Subscription) AutoUnsubscribe(max int) error

AutoUnsubscribe will issue an automatic Unsubscribe that is processed by the server when max messages have been received. This can be useful when sending a request to an unknown number of subscribers.

Code:

nc, _ := nats.Connect(nats.DefaultURL)
defer nc.Close()

received, wanted, total := 0, 10, 100

sub, _ := nc.Subscribe("foo", func(_ *nats.Msg) {
    received++
})
sub.AutoUnsubscribe(wanted)

for i := 0; i < total; i++ {
    nc.Publish("foo", []byte("Hello"))
}
nc.Flush()

fmt.Printf("Received = %d", received)

func (*Subscription) ClearMaxPending

func (s *Subscription) ClearMaxPending() error

ClearMaxPending resets the maximums seen so far.

func (*Subscription) Delivered

func (s *Subscription) Delivered() (int64, error)

Delivered returns the number of delivered messages for this subscription.

func (*Subscription) Drain

func (s *Subscription) Drain() error

Drain will remove interest but continue callbacks until all messages have been processed.

func (*Subscription) Dropped

func (s *Subscription) Dropped() (int, error)

Dropped returns the number of known dropped messages for this subscription. This will correspond to messages dropped by violations of PendingLimits. If the server declares the connection a SlowConsumer, this number may not be valid.

func (*Subscription) IsValid

func (s *Subscription) IsValid() bool

IsValid returns a boolean indicating whether the subscription is still active. This will return false if the subscription has already been closed.

func (*Subscription) MaxPending

func (s *Subscription) MaxPending() (int, int, error)

MaxPending returns the maximum number of queued messages and queued bytes seen so far.

func (*Subscription) NextMsg

func (s *Subscription) NextMsg(timeout time.Duration) (*Msg, error)

NextMsg will return the next message available to a synchronous subscriber or block until one is available. A timeout can be used to return when no message has been delivered.

Code:

nc, _ := nats.Connect(nats.DefaultURL)
defer nc.Close()

sub, _ := nc.SubscribeSync("foo")
m, err := sub.NextMsg(1 * time.Second)
if err == nil {
    fmt.Printf("Received a message: %s\n", string(m.Data))
} else {
    fmt.Println("NextMsg timed out.")
}

func (*Subscription) NextMsgWithContext

func (s *Subscription) NextMsgWithContext(ctx context.Context) (*Msg, error)

NextMsgWithContext takes a context and returns the next message available to a synchronous subscriber, blocking until a message is delivered or the context is canceled.

func (*Subscription) Pending

func (s *Subscription) Pending() (int, int, error)

Pending returns the number of queued messages and queued bytes in the client for this subscription.

func (*Subscription) PendingLimits

func (s *Subscription) PendingLimits() (int, int, error)

PendingLimits returns the current limits for this subscription. If no error is returned, a negative value indicates that the given metric is not limited.

func (*Subscription) QueuedMsgs

func (s *Subscription) QueuedMsgs() (int, error)

QueuedMsgs returns the number of queued messages in the client for this subscription. DEPRECATED: Use Pending()

func (*Subscription) SetPendingLimits

func (s *Subscription) SetPendingLimits(msgLimit, bytesLimit int) error

SetPendingLimits sets the limits for pending msgs and bytes for this subscription. Zero is not allowed. Any negative value means that the given metric is not limited.
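The rules stated here (zero is rejected, a negative value means unlimited) and the relation to Dropped can be illustrated with a small standalone model. This is a sketch under simplifying assumptions, not the client's internal bookkeeping. `pendingQueue`, `setLimits`, and `offer` are hypothetical names.

```go
package main

import (
	"errors"
	"fmt"
)

// pendingQueue models per-subscription pending counters and limits.
type pendingQueue struct {
	msgLimit, bytesLimit int // negative means unlimited
	msgs, bytes, dropped int
}

// setLimits rejects zero and stores any other value.
func (q *pendingQueue) setLimits(msgLimit, bytesLimit int) error {
	if msgLimit == 0 || bytesLimit == 0 {
		return errors.New("pending limits must not be zero")
	}
	q.msgLimit, q.bytesLimit = msgLimit, bytesLimit
	return nil
}

// offer queues a message of the given size, or counts it as dropped if
// accepting it would exceed a positive limit.
func (q *pendingQueue) offer(size int) bool {
	overMsgs := q.msgLimit > 0 && q.msgs+1 > q.msgLimit
	overBytes := q.bytesLimit > 0 && q.bytes+size > q.bytesLimit
	if overMsgs || overBytes {
		q.dropped++
		return false
	}
	q.msgs++
	q.bytes += size
	return true
}

func main() {
	q := &pendingQueue{}
	// Limit to 2 pending messages, with no limit on bytes.
	if err := q.setLimits(2, -1); err != nil {
		panic(err)
	}
	for i := 0; i < 5; i++ {
		q.offer(100)
	}
	fmt.Printf("pending=%d dropped=%d\n", q.msgs, q.dropped) // pending=2 dropped=3
}
```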

func (*Subscription) Type

func (s *Subscription) Type() SubscriptionType

Type returns the type of Subscription.

func (*Subscription) Unsubscribe

func (s *Subscription) Unsubscribe() error

Unsubscribe will remove interest in the given subject.

Code:

nc, _ := nats.Connect(nats.DefaultURL)
defer nc.Close()

sub, _ := nc.SubscribeSync("foo")
// ...
sub.Unsubscribe()

type SubscriptionType

type SubscriptionType int

SubscriptionType is the type of the Subscription.

type UserJWTHandler

type UserJWTHandler func() (string, error)

UserJWTHandler is used to fetch and return the account signed JWT for this user.

Directories

Path    Synopsis
bench
encoders/builtin
encoders/protobuf
test
util

Package nats imports 26 packages and is imported by 207 packages. Updated 2019-06-08.