Documentation ¶
Index ¶
- Constants
- type ClientSession
- type Cluster
- type ClusterMarkFile
- type ClusteredService
- type ClusteredServiceAgent
- func (agent *ClusteredServiceAgent) CancelTimer(correlationId int64) bool
- func (agent *ClusteredServiceAgent) DoWork() int
- func (agent *ClusteredServiceAgent) Idle(workCount int)
- func (agent *ClusteredServiceAgent) IdleStrategy() idlestrategy.Idler
- func (agent *ClusteredServiceAgent) LogPosition() int64
- func (agent *ClusteredServiceAgent) MemberId() int32
- func (agent *ClusteredServiceAgent) Offer(buffer *atomic.Buffer, offset, length int32) int64
- func (agent *ClusteredServiceAgent) OnStart() error
- func (agent *ClusteredServiceAgent) Role() Role
- func (agent *ClusteredServiceAgent) ScheduleTimer(correlationId int64, deadline int64) bool
- func (agent *ClusteredServiceAgent) StartAndRun() error
- func (agent *ClusteredServiceAgent) Time() int64
- func (agent *ClusteredServiceAgent) TimeUnit() codecs.ClusterTimeUnitEnum
- type MarkFileHeaderFlyweight
- type Options
- type Role
Constants ¶
View Source
const ( SBEHeaderLength = 8 SessionMessageHeaderLength = 24 )
View Source
const ( Follower Role = 0 Candidate = 1 Leader = 2 )
View Source
const ( ClusterSchemaId = 111 ClusterSchemaVersion = 8 SessionMessageHeaderTemplateId = 1 SessionEventTemplateId = 2 SessionCloseRequestTemplateId = 4 SessionKeepAliveTemplateId = 5 NewLeaderEventTemlateId = 6 ChallengeTemplateId = 7 )
View Source
const ( HeaderLength = 8 * 1024 ErrorBufferLength = 1024 * 1024 )
View Source
const (
ClientSessionMockedOffer = 1
)
View Source
const NullPosition = -1
View Source
const NullValue = -1
View Source
const SessionMessageHdrBlockLength = 24
Variables ¶
This section is empty.
Functions ¶
This section is empty.
Types ¶
type ClientSession ¶
type ClientSession interface { Id() int64 ResponseStreamId() int32 ResponseChannel() string EncodedPrincipal() []byte Close() // TODO: the other close methods are not part of interface. // I don't understand the closing bool implementation and why it is needed // Leaving out for now unless it is really important // IsClosing() bool Offer(*atomic.Buffer, int32, int32, term.ReservedValueSupplier) int64 }
type Cluster ¶
type Cluster interface { // LogPosition returns the position the log has reached in bytes as of the current message LogPosition() int64 // MemberId returns the unique id for the hosting member of the cluster. Useful only for debugging purposes MemberId() int32 // Role returns the role the cluster node is playing Role() Role // Time returns the cluster time as time units since 1 Jan 1970 UTC Time() int64 // TimeUnit returns the unit of time applied to timestamps and time operations TimeUnit() codecs.ClusterTimeUnitEnum // IdleStrategy returns the IdleStrategy which should be used by the service when it experiences back-pressure on egress, // closing sessions, making timer requests, or any long-running actions IdleStrategy() idlestrategy.Idler // ScheduleTimer schedules a timer for a given deadline and provides a correlation id to identify the timer when it expires or // for cancellation. This action is asynchronous and will race with the timer expiring. // // If the correlationId is for an existing scheduled timer then it will be rescheduled to the new deadline. However, // it is best to generate correlationIds in a monotonic fashion and be aware of potential clashes with other // services in the same cluster. Service isolation can be achieved by using the upper bits for service id. // // Timers should only be scheduled or cancelled in the context of processing // - OnSessionMessage // - OnTimerEvent // - OnSessionOpen // - OnSessionClose // If applied to other events then they are not guaranteed to be reliable. // // Callers of this method should loop until the method succeeds. // // The cluster's idle strategy must be used in the body of the loop to allow for the clustered service to be // shut down if required. // // ScheduleTimer returns true if the event to schedule a timer request has been sent or false if back-pressure is applied ScheduleTimer(correlationId int64, deadline int64) bool // CancelTimer cancels a previously scheduled timer.
This action is asynchronous and will race with the timer expiring. // // Timers should only be scheduled or cancelled in the context of processing // - OnSessionMessage // - OnTimerEvent // - OnSessionOpen // - OnSessionClose // If applied to other events then they are not guaranteed to be reliable. // // Callers of this method should loop until the method succeeds. // // CancelTimer returns true if the event to cancel a timer request has been sent or false if back-pressure is applied. CancelTimer(correlationId int64) bool // Offer a message as ingress to the cluster for sequencing. This will happen efficiently over IPC to the // consensus module and sets the cluster session as the negative value of the cluster's ServiceID // // Callers of this method should loop until the method succeeds. // // The cluster's idle strategy must be used in the body of the loop to allow for the clustered service to be // shut down if required Offer(*atomic.Buffer, int32, int32) int64 }
type ClusterMarkFile ¶
type ClusterMarkFile struct {
// contains filtered or unexported fields
}
func NewClusterMarkFile ¶
func NewClusterMarkFile(filename string) (*ClusterMarkFile, error)
func (*ClusterMarkFile) SignalFailedStart ¶
func (cmf *ClusterMarkFile) SignalFailedStart()
func (*ClusterMarkFile) SignalReady ¶
func (cmf *ClusterMarkFile) SignalReady()
func (*ClusterMarkFile) UpdateActivityTimestamp ¶
func (cmf *ClusterMarkFile) UpdateActivityTimestamp(timestamp int64)
type ClusteredService ¶
type ClusteredService interface { // OnStart is called to initialize the service and load snapshot state, where the snapshot image can be nil if no previous snapshot exists. // // Note: As this can potentially be a long-running operation, the implementation should use Cluster.IdleStrategy() and // occasionally call IdleStrategy.Idle(), especially when polling the Image returns 0 OnStart(cluster Cluster, image aeron.Image) // OnSessionOpen notifies the clustered service that a session has been opened for a client to the cluster OnSessionOpen(session ClientSession, timestamp int64) // OnSessionClose notifies the clustered service that a session has been closed for a client to the cluster OnSessionClose( session ClientSession, timestamp int64, closeReason codecs.CloseReasonEnum, ) // OnSessionMessage notifies the clustered service that a message has been received to be processed by a clustered service OnSessionMessage( session ClientSession, timestamp int64, buffer *atomic.Buffer, offset int32, length int32, header *logbuffer.Header, ) // OnTimerEvent notifies the clustered service that a scheduled timer has expired OnTimerEvent(correlationId, timestamp int64) // OnTakeSnapshot instructs the clustered service to take a snapshot and store its state to the provided aeron archive Publication. 
// // Note: As this is a potentially long-running operation the implementation should use // Cluster.IdleStrategy() and then occasionally call IdleStrategy.Idle() // especially when the snapshot Publication is back-pressured OnTakeSnapshot(publication *aeron.Publication) // OnRoleChange notifies the clustered service that the cluster node has changed role OnRoleChange(role Role) // OnTerminate notifies the clustered service that the container is going to terminate OnTerminate(cluster Cluster) // OnNewLeadershipTermEvent notifies the clustered service that an election has been successful and a leader has entered a new term OnNewLeadershipTermEvent( leadershipTermId int64, logPosition int64, timestamp int64, termBaseLogPosition int64, leaderMemberId int32, logSessionId int32, timeUnit codecs.ClusterTimeUnitEnum, appVersion int32, ) }
type ClusteredServiceAgent ¶
type ClusteredServiceAgent struct {
// contains filtered or unexported fields
}
func NewClusteredServiceAgent ¶
func NewClusteredServiceAgent( aeronCtx *aeron.Context, options *Options, service ClusteredService, ) (*ClusteredServiceAgent, error)
func (*ClusteredServiceAgent) CancelTimer ¶
func (agent *ClusteredServiceAgent) CancelTimer(correlationId int64) bool
func (*ClusteredServiceAgent) DoWork ¶
func (agent *ClusteredServiceAgent) DoWork() int
func (*ClusteredServiceAgent) Idle ¶
func (agent *ClusteredServiceAgent) Idle(workCount int)
func (*ClusteredServiceAgent) IdleStrategy ¶
func (agent *ClusteredServiceAgent) IdleStrategy() idlestrategy.Idler
func (*ClusteredServiceAgent) LogPosition ¶
func (agent *ClusteredServiceAgent) LogPosition() int64
func (*ClusteredServiceAgent) MemberId ¶
func (agent *ClusteredServiceAgent) MemberId() int32
func (*ClusteredServiceAgent) Offer ¶
func (agent *ClusteredServiceAgent) Offer(buffer *atomic.Buffer, offset, length int32) int64
func (*ClusteredServiceAgent) OnStart ¶
func (agent *ClusteredServiceAgent) OnStart() error
func (*ClusteredServiceAgent) Role ¶
func (agent *ClusteredServiceAgent) Role() Role
func (*ClusteredServiceAgent) ScheduleTimer ¶
func (agent *ClusteredServiceAgent) ScheduleTimer(correlationId int64, deadline int64) bool
func (*ClusteredServiceAgent) StartAndRun ¶
func (agent *ClusteredServiceAgent) StartAndRun() error
func (*ClusteredServiceAgent) Time ¶
func (agent *ClusteredServiceAgent) Time() int64
func (*ClusteredServiceAgent) TimeUnit ¶
func (agent *ClusteredServiceAgent) TimeUnit() codecs.ClusterTimeUnitEnum
type MarkFileHeaderFlyweight ¶
type MarkFileHeaderFlyweight struct { flyweight.FWBase Version flyweight.Int32Field ComponentType flyweight.Int32Field ActivityTimestamp flyweight.Int64Field StartTimestamp flyweight.Int64Field Pid flyweight.Int64Field CandidateTermId flyweight.Int64Field ArchiveStreamId flyweight.Int32Field ServiceStreamId flyweight.Int32Field ConsensusModuleStreamId flyweight.Int32Field IngressStreamId flyweight.Int32Field MemberId flyweight.Int32Field ServiceId flyweight.Int32Field HeaderLength flyweight.Int32Field ErrorBufferLength flyweight.Int32Field ClusterId flyweight.Int32Field }
type Options ¶
type Options struct { Timeout time.Duration // [runtime] How long to try sending/receiving control messages IdleStrategy idlestrategy.Idler // [runtime] Idle strategy for sending/receiving control messages RangeChecking bool // [runtime] archive protocol marshalling checks Loglevel zapcore.Level // [runtime] via logging.SetLevel() ClusterDir string ClusterId int32 ServiceId int32 AppVersion int32 ControlChannel string ConsensusModuleStreamId int32 ServiceStreamId int32 SnapshotChannel string SnapshotStreamId int32 ReplayChannel string ReplayStreamId int32 ArchiveOptions *archive.Options LogFragmentLimit int }
func NewOptions ¶
func NewOptions() *Options
Source Files ¶
Click to show internal directories.
Click to hide internal directories.