streamsort

package module
v0.6.3 (Latest)
Warning

This package is not in the latest version of its module.

Go to latest
Published: Nov 29, 2018 License: Apache-2.0 Imports: 11 Imported by: 0

README

StreamSort

Build Status GoDoc Go Report Card License

Sort arbitrarily large data sets with a predictable amount of memory using temporary files.

Example:
import(
  "fmt"

  "github.com/bsm/streamsort"
)

func main() {
	// Init a Sorter with default options
	sorter := streamsort.New(nil)
	defer sorter.Close()

	// Append data
	_ = sorter.Append([]byte("foo"))
	_ = sorter.Append([]byte("bar"))
	_ = sorter.Append([]byte("baz"))
	_ = sorter.Append([]byte("boo"))

	// Sort and iterate
	iter, err := sorter.Sort(context.Background())
	if err != nil {
		panic(err)
	}
	defer iter.Close()

	for iter.Next() {
		fmt.Println(string(iter.Bytes()))
	}
	if err := iter.Err(); err != nil {
		panic(err)
	}

}

For more complex examples, please see our Documentation

Documentation

Overview

Example
package main

import (
	"context"
	"fmt"

	"github.com/bsm/streamsort"
)

// main appends four words to a Sorter using the default options, sorts them
// and prints them one per line (bar, baz, boo, foo).
func main() {
	// Init a Sorter with default options (nil *Options).
	sorter := streamsort.New(nil)
	defer sorter.Close()

	// Append data. Append returns an error; check it the same way the JSON
	// example does instead of silently discarding it.
	for _, word := range []string{"foo", "bar", "baz", "boo"} {
		if err := sorter.Append([]byte(word)); err != nil {
			panic(err)
		}
	}

	// Sort and iterate; the iterator must be closed after use.
	iter, err := sorter.Sort(context.Background())
	if err != nil {
		panic(err)
	}
	defer iter.Close()

	for iter.Next() {
		fmt.Println(string(iter.Bytes()))
	}
	// Next returning false may mean an error rather than the end of data.
	if err := iter.Err(); err != nil {
		panic(err)
	}
}
Output:

bar
baz
boo
foo
Example (Json)
package main

import (
	"bufio"
	"context"
	"encoding/json"
	"fmt"
	"os"

	"github.com/bsm/streamsort"
)

// main sorts the JSON lines of testdata/stocks.json by year (ascending) and
// then price (descending), and prints the first five sorted records.
func main() {
	// Custom order: year ascending, then price descending. Each comparison
	// decodes both lines; lines that fail to decode sort after valid ones.
	comparer := streamsort.ComparerFunc(func(a, b []byte) int {
		var x, y Stock
		errA, errB := json.Unmarshal(a, &x), json.Unmarshal(b, &y)

		switch {
		case errA != nil && errB != nil:
			return 0 // neither decodes: treat them as equal
		case errB != nil:
			return -1 // only a is valid: a goes first
		case errA != nil:
			return 1 // only b is valid: b goes first
		}

		switch {
		case x.Year < y.Year:
			return -1
		case y.Year < x.Year:
			return 1
		case x.Price < y.Price:
			return 1 // same year: the higher price comes first
		case y.Price < x.Price:
			return -1
		default:
			return 0
		}
	})

	// Gzip the intermediate files and cap the in-memory buffer at 1 MiB.
	sorter := streamsort.New(&streamsort.Options{
		MaxMemBuffer: 1 << 20,
		Comparer:     comparer,
		Compression:  streamsort.CompressionGzip,
	})
	defer sorter.Close()

	input, err := os.Open("testdata/stocks.json")
	if err != nil {
		panic(err)
	}
	defer input.Close()

	// Feed the sorter one line, i.e. one JSON document, at a time.
	lines := bufio.NewScanner(input)
	for lines.Scan() {
		if err := sorter.Append(lines.Bytes()); err != nil {
			panic(err)
		}
	}
	if err := lines.Err(); err != nil {
		panic(err)
	}

	// Sort the input and obtain an iterator over the result.
	iter, err := sorter.Sort(context.Background())
	if err != nil {
		panic(err)
	}
	defer iter.Close()

	// Print at most five records; Next is not called again once five have
	// been printed.
	for printed := 0; printed < 5 && iter.Next(); printed++ {
		fmt.Println(string(iter.Bytes()))
	}
	if err := iter.Err(); err != nil {
		panic(err)
	}
}

// Stock is one record of testdata/stocks.json, decoded from a single JSON
// line. Explicit tags pin the lowercase keys used in the data file instead of
// relying on encoding/json's case-insensitive field matching.
type Stock struct {
	ID      int     `json:"id"`
	Company string  `json:"company"`
	Year    int     `json:"year"`
	Price   float64 `json:"price"`
}
Output:

{"id":32663,"company":"Macejkovic-Feest","year":1988,"price":99.97}
{"id":26921,"company":"Wuckert, West and Skiles","year":1988,"price":99.7}
{"id":33631,"company":"Stiedemann, Senger and McLaughlin","year":1988,"price":99.48}
{"id":11931,"company":"Nitzsche-Corkery","year":1988,"price":98.87}
{"id":67013,"company":"Mills, Olson and Effertz","year":1988,"price":98.75}

Index

Examples

Constants

This section is empty.

Variables

This section is empty.

Functions

This section is empty.

Types

type Comparer

// Comparer defines the sort order of data chunks.
type Comparer interface {
	// Compare returns -1 when a is less than b, 0 when a equals b, and +1
	// when a is greater than b.
	Compare(a, b []byte) int
}

Comparer compares data chunks to determine their sort order.

type ComparerFunc

// ComparerFunc lets a plain function be used as a Comparer: the type has a
// Compare(a, b []byte) int method, which is exactly what Comparer requires.
// The function must follow the Comparer contract (-1, 0 or +1).
type ComparerFunc func(a, b []byte) int

func (ComparerFunc) Compare

func (f ComparerFunc) Compare(a, b []byte) int

type Compression

// Compression selects how the sorter's intermediate (temporary) files are
// encoded.
type Compression uint8

// Available Compression modes.
const (
	// CompressionNone stores intermediate files uncompressed; it is the
	// default.
	CompressionNone Compression = iota
	// CompressionGzip selects gzip compression for intermediate files.
	CompressionGzip
)

type Iterator

// Iterator walks the sorted output returned by Sorter.Sort. Advance it with
// Next, read the current chunk with Bytes, check Err once Next returns false,
// and Close it after use.
type Iterator struct {
	// contains filtered or unexported fields
}

Iterator iterates over the sorted output.

func (*Iterator) Bytes

func (i *Iterator) Bytes() []byte

Bytes returns the chunk at the current position

func (*Iterator) Close

func (i *Iterator) Close() error

Close closes and releases the iterator

func (*Iterator) Err

func (i *Iterator) Err() error

Err returns the iterator error

func (*Iterator) Next

func (i *Iterator) Next() bool

Next advances the cursor to the next entry

type Options

// Options contains sorting options. Passing a nil *Options to New uses the
// default options.
type Options struct {
	// TempDir specifies the working directory for temporary files.
	// Default: the standard temp directory
	TempDir string

	// Comparer defines the sort order.
	// Default: bytes.Compare
	Comparer Comparer

	// Compression is applied to intermediate files.
	// Default: CompressionNone
	Compression Compression

	// MaxOpenFiles limits the number of open files; must be > 1.
	// Default: 100
	MaxOpenFiles int

	// MaxMemBuffer limits the memory used for sorting, in bytes.
	// Default: 64M (must be at least 16k)
	MaxMemBuffer int
}

Options contains sorting options

type Sorter

// Sorter sorts a stream of data chunks. Create one with New, add data with
// Append, call Sort to obtain an Iterator over the sorted result, and Close
// the Sorter to remove its temporary files.
type Sorter struct {
	// contains filtered or unexported fields
}

Sorter is responsible for sorting a stream

func New

func New(opt *Options) *Sorter

New initializes a Sorter with the given options; pass nil to use the defaults.

func (*Sorter) Append

func (s *Sorter) Append(data []byte) error

Append appends data to the sorter

func (*Sorter) Close

func (s *Sorter) Close() error

Close stops any processing and removes all temporary files.

func (*Sorter) Sort

func (s *Sorter) Sort(ctx context.Context) (*Iterator, error)

Sort sorts the inputs and returns an iterator over the sorted output. You must close the iterator after use.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL