qron

package module
v0.0.0-...-c7ee058 Latest
Warning

This package is not in the latest version of its module.

Published: May 6, 2016 License: MIT Imports: 17 Imported by: 0

README

Qron

Qron is a simple scheduler for message queues.

Install

Download the latest release binary.

Or use the go tool. This installs the qron binary to your $GOPATH/bin:

$ go get github.com/mak73kur/qron/cmd/qron

Usage

Run qron (in verbose mode):

$ qron -c /path/to/config.yml -v

Example config:

reader:
    type: inline
    tab: |
        * * * * * every minute

writer:
    type: log

There are two main sections:

  • reader, which tells qron where to look for the job schedule, called the qron tab.
  • writer, which decides where qron publishes messages.

Specific properties depend on the chosen reader and writer types.

Thanks to spf13/viper, the config file can be written in several formats: JSON, TOML, or YAML.

If the path argument is empty, qron tries ./qron.yml by default.

Qron tab

Each line is a new job.

Parameters are separated by a single whitespace character.

The first five are schedule parameters and the sixth is the message body. An optional trailing parameter holds tags; see below.

The message body can contain whitespace or any other character except a newline.

┌───────────── min (0-59)
│ ┌────────────── hour (0-23)
│ │ ┌─────────────── day of month (1-31)
│ │ │ ┌──────────────── month (1-12)
│ │ │ │ ┌───────────────── day of week (0-6) (0 to 6 are Sunday to Saturday)
│ │ │ │ │
│ │ │ │ │
* * * * * message_body

Allowed parameter expressions:

  • asterisk (*): anything passes.
  • comma (,): 0 5,17 * * 1 runs at 5 AM and 5 PM every Monday.
  • slash (/): 0 */6 * * * is a shortcut for 0 0,6,12,18 * * *.
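
The three expressions above can be illustrated with a small field matcher. This is only a sketch, not qron's own code: `matchField` is a hypothetical helper, and it assumes that a step such as `*/6` matches every value divisible by the step, which fits zero-based fields like minute and hour.

```go
package main

import (
	"fmt"
	"strconv"
	"strings"
)

// matchField reports whether value satisfies one schedule field.
// Supported forms: "*", "*/n", a plain number, and comma-separated
// lists of those. Assumption: "*/n" matches values divisible by n.
func matchField(expr string, value int) bool {
	for _, part := range strings.Split(expr, ",") {
		switch {
		case part == "*":
			return true
		case strings.HasPrefix(part, "*/"):
			step, err := strconv.Atoi(part[2:])
			if err == nil && step > 0 && value%step == 0 {
				return true
			}
		default:
			n, err := strconv.Atoi(part)
			if err == nil && n == value {
				return true
			}
		}
	}
	return false
}

func main() {
	// Hour field of "0 5,17 * * 1": only 5 and 17 pass.
	fmt.Println(matchField("5,17", 5), matchField("5,17", 6))

	// Hour field of "0 */6 * * *": list the hours that pass.
	for h := 0; h < 24; h++ {
		if matchField("*/6", h) {
			fmt.Print(h, " ")
		}
	}
	fmt.Println()
}
```

A job is due when all five of its fields match the corresponding parts of the current time.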

Qron skips any line that is empty or starts with a comment marker (# or //).

A message can be followed by tag options for the job: a JSON object enclosed in back quotes, e.g. `{"ttl":"1m","key":"qron2"}`.

The actual effect of these tags, if any, is determined by the writer implementation.
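
To make the line format concrete, here is a rough sketch of how one tab line could be split into schedule, message and tags. The `entry` type and `parseLine` function are hypothetical names for illustration; the package's own `ParseJob` and `ParseTags` may behave differently, for example around edge cases such as back quotes inside the message.

```go
package main

import (
	"encoding/json"
	"errors"
	"fmt"
	"strings"
)

// entry is a hypothetical stand-in for one parsed tab line.
type entry struct {
	Schedule []string
	Message  string
	Tags     map[string]interface{}
}

// parseLine splits one tab line. The bool result is false for blank
// lines and comments, which carry no job.
func parseLine(line string) (entry, bool, error) {
	var e entry
	line = strings.TrimSpace(line)
	if line == "" || strings.HasPrefix(line, "#") || strings.HasPrefix(line, "//") {
		return e, false, nil
	}
	// Five schedule fields, then everything else on the line.
	parts := strings.SplitN(line, " ", 6)
	if len(parts) < 6 {
		return e, false, errors.New("expected five schedule fields and a message")
	}
	e.Schedule = parts[:5]
	rest := parts[5]
	// Optional trailing tags: a JSON object enclosed in back quotes.
	if strings.HasSuffix(rest, "`") {
		if open := strings.LastIndex(rest[:len(rest)-1], "`"); open >= 0 {
			raw := rest[open+1 : len(rest)-1]
			if err := json.Unmarshal([]byte(raw), &e.Tags); err != nil {
				return e, false, fmt.Errorf("invalid tags: %w", err)
			}
			rest = strings.TrimSpace(rest[:open])
		}
	}
	e.Message = rest
	return e, true, nil
}

func main() {
	tab := "# demo tab\n* * * * * every minute `{\"key\":\"ticker\"}`\n"
	for _, line := range strings.Split(tab, "\n") {
		e, ok, err := parseLine(line)
		if err != nil {
			fmt.Println("error:", err)
			continue
		}
		if ok {
			fmt.Printf("%v %q %v\n", e.Schedule, e.Message, e.Tags)
		}
	}
}
```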

Qron tab readers

The tab can be loaded from one of the following sources.

Inline source

Store the tab directly in the same config.

reader:
    type: inline
    tab: |
        * * * * * every minute
        */2 * * * * every two minutes

File source

The file is read once at program start. Sending SIGHUP triggers a reread of the file without a restart (this also applies to the Redis reader).

reader:
    type: file
    path: /tmp/qrontab
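
The reload-on-SIGHUP behaviour can be sketched with the standard `os/signal` package. This is an illustration under assumptions, not qron's actual code: `readTab` and `reloadOnSignal` are hypothetical helpers, and keeping the old tab when a reread fails is a choice made for this sketch.

```go
package main

import (
	"fmt"
	"log"
	"os"
	"os/signal"
	"syscall"
)

// readTab loads the raw tab from disk.
func readTab(path string) ([]byte, error) {
	return os.ReadFile(path)
}

// reloadOnSignal rereads the file each time a signal arrives and hands
// the new contents to apply. A failed read leaves the old tab in place
// (an assumption of this sketch).
func reloadOnSignal(sig <-chan os.Signal, path string, apply func([]byte)) {
	for range sig {
		data, err := readTab(path)
		if err != nil {
			log.Printf("reload failed, keeping old tab: %v", err)
			continue
		}
		apply(data)
	}
}

func main() {
	f, err := os.CreateTemp("", "qrontab")
	if err != nil {
		log.Fatal(err)
	}
	defer os.Remove(f.Name())
	f.WriteString("* * * * * every minute\n")
	f.Close()

	sig := make(chan os.Signal, 1)
	signal.Notify(sig, syscall.SIGHUP)

	loaded := make(chan []byte)
	go reloadOnSignal(sig, f.Name(), func(b []byte) { loaded <- b })

	// Simulate `kill -HUP <pid>` so the demo terminates; a real daemon
	// would keep running and wait for the OS to deliver the signal.
	sig <- syscall.SIGHUP
	fmt.Printf("reloaded tab: %q\n", <-loaded)
}
```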

Redis source

The tab is stored as a single string value in a Redis database.

A separate background routine calls GET <key> every minute and updates the tab if it has changed.

Note: if the value is cleared for some reason, qron assumes the tab is empty and keeps running with nothing to publish.

reader:
    type: redis
    url: localhost:6379
    key: qrontab
    # optional, 0 by default
    db: 0
    # optional, redis auth password
    auth: secret

Writers

Every minute, qron checks whether each job matches the current time. If it does, the writer publishes the job message.

Log writer

For debugging. Writes each message directly to the program output.

writer.type: log

AMQP writer

The AMQP writer publishes messages to RabbitMQ or any other implementation of the protocol.

writer:
    type: amqp
    url: amqp://localhost:5672
    exchange: ""
    key: qron

The AMQP writer handles two tag options:

  • key - overrides the routing key for this message.
  • ttl - message expiration, see the RabbitMQ docs; the value can be either a number of seconds or a duration string, such as 2h45m30s.

* * * * * every minute `{"ttl":"1m","key":"ticker"}`
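
RabbitMQ expects the per-message expiration as a string holding a whole number of milliseconds, so both ttl forms need converting before publishing. A minimal sketch of that conversion follows; `expiration` is a hypothetical helper and not necessarily how the AMQP writer does it.

```go
package main

import (
	"fmt"
	"strconv"
	"time"
)

// expiration converts a ttl tag value into a millisecond string.
// Numbers are taken as seconds, strings as Go duration strings.
func expiration(ttl interface{}) (string, error) {
	var d time.Duration
	switch v := ttl.(type) {
	case float64: // encoding/json decodes JSON numbers as float64
		d = time.Duration(v * float64(time.Second))
	case int:
		d = time.Duration(v) * time.Second
	case string:
		parsed, err := time.ParseDuration(v)
		if err != nil {
			return "", fmt.Errorf("invalid ttl %q: %w", v, err)
		}
		d = parsed
	default:
		return "", fmt.Errorf("unsupported ttl type %T", ttl)
	}
	if d < 0 {
		return "", fmt.Errorf("negative ttl %v", d)
	}
	return strconv.FormatInt(d.Milliseconds(), 10), nil
}

func main() {
	for _, ttl := range []interface{}{"1m", 90.0, "2h45m30s"} {
		ms, err := expiration(ttl)
		fmt.Println(ttl, ms, err)
	}
}
```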

Redis writer

Pushes messages onto a Redis list. The consumer side can use BRPOP or BLPOP to receive them.

writer:
    type: redis
    url: localhost:6379
    key: qron
    # optional, 0 by default
    db: 1
    # optional, redis auth password
    auth: secret
    # optionally use LPUSH, default is RPUSH
    lpush: true

The Redis writer supports the key tag to override the list key for any given job.

* * * * * every minute `{"key":"ticker"}`

HTTP writer

Sends messages as HTTP requests.

writer:
    type: http
    url: https://destination-site.com/consume
    method: POST
    headers:
        X-Auth: "token"
        Content-Type: "application/x-www-form-urlencoded"

The HTTP writer supports url, method and headers tags to override the URL or method, or to add headers, for any given job.

* * * * * every=minute `{ "headers": {"X-Auth":"token2"} }`
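
How per-job tags could be merged into the configured request is sketched below with `net/http`. The `newRequest` helper is hypothetical, and letting a tag header replace a configured header of the same name is an assumption of this sketch rather than documented behaviour.

```go
package main

import (
	"bytes"
	"fmt"
	"net/http"
)

// newRequest builds the request for one message. Tag values, when
// present, take precedence over the configured URL, method and headers.
func newRequest(url, method string, headers map[string]string, body []byte, tags map[string]interface{}) (*http.Request, error) {
	if v, ok := tags["url"].(string); ok && v != "" {
		url = v
	}
	if v, ok := tags["method"].(string); ok && v != "" {
		method = v
	}
	req, err := http.NewRequest(method, url, bytes.NewReader(body))
	if err != nil {
		return nil, err
	}
	for k, v := range headers {
		req.Header.Set(k, v)
	}
	// Tags decoded from JSON carry headers as map[string]interface{}.
	if extra, ok := tags["headers"].(map[string]interface{}); ok {
		for k, v := range extra {
			if s, ok := v.(string); ok {
				req.Header.Set(k, s)
			}
		}
	}
	return req, nil
}

func main() {
	req, err := newRequest(
		"https://destination-site.com/consume", "POST",
		map[string]string{"X-Auth": "token", "Content-Type": "application/x-www-form-urlencoded"},
		[]byte("every=minute"),
		map[string]interface{}{"headers": map[string]interface{}{"X-Auth": "token2"}},
	)
	if err != nil {
		fmt.Println("error:", err)
		return
	}
	fmt.Println(req.Method, req.URL, req.Header.Get("X-Auth"))
}
```

The resulting request would then be sent with an `http.Client`; that step is left out so the sketch makes no network calls.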

TODO

  • verbose mode
  • custom msg options
  • proper godoc
  • tests for the base package

Documentation

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

func ParseTags

func ParseTags(src []byte) (map[string]interface{}, int, error)

Parse message options

func SetVerbose

func SetVerbose(lvl int)

Types

type AMQP

type AMQP struct {
	// AMQP brokers
	URL string
	// AMQP exchange
	Exchange string
	// Routing Key Tag
	RoutingKey string

	sync.Mutex
	// contains filtered or unexported fields
}

func NewAMQP

func NewAMQP(url, exchange, routingKey string) (*AMQP, error)

func (*AMQP) Close

func (q *AMQP) Close() error

func (*AMQP) Connect

func (q *AMQP) Connect() error

func (*AMQP) Write

func (q *AMQP) Write(msgBody []byte, tags map[string]interface{}) error

type FileReader

type FileReader struct {
	Path string
}

func (FileReader) Read

func (f FileReader) Read() ([]byte, error)

type HTTPWriter

type HTTPWriter struct {
	URL     string
	Method  string
	Headers map[string]string
}

func (HTTPWriter) Write

func (w HTTPWriter) Write(msgBody []byte, tags map[string]interface{}) error

type InlineReader

type InlineReader struct {
	Tab []byte
}

func (InlineReader) Read

func (l InlineReader) Read() ([]byte, error)

type Job

type Job struct {
	// Schedule parameters
	Exp []string
	// Message body
	Payload string
	// Additional tags
	Tags map[string]interface{}
}

func ParseJob

func ParseJob(src []byte) (Job, error)

Create a job entity from a tab line

func ParseTab

func ParseTab(src []byte) ([]Job, error)

Parse multi-line tab string

func (Job) Match

func (j Job) Match(now time.Time) bool

Check if job should be executed at this time

type LogWriter

type LogWriter struct{}

Log is an example writer implementation that writes messages to program output

func (LogWriter) Write

func (w LogWriter) Write(msg []byte, tags map[string]interface{}) error

type Reader

type Reader interface {
	Read() ([]byte, error)
}

type RedisReader

type RedisReader struct {
	Key string

	sync.Mutex
	// contains filtered or unexported fields
}

func NewRedisReader

func NewRedisReader(url, auth string, db int) (*RedisReader, error)

func (*RedisReader) Read

func (r *RedisReader) Read() ([]byte, error)

func (*RedisReader) Watch

func (r *RedisReader) Watch(ch chan<- []byte)

Implement Watcher interface

type RedisWriter

type RedisWriter struct {
	Key   string
	LPush bool
	// contains filtered or unexported fields
}

func NewRedisWriter

func NewRedisWriter(url, auth string, db int) (*RedisWriter, error)

func (*RedisWriter) Write

func (r *RedisWriter) Write(msg []byte, tags map[string]interface{}) error

type Schedule

type Schedule struct {
	sync.Mutex
	// contains filtered or unexported fields
}

func NewSchedule

func NewSchedule(r Reader, w Writer) *Schedule

func (*Schedule) LoadAndWatch

func (sch *Schedule) LoadAndWatch() error

Loads current schedule and starts a new routine to reload schedule on any changes

func (*Schedule) Run

func (sch *Schedule) Run()

Runs the scheduler

func (*Schedule) SetTab

func (sch *Schedule) SetTab(new []Job)

func (*Schedule) Tab

func (sch *Schedule) Tab() []Job

type Watcher

type Watcher interface {
	Watch(chan<- []byte)
}

type Writer

type Writer interface {
	Write([]byte, map[string]interface{}) error
}
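
Any type with a matching Write method satisfies this interface, so a custom destination is a few lines of code. The sketch below redeclares the interface locally so that it compiles on its own; `memoryWriter` is a hypothetical example type and not part of the package.

```go
package main

import (
	"fmt"
	"sync"
)

// Writer mirrors the interface documented above.
type Writer interface {
	Write([]byte, map[string]interface{}) error
}

// memoryWriter is a hypothetical writer that keeps messages in memory,
// which can be handy in tests.
type memoryWriter struct {
	mu   sync.Mutex
	msgs []string
}

// Write stores the message body and ignores the tags.
func (w *memoryWriter) Write(msg []byte, tags map[string]interface{}) error {
	w.mu.Lock()
	defer w.mu.Unlock()
	w.msgs = append(w.msgs, string(msg))
	return nil
}

// Messages returns a copy of everything written so far.
func (w *memoryWriter) Messages() []string {
	w.mu.Lock()
	defer w.mu.Unlock()
	return append([]string(nil), w.msgs...)
}

// Compile-time check that memoryWriter implements Writer.
var _ Writer = (*memoryWriter)(nil)

func main() {
	w := &memoryWriter{}
	var out Writer = w
	if err := out.Write([]byte("every minute"), map[string]interface{}{"key": "ticker"}); err != nil {
		fmt.Println("error:", err)
		return
	}
	fmt.Println(w.Messages())
}
```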

Directories

Path Synopsis
cmd
