redeoraft

package module
v0.0.0-...-e9ef53c Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jan 24, 2018 License: Apache-2.0 Imports: 17 Imported by: 1

README

Redeo Raft

GoDoc Build Status Go Report Card License

Raft transport implementation for Redeo servers.

Example

func run() {
	// Init server with default config
	srv := redeo.NewServer(nil)

	// Init a new transport, this installs three new commands on your
	// server:
	// * raftappend - appends replicated log entries from leader
	// * raftvote - replies to vote requests in an leadership election
	// * raftsnapshot - installs a snapshot
	tsp := redeoraft.NewTransport(srv, "10.0.0.1:9736", &redeoraft.Config{
		Timeout: time.Minute,
	})
	defer tsp.Close()

	// Use the transport in your raft configuration
	rft, err := raft.NewRaft(raft.DefaultConfig(), &ExampleRaftService{}, raft.NewInmemStore(), raft.NewInmemStore(), raft.NewInmemSnapshotStore(), tsp)
	if err != nil {
		panic(err)
	}
	defer rft.Shutdown()
}

Dependencies

Documentation

Index

Examples

Constants

This section is empty.

Variables

This section is empty.

Functions

func AddPeer

func AddPeer(r *raft.Raft) redeo.Handler

AddPeer handler add a voting member to the cluster

func Leader

func Leader(r *raft.Raft) redeo.Handler

Leader handler retrieves the address of the cluster leader

Example

This example demonstrates the use of the leader handler

package main

import (
	"io"
	"time"

	"github.com/bsm/redeo"
	"github.com/bsm/redeoraft"
	"github.com/hashicorp/raft"
)

func main() {
	// Init server
	srv := redeo.NewServer(nil)

	// Start raft
	rft, tsp, err := startRaft(srv)
	if err != nil {
		panic(err)
	}
	defer rft.Shutdown()
	defer tsp.Close()

	// Report leader
	srv.Handle("raftleader", redeoraft.Leader(rft))

	// $ redis-cli -p 9736 raftleader
	// "10.0.0.1:9736"
}

func startRaft(srv *redeo.Server) (*raft.Raft, *redeoraft.Transport, error) {

	tsp := redeoraft.NewTransport(srv, "10.0.0.1:9736", &redeoraft.Config{
		Timeout: time.Minute,
	})

	rft, err := raft.NewRaft(raft.DefaultConfig(), &ExampleRaftService{}, raft.NewInmemStore(), raft.NewInmemStore(), raft.NewInmemSnapshotStore(), tsp)
	if err != nil {
		_ = tsp.Close()
		return nil, nil, err
	}

	return rft, tsp, nil
}

type ExampleRaftService struct{}

func (s *ExampleRaftService) Apply(_ *raft.Log) interface{} { return nil }
func (s *ExampleRaftService) Restore(_ io.ReadCloser) error { return nil }
func (s *ExampleRaftService) Snapshot() (raft.FSMSnapshot, error) {
	return nil, raft.ErrNothingNewToSnapshot
}
Output:

func Peers

func Peers(r *raft.Raft) redeo.Handler

Peers handler retrieves a list of peers

func RemovePeer

func RemovePeer(r *raft.Raft) redeo.Handler

RemovePeer removes a member from the cluster

func Sentinel

func Sentinel(name string, r *raft.Raft, b *redeo.PubSubBroker) redeo.Handler

Sentinel handler respond to a subset of SENTINEL commands and makes your server behave like an instance of a sentinel cluster.

Implemented sub-commands are:

SENTINELS - returns (abbreviated) peer attributes
MASTER - returns (abbreviated) master attributes
SLAVES - returns (abbreviated) slave attributes
GET-MASTER-ADDR-BY-NAME - returns a the master address
Example

This example demonstrates the use of sentinel commands on the server

package main

import (
	"io"
	"time"

	"github.com/bsm/redeo"
	"github.com/bsm/redeoraft"
	"github.com/hashicorp/raft"
)

func main() {
	// Init server
	srv := redeo.NewServer(nil)

	// Start raft
	rft, tsp, err := startRaft(srv)
	if err != nil {
		panic(err)
	}
	defer rft.Shutdown()
	defer tsp.Close()

	// Create a pub-sub broker and handle messages
	broker := redeo.NewPubSubBroker()
	srv.Handle("publish", broker.Publish())
	srv.Handle("subscribe", broker.Subscribe())

	// Listen to sentinel commands
	srv.Handle("sentinel", redeoraft.Sentinel("", rft, broker))

	// $ redis-cli -p 9736 sentinel get-master-addr-by-name mymaster
	// 1) 10.0.0.1
	// 2) 9736
}

func startRaft(srv *redeo.Server) (*raft.Raft, *redeoraft.Transport, error) {

	tsp := redeoraft.NewTransport(srv, "10.0.0.1:9736", &redeoraft.Config{
		Timeout: time.Minute,
	})

	rft, err := raft.NewRaft(raft.DefaultConfig(), &ExampleRaftService{}, raft.NewInmemStore(), raft.NewInmemStore(), raft.NewInmemSnapshotStore(), tsp)
	if err != nil {
		_ = tsp.Close()
		return nil, nil, err
	}

	return rft, tsp, nil
}

type ExampleRaftService struct{}

func (s *ExampleRaftService) Apply(_ *raft.Log) interface{} { return nil }
func (s *ExampleRaftService) Restore(_ io.ReadCloser) error { return nil }
func (s *ExampleRaftService) Snapshot() (raft.FSMSnapshot, error) {
	return nil, raft.ErrNothingNewToSnapshot
}
Output:

func Snapshot

func Snapshot(r *raft.Raft) redeo.Handler

Snapshot handler trigger a snapshot

func State

func State(r *raft.Raft) redeo.Handler

State handler returns the state of the current node

Example

This example demonstrates the use of the state handler

package main

import (
	"io"
	"time"

	"github.com/bsm/redeo"
	"github.com/bsm/redeoraft"
	"github.com/hashicorp/raft"
)

func main() {
	// Init server
	srv := redeo.NewServer(nil)

	// Start raft
	rft, tsp, err := startRaft(srv)
	if err != nil {
		panic(err)
	}
	defer rft.Shutdown()
	defer tsp.Close()

	// Report state
	srv.Handle("raftstate", redeoraft.State(rft))

	// $ redis-cli -p 9736 raftstate
	// "leader"
}

func startRaft(srv *redeo.Server) (*raft.Raft, *redeoraft.Transport, error) {

	tsp := redeoraft.NewTransport(srv, "10.0.0.1:9736", &redeoraft.Config{
		Timeout: time.Minute,
	})

	rft, err := raft.NewRaft(raft.DefaultConfig(), &ExampleRaftService{}, raft.NewInmemStore(), raft.NewInmemStore(), raft.NewInmemSnapshotStore(), tsp)
	if err != nil {
		_ = tsp.Close()
		return nil, nil, err
	}

	return rft, tsp, nil
}

type ExampleRaftService struct{}

func (s *ExampleRaftService) Apply(_ *raft.Log) interface{} { return nil }
func (s *ExampleRaftService) Restore(_ io.ReadCloser) error { return nil }
func (s *ExampleRaftService) Snapshot() (raft.FSMSnapshot, error) {
	return nil, raft.ErrNothingNewToSnapshot
}
Output:

func Stats

func Stats(r *raft.Raft) redeo.Handler

Stats handler retrieves the stats of the cluster

Types

type Config

type Config struct {
	// AppendEntriesCommand allows to customise the
	// command name which is used to append entries.
	// Default: raftappend
	AppendEntriesCommand string

	// RequestVoteCommand allows to customise the
	// command name which is used to request a vote.
	// Default: raftvote
	RequestVoteCommand string

	// InstallSnapshotCommand allows to customise the
	// command name which is used to install a snapshot.
	// Default: raftsnapshot
	InstallSnapshotCommand string

	// Timeout is used to apply I/O deadlines.
	// Default: 0 (= no timeout)
	Timeout time.Duration
}

Config allows to customise transports

type Transport

type Transport struct {
	// contains filtered or unexported fields
}

Transport allows redeo instances to communicate cluster messages

Example
package main

import (
	"io"
	"time"

	"github.com/bsm/redeo"
	"github.com/bsm/redeoraft"
	"github.com/hashicorp/raft"
)

func main() {
	// Init server with default config
	srv := redeo.NewServer(nil)

	// Init a new transport, this installs three new commands on your
	// server:
	// * raftappend - appends replicated log entries from leader
	// * raftvote - replies to vote requests in an leadership election
	// * raftsnapshot - installs a snapshot
	tsp := redeoraft.NewTransport(srv, "10.0.0.1:9736", &redeoraft.Config{
		Timeout: time.Minute,
	})
	defer tsp.Close()

	// Use the transport in your raft configuration
	rft, err := raft.NewRaft(raft.DefaultConfig(), &ExampleRaftService{}, raft.NewInmemStore(), raft.NewInmemStore(), raft.NewInmemSnapshotStore(), tsp)
	if err != nil {
		panic(err)
	}
	defer rft.Shutdown()
}

type ExampleRaftService struct{}

func (s *ExampleRaftService) Apply(_ *raft.Log) interface{} { return nil }
func (s *ExampleRaftService) Restore(_ io.ReadCloser) error { return nil }
func (s *ExampleRaftService) Snapshot() (raft.FSMSnapshot, error) {
	return nil, raft.ErrNothingNewToSnapshot
}
Output:

func NewTransport

func NewTransport(s *redeo.Server, advertise raft.ServerAddress, conf *Config) *Transport

NewTransport creates a new transport and installs the required handlers on the server (see Config). It also requires an address it can advertise to peers.

func (*Transport) AppendEntries

AppendEntries implements the Transport interface.

func (*Transport) AppendEntriesPipeline

func (t *Transport) AppendEntriesPipeline(_ raft.ServerID, target raft.ServerAddress) (raft.AppendPipeline, error)

AppendEntriesPipeline returns an interface that can be used to pipeline AppendEntries requests.

func (*Transport) Close

func (t *Transport) Close() error

Close closes the transport and abandons calls in progress

func (*Transport) Consumer

func (t *Transport) Consumer() <-chan raft.RPC

Consumer implements the raft.Transport interface.

func (*Transport) DecodePeer

func (t *Transport) DecodePeer(peer []byte) raft.ServerAddress

DecodePeer implements the raft.Transport interface.

func (*Transport) EncodePeer

func (t *Transport) EncodePeer(_ raft.ServerID, peer raft.ServerAddress) []byte

EncodePeer implements the raft.Transport interface.

func (*Transport) InstallSnapshot

InstallSnapshot implements the Transport interface.

func (*Transport) LocalAddr

func (t *Transport) LocalAddr() raft.ServerAddress

LocalAddr implements the raft.Transport interface.

func (*Transport) RequestVote

RequestVote implements the Transport interface.

func (*Transport) SetHeartbeatHandler

func (t *Transport) SetHeartbeatHandler(fn func(rpc raft.RPC))

SetHeartbeatHandler implements the raft.Transport interface.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL