frinesis

package module
v0.1.1 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Feb 8, 2019 License: MIT Imports: 14 Imported by: 1

README

frinesis

Travis Build Status Coverage Status MIT licensed GitHub release Go Report Card GoDoc

An AWS Kinesis implementation of a Frizzle Sink.

In addition to the AWS Kinesis SDK for Go, Frinesis uses a modified version of sendgridlabs/go-kinesis/batchproducer (under separate MIT license).

Frizzle is a magic message (Msg) bus designed for parallel processing w many goroutines.

  • Receive() messages from a configured Source
  • Do your processing, possibly Send() each Msg on to one or more Sink destinations
  • Ack() (or Fail()) the Msg to notify the Source that processing completed

Prereqs / Build instructions

Go mod

As of Go 1.11, frinesis uses go mod for dependency management.

Running the tests

Frinesis has integration tests which require a kinesis endpoint to test against. KINESIS_ENDPOINT environment variable is used by tests. We test with a localstack instance (docker-compose.yml provided) but other tools like kinesalite could also work.

$ docker-compose up -d
# takes a few seconds to initialize; can use a tool like wait-for-it.sh in scripting
$ export KINESIS_ENDPOINT=localhost:4568
$ go test -v --cover ./...

Configuration

Frinesis Sinks are configured using Viper.

func InitSink(config *viper.Viper) (*Sink, error)

InitSinkWithLogger(config *viper.Viper, logger *zap.Logger)

We typically initialize Viper through environment variables (but client can do whatever it wants, just needs to provide the configured Viper object with relevant values). The application might use a prefix before the below values.

Variable Required Description Default
AWS_REGION_NAME required region being used e.g. us-east-1
KINESIS_ENDPOINT optional if using a custom endpoint e.g. for local testing. Defaults to AWS standard internal and retrieving credentials from IAM if not set. http:// prefixed if no scheme set
KINESIS_FLUSH_TIMEOUT sink (optional) how long to wait for Kinesis Sink to flush remaining messages on close (use duration) 30s

Async Error Handling

Since records are sent in batch fashion, Kinesis may report errors asynchronously. Errors can be recovered via channel returned by the Sink.Events() method. In addition to the String() method required by frizzle, currently only errors are returned by frinesis (no other event types) so all Events recovered will also conform to error interface.

Contributing

Contributions welcome! Take a look at open issues.

Documentation

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

func ClientFromViper

func ClientFromViper(config *viper.Viper) (*kinesis.Kinesis, error)

ClientFromViper takes a viper config and returns an initialized go-kinesis client. Used internally and as a test helper method.

func NewClient

func NewClient(region string) *kinesis.Kinesis

NewClient returns a default AWS SDK kinesis client

func NewClientWithEndpoint

func NewClientWithEndpoint(region, endpoint string) *kinesis.Kinesis

NewClientWithEndpoint returns an AWS SDK kinesis client pointing to a custom service endpoint

Types

type Sink

type Sink struct {
	// contains filtered or unexported fields
}

Sink provides a frizzle interface for writing to AWS Kinesis

func InitSink

func InitSink(config *viper.Viper) (*Sink, error)

InitSink initializes a basic sink with no logging

func InitSinkWithLogger

func InitSinkWithLogger(config *viper.Viper, logger *zap.Logger) (*Sink, error)

InitSinkWithLogger initializes a basic Sink with a provided logger

func (*Sink) Close

func (s *Sink) Close() error

Close the Sink

func (*Sink) Events

func (s *Sink) Events() <-chan frizzle.Event

Events reports async Events that occur during processing

func (*Sink) Restart

func (s *Sink) Restart() error

Restart producer go-routines to support Send() calls after Close() without re-initializing (intended for use in Lambda where Sink object may be preserved in memory for subsequent run after `Close()`)

func (*Sink) Send

func (s *Sink) Send(m frizzle.Msg, topic string) error

Send a Msg to topic. Initializes a kinesis producer for this topic if this Sink hasn't sent to it before.

Directories

Path Synopsis

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL