workerctl

v0.0.5
Published: Feb 28, 2022 License: MIT Imports: 8 Imported by: 1

README

workerctl: worker controller for graceful shutdown

Package workerctl controls an application's initialization and its graceful shutdown.

Usage

package main

import (
	"context"
	"log"
	"os"
	"os/signal"
	"syscall"
	"time"

	"github.com/daichitakahashi/workerctl"
)

func main() {
	ctx := context.Background()
	a := &workerctl.Aborter{}
	ctl, shutdown := workerctl.New()

	a.AbortOnError(func() (err error) {
		// 1. Init shared resource and bind to this Controller.
		// logOutput is used in both OneShotTaskRunner and Server.
		logOutput, err := os.CreateTemp("", "*log")
		if err != nil {
			return err
		}
		ctl.Bind(logOutput)

		// 2. Start OneShotTaskRunner.
		oneShot := &OneShotTaskRunner{
			Writer: logOutput,
		}
		err = ctl.Launch(ctx, oneShot)
		if err != nil {
			return err
		}

		{
			// 3. Create a Controller that depends on ctl and start Server.
			ctl := ctl.Dependent()

			server := &Server{
				OneShot: oneShot,
				Writer:  logOutput,
			}
			err := ctl.Launch(ctx, server)
			if err != nil {
				return err
			}
		}
		return nil
	}())

	quit := make(chan os.Signal, 1)
	signal.Notify(quit, syscall.SIGINT)

	// Once SIGINT is received or "/abort" is requested, the Controller starts shutting down.
	// First, Server finishes its graceful shutdown (3).
	// Second, OneShotTaskRunner shuts down after all running tasks have finished (2).
	// Finally, logOutput is closed (1).
	select {
	case <-a.Aborted():
		log.Println("aborted")
	case s := <-quit:
		log.Println("signal received:", s)
	}

	ctx, cancel := context.WithTimeout(context.Background(), 20*time.Second)
	defer cancel()
	if err := shutdown(ctx); err != nil {
		log.Fatalln("Server forced to shutdown:", err)
	}
}

Documentation

Overview

Package workerctl controls the initialization and shutdown of the workers that make up an application. It aims to describe the dependencies between them and to shut them down in the right order.

Functions

func Abort

func Abort(ctx context.Context)

Abort calls Abort on the Aborter that was attached to ctx with WithAbort. Workers can therefore signal an abort as long as they hold the Controller's context.

func New

func New() (Controller, ShutdownFunc)

New returns a new Controller which can be shut down by calling ShutdownFunc.

func PanicSafe

func PanicSafe(fn func() error) (err error)

PanicSafe calls fn and recovers any panic that occurs in it. When a panic is recovered, PanicSafe returns a RecoveredError wrapping the recovered value. If fn returns an error, PanicSafe returns that error.
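The three outcomes described above (no error, an ordinary error, a recovered panic) can be sketched with the standard library alone. The names `panicSafe`, `recoveredError`, `describe`, and `errFailed` are local stand-ins invented for this illustration; the package's actual implementation may differ.

```go
package main

import (
	"errors"
	"fmt"
)

// recoveredError is a local stand-in for the documented RecoveredError.
type recoveredError struct {
	Recovered interface{}
}

func (e *recoveredError) Error() string {
	return fmt.Sprintf("recovered: %v", e.Recovered)
}

// panicSafe is a hypothetical re-implementation, not the library's code.
// The deferred function overwrites the named result when fn panics.
func panicSafe(fn func() error) (err error) {
	defer func() {
		if v := recover(); v != nil {
			err = &recoveredError{Recovered: v}
		}
	}()
	return fn()
}

// errFailed is an ordinary error used for the non-panic failure case.
var errFailed = errors.New("failed")

// describe reports which of the three outcomes an error value represents.
func describe(err error) string {
	var re *recoveredError
	switch {
	case err == nil:
		return "ok"
	case errors.As(err, &re):
		return "panic"
	default:
		return "error"
	}
}

func main() {
	fmt.Println(describe(panicSafe(func() error { return nil })))
	fmt.Println(describe(panicSafe(func() error { return errFailed })))
	fmt.Println(describe(panicSafe(func() error { panic("boom") })))
}
```

Using a named result is what allows the deferred function to replace the return value after the panic has unwound fn.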

func WithAbort

func WithAbort(ctx context.Context, a *Aborter) context.Context

WithAbort returns a new Context derived from ctx that carries the Aborter a. Calling Abort with the returned Context calls a.Abort.
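One common way to build this kind of pairing is to store the value in the context under an unexported key. The sketch below assumes that approach; `aborter`, `newAborter`, `withAbort`, `abort`, and `aborterKey` are hypothetical local names, and unlike the documented Aborter this stand-in needs a constructor.

```go
package main

import (
	"context"
	"fmt"
	"sync"
)

// aborter is a local stand-in for the documented Aborter.
type aborter struct {
	once sync.Once
	ch   chan struct{}
}

func newAborter() *aborter { return &aborter{ch: make(chan struct{})} }

// Abort closes the channel exactly once, so repeated calls are harmless.
func (a *aborter) Abort() { a.once.Do(func() { close(a.ch) }) }

func (a *aborter) Aborted() <-chan struct{} { return a.ch }

// aborterKey is an unexported key type, so no other package can collide with it.
type aborterKey struct{}

// withAbort attaches a to a new context derived from ctx.
func withAbort(ctx context.Context, a *aborter) context.Context {
	return context.WithValue(ctx, aborterKey{}, a)
}

// abort looks up the aborter in ctx and triggers it; it does nothing if none is set.
func abort(ctx context.Context) {
	if a, ok := ctx.Value(aborterKey{}).(*aborter); ok {
		a.Abort()
	}
}

// isAborted checks the channel without blocking.
func isAborted(a *aborter) bool {
	select {
	case <-a.Aborted():
		return true
	default:
		return false
	}
}

// demo reports the abort state before and after a worker calls abort.
func demo() (before, after bool) {
	a := newAborter()
	ctx := withAbort(context.Background(), a)
	before = isAborted(a)
	abort(ctx) // a worker that only holds ctx requests shutdown
	abort(ctx) // a second request is a no-op
	return before, isAborted(a)
}

func main() {
	fmt.Println(demo()) // false true
}
```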

Types

type Aborter

type Aborter struct {
	// contains filtered or unexported fields
}

Aborter propagates an application's abort request. It is intended for triggering shutdown from any worker. It only holds and propagates the abort state, so the caller must perform the actual shutdown.

func (*Aborter) Abort

func (a *Aborter) Abort()

Abort tells an application to shut down. This may be called by multiple goroutines simultaneously. After the first call, subsequent calls do nothing.

func (*Aborter) AbortOnError

func (a *Aborter) AbortOnError(err error)

AbortOnError tells an application to shut down and records err as the cause. The cause can be retrieved with Err. If err == nil, AbortOnError does nothing.

func (*Aborter) Aborted

func (a *Aborter) Aborted() <-chan struct{}

Aborted returns a channel that's closed when Aborter.Abort is called.

func (*Aborter) Err added in v0.0.3

func (a *Aborter) Err() error

Err returns the error passed to AbortOnError.
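The interplay of Abort, AbortOnError, Aborted, and Err can be modelled with a mutex-guarded state holder. This is only a sketch: the `aborter` type and `errInit` are local stand-ins, and keeping the first cause while ignoring later calls is an assumption extrapolated from the documented behaviour of Abort.

```go
package main

import (
	"errors"
	"fmt"
	"sync"
)

// aborter is a local stand-in for the documented Aborter; its zero value is ready to use.
type aborter struct {
	mu      sync.Mutex
	ch      chan struct{}
	aborted bool
	err     error
}

// channel lazily creates the notification channel; callers must hold mu.
func (a *aborter) channel() chan struct{} {
	if a.ch == nil {
		a.ch = make(chan struct{})
	}
	return a.ch
}

// abort records the cause and closes the channel on the first call only.
func (a *aborter) abort(err error) {
	a.mu.Lock()
	defer a.mu.Unlock()
	if a.aborted {
		return
	}
	a.aborted = true
	a.err = err
	close(a.channel())
}

func (a *aborter) Abort() { a.abort(nil) }

// AbortOnError ignores nil so it can wrap an init function's result directly.
func (a *aborter) AbortOnError(err error) {
	if err != nil {
		a.abort(err)
	}
}

func (a *aborter) Aborted() <-chan struct{} {
	a.mu.Lock()
	defer a.mu.Unlock()
	return a.channel()
}

func (a *aborter) Err() error {
	a.mu.Lock()
	defer a.mu.Unlock()
	return a.err
}

// errInit is a hypothetical initialization failure.
var errInit = errors.New("init failed")

// demo returns the cause that is visible after a mix of calls.
func demo() error {
	a := &aborter{}
	a.AbortOnError(nil)     // ignored: no error
	a.AbortOnError(errInit) // first abort: cause recorded
	a.Abort()               // ignored: already aborted
	<-a.Aborted()           // does not block, the channel is closed
	return a.Err()
}

func main() {
	fmt.Println(demo()) // init failed
}
```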

type Closer

type Closer []io.Closer

Closer is a list of io.Closer values. It helps handle errors in init functions that open several resources.

Example
c, err := func() (c io.Closer, err error) {
	var closer workerctl.Closer
	// Close all opened resources if the function returns an error.
	// The closure delays reading closer and err until the function returns.
	defer func() { _ = closer.CloseOnError(err) }()

	first, err := openFile("first")
	if err != nil {
		return nil, err
	}
	closer = append(closer, first)

	second, err := openFile("second")
	if err != nil {
		return nil, err
	}
	closer = append(closer, second)

	third, err := openFile("third")
	if err != nil {
		return nil, err
	}
	closer = append(closer, third)

	return closer, nil
}()
if err != nil {
	log.Fatal(err)
}

c.Close()
Output:

close third
close second
close first

func (Closer) Close

func (c Closer) Close() (err error)

Close closes all appended resources in reverse order.

func (Closer) CloseOnError added in v0.0.3

func (c Closer) CloseOnError(err error) error

CloseOnError closes all appended resources in reverse order, but only when err != nil.
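When a method like this is used with defer, timing matters: Go evaluates the receiver and arguments of a deferred call when the defer statement executes, not when the function returns. The sketch below contrasts a deferred closure with a direct deferred call. The `closers` type, its `closeOnError` method (which returns nothing here), and `resource` are local stand-ins, not the package's types.

```go
package main

import (
	"errors"
	"fmt"
	"io"
)

// closers is a local stand-in for the documented Closer type.
type closers []io.Closer

// closeOnError closes every element in reverse order, but only when err != nil.
func (c closers) closeOnError(err error) {
	if err == nil {
		return
	}
	for i := len(c) - 1; i >= 0; i-- {
		_ = c[i].Close()
	}
}

// resource records its name in a shared log when it is closed.
type resource struct {
	name string
	log  *[]string
}

func (r resource) Close() error {
	*r.log = append(*r.log, r.name)
	return nil
}

// openAll defers a closure, so cs and err are read when the function returns.
func openAll(log *[]string) (err error) {
	var cs closers
	defer func() { cs.closeOnError(err) }()
	cs = append(cs, resource{"first", log}, resource{"second", log})
	return errors.New("third resource failed to open")
}

// openAllEager defers the call directly: the empty list and the nil error
// are captured at the defer statement, so nothing is ever closed.
func openAllEager(log *[]string) (err error) {
	var cs closers
	defer cs.closeOnError(err)
	cs = append(cs, resource{"first", log}, resource{"second", log})
	return errors.New("third resource failed to open")
}

func main() {
	var closed, eager []string
	_ = openAll(&closed)
	_ = openAllEager(&eager)
	fmt.Println(closed) // [second first]
	fmt.Println(eager)  // []
}
```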

type Controller

type Controller interface {
	// Dependent creates a new Controller that depends on this one.
	// The parent Controller's shutdown starts only after all derived Controllers have shut down.
	Dependent() Controller

	// Launch registers the WorkerLauncher with this Controller and calls it.
	// It returns an error when LaunchWorker fails.
	// A failed Launch has no side effect on the Controller's state.
	Launch(ctx context.Context, l WorkerLauncher) error

	// Bind binds resources to the Controller.
	// The resources are closed after the Controller's shutdown completes.
	Bind(rcs ...io.Closer)
}

Controller is the core interface of the workerctl package. A Controller can launch workers and create dependent Controllers, which makes it possible to describe dependencies between workers. During shutdown, derived Controllers shut down first, and then the parent Controller starts shutting down. Launched workers are associated with a Controller and begin shutting down when that Controller enters its shutdown phase. Resources that implement io.Closer can be bound to a Controller so that they are closed at the end of its shutdown.

Example
package main

import (
	"context"
	"fmt"
	"log"
	"time"

	"github.com/daichitakahashi/workerctl"
)

type dummyCloser func() error

func (d dummyCloser) Close() error {
	return d()
}

func main() {
	ctx := context.Background()
	ctl, shutdown := workerctl.New()

	ctl.Bind(dummyCloser(func() error {
		fmt.Println("close resource 1")
		return nil
	}))

	ctl.Bind(dummyCloser(func() error {
		fmt.Println("close resource 2")
		return nil
	}))

	_ = ctl.Launch(ctx, workerctl.Func(func(ctx context.Context) (func(ctx context.Context), error) {
		fmt.Println("launch worker 1")
		return func(ctx context.Context) {
			fmt.Println("stop worker 1")
		}, nil
	}))

	_ = ctl.Launch(ctx, workerctl.Func(func(ctx context.Context) (func(ctx context.Context), error) {
		fmt.Println("launch worker 2")
		return func(ctx context.Context) {
			time.Sleep(time.Millisecond * 20)
			fmt.Println("stop worker 2")
		}, nil
	}))

	{
		ctl := ctl.Dependent()

		ctl.Bind(dummyCloser(func() error {
			fmt.Println("close resource 3")
			return nil
		}))

		_ = ctl.Launch(ctx, workerctl.Func(func(ctx context.Context) (func(ctx context.Context), error) {
			fmt.Println("launch worker 3")
			return func(ctx context.Context) {
				time.Sleep(time.Millisecond * 20)
				fmt.Println("stop worker 3")
			}, nil
		}))

		_ = ctl.Launch(ctx, workerctl.Func(func(ctx context.Context) (func(ctx context.Context), error) {
			fmt.Println("launch worker 4")
			return func(ctx context.Context) {
				fmt.Println("stop worker 4")
			}, nil
		}))
	}

	{
		ctl := ctl.Dependent()

		_ = ctl.Launch(ctx, workerctl.Func(func(ctx context.Context) (func(ctx context.Context), error) {
			fmt.Println("launch worker 5")
			return func(ctx context.Context) {
				time.Sleep(time.Millisecond * 10)
				fmt.Println("stop worker 5")
			}, nil
		}))
	}

	err := shutdown(context.Background())
	if err != nil {
		log.Fatal(err)
	}

}
Output:

launch worker 1
launch worker 2
launch worker 3
launch worker 4
launch worker 5
stop worker 4
stop worker 5
stop worker 3
close resource 3
stop worker 1
stop worker 2
close resource 2
close resource 1

type Func added in v0.0.3

type Func func(ctx context.Context) (stop func(ctx context.Context), err error)

Func is an easy way to define WorkerLauncher. Func(f) is a WorkerLauncher that calls f when passed to Controller.Launch.

func (Func) LaunchWorker added in v0.0.3

func (f Func) LaunchWorker(ctx context.Context) (stop func(ctx context.Context), err error)

LaunchWorker calls f(ctx).
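This is the same adapter idea as http.HandlerFunc: a function type gains a method so that a plain function satisfies an interface. The sketch below mirrors the documented declarations under local names (`workerLauncher`, `launcherFunc`) so that it compiles without the module; `demo` is a hypothetical driver.

```go
package main

import (
	"context"
	"fmt"
)

// workerLauncher mirrors the documented WorkerLauncher interface.
type workerLauncher interface {
	LaunchWorker(ctx context.Context) (stop func(ctx context.Context), err error)
}

// launcherFunc mirrors the documented Func type.
type launcherFunc func(ctx context.Context) (stop func(ctx context.Context), err error)

// LaunchWorker calls f(ctx), which is all the adapter needs to do.
func (f launcherFunc) LaunchWorker(ctx context.Context) (func(ctx context.Context), error) {
	return f(ctx)
}

// demo launches a function-based worker through the interface and stops it.
func demo() []string {
	var events []string
	var l workerLauncher = launcherFunc(func(ctx context.Context) (func(context.Context), error) {
		events = append(events, "launch")
		return func(ctx context.Context) {
			events = append(events, "stop")
		}, nil
	})

	stop, err := l.LaunchWorker(context.Background())
	if err != nil {
		return events
	}
	stop(context.Background())
	return events
}

func main() {
	fmt.Println(demo()) // [launch stop]
}
```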

type JobRunner

type JobRunner struct {

	// If PanicHandler is set, panic will be recovered and passed as v.
	// If not set, JobRunner doesn't recover.
	PanicHandler func(v interface{})
	// contains filtered or unexported fields
}

JobRunner is a one-shot job executor. Execute jobs with Run or Go, and call Wait to wait until all running jobs have finished. This makes it possible to gracefully shut down background jobs that run outside the request-response scope.

func (*JobRunner) Go added in v0.0.2

func (r *JobRunner) Go(ctx context.Context, fn func(context.Context))

Go spawns a goroutine and executes the job in it.

func (*JobRunner) Run

func (r *JobRunner) Run(ctx context.Context, fn func(context.Context))

Run executes the job.

func (*JobRunner) Wait

func (r *JobRunner) Wait(ctx context.Context) error

Wait waits for all running jobs to finish. If ctx is cancelled or times out, the wait is cancelled as well.
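A runner with these three methods can be approximated with sync.WaitGroup plus a select on ctx.Done(). The sketch below is an assumption about the general technique, not the package's code: `jobRunner` is a local stand-in, Run is assumed to execute in the calling goroutine, Wait is assumed to return ctx.Err() when it gives up, and PanicHandler is left out.

```go
package main

import (
	"context"
	"fmt"
	"sync"
	"time"
)

// jobRunner is a local stand-in for the documented JobRunner.
type jobRunner struct {
	wg sync.WaitGroup
}

// Run executes fn in the calling goroutine and tracks it until it returns.
func (r *jobRunner) Run(ctx context.Context, fn func(context.Context)) {
	r.wg.Add(1)
	defer r.wg.Done()
	fn(ctx)
}

// Go executes fn in a new goroutine and tracks it until it returns.
func (r *jobRunner) Go(ctx context.Context, fn func(context.Context)) {
	r.wg.Add(1)
	go func() {
		defer r.wg.Done()
		fn(ctx)
	}()
}

// Wait blocks until all tracked jobs finish or ctx is done, whichever comes first.
func (r *jobRunner) Wait(ctx context.Context) error {
	done := make(chan struct{})
	go func() {
		r.wg.Wait()
		close(done)
	}()
	select {
	case <-done:
		return nil
	case <-ctx.Done():
		return ctx.Err()
	}
}

// demo starts a short background job and waits for it with a generous timeout.
func demo() (bool, error) {
	var r jobRunner
	finished := false
	r.Go(context.Background(), func(context.Context) {
		time.Sleep(10 * time.Millisecond)
		finished = true
	})

	ctx, cancel := context.WithTimeout(context.Background(), time.Second)
	defer cancel()
	if err := r.Wait(ctx); err != nil {
		return false, err
	}
	// Reading finished is safe here: Wait returned only after the job completed.
	return finished, nil
}

func main() {
	fmt.Println(demo()) // true <nil>
}
```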

type RecoveredError

type RecoveredError struct {
	Recovered interface{}
}

RecoveredError represents a fatal error that wraps a recovered value.

func (*RecoveredError) Error

func (e *RecoveredError) Error() string

type ShutdownFunc

type ShutdownFunc func(ctx context.Context) error

ShutdownFunc is returned by New and shuts down the created Controller. The ctx parameter is passed to each worker's shutdown function, so a timeout can be set with context.WithDeadline or context.WithTimeout. It is meant to be used together with the container's shutdown configuration, for example `docker stop --timeout` or the `stopTimeout` task definition parameter on AWS ECS.
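Bounding a shutdown with a deadline might look like the sketch below. `shutdownFunc` mirrors the documented signature, while `newShutdown`, `shutdownWithin`, and `demo` are hypothetical helpers; returning ctx.Err() on timeout is an assumption about how a shutdown function could report that it gave up.

```go
package main

import (
	"context"
	"errors"
	"fmt"
	"time"
)

// shutdownFunc mirrors the documented ShutdownFunc signature.
type shutdownFunc func(ctx context.Context) error

// newShutdown returns a stand-in that needs `work` to stop its workers
// and gives up as soon as ctx is done.
func newShutdown(work time.Duration) shutdownFunc {
	return func(ctx context.Context) error {
		select {
		case <-time.After(work):
			return nil
		case <-ctx.Done():
			return ctx.Err()
		}
	}
}

// shutdownWithin runs shutdown with a time limit, for example one chosen
// slightly below the container runtime's stop timeout.
func shutdownWithin(shutdown shutdownFunc, limit time.Duration) error {
	ctx, cancel := context.WithTimeout(context.Background(), limit)
	defer cancel()
	return shutdown(ctx)
}

// demo runs one shutdown that fits in its limit and one that does not.
func demo() (fastErr error, slowTimedOut bool) {
	fastErr = shutdownWithin(newShutdown(5*time.Millisecond), time.Second)
	slowErr := shutdownWithin(newShutdown(time.Second), 5*time.Millisecond)
	return fastErr, errors.Is(slowErr, context.DeadlineExceeded)
}

func main() {
	fmt.Println(demo()) // <nil> true
}
```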

type WorkerLauncher

type WorkerLauncher interface {
	LaunchWorker(ctx context.Context) (stop func(ctx context.Context), err error)
}

WorkerLauncher is responsible for initializing a worker and returning its shutdown function. If the worker fails to launch, it is expected to release all resources initialized during startup and then return an error.
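The cleanup expectation can be illustrated with a launcher that acquires two resources and fails on the second. Everything in this sketch (`resource`, `worker`, `failCache`, `demo`) is hypothetical; only the LaunchWorker method signature comes from the interface above.

```go
package main

import (
	"context"
	"errors"
	"fmt"
)

// resource is a hypothetical handle that remembers whether it was closed.
type resource struct {
	name   string
	closed bool
}

func (r *resource) Close() error {
	r.closed = true
	return nil
}

// worker is a hypothetical launcher that needs a database and a cache.
type worker struct {
	db, cache *resource
	failCache bool // simulate a failure while acquiring the second resource
}

// LaunchWorker has the method signature required by WorkerLauncher.
func (w *worker) LaunchWorker(ctx context.Context) (stop func(ctx context.Context), err error) {
	w.db = &resource{name: "db"}
	// If any later step fails, release what was already acquired.
	defer func() {
		if err != nil {
			_ = w.db.Close()
		}
	}()

	if w.failCache {
		return nil, errors.New("cache unavailable")
	}
	w.cache = &resource{name: "cache"}

	// On success, the returned function releases resources in reverse order.
	return func(ctx context.Context) {
		_ = w.cache.Close()
		_ = w.db.Close()
	}, nil
}

// demo launches a failing worker and reports whether the launch failed
// and whether the already-opened resource was released.
func demo() (launchFailed, released bool) {
	w := &worker{failCache: true}
	_, err := w.LaunchWorker(context.Background())
	return err != nil, w.db.closed
}

func main() {
	fmt.Println(demo()) // true true
}
```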
