pbzip2

v1.0.3 · Published: Oct 9, 2023 · License: Apache-2.0 · Imports: 15 · Imported by: 12

README

pbzip2

This package provides parallel and streaming decompression of bzip2 files. It operates by scanning the file to find each independent bzip2 block and then uses a modified version of compress/bzip2 to decompress each block. The decompressed blocks are then reassembled into their original order and made available as a stream (via io.Reader).

The API to use the parallel decompressor is simple:

	ctx := context.Background()
	input, err := os.Open(filepath.Join("testdata", "hello_world.bz2"))
	if err != nil {
		panic(err)
	}
	defer input.Close()
	if _, err := io.Copy(os.Stdout, pbzip2.NewReader(ctx, input)); err != nil {
		panic(err)
	}

The scanner identifies blocks by searching for the magic numbers that denote the start of a block and the end of a stream. Consequently it can be fooled if one of these 6-byte (48-bit) sequences occurs within the compressed data itself. The probability of a specific 48-bit pattern appearing at any given bit offset is 2^-48, which is very low, but given enough data it will happen. Therefore the decompressor will attempt to merge blocks that fail to decompress, on the assumption that the original bzip2 block was split at such a false positive. With this scheme in place, it takes two false occurrences of the bzip2 block magic number within a single block to break the decompressor, which boils down to the probability of a specific 48-bit sequence occurring randomly, twice, within about a MB of data.
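
As a rough sanity check on those odds (a back-of-envelope sketch, not part of the package): a megabyte of data contains about 8 million candidate bit offsets, each matching a specific 48-bit pattern with probability 2^-48.

	package main

	import "fmt"

	func main() {
		p := 1.0 / float64(uint64(1)<<48) // P(specific 48-bit pattern at one bit offset)
		offsets := 8.0 * 1000 * 1000      // candidate bit offsets in ~1 MB of data
		one := offsets * p
		fmt.Printf("expected false positives per MB: %.2g\n", one)     // ~2.8e-08
		fmt.Printf("two in the same MB:              %.2g\n", one*one) // ~8.1e-16
	}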

There are three components to this package:

  1. the scanner
  2. the parallel decompressor
  3. the modified bzip2 package

The scanner operates as described above, but its implementation is complicated by the fact that bzip2 blocks are bit, rather than byte, aligned, and by the need to handle concatenated and empty streams. The search for the bzip2 block magic number is implemented using three lookup tables. The first is a 256-entry lookup used to quickly determine whether a byte could possibly contain the start of the magic number. The other two tables consist of 32-bit integers that contain all of the patterns the 6-byte magic numbers can appear as when shifted by 1..7 bits in the stream, allowing them to be matched at any bit offset.
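
To make the table-driven search concrete, here is a minimal, hypothetical sketch of the prefilter-plus-verify idea. It is not the package's actual implementation: the names are invented, and the real tables match whole shifted patterns rather than re-reading 48 bits per candidate.

	package main

	import "fmt"

	// blockMagic is the 48-bit bzip2 block-header magic number.
	const blockMagic = 0x314159265359

	// buildPrefilter marks every byte value that could begin the magic
	// number at some bit shift s in 0..7: the low 8-s bits of the byte
	// must equal the top 8-s bits of the magic.
	func buildPrefilter() (table [256]bool) {
		top := uint64(blockMagic) >> 40 // top 8 bits of the magic
		for b := 0; b < 256; b++ {
			for s := uint(0); s < 8; s++ {
				if uint64(b)&(0xff>>s) == top>>s {
					table[b] = true
					break
				}
			}
		}
		return
	}

	// bitAt returns bit i (MSB first) of buf.
	func bitAt(buf []byte, i int) uint64 {
		return uint64(buf[i/8]>>(7-uint(i%8))) & 1
	}

	// findMagic returns the bit offset of the first occurrence of the
	// magic in buf, or -1. The prefilter cheaply rejects most bytes; a
	// full 48-bit comparison confirms the survivors.
	func findMagic(buf []byte, pre *[256]bool) int {
		for off := 0; off+7 <= len(buf); off++ {
			if !pre[buf[off]] {
				continue
			}
			for s := 0; s < 8; s++ {
				bit := off*8 + s
				var v uint64
				for i := 0; i < 48; i++ {
					v = v<<1 | bitAt(buf, bit+i)
				}
				if v == blockMagic {
					return bit
				}
			}
		}
		return -1
	}

	func main() {
		pre := buildPrefilter()
		data := []byte{0x00, 0x31, 0x41, 0x59, 0x26, 0x53, 0x59, 0x00}
		fmt.Println(findMagic(data, &pre)) // 8: the magic starts at bit 8
	}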

The parallel decompressor accepts requests to decompress each bzip2 block concurrently and then reassembles the results into a stream, allowing for incremental processing of the decompressed data. It uses a modified version of the Go standard library's compress/bzip2 package to decompress each block separately; fortunately those modifications are straightforward.

So long as the scanning portion is faster than the decompression by some reasonable factor, and it is run on a multi-core machine, this approach will be significantly faster than a serial bzip2 decompressor. Given the coarse-grained nature of the operations it should scale linearly with the number of cores available to it. In simple testing using the wikidata entities download (https://dumps.wikimedia.org/wikidatawiki/entities/20191202/wikidata-20191202-all.json.bz2) it does indeed run roughly 8 times faster than the serial version on an 8-core machine.

Documentation

Overview

Copyright 2019 Cosmos Nicolaou. All rights reserved. Use of this source code is governed by the Apache-2.0 license that can be found in the LICENSE file.

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

func CreateConcurrencyPool added in v1.0.1

func CreateConcurrencyPool(maxConcurrent int) chan struct{}

CreateConcurrencyPool creates a pool that can be shared among several decompressors to limit the total number of concurrently running decompressions. Each decompressor will still use at most the number of concurrent decompressions set via BZConcurrency. Specifying <= 0 will use runtime.GOMAXPROCS to set the value. The caller should not perform any operations on the returned channel.
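
For example (a usage sketch; the module import path github.com/cosnicolaou/pbzip2 is assumed):

	package main

	import (
		"context"

		"github.com/cosnicolaou/pbzip2"
	)

	func main() {
		ctx := context.Background()
		// One pool capping the total number of in-flight block
		// decompressions across both decompressors at 8.
		pool := pbzip2.CreateConcurrencyPool(8)
		dcA := pbzip2.NewDecompressor(ctx, pbzip2.BZConcurrencyPool(pool))
		dcB := pbzip2.NewDecompressor(ctx, pbzip2.BZConcurrencyPool(pool))
		_, _ = dcA, dcB // blocks would be fed to each via Append
	}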

func NewReader

func NewReader(ctx context.Context, rd io.Reader, opts ...ReaderOption) io.Reader

NewReader returns an io.Reader that uses a scanner and decompressor to decompress bzip2 data concurrently.

Types

type CompressedBlock

type CompressedBlock struct {
	// Data contains the compressed data as a bitstream that starts at
	// BitOffset in the first byte of Data and is SizeInBits long.
	Data            []byte
	BitOffset       int    // Compressed data starts at BitOffset in Data.
	SizeInBits      int    // SizeInBits is the size, in bits, of the compressed data in Data.
	CRC             uint32 // CRC for this block.
	StreamBlockSize int    // StreamBlockSize is the compression block size (1..9 * 100*1000 bytes) specified when the stream was created.

	EOS       bool   // EOS is set when the end-of-stream magic number has been detected.
	StreamCRC uint32 // StreamCRC is the CRC for the stream.
}

CompressedBlock represents a single bzip2 compressed block.

func (CompressedBlock) String

func (b CompressedBlock) String() string

type Decompressor

type Decompressor struct {
	// contains filtered or unexported fields
}

Decompressor represents a concurrent decompressor for pbzip streams. The decompressor is designed to work in conjunction with Scanner: its Append method must be called with the values returned by the scanner's Block method. Each block is then decompressed in parallel and the results are reassembled in their original order.
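
A sketch of how Scanner and Decompressor are intended to compose (assuming the module import path github.com/cosnicolaou/pbzip2; the decompressed stream is drained concurrently so that reassembly is never blocked waiting for a reader):

	package main

	import (
		"context"
		"io"
		"os"

		"github.com/cosnicolaou/pbzip2"
	)

	func main() {
		ctx := context.Background()
		input, err := os.Open("testdata/hello_world.bz2")
		if err != nil {
			panic(err)
		}
		defer input.Close()

		sc := pbzip2.NewScanner(input)
		dc := pbzip2.NewDecompressor(ctx, pbzip2.BZConcurrency(4))

		// Drain the decompressed stream concurrently with Append.
		done := make(chan error, 1)
		go func() {
			_, err := io.Copy(os.Stdout, dc)
			done <- err
		}()

		for sc.Scan(ctx) {
			if err := dc.Append(sc.Block()); err != nil {
				dc.Cancel(err)
				break
			}
		}
		if err := sc.Err(); err != nil {
			dc.Cancel(err)
		}
		if err := dc.Finish(); err != nil {
			panic(err)
		}
		if err := <-done; err != nil {
			panic(err)
		}
	}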

func NewDecompressor

func NewDecompressor(ctx context.Context, opts ...DecompressorOption) *Decompressor

NewDecompressor creates a new parallel decompressor.

func (*Decompressor) Append

func (dc *Decompressor) Append(cb CompressedBlock) error

Append adds the supplied bzip2 block to the set being decompressed in parallel; the output of that decompression is appended, in order, to the output of the previously appended blocks.

func (*Decompressor) Cancel

func (dc *Decompressor) Cancel(err error)

Cancel can be called to unblock any readers that are reading from this decompressor and/or the Finish method.

func (*Decompressor) Finish

func (dc *Decompressor) Finish() error

Finish must be called to wait for all of the currently outstanding decompression processes to finish and their output to be reassembled. It should be called exactly once.

func (*Decompressor) Read

func (dc *Decompressor) Read(buf []byte) (int, error)

Read implements io.Reader on the decompressed stream.

type DecompressorOption

type DecompressorOption func(*decompressorOpts)

func BZConcurrency

func BZConcurrency(n int) DecompressorOption

BZConcurrency sets the degree of concurrency to use, that is, the number of threads used for decompression.

func BZConcurrencyPool added in v1.0.1

func BZConcurrencyPool(pool chan struct{}) DecompressorOption

BZConcurrencyPool adds a thread-safe pool to control concurrency. This can be used to limit the total number of goroutines actively decompressing across several decompressors. Use CreateConcurrencyPool to create a pool of a given size that can be shared across decompressors. If not set, no limit applies.

func BZSendUpdates

func BZSendUpdates(ch chan<- Progress) DecompressorOption

BZSendUpdates sets the channel over which progress updates are sent.

func BZVerbose

func BZVerbose(v bool) DecompressorOption

BZVerbose controls verbose logging for decompression.

type Progress

type Progress struct {
	Duration         time.Duration
	Block            uint64
	CRC              uint32
	Compressed, Size int
}

Progress is used to report the progress of decompression. Each report pertains to a correctly ordered decompression event.
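
A sketch of receiving progress reports via BZSendUpdates (the module import path github.com/cosnicolaou/pbzip2 is assumed; whether and when the package closes the updates channel is not specified here, so a long-lived program should arrange to stop the receiving goroutine itself):

	package main

	import (
		"context"
		"io"
		"log"
		"os"

		"github.com/cosnicolaou/pbzip2"
	)

	func main() {
		ctx := context.Background()
		input, err := os.Open("testdata/hello_world.bz2")
		if err != nil {
			log.Fatal(err)
		}
		defer input.Close()

		updates := make(chan pbzip2.Progress, 16)
		go func() {
			for p := range updates {
				log.Printf("block %d: %d compressed bytes -> %d bytes in %v",
					p.Block, p.Compressed, p.Size, p.Duration)
			}
		}()

		rd := pbzip2.NewReader(ctx, input,
			pbzip2.DecompressionOptions(pbzip2.BZSendUpdates(updates)))
		if _, err := io.Copy(io.Discard, rd); err != nil {
			log.Fatal(err)
		}
	}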

type ReaderOption

type ReaderOption func(o *readerOpts)

ReaderOption represents an option to NewReader.

func DecompressionOptions

func DecompressionOptions(opts ...DecompressorOption) ReaderOption

DecompressionOptions passes the supplied DecompressorOptions to the underlying decompressor created by NewReader.

func ScannerOptions

func ScannerOptions(opts ...ScannerOption) ReaderOption

ScannerOptions passes the supplied ScannerOptions to the underlying scanner created by NewReader.

type Scanner

type Scanner struct {
	// contains filtered or unexported fields
}

Scanner returns runs of entire bz2 blocks. It works by splitting the input into blocks terminated by either the bz2 block magic or the bz2 end-of-stream magic number sequences, as documented in https://en.wikipedia.org/wiki/Bzip2. The scanner splits the magic numbers into multiple lookup tables that include all possible shifted values to allow for efficient matching of bit (rather than byte) aligned values. The first block discovered will be the stream header, which is validated and consumed; the last block will be the stream trailer, which is likewise validated and consumed internally.

func NewScanner

func NewScanner(rd io.Reader, opts ...ScannerOption) *Scanner

NewScanner returns a new instance of Scanner.

func (*Scanner) Block

func (sc *Scanner) Block() CompressedBlock

Block returns the current bzip2 compressed block.

func (*Scanner) Err

func (sc *Scanner) Err() error

Err returns any error encountered by the scanner.

func (*Scanner) Scan

func (sc *Scanner) Scan(ctx context.Context) bool

Scan returns true if there is a block to be returned.

type ScannerOption

type ScannerOption func(*scannerOpts)

ScannerOption represents an option to NewScanner.

func ScanBlockOverhead

func ScanBlockOverhead(b int) ScannerOption

ScanBlockOverhead sets the size of the overhead, in bytes, that the scanner assumes is sufficient to capture all of the bzip2 per block data structures. It should only ever be needed if the scanner is unable to find a magic number.

Directories

Path	Synopsis
cmd/pbzip2	Module
bzip2	Package bzip2 implements bzip2 decompression.
