gtm

package module
v1.0.0 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jul 9, 2019 License: MIT Imports: 11 Imported by: 34

README

gtm

gtm (go tail mongo) is a utility written in Go which tails the MongoDB oplog and sends create, update, delete events to your code. It can be used to send emails to new users, index documents, write time series data, or something else.

Other branches

If you are interested in trying the development version of gtm, which targets the official Go driver from MongoDB, then check out the leafny branch.

Requirements
Installation
go get github.com/rwynn/gtm
Setup

gtm uses the MongoDB oplog as an event source. You will need to ensure that MongoDB is configured to produce an oplog by deploying a replica set.

If you haven't already done so, follow the 5 step procedure to initiate and validate your replica set. For local testing your replica set may contain a single member.

Usage
package main

import "github.com/globalsign/mgo"
import "github.com/globalsign/mgo/bson"
import "github.com/rwynn/gtm"
import "fmt"

// main connects to a local MongoDB, starts gtm with its default options and
// then prints every error and every operation it receives, forever.
func main() {
	// open the mgo session that gtm will tail
	session, err := mgo.Dial("localhost")
	if err != nil {
		panic(err)
	}
	defer session.Close()
	session.SetMode(mgo.Monotonic, true)

	// Passing nil options makes gtm use gtm.DefaultOptions().
	// The returned context exposes:
	//   ctx.OpC    - a channel to read ops from
	//   ctx.ErrC   - a channel to read errors from
	//   ctx.Stop() - stops all go routines started by gtm.Start
	ctx := gtm.Start(session, nil)

	// receive events until the process exits
	for {
		select {
		case op := <-ctx.OpC:
			// op is an insert, delete, update, or drop applied to mongo;
			// tell them apart with op.IsInsert(), op.IsDelete(),
			// op.IsUpdate(), or op.IsDrop()
			//
			// op.Data holds the full document for inserts and updates
			//
			// op.UpdateDescription is a map describing the set of changes,
			// as shown in https://docs.mongodb.com/manual/reference/change-events/#update-event
			// It is only populated when docs are updated via $set, $push,
			// $unset, etc. Replacing the entire document with an update
			// command, e.g. db.update(doc, {total: "replace"}), leaves
			// op.UpdateDescription empty.
			summary := fmt.Sprintf(`Got op <%v> for object <%v> 
			in database <%v> 
			and collection <%v> 
			and data <%v> 
			and change <%v> 
			and timestamp <%v>`,
				op.Operation,
				op.Id,
				op.GetDatabase(),
				op.GetCollection(),
				op.Data,
				op.UpdateDescription,
				op.Timestamp)
			fmt.Println(summary) // or do something more interesting
		case tailErr := <-ctx.ErrC:
			// handle errors
			fmt.Println(tailErr)
		}
	}
}
Configuration
// PipeBuilder returns the aggregation pipeline to apply to events for the
// given namespace, or a nil pipeline when the namespace needs none.
//
// When building pipelines for change events, consult the MongoDB reference at
// https://docs.mongodb.com/manual/reference/change-events/
//
// changeStream is only true when gtm is configured with ChangeStreamNs
// (requires MongoDB 3.6+). Pipelines for changes cannot be built when using
// legacy direct oplog tailing.
func PipeBuilder(namespace string, changeStream bool) ([]interface{}, error) {
	switch {
	case namespace == "users.users":
		// given a set of docs like {username: "joe", email: "joe@email.com", amount: 1}
		// change stream pipelines match on the fullDocument sub-document,
		// all other pipelines match on the document itself
		usernameField := "username"
		if changeStream {
			usernameField = "fullDocument.username"
		}
		matchJoe := bson.M{"$match": bson.M{usernameField: "joe"}}
		return []interface{}{matchJoe}, nil

	case namespace == "users.status" && changeStream:
		// Only receive events when a document is inserted, deleted, or a
		// specific field is changed. Here only a change to field1 is
		// processed; changes to other fields do not match the pipeline
		// query and so no event is delivered for them.
		noUpdateDescription := bson.M{"updateDescription": bson.M{"$exists": false}}
		field1Changed := bson.M{"updateDescription.updatedFields.field1": bson.M{"$exists": true}}
		matchStage := bson.M{"$match": bson.M{"$or": []interface{}{
			noUpdateDescription,
			field1Changed,
		}}}
		return []interface{}{matchStage}, nil
	}
	return nil, nil
}

// NewUsers is an op filter that accepts only insert operations on the
// users.users namespace.
func NewUsers(op *gtm.Op) bool {
	if op.Namespace != "users.users" {
		return false
	}
	return op.IsInsert()
}

// To listen only for certain events on certain collections, pass a filter
// function in the options. Here NewUsers is installed as the NamespaceFilter.
ctx := gtm.Start(session, &gtm.Options{
	NamespaceFilter: NewUsers, // only receive inserts in the users.users namespace
})
// more options are available for tuning
ctx := gtm.Start(session, &gtm.Options{
	NamespaceFilter:     nil,                  // op filter function that has access to type/ns ONLY
	Filter:              nil,                  // op filter function that has access to type/ns/data
	After:               nil,                  // if nil defaults to gtm.LastOpTimestamp; not yet supported for ChangeStreamNs
	OpLogDisabled:       false,                // true to disable tailing the MongoDB oplog
	OpLogDatabaseName:   nil,                  // defaults to "local"
	OpLogCollectionName: nil,                  // defaults to "oplog.rs"
	ChannelSize:         0,                    // defaults to 20
	BufferSize:          25,                   // defaults to 50. used to batch fetch documents on bursts of activity
	BufferDuration:      0,                    // defaults to 750 ms. after this timeout the batch is force fetched
	WorkerCount:         8,                    // defaults to 1. number of go routines batch fetching concurrently
	Ordering:            gtm.Document,         // defaults to gtm.Oplog. ordering guarantee of events on the output channel as compared to the oplog
	UpdateDataAsDelta:   false,                // set to true to only receive delta information in the Data field on updates (info straight from oplog)
	DirectReadNs:        []string{"db.users"}, // set to a slice of namespaces to read data directly from bypassing the oplog
	DirectReadSplitMax:  9,                    // the max number of times to split a collection for concurrent reads (impacts memory consumption)
	Pipe:                PipeBuilder,          // an optional function to build aggregation pipelines
	PipeAllowDisk:       false,                // true to allow MongoDB to use disk for aggregation pipeline options with large result sets
	SplitVector:         false,                // whether or not to use internal MongoDB command split vector to split collections
	Log:                 myLogger,             // pass your own logger
	ChangeStreamNs:      []string{"db.col1", "db.col2"}, // MongoDB 3.6+ only; set to a slice of namespaces to read via MongoDB change streams
})
Direct Reads

If, in addition to tailing the oplog, you would like to also read entire collections you can set the DirectReadNs field to a slice of MongoDB namespaces. Documents from these collections will be read directly and output on the ctx.OpC channel.

You can wait till all the collections have been fully read by using the DirectReadWg wait group on the ctx.

// wait in the background so the main loop can keep consuming ctx.OpC
go func() {
	// blocks until the DirectReadWg wait group on the context is done
	ctx.DirectReadWg.Wait()
	fmt.Println("direct reads are done")
}()
Pause, Resume, Since, and Stop

You can pause, resume, or seek to a timestamp from the oplog. These methods affect only change events and not direct reads.

go func() {
	// stop delivering change events for two minutes
	ctx.Pause()
	time.Sleep(2 * time.Minute)
	// start delivering again, then seek to a previously recorded timestamp
	ctx.Resume()
	ctx.Since(previousTimestamp)
}()

You can stop all goroutines created by Start or StartMulti. You cannot resume a context once it has been stopped. You would need to create a new one.

go func() {
	// a stopped context cannot be resumed; create a new one instead
	ctx.Stop()
	fmt.Println("all go routines are stopped")
}()
Sharded Clusters

If you use ChangeStreamNs on MongoDB 3.6+ you can ignore this section. Native change streams in MongoDB are shard aware. You should connect to the mongos routing server and change streams will work across shards. This section is for those pre-3.6 that would like to read changes across all shards.

gtm has support for sharded MongoDB clusters. You will want to start with a connection to the MongoDB config server to get the list of available shards.

// assuming the CONFIG server for a sharded cluster is running locally on port 27018
configSession, err := mgo.Dial("127.0.0.1:27018")
if err != nil {
	panic(err)
}
// get the list of shard servers
shardInfos := gtm.GetShards(configSession)

for each shard you will create a session and append it to a slice of sessions

// one session per shard, in the order the shards were listed
var shardSessions []*mgo.Session
for _, shardInfo := range shardInfos {
	log.Printf("Adding shard found at %s\n", shardInfo.GetURL())
	// dial the shard and add its session to the sync list
	shardSession, err := mgo.Dial(shardInfo.GetURL())
	if err != nil {
		panic(err)
	}
	shardSessions = append(shardSessions, shardSession)
}

finally you will want to start a multi context. The multi context behaves just like a single context except that it tails multiple shard servers and coalesces the events to a single output channel

multiCtx := gtm.StartMulti(shardSessions, nil)

after you have created the multi context for all the shards you can handle new shards being added to the cluster at some later time by adding a listener. You will want to add this listener before you enter a loop to read events from the multi context.

// insertHandler logs the shard it is given and dials it, returning the
// resulting session (or the dial error).
insertHandler := func(shardInfo *gtm.ShardInfo) (*mgo.Session, error) {
	log.Printf("Adding shard found at %s\n", shardInfo.GetURL())
	return mgo.Dial(shardInfo.GetURL())
}

multiCtx.AddShardListener(configSession, nil, insertHandler)
Custom Unmarshalling

If you'd like to unmarshal MongoDB documents into your own struct instead of having each document unmarshalled into a generic map[string]interface{}, you can use a custom unmarshal function:

// MyDoc is the custom struct that the custom unmarshal function decodes
// documents from the test.test namespace into.
type MyDoc struct {
	// presumably the bare string tag names the document key ("_id") — confirm against the bson package
	Id interface{} "_id"
	// presumably mapped from the document's "foo" key — confirm against the bson package
	Foo string "foo"
}

// custom unmarshals raw documents from the test.test namespace into a MyDoc.
// Any other namespace is rejected with an "unsupported namespace" error, and
// an unmarshal failure is returned to the caller unchanged.
func custom(namespace string, raw *bson.Raw) (interface{}, error) {
	// the namespace, e.g. db.col, decides which struct to decode into
	if namespace != "test.test" {
		return nil, errors.New("unsupported namespace")
	}
	var doc MyDoc
	if err := raw.Unmarshal(&doc); err != nil {
		return nil, err
	}
	return doc, nil
}

// install the custom unmarshal function so ops carry the decoded value
ctx := gtm.Start(session, &gtm.Options{
	Unmarshal: custom,
})

for {
	// block until the next op arrives
	op := <-ctx.OpC
	if op.Namespace != "test.test" {
		continue
	}
	// op.Doc holds the value returned by the custom unmarshal function
	doc := op.Doc.(MyDoc)
	fmt.Println(doc.Foo)
}
Workers

You may want to distribute event handling between a set of worker processes on different machines. To do this you can leverage the github.com/rwynn/gtm/consistent package.

Create a TOML document containing a list of all the event handlers.

Workers = [ "Tom", "Dick", "Harry" ] 

Create a consistent filter to distribute the work between Tom, Dick, and Harry. A consistent filter needs to access the Data attribute of each op, so it needs to be set as a Filter as opposed to a NamespaceFilter.

// the name of this worker is supplied on the command line via the -name flag
name := flag.String("name", "", "the name of this worker")
flag.Parse()
// build the consistent hash filter for this worker from the TOML config file
filter, filterErr := consistent.ConsistentHashFilterFromFile(*name, "/path/to/toml")
if filterErr != nil {
	panic(filterErr)
}

// there is also a method consistent.ConsistentHashFilterFromDocument which allows
// you to pass a Mongo document representing the config if you would like to avoid
// copying the same config file to multiple servers

Pass the filter into the options when calling gtm.Start:

ctx := gtm.Start(session, &gtm.Options{Filter: filter})

If you have multiple filters, you can combine them with the gtm utility function ChainOpFilters:

func ChainOpFilters(filters ...OpFilter) OpFilter
Optimizing Direct Read Throughput with SplitVector enabled

If you enable SplitVector, to get the best throughput possible on direct reads you will want to consider the indexes on your collections. In the best case scenario, for very large collections, you will have an index on a field with a moderately low cardinality. For example, if you have 10 million documents in your collection and have a field named category that will have a value between 1 and 20, and you have an index on this field, then gtm will be able to perform an internal MongoDB admin command named splitVector on this key. The results of the split vector command will return a sorted list of category split points. Once gtm has the split points it is able to start splits+1 go routines with range queries to consume the entire collection concurrently. You will notice a line in the log like this if this is working.

INFO 2018/04/24 18:23:23 Found 16 splits (17 segments) for namespace test.test using index on category

When this is working you will notice the connection count increase substantially in mongostat. On the other hand, if you do not have an index which yields a high number of splits, gtm will force a split and it will only be able to start 2 go routines to read your collection concurrently.

The user that gtm connects with will need to have admin access to perform the splitVector command. If the user does not have this access then gtm will use a paginating range read of each collection.

Gtm previously supported the parallelCollectionScan command to get multiple read cursors on a collection. However, this command only worked on the mmapv1 storage engine and will be removed completely once the mmapv1 engine is retired. It looks like splitVector or something like it will be promoted in new versions of MongoDB.

Documentation

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

func ConsumeChangeStream

func ConsumeChangeStream(ctx *OpCtx, session *mgo.Session, ns string, options *Options) (err error)

func DirectReadPaged

func DirectReadPaged(ctx *OpCtx, session *mgo.Session, ns string, options *Options) (err error)

func DirectReadSegment

func DirectReadSegment(ctx *OpCtx, session *mgo.Session, ns string, options *Options, seg *CollectionSegment, stats *CollectionStats) (err error)

func DirectReadSplitVector

func DirectReadSplitVector(ctx *OpCtx, session *mgo.Session, ns string, options *Options) (err error)

func FetchDocuments

func FetchDocuments(ctx *OpCtx, session *mgo.Session, filter OpFilter, buf *OpBuf, inOp OpChan, options *Options) error

func FirstOpTimestamp

func FirstOpTimestamp(session *mgo.Session, options *Options) bson.MongoTimestamp

func GetOpLogQuery

func GetOpLogQuery(session *mgo.Session, after bson.MongoTimestamp, options *Options) *mgo.Query

func LastOpTimestamp

func LastOpTimestamp(session *mgo.Session, options *Options) bson.MongoTimestamp

func OpLogCollection

func OpLogCollection(session *mgo.Session, options *Options) *mgo.Collection

func ParseTimestamp

func ParseTimestamp(timestamp bson.MongoTimestamp) (int32, int32)

func TailOps

func TailOps(ctx *OpCtx, session *mgo.Session, channels []OpChan, options *Options) error

func UpdateIsReplace

func UpdateIsReplace(entry map[string]interface{}) bool

Types

type BuildInfo

type BuildInfo struct {
	// contains filtered or unexported fields
}

func VersionInfo

func VersionInfo(session *mgo.Session) (buildInfo *BuildInfo, err error)

type ChangeDoc

type ChangeDoc struct {
	DocKey            map[string]interface{} "documentKey"
	Id                interface{}            "_id"
	Operation         string                 "operationType"
	FullDoc           *bson.Raw              "fullDocument"
	Namespace         map[string]string      "ns"
	Timestamp         bson.MongoTimestamp    "clusterTime"
	UpdateDescription *bson.Raw              "updateDescription"
}

type CollectionSegment

type CollectionSegment struct {
	// contains filtered or unexported fields
}

type CollectionStats

type CollectionStats struct {
	Count         int64 "count"
	AvgObjectSize int64 "avgObjSize"
}

func GetCollectionStats

func GetCollectionStats(ctx *OpCtx, session *mgo.Session, ns string) (stats *CollectionStats, err error)

type DataUnmarshaller

type DataUnmarshaller func(namespace string, raw *bson.Raw) (interface{}, error)

type Doc

type Doc struct {
	Id interface{} "_id"
}

type N

type N struct {
	// contains filtered or unexported fields
}

type Op

// Op is a single event delivered on a context's OpC channel.
type Op struct {
	// Id identifies the object the operation applies to.
	Id interface{} `json:"_id"`
	// Operation is the kind of operation; see IsInsert, IsUpdate, IsDelete,
	// IsDrop and IsCommand.
	Operation string `json:"operation"`
	// Namespace is the database and collection, e.g. "db.col".
	Namespace string `json:"namespace"`
	// Data is the full document for inserts and updates.
	Data map[string]interface{} `json:"data,omitempty"`
	// Timestamp is the MongoDB timestamp of the operation.
	Timestamp bson.MongoTimestamp `json:"timestamp"`
	// Source tells whether the op came from the oplog or a direct read;
	// see IsSourceOplog and IsSourceDirect.
	Source QuerySource `json:"source"`
	// Doc is the value produced by a custom Unmarshal function, if one is
	// configured in Options.
	Doc interface{} `json:"doc,omitempty"`
	// UpdateDescription describes the set of changes for updates made via
	// $set, $push, $unset, etc. It is not populated when the entire document
	// is replaced.
	UpdateDescription map[string]interface{} `json:"updateDescription,omitempty"`
}

func (*Op) GetCollection

func (this *Op) GetCollection() string

func (*Op) GetDatabase

func (this *Op) GetDatabase() string

func (*Op) IsCommand

func (this *Op) IsCommand() bool

func (*Op) IsDelete

func (this *Op) IsDelete() bool

func (*Op) IsDrop

func (this *Op) IsDrop() bool

func (*Op) IsDropCollection

func (this *Op) IsDropCollection() (string, bool)

func (*Op) IsDropDatabase

func (this *Op) IsDropDatabase() (string, bool)

func (*Op) IsInsert

func (this *Op) IsInsert() bool

func (*Op) IsSourceDirect

func (this *Op) IsSourceDirect() bool

func (*Op) IsSourceOplog

func (this *Op) IsSourceOplog() bool

func (*Op) IsUpdate

func (this *Op) IsUpdate() bool

func (*Op) ParseLogEntry

func (this *Op) ParseLogEntry(entry *OpLog, options *Options) (include bool, err error)

func (*Op) ParseNamespace

func (this *Op) ParseNamespace() []string

type OpBuf

type OpBuf struct {
	Entries        []*Op
	BufferSize     int
	BufferDuration time.Duration
}

func (*OpBuf) Append

func (this *OpBuf) Append(op *Op)

func (*OpBuf) Flush

func (this *OpBuf) Flush(s *mgo.Session, ctx *OpCtx, options *Options)

func (*OpBuf) HasOne

func (this *OpBuf) HasOne() bool

func (*OpBuf) IsFull

func (this *OpBuf) IsFull() bool

type OpChan

type OpChan chan *Op

func Tail

func Tail(session *mgo.Session, options *Options) (OpChan, chan error)

type OpCtx

type OpCtx struct {
	OpC          OpChan
	ErrC         chan error
	DirectReadWg *sync.WaitGroup
	// contains filtered or unexported fields
}

func Start

func Start(session *mgo.Session, options *Options) *OpCtx

func (*OpCtx) Pause

func (ctx *OpCtx) Pause()

func (*OpCtx) Resume

func (ctx *OpCtx) Resume()

func (*OpCtx) Since

func (ctx *OpCtx) Since(ts bson.MongoTimestamp)

func (*OpCtx) Stop

func (ctx *OpCtx) Stop()

type OpCtxMulti

type OpCtxMulti struct {
	OpC          OpChan
	ErrC         chan error
	DirectReadWg *sync.WaitGroup
	// contains filtered or unexported fields
}

func StartMulti

func StartMulti(sessions []*mgo.Session, options *Options) *OpCtxMulti

func (*OpCtxMulti) AddShardListener

func (ctx *OpCtxMulti) AddShardListener(
	configSession *mgo.Session, shardOptions *Options, handler ShardInsertHandler)

func (*OpCtxMulti) Pause

func (ctx *OpCtxMulti) Pause()

func (*OpCtxMulti) Resume

func (ctx *OpCtxMulti) Resume()

func (*OpCtxMulti) Since

func (ctx *OpCtxMulti) Since(ts bson.MongoTimestamp)

func (*OpCtxMulti) Stop

func (ctx *OpCtxMulti) Stop()

type OpFilter

type OpFilter func(*Op) bool

func ChainOpFilters

func ChainOpFilters(filters ...OpFilter) OpFilter

func OpFilterForOrdering

func OpFilterForOrdering(ordering OrderingGuarantee, workers []string, worker string) OpFilter

type OpLog

type OpLog struct {
	Timestamp    bson.MongoTimestamp "ts"
	HistoryID    int64               "h"
	MongoVersion int                 "v"
	Operation    string              "op"
	Namespace    string              "ns"
	Doc          *bson.Raw           "o"
	Update       *bson.Raw           "o2"
}

type OpLogEntry

type OpLogEntry map[string]interface{}

type Options

// Options configures a context created by Start or StartMulti. Passing nil
// options to Start results in DefaultOptions() being used.
type Options struct {
	// After defaults to LastOpTimestamp when nil; not yet supported for
	// ChangeStreamNs.
	After               TimestampGenerator
	// Filter is an op filter function that has access to type/ns/data.
	Filter              OpFilter
	// NamespaceFilter is an op filter function that has access to type/ns ONLY.
	NamespaceFilter     OpFilter
	// OpLogDisabled is set to true to disable tailing the MongoDB oplog.
	OpLogDisabled       bool
	// OpLogDatabaseName defaults to "local".
	OpLogDatabaseName   *string
	// OpLogCollectionName defaults to "oplog.rs".
	OpLogCollectionName *string
	CursorTimeout       *string // deprecated
	// ChannelSize defaults to 20.
	ChannelSize         int
	// BufferSize defaults to 50; used to batch fetch documents on bursts of
	// activity.
	BufferSize          int
	// BufferDuration defaults to 750 ms; after this timeout the batch is
	// force fetched.
	BufferDuration      time.Duration
	// Ordering defaults to Oplog; the ordering guarantee of events on the
	// output channel as compared to the oplog.
	Ordering            OrderingGuarantee
	// WorkerCount defaults to 1; the number of go routines batch fetching
	// concurrently.
	WorkerCount         int
	MaxWaitSecs         int
	// UpdateDataAsDelta is set to true to only receive delta information in
	// the Data field on updates (info straight from the oplog).
	UpdateDataAsDelta   bool
	// ChangeStreamNs is a slice of namespaces to read via MongoDB change
	// streams (MongoDB 3.6+ only).
	ChangeStreamNs      []string
	// DirectReadNs is a slice of namespaces to read data directly from,
	// bypassing the oplog.
	DirectReadNs        []string
	DirectReadFilter    OpFilter
	// DirectReadSplitMax is the max number of times to split a collection for
	// concurrent reads (impacts memory consumption).
	DirectReadSplitMax  int
	// Unmarshal is an optional custom function for unmarshalling documents
	// into your own types.
	Unmarshal           DataUnmarshaller
	// Pipe is an optional function to build aggregation pipelines.
	Pipe                PipelineBuilder
	// PipeAllowDisk is set to true to allow MongoDB to use disk for
	// aggregation pipelines with large result sets.
	PipeAllowDisk       bool
	// SplitVector controls whether the internal MongoDB splitVector command is
	// used to split collections for direct reads.
	SplitVector         bool
	// Log is an optional logger to use in place of the default.
	Log                 *log.Logger
}

func DefaultOptions

func DefaultOptions() *Options

func (*Options) SetDefaults

func (this *Options) SetDefaults()

type OrderingGuarantee

type OrderingGuarantee int
const (
	Oplog     OrderingGuarantee = iota // ops sent in oplog order (strong ordering)
	Namespace                          // ops sent in oplog order within a namespace
	Document                           // ops sent in oplog order for a single document
	AnyOrder                           // ops sent as they become available
)

type PipelineBuilder

type PipelineBuilder func(namespace string, changeStream bool) ([]interface{}, error)

type QuerySource

type QuerySource int
const (
	OplogQuerySource QuerySource = iota
	DirectQuerySource
)

type ShardInfo

type ShardInfo struct {
	// contains filtered or unexported fields
}

func GetShards

func GetShards(session *mgo.Session) (shardInfos []*ShardInfo)

func (*ShardInfo) GetURL

func (shard *ShardInfo) GetURL() string

type ShardInsertHandler

type ShardInsertHandler func(*ShardInfo) (*mgo.Session, error)

type SplitVectorRequest

type SplitVectorRequest struct {
	SplitVector    string      "splitVector"
	KeyPattern     bson.M      "keyPattern"
	Min            interface{} "min"
	Max            interface{} "max"
	MaxChunkSize   int         "maxChunkSize"
	MaxSplitPoints int         "maxSplitPoints"
	Force          bool        "force"
}

type SplitVectorResult

type SplitVectorResult struct {
	SplitKeys []bson.M "splitKeys"
	Ok        int      "ok"
}

type TimestampGenerator

type TimestampGenerator func(*mgo.Session, *Options) bson.MongoTimestamp

Directories

Path Synopsis

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL