mongowatch

package module
v0.0.0-...-0b358fe Latest
Warning

This package is not in the latest version of its module.

Published: Nov 28, 2023 License: GPL-3.0 Imports: 4 Imported by: 0

README

mongowatch

MongoDB Event Stream Watcher

Watches a target MongoDB collection via the MongoDB event log and executes registered handlers in response to subscribed collection changes.

Self healing

If the collection is renamed, dropped, or recreated, the event stream produces an 'invalidate' event, from which the watcher recovers automatically.

However, make sure to reapply the collMod command options to the collection (if necessary).

This package contains helper methods for this (make sure the Mongo user has the required permissions):

For Mongo >= 6.0

collMod changeStreamPreAndPostImages

db.EnablePrePostImages(mongoInstance *mongo.Database, colName string) error

For Mongo < 6.0

collMod recordPreImages

db.RecordPreImages(mongoInstance *mongo.Database, colName string) error
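
As a rough illustration of the two collMod variants named above, the sketch below builds the command document for each server generation and prints it in mongosh notation. It does not call this package or a MongoDB server; `field`, `collModCommand`, `render`, and the collection name `orders` are hypothetical names used only for this example, and the exact command the helpers send is an assumption based on the option names listed above.

```go
package main

import (
	"fmt"
	"strings"
)

// field is one ordered key/value pair of a command document.
// A real program would use bson.D from the MongoDB Go driver instead.
type field struct {
	Key   string
	Value string // already rendered as a shell literal, for display only
}

// collModCommand returns the collMod fields for the given server major
// version. The command name has to come first, hence the ordered slice.
func collModCommand(colName string, serverMajor int) []field {
	cmd := []field{{"collMod", fmt.Sprintf("%q", colName)}}
	if serverMajor >= 6 {
		// Mongo >= 6.0: pre- and post-images for change streams.
		return append(cmd, field{"changeStreamPreAndPostImages", "{enabled: true}"})
	}
	// Mongo < 6.0: pre-images only.
	return append(cmd, field{"recordPreImages", "true"})
}

// render formats the command the way it would be typed in mongosh.
func render(cmd []field) string {
	parts := make([]string, 0, len(cmd))
	for _, f := range cmd {
		parts = append(parts, f.Key+": "+f.Value)
	}
	return "{" + strings.Join(parts, ", ") + "}"
}

func main() {
	fmt.Println(render(collModCommand("orders", 6)))
	// {collMod: "orders", changeStreamPreAndPostImages: {enabled: true}}
	fmt.Println(render(collModCommand("orders", 5)))
	// {collMod: "orders", recordPreImages: true}
}
```

After an 'invalidate' recovery, the matching helper would be called again for the recreated collection so that pre/post images keep being recorded.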

Package testing

To be able to run tests in this repo you will need to have some local and remote mongo instances running on port 27017. Configure parts with TODO comments.

Courtesy of @ignasbernotas and @zolia

Documentation

Constants

const OperationTypeInvalidate = "invalidate"

Variables

This section is empty.

Functions

This section is empty.

Types

type ChangeEventDispatcherFunc

type ChangeEventDispatcherFunc func(ctx context.Context, ce ChangeStreamEvent, err error) error

ChangeEventDispatcherFunc is a change event callback function. Returning a non-nil error stops further ChangeEventDispatcherFunc processing and the change stream watcher.
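
The stop-on-error behaviour described here can be sketched with a small chain runner. This is a minimal illustration, not the package's implementation: `ChangeStreamEvent` is a trimmed stand-in, `dispatchAll` and `runDemo` are hypothetical helpers, and passing `nil` as the `err` argument is an assumption about how the watcher invokes its callbacks.

```go
package main

import (
	"context"
	"errors"
	"fmt"
)

// ChangeStreamEvent is a trimmed stand-in for the package type.
type ChangeStreamEvent struct {
	OperationType string
	Collection    string
}

// ChangeEventDispatcherFunc mirrors the documented callback signature.
type ChangeEventDispatcherFunc func(ctx context.Context, ce ChangeStreamEvent, err error) error

// dispatchAll (hypothetical) calls each dispatcher in order and stops at the
// first one that returns an error, so later dispatchers never see the event.
func dispatchAll(ctx context.Context, ce ChangeStreamEvent, funcs ...ChangeEventDispatcherFunc) error {
	for i, fn := range funcs {
		if err := fn(ctx, ce, nil); err != nil {
			return fmt.Errorf("dispatcher %d: %w", i, err)
		}
	}
	return nil
}

// runDemo chains three dispatchers where the second one fails and reports
// how many of them completed successfully.
func runDemo() (succeeded int, err error) {
	ok := func(ctx context.Context, ce ChangeStreamEvent, err error) error {
		succeeded++
		return nil
	}
	fail := func(ctx context.Context, ce ChangeStreamEvent, err error) error {
		return errors.New("handler failed")
	}
	ce := ChangeStreamEvent{OperationType: "insert", Collection: "orders"}
	err = dispatchAll(context.Background(), ce, ok, fail, ok)
	return succeeded, err
}

func main() {
	n, err := runDemo()
	fmt.Println(n, err) // 1 dispatcher 1: handler failed
}
```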

type ChangeStreamEvent

type ChangeStreamEvent struct {
	ID            ResumeToken         `bson:"_id" json:"_id"`
	User          string              `bson:"user" json:"user"`
	Timestamp     primitive.Timestamp `bson:"timestamp" json:"timestamp"`
	OperationType string              `bson:"operationType" json:"operationType"`
	Database      string              `bson:"database" json:"database"`
	Collection    string              `bson:"collection" json:"collection"`
	// DocumentKey is the unique identifier for the document that was changed
	// (e.g. the _id field for a document)
	// some of our collections use custom IDs therefore it doesn't fit into the primitive.ObjectID type
	DocumentKey              string      `bson:"documentKey" json:"documentKey"`
	FullDocument             primitive.M `bson:"fullDocument" json:"fullDocument"`
	FullDocumentBeforeChange primitive.M `bson:"fullDocumentBeforeChange" json:"fullDocumentBeforeChange"`
	// TODO: get previous field values e.g. paidUntil
	UpdateDescription struct {
		UpdatedFields map[string]interface{} `bson:"updatedFields" json:"updatedFields"`
		RemovedFields interface{}            `bson:"removedFields" json:"removedFields"`
	} `bson:"updateDescription" json:"updateDescription"`
}

ChangeStreamEvent is the customized representation of a MongoDB change stream event that is captured and processed by this application.
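
Because the struct carries json tags, an event can be decoded from JSON with the standard library. The sketch below uses a trimmed stand-in type that copies the documented json tags and replaces `primitive.M` with a plain map; the sample payload, `decodeEvent`, and all values in it are made up for illustration.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// ChangeStreamEvent is a trimmed stand-in that reuses the documented json
// tags. primitive.M is replaced by a plain map to avoid the driver dependency.
type ChangeStreamEvent struct {
	OperationType     string                 `json:"operationType"`
	Database          string                 `json:"database"`
	Collection        string                 `json:"collection"`
	DocumentKey       string                 `json:"documentKey"`
	FullDocument      map[string]interface{} `json:"fullDocument"`
	UpdateDescription struct {
		UpdatedFields map[string]interface{} `json:"updatedFields"`
		RemovedFields interface{}            `json:"removedFields"`
	} `json:"updateDescription"`
}

// sample is a hypothetical update event, not real captured data.
const sample = `{
  "operationType": "update",
  "database": "shop",
  "collection": "orders",
  "documentKey": "order-42",
  "updateDescription": {"updatedFields": {"status": "paid"}, "removedFields": []}
}`

// decodeEvent (hypothetical helper) unmarshals a JSON-encoded event.
func decodeEvent(raw []byte) (ChangeStreamEvent, error) {
	var ce ChangeStreamEvent
	err := json.Unmarshal(raw, &ce)
	return ce, err
}

func main() {
	ce, err := decodeEvent([]byte(sample))
	if err != nil {
		panic(err)
	}
	fmt.Printf("%s on %s.%s key=%s updated=%v\n",
		ce.OperationType, ce.Database, ce.Collection,
		ce.DocumentKey, ce.UpdateDescription.UpdatedFields)
	// update on shop.orders key=order-42 updated=map[status:paid]
}
```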

type ChangeStreamResumePoint

type ChangeStreamResumePoint struct {
	ID        ResumeToken         `bson:"_id" json:"_id"`
	Timestamp primitive.Timestamp `bson:"timestamp" json:"timestamp"`
	// need to keep this for tests
	FullDocument primitive.M `bson:"fullDocument" json:"fullDocument"`
	// important to know before resuming the stream
	// OperationType == 'invalidate' means that the resume point is no longer valid,
	// and we need to use startAfter to resume the stream
	OperationType string `bson:"operationType" json:"operationType"`
}

ChangeStreamResumePoint holds information needed to resume a change stream from a specific point
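
The comment on OperationType implies a choice between two resume strategies. A minimal sketch of that decision follows, using a trimmed stand-in type and a hypothetical `resumeOption` helper; with the MongoDB Go driver the two results would presumably map to the change stream options SetResumeAfter and SetStartAfter.

```go
package main

import "fmt"

// OperationTypeInvalidate matches the package constant.
const OperationTypeInvalidate = "invalidate"

// ChangeStreamResumePoint is a trimmed stand-in for the package type.
type ChangeStreamResumePoint struct {
	TokenData     interface{}
	OperationType string
}

// resumeOption (hypothetical) names the change stream option to use for a
// stored resume point:
//   - ""            no stored point, start watching from now
//   - "startAfter"  the stored event was an 'invalidate'
//   - "resumeAfter" any other stored event
func resumeOption(rp *ChangeStreamResumePoint) string {
	switch {
	case rp == nil:
		return ""
	case rp.OperationType == OperationTypeInvalidate:
		return "startAfter"
	default:
		return "resumeAfter"
	}
}

func main() {
	fmt.Println(resumeOption(&ChangeStreamResumePoint{TokenData: "t1", OperationType: "update"}))
	// resumeAfter
	fmt.Println(resumeOption(&ChangeStreamResumePoint{TokenData: "t2", OperationType: OperationTypeInvalidate}))
	// startAfter
	fmt.Printf("%q\n", resumeOption(nil))
	// ""
}
```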

type ChangeStreamWatcher

type ChangeStreamWatcher interface {
	// Start resumes watching change events and
	// passes event data to the supplied dispatch function for handling
	Start(ctx context.Context, fullDocumentMode options.FullDocument, resumePoint *ChangeStreamResumePoint, saveFunc, deleteFunc ChangeEventDispatcherFunc, dispatchFuncs ...ChangeEventDispatcherFunc) error
}

ChangeStreamWatcher watches a change stream and dispatches received change events.

type CollectionWatcher

type CollectionWatcher interface {
	Update(ctx context.Context, doc []byte) error
	Insert(ctx context.Context, doc []byte) error
	Delete(ctx context.Context, doc []byte) error
}

CollectionWatcher is an interface for processing document data from a change stream
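
One plausible way to use this interface is to route each event to the method matching its operation type. The sketch below assumes the standard MongoDB operation type strings "insert", "update", and "delete"; `recorder`, `route`, and `demo` are hypothetical names, and how the package itself maps events to these methods is not stated in the source.

```go
package main

import (
	"context"
	"fmt"
)

// CollectionWatcher mirrors the documented interface.
type CollectionWatcher interface {
	Update(ctx context.Context, doc []byte) error
	Insert(ctx context.Context, doc []byte) error
	Delete(ctx context.Context, doc []byte) error
}

// recorder is a hypothetical implementation that only records its calls.
type recorder struct{ calls []string }

func (r *recorder) Update(_ context.Context, doc []byte) error {
	r.calls = append(r.calls, "update:"+string(doc))
	return nil
}

func (r *recorder) Insert(_ context.Context, doc []byte) error {
	r.calls = append(r.calls, "insert:"+string(doc))
	return nil
}

func (r *recorder) Delete(_ context.Context, doc []byte) error {
	r.calls = append(r.calls, "delete:"+string(doc))
	return nil
}

// route (hypothetical) forwards a document to the method matching the
// change stream operation type and rejects anything else.
func route(ctx context.Context, w CollectionWatcher, operationType string, doc []byte) error {
	switch operationType {
	case "insert":
		return w.Insert(ctx, doc)
	case "update":
		return w.Update(ctx, doc)
	case "delete":
		return w.Delete(ctx, doc)
	default:
		return fmt.Errorf("unhandled operation type %q", operationType)
	}
}

// demo routes one event of each kind and returns the recorded calls.
func demo() []string {
	r := &recorder{}
	ctx := context.Background()
	for _, op := range []string{"insert", "update", "delete"} {
		if err := route(ctx, r, op, []byte(`{"_id":1}`)); err != nil {
			panic(err)
		}
	}
	return r.calls
}

func main() {
	for _, c := range demo() {
		fmt.Println(c)
	}
}
```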

type DocumentProcessor

type DocumentProcessor interface {
	StartWithRetry(bo backoff.BackOff, actions CollectionWatcher, fullDocumentMode options.FullDocument) error
	Start(actions CollectionWatcher, fullDocumentMode options.FullDocument) error
	Stop()
}

DocumentProcessor is an interface for processing document data from a change stream
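
StartWithRetry takes a backoff policy, which suggests that a failed Start is retried after a delay. The sketch below shows that general pattern only; it is not the package's implementation. The local `BackOff` interface and `Stop` sentinel are stand-ins with an assumed shape, and `fixed`, `startWithRetry`, and `demo` are hypothetical.

```go
package main

import (
	"errors"
	"fmt"
	"time"
)

// Stop is the sentinel delay meaning "give up" (assumed convention).
const Stop time.Duration = -1

// BackOff is a local stand-in for the backoff policy type.
type BackOff interface {
	NextBackOff() time.Duration
	Reset()
}

// fixed is a hypothetical policy: a constant delay, at most max retries.
type fixed struct {
	delay  time.Duration
	max, n int
}

func (f *fixed) NextBackOff() time.Duration {
	if f.n >= f.max {
		return Stop
	}
	f.n++
	return f.delay
}

func (f *fixed) Reset() { f.n = 0 }

// startWithRetry (hypothetical) calls start until it succeeds or the
// policy returns Stop, sleeping for the policy's delay between attempts.
func startWithRetry(bo BackOff, start func() error) error {
	bo.Reset()
	for {
		err := start()
		if err == nil {
			return nil
		}
		d := bo.NextBackOff()
		if d == Stop {
			return fmt.Errorf("giving up: %w", err)
		}
		time.Sleep(d)
	}
}

// demo simulates a start function that fails twice before succeeding.
func demo() (attempts int, err error) {
	err = startWithRetry(&fixed{delay: time.Millisecond, max: 5}, func() error {
		attempts++
		if attempts < 3 {
			return errors.New("stream closed")
		}
		return nil
	})
	return attempts, err
}

func main() {
	n, err := demo()
	fmt.Println(n, err) // 3 <nil>
}
```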

type ResumeToken

type ResumeToken struct {
	TokenData interface{} `bson:"_data" json:"_data"`
}

ResumeToken denotes the token associated with a MongoDB change stream event, which may be used to resume receiving change stream events from a point in the past.

type StreamResume

type StreamResume interface {
	// GetResumePoint fetches the last stored resume point
	GetResumePoint() (*ChangeStreamResumePoint, error)
	// GetResumeTime fetches the last stored resume point and extracts the timestamp
	GetResumeTime() (*primitive.Timestamp, error)
	// DeleteResumePoint deletes a change stream resume point from the collection
	DeleteResumePoint(ctx context.Context, token ResumeToken) error
	// SaveResumePoint stores ChangeStreamResumePoint
	SaveResumePoint(ctx context.Context, ce ChangeStreamResumePoint) error
}

StreamResume stores relevant change stream events. Mongo's oplog has a configurable expiration, but a large oplog is not needed; instead, only the changes that are actually needed are stored.

Directories

Path Synopsis
db
tx
