cogman

package module
v1.0.1-0...-7ff242b Latest Latest
Published: May 15, 2021 License: MIT Imports: 13 Imported by: 0

README


How to use:

First, add it to your $GOPATH:

go get github.com/Joker666/cogman

Then add configuration for RabbitMQ as the message broker and Redis as the backend. MongoDB is an optional second backend that enables the re-enqueuing feature.
Start the server to consume tasks and start a client session to send tasks to the server.
Write task handlers and register them, then send tasks to process. And voilà, you have set up a simple background job processing server.

You should see something like this when everything is up and running (screenshot: list of services).

Motivation

In the Python world you have Celery; in the Ruby world, Resque and Sidekiq; in C#, Hangfire. They all have one thing in common: a simple interface to get started. When building products in Go, it became apparent that no library offered a comparably simple interface. There is Machinery, which is an excellent library but has a steep learning curve. It also has tons of features baked in that you may not need but have to pull in anyway.

We were also not fond of the way it handles the processing of future tasks with RabbitMQ's Dead Letter Exchange. So we decided to write our own job processing library. This is an opinionated library: it uses RabbitMQ as the message broker, Redis as the backend, and optionally MongoDB as a backend for more features. This setup has worked well for us in production and under stress, and I believe it can work for large tasks as well.

Requirements

  • Go
  • RabbitMQ
  • Redis
  • MongoDB (optional)

Features

  • Task Priority
  • Persistence
  • Queue type
  • Retries
  • Multiple consumer & server
  • Concurrency
  • Redis and Mongo log
  • Re-enqueue recovered task
  • Handle Reconnection
  • UI
  • Rest API

Examples

Setup

Config

Cogman API config example:

cfg := &config.Config{
    ConnectionTimeout: time.Minute * 10, // default value 10 minutes
    RequestTimeout:    time.Second * 5,  // default value 5 seconds

    AmqpURI:  "amqp://localhost:5672",                  // required
    RedisURI: "redis://localhost:6379/0",               // required
    MongoURI: "mongodb://root:secret@localhost:27017/", // optional

    RedisTTL: time.Hour * 24 * 7,  // optional. default value 1 week
    MongoTTL: time.Hour * 24 * 30, // optional. default value 1 month

    HighPriorityQueueCount: 2,    // optional. default value 1
    LowPriorityQueueCount:  4,    // optional. default value 1
    StartRestServer:        true, // optional. default value false
}

The client and server also have their own individual configs, so they can be used separately.

Client/Server

This Cogman API call starts a client and a server.

if err := cogman.StartBackground(cfg); err != nil {
    log.Fatal(err)
}

Alternatively, you can initiate the client and server individually:

// Client
client, err := cogman.NewSession(cfg)
if err != nil {
    log.Fatal(err)
}

if err := client.Connect(); err != nil {
    log.Fatal(err)
}

// Server
server, err := cogman.NewServer(cfg)
if err != nil {
    log.Fatal(err)
}

go func() {
    defer server.Stop()
    // Use a goroutine-local err to avoid racing on the outer variable.
    if err := server.Start(); err != nil {
        log.Fatal(err)
    }
}()

Task

Tasks are grouped into two priority levels. A task is assigned to a queue based on its priority.

type Task struct {
    TaskID         string       // unique. The ID should be assigned by Cogman.
    Name           string       // required. The task name must be registered with a task handler.
    OriginalTaskID string       // a retry task carries its parent's ID.
    PrimaryKey     string       // optional. The client can set any key to trace a task.
    Retry          int          // default value 0.
    Prefetch       int          // optional. Number of tasks a consumer fetches from the queue at a time.
    Payload        []byte       // required
    Priority       TaskPriority // required. High or Low
    Status         Status       // current task status
    FailError      string       // default empty. If Status is failed, it must have a value.
    Duration       *float64     // task execution time.
    CreatedAt      time.Time    // creation time.
    UpdatedAt      time.Time    // last update time.
}
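Since Payload is a raw byte slice, one common approach is to encode a typed payload as JSON before building the task. The sketch below assumes that approach. `Task` and `TaskPriority` are local stand-ins copying a few of the fields above, the `PriorityHigh`/`PriorityLow` constants and their values are assumptions, and `emailPayload` and `NewEmailTask` are hypothetical names for illustration.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// TaskPriority and Task are local stand-ins mirroring part of the struct
// shown above; they are not imported from Cogman.
type TaskPriority string

const (
	PriorityHigh TaskPriority = "High" // assumed constant name and value
	PriorityLow  TaskPriority = "Low"  // assumed constant name and value
)

type Task struct {
	Name     string
	Payload  []byte
	Priority TaskPriority
	Retry    int
}

// emailPayload is a hypothetical payload type.
type emailPayload struct {
	To      string `json:"to"`
	Subject string `json:"subject"`
}

// NewEmailTask encodes the payload as JSON and fills the required fields
// (Name, Payload, Priority). TaskID is left for the library to assign.
func NewEmailTask(to, subject string) (*Task, error) {
	body, err := json.Marshal(emailPayload{To: to, Subject: subject})
	if err != nil {
		return nil, err
	}
	return &Task{
		Name:     "send_email",
		Payload:  body,
		Priority: PriorityHigh,
		Retry:    3,
	}, nil
}

func main() {
	task, err := NewEmailTask("user@example.com", "Welcome")
	if err != nil {
		panic(err)
	}
	fmt.Printf("%s %s %s\n", task.Name, task.Priority, task.Payload)
}
```

The handler registered under the same task name would decode the payload with json.Unmarshal into the same type.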

Worker/Task Handler

Any struct can be passed as a handler if it implements the interface below:

type Handler interface {
	Do(ctx context.Context, payload []byte) error
}

A function of type HandlerFunc can also be passed as a handler.

type HandlerFunc func(ctx context.Context, payload []byte) error

func (h HandlerFunc) Do(ctx context.Context, payload []byte) error {
	return h(ctx, payload)
}

Register The Handlers

// Register task handler from Server side
server.Register(taskName, handler)
server.Register(taskName, handlerFunc)

Send Task

Sending a task using the Cogman API:

if err := cogman.SendTask(*task, handler); err != nil {
	log.Fatal(err)
}

// If a task handler is already registered, you can pass nil.
if err := cogman.SendTask(*task, nil); err != nil {
	log.Fatal(err)
}

Sending a task using the Cogman client/server:

// Sending task from client
if err := client.SendTask(task); err != nil {
    return err
}

Queue

Cogman queue type:

- High_Priority_Queue [default Queue]  
- Low_Priority_Queue  [lazy Queue]

Cogman maintains two types of queues: default and lazy. High-priority tasks are pushed to a default queue and low-priority tasks to a lazy queue. The number of queues of each type can be set by the client/server through configuration. Queues are not lost after a connection interruption.
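One way to picture the priority-to-queue mapping with several queues per type is a round-robin router. This README does not say how Cogman picks among queues of the same type, so the round-robin policy, the `Router` type, and the queue name format below are all assumptions made for illustration.

```go
package main

import "fmt"

// Priority is a local stand-in for the task priority.
type Priority int

const (
	High Priority = iota
	Low
)

// Router is a hypothetical helper that spreads tasks over the configured
// number of queues for each priority, in round-robin order.
type Router struct {
	highCount, lowCount int
	nextHigh, nextLow   int
}

// NewRouter clamps each count to at least 1, matching the documented default.
func NewRouter(highCount, lowCount int) *Router {
	if highCount < 1 {
		highCount = 1
	}
	if lowCount < 1 {
		lowCount = 1
	}
	return &Router{highCount: highCount, lowCount: lowCount}
}

// QueueFor returns the next queue name for the given priority.
// The name format is an assumption, not Cogman's actual naming.
func (r *Router) QueueFor(p Priority) string {
	if p == High {
		name := fmt.Sprintf("high_priority_queue_%d", r.nextHigh)
		r.nextHigh = (r.nextHigh + 1) % r.highCount
		return name
	}
	name := fmt.Sprintf("low_priority_queue_%d", r.nextLow)
	r.nextLow = (r.nextLow + 1) % r.lowCount
	return name
}

func main() {
	r := NewRouter(2, 4) // mirrors HighPriorityQueueCount: 2, LowPriorityQueueCount: 4
	for i := 0; i < 3; i++ {
		fmt.Println(r.QueueFor(High))
	}
	fmt.Println(r.QueueFor(Low))
}
```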

Re-Connection

Both the Cogman client and server handle reconnection. If the client loses its connection, it can still accept tasks, and those are processed immediately after the client regains the connection. After the server reconnects, it resumes consuming tasks without losing any.

Re-Enqueue

The re-enqueue feature recovers all initiated tasks that were lost to a connection error. If the client loses its AMQP connection, Cogman can still accept tasks offline. All offline tasks are re-queued once the connection is re-established: Cogman fetches the offline tasks from the Mongo logs and re-initiates them. A Mongo connection is required for this. Re-enqueuing does not change a task's retry count.
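The flow described here (accept tasks while offline, then replay them on reconnect with the retry count untouched) can be sketched with in-memory slices. Everything in this block is a stand-in: `Client`, `Send`, and `Reconnect` are hypothetical, the `offline` slice plays the role of the Mongo log, and the `sent` slice plays the role of the broker.

```go
package main

import "fmt"

// Task is a local stand-in with only the fields this sketch needs.
type Task struct {
	Name  string
	Retry int
}

// Client is a hypothetical model of the offline behavior.
type Client struct {
	connected bool
	offline   []Task // stand-in for tasks logged in Mongo while disconnected
	sent      []Task // stand-in for tasks published to the broker
}

// Send publishes the task when connected and buffers it otherwise.
func (c *Client) Send(t Task) {
	if !c.connected {
		c.offline = append(c.offline, t)
		return
	}
	c.sent = append(c.sent, t)
}

// Reconnect marks the client as connected and re-enqueues every buffered
// task in its original order. Retry counts are copied as-is, since a
// re-enqueue is not a retry. It returns the number of tasks replayed.
func (c *Client) Reconnect() int {
	c.connected = true
	n := len(c.offline)
	c.sent = append(c.sent, c.offline...)
	c.offline = nil
	return n
}

func main() {
	c := &Client{} // starts disconnected
	c.Send(Task{Name: "resize_image", Retry: 2})
	c.Send(Task{Name: "send_email", Retry: 0})

	replayed := c.Reconnect()
	fmt.Println("re-enqueued:", replayed)
	for _, t := range c.sent {
		fmt.Println(t.Name, t.Retry)
	}
}
```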

Feature Comparison

Comparison with other job/task process runners.

Feature Cogman Machinery
Backend redis/mongo redis
Priorities
Re-Enqueue
Concurrency
Re-Connection
Delayed jobs
Concurrent client/server
Re-try
Persistence
UI
Rest API
Chain
Chords
Groups

Contribution

Want to contribute? Great!

To fix a bug or enhance an existing module, follow these steps:

  • Fork the repo
  • Create a new branch (git checkout -b improve-feature)
  • Make the appropriate changes in the files
  • Add changes to reflect the changes made
  • Commit your changes (git commit -am 'Improve feature')
  • Push to the branch (git push origin improve-feature)
  • Create a Pull Request

Bug / Feature Request

If you find a bug, kindly open an issue here.
If you'd like to request/add a new function, feel free to do so by opening an issue here.

License

MIT © MD Ahad Hasan

Documentation

Index

Constants

This section is empty.

Variables

var (
	ErrRequestTimeout    = errors.New("cogman: request timeout")
	ErrInvalidData       = errors.New("cogman: invalid data")
	ErrConnectionTimeout = errors.New("cogman: connection timeout")
	ErrInvalidConfig     = errors.New("cogman: invalid server config")
	ErrDuplicateTaskName = errors.New("cogman: duplicate task name")
	ErrRunningServer     = errors.New("cogman: server is already running")
	ErrStoppedServer     = errors.New("cogman: server is already stopped")
	ErrNoTask            = errors.New("cogman: server has no task")
	ErrTaskHeadless      = errors.New("cogman: headless task")
	ErrTaskUnidentified  = errors.New("cogman: unidentified task")
	ErrTaskUnhandled     = errors.New("cogman: unhandled task")
	ErrNoTaskID          = errors.New("cogman: no task id")
)

list of errors

Functions

func Register

func Register(taskName string, handler util.Handler) error

Register registers a task for the server to process.

func SendTask

func SendTask(task util.Task, handler util.Handler) error

SendTask sends a task from the client to the server.

func StartBackground

func StartBackground(cfg *config.Config) error

StartBackground starts the server and client in the background.

Types

type Server

type Server struct {
	// contains filtered or unexported fields
}

Server holds the necessary fields for the server.

func NewServer

func NewServer(cfg config.Server) (*Server, error)

NewServer returns a new server instance.

func (*Server) Consume

func (s *Server) Consume(ctx context.Context, prefetch int)

Consume starts task consumption.

func (*Server) GetTaskHandler

func (s *Server) GetTaskHandler(taskName string) util.Handler

GetTaskHandler returns the task handler.

func (*Server) Register

func (s *Server) Register(taskName string, h util.Handler) error

Register registers a task handler. taskName must be unique.

func (*Server) Start

func (s *Server) Start() error

Start starts the task consumer and the Mongo and Redis connections.

func (*Server) Stop

func (s *Server) Stop() error

Stop closes all connections of the Cogman server.

type TaskHandlerMissingError

type TaskHandlerMissingError string

TaskHandlerMissingError is the error returned when a task handler is missing.

func (TaskHandlerMissingError) Error

func (t TaskHandlerMissingError) Error() string

Directories

Path Synopsis
cmd
example
