raft

package module
v0.0.0-...-9c33c39 Latest
Warning

This package is not in the latest version of its module.

Published: Feb 27, 2022 License: BSD-2-Clause Imports: 18 Imported by: 0

README

raft

This is an implementation of the Raft distributed consensus protocol. It's heavily influenced by benbjohnson's implementation. It focuses on providing a clean and usable API, and well-structured internals.

Usage

A node in a Raft network is represented by a Server structure. In a typical application, nodes will create a Server, and expose it to other nodes using a Peer interface.

Servers are only useful when they can communicate with other servers. This library includes an HTTP Transport (ingress) and an HTTP Peer (egress), which combine to allow communication via REST-ish endpoints. For now, it's the simplest way to embed a Raft server in your application. See this complete example.

Several other transports are coming; see TODO, below.

Adding and removing nodes

The Raft protocol has no affordance for node discovery or "join/leave" semantics. Rather, the protocol assumes an ideal network configuration that's known a priori to nodes in the network, and describes a mechanism (called joint-consensus) to safely replicate that configuration.

My implementation of joint-consensus abides by those fundamental assumptions. Nodes may be added or removed dynamically by requesting a SetConfiguration that describes a complete network topology.
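
The core rule of joint-consensus, as described in the Raft paper, is that while a configuration change is in flight, agreement requires separate majorities from both the old and the new configuration. The following is a minimal, self-contained sketch of that rule only. The names hasMajority and jointCommitted, and the use of bare uint64 node IDs, are assumptions made for illustration and are not part of this package's API.

```go
package main

import "fmt"

// hasMajority reports whether more than half of the nodes in config
// appear in acks. Node IDs are plain uint64s for illustration.
func hasMajority(config []uint64, acks map[uint64]bool) bool {
	n := 0
	for _, id := range config {
		if acks[id] {
			n++
		}
	}
	return n > len(config)/2
}

// jointCommitted reports whether an entry counts as committed while the
// cluster is in joint-consensus mode: it needs a majority of the old
// configuration AND a majority of the new one.
func jointCommitted(oldCfg, newCfg []uint64, acks map[uint64]bool) bool {
	return hasMajority(oldCfg, acks) && hasMajority(newCfg, acks)
}

func main() {
	oldCfg := []uint64{1, 2, 3}
	newCfg := []uint64{3, 4, 5}

	// Nodes 1, 2 and 3 acknowledged: a majority of old, but only 1 of 3 in new.
	acks := map[uint64]bool{1: true, 2: true, 3: true}
	fmt.Println(jointCommitted(oldCfg, newCfg, acks)) // false

	// Node 4 acknowledges as well: now 2 of 3 in new.
	acks[4] = true
	fmt.Println(jointCommitted(oldCfg, newCfg, acks)) // true
}
```

Requiring both majorities is what prevents the old and the new configuration from making independent decisions during the transition.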

TODO

  • Leader election (done)
  • Log replication (done)
  • Basic unit tests (done)
  • HTTP transport (done)
  • net/rpc transport
  • Other transports?
  • Configuration changes (joint-consensus mode) (done)
  • Log compaction
  • Robust demo application (in progress)
  • Complex unit tests (one per scenario described in the paper)

Documentation

Overview

Package raft is an implementation of the Raft distributed consensus protocol.

Constants

This section is empty.

Variables

var (
	// IDPath is where the ID handler (GET) will be installed by the
	// HTTPTransport.
	IDPath = "/raft/id"

	// AppendEntriesPath is where the AppendEntries RPC handler (POST) will be
	// installed by the HTTPTransport.
	AppendEntriesPath = "/raft/appendentries"

	// RequestVotePath is where the RequestVote RPC handler (POST) will be
	// installed by the HTTPTransport.
	RequestVotePath = "/raft/requestvote"

	// CommandPath is where the Command RPC handler (POST) will be installed by
	// the HTTPTransport.
	CommandPath = "/raft/command"

	// SetConfigurationPath is where the SetConfiguration RPC handler (POST)
	// will be installed by the HTTPTransport.
	SetConfigurationPath = "/raft/setconfiguration"
)
var (
	// MinimumElectionTimeoutMS can be set at package initialization. It may be
	// raised to achieve more reliable replication in slow networks, or lowered
	// to achieve faster replication in fast networks. Lowering is not
	// recommended.
	MinimumElectionTimeoutMS int32 = 250
)

Functions

func HTTPTransport

func HTTPTransport(mux *http.ServeMux, s *Server)

HTTPTransport creates an ingress bridge from the outside world to the passed server, by installing handlers for all the necessary RPCs to the passed mux.

Types

type ApplyFunc

type ApplyFunc func(commitIndex uint64, cmd []byte) []byte

ApplyFunc is a client-provided function that should apply a successfully replicated state transition, represented by cmd, to the local state machine, and return a response. commitIndex is the sequence number of the state transition, which is guaranteed to be gapless and monotonically increasing, but not necessarily duplicate-free. ApplyFuncs are not called concurrently, so a slow ApplyFunc delays every call that follows it. Clients should therefore ensure they return quickly, i.e. in much less than MinimumElectionTimeoutMS milliseconds.
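
Because commitIndex may repeat, an ApplyFunc that is not naturally idempotent has to guard against applying the same transition twice. The following is a minimal sketch of one way to do that, assuming a toy state machine that sums command lengths. The local ApplyFunc type mirrors the signature above so that the block compiles on its own; the counter type and its fields are hypothetical.

```go
package main

import (
	"fmt"
	"strconv"
)

// ApplyFunc mirrors the signature documented above.
type ApplyFunc func(commitIndex uint64, cmd []byte) []byte

// counter is a toy state machine: each command adds len(cmd) to total.
type counter struct {
	lastApplied uint64 // highest commitIndex applied so far
	total       int
}

// apply has the ApplyFunc signature. It applies cmd only if commitIndex
// is newer than anything seen before, so duplicates leave the state
// unchanged. It always returns the current total as the response.
func (c *counter) apply(commitIndex uint64, cmd []byte) []byte {
	if commitIndex > c.lastApplied {
		c.total += len(cmd)
		c.lastApplied = commitIndex
	}
	return []byte(strconv.Itoa(c.total))
}

func main() {
	c := &counter{}
	var f ApplyFunc = c.apply

	fmt.Printf("%s\n", f(1, []byte("ab")))  // 2
	fmt.Printf("%s\n", f(2, []byte("cde"))) // 5
	fmt.Printf("%s\n", f(2, []byte("cde"))) // 5, duplicate ignored
}
```

The sketch uses no mutex because ApplyFuncs are not called concurrently. If other goroutines in the application read the same state, they would still need their own synchronization.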

type Peer

type Peer interface {
	// contains filtered or unexported methods
}

Peer is the local representation of a remote node. It's an interface that may be backed by any concrete transport: local, HTTP, net/rpc, etc. Peers must be encoding/gob encodable.
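
To illustrate what "encoding/gob encodable" involves for a value held in an interface, here is a self-contained sketch using only the standard library. The node interface and the urlNode type are hypothetical stand-ins, not this package's Peer or its HTTP peer. The points it shows are general gob rules: the concrete type must be registered with gob.Register, its fields must be exported, and the interface value is encoded and decoded through a pointer.

```go
package main

import (
	"bytes"
	"encoding/gob"
	"fmt"
)

// node is a hypothetical stand-in for an interface like Peer.
type node interface {
	ID() uint64
}

// urlNode is a hypothetical concrete implementation. Its fields are
// exported because gob only encodes exported fields.
type urlNode struct {
	NodeID uint64
	URL    string
}

func (n urlNode) ID() uint64 { return n.NodeID }

func init() {
	// gob must know the concrete type before it can carry it inside an
	// interface value.
	gob.Register(urlNode{})
}

// roundTrip encodes an interface value with gob and decodes it again.
func roundTrip(in node) (node, error) {
	var buf bytes.Buffer
	// Pass a pointer to the interface so gob records the concrete type.
	if err := gob.NewEncoder(&buf).Encode(&in); err != nil {
		return nil, err
	}
	var out node
	if err := gob.NewDecoder(&buf).Decode(&out); err != nil {
		return nil, err
	}
	return out, nil
}

func main() {
	out, err := roundTrip(urlNode{NodeID: 2, URL: "http://10.1.1.11:8080"})
	if err != nil {
		panic(err)
	}
	fmt.Println(out.ID()) // 2
}
```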

func NewHTTPPeer

func NewHTTPPeer(url *url.URL) (Peer, error)

NewHTTPPeer constructs a new HTTP peer. Part of construction involves making an HTTP GET request against the passed URL at IDPath, to resolve the remote server's ID.

type ReceiveFunc

type ReceiveFunc func(data []byte)

type Server

type Server struct {
	// contains filtered or unexported fields
}

Server is the agent that performs all of the Raft protocol logic. In a typical application, each running process that wants to be part of the distributed state machine will contain a server component.

func NewServer

func NewServer(id uint64, store io.ReadWriter, a ApplyFunc) *Server

NewServer returns an initialized, unstarted server. The ID must be unique in the Raft network, and greater than 0. The store is used by the distributed log as a persistence layer. It's read from during creation, in case a crashed server is restarted over an already-persisted log. After that, it's written to during normal operations, when log entries are safely replicated. The ApplyFunc a will be called whenever a (user-domain) command has been safely replicated and committed to this server's log.

NewServer creates a server, but you'll need to couple it with a transport to make it usable. See the example(s) for usage scenarios.

Example (HTTP)
package main

import (
	"bytes"
	"net/http"
	"net/url"

	"github.com/peterbourgon/raft"
)

func main() {
	// A no-op ApplyFunc
	a := func(uint64, []byte) []byte { return []byte{} }

	// Helper function to parse URLs
	mustParseURL := func(rawurl string) *url.URL {
		u, err := url.Parse(rawurl)
		if err != nil {
			panic(err)
		}
		u.Path = ""
		return u
	}

	// Helper function to construct HTTP Peers
	mustNewHTTPPeer := func(u *url.URL) raft.Peer {
		p, err := raft.NewHTTPPeer(u)
		if err != nil {
			panic(err)
		}
		return p
	}

	// Construct the server
	s := raft.NewServer(1, &bytes.Buffer{}, a)

	// Expose the server using an HTTP transport
	raft.HTTPTransport(http.DefaultServeMux, s)
	go http.ListenAndServe(":8080", nil)

	// Set the initial server configuration
	s.SetConfiguration(
		mustNewHTTPPeer(mustParseURL("http://127.0.0.1:8080")), // this server
		mustNewHTTPPeer(mustParseURL("http://10.1.1.11:8080")),
		mustNewHTTPPeer(mustParseURL("http://10.1.1.12:8080")),
		mustNewHTTPPeer(mustParseURL("http://10.1.1.13:8080")),
		mustNewHTTPPeer(mustParseURL("http://10.1.1.14:8080")),
	)

	// Start the server
	s.Start()
}

func (*Server) Command

func (s *Server) Command(cmd []byte, response chan<- []byte) error

Command appends the passed command to the leader log. If error is nil, the command will eventually get replicated throughout the Raft network. When the command gets committed to the local server log, it's passed to the apply function, and the response from that function is provided on the passed response chan.

Example
package main

import (
	"bytes"
	"fmt"

	"github.com/peterbourgon/raft"
)

func main() {
	// A no-op ApplyFunc that always returns "PONG"
	ponger := func(uint64, []byte) []byte { return []byte(`PONG`) }

	// Assuming you have a server started
	s := raft.NewServer(1, &bytes.Buffer{}, ponger)

	// Issue a command into the network
	response := make(chan []byte)
	if err := s.Command([]byte(`PING`), response); err != nil {
		panic(err) // command not accepted
	}

	// After the command is replicated, we'll receive the response
	fmt.Printf("%s\n", <-response)
}

func (*Server) Receive

func (s *Server) Receive(receiver ReceiveFunc)

Receive registers receiver, a ReceiveFunc, to get received data.

func (*Server) SetConfiguration

func (s *Server) SetConfiguration(peers ...Peer) error

SetConfiguration sets the peers that this server will attempt to communicate with. The set of peers should include a peer that represents this server. SetConfiguration must be called before starting the server. Calls to SetConfiguration after the server has been started will be replicated throughout the Raft network using the joint-consensus mechanism.

TODO we need to refactor how we parse entries: a single code path from any source (snapshot, persisted log at startup, or over the network) into the log, and as part of that flow, checking if the entry is a configuration and emitting it to the configuration structure. This implies an unfortunate coupling: whatever processes log entries must have both the configuration and the log as data sinks.

func (*Server) Start

func (s *Server) Start()

Start triggers the server to begin communicating with its peers.

func (*Server) Stop

func (s *Server) Stop()

Stop terminates the server. Stopped servers should not be restarted.
