integration

package
v0.12.0 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Sep 13, 2022 License: MPL-2.0 Imports: 10 Imported by: 16

Documentation

Overview

Package integration exposes types and utilities to create an integration following Temporal Land conventions.

Index

Constants

View Source
const SignalLockAcquired string = "lock-acquired"

SignalLockAcquired is the signal name sent when lock is acquired.

Note: It's exposed as a string and not a custom string type so it's easier to use in Signal-related functions when Temporal requires a signal name.

View Source
const SignalLockReleased string = "lock-released"

SignalLockReleased is the signal name sent when lock is released.

Note: It's exposed as a string and not a custom string type so it's easier to use in Signal-related functions when Temporal requires a signal name.

View Source
const SignalLockRequested string = "lock-requested"

SignalLockRequested is the signal name sent to request a lock.

Note: It's exposed as a string and not a custom string type so it's easier to use in Signal-related functions when Temporal requires a signal name.

Variables

This section is empty.

Functions

This section is empty.

Types

type Config

type Config struct {

	// Name is the name of the integration. If none, it shall defaults to the
	// integration's name.
	//
	// Example: "warehouse"
	Name string `json:"name"`

	// AllowPoliciesOverride indicates if the integration allows its policies to be
	// overridden by clients in the input when executing workflows and activities.
	AllowPoliciesOverride bool `json:"allow_policies_override"`

	// OnWorkflowError is the function called from a workflow of the integration in
	// case the said workflow has encountered an error. If the error comes from an
	// activity, OnActivityError will be called first. Most of the time, you would
	// want to execute an activity inside this function to notify of a workflow
	// execution error based on the number of attempts made or based on timeouts
	// reached.
	//
	// Example:
	//
	//   func(ctx workflow.Context, err error) {
	//     info := workflow.GetInfo(ctx)
	//
	//     if ((info.Attempt * 100) / info.RetryPolicy.MaximumAttempts) >= 80 {
	//       _ = workflow.ExecuteActivity(ctx, "activity.ToExecute", someInput).Get(ctx, nil)
	//     }
	//   }
	OnWorkflowError func(workflow.Context, error) `json:"-"`

	// OnActivityError is the function called from an activity of the integration
	// in case the said activity has encountered an error.
	OnActivityError func(context.Context, error) `json:"-"`
}

Config is the common configuration all integrations should embed in their own Config.

func ToConfig

func ToConfig(input map[string]any) Config

ToConfig transforms a map to integration's Config.

func (*Config) Validate added in v0.12.0

func (config *Config) Validate() error

Validate

type Integration

type Integration interface {

	// String returns the name of the integration.
	//
	// Example: "warehouse"
	String() string

	// Init tries to initialize the integration. This must be called after
	// registering the specifications of the integration and before starting the
	// Temporal worker.
	Init() error

	// Close tries to properly close the integration and its specifications.
	Close() error

	// IsReady indicates if the integration and the registered specifications are
	// ready to be used within the worker. Basically, the integration must be
	// initialized and must not be closed.
	IsReady() bool

	// Config returns the common integration's Config.
	Config() Config

	// ConfigMap returns a map of the integration's Config and its specifications.
	ConfigMap() map[string]any

	// ListWorkflows returns the workflows registered by the integration and its
	// specifications in the Temporal worker.
	ListWorkflows() []string

	// ListActivities returns the activities registered by the integration and its
	// specifications in the Temporal worker.
	ListActivities() []string
}

Integration is the common interface all integrations follow.

type Mutex

type Mutex struct {

	// SignalID is the unique identifier of the signal sent to acquire the lock.
	// It is a UUID.
	//
	// Required.
	SignalID string `json:"signal_id,omitempty"`

	// WorkflowID is the unique workflow identifier used by the integration executing
	// the mutex workflow.
	//
	// Required.
	WorkflowID string `json:"workflow_id,omitempty"`

	// RunID is the unqique workflow run identifier from the the integration executing
	// the mutex workflow. It is a UUID.
	//
	// Required.
	RunID string `json:"run_id,omitempty"`

	// Timeout is the duration the parent workflow executing the mutex is willing
	// to wait before workflow is completed from end-to-end. If the timeout is
	// exceeded, the mutex workflow is timedout as well.
	//
	// Required.
	Timeout time.Duration `json:"timeout,omitempty"`
}

Mutex holds the information regarding the mutex in use.

type With

type With func(*Wrapper) error

With allows an integration to attach optional features to the integration's Wrapper. For example, if an integration might need to access the Temporal client you might use:

func Wrap(myintegration, WithClient(client))

func WithClient

func WithClient(c client.Client) With

WithClient must be passed to New to attach the Temporal client from the overlying integration:

func Wrap(myintegration, WithClient(client))

This attachment is required to leverage the remote mutex when executing SQL operations. The mutex uses Temporal signals, and therefore a client is needed to "Signal and Start" the mutex workflow.

TODO(mutex): Is there a way to have access to the Temporal client without asking it in the Wrapper (and by consequence in the SQL-like Config)?

type Wrapper

type Wrapper struct {

	// Integration represents the integration to wrap.
	Integration
	// contains filtered or unexported fields
}

Wrapper allows to wrap an integration so it can leverage common features such as the mutex workflow.

func Wrap

func Wrap(from Integration, attachments ...With) *Wrapper

Wrap wraps an integration and attach some attachments, such as a Temporal client:

func Wrap(myintegration, WithClient(client))

func (*Wrapper) ActivityMutex

func (inte *Wrapper) ActivityMutex(ctx context.Context, input Mutex) (Mutex, error)

ActivityMutex is the activity in charge of (starting and) signaling the mutex workflow. It should be called inside a workflow that needs a mutex to lock/unlock resources for an integration within a namespace.

func (*Wrapper) WorkflowMutex

func (inte *Wrapper) WorkflowMutex(ctx workflow.Context, input Mutex) (Mutex, error)

WorkflowMutex is a Temporal workflow in charge of locking/unlock a particular resource within a particular namespace so that other workflows within the same namespace would wait until a resource lock is released. This is useful when we want to avoid race conditions or parallel mutually exclusive operations on the same resource.

As an example, this workflow is used by the Operations workflow of the sqlike specification, allowing SQL operations of a same integration to lock the mutex to avoid running multiple workflows at a same time. It first calls ActivityMutex to send a signal, then wait for acquired and released signals to process the workflow.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL