consumerserver

package
v1.0.5
Warning

This package is not in the latest version of its module.

Published: Apr 13, 2024 License: MIT Imports: 16 Imported by: 0

Documentation

Constants

const PackageName = "component.ekafka.consumerserver"

PackageName is the name of this component.

Variables

This section is empty.

Functions

func DefaultConfig

func DefaultConfig() *config

DefaultConfig returns a default config.

Types

type BatchHandler added in v1.0.4

type BatchHandler func(lastCtx context.Context, messages []*ekafka.CtxMessage) error

type BatchListener added in v1.0.4

type BatchListener struct {
	Batch           []*ekafka.CtxMessage
	BatchUpdateSize int
	Timeout         time.Duration
	Handler         BatchHandler
	// contains filtered or unexported fields
}

func (*BatchListener) Handle added in v1.0.4

func (l *BatchListener) Handle(ctx context.Context, message *ekafka.Message, optFuncs ...handleOption) (bool, error)

type Component

type Component struct {
	ServerCtx context.Context
	// contains filtered or unexported fields
}

Component starts an Ego server for message consuming.

func NewConsumerServerComponent

func NewConsumerServerComponent(name string, config *config, ekafkaComponent *ekafka.Component, logger *elog.Component) *Component

NewConsumerServerComponent creates a new server instance.

func (*Component) Consumer

func (cmp *Component) Consumer() *ekafka.Consumer

Consumer returns the default Consumer.

func (*Component) ConsumerGroup

func (cmp *Component) ConsumerGroup() *ekafka.ConsumerGroup

ConsumerGroup returns the default ConsumerGroup.

func (*Component) GracefulStop

func (cmp *Component) GracefulStop(ctx context.Context) error

GracefulStop stops the server.

func (*Component) Info

func (cmp *Component) Info() *server.ServiceInfo

Info returns server info, used by governor and consumer balancer.

func (*Component) Init

func (cmp *Component) Init() error

Init ...

func (*Component) Name

func (cmp *Component) Name() string

Name returns the name of this instance.

func (*Component) OnConsumeEachMessage added in v1.0.2

func (cmp *Component) OnConsumeEachMessage(handler OnConsumeEachMessageHandler) error

OnConsumeEachMessage registers a handler that is called for each message. If the handler returns ErrRecoverableError, the message is retried; if it returns any other error, the message is not committed.
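The retry rule above can be sketched without a Kafka dependency. This is only an illustration under stated assumptions, not the component's implementation: `errRecoverable`, `errPermanent`, `message`, `consumeOne`, and the `maxAttempts` cap are all hypothetical names introduced here, and the real component may retry without a cap.

```go
package main

import (
	"errors"
	"fmt"
)

// errRecoverable is a hypothetical stand-in for the package's ErrRecoverableError.
var errRecoverable = errors.New("recoverable error")

// errPermanent is a hypothetical example of any other handler error.
var errPermanent = errors.New("permanent error")

// message is a hypothetical stand-in for *ekafka.Message.
type message struct {
	Value string
}

// handler mirrors the shape of OnConsumeEachMessageHandler, minus the context.
type handler func(msg message) error

// consumeOne calls h until it succeeds, returns a non-recoverable error,
// or maxAttempts is reached (the cap is an assumption of this sketch).
// It reports whether the message should be committed and how many
// attempts were made.
func consumeOne(h handler, msg message, maxAttempts int) (commit bool, attempts int) {
	for attempts < maxAttempts {
		attempts++
		err := h(msg)
		if err == nil {
			return true, attempts // handled: safe to commit
		}
		if !errors.Is(err, errRecoverable) {
			return false, attempts // not recoverable: do not commit, do not retry
		}
		// recoverable: loop and try the same message again
	}
	return false, attempts
}

func main() {
	calls := 0
	flaky := func(msg message) error {
		calls++
		if calls < 3 {
			return fmt.Errorf("temporary failure: %w", errRecoverable)
		}
		return nil
	}
	fmt.Println(consumeOne(flaky, message{Value: "hello"}, 5)) // true 3

	broken := func(msg message) error { return errPermanent }
	fmt.Println(consumeOne(broken, message{Value: "hello"}, 5)) // false 1
}
```

Wrapping the sentinel with `%w` and testing with `errors.Is` lets a handler add context to the error while still signalling that a retry is wanted.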

func (*Component) OnConsumerGroupStart

func (cmp *Component) OnConsumerGroupStart(handler OnConsumerGroupStartHandler) error

OnConsumerGroupStart ...

func (*Component) OnEachMessage

func (cmp *Component) OnEachMessage(consumptionErrors chan<- error, handler OnEachMessageHandler) error

OnEachMessage ... Deprecated: use OnConsumeEachMessage instead.

func (*Component) OnStart

func (cmp *Component) OnStart(handler OnStartHandler) error

OnStart ...

func (*Component) PackageName

func (cmp *Component) PackageName() string

PackageName returns the package name.

func (*Component) Start

func (cmp *Component) Start() error

Start will start consuming.

func (*Component) Stop

func (cmp *Component) Stop() error

Stop stops the server.

func (*Component) Subscribe added in v1.0.4

func (cmp *Component) Subscribe(listener Listener)

Subscribe appends a listener that is invoked for each message.

func (*Component) SubscribeBatchHandler added in v1.0.4

func (cmp *Component) SubscribeBatchHandler(handler BatchHandler, batchSize int, timeout time.Duration)

SubscribeBatchHandler appends a batch listener that wraps this handler. Messages are collected into a batch, and the batch is handled when it reaches batchSize or when the timeout elapses.
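The "batch size or timeout" rule can be illustrated with a small, deterministic sketch. It is a simplification under stated assumptions rather than the BatchListener implementation: the `batcher` type, its fields, and `runDemo` are hypothetical, messages are plain strings, and the timeout is only checked when a message arrives (the real listener may use a timer instead).

```go
package main

import (
	"fmt"
	"time"
)

// batcher is a hypothetical, simplified stand-in for BatchListener.
type batcher struct {
	size    int
	timeout time.Duration
	handle  func(batch []string) error
	batch   []string
	started time.Time // arrival time of the first message in the current batch
}

// add buffers msg and flushes when the batch is full or its oldest
// message has waited at least timeout. It reports whether a flush happened.
func (b *batcher) add(msg string, now time.Time) (bool, error) {
	if len(b.batch) == 0 {
		b.started = now
	}
	b.batch = append(b.batch, msg)
	if len(b.batch) < b.size && now.Sub(b.started) < b.timeout {
		return false, nil // keep collecting
	}
	full := b.batch
	b.batch = nil
	return true, b.handle(full)
}

// runDemo feeds five messages with explicit timestamps and returns the
// batches that were handed to the handler.
func runDemo() [][]string {
	var flushed [][]string
	b := &batcher{size: 3, timeout: time.Second, handle: func(batch []string) error {
		flushed = append(flushed, batch)
		return nil
	}}
	t0 := time.Unix(0, 0)
	b.add("a", t0)
	b.add("b", t0.Add(100*time.Millisecond))
	b.add("c", t0.Add(200*time.Millisecond)) // batch size reached
	b.add("d", t0.Add(300*time.Millisecond))
	b.add("e", t0.Add(2*time.Second)) // timeout reached
	return flushed
}

func main() {
	fmt.Println(runDemo()) // [[a b c] [d e]]
}
```

Passing the clock value in explicitly keeps the sketch testable; a production listener would more likely flush from a background timer so that a quiet topic does not hold a partial batch indefinitely.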

func (*Component) SubscribeSingleHandler added in v1.0.4

func (cmp *Component) SubscribeSingleHandler(handler Handler)

SubscribeSingleHandler appends a single-message listener that wraps this handler and is invoked for each message.

type Container

type Container struct {
	// contains filtered or unexported fields
}

func DefaultContainer

func DefaultContainer() *Container

DefaultContainer returns the default Container.

func Load

func Load(key string) *Container

Load loads the configuration under key and initializes the Container.

func (*Container) Build

func (c *Container) Build(options ...Option) *Component

Build builds the Container and returns the resulting Component.
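Build takes values of type Option, which are functions that mutate the Container. A minimal sketch of that functional-options pattern follows; the lowercase `container`, `component`, `option`, `withDebug`, and `build` names and the struct fields are hypothetical stand-ins, not the package's unexported internals.

```go
package main

import "fmt"

// container is a hypothetical stand-in for Container; its fields are assumptions.
type container struct {
	name  string
	debug bool
}

// component is a hypothetical stand-in for Component.
type component struct {
	name  string
	debug bool
}

// option mirrors the shape of Option: a function that mutates the container.
type option func(c *container)

// withDebug mirrors the shape of WithDebug.
func withDebug(debug bool) option {
	return func(c *container) { c.debug = debug }
}

// build applies each option in order, then derives the component
// from the resulting container state.
func (c *container) build(options ...option) *component {
	for _, opt := range options {
		opt(c)
	}
	return &component{name: c.name, debug: c.debug}
}

func main() {
	cmp := (&container{name: "demo"}).build(withDebug(true))
	fmt.Println(cmp.name, cmp.debug) // demo true
}
```

Because options are applied in order, a later option overrides an earlier one that sets the same field.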

type Handler added in v1.0.4

type Handler func(ctx context.Context, message *ekafka.Message) error

type Listener added in v1.0.4

type Listener interface {
	Handle(ctx context.Context, message *ekafka.Message, opts ...handleOption) (bool, error)
}
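How a Handler, a SyncListener, and Subscribe might fit together can be sketched as follows. This is an assumption-laden illustration, not the package's code: the lowercase types are hypothetical, the `handleOption` parameters and the boolean result of Handle are left out because their meaning is not documented here, and stopping at the first listener error is a choice made for this sketch only.

```go
package main

import (
	"context"
	"fmt"
)

// message is a hypothetical stand-in for *ekafka.Message.
type message struct{ Value string }

// listener is a simplified stand-in for Listener
// (options and the bool result are omitted).
type listener interface {
	handle(ctx context.Context, msg *message) error
}

// handlerFunc mirrors the shape of Handler.
type handlerFunc func(ctx context.Context, msg *message) error

// syncListener wraps a handlerFunc, in the spirit of SyncListener.
type syncListener struct{ handler handlerFunc }

func (l *syncListener) handle(ctx context.Context, msg *message) error {
	return l.handler(ctx, msg)
}

// server keeps subscribed listeners in registration order.
type server struct{ listeners []listener }

// subscribe mirrors Subscribe: it appends a listener.
func (s *server) subscribe(l listener) { s.listeners = append(s.listeners, l) }

// subscribeSingleHandler mirrors SubscribeSingleHandler: it wraps the
// handler in a syncListener and subscribes it.
func (s *server) subscribeSingleHandler(h handlerFunc) {
	s.subscribe(&syncListener{handler: h})
}

// dispatch passes msg to every listener and stops at the first error
// (an assumption of this sketch).
func (s *server) dispatch(ctx context.Context, msg *message) error {
	for _, l := range s.listeners {
		if err := l.handle(ctx, msg); err != nil {
			return err
		}
	}
	return nil
}

// runDemo subscribes two handlers and dispatches one message to them.
func runDemo() []string {
	var seen []string
	s := &server{}
	s.subscribeSingleHandler(func(ctx context.Context, msg *message) error {
		seen = append(seen, "first:"+msg.Value)
		return nil
	})
	s.subscribeSingleHandler(func(ctx context.Context, msg *message) error {
		seen = append(seen, "second:"+msg.Value)
		return nil
	})
	if err := s.dispatch(context.Background(), &message{Value: "hello"}); err != nil {
		seen = append(seen, "error:"+err.Error())
	}
	return seen
}

func main() {
	fmt.Println(runDemo()) // [first:hello second:hello]
}
```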

type OnConsumeEachMessageHandler added in v1.0.3

type OnConsumeEachMessageHandler = func(ctx context.Context, message *ekafka.Message) error

OnConsumeEachMessageHandler ...

type OnConsumerGroupStartHandler

type OnConsumerGroupStartHandler = func(ctx context.Context, consumerGroup *ekafka.ConsumerGroup) error

OnConsumerGroupStartHandler ...

type OnEachMessageHandler

type OnEachMessageHandler = func(ctx context.Context, message kafka.Message) error

OnEachMessageHandler ...

type OnStartHandler

type OnStartHandler = func(ctx context.Context, consumer *ekafka.Consumer) error

OnStartHandler ...

type Option

type Option func(c *Container)

func WithDebug

func WithDebug(debug bool) Option

WithDebug enables debug mode.

func WithEkafka

func WithEkafka(ekafkaComponent *ekafka.Component) Option

WithEkafka ...

type SyncListener added in v1.0.4

type SyncListener struct {
	Handler Handler
	// contains filtered or unexported fields
}

func (*SyncListener) Handle added in v1.0.4

func (l *SyncListener) Handle(ctx context.Context, message *ekafka.Message, opts ...handleOption) (bool, error)
