cluster

package
v0.0.0-...-5b43f93 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Apr 23, 2023 License: Apache-2.0 Imports: 18 Imported by: 0

README

aeron-go/cluster

Implementation of Aeron Cluster service container in Go. Most structs and functions have near one-to-one parity with the Java classes and methods on which they are based.

The Java media driver, archive and consensus module must be used to run a cluster.

The Aeron Cluster protocol is specified in xml using the Simple Binary Encoding (SBE).

Current State

The implementation is functional and mostly feature complete, including support for snapshotting, timers, multiple services within the same cluster, sending messages back to cluster sessions, and service mark files. The Cluster interface lacks of the methods of its Java equivalent, but these would be trivial additions.

Examples

echo_service.go implements a basic echo service and can be used in place of its Java equivalent.

throughput_test_client.go implements an example of using the cluster client.

Backlog

  • godoc improvements
  • testing
  • cluster session close handling (avoid sending duplicate close requests to consensus module)
  • SBE encoding/decoding improvements

Documentation

Index

Constants

View Source
const (
	SBEHeaderLength            = 8
	SessionMessageHeaderLength = 24
)
View Source
const (
	Follower  Role = 0
	Candidate      = 1
	Leader         = 2
)
View Source
const (
	ClusterSchemaId                = 111
	ClusterSchemaVersion           = 8
	SessionMessageHeaderTemplateId = 1
	SessionEventTemplateId         = 2
	SessionCloseRequestTemplateId  = 4
	SessionKeepAliveTemplateId     = 5
	NewLeaderEventTemlateId        = 6
	ChallengeTemplateId            = 7
)
View Source
const (
	HeaderLength      = 8 * 1024
	ErrorBufferLength = 1024 * 1024
)
View Source
const (
	ClientSessionMockedOffer = 1
)
View Source
const NullPosition = -1
View Source
const NullValue = -1
View Source
const SessionMessageHdrBlockLength = 24

Variables

This section is empty.

Functions

This section is empty.

Types

type ClientSession

type ClientSession interface {
	Id() int64
	ResponseStreamId() int32
	ResponseChannel() string
	EncodedPrincipal() []byte
	Close()
	// TODO: the other close methods are not part of interface.
	// I don't understand the closing bool implementation and why it is needed
	// Leaving out for now unless it is really important
	// IsClosing() bool
	Offer(*atomic.Buffer, int32, int32, term.ReservedValueSupplier) int64
}

type Cluster

type Cluster interface {

	// LogPosition returns the position the log has reached in bytes as of the current message
	LogPosition() int64

	// MemberId returns the unique id for the hosting member of the cluster. Useful only for debugging purposes
	MemberId() int32

	// Role returns the role the cluster node is playing
	Role() Role

	// Time returns the cluster time as time units since 1 Jan 1970 UTC
	Time() int64

	// TimeUnit returns the unit of time applied when timestamping and time operations
	TimeUnit() codecs.ClusterTimeUnitEnum

	// IdleStrategy returns the IdleStrategy which should be used by the service when it experiences back-pressure on egress,
	// closing sessions, making timer requests, or any long-running actions
	IdleStrategy() idlestrategy.Idler

	// ScheduleTimer schedules a timer for a given deadline and provide a correlation id to identify the timer when it expires or
	// for cancellation. This action is asynchronous and will race with the timer expiring.
	//
	// If the correlationId is for an existing scheduled timer then it will be rescheduled to the new deadline. However,
	// it is best to generate correllationIds in a monotonic fashion and be aware of potential clashes with other
	// services in the same cluster. Service isolation can be achieved by using the upper bits for service id.
	//
	// Timers should only be scheduled or cancelled in the context of processing
	// - onSessionMessage
	// - onTimerEvent
	// - onSessionOpen
	// - onSessionClose
	// If applied to other events then they are not guaranteed to be reliable.
	//
	// Callers of this method should loop until the method succeeds.
	//
	// The cluster's idle strategy must be used in the body of the loop to allow for the clustered service to be
	// shutdown if required.
	//
	// ScheduleTimer returns true if the event to schedule a timer request has been sent or false if back-pressure is applied
	ScheduleTimer(correlationId int64, deadline int64) bool

	// CancelTimer cancels a previously scheduled timer. This action is asynchronous and will race with the timer expiring.
	//
	// Timers should only be scheduled or cancelled in the context of processing
	// - onSessionMessage
	// - onTimerEvent
	// - onSessionOpen
	// - onSessionClose
	// If applied to other events then they are not guaranteed to be reliable.
	//
	// Callers of this method should loop until the method succeeds.
	//
	// CancelTimer   returns true if the event to cancel request has been sent or false if back-pressure is applied.
	CancelTimer(correlationId int64) bool

	// Offer a message as ingress to the cluster for sequencing. This will happen efficiently over IPC to the
	// consensus module and set the cluster session as the negative value of the cluster's ServiceID
	//
	// Callers of this method should loop until the method succeeds.
	//
	// The cluster's idle strategy must be used in the body of the loop to allow for the clustered service to be
	// shutdown if required
	Offer(*atomic.Buffer, int32, int32) int64
}

type ClusterMarkFile

type ClusterMarkFile struct {
	// contains filtered or unexported fields
}

func NewClusterMarkFile

func NewClusterMarkFile(filename string) (*ClusterMarkFile, error)

func (*ClusterMarkFile) SignalFailedStart

func (cmf *ClusterMarkFile) SignalFailedStart()

func (*ClusterMarkFile) SignalReady

func (cmf *ClusterMarkFile) SignalReady()

func (*ClusterMarkFile) UpdateActivityTimestamp

func (cmf *ClusterMarkFile) UpdateActivityTimestamp(timestamp int64)

type ClusteredService

type ClusteredService interface {
	// OnStart is called to initialize the service and load snapshot state, where the snapshot image can be nil if no previous snapshot exists.
	//
	// Note: As this can potentially be a long-running operation, the implementation should use Cluster.IdleStrategy() and
	// occasionally call IdleStrategy.Idle(), especially when polling the Image returns 0
	OnStart(cluster Cluster, image aeron.Image)

	// OnSessionOpen notifies the clustered service that a session has been opened for a client to the cluster
	OnSessionOpen(session ClientSession, timestamp int64)

	// OnSessionClose notifies the clustered service that a session has been closed for a client to the cluster
	OnSessionClose(
		session ClientSession,
		timestamp int64,
		closeReason codecs.CloseReasonEnum,
	)

	// OnSessionMessage notifies the clustered service that a message has been received to be processed by a clustered service
	OnSessionMessage(
		session ClientSession,
		timestamp int64,
		buffer *atomic.Buffer,
		offset int32,
		length int32,
		header *logbuffer.Header,
	)

	// OnTimerEvent notifies the clustered service that a scheduled timer has expired
	OnTimerEvent(correlationId, timestamp int64)

	// OnTakeSnapshot instructs the clustered service to take a snapshot and store its state to the provided aeron archive Publication.
	//
	// Note: As this is a potentially long-running operation the implementation should use
	// Cluster.idleStrategy() and then occasionally call IdleStrategy.idle()
	// especially when the snapshot ExclusivePublication returns Publication.BACK_PRESSURED
	OnTakeSnapshot(publication *aeron.Publication)

	// OnRoleChange notifies the clustered service that the cluster node has changed role
	OnRoleChange(role Role)

	// OnTerminate notifies the clustered service that the container is going to terminate
	OnTerminate(cluster Cluster)

	// OnNewLeadershipTermEvent notifies the clustered service that an election has been successful and a leader has entered a new term
	OnNewLeadershipTermEvent(
		leadershipTermId int64,
		logPosition int64,
		timestamp int64,
		termBaseLogPosition int64,
		leaderMemberId int32,
		logSessionId int32,
		timeUnit codecs.ClusterTimeUnitEnum,
		appVersion int32,
	)
}

type ClusteredServiceAgent

type ClusteredServiceAgent struct {
	// contains filtered or unexported fields
}

func NewClusteredServiceAgent

func NewClusteredServiceAgent(
	aeronCtx *aeron.Context,
	options *Options,
	service ClusteredService,
) (*ClusteredServiceAgent, error)

func (*ClusteredServiceAgent) CancelTimer

func (agent *ClusteredServiceAgent) CancelTimer(correlationId int64) bool

func (*ClusteredServiceAgent) DoWork

func (agent *ClusteredServiceAgent) DoWork() int

func (*ClusteredServiceAgent) Idle

func (agent *ClusteredServiceAgent) Idle(workCount int)

func (*ClusteredServiceAgent) IdleStrategy

func (agent *ClusteredServiceAgent) IdleStrategy() idlestrategy.Idler

func (*ClusteredServiceAgent) LogPosition

func (agent *ClusteredServiceAgent) LogPosition() int64

func (*ClusteredServiceAgent) MemberId

func (agent *ClusteredServiceAgent) MemberId() int32

func (*ClusteredServiceAgent) Offer

func (agent *ClusteredServiceAgent) Offer(buffer *atomic.Buffer, offset, length int32) int64

func (*ClusteredServiceAgent) OnStart

func (agent *ClusteredServiceAgent) OnStart() error

func (*ClusteredServiceAgent) Role

func (agent *ClusteredServiceAgent) Role() Role

func (*ClusteredServiceAgent) ScheduleTimer

func (agent *ClusteredServiceAgent) ScheduleTimer(correlationId int64, deadline int64) bool

func (*ClusteredServiceAgent) StartAndRun

func (agent *ClusteredServiceAgent) StartAndRun() error

func (*ClusteredServiceAgent) Time

func (agent *ClusteredServiceAgent) Time() int64

func (*ClusteredServiceAgent) TimeUnit

type MarkFileHeaderFlyweight

type MarkFileHeaderFlyweight struct {
	flyweight.FWBase

	Version           flyweight.Int32Field
	ComponentType     flyweight.Int32Field
	ActivityTimestamp flyweight.Int64Field
	StartTimestamp    flyweight.Int64Field

	Pid                     flyweight.Int64Field
	CandidateTermId         flyweight.Int64Field
	ArchiveStreamId         flyweight.Int32Field
	ServiceStreamId         flyweight.Int32Field
	ConsensusModuleStreamId flyweight.Int32Field
	IngressStreamId         flyweight.Int32Field
	MemberId                flyweight.Int32Field
	ServiceId               flyweight.Int32Field

	HeaderLength      flyweight.Int32Field
	ErrorBufferLength flyweight.Int32Field
	ClusterId         flyweight.Int32Field
}

func (*MarkFileHeaderFlyweight) Wrap

func (f *MarkFileHeaderFlyweight) Wrap(
	buf *atomic.Buffer,
	offset int,
) flyweight.Flyweight

type Options

type Options struct {
	Timeout                 time.Duration      // [runtime] How long to try sending/receiving control messages
	IdleStrategy            idlestrategy.Idler // [runtime] Idlestrategy for sending/receiving control messagesA
	RangeChecking           bool               // [runtime] archive protocol marshalling checks
	Loglevel                zapcore.Level      // [runtime] via logging.SetLevel()
	ClusterDir              string
	ClusterId               int32
	ServiceId               int32
	AppVersion              int32
	ControlChannel          string
	ConsensusModuleStreamId int32
	ServiceStreamId         int32
	SnapshotChannel         string
	SnapshotStreamId        int32
	ReplayChannel           string
	ReplayStreamId          int32
	ArchiveOptions          *archive.Options
	LogFragmentLimit        int
}

func NewOptions

func NewOptions() *Options

type Role

type Role int32

Directories

Path Synopsis
Package codecs contains the archive protocol packet encoding and decoding
Package codecs contains the archive protocol packet encoding and decoding

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL