cogroup

package module
v1.1.2 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Feb 25, 2023 License: GPL-3.0 Imports: 5 Imported by: 0

README

cogroup

Golang coroutine group

Build Status Go Report Card GoDoc

Package cogroup provides an elegant goroutine group with context controls. It's designed to meet the following requirements.

  • Tasks can be executed without order
  • Group wait command will close the write access to the task queue
  • Upstream context can cancel/stop the execution of the tasks
  • When the context is canceled, the tasks in queue will be no longer consumed
  • Only spawn specified number of goroutines to consume the task queue
  • Panic recover for a single task execution
  • Custom worker with upstream context and worker id provided.
  • Wait will block until tasks are finished or canceled, and return with the queue length
Usage

Start a group and wait till all the tasks are finished.

import  (
  "context"
  "time"

  "github.com/devfans/cogroup"
)

func main() {
  f := func(context.Context) error {
    <-time.After(time.Second)
    return nil
  }

  g := cogroup.Start(context.Background(), 2, 10, false)
  for i := 0; i < 10; i++ {
    g.Add(f)
  }
  g.Wait()
}

Start a group and cancel it later.

import  (
  "context"
  "time"

  "github.com/devfans/cogroup"
)

func main() {
  f := func(ctx context.Context) error {
    <-time.After(time.Second)
    workerID := cogroup.GetWorkerID(ctx)
    println(workerID, " did one task")
    return nil
  }

  ctx, cancel := context.WithCancel(context.Background())
  g := cogroup.Start(ctx, 2, 10, false)
  go func() {
    <-time.After(1 * time.Second)
    cancel()
  }()

  for i := 0; i < 100; i++ {
    g.Add(f)
  }
  println("Tasks left:", g.Wait())
}

Start a group with custom worker

import  (
  "context"
  "time"

  "github.com/devfans/cogroup"
)

func main() {
  f := func(context.Context) error {
    <-time.After(time.Second)
    return nil
  }

  g := cogroup.New(context.Background(), 2, 10, false)
  g.StartWithWorker(func(ctx context.Context, i int, f func(context.Context) error {
    println("Worker is running with id", i)
    f(ctx)
  }))
  for i := 0; i < 10; i++ {
    g.Add(f)
  }
  g.Wait()
}


Misc

Blog:https://blog.devfans.io/create-a-golang-coroutine-group/

Documentation

Overview

Package cogroup provides a elegant goroutine group with context controls. It's designed to meet the following requirements.

- Tasks can be executed without order

- Group `wait` command will close the write acces to the task queue

- Upstream context can cancel the task queue

- When the context is canceled, the tasks in queue will be no longer consumed

- Panic recover for a single task execution

- Only spawn specified number of goroutines to consume the task

- `Wait` will block until tasks are finished or canceled, and return with the queue length

Index

Examples

Constants

This section is empty.

Variables

This section is empty.

Functions

func GetWorkerID added in v0.1.6

func GetWorkerID(ctx context.Context) int

GetWorkerID Get worker id from the context

Types

type CoGroup

type CoGroup struct {
	// contains filtered or unexported fields
}

CoGroup Coroutine group struct holds the group state: the task queue, context and signals.

func New added in v1.0.1

func New(ctx context.Context, n uint, m uint, sink bool) *CoGroup

New will create a cogroup instance without starting the group

func Start

func Start(ctx context.Context, n uint, m uint, sink bool) *CoGroup

Start will initialize a cogroup and start the group goroutines.

Parameter `n` specifies the number the goroutine to start as workers to consume the task queue.

Parameter `m` specifies the size of the task queue buffer, if the buffer is full, the `Insert` method will block till there's more room or a cancel signal was received.

Parameter `sink` specifies whether to pass the group context to the task.

Example
package main

import (
	"context"
	"time"

	"github.com/devfans/cogroup"
)

func main() {
	f := func(context.Context) error {
		<-time.After(time.Second)
		return nil
	}

	g := cogroup.Start(context.Background(), 2, 10, false)
	for i := 0; i < 10; i++ {
		g.Add(f)
	}
	g.Wait()
}
Output:

Example (StartWithCancelContext)
package main

import (
	"context"
	"time"

	"github.com/devfans/cogroup"
)

func main() {
	f := func(ctx context.Context) error {
		<-time.After(time.Second)
		workerID := cogroup.GetWorkerID(ctx)
		println(workerID, " did one task")
		return nil
	}

	ctx, cancel := context.WithCancel(context.Background())
	g := cogroup.Start(ctx, 2, 10, false)
	go func() {
		<-time.After(1 * time.Second)
		cancel()
	}()

	for i := 0; i < 100; i++ {
		g.Add(f)
	}
	println("Tasks left:", g.Wait())
}
Output:

func (*CoGroup) Add

func (g *CoGroup) Add(f func(context.Context) error)

Add a task into the task queue without blocking.

func (*CoGroup) GetWorkers added in v0.1.4

func (g *CoGroup) GetWorkers() int

GetWorkers Get the number of total group workers

func (*CoGroup) Insert

func (g *CoGroup) Insert(f func(context.Context) error) (success bool)

Insert a task into the task queue with blocking if the task queue buffer is full. If the group context was canceled already, it will abort with a false return.

func (*CoGroup) Reset

func (g *CoGroup) Reset()

Reset the cogroup, it will call the group `Wait` first before do a internal reset.

func (*CoGroup) Size added in v0.1.2

func (g *CoGroup) Size() int

Size return the current length the task queue

func (*CoGroup) StartWithWorker added in v1.0.1

func (g *CoGroup) StartWithWorker(worker Worker)

StartWithWorker will register customized worker and start the group goroutines

If worker is `nil`, the default plain worker will be used.

func (*CoGroup) TryInsert added in v1.1.2

func (g *CoGroup) TryInsert(f func(context.Context) error) (success bool)

TryInsert without blocking will return false when the task queue is full or closed, or the context was canceled already.

func (*CoGroup) Wait

func (g *CoGroup) Wait() int

Wait till the tasks in queue are all finished, or the group was canceled by the context.

type Worker added in v1.0.1

type Worker func(ctx context.Context, i int, f func(context.Context) error)

Worker CoGroup Worker factory

`ctx` is provided from the cogroup

`i` indicates the worker id

`f` is job to consume

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL