client

package
v3.0.0-...-0d2fc6f Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Dec 15, 2022 License: MIT Imports: 16 Imported by: 0

Documentation

Overview

Package client provides a WAMP client implementation that is interoperable with any standard WAMP router and is capable of using all of the advanced profile features supported by the nexus WAMP router.

Index

Examples

Constants

View Source
const (
	JSON          = serialize.JSON
	MSGPACK       = serialize.MSGPACK
	CBOR          = serialize.CBOR
	WampPPTScheme = "wamp"
	MqttPPTScheme = "mqtt"
)

Variables

View Source
var (
	ErrAlreadyClosed           = errors.New("already closed")
	ErrCallerNoProg            = errors.New("caller not accepting progressive results")
	ErrNotConn                 = errors.New("not connected")
	ErrNotRegistered           = errors.New("not registered for procedure")
	ErrNotSubscribed           = errors.New("not subscribed to topic")
	ErrReplyTimeout            = errors.New("timeout waiting for reply")
	ErrRouterNoRoles           = errors.New("router did not announce any supported roles")
	ErrPPTNotSupportedByRouter = errors.New("payload passthru mode is not supported by the router")
	ErrPPTNotSupportedByPeer   = errors.New("peer is trying to use Payload PassThru Mode while it was not announced during HELLO handshake") //nolint:lll
	ErrPPTSchemeInvalid        = errors.New("ppt scheme provided is invalid")
	ErrPPTSerializerInvalid    = errors.New("ppt serializer provided is invalid or not supported")
	ErrSerialization           = errors.New("can not serialize/deserialize payload")
)
View Source
var E2eeSerializers = map[string]serialize.Serialization{"cbor": CBOR}
View Source
var InvocationCanceled = InvokeResult{Err: wamp.ErrCanceled}

InvocationCanceled is returned from an InvocationHandler to indicate that the invocation was canceled.

View Source
var PPTSerializers = map[string]serialize.Serialization{"json": JSON, "msgpack": MSGPACK, "cbor": CBOR}

Functions

func CookieURL

func CookieURL(routerURL string) (*url.URL, error)

CookieURL takes a websocket URL string and outputs a url.URL that can be used to retrieve cookies from a http.CookieJar as may be provided in Config.WsCfg.Jar.

Types

type AuthFunc

type AuthFunc func(challenge *wamp.Challenge) (signature string, details wamp.Dict)

AuthFunc takes the CHALLENGE message and returns the signature string and any WELCOME message details. If the signature is accepted, the details are used to populate the welcome message, as well as the session attributes.

In response to a CHALLENGE message, the Client MUST send an AUTHENTICATE message. Therefore, AuthFunc does not return an error. If an error is encountered within AuthFunc, then an empty signature should be returned since the client cannot give a valid signature response.

This is used in the AuthHandler map, in a Config, and is used when the client joins a realm.

type Client

type Client struct {
	// contains filtered or unexported fields
}

A Client routes messages to/from a WAMP router.

func ConnectLocal

func ConnectLocal(router router.Router, cfg Config) (*Client, error)

ConnectLocal creates a new client directly connected to the router instance. This is used to connect clients, embedded in the same application as the router, to the router. Doing this eliminates the need for any socket of serialization overhead, and does not require authentication. The new client joins the realm specified in the Config.

Example
// Create router that local client attaches to.
routerConfig := &router.Config{
	RealmConfigs: []*router.RealmConfig{
		{
			URI:           wamp.URI("nexus.realm1"),
			AnonymousAuth: true,
		},
	},
}
r, err := router.NewRouter(routerConfig, nil)
if err != nil {
	log.Fatal(err)
}

// Configure and connect local client.
c, err := ConnectLocal(r, Config{Realm: "nexus.realm1"})
if err != nil {
	log.Fatal(err)
}
defer c.Close()
Output:

func ConnectNet

func ConnectNet(ctx context.Context, routerURL string, cfg Config) (*Client, error)

ConnectNet creates a new client connected a WAMP router over a websocket, TCP socket, or unix socket. The new client joins the realm specified in the Config. The context may be used to cancel or timeout connecting to a router.

For websocket clients, the routerURL has the form "ws://host:port/" or "wss://host:port/", for websocket or websocket with TLS respectively. The scheme "http" is interchangeable with "ws", and "https" is interchangeable with "wss". The host:port portion is the same as for a TCP client.

For TCP clients, the router URL has the form "tcp://host:port/" or "tcps://host:port/", for TCP socket or TCP socket with TLS respectively. The host must be a literal IP address, or a host name that can be resolved to IP addresses. The port must be a literal port number or a service name. If the host is a literal IPv6 address it must be enclosed in square brackets, as in "[2001:db8::1]:80". For details, see: https://golang.org/pkg/net/#Dial

For Unix socket clients, the routerURL has the form "unix://path". The path portion specifies a path on the local file system where the Unix socket is created. TLS is not used for unix sockets.

Example
// Configure and connect client.
ctx := context.Background()
c, err := ConnectNet(ctx, "unix:///tmp/app.sock", Config{Realm: "nexus.realm1"})
if err != nil {
	log.Fatal(err)
}
defer c.Close()
Output:

func NewClient

func NewClient(p wamp.Peer, cfg Config) (*Client, error)

NewClient takes a connected Peer, joins the realm specified in cfg, and if successful, returns a new client.

NOTE: This method is exported for clients that use a Peer implementation not provided with the nexus package. Generally, clients are created using ConnectNet() or ConnectLocal().

func (*Client) Call

func (c *Client) Call(ctx context.Context, procedure string, options wamp.Dict, args wamp.List, kwargs wamp.Dict, progcb ProgressHandler) (*wamp.Result, error)

Call calls the procedure corresponding to the given URI.

If an ERROR message is received from the router, the error value returned can be type asserted to RPCError to provide access to the returned ERROR message. This may be necessary for the client application to process error data from the RPC invocation.

Call Canceling

The provided Context allows the caller to cancel a call, or to set a deadline that cancels the call when the deadline expires. There is no separate Cancel() API to do this. If the call is canceled before a result is received, then a CANCEL message is sent to the router to cancel the call according to the specified mode. The client's cancel mode can be set using SetCallCancelMode().

If the context is canceled or times out, then error returned will not be a RPCError. This allows the caller to distinguish between cancellation initiated by the client (by canceling context), and cancellation initialed elsewhere.

Call Timeout

If a timeout is provided in the options, and the callee supports call timeout, then the timeout value is passed to the callee so that the invocation can be canceled by the callee if the timeout is reached before a response is returned. This is the behavior implemented by the nexus client in the callee role.

The nexus router also supports automatic call cancellation by the router, after the timeout specified in the call options. This way, the router will automatically cancel the call even if the callee does not support it.

To request automatic call timeout, by the router and callee, specify a timeout in milliseconds: options["timeout"] = 30000

Caller Identification

A caller may request the disclosure of its identity (its WAMP session ID) to callees, if allowed by the dealer.

To request that this caller's identity disclosed to callees, set:

options["disclose_me"] = true

NOTE: Use consts defined in wamp/options.go instead of raw strings.

Progressive Call Results

A caller indicates its willingness to receive progressive results by supplying a ProgressHandler function to handle progressive results that are returned before the final result. Call returns when the final result is returned by the callee. The progress handler is guaranteed not to be called after Call returns.

There is no need to set the "receive_progress" option, as this is automatically set if a progress callback is provided.

IMPORTANT: If the context has a timeout, then the amount of time needs to be sufficient for the caller to receive all progressive results as well as the final result.

Example
// Configure and connect caller client.
ctx := context.Background()
cfg := Config{Realm: "nexus.realm1"}
caller, err := ConnectNet(ctx, "ws://localhost:8080/", cfg)
if err != nil {
	log.Fatal(err)
}
defer caller.Close()

// Create a context to cancel the call after 5 seconds if no response.
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()

// Test calling the "sum" procedure with args 1..10.
callArgs := wamp.List{1, 2, 3, 4, 5, 6, 7, 8, 9, 10}
result, err := caller.Call(ctx, "sum", nil, callArgs, nil, nil)
if err != nil {
	log.Fatal(err)
}

// Print the result.
sum, _ := wamp.AsInt64(result.Arguments[0])
log.Println("The sum is:", sum)
Output:

Example (Progressive)
// Configure and connect caller client.
ctx := context.Background()
cfg := Config{Realm: "nexus.realm1"}
caller, err := ConnectNet(ctx, "ws://localhost:8080/", cfg)
if err != nil {
	log.Fatal(err)
}
defer caller.Close()

// The progress handler prints test output as it arrives.
progHandler := func(result *wamp.Result) {
	// Received more progress ingo from callee.
	percentDone, _ := wamp.AsInt64(result.Arguments[0])
	log.Printf("Test is %d%% done", percentDone)
}

// Create a context to cancel the call in one minute if not finished.
ctx, cancel := context.WithTimeout(context.Background(), time.Minute)
defer cancel()

// Call the example procedure "run.test".  The argument specifies how
// frequently to send progress updates.
result, err := caller.Call(
	ctx, "run.test", nil, wamp.List{time.Second * 2}, nil, progHandler)
if err != nil {
	log.Fatal("Failed to call procedure:", err)
}

// As a final result, in this example, the callee returns the boolean
// result of the test.
passFail, _ := result.Arguments[0].(bool)
if passFail {
	log.Println("Test passed")
} else {
	log.Println("Test failed")
}
Output:

func (*Client) Close

func (c *Client) Close() error

Close causes the client to leave the realm it has joined, and closes the connection to the router.

func (*Client) Connected

func (c *Client) Connected() bool

Connected returns true if the client is still connected to (receiving from) the router.

func (*Client) Done

func (c *Client) Done() <-chan struct{}

Done returns a channel that signals when the client is no longer connected to a router and has shutdown.

func (*Client) HasFeature

func (c *Client) HasFeature(role, feature string) bool

HasFeature returns true if the session has the specified feature for the specified role.

func (*Client) ID

func (c *Client) ID() wamp.ID

ID returns the client's session ID which is assigned after attaching to a router and joining a realm.

func (*Client) Logger

func (c *Client) Logger() stdlog.StdLog

Logger returns the clients logger that was provided by Config when the client was created, or the stdout logger if one was not provided in Config.

func (*Client) Publish

func (c *Client) Publish(topic string, options wamp.Dict, args wamp.List, kwargs wamp.Dict) error

Publish publishes an EVENT to all subscribed clients.

Publish Options

To receive a PUBLISHED response set:

options["acknowledge"] = true

To request subscriber blacklisting by subscriber, authid, or authrole, set:

options["exclude"] = [subscriberID, ...]
options["exclude_authid"] = ["authid", ..]
options["exclude_authrole"] = ["authrole", ..]

To request subscriber whitelisting by subscriber, authid, or authrole, set:

options["eligible"] = [subscriberID, ...]
options["eligible_authid"] = ["authid", ..]
options["eligible_authrole"] = ["authrole", ..]

When connecting to a nexus router, blacklisting and whitelisting can be used with any attribute assigned to the subscriber session, by setting:

options["exclude_xxx"] = [val1, val2, ..]

and

options["eligible_xxx"] = [val1, val2, ..]

where xxx is the name of any session attribute, typically supplied with the HELLO message.

To receive a published event, the subscriber session must not have any values that appear in a blacklist, and must have a value from each whitelist.

To request that this publisher's identity is disclosed to subscribers, set:

options["disclose_me"] = true

NOTE: Use consts defined in wamp/options.go instead of raw strings.

func (*Client) RealmDetails

func (c *Client) RealmDetails() wamp.Dict

RealmDetails returns the realm information received in the WELCOME message.

func (*Client) Register

func (c *Client) Register(procedure string, fn InvocationHandler, options wamp.Dict) error

Register registers the client to handle invocations of the specified procedure. The InvocationHandler is set to be called for each procedure call received.

If the registration handler wants to cancel the call without returning a result, then it should return InvocationCanceled.

Register Options

To request a pattern-based registration set:

options["match"] = "prefix" or "wildcard"

To request a shared registration pattern set:

options["invoke"] = "single", "roundrobin", "random", "first", "last"

To request that caller identification is disclosed to this callee, set:

options["disclose_caller"] = true

NOTE: Use consts defined in wamp/options.go instead of raw strings.

Example
// Configure and connect callee client.
ctx := context.Background()
cfg := Config{Realm: "nexus.realm1"}
callee, err := ConnectNet(ctx, "tcp://localhost:8080/", cfg)
if err != nil {
	log.Fatal(err)
}
defer callee.Close()

// Define function that is called to perform remote procedure.
sum := func(ctx context.Context, inv *wamp.Invocation) InvokeResult {
	var sum int64
	for _, arg := range inv.Arguments {
		n, _ := wamp.AsInt64(arg)
		sum += n
	}
	return InvokeResult{Args: wamp.List{sum}}
}

// Register procedure "sum"
if err = callee.Register("sum", sum, nil); err != nil {
	log.Fatal("Failed to register procedure:", err)
}

// Keep handling remote procedure calls until router exits.
<-callee.Done()
Output:

Example (Progressive)
// Configure and connect callee client.
ctx := context.Background()
cfg := Config{Realm: "nexus.realm1"}
callee, err := ConnectNet(ctx, "tcp://localhost:8080/", cfg)
if err != nil {
	log.Fatal(err)
}
defer callee.Close()

// Define function that is called to perform remote procedure.
sendData := func(ctx context.Context, inv *wamp.Invocation) InvokeResult {
	// Get update interval from caller.
	interval, _ := wamp.AsInt64(inv.Arguments[0])

	// Send simulated progress every interval.
	ticker := time.NewTicker(time.Duration(interval))
	defer ticker.Stop()
	for percentDone := 0; percentDone < 100; {
		<-ticker.C
		percentDone += 20
		if e := callee.SendProgress(ctx, wamp.List{percentDone}, nil); e != nil {
			// If send failed, return error saying the call is canceled.
			return InvocationCanceled
		}
	}
	// Send true as final result.
	return InvokeResult{Args: wamp.List{true}}
}

// Register example procedure.
if err = callee.Register("system_test", sendData, nil); err != nil {
	log.Fatal("Failed to register procedure:", err)
}

// Keep handling remote procedure calls until router exits.
<-callee.Done()
Output:

func (*Client) RegistrationID

func (c *Client) RegistrationID(procedure string) (regID wamp.ID, ok bool)

RegistrationID returns the registration ID for the specified procedure. If the client is not registered for the procedure, then returns false for second boolean return value.

func (*Client) RouterGoodbye

func (c *Client) RouterGoodbye() *wamp.Goodbye

RouterGoodbye returns the GOODBYE message received from the router, if one was received. The client must be disconnected from the router first, so first check that the channel returned by client.Done() is closed before calling this function.

func (*Client) SendProgress

func (c *Client) SendProgress(ctx context.Context, args wamp.List, kwArgs wamp.Dict) error

SendProgress is used by a Callee client to return progressive RPC results.

IMPORTANT: The context passed into SendProgress MUST be the same context that was passed into the invocation handler. This context is responsible for associating progressive results with the call in progress.

Example
// Configure and connect callee client.
ctx := context.Background()
cfg := Config{Realm: "nexus.realm1"}
callee, err := ConnectNet(ctx, "ws://localhost:8080/", cfg)
if err != nil {
	log.Fatal(err)
}
defer callee.Close()

// Define function that is called to perform remote procedure.
sendData := func(ctx context.Context, inv *wamp.Invocation) InvokeResult {
	// Get update interval from caller.
	interval, _ := wamp.AsInt64(inv.Arguments[0])

	// Send simulated progress every interval.
	ticker := time.NewTicker(time.Duration(interval))
	defer ticker.Stop()
	for percentDone := 0; percentDone < 100; {
		<-ticker.C
		percentDone += 20
		if e := callee.SendProgress(ctx, wamp.List{percentDone}, nil); e != nil {
			// If send failed, return error saying the call is canceled.
			return InvocationCanceled
		}
	}
	// Send true as final result.
	return InvokeResult{Args: wamp.List{true}}
}

// Register example procedure.
if err = callee.Register("system_test", sendData, nil); err != nil {
	log.Fatal("Failed to register procedure:", err)
}

// Keep handling remote procedure calls until router exits.
<-callee.Done()
Output:

func (*Client) SetCallCancelMode

func (c *Client) SetCallCancelMode(cancelMode string) error

SetCallCancelMode sets the client's call cancel mode to one of the following: "kill", "killnowait', "skip". Setting to "" specifies using the default value: "killnowait". The cancel mode is an option that is sent in a CANCEL message when a CALL is canceled.

Cancel Mode Behavior

"skip": The pending call is canceled and ERROR is sent immediately back to the caller. No INTERRUPT is sent to the callee and the result is discarded when received.

"kill": INTERRUPT is sent to the client, but ERROR is not returned to the caller until after the callee has responded to the canceled call. In this case the caller may receive RESULT or ERROR depending whether the callee finishes processing the invocation or the interrupt first.

"killnowait": The pending call is canceled and ERROR is sent immediately back to the caller. INTERRUPT is sent to the callee and any response to the invocation or interrupt from the callee is discarded when received.

If the callee does not support call canceling, then behavior is "skip".

func (*Client) Subscribe

func (c *Client) Subscribe(topic string, fn EventHandler, options wamp.Dict) error

Subscribe subscribes the client to the specified topic or topic pattern.

The specified EventHandler is registered to be called every time an event is received for the topic. The subscription can specify an exact event URI to match, or it can specify a URI pattern to match multiple events for the same handler by specifying the pattern type in options.

Subscribe Options

To request a pattern-based subscription set:

options["match"] = "prefix" or "wildcard"

NOTE: Use consts defined in wamp/options.go instead of raw strings.

Example
const (
	site1DbErrors  = "site1.db.errors"
	site2DbAny     = "site2.db."     // prefix match
	site1AnyAlerts = "site1..alerts" // wildcard match
)

// Configure and connect subscriber client.
ctx := context.Background()
cfg := Config{Realm: "nexus.realm1"}
subscriber, err := ConnectNet(ctx, "ws://localhost:8080/", cfg)
if err != nil {
	log.Fatal(err)
}
defer subscriber.Close()

// Define specific event handler.
handler := func(event *wamp.Event) {
	eventData, _ := wamp.AsString(event.Arguments[0])
	log.Printf("Event data for topic %s: %s", site1DbErrors, eventData)
}

// Subscribe to event.
err = subscriber.Subscribe(site1DbErrors, handler, nil)
if err != nil {
	log.Fatal("subscribe error:", err)
}

// Define pattern-based event handler.
patHandler := func(event *wamp.Event) {
	// Events received for pattern-based (prefix and wildcard)
	// subscriptions contain the matched topic in the details.
	eventTopic, _ := wamp.AsURI(event.Details["topic"])
	eventData, _ := wamp.AsString(event.Arguments[0])
	log.Printf("Event data for topic %s: %s", eventTopic, eventData)
}

// Subscribe to event with prefix match.
options := wamp.Dict{"match": "prefix"}
err = subscriber.Subscribe(site2DbAny, patHandler, options)
if err != nil {
	log.Fatal("subscribe error:", err)
}

// Subscribe to event with wildcard match.
options["match"] = "wildcard"
err = subscriber.Subscribe(site1AnyAlerts, patHandler, options)
if err != nil {
	log.Fatal("subscribe error:", err)
}

// Keep handling events until router exits.
<-subscriber.Done()
Output:

func (*Client) SubscribeChan

func (c *Client) SubscribeChan(topic string, events chan<- *wamp.Event, options wamp.Dict) error

SubscribeChan subscribes the client to the specified topic or topic pattern. Events are written to the provided channel.

No other incoming messages can be processed while blocked on writing events to this channel. When waiting for a message that is not delivered to this channel, that message will never be seen if a previously received event cannot be written to this channel.

func (*Client) SubscriptionID

func (c *Client) SubscriptionID(topic string) (subID wamp.ID, ok bool)

SubscriptionID returns the subscription ID for the specified topic. If the client does not have an active subscription to the topic, then returns false for second boolean return value.

func (*Client) Unregister

func (c *Client) Unregister(procedure string) error

Unregister removes the registration of a procedure from the router.

func (*Client) Unsubscribe

func (c *Client) Unsubscribe(topic string) error

Unsubscribe removes the registered EventHandler from the topic.

type Config

type Config struct {
	// Realm is the URI of the realm the client will join.
	Realm string

	// HelloDetails contains details about the client.  The client provides the
	// roles, unless already supplied by the user.
	HelloDetails wamp.Dict

	// AuthHandlers is a map of authmethod to AuthFunc.  All authmethod keys
	// from this map are automatically added to HelloDetails["authmethods"]
	AuthHandlers map[string]AuthFunc

	// Enable debug logging for client.
	Debug bool

	// Logger for client to use.  If not set, client logs to os.Stderr.
	Logger stdlog.StdLog

	// ResponseTimeout specifies the amount of time that the client will block
	// waiting for a response from the router.  A value of 0 uses the default.
	ResponseTimeout time.Duration

	// Set to JSON or MSGPACK.  Default (zero-value) is JSON.
	Serialization serialize.Serialization

	// Provide a tls.Config to connect the client using TLS.  The zero
	// configuration specifies using defaults.  A nil tls.Config means do not
	// use TLS.
	TlsCfg *tls.Config

	// For local clients only.  This configures the router-to-client queue size
	// and is the maximum number of messages that are enqueued for the client
	// to read.  If the client does not read messages, then additional messages
	// from the router are dropped.  A value of zero specifies the default.
	LocalQueueSize int

	// Client receive limit for use with RawSocket transport.
	// If recvLimit is > 0, then the client will not receive messages with size
	// larger than the nearest power of 2 greater than or equal to recvLimit.
	// If recvLimit is <= 0, then the default of 16M is used.
	RecvLimit int

	// Websocket transport configuration.
	WsCfg transport.WebsocketConfig
}

Config configures a client with everything needed to begin a session with a WAMP router.

type EventHandler

type EventHandler func(event *wamp.Event)

EventHandler is a function that handles a publish event.

type InvocationHandler

type InvocationHandler func(context.Context, *wamp.Invocation) InvokeResult

InvocationHandler handles a remote procedure call.

The Context is used to signal that the router issued an INTERRUPT request to cancel the call-in-progress. The client application can use this to abandon what it is doing, if it chooses to pay attention to ctx.Done().

If the callee wishes to send progressive results, and the caller is willing to receive them, SendProgress() may be called from within an InvocationHandler for each progressive result to send to the caller. It is not required that the handler send any progressive results.

type InvokeResult

type InvokeResult struct {
	Args    wamp.List
	Kwargs  wamp.Dict
	Err     wamp.URI
	Options wamp.Dict
}

InvokeResult represents the result of invoking a procedure.

type ProgressHandler

type ProgressHandler func(*wamp.Result)

ProgressHandler is a type of function that is registered to asynchronously handle progressive results while Call is waiting for a final response.

type RPCError

type RPCError struct {
	Err       *wamp.Error
	Procedure string
}

RPCError is a wrapper for a WAMP ERROR message that is received as a result of a CALL. This allows the client application to type assert the error to a RPCError and inspect the the ERROR message contents, as may be necessary to process an error response from the callee.

func (RPCError) Error

func (rpce RPCError) Error() string

Error implements the error interface, returning an error string for the RPCError.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL