mongostream

package
v0.9.71 Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jun 29, 2020 | License: Apache-2.0 | Imports: 21 | Imported by: 3

Documentation

Index

Constants

View Source
const Name = "mongo-stream"
View Source
const OplogCheckerCollectionName = "heartbeat"

Variables

This section is empty.

Functions

func GetRowDataFromOp

func GetRowDataFromOp(op *gtm.Op) *map[string]interface{}

func OplogPositionValueDecoder

func OplogPositionValueDecoder(v string) (interface{}, error)

func OplogPositionValueEncoder

func OplogPositionValueEncoder(v interface{}) (string, error)

func SetupInitialPosition

func SetupInitialPosition(cache position_cache.PositionCacheInterface, startPositionInSpec *config.MongoPosition) error

func UpdateCurrentPositionValue

func UpdateCurrentPositionValue(cache position_cache.PositionCacheInterface, positionValue config.MongoPosition) error

Types

type OplogChecker

type OplogChecker struct {
	// contains filtered or unexported fields
}

func NewOplogChecker

func NewOplogChecker(session *mgo.Session, sourceHost string, pipelineName string, ctx context.Context) *OplogChecker

func (*OplogChecker) MarkActive

func (checker *OplogChecker) MarkActive(data map[string]interface{})

func (*OplogChecker) Run

func (checker *OplogChecker) Run()

func (*OplogChecker) Stop

func (checker *OplogChecker) Stop()

type OplogHeartbeat

type OplogHeartbeat struct {
	ID   bson.ObjectId `bson:"_id,omitempty"`
	Name string        `bson:"name"`
	T    string        `bson:"t"`
}

type OplogPositionValue

type OplogPositionValue struct {
	StartPosition   *config.MongoPosition `json:"start_position" bson:"start_position"`
	CurrentPosition config.MongoPosition  `json:"current_position" bson:"current_position"`
}

type OplogTailer

type OplogTailer struct {
	// contains filtered or unexported fields
}

func NewOplogTailer

func NewOplogTailer(opts *OplogTailerOpt) *OplogTailer

func (*OplogTailer) AfterMsgCommit

func (tailer *OplogTailer) AfterMsgCommit(msg *core.Msg) error

func (*OplogTailer) Filter

func (tailer *OplogTailer) Filter(op *gtm.Op, option *filterOpt) bool

func (*OplogTailer) Run

func (tailer *OplogTailer) Run()

func (*OplogTailer) SendDeadSignal

func (tailer *OplogTailer) SendDeadSignal() error

func (*OplogTailer) Stop

func (tailer *OplogTailer) Stop()

func (*OplogTailer) Wait

func (tailer *OplogTailer) Wait()

type OplogTailerOpt

type OplogTailerOpt struct {
	// contains filtered or unexported fields
}

type PluginConfig

type PluginConfig struct {
	Source        *config.MongoConnConfig     `mapstructure:"source" toml:"source" json:"source"`
	PositionRepo  *config.GenericPluginConfig `mapstructure:"position-repo" toml:"position-repo" json:"position-repo"`
	StartPosition *config.MongoPosition       `mapstructure:"start-position" toml:"start-position" json:"start-position"`
	GtmConfig     *config.GtmConfig           `mapstructure:"gtm-config" toml:"gtm-config" json:"gtm-config"`
}

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL