producer

package module
v0.2.1
Published: Apr 26, 2021 License: MIT Imports: 13 Imported by: 0

README

Amazon kinesis producer

A KPL-like batch producer for Amazon Kinesis built on top of the official Go AWS SDK and using the same aggregation format that KPL uses.

Example
package main

import (
	"time"

	"github.com/sirupsen/logrus"
	"github.com/mjneil/kinesis-producer"
	"github.com/aws/aws-sdk-go/aws"
	"github.com/aws/aws-sdk-go/aws/session"
	"github.com/aws/aws-sdk-go/service/kinesis"
)

func main() {
	client := kinesis.New(session.New(aws.NewConfig()))
	pr := producer.New(&producer.Config{
		StreamName:   "test",
		BacklogCount: 2000,
		Client:       client,
	})

	pr.Start()

	failures := pr.NotifyFailures()

	// Handle failures
	go func() {
		for r := range failures {
			log.Error(r)
		}
	}()

	go func() {
		for i := 0; i < 5000; i++ {
			err := pr.Put([]byte("foo"), "bar")
			if err != nil {
				log.WithError(err).Fatal("error producing")
			}
		}
	}()

	time.Sleep(3 * time.Second)
	pr.Stop()
}
Shard Mapping

The Producer supports aggregation based on a shard map. UserRecords get mapped to a shard using the md5 hash of the Partition Key or a provided Explicit Hash Key. Records mapped to the same shard are aggregated together.

By default, shard mapping is disabled. To use the shard mapping feature, set Config.GetShards. This function is called on producer initialization to populate the shard map. You can optionally provide a refresh interval, Config.ShardRefreshInterval, to update the map. Note that Puts to the Producer are blocked while the shard map is updating so that requests can be reaggregated based on the new map; blocking occurs only during the reaggregation phase.

This package provides a GetShards function GetKinesisShardsFunc that uses an AWS client to call the ListShards API to get the shard list.

Note: At the time of writing, using the shard map feature adds significant overhead. Depending on the configuration and your record set, this can be more than 2x slower. Providing an explicit hash key for user records can reduce this considerably. Take a look at the benchmarks in producer_test.go for examples.

Example
package main

import (
	"time"

	"github.com/mjneil/kinesis-producer"
	"github.com/aws/aws-sdk-go/aws"
	"github.com/aws/aws-sdk-go/aws/session"
	"github.com/aws/aws-sdk-go/service/kinesis"
	"github.com/google/uuid"
)

func main() {
	client := kinesis.New(session.New(aws.NewConfig()))
	pr := producer.New(&producer.Config{
		StreamName:           "test",
		BacklogCount:         2000,
		Client:               client,
		GetShards:            producer.GetKinesisShardsFunc(client, "test"),
		ShardRefreshInterval: 5 * time.Second,
	})

	pr.Start()

	failures := pr.NotifyFailures()

	// Handle failures
	go func() {
		for r := range failures {
			log.Error(r)
		}
	}()

	go func() {
		for i := 0; i < 1000; i++ {
			pk := uuid.New().String()
			for j := 0; j < 5; j++ {
				err := pr.Put([]byte("foo"), pk)
				if err != nil {
					log.WithError(err).Fatal("error producing")
				}
			}
		}
	}()

	time.Sleep(3 * time.Second)
	pr.Stop()
}

UserRecord interface

You can optionally define a custom struct that implements the UserRecord interface and put it using Producer.PutUserRecord. The producer holds onto the reference in case of failures. Do not modify or reuse the reference after passing it to the producer until you receive it back in a failure record; otherwise data races may occur.

Example
package main

import (
	"encoding/json"
	"math/big"
	"time"

	"github.com/mjneil/kinesis-producer"
	"github.com/aws/aws-sdk-go/aws"
	"github.com/aws/aws-sdk-go/aws/session"
	"github.com/aws/aws-sdk-go/service/kinesis"
	"github.com/google/uuid"
)

type myExampleUserRecord struct {
	Id   string `json:"id"`
	Key  string `json:"key"`
	Val  string `json:"val"`
	data []byte `json:"-"`
}

func (r *myExampleUserRecord) PartitionKey() string      { return r.Id }
func (r *myExampleUserRecord) ExplicitHashKey() *big.Int { return nil }
func (r *myExampleUserRecord) Data() []byte              { return r.data }
func (r *myExampleUserRecord) Size() int                 { return len(r.data) }

func newMyExampleUserRecord(key, val string) (*myExampleUserRecord, error) {
	r := &myExampleUserRecord{
		Id:  uuid.New().String(),
		Key: key,
		Val: val,
	}
	data, err := json.Marshal(r)
	if err != nil {
		return nil, err
	}
	r.data = data
	return r, nil
}

func main() {
	client := kinesis.New(session.New(aws.NewConfig()))
	pr := producer.New(&producer.Config{
		StreamName:           "test",
		BacklogCount:         2000,
		Client:               client,
		GetShards:            producer.GetKinesisShardsFunc(client, "test"),
		ShardRefreshInterval: 5 * time.Second,
	})

	pr.Start()

	failures := pr.NotifyFailures()

	// Handle failures
	go func() {
		for r := range failures {
			log.Error(r)
		}
	}()

	go func() {
		for i := 0; i < 5000; i++ {
			record, err := newMyExampleUserRecord("foo", "bar")
			if err != nil {
				log.WithError(err).Fatal("error creating user record")
			}
			err = pr.PutUserRecord(record)
			if err != nil {
				log.WithError(err).Fatal("error producing")
			}
		}
	}()

	time.Sleep(3 * time.Second)
	pr.Stop()
}
Specifying a logger implementation

producer.Config takes an optional Logger implementation.

Using a custom logger
customLogger := &CustomLogger{}

&producer.Config{
  StreamName:   "test",
  BacklogCount: 2000,
  Client:       client,
  Logger:       customLogger,
}
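
For reference, here is a minimal sketch of what such a CustomLogger could look like. It implements this package's Logger interface (Info and Error, receiving LogValue pairs); the fmt-based output format is purely illustrative, not part of the package:

type CustomLogger struct{}

// Info prints an informational message with its key:value pairs.
func (l *CustomLogger) Info(msg string, values ...producer.LogValue) {
	fmt.Println("INFO:", msg, values)
}

// Error prints an error message, the error, and its key:value pairs.
func (l *CustomLogger) Error(msg string, err error, values ...producer.LogValue) {
	fmt.Println("ERROR:", msg, err, values)
}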
Using logrus
import (
	"github.com/sirupsen/logrus"
	producer "github.com/mjneil/kinesis-producer"
	"github.com/mjneil/kinesis-producer/loggers"
)

log := logrus.New()

&producer.Config{
  StreamName:   "test",
  BacklogCount: 2000,
  Client:       client,
  Logger:       loggers.Logrus(log),
}

kinesis-producer ships with three logger implementations.

  • producer.StdLogger uses the standard library logger
  • loggers.Logrus uses the logrus logger
  • loggers.Zap uses the zap logger
License

MIT

Documentation

Overview

Amazon kinesis producer: a KPL-like batch producer for Amazon Kinesis built on top of the official Go AWS SDK and using the same aggregation format that KPL uses.

Note: this project started as a fork of `tj/go-kinesis`. If you are not interested in the KPL aggregation logic, you probably want to check it out.

Index

Examples

Constants

This section is empty.

Variables

This section is empty.

Functions

This section is empty.

Types

type AggregatedRecordRequest

type AggregatedRecordRequest struct {
	Entry       *k.PutRecordsRequestEntry
	UserRecords []UserRecord
}

Contains the AWS Kinesis PutRecordsRequestEntry and the UserRecords that were aggregated into the request. UserRecords are provided for more control over failure notifications.

func NewAggregatedRecordRequest

func NewAggregatedRecordRequest(data []byte, partitionKey, explicitHashKey *string, userRecords []UserRecord) *AggregatedRecordRequest

type Aggregator

type Aggregator struct {
	// Aggregator holds onto its own RWMutex, but the caller of Aggregator methods is expected
	// to call Lock/Unlock
	sync.RWMutex
	// contains filtered or unexported fields
}

func NewAggregator

func NewAggregator(explicitHashKey *string) *Aggregator

NewAggregator initializes a new Aggregator with the given explicitHashKey.

func (*Aggregator) Count

func (a *Aggregator) Count() int

Count returns how many records are stored in the aggregator.

func (*Aggregator) Drain

func (a *Aggregator) Drain() (*AggregatedRecordRequest, error)

Drain creates an aggregated `kinesis.PutRecordsRequestEntry` that is compatible with the KCL's deaggregation logic.

If you are interested in learning more, see aggregation-format.md.

func (*Aggregator) Put

func (a *Aggregator) Put(userRecord UserRecord)

Put adds the given user record to the aggregator. This method is thread-safe.

func (*Aggregator) Size

func (a *Aggregator) Size() int

Size returns how many bytes are stored in the aggregator, including partition keys.

func (*Aggregator) WillOverflow

func (a *Aggregator) WillOverflow(userRecord UserRecord) bool

WillOverflow checks whether putting the given user record would exceed the maximum record size. If it returns true, the aggregator should be drained before attempting the Put.
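
The intended usage is a check-drain-put sequence. The following hypothetical helper sketches it; the flush callback is an assumption standing in for wherever drained aggregates are sent, and the explicit Lock/Unlock follows the Aggregator's documented locking convention:

// putWithDrain drains the aggregator when the next record would overflow
// it, hands the full aggregate to a caller-supplied flush function (an
// assumption for this sketch), and then puts the record.
func putWithDrain(a *Aggregator, r UserRecord, flush func(*AggregatedRecordRequest)) error {
	a.Lock()
	defer a.Unlock()
	if a.WillOverflow(r) {
		req, err := a.Drain()
		if err != nil {
			return err
		}
		flush(req)
	}
	a.Put(r)
	return nil
}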

type Config

type Config struct {
	// StreamName is the Kinesis stream.
	StreamName string

	// FlushInterval is a regular interval for flushing the buffer. Defaults to 5s.
	FlushInterval time.Duration

	// ShardRefreshInterval is a regular interval for refreshing the ShardMap.
	// Config.GetShards will be called at this interval. A value of 0 means no refresh
	// occurs. Defaults to 0.
	ShardRefreshInterval time.Duration

	// GetShards is called on NewProducer to initialize the ShardMap.
	// If ShardRefreshInterval is non-zero, GetShards will be called at that interval.
	// The default function returns a nil list of shards, which results in all records being
	// aggregated into a single record.
	GetShards GetShardsFunc

	// BatchCount determines the maximum number of items to pack into a batch.
	// Must not exceed 500. Defaults to 500.
	BatchCount int

	// BatchSize determines the maximum number of bytes to send with a PutRecords request.
	// Must not exceed 5MiB. Defaults to 5MiB.
	BatchSize int

	// AggregateBatchCount determines the maximum number of items to pack into an aggregated record.
	AggregateBatchCount int

	// AggregateBatchSize determines the maximum number of bytes to pack into an aggregated record.
	// User records larger than this will bypass aggregation.
	AggregateBatchSize int

	// BacklogCount determines the channel capacity before Put() will begin blocking.
	// Defaults to `BatchCount`.
	BacklogCount int

	// MaxConnections is the number of requests to send concurrently. Defaults to 24.
	// If you are using the ListShards API in your GetShards function, those connections
	// will not be counted in MaxConnections.
	MaxConnections int

	// Logger is the logger used. Defaults to producer.Logger.
	Logger Logger

	// Verbose enables verbose logging. Defaults to false.
	Verbose bool

	// Client is the Putter interface implementation.
	Client Putter
}

Config is the Producer configuration.

type DataRecord

type DataRecord struct {
	// contains filtered or unexported fields
}

func NewDataRecord

func NewDataRecord(data []byte, partitionKey string) *DataRecord

func (*DataRecord) Data

func (r *DataRecord) Data() []byte

func (*DataRecord) ExplicitHashKey

func (r *DataRecord) ExplicitHashKey() *big.Int

func (*DataRecord) PartitionKey

func (r *DataRecord) PartitionKey() string

func (*DataRecord) Size

func (r *DataRecord) Size() int

type DrainError

type DrainError struct {
	Err error
	// UserRecords in the buffer when drain attempt was made
	UserRecords []UserRecord
}

func (*DrainError) Error

func (e *DrainError) Error() string

type ErrIllegalPartitionKey

type ErrIllegalPartitionKey struct {
	UserRecord
}

func (*ErrIllegalPartitionKey) Error

func (e *ErrIllegalPartitionKey) Error() string

type ErrRecordSizeExceeded

type ErrRecordSizeExceeded struct {
	UserRecord
}

func (*ErrRecordSizeExceeded) Error

func (e *ErrRecordSizeExceeded) Error() string

type ErrStoppedProducer

type ErrStoppedProducer struct {
	UserRecord
}

func (*ErrStoppedProducer) Error

func (e *ErrStoppedProducer) Error() string

type FailureRecord

type FailureRecord struct {
	Err error
	// The PartitionKey that was used in the kinesis.PutRecordsRequestEntry
	PartitionKey string
	// The ExplicitHashKey that was used in the kinesis.PutRecordsRequestEntry. Will be the
	// empty string if nil
	ExplicitHashKey string
	// UserRecords that were contained in the failed aggregated record request
	UserRecords []UserRecord
}

Failure record type for failures from Kinesis PutRecords request

func (*FailureRecord) Error

func (e *FailureRecord) Error() string

type GetShardsFunc

type GetShardsFunc func(old []*k.Shard) ([]*k.Shard, bool, error)

GetShardsFunc is called to populate the shard map on initialization and at each refresh interval. It is called with the current shard list; during initialization, this is nil. GetShardsFunc should return the new shard list, a bool indicating whether the shards should be updated, and an error. If the bool is false or an error is returned, the shards are not updated.
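
To illustrate the contract, here is a hypothetical GetShardsFunc that supplies a fixed shard list during initialization and skips every subsequent refresh (`k` is the kinesis package alias used in the signature above):

// onceShardsFunc returns the given shards on the first call (old is nil
// during initialization) and reports "no update" on every refresh.
func onceShardsFunc(shards []*k.Shard) GetShardsFunc {
	return func(old []*k.Shard) ([]*k.Shard, bool, error) {
		if old == nil {
			return shards, true, nil
		}
		return nil, false, nil
	}
}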

func GetKinesisShardsFunc

func GetKinesisShardsFunc(client ShardLister, streamName string) GetShardsFunc

GetKinesisShardsFunc gets the active list of shards from the Kinesis ListShards API.

func StaticGetShardsFunc

func StaticGetShardsFunc(count int) GetShardsFunc

StaticGetShardsFunc returns a GetShardsFunc that, when called, generates a static list of count shards whose HashKeyRanges are evenly distributed.
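
For example, a Config that shard-maps against four evenly spaced virtual shards without ever calling the ListShards API might look like this sketch (the stream name and shard count are illustrative):

cfg := &Config{
	StreamName: "test",
	Client:     client,
	GetShards:  StaticGetShardsFunc(4),
}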

type LogValue

type LogValue struct {
	Name  string
	Value interface{}
}

LogValue represents a key:value pair used by the Logger interface

func (LogValue) String

func (v LogValue) String() string

type Logger

type Logger interface {
	Info(msg string, values ...LogValue)
	Error(msg string, err error, values ...LogValue)
}

Logger represents a simple interface used by kinesis-producer to handle logging

type NopLogger

type NopLogger struct{}

func (*NopLogger) Error

func (_ *NopLogger) Error(msg string, err error, values ...LogValue)

func (*NopLogger) Info

func (_ *NopLogger) Info(msg string, values ...LogValue)

type Producer

type Producer struct {
	sync.RWMutex
	*Config
	// contains filtered or unexported fields
}

func New

func New(config *Config) *Producer

func (*Producer) NotifyFailures

func (p *Producer) NotifyFailures() <-chan error

NotifyFailures registers and returns a listener for undeliverable messages. The incoming struct has a copy of the Data and the PartitionKey, along with error information about why publishing failed.
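
Because the channel carries plain errors, a consumer can type-switch on the failure types documented on this page. A minimal sketch (the handling bodies are placeholders):

go func() {
	for err := range pr.NotifyFailures() {
		switch f := err.(type) {
		case *FailureRecord:
			// a PutRecords entry failed; f.UserRecords holds the originals
			fmt.Println("put failure:", f.Err)
		case *DrainError:
			// an aggregator could not be drained; f.UserRecords were buffered
			fmt.Println("drain failure:", f.Err)
		default:
			// other failures, e.g. *ShardRefreshError
			fmt.Println("producer failure:", err)
		}
	}
}()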

func (*Producer) Put

func (p *Producer) Put(data []byte, partitionKey string) error

Put `data` using `partitionKey` asynchronously. This method is thread-safe.

Under the covers, the Producer will automatically retry puts in case of transient errors. When an unrecoverable error is detected (e.g. trying to put into a stream that doesn't exist), the message is returned by the Producer. Add a listener with `Producer.NotifyFailures` to handle undeliverable messages.

func (*Producer) PutUserRecord

func (p *Producer) PutUserRecord(userRecord UserRecord) error

func (*Producer) Start

func (p *Producer) Start()

func (*Producer) Stop

func (p *Producer) Stop()

type Putter

type Putter interface {
	PutRecords(*k.PutRecordsInput) (*k.PutRecordsOutput, error)
}

Putter is the interface that wraps the KinesisAPI.PutRecords method.
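
Because Config.Client only needs to satisfy this one-method interface, tests can substitute a stub for the real Kinesis client. A hypothetical test double (`aws` is the aws-sdk-go aws package):

// capturePutter records every PutRecordsInput it receives and reports a
// fully successful put (zero failed records).
type capturePutter struct {
	inputs []*k.PutRecordsInput
}

func (c *capturePutter) PutRecords(in *k.PutRecordsInput) (*k.PutRecordsOutput, error) {
	c.inputs = append(c.inputs, in)
	return &k.PutRecordsOutput{FailedRecordCount: aws.Int64(0)}, nil
}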

type ShardBucketError

type ShardBucketError struct {
	UserRecord
}

func (*ShardBucketError) Error

func (s *ShardBucketError) Error() string

type ShardLister

type ShardLister interface {
	ListShards(input *k.ListShardsInput) (*k.ListShardsOutput, error)
}

ShardLister is the interface that wraps the KinesisAPI.ListShards method.

type ShardMap

type ShardMap struct {
	sync.RWMutex
	// contains filtered or unexported fields
}
Example
logger := &StdLogger{log.New(os.Stdout, "", log.LstdFlags)}
client := kinesis.New(session.New(aws.NewConfig()))
pr := New(&Config{
	StreamName:           "test",
	BacklogCount:         2000,
	Client:               client,
	GetShards:            GetKinesisShardsFunc(client, "test"),
	ShardRefreshInterval: 5 * time.Second,
	Logger:               logger,
})

pr.Start()

failures := pr.NotifyFailures()

// Handle failures
go func() {
	for r := range failures {
		logger.Error("detected put failure", r)
	}
}()

go func() {
	for i := 0; i < 1000; i++ {
		pk := uuid.New().String()
		for j := 0; j < 5; j++ {
			err := pr.Put([]byte("foo"), pk)
			if err != nil {
				logger.Error("error producing", err)
			}
		}
	}
}()

time.Sleep(3 * time.Second)
pr.Stop()
Output:

func NewShardMap

func NewShardMap(shards []*k.Shard, aggregateBatchCount int) *ShardMap

NewShardMap initializes an aggregator for each shard. UserRecords that map to the same shard based on the MD5 hash of their partition key (the same method used by Kinesis) will be aggregated together. Aggregators will use an ExplicitHashKey from their assigned shard when creating the kinesis.PutRecordsRequestEntry. A ShardMap with an empty shards slice falls back to unsharded behavior with a single aggregator, which instead uses the PartitionKey of the first UserRecord and no ExplicitHashKey.

func (*ShardMap) Drain

func (m *ShardMap) Drain() ([]*AggregatedRecordRequest, []error)

Drain drains all the aggregators and returns a list of the results

func (*ShardMap) Put

func (m *ShardMap) Put(userRecord UserRecord) (*AggregatedRecordRequest, error)

Put puts a UserRecord into the aggregator that maps to its partition key.

func (*ShardMap) Shards

func (m *ShardMap) Shards() []*k.Shard

Shards returns the list of shards

func (*ShardMap) Size

func (m *ShardMap) Size() int

Size returns how many bytes are stored in all the aggregators, including partition keys.

func (*ShardMap) UpdateShards

func (m *ShardMap) UpdateShards(shards []*k.Shard, pendingRecords []*AggregatedRecordRequest) ([]*AggregatedRecordRequest, error)

UpdateShards updates the list of shards and redistributes buffered user records, returning any records that were drained due to redistribution. Shards are not updated if an error occurs during redistribution.

TODO: Can we optimize this?

TODO: How to handle shard splitting? If a shard splits but we don't remap before sending records to the new shards, once we do update our mapping, user records may end up in a new shard and we would lose the shard ordering. The consumer can probably figure it out since we retain the original partition keys (but not explicit hash keys). Shard merging should not be an issue since records from both shards should fall into the merged hash key range.

type ShardRefreshError

type ShardRefreshError struct {
	Err error
}

func (*ShardRefreshError) Error

func (s *ShardRefreshError) Error() string

type ShardSlice

type ShardSlice []*k.Shard

func (ShardSlice) Len

func (p ShardSlice) Len() int

func (ShardSlice) Less

func (p ShardSlice) Less(i, j int) bool

func (ShardSlice) Swap

func (p ShardSlice) Swap(i, j int)

type StdLogger

type StdLogger struct {
	Logger *log.Logger
}

StdLogger implements the Logger interface using standard library loggers

func (*StdLogger) Error

func (l *StdLogger) Error(msg string, err error, values ...LogValue)

Error prints a log message.

func (*StdLogger) Info

func (l *StdLogger) Info(msg string, values ...LogValue)

Info prints a log message.

type UserRecord

type UserRecord interface {
	// PartitionKey returns the partition key of the record
	PartitionKey() string
	// ExplicitHashKey returns an optional explicit hash key that will be used for shard
	// mapping. Should return nil if there is none.
	ExplicitHashKey() *big.Int
	// Data returns the raw data payload that should be added to the aggregated record
	Data() []byte
	// Size is the size of the record's data. Do not include the size of the partition key
	// in this result. The partition key's size is calculated separately by the aggregator.
	Size() int
}

UserRecord represents an individual record that is meant for aggregation

Example
logger := &StdLogger{log.New(os.Stdout, "", log.LstdFlags)}
client := kinesis.New(session.New(aws.NewConfig()))
pr := New(&Config{
	StreamName:           "test",
	BacklogCount:         2000,
	Client:               client,
	GetShards:            GetKinesisShardsFunc(client, "test"),
	ShardRefreshInterval: 5 * time.Second,
	Logger:               logger,
})

pr.Start()

failures := pr.NotifyFailures()

// Handle failures
go func() {
	for r := range failures {
		logger.Error("detected put failure", r)
	}
}()

go func() {
	for i := 0; i < 5000; i++ {
		record, err := newMyExampleUserRecord("foo", "bar")
		if err != nil {
			logger.Error("error creating user record", err)
		}
		err = pr.PutUserRecord(record)
		if err != nil {
			logger.Error("error producing", err)
		}
	}
}()

time.Sleep(3 * time.Second)
pr.Stop()
Output:

type Work

type Work struct {
	// contains filtered or unexported fields
}

func NewWork

func NewWork(records []*AggregatedRecordRequest, size int, reason string) *Work

type WorkerPool

type WorkerPool struct {
	*Config
	// contains filtered or unexported fields
}

func NewWorkerPool

func NewWorkerPool(config *Config) *WorkerPool

func (*WorkerPool) Add

func (wp *WorkerPool) Add(record *AggregatedRecordRequest)

func (*WorkerPool) Close

func (wp *WorkerPool) Close()

func (*WorkerPool) Errors

func (wp *WorkerPool) Errors() chan error

func (*WorkerPool) Flush

func (wp *WorkerPool) Flush()

func (*WorkerPool) Pause

func (wp *WorkerPool) Pause() []*AggregatedRecordRequest

func (*WorkerPool) Resume

func (wp *WorkerPool) Resume(records []*AggregatedRecordRequest)

func (*WorkerPool) Start

func (wp *WorkerPool) Start()

func (*WorkerPool) Wait

func (wp *WorkerPool) Wait()

Directories

Path Synopsis
deaggregation	package from https://github.com/kimutansk/go-kinesis-deaggregation/blob/9d28647d1ff4d296bdd7c12c0cad272c9303d2fc/deaggregator.go
loggers
pb	Package pb is a generated protocol buffer package.
