Documentation ¶
Overview ¶
Example ¶
package main import ( "fmt" "os" "strings" "time" "github.com/alexcesaro/statsd" "github.com/qntfy/frizzle" "github.com/qntfy/frizzle/basic" "github.com/spf13/viper" "go.uber.org/zap" "go.uber.org/zap/zapcore" ) // Processor implements simple processing on a Frizzle type Processor struct { frizzle.Frizzle count int quit <-chan int } // Process() prints strings that are all lower case // and keeps a running count of characters seen func (p *Processor) Process(m frizzle.Msg) { data := m.Data() str := string(data) if str == "fail" { p.Fail(m) return } // count total characters seen p.count += len(str) // print and send any message that is only lower case if str == strings.ToLower(str) { fmt.Println(str) p.Send(m, "all-lower") p.Ack(m) return } // otherwise just Ack() p.Ack(m) return } // Loop processes received messages until quit signal received func (p *Processor) Loop() { for { select { case <-p.quit: return case m := <-p.Receive(): p.Process(m) } } } // Configure Viper for this example func configViper() *viper.Viper { v := viper.GetViper() v.Set("track_fails", "true") return v } // helper method to extract payloads from []*msg.Msg func msgsToStrings(msgs []frizzle.Msg) []string { result := make([]string, len(msgs)) for i, m := range msgs { data := m.Data() result[i] = string(data) } return result } func inputMsgs(input chan<- frizzle.Msg, msgs []string) { for _, m := range msgs { input <- frizzle.NewSimpleMsg(m, []byte(m), time.Now()) } } func main() { // Initialize a Processor including a simple Frizzle message bus v := configViper() source, input, _ := basic.InitSource(v) lowerSink, _ := basic.InitSink(v) failSink, _ := basic.InitSink(v) exampleLog := exampleLogger() stats, _ := statsd.New(statsd.Mute(true)) inputMsgs(input, []string{"foo", "BAR", "fail", "baSil", "frizzle"}) f := frizzle.Init(source, lowerSink, frizzle.FailSink(failSink, "fail"), frizzle.Logger(exampleLog), frizzle.Stats(stats), ) quit := make(chan int) p := &Processor{ Frizzle: f, quit: quit, 
} // Process messages go p.Loop() // Close() returns an error until all Msgs have Fail() or Ack() run stillRunning := true for stillRunning { select { case <-time.After(100 * time.Millisecond): if err := p.Close(); err == nil { stillRunning = false } } } f.(*frizzle.Friz).LogProcessingRate(1 * time.Second) exampleLog.Sync() quit <- 1 // View results fmt.Printf("Characters seen: %d\n", p.count) fmt.Printf("Failed messages: %v\n", msgsToStrings(source.Failed())) fmt.Printf("Sent messages: %v\n", msgsToStrings(lowerSink.Sent("all-lower"))) } // exampleLogger replicates zap.NewExample() except at Info Level instead of Debug func exampleLogger() *zap.Logger { encoderCfg := zapcore.EncoderConfig{ MessageKey: "msg", LevelKey: "level", NameKey: "logger", EncodeLevel: zapcore.LowercaseLevelEncoder, EncodeTime: zapcore.ISO8601TimeEncoder, EncodeDuration: zapcore.StringDurationEncoder, } core := zapcore.NewCore(zapcore.NewJSONEncoder(encoderCfg), os.Stdout, zap.InfoLevel) return zap.New(core) }
Output: foo frizzle {"level":"info","msg":"Processing Rate Update","rate_per_sec":5,"module":"monitor"} Characters seen: 18 Failed messages: [fail] Sent messages: [foo frizzle]
Index ¶
- Variables
- func InitEvents(ints ...interface{}) <-chan Event
- type Error
- type Event
- type Eventer
- type Friz
- func (f *Friz) Ack(m Msg) error
- func (f *Friz) AddOptions(opts ...Option)
- func (f *Friz) Close() error
- func (f *Friz) Events() <-chan Event
- func (f *Friz) Fail(m Msg) error
- func (f *Friz) FlushAndClose(timeout time.Duration) error
- func (f *Friz) LogProcessingRate(pollPeriod time.Duration)
- func (f *Friz) Receive() <-chan Msg
- func (f *Friz) ReportAsyncErrors()
- func (f *Friz) Send(m Msg, dest string) error
- type FrizTransformer
- type Frizzle
- type Msg
- type Option
- func FailSink(s Sink, dest string) Option
- func HandleShutdown(appShutdown func()) Option
- func Logger(log *zap.Logger) Option
- func MonitorProcessingRate(pollPeriod time.Duration) Option
- func ReportAsyncErrors() Option
- func Stats(stats StatsIncrementer) Option
- func WithTransformer(ft FrizTransformer) Option
- type SimpleMsg
- type SimpleSepTransformer
- type Sink
- type Source
- type StatsIncrementer
- type Transform
- type Type
Examples ¶
Constants ¶
This section is empty.
Variables ¶
var ( // ErrAlreadyAcked is returned when Ack() or Fail() is called on a Msg that was already Acked or Failed ErrAlreadyAcked = errors.New("this Msg has already been Acked") // ErrUnackedMsgsRemain is returned when Source.Close() is called while len(Source.UnAcked()) > 0 ErrUnackedMsgsRemain = errors.New("attempting to close frizzle Source while there are still unAcked Msgs") )
Functions ¶
func InitEvents ¶
func InitEvents(ints ...interface{}) <-chan Event
InitEvents checks if objects are Eventers and merges any that are into one channel. Note that the returned channel will be closed immediately if none of the arguments are Eventers. It is exported in case of integrating events from multiple frizzles / sources / sinks.
Types ¶
type Error ¶
type Error struct {
// contains filtered or unexported fields
}
Error conforms to Event and error interfaces for async error reporting
type Event ¶
type Event interface {
String() string
}
Event represents an async event from a Source or Sink
type Eventer ¶
type Eventer interface {
Events() <-chan Event
}
Eventer is capable of reporting Events asynchronously through a channel
type Friz ¶
type Friz struct {
// contains filtered or unexported fields
}
Friz is the internal struct implementing Frizzle.
func (*Friz) AddOptions ¶
AddOptions configures the Frizzle with the supplied Options.
func (*Friz) Close ¶
Close down the Frizzle, the Source and all configured Sinks gracefully. The Frizzle must not be used afterward.
func (*Friz) Events ¶
Events returns the async Event channel. Note that if neither Source nor Sink implements Events(), it will be closed immediately on init.
func (*Friz) Fail ¶
Fail reports to the Source that processing failed for this Msg, and optionally sends to a Fail-specific Sink
func (*Friz) FlushAndClose ¶
FlushAndClose provides default logic for stopping, emptying and shutting down the configured Source and Sink. Any Msgs which are still unAcked after the timeout has expired are Failed.
func (*Friz) LogProcessingRate ¶
LogProcessingRate implements the logic for MonitorProcessingRate and is exposed for testing.
func (*Friz) ReportAsyncErrors ¶
func (f *Friz) ReportAsyncErrors()
ReportAsyncErrors monitors Events() output and reports via logging and/or stats. It runs until f.Events() is closed and so should be run using the provided Option or in a separate goroutine.
type FrizTransformer ¶
FrizTransformer provides a Transform to apply when a Msg is sent or received
func NewSimpleSepTransformer ¶
func NewSimpleSepTransformer(sep []byte) FrizTransformer
NewSimpleSepTransformer initializes a new SepTransformer with a specified separator
type Frizzle ¶
type Frizzle interface { Receive() <-chan Msg Send(m Msg, dest string) error Ack(Msg) error Fail(Msg) error Events() <-chan Event AddOptions(...Option) FlushAndClose(timeout time.Duration) error Close() error }
Frizzle is a Msg bus for rapidly configuring and processing messages between multiple message services.
type Option ¶
type Option func(*Friz)
Option is a type that modifies a Frizzle object
func HandleShutdown ¶
func HandleShutdown(appShutdown func()) Option
HandleShutdown handles a clean shutdown for frizzle and calls an app-provided shutdown function for SIGINT and SIGTERM. If Frizzle is run with this option, it does not need to call Close() explicitly as this is handled by HandleShutdown.
func MonitorProcessingRate ¶
MonitorProcessingRate configures the Frizzle to periodically log the rate of Msgs processed.
func ReportAsyncErrors ¶
func ReportAsyncErrors() Option
ReportAsyncErrors monitors Events() output and reports via logging and/or stats. Error events are logged at Error level and have a stat recorded; all other events are logged at Warn level.
If setting a FailSink, ReportAsyncErrors should be added/re-added AFTER the FailSink option, or async events from the FailSink will not be reported.
func Stats ¶
func Stats(stats StatsIncrementer) Option
Stats specifies a stats object for the Frizzle
func WithTransformer ¶
func WithTransformer(ft FrizTransformer) Option
WithTransformer returns an Option to add the provided FrizTransformer to a *Friz
type SimpleMsg ¶
type SimpleMsg struct {
// contains filtered or unexported fields
}
SimpleMsg is a basic Msg implementation
type SimpleSepTransformer ¶
type SimpleSepTransformer struct {
// contains filtered or unexported fields
}
SimpleSepTransformer appends and removes a specified separator such as '\n' at the end of the Msg
func (*SimpleSepTransformer) ReceiveTransform ¶
func (st *SimpleSepTransformer) ReceiveTransform() Transform
ReceiveTransform returns a Transform to remove the separator if it is present at the end of Msg.Data()
func (*SimpleSepTransformer) SendTransform ¶
func (st *SimpleSepTransformer) SendTransform() Transform
SendTransform returns a Transform to append the separator if it is not present at the end of Msg.Data()
type Source ¶
type Source interface { Receive() <-chan Msg Ack(m Msg) error Fail(m Msg) error UnAcked() []Msg Stop() error Close() error }
Source defines a stream of incoming Msgs to be Received for processing, and reporting whether or not processing was successful.
type StatsIncrementer ¶
type StatsIncrementer interface {
Increment(bucket string)
}
StatsIncrementer is a simple stats interface that supports incrementing a bucket. It is met by github.com/alexcesaro/statsd and similar; used for mocking and multiple implementations.
type Type ¶
type Type string
Type identifies the supported types of Frizzle Sources and Sinks for use in dependent repos
const ( // Kafka (Apache: http://kafka.apache.org/) Kafka Type = "kafka" // Kinesis (AWS: https://aws.amazon.com/kinesis/) Kinesis Type = "kinesis" )