consume

package
v5.0.6 Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Mar 14, 2024 License: Apache-2.0 Imports: 23 Imported by: 0

Documentation

Index

Constants

View Source
const (
	HEX    = "hex"
	BASE64 = "base64"
	NONE   = "none"
)
View Source
const (
	ErrOffset = math.MinInt64
)

Variables

This section is empty.

Functions

func ConvertToEpocUnixMillis

func ConvertToEpocUnixMillis(timestamp string) (int64, error)

Converts a string to an epoch Unix timestamp. The string might be null; in that case, the flag is considered absent and the value -1 is returned.

Types

type AvroMessageDeserializer

type AvroMessageDeserializer struct {
	// contains filtered or unexported fields
}

func CreateAvroMessageDeserializer

func CreateAvroMessageDeserializer(topic string, avroSchemaRegistry string, jsonCodec avro.JSONCodec) (AvroMessageDeserializer, error)

func (AvroMessageDeserializer) CanDeserialize

func (deserializer AvroMessageDeserializer) CanDeserialize(topic string) (bool, error)

func (AvroMessageDeserializer) Deserialize

func (deserializer AvroMessageDeserializer) Deserialize(rawMsg *sarama.ConsumerMessage, flags Flags) error

type CachingSchemaRegistry

type CachingSchemaRegistry struct {
	// contains filtered or unexported fields
}

func CreateCachingSchemaRegistry

func CreateCachingSchemaRegistry(avroSchemaRegistry string) (*CachingSchemaRegistry, error)

func (*CachingSchemaRegistry) GetSchemaByID

func (registry *CachingSchemaRegistry) GetSchemaByID(id int) (string, error)

func (*CachingSchemaRegistry) Subjects

func (registry *CachingSchemaRegistry) Subjects() ([]string, error)

type ConsumedMessage

type ConsumedMessage struct {
	Partition int32
	Offset    int64
	Key       []byte
	Value     []byte
	Timestamp *time.Time
}

type Consumer

type Consumer interface {
	Start(ctx context.Context, flags Flags, messages chan<- *sarama.ConsumerMessage, stopConsumers <-chan bool) error
	Wait() error
	Close() error
}

type DefaultMessageDeserializer

type DefaultMessageDeserializer struct {
}

func (DefaultMessageDeserializer) CanDeserialize

func (deserializer DefaultMessageDeserializer) CanDeserialize(_ string) (bool, error)

func (DefaultMessageDeserializer) Deserialize

func (deserializer DefaultMessageDeserializer) Deserialize(rawMsg *sarama.ConsumerMessage, flags Flags) error

type Flags

type Flags struct {
	PrintPartitions  bool
	PrintKeys        bool
	PrintTimestamps  bool
	PrintAvroSchema  bool
	PrintHeaders     bool
	OutputFormat     string
	Separator        string
	Group            string
	Partitions       []int
	Offsets          []string
	FromBeginning    bool
	FromTimestamp    string
	ToTimestamp      string
	Tail             int
	Exit             bool
	MaxMessages      int64
	EncodeValue      string
	EncodeKey        string
	ProtoFiles       []string
	ProtoImportPaths []string
	ProtosetFiles    []string
	KeyProtoType     string
	ValueProtoType   string
	IsolationLevel   string
}

type GroupConsumer

type GroupConsumer struct {
	// contains filtered or unexported fields
}

func CreateGroupConsumer

func CreateGroupConsumer(client *sarama.Client, topic string, group string) (*GroupConsumer, error)

func (*GroupConsumer) Close

func (c *GroupConsumer) Close() error

func (*GroupConsumer) Start

func (c *GroupConsumer) Start(ctx context.Context, flags Flags, messages chan<- *sarama.ConsumerMessage, stopConsumers <-chan bool) error

func (*GroupConsumer) Wait

func (c *GroupConsumer) Wait() error

type MessageDeserializer

type MessageDeserializer interface {
	CanDeserialize(topic string) (bool, error)
	Deserialize(msg *sarama.ConsumerMessage, flags Flags) error
}

type MessageDeserializerChain

type MessageDeserializerChain []MessageDeserializer

func (MessageDeserializerChain) CanDeserialize

func (deserializer MessageDeserializerChain) CanDeserialize(topic string) (bool, error)

func (MessageDeserializerChain) Deserialize

func (deserializer MessageDeserializerChain) Deserialize(msg *sarama.ConsumerMessage, flags Flags) error

type Operation

type Operation struct {
}

func (*Operation) Consume

func (operation *Operation) Consume(topic string, flags Flags) error

type PartitionConsumer

type PartitionConsumer struct {
	// contains filtered or unexported fields
}

func CreatePartitionConsumer

func CreatePartitionConsumer(client *sarama.Client, topic string, partitions []int) (*PartitionConsumer, error)

func (*PartitionConsumer) Close

func (c *PartitionConsumer) Close() error

func (*PartitionConsumer) Start

func (c *PartitionConsumer) Start(ctx context.Context, flags Flags, messages chan<- *sarama.ConsumerMessage, stopConsumers <-chan bool) error

func (*PartitionConsumer) Wait

func (c *PartitionConsumer) Wait() error

type ProtobufMessageDeserializer

type ProtobufMessageDeserializer struct {
	// contains filtered or unexported fields
}

func CreateProtobufMessageDeserializer

func CreateProtobufMessageDeserializer(context protobuf.SearchContext, keyType, valueType string) (*ProtobufMessageDeserializer, error)

func (ProtobufMessageDeserializer) CanDeserialize

func (deserializer ProtobufMessageDeserializer) CanDeserialize(_ string) (bool, error)

func (ProtobufMessageDeserializer) Deserialize

func (deserializer ProtobufMessageDeserializer) Deserialize(rawMsg *sarama.ConsumerMessage, flags Flags) error

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL