stream

package
v0.0.0-...-0b358fe Latest
Warning

This package is not in the latest version of its module.

Published: Nov 28, 2023 License: GPL-3.0 Imports: 13 Imported by: 0

Documentation

Constants

This section is empty.

Variables

var ErrInvalidate = fmt.Errorf("received 'invalidate' event")

Functions

func GetDeleteResumePointFunc

func GetDeleteResumePointFunc(resumeTokenRepo mongowatch.StreamResume) mongowatch.ChangeEventDispatcherFunc

GetDeleteResumePointFunc returns a function that deletes a resume point

func GetSaveResumePointFunc

func GetSaveResumePointFunc(streamResumeRepo mongowatch.StreamResume) mongowatch.ChangeEventDispatcherFunc

GetSaveResumePointFunc returns a function that saves a resume point to our collection

func NewCollection

func NewCollection(col string, mongoInstance *mongo.Database) *mongo.Collection

NewCollection returns a new collection

Types

type ChangeStreamWatcher

type ChangeStreamWatcher struct {
	// contains filtered or unexported fields
}

ChangeStreamWatcher watches a mongo change stream for change events and reacts to those events.

func NewChangeStreamWatcher

func NewChangeStreamWatcher(col *mongo.Collection) *ChangeStreamWatcher

NewChangeStreamWatcher builds a new mongo watcher instance

func (*ChangeStreamWatcher) Start

func (csw *ChangeStreamWatcher) Start(
	ctx context.Context,
	fullDocumentMode options.FullDocument,
	resumePoint *mongowatch.ChangeStreamResumePoint,
	saveFunc, deleteFunc mongowatch.ChangeEventDispatcherFunc,
	dispatchFuncs ...mongowatch.ChangeEventDispatcherFunc,
) error

Start begins watching the Mongo change stream for the collection. If a valid timestamp is provided, the stream starts from that point. Events are processed synchronously.

type DocumentProcessor

type DocumentProcessor struct {
	// contains filtered or unexported fields
}

DocumentProcessor is a wrapper around the Mongo change stream watcher. It simplifies use of the stream manager by marshaling the internal Mongo structure to JSON, and it exposes two functions for handling document changes and deletions, so handlers can flexibly unmarshal documents into their own structs.
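
On the handler side, receiving JSON means each consumer can decode only the fields it cares about. The sketch below assumes a handler is given the document as raw JSON bytes; the exact callback signature is not shown on this page, and `user` and `decodeUser` are hypothetical names used only for illustration.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// user is a hypothetical handler-side struct. Fields absent from the struct
// (such as "extra" below) are ignored by encoding/json.
type user struct {
	ID    string `json:"_id"`
	Email string `json:"email"`
}

// decodeUser unmarshals a JSON document into the handler's own struct.
func decodeUser(doc []byte) (user, error) {
	var u user
	if err := json.Unmarshal(doc, &u); err != nil {
		return user{}, fmt.Errorf("decoding document: %w", err)
	}
	return u, nil
}

func main() {
	doc := []byte(`{"_id":"42","email":"a@example.com","extra":"ignored"}`)
	u, err := decodeUser(doc)
	if err != nil {
		fmt.Println("handler error:", err)
		return
	}
	fmt.Printf("%+v\n", u)
}
```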

func NewDataProcessor

func NewDataProcessor(targetDB *mongo.Database, targetCollectionName string, resumeSuffix string, localDB *mongo.Database) *DocumentProcessor

NewDataProcessor creates a new DocumentProcessor

func (DocumentProcessor) Start

func (dp DocumentProcessor) Start(actions mongowatch.CollectionWatcher, fullDocumentMode options.FullDocument) error

Start starts the doc processor

func (DocumentProcessor) StartWithRetry

func (dp DocumentProcessor) StartWithRetry(bo backoff.BackOff, actions mongowatch.CollectionWatcher, fullDocumentMode options.FullDocument) error

StartWithRetry starts the doc processor with a retry mechanism

func (DocumentProcessor) Stop

func (dp DocumentProcessor) Stop()

Stop stops the doc processor

type Manager

type Manager struct {
	// contains filtered or unexported fields
}

Manager manages the change stream

func NewManager

func NewManager(
	resumeRepo mongowatch.StreamResume,
	watcher mongowatch.ChangeStreamWatcher,
	changeEventSaveFunc mongowatch.ChangeEventDispatcherFunc,
	changeEventDeleteFunc mongowatch.ChangeEventDispatcherFunc,
) *Manager

NewManager creates a new change stream manager

func (*Manager) Stop

func (m *Manager) Stop()

Stop stops the change stream manager

func (*Manager) Watch

Watch starts the change stream manager

type ResumeRepository

type ResumeRepository struct {
	// contains filtered or unexported fields
}

ResumeRepository stores metadata of mongo change events for resumption

func NewStreamResumeRepository

func NewStreamResumeRepository(col *mongo.Collection) *ResumeRepository

NewStreamResumeRepository builds a new change stream repo instance

func (*ResumeRepository) Count

func (csr *ResumeRepository) Count() (int64, error)

Count returns the total doc count

func (*ResumeRepository) DeleteResumePoint

func (csr *ResumeRepository) DeleteResumePoint(ctx context.Context, token mongowatch.ResumeToken) error

DeleteResumePoint deletes a resumption point

func (*ResumeRepository) FetchAll

FetchAll returns all resume points

func (*ResumeRepository) GetLastResumePoint

func (csr *ResumeRepository) GetLastResumePoint() (*mongowatch.ChangeStreamResumePoint, error)

GetLastResumePoint returns the last resumption point

func (*ResumeRepository) GetResumePoint

func (csr *ResumeRepository) GetResumePoint() (*mongowatch.ChangeStreamResumePoint, error)

GetResumePoint returns the Mongo stream token for the last change stream event that was recorded. This may be used to resume change events from the point of the last change event, meaning the last event itself will be skipped.
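
The "last event will be skipped" behavior can be modeled with a small in-memory log. This is only a sketch of the resume-after semantics, not code from the package: `event` and `resumeAfter` are hypothetical, and the tokens are plain strings rather than real Mongo resume tokens.

```go
package main

import "fmt"

// event is a hypothetical change event identified by its resume token.
type event struct {
	Token   string
	Payload string
}

// resumeAfter returns the events that follow the one identified by token.
// The event carrying the token itself is not delivered again. In this
// sketch an unknown token simply yields no events.
func resumeAfter(log []event, token string) []event {
	for i, ev := range log {
		if ev.Token == token {
			return log[i+1:]
		}
	}
	return nil
}

func main() {
	log := []event{
		{Token: "t1", Payload: "insert a"},
		{Token: "t2", Payload: "update a"},
		{Token: "t3", Payload: "delete a"},
	}
	// Resuming from the token saved for "t2" replays only what came after it.
	for _, ev := range resumeAfter(log, "t2") {
		fmt.Println(ev.Token, ev.Payload)
	}
}
```

Because the recorded event is not replayed, the resume point should only be saved once that event has been fully handled.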

func (*ResumeRepository) GetResumeTime

func (csr *ResumeRepository) GetResumeTime() (*primitive.Timestamp, error)

GetResumeTime returns the mongo stream timestamp for the last change stream event that was recorded

func (*ResumeRepository) SaveResumePoint

SaveResumePoint saves a resumption point
