backend

package
v0.0.0-...-9a5c20e Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: May 2, 2024 License: BSD-3-Clause Imports: 13 Imported by: 26

README

backend

import "github.com/blueprint-uservices/blueprint/runtime/core/backend"

Package backend provides the interfaces for common backends like caches, queues, databases, etc. that are often used by application workflow specs.

Workflow services can, and should, make use of the interfaces defined in this package.

To use a backend, an application's workflow should require this module and import the interfaces from this package. Service constructors should receive the backend interface as an argument, e.g.

func NewMyService(ctx context.Context, db backend.NoSQLDB) (MyService, error) {...}

Index

func CopyResult

func CopyResult(src any, dst any) error

Lots of APIs want to copy results into interfaces. This is a helper method to do so.

src can be anything; dst must be a pointer to the same type as src

func GetPointerValue

func GetPointerValue(val any) (any, error)

func GetSpanContext

func GetSpanContext(encoded_string string) (trace.SpanContextConfig, error)

Utility function to convert an encoded string into a Span Context

func Meter

func Meter(ctx context.Context, name string, opts ...metric.MeterOption) (metric.Meter, error)

Meter returns a new metric.Meter with a provided name and configuration

A Meter should be scoped at most to a single package. We recommend a meter being scoped to a single service. The name needs to be unique so it does not collide with other names used by an application, nor other applications.

If the name is empty, then an implementation defined default name will be used instead.

func SetDefaultLogger

func SetDefaultLogger(l Logger)

Set's the default logger to be used by the Blueprint application. NOTE: This should not be called in the workflow code. This is called from the various logger plugins.

func SetDefaultMetricCollector

func SetDefaultMetricCollector(m MetricCollector)

Sets the default metric collector to be used by BLueprint applications. This should be called from the constructor of a Metric Collector

func SetZero

func SetZero(dst any) error

Sets the zero value of a pointer

type Cache

Represents a key-value cache.

type Cache interface {
    // Store a key-value pair in the cache
    Put(ctx context.Context, key string, value interface{}) error

    // Retrieves a value from the cache.
    // val should be a pointer in which the value will be stored, e.g.
    //
    //   var value interface{}
    //   cache.Get(ctx, "key", &value)
    //
    // Reports whether the key existed in the cache
    Get(ctx context.Context, key string, val interface{}) (bool, error)

    // Store multiple key-value pairs in the cache.
    // keys and values must have the same length or an error will be returned
    Mset(ctx context.Context, keys []string, values []interface{}) error

    // Retrieve the values for multiple keys from the cache.
    // keys and values must have the same length or an error will be returned
    // values is an array of pointers to which the values will be stored, e.g.
    //
    //   var a string
    //   var b int64
    //   cache.Mget(ctx, []string{"a", "b"}, []any{&a, &b})
    Mget(ctx context.Context, keys []string, values []interface{}) error

    // Delete from the cache
    Delete(ctx context.Context, key string) error

    // Treats the value mapped to key as an integer, and increments it
    Incr(ctx context.Context, key string) (int64, error)
}

type LogOptions

type LogOptions struct {
    Level Priority
}

type Logger

Represents a logger that can be used by the logger plugin

type Logger interface {
    // Logf creates a new log record at `INFO` level with `fmt.Sprintf(format, args...)` as the log message. Same interface as fmt.Printf or log.Printf.
    // Returns a context that may-be updated by the logger with some logger specific state. If no state is set, then the passed-in context is returned as is.
    Logf(ctx context.Context, opts LogOptions, format string, args ...any) (context.Context, error)
    // Debug creates a new log record at `DEBUG` level with `fmt.Sprintf(format, args...)` as the log message. Convenience wrapper around Logf
    Debug(ctx context.Context, format string, args ...any) (context.Context, error)
    // Info creates a new log record at `INFO` level with `fmt.Sprintf(format, args...)` as the log message.
    Info(ctx context.Context, format string, args ...any) (context.Context, error)
    // Warn creates a new log record at `WARN` level with `fmt.Sprintf(format, args...)` as the log message.
    Warn(ctx context.Context, format string, args ...any) (context.Context, error)
    // Error creates a new log record at `ERROR` level with `fmt.Sprintf(format, args...)` as the log message.
    Error(ctx context.Context, format string, args ...any) (context.Context, error)
}

func GetLogger
func GetLogger() Logger

Returns the default logger

type MetricCollector

Represents a metric collector that can be used by the metric/opentelemetry plugin

type MetricCollector interface {
    // Returns a go.opentelemetry.io/otel/metric/MeterProvider
    GetMetricProvider(ctx context.Context) (metric.MeterProvider, error)
}

type NoSQLCollection

type NoSQLCollection interface {
    // Deletes the first document that matches filter
    //
    // We use the same filter semantics as mongodb
    // https://www.mongodb.com/docs/manual/tutorial/query-documents/
    DeleteOne(ctx context.Context, filter bson.D) error

    // Deletes all documents that match filter
    //
    // We use the same filter semantics as mongodb
    // https://www.mongodb.com/docs/manual/tutorial/query-documents/
    DeleteMany(ctx context.Context, filter bson.D) error

    // Inserts the document into the collection.
    InsertOne(ctx context.Context, document interface{}) error

    // Inserts all provided documents into the collection
    InsertMany(ctx context.Context, documents []interface{}) error

    // Finds a document that matches filter.
    //
    // We use the same filter semantics as mongodb
    // https://www.mongodb.com/docs/manual/tutorial/query-documents/
    //
    // Projections are optional and behave with mongodb semantics.
    FindOne(ctx context.Context, filter bson.D, projection ...bson.D) (NoSQLCursor, error)

    // Finds all documents that match the filter.
    //
    // We use the same filter semantics as mongodb
    // https://www.mongodb.com/docs/manual/tutorial/query-documents/
    //
    // Projections are optional and behave with mongodb semantics.
    FindMany(ctx context.Context, filter bson.D, projection ...bson.D) (NoSQLCursor, error) // Result is not a slice -> it is an object we can use to retrieve documents using res.All().

    // Applies the provided update to the first document that matches filter
    //
    // We use the same filter semantics as mongodb
    // https://www.mongodb.com/docs/manual/tutorial/query-documents/
    //
    // We use the same update operators as mongodb
    // https://www.mongodb.com/docs/manual/reference/method/db.collection.update/
    //
    // Returns the number of updated documents (0 or 1)
    UpdateOne(ctx context.Context, filter bson.D, update bson.D) (int, error)

    // Applies the provided update to all documents that match the filter
    //
    // We use the same filter semantics as mongodb
    // https://www.mongodb.com/docs/manual/tutorial/query-documents/
    //
    // We use the same update operators as mongodb
    // https://www.mongodb.com/docs/manual/reference/method/db.collection.update/
    //
    // Returns the number of updated documents (>= 0)
    UpdateMany(ctx context.Context, filter bson.D, update bson.D) (int, error)

    // Attempts to find a document in the collection that matches the filter.
    // If a match is found, replaces the existing document with the provided document.
    // If a match is not found, document is inserted into the collection.
    // Returns true if an existing document was updated; false otherwise
    Upsert(ctx context.Context, filter bson.D, document interface{}) (bool, error)

    // Attempts to match a document in the collection with "_id" = id.
    // If a match is found, replaces the existing document with the provided document.
    // If a match is not found, document is inserted into the collection.
    // Returns true if an existing document was updated; false otherwise
    //
    // This method requires that document has an "_id" field in its BSON representation.
    // If document is a golang struct, the standard way to do this is to tag a field as follows:
    //     ID   primitive.ObjectID `bson:"_id"`
    UpsertID(ctx context.Context, id primitive.ObjectID, document interface{}) (bool, error)

    // Replaces the first document that matches filter with the replacement document.
    //
    // We use the same filter semantics as mongodb
    // https://www.mongodb.com/docs/manual/tutorial/query-documents/
    //
    // Returns the number of replaced documents (0 or 1)
    ReplaceOne(ctx context.Context, filter bson.D, replacement interface{}) (int, error)

    // Replaces all documents that match filter with the replacement documents.
    //
    // We use the same filter semantics as mongodb
    // https://www.mongodb.com/docs/manual/tutorial/query-documents/
    //
    // Returns the number of replaced documents.
    ReplaceMany(ctx context.Context, filter bson.D, replacements ...interface{}) (int, error)
}

type NoSQLCursor

type NoSQLCursor interface {
    // Copies one result into the target pointer.
    // If there are no results, returns false; otherwise returns true.
    // Returns an error if obj is not a compatible type.
    One(ctx context.Context, obj interface{}) (bool, error)

    // Copies all results into the target pointer.
    // obj must be a pointer to a slice type.
    // Returns the number of results copied.
    // Returns an error if obj is not a compatible type.
    All(ctx context.Context, obj interface{}) error //similar logic to Decode, but for multiple documents
}

type NoSQLDatabase

type NoSQLDatabase interface {
    /*
    	A NoSQLDatabse implementation might distinguish between databases and collections,
    	or might not have those concepts.
    */
    GetCollection(ctx context.Context, db_name string, collection_name string) (NoSQLCollection, error)
}

type Priority

The Priority Level at which the message will be recorded

type Priority int

const (
    DEBUG Priority = iota
    INFO
    WARN
    ERROR
)

func (Priority) String
func (p Priority) String() string

String representation for Priority enum

type Queue

A Queue backend is used for pushing and popping elements.

type Queue interface {

    // Pushes an item to the tail of the queue.
    //
    // This call will block until the item is successfully pushed, or until the context
    // is cancelled.
    //
    // Reports whether the item was pushed to the queue, or if an error was encountered.
    // A context cancellation/timeout is not considered an error.
    Push(ctx context.Context, item interface{}) (bool, error)

    // Pops an item from the front of the queue.
    //
    // This call will block until an item is successfully popped, or until the context
    // is cancelled.
    //
    // dst must be a pointer type that can receive the item popped from the queue.
    //
    // Reports whether the item was pushed to the queue, or if an error was encountered.
    // A context cancellation/timeout is not considered an error.
    Pop(ctx context.Context, dst interface{}) (bool, error)
}

type RelationalDB

A Relational database backend is used for storing and querying structured data using SQL queries.

SQL is relatively standardized in golang under the database/sql interfaces. Blueprint's RelationalDB interface exposes the github.com/jmoiron/sqlx interfaces, which are more convenient for casual usage and help in marshalling structs into rows and back.

type RelationalDB interface {
    // Exec executes a query without returning any rows. The args are for any placeholder parameters in the query.
    //
    // Returns a [sql.Result] object from the [database/sql] package.
    Exec(ctx context.Context, query string, args ...any) (sql.Result, error)

    // Query executes a query that returns rows, typically a SELECT. The args are for any placeholder parameters in the query.
    //
    // Returns a [sql.Rows] object from the [database/sql] package, that can be used to access query results.
    // Rows' cursor starts before the first row of the result set. Use Next to advance from row to row.
    Query(ctx context.Context, query string, args ...any) (*sql.Rows, error)

    // Prepare creates a prepared statement for later queries or executions. Multiple queries or executions may
    // be run concurrently from the returned statement. The caller must call the statement's Close method
    // when the statement is no longer needed.
    //
    // Returns a [sql.Stmt] object from the [database/sql] package, that can be used to execute the prepared
    // statement.
    Prepare(ctx context.Context, query string) (*sql.Stmt, error)

    // Select using this DB. Any placeholder parameters are replaced with supplied args.
    //
    // Uses [github.com/jmoiron/sqlx] to marshal query results into dst.
    Select(ctx context.Context, dst interface{}, query string, args ...any) error

    // Get using this DB. Any placeholder parameters are replaced with supplied args. An error is returned if the result set is empty.
    //
    // Uses [github.com/jmoiron/sqlx] to marshal query results into dst.
    Get(ctx context.Context, dst interface{}, query string, args ...any) error
}

type Tracer

Represents a tracer that can be used by the tracer/opentelemetry plugin

type Tracer interface {
    // Returns a go.opentelemetry.io/otel/trace.TracerProvider
    // TracerProvider provides Tracers that are used by instrumentation code to trace computational workflows.
    GetTracerProvider(ctx context.Context) (trace.TracerProvider, error)
}

Generated by gomarkdoc

Documentation

Overview

Package backend provides the interfaces for common backends like caches, queues, databases, etc. that are often used by application workflow specs.

Workflow services can, and should, make use of the interfaces defined in this package.

To use a backend, an application's workflow should require this module and import the interfaces from this package. Service constructors should receive the backend interface as an argument, e.g.

func NewMyService(ctx context.Context, db backend.NoSQLDB) (MyService, error) {...}

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

func CopyResult

func CopyResult(src any, dst any) error

Lots of APIs want to copy results into interfaces. This is a helper method to do so.

src can be anything; dst must be a pointer to the same type as src

func GetPointerValue

func GetPointerValue(val any) (any, error)

func GetSpanContext

func GetSpanContext(encoded_string string) (trace.SpanContextConfig, error)

Utility function to convert an encoded string into a Span Context

func Meter

func Meter(ctx context.Context, name string, opts ...metric.MeterOption) (metric.Meter, error)

Meter returns a new metric.Meter with a provided name and configuration

A Meter should be scoped at most to a single package. We recommend a meter being scoped to a single service. The name needs to be unique so it does not collide with other names used by an application, nor other applications.

If the name is empty, then an implementation defined default name will be used instead.

func SetDefaultLogger

func SetDefaultLogger(l Logger)

Set's the default logger to be used by the Blueprint application. NOTE: This should not be called in the workflow code. This is called from the various logger plugins.

func SetDefaultMetricCollector

func SetDefaultMetricCollector(m MetricCollector)

Sets the default metric collector to be used by BLueprint applications. This should be called from the constructor of a Metric Collector

func SetZero

func SetZero(dst any) error

Sets the zero value of a pointer

Types

type Cache

type Cache interface {
	// Store a key-value pair in the cache
	Put(ctx context.Context, key string, value interface{}) error

	// Retrieves a value from the cache.
	// val should be a pointer in which the value will be stored, e.g.
	//
	//   var value interface{}
	//   cache.Get(ctx, "key", &value)
	//
	// Reports whether the key existed in the cache
	Get(ctx context.Context, key string, val interface{}) (bool, error)

	// Store multiple key-value pairs in the cache.
	// keys and values must have the same length or an error will be returned
	Mset(ctx context.Context, keys []string, values []interface{}) error

	// Retrieve the values for multiple keys from the cache.
	// keys and values must have the same length or an error will be returned
	// values is an array of pointers to which the values will be stored, e.g.
	//
	//   var a string
	//   var b int64
	//   cache.Mget(ctx, []string{"a", "b"}, []any{&a, &b})
	Mget(ctx context.Context, keys []string, values []interface{}) error

	// Delete from the cache
	Delete(ctx context.Context, key string) error

	// Treats the value mapped to key as an integer, and increments it
	Incr(ctx context.Context, key string) (int64, error)
}

Represents a key-value cache.

type LogOptions

type LogOptions struct {
	Level Priority
}

type Logger

type Logger interface {
	// Logf creates a new log record at `INFO` level with `fmt.Sprintf(format, args...)` as the log message. Same interface as fmt.Printf or log.Printf.
	// Returns a context that may-be updated by the logger with some logger specific state. If no state is set, then the passed-in context is returned as is.
	Logf(ctx context.Context, opts LogOptions, format string, args ...any) (context.Context, error)
	// Debug creates a new log record at `DEBUG` level with `fmt.Sprintf(format, args...)` as the log message. Convenience wrapper around Logf
	Debug(ctx context.Context, format string, args ...any) (context.Context, error)
	// Info creates a new log record at `INFO` level with `fmt.Sprintf(format, args...)` as the log message.
	Info(ctx context.Context, format string, args ...any) (context.Context, error)
	// Warn creates a new log record at `WARN` level with `fmt.Sprintf(format, args...)` as the log message.
	Warn(ctx context.Context, format string, args ...any) (context.Context, error)
	// Error creates a new log record at `ERROR` level with `fmt.Sprintf(format, args...)` as the log message.
	Error(ctx context.Context, format string, args ...any) (context.Context, error)
}

Represents a logger that can be used by the logger plugin

func GetLogger

func GetLogger() Logger

Returns the default logger

type MetricCollector

type MetricCollector interface {
	// Returns a go.opentelemetry.io/otel/metric/MeterProvider
	GetMetricProvider(ctx context.Context) (metric.MeterProvider, error)
}

Represents a metric collector that can be used by the metric/opentelemetry plugin

type NoSQLCollection

type NoSQLCollection interface {
	// Deletes the first document that matches filter
	//
	// We use the same filter semantics as mongodb
	// https://www.mongodb.com/docs/manual/tutorial/query-documents/
	DeleteOne(ctx context.Context, filter bson.D) error

	// Deletes all documents that match filter
	//
	// We use the same filter semantics as mongodb
	// https://www.mongodb.com/docs/manual/tutorial/query-documents/
	DeleteMany(ctx context.Context, filter bson.D) error

	// Inserts the document into the collection.
	InsertOne(ctx context.Context, document interface{}) error

	// Inserts all provided documents into the collection
	InsertMany(ctx context.Context, documents []interface{}) error

	// Finds a document that matches filter.
	//
	// We use the same filter semantics as mongodb
	// https://www.mongodb.com/docs/manual/tutorial/query-documents/
	//
	// Projections are optional and behave with mongodb semantics.
	FindOne(ctx context.Context, filter bson.D, projection ...bson.D) (NoSQLCursor, error)

	// Finds all documents that match the filter.
	//
	// We use the same filter semantics as mongodb
	// https://www.mongodb.com/docs/manual/tutorial/query-documents/
	//
	// Projections are optional and behave with mongodb semantics.
	FindMany(ctx context.Context, filter bson.D, projection ...bson.D) (NoSQLCursor, error) // Result is not a slice -> it is an object we can use to retrieve documents using res.All().

	// Applies the provided update to the first document that matches filter
	//
	// We use the same filter semantics as mongodb
	// https://www.mongodb.com/docs/manual/tutorial/query-documents/
	//
	// We use the same update operators as mongodb
	// https://www.mongodb.com/docs/manual/reference/method/db.collection.update/
	//
	// Returns the number of updated documents (0 or 1)
	UpdateOne(ctx context.Context, filter bson.D, update bson.D) (int, error)

	// Applies the provided update to all documents that match the filter
	//
	// We use the same filter semantics as mongodb
	// https://www.mongodb.com/docs/manual/tutorial/query-documents/
	//
	// We use the same update operators as mongodb
	// https://www.mongodb.com/docs/manual/reference/method/db.collection.update/
	//
	// Returns the number of updated documents (>= 0)
	UpdateMany(ctx context.Context, filter bson.D, update bson.D) (int, error)

	// Attempts to find a document in the collection that matches the filter.
	// If a match is found, replaces the existing document with the provided document.
	// If a match is not found, document is inserted into the collection.
	// Returns true if an existing document was updated; false otherwise
	Upsert(ctx context.Context, filter bson.D, document interface{}) (bool, error)

	// Attempts to match a document in the collection with "_id" = id.
	// If a match is found, replaces the existing document with the provided document.
	// If a match is not found, document is inserted into the collection.
	// Returns true if an existing document was updated; false otherwise
	//
	// This method requires that document has an "_id" field in its BSON representation.
	// If document is a golang struct, the standard way to do this is to tag a field as follows:
	//     ID   primitive.ObjectID `bson:"_id"`
	UpsertID(ctx context.Context, id primitive.ObjectID, document interface{}) (bool, error)

	// Replaces the first document that matches filter with the replacement document.
	//
	// We use the same filter semantics as mongodb
	// https://www.mongodb.com/docs/manual/tutorial/query-documents/
	//
	// Returns the number of replaced documents (0 or 1)
	ReplaceOne(ctx context.Context, filter bson.D, replacement interface{}) (int, error)

	// Replaces all documents that match filter with the replacement documents.
	//
	// We use the same filter semantics as mongodb
	// https://www.mongodb.com/docs/manual/tutorial/query-documents/
	//
	// Returns the number of replaced documents.
	ReplaceMany(ctx context.Context, filter bson.D, replacements ...interface{}) (int, error)
}

type NoSQLCursor

type NoSQLCursor interface {
	// Copies one result into the target pointer.
	// If there are no results, returns false; otherwise returns true.
	// Returns an error if obj is not a compatible type.
	One(ctx context.Context, obj interface{}) (bool, error)

	// Copies all results into the target pointer.
	// obj must be a pointer to a slice type.
	// Returns the number of results copied.
	// Returns an error if obj is not a compatible type.
	All(ctx context.Context, obj interface{}) error //similar logic to Decode, but for multiple documents
}

type NoSQLDatabase

type NoSQLDatabase interface {
	/*
		A NoSQLDatabse implementation might distinguish between databases and collections,
		or might not have those concepts.
	*/
	GetCollection(ctx context.Context, db_name string, collection_name string) (NoSQLCollection, error)
}

type Priority

type Priority int

The Priority Level at which the message will be recorded

const (
	DEBUG Priority = iota
	INFO
	WARN
	ERROR
)

func (Priority) String

func (p Priority) String() string

String representation for Priority enum

type Queue

type Queue interface {

	// Pushes an item to the tail of the queue.
	//
	// This call will block until the item is successfully pushed, or until the context
	// is cancelled.
	//
	// Reports whether the item was pushed to the queue, or if an error was encountered.
	// A context cancellation/timeout is not considered an error.
	Push(ctx context.Context, item interface{}) (bool, error)

	// Pops an item from the front of the queue.
	//
	// This call will block until an item is successfully popped, or until the context
	// is cancelled.
	//
	// dst must be a pointer type that can receive the item popped from the queue.
	//
	// Reports whether the item was pushed to the queue, or if an error was encountered.
	// A context cancellation/timeout is not considered an error.
	Pop(ctx context.Context, dst interface{}) (bool, error)
}

A Queue backend is used for pushing and popping elements.

type RelationalDB

type RelationalDB interface {
	// Exec executes a query without returning any rows. The args are for any placeholder parameters in the query.
	//
	// Returns a [sql.Result] object from the [database/sql] package.
	Exec(ctx context.Context, query string, args ...any) (sql.Result, error)

	// Query executes a query that returns rows, typically a SELECT. The args are for any placeholder parameters in the query.
	//
	// Returns a [sql.Rows] object from the [database/sql] package, that can be used to access query results.
	// Rows' cursor starts before the first row of the result set. Use Next to advance from row to row.
	Query(ctx context.Context, query string, args ...any) (*sql.Rows, error)

	// Prepare creates a prepared statement for later queries or executions. Multiple queries or executions may
	// be run concurrently from the returned statement. The caller must call the statement's Close method
	// when the statement is no longer needed.
	//
	// Returns a [sql.Stmt] object from the [database/sql] package, that can be used to execute the prepared
	// statement.
	Prepare(ctx context.Context, query string) (*sql.Stmt, error)

	// Select using this DB. Any placeholder parameters are replaced with supplied args.
	//
	// Uses [github.com/jmoiron/sqlx] to marshal query results into dst.
	Select(ctx context.Context, dst interface{}, query string, args ...any) error

	// Get using this DB. Any placeholder parameters are replaced with supplied args. An error is returned if the result set is empty.
	//
	// Uses [github.com/jmoiron/sqlx] to marshal query results into dst.
	Get(ctx context.Context, dst interface{}, query string, args ...any) error
}

A Relational database backend is used for storing and querying structured data using SQL queries.

SQL is relatively standardized in golang under the database/sql interfaces. Blueprint's RelationalDB interface exposes the github.com/jmoiron/sqlx interfaces, which are more convenient for casual usage and help in marshalling structs into rows and back.

type Tracer

type Tracer interface {
	// Returns a go.opentelemetry.io/otel/trace.TracerProvider
	// TracerProvider provides Tracers that are used by instrumentation code to trace computational workflows.
	GetTracerProvider(ctx context.Context) (trace.TracerProvider, error)
}

Represents a tracer that can be used by the tracer/opentelemetry plugin

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL