htask

v0.4.1
Published: Jul 7, 2018 License: MIT Imports: 4 Imported by: 0

README

htask (min Heap TASK scheduler)

A highly scalable in-memory task scheduler, implemented in Go using a min heap.

htask creates only 1 (scheduler) + n (worker) goroutines; it does not create a goroutine for each task.

If the number of workers is 0, the scheduler instead creates a goroutine for each task when its timer expires.

github.com/kawasin73/htask/cron is a wrapper around htask.Scheduler: a cron implementation with a human-friendly interface.

Blog post (in Japanese): Goでスケーラブルなスケジューラを書いた ("I wrote a scalable scheduler in Go")

Install

go get github.com/kawasin73/htask

Cron Usage

package main

import (
	"fmt"
	"sync"
	"time"

	"github.com/kawasin73/htask/cron"
)

func main() {
	var wg sync.WaitGroup
	workers := 1
	c := cron.NewCron(&wg, cron.Option{
		Workers: workers,
	})

	task := func() {
		fmt.Println("hello world")
	}

	// executed every day at 10:11 AM.
	c.Every(1).Day().At(10, 11).Run(task)

	// executed every minute, starting from now.
	c.Every(1).Minute().Run(task)

	tenSecondsLater := time.Now().Add(10 * time.Second)
	// executed every 2 seconds, starting 10 seconds from now.
	cancel, err := c.Every(2).Second().From(tenSecondsLater).Run(task)
	if err != nil {
		// handle error
	}

	// cron can also schedule a one-time task.
	c.Once(tenSecondsLater.Add(time.Minute)).Run(func() {
		// a scheduled task can be cancelled.
		cancel()
	})

	c.ChangeWorkers(0)

	time.Sleep(3 * time.Second)

	// on shutdown, all queued tasks are discarded.
	c.Close()
	wg.Wait()
}

Scheduler Usage

package main

import (
	"context"
	"fmt"
	"sync"
	"time"

	"github.com/kawasin73/htask"
)

func main() {
	var wg sync.WaitGroup
	workers := 1
	scheduler := htask.NewScheduler(&wg, workers)

	// keep the cancel function so the context is always released.
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()
	scheduler.Set(ctx.Done(), time.Now().Add(time.Second*2), func(t time.Time) {
		fmt.Println("later executed at :", t)
	})
	scheduler.Set(ctx.Done(), time.Now().Add(time.Second), func(t time.Time) {
		fmt.Println("first executed at :", t)
		// a task can schedule further tasks while it is executing.
		scheduler.Set(ctx.Done(), time.Now().Add(time.Millisecond*500), func(t time.Time) {
			fmt.Println("second executed at :", t)
		})
	})

	scheduler.ChangeWorkers(10)

	time.Sleep(3 * time.Second)

	// on shutdown
	scheduler.Close()
	wg.Wait()
}

Interface

Scheduler Interface

  • func NewScheduler(wg *sync.WaitGroup, workers int) *Scheduler
  • func (s *Scheduler) Set(chCancel <-chan struct{}, t time.Time, task func(time.Time)) error
  • func (s *Scheduler) ChangeWorkers(workers int) error
  • func (s *Scheduler) Close() error

Notes

  • The min heap has no size limit.
  • When the scheduler is closed, all pending tasks are discarded.

Benchmarking

# using benchstat
go get -u golang.org/x/perf/cmd/benchstat

# benchmark Scheduler.Set()
go test -bench=. -count=10 > bench.txt && benchstat bench.txt

# benchmark latency
go run cmd/latency/main.go -interval=1000000 -n 10000 -worker=0

Benchmark result

$ go run cmd/latency/main.go -interval=1000 -n 1000000 -worker=0
set 1000000 tasks in 2.995069909s. interval = 1µs, total=1s, workers=0
all task have executed in 1.187109126s.
task executed latency : mean=89.401961ms, min=825.441µs, max=187.529808ms
executed min index=0, max index=995060

$ go run cmd/latency/main.go -interval=100000 -n 100000 -worker=0
set 100000 tasks in 292.649906ms. interval = 100µs, total=10s, workers=0
all task have executed in 9.99992153s.
task executed latency : mean=27.207µs, min=12.938µs, max=1.195354ms
executed min index=74683, max index=0

$ go run cmd/latency/main.go -interval=1000000 -n 10000 -worker=0
set 10000 tasks in 28.64034ms. interval = 1ms, total=10s, workers=0
all task have executed in 9.999248894s.
task executed latency : mean=270.313µs, min=30.631µs, max=5.821627ms
executed min index=9118, max index=6563

LICENSE

MIT

Documentation

Variables

var (
	ErrClosed         = errors.New("scheduler is already closed")
	ErrInvalidWorkers = errors.New("workers must be more than 0")
	ErrInvalidTime    = errors.New("time is invalid zero time")
	ErrInvalidTask    = errors.New("task must not be nil")
	ErrTaskCancelled  = errors.New("task cancelled")
)

errors

var (
	ErrMax = errors.New("heap max size")
)

errors

Types

type Scheduler

type Scheduler struct {
	// contains filtered or unexported fields
}

Scheduler is used to schedule tasks.

func NewScheduler

func NewScheduler(wg *sync.WaitGroup, workers int) *Scheduler

NewScheduler creates a Scheduler and starts the scheduler and worker goroutines. Each goroutine it creates is counted in the given sync.WaitGroup.

func (*Scheduler) ChangeWorkers

func (c *Scheduler) ChangeWorkers(workers int) error

ChangeWorkers changes the number of workers. workers must be greater than 0. If the new size is smaller, the appropriate number of workers is shut down; if the new size is larger, the appropriate number of workers is created.

func (*Scheduler) Close

func (c *Scheduler) Close() error

Close shuts down the scheduler and worker goroutines. If the Scheduler is already closed, it returns ErrClosed.

func (*Scheduler) Set

func (c *Scheduler) Set(chCancel <-chan struct{}, t time.Time, task func(time.Time)) error

Set enqueues a new task on the scheduler's heap queue. The task is cancelled when chCancel is closed. A nil chCancel is acceptable.
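The cancellation contract (closing the channel cancels, nil means "never cancelled") maps naturally onto a non-blocking receive. The following is a minimal sketch of that idiom, not htask's internal code; `cancelled` and `runUnlessCancelled` are hypothetical helpers.

```go
package main

import "fmt"

// cancelled reports whether ch has been closed.
// A receive on a nil channel never proceeds, so for a nil ch the
// default case is always taken and the result is false.
func cancelled(ch <-chan struct{}) bool {
	select {
	case <-ch:
		return true
	default:
		return false
	}
}

// runUnlessCancelled runs task only if ch has not been closed,
// and reports whether the task ran.
func runUnlessCancelled(ch <-chan struct{}, task func()) bool {
	if cancelled(ch) {
		return false
	}
	task()
	return true
}

func main() {
	ch := make(chan struct{})

	fmt.Println(runUnlessCancelled(nil, func() { fmt.Println("nil channel: task runs") }))
	fmt.Println(runUnlessCancelled(ch, func() { fmt.Println("open channel: task runs") }))

	close(ch) // cancel
	fmt.Println(runUnlessCancelled(ch, func() { fmt.Println("never printed") }))
}
```

This is also why `ctx.Done()` from a context.Context can be passed as chCancel in the usage example: it is a `<-chan struct{}` that is closed on cancellation.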

Directories

Path Synopsis
cmd
