natty

package module
v0.0.39 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Aug 17, 2023 License: MIT Imports: 14 Imported by: 0

README

natty

Go Reference Go Report Card

An opinionated, NATS Jetstream client wrapper lib for Go.

Used by plumber and other Batch applications.

Why

NATS allows you tweak a lot of things - create push or pull streams, durable or ephemeral consumers and all kinds of other settings.

The library exposes several, opinionated, quality-of-life functionality such as:

  • Simplified publish/consume API specifically to be used with NATS-JS
    • Consume() uses ONLY durable consumers (as we want kafka-like behavior)
  • Methods for interacting with key/value store in NATS
    • Ability to perform safe key Refresh()
  • Concurrency / leader election functionality

See the full interface here.

Consume & Publish

This library uses ONLY durable consumers and provides a two method API to interact with your NATS deployment:

  • Consume(ctx context.Context, subject string, errorCh chan error, cb func(msg *nats.Msg)) error
  • Publish(ctx context.Context, subject string, data []byte) error

The Consume() will block and has to be cancelled via context. You can also pass an optional error channel that the lib will write to when the callback func runs into an error.

HasLeader

natty provides an easy way to execute a function only if the instance is the leader for a given bucket and key.

Example:


bucketName := "election-bucket"
keyName := "election-key"

n.AsLeader(context.Background(), natty.AsLeaderConfig{
	Looper:   director.NewFreeLooper(director.Forever, make(error chan, 1)),
	Bucket:   bucketName,
	Key:      keyName,
	NodeName: "node1"
}, func() error {
	fmt.Println("executed by node 1")
})

n.AsLeader(context.Background(), natty.AsLeaderConfig{
	Looper:   director.NewFreeLooper(director.Forever, make(error chan, 1)),
    Bucket:   bucketName, 
    Key:      keyName,
    NodeName: "node2"
}, func() error {
    fmt.Println("executed by node 2")
})

// Only one will be executed

AsLeader uses NATS k/v store to facilitate leader election.

Election Logic

During first execution, all instances running AsLeader() on the same bucket and key will attempt to Create() the leader key - only one will succeed as Create() will error if a key already exists.

On subsequent iterations, each AsLeader() will first check if it is the leader by reading the key in the bucket. If it is the leader, it will Put() the cfg.Key with contents set to cfg.NodeName - the Put() will NOT error if the key already exists.

If the current leader is unable to Put() - it will try again next time until it either succeeds or the key is TTL'd by the bucket policy.

When the bucket TTL is reached, the key will be deleted by NATS at which point, one of the AsLeader() instances Create() call will succeed and they will become the current leader.

TLS NATS

The NATS server started via docker-compose is configured to use TLS (with keys and certs located in ./assets/*).

We are doing NATS w/ TLS purely to ensure that the library will work with it.

Documentation

Index

Constants

View Source
const (
	DefaultAsLeaderBucketTTL              = time.Second * 10
	DefaultAsLeaderElectionLooperInterval = time.Second
	DefaultAsLeaderReplicaCount           = 1
)
View Source
const (
	DefaultMaxMsgs           = 10_000
	DefaultFetchSize         = 100
	DefaultFetchTimeout      = time.Second * 1
	DefaultDeliverPolicy     = nats.DeliverLastPolicy
	DefaultSubBatchSize      = 256
	DefaultWorkerIdleTimeout = time.Minute
	DefaultPublishTimeout    = time.Second * 5 // TODO: figure out a good value for this
)

Variables

View Source
var (
	ErrEmptyStreamName   = errors.New("StreamName cannot be empty")
	ErrEmptyConsumerName = errors.New("ConsumerName cannot be empty")
	ErrEmptySubject      = errors.New("Subject cannot be empty")
)
View Source
var (
	ErrBucketTTLMismatch = errors.New("bucket ttl mismatch")
)

Functions

func GenerateTLSConfig

func GenerateTLSConfig(caCertFile, clientKeyFile, clientCertFile string, tlsSkipVerify bool) (*tls.Config, error)

func MustNewUUID

func MustNewUUID() string

Types

type AsLeaderConfig

type AsLeaderConfig struct {
	// Looper is the loop construct that will be used to execute Func (required)
	Looper director.Looper

	// Bucket specifies what K/V bucket will be used for leader election (required)
	Bucket string

	// Key specifies the keyname that the leader election will occur on (required)
	Key string

	// NodeName is the name used for this node; should be unique in cluster (required)
	NodeName string

	// Description will set the bucket description (optional)
	Description string

	// ElectionLooper allows you to override the used election looper (optional)
	ElectionLooper director.Looper

	// BucketTTL specifies the TTL policy the bucket should use (optional)
	BucketTTL time.Duration

	// ReplicaCount specifies the number of replicas the bucket should use (optional, default 1)
	ReplicaCount int
}

type Config

type Config struct {
	// NatsURL defines the NATS urls the library will attempt to connect to. Iff
	// first URL fails, we will try to connect to the next one. Only fail if all
	// URLs fail.
	NatsURL []string

	// MaxMsgs defines the maximum number of messages a stream will contain.
	MaxMsgs int64

	// FetchSize defines the number of messages to fetch from the stream during
	// a single Fetch() call.
	FetchSize int

	// FetchTimeout defines how long a Fetch() call will wait to attempt to reach
	// defined FetchSize before continuing.
	FetchTimeout time.Duration

	// DeliverPolicy defines the policy the library will use to deliver messages.
	// Default: DeliverLastPolicy which will deliver from the last message that
	// the consumer has seen.
	DeliverPolicy nats.DeliverPolicy

	// Logger allows you to inject a logger into the library. Optional.
	Logger Logger

	// Whether to use TLS
	UseTLS bool

	// TLS CA certificate file
	TLSCACertFile string

	// TLS client certificate file
	TLSClientCertFile string

	// TLS client key file
	TLSClientKeyFile string

	// Do not perform server certificate checks
	TLSSkipVerify bool

	// PublishBatchSize is how many messages to async publish at once
	// Default: 256
	PublishBatchSize int

	// ServiceShutdownContext is used by main() to shutdown services before application termination
	ServiceShutdownContext context.Context

	// MainShutdownFunc is triggered by watchForShutdown() after all publisher queues are exhausted
	// and is used to trigger shutdown of APIs and then main()
	MainShutdownFunc context.CancelFunc

	// WorkerIdleTimeout determines how long to keep a publish worker alive if no activity
	WorkerIdleTimeout time.Duration

	// PublishTimeout is how long to wait for a batch of async publish calls to be ACK'd
	PublishTimeout time.Duration

	// PublishErrorCh will receive any
	PublishErrorCh chan *PublishError
}

type ConsumerConfig

type ConsumerConfig struct {
	// Subject is the subject to consume off of a stream
	Subject string

	// StreamName is the name of JS stream to consume from.
	// This should first be created with CreateStream()
	StreamName string

	// ConsumerName is the consumer that was made with CreateConsumer()
	ConsumerName string

	// Looper is optional, if none is provided, one will be created
	Looper director.Looper

	// ErrorCh is used to retrieve any errors returned during asynchronous publishing
	// If nil, errors will only be logged
	ErrorCh chan error
}

ConsumerConfig is used to pass configuration options to Consume()

type INatty

type INatty interface {
	// Consume subscribes to given subject and executes callback every time a
	// message is received. Consumed messages must be explicitly ACK'd or NAK'd.
	//
	// This is a blocking call; cancellation should be performed via the context.
	Consume(ctx context.Context, cfg *ConsumerConfig, cb func(ctx context.Context, msg *nats.Msg) error) error

	// Publish publishes a single message with the given subject; this method
	// will perform automatic batching as configured during `natty.New(..)`
	Publish(ctx context.Context, subject string, data []byte)

	// DeletePublisher shuts down a publisher and deletes it from the internal publisherMap
	DeletePublisher(ctx context.Context, id string) bool

	// CreateStream creates a new stream if it does not exist
	CreateStream(ctx context.Context, name string, subjects []string) error

	// DeleteStream deletes an existing stream
	DeleteStream(ctx context.Context, name string) error

	// CreateConsumer creates a new consumer if it does not exist
	CreateConsumer(ctx context.Context, streamName, consumerName string, filterSubject ...string) error

	// DeleteConsumer deletes an existing consumer
	DeleteConsumer(ctx context.Context, consumerName, streamName string) error

	// Get will fetch the value for a given bucket and key. Will NOT auto-create
	// bucket if it does not exist.
	Get(ctx context.Context, bucket string, key string) ([]byte, error)

	// Create will attempt to create a key in KV. It will return an error if
	// the key already exists. Will auto-create the bucket if it does not
	// already exist.
	Create(ctx context.Context, bucket string, key string, data []byte, keyTTL ...time.Duration) error

	// Put will put a new value for a given bucket and key. Will auto-create
	// the bucket if it does not already exist.
	Put(ctx context.Context, bucket string, key string, data []byte, ttl ...time.Duration) error

	// Delete will delete a key from a given bucket. Will no-op if the bucket
	// or key does not exist.
	Delete(ctx context.Context, bucket string, key string) error

	// CreateBucket will attempt to create a new bucket. Will return an error if
	// bucket already exists.
	CreateBucket(ctx context.Context, bucket string, ttl time.Duration, replicas int, description ...string) error

	// DeleteBucket will delete the specified bucket
	DeleteBucket(ctx context.Context, bucket string) error

	// WatchBucket returns an instance of nats.KeyWatcher for the given bucket
	WatchBucket(ctx context.Context, bucket string) (nats.KeyWatcher, error)

	// WatchKey returns an instance of nats.KeyWatcher for the given bucket and key
	WatchKey(ctx context.Context, bucket, key string) (nats.KeyWatcher, error)

	// Keys will return all of the keys in a bucket (empty slice if none found)
	Keys(ctx context.Context, bucket string) ([]string, error)

	// Refresh will attempt to perform a "safe" refresh of a key that has a TTL.
	// Natty will first attempt to fetch the key so that it can get its revision
	// and then perform an Update() referencing the revision. If the revision
	// does not match, it will return an error.
	Refresh(ctx context.Context, bucket, key string) error

	// Status queries the status of the KV bucket
	Status(ctx context.Context, bucket string) (nats.KeyValueStatus, error)

	// AsLeader enables simple leader election by using NATS k/v functionality.
	//
	// AsLeader will execute opts.Func if and only if the node executing AsLeader
	// acquires leader role. It will continue executing opts.Func until it loses
	// leadership and another node becomes leader.
	AsLeader(ctx context.Context, opts AsLeaderConfig, f func() error) error

	// HaveLeader returns bool indicating whether node-name in given cfg is the
	// leader for the cfg.Bucket and cfg.Key
	HaveLeader(ctx context.Context, nodeName, bucketName, keyName string) bool

	// GetLeader returns the current leader for a given bucket and key
	GetLeader(ctx context.Context, bucketName, keyName string) (string, error)
}

type KeyValueMap

type KeyValueMap struct {
	// contains filtered or unexported fields
}

func (*KeyValueMap) Delete

func (k *KeyValueMap) Delete(key string)

Delete functionality is not used because there is no way to list buckets in NATS

func (*KeyValueMap) Get

func (k *KeyValueMap) Get(key string) (nats.KeyValue, bool)

func (*KeyValueMap) Put

func (k *KeyValueMap) Put(key string, value nats.KeyValue)

type Logger

type Logger interface {
	// Debug sends out a debug message with the given arguments to the logger.
	Debug(args ...interface{})
	// Debugf formats a debug message using the given arguments and sends it to the logger.
	Debugf(format string, args ...interface{})
	// Info sends out an informational message with the given arguments to the logger.
	Info(args ...interface{})
	// Infof formats an informational message using the given arguments and sends it to the logger.
	Infof(format string, args ...interface{})
	// Warn sends out a warning message with the given arguments to the logger.
	Warn(args ...interface{})
	// Warnf formats a warning message using the given arguments and sends it to the logger.
	Warnf(format string, args ...interface{})
	// Error sends out an error message with the given arguments to the logger.
	Error(args ...interface{})
	// Errorf formats an error message using the given arguments and sends it to the logger.
	Errorf(format string, args ...interface{})
}

Logger is the common interface for user-provided loggers.

type Mode

type Mode int

type Natty

type Natty struct {
	*Config
	// contains filtered or unexported fields
}

func New

func New(cfg *Config) (*Natty, error)

func (*Natty) AsLeader

func (n *Natty) AsLeader(ctx context.Context, cfg AsLeaderConfig, f func() error) error

func (*Natty) Consume

func (n *Natty) Consume(ctx context.Context, cfg *ConsumerConfig, f func(ctx context.Context, msg *nats.Msg) error) error

Consume will create a durable consumer and consume messages from the configured stream

func (*Natty) Create

func (n *Natty) Create(ctx context.Context, bucket string, key string, data []byte, keyTTL ...time.Duration) error

Create will add the key/value pair iff it does not exist; it will create the bucket if it does not already exist. TTL is optional - it will only be used if the bucket does not exist & only the first TTL will be used.

func (*Natty) CreateBucket

func (n *Natty) CreateBucket(_ context.Context, name string, ttl time.Duration, replicaCount int, description ...string) error

CreateBucket creates a bucket; returns an error if it already exists. Context usage not supported by NATS kv (yet).

func (*Natty) CreateConsumer

func (n *Natty) CreateConsumer(ctx context.Context, streamName, consumerName string, filterSubject ...string) error

func (*Natty) CreateStream

func (n *Natty) CreateStream(ctx context.Context, name string, subjects []string) error

func (*Natty) Delete

func (n *Natty) Delete(ctx context.Context, bucket string, key string) error

func (*Natty) DeleteBucket

func (n *Natty) DeleteBucket(_ context.Context, bucket string) error

func (*Natty) DeleteConsumer

func (n *Natty) DeleteConsumer(ctx context.Context, consumerName, streamName string) error

func (*Natty) DeletePublisher

func (n *Natty) DeletePublisher(ctx context.Context, topic string) bool

DeletePublisher will stop the batch publisher goroutine and remove the publisher from the shared publisher map.

It is safe to call this if a publisher for the topic does not exist.

Returns bool which indicate if publisher exists.

func (*Natty) DeleteStream

func (n *Natty) DeleteStream(ctx context.Context, name string) error

func (*Natty) Get

func (n *Natty) Get(ctx context.Context, bucket string, key string) ([]byte, error)

func (*Natty) GetLeader

func (n *Natty) GetLeader(ctx context.Context, bucketName, keyName string) (string, error)

func (*Natty) HaveLeader

func (n *Natty) HaveLeader(ctx context.Context, nodeName, bucketName, keyName string) bool

func (*Natty) Keys

func (n *Natty) Keys(ctx context.Context, bucket string) ([]string, error)

func (*Natty) Publish

func (n *Natty) Publish(ctx context.Context, subject string, value []byte)

func (*Natty) Put

func (n *Natty) Put(ctx context.Context, bucket string, key string, data []byte, keyTTL ...time.Duration) error

Put puts a key/val into a bucket and will create bucket if it doesn't already exit. TTL is optional - it will only be used if the bucket does not exist & only the first TTL will be used.

func (*Natty) Refresh added in v0.0.36

func (n *Natty) Refresh(_ context.Context, bucket string, key string) error

Refresh will refresh the TTL of a key in a bucket. Since there is no built-in way to perform a refresh, we will first get the key and then attempt to update it referencing the revision ID we got.

func (*Natty) Status added in v0.0.39

func (n *Natty) Status(ctx context.Context, bucket string) (nats.KeyValueStatus, error)

func (*Natty) WatchBucket

func (n *Natty) WatchBucket(ctx context.Context, bucket string) (nats.KeyWatcher, error)

WatchBucket returns an instance of nats.KeyWatcher for the given bucket

func (*Natty) WatchKey added in v0.0.38

func (n *Natty) WatchKey(ctx context.Context, bucket, key string) (nats.KeyWatcher, error)

WatchKey returns an instance of nats.KeyWatcher for the given bucket

type NoOpLogger

type NoOpLogger struct {
}

NoOpLogger is a do-nothing logger; it is used internally as the default Logger when none is provided in the Options.

func (*NoOpLogger) Debug

func (l *NoOpLogger) Debug(args ...interface{})

Debug is no-op implementation of Logger's Debug.

func (*NoOpLogger) Debugf

func (l *NoOpLogger) Debugf(format string, args ...interface{})

Debugf is no-op implementation of Logger's Debugf.

func (*NoOpLogger) Error

func (l *NoOpLogger) Error(args ...interface{})

Error is no-op implementation of Logger's Error.

func (*NoOpLogger) Errorf

func (l *NoOpLogger) Errorf(format string, args ...interface{})

Errorf is no-op implementation of Logger's Errorf.

func (*NoOpLogger) Info

func (l *NoOpLogger) Info(args ...interface{})

Info is no-op implementation of Logger's Info.

func (*NoOpLogger) Infof

func (l *NoOpLogger) Infof(format string, args ...interface{})

Infof is no-op implementation of Logger's Infof.

func (*NoOpLogger) Warn

func (l *NoOpLogger) Warn(args ...interface{})

Warn is no-op implementation of Logger's Warn.

func (*NoOpLogger) Warnf

func (l *NoOpLogger) Warnf(format string, args ...interface{})

Warnf is no-op implementation of Logger's Warnf.

type PublishError

type PublishError struct {
	Subject string
	Message error
}

PublishError is a wrapper struct used to return errors to code that occur during async batch publishes

type Publisher

type Publisher struct {
	Subject     string
	QueueMutex  *sync.RWMutex
	Queue       []*message
	Natty       *Natty
	IdleTimeout time.Duration

	// ErrorCh is optional. It will receive async publish errors if specified
	// Otherwise errors will only be logged
	ErrorCh chan *PublishError

	// PublisherContext is used to close a specific publisher
	PublisherContext context.Context

	// PublisherCancel is used to cancel a specific publisher's context
	PublisherCancel context.CancelFunc

	// ServiceShutdownContext is used by main() to shutdown services before application termination
	ServiceShutdownContext context.Context
	// contains filtered or unexported fields
}

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL