leaderelection

v0.2.2

Warning: This package is not in the latest version of its module.

Published: Mar 30, 2023 License: Apache-2.0 Imports: 11 Imported by: 0

README

LeaderElection

LeaderElection is a library for electing a leader with a pluggable backend (a RaceDecider).

RaceDecider

A RaceDecider implementation must provide a conditional-write or transactional-write mechanism to ensure that only one write succeeds per election cycle; beyond that, its requirements are minimal.

Currently, there are only two RaceDecider implementations:

  • Google Cloud Storage: intended for easy production use while running in GCP, using a GCS object for locking.
  • Memory: intended for tests.

Electing a leader

The Config struct contains callbacks and tunables for the leader election "campaign".

Each process that would like to acquire leadership must register callbacks for all three of OnElected, OnOusting and LeaderChanged, and specify a unique LeaderID and HostPort (the latter two are used for communication, so some use cases may be able to ignore them).

The TermLength is the length of time for which a leader acquires leadership, and the length of any extension. This duration should be long enough to get useful work done, but short enough that it is tolerable for no one to hold leadership for the remainder of the term if the leader dies. The Config.Acquire method extends the lease twice per term to reduce the likelihood of spuriously losing the lock.

MaxClockSkew specifies the corrections added to and subtracted from sleeps and leases to account for a lack of perfect synchronization among the clocks of all candidates.

ConnectionParams is a generic byte-payload side channel and may be nil. The legrpc package uses it for the gRPC ServiceConfig, but other use cases may stash other serialized data there.
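Because the package treats ConnectionParams as opaque bytes, any encoding works. A minimal sketch, assuming a hypothetical application-defined leaderMeta struct encoded as JSON (this is not the format legrpc uses):

```go
package main

import (
	"encoding/json"
	"fmt"
)

// leaderMeta is a hypothetical application-defined payload; the package
// itself only stores and returns the raw bytes.
type leaderMeta struct {
	Protocol string `json:"protocol"`
	Version  int    `json:"version"`
}

// encodeParams produces the bytes a candidate could place in ConnectionParams.
func encodeParams(m leaderMeta) ([]byte, error) {
	return json.Marshal(m)
}

// decodeParams parses the bytes an observer reads back; nil/empty input is
// treated as "no metadata", since ConnectionParams may be nil.
func decodeParams(b []byte) (leaderMeta, error) {
	var m leaderMeta
	if len(b) == 0 {
		return m, nil
	}
	err := json.Unmarshal(b, &m)
	return m, err
}

func main() {
	b, err := encodeParams(leaderMeta{Protocol: "h2", Version: 3})
	if err != nil {
		panic(err)
	}
	fmt.Println(string(b)) // {"protocol":"h2","version":3}
	m, err := decodeParams(b)
	if err != nil {
		panic(err)
	}
	fmt.Println(m.Protocol, m.Version) // h2 3
}
```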

Clock is a clocks.Clock instance. It should be nil outside of tests, in which case clocks.DefaultClock() is used.

There's more to leadership than getting elected

The OnElected callback takes two arguments which indicate the state of the leadership lock with different degrees of certainty.

The ctx argument is a context derived from the one passed to Acquire that will be canceled upon losing the leadership role. This is an explicit cancellation by the goroutine handling lease renewals and acquisition, and as such is subject to the usual thread-scheduling delay caveats (particularly relevant under heavy CPU contention).

To address the thread-scheduling delays that affect ctx, the second argument to OnElected is a *TimeView containing the current expiration time. This pointer tracks an atomic value that is updated every time the lease is extended. Before taking any action that requires holding leadership, one should always check that the time returned by its Get() method is in the future. TimeView has a ValueInFuture() convenience method that performs this check against the correct clock, which makes it suitable for guarding quick operations.

Picking a RaceDecider

The gcs RaceDecider is currently the only implementation suitable for production use. It requires a Google Cloud Storage client, a bucket and an object.

In tests, one can use the memory RaceDecider, as that implementation is trivially fast and lacks any external dependencies, but doesn't work outside a single process.

Watching an Election

Leader election is useful on its own only for a subset of use cases. Many times it is necessary for other processes to observe a leader election to send requests to the correct process. As such, one can define a WatchConfig and call Watch() on it. The callback will be called sequentially for every lease-extension and acquisition, thus allowing an observer to track the expiration of the current leader's leadership term.

Documentation

Overview

Package leaderelection provides a simple-to-configure mechanism for electing a leader among processes.

There are two main entry points within this package: Config.Acquire() and WatchConfig.Watch(). Config.Acquire() is used for acquiring leadership, while WatchConfig.Watch() is for observing leadership transitions and status.

Functions

func SelfHostPorts added in v0.2.0

func SelfHostPorts(port string) ([]string, error)

SelfHostPorts gets the global unicast addresses of the local execution environment and returns a slice of IP:Port strings which can be passed to Config.HostPort.

Types

type Config

type Config struct {
	// OnElected is called when the local instance wins an election
	// The context is cancelled when the lock is lost.
	// The expirationTime argument will contain the expiration time and its
	// contents will be updated as the term expiration gets extended.
	// One should use the ValueInFuture method on TimeView to verify that
	// the lock is still held before doing anything that requires the
	// leader role.
	OnElected func(ctx context.Context, expirationTime *TimeView)
	// OnOusting is called when leadership is lost.
	OnOusting func(ctx context.Context)
	// LeaderChanged is called when another candidate becomes leader.
	LeaderChanged func(ctx context.Context, entry entry.RaceEntry)

	LeaderID string

	// HostPort is a slice of the system's unicast interface addresses to which clients should connect.
	// Optional, but recommended in general.
	// Required if used in combination with legrpc, so the RaceDecider has
	// a record of how to connect to the current leader.
	HostPort []string

	// Decider is the RaceDecider implementation in use
	Decider RaceDecider

	// TermLength indicates how long a leader is allowed to hold a
	// leadership role before it expires (if it's not extended)
	// This must be at least 2x MaxClockSkew (preferably more).
	TermLength time.Duration

	// Maximum expected clock-skew, so sleeps, time-bounds are adjusted to
	// take this into account.
	MaxClockSkew time.Duration

	// ConnectionParams should be used as a side-channel for
	// leader-election metadata for the legrpc package, e.g. we use it for
	// storing the GRPC ServiceConfig (or nothing).
	ConnectionParams []byte

	// Clock implementation to use when scheduling sleeps, renewals and comparing leader-terms.
	// The nil-value falls back to a sane default implementation that simply wraps
	// the `time` package's functions.
	Clock clocks.Clock
}

Config defines the common fields of configs for various leaderelection backend implementations.

func (Config) Acquire

func (c Config) Acquire(ctx context.Context) error

Acquire blocks until the context expires or is cancelled.

Example
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
d := memory.NewDecider()

now := time.Date(2020, 5, 6, 0, 0, 0, 0, time.UTC)
fc := fake.NewClock(now)

tvLeaderIndicator := (*TimeView)(nil)
electedCh := make(chan struct{})
c := Config{
	Decider:  d,
	HostPort: []string{"127.0.0.1:8080"},
	LeaderID: "yabadabadoo",
	OnElected: func(ctx context.Context, tv *TimeView) {
		fmt.Printf("I won! Leader term expiration: %s; held: %t\n",
			tv.Get(), tv.ValueInFuture())
		tvLeaderIndicator = tv
		close(electedCh)
	},
	OnOusting: func(ctx context.Context) {
		// make sure we've already set `tvLeaderIndicator` before touching it
		<-electedCh
		fmt.Printf("I lost! Holding Lock: %t; expires: %s\n",
			// Note that ValueInFuture will return true
			// here because the clock is still within the
			// term.
			tvLeaderIndicator.ValueInFuture(),
			tvLeaderIndicator.Get())
	},
	LeaderChanged: func(ctx context.Context, entry entry.RaceEntry) {
		fmt.Printf("%q won\n", entry.LeaderID)
	},
	TermLength:   time.Minute * 30,
	MaxClockSkew: time.Second * 10,
	Clock:        fc,
}
acquireCh := make(chan error, 1)
go func() {
	acquireCh <- c.Acquire(ctx)
}()

fc.AwaitSleepers(1)
<-electedCh
fc.Advance(c.TermLength / 2)

// Wait for the lease renewal to happen before cancelling (so the
// output is predictable)
fc.AwaitSleepers(1)

cancel()
// Acquire blocks until all callbacks return (there's an internal WaitGroup)
<-acquireCh
// advance past the end of the current term (after an extension)
fc.Advance(c.TermLength + time.Minute)
fmt.Printf("Still Leading: %t; expiry: %s; current time: %s\n",
	tvLeaderIndicator.ValueInFuture(),
	tvLeaderIndicator.Get(), fc.Now())
Output:

I won! Leader term expiration: 2020-05-06 00:30:00 +0000 UTC; held: true
I lost! Holding Lock: true; expires: 2020-05-06 00:45:00 +0000 UTC
"" won
Still Leading: false; expiry: 2020-05-06 00:45:00 +0000 UTC; current time: 2020-05-06 00:46:00 +0000 UTC

type FailedAcquisitionErr

type FailedAcquisitionErr interface {
	error
	FailedAcquire()
}

FailedAcquisitionErr types indicate that the error was non-fatal, most likely the result of someone else grabbing the lock before us.

type RaceDecider

type RaceDecider interface {
	// WriteEntry implementations should write the entry argument to
	// stable-storage in a transactional-way such that only one contender
	// wins per-election/term
	// The returned LeaderToken is a new token if the write succeeded.
	WriteEntry(ctx context.Context, entry *entry.RaceEntry) (entry.LeaderToken, error)
	// ReadCurrent should provide the latest version of RaceEntry available
	// and put any additional information needed to ensure transactional
	// behavior in the Token-field.
	ReadCurrent(ctx context.Context) (*entry.RaceEntry, error)
}

RaceDecider is a storage backend that provides transactional semantics

type TimeView

type TimeView struct {
	// contains filtered or unexported fields
}

TimeView is a value containing an atomically updatable time.Time

func NewTimeView

func NewTimeView(c clocks.Clock) *TimeView

NewTimeView constructs a TimeView

func (*TimeView) Clock

func (t *TimeView) Clock() clocks.Clock

Clock returns the clocks.Clock instance against which times are measured.

func (*TimeView) Get

func (t *TimeView) Get() time.Time

Get provides the current value of the encapsulated timestamp

func (*TimeView) Set

func (t *TimeView) Set(v time.Time)

Set sets the current value of the encapsulated timestamp. It is exported so clients can change the value in tests.

func (*TimeView) ValueInFuture

func (t *TimeView) ValueInFuture() bool

ValueInFuture reports whether the currently held timestamp is after the current time of the contained clock (equivalent to still holding the leadership acquired via Acquire).

type WatchConfig

type WatchConfig struct {
	// Decider used to lookup the current leader
	Decider RaceDecider
	// Clock implementation to use when scheduling sleeps and comparing leader-terms.
	// The nil-value falls back to a sane default implementation that simply wraps
	// the `time` package's functions.
	Clock clocks.Clock
}

WatchConfig configures the watcher

func (WatchConfig) Watch

func (w WatchConfig) Watch(ctx context.Context, cb func(ctx context.Context, entry entry.RaceEntry)) error

Watch provides a way for an observer to watch for changes to the identity of the current leader

Example
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
d := memory.NewDecider()
now := time.Date(2020, 5, 6, 0, 0, 0, 0, time.UTC)
fc := fake.NewClock(now)

watchConfig := WatchConfig{
	Decider: d,
	Clock:   fc,
}

c, _ := d.ReadCurrent(ctx)

d.WriteEntry(ctx, &entry.RaceEntry{
	LeaderID:         "fimbat",
	HostPort:         []string{"test:80"},
	TermExpiry:       now.Add(time.Hour),
	ElectionNumber:   c.ElectionNumber + 1,
	ConnectionParams: nil,
	Token:            c.Token,
})

ch := make(chan struct{})
go func() {
	defer close(ch)
	_ = watchConfig.Watch(ctx, func(ctx context.Context, e entry.RaceEntry) {
		fmt.Println(e.HostPort, e.TermExpiry)
	})
}()

fc.AwaitSleepers(1)
cancel()
<-ch
Output:

[test:80] 2020-05-06 01:00:00 +0000 UTC

Directories

Path Synopsis
Package gcs contains an implementation of RaceDecider (plus helpers) for using Google Cloud Storage as a backend in leader-election.
Package memory implements an in-memory variant of the Decider to allow for quick local/single-process testing
