cluster

package
v0.0.0-...-46d7ffa Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jun 23, 2023 License: Apache-2.0 Imports: 24 Imported by: 0

Documentation

Index

Constants

View Source
const (
	UpsertEvent statEventEventType = iota
	DeleteEvent
	ReadEvent
)
View Source
const JOBS_KEY_NAME = "syncJobs" // tasks related to new PTs (partition tables)
View Source
const STREAMING_KEY_NAME = "streamingJobs" //sending data to other nodes

Variables

This section is empty.

Functions

func CreateAckFrame

func CreateAckFrame(nodeId string) *tcp.Frame

func GenerateNodeId

func GenerateNodeId(nodeIp string, nodePort string) string

Types

type ClusterManager

type ClusterManager struct {
	// contains filtered or unexported fields
}

func InitClusterManager

func InitClusterManager(currentNode *Node) (*ClusterManager, error)

func (*ClusterManager) BootstrapNewNode

func (cm *ClusterManager) BootstrapNewNode() error

func (*ClusterManager) CanSetHeartbitPartionTable

func (cm *ClusterManager) CanSetHeartbitPartionTable() bool

func (*ClusterManager) GetClusterStatus

func (cm *ClusterManager) GetClusterStatus() []server.Server

func (*ClusterManager) GetCurrentNode

func (cm *ClusterManager) GetCurrentNode() *Node

func (*ClusterManager) GetServers

func (cm *ClusterManager) GetServers() map[string]server.Server

func (*ClusterManager) GetSystemDB

func (cm *ClusterManager) GetSystemDB() *storage.Collection

func (*ClusterManager) JoinCluster

func (cm *ClusterManager) JoinCluster(forceRejoin bool) error

func (*ClusterManager) JoinClusterAsNewNode

func (cm *ClusterManager) JoinClusterAsNewNode() error

func (*ClusterManager) NodeReadyForWork

func (cm *ClusterManager) NodeReadyForWork()

func (*ClusterManager) SetNewClusterMaster

func (cm *ClusterManager) SetNewClusterMaster(fromNodeId string) (bool, int64)

func (*ClusterManager) Shutdown

func (cm *ClusterManager) Shutdown()

func (*ClusterManager) StartAddNewNode

func (cm *ClusterManager) StartAddNewNode(reqFromNodeId string, newNodeId string, newNodeIp string, newNodePort string, numberOfVNodes int) error

func (*ClusterManager) StartCommunications

func (cm *ClusterManager) StartCommunications() chan actions.NodeActions

func (*ClusterManager) StartDecommissionNode

func (cm *ClusterManager) StartDecommissionNode(reqFromNodeId, nodeId string) error

func (*ClusterManager) UpdateServers

func (cm *ClusterManager) UpdateServers(servers map[string]server.Server)

func (*ClusterManager) UpdateServersHeartbitStatus

func (cm *ClusterManager) UpdateServersHeartbitStatus()

func (*ClusterManager) VerifyNodeCrash

func (cm *ClusterManager) VerifyNodeCrash()

type CollectionStats

type CollectionStats struct {
	CollectionName  string
	NumberOfEntries uint64
	NumberOfUpserts uint64
	NumberOfReads   uint64
}

type DataSyncJob

type DataSyncJob struct {
	PartitionTable       partitioner.PartitionTable
	SynchTasks           []partitioner.DataSyncTask
	WaitingToStartDelete bool
	GoDelete             bool
}

type DataSyncManager

type DataSyncManager struct {
	// contains filtered or unexported fields
}

func InitDataSyncManager

func InitDataSyncManager(clusterManager *ClusterManager) (*DataSyncManager, error)

func (*DataSyncManager) AddStreamingTask

func (dsm *DataSyncManager) AddStreamingTask(taskId string, toNodeId string, startToken uint64, endToken uint64)

func (*DataSyncManager) CanSetPartitionTable

func (dsm *DataSyncManager) CanSetPartitionTable() bool

CanSetPartitionTable returns false if the node is waiting for data.

func (*DataSyncManager) ForceRetryStreamingTask

func (dsm *DataSyncManager) ForceRetryStreamingTask(taskId string)

func (*DataSyncManager) GetStatus

func (dsm *DataSyncManager) GetStatus() types.DataSyncStatusResponse

func (*DataSyncManager) InitUpdatePartitionTableProcess

func (dsm *DataSyncManager) InitUpdatePartitionTableProcess(pt *partitioner.PartitionTable) error

func (*DataSyncManager) IsWaitingForData

func (dsm *DataSyncManager) IsWaitingForData(hash uint64) (bool, string)

func (*DataSyncManager) ProcessDataDeleteTasks

func (dsm *DataSyncManager) ProcessDataDeleteTasks()

func (*DataSyncManager) ProcessNextGetDataTask

func (dsm *DataSyncManager) ProcessNextGetDataTask()

ProcessNextGetDataTask processes the first task in the queue; when that task is completed, it calls ProcessNextGetDataTask again.

func (*DataSyncManager) SaveNewDataChunk

func (dsm *DataSyncManager) SaveNewDataChunk(taskId string, collectionName string, progress uint64, data []stream.StreamEntry)

func (*DataSyncManager) StartStreaming

func (dsm *DataSyncManager) StartStreaming()

func (*DataSyncManager) TaskCompleted

func (dsm *DataSyncManager) TaskCompleted(taskId string)

TaskCompleted removes tasks and jobs from the queue.

func (*DataSyncManager) VerifyClusterSyncWihtPartionTable

func (dsm *DataSyncManager) VerifyClusterSyncWihtPartionTable()

type Event

type Event struct {
	Type       EventType
	Collection string
	Key        []byte
}

func (*Event) ToString

func (e *Event) ToString() string

type EventType

type EventType uint8
const (
	Insert EventType = iota
	Update
	Delete
)

func (EventType) String

func (s EventType) String() string

type EventsManager

type EventsManager struct {
	// contains filtered or unexported fields
}

func (*EventsManager) HandleEvent

func (em *EventsManager) HandleEvent(msg tcp.MessageFromCluster) error

type HeartbitManager

type HeartbitManager struct {
	// contains filtered or unexported fields
}

func InitHeartbitManager

func InitHeartbitManager(nodeId string, port string) *HeartbitManager

func (*HeartbitManager) GetServers

func (h *HeartbitManager) GetServers() []server.Server

func (*HeartbitManager) JoinCluster

func (h *HeartbitManager) JoinCluster()

func (*HeartbitManager) SetPartitionTableTimestamp

func (h *HeartbitManager) SetPartitionTableTimestamp(partitionTableTimestamp int64)

func (*HeartbitManager) Shutdown

func (h *HeartbitManager) Shutdown()

func (*HeartbitManager) UpdateHardwareStats

func (h *HeartbitManager) UpdateHardwareStats()

type Node

type Node struct {
	NodeIp   string
	NodePort string
	// contains filtered or unexported fields
}

func InitNode

func InitNode(conf utils.Configuration) (*Node, error)

func (*Node) Delete

func (n *Node) Delete(collectionName string, key interface{}) error

func (*Node) DeleteCollection

func (n *Node) DeleteCollection(collectionName string) error

DeleteCollection deletes the collection locally; it won't delete it from the other nodes in the cluster.

func (*Node) DeleteRaw

func (n *Node) DeleteRaw(fromNodeId string, collectionName string, key []byte) (err error)

func (*Node) Get

func (n *Node) Get(collectionName string, key interface{}, value interface{}) error

func (*Node) GetFilteredIterator

func (n *Node) GetFilteredIterator(collectionName string, from interface{}, to interface{}) (*storage.Iterator, error)

func (*Node) GetHeartbitStatus

func (n *Node) GetHeartbitStatus() []server.Server

func (*Node) GetId

func (n *Node) GetId() string

func (*Node) GetIterator

func (n *Node) GetIterator(collectionName string) (*storage.Iterator, error)

func (*Node) GetIteratorFrom

func (n *Node) GetIteratorFrom(collectionName string, from interface{}) (*storage.Iterator, error)

func (*Node) GetKeyLocationInCluster

func (n *Node) GetKeyLocationInCluster(key []byte) partitioner.HashLocation

func (*Node) GetRaw

func (n *Node) GetRaw(fromNodeId string, collectionName string, key []byte) ([]byte, error)

func (*Node) GetStats

func (n *Node) GetStats() *NodeStats

func (*Node) ManageNodesDown

func (n *Node) ManageNodesDown(nodesWithProblems []string)

func (*Node) ScanRaw

func (n *Node) ScanRaw(collectionName string, startFromKey []byte, numberOfResults int) ([]types.RWRequest, error)

func (*Node) Shutdown

func (n *Node) Shutdown()

func (*Node) Start

func (n *Node) Start(ctx context.Context, signalChan chan os.Signal, forceRejoin bool, onReadyChan chan bool) error

func (*Node) StartHeartbit

func (n *Node) StartHeartbit()

func (*Node) SubscribeTo

func (n *Node) SubscribeTo(collectionName string) chan Event

func (*Node) UpdateHeartbitPartitionTable

func (n *Node) UpdateHeartbitPartitionTable(timestamp int64)

func (*Node) Upsert

func (n *Node) Upsert(collectionName string, key interface{}, value interface{}) error

func (*Node) UpsertRaw

func (n *Node) UpsertRaw(fromNodeId string, collectionName string, key []byte, value []byte) (err error)

type NodeStats

type NodeStats struct {
	Collections        []string
	StatsPerCollection map[string]CollectionStats
	// contains filtered or unexported fields
}

func InitNodeStats

func InitNodeStats() (*NodeStats, error)

func (*NodeStats) EventsMonitor

func (ns *NodeStats) EventsMonitor()

func (*NodeStats) GetTotalNumberOfEntries

func (ns *NodeStats) GetTotalNumberOfEntries() uint64

func (*NodeStats) Log

func (ns *NodeStats) Log(e statEvent)

func (*NodeStats) Shutdown

func (ns *NodeStats) Shutdown()

type TopologyManager

type TopologyManager struct {
	// contains filtered or unexported fields
}

func InitTopologyManager

func InitTopologyManager(clusterManager *ClusterManager) *TopologyManager

func (*TopologyManager) HandlePossibleMasterCrash

func (tm *TopologyManager) HandlePossibleMasterCrash(crashedNodeId string)

func (*TopologyManager) NotifyNodeStartup

func (tm *TopologyManager) NotifyNodeStartup(nodeId string, nodeIp string, nodePort string) (string, int64, error)

func (*TopologyManager) SetNewClusterMaster

func (tm *TopologyManager) SetNewClusterMaster(fromNodeId string) (bool, int64)

func (*TopologyManager) StartAddNewNode

func (tm *TopologyManager) StartAddNewNode(reqFromNodeId string, newNodeId string, newNodeIp string, newNodePort string, numberOfVNodes int)

func (*TopologyManager) StartDecommissionNode

func (tm *TopologyManager) StartDecommissionNode(reqFromNodeId string, nodeId string) error

Directories

Path Synopsis
murmur3
Package murmur3 implements Austin Appleby's non-cryptographic MurmurHash3.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL