stream

v0.0.1 (this package is not in the latest version of its module)

Published: Apr 25, 2021 License: MIT Imports: 2 Imported by: 0

README

stream

An experiment in building composable, streamable pipelines in Go.

The idea itself is not about maximum performance but rather a simpler way to abstract channels, workers, context cancellation and dynamic data, mostly for ETL jobs. It relies heavily on interface{} to pass data around and makes heavy use of reflection in github.com/stdiopt/stream/strmutil.

It's possible to build procs around serializing CSVs, selecting from databases, producing Parquet, crawling URLs, consuming APIs, etc.

ProcFunc

A ProcFunc is the function signature used to chain transforms in a stream. ProcFuncs should block until they have no more messages to send; exiting early closes the internal channel and stops further processors.

  • Each ProcFunc runs in a goroutine
  • ProcFuncs should block until they don't have any more data to send
  • Consume or Send will be cancelled if the context is done
type ProcFunc = func(p stream.Proc) error

The Proc interface:

type Proc interface {
	Context() context.Context
	Consume(func(interface{}) error) error
	Send(interface{}) error
}

Consume is a blocking method that consumes messages from the previous processor and calls the function passed as an argument for each message.

Send sends a value to the next processor.

A typical ProcFunc looks like:

func(p stream.Proc) error {
	// Initialize things: open files, DB connections, whatever fits the
	// processor.

	// Since Consume blocks, we can call it at the end to hold the function
	// until there is nothing more to consume. If the underlying context is
	// cancelled due to a previous error or a timeout, Consume stops and
	// returns.
	return p.Consume(func(v interface{}) error {
		// Do something with the consumed value.
		transformed := v // placeholder: replace with the real transformation
		return p.Send(transformed)
	})
}
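As a concrete instance of the pattern above, the sketch below fills in the transform step with a hypothetical `double` ProcFunc that multiplies integers by two and rejects anything else. To stay self-contained and testable without the library, it runs against a hypothetical in-memory `sliceProc` instead of a real stream; both names are illustrative assumptions.

```go
package main

import (
	"context"
	"fmt"
)

// Proc mirrors the interface described above.
type Proc interface {
	Context() context.Context
	Consume(func(interface{}) error) error
	Send(interface{}) error
}

// sliceProc is a hypothetical in-memory Proc: Consume iterates over in,
// Send appends to out.
type sliceProc struct {
	in  []interface{}
	out []interface{}
}

func (p *sliceProc) Context() context.Context { return context.Background() }

func (p *sliceProc) Consume(fn func(interface{}) error) error {
	for _, v := range p.in {
		if err := fn(v); err != nil {
			return err
		}
	}
	return nil
}

func (p *sliceProc) Send(v interface{}) error {
	p.out = append(p.out, v)
	return nil
}

// double is a hypothetical ProcFunc with the usual shape: it blocks in
// Consume and sends one transformed value per consumed value.
func double(p Proc) error {
	return p.Consume(func(v interface{}) error {
		n, ok := v.(int)
		if !ok {
			return fmt.Errorf("double: expected int, got %T", v)
		}
		return p.Send(n * 2)
	})
}

func main() {
	p := &sliceProc{in: []interface{}{1, 2, 3}}
	if err := double(p); err != nil {
		fmt.Println("err:", err)
	}
	fmt.Println(p.out) // [2 4 6]
}
```

Returning an error from the callback is what ends Consume early, which is how a ProcFunc signals failure to the rest of the stream.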

Usage

package main

import (
	"fmt"

	"github.com/stdiopt/stream"
)

func main() {
	l := stream.Line(
		produce,
		consume,
	)
	if err := stream.Run(l); err != nil {
		fmt.Println("err:", err)
	}
}
func produce(p stream.Proc) error {
	for i := 0; i < 10; i++ {
		if err := p.Send(i); err != nil {
			return err
		}
	}
	return nil
}
func consume(p stream.Proc) error {
	return p.Consume(func(v interface{}) error {
		fmt.Println("Consuming:", v)
		return nil
	})
}

Example: consuming an API

package main

import (
	"log"
	"os"

	"github.com/stdiopt/stream"
	"github.com/stdiopt/stream/strmutil"
)

func main() {
	err := stream.Run(
		strmutil.Value("https://randomuser.me/api/?results=100"), // just sends the string
		strmutil.HTTPGet(nil),
		strmutil.JSONParse(nil),
		strmutil.Field("results"),
		strmutil.Unslice(),
		strmutil.Field("picture.thumbnail"),
		// Download profile picture thumbnails concurrently.
		// HTTPDownload is not defined in this snippet; it is assumed to be
		// a ProcFunc provided elsewhere.
		stream.Workers(32, HTTPDownload(nil)),
		strmutil.JSONDump(os.Stdout),
	)
	if err != nil {
		log.Println("err:", err)
	}
}

More examples are available in the examples directory.

Documentation

Overview

Package stream provides building blocks for composable, streamable pipelines.


Functions

func Run

func Run(pfns ...ProcFunc) error

Run will run the stream.

func RunWithContext

func RunWithContext(ctx context.Context, pfns ...ProcFunc) error

RunWithContext runs the stream with a context.

Types

type Chan

type Chan struct {
	// contains filtered or unexported fields
}

Chan wraps a channel and a context for cancellation awareness.

func NewChan

func NewChan(ctx context.Context, buffer int) Chan

NewChan returns a Chan bound to ctx with the given buffer size.

func (Chan) Close

func (c Chan) Close()

Close closes the underlying channel.

func (Chan) Consume

func (c Chan) Consume(fn ConsumerFunc) error

Consume consumes the channel and calls fn with each consumed value. It blocks until the context is cancelled, the channel is closed, or fn returns a non-nil error.

func (Chan) Send

func (c Chan) Send(v interface{}) error

Send sends v to the underlying channel. If the context is cancelled, it returns ctx.Err().

type Consumer

type Consumer interface {
	Consume(ConsumerFunc) error
}

type ConsumerFunc

type ConsumerFunc = func(v interface{}) error

ConsumerFunc is the function type used to receive messages.

type Proc

type Proc interface {
	Consumer
	Sender
	Context() context.Context
}

Proc is the interface used by ProcFuncs to consume data and send it to the next ProcFunc.

func MakeProc

func MakeProc(ctx context.Context, c Consumer, s Sender) Proc

MakeProc makes a Proc based on a Consumer and Sender.

type ProcFunc

type ProcFunc = func(Proc) error

func Broadcast

func Broadcast(pfns ...ProcFunc) ProcFunc

Broadcast consumes messages and passes each consumed message to all of the pfns ProcFuncs.

func Buffer

func Buffer(n int, pfns ...ProcFunc) ProcFunc

Buffer will create an extra buffered channel.

func Line

func Line(pfns ...ProcFunc) ProcFunc

Line consumes each message and passes it sequentially through all of the ProcFuncs.

func Workers

func Workers(n int, pfns ...ProcFunc) ProcFunc

Workers starts n workers running pfns, all consuming from and sending to the same channels.
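The worker-pool shape behind Workers can be sketched with plain channels: n goroutines range over one shared input channel and write to one shared output channel, which is closed only after every worker has finished. The `workers` helper and its per-value `fn` parameter are hypothetical simplifications of passing ProcFuncs.

```go
package main

import (
	"fmt"
	"sort"
	"sync"
)

// workers starts n goroutines that all read from in and write fn(v) to
// the same output channel; the output is closed once every worker is done.
// (Hypothetical helper; output order is not preserved.)
func workers(n int, in <-chan interface{}, fn func(interface{}) interface{}) <-chan interface{} {
	out := make(chan interface{})
	var wg sync.WaitGroup
	for i := 0; i < n; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			for v := range in {
				out <- fn(v)
			}
		}()
	}
	go func() {
		wg.Wait()
		close(out)
	}()
	return out
}

func main() {
	in := make(chan interface{})
	go func() {
		defer close(in)
		for i := 1; i <= 5; i++ {
			in <- i
		}
	}()
	var got []int
	for v := range workers(3, in, func(v interface{}) interface{} { return v.(int) * 10 }) {
		got = append(got, v.(int))
	}
	sort.Ints(got) // workers may finish in any order
	fmt.Println(got) // [10 20 30 40 50]
}
```

Since workers share the channels, results can arrive out of order, which is why the example sorts before printing; this is the trade-off for downloading thumbnails concurrently in the API example.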

type Sender

type Sender interface {
	Send(v interface{}) error
}

Directories

Path        Synopsis
examples
strmagg     Package strmagg performs aggregation on streams
strmutil    Package strmutil provides stream utils
