concurrency

package
v1.0.2 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Mar 2, 2021 License: Apache-2.0 Imports: 5 Imported by: 0

Documentation

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

func AsChan

func AsChan(vs ...interface{}) <-chan interface{}

AsChan - sends the contents of a slice through a channel

func BCasterEventTransformFn

func BCasterEventTransformFn(b *BCaster, input interface{}) interface{}

BCasterEventTransformFn - Gets the bcaster and input message the output in the form of an event

func Bridge

func Bridge(done <-chan interface{}, chanStream <-chan <-chan interface{}) chan interface{}

Bridge is a way to present a single-channel facade over a channel of channels. It is used to consume values from a sequence of channels (channel of channels) doing an ordered write from different sources. By bridging the channels it destructures the channel of channels into a simple channel, allowing to multiplex the input and simplify the consumption. With this pattern we can use the channel of channels from within a single range statement and focus on our loop’s logic.

func CloseChannel

func CloseChannel(ch chan interface{})

CloseChannel - Checks if the channel is not closed and closes it

func FanIn

func FanIn(done <-chan interface{}, channels ...<-chan interface{}) <-chan interface{}

FanIn - combines multiple results in the form of an slice of channels into one channel. This implementation uses a waitgroup in order to multiplex all the results of the slice of channels. The output is not produced in sequence. This pattern is good when order is not important

func FanInRec

func FanInRec(done <-chan interface{}, channels ...<-chan interface{}) <-chan interface{}

FanInRec - combines multiple results in the form of an slice of channels into one channel. This implementation uses a a recursive approach in order to multiplex all the results of the slice of channels. The output is not produced in sequence. This pattern is good when order is not important

func FilterEventTransformFn

func FilterEventTransformFn(f *Filter, input interface{}) interface{}

FilterEventTransformFn - Gets the input event returns it in the form of an output event with the sequence of the filter

func Or

func Or(channels ...<-chan interface{}) <-chan interface{}

Or - returns the value of the fastest channel.

func OrDone

func OrDone(done, c <-chan interface{}) chan interface{}

OrDone - Wraps a channel with a select statement that also selects from a done channel. Allows to cancel the channel avoiding go-routine leaks

func OrDoneChanParamFn

func OrDoneChanParamFn(done, c <-chan interface{}, fn func(param chan interface{}), param chan interface{}) chan interface{}

OrDoneChanParamFn - Wraps a channel with a select statement that also selects from a done channel with a function with a channel as parameter that can be run before returning. Allows to cancel the channel avoiding go-routine leaks.

func OrDoneChanParamsFn

func OrDoneChanParamsFn(done, c <-chan interface{}, fn func(params ...chan interface{}), params ...chan interface{}) chan interface{}

OrDoneChanParamsFn - Wraps a channel with a select statement that also selects from a done channel with a function with a list of channels as parameters that can be run before returning. Allows to cancel the channel avoiding go-routine leaks.

func OrDoneFn

func OrDoneFn(done, c <-chan interface{}, fn func()) chan interface{}

OrDoneFn - Wraps a channel with a select statement that also selects from a done channel with a function that can be run before returning. Allows to cancel the channel avoiding go-routine leaks

func OrDoneParamFn

func OrDoneParamFn(done, c <-chan interface{}, fn func(param interface{}), param interface{}) chan interface{}

OrDoneParamFn - Wraps a channel with a select statement that also selects from a done channel with a function with parameter that can be run before returning. Allows to cancel the channel avoiding go-routine leaks.

func OrDoneParamsFn

func OrDoneParamsFn(done, c <-chan interface{}, fn func(params ...interface{}), params ...interface{}) chan interface{}

OrDoneParamsFn - Wraps a channel with a select statement that also selects from a done channel with a function with a list of parameters that can be run before returning. Allows to cancel the channel avoiding go-routine leaks.

func ProcessorEventTransformFn

func ProcessorEventTransformFn(p *Processor, input interface{}, result interface{}) interface{}

ProcessorEventTransformFn - Gets the processor, input event and result message and returns the processed output in the form of an event

func Repeat

func Repeat(done <-chan interface{}, values ...interface{}) chan interface{}

Repeat - Generator that repeats the values defined on the values slice indefinitely.

func RepeatChanParamFn

func RepeatChanParamFn(done, c <-chan interface{}, fn func(param chan interface{}) interface{}, param chan interface{}) chan interface{}

RepeatChanParamFn - Generator that repeats a function with a channel as parameter indefinitely.

func RepeatChanParamsFn

func RepeatChanParamsFn(done, c <-chan interface{}, fn func(params ...chan interface{}) interface{}, params ...chan interface{}) chan interface{}

RepeatChanParamsFn - Generator that repeats a function with a list of channels as parameters indefinitely.

func RepeatFn

func RepeatFn(done <-chan interface{}, fn func() interface{}) chan interface{}

RepeatFn - Generator that repeats a function indefinitely.

func RepeatParamFn

func RepeatParamFn(done <-chan interface{}, fn func(param interface{}) interface{}, param interface{}) chan interface{}

RepeatParamFn - Generator that repeats a function with one parameter indefinitely.

func RepeatParamsFn

func RepeatParamsFn(done, c <-chan interface{}, fn func(params ...interface{}) interface{}, params ...interface{}) chan interface{}

RepeatParamsFn - Generator that repeats a function with a list of parameters indefinitely.

func Route

func Route(done <-chan interface{}, in <-chan interface{}, size int) (channels []chan interface{})

Route - Representation of the tee pattern. Takes a single input channel and an arbitrary number of output channels and duplicates each input into every output. When the input channel is closed, all outputs channels are closed. It allows to route or split an input into multiple outputs.

func Take

func Take(done <-chan interface{}, valueStream <-chan interface{}, num int) <-chan interface{}

Take - Takes a defined number of values by num from a channel

func ToString

func ToString(done <-chan interface{}, valueStream <-chan interface{}) <-chan string

ToString - Converts any type of channel into a string channel

Types

type BCaster

type BCaster struct {
	MsgType string
	// contains filtered or unexported fields
}

BCaster - Is a broadcaster that allows to send events of type concurrency.Event to registered listeners using go concurrency patterns. Listeners are chan interfaces{} allowing for go concurrent communication. Closure of BCaster is handle by a concurrency.DoneHandler that allows to control they way a set of go routines are closed in order to prevent deadlocks and unwanted behaviour It detects when listeners are done and performs the required cleanup to ensure that events are sent to the active listeners.

func NewBCaster

func NewBCaster(dh *DoneHandler, MsgType string, opts ...BCasterOption) *BCaster

NewBCaster - Constructor

func (*BCaster) AddListener

func (b *BCaster) AddListener(dh *DoneHandler) chan interface{}

AddListener - creates a listener as chan interface{} with a DoneHandler in order to manage its closure and pass it to the requestor so it can be used in order to consume events from the Bcaster

func (*BCaster) Broadcast

func (b *BCaster) Broadcast(msg interface{})

Broadcast - Transforms a message into a concurrency.Event and broadcasts it to all the active registered listeners

func (*BCaster) ID

func (b *BCaster) ID() string

ID - retrieves the Id of the Bcaster

func (*BCaster) RemoveListener

func (b *BCaster) RemoveListener(listenerCh chan interface{})

RemoveListener - removes a listener

func (*BCaster) RemoveListenerByKey

func (b *BCaster) RemoveListenerByKey(key interface{})

RemoveListenerByKey - Removes a listener by its key value

type BCasterOption

type BCasterOption func(*BCaster)

BCasterOption - option to initialize the bcaster

func BCasterTransformFn

func BCasterTransformFn(fn func(b *BCaster, input interface{}) interface{}) BCasterOption

BCasterTransformFn - option to add a function to transform the output into the desired output structure to the BCaster

type DoneHandler

type DoneHandler struct {
	// contains filtered or unexported fields
}

DoneHandler - Handles when an object is done and it is ready to be closed.

func NewDoneHandler

func NewDoneHandler(opts ...DoneHandlerOption) *DoneHandler

NewDoneHandler - Constructor

func (*DoneHandler) Deadline

func (dh *DoneHandler) Deadline() *time.Time

Deadline - retrieves the Deadline of the DoneHandler

func (*DoneHandler) Done

func (dh *DoneHandler) Done() chan interface{}

Done - retrieves the Done channel of the DoneHandler

func (*DoneHandler) Err

func (dh *DoneHandler) Err() error

Err - retrieves the Error of the DoneHandler

func (*DoneHandler) GetDoneFunc

func (dh *DoneHandler) GetDoneFunc() func()

GetDoneFunc - retrieves the GetDone Function of the DoneHandler

func (*DoneHandler) ID

func (dh *DoneHandler) ID() string

ID - retrieves the Id of the DoneHandler

type DoneHandlerOption

type DoneHandlerOption func(*DoneHandler)

DoneHandlerOption - option to initialize the DoneHandler

func DoneHandlerWithDeadline

func DoneHandlerWithDeadline(deadline time.Time) DoneHandlerOption

DoneHandlerWithDeadline - option to add a dealine value to the DoneHandler

func DoneHandlerWithTimeout

func DoneHandlerWithTimeout(timeout time.Duration) DoneHandlerOption

DoneHandlerWithTimeout - option to add a timeout value to the DoneHandler. It sets a deadline with the value time.Now + timeout

type DoneManager

type DoneManager struct {
	// contains filtered or unexported fields
}

DoneManager - It manages a set of DoneHandlers with a layered approach, allowing to structure the way components are closed. It has an ID, a Done channel, a SortedMap of layers which key is the number of the layer and the item is represented with a SortedMap of DoneHandlers which item key is the DoneHandler.ID. It also have a donefn, a deadline property to setup a deadline to the entire set that the DoneManager is handling, a delay to space the closure of layers within the DoneManager, an err and a lock to ensure that the operations are threadsafe.

func NewDoneManager

func NewDoneManager(opts ...DoneManagerOption) *DoneManager

NewDoneManager - Constructor

func (*DoneManager) AddDoneHandler

func (dm *DoneManager) AddDoneHandler(dh *DoneHandler, layer int)

AddDoneHandler - adds a DoneHandler to the sorted map of the specified layer

func (*DoneManager) AddNewDoneHandler

func (dm *DoneManager) AddNewDoneHandler(layer int, opts ...DoneHandlerOption) *DoneHandler

AddNewDoneHandler - Creates a new DoneHandler with the relevant DoneHandlerOptions and adds it to the sorted map of the specified layer

func (*DoneManager) Deadline

func (dm *DoneManager) Deadline() *time.Time

Deadline - retrieves the Deadline of the DoneManager

func (*DoneManager) Done

func (dm *DoneManager) Done() chan interface{}

Done - retrieves the Done channel of the DoneManager

func (*DoneManager) Err

func (dm *DoneManager) Err() error

Err - retrieves the Error of the DoneManager

func (*DoneManager) GetDoneFunc

func (dm *DoneManager) GetDoneFunc() func()

GetDoneFunc - retrieves the GetDone Function of the DoneManager

func (*DoneManager) GetDoneHandler

func (dm *DoneManager) GetDoneHandler(opts ...QueryDoneHandlerOption) (*DoneHandler, int, bool)

GetDoneHandler - Retrieves a DoneHandler that meets the query defined within the QueryDoneHandlerOptions

func (*DoneManager) ID

func (dm *DoneManager) ID() string

ID - retrieves the Id of the DoneManager

func (*DoneManager) RemoveDoneHandler

func (dm *DoneManager) RemoveDoneHandler(opts ...QueryDoneHandlerOption) bool

RemoveDoneHandler - Removes a DoneHandler that meets the query defined within the QueryDoneHandlerOptions. If item is Not found it returns false

type DoneManagerOption

type DoneManagerOption func(*DoneManager)

DoneManagerOption - option to initialize the DoneManager

func DoneManagerWithDeadline

func DoneManagerWithDeadline(deadline time.Time) DoneManagerOption

DoneManagerWithDeadline - option to add a dealine value to the DoneManager

func DoneManagerWithDelay

func DoneManagerWithDelay(delay time.Duration) DoneManagerOption

DoneManagerWithDelay - option to add a delay value to the DoneManager in order to space in time the closure the different layers of the DoneHandler

func DoneManagerWithTimeout

func DoneManagerWithTimeout(timeout time.Duration) DoneManagerOption

DoneManagerWithTimeout - option to add a timeout value to the DoneManager. It sets a deadline with the value time.Now + timeout

type Event

type Event struct {
	InitMessage       *Message
	InMessageSequence Slice //Slice of Messages - we can keep track of the entire flow
	OutMessage        *Message
	Sequence          interface{}
}

Event - Struct that represents an event in the context of the concurrency package. Contains an InitMessage, An InMessageSequence to give traceability of the Event, an OutMessage, and a sequence number that is usefull to define the processing order of the event

type EventMap

type EventMap struct {
	InitMessage *Message
	Events      *Map
}

EventMap - A concurrency.Map of events with a common InitMessage

type EventSortedMap

type EventSortedMap struct {
	InitMessage *Message
	Events      *SortedMap
}

EventSortedMap - A concurrency.SortedMap of events with a common InitMessage

type Filter

type Filter struct {
	Name string
	// contains filtered or unexported fields
}

Filter - Unit that listen to an input channel (inputChan) and filter work. Closing the inputChan channel needs to be managed outside the Filter using a DoneHandler It has a DoneHandler to manage the lifecycle of the filter, a sequence to determine the order in which the filter might be run, an id of the filter, the name of the filter, the state of the filter and an output channel that emits the processed results for consumption.

func NewFilter

func NewFilter(name string, dh *DoneHandler, opts ...FilterOption) *Filter

NewFilter - Constructor

func (*Filter) Filter

func (f *Filter) Filter(fn func(input interface{}, params ...interface{}) bool, params ...interface{})

Filter - When the filter is in Processing state filters a defined function. When the Filter is in stop state the filter will still consume messages from the input channel and it will output the input event as no filter will be involved.

func (*Filter) GetState

func (f *Filter) GetState() State

GetState - retrieves the state of the Filter

func (*Filter) HasValidInputChan

func (f *Filter) HasValidInputChan() bool

HasValidInputChan - checks if the input channel is valid and not nil.

func (*Filter) ID

func (f *Filter) ID() string

ID - retrieves the Id of the Filter

func (*Filter) InputChannel

func (f *Filter) InputChannel() chan interface{}

InputChannel - retrieves the InputChannel of the Filter

func (*Filter) OutputChannel

func (f *Filter) OutputChannel() chan interface{}

OutputChannel - retrieves the OutputChannel of the Filter

func (*Filter) Sequence

func (f *Filter) Sequence() interface{}

Sequence - retrieves the Sequence of the Filter

func (*Filter) Start

func (f *Filter) Start()

Start - starts the filter.

func (*Filter) Stop

func (f *Filter) Stop()

Stop - stops the filter.

type FilterOption

type FilterOption func(*Filter)

FilterOption - option to initialize the filter

func FilterTransformFn

func FilterTransformFn(fn func(fr *Filter, input interface{}) interface{}) FilterOption

FilterTransformFn - option to add a function to transform the output into the desired output structure to the Filter

func FilterWithInputChannel

func FilterWithInputChannel(in chan interface{}) FilterOption

FilterWithInputChannel - option to add an inputchannel to the filter

func FilterWithSequence

func FilterWithSequence(seq interface{}) FilterOption

FilterWithSequence - option to add a sequence value to the filter

type Map

type Map struct {
	// contains filtered or unexported fields
}

Map is a map type that can be safely shared between goroutines that require read/write access to a map

func NewMap

func NewMap() *Map

NewMap creates a new concurrent map

func (*Map) Delete

func (cm *Map) Delete(key interface{})

Delete removes the value/key pair of a concurrent map item

func (*Map) Get

func (cm *Map) Get(key interface{}) (interface{}, bool)

Get retrieves the value for a concurrent map item

func (*Map) GetKeyByItem

func (cm *Map) GetKeyByItem(item interface{}) (interface{}, bool)

GetKeyByItem - retrieves the key for a concurrent map item by item

func (*Map) Iter

func (cm *Map) Iter() <-chan MapItem

Iter iterates over the items in a concurrent map Each item is sent over a channel, so that we can iterate over the map using the builtin range keyword

func (*Map) IterWithCancel

func (cm *Map) IterWithCancel(cancel chan interface{}) <-chan MapItem

IterWithCancel iterates over the items in a concurrent map Each item is sent over a channel, so that we can iterate over the map using the builtin range keyword allows to pass a cancel chan to make the iteration cancelable

func (*Map) Len

func (cm *Map) Len() int

Len - length of the map

func (*Map) Set

func (cm *Map) Set(key, value interface{})

Set adds an item to a concurrent map

type MapItem

type MapItem struct {
	Key   interface{}
	Value interface{}
}

MapItem contains a key/value pair item of a concurrent map

type Message

type Message struct {
	ID         string
	Message    interface{}
	TimeInNano int64
	MsgType    string
}

Message - Struct that represents an message in the context of the concurrency package. Contains the ID of the message, the Message, the time that was produced and the type of the message

type MsgMultiplexer

type MsgMultiplexer struct {
	// contains filtered or unexported fields
}

MsgMultiplexer - The default implementation MsgMultiplexer allows to create complex patterns where a Broadcaster can emit an event to multiple processors (consumers) that can potentially represent multiple processing systems, do the relevant calculation and multiplex the multiple outputs into a single channel for simplified consumption. Its main function is to Mulitplex a set of parallel processors that process a common initial concurrency.Event/ Message converging them into one channel,where the output is an Event which Event.OutMessage is a sortedmap of the output values of the processors grouped by initial concurrency.Event/Message and ordered by sequence value of each processor. Closure of MsgMultiplexer is handle by a concurrency.DoneHandler that allows to control they way a set of go routines are closed in order to prevent deadlocks and unwanted behaviour MsgMultiplexer outputs the multiplexed result in one channel using the channel bridge pattern. MsgMultiplexer default behaviour can be overridden by providing a MsgMultiplexerGetChannelItemKeyFn to provide the comparison key of the items of a channel, MsgMultiplexerGetLastRegKeyFn which should give the key to compare to. With these two items MsgMultiplexer has an algorithm to group the processed messages related to the same source into a SortedMap. MsgMultiplexerGetChannelItemSequenceFn allows to get the sequence order of the relevant channel and MsgMultiplexerTransformFn allows to transform the output into the desired structure.

func NewMsgMultiplexer

func NewMsgMultiplexer(dh *DoneHandler, opts ...MsgMultiplexerOption) *MsgMultiplexer

NewMsgMultiplexer - Constructor

func (*MsgMultiplexer) Get

func (mp *MsgMultiplexer) Get(key interface{}) (chan interface{}, bool)

Get - Retrieves a channel reqistered in the MsgMultiplexer by key

func (*MsgMultiplexer) ID

func (mp *MsgMultiplexer) ID() string

ID - retrieves the Id of the MsgMultiplexer

func (*MsgMultiplexer) Iter

func (mp *MsgMultiplexer) Iter() chan interface{}

Iter iterates over the items in the Multiplexer Each item is sent over a channel, so that we can iterate over the it using the builtin range keyword

func (*MsgMultiplexer) Sequence

func (mp *MsgMultiplexer) Sequence() interface{}

Sequence - retrieves the Sequence of the MsgMultiplexer

func (*MsgMultiplexer) Set

func (mp *MsgMultiplexer) Set(key interface{}, value chan interface{})

Set - Registers a channel in the MsgMultiplexer, starts processing it and logs the length of the registered channels map.

func (*MsgMultiplexer) Start

func (mp *MsgMultiplexer) Start()

Start - starts the main process of the MsgMultiplexer

type MsgMultiplexerOption

type MsgMultiplexerOption func(*MsgMultiplexer)

MsgMultiplexerOption - option to initialize the MsgMultiplexer

func MsgMultiplexerGetChannelItemKeyFn

func MsgMultiplexerGetChannelItemKeyFn(fn func(v interface{}) int64) MsgMultiplexerOption

MsgMultiplexerGetChannelItemKeyFn - option to add a function to resolve the key value of an item of the channel to the MsgMultiplexer

func MsgMultiplexerGetChannelItemSequenceFn

func MsgMultiplexerGetChannelItemSequenceFn(fn func(v interface{}) interface{}) MsgMultiplexerOption

MsgMultiplexerGetChannelItemSequenceFn - option to add a function to resolve the sequence value of an item of the channel to the MsgMultiplexer

func MsgMultiplexerGetLastRegKeyFn

func MsgMultiplexerGetLastRegKeyFn(fn func() int64) MsgMultiplexerOption

MsgMultiplexerGetLastRegKeyFn - option to add a function to resolve the last registered key value for later comparison with the key of an item of the channel to the MsgMultiplexer

func MsgMultiplexerSequence

func MsgMultiplexerSequence(seq interface{}) MsgMultiplexerOption

MsgMultiplexerSequence - option to add a sequence value to the MsgMultiplexer

func MsgMultiplexerTransformFn

func MsgMultiplexerTransformFn(fn func(mp *MsgMultiplexer, sm *SortedMap) interface{}) MsgMultiplexerOption

MsgMultiplexerTransformFn - option to add a function to transform the SortedMap output into the desired output structure to the MsgMultiplexer

type MultiMsgMultiplexer

type MultiMsgMultiplexer struct {
	BufferSize int
	MsgType    string
	// contains filtered or unexported fields
}

MultiMsgMultiplexer - The default implementation MultiMsgMultiplexer allows to create complex patterns where multiple Broadcasters can emit an event to multiple processors (consumers) that can potentially represent multiple processing systems, do the relevant calculation and multiplex the multiple outputs into a single channel for simplified consumption. Its main function is to Mulitplex a set of multiple messages that can be parallel processed and converge the set of initial concurrency.Event/ Message into a SortedMap ordered by messageType that can be sent on one channel. values of the processors grouped by initial concurrency.Event/Message and ordered by sequence value of each processor. Closure of MultiMsgMultiplexer is handle by a concurrency.DoneHandler that allows to control they way a set of go routines are closed in order to prevent deadlocks and unwanted behaviour MultiMsgMultiplexer outputs the multiplexed result in one channel using the channel bridge pattern. MultiMsgMultiplexer has several modes, the first one is to output the structure everytime a BCaster emits a message, giving an output of the last received message per BCaster. The second one is by using a timer to specify the sendPeriod, where the output represents the state of the last received messages at the specific point of time of the tick of the period. MultiMsgMultiplexer has also a waitForAll property that when true will just start emiting an output when the MultiMsgMultiplexer has at least received one message of each of the BCasters. MultiMsgMultiplexer has also a BufferSize property (default value is 1) where we can send the n number of last messages sent by each BCaster. MultiMsgMultiplexer default behaviour can be overridden by providing a MultiMsgMultiplexerItemKeyFn to the key of the items of a channel within the output SortedMap for a specific MessageType and MultiMsgMultiplexerTransformFn allows to transform the output into the desired structure.

func NewMultiMsgMultiplexer

func NewMultiMsgMultiplexer(dh *DoneHandler, msgType string, opts ...MultiMsgMultiplexerOption) *MultiMsgMultiplexer

NewMultiMsgMultiplexer - Constructor

func (*MultiMsgMultiplexer) AddItemToMap

func (mp *MultiMsgMultiplexer) AddItemToMap(v interface{}, m *SortedMap)

AddItemToMap - Adds an item to the SortedMap that is going to be send as part of the output, the SortedMap length is defined by the BufferSize property, allowing to retrieve the last n messages for a specific Messagetype.

func (*MultiMsgMultiplexer) Get

func (mp *MultiMsgMultiplexer) Get(key interface{}) (chan interface{}, bool)

Get - Retrieves a channel reqistered in the MultiMsgMultiplexer by key

func (*MultiMsgMultiplexer) ID

func (mp *MultiMsgMultiplexer) ID() string

ID - retrieves the Id of the MultiMsgMultiplexer

func (*MultiMsgMultiplexer) Iter

func (mp *MultiMsgMultiplexer) Iter() chan interface{}

Iter iterates over the items in the MultiMsgMultiplexer Each item is sent over a channel, so that we can iterate over the it using the builtin range keyword

func (*MultiMsgMultiplexer) Sequence

func (mp *MultiMsgMultiplexer) Sequence() interface{}

Sequence - retrieves the Sequence of the MultiMsgMultiplexer

func (*MultiMsgMultiplexer) Set

func (mp *MultiMsgMultiplexer) Set(key interface{}, value chan interface{})

Set - Registers a channel in the MultiMsgMultiplexer and starts processing it

func (*MultiMsgMultiplexer) Start

func (mp *MultiMsgMultiplexer) Start()

Start - starts the main process of the MultiMsgMultiplexer

type MultiMsgMultiplexerOption

type MultiMsgMultiplexerOption func(*MultiMsgMultiplexer)

MultiMsgMultiplexerOption - option to initialize the MultiMsgMultiplexer

func MultiMsgMultiplexerBufferSize

func MultiMsgMultiplexerBufferSize(bufferSize int) MultiMsgMultiplexerOption

MultiMsgMultiplexerBufferSize - option to add a buffersize value to the MultiMsgMultiplexer

func MultiMsgMultiplexerItemKeyFn

func MultiMsgMultiplexerItemKeyFn(fn func(v interface{}) int64) MultiMsgMultiplexerOption

MultiMsgMultiplexerItemKeyFn - option to add a function to resolve the set key value of an item of the channel to the map of the MultiMsgMultiplexer specific to a message

func MultiMsgMultiplexerSendPeriod

func MultiMsgMultiplexerSendPeriod(d *time.Duration) MultiMsgMultiplexerOption

MultiMsgMultiplexerSendPeriod - option to add a send period value to the MultiMsgMultiplexer

func MultiMsgMultiplexerSequence

func MultiMsgMultiplexerSequence(seq interface{}) MultiMsgMultiplexerOption

MultiMsgMultiplexerSequence - option to add a sequence value to the MultiMsgMultiplexer

func MultiMsgMultiplexerTransformFn

func MultiMsgMultiplexerTransformFn(fn func(mp *MultiMsgMultiplexer, sm *SortedMap) interface{}) MultiMsgMultiplexerOption

MultiMsgMultiplexerTransformFn - option to add a function to transform the SortedMap output into the desired output structure to the MultiMsgMultiplexer

func MultiMsgMultiplexerWaitForAll

func MultiMsgMultiplexerWaitForAll(waitforall bool) MultiMsgMultiplexerOption

MultiMsgMultiplexerWaitForAll - option to add a waitforall value to the MultiMsgMultiplexer

type MultiMsgResultItem

type MultiMsgResultItem struct {
	// contains filtered or unexported fields
}

MultiMsgResultItem - The result item to be stored in the output SortedMap

type Processor

type Processor struct {
	Name string
	// contains filtered or unexported fields
}

Processor - Unit that listen to an input channel (inputChan) and process work. Closing the inputChan channel needs to be managed outside the Processor using a DoneHandler It has a DoneHandler to manage the lifecycle of the processor, a sequence to determine the order in which the processor output results might be stored in a multiplexed pattern, an id of the processor, the name of the processor, the state of the processor and an output channel that emits the processed results for consumption.

func NewProcessor

func NewProcessor(name string, dh *DoneHandler, opts ...ProcessorOption) *Processor

NewProcessor - Constructor

func (*Processor) GetState

func (p *Processor) GetState() State

GetState - retrieves the state of the Processor

func (*Processor) HasValidInputChan

func (p *Processor) HasValidInputChan() bool

HasValidInputChan - checks if the input channel is valid and not nil.

func (*Processor) ID

func (p *Processor) ID() string

ID - retrieves the Id of the Processor

func (*Processor) InputChannel

func (p *Processor) InputChannel() chan interface{}

InputChannel - retrieves the InputChannel of the Processor

func (*Processor) OutputChannel

func (p *Processor) OutputChannel() chan interface{}

OutputChannel - retrieves the OutputChannel of the Processor

func (*Processor) Process

func (p *Processor) Process(f func(input interface{}, params ...interface{}) interface{}, params ...interface{})

Process - When the processor is in Processing state processes a defined function. When the Processor is in stop state the processor will still consume messages from the input channel but it will produce a nil output as no process will be involved.

func (*Processor) Sequence

func (p *Processor) Sequence() interface{}

Sequence - retrieves the Sequence of the Processor

func (*Processor) Start

func (p *Processor) Start()

Start - starts the processor.

func (*Processor) Stop

func (p *Processor) Stop()

Stop - stops the processor.

type ProcessorOption

type ProcessorOption func(*Processor)

ProcessorOption - option to initialize the processor

func ProcessorTransformFn

func ProcessorTransformFn(fn func(pr *Processor, input interface{}, result interface{}) interface{}) ProcessorOption

ProcessorTransformFn - option to add a function to transform the output into the desired output structure to the Processor

func ProcessorWithInputChannel

func ProcessorWithInputChannel(in chan interface{}) ProcessorOption

ProcessorWithInputChannel - option to add an inputchannel to the processor

func ProcessorWithSequence

func ProcessorWithSequence(seq interface{}) ProcessorOption

ProcessorWithSequence - option to add a sequence value to the processor

type QueryDoneHandler

type QueryDoneHandler struct {
	// contains filtered or unexported fields
}

QueryDoneHandler - Struct with a key and a layer that is used to query the DoneManager by specifying the key of a DoneHandler

type QueryDoneHandlerOption

type QueryDoneHandlerOption func(*QueryDoneHandler)

QueryDoneHandlerOption - option to initialize the QueryDoneHandler

func QueryDoneHandlerWithKey

func QueryDoneHandlerWithKey(key interface{}) QueryDoneHandlerOption

QueryDoneHandlerWithKey - option to add a key value to the QueryDoneHandler.

func QueryDoneHandlerWithLayer

func QueryDoneHandlerWithLayer(layer int) QueryDoneHandlerOption

QueryDoneHandlerWithLayer - option to add a layer value to the QueryDoneHandler.

type Slice

type Slice struct {
	// contains filtered or unexported fields
}

Slice type that can be safely shared between goroutines

func NewSlice

func NewSlice() *Slice

NewSlice creates a new concurrent slice

func (*Slice) Append

func (cs *Slice) Append(item interface{})

Append adds an item to the concurrent slice

func (*Slice) Cap

func (cs *Slice) Cap() int

Cap - capacity of the slice

func (*Slice) GetItemAtIndex

func (cs *Slice) GetItemAtIndex(index int) interface{}

GetItemAtIndex - Get item at index

func (*Slice) IndexOf

func (cs *Slice) IndexOf(item interface{}) int

IndexOf returns the index of a specific item

func (*Slice) Iter

func (cs *Slice) Iter() <-chan SliceItem

Iter iterates over the items in the concurrent slice Each item is sent over a channel, so that we can iterate over the slice using the builtin range keyword

func (*Slice) IterWithCancel

func (cs *Slice) IterWithCancel(cancel chan interface{}) <-chan SliceItem

IterWithCancel iterates over the items in the concurrent slice Each item is sent over a channel, so that we can iterate over the slice using the builtin range keyword allows to pass a cancel chan to make the iteration cancelable

func (*Slice) Len

func (cs *Slice) Len() int

Len - length of the slice

func (*Slice) RemoveItemAtIndex

func (cs *Slice) RemoveItemAtIndex(index int)

RemoveItemAtIndex removes the item at the specified index

type SliceItem

type SliceItem struct {
	Index int
	Value interface{}
}

SliceItem contains the index/value pair of an item in a concurrent slice

type SortedMap

type SortedMap struct {
	// contains filtered or unexported fields
}

SortedMap is a sorted map type that can be safely shared between goroutines that require read/write access to a map

func NewSortedMap

func NewSortedMap() *SortedMap

NewSortedMap creates a new concurrent sorted map

func (*SortedMap) Delete

func (sm *SortedMap) Delete(key interface{})

Delete removes the value/key pair of a concurrent sorted map item

func (*SortedMap) Get

func (sm *SortedMap) Get(key interface{}) (interface{}, bool)

Get retrieves the value for a concurrent map item

func (*SortedMap) GetByIndex

func (sm *SortedMap) GetByIndex(index int) (interface{}, bool)

GetByIndex retrieves the value for a concurrent map item given the index

func (*SortedMap) GetKeyByIndex

func (sm *SortedMap) GetKeyByIndex(index int) (interface{}, bool)

GetKeyByIndex retrieves the key for a concurrent map item given the index

func (*SortedMap) GetKeyByItem

func (sm *SortedMap) GetKeyByItem(item interface{}) (interface{}, bool)

GetKeyByItem retrieves the key for a concurrent map item

func (*SortedMap) GetMapItemByIndex

func (sm *SortedMap) GetMapItemByIndex(index int) (*SortedMapItem, bool)

GetMapItemByIndex retrieves the SortedMapItem for a concurrent map item given the index

func (*SortedMap) Iter

func (sm *SortedMap) Iter() <-chan SortedMapItem

Iter iterates over the items in a concurrent map Each item is sent over a channel, so that we can iterate over the map using the builtin range keyword

func (*SortedMap) IterWithCancel

func (sm *SortedMap) IterWithCancel(cancel chan interface{}) <-chan SortedMapItem

IterWithCancel iterates over the items in a concurrent map Each item is sent over a channel, so that we can iterate over the map using the builtin range keyword allows to pass a cancel chan to make the iteration cancelable

func (*SortedMap) Len

func (sm *SortedMap) Len() int

Len - length of the sortedmap

func (*SortedMap) Set

func (sm *SortedMap) Set(key, value interface{})

Set adds an item to a concurrent map

type SortedMapItem

type SortedMapItem struct {
	Key   interface{}
	Value interface{}
}

SortedMapItem contains a key/value pair item of a concurrent map

type SortedSlice

type SortedSlice struct {
	// contains filtered or unexported fields
}

SortedSlice type that can be safely shared between goroutines

func NewSortedSlice

func NewSortedSlice() *SortedSlice

NewSortedSlice creates a new concurrent slice

func (*SortedSlice) Append

func (cs *SortedSlice) Append(item interface{})

Append adds an item to the concurrent slice

func (*SortedSlice) Cap

func (cs *SortedSlice) Cap() int

Cap - capacity of the slice

func (*SortedSlice) GetItemAtIndex

func (cs *SortedSlice) GetItemAtIndex(index int) interface{}

GetItemAtIndex - Get item at index

func (*SortedSlice) IndexOf

func (cs *SortedSlice) IndexOf(item interface{}) int

IndexOf returns the index of a specific item

func (*SortedSlice) Iter

func (cs *SortedSlice) Iter() <-chan SortedSliceItem

Iter iterates over the items in the concurrent slice Each item is sent over a channel, so that we can iterate over the slice using the builtin range keyword

func (*SortedSlice) IterWithCancel

func (cs *SortedSlice) IterWithCancel(cancel chan interface{}) <-chan SortedSliceItem

IterWithCancel iterates over the items in the concurrent slice Each item is sent over a channel, so that we can iterate over the slice using the builtin range keyword allows to pass a cancel chan to make the iteration cancelable

func (*SortedSlice) Len

func (cs *SortedSlice) Len() int

Len - length of the slice

func (*SortedSlice) RemoveItemAtIndex

func (cs *SortedSlice) RemoveItemAtIndex(index int)

RemoveItemAtIndex removes the item at the specified index

type SortedSliceItem

type SortedSliceItem struct {
	Index int
	Value interface{}
}

SortedSliceItem contains the index/value pair of an item in a concurrent slice

type State

type State int

State establised the state of the relevant object

const (
	Init       State = iota //Init state
	Processing              // Processing state
	Stopped                 // Stopped state
	Closed                  // Closed state
)

State enum values

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL