conch

package module
v1.2.0-rc1 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: May 9, 2023 License: GPL-3.0 Imports: 10 Imported by: 0

README

conch

Golang library to implement various stream based concurency patterns

Documentation

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

func Breaker

func Breaker[P any, R any](
	ctx context.Context,
	inStream <-chan Request[P, R],
	nbFailureToOpen int,
	nbSuccessToClose int,
	halfOpenTimeout time.Duration,
	breakerError error,
) <-chan Request[P, R]

func Buffer

func Buffer[T any](
	ctx context.Context,
	inStream <-chan T,
	size int,
) <-chan T

Buffer chains a buffered channel to inStream. This may help inStreams writer to be less frequently in a blocking state.

Keep in mind that as soon as buffer is full it behave exactly as if it wasn't buffered at all.

func Chain

func Chain[T any](
	ctx context.Context,
	inStream ...<-chan T,
) <-chan T

Chain performs streams concatenation. It generates a new stream that will produce all the elements from the streams consumed in the order they are provided.

For example if

s1 provides elements s1e1, s1e2, s1e3 then close
s2 provides elements s2e1, s2e2 then close

the resulting stream will produce elements

s1e1, s1e2, s1e3, s2e1, s2e2 then close

pay attention all input streams must be closed and in the given example s2 elements will be streamed to output only if s1 is closed.

The returned stream will be closed when all input streams are closed or context is canceled.

func Combinations

func Combinations[T any](
	ctx context.Context,
	elements []T,
	r int,
) <-chan []T

Combinations produces all combinations of elements from the given slice.

func CombinationsIndexes

func CombinationsIndexes(
	ctx context.Context,
	n, r int,
) <-chan []int

CombinationsIndexes produces all combinations of indexes.

func CombinationsIndexesWithReplacement

func CombinationsIndexesWithReplacement(
	ctx context.Context,
	n, r uint8,
) <-chan []uint8

CombinationsIndexesWithReplacement produces all combinations of indexes with replacement.

func CombinationsWithReplacement

func CombinationsWithReplacement[T any](
	ctx context.Context,
	elements []T, r uint8,
) <-chan []T

CombinationsWithReplacement produces all combinations of elements with replacement.

func Consume

func Consume[T any](
	ctx context.Context,
	inStream <-chan T,
	consumer func(ctx context.Context, v T),
)

func ContextBreaker

func ContextBreaker[T any](
	ctx context.Context,
	inStream <-chan T,
) <-chan T

ContextBreaker creates an output stream that copy input stream and that closeMe when the context is done or input stream is closed.

This is useful to cut properly stream flows, especially when down stream
enter some operators that are no longer sensitive to context termination.

func Count

func Count[T any](
	inStream <-chan T,
	ctx context.Context,
) (count <-chan int)

func Counting

func Counting[T any, R Integer](
	ctx context.Context,
	inStream <-chan T,
) <-chan R

func Delay

func Delay[T any](
	ctx context.Context,
	durationGenerator DurationGenerator,
	inStream <-chan T,
) <-chan T

Delay an output stream that replicate inputs stream elements with a delay constant. Provided duration generator defines the minimal time interval between each element emission.

Please note it can be used as a time bases event generator when pressure is low. Otherwise, it can be used to create pressure simulate processing for example.

func Distribute

func Distribute[T any](
	ctx context.Context,
	inStream ...<-chan T,
) <-chan T

Distribute generates a stream by sequentially picking elements from the input streams.

Input streams can provide different number of elements. Input streams MUST differ (cannot be te same stream).

Output streams is closed when all input streams have been exhausted and closed or the context is canceled.

func FairFanIn

func FairFanIn[T any](
	ctx context.Context,
	inStream ...<-chan T,
) <-chan T

FairFanIn merges multiple input streams into a single output stream with unbalanced priority.

Priority is defined by the input stream rank, from lowest to highest.
Function will panic if no input stream are provided.

The more back pressure is present on output stream (i.e. slow consumption)
the more unfair priority effect will occur.

If two input stream are provided, they will be merged into one output stream
with exact same priority.

If more are provided, stream in third position will have a twice the
throughput of the stream in second position.

throughput(inputSteam#n) = 2 * throughput(inputSteam#(n-1))

If less back pressure is present, the more priority is balanced.

Output stream is closed when all input stream are closed or context is done.

func FanIn

func FanIn[T any](
	ctx context.Context,
	inStreams ...<-chan T,
) <-chan T

FanIn multiplexes input streams into a single output stream with balanced priority.

Output stream is closed when the context is done or all input streams are
closed.

func Filter

func Filter[T any](
	ctx context.Context,
	inStream <-chan T,
	filter func(ctx context.Context, v T) bool,
) <-chan T

func Keep

func Keep[T any](
	ctx context.Context,
	inStream <-chan T,
	count int,
) <-chan T

Keep generates a stream by keeping the first count items from the input stream.

Output stream is closed when the input stream is closed or context ctx is canceled.

func MirrorHighThroughput

func MirrorHighThroughput[T any](
	ctx context.Context,
	inStream <-chan T,
	replicaCount int,
) []<-chan T

MirrorHighThroughput replicate input stream to multiple output streams with maximum throughput.

Because its design requires fewer copy steps in the linear chain of flows
this version is faster. But it induces a greater latency disparity between
the input stream and the outputs. Latency is proportional to the rank of the
output stream.

See MirrorLowLatency if you need the lowest latency between input stream and
outputs.

func MirrorLowLatency

func MirrorLowLatency[T any](
	ctx context.Context,
	inStream <-chan T,
	replicaCount int,
) []<-chan T

MirrorLowLatency replicate input stream to multiple output streams with minimum latency.

Because its design uses a binary tree to separate the input stream from the output streams, this version offers a homogeneous latency. But it induces more copying steps and therefore a lower throughput.

See MirrorHighThroughput if you need the highest rate between input stream
and outputs.

func PermutationsIndexes

func PermutationsIndexes(
	ctx context.Context,
	n int,
) <-chan []int

PermutationsIndexes provides all permutations indexes.

func PermutationsOf

func PermutationsOf[T any](
	ctx context.Context,
	elements []T,
) <-chan []T

PermutationsOf provides all permutations of a list of elements.

func Pressurize

func Pressurize[T any](
	ctx context.Context,
	inStream <-chan T,
	delay time.Duration,
) <-chan T

Pressurize creates some initial pressure by creating a delay before to open the stream and copy it to output

func ProcessorPool

func ProcessorPool[From, To any](
	ctx context.Context,
	concurrency int,
	processorFunc Processor[From, To],
	inStream <-chan From,
) (outputStream <-chan To)

ProcessorPool launch the given processor concurrently and multiplexes the outputs in a single output stream.

func RateLimit

func RateLimit[T any](
	ctx context.Context,
	inStream <-chan T,
	ratePerSecond int,
	option ...ratelimit.Option,
) <-chan T

func Reorder

func Reorder[Value Ordered, Payload any](
	ctx context.Context,
	inStream <-chan Indexed[Value, Payload],
	option ...ReorderOption,
) <-chan Indexed[Value, Payload]

Reorder bufferize indexed input stream.

func RequestProcessor

func RequestProcessor[P any, R any](
	ctx context.Context,
	wg *sync.WaitGroup,
	inStream <-chan Request[P, R],
	processing func(context.Context, P) (R, error),
	id string,
)

func RequestProcessorPool

func RequestProcessorPool[P any, R any](
	ctx context.Context,
	wg *sync.WaitGroup,
	inStream <-chan Request[P, R],
	processing func(context.Context, P) (R, error),
	count int,
	id string,
)

func Requester

func Requester[P, R any](ctx context.Context) (
	func(
		context.Context,
		P,
	) (
		R,
		error,
	),
	<-chan Request[P, R],
)

func RequesterC

func RequesterC[P any, R any](
	ctx context.Context,
	wg *sync.WaitGroup,
	chain ChainFunc[Request[P, R]],
) func(context.Context, P) (R, error)

func Separate

func Separate[V any](
	ctx context.Context,
	inSTream <-chan ValErrorPair[V],
) (<-chan V, <-chan error)

func SetIndex

func SetIndex[T any, C Integer](
	ctx context.Context,
	inStream <-chan T,
) <-chan Indexed[C, T]

func Shuffle

func Shuffle[T any](
	ctx context.Context,
	rnd *rand.Rand,
	inStream ...<-chan T,
) <-chan T

Shuffle generates a stream by randomly picking elements from the input streams.

Input streams can provide different number of elements. Input streams MUST differ (same stream cannot be present twice).

Output streams is closed when all input streams have been exhausted and closed or the context is canceled.

func ShuffleOrder

func ShuffleOrder[T any](
	ctx context.Context,
	maxBufferSize int,
	rnd *rand.Rand,
	inStream <-chan T,
) <-chan T

func Sieve

func Sieve[T any](
	ctx context.Context,
	choice IndexGetter[T],
	count int,
	inStream <-chan T,
) []<-chan T

func SieveTree

func SieveTree[T any](
	ctx context.Context,
	choice IndexGetter[T],
	count int,
	inStream <-chan T,
) []<-chan T

func Skip

func Skip[T any](
	ctx context.Context,
	inStream <-chan T,
	count int,
) <-chan T

Skip generates a stream by skipping the first count items from the input stream.

Output stream is closed when the input stream is closed or context ctx is canceled.

func SpawnRequestProcessor

func SpawnRequestProcessor[P any, R any](
	ctx context.Context,
	inStream <-chan Request[P, R],
	processing func(context.Context, P) (R, error),
	id string,
) func()

func SpawnRequestProcessorsPool

func SpawnRequestProcessorsPool[P any, R any](
	ctx context.Context, inStream <-chan Request[P, R],
	processing func(context.Context, P) (R, error), count int, id string,
) func()

func Spread

func Spread[V Integer, Payload any](
	ctx context.Context,
	inputStream <-chan IndexedInteger[V, Payload],
	count int,
) []<-chan Payload

func ToList

func ToList[T any](
	ctx context.Context,
	inStream <-chan T,
	expectedSize int,
) <-chan []T

func ToMap

func ToMap[In any, K comparable, V any](
	ctx context.Context,
	inStream <-chan In,
	mapper func(ctx context.Context, v In) (K, V),
	expectedCount int,
) <-chan map[K]V

func Transform

func Transform[In any, Out any](
	ctx context.Context,
	inStream <-chan In,
	transformer func(ctx context.Context, in In) Out,
) <-chan Out

func UnfairFanIn

func UnfairFanIn[T any](
	ctx context.Context,
	inStream ...<-chan T,
) <-chan T

UnfairFanIn merges multiple input streams into a single output stream with unfair priority balance.

Priority is defined by the input stream rank, from lowest to highest. Function will panic if no input stream are provided.

The more back pressure is present on output stream (i.e. slow consumption) the more priority effect will occur.

* inputSteam#n is got less priority than inputSteam#(n+1)

If no back pressure is present, the more priority looks balanced.

Output stream is closed when all input stream are closed or context is done.

func UnsetIndex

func UnsetIndex[T any, C Integer](
	ctx context.Context,
	inStream <-chan Indexed[C, T],
) <-chan T

func Valve

func Valve[T any](
	ctx context.Context,
	inStream <-chan T,
	isOpen bool,
) (
	openIt func(),
	closeIt func(),
	outStream <-chan T,
)

Valve builds an opened valve between two streams.

func Zip

func Zip[A, B any](
	ctx context.Context,
	inStreamA <-chan A,
	inStreamB <-chan B,
) <-chan Pair[A, B]

Types

type ChainFunc

type ChainFunc[T any] func(
	ctx context.Context,
	group *sync.WaitGroup,
	inStream <-chan T,
)

func Consumer

func Consumer[T any](
	f func(context.Context, T),
) ChainFunc[T]

func CountingC

func CountingC[T any, R Integer](
	chain ChainFunc[R],
) ChainFunc[T]

func KeepC

func KeepC[T any](
	count int,
	chain ChainFunc[T],
) ChainFunc[T]

func MirrorHighThroughputC

func MirrorHighThroughputC[T any](
	chain ...ChainFunc[T],
) ChainFunc[T]

func MirrorLowLatencyC

func MirrorLowLatencyC[T any](
	chain ...ChainFunc[T],
) ChainFunc[T]

func PressurizeC

func PressurizeC[T any](
	delay time.Duration,
	chain ChainFunc[T],
) ChainFunc[T]

func ProcessorPoolC

func ProcessorPoolC[From, To any](
	concurrency int,
	processorFunc Processor[From, To],
	chain ChainFunc[To],
) ChainFunc[From]

func RateLimitC

func RateLimitC[T any](
	ratePerSecond int,
	chain ChainFunc[T],
	option ...ratelimit.Option,
) ChainFunc[T]

func ReorderC

func ReorderC[Value Ordered, Payload any](
	chain ChainFunc[Indexed[Value, Payload]],
	options ...ReorderOption,
) ChainFunc[Indexed[Value, Payload]]

func RequestProcessorPoolC

func RequestProcessorPoolC[P any, R any](
	processing func(context.Context, P) (R, error),
	count int,
	id string,
) ChainFunc[Request[P, R]]

func SetIndexC

func SetIndexC[T any, C Integer](
	chain ChainFunc[Indexed[C, T]],
) ChainFunc[T]

func SieveC

func SieveC[T any](
	choice IndexGetter[T],
	chain ...ChainFunc[T],
) ChainFunc[T]

func SieveTreeC

func SieveTreeC[T any](
	choice IndexGetter[T],
	chain ...ChainFunc[T],
) ChainFunc[T]

func SkipC

func SkipC[T any](
	count int,
	chain ChainFunc[T],
) ChainFunc[T]

func UnsetIndexC

func UnsetIndexC[T any, C Integer](
	chain ChainFunc[T],
) ChainFunc[Indexed[C, T]]

type CircuitState

type CircuitState uint32
const (
	CircuitUndefined CircuitState = iota // UNDEFINED
	CircuitOpen                          // OPEN
	CircuitClosed                        // CLOSED
	CircuitHalfOpen                      // HALF_OPEN
)

func (CircuitState) String

func (i CircuitState) String() string

type Comparable

type Comparable interface {
	LessThan(other Comparable) bool
}

type Complex

type Complex interface {
	~complex64 | ~complex128
}

Complex is a constraint that permits any complex numeric type. If future releases of Go add new predeclared complex numeric types, this constraint will be modified to include them.

type DurationGenerator

type DurationGenerator interface {
	Next() time.Duration
}

type Float

type Float interface {
	~float32 | ~float64
}

Float is a constraint that permits any floating-point type. If future releases of Go add new predeclared floating-point types, this constraint will be modified to include them.

type Gen

type Gen struct {
	// contains filtered or unexported fields
}

func (*Gen) Next

func (g *Gen) Next() time.Duration

type Generator

type Generator[T any] func(context.Context) (output <-chan T, err error)

type IndexGetter

type IndexGetter[O any] func(O) int

type Indexed

type Indexed[V Ordered, Payload any] struct {
	Index   V
	Payload Payload
}

type IndexedInteger

type IndexedInteger[V Integer, Payload any] struct {
	Index   V
	Payload Payload
}

type Integer

type Integer interface {
	Signed | Unsigned
}

Integer is a constraint that permits any integer type. If future releases of Go add new predeclared integer types, this constraint will be modified to include them.

type LeakyBucket

type LeakyBucket struct {
	// contains filtered or unexported fields
}

func NewLeakyBucket

func NewLeakyBucket(
	count int,
	interval time.Duration,
) *LeakyBucket

func (*LeakyBucket) Incr

func (b *LeakyBucket) Incr() bool

type Ordered

type Ordered interface {
	Integer | Float | ~string
}

Ordered is a constraint that permits any ordered type: any type that supports the operators < <= >= >. If future releases of Go add new ordered types, this constraint will be modified to include them.

type Pair

type Pair[A, B any] struct {
	A, B any
}

type Processor

type Processor[From, To any] func(
	ctx context.Context, input <-chan From,
) <-chan To

Processor defines a function that read from a single input stream and produce elements to the resulting output stream.

func GetProcessorFor

func GetProcessorFor[From, To any](
	f func(context.Context, From) To,
) Processor[From, To]

GetProcessorFor return an async processor form the given function f.

type ReorderOption

type ReorderOption interface {
	// contains filtered or unexported methods
}

ReorderOption defines options for Reorder.

func WithBufferSize

func WithBufferSize(d int) ReorderOption

WithBufferSize sets internal buffer size.

func WithOrderReversed

func WithOrderReversed() ReorderOption

WithOrderReversed option for reversing order.

type Request

type Request[P any, R any] struct {
	P      P
	ChResp chan<- ValErrorPair[R]
}

type Signed

type Signed interface {
	~int | ~int8 | ~int16 | ~int32 | ~int64
}

Signed is a constraint that permits any signed integer type. If future releases of Go add new predeclared signed integer types, this constraint will be modified to include them.

type Unsigned

type Unsigned interface {
	~uint | ~uint8 | ~uint16 | ~uint32 | ~uint64 | ~uintptr
}

Unsigned is a constraint that permits any unsigned integer type. If future releases of Go add new predeclared unsigned integer types, this constraint will be modified to include them.

type ValErrorPair

type ValErrorPair[V any] struct {
	V   V
	Err error
}

Directories

Path Synopsis
internal
lab

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL