sqsjkr

Version: v0.5.1
Published: Mar 4, 2021 License: MIT Imports: 25 Imported by: 0

README

sqsjkr

SQS Job Kicker (sqsjkr) is a simple worker that invokes commands from SQS messages.

Install

go get github.com/kayac/sqsjkr

Usage

package main

import (
    "context"
    "log"

    "github.com/kayac/sqsjkr"
)

func main() {
    // config
    conf := sqsjkr.NewConfig()
    conf.SetAWSAccount("aws_account_id", "aws_profile", "ap-northeast-1")
    conf.SetSQSQueue("your sqs queue name")
    conf.SetConcurrentNum(5)
    conf.SetTriggerCommand("echo trigger")

    // create sqsjkr; New returns the instance and an error
    sjkr, err := sqsjkr.New(conf)
    if err != nil {
        log.Fatalln("[error] ", err)
    }

    // run sqsjkr with the "debug" log level
    ctx := context.Background()
    if err := sqsjkr.Run(ctx, sjkr, "debug"); err != nil {
        log.Println("[error] ", err)
    }
}

Config

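  • [account] section

| params  | type   | description      |
|---------|--------|------------------|
| id      | string | AWS account id   |
| profile | string | AWS profile name |
| region  | string | AWS region       |

  • [sqs] section

| params     | type   | description        |
|------------|--------|--------------------|
| queue_name | string | AWS SQS queue name |

  • [kicker] section

| params             | type    | description                                      |
|--------------------|---------|--------------------------------------------------|
| max_concurrent_num | integer | maximum number of jobs executed concurrently     |
| life_time_trigger  | string  | command triggered when a job exceeds its lifetime |
| stats_port         | integer | port number of the sqsjkr stats HTTP server      |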

You can load the config from a TOML file:

[account]
id = "12345678"
profile = "default"
region = "ap-northeast-1"

[sqs]
queue_name = "sqsjkr_queue"

[kicker]
max_concurrent_num = 5
life_time_trigger = "echo 'stdin!' | ./test/trigger_test.sh"
stats_port = 8061

conf, err := sqsjkr.LoadConfig("/path/to/toml")

Job definition

| params                    | type              | description |
|---------------------------|-------------------|-------------|
| command                   | string            | job command |
| envs                      | map               | environment variables |
| event_id                  | string            | unique name of the job event (for example, an AWS CloudWatch Event Scheduler ID (Name)) |
| life_time                 | integer or string | An integer is interpreted as seconds. A string must include a unit such as 's', 'm', or 'h' (e.g. 1s, 1m, 1h). |
| lock_id                   | string            | lock ID; jobs with the same lock_id do not run at the same time |
| abort_if_locked           | bool              | If a job with the same lock_id is running, the new job gives up without retrying. |
| disable_life_time_trigger | bool              | Disables the lifetime trigger even if the job exceeds its lifetime (default false). |

  • example:
{
    "command": "echo 'hello sqsjkr!'",
    "envs": {
        "PATH": "/usr/local/bin/:/usr/bin/:/sbin/:/bin",
        "MAILTO": "example@example.com"
    },
    "event_id": "cloudwatch_event_schedule_id_name",
    "lock_id": "lock_id",
    "life_time": "1m",
    "abort_if_locked": false
}
LifeTime

A job waits up to life_time while another job with the same lock_id is executing. So if a job takes a long time to process and should not be executed repeatedly within a short period, set an appropriate life_time for it.

Locker

SQS Job Kicker provides a Locker interface, similar to 'setlock', that prevents jobs with the same lock_id from running at the same time. The sample in the sqsjkr package uses DynamoDB as the Locker backend. The Locker interface is:

type Locker interface {
	Lock(string, string) error
	Unlock(string) error
}

You can set a custom Locker with SetLocker(locker Locker):

mylocker := NewMyLocker() // Your Locker
sjkr.SetLocker(mylocker)
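
As an illustration of what a custom Locker might look like, here is a minimal in-process sketch. It is hypothetical: `memLocker` is not part of sqsjkr, the interface is copied from the README text above, and the meaning of the two `Lock` arguments (lock ID, then an identifier of the holder) is an assumption.

```go
package main

import (
	"fmt"
	"sync"
)

// Locker is copied from the interface shown in the README.
type Locker interface {
	Lock(string, string) error
	Unlock(string) error
}

// memLocker is a hypothetical in-process Locker.
// ASSUMPTION: Lock takes the lock ID first and a holder identifier second.
type memLocker struct {
	mu      sync.Mutex
	holders map[string]string
}

func newMemLocker() *memLocker {
	return &memLocker{holders: make(map[string]string)}
}

// Lock fails if the lock ID is already held.
func (l *memLocker) Lock(lockID, holder string) error {
	l.mu.Lock()
	defer l.mu.Unlock()
	if cur, ok := l.holders[lockID]; ok {
		return fmt.Errorf("lock %q is already held by %q", lockID, cur)
	}
	l.holders[lockID] = holder
	return nil
}

// Unlock fails if the lock ID is not currently held.
func (l *memLocker) Unlock(lockID string) error {
	l.mu.Lock()
	defer l.mu.Unlock()
	if _, ok := l.holders[lockID]; !ok {
		return fmt.Errorf("lock %q is not held", lockID)
	}
	delete(l.holders, lockID)
	return nil
}

func main() {
	var lkr Locker = newMemLocker()
	fmt.Println(lkr.Lock("report", "job-1"))
	fmt.Println(lkr.Lock("report", "job-2"))
	fmt.Println(lkr.Unlock("report"))
}
```

An in-process map only protects workers inside one process. A shared backend such as the DynamoDB sample is needed when several sqsjkr processes consume the same queue.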

Throttler

sqsjkr provides a Throttler interface that prevents the same job from being executed more than once. This repository includes a sample Throttler that uses DynamoDB. The Throttler interface is:

type Throttler interface {
	Set(string, string) error
	UnSet(string) error
}

You can set a custom Throttler with SetThrottler(th Throttler):

myThr := NewMyThrottler() // Your Throttler
sjkr.SetThrottler(myThr)
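
To show how duplicate suppression with the Set/UnSet methods could be used, here is a small sketch. Everything in it is an assumption for illustration: `memThrottler`, `runOnce`, and `errDuplicated` are hypothetical names, and it assumes that `Set` takes a job key plus a free-form value and reports a duplicate through an error.

```go
package main

import (
	"errors"
	"fmt"
	"sync"
)

// Throttler uses the Set/UnSet method set shown in the README.
type Throttler interface {
	Set(string, string) error
	UnSet(string) error
}

// errDuplicated is a hypothetical sentinel for an already recorded key.
var errDuplicated = errors.New("duplicated job")

// memThrottler is a hypothetical in-process Throttler.
// ASSUMPTION: Set records a job key and fails when the key already exists.
type memThrottler struct {
	mu   sync.Mutex
	seen map[string]string
}

func (t *memThrottler) Set(key, info string) error {
	t.mu.Lock()
	defer t.mu.Unlock()
	if _, ok := t.seen[key]; ok {
		return errDuplicated
	}
	if t.seen == nil {
		t.seen = make(map[string]string)
	}
	t.seen[key] = info
	return nil
}

func (t *memThrottler) UnSet(key string) error {
	t.mu.Lock()
	defer t.mu.Unlock()
	delete(t.seen, key)
	return nil
}

// runOnce runs fn only if key has not been recorded yet.
func runOnce(t Throttler, key string, fn func()) (bool, error) {
	if err := t.Set(key, "running"); err != nil {
		if errors.Is(err, errDuplicated) {
			return false, nil // duplicate delivery: skip silently
		}
		return false, err
	}
	fn()
	return true, nil
}

func main() {
	th := &memThrottler{}
	for i := 0; i < 2; i++ {
		ran, err := runOnce(th, "message-id-1", func() { fmt.Println("executing job") })
		fmt.Println("ran:", ran, "err:", err)
	}
}
```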

Stats HTTP endpoint

sqsjkr runs an HTTP server on port 8061 (the default) that reports its own stats.

$ curl -s localhost:8061/stats/metrics/v2
{
  "workers": {
    "busy": 4,
    "idle": 6
  },
  "invocations": {
    "succeeded": 10,
    "failed": 2,
    "errored": 3
  }
}
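
A monitoring script could decode this response as sketched below. The `stats` struct mirrors the JSON shape shown above; `parseStats` and `utilization` are hypothetical helpers, and the sample body is embedded as a literal so the sketch runs without a live sqsjkr process.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// stats mirrors the JSON shape of the stats response.
type stats struct {
	Workers struct {
		Busy int64 `json:"busy"`
		Idle int64 `json:"idle"`
	} `json:"workers"`
	Invocations struct {
		Succeeded int64 `json:"succeeded"`
		Failed    int64 `json:"failed"`
		Errored   int64 `json:"errored"`
	} `json:"invocations"`
}

// parseStats decodes a stats response body.
func parseStats(body []byte) (stats, error) {
	var s stats
	err := json.Unmarshal(body, &s)
	return s, err
}

// utilization returns the fraction of workers that are busy.
func utilization(s stats) float64 {
	total := s.Workers.Busy + s.Workers.Idle
	if total == 0 {
		return 0
	}
	return float64(s.Workers.Busy) / float64(total)
}

func main() {
	body := []byte(`{"workers":{"busy":4,"idle":6},"invocations":{"succeeded":10,"failed":2,"errored":3}}`)
	s, err := parseStats(body)
	if err != nil {
		fmt.Println("decode error:", err)
		return
	}
	fmt.Printf("busy=%d idle=%d utilization=%.2f\n", s.Workers.Busy, s.Workers.Idle, utilization(s))
}
```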

LICENSE

MIT

Documentation

Constants

const (
	DefaultMaxCocurrentNum = 20
	VisibilityTimeout      = 30
	WaitTimeSec            = 10
	MaxRetrieveMessageNum  = 10
	JobRetryInterval       = time.Second * 5
	ApplicationJSON        = "application/json"
	DefaultStatsPort       = 8061
)

Default constants.

Variables

var (
	ErrOverLifeTime = errors.New("over life time")
)

TrapSignals list

Functions

func Run

func Run(ctx context.Context, sjkr SQSJkr, level string) error

Run runs the SQSJkr daemon.

func SpawnWorker

func SpawnWorker(sjkr SQSJkr, wid int, js <-chan Job, s *Stats)

SpawnWorker spawns a worker.

Types

type AccountSection

type AccountSection struct {
	Profile string `toml:"profile"`
	ID      string `toml:"id"`
	Region  string `toml:"region"`
}

AccountSection holds the AWS account information.

type Config

type Config struct {
	Account AccountSection `toml:"account"`
	Kicker  KickerSection  `toml:"kicker"`
	SQS     SQSSection     `toml:"sqs"`
}

Config is the sqsjkr config

func LoadConfig

func LoadConfig(path string) (*Config, error)

LoadConfig loads the config from the file at the given path.

func NewConfig

func NewConfig() *Config

NewConfig creates a sqsjkr config.

func (*Config) SetAWSAccount

func (c *Config) SetAWSAccount(id, profile, region string)

SetAWSAccount sets the AWS account.

func (*Config) SetConcurrentNum

func (c *Config) SetConcurrentNum(num int)

SetConcurrentNum sets the number of jobs to execute concurrently.

func (*Config) SetKickerConfig

func (c *Config) SetKickerConfig(num int, trigger string)

SetKickerConfig sets the kicker config.

func (*Config) SetSQSQueue

func (c *Config) SetSQSQueue(qname string)

SetSQSQueue sets the SQS queue name.

func (*Config) SetStatsPort

func (c *Config) SetStatsPort(port int) error

SetStatsPort sets the port number of the stats API.

func (*Config) SetStatsSocket

func (c *Config) SetStatsSocket(sock string) error

SetStatsSocket sets the Unix domain socket path for the stats API.

func (*Config) SetTriggerCommand

func (c *Config) SetTriggerCommand(trigger string)

SetTriggerCommand sets the trigger command.

func (*Config) Validate

func (c *Config) Validate() error

Validate validates the config.

type DefaultJob

type DefaultJob struct {
	// contains filtered or unexported fields
}

DefaultJob is created from the body of a single SQS message.

func (DefaultJob) Command

func (j DefaultJob) Command() string

Command returns the job's command.

func (DefaultJob) EventID

func (j DefaultJob) EventID() string

EventID returns the event_id.

func (*DefaultJob) Execute

func (j *DefaultJob) Execute(lkr lock.Locker) ([]byte, error)

Execute executes the command.

func (DefaultJob) JobID

func (j DefaultJob) JobID() string

JobID returns the job's ID, which is unique (the SQS message ID).

func (*DefaultJob) String added in v0.5.1

func (j *DefaultJob) String() string

type DefaultSQSJkr

type DefaultSQSJkr struct {
	SQS             *sqs.SQS
	RetentionPeriod time.Duration
	// contains filtered or unexported fields
}

DefaultSQSJkr is the default SQSJkr implementation.

func New

func New(c *Config) (*DefaultSQSJkr, error)

New creates a DefaultSQSJkr from the given config.

func (*DefaultSQSJkr) Config

func (sjkr *DefaultSQSJkr) Config() *Config

Config returns the SQSJkr config.

func (*DefaultSQSJkr) JobStream

func (sjkr *DefaultSQSJkr) JobStream() chan Job

JobStream returns the Job channel.

func (*DefaultSQSJkr) Locker

func (sjkr *DefaultSQSJkr) Locker() lock.Locker

Locker returns DefaultSQSJkr's Locker.

func (*DefaultSQSJkr) Run

func (sjkr *DefaultSQSJkr) Run(ctx context.Context) error

Run runs the sqsjkr daemon.

func (*DefaultSQSJkr) SetLocker

func (sjkr *DefaultSQSJkr) SetLocker(l lock.Locker)

SetLocker sets DefaultSQSJkr's Locker.

func (*DefaultSQSJkr) SetThrottler

func (sjkr *DefaultSQSJkr) SetThrottler(t throttle.Throttler)

SetThrottler sets DefaultSQSJkr's Throttler.

func (*DefaultSQSJkr) Throttler

func (sjkr *DefaultSQSJkr) Throttler() throttle.Throttler

Throttler returns DefaultSQSJkr's Throttler.

type Duration

type Duration struct {
	time.Duration
}

Duration wraps time.Duration so that it can be decoded from JSON.

func (*Duration) UnmarshalJSON

func (d *Duration) UnmarshalJSON(b []byte) (err error)

UnmarshalJSON decodes a JSON value into the Duration field.

type Job

type Job interface {
	Execute(lock.Locker) ([]byte, error)
	JobID() string
	EventID() string
	Command() string
	String() string
}

Job is the sqsjkr job interface.

func NewJob

func NewJob(msg *sqs.Message, trigger string) (Job, error)

NewJob creates a job from an SQS message.

type KickerSection

type KickerSection struct {
	MaxConcurrentNum int    `toml:"max_concurrent_num"`
	Trigger          string `toml:"life_time_trigger"`
	StatsPort        int    `toml:"stats_port"`
	StatsSocket      string `toml:"stats_socket"`
}

KickerSection is the config of command kicker

type LogLevel

type LogLevel int

LogLevel is the log level type.

const (
	ErrorLevel LogLevel = iota
	WarnLevel
	InfoLevel
	DebugLevel
)

Log level const
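
Because the levels are declared with iota, they are ordered from ErrorLevel (0) to DebugLevel (3), which allows a simple comparison to decide whether a message should be printed. The sketch below illustrates that idea with local copies of the type and constants. `parseLevel` and `enabled` are hypothetical helpers, and the level names other than "debug" (which appears in the usage example) are assumptions.

```go
package main

import (
	"fmt"
	"strings"
)

// LogLevel and its constants are local copies of the declarations above.
type LogLevel int

const (
	ErrorLevel LogLevel = iota
	WarnLevel
	InfoLevel
	DebugLevel
)

// parseLevel maps a level name to a LogLevel.
// ASSUMPTION: names are "error", "warn", "info", "debug"; unknown names fall back to InfoLevel.
func parseLevel(s string) LogLevel {
	switch strings.ToLower(s) {
	case "error":
		return ErrorLevel
	case "warn":
		return WarnLevel
	case "debug":
		return DebugLevel
	default:
		return InfoLevel
	}
}

// enabled reports whether a message at level msg is printed
// by a logger configured at level cfg.
func enabled(cfg, msg LogLevel) bool {
	return msg <= cfg
}

func main() {
	cfg := parseLevel("info")
	fmt.Println("error shown:", enabled(cfg, ErrorLevel))
	fmt.Println("debug shown:", enabled(cfg, DebugLevel))
}
```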

type Logger

type Logger struct {
	Logger *log.Logger
	Level  LogLevel
}

Logger is the sqsjkr logger struct.

func NewLogger

func NewLogger() Logger

NewLogger returns a Logger struct.

func (Logger) Debugf

func (l Logger) Debugf(format string, args ...interface{})

Debugf outputs a debug log.

func (Logger) Errorf

func (l Logger) Errorf(format string, args ...interface{})

Errorf outputs an error log.

func (Logger) Infof

func (l Logger) Infof(format string, args ...interface{})

Infof outputs an information log.

func (*Logger) SetLevel

func (l *Logger) SetLevel(level string)

SetLevel sets the logger level.

func (Logger) Warnf

func (l Logger) Warnf(format string, args ...interface{})

Warnf outputs a warning log.

type MessageBody

type MessageBody struct {
	Command                string            `json:"command"`
	Environments           map[string]string `json:"envs"`
	EventID                string            `json:"event_id"`
	LifeTime               Duration          `json:"life_time"`
	LockID                 string            `json:"lock_id"`
	AbortIfLocked          bool              `json:"abort_if_locked"`
	DisableLifeTimeTrigger bool              `json:"disable_life_time_trigger"`
}

MessageBody is the struct used to decode the JSON message body.

func (MessageBody) String added in v0.3.2

func (m MessageBody) String() string

type SQSJkr

type SQSJkr interface {
	Run(context.Context) error
	Config() *Config
	JobStream() chan Job
	Locker() lock.Locker
	Throttler() throttle.Throttler
	SetLocker(locker lock.Locker)
	SetThrottler(throttle throttle.Throttler)
}

SQSJkr is the sqsjkr daemon interface.

type SQSSection

type SQSSection struct {
	QueueName string `toml:"queue_name"`
}

SQSSection is the AWS SQS configuration.

type Stats added in v0.5.0

type Stats struct {
	Workers struct {
		Busy int64 `json:"busy"`
		Idle int64 `json:"idle"`
	} `json:"workers"`
	Invocations struct {
		Succeeded int64 `json:"succeeded"`
		Failed    int64 `json:"failed"`
		Errored   int64 `json:"errored"`
	} `json:"invocations"`
	// contains filtered or unexported fields
}

Stats represents stats.

type StatsItem

type StatsItem struct {
	IdleWorkerNum uint32 `json:"idle_worker"`
	BusyWorkerNum uint32 `json:"busy_worker"`
}

StatsItem struct

type Worker

type Worker struct {
	// contains filtered or unexported fields
}

Worker struct

func (Worker) ReceiveMessage

func (w Worker) ReceiveMessage()

ReceiveMessage receive messages

Directories

| Path       | Synopsis                              |
|------------|---------------------------------------|
| cmd/sqsjkr | sample code showing how to use sqsjkr |
