Documentation ¶
Index ¶
- func Breaker[P any, R any](ctx context.Context, inStream <-chan Request[P, R], nbFailureToOpen int, ...) <-chan Request[P, R]
- func Buffer[T any](ctx context.Context, inStream <-chan T, size int) <-chan T
- func Chain[T any](ctx context.Context, inStream ...<-chan T) <-chan T
- func Combinations[T any](ctx context.Context, elements []T, r int) <-chan []T
- func CombinationsIndexes(ctx context.Context, n, r int) <-chan []int
- func CombinationsIndexesWithReplacement(ctx context.Context, n, r uint8) <-chan []uint8
- func CombinationsWithReplacement[T any](ctx context.Context, elements []T, r uint8) <-chan []T
- func Consume[T any](ctx context.Context, inStream <-chan T, ...)
- func ContextBreaker[T any](ctx context.Context, inStream <-chan T) <-chan T
- func Count[T any](inStream <-chan T, ctx context.Context) (count <-chan int)
- func Counting[T any, R Integer](ctx context.Context, inStream <-chan T) <-chan R
- func Delay[T any](ctx context.Context, durationGenerator DurationGenerator, inStream <-chan T) <-chan T
- func Distribute[T any](ctx context.Context, inStream ...<-chan T) <-chan T
- func FairFanIn[T any](ctx context.Context, inStream ...<-chan T) <-chan T
- func FanIn[T any](ctx context.Context, inStreams ...<-chan T) <-chan T
- func Filter[T any](ctx context.Context, inStream <-chan T, ...) <-chan T
- func Keep[T any](ctx context.Context, inStream <-chan T, count int) <-chan T
- func MirrorHighThroughput[T any](ctx context.Context, inStream <-chan T, replicaCount int) []<-chan T
- func MirrorLowLatency[T any](ctx context.Context, inStream <-chan T, replicaCount int) []<-chan T
- func PermutationsIndexes(ctx context.Context, n int) <-chan []int
- func PermutationsOf[T any](ctx context.Context, elements []T) <-chan []T
- func Pressurize[T any](ctx context.Context, inStream <-chan T, delay time.Duration) <-chan T
- func ProcessorPool[From, To any](ctx context.Context, concurrency int, processorFunc Processor[From, To], ...) (outputStream <-chan To)
- func RateLimit[T any](ctx context.Context, inStream <-chan T, ratePerSecond int, ...) <-chan T
- func Reorder[Value Ordered, Payload any](ctx context.Context, inStream <-chan Indexed[Value, Payload], ...) <-chan Indexed[Value, Payload]
- func RequestProcessor[P any, R any](ctx context.Context, wg *sync.WaitGroup, inStream <-chan Request[P, R], ...)
- func RequestProcessorPool[P any, R any](ctx context.Context, wg *sync.WaitGroup, inStream <-chan Request[P, R], ...)
- func Requester[P, R any](ctx context.Context) (func(context.Context, P) (R, error), <-chan Request[P, R])
- func RequesterC[P any, R any](ctx context.Context, wg *sync.WaitGroup, chain ChainFunc[Request[P, R]]) func(context.Context, P) (R, error)
- func Separate[V any](ctx context.Context, inSTream <-chan ValErrorPair[V]) (<-chan V, <-chan error)
- func SetIndex[T any, C Integer](ctx context.Context, inStream <-chan T) <-chan Indexed[C, T]
- func Shuffle[T any](ctx context.Context, rnd *rand.Rand, inStream ...<-chan T) <-chan T
- func ShuffleOrder[T any](ctx context.Context, maxBufferSize int, rnd *rand.Rand, inStream <-chan T) <-chan T
- func Sieve[T any](ctx context.Context, choice IndexGetter[T], count int, inStream <-chan T) []<-chan T
- func SieveTree[T any](ctx context.Context, choice IndexGetter[T], count int, inStream <-chan T) []<-chan T
- func Skip[T any](ctx context.Context, inStream <-chan T, count int) <-chan T
- func SpawnRequestProcessor[P any, R any](ctx context.Context, inStream <-chan Request[P, R], ...) func()
- func SpawnRequestProcessorsPool[P any, R any](ctx context.Context, inStream <-chan Request[P, R], ...) func()
- func Spread[V Integer, Payload any](ctx context.Context, inputStream <-chan IndexedInteger[V, Payload], count int) []<-chan Payload
- func ToList[T any](ctx context.Context, inStream <-chan T, expectedSize int) <-chan []T
- func ToMap[In any, K comparable, V any](ctx context.Context, inStream <-chan In, ...) <-chan map[K]V
- func Transform[In any, Out any](ctx context.Context, inStream <-chan In, ...) <-chan Out
- func UnfairFanIn[T any](ctx context.Context, inStream ...<-chan T) <-chan T
- func UnsetIndex[T any, C Integer](ctx context.Context, inStream <-chan Indexed[C, T]) <-chan T
- func Valve[T any](ctx context.Context, inStream <-chan T, isOpen bool) (openIt func(), closeIt func(), outStream <-chan T)
- func Zip[A, B any](ctx context.Context, inStreamA <-chan A, inStreamB <-chan B) <-chan Pair[A, B]
- type ChainFunc
- func Consumer[T any](f func(context.Context, T)) ChainFunc[T]
- func CountingC[T any, R Integer](chain ChainFunc[R]) ChainFunc[T]
- func KeepC[T any](count int, chain ChainFunc[T]) ChainFunc[T]
- func MirrorHighThroughputC[T any](chain ...ChainFunc[T]) ChainFunc[T]
- func MirrorLowLatencyC[T any](chain ...ChainFunc[T]) ChainFunc[T]
- func PressurizeC[T any](delay time.Duration, chain ChainFunc[T]) ChainFunc[T]
- func ProcessorPoolC[From, To any](concurrency int, processorFunc Processor[From, To], chain ChainFunc[To]) ChainFunc[From]
- func RateLimitC[T any](ratePerSecond int, chain ChainFunc[T], option ...ratelimit.Option) ChainFunc[T]
- func ReorderC[Value Ordered, Payload any](chain ChainFunc[Indexed[Value, Payload]], options ...ReorderOption) ChainFunc[Indexed[Value, Payload]]
- func RequestProcessorPoolC[P any, R any](processing func(context.Context, P) (R, error), count int, id string) ChainFunc[Request[P, R]]
- func SetIndexC[T any, C Integer](chain ChainFunc[Indexed[C, T]]) ChainFunc[T]
- func SieveC[T any](choice IndexGetter[T], chain ...ChainFunc[T]) ChainFunc[T]
- func SieveTreeC[T any](choice IndexGetter[T], chain ...ChainFunc[T]) ChainFunc[T]
- func SkipC[T any](count int, chain ChainFunc[T]) ChainFunc[T]
- func UnsetIndexC[T any, C Integer](chain ChainFunc[T]) ChainFunc[Indexed[C, T]]
- type CircuitState
- type Comparable
- type Complex
- type DurationGenerator
- type Float
- type Gen
- type Generator
- type IndexGetter
- type Indexed
- type IndexedInteger
- type Integer
- type LeakyBucket
- type Ordered
- type Pair
- type Processor
- type ReorderOption
- type Request
- type Signed
- type Unsigned
- type ValErrorPair
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
func Buffer ¶
Buffer chains a buffered channel to inStream. This may help inStream's writer block less frequently.
Keep in mind that as soon as the buffer is full, it behaves exactly as if it were not buffered at all.
func Chain ¶
Chain performs streams concatenation. It generates a new stream that will produce all the elements from the streams consumed in the order they are provided.
For example if
s1 provides elements s1e1, s1e2, s1e3 then closes, and s2 provides elements s2e1, s2e2 then closes,
the resulting stream will produce elements
s1e1, s1e2, s1e3, s2e1, s2e2 then close
Note that all input streams must eventually be closed: in the given example, s2 elements will be streamed to the output only once s1 is closed.
The returned stream will be closed when all input streams are closed or context is canceled.
func Combinations ¶
Combinations produces all combinations of elements from the given slice.
func CombinationsIndexes ¶
CombinationsIndexes produces all combinations of indexes.
func CombinationsIndexesWithReplacement ¶
CombinationsIndexesWithReplacement produces all combinations of indexes with replacement.
func CombinationsWithReplacement ¶
CombinationsWithReplacement produces all combinations of elements with replacement.
func ContextBreaker ¶
ContextBreaker creates an output stream that copies the input stream and closes when the context is done or the input stream is closed.
This is useful to cut stream flows properly, especially when downstream operators are no longer sensitive to context termination.
func Delay ¶
func Delay[T any]( ctx context.Context, durationGenerator DurationGenerator, inStream <-chan T, ) <-chan T
Delay creates an output stream that replicates the input stream's elements with a delay. The provided duration generator defines the minimal time interval between each element emission.
Please note it can be used as a time-based event generator when pressure is low. Otherwise, it can be used to create pressure, for example to simulate processing.
func Distribute ¶
Distribute generates a stream by sequentially picking elements from the input streams.
Input streams can provide different numbers of elements. Input streams MUST differ (they cannot be the same stream).
The output stream is closed when all input streams have been exhausted and closed or the context is canceled.
func FairFanIn ¶
FairFanIn merges multiple input streams into a single output stream with unbalanced priority.
Priority is defined by the input stream rank, from lowest to highest. The function will panic if no input streams are provided. The more back pressure is present on the output stream (i.e. slow consumption), the more the unfair priority effect will occur. If two input streams are provided, they will be merged into one output stream with the exact same priority. If more are provided, the stream in third position will have twice the throughput of the stream in second position: throughput(inputStream#n) = 2 * throughput(inputStream#(n-1)). The less back pressure is present, the more balanced the priority is. The output stream is closed when all input streams are closed or the context is done.
func FanIn ¶
FanIn multiplexes input streams into a single output stream with balanced priority.
Output stream is closed when the context is done or all input streams are closed.
func Keep ¶
Keep generates a stream by keeping the first count items from the input stream.
Output stream is closed when the input stream is closed or context ctx is canceled.
func MirrorHighThroughput ¶
func MirrorHighThroughput[T any]( ctx context.Context, inStream <-chan T, replicaCount int, ) []<-chan T
MirrorHighThroughput replicates the input stream to multiple output streams with maximum throughput.
Because its design requires fewer copy steps in the linear chain of flows this version is faster. But it induces a greater latency disparity between the input stream and the outputs. Latency is proportional to the rank of the output stream. See MirrorLowLatency if you need the lowest latency between input stream and outputs.
func MirrorLowLatency ¶
func MirrorLowLatency[T any]( ctx context.Context, inStream <-chan T, replicaCount int, ) []<-chan T
MirrorLowLatency replicates the input stream to multiple output streams with minimum latency.
Because its design uses a binary tree to separate the input stream from the output streams, this version offers a homogeneous latency. But it induces more copying steps and therefore a lower throughput.
See MirrorHighThroughput if you need the highest rate between input stream and outputs.
func PermutationsIndexes ¶
PermutationsIndexes provides all permutations indexes.
func PermutationsOf ¶
PermutationsOf provides all permutations of a list of elements.
func Pressurize ¶
Pressurize creates some initial pressure by waiting for a delay before opening the stream and copying it to the output.
func ProcessorPool ¶
func ProcessorPool[From, To any]( ctx context.Context, concurrency int, processorFunc Processor[From, To], inStream <-chan From, ) (outputStream <-chan To)
ProcessorPool launches the given processor concurrently and multiplexes the outputs into a single output stream.
func Reorder ¶
func Reorder[Value Ordered, Payload any]( ctx context.Context, inStream <-chan Indexed[Value, Payload], option ...ReorderOption, ) <-chan Indexed[Value, Payload]
Reorder buffers an indexed input stream.
func RequestProcessor ¶
func RequestProcessorPool ¶
func RequesterC ¶
func Separate ¶
func Separate[V any]( ctx context.Context, inSTream <-chan ValErrorPair[V], ) (<-chan V, <-chan error)
func Shuffle ¶
Shuffle generates a stream by randomly picking elements from the input streams.
Input streams can provide different number of elements. Input streams MUST differ (same stream cannot be present twice).
The output stream is closed when all input streams have been exhausted and closed or the context is canceled.
func ShuffleOrder ¶
func Sieve ¶
func Sieve[T any]( ctx context.Context, choice IndexGetter[T], count int, inStream <-chan T, ) []<-chan T
func SieveTree ¶
func SieveTree[T any]( ctx context.Context, choice IndexGetter[T], count int, inStream <-chan T, ) []<-chan T
func Skip ¶
Skip generates a stream by skipping the first count items from the input stream.
Output stream is closed when the input stream is closed or context ctx is canceled.
func SpawnRequestProcessor ¶
func UnfairFanIn ¶
UnfairFanIn merges multiple input streams into a single output stream with unfair priority balance.
Priority is defined by the input stream rank, from lowest to highest. The function will panic if no input streams are provided.
The more back pressure is present on the output stream (i.e. slow consumption), the more the priority effect will occur.
* inputStream#n has lower priority than inputStream#(n+1)
If no back pressure is present, priority looks balanced.
The output stream is closed when all input streams are closed or the context is done.
func UnsetIndex ¶
Types ¶
type ChainFunc ¶
func MirrorHighThroughputC ¶
func MirrorLowLatencyC ¶
func ProcessorPoolC ¶
func RateLimitC ¶
func RequestProcessorPoolC ¶
func SieveTreeC ¶
func SieveTreeC[T any]( choice IndexGetter[T], chain ...ChainFunc[T], ) ChainFunc[T]
type CircuitState ¶
type CircuitState uint32
const ( CircuitUndefined CircuitState = iota // UNDEFINED CircuitOpen // OPEN CircuitClosed // CLOSED CircuitHalfOpen // HALF_OPEN )
func (CircuitState) String ¶
func (i CircuitState) String() string
type Comparable ¶
type Comparable interface {
LessThan(other Comparable) bool
}
type Complex ¶
type Complex interface { ~complex64 | ~complex128 }
Complex is a constraint that permits any complex numeric type. If future releases of Go add new predeclared complex numeric types, this constraint will be modified to include them.
type DurationGenerator ¶
type Float ¶
Float is a constraint that permits any floating-point type. If future releases of Go add new predeclared floating-point types, this constraint will be modified to include them.
type IndexGetter ¶
type IndexedInteger ¶
type Integer ¶
Integer is a constraint that permits any integer type. If future releases of Go add new predeclared integer types, this constraint will be modified to include them.
type LeakyBucket ¶
type LeakyBucket struct {
// contains filtered or unexported fields
}
func NewLeakyBucket ¶
func NewLeakyBucket( count int, interval time.Duration, ) *LeakyBucket
func (*LeakyBucket) Incr ¶
func (b *LeakyBucket) Incr() bool
type Ordered ¶
Ordered is a constraint that permits any ordered type: any type that supports the operators < <= >= >. If future releases of Go add new ordered types, this constraint will be modified to include them.
type Processor ¶
Processor defines a function that reads from a single input stream and produces elements to the resulting output stream.
type ReorderOption ¶
type ReorderOption interface {
// contains filtered or unexported methods
}
ReorderOption defines options for Reorder.
func WithBufferSize ¶
func WithBufferSize(d int) ReorderOption
WithBufferSize sets internal buffer size.
func WithOrderReversed ¶
func WithOrderReversed() ReorderOption
WithOrderReversed option for reversing order.
type Request ¶
type Request[P any, R any] struct { P P ChResp chan<- ValErrorPair[R] }
type Signed ¶
Signed is a constraint that permits any signed integer type. If future releases of Go add new predeclared signed integer types, this constraint will be modified to include them.
type Unsigned ¶
Unsigned is a constraint that permits any unsigned integer type. If future releases of Go add new predeclared unsigned integer types, this constraint will be modified to include them.
type ValErrorPair ¶
Source Files ¶
- buffer.go
- chain.go
- circuit_breaker.go
- circuitstate_string.go
- combination.go
- conch.go
- consume.go
- count.go
- counting.go
- delay.go
- distribute.go
- engineevent_string.go
- err.go
- filter.go
- iterators.go
- keep.go
- leaky_bucket.go
- order.go
- permutations.go
- pressurize.go
- reorder.go
- requester.go
- shuffle.go
- shuffle_order.go
- sieve.go
- skip.go
- spread.go
- tolist.go
- tomap.go
- transform.go
- types.go
- valve.go
- zip.go