mongodb

package
v0.3.1 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Mar 24, 2017 License: BSD-3-Clause Imports: 19 Imported by: 0

README

MongoDB adaptor

The MongoDB adaptor is capable of reading/tailing collections and receiving data for inserts.

collection_filters is a JSON string where the top level key is the collection name and its value is a query that will be used when iterating the collection. The commented out example below would only include documents where the i field had a value greater than 10.

NOTE You may want to check your collections to ensure the proper index(es) are in place or performance may suffer.

Configuration:
m = mongodb({
  "uri": "mongodb://127.0.0.1:27017/test"
  // "timeout": "30s",
  // "tail": false,
  // "ssl": false,
  // "cacerts": ["/path/to/cert.pem"],
  // "wc": 1,
  // "fsync": false,
  // "bulk": false,
  // "collection_filters": "{\"foo\": {\"i\": {\"$gt\": 10}}}"
})

Documentation

Index

Constants

View Source
const (
	// DefaultURI is the default endpoint of MongoDB on the local machine.
	// Primarily used when initializing a new Client without a specific URI.
	DefaultURI = "mongodb://127.0.0.1:27017/test"

	// DefaultSessionTimeout is the default timeout after which the
	// session times out when unable to connect to the provided URI.
	DefaultSessionTimeout = 10 * time.Second
)

Variables

View Source
var (

	// DefaultCollectionFilter is an empty map of empty maps
	DefaultCollectionFilter = map[string]CollectionFilter{}
)
View Source
var (
	// DefaultSafety is the default saftey mode used for the underlying session.
	// These default settings are only good for local use as it makes not guarantees for writes.
	DefaultSafety = mgo.Safe{}
)
View Source
var (

	// ErrCollectionFilter is returned when an error occurs attempting to Unmarshal the string.
	ErrCollectionFilter = errors.New("malformed collection_filters")
)

Functions

This section is empty.

Types

type Bulk

type Bulk struct {
	*sync.RWMutex
	// contains filtered or unexported fields
}

Bulk implements client.Writer for use with MongoDB and takes advantage of the Bulk API for performance improvements.

func (*Bulk) Write

func (b *Bulk) Write(msg message.Msg) func(client.Session) (message.Msg, error)

type Client

type Client struct {
	// contains filtered or unexported fields
}

Client represents a client to the underlying MongoDB source.

func NewClient

func NewClient(options ...ClientOptionFunc) (*Client, error)

NewClient creates a new client to work with MongoDB.

The caller can configure the new client by passing configuration options to the func.

Example:

client, err := NewClient(
  WithURI("mongodb://localhost:27017"),
  WithTimeout("30s"))

If no URI is configured, it uses defaultURI by default.

An error is also returned when some configuration option is invalid

func (Client) Close

func (c Client) Close()

Close satisfies the Closer interface and handles closing the initial mgo.Session.

func (*Client) Connect

func (c *Client) Connect() (client.Session, error)

Connect tests the mongodb connection and initializes the mongo session

type ClientOptionFunc

type ClientOptionFunc func(*Client) error

ClientOptionFunc is a function that configures a Client. It is used in NewClient.

func WithCACerts

func WithCACerts(certs []string) ClientOptionFunc

WithCACerts configures the RootCAs for the underlying TLS connection

func WithFsync

func WithFsync(fsync bool) ClientOptionFunc

WithFsync configures whether the server will wait for Fsync to complete before returning a response (Default: false).

func WithSSL

func WithSSL(ssl bool) ClientOptionFunc

WithSSL configures the database connection to connect via TLS.

func WithTail

func WithTail(tail bool) ClientOptionFunc

WithTail set the flag to tell the Client whether or not access to the oplog will be needed (Default: false).

func WithTimeout

func WithTimeout(timeout string) ClientOptionFunc

WithTimeout overrides the DefaultSessionTimeout and should be parseable by time.ParseDuration

func WithURI

func WithURI(uri string) ClientOptionFunc

WithURI defines the full connection string of the MongoDB database.

func WithWriteConcern

func WithWriteConcern(wc int) ClientOptionFunc

WithWriteConcern configures the write concern option for the session (Default: 0).

type CollectionFilter added in v0.3.0

type CollectionFilter map[string]interface{}

CollectionFilter is just a typed map of strings of map[string]interface{}

type MongoDB

type MongoDB struct {
	adaptor.BaseConfig
	SSL               bool     `json:"ssl"`
	CACerts           []string `json:"cacerts"`
	Tail              bool     `json:"tail"`
	Wc                int      `json:"wc"`
	FSync             bool     `json:"fsync"`
	Bulk              bool     `json:"bulk"`
	CollectionFilters string   `json:"collection_filters"`
}

MongoDB is an adaptor to read / write to mongodb. it works as a source by copying files, and then optionally tailing the oplog

func (*MongoDB) Client added in v0.3.0

func (m *MongoDB) Client() (client.Client, error)

func (*MongoDB) Description

func (m *MongoDB) Description() string

Description for mongodb adaptor

func (*MongoDB) Reader added in v0.3.0

func (m *MongoDB) Reader() (client.Reader, error)

func (*MongoDB) SampleConfig

func (m *MongoDB) SampleConfig() string

SampleConfig for mongodb adaptor

func (*MongoDB) Writer added in v0.3.0

func (m *MongoDB) Writer(done chan struct{}, wg *sync.WaitGroup) (client.Writer, error)

type OplogAccessError

type OplogAccessError struct {
	// contains filtered or unexported fields
}

OplogAccessError wraps the underlying error when access to the oplog fails.

func (OplogAccessError) Error

func (e OplogAccessError) Error() string

type Reader

type Reader struct {
	// contains filtered or unexported fields
}

Reader implements the behavior defined by client.Reader for interfacing with MongoDB.

func (*Reader) Read

func (r *Reader) Read(filterFn client.NsFilterFunc) client.MessageChanFunc

type Session

type Session struct {
	// contains filtered or unexported fields
}

Session serves as a wrapper for the underlying mgo.Session

func (*Session) Close

func (s *Session) Close()

Close implements necessary calls to cleanup the underlying mgo.Session

type Writer

type Writer struct {
	// contains filtered or unexported fields
}

Writer implements client.Writer for use with MongoDB

func (*Writer) Write

func (w *Writer) Write(msg message.Msg) func(client.Session) (message.Msg, error)

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL