types

package
v0.0.0-...-cb472e6 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Aug 27, 2021 License: MIT Imports: 6 Imported by: 0

Documentation

Index

Constants

View Source
// DefaultEventChanSize is 100. Judging by its name it is the default capacity
// of an event channel — NOTE(review): confirm against the code that uses it.
const DefaultEventChanSize = 100

Variables

This section is empty.

Functions

func GetEventDetail

func GetEventDetail(detailStr string) string

GetEventDetail gets the event document detail; it returns the detail field of EventDetail.

Types

type ChangeDescription

// ChangeDescription lists the fields that were updated and the fields that
// were removed by a change.
type ChangeDescription struct {
	// UpdatedFields maps each updated field to its current value, not its
	// previous value.
	UpdatedFields map[string]interface{}
	// RemovedFields lists the removed fields.
	RemovedFields []string
}

type Event

// Event describes a single event: the affected document, the operation type,
// the cluster time, the resume token and the changed-field details.
type Event struct {
	// Oid represents the unique document key field "_id".
	Oid           string
	// Document and DocBytes carry the event's document.
	// NOTE(review): presumably Document is the decoded value and DocBytes its
	// raw encoded form — confirm against the producer of Event.
	Document      interface{}
	DocBytes      []byte
	// OperationType is the kind of operation this event represents.
	OperationType OperType

	// ClusterTime is the timestamp from the oplog entry associated with the event.
	ClusterTime TimeStamp

	// Token is the event token used to resume after this event.
	Token EventToken

	// ChangeDesc holds the changed-field details of this event; it describes
	// which fields were updated or removed.
	ChangeDesc *ChangeDescription
}

func (*Event) String

func (e *Event) String() string

type EventDetail

// EventDetail holds an event document's detail together with its changed fields.
//
// JSON keys (per the struct tags): Detail is "detail", UpdatedFields is
// "update_fields" and RemovedFields is "deleted_fields". Note that the last
// two keys do not mirror the Go field names.
type EventDetail struct {
	Detail        JsonString             `json:"detail"`
	UpdatedFields map[string]interface{} `json:"update_fields"`
	RemovedFields []string               `json:"deleted_fields"`
}

EventDetail holds an event document's detail and its changed fields.

type EventStream

// EventStream is the BSON representation of a MongoDB change event.
// Reference: https://docs.mongodb.com/manual/reference/change-events/
//
// BSON keys (per the struct tags):
//   - Token:         "_id"
//   - OperationType: "operationType"
//   - ClusterTime:   "clusterTime"
//   - Namespace:     "ns"
//   - DocumentKey:   "documentKey"
//   - UpdateDesc:    "updateDescription"
type EventStream struct {
	Token         EventToken          `bson:"_id"`
	OperationType OperType            `bson:"operationType"`
	ClusterTime   primitive.Timestamp `bson:"clusterTime"`
	Namespace     Namespace           `bson:"ns"`
	DocumentKey   Key                 `bson:"documentKey"`
	UpdateDesc    UpdateDescription   `bson:"updateDescription"`
}

reference: https://docs.mongodb.com/manual/reference/change-events/

type EventToken

// EventToken is the MongoDB change stream token, which represents an event's
// identity.
type EventToken struct {
	// Data is the hex value of the document's _id, stored under the BSON key
	// "_data".
	// NOTE(review): EventStream maps its Token from the change event's "_id",
	// so "document" here most likely means the change event document rather
	// than the watched collection's document — confirm.
	Data string `bson:"_data"`
}

EventToken is the MongoDB change stream token, which represents an event's identity.

type JsonString

// JsonString is a string type with its own JSON encoding and decoding,
// provided by its MarshalJSON and UnmarshalJSON methods.
type JsonString string

func (JsonString) MarshalJSON

func (j JsonString) MarshalJSON() ([]byte, error)

func (*JsonString) UnmarshalJSON

func (j *JsonString) UnmarshalJSON(b []byte) error

type Key

// Key is a document key; EventStream decodes the change event's
// "documentKey" into it.
type Key struct {
	// ID is the unique document id, i.e. the "_id" field.
	ID primitive.ObjectID `bson:"_id"`
}

type ListOptions

// ListOptions holds the options for listing the documents of a collection.
type ListOptions struct {
	// Filter selects which kind of data you want to receive. For example,
	// the filter {"bk_obj_id":"biz"} means you only receive the data that
	// has this key/value pair.
	// Note: each filter key must be an existing document key field in the
	// collection's documents.
	Filter map[string]interface{}

	// Fields lists the documents with only these fields.
	Fields []string

	// EventStruct is the pointer to the data struct that the event is decoded into.
	// Note: must be a pointer value.
	EventStruct interface{}

	// Collection defines which collection you want to watch.
	Collection string

	// PageSize defines the list step used when the client tries to list all
	// the data defined in the namespace. The default value is
	// `DefaultListStep`; the value range is [200,2000].
	PageSize *int
}

func (*ListOptions) CheckSetDefault

func (opts *ListOptions) CheckSetDefault() error

type ListWatchOptions

// ListWatchOptions embeds Options and adds the page size used while listing.
type ListWatchOptions struct {
	Options

	// PageSize defines the list step used when the client tries to list all
	// the data defined in the namespace. The default value is
	// `DefaultListStep`; the value range is [200,2000].
	PageSize *int
}

func (*ListWatchOptions) CheckSetDefault

func (lw *ListWatchOptions) CheckSetDefault() error

type Namespace

// Namespace names a database and a collection. EventStream decodes the change
// event's "ns" value into it, reading the BSON keys "db" and "coll".
type Namespace struct {
	Database   string `bson:"db"`
	Collection string `bson:"coll"`
}

type OperType

// OperType is the operation type of an event.
type OperType string
const (
	// reference doc:
	// https://docs.mongodb.com/manual/reference/change-events/#change-events
	// Document operation types.
	Insert  OperType = "insert"
	Delete  OperType = "delete"
	Replace OperType = "replace"
	Update  OperType = "update"

	// Collection operation types.
	Drop   OperType = "drop"
	Rename OperType = "rename"

	// DropDatabase: a dropDatabase event occurs when a database is dropped.
	DropDatabase OperType = "dropDatabase"

	// Invalidate: for change streams opened against a collection, a drop event,
	// rename event, or dropDatabase event that affects the watched collection
	// leads to an invalidate event.
	Invalidate OperType = "invalidate"

	// Lister is a self-defined type which indicates that the operation comes from
	// a list watcher's find operations; it does not really come from MongoDB's change events.
	Lister OperType = "lister"
	// ListDone is a self-defined type which means that the list operation has already
	// finished and the watch events start. This OperType is sent only once.
	// Note: it's only used in the ListWatch operation.
	ListDone OperType = "listerDone"
)

type Options

// Options holds the options for watching a collection's change events.
type Options struct {
	// reference doc:
	// https://docs.mongodb.com/manual/reference/method/db.collection.watch/#change-stream-with-full-document-update-lookup
	// default value is true
	// NOTE(review): the linked section is about full-document update lookup;
	// confirm that it is the intended reference for MajorityCommitted.
	MajorityCommitted *bool

	// MaxAwaitTime is the maximum amount of time in milliseconds the server
	// waits for new data changes to report to the change stream cursor before
	// returning an empty batch.
	// default value is 1000ms
	// NOTE(review): the field is a time.Duration, whose unit is nanoseconds;
	// confirm whether callers pass a Duration (e.g. time.Second) or a raw
	// millisecond count.
	MaxAwaitTime *time.Duration

	// OperationType describes which kind of operation you want to watch,
	// such as an "insert" operation or a "replace" operation.
	// If you don't set it, all kinds of operations are watched.
	OperationType *OperType

	// Filter selects which kind of data's change events you want to receive.
	// For example, the filter {"bk_obj_id":"biz"} means you only receive the
	// data that has this key/value pair.
	// Note: each filter key must be an existing document key field in the
	// collection's documents.
	Filter map[string]interface{}

	// EventStruct is the pointer to the data struct that the event is decoded into.
	// Note: must be a pointer value.
	EventStruct interface{}

	// Collection defines which collection you want to watch.
	Collection string

	// StartAfterToken describes where you want to start watching events.
	// Note: the returned events do not contain the event this token represents;
	// only the events just after this token are returned.
	StartAfterToken *EventToken

	// StartAtTime ensures that this watch will provide events that occurred
	// after this timestamp.
	StartAtTime *TimeStamp
}

func (*Options) CheckSetDefault

func (opts *Options) CheckSetDefault() error

CheckSetDefault checks the legality of each option and sets the default values.

type TimeStamp

// TimeStamp is a timestamp split into two 32-bit parts, serialized to JSON
// under the keys "sec" and "nano".
type TimeStamp struct {
	// Sec: the most significant 32 bits are a time_t value (seconds since the Unix epoch).
	Sec uint32 `json:"sec"`
	// Nano: the least significant 32 bits are an incrementing ordinal for
	// operations within a given second.
	// NOTE(review): despite the field name, this is documented as an ordinal,
	// not a nanosecond count — confirm with the code that fills it.
	Nano uint32 `json:"nano"`
}

type UpdateDescription

// UpdateDescription holds the updated and removed fields of a change stream
// event. Both fields use the same key for JSON and BSON.
type UpdateDescription struct {
	// UpdatedFields holds the document's fields that were updated in a change stream.
	UpdatedFields map[string]interface{} `json:"updatedFields" bson:"updatedFields"`
	// RemovedFields holds the document's fields that were removed in a change stream.
	RemovedFields []string `json:"removedFields" bson:"removedFields"`
}

type WatchOptions

// WatchOptions embeds Options and adds no fields of its own.
type WatchOptions struct {
	Options
}

type Watcher

// Watcher exposes a stream of events to its consumer.
type Watcher struct {
	// EventChan is a receive-only channel of *Event values.
	EventChan <-chan *Event
}

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL