Documentation ¶
Overview ¶
Package pool implements a simplified, single-stage flow.
By default it runs with non-buffered channels and a randomly distributed pool, i.e. incoming records are sent to one of the workers at random. The user creates a pool.Workers, activates it by calling Go, and submits input data via Submit. The Go method returns a Cursor allowing retrieval of results one by one (with cursor.Next) or all at once with the cursor.All method. Both cursor operations can block, as they read from the internal channel.
After all inputs are submitted, the user should call Close to indicate completion.
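A minimal sketch of this lifecycle (see Example (Basic) below for the complete version; the upper-casing worker is illustrative):

workerFn := func(ctx context.Context, inpRec interface{}, sender SenderFn, store WorkerStore) error {
    return sender(strings.ToUpper(inpRec.(string))) // publish the processed record
}

p := New(8, workerFn) // pool of 8 workers
cursor, err := p.Go(context.Background())
if err != nil {
    panic(err)
}

go func() {
    p.Submit("rec1") // submit inputs
    p.Close()        // all records sent
}()

recs, err := cursor.All(context.TODO()) // blocks until all results are read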
The user may define ChunkFn returning the key portion of the record; in this case records are sent to workers based on this key. Identical keys are guaranteed to be sent to the same worker. This mode is needed for stateful flows where each set of input records has to be processed sequentially and some state has to be kept. Each worker gets an independent WorkerStore to hold worker-local data.
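A sketch of this keyed, stateful mode, assuming string records whose first 4 characters act as the key; the per-key counter kept in the store is illustrative:

statefulFn := func(ctx context.Context, inpRec interface{}, sender SenderFn, store WorkerStore) error {
    key := inpRec.(string)[:4]
    n := store.GetInt(key) + 1 // worker-local state, no extra locking needed
    store.Set(key, n)
    return sender(fmt.Sprintf("%s seen %d times", key, n))
}

p := New(8, statefulFn, ChunkFn(func(val interface{}) string {
    return val.(string)[:4] // records with the same prefix always hit the same worker
}))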
The actual worker function WorkerFn is provided by the user and executed by the pool's goroutines. The worker gets an input record dispatched by the pool and can publish results via SenderFn.
The Batch option sets the size of an internal buffer to minimize channel sends. Batch collects incoming records per worker and sends them in as a slice.
By default, error handling terminates the pool on the first error, unless ContinueOnError is requested.
Metrics can be retrieved and updated by the user to keep counters associated with arbitrary names.
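Both sides of this are shown in Example (WithOptions) below; in short:

// inside WorkerFn: bump a named counter on the metrics attached to the context
m := Metrics(ctx)
m.Inc("count")

// after processing: read the accumulated counters from the pool
log.Printf("metrics: %s", p.Metrics())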
The workers pool should not be reused and can be activated only once. It is thread-safe; no additional locking is needed.
Index ¶
Examples ¶
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
func ContinueOnError ¶
func ContinueOnError(p *Workers)
ContinueOnError changes the default early termination on the first error and makes the pool continue after an error
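ContinueOnError has the Option signature, so it can be passed to New like any other option; a sketch (workerFn as in the examples below):

p := New(8, workerFn, ContinueOnError) // keep processing remaining records after a worker error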
Types ¶
type CompleteFn ¶
type CompleteFn func(ctx context.Context, store WorkerStore) error
CompleteFn is called on completion for each worker and receives that worker's WorkerStore
type Cursor ¶ added in v1.3.0
type Cursor struct {
// contains filtered or unexported fields
}
Cursor provides synchronous access to async data from pool's response channel
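Both access styles, as used in the examples below:

// streaming, one record at a time
var v string
for cursor.Next(context.TODO(), &v) {
    log.Printf("%s", v)
}

// or collect everything at once
recs, err := cursor.All(context.TODO())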
type Option ¶
type Option func(p *Workers)
Option func type
func Batch ¶
Batch sets the batch size to collect incoming records in a buffer before sending them to workers
func ChunkFn ¶
ChunkFn functional option defines the chunk function distributing records to particular workers. The function should return a key string identifying the record. Records with a given key string are guaranteed to be processed by the same worker.
func OnCompletion ¶
func OnCompletion(completeFn CompleteFn) Option
OnCompletion sets the function called on completion for each worker id
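A sketch of a per-worker completion hook; the "count" key is a hypothetical counter maintained by the worker function:

completeFn := func(ctx context.Context, store WorkerStore) error {
    // called once per worker after all its records are processed
    log.Printf("worker done, processed %d records", store.GetInt("count"))
    return nil
}

p := New(8, workerFn, OnCompletion(completeFn))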
func ResChanSize ¶
ResChanSize sets the size of the response channel buffer
func WorkerChanSize ¶
WorkerChanSize sets size of worker channel(s)
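The buffering options combined, as in Example (WithOptions) below:

p := New(8, workerFn,
    Batch(10),         // collect up to 10 records per worker before sending
    ResChanSize(5),    // buffer the response channel
    WorkerChanSize(2), // buffer each worker's input channel
)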
type SenderFn ¶ added in v1.3.0
type SenderFn func(val interface{}) error
SenderFn func called by worker code to publish results
type WorkerFn ¶ added in v1.3.0
type WorkerFn func(ctx context.Context, inpRec interface{}, sender SenderFn, store WorkerStore) error
WorkerFn processes input record inpRec and optionally sends results to sender func
type WorkerStore ¶
type WorkerStore interface {
	Set(key string, val interface{})
	Get(key string) (interface{}, bool)
	GetInt(key string) int
	GetFloat(key string) float64
	GetString(key string) string
	GetBool(key string) bool
	Keys() []string
	Delete(key string)
}
WorkerStore defines interface for per-worker storage
type Workers ¶
type Workers struct {
// contains filtered or unexported fields
}
Workers is a simple case of flow with a single stage only.
Example (Basic) ¶
illustrates basic use of workers pool
workerFn := func(ctx context.Context, inpRec interface{}, sender SenderFn, store WorkerStore) error {
	v, ok := inpRec.(string)
	if !ok {
		return errors.New("incorrect input type")
	}

	// do something with v
	res := strings.ToUpper(v)

	// send response
	return sender(res)
}

p := New(8, workerFn) // create workers pool

cursor, err := p.Go(context.Background())
if err != nil {
	panic(err)
}

// send some records in
go func() {
	p.Submit("rec1")
	p.Submit("rec2")
	p.Submit("rec3")
	p.Close() // all records sent
}()

// consume results
recs, err := cursor.All(context.TODO())
log.Printf("%+v, %v", recs, err)
Output:
Example (WithOptions) ¶
illustrates use of workers pool with all options
workerFn := func(ctx context.Context, inpRec interface{}, sender SenderFn, store WorkerStore) error {
	v, ok := inpRec.(string)
	if !ok {
		return errors.New("incorrect input type")
	}

	// do something with v
	res := strings.ToUpper(v)

	// update metrics
	m := Metrics(ctx)
	m.Inc("count")

	// send response
	return sender(res)
}

// create workers pool with chunks and batch mode. ChunkFn used to detect worker and guaranteed to send same chunk
// to the same worker. This is important for stateful workers. Batch sets the size of internal buffer collecting records
// internally before sending them to worker.
p := New(8, workerFn, Batch(10), ResChanSize(5), WorkerChanSize(2),
	ChunkFn(func(val interface{}) string {
		v := val.(string)
		return v[:4] // chunks by 4chars prefix
	}))

cursor, err := p.Go(context.Background())
if err != nil {
	panic(err)
}

// send some records in
go func() {
	p.Submit("rec1")
	p.Submit("rec2")
	p.Submit("rec3")
	p.Close() // all records sent
}()

// consume results in streaming mode
var v string
for cursor.Next(context.TODO(), &v) {
	log.Printf("%s", v)
}

// show metrics
log.Printf("metrics: %s", p.Metrics())
Output:
func (*Workers) Close ¶
func (p *Workers) Close()
Close closes the pool. It has to be called by the consumer as the indication of "all records submitted". After this call the pool can't be reused.