octave

package module
v0.0.0-...-5405f13 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Dec 9, 2019 License: Apache-2.0 Imports: 16 Imported by: 0

README

Octave

Build Status GoDoc Go Report Card License

Octave is a Go library for file/bucket based event stream processing.

Documentation

Check out the full API on godoc.org.

Example

First, write/append your data to a log. You can index your logs:

import (
	"context"
	"io"
	"path"
	"strings"

	"github.com/bsm/accord"
	"github.com/bsm/octave"
)

func main() {
	ctx := context.Background()

	// mockType stands in for a real record schema.
	type mockType struct {
		Name    string
		Phone   string
		Country string
	}

	// Establish a connection to the accord coordination service.
	accordClient, err := accord.DialClient(ctx, "10.0.0.1:8432", &accord.ClientOptions{Namespace: "/custom/namespace"})
	if err != nil {
		panic(err)
	}

	// Set up a processing pipeline between the source and target buckets,
	// restricting it to *.ndjson files.
	pipeline, err := octave.Create(ctx, "s3://source", "s3://target/to/dir", accordClient, &octave.Options{
		Glob: "**/*.ndjson",
		ProcessFile: func(fileName string) (bool, error) {
			return strings.Contains(fileName, ".ndjson"), nil
		},
	})
	if err != nil {
		panic(err)
	}
	defer pipeline.Close()

	// Consume records until the stream is exhausted (blocking call).
	err = pipeline.Run(func(emitter octave.Emitter, sink octave.Sink) error {
		for {
			// Decode the next record; io.EOF signals the end of the stream.
			record := new(mockType)
			if decErr := emitter.Decode(record); decErr == io.EOF {
				return nil
			} else if decErr != nil {
				return decErr
			}

			// Derive the base name of the source file, dropping any extension.
			base := path.Base(emitter.Name())
			if dot := strings.IndexByte(base, '.'); dot > -1 {
				base = base[:dot]
			}

			// Route the record into a per-country output file.
			if encErr := sink.Encode(base+"-"+record.Country+".ndjson", record); encErr != nil {
				return encErr
			}
		}
	})
	if err != nil {
		panic(err)
	}
}

Documentation

Overview

Example
package main

import (
	"context"
	"io"
	"path"
	"strings"

	"github.com/bsm/accord"
	"github.com/bsm/octave"
)

func main() {
	ctx := context.Background()

	// assume a mock type (stands in for a real record schema)
	type mockType struct {
		Name    string
		Phone   string
		Country string
	}

	// connect to accord (coordination service; address/namespace are
	// illustrative — replace with your own deployment)
	acc, err := accord.DialClient(ctx, "10.0.0.1:8432", &accord.ClientOptions{Namespace: "/custom/namespace"})
	if err != nil {
		panic(err)
	}

	// initialize a pipeline from the source bucket to the target bucket,
	// restricted to *.ndjson files by both Glob and the ProcessFile callback
	pipe, err := octave.Create(ctx, "s3://source", "s3://target/to/dir", acc, &octave.Options{
		Glob: "**/*.ndjson",
		ProcessFile: func(name string) (bool, error) {
			return strings.Contains(name, ".ndjson"), nil
		},
	})
	if err != nil {
		panic(err)
	}
	defer pipe.Close()

	// run the pipeline (blocking until an error occurs or Close is called)
	err = pipe.Run(func(emt octave.Emitter, snk octave.Sink) error {
		for {
			// decode the record; io.EOF marks the end of the stream
			rec := new(mockType)
			if err := emt.Decode(rec); err == io.EOF {
				break
			} else if err != nil {
				return err
			}

			// get the source file name (without extension)
			name := path.Base(emt.Name())
			if pos := strings.IndexByte(name, '.'); pos > -1 {
				name = name[:pos]
			}

			// write to output, partitioning records by Country
			if err := snk.Encode(name+"-"+rec.Country+".ndjson", rec); err != nil {
				return err
			}
		}
		return nil
	})
	if err != nil {
		panic(err)
	}
}
Output:

Index

Examples

Constants

This section is empty.

Variables

This section is empty.

Functions

This section is empty.

Types

type ChannelFunc

// ChannelFunc connects emitters to sinks.
type ChannelFunc func(Emitter, Sink) error

ChannelFunc connects emitters to sinks.

type Decoder

// Decoder reads records sequentially from an input stream and
// must be closed when no longer needed.
type Decoder interface {
	// Decode decodes the next message into an interface.
	// It returns io.EOF once the end of the stream is reached.
	Decode(v interface{}) error

	io.Closer
}

Decoder methods

type Emitter

// Emitter is a minimal decoder: it exposes the name of the file
// being read and decodes its records one at a time.
type Emitter interface {
	// Name returns the name of the file the data is emitted from.
	Name() string

	// Decode decodes the next message into an interface.
	// It returns io.EOF once the end of the stream is reached.
	Decode(interface{}) error
}

Emitter is a minimal decoder.

type Encoder

// Encoder writes records sequentially to an output stream and
// must be closed when no longer needed.
type Encoder interface {
	// Encode encodes the value.
	Encode(v interface{}) error

	io.Closer
}

Encoder methods

type Options

// Options contains a list of options for configuring a Pipeline.
// The zero value of every field selects the stated default.
type Options struct {
	// Number of concurrent worker threads.
	// Default: number of CPUs
	Concurrency int

	// A custom temporary directory.
	// Default: os.TempDir()
	TempDir string

	// File glob pattern.
	// Default: "**"
	Glob string

	// NewDecoder wraps the reader and returns a decoder for the given file name.
	// Default: json.NewDecoder(reader)
	NewDecoder func(name string, reader io.Reader) (Decoder, error)

	// NewEncoder wraps the writer and returns an encoder for the given file name.
	// Default: json.NewEncoder(writer)
	NewEncoder func(name string, writer io.Writer) (Encoder, error)

	// NewCompressionReader wraps the reader and returns an io.ReadCloser.
	// It may return nil to disable decompression and read the plain input.
	// Default: gzip.NewReader(reader) (if name's extension is .gz)
	NewCompressionReader func(name string, reader io.Reader) (io.ReadCloser, error)

	// NewCompressionWriter wraps the writer and returns an io.WriteCloser.
	// It may return nil to disable compression and write the plain output.
	// Default: gzip.NewWriter(writer) (if name's extension is .gz)
	NewCompressionWriter func(name string, writer io.Writer) (io.WriteCloser, error)

	// Pause between cycles. This is to prevent the Pipeline
	// from spinning and wasting resources on empty or processed
	// buckets.
	// Default: 5s.
	Pause time.Duration

	// BeforeCycle is a callback which is triggered before each cycle.
	BeforeCycle func() error

	// ProcessFile callback is triggered before processing to determine
	// if a file should be processed or skipped. Must return true to proceed.
	ProcessFile func(name string) (bool, error)
}

Options contains a list of options.

type Pipeline

// Pipeline processes data by running parallel worker threads.
type Pipeline struct {
	// contains filtered or unexported fields
}

Pipeline processes data by running parallel worker threads.

func Create

func Create(ctx context.Context, srcURL, dstURL string, acc *accord.Client, opt *Options) (*Pipeline, error)

Create creates a new Pipeline from URLs.

func New

func New(ctx context.Context, src, dst bfs.Bucket, acc *accord.Client, opt *Options) *Pipeline

New inits a new Pipeline.

func (*Pipeline) Close

func (p *Pipeline) Close() error

Close stops the Pipeline and releases all resources.

func (*Pipeline) Run

func (p *Pipeline) Run(fn ChannelFunc) error

Run starts the Pipeline and blocks until an error occurs or it is manually stopped by calling Close().

type Sink

// Sink is a multi-file encoder: each Encode call routes the value
// to the output file identified by name.
type Sink interface {
	// Encode encodes the value to the given file name.
	Encode(name string, v interface{}) error
}

Sink is a multi-file encoder.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL