flatfs

package module
v0.0.0-...-4315a87
Published: Feb 18, 2023 License: MIT Imports: 27 Imported by: 0

README

go-ds-flatfs

A datastore implementation using sharded directories and flat files to store data

go-ds-flatfs is used by go-ipfs to store raw block contents on disk. It supports several sharding functions (prefix, suffix, next-to-last/*).

It is not a general-purpose datastore and has several important restrictions. See the restrictions section for details.

Lead Maintainer

Jakub Sztandera

Install

go-ds-flatfs can be used like any Go module:

import "github.com/ipfs/go-ds-flatfs"

Usage

Check the GoDoc module documentation for an overview of this module's functionality.

Restrictions

FlatFS keys are severely restricted. Only keys that match /[0-9A-Z+-_=]\+ are allowed. That is, keys may only contain upper-case alpha-numeric characters, '-', '+', '_', and '='. This is because values are written directly to the filesystem without encoding.

Importantly, this means namespaced keys (e.g., /FOO/BAR), are not allowed. Attempts to write to such keys will result in an error.
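
The key restriction above can be sketched as a small standalone check. This is only an illustration: `isValidFlatfsKey` is a hypothetical helper, not part of this package's API, and it assumes a key is a single leading slash followed by one or more allowed characters.

```go
package main

import "fmt"

// isValidFlatfsKey is a hypothetical helper (not this package's API).
// It reports whether key is one leading '/' followed by at least one
// character from 0-9, A-Z, '+', '-', '_' and '='.
func isValidFlatfsKey(key string) bool {
	if len(key) < 2 || key[0] != '/' {
		return false
	}
	for i := 1; i < len(key); i++ {
		c := key[i]
		switch {
		case c >= '0' && c <= '9':
		case c >= 'A' && c <= 'Z':
		case c == '+' || c == '-' || c == '_' || c == '=':
		default:
			// Anything else, including a second '/', is rejected.
			return false
		}
	}
	return true
}

func main() {
	for _, k := range []string{"/CIQABC123", "/FOO/BAR", "/lowercase", "/"} {
		fmt.Printf("%q valid=%v\n", k, isValidFlatfsKey(k))
	}
}
```

A namespaced key such as /FOO/BAR fails because the second slash is not in the allowed set.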

DiskUsage and Accuracy

This datastore implements the PersistentDatastore interface. It offers a DiskUsage() method which strives to find a balance between accuracy and performance. This implies:

  • The total disk usage of a datastore is calculated when opening the datastore
  • The current disk usage is cached frequently in a file in the datastore root (diskUsage.cache by default). This file is also written when the datastore is closed.
  • If this file is not present when the datastore is opened:
    • The disk usage will be calculated by walking the datastore's directory tree and estimating the size of each folder.
    • This may be a very slow operation for huge datastores or datastores with slow disks
    • The operation is time-limited (5 minutes by default).
    • Upon timeout, the remaining folders will be assumed to have the average of the previously processed ones.
  • After opening, the disk usage is updated in every write/delete operation.
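
The averaging idea behind these estimates can be sketched as follows. This is a simplified model, not the package's code: `estimateFolderSize` is a hypothetical helper that takes the sizes that were actually measured and the total number of entries, and assumes every unmeasured entry has the average measured size.

```go
package main

import "fmt"

// estimateFolderSize is a hypothetical helper. sampled holds the sizes of
// the entries that were actually measured; totalEntries is the number of
// entries in the folder. Unmeasured entries are assumed to have the
// average size of the sample. exact is true when nothing was estimated.
func estimateFolderSize(sampled []int64, totalEntries int) (size int64, exact bool) {
	if len(sampled) == 0 {
		return 0, totalEntries == 0
	}
	var sum int64
	for _, s := range sampled {
		sum += s
	}
	if totalEntries <= len(sampled) {
		return sum, true
	}
	avg := sum / int64(len(sampled))
	return sum + avg*int64(totalEntries-len(sampled)), false
}

func main() {
	size, exact := estimateFolderSize([]int64{100, 200, 300}, 3)
	fmt.Println(size, exact) // 600 true

	size, exact = estimateFolderSize([]int64{100, 200, 300}, 10)
	fmt.Println(size, exact) // 2000 false
}
```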

This means that for certain datastores (huge ones, those with very slow disks, or those with special content), the values reported by DiskUsage() might have reduced accuracy, and the first startup (without a diskUsage.cache file present) might be slow.

If you need increased accuracy or a fast start from the first time, you can manually create or update the diskUsage.cache file.

The file diskUsage.cache is a JSON file with two fields, diskUsage and accuracy. For example, the JSON file for a small repo might be:

{"diskUsage":6357,"accuracy":"initial-exact"}

diskUsage is the calculated disk usage and accuracy is a note on the accuracy of the initial calculation. If the initial calculation was accurate the file will contain the value initial-exact. If some of the directories have too many entries and the disk usage for that directory was estimated based on the first 2000 entries, the file will contain initial-approximate. If the calculation took too long and timed out as indicated above, the file will contain initial-timed-out.

If the initial calculation timed out the JSON file might be:

{"diskUsage":7589482442898,"accuracy":"initial-timed-out"}

To fix this with a more accurate value you could do (in the datastore root):

$ du -sb .
7536515831332    .
$ echo -n '{"diskUsage":7536515831332,"accuracy":"initial-exact"}' > diskUsage.cache

Contribute

PRs accepted.

Small note: If editing the README, please conform to the standard-readme specification.

License

MIT © Protocol Labs, Inc.

Documentation

Overview

Package flatfs is a Datastore implementation that stores all objects in a two-level directory structure in the local file system, regardless of the hierarchy of the keys.

Constants

const PREFIX = "/repo/flatfs/shard/"
const README_FN = "_README"
const SHARDING_FN = "SHARDING"
const SyncThreadsMax = 16

Don't block more than 16 threads on sync operations. 16 should be able to saturate most RAIDs: in the case of two used disks per write (RAID 1, 5) and a queue depth of 2, 16 concurrent Sync calls should be able to saturate a 16-HDD RAID. TODO: benchmark it out, maybe provide a tweak parameter.
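
A cap like this is commonly enforced with a buffered channel used as a counting semaphore. The sketch below shows that general technique only; it is an assumption that the package works this way, and `runLimited` and `maxConcurrent` are hypothetical names.

```go
package main

import (
	"fmt"
	"sync"
	"sync/atomic"
	"time"
)

const maxConcurrent = 16 // same value as SyncThreadsMax

// runLimited runs every task but lets at most limit of them execute at
// once. It returns the highest concurrency that was observed.
func runLimited(limit int, tasks []func()) int64 {
	sem := make(chan struct{}, limit)
	var wg sync.WaitGroup
	var running, peak int64
	for _, task := range tasks {
		wg.Add(1)
		sem <- struct{}{} // blocks while limit tasks are in flight
		go func(task func()) {
			defer wg.Done()
			defer func() { <-sem }() // release the slot when done
			n := atomic.AddInt64(&running, 1)
			for {
				p := atomic.LoadInt64(&peak)
				if n <= p || atomic.CompareAndSwapInt64(&peak, p, n) {
					break
				}
			}
			task()
			atomic.AddInt64(&running, -1)
		}(task)
	}
	wg.Wait()
	return atomic.LoadInt64(&peak)
}

func main() {
	tasks := make([]func(), 100)
	for i := range tasks {
		// Stand-in for a blocking Sync call.
		tasks[i] = func() { time.Sleep(time.Millisecond) }
	}
	fmt.Println("peak concurrency:", runLimited(maxConcurrent, tasks))
}
```

The reported peak never exceeds the limit, because a task only starts after it has taken a slot from the channel.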

Variables

var (
	// DiskUsageFile is the name of the file to cache the size of the
	// datastore in disk
	DiskUsageFile = "diskUsage.cache"
	// DiskUsageFilesAverage is the maximum number of files per folder
	// to stat in order to calculate the size of the datastore.
	// The size of the rest of the files in a folder will be assumed
	// to be the average of the values obtained. This includes
	// regular files and directories.
	DiskUsageFilesAverage = 2000
	// DiskUsageCalcTimeout is the maximum time to spend
	// calculating the DiskUsage upon a start when no
	// DiskUsageFile is present.
	// If this period did not suffice to read the size of the datastore,
	// the remaining sizes will be estimated.
	DiskUsageCalcTimeout = 5 * time.Minute
	// RetryDelay is a timeout for a backoff on retrying operations
	// that fail due to transient errors like too many file descriptors open.
	RetryDelay = time.Millisecond * 200

	// RetryAttempts is the maximum number of retries that will be attempted
	// before giving up.
	RetryAttempts = 6
)
var (
	ErrDatastoreExists       = errors.New("datastore already exists")
	ErrDatastoreDoesNotExist = errors.New("datastore directory does not exist")
	ErrShardingFileMissing   = fmt.Errorf("%s file not found in datastore", SHARDING_FN)
	ErrClosed                = errors.New("datastore closed")
	ErrInvalidKey            = errors.New("key not supported by flatfs")
)
var IPFS_DEF_SHARD = NextToLast(2)
var IPFS_DEF_SHARD_STR = IPFS_DEF_SHARD.String()
var README_IPFS_DEF_SHARD = `` /* 1123-byte string literal not displayed */
var SHARD_COUNT = 32

Functions

func Create

func Create(path string, fun *ShardIdV1) error

func Deljl

func Deljl(key string)

func DowngradeV1toV0

func DowngradeV1toV0(path string) error

func Jl

func Jl(key string)

func Lz4_compress

func Lz4_compress(val []byte) (value []byte)

lz4 decompression

func Lz4_decompress

func Lz4_decompress(data []byte) (value []byte)

func Move

func Move(oldPath string, newPath string, out io.Writer) error

func Pr

func Pr()

func Snappy_compress

func Snappy_compress(val []byte) (value []byte)

snappy decompression

func Snappy_decompress

func Snappy_decompress(data []byte) (value []byte)

func UpgradeV0toV1

func UpgradeV0toV1(path string, prefixLen int) error

func WriteReadme

func WriteReadme(dir string, id *ShardIdV1) error

func WriteShardFunc

func WriteShardFunc(dir string, id *ShardIdV1) error

func Zip_compress

func Zip_compress(val []byte) (value []byte)

zip decompression

func Zip_decompress

func Zip_decompress(data []byte) (value []byte)

func Zlib_compress

func Zlib_compress(val []byte) (value []byte)

zlib decompression

func Zlib_decompress

func Zlib_decompress(data []byte) (value []byte)

func Zstd_compress

func Zstd_compress(val []byte) (value []byte)

Zstd decompression

func Zstd_decompress

func Zstd_decompress(data []byte) (value []byte)

Types

type ConcurrentMap

type ConcurrentMap[K comparable, V any] struct {
	// contains filtered or unexported fields
}

A "thread" safe map of type string:Anything. To avoid lock bottlenecks, this map is divided into several (SHARD_COUNT) map shards.
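
The sharding idea can be sketched with a much smaller type. This is a simplified illustration, not this package's implementation: `shardedMap`, `shard`, `shardFor` and the FNV-1a hash are assumptions chosen for the example.

```go
package main

import (
	"fmt"
	"hash/fnv"
	"sync"
)

const shardCount = 32 // same value as SHARD_COUNT

// shard is one independently locked piece of the map.
type shard[V any] struct {
	sync.RWMutex
	items map[string]V
}

// shardedMap spreads keys over shardCount shards so that writers to
// different shards do not contend on the same lock.
type shardedMap[V any] []*shard[V]

func newShardedMap[V any]() shardedMap[V] {
	m := make(shardedMap[V], shardCount)
	for i := range m {
		m[i] = &shard[V]{items: make(map[string]V)}
	}
	return m
}

// shardFor picks a shard by hashing the key (FNV-1a is an assumption).
func (m shardedMap[V]) shardFor(key string) *shard[V] {
	h := fnv.New32a()
	h.Write([]byte(key))
	return m[h.Sum32()%shardCount]
}

func (m shardedMap[V]) Set(key string, value V) {
	s := m.shardFor(key)
	s.Lock()
	s.items[key] = value
	s.Unlock()
}

func (m shardedMap[V]) Get(key string) (V, bool) {
	s := m.shardFor(key)
	s.RLock()
	v, ok := s.items[key]
	s.RUnlock()
	return v, ok
}

func main() {
	m := newShardedMap[int]()
	var wg sync.WaitGroup
	for i := 0; i < 1000; i++ {
		wg.Add(1)
		go func(i int) {
			defer wg.Done()
			m.Set(fmt.Sprintf("key-%d", i), i)
		}(i)
	}
	wg.Wait()
	v, ok := m.Get("key-42")
	fmt.Println(v, ok) // 42 true
}
```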

func New

func New[V any]() ConcurrentMap[string, V]

Creates a new concurrent map.

func NewStringer

func NewStringer[K Stringer, V any]() ConcurrentMap[K, V]

Creates a new concurrent map.

func NewWithCustomShardingFunction

func NewWithCustomShardingFunction[K comparable, V any](sharding func(key K) uint32) ConcurrentMap[K, V]

Creates a new concurrent map.

func (ConcurrentMap[K, V]) Clear

func (m ConcurrentMap[K, V]) Clear()

Clear removes all items from map.

func (ConcurrentMap[K, V]) Count

func (m ConcurrentMap[K, V]) Count() int

Count returns the number of elements within the map.

func (ConcurrentMap[K, V]) Get

func (m ConcurrentMap[K, V]) Get(key K) (V, bool)

Get retrieves an element from map under given key.

func (ConcurrentMap[K, V]) GetShard

func (m ConcurrentMap[K, V]) GetShard(key K) *ConcurrentMapShared[K, V]

GetShard returns shard under given key

func (ConcurrentMap[K, V]) Has

func (m ConcurrentMap[K, V]) Has(key K) bool

Looks up an item under specified key

func (ConcurrentMap[K, V]) IsEmpty

func (m ConcurrentMap[K, V]) IsEmpty() bool

IsEmpty checks if map is empty.

func (ConcurrentMap[K, V]) Items

func (m ConcurrentMap[K, V]) Items() map[K]V

Items returns all items as map[string]V

func (ConcurrentMap[K, V]) Iter deprecated

func (m ConcurrentMap[K, V]) Iter() <-chan Tuple[K, V]

Iter returns an iterator which could be used in a for range loop.

Deprecated: using IterBuffered() will give better performance.

func (ConcurrentMap[K, V]) IterBuffered

func (m ConcurrentMap[K, V]) IterBuffered() <-chan Tuple[K, V]

IterBuffered returns a buffered iterator which could be used in a for range loop.

func (ConcurrentMap[K, V]) IterCb

func (m ConcurrentMap[K, V]) IterCb(fn IterCb[K, V])

Callback based iterator, cheapest way to read all elements in a map.

func (ConcurrentMap[K, V]) Keys

func (m ConcurrentMap[K, V]) Keys() []K

Keys returns all keys as []string

func (ConcurrentMap[K, V]) MSet

func (m ConcurrentMap[K, V]) MSet(data map[K]V)

func (ConcurrentMap[K, V]) MarshalJSON

func (m ConcurrentMap[K, V]) MarshalJSON() ([]byte, error)

Reveals ConcurrentMap "private" variables to the JSON marshaler.

func (ConcurrentMap[K, V]) Pop

func (m ConcurrentMap[K, V]) Pop(key K) (v V, exists bool)

Pop removes an element from the map and returns it

func (ConcurrentMap[K, V]) Remove

func (m ConcurrentMap[K, V]) Remove(key K)

Remove removes an element from the map.

func (ConcurrentMap[K, V]) RemoveCb

func (m ConcurrentMap[K, V]) RemoveCb(key K, cb RemoveCb[K, V]) bool

RemoveCb locks the shard containing the key, retrieves its current value, and calls the callback with those params. If the callback returns true and the element exists, it is removed from the map. Returns the value returned by the callback (even if the element was not present in the map).

func (ConcurrentMap[K, V]) Set

func (m ConcurrentMap[K, V]) Set(key K, value V)

Sets the given value under the specified key.

func (ConcurrentMap[K, V]) SetIfAbsent

func (m ConcurrentMap[K, V]) SetIfAbsent(key K, value V) bool

Sets the given value under the specified key if no value was associated with it.

func (*ConcurrentMap[K, V]) UnmarshalJSON

func (m *ConcurrentMap[K, V]) UnmarshalJSON(b []byte) (err error)

Reverse process of Marshal.

func (ConcurrentMap[K, V]) Upsert

func (m ConcurrentMap[K, V]) Upsert(key K, value V, cb UpsertCb[V]) (res V)

Insert or Update - updates existing element or inserts a new one using UpsertCb

type ConcurrentMapShared

type ConcurrentMapShared[K comparable, V any] struct {
	sync.RWMutex // Read Write mutex, guards access to internal map.
	// contains filtered or unexported fields
}

A "thread" safe string to anything map.

type Datastore

type Datastore struct {
	// contains filtered or unexported fields
}

Datastore implements the go-datastore Interface. Note that this datastore cannot guarantee the order of concurrent write operations to the same key. See the explanation in Put().

func CreateOrOpen

func CreateOrOpen(path string, fun *ShardIdV1, sync bool) (*Datastore, error)

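CreateOrOpen is a convenience function that creates the datastore at path if it does not already exist and then opens it.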

func Open

func Open(path string, syncFiles bool) (*Datastore, error)

func (*Datastore) Accuracy

func (fs *Datastore) Accuracy() string

Accuracy returns a string representing the accuracy of the DiskUsage() result. The value returned is implementation-defined and for informational purposes only.

func (*Datastore) Batch

func (fs *Datastore) Batch(_ context.Context) (datastore.Batch, error)

func (*Datastore) Close

func (fs *Datastore) Close() error

func (*Datastore) Delete

func (fs *Datastore) Delete(ctx context.Context, key datastore.Key) error

Delete removes a key/value from the Datastore. Please read the Put() explanation about the handling of concurrent write operations to the same key.

func (*Datastore) DiskUsage

func (fs *Datastore) DiskUsage(ctx context.Context) (uint64, error)

DiskUsage implements the PersistentDatastore interface and returns the current disk usage in bytes used by this datastore.

The size is approximate and may differ slightly from the real disk values.

func (*Datastore) Get

func (fs *Datastore) Get(ctx context.Context, key datastore.Key) (value []byte, err error)

func (*Datastore) GetSize

func (fs *Datastore) GetSize(ctx context.Context, key datastore.Key) (size int, err error)

func (*Datastore) Get_writer

func (fs *Datastore) Get_writer(dir string, path string) (err error)

func (*Datastore) Has

func (fs *Datastore) Has(ctx context.Context, key datastore.Key) (exists bool, err error)

func (*Datastore) Put

func (fs *Datastore) Put(ctx context.Context, key datastore.Key, value []byte) error

Put stores a key/value in the datastore.

Note that we do not guarantee the order of write operations (Put or Delete) to the same key in this datastore.

For example, in the case of two concurrent Puts, we only guarantee that one of them will come through, but cannot say which one, even if one arrived slightly later than the other. In the case of a concurrent Put and Delete, we cannot guarantee which one will win.

func (*Datastore) Query

func (fs *Datastore) Query(ctx context.Context, q query.Query) (query.Results, error)

func (*Datastore) ShardStr

func (fs *Datastore) ShardStr() string

func (*Datastore) Sync

func (fs *Datastore) Sync(ctx context.Context, prefix datastore.Key) error

func (*Datastore) WriteBlockhotFile

func (fs *Datastore) WriteBlockhotFile(hot map[string]int, doSync bool)

type IterCb

type IterCb[K comparable, V any] func(key K, v V)

IterCb is an iterator callback called for every key/value pair found in the map. RLock is held for all calls for a given shard, so the callback sees a consistent view of a shard, but not across shards.

type RemoveCb

type RemoveCb[K any, V any] func(key K, v V, exists bool) bool

RemoveCb is a callback executed in a map.RemoveCb() call while the Lock is held. If it returns true, the element will be removed from the map.

type ShardFunc

type ShardFunc func(string) string

type ShardIdV1

type ShardIdV1 struct {
	// contains filtered or unexported fields
}

func NextToLast

func NextToLast(suffixLen int) *ShardIdV1

func ParseShardFunc

func ParseShardFunc(str string) (*ShardIdV1, error)

func Prefix

func Prefix(prefixLen int) *ShardIdV1

func ReadShardFunc

func ReadShardFunc(dir string) (*ShardIdV1, error)

func Suffix

func Suffix(suffixLen int) *ShardIdV1

func (*ShardIdV1) Func

func (f *ShardIdV1) Func() ShardFunc

func (*ShardIdV1) String

func (f *ShardIdV1) String() string
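
What the three sharding functions compute can be sketched with plain string helpers. This assumes the behavior of upstream go-ds-flatfs, where short keys are padded with underscores; this package may differ, and `prefixShard`, `suffixShard` and `nextToLastShard` are hypothetical names. The key is passed without its leading slash.

```go
package main

import (
	"fmt"
	"strings"
)

// prefixShard returns the first n characters of key, padded with '_'.
func prefixShard(n int, key string) string {
	return (key + strings.Repeat("_", n))[:n]
}

// suffixShard returns the last n characters of key, padded with '_'.
func suffixShard(n int, key string) string {
	s := strings.Repeat("_", n) + key
	return s[len(s)-n:]
}

// nextToLastShard returns the n characters that precede the last
// character of key, padded with '_'.
func nextToLastShard(n int, key string) string {
	s := strings.Repeat("_", n+1) + key
	off := len(s) - n - 1
	return s[off : off+n]
}

func main() {
	key := "ABCDEF"
	fmt.Println(prefixShard(2, key))     // AB
	fmt.Println(suffixShard(2, key))     // EF
	fmt.Println(nextToLastShard(2, key)) // DE
}
```

The returned string names the shard directory in which the key's file is placed.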

type Stringer

type Stringer interface {
	fmt.Stringer
	comparable
}

type Tuple

type Tuple[K comparable, V any] struct {
	Key K
	Val V
}

Tuple is used by the Iter and IterBuffered functions to wrap two variables together over a channel.

type UpsertCb

type UpsertCb[V any] func(exist bool, valueInMap V, newValue V) V

UpsertCb is a callback that returns the new element to be inserted into the map. It is called while the lock is held, therefore it MUST NOT try to access other keys in the same map, as that can lead to deadlock since Go's sync.RWMutex is not reentrant.
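
The upsert pattern can be sketched with a single-lock map. This is a simplified illustration, not this package's code: `lockedMap` and `upsert` are hypothetical, and the callback has the same shape as UpsertCb with V fixed to int.

```go
package main

import (
	"fmt"
	"sync"
)

// lockedMap is a hypothetical stand-in for one map shard.
type lockedMap struct {
	mu sync.Mutex
	m  map[string]int
}

// upsert runs cb while the lock is held and stores its result, so the
// read-modify-write is atomic. cb must not call back into the same map.
func (c *lockedMap) upsert(key string, value int, cb func(exist bool, valueInMap, newValue int) int) int {
	c.mu.Lock()
	defer c.mu.Unlock()
	old, ok := c.m[key]
	res := cb(ok, old, value)
	c.m[key] = res
	return res
}

func main() {
	c := &lockedMap{m: make(map[string]int)}
	// add accumulates newValue onto whatever is already stored.
	add := func(exist bool, valueInMap, newValue int) int {
		if !exist {
			return newValue
		}
		return valueInMap + newValue
	}
	var wg sync.WaitGroup
	for i := 0; i < 100; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			c.upsert("hits", 1, add)
		}()
	}
	wg.Wait()
	fmt.Println(c.upsert("hits", 0, add)) // 100
}
```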
