execution_scheduler

package module
v0.0.0-...-97d40ad Latest Latest
Published: Oct 6, 2023 License: MIT Imports: 6 Imported by: 0

README

Execution Scheduler

API Reference

This project lets you manage and control concurrent executions in an easy way.

The easiest way to describe it is by example, but you can skip to Options if you already know how it works and just want to see quickly how to configure advanced behaviour.

Examples

The examples/ directory contains the examples described below, ordered to illustrate all features of this module.

To run them locally you only need Docker installed. Run make build to set up the development Docker image; you can then execute any example by its number with make example-NUMBER, e.g. make example-01.

You can also run the fully featured examples/main.go, which uses all features at once, with make example.

The simplest case, also available at examples/01-simple.go, calls the handler function 3 times in parallel.

package main

import (
  "fmt"
  "sync"
  "time"

  s "github.com/dalthon/execution_scheduler"
)

var count int = 0

func main() {
  var waitGroup sync.WaitGroup

  // Initialize scheduler
  scheduler := s.NewScheduler(&s.Options[interface{}]{}, nil, &waitGroup)

  // Schedule some executions
  scheduler.Schedule(handler, errorHandler, s.Parallel, 0)
  scheduler.Schedule(handler, errorHandler, s.Parallel, 0)
  scheduler.Schedule(handler, errorHandler, s.Parallel, 0)

  // Start running scheduler
  scheduler.Run()

  // Wait for scheduler to finish
  waitGroup.Wait()
}

func handler(_ interface{}) error {
  time.Sleep(1 * time.Second)
  count++
  fmt.Printf("message #%d!\n", count)
  return nil
}

func errorHandler(_ interface{}, err error) error {
  return nil
}
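
One thing to watch in the simplest case: the handlers run in parallel, so an unsynchronized count++ can race. The sketch below is one possible way to keep the counter safe with sync/atomic. It does not import the module; runParallel is a hypothetical helper that stands in for the scheduler by calling the handler from separate goroutines.

```go
package main

import (
	"fmt"
	"sync"
	"sync/atomic"
)

// count is shared by every handler call, so it is only touched atomically.
var count int64

// handler has the same shape as the handler in the example above.
func handler(_ interface{}) error {
	n := atomic.AddInt64(&count, 1) // atomic increment, returns the new value
	fmt.Printf("message #%d!\n", n)
	return nil
}

// runParallel is a hypothetical stand-in for the scheduler: it calls
// handler n times from separate goroutines, waits for all of them and
// returns the current value of count.
func runParallel(n int) int64 {
	var wg sync.WaitGroup
	for i := 0; i < n; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			_ = handler(nil)
		}()
	}
	wg.Wait()
	return atomic.LoadInt64(&count)
}

func main() {
	fmt.Println("total:", runParallel(3))
}
```

The same handler body should work unchanged when passed to scheduler.Schedule, since only the counter access differs from the original.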

For non-blocking serial execution, use s.Serial instead of s.Parallel in the first example, as is done in examples/02-serial.go.

  // Schedule some executions
  scheduler.Schedule(handler, errorHandler, s.Serial, 0)
  scheduler.Schedule(handler, errorHandler, s.Serial, 0)
  scheduler.Schedule(handler, errorHandler, s.Serial, 0)

In case you are wondering what the last argument of scheduler.Schedule is, it is the priority; see how it is used in examples/03-priority.go.

The higher the number, the higher the priority, so bigger values run first.

  // Schedule some executions
  scheduler.Schedule(handler(1), errorHandler, s.Serial, 1)
  scheduler.Schedule(handler(2), errorHandler, s.Serial, 2)
  scheduler.Schedule(handler(3), errorHandler, s.Serial, 3)

  // ...
  func handler(value int) func(interface{}) error {
    return func(_ interface{}) error {
      time.Sleep(1 * time.Second)
      fmt.Printf("message #%d!\n", value)
      return nil
    }
  }
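
To make the "bigger runs first" rule concrete, here is a minimal sketch of the ordering on its own, using only the standard library. It is not how the module implements priorities; the job type and byPriority function are hypothetical, and keeping scheduling order for equal priorities is an assumption of this sketch.

```go
package main

import (
	"fmt"
	"sort"
)

// job is a hypothetical stand-in for a scheduled execution.
type job struct {
	name     string
	priority int
}

// byPriority returns a copy of jobs ordered so that higher priorities
// come first. Jobs with equal priority keep their original order
// (an assumption of this sketch, not documented module behaviour).
func byPriority(jobs []job) []job {
	ordered := append([]job(nil), jobs...)
	sort.SliceStable(ordered, func(i, j int) bool {
		return ordered[i].priority > ordered[j].priority
	})
	return ordered
}

func main() {
	jobs := []job{{"first", 1}, {"second", 2}, {"third", 3}}
	for _, j := range byPriority(jobs) {
		fmt.Printf("%s (priority %d)\n", j.name, j.priority)
	}
}
```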

When parallel and serial executions are scheduled at the same time, all parallel executions run together while each serial execution waits for the other serials, as you can see in examples/04-parallel_and_serials.go.

  // Schedule some executions
  scheduler.Schedule(handler(1), errorHandler, s.Serial, 1)
  scheduler.Schedule(handler(2), errorHandler, s.Serial, 2)
  scheduler.Schedule(handler(3), errorHandler, s.Serial, 3)
  scheduler.Schedule(handler(4), errorHandler, s.Parallel, 1)
  scheduler.Schedule(handler(5), errorHandler, s.Parallel, 2)
  scheduler.Schedule(handler(6), errorHandler, s.Parallel, 3)

  // outputs:
  // message #6!
  // message #4!
  // message #3!
  // message #5!
  // message #2!
  // message #1!

If you want executions that run one after another and do not allow parallel executions to run at the same time, use Critical, as is done in examples/05-critical.go.

  // Schedule some executions
  scheduler.Schedule(handler(0), errorHandler, s.Critical, 4)
  scheduler.Schedule(handler(1), errorHandler, s.Serial, 1)
  scheduler.Schedule(handler(2), errorHandler, s.Serial, 2)
  scheduler.Schedule(handler(3), errorHandler, s.Serial, 3)
  scheduler.Schedule(handler(4), errorHandler, s.Parallel, 1)
  scheduler.Schedule(handler(5), errorHandler, s.Parallel, 2)
  scheduler.Schedule(handler(6), errorHandler, s.Parallel, 3)

  // outputs:
  // message #0!
  // message #6!
  // message #5!
  // message #4!
  // message #3!
  // message #2!
  // message #1!
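
If the exclusion rule for Critical is hard to picture, a read/write lock is a rough analogy: parallel runs behave like readers, a critical run behaves like the single writer. The sketch below models only that rule with sync.RWMutex. It is an illustration, not the module's implementation, and the gate type and demo function are hypothetical names.

```go
package main

import (
	"fmt"
	"sync"
	"sync/atomic"
)

// gate models the exclusion rule only: many parallel runs may overlap,
// while a critical run excludes everything else.
type gate struct {
	mu      sync.RWMutex
	running int64 // number of parallel runs currently inside the gate
}

// parallel runs fn while holding the shared (read) lock.
func (g *gate) parallel(fn func()) {
	g.mu.RLock()
	defer g.mu.RUnlock()
	atomic.AddInt64(&g.running, 1)
	defer atomic.AddInt64(&g.running, -1)
	fn()
}

// critical runs fn while holding the exclusive (write) lock and reports
// how many parallel runs were active at that moment.
func (g *gate) critical(fn func()) int64 {
	g.mu.Lock()
	defer g.mu.Unlock()
	fn()
	return atomic.LoadInt64(&g.running)
}

// demo starts n parallel runs plus one critical run and returns the
// number of parallel runs the critical one overlapped with.
func demo(n int) int64 {
	var g gate
	var wg sync.WaitGroup
	for i := 0; i < n; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			g.parallel(func() {})
		}()
	}
	overlapped := g.critical(func() {})
	wg.Wait()
	return overlapped
}

func main() {
	fmt.Println("parallel runs overlapping the critical one:", demo(10))
}
```

Because the counter is only changed while the read lock is held, the critical run always observes zero overlapping parallel runs.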

It may look odd that every example so far takes a second handler named errorHandler.

It is exactly what you suspected: it is called when the first handler returns an error or even panics, as you can see in examples/06-error.go.

  func handler(value int) func(interface{}) error {
    return func(_ interface{}) error {
      time.Sleep(1 * time.Second)
      fmt.Printf("message #%d!\n", value)

      if (value % 2) == 0 {
        return errors.New("Even error!")
      }

      panic("Odd panics!")
    }
  }

  func errorHandler(_ interface{}, err error) error {
    fmt.Printf("recovered error: \"%v\"\n", err)
    time.Sleep(1 * time.Second)
    return nil
  }

  // outputs:
  // message #0!
  // recovered error: "Even error!"
  // message #4!
  // message #5!
  // recovered error: "Even error!"
  // recovered error: "Execution panicked with: Odd panics!"
  // message #6!
  // message #3!
  // recovered error: "Even error!"
  // recovered error: "Execution panicked with: Odd panics!"
  // message #2!
  // recovered error: "Even error!"
  // message #1!
  // recovered error: "Execution panicked with: Odd panics!"
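
The output above shows panics arriving at errorHandler as ordinary errors. A minimal sketch of how a panic can be turned into an error with recover is given below. This is a general Go technique, not the module's actual code; safeCall is a hypothetical helper, and the message prefix is copied from the output above.

```go
package main

import (
	"errors"
	"fmt"
)

// safeCall runs fn and converts a panic into an ordinary error, so the
// caller can treat returned errors and panics the same way.
func safeCall(fn func() error) (err error) {
	defer func() {
		if r := recover(); r != nil {
			// Prefix mirrors the message seen in the example output.
			err = fmt.Errorf("Execution panicked with: %v", r)
		}
	}()
	return fn()
}

func main() {
	fmt.Println(safeCall(func() error { return errors.New("Even error!") }))
	fmt.Println(safeCall(func() error { panic("Odd panics!") }))
	fmt.Println(safeCall(func() error { return nil }))
}
```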

A sharp pair of eyes may be wondering why, in all the examples so far, interface{} is passed as the type parameter of the options and handlers.

There is also the second parameter of s.NewScheduler, which is always nil...

The two are related!

The second argument of s.NewScheduler is a context object passed to the scheduler so that it can be shared across many executions.

This is also why Scheduler is generic, in case you have noticed.

One use of this context is shown in examples/07-context.go, as you can see below:

  type CounterCtx struct {
    count uint
  }

    // Initialize scheduler
    scheduler := s.NewScheduler(&s.Options[*CounterCtx]{}, &CounterCtx{count: 0}, &waitGroup)

  func handler(value int) func(*CounterCtx) error {
    return func(counter *CounterCtx) error {
      time.Sleep(1 * time.Second)
      fmt.Printf("message #%d! so far we had %d other messages!\n", value, counter.count)
      counter.count++
      return nil
    }
  }

  func errorHandler(_ *CounterCtx, err error) error {
    return nil
  }

  // outputs:
  // message #0! so far we had 0 other messages!
  // message #5! so far we had 1 other messages!
  // message #3! so far we had 1 other messages!
  // message #6! so far we had 2 other messages!
  // message #4! so far we had 1 other messages!
  // message #2! so far we had 5 other messages!
  // message #1! so far we had 6 other messages!

From the output of examples/07-context.go you can also see that the given context is shared in a way that is not thread-safe. You need to make its interface thread-safe yourself, with mutexes or whatever else you may need.
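
One possible way to do that is to hide the counter behind a mutex. The sketch below reworks CounterCtx along those lines using only the standard library. The Next and Count methods and the callConcurrently helper are hypothetical names introduced for illustration; callConcurrently stands in for the scheduler by calling handlers from separate goroutines.

```go
package main

import (
	"fmt"
	"sync"
)

// CounterCtx guards its counter with a mutex so that handlers running
// in parallel can share it safely.
type CounterCtx struct {
	mu    sync.Mutex
	count uint
}

// Next increments the counter and returns the value it had before.
func (c *CounterCtx) Next() uint {
	c.mu.Lock()
	defer c.mu.Unlock()
	previous := c.count
	c.count++
	return previous
}

// Count returns the current value of the counter.
func (c *CounterCtx) Count() uint {
	c.mu.Lock()
	defer c.mu.Unlock()
	return c.count
}

// handler has the same signature as in the example above.
func handler(value int) func(*CounterCtx) error {
	return func(counter *CounterCtx) error {
		fmt.Printf("message #%d! so far we had %d other messages!\n", value, counter.Next())
		return nil
	}
}

// callConcurrently is a hypothetical stand-in for the scheduler: it runs
// n handlers against ctx from separate goroutines and returns the final count.
func callConcurrently(ctx *CounterCtx, n int) uint {
	var wg sync.WaitGroup
	for i := 0; i < n; i++ {
		wg.Add(1)
		go func(v int) {
			defer wg.Done()
			_ = handler(v)(ctx)
		}(i)
	}
	wg.Wait()
	return ctx.Count()
}

func main() {
	fmt.Println("final count:", callConcurrently(&CounterCtx{}, 7))
}
```

With this shape every message reports a distinct "so far" value, whatever order the handlers happen to run in.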

Going back to the initialization of the scheduler, you may have noticed that there are Options.

They can be used to define timeouts and callbacks, described in the Options section.

That's it! Now you know everything about this module!

Options

Options struct is given by:

type Options[C any] struct {
  ExecutionTimeout time.Duration
  InactivityDelay  time.Duration
  OnPrepare        func(scheduler *Scheduler[C]) error
  OnClosing        func(scheduler *Scheduler[C]) error
  OnError          func(scheduler *Scheduler[C]) error
  OnLeaveError     func(scheduler *Scheduler[C]) error
  OnInactive       func(scheduler *Scheduler[C]) error
  OnLeaveInactive  func(scheduler *Scheduler[C]) error
  OnCrash          func(scheduler *Scheduler[C])
  OnClose          func(scheduler *Scheduler[C])
}
  • ExecutionTimeout: Amount of time that an execution can wait before it times out. If it times out, errorHandler is called with TimeoutError. If it is time.Duration(0) (default), it never times out.
  • InactivityDelay: Amount of time that the scheduler waits without running executions before closing. If it is time.Duration(0) (default), it closes immediately once there are no more pending executions.
  • OnPrepare: Callback called during the scheduler's Pending state, which occurs once the scheduler starts to run or once it recovers from the Error state. If it returns an error, the scheduler crashes and then closes.
  • OnError: Once an execution's error handler returns an error, the scheduler goes to the Error state. If more than one parallel execution returns an error, this callback is still called only once, when the scheduler enters the Error state.
  • OnLeaveError: The scheduler tries to leave the Error state when there are no more executions running. It then calls this callback to try to recover from the failure. If it returns an error, the scheduler goes to the Crashed state. Otherwise it goes back to the initial Pending state.
  • OnInactive: Once all executions finish running successfully, the scheduler goes from the Active state to the Inactive state. If this callback returns an error, the scheduler goes to the Crashed state. Otherwise it stays in the Inactive state, waiting for new executions to go back to the Active state, or goes to the Closing state if inactivity times out.
  • OnLeaveInactive: From the Inactive state, this callback runs after OnInactive, once inactivity times out. If it returns an error, the scheduler goes to the Crashed state. Otherwise it goes to the Closing state.
  • OnClosing: Runs once the scheduler reaches the Closing state. If it returns an error, the scheduler goes to the Crashed state. Otherwise, if there are scheduled executions, it goes back to the Active state; if there are none, it goes to the Closed state.
  • OnCrash: Runs once the scheduler enters the Crashed state.
  • OnClose: Runs once the scheduler enters the Closed state.
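
Since errorHandler can receive a TimeoutError, a panic wrapped as an error, or a plain handler error, it can be handy to tell them apart. The sketch below shows one way to do that with errors.As. To stay runnable without the module, TimeoutError and PanicError are re-declared locally as stand-ins for the package's types: their messages and the value field are placeholders, and it is an assumption that the scheduler delivers these errors as pointer values.

```go
package main

import (
	"errors"
	"fmt"
)

// TimeoutError is a local stand-in for the package's TimeoutError.
type TimeoutError struct{}

func (err *TimeoutError) Error() string { return "timeout" } // placeholder message

// PanicError is a local stand-in; the value field is hypothetical.
type PanicError struct{ value interface{} }

func (err *PanicError) Error() string { return fmt.Sprintf("panic: %v", err.value) }

// errEven is a plain handler error, as in the error example.
var errEven = errors.New("Even error!")

// classify reports which kind of failure an error handler received.
func classify(err error) string {
	var timeout *TimeoutError
	var panicked *PanicError
	switch {
	case errors.As(err, &timeout):
		return "timeout"
	case errors.As(err, &panicked):
		return "panic"
	default:
		return "handler error"
	}
}

// errorHandler matches the errorHandler signature used with an
// interface{} context in the examples.
func errorHandler(_ interface{}, err error) error {
	fmt.Println("failure kind:", classify(err))
	return nil
}

func main() {
	_ = errorHandler(nil, &TimeoutError{})
	_ = errorHandler(nil, &PanicError{value: "Odd panics!"})
	_ = errorHandler(nil, errEven)
}
```

In real code the local declarations would be dropped in favour of the types exported by the package, provided the pointer-value assumption holds.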

The scheduler has a state machine into which all those timeouts and callbacks are hooked. Its states are described in the next section.

Scheduler State Machine

[Diagram: scheduler state machine]

There are 8 states. That's right, I know how to count. Only 7 states are drawn in the diagram because the 8th is the Shutdown state, which almost all states can go to once we trigger a graceful shutdown with the scheduler.Shutdown method.

Each state is represented by the following status and description:

  • Pending: Preparation state. Nothing runs here; it waits for OnPrepare to set up the scheduler.
  • Active: Executions run in this state.
  • Inactive: Once no execution is running, the scheduler goes to this state.
  • Closing: After inactivity times out, the scheduler goes to this state. In this state, every new execution gets scheduled and waits to run once the scheduler goes back to Active.
  • Error: In this state, every new execution gets scheduled and waits to run if the scheduler goes back to Active after being recovered.
  • Crashed: In this state, something bad enough happened that it can't be recovered. It force closes all pending executions, triggering a TimeoutError. The scheduler stays in this state until there are no more running or scheduled executions and then goes to the Closed state.
  • Shutdown: Similar to the Crashed state, but the scheduler enters it once we forcefully shut it down.
  • Closed: The one and only final state, reached once nothing is running or pending and nothing new can start. Once the scheduler reaches this state, the wait group is released.
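
When logging or debugging, it can help to print these states by name. The sketch below maps each status to its name. To stay runnable without the module, Status and its constants are re-declared locally in the same order as the package documentation lists them; statusName is a hypothetical helper, written as a plain function because methods cannot be added to a type from another package.

```go
package main

import "fmt"

// Status mirrors the documented declaration of the package's Status
// type; it is re-declared here only so the sketch is standalone.
type Status uint64

const (
	PendingStatus Status = iota
	ActiveStatus
	InactiveStatus
	ClosingStatus
	ClosedStatus
	ErrorStatus
	CrashedStatus
	ShutdownStatus
)

// statusNames is indexed by Status value, in declaration order.
var statusNames = [...]string{
	"Pending", "Active", "Inactive", "Closing",
	"Closed", "Error", "Crashed", "Shutdown",
}

// statusName returns a readable name for s, falling back to the numeric
// value for anything outside the known range.
func statusName(s Status) string {
	if s < Status(len(statusNames)) {
		return statusNames[s]
	}
	return fmt.Sprintf("Status(%d)", uint64(s))
}

func main() {
	for s := PendingStatus; s <= ShutdownStatus; s++ {
		fmt.Printf("%d: %s\n", s, statusName(s))
	}
}
```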

Contributing

Pull requests and issues are welcome! I'll try to review them as soon as I can.

This project is quite simple and its Makefile is quite useful to do whatever you may need. Run make help for more info.

To run tests, run make test.

To run tests with coverage, run make cover.

To run the fully featured example available at examples/main.go, run make example.

License

This project is released under the MIT License.

Documentation

Overview

The Execution Scheduler package can be used to handle complex concurrent executions in an easy fashion.

The main entity is Scheduler, which can be instantiated with NewScheduler.

Once we have a scheduler, we need to make it run with Scheduler.Run. We can then asynchronously schedule executions with Scheduler.Schedule and force it to shut down with Scheduler.Shutdown.

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

This section is empty.

Types

type Execution

type Execution[C any] struct {
	Status ExecutionStatus
	// contains filtered or unexported fields
}

type ExecutionKind

type ExecutionKind uint64
const (
	Parallel ExecutionKind = iota
	Serial
	Critical
)

type ExecutionStatus

type ExecutionStatus uint64
const (
	ExecutionScheduled ExecutionStatus = iota
	ExecutionRunning
	ExecutionExpired
	ExecutionFinished
)

type Options

type Options[C any] struct {
	ExecutionTimeout time.Duration
	InactivityDelay  time.Duration
	OnPrepare        func(scheduler *Scheduler[C]) error
	OnClosing        func(scheduler *Scheduler[C]) error
	OnError          func(scheduler *Scheduler[C]) error
	OnLeaveError     func(scheduler *Scheduler[C]) error
	OnInactive       func(scheduler *Scheduler[C]) error
	OnLeaveInactive  func(scheduler *Scheduler[C]) error
	OnCrash          func(scheduler *Scheduler[C])
	OnClose          func(scheduler *Scheduler[C])
}

type PanicError

type PanicError struct {
	// contains filtered or unexported fields
}

func (*PanicError) Error

func (err *PanicError) Error() string

type Scheduler

type Scheduler[C any] struct {
	Status  Status
	Err     error
	Context C
	// contains filtered or unexported fields
}

func NewScheduler

func NewScheduler[C any](options *Options[C], context C, waitGroup *sync.WaitGroup) *Scheduler[C]

func (*Scheduler[C]) Run

func (scheduler *Scheduler[C]) Run()

func (*Scheduler[C]) Schedule

func (scheduler *Scheduler[C]) Schedule(handler func(C) error, errorHandler func(C, error) error, kind ExecutionKind, priority int) *Execution[C]

func (*Scheduler[C]) Shutdown

func (scheduler *Scheduler[C]) Shutdown()

type SchedulerCrashedError

type SchedulerCrashedError struct {
}

func (*SchedulerCrashedError) Error

func (err *SchedulerCrashedError) Error() string

type SchedulerNotRecovered

type SchedulerNotRecovered struct {
}

func (*SchedulerNotRecovered) Error

func (err *SchedulerNotRecovered) Error() string

type ShutdownError

type ShutdownError struct {
}

func (*ShutdownError) Error

func (err *ShutdownError) Error() string

type Status

type Status uint64
const (
	PendingStatus Status = iota
	ActiveStatus
	InactiveStatus
	ClosingStatus
	ClosedStatus
	ErrorStatus
	CrashedStatus
	ShutdownStatus
)

type TimeoutError

type TimeoutError struct {
}

func (*TimeoutError) Error

func (err *TimeoutError) Error() string
