actorsystem

Version: v1.5.6

This package is not in the latest version of its module.
Published: Mar 10, 2022 License: Apache-2.0 Imports: 9 Imported by: 18

README

actor-system

No-nonsense, easily extensible actor system support without the need for service discovery and actor tracking.

Features
  • Multiple actor systems in single service
  • No zipkin, no kafka, no heavy weight framework
  • State aware actors with actor enveloping
  • CSP on actor level, parallel on application level
  • stash and backlog support on actor instance
  • Spray actor or do something else on message receive JIT with ProcessLocalMessage and ProcessRemoteMessage
Simplest Example
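package main

import (
  "fmt"

  system "github.com/jancajthaml-openbank/actor-system"
)

// ActorSystem embeds system.System so its methods are promoted.
type ActorSystem struct {
  system.System
}

// NewActorSystem follows the documented signature
// New(name string, lakeHostname string) (System, error).
func NewActorSystem() (*ActorSystem, error) {
  region := "MyRegion"
  lakeEndpoint := "127.0.0.1"

  sys, err := system.New(region, lakeEndpoint)
  if err != nil {
    return nil, err
  }
  return &ActorSystem{System: sys}, nil
}

// ProcessMessage is called for every message received by the system.
func (s *ActorSystem) ProcessMessage(msg string, to system.Coordinates, from system.Coordinates) {
  fmt.Printf("%+v -> %+v says %s\n", from, to, msg)
}

func main() {
  instance, err := NewActorSystem()
  if err != nil {
    panic(err)
  }

  instance.RegisterOnMessage(instance.ProcessMessage)

  instance.Start()
}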
Messaging

Uses lake relay to relay remote messages.

When a message is received from a remote environment, the function registered by RegisterOnMessage is called. The simplest implementation of such a function would be:

func (s ActorSystem) ProcessMessage(msg string, to system.Coordinates, from system.Coordinates) {

  var message interface{}

  switch msg {

  case "X":
    message = new(XMessage)

  default:
    message = new(DefaultMessage)
  }

  s.ProcessLocalMessage(message, to, from)
}

If you want to spray actors on demand as messages arrive:

func EchoActor(s ActorSystemSupport) func(State, Context) {
  return func(state State, context Context) {
    defer s.UnregisterActor(context.Receiver.Name)
    log.Debugf("Echo actor in %+v received %+v", state, context)
  }
}
func (s ActorSystemSupport) ProcessLocalMessage(msg interface{}, to system.Coordinates, from system.Coordinates) {
  ref, err := s.ActorOf(to)
  if err != nil {
    ref = actor.NewActor(to)
    err = s.RegisterActor(ref, EchoActor(s))
    if err != nil {
      log.Warnf("Unable to register Actor [%s local]", to)
      return
    }
  }
  ref.Tell(msg, from)
}
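
The lookup-then-register step above can race when two messages for the same unknown actor arrive at once. A minimal sketch of doing the lookup and the creation under one lock follows. The actor and registry types and the getOrCreate helper are hypothetical stand-ins written for illustration; they are not part of this package, and how the package guards its own registry is not shown in this README.

```go
package main

import (
	"fmt"
	"sync"
)

// actor and registry are hypothetical stand-ins, not the package's types.
type actor struct {
	name string
}

type registry struct {
	mu     sync.Mutex
	actors map[string]*actor
}

func newRegistry() *registry {
	return &registry{actors: make(map[string]*actor)}
}

// getOrCreate returns the actor registered under name, creating and
// registering it first when it does not exist yet. Holding the lock for the
// whole operation means two concurrent callers never create the same actor twice.
func (r *registry) getOrCreate(name string) (*actor, bool) {
	r.mu.Lock()
	defer r.mu.Unlock()

	if existing, ok := r.actors[name]; ok {
		return existing, false
	}
	created := &actor{name: name}
	r.actors[name] = created
	return created, true
}

func main() {
	r := newRegistry()
	for _, name := range []string{"echo", "echo", "other"} {
		_, created := r.getOrCreate(name)
		fmt.Println(name, "created:", created)
	}
}
```

Only the first message for a given name creates an actor; later messages reuse it.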

You don't need an actor instance to send a message to a remote region; you can use the system's SendMessage method instead.

Lifecycle control

The actor system has a Start method for proper system start and a Stop method for graceful shutdown.
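
One common way to use such a pair is to call Stop when the process receives an interrupt. The sketch below assumes that Start blocks until Stop is called, which this page does not confirm. The lifecycle interface, the demo type, and the helpers newSignalChannel, simulateInterrupt and runUntilSignal are all hypothetical names introduced for illustration.

```go
package main

import (
	"fmt"
	"os"
	"os/signal"
	"syscall"
)

// lifecycle is a hypothetical interface covering the Start and Stop methods.
type lifecycle interface {
	Start()
	Stop()
}

// newSignalChannel returns a channel that receives interrupt and SIGTERM.
func newSignalChannel() chan os.Signal {
	sigs := make(chan os.Signal, 1)
	signal.Notify(sigs, os.Interrupt, syscall.SIGTERM)
	return sigs
}

// simulateInterrupt queues an interrupt so the demo ends without a real Ctrl-C.
func simulateInterrupt(sigs chan<- os.Signal) {
	sigs <- os.Interrupt
}

// runUntilSignal starts l in the background, waits for a signal, then stops l.
func runUntilSignal(l lifecycle, sigs <-chan os.Signal) os.Signal {
	go l.Start()
	sig := <-sigs
	l.Stop()
	return sig
}

// demo is a trivial lifecycle used only to make the sketch runnable.
type demo struct {
	done chan struct{}
}

func (d *demo) Start() { <-d.done }     // blocks until Stop is called
func (d *demo) Stop()  { close(d.done) } // releases Start

func main() {
	sigs := newSignalChannel()
	simulateInterrupt(sigs) // a real service would wait for the operating system instead

	sig := runUntilSignal(&demo{done: make(chan struct{})}, sigs)
	fmt.Println("stopped after", sig)
}
```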

License

Licensed under Apache 2.0; see LICENSE.md for details.

Contributing

  1. Fork it
  2. Create your feature branch (git checkout -b feature/my-new-feature)
  3. Commit your changes (git commit -am 'Add some feature')
  4. Push to the branch (git push origin feature/my-new-feature)
  5. Create new Pull Request

Responsible Disclosure

I take the security of my systems seriously, and I value input from the security community. The disclosure of security vulnerabilities helps me ensure the security and integrity of my systems. If you believe you've found a security vulnerability in one of my systems or services please tell me via email.

Author

Jan Cajthaml (a.k.a johnny)

Documentation

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

func BytesToString added in v1.4.2

func BytesToString(bytes []byte) string

BytesToString casts a []byte to string without copying the underlying data, so if the original slice is later modified, the string changes as well.
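
The aliasing described above can be seen in a small sketch. The bytesToString helper is a hypothetical stand-in that assumes the conversion is done with an unsafe pointer cast; the package's actual implementation may differ.

```go
package main

import (
	"fmt"
	"unsafe"
)

// bytesToString is a hypothetical stand-in for BytesToString. It reinterprets
// the slice header as a string header, so no bytes are copied.
func bytesToString(b []byte) string {
	return *(*string)(unsafe.Pointer(&b))
}

func main() {
	buf := []byte("hello")
	s := bytesToString(buf)

	// s shares memory with buf, so a write through buf is visible in s.
	buf[0] = 'j'
	fmt.Println(s)

	// A regular conversion copies the bytes and is unaffected by later writes.
	copied := string(buf)
	buf[0] = 'h'
	fmt.Println(copied)
}
```

Use the zero-copy form only when the slice is not written to while the string is still in use.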

func StringToBytes added in v1.4.2

func StringToBytes(s string) []byte

StringToBytes casts a string to []byte without copying the underlying data. The returned slice shares memory with the string, so it must be treated as read-only.
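
A minimal sketch of the reverse conversion, assuming Go 1.20 or newer for unsafe.StringData and unsafe.Slice. The stringToBytes helper is a hypothetical stand-in and may not match how the package implements StringToBytes.

```go
package main

import (
	"fmt"
	"unsafe"
)

// stringToBytes is a hypothetical stand-in for StringToBytes. The returned
// slice points at the string's own bytes, so nothing is copied.
func stringToBytes(s string) []byte {
	if s == "" {
		return nil
	}
	return unsafe.Slice(unsafe.StringData(s), len(s))
}

func main() {
	b := stringToBytes("hello")

	// Reading through the slice is fine.
	fmt.Println(len(b), string(b))

	// Writing through it (for example b[0] = 'j') is not: Go strings are
	// immutable, and string literals may live in read-only memory.
}
```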

Types

type Actor added in v1.2.0

type Actor struct {
	Name string

	Backlog chan Context
	Exit    chan interface{}
	// contains filtered or unexported fields
}

Actor represents single actor

func NewActor added in v1.2.0

func NewActor(name string, receive ReceiverFunction) *Actor

NewActor returns new actor instance

func (*Actor) Receive added in v1.2.0

func (ref *Actor) Receive(msg Context)

Receive dequeues message to actor

func (*Actor) String added in v1.2.0

func (ref *Actor) String() string

func (*Actor) Tell added in v1.2.0

func (ref *Actor) Tell(data interface{}, receiver Coordinates, sender Coordinates) (err error)

Tell queues message to actor
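
How Tell and a backlog channel can work together is sketched below. The envelope and mailbox types are hypothetical stand-ins for Context and Actor, and reporting a full backlog as an error instead of blocking is an assumption, not documented behaviour of this package.

```go
package main

import (
	"errors"
	"fmt"
)

// envelope and mailbox are hypothetical stand-ins for Context and Actor.
type envelope struct {
	Data   interface{}
	Sender string
}

type mailbox struct {
	Name    string
	Backlog chan envelope
}

var errBacklogFull = errors.New("backlog full")

// tell enqueues a message without blocking; a full backlog is reported
// to the caller as an error.
func (m *mailbox) tell(data interface{}, sender string) error {
	select {
	case m.Backlog <- envelope{Data: data, Sender: sender}:
		return nil
	default:
		return errBacklogFull
	}
}

// run hands queued messages to receive one at a time until the backlog is
// closed, so a single actor never processes two messages concurrently.
func (m *mailbox) run(receive func(envelope)) {
	for msg := range m.Backlog {
		receive(msg)
	}
}

func main() {
	m := &mailbox{Name: "echo", Backlog: make(chan envelope, 2)}

	done := make(chan struct{})
	go func() {
		m.run(func(e envelope) {
			fmt.Printf("%s got %v from %s\n", m.Name, e.Data, e.Sender)
		})
		close(done)
	}()

	_ = m.tell("ping", "alice")
	_ = m.tell("pong", "bob")
	close(m.Backlog)
	<-done
}
```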

type Context

type Context struct {
	Data     interface{}
	Self     *Actor
	Receiver Coordinates
	Sender   Coordinates
}

Context represents actor message envelope

type Coordinates

type Coordinates struct {
	Name   string
	Region string
}

Coordinates represents actor namespace

func (Coordinates) String

func (ref Coordinates) String() string

type ProcessMessage added in v1.0.1

type ProcessMessage func(msg string, to Coordinates, from Coordinates)

ProcessMessage is a function signature definition for remote message processing

type Pusher added in v1.2.2

type Pusher struct {
	Data chan string
	// contains filtered or unexported fields
}

Pusher holds PUSH socket wrapper

func NewPusher added in v1.2.2

func NewPusher(host string) Pusher

NewPusher returns new PUSH worker connected to host

func (*Pusher) Start added in v1.2.2

func (s *Pusher) Start() error

Start creates PUSH socket and relays all Data chan to that socket

func (*Pusher) Stop added in v1.2.2

func (s *Pusher) Stop()

Stop closes socket and waits for zmq to terminate

type ReceiverFunction added in v1.5.0

type ReceiverFunction func(data Context) ReceiverFunction

ReceiverFunction is function that processes Context and transitions state
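
A function type that returns its own type lets each message handler choose the handler for the next message. The sketch below shows that pattern with a two-state door. The message, receiver and door types are hypothetical stand-ins, not the package's Context and ReceiverFunction.

```go
package main

import "fmt"

// message and receiver are hypothetical stand-ins for Context and
// ReceiverFunction; they are not the package's own types.
type message struct {
	Data string
}

type receiver func(msg message) receiver

// door counts how many pushes actually opened it.
type door struct {
	opened int
}

// locked ignores pushes and moves to unlocked on "unlock".
func (d *door) locked(msg message) receiver {
	if msg.Data == "unlock" {
		return d.unlocked
	}
	return d.locked
}

// unlocked counts pushes and moves back to locked on "lock".
func (d *door) unlocked(msg message) receiver {
	switch msg.Data {
	case "push":
		d.opened++
	case "lock":
		return d.locked
	}
	return d.unlocked
}

// run feeds every input to the current state and adopts the state it returns.
func run(start receiver, inputs ...string) {
	state := start
	for _, in := range inputs {
		state = state(message{Data: in})
	}
}

func main() {
	d := &door{}
	run(d.locked, "push", "unlock", "push", "push", "lock", "push")
	fmt.Println("door opened", d.opened, "times") // prints: door opened 2 times
}
```

The first and last push arrive while the door is locked, so only the two pushes in between are counted.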

type Subber added in v1.2.2

type Subber struct {
	Data chan string
	// contains filtered or unexported fields
}

Subber holds SUB socket wrapper

func NewSubber added in v1.2.2

func NewSubber(host string, topic string) Subber

NewSubber returns new SUB worker connected to host

func (*Subber) Start added in v1.2.2

func (s *Subber) Start() error

Start creates SUB socket and relays all data from it to Data channel

func (*Subber) Stop added in v1.2.2

func (s *Subber) Stop()

Stop closes socket and waits for zmq to terminate

type System added in v1.0.1

type System struct {
	Name string
	// contains filtered or unexported fields
}

System provides support for graceful shutdown

func New added in v1.2.0

func New(name string, lakeHostname string) (System, error)

New returns a new actor system facade

func (*System) ActorOf added in v1.0.1

func (s *System) ActorOf(name string) (*Actor, error)

ActorOf returns the actor reference by name

func (*System) RegisterActor added in v1.0.1

func (s *System) RegisterActor(ref *Actor) error

RegisterActor registers a new actor in the actor system

func (*System) RegisterOnMessage added in v1.0.1

func (s *System) RegisterOnMessage(f ProcessMessage)

RegisterOnMessage registers a callback invoked on message receive

func (*System) SendMessage added in v1.0.1

func (s *System) SendMessage(msg string, to Coordinates, from Coordinates)

SendMessage sends a message to a local or remote actor system

func (*System) Start added in v1.0.1

func (s *System) Start()

Start spins PUSH and SUB workers

func (*System) Stop added in v1.0.1

func (s *System) Stop()

Stop terminates work

func (*System) UnregisterActor added in v1.0.1

func (s *System) UnregisterActor(name string)

UnregisterActor stops actor and removes it from actor system
