kafka

package module
v1.0.0 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Sep 27, 2022 License: MIT Imports: 14 Imported by: 0

README

go-queue-kafka

A proof of concept for an in-memory/persistent queue that uses Kafka (incredibly similar to durostore)

Commands

To create a topic

kafka-topics.sh --create --zookeeper zookeeper:2181 --replication-factor 1 --partitions 2 --topic queue_in

To alter an existing topic

kafka-topics.sh --zookeeper zookeeper:2181 --alter --topic queue_in --partitions 2

Documentation

Index

Constants

View Source
const (
	DefaultTopicIn           string        = "queue.in"
	DefaultTopicOut          string        = "queue.out"
	DefaultQueueSize         int           = 10000
	DefaultCommitRate        time.Duration = time.Minute
	DefaultRebalanceStrategy string        = "roundrobin"
)
View Source
const (
	EnvNameKafkaBrokers      string = "KAFKA_BROKERS"
	EnvNameKafkaClientId     string = "KAFKA_CLIENT_ID"
	EnvNameKafkaGroupId      string = "KAFKA_GROUP_ID"
	EnvNameKafkaEnableLog    string = "KAFKA_ENABLE_LOG"
	EnvNameKafkaQueueSize    string = "KAFKA_QUEUE_SIZE"
	EnvNameKafkaTopicIn      string = "KAFKA_TOPIC_IN"
	EnvNameKafkaTopicOut     string = "KAFKA_TOPIC_OUT"
	EnvNameCommitRate        string = "KAFKA_COMMIT_RATE"
	EnvNameRebalanceStrategy string = "KAFKA_REBALANCE_STRATEGY"
)
View Source
const (
	UnsupportedTypef              string = "unsupported type: %T"
	NoBrokersConfigured           string = "no brokers configured"
	NoClientIdConfigured          string = "no client id configured"
	NoGroupIdConfigured           string = "no group id configured"
	CommitRateLessThanOrEqualZero string = "commit rate less than or equal to zero"
)

Variables

View Source
var (
	ErrNoBrokersConfigured           = errors.New(NoBrokersConfigured)
	ErrNoClientIdConfigured          = errors.New(NoClientIdConfigured)
	ErrNoGroupIdConfigured           = errors.New(NoGroupIdConfigured)
	ErrCommitRateLessThanOrEqualZero = errors.New(CommitRateLessThanOrEqualZero)
)
View Source
var DefaultBrokers = []string{"localhost:9092"}

Functions

Types

type Configuration

type Configuration struct {
	Brokers           []string      `json:"brokers"`
	ClientId          string        `json:"client_id"`
	GroupId           string        `json:"group_id"`
	EnableLog         bool          `json:"enable_log"`
	QueueSize         int           `json:"queue_size"`
	TopicIn           string        `json:"topic_in"`
	TopicOut          string        `json:"topic_out"`
	CommitRate        time.Duration `json:"commit_rate"`
	RebalanceStrategy string        `json:"rebalance_strategy"`
}

func (*Configuration) Default

func (c *Configuration) Default()

func (*Configuration) FromEnv

func (c *Configuration) FromEnv(envs map[string]string)

func (*Configuration) ToKafka

func (c *Configuration) ToKafka() ([]string, *sarama.Config)

func (*Configuration) Validate

func (c *Configuration) Validate() error

type ErrorHandlerFx

type ErrorHandlerFx func(error)

type Example

type Example struct {
	*pb.Example
}

func (*Example) MarshalBinary

func (e *Example) MarshalBinary() ([]byte, error)

func (*Example) UnmarshalBinary

func (e *Example) UnmarshalBinary(bytes []byte) error

type Owner

type Owner interface {
	Initialize(config *Configuration) (err error)
	Shutdown()
}

type Wrapper

type Wrapper struct {
	*pb.Wrapper
}

func WrapperConvertMultiple

func WrapperConvertMultiple(items []interface{}) []*Wrapper

func WrapperConvertSingle

func WrapperConvertSingle(item interface{}) *Wrapper

func (*Wrapper) FromWrapper

func (w *Wrapper) FromWrapper() (interface{}, error)

func (*Wrapper) MarshalBinary

func (w *Wrapper) MarshalBinary() ([]byte, error)

func (*Wrapper) ToWrapper

func (w *Wrapper) ToWrapper(message proto.Message) error

func (*Wrapper) UnmarshalBinary

func (w *Wrapper) UnmarshalBinary(bytes []byte) error

Directories

Path Synopsis

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL