electron

package
v0.0.0-...-f45cdce Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Nov 17, 2016 License: Apache-2.0 Imports: 12 Imported by: 0

Documentation

Overview

Package electron lets you write concurrent AMQP 1.0 messaging clients and servers.

Start by creating a Container with NewContainer. An AMQP Container represents a single AMQP "application" and can contain client and server connections.

You can enable AMQP over any connection that implements the standard net.Conn interface. Typically you can connect with net.Dial() or listen for server connections with net.Listen. Enable AMQP by passing the net.Conn to Container.Connection().

AMQP allows bi-direction peer-to-peer message exchange as well as client-to-broker. Messages are sent over "links". Each link is one-way and has a Sender and Receiver end. Connection.Sender() and Connection.Receiver() open links to Send() and Receive() messages. Connection.Incoming() lets you accept incoming links opened by the remote peer. You can open and accept multiple links in both directions on a single Connection.

Some of the documentation examples show client and server side by side in a single program, in separate goroutines. This is only for example purposes, real AMQP applications would run in separate processes on the network. More realistic examples: https://github.com/apache/qpid-proton/blob/master/examples/go/README.md

Some of the documentation examples show client and server side by side in a single program, in separate goroutines. This is only for example purposes, real AMQP applications would run in separate processes on the network. More realistic examples: https://github.com/apache/qpid-proton/blob/master/examples/go/README.md

Example (ClientServer)

Example client and server communicating via AMQP over a TCP/IP connection.

Normally client and server would be separate processes. For more realistic examples:

https://github.com/apache/qpid-proton/blob/master/examples/go/README.md
package main

import (
	"fmt"
	"net"
	"qpid.apache.org/amqp"
	"qpid.apache.org/electron"
)

// Print errors
func check(msg string, err error) bool {
	if err != nil {
		fmt.Printf("%s: %s\n", msg, err)
	}
	return err == nil
}

func runServer(cont electron.Container, l net.Listener) {
	for c, err := cont.Accept(l); check("accept connection", err); c, err = cont.Accept(l) {
		go func() { // Process connections concurrently, accepting AMQP endpoints
			for in := range c.Incoming() {
				ep := in.Accept() // Accept all endpoints
				go func() {       // Process endpoints concurrently
					switch ep := ep.(type) {
					case electron.Sender:
						m := amqp.NewMessageWith("hello yourself")
						fmt.Printf("server %q sending %q\n", ep.Source(), m.Body())
						ep.SendForget(m) // One-way send, client does not need to Accept.
					case electron.Receiver:
						if rm, err := ep.Receive(); check("server receive", err) {
							fmt.Printf("server %q received %q\n", ep.Target(), rm.Message.Body())
							err := rm.Accept() // Client is waiting for Accept.
							check("accept message", err)
						}
					}
				}()
			}
		}()
	}
}

func startServer() (addr net.Addr) {
	cont := electron.NewContainer("server")
	if l, err := net.Listen("tcp", ""); check("listen", err) {
		addr = l.Addr()
		go runServer(cont, l)
	}
	return addr
}

// Connect to addr and send/receive a message.
func client(addr net.Addr) {
	if c, err := electron.Dial(addr.Network(), addr.String()); check("dial", err) {
		defer c.Close(nil)
		if s, err := c.Sender(electron.Target("target")); check("sender", err) {
			fmt.Printf("client sending\n")
			s.SendSync(amqp.NewMessageWith("hello")) // Send and wait for server to Accept()
		}
		if r, err := c.Receiver(electron.Source("source")); check("receiver", err) {
			if rm, err := r.Receive(); err == nil {
				fmt.Printf("client received %q\n", rm.Message.Body())
			}
		}
	}
}

// Example client and server communicating via AMQP over a TCP/IP connection.
//
// Normally client and server would be separate processes.
// For more realistic examples:
//
//	https://github.com/apache/qpid-proton/blob/master/examples/go/README.md
func main() {
	addr := startServer()
	client(addr)
}
Output:

client sending
server "target" received "hello"
server "source" sending "hello yourself"
client received "hello yourself"

Index

Examples

Constants

View Source
const (
	// Messages are sent unsettled
	SndUnsettled = SndSettleMode(proton.SndUnsettled)
	// Messages are sent already settled
	SndSettled = SndSettleMode(proton.SndSettled)
	// Sender can send either unsettled or settled messages.
	SendMixed = SndSettleMode(proton.SndMixed)
)
View Source
const (
	// Receiver settles first.
	RcvFirst = RcvSettleMode(proton.RcvFirst)
	// Receiver waits for sender to settle before settling.
	RcvSecond = RcvSettleMode(proton.RcvSecond)
)

Forever can be used as a timeout parameter to indicate wait forever.

Variables

View Source
var Closed = io.EOF

Closed is an alias for io.EOF. It is returned as an error when an endpoint was closed cleanly.

View Source
var Timeout = fmt.Errorf("timeout")

Timeout is the error returned if an operation does not complete on time.

Methods named *Timeout in this package take time.Duration timeout parameter.

If timeout > 0 and there is no result available before the timeout, they return a zero or nil value and Timeout as an error.

If timeout == 0 they will return a result if one is immediatley available or nil/zero and Timeout as an error if not.

If timeout == Forever the function will return only when there is a result or some non-timeout error occurs.

Functions

func After

func After(timeout time.Duration) <-chan time.Time

After is like time.After but returns a nil channel if timeout == Forever since selecting on a nil channel will never return.

func GlobalSASLConfigDir

func GlobalSASLConfigDir(dir string)

GlobalSASLConfigDir sets the SASL configuration directory for every Connection created in this process. If not called, the default is determined by your SASL installation.

You can set SASLAllowInsecure and SASLAllowedMechs on individual connections.

func GlobalSASLConfigName

func GlobalSASLConfigName(dir string)

GlobalSASLConfigName sets the SASL configuration name for every Connection created in this process. If not called the default is "proton-server".

The complete configuration file name is

<sasl-config-dir>/<sasl-config-name>.conf

You can set SASLAllowInsecure and SASLAllowedMechs on individual connections.

func NewConnection

func NewConnection(conn net.Conn, opts ...ConnectionOption) (*connection, error)

NewConnection creates a connection with the given options.

Types

type Connection

type Connection interface {
	Endpoint
	ConnectionSettings

	// Sender opens a new sender on the DefaultSession.
	Sender(...LinkOption) (Sender, error)

	// Receiver opens a new Receiver on the DefaultSession().
	Receiver(...LinkOption) (Receiver, error)

	// DefaultSession() returns a default session for the connection. It is opened
	// on the first call to DefaultSession and returned on subsequent calls.
	DefaultSession() (Session, error)

	// Session opens a new session.
	Session(...SessionOption) (Session, error)

	// Container for the connection.
	Container() Container

	// Disconnect the connection abruptly with an error.
	Disconnect(error)

	// Wait waits for the connection to be disconnected.
	Wait() error

	// WaitTimeout is like Wait but returns Timeout if the timeout expires.
	WaitTimeout(time.Duration) error

	// Incoming returns a channel for incoming endpoints opened by the remote peer.
	// See the Incoming interface for more.
	//
	// Not receiving from Incoming() and calling Accept/Reject will block the
	// electron event loop. You should run a loop to handle the types that
	// interest you in a switch{} and and Accept() all others.
	Incoming() <-chan Incoming
}

Connection is an AMQP connection, created by a Container.

func Dial

func Dial(network, addr string, opts ...ConnectionOption) (c Connection, err error)

Dial is shorthand for using net.Dial() then NewConnection()

func DialWithDialer

func DialWithDialer(dialer *net.Dialer, network, addr string, opts ...ConnectionOption) (c Connection, err error)

DialWithDialer is shorthand for using dialer.Dial() then NewConnection()

type ConnectionOption

type ConnectionOption func(*connection)

ConnectionOption can be passed when creating a connection to configure various options

func AllowIncoming

func AllowIncoming() ConnectionOption

AllowIncoming returns a ConnectionOption to enable incoming endpoints, see Connection.Incoming() This is automatically set for Server() connections.

func Heartbeat

func Heartbeat(delay time.Duration) ConnectionOption

Heartbeat returns a ConnectionOption that requests the maximum delay between sending frames for the remote peer. If we don't receive any frames within 2*delay we will close the connection.

func Parent

func Parent(cont Container) ConnectionOption

Parent returns a ConnectionOption that associates the Connection with it's Container If not set a connection will create its own default container.

func Password

func Password(password []byte) ConnectionOption

Password returns a ConnectionOption to set the password used to establish a connection. Only applies to outbound client connection.

The connection will erase its copy of the password from memory as soon as it has been used to authenticate. If you are concerned about paswords staying in memory you should never store them as strings, and should overwrite your copy as soon as you are done with it.

func SASLAllowInsecure

func SASLAllowInsecure(b bool) ConnectionOption

SASLAllowInsecure returns a ConnectionOption that allows or disallows clear text SASL authentication mechanisms

By default the SASL layer is configured not to allow mechanisms that disclose the clear text of the password over an unencrypted AMQP connection. This specifically will disallow the use of the PLAIN mechanism without using SSL encryption.

This default is to avoid disclosing password information accidentally over an insecure network.

func SASLAllowedMechs

func SASLAllowedMechs(mechs string) ConnectionOption

SASLAllowedMechs returns a ConnectionOption to set the list of allowed SASL mechanisms.

Can be used on the client or the server to restrict the SASL for a connection. mechs is a space-separated list of mechanism names.

func SASLEnable

func SASLEnable() ConnectionOption

SASLEnable returns a ConnectionOption that enables SASL authentication. Only required if you don't set any other SASL options.

func Server

func Server() ConnectionOption

Server returns a ConnectionOption to put the connection in server mode for incoming connections.

A server connection will do protocol negotiation to accept a incoming AMQP connection. Normally you would call this for a connection created by net.Listener.Accept()

func User

func User(user string) ConnectionOption

User returns a ConnectionOption sets the user name for a connection

func VirtualHost

func VirtualHost(virtualHost string) ConnectionOption

VirtualHost returns a ConnectionOption to set the AMQP virtual host for the connection. Only applies to outbound client connection.

type ConnectionSettings

type ConnectionSettings interface {
	// Authenticated user name associated with the connection.
	User() string

	// The AMQP virtual host name for the connection.
	//
	// Optional, useful when the server has multiple names and provides different
	// service based on the name the client uses to connect.
	//
	// By default it is set to the DNS host name that the client uses to connect,
	// but it can be set to something different at the client side with the
	// VirtualHost() option.
	//
	// Returns error if the connection fails to authenticate.
	VirtualHost() string

	// Heartbeat is the maximum delay between sending frames that the remote peer
	// has requested of us. If the interval expires an empty "heartbeat" frame
	// will be sent automatically to keep the connection open.
	Heartbeat() time.Duration
}

Settings associated with a Connection.

type Container

type Container interface {
	// Id is a unique identifier for the container in your distributed application.
	Id() string

	// Connection creates a connection associated with this container.
	Connection(conn net.Conn, opts ...ConnectionOption) (Connection, error)

	// Dial is shorthand for
	//     conn, err := net.Dial(); c, err := Connection(conn, opts...)
	Dial(network string, addr string, opts ...ConnectionOption) (Connection, error)

	// Accept is shorthand for:
	//     conn, err := l.Accept(); c, err := Connection(conn, append(opts, Server()...)
	Accept(l net.Listener, opts ...ConnectionOption) (Connection, error)

	// String returns Id()
	String() string
}

Container is an AMQP container, it represents a single AMQP "application" which can have multiple client or server connections.

Each Container in a distributed AMQP application must have a unique container-id which is applied to its connections.

Create with NewContainer()

func NewContainer

func NewContainer(id string) Container

NewContainer creates a new container. The id must be unique in your distributed application, all connections created by the container will have this container-id.

If id == "" a random UUID will be generated for the id.

type Endpoint

type Endpoint interface {
	// Close an endpoint and signal an error to the remote end if error != nil.
	Close(error)

	// String is a human readable identifier, useful for debugging and logging.
	String() string

	// Error returns nil if the endpoint is open, otherwise returns an error.
	// Error() == Closed means the endpoint was closed without error.
	Error() error

	// Connection is the connection associated with this endpoint.
	Connection() Connection

	// Done returns a channel that will close when the endpoint closes.
	// After Done() has closed, Error() will return the reason for closing.
	Done() <-chan struct{}

	// Sync() waits for the remote peer to confirm the endpoint is active or
	// reject it with an error. You can call it immediately on new endpoints
	// for more predictable error handling.
	//
	// AMQP is an asynchronous protocol. It is legal to create an endpoint and
	// start using it without waiting for confirmation. This avoids a needless
	// delay in the non-error case and throughput by "assuming the best".
	//
	// However if there *is* an error, these "optimistic" actions will fail. The
	// endpoint and its children will be closed with an error. The error will only
	// be detected when you try to use one of these endpoints or call Sync()
	Sync() error
	// contains filtered or unexported methods
}

Endpoint is the local end of a communications channel to the remote peer process. The following interface implement Endpoint: Connection, Session, Sender and Receiver.

You can create an endpoint with functions on Container, Connection and Session. You can accept incoming endpoints from the remote peer using Connection.Incoming()

type Incoming

type Incoming interface {
	// Accept and open the endpoint.
	Accept() Endpoint

	// Reject the endpoint with an error
	Reject(error)
	// contains filtered or unexported methods
}

Incoming is the interface for incoming endpoints, see Connection.Incoming()

Call Incoming.Accept() to open the endpoint or Incoming.Reject() to close it with optional error

Implementing types are *IncomingConnection, *IncomingSession, *IncomingSender and *IncomingReceiver. Each type provides methods to examine the incoming endpoint request and set configuration options for the local endpoint before calling Accept() or Reject()

type IncomingConnection

type IncomingConnection struct {
	// contains filtered or unexported fields
}

func (*IncomingConnection) Accept

func (in *IncomingConnection) Accept() Endpoint

func (*IncomingConnection) AcceptConnection

func (in *IncomingConnection) AcceptConnection(opts ...ConnectionOption) Connection

AcceptConnection is like Accept() but takes ConnectionOption s For example you can set the Heartbeat() for the accepted connection.

func (IncomingConnection) Heartbeat

func (c IncomingConnection) Heartbeat() time.Duration

func (*IncomingConnection) Reject

func (in *IncomingConnection) Reject(err error)

func (*IncomingConnection) String

func (in *IncomingConnection) String() string

func (IncomingConnection) User

func (c IncomingConnection) User() string

func (IncomingConnection) VirtualHost

func (c IncomingConnection) VirtualHost() string

type IncomingReceiver

type IncomingReceiver struct {
	// contains filtered or unexported fields
}

IncomingReceiver is sent on the Connection.Incoming() channel when there is an incoming request to open a receiver link.

func (*IncomingReceiver) Accept

func (in *IncomingReceiver) Accept() Endpoint

Accept accepts an incoming receiver endpoint

func (*IncomingReceiver) IsReceiver

func (l *IncomingReceiver) IsReceiver() bool

func (*IncomingReceiver) IsSender

func (l *IncomingReceiver) IsSender() bool

func (*IncomingReceiver) LinkName

func (l *IncomingReceiver) LinkName() string

func (*IncomingReceiver) RcvSettle

func (l *IncomingReceiver) RcvSettle() RcvSettleMode

func (*IncomingReceiver) Reject

func (in *IncomingReceiver) Reject(err error)

func (*IncomingReceiver) SetCapacity

func (in *IncomingReceiver) SetCapacity(capacity int)

SetCapacity sets the capacity of the incoming receiver, call before Accept()

func (*IncomingReceiver) SetPrefetch

func (in *IncomingReceiver) SetPrefetch(prefetch bool)

SetPrefetch sets the pre-fetch mode of the incoming receiver, call before Accept()

func (*IncomingReceiver) SndSettle

func (l *IncomingReceiver) SndSettle() SndSettleMode

func (*IncomingReceiver) Source

func (l *IncomingReceiver) Source() string

func (*IncomingReceiver) String

func (in *IncomingReceiver) String() string

func (*IncomingReceiver) Target

func (l *IncomingReceiver) Target() string

type IncomingSender

type IncomingSender struct {
	// contains filtered or unexported fields
}

IncomingSender is sent on the Connection.Incoming() channel when there is an incoming request to open a sender link.

func (*IncomingSender) Accept

func (in *IncomingSender) Accept() Endpoint

Accept accepts an incoming sender endpoint

func (*IncomingSender) IsReceiver

func (l *IncomingSender) IsReceiver() bool

func (*IncomingSender) IsSender

func (l *IncomingSender) IsSender() bool

func (*IncomingSender) LinkName

func (l *IncomingSender) LinkName() string

func (*IncomingSender) RcvSettle

func (l *IncomingSender) RcvSettle() RcvSettleMode

func (*IncomingSender) Reject

func (in *IncomingSender) Reject(err error)

func (*IncomingSender) SndSettle

func (l *IncomingSender) SndSettle() SndSettleMode

func (*IncomingSender) Source

func (l *IncomingSender) Source() string

func (*IncomingSender) String

func (in *IncomingSender) String() string

func (*IncomingSender) Target

func (l *IncomingSender) Target() string

type IncomingSession

type IncomingSession struct {
	// contains filtered or unexported fields
}

IncomingSender is sent on the Connection.Incoming() channel when there is an incoming request to open a session.

func (*IncomingSession) Accept

func (in *IncomingSession) Accept() Endpoint

Accept an incoming session endpoint.

func (*IncomingSession) Reject

func (in *IncomingSession) Reject(err error)

func (*IncomingSession) SetIncomingCapacity

func (in *IncomingSession) SetIncomingCapacity(bytes uint)

SetIncomingCapacity sets the session buffer capacity of an incoming session in bytes.

func (*IncomingSession) SetOutgoingWindow

func (in *IncomingSession) SetOutgoingWindow(frames uint)

SetOutgoingWindow sets the session outgoing window of an incoming session in frames.

func (*IncomingSession) String

func (in *IncomingSession) String() string

type LinkOption

type LinkOption func(*linkSettings)

LinkOption can be passed when creating a sender or receiver link to set optional configuration.

func AtLeastOnce

func AtLeastOnce() LinkOption

AtLeastOnce returns a LinkOption that requests acknowledgment for every message, acknowledgment indicates the message was definitely received. In the event of a failure, unacknowledged messages can be re-sent but there is a chance that the message will be received twice in this case. Sets SndSettleMode=SndUnsettled and RcvSettleMode=RcvFirst

func AtMostOnce

func AtMostOnce() LinkOption

AtMostOnce returns a LinkOption that sets "fire and forget" mode, messages are sent but no acknowledgment is received, messages can be lost if there is a network failure. Sets SndSettleMode=SendSettled and RcvSettleMode=RcvFirst

func Capacity

func Capacity(n int) LinkOption

Capacity returns a LinkOption that sets the link capacity

func LinkName

func LinkName(s string) LinkOption

LinkName returns a LinkOption that sets the link name.

func Prefetch

func Prefetch(p bool) LinkOption

Prefetch returns a LinkOption that sets a receivers pre-fetch flag. Not relevant for a sender.

func RcvSettle

func RcvSettle(m RcvSettleMode) LinkOption

RcvSettle returns a LinkOption that sets the send settle mode

func SndSettle

func SndSettle(m SndSettleMode) LinkOption

SndSettle returns a LinkOption that sets the send settle mode

func Source

func Source(s string) LinkOption

Source returns a LinkOption that sets address that messages are coming from.

func Target

func Target(s string) LinkOption

Target returns a LinkOption that sets address that messages are going to.

type LinkSettings

type LinkSettings interface {
	// Source address that messages are coming from.
	Source() string

	// Target address that messages are going to.
	Target() string

	// Name is a unique name for the link among links between the same
	// containers in the same direction. By default generated automatically.
	LinkName() string

	// IsSender is true if this is the sending end of the link.
	IsSender() bool

	// IsReceiver is true if this is the receiving end of the link.
	IsReceiver() bool

	// SndSettle defines when the sending end of the link settles message delivery.
	SndSettle() SndSettleMode

	// RcvSettle defines when the sending end of the link settles message delivery.
	RcvSettle() RcvSettleMode

	// Session containing the Link
	Session() Session
}

Settings associated with a link

type Outcome

type Outcome struct {
	// Status of the message: was it sent, how was it acknowledged.
	Status SentStatus
	// Error is a local error if Status is Unsent or Unacknowledged, a remote error otherwise.
	Error error
	// Value provided by the application in SendAsync()
	Value interface{}
}

Outcome provides information about the outcome of sending a message.

type RcvSettleMode

type RcvSettleMode proton.RcvSettleMode

RcvSettleMode defines when the receiving end of the link settles message delivery.

type ReceivedMessage

type ReceivedMessage struct {
	// Message is the received message.
	Message amqp.Message
	// contains filtered or unexported fields
}

ReceivedMessage contains an amqp.Message and allows the message to be acknowledged.

func (*ReceivedMessage) Accept

func (rm *ReceivedMessage) Accept() error

Accept tells the sender that we take responsibility for processing the message.

func (*ReceivedMessage) Reject

func (rm *ReceivedMessage) Reject() error

Reject tells the sender we consider the message invalid and unusable.

func (*ReceivedMessage) Release

func (rm *ReceivedMessage) Release() error

Release tells the sender we will not process the message but some other receiver might.

type Receiver

type Receiver interface {
	Endpoint
	LinkSettings

	// Receive blocks until a message is available or until the Receiver is closed
	// and has no more buffered messages.
	Receive() (ReceivedMessage, error)

	// ReceiveTimeout is like Receive but gives up after timeout, see Timeout.
	//
	// Note that that if Prefetch is false, after a Timeout the credit issued by
	// Receive remains on the link. It will be used by the next call to Receive.
	ReceiveTimeout(timeout time.Duration) (ReceivedMessage, error)

	// Prefetch==true means the Receiver will automatically issue credit to the
	// remote sender to keep its buffer as full as possible, i.e. it will
	// "pre-fetch" messages independently of the application calling
	// Receive(). This gives good throughput for applications that handle a
	// continuous stream of messages. Larger capacity may improve throughput, the
	// optimal value depends on the characteristics of your application.
	//
	// Prefetch==false means the Receiver will issue only issue credit when you
	// call Receive(), and will only issue enough credit to satisfy the calls
	// actually made. This gives lower throughput but will not fetch any messages
	// in advance. It is good for synchronous applications that need to evaluate
	// each message before deciding whether to receive another. The
	// request-response pattern is a typical example.  If you make concurrent
	// calls to Receive with pre-fetch disabled, you can improve performance by
	// setting the capacity close to the expected number of concurrent calls.
	//
	Prefetch() bool

	// Capacity is the size (number of messages) of the local message buffer
	// These are messages received but not yet returned to the application by a call to Receive()
	Capacity() int
}

Receiver is a Link that receives messages.

type Sender

type Sender interface {
	Endpoint
	LinkSettings

	// SendSync sends a message and blocks until the message is acknowledged by the remote receiver.
	// Returns an Outcome, which may contain an error if the message could not be sent.
	SendSync(m amqp.Message) Outcome

	// SendWaitable puts a message in the send buffer and returns a channel that
	// you can use to wait for the Outcome of just that message. The channel is
	// buffered so you can receive from it whenever you want without blocking.
	//
	// Note: can block if there is no space to buffer the message.
	SendWaitable(m amqp.Message) <-chan Outcome

	// SendForget buffers a message for sending and returns, with no notification of the outcome.
	//
	// Note: can block if there is no space to buffer the message.
	SendForget(m amqp.Message)

	// SendAsync puts a message in the send buffer and returns immediately.  An
	// Outcome with Value = value will be sent to the ack channel when the remote
	// receiver has acknowledged the message or if there is an error.
	//
	// You can use the same ack channel for many calls to SendAsync(), possibly on
	// many Senders. The channel will receive the outcomes in the order they
	// become available. The channel should be buffered and/or served by dedicated
	// goroutines to avoid blocking the connection.
	//
	// If ack == nil no Outcome is sent.
	//
	// Note: can block if there is no space to buffer the message.
	SendAsync(m amqp.Message, ack chan<- Outcome, value interface{})

	SendAsyncTimeout(m amqp.Message, ack chan<- Outcome, value interface{}, timeout time.Duration)

	SendWaitableTimeout(m amqp.Message, timeout time.Duration) <-chan Outcome

	SendForgetTimeout(m amqp.Message, timeout time.Duration)

	SendSyncTimeout(m amqp.Message, timeout time.Duration) Outcome
}

Sender is a Link that sends messages.

The result of sending a message is provided by an Outcome value.

A sender can buffer messages up to the credit limit provided by the remote receiver. All the Send* methods will block if the buffer is full until there is space. Send*Timeout methods will give up after the timeout and set Timeout as Outcome.Error.

type SentStatus

type SentStatus int

SentStatus indicates the status of a sent message.

const (
	// Message was never sent
	Unsent SentStatus = iota
	// Message was sent but never acknowledged. It may or may not have been received.
	Unacknowledged
	// Message was accepted by the receiver (or was sent pre-settled, accept is assumed)
	Accepted
	// Message was rejected as invalid by the receiver
	Rejected
	// Message was not processed by the receiver but may be valid for a different receiver
	Released
	// Receiver responded with an unrecognized status.
	Unknown
)

func (SentStatus) String

func (s SentStatus) String() string

String human readable name for SentStatus.

type Session

type Session interface {
	Endpoint

	// Sender opens a new sender.
	Sender(...LinkOption) (Sender, error)

	// Receiver opens a new Receiver.
	Receiver(...LinkOption) (Receiver, error)
}

Session is an AMQP session, it contains Senders and Receivers.

type SessionOption

type SessionOption func(*session)

SessionOption can be passed when creating a Session

func IncomingCapacity

func IncomingCapacity(bytes uint) SessionOption

IncomingCapacity returns a Session Option that sets the size (in bytes) of the session's incoming data buffer.

func OutgoingWindow

func OutgoingWindow(frames uint) SessionOption

OutgoingWindow returns a Session Option that sets the outgoing window size (in frames).

type SndSettleMode

type SndSettleMode proton.SndSettleMode

SndSettleMode returns a LinkOption that defines when the sending end of the link settles message delivery.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL