natslock

package
v0.1.1
Published: Oct 2, 2023 License: Apache-2.0 Imports: 5 Imported by: 1

README

natslock

This package leverages NATS JetStream key-value stores to implement distributed locking/leader election.

The initial use-case is for governor addons that run timed reconcile loops and need to avoid multiple executions when several instances of the same addon are started. Using this package, the first instance to obtain the lock becomes the leader and will run all of the timed reconcile loops going forward. When gracefully stopped, it releases the lock and one of the others becomes the new leader at the next run. If a leader becomes unresponsive, the lock automatically times out after a pre-determined interval, allowing for a new leader to be elected.
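
The lifecycle described above (first caller wins, graceful release, expiry after a TTL) can be sketched with an in-memory model. This is only an illustration: `store`, `lease`, `acquire`, `release`, and `demo` are hypothetical names, time is passed in explicitly instead of coming from a NATS server, and the model never renews a lease, which may differ from what the package does.

```go
package main

import (
	"fmt"
	"time"
)

// lease is a hypothetical stand-in for the lock entry kept in the KV bucket.
type lease struct {
	holder  string
	expires time.Time
}

// store models a single-key bucket whose entry expires after ttl.
type store struct {
	ttl   time.Duration
	entry *lease
}

// acquire reports whether id is the leader at time now, taking the lock
// if it is free or if the previous lease has expired.
func (s *store) acquire(id string, now time.Time) bool {
	if s.entry != nil && now.Before(s.entry.expires) {
		return s.entry.holder == id
	}
	s.entry = &lease{holder: id, expires: now.Add(s.ttl)}
	return true
}

// release drops the lock only if id is the current holder.
func (s *store) release(id string) {
	if s.entry != nil && s.entry.holder == id {
		s.entry = nil
	}
}

// demo walks through the scenario from the paragraph above.
func demo() []bool {
	t0 := time.Date(2023, 10, 2, 0, 0, 0, 0, time.UTC)
	s := &store{ttl: time.Hour}

	var out []bool
	out = append(out, s.acquire("a", t0))                    // a becomes leader
	out = append(out, s.acquire("b", t0.Add(time.Minute)))   // b is refused
	s.release("a")                                           // a stops gracefully
	out = append(out, s.acquire("b", t0.Add(2*time.Minute))) // b takes over
	out = append(out, s.acquire("a", t0.Add(2*time.Hour)))   // b's lease has expired
	return out
}

func main() {
	fmt.Println(demo()) // prints [true false true true]
}
```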

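Warning: The current implementation can race, because acquiring the lock is a KeyValue Get followed by a separate Put rather than a single atomic operation, so two instances may both conclude that they hold the lock. If that is critical for your use case, you probably shouldn't use this package, or you should at least add some extra checks.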
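
To make the warning concrete, here is a small deterministic sketch of a check-then-write election next to a create-if-absent one. The `bucket` type and its methods are hypothetical in-memory stand-ins, not the package's code. In a real deployment the atomic step would have to be enforced by the server; the `nats.KeyValue` interface offers `Create` and `Update` (with an expected revision) for that kind of extra check, though whether they fit your case is for you to verify.

```go
package main

import (
	"errors"
	"fmt"
)

var errKeyExists = errors.New("key exists")

// bucket is a hypothetical in-memory stand-in for a single-key KV bucket.
type bucket struct{ holder string }

func (b *bucket) get() string   { return b.holder }
func (b *bucket) put(id string) { b.holder = id }

// create writes id only if the key is empty, as one indivisible step.
func (b *bucket) create(id string) error {
	if b.holder != "" {
		return errKeyExists
	}
	b.holder = id
	return nil
}

// racyElection interleaves two instances doing Get-then-Put and returns
// how many of them end up believing they are the leader.
func racyElection() int {
	b := &bucket{}
	leaders := 0

	aSawFree := b.get() == "" // instance a checks
	bSawFree := b.get() == "" // instance b checks before a has written

	if aSawFree {
		b.put("a")
		leaders++
	}
	if bSawFree {
		b.put("b") // silently overwrites a
		leaders++
	}
	return leaders
}

// atomicElection uses create-if-absent, so only the first writer succeeds.
func atomicElection() int {
	b := &bucket{}
	leaders := 0
	for _, id := range []string{"a", "b"} {
		if b.create(id) == nil {
			leaders++
		}
	}
	return leaders
}

func main() {
	fmt.Println(racyElection(), atomicElection()) // prints 2 1
}
```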

Example usage

    // assume there's an existing nc *nats.Conn

    // create a jetstream context
    jets, err := nc.JetStream()
    if err != nil {
        return nil, err
    }

    // initialize the KeyValue store with the given name and ttl
    kvStore, err := natslock.NewKeyValue(jets, "my-lock-bucket", time.Hour)
    if err != nil {
        return nil, err
    }

    // initialize the Locker (New also returns an error)
    locker, err := natslock.New(natslock.WithKeyValueStore(kvStore))
    if err != nil {
        return nil, err
    }

    // then inside your loop ...

    for {
        // check if this instance is the lead (or acquire the lock if no one else is)
        isLead, err := locker.AcquireLead()
        if err != nil {
            // error acquiring/checking leader lock (you may want to fail here)
            continue
        }

        if !isLead {
            continue
        }

        // execute remainder of code
    }
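
The loop above spins without pausing. For the timed reconcile loops mentioned earlier, one possible shape is a ticker-driven loop that releases the lock on shutdown. This is a sketch under assumptions: `leaderLock`, `runLoop`, `fakeLock`, and `demo` are names invented here, and the fake lets the example run without a NATS server. The interface only lists the `AcquireLead` and `ReleaseLead` signatures documented for `*Locker`.

```go
package main

import (
	"context"
	"fmt"
	"time"
)

// leaderLock lists the two methods this sketch needs; *natslock.Locker
// documents methods with these signatures.
type leaderLock interface {
	AcquireLead() (bool, error)
	ReleaseLead() error
}

// runLoop calls reconcile on every tick while this instance is the leader
// and releases the lock once ctx is cancelled.
func runLoop(ctx context.Context, lock leaderLock, interval time.Duration, reconcile func()) error {
	ticker := time.NewTicker(interval)
	defer ticker.Stop()

	for {
		select {
		case <-ctx.Done():
			return lock.ReleaseLead()
		case <-ticker.C:
			isLead, err := lock.AcquireLead()
			if err != nil || !isLead {
				continue // not the leader this round; try again on the next tick
			}
			reconcile()
		}
	}
}

// fakeLock is a hypothetical stand-in for a real locker.
type fakeLock struct{ lead, released bool }

func (f *fakeLock) AcquireLead() (bool, error) { return f.lead, nil }
func (f *fakeLock) ReleaseLead() error         { f.released = true; return nil }

// demo runs the loop briefly and reports how often reconcile ran and
// whether the lock was released at shutdown.
func demo(lead bool) (runs int, released bool) {
	ctx, cancel := context.WithTimeout(context.Background(), 100*time.Millisecond)
	defer cancel()

	lock := &fakeLock{lead: lead}
	_ = runLoop(ctx, lock, 5*time.Millisecond, func() { runs++ })
	return runs, lock.released
}

func main() {
	runs, released := demo(true)
	fmt.Println(runs > 0, released)
}
```

Depending on an interface rather than on `*natslock.Locker` directly keeps the loop testable without a running JetStream instance.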

Documentation

Overview

Package natslock provides locking and leader election using a NATS JetStream KV store.

Constants

const DefaultKeyName = "leader"

DefaultKeyName is the key name used for the lock

Variables

var ErrBadParameter = errors.New("bad parameters in request")

ErrBadParameter is returned when bad parameters are passed to a request

Functions

func NewKeyValue

func NewKeyValue(jets nats.JetStreamContext, name string, ttl time.Duration) (nats.KeyValue, error)

NewKeyValue returns a JetStream key-value store with the given name. If the bucket does not exist, it will be created with the given TTL.

Types

type Locker

type Locker struct {
	ID      uuid.UUID
	KVStore nats.KeyValue
	KVKey   string
	Logger  *zap.Logger
}

Locker is a distributed lock backed by a JetStream key-value store

func New

func New(opts ...Option) (*Locker, error)

New returns a new locker

func (*Locker) AcquireLead

func (l *Locker) AcquireLead() (bool, error)

AcquireLead attempts to acquire the leader lock and returns true if successful. If the lock is already held by another id, it returns false.

func (*Locker) Name

func (l *Locker) Name() string

Name returns the name of the locker kv store

func (*Locker) ReleaseLead

func (l *Locker) ReleaseLead() error

ReleaseLead releases the leader lock if it's held by this id

func (*Locker) TTL

func (l *Locker) TTL() time.Duration

TTL returns the ttl of the locker kv store

type Option

type Option func(l *Locker)

Option is a functional configuration option

func WithKeyValueStore

func WithKeyValueStore(kv nats.KeyValue) Option

WithKeyValueStore sets the nats key value store

func WithLogger

func WithLogger(log *zap.Logger) Option

WithLogger sets the logger
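
For readers unfamiliar with the functional-options pattern behind `Option`, `WithKeyValueStore`, and `WithLogger`, here is a self-contained sketch. Every type and function in it is a hypothetical stand-in using plain strings instead of `nats.KeyValue` and `*zap.Logger`. Rejecting a missing store with a bad-parameter error is an assumption of the sketch, not a documented behavior of `New`.

```go
package main

import (
	"errors"
	"fmt"
)

var errBadParameter = errors.New("bad parameters in request")

// locker is a hypothetical, trimmed-down stand-in for natslock.Locker.
type locker struct {
	store  string // stands in for the nats.KeyValue handle
	key    string
	logger string // stands in for *zap.Logger
}

// option mirrors the shape of natslock.Option: a function that mutates the locker.
type option func(l *locker)

func withStore(name string) option  { return func(l *locker) { l.store = name } }
func withLogger(name string) option { return func(l *locker) { l.logger = name } }

// newLocker sets defaults, then applies each option in order.
// Assumption: a locker without a store is treated as a bad parameter.
func newLocker(opts ...option) (*locker, error) {
	l := &locker{key: "leader", logger: "nop"}
	for _, opt := range opts {
		opt(l)
	}
	if l.store == "" {
		return nil, errBadParameter
	}
	return l, nil
}

func main() {
	l, err := newLocker(withStore("my-lock-bucket"), withLogger("debug"))
	fmt.Println(l.store, l.key, l.logger, err)

	_, err = newLocker()
	fmt.Println(errors.Is(err, errBadParameter))
}
```

Options are applied in order, so a later option overrides an earlier one that sets the same field.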
