delayd2

Published: Nov 1, 2016 License: BSD-3-Clause Imports: 16 Imported by: 0

README

delayd2

delayd2 is an available setTimeout() service for scheduling message sends.

delayd2 stores scheduling information in clustered PostgreSQL instances (using streaming replication) as its stable storage layer. Support for MySQL (or Amazon Aurora) is also planned.

Message Format

Message bodies are forwarded unchanged from received messages after their delay elapses.

All Delayd directives are taken from SQS Message Attributes.

Required Message Attributes
  • delayd2-delay (Integer): Delay in seconds before the message is emitted.
  • delayd2-relay-to (String): Name of the queue to relay the message to.

Acknowledgement and License

delayd2 is my complete rework of nabeken/delayd, which was a fork of goinstant/delayd.

© 2015 Ken-ichi TANABE. Licensed under the BSD 3-clause license.

Documentation

Overview

delayd2 is an available setTimeout() service for scheduling message sends.

Variables

var (
	ErrMessageDuplicated = errors.New("driver: message is duplicated")
	ErrSessionRegistered = errors.New("driver: session is already registered")
)
var ErrInvalidAttributes = errors.New("delayd2: invalid attributes")
var ErrTooManyPayloads = errors.New("relay: too many payloads. Up to 10 payloads can be sent at once")
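
Because ErrTooManyPayloads caps a single send at 10 payloads, a caller holding a larger slice would need to split it first. The following is a minimal sketch of one way to do that; chunkPayloads and maxBatch are hypothetical names for this example and are not part of the package.

```go
package main

import "fmt"

// maxBatch mirrors the limit stated in the ErrTooManyPayloads message.
const maxBatch = 10

// chunkPayloads splits payloads into consecutive slices of at most size
// elements. It returns nil for an empty input or a non-positive size.
func chunkPayloads(payloads []string, size int) [][]string {
	if size <= 0 {
		return nil
	}
	var chunks [][]string
	for len(payloads) > size {
		// The full slice expression caps capacity so that appending to a
		// chunk cannot overwrite the elements of the next one.
		chunks = append(chunks, payloads[:size:size])
		payloads = payloads[size:]
	}
	if len(payloads) > 0 {
		chunks = append(chunks, payloads)
	}
	return chunks
}

func main() {
	payloads := make([]string, 25)
	for i := range payloads {
		payloads[i] = fmt.Sprintf("payload-%d", i)
	}
	for i, chunk := range chunkPayloads(payloads, maxBatch) {
		fmt.Printf("batch %d: %d payloads\n", i, len(chunk))
	}
}
```

Each resulting chunk could then be passed to a separate Relay or SendMessageBatch call.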

Functions

func BuildBatchMap

func BuildBatchMap(messages []*QueueMessage) map[string][][]releaseMessage

func BuildPlaceHolders

func BuildPlaceHolders(n int) string

func SendMessageBatch

func SendMessageBatch(s *sqs.SQS, req *sqs.SendMessageBatchInput, id2index map[string]int) error

Types

type Consumer

type Consumer struct {
	// contains filtered or unexported fields
}

Consumer represents an SQS message consumer.

func NewConsumer

func NewConsumer(workerID string, driver Driver, queue *queue.Queue) *Consumer

func (*Consumer) ConsumeMessages

func (c *Consumer) ConsumeMessages() (int64, error)

ConsumeMessages consumes messages from the SQS queue. On success, it returns the number of messages consumed.

type Driver

type Driver interface {
	RegisterSession() error
	DeregisterSession() error
	KeepAliveSession() error
	Enqueue(string, int64, string, string) error
	ResetActive() (int64, error)
	MarkActive(time.Time) (int64, error)
	MarkOrphaned() error
	AdoptOrphans() (int64, error)
	RemoveMessages([]string) error
	GetActiveMessages() ([]*QueueMessage, error)
}

func NewDriver

func NewDriver(workerID string, db *sql.DB) Driver

type QueueMessage

type QueueMessage struct {
	QueueID   string
	WorkerID  string
	ReleaseAt time.Time
	RelayTo   string
	Payload   string
}

QueueMessage is a message queued in the database.
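
Since Relay accepts one relayTo queue and a slice of payloads, queued messages have to be grouped by destination before they can be relayed. The following is a minimal sketch of such grouping, using a local copy of the QueueMessage struct so that it compiles on its own; groupByRelayTo is a hypothetical helper and is not the package's BuildBatchMap, whose exact behavior is not documented here.

```go
package main

import (
	"fmt"
	"time"
)

// QueueMessage is a local copy of the struct documented above, so that this
// sketch compiles without importing delayd2.
type QueueMessage struct {
	QueueID   string
	WorkerID  string
	ReleaseAt time.Time
	RelayTo   string
	Payload   string
}

// groupByRelayTo collects payloads per destination queue, preserving the
// order in which the messages appear. Nil entries are skipped.
func groupByRelayTo(messages []*QueueMessage) map[string][]string {
	grouped := make(map[string][]string)
	for _, m := range messages {
		if m == nil {
			continue
		}
		grouped[m.RelayTo] = append(grouped[m.RelayTo], m.Payload)
	}
	return grouped
}

func main() {
	now := time.Now()
	msgs := []*QueueMessage{
		{QueueID: "1", ReleaseAt: now, RelayTo: "queue-a", Payload: "first"},
		{QueueID: "2", ReleaseAt: now, RelayTo: "queue-b", Payload: "second"},
		{QueueID: "3", ReleaseAt: now, RelayTo: "queue-a", Payload: "third"},
	}
	for relayTo, payloads := range groupByRelayTo(msgs) {
		fmt.Println(relayTo, payloads)
	}
}
```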

type Relay

type Relay struct {
	// contains filtered or unexported fields
}

func NewRelay

func NewRelay(sqsSvc *sqs.SQS) *Relay

func (*Relay) Relay

func (r *Relay) Relay(relayTo string, payloads []string) error

Relay relays payloads to the relayTo queue.

type Sender

type Sender struct {
	// contains filtered or unexported fields
}

func NewSender

func NewSender(queue *queue.Queue) *Sender

func (*Sender) SendMessage

func (s *Sender) SendMessage(duration int, relayTo, payload string) error

func (*Sender) SendMessageBatch

func (s *Sender) SendMessageBatch(duration int, relayTo string, payloads []string) error

type Worker

type Worker struct {
	// contains filtered or unexported fields
}

Worker is the delayd2 worker.

func NewWorker

func NewWorker(c *WorkerConfig, driver Driver, consumer *Consumer, relay *Relay) *Worker

NewWorker creates a new worker.

func (*Worker) Run

func (w *Worker) Run() error

Run starts the worker process.

func (*Worker) Shutdown

func (w *Worker) Shutdown(ctx context.Context) error

type WorkerConfig

type WorkerConfig struct {
	ID string

	LeaveMessagesOrphanedAtShutdown bool

	NumConsumerFactor int
	NumRelayFactor    int
}

WorkerConfig is configuration for Worker.

Directories

Path Synopsis
cmd