Documentation ¶
Overview ¶
Package oplog provides a generic oplog/replication system for micro-services.
Most of the time, the oplog service is used thru the oplogd agent which uses this package. But in the case your application is written in Go, you may want to integrate at the code level.
You can find more information on the oplog service here: https://github.com/dailymotion/oplog
Index ¶
- Variables
- type Event
- type Filter
- type GenericEvent
- type LastID
- type OpLog
- func (oplog *OpLog) Append(op *Operation)
- func (oplog *OpLog) Diff(createMap map[string]OperationData, updateMap map[string]OperationData, ...) error
- func (oplog *OpLog) HasID(id LastID) (bool, error)
- func (oplog *OpLog) Ingest(ops <-chan *Operation, done <-chan bool)
- func (oplog *OpLog) LastID() (LastID, error)
- func (oplog *OpLog) Tail(lastID LastID, filter Filter, out chan<- GenericEvent, stop <-chan bool)
- type Operation
- type OperationData
- type OperationLastID
- type ReplicationLastID
- type SSEDaemon
- func (daemon *SSEDaemon) GetOps(w http.ResponseWriter, r *http.Request)
- func (daemon *SSEDaemon) PostOps(w http.ResponseWriter, r *http.Request)
- func (daemon *SSEDaemon) Run() error
- func (daemon *SSEDaemon) ServeHTTP(w http.ResponseWriter, r *http.Request)
- func (daemon *SSEDaemon) Status(w http.ResponseWriter, r *http.Request)
- type Stats
- type UDPDaemon
Examples ¶
Constants ¶
This section is empty.
Variables ¶
var Version = "1.1.6"
Version contains the current version of OpLog
Functions ¶
This section is empty.
Types ¶
type GenericEvent ¶
GenericEvent is an interface used by the oplog to send different kinds of SSE compatible events
type LastID ¶
type LastID interface { // String returns the string representation of their value String() string // Time returns the embedded time Time() time.Time }
LastID defines an interface for different kinds of oplog id representations
type OpLog ¶
type OpLog struct { Stats *Stats // ObjectURL is a template URL to be used to generate reference URL to operation's objects. // The URL can use {{type}} and {{id}} template as follow: http://api.mydomain.com/{{type}}/{{id}}. // If not provided, no "ref" field will be included in oplog events. ObjectURL string // Number of object to fetch from the states collection on each iteration. // Too large pages may create lock contention on MongoDB, too small may slow // down the iteration. PageSize int // contains filtered or unexported fields }
OpLog allows to store and stream events to/from a Mongo database
func New ¶
New returns an OpLog connected to the given provided mongo URL. If the capped collection does not exists, it will be created with the max size defined by maxBytes parameter.
func (*OpLog) Append ¶
Append appends an operation into the OpLog
Example ¶
package main import ( "log" "time" "github.com/dailymotion/oplog" ) func main() { ol, err := oplog.New("mongodb://localhost/oplog", 1048576) if err != nil { log.Fatal(err) } op := oplog.NewOperation("insert", time.Now(), "123", "user", nil) ol.Append(op) }
Output:
func (*OpLog) Diff ¶
func (oplog *OpLog) Diff(createMap map[string]OperationData, updateMap map[string]OperationData, deleteMap map[string]OperationData) error
Diff finds which objects must be created or deleted in order to fix the delta
The createMap is a map pointing to all objects present in the source database. The function search of differences between the passed map and the oplog database and remove objects identical in both sides from the createMap and populate the deleteMap with objects that are present in the oplog database but not in the source database. If an object is present in both createMap and the oplog database but timestamp of the oplog object is earlier than createMap's, the object is added to the updateMap.
func (*OpLog) Ingest ¶
Ingest appends an operation into the OpLog thru a channel
Example ¶
package main import ( "log" "strconv" "time" "github.com/dailymotion/oplog" ) func main() { ol, err := oplog.New("mongodb://localhost/oplog", 1048576) if err != nil { log.Fatal(err) } ops := make(chan *oplog.Operation) done := make(chan bool, 1) go ol.Ingest(ops, nil) // Insert a large number of operations for i := 0; i < 1000; i++ { ops <- oplog.NewOperation("insert", time.Now(), strconv.FormatInt(int64(i), 10), "user", nil) } done <- true }
Output:
func (*OpLog) LastID ¶
LastID returns the most recently inserted operation id if any or nil if oplog is empty
func (*OpLog) Tail ¶
func (oplog *OpLog) Tail(lastID LastID, filter Filter, out chan<- GenericEvent, stop <-chan bool)
Tail tails all the new operations in the oplog and send the operation in the given channel. If the lastID parameter is given, all operation posted after this event will be returned.
If the lastID is a ReplicationLastID (unix timestamp in milliseconds), the tailing will start by replicating all the objects last updated after the timestamp.
Giving a lastID of 0 mean replicating all the stored objects before tailing the live updates.
The filter argument can be used to filter on some type of objects or objects with given parrents.
The create, update, delete events are streamed back to the sender thru the out channel
Example ¶
package main import ( "log" "os" "github.com/dailymotion/oplog" ) func main() { ol, err := oplog.New("mongodb://localhost/oplog", 1048576) if err != nil { log.Fatal(err) } ops := make(chan oplog.GenericEvent) stop := make(chan bool) // Tail all future events with no filters go ol.Tail(nil, oplog.Filter{}, ops, stop) // Read 100 events for i := 0; i < 100; i++ { op := <-ops op.WriteTo(os.Stdout) } // Stop the tail stop <- true }
Output:
type Operation ¶
type Operation struct { ID *bson.ObjectId `bson:"_id,omitempty"` Event string `bson:"event"` Data *OperationData `bson:"data"` }
Operation represents an operation stored in the OpLog, ready to be exposed as SSE.
func NewOperation ¶
func NewOperation(event string, time time.Time, objID, objType string, objParents []string) *Operation
NewOperation creates an new operation from given information.
The event argument can be one of "insert", "update" or "delete". The time defines the exact modification date of the object (must be the exact same time as stored in the database).
func (Operation) GetEventID ¶
GetEventID returns an SSE last event id for the operation
type OperationData ¶
type OperationData struct { Timestamp time.Time `bson:"ts" json:"timestamp"` Parents []string `bson:"p" json:"parents"` Type string `bson:"t" json:"type"` ID string `bson:"id" json:"id"` Ref string `bson:"-,omitempty" json:"ref,omitempty"` }
OperationData is the data part of the SSE event for the operation.
func (OperationData) Validate ¶
func (obd OperationData) Validate() error
Validate ensures an operation data has the right syntax
type OperationLastID ¶
OperationLastID represents an actual stored operation id
func (*OperationLastID) Fallback ¶
func (oid *OperationLastID) Fallback() LastID
Fallback tries to convert a "event" id into a "replication" id by extracting the timestamp part of the Mongo ObjectId. If the id is not a valid ObjectId, an error is returned.
func (OperationLastID) String ¶
func (oid OperationLastID) String() string
type ReplicationLastID ¶
type ReplicationLastID struct {
// contains filtered or unexported fields
}
ReplicationLastID represents a timestamp id allowing to hook into operation feed by time
func (ReplicationLastID) String ¶
func (rid ReplicationLastID) String() string
func (ReplicationLastID) Time ¶
func (rid ReplicationLastID) Time() time.Time
Time extract the time from the replication id
type SSEDaemon ¶
type SSEDaemon struct { // Password is the shared secret to connect to a password protected oplog. Password string // IngestPassword is the shared secret to connect to the HTTP ingest endpoint. IngestPassword string // FlushInterval defines the interval between flushes of the HTTP socket. FlushInterval time.Duration // HeartbeatTickerCount defines the number of FlushInterval with nothing to flush // is required before we send an heartbeat. HeartbeatTickerCount int8 // contains filtered or unexported fields }
SSEDaemon listens for events and send them to the oplog MongoDB capped collection
func NewSSEDaemon ¶
NewSSEDaemon creates a new HTTP server configured to serve oplog stream over HTTP using Server Sent Event protocol.
func (*SSEDaemon) GetOps ¶
func (daemon *SSEDaemon) GetOps(w http.ResponseWriter, r *http.Request)
GetOps exposes an SSE endpoint to stream operations
func (*SSEDaemon) PostOps ¶
func (daemon *SSEDaemon) PostOps(w http.ResponseWriter, r *http.Request)
PostOps exposes an endpoint to POST operations
type Stats ¶
type Stats struct { Status string // Total number of events recieved on the UDP interface EventsReceived *expvar.Int // Total number of events sent thru the SSE interface EventsSent *expvar.Int // Total number of events ingested into MongoDB with success EventsIngested *expvar.Int // Total number of events received on the UDP interface with an invalid format EventsError *expvar.Int // Total number of events discarded because the queue was full EventsDiscarded *expvar.Int // Current number of events in the ingestion queue QueueSize *expvar.Int // Maximum number of events allowed in the ingestion queue before discarding events QueueMaxSize *expvar.Int // Number of clients connected to the SSE API Clients *expvar.Int // Total number of SSE connections Connections *expvar.Int }
Stats stores all the statistics about the oplog
type UDPDaemon ¶
type UDPDaemon struct {
// contains filtered or unexported fields
}
UDPDaemon listens for events and send them to the oplog MongoDB capped collection
func NewUDPDaemon ¶
NewUDPDaemon create a deamon listening for operations over UDP
Source Files ¶
Directories ¶
Path | Synopsis |
---|---|
cmd
|
|
oplog-sync
The oplog-sync command performs a maintaince operation on the oplog database to keep it in sync with the source data.
|
The oplog-sync command performs a maintaince operation on the oplog database to keep it in sync with the source data. |
oplogd
The oplogd command is an agent listening on an UDP port for operations and exposing a HTTP SSE API.
|
The oplogd command is an agent listening on an UDP port for operations and exposing a HTTP SSE API. |