zk

package
v0.0.0-...-077b43f Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Sep 27, 2017 License: Apache-2.0 Imports: 29 Imported by: 38

Documentation

Overview

Package zk is a helper library that manages Kafka cluster metadata and consumer metadata.

Index

Constants

View Source
const (
	KatewayIdsRoot = "/_kateway/ids"

	KatewayMysqlPath = "/_kateway/mysql"

	PubsubJobConfig      = "/_kateway/orchestrator/jobconfig"
	PubsubJobQueues      = "/_kateway/orchestrator/jobs"
	PubsubActors         = "/_kateway/orchestrator/actors/ids"
	PubsubJobQueueOwners = "/_kateway/orchestrator/actors/job_owners"
	PubsubWebhooks       = "/_kateway/orchestrator/webhooks"
	PubsubWebhooksOff    = "/_kateway/orchestrator/webhooks_off"
	PubsubWebhookOwners  = "/_kateway/orchestrator/actors/webhook_owners"

	KguardLeaderPath = "_kguard/leader"

	ConsumersPath           = "/consumers"
	BrokerIdsPath           = "/brokers/ids"
	BrokerTopicsPath        = "/brokers/topics"
	ControllerPath          = "/controller"
	ControllerEpochPath     = "/controller_epoch"
	BrokerSequenceIdPath    = "/brokers/seqid"
	EntityConfigChangesPath = "/config/changes"
	TopicConfigPath         = "/config/topics"
	EntityConfigPath        = "/config"
	DeleteTopicsPath        = "/admin/delete_topics"

	RedisMonPath     = "/redis"
	RedisClusterRoot = "/rediscluster"

	DbusRoot = "/dbus"
)

Variables

View Source
var (
	ErrDupConnect      = errors.New("connect while being connected")
	ErrClaimedByOthers = errors.New("claimed by others")
	ErrNotClaimed      = errors.New("release non-claimed")
)
View Source
var PanicHandler func(interface{})

Functions

func ClusterPath

func ClusterPath(cluster string) string

func DbusCheckpointRoot

func DbusCheckpointRoot(cluster string) string

func DbusClusterRoot

func DbusClusterRoot(cluster string) string

func DbusConfig

func DbusConfig(cluster string) string

func DbusConfigDir

func DbusConfigDir(cluster string) string

func DefaultZkSessionTimeout

func DefaultZkSessionTimeout() time.Duration

func TimestampToTime

func TimestampToTime(ts string) time.Time

Types

type ActorList

type ActorList []string

func (ActorList) Len

func (this ActorList) Len() int

func (ActorList) Less

func (this ActorList) Less(i, j int) bool

func (ActorList) Swap

func (this ActorList) Swap(i, j int)

type BrokerInfo

type BrokerInfo struct {
	Id   int    `json:"id"`
	Host string `json:"host"`
	Port int    `json:"port"`
}

func (*BrokerInfo) Addr

func (this *BrokerInfo) Addr() string

func (*BrokerInfo) NamedAddr

func (this *BrokerInfo) NamedAddr() string

type BrokerZnode

type BrokerZnode struct {
	Id        string   `json:-`
	JmxPort   int      `json:"jmx_port"`
	Timestamp string   `json:"timestamp"`
	Endpoints []string `json:"endpoints"`
	Host      string   `json:"host"`
	Port      int      `json:"port"`
	Version   int      `json:"version"`
}

func (*BrokerZnode) Addr

func (b *BrokerZnode) Addr() string

func (*BrokerZnode) NamedAddr

func (this *BrokerZnode) NamedAddr() (string, bool)

func (BrokerZnode) NamedString

func (b BrokerZnode) NamedString() string

func (BrokerZnode) String

func (b BrokerZnode) String() string

func (*BrokerZnode) Uptime

func (b *BrokerZnode) Uptime() time.Time

type Config

type Config struct {
	Name           string
	ZkAddrs        string
	SessionTimeout time.Duration
	PanicOnError   bool
}

func DefaultConfig

func DefaultConfig(name, addrs string) *Config

func (*Config) ZkServers

func (this *Config) ZkServers() []string

type ConsumerMeta

type ConsumerMeta struct {
	Group          string
	Online         bool
	Topic          string
	PartitionId    string
	Mtime          ZkTimestamp
	ConsumerOffset int64
	OldestOffset   int64
	ProducerOffset int64 // newest offset
	Lag            int64
	ConsumerZnode  *ConsumerZnode
}

type ConsumerZnode

type ConsumerZnode struct {
	Id           string         `json:-`
	Version      int            `json:"version"`
	Subscription map[string]int `json:"subscription"` // topic:count
	Pattern      string         `json:"pattern"`
	Timestamp    interface{}    `json:"timestamp"`
}

func (*ConsumerZnode) ClientRealIP

func (c *ConsumerZnode) ClientRealIP() (ip string)

func (*ConsumerZnode) Host

func (c *ConsumerZnode) Host() string

func (*ConsumerZnode) String

func (c *ConsumerZnode) String() string

func (*ConsumerZnode) Topics

func (c *ConsumerZnode) Topics() []string

func (*ConsumerZnode) Uptime

func (c *ConsumerZnode) Uptime() time.Time

type ControllerMeta

type ControllerMeta struct {
	Broker *BrokerZnode
	Mtime  ZkTimestamp
	Epoch  string
}

func (*ControllerMeta) String

func (c *ControllerMeta) String() string

type EsCluster

type EsCluster struct {
	Name string
	// contains filtered or unexported fields
}

func (*EsCluster) AddNode

func (ec *EsCluster) AddNode(hostPort string) error

func (*EsCluster) FirstBootstrapNode

func (ec *EsCluster) FirstBootstrapNode() string

func (*EsCluster) Nodes

func (ec *EsCluster) Nodes() []string

type KatewayMeta

type KatewayMeta struct {
	Id        string `json:"id"`
	Zone      string `json:"zone"`
	Ver       string `json:"ver"`
	Build     string `json:"build"`
	BuiltAt   string `json:"builtat"`
	Arch      string `json:"arch"`
	Host      string `json:"host"`
	Ip        string `json:"ip"`
	Cpu       string `json:"cpu"`
	PubAddr   string `json:"pub"`
	SPubAddr  string `json:"spub"`
	SubAddr   string `json:"sub"`
	SSubAddr  string `json:"ssub"`
	ManAddr   string `json:"man"`
	SManAddr  string `json:"sman"`
	DebugAddr string `json:"debug"`

	Ctime time.Time `json:"-"`
}

type KguardMeta

type KguardMeta struct {
	Host       string
	Candidates int
	Ctime      time.Time
}

type Orchestrator

type Orchestrator struct {
	*ZkZone
}

func (*Orchestrator) ActorRegistered

func (this *Orchestrator) ActorRegistered(id string) (bool, error)

func (*Orchestrator) ClaimResource

func (this *Orchestrator) ClaimResource(actorId, root, resource string) (err error)

func (*Orchestrator) JobQueueCluster

func (this *Orchestrator) JobQueueCluster(topic string) (string, error)

func (*Orchestrator) RegisterActor

func (this *Orchestrator) RegisterActor(id string, val []byte) error

func (*Orchestrator) ReleaseResource

func (this *Orchestrator) ReleaseResource(actorId, root, resource string) error

func (*Orchestrator) ResignActor

func (this *Orchestrator) ResignActor(id string) error

func (*Orchestrator) WatchActors

func (this *Orchestrator) WatchActors() (ActorList, <-chan zk.Event, error)

func (*Orchestrator) WatchResources

func (this *Orchestrator) WatchResources(path string) (ResourceList, <-chan zk.Event, error)

func (*Orchestrator) WebhookInfo

func (this *Orchestrator) WebhookInfo(topic string) (*WebhookMeta, error)

type PartitionOffset

type PartitionOffset struct {
	Cluster             string
	Topic               string
	Partition           int32
	Offset              int64
	Timestamp           int64
	Group               string
	TopicPartitionCount int
}

type RedisCluster

type RedisCluster struct {
	Name       string          `json:"name"`
	Desciption string          `json:"desc"`
	Members    []RedisInstance `json:"members"`
}

type RedisInstance

type RedisInstance struct {
	Host string `json:"host"`
	Port int    `json:"port"`
}

func (RedisInstance) String

func (ri RedisInstance) String() string

type ResourceList

type ResourceList []string

func (ResourceList) Len

func (this ResourceList) Len() int

func (ResourceList) Less

func (this ResourceList) Less(i, j int) bool

func (ResourceList) Swap

func (this ResourceList) Swap(i, j int)

type TopicConfig

type TopicConfig struct {
	Config TopicConfigInfo `json:"config"`
}

type TopicConfigInfo

type TopicConfigInfo struct {
	RetentionMs string `json:"retention.ms"`
}

func (TopicConfigInfo) RetentionSeconds

func (tci TopicConfigInfo) RetentionSeconds() time.Duration

type TopicConfigMeta

type TopicConfigMeta struct {
	Config string
	Ctime  time.Time
	Mtime  time.Time
}

type TopicZnode

type TopicZnode struct {
	Name       string           `json:-`
	Version    int              `json:"version"`
	Partitions map[string][]int `json:"partitions"` // {partitionId: replicas}
}

type WebhookMeta

type WebhookMeta struct {
	Cluster   string   `json:"cluster"`
	Endpoints []string `json:"endpoints"`
}

func (*WebhookMeta) Bytes

func (this *WebhookMeta) Bytes() []byte

func (*WebhookMeta) From

func (this *WebhookMeta) From(b []byte) error

type ZkCluster

type ZkCluster struct {
	Nickname  string       `json:"nickname"`
	Roster    []BrokerInfo `json:"roster"` // manually registered brokers
	Replicas  int          `json:"replicas"`
	Priority  int          `json:"priority"`
	Public    bool         `json:"public"`
	Retention int          `json:"retention"` // in hours
	// contains filtered or unexported fields
}

ZkCluster is a kafka cluster that has a chroot path in Zookeeper.

func (*ZkCluster) AddTopic

func (this *ZkCluster) AddTopic(topic string, ts *sla.TopicSla) (output []string, err error)

func (*ZkCluster) AlterTopic

func (this *ZkCluster) AlterTopic(topic string, ts *sla.TopicSla) (output []string, err error)

func (*ZkCluster) Broker

func (this *ZkCluster) Broker(id int) (b *BrokerZnode)

func (*ZkCluster) BrokerList

func (this *ZkCluster) BrokerList() []string

func (*ZkCluster) Brokers

func (this *ZkCluster) Brokers() map[string]*BrokerZnode

Returns online {brokerId: broker}.

func (*ZkCluster) Chroot

func (this *ZkCluster) Chroot() string

func (*ZkCluster) ClusterInfoPath

func (this *ZkCluster) ClusterInfoPath() string

func (*ZkCluster) ConfiggedTopics

func (this *ZkCluster) ConfiggedTopics() map[string]TopicConfigMeta

ConfiggedTopics returns the topics under zk:/config/topics that have non-default configuration, together with their configs.

func (*ZkCluster) ConsumerGroupOffsetPath

func (this *ZkCluster) ConsumerGroupOffsetPath(group string) string

func (*ZkCluster) ConsumerGroupRoot

func (this *ZkCluster) ConsumerGroupRoot(group string) string

func (*ZkCluster) ConsumerGroups

func (this *ZkCluster) ConsumerGroups() map[string]map[string]*ConsumerZnode

Returns {groupName: {consumerId: consumer}}

func (*ZkCluster) ConsumerGroupsOfTopic

func (this *ZkCluster) ConsumerGroupsOfTopic(topic string) (map[string][]ConsumerMeta, error)

Returns {consumerGroup: []ConsumerMeta}.

func (*ZkCluster) ConsumerOffsetsOfGroup

func (this *ZkCluster) ConsumerOffsetsOfGroup(group string) map[string]map[string]int64

Returns {topic: {partitionId: offset}}

func (*ZkCluster) ConsumersByGroup

func (this *ZkCluster) ConsumersByGroup(groupPattern string) map[string][]ConsumerMeta

Returns {consumerGroup: []ConsumerMeta}.

func (*ZkCluster) DeleteTopic

func (this *ZkCluster) DeleteTopic(topic string) (output []string, err error)

func (*ZkCluster) GetTopicConfigPath

func (this *ZkCluster) GetTopicConfigPath(topic string) string

func (*ZkCluster) Isr

func (this *ZkCluster) Isr(topic string, partitionId int32) ([]int, time.Time, time.Time)

func (*ZkCluster) ListChildren

func (this *ZkCluster) ListChildren(recursive bool) ([]string, error)

func (*ZkCluster) Name

func (this *ZkCluster) Name() string

func (*ZkCluster) NamedBrokerList

func (this *ZkCluster) NamedBrokerList() []string

func (*ZkCluster) NamedZkConnectAddr

func (this *ZkCluster) NamedZkConnectAddr() string

func (*ZkCluster) OnlyNamedBrokerList

func (this *ZkCluster) OnlyNamedBrokerList() []string

OnlyNamedBrokerList returns only the brokers that have internal reverse DNS records.

func (*ZkCluster) OwnersOfGroupByTopic

func (this *ZkCluster) OwnersOfGroupByTopic(group, topic string) map[string]string

Returns {partitionId: consumerId}. The consumerId is the one found at /consumers/$group/ids/$consumerId.

func (*ZkCluster) Partitions

func (this *ZkCluster) Partitions(topic string) []int32

func (*ZkCluster) RegisterBroker

func (this *ZkCluster) RegisterBroker(id int, host string, port int) error

func (*ZkCluster) RegisteredInfo

func (this *ZkCluster) RegisteredInfo() ZkCluster

Get registered cluster info from zk.

func (*ZkCluster) ResetConsumerGroupOffset

func (this *ZkCluster) ResetConsumerGroupOffset(topic, group, partition string, offset int64) error

func (*ZkCluster) SetNickname

func (this *ZkCluster) SetNickname(name string)

func (*ZkCluster) SetPriority

func (this *ZkCluster) SetPriority(priority int)

func (*ZkCluster) SetPublic

func (this *ZkCluster) SetPublic(public bool)

func (*ZkCluster) SetReplicas

func (this *ZkCluster) SetReplicas(replicas int)

func (*ZkCluster) SetRetention

func (this *ZkCluster) SetRetention(retention int)

func (*ZkCluster) SimpleConsumeKafkaTopic

func (this *ZkCluster) SimpleConsumeKafkaTopic(topic string, msgChan chan<- *sarama.ConsumerMessage) error

func (*ZkCluster) TailMessage

func (this *ZkCluster) TailMessage(topic string, partitionID int32, lastN int) ([][]byte, error)

func (*ZkCluster) TopicConfigInfo

func (this *ZkCluster) TopicConfigInfo(topic string) (tci TopicConfigInfo, err error)

func (*ZkCluster) TopicConfigRoot

func (this *ZkCluster) TopicConfigRoot() string

func (*ZkCluster) Topics

func (this *ZkCluster) Topics() ([]string, error)

func (*ZkCluster) TopicsCtime

func (this *ZkCluster) TopicsCtime() map[string]time.Time

func (*ZkCluster) TotalConsumerOffsets

func (this *ZkCluster) TotalConsumerOffsets(topicPattern string) (total int64)

func (*ZkCluster) UnregisterBroker

func (this *ZkCluster) UnregisterBroker(id int) error

func (*ZkCluster) WatchTopics

func (this *ZkCluster) WatchTopics() ([]string, <-chan zk.Event, error)

func (*ZkCluster) ZkConnectAddr

func (this *ZkCluster) ZkConnectAddr() string

ZkConnectAddr corresponds to the zookeeper.connect= setting in Kafka's server.properties.

func (*ZkCluster) ZkZone

func (this *ZkCluster) ZkZone() *ZkZone

func (*ZkCluster) ZombieConsumerGroups

func (this *ZkCluster) ZombieConsumerGroups(autofix bool) (groups []string)

type ZkStat

type ZkStat struct {
	Version     string
	Latency     string
	Connections string
	Outstanding string
	Mode        string // S=standalone, L=leader, F=follower
	Znodes      string
	Received    string
	Sent        string
}

func ParseStatResult

func ParseStatResult(s string) (stat ZkStat)

ParseStatResult parses `zk stat` output into a ZkStat struct.

type ZkTimestamp

type ZkTimestamp int64

func (ZkTimestamp) Time

func (this ZkTimestamp) Time() time.Time

type ZkZone

type ZkZone struct {
	// contains filtered or unexported fields
}

ZkZone represents a single Zookeeper ensemble where many Kafka clusters can reside, each of which has a different chroot path.

func NewZkZone

func NewZkZone(config *Config) *ZkZone

NewZkZone creates a new ZkZone instance. All ephemeral nodes and watchers are automatically maintained, even after the zk connection is lost and re-established.

func (*ZkZone) AddRedis

func (this *ZkZone) AddRedis(host string, port int)

func (*ZkZone) AddRedisCluster

func (this *ZkZone) AddRedisCluster(cluster string, instances []RedisInstance)

func (*ZkZone) AllRedis

func (this *ZkZone) AllRedis() []string

func (*ZkZone) AllRedisClusters

func (this *ZkZone) AllRedisClusters() []RedisCluster

func (*ZkZone) CallSOS

func (this *ZkZone) CallSOS(caller string, msg string)

CallSOS will send SOS message to the zone wide kguard leader.

func (*ZkZone) ChildrenWithData

func (this *ZkZone) ChildrenWithData(path string) map[string]zkData

Returns {childName: zkData}.

func (*ZkZone) Close

func (this *ZkZone) Close()

func (*ZkZone) ClusterPath

func (this *ZkZone) ClusterPath(name string) string

ClusterPath returns the zk chroot path of a cluster.

func (*ZkZone) Clusters

func (this *ZkZone) Clusters() map[string]string

Returns {clusterName: clusterZkPath}.

func (*ZkZone) Conn

func (this *ZkZone) Conn() *zk.Conn

func (*ZkZone) Connect

func (this *ZkZone) Connect() (err error)

func (*ZkZone) CreateDbusCluster

func (this *ZkZone) CreateDbusCluster(name string) error

func (*ZkZone) CreateEphemeralZnode

func (this *ZkZone) CreateEphemeralZnode(path string, data []byte) error

func (*ZkZone) CreateEsCluster

func (this *ZkZone) CreateEsCluster(name string) error

func (*ZkZone) CreateJobQueue

func (this *ZkZone) CreateJobQueue(topic, cluster string) error

func (*ZkZone) CreateOrUpdateWebhook

func (this *ZkZone) CreateOrUpdateWebhook(topic string, hook WebhookMeta) error

func (*ZkZone) CreatePermenantZnode

func (this *ZkZone) CreatePermenantZnode(path string, data []byte) error

func (*ZkZone) DefaultDbusCluster

func (this *ZkZone) DefaultDbusCluster() (cluster string)

func (*ZkZone) DelRedis

func (this *ZkZone) DelRedis(host string, port int)

func (*ZkZone) DelRedisCluster

func (this *ZkZone) DelRedisCluster(cluster string)

func (*ZkZone) DeleteRecursive

func (this *ZkZone) DeleteRecursive(node string) (err error)

func (*ZkZone) DiscoverClusters

func (this *ZkZone) DiscoverClusters(rootPath string) ([]string, error)

DiscoverClusters finds all possible Kafka clusters.

func (*ZkZone) EnsurePathExists

func (this *ZkZone) EnsurePathExists(path string) error

func (*ZkZone) Errors

func (this *ZkZone) Errors() []error

func (*ZkZone) FlushKatewayMetrics

func (this *ZkZone) FlushKatewayMetrics(katewayId string, key string, data []byte) error

func (*ZkZone) ForSortedBrokers

func (this *ZkZone) ForSortedBrokers(fn func(cluster string, brokers map[string]*BrokerZnode))

func (*ZkZone) ForSortedClusters

func (this *ZkZone) ForSortedClusters(fn func(zkcluster *ZkCluster))

func (*ZkZone) ForSortedControllers

func (this *ZkZone) ForSortedControllers(fn func(cluster string, controller *ControllerMeta))

func (*ZkZone) ForSortedDbusClusters

func (this *ZkZone) ForSortedDbusClusters(fn func(name string, data []byte))

func (*ZkZone) ForSortedEsClusters

func (this *ZkZone) ForSortedEsClusters(fn func(*EsCluster))

func (*ZkZone) HostBelongs

func (this *ZkZone) HostBelongs(hostIp string) (liveClusters, registeredClusters []string)

func (*ZkZone) KatewayInfoById

func (this *ZkZone) KatewayInfoById(id string) *KatewayMeta

func (*ZkZone) KatewayInfos

func (this *ZkZone) KatewayInfos() ([]*KatewayMeta, error)

KatewayInfos returns the metadata of the online kateway instances, sorted by id.

func (*ZkZone) KatewayJobClusterConfig

func (this *ZkZone) KatewayJobClusterConfig() (data []byte, err error)

func (*ZkZone) KatewayMysqlDsn

func (this *ZkZone) KatewayMysqlDsn() (string, error)

func (*ZkZone) KguardInfos

func (this *ZkZone) KguardInfos() ([]*KguardMeta, error)

func (*ZkZone) LoadKatewayMetrics

func (this *ZkZone) LoadKatewayMetrics(katewayId string, key string) ([]byte, error)

func (*ZkZone) Name

func (this *ZkZone) Name() string

Name of the zone.

func (*ZkZone) NewCluster

func (this *ZkZone) NewCluster(cluster string) *ZkCluster

func (*ZkZone) NewEsCluster

func (this *ZkZone) NewEsCluster(name string) *EsCluster

func (*ZkZone) NewOrchestrator

func (this *ZkZone) NewOrchestrator() *Orchestrator

func (*ZkZone) NewclusterWithPath

func (this *ZkZone) NewclusterWithPath(cluster, path string) *ZkCluster

func (*ZkZone) Ping

func (this *ZkZone) Ping() error

func (*ZkZone) PublicClusters

func (this *ZkZone) PublicClusters() []*ZkCluster

func (*ZkZone) RegisterCluster

func (this *ZkZone) RegisterCluster(name, path string) error

func (*ZkZone) ResetErrors

func (this *ZkZone) ResetErrors()

func (*ZkZone) RunZkFourLetterCommand

func (this *ZkZone) RunZkFourLetterCommand(cmd string) map[string]string

Returns {zkHost: outputLines}

func (*ZkZone) SessionEvents

func (this *ZkZone) SessionEvents() (<-chan zk.Event, bool)

SessionEvents returns zk connection events.

func (*ZkZone) SessionTimeout

func (this *ZkZone) SessionTimeout() time.Duration

func (*ZkZone) ZkAddrList

func (this *ZkZone) ZkAddrList() []string

func (*ZkZone) ZkAddrs

func (this *ZkZone) ZkAddrs() string

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL