supervisor

package module
v0.14.0 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Dec 7, 2021 License: MIT Imports: 10 Imported by: 0

README

go.einride.tech/supervisor

A service that manages service lifetimes.

A supervisor is essentially a more capable errgroup. It monitors a set of running services, and restarts them if they fail. The supervisor keeps track of the status of each service and reports any status changes to listeners via a callback.

Examples

Supervising multiple services

Just as with errgroups, a supervisor can manage multiple services.

// ExampleSupervisor runs two supervised services, "ping" and "pong", that
// pass a round counter back and forth over an unbuffered channel until the
// counter reaches zero.
func ExampleSupervisor() {
	// Restart stopped services every 10ms.
	cfg := supervisor.Config{
		RestartInterval: 10 * time.Millisecond,
		// Clock left unset: the system clock is used.
		// Logger left unset: a nop-logger is used.
	}
	// Create a context which can be canceled.
	ctx, cancel := context.WithTimeout(context.Background(), time.Second)
	// Register the cancel immediately so the context is released on every
	// exit path, including the panic below. Calling cancel twice is harmless.
	defer cancel()
	// Create pingpong table
	table := make(chan int)
	roundsToPlay := 2
	// Create player services.
	pingService := supervisor.NewService("ping", func(ctx context.Context) error {
		i := roundsToPlay
		for {
			select {
			case <-ctx.Done():
				return fmt.Errorf("timeout")
			case table <- i:
				fmt.Println("ping")
				// Wait for pong to hand back the decremented counter.
				i = <-table
				if i == 0 {
					// Game over: closing the table lets pong observe a zero
					// value, and canceling the context stops the supervisor.
					close(table)
					cancel()
					return nil
				}
			}
		}
	})
	pongService := supervisor.NewService("pong", func(ctx context.Context) error {
		for {
			select {
			case <-ctx.Done():
				return fmt.Errorf("timeout")
			case i := <-table:
				// A zero is also what a receive on the closed table yields.
				if i == 0 {
					return nil
				}
				table <- i - 1
				fmt.Println("pong")
			}
		}
	})
	// Add service to the supervised services.
	cfg.Services = append(cfg.Services, pingService, pongService)
	// Create the supervisor from the config.
	s := supervisor.New(&cfg)
	// Run the supervisor (blocking call).
	err := s.Run(ctx)
	if err != nil {
		// handle error
		panic(err)
	}
	// Output:
	// ping
	// pong
	// ping
	// pong
}
Restarting crashed services

The main difference from errgroups is that a supervisor will restart a crashed service.

// ExampleRestartOnError runs a single service that fails three times and
// then cancels the context, and prints how many failures were counted.
func ExampleRestartOnError() {
	// Restart stopped services every 10ms.
	cfg := supervisor.Config{
		RestartInterval: 10 * time.Millisecond,
	}
	// Create a context that can be canceled inside the service.
	ctx, cancel := context.WithCancel(context.Background())
	starts := 0
	svc := supervisor.NewService("example", func(ctx context.Context) error {
		// Check before incrementing so that starts ends at exactly 3, which
		// is what the expected output below states.
		if starts == 3 {
			cancel()
			return nil
		}
		starts++
		return fmt.Errorf("oops")
	})
	cfg.Services = append(cfg.Services, svc)
	s := supervisor.New(&cfg)
	_ = s.Run(ctx) // no error currently returned
	fmt.Println("service restarted", starts, "times")
	// Output:
	// service restarted 3 times
}

Documentation

Overview

Package supervisor provides a supervisor that monitors services, manages their lifetimes, and reports on their status.

Index

Examples

Constants

This section is empty.

Variables

This section is empty.

Functions

This section is empty.

Types

type Config

// Config contains the full set of dependencies for a supervisor.
type Config struct {
	// Services are the services to be supervised.
	Services              []Service
	// StatusUpdateListeners are callbacks that receive status updates.
	// NOTE(review): presumably invoked on every status change — confirm
	// the batching and call timing against the implementation.
	StatusUpdateListeners []func([]StatusUpdate)
	// RestartInterval is how often stopped services are restarted.
	RestartInterval       time.Duration
	// Clock is optional; the package examples state that the system clock
	// is used when none is specified.
	Clock                 clock.Clock
	// Logger is optional; the package examples state that a nop-logger is
	// used when none is specified.
	Logger                logr.Logger
}

Config contains the full set of dependencies for a supervisor.

type Service

// Service is something that can be managed by a supervisor.
type Service interface {
	// Run runs the service with the given context and reports failure
	// through the returned error.
	// NOTE(review): presumably blocks until the service stops and should
	// return once the context is done — confirm against the implementation.
	Run(context.Context) error
}

Service that can be managed by a supervisor.

func NewService

func NewService(name string, fn func(context.Context) error) Service

NewService creates a new service from a function.

type Status

// Status models the current status of a service.
type Status uint8

Status models the current status of a service.

// Possible values of Status.
const (
	// StatusIdle is when a service is waiting to be scheduled by the OS scheduler.
	StatusIdle Status = iota
	// StatusRunning is when a service is running and everything is a-OK.
	StatusRunning
	// StatusStopped is when a service has stopped without an error.
	StatusStopped
	// StatusError is when a service has stopped with an error.
	StatusError
	// StatusPanic is when a service has stopped with a runtime panic.
	StatusPanic
)

func (Status) IsAlive

func (s Status) IsAlive() bool

IsAlive returns true for statuses indicating that the service is currently alive.

func (Status) String

func (i Status) String() string

type StatusUpdate

// StatusUpdate represents an update to a supervised service.
type StatusUpdate struct {
	// ServiceID is a numeric identifier for the service.
	// NOTE(review): presumably assigned by the supervisor — confirm.
	ServiceID   int       `json:"serviceId"`
	// ServiceName is the name of the service.
	ServiceName string    `json:"serviceName"`
	// Time is the timestamp associated with the update.
	Time        time.Time `json:"time"`
	// Status is the status carried by this update.
	Status      Status    `json:"status"`
	// Err is the error associated with the update, if any. It is excluded
	// from the JSON encoding.
	Err         error     `json:"-"`
}

StatusUpdate represents an update to a supervised service.

type Supervisor

// Supervisor monitors a set of running services and restarts them if they
// fail. It keeps track of the status of each service and reports status
// changes to listeners via a callback.
type Supervisor struct {
	// contains filtered or unexported fields
}
Example
package main

import (
	"context"
	"fmt"
	"time"

	"go.einride.tech/supervisor"
)

// main runs two supervised services, "ping" and "pong", that pass a round
// counter back and forth over an unbuffered channel until it reaches zero.
func main() {
	// Restart stopped services every 10ms.
	cfg := supervisor.Config{
		RestartInterval: 10 * time.Millisecond,
		// Clock left unset: the system clock is used.
		// Logger left unset: a nop-logger is used.
	}
	// Create a context which can be canceled.
	ctx, cancel := context.WithTimeout(context.Background(), time.Second)
	// Register the cancel immediately so the context is released on every
	// exit path, including the panic below. Calling cancel twice is harmless.
	defer cancel()
	// Create pingpong table
	table := make(chan int)
	roundsToPlay := 2
	// Create player services.
	pingService := supervisor.NewService("ping", func(ctx context.Context) error {
		i := roundsToPlay
		for {
			select {
			case <-ctx.Done():
				return fmt.Errorf("timeout")
			case table <- i:
				fmt.Println("ping")
				// Wait for pong to hand back the decremented counter.
				i = <-table
				if i == 0 {
					// Game over: closing the table lets pong observe a zero
					// value, and canceling the context stops the supervisor.
					close(table)
					cancel()
					return nil
				}
			}
		}
	})
	pongService := supervisor.NewService("pong", func(ctx context.Context) error {
		for {
			select {
			case <-ctx.Done():
				return fmt.Errorf("timeout")
			case i := <-table:
				// A zero is also what a receive on the closed table yields.
				if i == 0 {
					return nil
				}
				table <- i - 1
				fmt.Println("pong")
			}
		}
	})
	// Add service to the supervised services.
	cfg.Services = append(cfg.Services, pingService, pongService)
	// Create the supervisor from the config.
	s := supervisor.New(&cfg)
	// Run the supervisor (blocking call).
	err := s.Run(ctx)
	if err != nil {
		// handle error
		panic(err)
	}
}
Output:

ping
pong
ping
pong

func New

func New(cfg *Config) *Supervisor

New creates a new supervisor from a config.

Example
package main

import (
	"context"
	"fmt"
	"log"
	"os"
	"time"

	"github.com/go-logr/stdr"
	"go.einride.tech/supervisor"
)

// main supervises a single service that fails three times before canceling
// the shared context, then prints how many failures were counted.
func main() {
	// The service cancels this context itself once it has failed often enough.
	ctx, cancel := context.WithCancel(context.Background())

	const maxFailures = 3
	failures := 0
	flaky := supervisor.NewService("example", func(context.Context) error {
		if failures < maxFailures {
			failures++
			return fmt.Errorf("oops")
		}
		cancel()
		return nil
	})

	// Stopped services are restarted every 10ms; the logger writes to stderr.
	conf := supervisor.Config{
		Services:        []supervisor.Service{flaky},
		RestartInterval: 10 * time.Millisecond,
		Logger:          stdr.New(log.New(os.Stderr, "", log.LstdFlags)),
	}

	// Run blocks; its error is discarded because none is currently reported.
	_ = supervisor.New(&conf).Run(ctx)
	fmt.Println("service restarted", failures, "times")
}
Output:

service restarted 3 times

func (*Supervisor) Run added in v0.11.0

func (s *Supervisor) Run(ctx context.Context) error

Run the supervisor and all its services.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL