sqsworker

package module
v0.0.0-...-da5baaf Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Nov 1, 2018 License: MIT Imports: 10 Imported by: 0

README

SqsWorker lets you enqueue and process background jobs backed by an AWS SQS queue.

Build Status GoDoc

Features:

  • Consumes SQS messages as jobs using multiple workers
  • Enqueues jobs by sending them as messages to an SQS queue

Example:

package main

import (
	"fmt"
	"time"

	"github.com/aws/aws-sdk-go/aws"
	"github.com/aws/aws-sdk-go/aws/session"
	"github.com/aws/aws-sdk-go/service/sqs"
	"github.com/pborman/uuid"
	"github.com/canhlinh/sqsworker"
)

// main wires up a pool with at most two workers on a FIFO queue, registers a
// handler, enqueues a single job, and stops the pool after a short delay.
func main() {
	testQueue := "testqueue.fifo"

	// Named sess so the imported session package is not shadowed.
	sess := session.Must(session.NewSession())
	sqsClient := sqs.New(sess, aws.NewConfig().WithRegion("us-east-1"))

	sqsWorkerPool := sqsworker.New(testQueue, 2, sqsClient)
	sqsWorkerPool.RegisterJobHandler("doingsomething", jobHandler)
	sqsWorkerPool.Start()

	// Enqueue returns an error; report it instead of silently dropping it.
	if _, err := sqsWorkerPool.Enqueue("doingsomething", map[string]interface{}{"paramter_a": uuid.New()}); err != nil {
		fmt.Println("enqueue failed:", err)
	}

	// Give the workers a moment to pick up the job before stopping.
	time.Sleep(3 * time.Second)
	sqsWorkerPool.Stop()
}

 // jobHandler is the handler registered for "doingsomething" jobs: it prints
 // the job arguments and always reports success.
 func jobHandler(args map[string]interface{}) error {
	var err error // stays nil: this handler never fails
	fmt.Println("Run job", args)
	return err
}

Documentation

Index

Constants

View Source
const (
	// JobNameAttr is the string "job_name"; presumably the SQS message
	// attribute key under which a job's name travels — TODO confirm.
	JobNameAttr = "job_name"
)
View Source
const (
	// MaxNumberOfMessages is 10; presumably the cap on messages requested
	// per poll of the queue — TODO confirm against Dequeuer.Poll.
	MaxNumberOfMessages = 10
)

Variables

View Source
var DefaultWaitTime int64 = 10

DefaultWaitTime is the default number of seconds to wait when receiving jobs from the queue.

Functions

func CreateSqsQueue

func CreateSqsQueue(sqsClient *sqs.SQS, queue string) (*string, error)

CreateSqsQueue creates an SQS queue.

func CreateSqsQueueIfNotExist

func CreateSqsQueueIfNotExist(sqsClient *sqs.SQS, queue string) (*string, error)

CreateSqsQueueIfNotExist creates the SQS queue if it does not already exist.

func DeleteSqsQueue

func DeleteSqsQueue(sqsClient *sqs.SQS, queue string) error

DeleteSqsQueue deletes an SQS queue.

func MapToJSON

func MapToJSON(m map[string]interface{}) string

MapToJSON marshals a map to a JSON string.

Types

type Dequeuer

type Dequeuer struct {
	// contains filtered or unexported fields
}

Dequeuer represents a dequeuer.

func NewDequeuer

func NewDequeuer(queueURL *string, sqsClient *sqs.SQS, visibleTimeout int64, maxJobs int64) *Dequeuer

NewDequeuer creates a new Dequeuer.

func (*Dequeuer) Cancel

func (d *Dequeuer) Cancel()

Cancel stops polling messages from SQS.

func (*Dequeuer) Dequeue

func (d *Dequeuer) Dequeue(job *Job) error

Dequeue removes a job from the queue.

func (*Dequeuer) Poll

func (d *Dequeuer) Poll() ([]*Job, error)

Poll polls for new jobs in the SQS queue.

type Enqueuer

type Enqueuer struct {
	// contains filtered or unexported fields
}

Enqueuer represents an enqueuer.

func NewEnqueuer

func NewEnqueuer(queueURL *string, sqsClient *sqs.SQS) *Enqueuer

NewEnqueuer creates new Enqueuer

func (*Enqueuer) Enqueue

func (e *Enqueuer) Enqueue(jobName string, args map[string]interface{}) (*Job, error)

Enqueue enqueues a job

type Job

type Job struct {
	// ID is presumably the SQS message ID — TODO confirm against SqsMessageToJob.
	ID *string
	// Name is the job name; handlers are registered per name via RegisterJobHandler.
	Name string
	// ReceiptID is presumably the SQS receipt handle used when dequeuing — TODO confirm.
	ReceiptID *string
	// Args has the same type as the argument map a JobHandler receives.
	Args map[string]interface{}
	// Retries: NOTE(review) how this counter is maintained is not visible here — confirm.
	Retries int
}

Job represents a job.

func SqsMessageToJob

func SqsMessageToJob(msg *sqs.Message) (*Job, error)

SqsMessageToJob converts an SQS message to a Job.

func (*Job) SqsMessage

func (job *Job) SqsMessage() (*sqs.SendMessageInput, error)

SqsMessage converts the Job to a SendMessageInput.

type JobHandler

type JobHandler func(args map[string]interface{}) error

JobHandler is the function signature for consuming a job.

type Worker

// Worker holds a reference to a WorkerPool through its exported Pool field.
type Worker struct {
	// Pool is presumably the pool passed to NewWorker — TODO confirm.
	Pool *WorkerPool
}

func NewWorker

func NewWorker(pool *WorkerPool) *Worker

type WorkerPool

type WorkerPool struct {
	*Enqueuer
	*Dequeuer
	// contains filtered or unexported fields
}

WorkerPool represents a pool of workers

func New

func New(queue string, maxWorkers int64, sqsClient *sqs.SQS) *WorkerPool

New creates a new WorkerPool.

func (*WorkerPool) Enqueue

func (pool *WorkerPool) Enqueue(name string, args map[string]interface{}) (*Job, error)

Enqueue enqueues a new job

func (*WorkerPool) ExistJobHandler

func (pool *WorkerPool) ExistJobHandler(name string) bool

ExistJobHandler checks whether a job handler has been registered.

func (*WorkerPool) RegisterJobHandler

func (pool *WorkerPool) RegisterJobHandler(name string, handler JobHandler)

RegisterJobHandler registers a job handler; call it before starting the workers.

func (*WorkerPool) Start

func (pool *WorkerPool) Start()

Start starts the worker pool.

func (*WorkerPool) Stop

func (pool *WorkerPool) Stop()

Stop stops the worker pool.

Directories

Path Synopsis

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL