channelx

package module
v0.3.1 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Sep 21, 2021 License: MIT Imports: 9 Imported by: 0

README

channelx

Go Report Card build codecov GoDoc

Some useful tools implemented by channel to increase development efficiency, e.g. event bus, stream, promise, actor, parallel runner, aggregator, etc..

blogs

Parallel runner

A simple util to run tasks in parallel

worker := func(ctx context.Context, input interface{}) (interface{}, error) {
    num := input.(int)
    return num+1, nil
}

inputs := []interface{}{1,2,3,4,5}
outputs, err := channelx.RunInParallel(context.Background(), inputs, worker, 4)

more examples, please check parallel_runner_test.go

EventBus

A PubSub pattern util

logger := channelx.NewConsoleLogger()
eventBus := channelx.NewEventBus(logger,  4,4,2, time.Second, 5 * time.Second)
eventBus.Start()

handler := NewExampleHandler(logger)
eventBus.Subscribe(ExampleEventID, handler)
eventBus.Publish(NewExampleEvent())

eventBus.Stop()

more details, please check event_bus_test.go#TestEventBus_Example

Promise

A golang style async/await, even I call it Promise, while the api is not 100% aligns with Javascript Promise.

promise := NewPromise(func() (res interface{}, err error) {
    // do work asynchronously here
    reuturn
}).Then(func(input interface{}) (interface{}, error) {
    // here is the succss handler, which aslo runs asynchronously 
}, func(err error) interface{} {
    // here is the error handler, which aslo runs asynchronously 
})

// await: wait until it completes.
res, _ := promise.Done()

more examples, please check promise_test.go

Actor

The actor pattern is also called as Active Object, it seems like Promise, but the difference is Actor can be reused, and it is FIFO.

actor := NewActor(SetActorBuffer(0))
defer actor.Close()

// do some work asynchroniously.
call := actor.Do(func() (interface{}, error) {
    time.Sleep(0 * time.Second)
    return 0, nil
})

// can to some other synchroniouse work here
// ......

// wait for the call completes.
res, err := call.Done()

more examples, please check actor_test.go

Stream

Steam works like Node.Js stream, it can be piped and data flows through the pipe one by one.

before
var multipleChan = make(chan int, 4)
var minusChan = make(chan int, 4)
var harvestChan = make(chan int, 4)

defer close(multipleChan)
defer close(minusChan)
defer close(harvestChan)

go func() {
    for i:=1;i<=100;i++{
        multipleChan <- i
    }
}()

for i:=0; i<4; i++{
    go func() {
        for data := range multipleChan {
            minusChan <- data * 2
            time.Sleep(10* time.Millisecond)
        }
    }()

    go func() {
        for data := range minusChan {
            harvestChan <- data - 1
            time.Sleep(10* time.Millisecond)
        }
    }()
}

var sum = 0
var index = 0
for data := range harvestChan{
    sum += data
    index++
    if index == 100{
        break
    }
}

fmt.Println(sum)
after
var sum = 0

NewChannelStream(func(seedChan chan<- Item, quitChannel chan struct{}) {
    for i:=1; i<=100;i++{
        seedChan <- Item{Data:i}
    }
    close(seedChan) //don't forget to close it
}).Pipe(func(Item Item) Item {
    return Item{Data: Item.Data.(int) * 2}
}).Pipe(func(Item Item) Item {
    return Item{Data: Item.Data.(int) - 1}
}).Harvest(func(Item Item) {
    sum += Item.Data.(int)
})

fmt.Println(sum)

more examples, please check stream_test.go

Aggregator

Aggregator is used for the scenario that receives request one by one while handle them in a batch would increase efficiency.

// YourKnownType, YourBatchHandler, yourRequest are faked type or object

batchProcess := func(items []interface{}) error {
    var arr YourKnownType 
    for _, item := range items{
        ykt := item.(YourKnownType)
        arr = append(arr, ykt)
    }
    
    YourBatchHandler(arr)
}

aggregator := NewAggregator(batchProcess)

aggregator.Start()

aggregator.Enqueue(yourRequest)

aggregator.Stop()

more examples, please check aggregator_test.go

Documentation

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

func NewConsoleLogger

func NewConsoleLogger() *consoleLogger

func RegisterEventName added in v0.3.0

func RegisterEventName(eventID EventID, name string)

RegisterEventName registers event name

func ResumeWhenHasError

func ResumeWhenHasError() func(p *ChannelStream)

An option means resume the stream when has error

func RunInParallel added in v0.2.0

func RunInParallel(ctx context.Context, inputs []interface{}, worker func(context.Context, interface{}) (interface{}, error), workers int) ([]interface{}, error)

RunInParallel is the short cut for ParallelRunner's Run

func SetWorkers

func SetWorkers(workers int) func(p *ChannelStream)

An option to set the count of go routines in the stream

func StopWhenHasError

func StopWhenHasError() func(p *ChannelStream)

An option means stop the stream when has error

Types

type Actor added in v0.2.0

type Actor struct {
	// contains filtered or unexported fields
}

Represents the actor

func NewActor added in v0.2.0

func NewActor(setActorOptionFuncs ...SetActorOptionFunc) *Actor

Creates a new actor

func (*Actor) Close added in v0.2.0

func (actor *Actor) Close()

Close actor

func (*Actor) Do added in v0.2.0

func (actor *Actor) Do(workFunc WorkFunc) *Promise

Do a work.

type Aggregator

type Aggregator struct {
	// contains filtered or unexported fields
}

Represents the aggregator

func NewAggregator

func NewAggregator(batchProcessor BatchProcessFunc, optionFuncs ...SetAggregatorOptionFunc) *Aggregator

Creates a new aggregator

func (*Aggregator) Enqueue

func (agt *Aggregator) Enqueue(item interface{})

Enqueue an item, will be blocked if the queue is full

func (*Aggregator) SafeStop

func (agt *Aggregator) SafeStop()

Stop the aggregator safely, the difference with Stop is it guarantees no item is missed during stop

func (*Aggregator) Start

func (agt *Aggregator) Start()

Start the aggregator

func (*Aggregator) Stop

func (agt *Aggregator) Stop()

Stop the aggregator

func (*Aggregator) TryEnqueue

func (agt *Aggregator) TryEnqueue(item interface{}) bool

Try enqueue an item, and it is non-blocked

type AggregatorOption

type AggregatorOption struct {
	BatchSize         int
	Workers           int
	ChannelBufferSize int
	LingerTime        time.Duration
	ErrorHandler      ErrorHandlerFunc
	Logger            Logger
}

Represents the aggregator option

type BatchProcessFunc

type BatchProcessFunc func([]interface{}) error

the func to batch process items

type ChannelStream

type ChannelStream struct {
	// contains filtered or unexported fields
}

Represent the channel stream

func NewChannelStream

func NewChannelStream(seedFunc SeedFunc, optionFuncs ...SetStreamOptionFunc) *ChannelStream

Create a new channel stream

func (*ChannelStream) Cancel

func (p *ChannelStream) Cancel()

Cancel current stream

func (*ChannelStream) Drain

func (p *ChannelStream) Drain() (bool, []error)

Drain the output of current stream to make sure all the items got processed

func (*ChannelStream) Harvest

func (p *ChannelStream) Harvest(harvestFunc HarvestFunc) (bool, []error)

Harvest the output of current stream

func (*ChannelStream) Pipe

func (p *ChannelStream) Pipe(dataPipeFunc PipeFunc, optionFuncs ...SetStreamOptionFunc) *ChannelStream

Pipe current steam output as another stream's input

func (*ChannelStream) Race

func (p *ChannelStream) Race(raceFunc RaceFunc)

Set race condition of current stream's output

type ErrorHandler added in v0.2.0

type ErrorHandler func(error) interface{}

Represent the error handler

type ErrorHandlerFunc

type ErrorHandlerFunc func(err error, items []interface{}, batchProcessFunc BatchProcessFunc, aggregator *Aggregator)

the func to handle error

type Event added in v0.3.0

type Event interface {
	ID() EventID
}

Event ...

type EventBus added in v0.3.0

type EventBus struct {
	// contains filtered or unexported fields
}

EventBus ...

func NewEventBus added in v0.3.0

func NewEventBus(
	logger Logger,
	chanBuffer,
	eventWorkers, autoRetryTimes int,
	retryInterval, timeout time.Duration,
) *EventBus

NewEventBus ...

func (*EventBus) Publish added in v0.3.0

func (eb *EventBus) Publish(evt Event) <-chan JobStatus

Publish ...

func (*EventBus) Start added in v0.3.0

func (eb *EventBus) Start()

Start ...

func (*EventBus) Stop added in v0.3.0

func (eb *EventBus) Stop()

Stop ....

func (*EventBus) Subscribe added in v0.3.0

func (eb *EventBus) Subscribe(eventID EventID, handlers ...EventHandler)

Subscribe ...

func (*EventBus) Unsubscribe added in v0.3.0

func (eb *EventBus) Unsubscribe(eventID EventID, handlers ...EventHandler)

Unsubscribe ...

type EventHandler added in v0.3.0

type EventHandler interface {
	OnEvent(ctx context.Context, event Event) error
	Logger() Logger
	CanAutoRetry(err error) bool
}

EventHandler ...

type EventID added in v0.3.0

type EventID int64

EventID is type of events

func (EventID) Name added in v0.3.0

func (id EventID) Name() string

Name returns the string name for the passed eventID

type EventJob added in v0.3.0

type EventJob struct {
	// contains filtered or unexported fields
}

EventJob ...

type HarvestFunc

type HarvestFunc func(item Item)

The func to harvest in Harvest

type Item

type Item struct {
	Data interface{}
	Err  error
}

Represent the item in the stream

type JobStatus added in v0.3.0

type JobStatus struct {
	RunAt      time.Time
	FinishedAt time.Time
	Err        error
}

JobStatus holds information related to a job status

type Logger

type Logger interface {
	Debugf(str string, args ...interface{})
	Infof(str string, args ...interface{})
	Warnf(str string, args ...interface{})
	Errorf(str string, args ...interface{})
}

type ParallelRunner added in v0.2.1

type ParallelRunner struct {
	// contains filtered or unexported fields
}

ParallelRunner represents the runner configurations

func NewParallelRunner added in v0.2.1

func NewParallelRunner(workers int, configurations ...SetParallelRunner) *ParallelRunner

NewParallelRunner creates a new ParallelRunner

func (ParallelRunner) Run added in v0.2.1

func (pr ParallelRunner) Run(ctx context.Context, inputs []interface{}, worker func(context.Context, interface{}) (interface{}, error)) ([]interface{}, []error)

Run will process the requests in parallel

type PipeFunc

type PipeFunc func(item Item) Item

The func to work in Pipe

type Promise added in v0.2.0

type Promise struct {
	// contains filtered or unexported fields
}

Represent the promise

func NewPromise added in v0.2.0

func NewPromise(workFunc WorkFunc) *Promise

Create a new promise

func (*Promise) Done added in v0.2.0

func (p *Promise) Done() (interface{}, error)

Wait until the promise is done and get response.

func (*Promise) Then added in v0.2.0

func (p *Promise) Then(successHandler SuccessHandler, errorHandler ErrorHandler) *Promise

Chain the promise with success handler and error handler

func (*Promise) ThenError added in v0.2.0

func (p *Promise) ThenError(errorHandler ErrorHandler) *Promise

Chain the promise with error handler

func (*Promise) ThenSuccess added in v0.2.0

func (p *Promise) ThenSuccess(successHandler SuccessHandler) *Promise

Chain the promise with success handler

type RaceFunc

type RaceFunc func(item Item) bool

The func as a condition in Race

type SeedFunc

type SeedFunc func(seedChan chan<- Item, quitChannel chan struct{})

The func to generate the seed in NewChannelStream

type SetActorOptionFunc added in v0.2.0

type SetActorOptionFunc func(actor *Actor)

Represents the func to set actor option

func SetActorBuffer added in v0.2.0

func SetActorBuffer(buffer int) SetActorOptionFunc

Set actor buffer

type SetAggregatorOptionFunc added in v0.2.0

type SetAggregatorOptionFunc func(option AggregatorOption) AggregatorOption

the func to set option for aggregator

type SetParallelRunner added in v0.2.1

type SetParallelRunner func(runner *ParallelRunner)

SetParallelRunner represents the runner configuration setter

func QuitOnError added in v0.2.1

func QuitOnError() SetParallelRunner

QuitOnError set runner quit on error

func QuitWhenAllFinished added in v0.2.1

func QuitWhenAllFinished() SetParallelRunner

QuitWhenAllFinished sets runner quit when all request are finished

type SetStreamOptionFunc added in v0.2.0

type SetStreamOptionFunc func(cs *ChannelStream)

The func to set option in NewChannelStream/Pipe

type SuccessHandler added in v0.2.0

type SuccessHandler func(interface{}) (interface{}, error)

Represent the success handler

type WorkFunc added in v0.2.0

type WorkFunc func() (interface{}, error)

Represent the work function

Directories

Path Synopsis
Package channelx_mock is a generated GoMock package.
Package channelx_mock is a generated GoMock package.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL