common

package
v1.1.1039 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Apr 30, 2024 License: MIT Imports: 33 Imported by: 0

Documentation

Index

Constants

This section is empty.

Variables

View Source
var ErrStreamCancel = errors.New("stream cancelled")

ErrStreamCancel is an error variable that represents a stream cancellation.

Functions

func CheckVersion added in v1.1.927

func CheckVersion(ctx context.Context, nc *nats.Conn) error

CheckVersion checks the NATS server version against a minimum supported version

func ContextLoggerWithWfState added in v1.1.754

func ContextLoggerWithWfState(ctx context.Context, state *model.WorkflowState) (context.Context, *slog.Logger)

ContextLoggerWithWfState will populate a context with relevant fields from a WorkflowState model

func CopyWorkflowState added in v1.0.446

func CopyWorkflowState(state *model.WorkflowState) *model.WorkflowState

CopyWorkflowState - clones a proto model.WorkflowState for modification.

func Delete added in v0.1.192

func Delete(ctx context.Context, kv jetstream.KeyValue, key string) error

Delete deletes an item from a key value store.

func DeleteLarge added in v1.0.498

func DeleteLarge(ctx context.Context, ds jetstream.ObjectStore, mutex jetstream.KeyValue, k string) error

DeleteLarge deletes a large binary from the object store

func DropStateParams added in v1.0.451

func DropStateParams(state *model.WorkflowState)

DropStateParams removes any parameters unsafe to send across a state transition.

func ElementTable added in v0.1.134

func ElementTable(process *model.Workflow) map[string]*model.Element

ElementTable indexes an entire process for quick ID lookups

func EnsureBucket added in v1.0.477

func EnsureBucket(ctx context.Context, js jetstream.JetStream, storageType jetstream.StorageType, name string, ttl time.Duration) error

EnsureBucket creates a bucket if it does not exist

func EnsureBuckets

func EnsureBuckets(ctx context.Context, js jetstream.JetStream, storageType jetstream.StorageType, names []string) error

EnsureBuckets ensures that a list of key value stores exist

func ExtendLock added in v1.0.477

func ExtendLock(ctx context.Context, kv jetstream.KeyValue, lockID string) error

ExtendLock extends the lock past its stale time.

func IndexProcessElements added in v0.1.134

func IndexProcessElements(elements []*model.Element, el map[string]*model.Element)

IndexProcessElements is the recursive part of the index

func KSuidTo128bit added in v0.1.78

func KSuidTo128bit(k string) [16]byte

KSuidTo128bit returns a KSuid as bytes.

func KSuidTo64bit added in v0.1.78

func KSuidTo64bit(k string) [8]byte

KSuidTo64bit takes the most variable 64 bits of a KSuid and returns them as bytes.

func KeyPrefixSearch added in v1.1.927

func KeyPrefixSearch(ctx context.Context, js jetstream.JetStream, kv jetstream.KeyValue, prefix string, opts KeyPrefixResultOpts) ([]string, error)

KeyPrefixSearch searches for keys in a key-value store that have a specified prefix. It retrieves the keys by querying the JetStream stream associated with the key-value store. It returns a slice of strings containing the keys, and an error if any.

func Load

func Load(ctx context.Context, wf jetstream.KeyValue, k string) ([]byte, error)

Load loads a value from a key value store

func LoadLarge added in v1.0.498

func LoadLarge(ctx context.Context, ds jetstream.ObjectStore, mutex jetstream.KeyValue, key string, opt ...jetstream.GetObjectOpt) ([]byte, error)

LoadLarge loads a large binary from the object store

func LoadLargeObj added in v1.0.498

LoadLargeObj loads a protobuf message from a key value store

func LoadObj

func LoadObj(ctx context.Context, wf jetstream.KeyValue, k string, v proto.Message) error

LoadObj loads a protobuf message from a key value store

func Lock added in v1.0.477

func Lock(ctx context.Context, kv jetstream.KeyValue, lockID string) (bool, error)

Lock ensures a lock on a given ID, it returns true if a lock was granted.

func Log added in v1.0.271

func Log(ctx context.Context, js nats.JetStream, trackingID string, source model.LogSource, severity messages.WorkflowLogLevel, code int32, message string, attrs map[string]string) error

Log is the generic method to output to SHAR telemetry.

func NewOtelHandler added in v1.1.754

func NewOtelHandler() (slog.Handler, func() error)

NewOtelHandler constructs and initialises an otel handler for log exports

func NewSharHandler added in v1.1.754

func NewSharHandler(opts HandlerOptions, logPublisher LogPublisher) slog.Handler

NewSharHandler will return a new instance of a SharHandler

func NewTextHandler added in v1.1.754

func NewTextHandler(level slog.Level, addSource bool) slog.Handler

NewTextHandler initialises a text handler writing to stdout for slog

func Process added in v0.1.78

func Process(
	ctx context.Context,
	js jetstream.JetStream,
	streamName string,
	traceName string,
	closer chan struct{},
	subject string,
	durable string,
	concurrency int,
	middleware []middleware.Receive,
	fn func(ctx context.Context, log *slog.Logger, msg jetstream.Msg) (bool, error),
	opts ...ProcessOption,
) error

Process processes messages from a nats consumer and executes a function against each one.

func PublishObj added in v1.1.754

func PublishObj(ctx context.Context, conn NatsConn, subject string, prot proto.Message, middlewareFn func(*nats.Msg) error) error

PublishObj publishes a proto message to a subject.

func PublishOnce added in v1.1.732

func PublishOnce(ctx context.Context, js jetstream.JetStream, lockingKV jetstream.KeyValue, streamName string, consumerName string, msg *nats.Msg) error

PublishOnce sets up a single message to be used as a timer.

func RemoveWhere added in v1.0.484

func RemoveWhere[T comparable](slice []T, fn func(T) bool) []T

RemoveWhere removes an array element based upon a condition.

func Save

func Save(ctx context.Context, wf jetstream.KeyValue, k string, v []byte) error

Save saves a value to a key value store

func SaveLarge added in v1.0.498

func SaveLarge(ctx context.Context, ds jetstream.ObjectStore, mutex jetstream.KeyValue, key string, data []byte) error

SaveLarge saves a large binary to the object store

func SaveLargeObj added in v1.0.498

func SaveLargeObj(ctx context.Context, ds jetstream.ObjectStore, mutex jetstream.KeyValue, k string, v proto.Message, opt ...nats.ObjectOpt) error

SaveLargeObj saves a protobuf message to a document store

func SaveObj

func SaveObj(ctx context.Context, wf jetstream.KeyValue, k string, v proto.Message) error

SaveObj saves a protobuf message to a key value store

func StreamingReplyClient added in v1.1.975

func StreamingReplyClient(ctx context.Context, nc *nats.Conn, msg *nats.Msg, fn func(msg *nats.Msg) error) error

StreamingReplyClient establishes a streaming reply client. It creates a subscription for replies and invokes a callback function for each received message.

func StreamingReplyServer added in v1.1.975

func StreamingReplyServer(nc streamNatsReplyconnection, subject string, fn func(req *nats.Msg, ret chan *nats.Msg, errs chan error)) (*nats.Subscription, error)

StreamingReplyServer is a function that sets up a NATS subscription to handle streaming reply messages. When a message is received, it begins streaming by creating channels for return messages and error messages. It then executes the provided function to process the request and send the response messages. The function runs in a separate goroutine. It continuously listens for return messages and error messages, and publishes them to the reply inbox. It exits when an error or cancellation occurs. The function returns the NATS subscription and any error that occurred during setup.

func UnLock added in v1.0.477

func UnLock(ctx context.Context, kv jetstream.KeyValue, lockID string) error

UnLock closes an existing lock.

func UpdateLargeObj added in v1.0.498

func UpdateLargeObj[T proto.Message](ctx context.Context, ds jetstream.ObjectStore, mutex jetstream.KeyValue, k string, msg T, updateFn func(v T) (T, error)) (T, error)

UpdateLargeObj saves a protobuf message to a document store after using updateFn to update the message.

func UpdateObj

func UpdateObj[T proto.Message](ctx context.Context, wf jetstream.KeyValue, k string, msg T, updateFn func(v T) (T, error)) error

UpdateObj saves a protobuf message to a key value store after using updateFn to update the message.

func UpdateObjIsNew added in v1.0.446

func UpdateObjIsNew[T proto.Message](ctx context.Context, wf jetstream.KeyValue, k string, msg T, updateFn func(v T) (T, error)) (bool, error)

UpdateObjIsNew saves a protobuf message to a key value store after using updateFn to update the message, and returns true if this is a new value.

Types

type BackoffFn added in v1.0.623

type BackoffFn func(ctx context.Context, msg jetstream.Msg) error

BackoffFn represents a function that completely handles the backoff for a message including ACK/NAK

type BackoffProcessOption added in v1.0.623

type BackoffProcessOption struct {
	// contains filtered or unexported fields
}

BackoffProcessOption holds the backoff function. Don't use this directly. Use the convenience function WithBackoffFn

func WithBackoffFn added in v1.0.623

func WithBackoffFn(fn BackoffFn) BackoffProcessOption

WithBackoffFn adds a back-off function to message processing

func (BackoffProcessOption) Set added in v1.0.623

func (b BackoffProcessOption) Set(opts *ProcessOpts)

Set the backoff function in the process settings

type HandlerOptions added in v1.1.754

type HandlerOptions struct {
	Level slog.Leveler
}

HandlerOptions provides an ability to configure a shar slog handler

type KeyPrefixResultOpts added in v1.1.927

type KeyPrefixResultOpts struct {
	Sort           bool // Sort the returned values
	ExcludeDeleted bool // ExcludeDeleted filters deleted key-values from the result (cost penalty).
}

KeyPrefixResultOpts represents the options for KeyPrefixSearch function. Sort field indicates whether the returned values should be sorted. ExcludeDeleted field filters out deleted key-values from the result.

type LogPublisher added in v1.1.754

type LogPublisher interface {
	Publish(ctx context.Context, lr *model.LogRequest) error
}

LogPublisher is an interface defining the ability to send a LogRequest to a destination

type MultiHandler added in v1.1.754

type MultiHandler struct {
	Handlers []slog.Handler
}

MultiHandler implements slog.Handler and wraps a collection of slog.Handlers that are delegated to

func NewMultiHandler added in v1.1.754

func NewMultiHandler(handlers []slog.Handler) *MultiHandler

NewMultiHandler creates a new instance of a multi handler

func (*MultiHandler) Enabled added in v1.1.754

func (mh *MultiHandler) Enabled(_ context.Context, _ slog.Level) bool

Enabled always returns true as the actual decision as to whether a log record is emitted is delegated to the wrapped handlers in the Handle method

func (*MultiHandler) Handle added in v1.1.754

func (mh *MultiHandler) Handle(ctx context.Context, r slog.Record) error

Handle will iterate over the slice of wrapped handlers and determine whether that handler is Enabled for the given record log level

func (*MultiHandler) WithAttrs added in v1.1.754

func (mh *MultiHandler) WithAttrs(attrs []slog.Attr) slog.Handler

WithAttrs adds the supplied attrs slice to the delegated handler attrs and returns new instances of them

func (*MultiHandler) WithGroup added in v1.1.754

func (mh *MultiHandler) WithGroup(name string) slog.Handler

WithGroup adds the supplied group name to the delegated handler name and returns new instances of them

type NatsConn

type NatsConn interface {
	QueueSubscribe(subj string, queue string, cb nats.MsgHandler) (*nats.Subscription, error)
	Publish(subj string, bytes []byte) error
	PublishMsg(msg *nats.Msg) error
	Subscribe(subj string, cb nats.MsgHandler) (*nats.Subscription, error)
}

NatsConn is the trimmed down NATS Connection interface that only encompasses the methods used by SHAR

type NatsLogPublisher added in v1.1.754

type NatsLogPublisher struct {
	Conn *nats.Conn
}

NatsLogPublisher is an impl of LogPublisher sending a LogRequest to a destination nats subject

func (*NatsLogPublisher) Publish added in v1.1.754

func (nlp *NatsLogPublisher) Publish(ctx context.Context, lr *model.LogRequest) error

Publish writes a LogRequest to a Nats subject

type NatsMsgWrapper added in v1.1.927

type NatsMsgWrapper struct {
	jetstream.Msg
	// contains filtered or unexported fields
}

NatsMsgWrapper is a wrapper type that combines the jetstream.Msg and nats.Msg types.

func NewNatsMsgWrapper added in v1.1.927

func NewNatsMsgWrapper(msg *nats.Msg) *NatsMsgWrapper

NewNatsMsgWrapper is a function that creates a new instance of NatsMsgWrapper, which is a wrapper type that combines the jetstream.Msg and nats.Msg types.

func (*NatsMsgWrapper) Ack added in v1.1.927

func (w *NatsMsgWrapper) Ack() error

Ack is a method that acknowledges the receipt of the NATS message by calling the underlying nats.Msg's Ack method. If an error occurs while acknowledging the message, it returns an error with a message indicating the failure. Returns nil if the acknowledgement is successful.

func (*NatsMsgWrapper) Data added in v1.1.927

func (w *NatsMsgWrapper) Data() []byte

Data is a method that retrieves the data from the underlying nats.Msg.

func (*NatsMsgWrapper) DoubleAck added in v1.1.927

func (w *NatsMsgWrapper) DoubleAck(context.Context) error

DoubleAck is a method that simulates a double acknowledgement of the NATS message. It returns an error with a message indicating that double ack is not allowed.

func (*NatsMsgWrapper) Headers added in v1.1.927

func (w *NatsMsgWrapper) Headers() nats.Header

Headers is a method that retrieves the headers from the underlying nats.Msg.

func (*NatsMsgWrapper) InProgress added in v1.1.927

func (w *NatsMsgWrapper) InProgress() error

InProgress is a method that indicates that the message is still in progress.

func (*NatsMsgWrapper) Metadata added in v1.1.927

func (w *NatsMsgWrapper) Metadata() (*jetstream.MsgMetadata, error)

Metadata is a method that retrieves the metadata from the underlying nats.Msg.

func (*NatsMsgWrapper) Nak added in v1.1.927

func (w *NatsMsgWrapper) Nak() error

Nak is a method that nak's the message.

func (*NatsMsgWrapper) NakWithDelay added in v1.1.927

func (w *NatsMsgWrapper) NakWithDelay(delay time.Duration) error

NakWithDelay is a method that nak's the message, and will not re-process before delay.

func (*NatsMsgWrapper) Reply added in v1.1.927

func (w *NatsMsgWrapper) Reply() string

Reply is a method that retrieves the reply from the underlying nats.Msg.

func (*NatsMsgWrapper) SetData added in v1.1.927

func (w *NatsMsgWrapper) SetData(b []byte)

SetData is a method that sets the data of the underlying nats.Msg.

func (*NatsMsgWrapper) Subject added in v1.1.927

func (w *NatsMsgWrapper) Subject() string

Subject is a method that retrieves the subject from the underlying nats.Msg.

func (*NatsMsgWrapper) Term added in v1.1.927

func (w *NatsMsgWrapper) Term() error

Term is a method that calls the `Term` method on the underlying `NatsMsgWrapper` instance.

type ProcessOption added in v1.0.623

type ProcessOption interface {
	Set(opts *ProcessOpts)
}

ProcessOption represents an option function that can be passed to message processing.

type ProcessOpts added in v1.0.623

type ProcessOpts struct {
	BackoffCalc BackoffFn
}

ProcessOpts holds the settings for message processing.

type SharHandler added in v1.1.754

type SharHandler struct {
	// contains filtered or unexported fields
}

SharHandler is an implementation of a shar specific slog.Handler

func (*SharHandler) Enabled added in v1.1.754

func (sh *SharHandler) Enabled(_ context.Context, level slog.Level) bool

Enabled determines whether or not a log message is written based on log level

func (*SharHandler) Handle added in v1.1.754

func (sh *SharHandler) Handle(ctx context.Context, r slog.Record) error

Handle will accept an slog.Record, transform to a LogRequest and publish it to nats subject

func (*SharHandler) WithAttrs added in v1.1.754

func (sh *SharHandler) WithAttrs(attrs []slog.Attr) slog.Handler

WithAttrs will append the given attrs slice to any existing attrs stored in the handler and return a new handler instance

func (*SharHandler) WithGroup added in v1.1.754

func (sh *SharHandler) WithGroup(name string) slog.Handler

WithGroup will append the given group name to any existing group names stored in the handler and return a new handler instance

type TrackingID added in v1.0.215

type TrackingID []string

TrackingID is an ID stack that maintains the callstack

func (TrackingID) Ancestor added in v1.0.215

func (t TrackingID) Ancestor(gen int) string

Ancestor provides the ID of the caller back <gen> generations.

func (TrackingID) ID added in v1.0.215

func (t TrackingID) ID() string

ID provides the current ID

func (TrackingID) ParentID added in v1.0.215

func (t TrackingID) ParentID() string

ParentID provides the ID of the caller.

func (TrackingID) Pop added in v1.0.215

func (t TrackingID) Pop() TrackingID

Pop removes the current ID from the callstack.

func (TrackingID) Push added in v1.0.215

func (t TrackingID) Push(id string) TrackingID

Push adds a new ID to the callstack.

Directories

Path Synopsis
Package base62 implements base62 encoding.
Package base62 implements base62 encoding.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL