mgo

package
v0.0.0-...-b1b21d8 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Oct 17, 2023 License: LGPL-3.0 Imports: 16 Imported by: 0

README

功能概述

  • 连接副本集:即使副本集处于内网,也可以通过 HostMap 映射连上
  • 专门用于清洗数据的函数,支持断点、批量、多线程
  • 专门用于清洗 oplog 的函数,支持多线程清洗;对同一集合的多种操作以阻塞方式依次进行

Documentation

Index

Constants

This section is empty.

Variables

View Source
// ErrNoDocuments re-exports mongo.ErrNoDocuments under this package's name.
var ErrNoDocuments = mongo.ErrNoDocuments
View Source
var NewObjectID = primitive.NewObjectID // creates a new ObjectID
View Source
var ObjectIDFromHex = primitive.ObjectIDFromHex // converts a hexadecimal string to an ObjectID

Functions

This section is empty.

Types

type ClearOplogOption

// ClearOplogOption holds the optional settings accepted by Client.ClearOplog.
type ClearOplogOption struct {
	Thread    int64          // number of threads
	Init      bool           // whether to initialize
	Oid       Timestamp      // starting id
	Show      any            // fields to show
	Filter    map[string]any // query parameters
	BatchSize int32          // number of documents per server batch
}

type ClearOption

// ClearOption holds the optional settings accepted by Table.ClearTable and
// Table.ClearTables.
type ClearOption struct {
	Thread         int64          // number of threads
	Init           bool           // whether to initialize
	Oid            ObjectID       // starting id
	Show           any            // fields to show
	Desc           bool           // whether to use descending (reverse) order
	Filter         map[string]any // query parameters
	Bar            bool           // whether to enable the progress bar
	BatchSize      int32          // number of documents per server batch
	ClearBatchSize int64          // batch size for each cleaning call
}

type Client

// Client is the handle returned by NewClient. Its fields are not exported;
// it is used through its methods (ClearOplog, Dbs, NewTable).
type Client struct {
	// contains filtered or unexported fields
}

mongodb 的操作

func NewClient

func NewClient(ctx context.Context, opt ClientOption) (*Client, error)

新建客户端

func (*Client) ClearOplog

func (obj *Client) ClearOplog(preCctx context.Context, Func func(context.Context, Oplog, ObjectID, Timestamp) (ObjectID, Timestamp, error), tag string, clearOptions ...ClearOplogOption) error

清洗oplog集合

func (*Client) Dbs

func (obj *Client) Dbs(ctx context.Context) ([]string, error)

func (*Client) NewTable

func (obj *Client) NewTable(dbName string, tableName string) *Table

创建新的集合

type ClientOption

// ClientOption is the configuration passed to NewClient.
//
// NOTE(review): the fields carry no documentation in the source; the
// per-field notes below are assumptions based on names and the README and
// should be confirmed against the implementation.
type ClientOption struct {
	Host    string            // presumably the server host name or address — confirm
	Port    int               // presumably the server port — confirm
	Usr     string            // presumably the user name used for authentication — confirm
	Pwd     string            // presumably the password used for authentication — confirm
	Direct  bool              // presumably requests a direct connection — TODO confirm
	HostMap map[string]string // host mapping; the README says it lets the client reach a replica set on an internal network (key/value direction not shown here — confirm)
}

type FindData

// FindData is the result type returned by Table.Find. Its fields are not
// exported; the content is read through its methods (Bytes, Data, Decode,
// Json, String).
type FindData struct {
	// contains filtered or unexported fields
}

func (*FindData) Bytes

func (obj *FindData) Bytes() ([]byte, error)

返回字节

func (*FindData) Data

func (obj *FindData) Data() map[string]any

func (*FindData) Decode

func (obj *FindData) Decode(val any) error

使用json.Unmarshal 解码

func (*FindData) Json

func (obj *FindData) Json() (gjson.Result, error)

返回gjson

func (*FindData) String

func (obj *FindData) String() (string, error)

返回json

type FindOption

// FindOption holds the optional settings accepted by Table.Find, Table.Finds
// and Table.Count.
type FindOption struct {
	BatchSize       int32          // maximum number of documents in each batch returned by the server
	Limit           int64          // maximum number of documents to return
	Timeout         time.Duration  // timeout
	NoCursorTimeout bool           // the cursor created by the operation does not time out after a period of inactivity
	Show            any            // document describing which fields are included in the documents returned by the operation
	Skip            int64          // number of documents to skip before adding documents to the result
	Sort            map[string]int // document specifying the order of the returned documents
	Await           bool           // for the oplog: whether to block waiting for data
}

type FindsData

// FindsData is the cursor-style result type returned by Table.Finds. Its
// fields are not exported; it is advanced with Next, read with Decode, Json
// or Map, and released with Close.
type FindsData struct {
	// contains filtered or unexported fields
}

func (*FindsData) Close

func (obj *FindsData) Close(ctx context.Context) error

关闭游标

func (*FindsData) Decode

func (obj *FindsData) Decode(val any) error

使用json.Unmarshal 解码

func (*FindsData) Json

func (obj *FindsData) Json() (gjson.Result, error)

返回gjson

func (*FindsData) Len

func (obj *FindsData) Len() int

返回游标的长度

func (*FindsData) Map

func (obj *FindsData) Map() map[string]any

func (*FindsData) Next

func (obj *FindsData) Next(ctx context.Context) bool

是否有下一个数据

func (*FindsData) ReTry

func (obj *FindsData) ReTry(ctx context.Context) error

重试

type ObjectID

// ObjectID is an alias for primitive.ObjectID.
type ObjectID = primitive.ObjectID

type Oplog

// Oplog is the cleaned form of an oplog entry; it is the value returned by
// the package-level ClearOplog function and passed to the callback of
// Client.ClearOplog.
type Oplog struct {
	Op        string    // operation
	Ns        string    // table name
	Timestamp Timestamp // time of the operation
	ObjectID  ObjectID
	Data      map[string]any // data
}

func ClearOplog

func ClearOplog(oplogData map[string]any) Oplog

清洗oplog data

type Table

// Table represents a collection. Values are obtained from Client.NewTable or
// Table.NewTable; the fields are not exported.
type Table struct {
	// contains filtered or unexported fields
}

集合

func (*Table) Add

func (obj *Table) Add(pre_ctx context.Context, document any) (primitive.ObjectID, error)

添加文档

func (*Table) Adds

func (obj *Table) Adds(pre_ctx context.Context, document ...any) ([]primitive.ObjectID, error)

添加一批文档

func (*Table) ClearTable

func (obj *Table) ClearTable(preCtx context.Context, Func func(context.Context, map[string]any, ObjectID) (ObjectID, error), tag string, clearOptions ...ClearOption) error

清洗集合数据

func (*Table) ClearTables

func (obj *Table) ClearTables(preCtx context.Context, Func func(context.Context, []map[string]any, ObjectID) (ObjectID, error), tag string, clearOptions ...ClearOption) error

批量清洗集合数据

func (*Table) Count

func (obj *Table) Count(pre_ctx context.Context, filter any, opts ...FindOption) (int64, error)

集合数量

func (*Table) DbName

func (obj *Table) DbName() string

func (*Table) Del

func (obj *Table) Del(pre_ctx context.Context, document any) (int64, error)

删除一个文档

func (*Table) Dels

func (obj *Table) Dels(pre_ctx context.Context, document any) (int64, error)

删除一些文档

func (*Table) Exist

func (obj *Table) Exist(pre_ctx context.Context, filter any) (bool, error)

判断数据是否存在

func (*Table) Find

func (obj *Table) Find(pre_ctx context.Context, filter any, opts ...FindOption) (*FindData, error)

查询单个文档(findOne)

func (*Table) Finds

func (obj *Table) Finds(pre_ctx context.Context, filter any, opts ...FindOption) (*FindsData, error)

查询多个文档(findMany)

func (*Table) NewTable

func (obj *Table) NewTable(tableName string) *Table

创建新的集合

func (*Table) TableName

func (obj *Table) TableName() string

集合名称

func (*Table) Tables

func (obj *Table) Tables(ctx context.Context) ([]string, error)

数据库中的集合名称列表

func (*Table) Update

func (obj *Table) Update(pre_ctx context.Context, filter any, update any, values ...map[string]any) (UpateResult, error)

更新一个文档

func (*Table) Updates

func (obj *Table) Updates(pre_ctx context.Context, filter any, update any, values ...map[string]any) (UpateResult, error)

更新一些文档

func (*Table) Upsert

func (obj *Table) Upsert(pre_ctx context.Context, filter any, update any, values ...map[string]any) (UpateResult, error)

upsert 一个文档

func (*Table) Upserts

func (obj *Table) Upserts(pre_ctx context.Context, filter any, update any, values ...map[string]any) (UpateResult, error)

upsert 一些文档

type Timestamp

type Timestamp = primitive.Timestamp // alias; fields are {T uint32, I uint32}

type UpateResult

// UpateResult is the result type returned by Table.Update, Table.Updates,
// Table.Upsert and Table.Upserts.
type UpateResult struct {
	MatchedCount  int64 // number of matched documents
	ModifiedCount int64 // number of documents changed, not counting inserts
	UpsertedCount int64 // number of upserted documents
	UpsertedID    primitive.ObjectID
	Exists        bool // whether it exists
}

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL