raftstore

package
v0.0.0-...-7b3ddb2 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Nov 11, 2019 License: Apache-2.0 Imports: 26 Imported by: 0

Documentation

Index

Constants

View Source
const (
	TruncateTicket = 5 * time.Minute
	TruncateCounts = 200000

	FlushTicket = 1 * time.Second
)

Variables

This section is empty.

Functions

func StartRaftServer

func StartRaftServer(nodeId entity.NodeID, ip string, resolver raft.SocketResolver) (*raft.RaftServer, error)

Types

type EventListener

type EventListener interface {
	HandleRaftReplicaEvent(event *RaftReplicaEvent)
	HandleRaftLeaderEvent(event *RaftLeaderEvent)
	HandleRaftFatalEvent(event *RaftFatalEvent)
}

EventListener is the interface for raft events; the server implements it.

type RaftApplyResponse

type RaftApplyResponse struct {
	Results []*response.DocResult
	Result  *response.DocResult
	FlushC  chan error
	Err     error
}

func (*RaftApplyResponse) SetErr

func (r *RaftApplyResponse) SetErr(err error) *RaftApplyResponse

type RaftFatalEvent

type RaftFatalEvent struct {
	PartitionId entity.PartitionID
	Cause       error
}

type RaftLeaderEvent

type RaftLeaderEvent struct {
	PartitionId entity.PartitionID
	Leader      entity.NodeID
}

type RaftReplicaEvent

type RaftReplicaEvent struct {
	PartitionId entity.PartitionID
	Delete      bool
	Replica     *entity.Replica
}

type RaftResolver

type RaftResolver struct {
	// contains filtered or unexported fields
}

RaftResolver resolves NodeIDs to net.Addr addresses.

func NewRaftResolver

func NewRaftResolver() *RaftResolver

NewRaftResolver creates a RaftResolver.

func (*RaftResolver) AddNode

func (r *RaftResolver) AddNode(id entity.NodeID, replica *entity.Replica)

func (*RaftResolver) DeleteNode

func (r *RaftResolver) DeleteNode(id entity.NodeID)

func (*RaftResolver) GetNode

func (r *RaftResolver) GetNode(id entity.NodeID) *nodeRef

func (*RaftResolver) NodeAddress

func (r *RaftResolver) NodeAddress(nodeID uint64, stype raft.SocketType) (string, error)

NodeAddress resolves a NodeID to its net.Addr address.

func (*RaftResolver) ToReplica

func (r *RaftResolver) ToReplica(id entity.NodeID) (replica *entity.Replica)

type Store

type Store struct {
	*storage.StoreBase
	RaftPath      string
	RaftServer    *raft.RaftServer
	EventListener EventListener
	Sn            int64
	LastFlushSn   int64
	Client        *client.Client
}

func CreateStore

func CreateStore(ctx context.Context, pID entity.PartitionID, nodeID entity.NodeID, space *entity.Space, raftServer *raft.RaftServer, eventListener EventListener, client *client.Client) (*Store, error)

CreateStore creates an instance of Store.

func (*Store) Apply

func (s *Store) Apply(command []byte, index uint64) (resp interface{}, err error)

Apply implements the raft interface.

func (*Store) ApplyMemberChange

func (s *Store) ApplyMemberChange(confChange *proto.ConfChange, index uint64) (interface{}, error)

ApplyMemberChange implements the raft interface.

func (*Store) ApplySnapshot

func (s *Store) ApplySnapshot(peers []proto.Peer, iter proto.SnapIterator) error

ApplySnapshot implements the raft interface.

func (*Store) ChangeMember

func (s *Store) ChangeMember(changeType proto.ConfChangeType, server *entity.Server) error

func (*Store) Close

func (s *Store) Close() error

Close closes the partition store if it is currently running.

func (*Store) DeleteByQuery

func (s *Store) DeleteByQuery(ctx context.Context, readLeader bool, query *request.SearchRequest) (delCount int, err error)

func (*Store) Destroy

func (s *Store) Destroy() (err error)

Destroy closes the partition store if it is currently running and removes all data files from the filesystem.

func (*Store) Flush

func (s *Store) Flush(ctx context.Context) error

func (*Store) GetDocument

func (s *Store) GetDocument(ctx context.Context, readLeader bool, docID string) (*response.DocResult, error)

func (*Store) GetDocuments

func (s *Store) GetDocuments(ctx context.Context, readLeader bool, docIds []string) (response.DocResults, error)

func (*Store) GetLeader

func (s *Store) GetLeader() (entity.NodeID, uint64)

func (*Store) GetPartition

func (s *Store) GetPartition() *entity.Partition

func (*Store) GetRTDocument

func (s *Store) GetRTDocument(ctx context.Context, readLeader bool, docID string) (*response.DocResult, error)

func (*Store) GetUnreachable

func (s *Store) GetUnreachable(id uint64) []uint64

func (*Store) HandleFatalEvent

func (s *Store) HandleFatalEvent(err *raft.FatalError)

HandleFatalEvent implements the raft interface.

func (*Store) HandleLeaderChange

func (s *Store) HandleLeaderChange(leader uint64)

HandleLeaderChange implements the raft interface.

func (*Store) IsLeader

func (s *Store) IsLeader() bool

func (*Store) MSearch

func (s *Store) MSearch(ctx context.Context, readLeader bool, query *request.SearchRequest) (result response.SearchResponses, err error)

func (*Store) Search

func (s *Store) Search(ctx context.Context, readLeader bool, query *request.SearchRequest) (result *response.SearchResponse, err error)

func (*Store) Snapshot

func (s *Store) Snapshot() (proto.Snapshot, error)

Snapshot implements the raft interface.

func (*Store) Start

func (s *Store) Start() (err error)

Start starts the store.

func (*Store) StreamSearch

func (s *Store) StreamSearch(ctx context.Context, readLeader bool, query *request.SearchRequest, resultChan chan *response.DocResult) error

func (*Store) UpdateSpace

func (s *Store) UpdateSpace(ctx context.Context, space *entity.Space) error

func (*Store) Write

func (s *Store) Write(ctx context.Context, request *pspb.DocCmd) (result *response.DocResult, err error)

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL