parallelizer

package module
v0.0.0-...-a6776fb Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jul 17, 2022 License: MIT Imports: 3 Imported by: 7

README

parallelizer Build Status Go Report Card GoDoc

Simplifies creating a pool of workers that execute jobs in parallel

Features

  • Easy to use
  • Context Support
  • Fail fast with errors
  • Customizable Pool Size
    • Default number of workers is 10
  • Customizable Job Queue Size
    • Default size is 100

Examples

Example 1

Running multiple function calls in parallel without a timeout.

package main

import (
	"fmt"
	"github.com/shomali11/parallelizer"
)

func main() {
	group := parallelizer.NewGroup()
	defer group.Close()

	group.Add(func() error {
		for char := 'a'; char < 'a'+3; char++ {
			fmt.Printf("%c ", char)
		}
		return nil
	})

	group.Add(func() error {
		for number := 1; number < 4; number++ {
			fmt.Printf("%d ", number)
		}
		return nil
	})

	err := group.Wait()

	fmt.Println()
	fmt.Println("Done")
	fmt.Printf("Error: %v", err)
}

Output:

a 1 b 2 c 3 
Done
Error: <nil>

Example 2

Running multiple slow function calls in parallel with a context with a short timeout. Note: The timeout will not kill the routines. It will just stop waiting for them to finish

package main

import (
	"context"
	"fmt"
	"github.com/shomali11/parallelizer"
	"time"
)

func main() {
	group := parallelizer.NewGroup()
	defer group.Close()

	group.Add(func() error {
		time.Sleep(2 * time.Second)

		fmt.Println("Finished work 1")

		return nil
	})

	group.Add(func() error {
		time.Sleep(2 * time.Second)

		fmt.Println("Finished work 2")

		return nil
	})

	ctx, cancel := context.WithTimeout(context.Background(), time.Second)
	defer cancel()

	err := group.Wait(parallelizer.WithContext(ctx))

	fmt.Println("Done")
	fmt.Printf("Error: %v", err)
	fmt.Println()

	time.Sleep(2 * time.Second)
}

Output:

Done
Error: context deadline exceeded
Finished work 2
Finished work 1

Example 3

Running multiple function calls in parallel with a large enough worker pool.

package main

import (
	"fmt"
	"github.com/shomali11/parallelizer"
)

func main() {
	group := parallelizer.NewGroup(parallelizer.WithPoolSize(10))
	defer group.Close()

	for i := 1; i <= 10; i++ {
		i := i
		group.Add(func() error {
			fmt.Print(i, " ")
			return nil
		})
	}

	err := group.Wait()

	fmt.Println()
	fmt.Println("Done")
	fmt.Printf("Error: %v", err)
}

Output:

7 6 3 2 8 9 5 10 1 4  
Done
Error: <nil>

Example 4

Running multiple function calls with 1 worker. Note: the functions are no longer executed in parallel but sequentially

package main

import (
	"fmt"
	"github.com/shomali11/parallelizer"
)

func main() {
	group := parallelizer.NewGroup(parallelizer.WithPoolSize(1))
	defer group.Close()

	for i := 1; i <= 10; i++ {
		i := i
		group.Add(func() error {
			fmt.Print(i, " ")
			return nil
		})
	}

	err := group.Wait()

	fmt.Println()
	fmt.Println("Done")
	fmt.Printf("Error: %v", err)
}

Output:

1 2 3 4 5 6 7 8 9 10 
Done
Error: <nil>

Example 5

Running multiple function calls in parallel with a small worker pool and job queue size. Note: the Add call blocks until there is space to push into the Job Queue

package main

import (
	"fmt"
	"github.com/shomali11/parallelizer"
	"time"
)

func main() {
	group := parallelizer.NewGroup(parallelizer.WithPoolSize(1), parallelizer.WithJobQueueSize(1))
	defer group.Close()

	for i := 1; i <= 10; i++ {
		group.Add(func() error {
			time.Sleep(time.Second)
			return nil
		})

		fmt.Println("Job added at", time.Now().Format("04:05"))
	}

	err := group.Wait()

	fmt.Println()
	fmt.Println("Done")
	fmt.Printf("Error: %v", err)
}

Output:

Job added at 00:12
Job added at 00:13
Job added at 00:14
Job added at 00:15
Job added at 00:16
Job added at 00:17
Job added at 00:18
Job added at 00:19
Job added at 00:20
Job added at 00:21

Done
Error: <nil>

Example 6

Running multiple function calls in parallel with a large enough worker pool and job queue size. Note: In here the Add calls did not block because there was plenty of space in the Job Queue

package main

import (
	"fmt"
	"github.com/shomali11/parallelizer"
	"time"
)

func main() {
	group := parallelizer.NewGroup(parallelizer.WithPoolSize(10), parallelizer.WithJobQueueSize(10))
	defer group.Close()

	for i := 1; i <= 10; i++ {
		group.Add(func() error {
			time.Sleep(time.Second)
			return nil
		})

		fmt.Println("Job added at", time.Now().Format("04:05"))
	}

	err := group.Wait()

	fmt.Println()
	fmt.Println("Done")
	fmt.Printf("Error: %v", err)
}

Output:

Job added at 00:30
Job added at 00:30
Job added at 00:30
Job added at 00:30
Job added at 00:30
Job added at 00:30
Job added at 00:30
Job added at 00:30
Job added at 00:30
Job added at 00:30

Done
Error: <nil>

Example 7

Showing an example without calling Wait

package main

import (
	"fmt"
	"github.com/shomali11/parallelizer"
	"time"
)

func main() {
	group := parallelizer.NewGroup()
	defer group.Close()

	group.Add(func() error {
		fmt.Println("Finished work")
		return nil
	})

	fmt.Println("We did not wait!")

	time.Sleep(time.Second)
}

Output:

We did not wait!
Finished work

Example 8

Showing an example with a mixture of Add and Wait calls.

package main

import (
	"fmt"
	"github.com/shomali11/parallelizer"
)

func main() {
	group := parallelizer.NewGroup()
	defer group.Close()

	group.Add(func() error {
		fmt.Println("Worker 1")
		return nil
	})

	group.Add(func() error {
		fmt.Println("Worker 2")
		return nil
	})

	fmt.Println("Waiting for workers 1 and 2 to finish")

	group.Wait()

	fmt.Println("Workers 1 and 2 have finished")

	group.Add(func() error {
		fmt.Println("Worker 3")
		return nil
	})

	fmt.Println("Waiting for worker 3 to finish")

	group.Wait()

	fmt.Println("Worker 3 has finished")
}

Output:

Waiting for workers 1 and 2 to finish
Worker 1
Worker 2
Workers 1 and 2 have finished
Waiting for worker 3 to finish
Worker 3
Worker 3 has finished

Example 9

Showing an example with a failed task.

package main

import (
	"errors"
	"fmt"
	"time"

	"github.com/shomali11/parallelizer"
)

func main() {
	group := parallelizer.NewGroup()
	defer group.Close()

	group.Add(func() error {
		return errors.New("something went wrong")
	})

	group.Add(func() error {
		time.Sleep(10 * time.Second)
		return nil
	})

	err := group.Wait()

	fmt.Println()
	fmt.Println("Done")
	fmt.Printf("Error: %v", err)
}

Output:

Done
Error: something went wrong

Documentation

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

This section is empty.

Types

type Group

type Group struct {
	// contains filtered or unexported fields
}

Group a group of workers executing functions concurrently

func NewGroup

func NewGroup(options ...GroupOption) *Group

NewGroup create a new group of workers

func (*Group) Add

func (g *Group) Add(function func() error) error

Add adds function to queue of jobs to execute

func (*Group) Close

func (g *Group) Close()

Close closes resources

func (*Group) Wait

func (g *Group) Wait(options ...WaitOption) error

Wait waits until workers finished the jobs in the queue

type GroupOption

type GroupOption func(*GroupOptions)

GroupOption an option for Groups

func WithJobQueueSize

func WithJobQueueSize(size int) GroupOption

WithJobQueueSize sets job size

func WithPoolSize

func WithPoolSize(size int) GroupOption

WithPoolSize sets pool size

type GroupOptions

type GroupOptions struct {
	PoolSize     int
	JobQueueSize int
}

GroupOptions configuration for the Group

type WaitOption

type WaitOption func(*WaitOptions)

WaitOption an option for Waiting

func WithContext

func WithContext(ctx context.Context) WaitOption

WithContext sets context

type WaitOptions

type WaitOptions struct {
	Context context.Context
}

WaitOptions configuration for the Wait

Directories

Path Synopsis
examples
1
2
3
4
5
6
7
8
9

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL