util

package
v0.0.0-...-404dc1e Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Aug 10, 2017 License: Apache-2.0 Imports: 23 Imported by: 0

Documentation

Index

Constants

View Source
const (
	BUFFER_SIZE = 1024 * 512
)
View Source
const (
	MessageControlEOF = MessageControl(math.MinInt32)
)

Variables

View Source
var (
	Transport    *http.Transport
	SchemePrefix = "http://"
)

Functions

func BufWrites

func BufWrites(rawWriters []io.Writer, function func([]io.Writer))

BufWrites ensures all writers are bufio.Writer. Any bufio.Writer created here is flushed before returning.

func ChannelToLineWriter

func ChannelToLineWriter(wg *sync.WaitGroup, stat *pb.InstructionStat, name string, reader io.Reader, writer io.WriteCloser, errorOutput io.Writer)

func ChannelToWriter

func ChannelToWriter(wg *sync.WaitGroup, name string, reader io.Reader, writer io.WriteCloser, errorOutput io.Writer) error

func CleanPath

func CleanPath(path string) string

func Compare

func Compare(a interface{}, b interface{}) (ret int)

func CopyMultipleReaders

func CopyMultipleReaders(readers []io.Reader, writer io.Writer) (inCounter int64, outCounter int64, e error)

CopyMultipleReaders sets up asynchronous merging of multiple channels into one channel.

func DecodeRow

func DecodeRow(encodedBytes []byte) (ts int64, objects []interface{}, err error)

DecodeRow decodes one row of data from a blob

func DecodeRowKeys

func DecodeRowKeys(encodedBytes []byte, indexes []int) (ts int64, keys []interface{}, err error)

DecodeRowKeys decodes the key fields selected by indexes[]; indexes start from 1.

func DecodeRowKeysValues

func DecodeRowKeysValues(encodedBytes []byte, indexes []int) (ts int64, keys, values []interface{}, err error)

DecodeRowKeysValues decodes a row of data, treating the fields specified by indexes[] as key fields and the remaining fields as value fields.

func DecodeRowTo

func DecodeRowTo(encodedBytes []byte, objects ...interface{}) error

func DownloadUrl

func DownloadUrl(fileUrl string) (filename string, content []byte, e error)

func EncodeKeys

func EncodeKeys(anyObject ...interface{}) ([]byte, error)

EncodeKeys encodes keys to a blob.

func EncodeRow

func EncodeRow(ts int64, anyObject ...interface{}) ([]byte, error)

EncodeRow encodes one row of data to a blob.

func Error

func Error(w http.ResponseWriter, r *http.Request, httpStatus int, obj string) (err error)

func Execute

func Execute(ctx context.Context, executeWaitGroup *sync.WaitGroup, stat *pb.InstructionStat,
	name string, command *exec.Cmd,
	reader io.Reader, writer io.Writer, prevIsPipe, isPipe bool, closeOutput bool,
	errWriter io.Writer) error

All data passing through a pipe are (size, msgpack_encoded) tuples. The input and output should both be in this msgpack format. Only the stdin and stdout of Pipe() are line-based text.

func ExecuteWithCleanup

func ExecuteWithCleanup(parentContext context.Context, onExecute func() error, onCleanup func()) error

func Fprintf

func Fprintf(reader io.Reader, writer io.Writer, format string) error

Fprintf reads MessagePack encoded messages from reader, and formats according to a format specifier and writes to writer.

func Get

func Get(url string) ([]byte, error)

func Hash

func Hash(bytes []byte) uint32

func HashByKeys

func HashByKeys(data []interface{}) int

func Json

func Json(w http.ResponseWriter, r *http.Request, httpStatus int, obj interface{}) (err error)

func LessThan

func LessThan(a interface{}, b interface{}) bool

func LineReaderToChannel

func LineReaderToChannel(wg *sync.WaitGroup, stat *pb.InstructionStat, name string, reader io.Reader, ch io.WriteCloser, closeOutput bool, errorOutput io.Writer)

func LinkChannel

func LinkChannel(wg *sync.WaitGroup, inChan, outChan chan []byte)

func ListFiles

func ListFiles(dir string, pattern string) (fileNames []string)

func Now

func Now() (ts int64)

func PartitionByKeys

func PartitionByKeys(shardCount int, data []interface{}) int

func Post

func Post(url string, values url.Values) ([]byte, error)

func PrintDelimited

func PrintDelimited(stat *pb.InstructionStat, reader io.Reader, writer io.Writer, delimiter string, lineSperator string) error

PrintDelimited reads and formats MessagePack-encoded messages with the given delimiter and line separator.

func ProcessMessage

func ProcessMessage(reader io.Reader, f func([]byte) error) (err error)

ProcessMessage reads and processes MessagePack-encoded messages until EOF.

func Range

func Range(from, to int) func(io.Writer) error

func ReadMessage

func ReadMessage(reader io.Reader) (m []byte, err error)

ReadMessage reads out the []byte for one message

func ReadRow

func ReadRow(reader io.Reader) (ts int64, row []interface{}, err error)

ReadRow reads and decodes one row of data.

func ReaderToChannel

func ReaderToChannel(wg *sync.WaitGroup, name string, reader io.ReadCloser, writer io.WriteCloser, closeOutput bool, errorOutput io.Writer) error

func Retry

func Retry(fn func() error) error

func TakeMessage

func TakeMessage(reader io.Reader, count int, f func([]byte) error) (err error)

TakeMessage reads and processes MessagePack-encoded messages. If count is less than 0, all messages are processed.

func TakeTsv

func TakeTsv(reader io.Reader, count int, f func([]string) error) (err error)

TakeTsv reads and processes TSV lines. If count is less than 0, all lines are processed.

func TimeDelayedRetry

func TimeDelayedRetry(fn func() error, waitTimes ...time.Duration) error

func TsvPrintf

func TsvPrintf(reader io.Reader, writer io.Writer, format string) error

TsvPrintf reads TSV lines from reader, and formats according to a format specifier and writes to writer.

func UserHomeDir

func UserHomeDir() string

func WriteEOFMessage

func WriteEOFMessage(writer io.Writer) (err error)

func WriteMessage

func WriteMessage(writer io.Writer, m []byte) (err error)

func WriteRow

func WriteRow(writer io.Writer, ts int64, anyObject ...interface{}) error

WriteRow encodes and writes a row of data.

Types

type BufferedMessageWriter

type BufferedMessageWriter struct {
	// contains filtered or unexported fields
}

func NewBufferedMessageWriter

func NewBufferedMessageWriter(w io.Writer, size int) *BufferedMessageWriter

func (*BufferedMessageWriter) Available

func (b *BufferedMessageWriter) Available() int

func (*BufferedMessageWriter) Buffered

func (b *BufferedMessageWriter) Buffered() int

func (*BufferedMessageWriter) Flush

func (b *BufferedMessageWriter) Flush() error

func (*BufferedMessageWriter) WriteMessage

func (b *BufferedMessageWriter) WriteMessage(m []byte) (err error)

type Item

type Item struct {
	// contains filtered or unexported fields
}

An Item is something we manage in a priority queue.

type MessageControl

type MessageControl int32

type Piper

type Piper struct {
	Reader  *io.PipeReader
	Writer  *io.PipeWriter
	Counter int64
	Error   error
}

func NewPiper

func NewPiper() *Piper

type PriorityQueue

type PriorityQueue struct {
	// contains filtered or unexported fields
}

A PriorityQueue implements heap.Interface and holds Items.

func NewPriorityQueue

func NewPriorityQueue(lessFunc func(a, b interface{}) bool) *PriorityQueue

func (*PriorityQueue) Dequeue

func (pq *PriorityQueue) Dequeue() (interface{}, int)

func (*PriorityQueue) Enqueue

func (pq *PriorityQueue) Enqueue(x interface{}, sourceId int)

func (*PriorityQueue) Len

func (pq *PriorityQueue) Len() int

func (*PriorityQueue) Less

func (pq *PriorityQueue) Less(i, j int) bool

func (*PriorityQueue) Pop

func (pq *PriorityQueue) Pop() interface{}

func (*PriorityQueue) Push

func (pq *PriorityQueue) Push(x interface{})

func (*PriorityQueue) Swap

func (pq *PriorityQueue) Swap(i, j int)

func (*PriorityQueue) Top

func (pq *PriorityQueue) Top() interface{}

Directories

Path Synopsis

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL