observation

package module
v0.0.0-...-7e64b0f Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: May 15, 2018 License: MIT Imports: 3 Imported by: 0

README

Proto Actor [Go] - Observation

Go Report Card

Go package that provides a mechanism for actors to receive notifications about changes in other actors.

The idea is based on Microsoft Orleans Observers.

Get started

Install package:

go get github.com/artyomturkin/protoactor-go-observation

An actor that will be observed must embed the observation.Mixin struct and implement the observation.Observable interface by providing a CurrentObservation method:

// ExampleObservableActor is an observable actor: it embeds observation.Mixin
// and stores the most recent string it received in data.
type ExampleObservableActor struct {
    // Mixin provides the Notify method called from Receive.
    observation.Mixin

    // data is the value returned by CurrentObservation.
    data string
}

// CurrentObservation returns the actor's current data. It is the exported
// method required by the observation.Observable interface.
func (o *ExampleObservableActor) CurrentObservation() interface{} {
    return o.data
}

The actor then sends notifications about state changes by calling Notify:

// Receive stores an incoming string message as the actor's data and passes
// the new value to Notify. Messages of any other type are left untouched by
// this handler.
func (o *ExampleObservableActor) Receive(ctx actor.Context) {
    text, isString := ctx.Message().(string)
    if !isString {
        return
    }

    o.data = text
    o.Notify(o.data)
}

To subscribe an observer actor to notifications from an observable actor, send the observable actor an &observation.Subscribe message containing the observer's PID:

// Ask the observable actor to add observer to its notification pool.
observable.Tell(&observation.Subscribe{
    PID: observer,
})

The observer receives observation data as an *observation.Observation message:

// Receive prints each *observation.Observation message delivered to the
// observer; messages of other types are not handled here.
func (p *Observer) Receive(ctx actor.Context) {
    if obs, ok := ctx.Message().(*observation.Observation); ok {
        fmt.Printf("received observation %v\n", obs)
    }
}

Full Example

package main

import (
    "fmt"
    "time"

    "github.com/AsynkronIT/protoactor-go/actor"
    "github.com/artyomturkin/protoactor-go-observation"
)

// ExampleObservableActor is a simple observable actor for tests. It embeds
// observation.Mixin and stores the most recent string it received in data.
type ExampleObservableActor struct {
    // Mixin provides the Notify method called from Receive.
    observation.Mixin

    // data is the value returned by CurrentObservation.
    data string
}

// CurrentObservation returns the actor's current data. It is the exported
// method required by the observation.Observable interface.
func (o *ExampleObservableActor) CurrentObservation() interface{} {
    return o.data
}

// Compile-time check that *ExampleObservableActor implements the
// observation.Observable interface.
var _ observation.Observable = (*ExampleObservableActor)(nil)

// Receive reacts to string messages only: the string becomes the actor's
// data and is then handed to Notify. Any other message type falls through
// without action in this handler.
func (o *ExampleObservableActor) Receive(ctx actor.Context) {
    if text, ok := ctx.Message().(string); ok {
        o.data = text
        o.Notify(o.data)
    }
}

// Printer is an observer actor that prints the observations it receives.
type Printer struct{}

// Receive prints every *observation.Observation message it is given;
// messages of any other type are skipped.
func (p *Printer) Receive(ctx actor.Context) {
    switch obs := ctx.Message().(type) {
    case *observation.Observation:
        fmt.Printf("received observation with value %v\n", obs)
    }
}

// main wires the example together: it spawns an observable actor and a
// printer, subscribes the printer, sends one string while subscribed and
// another after unsubscribing, waits briefly, and stops both actors.
func main() {
    newObservable := func() actor.Actor { return &ExampleObservableActor{} }
    newPrinter := func() actor.Actor { return &Printer{} }

    // Only the observable actor is wrapped with the observation middleware.
    observableProps := actor.FromProducer(newObservable).WithMiddleware(observation.Middleware)
    printerProps := actor.FromProducer(newPrinter)

    subject, err := actor.SpawnNamed(observableProps, "example.observable.actor")
    if err != nil {
        fmt.Printf("failed to spawn observable actor: %v\n", err)
        panic(err)
    }

    watcher, err := actor.SpawnNamed(printerProps, "printer")
    if err != nil {
        fmt.Printf("failed to spawn observer actor: %v\n", err)
        panic(err)
    }

    // "hello" is sent while the printer is subscribed; "hello again" is sent
    // after the printer has been unsubscribed.
    subject.Tell(&observation.Subscribe{PID: watcher})
    subject.Tell("hello")
    subject.Tell(&observation.Unsubscribe{PID: watcher})
    subject.Tell("hello again")

    // Message delivery is asynchronous, so pause before shutting down.
    time.Sleep(time.Millisecond)

    // Stop both actors.
    subject.GracefulStop()
    watcher.GracefulStop()
}

Outputs:

received observation with value &{example.observable.actor }
received observation with value &{example.observable.actor hello}

Documentation

Index

Examples

Constants

This section is empty.

Variables

This section is empty.

Functions

func Middleware

func Middleware(next actor.ActorFunc) actor.ActorFunc

Middleware initializes the observable and manages its subscriptions.

Types

type Mixin

// Mixin is a struct that actors embed to gain the Name and Notify methods.
type Mixin struct {
	// contains filtered or unexported fields
}

Mixin adds support for tracking observers and notifying them about changes.

func (*Mixin) Name

func (m *Mixin) Name() string

Name returns the observable's name.

func (*Mixin) Notify

func (m *Mixin) Notify(o interface{})

Notify sends observation data to observers.

type Observable

// Observable is the interface implemented by actors that can be observed.
type Observable interface {
	// CurrentObservation returns the actor's current observation data.
	// NOTE(review): presumably this value is sent to a new subscriber (the
	// example output shows an observation arriving before any Notify call);
	// confirm against the middleware implementation.
	CurrentObservation() interface{}
	// contains filtered or unexported methods
}

Observable is the interface that an actor must implement to be observable.

Example
package main

import (
	"fmt"
	"time"

	"github.com/AsynkronIT/protoactor-go/actor"
	"github.com/artyomturkin/protoactor-go-observation"
)

// ExampleObservableActor is a simple observable actor for tests. It embeds
// observation.Mixin and stores the most recent string it received in data.
type ExampleObservableActor struct {
	// Mixin provides the Notify method called from Receive.
	observation.Mixin

	// data is the value returned by CurrentObservation.
	data string
}

// CurrentObservation returns the actor's current data. It is the exported
// method required by the observation.Observable interface.
func (o *ExampleObservableActor) CurrentObservation() interface{} {
	return o.data
}

// Compile-time check that *ExampleObservableActor implements the
// observation.Observable interface.
var _ observation.Observable = (*ExampleObservableActor)(nil)

// Receive stores an incoming string message as the actor's data and passes
// the new value to Notify. Messages of any other type are left untouched by
// this handler.
func (o *ExampleObservableActor) Receive(ctx actor.Context) {
	text, isString := ctx.Message().(string)
	if !isString {
		return
	}

	o.data = text
	o.Notify(o.data)
}

// Printer is an observer actor that prints the observations it receives.
type Printer struct{}

// Receive prints every *observation.Observation message it is given;
// messages of any other type are skipped.
func (p *Printer) Receive(ctx actor.Context) {
	switch obs := ctx.Message().(type) {
	case *observation.Observation:
		fmt.Printf("received observation with value %v\n", obs)
	}
}

// main runs the example end to end: it spawns an observable actor and a
// printer, subscribes the printer, sends one string while subscribed and
// another after unsubscribing, waits briefly, and stops both actors.
func main() {
	makeObservable := func() actor.Actor {
		return &ExampleObservableActor{}
	}
	makePrinter := func() actor.Actor {
		return &Printer{}
	}

	// Only the observable actor is wrapped with the observation middleware.
	observableProps := actor.FromProducer(makeObservable).WithMiddleware(observation.Middleware)
	printerProps := actor.FromProducer(makePrinter)

	observable, err := actor.SpawnNamed(observableProps, "example.observable.actor")
	if err != nil {
		fmt.Printf("failed to spawn observable actor: %v\n", err)
		panic(err)
	}

	printer, err := actor.SpawnNamed(printerProps, "printer")
	if err != nil {
		fmt.Printf("failed to spawn observer actor: %v\n", err)
		panic(err)
	}

	// "hello" is sent while the printer is subscribed; "hello again" is sent
	// after the printer has been unsubscribed.
	subscribe := &observation.Subscribe{PID: printer}
	observable.Tell(subscribe)
	observable.Tell("hello")

	unsubscribe := &observation.Unsubscribe{PID: printer}
	observable.Tell(unsubscribe)
	observable.Tell("hello again")

	// Message delivery is asynchronous, so pause before shutting down.
	time.Sleep(time.Millisecond)

	// Stop both actors.
	observable.GracefulStop()
	printer.GracefulStop()
}
Output:

received observation with value &{example.observable.actor }
received observation with value &{example.observable.actor hello}

type Observation

// Observation is the message observers receive; it pairs observed data with
// its producer.
type Observation struct {
	// Producer identifies the actor the observation came from; in the example
	// output it holds the observable's spawn name.
	Producer string
	// Data is the observed value.
	Data     interface{}
}

Observation holds the observed data and identifies the producer it came from.

type Subscribe

// Subscribe is the message sent to an observable actor to register an observer.
type Subscribe struct {
	// PID is the observer to add to the notification pool.
	PID *actor.PID
}

Subscribe adds PID to the notification pool.

type Unsubscribe

// Unsubscribe is the message sent to an observable actor to deregister an observer.
type Unsubscribe struct {
	// PID is the observer to remove from the notification pool.
	PID *actor.PID
}

Unsubscribe removes PID from the notification pool.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL