dedup: github.com/klauspost/dedup

package dedup

import "github.com/klauspost/dedup"

dedup: A Streaming Deduplication package

This package implements streaming deduplication, allowing you to remove duplicated data in streams. It implements variable block sizes and automatic content block adaptation. It has a fully streaming mode and an indexed mode that has significantly reduced memory requirements.

For an introduction to deduplication, read: https://blog.klauspost.com/fast-stream-deduplication-in-go

Package home: https://github.com/klauspost/dedup

Godoc: https://godoc.org/github.com/klauspost/dedup

Package Files

dedup.go reader.go writer.go

Constants

const HashSize = hasher.Size

Size of the underlying hash in bytes for those interested.

const MinBlockSize = 512

The smallest "maximum" block size allowed.

Variables

var ErrMaxMemoryTooSmall = errors.New("there must be at least space for 1 block")

ErrMaxMemoryTooSmall is returned if the encoder isn't allowed to store even 1 block.

var ErrSizeTooSmall = errors.New("maximum block size too small. must be at least 512 bytes")

ErrSizeTooSmall is returned if the requested maximum block size is smaller than MinBlockSize (512 bytes).

var ErrUnknownFormat = errors.New("unknown index format")

ErrUnknownFormat is returned if the index stream cannot be recognized.

func BirthdayProblem

func BirthdayProblem(blocks int) string

BirthdayProblem returns an approximate birthday-problem collision probability, based on the number of blocks given and the hash size.

It uses the simplified calculation: p = k(k-1) / (2N), where k is the number of blocks and N is the number of possible hash values.

From http://preshing.com/20110504/hash-collision-probabilities/

This shows an example of a birthday problem calculation. We calculate the probability of a collision of SHA1 hashes on 1 Terabyte data, using 1 Kilobyte blocks. With SHA-1, that gives a 1 in 2535301202817642046627252275200 chance of a collision occurring.

Code:

fmt.Println("Hash size is", dedup.HashSize*8, "bits")
fmt.Println("1TiB, 1KiB blocks:")
fmt.Println(dedup.BirthdayProblem((1 << 40) / (1 << 10)))

Output:

Hash size is 160 bits
1TiB, 1KiB blocks:
Collision probability is ~ 1/2535301202817642046627252275200 ~ 3.944304522431639e-31

type Fragment

type Fragment struct {
    Hash    [HashSize]byte // Hash of the fragment
    Payload []byte         // Data of the fragment.
    New     bool           // Will be true, if the data hasn't been encountered before.
    N       uint           // Sequentially incrementing number for each fragment.
}

Fragment is a file fragment. It is the data returned by the splitter created with NewSplitter.

type IndexedReader

type IndexedReader interface {
    Reader

    // BlockSizes returns the size of each block.
    // Will be available if an index was provided.
    BlockSizes() []int
}

IndexedReader gives access to internal information on block sizes available on indexed streams.
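
Since BlockSizes exposes the size of every block, you can derive where each block starts in the decoded output. A minimal sketch; the helper blockOffsets is ours, not part of the package, and it assumes BlockSizes reports blocks in stream order:

Code:

// blockOffsets returns the starting offset of each block in the
// decoded output, computed from the sizes reported by the reader.
func blockOffsets(r dedup.IndexedReader) []int64 {
    sizes := r.BlockSizes()
    offsets := make([]int64, len(sizes))
    var pos int64
    for i, size := range sizes {
        offsets[i] = pos
        pos += int64(size)
    }
    return offsets
}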

func NewReader

func NewReader(index io.Reader, blocks io.Reader) (IndexedReader, error)

NewReader returns a reader that will decode the supplied index and data stream.

This is compatible with content created by the NewWriter function. The index will be decoded before the function returns.

When you are done with the Reader, use Close to release resources.

This will deduplicate a buffer of zeros to an indexed stream.

Code:

// Create data we can read.
var idx, data bytes.Buffer
input := bytes.NewBuffer(make([]byte, 50000))
w, _ := dedup.NewWriter(&idx, &data, dedup.ModeFixed, 1000, 0)
_, _ = io.Copy(w, input)
_ = w.Close()

// Create a new reader.
r, err := dedup.NewReader(&idx, &data)
if err != nil {
    panic(err)
}

// Inspect how much memory it will use.
fmt.Println("Memory use:", r.MaxMem())

var dst bytes.Buffer

// Read everything
_, err = io.Copy(&dst, r)
if err != nil && err != io.EOF {
    panic(err)
}

// Let us inspect what was written:
fmt.Println("Returned data length:", dst.Len())
fmt.Println("Everything zero:", 0 == bytes.Compare(dst.Bytes(), make([]byte, 50000)))

Output:

Memory use: 1000
Returned data length: 50000
Everything zero: true

func NewSeekReader

func NewSeekReader(index io.Reader, blocks io.ReadSeeker) (IndexedReader, error)

NewSeekReader returns a reader that will decode the supplied index and data stream.

This is compatible with content created by the NewWriter function.

No blocks will be kept in memory, but the block data input must be seekable. The function will decode the index before returning.

When you are done with the Reader, use Close to release resources.
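
A minimal sketch of reading back the two files produced by NewWriter; the file names 'output.idx' and 'output.data' are borrowed from the NewWriter file example below, and the panic-style error handling follows the other examples:

Code:

// Open the index and block files written earlier by NewWriter.
idx, err := os.Open("output.idx")
if err != nil {
    panic(err)
}
defer idx.Close()

// *os.File satisfies io.ReadSeeker, so the block file can be used
// directly; no blocks will be kept in memory.
blocks, err := os.Open("output.data")
if err != nil {
    panic(err)
}
defer blocks.Close()

r, err := dedup.NewSeekReader(idx, blocks)
if err != nil {
    panic(err)
}
defer r.Close()

// Decode the full stream.
var dst bytes.Buffer
if _, err := io.Copy(&dst, r); err != nil && err != io.EOF {
    panic(err)
}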

type Mode

type Mode int

Deduplication mode used to determine how input is split.

const (
    // Fixed block size
    //
    // This is by far the fastest mode, and checks for duplicates
    // in fixed block sizes.
    // It can be helpful to use the "Split" function, which will
    // restart the duplicate search at your current position.
    ModeFixed Mode = 0

    // Dynamic block size.
    //
    // This mode will create a deduplicator that will split the contents written
    // to it into dynamically sized blocks.
    // The size given indicates the maximum block size. Average size is usually maxSize/4.
    // Minimum block size is maxSize/64.
    ModeDynamic = 1
)
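
Since a dynamic splitter's average block size is roughly maxSize/4 and its minimum is maxSize/64, you can work backwards from a target average. A small sketch, assuming a target average of about 4 KiB; the 1 MiB memory limit is an arbitrary choice for illustration:

Code:

// Target an average block of ~4 KiB: the average is ~maxSize/4,
// so request a 16 KiB maximum. Minimum blocks are then
// 16 KiB / 64 = 256 bytes.
const maxSize = 4 * (4 << 10) // 16 KiB

var out bytes.Buffer
w, err := dedup.NewStreamWriter(&out, dedup.ModeDynamic, maxSize, 1<<20)
if err != nil {
    panic(err)
}
defer w.Close()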

type Reader

type Reader interface {
    io.ReadCloser

    io.WriterTo

    // MaxMem returns the *maximum* memory required to decode the stream.
    MaxMem() int
}

A Reader will decode a deduplicated stream and return the data as it was encoded. Use Close when done to release resources.
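
MaxMem makes it possible to reject a stream before decoding any data. A minimal sketch, assuming data holds a stream written by NewStreamWriter; the 64 MiB budget is an arbitrary application limit, not a package default:

Code:

// Arbitrary application limit on decoder memory.
const memBudget = 64 << 20 // 64 MiB

r, err := dedup.NewStreamReader(&data)
if err != nil {
    panic(err)
}
defer r.Close()

// Refuse streams that could exceed our budget.
if r.MaxMem() > memBudget {
    panic("stream requires more decoder memory than allowed")
}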

func NewStreamReader

func NewStreamReader(in io.Reader) (Reader, error)

NewStreamReader returns a reader that will decode the supplied data stream.

This is compatible with content created by the NewStreamWriter function.

When you are done with the Reader, use Close to release resources.

This will deduplicate a buffer of zeros to a non-indexed stream.

Code:

// Create data we can read.
var data bytes.Buffer
input := bytes.NewBuffer(make([]byte, 50000))
// Set the memory limit to 10000 bytes
w, _ := dedup.NewStreamWriter(&data, dedup.ModeFixed, 1000, 10000)
_, _ = io.Copy(w, input)
_ = w.Close()

// Create a new stream reader:
r, err := dedup.NewStreamReader(&data)
if err != nil {
    panic(err)
}

// Inspect how much memory it will use.
// Since this is a stream, it will report the worst-case scenario.
fmt.Println("Memory use:", r.MaxMem())

var dst bytes.Buffer

// Read everything
_, err = io.Copy(&dst, r)
if err != nil && err != io.EOF {
    panic(err)
}

// Let us inspect what was written:
fmt.Println("Returned data length:", dst.Len())
fmt.Println("Everything zero:", 0 == bytes.Compare(dst.Bytes(), make([]byte, 50000)))

Output:

Memory use: 10000
Returned data length: 50000
Everything zero: true

type Writer

type Writer interface {
    io.WriteCloser

    // Split content, so a new block begins with the next write.
    Split()

    // MemUse returns an approximate maximum memory use in bytes for
    // encoder (Writer) and decoder (Reader) for the given number of bytes.
    MemUse(bytes int) (encoder, decoder int64)

    // Blocks returns the current number of blocks.
    // Blocks may still be processing.
    Blocks() int
}
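
MemUse lets you estimate worst-case memory before committing to a configuration. A small sketch, reusing the fixed-mode parameters from the examples below and assuming a planned 1 GiB input:

Code:

var data bytes.Buffer
w, err := dedup.NewStreamWriter(&data, dedup.ModeFixed, 1000, 10000)
if err != nil {
    panic(err)
}
defer w.Close()

// Worst-case memory for encoding and decoding 1 GiB of input.
enc, dec := w.MemUse(1 << 30)
fmt.Println("encoder worst case:", enc, "bytes")
fmt.Println("decoder worst case:", dec, "bytes")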

func NewSplitter

func NewSplitter(fragments chan<- Fragment, mode Mode, maxSize uint) (Writer, error)

NewSplitter will return a writer you can write data to, and the file will be split into separate fragments.

You must supply a fragment channel that will output fragments for the data you have written. The channel must accept data while you write to the splitter.

For each fragment, the SHA-1 hash of the data section is returned, along with the raw data of the fragment.

When you call Close on the returned Writer, the final fragments will be sent and the channel will be closed.

This will deduplicate a buffer of zeros, and return each block on a channel in order.

Code:

// We will write to this
// We set a small buffer
out := make(chan dedup.Fragment, 10)

// This will consume our blocks as they are returned
// and send information about what was received.
info := make(chan int)
go func() {
    n := 0
    size := 0
    for f := range out {
        n++
        if f.New {
            size += len(f.Payload)
        }
    }
    info <- n
    info <- size
}()

// This is our input:
input := bytes.NewBuffer(make([]byte, 50000))

// Create a new splitter, with each block being 1000 bytes.
w, err := dedup.NewSplitter(out, dedup.ModeFixed, 1000)
if err != nil {
    panic(err)
}
// Copy our input to the writer.
io.Copy(w, input)

// Close the writer
err = w.Close()
if err != nil {
    panic(err)
}

// Let us inspect what was written:
fmt.Println("Blocks:", <-info)
fmt.Println("Data size:", <-info)

Output:

Blocks: 50
Data size: 1000

This will deduplicate a file and return each block on a channel in order.

Code:

// Our input
f, _ := os.Open("testdata/sampledata.zip")
defer f.Close()

// We will receive fragments on this channel
ch := make(chan dedup.Fragment, 10)

var wg sync.WaitGroup
wg.Add(1)

// Start a goroutine that will consume the fragments
go func() {
    defer wg.Done()
    // Receive fragments until the channel is closed by Close.
    for f := range ch {
        if f.New {
            fmt.Printf("Got NEW fragment #%d, size %d, hash:%s\n", f.N, len(f.Payload), hex.EncodeToString(f.Hash[:]))
            // Insert payload into data store
        } else {
            fmt.Printf("Got OLD fragment #%d, size %d, hash:%s\n", f.N, len(f.Payload), hex.EncodeToString(f.Hash[:]))
        }
        // Add hash to list of hashes required to reconstruct the file.
    }
}()

// Create a dynamic splitter with a 4096-byte maximum block size (average is ~1024 bytes).
w, _ := dedup.NewSplitter(ch, dedup.ModeDynamic, 4*1024)

// Copy data to the splitter
_, _ = io.Copy(w, f)

// Flush the remaining fragments
_ = w.Close()

// Wait for input to be received.
wg.Wait()

Output:

Got NEW fragment #0, size 893, hash:7f8455127e82f90ea7e97716ccaefa9317279b4b
Got NEW fragment #1, size 559, hash:b554708bbfda24f1eb8fcd75a155d23bd36939d3
Got NEW fragment #2, size 3482, hash:59bca870477e14e97ae8650e74ef52abcb6340e8
Got NEW fragment #3, size 165, hash:6fb05a63e28a1bb2e880e051940f517115e7b16c
Got NEW fragment #4, size 852, hash:6671826ffff6edd32951a0e774efccb5101ba629
Got NEW fragment #5, size 3759, hash:0fae545a20195720d8e9bb9540069418d7db0873
Got OLD fragment #6, size 3482, hash:59bca870477e14e97ae8650e74ef52abcb6340e8
Got OLD fragment #7, size 165, hash:6fb05a63e28a1bb2e880e051940f517115e7b16c
Got OLD fragment #8, size 852, hash:6671826ffff6edd32951a0e774efccb5101ba629
Got NEW fragment #9, size 2380, hash:1507aa13e215517ce982b9235a0221018128ed4e

func NewStreamWriter

func NewStreamWriter(out io.Writer, mode Mode, maxSize, maxMemory uint) (Writer, error)

NewStreamWriter will create a deduplicator that will split the contents written to it into blocks and de-duplicate these.

The output is delivered as a single stream, and memory use will remain stable for both writing and reading the stream.

This function returns data that is compatible with the NewStreamReader function.

You must set the maximum memory for the decoder to use. This limits how far back a match can be made. If you use dynamic blocks, also note that the average block size is 1/4th of the maximum block size.

The returned writer must be closed to flush the remaining data.

This will deduplicate a buffer of zeros to a non-indexed stream.

Code:

// We will write to this
data := bytes.Buffer{}

// This is our input:
input := bytes.NewBuffer(make([]byte, 50000))

// Create a new writer, with each block being 1000 bytes,
// and allow it to use 10000 bytes of memory.
w, err := dedup.NewStreamWriter(&data, dedup.ModeFixed, 1000, 10000)
if err != nil {
    panic(err)
}
// Copy our input to the writer.
io.Copy(w, input)

// Close the writer
err = w.Close()
if err != nil {
    panic(err)
}

// Let us inspect what was written:
fmt.Println("Blocks:", w.Blocks())
fmt.Println("Data size:", data.Len())

Output:

Blocks: 50
Data size: 1068

This will deduplicate a buffer of zeros to a non-indexed stream written to a file. Note that when the output is a seekable file, the indexed format (see NewWriter) is usually a better fit; a single stream is mainly useful when the output cannot be seeked.

Code:

// We will write to this
data, err := os.Create("outputstream.data")
if err != nil {
    panic(err)
}
// Close, print stats and remove it
defer func() {
    data.Close()
    stat, _ := os.Stat("outputstream.data")
    fmt.Println("Stream size:", stat.Size())
    os.Remove("outputstream.data")
}()

// This is our input:
input := bytes.NewBuffer(make([]byte, 500000))

// Create a new writer, with each block being 1000 bytes,
// and allow it to use 10000 bytes of memory.
w, err := dedup.NewStreamWriter(data, dedup.ModeFixed, 1000, 10000)
if err != nil {
    panic(err)
}
defer w.Close()

// Copy our input to the writer.
io.Copy(w, input)

// Print the number of blocks written
fmt.Println("Blocks:", w.Blocks())

Output:

Blocks: 500
Stream size: 1518

func NewWriter

func NewWriter(index io.Writer, blocks io.Writer, mode Mode, maxSize, maxMemory uint) (Writer, error)

NewWriter will create a deduplicator that will split the contents written to it into blocks and de-duplicate these.

The output is delivered as two streams, an index stream and a block stream.

The index stream will contain information about which blocks are deduplicated and the block stream will contain uncompressed data blocks.

You can set the maximum memory for the decoder to use. This limits how far back a match can be made. This is very conservative, so you can set this at the absolute limit of memory available. If you use dynamic blocks, also note that the average block size is 1/4th of the maximum block size. Set maxMemory to 0 to disable the decoder memory limit.

This function returns data that is compatible with the NewReader function. The returned writer must be closed to flush the remaining data.

This will deduplicate a buffer of zeros to an indexed stream.

Code:

// We will write to these
idx := bytes.Buffer{}
data := bytes.Buffer{}

// This is our input:
input := bytes.NewBuffer(make([]byte, 50000))

// Create a new writer, with each block being 1000 bytes
w, err := dedup.NewWriter(&idx, &data, dedup.ModeFixed, 1000, 0)
if err != nil {
    panic(err)
}

// Copy our input to the writer.
io.Copy(w, input)

// Close the writer
err = w.Close()
if err != nil {
    panic(err)
}

// Let us inspect what was written:
fmt.Println("Blocks:", w.Blocks())
fmt.Println("Index size:", idx.Len())
fmt.Println("Data size:", data.Len())

Output:

Blocks: 50
Index size: 67
Data size: 1000

This example shows how to write data to two files. Running it will deduplicate a zero-filled byte slice of 500000 bytes into 'output.data' and 'output.idx' files.

In the real world, you would likely want to add a bufio.NewWriter to the output, but to keep it simple, we don't do that here.

Code:

data, err := os.Create("output.data")
if err != nil {
    panic(err)
}
// Close, print stats and remove it
defer func() {
    data.Close()
    stat, _ := os.Stat("output.data")
    fmt.Println("Data size:", stat.Size())
    os.Remove("output.data")
}()

idx, err := os.Create("output.idx")
if err != nil {
    panic(err)
}
// Close, print stats and remove it
defer func() {
    idx.Close()
    stat, _ := os.Stat("output.idx")
    fmt.Println("Index size:", stat.Size())
    os.Remove("output.idx")
}()

// This is our input:
input := bytes.NewBuffer(make([]byte, 500000))

// Create a new writer, with each block being 1000 bytes fixed size.
w, err := dedup.NewWriter(idx, data, dedup.ModeFixed, 1000, 0)
if err != nil {
    panic(err)
}
defer w.Close()

// Copy our input to the writer.
io.Copy(w, input)

// Print the number of blocks written
fmt.Println("Blocks:", w.Blocks())

Output:

Blocks: 500
Index size: 517
Data size: 1000
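
As noted above, a bufio.Writer in front of each file would reduce the number of small writes. A hedged variant of the setup; the important detail is flushing the buffered writers after the deduplicator is closed, but before the files are:

Code:

// Wrap both outputs in buffered writers to cut down on small writes.
bidx := bufio.NewWriter(idx)
bdata := bufio.NewWriter(data)

w, err := dedup.NewWriter(bidx, bdata, dedup.ModeFixed, 1000, 0)
if err != nil {
    panic(err)
}

// ... copy input to w ...

// Close flushes the deduplicator's remaining blocks into the
// buffered writers; flush those before closing the files.
if err := w.Close(); err != nil {
    panic(err)
}
_ = bdata.Flush()
_ = bidx.Flush()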

Directories

Path      Synopsis
sort

Package dedup imports 12 packages. Updated 2016-07-20.