Documentation ¶
Overview ¶
Package diskq provides a local filesystem streaming system similar to Kafka.
A simplified example might be:
producer, _ := diskq.New("streams", diskq.Options{})
_, _, _ = producer.Push(diskq.Message{Data: []byte("hello world!")})
Here we both initialize and open for writing a diskq producer.
We then push a message into it; because no partition key is set, the message is assigned at random to one of the partitions (3 by default).
We can then consume messages from outside the producing process simply by listening for filesystem write events using consumers.
Index ¶
- Constants
- Variables
- func Decode(m *Message, r io.Reader) (err error)
- func Encode(m Message, wr io.Writer) (err error)
- func FormatPartitionIndexForPath(partitionIndex uint32) string
- func FormatPathForMarkedConsumerGroupOffsetMarker(dataPath, groupName string, partitionIndex uint32) string
- func FormatPathForPartition(path string, partitionIndex uint32) string
- func FormatPathForPartitions(path string) string
- func FormatPathForSegment(path string, partitionIndex uint32, startOffset uint64) string
- func FormatPathForSentinel(path string) string
- func FormatPathForSettings(path string) string
- func FormatStartOffsetForPath(startOffset uint64) string
- func GetOffsetAfter(path string, partitionIndex uint32, after time.Time) (offset uint64, found bool, err error)
- func GetPartitionSegmentStartOffsets(path string, partitionIndex uint32) (output []uint64, err error)
- func GetPartitionSizeBytes(path string, partitionIndex uint32, skipActiveSegment bool) (sizeBytes int64, err error)
- func GetPartitions(path string) ([]uint32, error)
- func GetSegmentNewestOffset(path string, partitionIndex uint32, startOffset uint64) (newestOffset uint64, err error)
- func GetSegmentNewestOldestOffsetFromTimeIndexHandle(f *os.File) (oldestOffset, newestOffset uint64, err error)
- func GetSegmentNewestTimestamp(path string, partitionIndex uint32, startOffset uint64) (ts time.Time, err error)
- func GetSegmentOldestNewestTimestamps(path string, partitionIndex uint32, startOffset uint64) (oldest, newest time.Time, err error)
- func GetSegmentOldestTimestamp(path string, partitionIndex uint32, startOffset uint64) (oldest time.Time, err error)
- func GetSegmentStartOffsetForOffset(entries []uint64, offset uint64) (uint64, bool)
- func OpenSegmentFileForRead(path string, partitionIndex uint32, startOffset uint64, ext string) (*os.File, error)
- func ParseSegmentOffsetFromPath(path string) (uint64, error)
- func Read(path string, partitionIndex uint32, fn func(MessageWithOffset) error) error
- type Consumer
- type ConsumerEndBehavior
- type ConsumerGroup
- type ConsumerGroupOptions
- type ConsumerOptions
- type ConsumerStartBehavior
- type Diskq
- type MarkedConsumerGroup
- type MarkedConsumerGroupOptions
- type Message
- type MessageWithOffset
- type OffsetMarker
- type OffsetMarkerOptions
- type Options
- type Partition
- type PartitionStats
- type Segment
- type SegmentIndex
- type SegmentTimeIndex
- type Stats
- type UUID
Constants ¶
const (
	// ExtData is the extension of segment data files, including the leading dot.
	ExtData = ".data"
	// ExtIndex is the extension of segment index files, including the leading dot.
	ExtIndex = ".index"
	// ExtTimeIndex is the extension of segment timeindex files, including the leading dot.
	ExtTimeIndex = ".timeindex"
)
const (
	// DefaultPartitionCount is the default partition count.
	DefaultPartitionCount = 3
	// DefaultSegmentSizeBytes is the default segment size in bytes.
	DefaultSegmentSizeBytes = 32 * 1024 * 1024 // 32MiB
)
Variables ¶
var SegmentIndexSizeBytes = binary.Size(SegmentIndex{})
SegmentIndexSizeBytes is the size in bytes of an entry in the segment index.
var SegmentTimeIndexSizeBytes = binary.Size(SegmentTimeIndex{})
SegmentTimeIndexSizeBytes is the size in bytes of a segment time index element.
Functions ¶
func FormatPartitionIndexForPath ¶
FormatPartitionIndexForPath returns a partition index as a string.
It's used as the final token for the path of a partition on disk.
func FormatPathForMarkedConsumerGroupOffsetMarker ¶
func FormatPathForMarkedConsumerGroupOffsetMarker(dataPath, groupName string, partitionIndex uint32) string
FormatPathForMarkedConsumerGroupOffsetMarker formats the path of the offset marker file for a given marked consumer group name and partition index.
func FormatPathForPartition ¶
FormatPathForPartition formats a path string for an individual partition within the partitions directory of a stream path.
func FormatPathForPartitions ¶
FormatPathForPartitions formats a path string for the partitions directory within a stream directory.
func FormatPathForSegment ¶
FormatPathForSegment formats a path string for a specific segment of a given partition.
func FormatPathForSentinel ¶
FormatPathForSentinel formats a path string for a sentinel (or "owner") file for a given stream path.
func FormatPathForSettings ¶
FormatPathForSettings formats a path string for the settings file.
func FormatStartOffsetForPath ¶
FormatStartOffsetForPath formats a start offset as a string.
It's used as the file basename for a segment's files (e.g. the index, data or timeindex).
func GetOffsetAfter ¶
func GetOffsetAfter(path string, partitionIndex uint32, after time.Time) (offset uint64, found bool, err error)
GetOffsetAfter returns the next offset after a given timestamp.
If no offset was found after that timestamp, typically because the timestamp is after the "newest" offset, the found bool will be false.
GetOffsetAfter will first iterate over the individual partition segments until it finds one whose newest timestamp is before the given after timestamp value, then will use a binary search over the individual timestamps in the time index for the segment until the correct offset is found.
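The binary search step can be illustrated with a minimal standalone sketch. It is not the package's implementation: the `timeIndexEntry` type, the `offsetAfterNanos` and `offsetAfter` helpers, and the choice of "strictly after" semantics are all assumptions made for illustration, and the entries are assumed to be sorted by timestamp.

```go
package main

import (
	"fmt"
	"sort"
	"time"
)

// timeIndexEntry is a hypothetical stand-in for a time index element:
// element 0 is the message offset, element 1 is the timestamp in unix nanos.
type timeIndexEntry [2]uint64

// offsetAfterNanos returns the offset of the first entry whose timestamp is
// strictly greater than target. Entries must be sorted by timestamp.
func offsetAfterNanos(entries []timeIndexEntry, target uint64) (uint64, bool) {
	i := sort.Search(len(entries), func(i int) bool {
		return entries[i][1] > target
	})
	if i == len(entries) {
		// The target is at or past the newest timestamp.
		return 0, false
	}
	return entries[i][0], true
}

// offsetAfter is a convenience wrapper that accepts a time.Time.
// It assumes timestamps at or after the unix epoch.
func offsetAfter(entries []timeIndexEntry, after time.Time) (uint64, bool) {
	return offsetAfterNanos(entries, uint64(after.UnixNano()))
}

func main() {
	base := time.Date(2024, 1, 1, 0, 0, 0, 0, time.UTC)
	entries := []timeIndexEntry{
		{10, uint64(base.UnixNano())},
		{11, uint64(base.Add(time.Second).UnixNano())},
		{12, uint64(base.Add(2 * time.Second).UnixNano())},
	}
	offset, found := offsetAfter(entries, base.Add(500*time.Millisecond))
	fmt.Println(offset, found) // 11 true
	_, found = offsetAfter(entries, base.Add(time.Hour))
	fmt.Println(found) // false
}
```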
func GetPartitionSegmentStartOffsets ¶
func GetPartitionSegmentStartOffsets(path string, partitionIndex uint32) (output []uint64, err error)
GetPartitionSegmentStartOffsets gets the start offsets of a given partition as derived from the filenames of the segments within a partition's data directory.
func GetPartitionSizeBytes ¶
func GetPartitionSizeBytes(path string, partitionIndex uint32, skipActiveSegment bool) (sizeBytes int64, err error)
GetPartitionSizeBytes gets the size in bytes of a partition by path and partition index.
It does this by iterating over the segment files for the partition and stat-ing the files.
func GetPartitions ¶
GetPartitions returns the partition indices of the partitions in a given stream by path.
func GetSegmentNewestOffset ¶
func GetSegmentNewestOffset(path string, partitionIndex uint32, startOffset uint64) (newestOffset uint64, err error)
GetSegmentNewestOffset opens a segment index file as indicated by the path, partitionIndex and startOffset for the file and returns the newest (or last) offset in the index.
func GetSegmentNewestOldestOffsetFromTimeIndexHandle ¶
func GetSegmentNewestOldestOffsetFromTimeIndexHandle(f *os.File) (oldestOffset, newestOffset uint64, err error)
GetSegmentNewestOldestOffsetFromTimeIndexHandle returns the oldest (or first) and newest (or last) offsets from an already open file handle to a timeindex by seeking to the 0th byte and reading a timeindex segment, then seeking to the -SegmentTimeIndexSizeBytes-th byte from the end of the file, and reading the last timeindex segment.
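Because every time index element has a fixed size, the first and last elements can be read with two seeks and no scan. The following is a rough sketch of that idea under stated assumptions: it reads from any `io.ReadSeeker` rather than an `*os.File`, the `timeIndexEntry` type and helper names are hypothetical, and the little-endian byte order is an assumption (the on-disk byte order is not stated here).

```go
package main

import (
	"bytes"
	"encoding/binary"
	"fmt"
	"io"
)

// timeIndexEntry is a hypothetical stand-in: offset, then unix nanos.
type timeIndexEntry [2]uint64

// entrySize is the fixed size of one entry in bytes (two uint64 values).
var entrySize = int64(binary.Size(timeIndexEntry{}))

// byteOrder is an assumption made for this sketch only.
var byteOrder = binary.LittleEndian

// firstAndLastOffsets reads the first entry at byte 0 and the last entry
// at entrySize bytes before the end of the stream.
func firstAndLastOffsets(rs io.ReadSeeker) (oldest, newest uint64, err error) {
	var first, last timeIndexEntry
	if _, err = rs.Seek(0, io.SeekStart); err != nil {
		return 0, 0, err
	}
	if err = binary.Read(rs, byteOrder, &first); err != nil {
		return 0, 0, err
	}
	if _, err = rs.Seek(-entrySize, io.SeekEnd); err != nil {
		return 0, 0, err
	}
	if err = binary.Read(rs, byteOrder, &last); err != nil {
		return 0, 0, err
	}
	return first[0], last[0], nil
}

// newIndexReader builds an in-memory time index for demonstration purposes.
func newIndexReader(entries ...timeIndexEntry) io.ReadSeeker {
	var buf bytes.Buffer
	for _, e := range entries {
		if err := binary.Write(&buf, byteOrder, e); err != nil {
			panic(err)
		}
	}
	return bytes.NewReader(buf.Bytes())
}

func main() {
	rs := newIndexReader(
		timeIndexEntry{5, 100},
		timeIndexEntry{6, 200},
		timeIndexEntry{7, 300},
	)
	oldest, newest, err := firstAndLastOffsets(rs)
	fmt.Println(oldest, newest, err) // 5 7 <nil>
}
```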
func GetSegmentNewestTimestamp ¶
func GetSegmentNewestTimestamp(path string, partitionIndex uint32, startOffset uint64) (ts time.Time, err error)
GetSegmentNewestTimestamp opens a segment timeindex file as indicated by the path, partitionIndex and startOffset for the file and returns the newest (or last) timestamp in the index.
func GetSegmentOldestNewestTimestamps ¶
func GetSegmentOldestNewestTimestamps(path string, partitionIndex uint32, startOffset uint64) (oldest, newest time.Time, err error)
GetSegmentOldestNewestTimestamps opens a segment timeindex file as indicated by the path, partitionIndex and startOffset for the file and returns both the oldest (or first) and the newest (or last) timestamp in the index.
func GetSegmentOldestTimestamp ¶
func GetSegmentOldestTimestamp(path string, partitionIndex uint32, startOffset uint64) (oldest time.Time, err error)
GetSegmentOldestTimestamp opens a segment timeindex file as indicated by the path, partitionIndex and startOffset for the file and returns the oldest (or first) timestamp in the index.
func GetSegmentStartOffsetForOffset ¶
GetSegmentStartOffsetForOffset searches a given list of entries for a given offset such that the entry returned would correspond to the start offset of the segment file that _would_ contain that offset.
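One way to picture this lookup is as a search for the largest start offset that is less than or equal to the target offset. The sketch below is an illustration only, not the package's code; `segmentStartFor` is a hypothetical helper, it assumes the start offsets are sorted ascending, and returning false for an offset older than the first segment is an assumption.

```go
package main

import (
	"fmt"
	"sort"
)

// segmentStartFor returns the largest start offset that is <= offset,
// assuming startOffsets is sorted in ascending order.
func segmentStartFor(startOffsets []uint64, offset uint64) (uint64, bool) {
	// Find the index of the first start offset strictly greater than offset.
	i := sort.Search(len(startOffsets), func(i int) bool {
		return startOffsets[i] > offset
	})
	if i == 0 {
		// No segment starts at or before the requested offset.
		return 0, false
	}
	return startOffsets[i-1], true
}

func main() {
	starts := []uint64{0, 100, 250}
	fmt.Println(segmentStartFor(starts, 99))  // 0 true
	fmt.Println(segmentStartFor(starts, 100)) // 100 true
	fmt.Println(segmentStartFor(starts, 900)) // 250 true
}
```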
func OpenSegmentFileForRead ¶
func OpenSegmentFileForRead(path string, partitionIndex uint32, startOffset uint64, ext string) (*os.File, error)
OpenSegmentFileForRead opens a segment file with a given extension in "read" mode with the correct flags.
func ParseSegmentOffsetFromPath ¶
ParseSegmentOffsetFromPath parses the segment offset from a given full file path.
func Read ¶
func Read(path string, partitionIndex uint32, fn func(MessageWithOffset) error) error
Read reads a given diskq at a given path and a given partition index, and calls a given function for each message read from the partition.
Read is optimized for reading from the oldest offset to the newest offset of the given partition and does not wait for new messages to be published.
As a result, Read is useful in situations where you want to bootstrap a system from the data on disk quickly, as there is no overhead of pushing into channels.
It is parameterized by partition because partitions are strictly ordered, but you cannot relate the relative order of many partitions.
If you want to read all partitions of a diskq, you can first enumerate the partitions with the helper GetPartitions, which will return an []uint32 you can pass the elements of as the partitionIndex parameter.
Types ¶
type Consumer ¶
type Consumer struct {
// contains filtered or unexported fields
}
Consumer handles reading messages from a given partition.
Consumers can start at known offsets, or at queried offsets based on the start behavior.
Consumers can also end at specific offsets, or end when the last offset is read in the active segment, or just block and wait for new offsets to be written to the active segment (this is the default behavior).
func OpenConsumer ¶
func OpenConsumer(path string, partitionIndex uint32, options ConsumerOptions) (*Consumer, error)
OpenConsumer creates a new consumer for a given path, partition and options.
There can be many consumers for a given partition active at a given time, and you can consume partitions that may be written to by an active producer.
func (*Consumer) Close ¶
Close closes the consumer and frees any held resources like file handles.
Generally close is called when the read loop exits on its own, but you can stop a consumer early if you call `Close` before the read loop completes.
Once a consumer is closed it cannot be re-used; to start consuming again, open a new consumer with `OpenConsumer(...)`.
func (*Consumer) Errors ¶
Errors returns a channel that carries errors returned when reading messages through the lifetime of the consumer.
Generally there will be at most (1) error pushed into this channel, and the channel will be buffered so you can read it out later.
func (*Consumer) Messages ¶
func (c *Consumer) Messages() <-chan MessageWithOffset
Messages is how you will read the messages the consumer sees as it reads the segments for the given partition.
The messages channel is unbuffered, so you must read the message for the consumer to be able to continue.
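The interplay between an unbuffered messages channel and a buffered errors channel can be sketched with plain channels. Everything in this example is a stand-in: `fakeConsumer`, `startFakeConsumer`, `drain` and `errReadFailed` are hypothetical, and closing the messages channel when the read loop ends is an assumption of the sketch rather than documented behavior of Consumer.

```go
package main

import (
	"errors"
	"fmt"
)

// errReadFailed is a hypothetical error used by the demonstration.
var errReadFailed = errors.New("read failed")

type message struct {
	Offset uint64
	Data   []byte
}

// fakeConsumer imitates the channel shapes described above.
type fakeConsumer struct {
	messages chan message // unbuffered: each send waits for a receive
	errs     chan error   // buffered: one error can be read later
}

func startFakeConsumer(payloads []string, failWith error) *fakeConsumer {
	c := &fakeConsumer{
		messages: make(chan message),
		errs:     make(chan error, 1),
	}
	go func() {
		defer close(c.messages)
		for i, p := range payloads {
			// Blocks until the caller receives the message.
			c.messages <- message{Offset: uint64(i), Data: []byte(p)}
		}
		if failWith != nil {
			// Does not block because the channel has capacity for one error.
			c.errs <- failWith
		}
	}()
	return c
}

// drain reads every message, then checks for a buffered error.
func drain(c *fakeConsumer) (seen []string, err error) {
	for msg := range c.messages {
		seen = append(seen, string(msg.Data))
	}
	select {
	case err = <-c.errs:
	default:
	}
	return seen, err
}

func main() {
	seen, err := drain(startFakeConsumer([]string{"a", "b"}, errReadFailed))
	fmt.Println(seen, err) // [a b] read failed
}
```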
type ConsumerEndBehavior ¶
type ConsumerEndBehavior uint8
ConsumerEndBehavior controls how the consumer behaves when the last offset is read in the active segment.
const (
	// ConsumerEndBehaviorWait is a consumer end behavior that
	// will cause the consumer to wait for new offsets when it reaches
	// the end of the active segment.
	ConsumerEndBehaviorWait ConsumerEndBehavior = iota
	// ConsumerEndBehaviorAtOffset is a consumer end behavior that
	// will cause the consumer to exit when it reaches a given offset.
	ConsumerEndBehaviorAtOffset
	// ConsumerEndBehaviorClose is a consumer end behavior that
	// will cause the consumer to exit when it reaches the
	// end of the active segment.
	ConsumerEndBehaviorClose
)
func ParseConsumerEndBehavior ¶
func ParseConsumerEndBehavior(raw string) (endBehavior ConsumerEndBehavior, err error)
ParseConsumerEndBehavior parses a given raw string as a consumer end behavior.
func (ConsumerEndBehavior) String ¶
func (ceb ConsumerEndBehavior) String() string
String returns a string form of the consumer end behavior.
type ConsumerGroup ¶
type ConsumerGroup struct {
// contains filtered or unexported fields
}
ConsumerGroup is a consumer that reads from all partitions at once, and periodically scans for new partitions, or stops reading partitions that may have been deleted.
Partitions are read from their start offset by default, and you can only control the end behavior in practice.
func OpenConsumerGroup ¶
func OpenConsumerGroup(dataPath string, options ConsumerGroupOptions) (*ConsumerGroup, error)
OpenConsumerGroup opens a new consumer group.
A consumer group reads from all partitions at once, and scans for new partitions if they're added, mapping each partition to an underlying consumer.
ConsumerGroups are not cooperative; they do not automatically split assignment of partitions. If you want to read a subset of partitions, you can configure which partitions the consumer will read using the [ConsumerGroupOptions.ShouldConsume] function.
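Since partition assignment is left to the caller, one simple scheme is to split partitions across workers by modulo. The sketch below only builds a predicate with the same shape as the ShouldConsume callback (`func(uint32) bool`); the `shouldConsumeFor` helper and the worker numbering are hypothetical, and wiring the predicate into the options is left out.

```go
package main

import "fmt"

// shouldConsumeFor returns a predicate that accepts the partitions owned by
// one worker out of workerCount, using partitionIndex modulo workerCount.
func shouldConsumeFor(workerIndex, workerCount uint32) func(uint32) bool {
	return func(partitionIndex uint32) bool {
		if workerCount == 0 {
			// Avoid a division by zero; no worker owns anything.
			return false
		}
		return partitionIndex%workerCount == workerIndex
	}
}

func main() {
	const workers = 2
	for w := uint32(0); w < workers; w++ {
		shouldConsume := shouldConsumeFor(w, workers)
		var assigned []uint32
		for p := uint32(0); p < 3; p++ {
			if shouldConsume(p) {
				assigned = append(assigned, p)
			}
		}
		fmt.Println("worker", w, "reads partitions", assigned)
	}
}
```

With this scheme each partition is read by exactly one worker, provided every worker is started with the same worker count.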
func (*ConsumerGroup) Close ¶
func (cg *ConsumerGroup) Close() error
Close closes the consumer group and all the consumers it may have started.
Close is safe to call more than once.
func (*ConsumerGroup) Errors ¶
func (cg *ConsumerGroup) Errors() <-chan error
Errors returns a channel that will receive errors from the individual consumers.
You should use the `err, ok := <-cg.Errors()` form of channel reads when reading this channel to detect if the channel is closed, which would indicate that all of the consumer group's consumers have reached the end of their respective partitions with the consumer end behavior of "close", or that the consumer group itself is closed.
func (*ConsumerGroup) ID ¶
func (cg *ConsumerGroup) ID() UUID
ID returns a unique identifier for this consumer group.
func (*ConsumerGroup) Messages ¶
func (cg *ConsumerGroup) Messages() <-chan MessageWithOffset
Messages returns a channel that will receive messages from the individual consumers in the order they're read.
You should use the `msg, ok := <-cg.Messages()` form of channel reads when reading this channel to detect if the channel is closed, which would indicate that all of the consumer group's consumers have reached the end of their respective partitions with the consumer end behavior of "close", or that the consumer group itself is closed.
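The two-value receive described above can be tried out with a small fan-in built from plain channels. This is a sketch under assumptions, not the consumer group's implementation: `fanIn`, `partitionSource`, `countUntilClosed` and the `message` type are hypothetical stand-ins that merge per-partition channels into one and close it once every source is exhausted.

```go
package main

import (
	"fmt"
	"sync"
)

type message struct {
	PartitionIndex uint32
	Offset         uint64
}

// fanIn merges several partition channels into one channel and closes the
// merged channel once every input channel has been closed.
func fanIn(partitions ...<-chan message) <-chan message {
	out := make(chan message)
	var wg sync.WaitGroup
	for _, p := range partitions {
		wg.Add(1)
		go func(p <-chan message) {
			defer wg.Done()
			for m := range p {
				out <- m
			}
		}(p)
	}
	go func() {
		wg.Wait()
		close(out)
	}()
	return out
}

// partitionSource emits count messages for one partition, then closes.
func partitionSource(index uint32, count uint64) <-chan message {
	ch := make(chan message)
	go func() {
		defer close(ch)
		for o := uint64(0); o < count; o++ {
			ch <- message{PartitionIndex: index, Offset: o}
		}
	}()
	return ch
}

// countUntilClosed uses the two-value receive to detect the closed channel.
func countUntilClosed(messages <-chan message) int {
	n := 0
	for {
		_, ok := <-messages
		if !ok {
			return n
		}
		n++
	}
}

func main() {
	merged := fanIn(partitionSource(0, 2), partitionSource(1, 3))
	fmt.Println(countUntilClosed(merged)) // 5
}
```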
type ConsumerGroupOptions ¶
type ConsumerGroupOptions struct {
	// ShouldConsume is a callback that is called when a partition is
	// seen, and should return "true" if the partition should be consumed
	// by this consumer group.
	//
	// If the callback is not set, it is assumed the consumer group will
	// read from all partitions.
	ShouldConsume func(uint32) bool
	// OptionsForConsumer is a callback that given a partition index
	// expects a ConsumerOptions or error to be returned.
	//
	// If an error is returned, the entire consumer group fails
	// and that error is pushed into the errors channel of the consumer group.
	OptionsForConsumer func(uint32) (ConsumerOptions, error)
	// OnCloseConsumer is a callback that is called when a consumer is closed.
	//
	// A consumer is closed when the group itself is closed, but can also happen
	// if the underlying partition the consumer is reading is deleted, though
	// in practice this should almost never happen.
	OnCloseConsumer func(uint32) error
	// PartitionScanInterval is the interval the consumer group will use
	// to scan for new partitions.
	PartitionScanInterval time.Duration
}
ConsumerGroupOptions are extra options for consumer groups.
func (ConsumerGroupOptions) PartitionScanIntervalOrDefault ¶
func (cg ConsumerGroupOptions) PartitionScanIntervalOrDefault() time.Duration
PartitionScanIntervalOrDefault returns the partition scan interval or a default.
type ConsumerOptions ¶
type ConsumerOptions struct {
	StartBehavior ConsumerStartBehavior
	StartOffset   uint64
	EndBehavior   ConsumerEndBehavior
	EndOffset     uint64
}
ConsumerOptions are options that control how consumers behave.
type ConsumerStartBehavior ¶
type ConsumerStartBehavior uint8
ConsumerStartBehavior controls how the consumer determines the first offset it will read from.
const (
	// ConsumerStartBehaviorOldest is a consumer start behavior that
	// starts a consumer at the absolute oldest offset present on disk.
	ConsumerStartBehaviorOldest ConsumerStartBehavior = iota
	// ConsumerStartBehaviorAtOffset is a consumer start behavior that
	// starts a consumer at a given offset.
	ConsumerStartBehaviorAtOffset
	// ConsumerStartBehaviorActiveSegmentOldest is a consumer start behavior that
	// starts a consumer at the beginning of the active segment for the partition.
	ConsumerStartBehaviorActiveSegmentOldest
	// ConsumerStartBehaviorNewest is a consumer start behavior that
	// starts a consumer at the end of the active segment, or the newest offset.
	ConsumerStartBehaviorNewest
)
func ParseConsumerStartBehavior ¶
func ParseConsumerStartBehavior(raw string) (startBehavior ConsumerStartBehavior, err error)
ParseConsumerStartBehavior parses a raw string as a consumer start behavior.
func (ConsumerStartBehavior) String ¶
func (csb ConsumerStartBehavior) String() string
String returns a string form of the consumer start behavior.
type Diskq ¶
type Diskq struct {
// contains filtered or unexported fields
}
Diskq is the root struct of the diskq.
It could be thought of primarily as the "producer" in the streaming system; you will use this type to "Push" messages into the partitions.
Close will release open file handles that are held by the partitions.
func New ¶
New opens or creates a diskq based on a given path and config.
The `Diskq` type itself should be thought of as a producer with exclusive access to write to the data directory named in the config.
The path should be the location on disk you want to hold all the data associated with the diskq; you cannot split the diskq up across multiple disparate paths.
The [cfg] should be customized for what you'd like to use for this particular "session" of the diskq producer. You may, for example, change the partition count between sessions, or change the segment size and retention settings. If you'd like to reuse a previous session's options for the diskq, you can use the helper MaybeReadOptions, and pass the returned options from that helper as the [cfg] argument.
If the diskq exists on disk, existing partitions will be opened, and if the configured partition count is greater than the number of existing partitions, new empty partitions will be created. Existing "orphaned" partitions will be left in place until vacuuming potentially removes them.
func (*Diskq) Close ¶
Close releases any resources associated with the diskq and removes the sentinel file.
func (*Diskq) Push ¶
Push pushes a new message into the diskq, returning the partition it was written to, the offset it was written to, and any errors that were generated while writing the message.
func (*Diskq) Sync ¶
Sync calls `fsync` on each of the partition file handles.
This has the effect of realizing any buffered data to disk.
You shouldn't ever need to call this, but it's here if you do need to.
type MarkedConsumerGroup ¶
type MarkedConsumerGroup struct {
// contains filtered or unexported fields
}
MarkedConsumerGroup is a wrapped consumer group that has offset markers automatically configured.
func OpenMarkedConsumerGroup ¶
func OpenMarkedConsumerGroup(dataPath, groupName string, options MarkedConsumerGroupOptions) (*MarkedConsumerGroup, error)
OpenMarkedConsumerGroup returns a new marked consumer group that reads a given path.
A marked consumer group lets you open a consumer group with automatic progress tracking. It wraps a standard consumer group with offset markers for each partition at a known path within the diskq data directory. If you open a consumer group with the same name after it has already recorded some offsets, it will resume the consumer for each partition at the previously recorded offset, overriding [ConsumerOptions.StartBehavior] and [ConsumerOptions.StartOffset] on the consumer options returned from the consumer group options [ConsumerGroupOptions.OptionsForConsumer] delegate.
To manually record the offset for a given message as successfully processed, use the MarkedConsumerGroup.SetLatestOffset helper function on the MarkedConsumerGroup struct itself.
Alternatively, you can enable [MarkedConsumerGroupOptions.AutoSetLatestOffset] on the options for the marked consumer group which will set the latest offset for a message's partition when it's read automatically by the consumer group before it's passed to your channel receiver. This is not enabled by default because it is dangerous to make assumptions about if the message was processed successfully, but it is implemented here for convenience.
func (*MarkedConsumerGroup) Close ¶
func (m *MarkedConsumerGroup) Close() error
Close closes the consumer.
func (*MarkedConsumerGroup) Errors ¶
func (m *MarkedConsumerGroup) Errors() <-chan error
Errors returns the errors channel.
As with consumer groups, and consumers generally, you should use the
err, ok := <-mcg.Errors()
form of a channel read on this channel to detect when the channel is closed.
func (*MarkedConsumerGroup) Messages ¶
func (m *MarkedConsumerGroup) Messages() <-chan MessageWithOffset
Messages returns the messages channel.
As with consumer groups, and consumers generally, you should use the
msg, ok := <-mcg.Messages()
form of a channel read on this channel to detect when the channel is closed.
func (*MarkedConsumerGroup) SetLatestOffset ¶
func (m *MarkedConsumerGroup) SetLatestOffset(partitionIndex uint32, offset uint64)
SetLatestOffset sets the latest offset for a given partition.
type MarkedConsumerGroupOptions ¶
type MarkedConsumerGroupOptions struct {
	ConsumerGroupOptions
	OffsetMarkerOptions
	AutoSetLatestOffset bool
}
MarkedConsumerGroupOptions are options for a marked consumer group.
type Message ¶
type Message struct {
	// PartitionKey is used to assign the message to a partition.
	//
	// The key will be hashed and then bucketed by the
	// number of active partitions.
	//
	// If unset, it will be assigned a uuidv4.
	PartitionKey string
	// TimestampUTC holds the timestamp that is used in the timeindex.
	//
	// If unset it will be assigned the current time in UTC.
	TimestampUTC time.Time
	// Data is the message's contents.
	Data []byte
}
Message is a single element of data written to a diskq.
It includes a [Message.PartitionKey] used to assign it to a partition, and a TimestampUTC to hold how the message should be ordered within the time index of the partition.
When you push a new message, the [Message.PartitionKey] will be assigned a uuidv4 if unset, and the [Message.TimestampUTC] will be set to the current time in UTC if unset.
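The "hash, then bucket" assignment of partition keys can be sketched as follows. The hash function used by the package is not stated here, so the use of FNV-1a is purely an assumption, and `partitionFor` is a hypothetical helper. The point of the sketch is the property that matters in practice: the same key always lands on the same partition for a fixed partition count.

```go
package main

import (
	"fmt"
	"hash/fnv"
)

// partitionFor hashes the partition key and buckets the hash by the number
// of partitions. FNV-1a is an assumption made for this sketch.
func partitionFor(partitionKey string, partitionCount uint32) uint32 {
	if partitionCount == 0 {
		return 0
	}
	h := fnv.New32a()
	h.Write([]byte(partitionKey)) // hash.Hash writes never return an error
	return h.Sum32() % partitionCount
}

func main() {
	for _, key := range []string{"user-1", "user-2", "user-1"} {
		fmt.Println(key, "->", partitionFor(key, 3))
	}
}
```

Setting an explicit partition key is therefore how related messages are kept in order relative to each other, since ordering is only guaranteed within a partition.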
type MessageWithOffset ¶
type MessageWithOffset struct {
	Message
	// PartitionIndex is the partition the message was read from.
	PartitionIndex uint32
	// Offset is the offset within the partition the message was read from.
	Offset uint64
}
MessageWithOffset is a special wrapping type for messages that adds the partition index and the offset of messages read by consumers.
type OffsetMarker ¶
type OffsetMarker struct {
// contains filtered or unexported fields
}
OffsetMarker is a file backed store for the latest offset seen by a consumer.
It is meant to be a durable mechanism to resume work at the last committed offset for a given partition.
OffsetMarkers do not write to disk immediately, they have configurable parameters for when to sync to disk in the background automatically, reducing the number of writes to the file.
You can alternatively call OffsetMarker.Sync directly on this struct, which will write the latest offset to disk.
func OpenOrCreateOffsetMarker ¶
func OpenOrCreateOffsetMarker(path string, options OffsetMarkerOptions) (*OffsetMarker, bool, error)
OpenOrCreateOffsetMarker opens or creates a new offset marker.
It returns both the offset marker struct itself, and a bool "found" if the offset marker was present on disk. You can use this bool to control if and how you apply the offset marker to a consumer you're looking to start. The convenience helper OffsetMarker.ApplyToConsumerOptions has been provided for just this use case.
The path parameter is required and should point to the specific path of the file that will be created to store the latest offset as a file. It should be unique to the specific consumer (i.e. unique to the diskq path and partition index) of the consumer it tracks.
To store the latest offset processed, call OffsetMarker.SetLatestOffset.
The offset isn't written to disk immediately; instead depending on the autosync options provided, it will write on a ticker with a given duration (e.g. every 500 milliseconds) or after N latest offsets are set.
The offset is also written to disk regardless of the autosync options when the offset marker is closed.
You can also call OffsetMarker.Sync yourself to write the latest offset to disk on demand though it is discouraged to do this in favor of using autosync.
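The count-based half of autosync can be modeled in a few lines. This sketch is not the OffsetMarker implementation: `sketchMarker` is a hypothetical type, the `persisted` slice stands in for writes to the marker file, the time-based ticker is omitted to keep the example deterministic, and no locking is shown.

```go
package main

import "fmt"

// sketchMarker records the latest offset in memory and "writes" it out
// every everyN calls, and once more on Close.
type sketchMarker struct {
	latest    uint64
	pending   uint64   // offsets set since the last sync
	everyN    uint64   // 0 disables count-based sync
	persisted []uint64 // stands in for writes to the marker file
}

// SetLatestOffset updates the in-memory offset and syncs when enough
// offsets have accumulated.
func (m *sketchMarker) SetLatestOffset(offset uint64) {
	m.latest = offset
	m.pending++
	if m.everyN > 0 && m.pending >= m.everyN {
		m.Sync()
	}
}

// Sync writes the latest offset out on demand.
func (m *sketchMarker) Sync() {
	m.persisted = append(m.persisted, m.latest)
	m.pending = 0
}

// Close always writes the latest offset, regardless of autosync settings.
func (m *sketchMarker) Close() {
	m.Sync()
}

func main() {
	m := &sketchMarker{everyN: 3}
	for offset := uint64(0); offset < 7; offset++ {
		m.SetLatestOffset(offset)
	}
	m.Close()
	fmt.Println(m.persisted) // [2 5 6]
}
```

Seven offsets produce three writes instead of seven, which is the saving autosync is meant to provide.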
func (*OffsetMarker) ApplyToConsumerOptions ¶
func (om *OffsetMarker) ApplyToConsumerOptions(options *ConsumerOptions)
ApplyToConsumerOptions configures a given consumer options reference to begin where this offset marker has been recorded to have last left off.
func (*OffsetMarker) Close ¶
func (om *OffsetMarker) Close() error
Close will close the offset marker, flushing the last seen offset to disk and closing any other file handles or tickers.
Once an offset marker is closed it cannot be reused.
func (*OffsetMarker) Errors ¶
func (om *OffsetMarker) Errors() <-chan error
Errors returns a channel that will contain errors from writing to the offset marker file.
It is important that you check these errors!
func (*OffsetMarker) LatestOffset ¶
func (om *OffsetMarker) LatestOffset() uint64
LatestOffset returns the latest offset for the offset marker.
func (*OffsetMarker) SetLatestOffset ¶
func (om *OffsetMarker) SetLatestOffset(offset uint64)
SetLatestOffset sets the latest offset for the offset marker.
Calling this function does not write the offset to disk; instead, autosync will write the latest offset to disk on a time-based ticker or after a certain number of offsets are set.
You can also call OffsetMarker.Sync to write the latest offset to disk on demand, but it is recommended that you just let autosync write it to disk for you to save extra file writes.
The latest offset will be written to disk on OffsetMarker.Close regardless of the autosync settings.
func (*OffsetMarker) Sync ¶
func (om *OffsetMarker) Sync() error
Sync writes the latest offset to disk on demand.
In practice you should not call this function, instead it is better to configure autosync settings on the offset marker options to write the latest offset to disk automatically.
type OffsetMarkerOptions ¶
type OffsetMarkerOptions struct {
	// AutosyncInterval enables autosync, specifically such that
	// every given interval in time the marker will record the latest
	// offset recorded to the disk file.
	//
	// If AutosyncInterval is unset, the offset marker will not
	// create a ticker to sync the latest offset to disk.
	AutosyncInterval time.Duration
	// AutosyncEveryOffset enables autosync, specifically such that
	// every N offsets recorded the marker will sync the latest offset
	// to the disk file.
	//
	// If AutosyncEveryOffset is unset, this autosync type will be skipped.
	AutosyncEveryOffset uint64
}
OffsetMarkerOptions are options for autosyncing an offset marker.
type Options ¶
type Options struct {
	// PartitionCount is the number of partitions to split data across.
	//
	// Each partition is sized and vacuumed separately, and consumers can be opened
	// against individual partitions.
	//
	// Messages are assigned to partitions based on their partition key, with the goal
	// of an even distribution of messages given a randomized partition key.
	//
	// If unset, a default value of 3 for [Options.PartitionCount] will be used.
	PartitionCount uint32 `json:"partition_count,omitempty"`
	// SegmentSizeBytes is the size in bytes of a segment of each partition.
	//
	// When writing new messages, if the partition message data file exceeds
	// this size a new segment will be created, allowing a partition
	// to be split across multiple files with a general goal for
	// the size of the data file. Because the segment is closed when
	// the data file size exceeds the [Options.SegmentSizeBytes] in practice
	// segments will be slightly larger than the setting when they're closed.
	//
	// These segments are also the atomic unit that is deleted when
	// vacuuming the partition; to have a tighter budget for keeping
	// partitions near a given size, you should use a smaller segment size.
	//
	// If unset, a default value of 32MiB for [Options.SegmentSizeBytes] will be used.
	SegmentSizeBytes int64 `json:"segment_size_bytes,omitempty"`
	// RetentionMaxBytes is the maximum size of a partition in bytes
	// as enforced by calling [Diskq.Vacuum]. The size constraint
	// applies to a single partition, and does not consider the
	// active partition size.
	RetentionMaxBytes int64 `json:"retention_max_bytes,omitempty"`
	// RetentionMaxAge is the maximum age of messages in a partition
	// as enforced by calling [Diskq.Vacuum].
	RetentionMaxAge time.Duration `json:"retention_max_age,omitempty"`
}
Options are the options for the diskq.
func MaybeReadOptions ¶
MaybeReadOptions tries to read the previous options that were written to the default location when a diskq was accessed last.
You can then pass these options to the constructor for the diskq.
func (Options) PartitionCountOrDefault ¶
PartitionCountOrDefault returns the partition count or a default value.
The default value is 3 partitions.
func (Options) SegmentSizeBytesOrDefault ¶
SegmentSizeBytesOrDefault returns the partition segment size in bytes or a default.
The default value is 32MiB.
type Partition ¶
type Partition struct {
// contains filtered or unexported fields
}
Partition organizes the time ordered series of messages that make up a stream.
The partition is responsible for rotating which segment it is actively writing to, and for implementing the vacuum steps for itself.
func (*Partition) Sync ¶
Sync calls fsync on the underlying active segment file handles for the partition.
func (*Partition) Vacuum ¶
Vacuum removes old segments from the partitions as defined by the diskq Options fields [Options.RetentionMaxBytes] and [Options.RetentionMaxAge].
You will almost never need to call this function directly, and should instead call Diskq.Vacuum.
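Size-based retention with whole segments as the unit of deletion can be sketched roughly as below. This is a guess at the general shape, not the package's algorithm: `segmentInfo` and `segmentsToVacuum` are hypothetical, the last segment in the slice is treated as the active segment and never counted or removed, and a non-positive limit is assumed to mean that size-based retention is disabled.

```go
package main

import "fmt"

// segmentInfo is a hypothetical summary of one segment on disk.
type segmentInfo struct {
	StartOffset uint64
	SizeBytes   int64
}

// segmentsToVacuum returns the start offsets of the oldest closed segments
// that would need to be deleted for the closed segments to fit in maxBytes.
// Segments must be ordered oldest first; the last one is the active segment.
func segmentsToVacuum(segments []segmentInfo, maxBytes int64) []uint64 {
	if len(segments) < 2 || maxBytes <= 0 {
		return nil
	}
	closed := segments[:len(segments)-1]
	var total int64
	for _, s := range closed {
		total += s.SizeBytes
	}
	var remove []uint64
	for _, s := range closed {
		if total <= maxBytes {
			break
		}
		// Whole segments are removed, oldest first.
		remove = append(remove, s.StartOffset)
		total -= s.SizeBytes
	}
	return remove
}

func main() {
	segments := []segmentInfo{
		{StartOffset: 0, SizeBytes: 40},
		{StartOffset: 100, SizeBytes: 40},
		{StartOffset: 200, SizeBytes: 40},
		{StartOffset: 300, SizeBytes: 10}, // active segment
	}
	fmt.Println(segmentsToVacuum(segments, 64)) // [0 100]
}
```

Because deletion happens a segment at a time, the partition can only be trimmed in steps of roughly one segment size, which is why a smaller segment size gives a tighter retention budget.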
type PartitionStats ¶
type PartitionStats struct {
	PartitionIndex        uint32
	OldestOffset          uint64
	OldestOffsetActive    uint64
	NewestOffset          uint64
	OldestTimestamp       time.Time
	OldestTimestampActive time.Time
	NewestTimestamp       time.Time
	Segments              int
	SizeBytes             uint64
}
PartitionStats holds information about a diskq stream partition.
type Segment ¶
type Segment struct {
// contains filtered or unexported fields
}
Segment represents an individual block of data within a partition.
It is principally responsible for the actual "writing" of data to disk.
type SegmentIndex ¶
type SegmentIndex [3]uint64
SegmentIndex is an individual element of an index file.
It is implemented as (3) uint64 values.
- 0: the offset of the message it represents.
- 1: the offset from the start of the file the message appears in bytes.
- 2: the size of the message in bytes.
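A fixed-size index element makes random access cheap: the i-th element sits at byte `i * size`, and its second and third values locate the message in the data file. The following sketch illustrates that with `encoding/binary`; the `indexEntry` type and helper names are hypothetical, and the little-endian byte order is an assumption, since the on-disk byte order is not stated here.

```go
package main

import (
	"bytes"
	"encoding/binary"
	"fmt"
)

// indexEntry is a hypothetical stand-in: message offset, byte offset in the
// data file, and message size in bytes.
type indexEntry [3]uint64

// indexEntrySize is the fixed size of one entry in bytes (three uint64 values).
var indexEntrySize = binary.Size(indexEntry{})

// encodeIndex serializes entries back to back, as an index file would hold them.
func encodeIndex(entries []indexEntry) ([]byte, error) {
	var buf bytes.Buffer
	if err := binary.Write(&buf, binary.LittleEndian, entries); err != nil {
		return nil, err
	}
	return buf.Bytes(), nil
}

// entryAt decodes the i-th entry directly from its byte position.
func entryAt(index []byte, i int) (indexEntry, error) {
	var e indexEntry
	start := i * indexEntrySize
	if i < 0 || start+indexEntrySize > len(index) {
		return e, fmt.Errorf("entry %d out of range", i)
	}
	r := bytes.NewReader(index[start : start+indexEntrySize])
	err := binary.Read(r, binary.LittleEndian, &e)
	return e, err
}

func main() {
	data := []byte("helloworld!")
	index, err := encodeIndex([]indexEntry{{0, 0, 5}, {1, 5, 6}})
	if err != nil {
		panic(err)
	}
	e, err := entryAt(index, 1)
	if err != nil {
		panic(err)
	}
	// Use the byte offset and size to slice the message out of the data.
	fmt.Println(string(data[e[1] : e[1]+e[2]])) // world!
}
```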
func NewSegmentIndex ¶
func NewSegmentIndex(offset uint64, offsetBytes uint64, sizeBytes uint64) SegmentIndex
NewSegmentIndex returns a new segment index element.
func (SegmentIndex) GetOffset ¶
func (si SegmentIndex) GetOffset() uint64
GetOffset gets the offset this entry corresponds to.
func (SegmentIndex) GetOffsetBytes ¶
func (si SegmentIndex) GetOffsetBytes() uint64
GetOffsetBytes gets the offset from the start of the data file in bytes this entry appears in the data file.
func (SegmentIndex) GetSizeBytes ¶
func (si SegmentIndex) GetSizeBytes() uint64
GetSizeBytes returns the size in bytes of the data file entry that corresponds to this offset.
It is used when reading out the data file, specifically we will allocate this many bytes to read from for the data file entry.
type SegmentTimeIndex ¶
type SegmentTimeIndex [2]uint64
SegmentTimeIndex is a fixed width element of a time index file.
It is composed of (2) uint64 values.
- 0: the offset of the message this segment refers to.
- 1: the timestamp of the message, represented as unix nanos.
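Storing the timestamp as unix nanoseconds in a uint64 means the conversion to and from `time.Time` is a pair of one-liners. The sketch below mirrors the documented layout with hypothetical names (`timeIndexEntry`, `newTimeIndexEntry`, `timestampUTC`); it assumes timestamps at or after the unix epoch, since a negative nanosecond count would not survive the unsigned conversion as a sensible value.

```go
package main

import (
	"fmt"
	"time"
)

// timeIndexEntry is a hypothetical stand-in: offset, then unix nanos.
type timeIndexEntry [2]uint64

// newTimeIndexEntry stores the timestamp as nanoseconds since the unix epoch.
func newTimeIndexEntry(offset uint64, ts time.Time) timeIndexEntry {
	return timeIndexEntry{offset, uint64(ts.UnixNano())}
}

// timestampUTC rebuilds the time from the stored nanoseconds, in UTC.
func (e timeIndexEntry) timestampUTC() time.Time {
	return time.Unix(0, int64(e[1])).UTC()
}

func main() {
	ts := time.Date(2024, 1, 1, 12, 0, 0, 123, time.UTC)
	e := newTimeIndexEntry(42, ts)
	fmt.Println(e[0], e.timestampUTC().Equal(ts)) // 42 true
}
```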
func NewSegmentTimeIndex ¶
func NewSegmentTimeIndex(offset uint64, timestamp time.Time) SegmentTimeIndex
NewSegmentTimeIndex returns a new segment time index struct.
func (SegmentTimeIndex) GetOffset ¶
func (sti SegmentTimeIndex) GetOffset() uint64
GetOffset gets the offset the time index entry corresponds to.
func (SegmentTimeIndex) GetTimestampUTC ¶
func (sti SegmentTimeIndex) GetTimestampUTC() time.Time
GetTimestampUTC gets the timestamp (as recorded as nanos) from the index entry.
type Stats ¶
type Stats struct {
	Path         string
	SizeBytes    uint64
	InUse        bool
	TotalOffsets uint64
	Age          time.Duration
	Partitions   []PartitionStats
}
Stats holds information about a diskq stream.