Kinesis Client
This library is a wrapper around the Kinesis API to consume and produce records, saving the last processed sequence number into a checkpoint abstraction.
Features
- Iterator strategies (SinceOldest and SinceLatest);
- Checkpoint strategies (saving after each record or after each batch);
- Handles stream deletion;
- Handles stream resharding; optionally, you can choose whether to keep shard ordering;
- Handles throttling;
- Customizable timeouts/ticks;
- Overridable AWS configuration;
- Several checkpoint implementations out of the box;
- Logging;
- Event logger;
- Metrics;
- Fully tested;
- Used in production environments;
- Etc.
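The checkpoint abstraction mentioned above can be sketched as an interface that persists the last processed sequence number per key. This is an illustrative sketch only: the interface and method names here are assumptions, not the library's actual API, and the in-memory store merely mirrors the spirit of the out-of-the-box memory checkpoint.

```go
package main

import (
	"fmt"
	"sync"
)

// Checkpoint is a hypothetical sketch of the checkpoint abstraction:
// it persists the last processed sequence number per key.
type Checkpoint interface {
	Get(key string) (string, error)
	Set(key, sequence string) error
}

// MemoryCheckpoint is a minimal in-memory implementation, guarded by a
// mutex so concurrent shard runners can share it safely.
type MemoryCheckpoint struct {
	mu   sync.Mutex
	data map[string]string
}

func NewMemoryCheckpoint() *MemoryCheckpoint {
	return &MemoryCheckpoint{data: map[string]string{}}
}

func (c *MemoryCheckpoint) Get(key string) (string, error) {
	c.mu.Lock()
	defer c.mu.Unlock()
	return c.data[key], nil
}

func (c *MemoryCheckpoint) Set(key, sequence string) error {
	c.mu.Lock()
	defer c.mu.Unlock()
	c.data[key] = sequence
	return nil
}

func main() {
	cp := NewMemoryCheckpoint()
	cp.Set("my-group_stream-name_shard-0000", "49590338271490256608")
	seq, _ := cp.Get("my-group_stream-name_shard-0000")
	fmt.Println(seq)
}
```

A persistent implementation (e.g. backed by a database) would implement the same two methods.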
Installation
go get -u github.com/kashmirtheone/go-kinesis
Or, using dep (github.com/golang/dep/cmd/dep):
dep ensure --add github.com/kashmirtheone/go-kinesis
Usage
Consumer
All you need is to pass a consumer configuration, inject a checkpoint implementation, and provide your message handler.
handler := func(_ context.Context, message kinesis.Message) error {
fmt.Println(string(message.Data))
return nil
}
config := kinesis.ConsumerConfig{
Group: "my-test-consumer",
Stream: "stream-name-here",
}
checkpoint := memory.NewCheckpoint()
consumer, err := kinesis.NewConsumer(config, handler, checkpoint)
if err != nil {
return errors.Wrap(err, "failed to create kinesis consumer")
}
if err := consumer.Start(); err != nil {
return errors.Wrap(err, "failed to start kinesis consumer")
}
Shard iteration is controlled by the handler's returned error: if it is nil, the consumer continues iterating through the shard.
Producer
config := kinesis.ProducerConfig{
Stream: "stream-name-here",
}
producer, err := kinesis.NewProducer(config)
if err != nil {
return errors.Wrap(err, "failed to create kinesis producer")
}
message := kinesis.Message{
Data: []byte("some-data"),
Partition: "some-partition",
}
if err := producer.Publish(message); err != nil {
return errors.Wrap(err, "failed to publish message")
}
Warning: publishing in batches is not recommended; some records in a batch can fail to be published, and at the moment such failures are not handled correctly.
The package also ships a CLI tool with two commands:
- Tail: consumes the specified stream and writes to stdout all records after the most recent record in the shard.
- Head: consumes the specified stream and writes to stdout all untrimmed records in the shard.
The tool uses your specified AWS profile (read THIS for more information).
For more information, use the --help flag.
me@ubuntu:~$ go-kinesis --help
Usage:
go-kinesis [command]
Available Commands:
head The head utility displays the contents of kinesis stream to the standard output, starting in the oldest record.
help Help about any command
tail The tail utility displays the contents of kinesis stream to the standard output, starting in the latest record.
Flags:
-h, --help help for kinesis
Use "kinesis [command] --help" for more information about a command.
me@ubuntu:~$ go-kinesis tail --help
The tail utility displays the contents of kinesis stream to the standard output, starting in the latest record.
Usage:
go-kinesis tail [flags]
Flags:
-e, --endpoint string kinesis endpoint
--gzip enables gzip decoder
-h, --help help for tail
--logging enables logging, mute by default
-n, --number int number of messages to show
-r, --region string aws region, by default it will use AWS_REGION from aws config
--skip-resharding-order if enabled, consumer will skip ordering when resharding
-s, --stream string stream name
Example
me@ubuntu:~$ go get -u github.com/kashmirtheone/go-kinesis
me@ubuntu:~$ export AWS_PROFILE=some_aws_profile_here
me@ubuntu:~$ go-kinesis head -s some_stream -n 20
Future Improvements
- Split or merge runners while resharding
- Decouple logger
- Add metrics
- Correctly handle failures when publishing a batch