actor

package
v0.0.0-...-ea15551 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: May 20, 2023 License: MIT Imports: 21 Imported by: 3

Documentation

Index

Constants

View Source
const LocalLookupAddr = "local"

Variables

View Source
var (
	ErrInvalidLength        = fmt.Errorf("proto: negative length found during unmarshaling")
	ErrIntOverflow          = fmt.Errorf("proto: integer overflow")
	ErrUnexpectedEndOfGroup = fmt.Errorf("proto: unexpected end of group")
)
View Source
var File_actor_actor_proto protoreflect.FileDescriptor
View Source
var LOCK_OS_THREAD = true

Functions

This section is empty.

Types

type ActivationEvent

type ActivationEvent struct {
	PID *PID
}

ActivationEvent is broadcasted over the EventStream each time a Receiver is spawned and activated. This mean at the point of receiving this event the Receiver is ready to process messages.

type Config

type Config struct {
	// PIDSeparator separates a process ID when printed out
	// in a string representation. The default separator is "/".
	// pid := NewPID("127.0.0.1:4000", "foo", "bar")
	// 127.0.0.1:4000/foo/bar
	PIDSeparator string
}

Config holds configuration for the actor Engine.

type Context

type Context struct {
	// contains filtered or unexported fields
}

func (*Context) Child

func (c *Context) Child(id string) *PID

Child will return the PID of the child (if any) by the given name/id. PID will be nil if it could not find it.

func (*Context) Children

func (c *Context) Children() []*PID

Children returns all child PIDs for the current process.

func (*Context) Engine

func (c *Context) Engine() *Engine

Engine returns a pointer to the underlying Engine.

func (*Context) Forward

func (c *Context) Forward(pid *PID)

Forward will forward the current received message to the given PID. This will also set the "forwarder" as the sender of the message.

func (*Context) GetPID

func (c *Context) GetPID(name string, tags ...string) *PID

GetPID returns the PID of the process found by the given name and tags. Returns nil when it could not find any process..

func (*Context) Message

func (c *Context) Message() any

Message returns the message that is currently being received.

func (*Context) PID

func (c *Context) PID() *PID

PID returns the PID of the process that belongs to the context.

func (*Context) Parent

func (c *Context) Parent() *PID

Parent returns the PID of the process that spawned the current process.

func (*Context) Receiver

func (c *Context) Receiver() Receiver

Receiver returns the underlying receiver of this Context.

func (*Context) Request

func (c *Context) Request(pid *PID, msg any, timeout time.Duration) *Response

See Engine.Request for information. This is just a helper function doing that calls Request on the underlying Engine. c.Engine().Request().

func (*Context) Respond

func (c *Context) Respond(msg any)

Respond will sent the given message to the sender of the current received message.

func (*Context) Send

func (c *Context) Send(pid *PID, msg any)

Send will send the given message to the given PID. This will also set the sender of the message to the PID of the current Context. Hence, the receiver of the message can call Context.Sender() to know the PID of the process that sent this message.

func (*Context) SendRepeat

func (c *Context) SendRepeat(pid *PID, msg any, interval time.Duration) SendRepeater

SendRepeat will send the given message to the given PID each given interval. It will return a SendRepeater struct that can stop the repeating message by calling Stop().

func (*Context) Sender

func (c *Context) Sender() *PID

Sender, when available, returns the PID of the process that sent the current received message.

func (*Context) SpawnChild

func (c *Context) SpawnChild(p Producer, name string, opts ...OptFunc) *PID

SpawnChild will spawn the given Producer as a child of the current Context. If the parent process dies, all the children will be automatically shutdown gracefully. Hence, all children will receive the Stopped message.

func (*Context) SpawnChildFunc

func (c *Context) SpawnChildFunc(f func(*Context), name string, opts ...OptFunc) *PID

SpawnChildFunc spawns the given function as a child Receiver of the current Context.

type DeadLetterEvent

type DeadLetterEvent struct {
	Target  *PID
	Message any
	Sender  *PID
}

DeadLetterEvent is broadcasted over the EventStream each time a message cannot be delivered to the target PID.

type Engine

type Engine struct {
	EventStream *EventStream
	Registry    *Registry
	// contains filtered or unexported fields
}

Engine represents the actor engine.

func NewEngine

func NewEngine(cfg ...Config) *Engine

NewEngine returns a new actor Engine.

func (*Engine) Address

func (e *Engine) Address() string

Address returns the address of the actor engine. When there is no remote configured, the "local" address will be used, otherwise the listen address of the remote.

func (*Engine) Poison

func (e *Engine) Poison(pid *PID, wg ...*sync.WaitGroup)

Poison will send a poisonPill to the process that is associated with the given PID. The process will shut down once it processed all its messages before the poisonPill was received. If given a WaitGroup, you can wait till the process is completely shutdown.

func (*Engine) Request

func (e *Engine) Request(pid *PID, msg any, timeout time.Duration) *Response

Request sends the given message to the given PID as a "Request", returning a response that will resolve in the future. Calling Response.Result() will block until the deadline is exceeded or the response is being resolved.

func (*Engine) Send

func (e *Engine) Send(pid *PID, msg any)

Send sends the given message to the given PID. If the message cannot be delivered due to the fact that the given process is not registered. The message will be send to the DeadLetter process instead.

func (*Engine) SendLocal

func (e *Engine) SendLocal(pid *PID, msg any, sender *PID)

func (*Engine) SendRepeat

func (e *Engine) SendRepeat(pid *PID, msg any, interval time.Duration) SendRepeater

SendRepeat will send the given message to the given PID each given interval. It will return a SendRepeater struct that can stop the repeating message by calling Stop().

func (*Engine) SendWithSender

func (e *Engine) SendWithSender(pid *PID, msg any, sender *PID)

SendWithSender will send the given message to the given PID with the given sender. Receivers receiving this message can check the sender by calling Context.Sender().

func (*Engine) Spawn

func (e *Engine) Spawn(p Producer, name string, opts ...OptFunc) *PID

Spawn spawns a process that will producer by the given Producer and can be configured with the given opts.

func (*Engine) SpawnFunc

func (e *Engine) SpawnFunc(f func(*Context), id string, opts ...OptFunc) *PID

func (*Engine) SpawnProc

func (e *Engine) SpawnProc(p Processor) *PID

SpawnProc spawns the give Processor. This function is usefull when working with custom created Processes. Take a look at the streamWriter as an example.

func (*Engine) WithRemote

func (e *Engine) WithRemote(r Remoter)

WithRemote returns a new actor Engine with the given Remoter, and will call its Start function

type Envelope

type Envelope struct {
	Msg    any
	Sender *PID
}

type EventStream

type EventStream struct {
	// contains filtered or unexported fields
}

func NewEventStream

func NewEventStream() *EventStream

func (*EventStream) Len

func (e *EventStream) Len() int

func (*EventStream) Publish

func (e *EventStream) Publish(msg any)

func (*EventStream) Subscribe

func (e *EventStream) Subscribe(f EventStreamFunc) *EventSub

func (*EventStream) Unsubscribe

func (e *EventStream) Unsubscribe(sub *EventSub)

type EventStreamFunc

type EventStreamFunc func(event any)

type EventSub

type EventSub struct {
	// contains filtered or unexported fields
}

type Inbox

type Inbox struct {
	// contains filtered or unexported fields
}

func NewInbox

func NewInbox(size int) *Inbox

func (*Inbox) Consume

func (in *Inbox) Consume(msgs []Envelope)

func (*Inbox) Send

func (in *Inbox) Send(msg Envelope)

func (*Inbox) Start

func (in *Inbox) Start(proc Processor)

func (*Inbox) Stop

func (in *Inbox) Stop() error

type Inboxer

type Inboxer interface {
	Send(Envelope)
	Start(Processor)
	Stop() error
}

type Initialized

type Initialized struct{}

type InternalError

type InternalError struct {
	From string
	Err  error
}

type MiddlewareFunc

type MiddlewareFunc = func(ReceiveFunc) ReceiveFunc

type OptFunc

type OptFunc func(*Opts)

func WithInboxSize

func WithInboxSize(size int) OptFunc

func WithMaxRestarts

func WithMaxRestarts(n int) OptFunc

func WithMiddleware

func WithMiddleware(mw ...MiddlewareFunc) OptFunc

func WithRestartDelay

func WithRestartDelay(d time.Duration) OptFunc

func WithTags

func WithTags(tags ...string) OptFunc

type Opts

type Opts struct {
	Producer     Producer
	Name         string
	Tags         []string
	MaxRestarts  int32
	RestartDelay time.Duration
	InboxSize    int
	Middleware   []MiddlewareFunc
}

func DefaultOpts

func DefaultOpts(p Producer) Opts

DefaultOpts returns default options from the given Producer.

type PID

type PID struct {
	Address string `protobuf:"bytes,1,opt,name=address,proto3" json:"address,omitempty"`
	ID      string `protobuf:"bytes,2,opt,name=ID,proto3" json:"ID,omitempty"`
	// contains filtered or unexported fields
}

func NewPID

func NewPID(address, id string, tags ...string) *PID

NewPID returns a new Process ID given an address, name, and optional tags. TODO(@charlesderek) Can we even optimize this more?

func (*PID) Child

func (pid *PID) Child(id string, tags ...string) *PID

func (*PID) CloneMessageVT

func (m *PID) CloneMessageVT() proto.Message

func (*PID) CloneVT

func (m *PID) CloneVT() *PID

func (*PID) Descriptor deprecated

func (*PID) Descriptor() ([]byte, []int)

Deprecated: Use PID.ProtoReflect.Descriptor instead.

func (*PID) EqualMessageVT

func (this *PID) EqualMessageVT(thatMsg proto.Message) bool

func (*PID) EqualVT

func (this *PID) EqualVT(that *PID) bool

func (*PID) Equals

func (pid *PID) Equals(other *PID) bool

func (*PID) GetAddress

func (x *PID) GetAddress() string

func (*PID) GetID

func (x *PID) GetID() string

func (*PID) HasTag

func (pid *PID) HasTag(tag string) bool

func (*PID) LookupKey

func (pid *PID) LookupKey() uint64

func (*PID) MarshalToSizedBufferVT

func (m *PID) MarshalToSizedBufferVT(dAtA []byte) (int, error)

func (*PID) MarshalToSizedBufferVTStrict

func (m *PID) MarshalToSizedBufferVTStrict(dAtA []byte) (int, error)

func (*PID) MarshalToVT

func (m *PID) MarshalToVT(dAtA []byte) (int, error)

func (*PID) MarshalToVTStrict

func (m *PID) MarshalToVTStrict(dAtA []byte) (int, error)

func (*PID) MarshalVT

func (m *PID) MarshalVT() (dAtA []byte, err error)

func (*PID) MarshalVTStrict

func (m *PID) MarshalVTStrict() (dAtA []byte, err error)

func (*PID) ProtoMessage

func (*PID) ProtoMessage()

func (*PID) ProtoReflect

func (x *PID) ProtoReflect() protoreflect.Message

func (*PID) Reset

func (x *PID) Reset()

func (*PID) SizeVT

func (m *PID) SizeVT() (n int)

func (*PID) String

func (pid *PID) String() string

func (*PID) UnmarshalVT

func (m *PID) UnmarshalVT(dAtA []byte) error

type Processor

type Processor interface {
	Start()
	PID() *PID
	Send(*PID, any, *PID)
	Invoke([]Envelope)
	Shutdown(*sync.WaitGroup)
}

Processor is an interface the abstracts the way a process behaves.

type Producer

type Producer func() Receiver

Producer is any function that can return a Receiver

func NewTestProducer

func NewTestProducer(t *testing.T, f TestReceiveFunc) Producer

type ReceiveFunc

type ReceiveFunc = func(*Context)

type Receiver

type Receiver interface {
	Receive(*Context)
}

Receiver is an interface that can receive and process messages.

type Registry

type Registry struct {
	// contains filtered or unexported fields
}

func (*Registry) Remove

func (r *Registry) Remove(pid *PID)

type Remoter

type Remoter interface {
	Address() string
	Send(*PID, any, *PID)
	Start()
}

type Response

type Response struct {
	// contains filtered or unexported fields
}

func NewResponse

func NewResponse(e *Engine, timeout time.Duration) *Response

func (*Response) Invoke

func (r *Response) Invoke([]Envelope)

func (*Response) PID

func (r *Response) PID() *PID

func (*Response) Result

func (r *Response) Result() (any, error)

func (*Response) Send

func (r *Response) Send(_ *PID, msg any, _ *PID)

func (*Response) Shutdown

func (r *Response) Shutdown(_ *sync.WaitGroup)

func (*Response) Start

func (r *Response) Start()

type SendRepeater

type SendRepeater struct {
	// contains filtered or unexported fields
}

func (SendRepeater) Stop

func (sr SendRepeater) Stop()

type Started

type Started struct{}

type Stopped

type Stopped struct{}

type TerminationEvent

type TerminationEvent struct {
	PID *PID
}

TerminationEvent is broadcasted over the EventStream each time a process is terminated.

type TestReceiveFunc

type TestReceiveFunc func(*testing.T, *Context)

type TestReceiver

type TestReceiver struct {
	OnReceive TestReceiveFunc
	// contains filtered or unexported fields
}

func (*TestReceiver) Receive

func (r *TestReceiver) Receive(ctx *Context)

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL