conduit: github.com/nickylogan/conduit Index | Examples | Files

package conduit

import "github.com/nickylogan/conduit"

In this example we create a simple pipeline to square a list of generated numbers.

Code:

// The pipeline is configured to create three workers, where there can be at most ten
// jobs in queue. As the rate limit is set to 5, only 5 jobs at most can run in one second.
cfg := conduit.Config{
    MaxJobs:      10,
    MaxWorkers:   3,
    RateLimit:    5,
    OutputBuffer: 5,
}

// Create a source to generate 10 numbers onto a channel
generatorFunc := conduit.GeneratorFunc(func(out chan<- interface{}) {
    for i := 1; i <= 10; i++ {
        out <- i
    }
})
numbers := conduit.NewSource(cfg, generatorFunc).Generate()

// Create a process pipe that squares the incoming input
squareFunc := conduit.ProcessorFunc(func(in interface{}) (out interface{}) {
    x := in.(int)
    time.Sleep(time.Duration(rand.Intn(150)) * time.Millisecond)
    return x * x
})
squares := conduit.NewPipe(cfg, squareFunc).Process(numbers)

// As a sink, print each incoming input
printer := conduit.ReceiverFunc(func(in interface{}) {
    fmt.Println(in)
})
done := conduit.NewSink(cfg, printer).Receive(squares)
<-done

Output:

1
4
9
16
25
36
49
64
81
100

Index

Examples

Package Files

pipe.go sink.go source.go types.go

type Config Uses

type Config struct {
    MaxJobs      int
    MaxWorkers   int
    RateLimit    int
    OutputBuffer int
}

Config configures the batch job worker

type Generator Uses

type Generator interface {
    // Generate generates data onto the given channel
    Generate(out chan<- interface{})
}

Generator is an interface for wrapping a generator function

type GeneratorFunc Uses

type GeneratorFunc func(out chan<- interface{})

GeneratorFunc is a type adapter allowing regular functions to be Receiver. If f is a function with the appropriate signature, GeneratorFunc(f) is a Generator the calls f.

func (GeneratorFunc) Generate Uses

func (f GeneratorFunc) Generate(out chan<- interface{})

Generate calls f(out)

type Job Uses

type Job struct {
    ID      int
    Payload interface{}
}

Job is a representation of a batch job

type Pipe Uses

type Pipe interface {
    // Process receives data from an input channel, processes it,
    // and outputs the results onto a channel. The output channel
    // is returned for use.
    Process(in chan interface{}) (out chan interface{})
}

Pipe represents a process pipe.

func NewPipe Uses

func NewPipe(cfg Config, p Processor) Pipe

NewPipe creates a new pipe with the given config. A Processor argument is given to process the incoming data.

type Processor Uses

type Processor interface {
    // Process receives a given input, processes it, and returns an output
    Process(in interface{}) (out interface{})
}

Processor is an interface for wrapping a process function

type ProcessorFunc Uses

type ProcessorFunc func(in interface{}) (out interface{})

ProcessorFunc is a type adapter allowing regular functions to be Processor. If f is a function with the appropriate signature, ProcessorFunc(f) is a Processor that calls f.

func (ProcessorFunc) Process Uses

func (f ProcessorFunc) Process(in interface{}) (out interface{})

Process calls f(in) and returns the result.

type Receiver Uses

type Receiver interface {
    // Receive is an input feeder
    Receive(in interface{})
}

Receiver is an interface for wrapping a receiver function

type ReceiverFunc Uses

type ReceiverFunc func(in interface{})

ReceiverFunc is a type adapter allowing regular functions to be Receiver. If f is a function with the appropriate signature, ReceiverFunc(f) is a Receiver the calls f.

func (ReceiverFunc) Receive Uses

func (f ReceiverFunc) Receive(in interface{})

Receive calls f(in).

type Sink Uses

type Sink interface {
    // Receive receives data from an input channel.
    // Once completed, done will send a finished signal.
    Receive(in chan interface{}) (done chan struct{})
}

Sink represents a data sink

func NewSink Uses

func NewSink(cfg Config, r Receiver) Sink

NewSink creates a new sink with the given config. A Receiver argument is given to handle incoming data.

type Source Uses

type Source interface {
    // Generate generates a data to an output channel.
    Generate() (out chan interface{})
}

Source represents a data source

func NewSource Uses

func NewSource(cfg Config, g Generator) Source

NewSource creates a new sink with the given config. A Generator argument is used for generating data.

Package conduit imports 2 packages (graph). Updated 2020-01-02. Refresh now. Tools for package owners.