routines

package module
v1.0.0
Published: Jul 18, 2021 License: MIT Imports: 7 Imported by: 0

README

Routines

Routines is a fixed-size pool of worker routines for processing user tasks. When a worker panics, the pool respawns a new worker to replace it. It supports a sync mode (Process), which waits for the response, and an async mode (AsyncProcess), which does not wait for the response. The most common use case is capping the maximum concurrency when processing jobs.
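
To make the idea concrete, here is a minimal standard-library sketch of a fixed-size worker pool that replaces a worker after a panic. It is not the implementation used by Routines; the names task and runPool are hypothetical and exist only for this illustration. It assumes numWorkers is greater than zero.

```go
package main

import (
	"fmt"
	"sync"
)

// task is a hypothetical unit of work; it is not a type from the routines package.
type task func() int

// runPool starts numWorkers goroutines that drain the jobs channel, so at most
// numWorkers tasks run at the same time. When a task panics, the panic is
// recovered and a replacement worker is spawned so the pool keeps its size.
// It returns the sum of all task results and the number of recovered panics.
func runPool(numWorkers int, tasks []task) (sum int, panics int) {
	jobs := make(chan task)
	var (
		mu sync.Mutex
		wg sync.WaitGroup
	)

	var worker func()
	worker = func() {
		defer wg.Done()
		defer func() {
			if r := recover(); r != nil {
				mu.Lock()
				panics++
				mu.Unlock()
				// Register the replacement before this worker calls wg.Done.
				wg.Add(1)
				go worker()
			}
		}()
		for t := range jobs {
			v := t()
			mu.Lock()
			sum += v
			mu.Unlock()
		}
	}

	wg.Add(numWorkers)
	for i := 0; i < numWorkers; i++ {
		go worker()
	}
	for _, t := range tasks {
		jobs <- t
	}
	close(jobs)
	wg.Wait()
	return sum, panics
}

func main() {
	tasks := []task{
		func() int { return 1 },
		func() int { panic("boom") },
		func() int { return 2 },
		func() int { return 3 },
	}
	sum, panics := runPool(2, tasks)
	fmt.Println(sum, panics)
}
```

The unbuffered jobs channel is what bounds concurrency: a sender blocks until one of the workers is free to receive.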

Example 1: Process Tasks

For the full example, see: examples/counter

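// incrProcessor increments the counter it receives as a task.
// The atomic package used here appears to be go.uber.org/atomic
// (NewInt32, Inc), not the standard library's sync/atomic.
type incrProcessor struct{}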

func (p *incrProcessor) Process(ctx context.Context, i interface{}) (interface{}, error) {
   v := i.(*atomic.Int32)
   v.Inc()
   return v, nil
}

func main() {
   // The pool spawns 3 worker routines to process jobs and respawns
   // a replacement whenever a worker panics
   pool, err := routines.NewPool(3, &incrProcessor{})
   if err != nil {
      panic(err)
   }
   // Start the routine pool
   _ = pool.Start()

   // Submit the counter from 10 goroutines, 10 times each; every Process
   // call waits for the response from a worker routine
   var wg sync.WaitGroup
   v := atomic.NewInt32(0)
   wg.Add(10)
   for i := 0; i < 10; i++ {
      go func() {
         defer wg.Done()
         for j := 0; j < 10; j++ {
            _, _ = pool.Process(context.Background(), v)
         }
      }()
   }
   wg.Wait()

   fmt.Println("Total: ", v.Load())
   // Stop the routine pool and wait for all routines to exit
   _ = pool.Stop()
   pool.Wait()
}

Example 2: Async Process Tasks

For the full example, see: examples/counter

type incrProcessor struct{}

func (p *incrProcessor) Process(ctx context.Context, i interface{}) (interface{}, error) {
   v := i.(*atomic.Int32)
   v.Inc()
   return v.Load(), nil
}

func main() {
   // The pool spawns 3 worker routines to process jobs and respawns
   // a replacement whenever a worker panics
   pool, err := routines.NewPool(3, &incrProcessor{})
   if err != nil {
      panic(err)
   }
   // Start the routine pool
   _ = pool.Start()

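   // Submit the counter 10 times without waiting for the responses;
   // each call returns a future that yields the response later
   futures := make([]*routines.Future, 10)
   v := atomic.NewInt32(0)
   for i := 0; i < 10; i++ {
      future, err := pool.AsyncProcess(context.Background(), v)
      if err != nil {
         // Avoid calling Get on a nil future below
         panic(err)
      }
      futures[i] = future
   }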
   for _, future := range futures {
      val, err := future.Get(context.Background())
      fmt.Println(val, err)
   }

   fmt.Println("Total: ", v.Load())
   // Stop the routine pool and wait for all routines to exit
   _ = pool.Stop()
   pool.Wait()
}

License

Routines is under the MIT license. See the LICENSE file for details.

Documentation

Types

type Future

type Future struct {
	// contains filtered or unexported fields
}

func (*Future) Get

func (future *Future) Get(ctx context.Context) (interface{}, error)

Get fetches the response from the future. It blocks until the response is ready.

type Pool

type Pool struct {
	// contains filtered or unexported fields
}

func NewPool

func NewPool(numWorkers int, proc processor) (*Pool, error)

NewPool creates a routine pool with numWorkers worker routines that process tasks using proc.

func (*Pool) AsyncProcess

func (p *Pool) AsyncProcess(ctx context.Context, i interface{}) (*Future, error)

AsyncProcess behaves like Process except that it does not wait for the response. The caller can use the returned future to fetch the response later.

func (*Pool) Process

func (p *Pool) Process(ctx context.Context, i interface{}) (interface{}, error)

Process passes the task to the worker routines and waits for the response.

func (*Pool) SetPanicCallback

func (p *Pool) SetPanicCallback(fn callbackFn)

func (*Pool) Start

func (p *Pool) Start() error

Start starts the routine pool and spawns the worker routines.

func (*Pool) Stop

func (p *Pool) Stop() error

Stop stops the pool and its worker routines.

func (*Pool) Wait

func (p *Pool) Wait()

Wait blocks until all worker routines have exited.
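
The Start, Stop, and Wait lifecycle can be pictured with a quit channel and a sync.WaitGroup. The sketch below is a hedged illustration only: miniPool and its methods are hypothetical, and the errors it returns for a double start or a stop on a pool that is not running are assumptions, not documented behavior of Pool.

```go
package main

import (
	"errors"
	"fmt"
	"sync"
)

// miniPool is a hypothetical pool that models only the lifecycle calls.
type miniPool struct {
	numWorkers int
	quit       chan struct{}
	wg         sync.WaitGroup
	mu         sync.Mutex
	started    bool
	stopped    bool
}

func newMiniPool(n int) *miniPool {
	return &miniPool{numWorkers: n, quit: make(chan struct{})}
}

// start spawns the workers; each one runs until the quit channel is closed.
func (p *miniPool) start() error {
	p.mu.Lock()
	defer p.mu.Unlock()
	if p.started {
		return errors.New("pool already started")
	}
	p.started = true
	p.wg.Add(p.numWorkers)
	for i := 0; i < p.numWorkers; i++ {
		go func() {
			defer p.wg.Done()
			<-p.quit // a real worker would also select on a job channel here
		}()
	}
	return nil
}

// stop signals every worker to exit; it does not wait for them.
func (p *miniPool) stop() error {
	p.mu.Lock()
	defer p.mu.Unlock()
	if !p.started || p.stopped {
		return errors.New("pool is not running")
	}
	p.stopped = true
	close(p.quit)
	return nil
}

// wait blocks until every worker has returned.
func (p *miniPool) wait() { p.wg.Wait() }

func main() {
	p := newMiniPool(3)
	fmt.Println(p.start())
	fmt.Println(p.stop())
	p.wait()
	fmt.Println(p.stop())
}
```

Separating the stop signal from the wait lets a caller request shutdown from one place and block for completion in another, which matches the Stop-then-Wait order used in the examples.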

Directories

Path Synopsis
examples
