kcl

package module
v0.0.0-...-d1a765e
Published: Jun 26, 2018 License: Apache-2.0 Imports: 6 Imported by: 0

README

Introduction

This package is a simple and somewhat naïve client library for Kinesis. It was developed in order to provide an easy way to read from a Kinesis stream. We simply get the shards for the stream, poll each shard for new records, and return them to a handler function, from which you can do whatever you want.

Example

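package main

import (
	"fmt"
	"os"
	"time"

	"github.com/aws/aws-sdk-go/aws/session"
	"github.com/aws/aws-sdk-go/service/kinesis"
	// Also import this package as kcl; its import path is not shown on this page.
)

func main() {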
	sess, err := session.NewSession()
	if err != nil {
		panic(err)
	}

	s := kcl.NewLocalStore()
	config := kcl.Config{
		Limit:        1000,
		Interval:     time.Millisecond * 1000,
		IteratorType: kcl.IteratorTypeLatest,
	}
	k, err := kcl.NewStream(sess, os.Getenv("AWS_KINESIS_ENDPOINT"), os.Getenv("AWS_KINESIS_STREAM"), s, config)
	if err != nil {
		panic(err)
	}

	err = k.Listen(handler)
	if err != nil {
		panic(err)
	}
}

func handler(records []*kinesis.Record) {
	for _, r := range records {
		fmt.Println(*r.SequenceNumber)
	}
}

Understanding Kinesis

  • What interval is appropriate for my stream?
  • What is the maximum number of records I should return?
  • How will I store the iterator? What if I am running this library in a distributed fashion?

Interval

This library works similarly to Lambda with Kinesis: it polls the stream once per interval and attempts to get the maximum number of records each time. Before choosing an interval, you should understand the limits of reading from Kinesis by reading the Kinesis documentation.

Each shard can only be read 5 times per second. If you run this package in a distributed fashion, every consumer polling the same shard counts against that limit, so a short interval could cause you to exceed it.

Limit

Kinesis limits reads to 2MB per second per shard, so you should consider your record size when configuring the limit for this package. For more information, check out Kinesis' limits.

Storing the iterator

Kinesis keeps track of your place on the stream by using an iterator. An iterator is simply a string that denotes which record you left off on. Initially, this package makes a request to Kinesis in order to get the place in the stream. Each time we get more records, a new iterator is returned.

It is important to have some record of this in persistent storage in case your application crashes.

If you are running in a distributed fashion, your store should be safe for concurrent use.

Iterator Types

  • LATEST - you will start with the next record that is put onto the stream.
  • TRIM_HORIZON - you will start with the oldest record on the stream, and work towards the head.
  • AT_SEQUENCE_NUMBER - you will start at the given sequence number. Sequence numbers increase over time within each shard.
  • AFTER_SEQUENCE_NUMBER - similar to AT_SEQUENCE_NUMBER, but you will start immediately after the given sequence number.
  • AT_TIMESTAMP - you will start at the first record at a given timestamp and work towards the head.

The iterator type only matters the first time you pull records. After that, you will get records in order while working towards the head.
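
Some iterator types need a starting position in addition to the type itself. The sketch below summarizes which field of the AWS SDK's GetShardIteratorInput each type relies on. startParam is a hypothetical helper, not part of this package, and it says nothing about how this package supplies those values.

```go
package main

import "fmt"

// startParam is a hypothetical helper (not part of this package). It reports
// which extra GetShardIteratorInput field an iterator type depends on, or ""
// if the type needs no starting position.
func startParam(iteratorType string) (string, error) {
	switch iteratorType {
	case "LATEST", "TRIM_HORIZON":
		return "", nil
	case "AT_SEQUENCE_NUMBER", "AFTER_SEQUENCE_NUMBER":
		return "StartingSequenceNumber", nil
	case "AT_TIMESTAMP":
		return "Timestamp", nil
	default:
		return "", fmt.Errorf("unknown iterator type %q", iteratorType)
	}
}

func main() {
	for _, t := range []string{"LATEST", "AT_SEQUENCE_NUMBER", "AT_TIMESTAMP"} {
		param, err := startParam(t)
		if err != nil {
			panic(err)
		}
		fmt.Printf("%s needs %q\n", t, param)
	}
}
```

A check like this can catch a misspelled iterator type before it reaches Kinesis.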

License

Licensed under the Apache License, Version 2.0 (the "License"); you may not use this work except in compliance with the License. You may obtain a copy of the License in the LICENSE file, or at:

http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License.

Documentation

Index

Constants

const (
	IteratorTypeAtSequenceNumber    = "AT_SEQUENCE_NUMBER"
	IteratorTypeAfterSequenceNumber = "AFTER_SEQUENCE_NUMBER"
	IteratorTypeAtTimestamp         = "AT_TIMESTAMP"
	IteratorTypeTrimHorizon         = "TRIM_HORIZON"
	IteratorTypeLatest              = "LATEST"
)

These constants define our iterator types. See https://docs.aws.amazon.com/sdk-for-go/api/service/kinesis/#GetShardIteratorInput for more detail.

Variables

This section is empty.

Functions

This section is empty.

Types

type Config

type Config struct {
	// The amount of time in between each GetRecords request. In order to not
	// exceed your ReadThroughput, you should consider the number of concurrent
	// consumers you have running.
	Interval time.Duration

	// IteratorType is the type of iterator that we want to use to read from the stream.
	// This denotes our starting position in the stream.
	IteratorType string

	// The maximum number of records we will get on one GetRecords request.
	// In order to not run into Kinesis limits, you should consider the size
	// of your records.
	Limit int64
}

Config sets some properties that affect how we interact with the Kinesis stream.

type HandlerFunc

type HandlerFunc func(records []*kinesis.Record)

HandlerFunc is the argument to the Listen method. For every batch of records that comes off of the Kinesis stream, we will call the HandlerFunc once.

type LocalStore

type LocalStore struct {
	// contains filtered or unexported fields
}

LocalStore implements Store using a local map. This store is not usable if your application is running in multiple containers.

func NewLocalStore

func NewLocalStore() *LocalStore

NewLocalStore will create a pointer to a local store that can keep track of our shard iterators.

func (*LocalStore) GetShardIterator

func (s *LocalStore) GetShardIterator(stream, shard string) (string, error)

GetShardIterator will get the shard iterator that corresponds to the stream-shard combination. We do not require a lock here, because we are simply reading.

func (*LocalStore) UpdateShardIterator

func (s *LocalStore) UpdateShardIterator(stream, shard, iterator string) error

UpdateShardIterator will use the stream-shard combination as the key, and store the iterator that corresponds to it. Updates require a mutex lock so that two goroutines are not trying to update it at the same time.

type Logger

type Logger interface {
	Log(keyvals ...interface{}) error
}

Logger is an interface that helps the user log what is happening inside of this package. This interface is pretty common and is used by https://github.com/go-kit/kit.

type Shard

type Shard struct {
	// The identifier of the shard inside the stream
	ID string

	// The sequence number to start at
	StartAt string
}

Shard is a shard on the Kinesis stream.

type Store

type Store interface {
	// GetShardIterator will get the current iterator for the shard. This
	// tells Amazon where we want to start reading records from.
	GetShardIterator(stream, shard string) (string, error)

	// UpdateShardIterator will update the position in the shard so that
	// on the next tick of our listener, we read records from the latest
	// position.
	UpdateShardIterator(stream, shard, iterator string) error
}

Store is an interface that defines how we will persist and retrieve the shard iterator. It is important to keep track of the shard iterator so that we know our position in the stream. The implementation of Store must be safe for concurrent use.

type Stream

type Stream struct {
	// Shards are all the shards that belong to the stream
	Shards []Shard

	// Logger is an interface that can be used to debug your stream
	Logger Logger

	// Name is the name of the stream
	Name string
	// contains filtered or unexported fields
}

Stream will keep track of our position on the stream for each shard.

func NewStream

func NewStream(sess *session.Session, kinesisEndpoint string, stream string, store Store, config Config) (*Stream, error)

NewStream will return a pointer to a stream that you can listen on. Stream is capable of managing multiple shards, printing out log statements, and polling Kinesis at a regular interval.

func (*Stream) Listen

func (s *Stream) Listen(handler HandlerFunc) error

Listen will call the HandlerFunc for each batch of events that come off the Kinesis stream. Listen will poll the Kinesis stream once per interval and handle any new records. We use the store to keep track of our position in the stream so that we avoid reading records twice or failing to progress in the stream.

Directories

Path Synopsis
cmd
