qpid.apache.org: qpid.apache.org/electron Index | Examples | Files

package electron

import "qpid.apache.org/electron"

Package electron lets you write concurrent AMQP 1.0 messaging clients and servers.

This package requires the [proton-C library](http://qpid.apache.org/proton) to be installed.

Start by creating a Container with NewContainer. An AMQP Container represents a single AMQP "application" and can contain client and server connections.

You can enable AMQP over any connection that implements the standard net.Conn interface. Typically you can connect with net.Dial() or listen for server connections with net.Listen. Enable AMQP by passing the net.Conn to Container.Connection().

AMQP allows bi-direction peer-to-peer message exchange as well as client-to-broker. Messages are sent over "links". Each link is one-way and has a Sender and Receiver end. Connection.Sender() and Connection.Receiver() open links to Send() and Receive() messages. Connection.Incoming() lets you accept incoming links opened by the remote peer. You can open and accept multiple links in both directions on a single Connection.

Some of the documentation examples show client and server side by side in a single program, in separate goroutines. This is only for example purposes, real AMQP applications would run in separate processes on the network.

Some of the documentation examples show client and server side by side in a single program, in separate goroutines. This is only for example purposes, real AMQP applications would run in separate processes on the network.

Example client sending messages to a server running in a goroutine.

Code:

l, err := net.Listen("tcp", "127.0.0.1:0") // tcp4 so example will work on ipv6-disabled platforms
if err != nil {
    log.Fatal(err)
}

// SERVER: start the server running in a separate goroutine
var waitServer sync.WaitGroup // We will wait for the server goroutine to finish before exiting
waitServer.Add(1)
go func() { // Run the server in the background
    defer waitServer.Done()
    Server(l)
}()

// CLIENT: Send messages to the server
addr := l.Addr()
c, err := electron.Dial(addr.Network(), addr.String())
if err != nil {
    log.Fatal(err)
}
s, err := c.Sender()
if err != nil {
    log.Fatal(err)
}
for i := 0; i < 3; i++ {
    msg := fmt.Sprintf("hello %v", i)
    // Send and wait for the Outcome from the server.
    // Note: For higher throughput, use SendAsync() to send a stream of messages
    // and process the returning stream of Outcomes concurrently.
    s.SendSync(amqp.NewMessageWith(msg))
}
c.Close(nil) // Closing the connection will stop the server

waitServer.Wait() // Let the server finish

Output:

server received: "hello 0"
server received: "hello 1"
server received: "hello 2"
server receiver closed: EOF

Index

Examples

Package Files

connection.go container.go doc.go endpoint.go handler.go link.go receiver.go sender.go session.go time.go

Constants

const (
    // Messages are sent unsettled
    SndUnsettled = SndSettleMode(proton.SndUnsettled)
    // Messages are sent already settled
    SndSettled = SndSettleMode(proton.SndSettled)
    // Sender can send either unsettled or settled messages.
    SndMixed = SndSettleMode(proton.SndMixed)
)
const (
    // Receiver settles first.
    RcvFirst = RcvSettleMode(proton.RcvFirst)
    // Receiver waits for sender to settle before settling.
    RcvSecond = RcvSettleMode(proton.RcvSecond)
)
const Forever time.Duration = math.MaxInt64

Forever can be used as a timeout parameter to indicate wait forever.

Variables

var Closed = io.EOF

Closed is an alias for io.EOF. It is returned as an error when an endpoint was closed cleanly.

var EOF = io.EOF

EOF is an alias for io.EOF. It is returned as an error when an endpoint was closed cleanly.

var Timeout = fmt.Errorf("timeout")

Timeout is the error returned if an operation does not complete on time.

Methods named *Timeout in this package take time.Duration timeout parameter.

If timeout > 0 and there is no result available before the timeout, they return a zero or nil value and Timeout as an error.

If timeout == 0 they will return a result if one is immediately available or nil/zero and Timeout as an error if not.

If timeout == Forever the function will return only when there is a result or some non-timeout error occurs.

func After Uses

func After(timeout time.Duration) <-chan time.Time

After is like time.After but returns a nil channel if timeout == Forever since selecting on a nil channel will never return.

func GlobalSASLConfigDir Uses

func GlobalSASLConfigDir(dir string)

GlobalSASLConfigDir sets the SASL configuration directory for every Connection created in this process. If not called, the default is determined by your SASL installation.

You can set SASLAllowInsecure and SASLAllowedMechs on individual connections.

Must be called at most once, before any connections are created.

func GlobalSASLConfigName Uses

func GlobalSASLConfigName(name string)

GlobalSASLConfigName sets the SASL configuration name for every Connection created in this process. If not called the default is "proton-server".

The complete configuration file name is

<sasl-config-dir>/<sasl-config-name>.conf

You can set SASLAllowInsecure and SASLAllowedMechs on individual connections.

Must be called at most once, before any connections are created.

func NewConnection Uses

func NewConnection(conn net.Conn, opts ...ConnectionOption) (*connection, error)

NewConnection creates a connection with the given options. Options are applied in order.

func SASLExtended Uses

func SASLExtended() bool

Do we support extended SASL negotiation? All implementations of Proton support ANONYMOUS and EXTERNAL on both client and server sides and PLAIN on the client side.

Extended SASL implememtations use an external library (Cyrus SASL) to support other mechanisms beyond these basic ones.

type Connection Uses

type Connection interface {
    Endpoint
    ConnectionSettings

    // Sender opens a new sender on the DefaultSession.
    Sender(...LinkOption) (Sender, error)

    // Receiver opens a new Receiver on the DefaultSession().
    Receiver(...LinkOption) (Receiver, error)

    // DefaultSession() returns a default session for the connection. It is opened
    // on the first call to DefaultSession and returned on subsequent calls.
    DefaultSession() (Session, error)

    // Session opens a new session.
    Session(...SessionOption) (Session, error)

    // Container for the connection.
    Container() Container

    // Disconnect the connection abruptly with an error.
    Disconnect(error)

    // Wait waits for the connection to be disconnected.
    Wait() error

    // WaitTimeout is like Wait but returns Timeout if the timeout expires.
    WaitTimeout(time.Duration) error

    // Incoming returns a channel for incoming endpoints opened by the remote peer.
    // See the Incoming interface for more detail.
    //
    // Note: this channel will first return an *IncomingConnection for the
    // connection itself which allows you to look at security information and
    // decide whether to Accept() or Reject() the connection. Then it will return
    // *IncomingSession, *IncomingSender and *IncomingReceiver as they are opened
    // by the remote end.
    //
    // Note 2: you must receiving from Incoming() and call Accept/Reject to avoid
    // blocking electron event loop. Normally you would run a loop in a goroutine
    // to handle incoming types that interest and Accept() those that don't.
    Incoming() <-chan Incoming
}

Connection is an AMQP connection, created by a Container.

func Dial Uses

func Dial(network, address string, opts ...ConnectionOption) (c Connection, err error)

Dial is shorthand for using net.Dial() then NewConnection() See net.Dial() for the meaning of the network, address arguments.

func DialWithDialer Uses

func DialWithDialer(dialer *net.Dialer, network, address string, opts ...ConnectionOption) (c Connection, err error)

DialWithDialer is shorthand for using dialer.Dial() then NewConnection() See net.Dial() for the meaning of the network, address arguments.

type ConnectionOption Uses

type ConnectionOption func(*connection)

ConnectionOption arguments can be passed when creating a connection to configure it.

func AllowIncoming Uses

func AllowIncoming() ConnectionOption

AllowIncoming returns a ConnectionOption to enable incoming endpoints, see Connection.Incoming() This is automatically set for Server() connections.

func ContainerId Uses

func ContainerId(id string) ConnectionOption

ContainerId returns a ConnectionOption that creates a new Container with id and associates it with the connection

func Heartbeat Uses

func Heartbeat(delay time.Duration) ConnectionOption

Heartbeat returns a ConnectionOption that requests the maximum delay between sending frames for the remote peer. If we don't receive any frames within 2*delay we will close the connection.

func Parent Uses

func Parent(cont Container) ConnectionOption

Parent returns a ConnectionOption that associates the Connection with it's Container If not set a connection will create its own default container.

func Password Uses

func Password(password []byte) ConnectionOption

Password returns a ConnectionOption to set the password used to establish a connection. Only applies to outbound client connection.

The connection will erase its copy of the password from memory as soon as it has been used to authenticate. If you are concerned about passwords staying in memory you should never store them as strings, and should overwrite your copy as soon as you are done with it.

func SASLAllowInsecure Uses

func SASLAllowInsecure(b bool) ConnectionOption

SASLAllowInsecure returns a ConnectionOption that allows or disallows clear text SASL authentication mechanisms

By default the SASL layer is configured not to allow mechanisms that disclose the clear text of the password over an unencrypted AMQP connection. This specifically will disallow the use of the PLAIN mechanism without using SSL encryption.

This default is to avoid disclosing password information accidentally over an insecure network.

func SASLAllowedMechs Uses

func SASLAllowedMechs(mechs string) ConnectionOption

SASLAllowedMechs returns a ConnectionOption to set the list of allowed SASL mechanisms.

Can be used on the client or the server to restrict the SASL for a connection. mechs is a space-separated list of mechanism names.

The mechanisms allowed by default are determined by your SASL library and system configuration, with two exceptions: GSSAPI and GSS-SPNEGO are disabled by default. To enable them, you must explicitly add them using this option.

Clients must set the allowed mechanisms before the the outgoing connection is attempted. Servers must set them before the listening connection is setup.

func SASLEnable Uses

func SASLEnable() ConnectionOption

SASLEnable returns a ConnectionOption that enables SASL authentication. Only required if you don't set any other SASL options.

func Server Uses

func Server() ConnectionOption

Server returns a ConnectionOption to put the connection in server mode for incoming connections.

A server connection will do protocol negotiation to accept a incoming AMQP connection. Normally you would call this for a connection created by net.Listener.Accept()

func User Uses

func User(user string) ConnectionOption

User returns a ConnectionOption sets the user name for a connection

func VirtualHost Uses

func VirtualHost(virtualHost string) ConnectionOption

VirtualHost returns a ConnectionOption to set the AMQP virtual host for the connection. Only applies to outbound client connection.

type ConnectionSettings Uses

type ConnectionSettings interface {
    // Authenticated user name associated with the connection.
    User() string

    // The AMQP virtual host name for the connection.
    //
    // Optional, useful when the server has multiple names and provides different
    // service based on the name the client uses to connect.
    //
    // By default it is set to the DNS host name that the client uses to connect,
    // but it can be set to something different at the client side with the
    // VirtualHost() option.
    //
    // Returns error if the connection fails to authenticate.
    VirtualHost() string

    // Heartbeat is the maximum delay between sending frames that the remote peer
    // has requested of us. If the interval expires an empty "heartbeat" frame
    // will be sent automatically to keep the connection open.
    Heartbeat() time.Duration
}

Settings associated with a Connection.

type Container Uses

type Container interface {
    // Id is a unique identifier for the container in your distributed application.
    Id() string

    // Connection creates a connection associated with this container.
    Connection(conn net.Conn, opts ...ConnectionOption) (Connection, error)

    // Dial is shorthand for
    //     conn, err := net.Dial(); c, err := Connection(conn, opts...)
    // See net.Dial() for the meaning of the network, address arguments.
    Dial(network string, address string, opts ...ConnectionOption) (Connection, error)

    // Accept is shorthand for:
    //     conn, err := l.Accept(); c, err := Connection(conn, append(opts, Server()...)
    Accept(l net.Listener, opts ...ConnectionOption) (Connection, error)

    // String returns Id()
    String() string
}

Container is an AMQP container, it represents a single AMQP "application" which can have multiple client or server connections.

Each Container in a distributed AMQP application must have a unique container-id which is applied to its connections.

Create with NewContainer()

func NewContainer Uses

func NewContainer(id string) Container

NewContainer creates a new container. The id must be unique in your distributed application, all connections created by the container will have this container-id.

If id == "" a random UUID will be generated for the id.

type Endpoint Uses

type Endpoint interface {
    // Close an endpoint and signal an error to the remote end if error != nil.
    Close(error)

    // String is a human readable identifier, useful for debugging and logging.
    String() string

    // Error returns nil if the endpoint is open, otherwise returns an error.
    // Error() == Closed means the endpoint was closed without error.
    Error() error

    // Connection is the connection associated with this endpoint.
    Connection() Connection

    // Done returns a channel that will close when the endpoint closes.
    // After Done() has closed, Error() will return the reason for closing.
    Done() <-chan struct{}

    // Sync() waits for the remote peer to confirm the endpoint is active or
    // reject it with an error. You can call it immediately on new endpoints
    // for more predictable error handling.
    //
    // AMQP is an asynchronous protocol. It is legal to create an endpoint and
    // start using it without waiting for confirmation. This avoids a needless
    // delay in the non-error case and throughput by "assuming the best".
    //
    // However if there *is* an error, these "optimistic" actions will fail. The
    // endpoint and its children will be closed with an error. The error will only
    // be detected when you try to use one of these endpoints or call Sync()
    Sync() error
    // contains filtered or unexported methods
}

Endpoint is the local end of a communications channel to the remote peer process. The following interface implement Endpoint: Connection, Session, Sender and Receiver.

You can create an endpoint with functions on Container, Connection and Session. You can accept incoming endpoints from the remote peer using Connection.Incoming()

type Incoming Uses

type Incoming interface {
    // Accept and open the endpoint.
    Accept() Endpoint

    // Reject the endpoint with an error
    Reject(error)
    // contains filtered or unexported methods
}

Incoming is the interface for incoming endpoints, see Connection.Incoming()

Call Incoming.Accept() to open the endpoint or Incoming.Reject() to close it with optional error

Implementing types are *IncomingConnection, *IncomingSession, *IncomingSender and *IncomingReceiver. Each type provides methods to examine the incoming endpoint request and set configuration options for the local endpoint before calling Accept() or Reject()

type IncomingConnection Uses

type IncomingConnection struct {
    // contains filtered or unexported fields
}

func (*IncomingConnection) Accept Uses

func (in *IncomingConnection) Accept() Endpoint

func (*IncomingConnection) AcceptConnection Uses

func (in *IncomingConnection) AcceptConnection(opts ...ConnectionOption) Connection

AcceptConnection is like Accept() but takes ConnectionOption arguments like NewConnection(). For example you can set the Heartbeat() for the incoming connection.

func (IncomingConnection) Heartbeat Uses

func (c IncomingConnection) Heartbeat() time.Duration

func (*IncomingConnection) Reject Uses

func (in *IncomingConnection) Reject(err error)

func (*IncomingConnection) String Uses

func (in *IncomingConnection) String() string

func (IncomingConnection) User Uses

func (c IncomingConnection) User() string

func (IncomingConnection) VirtualHost Uses

func (c IncomingConnection) VirtualHost() string

type IncomingReceiver Uses

type IncomingReceiver struct {
    // contains filtered or unexported fields
}

IncomingReceiver is sent on the Connection.Incoming() channel when there is an incoming request to open a receiver link.

func (*IncomingReceiver) Accept Uses

func (in *IncomingReceiver) Accept() Endpoint

Accept accepts an incoming receiver endpoint

func (*IncomingReceiver) Filter Uses

func (l *IncomingReceiver) Filter() map[amqp.Symbol]interface{}

func (*IncomingReceiver) IsReceiver Uses

func (l *IncomingReceiver) IsReceiver() bool

func (*IncomingReceiver) IsSender Uses

func (l *IncomingReceiver) IsSender() bool

func (*IncomingReceiver) LinkName Uses

func (l *IncomingReceiver) LinkName() string

func (*IncomingReceiver) RcvSettle Uses

func (l *IncomingReceiver) RcvSettle() RcvSettleMode

func (*IncomingReceiver) Reject Uses

func (in *IncomingReceiver) Reject(err error)

func (*IncomingReceiver) SetCapacity Uses

func (in *IncomingReceiver) SetCapacity(capacity int)

SetCapacity sets the capacity of the incoming receiver, call before Accept()

func (*IncomingReceiver) SetPrefetch Uses

func (in *IncomingReceiver) SetPrefetch(prefetch bool)

SetPrefetch sets the pre-fetch mode of the incoming receiver, call before Accept()

func (*IncomingReceiver) SndSettle Uses

func (l *IncomingReceiver) SndSettle() SndSettleMode

func (*IncomingReceiver) Source Uses

func (l *IncomingReceiver) Source() string

func (*IncomingReceiver) SourceSettings Uses

func (l *IncomingReceiver) SourceSettings() TerminusSettings

func (*IncomingReceiver) String Uses

func (in *IncomingReceiver) String() string

func (*IncomingReceiver) Target Uses

func (l *IncomingReceiver) Target() string

func (*IncomingReceiver) TargetSettings Uses

func (l *IncomingReceiver) TargetSettings() TerminusSettings

type IncomingSender Uses

type IncomingSender struct {
    // contains filtered or unexported fields
}

IncomingSender is sent on the Connection.Incoming() channel when there is an incoming request to open a sender link.

func (*IncomingSender) Accept Uses

func (in *IncomingSender) Accept() Endpoint

Accept accepts an incoming sender endpoint

func (*IncomingSender) Filter Uses

func (l *IncomingSender) Filter() map[amqp.Symbol]interface{}

func (*IncomingSender) IsReceiver Uses

func (l *IncomingSender) IsReceiver() bool

func (*IncomingSender) IsSender Uses

func (l *IncomingSender) IsSender() bool

func (*IncomingSender) LinkName Uses

func (l *IncomingSender) LinkName() string

func (*IncomingSender) RcvSettle Uses

func (l *IncomingSender) RcvSettle() RcvSettleMode

func (*IncomingSender) Reject Uses

func (in *IncomingSender) Reject(err error)

func (*IncomingSender) SndSettle Uses

func (l *IncomingSender) SndSettle() SndSettleMode

func (*IncomingSender) Source Uses

func (l *IncomingSender) Source() string

func (*IncomingSender) SourceSettings Uses

func (l *IncomingSender) SourceSettings() TerminusSettings

func (*IncomingSender) String Uses

func (in *IncomingSender) String() string

func (*IncomingSender) Target Uses

func (l *IncomingSender) Target() string

func (*IncomingSender) TargetSettings Uses

func (l *IncomingSender) TargetSettings() TerminusSettings

type IncomingSession Uses

type IncomingSession struct {
    // contains filtered or unexported fields
}

IncomingSender is sent on the Connection.Incoming() channel when there is an incoming request to open a session.

func (*IncomingSession) Accept Uses

func (in *IncomingSession) Accept() Endpoint

Accept an incoming session endpoint.

func (*IncomingSession) Reject Uses

func (in *IncomingSession) Reject(err error)

func (*IncomingSession) SetIncomingCapacity Uses

func (in *IncomingSession) SetIncomingCapacity(bytes uint)

SetIncomingCapacity sets the session buffer capacity of an incoming session in bytes.

func (*IncomingSession) SetOutgoingWindow Uses

func (in *IncomingSession) SetOutgoingWindow(frames uint)

SetOutgoingWindow sets the session outgoing window of an incoming session in frames.

func (*IncomingSession) String Uses

func (in *IncomingSession) String() string

type LinkOption Uses

type LinkOption func(*linkSettings)

LinkOption can be passed when creating a sender or receiver link to set optional configuration.

func AtLeastOnce Uses

func AtLeastOnce() LinkOption

AtLeastOnce returns a LinkOption that requests acknowledgment for every message, acknowledgment indicates the message was definitely received. In the event of a failure, unacknowledged messages can be re-sent but there is a chance that the message will be received twice in this case. Sets SndSettleMode=SndUnsettled and RcvSettleMode=RcvFirst

func AtMostOnce Uses

func AtMostOnce() LinkOption

AtMostOnce returns a LinkOption that sets "fire and forget" mode, messages are sent but no acknowledgment is received, messages can be lost if there is a network failure. Sets SndSettleMode=SendSettled and RcvSettleMode=RcvFirst

func Capacity Uses

func Capacity(n int) LinkOption

Capacity returns a LinkOption that sets the link capacity

func DurableSubscription Uses

func DurableSubscription(name string) LinkOption

DurableSubscription returns a LinkOption that configures a Receiver as a named durable subscription. The name overrides (and is overridden by) LinkName() so you should normally only use one of these options.

func Filter Uses

func Filter(m map[amqp.Symbol]interface{}) LinkOption

Filter returns a LinkOption that sets a filter.

func LinkName Uses

func LinkName(s string) LinkOption

LinkName returns a LinkOption that sets the link name.

func Prefetch Uses

func Prefetch(p bool) LinkOption

Prefetch returns a LinkOption that sets a receivers pre-fetch flag. Not relevant for a sender.

func RcvSettle Uses

func RcvSettle(m RcvSettleMode) LinkOption

RcvSettle returns a LinkOption that sets the send settle mode

func SndSettle Uses

func SndSettle(m SndSettleMode) LinkOption

SndSettle returns a LinkOption that sets the send settle mode

func Source Uses

func Source(s string) LinkOption

Source returns a LinkOption that sets address that messages are coming from.

func SourceSettings Uses

func SourceSettings(ts TerminusSettings) LinkOption

SourceSettings returns a LinkOption that sets all the SourceSettings. Note: it will override the source address set by a Source() option

func Target Uses

func Target(s string) LinkOption

Target returns a LinkOption that sets address that messages are going to.

func TargetSettings Uses

func TargetSettings(ts TerminusSettings) LinkOption

TargetSettings returns a LinkOption that sets all the TargetSettings. Note: it will override the target address set by a Target() option

type LinkSettings Uses

type LinkSettings interface {
    // Source address that messages are coming from.
    Source() string

    // Target address that messages are going to.
    Target() string

    // Name is a unique name for the link among links between the same
    // containers in the same direction. By default generated automatically.
    LinkName() string

    // IsSender is true if this is the sending end of the link.
    IsSender() bool

    // IsReceiver is true if this is the receiving end of the link.
    IsReceiver() bool

    // SndSettle defines when the sending end of the link settles message delivery.
    SndSettle() SndSettleMode

    // RcvSettle defines when the sending end of the link settles message delivery.
    RcvSettle() RcvSettleMode

    // Session containing the Link
    Session() Session

    // Filter for the link
    Filter() map[amqp.Symbol]interface{}

    // Advanced settings for the source
    SourceSettings() TerminusSettings

    // Advanced settings for the target
    TargetSettings() TerminusSettings
}

Settings associated with a link

type Outcome Uses

type Outcome struct {
    // Status of the message: was it sent, how was it acknowledged.
    Status SentStatus
    // Error is a local error if Status is Unsent or Unacknowledged, a remote error otherwise.
    Error error
    // Value provided by the application in SendAsync()
    Value interface{}
}

Outcome provides information about the outcome of sending a message.

type RcvSettleMode Uses

type RcvSettleMode proton.RcvSettleMode

RcvSettleMode defines when the receiving end of the link settles message delivery.

type ReceivedMessage Uses

type ReceivedMessage struct {
    // Message is the received message.
    Message amqp.Message
    // contains filtered or unexported fields
}

ReceivedMessage contains an amqp.Message and allows the message to be acknowledged.

func (*ReceivedMessage) Accept Uses

func (rm *ReceivedMessage) Accept() error

Accept tells the sender that we take responsibility for processing the message.

func (*ReceivedMessage) Reject Uses

func (rm *ReceivedMessage) Reject() error

Reject tells the sender we consider the message invalid and unusable.

func (*ReceivedMessage) Release Uses

func (rm *ReceivedMessage) Release() error

Release tells the sender we will not process the message but some other receiver might.

type Receiver Uses

type Receiver interface {
    Endpoint
    LinkSettings

    // Receive blocks until a message is available or until the Receiver is closed
    // and has no more buffered messages.
    Receive() (ReceivedMessage, error)

    // ReceiveTimeout is like Receive but gives up after timeout, see Timeout.
    //
    // Note that that if Prefetch is false, after a Timeout the credit issued by
    // Receive remains on the link. It will be used by the next call to Receive.
    ReceiveTimeout(timeout time.Duration) (ReceivedMessage, error)

    // Prefetch==true means the Receiver will automatically issue credit to the
    // remote sender to keep its buffer as full as possible, i.e. it will
    // "pre-fetch" messages independently of the application calling
    // Receive(). This gives good throughput for applications that handle a
    // continuous stream of messages. Larger capacity may improve throughput, the
    // optimal value depends on the characteristics of your application.
    //
    // Prefetch==false means the Receiver will issue only issue credit when you
    // call Receive(), and will only issue enough credit to satisfy the calls
    // actually made. This gives lower throughput but will not fetch any messages
    // in advance. It is good for synchronous applications that need to evaluate
    // each message before deciding whether to receive another. The
    // request-response pattern is a typical example.  If you make concurrent
    // calls to Receive with pre-fetch disabled, you can improve performance by
    // setting the capacity close to the expected number of concurrent calls.
    //
    Prefetch() bool

    // Capacity is the size (number of messages) of the local message buffer
    // These are messages received but not yet returned to the application by a call to Receive()
    Capacity() int
}

Receiver is a Link that receives messages.

type Sender Uses

type Sender interface {
    Endpoint
    LinkSettings

    // SendSync sends a message and blocks until the message is acknowledged by the remote receiver.
    // Returns an Outcome, which may contain an error if the message could not be sent.
    SendSync(m amqp.Message) Outcome

    // SendWaitable puts a message in the send buffer and returns a channel that
    // you can use to wait for the Outcome of just that message. The channel is
    // buffered so you can receive from it whenever you want without blocking.
    //
    // Note: can block if there is no space to buffer the message.
    SendWaitable(m amqp.Message) <-chan Outcome

    // SendForget buffers a message for sending and returns, with no notification of the outcome.
    //
    // Note: can block if there is no space to buffer the message.
    SendForget(m amqp.Message)

    // SendAsync puts a message in the send buffer and returns immediately.  An
    // Outcome with Value = value will be sent to the ack channel when the remote
    // receiver has acknowledged the message or if there is an error.
    //
    // You can use the same ack channel for many calls to SendAsync(), possibly on
    // many Senders. The channel will receive the outcomes in the order they
    // become available. The channel should be buffered and/or served by dedicated
    // goroutines to avoid blocking the connection.
    //
    // If ack == nil no Outcome is sent.
    //
    // Note: can block if there is no space to buffer the message.
    SendAsync(m amqp.Message, ack chan<- Outcome, value interface{})

    SendAsyncTimeout(m amqp.Message, ack chan<- Outcome, value interface{}, timeout time.Duration)

    SendWaitableTimeout(m amqp.Message, timeout time.Duration) <-chan Outcome

    SendForgetTimeout(m amqp.Message, timeout time.Duration)

    SendSyncTimeout(m amqp.Message, timeout time.Duration) Outcome
}

Sender is a Link that sends messages.

The result of sending a message is provided by an Outcome value.

A sender can buffer messages up to the credit limit provided by the remote receiver. All the Send* methods will block if the buffer is full until there is space. Send*Timeout methods will give up after the timeout and set Timeout as Outcome.Error.

type SentStatus Uses

type SentStatus int

SentStatus indicates the status of a sent message.

const (
    // Message was never sent
    Unsent SentStatus = iota
    // Message was sent but never acknowledged. It may or may not have been received.
    Unacknowledged
    // Message was accepted by the receiver (or was sent pre-settled, accept is assumed)
    Accepted
    // Message was rejected as invalid by the receiver
    Rejected
    // Message was not processed by the receiver but may be valid for a different receiver
    Released
    // Receiver responded with an unrecognized status.
    Unknown
)

func (SentStatus) String Uses

func (s SentStatus) String() string

String human readable name for SentStatus.

type Session Uses

type Session interface {
    Endpoint

    // Sender opens a new sender.
    Sender(...LinkOption) (Sender, error)

    // Receiver opens a new Receiver.
    Receiver(...LinkOption) (Receiver, error)
}

Session is an AMQP session, it contains Senders and Receivers.

type SessionOption Uses

type SessionOption func(*session)

SessionOption can be passed when creating a Session

func IncomingCapacity Uses

func IncomingCapacity(bytes uint) SessionOption

IncomingCapacity returns a Session Option that sets the size (in bytes) of the session's incoming data buffer.

func OutgoingWindow Uses

func OutgoingWindow(frames uint) SessionOption

OutgoingWindow returns a Session Option that sets the outgoing window size (in frames).

type SndSettleMode Uses

type SndSettleMode proton.SndSettleMode

SndSettleMode defines when the sending end of the link settles message delivery.

type TerminusSettings Uses

type TerminusSettings struct {
    Durability proton.Durability
    Expiry     proton.ExpiryPolicy
    Timeout    time.Duration
    Dynamic    bool
}

Advanced AMQP settings for the source or target of a link. Usually these can be set via a more descriptive LinkOption, e.g. DurableSubscription() and do not need to be set/examined directly.

Package electron imports 14 packages (graph) and is imported by 7 packages. Updated 2020-04-29. Refresh now. Tools for package owners.