selina

package module
v0.23.0
Published: Jan 12, 2023 License: MIT Imports: 12 Imported by: 0

README

selina


Simple pipeline library for Go, inspired by ratchet https://github.com/dailyburn/ratchet

Unstable API; please use Go modules.

Installation

go get github.com/licaonfee/selina

Usage

package main

import (
    "context"
    "fmt"
    "os"
    "strings"

    "github.com/licaonfee/selina"
    "github.com/licaonfee/selina/workers/regex"
    "github.com/licaonfee/selina/workers/text"
)

const sample = `this is a sample text
this is second line
#lines started with # will be skipped
this line pass`

func main() {
    rd := strings.NewReader(sample)
    input := selina.NewNode("Read", text.NewReader(text.ReaderOptions{Reader: rd}))
    // https://regex101.com/r/7ZS3Uw/1
    filter := selina.NewNode("Filter", regex.NewFilter(regex.FilterOptions{Pattern: "^[^#].+"}))
    output := selina.NewNode("Write", text.NewWriter(text.WriterOptions{Writer: os.Stdout}))
    pipe := selina.NewSimplePipeline(input, filter, output)
    if err := pipe.Run(context.Background()); err != nil {
        fmt.Printf("ERR: %v\n", err)
    }
    for name, stat := range pipe.Stats() {
        fmt.Printf("Node:%s=%v\n", name, stat)
    }
}

Graphviz

Optionally you can render a graph of your pipeline

func main() {
    // here the pipeline is built
    p := selina.FreePipeline(read, filter, write, store, custom, csv)
    selina.Graph(p, os.Stdout)
}

The previous code produces a .dot graph:

digraph {
  rankdir=LR;
  X0001FXACQQKSTZV3SGXZPF3C9C[label="Filter"];
  X0001FXACQQKSTZV3SGY4YSE6NX[label="Write"];
  X0001FXACQQKSTZV3SGY2XQMB1X[label="Custom"];
  X0001FXACQQKSTZV3SGY5QMFJ0Q[label="Store"];
  X0001FXACQQKSTZV3SGXTVX3MPP[label="Read"];
  X0001FXACQQKSTZV3SGXXXA43EF[label="CSV"];
  X0001FXACQQKSTZV3SGXTVX3MPP -> X0001FXACQQKSTZV3SGXXXA43EF [label="count=10000,bytes=1MiB"];
  X0001FXACQQKSTZV3SGXTVX3MPP -> X0001FXACQQKSTZV3SGXZPF3C9C [label="count=10000,bytes=1MiB"];
  X0001FXACQQKSTZV3SGXXXA43EF -> X0001FXACQQKSTZV3SGY4YSE6NX [label="count=10000,bytes=1MiB"];
  X0001FXACQQKSTZV3SGXZPF3C9C -> X0001FXACQQKSTZV3SGY2XQMB1X [label="count=10000,bytes=1MiB"];
  X0001FXACQQKSTZV3SGY2XQMB1X -> X0001FXACQQKSTZV3SGY5QMFJ0Q [label="count=10000,bytes=1MiB"];
}

Rendered with Graphviz:

(pipeline graph image)

Built-in workers

By default selina provides these workers:

  • csv.Encoder : Transforms JSON data into CSV
  • csv.Decoder : Transforms CSV data into JSON
  • custom.Function : Executes a custom function inside a pipeline node
  • ops.Cron : Emits scheduled messages into a pipeline
  • ops.TimeSerie : Generates time series data
  • random.Random : Generates random byte slices
  • regex.Filter : Filters data using a regular expression
  • remote.Server : Listens for remote data
  • remote.Client : Sends data to a remote pipeline
  • sql.Reader : Executes a query against a database and returns its rows as JSON objects
  • sql.Writer : Inserts rows into a table from JSON objects
  • text.Reader : Uses any io.Reader and reads its contents as text
  • text.Writer : Writes text data into any io.Writer
  • filesystem.Reader : Uses afero.Fs to read arbitrary files
  • filesystem.Writer : Uses afero.Fs to write to arbitrary files

Design

Selina has three main components:

  • Pipeline
  • Node
  • Worker

Some utility functions are provided to build pipelines: LinealPipeline(nodes ...*Node) Pipeliner chains all nodes in the same order as they are passed, while FreePipeline(nodes ...*Node) Pipeliner just runs all nodes without chaining them, so you can build any pipeline, including cyclic or acyclic graphs; see the sketch below.
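
For example, this sketch reuses the text and regex workers from the Usage example to build the same three-node pipeline; the commented FreePipeline variant chains nodes explicitly with Node.Chain (the sample input and pattern are placeholders):

package main

import (
    "context"
    "os"
    "strings"

    "github.com/licaonfee/selina"
    "github.com/licaonfee/selina/workers/regex"
    "github.com/licaonfee/selina/workers/text"
)

func main() {
    read := selina.NewNode("Read", text.NewReader(text.ReaderOptions{Reader: strings.NewReader("a\nb\n")}))
    filter := selina.NewNode("Filter", regex.NewFilter(regex.FilterOptions{Pattern: ".+"}))
    write := selina.NewNode("Write", text.NewWriter(text.WriterOptions{Writer: os.Stdout}))

    // LinealPipeline chains nodes in argument order: Read -> Filter -> Write.
    p := selina.LinealPipeline(read, filter, write)

    // The FreePipeline equivalent chains nodes by hand, which also allows
    // fan-out, fan-in, and cyclic topologies:
    //   read.Chain(filter)
    //   filter.Chain(write)
    //   p := selina.FreePipeline(read, filter, write)

    if err := p.Run(context.Background()); err != nil {
        panic(err)
    }
}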

Pipeline

Starts data processing and manages all chained nodes in a single object

Node

Contains methods to pass data from Worker to Worker and get metrics

Worker

All data Extraction/Transformation/Load logic is encapsulated in a Worker instance

Conventions for workers (a sketch follows this list):
  • A nil input channel is only for workers that produce data; if a worker does not allow a nil input channel, it must return selina.ErrNilUpstream
  • Workers must close their output channel when their job is finished
  • A closed input channel must gracefully finalize the worker
  • Pipeline finalization is triggered via channel closing
  • All workers must handle context cancellation
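
A minimal sketch of a worker that follows these conventions; UpperCase is a hypothetical transformer, not a worker shipped with selina:

package main

import (
    "bytes"
    "context"
    "os"
    "strings"

    "github.com/licaonfee/selina"
    "github.com/licaonfee/selina/workers/text"
)

// UpperCase is a hypothetical worker that upper-cases every message.
type UpperCase struct{}

func (UpperCase) Process(ctx context.Context, args selina.ProcessArgs) error {
    // Convention: close the output channel when the job is finished.
    defer close(args.Output)
    // Convention: this worker transforms data, so a nil upstream is an error.
    if args.Input == nil {
        return selina.ErrNilUpstream
    }
    for {
        select {
        case msg, ok := <-args.Input:
            if !ok {
                // Convention: a closed input channel finalizes the worker gracefully.
                return nil
            }
            out := bytes.NewBuffer(bytes.ToUpper(msg.Bytes()))
            if err := selina.SendContext(ctx, out, args.Output); err != nil {
                return err
            }
        case <-ctx.Done():
            // Convention: all workers must handle context cancellation.
            return ctx.Err()
        }
    }
}

func main() {
    in := selina.NewNode("In", text.NewReader(text.ReaderOptions{Reader: strings.NewReader("hello\n")}))
    up := selina.NewNode("Upper", UpperCase{})
    out := selina.NewNode("Out", text.NewWriter(text.WriterOptions{Writer: os.Stdout}))
    if err := selina.LinealPipeline(in, up, out).Run(context.Background()); err != nil {
        panic(err)
    }
}
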
Codec

Most workers accept an optional Codec configuration that implements the Marshaler/Unmarshaler interfaces; by default msgpack is used if no Codec is provided.

Command line Usage

Binary

selina -file pipeline.yml -timeout 10h

Docker

# all paths are relative to /app/selina
# you can also use absolute paths
docker run -v$PWD:/data/sample:ro licaonfee/selina:rolling -f /data/pipeline.yml

Where pipeline.yml is

---
nodes:
  - name: employes
    type: read_file
    args:
      filename: /data/employes.csv
  - name: filter_it
    type: regex
    args:
      pattern: '^.*,it,.*$'
    fetch:
      - employes
  - name: to_json
    type: csv
    args:
      mode: decode
      header: [name,role,department,id]
    fetch:
      - filter_it
  - name: it_employes
    type: write_file
    args:
      filename: it_employes.txt
      ifexists: overwrite
      mode: 0644
    fetch:
      - to_json

Autocompletion

You can also use any LSP-compatible editor to autocomplete selina pipelines.

Get the JSON schema
selina -schema > /home/user/selina-schema.json
VIM

In .vim/coc-settings.json

{
  "yaml.schemas": {
    "/home/user/selina-schema.json": "*.selina.yaml"
  }
}
VSCode
  • Install the redhat.vscode-yaml extension

In settings.json

{
  "yaml.schemas": {
    "/home/user/selina-schema.json": "*.selina.yaml"
  }
}

Documentation

Overview

Package selina is a pipeline library.

Index

Constants

This section is empty.

Variables

var (
	// ErrNotHaveNodes attempt to start a pipeline without nodes
	ErrNotHaveNodes = errors.New("Pipeliner does not have nodes")
	// ErrInconsistentStart pipeline does not start all given nodes
	ErrInconsistentStart = errors.New("Pipeliner does not start all nodes")
	// ErrMissingStats some nodes stats are absent on call Stat method
	ErrMissingStats = errors.New("missing nodes in Stats map")
)
var (
	DefaultUnmarshaler = json.Unmarshal
	DefaultMarshaler   = json.Marshal
)

Default Marshaler and Unmarshaler

var ErrAlreadyStarted = errors.New("node already started")

ErrAlreadyStarted is returned if the Start method is called more than once.

var ErrNilUpstream = errors.New("nil upstream channel")

ErrNilUpstream is returned when a worker requires an upstream worker and none is connected.

var ErrStopNotStarted = errors.New("stopping a not started worker")

ErrStopNotStarted is returned when Stop is called before the Start method.

Functions

func ATPipelineContextCancel

func ATPipelineContextCancel(p Pipeliner) error

ATPipelineContextCancel checks that the context is propagated to all Nodes.

func ATPipelineStartAll

func ATPipelineStartAll(p Pipeliner) error

ATPipelineStartAll checks that all Nodes in a pipeline are started when pipeline.Start is called.

func ATPipelineStats added in v0.3.0

func ATPipelineStats(p Pipeliner) error

ATPipelineStats checks whether a Pipeliner implementation gets the stats of all nodes.

func ChannelAsSlice

func ChannelAsSlice[T any](in <-chan T) []T

ChannelAsSlice reads from in until it is closed and returns a slice with all messages received.

func DefaultErrorHandler added in v0.19.0

func DefaultErrorHandler(e error) bool

DefaultErrorHandler is a pessimistic error handler; it always returns false on error.

func FreeBuffer added in v0.23.0

func FreeBuffer(b *bytes.Buffer)

FreeBuffer calls Buffer.Reset and returns the buffer to the pool.

func GetBuffer added in v0.23.0

func GetBuffer() *bytes.Buffer

GetBuffer returns a buffer from a pool of buffers

func Graph added in v0.17.0

func Graph(p Pipeliner, w io.Writer) error

Graph exports the current pipeline structure and stats in .dot notation.

func SendContext added in v0.3.0

func SendContext(ctx context.Context, msg *bytes.Buffer, output chan<- *bytes.Buffer) error

SendContext tries to send msg to output; it returns an error if the context is canceled before msg is sent.
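
A small sketch tying the buffer pool to SendContext; emit is a hypothetical helper, and it assumes the receiver takes ownership of a successfully sent buffer:

package main

import (
    "bytes"
    "context"
    "fmt"

    "github.com/licaonfee/selina"
)

// emit fills a pooled buffer and hands it downstream, recycling
// the buffer only when the send fails.
func emit(ctx context.Context, payload []byte, out chan<- *bytes.Buffer) error {
    b := selina.GetBuffer() // reuse a pooled buffer instead of allocating
    b.Write(payload)
    if err := selina.SendContext(ctx, b, out); err != nil {
        selina.FreeBuffer(b) // never delivered: return it to the pool
        return err
    }
    return nil
}

func main() {
    out := make(chan *bytes.Buffer, 1)
    if err := emit(context.Background(), []byte("hello"), out); err != nil {
        panic(err)
    }
    msg := <-out
    fmt.Println(msg.String())
    selina.FreeBuffer(msg) // the consumer recycles the buffer when done
}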

func SliceAsChannel

func SliceAsChannel[T any](data []T, autoClose bool) chan T

SliceAsChannel returns a channel that reads from a slice; if autoClose is true, the channel is closed after the last message is consumed.

func SliceAsChannelOfBuffer added in v0.23.0

func SliceAsChannelOfBuffer(data []string, autoClose bool) chan *bytes.Buffer

SliceAsChannelOfBuffer returns a channel that reads from a slice; if autoClose is true, the channel is closed after the last message is consumed.

func SliceAsChannelRaw

func SliceAsChannelRaw[T any](data []T, autoClose bool) chan T

SliceAsChannelRaw is the same as SliceAsChannel.
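
The two generic helpers compose into a simple round trip, for example:

package main

import (
    "fmt"

    "github.com/licaonfee/selina"
)

func main() {
    // autoClose=true closes the channel after the last element,
    // which lets ChannelAsSlice terminate.
    var ch <-chan int = selina.SliceAsChannel([]int{1, 2, 3}, true)
    fmt.Println(selina.ChannelAsSlice(ch)) // [1 2 3]
}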

Types

type Broadcaster

type Broadcaster struct {
	DataCounter
	// contains filtered or unexported fields
}

Broadcaster allows writing the same value to multiple goroutines.

func (*Broadcaster) Broadcast

func (b *Broadcaster) Broadcast(input <-chan *bytes.Buffer)

Broadcast reads values from input and sends them to the output channels.

func (*Broadcaster) Client

func (b *Broadcaster) Client() <-chan *bytes.Buffer

Client creates an output channel; it panics if Broadcast was already called.
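
A minimal sketch, assuming the zero value of Broadcaster is ready to use and that Broadcast returns once the input channel is drained:

package main

import (
    "bytes"
    "fmt"
    "sync"

    "github.com/licaonfee/selina"
)

func main() {
    var b selina.Broadcaster
    // Client channels must be requested before Broadcast is called.
    c1 := b.Client()
    c2 := b.Client()

    in := make(chan *bytes.Buffer, 1)
    in <- bytes.NewBufferString("hello")
    close(in)

    // Drain each client concurrently so no send can block another.
    var wg sync.WaitGroup
    for _, c := range []<-chan *bytes.Buffer{c1, c2} {
        wg.Add(1)
        go func(c <-chan *bytes.Buffer) {
            defer wg.Done()
            fmt.Println((<-c).String()) // every client sees the same value
        }(c)
    }
    b.Broadcast(in) // reads input until it is closed, fanning out to all clients
    wg.Wait()
}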

type DataCounter added in v0.3.0

type DataCounter struct {
	// contains filtered or unexported fields
}

DataCounter is a simple atomic counter wrapper.

func (*DataCounter) Stats added in v0.3.0

func (c *DataCounter) Stats() (count int64, data int64)

Stats returns the current values; each read is atomic per value, so count and data are each consistent only with themselves.

func (*DataCounter) SumData added in v0.3.0

func (c *DataCounter) SumData(msg []byte)

SumData increments count by 1 and data by len(msg). While each value is incremented atomically, it is possible to get inconsistent reads when calling Stats while the object is in use.
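
A quick sketch, assuming the zero value of DataCounter is usable:

package main

import (
    "fmt"

    "github.com/licaonfee/selina"
)

func main() {
    var c selina.DataCounter // assumption: the zero value is ready to use
    c.SumData([]byte("hello"))
    c.SumData([]byte("world!"))
    count, data := c.Stats()
    fmt.Println(count, data) // 2 11
}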

type ErrorHandler added in v0.15.0

type ErrorHandler func(error) bool

ErrorHandler returns true if the error was handled inside the function; when the error is handled, the Worker must continue processing and just skip the failure.
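
For example, a permissive handler that logs failures and keeps the worker processing (which worker options accept an ErrorHandler is worker-specific):

package main

import (
    "errors"
    "log"

    "github.com/licaonfee/selina"
)

// skipFailures reports every error as handled, so the worker skips the
// failure and continues; contrast with DefaultErrorHandler, which always
// returns false.
var skipFailures selina.ErrorHandler = func(err error) bool {
    log.Printf("skipping: %v", err)
    return true
}

func main() {
    log.Println("handled:", skipFailures(errors.New("malformed row"))) // handled: true
}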

type Marshaler added in v0.18.2

type Marshaler func(interface{}) ([]byte, error)

Marshaler is a function type compatible with json.Marshal

type Node

type Node struct {
	// contains filtered or unexported fields
}

Node is a node that can send and receive data.

func NewNode

func NewNode(name string, w Worker) *Node

NewNode creates a new node that wraps a Worker. name is a user-defined identifier; internally Node generates a unique id.

func (*Node) Chain

func (n *Node) Chain(next *Node) *Node

Chain sends messages emitted by the worker to the next node; it returns next so it can be chained again. If next is already chained, this operation does nothing.

func (*Node) ID added in v0.4.0

func (n *Node) ID() string

ID returns a unique identifier for this node.

func (*Node) IsChained added in v0.4.0

func (n *Node) IsChained(other *Node) bool

IsChained returns true if Chain was called before with other

func (*Node) Name

func (n *Node) Name() string

Name returns the node name; this value is not unique.

func (*Node) Next added in v0.17.0

func (n *Node) Next() []string

Next returns the ids of the nodes chained to the current node.

func (*Node) Running

func (n *Node) Running() bool

Running returns true if the Start method was called.

func (*Node) Start

func (n *Node) Start(ctx context.Context) error

Start initializes the worker; worker.Process will be called multiple times until the Node is stopped or worker.Process returns an error.

func (*Node) Stats added in v0.3.0

func (n *Node) Stats() Stats

Stats returns the Worker channel stats.

func (*Node) Stop

func (n *Node) Stop() error

Stop stops the worker in the node; it must be called after Start. Successive calls to Stop do nothing.

type OptionsChecker added in v0.8.0

type OptionsChecker interface {
	// Check returns an error if the options have an invalid value;
	// it must not modify values at all and should preferably be
	// implemented as a value receiver
	Check() error
}

OptionsChecker provides a way to determine whether a state is valid.

type Pipeliner

type Pipeliner interface {
	Run(context.Context) error
	Stats() map[string]Stats
	Nodes() []*Node
}

Pipeliner implementations must meet the following conditions: Run must call Node.Start on all Nodes; the Context passed to Run must be propagated to all Node.Start methods; Nodes() returns a slice with all instances of *Node.

func FreePipeline added in v0.4.0

func FreePipeline(nodes ...*Node) Pipeliner

FreePipeline provides a way to run arbitrarily chained Nodes; this method does not call Node.Chain.

func LinealPipeline added in v0.4.0

func LinealPipeline(nodes ...*Node) Pipeliner

LinealPipeline creates a Pipeliner; the Nodes in "nodes" are chained in a single branch: Node(0)->Node(1)->Node(2)->...->Node(n).

type ProcessArgs added in v0.3.0

type ProcessArgs struct {
	// Input is nil when there is no upstream channel
	Input  <-chan *bytes.Buffer
	Output chan<- *bytes.Buffer
	Err    chan error
}

ProcessArgs encapsulate arguments to Worker.Process

type Receiver

type Receiver struct {
	DataCounter
	// contains filtered or unexported fields
}

Receiver joins multiple channels into a single output channel; new channels can be added with Watch until Receive is called.

func (*Receiver) Receive

func (r *Receiver) Receive() <-chan *bytes.Buffer

Receive listens to all channels configured with Watch; when all channels are closed, the output channel is closed too. If there are no channels in the watch list, this method returns a nil channel.

func (*Receiver) Watch

func (r *Receiver) Watch(input <-chan *bytes.Buffer)

Watch adds a new channel to be joined; calling Watch after Receive panics.
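
A minimal sketch joining two pre-filled channels, assuming the zero value of Receiver is usable:

package main

import (
    "bytes"
    "fmt"

    "github.com/licaonfee/selina"
)

func main() {
    a := make(chan *bytes.Buffer, 1)
    b := make(chan *bytes.Buffer, 1)
    a <- bytes.NewBufferString("from a")
    b <- bytes.NewBufferString("from b")
    close(a)
    close(b)

    var r selina.Receiver
    r.Watch(a) // Watch must be called before Receive
    r.Watch(b)
    // The joined channel is closed once every watched channel is closed.
    for msg := range r.Receive() {
        fmt.Println(msg.String())
    }
}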

type SimplePipeline

type SimplePipeline struct {
	// contains filtered or unexported fields
}

SimplePipeline's default value is unusable; you must create it with NewSimplePipeline.

func (*SimplePipeline) Nodes

func (p *SimplePipeline) Nodes() []*Node

Nodes returns all instances of *Node.

func (*SimplePipeline) Run

func (p *SimplePipeline) Run(ctx context.Context) error

Run starts pipeline processing and returns a non-nil error if any Node fails.

func (*SimplePipeline) Stats

func (p *SimplePipeline) Stats() map[string]Stats

Stats returns a map with every node's Stats object.

type Stats added in v0.3.0

type Stats struct {
	Time          time.Time
	Sent          int64
	SentBytes     int64
	Received      int64
	ReceivedBytes int64
}

Stats contains overall node statistics. Counters are guaranteed to be consistent only when the node finalizes.

type Unmarshaler added in v0.18.2

type Unmarshaler func([]byte, interface{}) error

Unmarshaler is a function type compatible with json.Unmarshal

type Worker

type Worker interface {
	// Process must close write only channel
	Process(ctx context.Context, args ProcessArgs) error
}

Worker is the standard interface implemented by processors; it is used to build pipeline nodes. All Worker implementations must meet the following conditions: if a worker does not have another worker upstream, it receives a nil input channel; this is useful to identify the situation and return an error. On a closed input channel, Process must finalize its work gracefully and return nil. On context cancellation, Process must finalize as soon as possible and return context.Canceled. On finish, Process must close the output channel and return an error or nil.

Directories

Path Synopsis
examples
workers Package workers common structure for selina workers
workers/csv Package csv workers to read and write csv format
workers/custom Package custom implements a user defined function
workers/filesystem Package filesystem utilities for read and write files
workers/ops
workers/sql
