persist

v0.0.0-...-e3ffa63
Warning

This package is not in the latest version of its module.

Published: Oct 9, 2015 License: MIT Imports: 10 Imported by: 0

README

Persistence Log


This Golang package implements a persistence log very similar to a database replay log or write-ahead log (WAL). Before committing a change to a resource (an arbitrary data structure), the app writes the new state of the resource to a log. If the app crashes, the log can be replayed to recreate all the resources. The trick is to rotate the log periodically so that it doesn't grow indefinitely, and that is what this persistence package implements.

One special feature of this persistence log is that it neither defines the set of operations that can be persisted to the log nor requires storage beyond the typical streaming encoder/decoder buffers. In particular, it does not copy the log or the in-memory data structure in order to start a new log or create a snapshot. Instead, it makes a callback into the app to enumerate the set of live objects.
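To make the write-ahead idea concrete, here is a minimal, self-contained Go sketch. It is not this package's implementation. The `entry` type and `replay` function are hypothetical, the log is a plain slice rather than a file, and each record carries either the full new state of a resource or a delete marker.

```go
package main

import "fmt"

// entry is a hypothetical log record: the full new state of one resource,
// or a delete marker for it.
type entry struct {
	ID      int64
	Value   string
	Deleted bool
}

// replay rebuilds the in-memory state by applying log records in order.
func replay(log []entry) map[int64]string {
	state := make(map[int64]string)
	for _, e := range log {
		if e.Deleted {
			delete(state, e.ID)
		} else {
			state[e.ID] = e.Value
		}
	}
	return state
}

func main() {
	var log []entry
	// Write-ahead: each change is appended to the log before it would be
	// applied in memory (the in-memory side is omitted here).
	log = append(log, entry{ID: 1, Value: "a"})
	log = append(log, entry{ID: 2, Value: "b"})
	log = append(log, entry{ID: 1, Value: "c"})
	log = append(log, entry{ID: 2, Deleted: true})
	// After a crash, replaying the log recreates the state.
	fmt.Println(replay(log)) // map[1:c]
}
```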

Goals

  • persist changes that are committed to in-memory data structures/databases to a log
  • replay all the changes in order to re-create the in-memory data structure
  • start a fresh stream periodically in order not to grow the log indefinitely
  • use the log in order to keep a replica server up-to-date
  • not be tied to disk and instead allow the log to go to a remote server/service

Some things this persistence layer does not do:

  • multi-resource updates (transactions) where multiple resources are written atomically
  • attempt to guarantee durability in the sense that once the log write completes the data is guaranteed to be on stable storage, such as disk

Model of operation

The persistence log works as follows. After opening the log, the application calls Update() for every change and passes it an opaque description of the change. The Update function serializes the change and appends it to the log.

When it is time to rotate the log, the persistence layer stops writing to the current log and starts a fresh one. The fresh log begins with a mixture of additional updates and an enumeration of all current resources. To generate this enumeration, the persistence layer makes a callback into the application, which must traverse all live resources and call Update() with a "create" descriptor for each.
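The rotation step can be sketched in memory as follows, assuming a size limit measured in records rather than bytes. `rotatingLog`, `record`, and the `enumerate` callback are hypothetical stand-ins for the package's log and the application's initialization callback. The real package writes to a LogDestination rather than keeping segments in a slice.

```go
package main

import "fmt"

// record is a hypothetical log entry carrying the full new value of a key.
type record struct{ Key, Val string }

// rotatingLog is a hypothetical in-memory stand-in for the persistence log.
// Once the current segment holds limit records, the next Update first starts
// a fresh segment and calls enumerate so the application re-writes every
// live resource into it; older segments are then discarded.
type rotatingLog struct {
	segments  [][]record
	limit     int
	enumerate func(l *rotatingLog)
	rotating  bool // true while enumerate runs, to avoid nested rotations
}

// Update appends r to the current segment, rotating first if it is full.
func (l *rotatingLog) Update(r record) {
	if !l.rotating && len(l.segments[len(l.segments)-1]) >= l.limit {
		l.rotate()
	}
	last := len(l.segments) - 1
	l.segments[last] = append(l.segments[last], r)
}

func (l *rotatingLog) rotate() {
	l.rotating = true
	l.segments = append(l.segments, nil) // start a fresh segment
	l.enumerate(l)                       // application writes all live state
	l.rotating = false
	// The fresh segment now holds a full snapshot, so older ones can go.
	l.segments = l.segments[len(l.segments)-1:]
}

// replay applies the records of one segment in order.
func replay(seg []record) map[string]string {
	m := map[string]string{}
	for _, r := range seg {
		m[r.Key] = r.Val
	}
	return m
}

func main() {
	live := map[string]string{}
	l := &rotatingLog{segments: [][]record{nil}, limit: 3}
	l.enumerate = func(rl *rotatingLog) {
		for k, v := range live {
			rl.Update(record{k, v})
		}
	}
	for i, v := range []string{"a", "b", "c", "d", "e"} {
		k := fmt.Sprint("k", i%2)
		l.Update(record{k, v}) // log the change first...
		live[k] = v            // ...then apply it in memory
	}
	fmt.Println(len(l.segments), replay(l.segments[0])) // 1 map[k0:e k1:d]
}
```

In this sketch, the limit must be checked before the record is appended. Suppose the record went into the old segment and rotation then discarded that segment. The change would be lost, because it is not yet applied in memory and so the enumeration does not capture it.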

If the application asks the persistence layer to replay a log, the persistence layer locates the last log for which the initialization callback completed and sends all updates in that log to the application. It then opens any newer, incomplete log and replays its updates as well.
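The choice of which logs to replay can be sketched like this. The `segment` type and its `complete` flag are hypothetical. They model a log whose initialization callback did or did not finish before a crash.

```go
package main

import "fmt"

// segment is a hypothetical stored log: its records plus a flag recording
// whether the initialization (enumeration) callback completed for it.
type segment struct {
	records  []string
	complete bool
}

// replayOrder returns the records to replay: everything from the last
// complete segment onward, including a newer, still-incomplete segment.
// If no segment is complete, it falls back to replaying all of them.
func replayOrder(segs []segment) []string {
	start := 0
	for i, s := range segs {
		if s.complete {
			start = i
		}
	}
	var out []string
	for _, s := range segs[start:] {
		out = append(out, s.records...)
	}
	return out
}

func main() {
	segs := []segment{
		{records: []string{"old"}, complete: true},
		{records: []string{"snap-A", "snap-B", "upd-A"}, complete: true},
		{records: []string{"snap-A", "upd-B"}, complete: false}, // crash mid-rotation
	}
	fmt.Println(replayOrder(segs)) // [snap-A snap-B upd-A snap-A upd-B]
}
```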

One critical question is how to handle concurrency. The persistence layer is designed to allow concurrency while keeping the model as simple as possible. The primary requirement on the application is as follows:

  • When an application calls Update() for a resource it must guarantee that no concurrent call to Update() can happen for the same resource. This guarantees strict ordering on updates to a resource, i.e., it prevents updates from getting out of order. The mutual exclusion guarantee made by the application must apply to the initialization callback during which the application enumerates all resources as well.

Typically an application acquires a lock on a resource before mutating it. This makes it easy to satisfy this requirement by calling Update() while holding the lock on the resource.
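Here is a sketch of this locking discipline, assuming one `sync.Mutex` per resource. The `plog`, `resource`, `set`, and `enumerate` names are hypothetical. `enumerate` plays the part of the initialization callback. It takes the same per-resource locks, as the requirement above demands.

```go
package main

import (
	"fmt"
	"sync"
)

// entry is a hypothetical log record.
type entry struct {
	Kind      string // "set" or "create"
	ID, Value int
}

// plog stands in for the persistence log; appends are serialized by mu.
type plog struct {
	mu      sync.Mutex
	entries []entry
}

func (l *plog) Update(e entry) {
	l.mu.Lock()
	defer l.mu.Unlock()
	l.entries = append(l.entries, e)
}

// resource is a hypothetical application object with its own lock.
type resource struct {
	mu    sync.Mutex
	id    int
	value int
}

// set logs and applies a change while holding the resource's lock, so no
// other Update for the same resource can be interleaved with it.
func set(l *plog, r *resource, v int) {
	r.mu.Lock()
	defer r.mu.Unlock()
	l.Update(entry{Kind: "set", ID: r.id, Value: v})
	r.value = v
}

// enumerate plays the role of the initialization callback and takes the same
// per-resource locks, so a create record cannot be reordered with an update.
func enumerate(l *plog, rs []*resource) {
	for _, r := range rs {
		r.mu.Lock()
		l.Update(entry{Kind: "create", ID: r.id, Value: r.value})
		r.mu.Unlock()
	}
}

func main() {
	l := &plog{}
	rs := []*resource{{id: 1}, {id: 2}}
	var wg sync.WaitGroup
	for v := 1; v <= 10; v++ {
		wg.Add(1)
		go func(v int) {
			defer wg.Done()
			set(l, rs[v%2], v)
		}(v)
	}
	enumerate(l, rs) // may run concurrently with the updates
	wg.Wait()
	fmt.Println(len(l.entries)) // 12: ten updates plus two creates
}
```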

An additional requirement arises if an application mutates resources and calls Update() concurrently with enumerating all resources in a log initialization callback. The persistence layer writes updates to the new log as they come in from the application. As a result, on replay the application is likely to receive an update to a resource before it has received the create descriptor written for that resource in the init callback. There are several ways to deal with this issue:

  • the application can acquire a global lock for the duration of the initialization callback, thereby preventing concurrent updates
  • during replay, the application can ignore updates to resources that have not yet been created knowing that eventually it will encounter a full create descriptor as written during the initialization callback
  • if the application creates log entries that involve multiple resources, for example a log entry that increments resource A and decrements resource B, then during a replay it must cope with the situation where one of the resources has been created and the other one hasn't. In this example, A may already have been created by a previously replayed log entry, so the app has to increment A. B, however, may not have been created yet, so the decrement of B has to be skipped without producing an error (the correct value of B will be created later during the replay).
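The second and third strategies can be sketched as a replay function that silently skips deltas for resources it has not seen yet. The `op` type and `replay` function are hypothetical. The log contents reproduce the increment-A/decrement-B example above.

```go
package main

import "fmt"

// op is a hypothetical log entry. A create (Create != "") carries the full
// value written by the initialization callback; otherwise Deltas maps
// resource names to increments applied by one multi-resource update.
type op struct {
	Create string
	Value  int
	Deltas map[string]int
}

// replay applies ops in order. Deltas for resources that do not exist yet are
// skipped without error: the create written later by the enumeration already
// reflects them.
func replay(log []op) map[string]int {
	state := map[string]int{}
	for _, o := range log {
		if o.Create != "" {
			state[o.Create] = o.Value
			continue
		}
		for name, d := range o.Deltas {
			if _, ok := state[name]; ok {
				state[name] += d
			}
		}
	}
	return state
}

func main() {
	// A fresh log: the enumeration wrote A=10, then a concurrent update
	// "increment A, decrement B" arrived, then the enumeration wrote B=4
	// (B was 5 before the decrement).
	log := []op{
		{Create: "A", Value: 10},
		{Deltas: map[string]int{"A": +1, "B": -1}},
		{Create: "B", Value: 4},
	}
	fmt.Println(replay(log)) // map[A:11 B:4]
}
```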

Note that the persistence layer does not prescribe what the application passes it in an Update() call. One option is to pass a full copy of the resource being updated or a delete marker. In this case replaying the log consists of re-creating or updating each resource as it is fed back to the application and deleting existing resources when encountering a delete marker.

Concepts

Resources

A resource is an object that can be serialized to the log and that is the unit of atomic update in the application.

Logs

A log is the object to which changes to resources are persisted, and it has one or more destinations to which data is written. Each log has a primary destination, which is the destination from which a restore can be initiated. A log has the following operations:

  • create: creates a log object and names a primary destination
  • restore: reads the last log at the destination and replays all events, making callbacks into the application in order to recreate the state
  • update: records a change to a resource, i.e., writes the serialized version to the log
  • addDestination: adds a secondary destination; this causes a log rotation

Sample code

// The Log interface documented below names its write method Output; the
// prose above refers to the same operation as Update().
import (
	"fmt"
	"sync"
	// ...plus this persist package
)

// A sample resource type
type resourceType struct {
	Id    int64  // unique resource ID
	Field string // sample data field
}

// The set of resources in the application
var resources = map[int64]resourceType{} // all resources indexed by ID
var resourcesMutex sync.Mutex            // exclusive access to the resources map
var pLog persist.Log                     // persistence log for the resources

const (
	ResourceUpsert = iota // insert or update
	ResourceDelete
)

type ResourceLogEntry struct {
	Op  int // ResourceUpsert or ResourceDelete
	Res resourceType
}

// Log entries travel as interface{} values, so their type must be registered.
func init() {
	persist.Register(ResourceLogEntry{})
}

// Create function used within the application to create a new resource. It assumes
// that a unique Id has already been generated.
func createResource(data resourceType) error {
	// lock all resources
	resourcesMutex.Lock()
	defer resourcesMutex.Unlock()
	// check non-existence
	if _, ok := resources[data.Id]; ok {
		return fmt.Errorf("duplicate resource ID %d", data.Id)
	}
	// write to log first, so nothing needs to be unrolled if it fails
	if err := pLog.Output(ResourceLogEntry{Op: ResourceUpsert, Res: data}); err != nil {
		return err
	}
	// insert into map
	resources[data.Id] = data
	return nil
}

// Delete function used within the application to delete an existing resource.
func deleteResource(id int64) error {
	// lock all resources
	resourcesMutex.Lock()
	defer resourcesMutex.Unlock()
	// check existence
	if _, ok := resources[id]; !ok {
		return fmt.Errorf("deleting non-existing resource %d", id)
	}
	// write to log
	if err := pLog.Output(ResourceLogEntry{Op: ResourceDelete, Res: resourceType{Id: id}}); err != nil {
		return err
	}
	// delete from map
	delete(resources, id)
	return nil
}

// Update function used within the application to update an existing resource
func updateResource(data resourceType) error {
	// lock all resources
	resourcesMutex.Lock()
	defer resourcesMutex.Unlock()
	// check existence
	if _, ok := resources[data.Id]; !ok {
		return fmt.Errorf("updating non-existing resource %d", data.Id)
	}
	// write to log
	if err := pLog.Output(ResourceLogEntry{Op: ResourceUpsert, Res: data}); err != nil {
		return err
	}
	// update map
	resources[data.Id] = data
	return nil
}

// Callback from persistence log (LogClient.PersistAll) to enumerate all resources
// in order to start a fresh log
func enumerateResources(pl persist.Log) {
	// lock all resources
	resourcesMutex.Lock()
	defer resourcesMutex.Unlock()
	// iterate through the entire map, writing the entry type replayResource expects
	for _, v := range resources {
		if err := pl.Output(ResourceLogEntry{Op: ResourceUpsert, Res: v}); err != nil {
			panic(err) // the LogClient docs recommend panicking here
		}
	}
}

// Callback from persistence log (LogClient.Replay) to replay a resource operation
func replayResource(logEntry interface{}) error {
	// lock all resources
	resourcesMutex.Lock()
	defer resourcesMutex.Unlock()
	// type cast log operation
	op, ok := logEntry.(ResourceLogEntry)
	if !ok {
		return fmt.Errorf("invalid replay record type %T", logEntry)
	}
	// perform operation
	switch op.Op {
	case ResourceUpsert:
		resources[op.Res.Id] = op.Res
	case ResourceDelete:
		delete(resources, op.Res.Id)
	default:
		return fmt.Errorf("unknown replay operation %d", op.Op)
	}
	return nil
}

// resourceClient adapts the two callbacks to the LogClient interface expected by NewLog.
type resourceClient struct{}

func (resourceClient) Replay(logEvent interface{}) error { return replayResource(logEvent) }
func (resourceClient) PersistAll(pl persist.Log) { enumerateResources(pl) }

Documentation

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

func Register

func Register(value interface{})

Register registers a type that is written to the log. It must be called for each type passed to Write and for any concrete type expected inside an interface-typed field of an event. Register calls gob.Register() internally; see the encoding/gob documentation for details.
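You can see why registration is needed by using encoding/gob directly. A value sent as an interface{} carries its concrete type name, and gob has to know that name on both the encoding and decoding side. The sketch below uses only the standard library. The `Event` type and `roundTrip` helper are hypothetical. Without the `gob.Register` call in `init`, `Encode` would return an error because `Event` is not registered for transmission as an interface value.

```go
package main

import (
	"bytes"
	"encoding/gob"
	"fmt"
)

// Event is a hypothetical log entry type.
type Event struct {
	ID   int64
	Name string
}

func init() {
	// Needed because events travel as interface{} values; persist.Register
	// is documented to call gob.Register for the same reason.
	gob.Register(Event{})
}

// roundTrip encodes v as an interface value and decodes it back, the way a
// log that accepts interface{} events has to.
func roundTrip(v interface{}) (interface{}, error) {
	var buf bytes.Buffer
	// Passing &v (a pointer to the interface) makes gob send the concrete
	// type name along with the value.
	if err := gob.NewEncoder(&buf).Encode(&v); err != nil {
		return nil, err
	}
	var out interface{}
	if err := gob.NewDecoder(&buf).Decode(&out); err != nil {
		return nil, err
	}
	return out, nil
}

func main() {
	out, err := roundTrip(Event{ID: 1, Name: "x"})
	fmt.Println(out, err) // {1 x} <nil>
}
```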

Types

type Log

type Log interface {
	// Output an event to the log, this uses gob serialization internally. If an error
	// occurs there is a serious problem with the log, for example, disk full or socket
	// disconnected from the log destination. If the application can unroll the mutations
	// it has performed, it should do so and return an error to its client. If, however, the
	// application cannot unroll it is recommended not to check for error here and continue
	// to operate optimistically. Once the log problem gets repaired the persist layer will
	// do a log rotation to ensure all live data is captured.
	Output(logEvent interface{}) error

	// SetSizeLimit determines when the persist layer should rotate logs. The default is
	// 10MB
	SetSizeLimit(bytes int)

	// SetSecondaryDestination adds a secondary destination to the Log (not yet implemented)
	SetSecondaryDestination(dest LogDestination) error

	// HealthCheck returns any persistent error encountered in persist that prevents it
	// from logging. If HealthCheck() returns an error then all Write() calls will return
	// the same error. If the problem is fixed the error will eventually go away again and
	// the log will be "repaired" by doing a rotation. The intent of the HealthCheck call
	// is for the application to be able to reject requests early if the logging is broken.
	HealthCheck() error

	// Stats returns a list of implementation dependent statistics as name->value
	Stats() map[string]float64
}
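The unroll-on-error pattern described for Output might look like the following. This assumes an application that mutates its state before logging. `outputter` is a local one-method subset of the Log interface. `brokenLog` and `store` are hypothetical types used to simulate a failed destination.

```go
package main

import (
	"errors"
	"fmt"
)

// outputter is the subset of the documented Log interface used here.
type outputter interface {
	Output(logEvent interface{}) error
}

// brokenLog is a hypothetical Log whose destination has failed.
type brokenLog struct{}

func (brokenLog) Output(interface{}) error { return errors.New("disk full") }

// store is a hypothetical application holding a map guarded by the log.
type store struct {
	log  outputter
	data map[string]string
}

// Put applies the mutation, logs it, and unrolls the mutation if the log
// write fails, returning the error to the caller.
func (s *store) Put(k, v string) error {
	old, existed := s.data[k]
	s.data[k] = v
	if err := s.log.Output([2]string{k, v}); err != nil {
		if existed {
			s.data[k] = old
		} else {
			delete(s.data, k)
		}
		return fmt.Errorf("put %q: %w", k, err)
	}
	return nil
}

func main() {
	s := &store{log: brokenLog{}, data: map[string]string{"a": "1"}}
	err := s.Put("a", "2")
	fmt.Println(err, s.data) // put "a": disk full map[a:1]
}
```

Writing to the log before mutating, as the sample code above does, avoids the need to unroll at all.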

func NewLog

func NewLog(priDest LogDestination, client LogClient, logger log15.Logger) (Log, error)

NewLog reopens an existing log, replays all log entries, and then prepares to append to it. The call to NewLog completes once any necessary replay has completed.

type LogClient

type LogClient interface {
	// Replay is called by the persistence layer during log replay in order to replay
	// an individual log event. If Replay returns an error the replay is aborted and
	// produces the error.
	// Beware that it is possible that log events are replayed that contain mutations to
	// resources that have not been created yet, i.e. for which the event produced by
	// Enumerate has not been replayed yet. In those cases, Replay must ignore the event
	// because a subsequent create will be Replayed with the correct value for the
	// resource. Note that if events contain updates to multiple resources then some
	// of them may have been created and need updating while others may not have been created
	// and shouldn't be created/updated.
	Replay(logEvent interface{}) error

	// PersistAll is called by the persistence layer in order to enumerate all live resources
	// and persist them by making calls to Log.Write().
	// (If PersistAll encounters an error it's time to panic.)
	// PersistAll can run in parallel with new updates to resources; however, the application
	// must ensure that calls to Log.Write() are in the same order as PersistAll's reads
	// and other update's writes.
	PersistAll(pl Log)
}

LogClient is the interface the application needs to implement so that the persist layer can call back into it.

type LogDestination

type LogDestination interface {
	// StartRotate() tells the dest to open a fresh log dest and start writing all output to
	// this new destination
	StartRotate() error
	// EndRotate() tells the dest that the fresh log dest has a complete snapshot and
	// thus is now "stand-alone" and older logs are no longer needed; this is called after
	// StartRotate() *and* after the initial registration of the log destination
	EndRotate() error
	// Writer writes to current (new) log
	io.Writer
	// ReplayReaders returns a slice of readers, one for each log that needs to be
	// replayed, in sequence
	ReplayReaders() []io.ReadCloser
	// Close ends the entire log writing and offers a way to cleanly flush and close
	Close()
}

A log destination represents something the persist layer can write log entries to, and then replay them in the future. A "New" function is expected to exist for each type of log destination in order to open/create it. At open time, the writer must work, and if there is an old log to replay the reader must work too.
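For tests, or to see what the persist layer expects from a destination, an in-memory implementation might look like this. The interface is copied from the documentation above. `memDest` and `newMemDest` are hypothetical. The sketch assumes that the current log is always the last buffer and that EndRotate may discard every older one. It uses io.NopCloser and io.ReadAll (Go 1.16 or later).

```go
package main

import (
	"bytes"
	"fmt"
	"io"
)

// LogDestination mirrors the interface documented above.
type LogDestination interface {
	StartRotate() error
	EndRotate() error
	io.Writer
	ReplayReaders() []io.ReadCloser
	Close()
}

// memDest is a hypothetical in-memory destination; logs[len-1] is current.
type memDest struct {
	logs []*bytes.Buffer
}

func newMemDest() *memDest { return &memDest{logs: []*bytes.Buffer{new(bytes.Buffer)}} }

// StartRotate opens a fresh log; all further writes go there.
func (d *memDest) StartRotate() error {
	d.logs = append(d.logs, new(bytes.Buffer))
	return nil
}

// EndRotate drops older logs: the current one now holds a full snapshot.
func (d *memDest) EndRotate() error {
	d.logs = d.logs[len(d.logs)-1:]
	return nil
}

func (d *memDest) Write(p []byte) (int, error) { return d.logs[len(d.logs)-1].Write(p) }

// ReplayReaders returns one reader per retained log, oldest first.
func (d *memDest) ReplayReaders() []io.ReadCloser {
	rs := make([]io.ReadCloser, len(d.logs))
	for i, b := range d.logs {
		rs[i] = io.NopCloser(bytes.NewReader(b.Bytes()))
	}
	return rs
}

func (d *memDest) Close() {}

var _ LogDestination = (*memDest)(nil) // compile-time interface check

func main() {
	d := newMemDest()
	fmt.Fprint(d, "old;")
	d.StartRotate()
	fmt.Fprint(d, "snapshot;")
	fmt.Println(len(d.ReplayReaders())) // 2: rotation not finished yet
	d.EndRotate()
	all, _ := io.ReadAll(d.ReplayReaders()[0])
	fmt.Println(string(all)) // snapshot;
}
```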

func NewFileDest

func NewFileDest(basepath string, create bool, log log15.Logger) (LogDestination, error)

NewFileDest creates or opens a file for logging. The basepath must not contain any character in the set '*', '?', '[', '\', or '.'. The individual log file names will have a -<timestamp> and possibly a <-new>, <-curr>, and '.plog' extension appended. The create argument determines whether it's OK to create a new set of log files or whether an existing set is expected to be found.
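An application that builds basepath from configuration could check the documented character restriction before calling NewFileDest. `validBasepath` is a hypothetical helper, not part of this package. Because '.' is forbidden, the rule also excludes relative paths such as `../plog`.

```go
package main

import (
	"fmt"
	"strings"
)

// validBasepath is a hypothetical pre-check mirroring the documented rule
// that basepath must not contain '*', '?', '[', '\' or '.'.
func validBasepath(p string) bool {
	return !strings.ContainsAny(p, `*?[\.`)
}

func main() {
	fmt.Println(validBasepath("/var/lib/myapp/plog"))     // true
	fmt.Println(validBasepath("/var/lib/myapp/data.log")) // false
	fmt.Println(validBasepath("../plog"))                 // false
}
```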

func NewNoopDest

func NewNoopDest(log log15.Logger) (LogDestination, error)
