memlog

v0.4.5
Published: Sep 4, 2023 License: Apache-2.0 Imports: 6 Imported by: 8

README

About

tl;dr

An easy-to-use, lightweight, thread-safe, append-only in-memory data structure modeled as a Log.

The Log also serves as an abstraction and building block. See sharded.Log for an implementation of a sharded variant of memlog.Log.

❌ Note: this package is not about providing an in-memory logging library. To read more about the ideas behind memlog please see "The Log: What every software engineer should know about real-time data's unifying abstraction".

Motivation

I keep hitting the same user story (use case) over and over again: one or more clients connected to my application wanting to read an immutable stream of data, e.g. events or sensor data, in-order, concurrently (thread-safe) and asynchronously (at their own pace) and in a resource (memory) efficient way.

There are many solutions to this problem, e.g. exposing some sort of streaming API (gRPC, HTTP/REST long-polling) based on custom logic using Go channels or an internal ring buffer, or putting data into an external platform like Kafka, Redis Streams or RabbitMQ Streams.

The challenge I faced with these solutions was that they were either too complex (or simply overkill) for my problem, or the system I had to integrate with and read data from did not have a nice streaming API or Go SDK, so I kept repeating myself writing complex internal caching, buffering and concurrency handling logic for the client APIs.

I looked around and could not find a simple and easy to use Go library for this problem, so I created memlog: an easy to use, lightweight (in-memory), thread-safe, append-only log inspired by popular streaming systems with a minimal API using Go's standard library primitives 🤩

💡 For an end-to-end API modernization example using memlog see the vsphere-event-streaming project, which transforms a SOAP-based events API into an HTTP/REST streaming API.

Usage

	ml, _ := memlog.New(ctx) // create log
	offset, _ := ml.Write(ctx, []byte("Hello World")) // write some data
	record, _ := ml.Read(ctx, offset) // read back data
	fmt.Println(string(record.Data)) // prints "Hello World"

The memlog API is intentionally kept minimal. A new Log is constructed with memlog.New(ctx, options...). Data as []byte is written to the log with Log.Write(ctx, data).

The first write to the Log using default Options starts at position (Offset) 0. Every write creates an immutable Record in the Log. Records are purged from the Log when the history segment is replaced (see notes below).

The earliest and latest Offset available in a Log can be retrieved with Log.Range(ctx).

A specified Record can be read with Log.Read(ctx, offset).

💡 Instead of manually polling the Log for new Records, the streaming API Log.Stream(ctx, startOffset) should be used.

(Not) one Log to rule them all

You are not limited to a single Log. For certain use cases, creating multiple Logs might be useful. For example:

  • Managing completely different data sets/sizes in the same process
  • Setting different Log sizes (i.e. retention times), e.g. premium users will have access to a larger history of Records
  • Partitioning input data by type or key

💡 For use cases where you want to order the log by key(s), consider using the specialised sharded.Log.
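
Partitioning by key can be sketched with a hash that picks one of several independent logs. The following is a minimal, stdlib-only illustration: shardFor is a hypothetical helper, plain slices stand in for Logs, and nothing here is claimed to match how sharded.Log is implemented.

```go
package main

import (
	"fmt"
	"hash/fnv"
)

// shardFor maps a key to one of n shards using an FNV-1a hash.
// Hypothetical helper for illustration; not part of memlog.
func shardFor(key string, n int) int {
	h := fnv.New32a()
	h.Write([]byte(key)) // Write on a hash.Hash never returns an error
	return int(h.Sum32() % uint32(n))
}

func main() {
	const shards = 4
	// stand-in for a set of independent logs: one ordered slice per shard
	logs := make([][]string, shards)

	events := []struct{ key, data string }{
		{"sensor-a", "21.5"},
		{"sensor-b", "19.0"},
		{"sensor-a", "21.7"},
	}
	for _, e := range events {
		i := shardFor(e.key, shards)
		logs[i] = append(logs[i], e.key+"="+e.data)
	}

	for i, l := range logs {
		fmt.Printf("shard %d: %v\n", i, l)
	}
}
```

Because the same key always hashes to the same shard, records for one key keep their relative order, while ordering across different keys is not preserved.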

Full Example

package main

import (
	"context"
	"fmt"
	"os"

	"github.com/embano1/memlog"
)

func main() {
	ctx := context.Background()
	l, err := memlog.New(ctx)
	if err != nil {
		fmt.Printf("create log: %v", err)
		os.Exit(1)
	}

	offset, err := l.Write(ctx, []byte("Hello World"))
	if err != nil {
		fmt.Printf("write: %v", err)
		os.Exit(1)
	}

	fmt.Printf("reading record at offset %d\n", offset)
	record, err := l.Read(ctx, offset)
	if err != nil {
		fmt.Printf("read: %v", err)
		os.Exit(1)
	}

	fmt.Printf("data says: %s", record.Data)

	// reading record at offset 0
	// data says: Hello World
}

Purging the Log

The Log is divided into an active and history segment. When the active segment is full (configurable via WithMaxSegmentSize()), it is sealed (i.e. read-only) and becomes the history segment. A new empty active segment is created for writes. If there is an existing history, it is replaced, i.e. all Records are purged from the history.
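
The rotation described above can be modeled in a few lines. This is a simplified, single-goroutine sketch and not memlog's implementation: the type twoSegmentLog and its methods are hypothetical, and it assumes rotation happens lazily on the first write after the active segment has filled up.

```go
package main

import "fmt"

// twoSegmentLog is a simplified model of an active/history segment pair.
type twoSegmentLog struct {
	size    int      // maximum records per segment
	next    int      // offset assigned to the next write
	active  []string // segment receiving writes
	history []string // sealed, read-only segment
}

func newTwoSegmentLog(size int) *twoSegmentLog {
	return &twoSegmentLog{size: size, active: make([]string, 0, size)}
}

// write appends data and returns its offset. When the active segment is
// full, it becomes the history segment and the old history is dropped.
func (l *twoSegmentLog) write(data string) int {
	if len(l.active) == l.size {
		l.history = l.active // previous history is purged here
		l.active = make([]string, 0, l.size)
	}
	l.active = append(l.active, data)
	l.next++
	return l.next - 1
}

// offsets returns the earliest and latest readable offset, or -1, -1 if
// nothing has been written yet.
func (l *twoSegmentLog) offsets() (earliest, latest int) {
	n := len(l.history) + len(l.active)
	if n == 0 {
		return -1, -1
	}
	return l.next - n, l.next - 1
}

func main() {
	l := newTwoSegmentLog(3)
	for i := 0; i < 8; i++ {
		l.write(fmt.Sprintf("record-%d", i))
		earliest, latest := l.offsets()
		fmt.Printf("after write %d: earliest=%d latest=%d\n", i, earliest, latest)
	}
}
```

With a segment size of 3, the earliest offset in this model stays at 0 until the seventh write (offset 6), which replaces the first history segment and moves the earliest readable offset to 3.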

See pkg.go.dev for the API reference and examples.

A stateless Log? You gotta be kidding!

True, it sounds like an oxymoron. Why would someone use (build) an in-memory append-only log that is not durable?

I'm glad you asked 😀

This library is certainly not intended to replace messaging, queuing or streaming systems. It was built for use cases where a durable data/event source exists, e.g. a legacy system, REST API, database, etc., that can't (or should not) be changed, but whose data should be made available over a streaming-like API, e.g. gRPC, or processed by a Go application which requires the properties of a Log.

memlog helps by bridging these different APIs and use cases: it is a building block to extract and store data Records from an external system into an in-memory Log (think ordered cache).

These Records can then be internally processed (lightweight ETL) or served asynchronously, in-order (Offset-based) and concurrently over a modern streaming API, e.g. gRPC or HTTP/REST (chunked encoding via long polling), to remote clients.

As another example of such an in-memory log-structured design, DDlog follows a similar approach, where a DDlog program is used in conjunction with a persistent database, with database records being fed to DDlog as ground facts.

Checkpointing

Given the data source needs to be durable in this design, one can optionally build periodic checkpointing logic using the Record Offset as the checkpoint value.

💡 When running in Kubernetes, kvstore provides a nice abstraction on top of a ConfigMap for such requirements.

If the memlog process crashes, it can then resume from the last checkpointed Offset, load the changes since then from the source and resume streaming.
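
A minimal sketch of this resume logic, assuming a durable source that can be addressed by position. The checkpointStore type and the replay function are hypothetical stand-ins (an in-memory value instead of a real durable store); the point is only the offset bookkeeping.

```go
package main

import "fmt"

// checkpointStore is a hypothetical durable store for the last processed
// offset. Here it is an in-memory stand-in; -1 means "nothing processed".
type checkpointStore struct{ last int }

func (c *checkpointStore) save(offset int) { c.last = offset }
func (c *checkpointStore) load() int       { return c.last }

// replay processes all source events after the checkpointed offset and
// saves a checkpoint after every `every` processed events. It returns the
// number of events processed in this run.
func replay(source []string, cp *checkpointStore, every int) int {
	processed := 0
	for offset := cp.load() + 1; offset < len(source); offset++ {
		// a real application would write source[offset] to the Log here
		processed++
		if processed%every == 0 {
			cp.save(offset)
		}
	}
	return processed
}

func main() {
	source := []string{"e0", "e1", "e2", "e3", "e4"}
	cp := &checkpointStore{last: -1}

	n := replay(source, cp, 2)
	fmt.Printf("first run: processed=%d checkpoint=%d\n", n, cp.load())

	// simulate a crash and restart: only events after the checkpoint are replayed
	n = replay(source, cp, 2)
	fmt.Printf("after restart: processed=%d checkpoint=%d\n", n, cp.load())
}
```

In this sketch the last event is processed again after the restart because it was handled after the most recent checkpoint. Periodic checkpointing therefore gives at-least-once behavior, so consumers should tolerate duplicates.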

💡 This approach is quite similar to the Kubernetes ListerWatcher() pattern. See memlog_test.go for some inspiration.

Benchmark

I haven't done any extensive benchmarking or code optimization. Feel free to chime in and provide meaningful feedback/optimizations.

One could argue whether using two slices (active and history data []Record as part of the individual segments) is a good engineering choice, e.g. over using a growable slice as an alternative.

The reason I went for two segments was that for me dividing the Log into multiple segments with fixed size (and capacity) was easier to reason about in the code (and I followed my intuition from how log-structured data platforms do it). I did not inspect the Go compiler optimizations, e.g. it might actually be smart and create one growable slice under the hood. 🤓

These are some results on my MacBook using a log size of 1,000 (records), i.e. where the Log history is constantly purged and new segments (slices) are created.

go test -v -run=none -bench=. -cpu 1,2,4,8,16 -benchmem
goos: darwin
goarch: amd64
pkg: github.com/embano1/memlog
cpu: Intel(R) Core(TM) i9-9980HK CPU @ 2.40GHz
BenchmarkLog_write
BenchmarkLog_write              11107804               103.0 ns/op            89 B/op          1 allocs/op
BenchmarkLog_write-2            11115896               107.1 ns/op            89 B/op          1 allocs/op
BenchmarkLog_write-4            11419497               105.7 ns/op            89 B/op          1 allocs/op
BenchmarkLog_write-8            10253677               109.6 ns/op            89 B/op          1 allocs/op
BenchmarkLog_write-16           10865994               107.7 ns/op            89 B/op          1 allocs/op
BenchmarkLog_read
BenchmarkLog_read               24461548                49.49 ns/op           32 B/op          1 allocs/op
BenchmarkLog_read-2             25002574                46.63 ns/op           32 B/op          1 allocs/op
BenchmarkLog_read-4             23829378                47.47 ns/op           32 B/op          1 allocs/op
BenchmarkLog_read-8             22936821                47.47 ns/op           32 B/op          1 allocs/op
BenchmarkLog_read-16            24121807                48.25 ns/op           32 B/op          1 allocs/op
PASS
ok      github.com/embano1/memlog       12.541s

Documentation

Overview

Example
package main

import (
	"context"
	"fmt"
	"os"

	"github.com/embano1/memlog"
)

func main() {
	ctx := context.Background()
	l, err := memlog.New(ctx)
	if err != nil {
		fmt.Printf("create log: %v", err)
		os.Exit(1)
	}

	offset, err := l.Write(ctx, []byte("Hello World"))
	if err != nil {
		fmt.Printf("write: %v", err)
		os.Exit(1)
	}

	fmt.Printf("reading record at offset %d\n", offset)
	record, err := l.Read(ctx, offset)
	if err != nil {
		fmt.Printf("read: %v", err)
		os.Exit(1)
	}

	fmt.Printf("Data: %s", record.Data)

}
Output:

reading record at offset 0
Data: Hello World
Example (Batch)
package main

import (
	"context"
	"errors"
	"fmt"
	"os"

	"github.com/embano1/memlog"
)

func main() {
	const batchSize = 10

	ctx := context.Background()
	l, err := memlog.New(ctx)
	if err != nil {
		fmt.Printf("create log: %v", err)
		os.Exit(1)
	}

	// seed log with data
	for i := 0; i < 15; i++ {
		d := fmt.Sprintf(`{"id":%d,"message","hello world"}`, i)
		_, err = l.Write(ctx, []byte(d))
		if err != nil {
			fmt.Printf("write: %v", err)
			os.Exit(1)
		}
	}

	startOffset := memlog.Offset(0)
	batch := make([]memlog.Record, batchSize)

	fmt.Printf("reading batch starting at offset %d\n", startOffset)
	count, err := l.ReadBatch(ctx, startOffset, batch)
	if err != nil {
		fmt.Printf("read batch: %v", err)
		os.Exit(1)
	}
	fmt.Printf("records received in batch: %d\n", count)

	// print valid batch entries up to "count"
	for i := 0; i < count; i++ {
		r := batch[i]
		fmt.Printf("batch item: %d\toffset:%d\tdata: %s\n", i, r.Metadata.Offset, r.Data)
	}

	// read next batch and check if end of log reached
	startOffset += memlog.Offset(count)
	fmt.Printf("reading batch starting at offset %d\n", startOffset)
	count, err = l.ReadBatch(ctx, startOffset, batch)
	if err != nil {
		if errors.Is(err, memlog.ErrFutureOffset) {
			fmt.Println("reached end of log")
		} else {
			fmt.Printf("read batch: %v", err)
			os.Exit(1)
		}
	}
	fmt.Printf("records received in batch: %d\n", count)

	// print valid batch entries up to "count"
	for i := 0; i < count; i++ {
		r := batch[i]
		fmt.Printf("batch item: %d\toffset:%d\tdata: %s\n", i, r.Metadata.Offset, r.Data)
	}

}
Output:

reading batch starting at offset 0
records received in batch: 10
batch item: 0	offset:0	data: {"id":0,"message","hello world"}
batch item: 1	offset:1	data: {"id":1,"message","hello world"}
batch item: 2	offset:2	data: {"id":2,"message","hello world"}
batch item: 3	offset:3	data: {"id":3,"message","hello world"}
batch item: 4	offset:4	data: {"id":4,"message","hello world"}
batch item: 5	offset:5	data: {"id":5,"message","hello world"}
batch item: 6	offset:6	data: {"id":6,"message","hello world"}
batch item: 7	offset:7	data: {"id":7,"message","hello world"}
batch item: 8	offset:8	data: {"id":8,"message","hello world"}
batch item: 9	offset:9	data: {"id":9,"message","hello world"}
reading batch starting at offset 10
reached end of log
records received in batch: 5
batch item: 0	offset:10	data: {"id":10,"message","hello world"}
batch item: 1	offset:11	data: {"id":11,"message","hello world"}
batch item: 2	offset:12	data: {"id":12,"message","hello world"}
batch item: 3	offset:13	data: {"id":13,"message","hello world"}
batch item: 4	offset:14	data: {"id":14,"message","hello world"}
Example (Stream)
package main

import (
	"context"
	"errors"
	"fmt"
	"os"
	"time"

	"golang.org/x/sync/errgroup"

	"github.com/embano1/memlog"
)

func main() {
	// showing some custom options in action
	const (
		logStart     = 10
		logSize      = 100
		writeRecords = 10
	)

	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	opts := []memlog.Option{
		memlog.WithStartOffset(logStart),
		memlog.WithMaxSegmentSize(logSize),
	}
	l, err := memlog.New(ctx, opts...)
	if err != nil {
		fmt.Printf("create log: %v", err)
		os.Exit(1)
	}

	// write some records (offsets 10-14)
	for i := 0; i < writeRecords/2; i++ {
		d := fmt.Sprintf(`{"id":%d,"message","hello world"}`, i+logStart)
		_, err = l.Write(ctx, []byte(d))
		if err != nil {
			fmt.Printf("write: %v", err)
			os.Exit(1)
		}
	}

	eg, egCtx := errgroup.WithContext(ctx)

	_, latest := l.Range(egCtx)
	// stream records
	eg.Go(func() error {
		// start stream from latest (offset 14)
		stream := l.Stream(egCtx, latest)

		for {
			if r, ok := stream.Next(); ok {
				fmt.Printf("Record at offset %d says %q\n", r.Metadata.Offset, r.Data)
				continue
			}
			break
		}
		return stream.Err()
	})

	// continue writing while streaming
	eg.Go(func() error {
		for i := writeRecords / 2; i < writeRecords; i++ {
			d := fmt.Sprintf(`{"id":%d,"message","hello world"}`, i+logStart)
			_, err := l.Write(ctx, []byte(d))
			if err != nil && !errors.Is(err, context.Canceled) {
				return err
			}
		}
		return nil
	})

	// simulate SIGTERM after 2s
	eg.Go(func() error {
		time.Sleep(time.Second * 2)
		cancel()
		return nil
	})

	if err = eg.Wait(); err != nil && !errors.Is(err, context.Canceled) {
		fmt.Printf("run example: %v", err)
		os.Exit(1)
	}

}
Output:

Record at offset 14 says "{\"id\":14,\"message\",\"hello world\"}"
Record at offset 15 says "{\"id\":15,\"message\",\"hello world\"}"
Record at offset 16 says "{\"id\":16,\"message\",\"hello world\"}"
Record at offset 17 says "{\"id\":17,\"message\",\"hello world\"}"
Record at offset 18 says "{\"id\":18,\"message\",\"hello world\"}"
Record at offset 19 says "{\"id\":19,\"message\",\"hello world\"}"

Constants

const (
	// DefaultStartOffset is the start offset of the log
	DefaultStartOffset = Offset(0)
	// DefaultSegmentSize is the segment size, i.e. number of offsets, in the log
	DefaultSegmentSize = 1024
	// DefaultMaxRecordDataBytes is the maximum data (payload) size of a record
	DefaultMaxRecordDataBytes = 1024 << 10 // 1MiB
)

Variables

var (
	// ErrRecordTooLarge is returned when the record data is larger than the
	// configured maximum record size
	ErrRecordTooLarge = errors.New("record data too large")
	// ErrFutureOffset is returned on reads when the specified offset is in the
	// future and not written yet
	ErrFutureOffset = errors.New("future offset")
	// ErrOutOfRange is returned when the specified offset is invalid for the log
	// configuration or already purged from history
	ErrOutOfRange = errors.New("offset out of range")
)

Functions

This section is empty.

Types

type Header struct {
	// Offset is the record offset relative to the log start
	Offset Offset `json:"offset,omitempty"`
	// Created is the UTC timestamp when a record was successfully written to the
	// log
	Created time.Time `json:"created"` // UTC
}

Header is metadata associated with a record

type Log

type Log struct {
	// contains filtered or unexported fields
}

Log is an append-only in-memory data structure storing records. Records are stored and retrieved using unique offsets. The log can be customized during initialization with New() to define a custom start offset, and size limits for the log and individual records.

The log is divided into an active and history segment. When the active segment is full (MaxSegmentSize), it becomes the read-only history segment and a new empty active segment with the same size is created.

The maximum number of records in a log is twice the configured segment size (active + history). When this limit is reached, the history segment is purged, replaced with the current active segment and a new empty active segment is created.

Safe for concurrent use.

func New

func New(_ context.Context, options ...Option) (*Log, error)

New creates an empty log with default options applied, unless specified otherwise.

func (*Log) Range

func (l *Log) Range(_ context.Context) (earliest, latest Offset)

Range returns the earliest and latest available record offset in the log. If the log is empty, an invalid offset (-1) for both return values is returned. If the log has been purged one or more times, earliest points to the oldest available record offset in the log, i.e. not the configured start offset.

Note that these values might have changed after retrieval, e.g. due to concurrent writes.

Safe for concurrent use.

func (*Log) Read

func (l *Log) Read(ctx context.Context, offset Offset) (Record, error)

Read reads a record from the log at the specified offset. If an error occurs, an invalid (empty) record and the error are returned.

Safe for concurrent use.

func (*Log) ReadBatch added in v0.3.0

func (l *Log) ReadBatch(ctx context.Context, offset Offset, batch []Record) (int, error)

ReadBatch reads multiple records into batch starting at the specified offset. The number of records read into batch and the error, if any, is returned.

ReadBatch will read at most len(batch) records, always starting at batch index 0. ReadBatch stops reading at the end of the log, indicated by ErrFutureOffset.

The caller must expect partial batch results and must not read more records from batch than indicated by the returned number of records. See the example for how to use this API.
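
The partial-result contract can be illustrated with a small drain loop. This sketch does not call memlog: readBatch and errFutureOffset are local stand-ins written to follow the behavior documented above (fill from index 0, return the count, signal the end of the log with an error), and drain is a hypothetical helper.

```go
package main

import (
	"errors"
	"fmt"
)

// errFutureOffset stands in for memlog.ErrFutureOffset.
var errFutureOffset = errors.New("future offset")

// readBatch is a stand-in following the documented ReadBatch contract.
func readBatch(log []string, offset int, batch []string) (int, error) {
	n := 0
	for n < len(batch) {
		if offset+n >= len(log) {
			return n, errFutureOffset // end of log, n entries are still valid
		}
		batch[n] = log[offset+n]
		n++
	}
	return n, nil
}

// drain reads from offset until the end of the log is reached.
func drain(log []string, offset, batchSize int) ([]string, error) {
	if batchSize <= 0 {
		return nil, errors.New("batch size must be positive")
	}
	var out []string
	batch := make([]string, batchSize)
	for {
		n, err := readBatch(log, offset, batch)
		out = append(out, batch[:n]...) // consume partial results before checking err
		offset += n
		if errors.Is(err, errFutureOffset) {
			return out, nil // caught up with the writer
		}
		if err != nil {
			return out, err
		}
	}
}

func main() {
	log := make([]string, 15)
	for i := range log {
		log[i] = fmt.Sprintf("record-%d", i)
	}
	records, err := drain(log, 0, 10)
	fmt.Println(len(records), err)
}
```

The important detail is that the first n entries of batch are consumed before the error is inspected, since the final batch carries both valid records and the end-of-log error.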

Safe for concurrent use.

func (*Log) Stream

func (l *Log) Stream(ctx context.Context, start Offset) Stream

Stream returns a stream iterator to stream records, starting at the given start offset. If the start offset is in the future, stream will continuously poll until this offset is written.

Use Stream.Next() to read from the stream. See the example for how to use this API.

The returned stream iterator must only be used within the same goroutine.

func (*Log) Write

func (l *Log) Write(ctx context.Context, data []byte) (Offset, error)

Write creates a new record in the log with the provided data. The write offset of the new record is returned. If an error occurs, an invalid offset (-1) and the error are returned.

Safe for concurrent use.

type Offset

type Offset int

Offset is a monotonically increasing position of a record in the log

type Option

type Option func(*Log) error

Option customizes a log

func WithClock

func WithClock(c clock.Clock) Option

WithClock uses the specified clock for setting record timestamps

func WithMaxRecordDataSize

func WithMaxRecordDataSize(size int) Option

WithMaxRecordDataSize sets the maximum record data (payload) size in bytes

func WithMaxSegmentSize

func WithMaxSegmentSize(size int) Option

WithMaxSegmentSize sets the maximum size, i.e. number of offsets, in a log segment. Must be greater than 0.

func WithStartOffset

func WithStartOffset(offset Offset) Option

WithStartOffset sets the start offset of the log. Must be greater than or equal to 0.
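
The Option type follows Go's functional options pattern, where each option is a function that mutates the value under construction and may reject invalid input. The sketch below shows the pattern on a hypothetical config type. The names config, option, withStartOffset, withMaxSegmentSize and newConfig are illustrative, the defaults mirror the constants listed above, and the validation rules follow the documented constraints.

```go
package main

import (
	"errors"
	"fmt"
)

// config holds the settings a constructor would apply.
type config struct {
	startOffset int
	segmentSize int
}

// option mutates the config or rejects an invalid value.
type option func(*config) error

func withStartOffset(offset int) option {
	return func(c *config) error {
		if offset < 0 {
			return errors.New("start offset must not be negative")
		}
		c.startOffset = offset
		return nil
	}
}

func withMaxSegmentSize(size int) option {
	return func(c *config) error {
		if size <= 0 {
			return errors.New("segment size must be greater than 0")
		}
		c.segmentSize = size
		return nil
	}
}

// newConfig starts from defaults and applies the options in order.
func newConfig(opts ...option) (*config, error) {
	c := &config{startOffset: 0, segmentSize: 1024}
	for _, opt := range opts {
		if err := opt(c); err != nil {
			return nil, fmt.Errorf("configure log: %w", err)
		}
	}
	return c, nil
}

func main() {
	c, err := newConfig(withStartOffset(10), withMaxSegmentSize(100))
	fmt.Println(c.startOffset, c.segmentSize, err)

	_, err = newConfig(withMaxSegmentSize(0))
	fmt.Println(err)
}
```

Returning an error from each option lets the constructor fail early on invalid settings instead of silently falling back to defaults.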

type Record

type Record struct {
	Metadata Header `json:"metadata"`
	Data     []byte `json:"data,omitempty"`
}

Record is an immutable entry in the log

type Stream added in v0.2.0

type Stream struct {
	// contains filtered or unexported fields
}

Stream is an iterator to stream records in order from a log. It must only be used within the same goroutine.

func (*Stream) Err added in v0.2.0

func (s *Stream) Err() error

Err returns the first error that has occurred during streaming. This method should be called to inspect the error that caused the iterator to stop.

func (*Stream) Next added in v0.2.0

func (s *Stream) Next() (r Record, ok bool)

Next blocks until the next Record is available. ok is true if the iterator has not stopped, otherwise ok is false and any subsequent calls return an invalid record and false.

The caller must consult Err() to determine which error caused the iterator to stop.
