boltdb

package module
v0.0.0-...-d2b934b Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: May 17, 2018 License: MIT Imports: 9 Imported by: 0

README

Proto Actor [Go] - BoltDB persistence provider

Go Report Card

Go package with persistence provider for Proto Actor (Go) based on BoltDB.

Get started

Install package

go get github.com/artyomturkin/protoactor-go-persistence-boltdb

Create event and snapshot proto messages. Messages can be named differently.

syntax = "proto3";
package main;

message Event {
    string state = 1;
}

message Snapshot {
    string state = 1;
}

Generate go types from proto file

protoc --gogoslick_out=. test.proto

Create DataStore implementation

// exampleDataStore wraps a persistence.ProviderState so that it can be handed
// to persistence.Using.
type exampleDataStore struct {
	// providerState is the value returned by GetState.
	providerState persistence.ProviderState
}

// GetState returns the provider state held by the data store.
func (p *exampleDataStore) GetState() persistence.ProviderState {
	return p.providerState
}

// newExampleDataStore builds a data store whose provider state is a BoltDB
// provider created from snapshotInterval and db.
func newExampleDataStore(snapshotInterval int, db *bolt.DB) *exampleDataStore {
	store := new(exampleDataStore)
	store.providerState = boltdb.NewBoltProvider(snapshotInterval, db)
	return store
}

Create Actor with persistence implementation

// exampleActor is the persistent actor of the example: it embeds
// persistence.Mixin and keeps its state as a single string.
type exampleActor struct {
	persistence.Mixin

	// state is restored from snapshots and overwritten by every event.
	state string
}

// Print is a command asking the actor to print its state. The actor calls
// wg.Done after printing, which lets the sender wait for the output.
type Print struct {
	// wg is signalled by the actor once the state has been printed.
	wg *sync.WaitGroup
}

//ensure exampleActor is actor.Actor
var _ actor.Actor = (*exampleActor)(nil)

// Receive handles the Print command, incoming events, snapshot restoration
// and snapshot requests.
func (a *exampleActor) Receive(ctx actor.Context) {
	switch m := ctx.Message().(type) {
	case *Print:
		// Report the current state, then release the waiting sender.
		fmt.Printf("State is %s\n", a.state)
		m.wg.Done()
	case *Event:
		// Only events received outside of recovery are persisted.
		recovering := a.Recovering()
		if !recovering {
			a.PersistReceive(m)
		}
		// The event carries the new state.
		a.state = m.State
	case *Snapshot:
		// Restore the state from the snapshot.
		a.state = m.State
	case *persistence.RequestSnapshot:
		// Persist a snapshot of the current state when asked to.
		snap := &Snapshot{State: a.state}
		a.PersistSnapshot(snap)
	}
}

Create actor.Producer implementation

// exampleActorProducer returns a new, empty exampleActor; it is the producer
// passed to actor.FromProducer.
func exampleActorProducer() actor.Actor {
	return &exampleActor{}
}

Open the bolt database and pass it to actor.Props through the persistence middleware

func main() {
	// Open the bolt database file at tempPath.
	db, err := bolt.Open(tempPath, 0666, nil)
	if err != nil {
		panic(err)
	}

	// Build props whose persistence middleware uses a data store backed by db;
	// 4 is the snapshot interval handed to the provider.
	props := actor.FromProducer(exampleActorProducer).
		WithMiddleware(
			persistence.Using(newExampleDataStore(4, db)),
		)
	............
}

Spawn actor and send it some events to set state, ask actor to print its state

func main() {
	............
	// Spawn the actor under a fixed name.
	pid, err := actor.SpawnNamed(props, "example.actor")
	if err != nil {
		panic(err)
	}

	// Send ten events (event-0 .. event-9) to persist.
	for index := 0; index < 10; index++ {
		pid.Tell(&Event{State: fmt.Sprintf("event-%d", index)})
	}

	// Tell the actor to print its state and wait until it has done so.
	wg.Add(1)
	pid.Tell(&Print{wg: wg})
	wg.Wait()
	............
}

Stop actor and db, then restart everything

func main() {
	............
	// Stop the actor and close the db.
	pid.GracefulPoison()
	db.Close()

	// Reopen the db and respawn the actor.
	db, err = bolt.Open(tempPath, 0666, nil)
	if err != nil {
		panic(err)
	}

	// The props are rebuilt because the data store must use the new db handle.
	props = actor.FromProducer(exampleActorProducer).
		WithMiddleware(
			persistence.Using(newExampleDataStore(4, db)),
		)

	// Same actor name as before the restart.
	pid, err = actor.SpawnNamed(props, "example.actor")
	if err != nil {
		panic(err)
	}

	// Tell the actor to print its state and wait until it has done so.
	wg.Add(1)
	pid.Tell(&Print{wg: wg})
	wg.Wait()

	// Stop the actor and close the db.
	pid.GracefulPoison()
	db.Close()
}

Running example will output:

State is event-9
State is event-9

Documentation

Index

Examples

Constants

This section is empty.

Variables

This section is empty.

Functions

This section is empty.

Types

type BoltProvider

type BoltProvider struct {
	// contains filtered or unexported fields
}

BoltProvider persistence provider built on top of boltdb

Example
package main

import (
	"fmt"
	"os"
	"sync"

	"github.com/AsynkronIT/protoactor-go/actor"
	"github.com/AsynkronIT/protoactor-go/persistence"
	"github.com/artyomturkin/protoactor-go-persistence-boltdb"
	"github.com/boltdb/bolt"
)

// exampleDataStore wraps a persistence.ProviderState so that it can be handed
// to persistence.Using.
type exampleDataStore struct {
	// providerState is the value returned by GetState.
	providerState persistence.ProviderState
}

// GetState returns the provider state held by the data store.
func (p *exampleDataStore) GetState() persistence.ProviderState {
	return p.providerState
}

// newExampleDataStore builds a data store whose provider state is a BoltDB
// provider created from snapshotInterval and db.
func newExampleDataStore(snapshotInterval int, db *bolt.DB) *exampleDataStore {
	store := new(exampleDataStore)
	store.providerState = boltdb.NewBoltProvider(snapshotInterval, db)
	return store
}

// exampleActor is the persistent actor of the example: it embeds
// persistence.Mixin and keeps its state as a single string.
type exampleActor struct {
	persistence.Mixin

	// state is restored from snapshots and overwritten by every event.
	state string
}

// Print is a command asking the actor to print its state. The actor calls
// wg.Done after printing, which lets the sender wait for the output.
type Print struct {
	// wg is signalled by the actor once the state has been printed.
	wg *sync.WaitGroup
}

// ensure exampleActor is actor.Actor
var _ actor.Actor = (*exampleActor)(nil)

// Receive handles the Print command, incoming events, snapshot restoration
// and snapshot requests.
func (a *exampleActor) Receive(ctx actor.Context) {
	switch m := ctx.Message().(type) {
	case *Print:
		// Report the current state, then release the waiting sender.
		fmt.Printf("State is %s\n", a.state)
		m.wg.Done()
	case *Event:
		// Only events received outside of recovery are persisted.
		recovering := a.Recovering()
		if !recovering {
			a.PersistReceive(m)
		}
		// The event carries the new state.
		a.state = m.State
	case *Snapshot:
		// Restore the state from the snapshot.
		a.state = m.State
	case *persistence.RequestSnapshot:
		// Persist a snapshot of the current state when asked to.
		snap := &Snapshot{State: a.state}
		a.PersistSnapshot(snap)
	}
}

// exampleActorProducer returns a new, empty exampleActor; it is the producer
// passed to actor.FromProducer.
func exampleActorProducer() actor.Actor {
	return &exampleActor{}
}

// main runs the persistence example end to end: it opens a bolt database,
// spawns a persistent actor, sends it ten events and prints the state, then
// closes and reopens the database and respawns the actor under the same name
// to show that the printed state is the same after the restart.
func main() {
	tempPath := tempfile()
	defer os.Remove(tempPath)
	db, err := bolt.Open(tempPath, 0666, nil)
	if err != nil {
		panic(err)
	}

	wg := &sync.WaitGroup{}

	// 4 is the snapshot interval handed to the provider.
	props := actor.FromProducer(exampleActorProducer).
		WithMiddleware(
			persistence.Using(newExampleDataStore(4, db)),
		)

	pid, err := actor.SpawnNamed(props, "example.actor")
	if err != nil {
		panic(err)
	}

	// Send ten events (event-0 .. event-9) to persist.
	for index := 0; index < 10; index++ {
		pid.Tell(&Event{State: fmt.Sprintf("event-%d", index)})
	}

	// Tell the actor to print its state and wait until it has done so.
	wg.Add(1)
	pid.Tell(&Print{wg: wg})
	wg.Wait()

	// Stop the actor and close the db. The close error is checked because the
	// same file is reopened right below without a timeout.
	// NOTE(review): presumably a handle that failed to close could still hold
	// the file lock and make the reopen block — confirm against bolt docs.
	pid.GracefulPoison()
	if err = db.Close(); err != nil {
		panic(err)
	}

	// Reopen the db and respawn the actor.
	db, err = bolt.Open(tempPath, 0666, nil)
	if err != nil {
		panic(err)
	}

	// The props are rebuilt because the data store must use the new db handle.
	props = actor.FromProducer(exampleActorProducer).
		WithMiddleware(
			persistence.Using(newExampleDataStore(4, db)),
		)

	pid, err = actor.SpawnNamed(props, "example.actor")
	if err != nil {
		panic(err)
	}

	// Tell the actor to print its state and wait until it has done so.
	wg.Add(1)
	pid.Tell(&Print{wg: wg})
	wg.Wait()

	// Stop the actor and close the db.
	pid.GracefulPoison()
	if err = db.Close(); err != nil {
		panic(err)
	}
}
Output:

State is event-9
State is event-9

func NewBoltProvider

func NewBoltProvider(snapshotInterval int, db *bolt.DB) *BoltProvider

NewBoltProvider creates a new persistence provider with a boltdb backend

func (*BoltProvider) GetEvents

func (provider *BoltProvider) GetEvents(actorName string, eventIndexStart int, callback func(e interface{}))

GetEvents executes the callback for each event in the store for the actor after the event index

func (*BoltProvider) GetSnapshot

func (provider *BoltProvider) GetSnapshot(actorName string) (snapshot interface{}, eventIndex int, ok bool)

GetSnapshot returns last snapshot and event index

func (*BoltProvider) GetSnapshotInterval

func (provider *BoltProvider) GetSnapshotInterval() int

GetSnapshotInterval returns snapshot interval for provider

func (*BoltProvider) PersistEvent

func (provider *BoltProvider) PersistEvent(actorName string, eventIndex int, event proto.Message)

PersistEvent saves the event into the store

func (*BoltProvider) PersistSnapshot

func (provider *BoltProvider) PersistSnapshot(actorName string, eventIndex int, snapshot proto.Message)

PersistSnapshot saves snapshot and event index

func (*BoltProvider) Restart

func (provider *BoltProvider) Restart()

Restart does who knows what

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL