mongodb

package
v1.1.0 Latest
Warning

This package is not in the latest version of its module.

Published: May 26, 2022 License: BSD-3-Clause Imports: 20 Imported by: 3

README

MongoDB adaptor

The MongoDB adaptor is capable of reading/tailing collections and receiving data for inserts.

NOTE: Check that your collections have the proper indexes in place, or performance may suffer.

Example

m = mongodb({
  "uri": "mongodb://127.0.0.1:27017/test"
  // "timeout": "30s",
  // "tail": false,
  // "ssl": false,
  // "cacerts": ["/path/to/cert.pem"],
  // "wc": 1,
  // "fsync": false,
  // "bulk": false,
  // "collection_filters": "{\"foo\": {\"i\": {\"$gt\": 10}}}"
})

Options

| Parameter | Description | Default |
| --- | --- | --- |
| uri | Defines the full connection string of the MongoDB database. | mongodb://127.0.0.1:27017/test |
| timeout | Overrides the default session timeout and should be parseable by time.ParseDuration. | 10s |
| tail | Whether the source connection will listen for updates after the initial sync (requires oplog access). | false |
| ssl | Configures the database connection to connect via TLS. | false |
| cacerts | Configures the RootCAs for the underlying TLS connection. | [] |
| wc | Configures the write concern option for the session. | 0 |
| fsync | Whether the server will wait for Fsync to complete before returning a response. | false |
| bulk | Whether the sink connection will use bulk inserts rather than writing one record at a time. | false |
| collection_filters | A JSON string where each top-level key is a collection name and its value is the query used when iterating that collection. The commented-out example above would only include documents from the foo collection whose i field has a value greater than 10. | {} |
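The shape of a collection_filters value can be illustrated with a short Go sketch. It uses only encoding/json and local stand-in names (collectionFilter, errCollectionFilter, parseCollectionFilters), which are assumptions for illustration; the adaptor's own parsing code is not shown on this page and may differ.

```go
package main

import (
	"encoding/json"
	"errors"
	"fmt"
)

// collectionFilter mirrors the documented CollectionFilter type: a query
// document applied to a single collection.
type collectionFilter map[string]interface{}

// errCollectionFilter is a local stand-in for the package's ErrCollectionFilter.
var errCollectionFilter = errors.New("malformed collection_filters")

// parseCollectionFilters decodes a collection_filters JSON string into a map
// keyed by collection name. An empty string yields an empty map.
// This is a hypothetical helper, not part of the adaptor's API.
func parseCollectionFilters(raw string) (map[string]collectionFilter, error) {
	filters := map[string]collectionFilter{}
	if raw == "" {
		return filters, nil
	}
	if err := json.Unmarshal([]byte(raw), &filters); err != nil {
		return nil, errCollectionFilter
	}
	return filters, nil
}

func main() {
	filters, err := parseCollectionFilters(`{"foo": {"i": {"$gt": 10}}}`)
	if err != nil {
		fmt.Println("error:", err)
		return
	}
	for name, query := range filters {
		fmt.Printf("collection %q uses query %v\n", name, query)
	}
}
```

Note that encoding/json decodes JSON numbers into float64 when the target is interface{}, so the 10 in the example arrives as a float64 in this sketch.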

Run adaptor test

Spin up required containers

You'll need the following ports free on your local machine: 10000, 10001, 27017, 27018, 11112, 29017, 15000, 20000

So make sure to kill anything that might use them (like a local mongo instance)

# From transporter's root folder
version=3.2.11
# Pay attention to a WARNING telling you to add a line to /etc/hosts in the following command
scripts/run_db_in_docker.sh mongodb $version

Run the tests

# From transporter's root folder
go test -v ./adaptor/mongodb/

Tear down containers

Once you're done

TESTDIR=adaptor/mongodb scripts/teardown_db_in_docker.sh

Documentation


Constants

const (
	// DefaultURI is the default endpoint of MongoDB on the local machine.
	// Primarily used when initializing a new Client without a specific URI.
	DefaultURI = "mongodb://127.0.0.1:27017/test"

	// DefaultSessionTimeout is the default timeout after which the
	// session times out when unable to connect to the provided URI.
	DefaultSessionTimeout = 10 * time.Second

	// DefaultReadPreference when connecting to a mongo replica set.
	DefaultReadPreference = mgo.Primary

	// DefaultMaxWriteBatchSize when using the bulk interface
	DefaultMaxWriteBatchSize = 1000
)

Variables

var (

	// DefaultCollectionFilter is an empty map of empty maps
	DefaultCollectionFilter = map[string]CollectionFilter{}
)
var (
	// DefaultSafety is the default safety mode used for the underlying session.
	// These default settings are only good for local use as they make no guarantees for writes.
	DefaultSafety = mgo.Safe{}
)
var (

	// ErrCollectionFilter is returned when an error occurs attempting to Unmarshal the string.
	ErrCollectionFilter = errors.New("malformed collection_filters")
)

Functions

This section is empty.

Types

type Bulk

type Bulk struct {
	*sync.RWMutex
	// contains filtered or unexported fields
}

Bulk implements client.Writer for use with MongoDB and takes advantage of the Bulk API for performance improvements.

func (*Bulk) Write

func (b *Bulk) Write(msg message.Msg) func(client.Session) (message.Msg, error)
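The general idea behind batching writes up to a maximum batch size can be sketched as follows. This is not the package's Bulk implementation, whose internals are unexported and not shown here; batcher, add, close, and the flush callback are hypothetical names, and the limit simply reuses the documented DefaultMaxWriteBatchSize value of 1000.

```go
package main

import (
	"fmt"
	"sync"
)

// maxWriteBatchSize mirrors the documented DefaultMaxWriteBatchSize.
const maxWriteBatchSize = 1000

// batcher is a hypothetical buffer that collects documents and hands them
// to flush in groups. It is an illustration, not the package's Bulk type.
type batcher struct {
	mu      sync.Mutex
	pending []string
	flush   func(batch []string)
}

// add queues one document and flushes once the batch is full.
func (b *batcher) add(doc string) {
	b.mu.Lock()
	defer b.mu.Unlock()
	b.pending = append(b.pending, doc)
	if len(b.pending) >= maxWriteBatchSize {
		b.flushLocked()
	}
}

// flushLocked passes the pending batch to flush. Callers must hold mu.
func (b *batcher) flushLocked() {
	if len(b.pending) == 0 {
		return
	}
	b.flush(b.pending)
	b.pending = nil
}

// close flushes whatever is still pending.
func (b *batcher) close() {
	b.mu.Lock()
	defer b.mu.Unlock()
	b.flushLocked()
}

func main() {
	b := &batcher{flush: func(batch []string) {
		fmt.Println("flushing", len(batch), "documents")
	}}
	for i := 0; i < 2500; i++ {
		b.add(fmt.Sprintf("doc-%d", i))
	}
	b.close()
}
```

With 2500 documents, the sketch flushes two full batches of 1000 and a final batch of 500 on close. A real bulk writer would typically also flush on a timer so that a slow trickle of documents is not held indefinitely.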

type Client

type Client struct {
	// contains filtered or unexported fields
}

Client represents a client to the underlying MongoDB source.

func NewClient

func NewClient(options ...ClientOptionFunc) (*Client, error)

NewClient creates a new client to work with MongoDB.

The caller can configure the new client by passing configuration options to the func.

Example:

client, err := NewClient(
  WithURI("mongodb://localhost:27017"),
  WithTimeout("30s"))

If no URI is configured, it uses DefaultURI.

An error is also returned when any configuration option is invalid.

func (Client) Close

func (c Client) Close()

Close satisfies the Closer interface and handles closing the initial mgo.Session.

func (*Client) Connect

func (c *Client) Connect() (client.Session, error)

Connect tests the MongoDB connection and initializes the mongo session.

type ClientOptionFunc

type ClientOptionFunc func(*Client) error

ClientOptionFunc is a function that configures a Client. It is used in NewClient.

func WithCACerts

func WithCACerts(certs []string) ClientOptionFunc

WithCACerts configures the RootCAs for the underlying TLS connection.

func WithFsync

func WithFsync(fsync bool) ClientOptionFunc

WithFsync configures whether the server will wait for Fsync to complete before returning a response (Default: false).

func WithReadPreference added in v0.4.0

func WithReadPreference(readPreference string) ClientOptionFunc

WithReadPreference sets the MongoDB read preference based on the provided string.
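Mapping a string to a read preference might look like the sketch below. The five mode names correspond to the read preference modes mgo defines (Primary, PrimaryPreferred, Secondary, SecondaryPreferred, Nearest), but the accepted spellings, the case-insensitive matching, and the error message are assumptions; mode, invalidReadPreferenceError, and parseReadPreference are local stand-ins so the example needs no third-party import.

```go
package main

import (
	"fmt"
	"strings"
)

// mode is a local stand-in for mgo.Mode.
type mode int

const (
	primary mode = iota
	primaryPreferred
	secondary
	secondaryPreferred
	nearest
)

// invalidReadPreferenceError mirrors the documented error type's single field.
type invalidReadPreferenceError struct {
	ReadPreference string
}

func (e invalidReadPreferenceError) Error() string {
	return fmt.Sprintf("invalid read preference: %q", e.ReadPreference)
}

// parseReadPreference maps a string to a mode, ignoring case.
// Unknown strings produce an invalidReadPreferenceError.
func parseReadPreference(s string) (mode, error) {
	switch strings.ToLower(s) {
	case "primary":
		return primary, nil
	case "primarypreferred":
		return primaryPreferred, nil
	case "secondary":
		return secondary, nil
	case "secondarypreferred":
		return secondaryPreferred, nil
	case "nearest":
		return nearest, nil
	default:
		return primary, invalidReadPreferenceError{ReadPreference: s}
	}
}

func main() {
	for _, s := range []string{"Secondary", "nearest", "fastest"} {
		m, err := parseReadPreference(s)
		if err != nil {
			fmt.Println("error:", err)
			continue
		}
		fmt.Printf("%q -> mode %d\n", s, m)
	}
}
```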

func WithSSL

func WithSSL(ssl bool) ClientOptionFunc

WithSSL configures the database connection to connect via TLS.

func WithTail

func WithTail(tail bool) ClientOptionFunc

WithTail sets the flag that tells the Client whether access to the oplog will be needed (Default: false).

func WithTimeout

func WithTimeout(timeout string) ClientOptionFunc

WithTimeout overrides the DefaultSessionTimeout and should be parseable by time.ParseDuration.

func WithURI

func WithURI(uri string) ClientOptionFunc

WithURI defines the full connection string of the MongoDB database.

func WithWriteConcern

func WithWriteConcern(wc int) ClientOptionFunc

WithWriteConcern configures the write concern option for the session (Default: 0).

type CollectionFilter added in v0.3.0

type CollectionFilter map[string]interface{}

CollectionFilter is a typed map[string]interface{} that holds the query applied to a single collection.

type InvalidReadPreferenceError added in v0.4.0

type InvalidReadPreferenceError struct {
	ReadPreference string
}

InvalidReadPreferenceError represents the error when an incorrect mongo read preference has been set.

func (InvalidReadPreferenceError) Error added in v0.4.0

type OplogAccessError

type OplogAccessError struct {
	// contains filtered or unexported fields
}

OplogAccessError wraps the underlying error when access to the oplog fails.

func (OplogAccessError) Error

func (e OplogAccessError) Error() string

type Reader

type Reader struct {
	// contains filtered or unexported fields
}

Reader implements the behavior defined by client.Reader for interfacing with MongoDB.

func (*Reader) Read

func (r *Reader) Read(resumeMap map[string]client.MessageSet, filterFn client.NsFilterFunc) client.MessageChanFunc

type Session

type Session struct {
	// contains filtered or unexported fields
}

Session serves as a wrapper for the underlying mgo.Session

func (*Session) Close

func (s *Session) Close()

Close implements necessary calls to cleanup the underlying mgo.Session

type Writer

type Writer struct {
	// contains filtered or unexported fields
}

Writer implements client.Writer for use with MongoDB

func (*Writer) Write

func (w *Writer) Write(msg message.Msg) func(client.Session) (message.Msg, error)
