Documentation ¶
Index ¶
- func Consumer(s bandmaster.Service) *sarama_cluster.Consumer
- func New(conf *Config) bandmaster.Service
- func Producer(s bandmaster.Service) sarama.AsyncProducer
- func WatchConsumerEvents(ctx context.Context, s bandmaster.Service) error
- func WatchProducerEvents(ctx context.Context, s bandmaster.Service) error
- type Compression
- type Config
- type Env
- type Service
- type Version
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
func Consumer ¶
func Consumer(s bandmaster.Service) *sarama_cluster.Consumer
Consumer returns the underlying Kafka consumer of the given service.
It assumes that the service is ready; i.e. it might return nil if it's actually not.
NOTE: This will panic if `s` is not a `kafka.Service`.
func New ¶
func New(conf *Config) bandmaster.Service
New creates a new Kafka service using the provided configuration. You may use the helpers for environment-based configuration to get a pre-configured `Config` with sane defaults.
New doesn't open any connection, doesn't do any kind of I/O, nor does it check the validity of the passed configuration; i.e. it cannot fail.
func Producer ¶
func Producer(s bandmaster.Service) sarama.AsyncProducer
Producer returns the underlying Kafka async-producer of the given service.
It assumes that the service is ready; i.e. it might return nil if it's actually not.
NOTE: This will panic if `s` is not a `kafka.Service`.
func WatchConsumerEvents ¶
func WatchConsumerEvents(ctx context.Context, s bandmaster.Service) error
WatchConsumerEvents starts the logging routines that watch the notification and error events of the underlying Kafka consumer if the corresponding options have been enabled in its configuration.
This function waits for the service to be ready: the given context defines the deadline for this wait period.
Calling this function more than once on a given `kafka.Service` will result in undefined behavior (not the nice kind).
NOTE: This will panic if `s` is not a `kafka.Service`.
func WatchProducerEvents ¶
func WatchProducerEvents(ctx context.Context, s bandmaster.Service) error
WatchProducerEvents starts the logging routines that watch the notification and error events of the underlying Kafka producer, if the corresponding options have been enabled in its configuration.
This function waits for the service to be ready, it will block if it's not.
Calling this function more than once on a given `kafka.Service` will result in undefined behavior (not the nice kind).
NOTE: This will panic if `s` is not a `kafka.Service`.
Types ¶
type Compression ¶
type Compression sarama.CompressionCodec
Compression is used to configure sarama's compression codec value from the environment.
func (*Compression) Decode ¶
func (sccd *Compression) Decode(v string) error
func (Compression) String ¶
func (sccd Compression) String() string
type Config ¶
type Config struct { ClusterConf *sarama_cluster.Config Addrs []string ConsumerTopics []string ConsumerGroupID string }
Config contains the necessary configuration for a Kafka service.
func Conf ¶
func Conf(s bandmaster.Service) *Config
Conf returns the underlying configuration of the given service.
NOTE: This will panic if `s` is not a `kafka.Service`.
type Env ¶
type Env struct { /* Common */ Version Version `envconfig:"VERSION" default:"V0_10_1_0"` Addrs []string `envconfig:"ADDRS" default:"localhost:9092"` Bufsize int `envconfig:"BUFSIZE" default:"4096"` /* Producer */ ProdNotifSuccess bool `envconfig:"PROD_NOTIF_SUCCESS" default:"false"` ProdNotifError bool `envconfig:"PROD_NOTIF_ERROR" default:"true"` ProdCompression Compression `envconfig:"PROD_COMPRESSION" default:"none"` /* Consumer */ ConsTopics []string `envconfig:"CONS_TOPICS" default:""` ConsMode uint8 `envconfig:"CONS_MODE" default:"1"` // ConsumerModePartitions ConsGroupID string `envconfig:"CONS_GROUP_ID" default:""` ConsNotifRebalance bool `envconfig:"CONS_NOTIF_REBALANCE" default:"true"` ConsNotifError bool `envconfig:"CONS_NOTIF_ERROR" default:"true"` ConsOffsetInitial int64 `envconfig:"CONS_OFFSET_INITIAL" default:"-1"` // OffsetNewest ConsOffsetCommitInterval time.Duration `envconfig:"CONS_OFFSET_COMMIT_INTERVAL" default:"5m"` ConsOffsetRetention time.Duration `envconfig:"CONS_OFFSET_RETENTION" default:"0"` ConsRetryBackoff time.Duration `envconfig:"CONS_RETRY_BACKOFF" default:"1s"` }
Env can be used to configure a Kafka session via the environment.
It comes with sane defaults for a local development set-up.
type Service ¶
type Service struct { *bandmaster.ServiceBase // "inheritance" // contains filtered or unexported fields }
Service implements a Kafka service based on the 'bsm/sarama-cluster' package.
func (*Service) Start ¶
Start checks the validity of the configuration then creates a new Kafka consumer as well as an asynchronous producer: if everything goes smoothly, the service is marked as 'started'; otherwise, an error is returned.
Start is used by BandMaster's internal machinery, it shouldn't ever be called directly by the end-user of the service.