Documentation ¶
Overview ¶
Package electron lets you write concurrent AMQP 1.0 messaging clients and servers.
This package requires the [proton-C library](http://github.com/apache/qpid-proton/go/pkg/proton) to be installed.
Start by creating a Container with NewContainer. An AMQP Container represents a single AMQP "application" and can contain client and server connections.
You can enable AMQP over any connection that implements the standard net.Conn interface. Typically you can connect with net.Dial() or listen for server connections with net.Listen. Enable AMQP by passing the net.Conn to Container.Connection().
AMQP allows bi-direction peer-to-peer message exchange as well as client-to-broker. Messages are sent over "links". Each link is one-way and has a Sender and Receiver end. Connection.Sender() and Connection.Receiver() open links to Send() and Receive() messages. Connection.Incoming() lets you accept incoming links opened by the remote peer. You can open and accept multiple links in both directions on a single Connection.
Some of the documentation examples show client and server side by side in a single program, in separate goroutines. This is only for example purposes, real AMQP applications would run in separate processes on the network.
Some of the documentation examples show client and server side by side in a single program, in separate goroutines. This is only for example purposes, real AMQP applications would run in separate processes on the network.
Example (ClientServer) ¶
Example client sending messages to a server running in a goroutine.
package main import ( "fmt" "github.com/apache/qpid-proton/go/pkg/amqp" "github.com/apache/qpid-proton/go/pkg/electron" "log" "net" "sync" ) // Example Server that accepts a single Connection, Session and Receiver link // and prints messages received until the link closes. func Server(l net.Listener) { cont := electron.NewContainer("server") c, err := cont.Accept(l) if err != nil { log.Fatal(err) } l.Close() // This server only accepts one connection // Process incoming endpoints till we get a Receiver link var r electron.Receiver for r == nil { in := <-c.Incoming() switch in := in.(type) { case *electron.IncomingSession, *electron.IncomingConnection: in.Accept() // Accept the incoming connection and session for the receiver case *electron.IncomingReceiver: in.SetCapacity(10) in.SetPrefetch(true) // Automatic flow control for a buffer of 10 messages. r = in.Accept().(electron.Receiver) case nil: return // Connection is closed default: in.Reject(amqp.Errorf("example-server", "unexpected endpoint %v", in)) } } go func() { // Reject any further incoming endpoints for in := range c.Incoming() { in.Reject(amqp.Errorf("example-server", "unexpected endpoint %v", in)) } }() // Receive messages till the Receiver closes rm, err := r.Receive() for ; err == nil; rm, err = r.Receive() { fmt.Printf("server received: %q\n", rm.Message.Body()) rm.Accept() // Signal to the client that the message was accepted } fmt.Printf("server receiver closed: %v\n", err) } // Example client sending messages to a server running in a goroutine. func main() { l, err := net.Listen("tcp", "127.0.0.1:0") // tcp4 so example will work on ipv6-disabled platforms if err != nil { log.Fatal(err) } // SERVER: start the server running in a separate goroutine var waitServer sync.WaitGroup // We will wait for the server goroutine to finish before exiting waitServer.Add(1) go func() { // Run the server in the background defer waitServer.Done() Server(l) }() // CLIENT: Send messages to the server addr := l.Addr() c, err := electron.Dial(addr.Network(), addr.String()) if err != nil { log.Fatal(err) } s, err := c.Sender() if err != nil { log.Fatal(err) } for i := 0; i < 3; i++ { msg := fmt.Sprintf("hello %v", i) // Send and wait for the Outcome from the server. // Note: For higher throughput, use SendAsync() to send a stream of messages // and process the returning stream of Outcomes concurrently. s.SendSync(amqp.NewMessageWith(msg)) } c.Close(nil) // Closing the connection will stop the server waitServer.Wait() // Let the server finish }
Output: server received: "hello 0" server received: "hello 1" server received: "hello 2" server receiver closed: EOF
Index ¶
- Constants
- Variables
- func After(timeout time.Duration) <-chan time.Time
- func GlobalSASLConfigDir(dir string)
- func GlobalSASLConfigName(name string)
- func NewConnection(conn net.Conn, opts ...ConnectionOption) (*connection, error)
- func SASLExtended() bool
- type Connection
- type ConnectionOption
- func AllowIncoming() ConnectionOption
- func ContainerId(id string) ConnectionOption
- func Heartbeat(delay time.Duration) ConnectionOption
- func Parent(cont Container) ConnectionOption
- func Password(password []byte) ConnectionOption
- func SASLAllowInsecure(b bool) ConnectionOption
- func SASLAllowedMechs(mechs string) ConnectionOption
- func SASLEnable() ConnectionOption
- func Server() ConnectionOption
- func User(user string) ConnectionOption
- func VirtualHost(virtualHost string) ConnectionOption
- type ConnectionSettings
- type Container
- type Endpoint
- type Incoming
- type IncomingConnection
- func (in *IncomingConnection) Accept() Endpoint
- func (in *IncomingConnection) AcceptConnection(opts ...ConnectionOption) Connection
- func (c IncomingConnection) Heartbeat() time.Duration
- func (in *IncomingConnection) Reject(err error)
- func (in *IncomingConnection) String() string
- func (c IncomingConnection) User() string
- func (c IncomingConnection) VirtualHost() string
- type IncomingReceiver
- func (in *IncomingReceiver) Accept() Endpoint
- func (l *IncomingReceiver) Filter() map[amqp.Symbol]interface{}
- func (l *IncomingReceiver) IsReceiver() bool
- func (l *IncomingReceiver) IsSender() bool
- func (l *IncomingReceiver) LinkName() string
- func (l *IncomingReceiver) RcvSettle() RcvSettleMode
- func (in *IncomingReceiver) Reject(err error)
- func (in *IncomingReceiver) SetCapacity(capacity int)
- func (in *IncomingReceiver) SetPrefetch(prefetch bool)
- func (l *IncomingReceiver) SndSettle() SndSettleMode
- func (l *IncomingReceiver) Source() string
- func (l *IncomingReceiver) SourceSettings() TerminusSettings
- func (in *IncomingReceiver) String() string
- func (l *IncomingReceiver) Target() string
- func (l *IncomingReceiver) TargetSettings() TerminusSettings
- type IncomingSender
- func (in *IncomingSender) Accept() Endpoint
- func (l *IncomingSender) Filter() map[amqp.Symbol]interface{}
- func (l *IncomingSender) IsReceiver() bool
- func (l *IncomingSender) IsSender() bool
- func (l *IncomingSender) LinkName() string
- func (l *IncomingSender) RcvSettle() RcvSettleMode
- func (in *IncomingSender) Reject(err error)
- func (l *IncomingSender) SndSettle() SndSettleMode
- func (l *IncomingSender) Source() string
- func (l *IncomingSender) SourceSettings() TerminusSettings
- func (in *IncomingSender) String() string
- func (l *IncomingSender) Target() string
- func (l *IncomingSender) TargetSettings() TerminusSettings
- type IncomingSession
- type LinkOption
- func AtLeastOnce() LinkOption
- func AtMostOnce() LinkOption
- func Capacity(n int) LinkOption
- func DurableSubscription(name string) LinkOption
- func Filter(m map[amqp.Symbol]interface{}) LinkOption
- func LinkName(s string) LinkOption
- func Prefetch(p bool) LinkOption
- func RcvSettle(m RcvSettleMode) LinkOption
- func SndSettle(m SndSettleMode) LinkOption
- func Source(s string) LinkOption
- func SourceSettings(ts TerminusSettings) LinkOption
- func Target(s string) LinkOption
- func TargetSettings(ts TerminusSettings) LinkOption
- type LinkSettings
- type Outcome
- type RcvSettleMode
- type ReceivedMessage
- type Receiver
- type Sender
- type SentStatus
- type Session
- type SessionOption
- type SndSettleMode
- type TerminusSettings
Examples ¶
Constants ¶
const ( // Messages are sent unsettled SndUnsettled = SndSettleMode(proton.SndUnsettled) // Messages are sent already settled SndSettled = SndSettleMode(proton.SndSettled) // Sender can send either unsettled or settled messages. SndMixed = SndSettleMode(proton.SndMixed) )
const ( // Receiver settles first. RcvFirst = RcvSettleMode(proton.RcvFirst) // Receiver waits for sender to settle before settling. RcvSecond = RcvSettleMode(proton.RcvSecond) )
const Forever time.Duration = math.MaxInt64
Forever can be used as a timeout parameter to indicate wait forever.
Variables ¶
var Closed = io.EOF
Closed is an alias for io.EOF. It is returned as an error when an endpoint was closed cleanly.
var EOF = io.EOF
EOF is an alias for io.EOF. It is returned as an error when an endpoint was closed cleanly.
var Timeout = fmt.Errorf("timeout")
Timeout is the error returned if an operation does not complete on time.
Methods named *Timeout in this package take time.Duration timeout parameter.
If timeout > 0 and there is no result available before the timeout, they return a zero or nil value and Timeout as an error.
If timeout == 0 they will return a result if one is immediately available or nil/zero and Timeout as an error if not.
If timeout == Forever the function will return only when there is a result or some non-timeout error occurs.
Functions ¶
func After ¶
After is like time.After but returns a nil channel if timeout == Forever since selecting on a nil channel will never return.
func GlobalSASLConfigDir ¶
func GlobalSASLConfigDir(dir string)
GlobalSASLConfigDir sets the SASL configuration directory for every Connection created in this process. If not called, the default is determined by your SASL installation.
You can set SASLAllowInsecure and SASLAllowedMechs on individual connections.
Must be called at most once, before any connections are created.
func GlobalSASLConfigName ¶
func GlobalSASLConfigName(name string)
GlobalSASLConfigName sets the SASL configuration name for every Connection created in this process. If not called the default is "proton-server".
The complete configuration file name is
<sasl-config-dir>/<sasl-config-name>.conf
You can set SASLAllowInsecure and SASLAllowedMechs on individual connections.
Must be called at most once, before any connections are created.
func NewConnection ¶
func NewConnection(conn net.Conn, opts ...ConnectionOption) (*connection, error)
NewConnection creates a connection with the given options. Options are applied in order.
func SASLExtended ¶
func SASLExtended() bool
Do we support extended SASL negotiation? All implementations of Proton support ANONYMOUS and EXTERNAL on both client and server sides and PLAIN on the client side.
Extended SASL implememtations use an external library (Cyrus SASL) to support other mechanisms beyond these basic ones.
Types ¶
type Connection ¶
type Connection interface { Endpoint ConnectionSettings // Sender opens a new sender on the DefaultSession. Sender(...LinkOption) (Sender, error) // Receiver opens a new Receiver on the DefaultSession(). Receiver(...LinkOption) (Receiver, error) // DefaultSession() returns a default session for the connection. It is opened // on the first call to DefaultSession and returned on subsequent calls. DefaultSession() (Session, error) // Session opens a new session. Session(...SessionOption) (Session, error) // Container for the connection. Container() Container // Disconnect the connection abruptly with an error. Disconnect(error) // Wait waits for the connection to be disconnected. Wait() error // WaitTimeout is like Wait but returns Timeout if the timeout expires. WaitTimeout(time.Duration) error // Incoming returns a channel for incoming endpoints opened by the remote peer. // See the Incoming interface for more detail. // // Note: this channel will first return an *IncomingConnection for the // connection itself which allows you to look at security information and // decide whether to Accept() or Reject() the connection. Then it will return // *IncomingSession, *IncomingSender and *IncomingReceiver as they are opened // by the remote end. // // Note 2: you must receiving from Incoming() and call Accept/Reject to avoid // blocking electron event loop. Normally you would run a loop in a goroutine // to handle incoming types that interest and Accept() those that don't. Incoming() <-chan Incoming }
Connection is an AMQP connection, created by a Container.
func Dial ¶
func Dial(network, address string, opts ...ConnectionOption) (c Connection, err error)
Dial is shorthand for using net.Dial() then NewConnection() See net.Dial() for the meaning of the network, address arguments.
func DialWithDialer ¶
func DialWithDialer(dialer *net.Dialer, network, address string, opts ...ConnectionOption) (c Connection, err error)
DialWithDialer is shorthand for using dialer.Dial() then NewConnection() See net.Dial() for the meaning of the network, address arguments.
type ConnectionOption ¶
type ConnectionOption func(*connection)
ConnectionOption arguments can be passed when creating a connection to configure it.
func AllowIncoming ¶
func AllowIncoming() ConnectionOption
AllowIncoming returns a ConnectionOption to enable incoming endpoints, see Connection.Incoming() This is automatically set for Server() connections.
func ContainerId ¶
func ContainerId(id string) ConnectionOption
ContainerId returns a ConnectionOption that creates a new Container with id and associates it with the connection
func Heartbeat ¶
func Heartbeat(delay time.Duration) ConnectionOption
Heartbeat returns a ConnectionOption that requests the maximum delay between sending frames for the remote peer. If we don't receive any frames within 2*delay we will close the connection.
func Parent ¶
func Parent(cont Container) ConnectionOption
Parent returns a ConnectionOption that associates the Connection with it's Container If not set a connection will create its own default container.
func Password ¶
func Password(password []byte) ConnectionOption
Password returns a ConnectionOption to set the password used to establish a connection. Only applies to outbound client connection.
The connection will erase its copy of the password from memory as soon as it has been used to authenticate. If you are concerned about passwords staying in memory you should never store them as strings, and should overwrite your copy as soon as you are done with it.
func SASLAllowInsecure ¶
func SASLAllowInsecure(b bool) ConnectionOption
SASLAllowInsecure returns a ConnectionOption that allows or disallows clear text SASL authentication mechanisms
By default the SASL layer is configured not to allow mechanisms that disclose the clear text of the password over an unencrypted AMQP connection. This specifically will disallow the use of the PLAIN mechanism without using SSL encryption.
This default is to avoid disclosing password information accidentally over an insecure network.
func SASLAllowedMechs ¶
func SASLAllowedMechs(mechs string) ConnectionOption
SASLAllowedMechs returns a ConnectionOption to set the list of allowed SASL mechanisms.
Can be used on the client or the server to restrict the SASL for a connection. mechs is a space-separated list of mechanism names.
The mechanisms allowed by default are determined by your SASL library and system configuration, with two exceptions: GSSAPI and GSS-SPNEGO are disabled by default. To enable them, you must explicitly add them using this option.
Clients must set the allowed mechanisms before the the outgoing connection is attempted. Servers must set them before the listening connection is setup.
func SASLEnable ¶
func SASLEnable() ConnectionOption
SASLEnable returns a ConnectionOption that enables SASL authentication. Only required if you don't set any other SASL options.
func Server ¶
func Server() ConnectionOption
Server returns a ConnectionOption to put the connection in server mode for incoming connections.
A server connection will do protocol negotiation to accept a incoming AMQP connection. Normally you would call this for a connection created by net.Listener.Accept()
func User ¶
func User(user string) ConnectionOption
User returns a ConnectionOption sets the user name for a connection
func VirtualHost ¶
func VirtualHost(virtualHost string) ConnectionOption
VirtualHost returns a ConnectionOption to set the AMQP virtual host for the connection. Only applies to outbound client connection.
type ConnectionSettings ¶
type ConnectionSettings interface { // Authenticated user name associated with the connection. User() string // The AMQP virtual host name for the connection. // // Optional, useful when the server has multiple names and provides different // service based on the name the client uses to connect. // // By default it is set to the DNS host name that the client uses to connect, // but it can be set to something different at the client side with the // VirtualHost() option. // // Returns error if the connection fails to authenticate. VirtualHost() string // Heartbeat is the maximum delay between sending frames that the remote peer // has requested of us. If the interval expires an empty "heartbeat" frame // will be sent automatically to keep the connection open. Heartbeat() time.Duration }
Settings associated with a Connection.
type Container ¶
type Container interface { // Id is a unique identifier for the container in your distributed application. Id() string // Connection creates a connection associated with this container. Connection(conn net.Conn, opts ...ConnectionOption) (Connection, error) // Dial is shorthand for // conn, err := net.Dial(); c, err := Connection(conn, opts...) // See net.Dial() for the meaning of the network, address arguments. Dial(network string, address string, opts ...ConnectionOption) (Connection, error) // Accept is shorthand for: // conn, err := l.Accept(); c, err := Connection(conn, append(opts, Server()...) Accept(l net.Listener, opts ...ConnectionOption) (Connection, error) // String returns Id() String() string }
Container is an AMQP container, it represents a single AMQP "application" which can have multiple client or server connections.
Each Container in a distributed AMQP application must have a unique container-id which is applied to its connections.
Create with NewContainer()
func NewContainer ¶
NewContainer creates a new container. The id must be unique in your distributed application, all connections created by the container will have this container-id.
If id == "" a random UUID will be generated for the id.
type Endpoint ¶
type Endpoint interface { // Close an endpoint and signal an error to the remote end if error != nil. Close(error) // String is a human readable identifier, useful for debugging and logging. String() string // Error returns nil if the endpoint is open, otherwise returns an error. // Error() == Closed means the endpoint was closed without error. Error() error // Connection is the connection associated with this endpoint. Connection() Connection // Done returns a channel that will close when the endpoint closes. // After Done() has closed, Error() will return the reason for closing. Done() <-chan struct{} // Sync() waits for the remote peer to confirm the endpoint is active or // reject it with an error. You can call it immediately on new endpoints // for more predictable error handling. // // AMQP is an asynchronous protocol. It is legal to create an endpoint and // start using it without waiting for confirmation. This avoids a needless // delay in the non-error case and throughput by "assuming the best". // // However if there *is* an error, these "optimistic" actions will fail. The // endpoint and its children will be closed with an error. The error will only // be detected when you try to use one of these endpoints or call Sync() Sync() error }
Endpoint is the local end of a communications channel to the remote peer process. The following interface implement Endpoint: Connection, Session, Sender and Receiver.
You can create an endpoint with functions on Container, Connection and Session. You can accept incoming endpoints from the remote peer using Connection.Incoming()
type Incoming ¶
type Incoming interface { // Accept and open the endpoint. Accept() Endpoint // Reject the endpoint with an error Reject(error) // contains filtered or unexported methods }
Incoming is the interface for incoming endpoints, see Connection.Incoming()
Call Incoming.Accept() to open the endpoint or Incoming.Reject() to close it with optional error
Implementing types are *IncomingConnection, *IncomingSession, *IncomingSender and *IncomingReceiver. Each type provides methods to examine the incoming endpoint request and set configuration options for the local endpoint before calling Accept() or Reject()
type IncomingConnection ¶
type IncomingConnection struct {
// contains filtered or unexported fields
}
func (*IncomingConnection) Accept ¶
func (in *IncomingConnection) Accept() Endpoint
func (*IncomingConnection) AcceptConnection ¶
func (in *IncomingConnection) AcceptConnection(opts ...ConnectionOption) Connection
AcceptConnection is like Accept() but takes ConnectionOption arguments like NewConnection(). For example you can set the Heartbeat() for the incoming connection.
func (IncomingConnection) VirtualHost ¶
func (c IncomingConnection) VirtualHost() string
type IncomingReceiver ¶
type IncomingReceiver struct {
// contains filtered or unexported fields
}
IncomingReceiver is sent on the Connection.Incoming() channel when there is an incoming request to open a receiver link.
func (*IncomingReceiver) Accept ¶
func (in *IncomingReceiver) Accept() Endpoint
Accept accepts an incoming receiver endpoint
func (*IncomingReceiver) IsReceiver ¶
func (l *IncomingReceiver) IsReceiver() bool
func (*IncomingReceiver) RcvSettle ¶
func (l *IncomingReceiver) RcvSettle() RcvSettleMode
func (*IncomingReceiver) SetCapacity ¶
func (in *IncomingReceiver) SetCapacity(capacity int)
SetCapacity sets the capacity of the incoming receiver, call before Accept()
func (*IncomingReceiver) SetPrefetch ¶
func (in *IncomingReceiver) SetPrefetch(prefetch bool)
SetPrefetch sets the pre-fetch mode of the incoming receiver, call before Accept()
func (*IncomingReceiver) SndSettle ¶
func (l *IncomingReceiver) SndSettle() SndSettleMode
func (*IncomingReceiver) SourceSettings ¶
func (l *IncomingReceiver) SourceSettings() TerminusSettings
func (*IncomingReceiver) TargetSettings ¶
func (l *IncomingReceiver) TargetSettings() TerminusSettings
type IncomingSender ¶
type IncomingSender struct {
// contains filtered or unexported fields
}
IncomingSender is sent on the Connection.Incoming() channel when there is an incoming request to open a sender link.
func (*IncomingSender) Accept ¶
func (in *IncomingSender) Accept() Endpoint
Accept accepts an incoming sender endpoint
func (*IncomingSender) IsReceiver ¶
func (l *IncomingSender) IsReceiver() bool
func (*IncomingSender) RcvSettle ¶
func (l *IncomingSender) RcvSettle() RcvSettleMode
func (*IncomingSender) SndSettle ¶
func (l *IncomingSender) SndSettle() SndSettleMode
func (*IncomingSender) SourceSettings ¶
func (l *IncomingSender) SourceSettings() TerminusSettings
func (*IncomingSender) TargetSettings ¶
func (l *IncomingSender) TargetSettings() TerminusSettings
type IncomingSession ¶
type IncomingSession struct {
// contains filtered or unexported fields
}
IncomingSender is sent on the Connection.Incoming() channel when there is an incoming request to open a session.
func (*IncomingSession) Accept ¶
func (in *IncomingSession) Accept() Endpoint
Accept an incoming session endpoint.
func (*IncomingSession) SetIncomingCapacity ¶
func (in *IncomingSession) SetIncomingCapacity(bytes uint)
SetIncomingCapacity sets the session buffer capacity of an incoming session in bytes.
func (*IncomingSession) SetOutgoingWindow ¶
func (in *IncomingSession) SetOutgoingWindow(frames uint)
SetOutgoingWindow sets the session outgoing window of an incoming session in frames.
type LinkOption ¶
type LinkOption func(*linkSettings)
LinkOption can be passed when creating a sender or receiver link to set optional configuration.
func AtLeastOnce ¶
func AtLeastOnce() LinkOption
AtLeastOnce returns a LinkOption that requests acknowledgment for every message, acknowledgment indicates the message was definitely received. In the event of a failure, unacknowledged messages can be re-sent but there is a chance that the message will be received twice in this case. Sets SndSettleMode=SndUnsettled and RcvSettleMode=RcvFirst
func AtMostOnce ¶
func AtMostOnce() LinkOption
AtMostOnce returns a LinkOption that sets "fire and forget" mode, messages are sent but no acknowledgment is received, messages can be lost if there is a network failure. Sets SndSettleMode=SendSettled and RcvSettleMode=RcvFirst
func Capacity ¶
func Capacity(n int) LinkOption
Capacity returns a LinkOption that sets the link capacity
func DurableSubscription ¶
func DurableSubscription(name string) LinkOption
DurableSubscription returns a LinkOption that configures a Receiver as a named durable subscription. The name overrides (and is overridden by) LinkName() so you should normally only use one of these options.
func Filter ¶
func Filter(m map[amqp.Symbol]interface{}) LinkOption
Filter returns a LinkOption that sets a filter.
func LinkName ¶
func LinkName(s string) LinkOption
LinkName returns a LinkOption that sets the link name.
func Prefetch ¶
func Prefetch(p bool) LinkOption
Prefetch returns a LinkOption that sets a receivers pre-fetch flag. Not relevant for a sender.
func RcvSettle ¶
func RcvSettle(m RcvSettleMode) LinkOption
RcvSettle returns a LinkOption that sets the send settle mode
func SndSettle ¶
func SndSettle(m SndSettleMode) LinkOption
SndSettle returns a LinkOption that sets the send settle mode
func Source ¶
func Source(s string) LinkOption
Source returns a LinkOption that sets address that messages are coming from.
func SourceSettings ¶
func SourceSettings(ts TerminusSettings) LinkOption
SourceSettings returns a LinkOption that sets all the SourceSettings. Note: it will override the source address set by a Source() option
func Target ¶
func Target(s string) LinkOption
Target returns a LinkOption that sets address that messages are going to.
func TargetSettings ¶
func TargetSettings(ts TerminusSettings) LinkOption
TargetSettings returns a LinkOption that sets all the TargetSettings. Note: it will override the target address set by a Target() option
type LinkSettings ¶
type LinkSettings interface { // Source address that messages are coming from. Source() string // Target address that messages are going to. Target() string // Name is a unique name for the link among links between the same // containers in the same direction. By default generated automatically. LinkName() string // IsSender is true if this is the sending end of the link. IsSender() bool // IsReceiver is true if this is the receiving end of the link. IsReceiver() bool // SndSettle defines when the sending end of the link settles message delivery. SndSettle() SndSettleMode // RcvSettle defines when the sending end of the link settles message delivery. RcvSettle() RcvSettleMode // Session containing the Link Session() Session // Filter for the link Filter() map[amqp.Symbol]interface{} // Advanced settings for the source SourceSettings() TerminusSettings // Advanced settings for the target TargetSettings() TerminusSettings }
Settings associated with a link
type Outcome ¶
type Outcome struct { // Status of the message: was it sent, how was it acknowledged. Status SentStatus // Error is a local error if Status is Unsent or Unacknowledged, a remote error otherwise. Error error // Value provided by the application in SendAsync() Value interface{} }
Outcome provides information about the outcome of sending a message.
type RcvSettleMode ¶
type RcvSettleMode proton.RcvSettleMode
RcvSettleMode defines when the receiving end of the link settles message delivery.
type ReceivedMessage ¶
type ReceivedMessage struct { // Message is the received message. Message amqp.Message // contains filtered or unexported fields }
ReceivedMessage contains an amqp.Message and allows the message to be acknowledged.
func (*ReceivedMessage) Accept ¶
func (rm *ReceivedMessage) Accept() error
Accept tells the sender that we take responsibility for processing the message.
func (*ReceivedMessage) Reject ¶
func (rm *ReceivedMessage) Reject() error
Reject tells the sender we consider the message invalid and unusable.
func (*ReceivedMessage) Release ¶
func (rm *ReceivedMessage) Release() error
Release tells the sender we will not process the message but some other receiver might.
type Receiver ¶
type Receiver interface { Endpoint LinkSettings // Receive blocks until a message is available or until the Receiver is closed // and has no more buffered messages. Receive() (ReceivedMessage, error) // ReceiveTimeout is like Receive but gives up after timeout, see Timeout. // // Note that that if Prefetch is false, after a Timeout the credit issued by // Receive remains on the link. It will be used by the next call to Receive. ReceiveTimeout(timeout time.Duration) (ReceivedMessage, error) // Prefetch==true means the Receiver will automatically issue credit to the // remote sender to keep its buffer as full as possible, i.e. it will // "pre-fetch" messages independently of the application calling // Receive(). This gives good throughput for applications that handle a // continuous stream of messages. Larger capacity may improve throughput, the // optimal value depends on the characteristics of your application. // // Prefetch==false means the Receiver will issue only issue credit when you // call Receive(), and will only issue enough credit to satisfy the calls // actually made. This gives lower throughput but will not fetch any messages // in advance. It is good for synchronous applications that need to evaluate // each message before deciding whether to receive another. The // request-response pattern is a typical example. If you make concurrent // calls to Receive with pre-fetch disabled, you can improve performance by // setting the capacity close to the expected number of concurrent calls. // Prefetch() bool // Capacity is the size (number of messages) of the local message buffer // These are messages received but not yet returned to the application by a call to Receive() Capacity() int }
Receiver is a Link that receives messages.
type Sender ¶
type Sender interface { Endpoint LinkSettings // SendSync sends a message and blocks until the message is acknowledged by the remote receiver. // Returns an Outcome, which may contain an error if the message could not be sent. SendSync(m amqp.Message) Outcome // SendWaitable puts a message in the send buffer and returns a channel that // you can use to wait for the Outcome of just that message. The channel is // buffered so you can receive from it whenever you want without blocking. // // Note: can block if there is no space to buffer the message. SendWaitable(m amqp.Message) <-chan Outcome // SendForget buffers a message for sending and returns, with no notification of the outcome. // // Note: can block if there is no space to buffer the message. SendForget(m amqp.Message) // SendAsync puts a message in the send buffer and returns immediately. An // Outcome with Value = value will be sent to the ack channel when the remote // receiver has acknowledged the message or if there is an error. // // You can use the same ack channel for many calls to SendAsync(), possibly on // many Senders. The channel will receive the outcomes in the order they // become available. The channel should be buffered and/or served by dedicated // goroutines to avoid blocking the connection. // // If ack == nil no Outcome is sent. // // Note: can block if there is no space to buffer the message. SendAsync(m amqp.Message, ack chan<- Outcome, value interface{}) SendAsyncTimeout(m amqp.Message, ack chan<- Outcome, value interface{}, timeout time.Duration) SendWaitableTimeout(m amqp.Message, timeout time.Duration) <-chan Outcome SendForgetTimeout(m amqp.Message, timeout time.Duration) SendSyncTimeout(m amqp.Message, timeout time.Duration) Outcome }
Sender is a Link that sends messages.
The result of sending a message is provided by an Outcome value.
A sender can buffer messages up to the credit limit provided by the remote receiver. All the Send* methods will block if the buffer is full until there is space. Send*Timeout methods will give up after the timeout and set Timeout as Outcome.Error.
type SentStatus ¶
type SentStatus int
SentStatus indicates the status of a sent message.
const ( // Message was never sent Unsent SentStatus = iota // Message was sent but never acknowledged. It may or may not have been received. Unacknowledged // Message was accepted by the receiver (or was sent pre-settled, accept is assumed) Accepted // Message was rejected as invalid by the receiver Rejected // Message was not processed by the receiver but may be valid for a different receiver Released // Receiver responded with an unrecognized status. Unknown )
func (SentStatus) String ¶
func (s SentStatus) String() string
String human readable name for SentStatus.
type Session ¶
type Session interface { Endpoint // Sender opens a new sender. Sender(...LinkOption) (Sender, error) // Receiver opens a new Receiver. Receiver(...LinkOption) (Receiver, error) }
Session is an AMQP session, it contains Senders and Receivers.
type SessionOption ¶
type SessionOption func(*session)
SessionOption can be passed when creating a Session
func IncomingCapacity ¶
func IncomingCapacity(bytes uint) SessionOption
IncomingCapacity returns a Session Option that sets the size (in bytes) of the session's incoming data buffer.
func OutgoingWindow ¶
func OutgoingWindow(frames uint) SessionOption
OutgoingWindow returns a Session Option that sets the outgoing window size (in frames).
type SndSettleMode ¶
type SndSettleMode proton.SndSettleMode
SndSettleMode defines when the sending end of the link settles message delivery.
type TerminusSettings ¶
type TerminusSettings struct { Durability proton.Durability Expiry proton.ExpiryPolicy Timeout time.Duration Dynamic bool }
Advanced AMQP settings for the source or target of a link. Usually these can be set via a more descriptive LinkOption, e.g. DurableSubscription() and do not need to be set/examined directly.