Documentation ¶
Overview ¶
Package actor provides a simple actor system. It's a framework that can poll many actors concurrently.
The following diagram shows how a system polls actors.
,------.            ,-------.          ,-----.             ,------.          ,-----.
|Router|            |Mailbox|          |ready|             |System|          |Actor|
`--+---'            `---+---'          `--+--'             `--+---'          `--+--'
   |----.               |                 |                   |                 |
   |    | Send(msgs)    |                 |                   |                 |
   |<---'               |                 |                   |                 |
   |                    |                 |                   |                 |
   |----.               |                 |                   |                 |
   |    | find proc     |                 |                   |                 |
   |<---'               |                 |                   |                 |
   |                    |                 |                   |                 |
   |  proc              |                 |                   |                 |
   |  .Mailbox          |                 |                   |                 |
   |  .Send(msgs)       |                 |                   |                 |
   |------------------->|                 |                   |                 |
   |                    |                 |                   |                 |
   |  schedule(proc)    |                 |                   |                 |
   |------------------------------------->|                   |                 |
   |                    |                 |                   |                 |
   |                    |                 |----.              |                 |
   |                    |                 |    | enqueue(proc)|                 |
   |                    |                 |<---'              |                 |
   |                    |                 |                   |                 |
   |                    |                 |  signal()         |                 |
   |                    |                 |------------------>|                 |
   |                    |                 |                   |                 |
   |                    |                 |  fetchProc()      |                 |
   |                    |                 |<------------------|                 |
   |                    |                 |                   |                 |
   |                    |                 |  return proc      |                 |
   |                    |                 |------------------>|                 |
   |                    |                 |                   |                 |
   |                    |                 |                   | poll proc.Actor |
   |                    |                 |                   |---------------->|
   |                    |                 |                   |                 |
   |                    |  tryReceive     |                   |                 |
   |                    |<------------------------------------|                 |
   |                    |                 |                   |                 |
   |                    |  return msgs    |                   |                 |
   |                    |------------------------------------>|                 |
   |                    |                 |                   |                 |
   |                    |                 |                   |                 |----.
   |                    |                 |                   |                 |    | Poll(msgs)
   |                    |                 |                   |                 |<---'
,--+---.            ,---+---.          ,--+--.             ,--+---.          ,--+--.
|Router|            |Mailbox|          |ready|             |System|          |Actor|
`------'            `-------'          `-----'             `------'          `-----'
See docs/actor-system.svg for the relationship among System, Actor, Mailbox, and ready.
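The flow in the diagram can be approximated with a small single-goroutine sketch. It is not the package's implementation: `proc`, `router`, `runOnce`, and the channel-based mailbox and ready queue are hypothetical stand-ins chosen to mirror the diagram's labels, and rescheduling, deduplication of ready entries, and worker goroutines are left out.

```go
package main

import "fmt"

// proc pairs a mailbox with its actor. All names in this sketch are hypothetical.
type proc struct {
	mailbox chan int              // buffered channel standing in for Mailbox
	poll    func(msgs []int) bool // stands in for Actor.Poll
}

// tryReceive drains up to max messages from the mailbox without blocking.
func (p *proc) tryReceive(max int) []int {
	var msgs []int
	for len(msgs) < max {
		select {
		case m := <-p.mailbox:
			msgs = append(msgs, m)
		default:
			return msgs
		}
	}
	return msgs
}

// router maps actor IDs to procs and schedules them on the ready queue.
type router struct {
	procs map[uint64]*proc
	ready chan *proc
}

// send delivers msg to the actor's mailbox, then schedules its proc.
func (r *router) send(id uint64, msg int) error {
	p, ok := r.procs[id] // "find proc"
	if !ok {
		return fmt.Errorf("actor %d not found", id)
	}
	select {
	case p.mailbox <- msg: // "proc.Mailbox.Send(msgs)"
	default:
		return fmt.Errorf("mailbox of actor %d is full", id)
	}
	select {
	case r.ready <- p: // "schedule(proc)"
	default:
		return fmt.Errorf("ready queue is full")
	}
	return nil
}

// runOnce plays the System role for one step: fetch a ready proc,
// receive a batch of messages, and poll the actor with them.
// It reports whether a proc was fetched.
func runOnce(r *router, batch int) bool {
	select {
	case p := <-r.ready: // "fetchProc()"
		if msgs := p.tryReceive(batch); len(msgs) > 0 {
			p.poll(msgs) // "Poll(msgs)"; the return value is ignored in this sketch
		}
		return true
	default:
		return false
	}
}

func main() {
	sum := 0
	p := &proc{
		mailbox: make(chan int, 8),
		poll: func(msgs []int) bool {
			for _, m := range msgs {
				sum += m
			}
			return true
		},
	}
	r := &router{procs: map[uint64]*proc{1: p}, ready: make(chan *proc, 8)}
	for i := 1; i <= 3; i++ {
		if err := r.send(1, i); err != nil {
			fmt.Println("send failed:", err)
		}
	}
	for runOnce(r, 64) {
	}
	fmt.Println(sum) // prints 6
}
```

The first `runOnce` call drains all three messages in one batch, so the two later ready entries find an empty mailbox and skip the poll.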
Index ¶
- Constants
- Variables
- func InitLogger(l Logger) error
- func InitMetrics(registry *prometheus.Registry, namespace, subsystem string)
- type Actor
- type ID
- type Logger
- type Mailbox
- type Router
- type System
- type SystemBuilder
- func (b *SystemBuilder[T]) Build() (*System[T], *Router[T])
- func (b *SystemBuilder[T]) RouterChunkCap(routerChunkCap int) *SystemBuilder[T]
- func (b *SystemBuilder[T]) Throughput(actorBatchSize, msgBatchSizePerActor int) *SystemBuilder[T]
- func (b *SystemBuilder[T]) WorkerNumber(numWorker int) *SystemBuilder[T]
Constants ¶
const (
	// DefaultActorBatchSize is the default size of polled actor batch.
	DefaultActorBatchSize = 1
	// DefaultMsgBatchSizePerActor is the default size of receive message batch.
	DefaultMsgBatchSizePerActor = 64
)
Variables ¶
var (
	// ErrActorStopped means that an actor is in stopped state.
	ErrActorStopped = errors.New("actor stopped")
	// ErrActorNotFound means that an actor is not found.
	ErrActorNotFound = errors.New("actor not found")
	// ErrActorDuplicate means that an actor already exists.
	ErrActorDuplicate = errors.New("duplicated actor")
)
Functions ¶
func InitLogger ¶
func InitLogger(l Logger) error
InitLogger sets the logger used for error messages. The initial logger writes to os.Stderr.
func InitMetrics ¶
func InitMetrics(registry *prometheus.Registry, namespace, subsystem string)
InitMetrics registers all metrics of the actor package.
Types ¶
type Actor ¶
type Actor[T any] interface {
	// Poll handles messages that are sent to actor's mailbox.
	//
	// The ctx is only for cancellation, and an actor must be aware of
	// the cancellation.
	//
	// If it returns true, then the actor will be rescheduled and polled later.
	// If it returns false, then the actor will be removed from Router and
	// polled if there are still messages in its mailbox.
	// Once it returns false, it must always return false.
	//
	// We choose message to have a concrete type instead of an interface to save
	// memory allocation.
	Poll(ctx context.Context, msgs []message.Message[T]) (running bool)

	// OnClose is called after Poll returns false,
	// or actor system is stopping and all messages have been received.
	// An actor should release its resources during OnClose.
	//
	// OnClose must be idempotent and nonblocking.
	OnClose()
}
Actor is a universal primitive of concurrent computation. For background, see https://en.wikipedia.org/wiki/Actor_model
type Mailbox ¶
type Mailbox[T any] interface {
	ID() ID
	// Send sends a message to its actor without blocking.
	// Returns ErrMailboxFull when it's full.
	// Returns ErrActorStopped when its actor is closed.
	Send(msg message.Message[T]) error
	// SendB sends a message to its actor, blocks when it's full.
	// Returns ErrActorStopped when its actor is closed.
	// Returns context.Canceled or context.DeadlineExceeded
	// when context is canceled or deadline exceeded.
	SendB(ctx context.Context, msg message.Message[T]) error

	// Receive a message.
	// It must be nonblocking and should only be called by System.
	Receive() (message.Message[T], bool)
	// contains filtered or unexported methods
}
Mailbox sends messages to an actor. Mailbox is thread-safe.
type Router ¶
type Router[T any] struct {
	// contains filtered or unexported fields
}
Router sends messages to actors.
func NewRouter4Test ¶
NewRouter4Test returns a new router. Test only.
func (*Router[T]) Broadcast ¶
Broadcast sends a message to all actors in the router. The message may be dropped if the context is canceled.
func (*Router[T]) InsertMailbox4Test ¶
InsertMailbox4Test adds a mailbox to the router. Test only.
type System ¶
type System[T any] struct {
	// contains filtered or unexported fields
}
System is the runtime of Actors.
type SystemBuilder ¶
type SystemBuilder[T any] struct {
	// contains filtered or unexported fields
}
SystemBuilder is a builder of a system.
func NewSystemBuilder ¶
func NewSystemBuilder[T any](name string) *SystemBuilder[T]
NewSystemBuilder returns a new system builder.
func (*SystemBuilder[T]) Build ¶
func (b *SystemBuilder[T]) Build() (*System[T], *Router[T])
Build builds a system and a router.
func (*SystemBuilder[T]) RouterChunkCap ¶
func (b *SystemBuilder[T]) RouterChunkCap(routerChunkCap int) *SystemBuilder[T]
RouterChunkCap sets the chunk capacity of the system's router.
func (*SystemBuilder[T]) Throughput ¶
func (b *SystemBuilder[T]) Throughput(actorBatchSize, msgBatchSizePerActor int) *SystemBuilder[T]
Throughput sets the per-poll throughput of a system: the size of each polled actor batch and the size of the message batch received per actor.
func (*SystemBuilder[T]) WorkerNumber ¶
func (b *SystemBuilder[T]) WorkerNumber(numWorker int) *SystemBuilder[T]
WorkerNumber sets the number of workers of a system.