transistor

package module
v0.0.0-...-7edff62 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Sep 19, 2018 License: MIT Imports: 17 Imported by: 35

README

Transistor CircleCI Coverage Status Go Report Card codebeat badge

Transistor allows you to run distributed workload on one or accross multiple hosts. It's a plugin based system that allows plugins to subscribe to multiple events and internal scheduler takes care of delivery. This allows multiple plugins to recieve same message do some work and respond with updated or different message.

We use a central file api.go that keeps all available messages in one place.

package plugins

import "github.com/codeamp/transistor"

func init() {
}

type Hello struct {
    Action  string
      Message string
}

you also need to register all api events with transistor so that we are able to transform it to correct type when json event payload is recieved.

All plugins need to implement Start, Stop, Subscribe and Process methods.

type Plugin interface {
	// Start starts the Plugin service, whatever that may be
	Start(chan Event) error

	// Stop stops the services and closes any necessary channels and connections
	Stop()

	// Subscribe takes in an event message and validates it for Process
	Subscribe() []string

	// Process takes in an event message and tries to process it
	Process(Event) error
}

You can create new events

func (x *ExamplePlugin1) Start(e chan transistor.Event) error {
	log.Info("starting ExamplePlugin")

	event := Hello{
		Action:  "examplePlugin1",
		Message: "Hello World from ExamplePlugin1",
	}

	e <- transistor.NewEvent(transistor.EventName("exampleplugin1"), transistor.Action("create"), nil)

	return nil
}

or respond to existing one and keep track of parent event

func (x *ExamplePlugin2) Process(e transistor.Event) error {
	if e.Name == "exampleplugin2:create" {
		hello := e.Payload().(Hello)
		log.Info("ExamplePlugin2 received a message:", hello)
	}
	return nil
}

Transistor can run on a multiple or single host. To run on multiple hosts you will need a Redis connection. This is a minimal example to set up 2 plugins:

func main() {
	config := transistor.Config{
		Server:   "0.0.0.0:16379",
		Database: "0",
		Pool:     "30",
		Process:  "1",
		Queueing: true,
		Plugins: map[string]interface{}{
			"exampleplugin1": map[string]interface{}{
				"hello":   "world1",
				"workers": 1,
			},
			"exampleplugin2": map[string]interface{}{
				"hello":   "world2",
				"workers": 1,
			},
		},
		EnabledPlugins: []string{"exampleplugin1", "exampleplugin2"},
	}

	t, err := transistor.NewTransistor(config)
	if err != nil {
		log.Fatal(err)
	}

	signals := make(chan os.Signal)
	signal.Notify(signals, os.Interrupt, syscall.SIGTERM)
	go func() {
		sig := <-signals
		if sig == os.Interrupt || sig == syscall.SIGTERM {
			log.Info("Shutting down circuit. SIGTERM recieved!\n")
			// If Queueing is ON then workers are responsible for closing Shutdown chan
			if !t.Config.Queueing {
				t.Stop()
			}
		}
	}()

	log.InfoWithFields("plugins loaded", log.Fields{
		"plugins": strings.Join(t.PluginNames(), ","),
	})

	t.Run()
}

if you want to run on a single host and without redis you need to set Queueing: false in config. You can see and run a minimal example that uses Redis in example/ folder.

Transistor was build to power Checkr's deployment pipeline and it's used to build and deploy over 100 microservices to kubernetes.

GoDoc

Documentation

Index

Constants

This section is empty.

Variables

View Source
var EventRegistry = make(map[string]interface{})
View Source
var PluginRegistry = map[string]Creator{}

Functions

func MapPayload

func MapPayload(name string, event *Event) error

func RandomString

func RandomString(strlen int) string

func RegisterPlugin

func RegisterPlugin(name string, creator Creator, events ...interface{})

func SliceContains

func SliceContains(name string, list []string) bool

Types

type Action

type Action string

func GetAction

func GetAction(s string) Action

type Artifact

type Artifact struct {
	Source string      `json:"source,omitempty"`
	Key    string      `json:"key"`
	Value  interface{} `json:"value"`
	Secret bool        `json:"secret"`
}

func (*Artifact) Int

func (a *Artifact) Int() int

func (*Artifact) String

func (a *Artifact) String() string

func (*Artifact) StringMap

func (a *Artifact) StringMap() map[string]interface{}

func (*Artifact) StringSlice

func (a *Artifact) StringSlice() []interface{}

type Caller

type Caller struct {
	File       string `json:"file"`
	LineNumber int    `json:"line_number"`
}

type Config

type Config struct {
	Server         string
	Password       string
	Database       string
	Namespace      string
	Pool           string
	Process        string
	Plugins        map[string]interface{}
	EnabledPlugins []string
	Queueing       bool
}

type Creator

type Creator func() Plugin

type Event

type Event struct {
	ID       uuid.UUID `json:"id"`
	ParentID uuid.UUID `json:"parentId"`
	Name     EventName `json:"name"`

	Action       Action `json:"action"`
	State        State  `json:"state"`
	StateMessage string `json:"stateMessage"`

	Payload      interface{} `json:"payload"`
	PayloadModel string      `json:"payloadModel"`
	CreatedAt    time.Time   `json:"createdAt"`
	Caller       Caller      `json:"caller"`
	Artifacts    []Artifact  `json:"artifacts"`
}

func CreateEvent

func CreateEvent(eventName EventName, payload interface{}) Event

func DeleteEvent

func DeleteEvent(eventName EventName, payload interface{}) Event

func NewEvent

func NewEvent(eventName EventName, action Action, payload interface{}) Event

func UpdateEvent

func UpdateEvent(eventName EventName, payload interface{}) Event

func (*Event) AddArtifact

func (e *Event) AddArtifact(key string, value interface{}, secret bool)

func (*Event) CreateEvent

func (e *Event) CreateEvent(action Action, state State, stateMessage string) Event

func (*Event) DeleteEvent

func (e *Event) DeleteEvent(action Action, state State, stateMessage string) Event

func (*Event) Dump

func (e *Event) Dump()

func (*Event) Event

func (e *Event) Event() string

func (*Event) GetArtifact

func (e *Event) GetArtifact(key string) (Artifact, error)

func (*Event) GetArtifactFromSource

func (e *Event) GetArtifactFromSource(key string, source string) (Artifact, error)

func (*Event) Matches

func (e *Event) Matches(name string) bool

func (*Event) NewEvent

func (e *Event) NewEvent(action Action, state State, stateMessage string) Event

func (*Event) SetPayload

func (e *Event) SetPayload(payload interface{})

func (*Event) StatusEvent

func (e *Event) StatusEvent(action Action, state State, stateMessage string) Event

func (*Event) UpdateEvent

func (e *Event) UpdateEvent(action Action, state State, stateMessage string) Event

type EventName

type EventName string

type Plugin

type Plugin interface {
	// Start starts the Plugin service, whatever that may be
	Start(chan Event) error

	// Stop stops the services and closes any necessary channels and connections
	Stop()

	// Subscribe takes in an event message and validates it for Process
	Subscribe() []string

	// Process takes in an event message and tries to process it
	Process(Event) error
}

type RunningPlugin

type RunningPlugin struct {
	Name          string
	Plugin        Plugin
	Work          func(*workers.Msg)
	Enabled       bool
	Workers       int
	WorkerRetries int
}

type State

type State string

func GetState

func GetState(s string) State

type Transistor

type Transistor struct {
	Config     Config
	Events     chan Event
	TestEvents chan Event
	Shutdown   chan struct{}
	Plugins    []*RunningPlugin
}

Transistor runs codeflow and collects datt based on the given config

func NewTestTransistor

func NewTestTransistor(config Config) (*Transistor, error)

NewTestTransistor returns an Transistor struct based off the given Config

func NewTransistor

func NewTransistor(config Config) (*Transistor, error)

NewTransistor returns an Transistor struct based off the given Config

func (*Transistor) GetTestEvent

func (t *Transistor) GetTestEvent(name EventName, action Action, timeout time.Duration) (Event, error)

func (*Transistor) LoadPlugins

func (t *Transistor) LoadPlugins() error

func (*Transistor) PluginNames

func (t *Transistor) PluginNames() []string

Returns t list of strings of the configured plugins.

func (*Transistor) Run

func (t *Transistor) Run() error

Run runs the transistor daemon

func (*Transistor) Stop

func (t *Transistor) Stop()

Shutdown the transistor daemon

Directories

Path Synopsis

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL