nats

package
v0.17.2
Warning

This package is not in the latest version of its module.

Published: Mar 22, 2024 | License: Apache-2.0 | Imports: 16 | Imported by: 0

README

Hops/NATS

This package contains utilities for interacting with NATS in the context of a Hiphops server, worker, etc.

Documentation

Overview

Creates/manages an in-process NATS server

Currently this is used by the hops test suite. In future it is intended to let developers run tests against their own Hiphops pipelines.


Constants

const (
	ChannelNotify  = "notify"
	ChannelRequest = "request"

	DefaultConsumerName = "runner"
	// How far back to look for events by default
	DefaultEventLookback = -time.Hour

	// Interest topic which is used by default
	DefaultInterestTopic = "default"

	// Number of events returned max
	GetEventHistoryEventLimit = 100
)
const AllEventId = ">"
const DoneMessageId = "done"
const HopsMessageId = "hops"
const SourceEventId = "event"

Variables

var (
	ErrEventFatal          = errors.New("This event cannot be processed")
	ErrIncompleteMsgBundle = errors.New("Unable to fetch complete sequence history")
)
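
Both values are sentinel errors, so callers would normally test for them with errors.Is rather than by comparing strings. The sketch below re-declares the two variables locally so that it compiles on its own. processEvent and shouldRetry are hypothetical helpers, and treating ErrEventFatal as "do not retry" is an assumption based on its name, not documented behaviour.

```go
package main

import (
	"errors"
	"fmt"
)

// Local copies of the documented sentinel errors, standing in for the package's own.
var (
	ErrEventFatal          = errors.New("This event cannot be processed")
	ErrIncompleteMsgBundle = errors.New("Unable to fetch complete sequence history")
)

// processEvent is a hypothetical handler that wraps a sentinel with extra context.
func processEvent(payload string) error {
	if payload == "" {
		return fmt.Errorf("empty payload: %w", ErrEventFatal)
	}
	return nil
}

// shouldRetry assumes (this is not confirmed by the docs) that fatal events
// are never retried, while any other error may be.
func shouldRetry(err error) bool {
	return err != nil && !errors.Is(err, ErrEventFatal)
}

func main() {
	err := processEvent("")
	// Wrapping with %w keeps the sentinel detectable through errors.Is.
	fmt.Println(errors.Is(err, ErrEventFatal))
	fmt.Println(shouldRetry(err))
	fmt.Println(shouldRetry(ErrIncompleteMsgBundle))
}
```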

Functions

func CreateSourceEvent added in v0.16.1

func CreateSourceEvent(rawEvent map[string]any, source string, event string, action string, unique string) ([]byte, string, error)

func DoubleAck

func DoubleAck(ctx context.Context, msg jetstream.Msg) error

DoubleAck is a convenience wrapper around NATS acking with a timeout

func EventLogFilterSubject added in v0.14.0

func EventLogFilterSubject(accountId string, interestTopic string, eventFilter string) string

EventLogFilterSubject returns the subject used to get events for display to the user in the UI.

accountId: the account id to filter on
eventFilter: either AllEventId or SourceEventId

func HealthChecker added in v0.15.1

func HealthChecker(natsClient *Client) func(w http.ResponseWriter, r *http.Request)

func NotifyFilterSubject added in v0.14.0

func NotifyFilterSubject(accountId string, interestTopic string) string

NotifyFilterSubject returns the filter subject to get notify messages for the account

func ReplayFilterSubject

func ReplayFilterSubject(accountId string, interestTopic string, sequenceId string) string

func RequestFilterSubject added in v0.14.0

func RequestFilterSubject(accountId string, interestTopic string) string

RequestFilterSubject returns the filter subject to get request messages for the account

func SequenceHopsKeyTokens

func SequenceHopsKeyTokens(sequenceId string) []string

func SourceEventSubject

func SourceEventSubject(accountId string, interestTopic string, sequenceId string) string

func WorkerRequestFilterSubject added in v0.14.0

func WorkerRequestFilterSubject(accountId string, interestTopic string, appName string, handler string) string

WorkerRequestFilterSubject returns the filter subject for the worker consumer

Types

type Client

type Client struct {
	Consumers   map[string]jetstream.Consumer
	JetStream   jetstream.JetStream
	NatsConn    *nats.Conn
	SysObjStore nats.ObjectStore
	// contains filtered or unexported fields
}

func NewClient

func NewClient(natsUrl string, accountId string, interestTopic string, logger Logger, clientOpts ...ClientOpt) (*Client, error)

NewClient returns a new hiphops specific NATS client

By default it is configured as a runner consumer (listening for incoming source events). Passing *any* ClientOpts will override this default.

func (*Client) CheckConnection

func (c *Client) CheckConnection() bool

func (*Client) Close

func (c *Client) Close()

func (*Client) Consume

func (c *Client) Consume(ctx context.Context, fromConsumer string, callback jetstream.MessageHandler) error

Consume consumes messages from the consumer stored at Client.Consumers[fromConsumer].

This will block the calling goroutine until the context is cancelled, and can be run as a long-lived service.

func (*Client) ConsumeSequences

func (c *Client) ConsumeSequences(ctx context.Context, fromConsumer string, handler SequenceHandler) error

ConsumeSequences is a wrapper around Consume that presents the aggregate state of a sequence to the handler instead of individual messages.

func (*Client) DeleteMsgSequence added in v0.16.3

func (c *Client) DeleteMsgSequence(ctx context.Context, msgMeta *MsgMeta) error

DeleteMsgSequence deletes a given message and the entire sequence it is part of

The main use case is preventing a build-up of source events that do not relate to any configured automation.

func (*Client) FetchMessageBundle

func (c *Client) FetchMessageBundle(ctx context.Context, incomingMsg *MsgMeta) (MessageBundle, error)

FetchMessageBundle pulls all historic messages for a sequenceId from the stream, converting them to a message bundle

The returned message bundle will contain all previous messages in addition to the newly received message

func (*Client) GetEventHistory added in v0.7.0

func (c *Client) GetEventHistory(ctx context.Context, start time.Time, sourceOnly bool) ([]*MsgMeta, error)

GetEventHistory pulls historic events, most recent first, from now back to start time.

Times out if events take longer than a second to be received. Only returns the first 100 events (const GetEventHistoryEventLimit). If sourceOnly is true, only source events are returned (i.e. not pipeline events).
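
As a rough illustration of how the start argument relates to the package constants, the sketch below derives a start time from DefaultEventLookback and applies the documented 100-event cap. The constants are re-declared locally so the example compiles without the package. lookbackStart and capEvents are hypothetical helpers, not part of the API.

```go
package main

import (
	"fmt"
	"time"
)

// Local copies of the documented constants.
const (
	DefaultEventLookback      = -time.Hour
	GetEventHistoryEventLimit = 100
)

// lookbackStart returns the start of the default history window relative to now.
// The result is the kind of value one might pass as GetEventHistory's start argument.
func lookbackStart(now time.Time) time.Time {
	return now.Add(DefaultEventLookback)
}

// capEvents is a hypothetical helper mirroring the documented event limit.
func capEvents(events []string) []string {
	if len(events) > GetEventHistoryEventLimit {
		return events[:GetEventHistoryEventLimit]
	}
	return events
}

func main() {
	now := time.Date(2024, time.March, 22, 12, 0, 0, 0, time.UTC)
	fmt.Println(lookbackStart(now).Format(time.RFC3339)) // 2024-03-22T11:00:00Z
	fmt.Println(len(capEvents(make([]string, 250))))     // 100
}
```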

func (*Client) GetMsg

func (c *Client) GetMsg(ctx context.Context, subjTokens ...string) (*jetstream.RawStreamMsg, error)

func (*Client) GetSysObject

func (c *Client) GetSysObject(key string) ([]byte, error)

func (*Client) Publish

func (c *Client) Publish(ctx context.Context, data []byte, subjTokens ...string) (*jetstream.PubAck, bool, error)

func (*Client) PublishResult deprecated

func (c *Client) PublishResult(ctx context.Context, startedAt time.Time, result interface{}, err error, subjTokens ...string) (error, bool)

Deprecated: PublishResult is a convenience wrapper that JSON-encodes a ResultMsg and publishes it.

In most cases you should use PublishResultWithAck instead, deferring acking of the original message until after a result has been sent. This method will be removed in future.

func (*Client) PublishResultWithAck added in v0.15.1

func (c *Client) PublishResultWithAck(ctx context.Context, msg jetstream.Msg, startedAt time.Time, result interface{}, err error, subjTokens ...string) (bool, error)

func (*Client) PutSysObject

func (c *Client) PutSysObject(name string, data []byte) (*nats.ObjectInfo, error)

type ClientOpt

type ClientOpt func(*Client) error

ClientOpt functions configure a nats.Client via NewClient()

func DefaultClientOpts

func DefaultClientOpts() []ClientOpt

DefaultClientOpts configures the hiphops nats.Client as a RunnerClient

func WithLocalRunner added in v0.16.0

func WithLocalRunner(name string) ClientOpt

WithLocalRunner initialises a runner with a randomised interest topic and ephemeral consumer

func WithReplay added in v0.9.1

func WithReplay(name string, sequenceId string) ClientOpt

WithReplay initialises the client with a consumer for replaying a sequence

func WithRunner added in v0.9.0

func WithRunner(name string) ClientOpt

WithRunner initialises the client with a consumer for running pipelines

func WithStreamName added in v0.9.1

func WithStreamName(name string) ClientOpt

WithStreamName overrides the stream name to be used (which defaults to accountId otherwise)

Should be given before any ClientOpts that use the stream, as otherwise they will be initialised with the default stream name
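
The ordering requirement follows from how functional options are usually applied: one after another, each seeing whatever state earlier options left behind. The sketch below shows this with simplified stand-ins. The lowercase client, clientOpt, withStreamName, withRunner and newClient are all hypothetical, and falling back to a runner when no options are given is an assumption about how the documented default might be implemented.

```go
package main

import (
	"errors"
	"fmt"
)

// client is a hypothetical stand-in for the package's Client type.
type client struct {
	streamName string
	consumers  []string // "stream/consumer" pairs, in the order they were set up
}

// clientOpt mirrors the documented ClientOpt signature: func(*Client) error.
type clientOpt func(*client) error

// withStreamName is a simplified stand-in for WithStreamName.
func withStreamName(name string) clientOpt {
	return func(c *client) error {
		if name == "" {
			return errors.New("stream name must not be empty")
		}
		c.streamName = name
		return nil
	}
}

// withRunner is a simplified stand-in for WithRunner. It binds a consumer to
// whatever stream name is set at the moment the option runs.
func withRunner(name string) clientOpt {
	return func(c *client) error {
		c.consumers = append(c.consumers, c.streamName+"/"+name)
		return nil
	}
}

// newClient applies options in order. The stream name defaults to the account id.
func newClient(accountId string, opts ...clientOpt) (*client, error) {
	c := &client{streamName: accountId}
	if len(opts) == 0 {
		opts = []clientOpt{withRunner("runner")} // assumed default behaviour
	}
	for _, opt := range opts {
		if err := opt(c); err != nil {
			return nil, err
		}
	}
	return c, nil
}

func main() {
	for _, opts := range [][]clientOpt{
		{withStreamName("custom"), withRunner("runner")}, // stream name set first
		{withRunner("runner"), withStreamName("custom")}, // stream name set too late
	} {
		c, err := newClient("acct-123", opts...)
		if err != nil {
			fmt.Println("error:", err)
			continue
		}
		fmt.Println(c.consumers)
	}
}
```

In the second ordering the consumer is bound to the default stream name, which is the failure mode the note above warns about.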

func WithWorker added in v0.9.0

func WithWorker(appName string) ClientOpt

WithWorker initialises the client with a consumer to receive call requests for a worker

type HopsResultMeta

type HopsResultMeta struct {
	Error      string    `json:"error,omitempty"`
	FinishedAt time.Time `json:"finished_at"`
	StartedAt  time.Time `json:"started_at"`
}

HopsResultMeta is metadata included in the top level of a result message

type KeyFile

type KeyFile struct {
	AccountId  string `json:"account_id"`
	Password   string `json:"password"`
	NatsDomain string `json:"nats_domain"`
}

func NewKeyFile

func NewKeyFile(keyfilePath string) (KeyFile, error)

func (*KeyFile) NatsUrl

func (k *KeyFile) NatsUrl() string
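
The struct tags on KeyFile suggest the JSON field names a keyfile carries. The sketch below decodes such a document into a local copy of the struct. It assumes the keyfile is plain JSON, which the documentation does not confirm, and parseKeyFile is a hypothetical helper that works on bytes, whereas NewKeyFile takes a file path. The account id, password and domain are made-up sample values.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// KeyFile is a local copy of the documented struct.
type KeyFile struct {
	AccountId  string `json:"account_id"`
	Password   string `json:"password"`
	NatsDomain string `json:"nats_domain"`
}

// parseKeyFile decodes keyfile JSON. It is a hypothetical stand-in and assumes
// the keyfile content is unencoded JSON.
func parseKeyFile(data []byte) (KeyFile, error) {
	var k KeyFile
	if err := json.Unmarshal(data, &k); err != nil {
		return KeyFile{}, fmt.Errorf("decoding keyfile: %w", err)
	}
	return k, nil
}

func main() {
	raw := []byte(`{"account_id":"acct-123","password":"s3cret","nats_domain":"nats.example.com"}`)
	k, err := parseKeyFile(raw)
	if err != nil {
		fmt.Println("error:", err)
		return
	}
	fmt.Println(k.AccountId, k.NatsDomain)
}
```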

type LocalServer

type LocalServer struct {
	NatsServer *server.Server
	ServerOpts *server.Options
}

LocalServer is an in-process hiphops.io style NATS server instance created from a NATS config file.

func NewLocalServer

func NewLocalServer(natsConfigPath string, dataDir string, debug bool, logger server.Logger) (*LocalServer, error)

NewLocalServer starts an in-process nats server from a config file

LocalServer.Close() should be called when finished with the server

func (*LocalServer) AuthUrl

func (l *LocalServer) AuthUrl(accountName string) (string, error)

func (*LocalServer) Close

func (l *LocalServer) Close()

Close shuts down the local nats server, waiting until shutdown is complete

func (*LocalServer) Connect

func (l *LocalServer) Connect(accountName string) (*nats.Conn, error)

Connect establishes a client connection with the local nats server

func (*LocalServer) User

func (l *LocalServer) User(accountName string) (*server.User, error)

type Logger

type Logger interface {
	// Log a debug statement
	Debugf(format string, v ...interface{})

	// Log an error with exact error
	Errf(err error, format string, v ...interface{})

	// Log an error
	Errorf(format string, v ...interface{})

	// Log a fatal error
	Fatalf(format string, v ...interface{})

	// Log an info statement
	Infof(format string, v ...interface{})

	// Log a notice statement
	Noticef(format string, v ...interface{})

	// Log a trace statement
	Tracef(format string, v ...interface{})

	// Log a warning statement
	Warnf(format string, v ...interface{})
}
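
Any type with these eight methods satisfies Logger and can be handed to NewClient. As a minimal sketch, the adapter below wraps the standard library's *log.Logger. The interface is re-declared locally so the example compiles on its own. stdLogger and the level prefixes ("INFO", "ERROR" and so on) are choices made for this illustration, not something the package prescribes.

```go
package main

import (
	"bytes"
	"errors"
	"fmt"
	"log"
)

// Logger is a local copy of the documented interface.
type Logger interface {
	Debugf(format string, v ...interface{})
	Errf(err error, format string, v ...interface{})
	Errorf(format string, v ...interface{})
	Fatalf(format string, v ...interface{})
	Infof(format string, v ...interface{})
	Noticef(format string, v ...interface{})
	Tracef(format string, v ...interface{})
	Warnf(format string, v ...interface{})
}

// stdLogger adapts a standard library logger to the Logger interface.
type stdLogger struct{ l *log.Logger }

// write formats the message and emits it with a level prefix.
func (s stdLogger) write(level, format string, v ...interface{}) {
	s.l.Print(level + " " + fmt.Sprintf(format, v...))
}

func (s stdLogger) Debugf(format string, v ...interface{})  { s.write("DEBUG", format, v...) }
func (s stdLogger) Errorf(format string, v ...interface{})  { s.write("ERROR", format, v...) }
func (s stdLogger) Infof(format string, v ...interface{})   { s.write("INFO", format, v...) }
func (s stdLogger) Noticef(format string, v ...interface{}) { s.write("NOTICE", format, v...) }
func (s stdLogger) Tracef(format string, v ...interface{})  { s.write("TRACE", format, v...) }
func (s stdLogger) Warnf(format string, v ...interface{})   { s.write("WARN", format, v...) }

// Errf appends the exact error after the formatted message.
func (s stdLogger) Errf(err error, format string, v ...interface{}) {
	s.write("ERROR", "%s: %v", fmt.Sprintf(format, v...), err)
}

// Fatalf logs and then exits, as log.Logger.Fatalf does.
func (s stdLogger) Fatalf(format string, v ...interface{}) {
	s.l.Fatalf("FATAL "+format, v...)
}

// demo logs to an in-memory buffer and returns what was written.
func demo() string {
	var buf bytes.Buffer
	var logger Logger = stdLogger{l: log.New(&buf, "", 0)}
	logger.Infof("connected to %s", "nats://localhost:4222")
	logger.Errf(errors.New("boom"), "publish failed for %s", "seq-1")
	return buf.String()
}

func main() {
	fmt.Print(demo())
}
```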

type MessageBundle

type MessageBundle map[string][]byte

MessageBundle is a map from message ID to the data that message contained.

MessageBundle is designed to be passed to a runner to ensure it has the aggregate state of a hiphops sequence of messages.

type MsgMeta

type MsgMeta struct {
	AccountId        string
	AppName          string
	Channel          string
	ConsumerSequence uint64
	Done             bool // Message is a pipeline 'done' message
	HandlerName      string
	InterestTopic    string
	MessageId        string
	NumPending       uint64
	SequenceId       string
	StreamSequence   uint64
	Timestamp        time.Time
	// contains filtered or unexported fields
}

func Parse

func Parse(msg jetstream.Msg) (*MsgMeta, error)

func (*MsgMeta) Msg added in v0.7.0

func (m *MsgMeta) Msg() jetstream.Msg

func (*MsgMeta) ResponseSubject

func (m *MsgMeta) ResponseSubject() string

func (*MsgMeta) SequenceFilter

func (m *MsgMeta) SequenceFilter() string

type ResultMsg

type ResultMsg struct {
	Body       string            `json:"body"`
	Completed  bool              `json:"completed"`
	Done       bool              `json:"done"`
	Errored    bool              `json:"errored"`
	Headers    map[string]string `json:"headers,omitempty"`
	Hops       HopsResultMeta    `json:"hops"`
	JSON       interface{}       `json:"json,omitempty"`
	StatusCode int               `json:"status_code,omitempty"`
	URL        string            `json:"url,omitempty"`
}

ResultMsg is the schema for handler call result messages
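
To make the wire format concrete, the sketch below marshals a hand-built ResultMsg using local copies of the two documented structs. The field values are arbitrary sample data. How NewResultMsg populates fields such as Completed or Errored is not documented here, so the example builds the struct literal directly. encodeExample is a hypothetical helper.

```go
package main

import (
	"encoding/json"
	"fmt"
	"time"
)

// Local copies of the documented structs, including their JSON tags.
type HopsResultMeta struct {
	Error      string    `json:"error,omitempty"`
	FinishedAt time.Time `json:"finished_at"`
	StartedAt  time.Time `json:"started_at"`
}

type ResultMsg struct {
	Body       string            `json:"body"`
	Completed  bool              `json:"completed"`
	Done       bool              `json:"done"`
	Errored    bool              `json:"errored"`
	Headers    map[string]string `json:"headers,omitempty"`
	Hops       HopsResultMeta    `json:"hops"`
	JSON       interface{}       `json:"json,omitempty"`
	StatusCode int               `json:"status_code,omitempty"`
	URL        string            `json:"url,omitempty"`
}

// encodeExample marshals a sample result. Fields tagged omitempty (headers,
// json, url and hops.error) are dropped when left at their zero value.
func encodeExample() (string, error) {
	started := time.Date(2024, time.March, 22, 12, 0, 0, 0, time.UTC)
	msg := ResultMsg{
		Body:      "ok",
		Completed: true,
		Done:      true,
		Hops: HopsResultMeta{
			StartedAt:  started,
			FinishedAt: started.Add(2 * time.Second),
		},
		StatusCode: 200,
	}
	b, err := json.Marshal(msg)
	if err != nil {
		return "", err
	}
	return string(b), nil
}

func main() {
	out, err := encodeExample()
	if err != nil {
		fmt.Println("error:", err)
		return
	}
	fmt.Println(out)
}
```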

func NewResultMsg

func NewResultMsg(startedAt time.Time, result interface{}, err error) ResultMsg

type SequenceHandler

type SequenceHandler interface {
	SequenceCallback(context.Context, string, MessageBundle) (bool, error)
}

SequenceHandler is an interface whose SequenceCallback method receives the sequenceId and message bundle for a sequence of messages
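
A handler passed to ConsumeSequences only needs the single SequenceCallback method. The sketch below implements it against local copies of the interface, MessageBundle and DoneMessageId. countingHandler is hypothetical. The meaning of the returned bool is not documented here, so returning true when the bundle holds a "done" message is purely an assumption for illustration, as is the idea that bundle keys match constants such as DoneMessageId.

```go
package main

import (
	"context"
	"fmt"
)

// Local copies of the documented declarations.
type MessageBundle map[string][]byte

type SequenceHandler interface {
	SequenceCallback(context.Context, string, MessageBundle) (bool, error)
}

const DoneMessageId = "done"

// countingHandler is a hypothetical handler that records how many messages
// each sequence contained when it was last seen.
type countingHandler struct {
	seen map[string]int
}

// Compile-time check that countingHandler satisfies the interface.
var _ SequenceHandler = (*countingHandler)(nil)

func (h *countingHandler) SequenceCallback(ctx context.Context, sequenceId string, bundle MessageBundle) (bool, error) {
	// Stop early if the caller has already cancelled the context.
	if err := ctx.Err(); err != nil {
		return false, err
	}
	h.seen[sequenceId] = len(bundle)
	// Assumption: a bundle keyed by DoneMessageId marks a finished sequence.
	_, done := bundle[DoneMessageId]
	return done, nil
}

// runExample feeds one bundle to the handler and reports what it recorded.
func runExample() (int, bool, error) {
	h := &countingHandler{seen: map[string]int{}}
	bundle := MessageBundle{
		"event": []byte(`{"source":"example"}`),
		"done":  []byte(`{}`),
	}
	done, err := h.SequenceCallback(context.Background(), "seq-1", bundle)
	return h.seen["seq-1"], done, err
}

func main() {
	n, done, err := runExample()
	fmt.Println(n, done, err)
}
```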

type SourceMeta added in v0.16.1

type SourceMeta struct {
	Source string `json:"source"`
	Event  string `json:"event"`
	Action string `json:"action"`
	Unique string `json:"unique,omitempty"`
}
