uwe

package module
v0.0.0-...-814a870
Warning

This package is not in the latest version of its module.

Published: Jan 13, 2021 License: Apache-2.0 Imports: 11 Imported by: 0

README

uwe

UWE (Ubiquitous Workers Engine) is a common toolset for building and organizing a Go application out of actor-like workers.

Table of Contents

  1. Quick Start
  2. Documentation
    1. Chief
    2. Worker
    3. Presets

Quick Start

Get uwe using go get:

go get git.ooo.ua/pub/uwe/v2

Here is an example HelloWorld service with an HTTP API and a background worker:

package main

import (
	"fmt"
	"log"
	"net/http"
	"time"

	"git.ooo.ua/pub/uwe/v2"
	"git.ooo.ua/pub/uwe/v2/presets/api"
)


func main() {
	// fill in the configuration for the predefined worker that starts an HTTP server
	apiCfg := api.Config{
		Host:              "0.0.0.0",
		Port:              8080,
		EnableCORS:        false,
		ApiRequestTimeout: 0,
	}

	// initialize a new instance of Chief
	chief := uwe.NewChief()
	// add workers to the pool
	chief.AddWorker("app-server", api.NewServer(apiCfg, getRouter()))
	chief.AddWorker("dummy", NewDummy())

	// enable recovery from internal panics
	chief.UseDefaultRecover()
	// pass a handler for internal events such as errors, panics, and warnings;
	// you can log them with your favorite logger (e.g. Logrus, Zap)
	chief.SetEventHandler(uwe.STDLogEventHandler())

	// initialize all registered workers and run them
	chief.Run()
}

type dummy struct{}

// NewDummy initializes a new instance of the dummy Worker.
func NewDummy() uwe.Worker {
	// This is usually where the worker's state is prepared:
	// a logger, configuration, variables, and fields.
	return &dummy{}
}

// Init is an interface method used to initialize worker state
// that requires interaction with the outer context, for example, to initialize connectors.
func (d *dummy) Init() error { return nil }

// Run starts the worker's event loop.
func (d *dummy) Run(ctx uwe.Context) error {
	// initialize everything required for the execution flow
	ticker := time.NewTicker(time.Second)
	// release the ticker's resources when the worker returns
	defer ticker.Stop()

	for {
		select {
		case <-ticker.C:
			// define all the processing code here
			// or move it to a method and make a call here
			log.Println("do something")
		case <-ctx.Done():
			// close all connections and channels, and finalize state if needed
			log.Println("good bye")
			return nil
		}
	}
}

// getRouter declares the API scheme.
func getRouter() http.Handler {
	// any other compatible router can be used instead of the default one
	mux := http.NewServeMux()
	mux.HandleFunc("/hello/uwe", func(w http.ResponseWriter, r *http.Request) {
		_, _ = fmt.Fprintln(w, "hello world")
	})

	log.Println("REST API router initialized")
	return mux
}

Documentation

Chief

Chief is a supervisor that can be placed at the top of the Go application's execution stack. It blocks until SIGTERM is intercepted and then shuts down all workers gracefully. Chief can also be used as a child supervisor inside a Worker that is itself launched by a top-level Chief.
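
The blocking and graceful-shutdown behaviour described here can be approximated with the standard library alone. The following is a minimal sketch and not uwe's implementation: `workerFunc`, `superviseAll`, and `runDemo` are hypothetical names, each worker is assumed to return once its context is cancelled, and a short timeout is added only so that the demo exits without waiting for a real signal.

```go
package main

import (
	"context"
	"errors"
	"fmt"
	"os/signal"
	"sync"
	"syscall"
	"time"
)

// workerFunc is a hypothetical worker signature used only in this sketch.
type workerFunc func(ctx context.Context) error

// superviseAll starts every worker in its own goroutine, blocks until ctx is
// cancelled, waits for all workers to return, and reports how many of them
// returned an error.
func superviseAll(ctx context.Context, workers map[string]workerFunc) int {
	var (
		wg     sync.WaitGroup
		mu     sync.Mutex
		failed int
	)
	for name, w := range workers {
		wg.Add(1)
		go func(name string, w workerFunc) {
			defer wg.Done()
			if err := w(ctx); err != nil {
				mu.Lock()
				failed++
				mu.Unlock()
				fmt.Printf("worker %q failed: %v\n", name, err)
			}
		}(name, w)
	}
	<-ctx.Done() // block until a stop signal (or the demo timeout) arrives
	wg.Wait()    // give every worker the chance to finish its cleanup
	return failed
}

// runDemo supervises two workers and returns the number of failed ones.
func runDemo() int {
	// Cancel the context on SIGTERM or SIGINT.
	sigCtx, stop := signal.NotifyContext(context.Background(), syscall.SIGTERM, syscall.SIGINT)
	defer stop()
	// Demo only: stop after 100ms even if no signal is received.
	ctx, cancel := context.WithTimeout(sigCtx, 100*time.Millisecond)
	defer cancel()

	return superviseAll(ctx, map[string]workerFunc{
		"clean": func(ctx context.Context) error {
			<-ctx.Done()
			return nil
		},
		"broken": func(ctx context.Context) error {
			<-ctx.Done()
			return errors.New("cleanup failed")
		},
	})
}

func main() {
	fmt.Println("failed workers:", runDemo())
}
```

Because `superviseAll` takes a plain `context.Context`, the same function could in principle be called from inside a worker with that worker's own context, which mirrors the child-supervisor use mentioned above.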

Worker

Worker is an interface for async workers that are launched and managed by the Chief.

  1. Init() - initializes worker state that requires interaction with the outer context, for example, connectors. In many cases this method is optional, so it can be implemented as a no-op: func (*W) Init() error { return nil }.
  2. Run(ctx Context) error - starts the Worker instance execution. The context signals through ctx.Done() when the worker must stop.

Workers lifecycle:

 (*) -> [New] -> [Initialized] -> [Run] -> [Stopped]
          |             |           |
          |             |           ↓
          |-------------|------> [Failed]
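
The lifecycle diagram can be read as a small transition table. The sketch below encodes one reading of it (New, Initialized, and Run may each move to Failed); the `state` type, the constants, and `canMove` are hypothetical names for illustration and are not part of uwe.

```go
package main

import "fmt"

// state is a hypothetical stand-in for a worker lifecycle state.
type state string

const (
	stateNew         state = "New"
	stateInitialized state = "Initialized"
	stateRun         state = "Run"
	stateStopped     state = "Stopped"
	stateFailed      state = "Failed"
)

// transitions lists, for each state, the states reachable in one step
// according to this sketch's reading of the diagram.
var transitions = map[state][]state{
	stateNew:         {stateInitialized, stateFailed},
	stateInitialized: {stateRun, stateFailed},
	stateRun:         {stateStopped, stateFailed},
}

// canMove reports whether a worker may move directly from one state to another.
func canMove(from, to state) bool {
	for _, next := range transitions[from] {
		if next == to {
			return true
		}
	}
	return false
}

func main() {
	fmt.Println(canMove(stateNew, stateInitialized)) // true
	fmt.Println(canMove(stateNew, stateRun))         // false
	fmt.Println(canMove(stateStopped, stateRun))     // false
}
```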

Presets

This library provides some working presets to simplify the use of Chief in projects and reduce duplicate code.

HTTP Server

api.Server is the default worker for starting a standard HTTP server. It requires a configuration and an initialized http.Handler.

The HTTP server runs normally and shuts down correctly upon a signal from the supervisor (Chief).

Warning: this Server does not process SSL/TLS certificates on its own. To start an HTTPS server, look for a specific worker.

package main

import (
	"fmt"
	"net/http"

	"git.ooo.ua/pub/uwe/v2"
	"git.ooo.ua/pub/uwe/v2/presets/api"
)

func main() {
	// fill in the configuration for the predefined worker that starts an HTTP server
	apiCfg := api.Config{
		Host:              "0.0.0.0",
		Port:              8080,
		EnableCORS:        false,
		ApiRequestTimeout: 0,
	}

	// initialize a new instance of Chief
	chief := uwe.NewChief()
	chief.UseDefaultRecover()
	chief.SetEventHandler(uwe.STDLogEventHandler())

	// any other compatible router can be used instead of the default one
	mux := http.NewServeMux()
	mux.HandleFunc("/hello/uwe", func(w http.ResponseWriter, r *http.Request) {
		_, _ = fmt.Fprintln(w, "hello world")
	})

	chief.AddWorker("app-server", api.NewServer(apiCfg, mux))

	chief.Run()
}

Job

presets.Job is a primitive worker that calls an action callback at a given interval.

package main

import (
	"log"
	"time"

	"git.ooo.ua/pub/uwe/v2"
	"git.ooo.ua/pub/uwe/v2/presets"
)

func main() {
	var action = func() error {
		// define all the processing code here
		// or move it to a method and make a call here
		log.Println("do something")
		return nil
	}

	// initialize new instance of Chief
	chief := uwe.NewChief()
	chief.UseDefaultRecover()
	chief.SetEventHandler(uwe.STDLogEventHandler())

	// will add workers into the pool
	chief.AddWorker("simple-job", presets.NewJob(time.Second, action))

	chief.Run()
}

WorkerFunc

presets.WorkerFunc is a worker type that consists of a single function, which allows a plain function to be used as a worker.

package main

import (
	"log"
	"time"

	"git.ooo.ua/pub/uwe/v2"
	"git.ooo.ua/pub/uwe/v2/presets"
)

func main() {
	var anonFuncWorker = func(ctx uwe.Context) error {
		// initialize everything required for the execution flow
		ticker := time.NewTicker(time.Second)
		// release the ticker's resources when the worker returns
		defer ticker.Stop()

		for {
			select {
			case <-ticker.C:
				// define all the processing code here
				// or move it to a method and make a call here
				log.Println("do something")
			case <-ctx.Done():
				// close all connections and channels, and finalize state if needed
				log.Println("good bye")
				return nil
			}
		}
	}

	// initialize a new instance of Chief
	chief := uwe.NewChief()
	chief.UseDefaultRecover()
	chief.SetEventHandler(uwe.STDLogEventHandler())

	// add workers to the pool
	chief.AddWorker("anon-func", presets.WorkerFunc(anonFuncWorker))

	chief.Run()
}

License

This library is distributed under the Apache 2.0 license.

Documentation

Constants

const (
	WStateDisabled    sam.State = "Disabled"
	WStateEnabled     sam.State = "Enabled"
	WStateInitialized sam.State = "Initialized"
	WStateRun         sam.State = "Run"
	WStateStopped     sam.State = "Stopped"
	WStateFailed      sam.State = "Failed"
)

Constants for worker states.

Variables

var (
	//ErrWorkerNotExist custom error for not-existing worker
	ErrWorkerNotExist = func(name WorkerName) error {
		return fmt.Errorf("%s: not exist", name)
	}
)

var ForceStopTimeout = 45 * time.Second // nolint:gochecknoglobals

ForceStopTimeout is a timeout for killing all workers.
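
The source does not show how ForceStopTimeout is applied internally. A common way to bound a graceful shutdown is to wait for the workers in a goroutine and race that wait against a timer. The sketch below illustrates that pattern under this assumption; `waitOrTimeout` and `stopDemo` are hypothetical helpers, not uwe functions.

```go
package main

import (
	"fmt"
	"sync"
	"time"
)

// waitOrTimeout waits for wg and reports whether all goroutines finished
// before the timeout expired.
func waitOrTimeout(wg *sync.WaitGroup, timeout time.Duration) bool {
	done := make(chan struct{})
	go func() {
		wg.Wait()
		close(done)
	}()
	select {
	case <-done:
		return true
	case <-time.After(timeout):
		return false
	}
}

// stopDemo starts one worker that needs workMs milliseconds to finish and
// waits for it for at most limitMs milliseconds.
func stopDemo(workMs, limitMs int) bool {
	var wg sync.WaitGroup
	wg.Add(1)
	go func() {
		defer wg.Done()
		time.Sleep(time.Duration(workMs) * time.Millisecond)
	}()
	return waitOrTimeout(&wg, time.Duration(limitMs)*time.Millisecond)
}

func main() {
	fmt.Println(stopDemo(10, 500)) // true: the worker stopped in time
	fmt.Println(stopDemo(500, 10)) // false: the wait was abandoned
}
```

When the wait is abandoned, the worker goroutine keeps running until it returns on its own; a real supervisor would typically exit the process at that point.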

var WorkersStates = map[sam.State]struct{}{
	WStateDisabled:    {},
	WStateEnabled:     {},
	WStateInitialized: {},
	WStateRun:         {},
	WStateStopped:     {},
	WStateFailed:      {},
}

WorkersStates is the set of valid worker states.
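
A map with empty-struct values acts as a set, so validating a state is a single lookup. The sketch below shows the idiom with a local `state` type standing in for `sam.State`; `validStates` and `isValidState` are hypothetical names for illustration.

```go
package main

import "fmt"

// state is a local stand-in for sam.State in this sketch.
type state string

// validStates mirrors the shape of WorkersStates: the keys form the set,
// and the empty struct values occupy no memory.
var validStates = map[state]struct{}{
	"Disabled":    {},
	"Enabled":     {},
	"Initialized": {},
	"Run":         {},
	"Stopped":     {},
	"Failed":      {},
}

// isValidState reports whether s is one of the known states.
func isValidState(s state) bool {
	_, ok := validStates[s]
	return ok
}

func main() {
	fmt.Println(isValidState("Run"))    // true
	fmt.Println(isValidState("Paused")) // false
}
```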

Types

type Chief

type Chief struct {
	// EnableByDefault marks all workers as `Enabled`
	// if no workers are explicitly passed to enable.
	EnableByDefault bool
	// AppName is the main identifier of the app instance, used by the logger etc.
	AppName string
	// contains filtered or unexported fields
}

Chief is a head of workers, it must be used to register, initialize and correctly start and stop asynchronous executors of the type `Worker`.

func NewChief

func NewChief(name string, enableByDefault bool, logger *logrus.Entry) *Chief

NewChief creates and initializes a new instance of `Chief`.

func (*Chief) AddValueToContext

func (chief *Chief) AddValueToContext(key, value interface{})

AddValueToContext updates the chief context and sets a new value by key.

func (*Chief) AddWorker

func (chief *Chief) AddWorker(name WorkerName, worker Worker)

AddWorker registers a new `Worker` in the `Chief` worker pool.

func (*Chief) EnableWorker

func (chief *Chief) EnableWorker(name WorkerName) error

EnableWorker enables the worker with the specified `name`. By default, all added workers are enabled. After the first call of this method, only explicitly enabled workers will be active.

func (*Chief) EnableWorkers

func (chief *Chief) EnableWorkers(names ...WorkerName) (err error)

EnableWorkers enables all workers from the `names` list. By default, all added workers are enabled. After the first call of this method, only explicitly enabled workers will be active.
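
The "all enabled by default, then only the explicitly enabled ones" rule can be illustrated with a small registry. This is a sketch of the described behaviour only, not the library's code: the `pool` type and its methods are hypothetical, and the error text simply imitates ErrWorkerNotExist.

```go
package main

import "fmt"

// pool is a hypothetical stand-in for a worker registry.
type pool struct {
	names   []string        // workers in registration order
	enabled map[string]bool // workers that were enabled explicitly
}

// add registers a worker name.
func (p *pool) add(name string) { p.names = append(p.names, name) }

// has reports whether a worker with this name was registered.
func (p *pool) has(name string) bool {
	for _, n := range p.names {
		if n == name {
			return true
		}
	}
	return false
}

// enable marks the named workers as explicitly enabled and fails on the
// first unknown name.
func (p *pool) enable(names ...string) error {
	for _, name := range names {
		if !p.has(name) {
			return fmt.Errorf("%s: not exist", name)
		}
		if p.enabled == nil {
			p.enabled = make(map[string]bool)
		}
		p.enabled[name] = true
	}
	return nil
}

// active returns every registered worker while nothing has been enabled
// explicitly, and only the explicitly enabled workers afterwards.
func (p *pool) active() []string {
	if len(p.enabled) == 0 {
		return p.names
	}
	var out []string
	for _, n := range p.names {
		if p.enabled[n] {
			out = append(out, n)
		}
	}
	return out
}

func main() {
	p := &pool{}
	p.add("api")
	p.add("job")
	fmt.Println(p.active()) // [api job]

	if err := p.enable("job"); err != nil {
		fmt.Println(err)
	}
	fmt.Println(p.active())          // [job]
	fmt.Println(p.enable("missing")) // missing: not exist
}
```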

func (*Chief) GetContext

func (chief *Chief) GetContext() context.Context

GetContext returns the chief context.

func (*Chief) GetWorkersStates

func (chief *Chief) GetWorkersStates() map[WorkerName]sam.State

GetWorkersStates returns a map of worker states keyed by worker name.

func (*Chief) Init

func (chief *Chief) Init(logger *logrus.Entry) *Chief

Init initializes all internal states properly.

func (*Chief) IsEnabled

func (chief *Chief) IsEnabled(name WorkerName) bool

IsEnabled reports whether the worker with the passed `name` is enabled.

func (*Chief) Run

func (chief *Chief) Run(workers ...WorkerName) error

Run enables the passed workers, starts the worker pool, and blocks until it intercepts `syscall.SIGTERM` or `syscall.SIGINT`. NOTE: Use this method ONLY as a top-level action.

func (*Chief) RunWithContext

func (chief *Chief) RunWithContext(ctx context.Context, workers ...WorkerName) error

RunWithContext adds the waitForSignal function for RunWithLocker.

func (*Chief) RunWithLocker

func (chief *Chief) RunWithLocker(locker func(), workers ...WorkerName) (err error)

RunWithLocker: the `locker` function should block the execution context and wait for a signal to stop.

func (*Chief) StartPool

func (chief *Chief) StartPool(parentCtx context.Context) int

StartPool runs all registered workers, locks until the `parentCtx` closes, and then gracefully stops all workers. Returns result code:

-1 — start failed
 0 — stopped properly

type CtxKey

type CtxKey string

CtxKey is the type of context keys for the values placed by `Chief`.

const (
	// CtxKeyLog is a context key for a `*logrus.Entry` value.
	CtxKeyLog CtxKey = "chief-log"
)
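
A dedicated key type keeps context values from colliding with keys defined elsewhere, even when the underlying string is the same. The sketch below demonstrates this with the standard `context` package; `ctxKey`, `ctxKeyLog`, and `lookupDemo` are local, hypothetical names, and a plain string stands in for the `*logrus.Entry` value.

```go
package main

import (
	"context"
	"fmt"
)

// ctxKey is a local stand-in for a dedicated context key type.
type ctxKey string

// ctxKeyLog mirrors the idea of CtxKeyLog in this sketch.
const ctxKeyLog ctxKey = "chief-log"

// lookupDemo stores a value under the typed key and then looks it up twice:
// once with the typed key and once with an untyped string of the same text.
func lookupDemo() (found string, plainKeyHit bool) {
	ctx := context.WithValue(context.Background(), ctxKeyLog, "app-logger")

	// The typed key finds the value.
	found, _ = ctx.Value(ctxKeyLog).(string)
	// A plain string key has a different type, so it does not match.
	_, plainKeyHit = ctx.Value("chief-log").(string)
	return found, plainKeyHit
}

func main() {
	found, plainKeyHit := lookupDemo()
	fmt.Println(found)       // app-logger
	fmt.Println(plainKeyHit) // false
}
```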

type ExitCode

type ExitCode int

ExitCode custom type

const (
	// ExitCodeOk means that the worker is stopped.
	ExitCodeOk ExitCode = iota
	// ExitCodeInterrupted means that the work cycle has been interrupted and can be restarted.
	ExitCodeInterrupted
	// ExitCodeFailed means that the worker fails.
	ExitCodeFailed
	// ExitReinitReq means that the worker can't do job and requires reinitialization.
	ExitReinitReq
)
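
Because the constants are declared with `iota`, they take the values 0 through 3 in declaration order. The sketch below redeclares them locally and adds a hypothetical `nextAction` policy to show how a supervisor might react to each code; the policy itself is an assumption and is not taken from uwe.

```go
package main

import "fmt"

// exitCode is a local copy of the ExitCode idea for this sketch.
type exitCode int

const (
	exitCodeOk          exitCode = iota // 0
	exitCodeInterrupted                 // 1
	exitCodeFailed                      // 2
	exitReinitReq                       // 3
)

// nextAction is a hypothetical policy that maps an exit code to the next
// step a supervisor could take.
func nextAction(code exitCode, restartOnFail bool) string {
	switch code {
	case exitCodeOk:
		return "stop"
	case exitCodeInterrupted:
		return "restart"
	case exitReinitReq:
		return "reinit"
	case exitCodeFailed:
		if restartOnFail {
			return "restart"
		}
		return "fail"
	default:
		return "fail"
	}
}

func main() {
	fmt.Println(int(exitReinitReq))                // 3
	fmt.Println(nextAction(exitCodeFailed, true))  // restart
	fmt.Println(nextAction(exitCodeFailed, false)) // fail
}
```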

type Message

type Message struct {
	UID    int64
	Target WorkerName
	Sender WorkerName
	Data   interface{}
}

Message type declaration

type WContext

type WContext interface {
	context.Context
	SendMessage(target WorkerName, data interface{}) error
	MessageBus() <-chan *Message
}

WContext declares the context interface passed to workers.

func NewContext

func NewContext(ctx context.Context, name WorkerName, in, out chan *Message) WContext

NewContext is the constructor for a worker context.
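
To make the shape of WContext concrete, here is a self-contained sketch of a context that embeds `context.Context` and adds a send method and a receive channel. It is not the library's implementation: `message`, `busContext`, and `pingDemo` are hypothetical, names are plain strings rather than WorkerName, and two workers are wired together directly instead of being routed by a supervisor.

```go
package main

import (
	"context"
	"fmt"
)

// message is a simplified stand-in for the Message struct.
type message struct {
	Target string
	Sender string
	Data   interface{}
}

// busContext embeds a context.Context and adds message passing.
type busContext struct {
	context.Context
	name string
	in   <-chan *message
	out  chan<- *message
}

// SendMessage queues a message for target, or gives up if the context
// is cancelled first.
func (c *busContext) SendMessage(target string, data interface{}) error {
	msg := &message{Target: target, Sender: c.name, Data: data}
	select {
	case c.out <- msg:
		return nil
	case <-c.Done():
		return c.Err()
	}
}

// MessageBus returns the channel on which incoming messages arrive.
func (c *busContext) MessageBus() <-chan *message { return c.in }

// pingDemo sends one message from worker "a" to worker "b" and formats it.
func pingDemo() string {
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	// One buffered channel links "a" directly to "b" for this demo.
	link := make(chan *message, 1)
	a := &busContext{Context: ctx, name: "a", out: link}
	b := &busContext{Context: ctx, name: "b", in: link}

	if err := a.SendMessage("b", "ping"); err != nil {
		return err.Error()
	}
	msg := <-b.MessageBus()
	return fmt.Sprintf("%s->%s: %v", msg.Sender, msg.Target, msg.Data)
}

func main() {
	fmt.Println(pingDemo()) // a->b: ping
}
```

Inside a worker loop, the receive channel would normally be one more case in the same `select` that watches `ctx.Done()`.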

type Worker

type Worker interface {
	// Init initializes new instance of the `Worker` implementation,
	// this context should be used only as Key/Value transmitter,
	// DO NOT use it for `<- ctx.Done()`
	Init(ctx context.Context) Worker
	// RestartOnFail determines the need to restart the worker, if it stopped.
	RestartOnFail() bool
	// Run starts the `Worker` instance execution.
	Run(ctx WContext) ExitCode
}

Worker is an interface for async workers that are launched and managed by the `Chief`.

type WorkerExistRule

type WorkerExistRule struct {
	AvailableWorkers map[WorkerName]struct{}
	// contains filtered or unexported fields
}

WorkerExistRule implements Rule interface for validation

func (*WorkerExistRule) Error

func (r *WorkerExistRule) Error(message string) *WorkerExistRule

Error sets the error message for the rule.

func (*WorkerExistRule) Validate

func (r *WorkerExistRule) Validate(value interface{}) error

Validate checks that the service exists on the system.

type WorkerName

type WorkerName string

WorkerName custom type

type WorkerPool

type WorkerPool struct {
	// contains filtered or unexported fields
}

WorkerPool is

func (*WorkerPool) DisableWorker

func (pool *WorkerPool) DisableWorker(name WorkerName) error

DisableWorker sets the state `WStateDisabled` for the worker with the specified `name`.

func (*WorkerPool) EnableWorker

func (pool *WorkerPool) EnableWorker(name WorkerName) error

EnableWorker sets the state `WStateEnabled` for the worker with the specified `name`.

func (*WorkerPool) FailWorker

func (pool *WorkerPool) FailWorker(name WorkerName) error

FailWorker sets the state `WStateFailed` for the worker with the specified `name`.

func (*WorkerPool) GetState

func (pool *WorkerPool) GetState(name WorkerName) sam.State

GetState returns the current state of the worker with the specified `name`.

func (*WorkerPool) GetWorkersStates

func (pool *WorkerPool) GetWorkersStates() map[WorkerName]sam.State

GetWorkersStates returns current state of all workers.

func (*WorkerPool) InitWorker

func (pool *WorkerPool) InitWorker(ctx context.Context, name WorkerName) error

InitWorker initializes all present workers.

func (*WorkerPool) IsDisabled

func (pool *WorkerPool) IsDisabled(name WorkerName) bool

IsDisabled reports whether the worker with the passed `name` is disabled.

func (*WorkerPool) IsEnabled

func (pool *WorkerPool) IsEnabled(name WorkerName) bool

IsEnabled reports whether the worker with the passed `name` is enabled.

func (*WorkerPool) IsRun

func (pool *WorkerPool) IsRun(name WorkerName) bool

IsRun reports whether the worker with the passed `name` is running.

func (*WorkerPool) ReplaceWorker

func (pool *WorkerPool) ReplaceWorker(name WorkerName, worker Worker)

ReplaceWorker replaces the worker in the pool.

func (*WorkerPool) RunWorkerExec

func (pool *WorkerPool) RunWorkerExec(name WorkerName, ctx WContext) (err error)

RunWorkerExec adds worker into pool.

func (*WorkerPool) SetState

func (pool *WorkerPool) SetState(name WorkerName, state sam.State) error

SetState updates the state of the specified worker.

func (*WorkerPool) SetWorker

func (pool *WorkerPool) SetWorker(name WorkerName, worker Worker)

SetWorker adds the worker to the pool.

func (*WorkerPool) StartWorker

func (pool *WorkerPool) StartWorker(name WorkerName) error

StartWorker sets state `WorkerEnabled` for workers with the specified `name`.

func (*WorkerPool) StopWorker

func (pool *WorkerPool) StopWorker(name WorkerName) error

StopWorker sets the state `WStateStopped` for the worker with the specified `name`.

Directories

Path          Synopsis
presets
presets/api
