kafka

package
v1.1.10 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Oct 11, 2019 License: MIT Imports: 12 Imported by: 0

Documentation

Index

Constants

This section is empty.

Variables

View Source
var KafkaClientRef = &KafkaClientImpl{}
View Source
var SampleKafkaConfig = KafkaConfig{
	Brokers: "127.0.0.1:9092",
	Version: "",
	Topic:   "sarama",
	Verbose: true,
	Consumer: ConsumerConfiguration{
		Group:  "example",
		Oldest: true,
	},
}

Functions

This section is empty.

Types

type AcceptMessageFunc

type AcceptMessageFunc func(logrus.FieldLogger, []byte) (fault.TypedError, string)

type ConsumG

type ConsumG struct {
	Log *logger.Logger

	AcceptMessageFunc AcceptMessageFunc
	KafkaClient       *KafkaClientImpl
	// contains filtered or unexported fields
}

ConsumG represents a Sarama consumer group consumer.

func (*ConsumG) Cleanup

func (consumer *ConsumG) Cleanup(sarama.ConsumerGroupSession) error

Cleanup is run at the end of a session, once all ConsumeClaim goroutines have exited.

func (*ConsumG) ConsumeClaim

func (this *ConsumG) ConsumeClaim(
	session sarama.ConsumerGroupSession,
	claim sarama.ConsumerGroupClaim,
) error

ConsumeClaim must start a consumer loop of ConsumerGroupClaim's Messages().

func (*ConsumG) Setup

func (consumer *ConsumG) Setup(sarama.ConsumerGroupSession) error

Setup is run at the beginning of a new session, before ConsumeClaim.

type ConsumerConfiguration

type ConsumerConfiguration struct {
	Group             string
	Oldest            bool
	AcceptMessageFunc AcceptMessageFunc
}

type KafkaClient

type KafkaClient interface {
	Init(*logger.Logger, *KafkaConfig)
	KafkaConfig() *KafkaConfig
	Loop() fault.TypedError
	GroupLoop() fault.TypedError
	ConfigurationName() string
	Version() string
	Push([]byte) fault.TypedError
	PushText(string) fault.TypedError
	PushEvent(interface{}) fault.TypedError
	Ping() fault.TypedError
	Status() string
	IsWorked() bool
	GetMetrics() map[string]*Metrics
	GetHost() []string
	SetAcceptMessageFunc(AcceptMessageFunc)
}

type KafkaClientImpl

type KafkaClientImpl struct {
	Log *logger.Logger

	Topic          string
	Partition      int32
	Offset         string
	MsgCount       int64
	IsWorkedStatus bool
	StatusError    error

	Metrics map[string]*Metrics
	Brokers []string
	// contains filtered or unexported fields
}

func (*KafkaClientImpl) ConfigurationName

func (this *KafkaClientImpl) ConfigurationName() string

func (*KafkaClientImpl) GetHost

func (this *KafkaClientImpl) GetHost() []string

func (*KafkaClientImpl) GetMetrics

func (this *KafkaClientImpl) GetMetrics() map[string]*Metrics

func (*KafkaClientImpl) GetTopicMetrics

func (this *KafkaClientImpl) GetTopicMetrics(metricTopicIdx string) *Metrics

func (*KafkaClientImpl) GroupLoop

func (this *KafkaClientImpl) GroupLoop() fault.TypedError

func (*KafkaClientImpl) Init

func (this *KafkaClientImpl) Init(
	log *logger.Logger,
	kafkaConfig *KafkaConfig,
)

func (*KafkaClientImpl) IsWorked

func (this *KafkaClientImpl) IsWorked() bool

func (*KafkaClientImpl) KafkaConfig

func (this *KafkaClientImpl) KafkaConfig() *KafkaConfig

func (*KafkaClientImpl) Loop

func (this *KafkaClientImpl) Loop() fault.TypedError

func (*KafkaClientImpl) Ping

func (this *KafkaClientImpl) Ping() fault.TypedError

func (*KafkaClientImpl) Pong

func (this *KafkaClientImpl) Pong() (fault.TypedError, string)

func (*KafkaClientImpl) Push

func (this *KafkaClientImpl) Push(
	message []byte,
) fault.TypedError

func (*KafkaClientImpl) PushEvent

func (this *KafkaClientImpl) PushEvent(
	message interface{},
) fault.TypedError

func (*KafkaClientImpl) PushText

func (this *KafkaClientImpl) PushText(message string) fault.TypedError

func (*KafkaClientImpl) SetAcceptMessageFunc

func (this *KafkaClientImpl) SetAcceptMessageFunc(acceptMessageFunc AcceptMessageFunc)

func (*KafkaClientImpl) Status

func (this *KafkaClientImpl) Status() (status string)

func (*KafkaClientImpl) Version

func (this *KafkaClientImpl) Version() (version string)

type KafkaConfig

type KafkaConfig struct {
	ConfigurationName string
	Enable            bool
	ClientID          string
	Brokers           string
	Version           string
	Topic             string
	Verbose           bool
	Partition         int32
	ProducerEnable    bool
	ConsumerEnable    bool
	Consumer          ConsumerConfiguration
	ActionName        string
}

type Metrics

type Metrics struct {
	ProduceMsgCount         uint64
	ConsumeMsgCount         uint64
	ProduceFailedMsgCount   uint64
	ProduceSuccessMsgCount  uint64
	HealthCheckFailedCount  uint64
	ConsumeFailedMsgCount   uint64
	ConsumeSuccessMsgCount  uint64
	HealthCheckCount        uint64
	HealthCheckSuccessCount uint64
}

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL