queue

package
v0.0.0-...-2487acb Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Oct 10, 2017 License: Apache-2.0 Imports: 21 Imported by: 2

Documentation

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

This section is empty.

Types

type AccumulationInfo

type AccumulationInfo struct {
	Group    string `json:"group,omitempty"`
	Queue    string `json:"queue,omitempty"`
	Total    int64  `json:"total,omitempty"`
	Consumed int64  `json:"consumed,omitempty"`
}

type GroupConfig

type GroupConfig struct {
	Group string   `json:"group,omitempty"`
	Queue string   `json:"queue,omitempty"`
	Write bool     `json:"write"`
	Read  bool     `json:"read"`
	Url   string   `json:"url"`
	Ips   []string `json:"ips"`
}

func (*GroupConfig) Load

func (c *GroupConfig) Load(data []byte) error

func (*GroupConfig) String

func (c *GroupConfig) String() string

type GroupInfo

type GroupInfo struct {
	Group  string         `json:"group"`
	Queues []*GroupConfig `json:"queues,omitempty"`
}

func (*GroupInfo) String

func (i *GroupInfo) String() string

type Metadata

type Metadata struct {
	// contains filtered or unexported fields
}

func NewMetadata

func NewMetadata(config *config.Config, sconfig *sarama.Config) (*Metadata, error)

return a new metadata instance

func (*Metadata) Accumulation

func (m *Metadata) Accumulation(queue, group string) (int64, int64, error)

func (*Metadata) AddGroup

func (m *Metadata) AddGroup(group string, queue string,
	write bool, read bool, url string, ips []string) error

add a group to given queue

func (*Metadata) AddQueue

func (m *Metadata) AddQueue(queue string, idcs []string) error

Add a queue by name. if want use multi idc, pass idc names in `idcs`

func (*Metadata) Close

func (m *Metadata) Close()

close and stop metadata

func (*Metadata) DelQueue

func (m *Metadata) DelQueue(queue string) error

Delete a queue by name

func (*Metadata) DeleteGroup

func (m *Metadata) DeleteGroup(group string, queue string) error

delete given group

func (*Metadata) ExistGroup

func (m *Metadata) ExistGroup(queue, group string) bool

Test a group exist

func (*Metadata) ExistQueue

func (m *Metadata) ExistQueue(queue string) bool

Test a queue exist

func (*Metadata) GetBrokerAddrsByIdc

func (m *Metadata) GetBrokerAddrsByIdc(idcs ...string) map[string][]string

func (*Metadata) GetGroupConfig

func (m *Metadata) GetGroupConfig(group string, queue string) (*GroupConfig, error)

func (*Metadata) GetGroupMap

func (m *Metadata) GetGroupMap() map[string][]string

Get queue names of per group

func (*Metadata) GetProxyConfigByID

func (m *Metadata) GetProxyConfigByID(id int) (string, error)

Get a proxy's config

func (*Metadata) GetQueueConfig

func (m *Metadata) GetQueueConfig(queue string) *QueueConfig

没有深拷贝,目前貌似不需要

func (*Metadata) GetQueueInfo

func (m *Metadata) GetQueueInfo(queues ...string) ([]*QueueInfo, error)

TODO 回头修改HTTP API时同时修改返回的数据结构,能够最大化简化逻辑

func (*Metadata) GetQueueMap

func (m *Metadata) GetQueueMap() map[string][]string

Get group names of per queue

func (*Metadata) GetQueues

func (m *Metadata) GetQueues() (queues []string)

Get all queues' name

func (*Metadata) LoadMetrics

func (m *Metadata) LoadMetrics() ([]byte, error)

func (*Metadata) LocalManager

func (m *Metadata) LocalManager() *kafka.Manager

return local IDC kafka manager

func (*Metadata) Proxys

func (m *Metadata) Proxys() (map[string]string, error)

return proxy map[id]host

func (*Metadata) RefreshMetadata

func (m *Metadata) RefreshMetadata() error

refresh metadata from zookeeper

func (*Metadata) RegisterService

func (m *Metadata) RegisterService(id int, data string) error

register service to zookeeper

func (*Metadata) ResetOffset

func (m *Metadata) ResetOffset(queue string, group string, time int64) error

reset given queue-group's offset by time

func (*Metadata) SaveMetrics

func (m *Metadata) SaveMetrics(data string) error

func (*Metadata) UpdateGroupConfig

func (m *Metadata) UpdateGroupConfig(group string, queue string,
	write bool, read bool, url string, ips []string) error

update given group config

type Queue

type Queue interface {
	Create(queue string, idcs []string) error
	Update(queue string) error
	Delete(queue string) error
	Lookup(queue string, group string) ([]*QueueInfo, error)
	AddGroup(group string, queue string, write bool, read bool, url string, ips []string) error
	UpdateGroup(group string, queue string, write bool, read bool, url string, ips []string) error
	DeleteGroup(group string, queue string) error
	LookupGroup(group string) ([]*GroupInfo, error)
	GetSingleGroup(group string, queue string) (*GroupConfig, error)
	SendMessage(queue string, group string, data []byte, flag uint64) (id string, err error)
	RecvMessage(queue string, group string) (id string, data []byte, flag uint64, err error)
	AckMessage(queue string, group string, id string) error
	AccumulationStatus() ([]AccumulationInfo, error)
	Proxys() (map[string]string, error)
	GetProxyConfigByID(id int) (string, error)
	UpTime() int64
	Version() string
	Close()
}

func NewQueue

func NewQueue(config *config.Config, version string) (Queue, error)

type QueueConfig

type QueueConfig struct {
	Queue  string                 `json:"queue"`
	Ctime  int64                  `json:"ctime"`
	Length int64                  `json:"length"`
	Groups map[string]GroupConfig `json:"groups,omitempty"`
	Idcs   []string               `json:"idcs,omitempty"`
}

func (*QueueConfig) Parse

func (q *QueueConfig) Parse(data []byte) error

func (*QueueConfig) String

func (q *QueueConfig) String() string

type QueueInfo

type QueueInfo struct {
	Queue  string        `json:"queue"`
	Ctime  int64         `json:"ctime"`
	Length int64         `json:"length"`
	Groups []GroupConfig `json:"groups,omitempty"`
}

func (*QueueInfo) String

func (i *QueueInfo) String() string

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL