agree

package module
v0.0.0-...-74f97b8 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Apr 10, 2016 License: MIT Imports: 18 Imported by: 0

README

Go Agree

Go Report Card Build Status MIT licensed

Godoc

Go-Agree is a proof-of-concept package that helps you create consistent replicated data structures using Raft.

For any Gob-marshallable interface you provide (with Gob-marshallable method args), Go-Agree will give you a wrapper to

  • apply commands that mutate the value of the interface (invoking the relevant methods on your interface through reflection)
  • create/restore snapshots (stored in BoltDB)
  • forward commands to the Raft leader (via net/rpc)
  • inspect the interface at any time or subscribe to mutations of its value

In the future it may also help you deal with partitioned data structures, probably using Blance.

This is at a proof-of-concept stage. Breaking changes to the API may occur. Suggestions, bug reports and PRs welcome.

Tutorial

Step 1: Code up the data structure that you want to distribute

Here we're going to create a very simple key-value store.


type KVStore map[string]string  // The type you want to replicate via Raft


/* Some methods we want to define on our type */

// Set sets the key `key` to value `value`
func (k KVStore) Set(key string, value string) {
	k[key] = value	// Note: no locking necessary
}

// Get returns the value stored at `key` - blank if none. The bool return parameter
// is used to indicate whether the key was present in the map.
func (k KVStore) Get(key string) (string, bool) {
	return k[key]  // Note: no locking necessary
}

Step 2: Wrap your type

Now you want a simple way to turn this into a distributed type that consistently replicates changes to all participating nodes in your cluster.

Go-agree wraps your type in a wrapper that takes care of that:


w, err := agree.Wrap(make(KVStore), &agree.Config{})

Read the godoc for configuration options - they include things like addresses of other nodes in the cluster and Raft configuration.

Step 3: Making mutations and observing the result

Mutating the value

Now you have a wrapper, we can either mutate it or observe mutations. We're basically creating a simple key-value store here, so let's set a value:


err := w.Mutate("Set", "key", "value")

Go-agree knows that your type had a "Set" method and invokes it for you, dealing with things like figuring out who the leader of the cluster is and storing snapshots of your type's value in case bad things happen.

You should only mutate your value with the "Mutate" method, not by calling your interface's methods directly

Reading the value

At any time you can read the value, that is, invoke non-mutating methods on your interface. Go-agree takes care of locking and forwarding the read to the Raft leader so you don't need to.

There are three supported consistency levels for this:

  • Any: Will invoke the method on the local node's copy of the value, which may be stale
  • Leader: Will invoke the method on the local node if it thinks it is a leader, but will not check this (so there is a small window of inconsistency)
  • Consistent: Linearizable. Will check to ensure that the node that thinks it is a leader is still a leader and will return an error if not (in that case you should retry).
val, err := w.Read("Get", agree.Consistent, "key")
if err == nil {
	fmt.Println(len(val.(string)))
} else {
	fmt.Println("Error: ", err)	
}

Your method must either return one value or two values (of which the second must be of Error type).

If you retained a pointer to your interface before you wrapped it, you can also do something like this (just make sure to lock the wrapper when doing this):

kv := make(KVStore)

w, _ := agree.Wrap(KVStore, &agree.Config{})

w.Mutate("Set", "key", "value")

w.RLock()
fmt.Println(kv["key"]) //prints "value"
w.RUnlock()

Please note that if you do this then you may be reading stale data.

Observing Mutations

You can subscribe to mutations, receiving notifications by executing a callback function of your choice or through a channel.

	//Subscribe to mutations via callback function. No locking necessary.
	w.SubscribeFunc("Set", function(m agree.Mutation){
		fmt.Println("[func] KV store changed: ", m.NewValue)
	})

Have a look at the Godoc for the structure of the Mutation type - it also tells you how the mutation occurred.

If you subscribe via a channel, Go-agree doesn't know when you'll access the value so you need to take care of locking yourself:

	//Subscribe to mutations via channel.
	c := make(chan agree.Mutation, 3)
	w.SubscribeChan("Set", c)
	
	for {
		mutation := <-c
		c.RLock() // If subscribing via channel, make sure to RLock()/RUnlock() the wrapper.
		fmt.Println("KV store changed: ", mutation)
		c.RUnlock()
	}

Cluster Membership Changes

To add a node to the cluster after you have Wrap()'ed your interface, use AddNode(). The node should be ready to receive Raft commands at that address.

	err := w.AddNode("localhost:2345")

To remove a node from the cluster after you have Wrap()'ed your interface, use RemoveNode():

	err := w.RemoveNode("localhost:2345")

Full Example (Key-value store)

Here is the full working example for a single-node cluster. Hopefully it is obvious how to extend it to multi-node with the appropriate configuration but if not please take a look in the "examples" directory.


package main

import 

(
	agree "github.com/michaelbironneau/go-agree"
	"fmt"
	"time"
)

type KVStore map[string]string

// Set sets the key `key` to value `value`
func (k KVStore) Set(key string, value string) {
	k[key] = value	// Note: no locking necessary
}

// Get returns the value stored at `key` - blank if none. The bool return parameter
// is used to indicate whether the key was present in the map.
func (k KVStore) Get(key string) (string, bool) {
	return k[key]  // Note: no locking necessary
}

func main(){
	
	//Start single-node Raft. Will block until a leader is elected.
	w, err := agree.Wrap(make(KVStore), &agree.Config{})

	if err != nil {
		fmt.Println("Error wrapping: ", err)
		return
	}
	
	//Set some keys
	w.Mutate("Set", "key", "value")
	w.Mutate("Set", "key2", "value2")
	
	//Get some values back with 'Any' consistency level.
	v,_ := w.Read("Get", agree.Any, "key2")
	fmt.Println(v)
	
	//Subscribe to mutations via channel.
	c := make(chan agree.Mutation, 3)
	w.SubscribeChan("Set", c)
	
	//Subscribe to mutations via callback function. No locking necessary.
	w.SubscribeFunc("Set", function(m agree.Mutation){
		fmt.Println("[func] KV store changed: ", m)
	})
	
	w.Mutate("Set", "key3", "value3")
	go w.Mutate("Set", "Key", "value4")
	
	for {
		mutation := <-c
		w.RLock() // If subscribing via channel, make sure to RLock()/RUnlock() the wrapper if you access NewValue field.
		fmt.Println("New value: ", mutation.NewValue)
		w.RUnlock()
		fmt.Println("It was mutated by calling ", mutation.Method)
		fmt.Println("...which was invoked with args ", mutation.MethodArgs)
	}
			
}


Limitations and Caveats

There are a few things you should know:

  • The interface{} you Wrap() should be Gob-marshallable and JSON-marshallable. All of its methods should have Gob-marshallable and JSON-marshallable arguments.
  • You can Wrap() multiple interface{}s. However, cannot currently Wrap() multiple interfaces of the same type. If you attempt this the second Wrap() call will probably block. You can work around it by aliasing the two types type T2 T1.
  • Go-Agree listens on two ports per wrapped interface: the Raft port (default 8080 - you can override it) and the command forwarding port (=Raft port + 1).

How it works

I'm using Hashicorp's Raft API as described in the excellent tutorial here.

Reflection is used to map commit log entries to method invocations and net/rpc to forward commands to the leader.

Documentation

Overview

Package agree helps you distribute any data structure using Raft.

Index

Constants

This section is empty.

Variables

View Source
var (
	//ErrMethodNotFound is the error that is returned if you try to apply a method that the type does not have.
	ErrMethodNotFound = errors.New("Cannot apply the method as it was not found")

	//DefaultRaftDirectory is the default directory where raft files should be stored.
	DefaultRaftDirectory = "."

	//DefaultRetainSnapshotCount is the number of Raft snapsnots that will be retained.
	DefaultRetainSnapshotCount = 2
)
View Source
var (
	//ErrNotLeader is returned when a Command is mistakenly sent to a follower. You should never receive this as Go-Agree takes care of following commands to the leader.
	ErrNotLeader = errors.New("Commands should be sent to leader and cannot be sent to followers")

	//ErrIncorrectType is returned when a Raft snapshot cannot be unmarshalled to the expected type.
	ErrIncorrectType = errors.New("Snapshot contained data of an incorrect type")

	//ErrTooManyalues is returned when a Read() method returns more than one value (plus optional error return)
	ErrTooManyalues = errors.New("Method returned too many values")
)

Functions

This section is empty.

Types

type Callback

type Callback func(m Mutation)

Callback is a callback function that is invoked when you subscribe to mutations using Wrapper.SusbcribeFunc() and a mutation occurs. The args contain the details of the mutation that just occurred.

type Command

type Command struct {
	ConsistencyLevel int
	Method           string
	Args             []interface{}
}

Command represents a mutating Command (log entry) in the Raft commit log.

type Config

type Config struct {
	Peers      []string     // List of peers. Peers' raft ports can be different but the forwarding port must be the same for each peer in the cluster.
	RaftConfig *raft.Config // Raft configuration, see github.com/hashicorp/raft. Default raft.DefaultConfig()

	RaftBind            string // Where to bind Raft, default ":8080"
	RaftDirectory       string // Where Raft files will be stored
	RetainSnapshotCount int    // How many Raft snapshots to retain
	// contains filtered or unexported fields
}

Config is a configuration struct that is passed to Wrap(). It specifies Raft settings and command forwarding port.

type ConsistencyLevel

type ConsistencyLevel int

ConsistencyLevel describes how consistent we want our reads to be

const (
	//Any means that stale reads are allowed
	Any ConsistencyLevel = iota

	//Leader means that there is a short window of inconsistency but requires no network round trip to verify leadership
	Leader ConsistencyLevel = iota

	//Consistent means a linearizable read. It requires a network round trip on every read.
	Consistent ConsistencyLevel = iota
)

type ForwardingClient

type ForwardingClient struct {
	// contains filtered or unexported fields
}

ForwardingClient is a client that forwards commands to the Raft leader. Should not be used, the only reason it is exported is because the rpc package requires it.

func (*ForwardingClient) AddPeer

func (r *ForwardingClient) AddPeer(addr string, reply *int) error

AddPeer accepts a forwarded request to add a peer, sent to the Raft leader.

func (*ForwardingClient) Apply

func (r *ForwardingClient) Apply(cmd []byte, reply *int) error

Apply forwards the given mutating Command to the Raft leader.

func (*ForwardingClient) Read

func (r *ForwardingClient) Read(cmd []byte, reply interface{}) error

Read forwards the given read to the Raft leader.

func (ForwardingClient) RemovePeer

func (r ForwardingClient) RemovePeer(addr string, reply *int) error

RemovePeer accepts a forwarded request to remove a peer, sent to the Raft leader.

type Mutation

type Mutation struct {
	NewValue   interface{}   // The new, mutated wrapped value
	Method     string        // The name of the method passed to Mutate()
	MethodArgs []interface{} // The arguments the method was called with
}

Mutation is passed to observers to notify them of mutations. Observers should not mutate NewValue.

type Wrapper

type Wrapper struct {
	sync.RWMutex
	// contains filtered or unexported fields
}

Wrapper is a wrapper for the datastructure you want to distribute. It inherics from sync/RWMutex and if you retained a pointer to the interface before you passed it to Wrap(), you should RLock()/RUnlock() the wrapper whenever you access the interface's value outside Go-Agree's helper methods.

func Wrap

func Wrap(i interface{}, c *Config) (*Wrapper, error)

Wrap returns a wrapper for your type. Type methods should have JSON-marshallable arguments.

func (*Wrapper) AddNode

func (w *Wrapper) AddNode(addr string) error

AddNode adds a node, located at addr, to the cluster. The node must be ready to respond to Raft commands at the address.

func (*Wrapper) Marshal

func (w *Wrapper) Marshal() ([]byte, error)

Marshal marshals the wrapper's value using encoding/json.

func (*Wrapper) Mutate

func (w *Wrapper) Mutate(method string, args ...interface{}) error

Mutate performs an operation that mutates your data.

func (*Wrapper) Read

func (w *Wrapper) Read(method string, c ConsistencyLevel, args ...interface{}) (interface{}, error)

Read invokes the specified method on the wrapped interface, at the given consistency level. The invoked method should return a single value or a single value followed by an error. The method should **not** mutate the value of the wrapper as this mutation is not committed to the Raft log. To mutate the value use Mutate() instead.

func (*Wrapper) RemoveNode

func (w *Wrapper) RemoveNode(addr string) error

RemoveNode removes a node, located at addr, from the cluster.

func (*Wrapper) SubscribeChan

func (w *Wrapper) SubscribeChan(method string, c chan *Mutation)

SubscribeChan sends values to the returned channel when the underlying structure is mutated. The callback should not mutate the interface or strange things will happen.

func (*Wrapper) SubscribeFunc

func (w *Wrapper) SubscribeFunc(method string, f Callback)

SubscribeFunc executes the `Callback` func when the distributed object is mutated by applying `Mutate` on `method`. The callback should not mutate the interface or strange things will happen.

Directories

Path Synopsis
examples

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL