workflow

package
v0.0.0-...-ae8e89f Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Mar 27, 2024 | License: Apache-2.0 | Imports: 31 | Imported by: 0

Documentation

Index

Constants

View Source
const (
	// RetryableErrorLimit is an untyped integer constant with the value 10.
	// NOTE(review): presumably the ceiling compared against
	// PollerRequest.RetryableErrorCount; the comparison is not visible in
	// this listing, so confirm against the poller implementation.
	RetryableErrorLimit = 10
)

Variables

Functions

func IsConsensusClusterFailure

func IsConsensusClusterFailure(err error) bool

func IsConsensusValidationFailure

func IsConsensusValidationFailure(err error) bool

func IsContinueAsNewError

func IsContinueAsNewError(err error) bool

func IsErrSessionFailed

func IsErrSessionFailed(sessionCtx workflow.Context, err error) bool

func IsNodeProviderFailed

func IsNodeProviderFailed(err error) bool

func IsScheduleToStartTimeout

func IsScheduleToStartTimeout(err error) bool

Types

type Backfiller

// Backfiller is an opaque handle: none of its fields are exported in this
// listing. It is built by NewBackfiller from a BackfillerParams value and
// exposes Execute, which takes a *BackfillerRequest and returns a
// client.WorkflowRun, and StopWorkflow, which takes a workflow ID and a reason.
type Backfiller struct {
	// contains filtered or unexported fields
}

func NewBackfiller

func NewBackfiller(params BackfillerParams) *Backfiller

func (*Backfiller) Execute

func (w *Backfiller) Execute(ctx context.Context, request *BackfillerRequest) (client.WorkflowRun, error)

func (*Backfiller) StopWorkflow

func (w *Backfiller) StopWorkflow(ctx context.Context, workflowID string, reason string) error

type BackfillerParams

// BackfillerParams is the single argument accepted by NewBackfiller.
// It embeds fx.In and fxparams.Params and carries a cadence.Runtime together
// with the Reader, Extractor and Loader activities.
// NOTE(review): the embedded fx.In presumably marks this as an fx parameter
// object populated by the dependency-injection container; confirm against the
// go.uber.org/fx documentation.
type BackfillerParams struct {
	fx.In
	fxparams.Params
	Runtime   cadence.Runtime
	Reader    *activity.Reader
	Extractor *activity.Extractor
	Loader    *activity.Loader
}

type BackfillerRequest

// BackfillerRequest is the request type accepted by Backfiller.Execute.
//
// The validate tag on EndHeight requires a value greater than zero and
// greater than StartHeight. Fields commented as Optional fall back to the
// workflow config (or to "disabled"/false) as stated on each field.
// RehydrateFromTag and UpgradeFromTag are pointers, so "not specified" is
// represented by nil rather than by zero.
// NOTE(review): whether EndHeight is inclusive or exclusive is not visible in
// this listing; confirm against the workflow implementation.
type BackfillerRequest struct {
	Tag                     uint32
	StartHeight             uint64
	EndHeight               uint64 `validate:"gt=0,gtfield=StartHeight"`
	UpdateWatermark         bool
	NumConcurrentExtractors int     // Optional. If not specified, it is read from the workflow config.
	BatchSize               uint64  // Optional. If not specified, it is read from the workflow config.
	MiniBatchSize           uint64  // Optional. If not specified, it is read from the workflow config.
	CheckpointSize          uint64  // Optional. If not specified, it is read from the workflow config.
	MaxReprocessedPerBatch  uint64  // Optional. If not specified, it is read from the workflow config.
	RehydrateFromTag        *uint32 // Optional. If not specified, rehydration is disabled.
	UpgradeFromTag          *uint32 // Optional. If not specified, upgrade is disabled.
	DataCompression         string  // Optional. If not specified, it is read from the workflow config.
	Failover                bool    // Optional. If not specified, it is set as false.
}

func (*BackfillerRequest) GetTags

func (r *BackfillerRequest) GetTags() map[string]string

type Benchmarker

// Benchmarker is an opaque handle: none of its fields are exported in this
// listing. It is built by NewBenchmarker from a BenchmarkerParams value and
// exposes Execute, which takes a *BenchmarkerRequest and returns a
// client.WorkflowRun, and StopWorkflow, which takes a workflow ID and a reason.
type Benchmarker struct {
	// contains filtered or unexported fields
}

func NewBenchmarker

func NewBenchmarker(params BenchmarkerParams) *Benchmarker

func (*Benchmarker) Execute

func (w *Benchmarker) Execute(ctx context.Context, request *BenchmarkerRequest) (client.WorkflowRun, error)

func (*Benchmarker) StopWorkflow

func (w *Benchmarker) StopWorkflow(ctx context.Context, workflowID string, reason string) error

type BenchmarkerParams

// BenchmarkerParams is the single argument accepted by NewBenchmarker.
// It embeds fx.In and fxparams.Params and carries a cadence.Runtime and a
// *config.Config. Unlike the other *Params types in this package it lists no
// activity dependencies.
type BenchmarkerParams struct {
	fx.In
	fxparams.Params
	Runtime cadence.Runtime
	Config  *config.Config
}

type BenchmarkerRequest

// BenchmarkerRequest is the request type accepted by Benchmarker.Execute.
//
// Validation tags require EndHeight to be greater than zero and greater than
// StartHeight, and StepSize, SamplesToTest and NumConcurrentExtractors to be
// greater than zero. MiniBatchSize carries no validation tag.
// NOTE(review): no GetTags method is listed for this type, so it does not
// appear to satisfy InstrumentedRequest; confirm whether that is intentional.
type BenchmarkerRequest struct {
	Tag                     uint32
	StartHeight             uint64
	EndHeight               uint64 `validate:"gt=0,gtfield=StartHeight"`
	StepSize                uint64 `validate:"gt=0"`
	SamplesToTest           uint64 `validate:"gt=0"`
	NumConcurrentExtractors int    `validate:"gt=0"`
	MiniBatchSize           uint64
}

type CrossValidator

// CrossValidator is an opaque handle: none of its fields are exported in this
// listing. It is built by NewCrossValidator from a CrossValidatorParams value
// and exposes StopWorkflow, which takes a workflow ID and a reason. An Execute
// method is listed as well, but its signature is not shown here.
type CrossValidator struct {
	// contains filtered or unexported fields
}

func NewCrossValidator

func NewCrossValidator(params CrossValidatorParams) *CrossValidator

func (*CrossValidator) Execute

func (*CrossValidator) StopWorkflow

func (w *CrossValidator) StopWorkflow(ctx context.Context, workflowID string, reason string) error

type CrossValidatorParams

// CrossValidatorParams is the single argument accepted by NewCrossValidator.
// It embeds fx.In and fxparams.Params and carries a cadence.Runtime and the
// CrossValidator activity (activity.CrossValidator, distinct from the
// workflow-level CrossValidator type declared in this package).
type CrossValidatorParams struct {
	fx.In
	fxparams.Params
	Runtime        cadence.Runtime
	CrossValidator *activity.CrossValidator
}

type CrossValidatorRequest

// CrossValidatorRequest describes a cross-validation run. StartHeight and Tag
// carry no "Optional" marker; every other field falls back to the workflow
// config when left unspecified, as stated on each field. The type has a
// pointer-receiver GetTags method returning map[string]string.
// NOTE(review): BackoffInterval is a string; its expected format (for example
// a time.ParseDuration string) is not visible here, so confirm before relying
// on it.
type CrossValidatorRequest struct {
	StartHeight             uint64
	Tag                     uint32
	ValidationHeightPadding uint64 // Optional. If not specified, it is read from the workflow config.
	BatchSize               uint64 // Optional. If not specified, it is read from the workflow config.
	CheckpointSize          uint64 // Optional. If not specified, it is read from the workflow config.
	Parallelism             int    // Optional. If not specified, it is read from the workflow config.
	BackoffInterval         string // Optional. If not specified, it is read from the workflow config.
}

func (*CrossValidatorRequest) GetTags

func (r *CrossValidatorRequest) GetTags() map[string]string

type EventBackfiller

// EventBackfiller is an opaque handle: none of its fields are exported in
// this listing. It is built by NewEventBackfiller from an
// EventBackfillerParams value and exposes StopWorkflow, which takes a workflow
// ID and a reason. An Execute method is listed as well, but its signature is
// not shown here.
type EventBackfiller struct {
	// contains filtered or unexported fields
}

func NewEventBackfiller

func NewEventBackfiller(params EventBackfillerParams) *EventBackfiller

func (*EventBackfiller) Execute

func (*EventBackfiller) StopWorkflow

func (w *EventBackfiller) StopWorkflow(ctx context.Context, workflowID string, reason string) error

type EventBackfillerParams

// EventBackfillerParams is the single argument accepted by
// NewEventBackfiller. It embeds fx.In and fxparams.Params and carries a
// cadence.Runtime together with the EventReader, EventReconciler and
// EventLoader activities.
type EventBackfillerParams struct {
	fx.In
	fxparams.Params
	Runtime         cadence.Runtime
	EventReader     *activity.EventReader
	EventReconciler *activity.EventReconciler
	EventLoader     *activity.EventLoader
}

type EventBackfillerRequest

// EventBackfillerRequest describes an event backfill over a sequence range.
//
// Validation tags require StartSequence to be greater than zero and
// EndSequence to be greater than zero and greater than StartSequence.
// BatchSize and CheckpointSize fall back to the workflow config when left
// unspecified.
//
// Its GetTags method is declared on the value receiver, whereas the other
// request types in this package declare GetTags on a pointer receiver.
type EventBackfillerRequest struct {
	Tag                 uint32
	EventTag            uint32
	UpgradeFromEventTag uint32
	StartSequence       uint64 `validate:"gt=0"`
	EndSequence         uint64 `validate:"gt=0,gtfield=StartSequence"`
	BatchSize           uint64 // Optional. If not specified, it is read from the workflow config.
	CheckpointSize      uint64 // Optional. If not specified, it is read from the workflow config.
}

func (EventBackfillerRequest) GetTags

func (w EventBackfillerRequest) GetTags() map[string]string

type InstrumentedRequest

// InstrumentedRequest is satisfied by any type with a
// GetTags() map[string]string method. ReplicatorRequest is documented as
// implementing it, and the Backfiller, CrossValidator, EventBackfiller,
// Monitor, Poller and Streamer request types declare a method with the same
// signature.
// NOTE(review): the returned map is presumably used as metric/logging tags;
// the consumer is not visible in this listing.
type InstrumentedRequest interface {
	GetTags() map[string]string
}

type Manager

// Manager is an opaque handle: none of its fields are exported in this
// listing, and no methods are listed for it. It is built by NewManager from a
// ManagerParams value.
type Manager struct {
	// contains filtered or unexported fields
}

func NewManager

func NewManager(params ManagerParams) *Manager

type ManagerParams

// ManagerParams is the single argument accepted by NewManager. It embeds
// fx.In and fxparams.Params and carries an fx.Lifecycle, a cadence.Runtime,
// and one pointer for each of the eight workflow types declared in this
// package: Backfiller, Poller, Benchmarker, Monitor, Streamer, CrossValidator,
// EventBackfiller and Replicator.
// NOTE(review): the Lifecycle field suggests NewManager registers start/stop
// hooks, but that is not visible in this listing.
type ManagerParams struct {
	fx.In
	fxparams.Params
	Lifecycle       fx.Lifecycle
	Runtime         cadence.Runtime
	Backfiller      *Backfiller
	Poller          *Poller
	Benchmarker     *Benchmarker
	Monitor         *Monitor
	Streamer        *Streamer
	CrossValidator  *CrossValidator
	EventBackfiller *EventBackfiller
	Replicator      *Replicator
}

type MetricOption

// MetricOption is a function that receives a tally.Scope and returns a
// tally.Scope. No exported function in this listing accepts or returns a
// MetricOption, so its call sites are not visible here.
type MetricOption func(scope tally.Scope) tally.Scope

type Monitor

// Monitor is an opaque handle: none of its fields are exported in this
// listing. It is built by NewMonitor from a MonitorParams value and exposes
// Execute, which takes a *MonitorRequest and returns a client.WorkflowRun, and
// StopWorkflow, which takes a workflow ID and a reason.
type Monitor struct {
	// contains filtered or unexported fields
}

func NewMonitor

func NewMonitor(params MonitorParams) *Monitor

func (*Monitor) Execute

func (w *Monitor) Execute(ctx context.Context, request *MonitorRequest) (client.WorkflowRun, error)

func (*Monitor) StopWorkflow

func (w *Monitor) StopWorkflow(ctx context.Context, workflowID string, reason string) error

type MonitorParams

// MonitorParams is the single argument accepted by NewMonitor. It embeds
// fx.In and fxparams.Params and carries a cadence.Runtime and the Validator
// activity.
type MonitorParams struct {
	fx.In
	fxparams.Params
	Runtime   cadence.Runtime
	Validator *activity.Validator
}

type MonitorRequest

// MonitorRequest is the request type accepted by Monitor.Execute.
//
// StartHeight and Tag carry no "Optional" marker. The remaining fields are
// optional with the fallbacks stated on each field: most are read from the
// workflow config, StartEventId is raised to metastorage.EventIdStartValue
// when unset or too small, and Failover defaults to false. EventTag is marked
// Optional without a stated fallback.
// The type has a pointer-receiver GetTags method returning map[string]string.
type MonitorRequest struct {
	StartHeight             uint64
	Tag                     uint32
	StartEventId            int64  // Optional. If not specified or less than metastorage.EventIdStartValue, it will be set as metastorage.EventIdStartValue.
	ValidationHeightPadding uint64 // Optional. If not specified, it is read from the workflow config.
	BatchSize               uint64 // Optional. If not specified, it is read from the workflow config.
	EventBatchSize          uint64 // Optional. If not specified, it is read from the workflow config.
	CheckpointSize          uint64 // Optional. If not specified, it is read from the workflow config.
	BackoffInterval         string // Optional. If not specified, it is read from the workflow config.
	Parallelism             int    // Optional. If not specified, it is read from the workflow config.
	EventTag                uint32 // Optional.
	Failover                bool   // Optional. If not specified, it is set as false.
}

func (*MonitorRequest) GetTags

func (r *MonitorRequest) GetTags() map[string]string

type Poller

// Poller is an opaque handle: none of its fields are exported in this
// listing. It is built by NewPoller from a PollerParams value and exposes
// Execute, which takes a *PollerRequest and returns a client.WorkflowRun, and
// StopWorkflow, which takes a workflow ID and a reason.
type Poller struct {
	// contains filtered or unexported fields
}

func NewPoller

func NewPoller(params PollerParams) *Poller

func (*Poller) Execute

func (w *Poller) Execute(ctx context.Context, request *PollerRequest) (client.WorkflowRun, error)

func (*Poller) StopWorkflow

func (w *Poller) StopWorkflow(ctx context.Context, workflowID string, reason string) error

type PollerParams

// PollerParams is the single argument accepted by NewPoller. It embeds fx.In
// and fxparams.Params and carries a cadence.Runtime together with the Syncer
// and LivenessCheck activities.
type PollerParams struct {
	fx.In
	fxparams.Params
	Runtime       cadence.Runtime
	Syncer        *activity.Syncer
	LivenessCheck *activity.LivenessCheck
}

type PollerRequest

// PollerRequest is the request type accepted by Poller.Execute. It has a
// pointer-receiver GetTags method returning map[string]string.
//
// Unlike the other request types in this package, none of its fields carry a
// validation tag or an "Optional" comment, so defaults and required fields
// cannot be determined from this listing.
//
// ConsensusValidation and ConsensusValidationMuted are *bool rather than bool,
// and State is a *PollerState.
// NOTE(review): the pointer types presumably distinguish "unset" (nil) from an
// explicit false or zero state, and RetryableErrorCount is presumably checked
// against RetryableErrorLimit; confirm both against the poller workflow.
type PollerRequest struct {
	Tag                          uint32
	MinStartHeight               uint64
	MaxBlocksToSync              uint64
	BackoffInterval              string
	Parallelism                  int
	CheckpointSize               uint64
	DataCompression              string
	RetryableErrorCount          int
	Failover                     bool
	ConsensusFailover            bool
	FastSync                     bool
	NumBlocksToSkip              uint64
	TransactionsWriteParallelism int
	ConsensusValidation          *bool
	ConsensusValidationMuted     *bool
	State                        *PollerState
}

func (*PollerRequest) GetTags

func (r *PollerRequest) GetTags() map[string]string

type PollerState

// PollerState holds two liveness-check values: a timestamp stored as int64
// and a violation counter stored as uint64. It is referenced by the State
// field of PollerRequest.
// NOTE(review): the unit of LastLivenessCheckTimestamp (seconds vs. nanoseconds
// since the epoch) is not visible in this listing; confirm before comparing it
// with time values.
type PollerState struct {
	LastLivenessCheckTimestamp  int64
	LivenessCheckViolationCount uint64
}

type Replicator

// Replicator is an opaque handle: none of its fields are exported in this
// listing. It is built by NewReplicator from a ReplicatorParams value and
// exposes Execute, which takes a *ReplicatorRequest and returns a
// client.WorkflowRun, and StopWorkflow, which takes a workflow ID and a reason.
type Replicator struct {
	// contains filtered or unexported fields
}

func NewReplicator

func NewReplicator(params ReplicatorParams) *Replicator

func (*Replicator) Execute

func (w *Replicator) Execute(ctx context.Context, request *ReplicatorRequest) (client.WorkflowRun, error)

func (*Replicator) StopWorkflow

func (w *Replicator) StopWorkflow(ctx context.Context, workflowID string, reason string) error

type ReplicatorParams

// ReplicatorParams is the single argument accepted by NewReplicator. It
// embeds fx.In and fxparams.Params and carries a cadence.Runtime together with
// the Replicator and UpdateWatermark activities (activity.Replicator is
// distinct from the workflow-level Replicator type declared in this package).
type ReplicatorParams struct {
	fx.In
	fxparams.Params
	Runtime         cadence.Runtime
	Replicator      *activity.Replicator
	UpdateWatermark *activity.UpdateWatermark
}

type ReplicatorRequest

// ReplicatorRequest is the request type accepted by Replicator.Execute. Its
// pointer-receiver GetTags method is documented as implementing
// InstrumentedRequest.
//
// The validate tag on EndHeight requires a value greater than zero and
// greater than StartHeight. Fields commented as Optional fall back to the
// workflow config when left unspecified.
// NOTE(review): whether EndHeight is inclusive or exclusive is not visible in
// this listing; confirm against the workflow implementation.
type ReplicatorRequest struct {
	Tag             uint32
	StartHeight     uint64
	EndHeight       uint64 `validate:"gt=0,gtfield=StartHeight"`
	UpdateWatermark bool
	DataCompression string // Optional. If not specified, it is read from the workflow config.
	BatchSize       uint64 // Optional. If not specified, it is read from the workflow config.
	MiniBatchSize   uint64 // Optional. If not specified, it is read from the workflow config.
	CheckpointSize  uint64 // Optional. If not specified, it is read from the workflow config.
	Parallelism     int    // Optional. If not specified, it is read from the workflow config.
}

func (*ReplicatorRequest) GetTags

func (r *ReplicatorRequest) GetTags() map[string]string

GetTags implements InstrumentedRequest.

type Streamer

// Streamer is an opaque handle: none of its fields are exported in this
// listing. It is built by NewStreamer from a StreamerParams value and exposes
// Execute, which takes a *StreamerRequest and returns a client.WorkflowRun,
// and StopWorkflow, which takes a workflow ID and a reason.
type Streamer struct {
	// contains filtered or unexported fields
}

func NewStreamer

func NewStreamer(params StreamerParams) *Streamer

func (*Streamer) Execute

func (w *Streamer) Execute(ctx context.Context, request *StreamerRequest) (client.WorkflowRun, error)

func (*Streamer) StopWorkflow

func (w *Streamer) StopWorkflow(ctx context.Context, workflowID string, reason string) error

type StreamerParams

// StreamerParams is the single argument accepted by NewStreamer. It embeds
// fx.In and fxparams.Params and carries a cadence.Runtime and the Streamer
// activity (activity.Streamer, distinct from the workflow-level Streamer type
// declared in this package).
type StreamerParams struct {
	fx.In
	fxparams.Params
	Runtime  cadence.Runtime
	Streamer *activity.Streamer
}

type StreamerRequest

// StreamerRequest is the request type accepted by Streamer.Execute. Every
// field is marked Optional: the first four fall back to the workflow config
// when unspecified, while EventTag and Tag are marked Optional without a
// stated fallback. The type has a pointer-receiver GetTags method returning
// map[string]string.
type StreamerRequest struct {
	BatchSize             uint64 // Optional. If not specified, it is read from the workflow config.
	CheckpointSize        uint64 // Optional. If not specified, it is read from the workflow config.
	BackoffInterval       string // Optional. If not specified, it is read from the workflow config.
	MaxAllowedReorgHeight uint64 // Optional. If not specified, it is read from the workflow config.
	EventTag              uint32 // Optional.
	Tag                   uint32 // Optional.
}

func (*StreamerRequest) GetTags

func (r *StreamerRequest) GetTags() map[string]string

type WorkflowIdentity

// WorkflowIdentity is an int-backed enumeration with one value per workflow
// type in this package, plus UnknownIdentity.
//
// Its String method returns (string, error) rather than a single string, so
// WorkflowIdentity does not satisfy fmt.Stringer and fmt verbs such as %v
// print the underlying integer. GetWorkflowIdentify maps a name to a
// WorkflowIdentity, and UnmarshalJsonStringToRequest turns a string into a
// value of type any.
type WorkflowIdentity int
// The values are assigned by iota in declaration order, starting with
// UnknownIdentity = 0 and ending with ReplicatorIdentity = 8. UnknownIdentity
// is therefore the zero value of WorkflowIdentity. Reordering or inserting
// entries changes the numeric value of every later constant.
const (
	UnknownIdentity WorkflowIdentity = iota
	BackfillerIdentity
	BenchmarkerIdentity
	MonitorIdentity
	PollerIdentity
	StreamerIdentity
	CrossValidatorIdentity
	EventBackfillerIdentity
	ReplicatorIdentity
)

func GetWorkflowIdentify

func GetWorkflowIdentify(name string) WorkflowIdentity

func (WorkflowIdentity) String

func (w WorkflowIdentity) String() (string, error)

func (WorkflowIdentity) UnmarshalJsonStringToRequest

func (w WorkflowIdentity) UnmarshalJsonStringToRequest(str string) (any, error)

Directories

Path Synopsis

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL