consumer

package
v1.0.0 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Mar 28, 2023 License: MIT Imports: 26 Imported by: 0

Documentation

Index

Constants

View Source
const (
	SemanticAtMostOnce  = "atMostOnce"
	SemanticAtLeastOnce = "atLeastOnce"

	OwnerName = "kaproxy"
)
View Source
const (
	PeerProxyID = "__peer_proxy__"
)

Variables

View Source
var (
	ErrGroupStopped         = errors.New("consumer group was stopped")
	ErrGroupNotFound        = errors.New("consumer group not found")
	ErrTopicNotFound        = errors.New("topic not found in this consumer group")
	ErrNoMessage            = errors.New("no message in brokers")
	ErrUnackManagerStopped  = errors.New("unack manager was stopped")
	ErrUnackMessageNotFound = errors.New("unack message not found")
	ErrGroupNotAllowACK     = errors.New("consumer group is not allowed to be acknowledged")
	ErrNoPartition          = errors.New("consumer group doesn't claim any partition")
	ErrNoPeer               = errors.New("no peer can pull message")
)

Functions

This section is empty.

Types

type Consumer

type Consumer struct {
	Puller *Puller
	// contains filtered or unexported fields
}

func NewConsumer

func NewConsumer(saramaClient sarama.Client, conf *config.Config) (*Consumer, error)

func (*Consumer) ACK

func (c *Consumer) ACK(token, group, topic string, partition int32, offset int64) error

func (*Consumer) Consume

func (c *Consumer) Consume(group, topic string, timeout, ttr time.Duration) (message *sarama.ConsumerMessage, err error)

func (*Consumer) Export

func (c *Consumer) Export(group string) (map[string]interface{}, error)

func (*Consumer) GetConsumerGroup

func (c *Consumer) GetConsumerGroup(group string) *consumerGroup

func (*Consumer) GetConsumerGroupState

func (c *Consumer) GetConsumerGroupState(group string) (string, error)

func (*Consumer) GetMetadataByName

func (c *Consumer) GetMetadataByName(group string) *GroupMetadata

func (*Consumer) ListConsumerGroup

func (c *Consumer) ListConsumerGroup() []string

func (*Consumer) ReleaseConsumerGroups

func (c *Consumer) ReleaseConsumerGroups()

func (*Consumer) Start

func (c *Consumer) Start() error

func (*Consumer) Stop

func (c *Consumer) Stop()

func (*Consumer) UpdateConsumerGroupState

func (c *Consumer) UpdateConsumerGroupState(group string, stopped bool) error

type GroupMetadata

type GroupMetadata struct {
	Owner     string                `json:"owner"`
	Topics    []string              `json:"topics"`
	Semantics string                `json:"semantics"`
	Consumer  config.ConsumerConfig `json:"consumer"`
	Stopped   bool                  `json:"stopped"`
	// contains filtered or unexported fields
}

type Puller

type Puller struct {
	// contains filtered or unexported fields
}

func (*Puller) Close

func (p *Puller) Close() error

func (*Puller) PullFromPeers

func (p *Puller) PullFromPeers(group, topic, token string, timeout, ttr uint64, noPartition bool) (*sarama.ConsumerMessage, error)

func (*Puller) Start

func (p *Puller) Start() error

type TopicUnackManager

type TopicUnackManager struct {
	// contains filtered or unexported fields
}

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL