sharded

package
v0.4.5 (Latest)
Warning

This package is not in the latest version of its module.

Go to latest
Published: Sep 4, 2023 License: Apache-2.0 Imports: 8 Imported by: 0

README

About

tl;dr

A purpose-built log, built on top of memlog.Log, that supports sharding (partitioning) Records by key.

Usage

The sharded.Log is built on memlog.Log and provides a similar API for reading and writing data (Records), Offset handling, etc.

The Read() and Write() methods accept a sharding key to distribute the Records based on a (configurable) sharding strategy.

Unless specified otherwise, the default sharding strategy hashes the key with Go's fnv.New32a (32-bit FNV-1a). It then selects the corresponding Shard by taking the hash modulo the (configurable) number of Shards in the Log.

💡 Depending on the number of Shards, number of distinct keys and their hashes, multiple keys might be stored in the same Shard. If strict key separation is required, a custom Sharder can be implemented. For convenience, a KeySharder is provided.

See pkg.go.dev for the API reference and examples.

Example

package main

import (
	"context"
	"errors"
	"fmt"
	"os"

	"github.com/embano1/memlog"
	"github.com/embano1/memlog/sharded"
)

// main writes a few records under two keys ("users" and "groups") to a
// sharded log and then reads back every record stored under "users".
// Errors are reported on stderr so they never mix with the example output.
func main() {
	ctx := context.Background()

	// the default number of shards (1000) is sufficient to assign a shard per key
	// for this example (i.e. no key overlap within a shard)
	l, err := sharded.New(ctx)
	if err != nil {
		fmt.Fprintf(os.Stderr, "create log: %v\n", err)
		os.Exit(1)
	}

	data := map[string][]string{
		"users":  {"tom", "sarah", "ajit"},
		"groups": {"friends", "family", "colleagues"},
	}

	// Map iteration order is random, but values for a single key are always
	// written in slice order, so each key's records keep their order.
	for key, vals := range data {
		for _, val := range vals {
			if _, err := l.Write(ctx, []byte(key), []byte(val)); err != nil {
				fmt.Fprintf(os.Stderr, "write: %v\n", err)
				os.Exit(1)
			}
		}
	}

	// Reading by key only returns "users" records because "users" and
	// "groups" land in different shards here; if two keys hashed to the same
	// shard their records would be interleaved (use a KeySharder to prevent that).
	fmt.Println("reading all users...")
	offset := memlog.Offset(0)
	for {
		record, err := l.Read(ctx, []byte("users"), offset)
		if err != nil {
			// reading past the last written offset signals the end of the shard
			if errors.Is(err, memlog.ErrFutureOffset) {
				break // end of log
			}
			fmt.Fprintf(os.Stderr, "read: %v\n", err)
			os.Exit(1)
		}

		fmt.Printf("- %s\n", string(record.Data))
		offset++
	}

	// Output: reading all users...
	// - tom
	// - sarah
	// - ajit
}

Documentation

Overview

Example
package main

import (
	"context"
	"errors"
	"fmt"
	"os"

	"github.com/embano1/memlog"
	"github.com/embano1/memlog/sharded"
)

// main writes a few records under two keys ("users" and "groups") to a
// sharded log and then reads back every record stored under "users".
// Errors are reported on stderr so they never mix with the example output.
func main() {
	ctx := context.Background()

	// the default number of shards (1000) is sufficient to assign a shard per key
	// for this example (i.e. no key overlap within a shard)
	l, err := sharded.New(ctx)
	if err != nil {
		fmt.Fprintf(os.Stderr, "create log: %v\n", err)
		os.Exit(1)
	}

	data := map[string][]string{
		"users":  {"tom", "sarah", "ajit"},
		"groups": {"friends", "family", "colleagues"},
	}

	// Map iteration order is random, but values for a single key are always
	// written in slice order, so each key's records keep their order.
	for key, vals := range data {
		for _, val := range vals {
			if _, err := l.Write(ctx, []byte(key), []byte(val)); err != nil {
				fmt.Fprintf(os.Stderr, "write: %v\n", err)
				os.Exit(1)
			}
		}
	}

	// Reading by key only returns "users" records because "users" and
	// "groups" land in different shards here; if two keys hashed to the same
	// shard their records would be interleaved (use a KeySharder to prevent that).
	fmt.Println("reading all users...")
	offset := memlog.Offset(0)
	for {
		record, err := l.Read(ctx, []byte("users"), offset)
		if err != nil {
			// reading past the last written offset signals the end of the shard
			if errors.Is(err, memlog.ErrFutureOffset) {
				break // end of log
			}
			fmt.Fprintf(os.Stderr, "read: %v\n", err)
			os.Exit(1)
		}

		fmt.Printf("- %s\n", string(record.Data))
		offset++
	}
}
Output:

reading all users...
- tom
- sarah
- ajit
Example (Sharder)
package main

import (
	"context"
	"errors"
	"fmt"
	"os"

	"github.com/embano1/memlog"
	"github.com/embano1/memlog/sharded"
)

// main creates a sharded log with a KeySharder so that each known key gets
// its own shard, writes records under every key, and then reads each key's
// records back in write order. Errors are reported on stderr so they never
// mix with the example output.
func main() {
	ctx := context.Background()

	keys := []string{"galaxies", "planets"}
	ks := sharded.NewKeySharder(keys)

	opts := []sharded.Option{
		sharded.WithNumShards(uint(len(keys))), // must be >=len(keys)
		sharded.WithSharder(ks),
	}
	l, err := sharded.New(ctx, opts...)
	if err != nil {
		fmt.Fprintf(os.Stderr, "create log: %v\n", err)
		os.Exit(1)
	}

	data := map[string][]string{
		keys[0]: {"Centaurus A", "Andromeda", "Eye of Sauron"},
		keys[1]: {"Mercury", "Venus", "Earth", "Mars", "Jupiter", "Saturn", "Uranus", "Neptune"},
	}

	// Map iteration order is random, but values for a single key are always
	// written in slice order, and each key owns a dedicated shard.
	for key, vals := range data {
		for _, val := range vals {
			if _, err := l.Write(ctx, []byte(key), []byte(val)); err != nil {
				fmt.Fprintf(os.Stderr, "write: %v\n", err)
				os.Exit(1)
			}
		}
	}

KEYS:
	for _, key := range keys {
		fmt.Printf("reading all %s...\n", key)

		offset := memlog.Offset(0)
		for {
			record, err := l.Read(ctx, []byte(key), offset)
			if err != nil {
				// reading past the last written offset ends this key's shard
				if errors.Is(err, memlog.ErrFutureOffset) {
					fmt.Println()
					continue KEYS
				}
				fmt.Fprintf(os.Stderr, "read: %v\n", err)
				os.Exit(1)
			}

			fmt.Printf("- %s\n", string(record.Data))
			offset++
		}
	}
}
Output:

reading all galaxies...
- Centaurus A
- Andromeda
- Eye of Sauron

reading all planets...
- Mercury
- Venus
- Earth
- Mars
- Jupiter
- Saturn
- Uranus
- Neptune

Index

Examples

Constants

View Source
const (
	// DefaultStartOffset is the offset at which every shard starts. It
	// mirrors memlog.DefaultStartOffset.
	DefaultStartOffset = memlog.DefaultStartOffset
	// DefaultSegmentSize is the segment size of every shard, expressed as a
	// number of offsets. It mirrors memlog.DefaultSegmentSize.
	DefaultSegmentSize = memlog.DefaultSegmentSize
	// DefaultMaxRecordDataBytes caps the data (payload) size of a single
	// record within a shard. It mirrors memlog.DefaultMaxRecordDataBytes.
	DefaultMaxRecordDataBytes = memlog.DefaultMaxRecordDataBytes

	// DefaultShards is how many shards a Log gets when no shard count is
	// specified.
	DefaultShards = 1000
)

Variables

This section is empty.

Functions

This section is empty.

Types

type KeySharder

type KeySharder struct {
	// contains filtered or unexported fields
}

KeySharder assigns a shard per unique key

func NewKeySharder

func NewKeySharder(keys []string) *KeySharder

NewKeySharder creates a new key-based Sharder, assigning a shard to each unique key. The caller must ensure that there are at least len(keys) shards available in the log.

func (*KeySharder) Shard

func (k *KeySharder) Shard(key []byte, shards uint) (uint, error)

Shard implements Sharder interface

type Log

type Log struct {
	// contains filtered or unexported fields
}

Log is a sharded log implementation on top of memlog.Log. It uses a configurable sharding strategy (see Sharder interface) during reads and writes.

func New

func New(ctx context.Context, options ...Option) (*Log, error)

New creates a new sharded log which can be customized with options. If not specified, the default sharding strategy uses fnv.New32a for key hashing.

func (*Log) Read

func (l *Log) Read(ctx context.Context, key []byte, offset memlog.Offset) (memlog.Record, error)

Read reads a record from the log at offset using the specified key for shard lookup

func (*Log) Write

func (l *Log) Write(ctx context.Context, key []byte, data []byte) (memlog.Offset, error)

Write writes data to the log using the specified key for sharding

type Option

// Option customizes a Log; options are passed to New.
// NOTE(review): presumably New aborts and returns the error if an Option
// fails — confirm against New's implementation.
type Option func(*Log) error

Option customizes a log

func WithClock

func WithClock(c clock.Clock) Option

WithClock uses the specified clock for setting record timestamps

func WithMaxRecordDataSize

func WithMaxRecordDataSize(size int) Option

WithMaxRecordDataSize sets the maximum record data (payload) size in bytes in each shard

func WithMaxSegmentSize

func WithMaxSegmentSize(size int) Option

WithMaxSegmentSize sets the maximum size, i.e. number of offsets, in each shard. Must be greater than 0.

func WithNumShards

func WithNumShards(n uint) Option

WithNumShards sets the number of shards in a log

func WithSharder

func WithSharder(s Sharder) Option

WithSharder uses the specified sharder for key sharding

func WithStartOffset

func WithStartOffset(offset memlog.Offset) Option

WithStartOffset sets the start offset of each shard. Must be greater than or equal to 0.

type Sharder

// Sharder selects the shard that stores the records for a given key.
type Sharder interface {
	// Shard returns the shard for key, given the total number of shards in
	// the log. shards must match the shard count the log was created with;
	// otherwise the behavior is undefined.
	// NOTE(review): the result is presumably in [0, shards) — confirm.
	Shard(key []byte, shards uint) (uint, error)
}

Sharder returns a shard for the specified key and number of shards in the log. The number of shards passed to Shard must match the number of shards the log was created with; otherwise the behavior is undefined.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL