stomp

package
v0.0.0-...-3245bf1 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Mar 5, 2020 License: MIT Imports: 15 Imported by: 0

Documentation

Index

Constants

This section is empty.

Variables

View Source
var (
	MethodStomp       = []byte("STOMP")
	MethodConnect     = []byte("CONNECT")
	MethodConnected   = []byte("CONNECTED")
	MethodSend        = []byte("SEND")
	MethodSubscribe   = []byte("SUBSCRIBE")
	MethodUnsubscribe = []byte("UNSUBSCRIBE")
	MethodAck         = []byte("ACK")
	MethodNack        = []byte("NACK")
	MethodDisconnect  = []byte("DISCONNECT")
	MethodMessage     = []byte("MESSAGE")
	MethodRecipet     = []byte("RECEIPT")
	MethodError       = []byte("ERROR")
	MethodPing        = []byte("PING")
)

STOMP protocol methods.

View Source
var (
	HeaderAccept       = []byte("accept-version")
	HeaderAck          = []byte("ack")
	HeaderExpires      = []byte("expires")
	HeaderDest         = []byte("destination")
	HeaderHost         = []byte("host")
	HeaderLogin        = []byte("login")
	HeaderPass         = []byte("passcode")
	HeaderID           = []byte("id")
	HeaderMessageID    = []byte("message-id")
	HeaderPersist      = []byte("persist")
	HeaderPrefetch     = []byte("prefetch-count")
	HeaderReceipt      = []byte("receipt")
	HeaderReceiptID    = []byte("receipt-id")
	HeaderRetain       = []byte("retain")
	HeaderSelector     = []byte("selector")
	HeaderServer       = []byte("server")
	HeaderSession      = []byte("session")
	HeaderSubscription = []byte("subscription")
	HeaderVersion      = []byte("version")
)

STOMP protocol headers.

View Source
var (
	AckAuto      = []byte("auto")
	AckClient    = []byte("client")
	PersistTrue  = []byte("true")
	RetainTrue   = []byte("true")
	RetainLast   = []byte("last")
	RetainAll    = []byte("all")
	RetainRemove = []byte("remove")
)

Common STOMP header values.

View Source
var STOMP = []byte("1.2")

STOMP protocol version.

Functions

func ParseInt

func ParseInt(d []byte) (n int)

ParseInt returns the ascii integer value.

func ParseInt64

func ParseInt64(d []byte) (n int64)

ParseInt64 returns the ascii integer value.

func Pipe

func Pipe() (Peer, Peer)

Pipe creates a synchronous in-memory pipe, where reads on one end are matched with writes on the other. This is useful for direct, in-memory client-server communication.

func Rand

func Rand() []byte

Rand returns a random int64 number as a []byte of ascii characters.

Types

type Client

type Client struct {
	// contains filtered or unexported fields
}

Client defines a client connection to a STOMP server.

func Dial

func Dial(target string) (*Client, error)

Dial creates a client connection to the given target.

func FromContext

func FromContext(ctx context.Context) (*Client, bool)

FromContext retrieves the client from context

func MustFromContext

func MustFromContext(ctx context.Context) *Client

MustFromContext retrieves the client from context. Panics if not found

func New

func New(peer Peer) *Client

New returns a new STOMP client using the given connection.

func (*Client) Ack

func (c *Client) Ack(id []byte, opts ...MessageOption) error

Ack acknowledges the messages with the given id.

func (*Client) Connect

func (c *Client) Connect(opts ...MessageOption) error

Connect opens the connection and establishes the session.

func (*Client) Disconnect

func (c *Client) Disconnect() error

Disconnect terminates the session and closes the connection.

func (*Client) Done

func (c *Client) Done() <-chan error

Done returns a channel

func (*Client) Nack

func (c *Client) Nack(id []byte, opts ...MessageOption) error

Nack negative-acknowledges the messages with the given id.

func (*Client) NewContext

func (c *Client) NewContext(ctx context.Context, client *Client) context.Context

NewContext adds the client to the context.

func (*Client) Send

func (c *Client) Send(dest string, data []byte, opts ...MessageOption) error

Send sends the data to the given destination.

func (*Client) SendJSON

func (c *Client) SendJSON(dest string, v interface{}, opts ...MessageOption) error

SendJSON sends the JSON encoding of v to the given destination.

func (*Client) Subscribe

func (c *Client) Subscribe(dest string, handler Handler, opts ...MessageOption) (id []byte, err error)

Subscribe subscribes to the given destination.

func (*Client) Unsubscribe

func (c *Client) Unsubscribe(id []byte, opts ...MessageOption) error

Unsubscribe unsubscribes to the destination.

type Handler

type Handler interface {
	Handle(*Message)
}

Handler handles a STOMP message.

type HandlerFunc

type HandlerFunc func(*Message)

The HandlerFunc type is an adapter to allow the use of an ordinary function as a STOMP message handler.

func (HandlerFunc) Handle

func (f HandlerFunc) Handle(m *Message)

Handle calls f(m).

type Header struct {
	// contains filtered or unexported fields
}

Header represents the header section of the STOMP message.

func (*Header) Add

func (h *Header) Add(name, data []byte)

Add appens the key value pair to the header.

func (*Header) Field

func (h *Header) Field(name []byte) []byte

Field returns the named header value in string format. This is used to provide compatibility with the SQL expression evaluation package.

func (*Header) Get

func (h *Header) Get(name []byte) (b []byte)

Get returns the named header value.

func (*Header) GetBool

func (h *Header) GetBool(name string) bool

GetBool returns the named header value.

func (*Header) GetInt

func (h *Header) GetInt(name string) int

GetInt returns the named header value.

func (*Header) GetInt64

func (h *Header) GetInt64(name string) int64

GetInt64 returns the named header value.

func (*Header) GetString

func (h *Header) GetString(name string) string

GetString returns the named header value.

func (*Header) Index

func (h *Header) Index(i int) (k, v []byte)

Index returns the keypair at index i.

func (*Header) Len

func (h *Header) Len() int

Len returns the header length.

type Message

type Message struct {
	ID       []byte // id header
	Proto    []byte // stomp version
	Method   []byte // stomp method
	User     []byte // username header
	Pass     []byte // password header
	Dest     []byte // destination header
	Subs     []byte // subscription id
	Ack      []byte // ack id
	Msg      []byte // message-id header
	Persist  []byte // persist header
	Retain   []byte // retain header
	Prefetch []byte // prefetch count
	Expires  []byte // expires header
	Receipt  []byte // receipt header
	Selector []byte // selector header
	Body     []byte
	Header   *Header // custom headers
	// contains filtered or unexported fields
}

Message represents a parsed STOMP message.

func NewMessage

func NewMessage() *Message

NewMessage returns an empty message from the message pool.

func (*Message) Apply

func (m *Message) Apply(opts ...MessageOption)

Apply applies the options to the message.

func (*Message) Bytes

func (m *Message) Bytes() []byte

Bytes returns the Message in raw byte format.

func (*Message) Context

func (m *Message) Context() context.Context

Context returns the request's context.

func (*Message) Copy

func (m *Message) Copy() *Message

Copy returns a copy of the Message.

func (*Message) Parse

func (m *Message) Parse(b []byte) error

Parse parses the raw bytes into the message.

func (*Message) Release

func (m *Message) Release()

Release releases the message back to the message pool.

func (*Message) Reset

func (m *Message) Reset()

Reset resets the meesage fields to their zero values.

func (*Message) String

func (m *Message) String() string

String returns the Message in string format.

func (*Message) Unmarshal

func (m *Message) Unmarshal(v interface{}) error

Unmarshal parses the JSON-encoded body of the message and stores the result in the value pointed to by v.

func (*Message) WithContext

func (m *Message) WithContext(ctx context.Context) *Message

WithContext returns a shallow copy of m with its context changed to ctx. The provided ctx must be non-nil.

type MessageOption

type MessageOption func(*Message)

MessageOption configures message options.

func WithAck

func WithAck(ack string) MessageOption

WithAck returns a MessageOption configured with an ack policy.

func WithCredentials

func WithCredentials(username, password string) MessageOption

WithCredentials returns a MessageOption which sets credentials.

func WithExpires

func WithExpires(exp int64) MessageOption

WithExpires returns a MessageOption configured with an expiration.

func WithHeader

func WithHeader(key, value string) MessageOption

WithHeader returns a MessageOption which sets a header.

func WithHeaders

func WithHeaders(headers map[string]string) MessageOption

WithHeaders returns a MessageOption which sets headers.

func WithPersistence

func WithPersistence() MessageOption

WithPersistence returns a MessageOption configured to persist.

func WithPrefetch

func WithPrefetch(prefetch int) MessageOption

WithPrefetch returns a MessageOption configured with a prefetch count.

func WithReceipt

func WithReceipt() MessageOption

WithReceipt returns a MessageOption configured with a receipt request.

func WithRetain

func WithRetain(retain string) MessageOption

WithRetain returns a MessageOption configured to retain the message.

func WithSelector

func WithSelector(selector string) MessageOption

WithSelector returns a MessageOption configured to filter messages using a sql-like evaluation string.

type Peer

type Peer interface {
	// Send sends a message.
	Send(*Message) error

	// Receive returns a channel of inbound messages.
	Receive() <-chan *Message

	// Close closes the connection.
	Close() error

	// Addr returns the peer address.
	Addr() string
}

Peer defines a peer-to-peer connection.

func Conn

func Conn(c net.Conn) Peer

Conn creates a network-connected peer that reads and writes messages using net.Conn c.

Directories

Path Synopsis

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL