Documentation ¶
Index ¶
- Constants
- Variables
- type Buffer
- func (buffer *Buffer) Flush() (msg *MessageN, err error)
- func (buffer *Buffer) FlushAll() (msgs []*MessageN, err error)
- func (buffer *Buffer) IsEmpty() (isEmpty bool)
- func (buffer *Buffer) IsFull() (isFull bool)
- func (buffer *Buffer) Store(msg *MessageN) (err error)
- func (buffer *Buffer) String() (displayStr string)
- type BufferConfig
- type Controller
- type Dumper
- type DumperConfig
- type Filter
- type FlusherFunc
- type MessageN
- type MongoConfig
- type NoopStageExecutor
- type OperationTypeT
- type Oplog
- type OplogCollection
- func (collection *OplogCollection) AddCollectionFilter(filters bson.M, isOplog bool) (err error)
- func (collection *OplogCollection) Delete(filters bson.M) (err error)
- func (collection *OplogCollection) GetCollectionPath() (collectionPath string)
- func (collection *OplogCollection) GetOplogFilter(resumeToken *ResumeTokenStore) (filters bson.M, err error)
- type OplogConfig
- type OplogTailer
- type QueryFunc
- type QueryGenerator
- func (queryGen *QueryGenerator) Delete(msg *MessageN) (err error)
- func (queryGen *QueryGenerator) Insert(msg *MessageN) (err error)
- func (queryGen *QueryGenerator) Process(msg *MessageN) (err error)
- func (queryGen *QueryGenerator) ProcessAll(messages []*MessageN) (err error)
- func (queryGen *QueryGenerator) Update(msg *MessageN) (err error)
- type ResumeTokenStore
- type Seeder
- type Stage
- type StageExecutor
- func NewDumper(ctx context.Context, srcCollection *OplogCollection, ...) (stageExecutor StageExecutor, err error)
- func NewNoopStageExecutor(ctx context.Context, srcCollection *OplogCollection, ...) (stageExecutor StageExecutor, err error)
- func NewOplogTailer(ctx context.Context, collection *OplogCollection, ...) (stageExecutor StageExecutor, err error)
- func NewTailerManager(ctx context.Context, srcColl *OplogCollection, dstColl *OplogCollection) (stageExecutor StageExecutor, err error)
- type StageFunction
- type StageTracker
- type StageTypeT
- type StateTypeT
- type TailerManager
Constants ¶
View Source
const (
LastUpdatedResumeFile = "/tmp/last-updated-resume-token"
)
Variables ¶
View Source
var (
	ConfigFolder                 = os.Getenv("CONFIG_FOLDER")
	DefaultOplogConfigFile       = fmt.Sprintf("%s/oplog_config.json", ConfigFolder)
	DefaultSourceMongoConfigFile = fmt.Sprintf("%s/source_mongo_config.json", ConfigFolder)
	DefaultDestMongoFile         = fmt.Sprintf("%s/dest_mongo_config.json", ConfigFolder)
)
View Source
var (
	// Stages
	InitStage                StageTypeT = 0
	PreparingCollectionStage StageTypeT = 1
	DumpingCollectionStage   StageTypeT = 2
	TailingOplogStage        StageTypeT = 3
	// States
	PendingState StateTypeT = 0
	SuccessState StateTypeT = 1
	FailedState  StateTypeT = 2
)
Functions ¶
This section is empty.
Types ¶
type Buffer ¶
type Buffer struct {
	Ctx    context.Context
	Config *BufferConfig
	// contains filtered or unexported fields
}
type BufferConfig ¶
type BufferConfig struct {
Capacity int `json:"capacity"`
}
type Controller ¶
type Controller struct {
	Ctx              context.Context
	SourceCollection *OplogCollection
	DestCollection   *OplogCollection
	StageTracker     *StageTracker
	// contains filtered or unexported fields
}
func NewController ¶
func NewController(ctx context.Context, sourceCollection *OplogCollection, destCollection *OplogCollection) (controller *Controller, err error)
func (*Controller) Run ¶
func (controller *Controller) Run() (err error)
type Dumper ¶
type Dumper struct {
	Ctx             context.Context
	Config          *DumperConfig
	SrcCollection   *OplogCollection
	DstCollection   *OplogCollection
	LastResumeToken *ResumeTokenStore
	DumperCloseCh   chan bool
	DumperCh        chan *MessageN
	// contains filtered or unexported fields
}
func (*Dumper) StartQuery ¶
type DumperConfig ¶
type DumperConfig struct {
FetchCountThreshold int `json:"fetch_count_threshold"`
}
type FlusherFunc ¶
type MessageN ¶
type MessageN struct {
	CollectionPath string                 `json:"ns"`
	FullDocument   map[string]interface{} `json:"o"`
	OperationType  OperationTypeT         `json:"op"`
	Timestamp      primitive.Timestamp    `json:"ts"`
}
type MongoConfig ¶
type MongoConfig struct {
	Username string `json:"username"`
	Password string `json:"password"`
	Host     string `json:"host"`
	Port     string `json:"port"`
	DbName   string `json:"db_name"`
}
func NewMongoConfig ¶
func NewMongoConfig(fileName string) (sourceMongoConfig *MongoConfig, err error)
func (*MongoConfig) GetUrl ¶
func (sourceMongoConfig *MongoConfig) GetUrl() (url string)
type NoopStageExecutor ¶
type NoopStageExecutor struct{}
func (*NoopStageExecutor) Run ¶
func (noopStageExecutor *NoopStageExecutor) Run(args ...interface{}) (err error)
type OperationTypeT ¶
type OperationTypeT string
var (
	InsertOperation OperationTypeT = "i"
	UpdateOperation OperationTypeT = "u"
	DeleteOperation OperationTypeT = "d"
)
type Oplog ¶
type Oplog struct {
	Ctx            context.Context
	CancelFunc     context.CancelFunc
	SrcCollections map[string]*OplogCollection
	DstCollections map[string]*OplogCollection
	// contains filtered or unexported fields
}
type OplogCollection ¶
type OplogCollection struct {
	Name            string            `json:"name"`
	Filters         []Filter          `json:"filters"`
	MongoCollection *mongo.Collection `json:"mongo_collection"`
	MongoDatabase   *mongo.Database   `json:"mongo_database"`
}
func (*OplogCollection) AddCollectionFilter ¶
func (collection *OplogCollection) AddCollectionFilter(filters bson.M, isOplog bool) (err error)
func (*OplogCollection) Delete ¶
func (collection *OplogCollection) Delete(filters bson.M) (err error)
func (*OplogCollection) GetCollectionPath ¶
func (collection *OplogCollection) GetCollectionPath() (collectionPath string)
func (*OplogCollection) GetOplogFilter ¶
func (collection *OplogCollection) GetOplogFilter(resumeToken *ResumeTokenStore) (filters bson.M, err error)
type OplogConfig ¶
type OplogConfig struct {
	DbName      string            `json:"db_name"`
	Collections []OplogCollection `json:"collections"`
}
func NewOplogConfig ¶
func NewOplogConfig() (oplogConfig *OplogConfig, err error)
type OplogTailer ¶
type OplogTailer struct {
	Ctx                       context.Context
	Collection                *OplogCollection
	FetchCountThreshold       int
	WatchThreshold            int
	WatchCount                int
	ShouldHonorWatchThreshold bool
	CtrlrCh                   chan *MessageN
}
func (*OplogTailer) FetchFromOplog ¶
func (tailer *OplogTailer) FetchFromOplog(resumeToken *ResumeTokenStore) (messages []*MessageN, err error)
func (*OplogTailer) Run ¶
func (tailer *OplogTailer) Run(args ...interface{}) (err error)
func (*OplogTailer) ShouldContinueProcessing ¶
func (tailer *OplogTailer) ShouldContinueProcessing() (shouldContinue bool)
type QueryGenerator ¶
type QueryGenerator struct {
	Ctx        context.Context
	Collection *mongo.Collection
	// contains filtered or unexported fields
}
func NewQueryGenerator ¶
func NewQueryGenerator(ctx context.Context, coll *mongo.Collection) (queryGen *QueryGenerator, err error)
func (*QueryGenerator) Delete ¶
func (queryGen *QueryGenerator) Delete(msg *MessageN) (err error)
func (*QueryGenerator) Insert ¶
func (queryGen *QueryGenerator) Insert(msg *MessageN) (err error)
func (*QueryGenerator) Process ¶
func (queryGen *QueryGenerator) Process(msg *MessageN) (err error)
func (*QueryGenerator) ProcessAll ¶
func (queryGen *QueryGenerator) ProcessAll(messages []*MessageN) (err error)
func (*QueryGenerator) Update ¶
func (queryGen *QueryGenerator) Update(msg *MessageN) (err error)
type ResumeTokenStore ¶
func (*ResumeTokenStore) Copy ¶
func (resumeToken *ResumeTokenStore) Copy() (copied *ResumeTokenStore)
func (ResumeTokenStore) Fetch ¶
func (resumeTokenStore ResumeTokenStore) Fetch() (resumeToken *ResumeTokenStore, err error)
func (*ResumeTokenStore) Store ¶
func (resumeTokenStore *ResumeTokenStore) Store() (err error)
type Seeder ¶
type Seeder struct {
	Count       int
	Collection  *OplogCollection
	ShouldClean bool
	// contains filtered or unexported fields
}
func (*Seeder) GetRowsToSeed ¶
type Stage ¶
type Stage struct {
	StartTime       time.Time              `json:"start_time"`
	StopTime        time.Time              `json:"stop_time"`
	LastHeartbeatAt time.Time              `json:"last_heartbeat_at"`
	StageType       StageTypeT             `json:"stage_type"`
	Status          StateTypeT             `json:"status"`
	Metadata        map[string]interface{} `json:"metadata"`
}
type StageExecutor ¶
type StageExecutor interface {
Run(...interface{}) error
}
func NewDumper ¶
func NewDumper(ctx context.Context, srcCollection *OplogCollection, dstCollection *OplogCollection) (stageExecutor StageExecutor, err error)
func NewNoopStageExecutor ¶
func NewNoopStageExecutor(ctx context.Context, srcCollection *OplogCollection, dstCollection *OplogCollection) (stageExecutor StageExecutor, err error)
func NewOplogTailer ¶
func NewOplogTailer(ctx context.Context, collection *OplogCollection, dstCollection *OplogCollection) (stageExecutor StageExecutor, err error)
func NewTailerManager ¶
func NewTailerManager(ctx context.Context, srcColl *OplogCollection, dstColl *OplogCollection) (stageExecutor StageExecutor, err error)
type StageFunction ¶
type StageFunction func(context.Context, *OplogCollection, *OplogCollection) (StageExecutor, error)
type StageTracker ¶
type StageTracker struct {
	Ctx           context.Context
	CurrStage     StageTypeT
	Stages        map[StageTypeT]*Stage
	SrcCollection *OplogCollection
	DstCollection *OplogCollection
	// contains filtered or unexported fields
}
func NewStageTracker ¶
func NewStageTracker(ctx context.Context, srcCollection *OplogCollection, dstCollection *OplogCollection) (stageTracker *StageTracker, err error)
func (*StageTracker) Next ¶
func (stageTracker *StageTracker) Next(args ...interface{}) (err error)
func (*StageTracker) Run ¶
func (stageTracker *StageTracker) Run() (err error)
func (*StageTracker) RunStage ¶
func (stageTracker *StageTracker) RunStage(args ...interface{}) (err error)
type StageTypeT ¶
type StageTypeT uint8
type StateTypeT ¶
type StateTypeT uint8
type TailerManager ¶
type TailerManager struct {
	Ctx              context.Context
	SourceCollection *OplogCollection
	DestCollection   *OplogCollection
	LastResumeToken  *ResumeTokenStore
	// contains filtered or unexported fields
}
func (*TailerManager) Run ¶
func (tailMgr *TailerManager) Run(args ...interface{}) (err error)
Click to show internal directories.
Click to hide internal directories.