connector

package module
v0.0.0-...-855af23
Published: Feb 16, 2016 License: MIT Imports: 14 Imported by: 0

README

Golang Kinesis Connectors

Kinesis connector applications written in Go

With the new release of Kinesis Firehose, I'd recommend using the Lambda Streams to Firehose project for loading data directly into S3 and Redshift.

Inspired by the Amazon Kinesis Connector Library, this library is intended to be a lightweight wrapper around the Kinesis API that handles batching records, setting checkpoints, respecting rate limits, and recovering from network errors.


Overview

The consumer expects a handler func that will process a buffer of incoming records.

package main

import (
  "flag"
  "fmt"

  connector "github.com/harlow/kinesis-connectors"
)

func main() {
  var (
    app    = flag.String("app", "", "The app name")
    stream = flag.String("stream", "", "The stream name")
  )
  flag.Parse()

  // create new consumer
  c := connector.NewConsumer(*app, *stream)

  // override default values
  c.Set("maxBatchCount", 200)
  c.Set("pollInterval", "3s")

  // start consuming records from the stream
  c.Start(connector.HandlerFunc(func(b connector.Buffer) {
    fmt.Println(b.GetRecords())
    // process the records
  }))

  // block forever so the consumer keeps running
  select {}
}
Installation

Get the package source:

$ go get github.com/harlow/kinesis-connectors
Fetching Dependencies

Install gvt:

$ export GO15VENDOREXPERIMENT=1
$ go get github.com/FiloSottile/gvt

Install dependencies into ./vendor/:

$ gvt restore
Examples

Use the seed stream code to put sample data onto the stream.
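
The seed code itself is not reproduced here; a minimal sketch of putting sample records onto a stream with the AWS SDK for Go might look like the following (the stream name and payloads are illustrative, and the repository's own example may use a different client):

package main

import (
  "fmt"
  "log"

  "github.com/aws/aws-sdk-go/aws"
  "github.com/aws/aws-sdk-go/aws/session"
  "github.com/aws/aws-sdk-go/service/kinesis"
)

func main() {
  svc := kinesis.New(session.New())

  // put a handful of sample JSON records onto the stream
  for i := 0; i < 10; i++ {
    _, err := svc.PutRecord(&kinesis.PutRecordInput{
      StreamName:   aws.String("test-stream"), // illustrative stream name
      PartitionKey: aws.String(fmt.Sprintf("pk-%d", i)),
      Data:         []byte(fmt.Sprintf(`{"id": %d}`, i)),
    })
    if err != nil {
      log.Fatal(err)
    }
  }
}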

Logging

Default logging is handled by the go-kit log package. Applications can override the default logging behaviour by implementing the Logger interface.

connector.SetLogger(NewCustomLogger())
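
The logger passed to SetLogger is a go-kit log.Logger, whose interface is a single Log(keyvals ...interface{}) error method. A minimal sketch of a custom logger that forwards to the standard library (stdLogger is an illustrative name, not part of the package):

// stdLogger is an illustrative adapter that satisfies go-kit's log.Logger
// interface by forwarding key/value pairs to the standard library logger.
type stdLogger struct{}

func (stdLogger) Log(keyvals ...interface{}) error {
  log.Println(keyvals...) // "log" here is the standard library logger
  return nil
}

// elsewhere, before starting any consumers:
connector.SetLogger(stdLogger{})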

Contributing

Please see CONTRIBUTING.md for more information. Thank you, contributors!

License

Copyright (c) 2015 Harlow Ward. It is free software, and may be redistributed under the terms specified in the LICENSE file.

Documentation

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

func SetLogger

func SetLogger(l log.Logger)

SetLogger changes the package logger so that external packages can control the logging for this package.

Types

type Buffer

type Buffer struct {
	MaxBatchCount int
	// contains filtered or unexported fields
}

Buffer holds records and determines when it should be periodically flushed.

func (*Buffer) AddRecord

func (b *Buffer) AddRecord(r *kinesis.Record)

AddRecord adds a record to the buffer.

func (*Buffer) FirstSeq

func (b *Buffer) FirstSeq() string

FirstSeq returns the sequence number of the first record in the buffer.

func (*Buffer) Flush

func (b *Buffer) Flush()

Flush empties the buffer and resets the sequence counter.

func (*Buffer) GetRecords

func (b *Buffer) GetRecords() []*kinesis.Record

GetRecords returns the records in the buffer.

func (*Buffer) LastSeq

func (b *Buffer) LastSeq() string

LastSeq returns the sequence number of the last record in the buffer.

func (*Buffer) ShouldFlush

func (b *Buffer) ShouldFlush() bool

ShouldFlush determines if the buffer has reached its target size.
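
For example, a handler passed to Consumer.Start can use these accessors to report on each flushed batch; a minimal sketch (c is a Consumer created as in the README example):

c.Start(connector.HandlerFunc(func(b connector.Buffer) {
  records := b.GetRecords()
  // hand the batch off to application code here, then log its bounds
  fmt.Printf("flushed %d records (%s .. %s)\n", len(records), b.FirstSeq(), b.LastSeq())
}))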

type Checkpoint

type Checkpoint struct {
	AppName    string
	StreamName string
	// contains filtered or unexported fields
}

Checkpoint is a Redis-backed implementation of the checkpoint interface. It is used by shard processing to record and resume its progress.

func (*Checkpoint) CheckpointExists

func (c *Checkpoint) CheckpointExists(shardID string) bool

CheckpointExists determines if a checkpoint for a particular shard exists. It is typically used to decide whether to start processing the shard with TRIM_HORIZON or, if a checkpoint exists, AFTER_SEQUENCE_NUMBER.

func (*Checkpoint) SequenceNumber

func (c *Checkpoint) SequenceNumber() string

SequenceNumber returns the current checkpoint stored for the specified shard.

func (*Checkpoint) SetCheckpoint

func (c *Checkpoint) SetCheckpoint(shardID string, sequenceNumber string)

SetCheckpoint stores a checkpoint for a shard (e.g. sequence number of last record processed by application). Upon failover, record processing is resumed from this point.
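
As a sketch of the TRIM_HORIZON / AFTER_SEQUENCE_NUMBER decision described above (cp is assumed to be an already-initialized *Checkpoint; iteratorType is an illustrative helper, not part of the package):

// iteratorType chooses how to start reading a shard based on checkpoint state.
func iteratorType(cp *connector.Checkpoint, shardID string) string {
  if cp.CheckpointExists(shardID) {
    // a checkpoint exists: resume after the last processed sequence number
    return "AFTER_SEQUENCE_NUMBER"
  }
  // no checkpoint yet: start from the oldest available record in the shard
  return "TRIM_HORIZON"
}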

type Consumer

type Consumer struct {
	// contains filtered or unexported fields
}

func NewConsumer

func NewConsumer(appName, streamName string) *Consumer

NewConsumer creates a new Kinesis connection and returns a new consumer initialized with the app and stream names.

func (*Consumer) Set

func (c *Consumer) Set(option string, value interface{})

Set `option` to `value`

func (*Consumer) Start

func (c *Consumer) Start(handler Handler)

Start begins consuming records from the stream and passes each flushed Buffer of records to the handler.

type Handler

type Handler interface {
	HandleRecords(b Buffer)
}
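
Any type with a HandleRecords method satisfies Handler; a minimal sketch with an illustrative struct-based implementation:

// batchPrinter is an illustrative Handler implementation.
type batchPrinter struct {
  prefix string
}

func (h batchPrinter) HandleRecords(b connector.Buffer) {
  fmt.Println(h.prefix, len(b.GetRecords()), "records")
}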

type HandlerFunc

type HandlerFunc func(b Buffer)

HandlerFunc is a convenience type that avoids having to declare a struct to implement the Handler interface. It can be used like this:

consumer.Start(connector.HandlerFunc(func(b connector.Buffer) {
  // ...
}))

func (HandlerFunc) HandleRecords

func (h HandlerFunc) HandleRecords(b Buffer)

HandleRecords implements the Handler interface.

Directories

Path Synopsis
emitter/s3
examples/s3
