cluster

package
v1.2.0 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: May 10, 2022 License: Apache-2.0 Imports: 18 Imported by: 0

README

aeron-go/cluster

Implementation of Aeron Cluster service container in Go. Most structs and functions have near one-to-one parity with the Java classes and methods on which they are based.

The Java media driver, archive and consensus module must be used to run a cluster.

The Aeron Cluster protocol is specified in XML using Simple Binary Encoding (SBE).

Current State

The implementation is functional and mostly feature complete, including support for snapshotting, timers, multiple services within the same cluster, sending messages back to cluster sessions, and service mark files. The Cluster interface does not yet expose all of the methods of its Java equivalent, but these would be trivial additions.

The Cluster Client has not been implemented yet. One must use the Java version to connect to a cluster.

Examples

echo_service.go implements a basic echo service and can be used in place of its Java equivalent.

Backlog

  • godoc improvements
  • testing
  • cluster client
  • cluster session close handling (avoid sending duplicate close requests to consensus module)
  • SBE encoding/decoding improvements

Documentation

Index

Constants

View Source
// Header sizes used by the cluster protocol code.
// NOTE(review): presumably byte counts for the SBE message header and the
// session message header — confirm against the SBE schema.
// SessionMessageHeaderLength has the same value (24) as
// SessionMessageHdrBlockLength.
const (
	SBEHeaderLength            = 8
	SessionMessageHeaderLength = 24
)
View Source
// Role values declared by this package: Follower, Candidate and Leader.
//
// Each constant names the Role type explicitly. In a Go const block only an
// entry with an omitted expression list repeats the previous type, so an entry
// written as "Candidate = 1" is an untyped integer constant (defaulting to
// int), not a Role.
const (
	Follower  Role = 0
	Candidate Role = 1
	Leader    Role = 2
)
View Source
// SBE schema id/version and message template ids used by this package.
// NOTE(review): the values presumably mirror the Aeron Cluster SBE schema —
// confirm against the schema XML before changing any of them.
const (
	ClusterSchemaId                = 111
	ClusterSchemaVersion           = 8
	SessionMessageHeaderTemplateId = 1
	SessionEventTemplateId         = 2
	SessionCloseRequestTemplateId  = 4
	SessionKeepAliveTemplateId     = 5
	// NewLeaderEventTemplateId is the correctly spelled name for template id 6.
	NewLeaderEventTemplateId = 6
	// NewLeaderEventTemlateId is the original, misspelled name. It is kept so
	// existing callers continue to compile.
	//
	// Deprecated: use NewLeaderEventTemplateId instead.
	NewLeaderEventTemlateId = NewLeaderEventTemplateId
	ChallengeTemplateId     = 7
)
View Source
// HeaderLength is 8 KiB (8 * 1024) and ErrorBufferLength is 1 MiB (1024 * 1024).
// NOTE(review): presumably the mark file header and error buffer sizes in
// bytes (MarkFileHeaderFlyweight has fields with the same names) — confirm.
const (
	HeaderLength      = 8 * 1024
	ErrorBufferLength = 1024 * 1024
)
View Source
// NOTE(review): the purpose of this constant is not evident from this listing.
// By its name it looks like a placeholder result for a mocked
// ClientSession.Offer — confirm, and consider whether it needs to be exported.
const (
	ClientSessionMockedOffer = 1
)
View Source
// NullPosition is the sentinel value -1.
// NOTE(review): presumably denotes "no position" for log positions — confirm.
const NullPosition = -1
View Source
// NullValue is the sentinel value -1.
// NOTE(review): presumably denotes an unset numeric value — confirm.
const NullValue = -1
View Source
// SessionMessageHdrBlockLength is 24, the same value as SessionMessageHeaderLength.
// NOTE(review): the two constants look redundant — confirm whether both are needed.
const SessionMessageHdrBlockLength = 24

Variables

This section is empty.

Functions

This section is empty.

Types

type ClientSession

// ClientSession is the session value handed to the ClusteredService session
// callbacks (OnSessionOpen, OnSessionClose, OnSessionMessage).
type ClientSession interface {
	// Id returns the session's int64 identifier.
	Id() int64
	// ResponseStreamId returns the stream id of the session's response channel.
	ResponseStreamId() int32
	// ResponseChannel returns the session's response channel as a string.
	ResponseChannel() string
	// EncodedPrincipal returns the session's encoded principal as raw bytes.
	EncodedPrincipal() []byte
	// Close closes the session.
	// NOTE(review): presumably issues a close request to the consensus module — confirm.
	Close()
	// TODO: the other close methods are not part of interface.
	// I don't understand the closing bool implementation and why it is needed
	// Leaving out for now unless it is really important
	// IsClosing() bool

	// Offer takes a buffer, two int32 values and a reserved-value supplier and
	// returns an int64.
	// NOTE(review): the parameters are unnamed; presumably (buffer, offset,
	// length, supplier) with the resulting publication position returned — confirm.
	Offer(*atomic.Buffer, int32, int32, term.ReservedValueSupplier) int64
}

type Cluster

// Cluster is the handle passed to a ClusteredService in OnStart and
// OnTerminate. ClusteredServiceAgent declares a method for each entry of this
// interface with a matching signature.
type Cluster interface {
	// LogPosition returns an int64 log position.
	LogPosition() int64
	// MemberId returns the int32 member id.
	MemberId() int32
	// Role returns the current Role.
	Role() Role
	// Time returns an int64 time value.
	// NOTE(review): the unit is not stated here; presumably the cluster time
	// unit reported to OnNewLeadershipTermEvent — confirm.
	Time() int64
	// IdleStrategy returns the idlestrategy.Idler in use.
	IdleStrategy() idlestrategy.Idler

	// ScheduleTimer schedules a timer for a given deadline
	ScheduleTimer(correlationId int64, deadline int64) bool
	// CancelTimer cancels the timer identified by correlationId.
	// NOTE(review): the meaning of the bool result is not shown here — confirm.
	CancelTimer(correlationId int64) bool
}

type ClusterMarkFile

type ClusterMarkFile struct {
	// contains filtered or unexported fields
}

func NewClusterMarkFile

func NewClusterMarkFile(filename string) (*ClusterMarkFile, error)

func (*ClusterMarkFile) SignalFailedStart

func (cmf *ClusterMarkFile) SignalFailedStart()

func (*ClusterMarkFile) SignalReady

func (cmf *ClusterMarkFile) SignalReady()

func (*ClusterMarkFile) UpdateActivityTimestamp

func (cmf *ClusterMarkFile) UpdateActivityTimestamp(timestamp int64)

type ClusteredService

// ClusteredService is the set of callbacks a service implements. An
// implementation is supplied to NewClusteredServiceAgent.
// NOTE(review): the timing notes below are inferred from method names and from
// the README's statement of parity with the Java API — confirm against the agent.
type ClusteredService interface {
	// OnStart receives the Cluster handle and an image.
	// NOTE(review): presumably the snapshot image to restore state from, which
	// may be nil when there is no snapshot — confirm.
	OnStart(cluster Cluster, image *aeron.Image)

	// OnSessionOpen reports a session together with a timestamp.
	OnSessionOpen(session ClientSession, timestamp int64)

	// OnSessionClose reports a session, a timestamp and the close reason.
	OnSessionClose(
		session ClientSession,
		timestamp int64,
		closeReason codecs.CloseReasonEnum,
	)

	// OnSessionMessage delivers a message for a session: the payload is
	// described by buffer, offset and length, accompanied by the log buffer header.
	OnSessionMessage(
		session ClientSession,
		timestamp int64,
		buffer *atomic.Buffer,
		offset int32,
		length int32,
		header *logbuffer.Header,
	)

	// OnTimerEvent reports a timer by its correlationId, which is also the key
	// used by Cluster.ScheduleTimer and Cluster.CancelTimer.
	OnTimerEvent(correlationId, timestamp int64)

	// OnTakeSnapshot receives a publication.
	// NOTE(review): presumably the service writes its snapshot state to it — confirm.
	OnTakeSnapshot(publication *aeron.Publication)

	// OnRoleChange reports a Role.
	OnRoleChange(role Role)

	// OnTerminate receives the Cluster handle.
	OnTerminate(cluster Cluster)

	// OnNewLeadershipTermEvent reports the parameters of a leadership term.
	OnNewLeadershipTermEvent(
		leadershipTermId int64,
		logPosition int64,
		timestamp int64,
		termBaseLogPosition int64,
		leaderMemberId int32,
		logSessionId int32,
		timeUnit codecs.ClusterTimeUnitEnum,
		appVersion int32,
	)
}

type ClusteredServiceAgent

type ClusteredServiceAgent struct {
	// contains filtered or unexported fields
}

func NewClusteredServiceAgent

func NewClusteredServiceAgent(
	aeronCtx *aeron.Context,
	options *Options,
	service ClusteredService,
) (*ClusteredServiceAgent, error)

func (*ClusteredServiceAgent) CancelTimer added in v1.0.8

func (agent *ClusteredServiceAgent) CancelTimer(correlationId int64) bool

func (*ClusteredServiceAgent) DoWork

func (agent *ClusteredServiceAgent) DoWork() int

func (*ClusteredServiceAgent) Idle added in v1.0.8

func (agent *ClusteredServiceAgent) Idle(workCount int)

func (*ClusteredServiceAgent) IdleStrategy added in v1.0.8

func (agent *ClusteredServiceAgent) IdleStrategy() idlestrategy.Idler

func (*ClusteredServiceAgent) LogPosition

func (agent *ClusteredServiceAgent) LogPosition() int64

func (*ClusteredServiceAgent) MemberId

func (agent *ClusteredServiceAgent) MemberId() int32

func (*ClusteredServiceAgent) OnStart

func (agent *ClusteredServiceAgent) OnStart() error

func (*ClusteredServiceAgent) Role

func (agent *ClusteredServiceAgent) Role() Role

func (*ClusteredServiceAgent) ScheduleTimer added in v1.0.8

func (agent *ClusteredServiceAgent) ScheduleTimer(correlationId int64, deadline int64) bool

func (*ClusteredServiceAgent) StartAndRun

func (agent *ClusteredServiceAgent) StartAndRun()

func (*ClusteredServiceAgent) Time

func (agent *ClusteredServiceAgent) Time() int64

type MarkFileHeaderFlyweight

// MarkFileHeaderFlyweight groups typed flyweight fields and embeds
// flyweight.FWBase; Wrap attaches it to a buffer at an offset.
// NOTE(review): presumably describes the header layout of the cluster mark
// file, with field order matching the on-disk layout — confirm against Wrap
// before reordering anything.
type MarkFileHeaderFlyweight struct {
	flyweight.FWBase

	Version           flyweight.Int32Field
	ComponentType     flyweight.Int32Field
	ActivityTimestamp flyweight.Int64Field
	StartTimestamp    flyweight.Int64Field

	Pid                     flyweight.Int64Field
	CandidateTermId         flyweight.Int64Field
	ArchiveStreamId         flyweight.Int32Field
	ServiceStreamId         flyweight.Int32Field
	ConsensusModuleStreamId flyweight.Int32Field
	IngressStreamId         flyweight.Int32Field
	MemberId                flyweight.Int32Field
	ServiceId               flyweight.Int32Field

	// NOTE(review): these two share their names with the package-level
	// HeaderLength and ErrorBufferLength constants; presumably they hold those
	// sizes — confirm.
	HeaderLength      flyweight.Int32Field
	ErrorBufferLength flyweight.Int32Field
	ClusterId         flyweight.Int32Field
}

func (*MarkFileHeaderFlyweight) Wrap

func (f *MarkFileHeaderFlyweight) Wrap(
	buf *atomic.Buffer,
	offset int,
) flyweight.Flyweight

type Options

// Options holds the settings passed to NewClusteredServiceAgent. NewOptions
// returns a *Options.
// NOTE(review): presumably NewOptions pre-populates defaults — confirm.
// Fields tagged [runtime] carry that tag from the original comments.
type Options struct {
	Timeout                 time.Duration      // [runtime] How long to try sending/receiving control messages
	IdleStrategy            idlestrategy.Idler // [runtime] Idle strategy for sending/receiving control messages
	RangeChecking           bool               // [runtime] archive protocol marshalling checks
	Loglevel                zapcore.Level      // [runtime] via logging.SetLevel()
	ClusterDir              string
	ClusterId               int32
	ServiceId               int32
	AppVersion              int32
	ControlChannel          string
	ConsensusModuleStreamId int32
	ServiceStreamId         int32
	SnapshotChannel         string
	SnapshotStreamId        int32
	ReplayChannel           string
	ReplayStreamId          int32
	ArchiveOptions          *archive.Options
	LogFragmentLimit        int
}

func NewOptions

func NewOptions() *Options

type Role

// Role is an int32-backed role value. It is returned by Cluster.Role and passed
// to ClusteredService.OnRoleChange; the package declares the constants
// Follower, Candidate and Leader for it.
type Role int32

Directories

Path Synopsis
Package codecs contains the archive protocol packet encoding and decoding
Package codecs contains the archive protocol packet encoding and decoding

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL