gzkwrapper

package
v0.0.0-...-385f941 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jun 22, 2018 License: Apache-2.0 Imports: 10 Imported by: 31

Documentation

Index

Constants

View Source
const RetryInterval time.Duration = time.Second * 5

Variables

View Source
var (
	ErrKeyInvalid      = errors.New("key invalid.")
	ErrArgsInvalid     = errors.New("args invalid.")
	ErrNodeIsNull      = errors.New("node is nil.")
	ErrNodeConnInvalid = errors.New("node conn invalid.")
)

Functions

func ReleaseWatchObject

func ReleaseWatchObject(wo *WatchObject)

func CreateWatchObject(path string, conn *zk.Conn, callback WatchHandlerFunc) *WatchObject {

	if conn == nil {
		return nil
	}

	watchobject := &WatchObject{
		Path: path,
		exit: make(chan bool),
	}

	go func(wo *WatchObject, c *zk.Conn, fn WatchHandlerFunc) {
		listen := true
	NEW_WATCH:
		for listen {
			ret, _, ev, err := c.ExistsW(wo.Path)
			if err != nil {
				if callback != nil {
					callback(wo.Path, nil, err)
				}
				time.Sleep(RetryInterval)
				goto NEW_WATCH
			}

			select {
			case <-ev:
				{
					if ret {
						data, _, err := c.Get(wo.Path)
						if err != nil {
							if callback != nil {
								callback(wo.Path, nil, err)
							}
							time.Sleep(RetryInterval)
							goto NEW_WATCH
						}
						if callback != nil {
							callback(wo.Path, data, nil)
						}
					} else {
						if callback != nil {
							callback(wo.Path, nil, errors.New("watch exists not found."))
						}
						time.Sleep(RetryInterval)
						goto NEW_WATCH
					}
				}
			case <-wo.exit:
				{
					listen = false
				}
			}
		}
	}(watchobject, conn, callback)
	return watchobject
}

Types

type BaseNode

type BaseNode struct {
	NodeType `json:"type"` //节点类型
	HostName string        `json:"hostname"` //主机名称
}

type INodeNotifyHandler

type INodeNotifyHandler interface {
	OnZkWrapperPulseHandlerFunc(key string, nodedata *NodeData, err error)
	OnZkWrapperNodeHandlerFunc(nodestore *NodeStore)
}

type Node

type Node struct {
	Hosts []string
	Conn  *zk.Conn
	// contains filtered or unexported fields
}

func NewNode

func NewNode(hosts string) *Node

func (*Node) Children

func (n *Node) Children(path string) ([]string, error)

func (*Node) Close

func (n *Node) Close()

func (*Node) Create

func (n *Node) Create(path string, buffer []byte) error

func (*Node) Exists

func (n *Node) Exists(path string) (bool, error)

func (*Node) Get

func (n *Node) Get(path string) ([]byte, error)

func (*Node) Open

func (n *Node) Open() error

func (*Node) Remove

func (n *Node) Remove(path string) error

func (*Node) Server

func (n *Node) Server() string

func (*Node) Set

func (n *Node) Set(path string, buffer []byte) error

func (*Node) State

func (n *Node) State() string

func (*Node) WatchClose

func (n *Node) WatchClose(path string)

func (*Node) WatchOpen

func (n *Node) WatchOpen(path string, callback WatchHandlerFunc) error

type NodeData

type NodeData struct {
	BaseNode
	DataCenter string `json:"datacenter"` //数据中心名称(一般为调度服务器位置)
	Location   string `json:"location"`   //节点分区位置
	OS         string `json:"os"`         //节点系统
	Platform   string `json:"platform"`   //节点平台
	IpAddr     string `json:"ipaddr"`     //网络地址
	APIAddr    string `json:"apiaddr"`    //节点API
	ProcessId  int    `json:"pid"`        //节点进程号
	Singin     bool   `json:"singin"`     //签到状态
	Timestamp  int64  `json:"timestamp"`  //心跳时间戳
	Alivestamp int64  `json:"alivestamp"` //存活时间戳,若发生改变则代表节点已重启
	Attach     []byte `json:"attach"`     //附加数据
}

func NewNodeData

func NewNodeData(nodetype NodeType, hostname string, datacenter string, location string,
	os string, platform string, ipaddr string, apiaddr string, processid int) *NodeData

type NodeHandlerFunc

type NodeHandlerFunc func(nodestore *NodeStore)

func (NodeHandlerFunc) OnZkWrapperNodeHandlerFunc

func (fn NodeHandlerFunc) OnZkWrapperNodeHandlerFunc(nodestore *NodeStore)

type NodeMapper

type NodeMapper struct {
	sync.RWMutex
	// contains filtered or unexported fields
}

func NewNodeMapper

func NewNodeMapper() *NodeMapper

func (*NodeMapper) Append

func (mapper *NodeMapper) Append(key string, value *NodeData) int

func (*NodeMapper) Clear

func (mapper *NodeMapper) Clear()

func (*NodeMapper) Contains

func (mapper *NodeMapper) Contains(key string) bool

func (*NodeMapper) Copy

func (mapper *NodeMapper) Copy(m map[string]*NodeData)

func (*NodeMapper) Count

func (mapper *NodeMapper) Count() int

func (*NodeMapper) Get

func (mapper *NodeMapper) Get(key string) *NodeData

func (*NodeMapper) GetKeys

func (mapper *NodeMapper) GetKeys() []string

func (*NodeMapper) GetNodes

func (mapper *NodeMapper) GetNodes(location string, ipaddr string, hostname string) NodesPair

func (*NodeMapper) Remove

func (mapper *NodeMapper) Remove(key string) int

func (*NodeMapper) Set

func (mapper *NodeMapper) Set(key string, value *NodeData) int

type NodeStore

type NodeStore struct {
	New      NodesPair
	Dead     NodesPair
	Recovery NodesPair
}

func NewNodeStore

func NewNodeStore() *NodeStore

func (*NodeStore) DeadTotalSize

func (nodestore *NodeStore) DeadTotalSize() int

func (*NodeStore) NewTotalSize

func (nodestore *NodeStore) NewTotalSize() int

func (*NodeStore) RecoveryTotalSize

func (nodestore *NodeStore) RecoveryTotalSize() int

func (*NodeStore) TotalSize

func (nodestore *NodeStore) TotalSize() int

type NodeType

type NodeType int
const (
	NODE_SERVER NodeType = iota + 1 //服务节点
	NODE_WORKER                     //工作节点
)

func (NodeType) String

func (t NodeType) String() string

type NodesPair

type NodesPair map[string]*NodeData

type PulseHandlerFunc

type PulseHandlerFunc func(key string, nodedata *NodeData, err error)

func (PulseHandlerFunc) OnZkWrapperPulseHandlerFunc

func (fn PulseHandlerFunc) OnZkWrapperPulseHandlerFunc(key string, nodedata *NodeData, err error)

type Server

type Server struct {
	Key        string
	Root       string
	Pulse      time.Duration
	TimeoutSec float64
	Node       *Node
	Data       *NodeData
	Cache      *NodeMapper

	Blacklist *SuspicionMapper
	Handler   INodeNotifyHandler
	// contains filtered or unexported fields
}

func NewServer

func NewServer(key string, args *ServerArgs, handler INodeNotifyHandler) (*Server, error)

func (*Server) Children

func (s *Server) Children(path string) ([]string, error)

func (*Server) Clear

func (s *Server) Clear()

func (*Server) Close

func (s *Server) Close() error

func (*Server) Create

func (s *Server) Create(path string, buffer []byte) error

func (*Server) Exists

func (s *Server) Exists(path string) (bool, error)

func (*Server) Get

func (s *Server) Get(path string) ([]byte, error)

func (*Server) GetLocation

func (s *Server) GetLocation() string

func (*Server) GetNodes

func (s *Server) GetNodes(location string, ipaddr string, hostname string) NodesPair

func (*Server) GetOS

func (s *Server) GetOS() string

func (*Server) GetPlatform

func (s *Server) GetPlatform() string

func (*Server) Open

func (s *Server) Open() error

func (*Server) RefreshCache

func (s *Server) RefreshCache() error

func (*Server) Remove

func (s *Server) Remove(path string) error

func (*Server) Server

func (s *Server) Server() string

func (*Server) Set

func (s *Server) Set(path string, buffer []byte) error

func (*Server) SetPulse

func (s *Server) SetPulse(value string) error

func (*Server) State

func (s *Server) State() string

func (*Server) WatchClose

func (w *Server) WatchClose(path string)

func (*Server) WatchOpen

func (w *Server) WatchOpen(path string, callback WatchHandlerFunc) error

type ServerArgs

type ServerArgs struct {
	Hosts      string
	Root       string
	Device     string
	DataCenter string
	Location   string
	OS         string
	Platform   string
	APIAddr    string
	Pulse      string
	Timeout    string
	Threshold  int
}

type SuspicionMapper

type SuspicionMapper struct {
	// contains filtered or unexported fields
}

func NewSuspicionMapper

func NewSuspicionMapper() *SuspicionMapper

func (*SuspicionMapper) Add

func (mapper *SuspicionMapper) Add(key string) int

func (*SuspicionMapper) Clear

func (mapper *SuspicionMapper) Clear()

func (*SuspicionMapper) Del

func (mapper *SuspicionMapper) Del(key string) int

func (*SuspicionMapper) Get

func (mapper *SuspicionMapper) Get(key string) int64

type WatchHandlerFunc

type WatchHandlerFunc func(path string, data []byte, err error)

type WatchObject

type WatchObject struct {
	Path string
	// contains filtered or unexported fields
}

func CreateWatchObject

func CreateWatchObject(path string, conn *zk.Conn, callback WatchHandlerFunc) *WatchObject

type Worker

type Worker struct {
	Key     string
	Root    string
	Path    string
	Pulse   time.Duration
	Node    *Node
	Data    *NodeData
	Handler INodeNotifyHandler
	// contains filtered or unexported fields
}

func NewWorker

func NewWorker(key string, args *WorkerArgs, handler INodeNotifyHandler) (*Worker, error)

func (*Worker) Children

func (w *Worker) Children(path string) ([]string, error)

func (*Worker) Close

func (w *Worker) Close() error

func (*Worker) Create

func (w *Worker) Create(path string, buffer []byte) error

func (*Worker) Exists

func (w *Worker) Exists(path string) (bool, error)

func (*Worker) Get

func (w *Worker) Get(path string) ([]byte, error)

func (*Worker) GetLocation

func (w *Worker) GetLocation() string

func (*Worker) GetOS

func (w *Worker) GetOS() string

func (*Worker) GetPlatform

func (w *Worker) GetPlatform() string

func (*Worker) Open

func (w *Worker) Open() error

func (*Worker) Remove

func (w *Worker) Remove(path string) error

func (*Worker) Server

func (w *Worker) Server() string

func (*Worker) Set

func (w *Worker) Set(path string, buffer []byte) error

func (*Worker) SetAttach

func (w *Worker) SetAttach(attach []byte)

func (*Worker) SetPulse

func (w *Worker) SetPulse(value string) error

func (*Worker) Signin

func (w *Worker) Signin(attach []byte) error

func (*Worker) Signout

func (w *Worker) Signout() error

func (*Worker) State

func (w *Worker) State() string

func (*Worker) WatchClose

func (w *Worker) WatchClose(path string)

func (*Worker) WatchOpen

func (w *Worker) WatchOpen(path string, callback WatchHandlerFunc) error

type WorkerArgs

type WorkerArgs struct {
	Hosts      string
	Root       string
	Device     string
	DataCenter string
	Location   string
	OS         string
	Platform   string
	APIAddr    string
	Pulse      string
	Threshold  int
}

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL