submarine

package
v0.0.0-...-6265879 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Sep 19, 2021 License: Apache-2.0 Imports: 16 Imported by: 0

Documentation

Overview

* Licensed to the Apache Software Foundation (ASF) under one or more * contributor license agreements. See the NOTICE file distributed with * this work for additional information regarding copyright ownership. * The ASF licenses this file to You under the Apache License, Version 2.0 * (the "License"); you may not use this file except in compliance with * the License. You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License.

* Licensed to the Apache Software Foundation (ASF) under one or more * contributor license agreements. See the NOTICE file distributed with * this work for additional information regarding copyright ownership. * The ASF licenses this file to You under the Apache License, Version 2.0 * (the "License"); you may not use this file except in compliance with * the License. You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License.

* Licensed to the Apache Software Foundation (ASF) under one or more * contributor license agreements. See the NOTICE file distributed with * this work for additional information regarding copyright ownership. * The ASF licenses this file to You under the Apache License, Version 2.0 * (the "License"); you may not use this file except in compliance with * the License. You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License.

* Licensed to the Apache Software Foundation (ASF) under one or more * contributor license agreements. See the NOTICE file distributed with * this work for additional information regarding copyright ownership. * The ASF licenses this file to You under the Apache License, Version 2.0 * (the "License"); you may not use this file except in compliance with * the License. You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License.

* Licensed to the Apache Software Foundation (ASF) under one or more * contributor license agreements. See the NOTICE file distributed with * this work for additional information regarding copyright ownership. * The ASF licenses this file to You under the Apache License, Version 2.0 * (the "License"); you may not use this file except in compliance with * the License. You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License.

* Licensed to the Apache Software Foundation (ASF) under one or more * contributor license agreements. See the NOTICE file distributed with * this work for additional information regarding copyright ownership. * The ASF licenses this file to You under the Apache License, Version 2.0 * (the "License"); you may not use this file except in compliance with * the License. You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License.

* Licensed to the Apache Software Foundation (ASF) under one or more * contributor license agreements. See the NOTICE file distributed with * this work for additional information regarding copyright ownership. * The ASF licenses this file to You under the Apache License, Version 2.0 * (the "License"); you may not use this file except in compliance with * the License. You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License.

Index

Constants

View Source
const (
	// ClusterInfosUnset status of the cluster info: no data set
	ClusterInfosUnset = "Unset"
	// ClusterInfosPartial status of the cluster info: data is not complete (some nodes didn't respond)
	ClusterInfosPartial = "Partial"
	// ClusterInfosInconsistent status of the cluster info: nodesinfos is not consistent between nodes
	ClusterInfosInconsistent = "Inconsistent"
	// ClusterInfosConsistent status of the cluster info: nodeinfos is complete and consistent between nodes
	ClusterInfosConsistent = "Consistent"
)
View Source
const (
	// DefaultSubmarinePort define the default Submarine Port
	DefaultSubmarinePort = "8080"
)
View Source
const (

	// ErrNotFound cannot find a node to connect to
	ErrNotFound = "Unable to find a node to connect"
)

Variables

View Source
var IsMasterWithNoSlot = func(n *Node) bool {
	if (n.GetRole() == v1alpha1.SubmarineClusterNodeRoleMaster) && (n.TotalSlots() == 0) {
		return true
	}
	return false
}

IsMasterWithNoSlot anonymous function for searching Master Node with no slot

View Source
var IsMasterWithSlot = func(n *Node) bool {
	if (n.GetRole() == v1alpha1.SubmarineClusterNodeRoleMaster) && (n.TotalSlots() > 0) {
		return true
	}
	return false
}

IsMasterWithSlot anonymous function for searching Master Node withslot

View Source
var IsSlave = func(n *Node) bool {
	return n.GetRole() == v1alpha1.SubmarineClusterNodeRoleSlave
}

IsSlave anonymous function for searching Slave Node

Functions

func IsPartialError

func IsPartialError(err error) bool

IsPartialError returns true if the error is due to partial data recovery

func LessByID

func LessByID(n1, n2 *Node) bool

LessByID compare 2 Nodes with there ID

Types

type Admin

type Admin struct {
	// contains filtered or unexported fields
}

Admin wraps submarine cluster admin logic

func (Admin) AttachNodeToCluster

func (a Admin) AttachNodeToCluster(addr string) error

func (Admin) AttachSlaveToMaster

func (a Admin) AttachSlaveToMaster(slave *Node, master *Node) error

func (Admin) Close

func (a Admin) Close()

func (Admin) Connections

func (a Admin) Connections() AdminConnectionsInterface

func (Admin) FlushAll

func (a Admin) FlushAll()

func (Admin) FlushAndReset

func (a Admin) FlushAndReset(addr string, mode string) error

func (Admin) ForgetNode

func (a Admin) ForgetNode(id string) error

func (Admin) ForgetNodeByAddr

func (a Admin) ForgetNodeByAddr(id string) error

func (Admin) GetClusterInfos

func (a Admin) GetClusterInfos() (*ClusterInfos, error)

func (Admin) InitSubmarineCluster

func (a Admin) InitSubmarineCluster(addr string) error

func (Admin) StartFailover

func (a Admin) StartFailover(addr string) error

type AdminConnections

type AdminConnections struct {
	// contains filtered or unexported fields
}

AdminConnections connection map for submarine cluster currently the admin connection is not threadSafe since it is only use in the Events thread.

func (*AdminConnections) Add

func (cnx *AdminConnections) Add(addr string) error

Add connect to the given address and register the client connection to the map

func (*AdminConnections) AddAll

func (cnx *AdminConnections) AddAll(addrs []string)

AddAll connect to the given list of addresses and register them in the map fail silently

func (*AdminConnections) Get

func (cnx *AdminConnections) Get(addr string) (ClientInterface, error)

Get returns a client connection for the given adress, connects if the connection is not in the map yet

func (*AdminConnections) GetAll

func (cnx *AdminConnections) GetAll() map[string]ClientInterface

GetAll returns a map of all clients per address

func (*AdminConnections) GetDifferentFrom

func (cnx *AdminConnections) GetDifferentFrom(addr string) (ClientInterface, error)

GetDifferentFrom returns random a client connection different from given address

func (*AdminConnections) GetRandom

func (cnx *AdminConnections) GetRandom() (ClientInterface, error)

GetRandom returns a client connection to a random node of the client map

func (*AdminConnections) GetSelected

func (cnx *AdminConnections) GetSelected(addrs []string) map[string]ClientInterface

GetSelected returns a map of clients based on the input addresses

func (*AdminConnections) Reconnect

func (cnx *AdminConnections) Reconnect(addr string) error

Reconnect force a reconnection on the given address is the adress is not part of the map, act like Add

func (*AdminConnections) Remove

func (cnx *AdminConnections) Remove(addr string)

Remove disconnect and remove the client connection from the map

func (*AdminConnections) ReplaceAll

func (cnx *AdminConnections) ReplaceAll(addrs []string)

ReplaceAll clear the pool and re-populate it with new connections fail silently

func (*AdminConnections) Reset

func (cnx *AdminConnections) Reset()

Reset close all connections and clear the connection map

func (*AdminConnections) Update

func (cnx *AdminConnections) Update(addr string) (ClientInterface, error)

Update returns a client connection for the given adress, connects if the connection is not in the map yet

type AdminConnectionsInterface

type AdminConnectionsInterface interface {
	// Add connect to the given address and
	// register the client connection to the pool
	Add(addr string) error
	// Remove disconnect and remove the client connection from the map
	Remove(addr string)
	// Get returns a client connection for the given address,
	// connects if the connection is not in the map yet
	Get(addr string) (ClientInterface, error)
	// GetRandom returns a client connection to a random node of the client map
	GetRandom() (ClientInterface, error)
	// GetDifferentFrom returns a random client connection different from given address
	GetDifferentFrom(addr string) (ClientInterface, error)
	// GetAll returns a map of all clients per address
	GetAll() map[string]ClientInterface
	//GetSelected returns a map of clients based on the input addresses
	GetSelected(addrs []string) map[string]ClientInterface
	// Reconnect force a reconnection on the given address
	// if the address is not part of the map, act like Add
	Reconnect(addr string) error
	// AddAll connect to the given list of addresses and
	// register them in the map
	// fail silently
	AddAll(addrs []string)
	// ReplaceAll clear the map and re-populate it with new connections
	// fail silently
	ReplaceAll(addrs []string)
	// ValidateResp check the submarine resp, eventually reconnect on connection error
	// in case of error, customize the error, log it and return it
	///ValidateResp(resp *submarine.Resp, addr, errMessage string) error
	// ValidatePipeResp wait for all answers in the pipe and validate the response
	// in case of network issue clear the pipe and return
	// in case of error return false
	// ValidatePipeResp(c ClientInterface, addr, errMessage string) bool
	// Reset close all connections and clear the connection map
	Reset()
}

AdminConnectionsInterface interface representing the map of admin connections to submarine cluster nodes

func NewAdminConnections

func NewAdminConnections(addrs []string, options *AdminOptions) AdminConnectionsInterface

NewAdminConnections returns and instance of AdminConnectionsInterface

type AdminInterface

type AdminInterface interface {
	// Connections returns the connection map of all clients
	Connections() AdminConnectionsInterface
	// Close the admin connections
	Close()
	// InitSubmarineCluster used to configure the first node of a cluster
	InitSubmarineCluster(addr string) error
	// GetClusterInfos get node infos for all nodes
	GetClusterInfos() (*ClusterInfos, error)
	// GetClusterInfosSelected return the Nodes infos for all nodes selected in the cluster
	//GetClusterInfosSelected(addrs []string) (*ClusterInfos, error)
	// AttachNodeToCluster command use to connect a Node to the cluster
	// the connection will be done on a random node part of the connection pool
	AttachNodeToCluster(addr string) error
	// AttachSlaveToMaster attach a slave to a master node
	AttachSlaveToMaster(slave *Node, master *Node) error
	// DetachSlave detach a slave to its master
	//DetachSlave(slave *Node) error
	// StartFailover execute the failover of the Submarine Master corresponding to the addr
	StartFailover(addr string) error
	// ForgetNode execute the Submarine command to force the cluster to forgot the the Node
	ForgetNode(id string) error
	// ForgetNodeByAddr execute the Submarine command to force the cluster to forgot the the Node
	ForgetNodeByAddr(id string) error
	// SetSlots exec the submarine command to set slots in a pipeline, provide
	// and empty nodeID if the set slots commands doesn't take a nodeID in parameter
	//SetSlots(addr string, action string, slots []Slot, nodeID string) error
	// AddSlots exec the submarine command to add slots in a pipeline
	//AddSlots(addr string, slots []Slot) error
	// DelSlots exec the submarine command to del slots in a pipeline
	//DelSlots(addr string, slots []Slot) error
	// GetKeysInSlot exec the submarine command to get the keys in the given slot on the node we are connected to
	//GetKeysInSlot(addr string, slot Slot, batch int, limit bool) ([]string, error)
	// CountKeysInSlot exec the submarine command to count the keys given slot on the node
	//CountKeysInSlot(addr string, slot Slot) (int64, error)
	// MigrateKeys from addr to destination node. returns number of slot migrated. If replace is true, replace key on busy error
	//MigrateKeys(addr string, dest *Node, slots []Slot, batch, timeout int, replace bool) (int, error)
	// FlushAndReset reset the cluster configuration of the node, the node is flushed in the same pipe to ensure reset works
	FlushAndReset(addr string, mode string) error
	// FlushAll flush all keys in cluster
	FlushAll()
}

AdminInterface submarine cluster admin interface

func NewAdmin

func NewAdmin(addrs []string, options *AdminOptions) AdminInterface

NewAdmin returns new AdminInterface instance at the same time it connects to all Submarine Nodes thanks to the addrs list

type AdminOptions

type AdminOptions struct {
	ConnectionTimeout  time.Duration
	ClientName         string
	RenameCommandsFile string
}

AdminOptions optional options for submarine admin

type Client

type Client struct {
	// contains filtered or unexported fields
}

Client structure representing a client connection to submarine

func (*Client) GetClusterAddress

func (c *Client) GetClusterAddress(host string) ([]string, error)

GetClusterAddress calls the given Submarine cluster server address list.

type ClientInterface

type ClientInterface interface {
	// Close closes the connection.
	Close() error

	// PipeClear clears the contents of the current pipeline queue, both commands
	// queued by PipeAppend which have yet to be sent and responses which have yet
	// to be retrieved through PipeResp. The first returned int will be the number
	// of pending commands dropped, the second will be the number of pending
	// responses dropped
	PipeClear() (int, int)

	// GetClusterAddress calls the given Submarine cluster server address list
	GetClusterAddress() ([]string, error)
}

ClientInterface submarine client interface

func NewClient

func NewClient(addr string, cnxTimeout time.Duration, commandsMapping map[string]string) (ClientInterface, error)

NewClient build a client connection and connect to a submarine address

type Cluster

type Cluster struct {
	Name           string
	Namespace      string
	Nodes          map[string]*Node
	Status         v1.ClusterStatus
	NodesPlacement v1.NodesPlacementInfo
	ActionsInfo    ClusterActionsInfo
}

Cluster represents a Submarine Cluster

func (*Cluster) GetNodeByID

func (c *Cluster) GetNodeByID(id string) (*Node, error)

GetNodeByID returns a Cluster Node by its ID if not present in the cluster return an error

type ClusterActionsInfo

type ClusterActionsInfo struct {
	NbslotsToMigrate int32
}

ClusterActionsInfo use to store information about current action on the Cluster

type ClusterInfos

type ClusterInfos struct {
	Infos  map[string]*NodeInfos
	Status string
}

ClusterInfos represents the node infos for all nodes of the cluster

func NewClusterInfos

func NewClusterInfos() *ClusterInfos

NewClusterInfos returns an instance of ClusterInfos

func (*ClusterInfos) ComputeStatus

func (c *ClusterInfos) ComputeStatus() bool

ComputeStatus check the ClusterInfos status based on the current data the status ClusterInfosPartial is set while building the clusterinfos if already set, do nothing returns true if consistent or if another error

func (*ClusterInfos) GetNodes

func (c *ClusterInfos) GetNodes() Nodes

GetNodes returns a nodeSlice view of the cluster the slice if formed from how each node see itself you should check the Status before doing it, to wait for a consistent view

type ClusterInfosError

type ClusterInfosError struct {
	// contains filtered or unexported fields
}

ClusterInfosError error type for submarine cluster infos access

func NewClusterInfosError

func NewClusterInfosError() ClusterInfosError

NewClusterInfosError returns an instance of cluster infos error

func (ClusterInfosError) Error

func (e ClusterInfosError) Error() string

Error error string

func (ClusterInfosError) Partial

func (e ClusterInfosError) Partial() bool

Partial true if the some nodes of the cluster didn't answer

type Error

type Error string

Error used to represent an error

func (Error) Error

func (e Error) Error() string

type FindNodeFunc

type FindNodeFunc func(node *Node) bool

FindNodeFunc function for finding a Node it is use as input for GetNodeByFunc and GetNodesByFunc

type Node

type Node struct {
	ID             string
	IP             string
	Port           string
	Role           string
	LinkState      string
	MasterReferent string
	FailStatus     []string
	PingSent       int64
	PongRecv       int64
	ConfigEpoch    int64
	///Slots           []Slot
	///MigratingSlots  map[Slot]string
	///ImportingSlots  map[Slot]string
	ServerStartTime time.Time

	Pod *kapiv1.Pod
}

Node Represent a Submarine Node

func NewDefaultNode

func NewDefaultNode() *Node

NewDefaultNode builds and returns new defaultNode instance

func (*Node) GetRole

func (n *Node) GetRole() v1alpha1.SubmarineClusterNodeRole

GetRole return the Submarine Cluster Node GetRole

func (*Node) IPPort

func (n *Node) IPPort() string

IPPort returns join Ip Port string

func (*Node) TotalSlots

func (n *Node) TotalSlots() int

TotalSlots return the total number of slot

type NodeInfos

type NodeInfos struct {
	Node    *Node
	Friends Nodes
}

NodeInfos representation of a node info, i.e. data returned by the CLUSTER NODE submarine command Node is the information of the targetted node Friends are the view of the other nodes from the targetted node

func DecodeNodeInfos

func DecodeNodeInfos(input *string, addr string) *NodeInfos

DecodeNodeInfos decode from the cmd output the Submarine nodes info. Second argument is the node on which we are connected to request info

func NewNodeInfos

func NewNodeInfos() *NodeInfos

NewNodeInfos returns an instance of NodeInfo

type Nodes

type Nodes []*Node

Nodes represent a Node slice

func (Nodes) FilterByFunc

func (n Nodes) FilterByFunc(fn func(*Node) bool) Nodes

FilterByFunc remove a node from a slice by node ID and returns the slice. If not found, fail silently. Value must be unique

func (Nodes) GetNodesByFunc

func (n Nodes) GetNodesByFunc(f FindNodeFunc) (Nodes, error)

GetNodesByFunc returns first node found by the FindNodeFunc

func (Nodes) SortByFunc

func (n Nodes) SortByFunc(less func(*Node, *Node) bool) Nodes

SortByFunc returns a new ordered NodeSlice, determined by a func defining ‘less’.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL