adaptor

package
v0.1.1
Warning

This package is not in the latest version of its module.

Published: Aug 27, 2015 License: BSD-3-Clause Imports: 25 Imported by: 0

Documentation


Constants

const (
	MONGO_BUFFER_SIZE int = 1e6
	MONGO_BUFFER_LEN  int = 5e5
)

Variables

var (
	// Adaptors is a registry of adaptor types, constructors and configs
	Adaptors = make(Registry)
)

Functions

func Register

func Register(name, desc string, fn func(*pipe.Pipe, string, Config) (StopStartListener, error), config interface{})

Register registers an adaptor (database adaptor) for use with Transporter. The fn argument is a constructor that returns an instance of the given adaptor; config is an instance of the adaptor's config struct.
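As a rough illustration of the registration pattern described above, the sketch below mirrors the documented types locally so that it compiles on its own. The empty Pipe stand-in, the Noop adaptor, NoopConfig, and the body of Register are assumptions made for the example, not the package's actual implementation.

```go
package main

import "fmt"

// Pipe is a stand-in for pipe.Pipe (assumption: the real type carries the
// message channels; it is left empty here so the sketch is self-contained).
type Pipe struct{}

// Config, StopStartListener, RegistryEntry and Registry mirror the documented types.
type Config map[string]interface{}

type StopStartListener interface {
	Start() error
	Listen() error
	Stop() error
}

type RegistryEntry struct {
	Name        string
	Description string
	Constructor func(*Pipe, string, Config) (StopStartListener, error)
	Config      interface{}
}

type Registry map[string]RegistryEntry

var Adaptors = make(Registry)

// Register stores the entry under its name (assumed behaviour).
func Register(name, desc string, fn func(*Pipe, string, Config) (StopStartListener, error), config interface{}) {
	Adaptors[name] = RegistryEntry{Name: name, Description: desc, Constructor: fn, Config: config}
}

// Noop is a hypothetical adaptor that does nothing.
type Noop struct{}

func (Noop) Start() error  { return nil }
func (Noop) Listen() error { return nil }
func (Noop) Stop() error   { return nil }

// NoopConfig is a hypothetical config struct for the Noop adaptor.
type NoopConfig struct {
	URI string `json:"uri" doc:"ignored by the noop adaptor"`
}

// NewNoop has the constructor shape that Register expects.
func NewNoop(p *Pipe, path string, extra Config) (StopStartListener, error) {
	return Noop{}, nil
}

func main() {
	Register("noop", "an adaptor that does nothing", NewNoop, NoopConfig{})
	entry := Adaptors["noop"]
	fmt.Println(entry.Name, "-", entry.Description)
}
```

In a real adaptor file the Register call would more likely sit in an init function, so that importing the package is enough to populate Adaptors.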

Types

type Config

type Config map[string]interface{}

Config is a map[string]interface{} type that helps us turn a fuzzy document into a concrete named struct.

func (*Config) Construct

func (c *Config) Construct(conf interface{}) error

Construct marshals the Config and then unmarshals it into a named struct, turning the generic map into a proper struct.
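The marshal-then-unmarshal round trip can be sketched with encoding/json as follows. This is a minimal sketch that assumes JSON is the intermediate encoding (the json struct tags elsewhere in this package suggest so); the local Config and FileConfig types only mirror the documented ones.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// Config mirrors the documented map-based config type.
type Config map[string]interface{}

// Construct round-trips the map through JSON into conf, which must be a
// pointer to the target struct.
func (c *Config) Construct(conf interface{}) error {
	b, err := json.Marshal(c)
	if err != nil {
		return err
	}
	return json.Unmarshal(b, conf)
}

// FileConfig mirrors the documented struct (doc tag omitted for brevity).
type FileConfig struct {
	URI string `json:"uri"`
}

func main() {
	c := Config{"uri": "stdout://"}
	var fc FileConfig
	if err := c.Construct(&fc); err != nil {
		fmt.Println("construct failed:", err)
		return
	}
	fmt.Println(fc.URI) // prints "stdout://"
}
```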

func (Config) GetString

func (c Config) GetString(key string) string

GetString returns the value stored in the config under the given key, or an empty string if the key doesn't exist or isn't a string value.
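One way to get the described behaviour is a comma-ok type assertion, which yields the zero value for both a missing key and a non-string value. The following is a sketch of that approach, not necessarily how the package implements it.

```go
package main

import "fmt"

// Config mirrors the documented map-based config type.
type Config map[string]interface{}

// GetString returns the string under key, or "" when the key is missing
// or holds a non-string value.
func (c Config) GetString(key string) string {
	s, _ := c[key].(string)
	return s
}

func main() {
	c := Config{"namespace": "db.users", "wc": 1}
	fmt.Printf("%q %q %q\n", c.GetString("namespace"), c.GetString("wc"), c.GetString("missing"))
}
```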

type Elasticsearch

type Elasticsearch struct {
	// contains filtered or unexported fields
}

Elasticsearch is an adaptor to connect a pipeline to an elasticsearch cluster.

func (*Elasticsearch) Listen

func (e *Elasticsearch) Listen() error

Listen starts the listener

func (*Elasticsearch) Start

func (e *Elasticsearch) Start() error

Start the adaptor as a source (not implemented)

func (*Elasticsearch) Stop

func (e *Elasticsearch) Stop() error

Stop the adaptor

type ErrAdaptor added in v0.0.3

type ErrAdaptor struct {
	// contains filtered or unexported fields
}

ErrAdaptor gives the details of the failed adaptor

func (ErrAdaptor) Error added in v0.0.3

func (a ErrAdaptor) Error() string

type Error

type Error struct {
	Lvl    ErrorLevel
	Str    string
	Path   string
	Record interface{}
}

Error is an error that happened during an adaptor's operation. Errors include both an indication of the severity (Lvl) and a reference to the Record that was being processed when the error occurred.

func NewError

func NewError(lvl ErrorLevel, path, str string, record interface{}) Error

NewError creates an Error with the specified level, path, message and record.

func (Error) Error

func (t Error) Error() string

Error returns the error as a string

type ErrorLevel

type ErrorLevel int

ErrorLevel indicates the severity of the error

const (
	NOTICE ErrorLevel = iota
	WARNING
	ERROR
	CRITICAL
)

Adaptor errors have levels to indicate their severity. CRITICAL errors indicate that the program cannot continue running.

ERROR errors indicate a problem with a specific document or message. A document might not have been applied properly to a source, but the program can continue.

WARNING Todo

NOTICE ToDo
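To make the severity levels concrete, the sketch below shows how a caller might decide whether to keep running. The types mirror the documented ones; the message format in Error() and the fatal helper are assumptions made for illustration.

```go
package main

import "fmt"

type ErrorLevel int

const (
	NOTICE ErrorLevel = iota
	WARNING
	ERROR
	CRITICAL
)

// Error mirrors the documented struct.
type Error struct {
	Lvl    ErrorLevel
	Str    string
	Path   string
	Record interface{}
}

func NewError(lvl ErrorLevel, path, str string, record interface{}) Error {
	return Error{Lvl: lvl, Path: path, Str: str, Record: record}
}

// Error satisfies the error interface. The message format is an assumption.
func (t Error) Error() string { return t.Path + ": " + t.Str }

// fatal is a hypothetical helper: only CRITICAL adaptor errors stop the run.
func fatal(err error) bool {
	e, ok := err.(Error)
	return ok && e.Lvl == CRITICAL
}

func main() {
	docErr := NewError(ERROR, "source/sink", "bad document", map[string]interface{}{"_id": 1})
	connErr := NewError(CRITICAL, "source/sink", "connection lost", nil)
	fmt.Println(fatal(docErr), fatal(connErr)) // prints "false true"
}
```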

type File

type File struct {
	// contains filtered or unexported fields
}

File is an adaptor that can be used as a source / sink for files on disk, as well as a sink to stdout.

func (*File) Listen

func (d *File) Listen() (err error)

Listen starts the listen loop

func (*File) Start

func (d *File) Start() (err error)

Start the file adaptor. TODO: we only know how to listen on stdout for now.

func (*File) Stop

func (d *File) Stop() error

Stop the adaptor

type FileConfig

type FileConfig struct {
	// URI pointing to the resource.  We only recognize file:// and stdout:// currently
	URI string `json:"uri" doc:"the uri to connect to, ie stdout://, file:///tmp/output"`
}

FileConfig is used to configure the File adaptor.
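The two recognized URI schemes could be told apart with net/url, as in the sketch below. The target helper and its return values are hypothetical; the adaptor itself may well use a simpler prefix check.

```go
package main

import (
	"fmt"
	"net/url"
)

// target is a hypothetical helper that classifies a FileConfig URI.
// It returns "stdout" or "file" plus the file path, if any.
func target(uri string) (kind, path string, err error) {
	u, err := url.Parse(uri)
	if err != nil {
		return "", "", err
	}
	switch u.Scheme {
	case "stdout":
		return "stdout", "", nil
	case "file":
		return "file", u.Path, nil
	}
	return "", "", fmt.Errorf("unsupported scheme %q", u.Scheme)
}

func main() {
	for _, uri := range []string{"stdout://", "file:///tmp/output", "ftp://example"} {
		kind, path, err := target(uri)
		fmt.Println(kind, path, err)
	}
}
```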

type Mongodb

type Mongodb struct {
	// contains filtered or unexported fields
}

Mongodb is an adaptor to read from / write to mongodb. It works as a source by copying files, and then optionally tailing the oplog.

func (*Mongodb) Listen

func (m *Mongodb) Listen() (err error)

Listen starts the pipe's listener

func (*Mongodb) Start

func (m *Mongodb) Start() (err error)

Start the adaptor as a source

func (*Mongodb) Stop

func (m *Mongodb) Stop() error

Stop the adaptor

type MongodbConfig

type MongodbConfig struct {
	URI       string     `json:"uri" doc:"the uri to connect to, in the form mongodb://user:password@host.com:27017/auth_database"`
	Namespace string     `json:"namespace" doc:"mongo namespace to read/write"`
	Ssl       *SslConfig `json:"ssl,omitempty" doc:"ssl options for connection"`
	Timeout   string     `json:"timeout" doc:"timeout for establishing connection, format must be parsable by time.ParseDuration and defaults to 10s"`
	Debug     bool       `json:"debug" doc:"display debug information"`
	Tail      bool       `json:"tail" doc:"if tail is true, then the mongodb source will tail the oplog after copying the namespace"`
	Wc        int        `` /* 143-byte string literal not displayed */
	FSync     bool       `json:"fsync" doc:"When writing, should we flush to disk before returning success"`
	Bulk      bool       `json:"bulk" doc:"use a buffer to bulk insert documents"`
}

MongodbConfig provides configuration options for a mongodb adaptor. The notable difference between this and dbConfig is the presence of the Tail option.

type Registry

type Registry map[string]RegistryEntry

Registry maps the adaptor's name to the RegistryEntry

type RegistryEntry

type RegistryEntry struct {
	Name        string
	Description string
	Constructor func(*pipe.Pipe, string, Config) (StopStartListener, error)
	Config      interface{}
}

RegistryEntry stores the adaptor constructor and configuration struct

func (*RegistryEntry) About

func (r *RegistryEntry) About() string

About inspects the RegistryEntry's Config object, and uses each field's tags as a docstring
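Reading struct tags as documentation can be done with the reflect package. The sketch below assumes the json tag supplies the option name and the doc tag the description; the about function and its output format are invented for illustration and will differ from what About actually prints.

```go
package main

import (
	"fmt"
	"reflect"
	"strings"
)

// demoConfig is a hypothetical config struct in the style used by this package.
type demoConfig struct {
	URI   string `json:"uri" doc:"the uri to connect to"`
	Debug bool   `json:"debug,omitempty" doc:"display debug information"`
}

// about walks the fields of a config struct (or pointer to one) and builds
// one line per field from its json and doc tags.
func about(config interface{}) string {
	t := reflect.TypeOf(config)
	if t != nil && t.Kind() == reflect.Ptr {
		t = t.Elem()
	}
	if t == nil || t.Kind() != reflect.Struct {
		return ""
	}
	var b strings.Builder
	for i := 0; i < t.NumField(); i++ {
		f := t.Field(i)
		// Drop options such as ",omitempty" from the json tag.
		name := strings.Split(f.Tag.Get("json"), ",")[0]
		fmt.Fprintf(&b, "%s (%s): %s\n", name, f.Type, f.Tag.Get("doc"))
	}
	return b.String()
}

func main() {
	fmt.Print(about(demoConfig{}))
}
```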

type Rethinkdb

type Rethinkdb struct {
	// contains filtered or unexported fields
}

Rethinkdb is an adaptor that writes metrics to rethinkdb (http://rethinkdb.com/), an open-source distributed database.

func (*Rethinkdb) Listen

func (r *Rethinkdb) Listen() (err error)

Listen starts the adaptor's listener

func (*Rethinkdb) Start

func (r *Rethinkdb) Start() error

Start the adaptor as a source

func (*Rethinkdb) Stop

func (r *Rethinkdb) Stop() error

Stop the adaptor

type SslConfig added in v0.0.4

type SslConfig struct {
	CaCerts []string `json:"cacerts,omitempty" doc:"array of root CAs to use in order to verify the server certificates"`
}

type StopStartListener

type StopStartListener interface {
	Start() error
	Listen() error
	Stop() error
}

StopStartListener defines the interface that all database connectors and nodes must follow. Start() consumes data from the interface; Listen() listens on a pipe, processes data, and then emits it; Stop() shuts down the adaptor.
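For a feel of what satisfying this interface involves, here is a small in-memory sink. It is a sketch under loud assumptions: a plain string channel stands in for the pipe, and memSink is a hypothetical type, not one of the package's adaptors.

```go
package main

import (
	"errors"
	"fmt"
	"sync"
)

// StopStartListener mirrors the documented interface.
type StopStartListener interface {
	Start() error
	Listen() error
	Stop() error
}

// memSink is a hypothetical sink-only adaptor that collects messages.
type memSink struct {
	in   chan string // stand-in for the pipe's incoming channel
	got  []string
	once sync.Once
}

func newMemSink(buffer int) *memSink {
	return &memSink{in: make(chan string, buffer)}
}

// Start would run the adaptor as a source, which this sink does not support.
func (m *memSink) Start() error { return errors.New("memSink cannot be used as a source") }

// Listen consumes messages until the channel is closed by Stop.
func (m *memSink) Listen() error {
	for msg := range m.in {
		m.got = append(m.got, msg)
	}
	return nil
}

// Stop closes the channel exactly once, which ends Listen.
func (m *memSink) Stop() error {
	m.once.Do(func() { close(m.in) })
	return nil
}

func main() {
	var a StopStartListener = newMemSink(2)
	s := a.(*memSink)
	s.in <- "doc1"
	s.in <- "doc2"
	s.Stop()   // buffered messages are still delivered after close
	s.Listen() // drains the buffer, then returns
	fmt.Println(len(s.got))
}
```

The example calls Stop before Listen only to stay single-threaded; in a pipeline, Listen would normally block in its own goroutine until Stop is called.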

func Createadaptor

func Createadaptor(kind, path string, extra Config, p *pipe.Pipe) (adaptor StopStartListener, err error)

Createadaptor instantiates an adaptor given the adaptor type and the Config. Constructors are expected to be in the form

func NewWhatever(p *pipe.Pipe, path string, extra Config) (StopStartListener, error) {}

and the returned value is expected to conform to the StopStartListener interface.
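The lookup-then-construct step might look like the sketch below. Everything here is a local stand-in: Pipe is an empty placeholder for pipe.Pipe, the constructors map plays the role of the registry, and the error returned for an unknown kind is an assumption (the package has an ErrAdaptor type whose fields are not shown in this documentation).

```go
package main

import "fmt"

// Pipe is an empty stand-in for pipe.Pipe.
type Pipe struct{}

type Config map[string]interface{}

type StopStartListener interface {
	Start() error
	Listen() error
	Stop() error
}

type constructor func(*Pipe, string, Config) (StopStartListener, error)

// constructors plays the role of the adaptor registry in this sketch.
var constructors = map[string]constructor{
	"null": func(p *Pipe, path string, extra Config) (StopStartListener, error) {
		return &nullAdaptor{path: path}, nil
	},
}

// nullAdaptor is a hypothetical adaptor that only remembers its path.
type nullAdaptor struct{ path string }

func (n *nullAdaptor) Start() error  { return nil }
func (n *nullAdaptor) Listen() error { return nil }
func (n *nullAdaptor) Stop() error   { return nil }

// createAdaptor looks up the constructor for kind and invokes it.
func createAdaptor(kind, path string, extra Config, p *Pipe) (StopStartListener, error) {
	fn, ok := constructors[kind]
	if !ok {
		return nil, fmt.Errorf("adaptor %q not found in registry", kind)
	}
	return fn(p, path, extra)
}

func main() {
	a, err := createAdaptor("null", "source/null", Config{}, &Pipe{})
	fmt.Println(a != nil, err)
	_, err = createAdaptor("nosuch", "source/nosuch", Config{}, &Pipe{})
	fmt.Println(err)
}
```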

func NewElasticsearch

func NewElasticsearch(p *pipe.Pipe, path string, extra Config) (StopStartListener, error)

NewElasticsearch creates a new Elasticsearch adaptor. Elasticsearch adaptors cannot be used as a source.

func NewFile

func NewFile(p *pipe.Pipe, path string, extra Config) (StopStartListener, error)

NewFile returns a File Adaptor

func NewMongodb

func NewMongodb(p *pipe.Pipe, path string, extra Config) (StopStartListener, error)

NewMongodb creates a new Mongodb adaptor

func NewRethinkdb

func NewRethinkdb(p *pipe.Pipe, path string, extra Config) (StopStartListener, error)

NewRethinkdb creates a new Rethinkdb database adaptor

func NewTransformer

func NewTransformer(pipe *pipe.Pipe, path string, extra Config) (StopStartListener, error)

NewTransformer creates a new transformer object

type SyncDoc added in v0.1.0

type SyncDoc struct {
	Doc        map[string]interface{}
	Collection string
}

type Transformer

type Transformer struct {
	// contains filtered or unexported fields
}

Transformer is an adaptor which consumes data from a source, transforms it using a supplied javascript function, and then emits it. The javascript transformation function is supplied as a separate file on disk, and is invoked through the module.exports function it defines.

func (*Transformer) Listen

func (t *Transformer) Listen() (err error)

Listen starts the transformer's listener, reads each message from the incoming channel, transforms it into mejson, and then uses the supplied javascript module.exports function to transform the document. The document is then emitted to this adaptor's children.

func (*Transformer) Start

func (t *Transformer) Start() error

Start the adaptor as a source (not implemented for this adaptor)

func (*Transformer) Stop

func (t *Transformer) Stop() error

Stop the adaptor

type TransformerConfig

type TransformerConfig struct {
	// file containing transformer javascript
	// must define a module.exports = function(doc) { .....; return doc }
	Filename  string `json:"filename" doc:"the filename containing the javascript transform fn"`
	Namespace string `json:"namespace" doc:"namespace to transform"`

	// verbose output
	Debug bool `json:"debug" doc:"display debug information"` // debug mode
}

TransformerConfig holds config options for a transformer adaptor
