mongoreplay

package
v0.0.0-...-d7caf96 Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jun 26, 2023 License: MIT Imports: 15 Imported by: 0

Documentation

Index

Constants

View Source
const (
	// LastUpdatedResumeFile is a fixed file path under /tmp.
	// NOTE(review): presumably where ResumeTokenStore.Store/Fetch persist the
	// last resume token — confirm against their implementations. The path is
	// hard-coded and is not derived from ConfigFolder.
	LastUpdatedResumeFile = "/tmp/last-updated-resume-token"
)

Variables

View Source
// Configuration file locations, computed once at package initialization.
//
// If CONFIG_FOLDER is unset or empty, os.Getenv returns "" and the three
// default paths become "/oplog_config.json", "/source_mongo_config.json" and
// "/dest_mongo_config.json", i.e. files at the filesystem root.
var (
	// ConfigFolder is the value of the CONFIG_FOLDER environment variable
	// at the time the package is initialized.
	ConfigFolder                 = os.Getenv("CONFIG_FOLDER")
	// DefaultOplogConfigFile is "<ConfigFolder>/oplog_config.json".
	DefaultOplogConfigFile       = fmt.Sprintf("%s/oplog_config.json", ConfigFolder)
	// DefaultSourceMongoConfigFile is "<ConfigFolder>/source_mongo_config.json".
	DefaultSourceMongoConfigFile = fmt.Sprintf("%s/source_mongo_config.json", ConfigFolder)
	// DefaultDestMongoFile is "<ConfigFolder>/dest_mongo_config.json".
	DefaultDestMongoFile         = fmt.Sprintf("%s/dest_mongo_config.json", ConfigFolder)
)
View Source
// Enumerated values for StageTypeT and StateTypeT.
//
// NOTE(review): these are declared with var, not const, so any importer can
// reassign them; a const block would make them immutable.
var (
	// Stages
	// The stage values are numbered 0 through 3 in declaration order.
	InitStage                StageTypeT = 0
	PreparingCollectionStage StageTypeT = 1
	DumpingCollectionStage   StageTypeT = 2
	TailingOplogStage        StageTypeT = 3

	// States
	// PendingState is 0, so it is also the zero value of StateTypeT.
	PendingState StateTypeT = 0
	SuccessState StateTypeT = 1
	FailedState  StateTypeT = 2
)

Functions

This section is empty.

Types

type Buffer

// Buffer pairs a context with a BufferConfig; the rest of its state is
// unexported. The methods listed for this type (Store, Flush, FlushAll,
// IsEmpty, IsFull) accept and return *MessageN values.
type Buffer struct {
	// Ctx is a context held by the buffer — presumably the ctx given to
	// NewBuffer; confirm in the constructor.
	// NOTE(review): Go convention is to pass context.Context as a call
	// argument rather than store it in a struct.
	Ctx context.Context

	// Config holds the buffer settings (see BufferConfig).
	Config *BufferConfig
	// contains filtered or unexported fields
}

func NewBuffer

func NewBuffer(ctx context.Context, flusherFunc FlusherFunc) (buffer *Buffer, err error)

func (*Buffer) Flush

func (buffer *Buffer) Flush() (msg *MessageN, err error)

func (*Buffer) FlushAll

func (buffer *Buffer) FlushAll() (msgs []*MessageN, err error)

func (*Buffer) IsEmpty

func (buffer *Buffer) IsEmpty() (isEmpty bool)

func (*Buffer) IsFull

func (buffer *Buffer) IsFull() (isFull bool)

func (*Buffer) Store

func (buffer *Buffer) Store(msg *MessageN) (err error)

func (*Buffer) String

func (buffer *Buffer) String() (displayStr string)

type BufferConfig

// BufferConfig is the JSON-decodable configuration referenced by Buffer.Config.
type BufferConfig struct {
	// Capacity is read from the "capacity" JSON key.
	// NOTE(review): presumably the maximum number of messages a Buffer holds
	// before IsFull reports true — confirm in the Buffer implementation.
	Capacity int `json:"capacity"`
}

type Controller

// Controller groups a source collection, a destination collection and the
// StageTracker associated with them. It is created by NewController and
// started with Run.
type Controller struct {
	// Ctx is a context held by the controller — presumably the ctx given to
	// NewController; confirm in the constructor.
	Ctx context.Context

	// SourceCollection and DestCollection are the two collection handles this
	// controller was given (same parameter types as NewController).
	SourceCollection *OplogCollection
	DestCollection   *OplogCollection

	// StageTracker is the stage state machine for this pair of collections.
	StageTracker *StageTracker
	// contains filtered or unexported fields
}

func NewController

func NewController(ctx context.Context, sourceCollection *OplogCollection, destCollection *OplogCollection) (controller *Controller, err error)

func (*Controller) Run

func (controller *Controller) Run() (err error)

type Dumper

// Dumper is one of the StageExecutor implementations: NewDumper returns it
// behind the StageExecutor interface and it exposes the matching Run method.
type Dumper struct {
	// Ctx is a context held by the dumper — presumably the ctx given to
	// NewDumper; confirm in the constructor.
	Ctx           context.Context
	// Config holds the dumper settings (see DumperConfig).
	Config        *DumperConfig
	// SrcCollection and DstCollection are the source and destination
	// collection handles.
	SrcCollection *OplogCollection
	DstCollection *OplogCollection

	// LastResumeToken is a resume-token holder.
	// NOTE(review): whether the dumper reads or writes it is not visible
	// here — confirm in Run/StartQuery.
	LastResumeToken *ResumeTokenStore

	// DumperCloseCh carries bool signals and DumperCh carries *MessageN
	// values. NOTE(review): direction, buffering and who closes these
	// channels are not visible here.
	DumperCloseCh chan bool
	DumperCh      chan *MessageN
	// contains filtered or unexported fields
}

func (*Dumper) Run

func (dumper *Dumper) Run(args ...interface{}) (err error)

func (*Dumper) StartQuery

func (dumper *Dumper) StartQuery() (err error)

type DumperConfig

// DumperConfig is the JSON-decodable configuration referenced by Dumper.Config.
type DumperConfig struct {
	// FetchCountThreshold is read from the "fetch_count_threshold" JSON key.
	// NOTE(review): presumably a per-fetch document limit — confirm in the
	// Dumper implementation.
	FetchCountThreshold int `json:"fetch_count_threshold"`
}

type Filter

// Filter is one JSON-configured filter entry; OplogCollection.Filters holds a
// slice of them. All three fields are plain strings.
type Filter struct {
	// FilterKey is read from the "filter_key" JSON key.
	FilterKey   string `json:"filter_key"`
	// FilterValue is read from the "filter_value" JSON key.
	FilterValue string `json:"filter_value"`
	// FilterType is read from the "filter_type" JSON key.
	// NOTE(review): the set of accepted type names is not visible here.
	FilterType  string `json:"filter_type"`
}

type FlusherFunc

// FlusherFunc is a callback that receives a single *MessageN and returns an
// error. NewBuffer takes one as its flusherFunc argument.
type FlusherFunc func(msg *MessageN) (err error)

type MessageN

// MessageN is the message value exchanged between Buffer, Dumper, OplogTailer
// and QueryGenerator in this package.
//
// The JSON tag names (ns, o, op, ts) coincide with field names used in
// MongoDB oplog entries. NOTE(review): only json tags are declared, no bson
// tags — confirm how these values are decoded.
type MessageN struct {
	// CollectionPath is serialized as "ns".
	CollectionPath string                 `json:"ns"`
	// FullDocument is serialized as "o" and holds an untyped document.
	FullDocument   map[string]interface{} `json:"o"`
	// OperationType is serialized as "op" (see OperationTypeT).
	OperationType  OperationTypeT         `json:"op"`
	// Timestamp is serialized as "ts".
	Timestamp      primitive.Timestamp    `json:"ts"`
}

type MongoConfig

// MongoConfig holds JSON-decodable connection settings. NewMongoConfig builds
// one from a file name and GetUrl derives a URL string from it.
type MongoConfig struct {
	// Username is read from the "username" JSON key.
	Username string `json:"username"`
	// Password is read from the "password" JSON key, so it sits in the
	// configuration as plain text.
	Password string `json:"password"`
	// Host is read from the "host" JSON key.
	Host     string `json:"host"`
	// Port is read from the "port" JSON key; it is a string, not a number.
	Port     string `json:"port"`
	// DbName is read from the "db_name" JSON key.
	DbName   string `json:"db_name"`
}

func NewMongoConfig

func NewMongoConfig(fileName string) (sourceMongoConfig *MongoConfig, err error)

func (*MongoConfig) GetUrl

func (sourceMongoConfig *MongoConfig) GetUrl() (url string)

type NoopStageExecutor

// NoopStageExecutor is a field-less type with a Run method matching
// StageExecutor; NewNoopStageExecutor returns it behind that interface.
type NoopStageExecutor struct{}

func (*NoopStageExecutor) Run

func (noopStageExecutor *NoopStageExecutor) Run(args ...interface{}) (err error)

type OperationTypeT

// OperationTypeT is the string-backed operation code carried in
// MessageN.OperationType.
type OperationTypeT string
// Operation codes. The one-letter values coincide with the insert, update
// and delete op codes used in MongoDB oplog entries.
// NOTE(review): declared with var, so importers can reassign them.
var (
	InsertOperation OperationTypeT = "i"
	UpdateOperation OperationTypeT = "u"
	DeleteOperation OperationTypeT = "d"
)

type Oplog

// Oplog is the top-level object of the package: New creates it, and it
// exposes Connect and Run.
type Oplog struct {
	// Ctx and CancelFunc are a context and a cancel function.
	// NOTE(review): presumably a pair created together in New — confirm.
	Ctx        context.Context
	CancelFunc context.CancelFunc

	// SrcCollections maps a string key to a source collection handle.
	// NOTE(review): the key is presumably the collection name — confirm.
	SrcCollections map[string]*OplogCollection

	// DstCollections maps a string key to a destination collection handle.
	DstCollections map[string]*OplogCollection
	// contains filtered or unexported fields
}

func New

func New() (oplogCtx *Oplog, err error)

func (*Oplog) Connect

func (oplogCtx *Oplog) Connect() (err error)

func (*Oplog) Run

func (oplogCtx *Oplog) Run() (err error)

type OplogCollection

// OplogCollection describes one collection: its configured name and filters
// plus driver handles for the collection and its database. It is also the
// element type of OplogConfig.Collections.
type OplogCollection struct {
	// Name is read from the "name" JSON key.
	Name            string            `json:"name"`
	// Filters is read from the "filters" JSON key (see Filter).
	Filters         []Filter          `json:"filters"`
	// MongoCollection and MongoDatabase are MongoDB driver handles.
	// NOTE(review): they carry JSON tags although they are runtime handles
	// rather than configuration data; `json:"-"` may be what is intended —
	// confirm before changing.
	MongoCollection *mongo.Collection `json:"mongo_collection"`
	MongoDatabase   *mongo.Database   `json:"mongo_database"`
}

func (*OplogCollection) AddCollectionFilter

func (collection *OplogCollection) AddCollectionFilter(filters bson.M, isOplog bool) (err error)

func (*OplogCollection) Delete

func (collection *OplogCollection) Delete(filters bson.M) (err error)

func (*OplogCollection) GetCollectionPath

func (collection *OplogCollection) GetCollectionPath() (collectionPath string)

func (*OplogCollection) GetOplogFilter

func (collection *OplogCollection) GetOplogFilter(resumeToken *ResumeTokenStore) (filters bson.M, err error)

type OplogConfig

// OplogConfig is the JSON-decodable configuration returned by NewOplogConfig:
// a database name and the list of collections.
type OplogConfig struct {
	// DbName is read from the "db_name" JSON key.
	DbName      string            `json:"db_name"`
	// Collections is read from the "collections" JSON key; elements are
	// stored by value, not by pointer.
	Collections []OplogCollection `json:"collections"`
}

func NewOplogConfig

func NewOplogConfig() (oplogConfig *OplogConfig, err error)

type OplogTailer

// OplogTailer is one of the StageExecutor implementations: NewOplogTailer
// returns it behind the StageExecutor interface. It also exposes
// FetchFromOplog and ShouldContinueProcessing.
type OplogTailer struct {
	// Ctx is a context held by the tailer — presumably the ctx given to
	// NewOplogTailer; confirm in the constructor.
	Ctx context.Context

	// Collection is the collection handle the tailer works with.
	Collection *OplogCollection

	// FetchCountThreshold is an integer limit.
	// NOTE(review): presumably the per-fetch entry cap used by
	// FetchFromOplog — confirm.
	FetchCountThreshold int

	// WatchThreshold, WatchCount and ShouldHonorWatchThreshold are a limit,
	// a counter and a switch. NOTE(review): presumably
	// ShouldContinueProcessing compares WatchCount with WatchThreshold only
	// when ShouldHonorWatchThreshold is true — confirm.
	WatchThreshold            int
	WatchCount                int
	ShouldHonorWatchThreshold bool

	// CtrlrCh carries *MessageN values.
	// NOTE(review): direction and ownership of this channel are not visible
	// here.
	CtrlrCh chan *MessageN
}

func (*OplogTailer) FetchFromOplog

func (tailer *OplogTailer) FetchFromOplog(resumeToken *ResumeTokenStore) (messages []*MessageN, err error)

func (*OplogTailer) Run

func (tailer *OplogTailer) Run(args ...interface{}) (err error)

func (*OplogTailer) ShouldContinueProcessing

func (tailer *OplogTailer) ShouldContinueProcessing() (shouldContinue bool)

type QueryFunc

// QueryFunc is a function that takes a *MessageN and returns an error; the
// Insert, Update, Delete and Process methods of QueryGenerator have this
// shape.
type QueryFunc func(*MessageN) error

type QueryGenerator

// QueryGenerator holds a context and a MongoDB collection handle. It is
// created by NewQueryGenerator and exposes Insert, Update, Delete, Process
// and ProcessAll, all of which take *MessageN values.
type QueryGenerator struct {
	// Ctx is a context held by the generator — presumably the ctx given to
	// NewQueryGenerator; confirm in the constructor.
	Ctx        context.Context
	// Collection is the driver collection handle (same type as the coll
	// parameter of NewQueryGenerator).
	Collection *mongo.Collection
	// contains filtered or unexported fields
}

func NewQueryGenerator

func NewQueryGenerator(ctx context.Context, coll *mongo.Collection) (queryGen *QueryGenerator, err error)

func (*QueryGenerator) Delete

func (queryGen *QueryGenerator) Delete(msg *MessageN) (err error)

func (*QueryGenerator) Insert

func (queryGen *QueryGenerator) Insert(msg *MessageN) (err error)

func (*QueryGenerator) Process

func (queryGen *QueryGenerator) Process(msg *MessageN) (err error)

func (*QueryGenerator) ProcessAll

func (queryGen *QueryGenerator) ProcessAll(messages []*MessageN) (err error)

func (*QueryGenerator) Update

func (queryGen *QueryGenerator) Update(msg *MessageN) (err error)

type ResumeTokenStore

// ResumeTokenStore wraps a single MongoDB timestamp and exposes Copy, Fetch
// and Store.
//
// NOTE(review): Fetch is declared on a value receiver while Copy and Store
// use pointer receivers; Go convention is to keep receiver kinds consistent
// per type.
type ResumeTokenStore struct {
	// Timestamp is serialized under the "timestamp" JSON key.
	Timestamp primitive.Timestamp `json:"timestamp"`
}

func (*ResumeTokenStore) Copy

func (resumeToken *ResumeTokenStore) Copy() (copied *ResumeTokenStore)

func (ResumeTokenStore) Fetch

func (resumeTokenStore ResumeTokenStore) Fetch() (resumeToken *ResumeTokenStore, err error)

func (*ResumeTokenStore) Store

func (resumeTokenStore *ResumeTokenStore) Store() (err error)

type Seeder

// Seeder holds a count and a collection handle, matching the two parameters
// of NewSeeder. It exposes CleanDb, GetRowsToSeed and Seed.
type Seeder struct {
	// Count is an integer quantity.
	// NOTE(review): presumably the number of rows to seed — confirm.
	Count      int
	// Collection is the collection handle the seeder works with.
	Collection *OplogCollection

	// ShouldClean is a boolean switch.
	// NOTE(review): presumably controls whether CleanDb is invoked — confirm.
	ShouldClean bool
	// contains filtered or unexported fields
}

func NewSeeder

func NewSeeder(count int, collection *OplogCollection) (seeder *Seeder, err error)

func (*Seeder) CleanDb

func (seeder *Seeder) CleanDb() (err error)

func (*Seeder) GetRowsToSeed

func (seeder *Seeder) GetRowsToSeed() (err error)

func (*Seeder) Seed

func (seeder *Seeder) Seed() (err error)

type Stage

// Stage is the JSON-serializable record kept per stage; StageTracker.Stages
// maps each StageTypeT to one of these.
type Stage struct {
	// StartTime, StopTime and LastHeartbeatAt are wall-clock timestamps
	// serialized as "start_time", "stop_time" and "last_heartbeat_at".
	StartTime       time.Time `json:"start_time"`
	StopTime        time.Time `json:"stop_time"`
	LastHeartbeatAt time.Time `json:"last_heartbeat_at"`

	// StageType identifies the stage and Status holds its state; both are
	// uint8-backed and therefore serialize as numbers.
	StageType StageTypeT `json:"stage_type"`
	Status    StateTypeT `json:"status"`

	// Metadata is a free-form map serialized as "metadata".
	Metadata map[string]interface{} `json:"metadata"`
}

type StageExecutor

// StageExecutor is the single-method interface returned by NewDumper,
// NewNoopStageExecutor, NewOplogTailer and NewTailerManager.
type StageExecutor interface {
	// Run takes untyped variadic arguments and reports failure through its
	// error result. NOTE(review): the expected argument types are not
	// visible here and are not checked by the compiler.
	Run(...interface{}) error
}

func NewDumper

func NewDumper(ctx context.Context, srcCollection *OplogCollection, dstCollection *OplogCollection) (stageExecutor StageExecutor, err error)

func NewNoopStageExecutor

func NewNoopStageExecutor(ctx context.Context, srcCollection *OplogCollection, dstCollection *OplogCollection) (stageExecutor StageExecutor, err error)

func NewOplogTailer

func NewOplogTailer(ctx context.Context, collection *OplogCollection, dstCollection *OplogCollection) (stageExecutor StageExecutor, err error)

func NewTailerManager

func NewTailerManager(ctx context.Context, srcColl *OplogCollection, dstColl *OplogCollection) (stageExecutor StageExecutor, err error)

type StageTracker

// StageTracker holds the current stage, a per-stage record map and the source
// and destination collections. It is created by NewStageTracker and exposes
// Next, Run and RunStage.
type StageTracker struct {
	// Ctx is a context held by the tracker — presumably the ctx given to
	// NewStageTracker; confirm in the constructor.
	Ctx       context.Context
	// CurrStage is the stage the tracker is currently at.
	CurrStage StageTypeT
	// Stages maps each stage type to its Stage record.
	Stages    map[StageTypeT]*Stage

	// SrcCollection and DstCollection are the source and destination
	// collection handles.
	SrcCollection *OplogCollection
	DstCollection *OplogCollection
	// contains filtered or unexported fields
}

func NewStageTracker

func NewStageTracker(ctx context.Context, srcCollection *OplogCollection, dstCollection *OplogCollection) (stageTracker *StageTracker, err error)

func (*StageTracker) Next

func (stageTracker *StageTracker) Next(args ...interface{}) (err error)

func (*StageTracker) Run

func (stageTracker *StageTracker) Run() (err error)

func (*StageTracker) RunStage

func (stageTracker *StageTracker) RunStage(args ...interface{}) (err error)

type StageTypeT

// StageTypeT is the uint8-backed stage identifier; its values are InitStage,
// PreparingCollectionStage, DumpingCollectionStage and TailingOplogStage.
type StageTypeT uint8

type StateTypeT

// StateTypeT is the uint8-backed state identifier; its values are
// PendingState, SuccessState and FailedState.
type StateTypeT uint8

type TailerManager

// TailerManager is one of the StageExecutor implementations: NewTailerManager
// returns it behind the StageExecutor interface and it exposes the matching
// Run method.
type TailerManager struct {
	// Ctx is a context held by the manager — presumably the ctx given to
	// NewTailerManager; confirm in the constructor.
	Ctx context.Context

	// SourceCollection and DestCollection are the source and destination
	// collection handles.
	SourceCollection *OplogCollection
	DestCollection   *OplogCollection

	// LastResumeToken is a resume-token holder.
	// NOTE(review): whether Run reads or updates it is not visible here.
	LastResumeToken *ResumeTokenStore
	// contains filtered or unexported fields
}

func (*TailerManager) Run

func (tailMgr *TailerManager) Run(args ...interface{}) (err error)

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL