falco

package module
v0.2.2 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Feb 25, 2021 License: MIT Imports: 9 Imported by: 0

README

Falco - FaaS-based large-scale Computing

About

Library to implement custom FaaS-based massively parallel analytics pipelines. Falco provides interfaces to implement a custom Driver for your analytics pipeline, allowing you to manage execution and coordination of your functions without worrying about deployment needs.

Build With
Usage

Falco is a library, so to use it you first need to implement some Falco interfaces. Check out the Getting Started section of the README for a hands-on example. In general, Falco uses a Driver abstraction to manage FaaS analytics pipelines. A driver can Deploy, Execute and Remove workloads from a falco.Platform. Falco comes with multiple pre-built platforms, see platforms/.

Each falco.Driver needs a Runtime. The runtime is the application that runs on the FaaS platform; since a driver and a runtime are closely coupled, it makes sense to develop them together.

Once you have implemented the Driver and Runtime for your use case, you can use Falco to get execution on multiple platforms for free.

Getting Started

Sample Driver/Runtime based on python:3 on OpenWhisk

package main

import (
	"flag"
	"fmt"
	"github.com/ISE-SMILE/falco"
	"github.com/ISE-SMILE/falco/executors"
	"github.com/ISE-SMILE/falco/platforms"
)

// payload is the Python source deployed by ExampleRuntime.MakeDeployment.
// Its main(args) reads "name" from args (defaulting to "stranger"), prints
// the greeting and returns it under the "greeting" key.
const payload = `def main(args):
    name = args.get("name", "stranger")
    greeting = "Hello " + name + "!"
    print(greeting)
    return {"greeting": greeting}`

// ExampleDriver is a sample driver that walks a single execution plan.
type ExampleDriver struct {
	// plan is the first stage of the plan; following stages are reached via Next().
	plan *falco.ExecutionPlan
}

// ExampleRuntime is the stateless sample runtime that ships the embedded Python payload.
type ExampleRuntime struct{}

// Identifier reports the runtime identifier of this example, "python:3".
func (ExampleRuntime) Identifier() string {
	const id = "python:3"
	return id
}

// MakeDeployment wraps the embedded Python payload in a string deployable
// bound to this runtime. Neither options nor s is consulted, and the
// returned error is always nil.
func (e *ExampleRuntime) MakeDeployment(options *falco.Options, s ...string) (falco.Deployable, error) {
	deployable := falco.NewStringDeployable(payload, e)
	return deployable, nil
}

// InvocationPayload returns exactly one simple invocation with the ID "world"
// that is bound to this runtime. Neither options nor s is consulted, and the
// returned error is always nil.
func (e *ExampleRuntime) InvocationPayload(options *falco.Options, s ...string) ([]falco.Invocation, error) {
	invocations := []falco.Invocation{
		falco.NewSimpleInvocation("world", e),
	}
	return invocations, nil
}

// Runtime hands out a freshly allocated ExampleRuntime on every call.
func (ed *ExampleDriver) Runtime() falco.Runtime {
	return new(ExampleRuntime)
}

// ExecutionPlan returns the plan this driver was constructed with.
func (ed *ExampleDriver) ExecutionPlan() *falco.ExecutionPlan {
	plan := ed.plan
	return plan
}

// Execute runs every stage of the driver's plan, in order, through strategy
// on platform. Afterwards it prints, for each invocation of each stage, the
// invocation ID followed by its result and error.
//
// It stops at the first stage whose strategy.Execute call fails and returns
// that error wrapped together with the stage ID; the cause is wrapped with
// %w so callers can still match it using errors.Is / errors.As. It returns
// nil once all stages have run and their results have been printed.
func (ed *ExampleDriver) Execute(strategy falco.ExecutionStrategy, platform falco.AsyncPlatform) error {
	// First pass: execute all stages, aborting on the first failure.
	for stage := ed.plan; stage != nil; stage = stage.Next() {
		phase := stage.Phase

		if err := strategy.Execute(phase, phase.Deployment, platform); err != nil {
			return fmt.Errorf("failed on stage %s cause:%w", phase.ID, err)
		}
	}

	// Second pass: report the outcome of every invocation.
	for stage := ed.plan; stage != nil; stage = stage.Next() {
		for _, invocation := range stage.Phase.Payloads {
			fmt.Printf("%s: %+v%+v\n", invocation.InvocationID(), invocation.Result(), invocation.Error())
		}
	}

	return nil
}

// Deploy builds a deployable from the driver's runtime for every stage of the
// plan, deploys it to platform and stores the resulting deployment on the
// stage's phase.
//
// Deploy has no error return, so failures are reported on stdout and the
// affected stage is skipped: its phase.Deployment is left untouched instead
// of being overwritten with the result of a failed call. The remaining stages
// are still processed.
func (ed *ExampleDriver) Deploy(platform falco.AsyncPlatform) {
	for stage := ed.plan; stage != nil; stage = stage.Next() {
		phase := stage.Phase

		deployable, err := ed.Runtime().MakeDeployment(nil)
		if err != nil {
			fmt.Printf("failed to build deployable for stage %s cause:%+v\n", phase.ID, err)
			continue
		}

		deployment, err := platform.Deploy(deployable)
		if err != nil {
			fmt.Printf("failed to deploy stage %s cause:%+v\n", phase.ID, err)
			continue
		}
		phase.Deployment = deployment
	}
}

// Remove asks platform to remove the deployment of every stage in the plan.
// A failed removal is reported on stdout; the remaining stages are still
// processed.
func (ed *ExampleDriver) Remove(platform falco.AsyncPlatform) {
	for current := ed.plan; current != nil; current = current.Next() {
		phase := current.Phase
		if err := platform.Remove(phase.Deployment); err != nil {
			fmt.Printf("failed to undeploy stage %s cause:%+v\n", phase.ID, err)
		}
	}
}

// main wires the example together: it connects to the OpenWhisk instance
// given by the -host and -token flags, builds a single-phase plan from the
// example runtime's invocations, then deploys, executes and removes it.
// The value returned by Execute is printed to stdout.
func main() {
	host := flag.String("host", "", "OpenWhisk Host")
	token := flag.String("token", "", "OpenWhisk Token")
	flag.Parse()

	whisk, err := platforms.NewOpenWhisk(platforms.WithHost(*host), platforms.WithAuthToken(*token))
	if err != nil {
		panic(err)
	}

	runtime := ExampleRuntime{}

	// Fail loudly instead of silently running a plan without invocations.
	payloads, err := runtime.InvocationPayload(nil)
	if err != nil {
		panic(err)
	}

	plan := &falco.ExecutionPlan{
		Deployable: nil,
		Phase: &falco.AsyncInvocationPhase{
			ID:         "test",
			Payloads:   payloads,
			Deployment: nil,
		},
	}
	driver := ExampleDriver{
		plan: plan,
	}

	driver.Deploy(whisk)
	fmt.Println(driver.Execute(executors.SequentialExecutor{}, whisk))
	driver.Remove(whisk)
}

Run with go run example/example.go -host=<OpenWhisk Host Address> -token=<OpenWhisk Token>

Roadmap

  • Integration of Fact and Meeter Projects
  • Implementation of AWS Platform
  • Implementation of Google Platform

License

Distributed under the MIT License. See LICENSE for more information.

Acknowledgements

Created in the context of SMILE. SMILE is a Software Campus Project funded by BMBF. More Information at: https://ise-smile.github.io/.

Documentation

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

This section is empty.

Types

type AIMDLimiter added in v0.2.2

type AIMDLimiter struct {
	A       float64
	B       float64
	Thr     uint64
	MaxRate int
	// contains filtered or unexported fields
}

func NewAMIDLimiter added in v0.2.2

func NewAMIDLimiter(thr uint64, A, B float64, maxRPS int, clock LimiterClock) *AIMDLimiter

func (AIMDLimiter) Query added in v0.2.2

func (q AIMDLimiter) Query(ctx context.Context) (*time.Time, error)

func (*AIMDLimiter) Setup added in v0.2.2

func (a *AIMDLimiter) Setup(ctx context.Context)

func (AIMDLimiter) Signal added in v0.2.2

func (p AIMDLimiter) Signal(tick *time.Time)

func (AIMDLimiter) Take added in v0.2.2

func (p AIMDLimiter) Take(ctx context.Context) (*time.Time, error)

type AsyncInvocationPhase added in v0.2.1

type AsyncInvocationPhase struct {
	ID         string
	Payloads   []Invocation
	Deployment Deployment
	// contains filtered or unexported fields
}

func NewPhase added in v0.2.1

func NewPhase(ctx context.Context, id string,
	tasks []Invocation, control CongestionController, monitor ProgressMonitor) *AsyncInvocationPhase

func (*AsyncInvocationPhase) AsParentContext added in v0.2.1

func (j *AsyncInvocationPhase) AsParentContext() context.Context

func (*AsyncInvocationPhase) Done added in v0.2.1

func (j *AsyncInvocationPhase) Done(payloadID string)

func (*AsyncInvocationPhase) Finish added in v0.2.1

func (j *AsyncInvocationPhase) Finish()

func (*AsyncInvocationPhase) IsCanceled added in v0.2.2

func (j *AsyncInvocationPhase) IsCanceled() <-chan struct{}

func (*AsyncInvocationPhase) Log added in v0.2.1

func (j *AsyncInvocationPhase) Log(text string)

func (*AsyncInvocationPhase) Name added in v0.2.1

func (j *AsyncInvocationPhase) Name() string

func (*AsyncInvocationPhase) PrintStats added in v0.2.1

func (j *AsyncInvocationPhase) PrintStats()

func (*AsyncInvocationPhase) SubmittedPayloadFromId added in v0.2.1

func (j *AsyncInvocationPhase) SubmittedPayloadFromId(payloadID string) (Invocation, error)

func (*AsyncInvocationPhase) SubmittedTask added in v0.2.1

func (j *AsyncInvocationPhase) SubmittedTask(payload Invocation)

SubmittedTask is called to indicate that a payload was sent to an invocation

func (*AsyncInvocationPhase) TakeInvocation added in v0.2.1

func (j *AsyncInvocationPhase) TakeInvocation() *time.Time

TakeInvocation enables rate limiting for invocation requests

func (*AsyncInvocationPhase) TakeQuery added in v0.2.1

func (j *AsyncInvocationPhase) TakeQuery() *time.Time

TakeQuery enables rate limiting for API requests, e.g. for requesting the status of an invocation

func (*AsyncInvocationPhase) Wait added in v0.2.1

func (j *AsyncInvocationPhase) Wait()

func (*AsyncInvocationPhase) WithTimeout added in v0.2.1

func (j *AsyncInvocationPhase) WithTimeout(timeout time.Duration) error

type AsyncPlatform added in v0.2.1

type AsyncPlatform interface {
	Platform

	Submit(*AsyncInvocationPhase, Deployment, Invocation, chan<- Invocation, ...InvocableOptions) error
	Collect(*AsyncInvocationPhase, <-chan Invocation, ...InvocableOptions) error
	FetchActivationLog(deployment Deployment, invocation Invocation) map[string]interface{}
}

AsyncPlatform to implement a FaaS platform API that allows for async submissions of events

type CongestionController added in v0.2.2

type CongestionController interface {
	//Setup is called once after creating the controller, do your initialization here
	Setup(ctx context.Context)
	//Query blocks until a client can query again. Should be used for poll-based execution strategies
	Query(ctx context.Context) (*time.Time, error)
	//Take blocks until the next event can occur (blocks until ctx is cancelled)
	Take(ctx context.Context) (*time.Time, error)
	//Signal must be used for feedback to the controller every time an event is finished or failed, so that a new event can be sent.
	Signal(took *time.Time)
}

The CongestionController takes care of managing the rate of invocations and API requests sent to a platform

type Deployable

type Deployable interface {
	Payload() interface{}
	Option() *Options
	Runtime() Runtime
}

Deployable is the interface used by Platform and AsyncPlatform to deploy a function

func NewStringDeployable added in v0.2.1

func NewStringDeployable(payload string, runtime Runtime) Deployable

type Deployment

type Deployment interface {
	DeploymentID() string
}

Each Platform or AsyncPlatform will return a reference to a deployed function using this interface.

type Driver added in v0.2.1

type Driver interface {

	//Runtime() the runtime used by this driver
	Runtime() Runtime

	//ExecutionPlan() set of phases this driver needs to execute
	ExecutionPlan() *ExecutionPlan

	//Execute starts the execution of the ExecutionPlan using the specified ExecutionStrategy on the specified runtime.
	//This method will block until all phases are done or a Phase encountered an error.
	Execute(strategy ExecutionStrategy, platform AsyncPlatform) error

	Deploy(platform AsyncPlatform)
	Remove(platform AsyncPlatform)
}

Driver interface to drive complex serverless applications that execute multiple invocations or ExecutionPlan.

type ExecutionPlan added in v0.2.1

type ExecutionPlan struct {
	Deployable Deployable
	Phase      *AsyncInvocationPhase
	// contains filtered or unexported fields
}

XXX: long term - this should be a DAG instead of a list; there are probably Phases that can run in parallel ;) but for now we assume full phase interdependencies

func NewExecutionPlan added in v0.2.1

func NewExecutionPlan(dep Deployable, task *AsyncInvocationPhase) *ExecutionPlan

func (*ExecutionPlan) Append added in v0.2.1

func (e *ExecutionPlan) Append(plan *ExecutionPlan)

func (*ExecutionPlan) Deploy added in v0.2.1

func (e *ExecutionPlan) Deploy(platform AsyncPlatform) (Deployment, error)

func (*ExecutionPlan) GetDeployment added in v0.2.1

func (e *ExecutionPlan) GetDeployment() Deployment

func (*ExecutionPlan) ListRemaining added in v0.2.1

func (e *ExecutionPlan) ListRemaining() []*ExecutionPlan

func (*ExecutionPlan) Next added in v0.2.1

func (e *ExecutionPlan) Next() *ExecutionPlan

func (*ExecutionPlan) Remove added in v0.2.1

func (e *ExecutionPlan) Remove(platform AsyncPlatform) error

func (*ExecutionPlan) UseDeployment added in v0.2.1

func (e *ExecutionPlan) UseDeployment(platform AsyncPlatform, deploymentID string) (Deployment, error)

type ExecutionStrategy

type ExecutionStrategy interface {
	Execute(*AsyncInvocationPhase, Deployment, AsyncPlatform) error
}

type InvocableOptions

type InvocableOptions interface {
	Apply()
}

type Invocation added in v0.2.1

type Invocation interface {
	//InvocationID unique task IID (can be used to associate returned invocations to submitted invocations)
	InvocationID() string
	//SubmittedAt the time this payload was sent
	SubmittedAt() time.Time
	//InvocationDuration the Duration between the submitted time and the time Done was called
	InvocationDuration() time.Duration

	//IsCompleted if this payload is processed, e.g. we know it failed, or we have a result
	IsCompleted() bool
	//Error any error that occurred with this payload
	Error() error
	//Submitted sets submission time and counts the number of resubmissions
	Submitted() int8
	//GetNumberOfSubmissions returns the number of invocations to a platform
	GetNumberOfSubmissions() int8
	//SetError sets an error
	SetError(err error)

	//Runtime the runtime used to generate this invocation
	Runtime() Runtime
	//Done Set completed to true and stores completion time, calculates the duration using time.Now() if duration is nil
	Done(duration *time.Duration)

	//MarkAsResubmitted is called if a task gets resubmitted (due to error or because it seemed to straggle)
	MarkAsResubmitted()

	//SetResult can be used to set a result for each activation
	SetResult(result interface{})

	//Result returns data set with SetResult
	Result() interface{}

	//SetRuntimeReference for registering runtime identifier, like, activation-id
	SetRuntimeReference(id interface{})

	//RuntimeReference set with SetRuntimeReference
	RuntimeReference() interface{}
}

Invocation represents a single call to a FaaS function, capturing the state of this invocation.

func NewMockInvocation added in v0.2.2

func NewMockInvocation(s string, args map[string]interface{}) Invocation

func NewSimpleInvocation added in v0.2.1

func NewSimpleInvocation(id string, runtime Runtime) Invocation

type LimiterClock added in v0.2.2

type LimiterClock interface {
	Tick() <-chan time.Time
	Now() time.Time
}

func NewTimedClock added in v0.2.2

func NewTimedClock(d time.Duration) LimiterClock

type MockInvocation

type MockInvocation struct {
	IID   string
	SUB   time.Time
	COM   time.Time
	DONE  bool
	ERR   error
	Tries int8
	Delay time.Duration

	SuccessSelector func(invocation *MockInvocation) bool

	Args map[string]interface{} //can be Used for test instrumentation
	// contains filtered or unexported fields
}

func (*MockInvocation) DeploymentID added in v0.2.1

func (m *MockInvocation) DeploymentID() string

func (*MockInvocation) Done

func (m *MockInvocation) Done(duration *time.Duration)

func (*MockInvocation) Error

func (m *MockInvocation) Error() error

func (*MockInvocation) GetNumberOfSubmissions

func (m *MockInvocation) GetNumberOfSubmissions() int8

func (*MockInvocation) InvocationDuration added in v0.2.1

func (m *MockInvocation) InvocationDuration() time.Duration

func (*MockInvocation) InvocationID added in v0.2.2

func (m *MockInvocation) InvocationID() string

func (*MockInvocation) IsCompleted

func (m *MockInvocation) IsCompleted() bool

func (*MockInvocation) MarkAsResubmitted

func (m *MockInvocation) MarkAsResubmitted()

func (*MockInvocation) Result added in v0.2.1

func (m *MockInvocation) Result() interface{}

func (*MockInvocation) Runtime

func (m *MockInvocation) Runtime() Runtime

func (*MockInvocation) RuntimeReference added in v0.2.1

func (m *MockInvocation) RuntimeReference() interface{}

func (*MockInvocation) SetError

func (m *MockInvocation) SetError(err error)

func (*MockInvocation) SetResult added in v0.2.1

func (m *MockInvocation) SetResult(result interface{})

func (*MockInvocation) SetRuntimeReference added in v0.2.1

func (m *MockInvocation) SetRuntimeReference(id interface{})

func (*MockInvocation) Submitted

func (m *MockInvocation) Submitted() int8

func (*MockInvocation) SubmittedAt

func (m *MockInvocation) SubmittedAt() time.Time

func (*MockInvocation) Succeed

func (m *MockInvocation) Succeed() bool

type MockRuntime

type MockRuntime struct {
}

func (*MockRuntime) Identifier added in v0.2.1

func (m *MockRuntime) Identifier() string

func (*MockRuntime) InvocationPayload

func (m *MockRuntime) InvocationPayload(c *Options, s ...string) ([]Invocation, error)

func (*MockRuntime) MakeDeployment

func (m *MockRuntime) MakeDeployment(c *Options, s ...string) (Deployable, error)

type Monitor

type Monitor struct {
}

func (Monitor) Advance

func (m Monitor) Advance(i int)

func (Monitor) Expand

func (m Monitor) Expand(i int)

func (Monitor) Finish

func (m Monitor) Finish()

func (Monitor) Info

func (m Monitor) Info(s string)

func (Monitor) Render

func (m Monitor) Render()

func (Monitor) Setup

func (m Monitor) Setup()

type Options added in v0.2.1

type Options struct {
	// contains filtered or unexported fields
}

func NewFacloOptions added in v0.2.1

func NewFacloOptions(name string) *Options

func (*Options) Bool added in v0.2.1

func (r *Options) Bool(name string) bool

func (*Options) Duration added in v0.2.1

func (r *Options) Duration(name string, defaultValue time.Duration) time.Duration

func (*Options) Int added in v0.2.1

func (r *Options) Int(name string, defaultValue int) int

func (*Options) IsSet added in v0.2.1

func (r *Options) IsSet(name string) bool

func (*Options) Name added in v0.2.1

func (r *Options) Name() string

func (*Options) NewBoolOption added in v0.2.1

func (r *Options) NewBoolOption(name string, value bool)

func (*Options) NewDurationOption added in v0.2.1

func (r *Options) NewDurationOption(name string, value time.Duration)

func (*Options) NewIntOption added in v0.2.1

func (r *Options) NewIntOption(name string, value int)

func (*Options) NewSliceOption added in v0.2.1

func (r *Options) NewSliceOption(name string, value []string)

func (*Options) NewStingOption added in v0.2.1

func (r *Options) NewStingOption(name, value string)

func (*Options) PrefixMap added in v0.2.1

func (r *Options) PrefixMap(prefix string) map[string]string

func (*Options) Slice added in v0.2.1

func (r *Options) Slice(name string) []string

func (*Options) String added in v0.2.1

func (r *Options) String(name, defaultValue string) string

func (*Options) ToMap added in v0.2.1

func (r *Options) ToMap() map[string]string

type PIDLimiter added in v0.2.2

type PIDLimiter struct {
	*sync.Mutex

	P float64
	I float64
	D float64
	F float64 //this factor influences how much the number of open requests should dampen the allowed rate per interval

	TargetRPS       int //the targeted requests per second for the PID algorithm
	MaxOpenRequests int //the maximum allowed number of open requests (the difference between this and the current ongoing requests, times F, will be added to the control signal)
	// contains filtered or unexported fields
}

PIDLimiter implements a PID-driven CongestionController that uses the PID algorithm to reach a target RPS

func NewPIDLimiter added in v0.2.2

func NewPIDLimiter(
	P, I, D, F float64,
	targetRPS, maxOpen int, clock LimiterClock) *PIDLimiter

func (PIDLimiter) Query added in v0.2.2

func (q PIDLimiter) Query(ctx context.Context) (*time.Time, error)

func (*PIDLimiter) Setup added in v0.2.2

func (p *PIDLimiter) Setup(ctx context.Context)

func (PIDLimiter) Signal added in v0.2.2

func (p PIDLimiter) Signal(tick *time.Time)

func (PIDLimiter) Take added in v0.2.2

func (p PIDLimiter) Take(ctx context.Context) (*time.Time, error)

type Platform

type Platform interface {
	//deploys a function
	Deploy(deployable Deployable) (Deployment, error)

	//fetches or creates a Deployment using the deploymentID
	FetchDeployment(deplotmentID string) (Deployment, error)

	//removes a deployment
	Remove(deployment Deployment) error
	//scales up the deployment or changes current configuration
	Scale(Deployment, ...ScaleOptions) (Deployment, error)

	//Invoke a makeDeploymentPayload directly
	Invoke(Deployment, Invocation) (Invocation, error)
}

Platform is the main interface to implement FaaS APIs for different Providers.

type ProgressMonitor

type ProgressMonitor interface {
	Setup()
	Advance(int)
	Expand(int)
	Render()
	Finish(map[string]interface{})
	Info(string)
}

type Runtime

type Runtime interface {
	Identifier() string
	//compile a set of given files to a deployment package that can be deployed to any platform
	MakeDeployment(*Options, ...string) (Deployable, error)

	//combine a set of input files to an invocation payload for a given runtime, an example would be a node-js file and all its dependencies into a zip for a nodeJS runtime.
	InvocationPayload(*Options, ...string) ([]Invocation, error)
}

Main interface for a processing application. Each Falco Application must implement at least one Runtime.

type ScaleOptions

type ScaleOptions func(deployment Deployment)

type SimpleInvocation added in v0.2.1

type SimpleInvocation struct {
	// contains filtered or unexported fields
}

func (*SimpleInvocation) Done added in v0.2.1

func (s *SimpleInvocation) Done(duration *time.Duration)

func (SimpleInvocation) Error added in v0.2.1

func (s SimpleInvocation) Error() error

func (SimpleInvocation) GetNumberOfSubmissions added in v0.2.1

func (s SimpleInvocation) GetNumberOfSubmissions() int8

func (SimpleInvocation) InvocationDuration added in v0.2.1

func (s SimpleInvocation) InvocationDuration() time.Duration

func (SimpleInvocation) InvocationID added in v0.2.1

func (s SimpleInvocation) InvocationID() string

func (SimpleInvocation) IsCompleted added in v0.2.1

func (s SimpleInvocation) IsCompleted() bool

func (SimpleInvocation) MarkAsResubmitted added in v0.2.1

func (s SimpleInvocation) MarkAsResubmitted()

func (*SimpleInvocation) Result added in v0.2.1

func (s *SimpleInvocation) Result() interface{}

func (SimpleInvocation) Runtime added in v0.2.1

func (s SimpleInvocation) Runtime() Runtime

func (SimpleInvocation) RuntimeReference added in v0.2.1

func (s SimpleInvocation) RuntimeReference() interface{}

func (*SimpleInvocation) SetError added in v0.2.1

func (s *SimpleInvocation) SetError(err error)

func (*SimpleInvocation) SetResult added in v0.2.1

func (s *SimpleInvocation) SetResult(result interface{})

func (SimpleInvocation) SetRuntimeReference added in v0.2.1

func (s SimpleInvocation) SetRuntimeReference(id interface{})

func (SimpleInvocation) Submitted added in v0.2.1

func (s SimpleInvocation) Submitted() int8

func (SimpleInvocation) SubmittedAt added in v0.2.1

func (s SimpleInvocation) SubmittedAt() time.Time

type StringDeployable added in v0.2.1

type StringDeployable struct {
	// contains filtered or unexported fields
}

func (StringDeployable) Option added in v0.2.1

func (ed StringDeployable) Option() *Options

func (StringDeployable) Payload added in v0.2.1

func (ed StringDeployable) Payload() interface{}

func (StringDeployable) Runtime added in v0.2.1

func (ed StringDeployable) Runtime() Runtime

type TimerClock added in v0.2.2

type TimerClock struct {
	// contains filtered or unexported fields
}

func (*TimerClock) Now added in v0.2.2

func (t *TimerClock) Now() time.Time

func (*TimerClock) Tick added in v0.2.2

func (t *TimerClock) Tick() <-chan time.Time

Directories

Path Synopsis

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL