engine

package
v0.0.0-...-abd831d Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jul 4, 2018 License: AGPL-3.0 Imports: 36 Imported by: 0

Documentation

Index

Constants

View Source
const (
	DefaultBufferSize   = 100
	DestinationWildcard = ""
)
View Source
const ConsoleDestinationName = "CONSOLE"
View Source
const DefaultExcelDateFormat = time.RFC3339
View Source
const DefaultRowsPerBatch = 500
View Source
const InsertQuery = `INSERT INTO %s (%s) VALUES (%s)`
View Source
const (
	ParameterTableName = "PARAMETERS"
)
View Source
const TxManagerMaxRetries = 32

Variables

View Source
var (
	ErrExcelTooManyWildcards     = errors.New("the Excel source/destination range can have at most one wildcard")
	ErrExcelCannotIncludeColumns = errors.New("the Excel source range cannot be dynamic in X if it includes columns")
	ErrColumnsNotSpecified       = errors.New("the Excel range should either include columns or they should be specified in the COLUMNS option")
)
View Source
var ErrEOS = errors.New("end of stream")
View Source
var ErrInterrupted = errors.New("The execution was interrupted by a context cancellation")
View Source
var ErrTransactionManagerFinished = errors.New("transaction manager is in a committed or rolled back state and can no longer provide new transactions")
View Source
var Inserters = map[string]SQLInserter{"mssql": &MSSQLInserter{}, "postgres": &PostgresInserter{}}
View Source
var LiteralSourceFormats = map[string]LiteralSourceFormat{
	"JSON_ARRAY":   JSONArray,
	"JSON_OBJECTS": JSONObjects,
	"CSV":          CSVWithoutHeader,
}
View Source
var SQLDriverManager sqlDriverManager

SQLDriverManager is a singleton that makes sure there is only a single DB object per connection, rather than one per source/destination

Functions

This section is empty.

Types

type AutoSQLTransform

type AutoSQLTransform struct {
	Name  string
	Table string `aql:"STAGING_TABLE, optional"`

	StagingSQLConnString string `aql:"STAGING_CONNECTION_STRING, optional"`
	Query                string
	ParameterTable       *ParameterTable
	ParameterNames       []string
	// contains filtered or unexported fields
}

AutoSQLTransform is a transform that drains the source, sticks the rows in an in-memory SQLite database (not GLOBAL - it doesn't share the cache), and then runs an SQL query on that, returning the result as rows.

Essentially, this makes it a combination of a SQL destination and a SQL source, where the two are automatically wired up to work together.

A current limitation is that the source dataset must fit entirely in memory. If this is not possible, it will be necessary to use eg. a GLOBAL destination and to configure SET IN_MEMORY = 'OFF';

func (*AutoSQLTransform) Open

func (a *AutoSQLTransform) Open(source Stream, dest Stream, l Logger, st Stopper)

func (*AutoSQLTransform) SetName

func (a *AutoSQLTransform) SetName(name string)

type Condition

type Condition func(msg map[string]interface{}, eof bool) bool

Condition is a func that returns true if the message passes the test and false otherwise.

func HasAtLeastNDistinctValuesCondition

func HasAtLeastNDistinctValuesCondition(col string, n int) (Condition, error)

func HasAtLeastNRowsCondition

func HasAtLeastNRowsCondition(n int) (Condition, error)

func HasAtMostNDistinctValuesCondition

func HasAtMostNDistinctValuesCondition(col string, n int) (Condition, error)

func HasAtMostNRowsCondition

func HasAtMostNRowsCondition(n int) (Condition, error)

func HasExactlyNDistinctValuesCondition

func HasExactlyNDistinctValuesCondition(col string, n int) (Condition, error)

func HasExactlyNRowsCondition

func HasExactlyNRowsCondition(n int) (Condition, error)

func HasNoDuplicates

func HasNoDuplicates(col string) (Condition, error)

func HasNoNullValues

func HasNoNullValues(col string) (Condition, error)

func NewSQLCondition

func NewSQLCondition(sql string) (Condition, error)

type ConsoleDestination

type ConsoleDestination struct {
	Name         string
	FormatAsJSON bool
	Writer       io.Writer
	// contains filtered or unexported fields
}

func (*ConsoleDestination) Open

func (cd *ConsoleDestination) Open(s Stream, l Logger, st Stopper)

func (*ConsoleDestination) Ping

func (cd *ConsoleDestination) Ping() error

type ConsoleLogger

type ConsoleLogger struct {
	MinLevel LogLevel
	// contains filtered or unexported fields
}

func NewConsoleLogger

func NewConsoleLogger(minLevel LogLevel) *ConsoleLogger

func (*ConsoleLogger) Chan

func (cl *ConsoleLogger) Chan() chan<- Event

func (*ConsoleLogger) Error

func (cl *ConsoleLogger) Error() error

func (*ConsoleLogger) Wait

func (cl *ConsoleLogger) Wait()

type Coordinator

type Coordinator interface {
	RegisterHooks(...interface{}) //arguments should be SourceHook, TransformHook or DestinationHook
	AddSource(name string, alias string, s Source) error
	AddDestination(name string, alias string, d Destination) error
	AddTest(node string, name string, desc string, c Condition) error
	AddTransform(name string, alias string, t Transform) error
	AddConstraint(before, after string) error
	Connect(from string, to string) error
	UseContext(ctx context.Context)
	Compile() error
	Execute() error
	Stop()
}

func NewCoordinator

func NewCoordinator(logger Logger, txManager TransactionManager) Coordinator

type DefaultInserter

type DefaultInserter struct {
	// contains filtered or unexported fields
}

func (*DefaultInserter) Initialize

func (d *DefaultInserter) Initialize(l Logger, tableName string, db *sql.DB, cols []string) error

func (*DefaultInserter) InsertBatch

func (d *DefaultInserter) InsertBatch(tx *sql.Tx, msgs []Message) error

func (*DefaultInserter) New

func (d *DefaultInserter) New() SQLInserter

func (*DefaultInserter) PreCommit

func (d *DefaultInserter) PreCommit() error

func (*DefaultInserter) Statement

func (d *DefaultInserter) Statement() string

type Destination

type Destination interface {

	//Ping checks that the destination is available. It is used to verify
	//the destination at runtime.
	Ping() error

	//Open gives the destination a stream to start pulling from and an error stream
	Open(Stream, Logger, Stopper)
}

func NewParameterTableDestination

func NewParameterTableDestination(p *ParameterTable, cols []string) Destination

type DestinationHook

type DestinationHook func(string, Destination) (Destination, error)

DestinationHook takes the destination name and interface and does something to it, possibly returning an error. If it returns a non-nil Destination, this will overwrite the existing Destination.

type DevNull

type DevNull struct {
	Name string
}

func (*DevNull) Open

func (d *DevNull) Open(s Stream, l Logger, st Stopper)

func (*DevNull) Ping

func (d *DevNull) Ping() error

type Event

type Event struct {
	Time    time.Time
	Source  string
	Level   LogLevel
	Message string
}

type ExcelDestination

type ExcelDestination struct {
	Name      string
	Filename  string `aql:"FILE"`
	Overwrite bool   `aql:"OVERWRITE, optional"`
	Template  string `aql:"TEMPLATE, optional"`
	Sheet     string `aql:"SHEET"`
	Range     ExcelRange
	Alias     string
	Transpose bool     `aql:"TRANSPOSE, optional"`
	Cols      []string `aql:"COLUMNS, optional"`
	// contains filtered or unexported fields
}

func (*ExcelDestination) Open

func (ed *ExcelDestination) Open(s Stream, l Logger, st Stopper)

func (*ExcelDestination) Ping

func (ed *ExcelDestination) Ping() error

type ExcelRange

type ExcelRange struct {
	X1 int
	Y1 int
	X2 ExcelRangePoint
	Y2 ExcelRangePoint
}

type ExcelRangePoint

type ExcelRangePoint struct {
	N bool //wildcard
	P int
}

type ExcelSource

type ExcelSource struct {
	Name                 string
	Filename             string `aql:"FILE"`
	Sheet                string `aql:"SHEET"`
	Range                ExcelRange
	RangeIncludesColumns bool
	Dateformat           string
	Cols                 []string `aql:"COLUMNS, optional"`
	// contains filtered or unexported fields
}

func (*ExcelSource) Columns

func (s *ExcelSource) Columns() []string

func (*ExcelSource) Open

func (s *ExcelSource) Open(dest Stream, l Logger, stop Stopper)

func (*ExcelSource) Ping

func (s *ExcelSource) Ping() error

func (*ExcelSource) SetName

func (s *ExcelSource) SetName(name string)

type GenericLogger

type GenericLogger struct {
	MinLevel LogLevel

	Writer io.Writer
	// contains filtered or unexported fields
}

func NewGenericLogger

func NewGenericLogger(minLevel LogLevel, writer io.Writer) *GenericLogger

func (*GenericLogger) Chan

func (gl *GenericLogger) Chan() chan<- Event

func (*GenericLogger) Error

func (gl *GenericLogger) Error() error

func (*GenericLogger) Wait

func (gl *GenericLogger) Wait()

type GraphNode

type GraphNode interface {
	Name() string
	Type() string
}

type HTTPSource

type HTTPSource struct {
	Name string

	URL                  string            `aql:"URL"` //URL of request
	Headers              map[string]string //Headers to add to request, optional
	JSONPath             string            `aql:"JSON_PATH, optional"` //Path to object containing array of rows, optional
	NoColumnNames        bool              //If response has array of primitive types rather than objects with column names, eg. ["bob",2] instead of {"name": "bob", "age": 2}
	ColumnNames          []string          `aql:"COLUMNS, optional"`                     //if NoColumnNames is true, this should be provided
	PaginationLimitName  string            `aql:"PAGINATION_LIMIT_PARAMETER, optional"`  //query parameter for pagination limit (optional)
	PaginationOffsetName string            `aql:"PAGINATION_OFFSET_PARAMETER, optional"` //query parameter for pagination offset (optional)
	PageSize             int               `aql:"PAGE_SIZE, optional"`                   //size of page for pagination
	// contains filtered or unexported fields
}

func (*HTTPSource) Open

func (h *HTTPSource) Open(s Stream, l Logger, st Stopper)

func (*HTTPSource) Ping

func (h *HTTPSource) Ping() error

func (*HTTPSource) SetName

func (h *HTTPSource) SetName(name string)

type LiteralSource

type LiteralSource struct {
	Name    string
	Content string
	Columns []string
	Format  LiteralSourceFormat
	// contains filtered or unexported fields
}

func (*LiteralSource) Open

func (ls *LiteralSource) Open(s Stream, l Logger, st Stopper)

func (*LiteralSource) Ping

func (ls *LiteralSource) Ping() error

func (*LiteralSource) SetName

func (ls *LiteralSource) SetName(name string)

type LiteralSourceFormat

type LiteralSourceFormat int
const (
	//JSONArray is a flat array eg. [[2,3],[3,4]]
	JSONArray LiteralSourceFormat = iota

	//JSONObjects is an array of objects, eg. [{"a": 1, "b": 2}, {"a": 4, "b": 5}]
	JSONObjects

	//CSVWithoutHeader is a CSV string without headers, eg. 1, 2\n4, 5. Only string
	//types are supported - other types will not be inferred, so eg. the above example
	//will map to strings ["1", "2"], ["4", "5"].
	CSVWithoutHeader
)

type LogLevel

type LogLevel int
const (
	Trace LogLevel = iota
	Info
	Warning
	Error
)

func StrToLevel

func StrToLevel(s string) (LogLevel, bool)

type Logger

type Logger interface {
	//  Chan returns a chan that can be used to log events
	Chan() chan<- Event

	//  Error returns the latest error that has been logged. The logger must keep track of this.
	Error() error

	// Wait should block until the logger is done processing messages in its chan. The sender should close the chan before calling this or it will deadlock
	Wait()
}

func SlackWrapper

func SlackWrapper(l Logger, opts SlackOpts) Logger

SlackWrapper intercepts messages to a logger and forwards any at or above the given minimum log level to a Slack incoming webhook.

type MSSQLInserter

type MSSQLInserter struct {
	// contains filtered or unexported fields
}

func (*MSSQLInserter) Initialize

func (m *MSSQLInserter) Initialize(l Logger, tableName string, db *sql.DB, cols []string) error

func (*MSSQLInserter) InsertBatch

func (m *MSSQLInserter) InsertBatch(tx *sql.Tx, msgs []Message) error

func (*MSSQLInserter) New

func (m *MSSQLInserter) New() SQLInserter

func (*MSSQLInserter) PreCommit

func (m *MSSQLInserter) PreCommit() error

type MandrillDestination

type MandrillDestination struct {
	Name       string
	APIKey     string `aql:"API_KEY"`
	Sender     *MandrillPrincipal
	Recipients []MandrillPrincipal
	SplitByRow bool   `aql:"SPLIT, optional"`
	Template   string `aql:"TEMPLATE"`
	Subject    string `aql:"SUBJECT, optional"`
	// contains filtered or unexported fields
}

func (*MandrillDestination) Open

func (d *MandrillDestination) Open(s Stream, l Logger, st Stopper)

func (*MandrillDestination) Ping

func (d *MandrillDestination) Ping() error

type MandrillPrincipal

type MandrillPrincipal struct {
	Name  string
	Email string
}

func ParseEmailRecipients

func ParseEmailRecipients(s string) ([]MandrillPrincipal, error)

type Message

type Message struct {
	Source      string
	Destination string
	Data        []interface{}
}

Message is a named message. Source and/or destination can be blank (i.e. wildcard).

type Middleware

type Middleware func(Stream) Stream

Middleware is a func that transforms a stream.

type NamedSliceSource

type NamedSliceSource struct {
	// contains filtered or unexported fields
}

func (*NamedSliceSource) Open

func (ns *NamedSliceSource) Open(dest Stream, logger Logger, stop Stopper)

func (*NamedSliceSource) Ping

func (ns *NamedSliceSource) Ping() error

func (*NamedSliceSource) SetName

func (ns *NamedSliceSource) SetName(name string)

type ParameterTable

type ParameterTable struct {
	sync.Mutex
	// contains filtered or unexported fields
}

func NewParameterTable

func NewParameterTable() *ParameterTable

func (*ParameterTable) Declare

func (p *ParameterTable) Declare(name string) error

func (*ParameterTable) Get

func (p *ParameterTable) Get(name string) (interface{}, bool)

func (*ParameterTable) Set

func (p *ParameterTable) Set(name string, value interface{}) error

type ParameterTableDestination

type ParameterTableDestination struct {
	// contains filtered or unexported fields
}

func (*ParameterTableDestination) Open

func (p *ParameterTableDestination) Open(s Stream, l Logger, st Stopper)

func (*ParameterTableDestination) Ping

func (p *ParameterTableDestination) Ping() error

type Passthrough

type Passthrough struct {
	sync.Mutex
	// contains filtered or unexported fields
}

func (*Passthrough) Open

func (p *Passthrough) Open(source Stream, dest Stream, logger Logger, stop Stopper)

func (*Passthrough) SetName

func (p *Passthrough) SetName(name string)

type PostgresInserter

type PostgresInserter struct {
	// contains filtered or unexported fields
}

func (*PostgresInserter) Initialize

func (m *PostgresInserter) Initialize(l Logger, tableName string, db *sql.DB, cols []string) error

func (*PostgresInserter) InsertBatch

func (m *PostgresInserter) InsertBatch(tx *sql.Tx, msgs []Message) error

func (*PostgresInserter) New

func (m *PostgresInserter) New() SQLInserter

func (*PostgresInserter) PreCommit

func (m *PostgresInserter) PreCommit() error

type SQLDestination

type SQLDestination struct {
	Name             string
	Driver           string
	ConnectionString string
	Table            string `aql:"TABLE"`
	Tx               *sql.Tx

	RowsPerBatch int  `aql:"ROWS_PER_BATCH,optional"`
	DropNulls    bool `aql:"DROP_NULLS,optional"`

	TxUseFunc     func() (*sql.Tx, error)
	TxReleaseFunc func()
	Alias         string
	// contains filtered or unexported fields
}

func (*SQLDestination) Columns

func (sq *SQLDestination) Columns() []string

func (*SQLDestination) Open

func (sq *SQLDestination) Open(s Stream, l Logger, st Stopper)

func (*SQLDestination) Ping

func (sq *SQLDestination) Ping() error

type SQLInserter

type SQLInserter interface {
	New() SQLInserter

	//Initialize with connection details and database.
	Initialize(l Logger, tableName string, db *sql.DB, cols []string) error

	//Insert a single batch
	InsertBatch(tx *sql.Tx, msgs []Message) error

	//Hook that is called before the transaction manager/etc commits/rollbacks the transaction
	PreCommit() error
}

SQLInserter inserts rows into a SQL database. It contains driver-specific optimisations:

  • MS SQL Server: uses bulk copy

It does not perform any transaction management.

type SQLSource

type SQLSource struct {
	Name             string
	Driver           string
	ConnectionString string
	Query            string
	ParameterTable   *ParameterTable
	Tx               *sql.Tx

	ExecOnly       bool
	ParameterNames []string
	TxReleaseFunc  func()
	TxUseFunc      func() (*sql.Tx, error)
	// contains filtered or unexported fields
}

func (*SQLSource) Columns

func (sq *SQLSource) Columns() []string

func (*SQLSource) Open

func (sq *SQLSource) Open(s Stream, l Logger, st Stopper)

func (*SQLSource) Ping

func (sq *SQLSource) Ping() error

func (*SQLSource) SetName

func (sq *SQLSource) SetName(name string)

type Sequenceable

type Sequenceable interface {
	Sequence(seq []string)
}

type SequenceableTransform

type SequenceableTransform interface {
	Transform
	Sequenceable
}

type Sequencer

type Sequencer interface {
	Wait(task string)
	Done(task string)
}

Sequencer is a synchronization utility to ensure that a collection of named tasks run in a given sequence even if they are started in parallel.

func NewSequencer

func NewSequencer(tasks []string) Sequencer

type SlackOpts

type SlackOpts struct {
	Channel    string `aql:"SLACK_CHANNEL, optional"`
	Emoji      string `aql:"SLACK_EMOJI, optional"`
	Username   string `aql:"SLACK_USER, optional"`
	WebhookURL string `aql:"SLACK_WEBHOOK_URL"`
	MinLevel   string `aql:"SLACK_LOG_LEVEL"`
	Script     string `aql:"SLACK_NAME, optional"`
}

type SliceDestination

type SliceDestination struct {
	Alias string
	sync.Mutex
	// contains filtered or unexported fields
}

func (*SliceDestination) Open

func (sd *SliceDestination) Open(s Stream, logger Logger, stop Stopper)

func (*SliceDestination) Ping

func (sd *SliceDestination) Ping() error

func (*SliceDestination) Results

func (sd *SliceDestination) Results() [][]interface{}

type SliceSource

type SliceSource struct {
	// contains filtered or unexported fields
}

func (*SliceSource) Open

func (s *SliceSource) Open(dest Stream, logger Logger, stop Stopper)

func (*SliceSource) Ping

func (s *SliceSource) Ping() error

func (*SliceSource) SetName

func (s *SliceSource) SetName(name string)

type Source

type Source interface {
	//SetName sets the name (or alias) of the source for outgoing messages
	SetName(name string)

	//Ping attempts to connect to the source without creating a stream.
	//This is used to check that the source is valid at run-time.
	Ping() error

	//Open connects to the source and returns a stream of data.
	Open(Stream, Logger, Stopper)
}

Source represents data inputs into the system, eg. a database query.

func NewNamedSliceSource

func NewNamedSliceSource(cols []string, msg []Message) Source

func NewSliceSource

func NewSliceSource(cols []string, msg [][]interface{}) Source

type SourceHook

type SourceHook func(string, Source) (Source, error)

SourceHook takes the source name and interface and does something to it, possibly returning an error. If it returns a non-nil Source, this will overwrite the existing Source.

type Stopper

type Stopper interface {
	//Stopped checks if the stopper is stopped
	Stopped() bool

	//Stop halts execution. This is irreversible.
	Stop()
}

Stopper is used as a condition variable to halt the execution of the program. It is safe for concurrent use by multiple goroutines.

func NewStopper

func NewStopper() Stopper

type Stream

type Stream interface {
	//Columns returns a slice of column names
	Columns() []string

	//SetColumns sets the destination columns. destination can be a wildcard.
	SetColumns(destination string, cols []string) error

	//Chan is the channel for the stream. It will be closed by the sender when the stream is at an end.
	Chan(destination string) chan Message
}

Stream represents a stream of data such as a database resultset

func NewSequencedStream

func NewSequencedStream(s Stream, sequence []string) Stream

func NewStream

func NewStream(cols []string, bufferSize int) Stream

type TransactionManager

type TransactionManager interface {
	//  Register makes the connection known to the connection manager. It does
	//  NOT begin a new transaction.
	Register(aql.Connection) error

	//  Tx will begin a new transaction (if none exists) or re-use the existing
	//  transaction, locking it so that no one may concurrently use it.
	Tx(connection string) (*sql.Tx, error)

	//  Release the transaction so that it may be used by others. It panics if the
	//  connection has not been registered.
	Release(connection string)

	//  Commit commits ALL transactions. It is an error to call Tx() or Register()
	//  after Commit().
	Commit() error

	//  Rollback rolls back ALL transactions. It is an error to call Tx() or Register()
	//  after Rollback().
	Rollback() error
}

TransactionManager provides a single transaction per connection, to be used by all components that read or write from the connection. All transactions are then either committed or rolled back together. It is a 2PC Tx manager. Only supported for connections implementing sql.Tx for now.

func NewTransactionManager

func NewTransactionManager(l Logger) TransactionManager

type Transform

type Transform interface {
	//SetName sets the alias of the transform for outgoing messages
	SetName(name string)

	//Open gives the transform a stream to start pulling from
	Open(source Stream, dest Stream, logger Logger, stop Stopper)
}

Transform is a component that is neither a source nor a sink. It is configured with one or more sources, and one or more sinks.

type TransformHook

type TransformHook func(string, Transform) (Transform, error)

TransformHook takes the transform name and interface and does something to it, possibly returning an error. If it returns a non-nil Transform, this will overwrite the existing Transform.

Directories

Path Synopsis

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL