channels.v1: gopkg.in/eapache/channels.v1

package channels

import "gopkg.in/eapache/channels.v1"

Package channels provides a collection of helper functions, interfaces and implementations for working with and extending the capabilities of Go's existing channels. The main interface of interest is Channel, though sub-interfaces are also provided for cases where the full Channel interface cannot be met (for example, InChannel for write-only channels).

For integration with native typed Go channels, the functions Wrap and Unwrap are provided, which do the appropriate type conversions. The NativeChannel, NativeInChannel and NativeOutChannel type definitions are also provided for use with native channels which already carry values of type interface{}.

The heart of the package consists of several distinct implementations of the Channel interface, including channels backed by special buffers (resizable, infinite, ring buffers, etc) and other useful types. A "black hole" channel for discarding unwanted values (similar in purpose to ioutil.Discard or /dev/null) rounds out the set.

Helper functions for operating on Channels include Pipe and Tee (which behave much like their Unix namesakes), as well as Multiplex and Distribute. "Weak" versions of these functions also exist, which do not close their output channel(s) on completion.

Due to limitations of Go's type system, importing this library directly is often not practical for production code. It serves equally well, however, as a reference guide and template for implementing many common idioms; if you use it in this way I would appreciate the inclusion of some sort of credit in the resulting code.

Warning: several types in this package provide so-called "infinite" buffers. Be *very* careful using these, as no buffer is truly infinite - if such a buffer grows too large your program will run out of memory and crash. Caveat emptor.

Package Files

batching_channel.go black_hole.go channels.go infinite_channel.go native_channel.go overflowing_channel.go resizable_channel.go ring_channel.go shared_buffer.go

func Distribute

func Distribute(input SimpleOutChannel, outputs ...SimpleInChannel)

Distribute takes a single input channel and an arbitrary number of output channels and sends each input to exactly *one* available output. If multiple outputs are waiting for a value, one is chosen at random. When the input channel is closed, all output channels are closed. Distribute with a single output channel is equivalent to Pipe (though slightly less efficient).
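The one-of-many delivery described for Distribute can be sketched with the standard library alone. The following is a minimal illustration over native channels, not the package's implementation; the names distribute and countDelivered are hypothetical, and the sketch assumes at least one output.

```go
package main

import (
	"fmt"
	"reflect"
	"sync"
)

// distribute is a hypothetical stand-in for the behaviour described above:
// each value from in goes to exactly one output, and all outputs are closed
// once in is closed. It assumes at least one output.
func distribute(in <-chan interface{}, outs ...chan<- interface{}) {
	cases := make([]reflect.SelectCase, len(outs))
	for i, out := range outs {
		cases[i] = reflect.SelectCase{Dir: reflect.SelectSend, Chan: reflect.ValueOf(out)}
	}
	go func() {
		for v := range in {
			// Wrap v as an interface{}-typed Value so nil values also work.
			val := reflect.ValueOf(&v).Elem()
			for i := range cases {
				cases[i].Send = val
			}
			// Blocks until one output accepts the value; if several are
			// ready, reflect.Select picks one pseudo-randomly.
			reflect.Select(cases)
		}
		for _, out := range outs {
			close(out)
		}
	}()
}

// countDelivered pushes n values through distribute with two consumers and
// returns how many values the consumers received in total.
func countDelivered(n int) int {
	in := make(chan interface{})
	a := make(chan interface{})
	b := make(chan interface{})
	distribute(in, a, b)

	var (
		wg    sync.WaitGroup
		mu    sync.Mutex
		total int
	)
	for _, out := range []chan interface{}{a, b} {
		wg.Add(1)
		go func(c <-chan interface{}) {
			defer wg.Done()
			for range c {
				mu.Lock()
				total++
				mu.Unlock()
			}
		}(out)
	}
	for i := 0; i < n; i++ {
		in <- i
	}
	close(in)
	wg.Wait()
	return total
}

func main() {
	fmt.Println("values delivered:", countDelivered(10))
}
```

Every value is counted exactly once across the two consumers, which is the property that distinguishes this pattern from Tee.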

func Multiplex

func Multiplex(output SimpleInChannel, inputs ...SimpleOutChannel)

Multiplex takes an arbitrary number of input channels and multiplexes their output into a single output channel. When all input channels have been closed, the output channel is closed. Multiplex with a single input channel is equivalent to Pipe (though slightly less efficient).
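As a rough illustration of the fan-in behaviour described for Multiplex, here is a standard-library sketch over native channels. It is not the package's code; multiplex and sumOf are hypothetical names chosen for the example.

```go
package main

import (
	"fmt"
	"sync"
)

// multiplex copies every value from each input onto out and closes out
// only after all inputs have been closed.
func multiplex(out chan<- interface{}, ins ...<-chan interface{}) {
	var wg sync.WaitGroup
	for _, in := range ins {
		wg.Add(1)
		go func(in <-chan interface{}) {
			defer wg.Done()
			for v := range in {
				out <- v
			}
		}(in)
	}
	go func() {
		wg.Wait()
		close(out)
	}()
}

// sumOf runs two producers that each send 1..n, multiplexes them into one
// channel, and returns the sum of everything that arrives.
func sumOf(n int) int {
	a := make(chan interface{})
	b := make(chan interface{})
	out := make(chan interface{})
	multiplex(out, a, b)

	for _, c := range []chan interface{}{a, b} {
		go func(c chan<- interface{}) {
			for i := 1; i <= n; i++ {
				c <- i
			}
			close(c)
		}(c)
	}

	sum := 0
	for v := range out {
		sum += v.(int)
	}
	return sum
}

func main() {
	fmt.Println(sumOf(3)) // prints 12
}
```

The order in which values from different inputs interleave is not deterministic; only the total is.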

func Pipe

func Pipe(input SimpleOutChannel, output SimpleInChannel)

Pipe connects the input channel to the output channel so that they behave as if they were a single channel. When the input channel is closed, the output channel is closed.

func Tee

func Tee(input SimpleOutChannel, outputs ...SimpleInChannel)

Tee (like its Unix namesake) takes a single input channel and an arbitrary number of output channels and duplicates each input into every output. When the input channel is closed, all output channels are closed. Tee with a single output channel is equivalent to Pipe (though slightly less efficient).

func Unwrap

func Unwrap(input SimpleOutChannel, output interface{})

Unwrap takes a SimpleOutChannel and uses reflection to pipe it to a typed native channel for easy integration with existing channel sources. Output can be any writable channel type (chan or chan<-). It panics if the output is not a writable channel, or if a value is received that cannot be sent on the output channel.
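To make the reflection step concrete, here is a minimal sketch of piping a chan interface{} into a typed channel using the reflect package. The helper names unwrap and sumUnwrapped are hypothetical, and closing the typed channel when the input is exhausted is an assumption of the sketch, not something the description above states.

```go
package main

import (
	"fmt"
	"reflect"
)

// unwrap forwards each value from in to the typed channel out via
// reflection. It panics if out is not a writable channel.
func unwrap(in <-chan interface{}, out interface{}) {
	ov := reflect.ValueOf(out)
	if ov.Kind() != reflect.Chan || ov.Type().ChanDir()&reflect.SendDir == 0 {
		panic("unwrap: output is not a writable channel")
	}
	go func() {
		for v := range in {
			// Send panics if v is nil or not assignable to the
			// channel's element type.
			ov.Send(reflect.ValueOf(v))
		}
		ov.Close() // closing on completion is an assumption of this sketch
	}()
}

// sumUnwrapped feeds the given ints through unwrap into a chan int and
// returns their sum as read from the typed side.
func sumUnwrapped(values ...int) int {
	in := make(chan interface{}, len(values))
	for _, v := range values {
		in <- v
	}
	close(in)

	out := make(chan int)
	unwrap(in, out)

	sum := 0
	for n := range out {
		sum += n
	}
	return sum
}

func main() {
	fmt.Println(sumUnwrapped(1, 2, 3)) // prints 6
}
```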

func WeakDistribute

func WeakDistribute(input SimpleOutChannel, outputs ...SimpleInChannel)

WeakDistribute behaves like Distribute (distributing a single input amongst multiple outputs) except that it does not close the output channels when the input channel is closed.

func WeakMultiplex

func WeakMultiplex(output SimpleInChannel, inputs ...SimpleOutChannel)

WeakMultiplex behaves like Multiplex (multiplexing multiple inputs into a single output) except that it does not close the output channel when the input channels are closed.

func WeakPipe

func WeakPipe(input SimpleOutChannel, output SimpleInChannel)

WeakPipe behaves like Pipe (connecting the two channels) except that it does not close the output channel when the input channel is closed.

func WeakTee

func WeakTee(input SimpleOutChannel, outputs ...SimpleInChannel)

WeakTee behaves like Tee (duplicating a single input into multiple outputs) except that it does not close the output channels when the input channel is closed.

type BatchingChannel

type BatchingChannel struct {
    // contains filtered or unexported fields
}

BatchingChannel implements the Channel interface, with the change that instead of producing individual elements on Out(), it batches together the entire internal buffer each time. Trying to construct an unbuffered batching channel will panic; that configuration is not supported (and provides no benefit over an unbuffered NativeChannel).

func NewBatchingChannel

func NewBatchingChannel(size BufferCap) *BatchingChannel

func (*BatchingChannel) Cap

func (ch *BatchingChannel) Cap() BufferCap

func (*BatchingChannel) Close

func (ch *BatchingChannel) Close()

func (*BatchingChannel) In

func (ch *BatchingChannel) In() chan<- interface{}

func (*BatchingChannel) Len

func (ch *BatchingChannel) Len() int

func (*BatchingChannel) Out

func (ch *BatchingChannel) Out() <-chan interface{}

Out returns a <-chan interface{} so that BatchingChannel conforms to the standard Channel interface provided by this package. However, each output value is guaranteed to be of type []interface{}: a slice collecting the most recent batch of values sent on the In channel. The slice is guaranteed to be neither empty nor nil. In practice, this means you need an additional type assertion to access the underlying values.
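The extra type assertion mentioned above can be illustrated on the consumer side. In this sketch a plain buffered chan interface{} stands in for the channel returned by Out(), so that the example runs with the standard library only; sumBatches is a hypothetical helper.

```go
package main

import "fmt"

// sumBatches reads batches from out. Each received value is asserted back
// to []interface{}, and each element of the batch back to int.
func sumBatches(out <-chan interface{}) (batches, sum int) {
	for v := range out {
		batch, ok := v.([]interface{})
		if !ok {
			continue // not a batch; cannot happen with a real batching source
		}
		batches++
		for _, elem := range batch {
			if n, ok := elem.(int); ok {
				sum += n
			}
		}
	}
	return batches, sum
}

func main() {
	// Stand-in for BatchingChannel.Out(): two batches of ints.
	out := make(chan interface{}, 2)
	out <- []interface{}{1, 2, 3}
	out <- []interface{}{4}
	close(out)

	fmt.Println(sumBatches(out)) // prints 2 10
}
```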

type BlackHole

type BlackHole struct {
    // contains filtered or unexported fields
}

BlackHole implements the InChannel interface and provides an analogue for the "Discard" variable in the ioutil package - it never blocks, and simply discards every value it reads. The number of items discarded in this way is counted and returned from Len.
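A discard-and-count channel of this kind can be approximated with a goroutine that drains a native channel. The sketch below is an illustration, not the package's implementation; the blackHole type is hypothetical, and having Close wait for the drain goroutine is a choice made here so that Len is exact afterwards.

```go
package main

import (
	"fmt"
	"sync/atomic"
)

// blackHole discards every value written to In() and counts them.
type blackHole struct {
	count int64 // kept first so atomic access is 64-bit aligned
	in    chan interface{}
	done  chan struct{}
}

func newBlackHole() *blackHole {
	bh := &blackHole{in: make(chan interface{}), done: make(chan struct{})}
	go func() {
		defer close(bh.done)
		for range bh.in {
			atomic.AddInt64(&bh.count, 1)
		}
	}()
	return bh
}

// In returns the writable end; writes are accepted as soon as the drain
// goroutine is scheduled.
func (bh *blackHole) In() chan<- interface{} { return bh.in }

// Len reports how many values have been discarded so far.
func (bh *blackHole) Len() int { return int(atomic.LoadInt64(&bh.count)) }

// Close closes the input and waits for the drain goroutine to finish.
func (bh *blackHole) Close() {
	close(bh.in)
	<-bh.done
}

func main() {
	bh := newBlackHole()
	for i := 0; i < 5; i++ {
		bh.In() <- i
	}
	bh.Close()
	fmt.Println(bh.Len()) // prints 5
}
```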

func NewBlackHole

func NewBlackHole() *BlackHole

func (*BlackHole) Cap

func (ch *BlackHole) Cap() BufferCap

func (*BlackHole) Close

func (ch *BlackHole) Close()

func (*BlackHole) In

func (ch *BlackHole) In() chan<- interface{}

func (*BlackHole) Len

func (ch *BlackHole) Len() int

type Buffer

type Buffer interface {
    Len() int       // The number of elements currently buffered.
    Cap() BufferCap // The maximum number of elements that can be buffered.
}

Buffer is an interface for any channel that provides access to query the state of its buffer. Even unbuffered channels can implement this interface by simply returning 0 from Len() and None from Cap().
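As an illustration of how a native channel can satisfy such an interface, the sketch below defines local lower-case copies of the types (bufferCap, none, buffer, nativeChan are hypothetical names used only to keep the example self-contained) and implements Len and Cap with the built-in len and cap functions.

```go
package main

import "fmt"

// Local stand-ins for the package's BufferCap, None and Buffer, declared
// here only so that the example compiles on its own.
type bufferCap int

const none bufferCap = 0

type buffer interface {
	Len() int
	Cap() bufferCap
}

// nativeChan is a native channel of interface{} values.
type nativeChan chan interface{}

// Len reports the number of values currently buffered.
func (ch nativeChan) Len() int { return len(ch) }

// Cap reports the buffer capacity; an unbuffered channel reports none.
func (ch nativeChan) Cap() bufferCap { return bufferCap(cap(ch)) }

// describe formats the state of any buffer.
func describe(b buffer) string {
	if b.Cap() == none {
		return "unbuffered"
	}
	return fmt.Sprintf("%d/%d", b.Len(), b.Cap())
}

func main() {
	buffered := make(nativeChan, 4)
	buffered <- "x"
	fmt.Println(describe(buffered))            // prints 1/4
	fmt.Println(describe(make(nativeChan)))    // prints unbuffered
}
```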

type BufferCap

type BufferCap int

BufferCap represents the capacity of the buffer backing a channel. Valid values consist of all positive integers, as well as the special values below.

const (
    // None is the capacity for channels that have no buffer at all.
    None BufferCap = 0
    // Infinity is the capacity for channels with no limit on their buffer size.
    Infinity BufferCap = -1
)

type Channel

type Channel interface {
    SimpleChannel
    Buffer
}

Channel is an interface representing a channel that is readable, writeable and implements the Buffer interface.

Code:

var ch Channel

ch = NewInfiniteChannel()

for i := 0; i < 10; i++ {
    ch.In() <- nil
}

for i := 0; i < 10; i++ {
    <-ch.Out()
}

type DeadChannel

type DeadChannel struct{}

DeadChannel is a placeholder implementation of the Channel interface with no buffer that is never ready for reading or writing. Closing a dead channel is a no-op. It behaves almost like NativeChannel(nil), except that closing a nil NativeChannel will panic.
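The comparison with a nil channel rests on two properties of native Go channels: a nil channel is never ready for sending or receiving, and closing it panics. The sketch below demonstrates both with the standard library only; nilChannelReady and closeNilPanics are hypothetical helper names.

```go
package main

import "fmt"

// nilChannelReady reports whether a nil channel is ready for either a send
// or a receive. Both cases block forever, so the default branch is taken.
func nilChannelReady() bool {
	var ch chan interface{} // nil channel
	select {
	case ch <- nil:
		return true
	case <-ch:
		return true
	default:
		return false
	}
}

// closeNilPanics reports whether closing a nil channel panics.
func closeNilPanics() (panicked bool) {
	defer func() {
		if recover() != nil {
			panicked = true
		}
	}()
	var ch chan interface{}
	close(ch)
	return false
}

func main() {
	fmt.Println(nilChannelReady()) // prints false
	fmt.Println(closeNilPanics())  // prints true
}
```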

func NewDeadChannel

func NewDeadChannel() DeadChannel

func (DeadChannel) Cap

func (ch DeadChannel) Cap() BufferCap

func (DeadChannel) Close

func (ch DeadChannel) Close()

func (DeadChannel) In

func (ch DeadChannel) In() chan<- interface{}

func (DeadChannel) Len

func (ch DeadChannel) Len() int

func (DeadChannel) Out

func (ch DeadChannel) Out() <-chan interface{}

type InChannel

type InChannel interface {
    SimpleInChannel
    Buffer
}

InChannel is an interface representing a writeable channel with a buffer.

type InfiniteChannel

type InfiniteChannel struct {
    // contains filtered or unexported fields
}

InfiniteChannel implements the Channel interface with an infinite buffer between the input and the output.
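One common way to build an unbounded buffer between two native channels is a goroutine that owns a slice-backed queue. The sketch below shows that idiom; it is not taken from the package's source, and newInfinite and fillThenDrain are hypothetical names. As the package warning says, the queue grows without limit if the reader falls behind.

```go
package main

import "fmt"

// newInfinite returns the two ends of a channel with an unbounded buffer.
// Closing the input end drains the remaining queue and then closes the
// output end.
func newInfinite() (chan<- interface{}, <-chan interface{}) {
	in := make(chan interface{})
	out := make(chan interface{})
	go func(recv <-chan interface{}) {
		defer close(out)
		var queue []interface{}
		for recv != nil || len(queue) > 0 {
			// A nil send channel disables the send case while the
			// queue is empty.
			var send chan<- interface{}
			var next interface{}
			if len(queue) > 0 {
				send = out
				next = queue[0]
			}
			select {
			case v, ok := <-recv:
				if !ok {
					recv = nil // input closed: stop receiving
					continue
				}
				queue = append(queue, v)
			case send <- next:
				queue = queue[1:]
			}
		}
	}(in)
	return in, out
}

// fillThenDrain writes n values before reading any of them back.
func fillThenDrain(n int) (count, sum int) {
	in, out := newInfinite()
	for i := 0; i < n; i++ {
		in <- i // accepted even though nobody is reading yet
	}
	close(in)
	for v := range out {
		count++
		sum += v.(int)
	}
	return count, sum
}

func main() {
	fmt.Println(fillThenDrain(1000)) // prints 1000 499500
}
```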

func NewInfiniteChannel

func NewInfiniteChannel() *InfiniteChannel

func (*InfiniteChannel) Cap

func (ch *InfiniteChannel) Cap() BufferCap

func (*InfiniteChannel) Close

func (ch *InfiniteChannel) Close()

func (*InfiniteChannel) In

func (ch *InfiniteChannel) In() chan<- interface{}

func (*InfiniteChannel) Len

func (ch *InfiniteChannel) Len() int

func (*InfiniteChannel) Out

func (ch *InfiniteChannel) Out() <-chan interface{}

type NativeChannel

type NativeChannel chan interface{}

NativeChannel implements the Channel interface by wrapping a native Go channel.

func NewNativeChannel

func NewNativeChannel(size BufferCap) NativeChannel

NewNativeChannel makes a new NativeChannel with the given buffer size. It is just a convenience wrapper to avoid having to cast the result of make().

func (NativeChannel) Cap

func (ch NativeChannel) Cap() BufferCap

func (NativeChannel) Close

func (ch NativeChannel) Close()

func (NativeChannel) In

func (ch NativeChannel) In() chan<- interface{}

func (NativeChannel) Len

func (ch NativeChannel) Len() int

func (NativeChannel) Out

func (ch NativeChannel) Out() <-chan interface{}

type NativeInChannel

type NativeInChannel chan<- interface{}

NativeInChannel implements the InChannel interface by wrapping a native Go write-only channel.

func (NativeInChannel) Cap

func (ch NativeInChannel) Cap() BufferCap

func (NativeInChannel) Close

func (ch NativeInChannel) Close()

func (NativeInChannel) In

func (ch NativeInChannel) In() chan<- interface{}

func (NativeInChannel) Len

func (ch NativeInChannel) Len() int

type NativeOutChannel

type NativeOutChannel <-chan interface{}

NativeOutChannel implements the OutChannel interface by wrapping a native Go read-only channel.

func (NativeOutChannel) Cap

func (ch NativeOutChannel) Cap() BufferCap

func (NativeOutChannel) Len

func (ch NativeOutChannel) Len() int

func (NativeOutChannel) Out

func (ch NativeOutChannel) Out() <-chan interface{}

type OutChannel

type OutChannel interface {
    SimpleOutChannel
    Buffer
}

OutChannel is an interface representing a readable channel implementing the Buffer interface.

type OverflowingChannel

type OverflowingChannel struct {
    // contains filtered or unexported fields
}

OverflowingChannel implements the Channel interface in a way that never blocks the writer. Specifically, if a value is written to an OverflowingChannel when its buffer is full (or, in an unbuffered case, when the recipient is not ready) then that value is simply discarded. Note that Go's scheduler can cause discarded values when they could be avoided, simply by scheduling the writer before the reader, so caveat emptor. For the opposite behaviour (discarding the oldest element, not the newest) see RingChannel.

func NewOverflowingChannel

func NewOverflowingChannel(size BufferCap) *OverflowingChannel

func (*OverflowingChannel) Cap

func (ch *OverflowingChannel) Cap() BufferCap

func (*OverflowingChannel) Close

func (ch *OverflowingChannel) Close()

func (*OverflowingChannel) In

func (ch *OverflowingChannel) In() chan<- interface{}

func (*OverflowingChannel) Len

func (ch *OverflowingChannel) Len() int

func (*OverflowingChannel) Out

func (ch *OverflowingChannel) Out() <-chan interface{}

type ResizableChannel

type ResizableChannel struct {
    // contains filtered or unexported fields
}

ResizableChannel implements the Channel interface with a resizable buffer between the input and the output. The channel initially has a buffer size of 1, but can be resized by calling Resize().

Resizing to a buffer capacity of None is, unfortunately, not supported and will panic (see https://github.com/eapache/channels/issues/1). Resizing back and forth between a finite and infinite buffer is fully supported.

func NewResizableChannel

func NewResizableChannel() *ResizableChannel

func (*ResizableChannel) Cap

func (ch *ResizableChannel) Cap() BufferCap

func (*ResizableChannel) Close

func (ch *ResizableChannel) Close()

func (*ResizableChannel) In

func (ch *ResizableChannel) In() chan<- interface{}

func (*ResizableChannel) Len

func (ch *ResizableChannel) Len() int

func (*ResizableChannel) Out

func (ch *ResizableChannel) Out() <-chan interface{}

func (*ResizableChannel) Resize

func (ch *ResizableChannel) Resize(newSize BufferCap)

type RingChannel

type RingChannel struct {
    // contains filtered or unexported fields
}

RingChannel implements the Channel interface in a way that never blocks the writer. Specifically, if a value is written to a RingChannel when its buffer is full then the oldest value in the buffer is discarded to make room (just like a standard ring-buffer). Note that Go's scheduler can cause discarded values when they could be avoided, simply by scheduling the writer before the reader, so caveat emptor. For the opposite behaviour (discarding the newest element, not the oldest) see OverflowingChannel.

func NewRingChannel

func NewRingChannel(size BufferCap) *RingChannel

func (*RingChannel) Cap

func (ch *RingChannel) Cap() BufferCap

func (*RingChannel) Close

func (ch *RingChannel) Close()

func (*RingChannel) In

func (ch *RingChannel) In() chan<- interface{}

func (*RingChannel) Len

func (ch *RingChannel) Len() int

func (*RingChannel) Out

func (ch *RingChannel) Out() <-chan interface{}

type SharedBuffer

type SharedBuffer struct {
    // contains filtered or unexported fields
}

SharedBuffer implements the Buffer interface, and permits multiple SimpleChannel instances to "share" a single buffer. Each channel spawned by NewChannel has its own internal queue (so values flowing through do not get mixed up with other channels) but the total number of elements buffered by all spawned channels is limited to a single capacity. This means *all* such channels block and unblock for writing together. The primary use case is for implementing pipeline-style parallelism with goroutines, limiting the total number of elements in the pipeline without limiting the number of elements at any particular step.

Code:

// never more than 3 elements in the pipeline at once
buf := NewSharedBuffer(3)

ch1 := buf.NewChannel()
ch2 := buf.NewChannel()

// or, instead of a straight pipe, implement your pipeline step
Pipe(ch1, ch2)

// inputs
go func() {
    for i := 0; i < 20; i++ {
        ch1.In() <- i
    }
    ch1.Close()
}()

for range ch2.Out() {
    // outputs
}

buf.Close()

func NewSharedBuffer

func NewSharedBuffer(size BufferCap) *SharedBuffer

func (*SharedBuffer) Cap

func (buf *SharedBuffer) Cap() BufferCap

func (*SharedBuffer) Close

func (buf *SharedBuffer) Close()

Close shuts down the SharedBuffer. It is an error to call Close while channels are still using the buffer (the behaviour in that case is undefined).

func (*SharedBuffer) Len

func (buf *SharedBuffer) Len() int

func (*SharedBuffer) NewChannel

func (buf *SharedBuffer) NewChannel() SimpleChannel

NewChannel spawns and returns a new channel sharing the underlying buffer.

type SimpleChannel

type SimpleChannel interface {
    SimpleInChannel
    SimpleOutChannel
}

SimpleChannel is an interface representing a channel that is both readable and writeable, but does not necessarily implement the Buffer interface.

type SimpleInChannel

type SimpleInChannel interface {
    In() chan<- interface{} // The writeable end of the channel.
    Close()                 // Closes the channel. It is an error to write to In() after calling Close().
}

SimpleInChannel is an interface representing a writeable channel that does not necessarily implement the Buffer interface.

type SimpleOutChannel

type SimpleOutChannel interface {
    Out() <-chan interface{} // The readable end of the channel.
}

SimpleOutChannel is an interface representing a readable channel that does not necessarily implement the Buffer interface.

func Wrap

func Wrap(ch interface{}) SimpleOutChannel

Wrap takes any readable channel type (chan or <-chan but not chan<-) and exposes it as a SimpleOutChannel for easy integration with existing channel sources. It panics if the input is not a readable channel.
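The conversion from a typed channel to a channel of interface{} values can be sketched with the reflect package. The following is an illustration under stated assumptions, not the package's code: wrap and collectWrapped are hypothetical names, and the sketch returns a plain <-chan interface{} rather than a SimpleOutChannel so that it stays self-contained.

```go
package main

import (
	"fmt"
	"reflect"
)

// wrap exposes any readable channel as a <-chan interface{}. It panics if
// ch is not a readable channel. The returned channel is closed when ch is.
func wrap(ch interface{}) <-chan interface{} {
	cv := reflect.ValueOf(ch)
	if cv.Kind() != reflect.Chan || cv.Type().ChanDir()&reflect.RecvDir == 0 {
		panic("wrap: argument is not a readable channel")
	}
	out := make(chan interface{})
	go func() {
		defer close(out)
		for {
			v, ok := cv.Recv()
			if !ok {
				return // source channel closed
			}
			out <- v.Interface()
		}
	}()
	return out
}

// collectWrapped wraps a chan string and returns everything read from the
// interface{} side.
func collectWrapped() []interface{} {
	src := make(chan string, 2)
	src <- "a"
	src <- "b"
	close(src)

	var got []interface{}
	for v := range wrap(src) {
		got = append(got, v)
	}
	return got
}

func main() {
	fmt.Println(collectWrapped()) // prints [a b]
}
```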

Package channels imports 2 packages and is imported by 5 packages. Updated 2018-02-26.