frugal

package module
v0.0.0-...-a06991b
Warning

This package is not in the latest version of its module.

Published: Apr 23, 2024 License: Apache-2.0 Imports: 21 Imported by: 58

README

frugal-go

Go library for Frugal.

Documentation

Overview

Package frugal provides the library APIs used by the Frugal code generator.

Index

Constants

const (
	// Inherited from thrift
	TRANSPORT_EXCEPTION_UNKNOWN      = thrift.UNKNOWN_TRANSPORT_EXCEPTION
	TRANSPORT_EXCEPTION_NOT_OPEN     = thrift.NOT_OPEN
	TRANSPORT_EXCEPTION_ALREADY_OPEN = thrift.ALREADY_OPEN
	TRANSPORT_EXCEPTION_TIMED_OUT    = thrift.TIMED_OUT
	TRANSPORT_EXCEPTION_END_OF_FILE  = thrift.END_OF_FILE

	// TRANSPORT_EXCEPTION_REQUEST_TOO_LARGE is a TTransportException
	// error type indicating the request exceeded the size limit.
	TRANSPORT_EXCEPTION_REQUEST_TOO_LARGE = 100

	// TRANSPORT_EXCEPTION_RESPONSE_TOO_LARGE is a TTransportException
	// error type indicating the response exceeded the size limit.
	TRANSPORT_EXCEPTION_RESPONSE_TOO_LARGE = 101

	// TRANSPORT_EXCEPTION_DISCONNECTED is a TTransportException error type
	// indicating the transport was disconnected
	TRANSPORT_EXCEPTION_DISCONNECTED = 102

	// TRANSPORT_EXCEPTION_SERVICE_NOT_AVAILABLE is a TTransportException
	// error type indicating the service host, subject, etc. was not found.
	TRANSPORT_EXCEPTION_SERVICE_NOT_AVAILABLE = 103
)

TTransportException types used in frugal-instantiated TTransportExceptions.

const (
	// Inherited from thrift
	APPLICATION_EXCEPTION_UNKNOWN                 = thrift.UNKNOWN_APPLICATION_EXCEPTION
	APPLICATION_EXCEPTION_UNKNOWN_METHOD          = thrift.UNKNOWN_METHOD
	APPLICATION_EXCEPTION_INVALID_MESSAGE_TYPE    = thrift.INVALID_MESSAGE_TYPE_EXCEPTION
	APPLICATION_EXCEPTION_WRONG_METHOD_NAME       = thrift.WRONG_METHOD_NAME
	APPLICATION_EXCEPTION_BAD_SEQUENCE_ID         = thrift.BAD_SEQUENCE_ID
	APPLICATION_EXCEPTION_MISSING_RESULT          = thrift.MISSING_RESULT
	APPLICATION_EXCEPTION_INTERNAL_ERROR          = thrift.INTERNAL_ERROR
	APPLICATION_EXCEPTION_PROTOCOL_ERROR          = thrift.PROTOCOL_ERROR
	APPLICATION_EXCEPTION_INVALID_TRANSFORM       = 8
	APPLICATION_EXCEPTION_INVALID_PROTOCOL        = 9
	APPLICATION_EXCEPTION_UNSUPPORTED_CLIENT_TYPE = 10

	// APPLICATION_EXCEPTION_RESPONSE_TOO_LARGE is a TApplicationException
	// error type indicating the response exceeded the size limit.
	APPLICATION_EXCEPTION_RESPONSE_TOO_LARGE = 100
)

TApplicationException types used in frugal-instantiated TApplicationExceptions.

const (
	RequestReceivedTimeKey = "request_received_time"
)

Variables

This section is empty.

Functions

func DefaultFNatsServerOnRequestFinished

func DefaultFNatsServerOnRequestFinished(properties map[interface{}]interface{})

DefaultFNatsServerOnRequestFinished is the default handler called when an FNatsServer finishes processing a message. It does nothing.

func DefaultFNatsServerOnRequestReceived

func DefaultFNatsServerOnRequestReceived(properties map[interface{}]interface{})

DefaultFNatsServerOnRequestReceived is the default handler called when an FNatsServer receives a message. It adds the time the request was received to the passed in properties.

func IsErrTooLarge

func IsErrTooLarge(err error) bool

IsErrTooLarge indicates if the given error is a TTransportException indicating an oversized request or response.

func NewDefaultFNatsServerOnRequestStarted

func NewDefaultFNatsServerOnRequestStarted(highWatermark time.Duration) func(map[interface{}]interface{})

NewDefaultFNatsServerOnRequestStarted constructs a default handler for when an FNatsServer starts processing a message. It checks the current time against a start time in the passed in properties, and logs a warning if the difference is over a threshold.

func NewFrugalHandlerFunc

func NewFrugalHandlerFunc(processor FProcessor, protocolFactory *FProtocolFactory) http.HandlerFunc

NewFrugalHandlerFunc is a function that creates a ready to use Frugal handler function.

func NewTFramedTransportFactory

func NewTFramedTransportFactory(factory thrift.TTransportFactory) thrift.TTransportFactory

NewTFramedTransportFactory creates a new TTransportFactory that produces TFramedTransports.

func NewTFramedTransportFactoryMaxLength

func NewTFramedTransportFactoryMaxLength(factory thrift.TTransportFactory, maxLength uint32) thrift.TTransportFactory

NewTFramedTransportFactoryMaxLength creates a new TTransportFactory that produces TFramedTransports with the given max length.

func SetLogger

func SetLogger(logger *logrus.Logger)

SetLogger sets the Logger used by Frugal.

func ToContext

func ToContext(fctx FContext) (context.Context, context.CancelFunc)

ToContext converts a FContext to a context.Context for integration with thrift.

func WriteBinary

func WriteBinary(p thrift.TProtocol, value []byte, name string, field int16) error

WriteBinary writes []byte `value` of field name and id `name` and `field` respectively into `p`.

func WriteBinaryWithContext

func WriteBinaryWithContext(ctx context.Context, p thrift.TProtocol, value []byte, name string, field int16) error

WriteBinaryWithContext is the same as WriteBinary with a context.Context.

func WriteBool

func WriteBool(p thrift.TProtocol, value bool, name string, field int16) error

WriteBool writes bool `value` of field name and id `name` and `field` respectively into `p`.

func WriteBoolWithContext

func WriteBoolWithContext(ctx context.Context, p thrift.TProtocol, value bool, name string, field int16) error

WriteBoolWithContext is the same as WriteBool with a context.Context.

func WriteByte

func WriteByte(p thrift.TProtocol, value int8, name string, field int16) error

WriteByte writes byte `value` of field name and id `name` and `field` respectively into `p`.

func WriteByteWithContext

func WriteByteWithContext(ctx context.Context, p thrift.TProtocol, value int8, name string, field int16) error

WriteByteWithContext is the same as WriteByte with a context.Context.

func WriteDouble

func WriteDouble(p thrift.TProtocol, value float64, name string, field int16) error

WriteDouble writes float64 `value` of field name and id `name` and `field` respectively into `p`.

func WriteDoubleWithContext

func WriteDoubleWithContext(ctx context.Context, p thrift.TProtocol, value float64, name string, field int16) error

WriteDoubleWithContext is the same as WriteDouble with a context.Context.

func WriteI16

func WriteI16(p thrift.TProtocol, value int16, name string, field int16) error

WriteI16 writes int16 `value` of field name and id `name` and `field` respectively into `p`.

func WriteI16WithContext

func WriteI16WithContext(ctx context.Context, p thrift.TProtocol, value int16, name string, field int16) error

WriteI16WithContext is the same as WriteI16 with a context.Context.

func WriteI32

func WriteI32(p thrift.TProtocol, value int32, name string, field int16) error

WriteI32 writes int32 `value` of field name and id `name` and `field` respectively into `p`.

func WriteI32WithContext

func WriteI32WithContext(ctx context.Context, p thrift.TProtocol, value int32, name string, field int16) error

WriteI32WithContext is the same as WriteI32 with a context.Context.

func WriteI64

func WriteI64(p thrift.TProtocol, value int64, name string, field int16) error

WriteI64 writes int64 `value` of field name and id `name` and `field` respectively into `p`.

func WriteI64WithContext

func WriteI64WithContext(ctx context.Context, p thrift.TProtocol, value int64, name string, field int16) error

WriteI64WithContext is the same as WriteI64 with a context.Context.

func WriteString

func WriteString(p thrift.TProtocol, value, name string, field int16) error

WriteString writes string `value` of field name and id `name` and `field` respectively into `p`.

func WriteStringWithContext

func WriteStringWithContext(ctx context.Context, p thrift.TProtocol, value, name string, field int16) error

WriteStringWithContext is the same as WriteString with a context.Context.

func WriteStruct

func WriteStruct(p thrift.TProtocol, value thrift.TStruct, name string, field int16) error

WriteStruct writes thrift.TStruct `value` of field name and id `name` and `field` respectively into `p`.

func WriteStructWithContext

func WriteStructWithContext(ctx context.Context, p thrift.TProtocol, value thrift.TStruct, name string, field int16) error

WriteStructWithContext is the same as WriteStruct with a context.Context.

Types

type Arguments

type Arguments []interface{}

Arguments contains the arguments to a service method. The first argument will always be the FContext.

func (Arguments) Context

func (a Arguments) Context() FContext

Context returns the first argument value as an FContext.

func (Arguments) SetContext

func (a Arguments) SetContext(ctx FContext)

SetContext sets the given FContext as the first argument.
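The slice-with-context-first convention can be illustrated with a small stand-alone sketch. The fakeContext type is a hypothetical stand-in for FContext, and the lowercase arguments type only mirrors the shape of Arguments.

```go
package main

import "fmt"

// fakeContext is a hypothetical stand-in for FContext.
type fakeContext struct{ cid string }

// arguments mirrors the shape of Arguments: slot 0 holds the context and
// the remaining slots hold the method's own parameters.
type arguments []interface{}

// Context returns the first argument as a *fakeContext.
func (a arguments) Context() *fakeContext { return a[0].(*fakeContext) }

// SetContext replaces the first argument. A value receiver is enough
// because a slice value shares its backing array with the caller.
func (a arguments) SetContext(ctx *fakeContext) { a[0] = ctx }

func main() {
	args := arguments{&fakeContext{cid: "first"}, "payload", 42}
	args.SetContext(&fakeContext{cid: "second"})
	fmt.Println(args.Context().cid, len(args)) // second 3
}
```

This is the pattern middleware can rely on: it can swap or inspect the context without knowing the method's other parameters.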

type BaseFTransportMonitor

type BaseFTransportMonitor struct {
	MaxReopenAttempts uint
	InitialWait       time.Duration
	MaxWait           time.Duration
}

BaseFTransportMonitor is a default monitor implementation that attempts to re-open a closed transport with exponential backoff behavior and a capped number of retries. Its behavior can be customized by embedding this struct type in a new struct which "overrides" desired callbacks.

func (*BaseFTransportMonitor) OnClosedCleanly

func (m *BaseFTransportMonitor) OnClosedCleanly()

OnClosedCleanly is called when the transport is closed cleanly by a call to Close().

func (*BaseFTransportMonitor) OnClosedUncleanly

func (m *BaseFTransportMonitor) OnClosedUncleanly(cause error) (bool, time.Duration)

OnClosedUncleanly is called when the transport is closed for a reason *other* than a call to Close(). Returns whether to try reopening the transport and, if so, how long to wait before making the attempt.

func (*BaseFTransportMonitor) OnReopenFailed

func (m *BaseFTransportMonitor) OnReopenFailed(prevAttempts uint, prevWait time.Duration) (bool, time.Duration)

OnReopenFailed is called when an attempt to reopen the transport fails. It is given the number of previous attempts to re-open the transport and the length of the previous wait, and returns whether to attempt to re-open the transport and how long to wait before making the attempt.

func (*BaseFTransportMonitor) OnReopenSucceeded

func (m *BaseFTransportMonitor) OnReopenSucceeded()

OnReopenSucceeded is called after the transport has been successfully re-opened.

type FAsyncCallback

type FAsyncCallback func(thrift.TTransport) error

FAsyncCallback is an internal callback which is constructed by generated code and invoked by an FRegistry when an RPC response is received. In other words, it's used to complete RPCs. The operation ID on FContext is used to look up the appropriate callback. FAsyncCallback is passed an in-memory TTransport which wraps the complete message. The callback returns an error if an unrecoverable error occurs and the transport needs to be shut down.
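The lookup-by-operation-ID idea can be sketched with a small registry. This is an illustration, not the library's FRegistry: the callback here takes a raw byte frame instead of a TTransport, and the registry type, its method names, and the choice to silently drop unknown operation IDs are all assumptions.

```go
package main

import (
	"fmt"
	"sync"
)

// callback completes one in-flight request. The real callback receives an
// in-memory transport; a byte slice keeps this sketch self-contained.
type callback func(frame []byte) error

// registry maps operation IDs to the callbacks waiting on them.
type registry struct {
	mu       sync.Mutex
	handlers map[uint64]callback
}

func newRegistry() *registry {
	return &registry{handlers: make(map[uint64]callback)}
}

// register associates opID with cb and rejects duplicate registrations.
func (r *registry) register(opID uint64, cb callback) error {
	r.mu.Lock()
	defer r.mu.Unlock()
	if _, exists := r.handlers[opID]; exists {
		return fmt.Errorf("operation %d already registered", opID)
	}
	r.handlers[opID] = cb
	return nil
}

// unregister forgets opID, for example after a timeout.
func (r *registry) unregister(opID uint64) {
	r.mu.Lock()
	defer r.mu.Unlock()
	delete(r.handlers, opID)
}

// dispatch hands a response frame to the callback registered for opID.
// Frames for unknown IDs (late or duplicate responses) are dropped.
func (r *registry) dispatch(opID uint64, frame []byte) error {
	r.mu.Lock()
	cb, ok := r.handlers[opID]
	r.mu.Unlock()
	if !ok {
		return nil
	}
	return cb(frame)
}

func main() {
	reg := newRegistry()
	_ = reg.register(7, func(frame []byte) error {
		fmt.Printf("op 7 completed with %q\n", frame)
		return nil
	})
	_ = reg.dispatch(7, []byte("response"))
	reg.unregister(7)
	_ = reg.dispatch(7, []byte("late response")) // dropped
}
```

The callback is invoked outside the lock so that a slow handler does not block registration of other requests.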

type FBaseProcessor

type FBaseProcessor struct {
	// contains filtered or unexported fields
}

FBaseProcessor is a base implementation of FProcessor. FProcessors should embed this and register FProcessorFunctions. This should only be used by generated code.

func NewFBaseProcessor

func NewFBaseProcessor() *FBaseProcessor

NewFBaseProcessor returns a new FBaseProcessor which FProcessors can extend.

func (*FBaseProcessor) AddMiddleware

func (f *FBaseProcessor) AddMiddleware(middleware ServiceMiddleware)

AddMiddleware adds the given ServiceMiddleware to the FProcessor. This should only be called before the server is started.

func (*FBaseProcessor) AddToAnnotationsMap

func (f *FBaseProcessor) AddToAnnotationsMap(method string, annotations map[string]string)

AddToAnnotationsMap registers the given annotations to the given method.

func (*FBaseProcessor) AddToProcessorMap

func (f *FBaseProcessor) AddToProcessorMap(key string, proc FProcessorFunction)

AddToProcessorMap registers the given FProcessorFunction.

func (*FBaseProcessor) Annotations

func (f *FBaseProcessor) Annotations() map[string]map[string]string

Annotations returns a map of method name to annotations as defined in the service IDL that is serviced by this processor.

func (*FBaseProcessor) GetWriteMutex

func (f *FBaseProcessor) GetWriteMutex() *sync.Mutex

GetWriteMutex returns the Mutex which FProcessorFunctions should use to synchronize access to the output FProtocol.

func (*FBaseProcessor) Process

func (f *FBaseProcessor) Process(iprot, oprot *FProtocol) error

Process the request from the input protocol and write the response to the output protocol.

type FBaseProcessorFunction

type FBaseProcessorFunction struct {
	// contains filtered or unexported fields
}

FBaseProcessorFunction is a base implementation of FProcessorFunction. FProcessorFunctions should embed this. This should only be used by generated code.

func NewFBaseProcessorFunction

func NewFBaseProcessorFunction(writeMu *sync.Mutex, handler *Method) *FBaseProcessorFunction

NewFBaseProcessorFunction returns a new FBaseProcessorFunction which FProcessorFunctions can extend.

func (*FBaseProcessorFunction) AddMiddleware

func (f *FBaseProcessorFunction) AddMiddleware(middleware ServiceMiddleware)

AddMiddleware adds the given ServiceMiddleware to the FProcessorFunction. This should only be called before the server is started.

func (*FBaseProcessorFunction) GetWriteMutex deprecated

func (f *FBaseProcessorFunction) GetWriteMutex() *sync.Mutex

GetWriteMutex returns the Mutex which should be used to synchronize access to the output FProtocol.

Deprecated: use SendError or SendReply instead!

func (*FBaseProcessorFunction) InvokeMethod

func (f *FBaseProcessorFunction) InvokeMethod(args []interface{}) Results

InvokeMethod invokes the handler method.

func (*FBaseProcessorFunction) SendError

func (f *FBaseProcessorFunction) SendError(fctx FContext, oprot *FProtocol, kind int32, method, message string) error

SendError writes the error to the desired transport.

func (*FBaseProcessorFunction) SendReply

func (f *FBaseProcessorFunction) SendReply(fctx FContext, oprot *FProtocol, method string, result thrift.TStruct) error

SendReply ...

type FClient

type FClient interface {
	Open() error  // holdover from publisher refactor, remove in frugal v4
	Close() error // holdover from publisher refactor, remove in frugal v4
	Call(ctx FContext, method string, args, result thrift.TStruct) error
	Oneway(ctx FContext, method string, args thrift.TStruct) error
	Publish(ctx FContext, op, topic string, message thrift.TStruct) error
}

FClient ...

type FContext

type FContext interface {

	// CorrelationID returns the correlation id for the context.
	CorrelationID() string

	// AddRequestHeader adds a request header to the context for the given
	// name. The headers _cid and _opid are reserved. Returns the same FContext
	// to allow for chaining calls.
	AddRequestHeader(name, value string) FContext

	// RequestHeader gets the named request header.
	RequestHeader(name string) (string, bool)

	// RequestHeaders returns the request headers map.
	RequestHeaders() map[string]string

	// AddResponseHeader adds a response header to the context for the given
	// name. The _opid header is reserved. Returns the same FContext to allow
	// for chaining calls.
	AddResponseHeader(name, value string) FContext

	// ResponseHeader gets the named response header.
	ResponseHeader(name string) (string, bool)

	// ResponseHeaders returns the response headers map.
	ResponseHeaders() map[string]string

	// SetTimeout sets the request timeout. Default is 5 seconds. Returns the
	// same FContext to allow for chaining calls.
	SetTimeout(timeout time.Duration) FContext

	// Timeout returns the request timeout.
	Timeout() time.Duration
}

FContext is the context for a Frugal message. Every RPC has an FContext, which can be used to set request headers, response headers, and the request timeout. The default timeout is five seconds. An FContext is also sent with every publish message which is then received by subscribers.

As a best practice, the request headers of an inbound FContext should not be modified, and outbound FContext instances should not be reused. Instead, the inbound FContext should be cloned before each outbound call.

In addition to headers, the FContext also contains a correlation ID which can be used for distributed tracing purposes. A random correlation ID is generated for each FContext if one is not provided.

FContext also plays a key role in Frugal's multiplexing support. A unique, per-request operation ID is set on every FContext before a request is made. This operation ID is sent in the request and included in the response, which is then used to correlate a response to a request. The operation ID is an internal implementation detail and is not exposed to the user.

An FContext should belong to a single request for the lifetime of that request. It can be reused once the request has completed, though FContext instances should generally not be reused.

Implementations of FContext must adhere to the following:

  1. The CorrelationID should be stored as a request header with the header name "_cid"
  2. Threadsafe
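The two requirements can be illustrated with a stripped-down context that covers only request headers. It is a sketch under assumptions: the miniContext type is hypothetical, attempts to overwrite the reserved _cid and _opid headers are silently ignored (the library's actual handling is not specified here), and the generated correlation ID is 16 random bytes in hex.

```go
package main

import (
	"crypto/rand"
	"encoding/hex"
	"fmt"
	"sync"
	"time"
)

const (
	cidHeader  = "_cid"  // reserved: correlation ID
	opidHeader = "_opid" // reserved: operation ID
)

// miniContext is a hypothetical, request-headers-only context.
type miniContext struct {
	mu             sync.RWMutex
	requestHeaders map[string]string
	timeout        time.Duration
}

// newMiniContext stores the correlation ID under "_cid", generating a
// random one when none is supplied. The timeout defaults to five seconds.
func newMiniContext(correlationID string) *miniContext {
	if correlationID == "" {
		buf := make([]byte, 16)
		if _, err := rand.Read(buf); err != nil {
			panic(err)
		}
		correlationID = hex.EncodeToString(buf)
	}
	return &miniContext{
		requestHeaders: map[string]string{cidHeader: correlationID},
		timeout:        5 * time.Second,
	}
}

// CorrelationID reads the "_cid" request header.
func (c *miniContext) CorrelationID() string {
	c.mu.RLock()
	defer c.mu.RUnlock()
	return c.requestHeaders[cidHeader]
}

// AddRequestHeader sets a header and returns the same context so calls can
// be chained. Reserved names are ignored (an assumption of this sketch).
func (c *miniContext) AddRequestHeader(name, value string) *miniContext {
	if name == cidHeader || name == opidHeader {
		return c
	}
	c.mu.Lock()
	defer c.mu.Unlock()
	c.requestHeaders[name] = value
	return c
}

// RequestHeaders returns a copy so callers cannot mutate shared state.
func (c *miniContext) RequestHeaders() map[string]string {
	c.mu.RLock()
	defer c.mu.RUnlock()
	out := make(map[string]string, len(c.requestHeaders))
	for k, v := range c.requestHeaders {
		out[k] = v
	}
	return out
}

func main() {
	ctx := newMiniContext("").AddRequestHeader("user", "bob").AddRequestHeader("_cid", "spoofed")
	fmt.Println(len(ctx.CorrelationID()), ctx.RequestHeaders()["user"])
}
```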

func Clone

func Clone(ctx FContext) FContext

Clone performs a deep copy of an FContext while handling opids correctly. TODO 4.0 consider adding this to the FContext interface.

func NewFContext

func NewFContext(correlationID string) FContext

NewFContext returns an FContext for the given correlation id. If an empty correlation id is given, one will be generated. An FContext should belong to a single request for the lifetime of the request. It can be reused once its request has completed, though FContext instances should generally not be reused.

type FContextImpl

type FContextImpl struct {
	// contains filtered or unexported fields
}

FContextImpl is an implementation of FContext.

func (*FContextImpl) AddEphemeralProperty

func (c *FContextImpl) AddEphemeralProperty(key, value interface{}) FContext

AddEphemeralProperty adds a key-value pair to the ephemeral properties.

func (*FContextImpl) AddRequestHeader

func (c *FContextImpl) AddRequestHeader(name, value string) FContext

AddRequestHeader adds a request header to the context for the given name. The headers _cid and _opid are reserved. Returns the same FContext to allow for chaining calls.

func (*FContextImpl) AddResponseHeader

func (c *FContextImpl) AddResponseHeader(name, value string) FContext

AddResponseHeader adds a response header to the context for the given name. The _opid header is reserved. Returns the same FContext to allow for chaining calls.

func (*FContextImpl) Clone

Clone performs a deep copy of an FContextWithEphemeralProperties while handling opids correctly.

func (*FContextImpl) CorrelationID

func (c *FContextImpl) CorrelationID() string

CorrelationID returns the correlation id for the context.

func (*FContextImpl) EphemeralProperties

func (c *FContextImpl) EphemeralProperties() map[interface{}]interface{}

EphemeralProperties returns a copy of the ephemeral properties map.

func (*FContextImpl) EphemeralProperty

func (c *FContextImpl) EphemeralProperty(key interface{}) (interface{}, bool)

EphemeralProperty gets the property associated with the given key.

func (*FContextImpl) RequestHeader

func (c *FContextImpl) RequestHeader(name string) (string, bool)

RequestHeader gets the named request header.

func (*FContextImpl) RequestHeaders

func (c *FContextImpl) RequestHeaders() map[string]string

RequestHeaders returns the request headers map.

func (*FContextImpl) ResponseHeader

func (c *FContextImpl) ResponseHeader(name string) (string, bool)

ResponseHeader gets the named response header.

func (*FContextImpl) ResponseHeaders

func (c *FContextImpl) ResponseHeaders() map[string]string

ResponseHeaders returns the response headers map.

func (*FContextImpl) SetTimeout

func (c *FContextImpl) SetTimeout(timeout time.Duration) FContext

SetTimeout sets the request timeout. Default is 5 seconds. Returns the same FContext to allow for chaining calls.

func (*FContextImpl) Timeout

func (c *FContextImpl) Timeout() time.Duration

Timeout returns the request timeout.

type FContextWithEphemeralProperties

type FContextWithEphemeralProperties interface {
	FContext

	// Clone performs a deep copy of an FContextWithEphemeralProperties while
	// handling opids correctly.
	Clone() FContextWithEphemeralProperties

	// EphemeralProperty gets the property associated with the given key.
	EphemeralProperty(key interface{}) (interface{}, bool)

	// EphemeralProperties returns a copy of the ephemeral properties map.
	EphemeralProperties() map[interface{}]interface{}

	// AddEphemeralProperty adds a key-value pair to the ephemeral properties.
	AddEphemeralProperty(key, value interface{}) FContext
}

FContextWithEphemeralProperties is an extension of the FContext interface with support for ephemeral properties. Ephemeral properties are a map of key-value pairs that won't be serialized with the rest of the FContext. TODO 4.0 add this to the FContext interface

type FHTTPTransportBuilder

type FHTTPTransportBuilder struct {
	// contains filtered or unexported fields
}

FHTTPTransportBuilder configures and builds HTTP FTransport instances.

func NewFHTTPTransportBuilder

func NewFHTTPTransportBuilder(client *http.Client, url string) *FHTTPTransportBuilder

NewFHTTPTransportBuilder creates a builder which configures and builds HTTP FTransport instances.

func (*FHTTPTransportBuilder) Build

func (h *FHTTPTransportBuilder) Build() FTransport

Build a new configured HTTP FTransport.

func (*FHTTPTransportBuilder) WithRequestHeaders

func (h *FHTTPTransportBuilder) WithRequestHeaders(requestHeaders map[string]string) *FHTTPTransportBuilder

WithRequestHeaders adds custom request headers. If set to nil (the default), no custom request headers are added.

func (*FHTTPTransportBuilder) WithRequestHeadersFromFContext

func (h *FHTTPTransportBuilder) WithRequestHeadersFromFContext(getRequestHeaders GetHeadersWithContext) *FHTTPTransportBuilder

WithRequestHeadersFromFContext adds custom request headers to each request using a provided function that accepts an FContext and returns a map of string key-value pairs.

func (*FHTTPTransportBuilder) WithRequestSizeLimit

func (h *FHTTPTransportBuilder) WithRequestSizeLimit(requestSizeLimit uint) *FHTTPTransportBuilder

WithRequestSizeLimit adds a request size limit. If set to 0 (the default), there is no size limit on requests.

func (*FHTTPTransportBuilder) WithResponseSizeLimit

func (h *FHTTPTransportBuilder) WithResponseSizeLimit(responseSizeLimit uint) *FHTTPTransportBuilder

WithResponseSizeLimit adds a response size limit. If set to 0 (the default), there is no size limit on responses.

type FNatsPublisherTransportFactory

type FNatsPublisherTransportFactory struct {
	// contains filtered or unexported fields
}

FNatsPublisherTransportFactory creates FNatsPublisherTransports.

func NewFNatsPublisherTransportFactory

func NewFNatsPublisherTransportFactory(conn *nats.Conn) *FNatsPublisherTransportFactory

NewFNatsPublisherTransportFactory creates an FNatsPublisherTransportFactory using the provided NATS connection.

func (*FNatsPublisherTransportFactory) GetTransport

GetTransport creates a new NATS FPublisherTransport.

type FNatsServerBuilder

type FNatsServerBuilder struct {
	// contains filtered or unexported fields
}

FNatsServerBuilder configures and builds NATS server instances.

func NewFNatsServerBuilder

func NewFNatsServerBuilder(conn *nats.Conn, processor FProcessor,
	protoFactory *FProtocolFactory, subjects []string) *FNatsServerBuilder

NewFNatsServerBuilder creates a builder which configures and builds NATS server instances.

func (*FNatsServerBuilder) Build

func (f *FNatsServerBuilder) Build() FServer

Build a new configured NATS FServer.

func (*FNatsServerBuilder) WithHighWatermark

func (f *FNatsServerBuilder) WithHighWatermark(highWatermark time.Duration) *FNatsServerBuilder

WithHighWatermark controls the time duration requests wait in queue before triggering slow consumer logic.

func (*FNatsServerBuilder) WithQueueGroup

func (f *FNatsServerBuilder) WithQueueGroup(queue string) *FNatsServerBuilder

WithQueueGroup adds a NATS queue group to receive requests on.

func (*FNatsServerBuilder) WithQueueLength

func (f *FNatsServerBuilder) WithQueueLength(queueLength uint) *FNatsServerBuilder

WithQueueLength controls the length of the work queue used to buffer requests.

func (*FNatsServerBuilder) WithRequestFinishedEventHandler

func (f *FNatsServerBuilder) WithRequestFinishedEventHandler(handler func(map[interface{}]interface{})) *FNatsServerBuilder

WithRequestFinishedEventHandler sets a function to be called after the FNatsServer processes a message.

If the same functionality can be accomplished through middleware, middleware is preferred as it is more flexible and more portable between different servers. This function should only handle events and behaviour specific to an FNatsServer that aren't applicable to other frugal servers.

func (*FNatsServerBuilder) WithRequestReceivedEventHandler

func (f *FNatsServerBuilder) WithRequestReceivedEventHandler(handler func(map[interface{}]interface{})) *FNatsServerBuilder

WithRequestReceivedEventHandler sets a function to be called when the FNatsServer receives a message, but before it is put onto a work queue. The properties map will be set on the FContext before processing is started.

If the same functionality can be accomplished through middleware, middleware is preferred as it is more flexible and more portable between different servers. This function should only handle events and behaviour specific to an FNatsServer that aren't applicable to other frugal servers.

func (*FNatsServerBuilder) WithRequestStartedEventHandler

func (f *FNatsServerBuilder) WithRequestStartedEventHandler(handler func(map[interface{}]interface{})) *FNatsServerBuilder

WithRequestStartedEventHandler sets a function to be called before the FNatsServer processes a message. The properties map will be set on the FContext before processing begins.

If the same functionality can be accomplished through middleware, middleware is preferred as it is more flexible and more portable between different servers. This function should only handle events and behaviour specific to an FNatsServer that aren't applicable to other frugal servers.

func (*FNatsServerBuilder) WithWorkerCount

func (f *FNatsServerBuilder) WithWorkerCount(workerCount uint) *FNatsServerBuilder

WithWorkerCount controls the number of goroutines used to process requests.

type FNatsSubscriberFactoryBuilder

type FNatsSubscriberFactoryBuilder struct {
	// contains filtered or unexported fields
}

FNatsSubscriberFactoryBuilder configures and builds NATS subscribers.

func NewFNatsSubscriberFactoryBuilder

func NewFNatsSubscriberFactoryBuilder(conn *nats.Conn) *FNatsSubscriberFactoryBuilder

NewFNatsSubscriberFactoryBuilder creates a builder with default settings for creating a NATS subscriber.

func (*FNatsSubscriberFactoryBuilder) Build

Build a new NATS subscriber.

func (*FNatsSubscriberFactoryBuilder) WithQueue

WithQueue sets the queue group in NATS that the subscriber will join.

func (*FNatsSubscriberFactoryBuilder) WithQueueLength

WithQueueLength controls the length of the work queue used to buffer messages.

func (*FNatsSubscriberFactoryBuilder) WithWorkerCount

WithWorkerCount sets the number of worker goroutines that will be created for the subscriber to process messages.

type FNatsSubscriberTransportFactory

type FNatsSubscriberTransportFactory struct {
	// contains filtered or unexported fields
}

FNatsSubscriberTransportFactory creates FNatsSubscriberTransports.

func NewFNatsSubscriberTransportFactory

func NewFNatsSubscriberTransportFactory(conn *nats.Conn) *FNatsSubscriberTransportFactory

NewFNatsSubscriberTransportFactory creates an FNatsSubscriberTransportFactory using the provided NATS connection. Subscribers using this transport will not use a queue.

func NewFNatsSubscriberTransportFactoryWithQueue

func NewFNatsSubscriberTransportFactoryWithQueue(conn *nats.Conn, queue string) *FNatsSubscriberTransportFactory

NewFNatsSubscriberTransportFactoryWithQueue creates an FNatsSubscriberTransportFactory using the provided NATS connection. Subscribers using this transport will subscribe to the provided queue, forming a queue group. When a queue group is formed, only one member receives the message.

func (*FNatsSubscriberTransportFactory) GetTransport

GetTransport creates a new NATS FSubscriberTransport.

type FProcessor

type FProcessor interface {
	// Process the request from the input protocol and write the response to
	// the output protocol.
	Process(in, out *FProtocol) error

	// AddMiddleware adds the given ServiceMiddleware to the FProcessor. This
	// should only be called before the server is started.
	AddMiddleware(ServiceMiddleware)

	// Annotations returns a map of method name to annotations as defined in
	// the service IDL that is serviced by this processor.
	Annotations() map[string]map[string]string
}

FProcessor is Frugal's equivalent of Thrift's TProcessor. It's a generic object which operates upon an input stream and writes to an output stream. Specifically, an FProcessor is provided to an FServer in order to wire up a service handler to process requests.

type FProcessorFunction

type FProcessorFunction interface {
	// Process the request from the input protocol and write the response to
	// the output protocol.
	Process(ctx FContext, in, out *FProtocol) error

	// AddMiddleware adds the given ServiceMiddleware to the
	// FProcessorFunction. This should only be called before the server is
	// started.
	AddMiddleware(middleware ServiceMiddleware)
}

FProcessorFunction is used internally by generated code. An FProcessor registers an FProcessorFunction for each service method. Like FProcessor, an FProcessorFunction exposes a single process call, which is used to handle a method invocation.

type FProtocol

type FProtocol struct {
	thrift.TProtocol
	// contains filtered or unexported fields
}

FProtocol is Frugal's equivalent of Thrift's TProtocol. It defines the serialization protocol used for messages, such as JSON, binary, etc. FProtocol actually extends TProtocol and adds support for serializing FContext. In practice, FProtocol simply wraps a TProtocol and uses Thrift's built-in serialization. FContext is encoded before the TProtocol serialization of the message using a simple binary protocol. See the protocol documentation for more details.
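The idea of encoding headers ahead of the message body can be sketched with a length-prefixed layout. The layout below (one version byte, a 4-byte big-endian size, then name/value pairs each prefixed by a 4-byte length) is an assumed, illustrative format; the protocol documentation remains the authority on the real wire format. The function names are hypothetical.

```go
package main

import (
	"bytes"
	"encoding/binary"
	"errors"
	"fmt"
	"sort"
)

// writeString appends a 4-byte big-endian length followed by s.
func writeString(buf *bytes.Buffer, s string) {
	var n [4]byte
	binary.BigEndian.PutUint32(n[:], uint32(len(s)))
	buf.Write(n[:])
	buf.WriteString(s)
}

// readString consumes one length-prefixed string and returns the rest.
func readString(b []byte) (string, []byte, error) {
	if len(b) < 4 {
		return "", nil, errors.New("truncated length")
	}
	n := int(binary.BigEndian.Uint32(b[:4]))
	if len(b)-4 < n {
		return "", nil, errors.New("truncated string")
	}
	return string(b[4 : 4+n]), b[4+n:], nil
}

// encodeHeaders emits: version byte, 4-byte size, then the pairs.
func encodeHeaders(headers map[string]string) []byte {
	names := make([]string, 0, len(headers))
	for name := range headers {
		names = append(names, name)
	}
	sort.Strings(names) // deterministic output
	var body bytes.Buffer
	for _, name := range names {
		writeString(&body, name)
		writeString(&body, headers[name])
	}
	out := make([]byte, 5, 5+body.Len())
	out[0] = 0x00 // assumed version byte
	binary.BigEndian.PutUint32(out[1:5], uint32(body.Len()))
	return append(out, body.Bytes()...)
}

// decodeHeaders parses the header block and returns the remaining bytes,
// which would hold the serialized message.
func decodeHeaders(frame []byte) (map[string]string, []byte, error) {
	if len(frame) < 5 {
		return nil, nil, errors.New("frame too short")
	}
	if frame[0] != 0x00 {
		return nil, nil, fmt.Errorf("unsupported version %d", frame[0])
	}
	size := int(binary.BigEndian.Uint32(frame[1:5]))
	if len(frame)-5 < size {
		return nil, nil, errors.New("truncated headers")
	}
	body, rest := frame[5:5+size], frame[5+size:]
	headers := make(map[string]string)
	for len(body) > 0 {
		var name, value string
		var err error
		if name, body, err = readString(body); err != nil {
			return nil, nil, err
		}
		if value, body, err = readString(body); err != nil {
			return nil, nil, err
		}
		headers[name] = value
	}
	return headers, rest, nil
}

func main() {
	frame := append(encodeHeaders(map[string]string{"_cid": "abc"}), []byte("message")...)
	headers, rest, err := decodeHeaders(frame)
	fmt.Println(headers, string(rest), err)
}
```

Because the header block carries its own size, a reader can strip it off and hand the remaining bytes to any Thrift protocol unchanged.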

func (*FProtocol) ReadRequestHeader

func (f *FProtocol) ReadRequestHeader() (FContext, error)

ReadRequestHeader reads the request headers on the protocol into a returned FContext.

func (*FProtocol) ReadResponseHeader

func (f *FProtocol) ReadResponseHeader(ctx FContext) error

ReadResponseHeader reads the response headers on the protocol into the provided FContext.

func (*FProtocol) WriteRequestHeader

func (f *FProtocol) WriteRequestHeader(ctx FContext) error

WriteRequestHeader writes the request headers set on the given FContext into the protocol.

func (*FProtocol) WriteResponseHeader

func (f *FProtocol) WriteResponseHeader(ctx FContext) error

WriteResponseHeader writes the response headers set on the given FContext into the protocol.

type FProtocolFactory

type FProtocolFactory struct {
	// contains filtered or unexported fields
}

FProtocolFactory creates new FProtocol instances. It takes a TProtocolFactory and a TTransport and returns an FProtocol which wraps a TProtocol produced by the TProtocolFactory. The TProtocol itself wraps the provided TTransport. This makes it easy to produce an FProtocol which uses any existing Thrift transports and protocols in a composable manner.

func NewFProtocolFactory

func NewFProtocolFactory(protoFactory thrift.TProtocolFactory) *FProtocolFactory

NewFProtocolFactory creates a new FProtocolFactory with the given TProtocolFactory.

func (*FProtocolFactory) GetProtocol

func (f *FProtocolFactory) GetProtocol(tr thrift.TTransport) *FProtocol

GetProtocol returns a new FProtocol instance using the given TTransport.

type FPublisherTransport

type FPublisherTransport interface {
	// Open opens the transport.
	Open() error

	// Close closes the transport.
	Close() error

	// IsOpen returns true if the transport is open, false otherwise.
	IsOpen() bool

	// GetPublishSizeLimit returns the maximum allowable size of a payload
	// to be published. 0 is returned to indicate an unbounded allowable size.
	GetPublishSizeLimit() uint

	// Publish sends the given payload with the transport. Implementations
	// of publish should be threadsafe.
	Publish(string, []byte) error
}

FPublisherTransport is used exclusively for pub/sub scopes. Publishers use it to publish messages to a topic.
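The contract above can be illustrated with a small in-memory implementation. This is only a sketch: the interface is redeclared locally so the example compiles without the frugal module, and memoryPublisher, newMemoryPublisher, and the plain error values are hypothetical names that are not part of this package. A real transport would presumably report failures with the TTransportException types listed under Constants.

```go
package main

import (
	"errors"
	"fmt"
	"sync"
)

// FPublisherTransport is a local copy of the documented interface so that
// this sketch is self-contained.
type FPublisherTransport interface {
	Open() error
	Close() error
	IsOpen() bool
	GetPublishSizeLimit() uint
	Publish(string, []byte) error
}

// memoryPublisher is a hypothetical in-memory transport (not part of frugal).
type memoryPublisher struct {
	mu       sync.Mutex
	open     bool
	limit    uint // 0 means unbounded, as GetPublishSizeLimit documents
	messages map[string][][]byte
}

func newMemoryPublisher(limit uint) *memoryPublisher {
	return &memoryPublisher{limit: limit, messages: make(map[string][][]byte)}
}

func (m *memoryPublisher) Open() error {
	m.mu.Lock()
	defer m.mu.Unlock()
	if m.open {
		return errors.New("transport already open")
	}
	m.open = true
	return nil
}

func (m *memoryPublisher) Close() error {
	m.mu.Lock()
	defer m.mu.Unlock()
	m.open = false
	return nil
}

func (m *memoryPublisher) IsOpen() bool {
	m.mu.Lock()
	defer m.mu.Unlock()
	return m.open
}

func (m *memoryPublisher) GetPublishSizeLimit() uint { return m.limit }

// Publish stores a copy of the payload under the topic. The mutex makes it
// safe to call from multiple goroutines.
func (m *memoryPublisher) Publish(topic string, payload []byte) error {
	m.mu.Lock()
	defer m.mu.Unlock()
	if !m.open {
		return errors.New("transport not open")
	}
	if m.limit > 0 && uint(len(payload)) > m.limit {
		return errors.New("payload exceeds publish size limit")
	}
	m.messages[topic] = append(m.messages[topic], append([]byte(nil), payload...))
	return nil
}

func main() {
	var tr FPublisherTransport = newMemoryPublisher(8)
	fmt.Println(tr.Publish("events", []byte("hi"))) // fails: not open yet
	_ = tr.Open()
	fmt.Println(tr.Publish("events", []byte("hi")))    // within the limit
	fmt.Println(tr.Publish("events", make([]byte, 9))) // over the 8-byte limit
}
```

The size check treats a limit of 0 as unbounded, which mirrors the comment on GetPublishSizeLimit.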

func NewNatsFPublisherTransport

func NewNatsFPublisherTransport(conn *nats.Conn) FPublisherTransport

NewNatsFPublisherTransport creates a new FPublisherTransport which is used for publishing with scopes.

type FPublisherTransportFactory

type FPublisherTransportFactory interface {
	GetTransport() FPublisherTransport
}

FPublisherTransportFactory produces FPublisherTransports and is typically used by an FScopeProvider.

type FScopeProvider

type FScopeProvider struct {
	// contains filtered or unexported fields
}

FScopeProvider produces FPublisherTransports, FSubscriberTransports, and FProtocolFactories for use by pub/sub scopes. It does this by wrapping the transport factories and an FProtocolFactory. It also provides a shim for adding middleware to a publisher or subscriber.

func NewFScopeProvider

NewFScopeProvider creates a new FScopeProvider using the given factories.

func (*FScopeProvider) GetMiddleware

func (p *FScopeProvider) GetMiddleware() []ServiceMiddleware

GetMiddleware returns the ServiceMiddleware stored on this FScopeProvider.

func (*FScopeProvider) NewPublisher

func (p *FScopeProvider) NewPublisher() (FPublisherTransport, *FProtocolFactory)

NewPublisher returns a new FPublisherTransport and FProtocolFactory used by scope publishers.

func (*FScopeProvider) NewSubscriber

func (p *FScopeProvider) NewSubscriber() (FSubscriberTransport, *FProtocolFactory)

NewSubscriber returns a new FSubscriberTransport and FProtocolFactory used by scope subscribers.

type FServer

type FServer interface {
	// Serve starts the server.
	Serve() error

	// Stop the server. This is optional on a per-implementation basis. Not all
	// servers are required to be cleanly stoppable.
	Stop() error
}

FServer is Frugal's equivalent of Thrift's TServer. It's used to run a Frugal RPC service by executing an FProcessor on client connections.

type FServiceProvider

type FServiceProvider struct {
	// contains filtered or unexported fields
}

FServiceProvider produces FTransports and FProtocolFactories for use by RPC service clients. The main purpose of this is to provide a shim for adding middleware to a client.

func NewFServiceProvider

func NewFServiceProvider(transport FTransport, protocolFactory *FProtocolFactory, middleware ...ServiceMiddleware) *FServiceProvider

NewFServiceProvider creates a new FServiceProvider containing the given FTransport and FProtocolFactory.

func (*FServiceProvider) GetMiddleware

func (f *FServiceProvider) GetMiddleware() []ServiceMiddleware

GetMiddleware returns the ServiceMiddleware stored on this FServiceProvider.

func (*FServiceProvider) GetProtocolFactory

func (f *FServiceProvider) GetProtocolFactory() *FProtocolFactory

GetProtocolFactory returns the contained FProtocolFactory.

func (*FServiceProvider) GetTransport

func (f *FServiceProvider) GetTransport() FTransport

GetTransport returns the contained FTransport.

type FSimpleServer

type FSimpleServer struct {
	// contains filtered or unexported fields
}

FSimpleServer is a simple FServer which starts a goroutine for each connection.

func NewFSimpleServer

func NewFSimpleServer(
	processor FProcessor,
	serverTransport thrift.TServerTransport,
	protocolFactory *FProtocolFactory) *FSimpleServer

NewFSimpleServer creates a new FSimpleServer which is a simple FServer that starts a goroutine for each connection.

func (*FSimpleServer) Serve

func (p *FSimpleServer) Serve() error

Serve starts the server.

func (*FSimpleServer) Stop

func (p *FSimpleServer) Stop() error

Stop the server.

type FStandardClient

type FStandardClient struct {
	// contains filtered or unexported fields
}

FStandardClient implements FClient, and uses the standard message format for Frugal.

func NewFScopeClient

func NewFScopeClient(provider *FScopeProvider) *FStandardClient

NewFScopeClient ...

func NewFStandardClient

func NewFStandardClient(provider *FServiceProvider) *FStandardClient

NewFStandardClient creates a new FStandardClient from the given FServiceProvider. The returned client implements FClient and uses the standard message format for Frugal.

func (*FStandardClient) Call

func (client *FStandardClient) Call(fctx FContext, method string, args, result thrift.TStruct) error

Call invokes a service and waits for a response.

func (*FStandardClient) Close

func (client *FStandardClient) Close() error

Close ...

func (*FStandardClient) Oneway

func (client *FStandardClient) Oneway(fctx FContext, method string, args thrift.TStruct) error

Oneway sends a message to a service, without waiting for a response.

func (*FStandardClient) Open

func (client *FStandardClient) Open() error

Open ...

func (*FStandardClient) Publish

func (client *FStandardClient) Publish(fctx FContext, op, topic string, message thrift.TStruct) error

Publish sends a message to a topic.

type FStompPublisherTransportFactoryBuilder

type FStompPublisherTransportFactoryBuilder struct {
	// contains filtered or unexported fields
}

func NewFStompPublisherTransportFactoryBuilder

func NewFStompPublisherTransportFactoryBuilder(conn *stomp.Conn) *FStompPublisherTransportFactoryBuilder

NewFStompPublisherTransportFactoryBuilder creates a builder for FStompPublisherTransportFactories.

func (*FStompPublisherTransportFactoryBuilder) Build

Build creates an FStompPublisherTransportFactory with the configured settings.

func (*FStompPublisherTransportFactoryBuilder) WithMaxPublishSize

WithMaxPublishSize allows setting the maximum size of a message this transport will allow to be published.

func (*FStompPublisherTransportFactoryBuilder) WithTopicPrefix

WithTopicPrefix allows setting a string to be added to the beginning of the constructed topic.

type FStompSubscriberTransportFactoryBuilder

type FStompSubscriberTransportFactoryBuilder struct {
	// contains filtered or unexported fields
}

func NewFStompSubscriberTransportFactoryBuilder

func NewFStompSubscriberTransportFactoryBuilder(conn *stomp.Conn) *FStompSubscriberTransportFactoryBuilder

func (*FStompSubscriberTransportFactoryBuilder) Build

func (*FStompSubscriberTransportFactoryBuilder) WithTopicPrefix

func (*FStompSubscriberTransportFactoryBuilder) WithUseQueues

type FSubscriberTransport

type FSubscriberTransport interface {
	// Subscribe opens the transport and sets the subscribe topic.
	Subscribe(string, FAsyncCallback) error

	// Unsubscribe unsubscribes from the topic and closes the transport.
	Unsubscribe() error

	// IsSubscribed returns true if the transport is subscribed to a topic,
	// false otherwise.
	IsSubscribed() bool
}

FSubscriberTransport is used exclusively for pub/sub scopes. Subscribers use it to subscribe to a pub/sub topic.

func NewNatsFSubscriberTransport

func NewNatsFSubscriberTransport(conn *nats.Conn) FSubscriberTransport

NewNatsFSubscriberTransport creates a new FSubscriberTransport which is used for pub/sub. Subscribers using this transport will not use a queue.

func NewNatsFSubscriberTransportWithQueue

func NewNatsFSubscriberTransportWithQueue(conn *nats.Conn, queue string) FSubscriberTransport

NewNatsFSubscriberTransportWithQueue creates a new FSubscriberTransport which is used for pub/sub. Subscribers using this transport will subscribe to the provided queue, forming a queue group. When a queue group is formed, only one member of the group receives each message.

type FSubscriberTransportFactory

type FSubscriberTransportFactory interface {
	GetTransport() FSubscriberTransport
}

FSubscriberTransportFactory produces FSubscriberTransports and is typically used by an FScopeProvider.

type FSubscription

type FSubscription struct {
	// contains filtered or unexported fields
}

FSubscription is a subscription to a pub/sub topic created by a scope. The topic subscription is actually handled by an FSubscriberTransport, which the FSubscription wraps. Each FSubscription should have its own FSubscriberTransport. The FSubscription is used to unsubscribe from the topic.

func NewFSubscription

func NewFSubscription(topic string, transport FSubscriberTransport) *FSubscription

NewFSubscription creates a new FSubscription to the given topic, which should already be subscribed on the given FSubscriberTransport. This is to be used by generated code and should not be called directly.

func (*FSubscription) Remove

func (s *FSubscription) Remove() error

Remove unsubscribes and removes durably stored information on the broker, if applicable.

func (*FSubscription) Topic

func (s *FSubscription) Topic() string

Topic returns the subscription topic name.

func (*FSubscription) Unsubscribe

func (s *FSubscription) Unsubscribe() error

Unsubscribe from the topic.

type FTransport

type FTransport interface {
	// SetMonitor starts a monitor that can watch the health of, and reopen,
	// the transport.
	SetMonitor(FTransportMonitor)

	// Closed channel receives the cause of an FTransport close (nil if clean
	// close).
	Closed() <-chan error

	// Open prepares the transport to send data.
	Open() error

	// IsOpen returns true if the transport is open, false otherwise.
	IsOpen() bool

	// Close closes the transport.
	Close() error

	// Oneway transmits the given data and doesn't wait for a response.
	// Implementations of oneway should be threadsafe and respect the timeout
	// present on the context.
	Oneway(ctx FContext, payload []byte) error

	// Request transmits the given data and waits for a response.
	// Implementations of request should be threadsafe and respect the timeout
	// present on the context.
	Request(ctx FContext, payload []byte) (thrift.TTransport, error)

	// GetRequestSizeLimit returns the maximum number of bytes that can be
	// transmitted. Returns a non-positive number to indicate an unbounded
	// allowable size.
	GetRequestSizeLimit() uint
}

FTransport is Frugal's equivalent of Thrift's TTransport: it represents the transport layer for Frugal clients. However, Frugal is callback based and sends only framed data. Because of this, instead of read, write, and flush methods, FTransport has Oneway and Request methods that send framed Frugal messages. Oneway does not wait for a response, while Request waits for the response, respecting the timeout on the FContext, and returns it as a thrift.TTransport. Correlating responses with their originating FContext is handled inside the transport implementation rather than through methods on this interface.

func NewAdapterTransport

func NewAdapterTransport(tr thrift.TTransport) FTransport

NewAdapterTransport returns an FTransport which uses the given TTransport for read/write operations in a way that is compatible with Frugal. This allows TTransports which support blocking reads to work with Frugal by starting a goroutine that reads from the underlying transport and calling the registry on received frames.

func NewFNatsTransport

func NewFNatsTransport(conn *nats.Conn, subject, inbox string) FTransport

NewFNatsTransport returns a new FTransport which uses the NATS messaging system as the underlying transport. This FTransport is stateless in that there is no connection maintained between the client and server. A request is simply published to a subject and responses are received on another subject. This requires requests and responses to fit within a single NATS message.

type FTransportFactory

type FTransportFactory interface {
	GetTransport(tr thrift.TTransport) FTransport
}

FTransportFactory produces FTransports by wrapping a provided TTransport.

func NewAdapterTransportFactory

func NewAdapterTransportFactory() FTransportFactory

NewAdapterTransportFactory creates a new FTransportFactory which produces an FTransport implementation that acts as an adapter for thrift.TTransport. This allows TTransports which support blocking reads to work with Frugal by starting a goroutine that reads from the underlying transport and calling the registry on received frames.

type FTransportMonitor

type FTransportMonitor interface {
	// OnClosedCleanly is called when the transport is closed cleanly by a call
	// to Close()
	OnClosedCleanly()

	// OnClosedUncleanly is called when the transport is closed for a reason
	// *other* than a call to Close(). Returns whether to try reopening the
	// transport and, if so, how long to wait before making the attempt.
	OnClosedUncleanly(cause error) (reopen bool, wait time.Duration)

	// OnReopenFailed is called when an attempt to reopen the transport fails.
	// Given the number of previous attempts to re-open the transport and the
	// length of the previous wait. Returns whether to attempt to re-open the
	// transport, and how long to wait before making the attempt.
	OnReopenFailed(prevAttempts uint, prevWait time.Duration) (reopen bool, wait time.Duration)

	// OnReopenSucceeded is called after the transport has been successfully
	// re-opened.
	OnReopenSucceeded()
}

FTransportMonitor watches and heals an FTransport. It exposes a number of hooks which can be used to add logic around FTransport events, such as unexpected disconnects, expected disconnects, failed reconnects, and successful reconnects.

Most Frugal implementations include a base FTransportMonitor which implements basic reconnect logic with backoffs and max attempts. This can be extended or reimplemented to provide custom logic.

func NewDefaultFTransportMonitor

func NewDefaultFTransportMonitor() FTransportMonitor

NewDefaultFTransportMonitor creates a new FTransportMonitor with default reconnect options (attempts to reconnect 60 times with 2 seconds between each attempt).

type GetHeadersWithContext

type GetHeadersWithContext func(FContext) map[string]string

type InvocationHandler

type InvocationHandler func(service reflect.Value, method reflect.Method, args Arguments) Results

InvocationHandler processes a service method invocation on a proxy instance and returns the result. The args and return value should match the arity of the proxied method and have the same types. The first argument will always be the FContext.

type Method

type Method struct {
	// contains filtered or unexported fields
}

Method contains an InvocationHandler and a handle to the method it proxies. This should only be used by generated code.

func NewMethod

func NewMethod(proxiedHandler, method interface{}, methodName string, middleware []ServiceMiddleware) *Method

NewMethod creates a new Method which proxies the given handler. ProxiedHandler must be a struct and method must be a function. This should only be called by generated code.

func (*Method) AddMiddleware

func (m *Method) AddMiddleware(middleware ServiceMiddleware)

AddMiddleware wraps the Method with the given ServiceMiddleware. This should only be called by generated code.

func (*Method) Invoke

func (m *Method) Invoke(args Arguments) Results

Invoke the Method and return its results. This should only be called by generated code.

type Results

type Results []interface{}

Results contains the return values from a service method invocation. The last return value will always be an error (or nil).

func (Results) Error

func (r Results) Error() error

Error returns the last return value as an error.

func (Results) SetError

func (r Results) SetError(err error)

SetError sets the last return value as the given error. This will result in a panic if Results has not been properly allocated. Also note that returned errors should match your IDL definition.

type ServiceMiddleware

type ServiceMiddleware func(InvocationHandler) InvocationHandler

ServiceMiddleware is used to implement interceptor logic around API calls. This can be used, for example, to implement retry policies on service calls, logging, telemetry, or authentication and authorization. ServiceMiddleware can be applied to both RPC services and pub/sub scopes.

ServiceMiddleware returns an InvocationHandler which proxies the given InvocationHandler. This can be used to apply middleware logic around a service call.

type TFramedTransport

type TFramedTransport struct {
	// contains filtered or unexported fields
}

TFramedTransport is an implementation of thrift.TTransport which frames messages with their size.

func NewTFramedTransport

func NewTFramedTransport(transport thrift.TTransport) *TFramedTransport

NewTFramedTransport creates a new TFramedTransport wrapping the given TTransport.

func NewTFramedTransportMaxLength

func NewTFramedTransportMaxLength(transport thrift.TTransport, maxLength uint32) *TFramedTransport

NewTFramedTransportMaxLength creates a new TFramedTransport wrapping the given TTransport using the given max length.

func (*TFramedTransport) Close

func (p *TFramedTransport) Close() error

Close the transport.

func (*TFramedTransport) Flush

func (p *TFramedTransport) Flush(ctx context.Context) error

Flush the transport.

func (*TFramedTransport) IsOpen

func (p *TFramedTransport) IsOpen() bool

IsOpen checks if the transport is open.

func (*TFramedTransport) Open

func (p *TFramedTransport) Open() error

Open the transport.

func (*TFramedTransport) Read

func (p *TFramedTransport) Read(buf []byte) (l int, err error)

Read from the transport.

func (*TFramedTransport) RemainingBytes

func (p *TFramedTransport) RemainingBytes() uint64

RemainingBytes returns the current frame size.

func (*TFramedTransport) Write

func (p *TFramedTransport) Write(buf []byte) (int, error)

Write to the transport.

type TMemoryOutputBuffer

type TMemoryOutputBuffer struct {
	*thrift.TMemoryBuffer
	// contains filtered or unexported fields
}

TMemoryOutputBuffer implements TTransport using a bounded memory buffer. Writes which cause the buffer to exceed its size return ErrTooLarge. The TMemoryOutputBuffer handles framing data.
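The behavior described above can be approximated with a short stand-in type. This is a sketch under several assumptions: the frame header is taken to be 4 bytes, the size limit is taken to include that header, and a rejected write leaves the buffer unchanged. boundedFrameBuffer and errTooLarge are hypothetical names standing in for TMemoryOutputBuffer and ErrTooLarge, and none of this is the library's actual implementation.

```go
package main

import (
	"encoding/binary"
	"errors"
	"fmt"
)

// errTooLarge is a local stand-in for the package's ErrTooLarge.
var errTooLarge = errors.New("buffer exceeds size limit")

// boundedFrameBuffer (hypothetical) reserves 4 bytes for the frame size and
// rejects writes that would push the buffer past its limit.
type boundedFrameBuffer struct {
	limit uint   // 0 means unbounded
	data  []byte // data[:4] is the frame header
}

func newBoundedFrameBuffer(limit uint) *boundedFrameBuffer {
	return &boundedFrameBuffer{limit: limit, data: make([]byte, 4)}
}

// Write appends p unless doing so would exceed the limit.
func (b *boundedFrameBuffer) Write(p []byte) (int, error) {
	if b.limit > 0 && uint(len(b.data)+len(p)) > b.limit {
		return 0, errTooLarge
	}
	b.data = append(b.data, p...)
	return len(p), nil
}

// HasWriteData reports whether anything beyond the header has been written.
func (b *boundedFrameBuffer) HasWriteData() bool { return len(b.data) > 4 }

// Bytes fills in the frame size and returns header plus payload.
func (b *boundedFrameBuffer) Bytes() []byte {
	binary.BigEndian.PutUint32(b.data[:4], uint32(len(b.data)-4))
	return b.data
}

// Reset discards the payload and starts over with an empty header.
func (b *boundedFrameBuffer) Reset() { b.data = make([]byte, 4) }

func main() {
	buf := newBoundedFrameBuffer(16)
	_, err := buf.Write([]byte("hello"))
	fmt.Println(err, buf.Bytes())
	_, err = buf.Write(make([]byte, 32))
	fmt.Println(err == errTooLarge)
}
```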

func NewTMemoryOutputBuffer

func NewTMemoryOutputBuffer(size uint) *TMemoryOutputBuffer

NewTMemoryOutputBuffer returns a new TMemoryOutputBuffer with the given size limit. If the provided limit is 0, the buffer is allowed to grow unbounded.

func (*TMemoryOutputBuffer) Bytes

func (f *TMemoryOutputBuffer) Bytes() []byte

Bytes retrieves the framed contents of the buffer.

func (*TMemoryOutputBuffer) HasWriteData

func (f *TMemoryOutputBuffer) HasWriteData() bool

HasWriteData determines if there's any data in the buffer to send.

func (*TMemoryOutputBuffer) Reset

func (f *TMemoryOutputBuffer) Reset()

Reset clears the buffer.

func (*TMemoryOutputBuffer) Write

func (f *TMemoryOutputBuffer) Write(buf []byte) (int, error)

Write the data to the buffer. Returns ErrTooLarge if the write would cause the buffer to exceed its limit.
