bigslice: github.com/grailbio/bigslice/exec

package exec

import "github.com/grailbio/bigslice/exec"

Package exec implements compilation, evaluation, and execution of Bigslice slice operations.

Package Files

bigmachine.go buffer.go combiner.go compile.go config.go eval.go graph.go index.go local.go session.go slicemachine.go slicestatus.go store.go task.go topn.go trace.go

Constants

const BigmachineStatusGroup = "bigmachine"
const DefaultMaxLoad = 0.95

DefaultMaxLoad is the default machine max load.

Variables

var DoShuffleReaders = true

DoShuffleReaders determines whether reader tasks should be shuffled in order to avoid potential thundering herd issues. It should be set to false only in testing, when deterministic ordering matters.

TODO(marius): make this a session option instead.

var ErrTaskLost = errors.New("task was lost")

ErrTaskLost indicates that a Task was in TaskLost state.

var ProbationTimeout = 30 * time.Second

ProbationTimeout is the amount of time that a machine will remain in probation without being explicitly marked healthy.

func Eval Uses

func Eval(ctx context.Context, executor Executor, inv bigslice.Invocation, roots []*Task, group *status.Group) error

Eval simultaneously evaluates a set of task graphs from the provided set of roots. Eval uses the provided executor to dispatch tasks when their dependencies have been satisfied. Eval returns on evaluation error or else when all roots are fully evaluated.

TODO(marius): consider including the invocation in the task definitions themselves. This way, a task's name is entirely self-contained and can be interpreted without an accompanying invocation.

TODO(marius): we can often stream across shuffle boundaries. This would complicate scheduling, but may be worth doing.
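
Most programs do not call Eval directly: Session.Run compiles a Func invocation and drives Eval internally. The following sketch only shows the shape of a direct call, assuming executor, inv, and roots came from that compilation step and that stats is a *status.Status (the Group accessor used here is an assumption):

group := stats.Group("eval")
if err := exec.Eval(ctx, executor, inv, roots, group); err != nil {
	log.Fatal(err)
}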

type Executor Uses

type Executor interface {
    // Start starts the executor. It is called before evaluation has started
    // and after all funcs have been registered. Start need not return:
    // for example, the Bigmachine implementation of Executor uses
    // Start as an entry point for worker processes.
    Start(*Session) (shutdown func())

    // Run runs a task. The executor sets the state of the task as it
    // progresses. The task should be in state TaskWaiting when Run is
    // called; by the time Run returns, the task's state is >= TaskOk.
    Run(*Task)

    // Reader returns a locally accessible ReadCloser for the requested task.
    Reader(*Task, int) sliceio.ReadCloser

    // HandleDebug adds executor-specific debug handlers to the provided
    // http.ServeMux. This is used to serve diagnostic information relating
    // to the executor.
    HandleDebug(handler *http.ServeMux)
}

Executor defines an interface used to provide implementations of task runners. An Executor is responsible for running single tasks, partitioning their outputs, and instantiating readers to retrieve the output of any given task.
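
For illustration only, a skeletal implementation of the interface might look like the following. Sessions select one of the built-in executors via the Local and Bigmachine options, so this is a sketch of the interface's shape rather than a pluggable implementation; myExecutor and its behavior are invented:

type myExecutor struct{}

func (e *myExecutor) Start(sess *exec.Session) (shutdown func()) {
	// Allocate whatever resources the executor needs; the returned
	// function releases them when the session shuts down.
	return func() {}
}

func (e *myExecutor) Run(task *exec.Task) {
	task.Set(exec.TaskRunning)
	// Run task.Do with readers for the task's dependencies, partition
	// and store its output, then mark it done (or failed via task.Error).
	task.Set(exec.TaskOk)
}

func (e *myExecutor) Reader(task *exec.Task, partition int) sliceio.ReadCloser {
	// Return a reader over the stored output of (task, partition).
	return nil
}

func (e *myExecutor) HandleDebug(handler *http.ServeMux) {}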

type Option Uses

type Option func(s *Session)

An Option represents a session configuration parameter value.

var Local Option = func(s *Session) {
    s.executor = newLocalExecutor()
}

Local configures a session with the local in-binary executor.

var MachineCombiners Option = func(s *Session) {
    s.machineCombiners = true
}

MachineCombiners is a session option that turns on machine-local combine buffers. If turned on, each combiner task that belongs to the same shard-set and runs on the same machine combines values into a single, machine-local combine buffer. This can be a big performance optimization for tasks that have low key cardinality, or a key-set with very hot keys. However, due to the way it is implemented, error recovery is currently not implemented for such tasks.
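
A minimal sketch of enabling machine combiners for a session (shown here with the local executor; any executor option works):

sess := exec.Start(exec.Local, exec.MachineCombiners)
defer sess.Shutdown()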

func Bigmachine Uses

func Bigmachine(system bigmachine.System, params ...bigmachine.Param) Option

Bigmachine configures a session using the bigmachine executor configured with the provided system. If any params are provided, they are applied to each bigmachine allocated by Bigslice.
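
A hedged sketch of selecting the bigmachine executor. The ec2system package (github.com/grailbio/bigmachine/ec2system) and its zero-value configuration are assumptions; substitute whichever bigmachine.System you use:

sess := exec.Start(exec.Bigmachine(&ec2system.System{}))
defer sess.Shutdown()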

func MaxLoad Uses

func MaxLoad(maxLoad float64) Option

MaxLoad configures the session with the provided max machine load.

func Parallelism Uses

func Parallelism(p int) Option

Parallelism configures the session with the provided target parallelism.

func Status Uses

func Status(status *status.Status) Option

Status configures the session with a status object to which run statuses are reported.
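
A sketch combining several of the options above; status.Status is assumed to be github.com/grailbio/base/status, which the caller can render or serve as desired:

stats := new(status.Status)
sess := exec.Start(
	exec.Parallelism(1024),
	exec.MaxLoad(0.9),
	exec.Status(stats),
)
defer sess.Shutdown()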

type Result Uses

type Result struct {
    bigslice.Slice
    // contains filtered or unexported fields
}

A Result is the output of a Slice evaluation. It is the only type implementing bigslice.Slice that is a legal argument to a bigslice.Func.

func (*Result) Scanner Uses

func (r *Result) Scanner() *sliceio.Scanner

Scanner returns a scanner that scans the output. If the output contains multiple shards, they are scanned sequentially. You must call Close on the returned scanner when you are done scanning. You may get and scan multiple scanners concurrently from r.
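
For example, scanning a result whose rows are assumed to be (string, int) pairs (the Err accessor for reporting scan errors follows the usual scanner convention in sliceio):

res, err := sess.Run(ctx, Computation, args...)
if err != nil {
	log.Fatal(err)
}
scanner := res.Scanner()
defer scanner.Close()
var (
	key   string
	count int
)
for scanner.Scan(ctx, &key, &count) {
	fmt.Println(key, count)
}
if err := scanner.Err(); err != nil {
	log.Fatal(err)
}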

type Session Uses

type Session struct {
    context.Context
    // contains filtered or unexported fields
}

Session represents a Bigslice compute session. A session shares a binary and executor, and is valid for the run of the binary. A session can run multiple bigslice functions, allowing for iterative computing.

A session is started by the Start method. Some executors may launch multiple copies of the binary: these additional binaries are called workers, and in them Start does not return.

All functions must be created before Start is called, and must be created in a deterministic order. This ordering is provided by default when functions are created as part of package initialization. Registering top-level functions this way is both safe and encouraged:

var Computation = bigslice.Func(func(..) (slice bigslice.Slice) {
	// Build up the computation, parameterized by the Func's arguments.
	slice = ...
	slice = ...
	return slice
})

// Possibly in another package:
func main() {
	sess := exec.Start()
	defer sess.Shutdown()
	// Run also returns a *Result; it is ignored here.
	if _, err := sess.Run(ctx, Computation, args...); err != nil {
		log.Fatal(err)
	}
	// Success!
}

func Start Uses

func Start(options ...Option) *Session

Start creates and starts a new bigslice session, configuring it according to the provided options. Only one session may be created in a single binary invocation. The returned session remains valid for the lifetime of the binary. If no executor is configured, the session is configured to use the bigmachine executor.

func (*Session) HandleDebug Uses

func (s *Session) HandleDebug(handler *http.ServeMux)
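
HandleDebug registers the session executor's debug handlers (see Executor.HandleDebug) on the provided mux. A sketch of serving them over HTTP (the address is arbitrary):

mux := http.NewServeMux()
sess.HandleDebug(mux)
go func() {
	log.Print(http.ListenAndServe(":3333", mux))
}()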

func (*Session) MaxLoad Uses

func (s *Session) MaxLoad() float64

MaxLoad returns the maximum load on each allocated machine.

func (*Session) Must Uses

func (s *Session) Must(ctx context.Context, funcv *bigslice.FuncValue, args ...interface{}) *Result

Must is a version of Run that panics if the computation fails.

func (*Session) Parallelism Uses

func (s *Session) Parallelism() int

Parallelism returns the desired amount of evaluation parallelism.

func (*Session) Run Uses

func (s *Session) Run(ctx context.Context, funcv *bigslice.FuncValue, args ...interface{}) (*Result, error)

Run evaluates the slice returned by the bigslice func funcv applied to the provided arguments. Tasks are run by the session's executor. Run returns when the computation has completed, or else on error. It is safe to make concurrent calls to Run; the underlying computation will be performed in parallel.
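
For example, two independent computations can be evaluated concurrently (Computation and OtherComputation are assumed bigslice.Funcs; errgroup is golang.org/x/sync/errgroup):

g, gctx := errgroup.WithContext(ctx)
g.Go(func() error {
	_, err := sess.Run(gctx, Computation, args...)
	return err
})
g.Go(func() error {
	_, err := sess.Run(gctx, OtherComputation, args...)
	return err
})
if err := g.Wait(); err != nil {
	log.Fatal(err)
}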

func (*Session) Shutdown Uses

func (s *Session) Shutdown()

Shutdown tears down resources associated with this session. It should be called when the session is discarded.

func (*Session) Status Uses

func (s *Session) Status() *status.Status

Status returns the session's status aggregator.

type Store Uses

type Store interface {
    // Create returns a writer that populates data for the given
    // task name and partition. The data is not available
    // to Open until the returned closer has been closed.
    //
    // TODO(marius): should we allow writes to be discarded as well?
    Create(ctx context.Context, task TaskName, partition int) (writeCommitter, error)

    // Open returns a ReadCloser from which the stored contents of the named task
    // and partition can be read. If the task and partition are not stored, an
    // error with kind errors.NotExist is returned. The offset specifies the byte
    // position from which to read.
    Open(ctx context.Context, task TaskName, partition int, offset int64) (io.ReadCloser, error)

    // Stat returns metadata for the stored slice.
    Stat(ctx context.Context, task TaskName, partition int) (sliceInfo, error)
}

Store is an abstraction that stores partitioned data as produced by a task.

type Task Uses

type Task struct {
    slicetype.Type
    // Invocation is the task's invocation, i.e. the Func invocation
    // from which this task was compiled.
    Invocation bigslice.Invocation
    // Name is the name of the task. Tasks are named uniquely inside each
    // Bigslice session.
    Name TaskName
    // Do starts computation for this task, returning a reader that
    // computes batches of values on demand. Do is invoked with readers
    // for the task's dependencies.
    Do  func([]sliceio.Reader) sliceio.Reader
    // Deps are the task's dependencies. See TaskDep for details.
    Deps []TaskDep
    // NumPartition is the number of partitions that are output by this task.
    // If NumPartition > 1, then the task must also define a partitioner.
    NumPartition int

    // Combiner specifies an (optional) combiner to use for this task's output.
    // If a Combiner is not nil, CombineKey names the combine buffer used:
    // each combine buffer contains combiner outputs from multiple tasks.
    // If CombineKey is not set, then per-task buffers are used instead.
    Combiner   slicefunc.Func
    CombineKey string

    // Pragma comprises the pragmas of all slice operations that
    // are pipelined into this task.
    bigslice.Pragma

    // Slices is the set of slices to which this task directly contributes.
    Slices []bigslice.Slice

    // Group stores an ordered list of peer tasks. If Group is nonempty,
    // it is guaranteed that these sets of tasks constitute a shuffle
    // dependency, and share a set of shuffle dependencies. This allows
    // the evaluator to perform optimizations while tracking such
    // dependencies.
    Group []*Task

    sync.Mutex

    // Status is a status object to which task status is reported.
    Status *status.Task
    // contains filtered or unexported fields
}

A Task represents a concrete computational task. Tasks form graphs through dependencies; task graphs are compiled from slices.

Tasks also maintain executor state, and are used to coordinate execution between concurrent evaluators and a single executor (which may be evaluating many tasks concurrently). Tasks thus embed a mutex for coordination and provide a context-aware condition variable to coordinate runtime state changes.
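
From outside the package, the usual way to use this coordination machinery is WaitState (documented below), which waits under the task's lock on the caller's behalf. A minimal sketch:

state, err := task.WaitState(ctx, exec.TaskOk)
if err != nil {
	return err // the context was canceled before the task completed
}
if state != exec.TaskOk {
	return task.Err() // the task failed or was lost
}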

func (*Task) All Uses

func (t *Task) All() []*Task

All returns all tasks reachable from t. The returned set of tasks is unique.

func (*Task) Broadcast Uses

func (t *Task) Broadcast()

Broadcast notifies waiters of a state change. Broadcast must only be called while the task's lock is held.

func (*Task) Err Uses

func (t *Task) Err() error

Err returns an error if the task's state is >= TaskErr. When the state is > TaskErr, Err returns an error describing the task's failed state; otherwise the task's recorded error (t.err) is returned.

func (*Task) Error Uses

func (t *Task) Error(err error)

Error sets the task's state to TaskErr and its error to the provided error. Waiters are notified.

func (*Task) Errorf Uses

func (t *Task) Errorf(format string, v ...interface{})

Errorf formats an error message using fmt.Errorf, sets the task's state to TaskErr, and sets the task's error to the formatted message.

func (*Task) GraphString Uses

func (t *Task) GraphString() string

GraphString returns a schematic string of the task graph rooted at t.

func (*Task) Head Uses

func (t *Task) Head() *Task

Head returns the head task of this task's phase. If the task does not belong to a phase, Head returns the task t.

func (*Task) Phase Uses

func (t *Task) Phase() []*Task

Phase returns the phase to which this task belongs.

func (*Task) Set Uses

func (t *Task) Set(state TaskState)

Set sets the task's state to the provided state and notifies any waiters.

func (*Task) State Uses

func (t *Task) State() TaskState

State returns the task's current state.

func (*Task) String Uses

func (t *Task) String() string

String returns a short, human-readable string describing the task's state.

func (*Task) Subscribe Uses

func (t *Task) Subscribe(s *TaskSubscriber)

Subscribe subscribes s to be notified of any changes to t's state. If s is already subscribed, Subscribe is a no-op.

func (*Task) Unsubscribe Uses

func (t *Task) Unsubscribe(s *TaskSubscriber)

Unsubscribe unsubscribes the previously subscribed s; s will no longer receive task state change notifications. Unsubscribe is a no-op if s was never subscribed.

func (*Task) Wait Uses

func (t *Task) Wait(ctx context.Context) error

Wait returns after the next call to Broadcast, or when the context is done. The task's lock must be held when calling Wait.

func (*Task) WaitState Uses

func (t *Task) WaitState(ctx context.Context, state TaskState) (TaskState, error)

WaitState returns when the task's state is at least the provided state, or else when the context is done.

func (*Task) WriteGraph Uses

func (t *Task) WriteGraph(w io.Writer)

WriteGraph writes a schematic string of the task graph rooted at t into w.

type TaskDep Uses

type TaskDep struct {
    // Head holds the underlying task that represents this dependency.
    // For shuffle dependencies, that task is the head task of the
    // phase, and the evaluator must expand the phase.
    Head      *Task
    Partition int

    // Expand indicates that the task's dependencies for a given
    // partition should not be merged, but rather passed individually to
    // the task implementation.
    Expand bool

    // CombineKey is an optional label that names the combination key to
    // be used by this dependency. It names a single combiner buffer
    // from which the combined output of multiple tasks is read.
    //
    // CombineKeys must be provided to tasks that contain combiners.
    CombineKey string
}

A TaskDep describes a single dependency for a task. A dependency comprises one or more tasks and the partition number of the task set that must be read at run time.

func (TaskDep) NumTask Uses

func (d TaskDep) NumTask() int

NumTask returns the number of tasks that make up this dependency.

func (TaskDep) Task Uses

func (d TaskDep) Task(i int) *Task

Task returns the i'th task of this dependency.
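
Together, NumTask and Task let an evaluator or executor enumerate the concrete tasks behind each dependency, for example:

for _, dep := range task.Deps {
	for i := 0; i < dep.NumTask(); i++ {
		t := dep.Task(i)
		// Read partition dep.Partition of t's output, e.g. via the
		// executor's Reader(t, dep.Partition).
		_ = t
	}
}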

type TaskName Uses

type TaskName struct {
    // Op is a unique string describing the operation that is provided
    // by the task.
    Op  string
    // Shard and NumShard describe the shard processed by this task
    // and the total number of shards to be processed.
    Shard, NumShard int
}

A TaskName uniquely names a task by its constituent components. Tasks with 0 shards are taken to be combiner tasks: they are machine-local buffers of combiner outputs for some (non-overlapping) subset of shards for a task.

func (TaskName) IsCombiner Uses

func (n TaskName) IsCombiner() bool

IsCombiner returns whether the named task is a combiner task.

func (TaskName) String Uses

func (n TaskName) String() string

String returns a canonical representation of the task name, formatted as one of the following (the second form is used for combiner tasks):

{n.Op}@{n.NumShard}:{n.Shard}
{n.Op}_combiner
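
For example (the Op value is invented):

n := exec.TaskName{Op: "adder_map", Shard: 3, NumShard: 16}
fmt.Println(n.String())     // "adder_map@16:3", per the first form above
fmt.Println(n.IsCombiner()) // false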

type TaskState Uses

type TaskState int

TaskState represents the runtime state of a Task. TaskState values are defined so that their magnitudes correspond with task progression.

const (
    // TaskInit is the initial state of a task. Tasks in state TaskInit
    // have usually not yet been seen by an executor.
    TaskInit TaskState = iota

    // TaskWaiting indicates that a task has been scheduled for
    // execution (it is runnable) but has not yet been allocated
    // resources by the executor.
    TaskWaiting
    // TaskRunning is the state of a task that's currently being run.
    // After a task is in state TaskRunning, it can only enter a
    // larger-valued state.
    TaskRunning

    // TaskOk indicates that a task has successfully completed;
    // the task's results are available to dependent tasks.
    //
    // All TaskState values greater than TaskOk indicate task
    // errors.
    TaskOk

    // TaskErr indicates that the task experienced a failure while
    // running.
    TaskErr
    // TaskLost indicates that the task was lost, usually because
    // the machine to which the task was assigned failed.
    TaskLost
)
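
Because the values are ordered by task progression, callers can test for completion or failure with simple comparisons, for example:

switch state := task.State(); {
case state == exec.TaskOk:
	// The task completed; its results are available.
case state >= exec.TaskErr:
	// The task failed or was lost; task.Err() describes why.
default:
	// The task is still initializing, waiting, or running.
}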

func (TaskState) String Uses

func (s TaskState) String() string

String returns the task's state as an upper-case string.

type TaskSubscriber Uses

type TaskSubscriber struct {
    sync.Mutex
    // contains filtered or unexported fields
}

TaskSubscriber is subscribed to a Task using Subscribe. It is then notified whenever the Task state changes. This is useful for efficiently observing the state changes of many tasks.

func NewTaskSubscriber Uses

func NewTaskSubscriber() *TaskSubscriber

NewTaskSubscriber returns a new TaskSubscriber. It needs to be subscribed to a Task with Subscribe for it to be notified of task state changes.

func (*TaskSubscriber) Notify Uses

func (s *TaskSubscriber) Notify(task *Task)

Notify notifies s of a task whose state has changed.

func (*TaskSubscriber) Ready Uses

func (s *TaskSubscriber) Ready() <-chan struct{}

Ready returns a channel that is closed if a subsequent call to Tasks will return a non-nil slice.

func (*TaskSubscriber) Tasks Uses

func (s *TaskSubscriber) Tasks() []*Task

Tasks returns the tasks whose state has changed since the last call to Tasks.
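
A sketch of observing state changes across many tasks with a single subscriber (roots and the logging are illustrative):

sub := exec.NewTaskSubscriber()
for _, t := range roots {
	t.Subscribe(sub)
	defer t.Unsubscribe(sub)
}
for {
	select {
	case <-ctx.Done():
		return ctx.Err()
	case <-sub.Ready():
		for _, t := range sub.Tasks() {
			log.Printf("%s: %s", t.Name, t.State())
		}
	}
}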

Package exec imports 50 packages and is imported by 4 packages. Updated 2019-11-19.