kcl

Version: v0.0.0-...-90805a3 (this package is not in the latest version of its module)
Published: Apr 4, 2017 | License: Apache-2.0 | Imports: 12 | Imported by: 0

README

go-kcl

Go Kinesis client library

go-kcl is a hackday project at Zemanta, so the API is not stable in any way yet, and it does contain bugs.

Currently it covers only the streaming part of Kinesis.

We do believe in tests, but we'd like to stabilize the APIs first. In the meantime, the existing tests run on CircleCI.

This library is a wrapper around the Kinesis part of the AWS SDK for Go. It makes it easier to read records from streams and put records into them.

It depends on the AWS SDK for Go and the Aerospike client library, so first of all you'll need to fetch them:

go get github.com/aws/aws-sdk-go
go get github.com/aerospike/aerospike-client-go

Aerospike is currently used to store locks and state but we plan to add support for etcd in the future.

Stream manipulation

Client:

client := kcl.New(awsConfig, locker, checkpointer, snitcher)

Stream creation example:

err := client.CreateStream(streamName, shardCount)
if err != nil {
    // handle err
}

// do something with the stream

Stream update example (change the number of shards):

err := client.UpdateStream(streamName, shardsCount)
if err != nil {
    // handle err
}

// do something with the new shards

Delete stream example:

err := client.DeleteStream(streamName)
if err != nil {
    // handle err
}

List streams:

streamNames, err := client.ListStreams()
if err != nil {
    // handle err
}

// do something with streams
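
A common first step for a producer or consumer is to create the stream only if it does not exist yet. The sketch below combines ListStreams and CreateStream behind a small interface. `streamAdmin`, `ensureStream` and `fakeAdmin` are hypothetical names introduced for illustration; the fake stands in for *kcl.Client so the example runs without AWS credentials.

```go
package main

import "fmt"

// streamAdmin is a hypothetical narrow interface with two of the stream
// management methods documented for kcl.Client; *kcl.Client has methods with
// these signatures, so it could be passed wherever a streamAdmin is expected.
type streamAdmin interface {
	CreateStream(streamName string, shardCount int) error
	ListStreams() ([]string, error)
}

// ensureStream creates streamName with shardCount shards unless ListStreams
// already reports a stream with that name. It reports whether it created one.
func ensureStream(c streamAdmin, streamName string, shardCount int) (bool, error) {
	names, err := c.ListStreams()
	if err != nil {
		return false, fmt.Errorf("listing streams: %w", err)
	}
	for _, name := range names {
		if name == streamName {
			return false, nil // already exists: leave it untouched
		}
	}
	if err := c.CreateStream(streamName, shardCount); err != nil {
		return false, fmt.Errorf("creating stream %q: %w", streamName, err)
	}
	return true, nil
}

// fakeAdmin is an in-memory stand-in for *kcl.Client used only in this example.
type fakeAdmin struct {
	streams map[string]int // stream name -> shard count
}

func (f *fakeAdmin) CreateStream(streamName string, shardCount int) error {
	f.streams[streamName] = shardCount
	return nil
}

func (f *fakeAdmin) ListStreams() ([]string, error) {
	names := make([]string, 0, len(f.streams))
	for name := range f.streams {
		names = append(names, name)
	}
	return names, nil
}

func main() {
	admin := &fakeAdmin{streams: map[string]int{}}
	fmt.Println(ensureStream(admin, "events", 2)) // true <nil>
	fmt.Println(ensureStream(admin, "events", 2)) // false <nil>
}
```

The check-then-create sequence is not atomic: another process can create the same stream between the two calls, so the error returned by CreateStream still needs handling.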

Consuming the stream

It supports reading from a single shard and locking it, so that two instances of the same client (same clientName) don't consume the same shard. Example:

client := kcl.New(awsConfig, locker, checkpointer, snitcher)

reader, err := client.NewLockedReader(streamName, shardId, clientName)
if err != nil {
    return err
}

wg := &sync.WaitGroup{}
wg.Add(1)
go func() {
    defer wg.Done() // the channel was closed and fully consumed
    for record := range reader.Records() {
        // handle record
    }
}()

// wait until ready to close
err = reader.CloseUpdateCheckpointAndRelease(wg)
if err != nil {
    // handle err
}
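
The WaitGroup contract is easy to get wrong: wg.Add(1) must run before the consumer goroutine starts, and wg.Done() must run only after the range loop over Records() ends. The runnable sketch below models that ordering with a hypothetical `fakeLockedReader` (int sequence numbers instead of *kinesis.Record, no AWS or Aerospike). It is not the library's implementation, only an illustration of the documented contract.

```go
package main

import (
	"fmt"
	"sync"
)

// fakeLockedReader is a hypothetical stand-in for kcl.LockedReader. It models
// only the documented contract: closing ends the Records channel, and the
// checkpoint update and lock release happen only after wg.Done() is called.
type fakeLockedReader struct {
	records    chan int
	lastRead   int  // last sequence number pushed to the channel
	checkpoint int  // sequence number recorded by the "checkpoint"
	released   bool // whether the "lock" was released
}

func newFakeLockedReader(seqs []int) *fakeLockedReader {
	r := &fakeLockedReader{records: make(chan int, len(seqs))}
	for _, s := range seqs {
		r.records <- s // pretend these were read from the shard
		r.lastRead = s
	}
	return r
}

func (r *fakeLockedReader) Records() <-chan int { return r.records }

func (r *fakeLockedReader) CloseUpdateCheckpointAndRelease(wg *sync.WaitGroup) error {
	close(r.records) // stop delivering records; the consumer's range loop will end
	wg.Wait()        // hangs forever if the consumer never calls wg.Done()
	r.checkpoint = r.lastRead
	r.released = true
	return nil
}

// consumeAndClose shows the caller side: Add before starting the consumer,
// Done after the range loop ends, then close through the reader.
func consumeAndClose(reader *fakeLockedReader) ([]int, error) {
	var processed []int
	wg := &sync.WaitGroup{}
	wg.Add(1) // without this, wg.Done() panics with a negative counter
	go func() {
		defer wg.Done() // the channel was closed and fully consumed
		for seq := range reader.Records() {
			processed = append(processed, seq) // "handle record"
		}
	}()
	if err := reader.CloseUpdateCheckpointAndRelease(wg); err != nil {
		return nil, err
	}
	return processed, nil // safe to read: wg.Wait() returned before this point
}

func main() {
	reader := newFakeLockedReader([]int{1, 2, 3})
	processed, err := consumeAndClose(reader)
	fmt.Println(processed, reader.checkpoint, reader.released, err) // [1 2 3] 3 true <nil>
}
```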

It also supports a shared reader that tries to read from as many shards as are available. Example:

client := kcl.New(awsConfig, locker, checkpointer, snitcher)

reader, err := client.NewSharedReader(streamName, clientName)
if err != nil {
    return err
}

go func() {
    for record := range reader.Records() {
        // handle record
    }
}()

// wait until ready to close
err = reader.Close()
if err != nil {
    // handle err
}

err = reader.UpdateCheckpoint()
if err != nil {
    // handle err
}

Pushing into the stream

Example of putting a single record into a stream:

client := kcl.New(awsConfig, locker, checkpointer, snitcher)

err := client.PutRecord(streamName, partitionKey, record)
if err != nil {
    // handle err
}

Example of putting a batch of records into a stream:

err := client.PutRecords(streamName, records)
if err != nil {
    // handle err
}
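
Kinesis accepts at most 500 records per PutRecords request, so a large slice of entries has to be split before calling client.PutRecords. The `chunk` helper below is a hypothetical sketch, not part of kcl; in real code the element type would be *kinesis.PutRecordsRequestEntry. It splits by record count only and does not check the 5 MiB per-request payload limit.

```go
package main

import "fmt"

// maxPutRecordsBatch is the Kinesis limit on records per PutRecords request.
const maxPutRecordsBatch = 500

// chunk splits entries into consecutive batches of at most size elements,
// preserving order. The batches share memory with entries (no copying).
func chunk[T any](entries []T, size int) [][]T {
	if size <= 0 {
		panic("chunk: size must be positive")
	}
	var batches [][]T
	for start := 0; start < len(entries); start += size {
		end := start + size
		if end > len(entries) {
			end = len(entries)
		}
		// Full slice expression: appending to one batch cannot overwrite the next.
		batches = append(batches, entries[start:end:end])
	}
	return batches
}

func main() {
	// Hypothetical payloads; with kcl each batch would be passed to
	// client.PutRecords(streamName, batch).
	records := make([]string, 1203)
	for i := range records {
		records[i] = fmt.Sprintf("record-%d", i)
	}
	for _, batch := range chunk(records, maxPutRecordsBatch) {
		fmt.Println(len(batch)) // 500, then 500, then 203
	}
}
```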

Documentation

Constants

This section is empty.

Variables

var (
	ErrMissingLocker       = errors.New("Missing locker")
	ErrMissingCheckpointer = errors.New("Missing checkpointer")
	ErrMissingSnitcher     = errors.New("Missing snitcher")
	ErrShardLocked         = errors.New("Shard locked")
)
var Logger = log.New(os.Stderr, "", log.LstdFlags)

Functions

func GetStreamKey

func GetStreamKey(streamName, shardId, clientName string) string

Types

type Client

type Client struct {
	// contains filtered or unexported fields
}

func New

func New(awsConfig *aws.Config, distlock locker.Locker, checkpoint checkpointer.Checkpointer, snitch snitcher.Snitcher) *Client

func (*Client) CreateStream

func (c *Client) CreateStream(streamName string, shardCount int) error

func (*Client) DeleteStream

func (c *Client) DeleteStream(streamName string) error

func (*Client) ListStreams

func (c *Client) ListStreams() ([]string, error)

func (*Client) NewLockedReader

func (c *Client) NewLockedReader(streamName string, shardId string, clientName string) (*LockedReader, error)

NewLockedReader creates a new reader with default parameters and locks the shard, so no other instance of clientName can create a reader on this shard.

func (*Client) NewLockedReaderWithParameters

func (c *Client) NewLockedReaderWithParameters(streamName string, shardId string, clientName string, streamReadInterval time.Duration, readBatchSize int, channelBufferSize int) (*LockedReader, error)

NewLockedReaderWithParameters creates a new reader with the specified parameters and locks the shard, so no other instance of clientName can create a reader on this shard.

func (*Client) NewReader

func (c *Client) NewReader(streamName string, shardId string, clientName string) (*Reader, error)

NewReader initializes a reader on a shard with default parameters. It reads a batch of 100 records from the shard every 100 ms.

func (*Client) NewReaderWithParameters

func (c *Client) NewReaderWithParameters(streamName string, shardId string, clientName string, streamReadInterval time.Duration, readBatchSize int, channelBufferSize int) (*Reader, error)

NewReaderWithParameters initializes a reader on a shard with explicit parameters: how often the shard is read (streamReadInterval), the size of a read batch (readBatchSize), and the size of the channel buffer (channelBufferSize).

func (*Client) NewSharedReader

func (c *Client) NewSharedReader(streamName string, clientName string) (*SharedReader, error)

func (*Client) NewSharedReaderWithParameters

func (c *Client) NewSharedReaderWithParameters(streamName string, clientName string, streamReadInterval time.Duration, readBatchSize int, channelBufferSize int) (*SharedReader, error)

func (*Client) PutRecord

func (c *Client) PutRecord(streamName, partitionKey string, record []byte) error

func (*Client) PutRecords

func (c *Client) PutRecords(streamName string, records []*kinesis.PutRecordsRequestEntry) error

func (*Client) StreamDescription

func (c *Client) StreamDescription(streamName string) (*kinesis.StreamDescription, error)

func (*Client) UpdateStream

func (c *Client) UpdateStream(streamName string, shardsCount int) error

type Kinesis

type Kinesis interface {
	PutRecord(streamName, partitionKey string, record []byte) error
	PutRecords(streamName string, records []*kinesis.PutRecordsRequestEntry) error

	StreamDescription(streamName string) (*kinesis.StreamDescription, error)
	CreateStream(streamName string, shardCount int) error
	UpdateStream(streamName string, shardsCount int) error
	DeleteStream(streamName string) error
	ListStreams() ([]string, error)

	NewReader(streamName string, shardId string, clientName string) (*Reader, error)
	NewReaderWithParameters(streamName string, shardId string, clientName string, streamReadInterval time.Duration, readBatchSize int, channelBufferSize int) (*Reader, error)
	NewLockedReader(streamName string, shardId string, clientName string) (*LockedReader, error)
	NewLockedReaderWithParameters(streamName string, shardId string, clientName string, streamReadInterval time.Duration, readBatchSize int, channelBufferSize int) (*LockedReader, error)
	NewSharedReader(streamName string, clientName string) (*SharedReader, error)
}

type LockedReader

type LockedReader struct {
	*Reader
	// contains filtered or unexported fields
}

func (*LockedReader) CloseAndRelease

func (lr *LockedReader) CloseAndRelease(wg *sync.WaitGroup) error

CloseAndRelease closes the reader and releases the lock AFTER wg.Done() has been called.

IMPORTANT: call wg.Done() once the channel is closed and fully consumed. If wg.Done() is never called, this call hangs indefinitely.

func (*LockedReader) CloseUpdateCheckpointAndRelease

func (lr *LockedReader) CloseUpdateCheckpointAndRelease(wg *sync.WaitGroup) error

CloseUpdateCheckpointAndRelease closes the reader, updates its checkpoint, and releases the lock AFTER wg.Done() has been called.

IMPORTANT: call wg.Done() once the channel is closed and fully consumed. If wg.Done() is never called, this call hangs indefinitely.

func (*LockedReader) Release

func (lr *LockedReader) Release() error

Release releases the lock acquired when this reader was created. Once it has succeeded, calling it again is a no-op.

type Reader

type Reader struct {
	// contains filtered or unexported fields
}

func (*Reader) BlockReading

func (r *Reader) BlockReading()

BlockReading stops reading from the stream after the current batch is processed. This can be used to safely update checkpoints while the reader is still open.

func (*Reader) Close

func (r *Reader) Close() error

Close waits for the current batch to be read and pushed to the channel, then stops reading and closes the channel. No further records are read from the stream. After calling Close and consuming the channel, it is safe to call UpdateCheckpoint.

func (*Reader) IsClosed

func (r *Reader) IsClosed() bool

func (*Reader) Records

func (r *Reader) Records() <-chan *kinesis.Record

Records consumes a shard starting from the last checkpoint, if any; otherwise it starts at the last untrimmed record. It returns a read-only buffered channel through which records are delivered.

NOTE: checkpoints are NOT set automatically. You have to set them with UpdateCheckpoint, ideally after you call Close and consume all messages from the channel. To set checkpoints while the stream is being consumed without leaving any message unprocessed, call BlockReading, consume the channel, call UpdateCheckpoint, and then call ResumeReading.
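
The checkpoint-while-running sequence can be sketched as a helper that drains the buffered channel between BlockReading and ResumeReading. The `pausableReader` interface, the `checkpointInPlace` helper and the `fakeReader` below are hypothetical; records are simplified to int sequence numbers, and the helper assumes that no new records are pushed once BlockReading returns, which the documentation implies but does not state outright.

```go
package main

import "fmt"

// pausableReader lists the kcl.Reader methods used by the sequence, with
// records simplified to int so the sketch runs without the AWS SDK.
type pausableReader interface {
	Records() <-chan int
	BlockReading()
	UpdateCheckpoint() error
	ResumeReading()
}

// checkpointInPlace blocks reading, drains what is buffered, updates the
// checkpoint and resumes. ASSUMPTION: after BlockReading returns, nothing new
// is pushed to the channel until ResumeReading is called.
func checkpointInPlace(r pausableReader, handle func(int)) error {
	r.BlockReading()
	defer r.ResumeReading() // resume even if UpdateCheckpoint fails
drain:
	for {
		select {
		case rec, ok := <-r.Records():
			if !ok {
				break drain // channel closed: nothing more to drain
			}
			handle(rec)
		default:
			break drain // buffer is empty
		}
	}
	return r.UpdateCheckpoint()
}

// fakeReader is an in-memory stand-in used only for this example.
type fakeReader struct {
	ch         chan int
	blocked    bool
	lastPushed int
	checkpoint int
}

func (f *fakeReader) Records() <-chan int { return f.ch }
func (f *fakeReader) BlockReading()       { f.blocked = true }
func (f *fakeReader) ResumeReading()      { f.blocked = false }
func (f *fakeReader) UpdateCheckpoint() error {
	f.checkpoint = f.lastPushed // checkpoint = last record read from the stream
	return nil
}

func main() {
	f := &fakeReader{ch: make(chan int, 10)}
	for seq := 1; seq <= 3; seq++ { // a batch already pushed to the buffer
		f.ch <- seq
		f.lastPushed = seq
	}
	var handled []int
	err := checkpointInPlace(f, func(seq int) { handled = append(handled, seq) })
	fmt.Println(handled, f.checkpoint, f.blocked, err) // [1 2 3] 3 false <nil>
}
```

Resuming in a defer means reading continues even if UpdateCheckpoint fails; a caller that prefers to stay paused on error would call ResumeReading explicitly instead.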

func (*Reader) ResumeReading

func (r *Reader) ResumeReading()

ResumeReading resumes reading from the stream.

func (*Reader) UpdateCheckpoint

func (r *Reader) UpdateCheckpoint() error

UpdateCheckpoint sets the checkpoint to the last record that was read. It waits for the current batch to be processed and pushed to the channel.

type SharedReader

type SharedReader struct {
	// contains filtered or unexported fields
}

func (*SharedReader) Close

func (sr *SharedReader) Close() error

func (*SharedReader) Records

func (sr *SharedReader) Records() chan *kinesis.Record

func (*SharedReader) UpdateCheckpoint

func (sr *SharedReader) UpdateCheckpoint() error

Directories

Path Synopsis
example
