Documentation ¶
Index ¶
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
This section is empty.
Types ¶
type AtMost ¶
AtMost can be used to execute a function at most once in the specified duration. It is safe to call from multiple goroutines.
AtMost tries to keep the critical region as small as possible. As a result, if the specified function takes too long to run compared to the duration, another copy of f may be started while the first one is running.
type HashFunc ¶
type HashFunc func(interface{}) uint32
HashFunc is the signature of a method to use as a hash function for WorkerPool.
type LimitedWaitGroup ¶
type LimitedWaitGroup struct {
// contains filtered or unexported fields
}
LimitedWaitGroup is similar to sync.WaitGroup, but allows for a limit for the number of concurrently running routines. Essentially it provides an interface that combines Semaphore and WaitGroup semantics.
Its usage is similar, but the Add method now blocks. The number of routines executing is controlled by the Add method.
Example:
wg := NewLimitedWaitGroup(100)
for i := 0; i < 1000; i++ {
	wg.Add(1)
	go func() {
		defer wg.Done()
		// your code
	}()
}
wg.Wait()
func NewLimitedWaitGroup ¶
func NewLimitedWaitGroup(limit int) *LimitedWaitGroup
func (*LimitedWaitGroup) Add ¶
func (wg *LimitedWaitGroup) Add(n int64)
Add acquires a resource. It blocks if no resources are available. Resources are released by calls to Done.
func (*LimitedWaitGroup) Done ¶
func (wg *LimitedWaitGroup) Done()
Done is similar to sync.WaitGroup.Done. It releases a resource back to the pool.
func (*LimitedWaitGroup) Wait ¶
func (wg *LimitedWaitGroup) Wait()
Wait is similar to sync.WaitGroup.Wait. It waits until all acquired resources have been released.
type WorkerFunc ¶
WorkerFunc is the signature of a method to use for WorkerPool.
type WorkerPool ¶
type WorkerPool struct {
	// Set these fields before calling Run.
	Ctx        context.Context
	Cancel     context.CancelFunc
	NumWorkers int
	BufferSize int

	// "Override" these function pointers.
	Hash   HashFunc
	Worker WorkerFunc
	// contains filtered or unexported fields
}
WorkerPool manages a pool of workers where incoming posted items are optionally sharded and sent to the appropriate worker or the next available worker. The sharding is useful when, for example, one needs to have a pool of workers, but have messages from the same id (used for hashing) go to the same worker each time.
See tests for sample uses.
func (*WorkerPool) Post ¶
func (p *WorkerPool) Post(datum interface{}) error
Post routes the given datum to the appropriate shard. If that shard's channel buffer is full, this method blocks. In an ordinary (unsharded) worker pool, the datum is posted on a common channel shared by all workers.
If the running context has been cancelled (or timed out), it returns the corresponding error and does not post to the channels.
func (*WorkerPool) Run ¶
func (p *WorkerPool) Run()
Run initializes and starts a new worker pool using the set worker function with the specified number of workers/shards and buffer size. The context and its cancel function are taken from the Ctx and Cancel fields.
If a worker finishes with a non-nil error, the context, which is shared with all the workers, is cancelled. Any workers checking for context cancellation can then choose to gracefully finish whatever they are doing. This mechanism lets a single worker request a graceful shutdown of the whole system, including other worker pools sharing the same context.
If the set hash function is nil, an ordinary worker pool is created; if it is not nil, a sharded worker pool with one shard for each worker is created.
Run does not return until all the workers have finished.