comm

package
v0.9.0 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Apr 8, 2019 License: Apache-2.0 Imports: 9 Imported by: 0

Documentation

Overview

Package comm contains Client API and communication layer for theia server.

Communication to theia server is established via websocket channel. The default implementation of the Client library that communicates to theia server is based on websockets implementation.

The Client API implements the basic functionalities that theia offers: send (publish) an Event to theia server; find past events; and receive events from the server in real time.

Here is an example of establishing connection to theia server and publishing an event:

import (
	"github.com/theia-log/selene/comm"
	"github.com/theia-log/selene/model"
)

func main() {
	// Create new client to the server
	client := comm.NewWebsocketClient("ws://localhost:6433")

	// Send the event
	if err := client.Send(&model.Event{
		ID: 		model.NewEventID(),
		Timestamp: 	1550695140.89999200,
		Source: 	"/dev/sensors/temp-sensor",
		Tags: 		[]string{"sensor", "home", "temp"},
		Content: 	"10C",
	}); err != nil {
		panic(err)
	}
}

Reading events from the server is done in an asynchronous way. The functions that read from the server return a read channel that will push new events as they come from the server.

An example of looking up past event using EventFilter:

import (
	"log"

	"github.com/theia-log/selene/comm"
	"github.com/theia-log/selene/model"
)

func main() {
	// Create new client to the server
	client := comm.NewWebsocketClient("ws://localhost:6433")

	respChan, err := client.Find(&comm.EventFilter{
		Start: 	1550710745.10, 	// return only events that happened after
								// this time
		End:	1550710746.90,	// but before this time
		Tags: 	[]string{"sensor", "temp.+"},	// events that contain
												// these tags
		Content: "\\d+C",		// content that matches this regex
		Order: 	"asc",			// ascending, by timestamp
	})

	if err != nil {
		panic(err)
	}

	for {
		resp, ok := <- respChan
		if !ok {
			break	// we're done, no more events
		}
		if resp.Error != nil {
			// an error occurred, log it
			log.Println("[ERROR]: ", resp.Error.Error())
			continue
		}
		log.Println(resp.Event.Dump())	// print the event
	}
}

Tracking events in real-time looks exactly the same as looking up past events. To receive the events in real time, we use comm.Client.Receive() function:

import (
	"log"

	"github.com/theia-log/selene/comm"
	"github.com/theia-log/selene/model"
)

func main() {
	// Create new client to the server
	client := comm.NewWebsocketClient("ws://localhost:6433")

	// Receive reads events in real-time
	respChan, err := client.Receive(&comm.EventFilter{
		Start: 	1550710745.10, 	// return only events that happened after
								// this time
		Tags: 	[]string{"sensor", "temp.+"},	// events that contain
												// these tags
		Content: "\\d+C",		// content that matches this regex
	})

	if err != nil {
		panic(err)
	}

	for {
		resp, ok := <- respChan
		if !ok {
			break	// Server closed the connection, we're done.
		}
		if resp.Error != nil {
			// an error occurred, log it
			log.Println("[ERROR]: ", resp.Error.Error())
			continue
		}
		log.Println(resp.Event.Dump())	// print the event
	}
}

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

This section is empty.

Types

type Client

type Client interface {
	// Send publishes an event to the remote server.
	// Returns an error if the client fails to send the event.
	Send(event *model.Event) error

	// Receive opens a channel for real-time events to the server.
	// The events that match the EventFilter are returned of the EventResponse
	// channel.
	// If the client fails to open a real-time event channel to the server, an
	// error is returned.
	// It should be noted that the server will never close this type of channel.
	// The responsibility for closing the connection is on the client side.
	Receive(filter *EventFilter) (chan *EventResponse, error)

	// Find performs a lookup for past events on the server.
	// The server will return all the events that match the EventFilter.
	// The events are returned as they are found and are published on the
	// EventResponse channel.
	// If the client fails to open the channel or other error occurs during
	// establishing the connection or while setting the filter, an error will
	// be returned.
	// The server will automatically close the connection once all of the
	// matching events have been returned to the client.
	Find(filter *EventFilter) (chan *EventResponse, error)
}

Client interface describes the client API for a theia server. Defines methods to send events and to query both past and real-time events. The querying operations (Receive, Find) are both streaming and asynchronous, meaning that the number of events to be returned is not known and the server returns (streams) the events as the arrive. These functions return a chan to listen on, and events are decoded and published on the channel as they arrive.

type EventFilter

type EventFilter struct {
	// Match events that happened after this time. This is required for
	// filtering both past and real-time events.
	Start float64 `json:"start,omitempty"`

	// Match events that happened before this timestamp. Optional.
	End *float64 `json:"end,omitempty"`

	// Tags is a list of possible values to match for tags. Each value may be a
	// regular expression. Matches the event only if all patterns are found in
	// the event tag list.
	Tags []string `json:"tags,omitempty"`

	// Match the content of the event. This value is evaluated as regular
	// expression.
	Content *string `json:"content,omitempty"`

	// Order in which to return the events. Makes sense only for past events.
	Order *EventOrder `json:"order,omitempty"`
}

EventFilter holds values for filtering events. This structure is used when filtering past events and filering real-time events as well.

func Filter

func Filter(start float64) *EventFilter

Filter creates new EventFilter with start timestamp.

func (*EventFilter) DumpBytes

func (f *EventFilter) DumpBytes() ([]byte, error)

DumpBytes serializes the event filter values as bytes. Theia expects the filter in JSON format, so this function serializes the filter data to JSON, then encodes in UTF-8.

func (*EventFilter) MatchContent

func (f *EventFilter) MatchContent(content string) *EventFilter

MatchContent sets the matcher for the content of this EventFilter. Returns pointer to this EventFilter.

func (*EventFilter) MatchEnd

func (f *EventFilter) MatchEnd(end float64) *EventFilter

MatchEnd sets the end timestamp for this EventFilter. Match events that happened before this time. Returns pointer to this EventFilter.

func (*EventFilter) MatchTag

func (f *EventFilter) MatchTag(tag ...string) *EventFilter

MatchTag adds a matcher to the list of tags matchers of this EventFilter. Returns pointer to this EventFilter.

func (*EventFilter) OrderAsc

func (f *EventFilter) OrderAsc() *EventFilter

OrderAsc set the filter order to ascending. Returns pointer to this EventFilter.

func (*EventFilter) OrderDesc

func (f *EventFilter) OrderDesc() *EventFilter

OrderDesc sets the filter order to descending. Returns pointer to this EventFilter.

type EventOrder

type EventOrder string

EventOrder is the order in which the events should be returned. Can be either 'asc' - ascending, or 'desc' - descending.

const OrderAsc EventOrder = "asc"

OrderAsc sort in ascending order.

const OrderDesc EventOrder = "desc"

OrderDesc sort in descending order.

type EventResponse

type EventResponse struct {
	// The event received from the server. In case of error it may be nil.
	Event *model.Event

	// The error that occurred.
	// If the read was successful, this will be set to nil.
	Error error
}

EventResponse holds an Event or an Error. Used to pass data over a channel from a query operation.

type Message

type Message []byte

Message is a websocket message represented as an array of bytes.

func (Message) EqualsTo

func (m Message) EqualsTo(message []byte) bool

EqualsTo check if this message is equal to another message data.

type OnMessageHandler

type OnMessageHandler func([]byte) error

OnMessageHandler gets called when a message is received.

type WebsocketClient

type WebsocketClient struct {
	// contains filtered or unexported fields
}

WebsocketClient implements the Client interface. Implements a client to a particular Theia server. Connections to the theia actions, like /event, /find and /live are reused if possible - new connections will not be opened if a channel is already established on the endpoint.

func NewWebsocketClient

func NewWebsocketClient(serverURL string) *WebsocketClient

NewWebsocketClient creates new websocket Client to theia server on the given server URL.

func (*WebsocketClient) Find

func (w *WebsocketClient) Find(filter *EventFilter) (chan *EventResponse, error)

Find looks up past events that match the given EventFilter.

func (*WebsocketClient) Receive

func (w *WebsocketClient) Receive(filter *EventFilter) (chan *EventResponse, error)

Receive opens a channel for real-time events that match the EventFilter.

func (*WebsocketClient) Send

func (w *WebsocketClient) Send(event *model.Event) error

Send send an event to the server.

type WebsocketMock

type WebsocketMock struct {
	MockURL string

	Errors []error
	// contains filtered or unexported fields
}

WebsocketMock implements a mock specification for websocket server.

func NewWebsocketMock

func NewWebsocketMock() *WebsocketMock

NewWebsocketMock constructs a new websocket mock to be used when testing.

func (*WebsocketMock) AddError

func (w *WebsocketMock) AddError(err error) *WebsocketMock

AddError adds an error to the mock object. The errors are kept sequentially as they are added.

func (*WebsocketMock) Expect

func (w *WebsocketMock) Expect(mesage string) *WebsocketMock

Expect expect to receive a message with the given value.

func (*WebsocketMock) HandleReceivedMessage

func (w *WebsocketMock) HandleReceivedMessage(handler OnMessageHandler) *WebsocketMock

HandleReceivedMessage add handler for received messages.

func (*WebsocketMock) Respond

func (w *WebsocketMock) Respond(message string) *WebsocketMock

Respond responds to the client websocket with the given message after the first message has been received.

func (*WebsocketMock) Terminate

func (w *WebsocketMock) Terminate() error

Terminate terminates and closes the server connection.

func (*WebsocketMock) WaitRequestsToComplete

func (w *WebsocketMock) WaitRequestsToComplete(n int)

WaitRequestsToComplete can be called to wait until the whole handling of incoming and outgoing messages has been completed. You must specify the number of messages to be handled before the execution can continue and this method returns control.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL