sharq

package module
v0.0.0-...-c3e20b8 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jun 26, 2018 License: MIT Imports: 15 Imported by: 0

README

go-sharq

SharQ is a flexible rate limited queueing system built using Redis.

go-sharq provides a core client library for both producer and consumer. This is a golang port of python library sharq. It also provides HTTP client for sharq-server.

Installation

Install using go get

go get -u github.com/karixtech/go-sharq

Usage

Initialization

Import the library

import sharq "github.com/karixtech/go-sharq"

Initialize the config

// NewCoreClientConfig returns default config
sharq_config := sharq.NewCoreClientConfig()
// Set redis options for universal redis client (supports sentinel and cluster clients)
sharq_config.RedisOptions = &redis.UniversalOptions{
	Addrs:    []string{"127.0.0.1:6379"},
	DB:       0,
}
// Requeue goroutine is disabled by default. Enable it to automatically requeue tasks not completed.
sharq_config.RunRequeue = true
sharq_client = sharq.NewCoreClient(sharq_config)
Enqueue

Enqueues a job into the queue. Every enqueue request is accompanied with an interval. The interval specifies the rate limiting capability of SharQ. An interval of 1000ms implies that SharQ will ensure two successful dequeue requests will be separated by 1000ms (interval is the inverse of rate. 1000ms interval means 1 job per second)

response := sharq_client.Enqueue(&sharq.EnqueueRequest{
  JobID:     "cea84623-be35-4368-90fa-7736570dabc4",
  Payload:   map[string]string{"message": "hello, world"},
  Interval:  1000, // in milliseconds
  QueueID:   "user001",
  QueueType: "sms",
})
fmt.Println(response.Status)
fmt.Println(response.JobID)
fmt.Println(response.Error)
Dequeue

Dequeues a job (non-blocking). It returns a job only if available or if it is ready for dequeue (based on the interval set while enqueueing).

response, err := sharq_client.Dequeue("sms")
if err != nil {
  fmt.Println("Unexpected error on sharq dequeue")
  fmt.Println(err)
}
if response == nil {
  fmt.Println("No jobs found in sharq")
}
fmt.Println(response.Status)
fmt.Println(response.QueueID)
fmt.Println(response.JobID)
fmt.Println(response.Payload)
fmt.Println(response.RequeuesRemaining)
Finish

Marks any dequeued job as succesfully completed. Any job which does get marked as finished upon dequeue will be re-enqueued into its respective queue after an expiry time (the JobRequeueInterval in the config).

err = sharq_client.Finish("sms", "user001", "bb59a2be-3b48-4645-8134-d9181742e3cf")
fmt.Println(err)

Documentation

Index

Constants

View Source
const (
	UserAgent = "go-sharq"
)

Variables

This section is empty.

Functions

This section is empty.

Types

type CoreClient

type CoreClient struct {
	// contains filtered or unexported fields
}

func NewCoreClient

func NewCoreClient(config CoreClientConfig) *CoreClient

func (*CoreClient) BulkEnqueue

func (c *CoreClient) BulkEnqueue(e []EnqueueRequest) []EnqueueResponse

func (*CoreClient) Dequeue

func (c *CoreClient) Dequeue(queueType string) (*DequeueResponse, error)

func (*CoreClient) Enqueue

func (c *CoreClient) Enqueue(e *EnqueueRequest) EnqueueResponse

func (*CoreClient) Finish

func (c *CoreClient) Finish(queueType, queueID, jobID string) error

type CoreClientConfig

type CoreClientConfig struct {
	JobExpireInterval      int
	JobRequeueInterval     int
	DefaultJobRequeueLimit int

	RedisOptions *redis.UniversalOptions

	KeyPrefix string

	RunRequeue bool
}

func NewCoreClientConfig

func NewCoreClientConfig() CoreClientConfig

Returns a config object with default values set

type DequeueResponse

type DequeueResponse struct {
	Status            string      `json:"status"`
	QueueID           string      `json:"queue_id"`
	JobID             string      `json:"job_id"`
	Payload           interface{} `json:"payload"`
	RequeuesRemaining int         `json:"requeues_remaining"`
}

type EnqueueRequest

type EnqueueRequest struct {
	JobID     string      `json:"job_id"`
	Interval  int         `json:"interval"`
	Payload   interface{} `json:"payload"`
	QueueID   string      `json:"-"`
	QueueType string      `json:"-"`

	Options *EnqueueRequestOptions `json:"-"`
}

type EnqueueRequestOptions

type EnqueueRequestOptions struct {
	RequeueLimit int
}

type EnqueueResponse

type EnqueueResponse struct {
	Status string `json:"status"`
	JobID  string `json:"job_id"`

	Error error `json:"-"`
}

type ProxyClient

type ProxyClient struct {
	// contains filtered or unexported fields
}

func NewProxyClient

func NewProxyClient(URL string) *ProxyClient

func (*ProxyClient) BulkEnqueue

func (c *ProxyClient) BulkEnqueue(e []EnqueueRequest) []EnqueueResponse

func (*ProxyClient) Dequeue

func (c *ProxyClient) Dequeue(queueType string) (*DequeueResponse, error)

func (*ProxyClient) Enqueue

func (*ProxyClient) Finish

func (c *ProxyClient) Finish(queueType, queueID, jobID string) error

type ProxySharqError

type ProxySharqError struct {
	Message string
}

func (ProxySharqError) Error

func (p ProxySharqError) Error() string

type Sharq

type Sharq interface {
	Enqueue(e *EnqueueRequest) EnqueueResponse
	BulkEnqueue(e []EnqueueRequest) []EnqueueResponse
	Dequeue(queueType string) (*DequeueResponse, error)
	Finish(queueType, queueID, jobID string) error
}

Directories

Path Synopsis

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL