parallel

package module
v0.0.0-...-435283f Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Mar 7, 2023 License: MIT Imports: 7 Imported by: 0

README

parallel

Where xargs meets foreman (or goreman): controlled, concurrent execution of shell commands.

Inspired by GNU Parallel: https://www.gnu.org/software/parallel/

builds

Build Status

build:

go build cmd/*.go

or just:

make travis

Demos:

compress/decompress 10 large files (serially):
 # ls -sh
total 2224400
222440 1.txt  222440 2.txt  222440 4.txt  222440 6.txt  222440 8.txt
222440 10.txt 222440 3.txt  222440 5.txt  222440 7.txt  222440 9.txt

 # time gzip *.txt
real	0m11.662s
user	0m11.001s
sys	0m0.564s

 # time gunzip *.gz
real	0m1.750s
user	0m0.755s
sys	0m0.466s
compress/decompress 10 large files (concurrently with parallel):
time for f in *.txt ; do echo gzip $f; done | $GOPATH/bin/parallel -j 10
[       98.86µs 11:36:51 0] concurrency limit: 10
[     141.504µs 11:36:51 0] reading from stdin...
[     215.997µs 11:36:51 9] run: 'gzip 9.txt'
[     258.119µs 11:36:51 1] run: 'gzip 10.txt'
[     279.051µs 11:36:51 3] run: 'gzip 3.txt'
[    5.380117ms 11:36:51 7] run: 'gzip 7.txt'
[    5.446718ms 11:36:51 5] run: 'gzip 5.txt'
[    5.522043ms 11:36:51 2] run: 'gzip 2.txt'
[    7.693391ms 11:36:51 8] run: 'gzip 8.txt'
[    9.895031ms 11:36:51 6] run: 'gzip 6.txt'
[   10.003485ms 11:36:51 0] run: 'gzip 1.txt'
[   10.040754ms 11:36:51 4] run: 'gzip 4.txt'
[  2.670755505s 11:36:54 9] done: dt: 2.670561141s
[  2.723745718s 11:36:54 1] done: dt: 2.723551945s
[  2.732728457s 11:36:54 3] done: dt: 2.732508999s
[  2.748645989s 11:36:54 8] done: dt: 2.748406607s
[  2.762459047s 11:36:54 5] done: dt: 2.762266508s
[  2.777302364s 11:36:54 7] done: dt: 2.777068367s
[  2.785686994s 11:36:54 4] done: dt: 2.785451804s
[  2.796846523s 11:36:54 2] done: dt: 2.796563159s
[  2.810195841s 11:36:54 0] done: dt: 2.809967849s
[  2.817260242s 11:36:54 6] done: dt: 2.81703108s
[  2.817275381s 11:36:54 0] all done: dt: 2.817269836s

real	0m2.831s
user	0m16.502s
sys	0m0.831s


time for f in *.txt.gz ; do echo gunzip $f; done | $GOPATH/bin/parallel -j 10
[     105.518µs 11:41:18 0] concurrency limit: 10
[     150.091µs 11:41:18 0] reading from stdin...
[     232.374µs 11:41:18 9] run: 'gunzip 9.txt.gz'
[      279.31µs 11:41:18 1] run: 'gunzip 10.txt.gz'
[    1.434552ms 11:41:18 4] run: 'gunzip 4.txt.gz'
[    1.506747ms 11:41:18 6] run: 'gunzip 6.txt.gz'
[    4.912507ms 11:41:18 2] run: 'gunzip 2.txt.gz'
[    8.693375ms 11:41:18 0] run: 'gunzip 1.txt.gz'
[    8.733623ms 11:41:18 5] run: 'gunzip 5.txt.gz'
[   10.506841ms 11:41:18 3] run: 'gunzip 3.txt.gz'
[   12.224203ms 11:41:18 7] run: 'gunzip 7.txt.gz'
[   14.152521ms 11:41:18 8] run: 'gunzip 8.txt.gz'
[  1.191842637s 11:41:19 1] done: dt: 1.191617208s
[  1.223837314s 11:41:19 9] done: dt: 1.223637668s
[  1.240858566s 11:41:19 4] done: dt: 1.240603349s
[  1.255985175s 11:41:19 8] done: dt: 1.255724068s
[  1.273008068s 11:41:19 5] done: dt: 1.272772052s
[  1.282475414s 11:41:19 2] done: dt: 1.282202009s
[  1.288824013s 11:41:19 0] done: dt: 1.288580036s
[  1.294614678s 11:41:19 6] done: dt: 1.294366962s
[  1.295771009s 11:41:19 3] done: dt: 1.295523197s
[  1.299966607s 11:41:19 7] done: dt: 1.29971294s
[  1.299979953s 11:41:19 0] all done: dt: 1.299975556s

real	0m1.312s
user	0m0.954s
sys	0m0.674s

run concurrent jobs (pings):
echo -e " ping www.google.com\n ping apple.com\n \n\n" | ./parallel -j 10 -v

[56.609µs        15:06:07 000 I] concurrency limit: 10
[204.437µs       15:06:07 004 I] start: ''
[231.207µs       15:06:07 001 I] start: 'ping apple.com'
[269.186µs       15:06:07 002 I] start: ''
[307.086µs       15:06:07 000 I] start: 'ping www.google.com'
[400.384µs       15:06:07 003 I] start: ''
[676.3µs         15:06:07 004 I] execute: done: dt: 478.343µs
[749.079µs       15:06:07 002 I] execute: done: dt: 517.862µs
[851.008µs       15:06:07 003 I] execute: done: dt: 508.883µs
[29.482275ms     15:06:07 001 I] PING apple.com (17.253.144.10) 56(84) bytes of data.
[29.496675ms     15:06:07 001 I] 64 bytes from icloud.com (17.253.144.10): icmp_seq=1 ttl=58 time=26.2 ms
[99.010999ms     15:06:07 000 I] PING www.google.com(yx-in-x69.1e100.net (2607:f8b0:4002:c08::69)) 56 data bytes
[99.028179ms     15:06:07 000 I] 64 bytes from yx-in-x69.1e100.net (2607:f8b0:4002:c08::69): icmp_seq=1 ttl=103 time=92.8 ms
[1.027270463s    15:06:07 001 I] 64 bytes from icloud.com (17.253.144.10): icmp_seq=2 ttl=58 time=22.4 ms
[1.100067039s    15:06:07 000 I] 64 bytes from yx-in-x69.1e100.net (2607:f8b0:4002:c08::69): icmp_seq=2 ttl=102 time=92.8 ms

Documentation

Index

Constants

This section is empty.

Variables

View Source
var ParallelPingResult_Success_DEFAULT string

Functions

This section is empty.

Types

type Cmd

type Cmd struct {
	CmdLine string `thrift:"cmdLine,1" db:"cmdLine" json:"cmdLine"`
	Ticket  int64  `thrift:"ticket,2" db:"ticket" json:"ticket"`
}

Attributes:

  • CmdLine
  • Ticket
var ParallelExecuteArgs_Command_DEFAULT *Cmd

func NewCmd

func NewCmd() *Cmd

func (*Cmd) GetCmdLine

func (p *Cmd) GetCmdLine() string

func (*Cmd) GetTicket

func (p *Cmd) GetTicket() int64

func (*Cmd) Read

func (p *Cmd) Read(iprot thrift.TProtocol) error

func (*Cmd) ReadField1

func (p *Cmd) ReadField1(iprot thrift.TProtocol) error

func (*Cmd) ReadField2

func (p *Cmd) ReadField2(iprot thrift.TProtocol) error

func (*Cmd) String

func (p *Cmd) String() string

func (*Cmd) Write

func (p *Cmd) Write(oprot thrift.TProtocol) error

type ExecuteException

type ExecuteException struct {
	What   string  `thrift:"what,1" db:"what" json:"what"`
	Output *Output `thrift:"output,2" db:"output" json:"output"`
}

Attributes:

  • What
  • Output
var ParallelExecuteResult_E_DEFAULT *ExecuteException

func NewExecuteException

func NewExecuteException() *ExecuteException

func (*ExecuteException) Error

func (p *ExecuteException) Error() string

func (*ExecuteException) GetOutput

func (p *ExecuteException) GetOutput() *Output

func (*ExecuteException) GetWhat

func (p *ExecuteException) GetWhat() string

func (*ExecuteException) IsSetOutput

func (p *ExecuteException) IsSetOutput() bool

func (*ExecuteException) Read

func (p *ExecuteException) Read(iprot thrift.TProtocol) error

func (*ExecuteException) ReadField1

func (p *ExecuteException) ReadField1(iprot thrift.TProtocol) error

func (*ExecuteException) ReadField2

func (p *ExecuteException) ReadField2(iprot thrift.TProtocol) error

func (*ExecuteException) String

func (p *ExecuteException) String() string

func (*ExecuteException) Write

func (p *ExecuteException) Write(oprot thrift.TProtocol) error

type Output

type Output struct {
	Stdout string            `thrift:"stdout,1" db:"stdout" json:"stdout"`
	Stderr string            `thrift:"stderr,2" db:"stderr" json:"stderr"`
	Tags   map[string]string `thrift:"tags,3" db:"tags" json:"tags"`
}

Attributes:

  • Stdout
  • Stderr
  • Tags
var ExecuteException_Output_DEFAULT *Output
var ParallelExecuteResult_Success_DEFAULT *Output

func NewOutput

func NewOutput() *Output

func (*Output) GetStderr

func (p *Output) GetStderr() string

func (*Output) GetStdout

func (p *Output) GetStdout() string

func (*Output) GetTags

func (p *Output) GetTags() map[string]string

func (*Output) Read

func (p *Output) Read(iprot thrift.TProtocol) error

func (*Output) ReadField1

func (p *Output) ReadField1(iprot thrift.TProtocol) error

func (*Output) ReadField2

func (p *Output) ReadField2(iprot thrift.TProtocol) error

func (*Output) ReadField3

func (p *Output) ReadField3(iprot thrift.TProtocol) error

func (*Output) String

func (p *Output) String() string

func (*Output) Write

func (p *Output) Write(oprot thrift.TProtocol) error

type Parallel

type Parallel interface {
	Ping(ctx context.Context) (r string, err error)
	// Parameters:
	//  - Command
	Execute(ctx context.Context, command *Cmd) (r *Output, err error)
}

type ParallelClient

type ParallelClient struct {
	// contains filtered or unexported fields
}

func NewParallelClient

func NewParallelClient(c thrift.TClient) *ParallelClient

func NewParallelClientProtocol

func NewParallelClientProtocol(t thrift.TTransport, iprot thrift.TProtocol, oprot thrift.TProtocol) *ParallelClient

func (*ParallelClient) Client_

func (p *ParallelClient) Client_() thrift.TClient

func (*ParallelClient) Execute

func (p *ParallelClient) Execute(ctx context.Context, command *Cmd) (r *Output, err error)

Parameters:

  • Command

func (*ParallelClient) Ping

func (p *ParallelClient) Ping(ctx context.Context) (r string, err error)

type ParallelExecuteArgs

type ParallelExecuteArgs struct {
	Command *Cmd `thrift:"command,1" db:"command" json:"command"`
}

Attributes:

  • Command

func NewParallelExecuteArgs

func NewParallelExecuteArgs() *ParallelExecuteArgs

func (*ParallelExecuteArgs) GetCommand

func (p *ParallelExecuteArgs) GetCommand() *Cmd

func (*ParallelExecuteArgs) IsSetCommand

func (p *ParallelExecuteArgs) IsSetCommand() bool

func (*ParallelExecuteArgs) Read

func (p *ParallelExecuteArgs) Read(iprot thrift.TProtocol) error

func (*ParallelExecuteArgs) ReadField1

func (p *ParallelExecuteArgs) ReadField1(iprot thrift.TProtocol) error

func (*ParallelExecuteArgs) String

func (p *ParallelExecuteArgs) String() string

func (*ParallelExecuteArgs) Write

func (p *ParallelExecuteArgs) Write(oprot thrift.TProtocol) error

type ParallelExecuteResult

type ParallelExecuteResult struct {
	Success *Output           `thrift:"success,0" db:"success" json:"success,omitempty"`
	E       *ExecuteException `thrift:"e,1" db:"e" json:"e,omitempty"`
}

Attributes:

  • Success
  • E

func NewParallelExecuteResult

func NewParallelExecuteResult() *ParallelExecuteResult

func (*ParallelExecuteResult) GetE

func (p *ParallelExecuteResult) GetE() *ExecuteException

func (*ParallelExecuteResult) GetSuccess

func (p *ParallelExecuteResult) GetSuccess() *Output

func (*ParallelExecuteResult) IsSetE

func (p *ParallelExecuteResult) IsSetE() bool

func (*ParallelExecuteResult) IsSetSuccess

func (p *ParallelExecuteResult) IsSetSuccess() bool

func (*ParallelExecuteResult) Read

func (p *ParallelExecuteResult) Read(iprot thrift.TProtocol) error

func (*ParallelExecuteResult) ReadField0

func (p *ParallelExecuteResult) ReadField0(iprot thrift.TProtocol) error

func (*ParallelExecuteResult) ReadField1

func (p *ParallelExecuteResult) ReadField1(iprot thrift.TProtocol) error

func (*ParallelExecuteResult) String

func (p *ParallelExecuteResult) String() string

func (*ParallelExecuteResult) Write

func (p *ParallelExecuteResult) Write(oprot thrift.TProtocol) error

type ParallelPingArgs

type ParallelPingArgs struct {
}

func NewParallelPingArgs

func NewParallelPingArgs() *ParallelPingArgs

func (*ParallelPingArgs) Read

func (p *ParallelPingArgs) Read(iprot thrift.TProtocol) error

func (*ParallelPingArgs) String

func (p *ParallelPingArgs) String() string

func (*ParallelPingArgs) Write

func (p *ParallelPingArgs) Write(oprot thrift.TProtocol) error

type ParallelPingResult

type ParallelPingResult struct {
	Success *string `thrift:"success,0" db:"success" json:"success,omitempty"`
}

Attributes:

  • Success

func NewParallelPingResult

func NewParallelPingResult() *ParallelPingResult

func (*ParallelPingResult) GetSuccess

func (p *ParallelPingResult) GetSuccess() string

func (*ParallelPingResult) IsSetSuccess

func (p *ParallelPingResult) IsSetSuccess() bool

func (*ParallelPingResult) Read

func (p *ParallelPingResult) Read(iprot thrift.TProtocol) error

func (*ParallelPingResult) ReadField0

func (p *ParallelPingResult) ReadField0(iprot thrift.TProtocol) error

func (*ParallelPingResult) String

func (p *ParallelPingResult) String() string

func (*ParallelPingResult) Write

func (p *ParallelPingResult) Write(oprot thrift.TProtocol) error

type ParallelProcessor

type ParallelProcessor struct {
	// contains filtered or unexported fields
}

func NewParallelProcessor

func NewParallelProcessor(handler Parallel) *ParallelProcessor

func (*ParallelProcessor) AddToProcessorMap

func (p *ParallelProcessor) AddToProcessorMap(key string, processor thrift.TProcessorFunction)

func (*ParallelProcessor) GetProcessorFunction

func (p *ParallelProcessor) GetProcessorFunction(key string) (processor thrift.TProcessorFunction, ok bool)

func (*ParallelProcessor) Process

func (p *ParallelProcessor) Process(ctx context.Context, iprot, oprot thrift.TProtocol) (success bool, err thrift.TException)

func (*ParallelProcessor) ProcessorMap

func (p *ParallelProcessor) ProcessorMap() map[string]thrift.TProcessorFunction

type Result_

type Result_ int64
const (
	Result__OK    Result_ = 0
	Result__ERROR Result_ = 2
)

func Result_FromString

func Result_FromString(s string) (Result_, error)

func Result_Ptr

func Result_Ptr(v Result_) *Result_

func (Result_) MarshalText

func (p Result_) MarshalText() ([]byte, error)

func (*Result_) Scan

func (p *Result_) Scan(value interface{}) error

func (Result_) String

func (p Result_) String() string

func (*Result_) UnmarshalText

func (p *Result_) UnmarshalText(text []byte) error

func (*Result_) Value

func (p *Result_) Value() (driver.Value, error)

Directories

Path Synopsis

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL