actor

package module
v0.0.0-...-2c6bb9c Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jan 16, 2023 License: Apache-2.0 Imports: 14 Imported by: 0

README

actor - A Minimum Go Actor Framework

GoDoc

Features

  • A minimum actor runtime.
  • Run 100k+ actors concurrently in a pool of goroutine (8*GOMAXPROCS).
  • Comprehensive metrics monitoring.

Status

This package is kind of stable, it's currently used by TiFlow project in production.

New features or bug fixes are welcome!

Examples

Ping pong
type pingpong struct {
	peer   actor.ID
	router *actor.Router[int]
}

func (p *pingpong) Poll(ctx context.Context, msgs []message.Message[int]) bool {
	select {
	case <-ctx.Done():
		return false
	default:
	}
	println("recv from peer", p.peer, msgs[0].Value)
	p.router.Send(p.peer, msgs[0])
	return true
}

func (p *pingpong) OnClose() {}

func main() {
	sys, router := actor.NewSystemBuilder[int]("ping-pong").Build()
	ctx := context.Background()
	sys.Start(ctx)

	a1 := &pingpong{peer: actor.ID(2), router: router}
	mb1 := actor.NewMailbox[int](actor.ID(1), 1)
	sys.Spawn(mb1, a1)

	a2 := &pingpong{peer: actor.ID(1), router: router}
	mb2 := actor.NewMailbox[int](actor.ID(2), 2)
	sys.Spawn(mb2, a2)

	// Initiate ping pong.
	router.Send(actor.ID(1), message.ValueMessage(0))

	time.Sleep(3 * time.Second)
	sys.Stop()
}

Documentation

Overview

Package actor provides a simple actor system. It's a framework that can poll many actors concurrently.

The following diagram shows how a system polls actors.

,------.          ,-------.    ,-----.           ,------.          ,-----.
|Router|          |Mailbox|    |ready|           |System|          |Actor|
`--+---'          `---+---'    `--+--'           `--+---'          `--+--'
   |----.             |           |                 |                 |
   |    | Send(msgs)  |           |                 |                 |
   |<---'             |           |                 |                 |
   |                  |           |                 |                 |
   |----.             |           |                 |                 |
   |    | find proc   |           |                 |                 |
   |<---'             |           |                 |                 |
   |                  |           |                 |                 |
   |  proc            |           |                 |                 |
   |    .Mailbox      |           |                 |                 |
   |    .Send(msgs)   |           |                 |                 |
   | ---------------->|           |                 |                 |
   |                  |           |                 |                 |
   |           schedule(proc)     |                 |                 |
   | ---------------------------->|                 |                 |
   |                  |           |                 |                 |
   |                  |           |--.              |                 |
   |                  |           |  | enqueue(proc)|                 |
   |                  |           |<-'              |                 |
   |                  |           |                 |                 |
   |                  |           |    signal()     |                 |
   |                  |           |---------------->|                 |
   |                  |           |                 |                 |
   |                  |           |   fetchProc()   |                 |
   |                  |           |<-----------------                 |
   |                  |           |                 |                 |
   |                  |           |   return proc   |                 |
   |                  |           |---------------->|                 |
   |                  |           |                 |                 |
   |                  |           |                 | poll proc.Actor |
   |                  |           |                 | --------------->|
   |                  |           |                 |                 |
   |                  |           |   tryReceive    |                 |
   |                  |<----- ----------------------------------------|
   |                  |           |                 |                 |
   |                  |           |  return msgs    |                 |
   |                  |------ --------------------------------------->|
   |                  |           |                 |                 | Poll(msgs)
   |                  |           |                 |                 |----.
   |                  |           |                 |                 |    |
   |                  |           |                 |                 |<---'
,--+---.          ,---+---.    ,--+--.           ,--+---.          ,--+--.
|Router|          |Mailbox|    |ready|           |System|          |Actor|
`------'          `-------'    `-----'           `------'          `-----'

See docs/actor-system.svg for the relationship about System, Actor Mailbox and ready.

Index

Constants

View Source
const (

	// DefaultActorBatchSize is the default size of polled actor batch.
	DefaultActorBatchSize = 1
	// DefaultMsgBatchSizePerActor is the default size of receive message batch.
	DefaultMsgBatchSizePerActor = 64
)

Variables

View Source
var (
	// ErrActorStopped means that an actor is in stopped state.
	ErrActorStopped = errors.New("actor stopped")
	// ErrActorNotFound means that an actor is not found.
	ErrActorNotFound = errors.New("actor not found")
	// ErrActorDuplicate means that an actor is already exist.
	ErrActorDuplicate = errors.New("duplicated actor")
)

Functions

func InitLogger

func InitLogger(l Logger) error

InitLogger is used to set the logger for error message. The initial logger is os.Stderr.

func InitMetrics

func InitMetrics(registry *prometheus.Registry, namespace, subsystem string)

InitMetrics registers all metrics of the actor package.

Types

type Actor

type Actor[T any] interface {
	// Poll handles messages that are sent to actor's mailbox.
	//
	// The ctx is only for cancellation, and an actor must be aware of
	// the cancellation.
	//
	// If it returns true, then the actor will be rescheduled and polled later.
	// If it returns false, then the actor will be removed from Router and
	// polled if there are still messages in its mailbox.
	// Once it returns false, it must always return false.
	//
	// We choose message to have a concrete type instead of an interface to save
	// memory allocation.
	Poll(ctx context.Context, msgs []message.Message[T]) (running bool)

	// OnClose is called after Poll returns false,
	// or actor system is stopping and all message has been received.
	// An actor should release its resources during OnClose.
	//
	// OnClose must be idempotent and nonblocking.
	OnClose()
}

Actor is a universal primitive of concurrent computation. See more https://en.wikipedia.org/wiki/Actor_model

type ID

type ID uint64

ID is ID for actors.

type Logger

type Logger interface {
	Printf(format string, v ...any)
	Panicf(format string, v ...any)
}

Logger is used to log error messages.

type Mailbox

type Mailbox[T any] interface {
	ID() ID
	// Send non-blocking send a message to its actor.
	// Returns ErrMailboxFull when it's full.
	// Returns ErrActorStopped when its actor is closed.
	Send(msg message.Message[T]) error
	// SendB sends a message to its actor, blocks when it's full.
	// Returns ErrActorStopped when its actor is closed.
	// Returns context.Canceled or context.DeadlineExceeded
	// when context is canceled or deadline exceeded.
	SendB(ctx context.Context, msg message.Message[T]) error

	// Receive a message.
	// It must be nonblocking and should only be called by System.
	Receive() (message.Message[T], bool)
	// contains filtered or unexported methods
}

Mailbox sends messages to an actor. Mailbox is thread-safe.

func NewMailbox

func NewMailbox[T any](id ID, cap int) Mailbox[T]

NewMailbox creates a fixed capacity mailbox. The minimum capacity is 1.

type Router

type Router[T any] struct {
	// contains filtered or unexported fields
}

Router send messages to actors.

func NewRouter4Test

func NewRouter4Test[T any](name string) *Router[T]

NewRouter4Test returns a new router. Test only.

func (*Router[T]) Broadcast

func (r *Router[T]) Broadcast(ctx context.Context, msg message.Message[T])

Broadcast a message to all actors in the router. The message may be dropped when context is canceled.

func (*Router[T]) InsertMailbox4Test

func (r *Router[T]) InsertMailbox4Test(id ID, mb Mailbox[T])

InsertMailbox4Test add a mailbox into router. Test only.

func (*Router[T]) Send

func (r *Router[T]) Send(id ID, msg message.Message[T]) error

Send a message to an actor. It's a non-blocking send. ErrMailboxFull when the actor full. ErrActorNotFound when the actor not found.

func (*Router[T]) SendB

func (r *Router[T]) SendB(ctx context.Context, id ID, msg message.Message[T]) error

SendB sends a message to an actor, blocks when it's full. ErrActorNotFound when the actor not found. Canceled or DeadlineExceeded when the context is canceled or done.

type System

type System[T any] struct {
	// contains filtered or unexported fields
}

System is the runtime of Actors.

func (*System[T]) Spawn

func (s *System[T]) Spawn(mb Mailbox[T], actor Actor[T]) error

Spawn spawns an actor in the system. Spawn is thread-safe.

func (*System[T]) Start

func (s *System[T]) Start(ctx context.Context)

Start the system. Cancelling the context to stop the system. Start is not thread-safe.

func (*System[T]) Stop

func (s *System[T]) Stop()

Stop the system, cancels all actors. It should be called after Start. Messages sent before this call will be received by actors. Stop is not thread-safe.

type SystemBuilder

type SystemBuilder[T any] struct {
	// contains filtered or unexported fields
}

SystemBuilder is a builder of a system.

func NewSystemBuilder

func NewSystemBuilder[T any](name string) *SystemBuilder[T]

NewSystemBuilder returns a new system builder.

func (*SystemBuilder[T]) Build

func (b *SystemBuilder[T]) Build() (*System[T], *Router[T])

Build builds a system and a router.

func (*SystemBuilder[T]) RouterChunkCap

func (b *SystemBuilder[T]) RouterChunkCap(routerChunkCap int) *SystemBuilder[T]

RouterChunkCap sets router's chunk capacity of a system.

func (*SystemBuilder[T]) Throughput

func (b *SystemBuilder[T]) Throughput(
	actorBatchSize, msgBatchSizePerActor int,
) *SystemBuilder[T]

Throughput sets the throughput per-poll of a system.

func (*SystemBuilder[T]) WorkerNumber

func (b *SystemBuilder[T]) WorkerNumber(numWorker int) *SystemBuilder[T]

WorkerNumber sets the number of workers of a system.

Directories

Path Synopsis

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL