maidenlanedkafka

package
v1.0.0 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jan 23, 2021 License: Apache-2.0 Imports: 22 Imported by: 0

Documentation

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

func KafkaCommonCobraInit

func KafkaCommonCobraInit(cmd *cobra.Command, kconf *KafkaCommonConf)

KafkaCommonCobraInit commandline common parameter init for Kafka

func KafkaValidateConf

func KafkaValidateConf(kconf *KafkaCommonConf) (err error)

KafkaValidateConf validates supplied configuration

Types

type KafkaBridge

type KafkaBridge struct {
	// contains filtered or unexported fields
}

KafkaBridge receives messages from Kafka and dispatches them to go-ethereum over JSON/RPC

func NewKafkaBridge

func NewKafkaBridge(printYAML *bool) *KafkaBridge

NewKafkaBridge creates a new KafkaBridge

func (*KafkaBridge) CobraInit

func (k *KafkaBridge) CobraInit() (cmd *cobra.Command)

CobraInit retruns a cobra command to configure this KafkaBridge

func (*KafkaBridge) Conf

func (k *KafkaBridge) Conf() *KafkaBridgeConf

Conf gets the config for this bridge

func (*KafkaBridge) ConsumerMessagesLoop

func (k *KafkaBridge) ConsumerMessagesLoop(consumer KafkaConsumer, producer KafkaProducer, wg *sync.WaitGroup)

ConsumerMessagesLoop - goroutine to process messages

func (*KafkaBridge) ProducerErrorLoop

func (k *KafkaBridge) ProducerErrorLoop(consumer KafkaConsumer, producer KafkaProducer, wg *sync.WaitGroup)

ProducerErrorLoop - goroutine to process producer errors

func (*KafkaBridge) ProducerSuccessLoop

func (k *KafkaBridge) ProducerSuccessLoop(consumer KafkaConsumer, producer KafkaProducer, wg *sync.WaitGroup)

ProducerSuccessLoop - goroutine to process producer successes

func (*KafkaBridge) SetConf

func (k *KafkaBridge) SetConf(conf *KafkaBridgeConf)

SetConf sets the config for this bridge

func (*KafkaBridge) Start

func (k *KafkaBridge) Start() (err error)

Start kicks off the bridge

func (*KafkaBridge) ValidateConf

func (k *KafkaBridge) ValidateConf() (err error)

ValidateConf validates the configuration

type KafkaBridgeConf

type KafkaBridgeConf struct {
	Kafka       KafkaCommonConf `json:"kafka"`
	MaxInFlight int             `json:"maxInFlight"`
	maidenlanedtx.TxnProcessorConf
	maidenlanedeth.RPCConf
}

KafkaBridgeConf defines the YAML config structure for a Kafka bridge instance

type KafkaClient

type KafkaClient interface {
	NewProducer(KafkaCommon) (KafkaProducer, error)
	NewConsumer(KafkaCommon) (KafkaConsumer, error)
	Brokers() []*sarama.Broker
}

KafkaClient is the kafka client

type KafkaCommon

type KafkaCommon interface {
	ValidateConf() error
	CobraInit(cmd *cobra.Command)
	Start() error
	Conf() *KafkaCommonConf
	Producer() KafkaProducer
}

KafkaCommon is the base interface for bridges that interact with Kafka

func NewKafkaCommon

func NewKafkaCommon(kf KafkaFactory, conf *KafkaCommonConf, kafkaGoRoutines KafkaGoRoutines) (k KafkaCommon)

NewKafkaCommon constructs a new KafkaCommon instance

type KafkaCommonConf

type KafkaCommonConf struct {
	Brokers       []string `json:"brokers"`
	ClientID      string   `json:"clientID"`
	ConsumerGroup string   `json:"consumerGroup"`
	TopicIn       string   `json:"topicIn"`
	TopicOut      string   `json:"topicOut"`
	ProducerFlush struct {
		Frequency int `json:"frequency"`
		Messages  int `json:"messages"`
		Bytes     int `json:"bytes"`
	} `json:"producerFlush"`
	SASL struct {
		Username string
		Password string
	} `json:"sasl"`
	TLS maidenlanedutils.TLSConfig `json:"tls"`
}

KafkaCommonConf - Common configuration for Kafka

type KafkaConsumer

type KafkaConsumer interface {
	Close() error
	Messages() <-chan *sarama.ConsumerMessage
	Errors() <-chan error
	MarkOffset(*sarama.ConsumerMessage, string)
}

KafkaConsumer provides the interface passed from KafkaCommon to consume messages

type KafkaFactory

type KafkaFactory interface {
	NewClient(KafkaCommon, *sarama.Config) (KafkaClient, error)
}

KafkaFactory builds new clients

type KafkaGoRoutines

type KafkaGoRoutines interface {
	ConsumerMessagesLoop(consumer KafkaConsumer, producer KafkaProducer, wg *sync.WaitGroup)
	ProducerErrorLoop(consumer KafkaConsumer, producer KafkaProducer, wg *sync.WaitGroup)
	ProducerSuccessLoop(consumer KafkaConsumer, producer KafkaProducer, wg *sync.WaitGroup)
}

KafkaGoRoutines defines goroutines for processing Kafka messages from KafkaCommon

type KafkaProducer

type KafkaProducer interface {
	AsyncClose()
	Input() chan<- *sarama.ProducerMessage
	Successes() <-chan *sarama.ProducerMessage
	Errors() <-chan *sarama.ProducerError
}

KafkaProducer provides the interface passed from KafkaCommon to produce messages (subset of sarama)

type MockKafkaConsumer

type MockKafkaConsumer struct {
	MockMessages       chan *sarama.ConsumerMessage
	MockErrors         chan error
	OffsetsByPartition map[int32]int64
}

MockKafkaConsumer - mock

func (*MockKafkaConsumer) Close

func (c *MockKafkaConsumer) Close() error

Close - mock

func (*MockKafkaConsumer) Errors

func (c *MockKafkaConsumer) Errors() <-chan error

Errors - mock

func (*MockKafkaConsumer) MarkOffset

func (c *MockKafkaConsumer) MarkOffset(msg *sarama.ConsumerMessage, metadata string)

MarkOffset - mock

func (*MockKafkaConsumer) Messages

func (c *MockKafkaConsumer) Messages() <-chan *sarama.ConsumerMessage

Messages - mock

type MockKafkaFactory

type MockKafkaFactory struct {
	ClientConf         *sarama.Config
	ErrorOnNewClient   error
	ErrorOnNewProducer error
	ErrorOnNewConsumer error
	Producer           *MockKafkaProducer
	Consumer           *MockKafkaConsumer
}

MockKafkaFactory - mock

func NewErrorMockKafkaFactory

func NewErrorMockKafkaFactory(errorOnNewClient error, errorOnNewConsumer error, errorOnNewProducer error) *MockKafkaFactory

NewErrorMockKafkaFactory - mock

func NewMockKafkaFactory

func NewMockKafkaFactory() *MockKafkaFactory

NewMockKafkaFactory - mock

func (*MockKafkaFactory) Brokers

func (f *MockKafkaFactory) Brokers() []*sarama.Broker

Brokers - mock

func (*MockKafkaFactory) NewClient

func (f *MockKafkaFactory) NewClient(k KafkaCommon, clientConf *sarama.Config) (KafkaClient, error)

NewClient - mock

func (*MockKafkaFactory) NewConsumer

func (f *MockKafkaFactory) NewConsumer(k KafkaCommon) (KafkaConsumer, error)

NewConsumer - mock

func (*MockKafkaFactory) NewProducer

func (f *MockKafkaFactory) NewProducer(k KafkaCommon) (KafkaProducer, error)

NewProducer - mock

type MockKafkaProducer

type MockKafkaProducer struct {
	MockInput     chan *sarama.ProducerMessage
	MockSuccesses chan *sarama.ProducerMessage
	MockErrors    chan *sarama.ProducerError
	Closed        bool
	CloseSync     sync.Mutex
}

MockKafkaProducer - mock

func (*MockKafkaProducer) AsyncClose

func (p *MockKafkaProducer) AsyncClose()

AsyncClose - mock

func (*MockKafkaProducer) Errors

func (p *MockKafkaProducer) Errors() <-chan *sarama.ProducerError

Errors - mock

func (*MockKafkaProducer) Input

func (p *MockKafkaProducer) Input() chan<- *sarama.ProducerMessage

Input - mock

func (*MockKafkaProducer) Successes

func (p *MockKafkaProducer) Successes() <-chan *sarama.ProducerMessage

Successes - mock

type SaramaKafkaFactory

type SaramaKafkaFactory struct{}

SaramaKafkaFactory - uses sarama

func (*SaramaKafkaFactory) NewClient

func (f *SaramaKafkaFactory) NewClient(k KafkaCommon, clientConf *sarama.Config) (c KafkaClient, err error)

NewClient - returns a new client

Directories

Path Synopsis
Package mock_sarama is a generated GoMock package.
Package mock_sarama is a generated GoMock package.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL