Documentation ¶
Overview ¶
* Licensed to the Apache Software Foundation (ASF) under one or more * contributor license agreements. See the NOTICE file distributed with * this work for additional information regarding copyright ownership. * The ASF licenses this file to You under the Apache License, Version 2.0 * (the "License"); you may not use this file except in compliance with * the License. You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License.
* Licensed to the Apache Software Foundation (ASF) under one or more * contributor license agreements. See the NOTICE file distributed with * this work for additional information regarding copyright ownership. * The ASF licenses this file to You under the Apache License, Version 2.0 * (the "License"); you may not use this file except in compliance with * the License. You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License.
* Licensed to the Apache Software Foundation (ASF) under one or more * contributor license agreements. See the NOTICE file distributed with * this work for additional information regarding copyright ownership. * The ASF licenses this file to You under the Apache License, Version 2.0 * (the "License"); you may not use this file except in compliance with * the License. You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License.
* Licensed to the Apache Software Foundation (ASF) under one or more * contributor license agreements. See the NOTICE file distributed with * this work for additional information regarding copyright ownership. * The ASF licenses this file to You under the Apache License, Version 2.0 * (the "License"); you may not use this file except in compliance with * the License. You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License.
* Licensed to the Apache Software Foundation (ASF) under one or more * contributor license agreements. See the NOTICE file distributed with * this work for additional information regarding copyright ownership. * The ASF licenses this file to You under the Apache License, Version 2.0 * (the "License"); you may not use this file except in compliance with * the License. You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License.
* Licensed to the Apache Software Foundation (ASF) under one or more * contributor license agreements. See the NOTICE file distributed with * this work for additional information regarding copyright ownership. * The ASF licenses this file to You under the Apache License, Version 2.0 * (the "License"); you may not use this file except in compliance with * the License. You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License.
* Licensed to the Apache Software Foundation (ASF) under one or more * contributor license agreements. See the NOTICE file distributed with * this work for additional information regarding copyright ownership. * The ASF licenses this file to You under the Apache License, Version 2.0 * (the "License"); you may not use this file except in compliance with * the License. You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License.
Index ¶
- Constants
- Variables
- func IsPartialError(err error) bool
- func LessByID(n1, n2 *Node) bool
- type Admin
- func (a Admin) AttachNodeToCluster(addr string) error
- func (a Admin) AttachSlaveToMaster(slave *Node, master *Node) error
- func (a Admin) Close()
- func (a Admin) Connections() AdminConnectionsInterface
- func (a Admin) FlushAll()
- func (a Admin) FlushAndReset(addr string, mode string) error
- func (a Admin) ForgetNode(id string) error
- func (a Admin) ForgetNodeByAddr(id string) error
- func (a Admin) GetClusterInfos() (*ClusterInfos, error)
- func (a Admin) InitSubmarineCluster(addr string) error
- func (a Admin) StartFailover(addr string) error
- type AdminConnections
- func (cnx *AdminConnections) Add(addr string) error
- func (cnx *AdminConnections) AddAll(addrs []string)
- func (cnx *AdminConnections) Get(addr string) (ClientInterface, error)
- func (cnx *AdminConnections) GetAll() map[string]ClientInterface
- func (cnx *AdminConnections) GetDifferentFrom(addr string) (ClientInterface, error)
- func (cnx *AdminConnections) GetRandom() (ClientInterface, error)
- func (cnx *AdminConnections) GetSelected(addrs []string) map[string]ClientInterface
- func (cnx *AdminConnections) Reconnect(addr string) error
- func (cnx *AdminConnections) Remove(addr string)
- func (cnx *AdminConnections) ReplaceAll(addrs []string)
- func (cnx *AdminConnections) Reset()
- func (cnx *AdminConnections) Update(addr string) (ClientInterface, error)
- type AdminConnectionsInterface
- type AdminInterface
- type AdminOptions
- type Client
- type ClientInterface
- type Cluster
- type ClusterActionsInfo
- type ClusterInfos
- type ClusterInfosError
- type Error
- type FindNodeFunc
- type Node
- type NodeInfos
- type Nodes
Constants ¶
const ( // ClusterInfosUnset status of the cluster info: no data set ClusterInfosUnset = "Unset" // ClusterInfosPartial status of the cluster info: data is not complete (some nodes didn't respond) ClusterInfosPartial = "Partial" // ClusterInfosInconsistent status of the cluster info: nodesinfos is not consistent between nodes ClusterInfosInconsistent = "Inconsistent" // ClusterInfosConsistent status of the cluster info: nodeinfos is complete and consistent between nodes ClusterInfosConsistent = "Consistent" )
const (
// DefaultSubmarinePort define the default Submarine Port
DefaultSubmarinePort = "8080"
)
const (
// ErrNotFound cannot find a node to connect to
ErrNotFound = "Unable to find a node to connect"
)
Variables ¶
var IsMasterWithNoSlot = func(n *Node) bool { if (n.GetRole() == v1alpha1.SubmarineClusterNodeRoleMaster) && (n.TotalSlots() == 0) { return true } return false }
IsMasterWithNoSlot anonymous function for searching Master Node with no slot
var IsMasterWithSlot = func(n *Node) bool { if (n.GetRole() == v1alpha1.SubmarineClusterNodeRoleMaster) && (n.TotalSlots() > 0) { return true } return false }
IsMasterWithSlot anonymous function for searching Master Node withslot
var IsSlave = func(n *Node) bool { return n.GetRole() == v1alpha1.SubmarineClusterNodeRoleSlave }
IsSlave anonymous function for searching Slave Node
Functions ¶
func IsPartialError ¶
IsPartialError returns true if the error is due to partial data recovery
Types ¶
type Admin ¶
type Admin struct {
// contains filtered or unexported fields
}
Admin wraps submarine cluster admin logic
func (Admin) AttachNodeToCluster ¶
func (Admin) AttachSlaveToMaster ¶
func (Admin) Connections ¶
func (a Admin) Connections() AdminConnectionsInterface
func (Admin) ForgetNode ¶
func (Admin) ForgetNodeByAddr ¶
func (Admin) GetClusterInfos ¶
func (a Admin) GetClusterInfos() (*ClusterInfos, error)
func (Admin) InitSubmarineCluster ¶
func (Admin) StartFailover ¶
type AdminConnections ¶
type AdminConnections struct {
// contains filtered or unexported fields
}
AdminConnections connection map for submarine cluster currently the admin connection is not threadSafe since it is only use in the Events thread.
func (*AdminConnections) Add ¶
func (cnx *AdminConnections) Add(addr string) error
Add connect to the given address and register the client connection to the map
func (*AdminConnections) AddAll ¶
func (cnx *AdminConnections) AddAll(addrs []string)
AddAll connect to the given list of addresses and register them in the map fail silently
func (*AdminConnections) Get ¶
func (cnx *AdminConnections) Get(addr string) (ClientInterface, error)
Get returns a client connection for the given adress, connects if the connection is not in the map yet
func (*AdminConnections) GetAll ¶
func (cnx *AdminConnections) GetAll() map[string]ClientInterface
GetAll returns a map of all clients per address
func (*AdminConnections) GetDifferentFrom ¶
func (cnx *AdminConnections) GetDifferentFrom(addr string) (ClientInterface, error)
GetDifferentFrom returns random a client connection different from given address
func (*AdminConnections) GetRandom ¶
func (cnx *AdminConnections) GetRandom() (ClientInterface, error)
GetRandom returns a client connection to a random node of the client map
func (*AdminConnections) GetSelected ¶
func (cnx *AdminConnections) GetSelected(addrs []string) map[string]ClientInterface
GetSelected returns a map of clients based on the input addresses
func (*AdminConnections) Reconnect ¶
func (cnx *AdminConnections) Reconnect(addr string) error
Reconnect force a reconnection on the given address is the adress is not part of the map, act like Add
func (*AdminConnections) Remove ¶
func (cnx *AdminConnections) Remove(addr string)
Remove disconnect and remove the client connection from the map
func (*AdminConnections) ReplaceAll ¶
func (cnx *AdminConnections) ReplaceAll(addrs []string)
ReplaceAll clear the pool and re-populate it with new connections fail silently
func (*AdminConnections) Reset ¶
func (cnx *AdminConnections) Reset()
Reset close all connections and clear the connection map
func (*AdminConnections) Update ¶
func (cnx *AdminConnections) Update(addr string) (ClientInterface, error)
Update returns a client connection for the given adress, connects if the connection is not in the map yet
type AdminConnectionsInterface ¶
type AdminConnectionsInterface interface { // Add connect to the given address and // register the client connection to the pool Add(addr string) error // Remove disconnect and remove the client connection from the map Remove(addr string) // Get returns a client connection for the given address, // connects if the connection is not in the map yet Get(addr string) (ClientInterface, error) // GetRandom returns a client connection to a random node of the client map GetRandom() (ClientInterface, error) // GetDifferentFrom returns a random client connection different from given address GetDifferentFrom(addr string) (ClientInterface, error) // GetAll returns a map of all clients per address GetAll() map[string]ClientInterface //GetSelected returns a map of clients based on the input addresses GetSelected(addrs []string) map[string]ClientInterface // Reconnect force a reconnection on the given address // if the address is not part of the map, act like Add Reconnect(addr string) error // AddAll connect to the given list of addresses and // register them in the map // fail silently AddAll(addrs []string) // ReplaceAll clear the map and re-populate it with new connections // fail silently ReplaceAll(addrs []string) // ValidateResp check the submarine resp, eventually reconnect on connection error // in case of error, customize the error, log it and return it ///ValidateResp(resp *submarine.Resp, addr, errMessage string) error // ValidatePipeResp wait for all answers in the pipe and validate the response // in case of network issue clear the pipe and return // in case of error return false // ValidatePipeResp(c ClientInterface, addr, errMessage string) bool // Reset close all connections and clear the connection map Reset() }
AdminConnectionsInterface interface representing the map of admin connections to submarine cluster nodes
func NewAdminConnections ¶
func NewAdminConnections(addrs []string, options *AdminOptions) AdminConnectionsInterface
NewAdminConnections returns and instance of AdminConnectionsInterface
type AdminInterface ¶
type AdminInterface interface { // Connections returns the connection map of all clients Connections() AdminConnectionsInterface // Close the admin connections Close() // InitSubmarineCluster used to configure the first node of a cluster InitSubmarineCluster(addr string) error // GetClusterInfos get node infos for all nodes GetClusterInfos() (*ClusterInfos, error) // GetClusterInfosSelected return the Nodes infos for all nodes selected in the cluster //GetClusterInfosSelected(addrs []string) (*ClusterInfos, error) // AttachNodeToCluster command use to connect a Node to the cluster // the connection will be done on a random node part of the connection pool AttachNodeToCluster(addr string) error // AttachSlaveToMaster attach a slave to a master node AttachSlaveToMaster(slave *Node, master *Node) error // DetachSlave detach a slave to its master //DetachSlave(slave *Node) error // StartFailover execute the failover of the Submarine Master corresponding to the addr StartFailover(addr string) error // ForgetNode execute the Submarine command to force the cluster to forgot the the Node ForgetNode(id string) error // ForgetNodeByAddr execute the Submarine command to force the cluster to forgot the the Node ForgetNodeByAddr(id string) error // SetSlots exec the submarine command to set slots in a pipeline, provide // and empty nodeID if the set slots commands doesn't take a nodeID in parameter //SetSlots(addr string, action string, slots []Slot, nodeID string) error // AddSlots exec the submarine command to add slots in a pipeline //AddSlots(addr string, slots []Slot) error // DelSlots exec the submarine command to del slots in a pipeline //DelSlots(addr string, slots []Slot) error // GetKeysInSlot exec the submarine command to get the keys in the given slot on the node we are connected to //GetKeysInSlot(addr string, slot Slot, batch int, limit bool) ([]string, error) // CountKeysInSlot exec the submarine command to count the keys given slot on the node //CountKeysInSlot(addr string, slot Slot) (int64, error) // MigrateKeys from addr to destination node. returns number of slot migrated. If replace is true, replace key on busy error //MigrateKeys(addr string, dest *Node, slots []Slot, batch, timeout int, replace bool) (int, error) // FlushAndReset reset the cluster configuration of the node, the node is flushed in the same pipe to ensure reset works FlushAndReset(addr string, mode string) error // FlushAll flush all keys in cluster FlushAll() }
AdminInterface submarine cluster admin interface
func NewAdmin ¶
func NewAdmin(addrs []string, options *AdminOptions) AdminInterface
NewAdmin returns new AdminInterface instance at the same time it connects to all Submarine Nodes thanks to the addrs list
type AdminOptions ¶
type AdminOptions struct { ConnectionTimeout time.Duration ClientName string RenameCommandsFile string }
AdminOptions optional options for submarine admin
type Client ¶
type Client struct {
// contains filtered or unexported fields
}
Client structure representing a client connection to submarine
type ClientInterface ¶
type ClientInterface interface { // Close closes the connection. Close() error // PipeClear clears the contents of the current pipeline queue, both commands // queued by PipeAppend which have yet to be sent and responses which have yet // to be retrieved through PipeResp. The first returned int will be the number // of pending commands dropped, the second will be the number of pending // responses dropped PipeClear() (int, int) // GetClusterAddress calls the given Submarine cluster server address list GetClusterAddress() ([]string, error) }
ClientInterface submarine client interface
type Cluster ¶
type Cluster struct { Name string Namespace string Nodes map[string]*Node Status v1.ClusterStatus NodesPlacement v1.NodesPlacementInfo ActionsInfo ClusterActionsInfo }
Cluster represents a Submarine Cluster
type ClusterActionsInfo ¶
type ClusterActionsInfo struct {
NbslotsToMigrate int32
}
ClusterActionsInfo use to store information about current action on the Cluster
type ClusterInfos ¶
ClusterInfos represents the node infos for all nodes of the cluster
func NewClusterInfos ¶
func NewClusterInfos() *ClusterInfos
NewClusterInfos returns an instance of ClusterInfos
func (*ClusterInfos) ComputeStatus ¶
func (c *ClusterInfos) ComputeStatus() bool
ComputeStatus check the ClusterInfos status based on the current data the status ClusterInfosPartial is set while building the clusterinfos if already set, do nothing returns true if consistent or if another error
func (*ClusterInfos) GetNodes ¶
func (c *ClusterInfos) GetNodes() Nodes
GetNodes returns a nodeSlice view of the cluster the slice if formed from how each node see itself you should check the Status before doing it, to wait for a consistent view
type ClusterInfosError ¶
type ClusterInfosError struct {
// contains filtered or unexported fields
}
ClusterInfosError error type for submarine cluster infos access
func NewClusterInfosError ¶
func NewClusterInfosError() ClusterInfosError
NewClusterInfosError returns an instance of cluster infos error
func (ClusterInfosError) Partial ¶
func (e ClusterInfosError) Partial() bool
Partial true if the some nodes of the cluster didn't answer
type FindNodeFunc ¶
FindNodeFunc function for finding a Node it is use as input for GetNodeByFunc and GetNodesByFunc
type Node ¶
type Node struct { ID string IP string Port string Role string LinkState string MasterReferent string FailStatus []string PingSent int64 PongRecv int64 ConfigEpoch int64 ///Slots []Slot ///MigratingSlots map[Slot]string ///ImportingSlots map[Slot]string ServerStartTime time.Time Pod *kapiv1.Pod }
Node Represent a Submarine Node
func NewDefaultNode ¶
func NewDefaultNode() *Node
NewDefaultNode builds and returns new defaultNode instance
func (*Node) GetRole ¶
func (n *Node) GetRole() v1alpha1.SubmarineClusterNodeRole
GetRole return the Submarine Cluster Node GetRole
func (*Node) TotalSlots ¶
TotalSlots return the total number of slot
type NodeInfos ¶
NodeInfos representation of a node info, i.e. data returned by the CLUSTER NODE submarine command Node is the information of the targetted node Friends are the view of the other nodes from the targetted node
func DecodeNodeInfos ¶
DecodeNodeInfos decode from the cmd output the Submarine nodes info. Second argument is the node on which we are connected to request info
type Nodes ¶
type Nodes []*Node
Nodes represent a Node slice
func (Nodes) FilterByFunc ¶
FilterByFunc remove a node from a slice by node ID and returns the slice. If not found, fail silently. Value must be unique
func (Nodes) GetNodesByFunc ¶
func (n Nodes) GetNodesByFunc(f FindNodeFunc) (Nodes, error)
GetNodesByFunc returns first node found by the FindNodeFunc