work

package module
v4.0.0-beta.6 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Oct 15, 2023 License: Apache-2.0 Imports: 10 Imported by: 0

README

work

A compact library for tracking and committing atomic changes to your entities.

GoDoc Build Status Coverage Status Release License Blog

Demo

make demo (requires docker-compose).

How to use it?

Construction

Starting with entities Foo and Bar,

// entities.
f, b := Foo{}, Bar{}

// type names.
ft, bt := unit.TypeNameOf(f), unit.TypeNameOf(b)

// data mappers.
m := map[unit.TypeName]unit.DataMapper { ft: fdm, bt: fdm }

// 🎉
opts = []unit.Option{ unit.DB(db), unit.DataMappers(m) }
unit, err := unit.New(opts...)
Adding

When creating new entities, use Add:

additions := []interface{}{ f, b }
err := u.Add(additions...)
Updating

When modifying existing entities, use Alter:

updates := []interface{}{ f, b }
err := u.Alter(updates...)
Removing

When removing existing entities, use Remove:

removals := []interface{}{ f, b }
err := u.Remove(removals...)
Registering

When retrieving existing entities, track their intial state using Register:

fetched := []interface{}{ f, b }
err := u.Register(fetched...)
Saving

When you are ready to commit your work unit, use Save:

ctx := context.Background()
err := u.Save(ctx)
Logging

We use zap as our logging library of choice. To leverage the logs emitted from the work units, utilize the unit.Logger option with an instance of *zap.Logger upon creation:

// create logger.
l, _ := zap.NewDevelopment()

opts = []unit.Option{
	unit.DB(db),
	unit.DataMappers(m),
	unit.Logger(l), // 🎉
}
u, err := unit.New(opts...)
Metrics

For emitting metrics, we use tally. To utilize the metrics emitted from the work units, leverage the unit.Scope option with a tally.Scope upon creation. Assuming we have a scope s, it would look like so:

opts = []unit.Option{
	unit.DB(db),
	unit.DataMappers(m),
	unit.Scope(s), // 🎉
}
u, err := unit.New(opts...)
Emitted Metrics

Name Type Description
[PREFIX.]unit.save.success counter The number of successful work unit saves.
[PREFIX.]unit.save timer The time duration when saving a work unit.
[PREFIX.]unit.rollback.success counter The number of successful work unit rollbacks.
[PREFIX.]unit.rollback.failure counter The number of unsuccessful work unit rollbacks.
[PREFIX.]unit.rollback timer The time duration when rolling back a work unit.
[PREFIX.]unit.retry.attempt counter The number of retry attempts.
[PREFIX.]unit.insert counter The number of successful inserts performed.
[PREFIX.]unit.update counter The number of successful updates performed.
[PREFIX.]unit.delete counter The number of successful deletes performed.
[PREFIX.]unit.cache.insert counter The number of registered entities inserted into the cache.
[PREFIX.]unit.cache.delete counter The number of registered entities removed from the cache.
Uniters

In most circumstances, an application has many aspects that result in the creation of a work unit. To tackle that challenge, we recommend using unit.Uniter to create instances of unit., like so:

opts = []unit.Option{
	unit.DB(db),
	unit.DataMappers(m),
	unit.Logger(l),
}
uniter := unit.NewUniter(opts...)

// create the unit.
u, err := uniter.Unit()

Frequently Asked Questions (FAQ)

Are batch data mapper operations supported?

In short, yes.

A work unit can accommodate an arbitrary number of entity types. When creating the work unit, you indicate the data mappers that it should use when persisting the desired state. These data mappers are organized by entity type. As such, batching occurs for each operation and entity type pair.

For example, assume we have a single work unit and have performed a myriad of unit operations for entities with either a type of Foo or Bar. All inserts for entities of type Foo will be passed to the corresponding data mapper in one shot via the Insert method. This essentially then relinquishes control to you, the author of the data mapper, to handle all of those entities to be inserted in however you see fit. You could choose to insert them all into a relational database using a single INSERT query, or perhaps issue an HTTP request to an API to create all of those entities. However, inserts for entities of type Bar will be batched separately. In fact, it's most likely the data mapper to handle inserts for Foo and Bar are completely different types (and maybe even completely different data stores).

The same applies for other operations such as updates and deletions. All supported data mapper operations follow this paradigm.

Documentation

Index

Constants

View Source
const (
	// UnitActionTypeAfterRegister indicates an action type that occurs after an entity is registered.
	UnitActionTypeAfterRegister = iota
	// UnitActionTypeAfterAdd indicates an action type that occurs after an entity is added.
	UnitActionTypeAfterAdd
	// UnitActionTypeAfterAlter indicates an action type that occurs after an entity is altered.
	UnitActionTypeAfterAlter
	// UnitActionTypeAfterRemove indicates an action type that occurs after an entity is removed.
	UnitActionTypeAfterRemove
	// UnitActionTypeAfterInserts indicates an action type that occurs after new entities are inserted in the data store.
	UnitActionTypeAfterInserts
	// UnitActionTypeAfterUpdates indicates an action type that occurs after existing entities are updated in the data store.
	UnitActionTypeAfterUpdates
	// UnitActionTypeAfterDeletes indicates an action type that occurs after existing entities are deleted in the data store.
	UnitActionTypeAfterDeletes
	// UnitActionTypeAfterRollback indicates an action type that occurs after rollback.
	UnitActionTypeAfterRollback
	// UnitActionTypeAfterSave indicates an action type that occurs after save.
	UnitActionTypeAfterSave
	// UnitActionTypeBeforeRegister indicates an action type that occurs before an entity is registered.
	UnitActionTypeBeforeRegister
	// UnitActionTypeBeforeAdd indicates an action type that occurs before an entity is added.
	UnitActionTypeBeforeAdd
	// UnitActionTypeBeforeAlter indicates an action type that occurs before an entity is altered.
	UnitActionTypeBeforeAlter
	// UnitActionTypeBeforeRemove indicates an action type that occurs before an entity is removed.
	UnitActionTypeBeforeRemove
	// UnitActionTypeBeforeInserts indicates an action type that occurs before new entities are inserted in the data store.
	UnitActionTypeBeforeInserts
	// UnitActionTypeBeforeUpdates indicates an action type that occurs before existing entities are updated in the data store.
	UnitActionTypeBeforeUpdates
	// UnitActionTypeBeforeDeletes indicates an action type that occurs before existing entities are deleted in the data store.
	UnitActionTypeBeforeDeletes
	// UnitActionTypeBeforeRollback indicates an action type that occurs before rollback.
	UnitActionTypeBeforeRollback
	// UnitActionTypeBeforeSave indicates an action type that occurs before save.
	UnitActionTypeBeforeSave
)

The various types of actions that are executed throughout the lifecycle of a work unit.

View Source
const (
	// Fixed represents a retry type that maintains a constaint delay between retry iterations.
	UnitRetryDelayTypeFixed = iota
	// BackOff represents a retry type that increases delay between retry iterations.
	UnitRetryDelayTypeBackOff
	// Random represents a retry type that utilizes a random delay between retry iterations.
	UnitRetryDelayTypeRandom
)

Variables

View Source
var (

	// ErrMissingDataMapper represents the error that is returned
	// when attempting to add, alter, remove, or register an entity
	// that doesn't have a corresponding data mapper.
	ErrMissingDataMapper = errors.New("missing data mapper or data mapper function for entity")

	// ErrNoDataMapper represents the error that occurs when attempting
	// to create a work unit without any data mappers.
	ErrNoDataMapper = errors.New("must have at least one data mapper or data mapper function")
)
View Source
var (
	// UnitDB specifies the option to provide the database for the work unit.
	UnitDB = func(db *sql.DB) UnitOption {
		return func(o *UnitOptions) {
			o.db = db
		}
	}

	// UnitDataMappers specifies the option to provide the data mappers for
	// the work unit.
	UnitDataMappers = func(dm map[TypeName]UnitDataMapper) UnitOption {
		return func(o *UnitOptions) {
			if dm == nil || len(dm) == 0 {
				return
			}
			if o.insertFuncs == nil {
				o.insertFuncs = make(map[TypeName]UnitDataMapperFunc)
			}
			if o.updateFuncs == nil {
				o.updateFuncs = make(map[TypeName]UnitDataMapperFunc)
			}
			if o.deleteFuncs == nil {
				o.deleteFuncs = make(map[TypeName]UnitDataMapperFunc)
			}
			for typeName, dataMapper := range dm {
				o.insertFuncs[typeName] = dataMapper.Insert
				o.insertFuncsLen = o.insertFuncsLen + 1
				o.updateFuncs[typeName] = dataMapper.Update
				o.updateFuncsLen = o.updateFuncsLen + 1
				o.deleteFuncs[typeName] = dataMapper.Delete
				o.deleteFuncsLen = o.deleteFuncsLen + 1
			}
		}
	}

	// UnitZapLogger specifies the option to provide a zap logger for the
	// work unit.
	UnitZapLogger = func(l *zap.Logger) UnitOption {
		return func(o *UnitOptions) {
			o.logger = l
		}
	}

	// UnitTallyMetricScope specifies the option to provide a tally metric
	// scope for the work unit.
	UnitTallyMetricScope = func(s tally.Scope) UnitOption {
		return func(o *UnitOptions) {
			o.scope = s
		}
	}

	// UnitAfterRegisterActions specifies the option to provide actions to execute
	// after entities are registered with the work unit.
	UnitAfterRegisterActions = func(a ...UnitAction) UnitOption {
		return setActions(UnitActionTypeAfterRegister, a...)
	}

	// UnitAfterAddActions specifies the option to provide actions to execute
	// after entities are added with the work unit.
	UnitAfterAddActions = func(a ...UnitAction) UnitOption {
		return setActions(UnitActionTypeAfterAdd, a...)
	}

	// UnitAfterAlterActions specifies the option to provide actions to execute
	// after entities are altered with the work unit.
	UnitAfterAlterActions = func(a ...UnitAction) UnitOption {
		return setActions(UnitActionTypeAfterAlter, a...)
	}

	// UnitAfterRemoveActions specifies the option to provide actions to execute
	// after entities are removed with the work unit.
	UnitAfterRemoveActions = func(a ...UnitAction) UnitOption {
		return setActions(UnitActionTypeAfterRemove, a...)
	}

	// UnitAfterInsertsActions specifies the option to provide actions to execute
	// after new entities are inserted in the data store.
	UnitAfterInsertsActions = func(a ...UnitAction) UnitOption {
		return setActions(UnitActionTypeAfterInserts, a...)
	}

	// UnitAfterUpdatesActions specifies the option to provide actions to execute
	// after altered entities are updated in the data store.
	UnitAfterUpdatesActions = func(a ...UnitAction) UnitOption {
		return setActions(UnitActionTypeAfterUpdates, a...)
	}

	// UnitAfterDeletesActions specifies the option to provide actions to execute
	// after removed entities are deleted in the data store.
	UnitAfterDeletesActions = func(a ...UnitAction) UnitOption {
		return setActions(UnitActionTypeAfterDeletes, a...)
	}

	// UnitAfterRollbackActions specifies the option to provide actions to execute
	// after a rollback is performed.
	UnitAfterRollbackActions = func(a ...UnitAction) UnitOption {
		return setActions(UnitActionTypeAfterRollback, a...)
	}

	// UnitAfterSaveActions specifies the option to provide actions to execute
	// after a save is performed.
	UnitAfterSaveActions = func(a ...UnitAction) UnitOption {
		return setActions(UnitActionTypeAfterSave, a...)
	}

	// UnitBeforeInsertsActions specifies the option to provide actions to execute
	// before new entities are inserted in the data store.
	UnitBeforeInsertsActions = func(a ...UnitAction) UnitOption {
		return setActions(UnitActionTypeBeforeInserts, a...)
	}

	// UnitBeforeUpdatesActions specifies the option to provide actions to execute
	// before altered entities are updated in the data store.
	UnitBeforeUpdatesActions = func(a ...UnitAction) UnitOption {
		return setActions(UnitActionTypeBeforeUpdates, a...)
	}

	// UnitBeforeDeletesActions specifies the option to provide actions to execute
	// before removed entities are deleted in the data store.
	UnitBeforeDeletesActions = func(a ...UnitAction) UnitOption {
		return setActions(UnitActionTypeBeforeDeletes, a...)
	}

	// UnitBeforeRollbackActions specifies the option to provide actions to execute
	// before a rollback is performed.
	UnitBeforeRollbackActions = func(a ...UnitAction) UnitOption {
		return setActions(UnitActionTypeBeforeRollback, a...)
	}

	// UnitBeforeSaveActions specifies the option to provide actions to execute
	// before a save is performed.
	UnitBeforeSaveActions = func(a ...UnitAction) UnitOption {
		return setActions(UnitActionTypeBeforeSave, a...)
	}

	// UnitDefaultLoggingActions specifies all of the default logging actions.
	UnitDefaultLoggingActions = func() UnitOption {
		beforeInsertLogAction := func(ctx UnitActionContext) {
			ctx.Logger.Debug(
				"attempting to insert entities",
				zap.Int("count", ctx.AdditionCount),
			)
		}
		afterInsertLogAction := func(ctx UnitActionContext) {
			ctx.Logger.Debug(
				"successfully inserted entities",
				zap.Int("count", ctx.AdditionCount),
			)
		}
		beforeUpdateLogAction := func(ctx UnitActionContext) {
			ctx.Logger.Debug(
				"attempting to update entities",
				zap.Int("count", ctx.AlterationCount),
			)
		}
		afterUpdateLogAction := func(ctx UnitActionContext) {
			ctx.Logger.Debug(
				"successfully updated entities",
				zap.Int("count", ctx.AlterationCount),
			)
		}
		beforeDeleteLogAction := func(ctx UnitActionContext) {
			ctx.Logger.Debug(
				"attempting to delete entities",
				zap.Int("count", ctx.RemovalCount),
			)
		}
		afterDeleteLogAction := func(ctx UnitActionContext) {
			ctx.Logger.Debug(
				"successfully deleted entities",
				zap.Int("count", ctx.RemovalCount),
			)
		}
		beforeSaveLogAction := func(ctx UnitActionContext) {
			ctx.Logger.Debug("attempting to save unit")
		}
		afterSaveLogAction := func(ctx UnitActionContext) {
			totalCount :=
				ctx.AdditionCount + ctx.AlterationCount + ctx.RemovalCount
			ctx.Logger.Info("successfully saved unit",
				zap.Int("insertCount", ctx.AdditionCount),
				zap.Int("updateCount", ctx.AlterationCount),
				zap.Int("deleteCount", ctx.RemovalCount),
				zap.Int("registerCount", ctx.RegisterCount),
				zap.Int("totalUpdateCount", totalCount))
		}
		beforeRollbackLogAction := func(ctx UnitActionContext) {
			ctx.Logger.Debug("attempting to roll back unit")
		}
		afterRollbackLogAction := func(ctx UnitActionContext) {
			ctx.Logger.Info("successfully rolled back unit")
		}
		return func(o *UnitOptions) {
			subOpts := []UnitOption{
				setActions(UnitActionTypeBeforeInserts, beforeInsertLogAction),
				setActions(UnitActionTypeAfterInserts, afterInsertLogAction),
				setActions(UnitActionTypeBeforeUpdates, beforeUpdateLogAction),
				setActions(UnitActionTypeAfterUpdates, afterUpdateLogAction),
				setActions(UnitActionTypeBeforeDeletes, beforeDeleteLogAction),
				setActions(UnitActionTypeAfterDeletes, afterDeleteLogAction),
				setActions(UnitActionTypeBeforeSave, beforeSaveLogAction),
				setActions(UnitActionTypeAfterSave, afterSaveLogAction),
				setActions(UnitActionTypeBeforeRollback, beforeRollbackLogAction),
				setActions(UnitActionTypeAfterRollback, afterRollbackLogAction),
			}
			for _, opt := range subOpts {
				opt(o)
			}
		}
	}

	// DisableDefaultLoggingActions disables the default logging actions.
	DisableDefaultLoggingActions = func() UnitOption {
		return func(o *UnitOptions) {
			o.disableDefaultLoggingActions = true
		}
	}

	// UnitRetryAttempts defines the number of retry attempts to perform.
	UnitRetryAttempts = func(attempts int) UnitOption {
		if attempts < 0 {
			attempts = 0
		}
		return func(o *UnitOptions) {
			o.retryAttempts = attempts
		}
	}

	// UnitRetryDelay defines the delay to utilize during retries.
	UnitRetryDelay = func(delay time.Duration) UnitOption {
		return func(o *UnitOptions) {
			o.retryDelay = delay
		}
	}

	// UnitRetryMaximumJitter defines the maximum jitter to utilize during
	// retries that utilize random delay times.
	UnitRetryMaximumJitter = func(jitter time.Duration) UnitOption {
		return func(o *UnitOptions) {
			o.retryMaximumJitter = jitter
		}
	}

	// UnitRetryType defines the type of retry to perform.
	UnitRetryType = func(retryType UnitRetryDelayType) UnitOption {
		return func(o *UnitOptions) {
			o.retryType = retryType
		}
	}

	// UnitInsertFunc defines the function to be used for inserting new
	// entities in the underlying data store.
	UnitInsertFunc = func(t TypeName, insertFunc UnitDataMapperFunc) UnitOption {
		return func(o *UnitOptions) {
			if o.insertFuncs == nil {
				o.insertFuncs = make(map[TypeName]UnitDataMapperFunc)
			}
			o.insertFuncs[t] = insertFunc
			o.insertFuncsLen = o.insertFuncsLen + 1
		}
	}

	// UnitUpdateFunc defines the function to be used for updating existing
	// entities in the underlying data store.
	UnitUpdateFunc = func(t TypeName, updateFunc UnitDataMapperFunc) UnitOption {
		return func(o *UnitOptions) {
			if o.updateFuncs == nil {
				o.updateFuncs = make(map[TypeName]UnitDataMapperFunc)
			}
			o.updateFuncs[t] = updateFunc
			o.updateFuncsLen = o.updateFuncsLen + 1
		}
	}

	// UnitDeleteFunc defines the function to be used for deleting existing
	// entities in the underlying data store.
	UnitDeleteFunc = func(t TypeName, deleteFunc UnitDataMapperFunc) UnitOption {
		return func(o *UnitOptions) {
			if o.deleteFuncs == nil {
				o.deleteFuncs = make(map[TypeName]UnitDataMapperFunc)
			}
			o.deleteFuncs[t] = deleteFunc
			o.deleteFuncsLen = o.deleteFuncsLen + 1
		}
	}

	// UnitWithCacheClient defines the cache client to be used.
	UnitWithCacheClient = func(cc UnitCacheClient) UnitOption {
		return func(o *UnitOptions) {
			o.cacheClient = cc
		}
	}
)
View Source
var (
	// ErrUncachableEntity represents the error that is returned when an attempt
	// to cache an entity with an unresolvable ID occurs.
	ErrUncachableEntity = errors.New("unable to cache entity - does not implement supported interfaces")
)

Functions

This section is empty.

Types

type TypeName

type TypeName string

TypeName represents an entity's type.

func TypeNameOf

func TypeNameOf(entity interface{}) TypeName

TypeNameOf provides the type name for the provided entity.

func (TypeName) String

func (t TypeName) String() string

String provides the string representation of the type name.

type Unit

type Unit interface {

	// Register tracks the provided entities as clean.
	Register(context.Context, ...interface{}) error

	// Cached provides the entities that have been previously registered
	// and have not been acted on via Add, Alter, or Remove.
	Cached() *UnitCache

	// Add marks the provided entities as new additions.
	Add(context.Context, ...interface{}) error

	// Alter marks the provided entities as modifications.
	Alter(context.Context, ...interface{}) error

	// Remove marks the provided entities as removals.
	Remove(context.Context, ...interface{}) error

	// Save commits the new additions, modifications, and removals
	// within the work unit to a persistent store.
	Save(context.Context) error
}

Unit represents an atomic set of entity changes.

func NewUnit

func NewUnit(opts ...UnitOption) (Unit, error)

type UnitAction

type UnitAction func(UnitActionContext)

Action represents an operation performed during a paticular lifecycle event of a work unit.

type UnitActionContext

type UnitActionContext struct {
	// Logger is the work units configured logger.
	Logger *zap.Logger
	// Scope is the work units configured metrics scope.
	Scope tally.Scope
	// AdditionCount represents the number of entities indicated as new.
	AdditionCount int
	// AlterationCount represents the number of entities indicated as modified.
	AlterationCount int
	// RemovalCount represents the number of entities indicated as removed.
	RemovalCount int
	// RegisterCount represents the number of entities indicated as registered.
	RegisterCount int
}

UnitActionContext represents the executional context for an action.

type UnitActionType

type UnitActionType int

UnitActionType represents the type of work unit action.

type UnitCache

type UnitCache struct {
	// contains filtered or unexported fields
}

UnitCache represents the cache that the work unit manipulates as a result of entity registration.

func (*UnitCache) Load

func (uc *UnitCache) Load(ctx context.Context, t TypeName, id interface{}) (entity interface{}, err error)

Load retrieves the entity with the provided type name and ID from the work unit cache.

type UnitCacheClient

type UnitCacheClient interface {
	Get(context.Context, string) (interface{}, error)
	Set(context.Context, string, interface{}) error
	Delete(context.Context, string) error
}

UnitCacheClient represents a client for a cache provider.

type UnitDataMapper

type UnitDataMapper interface {
	Insert(context.Context, UnitMapperContext, ...interface{}) error
	Update(context.Context, UnitMapperContext, ...interface{}) error
	Delete(context.Context, UnitMapperContext, ...interface{}) error
}

DataMapper represents a creator, modifier, and deleter of entities.

type UnitDataMapperFunc

type UnitDataMapperFunc func(context.Context, UnitMapperContext, ...interface{}) error

UnitDataMapperFunc represents a data mapper function that performs a single operation, such as insert, update, or delete.

type UnitMapperContext

type UnitMapperContext struct {
	// Tx is the open transaction leveraged for SQL-related data mapping
	// operations. This transaction will be nil unless the work.UnitDB option
	// is used.
	Tx *sql.Tx
}

UnitMapperContext represents the additional context provided to data mappers and data mapper functions to help facilitate the mapping process.

type UnitOption

type UnitOption func(*UnitOptions)

UnitOption applies an option to the provided configuration.

type UnitOptions

type UnitOptions struct {
	// contains filtered or unexported fields
}

UnitOptions represents the configuration options for the work unit.

type UnitRetryDelayType

type UnitRetryDelayType int

UnitRetryDelayType represents the type of retry delay to perform.

type Uniter

type Uniter interface {

	//Unit constructs a new work unit.
	Unit() (Unit, error)
}

Uniter represents a factory for work units.

func NewUniter

func NewUniter(options ...UnitOption) Uniter

NewUniter creates a new uniter with the provided unit options.

Directories

Path Synopsis
internal
mock
Package mock is a generated GoMock package.
Package mock is a generated GoMock package.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL