filechannel

package
v0.1.0 Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jan 30, 2024 | License: Apache-2.0 | Imports: 21 | Imported by: 0

Documentation

Index

Constants

View Source
const (
	IteratorBufferLimit = 1 << 20
)
View Source
const MessageHeaderBinarySize = 8
View Source
const SegmentHeaderBinarySize = 64

Variables

View Source
var (
	ErrUnexpectedEOF      = fmt.Errorf("unexpected EOF")
	ErrChecksumMismatch   = errors.New("channel corrupted: checksum mismatch")
	ErrChannelClosed      = errors.New("channel closed")
	ErrNotEnoughMessages  = errors.New("not enough messages")
	ErrNotEnoughReadToAck = errors.New("not enough read to ack")
)
View Source
var (
	DefaultFlushInterval   = 100 * time.Microsecond // 100us
	DefaultRotateThreshold = uint64(512 << 20)      // 512 MB
)

Functions

func ReadNext

func ReadNext(r io.Reader, w io.Writer, hBuf []byte) error

Types

type CompressedSegmentHeader

type CompressedSegmentHeader struct {
	SegmentID         uint32
	BeginOffset       uint64
	EndOffset         uint64
	CompressionMethod CompressionMethod
}

func (*CompressedSegmentHeader) Decode

func (h *CompressedSegmentHeader) Decode(b []byte)

func (*CompressedSegmentHeader) Encode

func (h *CompressedSegmentHeader) Encode(b []byte)

type CompressionMethod

type CompressionMethod byte
const (
	Snappy CompressionMethod = iota
)

type FileChannel

type FileChannel struct {
	// contains filtered or unexported fields
}

func NewFileChannel

func NewFileChannel(dir string, opts ...Option) *FileChannel

func OpenFileChannel

func OpenFileChannel(dir string, opts ...Option) (*FileChannel, error)

func (*FileChannel) Close

func (fc *FileChannel) Close() error

func (*FileChannel) Flush

func (fc *FileChannel) Flush() error

func (*FileChannel) Iterator

func (fc *FileChannel) Iterator() *Iterator

func (*FileChannel) IteratorAcknowledgable

func (fc *FileChannel) IteratorAcknowledgable() *Iterator

func (*FileChannel) Open

func (fc *FileChannel) Open() error

func (*FileChannel) Write

func (fc *FileChannel) Write(p []byte) (err error)

type Iterator

type Iterator struct {
	// contains filtered or unexported fields
}

func NewIterator

func NewIterator(manager *SegmentManager, position *Position, autoAck bool) *Iterator

func (*Iterator) Ack

func (it *Iterator) Ack(n int) error

func (*Iterator) Close

func (it *Iterator) Close() error

func (*Iterator) Next

func (it *Iterator) Next(ctx context.Context) (b []byte, err error)

func (*Iterator) TryNext

func (it *Iterator) TryNext() ([]byte, error)

type MessageHeader

type MessageHeader struct {
	Length   uint32
	Checksum uint32
}

func (*MessageHeader) Decode

func (h *MessageHeader) Decode(b []byte)

func (*MessageHeader) Encode

func (h *MessageHeader) Encode(b []byte)

type Option

type Option func(*FileChannel)

func FlushInterval

func FlushInterval(d time.Duration) Option

func RotateThreshold

func RotateThreshold(n uint64) Option

type PlainSegmentHeader

type PlainSegmentHeader struct {
	SegmentID   uint32
	BeginOffset uint64
}

func (*PlainSegmentHeader) Decode

func (h *PlainSegmentHeader) Decode(b []byte)

func (*PlainSegmentHeader) Encode

func (h *PlainSegmentHeader) Encode(b []byte)

type Position

type Position struct {
	// contains filtered or unexported fields
}

func NewPosition

func NewPosition(offset uint64) *Position

func (*Position) Close

func (p *Position) Close()

func (*Position) Get

func (p *Position) Get() uint64

func (*Position) Update

func (p *Position) Update(offset uint64) uint64

func (*Position) Wait

func (p *Position) Wait(ctx context.Context, cond func(uint64) bool) (uint64, error)

type SegmentFileState

type SegmentFileState int
const (
	Plain SegmentFileState = iota
	Compressing
	Compressed
)

func ParseSegmentIndexAndState

func ParseSegmentIndexAndState(file string) (uint32, SegmentFileState, error)

type SegmentManager

type SegmentManager struct {
	// contains filtered or unexported fields
}

func NewSegmentManager

func NewSegmentManager(dir string) *SegmentManager

func (*SegmentManager) AdvanceReader

func (sm *SegmentManager) AdvanceReader(prev uint32, delta uint32) (uint32, uint32)

func (*SegmentManager) CloseReader

func (sm *SegmentManager) CloseReader(cur uint32) uint32

func (*SegmentManager) CurrentSegmentIndex

func (sm *SegmentManager) CurrentSegmentIndex() uint32

func (*SegmentManager) CurrentSegmentWatermark

func (sm *SegmentManager) CurrentSegmentWatermark() uint32

func (*SegmentManager) GetBeginIndex

func (sm *SegmentManager) GetBeginIndex() uint32

func (*SegmentManager) IncSegmentIndex

func (sm *SegmentManager) IncSegmentIndex() uint32

func (*SegmentManager) NewReader

func (sm *SegmentManager) NewReader() uint32

func (*SegmentManager) Pin

func (sm *SegmentManager) Pin(index uint32) bool

func (*SegmentManager) SegmentFile

func (sm *SegmentManager) SegmentFile(index uint32, state SegmentFileState) string

func (*SegmentManager) SetBeginIndex

func (sm *SegmentManager) SetBeginIndex(index uint32)

func (*SegmentManager) Unpin

func (sm *SegmentManager) Unpin(index uint32)

func (*SegmentManager) WaitUntilWatermarkAbove

func (sm *SegmentManager) WaitUntilWatermarkAbove(ctx context.Context, index uint32) (uint32, error)

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL