kafka

package
v1.5.0 Latest
Warning

This package is not in the latest version of its module.

Published: Mar 2, 2018 License: Apache-2.0 Imports: 10 Imported by: 0

Documentation

Constants

This section is empty.

Variables

This section is empty.

Functions

func Consumer

Consumer returns the underlying Kafka consumer of the given service.

Consumer assumes that the service is ready; if it is not, the returned consumer may be nil.

NOTE: This will panic if `s` is not a `kafka.Service`.
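The two caveats above (a nil result when the service is not ready, a panic when the service has the wrong type) can be illustrated with a minimal, self-contained sketch. It does not use the real package: `Service`, `kafkaService`, `otherService`, `consumerOf` and `safeConsumerOf` are hypothetical stand-ins, and the unchecked type assertion is only an assumption about how such an accessor might be written.

```go
package main

import "fmt"

// Service is a simplified, hypothetical stand-in for bandmaster.Service.
type Service interface{ Name() string }

// kafkaService is a hypothetical stand-in for kafka.Service; consumer stays
// nil until the service has been started.
type kafkaService struct{ consumer *string }

func (*kafkaService) Name() string { return "kafka" }

// otherService is some unrelated service type.
type otherService struct{}

func (*otherService) Name() string { return "other" }

// consumerOf mimics the documented contract: it panics when s is not a
// *kafkaService and returns nil when the service is not ready.
func consumerOf(s Service) *string {
	return s.(*kafkaService).consumer // unchecked assertion: panics on mismatch
}

// safeConsumerOf converts the panic into an error, for callers that cannot
// guarantee the dynamic type of s.
func safeConsumerOf(s Service) (c *string, err error) {
	defer func() {
		if r := recover(); r != nil {
			err = fmt.Errorf("not a kafka service: %v", r)
		}
	}()
	return consumerOf(s), nil
}

func main() {
	// Right type, but never started: nil consumer, no error.
	c, err := safeConsumerOf(&kafkaService{})
	fmt.Println(c == nil, err == nil)

	// Wrong type: the panic is recovered and reported as an error.
	_, err = safeConsumerOf(&otherService{})
	fmt.Println(err != nil)
}
```

In practice, a nil check on the returned value is the cheaper safeguard; recovering from the panic is only worth it when the service's type is not known statically.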

func New

func New(conf *Config) bandmaster.Service

New creates a new Kafka service using the provided configuration. You may use the helpers for environment-based configuration to get a pre-configured `Config` with sane defaults.

New doesn't open any connection, doesn't do any kind of I/O, nor does it check the validity of the passed configuration; i.e. it cannot fail.

func Producer

Producer returns the underlying Kafka async-producer of the given service.

Producer assumes that the service is ready; if it is not, the returned producer may be nil.

NOTE: This will panic if `s` is not a `kafka.Service`.

func WatchConsumerEvents

func WatchConsumerEvents(ctx context.Context, s bandmaster.Service) error

WatchConsumerEvents starts the logging routines that watch the notification and error events of the underlying Kafka consumer, if the corresponding options have been enabled in its configuration.

This function waits for the service to be ready: the given context defines the deadline for this wait period.

Calling this function more than once on a given `kafka.Service` will result in undefined behavior (not the nice kind).

NOTE: This will panic if `s` is not a `kafka.Service`.
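A wait that is bounded by a context, as described above, typically looks like the following sketch. It is not the package's implementation: `waitReady`, `demo` and the readiness channel are hypothetical, and the sketch only shows how a deadline carried by the context ends the wait.

```go
package main

import (
	"context"
	"errors"
	"fmt"
	"time"
)

// waitReady blocks until ready is closed or ctx is done, whichever comes
// first. It returns nil in the first case and ctx.Err() in the second.
func waitReady(ctx context.Context, ready <-chan struct{}) error {
	select {
	case <-ready:
		return nil
	case <-ctx.Done():
		return ctx.Err()
	}
}

// demo exercises both outcomes: a service that never becomes ready and a
// service that is already ready.
func demo() (timedOut, readyOK bool) {
	never := make(chan struct{}) // never closed: the service is never ready
	ctx, cancel := context.WithTimeout(context.Background(), 20*time.Millisecond)
	defer cancel()
	timedOut = errors.Is(waitReady(ctx, never), context.DeadlineExceeded)

	ready := make(chan struct{})
	close(ready) // already ready: the wait returns immediately
	readyOK = waitReady(context.Background(), ready) == nil
	return timedOut, readyOK
}

func main() {
	timedOut, readyOK := demo()
	fmt.Println(timedOut, readyOK)
}
```

Passing a context with a timeout, rather than `context.Background()`, keeps a caller from blocking forever when the service fails to start.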

func WatchProducerEvents

func WatchProducerEvents(ctx context.Context, s bandmaster.Service) error

WatchProducerEvents starts the logging routines that watch the notification and error events of the underlying Kafka producer, if the corresponding options have been enabled in its configuration.

This function waits for the service to be ready; it blocks for as long as the service is not.

Calling this function more than once on a given `kafka.Service` will result in undefined behavior (not the nice kind).

NOTE: This will panic if `s` is not a `kafka.Service`.
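Since calling a watch function twice on the same service is undefined behavior, callers that cannot easily prove single invocation may want a guard. The sketch below is one possible approach using `sync.Once`; `onceStarter` and `countStarts` are hypothetical names, and the `start` callback stands in for a call such as `WatchProducerEvents(ctx, s)`.

```go
package main

import (
	"fmt"
	"sync"
)

// onceStarter runs a start function at most once, even when Start is called
// repeatedly or from several goroutines. Later calls return the first result.
type onceStarter struct {
	once sync.Once
	err  error
}

// Start invokes start on the first call only and remembers its error.
func (o *onceStarter) Start(start func() error) error {
	o.once.Do(func() { o.err = start() })
	return o.err
}

// countStarts calls Start n times and reports how often the callback ran.
func countStarts(n int) int {
	var s onceStarter
	calls := 0
	for i := 0; i < n; i++ {
		_ = s.Start(func() error { calls++; return nil })
	}
	return calls
}

func main() {
	fmt.Println(countStarts(3))
}
```

One guard per service instance is needed; sharing a single `onceStarter` across services would suppress the watchers of every service but the first.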

Types

type Compression

type Compression sarama.CompressionCodec

Compression is used to configure sarama's compression codec value from the environment.

func (*Compression) Decode

func (sccd *Compression) Decode(v string) error

func (Compression) String

func (sccd Compression) String() string
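The `Decode`/`String` pair is the usual shape for a type that is parsed from an environment string. The sketch below shows that pattern on a stand-in type, without importing sarama. `Codec`, its constants and the accepted names other than "none" (the default shown in `Env`) are assumptions made for illustration; the real package may accept a different set of strings.

```go
package main

import (
	"fmt"
	"strings"
)

// Codec is a hypothetical stand-in for a compression codec value.
type Codec int8

const (
	CodecNone Codec = iota
	CodecGZIP
	CodecSnappy
	CodecLZ4
)

// codecNames maps the accepted (assumed) environment strings to codecs.
var codecNames = map[string]Codec{
	"none":   CodecNone,
	"gzip":   CodecGZIP,
	"snappy": CodecSnappy,
	"lz4":    CodecLZ4,
}

// Decode sets the codec from its textual name, ignoring case. It leaves the
// receiver untouched and returns an error when the name is unknown.
func (c *Codec) Decode(v string) error {
	codec, ok := codecNames[strings.ToLower(v)]
	if !ok {
		return fmt.Errorf("unknown compression codec %q", v)
	}
	*c = codec
	return nil
}

// String returns the textual name of the codec, or "unknown".
func (c Codec) String() string {
	for name, codec := range codecNames {
		if codec == c {
			return name
		}
	}
	return "unknown"
}

func main() {
	var c Codec
	if err := c.Decode("GZIP"); err != nil {
		fmt.Println("error:", err)
		return
	}
	fmt.Println(c)
	fmt.Println(c.Decode("bogus") != nil)
}
```

`Decode` takes a pointer receiver because it mutates the value, while `String` takes a value receiver so that both `Codec` and `*Codec` print by name.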

type Config

type Config struct {
	ClusterConf *sarama_cluster.Config

	Addrs           []string
	ConsumerTopics  []string
	ConsumerGroupID string
}

Config contains the necessary configuration for a Kafka service.

func Conf

func Conf(s bandmaster.Service) *Config

Conf returns the underlying configuration of the given service.

NOTE: This will panic if `s` is not a `kafka.Service`.

type Env

type Env struct {
	/* Common */
	Version Version  `envconfig:"VERSION" default:"V0_10_1_0"`
	Addrs   []string `envconfig:"ADDRS" default:"localhost:9092"`
	Bufsize int      `envconfig:"BUFSIZE" default:"4096"`

	/* Producer */
	ProdNotifSuccess bool        `envconfig:"PROD_NOTIF_SUCCESS" default:"false"`
	ProdNotifError   bool        `envconfig:"PROD_NOTIF_ERROR" default:"true"`
	ProdCompression  Compression `envconfig:"PROD_COMPRESSION" default:"none"`

	/* Consumer */
	ConsTopics               []string      `envconfig:"CONS_TOPICS" default:""`
	ConsMode                 uint8         `envconfig:"CONS_MODE" default:"1"` // ConsumerModePartitions
	ConsGroupID              string        `envconfig:"CONS_GROUP_ID" default:""`
	ConsNotifRebalance       bool          `envconfig:"CONS_NOTIF_REBALANCE" default:"true"`
	ConsNotifError           bool          `envconfig:"CONS_NOTIF_ERROR" default:"true"`
	ConsOffsetInitial        int64         `envconfig:"CONS_OFFSET_INITIAL" default:"-1"` // OffsetNewest
	ConsOffsetCommitInterval time.Duration `envconfig:"CONS_OFFSET_COMMIT_INTERVAL" default:"5m"`
	ConsOffsetRetention      time.Duration `envconfig:"CONS_OFFSET_RETENTION" default:"0"`
	ConsRetryBackoff         time.Duration `envconfig:"CONS_RETRY_BACKOFF" default:"1s"`
}

Env can be used to configure a Kafka session via the environment.

It comes with sane defaults for a local development set-up.

func NewEnv

func NewEnv(prefix string) (*Env, error)

NewEnv parses the environment and returns a new `Env` structure.

`prefix` defines the prefix for the environment keys, e.g. with an 'XX' prefix, 'REPLICAS' would become 'XX_REPLICAS'.
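The prefixing rule can be sketched with the standard library alone. `envKey` and `lookup` are hypothetical helpers written for this illustration; they are not part of the package, which relies on the struct tags shown in `Env` rather than on hand-written lookups.

```go
package main

import (
	"fmt"
	"os"
)

// envKey joins a prefix and a key with an underscore, e.g. "XX" and
// "REPLICAS" give "XX_REPLICAS". An empty prefix leaves the key unchanged.
func envKey(prefix, key string) string {
	if prefix == "" {
		return key
	}
	return prefix + "_" + key
}

// lookup returns the value of the prefixed environment variable, or def when
// the variable is not set.
func lookup(prefix, key, def string) string {
	if v, ok := os.LookupEnv(envKey(prefix, key)); ok {
		return v
	}
	return def
}

func main() {
	os.Setenv("XX_ADDRS", "broker-1:9092")
	fmt.Println(envKey("XX", "ADDRS"))
	fmt.Println(lookup("XX", "ADDRS", "localhost:9092"))   // set above
	fmt.Println(lookup("XX", "BUFSIZE", "4096"))           // unset: default
}
```

Using a distinct prefix per service lets several Kafka configurations coexist in the same process environment.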

func (*Env) Config

func (e *Env) Config() *Config

Config returns a `Config` using the values from the environment.

type Service

type Service struct {
	*bandmaster.ServiceBase // "inheritance"
	// contains filtered or unexported fields
}

Service implements a Kafka service based on the 'bsm/sarama-cluster' package.

func (*Service) Start

Start checks the validity of the configuration then creates a new Kafka consumer as well as an asynchronous producer: if everything goes smoothly, the service is marked as 'started'; otherwise, an error is returned.

Start is used by BandMaster's internal machinery; it should never be called directly by the end user of the service.

func (*Service) Stop

func (s *Service) Stop(context.Context) error

Stop closes the underlying Kafka producer & consumer: if everything goes smoothly, the service is marked as 'stopped'; otherwise, an error is returned.

Stop is used by BandMaster's internal machinery; it should never be called directly by the end user of the service.

type Version

type Version sarama.KafkaVersion

Version is used to configure sarama's Kafka version value from the environment.

func (*Version) Decode

func (v *Version) Decode(vv string) error

func (Version) String

func (v Version) String() string
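As a rough illustration of decoding a version string such as the `V0_10_1_0` default shown in `Env`, the sketch below parses the four numeric components into a stand-in type. `version` is hypothetical, and parsing the components numerically is an assumption: the real `Decode` may instead match the string against a fixed list of known versions.

```go
package main

import (
	"fmt"
	"strconv"
	"strings"
)

// version is a hypothetical stand-in holding four version components.
type version [4]uint

// Decode parses strings of the form "V0_10_1_0". It leaves the receiver
// untouched and returns an error when the input is malformed.
func (v *version) Decode(s string) error {
	parts := strings.Split(strings.TrimPrefix(s, "V"), "_")
	if len(parts) != len(v) {
		return fmt.Errorf("invalid version %q: want %d components", s, len(v))
	}
	var out version
	for i, p := range parts {
		n, err := strconv.ParseUint(p, 10, 32)
		if err != nil {
			return fmt.Errorf("invalid version component %q: %w", p, err)
		}
		out[i] = uint(n)
	}
	*v = out
	return nil
}

// String formats the version back into the "V0_10_1_0" form.
func (v version) String() string {
	return fmt.Sprintf("V%d_%d_%d_%d", v[0], v[1], v[2], v[3])
}

func main() {
	var v version
	if err := v.Decode("V0_10_1_0"); err != nil {
		fmt.Println("error:", err)
		return
	}
	fmt.Println(v)
}
```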
