saramaconsumer

package
v0.0.0-...-18de4b0 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Apr 18, 2023 License: MIT Imports: 12 Imported by: 0

Documentation

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

func New

func New(setters ...Setter) consumer.Consumer

Types

type Consumer

type Consumer struct {
	// contains filtered or unexported fields
}

func (*Consumer) AddHandler

func (c *Consumer) AddHandler(handlers ...consumer.Handler)

func (*Consumer) Cleanup

func (c *Consumer) Cleanup(session sarama.ConsumerGroupSession) error

func (*Consumer) Close

func (c *Consumer) Close()

func (*Consumer) ConsumeClaim

func (c *Consumer) ConsumeClaim(session sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error

func (*Consumer) Run

func (c *Consumer) Run()

func (*Consumer) Setup

func (c *Consumer) Setup(session sarama.ConsumerGroupSession) error

type Setter

type Setter func(c *Consumer)

func SetAdmin

func SetAdmin(maxRetry int, backoff, timeout time.Duration) Setter

func SetApiVersionRequest

func SetApiVersionRequest(apiVersionRequest bool) Setter

func SetBrokers

func SetBrokers(brokers ...string) Setter

func SetChannelBufferSize

func SetChannelBufferSize(channelBufferSize int) Setter

func SetClientID

func SetClientID(clientID string) Setter

func SetConsumerFetch

func SetConsumerFetch(min int32, defaultBytes int32, max int32) Setter

func SetConsumerGroupHeartbeatInterval

func SetConsumerGroupHeartbeatInterval(interval time.Duration) Setter

func SetConsumerGroupInstanceId

func SetConsumerGroupInstanceId(instanceId string) Setter

func SetConsumerGroupMemberUserData

func SetConsumerGroupMemberUserData(userData []byte) Setter

func SetConsumerGroupRebalanceGroupStrategies

func SetConsumerGroupRebalanceGroupStrategies(groupStrategies []sarama.BalanceStrategy) Setter

func SetConsumerGroupRebalanceRetry

func SetConsumerGroupRebalanceRetry(max int, backoff time.Duration) Setter

func SetConsumerGroupRebalanceTimeout

func SetConsumerGroupRebalanceTimeout(timeout time.Duration) Setter

func SetConsumerGroupResetInvalidOffsets

func SetConsumerGroupResetInvalidOffsets(resetInvalidOffsets bool) Setter

func SetConsumerGroupSessionTimeout

func SetConsumerGroupSessionTimeout(timeout time.Duration) Setter

func SetConsumerInterceptors

func SetConsumerInterceptors(interceptors []sarama.ConsumerInterceptor) Setter

func SetConsumerIsolationLevel

func SetConsumerIsolationLevel(isolationLevel sarama.IsolationLevel) Setter

func SetConsumerMaxWaitTime

func SetConsumerMaxWaitTime(maxWaitTime time.Duration) Setter

func SetConsumerOffsetsAutoCommit

func SetConsumerOffsetsAutoCommit(enable bool, interval time.Duration) Setter

func SetConsumerOffsetsInitial

func SetConsumerOffsetsInitial(initial int64) Setter

func SetConsumerOffsetsRetention

func SetConsumerOffsetsRetention(retention time.Duration) Setter

func SetConsumerOffsetsRetry

func SetConsumerOffsetsRetry(max int) Setter

func SetConsumerRetry

func SetConsumerRetry(backoff time.Duration, backoffFunc func(retries int) time.Duration) Setter

func SetConsumerReturn

func SetConsumerReturn(errors bool) Setter

func SetGroupID

func SetGroupID(groupID string) Setter

func SetLogger

func SetLogger(logger logger.LogWriter) Setter

func SetMetadataAllowAutoTopicCreation

func SetMetadataAllowAutoTopicCreation(allowAutoTopicCreation bool) Setter

func SetMetadataFull

func SetMetadataFull(full bool) Setter

func SetMetadataRefreshFrequency

func SetMetadataRefreshFrequency(refreshFrequency time.Duration) Setter

func SetMetadataRetry

func SetMetadataRetry(max int, backoff time.Duration, backoffFunc func(retries, maxRetries int) time.Duration) Setter

func SetMetadataTimeout

func SetMetadataTimeout(timeout time.Duration) Setter

func SetMetricRegistry

func SetMetricRegistry(metricRegistry metrics.Registry) Setter

func SetNetDialTimeout

func SetNetDialTimeout(dialTimeout time.Duration) Setter

func SetNetKeepAlive

func SetNetKeepAlive(keepAlive time.Duration) Setter

func SetNetLocalAddr

func SetNetLocalAddr(addr net.Addr) Setter

func SetNetMaxOpenRequests

func SetNetMaxOpenRequests(maxOpenRequests int) Setter

func SetNetProxy

func SetNetProxy(enable bool, dialer proxy.Dialer) Setter

func SetNetReadTimeout

func SetNetReadTimeout(readTimeout time.Duration) Setter

func SetNetSASLAuthIdentity

func SetNetSASLAuthIdentity(authIdentity string) Setter

func SetNetSASLEnable

func SetNetSASLEnable(enable bool) Setter

func SetNetSASLGSSAPI

func SetNetSASLGSSAPI(gssAPI sarama.GSSAPIConfig) Setter

func SetNetSASLHandshake

func SetNetSASLHandshake(handshake bool) Setter

func SetNetSASLMechanism

func SetNetSASLMechanism(saslMechanism sarama.SASLMechanism) Setter

func SetNetSASLPassword

func SetNetSASLPassword(password string) Setter

func SetNetSASLSCRAMAuthzID

func SetNetSASLSCRAMAuthzID(scramAuthzID string) Setter

func SetNetSASLTokenProvider

func SetNetSASLTokenProvider(tokenProvider sarama.AccessTokenProvider) Setter

func SetNetSASLUser

func SetNetSASLUser(user string) Setter

func SetNetSASLVersion

func SetNetSASLVersion(version int16) Setter

func SetNetTLS

func SetNetTLS(enable bool, config *tls.Config) Setter

func SetNetWriteTimeout

func SetNetWriteTimeout(writeTimeout time.Duration) Setter

func SetRackID

func SetRackID(rackID string) Setter

func SetVersion

func SetVersion(version sarama.KafkaVersion) Setter

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL