go-kinesis

command module
v0.5.1 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Oct 10, 2019 License: MIT Imports: 5 Imported by: 0

README

Kinesis Client

Build Status GoDoc Go Report Card License MIT

This library is wrapper around the Kinesis API to consume and produce records, saving the last sequence into a checkpoint abstraction.

Features

  • Iterator Strategy (SinceOldest and SinceLatest);
  • Checkpoint Strategy (Saving each record or each batch);
  • Handles stream deleting;
  • Handles stream resharding, optionally you can set if you want to keep shard ordering or not;
  • Handles throttling.
  • Customizable timeouts/ticks;
  • Override AWS Configuration;
  • Some Checkpoint implementations out of the box;
  • Logging;
  • Event Logger
  • Metrics
  • Fully tested;
  • Being used in production environment.
  • Etc...

Installation

  • go get
go get -u github.com/kashmirtheone/go-kinesis
  • dep (github.com/golang/dep/cmd/dep)
dep ensure --add github.com/kashmirtheone/go-kinesis

Usage

Consumer

All you need is to pass a consumer configuration, inject the checkpoint implementation and send you message handler.

handler := func handler(_ context.Context, message kinesis.Message) error {
	fmt.Println(string(message.Data))
	return nil
}

config := kinesis.ConsumerConfig{
    Group:  "my-test-consumer",
    Stream: "stream-name-here"
}

checkpoint := memory.NewCheckpoint()
consumer, err := kinesis.NewConsumer(config, handler, checkpoint)
if err != nil {
	return errors.Wrap(err, "failed to create kinesis consumer")
}

if err := consumer.Start(); err != nil {
    return errors.Wrap(err, "failed to start kinesis consumer")
}

Shard iteration is handled in handler error. If error is nil, it will continue to iterate through shard.

Producer
config := kinesis.ProducerConfig{
    Stream: "stream-name-here",
}

producer, err := kinesis.NewProducer(config)
if err != nil {
    return errors.Wrap(err, "failed to create kinesis consumer")
}

message := kinesis.Message{
    Data: []byte("some-data"), 
    Partition: "some-partition",
}

if err := producer.Publish(message); err != nil {
    return errors.Wrap(err, "failed to publish message")
}

Warning: It's not recommended to publish in batch, some records can fail to be published. At the moment it's not being handler correctly.

Tools

Tail: Consumes the specified stream and writes to stdout all records after the most recent record in the shard.

Head: Consumes the specified stream and writes to stdout all untrimmed records in the shard.

Kinesis tool uses your specified AWS profile (read THIS for more information)

For more information, nothing better than using the help parameter.

me@ubuntu:~$ go-kinesis --help
Usage:
  go-kinesis [command]

Available Commands:
  head        The head utility displays the contents of kinesis stream to the standard output, starting in the oldest record.
  help        Help about any command
  tail        The tail utility displays the contents of kinesis stream to the standard output, starting in the latest record.

Flags:
  -h, --help   help for kinesis

Use "kinesis [command] --help" for more information about a command.
me@ubuntu:~$ go-kinesis tail --help
The tail utility displays the contents of kinesis stream to the standard output, starting in the latest record.

Usage:
  go-kinesis tail [flags]

Flags:
  -e, --endpoint string         kinesis endpoint
      --gzip                    enables gzip decoder
  -h, --help                    help for tail
      --logging                 enables logging, mute by default
  -n, --number int              number of messages to show
  -r, --region string           aws region, by default it will use AWS_REGION from aws config
      --skip-resharding-order   if enabled, consumer will skip ordering when resharding
  -s, --stream string           stream name

Example

me@ubuntu:~$ go get -u github.com/kashmirtheone/go-kinesis
me@ubuntu:~$ export AWS_PROFILE=some_aws_profile_here
me@ubuntu:~$ go-kinesis head -s some_stream -n 20

Future Improvements

  • Split or merge runners while resharding
  • Decouple logger
  • Add metrics
  • Handle correctly while publishing a batch

Documentation

The Go Gopher

There is no documentation for this package.

Directories

Path Synopsis
checkpoint
cmd

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL