coordinator

package module
v0.0.0-...-9f87cd9 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jan 2, 2020 License: Apache-2.0 Imports: 9 Imported by: 0

README

基于 zk 组件实现的去中心化的任务调度工具

目的

服务宕机最所难免,无状态服务,比如web 服务,rpc 服务等,可以通过dubbo,服务代理的方式解决服务不可用的问题。 但是有状态的服务,如何能解决单点问题,是此工具解决的问题。

设计思路:

依赖 zookeeper,实现服务之间的调度和通信。 选择一个为主节点,并负责做服务异常监听。当主节点宕机,其他节点选择一个为主节点。

Zookeeper 目录结构
  • ACK
    • node1
    • node2
    • ...
  • BROADCAST
  • LOCK_COORDINATOR
  • REGISTER_CENTER
    • node1 【临时节点】
    • node2 【临时节点】
    • ...
  • LOCK_PATH_INIT

服务启动

  • 创建服务所需 Zookeeper 节点
  • 启动协程 监听服务广播节点数据的变更
  • 启动协程 获取协调者的锁
  • 添加临时节点到注册中心

服务关闭

  • 关闭协程
  • 关闭相应 zk 连接

消息处理

  • 节点增加
  • 节点删除
  • ReBalance
    • 获取当前可使用的计算节点
    • 计算Rebalance 后的 Sharding 与 Rebalance 之前的数据是否一致。若一致则无需ReBalance。
    • 数据不一致,则广播暂停所有正在运行的消息。
    • 确认消息已经广播完成,广播新 Sharding 数据

设计思考

  • sharding 版本。在sharding 中加入版本,防止修改了shard后,执行节点仍然使用从zk中获取的分片执行。
  • ReBalance 需要先暂停后开启。防止任务接收消息不一致,导致任务被重复执行。
  • 在 ack 节点上,增加了广播的 version,防止有些节点访问慢,导致获取到的 response 不一致。

Documentation

Index

Constants

View Source
const ZKPathACK = "/ACK"
View Source
const ZkPathBc = "/BROADCAST"
View Source
const ZkPathCoordinateLock = "/COORDINATOR"
View Source
const ZkPathRc = "/REGISTER_CENTER"
View Source
const ZkPathVersion = "/PATH_INIT"

Variables

Functions

This section is empty.

Types

type Event

type Event struct {
	NewSharding Sharding
	Resp        chan None
	Status      JobStatus
}

type JobStatus

type JobStatus int8
const (
	RUNNING JobStatus = iota
	STOP
)

type Node

type Node struct {
	Id             string        // 标记一个节点
	ZkPath         ZkPath        // 项目zookeeper 路径
	Sharding       Sharding      // 定义的Sharding, Job 也由此传入
	WaitAckTimeout time.Duration // 等待ack 超时时间, 默认5s
	// contains filtered or unexported fields
}

Using Node, just create a Node struct, and call start method.

func (*Node) Close

func (node *Node) Close()

服务关闭

func (*Node) Listener

func (node *Node) Listener() chan Event

func (*Node) Start

func (node *Node) Start(zkServers []string, timeout time.Duration) (err error)

start method will create a coordinator, listen coordinator broadcast. strategy must be implement by user.

type None

type None struct{}

type Sharding

type Sharding interface {
	Encode() []byte               // 编码和解码,保存于zk broadcast中
	Decode([]byte) Sharding       // 需要函数返回 (用于前后diff 版本)
	Equal(sharding Sharding) bool // diff sharding 是否相同, 如果相同则不再reBalance
	Version() string              // 版本, 若更新了shard内容,则需要更新版本
	ReBalance([]string) Sharding  // 重新分配任务
}

Sharding need convert to []byte, save at zookeeper response node.

type SimpleSharding

type SimpleSharding struct {
	Data map[string][]int
	Ver  string
}

一个简单的Sharding的实现,包含了版本信息和Sharding 的数据信息

func (*SimpleSharding) Decode

func (sharding *SimpleSharding) Decode(data []byte) Sharding

func (*SimpleSharding) Encode

func (sharding *SimpleSharding) Encode() []byte

func (*SimpleSharding) Equal

func (sharding *SimpleSharding) Equal(sharding2 Sharding) bool

func (*SimpleSharding) ReBalance

func (sharding *SimpleSharding) ReBalance(currentLiveNodes []string) Sharding

func (*SimpleSharding) Version

func (sharding *SimpleSharding) Version() string

type ZkPath

type ZkPath string

Directories

Path Synopsis

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL