orderedconcurrently

package module
v1.0.12 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Mar 12, 2021 License: Apache-2.0 Imports: 1 Imported by: 0

README

Tests codecov Go Reference Gitpod ready-to-code Go Report Card

Ordered Concurrently

A library for parallel processing with ordered output in Go. This module processes work concurrently / in parallel and returns output in a channel in the order of input. It is useful in concurrently / parallelly processing items in a queue, and get output in the order provided by the queue.

Usage

Get Module

go get github.com/tejzpr/ordered-concurrently

Import Module in your source code

import concurrently "github.com/tejzpr/ordered-concurrently" 

Create a work function

// The work that needs to be performed
func workFn(val interface{}) interface{} {
	time.Sleep(time.Millisecond * time.Duration(rand.Intn(100)))
	return val.(int) * 2
}

Run

Example - 1
func main() {
	max := 10
	inputChan := make(chan *concurrently.OrderedInput)
	output := concurrently.Process(inputChan, workFn, &concurrently.Options{PoolSize: 10, OutChannelBuffer: 10})
	go func() {
		for work := 0; work < max; work++ {
			inputChan <- &concurrently.OrderedInput{work}
		}
		close(inputChan)
	}()
	for out := range output {
		log.Println(out.Value)
	}
}
Example - 2
func main() {
	max := 100
	// Can be a non blocking channel as well
	inputChan := make(chan *concurrently.OrderedInput)
	wg := &sync.WaitGroup{}

	outChan := concurrently.Process(inputChan, workFn, &concurrently.Options{PoolSize: 10})
	go func() {
		for out := range outChan {
			log.Println(out.Value)
			wg.Done()
		}
	}()

	// Create work and sent to input channel
	// Output will be in the order of input
	for work := 0; work < max; work++ {
		wg.Add(1)
		input := &concurrently.OrderedInput{work}
		inputChan <- input
	}
	close(inputChan)
	wg.Wait()
}

Credits

  1. u/justinisrael for inputs on improving resource usage.
  2. mh-cbon for identifying potential deadlocks.

Documentation

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

func Process

func Process(inputChan <-chan *OrderedInput, wf WorkFunction, options *Options) <-chan *OrderedOutput

Process processes work function based on input. It Accepts an OrderedInput read channel, work function and concurrent go routine pool size. It Returns an OrderedOutput channel.

Types

type Options

type Options struct {
	PoolSize         int
	OutChannelBuffer int
}

Options options for Process

type OrderedInput

type OrderedInput struct {
	Value interface{}
}

OrderedInput input for Processing

type OrderedOutput

type OrderedOutput struct {
	Value interface{}
}

OrderedOutput is the output channel type from Process

type WorkFunction

type WorkFunction func(interface{}) interface{}

WorkFunction the function which performs work

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL