log

package
v0.0.0-...-282181f Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Dec 19, 2016 License: GPL-3.0 Imports: 9 Imported by: 2

Documentation

Index

Constants

This section is empty.

Variables

View Source
var (
	BadCRC        = errors.New("bad CRC in message")
	UnexpectedEOF = errors.New("unexpected EOF")
)

Functions

func Timestamp

func Timestamp(t time.Time) uint64

Types

type BinaryReader

type BinaryReader struct {
	io.Reader
	// contains filtered or unexported fields
}

Performs reads from a reader, unless an error occur. When an error occurs, it is recorded in the err property, and future calls to read methods are no-op.

func (*BinaryReader) Err

func (br *BinaryReader) Err() error

func (*BinaryReader) ReadByte

func (br *BinaryReader) ReadByte() byte

func (*BinaryReader) ReadBytes

func (br *BinaryReader) ReadBytes() []byte

func (*BinaryReader) ReadUint32

func (br *BinaryReader) ReadUint32() uint32

func (*BinaryReader) ReadUint64

func (br *BinaryReader) ReadUint64() uint64

type BinaryWriter

type BinaryWriter struct {
	io.Writer
	// contains filtered or unexported fields
}

Performs writes to a writer, unless an error occur. When an error occurs, it is recorded in the err property, and future calls to write methods are no-op.

func NewBinaryWriter

func NewBinaryWriter(writer io.Writer) *BinaryWriter

func (*BinaryWriter) Err

func (bw *BinaryWriter) Err() error

func (*BinaryWriter) WriteByte

func (bw *BinaryWriter) WriteByte(v byte)

func (*BinaryWriter) WriteBytes

func (bw *BinaryWriter) WriteBytes(b []byte)

func (*BinaryWriter) WriteUint32

func (bw *BinaryWriter) WriteUint32(v uint32)

func (*BinaryWriter) WriteUint64

func (bw *BinaryWriter) WriteUint64(v uint64)

type ByStartOffset

type ByStartOffset []Segment

func (ByStartOffset) Len

func (s ByStartOffset) Len() int

func (ByStartOffset) Less

func (s ByStartOffset) Less(i, j int) bool

func (ByStartOffset) Swap

func (s ByStartOffset) Swap(i, j int)

type Config

type Config struct {
	MaxSegmentSize int64
	MaxSyncLag     int
}

type Consumer

type Consumer struct {
	// contains filtered or unexported fields
}

func (*Consumer) Close

func (c *Consumer) Close()

func (*Consumer) Next

func (c *Consumer) Next() (uint64, *Message, error)

type Log

type Log struct {
	// contains filtered or unexported fields
}

func Open

func Open(config Config, store Store) (*Log, error)

Open a log from a store

func (*Log) Append

func (l *Log) Append(message *Message) (uint64, error)

Append a message to this log

func (*Log) Close

func (l *Log) Close()

func (*Log) Consumer

func (l *Log) Consumer(startOffset uint64) (*Consumer, error)

Creates a new consumer starting at startOffset. If startOffset == 0, starts at the end of the log.

func (*Log) NextOffset

func (l *Log) NextOffset() uint64

func (*Log) SetConfig

func (l *Log) SetConfig(config Config)

Change the configuration

func (*Log) Sync

func (l *Log) Sync()

func (*Log) WaitOffset

func (l *Log) WaitOffset(minOffset uint64)

Wait for this log to reach an offset of at least minOffset.

func (*Log) WaitSyncOffset

func (l *Log) WaitSyncOffset(minOffset uint64)

Wait for this log to sync an offset of at least minOffset.

type Message

type Message struct {
	// 4 byte CRC32 of the message
	CRC uint32
	// 1 byte "magic" identifier to allow format changes, value is 0 or 1
	Format byte
	// 1 byte "attributes" identifier to allow annotations on the message independent
	//   bit 0 ~ 2 : Compression codec.
	//      0 : no compression
	//      1 : gzip
	//      2 : snappy
	//      3 : lz4
	//    bit 3 : Timestamp type
	//      0 : create time
	//      1 : log append time
	//    bit 4 ~ 7 : reserved
	Attributes byte
	// (Optional) 8 byte timestamp only if "magic" identifier is greater than 0
	Timestamp uint64
	// K byte key
	Key []byte
	// V byte payload
	Payload []byte
}

On-disk format of a message

offset : 8 bytes message length : 4 bytes (value: 4 + 1 + 1 + 8(if magic value > 0) + 4 + K + 4 + V) crc : 4 bytes magic value : 1 byte attributes : 1 byte timestamp : 8 bytes (Only exists when magic value is greater than zero) key length : 4 bytes key : K bytes value length : 4 bytes value : V bytes

func NewMessage

func NewMessage(timestamp uint64, key, data []byte) *Message

func (*Message) ComputeCRC

func (l *Message) ComputeCRC() uint32

func (*Message) Len

func (l *Message) Len() uint32

func (*Message) ReadFrom

func (l *Message) ReadFrom(reader io.Reader) error

func (*Message) UpdateCRC

func (l *Message) UpdateCRC()

func (*Message) WriteTo

func (l *Message) WriteTo(writer *BinaryWriter)

type Reader

type Reader struct {
	ReaderBackend
	// contains filtered or unexported fields
}

func NewReader

func NewReader(backend ReaderBackend, position int64, bufferSize int) *Reader

func (*Reader) FastRead

func (lr *Reader) FastRead() (uint64, error)

Read and check the next message but don't parse it.

func (*Reader) Next

func (lr *Reader) Next() (uint64, *Message, error)

Read and check the next message.

func (*Reader) Position

func (lr *Reader) Position() int64

func (*Reader) SeekToEnd

func (lr *Reader) SeekToEnd() (uint64, error)

func (*Reader) SeekToOffset

func (lr *Reader) SeekToOffset(offset uint64) error

type ReaderBackend

type ReaderBackend interface {
	io.Reader
	io.Seeker
	io.Closer
}

type Segment

type Segment interface {
	// The first offset of this segment (given by Store.AddSegment).
	StartOffset() uint64
	// An appender to this segment.
	Appender() (SegmentAppender, error)
	// A reader of this segment.
	Reader() (SegmentReader, error)
}

A slice of a log.

type SegmentAppender

type SegmentAppender interface {
	// Append a message to the log. Returns the position after the write (aka segment size).
	Append(offset uint64, message *Message) (int64, error)
	// Flush caches to ensure data is written.
	Sync() error
	// Close the appender.
	Close() error
}

Minimum interface to reliably append messages to a segment

type SegmentReader

type SegmentReader interface {
	// The current position in the segment.
	Position() int64
	// Read the next message from the segment.
	// Returns the offset, the message, and any error that occured while reading.
	Next() (uint64, *Message, error)
	// Seek to a given offset
	SeekToOffset(offset uint64) error
	// Seek to the end of the segment, returning the last valid offset read.
	SeekToEnd() (uint64, error)
	// Close the reader
	Close() error
}

Minimum interface to read messages from a segment

type Store

type Store interface {
	// The current list of segments in the store.
	Segments() ([]Segment, error)
	// Add a new segment to the store.
	AddSegment(startOffset uint64) (Segment, error)
}

A complete log.

type Writer

type Writer struct {
	WriterBackend
	// contains filtered or unexported fields
}

func NewWriter

func NewWriter(backend WriterBackend, position int64, bufferSize int) *Writer

func (*Writer) Append

func (lw *Writer) Append(offset uint64, message *Message) (int64, error)

Append a log message and return the position after append, or any error occured when writing.

func (*Writer) Sync

func (lw *Writer) Sync() error

type WriterBackend

type WriterBackend interface {
	io.Writer
	io.Seeker
	io.Closer

	Sync() error
}

Directories

Path Synopsis
stores

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL