x

package
v0.0.0-...-ebf99ae Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Nov 2, 2021 License: MIT Imports: 17 Imported by: 0

Documentation

Index

Examples

Constants

View Source
const (
	ANY_TYPE = "_any_type_"
)

Variables

View Source
var ErrSQLTypeConversionError = errors.New("unknown type to convert to SQL")

ErrSQLTypeConversionError is the error reported when a value of an unknown type cannot be converted to SQL.

Functions

func Cap

func Cap(in <-chan interface{}, out chan<- interface{}, errs chan<- error)

Cap will cap off a pipeline with a no-op. This can be useful when embedding a pipeline and you don't want to forward messages back to the parent pipeline.

func IF

func IF(t l.Tfunc, check IfFunc) l.Tfunc

IF applies an IfFunc to see if the Tfunc should get the message; if not, the message passes through.

func IFElse

func IFElse(t, e l.Tfunc, check IfFunc) l.Tfunc

IFElse applies an IfFunc to see if the Tfunc should run or the Else func.

func OnlyIF

func OnlyIF(t l.Tfunc, check IfFunc) l.Tfunc

OnlyIF applies an IfFunc to see if the Tfunc should get the message; if not, the message is ignored.

func Tap

func Tap(otherOut chan<- interface{}) func(<-chan interface{}, chan<- interface{}, chan<- error)

Tap will send message on through the pipe/line as well as to another channel.

func Tee

func Tee(targets ...line.Tfunc) line.Tfunc

Tee splits a stream into multiple streams and merges the results. Example:

otherpipe := line.New()
line.New().Add(x.Tee(otherpipe)).Run()

Types

type Batch

type Batch struct {
	N         int
	Timeout   time.Duration
	ByteLimit int
	// contains filtered or unexported fields
}

Batch will take N messages and create a batch message that has the slice of the messages as the metadata key "batch". It will also combine the bodies into the body of the batch, separated by newlines, if CombineBody is true.

func CloseableBatch

func CloseableBatch(size int, timeout time.Duration, byteLimit int) Batch

CloseableBatch creates a new batch

func (*Batch) Close

func (b *Batch) Close()

Close will send a signal to the batcher to break out and shutdown. This will send one final batch of whatever is left.

func (Batch) T

func (b Batch) T(in <-chan interface{}, out chan<- interface{}, errs chan<- error)

T implements the pipeline transform interface.

Example (NoTimeout)
package main

import (
	"fmt"

	l "github.com/MasteryConnect/pipe/line"
	"github.com/MasteryConnect/pipe/message"
	"github.com/MasteryConnect/pipe/x"
)

// main sends four "foo" messages through a Batch with N: 3 and passes a
// line describing the size of each resulting batch on to l.Stdout.
func main() {
	l.New().SetP(func(out chan<- interface{}, errs chan<- error) {
		for i := 0; i < 4; i++ {
			out <- "foo"
		}
	}).Add(
		x.Batch{N: 3}.T,
		l.Inline(func(m interface{}) (interface{}, error) {
			if b, ok := m.(message.Batch); ok {
				return fmt.Sprintf("batch of size %d", b.Size()), nil
			}
			// Not a batch: name the type that was asserted above and
			// the type that actually arrived.
			return fmt.Sprintf("want message.Batch got %T", m), nil
		}),
		l.Stdout,
	).Run()
}
Output:

batch of size 3
batch of size 1
Example (WithByteLimit)
package main

import (
	"fmt"

	l "github.com/MasteryConnect/pipe/line"
	"github.com/MasteryConnect/pipe/message"
	"github.com/MasteryConnect/pipe/x"
)

// main sends ten "foo" messages through a Batch configured with N: 4 and
// ByteLimit: 10, and passes a line describing the size of each resulting
// batch on to l.Stdout.
func main() {
	l.New().SetP(func(out chan<- interface{}, errs chan<- error) {
		for i := 0; i < 10; i++ {
			out <- "foo"
		}
	}).Add(
		x.Batch{N: 4, ByteLimit: 10}.T,
		l.Inline(func(m interface{}) (interface{}, error) {
			if b, ok := m.(message.Batch); ok {
				return fmt.Sprintf("batch of size %d", b.Size()), nil
			}
			// Not a batch: name the type that was asserted above and
			// the type that actually arrived.
			return fmt.Sprintf("want message.Batch got %T", m), nil
		}),
		l.Stdout,
	).Run()
}
Output:

batch of size 3
batch of size 3
batch of size 3
batch of size 1
Example (WithTimeout)
package main

import (
	"fmt"
	"time"

	l "github.com/MasteryConnect/pipe/line"
	"github.com/MasteryConnect/pipe/message"
	"github.com/MasteryConnect/pipe/x"
)

// main sends two messages, pauses for longer than the Batch Timeout, then
// sends two more, and passes a line describing the size of each resulting
// batch on to l.Stdout.
func main() {
	l.New().SetP(func(out chan<- interface{}, errs chan<- error) {
		for i := 0; i < 2; i++ {
			out <- "foo"
		}

		// cause the producer to delay to allow the
		// timeout to trigger a batch that isn't full
		time.Sleep(10 * time.Millisecond)

		for i := 0; i < 2; i++ {
			out <- "foo"
		}
	}).Add(
		x.Batch{N: 3, Timeout: 9 * time.Millisecond}.T,
		l.Inline(func(m interface{}) (interface{}, error) {
			if b, ok := m.(message.Batch); ok {
				return fmt.Sprintf("batch of size %d", b.Size()), nil
			}
			// Not a batch: name the type that was asserted above and
			// the type that actually arrived.
			return fmt.Sprintf("want message.Batch got %T", m), nil
		}),
		l.Stdout,
	).Run()
}
Output:

batch of size 2
batch of size 2

type Buffer

type Buffer struct {
	N int
}

Buffer will create a buffer of size N to help "drain" a previous step.

Example
package main

import (
	"fmt"

	l "github.com/MasteryConnect/pipe/line"
	"github.com/MasteryConnect/pipe/x"
)

// main shows that a Buffer sized to hold every message lets the upstream
// steps finish sending while the downstream step is still blocked. It
// prints the downstream count once before the downstream step is released
// and once after the pipeline has finished.
func main() {
	gateComplete := make(chan bool) // signalled when the gate step has seen msgsToSend messages
	gate := 0                       // number of messages the gate step has received so far
	flush := make(chan bool)        // signalled by the producer to release the downstream step
	flushed := false                // set once the downstream step has been released
	msgsToSend := 100
	after := x.Count{Silent: true} // counts the messages that get past the blocked step

	l.New().SetP(func(out chan<- interface{}, errs chan<- error) {
		for i := 0; i < msgsToSend; i++ {
			out <- "foo"
		}

		// now wait for the buffer to load before checking the counts
		<-gateComplete
		fmt.Printf("before %d after %d\n", msgsToSend, after.Val())
		// release the downstream step so the buffered messages can flow
		flush <- true
	}).Add(
		// gate step: counts incoming messages and signals gateComplete
		// when the count reaches msgsToSend, then forwards the message
		func(in <-chan interface{}, out chan<- interface{}, errs chan<- error) {
			for m := range in {
				gate++
				if gate == msgsToSend {
					gateComplete <- true
				}
				out <- m
			}
		},

		// Buffer will take in 100 messages regardless of downstream
		x.Buffer{N: msgsToSend}.T,

		l.Inline(func(m interface{}) (interface{}, error) {
			// don't process anything until we are flushed
			if !flushed {
				<-flush
				flushed = true
			}
			return m, nil // passthrough
		}),
		after.Use,
	).Run()

	// the pipeline has finished; report the final downstream count
	fmt.Printf("before %d after %d\n", msgsToSend, after.Val())
}
Output:

before 100 after 0
before 100 after 100

func (Buffer) T

func (b Buffer) T(in <-chan interface{}, out chan<- interface{}, errs chan<- error)

T is the Tfunc for Buffer.

type Cmd

type Cmd struct {
	Name string
	Args []string

	NoStdin bool // don't send the msg.String() to stdin of the command
}

Cmd will execute a system command

func Command

func Command(name string, arg ...string) *Cmd

Command wraps the os/exec Command func

func (Cmd) P

func (e Cmd) P(out chan<- interface{}, errs chan<- error)

P is the producer function for the pipe/line

func (Cmd) T

func (e Cmd) T(in <-chan interface{}, out chan<- interface{}, errs chan<- error)

T is the transform function for the pipe/line. It will run the shell command for each message.

Example
package main

import (
	l "github.com/MasteryConnect/pipe/line"
	"github.com/MasteryConnect/pipe/x"
)

func main() {
	l.New().SetP(func(out chan<- interface{}, errs chan<- error) {
		out <- nil
		out <- nil
	}).Add(
		x.Command("echo", "foo").T,
		l.Stdout,
	).Run()
}
Output:

foo
foo
Example (Template_args)
package main

import (
	l "github.com/MasteryConnect/pipe/line"
	"github.com/MasteryConnect/pipe/x"
)

func main() {
	l.New().SetP(func(out chan<- interface{}, errs chan<- error) {
		out <- "ls"
		out <- "echo"
	}).Add(
		x.Command("echo", "{{.}}").T,
		l.Stdout,
	).Run()
}
Output:

ls
echo
Example (Template_name)
package main

import (
	l "github.com/MasteryConnect/pipe/line"
	"github.com/MasteryConnect/pipe/x"
)

func main() {
	l.New().SetP(func(out chan<- interface{}, errs chan<- error) {
		out <- "ls"
		out <- "echo"
	}).Add(
		x.Command("{{.}}", "foo").T,
		l.Stdout,
	).Run()
}
Output:

foo

func (Cmd) TStream

func (e Cmd) TStream(in <-chan interface{}, out chan<- interface{}, errs chan<- error)

TStream is the transform function for the pipe/line. It will run the shell command once, and keep piping the messages in to stdin of the command.

type Commander

type Commander interface {
	Command() string
	Arguments() []string
}

Commander defines what it takes to be a command

type Count

type Count struct {
	Live    bool
	Silent  bool
	Mod     int64
	Raw     bool
	Mul     int64
	AutoMod bool
	// contains filtered or unexported fields
}

Count counts up the messages coming down the pipe. Options:

Live: show the counts real-time to the console
Silent: don't print to the console at all
	This is useful if you want to use the count
	programmatically instead of for the user on the console
Mod: Only print counts that are modulus this number
Raw: don't humanize the printed value (no commas)
Mul: multiply the printed number by this amount
	instead of 1 for the batch message itself
AutoMod: automatically determine the modulus
	to avoid printing to the console too much
Example
package main

import (
	l "github.com/MasteryConnect/pipe/line"
	"github.com/MasteryConnect/pipe/x"
)

// main pushes four messages through a Count with default options.
func main() {
	produce := func(out chan<- interface{}, errs chan<- error) {
		for sent := 0; sent < 4; sent++ {
			out <- "foo"
		}
	}

	l.New().SetP(produce).Add(x.Count{}.T).Run()
}
Output:

4
Example (Silent)
package main

import (
	"fmt"

	l "github.com/MasteryConnect/pipe/line"
	"github.com/MasteryConnect/pipe/x"
)

// main counts four messages with a silent Count and then prints the
// counter's value itself.
func main() {
	counter := x.Count{Silent: true}

	produce := func(out chan<- interface{}, errs chan<- error) {
		for sent := 0; sent < 4; sent++ {
			out <- "foo"
		}
	}

	// Use is on the pointer so the counted value is kept
	l.New().SetP(produce).Add(counter.Use).Run()

	fmt.Println(counter.Val())
}
Output:

4

func (Count) T

func (c Count) T(in <-chan interface{}, out chan<- interface{}, errs chan<- error)

T will count up the messages and pass them on.

func (*Count) Use

func (c *Count) Use(in <-chan interface{}, out chan<- interface{}, errs chan<- error)

Use will count up the messages and pass them on.

func (*Count) Val

func (c *Count) Val() int64

Val returns the value of the counter.

type ErrorHandler

type ErrorHandler struct {
	TaskToTry    l.InlineTfunc
	ErrorHandler l.Tfunc
}

func (ErrorHandler) T

func (eh ErrorHandler) T(in <-chan interface{}, out chan<- interface{}, errs chan<- error)

Process a message through the 'try' function. If there is an error, then pass it to the error handler. The error handler can determine what should happen, e.g. log in a special way and pass the error on, or ignore the error

type Fanout

type Fanout struct {
	// contains filtered or unexported fields
}

func NewFanout

func NewFanout(tfuncs []FanoutTfunc, msgTypes FanoutMsgTypesFunc) *Fanout

Create a new Fanout with FanoutMsgTypesFunc to get the types (names) of a message with the intention of filtering messages to the provided FanoutTfunc's. The filtering only works if a FanoutTfunc implements FanoutIncludeTypes. If no FanoutMsgTypesFunc is passed in, then FanoutTfunc's get all messages

func (*Fanout) T

func (f *Fanout) T(in <-chan interface{}, out chan<- interface{}, errs chan<- error)

type FanoutIncludeTypes

type FanoutIncludeTypes interface {
	I() []string
}

A FanoutTfunc can implement this interface to tell Fanout that it only wants the message types returned by I() sent to it.

type FanoutMsgTypesFunc

type FanoutMsgTypesFunc func(msg interface{}) (types []string)

Pass this in when creating Fanout if you want to include certain messages sent to a FanoutTfunc. When a message is passed to this function it should return the types (names) of the msg. These along with a FanoutTfunc's implementation of FanoutIncludeTypes will be used to determine the FanoutTfunc's the message is sent to.

type FanoutTfunc

type FanoutTfunc interface {
	T(<-chan interface{}, chan<- interface{}, chan<- error)
}

The Tfunc's Fanout fans out messages to.

type Group

type Group struct {
	By   GroupByFunc
	Size int

	Reduce ReduceFunc
	// contains filtered or unexported fields
}

Group is the grouping transformer for pipe/line.

Example
package main

import (
	"fmt"

	l "github.com/MasteryConnect/pipe/line"
	"github.com/MasteryConnect/pipe/x"
)

func main() {
	l.New().SetP(func(out chan<- interface{}, errs chan<- error) {
		for i := 0; i < 10; i++ {
			out <- i
		}
	}).Add(
		x.NewGroup(3, func(msg interface{}) []string {
			if msg.(int)%2 == 0 {
				return []string{"even"} // put in the "even" group
			}
			return []string{"odd"} // put in the "odd" group
		}).T,
		l.I(func(msg interface{}) (interface{}, error) {
			group := msg.(*x.GroupMsg) // type cast to the group message
			return fmt.Sprintf("-- %s --\n%v\n", group.Name, group.Batch), nil
		}),
		l.Stdout,
	).Run()

}
Output:

-- even --
0
2
4

-- odd --
1
3
5

-- even --
6
8

-- odd --
7
9

func NewGroup

func NewGroup(size int, by GroupByFunc) *Group

NewGroup makes a new Group.

func NewReduceGroup

func NewReduceGroup(by GroupByFunc, reduce ReduceFunc) *Group

NewReduceGroup makes a new Group with a reducer

func (*Group) T

func (g *Group) T(in <-chan interface{}, out chan<- interface{}, errs chan<- error)

T is the transform function.

type GroupByFunc

type GroupByFunc func(msg interface{}) (groups []string)

GroupByFunc is the function that is used to get the group names to group a message by.

type GroupMsg

type GroupMsg struct {
	message.Batch
	Name string
}

GroupMsg is the message to send down stream.

func (*GroupMsg) In

func (gm *GroupMsg) In() interface{}

In implements Inner for message.Get

type Head struct {
	N int // limit message to N
}

Head only allows the first N messages through, then stops the pipeline.

func (Head) T

func (h Head) T(inMsgs <-chan interface{}, outMsgs chan<- interface{}, errs chan<- error)

T implements the pipeline interface.

type If

type If struct {
	Check  IfFunc
	Tfunc  l.Tfunc
	Else   l.Tfunc
	OnlyIf bool
}

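If conditionally routes messages using Check. Presumably (see IF, IFElse and OnlyIF) a message that passes Check is given to Tfunc; otherwise it is given to Else when set, ignored when OnlyIf is true, or passed through unchanged.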

func (*If) T

func (i *If) T(inMsgs <-chan interface{}, outMsgs chan<- interface{}, errs chan<- error)

T implements the pipeline interface.

type IfFunc

type IfFunc func(interface{}) bool

IfFunc is a func passed in to determine if the message should be used.

type Progress

type Progress struct {
	*pb.ProgressBar
}

Progress shows the progress of the stream

func NewProgress

func NewProgress(total int) *Progress

NewProgress creates a new progress transformer

func (*Progress) AddToTotal

func (p *Progress) AddToTotal(in <-chan interface{}, out chan<- interface{}, errs chan<- error)

AddToTotal will pass messages through the stream and add their count to the total.

Example
package main

import (
	"bytes"
	"fmt"

	l "github.com/MasteryConnect/pipe/line"
	"github.com/MasteryConnect/pipe/x"
)

// main sends the same message thirty times through AddToTotal and then
// prints the progress total.
func main() {
	// unknown total so set to 0
	progress := x.NewProgress(0)

	// some message to send down stream
	payload := bytes.NewBufferString("foo")

	produce := func(out chan<- interface{}, errs chan<- error) {
		for sent := 0; sent < 30; sent++ {
			out <- payload
		}
	}

	// AddToTotal increments the total for the progress
	l.New().SetP(produce).Add(progress.AddToTotal).Run()

	fmt.Println(progress.Total)
}
Output:

30

func (*Progress) T

func (p *Progress) T(in <-chan interface{}, out chan<- interface{}, errs chan<- error)

T will add each message to the progress bar progress

Example (KnownTotal)
package main

import (
	"bytes"
	"time"

	l "github.com/MasteryConnect/pipe/line"
	"github.com/MasteryConnect/pipe/x"
)

// main drives a progress bar with a known total of three messages, using
// a rate limiter to stand in for work.
func main() {
	// known total of 3
	progress := x.NewProgress(3)

	produce := func(out chan<- interface{}, errs chan<- error) {
		for _, body := range []string{"1", "2", "3"} {
			out <- bytes.NewBufferString(body)
		}
	}

	work := x.RateLimit{N: 1, Per: time.Second} // do some work

	l.New().SetP(produce).Add(
		work.T,
		progress.T, // increments the progress bar
	).Run()
}
Output:

Example (UnknownTotal)
package main

import (
	"bytes"
	"time"

	l "github.com/MasteryConnect/pipe/line"
	"github.com/MasteryConnect/pipe/x"
)

// main drives a progress bar whose total is not known up front: the total
// is grown by AddToTotal as messages arrive, and a rate limiter stands in
// for work.
func main() {
	// unknown total so set to 0
	progress := x.NewProgress(0)

	produce := func(out chan<- interface{}, errs chan<- error) {
		for _, body := range []string{"1", "2", "3"} {
			out <- bytes.NewBufferString(body)
		}
	}

	work := x.RateLimit{N: 1, Per: time.Second} // do some work

	l.New().SetP(produce).Add(
		progress.AddToTotal, // increments the total for the progress
		work.T,
		progress.T, // increments the progress bar
	).Run()
}
Output:

type RateLimit

type RateLimit struct {
	N   int64
	Per time.Duration

	// smooth out the rate instead of bursting
	Smooth bool
}

RateLimit limits how many messages can go through in a time frame.

func (RateLimit) T

func (rl RateLimit) T(in <-chan interface{}, out chan<- interface{}, errs chan<- error)

T is the Tfunc for the rate limiter

Example
package main

import (
	"time"

	l "github.com/MasteryConnect/pipe/line"
	"github.com/MasteryConnect/pipe/x"
)

func main() {
	l.New().SetP(func(out chan<- interface{}, errs chan<- error) {
		out <- "1"
		out <- "2"
		out <- "3"
	}).Add(
		x.RateLimit{N: 1, Per: time.Millisecond}.T,
		l.Stdout,
	).Run()
}
Output:

1
2
3

type ReduceFunc

type ReduceFunc func(memo, msg interface{}) (newmemo interface{})

ReduceFunc defines the reducer you can optionally specify. The purpose is to define how a group of messages is collapsed down into a single message.

type SQL

type SQL struct {
	Table    string   // optional table to apply the mutation to
	MaskKeys []string // useful for logging without sensitive values like passwords

	NumberArgs bool // use $1 style placeholders for the resulting queries
}

SQL will transform a message to a query. It is batch aware

func (SQL) I

func (s SQL) I(m interface{}) (interface{}, error)

I implements the InlineTfunc interface

Example
package main

import (
	l "github.com/MasteryConnect/pipe/line"
	"github.com/MasteryConnect/pipe/message"
	"github.com/MasteryConnect/pipe/x"
)

func main() {
	l.New().SetP(func(out chan<- interface{}, errs chan<- error) {
		out <- message.NewRecordFromMSI(map[string]interface{}{
			"foo": "bar",
		})
	}).Add(
		l.I(x.SQL{Table: "foo"}.I),
		l.Stdout,
	).Run()

}
Output:

INSERT INTO foo (foo) VALUES ('bar')

func (SQL) SQLInsertFromBatch

func (s SQL) SQLInsertFromBatch(v message.Batch) (*message.Query, error)

SQLInsertFromBatch converts a batch message to a bulk INSERT query message

func (SQL) T

func (s SQL) T(in <-chan interface{}, out chan<- interface{}, errs chan<- error)

T implements the Tfunc interface

Example
package main

import (
	l "github.com/MasteryConnect/pipe/line"
	"github.com/MasteryConnect/pipe/message"
	"github.com/MasteryConnect/pipe/x"
)

func main() {
	l.New().SetP(func(out chan<- interface{}, errs chan<- error) {
		out <- message.NewRecordFromMSI(map[string]interface{}{
			"foo": "bar",
		})
	}).Add(
		x.SQL{Table: "foo"}.T,
		l.Stdout,
	).Run()

}
Output:

INSERT INTO foo (foo) VALUES ('bar')
Example (Batch)
package main

import (
	l "github.com/MasteryConnect/pipe/line"
	"github.com/MasteryConnect/pipe/message"
	"github.com/MasteryConnect/pipe/x"
)

func main() {
	l.New().SetP(func(out chan<- interface{}, errs chan<- error) {
		out <- message.NewRecordFromMSI(map[string]interface{}{
			"foo": "bar",
		})
		out <- message.NewRecordFromMSI(map[string]interface{}{
			"foo": "bar2",
		})
	}).Add(
		x.Batch{N: 2}.T,
		x.SQL{Table: "foo"}.T,
		l.Stdout,
	).Run()

}
Output:

INSERT INTO foo (foo) VALUES ('bar'),('bar2')

type ShardMany

type ShardMany struct {
	// contains filtered or unexported fields
}

func NewShardMany

func NewShardMany(concurrency int, tfunc l.Tfunc, shardManyKeyFunc ShardManyKeyFunc) (*ShardMany, error)

NewShardMany creates a new ShardMany instance. concurrency is the number of goroutines (shards); tfunc is the pipeline/function to process each message; shardManyKeyFunc, when passed a message, will return a key.

func (*ShardMany) T

func (sm *ShardMany) T(in <-chan interface{}, out chan<- interface{}, errs chan<- error)

T is the transformer function. Each message pulled from the in channel will be:

  1. Passed to the ShardManyKeyFunc to get the message's key
  2. Sent to the shard processor (channel) for messages with the key from step 1

To determine which shard to pass the message to, the key is turned into an int, and then the int of the key is mapped to a shard by using the modulus operator with the number of shards (concurrency).

type ShardManyKeyFunc

type ShardManyKeyFunc func(msg interface{}) (key []byte)

Given a message, return a key for the message. Multiple messages can return the same key if they should be processed in the order they come down the in channel.

type Sort

type Sort struct {
	N       int
	Compare func(x, y interface{}) bool // true if x <= y
}

Sort will kind of sort the messages within a size window. There is no guarantee of exact ordering, but it is better than nothing.

func (Sort) T

func (s Sort) T(in <-chan interface{}, out chan<- interface{}, errs chan<- error)

T is the Tfunc for Sort.

type Tail

type Tail struct {
	N int // limit message to N
}

Tail only allows the last N messages through the pipeline.

func (Tail) T

func (t Tail) T(inMsgs <-chan interface{}, outMsgs chan<- interface{}, errs chan<- error)

T implements the pipeline interface.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL