iter

package module
v0.9.0 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: May 7, 2018 License: MIT Imports: 4 Imported by: 0

README

Package iter - Streaming Iterators for Go

Overview

Package iter is providing simple interfaces and facilities for streaming data through iterators in a functional style. This is useful if you want to create an application that is reading data from a DB, processing it and immediately streaming the results to some client (without waiting for all data to be processed first).

This package makes it easy to instantiate Streaming Iterators, taking care of all the brittle details of asynchronous, concurrent streaming while you just need to provide a mapper function to be applied to the stream.

Features

  • provides a simple Iterator interface
  • takes care of all streaming details behind the scene
  • you just need to provide a Mapper function for processing of the streamed data
  • configurable amount of worker routines
  • configurable channel buffer size
  • supports continue on error
  • supports streaming from the 3 most common sources directly:
    • Generators, Iterators and Channels
  • Iterators can be chained
  • easy to extend to specific types

Problem Statement

Implementing the iterator pattern in Go is straightforward. And thanks to Channels and Go-routines it is very easy to stream values asynchronously - just spin up a sender and a receiver Go-routine and let them communicate over a channel. If you want to also notify about errors you add another channel. If you want to cancel the sender when the receiver is done things start to become complex. At latest when you start to have multiple senders/receivers running concurrently, maybe using buffered channels, you will inevitably run into issues and surprising effects, like

  • race conditions / deadlocks
  • flaky tests / Heisenbugs that are horrible to debug
  • unpredictable ordering
  • valid results on subsequent calls after an error
  • getting an EOF before all go routines are done
  • ...

I hit these issues many times and if you want to stream values through several layers of an application you start to re-implement all this functionality (and the mistakes) again and again.

Usage

Package iter is build up on three concepts:

  • a Mapper func is applied to an input item and creates an output item
  • a Stream is a function that can be applied on an input source, sets up go-routines to apply the Mapper func on the input items and returns an Iterator for iterating over the results.
  • an Iterator can be used to iterate over the result items of a stream and to cancel the background processes when done.

There are three types of directly supported input sources to stream from: Generator funcs, Iterators and Channels.

Stream from a Generator Func
stream := iter.NewGeneratorStream(context.Background(), mapperFunc)
iterator := stream(generatorFunc)
defer iterator.Close()
for {
    item, err := iterator.Next()
    ...
}
 
Stream from an Iterator
stream := iter.NewStream(context.Background(), mapperFunc)
iterator := stream(inputIter)
defer iterator.Close()
for {
    item, err := iterator.Next()
    ...
}
 
Stream from Channels
stream := iter.NewChannelStream(context.Background(), mapperFunc)
iterator := stream(inputChan, errChan)
defer iterator.Close()
for {
    item, err := iterator.Next()
    ...
}
 
Stream Options
// supported options with their defaults:
bufsize := iter.BufSizeOpt(0)
workers := iter.WorkersOpt(1)
contOnErr := iter.ContOnErrOpt(false)

stream := iter.NewStream(context.Background(), mapperFunc, bufSize, workers, contOnErr)
...
Important Properties
  • setting less than 1 worker will cause a panic
  • after iter.Close(), iter.Next() may still return results with more than 1 worker or when using buffers
  • if an iter.Next() call returns an error, subsequent calls may still return valid results from other workers or a buffered channel
  • choosing more than 1 worker will make the order of results unpredictable
  • by default, a Stream will eventually stop streaming after a Mapper returned an error
    • use ContOnErrOpt(true) to change this behavior
  • make sure that generators and mappers are threadsafe if you want to use more than one worker
    • the iterator returned by New is threadsafe
  • adding many buffers in a chain of streams will lead to pre-fetching of many items that may be disregarded when downstream is canceling the stream

Documentation

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

This section is empty.

Types

type ChannelStream

type ChannelStream func(inputChan chan interface{}, errChan chan error) Iterator

ChannelStream is a function that creates a new Iterator for processing and streaming of data from the given inputChan, while listening for errors on errChan.

func NewChannelStream

func NewChannelStream(ctx context.Context, mapper Mapper, opts ...StreamOpt) ChannelStream

NewChannelStream is setting up a ChannelStream func with the given Mapper func and StreamOpts.

type Generator

type Generator func() (interface{}, error)

Generator is the signature of a generator func.

type GeneratorStream

type GeneratorStream func(Generator) Iterator

GeneratorStream is a function that creates a new Iterator for processing and streaming of data from the given Generator func.

func NewGeneratorStream

func NewGeneratorStream(ctx context.Context, mapper Mapper, opts ...StreamOpt) GeneratorStream

NewGeneratorStream is setting up a GeneratorStream func with the given Mapper func and StreamOpts.

type Iterator

type Iterator interface {
	Next() (interface{}, error)
	Close()
}

Iterator is the interface for an object that can be used to iterate through a set of items.

func New

func New(itemChan chan interface{}, errChan chan error, cancel context.CancelFunc) Iterator

New is returning a new *iterator instance.

type Mapper

type Mapper func(ctx context.Context, input interface{}) (outpout interface{}, err error)

Mapper is the signature of a mapper function which is applied to the stream items by the worker go routines.

type Stream

type Stream func(Iterator) Iterator

Stream is the signature of a function that creates a new Iterator for processing and streaming of data from the given Iterator.

func NewStream

func NewStream(ctx context.Context, mapper Mapper, opts ...StreamOpt) Stream

NewStream is setting up a Stream func with the given Mapper func and StreamOpts.

type StreamOpt

type StreamOpt func(conf *streamConf)

StreamOpt is a functional option type.

func BufSizeOpt

func BufSizeOpt(size int) StreamOpt

BufSizeOpt is a functional option setting the channel buffer size of the stream (default: 0)

func ContOnErrOpt

func ContOnErrOpt(cont bool) StreamOpt

ContOnErrOpt is a functional option that lets the stream continue after an error if set to true (default: false).

func WorkersOpt

func WorkersOpt(workers int) StreamOpt

WorkersOpt is a functional option setting the amount of worker threads (default: 1).

Directories

Path Synopsis

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL