actress

package module
v0.0.0-...-3d43bcc Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Feb 17, 2024 License: AGPL-3.0 Imports: 22 Imported by: 0

README

Actress

A Concurrent Actor framework written in Go.

Overview

Create custom processes where what the processes do are either your own piece of code, or it can be a command called from the Operating system. The processes can communicate by sending events to pass the result from one processes to the next for further processing, or by chaining together process as workflows to create a series of Events that together will provide some end result.

Processes

A process are like a module capable of performing a specific tasks. The nature of the process is determined by an EventType and a Function attached to each process. A process have an InCh for receiving events, and an AddEvent for sending Events. The processes can themselves spawn new processes. Processes can also send Event messages to other processes.

A process can hold state within the Process Function.

Events

To initiate and trigger the execution of the process's function, we send events. Each process has its own unique event name. Events serve as communication channels within the system. They can carry data, either with the result of something a process did to pass it on to the next process for further processing, instructions for what a process should do, or both. An event can contain a chain of events to create workflows of what do do and in what order by using the NextEvent feature (see examples for usage).

type Event struct {
    // EventType is a unique name to identify the type of the event.
    EventType EventType 
    // Cmd is usually used for giving instructions or parameters for
    // what an event shall do.
    Cmd       []string
    // Data usually carries the data from one process to the next. Example
    // could be a file read on process1 is put in the Data field, and
    // passed on to process2 to be unmarshaled.
    Data      []byte
    // Err is used for defining the error message when the event is used
    // as an error event.
    Err       error
    // NextEvent defines a series of events to be executed like a workflow.
    // The receiving process should check this field for what kind of event
    // to create as the next step in the workflow.    
    NextEvent *Event 
}
Event Functions

Event Functions holds the logic for what a process shall do when an event is received, and what to do with the data the event carries. The Event functions are callback functions that are executed when a process are created.

The programmer can decide if the Process Function should depend on the input from the input channel of the process, or just continously do some work on it's own. For an event function to be triggered to work on events it should hold a for loop that listens on the Process InCh for new Events.

Examples

Check out the test files for examples for how to define an Event and it's Process function, or for more complete examples check out the examples folder.

Quick start
package main

import (
    "context"
    "fmt"
    "log"
    "strings"
    "time"

    "github.com/postmannen/actress"
)

func main() {
    ctx, cancel := context.WithCancel(context.Background())
    defer cancel()

    // Create a new root process.
    rootAct := actress.NewRootProcess(ctx)
    // Create a test channel where we receive the end result.
    testCh := make(chan string)

    // Define two event types for two processes.
    const ETTest1 actress.EventType = "ETTest1"
    const ETTest2 actress.EventType = "ETTest2"

    // Define the first function that will be attached to the ETTest1 EventType process.
    test1Func := func(ctx context.Context, p *actress.Process) func() {
        fn := func() {
            for {
                select {
                case ev := <-p.InCh:
                    upper := strings.ToUpper(string(ev.Data))
                    // Pass on the processing to the next process, and use the NextEvent we have specified in main
                    // for the EventType, and add the result of ToUpper to the data field.
                    p.AddEvent(actress.Event{EventType: ev.NextEvent.EventType, Data: []byte(upper)})
                case <-ctx.Done():
                    return
                }
            }
        }
        return fn
    }

    // Define the second function that will be attached to the ETTest2 EventType process.
    test2Func := func(ctx context.Context, p *actress.Process) func() {
        fn := func() {
            for {
                select {
                case result := <-p.InCh:
                    dots := string(result.Data) + "..."

                    // Send the result on the testCh so we are able to to receive it in main().
                    testCh <- string(dots)

                    // Also create an informational error message.
                    p.AddError(actress.Event{EventType: actress.ERDebug, Err: fmt.Errorf("info: done with the acting")})

                case <-ctx.Done():
                    return
                }
            }
        }
        return fn
    }

    // Register the event types and event function to processes.
    actress.NewProcess(ctx, *rootAct, ETTest1, test1Func).Act()
    actress.NewProcess(ctx, *rootAct, ETTest2, test2Func).Act()

    // Start all the registered processes.
    err := rootAct.Act()
    if err != nil {
        log.Fatal(err)
    }

    // Pass in an event destined for an ETTest1 EventType process, and also specify
    // the next event to be used when passing the result on from ETTest1 to the next
    // process which here is ETTest2.
    rootAct.AddEvent(actress.Event{EventType: ETTest1, Data: []byte("test"), NextEvent: &actress.Event{EventType: ETTest2}})

    // Wait and receive the result from the ETTest2 process.
    fmt.Printf("The result: %v\n", <-testCh)

    time.Sleep(time.Second * 2)
    cancel()
}

Details

Custom Events Processes

Custom Event Processes allows for dynimally adding new EventTypes and Processes at runtime. This feature is enabled by setting the CUSTOMEVENTS env variable to true, and also the path for where to look for configs with CUSTOMEVENTSPATH. The folder is continously being watched for changes, so any updates to config JSON files will be activated immediately. For adding custom event, put files in the CUSTOMEVENTSPATH with the extension .json. The files should be in the same format as an Event.

{"Name":"ET1","Cmd":["/bin/bash","-c"]}

The example above will automatically create a Process that have an EventType of ET1. We can then send Events using ET1 as the EventType, and what we put in Event.Cmd will be appended to the existing values that already exist in the Custom Event. Example follows.

Custom Event Example 1

We use the Custom Event from above, and add a new event using ET1 like this:

p.AddEvent(Event{EventType: EventType("ET1"), Cmd: []string{"ls -l"}})

When the Event is received at the ET1 process it's Cmd is appended what was defined earlier when creating the ET1 Process. The end result of the Cmd field will be []string{"/bin/bash","-c","ls -l"} which is then executed.

Custom Event Example 2

We can also add the whole command to be executed in the .json file likes this.

{"Name":"ETBleeping","Cmd":["/bin/bash","-c","curl -L https://bleepingcomputer.com"]}

Since this Event specification is complete in itself we don't have to use the Cmd field when adding an Event to use it.

p.AddEvent(Event{EventType: EventType("ETBleeping")})

NextEvent

NextEvent makes it possible to define an event as a chain of Events. An example could be that we want to get the content of a web page, and print the result to the screen. We could do that in the following way.

p.AddEvent(Event{EventType: EventType("ETBleeping"), NextEvent: &Event{EventType: ETPrint}})

Dynamic Processes

The purpose of dynamic processes is to have short lived processes that can be quickly started, and removed again when it's job is done. The only difference between a "normal" process and a dynamic process are that the dynamic processes have a mutex in the processes map DynProcesses so we also can delete the processes when they are no longer needed.

A typical example could be that there is a processes that needs to communicate in some other way with another process than cant be done with the current process's event channel. We can then spawn a dynamic process to take care of that. Check out the test and files in the examples directory. One process can spawn as many dynamic processes as it needs.

Documentation

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

func CheckEnv

func CheckEnv[T any](key string, v T) any

Check if an env variable is set. If found, return the value. Takes the name of the env variable, and the actual variable containing a default value as it's input.

func ETCustomEventFn

func ETCustomEventFn(ctx context.Context, p *Process) func()

func ETReadFileFn

func ETReadFileFn(ctx context.Context, p *Process) func()

func NewUUID

func NewUUID() string

Will create and return a new UUID.

func WrapperCustomCmd

func WrapperCustomCmd(command []string) func(ctx context.Context, p *Process) func()

Wrapper around creating an etFunc. The use of this wrapper function is to insert some predefined values into the cmd to be executed when a process for the eventType is created. When an event later is reveived, the content of the Event.Cmd field is appended to what is already defined there from earlier. An example of this is that we define the content of the command to be executed when the process is defined to contain []string{"/bin/bash","-c"}. When an event later is received and handled by this function, the contend of the .Cmd field is appended to the predefined fields, and will for example give a result to be executed like []string{"/bin/bash","-c","ls -l|grep file.txt"}.

Types

type Buffer

type Buffer struct {
	// contains filtered or unexported fields
}

func NewBuffer

func NewBuffer() *Buffer

func (*Buffer) Read

func (bu *Buffer) Read(p []byte) (int, error)

func (*Buffer) Write

func (bu *Buffer) Write(b []byte) (int, error)

type Config

type Config struct {
	Profiling        string
	CustomEvents     bool
	Metrics          bool
	CustomEventsPath string
}

func NewConfig

func NewConfig() *Config

New config will check flags and env variables set, and prepare and return the resulting *config.

type ETFunc

type ETFunc func(context.Context, *Process) func()

Function type describing the signature of a function that is to be used when creating a new process.

type Event

type Event struct {
	Nr int
	// EventType is a unique name to identify the type of the event.
	EventType EventType `json:"eventType" yaml:"eventType"`
	// Cmd is usually used for giving instructions or parameters for
	// what an event shall do.
	Cmd []string `json:"cmd" yaml:"cmd"`
	// Data usually carries the data from one process to the next. Example
	// could be a file read on process1 is put in the Data field, and
	// passed on to process2 to be unmarshaled.
	Data []byte `json:"data" yaml:"data"`
	// Err is used for defining the error message when the event is used
	// as an error event.
	Err error `json:"error" yaml:"error"`
	// NextEvent defines a series of events to be executed like a workflow.
	// The receiving process should check this field for what kind of event
	// to create as the next step in the workflow.
	NextEvent *Event `json:"event" yaml:"event"`
}

Event defines an event. It holds:

  • The EventType, which specifies the process are meant for.
  • The Cmd, are meant to but not limited to be a way to give instructions for what a process should do. The receiving process are responsible for parsing the string slice into something useful.
  • The Data field are ment to carry the result from the work done by a process, to the next process.
  • Both Cmd and Data can be used interchangeably if it makes more sense for a given scenario. No strict rules for this exist. Just make sure to document the use of the given EventType, so the structure of how to use the fields exist.
  • Err, are used by the error event type (ER).
  • NextEvent are used when we want to define a chain of events to be executed. The processes must make use of the field for this to work. Check out the examples folder for a simple example for how it could be implemented.

func NewEvent

func NewEvent(et EventType, opts ...EventOpt) *Event

type EventOpt

type EventOpt func(*Event)

func EVData

func EVData(b []byte) EventOpt

func EvCmd

func EvCmd(cmd []string) EventOpt

func EvNext

func EvNext(nev *Event) EventOpt

type EventRW

type EventRW struct {
	P    *Process
	Ev   *Event
	Info string
	Pos  int
}

func NewEventRW

func NewEventRW(p *Process, ev *Event, info string) *EventRW

NewEventRW will return a type that adds Read and Write methods to the Event type.

func (*EventRW) Read

func (m *EventRW) Read(b []byte) (int, error)

Read the data into b.

func (*EventRW) Write

func (m *EventRW) Write(b []byte) (int, error)

Write the data into Event.Data, and put the event into the EventCh to be processed.

type EventType

type EventType string

EventType is a unique name used to identify events. It is used both for creating processes and also for routing messages to the correct process.

const EDRouter EventType = "EDRouter"

Router for normal events.

const ERDebug EventType = "ERDebug"

Log debug errors.

const ERFatal EventType = "ERFatal"

Log and exit system.

const ERLog EventType = "ERLog"

Log errors.

const ERRouter EventType = "ERRouter"

Router for error events.

const ERTest EventType = "ERTest"

Log and exit system.

const ETCustomEvent EventType = "ETCustomEvent"

ETCustomEvent are used when reading custom user defined events to create processes from disk. It expects it's input in the Data field of the event to be the JSON serialized data of a custom Event. The unmarshaled custom event will then be used to prepare and start up a process for the new EventType.

const ETDone EventType = "ETDone"

Done don't currently do anything.

const ETExit EventType = "ETExit"

Will exit and kill all processes.

const ETOsCmd EventType = "etOsCmd"

Execute OS commands. The command to execute should be put in the first slot of the arrat at Event.Cmd[0], and all arguments should be put int the sub sequent slots. To make it simpler to run commands without splitting the up on Linux like operating systems use the -c flag with bash. Example, Event{EventType: etOsCmd, Cmd: ["bash","-c","ls -l|grep myfile"]}.

const ETOsSignal EventType = "ETOsSignal"

Press ctrl+c to exit.

const ETPid EventType = "ETPid"

Handling pids within the system. The structure of the ev.Cmd is a slice of string: []string{"action","pid","process name"}

const ETPidGetAll EventType = "ETPidGetAll"

Get all the current processes running. Will return a json encoded PidVsProcMap.

const ETPrint EventType = "ETPrint"

Print the content of the .Data field of the event to stdout.

const ETProfiling EventType = "ETprofiling"

Profiling.

const ETReadFile EventType = "ETReadFile"

Read file. The path path to read should be in Event.Cmd[0].

const ETRoot EventType = "ETRoot"

The main Root process.

const ETRouter EventType = "ETRouter"

Router for normal events.

const ETTestCh EventType = "ETTestCh"

Will forward the incomming event to the Process.TestCh.

const ETWatchEventFile EventType = "ETWatchEventFile"

type PidVsProcMap

type PidVsProcMap map[pidnr]*Process

type Process

type Process struct {

	// Channel to receive events into the process function.
	InCh chan Event `json:"-"`
	// Channel to send events to be picked up by other processes.
	EventCh chan Event `json:"-"`
	// Channel to send error events.
	ErrorCh chan Event `json:"-"`
	// Channel for getting the result in tests.
	TestCh chan Event `json:"-"`
	// Channel to use for routing events for dynamic processes.
	DynCh chan Event `json:"-"`
	// The event type for the process.
	Event EventType
	// Maps for various process information.
	Processes *processes
	// Map of dynamic processes
	DynProcesses *dynProcesses
	// Maps for various errProcess information
	ErrProcesses *errProcesses

	// Holding all configuration settings.
	Config *Config

	// PID of the process
	PID pidnr
	// Cancel func
	Cancel context.CancelFunc `json:"-"`
	// contains filtered or unexported fields
}

Process defines a process.

func NewDynProcess

func NewDynProcess(ctx context.Context, parentP Process, event EventType, fn ETFunc) *Process

func NewErrProcess

func NewErrProcess(ctx context.Context, parentP Process, event EventType, fn ETFunc) *Process

NewErrProcess will prepare and return a *Process. It will copy channels and map structures from the root process.

func NewProcess

func NewProcess(ctx context.Context, parentP Process, event EventType, fn ETFunc) *Process

NewProcess will prepare and return a *Process. It will copy channels and map structures from the root process.

func NewRootProcess

func NewRootProcess(ctx context.Context) *Process

NewRootProcess will prepare and return the root process which holds all the core elements needed, like the main channels for events and errors, and varouis registers or maps holding information about the system. Later created processes will reference these elements when they are created. The root process will also start up all the essential other processes needed, like the event router, and various standard error handling processes.

func (*Process) Act

func (p *Process) Act() error

Will start the current process.

func (*Process) AddDynEvent

func (p *Process) AddDynEvent(event Event)

Will add an event to be handled by the processes.

func (*Process) AddError

func (p *Process) AddError(event Event)

Will add an error to be handled by the error processes.

func (*Process) AddEvent

func (p *Process) AddEvent(event Event)

Will add an event to be handled by the processes.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL