pool

package
v1.3.1
Published: Oct 10, 2020 License: MIT Imports: 10 Imported by: 0

Documentation

Overview

Package pool implements a simplified, single-stage flow.

By default it runs with unbuffered channels and a randomly distributed pool, i.e. each incoming record is sent to one of the workers at random. The user creates a Workers pool with New, activates it by calling Go, and submits input data via Submit. The Go method returns a Cursor allowing retrieval of results one by one (with cursor.Next) or all at once (with cursor.All). Both cursor operations can block, as they read from an internal channel.

After all inputs are submitted, the user should call Close to indicate completion.

The user may define a ChunkFn returning the key portion of a record; in this case records are sent to workers based on this key. Identical keys are guaranteed to be sent to the same worker. This mode is needed for stateful flows where each set of input records has to be processed sequentially and some state should be kept. Each worker gets an independent WorkerStore to keep worker-local data.
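A minimal sketch of how such key-to-worker routing could work, hashing the chunk key to a worker index (this is an illustration of the idea, not necessarily the package's actual dispatch logic):

```go
package main

import (
	"fmt"
	"hash/fnv"
)

// workerIndex maps a chunk key to a worker index in [0, poolSize).
// Because the mapping is a pure function of the key, identical keys
// always land on the same worker, which is what stateful flows rely on.
func workerIndex(key string, poolSize int) int {
	h := fnv.New32a()
	_, _ = h.Write([]byte(key))
	return int(h.Sum32()) % poolSize
}

func main() {
	// the same key always resolves to the same worker
	fmt.Println(workerIndex("user-42", 8) == workerIndex("user-42", 8)) // true
}
```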

The actual worker function WorkerFn is provided by the user and executed by the pool's goroutines. The worker gets an input record dispatched by the pool and can publish results via SenderFn.

The Batch option sets the size of an internal buffer to minimize channel sends. Batch collects incoming records per worker and sends them in as a slice.
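A rough sketch of what such per-worker batching amounts to: collect records into a slice and flush it to the channel once full. The batcher type here is illustrative only, not part of the package:

```go
package main

import "fmt"

// batcher collects records and flushes them as a slice once the
// buffer reaches the configured size, cutting down on channel sends.
type batcher struct {
	size int
	buf  []interface{}
	out  chan []interface{}
}

func (b *batcher) submit(v interface{}) {
	b.buf = append(b.buf, v)
	if len(b.buf) >= b.size {
		b.flush()
	}
}

// flush sends whatever is buffered, including a final partial batch.
func (b *batcher) flush() {
	if len(b.buf) == 0 {
		return
	}
	batch := b.buf
	b.buf = nil
	b.out <- batch
}

func main() {
	b := &batcher{size: 2, out: make(chan []interface{}, 4)}
	for _, v := range []string{"rec1", "rec2", "rec3"} {
		b.submit(v)
	}
	b.flush() // final partial batch, e.g. on Close
	close(b.out)
	for batch := range b.out {
		fmt.Println(len(batch)) // 2, then 1
	}
}
```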

By default, error handling terminates the pool on the first error, unless ContinueOnError is requested.

Metrics can be retrieved and updated by the user to keep counters associated with arbitrary names.
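Conceptually, such named counters are a thread-safe map from name to count. The counters type below is a sketch in that spirit, not the actual flow.Metrics implementation:

```go
package main

import (
	"fmt"
	"sync"
)

// counters keeps named, thread-safe counts, similar in spirit to the
// metrics the pool exposes (illustrative, not the package's code).
type counters struct {
	mu sync.Mutex
	m  map[string]int
}

// Inc increments the named counter, creating it on first use.
func (c *counters) Inc(name string) {
	c.mu.Lock()
	defer c.mu.Unlock()
	if c.m == nil {
		c.m = map[string]int{}
	}
	c.m[name]++
}

// Get returns the current value of the named counter (0 if unset).
func (c *counters) Get(name string) int {
	c.mu.Lock()
	defer c.mu.Unlock()
	return c.m[name]
}

func main() {
	c := &counters{}
	var wg sync.WaitGroup
	for i := 0; i < 100; i++ { // concurrent updates from many goroutines
		wg.Add(1)
		go func() { defer wg.Done(); c.Inc("count") }()
	}
	wg.Wait()
	fmt.Println(c.Get("count")) // 100
}
```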

A Workers pool should not be reused and can be activated only once. It is thread-safe; no additional locking is needed.

Index

Examples

Constants

This section is empty.

Variables

This section is empty.

Functions

func ContinueOnError

func ContinueOnError(p *Workers)

ContinueOnError changes the default behavior of early termination on the first error and makes the pool continue after errors

func Metrics

func Metrics(ctx context.Context) *flow.Metrics

Metrics returns the set of metrics from the context

func WorkerID

func WorkerID(ctx context.Context) int

WorkerID returns the worker ID from the context

Types

type CompleteFn

type CompleteFn func(ctx context.Context, store WorkerStore) error

CompleteFn is called on completion for each worker, with access to that worker's store

type Cursor added in v1.3.0

type Cursor struct {
	// contains filtered or unexported fields
}

Cursor provides synchronous access to async data from pool's response channel

func (*Cursor) All added in v1.3.0

func (c *Cursor) All(ctx context.Context) (res []interface{}, err error)

All gets all data from the cursor

func (*Cursor) Err added in v1.3.0

func (c *Cursor) Err() error

Err returns error collected by Next

func (*Cursor) Next added in v1.3.0

func (c *Cursor) Next(ctx context.Context, v interface{}) bool

Next returns the next result from the cursor; ok = false on completion. Any error is saved internally and can be retrieved with the Err call

type Option

type Option func(p *Workers)

Option func type

func Batch

func Batch(size int) Option

Batch sets the batch size used to collect incoming records in a buffer before sending them to workers

func ChunkFn

func ChunkFn(chunkFn func(val interface{}) string) Option

ChunkFn is a functional option defining the chunk func that distributes records to particular workers. The function should return a key string identifying the record. Records with a given key are guaranteed to be processed by the same worker.

func OnCompletion

func OnCompletion(completeFn CompleteFn) Option

OnCompletion sets a function called on completion for each worker ID

func ResChanSize

func ResChanSize(size int) Option

ResChanSize sets the size of the response channel buffer

func WorkerChanSize

func WorkerChanSize(size int) Option

WorkerChanSize sets the size of the worker channel(s)

type SenderFn added in v1.3.0

type SenderFn func(val interface{}) error

SenderFn is a func called by worker code to publish results

type WorkerFn added in v1.3.0

type WorkerFn func(ctx context.Context, inpRec interface{}, sender SenderFn, store WorkerStore) error

WorkerFn processes input record inpRec and optionally sends results to sender func

type WorkerStore

type WorkerStore interface {
	Set(key string, val interface{})
	Get(key string) (interface{}, bool)
	GetInt(key string) int
	GetFloat(key string) float64
	GetString(key string) string
	GetBool(key string) bool
	Keys() []string
	Delete(key string)
}

WorkerStore defines the interface for per-worker storage
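For illustration, a map-based store such as the one NewLocalStore returns could look roughly like this. It is a sketch, not the package's actual code; note that no locking is needed when each worker owns its store exclusively:

```go
package main

import "fmt"

// localStore is an illustrative map-based implementation of the
// WorkerStore interface. Typed getters fall back to zero values
// when the key is missing or holds a different type.
type localStore struct{ data map[string]interface{} }

func (s *localStore) Set(key string, val interface{}) { s.data[key] = val }

func (s *localStore) Get(key string) (interface{}, bool) {
	v, ok := s.data[key]
	return v, ok
}

func (s *localStore) GetInt(key string) int {
	if v, ok := s.data[key].(int); ok {
		return v
	}
	return 0
}

func (s *localStore) GetFloat(key string) float64 {
	if v, ok := s.data[key].(float64); ok {
		return v
	}
	return 0
}

func (s *localStore) GetString(key string) string {
	if v, ok := s.data[key].(string); ok {
		return v
	}
	return ""
}

func (s *localStore) GetBool(key string) bool {
	if v, ok := s.data[key].(bool); ok {
		return v
	}
	return false
}

func (s *localStore) Keys() []string {
	keys := make([]string, 0, len(s.data))
	for k := range s.data {
		keys = append(keys, k)
	}
	return keys
}

func (s *localStore) Delete(key string) { delete(s.data, key) }

func main() {
	store := &localStore{data: map[string]interface{}{}}
	store.Set("seen", store.GetInt("seen")+1) // typical per-worker counter
	fmt.Println(store.GetInt("seen"))         // 1
}
```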

func NewLocalStore

func NewLocalStore() WorkerStore

NewLocalStore makes map-based worker store

type Workers

type Workers struct {
	// contains filtered or unexported fields
}

Workers is a simple case of flow with a single stage only.

Example (Basic)

illustrates basic use of workers pool

workerFn := func(ctx context.Context, inpRec interface{}, sender SenderFn, store WorkerStore) error {
	v, ok := inpRec.(string)
	if !ok {
		return errors.New("incorrect input type")
	}
	// do something with v
	res := strings.ToUpper(v)

	// send response
	return sender(res)
}

p := New(8, workerFn) // create workers pool
cursor, err := p.Go(context.Background())
if err != nil {
	panic(err)
}

// send some records in
go func() {
	p.Submit("rec1")
	p.Submit("rec2")
	p.Submit("rec3")
	p.Close() // all records sent
}()

// consume results
recs, err := cursor.All(context.TODO())
log.Printf("%+v, %v", recs, err)
Output:

Example (WithOptions)

illustrates use of workers pool with all options

workerFn := func(ctx context.Context, inpRec interface{}, sender SenderFn, store WorkerStore) error {
	v, ok := inpRec.(string)
	if !ok {
		return errors.New("incorrect input type")
	}
	// do something with v
	res := strings.ToUpper(v)

	// update metrics
	m := Metrics(ctx)
	m.Inc("count")

	// send response
	return sender(res)
}

// create workers pool with chunks and batch mode. ChunkFn is used to pick the worker and guarantees that records
// with the same chunk key go to the same worker. This is important for stateful workers. Batch sets the size of the
// internal buffer collecting records before sending them to a worker.
p := New(8, workerFn, Batch(10), ResChanSize(5), WorkerChanSize(2), ChunkFn(func(val interface{}) string {
	v := val.(string)
	return v[:4] // chunks by 4-char prefix
}))

cursor, err := p.Go(context.Background())
if err != nil {
	panic(err)
}
// send some records in
go func() {
	p.Submit("rec1")
	p.Submit("rec2")
	p.Submit("rec3")
	p.Close() // all records sent
}()

// consume results in streaming mode
var v string
for cursor.Next(context.TODO(), &v) {
	log.Printf("%s", v)
}

// show metrics
log.Printf("metrics: %s", p.Metrics())
Output:

func New

func New(poolSize int, workerFn WorkerFn, options ...Option) *Workers

New creates a worker pool; it can be activated only once

func (*Workers) Close

func (p *Workers) Close()

Close closes the pool. It has to be called by the consumer to indicate that all records were submitted. After this call the pool can't be reused.

func (*Workers) Go

func (p *Workers) Go(ctx context.Context) (Cursor, error)

Go activates the worker pool and closes the result channel on completion

func (*Workers) Metrics

func (p *Workers) Metrics() *flow.Metrics

Metrics returns all user-defined counters from context.

func (*Workers) Submit

func (p *Workers) Submit(v interface{})

Submit sends a record to the pool; the call can block

func (*Workers) Wait

func (p *Workers) Wait(ctx context.Context) (err error)

Wait waits until the workers have completed and the result channel is closed. It can be used instead of the cursor when the result channel can be ignored and the goal is only to wait for completion.
