filechannel

v0.1.1
Published: Apr 30, 2024 License: Apache-2.0 Imports: 3 Imported by: 0

README

File-based Persistent Channel

Example

package example

import (
	"context"

	"github.com/risingwavelabs/filechannel"
)

func Example(dir string) error {
	fch, err := filechannel.OpenFileChannel(dir)
	if err != nil {
		return err
	}
	defer fch.Close()

	msg := []byte("Hello world!")

	tx := fch.Tx()
	defer tx.Close()
	err = tx.Send(context.Background(), msg)
	if err != nil {
		return err
	}

	rx := fch.Rx()
	defer rx.Close()
	p, err := rx.Recv(context.Background())
	if err != nil {
		return err
	}
	_ = p // p holds the received message; process it here

	return nil
}

Benchmarks

The benchmarks are in internal/filechannel/filechannel_benchmark_test.go.

Hardware: MacBook Pro
OS: macOS
CPU: M1 Pro
GOMAXPROCS: 4

pkg: github.com/risingwavelabs/filechannel/internal/filechannel
BenchmarkFileChannel_Write_16
BenchmarkFileChannel_Write_16-4           	18254582	       63.36 ns/op	       8 B/op	       1 allocs/op
BenchmarkFileChannel_Write_64
BenchmarkFileChannel_Write_64-4           	 8664692	       140.5 ns/op	       8 B/op	       1 allocs/op
BenchmarkFileChannel_Write_512
BenchmarkFileChannel_Write_512-4          	 1427677	       817.6 ns/op	       9 B/op	       1 allocs/op
BenchmarkFileChannel_Read_16
BenchmarkFileChannel_Read_16-4            	 9050712	       132.7 ns/op	      56 B/op	       2 allocs/op
BenchmarkFileChannel_Read_64
BenchmarkFileChannel_Read_64-4            	 8260588	       146.0 ns/op	      56 B/op	       2 allocs/op
BenchmarkFileChannel_Read_512
BenchmarkFileChannel_Read_512-4           	 3973660	       303.7 ns/op	      56 B/op	       2 allocs/op
BenchmarkFileChannel_ReadWrite_16
BenchmarkFileChannel_ReadWrite_16-4       	 6157968	       197.0 ns/op	      64 B/op	       3 allocs/op
BenchmarkFileChannel_ReadWrite_64
BenchmarkFileChannel_ReadWrite_64-4       	 4252632	       281.8 ns/op	      64 B/op	       3 allocs/op
BenchmarkFileChannel_ReadWrite_512
BenchmarkFileChannel_ReadWrite_512-4      	  902235	        1116 ns/op	      65 B/op	       3 allocs/op
BenchmarkFileChannel_ParallelRead_16
BenchmarkFileChannel_ParallelRead_16-4    	29948120	       38.91 ns/op	      56 B/op	       2 allocs/op
BenchmarkFileChannel_ParallelRead_64
BenchmarkFileChannel_ParallelRead_64-4    	26518456	       42.85 ns/op	      56 B/op	       2 allocs/op
BenchmarkFileChannel_ParallelRead_512
BenchmarkFileChannel_ParallelRead_512-4   	27467155	       42.09 ns/op	      56 B/op	       2 allocs/op

Memory Consumption

Output of the test case TestFileChannel_MemoryConsumption.

=========== BEFORE ALL ===========
Alloc: 240.3 KiB
TotalAlloc: 240.3 KiB
Sys: 7.4 MiB
Lookups: 0
Mallocs: 1024
Frees: 122
HeapAlloc: 240.3 KiB
HeapSys: 3.7 MiB
HeapIdle: 3.0 MiB
HeapInuse: 632.0 KiB
HeapReleased: 3.0 MiB
HeapObjects: 902

=========== AFTER OPEN ===========
Alloc: 249.5 KiB
TotalAlloc: 249.5 KiB
Sys: 7.4 MiB
Lookups: 0
Mallocs: 1101
Frees: 130
HeapAlloc: 249.5 KiB
HeapSys: 3.7 MiB
HeapIdle: 3.0 MiB
HeapInuse: 656.0 KiB
HeapReleased: 3.0 MiB
HeapObjects: 971

=========== AFTER SENDING ===========
Alloc: 2.5 MiB
TotalAlloc: 16.6 MiB
Sys: 14.3 MiB
Lookups: 0
Mallocs: 2104024
Frees: 1953265
HeapAlloc: 2.5 MiB
HeapSys: 7.6 MiB
HeapIdle: 4.5 MiB
HeapInuse: 3.0 MiB
HeapReleased: 2.9 MiB
HeapObjects: 150759

=========== AFTER RECEIVING ===========
Alloc: 2.5 MiB
TotalAlloc: 226.0 MiB
Sys: 14.3 MiB
Lookups: 0
Mallocs: 10514798
Frees: 10421026
HeapAlloc: 2.5 MiB
HeapSys: 7.6 MiB
HeapIdle: 4.5 MiB
HeapInuse: 3.0 MiB
HeapReleased: 2.9 MiB
HeapObjects: 93772

=========== AFTER ALL ===========
Alloc: 2.5 MiB
TotalAlloc: 226.0 MiB
Sys: 14.3 MiB
Lookups: 0
Mallocs: 10514861
Frees: 10421037
HeapAlloc: 2.5 MiB
HeapSys: 7.5 MiB
HeapIdle: 4.5 MiB
HeapInuse: 3.1 MiB
HeapReleased: 2.9 MiB
HeapObjects: 93824

Documentation

Index

Constants

This section is empty.

Variables

var (
	DefaultRotateThreshold = filechannel.DefaultRotateThreshold
	DefaultFlushInterval   = filechannel.DefaultFlushInterval
	RotateThreshold        = filechannel.RotateThreshold
	FlushInterval          = filechannel.FlushInterval
)

Default option values and options.

Functions

This section is empty.

Types

type AckFileChannel

type AckFileChannel interface {
	FileChannel

	// RxAck creates an AckReceiver. AckReceiver behaves the same as
	// Receiver from [FileChannel.Rx] except for the ack. Like Receiver,
	// there can also be multiple AckReceivers at the same time.
	RxAck() AckReceiver
}

AckFileChannel is the interface for a file-based persistent channel that supports asynchronous ack of received messages.

func OpenAckFileChannel

func OpenAckFileChannel(dir string, opts ...Option) (AckFileChannel, error)

OpenAckFileChannel opens a new AckFileChannel.

type AckReceiver

type AckReceiver interface {
	Receiver

	// Ack consumes the front unacknowledged messages.
	Ack(n int) error
}

AckReceiver receives bytes like Receiver. However, it doesn't consume the data until Ack is called explicitly. Consuming data means telling the file channel that the data can be purged.

type FileChannel

type FileChannel interface {
	Stats

	// Tx creates a Sender. Sender is thread safe.
	// It's possible to have multiple senders at the same time.
	Tx() Sender

	// Rx creates a Receiver. Note that a Receiver is not thread safe.
	// However, it's possible to have multiple receivers at the same time.
	// Which message each receiver gets first is unspecified and left to
	// the implementation.
	Rx() Receiver

	// Close closes the channel. It blocks until all senders are closed.
	Close() error
}

FileChannel is the interface for a file-based persistent channel.

func OpenFileChannel

func OpenFileChannel(dir string, opts ...Option) (FileChannel, error)

OpenFileChannel opens a new FileChannel.

type Option

type Option = filechannel.Option

Option to create a FileChannel.

type Receiver

type Receiver interface {
	ReceiverStats

	// Recv bytes from file channel.
	Recv(context.Context) ([]byte, error)

	// Close closes the receiver.
	Close() error
}

Receiver receives bytes from file channel in the sending order.

type ReceiverStats added in v0.1.1

type ReceiverStats interface {
	// ReadOffset returns the offset of the last read message.
	// Note that the offset is local to the receiver. It will only change
	// when the receiver reads a message.
	//
	// The initial offset is math.MaxUint64 to indicate that no message has
	// been read.
	ReadOffset() uint64
}

ReceiverStats is the interface for getting the stats of a receiver.

type Sender

type Sender interface {
	SenderStats

	// Send sends bytes to the file channel. The data will eventually be
	// persisted to disk.
	Send(context.Context, []byte) error

	// Close closes a sender.
	Close() error
}

Sender sends bytes to file channel.

type SenderStats added in v0.1.1

type SenderStats interface {
	// WriteOffset returns the offset of the last written message.
	// Note that the offset is not the byte offset in the file. It's the
	// offset of the message in the channel. The offset changes whenever a
	// message is written, whether by this sender or by another sender of
	// the same channel.
	WriteOffset() uint64
}

SenderStats is the interface for getting the stats of a sender.

type Stats added in v0.1.1

type Stats interface {
	// DiskUsage returns the disk usage of the file channel.
	// Note that calling DiskUsage() is an expensive operation.
	DiskUsage() (uint64, error)

	// FlushOffset returns the offset of the last flushed message.
	// Messages with offset less than the flush offset are guaranteed to be
	// seen by the readers.
	FlushOffset() uint64
}

Stats is the interface for getting the stats of a file channel.

Directories

Path Synopsis
internal
fs