virtual

package
v0.0.0-...-3b62d95 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: May 30, 2023 License: MIT Imports: 29 Imported by: 1

Documentation

Index

Constants

View Source
const (
	// DiscoveryTypeLocalHost indicates that the environment should advertise its IP
	// address as localhost to the discovery service.
	DiscoveryTypeLocalHost = "localhost"
	// DiscoveryTypeRemote indicates that the environment should advertise its
	// actual IP to the discovery service.
	DiscoveryTypeRemote = "remote"
)
View Source
const DefaultHTTPRequestTimeout = 15 * time.Second
View Source
const (
	Localhost = "127.0.0.1"
)

Variables

View Source
var (
	// ErrEnvironmentClosed is an error that indicates the environment is closed.
	// It can be returned when attempting to perform an operation on a closed environment.
	ErrEnvironmentClosed = errors.New("environment is closed")

	DefaultGCActorsAfterDurationWithNoInvocations = time.Minute
)

Functions

func IsBlacklistedActivationError

func IsBlacklistedActivationError(err error) bool

IsBlacklistedActivationError returns a boolean indicating whether the error was caused by the actor being blacklisted from being activated on the server.

func NewBlacklistedActivationError

func NewBlacklistedActivationError(err error, serverIDs []string) error

NewBlacklistedActivationError creates a new BlacklistedActivationErr.

Types

type Actor

type Actor interface {
	// MemoryUsageBytes returns the estimated amount of memory the actor is using
	// in terms of bytes. This method will be called after every actor invocation
	// so its implementation should be efficient.
	MemoryUsageBytes() int

	// Close closes the in-memory actor.
	Close(ctx context.Context) error
}

Actor represents an activated actor in memory.

type ActorBytes

type ActorBytes interface {
	Actor

	// Invoke invokes the specified operation on the in-memory actor with the provided
	// payload. The transaction is invocation-specific and will automatically be
	// committed or rolled back / canceled based on whether Invoke returns an error.
	Invoke(
		ctx context.Context,
		operation string,
		payload []byte,
	) ([]byte, error)
}

ActorBytes is the version of Actor that returns responses as a []byte directly.

type ActorStream

type ActorStream interface {
	Actor

	InvokeStream(
		ctx context.Context,
		operation string,
		payload []byte,
	) (io.ReadCloser, error)
}

ActorStream is the same as ByteActor, except it can return responses as streams instead of []byte which is useful in scenarios where large amounts of data need to be shuttled around. It also allows the actor to behave in an "async" manner by return streams and then "filling them in" later.

type BlacklistedActivationErr

type BlacklistedActivationErr struct {
	// contains filtered or unexported fields
}

BlacklistedActivationErr indicates that the actor activation has been blacklisted on this specific server temporarily (usually due to resource usage or balancing reasons).

func (BlacklistedActivationErr) Error

func (b BlacklistedActivationErr) Error() string

func (BlacklistedActivationErr) HTTPStatusCode

func (b BlacklistedActivationErr) HTTPStatusCode() int

func (BlacklistedActivationErr) Is

func (b BlacklistedActivationErr) Is(target error) bool

func (BlacklistedActivationErr) ServerIDs

func (b BlacklistedActivationErr) ServerIDs() []string

type CreateActorResult

type CreateActorResult struct {
}

type Debug

type Debug interface {
	// NumActivatedActors returns the number of activated actors in the environment. It is
	// primarily used for tests.
	NumActivatedActors() int

	// Heartbeat forces the environment to heartbeat the Registry immediately. It is primarily
	// used for tests.
	Heartbeat() error
	// contains filtered or unexported methods
}

Debug contains private methods that are only used for debugging / tests.

type DiscoveryOptions

type DiscoveryOptions struct {
	// DiscoveryType is one of DiscoveryTypeLocalHost or DiscoveryTypeRemote.
	DiscoveryType string
	// Port is the port that the environment should advertise to the discovery
	// service.
	Port int
	// AllowFailedInitialHeartbeat can be set to true to allow the environment
	// to instantiate itself even if the initial heartbeat fails. This is useful
	// to avoid circular startup dependencies when using the leaderregistry
	// implementation which requires at least one environment to be up and running
	// to bootstrap the cluster.
	AllowFailedInitialHeartbeat bool
}

DiscoveryOptions contains the discovery-related options.

func (*DiscoveryOptions) Validate

func (d *DiscoveryOptions) Validate() error

type Environment

type Environment interface {
	Debug

	// RegisterGoModule registers a new Go module in the environment so it can be used in
	// subsequent calls. RegisterGoModule can be called at any time, even once the
	// Environmnt has been in use for a long time. However, the primary reason this method
	// exists (instead of being an argument provided to the Environment constructor) is
	// so that applications can write many different packages that all accept an instance
	// of Environment as a dependency and "register" whatever Go modules they need without
	// having to register all the Go modules for all the different packages in a single
	// place.
	RegisterGoModule(id types.NamespacedIDNoType, module Module) error

	// InvokeActor invokes the specified operation on the specified actorID with the
	// provided payload. If the actor is already activated somewhere in the system,
	// the invocation will be routed appropriately. Otherwise, the request will
	// activate the actor somewhere in the system and then perform the invocation.
	InvokeActor(
		ctx context.Context,
		namespace string,
		actorID string,
		moduleID string,
		operation string,
		payload []byte,
		createIfNotExist types.CreateIfNotExist,
	) ([]byte, error)

	// InvokeActorJSON is the same as InvokeActor, except it implements the functionality
	// JSON marshaling the request payload and JSON unmarshaling the response payload.
	InvokeActorJSON(
		ctx context.Context,
		namespace string,
		actorID string,
		moduleID string,
		operation string,
		payload any,
		createIfNotExist types.CreateIfNotExist,
		resp any,
	) error

	// InvokeActorStream is the same as InvokeActor, except it uses the streaming
	// interface instead of returning a []byte directly. This is useful for actors
	// that need to shuttle large volumes of data around (perhaps in an async manner).
	InvokeActorStream(
		ctx context.Context,
		namespace string,
		actorID string,
		moduleID string,
		operation string,
		payload []byte,
		createIfNotExist types.CreateIfNotExist,
	) (io.ReadCloser, error)

	// InvokeActorDirect is the same as InvokeActor, however, it performs the invocation
	// "directly".
	//
	// This method should only be called if the Registry has indicated that the specified
	// actorID should be activated in this process. If this constraint is violated then
	// inconsistencies may be introduced into the system.
	InvokeActorDirect(
		ctx context.Context,
		versionStamp int64,
		serverID string,
		serverVersion int64,
		reference types.ActorReferenceVirtual,
		operation string,
		payload []byte,
		createIfNotExist types.CreateIfNotExist,
	) ([]byte, error)

	// InvokeActorDirectStream is the same as InvokeActorDirect, except it uses the streaming
	// interface instead of returning a []byte directly. This is useful for actors that need
	// to shuttle large volumes of data around (perhaps in an async manner).
	InvokeActorDirectStream(
		ctx context.Context,
		versionStamp int64,
		serverID string,
		serverVersion int64,
		reference types.ActorReferenceVirtual,
		operation string,
		payload []byte,
		createIfNotExist types.CreateIfNotExist,
	) (io.ReadCloser, error)

	// InvokeWorker invokes the specified operation from the specified module. Unlike
	// actors, workers provide no guarantees about single-threaded execution or only
	// a single instance running at a time. This makes them easier to scale than
	// actors. They're especially useful for large workloads that don't require the
	// same guarantees actors provide.
	//
	// Also keep in mind that actor's can still "accumulate" in-memory state, just like
	// actors. However, there is no guarantee of linearizability like with Actors so
	// callers may see "inconsistent" memory state depending on which server/environment
	// their worker invocation is routed to.
	InvokeWorker(
		ctx context.Context,
		namespace string,
		moduleID string,
		operation string,
		payload []byte,
		createIfNotExist types.CreateIfNotExist,
	) ([]byte, error)

	// InvokeWorkerStream is the same as InvokeWorker, except it uses the streaming interface
	// instead of returning a []byte directly. This is useful for actors that need to shuttle
	// large volumes of data around (perhaps in an async manner).
	InvokeWorkerStream(
		ctx context.Context,
		namespace string,
		moduleID string,
		operation string,
		payload []byte,
		createIfNotExist types.CreateIfNotExist,
	) (io.ReadCloser, error)

	// Close closes the Environment and all of its associated resources.
	Close(context.Context) error
}

Environment is the interface responsible for routing invocations to the appropriate actor. If the actor is not currently activated in the environment, it will take care of activating it.

func NewDNSRegistryEnvironment

func NewDNSRegistryEnvironment(
	ctx context.Context,
	host string,
	port int,
	opts EnvironmentOptions,
) (Environment, registry.Registry, error)

NewDNSRegistryEnvironment is a convenience function that creates a virtual environment backed by a DNS-based registry. It is configured with reasonable defaults that make it suitable for production usage. Note that this convenience function is particularly nice because it can also be used for unit/integration tests and local development simply by passing virtual.Localhost as the value of host.

func NewEnvironment

func NewEnvironment(
	ctx context.Context,
	serverID string,
	reg registry.Registry,
	moduleStore registry.ModuleStore,
	client RemoteClient,
	opts EnvironmentOptions,
) (Environment, error)

NewEnvironment creates a new Environment.

func NewTestDNSRegistryEnvironment

func NewTestDNSRegistryEnvironment(
	ctx context.Context,
	opts EnvironmentOptions,
) (Environment, registry.Registry, error)

NewTestDNSRegistryEnvironment is a convenience function that creates a virtual environment backed by a DNS-based registry. It is configured already to generate a suitable setting up for writing unit/integration tests, but not for production usage.

type EnvironmentOptions

type EnvironmentOptions struct {
	// ActivationCacheTTL is the TTL of the activation cache.
	ActivationCacheTTL time.Duration
	// DisableActivationCache disables the activation cache.
	DisableActivationCache bool
	// Discovery contains the discovery options.
	Discovery DiscoveryOptions
	// ForceRemoteProcedureCalls forces the environment to *always* invoke
	// actors via RPC even if the actor is activated on the node
	// that originally received the request.
	ForceRemoteProcedureCalls bool

	// CustomHostFns contains a set of additional user-defined host
	// functions that can be exposed to activated actors. This allows
	// developeres leveraging NOLA as a library to extend the environment
	// with additional host functionality.
	CustomHostFns map[string]func([]byte) ([]byte, error)

	// GCActorsAfterDurationWithNoInvocations is the duration after which an
	// activated actor that receives no invocations will be GC'd out of memory.
	//
	// The actor's shutdown function will be invoked before the actor is GC'd.
	//
	// A value of 0 will be ignored and replaced with the default value of
	// DefaultGCActorsAfterDurationWithNoInvocations. To disable this
	// functionality entirely, just use a really large value.
	GCActorsAfterDurationWithNoInvocations time.Duration

	// MaxNumShutdownWorkers specifies the number of workers used for shutting down the active actors
	// in the environment. This determines the level of parallelism and CPU resources utilized
	// during the shutdown process. By default, all available CPUs (runtime.NumCPU()) are used.
	MaxNumShutdownWorkers int

	// Logger is a logging instance used for logging messages.
	// If no logger is provided, the default logger from the slog package (slog.Default()) will be used.
	Logger *slog.Logger
}

EnvironmentOptions is the settings for the Environment.

func (*EnvironmentOptions) Validate

func (e *EnvironmentOptions) Validate() error

type HTTPError

type HTTPError interface {
	HTTPStatusCode() int
}

HTTPError is the interface implemented by errors that map to a specific status code. It should be used in conjunction with statusCodeToErrorWrapper so that the status code is automatically set on the server, and the status code is automatically translated back into the appropriate error wrapped by the client.

type HostCapabilities

type HostCapabilities interface {
	// InvokeActor invokes a function on the specified actor.
	InvokeActor(context.Context, types.InvokeActorRequest) ([]byte, error)

	// ScheduleSelfTimer is the same as InvokeActor, except the invocation is scheduled
	// in memory to be run later on the calling actor, and only if the actor is still
	// instantiated / activated in-memory when the timer fires.
	ScheduleSelfTimer(context.Context, wapcutils.ScheduleSelfTimer) error

	// CustomFn invoke a custom (user defined) host function. This will only work if the
	// custom host function was registered with the environment when it was instantiated.
	CustomFn(
		ctx context.Context,
		operation string,
		payload []byte,
	) ([]byte, error)
}

HostCapabilities defines the interface of capabilities exposed by the host to the Actor.

type InvokeActorResult

type InvokeActorResult struct {
}

type Module

type Module interface {
	// Instantiate instantiates a new in-memory actor from the module.
	Instantiate(
		ctx context.Context,
		reference types.ActorReferenceVirtual,
		payload []byte,
		host HostCapabilities,
	) (Actor, error)
	// Close closes the modules.
	Close(ctx context.Context) error
}

Module represents a "module" / template from which new actors are constructed/instantiated.

type RemoteClient

type RemoteClient interface {
	// InvokeActorRemote is the same as Invoke, however, it performs the actor invocation on a
	// specific remote server.
	InvokeActorRemote(
		ctx context.Context,
		versionStamp int64,
		reference types.ActorReference,
		operation string,
		payload []byte,
		create types.CreateIfNotExist,
	) (io.ReadCloser, error)
}

RemoteClient is the interface implemented by a client that is capable of communicating with remote nodes in the system.

func NewHTTPClient

func NewHTTPClient() RemoteClient

NewHTTPClient returns a new HTTPClient that implements the RemoteClient interface.

type ScheduleInvocationResult

type ScheduleInvocationResult struct {
}

type Server

type Server struct {
	sync.Mutex
	// contains filtered or unexported fields
}

func NewServer

func NewServer(
	moduleStore registry.ModuleStore,
	environment Environment,
) *Server

NewServer creates a new server for the actor virtual environment.

func (*Server) Start

func (s *Server) Start(port int) error

Start starts the server.

func (*Server) Stop

func (s *Server) Stop(ctx context.Context) error

Directories

Path Synopsis
kv
tuple
Package tuple provides a layer for encoding and decoding multi-element tuples into keys usable by FoundationDB.
Package tuple provides a layer for encoding and decoding multi-element tuples into keys usable by FoundationDB.
fdbregistry Module

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL