jdi

package module
v0.0.0-...-a6be2c3 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Apr 7, 2020 License: GPL-3.0 Imports: 15 Imported by: 4

README

JDI. A worker that does the job
===============================

DO NOT USE IT. I think it's not ready.

.. image:: https://i.imgur.com/9IZfg7Z.gif

.. code-block:: go

   import "gitlab.com/pashinin.com/jdi"

Features
--------

* supports NATS-streaming for tasks exchange and NATS for messages exchange
* per queue concurrency
* Prometheus metrics

Example
-------

For example worker built with JDI - see "example/" folder. You can run
it from repo's root:

.. code-block::

   docker-compose -f example/docker-compose.yml up
   make example
   ./worker

Tasks broker
------------

Two parameters must be set: :code:`jdi.TasksBroker` and
:code:`jdi.MessagesBroker`. At-Least-Once delivery must be used for
tasks and At-Most-Once for other messages.

#. NATS and NATS-streaming

   .. code-block:: go

      jdi.TasksBroker = "nats://nats-streaming.service.consul:4223"
      jdi.MessagesBroker = "nats://nats.service.consul:4222"

..
   #. Redis (TODO)
   #. RabbitMQ
   #. Kafka
   #. RocketMQ
   #. Qpid
   #. Artemis
   #. NSQ
   #. ZeroMQ

Storage
-------

Tasks meta information will be stored here. A consistent storage is
required for *Exactly-Once* tasks execution.


Queues
------

JDI supports per queue concurrency. A worker can have many queues each
with it's own concurrency level.

To add a custom queue:

.. code-block:: go

   jdi.AddQueue(&jdi.Queue{
       Name:        "insane",
       Concurrency: 2000,
       JobsPool:    make(chan *jdi.Job, 3000),
   })

By default there is 1 queue named :code:`default` with
:code:`concurrency = 20` and :code:`JobsPool` size = 30.

If task's queue can not be found - it is placed in "default" queue. This
queue must always exist.

Monitoring
----------

Initializing JDI with the same parameters as your workers but setting
:code:`jdi.Monitor = true` will allow you to get all info and updates
from workers. But this JDI "worker" will not connect to TasksBroker. So
no tasks are executed on this instance. Also no queue goroutines are
created.

Run task EXACTLY ONCE
---------------------

**About At-Least-Once delivery.** By default NATS-streaming sends maximum
1 message to a worker at a time and waits 30s until a worker
acknowledges it (all limits can be configured). If a message was not
acknowledged - NATS-streaming will re-deliver it to another worker or to
the current one again. NATS will re-deliver it again and again until one
worker finishes one of the previously started jobs and acknowledges NATS
message. So one task can be received many times.

Running task exactly once is a *hack*. We save an ID of an incoming task
in a common consistent storage (for all workers) with unique ID
constraint. So no 2 workers can insert such record if it is already
created even if a message was re-delivered to another worker.

Use in-memory persistent storage for storing tasks metadata or results:
Redis, Tarantool.

There can NOT be a situation when 2 workers try to insert the same task
ID into 2 Tarantool masters. At the same time only one worker has a
message from NATS. If it can't INSERT a new task ID within a given
timeout - message will not be acknowledged and will be re-delivered.

There can be a rare case when two workers have INSERTed same task ID
into two different Tarantool masters. For example when first INSERT
couldn't replicate to master-2 (Tarantool) due to a network problem. And
then second INSERT is being made on master-2.



..
   Use Nomad Leader as Tarantool write node:

   http://localhost:4646/v1/status/leader

Documentation

Overview

JDI. A worker that does the job

Index

Constants

This section is empty.

Variables

View Source
var (
	Router *gin.Engine
	Port   int
)
View Source
var (
	// default: "jdi-chan-task"
	TaskChannel string

	TasksBrokerURI string

	TasksStorage string
)
View Source
var (
	// If Debug == true some changes are made:
	//
	// 1. Logger is created in "debug" mode
	DEBUG bool

	MessageChannel string

	// If Monitor = true - no tasks will be executed by this instance.
	Monitor bool = false

	MessagesBroker string = "nats://localhost"

	// A list of other workers' tasks that I don't have in RegisteredTasks
	// A map of { task-name: map-of-workers-having-that-task }
	// Why map of workers? To have a unique list of them without duplicates.
	OtherWorkersTasks map[string]map[string]struct{}
	OtherWorkers      map[string]struct{}

	ACKS_LATE bool
)
View Source
var Delay func(taskName string, args ...interface{}) string

This can be set only after task broker initialization

View Source
var (
	Exit chan bool
)
View Source
var (
	IsHealthy bool
)
View Source
var RegisterTask = task.RegisterTask

var Delay = task.Broker.Delay

Functions

func AddTasksOfAnotherWorker

func AddTasksOfAnotherWorker(worker string, tasks []string)

func Connect

func Connect() (err error)

func CreateLogger

func CreateLogger() *zap.Logger

func CreateRouter

func CreateRouter()

func GetTask

func GetTask(name string) job.JobFunc

func GinAPI

func GinAPI()

func GracefulShutdown

func GracefulShutdown()

func Healthy

func Healthy() (bool, map[string]string)

This function is called either once in 10 seconds or when hitting /healthcheck route.

func Init

func Init() (err error)

func MessageReceived

func MessageReceived(m *Message)

MessageReceived handles messages from other JDI workers

Possible messages:

newbie-here
When a new worker starts - it send this message

func PublishExecutingTasks

func PublishExecutingTasks()

// Sends executing tasks list to "tasks" channel

func RandString

func RandString(length int) string

func RegisteredTasksList

func RegisteredTasksList() []string

RegisteredTasksList returns a slice of registered Task names.

func StartAllQueues

func StartAllQueues()

func StopAPI

func StopAPI()

func StopWorker

func StopWorker()

Stop closes all jobs channels

func StringWithCharset

func StringWithCharset(length int, charset string) string

func TasksInCluster

func TasksInCluster() []string

func WaitToFinish

func WaitToFinish()

StartWorker

Blocking function

Types

type Job

type Job = job.Job

type JobID

type JobID = job.JobID

type Message

type Message struct {
	Worker  string
	Cmd     string
	Payload interface{}
}

type MessageBroker

type MessageBroker interface {
	// Init connects to a message broker and subscribes.
	Init(uri string) error

	// Send a message to all workers including myself.
	// Usually it's enough to specify only cmd.
	// But if a command needs arguments - pass them in args
	Send(cmd string, args interface{}) error
}
var Msg MessageBroker

type MessageBrokerNATS

type MessageBrokerNATS struct {
	Conn *nats.Conn // core NATS
	// contains filtered or unexported fields
}

func (*MessageBrokerNATS) Init

func (b *MessageBrokerNATS) Init(uri string) (err error)

func (*MessageBrokerNATS) Send

func (b *MessageBrokerNATS) Send(cmd string, m interface{}) error

Directories

Path Synopsis

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL