store

package
v0.18.0
Published: Jun 18, 2021 License: Apache-2.0 Imports: 3 Imported by: 7

Documentation

Overview

Package store provides the development kit for working with a database-as-a-queue, so the jobs queue can be persisted in a datastore.

At any point in time, some destinations may be in a state of failure. By using a store in tandem with the scheduler, Blacksmith applications benefit from a reliable system for delivering jobs at scale and for keeping track of successes, failures, and discards.

Constants

This section is empty.

Variables

var Defaults = &Options{
	PurgePolicies: []*PurgePolicy{},
}

Defaults are the default options set for the store. When an option is not set, the corresponding value is applied automatically.
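
As a minimal sketch of how such a fallback could work, the example below uses trimmed local stand-ins for Options and PurgePolicy. The helper withDefaults is hypothetical and is not part of this package; the real adapter may apply the defaults differently.

```go
package main

import "fmt"

// PurgePolicy and Options are trimmed local stand-ins for the types
// documented in this package. Only the fields needed here are kept.
type PurgePolicy struct {
	Interval string
}

type Options struct {
	Connection    string
	PurgePolicies []*PurgePolicy
}

// Defaults mirrors the package-level variable of the same name.
var Defaults = &Options{
	PurgePolicies: []*PurgePolicy{},
}

// withDefaults is a hypothetical helper (an assumption for this example).
// It returns a copy of opts where unset values are taken from Defaults.
func withDefaults(opts *Options) *Options {
	if opts == nil {
		return &Options{PurgePolicies: Defaults.PurgePolicies}
	}
	out := *opts
	if out.PurgePolicies == nil {
		out.PurgePolicies = Defaults.PurgePolicies
	}
	return &out
}

func main() {
	opts := withDefaults(&Options{Connection: "postgres://localhost/blacksmith"})
	fmt.Println(opts.PurgePolicies != nil, len(opts.PurgePolicies))
}
```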

var InterfaceStore = "store"

InterfaceStore is the string representation for the store interface.

var StatusAcknowledged = "acknowledged"

StatusAcknowledged is used to mark a job status as acknowledged. This is used when receiving new jobs from the gateway into the store.

var StatusAwaiting = "awaiting"

StatusAwaiting is used to mark a job status as awaiting. This is used when a job is waiting to be run.

var StatusDiscarded = "discarded"

StatusDiscarded is used to mark a job status as discarded. This is used when a job has reached the maximum number of retries, so it will not be run again.

var StatusExecuting = "executing"

StatusExecuting is used to mark a job status as executing. This is used when a job is being executed.

var StatusFailed = "failed"

StatusFailed is used to mark a job status as failed.

var StatusSucceeded = "succeeded"

StatusSucceeded is used to mark a job status as succeeded.

var StatusUnknown = "unknown"

StatusUnknown is used to mark a job status as unknown. This is used when the scheduler is not aware of a job's status. This only happens when an action does not return the job ID(s) in the "Then" channel. There is no way for the scheduler to associate the job ID(s) with the error or the actions to execute, so it can only mark the status as unknown.
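
The relation between the failed and discarded statuses can be sketched as follows. This is only an illustration: statusAfterFailure and the maxAttempts parameter are hypothetical, and the scheduler's actual retry policy is not described on this page.

```go
package main

import "fmt"

// Local copies of two of the status values documented above.
var (
	StatusFailed    = "failed"
	StatusDiscarded = "discarded"
)

// statusAfterFailure is a hypothetical helper, not part of the package.
// It assumes attempts are counted from 1 and that maxAttempts is the
// ceiling after which a job is no longer retried.
func statusAfterFailure(attempt, maxAttempts uint16) string {
	if attempt >= maxAttempts {
		return StatusDiscarded
	}
	return StatusFailed
}

func main() {
	fmt.Println(statusAfterFailure(1, 3)) // failed
	fmt.Println(statusAfterFailure(3, 3)) // discarded
}
```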

Functions

This section is empty.

Types

type Driver added in v0.17.0

type Driver string

Driver is a custom type allowing the user to only pass supported drivers when configuring the store adapter.

var DriverPostgreSQL Driver = "postgres"

DriverPostgreSQL is used to leverage PostgreSQL as the store adapter.

type Event

type Event struct {

	// ID is the unique identifier of the event. It must be a valid KSUID.
	//
	// Example: "1UYc8EebLqCAFMOSkbYZdJwNLAJ"
	ID string `json:"id"`

	// Source is the string representation of the event's source.
	Source string `json:"source"`

	// Trigger is the string representation of the event's trigger.
	Trigger string `json:"trigger"`

	// Version is the version number of the source used by the event when triggered.
	//
	// Examples: "v1.0", "2020-10-01"
	Version string `json:"version,omitempty"`

	// Context is the marshaled representation of the event's metadata.
	Context []byte `json:"context"`

	// Data is the marshaled representation of the event's data.
	Data []byte `json:"data"`

	// Jobs is a list of jobs to execute related to the event.
	Jobs []*Job `json:"jobs"`

	// SentAt is the timestamp of when the event is originally sent by the source.
	// It can be nil if none was provided.
	SentAt *time.Time `json:"sent_at,omitempty"`

	// ReceivedAt is the timestamp of when the event is received by the gateway.
	// This shall always be overridden by the gateway.
	ReceivedAt time.Time `json:"received_at"`

	// IngestedAt is a timestamp of the event creation date into the store.
	// This shall always be overridden by the store.
	IngestedAt *time.Time `json:"ingested_at,omitempty"`

	// ParentEventID is the ID of the parent event.
	//
	// Example: "1UYc8EebLqCAFMOSkbYZdJwNLAJ"
	ParentEventID *string `json:"parent_event_id,omitempty"`
}

Event defines the fields stored in the datastore about an event.

type Job

type Job struct {

	// ID is the unique identifier of the job. It must be a valid KSUID.
	//
	// Example: "1UYc8EebLqCAFMOSkbYZdJwNLAJ"
	ID string `json:"id"`

	// Destination is the string representation of the destination the job needs to
	// run against.
	Destination string `json:"destination"`

	// Action is the string representation of the action to execute against the
	// destination.
	Action string `json:"action"`

	// Version is the version number of the destination used by a flow when executed.
	//
	// Examples: "v1.0", "2020-10-01"
	Version string `json:"version,omitempty"`

	// Context is the marshaled representation of the job's metadata.
	Context []byte `json:"context"`

	// Data is the marshaled representation of the job's data.
	Data []byte `json:"data"`

	// Transitions is an array of the job's transitions. It is used to keep track of
	// successes, failures, and errors so the store is aware of the job's status.
	//
	// Note: It is up to the adapter to only return the latest job's transition since
	// this is the only one that really matters in this context.
	Transitions [1]*Transition `json:"transitions"`

	// CreatedAt is a timestamp of the job creation date into the store.
	CreatedAt time.Time `json:"created_at"`

	// EventID is the ID of the event related to this job.
	EventID string `json:"event_id,omitempty"`

	// ParentJobID is the ID of the parent job.
	ParentJobID *string `json:"parent_job_id,omitempty"`
}

Job is the definition of a job that needs to run for a given action against a specific destination.
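
Because Transitions is a fixed-size array holding a single pointer, reading a job's current status comes down to checking index 0. A minimal sketch with trimmed local stand-ins follows; currentStatus is a hypothetical helper, not part of this package.

```go
package main

import "fmt"

// Transition and Job are trimmed local stand-ins for the documented types.
type Transition struct {
	Attempt    uint16
	StateAfter string
}

type Job struct {
	ID          string
	Transitions [1]*Transition
}

// currentStatus is a hypothetical helper. It reports the state of the
// latest transition, or false when the adapter returned none.
func currentStatus(j *Job) (string, bool) {
	if j == nil || j.Transitions[0] == nil {
		return "", false
	}
	return j.Transitions[0].StateAfter, true
}

func main() {
	job := &Job{ID: "1UYc8EebLqCAFMOSkbYZdJwNLAJ"}
	job.Transitions[0] = &Transition{Attempt: 1, StateAfter: "executing"}

	status, ok := currentStatus(job)
	fmt.Println(status, ok) // executing true
}
```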

type Meta added in v0.11.0

type Meta struct {

	// Count is the number of entries found that match the constraints applied to
	// the query (without the limit).
	Count uint16 `json:"count"`

	// Pagination is the pagination details based on the count, offset, and limit.
	Pagination *rest.Pagination `json:"pagination"`

	// Where is the set of constraints applied to the query to find events, jobs, or
	// transitions. This is included in the meta because the store can set defaults
	// or override some constraints (such as a maximum limit). This lets the caller
	// know which constraints were actually applied to the query.
	Where *WhereEvents `json:"where"`
}

Meta includes information about the query's result returned by the store when looking for entries (events, jobs, or transitions).
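
The reason Where is echoed back in Meta can be illustrated with a store that clamps the requested limit. This is a sketch under stated assumptions: defaultLimit, maxLimit, and the applied helper are hypothetical values and names chosen for the example, and the types are trimmed local stand-ins.

```go
package main

import "fmt"

// WhereEvents is a trimmed local stand-in keeping only pagination fields.
type WhereEvents struct {
	Offset uint16 `json:"offset"`
	Limit  uint16 `json:"limit"`
}

// Meta is a trimmed local stand-in; Pagination is omitted here.
type Meta struct {
	Count uint16       `json:"count"`
	Where *WhereEvents `json:"where"`
}

// Hypothetical limits, chosen for this example only.
const (
	defaultLimit uint16 = 100
	maxLimit     uint16 = 1000
)

// applied returns the constraints a store would actually use after
// setting a default limit and enforcing a maximum one.
func applied(where *WhereEvents) *WhereEvents {
	out := WhereEvents{}
	if where != nil {
		out = *where
	}
	switch {
	case out.Limit == 0:
		out.Limit = defaultLimit
	case out.Limit > maxLimit:
		out.Limit = maxLimit
	}
	return &out
}

func main() {
	requested := &WhereEvents{Offset: 200, Limit: 5000}
	meta := &Meta{Count: 1234, Where: applied(requested)}
	fmt.Println(requested.Limit, meta.Where.Limit) // 5000 1000
}
```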

type Options

type Options struct {

	// From is used to set the desired driver for the store adapter.
	From Driver `json:"from,omitempty"`

	// Connection is the connection string to connect to the store.
	Connection string `json:"-"`

	// PurgePolicies allows defining several intervals at which entries are purged
	// from the store, given advanced constraints.
	PurgePolicies []*PurgePolicy `json:"purge"`
}

Options holds the options a user can pass to use the store adapter.
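
A short sketch of configuring the options with the PostgreSQL driver, using local stand-ins that copy the documented struct tags. The connection string and the encode helper are hypothetical. The example also shows the effect of the `json:"-"` tag: the connection string is left out when the options are encoded.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// Driver, PurgePolicy, and Options are local stand-ins mirroring the
// documented types. PurgePolicy is trimmed to a single field.
type Driver string

var DriverPostgreSQL Driver = "postgres"

type PurgePolicy struct {
	Interval string `json:"interval"`
}

type Options struct {
	From          Driver         `json:"from,omitempty"`
	Connection    string         `json:"-"`
	PurgePolicies []*PurgePolicy `json:"purge"`
}

// encode is a hypothetical helper returning the JSON form of the options.
func encode(opts *Options) (string, error) {
	raw, err := json.Marshal(opts)
	if err != nil {
		return "", err
	}
	return string(raw), nil
}

func main() {
	opts := &Options{
		From:          DriverPostgreSQL,
		Connection:    "postgres://user:secret@localhost:5432/blacksmith", // example value
		PurgePolicies: []*PurgePolicy{},
	}

	out, err := encode(opts)
	if err != nil {
		panic(err)
	}
	fmt.Println(out) // {"from":"postgres","purge":[]}
}
```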

type PurgePolicy added in v0.15.0

type PurgePolicy struct {

	// WhereEvents defines the constraints used to retrieve the entries to purge
	// from the store.
	//
	// Note: Offset, limit, and pagination will not be applied.
	WhereEvents *WhereEvents `json:"where"`

	// Interval represents an interval or a CRON string at which events, jobs, and
	// transitions shall be purged from the store.
	Interval string `json:"interval"`
}

PurgePolicy is a policy to purge the store. It runs at the given interval and applies to the entries matching the given constraints.

type Queue

type Queue struct {

	// Events is the collection of events including their related jobs and jobs'
	// status.
	Events []*Event `json:"events"`
}

Queue keeps track of events, their jobs, and their jobs' transitions.

type Store

type Store interface {

	// String returns the string representation of the adapter.
	//
	// Example: "postgres"
	String() string

	// Options returns the options originally passed to the Options struct. This
	// can be used to validate and override user's options if necessary.
	Options() *Options

	// AddEvents inserts a queue of events into the datastore given the data passed
	// in params. It returns an error if any occurred.
	AddEvents(*Toolkit, []*Event) error

	// FindEvent returns an event given the event ID passed in params.
	FindEvent(*Toolkit, string) (*Event, error)

	// FindEvents returns a list of events matching the constraints passed in params.
	// It also returns meta information about the query, such as pagination and the
	// constraints actually applied to it.
	FindEvents(*Toolkit, *WhereEvents) ([]*Event, *Meta, error)

	// AddJobs inserts a list of jobs into the datastore.
	AddJobs(*Toolkit, []*Job) error

	// FindJob returns a job given the job ID passed in params.
	FindJob(*Toolkit, string) (*Job, error)

	// FindJobs returns a list of jobs matching the constraints passed in params.
	// It also returns meta information about the query, such as pagination and the
	// constraints actually applied to it.
	FindJobs(*Toolkit, *WhereEvents) ([]*Job, *Meta, error)

	// AddTransitions inserts a list of transitions into the datastore to update
	// their related job status. We insert new transitions instead of updating the
	// job itself to keep track of the job's history.
	AddTransitions(*Toolkit, []*Transition) error

	// FindTransition returns a transition given the transition ID passed in params.
	FindTransition(*Toolkit, string) (*Transition, error)

	// FindTransitions returns a list of transitions matching the constraints passed
	// in params. It also returns meta information about the query, such as pagination
	// and the constraints actually applied to it.
	FindTransitions(*Toolkit, *WhereEvents) ([]*Transition, *Meta, error)

	// Purge purges every event, job, and transition matching the constraints from
	// the store. It is run for each purge policy defined in the store's options, at
	// the defined intervals.
	Purge(*Toolkit, *WhereEvents) error
}

Store is the interface used to persist the jobs queue in a datastore to keep track of job states.
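
To make the append-only handling of transitions concrete, here is a partial in-memory sketch covering only the job-related methods. It does not satisfy the full interface: memoryStore is hypothetical, Toolkit, Job, and Transition are trimmed local stand-ins, and the short IDs are placeholders rather than valid KSUIDs.

```go
package main

import (
	"errors"
	"fmt"
	"sync"
)

// Toolkit, Transition, and Job are trimmed local stand-ins.
type Toolkit struct{}

type Transition struct {
	ID         string
	Attempt    uint16
	StateAfter string
	JobID      string
}

type Job struct {
	ID          string
	Destination string
	Action      string
	Transitions [1]*Transition
}

// memoryStore is a hypothetical adapter keeping everything in memory.
type memoryStore struct {
	mu          sync.Mutex
	jobs        map[string]*Job
	transitions map[string][]*Transition // keyed by job ID, oldest first
}

func newMemoryStore() *memoryStore {
	return &memoryStore{
		jobs:        map[string]*Job{},
		transitions: map[string][]*Transition{},
	}
}

// AddJobs registers the jobs by ID.
func (s *memoryStore) AddJobs(tk *Toolkit, jobs []*Job) error {
	s.mu.Lock()
	defer s.mu.Unlock()
	for _, j := range jobs {
		s.jobs[j.ID] = j
	}
	return nil
}

// AddTransitions appends to each job's history instead of updating the job.
func (s *memoryStore) AddTransitions(tk *Toolkit, ts []*Transition) error {
	s.mu.Lock()
	defer s.mu.Unlock()
	for _, t := range ts {
		if _, ok := s.jobs[t.JobID]; !ok {
			return errors.New("unknown job: " + t.JobID)
		}
		s.transitions[t.JobID] = append(s.transitions[t.JobID], t)
	}
	return nil
}

// FindJob returns a copy of the job carrying only its latest transition.
func (s *memoryStore) FindJob(tk *Toolkit, id string) (*Job, error) {
	s.mu.Lock()
	defer s.mu.Unlock()
	j, ok := s.jobs[id]
	if !ok {
		return nil, errors.New("job not found: " + id)
	}
	out := *j
	if history := s.transitions[id]; len(history) > 0 {
		out.Transitions[0] = history[len(history)-1]
	}
	return &out, nil
}

func main() {
	s, tk := newMemoryStore(), &Toolkit{}
	_ = s.AddJobs(tk, []*Job{{ID: "job-1", Destination: "warehouse", Action: "insert"}})
	_ = s.AddTransitions(tk, []*Transition{
		{ID: "t-1", Attempt: 1, StateAfter: "failed", JobID: "job-1"},
		{ID: "t-2", Attempt: 2, StateAfter: "succeeded", JobID: "job-1"},
	})

	job, err := s.FindJob(tk, "job-1")
	if err != nil {
		panic(err)
	}
	fmt.Println(job.Transitions[0].StateAfter, len(s.transitions["job-1"])) // succeeded 2
}
```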

type Toolkit

type Toolkit struct {

	// Logger gives access to the logrus Logger passed in options when creating the
	// Blacksmith application.
	Logger *logrus.Logger
}

Toolkit contains a suite of utilities and data to help the adapter successfully run the store functions.

type Transition

type Transition struct {

	// ID is the unique identifier of the transition. It must be a valid KSUID.
	//
	// Example: "1UYc8EebLqCAFMOSkbYZdJwNLAJ"
	ID string `json:"id"`

	// Attempt represents the number of attempts the job has made before
	// succeeding.
	Attempt uint16 `json:"attempt"`

	// StateBefore is the state of the job before running the new transition. This
	// shall be nil when receiving the job from the gateway.
	StateBefore *string `json:"state_before"`

	// StateAfter is the state of the job after running the new transition.
	StateAfter string `json:"state_after"`

	// Error keeps track of the error encountered, if any.
	Error error `json:"error"`

	// CreatedAt is a timestamp of the transition creation date into the store.
	CreatedAt time.Time `json:"created_at"`

	// EventID is the ID of the event related to this job's transition.
	EventID string `json:"event_id,omitempty"`

	// JobID is the ID of the job related to this transition.
	JobID string `json:"job_id,omitempty"`
}

Transition represents a job's transition to keep track of its states.

type WhereEvents added in v0.11.0

type WhereEvents struct {

	// SourcesIn makes sure the entries returned by the query have one of the
	// source names present in the slice.
	SourcesIn []string `json:"events.sources_in,omitempty"`

	// SourcesNotIn makes sure the entries returned by the query do not have any
	// of the source names present in the slice.
	SourcesNotIn []string `json:"events.sources_notin,omitempty"`

	// TriggersIn makes sure the entries returned by the query have one of the
	// source's trigger names present in the slice.
	TriggersIn []string `json:"events.triggers_in,omitempty"`

	// TriggersNotIn makes sure the entries returned by the query do not have any
	// of the source's trigger names present in the slice.
	TriggersNotIn []string `json:"events.triggers_notin,omitempty"`

	// VersionsIn makes sure the entries returned by the query have one of the
	// source's versions present in the slice.
	VersionsIn []string `json:"events.versions_in,omitempty"`

	// VersionsNotIn makes sure the entries returned by the query do not have any
	// of the source's versions present in the slice.
	VersionsNotIn []string `json:"events.versions_notin,omitempty"`

	// ReceivedBefore makes sure the entries returned by the query are related to
	// an event received before this instant.
	ReceivedBefore *time.Time `json:"events.received_before,omitempty"`

	// ReceivedAfter makes sure the entries returned by the query are related to
	// an event received after this instant.
	ReceivedAfter *time.Time `json:"events.received_after,omitempty"`

	// AndWhereJobs lets you define additional constraints related to the jobs for
	// the entries you are looking for.
	AndWhereJobs *WhereJobs `json:"jobs,omitempty"`

	// Offset specifies the number of entries to skip before starting to return entries
	// from the query.
	Offset uint16 `json:"offset"`

	// Limit specifies the number of entries to return after the offset clause has
	// been processed.
	Limit uint16 `json:"limit"`
}

WhereEvents is used to set constraints on the events when looking for entries in the store.

type WhereJobs added in v0.11.0

type WhereJobs struct {

	// EventID finds every entry related to a specific event ID.
	//
	// Note: When set, other constraints are not applied (except parent offset and
	// limit).
	EventID string `json:"event.id,omitempty"`

	// DestinationsIn makes sure the entries returned by the query have one of the
	// destination names present in the slice.
	DestinationsIn []string `json:"jobs.destinations_in,omitempty"`

	// DestinationsNotIn makes sure the entries returned by the query do not have any
	// of the destination names present in the slice.
	DestinationsNotIn []string `json:"jobs.destinations_notin,omitempty"`

	// ActionsIn makes sure the entries returned by the query have one of the
	// destination's action names present in the slice.
	ActionsIn []string `json:"jobs.actions_in,omitempty"`

	// ActionsNotIn makes sure the entries returned by the query do not have any of
	// the destination's action names present in the slice.
	ActionsNotIn []string `json:"jobs.actions_notin,omitempty"`

	// VersionsIn makes sure the entries returned by the query have one of the
	// destination's versions present in the slice.
	VersionsIn []string `json:"jobs.versions_in,omitempty"`

	// VersionsNotIn makes sure the entries returned by the query do not have any of
	// the destination's versions present in the slice.
	VersionsNotIn []string `json:"jobs.versions_notin,omitempty"`

	// CreatedBefore makes sure the entries returned by the query are related to a
	// job created before this instant.
	CreatedBefore *time.Time `json:"jobs.created_before,omitempty"`

	// CreatedAfter makes sure the entries returned by the query are related to a
	// job created after this instant.
	CreatedAfter *time.Time `json:"jobs.created_after,omitempty"`

	// AndWhereTransitions lets you define additional constraints related to the
	// transitions for the entries you are looking for.
	AndWhereTransitions *WhereTransitions `json:"transitions,omitempty"`
}

WhereJobs is used to set constraints on jobs when looking for entries in the store.

type WhereTransitions added in v0.11.0

type WhereTransitions struct {

	// JobID finds every entry related to a specific job ID.
	//
	// Note: When set, other constraints are not applied (except parent offset and
	// limit).
	JobID string `json:"job.id,omitempty"`

	// StatusIn makes sure the entries returned by the query have one of the
	// statuses present in the slice.
	StatusIn []string `json:"jobs.status_in,omitempty"`

	// StatusNotIn makes sure the entries returned by the query do not have any of
	// the statuses present in the slice.
	StatusNotIn []string `json:"jobs.status_notin,omitempty"`

	// MinAttempts makes sure the entries returned by the query have at least this
	// number of attempts.
	MinAttempts uint16 `json:"jobs.min_attempts,omitempty"`

	// MaxAttempts makes sure the entries returned by the query have at most this
	// number of attempts.
	MaxAttempts uint16 `json:"jobs.max_attempts,omitempty"`
}

WhereTransitions is used to set constraints on transitions when looking for entries in the store.
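
For adapter authors, the semantics of these constraints can be sketched as an in-memory predicate. The matches helper is hypothetical and the types are trimmed local stand-ins. Treating an empty slice or a zero attempt bound as "not set" is an assumption, suggested by the omitempty tags but not stated on this page.

```go
package main

import "fmt"

// Transition and WhereTransitions are trimmed local stand-ins.
type Transition struct {
	Attempt    uint16
	StateAfter string
}

type WhereTransitions struct {
	StatusIn    []string
	StatusNotIn []string
	MinAttempts uint16
	MaxAttempts uint16
}

// contains reports whether s is present in list.
func contains(list []string, s string) bool {
	for _, v := range list {
		if v == s {
			return true
		}
	}
	return false
}

// matches is a hypothetical predicate. Empty slices and zero bounds are
// treated as unset, which is an assumption made for this example.
func matches(w *WhereTransitions, t *Transition) bool {
	if w == nil {
		return true
	}
	if len(w.StatusIn) > 0 && !contains(w.StatusIn, t.StateAfter) {
		return false
	}
	if contains(w.StatusNotIn, t.StateAfter) {
		return false
	}
	if w.MinAttempts > 0 && t.Attempt < w.MinAttempts {
		return false
	}
	if w.MaxAttempts > 0 && t.Attempt > w.MaxAttempts {
		return false
	}
	return true
}

func main() {
	where := &WhereTransitions{StatusIn: []string{"failed", "discarded"}, MinAttempts: 3}

	fmt.Println(matches(where, &Transition{Attempt: 4, StateAfter: "failed"}))    // true
	fmt.Println(matches(where, &Transition{Attempt: 1, StateAfter: "failed"}))    // false
	fmt.Println(matches(where, &Transition{Attempt: 4, StateAfter: "succeeded"})) // false
}
```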
