sparkv1

package
v1.53.0
Published: Apr 4, 2024 License: Apache-2.0 Imports: 23 Imported by: 0

Documentation

Index

Constants

const ConsumerBatch = 15
const JobGetStageResultQuery = "GET_STAGE_RESULT"

Variables

var (
	ErrTargetNotPointer            = errors.New("unable to set value of non-pointer")
	ErrUnableToBindUnknownMimeType = errors.New("unable to bind with unknown mime type")
)
var (
	ErrStageNotFoundInNodeChain = errors.New("stage not found in the Node SparkChain")
	ErrConditionalStageSkipped  = errors.New("conditional Stage execution")
	ErrChainIsNotValid          = errors.New("SparkChain is not valid")
	ErrVariableNotFound         = errors.New("variable not found")
)
var (
	ErrInvalidStageResultMimeType = errors.New("stage result expects mime-type of application/json")
	ErrTemporalIoNotSupported     = errors.New("temporal IO provider does not support input/output referencing")
)
var CompleteError = func(ctx CompleteContext) StageError {
	return NewStageErrorWithCode(errorCodeInternal, errors.New("Complete failed"))
}
var CompleteSuccess = func(ctx CompleteContext) StageError {
	return nil
}
var (
	MimeJsonError = codec.MimeTypeJson.WithType("error")
)

Functions

This section is empty.

Types

type Bindable added in v1.6.0

type Bindable interface {
	Bind(a any) error
	GetValue() ([]byte, error)
	GetMimeType() string
	String() string
}

func NewBindableError added in v1.20.0

func NewBindableError(err error) Bindable

type BindableConfig added in v1.6.0

type BindableConfig interface {
	Bind(a any) error
	Raw() ([]byte, error)
}

type BindableMap added in v1.20.0

type BindableMap map[string]Bindable

type BindableValue added in v1.44.0

type BindableValue Value

func NewBindable added in v1.20.0

func NewBindable(value Value) *BindableValue

func NewBindableValue added in v1.36.0

func NewBindableValue(value any, mimeType string) *BindableValue

func (*BindableValue) Bind added in v1.44.0

func (b *BindableValue) Bind(a any) error

func (*BindableValue) GetMimeType added in v1.44.0

func (b *BindableValue) GetMimeType() string

func (*BindableValue) GetValue added in v1.44.0

func (b *BindableValue) GetValue() ([]byte, error)

func (*BindableValue) String added in v1.44.0

func (b *BindableValue) String() string
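
A minimal sketch of round-tripping a value through a BindableValue, assuming NewBindableValue serialises the value according to the given mime type and that Bind decodes JSON into a pointer target (see ErrTargetNotPointer and ErrUnableToBindUnknownMimeType above):

b := sparkv1.NewBindableValue(map[string]int{"count": 3}, "application/json")

var out struct {
	Count int `json:"count"`
}
// Bind requires a pointer target; passing out by value would yield ErrTargetNotPointer.
if err := b.Bind(&out); err != nil {
	// handle decode failure
}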

type Builder

type Builder interface {
	NewChain(name string) BuilderChain
	ChainFinalizer
}

Builder is the contract for the SparkChain builder

func NewBuilder

func NewBuilder() Builder

NewBuilder is the main entry point to the builder

type BuilderChain

type BuilderChain interface {
	ChainNode
}

BuilderChain is the root of a SparkChain

type Chain

type Chain interface {
	// contains filtered or unexported methods
}

Chain finalizes a Node in the SparkChain; it is used internally to build part of the SparkChain

type ChainCancelled

type ChainCancelled interface {
	Cancelled(newNode Chain) ChainComplete
}

ChainCancelled is the contract the builder must implement for cancellation

type ChainCancelledOrComplete

type ChainCancelledOrComplete interface {
	ChainCancelled
	ChainComplete
}

ChainCancelledOrComplete allows defining only cancellation or only completion

type ChainCompensate

type ChainCompensate interface {
	Compensate(newNode Chain) ChainCancelledOrComplete
}

ChainCompensate is the contract the builder must implement for compensation

type ChainComplete

type ChainComplete interface {
	Complete(completeDefinitionFn CompleteDefinitionFn, options ...StageOption) Chain
}

ChainComplete is the contract the builder must implement for completion

type ChainFinalizer

type ChainFinalizer interface {
	BuildChain() *SparkChain
}

ChainFinalizer finalizes the entire SparkChain; it is used internally to build the SparkChain

type ChainNode

type ChainNode interface {
	ChainStage // must have at least 1 Stage
}

ChainNode is a Node in the SparkChain

type ChainReport

type ChainReport struct {
	Errors   []error
	StageMap map[string]ChainReportStage
	NodeMap  map[string]ChainReportNode
}

type ChainReportNode

type ChainReportNode struct {
	Name          string
	CanCompensate bool
	CanCancel     bool
	// contains filtered or unexported fields
}

type ChainReportStage

type ChainReportStage struct {
	Name  string
	Crumb string
}

type ChainStage

type ChainStage interface {
	Stage(name string, stageDefinitionFn StageDefinitionFn, options ...StageOption) ChainStageAny
}

ChainStage is a Stage in a SparkChain Node

type ChainStageAny

type ChainStageAny interface {
	ChainStage
	ChainCompensate
	ChainCancelled
	ChainComplete
}

ChainStageAny allows defining further Stages, plus at most one each of Compensate and Cancelled, before the terminating Complete
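
A hedged sketch of the fluent flow, assuming stage functions doWork and rollback (both StageDefinitionFn values) are defined elsewhere. A compensation node is itself a small chain finished with Complete, and the worker normally invokes BuildChain for you:

func buildWorkChain(b sparkv1.Builder) *sparkv1.SparkChain {
	b.NewChain("work").
		Stage("step-1", doWork).
		Compensate(
			b.NewChain("undo").
				Stage("rollback", rollback).
				Complete(sparkv1.CompleteSuccess),
		).
		Complete(sparkv1.CompleteSuccess)
	return b.BuildChain()
}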

type CompleteContext

type CompleteContext interface {
	StageContext
	Output(variables ...*Var) error
	Name() string
}

func NewCompleteContext

func NewCompleteContext(ctx context.Context, req *ExecuteStageRequest, sparkDataIO SparkDataIO, name string, logger Logger, inputs map[string]Bindable) CompleteContext

type CompleteDefinitionFn

type CompleteDefinitionFn = func(ctx CompleteContext) StageError
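
A hedged sketch of a CompleteDefinitionFn that publishes an output variable via NewVar (documented below); the output name "total" is hypothetical:

var finalize sparkv1.CompleteDefinitionFn = func(ctx sparkv1.CompleteContext) sparkv1.StageError {
	// codec.MimeTypeJson is the JSON mime type used elsewhere in this package.
	if err := ctx.Output(sparkv1.NewVar("total", codec.MimeTypeJson, 42)); err != nil {
		return sparkv1.NewStageError(err)
	}
	return nil
}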

type CompleteStage added in v1.20.0

type CompleteStage struct {
	Node *Node
	Name string
	// contains filtered or unexported fields
}

type Config

type Config struct {
	Id                     string        `yaml:"id"`
	Name                   string        `yaml:"Name"`
	NatsRequestSubject     string        `yaml:"nats_request_subject"`
	NatsResponseSubject    string        `yaml:"nats_response_subject"`
	NatsRequestStreamName  string        `yaml:"nats_request_stream_name"`
	NatsResponseStreamName string        `yaml:"nats_response_stream_name"`
	NatsBucket             string        `yaml:"nats_bucket"`
	RetryCount             uint          `yaml:"retry_count"`
	RetryBackoff           time.Duration `yaml:"retry_backoff"`
	RetryBackoffMultiplier uint          `yaml:"retry_backoff_multiplier"`
	Timeout                time.Duration `yaml:"timeout"`
	Health                 *configHealth `yaml:"health"`
	Server                 *configServer `yaml:"plugin"`
	Log                    *configLog    `yaml:"logging"`
	App                    *configApp    `yaml:"app"`
	Nats                   *configNats   `yaml:"nats"`
}
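
A hedged sketch of constructing a Config in code; the values are illustrative, the unexported sub-configs are left at their zero values, and in practice the struct is typically populated from YAML via the tags above:

cfg := &sparkv1.Config{
	Id:                     "adder",
	Name:                   "adder-spark",
	NatsRequestSubject:     "sparks.adder.request",
	NatsResponseSubject:    "sparks.adder.response",
	RetryCount:             3,
	RetryBackoff:           time.Second,
	RetryBackoffMultiplier: 2,
	Timeout:                30 * time.Second,
}

Such a value can then be passed to a JobWorkflow via WithConfig (see WorkflowOption below).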

type ConfigType added in v1.8.0

type ConfigType string
const (
	ConfigTypeYaml ConfigType = "yaml"
	ConfigTypeJson ConfigType = "json"
)

type Context

type Context interface {
	context.Context
	JobKey() string
	CorrelationID() string
	TransactionID() string
}

func NewJobContext

func NewJobContext(metadata Context, opts *SparkOpts) Context

func NewSparkMetadata

func NewSparkMetadata(jobKey, correlationID, transactionID string, logger Logger) Context

type ErrorCode added in v1.28.0

type ErrorCode string
const (
	ErrorCodeGeneric ErrorCode = "GENERIC"
)

type ErrorOption

type ErrorOption = func(err *stageError) *stageError

func WithErrorCode

func WithErrorCode(errorCode ErrorCode) ErrorOption

func WithMetadata

func WithMetadata(metadata any) ErrorOption

func WithRetry

func WithRetry(times uint, backoffMultiplier uint, firstBackoffWait time.Duration) ErrorOption
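
A hedged sketch of combining these options when constructing a StageError with NewStageError (documented below); the error message and metadata are illustrative:

return nil, sparkv1.NewStageError(
	errors.New("upstream unavailable"),
	sparkv1.WithErrorCode(sparkv1.ErrorCodeGeneric),
	sparkv1.WithMetadata(map[string]any{"endpoint": "/v1/users"}),
	// retry up to 3 times, waiting 500ms before the first retry and
	// multiplying the wait by 2 for each subsequent attempt
	sparkv1.WithRetry(3, 2, 500*time.Millisecond),
)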

type ExecuteSparkError added in v1.20.0

type ExecuteSparkError struct {
	StageName    string           `json:"stage_name"`
	ErrorCode    ErrorCode        `json:"error_code"`
	ErrorMessage string           `json:"error_message,omitempty"`
	Metadata     map[string]any   `json:"metadata,omitempty"`
	StackTrace   []StackTraceItem `json:"stack_trace"`
}

func (*ExecuteSparkError) Error added in v1.20.0

func (ese *ExecuteSparkError) Error() string

type ExecuteSparkInputs added in v1.20.0

type ExecuteSparkInputs map[string]*BindableValue

type ExecuteSparkOutput added in v1.20.0

type ExecuteSparkOutput struct {
	Error         *ExecuteSparkError `json:"error,omitempty"`
	JobPid        *JobPid            `json:"job_pid,omitempty"`
	VariablesKey  string             `json:"variables_key,omitempty"`
	JobKey        string             `json:"job_key,omitempty"`
	CorrelationId string             `json:"correlation_id,omitempty"`
	TransactionId string             `json:"transaction_id,omitempty"`
	Model         string             `json:"model,omitempty"`
	Outputs       BindableMap        `json:"outputs,omitempty"`
}

type ExecuteStageRequest added in v1.20.0

type ExecuteStageRequest struct {
	StageName     string
	TransactionId string
	CorrelationId string
	JobKey        string
	Inputs        map[string]Bindable
}

type ExecuteStageResponse added in v1.49.0

type ExecuteStageResponse struct {
	Outputs BindableMap        `json:"outputs,omitempty"`
	Error   *ExecuteSparkError `json:"error,omitempty"`
}

type Gettable added in v1.6.0

type Gettable interface {
	Get(name string) Bindable
}

type IOState added in v1.20.0

type IOState interface {
	GetVar(varName string) any
}

type InitContext added in v1.6.0

type InitContext interface {
	Config() BindableConfig
}

func NewInitContext added in v1.20.0

func NewInitContext(opts *SparkOpts) InitContext

type Input

type Input interface {
	Bindable
}

type Inputs

type Inputs interface {
	Get(name string) Bindable
}

type InternalStageTracker added in v1.20.0

type InternalStageTracker interface {
	SetStageResult(name string, value Bindable)
	SetStageStatus(name string, status StageStatus)
}

type JobContext added in v1.20.0

type JobContext struct {
	context.Context
	Metadata *JobMetadata
}

func (*JobContext) CorrelationID added in v1.20.0

func (jc *JobContext) CorrelationID() string

func (*JobContext) JobKey added in v1.20.0

func (jc *JobContext) JobKey() string

func (*JobContext) TransactionID added in v1.20.0

func (jc *JobContext) TransactionID() string

type JobMetadata added in v1.20.0

type JobMetadata struct {
	SparkId                string             `json:"spark_id"` // id of the spark to execute
	JobKeyValue            string             `json:"job_key"`
	CorrelationIdValue     string             `json:"correlation_id"`
	TransactionIdValue     string             `json:"transaction_id"`
	RetryCount             uint               `json:"retry_count"`
	RetryBackoff           time.Duration      `json:"retry_backoff"`
	RetryBackoffMultiplier uint               `json:"retry_backoff_multiplier"`
	JobPid                 *JobPid            `json:"job_pid,omitempty"`
	VariablesBucket        string             `json:"variables_bucket"`
	VariablesKey           string             `json:"variables_key"`
	Model                  string             `json:"model,omitempty"`
	Inputs                 ExecuteSparkInputs `json:"-"`
}

JobMetadata is the context for the Spark we want to execute on a module. TODO: this type should come from the Module Library

type JobPid added in v1.49.0

type JobPid struct {
	Address   string `json:"Address"`
	Id        string `json:"Id"`
	RequestId uint32 `json:"request_id"`
}

type JobState added in v1.20.0

type JobState struct {
	JobContext   *JobMetadata
	StageResults map[string]Bindable
}

type JobWorkflow added in v1.20.0

type JobWorkflow interface {
	Run(msg jetstream.Msg)
	ExecuteStageActivity(ctx context.Context, req *ExecuteStageRequest, io SparkDataIO) (Bindable, StageError)
	ExecuteCompleteActivity(ctx context.Context, req *ExecuteStageRequest, io SparkDataIO) (*ExecuteStageResponse, StageError)
}

func NewJobWorkflow added in v1.20.0

func NewJobWorkflow(ctx context.Context, sparkId string, chain *SparkChain, opts ...WorkflowOption) (JobWorkflow, error)

type Logger

type Logger interface {
	Info(format string, v ...any)
	Warn(format string, v ...any)
	Debug(format string, v ...any)
	Error(err error, format string, v ...any)
	AddFields(k string, v any) Logger
}

func NewLogger

func NewLogger() Logger
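
AddFields returns a derived Logger, so fields can be chained. A small sketch; the field names are illustrative:

log := sparkv1.NewLogger().AddFields("spark_id", "adder")
log.Info("worker started with %d stages", 3)
log.Error(err, "stage %s failed", "add") // err is a previously captured error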

type Node added in v1.20.0

type Node struct {
	Stages     []*Stage
	Complete   *CompleteStage
	Cancel     *Node
	Compensate *Node
	Name       string
	// contains filtered or unexported fields
}

Node wraps all the Stages of a single SparkChain. These are represented as one or more Stages, but only one of each of cancellation, compensation, and completion (finalizer)

func (*Node) ChainName added in v1.20.0

func (n *Node) ChainName() string

func (*Node) CompletionName added in v1.20.0

func (n *Node) CompletionName() string

func (*Node) CountOfStages added in v1.20.0

func (n *Node) CountOfStages() int

func (*Node) HasCancellationStage added in v1.20.0

func (n *Node) HasCancellationStage() bool

func (*Node) HasCompensationStage added in v1.20.0

func (n *Node) HasCompensationStage() bool

func (*Node) HasCompletionStage added in v1.20.0

func (n *Node) HasCompletionStage() bool

func (*Node) IsCancel added in v1.20.0

func (n *Node) IsCancel() bool

func (*Node) IsCompensate added in v1.20.0

func (n *Node) IsCompensate() bool

func (*Node) IsRoot added in v1.20.0

func (n *Node) IsRoot() bool

type Option

type Option = func(je *SparkOpts) *SparkOpts

func WithSparkConfig added in v1.20.0

func WithSparkConfig(cfg any) Option

type RetryConfig

type RetryConfig struct {
	Times             uint          `json:"times" yaml:"times"`
	FirstBackoffWait  time.Duration `json:"first_backoff_wait" yaml:"first_backoff_wait"`
	BackoffMultiplier uint          `json:"backoff_multiplier" yaml:"backoff_multiplier"`
}

type Spark

type Spark interface {
	BuildChain(b Builder) Chain
	Init(ctx InitContext) error
	Stop()
}

Spark is the contract a developer must implement in order to be accepted by a worker
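
A hedged sketch of a minimal Spark implementation; the chain and stage names are illustrative:

type adderSpark struct{}

func (s *adderSpark) Init(ctx sparkv1.InitContext) error {
	// Optionally bind module configuration here, e.g. ctx.Config().Bind(&cfg).
	return nil
}

func (s *adderSpark) Stop() {}

func (s *adderSpark) BuildChain(b sparkv1.Builder) sparkv1.Chain {
	return b.NewChain("adder").
		Stage("add", func(ctx sparkv1.StageContext) (any, sparkv1.StageError) {
			return 1 + 1, nil
		}).
		Complete(sparkv1.CompleteSuccess)
}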

type SparkChain added in v1.20.0

type SparkChain struct {
	RootNode *Node
	// contains filtered or unexported fields
}

SparkChain represents the entire chain. The RootNode is the main entry point of the SparkChain and holds its children as a tree below it

func (*SparkChain) GetStageCompleteFunc added in v1.20.0

func (sc *SparkChain) GetStageCompleteFunc(name string) CompleteDefinitionFn

func (*SparkChain) GetStageFunc added in v1.20.0

func (sc *SparkChain) GetStageFunc(name string) StageDefinitionFn

type SparkDataIO added in v1.20.0

type SparkDataIO interface {
	NewInput(name, stageName string, value *BindableValue) Bindable
	NewOutput(stageName string, value *BindableValue) (Bindable, error)
	GetStageResult(stageName string) (Bindable, error)
	PutStageResult(stageName string, stageValue []byte) (Bindable, error)
	LoadVariables(key string) error
	GetInputValue(name string) (*BindableValue, bool)
	SetInitialInputs(inputs ExecuteSparkInputs)
}

func NewIoDataProvider added in v1.49.0

func NewIoDataProvider(ctx context.Context, store jetstream.ObjectStore) SparkDataIO

type SparkOpts added in v1.20.0

type SparkOpts struct {
	// contains filtered or unexported fields
}

type StackTraceItem added in v1.25.0

type StackTraceItem struct {
	Type     string `json:"type"`
	Filepath string `json:"filepath"`
}

type Stage added in v1.20.0

type Stage struct {
	Node *Node
	Name string
	// contains filtered or unexported fields
}

func (*Stage) ApplyConditionalExecutionOptions added in v1.20.0

func (s *Stage) ApplyConditionalExecutionOptions(ctx Context, stageName string) StageError

type StageContext

type StageContext interface {
	Context
	Input(names string) Input
	StageResult(name string) Bindable
	Log() Logger
	Name() string
}

func NewStageContext

func NewStageContext(ctx context.Context, req *ExecuteStageRequest, sparkDataIO SparkDataIO, name string, logger Logger, inputs map[string]Bindable) StageContext

type StageDefinitionFn

type StageDefinitionFn = func(ctx StageContext) (any, StageError)
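
A hedged sketch of a StageDefinitionFn that binds a named input; the input name "amount" is hypothetical:

var addOne sparkv1.StageDefinitionFn = func(ctx sparkv1.StageContext) (any, sparkv1.StageError) {
	var amount int
	if err := ctx.Input("amount").Bind(&amount); err != nil {
		return nil, sparkv1.NewStageError(err)
	}
	ctx.Log().Debug("adding one to %d", amount)
	return amount + 1, nil
}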

type StageError

type StageError interface {
	ErrorCode() ErrorCode
	StageName() string
	Error() string
	Metadata() map[string]any
	GetRetryConfig() *RetryConfig
	// contains filtered or unexported methods
}

func NewStageError

func NewStageError(err error, opts ...ErrorOption) StageError

func NewStageErrorWithCode added in v1.28.0

func NewStageErrorWithCode(errorCode ErrorCode, err error, opts ...ErrorOption) StageError

type StageOption

type StageOption = func(StageOptionParams) StageError

type StageOptionParams

type StageOptionParams interface {
	StageName() string
	Context() Context
}

type StageStatus

type StageStatus string
const (
	StageStatus_STAGE_PENDING   StageStatus = "STAGE_PENDING"
	StageStatus_STAGE_STARTED   StageStatus = "STAGE_STARTED"
	StageStatus_STAGE_COMPLETED StageStatus = "STAGE_COMPLETED"
	StageStatus_STAGE_FAILED    StageStatus = "STAGE_FAILED"
	StageStatus_STAGE_SKIPPED   StageStatus = "STAGE_SKIPPED"
	StageStatus_STAGE_CANCELED  StageStatus = "CANCELED"
)

type StageTracker added in v1.20.0

type StageTracker interface {
	GetStageResult(name string) (data any, mime codec.MimeType, err StageError)
	AssertStageCompleted(stageName string)
	AssertStageStarted(stageName string)
	AssertStageSkipped(stageName string)
	AssertStageCancelled(stageName string)
	AssertStageFailed(stageName string)
	AssertStageResult(stageName string, expectedStageResult any)
	AssertStageOrder(stageNames ...string)
}
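
A hedged testing sketch, assuming the test harness exposes a StageTracker for a workflow built with stage tracking enabled (see WithStageTracker below); the stage names are hypothetical:

func assertAdderRan(st sparkv1.StageTracker) {
	st.AssertStageOrder("add")
	st.AssertStageCompleted("add")
	st.AssertStageResult("add", 2)
}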

type Value added in v1.20.0

type Value struct {
	Value    []byte `json:"value"`
	MimeType string `json:"mime_type"`
}

type Var

type Var struct {
	Name     string
	MimeType codec.MimeType
	Value    []byte
}

func NewVar

func NewVar(name string, mimeType codec.MimeType, value any) *Var

type Worker

type Worker interface {
	Run()
}

func NewSparkWorker

func NewSparkWorker(ctx context.Context, spark Spark, options ...Option) (Worker, error)
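
A hedged sketch of starting a worker for the adderSpark shown under Spark above; options are omitted:

func main() {
	worker, err := sparkv1.NewSparkWorker(context.Background(), &adderSpark{})
	if err != nil {
		panic(err)
	}
	worker.Run() // presumably blocks while consuming jobs
}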

type WorkflowOption added in v1.20.0

type WorkflowOption = func(je *workflowOpts) *workflowOpts

func WithConfig added in v1.49.0

func WithConfig(cfg *Config) WorkflowOption

func WithInputs added in v1.49.0

func WithInputs(inputs ExecuteSparkInputs) WorkflowOption

func WithNatsClient added in v1.49.0

func WithNatsClient(nc *nats.Conn) WorkflowOption

func WithObjectStore added in v1.49.0

func WithObjectStore(os jetstream.ObjectStore) WorkflowOption

func WithStageRetryOverride added in v1.53.0

func WithStageRetryOverride(override *RetryConfig) WorkflowOption

func WithStageTracker added in v1.20.0

func WithStageTracker(ist InternalStageTracker) WorkflowOption
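
A hedged sketch of wiring a JobWorkflow by hand with these options; in typical use the worker does this internally, and nc, store, cfg and chain are assumed to already exist:

wf, err := sparkv1.NewJobWorkflow(ctx, "adder", chain,
	sparkv1.WithConfig(cfg),
	sparkv1.WithNatsClient(nc),
	sparkv1.WithObjectStore(store),
	sparkv1.WithStageRetryOverride(&sparkv1.RetryConfig{
		Times:             3,
		FirstBackoffWait:  time.Second,
		BackoffMultiplier: 2,
	}),
)
if err != nil {
	// handle construction failure
}
wf.Run(msg) // msg is a previously received jetstream.Msg (hypothetical)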

