rsync

package
v0.9.2
Warning

This package is not in the latest version of its module.

Published: Jul 17, 2019 License: MIT Imports: 11 Imported by: 0

Documentation

Overview

Package rsync provides an implementation of the rsync algorithm as described in Andrew Tridgell's thesis (https://www.samba.org/~tridge/phd_thesis.pdf) and the rsync technical report (https://rsync.samba.org/tech_report). Rsync algorithmic functionality is provided by the Engine type, and a transport protocol for pipelined rsync operations is provided by the Transmit function and Receiver types.

Constants

const (

	// DefaultBlockSize is the default block size that will be used if a zero
	// value is passed into Engine.Signature for the blockSize parameter.
	DefaultBlockSize = 1 << 13
	// DefaultMaximumDataOperationSize is the default maximum data size
	// permitted per operation. The optimal value for this isn't at all
	// correlated with block size - it's just what's reasonable to hold
	// in-memory and pass over the wire in a single transmission. This value
	// will be used if a zero value is passed into Engine.Deltafy or
	// Engine.DeltafyBytes for the maxDataOpSize parameter.
	DefaultMaximumDataOperationSize = 1 << 14
)
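As a quick check of the arithmetic, the two shifts evaluate to 8192 bytes (8 KiB) and 16384 bytes (16 KiB). The sketch below uses local copies of the constants so that it runs without the package, and `effectiveSize` is a hypothetical helper that only illustrates the "zero means default" convention described in the comments above.

```go
package main

import "fmt"

// Local copies of the documented constants, so this sketch runs without
// importing the rsync package.
const (
	defaultBlockSize                = 1 << 13
	defaultMaximumDataOperationSize = 1 << 14
)

// effectiveSize is a hypothetical helper: a zero request selects the
// fallback, any other value is used as-is.
func effectiveSize(requested, fallback uint64) uint64 {
	if requested == 0 {
		return fallback
	}
	return requested
}

func main() {
	fmt.Println(effectiveSize(0, defaultBlockSize))                // 8192
	fmt.Println(effectiveSize(0, defaultMaximumDataOperationSize)) // 16384
	fmt.Println(effectiveSize(4096, defaultBlockSize))             // 4096
}
```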

Variables

This section is empty.

Functions

func DecodeToReceiver

func DecodeToReceiver(decoder Decoder, count uint64, receiver Receiver) error

DecodeToReceiver decodes messages from the specified Decoder and forwards them to the specified receiver. It must be passed the number of files to be received so that it knows when forwarding is complete. It is designed to be used with an encoding receiver, such as that returned by NewEncodingReceiver. It finalizes the provided receiver before returning.

func OptimalBlockSizeForBase

func OptimalBlockSizeForBase(base io.Seeker) (uint64, error)

OptimalBlockSizeForBase is a convenience function that will determine the optimal block size for a base that implements io.Seeker. It calls down to OptimalBlockSizeForBaseLength. After determining the base's length, it will attempt to reset the base to its original position.
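The length-then-restore pattern described above can be sketched with the standard io.Seeker interface alone. This is an illustration, not the package's code: `seekerLength` and `demo` are hypothetical names, and the sketch assumes the length of interest is the total length of the stream rather than the remainder after the current offset.

```go
package main

import (
	"fmt"
	"io"
	"strings"
)

// seekerLength is a hypothetical helper (not part of the rsync package). It
// reports the total length of s and then restores the original offset.
func seekerLength(s io.Seeker) (uint64, error) {
	// Remember where the caller left the stream.
	current, err := s.Seek(0, io.SeekCurrent)
	if err != nil {
		return 0, err
	}
	// Seeking to the end yields the total length as the new offset.
	end, err := s.Seek(0, io.SeekEnd)
	if err != nil {
		return 0, err
	}
	// Put the stream back where it was.
	if _, err := s.Seek(current, io.SeekStart); err != nil {
		return 0, err
	}
	return uint64(end), nil
}

// demo measures a reader that has already been advanced by 7 bytes and
// returns the measured length plus whatever is still unread afterwards.
func demo() (uint64, string, error) {
	r := strings.NewReader("hello, rsync")
	if _, err := r.Seek(7, io.SeekStart); err != nil {
		return 0, "", err
	}
	n, err := seekerLength(r)
	if err != nil {
		return 0, "", err
	}
	rest, err := io.ReadAll(r)
	return n, string(rest), err
}

func main() {
	fmt.Println(demo()) // 12 rsync <nil>
}
```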

func OptimalBlockSizeForBaseLength

func OptimalBlockSizeForBaseLength(baseLength uint64) uint64

OptimalBlockSizeForBaseLength uses a simple heuristic to choose a block size based on the base length. It starts with the optimal block length given by the formula in the rsync thesis, then clamps the result to a sensible range. TODO: Should we add rounding to "nice" values, e.g. the nearest multiple of 1024 bytes? Would this improve read throughput?
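A minimal sketch of such a heuristic follows. It rests on two assumptions that this page does not confirm: that the thesis formula takes the square-root form sqrt(24·n) for a base of n bytes, and that the allowed range is 1 KiB to 64 KiB. The name `blockSizeForLength` and both bounds are hypothetical.

```go
package main

import (
	"fmt"
	"math"
)

// Hypothetical clamp bounds, chosen only for illustration.
const (
	minBlockSize = 1 << 10
	maxBlockSize = 1 << 16
)

// blockSizeForLength applies an assumed square-root formula and then clamps
// the result to [minBlockSize, maxBlockSize].
func blockSizeForLength(baseLength uint64) uint64 {
	size := uint64(math.Sqrt(24.0 * float64(baseLength)))
	if size < minBlockSize {
		return minBlockSize
	}
	if size > maxBlockSize {
		return maxBlockSize
	}
	return size
}

func main() {
	fmt.Println(blockSizeForLength(0))       // 1024 (clamped up)
	fmt.Println(blockSizeForLength(1 << 20)) // 5016
	fmt.Println(blockSizeForLength(1 << 40)) // 65536 (clamped down)
}
```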

func Transmit

func Transmit(root string, paths []string, signatures []*Signature, receiver Receiver) error

Transmit performs streaming transmission of files (in rsync deltafied form) to the specified receiver. It is the responsibility of the caller to ensure that the provided signatures are valid by invoking their EnsureValid method. In order for this function to perform efficiently, paths should be passed in depth-first traversal order.

Types

type BlockHash

type BlockHash struct {
	// Weak is the weak hash for the block.
	Weak uint32 `protobuf:"varint,1,opt,name=weak,proto3" json:"weak,omitempty"`
	// Strong is the strong hash for the block.
	Strong               []byte   `protobuf:"bytes,2,opt,name=strong,proto3" json:"strong,omitempty"`
	XXX_NoUnkeyedLiteral struct{} `json:"-"`
	XXX_unrecognized     []byte   `json:"-"`
	XXX_sizecache        int32    `json:"-"`
}

BlockHash represents a pair of weak and strong hash for a base block.
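For orientation, the weak hash in the rsync technical report is a rolling checksum built from two 16-bit sums, which is what makes it cheap to slide across the target one byte at a time. The sketch below implements that published checksum; whether this package computes its Weak field in exactly this way is an assumption, and all function names are hypothetical.

```go
package main

import "fmt"

// weakHash computes the two 16-bit sums of the tech report's checksum over a
// block: a is the plain byte sum, b weights earlier bytes more heavily.
func weakHash(block []byte) (a, b uint32) {
	n := uint32(len(block))
	for i, x := range block {
		a += uint32(x)
		b += (n - uint32(i)) * uint32(x)
	}
	return a & 0xffff, b & 0xffff
}

// roll slides a window of the given size forward by one byte, dropping out
// and adding in, without rescanning the window.
func roll(a, b uint32, out, in byte, size uint32) (uint32, uint32) {
	a = (a - uint32(out) + uint32(in)) & 0xffff
	b = (b - size*uint32(out) + a) & 0xffff
	return a, b
}

// combine packs both sums into a single 32-bit value.
func combine(a, b uint32) uint32 { return a | b<<16 }

// rollingMatchesDirect checks that rolling the hash across data gives the
// same value as recomputing it from scratch at every offset.
func rollingMatchesDirect(data []byte, size int) bool {
	if size <= 0 || len(data) < size {
		return false
	}
	a, b := weakHash(data[:size])
	for i := 0; i+size < len(data); i++ {
		a, b = roll(a, b, data[i], data[i+size], uint32(size))
		da, db := weakHash(data[i+1 : i+1+size])
		if combine(a, b) != combine(da, db) {
			return false
		}
	}
	return true
}

func main() {
	fmt.Println(combine(weakHash([]byte{1, 2, 3}))) // 655366
	fmt.Println(rollingMatchesDirect([]byte("the quick brown fox jumps over the lazy dog"), 8)) // true
}
```

A matching weak hash is only a hint; the strong hash is what confirms that two blocks are actually equal.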

func (*BlockHash) Descriptor added in v0.7.0

func (*BlockHash) Descriptor() ([]byte, []int)

func (*BlockHash) EnsureValid added in v0.7.0

func (h *BlockHash) EnsureValid() error

EnsureValid verifies that block hash invariants are respected.

func (*BlockHash) GetStrong added in v0.7.0

func (m *BlockHash) GetStrong() []byte

func (*BlockHash) GetWeak added in v0.7.0

func (m *BlockHash) GetWeak() uint32

func (*BlockHash) ProtoMessage added in v0.7.0

func (*BlockHash) ProtoMessage()

func (*BlockHash) Reset added in v0.7.0

func (m *BlockHash) Reset()

func (*BlockHash) String added in v0.7.0

func (m *BlockHash) String() string

func (*BlockHash) XXX_DiscardUnknown added in v0.7.0

func (m *BlockHash) XXX_DiscardUnknown()

func (*BlockHash) XXX_Marshal added in v0.7.0

func (m *BlockHash) XXX_Marshal(b []byte, deterministic bool) ([]byte, error)

func (*BlockHash) XXX_Merge added in v0.7.0

func (m *BlockHash) XXX_Merge(src proto.Message)

func (*BlockHash) XXX_Size added in v0.7.0

func (m *BlockHash) XXX_Size() int

func (*BlockHash) XXX_Unmarshal added in v0.7.0

func (m *BlockHash) XXX_Unmarshal(b []byte) error

type Decoder

type Decoder interface {
	// Decode decodes a transmission encoded by an encoder. The transmission
	// should be decoded into the specified Transmission object, which will be a
	// non-nil zero-valued Transmission object. The decoder is *not* responsible
	// for validating that the transmission is valid before returning it.
	// TODO: We should really elaborate on the semantics of Decoder, in
	// particular how it is allowed to re-use existing allocations within the
	// Transmission object.
	Decode(*Transmission) error
	// Finalize is called when decoding is finished. The Decoder can use this
	// call to close any underlying transmission resources.
	Finalize() error
}

Decoder is the interface used by DecodeToReceiver to receive transmissions, usually across a network.

type Encoder

type Encoder interface {
	// Encode encodes and transmits a transmission. The provided transmission
	// will never be nil. The transmission passed to the encoder may be re-used
	// and modified, so the encoder should not hold on to the transmission
	// between calls (it should either transmit it or fully copy it if
	// transmission is going to be delayed).
	Encode(*Transmission) error
	// Finalize is called when the transmission stream is finished. The Encoder
	// can use this call to close any underlying transmission resources.
	Finalize() error
}

Encoder is the interface used by an encoding receiver to forward transmissions, usually across a network.
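To show how an Encoder and Decoder pair might fit together, here is a sketch built on encoding/gob over an in-memory buffer. The `Operation` and `Transmission` types are simplified local stand-ins that copy only the documented field names, and `gobEncoder`, `gobDecoder`, and `roundTrip` are hypothetical names; nothing here is taken from the package's own implementation.

```go
package main

import (
	"bytes"
	"encoding/gob"
	"fmt"
)

// Simplified stand-ins for the documented message types.
type Operation struct {
	Data  []byte
	Start uint64
	Count uint64
}

type Transmission struct {
	Done      bool
	Operation *Operation
	Error     string
}

// gobEncoder has the documented Encoder shape. It writes each transmission
// immediately, so it never holds on to the caller's object between calls.
type gobEncoder struct{ enc *gob.Encoder }

func (e *gobEncoder) Encode(t *Transmission) error { return e.enc.Encode(t) }
func (e *gobEncoder) Finalize() error              { return nil }

// gobDecoder has the documented Decoder shape.
type gobDecoder struct{ dec *gob.Decoder }

func (d *gobDecoder) Decode(t *Transmission) error { return d.dec.Decode(t) }
func (d *gobDecoder) Finalize() error              { return nil }

// roundTrip sends one data operation and one Done marker through the pair.
func roundTrip() (string, bool, error) {
	var wire bytes.Buffer
	enc := &gobEncoder{enc: gob.NewEncoder(&wire)}
	dec := &gobDecoder{dec: gob.NewDecoder(&wire)}

	messages := []*Transmission{
		{Operation: &Operation{Data: []byte("abc")}},
		{Done: true},
	}
	for _, m := range messages {
		if err := enc.Encode(m); err != nil {
			return "", false, err
		}
	}

	// Decode into fresh zero-valued objects, as the Decoder comment requires.
	first, second := &Transmission{}, &Transmission{}
	if err := dec.Decode(first); err != nil {
		return "", false, err
	}
	if err := dec.Decode(second); err != nil {
		return "", false, err
	}
	return string(first.Operation.Data), second.Done, nil
}

func main() {
	fmt.Println(roundTrip()) // abc true <nil>
}
```

Decoding into zero-valued objects matters with gob in particular, because gob omits zero-valued fields on the wire and would otherwise leave stale values in a re-used struct.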

type Engine

type Engine struct {
	// contains filtered or unexported fields
}

Engine provides rsync functionality without any notion of transport. It is designed to be re-used to avoid heavy buffer allocation.

func NewEngine

func NewEngine() *Engine

NewEngine creates a new rsync engine.

func (*Engine) BytesSignature

func (e *Engine) BytesSignature(base []byte, blockSize uint64) *Signature

BytesSignature computes the signature for a byte slice.

func (*Engine) Deltafy

func (e *Engine) Deltafy(target io.Reader, base *Signature, maxDataOpSize uint64, transmit OperationTransmitter) error

Deltafy computes delta operations to reconstitute the target data stream using the base stream (based on the provided base signature). It streams operations to the provided transmission function. The internal engine buffer will be resized to the sum of the maximum data operation size and the block size, and retained for the lifetime of the engine, so a reasonable value for the maximum data operation size should be provided. For performance reasons, this method does not validate that the provided signature satisfies expected invariants. It is the responsibility of the caller to verify that the signature is valid by calling its EnsureValid method. This is not necessary for signatures generated in the same process, but should be done for signatures received from untrusted locations (e.g. over the network). An invalid signature can result in undefined behavior.

func (*Engine) DeltafyBytes

func (e *Engine) DeltafyBytes(target []byte, base *Signature, maxDataOpSize uint64) []*Operation

DeltafyBytes computes delta operations for a byte slice. Unlike the streaming Deltafy method, it returns a slice of operations, which should be reasonable since the target data can already fit into memory. The internal engine buffer will be resized to the sum of the maximum data operation size and the block size, and retained for the lifetime of the engine, so a reasonable value for the maximum data operation size should be provided. For performance reasons, this method does not validate that the provided signature satisfies expected invariants. It is the responsibility of the caller to verify that the signature is valid by calling its EnsureValid method. This is not necessary for signatures generated in the same process, but should be done for signatures received from untrusted locations (e.g. over the network). An invalid signature can result in undefined behavior.
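The shape of a delta can be illustrated with a deliberately simplified, self-contained toy. It is not the engine's algorithm: it looks up a SHA-1 digest at every offset instead of using a rolling weak hash, it ignores a short final base block, and the `op` type, `index`, and `deltafy` are hypothetical stand-ins. It does show the two ideas from the text: literal data is capped at a maximum data operation size, and matched blocks are referenced by index rather than copied.

```go
package main

import (
	"crypto/sha1"
	"fmt"
)

// op is a stand-in for an operation: either literal Data, or a run of Count
// base blocks beginning at block index Start.
type op struct {
	Data  []byte
	Start uint64
	Count uint64
}

// index maps the digest of each full base block to its block index.
func index(base []byte, blockSize int) map[[sha1.Size]byte]uint64 {
	blocks := make(map[[sha1.Size]byte]uint64)
	for i := 0; i+blockSize <= len(base); i += blockSize {
		sum := sha1.Sum(base[i : i+blockSize])
		if _, seen := blocks[sum]; !seen {
			blocks[sum] = uint64(i / blockSize)
		}
	}
	return blocks
}

// deltafy emits operations that rebuild target from base.
func deltafy(target, base []byte, blockSize, maxDataOpSize int) []op {
	if blockSize <= 0 || maxDataOpSize <= 0 {
		return nil
	}
	blocks := index(base, blockSize)
	var ops []op
	var pending []byte
	flush := func() {
		if len(pending) > 0 {
			ops = append(ops, op{Data: pending})
			pending = nil
		}
	}
	for i := 0; i < len(target); {
		if i+blockSize <= len(target) {
			if b, ok := blocks[sha1.Sum(target[i:i+blockSize])]; ok {
				flush()
				// Extend the previous block operation when contiguous.
				if n := len(ops); n > 0 && len(ops[n-1].Data) == 0 &&
					ops[n-1].Start+ops[n-1].Count == b {
					ops[n-1].Count++
				} else {
					ops = append(ops, op{Start: b, Count: 1})
				}
				i += blockSize
				continue
			}
		}
		// No match here: accumulate one literal byte, capped per operation.
		pending = append(pending, target[i])
		if len(pending) == maxDataOpSize {
			flush()
		}
		i++
	}
	flush()
	return ops
}

func main() {
	ops := deltafy([]byte("aaaaXXbbbbcccc"), []byte("aaaabbbbcccc"), 4, 16)
	// Yields: block 0, then literal "XX", then blocks 1 and 2 as one run.
	for _, o := range ops {
		fmt.Printf("data=%q start=%d count=%d\n", o.Data, o.Start, o.Count)
	}
}
```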

func (*Engine) Patch

func (e *Engine) Patch(destination io.Writer, base io.ReadSeeker, signature *Signature, operation *Operation) error

Patch applies a single operation against a base stream to reconstitute the target into the destination stream. For performance reasons, this method does not validate that the provided signature and operation satisfy expected invariants. It is the responsibility of the caller to verify that the signature and operation are valid by calling their respective EnsureValid methods. This is not necessary for signatures and operations generated in the same process, but should be done for signatures and operations received from untrusted locations (e.g. over the network). An invalid signature or operation can result in undefined behavior.

func (*Engine) PatchBytes

func (e *Engine) PatchBytes(base []byte, signature *Signature, delta []*Operation) ([]byte, error)

PatchBytes applies a series of operations against a base byte slice to reconstitute the target byte slice. For performance reasons, this method does not validate that the provided signature and operations satisfy expected invariants. It is the responsibility of the caller to verify that the signature and operations are valid by calling their respective EnsureValid methods. This is not necessary for signatures and operations generated in the same process, but should be done for signatures and operations received from untrusted locations (e.g. over the network). An invalid signature or operation can result in undefined behavior.
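Conceptually, patching replays the operations in order: a data operation contributes its bytes verbatim, and a block operation copies a run of blocks out of the base. The following sketch shows that idea with a hypothetical `op` type and `patch` function. Unlike the methods above, it performs its own range checks, and it treats the final base block as possibly shorter than the block size.

```go
package main

import (
	"errors"
	"fmt"
)

// op is a stand-in for an operation: either literal Data, or a run of Count
// base blocks beginning at block index Start.
type op struct {
	Data  []byte
	Start uint64
	Count uint64
}

// patch rebuilds the target by replaying ops against base.
func patch(base []byte, blockSize uint64, ops []op) ([]byte, error) {
	if blockSize == 0 {
		return nil, errors.New("zero block size")
	}
	length := uint64(len(base))
	// Number of blocks in the base, counting a short final block.
	total := (length + blockSize - 1) / blockSize

	var out []byte
	for _, o := range ops {
		// A non-empty Data buffer marks a data operation.
		if len(o.Data) > 0 {
			out = append(out, o.Data...)
			continue
		}
		if o.Count == 0 || o.Start+o.Count > total {
			return nil, errors.New("block operation out of range")
		}
		start := o.Start * blockSize
		end := (o.Start + o.Count) * blockSize
		if end > length {
			end = length // the last block may be short
		}
		out = append(out, base[start:end]...)
	}
	return out, nil
}

func main() {
	base := []byte("aaaabbbbcc") // blocks: "aaaa", "bbbb", "cc"
	ops := []op{
		{Start: 0, Count: 1},
		{Data: []byte("XX")},
		{Start: 1, Count: 2},
	}
	out, err := patch(base, 4, ops)
	fmt.Println(string(out), err) // aaaaXXbbbbcc <nil>
}
```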

func (*Engine) Signature

func (e *Engine) Signature(base io.Reader, blockSize uint64) (*Signature, error)

Signature computes the signature for a base stream. If the provided block size is 0, this method will attempt to compute the optimal block size (which requires that base implement io.Seeker), and failing that will fall back to a default block size.

type Monitor

type Monitor func(*ReceiverStatus) error

Monitor is the callback signature that monitors must implement to capture status information from a monitoring receiver. The argument passed to the callback is freshly allocated on each update, so the callback may retain it. There's no point in attempting to re-use the allocated argument because (a) it would be complicated and the callback would most likely just copy it anyway and (b) it is only allocated once per received file, and the per-file allocations are already significantly higher.

type Operation

type Operation struct {
	// Data contains data for data operations. If its length is 0, the operation
	// is assumed to be a non-data operation. Operation transmitters and
	// receivers may thus treat a length-0 buffer as semantically equivalent to
	// a nil buffer and utilize that fact to efficiently re-use buffer capacity,
	// e.g. by truncating the buffer and doing a gob receive into it.
	Data []byte `protobuf:"bytes,1,opt,name=data,proto3" json:"data,omitempty"`
	// Start is the 0-indexed starting block for block operations.
	Start uint64 `protobuf:"varint,2,opt,name=start,proto3" json:"start,omitempty"`
	// Count is the number of blocks for block operations.
	Count                uint64   `protobuf:"varint,3,opt,name=count,proto3" json:"count,omitempty"`
	XXX_NoUnkeyedLiteral struct{} `json:"-"`
	XXX_unrecognized     []byte   `json:"-"`
	XXX_sizecache        int32    `json:"-"`
}

Operation represents an rsync operation, which can be either a data operation or a block operation.

func (*Operation) Copy added in v0.7.0

func (o *Operation) Copy() *Operation

Copy creates a deep copy of an operation.

func (*Operation) Descriptor added in v0.7.0

func (*Operation) Descriptor() ([]byte, []int)

func (*Operation) EnsureValid added in v0.7.0

func (o *Operation) EnsureValid() error

EnsureValid verifies that operation invariants are respected.

func (*Operation) GetCount added in v0.7.0

func (m *Operation) GetCount() uint64

func (*Operation) GetData added in v0.7.0

func (m *Operation) GetData() []byte

func (*Operation) GetStart added in v0.7.0

func (m *Operation) GetStart() uint64

func (*Operation) ProtoMessage added in v0.7.0

func (*Operation) ProtoMessage()

func (*Operation) Reset added in v0.7.0

func (m *Operation) Reset()

func (*Operation) String added in v0.7.0

func (m *Operation) String() string

func (*Operation) XXX_DiscardUnknown added in v0.7.0

func (m *Operation) XXX_DiscardUnknown()

func (*Operation) XXX_Marshal added in v0.7.0

func (m *Operation) XXX_Marshal(b []byte, deterministic bool) ([]byte, error)

func (*Operation) XXX_Merge added in v0.7.0

func (m *Operation) XXX_Merge(src proto.Message)

func (*Operation) XXX_Size added in v0.7.0

func (m *Operation) XXX_Size() int

func (*Operation) XXX_Unmarshal added in v0.7.0

func (m *Operation) XXX_Unmarshal(b []byte) error

type OperationTransmitter

type OperationTransmitter func(*Operation) error

OperationTransmitter transmits an operation. Operation objects and their data buffers are re-used between calls to the transmitter, so the transmitter should not return until it has either transmitted the operation or copied it for later transmission.
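The re-use rule is easy to get wrong, so here is a small sketch of what happens when a transmitter keeps the pointer it was handed versus taking a deep copy. The `operation` type, its `clone` method, and the `produce` and `collect` functions are hypothetical stand-ins; `produce` mimics a caller that recycles one operation and one buffer across calls.

```go
package main

import "fmt"

// operation is a stand-in for the documented Operation type.
type operation struct {
	Data  []byte
	Start uint64
	Count uint64
}

// clone makes a deep copy, including the data buffer.
func (o *operation) clone() *operation {
	return &operation{
		Data:  append([]byte(nil), o.Data...),
		Start: o.Start,
		Count: o.Count,
	}
}

// transmitter has the same shape as the documented function type.
type transmitter func(*operation) error

// produce re-uses a single operation and its buffer for every chunk.
func produce(chunks []string, transmit transmitter) error {
	op := &operation{}
	for _, c := range chunks {
		op.Data = append(op.Data[:0], c...)
		if err := transmit(op); err != nil {
			return err
		}
	}
	return nil
}

// collect stores what the transmitter saw, either by retaining the pointer
// (unsafe) or by cloning (safe), and reports the stored payloads.
func collect(chunks []string, deep bool) []string {
	var kept []*operation
	_ = produce(chunks, func(o *operation) error {
		if deep {
			kept = append(kept, o.clone())
		} else {
			kept = append(kept, o)
		}
		return nil
	})
	var out []string
	for _, o := range kept {
		out = append(out, string(o.Data))
	}
	return out
}

func main() {
	fmt.Println(collect([]string{"one", "two"}, false)) // [two two]
	fmt.Println(collect([]string{"one", "two"}, true))  // [one two]
}
```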

type Receiver

type Receiver interface {
	// Receive processes a single message in a transmission stream.
	Receive(*Transmission) error
	// contains filtered or unexported methods
}

Receiver manages the streaming reception of multiple files. It should be used in conjunction with the Transmit function.

func NewEncodingReceiver

func NewEncodingReceiver(encoder Encoder) Receiver

NewEncodingReceiver creates a new receiver that handles messages by encoding them with the specified Encoder. It is designed to be used with DecodeToReceiver.

func NewMonitoringReceiver

func NewMonitoringReceiver(receiver Receiver, paths []string, monitor Monitor) Receiver

NewMonitoringReceiver wraps a receiver and provides monitoring information via a callback.

func NewPreemptableReceiver

func NewPreemptableReceiver(receiver Receiver, run context.Context) Receiver

NewPreemptableReceiver wraps a receiver and aborts on Receive if the specified context has been cancelled.
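The wrapping pattern can be sketched as a decorator that checks the context before delegating. This is an assumption about the general approach rather than the package's code: the `receiver` interface below is a reduced stand-in (the real Receiver also has unexported methods), and `preemptable`, `countingReceiver`, and `demo` are hypothetical names.

```go
package main

import (
	"context"
	"errors"
	"fmt"
)

// transmission and receiver are reduced stand-ins for the documented types.
type transmission struct{ Done bool }

type receiver interface {
	Receive(*transmission) error
}

// countingReceiver records how many messages reached it.
type countingReceiver struct{ received int }

func (c *countingReceiver) Receive(*transmission) error {
	c.received++
	return nil
}

// preemptable forwards to inner unless run has been cancelled.
type preemptable struct {
	inner receiver
	run   context.Context
}

func (p *preemptable) Receive(t *transmission) error {
	// Non-blocking cancellation check before doing any work.
	select {
	case <-p.run.Done():
		return errors.New("reception cancelled")
	default:
	}
	return p.inner.Receive(t)
}

// demo delivers one message, cancels, then attempts a second delivery.
func demo() (int, error) {
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()
	inner := &countingReceiver{}
	r := &preemptable{inner: inner, run: ctx}
	if err := r.Receive(&transmission{}); err != nil {
		return inner.received, err
	}
	cancel()
	err := r.Receive(&transmission{})
	return inner.received, err
}

func main() {
	fmt.Println(demo()) // 1 reception cancelled
}
```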

func NewReceiver

func NewReceiver(root string, paths []string, signatures []*Signature, sinker Sinker) (Receiver, error)

NewReceiver creates a new receiver that stores files on disk. It is the responsibility of the caller to ensure that the provided signatures are valid by invoking their EnsureValid method. In order for the receiver to perform efficiently, paths should be passed in depth-first traversal order.

type ReceiverStatus

type ReceiverStatus struct {
	// Path is the path currently being received.
	Path string `protobuf:"bytes,1,opt,name=path,proto3" json:"path,omitempty"`
	// Received is the number of paths that have already been received.
	Received uint64 `protobuf:"varint,2,opt,name=received,proto3" json:"received,omitempty"`
	// Total is the total number of paths expected.
	Total                uint64   `protobuf:"varint,3,opt,name=total,proto3" json:"total,omitempty"`
	XXX_NoUnkeyedLiteral struct{} `json:"-"`
	XXX_unrecognized     []byte   `json:"-"`
	XXX_sizecache        int32    `json:"-"`
}

ReceiverStatus encodes the status of an rsync receiver.

func (*ReceiverStatus) Descriptor

func (*ReceiverStatus) Descriptor() ([]byte, []int)

func (*ReceiverStatus) EnsureValid

func (s *ReceiverStatus) EnsureValid() error

EnsureValid ensures that ReceiverStatus' invariants are respected.

func (*ReceiverStatus) GetPath

func (m *ReceiverStatus) GetPath() string

func (*ReceiverStatus) GetReceived

func (m *ReceiverStatus) GetReceived() uint64

func (*ReceiverStatus) GetTotal

func (m *ReceiverStatus) GetTotal() uint64

func (*ReceiverStatus) ProtoMessage

func (*ReceiverStatus) ProtoMessage()

func (*ReceiverStatus) Reset

func (m *ReceiverStatus) Reset()

func (*ReceiverStatus) String

func (m *ReceiverStatus) String() string

func (*ReceiverStatus) XXX_DiscardUnknown

func (m *ReceiverStatus) XXX_DiscardUnknown()

func (*ReceiverStatus) XXX_Marshal

func (m *ReceiverStatus) XXX_Marshal(b []byte, deterministic bool) ([]byte, error)

func (*ReceiverStatus) XXX_Merge

func (m *ReceiverStatus) XXX_Merge(src proto.Message)

func (*ReceiverStatus) XXX_Size

func (m *ReceiverStatus) XXX_Size() int

func (*ReceiverStatus) XXX_Unmarshal

func (m *ReceiverStatus) XXX_Unmarshal(b []byte) error

type Signature

type Signature struct {
	// BlockSize is the block size used to compute the signature.
	BlockSize uint64 `protobuf:"varint,1,opt,name=blockSize,proto3" json:"blockSize,omitempty"`
	// LastBlockSize is the size of the last block in the signature.
	LastBlockSize uint64 `protobuf:"varint,2,opt,name=lastBlockSize,proto3" json:"lastBlockSize,omitempty"`
	// Hashes are the hashes of the blocks in the base.
	Hashes               []*BlockHash `protobuf:"bytes,3,rep,name=hashes,proto3" json:"hashes,omitempty"`
	XXX_NoUnkeyedLiteral struct{}     `json:"-"`
	XXX_unrecognized     []byte       `json:"-"`
	XXX_sizecache        int32        `json:"-"`
}

Signature represents an rsync base signature. It encodes the block size used to generate the signature, the size of the last block in the signature (which may be smaller than a full block), and the hashes for the blocks of the file.
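The relationship between base length, block size, number of hashes, and last block size is plain integer arithmetic, sketched below. `blockLayout` is a hypothetical helper, and treating an empty base as zero blocks with a zero last block size is an assumption of this sketch, not something this page states.

```go
package main

import "fmt"

// blockLayout reports how many blocks a base of the given length splits
// into, and the size of the final (possibly short) block.
func blockLayout(length, blockSize uint64) (blocks, lastBlockSize uint64) {
	if length == 0 || blockSize == 0 {
		return 0, 0
	}
	// Round up so that a partial trailing block is counted.
	blocks = (length + blockSize - 1) / blockSize
	lastBlockSize = length - (blocks-1)*blockSize
	return blocks, lastBlockSize
}

func main() {
	fmt.Println(blockLayout(20000, 8192)) // 3 3616
	fmt.Println(blockLayout(16384, 8192)) // 2 8192
}
```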

func (*Signature) Descriptor added in v0.7.0

func (*Signature) Descriptor() ([]byte, []int)

func (*Signature) EnsureValid added in v0.7.0

func (s *Signature) EnsureValid() error

EnsureValid verifies that signature invariants are respected.

func (*Signature) GetBlockSize added in v0.7.0

func (m *Signature) GetBlockSize() uint64

func (*Signature) GetHashes added in v0.7.0

func (m *Signature) GetHashes() []*BlockHash

func (*Signature) GetLastBlockSize added in v0.7.0

func (m *Signature) GetLastBlockSize() uint64

func (*Signature) ProtoMessage added in v0.7.0

func (*Signature) ProtoMessage()

func (*Signature) Reset added in v0.7.0

func (m *Signature) Reset()

func (*Signature) String added in v0.7.0

func (m *Signature) String() string

func (*Signature) XXX_DiscardUnknown added in v0.7.0

func (m *Signature) XXX_DiscardUnknown()

func (*Signature) XXX_Marshal added in v0.7.0

func (m *Signature) XXX_Marshal(b []byte, deterministic bool) ([]byte, error)

func (*Signature) XXX_Merge added in v0.7.0

func (m *Signature) XXX_Merge(src proto.Message)

func (*Signature) XXX_Size added in v0.7.0

func (m *Signature) XXX_Size() int

func (*Signature) XXX_Unmarshal added in v0.7.0

func (m *Signature) XXX_Unmarshal(b []byte) error

type Sinker

type Sinker interface {
	// Sink should return a new io.WriteCloser for staging the given path. Each
	// result it returns will be closed before Sink is invoked again.
	Sink(path string) (io.WriteCloser, error)
}

Sinker provides the interface for a receiver to store incoming files.
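A Sinker only has to hand back an io.WriteCloser per path, so a test double can be very small. The sketch below stages content in memory and publishes it when the writer is closed. The `sinker` interface is a local copy of the documented method set, and `memorySinker`, `memoryFile`, and `stage` are hypothetical names.

```go
package main

import (
	"bytes"
	"fmt"
	"io"
)

// sinker mirrors the documented Sinker method set.
type sinker interface {
	Sink(path string) (io.WriteCloser, error)
}

// memorySinker keeps staged content in a map keyed by path.
type memorySinker struct {
	files map[string][]byte
}

// memoryFile buffers writes and publishes them to its owner on Close.
type memoryFile struct {
	bytes.Buffer
	path  string
	owner *memorySinker
}

func (f *memoryFile) Close() error {
	f.owner.files[f.path] = f.Bytes()
	return nil
}

// Sink returns a fresh staging writer for the given path.
func (s *memorySinker) Sink(path string) (io.WriteCloser, error) {
	return &memoryFile{path: path, owner: s}, nil
}

// stage writes content through the interface and reads back what was stored.
func stage(path, content string) (string, error) {
	s := &memorySinker{files: make(map[string][]byte)}
	var target sinker = s
	w, err := target.Sink(path)
	if err != nil {
		return "", err
	}
	if _, err := io.WriteString(w, content); err != nil {
		return "", err
	}
	// Nothing is visible in the map until the writer is closed.
	if err := w.Close(); err != nil {
		return "", err
	}
	return string(s.files[path]), nil
}

func main() {
	fmt.Println(stage("a/b.txt", "staged")) // staged <nil>
}
```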

type Transmission

type Transmission struct {
	// Done indicates that the operation stream for the current file is
	// finished. If set, there will be no operation in the response, but there
	// may be an error.
	Done bool `protobuf:"varint,1,opt,name=done,proto3" json:"done,omitempty"`
	// Operation is the next operation in the stream for the current file.
	Operation *Operation `protobuf:"bytes,2,opt,name=operation,proto3" json:"operation,omitempty"`
	// Error indicates that a non-terminal error has occurred. It will only be
	// present if Done is true.
	Error                string   `protobuf:"bytes,3,opt,name=error,proto3" json:"error,omitempty"`
	XXX_NoUnkeyedLiteral struct{} `json:"-"`
	XXX_unrecognized     []byte   `json:"-"`
	XXX_sizecache        int32    `json:"-"`
}

Transmission represents a single message in a transmission stream. Its internals are public to allow for transmission using a reflection-based encoder (such as gob), but it should otherwise be treated as an opaque type with a private implementation.

func (*Transmission) Descriptor added in v0.7.0

func (*Transmission) Descriptor() ([]byte, []int)

func (*Transmission) EnsureValid added in v0.7.0

func (t *Transmission) EnsureValid() error

EnsureValid ensures that the Transmission's invariants are respected.

func (*Transmission) GetDone added in v0.7.0

func (m *Transmission) GetDone() bool

func (*Transmission) GetError added in v0.7.0

func (m *Transmission) GetError() string

func (*Transmission) GetOperation added in v0.7.0

func (m *Transmission) GetOperation() *Operation

func (*Transmission) ProtoMessage added in v0.7.0

func (*Transmission) ProtoMessage()

func (*Transmission) Reset added in v0.7.0

func (m *Transmission) Reset()

func (*Transmission) String added in v0.7.0

func (m *Transmission) String() string

func (*Transmission) XXX_DiscardUnknown added in v0.7.0

func (m *Transmission) XXX_DiscardUnknown()

func (*Transmission) XXX_Marshal added in v0.7.0

func (m *Transmission) XXX_Marshal(b []byte, deterministic bool) ([]byte, error)

func (*Transmission) XXX_Merge added in v0.7.0

func (m *Transmission) XXX_Merge(src proto.Message)

func (*Transmission) XXX_Size added in v0.7.0

func (m *Transmission) XXX_Size() int

func (*Transmission) XXX_Unmarshal added in v0.7.0

func (m *Transmission) XXX_Unmarshal(b []byte) error
