Documentation ¶
Index ¶
- func AddPeer(r *raft.Raft) redeo.Handler
- func Leader(r *raft.Raft) redeo.Handler
- func Peers(r *raft.Raft) redeo.Handler
- func RemovePeer(r *raft.Raft) redeo.Handler
- func Sentinel(name string, r *raft.Raft, b *redeo.PubSubBroker) redeo.Handler
- func Snapshot(r *raft.Raft) redeo.Handler
- func State(r *raft.Raft) redeo.Handler
- func Stats(r *raft.Raft) redeo.Handler
- type Config
- type Transport
- func (t *Transport) AppendEntries(_ raft.ServerID, target raft.ServerAddress, req *raft.AppendEntriesRequest, ...) error
- func (t *Transport) AppendEntriesPipeline(_ raft.ServerID, target raft.ServerAddress) (raft.AppendPipeline, error)
- func (t *Transport) Close() error
- func (t *Transport) Consumer() <-chan raft.RPC
- func (t *Transport) DecodePeer(peer []byte) raft.ServerAddress
- func (t *Transport) EncodePeer(_ raft.ServerID, peer raft.ServerAddress) []byte
- func (t *Transport) InstallSnapshot(_ raft.ServerID, target raft.ServerAddress, req *raft.InstallSnapshotRequest, ...) error
- func (t *Transport) LocalAddr() raft.ServerAddress
- func (t *Transport) RequestVote(_ raft.ServerID, target raft.ServerAddress, req *raft.RequestVoteRequest, ...) error
- func (t *Transport) SetHeartbeatHandler(fn func(rpc raft.RPC))
Examples ¶
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
func Leader ¶
Leader handler retrieves the address of the cluster leader
Example ¶
This example demonstrates the use of the leader handler
package main import ( "io" "time" "github.com/bsm/redeo" "github.com/bsm/redeoraft" "github.com/hashicorp/raft" ) func main() { // Init server srv := redeo.NewServer(nil) // Start raft rft, tsp, err := startRaft(srv) if err != nil { panic(err) } defer rft.Shutdown() defer tsp.Close() // Report leader srv.Handle("raftleader", redeoraft.Leader(rft)) // $ redis-cli -p 9736 raftleader // "10.0.0.1:9736" } func startRaft(srv *redeo.Server) (*raft.Raft, *redeoraft.Transport, error) { tsp := redeoraft.NewTransport(srv, "10.0.0.1:9736", &redeoraft.Config{ Timeout: time.Minute, }) rft, err := raft.NewRaft(raft.DefaultConfig(), &ExampleRaftService{}, raft.NewInmemStore(), raft.NewInmemStore(), raft.NewInmemSnapshotStore(), tsp) if err != nil { _ = tsp.Close() return nil, nil, err } return rft, tsp, nil } type ExampleRaftService struct{} func (s *ExampleRaftService) Apply(_ *raft.Log) interface{} { return nil } func (s *ExampleRaftService) Restore(_ io.ReadCloser) error { return nil } func (s *ExampleRaftService) Snapshot() (raft.FSMSnapshot, error) { return nil, raft.ErrNothingNewToSnapshot }
Output:
func RemovePeer ¶
RemovePeer removes a member from the cluster
func Sentinel ¶
Sentinel handler responds to a subset of SENTINEL commands and makes your server behave like an instance of a sentinel cluster.
Implemented sub-commands are:
SENTINELS - returns (abbreviated) peer attributes
MASTER - returns (abbreviated) master attributes
SLAVES - returns (abbreviated) slave attributes
GET-MASTER-ADDR-BY-NAME - returns the master address
Example ¶
This example demonstrates the use of sentinel commands on the server
package main import ( "io" "time" "github.com/bsm/redeo" "github.com/bsm/redeoraft" "github.com/hashicorp/raft" ) func main() { // Init server srv := redeo.NewServer(nil) // Start raft rft, tsp, err := startRaft(srv) if err != nil { panic(err) } defer rft.Shutdown() defer tsp.Close() // Create a pub-sub broker and handle messages broker := redeo.NewPubSubBroker() srv.Handle("publish", broker.Publish()) srv.Handle("subscribe", broker.Subscribe()) // Listen to sentinel commands srv.Handle("sentinel", redeoraft.Sentinel("", rft, broker)) // $ redis-cli -p 9736 sentinel get-master-addr-by-name mymaster // 1) 10.0.0.1 // 2) 9736 } func startRaft(srv *redeo.Server) (*raft.Raft, *redeoraft.Transport, error) { tsp := redeoraft.NewTransport(srv, "10.0.0.1:9736", &redeoraft.Config{ Timeout: time.Minute, }) rft, err := raft.NewRaft(raft.DefaultConfig(), &ExampleRaftService{}, raft.NewInmemStore(), raft.NewInmemStore(), raft.NewInmemSnapshotStore(), tsp) if err != nil { _ = tsp.Close() return nil, nil, err } return rft, tsp, nil } type ExampleRaftService struct{} func (s *ExampleRaftService) Apply(_ *raft.Log) interface{} { return nil } func (s *ExampleRaftService) Restore(_ io.ReadCloser) error { return nil } func (s *ExampleRaftService) Snapshot() (raft.FSMSnapshot, error) { return nil, raft.ErrNothingNewToSnapshot }
Output:
func State ¶
State handler returns the state of the current node
Example ¶
This example demonstrates the use of the state handler
package main import ( "io" "time" "github.com/bsm/redeo" "github.com/bsm/redeoraft" "github.com/hashicorp/raft" ) func main() { // Init server srv := redeo.NewServer(nil) // Start raft rft, tsp, err := startRaft(srv) if err != nil { panic(err) } defer rft.Shutdown() defer tsp.Close() // Report state srv.Handle("raftstate", redeoraft.State(rft)) // $ redis-cli -p 9736 raftstate // "leader" } func startRaft(srv *redeo.Server) (*raft.Raft, *redeoraft.Transport, error) { tsp := redeoraft.NewTransport(srv, "10.0.0.1:9736", &redeoraft.Config{ Timeout: time.Minute, }) rft, err := raft.NewRaft(raft.DefaultConfig(), &ExampleRaftService{}, raft.NewInmemStore(), raft.NewInmemStore(), raft.NewInmemSnapshotStore(), tsp) if err != nil { _ = tsp.Close() return nil, nil, err } return rft, tsp, nil } type ExampleRaftService struct{} func (s *ExampleRaftService) Apply(_ *raft.Log) interface{} { return nil } func (s *ExampleRaftService) Restore(_ io.ReadCloser) error { return nil } func (s *ExampleRaftService) Snapshot() (raft.FSMSnapshot, error) { return nil, raft.ErrNothingNewToSnapshot }
Output:
Types ¶
type Config ¶
// Config holds the optional settings of a transport. Every field may be
// left at its zero value, in which case the documented default applies.
type Config struct {
	// AppendEntriesCommand customises the command name
	// which is used to append entries.
	// Default: raftappend
	AppendEntriesCommand string

	// RequestVoteCommand customises the command name
	// which is used to request a vote.
	// Default: raftvote
	RequestVoteCommand string

	// InstallSnapshotCommand customises the command name
	// which is used to install a snapshot.
	// Default: raftsnapshot
	InstallSnapshotCommand string

	// Timeout is used to apply I/O deadlines.
	// Default: 0 (= no timeout)
	Timeout time.Duration
}
Config allows customising transports.
type Transport ¶
// Transport allows redeo instances to communicate cluster messages.
// Its methods are documented on this page as implementing the
// raft.Transport interface; the fields are not shown here.
type Transport struct {
// contains filtered or unexported fields
}
Transport allows redeo instances to communicate cluster messages
Example ¶
package main import ( "io" "time" "github.com/bsm/redeo" "github.com/bsm/redeoraft" "github.com/hashicorp/raft" ) func main() { // Init server with default config srv := redeo.NewServer(nil) // Init a new transport, this installs three new commands on your // server: // * raftappend - appends replicated log entries from leader // * raftvote - replies to vote requests in an leadership election // * raftsnapshot - installs a snapshot tsp := redeoraft.NewTransport(srv, "10.0.0.1:9736", &redeoraft.Config{ Timeout: time.Minute, }) defer tsp.Close() // Use the transport in your raft configuration rft, err := raft.NewRaft(raft.DefaultConfig(), &ExampleRaftService{}, raft.NewInmemStore(), raft.NewInmemStore(), raft.NewInmemSnapshotStore(), tsp) if err != nil { panic(err) } defer rft.Shutdown() } type ExampleRaftService struct{} func (s *ExampleRaftService) Apply(_ *raft.Log) interface{} { return nil } func (s *ExampleRaftService) Restore(_ io.ReadCloser) error { return nil } func (s *ExampleRaftService) Snapshot() (raft.FSMSnapshot, error) { return nil, raft.ErrNothingNewToSnapshot }
Output:
func NewTransport ¶
NewTransport creates a new transport and installs the required handlers on the server (see Config). It also requires an address it can advertise to peers.
func (*Transport) AppendEntries ¶
func (t *Transport) AppendEntries(_ raft.ServerID, target raft.ServerAddress, req *raft.AppendEntriesRequest, res *raft.AppendEntriesResponse) error
AppendEntries implements the raft.Transport interface.
func (*Transport) AppendEntriesPipeline ¶
func (t *Transport) AppendEntriesPipeline(_ raft.ServerID, target raft.ServerAddress) (raft.AppendPipeline, error)
AppendEntriesPipeline returns an interface that can be used to pipeline AppendEntries requests.
func (*Transport) DecodePeer ¶
func (t *Transport) DecodePeer(peer []byte) raft.ServerAddress
DecodePeer implements the raft.Transport interface.
func (*Transport) EncodePeer ¶
EncodePeer implements the raft.Transport interface.
func (*Transport) InstallSnapshot ¶
func (t *Transport) InstallSnapshot(_ raft.ServerID, target raft.ServerAddress, req *raft.InstallSnapshotRequest, res *raft.InstallSnapshotResponse, snap io.Reader) error
InstallSnapshot implements the raft.Transport interface.
func (*Transport) LocalAddr ¶
func (t *Transport) LocalAddr() raft.ServerAddress
LocalAddr implements the raft.Transport interface.
func (*Transport) RequestVote ¶
func (t *Transport) RequestVote(_ raft.ServerID, target raft.ServerAddress, req *raft.RequestVoteRequest, res *raft.RequestVoteResponse) error
RequestVote implements the raft.Transport interface.
func (*Transport) SetHeartbeatHandler ¶
SetHeartbeatHandler implements the raft.Transport interface.