prototube

Version: v0.0.0-...-550848e
Published: Apr 16, 2024 License: Apache-2.0 Imports: 8 Imported by: 0

README

Prototube

Package prototube implements an API to publish strongly-typed events into Kafka.

Applications publish events to a specific topic. Each topic is associated with a schema that defines the structure of its events. Events that do not conform to the schema are rejected.

Internally prototube encodes the events using the following format:

<Magic Number> <Header> <Event>
  • Magic Number: 0x50, 0x42, 0x54, 0x42
  • Header: Protobuf-encoded structure that contains the metadata of the event (e.g., timestamp and UUID)
  • Event: Protobuf-encoded event
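
The layout above can be sketched with the standard library alone. This is a minimal illustration, not prototube's actual encoder: the names magic, encodeFrame, and hasMagic are hypothetical, the header and event are placeholder bytes standing in for Protobuf-encoded payloads, and the uvarint length prefixes are an assumption, since the format description does not say how the header and event are delimited.

```go
package main

import (
	"bytes"
	"encoding/binary"
	"fmt"
)

// magic is the 4-byte magic number from the format description.
var magic = []byte{0x50, 0x42, 0x54, 0x42}

// encodeFrame concatenates <Magic Number> <Header> <Event>.
// ASSUMPTION: each part is prefixed with its length as a uvarint so a
// reader can find the boundary. The source does not state how prototube
// delimits the header and the event.
func encodeFrame(header, event []byte) []byte {
	var buf bytes.Buffer
	var lenBuf [binary.MaxVarintLen64]byte

	buf.Write(magic)

	n := binary.PutUvarint(lenBuf[:], uint64(len(header)))
	buf.Write(lenBuf[:n])
	buf.Write(header)

	n = binary.PutUvarint(lenBuf[:], uint64(len(event)))
	buf.Write(lenBuf[:n])
	buf.Write(event)

	return buf.Bytes()
}

// hasMagic reports whether frame starts with the magic number.
func hasMagic(frame []byte) bool {
	return bytes.HasPrefix(frame, magic)
}

func main() {
	// Placeholder payloads; real ones would be Protobuf-encoded messages.
	frame := encodeFrame([]byte{0x0a}, []byte{0x0b, 0x0c})
	fmt.Printf("frame: % x\n", frame)
	fmt.Println("has magic:", hasMagic(frame))
}
```

A consumer could use a check like hasMagic to skip records on the topic that were not written in this format before attempting to decode the header.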

How to use

Prerequisite

Follow the Kafka Quickstart guide to install and start Kafka locally.

Quickstart

Build and run the example application to produce random messages to the local Kafka instance.

$ cd examples
$ go run main.go

Compile a proto file

The following command generates example.pb.go from example.proto in package main.

$ cd examples
$ protoc --go_out=. idl/example.proto

Use prototube producer

See the example application in the examples directory for reference.

Code snippet:

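producer, err := prototube.NewWithConfig("testTopic", &prototube.ProducerConfig{
	KafkaBootstrapBrokerList: []string{"localhost:9092"},
})
if err != nil {
	log.Fatalf("failed to create producer: %v", err)
}
// Stop the producer once all events have been emitted.
defer producer.Stop()

// Emit returns an error, so check it rather than discarding it.
err = producer.Emit(&ExamplePrototubeMessage{
	Int32Field:  int32(rand.Intn(10000)),
	Int64Field:  rand.Int63n(int64(10000) + 10000),
	DoubleField: rand.Float64(),
})
if err != nil {
	log.Printf("failed to emit event: %v", err)
}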

Documentation

Constants

This section is empty.

Variables

This section is empty.

Functions

This section is empty.

Types

type Producer

type Producer struct {
	// contains filtered or unexported fields
}

Producer is an event log producer that pushes events into a Kafka topic.

func New

func New(bootstrapServer string, topic string) (*Producer, error)

New creates a new producer for the specified topic using the given bootstrap server.

func NewWithConfig

func NewWithConfig(topic string, producerConfig *ProducerConfig) (*Producer, error)

NewWithConfig creates a new producer for the specified topic using the given producer configuration.

func (*Producer) Emit

func (p *Producer) Emit(msg proto.Message) error

Emit publishes a new Protobuf-encoded event to the topic, stamped with the current timestamp.

func (*Producer) Stop

func (p *Producer) Stop() error

Stop stops the producer.

type ProducerConfig

type ProducerConfig struct {

	// Kafka broker list
	KafkaBootstrapBrokerList []string

	// Kafka producer config for sarama
	KafkaProducerConfig *sarama.Config
}

ProducerConfig holds the configuration used to create a Sarama Kafka producer.

type PrototubeMessageHeader

type PrototubeMessageHeader struct {
	// Wire version of the Prototube message
	Version int32 `protobuf:"varint,1,opt,name=version,proto3" json:"version,omitempty"`
	// Schema version of the message
	SchemaVersion int32 `protobuf:"varint,2,opt,name=schema_version,json=schemaVersion,proto3" json:"schema_version,omitempty"`
	// Time when the message is produced since UNIX epoch
	Ts int64 `protobuf:"varint,3,opt,name=ts,proto3" json:"ts,omitempty"`
	// 16 bytes UUID
	Uuid                 []byte   `protobuf:"bytes,4,opt,name=uuid,proto3" json:"uuid,omitempty"`
	XXX_NoUnkeyedLiteral struct{} `json:"-"`
	XXX_unrecognized     []byte   `json:"-"`
	XXX_sizecache        int32    `json:"-"`
}

Server-side message decoration.

func (*PrototubeMessageHeader) Descriptor

func (*PrototubeMessageHeader) Descriptor() ([]byte, []int)

func (*PrototubeMessageHeader) GetSchemaVersion

func (m *PrototubeMessageHeader) GetSchemaVersion() int32

func (*PrototubeMessageHeader) GetTs

func (m *PrototubeMessageHeader) GetTs() int64

func (*PrototubeMessageHeader) GetUuid

func (m *PrototubeMessageHeader) GetUuid() []byte

func (*PrototubeMessageHeader) GetVersion

func (m *PrototubeMessageHeader) GetVersion() int32

func (*PrototubeMessageHeader) ProtoMessage

func (*PrototubeMessageHeader) ProtoMessage()

func (*PrototubeMessageHeader) Reset

func (m *PrototubeMessageHeader) Reset()

func (*PrototubeMessageHeader) String

func (m *PrototubeMessageHeader) String() string

func (*PrototubeMessageHeader) XXX_DiscardUnknown

func (m *PrototubeMessageHeader) XXX_DiscardUnknown()

func (*PrototubeMessageHeader) XXX_Marshal

func (m *PrototubeMessageHeader) XXX_Marshal(b []byte, deterministic bool) ([]byte, error)

func (*PrototubeMessageHeader) XXX_Merge

func (m *PrototubeMessageHeader) XXX_Merge(src proto.Message)

func (*PrototubeMessageHeader) XXX_Size

func (m *PrototubeMessageHeader) XXX_Size() int

func (*PrototubeMessageHeader) XXX_Unmarshal

func (m *PrototubeMessageHeader) XXX_Unmarshal(b []byte) error
