Documentation ¶
Overview ¶
Package rinq is a cross-language command bus and distributed ephemeral data store.
Example (MathService) ¶
This example shows how to issue a command call from one peer to another.
There is a "server" peer, which performs basic mathematical operations, and a "client" peer which invokes those operations.
In the example both the client peer and the server peer are running in the same process. Outside of an example, these peers would typically be running on separate servers.
//go:build !without_amqp && !without_examples // +build !without_amqp,!without_examples package main import ( "context" "fmt" "github.com/rinq/rinq-go/src/rinq" "github.com/rinq/rinq-go/src/rinqamqp" ) // arguments contains the parameters for the commands in the "math" namespace type arguments struct { Left, Right int } // mathHandler is the command handler for the "math" namespace func mathHandler( ctx context.Context, req rinq.Request, res rinq.Response, ) { defer req.Payload.Close() // decode the request payload into the arguments struct var args arguments if err := req.Payload.Decode(&args); err != nil { res.Fail("invalid-arguments", "could not decode arguments") return } var result int switch req.Command { case "add": result = args.Left + args.Right case "sub": result = args.Left - args.Right default: res.Fail("unknown-command", "no such command: "+req.Command) return } // send the result in the response payload payload := rinq.NewPayload(result) defer payload.Close() res.Done(payload) } // This example shows how to issue a command call from one peer to another. // // There is a "server" peer, which performs basic mathematical operations, // and a "client" peer which invokes those operations. // // In the example both the client peer and the server peer are running in the // same process. Outside of an example, these peers would typically be running // on separate servers. func main() { // create a new peer to act as the "server" and start listening for commands // in the "math" namespace. serverPeer, err := rinqamqp.DialEnv() if err != nil { panic(err) } defer serverPeer.Stop() serverPeer.Listen("math", mathHandler) // create a new peer to act as the "client", and a session to make the // call. clientPeer, err := rinqamqp.DialEnv() if err != nil { panic(err) } defer clientPeer.Stop() sess := clientPeer.Session() defer sess.Destroy() // call the "math::add" command ctx := context.Background() args := rinq.NewPayload(arguments{1, 2}) result, err := sess.Call(ctx, "math", "add", args) if err != nil { panic(err) } fmt.Printf("1 + 2 = %s\n", result) }
Output: 1 + 2 = 3
Index ¶
- Constants
- func FailureType(err error) string
- func IsCommandError(err error) bool
- func IsFailure(err error) bool
- func IsFailureType(t string, err error) bool
- func IsNotFound(err error) bool
- func ShouldRetry(err error) bool
- type AsyncHandler
- type Attr
- type AttrTable
- type CommandError
- type CommandHandler
- type Failure
- type FrozenAttributesError
- type NotFoundError
- type Notification
- type NotificationHandler
- type Payload
- type Peer
- type Request
- type Response
- type Revision
- type Session
- type StaleFetchError
- type StaleUpdateError
Examples ¶
Constants ¶
const Version = "0.7.0"
Version is the rinq-go library version.
Variables ¶
This section is empty.
Functions ¶
func FailureType ¶
FailureType returns the failure type of err; or an empty string if err is not a Failure.
func IsCommandError ¶
IsCommandError returns true if err was sent in response to a command request, as opposed to a local error that occurred when attempting to send the request.
func IsFailureType ¶
IsFailureType returns true if err is a Failure with a type of t.
func IsNotFound ¶
IsNotFound returns true if err is a NotFoundError.
func ShouldRetry ¶
ShouldRetry returns true if a call to Revision.Get(), GetMany(), Update() or Destroy() failed because the revision is out of date.
The operation should be retried on the latest revision of the session, which can be retrieved with Revision.Refresh().
Types ¶
type AsyncHandler ¶
type AsyncHandler func( ctx context.Context, sess Session, msgID ident.MessageID, ns, cmd string, in *Payload, err error, )
AsyncHandler is a call-back function invoked when a response is received to a command call made with Session.CallAsync()
If err is non-nil, it always represents a server-side error.
IsFailure(err) returns true if the error is an application-defined failure. Failures are server-side errors that are part of the command's public API, as opposed to unexpected errors. If err is a failure, in contains the failure's application-defined payload; for this reason in.Close() must be called, even if err is non-nil.
The handler is responsible for closing the in payload, however there is no requirement that the payload be closed during the execution of the handler.
type Attr ¶
type Attr struct { // Key is an application-defined identifier for the attribute. Keys are // unique within a session. Any valid UTF-8 string can be used a key, // including the empty string. Key string `json:"k"` // Value is the attribute's value. Any valid UTF-8 string can be used as a // value, including the empty string. Value string `json:"v,omitempty"` // IsFrozen is true if the attribute is "frozen" such that it can never be // altered again (for a given session). IsFrozen bool `json:"f,omitempty"` }
Attr is a sesssion attribute.
Sessions contain a versioned key/value store. See the Session interface for more information.
func Freeze ¶
Freeze is a convenience method that returns an Attr with the specified key and value, and the IsFrozen flag set to true.
Example ¶
attr := Freeze("foo", "bar") fmt.Println(attr)
Output: foo@bar
type AttrTable ¶
type AttrTable interface { // Get returns the attribute with key k. Get(k string) (Attr, bool) // Each calls fn for each attribute in the collection. Iteration stops // when fn returns false. Each(fn func(Attr) bool) // IsEmpty returns true if there are no attributes in the table. IsEmpty() bool String() string }
AttrTable is a read-only table of session attributes.
type CommandError ¶
type CommandError string
CommandError is an error (as opposed to a Failure) sent in response to a command.
func (CommandError) Error ¶
func (err CommandError) Error() string
type CommandHandler ¶
CommandHandler is a callback-function invoked when a command request is received by the peer.
Command requests can only be received for namespaces that a peer is listening to. See Peer.Listen() to start listening.
The handler MUST close the response by calling res.Done(), res.Error() or res.Close(); otherwise the request may be redelivered, possibly to a different peer.
The handler is responsible for closing req.Payload, however there is no requirement that the payload be closed during the execution of the handler.
type Failure ¶
type Failure struct { // Type is an application-defined string identifying the failure. // They serve the same purpose as an error code. They should be concise // and easily understandable within the context of the application's API. Type string // Message is an optional human-readable description of the failure. Message string // Payload is an optional application-defined payload. Payload *Payload }
Failure is an application-defined command error.
Failures are used to indicate an error that is "expected" within the domain of the command that produced it. Failures form part of the command's API and should usually be handled by the caller.
Failures can be produced in a command handler by calling Response.Fail() or passing a Failure value to Response.Error().
type FrozenAttributesError ¶
FrozenAttributesError indicates a failure to update a session because at least one of the attributes being updated is frozen.
func (FrozenAttributesError) Error ¶
func (err FrozenAttributesError) Error() string
type NotFoundError ¶
NotFoundError indicates that an operation failed because the session does not exist.
func (NotFoundError) Error ¶
func (err NotFoundError) Error() string
type Notification ¶
type Notification struct { // ID uniquely identifies the notification. ID ident.MessageID // Source refers to the session that sent the notification. Source Revision // Namespace is the notification namespace. Namespaces are used to route // notifications to only those sessions that intend to handle them. Namespace string // Type is an application-defined notification type. Type string // Payload contains optional application-defined information. The handler // that accepts the notifiation is responsible for closing the payload, // however there is no requirement that the payload be closed during the // execution of the handler. Payload *Payload // IsMulticast is true if the notification was (potentially) sent to more // than one session. IsMulticast bool // For multicast notifications, Constraint contains the attributes used as // criteria for selecting which sessions receive the notification. The // constraint is nil if IsMulticast is false. Constraint constraint.Constraint }
Notification holds information about an inter-session notification.
type NotificationHandler ¶
type NotificationHandler func( ctx context.Context, target Session, n Notification, )
NotificationHandler is a callback-function invoked when an inter-session notification is received.
Notifications can only be received for namespaces that a session is listening to. See Session.Listen() to start listening.
The handler is responsible for closing n.Payload, however there is no requirement that the payload be closed during the execution of the handler.
type Payload ¶
type Payload struct {
// contains filtered or unexported fields
}
Payload is an immutable, application-defined value that is included in a command request, command response, or inter-session notification.
A nil-payload pointer is equivalent to a payload with a value of nil.
Payloads must be closed by the application when no longer required. This includes payloads constructed by calling NewPayload() or NewPayloadFromBytes(), as well as any payload returned by a Rinq operation (such as Session.Call()), or passed to a callback function that was provided by the application.
Payloads are NOT safe for concurrent use. To share a payload across multiple goroutines, call Payload.Clone() to obtain a second payload that references the same underlying data.
Payload values can be any value that can be represented using CBOR encoding. See http://cbor.io/ for more information.
Payloads are modeled in this way to allow an application to forward incoming payloads without the need to decode and re-encode them.
func NewPayload ¶
func NewPayload(v interface{}) *Payload
NewPayload creates a new payload from an arbitrary value.
func NewPayloadFromBytes ¶
NewPayloadFromBytes creates a new payload from a binary representation. Ownership of the byte-slice is transferred to the payload. An empty byte-slice is equivalent to the nil value.
func (*Payload) Bytes ¶
Bytes returns the binary representation of the payload, in CBOR encoding.
The returned byte-slice is invalidated when the payload is closed, it must be copied if it is intended to be used for longer than the lifetime of the payload.
If the payload was created from a non-empty byte-slice, the return value is always that same byte-slice, unless the payload has been closed.
If the payload was created from a nil value, the returned byte-slice is nil.
func (*Payload) Close ¶
func (p *Payload) Close()
Close releases any resources held by the payload, resetting the payload to represent the nil value.
func (*Payload) Len ¶
Len returns the encoded payload length, in bytes. A length of zero indicates a nil payload value.
type Peer ¶
type Peer interface { // ID returns the peer's unique identifier. ID() ident.PeerID // Session returns a new session owned by this peer. // // Creating a session does not perform any network IO. The only limit to the // number of sessions is the memory required to store them. // // Sessions created after the peer has been stopped are unusable. Any // operation will fail immediately. Session() Session // Listen starts listening for command requests in the given namespace. // // When a command request is received with a namespace equal to ns, the // handler h is invoked. // // Repeated calls to Listen() with the same namespace simply changes the // handler associated with that namespace. // // h is invoked on its own goroutine for each command request. Listen(ns string, h CommandHandler) error // Unlisten stops listening for command requests in the given namepsace. // // If the peer is not currently listening to ns, nil is returned immediately. Unlisten(ns string) error // Done returns a channel that is closed when the peer is stopped. // // Err() may be called to obtain the error that caused the peer to stop, if // any occurred. Done() <-chan struct{} // Err returns the error that caused the Done() channel to close. // // A nil return value indicates that the peer was stopped because Stop() or // GracefulStop() has been called. Err() error // Stop instructs the peer to disconnect from the network immediately. // // Stop does NOT block until the peer is disconnected. Use the Done() // channel to wait for the peer to disconnect. Stop() // GracefulStop() instructs the peer to disconnect from the network once // all pending operations have completed. // // Any calls to Session.Call(), command handlers or notification handlers // must return before the peer has stopped. // // GracefulStop does NOT block until the peer is disconnected. Use the // Done() channel to wait for the peer to disconnect. GracefulStop() }
Peer represents a connection to a Rinq network.
Peers can act as a server, responding to application-defined commands. Use Peer.Listen() to start accepting incoming command requests.
Command request are sent by sessions, represented by the Session interface. Sessions can also send notifications to other sessions. Sessions are created by calling Peer.Session().
Each peer is assigned a unique ID, which is represented by the PeerID struct. All IDs generated by the peer, such as session IDs and message IDs contain the peer ID, so that they can be traced to their origin easily.
Example (Listen) ¶
This example illustrates how to listen for incoming command requests.
peer, err := rinqamqp.DialEnv() if err != nil { panic(err) } defer peer.Stop() peer.Listen("my-api", func( ctx context.Context, req Request, res Response, ) { defer req.Payload.Close() // handle the command res.Close() }) if false { // prevent the example from blocking forever. <-peer.Done() }
Output:
Example (Session) ¶
This example illustrates how to establish a new session.
peer, err := rinqamqp.DialEnv() if err != nil { panic(err) } defer peer.Stop() sess := peer.Session() defer sess.Destroy() fmt.Printf("created session #%d\n", sess.ID().Seq)
Output: created session #1
type Request ¶
type Request struct { // ID uniquely identifies the command request. ID ident.MessageID // Source is the revision of the session that sent the request, at the time // it was sent (which is not necessarily the latest). Source Revision // Namespace is the command namespace. Namespaces are used to route command // requests to the appropriate peer and command handler. Namespace string // Command is the application-defined command name for the request. The // command is logged for each request. Command string // Payload contains optional application-defined information about the // request, such as arguments to the command. The handler that accepts the // request is responsible for closing the payload, however there is no // requirement that the payload be closed during the execution of the handler. Payload *Payload }
Request holds information about an incoming command request.
type Response ¶
type Response interface { // IsRequired returns true if the sender is waiting for the response. // // If the response is not required, any payload data sent is discarded. // The response must always be closed, even if IsRequired() returns false. IsRequired() bool // IsClosed returns true if the response has already been closed. IsClosed() bool // Done sends a payload to the source session and closes the response. // // A panic occurs if the response has already been closed. Done(*Payload) // Error sends an error to the source session and closes the response. // // A panic occurs if the response has already been closed. Error(error) // Fail is a convenience method that creates a Failure and passes it to the // Error() method. The created failure is returned. // // The failure type t is used verbatim. The failure message is formatted // according to the format specifier f, interpolated with values from v. // // A panic occurs if the response has already been closed or if t is empty. Fail(t, f string, v ...interface{}) Failure // Close finalizes the response. // // If the origin session is expecting response it will receive a nil payload. // // It is not an error to close a response multiple times. The return value // is true the first time Close() is called, and false on subsequent calls. Close() bool }
Response sends a reply to incoming command requests.
Example (Fail) ¶
This example illustrates how to respond to a command request with an application-defined failure.
peer, err := rinqamqp.DialEnv() if err != nil { panic(err) } defer peer.Stop() peer.Listen("my-api", func( ctx context.Context, req Request, res Response, ) { defer req.Payload.Close() res.Fail( "my-api-error", "the call to %s failed spectacularly!", req.Command, ) }) sess := peer.Session() defer sess.Destroy() in, err := sess.Call(context.Background(), "my-api", "test", nil) defer in.Close() fmt.Println(err)
Output: my-api-error: the call to test failed spectacularly!
type Revision ¶
type Revision interface { // SessionID returns the ID of the underlying session. SessionID() ident.SessionID // Refresh returns the latest revision of the session. // // If the session has been destroyed, err is nil, but any operations on rev // will return an error e such that IsNotFound(e) is true. This means that // any non-nil err indicates an unexpected error occurred when querying the // session state. Refresh(ctx context.Context) (rev Revision, err error) // Get returns the attribute with key k within the ns namespace of the // attribute table. // // The returned attribute is guaranteed to be correct as of Ref().Rev. // Non-existent attributes are equivalent to empty attributes, therefore it // is not an error to request a key that has never been created. // // Peers do not always have a complete copy of the attribute table. If the // attribute value is unknown it is fetched from the owning peer. // // If the attribute can not be retrieved because it has already been // modified, ShouldRetry(err) returns true. To fetch the attribute value at // the later revision, first call Refresh() then retry the Get() on the // newer revision. // // If IsNotFound(err) returns true, the session has been destroyed and the // revision can not be queried. Get(ctx context.Context, ns, k string) (attr Attr, err error) // GetMany returns the attributes with keys in k within the ns namespace of // the attribute table. // // The returned attributes are guaranteed to be correct as of Ref().Rev. // Non-existent attributes are equivalent to empty attributes, therefore it // is not an error to request keys that have never been created. // // Peers do not always have a complete copy of the attribute table. If any // of the attribute values are unknown they are fetched from the owning peer. // // If any of the attributes can not be retrieved because they have already // been modified, ShouldRetry(err) returns true. To fetch the attribute // values at the later revision, first call Refresh() then retry the // GetMany() on the newer revision. // // If IsNotFound(err) returns true, the session has been destroyed and the // revision can not be queried. // // If err is nil, t contains all of the attributes specified in k. GetMany(ctx context.Context, ns string, k ...string) (t AttrTable, err error) // Update atomically modifies a set of attributes within the ns namespace of // the attribute table. // // A successful update produces a new revision. // // Each update is atomic; either all of the attributes in attrs are updated, // or the attribute table remains unchanged. On success, rev is the newly // created revision. // // The following conditions must be met for an update to succeed: // // 1. The session revision represented by this instance must be the latest // revision. If Ref().Rev is not the latest revision the update fails; // ShouldRetry(err) returns true. // // 2. All attribute changes must reference non-frozen attributes. If any of // attributes being updated are already frozen the update fails and // ShouldRetry(err) returns false. // // If attrs is empty no update occurs, rev is this revision and err is nil. // // As a convenience, if the update fails for any reason, rev is this // revision. This allows the caller to assign the return value to an // existing variable without first checking for errors. Update(ctx context.Context, ns string, attrs ...Attr) (rev Revision, err error) // Clear is an update operation that atomically sets the value of each // attribute within the ns namespace to the empty string. // // The sematics are the same as for Update(). This means the operation fails // if ANY attribute in the ns namespace is frozen. // // As a convenience, if the clear operation fails for any reason, rev is // this revision. This allows the caller to assign the return value to an // existing variable without first checking for errors. Clear(ctx context.Context, ns string) (rev Revision, err error) // Destroy terminates the session. // // The session revision represented by this instance must be the latest // revision. If Ref().Rev is not the latest revision the destroy fails; // ShouldRetry(err) returns true. Destroy(ctx context.Context) (err error) }
Revision represents a specific revision of a session.
Revision is the sole interface for manipulating a session's attribute table.
The underlying session may be "local", i.e. owned by a peer running in this process, or "remote", owned by a different peer.
For remote sessions, operations may require network IO. Deadlines are honored for all methods that accept a context.
Example (Get) ¶
This example illustrates how to read an attribute from a session.
It includes logic necessary to fetch the attribute even if the Revision in use is out-of-date, by retrying on the latest revision.
peer, err := rinqamqp.DialEnv() if err != nil { panic(err) } defer peer.Stop() sess := peer.Session() defer sess.Destroy() rev := sess.CurrentRevision() ctx := context.Background() var attr Attr for { attr, err = rev.Get(ctx, "my-api", "user-id") if err != nil { if ShouldRetry(err) { // the attribute could not be fetched because it has been // updated since rev was obtained rev, err = rev.Refresh(ctx) if err == nil { continue } } panic(err) } break } if attr.Value == "" { fmt.Println("user is not logged in") }
Output: user is not logged in
Example (Update) ¶
This example illustrates how to modify an attribute in a session.
It includes logic to retry in the face of an optimistic-lock failure, which occurs if the revision is out of date.
peer, err := rinqamqp.DialEnv() if err != nil { panic(err) } defer peer.Stop() sess := peer.Session() defer sess.Destroy() rev := sess.CurrentRevision() ctx := context.Background() for { rev, err = rev.Update(ctx, "my-api", Set("user-id", "123")) if err != nil { if ShouldRetry(err) { // the session could not be updated because rev is out of date rev, err = rev.Refresh(ctx) if err == nil { continue } } panic(err) } fmt.Println("updated to new revision") break }
Output: updated to new revision
type Session ¶
type Session interface { // ID returns the session's unique identifier. ID() ident.SessionID // CurrentRevision returns the current revision of this session. CurrentRevision() Revision // Call sends a command request to the next available peer listening to the // ns namespace and waits for a response. // // In the context of the call, the sessions owning peer is the "client" and // the listening peer is the "server". The client and server may be the same // peer. // // cmd and out are an application-defined command name and request payload, // respectively. Both are passed to the command handler on the server. // // Calls always use a deadline; if ctx does not have a deadline, a timeout // described by options.DefaultTimeout() is used. // // If the call completes successfully, err is nil and in is the // application-defined response payload sent by the server. // // If err is non-nil, it may represent either client-side error or a // server-side error. IsServerError(err) returns true if the error occurred // on the server. // // IsFailure(err) returns true if the error is an application-defined // failure. Failures are server-side errors that are part of the command's // public API, as opposed to unexpected errors. If err is a failure, out // contains the failure's application-defined payload; for this reason // out.Close() must always be called, even if err is non-nil. // // If IsNotFound(err) returns true, the session has been destroyed and the // command request can not be sent. Call(ctx context.Context, ns, cmd string, out *Payload) (in *Payload, err error) // CallAync sends a command request to the next available peer listening to // the ns namespace and instructs it to send a response, but does not block. // // cmd and out are an application-defined command name and request payload, // respectively. Both are passed to the command handler on the server. // // id is a value identifying the outgoing command request. // // When a response is received, the handler specified by SetAsyncHandler() // is invoked. It is passed the id, namespace and command name of the // request, along with the response payload and error. // // It is the application's responsibility to correlate the request with the // response and handle the context deadline. The request is NOT tracked by // the session and as such the handler is never invoked in the event of a // timeout. // // If IsNotFound(err) returns true, the session has been destroyed and the // command request can not be sent. CallAsync(ctx context.Context, ns, cmd string, out *Payload) (id ident.MessageID, err error) // SetAsyncHandler sets the asynchronous call handler. // // h is invoked for each command response received to a command request made // with CallAsync(). // // If IsNotFound(err) returns true, the session has been destroyed and the // handler can not be set. SetAsyncHandler(h AsyncHandler) error // Execute sends a command request to the next available peer listening to // the ns namespace and returns immediately. // // cmd and out are an application-defined command name and request payload, // respectively. Both are passed to the command handler on the server. // // If IsNotFound(err) returns true, the session has been destroyed and the // command request can not be sent. Execute(ctx context.Context, ns, cmd string, out *Payload) (err error) // Notify sends a message directly to another session listening to the ns // namespace. // // t and out are an application-defined notification type and payload, // respectively. Both are passed to the notification handler configured on // the session identified by s. // // If IsNotFound(err) returns true, this session has been destroyed and the // notification can not be sent. Notify(ctx context.Context, ns, t string, s ident.SessionID, out *Payload) (err error) // NotifyMany sends a message to multiple sessions that are listening to the // ns namespace. // // The constraint c is a set of attribute key/value pairs that a session // must have in the ns namespace of its attribute table in order to receive // the notification. // // t and out are an application-defined notification type and payload, // respectively. Both are passed to the notification handlers configured on // those sessions that match c. // // If IsNotFound(err) returns true, this session has been destroyed and the // notification can not be sent. NotifyMany(ctx context.Context, ns, t string, c constraint.Constraint, out *Payload) error // Listen begins listening for notifications sent to this session in the ns // namespace. // // When a notification is received with a namespace equal to ns, h is invoked. // // h is invoked on its own goroutine for each notification. Listen(ns string, h NotificationHandler) error // Unlisten stops listening for notifications from the ns namespace. // // If the session is not currently listening for notifications, nil is // returned immediately. Unlisten(ns string) error // Destroy terminates the session. // // Destroy does NOT block until the session is destroyed, use the // Session.Done() channel to wait for the session to be destroyed. Destroy() // Done returns a channel that is closed when the session is destroyed and // any pending Session.Call() operations have completed. // // The session may be destroyed directly with Destroy(), or via a Revision // that refers to this session, either locally or remotely. // // All sessions are destroyed when their owning peer is stopped. Done() <-chan struct{} }
Session is an interface representing a "local" session, that is, a session created by a peer running in this process.
Sessions are the "clients" on a Rinq network, able to issue command requests and send notifications to other sessions.
Sessions are created by calling Peer.Session(). The peer that creates a session is called the "owning peer".
Each session has an in-memory attribute table, which can be used to store application-defined key/value pairs. A session's attribute table can be modified locally, as well as remotely by peers that have received a command request or notification from the session.
The attribute table is namespaced. Any operation performed on the attribute table occurs within a single namespace.
The attribute table is versioned. Each revision of the attribute table is represented by the Revision interface.
An optimistic-locking strategy is employed to protect the attribute table against concurrent writes. In order for a write to succeed, it must be made through a Revision value that represents the current (most recent) revision.
Individual attributes in the table can be "frozen", preventing any further changes to that attribute.
Example (CallAsync) ¶
This example shows how to make an asynchronous command call.
peer, err := rinqamqp.DialEnv() if err != nil { panic(err) } defer peer.Stop() // listen for command requests peer.Listen("my-api", func( ctx context.Context, req Request, res Response, ) { defer req.Payload.Close() payload := NewPayload("<payload>") defer payload.Close() res.Done(payload) }) sess := peer.Session() defer sess.Destroy() // setup the asynchronous response handler if err := sess.SetAsyncHandler(func( ctx context.Context, s Session, _ ident.MessageID, ns, cmd string, in *Payload, err error, ) { defer in.Close() peer.Stop() fmt.Printf("received %s::%s response with %s payload\n", ns, cmd, in.Value()) }); err != nil { panic(err) } // send the command request if _, err := sess.CallAsync( context.Background(), "my-api", "test", nil, ); err != nil { panic(err) } <-peer.Done()
Output: received my-api::test response with <payload> payload
Example (Notify) ¶
This example shows how to send a notification from one session to another.
peer, err := rinqamqp.DialEnv() if err != nil { panic(err) } defer peer.Stop() // create a session to receive the notification recv := peer.Session() defer recv.Destroy() if err := recv.Listen( "my-api", func( ctx context.Context, target Session, n Notification, ) { defer n.Payload.Close() peer.Stop() fmt.Printf("received %s::%s with %s payload\n", n.Namespace, n.Type, n.Payload.Value()) }, ); err != nil { panic(err) } // create a session to send the notification to recv send := peer.Session() defer send.Destroy() payload := NewPayload("<payload>") defer payload.Close() if err := send.Notify( context.Background(), "my-api", "<type>", recv.ID(), payload, ); err != nil { panic(err) } <-peer.Done()
Output: received my-api::<type> with <payload> payload
Example (NotifyMany) ¶
This example shows how to send a notification from one session to several sessions that contain specific attribute values.
peer, err := rinqamqp.DialEnv() if err != nil { panic(err) } // create three sessions for receiving notifications recv1 := peer.Session() recv2 := peer.Session() recv3 := peer.Session() // create a notification handler that stops the peer once TWO notifications // have been received var recvCount int32 handler := func(ctx context.Context, target Session, n Notification) { defer n.Payload.Close() if target == recv3 { panic("message delivered to unexpected session") } fmt.Printf("received %s::%s with %s payload\n", n.Namespace, n.Type, n.Payload.Value()) if atomic.AddInt32(&recvCount, 1) == 2 { peer.Stop() } } // configure all three sessions to listen for notifications for _, s := range []Session{recv1, recv2, recv3} { if err := s.Listen("my-api", handler); err != nil { panic(err) } } // update the first TWO sessions with a "foo" attribute for _, s := range []Session{recv1, recv2} { rev := s.CurrentRevision() if _, err := rev.Update( context.Background(), "my-api", Freeze("foo", "bar"), ); err != nil { panic(err) } } // create a session to send the notification to recv send := peer.Session() payload := NewPayload("<payload>") defer payload.Close() // constrain the notification to only those sessions that have a "foo" // attribute with a value of "bar" con := constraint.Equal("foo", "bar") if err := send.NotifyMany( context.Background(), "my-api", "<type>", con, payload, ); err != nil { panic(err) } <-peer.Done()
Output: received my-api::<type> with <payload> payload received my-api::<type> with <payload> payload
type StaleFetchError ¶
StaleFetchError indicates a failure to fetch an attribute for a specific revision because it has been modified after that revision.
func (StaleFetchError) Error ¶
func (err StaleFetchError) Error() string
type StaleUpdateError ¶
StaleUpdateError indicates a failure to update or destroy a session because the session has been modified after that revision.
func (StaleUpdateError) Error ¶
func (err StaleUpdateError) Error() string
Source Files ¶
Directories ¶
Path | Synopsis |
---|---|
Package constraint provides functions for building session constraints for use with rinq.Session.NotifyMany().
|
Package constraint provides functions for building session constraints for use with rinq.Session.NotifyMany(). |
Package ident contains types that represent various Rinq identifiers.
|
Package ident contains types that represent various Rinq identifiers. |
Package options defines options that can be customized when creating a Peer.
|
Package options defines options that can be customized when creating a Peer. |
Package trace provides functions for configuring custom trace identifiers.
|
Package trace provides functions for configuring custom trace identifiers. |