frizzle

package module
v0.5.2 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Mar 21, 2019 License: MIT Imports: 10 Imported by: 2

README

Frizzle

Travis Build Status Coverage Status MIT licensed GitHub release Go Report Card GoDoc

Frizzle is a magic message (Msg) bus designed for parallel processing with many goroutines.

  • Receive() messages from a configured Source
  • Do your processing, possibly Send() each Msg on to one or more Sink destinations
  • Ack() (or Fail()) the Msg to notify the Source that processing completed

Getting Started

Start with the example implementation which shows a simple canonical implementation of a Processor on top of Frizzle and most of the core functions.

high level interface

// Frizzle is a Msg bus for rapidly configuring and processing messages between multiple message services.
type Frizzle interface {
	Receive() <-chan Msg
	Send(m Msg, dest string) error
	Ack(Msg) error
	Fail(Msg) error
	Events() <-chan Event
	AddOptions(...Option)
	FlushAndClose(timeout time.Duration) error
	Close() error
}

func Init(source Source, sink Sink, opts ...Option) Frizzle

The core of the repo is a Friz struct (returned by Init()) which implements Frizzle. The intent is for separate Source and Sink implementations (in separate repos) to be mixed and matched with the glue of Frizzle. A processing library can take a Frizzle input to allow easy re-use with multiple underlying message technologies. Friz also implements Source and Sink to allow chaining if needed.

Source and Sink Implementations

If you write a new implementation, we'd love to add it to our list!

Msg

A basic interface which can be extended:

// Msg encapsulates an immutable message passed around by Frizzle
type Msg interface {
	ID() string
	Data() []byte
	Timestamp() time.Time
}

A frizzle.SimpleMsg struct is provided for basic use cases.

Source and Sink

// Source defines a stream of incoming Msgs to be Received for processing,
// and reporting whether or not processing was successful.
type Source interface {
	Receive() <-chan Msg
	Ack(m Msg) error
	Fail(m Msg) error
	UnAcked() []Msg
	Stop() error
	Close() error
}

// Sink defines a message service where Msgs can be sent as part of processing.
type Sink interface {
	Send(m Msg, dest string) error
	Close() error
}

Options

Frizzle supports a variety of Option parameters for additional functionality to simplify your integration. These can be included with Init() or added using a friz.AddOptions() call. Note that AddOptions() updates the current friz and does not return anything.

Currently supported options:

  • Logger(log *zap.Logger) - Include a logger to report frizzle-internal logging.
  • Stats(stats StatsIncrementer) - Include a stats client for frizzle-internal metrics reporting. See Stats for what metrics are supported.
  • FailSink(s Sink, dest string) - Provide a Sink and destination (kafka topic, kinesis stream etc) where Fail()ed Msgs will be sent automatically.
  • MonitorProcessingRate(pollPeriod time.Duration) - Log the sum count of Acked and Failed Msgs every pollPeriod.
  • ReportAsyncErrors() - Launch a simple go routine to monitor the Events() channel. All events are logged at Error or Warn level; any events that match error interface have a stat recorded. Logging and/or stats are disabled if Logger()/Stats() have not been set, respectively.
    • This is a most basic handling that does not account for any specific Event types from Source/Sink implementations; developers should write an app specific monitoring routine to parse and handle specific Event cases (for which this can be a helpful starting template).
  • HandleShutdown(appShutdown func()) - Monitor for SIGINT and SIGTERM, call FlushAndClose() followed by provided appShutdown when they are received.
  • WithTransformer(ft FrizTransformer) - Add a transformer to modify the Msg's before they are sent or received. Currently only supports a "Simple Separator" Transformer which adds a specified record separator (such as newline) before sending if it isn't already present, and removes the same separator on receive if it is present.

Events

Since Source and Sink implementations often send and receive Msgs in batch fashion, They often may find out about any errors (or other important events) asynchronously. To support this, async events can be recovered via a channel returned by the Friz.Events() method. If a Source/Sink does not implement the Eventer interface this functionality will be ignored.

Caveats for using Events()
  • Frizzle Events must provide a minimum String() interface; when consuming Events a type assertion switch is highly recommended to receive other relevant information.
    • A default: trap for unhandled cases is also highly recommended!
    • For a reference implementation of the same interface see here
  • A Friz's Events() channel will be closed after all underlying Source/Sink Events() channels are closed.
    • If a Friz is initialized without any Source/Sinks that implement Events(), the channel returned by Friz.Events() will be closed immediately.

In addition to the String() method required by frizzle, currently only errors are returned by frinesis (no other event types) so all Events recovered will also conform to error interface.

Transformers

Transformers provide a mechanism to do simple updates to a Msg prior to a Send() or Receive(), which can be added at initializiation but is otherwise transparent to the processor and Source/Sink. This can be useful in a case where e.g. you need to apply a transform when running on one messaging platform but not another, and don't want to expose the core processing code to information about which platform is in use.

Frizzle supports adding Transformers with a WithTransformer() Option:

// WithTransformer returns an Option to add the provided FrizTransformer to a Frizzle
func WithTransformer(ft FrizTransformer) Option

// Transform is a type that modifies a Msg
type Transform func(Msg) Msg

// FrizTransformer provides a Transform to apply when a Msg is sent or received
type FrizTransformer interface {
	SendTransform() Transform
	ReceiveTransform() Transform
}

An example implementation to add and remove a separator suffix on each Msg is included in transform.go. To reduce clutter we generally suggest implementing a new Transform in a separate repo, but we can consider adding high utility ones here.

Prereqs / Build instructions

Go mod

As of Go 1.11, frizzle uses go mod for dependency management.

Install
$ go get github.com/qntfy/frizzle
$ cd frizzle
$ go build

Running the tests

go test -v --cover ./...

Configuration

We recommend building Sources and Sinks to initialize using Viper, typically through environment variables (but client can do whatever it wants, just needs to provide the configured Viper object with relevant values). The application might use a prefix such as before the below values.

Basic
Variable Required Description Default
BUFFER_SIZE source (optional) size of Input() channel buffer 500
MOCK optional mocks don't track sent or unacked Msgs, just return without error false

Stats

StatsIncrementer is a simple interface with just Increment(bucket string); based on github.com/alexcesaro/statsd but potentially compatible with a variety of metrics engines. When Stats() is set, Frizzle records the following metrics. If a Logger() has been set, each of the below also generates a Debug level log with the ID() of the Msg.

Bucket Description
ctr.rcv count of Msgs received from Source
ctr.send count of Msgs sent to Sink
ctr.ack count of Msgs Ack'ed by application
ctr.fail count of Msgs Fail'ed by application
ctr.failsink count of Msgs sent to FailSink
ctr.error count of errors from Events()*

* only recorded if ReportAsyncErrors is running

Contributing

Contributions welcome! Take a look at open issues. New Source/Sink implementations should be added in separate repos. If you let us know (and link to test demonstrating it conforms to the interface) we are happy to link them here!

Documentation

Overview

Example
package main

import (
	"fmt"
	"os"
	"strings"
	"time"

	"github.com/alexcesaro/statsd"
	"github.com/qntfy/frizzle"
	"github.com/qntfy/frizzle/basic"
	"github.com/spf13/viper"
	"go.uber.org/zap"
	"go.uber.org/zap/zapcore"
)

// Processor implements simple processing on a Frizzle
type Processor struct {
	frizzle.Frizzle
	count int
	quit  <-chan int
}

// Process() prints strings that are all lower case
// and keeps a running count of characters seen
func (p *Processor) Process(m frizzle.Msg) {
	data := m.Data()
	str := string(data)
	if str == "fail" {
		p.Fail(m)
		return
	}
	// count total characters seen
	p.count += len(str)
	// print and send any message that is only lower case
	if str == strings.ToLower(str) {
		fmt.Println(str)
		p.Send(m, "all-lower")
		p.Ack(m)
		return
	}
	// otherwise just Ack()
	p.Ack(m)
	return
}

// Loop processes received messages until quit signal received
func (p *Processor) Loop() {
	for {
		select {
		case <-p.quit:
			return
		case m := <-p.Receive():
			p.Process(m)
		}
	}
}

// Configure Viper for this example
func configViper() *viper.Viper {
	v := viper.GetViper()
	v.Set("track_fails", "true")
	return v
}

// helper method to extract payloads from []*msg.Msg
func msgsToStrings(msgs []frizzle.Msg) []string {
	result := make([]string, len(msgs))
	for i, m := range msgs {
		data := m.Data()
		result[i] = string(data)
	}
	return result
}

func inputMsgs(input chan<- frizzle.Msg, msgs []string) {
	for _, m := range msgs {
		input <- frizzle.NewSimpleMsg(m, []byte(m), time.Now())
	}
}

func main() {
	// Initialize a Processor including a simple Frizzle message bus
	v := configViper()
	source, input, _ := basic.InitSource(v)
	lowerSink, _ := basic.InitSink(v)
	failSink, _ := basic.InitSink(v)
	exampleLog := exampleLogger()
	stats, _ := statsd.New(statsd.Mute(true))
	inputMsgs(input, []string{"foo", "BAR", "fail", "baSil", "frizzle"})

	f := frizzle.Init(source, lowerSink,
		frizzle.FailSink(failSink, "fail"),
		frizzle.Logger(exampleLog),
		frizzle.Stats(stats),
	)
	quit := make(chan int)
	p := &Processor{
		Frizzle: f,
		quit:    quit,
	}

	// Process messages
	go p.Loop()

	// Close() returns an error until all Msgs have Fail() or Ack() run
	stillRunning := true
	for stillRunning {
		select {
		case <-time.After(100 * time.Millisecond):
			if err := p.Close(); err == nil {
				stillRunning = false
			}
		}
	}
	f.(*frizzle.Friz).LogProcessingRate(1 * time.Second)
	exampleLog.Sync()
	quit <- 1

	// View results
	fmt.Printf("Characters seen: %d\n", p.count)
	fmt.Printf("Failed messages: %v\n", msgsToStrings(source.Failed()))
	fmt.Printf("Sent messages: %v\n", msgsToStrings(lowerSink.Sent("all-lower")))
}

// exampleLogger replicates zap.NewExample() except at Info Level instead of Debug
func exampleLogger() *zap.Logger {
	encoderCfg := zapcore.EncoderConfig{
		MessageKey:     "msg",
		LevelKey:       "level",
		NameKey:        "logger",
		EncodeLevel:    zapcore.LowercaseLevelEncoder,
		EncodeTime:     zapcore.ISO8601TimeEncoder,
		EncodeDuration: zapcore.StringDurationEncoder,
	}
	core := zapcore.NewCore(zapcore.NewJSONEncoder(encoderCfg), os.Stdout, zap.InfoLevel)
	return zap.New(core)
}
Output:

foo
frizzle
{"level":"info","msg":"Processing Rate Update","rate_per_sec":5,"module":"monitor"}
Characters seen: 18
Failed messages: [fail]
Sent messages: [foo frizzle]

Index

Examples

Constants

This section is empty.

Variables

View Source
var (
	// ErrAlreadyAcked is returned when Ack() or Fail() are called on a Msg that was already Acked or Failed
	ErrAlreadyAcked = errors.New("this Msg has already been Acked")
	// ErrUnackedMsgsRemain is returned when Source.Close() is called while len(Source.Unacked()) > 0
	ErrUnackedMsgsRemain = errors.New("attempting to close frizzle Source while there are still unAcked Msgs")
)

Functions

func InitEvents

func InitEvents(ints ...interface{}) <-chan Event

InitEvents checks if objects are Eventers and merges any that are into one channel Note the returned channel will be closed immediately if none of the arguments are Eventers Exported in case of integrating events from multiple frizzles / sources / sinks

Types

type Error

type Error struct {
	// contains filtered or unexported fields
}

Error conforms to Event and error interfaces for async error reporting

func NewError

func NewError(str string) *Error

NewError creates an Error

func (*Error) Error

func (e *Error) Error() string

Error provides a string version to conform to golang error interface

func (*Error) String

func (e *Error) String() string

String provides a string representation of the error

type Event

type Event interface {
	String() string
}

Event represents an async event from a Source or Sink

type Eventer

type Eventer interface {
	Events() <-chan Event
}

Eventer is capable of reporting Events asynchronously through a channel

type Friz

type Friz struct {
	// contains filtered or unexported fields
}

Friz is the internal struct implementing Frizzle.

func (*Friz) Ack

func (f *Friz) Ack(m Msg) error

Ack reports to the Source that processing completed successfully for this Msg

func (*Friz) AddOptions

func (f *Friz) AddOptions(opts ...Option)

AddOptions configures the Frizzle with the supplied Options.

func (*Friz) Close

func (f *Friz) Close() error

Close down the Frizzle, the Source and all configured Sinks gracefully. The Frizzle must not be used afterward.

func (*Friz) Events

func (f *Friz) Events() <-chan Event

Events returns the async Event channel Note if neither Source or Sink implement Events(), it will be closed immediately on init.

func (*Friz) Fail

func (f *Friz) Fail(m Msg) error

Fail reports to the Source that processing failed for this Msg, and optionally sends to a Fail-specific Sink

func (*Friz) FlushAndClose

func (f *Friz) FlushAndClose(timeout time.Duration) error

FlushAndClose provides default logic for stopping, emptying and shutting down the configured Source and Sink. Any Msgs which are still unAcked after the timeout has expired are Failed.

func (*Friz) LogProcessingRate

func (f *Friz) LogProcessingRate(pollPeriod time.Duration)

LogProcessingRate implements the logic for MonitorProcessingRate and is exposed for testing.

func (*Friz) Receive

func (f *Friz) Receive() <-chan Msg

Receive a receiving channel to get incoming Msgs from the Source.

func (*Friz) ReportAsyncErrors

func (f *Friz) ReportAsyncErrors()

ReportAsyncErrors monitors Events() output and reports via logging and/or stats It runs until f.Events() is closed and so should be run using the provided Option or in a separate goroutine.

func (*Friz) Send

func (f *Friz) Send(m Msg, dest string) error

Send the Msg to Sink identified by sinkName

type FrizTransformer

type FrizTransformer interface {
	SendTransform() Transform
	ReceiveTransform() Transform
}

FrizTransformer provides a Transform to apply when a Msg is sent or received

func NewSimpleSepTransformer

func NewSimpleSepTransformer(sep []byte) FrizTransformer

NewSimpleSepTransformer initializes a new SepTransformer with a specified separator

type Frizzle

type Frizzle interface {
	Receive() <-chan Msg
	Send(m Msg, dest string) error
	Ack(Msg) error
	Fail(Msg) error
	Events() <-chan Event
	AddOptions(...Option)
	FlushAndClose(timeout time.Duration) error
	Close() error
}

Frizzle is a Msg bus for rapidly configuring and processing messages between multiple message services.

func Init

func Init(source Source, sink Sink, opts ...Option) Frizzle

Init takes an initialized Source and Sink, and a set of Options. It returns a configured Frizzle.

type Msg

type Msg interface {
	ID() string
	Data() []byte
	Timestamp() time.Time
}

Msg encapsulates an immutable message passed around by Frizzle

func NewSimpleMsg

func NewSimpleMsg(id string, data []byte, timestamp time.Time) Msg

NewSimpleMsg creates a new SimpleMsg

type Option

type Option func(*Friz)

Option is a type that modifies a Frizzle object

func FailSink

func FailSink(s Sink, dest string) Option

FailSink specifies a Sink and dest to use on Fail for the Frizzle

func HandleShutdown

func HandleShutdown(appShutdown func()) Option

HandleShutdown handles a clean shutdown for frizzle and calls an app provided shutdown function for SIGINT and SIGTERM. If Frizzle is run with this option, it does not need to call Close() explicitly as this is handled by HandleShutdown

func Logger

func Logger(log *zap.Logger) Option

Logger specifies a zap.Logger for the Frizzle

func MonitorProcessingRate

func MonitorProcessingRate(pollPeriod time.Duration) Option

MonitorProcessingRate configures the Frizzle to periodically log the rate of Msgs processed.

func ReportAsyncErrors

func ReportAsyncErrors() Option

ReportAsyncErrors monitors Events() output and reports via logging and/or stats. error events are logged at Error level and have a stat recorded; all other events are logged at Warn level.

If setting a FailSink, ReportAsyncErrors should be added/re-added AFTER the FailSink option or async events from the FailSink

func Stats

func Stats(stats StatsIncrementer) Option

Stats specifies a stats object for the Frizzle

func WithTransformer

func WithTransformer(ft FrizTransformer) Option

WithTransformer returns an Option to add the provided FrizTransformer to a *Friz

type SimpleMsg

type SimpleMsg struct {
	// contains filtered or unexported fields
}

SimpleMsg is a basic Msg implementation

func (*SimpleMsg) Data

func (s *SimpleMsg) Data() []byte

Data returns the Data

func (*SimpleMsg) ID

func (s *SimpleMsg) ID() string

ID returns the ID

func (*SimpleMsg) Timestamp

func (s *SimpleMsg) Timestamp() time.Time

Timestamp returns the Timestamp

type SimpleSepTransformer

type SimpleSepTransformer struct {
	// contains filtered or unexported fields
}

SimpleSepTransformer appends and removes a specified separator such as '\n' at the end of the Msg

func (*SimpleSepTransformer) ReceiveTransform

func (st *SimpleSepTransformer) ReceiveTransform() Transform

ReceiveTransform returns a Transform to remove the separator if it is present at the end of Msg.Data()

func (*SimpleSepTransformer) SendTransform

func (st *SimpleSepTransformer) SendTransform() Transform

SendTransform returns a Transform to append the separator if it is not present at the end of Msg.Data()

type Sink

type Sink interface {
	Send(m Msg, dest string) error
	Close() error
}

Sink defines a message service where Msgs can be sent as part of processing.

type Source

type Source interface {
	Receive() <-chan Msg
	Ack(m Msg) error
	Fail(m Msg) error
	UnAcked() []Msg
	Stop() error
	Close() error
}

Source defines a stream of incoming Msgs to be Received for processing, and reporting whether or not processing was successful.

type StatsIncrementer

type StatsIncrementer interface {
	Increment(bucket string)
}

StatsIncrementer is a simple stats interface that supports incrementing a bucket Met by github.com/alexcesaro/statsd and similar; used for mocking and multiple impls

type Transform

type Transform func(Msg) Msg

Transform is a function that modifies a Msg

type Type

type Type string

Type identifies the supported types of Frizzle Sources and Sinks for use in dependent repos

const (
	// Kafka (Apache: http://kafka.apache.org/)
	Kafka Type = "kafka"
	// Kinesis (AWS: https://aws.amazon.com/kinesis/)
	Kinesis Type = "kinesis"
)

Directories

Path Synopsis
Code generated by mockery v1.0.0.
Code generated by mockery v1.0.0.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL