luigi

package module
v0.3.6
Published: Oct 13, 2021 License: MIT Imports: 5 Imported by: 42

README

luigi

Luigi is a Go package for implementing streams of values. It defines types for data Sources and Sinks, and ways to manipulate them via Map / Filter / Reduce.

Examples

These may be found in the Go documentation of the package.

Documentation

Constants

This section is empty.

Variables

var ErrPourToClosedSink = errors.New("pour to closed sink")

Functions

func IsEOS

func IsEOS(err error) bool

IsEOS reports whether the error is an EOS, that is, whether it is due to a closed stream or an empty non-blocking stream.

func NewBroadcast

func NewBroadcast() (Sink, Broadcast)

NewBroadcast returns a Sink for writing to the broadcaster, along with the new Broadcast instance.

func NewPipe

func NewPipe(opts ...PipeOpt) (Source, Sink)

NewPipe returns both ends of a stream.

func Pump

func Pump(ctx context.Context, dst Sink, src Source) error

Pump moves values from a source into a sink.

Currently this doesn't work atomically, so if a Sink errors in the Pour call, the value that was read from the source is lost.
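The caveat above can be illustrated with a minimal sketch. It redeclares Source, Sink, and EOS locally so that it runs without the package; `pump`, `sliceSource`, and `failingSink` are hypothetical stand-ins, and the loop is an assumed simplification rather than luigi's actual implementation.

```go
package main

import (
	"context"
	"errors"
	"fmt"
)

// Local copies of the documented interfaces, so the sketch is self-contained.
type Source interface {
	Next(context.Context) (interface{}, error)
}

type Sink interface {
	Pour(ctx context.Context, v interface{}) error
	Close() error
}

// EOS mirrors the documented end-of-stream type; the message is an assumption.
type EOS struct{}

func (EOS) Error() string { return "end of stream" }

// sliceSource is a hypothetical Source that yields its elements in order.
type sliceSource struct{ vals []interface{} }

func (s *sliceSource) Next(context.Context) (interface{}, error) {
	if len(s.vals) == 0 {
		return nil, EOS{}
	}
	v := s.vals[0]
	s.vals = s.vals[1:]
	return v, nil
}

// failingSink is a hypothetical Sink that rejects every value.
type failingSink struct{}

func (failingSink) Pour(context.Context, interface{}) error { return errors.New("sink full") }
func (failingSink) Close() error                            { return nil }

// pump is an assumed, simplified read-then-pour loop.
func pump(ctx context.Context, dst Sink, src Source) error {
	for {
		v, err := src.Next(ctx)
		if errors.Is(err, EOS{}) {
			return nil // the source is drained
		}
		if err != nil {
			return err
		}
		// v has already left the source; if Pour fails, v is gone.
		if err := dst.Pour(ctx, v); err != nil {
			return err
		}
	}
}

// remainingAfterFailedPump reports how many values are left in the source
// after pumping into a sink that always fails.
func remainingAfterFailedPump() (int, error) {
	src := &sliceSource{vals: []interface{}{1, 2, 3}}
	err := pump(context.Background(), failingSink{}, src)
	return len(src.vals), err
}

func main() {
	left, err := remainingAfterFailedPump()
	// prints "2 sink full": the value 1 was read but never delivered
	fmt.Println(left, err)
}
```

A caller that cannot afford to lose values would need to keep its own copy of anything it hands to a sink that may fail.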

Types

type Broadcast

type Broadcast interface {
	// Register a Sink for updates to be sent.
	Register(dst Sink) func()
}

Broadcast is an interface for registering one or more Sinks to receive updates.

Example
sink, bcast := NewBroadcast()
defer sink.Close()

var printOutput FuncSink = func(
	_ context.Context,
	v interface{},
	_ error,
) error {
	if v == nil {
		return nil
	}
	fmt.Println(*v.(*string))
	return nil
}

closeSink := bcast.Register(printOutput)
defer closeSink()
closeSink = bcast.Register(printOutput)
defer closeSink()

msg := "I sink this should be printed twice"
_ = sink.Pour(context.Background(), &msg)
Output:

I sink this should be printed twice
I sink this should be printed twice

type EOS

type EOS struct{}

EOS stands for End Of Stream. It signals when a non-blocking stream is empty, or a stream is closed.

Similar to the io package's EOF.
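As with io.EOF, callers usually treat EOS as a normal termination signal rather than a failure. The sketch below uses a local copy of EOS and a hypothetical `isEOS` helper built on errors.As; whether the package's own IsEOS unwraps errors this way is an assumption.

```go
package main

import (
	"errors"
	"fmt"
)

// EOS mirrors the documented type; the message text is an assumption.
type EOS struct{}

func (EOS) Error() string { return "end of stream" }

// isEOS is a hypothetical stand-in for IsEOS. It uses errors.As so that an
// EOS wrapped with %w is still recognized.
func isEOS(err error) bool {
	var eos EOS
	return errors.As(err, &eos)
}

var (
	wrapped = fmt.Errorf("reading feed: %w", EOS{}) // EOS with added context
	other   = errors.New("boom")                    // an unrelated failure
)

func main() {
	// prints "true true false false"
	fmt.Println(isEOS(EOS{}), isEOS(wrapped), isEOS(other), isEOS(nil))
}
```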

func (EOS) Error

func (_ EOS) Error() string

type ErrorCloser

type ErrorCloser interface {
	CloseWithError(error) error
}

type FuncSink

type FuncSink func(ctx context.Context, v interface{}, err error) error

FuncSink defines a function which can be used as a Sink.

func (FuncSink) Close

func (fSink FuncSink) Close() error

Close implements the Sink interface.

func (FuncSink) CloseWithError

func (fSink FuncSink) CloseWithError(err error) error

func (FuncSink) Pour

func (fSink FuncSink) Pour(ctx context.Context, v interface{}) error

Pour implements the Sink interface.
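A function type can satisfy Sink by forwarding each method to itself. The sketch below keeps the documented FuncSink signature but fills in the method bodies by assumption: Pour passes the value with a nil error, and Close passes a nil value with EOS as the error. This is consistent with the nil check in the Broadcast example above, but it is not confirmed by this page.

```go
package main

import (
	"context"
	"fmt"
)

// EOS mirrors the documented type; the message text is an assumption.
type EOS struct{}

func (EOS) Error() string { return "end of stream" }

// FuncSink has the documented signature; the method bodies are assumptions.
type FuncSink func(ctx context.Context, v interface{}, err error) error

// Pour forwards the value with a nil error.
func (f FuncSink) Pour(ctx context.Context, v interface{}) error { return f(ctx, v, nil) }

// CloseWithError forwards the closing error with a nil value.
func (f FuncSink) CloseWithError(err error) error { return f(context.Background(), nil, err) }

// Close signals a regular end of stream.
func (f FuncSink) Close() error { return f.CloseWithError(EOS{}) }

// collect pours two values into a FuncSink, closes it, and returns a record
// of every call the function received.
func collect() []string {
	var calls []string
	var sink FuncSink = func(_ context.Context, v interface{}, err error) error {
		if err != nil {
			calls = append(calls, "closed: "+err.Error())
			return nil
		}
		calls = append(calls, fmt.Sprintf("value: %v", v))
		return nil
	}
	ctx := context.Background()
	_ = sink.Pour(ctx, "a")
	_ = sink.Pour(ctx, 42)
	_ = sink.Close()
	return calls
}

func main() {
	for _, line := range collect() {
		fmt.Println(line)
	}
}
```

Under these assumptions, a single function sees both the data path and the close path, so it should check the error argument before using the value.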

type FuncSource

type FuncSource func(context.Context) (interface{}, error)

FuncSource defines a function which can be used as a Source.

func (FuncSource) Next

func (fSink FuncSource) Next(ctx context.Context) (interface{}, error)

Next implements the Source interface.
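A closure is often the shortest way to build a Source. The following sketch keeps the documented FuncSource signature and assumes that Next simply calls the function; `countTo` and `drain` are hypothetical helpers introduced for illustration.

```go
package main

import (
	"context"
	"fmt"
)

// EOS mirrors the documented type; the message text is an assumption.
type EOS struct{}

func (EOS) Error() string { return "end of stream" }

// FuncSource has the documented signature; the Next body is an assumption.
type FuncSource func(context.Context) (interface{}, error)

func (f FuncSource) Next(ctx context.Context) (interface{}, error) { return f(ctx) }

// countTo is a hypothetical helper that yields 1..n and then EOS.
func countTo(n int) FuncSource {
	i := 0
	return func(ctx context.Context) (interface{}, error) {
		if err := ctx.Err(); err != nil {
			return nil, err // honor cancellation before producing a value
		}
		if i >= n {
			return nil, EOS{}
		}
		i++
		return i, nil
	}
}

// drain reads from src until Next returns an error, and returns the values
// seen together with that error.
func drain(src FuncSource) ([]interface{}, error) {
	ctx := context.Background()
	var out []interface{}
	for {
		v, err := src.Next(ctx)
		if err != nil {
			return out, err
		}
		out = append(out, v)
	}
}

func main() {
	vals, err := drain(countTo(3))
	fmt.Println(vals, err) // prints "[1 2 3] end of stream"
}
```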

type Observable

type Observable interface {
	// Broadcast allows subscribing to changes
	Broadcast

	// Set sets a new value
	Set(interface{}) error

	// Value returns the current value
	Value() (interface{}, error)
}

Observable wraps an interface{} value and allows tracking changes to it.

func NewObservable

func NewObservable(v interface{}) Observable

NewObservable returns a new Observable.

type PipeOpt

type PipeOpt func(*pipeOpts) error

PipeOpt configures NewPipe's behavior.

func NonBlocking

func NonBlocking() PipeOpt

NonBlocking changes the behavior to assume a non-blocking backing medium.

func WithBuffer

func WithBuffer(bufSize int) PipeOpt

WithBuffer sets the buffer size of the internal channel.
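PipeOpt follows the functional-options pattern: each option is a function that mutates an unexported configuration struct. The sketch below shows how such options might be applied. The fields of `pipeOpts`, the negative-size check, and the `applyOpts` helper are all hypothetical, since the real struct is unexported and not shown on this page.

```go
package main

import (
	"errors"
	"fmt"
)

// pipeOpts is a hypothetical options struct; the real fields are not
// documented here.
type pipeOpts struct {
	bufSize     int
	nonBlocking bool
}

// PipeOpt has the documented signature.
type PipeOpt func(*pipeOpts) error

// WithBuffer is an assumed implementation; the validation is illustrative.
func WithBuffer(bufSize int) PipeOpt {
	return func(o *pipeOpts) error {
		if bufSize < 0 {
			return errors.New("negative buffer size")
		}
		o.bufSize = bufSize
		return nil
	}
}

// NonBlocking is an assumed implementation that flips a flag.
func NonBlocking() PipeOpt {
	return func(o *pipeOpts) error {
		o.nonBlocking = true
		return nil
	}
}

// applyOpts shows how a constructor might fold options into its config,
// stopping at the first option that fails.
func applyOpts(opts ...PipeOpt) (pipeOpts, error) {
	var cfg pipeOpts
	for _, opt := range opts {
		if err := opt(&cfg); err != nil {
			return pipeOpts{}, err
		}
	}
	return cfg, nil
}

func main() {
	cfg, err := applyOpts(WithBuffer(8), NonBlocking())
	fmt.Println(cfg.bufSize, cfg.nonBlocking, err) // prints "8 true <nil>"
}
```

Because each option returns an error, a constructor built this way can reject an invalid configuration before any stream is created.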

type PushSource

type PushSource interface {
	Push(ctx context.Context, dst Sink) error
}

PushSource is the interface for requesting all content be written to the given sink.

type Sink

type Sink interface {
	Pour(ctx context.Context, v interface{}) error
	Close() error
}

Sink is the interface which wraps methods writing to a stream.

type SliceSink

type SliceSink struct {
	// contains filtered or unexported fields
}

SliceSink binds Sink methods to a slice of interface{} values.

func NewSliceSink

func NewSliceSink(arg *[]interface{}) *SliceSink

NewSliceSink returns a new SliceSink bound to the given slice of interface{} values.

func (*SliceSink) Close

func (sink *SliceSink) Close() error

Close is a dummy method to implement the Sink interface.

func (*SliceSink) Pour

func (sink *SliceSink) Pour(ctx context.Context, v interface{}) error

Pour implements the Sink interface. It appends the value to the bound slice.

type SliceSource

type SliceSource []interface{}

SliceSource binds Source methods to a slice of interface{} values.

func (*SliceSource) Next

func (src *SliceSource) Next(context.Context) (v interface{}, err error)

Next implements the Source interface.
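The slice-backed types are convenient in tests. The sketch below keeps the documented underlying type of SliceSource but fills in Next by assumption (pop the first element, return EOS when empty). The `sliceSink` type is a hypothetical stand-in for SliceSink that appends to the slice it is bound to.

```go
package main

import (
	"context"
	"fmt"
)

// EOS mirrors the documented type; the message text is an assumption.
type EOS struct{}

func (EOS) Error() string { return "end of stream" }

// SliceSource has the documented underlying type; the Next body is assumed.
type SliceSource []interface{}

func (src *SliceSource) Next(context.Context) (interface{}, error) {
	if len(*src) == 0 {
		return nil, EOS{}
	}
	v := (*src)[0]
	*src = (*src)[1:] // the source shrinks as values are read
	return v, nil
}

// sliceSink is a hypothetical stand-in for SliceSink.
type sliceSink struct{ dst *[]interface{} }

func (s *sliceSink) Pour(_ context.Context, v interface{}) error {
	*s.dst = append(*s.dst, v)
	return nil
}

func (s *sliceSink) Close() error { return nil }

// copyAll moves every value from src into a fresh slice via the sink.
func copyAll(src *SliceSource) []interface{} {
	var out []interface{}
	sink := &sliceSink{dst: &out}
	ctx := context.Background()
	for {
		v, err := src.Next(ctx)
		if err != nil {
			break // EOS: the source is empty
		}
		_ = sink.Pour(ctx, v)
	}
	_ = sink.Close()
	return out
}

func main() {
	src := SliceSource{"a", "b", "c"}
	out := copyAll(&src)
	fmt.Println(out, len(src)) // prints "[a b c] 0"
}
```

Note that Next has a pointer receiver, so reading under this assumption consumes the slice: after the loop the source is empty.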

type Source

type Source interface {
	Next(context.Context) (obj interface{}, err error)
}

Source is the interface which wraps the Next method for reading from a stream.
