package prosumer

import ""


Package Files

prosumer.go queue.go


var (
    ErrDiscard       = errors.New("discard point")
    ErrDiscardOldest = errors.New("discard oldest point")
)

type Config

type Config struct {
    // BufferSize sets the size of the inner buffer queue.
    BufferSize int
    // RejectPolicy controls which elements get discarded when the queue is full.
    RejectPolicy RejectPolicy
    // BatchSize sets how many elements the Consumer can get from the queue at a time.
    BatchSize int
    // BatchInterval sets how long the Consumer may spend collecting a batch from the queue.
    BatchInterval time.Duration
    // ErrCallback is invoked when the Consumer returns an error, with the affected elements provided.
    ErrCallback func(ls []interface{}, err error)

    // NumConsumer sets the number of consumer workers.
    NumConsumer int
}

func DefaultConfig

func DefaultConfig(con Consumer) Config

DefaultConfig returns a minimal config for convenience; customize specific parameters to suit your situation. Warning: the default RejectPolicy is Block.

type Consumer

type Consumer func(ls []interface{}) error

Consumer processes elements taken from the queue.

type Coordinator

type Coordinator struct {
    // contains filtered or unexported fields
}

Coordinator implements a producer-consumer workflow. Put() adds new elements to the inner buffer queue, from which they are processed by the Consumer.

func NewCoordinator

func NewCoordinator(config Config) Coordinator

func (Coordinator) Close

func (c Coordinator) Close(graceful bool) error

Close closes the Coordinator; no more elements can be put afterwards. When graceful is true, Close 1. blocks, and 2. makes sure all remaining elements in the buffer queue are consumed.

func (Coordinator) Put

func (c Coordinator) Put(e interface{}) ([]interface{}, error)

Put adds a new element to the inner buffer queue. It returns an error when the inner buffer queue is full; the first return value holds the elements that failed to be put into the queue. Depending on the RejectPolicy, multiple elements may be discarded before the current element is put successfully. Common usage pattern:

 discarded, err := c.Put(e)
 if err != nil {
	  fmt.Printf("discarded elements %+v for err %v\n", discarded, err)
 }

func (Coordinator) RemainingCapacity

func (c Coordinator) RemainingCapacity() int

RemainingCapacity returns how many more elements the inner buffer queue can hold.
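
As a rough illustration of the quantity being reported, the sketch below computes the same figure for a buffered channel. The channel-based queue and the remainingCapacity helper are assumptions for this example; the page does not say how prosumer's own queue is implemented.

```go
package main

import "fmt"

// remainingCapacity is a hypothetical helper: free slots in a buffered channel.
func remainingCapacity(queue chan interface{}) int {
	return cap(queue) - len(queue)
}

func main() {
	queue := make(chan interface{}, 4)
	queue <- "a"
	fmt.Println(remainingCapacity(queue)) // prints 3
}
```

With concurrent producers and consumers such a value is only a snapshot and may be out of date by the time it is used.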

func (Coordinator) Start

func (c Coordinator) Start()

Start starts the workers that consume elements from the queue.

type RejectPolicy

type RejectPolicy int

RejectPolicy controls which elements get discarded when the queue is full.

const (
    // Block current goroutine, no element discarded
    Block RejectPolicy = iota
    // Discard current element
    // Discard the oldest to make room for new element
)
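
The three behaviours described in the comments above (block, discard the current element, discard the oldest element) can be sketched on a bounded channel. The names policy, policyBlock, policyDiscard, policyDiscardOldest and put are hypothetical stand-ins chosen for this example, and the sketch assumes a single producer goroutine and a queue capacity of at least 1; it is not prosumer's implementation.

```go
package main

import (
	"errors"
	"fmt"
)

// Hypothetical stand-ins for this sketch only; not prosumer's identifiers.
type policy int

const (
	policyBlock policy = iota
	policyDiscard
	policyDiscardOldest
)

var (
	errDiscard       = errors.New("discard point")
	errDiscardOldest = errors.New("discard oldest point")
)

// put tries to enqueue e. When the queue is full it applies the policy and
// returns the elements that were dropped together with a matching error.
func put(queue chan interface{}, p policy, e interface{}) ([]interface{}, error) {
	switch p {
	case policyBlock:
		queue <- e // waits for room, nothing is discarded
		return nil, nil
	case policyDiscard:
		select {
		case queue <- e:
			return nil, nil
		default:
			return []interface{}{e}, errDiscard // drop the new element
		}
	}
	// policyDiscardOldest: evict from the head until e fits.
	var discarded []interface{}
	for {
		select {
		case queue <- e:
			if len(discarded) == 0 {
				return nil, nil
			}
			return discarded, errDiscardOldest
		default:
		}
		select {
		case old := <-queue:
			discarded = append(discarded, old)
		default:
		}
	}
}

func main() {
	queue := make(chan interface{}, 2)
	put(queue, policyDiscardOldest, 1)
	put(queue, policyDiscardOldest, 2)
	discarded, err := put(queue, policyDiscardOldest, 3)
	fmt.Println(discarded, err) // prints "[1] discard oldest point"
}
```

Returning the dropped elements alongside the error lets the caller log or retry them, which mirrors the shape of Put's return values.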


Package prosumer imports 4 packages and is imported by 1 package. Updated 2019-07-26.