parallel

Version: v0.1.3
Published: Mar 19, 2024 · License: MIT · Imports: 7 · Imported by: 10

Warning: This package is not in the latest version of its module.

README

parallel

Process lines in parallel.

This package helps increase the performance of command-line applications that transform data read in a mostly line-oriented fashion.

Note: The order of the input lines is not preserved in the output.

The main type is a parallel.Processor, which reads from an io.Reader, applies a function to each input line (separated by a newline by default) and writes the result to an io.Writer.

The transformation function takes a byte slice and therefore does not assume any specific format, so the input may be plain lines, CSV, newline delimited JSON or similar line oriented formats. The output is just bytes and can again assume any format.
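For newline-delimited JSON, a transformation could look like the following minimal sketch: a function with the TransformerFunc signature that extracts a single field from each record. The function name extractName and the field name "name" are hypothetical. The sketch assumes, as the uppercase example further below suggests, that each record arrives with its trailing newline and that the returned bytes are written unchanged, so it appends its own newline. It does not import the library, so it runs standalone:

```go
package main

import (
	"encoding/json"
	"fmt"
)

// extractName is a hypothetical function with the TransformerFunc
// signature func([]byte) ([]byte, error). It pulls the (assumed) "name"
// field out of one newline-delimited JSON record.
func extractName(b []byte) ([]byte, error) {
	var doc struct {
		Name string `json:"name"`
	}
	// json.Unmarshal accepts trailing whitespace, so the record's
	// newline does not need to be trimmed first.
	if err := json.Unmarshal(b, &doc); err != nil {
		return nil, err
	}
	// Output is raw bytes, so terminate the result line explicitly.
	return []byte(doc.Name + "\n"), nil
}

func main() {
	out, err := extractName([]byte(`{"name": "gopher", "id": 1}` + "\n"))
	if err != nil {
		panic(err)
	}
	fmt.Print(string(out)) // prints "gopher" followed by a newline
}
```

Returning the error (instead of logging and skipping) leaves it to the caller to decide how malformed records are handled.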

An example of a simple transformation that does nothing:

func Noop(b []byte) ([]byte, error) {
	return b, nil
}

We can connect this function to IO and let it run:

p := parallel.NewProcessor(os.Stdin, os.Stdout, Noop)
if err := p.Run(); err != nil {
	log.Fatal(err)
}

That's all the setup needed. For details and self-contained programs, see the examples.

The processor expects a parallel.TransformerFunc. Some functions take a byte slice and return a byte slice but do not return an error (bytes.ToUpper, for example). Such functions can be turned into a TransformerFunc with a simple helper:

p := parallel.NewProcessor(os.Stdin, os.Stdout, parallel.ToTransformerFunc(bytes.ToUpper))
if err := p.Run(); err != nil {
	log.Fatal(err)
}

Full Example

// Uppercases each line. Order of lines is not preserved.
//
//     $ printf "hello\nhi\n" | go run examples/uppercase.go
//     HELLO
//     HI

package main

import (
	"bytes"
	"log"
	"os"

	"github.com/miku/parallel"
)

func main() {
	// Setup input, output and business logic.
	p := parallel.NewProcessor(os.Stdin, os.Stdout, func(b []byte) ([]byte, error) {
		return bytes.ToUpper(b), nil
	})

	// Start processing with parallel workers.
	if err := p.Run(); err != nil {
		log.Fatal(err)
	}
}

Adjusting the processor

The processor has a few fields that can be adjusted before running:

p := parallel.NewProcessor(os.Stdin, os.Stdout, parallel.ToTransformerFunc(bytes.ToUpper))

// Adjust processor options.
p.NumWorkers = 4          // number of workers (defaults to runtime.NumCPU())
p.BatchSize = 10000       // number of records per batch sent to a worker
p.RecordSeparator = '\n'  // record separator (currently must be a single byte)

if err := p.Run(); err != nil {
	log.Fatal(err)
}

The defaults should work for many use cases. Batches are kept in memory, so larger batch sizes need more memory but reduce coordination overhead. Sometimes a batch size of one can be useful, too.

Experimental arbitrary record support

scan_test.go contains an example of how to use a bufio.SplitFunc to separate records. The original parallel implementation only looked at lines (akin to bufio.ScanLines), but there are other potential use cases, such as parallel parsing of XML.

Random performance data point

By combining parallel with a fast JSON library such as jsoniter, one can process up to 100,000 JSON documents (of about 1 KB each) per second. Here is an example snippet.

Documentation

Overview

Package parallel implements helpers for fast processing of line oriented inputs.

Index

Constants

const Version = "0.1.3"

Version of library.

Variables

New is a preferred way to create a new parallel processor.

Functions

This section is empty.

Types

type BytesBatch

type BytesBatch struct {
	// contains filtered or unexported fields
}

BytesBatch is a slice of byte slices.

func NewBytesBatch

func NewBytesBatch() *BytesBatch

NewBytesBatch creates a new, empty BytesBatch. It takes no capacity argument; use NewBytesBatchCapacity to preallocate.

func NewBytesBatchCapacity

func NewBytesBatchCapacity(cap int) *BytesBatch

NewBytesBatchCapacity creates a new BytesBatch with a given capacity.

func (*BytesBatch) Add

func (bb *BytesBatch) Add(b []byte)

Add adds an element to the batch.

func (*BytesBatch) Reset

func (bb *BytesBatch) Reset()

Reset empties this batch.

func (*BytesBatch) Size

func (bb *BytesBatch) Size() int

Size returns the number of elements in the batch.

func (*BytesBatch) Slice

func (bb *BytesBatch) Slice() [][]byte

Slice returns a slice of byte slices.
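To show how the documented method set (Add, Reset, Size, Slice) fits together, here is a minimal sketch of a comparable type. The name bytesBatch, its single field, and the choice to copy each added slice are assumptions of this sketch; the real BytesBatch's unexported fields and copying behavior may differ:

```go
package main

import "fmt"

// bytesBatch is a hypothetical type mirroring the documented BytesBatch
// methods. It is not the library's source.
type bytesBatch struct {
	b [][]byte
}

// newBytesBatchCapacity preallocates room for n elements.
func newBytesBatchCapacity(n int) *bytesBatch {
	return &bytesBatch{b: make([][]byte, 0, n)}
}

// Add appends a copy of b, so the caller may safely reuse its buffer.
func (bb *bytesBatch) Add(b []byte) {
	c := make([]byte, len(b))
	copy(c, b)
	bb.b = append(bb.b, c)
}

// Reset empties the batch but keeps the allocated capacity. Reusing the
// batch is only safe once nobody still holds the slice from Slice.
func (bb *bytesBatch) Reset() { bb.b = bb.b[:0] }

// Size returns the number of elements in the batch.
func (bb *bytesBatch) Size() int { return len(bb.b) }

// Slice returns the underlying slice of byte slices.
func (bb *bytesBatch) Slice() [][]byte { return bb.b }

func main() {
	bb := newBytesBatchCapacity(2)
	bb.Add([]byte("x\n"))
	bb.Add([]byte("y\n"))
	fmt.Println(bb.Size()) // 2
	bb.Reset()
	fmt.Println(bb.Size()) // 0
}
```

Copying in Add matters when records come from a reused buffer, as with bufio.Reader.ReadSlice or bufio.Scanner.Bytes, whose returned bytes may be overwritten by the next read.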

type Processor

type Processor struct {
	BatchSize       int
	RecordSeparator byte
	NumWorkers      int
	SkipEmptyLines  bool
	Verbose         bool
	R               io.Reader
	W               io.Writer
	F               TransformerFunc
}

Processor can process lines in parallel.

func NewProcessor

func NewProcessor(r io.Reader, w io.Writer, f TransformerFunc) *Processor

NewProcessor creates a new line processor.

func (*Processor) Run

func (p *Processor) Run() error

Run starts the workers, crunching through the input.

func (*Processor) RunWorkers

func (p *Processor) RunWorkers(numWorkers int) error

RunWorkers sets the number of workers and runs the processor, as a shortcut.

type SimpleTransformerFunc

type SimpleTransformerFunc func([]byte) []byte

SimpleTransformerFunc converts bytes to bytes.

type TransformerFunc

type TransformerFunc func([]byte) ([]byte, error)

TransformerFunc takes a slice of bytes and returns a slice of bytes and an error. It is a common denominator of functions that transform data.

func ToTransformerFunc

func ToTransformerFunc(f SimpleTransformerFunc) TransformerFunc

ToTransformerFunc takes a simple transformer and wraps it so it can be used in places where a TransformerFunc is expected.

Directories

Path                Synopsis
examples/extract    Extract a value from a JSON document.
examples/filter     Example for a JSON filter.
examples/reverse    Reverses lines.
scan                Package scan accepts a bufio.SplitFunc and generalizes batches to non-line oriented input, e.g.
