kinesumer

package module
v0.0.0-...-2d4f00f Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Sep 29, 2016 License: MIT Imports: 11 Imported by: 7

README

Kinesumer

Circle CI

Kinesumer is a simple Go client library for Amazon Kinesis. It aims to be a native Go alternative to Amazon's KCL. Kinesumer includes a tool (called kinesumer) that lets you tail Kinesis streams and check the status of Kinesumer workers.

Features

  • Automatically manages one consumer goroutine per shard.
  • Handles shard splitting and merging properly.
  • Provides a simple channel interface for incoming Kinesis records.
  • Provides a tool for managing Kinesis streams:
    • Tailing a stream

Using the package

Install

go get github.com/remind101/kinesumer

Example Program

package main

import (
	"fmt"

	"github.com/remind101/kinesumer"
)

// main reads from the Kinesis stream named "Stream" and prints the data of
// the first 100 records it receives.
func main() {
	// NewDefault takes the stream name and a time.Duration. The zero duration
	// is passed here.
	// NOTE(review): confirm what the duration argument controls before
	// choosing a non-zero value.
	k, err := kinesumer.NewDefault(
		"Stream",
		0,
	)
	if err != nil {
		panic(err)
	}
	// Begin returns (int, error); surface a start-up failure instead of
	// silently discarding it.
	if _, err := k.Begin(); err != nil {
		panic(err)
	}
	defer k.End()
	for i := 0; i < 100; i++ {
		rec := <-k.Records()
		fmt.Println(string(rec.Data()))
	}
}

Using the tool

Install

go get -u github.com/remind101/kinesumer/cmd/kinesumer

To tail a stream, make sure you have AWS credentials ready (either in ~/.aws or in environment variables) and run:

kinesumer tail -s STREAM_NAME

Documentation

Index

Constants

View Source
// Severity level strings, listed from most to least severe.
// NOTE(review): presumably these are the values passed as the severity
// argument of NewError and returned by Error.Severity — confirm.
const (
	ECrit  = "crit"
	EError = "error"
	EWarn  = "warn"
	EInfo  = "info"
	EDebug = "debug"
)
View Source
const (
	// DefaultGetRecordsThrottle is the default value of
	// Options.GetRecordsThrottle. At 200ms it corresponds to at most
	// 5 calls per second.
	//
	// According to the Kinesis limits documentation:
	//
	//	Each shard can support up to 5 transactions per second for
	//	reads, up to a maximum total data read rate of 2 MB per second.
	//
	// See http://docs.aws.amazon.com/streams/latest/dev/service-sizes-and-limits.html
	DefaultGetRecordsThrottle = 200 * time.Millisecond
)

Variables

View Source
// DefaultOptions is a ready-made Options value. ShardIteratorTimestamp is
// not set here and is therefore left at its zero value.
// NOTE(review): presumably used when the caller passes no Options to New —
// confirm.
var DefaultOptions = Options{

	ListStreamsLimit:        1000,
	DescribeStreamLimit:     10000,
	GetRecordsLimit:         10000,
	GetRecordsThrottle:      DefaultGetRecordsThrottle,
	PollTime:                2000, // plain int; unit not visible here, presumably milliseconds — TODO confirm
	MaxShardWorkers:         50,
	ErrHandler:              DefaultErrHandler,
	DefaultIteratorType:     "LATEST",
	ShardAcquisitionTimeout: 90 * time.Second,
}

Functions

func DefaultErrHandler

func DefaultErrHandler(err k.Error)

func ErrHandler

func ErrHandler(errHandler func(IError)) func(k.Error)

Types

type Error

// Error is the package's error type. It is built by NewError from a
// severity, a message, and an origin error, and exposes them through the
// Severity, Error, and Origin methods.
type Error struct {
	// contains filtered or unexported fields
}

func NewError

func NewError(severity, message string, origin error) *Error

func (*Error) Error

func (e *Error) Error() string

func (*Error) Origin

func (e *Error) Origin() error

func (*Error) Severity

func (e *Error) Severity() string

type ICheckpointer

// ICheckpointer is declared with kinesumeriface.Checkpointer as its
// underlying type.
type ICheckpointer kinesumeriface.Checkpointer

type IError

// IError is declared with kinesumeriface.Error as its underlying type.
type IError kinesumeriface.Error

type IKinesis

// IKinesis is declared with kinesumeriface.Kinesis as its underlying type.
type IKinesis kinesumeriface.Kinesis

type IKinesumer

// IKinesumer is declared with kinesumeriface.Kinesumer as its underlying
// type.
type IKinesumer kinesumeriface.Kinesumer

type IProvisioner

// IProvisioner is declared with kinesumeriface.Provisioner as its underlying
// type.
type IProvisioner kinesumeriface.Provisioner

type IRecord

// IRecord is declared with kinesumeriface.Record as its underlying type.
type IRecord kinesumeriface.Record

type Kinesumer

// Kinesumer groups a Kinesis client, a checkpointer, a provisioner, a stream
// name, and Options. Values are created with New or NewDefault.
type Kinesumer struct {
	Kinesis      k.Kinesis      // Kinesis client
	Checkpointer k.Checkpointer // NOTE(review): presumably records per-shard progress — confirm
	Provisioner  k.Provisioner  // NOTE(review): presumably coordinates shard ownership — confirm
	Stream       string         // NOTE(review): presumably the name of the stream being consumed — confirm
	Options      *Options       // tuning parameters; see Options
	// contains filtered or unexported fields
}

func New

func New(kinesis k.Kinesis, checkpointer k.Checkpointer, provisioner k.Provisioner,
	randSource rand.Source, stream string, opt *Options, duration time.Duration) (*Kinesumer, error)

func NewDefault

func NewDefault(stream string, duration time.Duration) (*Kinesumer, error)

func (*Kinesumer) Begin

func (kin *Kinesumer) Begin() (int, error)

func (*Kinesumer) End

func (kin *Kinesumer) End()

func (*Kinesumer) GetShards

func (kin *Kinesumer) GetShards() (shards []*kinesis.Shard, err error)

func (*Kinesumer) GetStreams

func (kin *Kinesumer) GetStreams() (streams []string, err error)

func (*Kinesumer) LaunchShardWorker

func (kin *Kinesumer) LaunchShardWorker(shards []*kinesis.Shard) (int, *ShardWorker, error)

func (*Kinesumer) Records

func (kin *Kinesumer) Records() <-chan k.Record

func (*Kinesumer) StreamExists

func (kin *Kinesumer) StreamExists() (found bool, err error)

type Options

// Options holds the tuning parameters of a Kinesumer. DefaultOptions provides
// a populated value.
type Options struct {
	// Request limits.
	// NOTE(review): the names suggest these are the Limit values for the
	// ListStreams, DescribeStream, and GetRecords calls — confirm.
	ListStreamsLimit    int64
	DescribeStreamLimit int64
	GetRecordsLimit     int64

	// Determines how frequently GetRecords is throttled. The zero value is
	// DefaultGetRecordsThrottle.
	GetRecordsThrottle time.Duration

	// Amount of time to poll for records if consumer lag is minimal.
	// NOTE(review): a plain int; the unit is not visible here — confirm.
	PollTime            int
	MaxShardWorkers     int
	ErrHandler          func(k.Error)
	DefaultIteratorType string

	// How long to try to get a shard iterator.
	ShardAcquisitionTimeout time.Duration

	// ShardIteratorTimestamp is used when DefaultIteratorType is "AT_TIMESTAMP"
	ShardIteratorTimestamp time.Time
}

type Reader

// Reader provides an io.Reader implementation that can read data from a
// Kinesis stream. Create one with NewReader.
type Reader struct {
	// contains filtered or unexported fields
}

Reader provides an io.Reader implementation that can read data from a kinesis stream.

func NewReader

func NewReader(records <-chan k.Record) *Reader

NewReader returns a new Reader instance that reads data from records.

func (*Reader) Read

func (r *Reader) Read(b []byte) (n int, err error)

Read implements io.Reader. It copies at most len(b) bytes from the Kinesis stream into b.

type Record

// Record has only unexported fields; its contents are reached through the
// Data, PartitionKey, SequenceNumber, ShardId, and MillisBehindLatest
// methods. It also has a Done method.
type Record struct {
	// contains filtered or unexported fields
}

func (*Record) Data

func (r *Record) Data() []byte

func (*Record) Done

func (r *Record) Done()

func (*Record) MillisBehindLatest

func (r *Record) MillisBehindLatest() int64

func (*Record) PartitionKey

func (r *Record) PartitionKey() string

func (*Record) SequenceNumber

func (r *Record) SequenceNumber() string

func (*Record) ShardId

func (r *Record) ShardId() string

type ShardWorker

// ShardWorker is the type returned by Kinesumer.LaunchShardWorker.
// NOTE(review): presumably one worker consumes a single shard — confirm.
type ShardWorker struct {
	// NOTE(review): presumably the record limit applied by GetRecords — confirm.
	GetRecordsLimit int64
	// contains filtered or unexported fields
}

func (*ShardWorker) GetRecords

func (s *ShardWorker) GetRecords(it string) ([]*kinesis.Record, string, int64, error)

func (*ShardWorker) GetRecordsAndProcess

func (s *ShardWorker) GetRecordsAndProcess(it, sequence string) (cont bool, nextIt string, nextSeq string)

func (*ShardWorker) GetShardIterator

func (s *ShardWorker) GetShardIterator(iteratorType string, sequence string, timestamp time.Time) (string, error)

func (*ShardWorker) RunWorker

func (s *ShardWorker) RunWorker()

func (*ShardWorker) TryGetShardIterator

func (s *ShardWorker) TryGetShardIterator(iteratorType string, sequence string, timestamp time.Time) string

type Unit

// Unit is an empty struct type; its only possible value is Unit{}.
type Unit struct{}

Unit has only one possible value, Unit{}, and is used as the element type of signal channels that tell the workers when to stop.

Directories

Path Synopsis
checkpointers
cmd
provisioners

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL