fixtures

package
v1.3.0 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jun 15, 2021 License: MIT Imports: 12 Imported by: 0

Documentation

Index

Constants

View Source
const DefaultTopic = "my-topic"

Variables

This section is empty.

Functions

func NewElasticRecord added in v0.5.1

func NewElasticRecord() (*models.ElasticRecord, int32)

func NewRecord

func NewRecord(ts time.Time) (*models.Record, int32, int32)

Types

type AsyncProducer

type AsyncProducer struct {
	// contains filtered or unexported fields
}

func (*AsyncProducer) Close

func (this *AsyncProducer) Close() error

func (*AsyncProducer) GetHeader

func (this *AsyncProducer) GetHeader(event KafkaEvent) ([]byte, error)

func (*AsyncProducer) GetSuccesses

func (this *AsyncProducer) GetSuccesses() <-chan *sarama.ProducerMessage

func (*AsyncProducer) Publish

func (this *AsyncProducer) Publish(event KafkaEvent) error

func (*AsyncProducer) Start

func (this *AsyncProducer) Start()

type FixtureRecord

type FixtureRecord struct {
	Id        int32
	Value     int32
	Timestamp int64 `json:"@timestamp"`
}

func NewEmptyFixtureRecord added in v1.2.0

func NewEmptyFixtureRecord() *FixtureRecord

func NewFixtureRecord

func NewFixtureRecord() *FixtureRecord

func (*FixtureRecord) Schema

func (r *FixtureRecord) Schema() string

func (*FixtureRecord) ToAvroSerialization

func (r *FixtureRecord) ToAvroSerialization() ([]byte, error)

func (*FixtureRecord) Topic

func (r *FixtureRecord) Topic() string

type FixtureSchemaRegistry

type FixtureSchemaRegistry struct {
	*schema_registry.SchemaRegistry
}

func (*FixtureSchemaRegistry) RegisterOrGetSchemaId

func (this *FixtureSchemaRegistry) RegisterOrGetSchemaId(event KafkaEvent) (int, error)

type KafkaEvent

type KafkaEvent interface {
	Topic() string
	Schema() string
	ToAvroSerialization() ([]byte, error) // should return the avro serialized object
}

type Producer

type Producer interface {
	Publish(KafkaEvent) error
	Start()
	Close() error
	GetSuccesses() <-chan *sarama.ProducerMessage
}

func NewProducer

func NewProducer(kafkaAddr string, config *sarama.Config, schemaRegistry *schema_registry.SchemaRegistry) (Producer, error)

Config example :

config.Producer.Return.Successes = true
config.Producer.MaxMessageBytes = 20 * 1024 * 1024 // 20mb
config.Producer.Flush.Frequency = 500 * time.Millisecond
config.Version = sarama.V2_3_0_0

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL