package prosumer

import ""


Package Files

prosumer.go queue.go types.go


var (
    ErrDiscard       = errors.New("discard current element")
    ErrDiscardOldest = errors.New("discard oldest element")
)

type Callback

type Callback func([]Element, error)

type Config

type Config struct {
    // contains filtered or unexported fields
}

Config defines the parameters for a Coordinator.

func NewConfig

func NewConfig(c Consumer, opts ...Option) Config

NewConfig returns a Config with well-defined defaults. Warning: the default rejectPolicy is Block.

type Consumer

type Consumer func(lst []Element) error

type Coordinator

type Coordinator struct {
    // contains filtered or unexported fields
}

Coordinator implements a producer-consumer workflow. Put() adds new elements to the inner buffer queue, where they are processed by the Consumer.
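The documentation does not say how workers group queued elements before calling the Consumer. The following is a minimal sketch of one plausible approach, assuming a batch is handed over when it reaches a size limit or when an interval elapses. The names batchLoop and collectBatches are hypothetical and are not part of this package.

```go
package main

import (
	"fmt"
	"time"
)

// Element mirrors the package's Element type.
type Element interface{}

// batchLoop is a hypothetical worker loop, not the package's implementation.
// It collects elements from in and hands them to consume when batchSize is
// reached or when interval elapses. It returns once in is closed and the
// final partial batch has been flushed.
func batchLoop(in <-chan Element, batchSize int, interval time.Duration, consume func([]Element) error) {
	ticker := time.NewTicker(interval)
	defer ticker.Stop()

	batch := make([]Element, 0, batchSize)
	flush := func() {
		if len(batch) == 0 {
			return
		}
		_ = consume(batch) // a real worker would report this error
		batch = make([]Element, 0, batchSize)
	}

	for {
		select {
		case e, ok := <-in:
			if !ok {
				flush()
				return
			}
			batch = append(batch, e)
			if len(batch) >= batchSize {
				flush()
			}
		case <-ticker.C:
			flush()
		}
	}
}

// collectBatches feeds n integers through batchLoop and records batch sizes.
func collectBatches(n, batchSize int) []int {
	in := make(chan Element, n)
	for i := 0; i < n; i++ {
		in <- i
	}
	close(in)

	var sizes []int
	batchLoop(in, batchSize, time.Hour, func(lst []Element) error {
		sizes = append(sizes, len(lst))
		return nil
	})
	return sizes
}

func main() {
	fmt.Println(collectBatches(7, 3)) // prints [3 3 1]
}
```

The interval is set to an hour in the demo so that only the size limit and the final flush trigger the consumer, which keeps the output deterministic.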

func NewCoordinator

func NewCoordinator(config Config) Coordinator

func (Coordinator) Close

func (c Coordinator) Close(graceful bool) error

Close closes the Coordinator; no more elements can be put afterwards. If graceful is true, Close (1) blocks and (2) ensures that all elements remaining in the buffer queue are consumed.
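To illustrate the difference between a graceful and a non-graceful close, here is a minimal sketch using plain channels. It assumes a single worker and a buffered channel as the queue; the pool type and its methods are hypothetical stand-ins and do not reflect how Coordinator is actually implemented.

```go
package main

import "fmt"

// pool is a hypothetical stand-in for a Coordinator with one worker.
type pool struct {
	queue    chan int
	quit     chan struct{}
	done     chan struct{}
	consumed []int
}

func newPool(size int) *pool {
	return &pool{
		queue: make(chan int, size),
		quit:  make(chan struct{}),
		done:  make(chan struct{}),
	}
}

// start launches the worker goroutine.
func (p *pool) start() {
	go func() {
		defer close(p.done)
		for {
			select {
			case <-p.quit:
				return // non-graceful: stop without draining
			case e, ok := <-p.queue:
				if !ok {
					return // queue closed and fully drained
				}
				p.consumed = append(p.consumed, e)
			}
		}
	}()
}

// close stops the pool. A graceful close shuts the queue and blocks until
// the worker has consumed everything left in it; otherwise it only signals
// the worker and returns immediately.
func (p *pool) close(graceful bool) {
	if graceful {
		close(p.queue)
		<-p.done
		return
	}
	close(p.quit)
}

// drainGracefully queues n elements, then closes gracefully and reports
// how many elements were consumed.
func drainGracefully(n int) int {
	p := newPool(n)
	for i := 0; i < n; i++ {
		p.queue <- i
	}
	p.start()
	p.close(true)
	return len(p.consumed)
}

func main() {
	fmt.Println(drainGracefully(5)) // prints 5
}
```

In the non-graceful branch the caller does not wait, so elements still sitting in the queue may never reach the consumer.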

func (Coordinator) Put

func (c Coordinator) Put(ctx context.Context, e Element) ([]Element, error)

Put adds a new element to the inner buffer queue. It returns an error when the inner buffer queue is full; the first return value holds the elements that failed to be put into the queue. Depending on the rejectPolicy, multiple elements may be discarded before the current element is put successfully. Common usage pattern:

 discarded, err := c.Put(ctx, e)
 if err != nil {
	  log.Printf("discarded elements %+v for err %v", discarded, err)
 }

func (Coordinator) RemainingCapacity

func (c Coordinator) RemainingCapacity() int

RemainingCapacity returns how many more elements the inner buffer queue can hold.

func (Coordinator) Start

func (c Coordinator) Start()

Start starts the workers that consume elements from the queue.

type Element

type Element interface{}

Element is an item held in the queue; a Consumer processes elements from the queue.

type Option

type Option func(*Config)

Option configures a Config; options are passed to NewConfig.

func SetBatchInterval

func SetBatchInterval(interval time.Duration) Option

func SetBatchSize

func SetBatchSize(batchSize int) Option

func SetBufferSize

func SetBufferSize(bufferSize int) Option

SetBufferSize defines the size of the inner buffer queue.

func SetCallback

func SetCallback(cb Callback) Option

SetCallback defines the callback invoked with the elements and the error returned from the consumer.

func SetNumConsumer

func SetNumConsumer(numConsumer int) Option

func SetRejectPolicy

func SetRejectPolicy(rp RejectPolicy) Option

SetRejectPolicy defines which elements get discarded when the queue is full.
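The Option type and the Set* functions follow the functional options pattern. The sketch below shows how that pattern generally works, using a local config type. The field names and default values here are illustrative assumptions, since the fields of Config are unexported and its defaults are not listed in this documentation.

```go
package main

import (
	"fmt"
	"time"
)

// config is a hypothetical stand-in for Config; the field names are assumed.
type config struct {
	bufferSize    int
	batchSize     int
	batchInterval time.Duration
}

// option mutates a config, mirroring the shape of Option (func(*Config)).
type option func(*config)

func setBufferSize(n int) option {
	return func(c *config) { c.bufferSize = n }
}

func setBatchSize(n int) option {
	return func(c *config) { c.batchSize = n }
}

func setBatchInterval(d time.Duration) option {
	return func(c *config) { c.batchInterval = d }
}

// newConfig starts from defaults and applies each option in order, so later
// options override earlier ones. The defaults are made up for this example.
func newConfig(opts ...option) config {
	c := config{bufferSize: 100, batchSize: 10, batchInterval: time.Second}
	for _, opt := range opts {
		opt(&c)
	}
	return c
}

func main() {
	c := newConfig(setBufferSize(1000), setBatchInterval(500*time.Millisecond))
	fmt.Println(c.bufferSize, c.batchSize, c.batchInterval) // prints 1000 10 500ms
}
```

Options that are not passed leave the corresponding default untouched, which is why batchSize stays at its default in the output.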

type RejectPolicy

type RejectPolicy int

RejectPolicy controls which elements get discarded when the queue is full.

const (
    // Block current goroutine, no elements discarded
    Block RejectPolicy = iota
    // Discard current element
    Discard
    // DiscardOldest remove the oldest to make room for new element
    DiscardOldest
)
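The three policies can be illustrated on a bounded queue modelled as a buffered channel. This is a sketch under assumptions, not the package's implementation: the put function, the lowercase policy names, and the way the context is honoured while blocking are all hypothetical, and the queue is assumed to have a capacity of at least one.

```go
package main

import (
	"context"
	"errors"
	"fmt"
)

type policy int

const (
	block policy = iota
	discard
	discardOldest
)

var (
	errDiscard       = errors.New("discard current element")
	errDiscardOldest = errors.New("discard oldest element")
)

// put enqueues e on q. When q is full, the policy decides what is dropped.
// The returned slice lists the elements that did not stay in the queue.
func put(ctx context.Context, q chan int, e int, p policy) ([]int, error) {
	select {
	case q <- e:
		return nil, nil // room available, nothing discarded
	default:
	}
	switch p {
	case discard:
		return []int{e}, errDiscard
	case discardOldest:
		var dropped []int
		for {
			// Drop the oldest element, then retry. With concurrent
			// producers this may repeat, dropping several elements.
			select {
			case old := <-q:
				dropped = append(dropped, old)
			default:
			}
			select {
			case q <- e:
				return dropped, errDiscardOldest
			default:
			}
		}
	default: // block until there is room or ctx is done
		select {
		case q <- e:
			return nil, nil
		case <-ctx.Done():
			return []int{e}, ctx.Err()
		}
	}
}

// fillAndPut fills a queue with 1..capacity, then puts capacity+1 under p.
func fillAndPut(capacity int, p policy) ([]int, error) {
	q := make(chan int, capacity)
	for i := 1; i <= capacity; i++ {
		q <- i
	}
	return put(context.Background(), q, capacity+1, p)
}

func main() {
	fmt.Println(fillAndPut(2, discardOldest)) // prints [1] discard oldest element
	fmt.Println(fillAndPut(2, discard))       // prints [3] discard current element
}
```

The retry loop in the discardOldest branch shows why more than one element can be reported as discarded for a single put when other producers are competing for the freed slot.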

Package prosumer imports 5 packages. Updated 2019-11-15.