collector

package
v2.0.0-...-acbaf60 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Sep 4, 2023 License: GPL-3.0 Imports: 21 Imported by: 0

Documentation

Index

Constants

View Source
const (

	// bson deserialize workload is CPU-intensive task
	PipelineQueueMaxNr    = 8
	PipelineQueueMiddleNr = 4
	PipelineQueueMinNr    = 1
	PipelineQueueLen      = 64 * 2

	DurationTime                  = 6000 // unit: ms.
	DDLCheckpointInterval         = 300  // unit: ms.
	FilterCheckpointGap           = 180  // unit: seconds. no checkpoint update, flush checkpoint mandatory
	FilterCheckpointCheckInterval = 180  // unit: seconds.
	CheckCheckpointUpdateTimes    = 10   // at most times of time check
)
View Source
const (
	FullSyncReaderOplogStoreDiskReadBatch = 10000
)
View Source
const MaxUnAckListLength = 128 * 256

Variables

This section is empty.

Functions

This section is empty.

Types

type Batcher

type Batcher struct {
	// contains filtered or unexported fields
}

As mentioned in syncer.go, Batcher is used to batch oplogs before sending, in order to improve performance.

func NewBatcher

func NewBatcher(syncer *OplogSyncer, filterList filter.OplogFilterChain,
	handler OplogHandler, workerGroup []*Worker) *Batcher

func (*Batcher) BatchMore

func (batcher *Batcher) BatchMore() (genericOplogs [][]*oplog.GenericOplog, barrier bool, allEmpty bool, exit bool)

BatchMore gathers oplogs together into batches.

Honestly speaking, it is complicated, so reading the unit tests may help make it clearer. The reason this function is so complicated is that there are too many corner cases.

It returns the batched oplogs and a barrier flag. The barrier is set if a DDL is found.

The original comment ends with a small diagram whose alignment was lost in extraction:

    i d i c u i
    | |

func (*Batcher) Fini

func (batcher *Batcher) Fini()

type Module

type Module interface {
	IsRegistered() bool

	/**
	 * Module install and initialize. return false on failed
	 * and only invocation on WriteController is preparing
	 */
	Install() bool

	/**
	 * Handle outstanding request message. and messages
	 * are passed one by one. Any changes of message in
	 * Handle() will be preserved and delivery to next
	 *
	 * @return tunnel's error code (<0) or ack value
	 *
	 */
	Handle(message *tunnel.WMessage) int64
}

type OplogHandler

type OplogHandler interface {
	// invocation on every oplog consumed
	Handle(log *oplog.PartialLog)
}

type OplogSyncer

type OplogSyncer struct {
	OplogHandler

	// source mongodb replica set name
	Replset string

	// pending queue. used by raw log parsing. we buffered the
	// target raw oplogs in buffer and push them to pending queue
	// when buffer is filled in. and transfer to log queue
	// buffer            []*bson.Raw // move to persister
	PendingQueue []chan [][]byte

	LastFetchTs primitive.Timestamp // the previous last fetch timestamp

	// can be closed
	CanClose  bool
	SyncGroup []*OplogSyncer
	// contains filtered or unexported fields
}

OplogSyncer polls oplogs from the original source MongoDB.

func NewOplogSyncer

func NewOplogSyncer(
	replset string,
	startPosition interface{},
	fullSyncFinishPosition int64,
	mongoUrl string,
	gids []string) *OplogSyncer

Syncer is used to fetch oplogs from the source MongoDB and then send them to different workers, each of which can be seen as a network sender. Several syncers coexist to improve fetching performance.

The data flow in a syncer is:

    source mongodb --> reader --> persister --> pending queue (raw data) --> logs queue (parsed data) --> worker

The pending queue and the logs queue are split in order to improve performance.

func (*OplogSyncer) Bind

func (sync *OplogSyncer) Bind(w *Worker)

Bind binds the given worker to this syncer.

func (*OplogSyncer) Fini

func (sync *OplogSyncer) Fini()

func (*OplogSyncer) Handle

func (sync *OplogSyncer) Handle(log *oplog.PartialLog)

func (*OplogSyncer) Init

func (sync *OplogSyncer) Init()

func (*OplogSyncer) RestAPI

func (sync *OplogSyncer) RestAPI()

func (*OplogSyncer) Start

func (sync *OplogSyncer) Start()

Start begins polling oplogs.

func (*OplogSyncer) StartDiskApply

func (sync *OplogSyncer) StartDiskApply()

func (*OplogSyncer) String

func (sync *OplogSyncer) String() string

type Persister

type Persister struct {

	// batch data([]byte) together and send to downstream
	Buffer [][]byte

	// disk queue used to store oplog temporarily
	DiskQueue *diskQueue.DiskQueue
	// contains filtered or unexported fields
}

func NewPersister

func NewPersister(replset string, sync *OplogSyncer) *Persister

func (*Persister) GetFetchStage

func (p *Persister) GetFetchStage() int32

func (*Persister) GetQueryTsFromDiskQueue

func (p *Persister) GetQueryTsFromDiskQueue() primitive.Timestamp

func (*Persister) InitDiskQueue

func (p *Persister) InitDiskQueue(dqName string)

func (*Persister) Inject

func (p *Persister) Inject(input []byte)

Inject injects the given data.

func (*Persister) PushToPendingQueue

func (p *Persister) PushToPendingQueue(input []byte)

func (*Persister) RestAPI

func (p *Persister) RestAPI()

func (*Persister) SetFetchStage

func (p *Persister) SetFetchStage(fetchStage int32)

func (*Persister) Start

func (p *Persister) Start()

type TransferEventListener

type TransferEventListener struct {
	// contains filtered or unexported fields
}

type Worker

type Worker struct {
	// contains filtered or unexported fields
}

func NewWorker

func NewWorker(syncer *OplogSyncer, id uint32) *Worker

func (*Worker) AllAcked

func (worker *Worker) AllAcked(allAcked bool)

func (*Worker) Init

func (worker *Worker) Init() bool

func (*Worker) IsAllAcked

func (worker *Worker) IsAllAcked() bool

func (*Worker) Offer

func (worker *Worker) Offer(batch []*oplog.GenericOplog)

func (*Worker) RestAPI

func (worker *Worker) RestAPI()

func (*Worker) SetInitSyncFinishTs

func (worker *Worker) SetInitSyncFinishTs(fullSyncFinishPosition int64)

func (*Worker) StartWorker

func (worker *Worker) StartWorker()

func (*Worker) String

func (worker *Worker) String() string

type WriteController

type WriteController struct {

	// current max lsn_ack value
	LatestLsnAck int64
	// contains filtered or unexported fields
}

func NewWriteController

func NewWriteController(worker *Worker) *WriteController

func (*WriteController) Send

func (controller *WriteController) Send(logs []*oplog.GenericOplog, tag uint32) int64

func (*WriteController) SetInitSyncFinishTs

func (controller *WriteController) SetInitSyncFinishTs(fullSyncFinishPosition int64)

SetInitSyncFinishTs sets the init sync finish timestamp if the tunnel is direct.

Directories

Path Synopsis

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL