kafka

package
v0.0.6 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Feb 12, 2024 License: Apache-2.0 Imports: 19 Imported by: 0

Documentation

Index

Constants

This section is empty.

Variables

View Source
var NoOpHandler = &noOpHandler{}

Functions

func NewProducer

func NewProducer(ctx context.Context, conf Config, logger *log.Logger) (*kafkaProducer, error)

the caller can cancel the producer's context to initiate shutdown.

Types

type AvroEncoder

type AvroEncoder struct {
	SchemaID int
	Content  []byte
}

AvroEncoder encodes schemaId and Avro message.

func (*AvroEncoder) Encode

func (a *AvroEncoder) Encode() ([]byte, error)

Notice: the Confluent schema registry has special requirements for the Avro serialization rules, not only need to serialize the specific content, but also attach the Schema ID and Magic Byte. Ref: https://docs.confluent.io/current/schema-registry/serializer-formatter.html#wire-format

func (*AvroEncoder) Length

func (a *AvroEncoder) Length() int

Length of schemaId and Content.

type CachedSchemaRegistryClient

type CachedSchemaRegistryClient struct {
	SchemaRegistryClient *SchemaRegistryClient
	// contains filtered or unexported fields
}

CachedSchemaRegistryClient is a schema registry client that will cache some data to improve performance

func NewCachedSchemaRegistryClient

func NewCachedSchemaRegistryClient(connect []string) *CachedSchemaRegistryClient

func NewCachedSchemaRegistryClientWithRetries

func NewCachedSchemaRegistryClientWithRetries(connect []string, retries int) *CachedSchemaRegistryClient

func (*CachedSchemaRegistryClient) CreateSubject

func (client *CachedSchemaRegistryClient) CreateSubject(subject string, codec *goavro.Codec) (int, error)

CreateSubject will return and cache the id with the given codec

func (*CachedSchemaRegistryClient) DeleteSubject

func (client *CachedSchemaRegistryClient) DeleteSubject(subject string) error

DeleteSubject deletes the subject, should only be used in development

func (*CachedSchemaRegistryClient) DeleteVersion

func (client *CachedSchemaRegistryClient) DeleteVersion(subject string, version int) error

DeleteVersion deletes the a specific version of a subject, should only be used in development.

func (*CachedSchemaRegistryClient) GetLatestSchema

func (client *CachedSchemaRegistryClient) GetLatestSchema(subject string) (*goavro.Codec, error)

GetLatestSchema returns the highest version schema for a subject

func (*CachedSchemaRegistryClient) GetSchema

func (client *CachedSchemaRegistryClient) GetSchema(id int) (*goavro.Codec, error)

GetSchema will return and cache the codec with the given id

func (*CachedSchemaRegistryClient) GetSchemaByVersion

func (client *CachedSchemaRegistryClient) GetSchemaByVersion(subject string, version int) (*goavro.Codec, error)

GetSchemaByVersion returns the codec for a specific version of a subject

func (*CachedSchemaRegistryClient) GetSubjects

func (client *CachedSchemaRegistryClient) GetSubjects() ([]string, error)

GetSubjects returns a list of subjects

func (*CachedSchemaRegistryClient) GetVersions

func (client *CachedSchemaRegistryClient) GetVersions(subject string) ([]int, error)

GetVersions returns a list of all versions of a subject

func (*CachedSchemaRegistryClient) IsSchemaRegistered

func (client *CachedSchemaRegistryClient) IsSchemaRegistered(subject string, codec *goavro.Codec) (int, error)

IsSchemaRegistered checks if a specific codec is already registered to a subject

type Config

type Config struct {
	Brokers  string `envconfig:"KAFKA_BROKERS"`
	Version  string `envconfig:"KAFKA_VERSION"`
	Verbose  bool   `envconfig:"KAFKA_VERBOSE"`
	ClientID string `envconfig:"KAFKA_CLIENT_ID"`
	Topics   string `envconfig:"KAFKA_TOPICS"`

	TLSEnabled bool   `envconfig:"KAFKA_TLS_ENABLED"`
	TLSKey     string `envconfig:"KAFKA_TLS_KEY"`
	TLSCert    string `envconfig:"KAFKA_TLS_CERT"`
	CACerts    string `envconfig:"KAFKA_CA_CERTS"`

	// Consumer specific parameters
	Group             string        `envconfig:"KAFKA_GROUP"`
	RebalanceStrategy string        `envconfig:"KAFKA_REBALANCE_STRATEGY"`
	RebalanceTimeout  time.Duration `envconfig:"KAFKA_REBALANCE_TIMEOUT"`
	InitOffsets       string        `envconfig:"KAFKA_INIT_OFFSETS"`
	CommitInterval    time.Duration `envconfig:"KAFKA_COMMIT_INTERVAL"`

	// Producer specific parameters
	FlushInterval time.Duration `envconfig:"KAFKA_FLUSH_INTERVAL"`

	// Schema Registry server
	SchemaRegistryServers string `envconfig:"KAFKA_SCHEMA_REGISTRY_SERVERS"`

	IsolationLevel string `envconfig:"KAFKA_ISOLATION_LEVEL"`
}

simple Kafka config abstraction; can be populated from env vars via FromEnv() or fields can applied to CLI flags by the caller.

func FromEnv

func FromEnv() (Config, error)

hydrate kafka.Config using environment variables

func NewKafkaConfig

func NewKafkaConfig() Config

returns a new kafka.Config with reasonable defaults for some values

type Consumer

type Consumer interface {
	// caller should run the returned function in a goroutine, and consume
	// the returned error channel until it's closed at shutdown.
	Background() (func(), chan error)
}

func NewConsumer

func NewConsumer(ctx context.Context, conf Config, handler Handler, logger *log.Logger) (Consumer, error)

caller should cancel the supplied context when a graceful consumer shutdown is desired

type ConsumerMessage

type ConsumerMessage sarama.ConsumerMessage

alias these to abstract the Sarama-specific message type from end users

type Error

type Error struct {
	ErrorCode int    `json:"error_code"`
	Message   string `json:"message"`
}

Error holds more detailed information about errors coming back from schema registry

func (*Error) Error

func (e *Error) Error() string

type Handler

type Handler interface {
	Handle(*Message) error
}

services consuming from Kafka should meet this contract

type Message

type Message struct {
	SchemaId            int
	Topic               string
	Partition           int32
	Offset              int64
	Key                 string
	Value               string
	HighWaterMarkOffset int64
}

type Producer

type Producer interface {
	// caller should run the returned function in a goroutine, and consume
	// the returned error channel until it's closed at shutdown.
	Background() (func(), chan error)

	// user-facing event emit API
	Send(ProducerMessage) error
}

type ProducerMessage

type ProducerMessage struct {
	Topic string
	Key   []byte
	Value []byte
}

abstracts kafka.Producer message type

type SchemaRegistryClient

type SchemaRegistryClient struct {
	SchemaRegistryConnect []string
	// contains filtered or unexported fields
}

SchemaRegistryClient is a basic http client to interact with schema registry

func NewSchemaRegistryClient

func NewSchemaRegistryClient(connect []string) *SchemaRegistryClient

NewSchemaRegistryClient creates a client to talk with the schema registry at the connect string By default it will retry failed requests (5XX responses and http errors) len(connect) number of times

func NewSchemaRegistryClientWithRetries

func NewSchemaRegistryClientWithRetries(connect []string, retries int) *SchemaRegistryClient

NewSchemaRegistryClientWithRetries creates an http client with a configurable amount of retries on 5XX responses

func (*SchemaRegistryClient) CreateSubject

func (client *SchemaRegistryClient) CreateSubject(subject string, codec *goavro.Codec) (int, error)

CreateSubject adds a schema to the subject

func (*SchemaRegistryClient) DeleteSubject

func (client *SchemaRegistryClient) DeleteSubject(subject string) error

DeleteSubject deletes a subject. It should only be used in development

func (*SchemaRegistryClient) DeleteVersion

func (client *SchemaRegistryClient) DeleteVersion(subject string, version int) error

DeleteVersion deletes a subject. It should only be used in development

func (*SchemaRegistryClient) GetLatestSchema

func (client *SchemaRegistryClient) GetLatestSchema(subject string) (*goavro.Codec, error)

GetLatestSchema returns a goavro.Codec for the latest version of the subject

func (*SchemaRegistryClient) GetSchema

func (client *SchemaRegistryClient) GetSchema(id int) (*goavro.Codec, error)

GetSchema returns a goavro.Codec by unique id

func (*SchemaRegistryClient) GetSchemaByVersion

func (client *SchemaRegistryClient) GetSchemaByVersion(subject string, version int) (*goavro.Codec, error)

GetSchemaByVersion returns a goavro.Codec for the version of the subject

func (*SchemaRegistryClient) GetSubjects

func (client *SchemaRegistryClient) GetSubjects() ([]string, error)

GetSubjects returns a list of all subjects in the schema registry

func (*SchemaRegistryClient) GetVersions

func (client *SchemaRegistryClient) GetVersions(subject string) ([]int, error)

GetVersions returns a list of the versions of a subject

func (*SchemaRegistryClient) IsSchemaRegistered

func (client *SchemaRegistryClient) IsSchemaRegistered(subject string, codec *goavro.Codec) (int, error)

IsSchemaRegistered tests if the schema is registered, if so it returns the unique id of that schema

type SchemaRegistryClientInterface

type SchemaRegistryClientInterface interface {
	GetSchema(int) (*goavro.Codec, error)
	GetSubjects() ([]string, error)
	GetVersions(string) ([]int, error)
	GetSchemaByVersion(string, int) (*goavro.Codec, error)
	GetLatestSchema(string) (*goavro.Codec, error)
	CreateSubject(string, *goavro.Codec) (int, error)
	IsSchemaRegistered(string, *goavro.Codec) (int, error)
	DeleteSubject(string) error
	DeleteVersion(string, int) error
}

SchemaRegistryClientInterface defines the api for all clients interfacing with schema registry

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL