jobworker

package module
v0.4.0 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Oct 2, 2021 License: MIT Imports: 11 Imported by: 7

README

jobworker

Description

Package jobworker provides a generic interface around message queue.

The jobworker package must be used in conjunction with some message queue connector.

list of connectors:

Requirements

Go 1.13+

Installation

This package can be installed with the go get command:

$ go get -u github.com/go-jwdk/jobworker

Usage

Basically

Implements worker processes using go-jwdk/awa-sqs-connector.

package main

import (
	"context"
	"log"
	"os"
	"os/signal"
	"syscall"
	"time"

	_ "github.com/go-jwdk/aws-sqs-connector"
	jw "github.com/go-jwdk/jobworker"
)

func main() {
	sqs, err := jw.Open("sqs", map[string]interface{}{
		"Region":          os.Getenv("REGION"),
		"NumMaxRetries":   3,
	})
	if err != nil {
		log.Println("Could not open a sqs conn", err)
		return
	}
	sqs.SetLoggerFunc(log.Println)
	worker, err := jw.New(&jw.Setting{
		Primary:    sqs,
		LoggerFunc: log.Println,
	})
	if err != nil {
		log.Println("Could not create a job worker", err)
		return
	}
	worker.Register("test", &HelloWorker{},
		jw.SubscribeMetadata("PollingInterval", "3"),
		jw.SubscribeMetadata("VisibilityTimeout", "20"),
		jw.SubscribeMetadata("WaitTimeSeconds", "10"),
		jw.SubscribeMetadata("MaxNumberOfJobs", "4"))

	go func() {
		log.Println("Start work")
		err := worker.Work(&jw.WorkSetting{
			WorkerConcurrency: 5,
		})
		if err != nil {
			log.Println("Failed to work", err)
			return
		}
	}()

	quit := make(chan os.Signal, 1)
	signal.Notify(quit, os.Interrupt, syscall.SIGTERM)

	<-quit

	ctx := context.Background()
	ctx, cancel := context.WithTimeout(ctx, time.Second*30)
	defer cancel()
	log.Println("Received a signal of graceful shutdown")

	if err := worker.Shutdown(ctx); err != nil {
		log.Println("Failed to graceful shutdown:", err)
	}

	log.Println("Completed graceful shutdown")
}

type HelloWorker struct {
}

func (HelloWorker) Work(job *jw.Job) error {
	log.Println("[HelloWorker]", job.Content)
	return nil
}
Enqueue/EnqueueBatch

Implements job enqueue.

sqs, err := jw.Open("sqs", map[string]interface{}{
	"Region":          os.Getenv("REGION"),
	"NumMaxRetries":   3,
})
if err != nil {
	log.Println("Could not open a sqs conn", err)
	return
}
sqs.SetLoggerFunc(log.Println)
worker, err := jw.New(&jw.Setting{
	Primary:    sqs,
	LoggerFunc: log.Println,
})
if err != nil {
	log.Println("Could not create a job worker", err)
	return
}

_, err := worker.Enqueue(context.Background(), &jw.EnqueueInput{
	Queue:   "test",
	Content: fmt.Sprintf(`{"msg":"%s"}`, uuid.NewV4().String()),
	Metadata: map[string]string{
		"MessageDelaySeconds": "3",
	},
})
if err != nil {
	log.Println("Failed to enqueue", err)
}
Primary/Secondary

Set up primary and secondary connectors.

  • Primary: go-jwdk/awa-sqs-connector/sqs
  • Secondary: go-jwdk/db-connector/mysql
import (
	jw "github.com/go-jwdk/jobworker"
	_ "github.com/go-jwdk/awa-sqs-connector"
	_ "github.com/go-jwdk/db-connector/mysql"
)

sqs, err := jobworker.Open("sqs", map[string]interface{}{
	"Region": os.Getenv("REGION"),
})

mysql, err := jobworker.Open("mysql", map[string]interface{}{
	"DSN":             "test-db",
	"MaxOpenConns":    3,
	"MaxMaxIdleConns": 3,
	"ConnMaxLifetime": time.Minute,
	"NumMaxRetries":   3,
})

jw, err := jw.New(&jw.Setting{
    Primary:   sqs,
    Secondary: mysql,
})

Documentation

Index

Constants

This section is empty.

Variables

View Source
var (
	ErrJobDuplicationDetected = fmt.Errorf("job duplication detected")
	ErrNoActiveConn           = fmt.Errorf("no active conn")
)
View Source
var (
	ErrPrimaryConnIsRequired = errors.New("primary conn is required")
	ErrDuplicateEntryID      = errors.New("duplicate entry id")
)

Functions

func Register

func Register(name string, driver Driver)

Types

type CompleteJobInput

type CompleteJobInput struct {
	Job *Job
}

type CompleteJobOutput

type CompleteJobOutput struct{}

type Connector

type Connector interface {
	Name() string
	Subscribe(ctx context.Context, input *SubscribeInput) (*SubscribeOutput, error)
	Enqueue(ctx context.Context, input *EnqueueInput) (*EnqueueOutput, error)
	EnqueueBatch(ctx context.Context, input *EnqueueBatchInput) (*EnqueueBatchOutput, error)
	CompleteJob(ctx context.Context, input *CompleteJobInput) (*CompleteJobOutput, error)
	FailJob(ctx context.Context, input *FailJobInput) (*FailJobOutput, error)
	Close() error
	SetLoggerFunc(f LoggerFunc)
}

func Open

func Open(driverName string, attrs map[string]interface{}) (Connector, error)

type ConnectorMock added in v0.2.2

type ConnectorMock struct {
	NameFunc          func() string
	SubscribeFunc     func(ctx context.Context, input *SubscribeInput) (*SubscribeOutput, error)
	EnqueueFunc       func(ctx context.Context, input *EnqueueInput) (*EnqueueOutput, error)
	EnqueueBatchFunc  func(ctx context.Context, input *EnqueueBatchInput) (*EnqueueBatchOutput, error)
	CompleteJobFunc   func(ctx context.Context, input *CompleteJobInput) (*CompleteJobOutput, error)
	FailJobFunc       func(ctx context.Context, input *FailJobInput) (*FailJobOutput, error)
	CloseFunc         func() error
	SetLoggerFuncFunc func(f LoggerFunc)
}

func (*ConnectorMock) Close added in v0.2.2

func (m *ConnectorMock) Close() error

func (*ConnectorMock) CompleteJob added in v0.2.2

func (m *ConnectorMock) CompleteJob(ctx context.Context, input *CompleteJobInput) (*CompleteJobOutput, error)

func (*ConnectorMock) Enqueue added in v0.2.2

func (m *ConnectorMock) Enqueue(ctx context.Context, input *EnqueueInput) (*EnqueueOutput, error)

func (*ConnectorMock) EnqueueBatch added in v0.2.2

func (m *ConnectorMock) EnqueueBatch(ctx context.Context, input *EnqueueBatchInput) (*EnqueueBatchOutput, error)

func (*ConnectorMock) FailJob added in v0.2.2

func (m *ConnectorMock) FailJob(ctx context.Context, input *FailJobInput) (*FailJobOutput, error)

func (*ConnectorMock) Name added in v0.2.2

func (m *ConnectorMock) Name() string

func (*ConnectorMock) SetLoggerFunc added in v0.2.2

func (m *ConnectorMock) SetLoggerFunc(f LoggerFunc)

func (*ConnectorMock) Subscribe added in v0.2.2

func (m *ConnectorMock) Subscribe(ctx context.Context, input *SubscribeInput) (*SubscribeOutput, error)

type ConnectorProvider

type ConnectorProvider struct {
	// contains filtered or unexported fields
}

func (*ConnectorProvider) Close

func (p *ConnectorProvider) Close()

func (*ConnectorProvider) GetActiveConnsInPriorityOrder added in v0.2.2

func (p *ConnectorProvider) GetActiveConnsInPriorityOrder() []Connector

func (*ConnectorProvider) GetConnsInPriorityOrder added in v0.2.2

func (p *ConnectorProvider) GetConnsInPriorityOrder() []Connector

func (*ConnectorProvider) IsDead

func (p *ConnectorProvider) IsDead(conn Connector) bool

func (*ConnectorProvider) MarkDead

func (p *ConnectorProvider) MarkDead(conn Connector)

func (*ConnectorProvider) Register

func (p *ConnectorProvider) Register(priority int, conn Connector)

func (*ConnectorProvider) SetRetrySeconds

func (p *ConnectorProvider) SetRetrySeconds(sec time.Duration)

type CustomAttribute

type CustomAttribute struct {
	DataType    string
	BinaryValue []byte
	StringValue string
}

type Driver

type Driver interface {
	Open(attrs map[string]interface{}) (Connector, error)
}

type EnqueueBatchEntry added in v0.2.1

type EnqueueBatchEntry struct {
	ID              string // Uniq ID
	Content         string
	Metadata        map[string]string
	CustomAttribute map[string]*CustomAttribute
}

type EnqueueBatchInput

type EnqueueBatchInput struct {
	Queue   string
	Entries []*EnqueueBatchEntry
}

type EnqueueBatchOutput

type EnqueueBatchOutput struct {
	Failed     []string // Uniq ID
	Successful []string // Uniq ID
}

type EnqueueInput

type EnqueueInput struct {
	Queue           string
	Content         string
	Metadata        map[string]string
	CustomAttribute map[string]*CustomAttribute
}

type EnqueueOutput

type EnqueueOutput struct{}

type FailJobInput

type FailJobInput struct {
	Job *Job
}

type FailJobOutput

type FailJobOutput struct{}

type Job

type Job struct {
	Conn            Connector
	QueueName       string
	Content         string
	Metadata        map[string]string
	CustomAttribute map[string]*CustomAttribute
	Raw             interface{} // raw data of different jobs for each connector
	// contains filtered or unexported fields
}

func (*Job) IsFinished

func (j *Job) IsFinished() bool

type JobStat added in v0.3.0

type JobStat struct {
	Conn            string
	Queue           string
	Content         string
	Metadata        map[string]string
	CustomAttribute map[string]*CustomAttribute
}

type JobWorker

type JobWorker struct {
	// contains filtered or unexported fields
}

func New

func New(s *Setting) (*JobWorker, error)

func (*JobWorker) Enqueue

func (jw *JobWorker) Enqueue(ctx context.Context, input *EnqueueInput) (*EnqueueOutput, error)

func (*JobWorker) EnqueueBatch

func (jw *JobWorker) EnqueueBatch(ctx context.Context, input *EnqueueBatchInput) (*EnqueueBatchOutput, error)

func (*JobWorker) ForceExitActiveJob added in v0.4.0

func (jw *JobWorker) ForceExitActiveJob(ctx context.Context) error

func (*JobWorker) GetStats added in v0.3.0

func (jw *JobWorker) GetStats() *Stats

func (*JobWorker) Register

func (jw *JobWorker) Register(queue string, worker Worker, opts ...OptionFunc)

func (*JobWorker) RegisterFunc

func (jw *JobWorker) RegisterFunc(queue string, f WorkerFunc, opts ...OptionFunc)

func (*JobWorker) RegisterOnShutdown

func (jw *JobWorker) RegisterOnShutdown(f func())

func (*JobWorker) Shutdown

func (jw *JobWorker) Shutdown(ctx context.Context) error

func (*JobWorker) Work

func (jw *JobWorker) Work(s *WorkSetting) error

func (*JobWorker) WorkOnceSafely added in v0.2.2

func (jw *JobWorker) WorkOnceSafely(ctx context.Context, job *Job)

type LoggerFunc

type LoggerFunc func(...interface{})

type Option

type Option struct {
	SubscribeMetadata map[string]string
}

func (*Option) ApplyOptions

func (o *Option) ApplyOptions(opts ...OptionFunc)

type OptionFunc added in v0.2.1

type OptionFunc func(*Option)

func SubscribeMetadata added in v0.2.1

func SubscribeMetadata(k, v string) OptionFunc

SubscribeMetadata is metadata of subscribe func

type Setting

type Setting struct {
	Primary   Connector
	Secondary Connector

	DeadConnectorRetryInterval int64 // Seconds

	LoggerFunc LoggerFunc
}

type Stats added in v0.3.0

type Stats struct {
	Jobs []*JobStat
}

type SubscribeInput

type SubscribeInput struct {
	Queue    string
	Metadata map[string]string
}

type SubscribeOutput

type SubscribeOutput struct {
	Subscription Subscription
}

type Subscription

type Subscription interface {
	Active() bool
	Queue() chan *Job
	UnSubscribe() error
}

type SubscriptionMock added in v0.2.2

type SubscriptionMock struct {
	ActiveFunc      func() bool
	QueueFunc       func() chan *Job
	UnSubscribeFunc func() error
}

func (*SubscriptionMock) Active added in v0.2.2

func (m *SubscriptionMock) Active() bool

func (*SubscriptionMock) Queue added in v0.2.2

func (m *SubscriptionMock) Queue() chan *Job

func (*SubscriptionMock) UnSubscribe added in v0.2.2

func (m *SubscriptionMock) UnSubscribe() error

type WorkSetting

type WorkSetting struct {
	HeartbeatInterval int64 // Sec
	OnHeartBeat       func(job *Job)
	WorkerConcurrency int
}

type Worker

type Worker interface {
	Work(*Job) error
}

type WorkerFunc

type WorkerFunc func(job *Job) error

Directories

Path Synopsis

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL