Documentation ¶
Index ¶
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
This section is empty.
Types ¶
type Address ¶
type Address struct { FunctionType FunctionType Id string }
An Address is the unique identity of an individual {@link StatefulFunction}, containing of the function's FunctionType and an unique identifier within the type. The function's type denotes the class of function to invoke, while the unique identifier addresses the invocation to a specific function instance.
func Caller ¶
Caller returns the address of the caller function. The caller may be nil if the message was sent directly from an ingress
type ExpirationSpec ¶
type ExpirationSpec struct { ExpireMode Ttl time.Duration }
func (ExpirationSpec) String ¶
func (e ExpirationSpec) String() string
type ExpireMode ¶
type ExpireMode int
const ( NONE ExpireMode = iota AFTER_WRITE AFTER_INVOKE )
func (ExpireMode) String ¶
func (e ExpireMode) String() string
type FunctionRegistry ¶
type FunctionRegistry interface { // Handler for processing runtime messages from // an http endpoint http.Handler // Handler for processing arbitrary payloads. // This method provides compliance with AWS Lambda // handler. Invoke(ctx context.Context, payload []byte) ([]byte, error) // Register a StatefulFunction under a FunctionType. RegisterFunction(funcType FunctionType, function StatefulFunction) // Registers a function pointer as a StatefulFunction under a FunctionType. RegisterFunctionPointer( funcType FunctionType, function func(context.Context, StatefulFunctionRuntime, *anypb.Any) error) }
Keeps a mapping from FunctionType to stateful functions and serves them to the Flink runtime.
HTTP Endpoint
import "net/http" func main() { registry := NewFunctionRegistry() registry.RegisterFunction(greeterType, GreeterFunction{}) http.Handle("/service", registry) _ = http.ListenAndServe(":8000", nil) }
AWS Lambda
import "github.com/aws/aws-lambda" func main() { registry := NewFunctionRegistry() registry.RegisterFunction(greeterType, GreeterFunction{}) lambda.StartHandler(registry) }
func NewFunctionRegistry ¶
func NewFunctionRegistry() FunctionRegistry
type FunctionType ¶
A reference to a stateful function, consisting of a namespace and a name. A function's type is part of a function's Address and serves as integral part of an individual function's identity.
func (*FunctionType) String ¶
func (functionType *FunctionType) String() string
type StateSpec ¶
type StateSpec struct { StateName string ExpirationSpec }
type StatefulFunction ¶
type StatefulFunction interface { StateSpecs() []StateSpec // Invoke this function with the given input. Invoke(ctx context.Context, runtime StatefulFunctionRuntime, msg *anypb.Any) error }
StatefulFunction is a user-defined function that can be invoked with a given input. This is the primitive building block for a Stateful Functions application.
Each individual function is a uniquely invokable "instance" of a FunctionType. Each function is identified by an Address, representing the function's unique id (a string) within its type. From a user's perspective, it would seem as if for each unique function id, there exists a stateful instance of the function that is always available to be invoked within an application.
An individual function can be invoked with arbitrary input form any other function (including itself), or routed form an ingress via a Router. To executeBatch a function, the caller simply needs to know the Address of the target function. As a result of invoking a StatefulFunction, the function may continue to executeBatch other functions, modify its state, or send messages to egresses addressed by an egress identifier.
Each individual function instance may have state that is maintained by the system, providing exactly-once guarantees.
type StatefulFunctionRuntime ¶
type StatefulFunctionRuntime interface { // Get retrieves the state for the given name and // unmarshalls the encoded value contained into the provided message state. // It returns an error if the target message does not match the type // in the Any message or if an unmarshal error occurs. Get(name string, state proto.Message) (bool, error) // Set stores the value under the given name in state and // marshals the given message m into an any.Any message // if it is not already. Set(name string, value proto.Message) error // Clear deletes the state registered under the name Clear(name string) // Invokes another function with an input, identified by the target function's Address // and marshals the given message into an any.Any. Send(target *Address, message proto.Message) error // Invokes another function with an input, identified by the target function's // FunctionType and unique id after a specified delay. This method is durable // and as such, the message will not be lost if the system experiences // downtime between when the message is sent and the end of the duration. // This method marshals the given message into an any.Any. SendAfter(target *Address, duration time.Duration, message proto.Message) error // Sends an output to an EgressIdentifier. // This method marshals the given message into an any.Any. SendEgress(egress io.EgressIdentifier, message proto.Message) error }
Provides the effect tracker for a single StatefulFunction instance. The invocation's io context may be used to invoke other functions (including itself) and to send messages to egresses. Additionally, it supports reading and writing persisted state values with exactly-once guarantees provided by the runtime.