events

package
v0.2.13 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Apr 18, 2021 License: GPL-3.0 Imports: 8 Imported by: 0

Documentation

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

func EventNotifyFromContext

func EventNotifyFromContext(ctx context.Context) (chan bool, bool)

EventNotifyFromContext retrieves the event notify channel from the given context.

func EventNotifyWithContext

func EventNotifyWithContext(ctx context.Context, c chan bool) context.Context

EventNotifyWithContext binds an event notify channel to the given context.

func NewKafkaEventManager

func NewKafkaEventManager(opts KafkaManagerOptions, l *log.Logger) (foundation.EventManager, error)

NewKafkaEventManager creates a Kafka event manager instance.

func SetPartition

func SetPartition(partition int32) foundation.EventPublishOption

SetPartition returns a publish option that specifies the Kafka partition to publish the message to.

Types

type KafkaConsumer

type KafkaConsumer struct {
	// contains filtered or unexported fields
}

KafkaConsumer implements the EventConsumer interface.

func (*KafkaConsumer) Close

func (kc *KafkaConsumer) Close()

Close closes the Kafka consumer.

func (*KafkaConsumer) Do

func (kc *KafkaConsumer) Do(ctx context.Context, wg *sync.WaitGroup, h foundation.EventHandler, errQueue chan error)

Do implements the EventConsumer interface.

func (*KafkaConsumer) SetBatchConsumptionHandleTimeout

func (kc *KafkaConsumer) SetBatchConsumptionHandleTimeout(timeout int64) *KafkaConsumer

func (*KafkaConsumer) SetBatchConsumptionSize

func (kc *KafkaConsumer) SetBatchConsumptionSize(size int) *KafkaConsumer

type KafkaConsumerMaker

type KafkaConsumerMaker struct {
	// contains filtered or unexported fields
}

KafkaConsumerMaker is a kafka consumer factory

func (*KafkaConsumerMaker) Consumer

func (kcm *KafkaConsumerMaker) Consumer(group, topic string) (*KafkaConsumer, error)

Consumer creates a Kafka cluster consumer using the consumer maker.

type KafkaManager

type KafkaManager struct {
	// contains filtered or unexported fields
}

KafkaManager implements the event manager interface.

func (*KafkaManager) Close

func (km *KafkaManager) Close() error

Close safely closes the message consumer and publisher.

func (*KafkaManager) ConsumerErrors

func (km *KafkaManager) ConsumerErrors() <-chan error

func (*KafkaManager) ProducerErrors

func (km *KafkaManager) ProducerErrors() <-chan error

func (*KafkaManager) Publish

func (km *KafkaManager) Publish(ctx context.Context, topic string, event foundation.Event, opts ...foundation.EventPublishOption) error

Publish publishes a message to Kafka on the specified topic.

func (*KafkaManager) Subscribe

func (km *KafkaManager) Subscribe(ctx context.Context, group, topic string, handler foundation.EventHandler) error

Subscribe subscribes to a Kafka topic.

type KafkaManagerOptions

type KafkaManagerOptions struct {
	// yaml
	Host               string `mapstructure:"host"`
	ErrChanSize        int    `mapstructure:"err_chan_size"`
	ThrottleBottleSize int    `mapstructure:"throttle_bottle_size"`
	Producer           struct {
		RequiredACKs    int  `mapstructure:"required_acks"`
		MaxMessageBytes int  `mapstructure:"max_message_bytes"`
		Compression     int  `mapstructure:"compression"`
		FlushFrequency  int  `mapstructure:"flush_frequency"`
		ReturnSuccess   bool `mapstructure:"return_success"`
		ReturnErrors    bool `mapstructure:"return_errors"`
	} `mapstructure:"producer"`

	Consumer struct {
		GroupMode               int   `mapstructure:"group_mode"`
		GroupNotification       bool  `mapstructure:"group_notification"`
		OffsetInitial           int   `mapstructure:"offset_initial"`
		ReturnErrors            bool  `mapstructure:"return_errors"`
		BatchConsumptionSize    int   `mapstructure:"batch_consumption_size"`
		BatchConsumptionTimeout int64 `mapstructure:"batch_consumption_timeout"`
	} `mapstructure:"consumer"`
}

Directories

Path Synopsis
main

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL