Documentation ¶
Overview ¶
Package oplog provides a generic oplog/replication system for REST APIs.
Most of the time, the oplog service is used through the oplogd agent, which is built on this package. If your application is written in Go, however, you may want to integrate at the code level.
Index ¶
- type GenericEvent
- type ObjectState
- type OpLog
- func (oplog *OpLog) Append(op *Operation, db *mgo.Database)
- func (oplog *OpLog) DB() *mgo.Database
- func (oplog *OpLog) Diff(createMap OperationDataMap, updateMap OperationDataMap, ...) error
- func (oplog *OpLog) HasId(id string) bool
- func (oplog *OpLog) Ingest(ops <-chan *Operation)
- func (oplog *OpLog) LastId() string
- func (oplog *OpLog) Tail(lastId string, filter OpLogFilter, out chan<- io.WriterTo, err chan<- error)
- type OpLogEvent
- type OpLogFilter
- type Operation
- type OperationData
- type OperationDataMap
- type SSEDaemon
- type Stats
- type UDPDaemon
- type UDPOperation
Types ¶
type GenericEvent ¶
type ObjectState ¶
type ObjectState struct {
	Id    string         `bson:"_id,omitempty" json:"id"`
	Event string         `bson:"event"`
	Data  *OperationData `bson:"data"`
}
ObjectState is the current state of an object given the most recent operation applied on it
func (ObjectState) GetEventId ¶
func (obj ObjectState) GetEventId() string
GetEventId returns an SSE event id as string for the object state
type OpLog ¶
type OpLog struct {
	Stats *Stats

	// ObjectURL is a template URL to be used to generate reference URL to operation's objects.
	// The URL can use {{type}} and {{id}} template as follow: http://api.mydomain.com/{{type}}/{{id}}.
	// If not provided, no "ref" field will be included in oplog events.
	ObjectURL string
	// contains filtered or unexported fields
}
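The ObjectURL comment describes a template with {{type}} and {{id}} placeholders. The following is a minimal sketch of how such a template could be expanded. The helper name expandObjectURL and the sample type and id values are hypothetical and are not part of this package; the package's own substitution logic may differ.

```go
package main

import (
	"fmt"
	"strings"
)

// expandObjectURL is a hypothetical helper (not part of the oplog package).
// It substitutes the {{type}} and {{id}} placeholders of an ObjectURL-style
// template. An empty template yields an empty string, mirroring the rule
// that no "ref" field is produced when ObjectURL is not provided.
func expandObjectURL(tmpl, objType, id string) string {
	if tmpl == "" {
		return ""
	}
	return strings.NewReplacer("{{type}}", objType, "{{id}}", id).Replace(tmpl)
}

func main() {
	// Sample values are illustrative only.
	fmt.Println(expandObjectURL("http://api.mydomain.com/{{type}}/{{id}}", "video", "x34cd"))
	// prints: http://api.mydomain.com/video/x34cd
}
```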
func New ¶
New returns an OpLog connected to the provided mongo URL. If the capped collection does not exist, it is created with the maximum size given by the maxBytes parameter.
func (*OpLog) Append ¶
Append appends an operation into the OpLog
If the db parameter is not nil, the passed db connection is used. In case of error, the db pointer may be replaced by a new alive session.
func (*OpLog) DB ¶
func (oplog *OpLog) DB() *mgo.Database
DB returns the Mongo database object used by the oplog
func (*OpLog) Diff ¶
func (oplog *OpLog) Diff(createMap OperationDataMap, updateMap OperationDataMap, deleteMap OperationDataMap) error
Diff finds which objects must be created or deleted in order to fix the delta
The createMap is a map pointing to all objects present in the source database. The function searches for differences between the passed map and the oplog database. It removes from the createMap the objects that are identical on both sides, and populates the deleteMap with objects that are present in the oplog database but not in the source database. If an object is present in both the createMap and the oplog database but the timestamp of the oplog object is earlier than the createMap's, the object is added to the updateMap.
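The rules above can be sketched with plain in-memory maps. This is an illustration only, not the package's implementation: the opData and opDataMap types are local stand-ins, the stored map plays the role of the oplog database, and the choice to remove every object found on both sides from the createMap (including the ones moved to the updateMap) is an assumption.

```go
package main

import (
	"fmt"
	"time"
)

// opData and opDataMap are local stand-ins for OperationData and
// OperationDataMap; only the timestamp matters for this sketch.
type opData struct {
	Timestamp time.Time
}

type opDataMap map[string]opData

// diff compares the source objects in createMap with the objects already
// stored in the oplog (the stored map, a stand-in for the database).
func diff(createMap, updateMap, deleteMap, stored opDataMap) {
	for id, old := range stored {
		src, ok := createMap[id]
		if !ok {
			// Present in the oplog but not in the source: must be deleted.
			deleteMap[id] = old
			continue
		}
		// Present on both sides: nothing to create (assumption, see lead-in).
		delete(createMap, id)
		if old.Timestamp.Before(src.Timestamp) {
			// The oplog copy is older than the source: must be updated.
			updateMap[id] = src
		}
	}
}

// example runs diff on a small hypothetical data set.
func example() (create, update, del opDataMap) {
	t0 := time.Unix(100, 0)
	t1 := time.Unix(200, 0)
	create = opDataMap{"a": {t0}, "b": {t1}, "c": {t0}}
	stored := opDataMap{"a": {t0}, "b": {t0}, "d": {t0}}
	update, del = opDataMap{}, opDataMap{}
	diff(create, update, del, stored)
	return create, update, del
}

func main() {
	create, update, del := example()
	fmt.Println(len(create), len(update), len(del))
}
```

In this data set, "c" stays in the create map, "b" lands in the update map because the stored copy is older, and "d" lands in the delete map.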
func (*OpLog) LastId ¶
LastId returns the most recently inserted operation id if any or "" if oplog is empty
func (*OpLog) Tail ¶
func (oplog *OpLog) Tail(lastId string, filter OpLogFilter, out chan<- io.WriterTo, err chan<- error)
Tail tails all the new operations in the oplog and sends them to the given channel. If the lastId parameter is given, all operations posted after this event are returned.
If the lastId is a unix timestamp in milliseconds, the tailing starts by replicating all the objects last updated after the timestamp.
Giving a lastId of 0 means replicating all the stored objects before tailing the live updates.
The filter argument can be used to filter on some types of objects or on objects with given parents.
The create, update and delete events are streamed back to the caller through the out channel, and errors are sent through the err channel.
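The different meanings of lastId described above can be summarized in a small classifier. This is a sketch under stated assumptions, not the package's code: the startMode type, its constants and classifyLastID are hypothetical names, an empty lastId is assumed to mean "live updates only", and any non-negative decimal number is assumed to be a millisecond timestamp while everything else is treated as an operation id.

```go
package main

import (
	"fmt"
	"strconv"
	"time"
)

// startMode is a hypothetical enumeration of the ways tailing can start.
type startMode int

const (
	liveOnly        startMode = iota // no lastId given (assumption)
	fromOperation                    // lastId is an operation id
	fromTimestamp                    // lastId is a unix timestamp in milliseconds
	fullReplication                  // lastId is 0
)

// classifyLastID decides how tailing would start for a given lastId.
// The returned time is only meaningful for fromTimestamp.
func classifyLastID(lastId string) (startMode, time.Time) {
	if lastId == "" {
		return liveOnly, time.Time{}
	}
	ms, err := strconv.ParseInt(lastId, 10, 64)
	if err != nil || ms < 0 {
		// Not a usable number: treat it as an operation id.
		return fromOperation, time.Time{}
	}
	if ms == 0 {
		return fullReplication, time.Time{}
	}
	// Convert milliseconds to seconds plus remaining nanoseconds.
	return fromTimestamp, time.Unix(ms/1000, (ms%1000)*int64(time.Millisecond))
}

func main() {
	// Sample ids are illustrative only.
	for _, id := range []string{"", "0", "1420167845000", "545b55c7f095528dd0f3863c"} {
		mode, ts := classifyLastID(id)
		fmt.Println(id, mode, ts.Unix())
	}
}
```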
type OpLogEvent ¶
OpLogEvent is used to send "technical" events with no data like "reset" or "live"
func (OpLogEvent) GetEventId ¶
func (e OpLogEvent) GetEventId() string
GetEventId returns an SSE event id
type OpLogFilter ¶
type Operation ¶
type Operation struct {
	Id    *bson.ObjectId `bson:"_id,omitempty"`
	Event string         `bson:"event"`
	Data  *OperationData `bson:"data"`
}
Operation represents an operation stored in the OpLog, ready to be exposed as SSE.
func (Operation) GetEventId ¶
GetEventId returns an SSE event id as string for the operation
type OperationData ¶
type OperationData struct {
	Timestamp time.Time `bson:"ts" json:"timestamp"`
	Parents   []string  `bson:"p" json:"parents"`
	Type      string    `bson:"t" json:"type"`
	Id        string    `bson:"id" json:"id"`
	Ref       string    `bson:"-,omitempty" json:"ref,omitempty"`
}
OperationData is the data part of the SSE event for the operation.
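To show what the JSON tags above produce, the sketch below marshals a local copy of the struct with encoding/json. The operationData type only mirrors the JSON tags of OperationData and is not the package's type; the field values, including the format of the parent reference, are made up for illustration.

```go
package main

import (
	"encoding/json"
	"fmt"
	"time"
)

// operationData mirrors the JSON tags of OperationData. It is a local copy
// for illustration; the bson tags are left out because encoding/json
// ignores them.
type operationData struct {
	Timestamp time.Time `json:"timestamp"`
	Parents   []string  `json:"parents"`
	Type      string    `json:"type"`
	Id        string    `json:"id"`
	Ref       string    `json:"ref,omitempty"`
}

// exampleJSON encodes a sample value (all field values are hypothetical).
func exampleJSON() string {
	d := operationData{
		Timestamp: time.Date(2015, 1, 2, 3, 4, 5, 0, time.UTC),
		Parents:   []string{"user/xl2d"},
		Type:      "video",
		Id:        "x34cd",
		// Ref is left empty, so "ref" is omitted from the output.
	}
	b, err := json.Marshal(d)
	if err != nil {
		return ""
	}
	return string(b)
}

func main() {
	fmt.Println(exampleJSON())
	// prints: {"timestamp":"2015-01-02T03:04:05Z","parents":["user/xl2d"],"type":"video","id":"x34cd"}
}
```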
func (OperationData) GetId ¶
func (obd OperationData) GetId() string
func (OperationData) Validate ¶
func (obd OperationData) Validate() error
type OperationDataMap ¶
type OperationDataMap map[string]OperationData
type SSEDaemon ¶
type SSEDaemon struct {
	// Password is the shared secret to connect to a password protected oplog.
	Password string
	// contains filtered or unexported fields
}
SSEDaemon listens for events and sends them to the oplog MongoDB capped collection
func NewSSEDaemon ¶
type Stats ¶
type Stats struct {
	Status string `json:"status"`
	// Total number of events received on the UDP interface
	EventsReceived *counter `json:"events_received"`
	// Total number of events ingested into MongoDB with success
	EventsIngested *counter `json:"events_ingested"`
	// Total number of events received on the UDP interface with an invalid format
	EventsError *counter `json:"events_error"`
	// Total number of events discarded because the queue was full
	EventsDiscarded *counter `json:"events_discarded"`
	// Current number of events in the ingestion queue
	QueueSize *gauge `json:"queue_size"`
	// Maximum number of events allowed in the ingestion queue before discarding events
	QueueMaxSize *gauge `json:"queue_max_size"`
	// Number of clients connected to the SSE API
	Clients *gauge `json:"clients"`
}
Stats stores all the statistics about the oplog
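The EventsDiscarded, QueueSize and QueueMaxSize fields describe a bounded ingestion queue that drops events once it is full. A minimal sketch of that pattern, using a buffered channel and a non-blocking send, could look as follows. The queueStats type and the enqueue function are hypothetical; the package's own counter and gauge types are unexported and are replaced here by plain integers, which are not safe for concurrent use.

```go
package main

import "fmt"

// queueStats is a simplified, hypothetical stand-in for Stats.
type queueStats struct {
	received  int
	discarded int
}

// enqueue tries to push an event on the queue without blocking.
// When the buffered channel is full, the event is dropped and counted
// as discarded. It reports whether the event was queued.
func enqueue(queue chan string, ev string, s *queueStats) bool {
	s.received++
	select {
	case queue <- ev:
		return true
	default:
		s.discarded++
		return false
	}
}

func main() {
	queue := make(chan string, 2) // capacity plays the role of QueueMaxSize
	s := &queueStats{}
	for _, ev := range []string{"e1", "e2", "e3"} {
		enqueue(queue, ev, s)
	}
	// len(queue) plays the role of QueueSize.
	fmt.Println(s.received, s.discarded, len(queue), cap(queue))
	// prints: 3 1 2 2
}
```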
type UDPDaemon ¶
type UDPDaemon struct {
// contains filtered or unexported fields
}
UDPDaemon listens for events and sends them to the oplog MongoDB capped collection
func NewUDPDaemon ¶
Directories ¶
Path | Synopsis |
---|---|
cmd/oplog-sync | The oplog-sync command performs a maintenance operation on the oplog database to keep it in sync with the source data. |
cmd/oplog-tail | The oplog-tail command is an example implementation of the Go oplog consumer library. |
cmd/oplogd | The oplogd command is an agent listening on a UDP port for operations and exposing an HTTP SSE API. |
consumer | Package consumer provides an easy to use client interface for the oplog service. |