worker

package
v0.38.1 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Sep 19, 2023 License: MIT Imports: 16 Imported by: 2

Documentation

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

This section is empty.

Types

type Balancer added in v0.22.0

type Balancer interface {
	Name() string
	Start(ctx context.Context) <-chan struct{}
	Stop(ctx context.Context)
}

type ConsulMemberList

type ConsulMemberList struct {
	// contains filtered or unexported fields
}

NewConsulMemberList is a member of a list of servers managing distributed workers backed by consul.

func NewConsulMemberList

func NewConsulMemberList(address, family string, expiration time.Duration) (*ConsulMemberList, error)

func (*ConsulMemberList) List

List lists all servers and the workers under each of them

func (*ConsulMemberList) Name

func (c *ConsulMemberList) Name() string

func (*ConsulMemberList) Register

func (c *ConsulMemberList) Register(ctx context.Context, workers []string) error

Register registers the workers under this member

func (*ConsulMemberList) Unregister added in v0.22.0

func (c *ConsulMemberList) Unregister(ctx context.Context) error

type LockerFactory added in v0.14.2

type LockerFactory func(lockName string) lock.Locker

type MemberWorkers

type MemberWorkers struct {
	Name    string
	Workers []string
}

type Memberlister

type Memberlister interface {
	Name() string
	// List lists all servers and the workers under each of them
	List(context.Context) ([]MemberWorkers, error)
	// Register registers the workers under this member
	Register(context.Context, []string) error
	// Unregister removes itself from the list
	Unregister(context.Context) error
}

Memberlister represents a member of a list to which the a worker may be assigned

type MembersBalancer added in v0.25.0

type MembersBalancer struct {
	// contains filtered or unexported fields
}

MembersBalancer manages the lifecycle of workers as well balance them over all members.

A worker can be unbalanced or paused.

  • Unbalanced worker will never be balanced across the different members and will always run.
  • Paused worker will hold its execution and will not be balanced-

func NewMembersBalancer added in v0.25.0

func NewMembersBalancer(logger *slog.Logger, name string, member Memberlister, workers []Worker, options ...MembersBalancerOption) *MembersBalancer

func (*MembersBalancer) Name added in v0.25.0

func (b *MembersBalancer) Name() string

func (*MembersBalancer) Start added in v0.25.0

func (b *MembersBalancer) Start(ctx context.Context) <-chan struct{}

func (*MembersBalancer) Stop added in v0.25.0

func (b *MembersBalancer) Stop(ctx context.Context)

type MembersBalancerOption added in v0.26.0

type MembersBalancerOption func(*MembersBalancer)

func WithHeartbeat added in v0.26.0

func WithHeartbeat(heartbeat time.Duration) MembersBalancerOption

func WithTurboHeartbeat added in v0.26.0

func WithTurboHeartbeat(heartbeat time.Duration) MembersBalancerOption

type PartitionSlot

type PartitionSlot struct {
	From uint32
	To   uint32
}

func ParseSlot

func ParseSlot(slot string) (PartitionSlot, error)

func ParseSlots

func ParseSlots(slots []string) ([]PartitionSlot, error)

func (PartitionSlot) Size

func (ps PartitionSlot) Size() uint32

type RedisMemberList

type RedisMemberList struct {
	// contains filtered or unexported fields
}

func NewRedisMemberlist

func NewRedisMemberlist(addresses, prefix string, expiration time.Duration) *RedisMemberList

func (*RedisMemberList) List

func (*RedisMemberList) Name

func (r *RedisMemberList) Name() string

func (*RedisMemberList) Register

func (r *RedisMemberList) Register(ctx context.Context, workers []string) error

func (*RedisMemberList) Unregister added in v0.22.0

func (r *RedisMemberList) Unregister(ctx context.Context) error

type RunWorker

type RunWorker struct {
	// contains filtered or unexported fields
}

RunWorker is responsible for refreshing the lease

func NewRun added in v0.37.0

func NewRun(logger *slog.Logger, name, group string, locker lock.Locker, task Task) *RunWorker

func (*RunWorker) Group added in v0.26.0

func (w *RunWorker) Group() string

func (*RunWorker) IsBalanceable added in v0.26.0

func (w *RunWorker) IsBalanceable() bool

func (*RunWorker) IsRunning

func (w *RunWorker) IsRunning() bool

func (*RunWorker) Name

func (w *RunWorker) Name() string

func (*RunWorker) Start

func (w *RunWorker) Start(ctx context.Context) (bool, error)

Start attempts to execute the worker in a separate goroutine. It returns true if it able to acquire the lock to execute, false otherwise.

func (*RunWorker) Stop

func (w *RunWorker) Stop(ctx context.Context)

type SingleBalancer added in v0.26.0

type SingleBalancer struct {
	// contains filtered or unexported fields
}

func NewSingleBalancer added in v0.24.0

func NewSingleBalancer(logger *slog.Logger, worker Worker, heartbeat time.Duration) *SingleBalancer

func RunSingleBalancer added in v0.34.0

func RunSingleBalancer(ctx context.Context, logger *slog.Logger, worker Worker, heartbeat time.Duration) *SingleBalancer

func (*SingleBalancer) Name added in v0.26.0

func (b *SingleBalancer) Name() string

func (*SingleBalancer) Start added in v0.26.0

func (b *SingleBalancer) Start(ctx context.Context) <-chan struct{}

func (*SingleBalancer) Stop added in v0.26.0

func (b *SingleBalancer) Stop(ctx context.Context)

type Task

type Task func(ctx context.Context) error

type TaskerFactory added in v0.37.0

type TaskerFactory func(partitionLow, partitionHi uint32) Task

type Worker

type Worker interface {
	Name() string
	Group() string
	IsRunning() bool
	IsBalanceable() bool
	Start(context.Context) (bool, error)
	Stop(ctx context.Context)
}

Worker represents an execution that can be started or stopped

func EventForwarder added in v0.37.0

func EventForwarder(logger *slog.Logger, name string, lockerFactory LockerFactory, task Task) Worker

EventForwarderWorker creates a single worker responsible of forwarding

func PartitionedEventForwarders added in v0.37.0

func PartitionedEventForwarders(logger *slog.Logger, name string, lockerFactory LockerFactory, taskerFactory TaskerFactory, partitionSlots []PartitionSlot) []Worker

PartitionedEventForwarderWorkers create workers responsible to forward events to their managed topic partition each worker is responsible to forward a range of partitions

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL