execrpc

package module
v0.9.0 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Apr 3, 2024 License: MIT Imports: 18 Imported by: 6

README

Tests on Linux, MacOS and Windows Go Report Card GoDoc

This library implements a simple, custom RPC protocol via os/exex and stdin and stdout. Both server and client comes in a raw ([]byte) and strongly typed variant (using Go generics).

A strongly typed client may look like this:

// Define the request, message and receipt types for the RPC call.
client, err := execrpc.StartClient(
	client, err := execrpc.StartClient(
	execrpc.ClientOptions[model.ExampleConfig, model.ExampleRequest, model.ExampleMessage, model.ExampleReceipt]{
		ClientRawOptions: execrpc.ClientRawOptions{
			Version: 1,
			Cmd:     "go",
			Dir:     "./examples/servers/typed",
			Args:    []string{"run", "."},
			Env:     env,
			Timeout: 30 * time.Second,
		},
		Config: model.ExampleConfig{},
		Codec:  codec,
	},
)

if err != nil {
	log.Fatal(err)
}

// Consume standalone messages (e.g. log messages) in its own goroutine.
go func() {
	for msg := range client.MessagesRaw() {
		fmt.Println("got message", string(msg.Body))
	}
}()

// Execute the request.
result := client.Execute(model.ExampleRequest{Text: "world"})

// Check for errors.
if err := result.Err(); err != nil {
	log.Fatal(err)
}

// Consume the messages.
for m := range result.Messages() {
	fmt.Println(m)
}

// Wait for the receipt.
receipt := <-result.Receipt()

// Check again for errors.
if err := result.Err(); err != nil {
	log.Fatal(err)
}

fmt.Println(receipt.Text)

// Close the client.
if err := client.Close(); err != nil {
	log.Fatal(err)
}

To get the best performance you should keep the client open as long as its needed – and store it as a shared object; it's safe and encouraged to call Execute from multiple goroutines.

And the server side of the above:


func main() {
	log.SetFlags(0)
	log.SetPrefix("readme-example: ")

	var clientConfig model.ExampleConfig

	server, err := execrpc.NewServer(
		execrpc.ServerOptions[model.ExampleConfig, model.ExampleRequest, model.ExampleMessage, model.ExampleReceipt]{
			// Optional function to provide a hasher for the ETag.
			GetHasher: func() hash.Hash {
				return fnv.New64a()
			},

			// Allows you to delay message delivery, and drop
			// them after reading the receipt (e.g. the ETag matches the ETag seen by client).
			DelayDelivery: false,

			// Optional function to initialize the server
			// with the client configuration.
			// This will be called once on server start.
			Init: func(cfg model.ExampleConfig) error {
				clientConfig = cfg
				return clientConfig.Init()
			},

			// Handle the incoming call.
			Handle: func(c *execrpc.Call[model.ExampleRequest, model.ExampleMessage, model.ExampleReceipt]) {
				// Raw messages are passed directly to the client,
				// typically used for log messages.
				c.SendRaw(
					execrpc.Message{
						Header: execrpc.Header{
							Version: 32,
							Status:  150,
						},
						Body: []byte("log message"),
					},
				)

				// Enqueue one or more messages.
				c.Enqueue(
					model.ExampleMessage{
						Hello: "Hello 1!",
					},
					model.ExampleMessage{
						Hello: "Hello 2!",
					},
				)

				c.Enqueue(
					model.ExampleMessage{
						Hello: "Hello 3!",
					},
				)

				// Wait for the framework generated receipt.
				receipt := <-c.Receipt()

				// ETag provided by the framework.
				// A hash of all message bodies.
				// fmt.Println("Receipt:", receipt.ETag)

				// Modify if needed.
				receipt.Size = uint32(123)
				receipt.Text = "echoed: " + c.Request.Text

				// Close the message stream and send the receipt.
				// Pass true to drop any queued messages,
				// this is only relevant if DelayDelivery is enabled.
				c.Close(false, receipt)
			},
		},
	)
	if err != nil {
		handleErr(err)
	}

	if err := server.Start(); err != nil {
		handleErr(err)
	}
}

func handleErr(err error) {
	log.Fatalf("error: failed to start typed echo server: %s", err)
}

Generate ETag

The server can generate an ETag for the messages. This is a hash of all message bodies.

To enable this:

  1. Provide a GetHasher function to the server options.
  2. Have the Receipt implement the TagProvider interface.

Note that there are three different optional E-interfaces for the Receipt:

  1. TagProvider for the ETag.
  2. SizeProvider for the size.
  3. LastModifiedProvider for the last modified timestamp.

A convenient struct that can be embedded in your Receipt that implements all of these is the Identity.

Status Codes

The status codes in the header between 1 and 99 are reserved for the system. This will typically be used to catch decoding/encoding errors on the server.

Documentation

Index

Constants

View Source
const (
	// MessageStatusOK is the status code for a successful and complete message exchange.
	MessageStatusOK = iota

	// MessageStatusContinue is the status code for a message that should continue the conversation.
	MessageStatusContinue

	// MessageStatusInitServer is the status code for a message used to initialize/configure the server.
	MessageStatusInitServer

	// MessageStatusErrDecodeFailed is the status code for a message that failed to decode.
	MessageStatusErrDecodeFailed
	// MessageStatusErrEncodeFailed is the status code for a message that failed to encode.
	MessageStatusErrEncodeFailed
	// MessageStatusErrInitServerFailed is the status code for a message that failed to initialize the server.
	MessageStatusErrInitServerFailed

	// MessageStatusSystemReservedMax is the maximum value for a system reserved status code.
	MessageStatusSystemReservedMax = 99
)

Variables

View Source
var (
	// ErrTimeoutWaitingForServer is returned on timeouts starting the server.
	ErrTimeoutWaitingForServer = errors.New("timed out waiting for server to start")
	// ErrTimeoutWaitingForCall is returned on timeouts waiting for a call to complete.
	ErrTimeoutWaitingForCall = errors.New("timed out waiting for call to complete")
)
View Source
var ErrShutdown = errors.New("connection is shut down")

ErrShutdown will be returned from Execute and Close if the client is or is about to be shut down.

Functions

This section is empty.

Types

type Call added in v0.8.0

type Call[Q, M, R any] struct {
	Request Q
	// contains filtered or unexported fields
}

Call is the request/response exchange between the client and server. Note that the stream parameter S is optional, set it to any if not used.

func (*Call[Q, M, R]) Close added in v0.8.0

func (c *Call[Q, M, R]) Close(drop bool, r R)

Close closes the call and sends andy buffered messages and the receipt back to the client. If drop is true, the buffered messages are dropped. Note that drop is only relevant if the server is configured with DelayDelivery set to true.

func (*Call[Q, M, R]) Enqueue added in v0.8.0

func (c *Call[Q, M, R]) Enqueue(rr ...M)

Enqueue enqueues one or more messages to be sent back to the client.

func (*Call[Q, M, R]) Receipt added in v0.8.0

func (c *Call[Q, M, R]) Receipt() <-chan R

func (*Call[Q, M, R]) SendRaw added in v0.8.0

func (c *Call[Q, M, R]) SendRaw(ms ...Message)

SendRaw sends one or more messages back to the client that is not part of the request/response exchange. These messages must have ID 0.

type Client

type Client[C, Q, M, R any] struct {
	// contains filtered or unexported fields
}

Client is a strongly typed RPC client.

func StartClient

func StartClient[C, Q, M, R any](opts ClientOptions[C, Q, M, R]) (*Client[C, Q, M, R], error)

StartClient starts a client for the given options.

func (*Client[C, Q, M, R]) Close

func (c *Client[C, Q, M, R]) Close() error

Close closes the client.

func (*Client[C, Q, M, R]) Execute

func (c *Client[C, Q, M, R]) Execute(r Q) Result[M, R]

Execute sends the request to the server and returns the result. You should check Err() both before and after reading from the messages and receipt channels.

func (*Client[C, Q, M, R]) MessagesRaw added in v0.8.0

func (c *Client[C, Q, M, R]) MessagesRaw() <-chan Message

MessagesRaw returns the raw messages from the server. These are not connected to the request-response flow, typically used for log messages etc.

type ClientOptions

type ClientOptions[C, Q, M, R any] struct {
	ClientRawOptions

	// The configuration to pass to the server.
	Config C

	// The codec to use.
	Codec codecs.Codec
}

ClientOptions are options for the client.

type ClientRaw

type ClientRaw struct {

	// Messages from the server that are not part of the request-response flow.
	Messages chan Message
	// contains filtered or unexported fields
}

ClientRaw is a raw RPC client. Raw means that the client doesn't do any type conversion, a byte slice is what you get.

func StartClientRaw

func StartClientRaw(opts ClientRawOptions) (*ClientRaw, error)

StartClientRaw starts a untyped client client for the given options.

func (*ClientRaw) Close

func (c *ClientRaw) Close() error

Close closes the server connection and waits for the server process to quit.

func (*ClientRaw) Execute

func (c *ClientRaw) Execute(withMessage func(m *Message), messages chan<- Message) error

Execute sends body to the server and sends any messages to the messages channel. It's safe to call Execute from multiple goroutines. The messages channel wil be closed when the call is done.

type ClientRawOptions

type ClientRawOptions struct {
	// Version number passed to the server.
	Version uint16

	// The server to start.
	Cmd string

	// The arguments to pass to the command.
	Args []string

	// Environment variables to pass to the command.
	// These will be merged with the environment variables of the current process,
	// vallues in this slice have precedence.
	// A slice of strings of the form "key=value"
	Env []string

	// Dir specifies the working directory of the command.
	// If Dir is the empty string, the command runs in the
	// calling process's current directory.
	Dir string

	// The timeout for the client.
	Timeout time.Duration
}

ClientRawOptions are options for the raw part of the client.

type Dispatcher

type Dispatcher interface {
	// SendMessage sends one or more message back to the client.
	SendMessage(...Message)
}

Dispatcher is the interface for dispatching messages to the client.

type Header struct {
	ID      uint32
	Version uint16
	Status  uint16
	Size    uint32
}

Header is the header of a message. ID and Size are set by the system. Status may be set by the system.

func (*Header) Read

func (h *Header) Read(r io.Reader) error

Read reads the header from the reader.

func (Header) Write

func (h Header) Write(w io.Writer) error

Write writes the header to the writer.

type Identity added in v0.8.0

type Identity struct {
	LastModified int64  `json:"lastModified"`
	ETag         string `json:"eTag"`
	Size         uint32 `json:"size"`
}

Identity holds the modified time (Unix seconds) and a 64-bit checksum.

func (Identity) GetELastModified added in v0.8.0

func (i Identity) GetELastModified() int64

GetELastModified returns the last modified time.

func (Identity) GetESize added in v0.8.0

func (i Identity) GetESize() uint32

GetESize returns the size.

func (Identity) GetETag added in v0.8.0

func (i Identity) GetETag() string

GetETag returns the checksum.

func (*Identity) SetELastModified added in v0.8.0

func (i *Identity) SetELastModified(t int64)

SetELastModified sets the last modified time.

func (*Identity) SetESize added in v0.8.0

func (i *Identity) SetESize(s uint32)

SetESize sets the size.

func (*Identity) SetETag added in v0.8.0

func (i *Identity) SetETag(s string)

SetETag sets the checksum.

type LastModifiedProvider added in v0.8.0

type LastModifiedProvider interface {
	GetELastModified() int64
	SetELastModified(int64)
}

LastModifiedProvider is the interface for a type that can provide a last modified time.

type Message

type Message struct {
	Header Header
	Body   []byte
}

Message is what gets sent to and from the server.

func (*Message) Read

func (m *Message) Read(r io.Reader) error

func (*Message) Write

func (m *Message) Write(w io.Writer) error

type Result added in v0.8.0

type Result[M, R any] struct {
	// contains filtered or unexported fields
}

Result is the result of a request with zero or more messages and the receipt.

func (Result[M, R]) Err added in v0.8.0

func (r Result[M, R]) Err() error

Err returns any error.

func (Result[M, R]) Messages added in v0.8.0

func (r Result[M, R]) Messages() <-chan M

Messages returns the messages from the server.

func (Result[M, R]) Receipt added in v0.8.0

func (r Result[M, R]) Receipt() <-chan R

Receipt returns the receipt from the server.

type Server

type Server[C, Q, M, R any] struct {
	*ServerRaw
	// contains filtered or unexported fields
}

Server is a stringly typed server for requests of type Q and responses of tye R.

func NewServer

func NewServer[C, Q, M, R any](opts ServerOptions[C, Q, M, R]) (*Server[C, Q, M, R], error)

NewServer creates a new Server. using the given options.

func (*Server[C, Q, M, R]) Start added in v0.8.0

func (s *Server[C, Q, M, R]) Start() error

type ServerOptions

type ServerOptions[C, Q, M, R any] struct {
	// Init is the function that will be called when the server is started.
	// It can be used to initialize the server with the given configuration.
	// If an error is returned, the server will stop.
	Init func(C) error

	// Handle is the function that will be called when a request is received.
	Handle func(*Call[Q, M, R])

	// Codec is the codec that will be used to encode and decode requests, messages and receipts.
	// The client will tell the server what codec is in use, so in most cases you should just leave this unset.
	Codec codecs.Codec

	// GetHasher returns the hash instance to be used for the response body
	// If it's not set or it returns nil, no hash will be calculated.
	GetHasher func() hash.Hash

	// Delay delivery of messages to the client until Close is called.
	// Close takes a drop parameter that will drop any buffered messages.
	// This can be useful if you want to check the server generated ETag,
	// maybe the client already has this data.
	DelayDelivery bool
}

ServerOptions is the options for a server.

type ServerRaw

type ServerRaw struct {
	// contains filtered or unexported fields
}

ServerRaw is a RPC server handling raw messages with a header and []byte body. See Server for a generic, typed version.

func NewServerRaw

func NewServerRaw(opts ServerRawOptions) (*ServerRaw, error)

NewServerRaw creates a new Server using the given options.

func (*ServerRaw) Start

func (s *ServerRaw) Start() error

Start sets upt the server communication and starts the server loop.

type ServerRawOptions

type ServerRawOptions struct {
	// Call is the message exhcange between the client and server.
	// Note that any error returned by this function will be treated as a fatal error and the server is stopped.
	// Validation errors etc. should be returned in the response message.
	// Message passed to the Dispatcher as part of the request/response must
	// use the same ID as the request.
	// ID 0 is reserved for standalone messages (e.g. log messages).
	Call func(Message, Dispatcher) error
}

ServerRawOptions is the options for a raw portion of the server.

type SizeProvider added in v0.8.0

type SizeProvider interface {
	GetESize() uint32
	SetESize(uint32)
}

SizeProvider is the interface for a type that can provide a size.

type TagProvider added in v0.8.0

type TagProvider interface {
	GetETag() string
	SetETag(string)
}

TagProvider is the interface for a type that can provide a eTag.

Directories

Path Synopsis
examples
servers/raw Module
servers/typed Module

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL