Documentation ¶
Index ¶
- func EventNotifyFromContext(ctx context.Context) (chan bool, bool)
- func EventNotifyWithContext(ctx context.Context, c chan bool) context.Context
- func NewKafkaEventManager(opts KafkaManagerOptions, l *log.Logger) (foundation.EventManager, error)
- func SetPartition(partition int32) foundation.EventPublishOption
- type KafkaConsumer
- type KafkaConsumerMaker
- type KafkaManager
- func (km *KafkaManager) Close() error
- func (km *KafkaManager) ConsumerErrors() <-chan error
- func (km *KafkaManager) ProducerErrors() <-chan error
- func (km *KafkaManager) Publish(ctx context.Context, topic string, event foundation.Event, ...) error
- func (km *KafkaManager) Subscribe(ctx context.Context, group, topic string, handler foundation.EventHandler) error
- type KafkaManagerOptions
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
func EventNotifyFromContext ¶
EventNotifyFromContext acquires an event notify channel from the given context.
func EventNotifyWithContext ¶
EventNotifyWithContext binds an event notify channel to the given context.
func NewKafkaEventManager ¶
func NewKafkaEventManager(opts KafkaManagerOptions, l *log.Logger) (foundation.EventManager, error)
NewKafkaEventManager creates a Kafka event manager instance.
func SetPartition ¶
func SetPartition(partition int32) foundation.EventPublishOption
SetPartition is a publish option that specifies the Kafka partition to publish the message to.
Types ¶
type KafkaConsumer ¶
type KafkaConsumer struct {
// contains filtered or unexported fields
}
KafkaConsumer implements the EventConsumer interface.
func (*KafkaConsumer) Do ¶
func (kc *KafkaConsumer) Do(ctx context.Context, wg *sync.WaitGroup, h foundation.EventHandler, errQueue chan error)
Do implements the EventConsumer interface.
func (*KafkaConsumer) SetBatchConsumptionHandleTimeout ¶
func (kc *KafkaConsumer) SetBatchConsumptionHandleTimeout(timeout int64) *KafkaConsumer
func (*KafkaConsumer) SetBatchConsumptionSize ¶
func (kc *KafkaConsumer) SetBatchConsumptionSize(size int) *KafkaConsumer
type KafkaConsumerMaker ¶
type KafkaConsumerMaker struct {
// contains filtered or unexported fields
}
KafkaConsumerMaker is a kafka consumer factory
func (*KafkaConsumerMaker) Consumer ¶
func (kcm *KafkaConsumerMaker) Consumer(group, topic string) (*KafkaConsumer, error)
Consumer creates a Kafka cluster consumer from the consumer maker.
type KafkaManager ¶
type KafkaManager struct {
// contains filtered or unexported fields
}
KafkaManager implements the event manager interface.
func (*KafkaManager) Close ¶
func (km *KafkaManager) Close() error
Close safely closes the message consumer and publisher.
func (*KafkaManager) ConsumerErrors ¶
func (km *KafkaManager) ConsumerErrors() <-chan error
func (*KafkaManager) ProducerErrors ¶
func (km *KafkaManager) ProducerErrors() <-chan error
type KafkaManagerOptions ¶
type KafkaManagerOptions struct {
	// yaml
	Host               string `mapstructure:"host"`
	ErrChanSize        int    `mapstructure:"err_chan_size"`
	ThrottleBottleSize int    `mapstructure:"throttle_bottle_size"`
	Producer           struct {
		RequiredACKs    int  `mapstructure:"required_acks"`
		MaxMessageBytes int  `mapstructure:"max_message_bytes"`
		Compression     int  `mapstructure:"compression"`
		FlushFrequency  int  `mapstructure:"flush_frequency"`
		ReturnSuccess   bool `mapstructure:"return_success"`
		ReturnErrors    bool `mapstructure:"return_errors"`
	} `mapstructure:"producer"`
	Consumer struct {
		GroupMode               int   `mapstructure:"group_mode"`
		GroupNotification       bool  `mapstructure:"group_notification"`
		OffsetInitial           int   `mapstructure:"offset_initial"`
		ReturnErrors            bool  `mapstructure:"return_errors"`
		BatchConsumptionSize    int   `mapstructure:"batch_consumption_size"`
		BatchConsumptionTimeout int64 `mapstructure:"batch_consumption_timeout"`
	} `mapstructure:"consumer"`
}
Click to show internal directories.
Click to hide internal directories.