pipeline

package

Documentation

Overview

Package pipeline is used to write content to and read content from a datastor cluster.

It does this using a pipeline model: data is turned into one or multiple datastor objects, which can optionally be processed prior to storage, and which are then stored either on a random shard or distributed across multiple shards.

Data written to the storage can be split, using a data splitter, into multiple smaller, "fixed-size" blocks. No padding will be applied should some (tail) data be too small to fill the fixed-size buffer. While data splitting is optional, it is recommended, as it allows you to write and read large files; without splitting the data up this would be impossible, because all of the data would have to be read into the memory of both the client and the server prior to storage in the database.

Each object has a key, which identifies the data. In this pipeline model, the key is generated automatically, using a cryptographic hasher. Ideally these keys are generated as signatures, so as to prove ownership, but checksum-based keys are supported as well, and are in fact the default. This checksum/signature is generated from the raw (split) data slice, as it is prior to being processed. When content is read back, this key is also validated, as an extra check (on top of the checksum provided by zstordb), to ensure the data is the one we expect. For this reason in particular, it is recommended to use signature-based key generation when possible. See the 'crypto' package for more information about this hashing logic, and read its documentation especially if you plan to provide your own hasher.

Data can be processed, which in the context of the pipeline means that it can be compressed and/or encrypted prior to storage, and decrypted and/or decompressed when the content is read back. For this we make use of the 'processing' sub-package. This package defines the interfaces, types and logic used for processing data, but users can plug in their own compression/encryption processors as well, should this be desired. See the 'processing' package for more information.

Finally, when the data is ready for storage, which is the case once it is processed and has a key, it is stored using the 'storage' subpackage, into a zstordb cluster of choice. An object can simply be stored on a random available shard, or it can be distributed using replication or erasure coding. While the default is simple random storage, it is recommended to opt in to erasure-code distribution, as it gives your data the greatest resilience offered by this package. See the 'storage' package for more information.

The easiest way to create a pipeline is by using the provided Config struct. Using a single configuration and a pre-created cluster, you can create an entire pipeline, ready for use.
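
For illustration, a minimal sketch of that config-based flow is shown below. The import paths and the helper name are assumptions layered on top of this documentation, and how the datastor cluster is obtained is out of scope here.

package example

import (
	"github.com/threefoldtech/0-stor/client/datastor"
	"github.com/threefoldtech/0-stor/client/datastor/pipeline"
)

// newConfiguredPipeline is a hypothetical helper: it turns a single Config
// and a pre-created datastor cluster into a ready-to-use Pipeline.
func newConfiguredPipeline(cluster datastor.Cluster) (pipeline.Pipeline, error) {
	cfg := pipeline.Config{
		// Split written data into blocks of at most 4 MiB (example value).
		BlockSize: 4 * 1024 * 1024,
		// Hashing, Compression, Encryption and Distribution are left at
		// their zero values here, which selects the documented defaults.
	}
	// A jobCount of 0 (or less) selects DefaultJobCount.
	return pipeline.NewPipeline(cfg, cluster, 0)
}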

Index

Constants

This section is empty.

Variables

var (
	// DefaultJobCount is the JobCount,
	// while creating a config-based pipeline, in case a jobCount lower than 1 is given.
	DefaultJobCount = runtime.NumCPU() * 2
)

Functions

func DefaultHasherConstructor

func DefaultHasherConstructor() (crypto.Hasher, error)

DefaultHasherConstructor is an implementation of a HasherConstructor, which can be used as a safe default HasherConstructor, by pipeline implementations, should such a constructor not be given by the user.

func DefaultProcessorConstructor

func DefaultProcessorConstructor() (processing.Processor, error)

DefaultProcessorConstructor is an implementation of a ProcessorConstructor, which can be used as a safe default ProcessorConstructor, by pipeline implementations, should such a constructor not be given by the user.

func NewChunkStorage

func NewChunkStorage(cfg ObjectDistributionConfig, cluster datastor.Cluster, jobCount int) (storage.ChunkStorage, error)

NewChunkStorage creates a chunk storage, using an easy-to-use distribution configuration. The datastor cluster has to be created upfront, and NewChunkStorage will panic if the cluster is nil. The jobCount is optional; DefaultJobCount will be used if the given count is 0 or less.
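
As a hedged sketch (the import paths, including the one for the storage sub-package, are assumptions), creating a chunk storage that replicates each chunk onto multiple shards could look like this:

package example

import (
	"github.com/threefoldtech/0-stor/client/datastor"
	"github.com/threefoldtech/0-stor/client/datastor/pipeline"
	"github.com/threefoldtech/0-stor/client/datastor/pipeline/storage"
)

// newReplicatedStorage creates a ChunkStorage which stores every chunk
// on 3 shards (replication), since only DataShardCount is set.
func newReplicatedStorage(cluster datastor.Cluster) (storage.ChunkStorage, error) {
	cfg := pipeline.ObjectDistributionConfig{DataShardCount: 3}
	// A jobCount of 0 selects DefaultJobCount.
	return pipeline.NewChunkStorage(cfg, cluster, 0)
}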

Types

type AsyncSplitterPipeline

type AsyncSplitterPipeline struct {
	// contains filtered or unexported fields
}

AsyncSplitterPipeline defines a parallel pipeline, which chunks all the content as it reads it, and processes and stores the read data as multiple objects. It is guaranteed that content written using this pipeline can be read back in the same order as it was written.

func NewAsyncSplitterPipeline

func NewAsyncSplitterPipeline(cs storage.ChunkStorage, chunkSize int, pc ProcessorConstructor, hc HasherConstructor, jobCount int) *AsyncSplitterPipeline

NewAsyncSplitterPipeline creates a parallel pipeline, which chunks all the content as it reads it, and processes and stores the chunks as multiple objects. It is guaranteed that content written using this pipeline can be read back in the same order as it was written.

NewAsyncSplitterPipeline requires a non-nil ChunkStorage and will panic if it is missing. It also requires chunkSize to be positive; if it is not, it will panic as well.

If no ProcessorConstructor is given, a default constructor will be created for you, which will construct a processing.NopProcessor, effectively keeping the data unprocessed at all times. While the ProcessorConstructor is optional, it is recommended to define a valid constructor, as the storage of unprocessed data is both insecure and inefficient.

If no HasherConstructor is given, a default constructor will be created for you, which will construct a 256-bit crypto hasher for you, producing checksums as keys. While the HasherConstructor is optional and the default one performs well, it is still recommended to define a valid constructor, as it will allow you to give a HasherConstructor which creates a crypto Hasher that produces signatures as keys, rather than checksums.

If no jobCount is given, meaning it is 0 or less, DefaultJobCount will be used.
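
A sketch of manual construction, relying on the default constructors; the chunk storage is assumed to be created elsewhere (for example via NewChunkStorage), and the import paths are assumptions.

package example

import (
	"github.com/threefoldtech/0-stor/client/datastor/pipeline"
	"github.com/threefoldtech/0-stor/client/datastor/pipeline/storage"
)

// newAsyncPipeline wires an AsyncSplitterPipeline together by hand,
// using the default processor (nop) and hasher (256-bit checksum) constructors.
func newAsyncPipeline(cs storage.ChunkStorage) *pipeline.AsyncSplitterPipeline {
	const chunkSize = 1024 * 1024 // 1 MiB chunks (example value)
	return pipeline.NewAsyncSplitterPipeline(
		cs,
		chunkSize,
		pipeline.DefaultProcessorConstructor, // keeps data unprocessed
		pipeline.DefaultHasherConstructor,    // checksum-based keys
		0, // 0 (or less) selects DefaultJobCount
	)
}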

func (*AsyncSplitterPipeline) Check

func (asp *AsyncSplitterPipeline) Check(chunks []metatypes.Chunk, fast bool) (storage.CheckStatus, error)

Check implements Pipeline.Check

func (*AsyncSplitterPipeline) ChunkSize

func (asp *AsyncSplitterPipeline) ChunkSize() int

ChunkSize implements Pipeline.ChunkSize

func (*AsyncSplitterPipeline) Close

func (asp *AsyncSplitterPipeline) Close() error

Close implements Pipeline.Close

func (*AsyncSplitterPipeline) Delete

func (asp *AsyncSplitterPipeline) Delete(chunks []metatypes.Chunk) error

Delete implements Pipeline.Delete

func (*AsyncSplitterPipeline) Read

func (asp *AsyncSplitterPipeline) Read(chunks []metatypes.Chunk, w io.Writer) error

Read implements Pipeline.Read

The following graph visualizes the logic of this pipeline's Read method:

+--------------------------------------------------------------------------------+
| +--------------+                                                               |
| |[]*ChunkMeta  +----> Storage.Read +-+                                         |
| |     to       |                     |                                         |
| |chan ChunkMeta+----> Storage.Read +-+     +----------+                        |
| +----------+---+                     +-----> channels +--------------+         |
|            |   ...                   |     +----+-----+              |         |
|            |                         |          |         ...        |         |
|            +--------> Storage.Read +-+          |                    |         |
|                                         +-------v--------+  +--------v-------+ |
|                                         | Processor.Read |  | Processor.Read | |
|                                         |       +        |  |       +        | |
|                                         |   Hash/Data    |  |   Hash/Data    | |
|                                         |   Validation   |  |   Validation   | |
|                                         +-------+--------+  +--------+-------+ |
|                                                 |                    |         |
|                                                 |                    |         |
|                                             +---v--------------------v---+     |
|                                             |                            |     |
|                                             |     Data Composer          |     |
|                           io.Writer <-------+ (with internal buffer)     |     |
|                         (input param)       |                            |     |
|                                             +----------------------------+     |
+--------------------------------------------------------------------------------+

The data composer (and its internal buffer) is used to ensure we write the raw chunks in the correct order to the io.Writer.

As soon as an error happens within any stage, at any point, the entire pipeline will be cancelled and that error is returned to the caller of this method.

If however only one chunk is given, a temporarily created SingleObjectPipeline will be used to read the data, using the Read method of that pipeline, so as not to spawn an entire async pipeline when only one chunk is to be read. See (*SingleObjectPipeline).Read for more information about the logic for this scenario.

func (*AsyncSplitterPipeline) Repair

func (asp *AsyncSplitterPipeline) Repair(chunks []metatypes.Chunk) ([]metatypes.Chunk, error)

Repair implements Pipeline.Repair

func (*AsyncSplitterPipeline) Write

func (asp *AsyncSplitterPipeline) Write(r io.Reader) ([]metatypes.Chunk, error)

Write implements Pipeline.Write

The following graph visualizes the logic of this pipeline's Write method:

+-------------------------------------------------------------------+
| +----------+                                                      |
| | Splitter +---> Processor.Write +   +-----------+                |
| |   +      |          ...        +--->  buf-chan +-------+        |
| | Hasher   +---> Processor.Write +   +-----------+       |        |
| +----------+                                             |        |
|                 +-----------------+----------------------+        |
|                 v                 v                      v        |
|           Storage.Write     Storage.Write    ...   Storage.Write  |
|                 +                 +                      +        |
|                 |                 |                      |        |
|                 |   (ChunkMeta)   |      (ChunkMeta)     |        |
|                 |                 |                      |        |
|         +-------v-----------------v----------------------v-----+  |
|         |              ordered   [] metatypes.Chunk            |  |
|         +------------------------------------------------------+  |
+-------------------------------------------------------------------+

All channels are buffered, so as to keep the pipeline as smooth as possible.

The chunks are stored and returned in an ordered slice. The order is defined by the order in which the data that created those chunks was read from the input io.Reader.

As soon as an error happens within any stage, at any point, the entire pipeline will be cancelled and that error is returned to the caller of this method.
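
A hedged usage sketch of the Write/Read pair follows; in a real 0-stor client the returned chunk metadata would be persisted in a metadata store rather than held in memory, and the import path is an assumption.

package example

import (
	"bytes"
	"strings"

	"github.com/threefoldtech/0-stor/client/datastor/pipeline"
)

// roundTrip writes some data through a pipeline and reads it back,
// returning the restored content.
func roundTrip(p pipeline.Pipeline) (string, error) {
	// Write splits, processes and stores the data,
	// returning the chunk metadata in write order.
	chunks, err := p.Write(strings.NewReader("hello 0-stor"))
	if err != nil {
		return "", err
	}

	// Read fetches, validates and restores the chunks,
	// composing them back into the original data, in order.
	var buf bytes.Buffer
	if err := p.Read(chunks, &buf); err != nil {
		return "", err
	}
	return buf.String(), nil
}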

type CompressionConfig

type CompressionConfig struct {
	// Mode defines the compression mode to use.
	// Note that not all compression algorithms might support all modes,
	// in which case they will fall back to the closest mode that is supported.
	// If this happens a warning will be logged.
	//
	// When no mode is defined (or an explicit empty string is defined),
	// no compressor will be created.
	//
	// All standard compression modes available are: default, best_speed, best_compression
	Mode processing.CompressionMode `yaml:"mode" json:"mode"`

	// The type of compression algorithm to use,
	// defining both the compressing and decompressing logic.
	// The string value (representing the compression algorithm type), is case-insensitive.
	//
	// The default compression type is: Snappy
	// All standard compression types available are: Snappy, LZ4, GZip
	//
	// In case you've registered a custom compression algorithm,
	// or have overridden a standard compression algorithm, using `processing.RegisterCompressorDecompressor`
	// you'll be able to use that compressor-decompressor, by providing its (stringified) type here.
	Type processing.CompressionType `yaml:"type" json:"type"`
}

CompressionConfig defines the configuration used to create a compressor-decompressor Processor.
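
As a sketch only: assuming the processing package exposes constant names along the lines of CompressionTypeLZ4 and CompressionModeBestSpeed for the standard types and modes listed above (verify the exact identifiers in that package), enabling LZ4 in best-speed mode would look like this.

package example

import (
	"github.com/threefoldtech/0-stor/client/datastor/pipeline"
	"github.com/threefoldtech/0-stor/client/datastor/pipeline/processing"
)

// lz4BestSpeed enables LZ4 compression in best-speed mode.
// The constant names below are assumptions based on the modes/types
// listed in CompressionConfig; check the processing package for the real ones.
var lz4BestSpeed = pipeline.CompressionConfig{
	Mode: processing.CompressionModeBestSpeed,
	Type: processing.CompressionTypeLZ4,
}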

type Config

type Config struct {
	// BlockSize defines the "fixed-size" of objects,
	// meaning that if the data to be written is larger than this size,
	// it will be split up into multiple objects in order to satisfy this constraint.
	// If BlockSize has a value of 0 or lower, the data will not be split
	// into multiple objects prior to writing.
	BlockSize int `yaml:"block_size" json:"block_size"`

	// Hashing can not be disabled, as it is an essential part of the pipeline.
	// The keys of all stored blocks (in zstordb), are generated and
	// are equal to the checksum/signature of that block's (binary) data.
	//
	// While you cannot disable the hashing, you can configure it.
	// Both the type of hashing algorithm and the private key
	// used for the crypto-hashing can be chosen.
	//
	// When no private key is available, this algorithm will generate a (crypto) checksum.
	// If, as recommended, a private key is available, a signature will be produced instead.
	//
	// See `HashingConfig` for more about its individual properties.
	Hashing HashingConfig `yaml:"hashing" json:"hashing"`

	// Compressor Processor configuration, disabled by default.
	// If enabled, it defines how to compress each block while writing it,
	// and how to decompress those blocks while reading them.
	//
	// See `CompressionConfig` for more about its individual properties.
	Compression CompressionConfig `yaml:"compression" json:"compression"`

	// Encryption algorithm configuration, defining, when enabled,
	// how to encrypt all blocks prior to writing, and decrypt them again when reading.
	// When both Compression and Encryption are configured and used,
	// the compressed blocks will be encrypted when writing,
	// and the decrypted blocks will be decompressed when reading.
	//
	// Encryption is disabled by default and can be enabled by providing
	// a valid private key. Optionally you can also define a different encryption algorithm on top of that.
	//
	// It is recommended to use encryption, and do so using the AES_256 algorithm.
	//
	// See EncryptionConfig for more information about its individual properties.
	Encryption EncryptionConfig `yaml:"encryption" json:"encryption"`

	// Distribution defines how all blocks should be / are distributed.
	// These properties are optional, and when not given,
	// each block is simply stored on a single shard (zstordb server)
	// by default. Thus if you do not specify any of these properties,
	// (part of) your data is lost as soon as
	// a shard used to store (part of) it becomes unavailable.
	//
	// If only DataShardCount is given AND positive,
	// each block will be stored onto multiple shards (replication).
	//
	// If both DataShardCount and ParityShardCount are positive,
	// erasure-code distribution is used, reducing the performance of the zstor client,
	// but increasing the data redundancy (resilience) of the stored blocks.
	//
	// All other possible property combinations are illegal
	// and will result in an invalid Pipeline configuration,
	// returning an error upon creation.
	//
	// See ObjectDistributionConfig for more information about its individual properties.
	Distribution ObjectDistributionConfig `yaml:"distribution" json:"distribution"`
}

Config is used to configure and create a pipeline. While a pipeline can be created manually, or even defined using a custom implementation, creating a pipeline using this Config is the easiest and most recommended way to do so.

When creating a pipeline as part of a 0-stor client, this config is integrated as part of that client's config. This is, for example, the case when using the client in the root client package, which is also the client used by the zstor command-line client/tool.

Note that you are required to use the same config at all times for the same data (blocks). Data that was written with one config is not guaranteed to be readable when using another config, and in all likelihood it will not be readable at all.

With that said, make sure to keep your config stored securely, as your data might not be recoverable if you lose it. This is definitely the case if you lose any credentials, such as a private key used for encryption (and hashing).
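
To make that concrete, here is a hedged example of such a config as a Go literal; the private key shown is a placeholder, the import path is an assumption, and every omitted field falls back to the documented default.

package example

import "github.com/threefoldtech/0-stor/client/datastor/pipeline"

// exampleConfig enables splitting, encryption and erasure-code distribution.
// Because a private key is set and Hashing defines none of its own,
// the block keys will be signatures rather than plain checksums.
var exampleConfig = pipeline.Config{
	// Split written data into blocks of at most 1 MiB.
	BlockSize: 1024 * 1024,
	// A 32-byte private key enables encryption and selects AES_256 by default.
	// Placeholder value only; use a securely generated key in practice.
	Encryption: pipeline.EncryptionConfig{
		PrivateKey: "01234567890123456789012345678901",
	},
	// Spread every block over 10 data shards and 3 parity shards.
	Distribution: pipeline.ObjectDistributionConfig{
		DataShardCount:   10,
		ParityShardCount: 3,
	},
	// Hashing and Compression are left at their zero values:
	// SHA_256 hashing (borrowing the encryption key) and no compression.
}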

type EncryptionConfig

type EncryptionConfig struct {
	// Private key, the specific required length
	// is defined by the type of Encryption used.
	//
	// This key will also be used by the crypto-hashing algorithm given,
	// if you did not define a separate key within the hashing configuration.
	PrivateKey string `yaml:"private_key" json:"private_key"`

	// The type of encryption algorithm to use,
	// defining both the encrypting and decrypting logic.
	// The string value (representing the encryption algorithm type), is case-insensitive.
	//
	// By default no type is used, disabling encryption;
	// encryption gets enabled as soon as a private key is defined.
	// All standard types available are: AES
	//
	// Valid Key sizes for AES are: 16, 24 and 32 bytes
	// The recommended private key size is 32 bytes, this will select/use AES_256.
	//
	// In case you've registered a custom encryption algorithm,
	// or have overridden a standard encryption algorithm, using `processing.RegisterEncrypterDecrypter`
	// you'll be able to use that encrypter-decrypter, by providing its (stringified) type here.
	Type processing.EncryptionType `yaml:"type" json:"type"`
}

EncryptionConfig defines the configuration used to create an encrypter-decrypter Processor.
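
A short hedged sketch: as documented above, encryption is enabled as soon as a private key is defined, and a 32-byte key selects AES_256; the key below is a placeholder and the import path an assumption.

package example

import "github.com/threefoldtech/0-stor/client/datastor/pipeline"

// aes256 enables encryption purely by supplying a private key,
// leaving Type at its zero value; a 32-byte key selects AES_256.
var aes256 = pipeline.EncryptionConfig{
	PrivateKey: "abcdefghijklmnopqrstuvwxyz012345", // 32-byte placeholder key
}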

type HasherConstructor

type HasherConstructor func() (crypto.Hasher, error)

HasherConstructor is a constructor type which is used to create a unique Hasher for each goroutine where the Hasher is needed within a pipeline. This is required as a (crypto) Hasher is not thread-safe.

func NewHasherConstructor

func NewHasherConstructor(cfg HashingConfig) HasherConstructor

NewHasherConstructor creates a constructor, used to create a hasher whenever needed, using a single easy-to-use configuration. The configuration is optional, however, and the zero value can be given to create a default hasher constructor.
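
For example, a sketch that produces signature-based keys simply by supplying a private key, keeping the default hash type; the import paths and helper name are assumptions.

package example

import (
	"github.com/threefoldtech/0-stor/client/datastor/pipeline"
	"github.com/threefoldtech/0-stor/client/datastor/pipeline/crypto"
)

// newSignatureHasher builds a hasher constructor which produces signatures
// (rather than checksums) because a private key is given; the default hash
// type (SHA_256) is kept by leaving Type at its zero value.
func newSignatureHasher(privateKey string) (crypto.Hasher, error) {
	hc := pipeline.NewHasherConstructor(pipeline.HashingConfig{
		PrivateKey: privateKey,
	})
	// Call the constructor once per goroutine that needs a Hasher,
	// as a Hasher is not thread-safe.
	return hc()
}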

type HashingConfig

type HashingConfig struct {
	// The type of (crypto) hashing algorithm to use.
	// The string value (representing the hashing algorithm type), is case-insensitive.
	//
	// By default SHA_256 is used.
	// All standard types available are: SHA_256, SHA_512, Blake2b_256, Blake2b_512
	//
	// In case you've registered a custom hashing algorithm,
	// or have overridden a standard hashing algorithm, using `crypto.RegisterHasher`
	// you'll be able to use that registered hasher, by providing its (stringified) type here.
	Type crypto.HashType `yaml:"type" json:"type"`

	// PrivateKey is used to authorize the hash, proving ownership.
	// If not given, and you do use Encryption for all your data blocks,
	// as is recommended, the private key configured for Encryption
	// will also be used for the hashing (generation of block keys).
	//
	// It is recommended to have a private key available for hashing,
	// as this will make your hashing more secure and decrease the
	// chance of tampering by a third party.
	//
	// Whether this private key is explicitly configured here,
	// or it is a shared key borrowed from the Encryption configuration,
	// is not that important, as your data will be
	// visible to an attacker as soon as they gain access to the Encryption's private key,
	// no matter whether the Hashing private key is different or not.
	//
	// Hence this property should only really be used if, for some reason,
	// you need/want a different private key for hashing and encryption,
	// or if, for an even stranger reason, you want crypto-hashing
	// while disabling encryption for the storage of the data (blocks).
	PrivateKey string `yaml:"private_key" json:"private_key"`
}

HashingConfig defines the configuration used to create a cryptographic hasher, which is used to generate objects' keys. It can produce both checksums and signatures.

type ObjectDistributionConfig

type ObjectDistributionConfig struct {
	// Number of data shards to use for each stored block.
	// If only the DataShardCount is given, replication is used.
	// If used in combination with a positive ParityShardCount,
	// erasure-code distribution is used.
	DataShardCount int `yaml:"data_shards" json:"data_shards"`

	// Number of parity shards to use for each stored block.
	// When both this value and DataShardCount are positive,
	// erasure-code distribution is used to read and write all blocks.
	// When ParityShardCount is positive and DataShardCount is zero or lower,
	// the Config is invalid, as this is not an acceptable combination.
	//
	// Of all available data shards (defined by DataShardCount),
	// you can lose up to ParityShardCount shards.
	// Meaning that if you have a DataShardCount of 10 and a ParityShardCount of 3,
	// your data is readable and repairable as long
	// as you have 7 data shards, or more, available. In this example,
	// as soon as you have only 6 data shards or fewer available, with the other ones unavailable,
	// your data will no longer be available and you will suffer from (partial) data loss.
	//
	// Note that when using this configuration,
	// you should make sure that you have at least DataShardCount+ParityShardCount shards available,
	// meaning that in the example above you would need at least 13 shards.
	// However, it would be even safer to have more shards available than that minimum,
	// as this would mean you can still write and repair in case you lose some shards.
	// If, in our example, you had fewer than 13 shards but more than 6,
	// you would still be able to read data, but writing and repairing would no longer be possible.
	// Therefore, in our example, it would be better to provide more than 13 shards.
	ParityShardCount int `yaml:"parity_shards" json:"parity_shards"`
}

ObjectDistributionConfig defines the configuration used to create a ChunkStorage, which in turn defines the logic of how to store/load the individual objects that make up the (to be) stored data.
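
The valid combinations can be summarized with a few literals; this is a sketch with example shard counts, and the import path is an assumption.

package example

import "github.com/threefoldtech/0-stor/client/datastor/pipeline"

var (
	// Zero values: every block is stored on a single random shard (no redundancy).
	singleShard = pipeline.ObjectDistributionConfig{}

	// Only a positive DataShardCount: each block is replicated onto 3 shards.
	replicated = pipeline.ObjectDistributionConfig{DataShardCount: 3}

	// Both counts positive: erasure coding over 10 data + 3 parity shards,
	// tolerating the loss of up to 3 shards.
	erasureCoded = pipeline.ObjectDistributionConfig{
		DataShardCount:   10,
		ParityShardCount: 3,
	}

	// A positive ParityShardCount without a positive DataShardCount is invalid
	// and will cause pipeline creation to return an error.
)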

type Pipeline

type Pipeline interface {
	// Write content to a zstordb cluster,
	// the details depend upon the specific implementation.
	Write(r io.Reader) ([]metatypes.Chunk, error)
	// Read content from a zstordb cluster,
	// the details depend upon the specific implementation.
	Read(chunks []metatypes.Chunk, w io.Writer) error

	// Check if content stored on a zstordb cluster is (still) valid,
	// the details depend upon the specific implementation.
	Check(chunks []metatypes.Chunk, fast bool) (storage.CheckStatus, error)
	// Repair content stored on a zstordb cluster,
	// the details depend upon the specific implementation.
	Repair(chunks []metatypes.Chunk) ([]metatypes.Chunk, error)

	// Delete content stored on a zstordb cluster,
	// the details depend upon the specific implementation.
	Delete(chunks []metatypes.Chunk) error

	// ChunkSize returns the fixed chunk size, which is the size used for all chunks,
	// except for the last chunk, which might be smaller than or equal to that chunk size.
	ChunkSize() int

	// Close any open resources.
	Close() error
}

Pipeline defines the interface to write and read content to/from a zstordb cluster.

Prior to storage content can be processed (compressed and/or encrypted), as well as split (into smaller chunks) and distributed in terms of replication or erasure coding.

Content written in one way has to be read in a compatible way. Meaning that if content was compressed and encrypted using a certain configuration, it will have to be decrypted and decompressed using that same configuration, or else the content cannot be read.
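
For instance, a repair-if-needed sketch on top of this interface could look like the following; the import paths are assumptions, and the needsRepair predicate is a hypothetical hook, since the concrete storage.CheckStatus values are not listed in this documentation.

package example

import (
	"github.com/threefoldtech/0-stor/client/datastor/pipeline"
	"github.com/threefoldtech/0-stor/client/datastor/pipeline/storage"
	"github.com/threefoldtech/0-stor/client/metastor/metatypes"
)

// checkAndMaybeRepair runs a fast check and, if the caller-supplied predicate
// decides the reported status warrants it, repairs the chunks.
func checkAndMaybeRepair(
	p pipeline.Pipeline,
	chunks []metatypes.Chunk,
	needsRepair func(storage.CheckStatus) bool,
) ([]metatypes.Chunk, error) {
	status, err := p.Check(chunks, true) // fast check
	if err != nil {
		return nil, err
	}
	if !needsRepair(status) {
		return chunks, nil // nothing to do
	}
	// Repair returns (possibly new) chunk metadata,
	// which should replace the previously stored metadata.
	return p.Repair(chunks)
}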

func NewDevNull

func NewDevNull(chunkSize int) Pipeline

NewDevNull creates a new /dev/null pipeline where all data is discarded.
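
A hedged sketch of one place this can be handy, for example exercising the write path in tests or benchmarks without any zstordb servers; that use case and the import path are assumptions, not something the package prescribes.

package example

import (
	"strings"

	"github.com/threefoldtech/0-stor/client/datastor/pipeline"
)

// discardWrite pushes data through a /dev/null pipeline;
// nothing is stored, which makes it useful as a baseline in tests.
func discardWrite(data string) error {
	p := pipeline.NewDevNull(1024 * 1024) // 1 MiB chunk size (example value)
	defer p.Close()
	_, err := p.Write(strings.NewReader(data))
	return err
}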

func NewPipeline

func NewPipeline(cfg Config, cluster datastor.Cluster, jobCount int) (Pipeline, error)

NewPipeline creates a pipeline, using a given config and an already created datastor cluster. A zero-value config is valid, and will result in a default pipeline. The datastor cluster is required however, and NewPipeline will panic if no cluster is given. The jobCount parameter is optional and will default to DefaultJobCount if the given count is 0 or less.

type ProcessorConstructor

type ProcessorConstructor func() (processing.Processor, error)

ProcessorConstructor is a constructor type which is used to create a unique Processor for each goroutine where the Processor is needed within a pipeline. This is required as a Processor is not thread-safe.

func NewProcessorConstructor

func NewProcessorConstructor(compression CompressionConfig, encryption EncryptionConfig) ProcessorConstructor

NewProcessorConstructor creates a constructor, used to create a processor, using an easy-to-use configuration. Both the CompressionConfig and EncryptionConfig are optional, even though you should define them if you can.
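
A sketch of how such a constructor is typically used; the import paths are assumptions, and the configs are taken as parameters so real compression/encryption settings can be supplied by the caller.

package example

import (
	"github.com/threefoldtech/0-stor/client/datastor/pipeline"
	"github.com/threefoldtech/0-stor/client/datastor/pipeline/processing"
)

// newProcessor builds a processor constructor from the two processing-related
// configs and constructs one Processor from it, as each pipeline goroutine would.
func newProcessor(
	compression pipeline.CompressionConfig,
	encryption pipeline.EncryptionConfig,
) (processing.Processor, error) {
	pc := pipeline.NewProcessorConstructor(compression, encryption)
	// A Processor is not thread-safe, so call the constructor
	// once per goroutine that needs one.
	return pc()
}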

type SingleObjectPipeline

type SingleObjectPipeline struct {
	// contains filtered or unexported fields
}

SingleObjectPipeline defines a single-threaded pipeline, which writes all the content it can read, at once, as a single object (so no chunking). Optionally it can also process the data while writing and reading it.

func NewSingleObjectPipeline

NewSingleObjectPipeline creates a single-threaded pipeline which writes all the content it can read as a single object (no chunking), and processes and stores it all in sequence.

NewSingleObjectPipeline requires a non-nil ChunkStorage and will panic if it is missing.

If no ProcessorConstructor is given, a default constructor will be created for you, which will construct a processing.NopProcessor, effectively keeping the data unprocessed at all times. While the ProcessorConstructor is optional, it is recommended to define a valid constructor, as the storage of unprocessed data is both insecure and inefficient.

If no HasherConstructor is given, a default constructor will be created for you, which will construct a 256-bit crypto hasher for you, producing checksums as keys. While the HasherConstructor is optional and the default one performs well, it is still recommended to define a valid constructor, as it will allow you to give a HasherConstructor which creates a crypto Hasher that produces signatures as keys, rather than checksums.

func (*SingleObjectPipeline) Check

func (sop *SingleObjectPipeline) Check(chunks []metatypes.Chunk, fast bool) (storage.CheckStatus, error)

Check implements Pipeline.Check

func (*SingleObjectPipeline) ChunkSize

func (sop *SingleObjectPipeline) ChunkSize() int

ChunkSize implements Pipeline.ChunkSize

func (*SingleObjectPipeline) Close

func (sop *SingleObjectPipeline) Close() error

Close implements Pipeline.Close

func (*SingleObjectPipeline) Delete

func (sop *SingleObjectPipeline) Delete(chunks []metatypes.Chunk) error

Delete implements Pipeline.Delete

func (*SingleObjectPipeline) Read

func (sop *SingleObjectPipeline) Read(chunks []metatypes.Chunk, w io.Writer) error

Read implements Pipeline.Read

The following graph visualizes the logic of this pipeline's Read method:

+-------------------------------------------------------------+
|                                    +----------------------+ |
| metatypes.Chunk +-> storage.Read +--> Processor.Read +    | |
|                                    | Hash/Data Validation | |
|                                    +-----------+----------+ |
|                                                |            |
|                                io.Writer <-----+            |
+-------------------------------------------------------------+

As you can see, it is all blocking and sequential, and the input is expected to be exactly one chunk. If fewer or more than one chunk is given, an error will be returned before the pipeline even starts reading.

When an error is returned by a sub-call, at any point, the function will return immediately with that error.

func (*SingleObjectPipeline) Repair

func (sop *SingleObjectPipeline) Repair(chunks []metatypes.Chunk) ([]metatypes.Chunk, error)

Repair implements Pipeline.Repair

func (*SingleObjectPipeline) Write

func (sop *SingleObjectPipeline) Write(r io.Reader) ([]metatypes.Chunk, error)

Write implements Pipeline.Write

The following graph visualizes the logic of this pipeline's Write method:

+----------------------------------------------------------------------+
| io.Reader+Hasher +-> Processor.Write +-> Storage.Write +-> meta.Meta |
+----------------------------------------------------------------------+

As you can see, it is all blocking and sequential, and the input data is not split into chunks. This means the pipeline will always return a single chunk, as long as the data was written successfully.

When an error is returned by a sub-call, at any point, the function will return immediately with that error.

Directories

Path Synopsis
crypto	Package crypto collects common cryptographic components.
