extsort

package module
v1.0.1 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Oct 29, 2022 License: Apache-2.0 Imports: 8 Imported by: 13

README

extsort

PkgGoDev

Go Report Card

An external sorting library for golang (i.e. on disk sorting) on an arbitrarily channel, even if the generated content doesn't all fit in memory at once. Once sorted, it returns a new channel returning data in sorted order.

In order to remain efficient for all implementations, extsort doesn't handle serialization, but leaves that to the user by operating on types that implement the SortType.ToBytes and FromBytes interfaces.

extsort also has a Strings() interface which does not require the overhead of converting everything to/from bytes and is faster for string types.

extsort is not a stable sort.

Example

package main

import (
    "encoding/binary"
    "fmt"
    "math/rand"

    "github.com/lanrat/extsort"
)

var count = int(1e7) // 10M

type sortInt struct {
    i int64
}

func (s sortInt) ToBytes() []byte {
    buf := make([]byte, binary.MaxVarintLen64)
    binary.PutVarint(buf, s.i)
    return buf
}

func sortIntFromBytes(b []byte) extsort.SortType {
    i, _ := binary.Varint(b)
    return sortInt{i: i}
}

func compareSortIntLess(a, b extsort.SortType) bool {
    return a.(sortInt).i < b.(sortInt).i
}

func main() {
    // create an input channel with unsorted data
    inputChan := make(chan extsort.SortType)
    go func() {
        for i := 0; i < count; i++ {
            inputChan <- sortInt{i: rand.Int63()}
        }
        close(inputChan)
    }()

    // create the sorter and start sorting
    sorter, outputChan, errChan := extsort.New(inputChan, sortIntFromBytes, compareSortIntLess, nil)
    sorter.Sort(context.Background())

    // print output sorted data
    for data := range outputChan {
        fmt.Printf("%d\n", data.(sortInt).i)
    }
    if err := <-errChan; err != nil {
        fmt.Printf("err: %s", err.Error())
    }
}

extsort/diff

The diff sub-package is a self-contained package that assists with diffing of channels of sorted data. It can be used with the extsort methods or on its own

TODO

  • parallelize merging after sorting

Documentation

Overview

Package extsort implements an unstable external sort for all the records in a chan or iterator

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

func UniqStringChan

func UniqStringChan(in chan string) chan string

UniqStringChan returns a channel identical to the input but only with uniq elements only works on sorted inputs

Types

type CompareLessFunc

type CompareLessFunc func(a, b SortType) bool

CompareLessFunc compares two SortType items and returns true if a is less than b

type Config

type Config struct {
	ChunkSize          int    // amount of records to store in each chunk which will be written to disk
	NumWorkers         int    // maximum number of workers to use for parallel sorting
	ChanBuffSize       int    // buffer size for merging chunks
	SortedChanBuffSize int    // buffer size for passing records to output
	TempFilesDir       string // empty for use OS default ex: /tmp
}

Config holds configuration settings for extsort

func DefaultConfig

func DefaultConfig() *Config

DefaultConfig returns the default configuration options sued if none provided

type FromBytes

type FromBytes func([]byte) SortType

FromBytes unmarshal bytes from gob to create a SortType when reading back the sorted items

type SortType

type SortType interface {
	ToBytes() []byte // ToBytes used for marshaling with gob
}

SortType defines the interface required by the extsort library to be able to sort the items

type SortTypeSorter

type SortTypeSorter struct {
	// contains filtered or unexported fields
}

SortTypeSorter stores an input chan and feeds Sort to return a sorted chan

func New

func New(i chan SortType, fromBytes FromBytes, lessFunc CompareLessFunc, config *Config) (*SortTypeSorter, chan SortType, chan error)

New returns a new Sorter instance that can be used to sort the input chan fromBytes is needed to unmarshal SortTypes from []byte on disk lessfunc is the comparator used for SortType config ca be nil to use the defaults, or only set the non-default values desired if errors or interupted, may leave temp files behind in config.TempFilesDir the returned chanels contain the data returned from calling Sort()

func NewMock

func NewMock(i chan SortType, fromBytes FromBytes, lessFunc CompareLessFunc, config *Config, n int) (*SortTypeSorter, chan SortType, chan error)

NewMock is the same as New() but is backed by memory instead of a temporary file on disk n is the size to initialize the backing bytes buffer too

func (*SortTypeSorter) Sort

func (s *SortTypeSorter) Sort(ctx context.Context)

Sort sorts the Sorter's input chan and returns a new sorted chan, and error Chan Sort is a chunking operation that runs multiple workers asynchronously this blocks while sorting chunks and unblocks when merging NOTE: the context passed to Sort must outlive Sort() returning. Merge uses the same context and runs in a goroutine after Sort returns(). for example, if calling sort in an errGroup, you must pass the group's parent context into sort.

type Sorter

type Sorter interface {
	Sort(context.Context)
}

Sorter is the interface that all extsort sorters must satisfy

type StringSorter

type StringSorter struct {
	// contains filtered or unexported fields
}

StringSorter stores an input chan and feeds Sort to return a sorted chan

func Strings

func Strings(i chan string, config *Config) (*StringSorter, chan string, chan error)

Strings returns a new Sorter instance that can be used to sort the input chan

func StringsMock

func StringsMock(i chan string, config *Config, n int) (*StringSorter, chan string, chan error)

StringsMock is the same as Strings() but is backed by memory instead of a temporary file on disk n is the size to initialize the backing bytes buffer too

func (*StringSorter) Sort

func (s *StringSorter) Sort(ctx context.Context)

Sort sorts the Sorter's input chan and returns a new sorted chan, and error Chan Sort is a chunking operation that runs multiple workers asynchronously this blocks while sorting chunks and unblocks when merging NOTE: the context passed to Sort must outlive Sort() returning. merge used the same context and runs in a goroutine after Sort returns() for example, if calling sort in an errGroup, you must pass the group's parent context into sort.

Directories

Path Synopsis
Package diff performs diffs on sorted channels of data
Package diff performs diffs on sorted channels of data
Package queue provided a generic priority queue implemtation based on the internal heap
Package queue provided a generic priority queue implemtation based on the internal heap
Package tempfile implements a virtual temp files that can be written to (in series) and then read back (series/parallel) and then removed from the filesystem when done if multiple "tempfiles" are needed on the application layer, they are mapped to sections of the same real file on the filesystem
Package tempfile implements a virtual temp files that can be written to (in series) and then read back (series/parallel) and then removed from the filesystem when done if multiple "tempfiles" are needed on the application layer, they are mapped to sections of the same real file on the filesystem

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL