paxi

package module
v0.0.0-...-6823d0b Latest Latest
Published: Dec 29, 2023 License: MIT Imports: 26 Imported by: 0

README

What is Paxi?

Paxi is a framework that implements WPaxos and other Paxos protocol variants. Paxi provides most of the elements that any Paxos implementation or replication protocol needs, including network communication, a key-value store state machine, a client API, and multiple types of quorum systems.

Warning: The Paxi project is still under heavy development, with more features and protocols to come. The Paxi API may change too.

The Paxi paper (SIGMOD) can be found at https://dl.acm.org/doi/abs/10.1145/3299869.3319893. BibTeX:

@inproceedings{ailijiang2019dissecting,
  title={Dissecting the Performance of Strongly-Consistent Replication Protocols},
  author={Ailijiang, Ailidani and Charapko, Aleksey and Demirbas, Murat},
  booktitle={Proceedings of the 2019 International Conference on Management of Data},
  pages={1696--1710},
  year={2019}
}

What is WPaxos?

WPaxos is a multileader Paxos protocol that provides low-latency and high-throughput consensus across wide-area network (WAN) deployments. Unlike statically partitioned multiple Paxos deployments, WPaxos perpetually adapts to the changing access locality through object stealing. Multiple concurrent leaders coinciding in different zones steal ownership of objects from each other using phase-1 of Paxos, and then use phase-2 to commit update-requests on these objects locally until they are stolen by other leaders. To achieve fast phase-2 commits, WPaxos adopts the flexible quorums idea in a novel manner, and appoints phase-2 acceptors to be close to their respective leaders.

The WPaxos (WAN Paxos) paper (TPDS journal version) can be found at https://ieeexplore.ieee.org/abstract/document/8765834. BibTeX:

@article{ailijiang2019wpaxos,
  title={WPaxos: Wide area network flexible consensus},
  author={Ailijiang, Ailidani and Charapko, Aleksey and Demirbas, Murat and Kosar, Tevfik},
  journal={IEEE Transactions on Parallel and Distributed Systems},
  volume={31},
  number={1},
  pages={211--223},
  year={2019},
  publisher={IEEE}
}

What is included?

Algorithms:

Features:

  • Benchmarking
  • Linearizability checker
  • Fault injection

How to build

  1. Install Go.
  2. Use the go get command, or download the Paxi source code from its GitHub page.
go get github.com/ailidani/paxi
  3. Compile everything from the paxi/bin folder.
cd github.com/ailidani/paxi/bin
./build.sh

After compilation, Go generates three executable files under the bin folder.

  • server is one replica instance.
  • client is a simple benchmark that generates read/write requests to servers.
  • cmd is a command line tool to test Get/Set requests.

How to run

Each executable file expects some parameters, which can be listed with the -help flag, e.g. ./server -help.

  1. Create the configuration file according to the example, then start the server with the -config FILE_PATH option; it defaults to "config.json" when omitted.

  2. Start 9 servers with different ids in the format "ZONE_ID.NODE_ID".

./server -id 1.1 -algorithm=paxos &
./server -id 1.2 -algorithm=paxos &
./server -id 1.3 -algorithm=paxos &
./server -id 2.1 -algorithm=paxos &
./server -id 2.2 -algorithm=paxos &
./server -id 2.3 -algorithm=paxos &
./server -id 3.1 -algorithm=paxos &
./server -id 3.2 -algorithm=paxos &
./server -id 3.3 -algorithm=paxos &
  3. Start the benchmarking client, which connects to server ID 1.1 and uses the benchmark parameters specified in config.json.
./client -id 1.1 -config config.json

When the id flag is absent, the client randomly selects a server for each operation.

The algorithms can also run in simulation mode, where all nodes run in one process and the transport layer is replaced by Go channels. See the simulation.sh script for how to run it.

How to implement algorithms in Paxi

A replication algorithm in Paxi follows the message-passing model, in which several message types and their handle functions are registered. We use Paxos as the example for this step-by-step tutorial.

  1. Define messages, and register them with gob in the init() function if using the gob codec, as shown in msg.go.

  2. Define a Replica structure that embeds the paxi.Node interface.

type Replica struct {
	paxi.Node
	*Paxos
}

  3. Define a handle function for each message type. For example, to handle a client Request:

func (r *Replica) handleRequest(m paxi.Request) {
	if *adaptive {
		if r.Paxos.IsLeader() || r.Paxos.Ballot() == 0 {
			r.Paxos.HandleRequest(m)
		} else {
			go r.Forward(r.Paxos.Leader(), m)
		}
	} else {
		r.Paxos.HandleRequest(m)
	}

}
  4. Register the messages with their handle functions using the Node.Register(interface{}, func()) interface in the Replica constructor.
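
The registration step above can be pictured as a small registry keyed by message type. The following is only a sketch and not Paxi's implementation: registry, Prepare and Request are hypothetical names, and using reflection to match a message to its handler is an assumption about how a Register(m interface{}, f interface{}) style API could dispatch.

```go
package main

import (
	"fmt"
	"reflect"
)

// Prepare and Request are hypothetical message types used only in this sketch.
type Prepare struct{ Ballot int }
type Request struct{ Key int }

// registry maps a message type to its handle function.
type registry struct {
	handlers map[reflect.Type]reflect.Value
}

func newRegistry() *registry {
	return &registry{handlers: make(map[reflect.Type]reflect.Value)}
}

// Register associates the dynamic type of m with handler f.
// f must be a function that takes exactly one argument of m's type.
func (r *registry) Register(m interface{}, f interface{}) {
	t := reflect.TypeOf(m)
	fn := reflect.ValueOf(f)
	if fn.Kind() != reflect.Func || fn.Type().NumIn() != 1 || fn.Type().In(0) != t {
		panic(fmt.Sprintf("handler for %v must be func(%v)", t, t))
	}
	r.handlers[t] = fn
}

// Dispatch calls the handler registered for msg's type and reports
// whether one was found.
func (r *registry) Dispatch(msg interface{}) bool {
	fn, ok := r.handlers[reflect.TypeOf(msg)]
	if !ok {
		return false
	}
	fn.Call([]reflect.Value{reflect.ValueOf(msg)})
	return true
}
```

In this sketch a receive loop would call Dispatch on every incoming message, so adding a new message type only requires one more Register call in the constructor.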

A Replica uses the Send(to ID, msg interface{}) and Broadcast(msg interface{}) functions in Node.Socket to send messages.

For data-store related functions, check the db.go file.

For quorum types, check the quorum.go file.

The client uses a simple RESTful API to submit requests. A GET request to the URL "http://ip:port/key" reads the value of the given key. A POST request to the URL "http://ip:port/key", with the value as the body, writes that value to the key.
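
As a rough illustration of the two request shapes, the sketch below builds them with the standard net/http package. The helper names keyURL, newReadRequest and newWriteRequest are hypothetical, and the address passed in is whatever "ip:port" a server's HTTP endpoint listens on; nothing here is taken from Paxi's client code.

```go
package main

import (
	"bytes"
	"fmt"
	"net/http"
)

// keyURL builds the "http://ip:port/key" URL described above.
// addr is an "ip:port" string for one server (a placeholder in this sketch).
func keyURL(addr string, key int) string {
	return fmt.Sprintf("http://%s/%d", addr, key)
}

// newReadRequest builds a GET request that reads the value of key.
func newReadRequest(addr string, key int) (*http.Request, error) {
	return http.NewRequest(http.MethodGet, keyURL(addr, key), nil)
}

// newWriteRequest builds a POST request whose body is the new value.
func newWriteRequest(addr string, key int, value []byte) (*http.Request, error) {
	return http.NewRequest(http.MethodPost, keyURL(addr, key), bytes.NewReader(value))
}
```

Either request can then be sent with http.DefaultClient.Do(req), and the response body read as the value.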

Documentation

Constants

const (
	HTTPClientID  = "Id"
	HTTPCommandID = "Cid"
	HTTPTimestamp = "Timestamp"
	HTTPNodeID    = "Id"
)

HTTP request header names

Variables

This section is empty.

Functions

func Conflict

func Conflict(gamma *Command, delta *Command) bool

Conflict checks whether two commands conflict, i.e. whether reordering them would end in different states

func ConflictBatch

func ConflictBatch(batch1 []Command, batch2 []Command) bool

ConflictBatch checks whether two batches of commands conflict
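
A minimal sketch of one common conflict rule for a key-value store follows: two commands conflict when they touch the same key and at least one of them is a write. The command type here is a simplified, hypothetical stand-in, and treating a nil value as a read is an assumption of the sketch rather than something stated in this documentation.

```go
package main

// command is a hypothetical, simplified version of Command.
// A nil value marks a read, which is an assumption of this sketch.
type command struct {
	key   int
	value []byte
}

func (c command) isRead() bool { return c.value == nil }

// conflict reports whether reordering a and b could change the outcome:
// they touch the same key and at least one of them is a write.
func conflict(a, b command) bool {
	return a.key == b.key && (!a.isRead() || !b.isRead())
}

// conflictBatch reports whether any command in x conflicts with any in y.
func conflictBatch(x, y []command) bool {
	for _, a := range x {
		for _, b := range y {
			if conflict(a, b) {
				return true
			}
		}
	}
	return false
}
```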

func ConnectToMaster

func ConnectToMaster(addr string, client bool, id ID)

ConnectToMaster connects to the master node and sets the global Config

func Init

func Init()

Init sets up the paxi package

func Max

func Max(a, b int) int

Max of two int

func NextBallot

func NextBallot(ballot int, id ID) int

NextBallot generates the next ballot number given the current ballot number and node id

func Retry

func Retry(f func() error, attempts int, sleep time.Duration) error

Retry retries function f, sleeping for the sleep duration between attempts
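
A helper with this signature is usually a short loop. The sketch below is one plausible reading, not the package's code: it assumes f is tried at most attempts times, that the last error is returned when every attempt fails, and that a non-positive attempts value is rejected with the hypothetical errNoAttempts.

```go
package main

import (
	"errors"
	"time"
)

// errNoAttempts is a hypothetical error returned when attempts is not positive.
var errNoAttempts = errors.New("retry: attempts must be positive")

// retry calls f until it succeeds or attempts calls have been made,
// sleeping between consecutive attempts. It returns nil on success
// and the last error otherwise.
func retry(f func() error, attempts int, sleep time.Duration) error {
	if attempts <= 0 {
		return errNoAttempts
	}
	var err error
	for i := 0; i < attempts; i++ {
		if err = f(); err == nil {
			return nil
		}
		// Do not sleep after the final failed attempt.
		if i < attempts-1 {
			time.Sleep(sleep)
		}
	}
	return err
}
```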

func Schedule

func Schedule(f func(), delay time.Duration) chan bool

Schedule repeatedly calls the function at intervals

func Simulation

func Simulation()

Simulation enables Go channel transport to simulate a distributed environment

func VMax

func VMax(v ...int) int

VMax of a vector

Types

type AdminClient

type AdminClient interface {
	Consensus(Key) bool
	Crash(ID, int)
	Drop(ID, ID, int)
	Partition(int, ...ID)
}

AdminClient interface provides fault injection operations

type Ballot

type Ballot uint64

Ballot is the ballot number type; it combines a 32-bit natural number and a 32-bit node id into a uint64

func NewBallot

func NewBallot(n int, id ID) Ballot

NewBallot generates a ballot number in the format <n, zone, node>
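
The packing can be sketched as follows. The 32/32 split between n and the node id comes from the description above; dividing the id half into 16 bits of zone and 16 bits of node is an assumption made for this example, and ballot and its methods are hypothetical lowercase stand-ins.

```go
package main

// ballot is a hypothetical stand-in for the Ballot type.
type ballot uint64

// newBallot packs n into the high 32 bits and the node id into the low
// 32 bits. Splitting the id into 16 bits of zone and 16 bits of node is
// an assumption of this sketch.
func newBallot(n, zone, node int) ballot {
	return ballot(uint64(uint32(n))<<32 | uint64(uint16(zone))<<16 | uint64(uint16(node)))
}

// n returns the counter stored in the high 32 bits.
func (b ballot) n() int { return int(uint64(b) >> 32) }

// zone and node return the two halves of the id stored in the low 32 bits.
func (b ballot) zone() int { return int(uint16(uint64(b) >> 16)) }
func (b ballot) node() int { return int(uint16(b)) }

// next returns a ballot with n incremented and stamped with the given id.
func (b ballot) next(zone, node int) ballot {
	return newBallot(b.n()+1, zone, node)
}
```

With this layout, plain integer comparison orders ballots by n first and breaks ties by id, so two nodes never produce equal ballots for the same n.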

func NewBallotFromString

func NewBallotFromString(b string) Ballot

func (Ballot) ID

func (b Ballot) ID() ID

ID returns the node id stored in the last 32 bits of the ballot

func (Ballot) N

func (b Ballot) N() int

N returns the first 32 bits of the ballot

func (*Ballot) Next

func (b *Ballot) Next(id ID)

Next generates the next ballot number given node id

func (Ballot) String

func (b Ballot) String() string

type Bconfig

type Bconfig struct {
	T                    int     // total number of running time in seconds
	N                    int     // total number of requests
	K                    int     // key space
	W                    float64 // write ratio
	Throttle             int     // requests per second throttle, unused if 0
	Concurrency          int     // number of simulated clients
	Distribution         string  // distribution
	LinearizabilityCheck bool    // run linearizability checker at the end of benchmark

	// conflict distribution
	Conflicts int // percentage of conflicting keys
	Min       int // min key

	// normal distribution
	Mu    float64 // mu of normal distribution
	Sigma float64 // sigma of normal distribution
	Move  bool    // moving average (mu) of normal distribution
	Speed int     // moving speed in milliseconds intervals per key

	// zipfian distribution
	ZipfianS float64 // zipfian s parameter
	ZipfianV float64 // zipfian v parameter

	// exponential distribution
	Lambda float64 // rate parameter
}

Bconfig holds all benchmark configuration

func DefaultBConfig

func DefaultBConfig() Bconfig

DefaultBConfig returns a default benchmark config

type Benchmark

type Benchmark struct {
	Bconfig
	*History
	// contains filtered or unexported fields
}

Benchmark is benchmarking tool that generates workload and collects operation history and latency

func NewBenchmark

func NewBenchmark(db DB) *Benchmark

NewBenchmark returns new Benchmark object given implementation of DB interface

func (*Benchmark) Load

func (b *Benchmark) Load()

Load creates all K keys in the DB

func (*Benchmark) Run

func (b *Benchmark) Run()

Run starts the main logic of benchmarking

type Client

type Client interface {
	Get(Key) (Value, error)
	Put(Key, Value) error
}

Client interface provides get and put for key value store

type Codec

type Codec interface {
	Scheme() string
	Encode(interface{})
	Decode(interface{})
}

Codec interface provides methods for serialization and deserialization; it combines the json and gob encoder and decoder interfaces

func NewCodec

func NewCodec(scheme string, rw io.ReadWriter) Codec

NewCodec creates new codec object based on scheme, i.e. json and gob

type Command

type Command struct {
	Key       Key
	Value     Value
	ClientID  ID
	CommandID int
}

Command of key-value database

func (Command) Empty

func (c Command) Empty() bool

func (Command) Equal

func (c Command) Equal(a Command) bool

func (Command) IsRead

func (c Command) IsRead() bool

func (Command) IsWrite

func (c Command) IsWrite() bool

func (Command) String

func (c Command) String() string

type Config

type Config struct {
	Addrs     map[ID]string `json:"address"`      // address for node communication
	HTTPAddrs map[ID]string `json:"http_address"` // address for client server communication

	Policy    string  `json:"policy"`    // leader change policy {consecutive, majority}
	Threshold float64 `json:"threshold"` // threshold for policy in WPaxos {n consecutive or time interval in ms}

	Thrifty        bool    `json:"thrifty"`          // only send messages to a quorum
	BufferSize     int     `json:"buffer_size"`      // buffer size for maps
	ChanBufferSize int     `json:"chan_buffer_size"` // buffer size for channels
	MultiVersion   bool    `json:"multiversion"`     // create multi-version database
	Benchmark      Bconfig `json:"benchmark"`        // benchmark configuration
	// contains filtered or unexported fields
}

Config contains every system configuration
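
The JSON tags above determine the shape of the configuration file. The sketch below decodes a small sample with encoding/json into a local struct that mirrors a few of those tags. The config type, the parseConfig helper, and every address value in sample are hypothetical placeholders; the example file shipped with Paxi remains the reference for real values.

```go
package main

import "encoding/json"

// config mirrors a few of the JSON tags listed in Config above.
// It is a local type for this sketch, with plain strings as map keys.
type config struct {
	Addrs     map[string]string `json:"address"`
	HTTPAddrs map[string]string `json:"http_address"`
	Policy    string            `json:"policy"`
	Threshold float64           `json:"threshold"`
	Thrifty   bool              `json:"thrifty"`
}

// sample is a hypothetical two-node configuration. The address values
// and their schemes are placeholders chosen for illustration.
const sample = `{
  "address":      {"1.1": "tcp://127.0.0.1:1735", "1.2": "tcp://127.0.0.1:1736"},
  "http_address": {"1.1": "http://127.0.0.1:8080", "1.2": "http://127.0.0.1:8081"},
  "policy": "consecutive",
  "threshold": 3,
  "thrifty": false
}`

// parseConfig decodes JSON configuration data into a config value.
func parseConfig(data []byte) (config, error) {
	var c config
	err := json.Unmarshal(data, &c)
	return c, err
}
```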

func GetConfig

func GetConfig() Config

GetConfig returns paxi package configuration

func MakeDefaultConfig

func MakeDefaultConfig() Config

MakeDefaultConfig returns a Config object with a few default values; it is only used by init() and master

func (Config) IDs

func (c Config) IDs() []ID

IDs returns all node ids

func (*Config) Load

func (c *Config) Load()

Load loads configuration from config file in JSON format

func (Config) N

func (c Config) N() int

N returns total number of nodes

func (Config) Save

func (c Config) Save() error

Save saves configuration to file in JSON format

func (Config) String

func (c Config) String() string

String is implemented to print the config

func (Config) Z

func (c Config) Z() int

Z returns total number of zones

type DB

type DB interface {
	Init() error
	Read(key int) (int, error)
	Write(key, value int) error
	Stop() error
}

DB is the general interface implemented by a client to call the client library

type Database

type Database interface {
	Execute(Command) Value
	History(Key) []Value
	Get(Key) Value
	Put(Key, Value)
}

Database defines a database interface. TODO: replace with a more general StateMachine interface

func NewDatabase

func NewDatabase() Database

NewDatabase returns a database that implements the Database interface

type HTTPClient

type HTTPClient struct {
	Addrs  map[ID]string
	HTTP   map[ID]string
	ID     ID  // client id use the same id as servers in local site
	N      int // total number of nodes
	LocalN int // number of nodes in local zone

	CID int // command id
	*http.Client
}

HTTPClient implements the Client interface with a REST API

func NewHTTPClient

func NewHTTPClient(id ID) *HTTPClient

NewHTTPClient creates a new Client from config

func (*HTTPClient) Consensus

func (c *HTTPClient) Consensus(k Key) bool

Consensus collects /history/key from every node and compares their values

func (*HTTPClient) Crash

func (c *HTTPClient) Crash(id ID, t int)

Crash stops the node for t seconds and then recovers it; the node crashes forever if t < 0

func (*HTTPClient) Drop

func (c *HTTPClient) Drop(from, to ID, t int)

Drop drops every message sent for t seconds

func (*HTTPClient) Get

func (c *HTTPClient) Get(key Key) (Value, error)

Get gets the value of the given key (using REST). Default implementation of the Client interface

func (*HTTPClient) GetURL

func (c *HTTPClient) GetURL(id ID, key Key) string

func (*HTTPClient) JSONGet

func (c *HTTPClient) JSONGet(key Key) (Value, error)

JSONGet posts get request in json format to server url

func (*HTTPClient) JSONPut

func (c *HTTPClient) JSONPut(key Key, value Value) (Value, error)

JSONPut posts put request in json format to server url

func (*HTTPClient) LocalQuorumGet

func (c *HTTPClient) LocalQuorumGet(key Key) ([]Value, []map[string]string)

func (*HTTPClient) MultiGet

func (c *HTTPClient) MultiGet(n int, key Key) ([]Value, []map[string]string)

MultiGet concurrently reads values from n nodes

func (*HTTPClient) Partition

func (c *HTTPClient) Partition(t int, nodes ...ID)

Partition cuts the network between nodes for t seconds

func (*HTTPClient) Put

func (c *HTTPClient) Put(key Key, value Value) error

Put puts a new key-value pair (using REST). Default implementation of the Client interface

func (*HTTPClient) QuorumGet

func (c *HTTPClient) QuorumGet(key Key) ([]Value, []map[string]string)

QuorumGet concurrently reads values from a majority of nodes

func (*HTTPClient) QuorumPut

func (c *HTTPClient) QuorumPut(key Key, value Value)

QuorumPut concurrently writes values to a majority of nodes. TODO: get headers

func (*HTTPClient) RESTGet

func (c *HTTPClient) RESTGet(id ID, key Key) (Value, map[string]string, error)

RESTGet issues an HTTP call to a node and returns the value and headers

func (*HTTPClient) RESTPut

func (c *HTTPClient) RESTPut(id ID, key Key, value Value) (Value, map[string]string, error)

RESTPut puts the new value as the http.Request body and returns the previous value

type History

type History struct {
	sync.RWMutex
	// contains filtered or unexported fields
}

History is the client operation history, mapped by key

func NewHistory

func NewHistory() *History

NewHistory creates a History map

func (*History) Add

func (h *History) Add(key int, input, output interface{}, start, end int64)

Add puts an operation in History

func (*History) AddOperation

func (h *History) AddOperation(key int, o *operation)

AddOperation adds the operation

func (*History) Linearizable

func (h *History) Linearizable() int

Linearizable concurrently checks if each partition of the history is linearizable and returns the total number of anomalous reads

func (*History) ReadFile

func (h *History) ReadFile(path string) error

ReadFile reads a CSV log file and creates operations in the history

func (*History) WriteFile

func (h *History) WriteFile(path string) error

WriteFile writes entire operation history into file

type ID

type ID string

ID represents a generic identifier in format of Zone.Node

func LeaderID

func LeaderID(ballot int) ID

LeaderID returns the node id from a ballot number

func NewID

func NewID(zone, node int) ID

NewID returns a new ID given the zone and node numbers as two ints

func (ID) Node

func (i ID) Node() int

Node returns Node ID component

func (ID) Zone

func (i ID) Zone() int

Zone returns Zone ID component
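
Since an ID is a string in Zone.Node form, the two accessors amount to splitting on the dot and parsing each half. The sketch below uses a hypothetical lowercase id type; returning 0 for a missing or non-numeric component is an assumption, as the behavior for malformed ids is not described here.

```go
package main

import (
	"fmt"
	"strconv"
	"strings"
)

// id is a hypothetical stand-in for ID, holding "zone.node".
type id string

// newID formats a zone and node number as "zone.node".
func newID(zone, node int) id {
	return id(fmt.Sprintf("%d.%d", zone, node))
}

// part returns the i-th dot-separated component as an int.
// It returns 0 when the component is missing or not a number,
// which is a choice made for this sketch.
func (x id) part(i int) int {
	fields := strings.Split(string(x), ".")
	if i >= len(fields) {
		return 0
	}
	n, err := strconv.Atoi(fields[i])
	if err != nil {
		return 0
	}
	return n
}

// zone and node return the two components of the id.
func (x id) zone() int { return x.part(0) }
func (x id) node() int { return x.part(1) }
```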

type IDs

type IDs []ID

func (IDs) Len

func (a IDs) Len() int

func (IDs) Less

func (a IDs) Less(i, j int) bool

func (IDs) Swap

func (a IDs) Swap(i, j int)

type Key

type Key int

Key is the key type of the key-value database. TODO: key should be general too

type Limiter

type Limiter struct {
	sync.Mutex
	// contains filtered or unexported fields
}

Limiter limits operation rate when used with Wait function

func NewLimiter

func NewLimiter(rate int) *Limiter

NewLimiter creates a new rate limiter, where rate is operations per second

func (*Limiter) Wait

func (l *Limiter) Wait()

Wait blocks for the limit

type Node

type Node interface {
	Socket
	Database
	ID() ID
	Run()
	Retry(r Request)
	Forward(id ID, r Request)
	Register(m interface{}, f interface{})
}

Node is the primary access point for every replica; it includes networking, a state machine, and a RESTful API server

func NewNode

func NewNode(id ID) Node

NewNode creates a new Node object from configuration

type Policy

type Policy interface {
	Hit(id ID) ID
}

Policy defines a trigger for data access patterns, that can be used in data migration protocols

func NewPolicy

func NewPolicy() Policy

NewPolicy returns the policy by policy name from config

type Quorum

type Quorum struct {
	// contains filtered or unexported fields
}

Quorum records each acknowledgement and checks whether different types of quorum are satisfied

func NewQuorum

func NewQuorum() *Quorum

NewQuorum returns a new Quorum

func (*Quorum) ACK

func (q *Quorum) ACK(id ID)

ACK adds id to quorum ack records

func (*Quorum) ADD

func (q *Quorum) ADD()

ADD increases the ack size by one

func (*Quorum) All

func (q *Quorum) All() bool

func (*Quorum) AllZones

func (q *Quorum) AllZones() bool

AllZones returns true if there is at least one ack from each zone

func (*Quorum) FGridQ1

func (q *Quorum) FGridQ1(Fz int) bool

FGridQ1 is flexible grid quorum for phase 1

func (*Quorum) FGridQ2

func (q *Quorum) FGridQ2(Fz int) bool

FGridQ2 is flexible grid quorum for phase 2

func (*Quorum) FastQuorum

func (q *Quorum) FastQuorum() bool

FastQuorum from fast paxos

func (*Quorum) GridColumn

func (q *Quorum) GridColumn() bool

GridColumn == all nodes in one zone

func (*Quorum) GridRow

func (q *Quorum) GridRow() bool

GridRow == AllZones

func (*Quorum) Majority

func (q *Quorum) Majority() bool

Majority quorum satisfied

func (*Quorum) NACK

func (q *Quorum) NACK(id ID)

NACK adds id to quorum nack records

func (*Quorum) Reset

func (q *Quorum) Reset()

Reset resets the quorum to empty

func (*Quorum) Size

func (q *Quorum) Size() int

Size returns current ack size

func (*Quorum) ZoneMajority

func (q *Quorum) ZoneMajority() bool

ZoneMajority returns true if majority quorum satisfied in any zone
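
Several of the quorum checks above can be expressed as simple counts over acknowledgements grouped by zone. The sketch below is a simplified, hypothetical quorum type built from a list of "ZONE_ID.NODE_ID" strings. It covers only majority, all-zones (grid row), zone-majority and grid-column checks, and it is not the package's implementation.

```go
package main

import (
	"strconv"
	"strings"
)

// zoneOf extracts the zone from an id in "ZONE_ID.NODE_ID" form.
// Malformed ids map to zone 0 in this sketch.
func zoneOf(id string) int {
	z, _ := strconv.Atoi(strings.SplitN(id, ".", 2)[0])
	return z
}

// quorum is a simplified stand-in that tracks acks per zone.
type quorum struct {
	total   int             // number of configured nodes
	perZone map[int]int     // number of configured nodes in each zone
	acks    map[string]bool // ids that have acknowledged
	zoneAck map[int]int     // number of acks received from each zone
}

func newQuorum(ids []string) *quorum {
	q := &quorum{
		total:   len(ids),
		perZone: map[int]int{},
		acks:    map[string]bool{},
		zoneAck: map[int]int{},
	}
	for _, id := range ids {
		q.perZone[zoneOf(id)]++
	}
	return q
}

// ack records an acknowledgement from id, ignoring duplicates.
func (q *quorum) ack(id string) {
	if !q.acks[id] {
		q.acks[id] = true
		q.zoneAck[zoneOf(id)]++
	}
}

// majority is true when more than half of all nodes have acked.
func (q *quorum) majority() bool { return len(q.acks)*2 > q.total }

// allZones is true when every zone has at least one ack (a grid row).
func (q *quorum) allZones() bool { return len(q.zoneAck) == len(q.perZone) }

// zoneMajority is true when some zone has acks from more than half its nodes.
func (q *quorum) zoneMajority() bool {
	for z, n := range q.zoneAck {
		if n*2 > q.perZone[z] {
			return true
		}
	}
	return false
}

// gridColumn is true when every node of some zone has acked.
func (q *quorum) gridColumn() bool {
	for z, n := range q.zoneAck {
		if n == q.perZone[z] {
			return true
		}
	}
	return false
}
```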

type Read

type Read struct {
	CommandID int
	Key       Key
}

Read can be used as a special request that directly reads the value of a key without going through the replication protocol in Replica

func (Read) String

func (r Read) String() string

type ReadReply

type ReadReply struct {
	CommandID int
	Value     Value
}

ReadReply carries the cid and the value of the key read

func (ReadReply) String

func (r ReadReply) String() string

type Register

type Register struct {
	Client bool
	ID     ID
	Addr   string
}

Register message type is used to register self (node or client) with the master node

type Reply

type Reply struct {
	Command    Command
	Value      Value
	Properties map[string]string
	Timestamp  int64
	Err        error
}

Reply includes all info that might be sent back to the client for the corresponding request

func (Reply) String

func (r Reply) String() string

type Request

type Request struct {
	Command    Command
	Properties map[string]string
	Timestamp  int64
	NodeID     ID // forward by node
	// contains filtered or unexported fields
}

Request is a client request with an HTTP response channel

func (*Request) Reply

func (r *Request) Reply(reply Reply)

Reply replies to current client session

func (Request) String

func (r Request) String() string

type Socket

type Socket interface {

	// Send put message to outbound queue
	Send(to ID, m interface{})

	// MulticastZone send msg to all nodes in the same site
	MulticastZone(zone int, m interface{})

	// MulticastQuorum sends msg to random number of nodes
	MulticastQuorum(quorum int, m interface{})

	// Broadcast send to all peers
	Broadcast(m interface{})

	// Recv receives a message
	Recv() interface{}

	Close()

	// Fault injection
	Drop(id ID, t int)             // drops every message sent to ID for t seconds
	Slow(id ID, d int, t int)      // delays every message sent to ID by d ms for t seconds
	Flaky(id ID, p float64, t int) // drops each message with probability p for t seconds
	Crash(t int)                   // crashes the node for t seconds
}

Socket integrates all networking interface and fault injections

func NewSocket

func NewSocket(id ID, addrs map[ID]string) Socket

NewSocket returns a Socket interface instance given self ID, node list, transport and codec name

type Stat

type Stat struct {
	Data   []float64
	Size   int
	Mean   float64
	Min    float64
	Max    float64
	Median float64
	P95    float64
	P99    float64
	P999   float64
}

Stat stores the statistics data for benchmarking results

func Statistic

func Statistic(latency []time.Duration) Stat

Statistic function creates Stat object from raw latency data
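
A summary of this kind can be computed with one sort and a pass over the data. The sketch below fills a reduced, hypothetical stat struct. It assumes latencies are reported in milliseconds and uses the nearest-rank method for the median and percentiles; neither choice is stated in this documentation, so treat both as assumptions.

```go
package main

import (
	"sort"
	"time"
)

// stat is a reduced, hypothetical version of Stat.
type stat struct {
	Size   int
	Mean   float64
	Min    float64
	Max    float64
	Median float64
	P95    float64
	P99    float64
}

// nearestRank returns the p-th percentile (1..100) of non-empty sorted
// data using the nearest-rank method with integer arithmetic.
func nearestRank(sorted []float64, p int) float64 {
	idx := (p*len(sorted)+99)/100 - 1
	if idx < 0 {
		idx = 0
	}
	return sorted[idx]
}

// summarize computes the statistics for values given in milliseconds.
// The input slice is copied, so the caller's data is not reordered.
func summarize(ms []float64) stat {
	s := stat{Size: len(ms)}
	if len(ms) == 0 {
		return s
	}
	sorted := append([]float64(nil), ms...)
	sort.Float64s(sorted)
	sum := 0.0
	for _, v := range sorted {
		sum += v
	}
	s.Mean = sum / float64(len(sorted))
	s.Min, s.Max = sorted[0], sorted[len(sorted)-1]
	s.Median = nearestRank(sorted, 50)
	s.P95 = nearestRank(sorted, 95)
	s.P99 = nearestRank(sorted, 99)
	return s
}

// statistic converts raw durations to milliseconds and summarizes them.
func statistic(latency []time.Duration) stat {
	ms := make([]float64, len(latency))
	for i, d := range latency {
		ms[i] = float64(d) / float64(time.Millisecond)
	}
	return summarize(ms)
}
```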

func (Stat) String

func (s Stat) String() string

func (Stat) WriteFile

func (s Stat) WriteFile(path string) error

WriteFile writes stat to new file in path

type State

type State interface {
	Hash() uint64
}

type StateMachine

type StateMachine interface {
	// Execute is the state-transition function
	// returns current state value if state unchanged or previous state value
	Execute(interface{}) interface{}
}

StateMachine defines a deterministic state machine
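
To make the Execute contract concrete, here is a minimal sketch of a key-value store behind the same method shape. The command and kvStore types are hypothetical, and treating a nil Value as a read is an assumption. A read returns the current value and leaves the state unchanged, while a write stores the new value and returns the previous one.

```go
package main

// command is a hypothetical, simplified command for this sketch.
// A nil Value marks a read, which is an assumption.
type command struct {
	Key   int
	Value []byte
}

// kvStore is a deterministic state machine over an in-memory map.
type kvStore struct {
	data map[int][]byte
}

func newKVStore() *kvStore {
	return &kvStore{data: make(map[int][]byte)}
}

// Execute applies c to the store. For a read it returns the current
// value; for a write it stores the new value and returns the previous
// one. The result is always a []byte, or nil if c is not a command.
func (s *kvStore) Execute(c interface{}) interface{} {
	cmd, ok := c.(command)
	if !ok {
		return nil
	}
	prev := s.data[cmd.Key]
	if cmd.Value != nil {
		s.data[cmd.Key] = cmd.Value
	}
	return prev
}
```

Because the result is wrapped in interface{}, callers should use a type assertion such as v, _ := s.Execute(c).([]byte) rather than comparing the result with nil directly.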

type Transaction

type Transaction struct {
	Commands  []Command
	Timestamp int64
	// contains filtered or unexported fields
}

Transaction contains an arbitrary number of commands in one request. TODO: read-only or write-only transactions

func (*Transaction) Reply

func (t *Transaction) Reply(r TransactionReply)

Reply replies to current client session

func (Transaction) String

func (t Transaction) String() string

type TransactionReply

type TransactionReply struct {
	OK        bool
	Commands  []Command
	Timestamp int64
	Err       error
}

TransactionReply is the result of transaction struct

type Transport

type Transport interface {
	// Scheme returns transport scheme
	Scheme() string

	// Send sends message into t.send chan
	Send(interface{})

	// Recv waits for message from t.recv chan
	Recv() interface{}

	// Dial connects to remote server non-blocking once connected
	Dial() error

	// Listen waits for connections, non-blocking once listener starts
	Listen()

	// Close closes send channel and stops listener
	Close()
}

Transport = transport + pipe + client + server

func NewTransport

func NewTransport(addr string) Transport

NewTransport creates new transport object with url

type Value

type Value []byte

Value type of key-value database
