producer

package module
v0.2.4 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Oct 27, 2022 License: MIT-0 Imports: 13 Imported by: 0

README

Amazon kinesis producer Build status License GoDoc

A KPL-like batch producer for Amazon Kinesis, built on top of the official Go AWS SDK
and using the same aggregation format that the KPL uses.

Example
package main

import (
	"time"

	"github.com/sirupsen/logrus"
	"github.com/a8m/kinesis-producer"
	"github.com/aws/aws-sdk-go/aws"
	"github.com/aws/aws-sdk-go/aws/session"
	"github.com/aws/aws-sdk-go/service/kinesis"
)

func main() {
	client := kinesis.New(session.New(aws.NewConfig()))
	pr := producer.New(&producer.Config{
		StreamName:   "test",
		BacklogCount: 2000,
		Client:       client
	})

	pr.Start()

	// Handle failures
	go func() {
		for r := range pr.NotifyFailures() {
			// r contains `Data`, `PartitionKey` and `Error()`
			log.Error(r)
		}
	}()

	go func() {
		for i := 0; i < 5000; i++ {
			err := pr.Put([]byte("foo"), "bar")
			if err != nil {
				log.WithError(err).Fatal("error producing")
			}
		}
	}()

	time.Sleep(3 * time.Second)
	pr.Stop()
}
Specifying logger implementation

producer.Config takes an optional producer.Logger implementation.

Using a custom logger
customLogger := &CustomLogger{}

&producer.Config{
  StreamName:   "test",
  BacklogCount: 2000,
  Client:       client,
  Logger:       customLogger,
}
Using logrus
import (
	"github.com/sirupsen/logrus"
	producer "github.com/a8m/kinesis-producer"
	"github.com/a8m/kinesis-producer/loggers"
)

log := logrus.New()

&producer.Config{
  StreamName:   "test",
  BacklogCount: 2000,
  Client:       client,
  Logger:       loggers.Logrus(log),
}

kinesis-producer ships with three logger implementations.

  • producer.Standard uses the standard library logger
  • loggers.Logrus uses logrus logger
  • loggers.Zap uses zap logger
License

MIT

Documentation

Overview

Package producer is a generated protocol buffer package.

It is generated from these files:

messages.proto

It has these top-level messages:

AggregatedRecord
Tag
Record

Amazon kinesis producer A KPL-like batch producer for Amazon Kinesis, built on top of the official Go AWS SDK and using the same aggregation format that the KPL uses.

Note: this project started as a fork of `tj/go-kinesis`. If you are not interested in the KPL aggregation logic, you probably want to check it out.

Index

Constants

This section is empty.

Variables

View Source
// Sentinel errors exported by the package.
var (
	// ErrStoppedProducer reports, per its message, that a record could not be
	// put because the producer has already been stopped.
	ErrStoppedProducer     = errors.New("Unable to Put record. Producer is already stopped")
	// ErrIllegalPartitionKey reports, per its message, a partition key whose
	// length is outside the range 1..256.
	// NOTE(review): the message misspells "partition" as "parition"; left as-is
	// because callers may match on the text.
	ErrIllegalPartitionKey = errors.New("Invalid parition key. Length must be at least 1 and at most 256")
	// ErrRecordSizeExceeded reports, per its message, data larger than 1MB.
	ErrRecordSizeExceeded  = errors.New("Data must be less than or equal to 1MB in size")
)

Errors

Functions

This section is empty.

Types

type AggregatedRecord

// AggregatedRecord is one of the top-level protocol buffer messages that the
// package overview lists as generated from messages.proto.
type AggregatedRecord struct {
	// PartitionKeyTable is repeated field 1.
	// Presumably the table that Record.PartitionKeyIndex points into — confirm.
	PartitionKeyTable    []string  `protobuf:"bytes,1,rep,name=partition_key_table" json:"partition_key_table,omitempty"`
	// ExplicitHashKeyTable is repeated field 2.
	// Presumably the table that Record.ExplicitHashKeyIndex points into — confirm.
	ExplicitHashKeyTable []string  `protobuf:"bytes,2,rep,name=explicit_hash_key_table" json:"explicit_hash_key_table,omitempty"`
	// Records is repeated field 3, the Record messages carried by this message.
	Records              []*Record `protobuf:"bytes,3,rep,name=records" json:"records,omitempty"`
	// XXX_unrecognized is excluded from JSON output.
	// Presumably holds unknown protobuf fields — confirm against the generator.
	XXX_unrecognized     []byte    `json:"-"`
}

func (*AggregatedRecord) Descriptor

func (*AggregatedRecord) Descriptor() ([]byte, []int)

func (*AggregatedRecord) GetExplicitHashKeyTable

func (m *AggregatedRecord) GetExplicitHashKeyTable() []string

func (*AggregatedRecord) GetPartitionKeyTable

func (m *AggregatedRecord) GetPartitionKeyTable() []string

func (*AggregatedRecord) GetRecords

func (m *AggregatedRecord) GetRecords() []*Record

func (*AggregatedRecord) ProtoMessage

func (*AggregatedRecord) ProtoMessage()

func (*AggregatedRecord) Reset

func (m *AggregatedRecord) Reset()

func (*AggregatedRecord) String

func (m *AggregatedRecord) String() string

type Aggregator

// Aggregator accumulates records added with Put and turns them into a single
// aggregated kinesis.PutRecordsRequestEntry with Drain. Count and Size report
// how many records and bytes are currently stored.
type Aggregator struct {
	// contains filtered or unexported fields
}

func (*Aggregator) Count

func (a *Aggregator) Count() int

Count returns how many records are stored in the aggregator.

func (*Aggregator) Drain

func (a *Aggregator) Drain() (*k.PutRecordsRequestEntry, error)

Drain creates an aggregated `kinesis.PutRecordsRequestEntry` that is compatible with the KCL's deaggregation logic.

If you are interested in knowing more about it, see: aggregation-format.md

func (*Aggregator) Put

func (a *Aggregator) Put(data []byte, partitionKey string)

Put record using `data` and `partitionKey`. This method is thread-safe.

func (*Aggregator) Size

func (a *Aggregator) Size() int

Size returns how many bytes are stored in the aggregator, including partition keys.

type Config

// Config is the Producer configuration.
type Config struct {
	// StreamName is the name of the Kinesis stream to put records to.
	StreamName string

	// FlushInterval is the regular interval at which the buffer is flushed.
	// Defaults to 5s.
	FlushInterval time.Duration

	// BatchCount determines the maximum number of items to pack into a batch.
	// Must not exceed the per-request record limit of PutRecords (presumably
	// 500 — confirm against the Kinesis API limits). Defaults to 500.
	BatchCount int

	// BatchSize determines the maximum number of bytes to send with a
	// PutRecords request. Must not exceed 5MiB. Defaults to 5MiB.
	BatchSize int

	// AggregateBatchCount determines the maximum number of items to pack into
	// an aggregated record.
	AggregateBatchCount int

	// AggregateBatchSize determines the maximum number of bytes to pack into
	// an aggregated record. User records larger than this bypass aggregation.
	AggregateBatchSize int

	// BacklogCount determines the channel capacity before Put() begins
	// blocking. Defaults to `BatchCount`.
	BacklogCount int

	// MaxConnections is the number of requests to send concurrently.
	// Defaults to 24.
	MaxConnections int

	// Logger is the logger used. Defaults to producer.Logger.
	// NOTE(review): producer.Logger is the interface type; the README names
	// producer.Standard as the standard-library implementation — confirm
	// which value is the actual default.
	Logger Logger

	// Verbose enables verbose logging. Defaults to false.
	Verbose bool

	// Client is the Putter interface implementation.
	Client Putter
}

Config is the Producer configuration.

type FailureRecord

// FailureRecord describes an undeliverable record; values of this type are
// received from the channel returned by Producer.NotifyFailures.
type FailureRecord struct {
	// Data is a copy of the payload that failed to publish.
	Data         []byte
	// PartitionKey is a copy of the partition key the payload was put with.
	PartitionKey string
	// contains filtered or unexported fields
}

Failure record type

type LogValue

// LogValue represents a key:value pair passed to the Logger interface methods.
type LogValue struct {
	// Name is the key of the pair.
	Name  string
	// Value is the value of the pair; it may be of any type.
	Value interface{}
}

LogValue represents a key:value pair used by the Logger interface

func (LogValue) String

func (v LogValue) String() string

type Logger

// Logger is the simple interface kinesis-producer uses to handle logging.
type Logger interface {
	// Info logs msg together with any number of key:value pairs.
	Info(msg string, values ...LogValue)
	// Error logs msg and err together with any number of key:value pairs.
	Error(msg string, err error, values ...LogValue)
}

Logger represents a simple interface used by kinesis-producer to handle logging

type Producer

// Producer batches records. It embeds a sync.RWMutex and the *Config it was
// given, so the Config fields are promoted onto the Producer.
type Producer struct {
	sync.RWMutex
	*Config
	// contains filtered or unexported fields
}

Producer batches records.

func New

func New(config *Config) *Producer

New creates new producer with the given config.

func (*Producer) NotifyFailures

func (p *Producer) NotifyFailures() <-chan *FailureRecord

NotifyFailures registers and returns a listener to handle undeliverable messages. The incoming struct has a copy of the Data and the PartitionKey, along with some error information about why the publishing failed.

func (*Producer) Put

func (p *Producer) Put(data []byte, partitionKey string) error

Put `data` using `partitionKey` asynchronously. This method is thread-safe.

Under the covers, the Producer will automatically re-attempt puts in case of transient errors. When an unrecoverable error is detected (e.g. trying to put to a stream that doesn't exist), the message will be returned by the Producer. Add a listener with `Producer.NotifyFailures` to handle undeliverable messages.

func (*Producer) Start

func (p *Producer) Start()

Start the producer

func (*Producer) Stop

func (p *Producer) Stop()

Stop the producer gracefully. Flushes any in-flight data.

type Putter

// Putter is the interface that wraps the KinesisAPI.PutRecords method.
// Config.Client accepts any implementation of it.
type Putter interface {
	// PutRecords takes a PutRecordsInput and returns the PutRecordsOutput or
	// an error.
	PutRecords(*k.PutRecordsInput) (*k.PutRecordsOutput, error)
}

Putter is the interface that wraps the KinesisAPI.PutRecords method.

type Record

// Record is one of the top-level protocol buffer messages that the package
// overview lists as generated from messages.proto; AggregatedRecord.Records
// holds values of this type.
type Record struct {
	// PartitionKeyIndex is required varint field 1.
	// Presumably an index into AggregatedRecord.PartitionKeyTable — confirm.
	PartitionKeyIndex    *uint64 `protobuf:"varint,1,req,name=partition_key_index" json:"partition_key_index,omitempty"`
	// ExplicitHashKeyIndex is optional varint field 2.
	// Presumably an index into AggregatedRecord.ExplicitHashKeyTable — confirm.
	ExplicitHashKeyIndex *uint64 `protobuf:"varint,2,opt,name=explicit_hash_key_index" json:"explicit_hash_key_index,omitempty"`
	// Data is required bytes field 3.
	Data                 []byte  `protobuf:"bytes,3,req,name=data" json:"data,omitempty"`
	// Tags is repeated field 4.
	Tags                 []*Tag  `protobuf:"bytes,4,rep,name=tags" json:"tags,omitempty"`
	// XXX_unrecognized is excluded from JSON output.
	// Presumably holds unknown protobuf fields — confirm against the generator.
	XXX_unrecognized     []byte  `json:"-"`
}

func (*Record) Descriptor

func (*Record) Descriptor() ([]byte, []int)

func (*Record) GetData

func (m *Record) GetData() []byte

func (*Record) GetExplicitHashKeyIndex

func (m *Record) GetExplicitHashKeyIndex() uint64

func (*Record) GetPartitionKeyIndex

func (m *Record) GetPartitionKeyIndex() uint64

func (*Record) GetTags

func (m *Record) GetTags() []*Tag

func (*Record) ProtoMessage

func (*Record) ProtoMessage()

func (*Record) Reset

func (m *Record) Reset()

func (*Record) String

func (m *Record) String() string

type StdLogger

// StdLogger implements the Logger interface using standard library loggers.
type StdLogger struct {
	// Logger is the wrapped standard library *log.Logger.
	Logger *log.Logger
}

StdLogger implements the Logger interface using standard library loggers

func (*StdLogger) Error

func (l *StdLogger) Error(msg string, err error, values ...LogValue)

Error prints log message

func (*StdLogger) Info

func (l *StdLogger) Info(msg string, values ...LogValue)

Info prints log message

type Tag

// Tag is one of the top-level protocol buffer messages that the package
// overview lists as generated from messages.proto; Record.Tags holds values
// of this type.
type Tag struct {
	// Key is required field 1.
	Key              *string `protobuf:"bytes,1,req,name=key" json:"key,omitempty"`
	// Value is optional field 2.
	Value            *string `protobuf:"bytes,2,opt,name=value" json:"value,omitempty"`
	// XXX_unrecognized is excluded from JSON output.
	// Presumably holds unknown protobuf fields — confirm against the generator.
	XXX_unrecognized []byte  `json:"-"`
}

func (*Tag) Descriptor

func (*Tag) Descriptor() ([]byte, []int)

func (*Tag) GetKey

func (m *Tag) GetKey() string

func (*Tag) GetValue

func (m *Tag) GetValue() string

func (*Tag) ProtoMessage

func (*Tag) ProtoMessage()

func (*Tag) Reset

func (m *Tag) Reset()

func (*Tag) String

func (m *Tag) String() string

Directories

Path Synopsis
loggers

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL