flightorder

package module
v0.1.0 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jan 31, 2024 License: Apache-2.0 Imports: 3 Imported by: 0

README

flightorder

This package allows to do [ordered input] -> [parallel processing] -> [ordered output] in a streaming manner.

The name was inspired by golang.org/x/sync/singleflight package.

Motivation

Sending logs from a single file to multiple kafka brokers concurrently while preserving at-least-once delivery guarantees:

  • Logs are sent to multiple kafka brokers in parallel to enhance throughput.
  • File offsets are committed in the exact order they are read to ensure at-least-once delivery guarantees and prevent data loss in case of shipper or broker failures.

Installation

go get github.com/go-faster/flightorder@latest

Example

package main

import (
	"context"
	"fmt"
	"math/rand"
	"sync"
	"time"

	"github.com/go-faster/flightorder"
)

func main() {
	input := []int{1, 2, 3, 4, 5, 6, 7, 8, 9}
	processingOrder, output := processInput(input)
	fmt.Printf("input:     %v\n", input)
	fmt.Printf("processed: %v\n", processingOrder)
	fmt.Printf("output:    %v\n", output)
}

func processInput(input []int) (processing, output []int) {
	route := flightorder.NewRoute(flightorder.RouteParams{})

	var (
		mux sync.Mutex
		wg  sync.WaitGroup
	)

	wg.Add(len(input))
	for _, v := range input {
		ticket := route.TakeTicket()
		go func(t *flightorder.Ticket, v int) {
			defer wg.Done()
			time.Sleep(time.Millisecond * time.Duration(rand.Intn(100)))

			mux.Lock()
			processing = append(processing, v)
			mux.Unlock()

			_ = route.CompleteTicket(context.TODO(), t)

			mux.Lock()
			output = append(output, v)
			mux.Unlock()
		}(ticket, v)
	}

	wg.Wait()
	return
}

Output:

input:     [1 2 3 4 5 6 7 8 9]
processed: [3 1 9 7 6 2 5 4 8]
output:    [1 2 3 4 5 6 7 8 9]

License

Source code is available under Apache License 2.0

Documentation

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

This section is empty.

Types

type Route

type Route struct {
	// contains filtered or unexported fields
}

Route is responsible for tickets processing.

func NewRoute

func NewRoute(params RouteParams) *Route

NewRoute creates new route for tickets processing.

func (*Route) CompleteTicket

func (r *Route) CompleteTicket(ctx context.Context, t *Ticket) error

CompleteTicket completes a ticket. Waits for previous taken tickets to complete first, if any.

func (*Route) TakeTicket

func (r *Route) TakeTicket() *Ticket

TakeTicket takes a new ticket.

type RouteParams

type RouteParams struct {
	// TicketAllocator sets custom ticket allocator.
	// Optional.
	TicketAllocator TicketAllocator
}

RouteParams sets route parameters.

type StdAllocator

type StdAllocator struct{}

StdAllocator is a standard ticket allocator without any memory reuse.

func (StdAllocator) AcquireTicket

func (StdAllocator) AcquireTicket() *Ticket

AcquireTicket acquires a new ticket.

func (StdAllocator) ReleaseTicket

func (StdAllocator) ReleaseTicket(t *Ticket)

ReleaseTicket does nothing. Let GC erase ticket for us.

type SyncpoolAllocator

type SyncpoolAllocator struct {
	// contains filtered or unexported fields
}

SyncpoolAllocator uses sync.Pool under the hood to reuse allocated tickets.

func NewSyncpoolAllocator

func NewSyncpoolAllocator() *SyncpoolAllocator

NewSyncpoolAllocator creates new SyncpoolAllocator.

func (*SyncpoolAllocator) AcquireTicket

func (a *SyncpoolAllocator) AcquireTicket() *Ticket

AcquireTicket acquires a new ticket from the pool.

func (*SyncpoolAllocator) ReleaseTicket

func (a *SyncpoolAllocator) ReleaseTicket(t *Ticket)

ReleaseTicket releases ticket to the pool.

type Ticket

type Ticket struct {
	// contains filtered or unexported fields
}

Ticket is a route ticket.

type TicketAllocator

type TicketAllocator interface {
	AcquireTicket() *Ticket
	ReleaseTicket(t *Ticket)
}

TicketAllocator is responsible for ticket allocation.

Directories

Path Synopsis
examples

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL