kinesis

Version: v0.0.0-...-92da4da
Published: Apr 12, 2023 License: MIT Imports: 21 Imported by: 0

README

A Kinesis Pub/Sub for Watermill

This is a Pub/Sub backend for Watermill, the popular Go library:

Watermill is a Go library for working efficiently with message streams. It is intended for building event driven applications, enabling event sourcing, RPC over messages, sagas and basically whatever else comes to your mind. You can use conventional pub/sub implementations like Kafka or RabbitMQ, but also HTTP or MySQL binlog if that fits your use case.

Use with caution: this is alpha-quality code for now. Feedback and contributions are greatly appreciated.

Here's how to build a Subscriber:

import (
  "os"
  "os/signal"
  "syscall"

  "github.com/ThreeDotsLabs/watermill"
  kinesis "github.com/dvictor/watermill-kinesis"
  "github.com/vmware/vmware-go-kcl-v2/clientlibrary/config"
)


  kclConfig := config.NewKinesisClientLibConfig(
    "samplestream",
    "unused", // topic will be overwritten in Subscribe
    "us-west-2",
    workerID, // workerID identifies this worker process
              // if the process is restarted, it would continue
              // consuming the same shards
  )
  // you can start many workers, depending on the number of shards
  // in your stream; as a starting point, try about 10 shards per worker
  
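  subscriber, err := kinesis.SubscriberBuilder(kclConfig).Build()
  if err != nil {
    panic(err) // replace with error handling that suits your application
  }
  // this is where the topic (Kinesis stream name) is provided
  messages, err := subscriber.Subscribe(ctx, topic)
  if err != nil {
    panic(err)
  }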

  // for each worker, you can start processing messages in parallel
  // in multiple goroutines. tune this number for your workload
  for i := 0; i < 10; i++ {
    go func() {
      for msg := range messages {
        logger.Info("received message", watermill.LogFields{
          "messageID": msg.UUID,
          "pkey":      msg.Metadata[kinesis.PartitionKeyKey],
          "shard":     msg.Metadata[kinesis.ShardIDKey],
          "payload":   string(msg.Payload),
        })
        // Every message must be acknowledged once processed, otherwise
        // the consumer deadlocks. If processing failed, call msg.Nack()
        // instead and the message is served again on the channel.
        // There is no limit to the number of retries, but you can use
        // msg.Metadata["retryCount"] to keep track of them.
        msg.Ack()
      }
    }()
  }
  // Note that you should use a WaitGroup to make sure all consumer
  // goroutines above finish their work for a clean exit.

  // Wait for a signal to stop the application.
  c := make(chan os.Signal, 1)
  signal.Notify(c, syscall.SIGINT, syscall.SIGTERM)
  <-c

For more detailed examples, check the example package. A Localstack Kinesis in Docker Compose example is also provided.

Also, check this introduction article

Documentation

Index

Constants

const ApproximateArrivalTimestampKey = "approximateArrivalTimestamp"
const PartitionKeyKey = "partitionKey"
const ShardIDKey = "shardID"
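
These constants are the metadata keys the README example reads from `msg.Metadata`. The sketch below shows one way to use them. It is only an illustration: a plain `map[string]string` stands in for Watermill's `message.Metadata`, the constants are copied locally so the code compiles without the module, and `origin`, `bumpRetry`, and `retryCountKey` are hypothetical names. That metadata survives redelivery after `Nack()` is assumed from the README comment, not verified.

```go
package main

import (
	"fmt"
	"strconv"
)

// Local copies of the package constants, so the sketch is self-contained.
const (
	PartitionKeyKey = "partitionKey"
	ShardIDKey      = "shardID"
)

// retryCountKey is the ad-hoc key mentioned in the README comment.
// It is not a constant exported by the package.
const retryCountKey = "retryCount"

// origin formats where a record came from, based on its metadata.
func origin(md map[string]string) string {
	return fmt.Sprintf("shard=%s pkey=%s", md[ShardIDKey], md[PartitionKeyKey])
}

// bumpRetry increments the retry counter stored in md and reports
// whether the new count is still within max.
func bumpRetry(md map[string]string, max int) bool {
	n, _ := strconv.Atoi(md[retryCountKey]) // missing or malformed counts as 0
	n++
	md[retryCountKey] = strconv.Itoa(n)
	return n <= max
}

func main() {
	md := map[string]string{
		PartitionKeyKey: "user-42",
		ShardIDKey:      "shardId-000000000000",
	}
	fmt.Println(origin(md)) // shard=shardId-000000000000 pkey=user-42
	for i := 0; i < 3; i++ {
		fmt.Println(bumpRetry(md, 2), md[retryCountKey]) // true 1, true 2, false 3
	}
}
```

A consumer could call `msg.Nack()` while `bumpRetry` returns true and fall back to logging plus `msg.Ack()` once it returns false, which bounds the otherwise unlimited retries.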

Variables

This section is empty.

Functions

func JSONMarshaller

func JSONMarshaller(message *message.Message) (types.PutRecordsRequestEntry, error)

func JSONUnmarshaller

func JSONUnmarshaller(record types.Record) (*message.Message, error)

JSONUnmarshaller uses JSON to decode the message from Kinesis. It is the counterpart of JSONMarshaller on the publisher side.

Types

type ClientOptions

type ClientOptions = kinesis.Options

type Marshaller

type Marshaller func(message *message.Message) (types.PutRecordsRequestEntry, error)

type MessageData

type MessageData struct {
	WatermillMessageUUID string            `json:"watermill_message_uuid"`
	Data                 string            `json:"data"`
	Headers              map[string]string `json:"headers"`
}
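
To make the struct tags concrete, the sketch below encodes and decodes a `MessageData` value with `encoding/json`. The struct is copied locally so the example is self-contained. How `JSONMarshaller` actually fills these fields (for instance, whether `Data` carries the raw payload or an encoded form) is not documented here, so the sample values and the `encode` and `decode` helpers are purely illustrative.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// MessageData is a local copy of the package's struct.
type MessageData struct {
	WatermillMessageUUID string            `json:"watermill_message_uuid"`
	Data                 string            `json:"data"`
	Headers              map[string]string `json:"headers"`
}

// encode serialises d using the field names from the struct tags.
func encode(d MessageData) (string, error) {
	b, err := json.Marshal(d)
	return string(b), err
}

// decode parses a JSON envelope back into a MessageData value.
func decode(s string) (MessageData, error) {
	var d MessageData
	err := json.Unmarshal([]byte(s), &d)
	return d, err
}

func main() {
	s, err := encode(MessageData{
		WatermillMessageUUID: "id-1",
		Data:                 "hello",
		Headers:              map[string]string{"k": "v"},
	})
	if err != nil {
		panic(err)
	}
	fmt.Println(s)
	// {"watermill_message_uuid":"id-1","data":"hello","headers":{"k":"v"}}

	d, err := decode(s)
	if err != nil {
		panic(err)
	}
	fmt.Println(d.WatermillMessageUUID, d.Data, d.Headers["k"]) // id-1 hello v
}
```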

type PubBuilder

type PubBuilder struct {
	// contains filtered or unexported fields
}

func PublisherBuilder

func PublisherBuilder(options *ClientOptions, optFuncs ...func(options *ClientOptions)) PubBuilder

func (PubBuilder) Build

func (b PubBuilder) Build() *Publisher

func (PubBuilder) WithEndpoint

func (b PubBuilder) WithEndpoint(endpoint string) PubBuilder

func (PubBuilder) WithMarshaller

func (b PubBuilder) WithMarshaller(marshaller Marshaller) PubBuilder
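
The `With...` methods above take the builder by value and return a `PubBuilder`, which suggests they are meant to be chained and the result kept. The sketch below shows why, using a hypothetical stand-in type `builder` with a single field. The real `PubBuilder` has unexported fields that are not shown here, so this illustrates Go's value-receiver semantics rather than the package's implementation. The endpoint URL is only an example value.

```go
package main

import "fmt"

// builder is a hypothetical stand-in for PubBuilder.
type builder struct {
	endpoint string
}

// WithEndpoint has a value receiver: it changes a copy and returns it.
func (b builder) WithEndpoint(endpoint string) builder {
	b.endpoint = endpoint
	return b
}

// Endpoint exposes the configured value for the demonstration.
func (b builder) Endpoint() string { return b.endpoint }

func main() {
	base := builder{}

	// The result is discarded, so base is unchanged.
	base.WithEndpoint("http://localhost:4566")
	fmt.Printf("%q\n", base.Endpoint()) // prints ""

	// Keeping (or chaining) the returned value applies the setting.
	configured := base.WithEndpoint("http://localhost:4566")
	fmt.Printf("%q\n", configured.Endpoint()) // prints "http://localhost:4566"
}
```

With the real API, the equivalent habit is to write the chain as a single expression ending in `Build()`, as the subscriber example in the README does.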

type Publisher

type Publisher struct {
	// contains filtered or unexported fields
}

func (*Publisher) Close

func (p *Publisher) Close() error

func (*Publisher) Publish

func (p *Publisher) Publish(topic string, messages ...*message.Message) error

type StdLogger

type StdLogger struct {
	// contains filtered or unexported fields
}

func NewLogger

func NewLogger(out io.Writer, info, debug, trace bool) *StdLogger

NewLogger creates a Watermill logger that prints to Stdout. It differs from watermill.NewLogger in that it does not print the caller file/line, because the log adapter used for the KCL library makes the reported file location incorrect.

func NewStdLogger

func NewStdLogger(info, debug, trace bool) *StdLogger

func (*StdLogger) Debug

func (l *StdLogger) Debug(msg string, fields watermill.LogFields)

func (*StdLogger) Error

func (l *StdLogger) Error(msg string, err error, fields watermill.LogFields)

func (*StdLogger) Info

func (l *StdLogger) Info(msg string, fields watermill.LogFields)

func (*StdLogger) Trace

func (l *StdLogger) Trace(msg string, fields watermill.LogFields)

func (*StdLogger) With

func (*StdLogger) WithDebug

func (l *StdLogger) WithDebug(debug bool) *StdLogger

func (*StdLogger) WithPrefix

func (l *StdLogger) WithPrefix(prefix string) *StdLogger

type SubBuilder

type SubBuilder struct {
	// contains filtered or unexported fields
}

func SubscriberBuilder

func SubscriberBuilder(kclConfig *config.KinesisClientLibConfiguration) SubBuilder

func (SubBuilder) Build

func (b SubBuilder) Build() (*subscriber.Subscriber, error)

func (SubBuilder) WithLogger

func (b SubBuilder) WithLogger(logger watermill.LoggerAdapter) SubBuilder

func (SubBuilder) WithUnmarshaller

func (b SubBuilder) WithUnmarshaller(unmarshaller Unmarshaller) SubBuilder

type Unmarshaller

type Unmarshaller = func(record types.Record) (*message.Message, error)
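
Because `Unmarshaller` is a plain function type, a custom decoder can be passed to `WithUnmarshaller` when records are not written by `JSONMarshaller`. The sketch below shows the shape of such a function under loud assumptions: `record` and `msg` are minimal stand-ins for the AWS SDK's `types.Record` and Watermill's `message.Message`, and `rawUnmarshaller` is a hypothetical name. It demonstrates the pattern only and does not use the real types.

```go
package main

import (
	"errors"
	"fmt"
)

// record is a stand-in for types.Record (only the Data field is mirrored).
type record struct {
	Data []byte
}

// msg is a stand-in for Watermill's message.Message.
type msg struct {
	UUID    string
	Payload []byte
}

// unmarshaller mirrors the shape of the package's Unmarshaller type.
type unmarshaller = func(r record) (*msg, error)

// rawUnmarshaller returns an unmarshaller that passes the record bytes
// through unchanged and assigns each message an ID from newID.
func rawUnmarshaller(newID func() string) unmarshaller {
	return func(r record) (*msg, error) {
		if len(r.Data) == 0 {
			return nil, errors.New("empty record")
		}
		return &msg{UUID: newID(), Payload: r.Data}, nil
	}
}

func main() {
	n := 0
	nextID := func() string { n++; return fmt.Sprintf("id-%d", n) }
	um := rawUnmarshaller(nextID)

	m, err := um(record{Data: []byte("hello")})
	if err != nil {
		panic(err)
	}
	fmt.Println(m.UUID, string(m.Payload)) // id-1 hello

	_, err = um(record{})
	fmt.Println(err) // empty record
}
```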

Directories

Path Synopsis
example
