hummingbird

package module
v0.0.0-...-0a4dc40 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Feb 26, 2023 License: Apache-2.0 Imports: 33 Imported by: 0

README

The Hummingbird Project

As a result of "losing" my MongoPush source code, I rewrote it, but on a smaller scale. I couldn't get much meaningful help on my previous closed-source project. Supporting many use cases is too big an effort for one person to take on, especially the oplog streaming part, which is in effect a reverse engineering of replication. I still think it is a great idea and many can benefit from it. So, here you go: an open source project. Contributions are welcome.

The idea of the repository name was from the movie The Hummingbird Project (2018). The world can use good ideas. Birds can't fly over the ocean, not because they lack courage, but because there is no one waiting on the other side.

Quick Start

Configuration File Example
{
  "command": "all",
  "drop": true,
  "source": "mongodb://user:password@source.example.com/?compressors=zstd,snappy&readPreference=secondaryPreferred",
  "target": "mongodb+srv://user:password@target.example.mongodb.net/?compressors=zstd,snappy&w=2&retryWrites=true",
  "license": "Apache-2.0"
}
Start Migration
  • Start neutrino
go run main/hummingbird.go -start configuration.json
  • Add Additional Workers
go run main/hummingbird.go -worker configuration.json
Progress Monitoring

http://localhost:3629

Build

./build.sh

Download

./scripts/download-from-docker.sh

Configurations

{
  "block": 10000,
  "command": "all|config|index|data|data-only",
  "drop": false,
  "includes": [
    {
      "namespace": "database.collection",
      "filter": {},
      "to": "database.collection",
      "limit": 0,
      "masks": ["field"],
      "method": "default|hex|partial"
    }
  ],
  "license": "Apache-2.0",
  "port": 3629,
  "source": "mongodb://[user:XXXXXX@]host[:port][/[db][?options]]",
  "spool": "./spool",
  "target": "mongodb+srv://user:XXXXXX@host[/[db][?options]]",
  "verbose": false,
  "workers": 8,
  "yes": false
}

License

Apache-2.0 License

Documentation

Index

Constants

View Source
// Task status values; presumably the strings stored in Task.Status — TODO confirm.
const (
	// TaskAdded is the task status string "added".
	TaskAdded = "added"
	// TaskCompleted is the task status string "completed".
	TaskCompleted = "completed"
	// TaskFailed is the task status string "failed".
	TaskFailed = "failed"
	// TaskProcessing is the task status string "processing".
	TaskProcessing = "processing"
	// TaskSplitting is the task status string "splitting".
	TaskSplitting = "splitting"
)
View Source
// Masking methods. The values match the "method" choices
// ("default|hex|partial") shown in the sample configuration.
const (
	// MaskDefault selects the default masking method.
	MaskDefault = "default"
	// MaskHEX selects the HEX masking method.
	MaskHEX = "hex"
	// MaskPartial selects the partial masking method.
	MaskPartial = "partial"
)
View Source
// Default settings. DefaultSpool, MaxBlockSize, NumberWorkers, and Port carry
// the same values as the "spool", "block", "workers", and "port" entries of
// the sample configuration.
const (
	// DefaultSpool defines the default work space.
	DefaultSpool = "./spool"
	// MaxBlockSize defines the max batch size of a task.
	MaxBlockSize = 10000
	// MaxNumberWorkers defines the max number of concurrent workers.
	MaxNumberWorkers = 16
	// NumberWorkers is presumably the default number of concurrent workers
	// (it is lower than MaxNumberWorkers) — TODO confirm against its callers.
	NumberWorkers = 8
	// Port defines the port number to listen on.
	Port = 3629
)
View Source
// Command names accepted as the migration "command".
const (
	// CommandAll copies all.
	CommandAll = "all"
	// CommandConfig copies configurations.
	CommandConfig = "config"
	// CommandData copies data and tails oplogs after completion.
	CommandData = "data"
	// CommandDataOnly copies data only.
	CommandDataOnly = "data-only"
	// CommandIndex copies indexes.
	CommandIndex = "index"
	// CommandOplog tails oplogs.
	// NOTE(review): "oplog" is not among the "command" values listed in the
	// sample configuration ("all|config|index|data|data-only") — confirm
	// whether it is user-selectable.
	CommandOplog = "oplog"
)
View Source
const (
	// BSONSizeLimit is 16 * mb. mb is declared outside this listing;
	// presumably one mebibyte, which would make this 16 MiB — TODO confirm.
	BSONSizeLimit = 16 * mb
	// CacheDataSizeLimit is four times BSONSizeLimit.
	CacheDataSizeLimit = 4 * BSONSizeLimit
	// GZippedBSONFileExt is the file extension ".bson.gz".
	GZippedBSONFileExt = ".bson.gz"
	// OplogBatchSize is set to 10,000.
	OplogBatchSize = 10000
)
View Source
const (
	// DefaultDuration is the default duration to simulate (5 minutes).
	DefaultDuration = 5 * time.Minute
	// DefaultNumOplogs is the default number of oplogs per thread.
	// NOTE(review): Simulator.NumOplogs is tagged "oplogs_per_second" —
	// confirm which unit this default is expressed in.
	DefaultNumOplogs = 300
)
View Source
const (

	// MaxBatchDataSize is the data-size limit of an insert batch: 64 * mb
	// (mb is declared outside this listing; presumably one mebibyte — TODO confirm).
	MaxBatchDataSize = (64 * mb)
	// MaxBatchSize is the size limit of an insert batch, presumably counted
	// in documents — TODO confirm.
	MaxBatchSize = 1000
)
View Source
const (
	// FavIcon holds the favicon; its 940-byte literal is elided in this listing.
	FavIcon = `` /* 940-byte string literal not displayed */
	// LogoPNG holds the hummingbird PNG; its 5132-byte literal is elided in this listing.
	LogoPNG = `` /* 5132-byte string literal not displayed */
)
View Source
const (
	// MetaDBName defines the default meta database name.
	MetaDBName = "_neutrino"
	// MetaLogs defines the default meta logs collection name.
	MetaLogs = "logs"
	// MetaOplogs defines the default meta oplogs collection name.
	MetaOplogs = "oplogs"
	// MetaTasks defines the default meta tasks collection name.
	MetaTasks = "tasks"
)
View Source
// HTMLTemplate stores the HTML template contents (the 4019-byte literal is
// elided in this listing); presumably the text GetHTMLTemplate parses — TODO confirm.
const HTMLTemplate = `` /* 4019-byte string literal not displayed */

HTMLTemplate stores contents

View Source
const (
	// NumberSplitters is the number of splitters (4); presumably how many
	// splitting routines run concurrently — TODO confirm.
	NumberSplitters = 4
)

Variables

View Source
var (
	// Rainbow lists the seven rainbow color names, from "Red" to "Violet".
	Rainbow = []string{"Red", "Orange", "Yellow", "Green", "Blue", "Indigo", "Violet"}
)

Functions

func CollectionCreator

func CollectionCreator() error

CollectionCreator creates collections at target

func Compare

func Compare(filename string) error

Compare compares migration results

func ConfigCopier

func ConfigCopier() error

ConfigCopier copies configuration including indexes from source to target

func ConfigureMaskOption

func ConfigureMaskOption(include *Include) error

ConfigureMaskOption assigns mask option

func DataCopier

func DataCopier() error

DataCopier copies data from source to target

func DataGen

func DataGen(coll *mongo.Collection, total int) (*mongo.InsertManyResult, error)

DataGen populates a collection with generated data

func DataGenMulti

func DataGenMulti(db *mongo.Database, total int, nColls int) error

DataGenMulti populates data into different collections

func DocGen

func DocGen(i int) bson.D

DocGen returns a bson doc

func DoesDataExist

func DoesDataExist() error

DoesDataExist checks whether data already exists

func DoesFileExist

func DoesFileExist(filename string) bool

DoesFileExist returns true if file exists

func GetAllReplicas

func GetAllReplicas(uri string) ([]string, error)

GetAllReplicas returns all connection strings from a URI

func GetDateTime

func GetDateTime() string

GetDateTime returns formatted date/time

func GetHTMLTemplate

func GetHTMLTemplate() (*template.Template, error)

GetHTMLTemplate returns HTML template

func GetMongoClient

func GetMongoClient(uri string) (*mongo.Client, error)

GetMongoClient returns a mongo client by a connection string

func GetMongoClientWait

func GetMongoClientWait(connstr string, duration ...time.Duration) (*mongo.Client, error)

GetMongoClientWait waits and returns mongo client

func GetQualifiedDBs

func GetQualifiedDBs(client *mongo.Client, metaDB string) ([]string, error)

GetQualifiedDBs returns a list of qualified database names

func GetQualifiedNamespaces

func GetQualifiedNamespaces(client *mongo.Client, includeCollection bool, metaDB string) ([]string, error)

GetQualifiedNamespaces returns a list of qualified namespace names

func GetTailableCursor

func GetTailableCursor(client *mongo.Client, ts *primitive.Timestamp) (*mongo.Cursor, error)

GetTailableCursor returns a tailable cursor

func IndexCopier

func IndexCopier() error

IndexCopier copies indexes from source to target

func IsBalancerEnabled

func IsBalancerEnabled(client *mongo.Client) (bool, error)

IsBalancerEnabled checks if balancer is enabled

func MaskFields

func MaskFields(doc *bson.D, fields []string, method string)

MaskFields mask all matched fields by traversing a doc

func Neutrino

func Neutrino(version string) error

Neutrino routes to a command

func OplogStreamers

func OplogStreamers() error

OplogStreamers copies oplogs from source to target

func RedactedURI

func RedactedURI(uri string) string

RedactedURI returns the URI with login and password redacted

func Resume

func Resume(filename string, extra ...bool) error

Resume resumes a migration

func Simulate

func Simulate(filename string) error

Simulate simulates 2 inserts, 1 update/delete, and 1 find

func SkipOplog

func SkipOplog(oplog Oplog) bool

SkipOplog reports whether the oplog should be skipped

func Splitter

func Splitter(tasks []*Task) error

Splitter splits collection into small tasks

func Start

func Start(filename string, extra ...bool) error

Start starts a migration

func StartSimulation

func StartSimulation(sim Simulator) error

StartSimulation starts a simulation

func StartWebServer

func StartWebServer(port int) error

StartWebServer starts an HTTP server on the given port (the default Port is 3629)

func Stringify

func Stringify(doc interface{}) string

Stringify returns JSON string

func ToFloat64

func ToFloat64(s interface{}) float64

ToFloat64 converts a value to float64

func ToInt32

func ToInt32(s interface{}) int32

ToInt32 converts a value to int32

func ToInt64

func ToInt64(s interface{}) int64

ToInt64 converts a value to int64

func ValidateMigratorConfig

func ValidateMigratorConfig(migrator *Migrator) error

ValidateMigratorConfig validates configuration from a file

func Wait

func Wait() error

Wait waits for all tasks to be processed

func Worker

func Worker(id string) error

Worker copies data

Types

type BSONReader

// BSONReader stores bson reader info. NewBSONReader builds one from a file
// name, and Next returns the next BSON document as bytes.
type BSONReader struct {
	// Stream is the underlying readable, closable data source.
	Stream io.ReadCloser
}

BSONReader stores bson reader info

func NewBSONReader

func NewBSONReader(filename string) (*BSONReader, error)

NewBSONReader returns a bson reader

func (*BSONReader) Next

func (p *BSONReader) Next() []byte

Next returns next bson doc

type BulkWriteOplogsResult

// BulkWriteOplogsResult stores the counters returned by BulkWriteOplogs.
type BulkWriteOplogsResult struct {
	// DeletedCount, InsertedCount, ModifiedCount, and UpsertedCount are
	// per-operation counters; presumably they mirror the same-named fields of
	// the driver's bulk-write result — TODO confirm.
	DeletedCount  int64
	InsertedCount int64
	ModifiedCount int64
	UpsertedCount int64
	// TotalCount is presumably the overall number of oplogs handled — TODO confirm.
	TotalCount    int64
}

BulkWriteOplogsResult stores results

func BulkWriteOplogs

func BulkWriteOplogs(oplogs []Oplog) (*BulkWriteOplogsResult, error)

BulkWriteOplogs applies oplogs in bulk

type Chart

// Chart shows progress.
type Chart struct {
	// Title is the chart title.
	Title       string
	// Completions is a list of two-element pairs of arbitrary values;
	// presumably (label, value) data points — TODO confirm.
	Completions [][2]interface{}
}

Chart shows progress

type ConfigChunk

// ConfigChunk contains a config.chunks document.
type ConfigChunk struct {
	// Namespace is read from the "ns" key.
	Namespace string `json:"ns" bson:"ns"`
	// Max is read from the "max" key; presumably the chunk's upper bound — TODO confirm.
	Max       bson.D `json:"max" bson:"max"`
	// Min is read from the "min" key; presumably the chunk's lower bound — TODO confirm.
	Min       bson.D `json:"min" bson:"min"`
	// Shard is read from the "shard" key.
	Shard     string `json:"shard" bson:"shard"`
}

ConfigChunk contains config.chunks

type ConfigCollection

// ConfigCollection contains a config.collections document.
type ConfigCollection struct {
	// ID is read from "_id"; presumably the collection namespace — TODO confirm.
	ID               string `bson:"_id"`
	// DefaultCollation is read from "defaultCollation".
	DefaultCollation bson.D `bson:"defaultCollation"`
	// Dropped is read from "dropped".
	Dropped          bool   `bson:"dropped"`
	// Key is read from "key"; presumably the shard key — TODO confirm.
	Key              bson.D `bson:"key"`
	// Unique is read from "unique".
	Unique           bool   `bson:"unique"`
}

ConfigCollection contains config.collections

type ConfigDB

// ConfigDB contains a config.databases document.
type ConfigDB struct {
	// ID is read from "_id"; presumably the database name — TODO confirm.
	ID          string `bson:"_id"`
	// Partitioned is read from "partitioned".
	Partitioned bool   `bson:"partitioned"`
	// Primary is read from "primary"; presumably the primary shard — TODO confirm.
	Primary     string `bson:"primary"`
}

ConfigDB contains config.databases

type Include

// Include stores a namespace and query. Its keys correspond to one entry of
// the "includes" array in the configuration example.
type Include struct {
	// Filter is the "filter" document; omitted when empty.
	Filter    bson.D   `bson:"filter,omitempty"`
	// Limit is the "limit" value; omitted when zero.
	Limit     int64    `bson:"limit,omitempty"`
	// Masks is the "masks" list of field names; omitted when empty.
	Masks     []string `bson:"masks,omitempty"`
	// Method is the "method" value; the configuration example lists
	// "default|hex|partial".
	Method    string   `bson:"method,omitempty"`
	// Namespace is the "namespace" value, in "database.collection" form per
	// the configuration example.
	Namespace string   `bson:"namespace"`
	// To is the "to" value, also in "database.collection" form; presumably the
	// target namespace (see Migrator.GetToNamespace) — TODO confirm.
	To        string   `bson:"to,omitempty"`
}

Include stores namespace and query

func GetInclude

func GetInclude(value string) (*Include, error)

GetInclude returns Include

type Includes

// Includes stores a list of Include pointers. Its Set(string) error and
// String() string methods match the method set of flag.Value.
type Includes []*Include

Includes stores Include

func (*Includes) Set

func (p *Includes) Set(value string) error

Set sets flag var

func (*Includes) String

func (p *Includes) String() string

type Migrator

// Migrator stores migration configurations. The tag names match the keys of
// the configuration file shown in the README.
type Migrator struct {
	// Block is the "block" setting (10000 in the sample configuration).
	Block    int      `bson:"block,omitempty"`
	// Command is the "command" setting; the README lists
	// "all|config|index|data|data-only".
	Command  string   `bson:"command"`
	// Includes is the "includes" list of namespaces and queries.
	Includes Includes `bson:"includes,omitempty"`
	// IsDrop is the "drop" setting.
	IsDrop   bool     `bson:"drop,omitempty"`
	// License is the "license" setting ("Apache-2.0" in the sample configuration).
	License  string   `bson:"license,omitempty"`
	// Port is the "port" setting (3629 in the sample configuration).
	Port     int      `bson:"port,omitempty"`
	// Source is the "source" connection string.
	Source   string   `bson:"source"`
	// Spool is the "spool" setting ("./spool" in the sample configuration).
	Spool    string   `bson:"spool,omitempty"`
	// Target is the "target" connection string.
	Target   string   `bson:"target"`
	// Verbose is the "verbose" setting.
	Verbose  bool     `bson:"verbose,omitempty"`
	// Workers is the "workers" setting (8 in the sample configuration).
	Workers  int      `bson:"workers,omitempty"`
	// Yes is the "yes" setting; presumably skips confirmation prompts — TODO confirm.
	Yes      bool     `bson:"yes,omitempty"`
	// contains filtered or unexported fields
}

Migrator stores migration configurations

func GetMigratorInstance

func GetMigratorInstance() *Migrator

GetMigratorInstance returns the Migrator instance (migratorInstance)

func NewMigratorInstance

func NewMigratorInstance(filename string) (*Migrator, error)

NewMigratorInstance sets and returns a migrator instance

func ReadMigratorConfig

func ReadMigratorConfig(filename string) (*Migrator, error)

ReadMigratorConfig reads and validates configuration from a file

func (*Migrator) AddOplogStreamer

func (inst *Migrator) AddOplogStreamer(streamer *OplogStreamer)

AddOplogStreamer registers the given oplog streamer with the migrator

func (*Migrator) CheckIfBalancersDisabled

func (inst *Migrator) CheckIfBalancersDisabled() error

CheckIfBalancersDisabled checks whether both source and target balancers are disabled

func (*Migrator) DropCollections

func (inst *Migrator) DropCollections() error

DropCollections drops all qualified collections

func (*Migrator) GetToNamespace

func (inst *Migrator) GetToNamespace(ns string) string

GetToNamespace returns target namespace

func (*Migrator) Included

func (inst *Migrator) Included() map[string]*Include

Included returns includes

func (*Migrator) IsExit

func (inst *Migrator) IsExit() bool

IsExit returns isExit

func (*Migrator) LiveStreamingOplogs

func (inst *Migrator) LiveStreamingOplogs()

LiveStreamingOplogs set isExit to true

func (*Migrator) NotifyWorkerExit

func (inst *Migrator) NotifyWorkerExit()

NotifyWorkerExit sets isExit to true

func (*Migrator) Replicas

func (inst *Migrator) Replicas() map[string]string

Replicas returns replica URI map

func (*Migrator) ResetIncludesTo

func (inst *Migrator) ResetIncludesTo(includes Includes)

ResetIncludesTo is a convenient function for go tests

func (*Migrator) SkipNamespace

func (inst *Migrator) SkipNamespace(namespace string) bool

SkipNamespace skips namespace

func (*Migrator) SourceStats

func (inst *Migrator) SourceStats() *mdb.ClusterStats

SourceStats returns stats

func (*Migrator) TargetStats

func (inst *Migrator) TargetStats() *mdb.ClusterStats

TargetStats returns stats

func (*Migrator) Workspace

func (inst *Migrator) Workspace() Workspace

Workspace returns Workspace

type Oplog

// Oplog stores one oplog entry, decoded from the BSON keys named in the tags.
type Oplog struct {
	// Hash is read from "h"; a pointer, so it may be nil.
	Hash      *int64              `bson:"h"`
	// Namespace is read from "ns".
	Namespace string              `bson:"ns"`
	// Object is read from "o".
	Object    bson.D              `bson:"o"`
	// Operation is read from "op".
	Operation string              `bson:"op"`
	// Query is read from "o2"; omitted when empty.
	Query     bson.D              `bson:"o2,omitempty"`
	// Term is read from "t"; a pointer, so it may be nil.
	Term      *int64              `bson:"t"`
	// Timestamp is read from "ts".
	Timestamp primitive.Timestamp `bson:"ts"`
	// Version is read from "v".
	Version   int                 `bson:"v"`
}

Oplog stores an oplog

type OplogStreamer

// OplogStreamer tails oplogs.
type OplogStreamer struct {
	// SetName is presumably the replica set name being tailed — TODO confirm.
	SetName string
	// Spool is presumably the work space directory for cached oplog files
	// (see CacheOplogs) — TODO confirm.
	Spool   string
	// URI is presumably the connection string of the replica being tailed — TODO confirm.
	URI     string
	// contains filtered or unexported fields
}

OplogStreamer tails oplogs

func (*OplogStreamer) ApplyCachedOplogs

func (p *OplogStreamer) ApplyCachedOplogs() (string, error)

ApplyCachedOplogs applies cached oplogs to target

func (*OplogStreamer) CacheOplogs

func (p *OplogStreamer) CacheOplogs() error

CacheOplogs stores oplogs in files

func (*OplogStreamer) IsCache

func (p *OplogStreamer) IsCache() bool

IsCache returns isCache

func (*OplogStreamer) LiveStream

func (p *OplogStreamer) LiveStream()

LiveStream begins applying oplogs to target

func (*OplogStreamer) LiveStreamOplogs

func (p *OplogStreamer) LiveStreamOplogs(ts *primitive.Timestamp) error

LiveStreamOplogs streams and applies oplogs

type OplogWriteModel

// OplogWriteModel stores a namespace and a writeModel; GetWriteModels returns
// a slice of these for one Oplog.
type OplogWriteModel struct {
	// Namespace is the namespace associated with the write model.
	Namespace  string
	// Operation is presumably the oplog operation the model came from — TODO confirm.
	Operation  string
	// WriteModel is the driver write model.
	WriteModel mongo.WriteModel
}

OplogWriteModel stores namespace and writeModel

func GetWriteModels

func GetWriteModels(oplog Oplog) []OplogWriteModel

GetWriteModels returns WriteModel from an oplog

type Simulator

// Simulator stores simulation info, decoded from the keys named in the tags.
type Simulator struct {
	// Namespaces is read from "namespaces".
	Namespaces []string `bson:"namespaces"`
	// Threads is read from "threads"; it holds one count each under "find",
	// "insert", and "write" — presumably the thread counts for the Find,
	// Insert, and Modify simulations — TODO confirm.
	Threads    struct {
		Find   int `bson:"find"`
		Insert int `bson:"insert"`
		Write  int `bson:"write"`
	} `bson:"threads"`
	// Seconds is read from "seconds_to_run".
	Seconds   int    `bson:"seconds_to_run"`
	// NumOplogs is read from "oplogs_per_second".
	NumOplogs int    `bson:"oplogs_per_second"`
	// URI is read from "uri".
	URI       string `bson:"uri"`
	// Verbose is read from "verbose".
	Verbose   bool   `bson:"verbose"`
	// contains filtered or unexported fields
}

Simulator stores simulation info

func (*Simulator) Find

func (s *Simulator) Find() error

Find simulates finds

func (*Simulator) Insert

func (s *Simulator) Insert() error

Insert simulates insertions

func (*Simulator) Modify

func (s *Simulator) Modify() error

Modify simulates updates and deletions

type Task

// Task holds migration task information, stored under the BSON keys named in
// the tags.
type Task struct {
	// BeginTime is stored as "begin_time".
	BeginTime    time.Time           `bson:"begin_time"`
	// EndTime is stored as "end_time".
	EndTime      time.Time           `bson:"end_time"`
	// ID is stored as "_id".
	ID           primitive.ObjectID  `bson:"_id"`
	// IDs is stored as "ids"; presumably the document ID boundaries of the
	// task's range — TODO confirm.
	IDs          []interface{}       `bson:"ids"`
	// Include is stored as "include".
	Include      Include             `bson:"include"`
	// Inserted is stored as "inserted"; presumably the number of documents
	// written so far — TODO confirm.
	Inserted     int                 `bson:"inserted"`
	// Namespace is stored as "ns".
	Namespace    string              `bson:"ns"`
	// ParentID is stored as "parent_id"; a pointer, so it may be nil.
	ParentID     *primitive.ObjectID `bson:"parent_id"`
	// SetName is stored as "replica_set".
	SetName      string              `bson:"replica_set"`
	// SourceCounts is stored as "source_counts".
	SourceCounts int                 `bson:"source_counts"`
	// Status is stored as "status"; presumably one of the Task* status
	// constants — TODO confirm.
	Status       string              `bson:"status"`
	// UpdatedBy is stored as "updated_by".
	UpdatedBy    string              `bson:"updated_by"`
}

Task holds migration task information

func (*Task) CopyData

func (p *Task) CopyData(source *mongo.Collection, target *mongo.Collection) error

CopyData copies data

type TaskStatusCounts

// TaskStatusCounts stores counts of all statuses; it is the result type of
// Workspace.CountAllStatus. The field names parallel the TaskAdded,
// TaskCompleted, TaskFailed, TaskProcessing, and TaskSplitting constants.
type TaskStatusCounts struct {
	Added      int32
	Completed  int32
	Failed     int32
	Processing int32
	Splitting  int32
}

TaskStatusCounts stores counts of all status

type Workspace

// Workspace stores the meta database; all of its fields are unexported.
type Workspace struct {
	// contains filtered or unexported fields
}

Workspace stores meta database

func (*Workspace) CleanUpWorkspace

func (ws *Workspace) CleanUpWorkspace() error

CleanUpWorkspace removes all cached files

func (*Workspace) CountAllStatus

func (ws *Workspace) CountAllStatus() (TaskStatusCounts, error)

CountAllStatus returns the task counts for all statuses

func (*Workspace) CreateTaskIndexes

func (ws *Workspace) CreateTaskIndexes() error

CreateTaskIndexes creates indexes on tasks collection

func (*Workspace) DropMetaDB

func (ws *Workspace) DropMetaDB() error

DropMetaDB drops meta database

func (*Workspace) FindAllParentTasks

func (ws *Workspace) FindAllParentTasks() ([]*Task, error)

FindAllParentTasks returns all parent tasks

func (*Workspace) FindNextTaskAndUpdate

func (ws *Workspace) FindNextTaskAndUpdate(replset string, updatedBy string, rev int) (*Task, error)

FindNextTaskAndUpdate finds the next task by replica set name, updates it, and returns it

func (*Workspace) GetOplogTimestamp

func (ws *Workspace) GetOplogTimestamp(setName string) *primitive.Timestamp

GetOplogTimestamp returns timestamp of a shard/replica

func (*Workspace) InsertTasks

func (ws *Workspace) InsertTasks(tasks []*Task) error

InsertTasks inserts tasks to database

func (*Workspace) Log

func (ws *Workspace) Log(status string) error

Log adds a status to status collection

func (*Workspace) LogConfig

func (ws *Workspace) LogConfig() error

LogConfig records configs

func (*Workspace) Reset

func (ws *Workspace) Reset() error

Reset drops meta database and clean up workspace

func (*Workspace) ResetLongRunningTasks

func (ws *Workspace) ResetLongRunningTasks(ago time.Duration) (int, error)

ResetLongRunningTasks resets long running processing to added

func (*Workspace) ResetParentTask

func (ws *Workspace) ResetParentTask(task Task) error

ResetParentTask resets and deletes all child tasks

func (*Workspace) ResetProcessingTasks

func (ws *Workspace) ResetProcessingTasks() error

ResetProcessingTasks resets processing status to added

func (*Workspace) SaveOplogTimestamp

func (ws *Workspace) SaveOplogTimestamp(setName string, ts primitive.Timestamp) error

SaveOplogTimestamp updates timestamp of a shard/replica

func (*Workspace) UpdateTask

func (ws *Workspace) UpdateTask(task *Task) error

UpdateTask updates task

Directories

Path Synopsis

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL