diskq

package module
v0.0.0-...-d9794a8
Warning

This package is not in the latest version of its module.

Published: Apr 27, 2024 License: MIT Imports: 17 Imported by: 3

README

diskq



Caveats

go-diskq is currently in preview, and there are no guarantees around the API or the features currently provided. Use at your own risk!

Overview

go-diskq provides a single-node equivalent of Kafka, similar to what SQLite is to an online database like Postgres. Said another way, go-diskq is a library implementation of a streaming system that writes to a local disk. It supports high-throughput writing and reading, so the process producing messages can be decoupled from the processes reading them, and consumption can be triggered by filesystem events.

Streams are rooted at a path and can have a single writer at a given time. Think of streams like "topics" in Kafka parlance. Streams are split into partitions, and each message is assigned to a partition deterministically by its partition key. Streams can be vacuumed so that they are held to a maximum size in bytes on disk, or to a maximum message age. Vacuuming must be initiated manually, usually from a separate goroutine on a ticker.
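Deterministic assignment can be pictured as "hash the key, then bucket the hash by the partition count". The sketch below assumes FNV-1a as the hash and uses a hypothetical helper name, partitionForKey; diskq's actual hash function may differ.

```go
package main

import "hash/fnv"

// partitionForKey is a hypothetical sketch of deterministic partition
// assignment: hash the partition key, then bucket the hash by the
// partition count. FNV-1a is an assumption, not necessarily diskq's hash.
// partitionCount must be greater than zero.
func partitionForKey(key string, partitionCount uint32) uint32 {
	h := fnv.New32a()
	// Write on a hash.Hash never returns an error.
	_, _ = h.Write([]byte(key))
	return h.Sum32() % partitionCount
}
```

In a scheme like this, changing PartitionCount between producer sessions routes the same key to a different partition, so a key's messages only stay together within one partition layout.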

Consumers are notified of a new message on a stream partition in under a millisecond (often in single-digit microseconds, depending on the platform), making diskq useful for near-realtime applications. A consumer blocks until its Messages() channel is read from; it will not read new indexes from the partition segments until the previous message has been received. Consumers can mark offsets and resume from the last known good offset. Some helper types, namely ConsumerGroup and MarkedConsumerGroup, save steps in monitoring for new partitions and marking consumer progress.

Example

To create a new producer, set up vacuuming and push messages, start with diskq.New:

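q, err := diskq.New("/tmp/streams/test-stream", diskq.Options{
  PartitionCount:  3,              // can be 1, or can be many
  RetentionMaxAge: 24 * time.Hour, // only hold 24 hours of messages
})
if err != nil {
  return err
}
defer q.Close()

// vacuum automatically every 5 seconds until this function returns
done := make(chan struct{})
defer close(done) // runs before q.Close(), stopping the vacuum loop first
go func() {
  ticker := time.NewTicker(5 * time.Second)
  defer ticker.Stop()
  for {
    select {
    case <-done:
      return
    case <-ticker.C:
      _ = q.Vacuum()
    }
  }
}()

// Data is any []byte; serialize your own types however you like
_, _, err = q.Push(diskq.Message{PartitionKey: "customer-00", Data: []byte("hello world!")})
if err != nil {
  return err
}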

To read messages, you can use diskq.OpenConsumerGroup, which saves the steps of enumerating all the partitions and merging the messages from each:


c, err := diskq.OpenConsumerGroup("/tmp/streams/test-stream", diskq.ConsumerGroupOptions{})
if err != nil {
  return err
}
defer c.Close()

for {
  select {
  case msg, ok := <-c.Messages():
    if !ok {
      return nil
    }
    fmt.Println(string(msg.Data))
  case err, ok := <-c.Errors():
    if !ok {
      return nil
    }
    fmt.Fprintf(os.Stderr, "err: %+v\n", err)
  }
}

Stream file organization

${DATA_PATH}/
  owner
  parts/
    000000/
      00000000000000000000.data
      00000000000000000000.index
      00000000000000000000.timeindex
    000001/
      00000000000000000000.data
      00000000000000000000.index
      00000000000000000000.timeindex
      ...
    000002/
      00000000000000000000.data
      00000000000000000000.index
      00000000000000000000.timeindex
      ...

A diskq stream is rooted in a single directory.

Within the directory there is an owner file if there is a current active producer, which prevents another producer from being created. The owner file contains a single UUID that corresponds to the *diskq.Diskq instance that is the active producer for that stream.

In addition to the owner file there is a parts directory that contains a sub-directory for each partition, named with the partition index zero-padded to six digits (e.g. 000003).

Within each partition sub-directory there are one or many triplets of files, each triplet corresponding to a "segment":

  • A .data file which contains binary representations of each message (more on this representation below.)
  • A .index file that contains a stream of triplets of uint64 values corresponding to each message at an offset: [offset|bytes_offset_from_start|message_size_bytes]
  • A .timeindex file that contains a stream of pairs of uint64 values corresponding to each message at an offset: [offset|timestamp_nanos]

All three files of a segment share a prefix: the first offset of that segment, zero-padded to twenty digits, e.g. 00000000000000000025 for a segment that starts at offset 25 of the partition. The last segment of a partition is referred to as the "active" segment, and is the segment currently being written to.

Within each segment, individual pieces of data are referred to as Messages. A message is represented on disk in the segment data file as follows:

  • A varuint for the size of the partition key in bytes.
  • A byte array of that given size holding the partition key data.
  • A uint64 timestamp in nanos for the message timestamp.
  • A varuint for the size of the data in bytes.
  • A byte array of that given size holding the message data.

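As a result, a message's minimum size in the data file is 10 bytes: a 1-byte varuint for an empty partition key, the 8-byte timestamp, and a 1-byte varuint for empty data. In practice the partition key is not empty, because an unset key is assigned a uuidv4 when the message is pushed.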

Vacuuming

Because the data for a partition is broken up into segments of configurable size, we can cull old data without interrupting publishing to the active segment.

Vacuuming operates on one segment at a time, so there is a trade-off: a small segment size lets vacuuming hold closer to the retention limits, but creates many more segments.

When vacuuming evaluates segments, the entire segment must be past the cutoff, so some extra data is typically kept around until a later vacuum pass can cull the oldest segment in full.

diskq cli

Included in the repository is a CLI tool to read data directories on disk, force vacuuming of partitions, display stats about a stream, and write new offsets to a stream.

Documentation

Overview

Package diskq provides a local filesystem streaming system similar to Kafka.

A simplified example might be:

    producer, _ := diskq.New("streams", diskq.Options{})
    _, _, _ = producer.Push(diskq.Message{Data: []byte("hello world!")})

Here we initialize a diskq producer and open it for writing.

We then push a message into it. Because the message has no partition key, it is assigned a random uuidv4 key, so it lands in an effectively random one of the partitions (3 by default).

We can then consume messages from outside the producing process simply by listening for filesystem write events using consumers.

Index

Constants

const (
	// ExtData is the extension of segment data files, including the leading dot.
	ExtData = ".data"
	// ExtIndex is the extension of segment index files, including the leading dot.
	ExtIndex = ".index"
	// ExtTimeIndex is the extension of segment timeindex files, including the leading dot.
	ExtTimeIndex = ".timeindex"
)
const (
	// DefaultPartitionCount is the default partition count.
	DefaultPartitionCount = 3
	// DefaultSegmentSizeBytes is the default segment size in bytes.
	DefaultSegmentSizeBytes = 32 * 1024 * 1024 // 32mb
)

Variables

var SegmentIndexSizeBytes = binary.Size(SegmentIndex{})

SegmentIndexSizeBytes is the size in bytes of an entry in the segment index.

var SegmentTimeIndexSizeBytes = binary.Size(SegmentTimeIndex{})

SegmentTimeIndexSizeBytes is the size in bytes of a segment time index element.

Functions

func Decode

func Decode(m *Message, r io.Reader) (err error)

Decode reads a message out of the reader.

func Encode

func Encode(m Message, wr io.Writer) (err error)

Encode writes a message out into a writer.

func FormatPartitionIndexForPath

func FormatPartitionIndexForPath(partitionIndex uint32) string

FormatPartitionIndexForPath returns a partition index as a string.

It's used as the final token for the path of a partition on disk.

func FormatPathForMarkedConsumerGroupOffsetMarker

func FormatPathForMarkedConsumerGroupOffsetMarker(dataPath, groupName string, partitionIndex uint32) string

FormatPathForMarkedConsumerGroupOffsetMarker formats the path of the offset marker file for a given marked consumer group and partition.

func FormatPathForPartition

func FormatPathForPartition(path string, partitionIndex uint32) string

FormatPathForPartition formats a path string for an individual partition within the partitions directory of a stream path.

func FormatPathForPartitions

func FormatPathForPartitions(path string) string

FormatPathForPartitions formats a path string for the partitions directory within a stream directory.

func FormatPathForSegment

func FormatPathForSegment(path string, partitionIndex uint32, startOffset uint64) string

FormatPathForSegment formats a path string for a specific segment of a given partition.

func FormatPathForSentinel

func FormatPathForSentinel(path string) string

FormatPathForSentinel formats a path string for a sentinel (or "owner") file for a given stream path.

func FormatPathForSettings

func FormatPathForSettings(path string) string

FormatPathForSettings formats a path string for the settings file.

func FormatStartOffsetForPath

func FormatStartOffsetForPath(startOffset uint64) string

FormatStartOffsetForPath formats a start offset as a string.

It's used as the file basename for a segment's files (e.g. the index, data or timeindex files).

func GetOffsetAfter

func GetOffsetAfter(path string, partitionIndex uint32, after time.Time) (offset uint64, found bool, err error)

GetOffsetAfter returns the next offset after a given timestamp.

If no offset was found after that timestamp, typically because the timestamp is after the "newest" offset, the found bool will be false.

GetOffsetAfter first iterates over the individual partition segments until it finds one whose newest timestamp is after the given after timestamp, then uses a binary search over the individual timestamps in that segment's time index until the correct offset is found.
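The binary-search step can be sketched over a single segment's time index. This assumes the entries are sorted by timestamp (which only holds if message timestamps never go backwards) and that "after" means strictly later; timeIndexEntry and offsetAfter are hypothetical names, not the package's implementation.

```go
package main

import "sort"

// timeIndexEntry mirrors a .timeindex pair: [offset | timestamp_nanos].
type timeIndexEntry struct {
	Offset         uint64
	TimestampNanos uint64
}

// offsetAfter returns the offset of the first entry whose timestamp is
// strictly after afterNanos, or found=false if every entry is at or before it.
func offsetAfter(entries []timeIndexEntry, afterNanos uint64) (offset uint64, found bool) {
	i := sort.Search(len(entries), func(i int) bool {
		return entries[i].TimestampNanos > afterNanos
	})
	if i == len(entries) {
		return 0, false
	}
	return entries[i].Offset, true
}
```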

func GetPartitionSegmentStartOffsets

func GetPartitionSegmentStartOffsets(path string, partitionIndex uint32) (output []uint64, err error)

GetPartitionSegmentStartOffsets gets the start offsets of a given partition as derived from the filenames of the segments within a partition's data directory.

func GetPartitionSizeBytes

func GetPartitionSizeBytes(path string, partitionIndex uint32, skipActiveSegment bool) (sizeBytes int64, err error)

GetPartitionSizeBytes gets the size in bytes of a partition by path and partition index.

It does this by iterating over the segment files for the partition and stat-ing the files.

func GetPartitions

func GetPartitions(path string) ([]uint32, error)

GetPartitions returns the partition indices of the partitions in a given stream by path.

func GetSegmentNewestOffset

func GetSegmentNewestOffset(path string, partitionIndex uint32, startOffset uint64) (newestOffset uint64, err error)

GetSegmentNewestOffset opens a segment index file as indicated by the path, partitionIndex and startOffset for the file and returns the newest (or last) offset in the index.

func GetSegmentNewestOldestOffsetFromTimeIndexHandle

func GetSegmentNewestOldestOffsetFromTimeIndexHandle(f *os.File) (oldestOffset, newestOffset uint64, err error)

GetSegmentNewestOldestOffsetFromTimeIndexHandle returns the oldest (or first) and newest (or last) offsets from an already open file handle to a timeindex. It seeks to the 0th byte and reads the first timeindex entry, then seeks SegmentTimeIndexSizeBytes bytes back from the end of the file and reads the last timeindex entry.
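The seek-to-start, seek-from-end technique reads exactly two entries regardless of file size. In this sketch an in-memory *bytes.Reader stands in for the *os.File, little-endian byte order is an assumption, and the helper names are hypothetical.

```go
package main

import (
	"bytes"
	"encoding/binary"
	"io"
)

// timeIndexEntrySize is the width of one [offset | timestamp_nanos] pair.
const timeIndexEntrySize = 16

// firstLastOffsets reads the offset of the first entry (seek to 0) and of
// the last entry (seek timeIndexEntrySize bytes back from the end).
func firstLastOffsets(r io.ReadSeeker) (oldest, newest uint64, err error) {
	var buf [timeIndexEntrySize]byte
	if _, err = r.Seek(0, io.SeekStart); err != nil {
		return
	}
	if _, err = io.ReadFull(r, buf[:]); err != nil {
		return
	}
	oldest = binary.LittleEndian.Uint64(buf[:8])
	if _, err = r.Seek(-timeIndexEntrySize, io.SeekEnd); err != nil {
		return
	}
	if _, err = io.ReadFull(r, buf[:]); err != nil {
		return
	}
	newest = binary.LittleEndian.Uint64(buf[:8])
	return
}

// newTimeIndex builds an in-memory time index from (offset, timestamp)
// pairs, standing in for an open timeindex file.
func newTimeIndex(pairs [][2]uint64) *bytes.Reader {
	var buf []byte
	for _, p := range pairs {
		buf = binary.LittleEndian.AppendUint64(buf, p[0])
		buf = binary.LittleEndian.AppendUint64(buf, p[1])
	}
	return bytes.NewReader(buf)
}
```

An empty time index makes the first read fail with io.EOF, which the caller has to handle.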

func GetSegmentNewestTimestamp

func GetSegmentNewestTimestamp(path string, partitionIndex uint32, startOffset uint64) (ts time.Time, err error)

GetSegmentNewestTimestamp opens a segment timeindex file as indicated by the path, partitionIndex and startOffset for the file and returns the newest (or last) timestamp in the index.

func GetSegmentOldestNewestTimestamps

func GetSegmentOldestNewestTimestamps(path string, partitionIndex uint32, startOffset uint64) (oldest, newest time.Time, err error)

GetSegmentOldestNewestTimestamps opens a segment timeindex file as indicated by the path, partitionIndex and startOffset for the file and returns both the oldest (or first) and the newest (or last) timestamp in the index.

func GetSegmentOldestTimestamp

func GetSegmentOldestTimestamp(path string, partitionIndex uint32, startOffset uint64) (oldest time.Time, err error)

GetSegmentOldestTimestamp opens a segment timeindex file as indicated by the path, partitionIndex and startOffset for the file and returns the oldest (or first) timestamp in the index.

func GetSegmentStartOffsetForOffset

func GetSegmentStartOffsetForOffset(entries []uint64, offset uint64) (uint64, bool)

GetSegmentStartOffsetForOffset searches a given list of entries for a given offset such that the entry returned corresponds to the start offset of the segment file that _would_ contain that offset.
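The search amounts to finding the largest start offset that is less than or equal to the requested offset. A minimal sketch with sort.Search, assuming the start offsets are sorted ascending; segmentStartForOffset is a hypothetical name, not the package's implementation.

```go
package main

import "sort"

// segmentStartForOffset returns the start offset of the segment that would
// contain offset, i.e. the largest start offset <= offset. ok is false when
// offset precedes every segment (for example, it was already vacuumed).
func segmentStartForOffset(starts []uint64, offset uint64) (start uint64, ok bool) {
	// i is the index of the first start offset strictly greater than offset.
	i := sort.Search(len(starts), func(i int) bool { return starts[i] > offset })
	if i == 0 {
		return 0, false
	}
	return starts[i-1], true
}
```

An offset past the newest one written maps to the last (active) segment, which is the segment that would contain it once it is written.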

func OpenSegmentFileForRead

func OpenSegmentFileForRead(path string, partitionIndex uint32, startOffset uint64, ext string) (*os.File, error)

OpenSegmentFileForRead opens a segment file with a given extension in "read" mode with the correct flags.

func ParseSegmentOffsetFromPath

func ParseSegmentOffsetFromPath(path string) (uint64, error)

ParseSegmentOffsetFromPath parses the segment offset from a given full file path.

func Read

func Read(path string, partitionIndex uint32, fn func(MessageWithOffset) error) error

Read reads a given diskq at a given path and a given partition index, and calls a given function for each message read from the partition.

Read is optimized for reading from the oldest offset to the newest offset of the given partition, and does not wait for new messages to be published.

As a result, Read is useful in situations where you want to bootstrap a system from the data on disk quickly, as there is no overhead of pushing messages into channels.

It is parameterized by partition because each partition is strictly ordered, but there is no defined ordering across partitions.

If you want to read all partitions of a diskq, you can first enumerate them with the helper GetPartitions, which returns a []uint32 whose elements you can pass as the partitionIndex parameter.

Types

type Consumer

type Consumer struct {
	// contains filtered or unexported fields
}

Consumer handles reading messages from a given partition.

Consumers can start at known offsets, or at queried offsets based on the start behavior.

Consumers can also end at specific offsets, or end when the last offset is read in the active segment, or just block and wait for new offsets to be written to the active segment (this is the default behavior).

func OpenConsumer

func OpenConsumer(path string, partitionIndex uint32, options ConsumerOptions) (*Consumer, error)

OpenConsumer creates a new consumer for a given path, partition and options.

There can be many consumers for a given partition active at a given time, and you can consume partitions that may be written to by an active producer.

func (*Consumer) Close

func (c *Consumer) Close() error

Close closes the consumer and frees any held resources like file handles.

Generally close is called when the read loop exits on its own, but you can stop a consumer early if you call `Close` before the read loop completes.

Once a consumer is closed it cannot be re-used; to start consuming again, open a new consumer with `OpenConsumer(...)`.

func (*Consumer) Errors

func (c *Consumer) Errors() <-chan error

Errors returns a channel that carries errors returned when reading messages through the lifetime of the consumer.

Generally at most one error will be pushed into this channel, and the channel is buffered so you can read it out later.

func (*Consumer) Messages

func (c *Consumer) Messages() <-chan MessageWithOffset

Messages is how you will read the messages the consumer sees as it reads the segments for the given partition.

The messages channel is unbuffered, so you must read the message for the consumer to be able to continue.

type ConsumerEndBehavior

type ConsumerEndBehavior uint8

ConsumerEndBehavior controls how the consumer behaves when the last offset is read in the active segment.

const (
	// ConsumerEndBehaviorWait is a consumer end behavior that
	// will cause the consumer to wait for new offsets when it reaches
	// the end of the active segment.
	ConsumerEndBehaviorWait ConsumerEndBehavior = iota
	// ConsumerEndBehaviorAtOffset is a consumer end behavior that
	// will cause the consumer to exit when it reaches a given offset.
	ConsumerEndBehaviorAtOffset
	// ConsumerEndBehaviorClose is a consumer end behavior that
	// will cause the consumer to exit when it reaches the
	// end of the active segment.
	ConsumerEndBehaviorClose
)

func ParseConsumerEndBehavior

func ParseConsumerEndBehavior(raw string) (endBehavior ConsumerEndBehavior, err error)

ParseConsumerEndBehavior parses a given raw string as a consumer end behavior.

func (ConsumerEndBehavior) String

func (ceb ConsumerEndBehavior) String() string

String returns a string form of the consumer end behavior.

type ConsumerGroup

type ConsumerGroup struct {
	// contains filtered or unexported fields
}

ConsumerGroup is a consumer that reads from all partitions at once, and periodically scans for new partitions, or stops reading partitions that may have been deleted.

Partitions are read from their start offset by default, and you can only control the end behavior in practice.

func OpenConsumerGroup

func OpenConsumerGroup(dataPath string, options ConsumerGroupOptions) (*ConsumerGroup, error)

OpenConsumerGroup opens a new consumer group.

A consumer group reads from all partitions at once, and scans for new partitions if they're added, mapping each partition to an underlying consumer.

ConsumerGroups are not cooperative; they do not automatically split assignment of partitions. If you want to read a subset of partitions, you can configure which partitions the consumer will read using the [ConsumerGroupOptions.ShouldConsume] function.
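One way to split partitions across several non-cooperative consumer groups is a static modulo filter passed as ShouldConsume. The helper below is a hypothetical sketch, not part of diskq; it only relies on the documented func(uint32) bool signature.

```go
package main

// shardFilter returns a ShouldConsume-style callback that claims the
// partitions whose index modulo workerCount equals workerIndex. Running one
// consumer group per worker with distinct workerIndex values covers every
// partition exactly once. workerCount must be greater than zero.
func shardFilter(workerIndex, workerCount uint32) func(uint32) bool {
	return func(partitionIndex uint32) bool {
		return partitionIndex%workerCount == workerIndex
	}
}
```

You would pass it as diskq.ConsumerGroupOptions{ShouldConsume: shardFilter(0, 2)} in one process and shardFilter(1, 2) in another. Because the assignment is static, partitions owned by a stopped worker are not picked up by the others.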

func (*ConsumerGroup) Close

func (cg *ConsumerGroup) Close() error

Close closes the consumer group and all the consumers it may have started.

Close is safe to call more than once.

func (*ConsumerGroup) Errors

func (cg *ConsumerGroup) Errors() <-chan error

Errors returns a channel that will receive errors from the individual consumers.

You should use the `err, ok := <-cg.Errors()` form of channel reads when reading this channel to detect if the channel is closed, which indicates either that all of the consumer group's consumers have reached the end of their respective partitions with the consumer end behavior of "close", or that the consumer group itself is closed.

func (*ConsumerGroup) ID

func (cg *ConsumerGroup) ID() UUID

ID returns a unique identifier for this consumer group.

func (*ConsumerGroup) Messages

func (cg *ConsumerGroup) Messages() <-chan MessageWithOffset

Messages returns a channel that will receive messages from the individual consumers in the order they're read.

You should use the `msg, ok := <-cg.Messages()` form of channel reads when reading this channel to detect if the channel is closed, which indicates either that all of the consumer group's consumers have reached the end of their respective partitions with the consumer end behavior of "close", or that the consumer group itself is closed.

type ConsumerGroupOptions

type ConsumerGroupOptions struct {
	// ShouldConsume is a callback that is called when a partition is
	// seen, and should return "true" if the partition should be consumed
	// by this consumer group.
	//
	// If the callback is not set, it is assumed the consumer group will
	// read from all partitions.
	ShouldConsume func(uint32) bool

	// OptionsForConsumer is a callback that given a partition index
	// expects a ConsumerOptions or error to be returned.
	//
	// If an error is returned, the entire consumer group fails
	// and that error is pushed into the errors channel of the consumer group.
	OptionsForConsumer func(uint32) (ConsumerOptions, error)

	// OnCloseConsumer is a callback that is called when a consumer is closed.
	//
	// A consumer is closed when the group itself is closed, but can also happen
	// if the underlying partition the consumer is reading is deleted, though
	// in practice this should almost never happen.
	OnCloseConsumer func(uint32) error

	// PartitionScanInterval is the interval the consumer group will use
	// to scan for new partitions.
	PartitionScanInterval time.Duration
}

ConsumerGroupOptions are extra options for consumer groups.

func (ConsumerGroupOptions) PartitionScanIntervalOrDefault

func (cg ConsumerGroupOptions) PartitionScanIntervalOrDefault() time.Duration

PartitionScanIntervalOrDefault returns the partition scan interval or a default.

type ConsumerOptions

type ConsumerOptions struct {
	StartBehavior ConsumerStartBehavior
	StartOffset   uint64
	EndBehavior   ConsumerEndBehavior
	EndOffset     uint64
}

ConsumerOptions are options that control how consumers behave.

type ConsumerStartBehavior

type ConsumerStartBehavior uint8

ConsumerStartBehavior controls how the consumer determines the first offset it will read from.

const (
	// ConsumerStartBehaviorOldest is a consumer start behavior that
	// starts a consumer at the absolute oldest offset present on disk.
	ConsumerStartBehaviorOldest ConsumerStartBehavior = iota
	// ConsumerStartBehaviorAtOffset is a consumer start behavior that
	// starts a consumer at a given offset.
	ConsumerStartBehaviorAtOffset
	// ConsumerStartBehaviorActiveSegmentOldest is a consumer start behavior that
	// starts a consumer at the beginning of the active segment for the partition.
	ConsumerStartBehaviorActiveSegmentOldest
	// ConsumerStartBehaviorNewest is a consumer start behavior that
	// starts a consumer at the end of the active segment, or the newest offset.
	ConsumerStartBehaviorNewest
)

func ParseConsumerStartBehavior

func ParseConsumerStartBehavior(raw string) (startBehavior ConsumerStartBehavior, err error)

ParseConsumerStartBehavior parses a raw string as a consumer start behavior.

func (ConsumerStartBehavior) String

func (csb ConsumerStartBehavior) String() string

String returns a string form of the consumer start behavior.

type Diskq

type Diskq struct {
	// contains filtered or unexported fields
}

Diskq is the root struct of the diskq.

It could be thought of primarily as the "producer" in the streaming system; you will use this type to "Push" messages into the partitions.

Close will release open file handles that are held by the partitions.

func New

func New(path string, cfg Options) (*Diskq, error)

New opens or creates a diskq based on a given path and config.

The `Diskq` type itself should be thought of as a producer with exclusive access to write to the data directory named in the config.

The path should be the location on disk you want to hold all the data associated with the diskq; you cannot split the diskq up across multiple disparate paths.

The [cfg] should be customized for what you'd like to use for this particular "session" of the diskq producer. You may, for example, change the partition count between sessions, or change the segment size and retention settings. If you'd like to reuse a previous session's options for the diskq, you can use the helper MaybeReadOptions, and pass the returned options from that helper as the [cfg] argument.

If the diskq exists on disk, existing partitions will be opened, and if the configured partition count is greater than the number of existing partitions, new empty partitions will be created. Existing "orphaned" partitions will be left in place until vacuuming potentially removes them.

func (*Diskq) Close

func (dq *Diskq) Close() error

Close releases any resources associated with the diskq and removes the sentinel file.

func (*Diskq) ID

func (dq *Diskq) ID() UUID

ID returns the id of the diskq producer.

func (*Diskq) Options

func (dq *Diskq) Options() Options

Options returns the config of the diskq producer.

func (*Diskq) Path

func (dq *Diskq) Path() string

Path returns the path of the diskq producer.

func (*Diskq) Push

func (dq *Diskq) Push(value Message) (partition uint32, offset uint64, err error)

Push pushes a new message into the diskq, returning the partition it was written to, the offset it was written to, and any errors that were generated while writing the message.

func (*Diskq) Sync

func (dq *Diskq) Sync() error

Sync calls `fsync` on each of the partition file handles.

This has the effect of flushing any buffered data to disk.

You shouldn't ever need to call this, but it's here if you do need to.

func (*Diskq) Vacuum

func (dq *Diskq) Vacuum() (err error)

Vacuum deletes old segments from all partitions if retention is configured.

Vacuum operates on the partitions as they're found on disk; specifically, the currently configured partition count is ignored in favor of the extant partition list.

type MarkedConsumerGroup

type MarkedConsumerGroup struct {
	// contains filtered or unexported fields
}

MarkedConsumerGroup is a wrapped consumer group that has offset markers automatically configured.

func OpenMarkedConsumerGroup

func OpenMarkedConsumerGroup(dataPath, groupName string, options MarkedConsumerGroupOptions) (*MarkedConsumerGroup, error)

OpenMarkedConsumerGroup returns a new marked consumer group that reads a given path.

A marked consumer group lets you open a consumer group with automatic progress tracking. It wraps a standard consumer group with offset markers for each partition at a known path within the diskq data directory. If you open a consumer group with the same name after it has already recorded some offsets, it will resume the consumer for each partition at the previously recorded offset, overriding [ConsumerOptions.StartBehavior] and [ConsumerOptions.StartOffset] on the consumer options returned by the [ConsumerGroupOptions.OptionsForConsumer] delegate.

To manually record the offset for a given message as successfully processed, use the MarkedConsumerGroup.SetLatestOffset helper function on the MarkedConsumerGroup struct itself.

Alternatively, you can enable [MarkedConsumerGroupOptions.AutoSetLatestOffset] on the options for the marked consumer group, which automatically sets the latest offset for a message's partition when the consumer group reads the message, before it's passed to your channel receiver. This is not enabled by default because it is dangerous to assume the message was processed successfully, but it is implemented here for convenience.

func (*MarkedConsumerGroup) Close

func (m *MarkedConsumerGroup) Close() error

Close closes the consumer.

func (*MarkedConsumerGroup) Errors

func (m *MarkedConsumerGroup) Errors() <-chan error

Errors returns the errors channel.

As with consumer groups, and consumers generally, you should use the

err, ok := <-mcg.Errors()

form of a channel read on this channel to detect when the channel is closed.

func (*MarkedConsumerGroup) Messages

func (m *MarkedConsumerGroup) Messages() <-chan MessageWithOffset

Messages returns the messages channel.

As with consumer groups, and consumers generally, you should use the

msg, ok := <-mcg.Messages()

form of a channel read on this channel to detect when the channel is closed.

func (*MarkedConsumerGroup) SetLatestOffset

func (m *MarkedConsumerGroup) SetLatestOffset(partitionIndex uint32, offset uint64)

SetLatestOffset sets the latest offset for a given partition.

type MarkedConsumerGroupOptions

type MarkedConsumerGroupOptions struct {
	ConsumerGroupOptions
	OffsetMarkerOptions
	AutoSetLatestOffset bool
}

MarkedConsumerGroupOptions are options for a marked consumer group.

type Message

type Message struct {
	// PartitionKey is used to assign the message to a partition.
	//
	// The key will be hashed and then bucketed by the
	// number of active partitions.
	//
	// If unset, it will be assigned a uuidv4.
	PartitionKey string
	// TimestampUTC holds the timestamp that is used in the timeindex.
	//
	// If unset it will be assigned the current time in UTC.
	TimestampUTC time.Time
	// Data is the message's contents.
	Data []byte
}

Message is a single element of data written to a diskq.

It includes a [Message.PartitionKey] used to assign it to a partition, and a TimestampUTC to hold how the message should be ordered within the time index of the partition.

When you push a new message, the [Message.PartitionKey] will be assigned a uuidv4 if unset, and the [Message.TimestampUTC] will be set to the current time in UTC if unset.

func GetOffset

func GetOffset(path string, partitionIndex uint32, offset uint64) (m Message, ok bool, err error)

GetOffset finds and decodes a message at a given offset for a given partition.

type MessageWithOffset

type MessageWithOffset struct {
	Message
	// PartitionIndex is the partition the message was read from.
	PartitionIndex uint32
	// Offset is the offset within the partition the message was read from.
	Offset uint64
}

MessageWithOffset is a special wrapping type for messages that adds the partition index and the offset of messages read by consumers.

type OffsetMarker

type OffsetMarker struct {
	// contains filtered or unexported fields
}

OffsetMarker is a file backed store for the latest offset seen by a consumer.

It is meant to be a durable mechanism to resume work at the last committed offset for a given partition.

OffsetMarkers do not write to disk immediately; instead they have configurable parameters for when to sync to disk automatically in the background, reducing the number of writes to the file.

You can alternatively call OffsetMarker.Sync directly on this struct, which will write the latest offset to disk.

func OpenOrCreateOffsetMarker

func OpenOrCreateOffsetMarker(path string, options OffsetMarkerOptions) (*OffsetMarker, bool, error)

OpenOrCreateOffsetMarker opens or creates a new offset marker.

It returns both the offset marker struct itself, and a bool "found" if the offset marker was present on disk. You can use this bool to control if and how you apply the offset marker to a consumer you're looking to start. The convenience helper OffsetMarker.ApplyToConsumerOptions has been provided for just this use case.

The path parameter is required and should point to the specific file that will be created to store the latest offset. It should be unique to the consumer it tracks (i.e. unique to the diskq path and partition index).

To store the latest offset processed, call OffsetMarker.SetLatestOffset.

The offset isn't written to disk immediately; instead depending on the autosync options provided, it will write on a ticker with a given duration (e.g. every 500 milliseconds) or after N latest offsets are set.
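The "after N latest offsets are set" policy can be sketched as a counter that tells the caller when to flush. This is a hypothetical illustration of the idea behind AutosyncEveryOffset, not the OffsetMarker implementation; a time-based AutosyncInterval would instead flush from a time.Ticker.

```go
package main

import "sync"

// autosyncCounter records the latest offset in memory and reports when
// the caller should write it to disk: once every `every` updates.
// every == 0 disables this policy.
type autosyncCounter struct {
	mu        sync.Mutex
	every     uint64
	sinceSync uint64
	latest    uint64
}

// SetLatestOffset stores offset and returns true when a sync is due.
func (a *autosyncCounter) SetLatestOffset(offset uint64) (shouldSync bool) {
	a.mu.Lock()
	defer a.mu.Unlock()
	a.latest = offset
	a.sinceSync++
	if a.every > 0 && a.sinceSync >= a.every {
		a.sinceSync = 0
		return true
	}
	return false
}
```

In this sketch the on-disk offset lags by at most N-1 updates, so after a crash up to N-1 already-processed messages would be redelivered.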

The offset is also written to disk regardless of the autosync options when the offset marker is closed.

You can also call OffsetMarker.Sync yourself to write the latest offset to disk on demand though it is discouraged to do this in favor of using autosync.

func (*OffsetMarker) ApplyToConsumerOptions

func (om *OffsetMarker) ApplyToConsumerOptions(options *ConsumerOptions)

ApplyToConsumerOptions configures a given consumer options reference to begin where this offset marker has been recorded to have last left off.

func (*OffsetMarker) Close

func (om *OffsetMarker) Close() error

Close will close the offset marker, flushing the last seen offset marker to disk and closing any other file handles or tickers.

Once an offset marker is closed it cannot be reused.

func (*OffsetMarker) Errors

func (om *OffsetMarker) Errors() <-chan error

Errors returns a channel that will contain errors from writing to the offset marker file.

It is important that you check these errors!

func (*OffsetMarker) LatestOffset

func (om *OffsetMarker) LatestOffset() uint64

LatestOffset returns the latest offset for the offset marker.

func (*OffsetMarker) SetLatestOffset

func (om *OffsetMarker) SetLatestOffset(offset uint64)

SetLatestOffset sets the latest offset for the offset marker.

Calling this function does not write the offset to disk; instead, autosync writes the latest offset to disk on a time-based ticker or after a certain number of offsets are set.

You can also call OffsetMarker.Sync to write the latest offset to disk on demand, but it is recommended that you let autosync write it for you to avoid extra file writes.

The latest offset will be written to disk on OffsetMarker.Close regardless of the autosync settings.

func (*OffsetMarker) Sync

func (om *OffsetMarker) Sync() error

Sync writes the latest offset to disk on demand.

In practice you should not need to call this function; it is better to configure the autosync settings in the offset marker options so that the latest offset is written to disk automatically.

type OffsetMarkerOptions

type OffsetMarkerOptions struct {
	// AutosyncInterval enables autosync, specifically such that
	// every given interval of time the marker will write the latest
	// recorded offset to the disk file.
	//
	// If AutosyncInterval is unset, the offset marker will not
	// create a ticker to sync the latest offset to disk.
	AutosyncInterval time.Duration
	// AutosyncEveryOffset enables autosync, specifically such that
	// every N offsets recorded the marker will sync the latest offset
	// to the disk file.
	//
	// If AutosyncEveryOffset is unset, this autosync type will be skipped.
	AutosyncEveryOffset uint64
}

OffsetMarkerOptions are options for autosyncing an offset marker.

type Options

type Options struct {
	// PartitionCount is the number of partitions to split data across.
	//
	// Each partition is sized and vacuumed separately, and consumers can be opened
	// against individual partitions.
	//
	// Messages are assigned to partitions based on their partition key, with the goal
	// of an even distribution of messages given a randomized partition key.
	//
	// If unset, a default value of 3 for [Options.PartitionCount] will be used.
	PartitionCount uint32 `json:"partition_count,omitempty"`
	// SegmentSizeBytes is the size in bytes of a segment of each partition.
	//
	// When writing new messages, if the partition message data file exceeds
	// this size a new segment will be created, allowing a partition
	// to be split across multiple files with a general goal for
	// the size of the data file. Because the segment is closed when
	// the data file size exceeds the [Options.SegmentSizeBytes] in practice
	// segments will be slightly larger than the setting when they're closed.
	//
	// These segments are also the atomic unit that is deleted when
	// vacuuming the partition; to have a tighter budget for keeping
	// partitions near a given size, you should use a smaller segment size.
	//
	// If unset, a default value of 32MiB for [Options.SegmentSizeBytes] will be used.
	SegmentSizeBytes int64 `json:"segment_size_bytes,omitempty"`
	// RetentionMaxBytes is the maximum size of a partition in bytes
	// as enforced by calling [Diskq.Vacuum]. The size constraint
	// applies to a single partition, and does not consider the
	// active partition size.
	RetentionMaxBytes int64 `json:"retention_max_bytes,omitempty"`
	// RetentionMaxAge is the maximum age of messages in a partition
	// as enforced by calling [Diskq.Vacuum].
	RetentionMaxAge time.Duration `json:"retention_max_age,omitempty"`
}

Options are the options for the diskq.
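The docs state that messages are assigned to partitions deterministically by partition key, aiming for an even spread over randomized keys. A minimal sketch of that idea follows; the choice of FNV-1a and the modulo reduction are assumptions for illustration, and diskq's actual hash may differ. partitionForKey is a hypothetical name.

```go
package main

import (
	"fmt"
	"hash/fnv"
)

// partitionForKey deterministically maps a partition key to an index in
// [0, partitionCount). FNV-1a is an assumed hash, not necessarily diskq's.
func partitionForKey(key string, partitionCount uint32) uint32 {
	if partitionCount == 0 {
		partitionCount = 3 // the documented default for Options.PartitionCount
	}
	h := fnv.New32a()
	_, _ = h.Write([]byte(key)) // writes to a hash.Hash never return an error
	return h.Sum32() % partitionCount
}

func main() {
	for _, key := range []string{"user-1", "user-2", "user-1"} {
		fmt.Printf("%s -> partition %d\n", key, partitionForKey(key, 3))
	}
}
```

Because the mapping depends only on the key and the partition count, changing PartitionCount for an existing stream would send the same key to a different partition.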

func MaybeReadOptions

func MaybeReadOptions(path string) (cfg Options, found bool, err error)

MaybeReadOptions tries to read the options that were written to the default location when the diskq was last accessed.

You can then pass these options to the constructor for the diskq.

func (Options) PartitionCountOrDefault

func (c Options) PartitionCountOrDefault() uint32

PartitionCountOrDefault returns the partition count or a default value.

The default value is 3 partitions.

func (Options) SegmentSizeBytesOrDefault

func (c Options) SegmentSizeBytesOrDefault() int64

SegmentSizeBytesOrDefault returns the partition segment size in bytes or a default.

The default value is 32MiB.
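The two "OrDefault" helpers follow a simple fallback pattern. A hypothetical sketch using the documented defaults (3 partitions, 32MiB segments) is shown below; treating any non-positive value as unset is an assumption, and the real methods may only check for zero.

```go
package main

import "fmt"

const (
	defaultPartitionCount   uint32 = 3        // documented default
	defaultSegmentSizeBytes int64  = 32 << 20 // 32MiB, documented default
)

// options is a trimmed, hypothetical mirror of diskq's Options.
type options struct {
	PartitionCount   uint32
	SegmentSizeBytes int64
}

// partitionCountOrDefault falls back to 3 when PartitionCount is unset.
func (c options) partitionCountOrDefault() uint32 {
	if c.PartitionCount > 0 {
		return c.PartitionCount
	}
	return defaultPartitionCount
}

// segmentSizeBytesOrDefault falls back to 32MiB when SegmentSizeBytes is unset.
func (c options) segmentSizeBytesOrDefault() int64 {
	if c.SegmentSizeBytes > 0 {
		return c.SegmentSizeBytes
	}
	return defaultSegmentSizeBytes
}

func main() {
	var unset options
	fmt.Println(unset.partitionCountOrDefault(), unset.segmentSizeBytesOrDefault())
	custom := options{PartitionCount: 8, SegmentSizeBytes: 1 << 20}
	fmt.Println(custom.partitionCountOrDefault(), custom.segmentSizeBytesOrDefault())
}
```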

type Partition

type Partition struct {
	// contains filtered or unexported fields
}

Partition organizes the time ordered series of messages that make up a stream.

The partition is responsible for rotating the segment it actively writes to, and for implementing the vacuum steps for itself.
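Segment rotation, as described for Options.SegmentSizeBytes, can be sketched in memory. The partition and segment types below are hypothetical; checking the size after each write (and opening the next segment eagerly) is an assumed ordering that reproduces the documented effect that closed segments end up slightly larger than the configured size.

```go
package main

import "fmt"

// segment is a hypothetical in-memory stand-in for a segment's data file.
type segment struct {
	startOffset uint64
	sizeBytes   int64
}

// partition tracks its segments; the last one is the active segment.
type partition struct {
	segmentSizeBytes int64
	segments         []segment
	nextOffset       uint64
}

// write appends a message to the active segment and rotates to a new segment
// once the active data file has exceeded segmentSizeBytes. Because the check
// happens after the write, a closed segment can exceed the limit slightly.
func (p *partition) write(message []byte) uint64 {
	if len(p.segments) == 0 {
		p.segments = append(p.segments, segment{startOffset: p.nextOffset})
	}
	active := &p.segments[len(p.segments)-1]
	active.sizeBytes += int64(len(message))
	offset := p.nextOffset
	p.nextOffset++
	if active.sizeBytes > p.segmentSizeBytes {
		p.segments = append(p.segments, segment{startOffset: p.nextOffset})
	}
	return offset
}

func main() {
	p := &partition{segmentSizeBytes: 10}
	for i := 0; i < 6; i++ {
		p.write([]byte("abcd")) // 4-byte messages
	}
	for _, s := range p.segments {
		fmt.Printf("segment starting at offset %d: %d bytes\n", s.startOffset, s.sizeBytes)
	}
}
```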

func (*Partition) Close

func (p *Partition) Close() error

func (*Partition) Sync

func (p *Partition) Sync() error

Sync calls fsync on the underlying active segment file handles for the partition.

func (*Partition) Vacuum

func (p *Partition) Vacuum() error

Vacuum removes old segments from the partitions as defined by the diskq Options fields [Options.RetentionMaxBytes] and [Options.RetentionMaxAge].

You will almost never need to call this function directly, and should instead call Diskq.Vacuum.

func (*Partition) Write

func (p *Partition) Write(message Message) (offset uint64, err error)

Write writes a given message to the active segment of the partition.

You will almost never need to call this directly, instead call Diskq.Push.

type PartitionStats

type PartitionStats struct {
	PartitionIndex uint32

	OldestOffset       uint64
	OldestOffsetActive uint64
	NewestOffset       uint64

	OldestTimestamp       time.Time
	OldestTimestampActive time.Time
	NewestTimestamp       time.Time

	Segments  int
	SizeBytes uint64
}

PartitionStats holds information about a diskq stream partition.

type Segment

type Segment struct {
	// contains filtered or unexported fields
}

Segment represents an individual block of data within a partition.

It is principally responsible for the actual "writing" of data to disk.

func (*Segment) Close

func (s *Segment) Close() error

Close closes the segment.

func (*Segment) Sync

func (s *Segment) Sync() error

Sync calls fsync on the (3) underlying file handles for the segment.

You should almost never need to call this, but it's here in case you do want to.

type SegmentIndex

type SegmentIndex [3]uint64

SegmentIndex is an individual element of an index file.

It is implemented as (3) uint64 values:

- 0: the offset of the message it represents.
- 1: the offset in bytes from the start of the data file at which the message appears.
- 2: the size of the message in bytes.
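Because every index element is three uint64 values, each entry has a fixed width of 24 bytes, so entry i can be located at byte i*24 without scanning. The sketch below illustrates this with a hypothetical segmentIndex type mirroring the documented layout; the little-endian byte order is an assumption, not a statement about diskq's on-disk format.

```go
package main

import (
	"bytes"
	"encoding/binary"
	"fmt"
)

// segmentIndex mirrors the documented layout: [offset, offsetBytes, sizeBytes].
type segmentIndex [3]uint64

// segmentIndexSize is the fixed width of one entry: 3 * 8 bytes.
const segmentIndexSize = 24

// encodeIndex serializes entries back to back (little-endian is assumed).
func encodeIndex(entries []segmentIndex) ([]byte, error) {
	var buf bytes.Buffer
	for _, e := range entries {
		if err := binary.Write(&buf, binary.LittleEndian, e); err != nil {
			return nil, err
		}
	}
	return buf.Bytes(), nil
}

// indexAt decodes the i-th fixed-width entry directly by position.
func indexAt(data []byte, i int) (segmentIndex, error) {
	start := i * segmentIndexSize
	if start < 0 || start+segmentIndexSize > len(data) {
		return segmentIndex{}, fmt.Errorf("index entry %d out of range", i)
	}
	var e segmentIndex
	err := binary.Read(bytes.NewReader(data[start:start+segmentIndexSize]), binary.LittleEndian, &e)
	return e, err
}

func main() {
	data, err := encodeIndex([]segmentIndex{{0, 0, 5}, {1, 5, 6}})
	if err != nil {
		panic(err)
	}
	entry, err := indexAt(data, 1)
	if err != nil {
		panic(err)
	}
	fmt.Println(len(data), entry)
}
```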

func NewSegmentIndex

func NewSegmentIndex(offset uint64, offsetBytes uint64, sizeBytes uint64) SegmentIndex

NewSegmentIndex returns a new segment index element.

func (SegmentIndex) GetOffset

func (si SegmentIndex) GetOffset() uint64

GetOffset gets the offset this entry corresponds to.

func (SegmentIndex) GetOffsetBytes

func (si SegmentIndex) GetOffsetBytes() uint64

GetOffsetBytes gets the offset in bytes from the start of the data file at which this entry appears.

func (SegmentIndex) GetSizeBytes

func (si SegmentIndex) GetSizeBytes() uint64

GetSizeBytes returns the size in bytes of the data file entry that corresponds to this offset.

It is used when reading the data file; specifically, this many bytes are allocated to read the data file entry.

type SegmentTimeIndex

type SegmentTimeIndex [2]uint64

SegmentTimeIndex is a fixed width element of a time index file.

It is composed of (2) uint64 values:

- 0: the offset of the message this entry refers to.
- 1: the timestamp of the message, represented as unix nanos.

func NewSegmentTimeIndex

func NewSegmentTimeIndex(offset uint64, timestamp time.Time) SegmentTimeIndex

NewSegmentTimeIndex returns a new segment time index struct.

func (SegmentTimeIndex) GetOffset

func (sti SegmentTimeIndex) GetOffset() uint64

GetOffset gets the offset the time index entry corresponds to.

func (SegmentTimeIndex) GetTimestampUTC

func (sti SegmentTimeIndex) GetTimestampUTC() time.Time

GetTimestampUTC gets the timestamp (recorded as unix nanos) from the index entry as a UTC time.

type Stats

type Stats struct {
	Path         string
	SizeBytes    uint64
	InUse        bool
	TotalOffsets uint64
	Age          time.Duration
	Partitions   []PartitionStats
}

Stats holds information about a diskq stream.

func GetStats

func GetStats(path string) (*Stats, error)

GetStats returns stats about a stream rooted at a given path.

type UUID

type UUID [16]byte

UUID is a unique id.

func UUIDv4

func UUIDv4() (output UUID)

UUIDv4 returns a new version 4 unique identifier.
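A version 4 UUID is 16 random bytes with the version and variant bits fixed as specified in RFC 4122. The sketch below shows that construction with crypto/rand; the uuid type and uuidV4 function are hypothetical stand-ins (unlike diskq's UUIDv4, this version returns an error), and printing as plain hex is only for illustration.

```go
package main

import (
	"crypto/rand"
	"encoding/hex"
	"fmt"
)

type uuid [16]byte

// uuidV4 fills 16 random bytes, then sets the version and variant bits.
func uuidV4() (uuid, error) {
	var id uuid
	if _, err := rand.Read(id[:]); err != nil {
		return uuid{}, err
	}
	id[6] = (id[6] & 0x0f) | 0x40 // version 4 in the high nibble of byte 6
	id[8] = (id[8] & 0x3f) | 0x80 // RFC 4122 variant (binary 10) in byte 8
	return id, nil
}

// isZero reports whether the identifier is unset.
func (id uuid) isZero() bool { return id == uuid{} }

func main() {
	id, err := uuidV4()
	if err != nil {
		panic(err)
	}
	fmt.Println(hex.EncodeToString(id[:]), id.isZero())
}
```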

func (UUID) IsZero

func (id UUID) IsZero() bool

IsZero returns whether the identifier is unset.

func (UUID) Short

func (id UUID) Short() string

Short returns the short hex representation of the id.

In practice this is the last ~8 bytes of the identifier.

func (UUID) String

func (id UUID) String() string

String returns the full hex representation of the id.

Directories

Path Synopsis
cmd
diskq Module
examples
