scheduler

Version: v1.0.0
Published: Jun 20, 2023 | License: BSD-3-Clause | Imports: 5 | Imported by: 0

README

Scheduler

This component is a thin wrapper around github.com/hibiken/asynq. At the moment it supports Redis only.

Examples

The snippets below cover the basics: creating and using the scheduler sch, starting the server, composing the client, adding a job to the queue, and finally an actual task worker implementation.

Creating the scheduler, which is later injected into the HTTP handler/server:
...

	workersNum, err := strconv.Atoi(os.Getenv("SCHEDULER_WORKERS_NUM"))
	if err != nil {
		log.Fatalf("can't parse SCHEDULER_WORKERS_NUM (default number of workers in the pool): %v", err)
	}

	// scheduler
	sch := scheduler.New(
		scheduler.Config{
			RedisURL:   os.Getenv("REDIS_URL"),
			WorkersNum: workersNum,
		},
	)

...
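
The snippet above aborts whenever SCHEDULER_WORKERS_NUM is missing or malformed. As a possible variation, the sketch below falls back to a default when the variable is empty and rejects non-positive values. The helper name parseWorkers and the fallback of 10 are assumptions made for illustration; they are not part of this package.

```go
package main

import (
	"fmt"
	"strconv"
)

// parseWorkers converts the raw SCHEDULER_WORKERS_NUM value into a pool size.
// An empty value yields the fallback; anything else must be a positive integer.
// Hypothetical helper, not part of the scheduler package.
func parseWorkers(raw string, fallback int) (int, error) {
	if raw == "" {
		return fallback, nil
	}
	n, err := strconv.Atoi(raw)
	if err != nil {
		return 0, fmt.Errorf("invalid workers number %q: %w", raw, err)
	}
	if n <= 0 {
		return 0, fmt.Errorf("workers number must be positive, got %d", n)
	}
	return n, nil
}

func main() {
	// In real code the raw value would come from os.Getenv("SCHEDULER_WORKERS_NUM").
	for _, raw := range []string{"", "4", "abc"} {
		n, err := parseWorkers(raw, 10)
		fmt.Printf("raw=%q workers=%d err=%v\n", raw, n, err)
	}
}
```

The resulting number would then be passed as WorkersNum in scheduler.Config, exactly as in the snippet above.
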
Server

Here we create sch and start the server workers, defining whatever each one needs (variables, worker pool size, etc.).

...

	workersNum, err := strconv.Atoi(os.Getenv("SCHEDULER_WORKERS_NUM"))
	if err != nil {
		log.Fatalf("can't parse SCHEDULER_WORKERS_NUM (default number of workers in the pool): %v", err)
	}

	// scheduler
	sch := scheduler.New(
		scheduler.Config{
			RedisURL:   os.Getenv("REDIS_URL"),
			WorkersNum: workersNum,
		},
	)

	err = sch.StartServer(map[string]asynq.Handler{
		tasks.TypeRSSParse: tasks.RSSParseTask{},
	})
	if err != nil {
    ...
...
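
StartServer receives a map from task type to handler. To illustrate what such a map implies, here is a minimal sketch of routing a task to its handler by type name. Task, Handler, countingHandler and dispatch are local stand-ins that only mirror the shape of the asynq types; the real routing is done inside asynq and this package, and is not shown in this README.

```go
package main

import (
	"context"
	"fmt"
)

// Task and Handler are local stand-ins mirroring the shape of asynq.Task and
// asynq.Handler. They are NOT the asynq types.
type Task struct {
	Type    string
	Payload []byte
}

type Handler interface {
	ProcessTask(ctx context.Context, t *Task) error
}

// countingHandler is a toy handler that only counts how often it was invoked.
type countingHandler struct{ calls *int }

func (h countingHandler) ProcessTask(ctx context.Context, t *Task) error {
	*h.calls++
	return nil
}

// dispatch looks up the handler registered for the task's type and runs it.
// An unregistered type is reported as an error instead of being dropped.
func dispatch(ctx context.Context, handlers map[string]Handler, t *Task) error {
	h, ok := handlers[t.Type]
	if !ok {
		return fmt.Errorf("no handler registered for task type %q", t.Type)
	}
	return h.ProcessTask(ctx, t)
}

func main() {
	calls := 0
	handlers := map[string]Handler{"rss:parse": countingHandler{calls: &calls}}
	ctx := context.Background()

	err := dispatch(ctx, handlers, &Task{Type: "rss:parse"})
	fmt.Println("registered type:", err, "calls:", calls)

	err = dispatch(ctx, handlers, &Task{Type: "unknown"})
	fmt.Println("unregistered type:", err)
}
```

The practical takeaway is that every task type a client enqueues needs an entry in the map passed to StartServer.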


Client: HTTP handler that adds a job to the queue

Here we reuse the scheduler sch injected into the server instance to add a job.

...

func (serv *QueueServer) Add(resp http.ResponseWriter, req *http.Request) {

    ...

	err = serv.scheduler.Add(scheduler.Task{
		//QueuePriority: scheduler.QueueCritical,
		TaskType: tasks.TypeRSSParse,
		Data: map[string]interface{}{
			"rss_uris": reqData.RssURIs,
		},
		MaxRetries: 3,
		// ProcessAt is optional: if not defined, the task is processed now.
		// Set it to delay the task (e.g. by 5 seconds) or to run it at a fully defined date/time in the future.
		//ProcessAt:  time.Now().Add(5 * time.Second),
	})
	if err != nil {
		serv.logger.WithError(err).Error("error scheduling task: " + tasks.TypeRSSParse)

		httputil.RenderErr(resp, "can't add task to queue", http.StatusInternalServerError)
		return
	}

    ...
}
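
The comments on ProcessAt say that an undefined value means "now". A minimal sketch of that rule, assuming "not defined" corresponds to the zero time.Time, could look like this. The helpers resolveProcessAt and delayFrom and the variables unset and base are hypothetical; the package's actual handling is not shown in this README.

```go
package main

import (
	"fmt"
	"time"
)

// unset is the zero time.Time, which this sketch treats as "ProcessAt not defined".
var unset time.Time

// base is a fixed reference instant so the demo output is deterministic.
var base = time.Date(2023, time.June, 20, 12, 0, 0, 0, time.UTC)

// resolveProcessAt returns when a task should run: the requested time if one
// was set, otherwise now.
func resolveProcessAt(requested, now time.Time) time.Time {
	if requested.IsZero() {
		return now
	}
	return requested
}

// delayFrom reports how long the task waits before it becomes due.
// Times in the past are treated as due immediately, so the result is never negative.
func delayFrom(requested, now time.Time) time.Duration {
	d := resolveProcessAt(requested, now).Sub(now)
	if d < 0 {
		return 0
	}
	return d
}

func main() {
	fmt.Println("unset:       ", delayFrom(unset, base))
	fmt.Println("5s in future:", delayFrom(base.Add(5*time.Second), base))
	fmt.Println("1s in past:  ", delayFrom(base.Add(-time.Second), base))
}
```
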
Sample task implementation

Just a snippet showing the task type definition, the task struct with its fields (if any), and the actual method, which MUST satisfy the asynq.Handler interface.

...


const (
	// TypeRSSParse -- rss parse task type
	TypeRSSParse = "rss:parse"
)

type (
	// RSSParseTask -- implements asynq.Handler interface
	RSSParseTask struct {
		// whatever fields needed here ...
	}
)

// ProcessTask -- exec method, implementing asynq.Handler interface
func (task RSSParseTask) ProcessTask(ctx context.Context, t *asynq.Task) error {

    ... 

	var data struct {
		RssURIs []string `json:"rss_uris"`
	}

	err := json.Unmarshal(t.Payload(), &data)
	if err != nil {
		logger.WithError(err).Error("json.Unmarshal failed")

		return err
	}

	// do whatever needed
    ...

	return nil
}
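
The client sends Data as a map with the key "rss_uris", and the worker decodes the payload with json.Unmarshal into a struct tagged `json:"rss_uris"`. The sketch below round-trips that payload to show why the key and the tag must match. It assumes the Data map is encoded as JSON, which the worker code suggests but this README does not state outright; encodeData, decodeRSSPayload and rssParsePayload are hypothetical names.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// rssParsePayload mirrors the anonymous struct the worker decodes into.
type rssParsePayload struct {
	RssURIs []string `json:"rss_uris"`
}

// encodeData turns the producer's Data map into payload bytes.
// ASSUMPTION: JSON encoding, inferred from the worker's json.Unmarshal call.
func encodeData(data map[string]interface{}) ([]byte, error) {
	return json.Marshal(data)
}

// decodeRSSPayload is the worker-side counterpart of encodeData.
func decodeRSSPayload(payload []byte) (rssParsePayload, error) {
	var p rssParsePayload
	if err := json.Unmarshal(payload, &p); err != nil {
		return p, fmt.Errorf("decode rss:parse payload: %w", err)
	}
	return p, nil
}

func main() {
	raw, err := encodeData(map[string]interface{}{
		"rss_uris": []string{"https://example.com/feed.xml"},
	})
	if err != nil {
		fmt.Println("encode failed:", err)
		return
	}
	p, err := decodeRSSPayload(raw)
	fmt.Println(string(raw), p.RssURIs, err)
}
```

Note that a mismatched key does not produce an error: json.Unmarshal silently leaves RssURIs empty, so a worker may want to validate the decoded data before using it.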

Documentation

Index

Constants

const (
	QueueRealtime = "realtime"
	QueueCritical = "critical"
	QueueDefault  = "default"
	QueueLow      = "low"
)
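
These constants are the values a Task's QueuePriority field is meant to carry. As a sketch of how a caller might guard against typos in queue names, the hypothetical helper below falls back to QueueDefault for empty or unknown names. The constants are copied locally so the example is self-contained; the fallback behavior is an assumption and is not documented by this package.

```go
package main

import "fmt"

// Local copies of the package constants, so the example compiles on its own.
const (
	QueueRealtime = "realtime"
	QueueCritical = "critical"
	QueueDefault  = "default"
	QueueLow      = "low"
)

// isKnownQueue reports whether name is one of the four queue names above.
func isKnownQueue(name string) bool {
	switch name {
	case QueueRealtime, QueueCritical, QueueDefault, QueueLow:
		return true
	}
	return false
}

// normalizeQueue returns name if it is a known queue, otherwise QueueDefault.
// Hypothetical helper: the package itself may treat unknown names differently.
func normalizeQueue(name string) string {
	if isKnownQueue(name) {
		return name
	}
	return QueueDefault
}

func main() {
	for _, name := range []string{QueueCritical, "", "urgent"} {
		fmt.Printf("%q -> %q\n", name, normalizeQueue(name))
	}
}
```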

Variables

This section is empty.

Functions

This section is empty.

Types

type Config

type Config struct {
	RedisURL   string
	WorkersNum int
}

Config -- general scheduler config

type Scheduler

type Scheduler struct {
	Config Config
}

Scheduler -- main scheduler struct definition

func New

func New(config Config) *Scheduler

New -- constructs a new scheduler

func (*Scheduler) Add

func (s *Scheduler) Add(task Task) error

Add -- creates asynq task and adds it into the queue

func (*Scheduler) GetRedisClientOpts

func (s *Scheduler) GetRedisClientOpts() (asynq.RedisClientOpt, error)
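
GetRedisClientOpts has no description, but its signature suggests it derives Redis connection options from Config.RedisURL. How it does so is not shown here (asynq itself offers a ParseRedisURI function for this purpose). Purely as an illustration of how a redis:// URL decomposes, here is a standard-library sketch; redisOpts and parseRedisURL are hypothetical names and not this package's implementation.

```go
package main

import (
	"fmt"
	"net/url"
	"strconv"
	"strings"
)

// redisOpts is a local stand-in for the connection fields a Redis client needs.
type redisOpts struct {
	Addr     string
	Password string
	DB       int
}

// parseRedisURL splits a URL like redis://:password@host:port/db into its parts.
// Hypothetical helper for illustration only.
func parseRedisURL(raw string) (redisOpts, error) {
	u, err := url.Parse(raw)
	if err != nil {
		return redisOpts{}, fmt.Errorf("parse redis url: %w", err)
	}
	if u.Scheme != "redis" && u.Scheme != "rediss" {
		return redisOpts{}, fmt.Errorf("unsupported scheme %q", u.Scheme)
	}
	opts := redisOpts{Addr: u.Host}
	if u.User != nil {
		opts.Password, _ = u.User.Password()
	}
	// The path, if present, holds the database index, e.g. "/2".
	if db := strings.Trim(u.Path, "/"); db != "" {
		n, err := strconv.Atoi(db)
		if err != nil {
			return redisOpts{}, fmt.Errorf("invalid db index %q: %w", db, err)
		}
		opts.DB = n
	}
	return opts, nil
}

func main() {
	opts, err := parseRedisURL("redis://:secret@localhost:6379/2")
	fmt.Printf("%+v %v\n", opts, err)
}
```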

func (*Scheduler) StartServer

func (s *Scheduler) StartServer(queueHandlers map[string]asynq.Handler) error

StartServer -- method, used to start the scheduler server

type Task

type Task struct {
	QueuePriority string
	TaskType      string
	Data          map[string]interface{}
	MaxRetries    int
	ProcessAt     time.Time
}

Task -- general task struct
