mogul

Version: v0.0.0-...-98acc6d
Published: Jul 4, 2019 License: Apache-2.0 Imports: 4 Imported by: 0

README

Mogul - locking over nodes via mongoDB

This package lets you set a global lock for a specified duration. Once that duration has passed, the lock is up for grabs again. Make sure to pass a unique identifier for each goroutine on each host as the user parameter.

The package relies on mongo's atomic handling of documents. A document in the locks collection represents an atomic entity that can be claimed if it does not exist yet, or when the associated lock has expired.


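func myTaskManagement() {
	// ...

	// New returns a *Manager; session, database, collection and tasks
	// come from your own setup.
	m := New(session.DB(database).C(collection), session.DB(database).C(tasks))

	lock := m.NewMutex(name, user)

	got, err := lock.TryLock(timeFrame)
	if err != nil {
		// the lock could not be checked; skip task creation this round
		return
	}
	if got {
		// release the lock when this function returns
		defer lock.Unlock()

		// create some tasks
		m.Add(taskName, data)
		m.Add(taskName2, data2)
	}

	// ...
}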

func myTaskHandler() {
	m := New(session.DB(database).C(collection), session.DB(database).C(tasks))
	lease := time.Hour

	for {
		task, err := m.Next(user, &lease)
		if err != nil {
			// handle or log the error, then try again
			continue
		}

		if task != nil {
			// unmarshal task.Data and do some work,
			// then call task.Complete() or task.Failed()
		}
	}
}

Documentation

Overview

Package mogul provides distributed locking and task handling via mongodb.

Using mongo documents we can synchronize and distribute work over a number of nodes. The typical use case is to run a cron job or scheduler on all nodes and then perform task creation on a single node using a lock. These tasks can then be executed efficiently on all nodes that are alive.

Types

type Manager

type Manager struct {
	// contains filtered or unexported fields
}

func New

func New(locks *mgo.Collection, tasks *mgo.Collection) *Manager

New creates a new manager to perform locking and task management. You can assign it to a variable of type TaskHandler or MutexCreator to limit its behaviour to one of the two subjects.

func (*Manager) Add

func (m *Manager) Add(name string, data []byte) error

Add adds a task with a name (which should be unique across different jobs) and a payload for the consumer to base its work on.

Example
session := initDB()
defer clearDB(session)

var m mogul.TaskHandler = mogul.New(session.DB(database).C(collection), session.DB(database).C(tasks))

payload := []byte("barfbarf")
user := "testUser"
name := "firstTask"

// create a task
m.Add(name, payload)

// the only task we just created should pop
task, _ := m.Next(user, nil)

fmt.Println(task)
Output:

func (*Manager) NewMutex

func (m *Manager) NewMutex(name string, user string) *Mutex

NewMutex combines the manager's mongo collection with your lock's name and user to create a new mutex, which is not locked yet.

func (*Manager) Next

func (m *Manager) Next(user string, leaseTime *time.Duration) (*Task, error)

Next picks the next available task. You can pass a lease time after which the task is up for grabs again for other consumers if it has not been completed. The next task is selected via random points in a 2D array that are assigned to all tasks; see the answer at the bottom of http://stackoverflow.com/questions/2824157/random-record-from-mongodb
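
The random-point selection mentioned above can be pictured with a small in-memory sketch. It is not the package's implementation: the types point and pointedTask and the functions assignPoints, pickNearest and pickRandomTask are hypothetical names made up for this illustration, and it assumes the approach works by giving every task a random 2D point and then picking the task nearest to a fresh random query point.

```go
package main

import (
	"fmt"
	"math/rand"
)

// point is a hypothetical 2D coordinate assigned to a task.
type point struct{ x, y float64 }

// pointedTask is a hypothetical stand-in for a task document that
// carries a random point.
type pointedTask struct {
	name string
	loc  point
}

// assignPoints gives every task name a random point in the unit square.
func assignPoints(names []string, rng *rand.Rand) []pointedTask {
	tasks := make([]pointedTask, 0, len(names))
	for _, n := range names {
		tasks = append(tasks, pointedTask{name: n, loc: point{rng.Float64(), rng.Float64()}})
	}
	return tasks
}

// sqDist returns the squared Euclidean distance between two points.
func sqDist(a, b point) float64 {
	dx, dy := a.x-b.x, a.y-b.y
	return dx*dx + dy*dy
}

// pickNearest returns the name of the task whose point is closest to q.
// The boolean is false when there are no tasks.
func pickNearest(tasks []pointedTask, q point) (string, bool) {
	if len(tasks) == 0 {
		return "", false
	}
	best, bestDist := tasks[0], sqDist(tasks[0].loc, q)
	for _, t := range tasks[1:] {
		if d := sqDist(t.loc, q); d < bestDist {
			best, bestDist = t, d
		}
	}
	return best.name, true
}

// pickRandomTask assigns points to all tasks and then picks the task
// nearest to a random query point.
func pickRandomTask(names []string, seed int64) (string, bool) {
	rng := rand.New(rand.NewSource(seed))
	tasks := assignPoints(names, rng)
	return pickNearest(tasks, point{rng.Float64(), rng.Float64()})
}

func main() {
	name, ok := pickRandomTask([]string{"mail", "report", "cleanup"}, 1)
	fmt.Println(name, ok)
}
```

In a database, the nearest-point lookup would typically be served by a geospatial index rather than a linear scan. Note that picking the nearest point is random but not perfectly uniform, since tasks whose points sit in sparse regions are chosen more often.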

type Mutex

type Mutex struct {
	// contains filtered or unexported fields
}

Mutex is the object used for locking

func (*Mutex) IsExpired

func (m *Mutex) IsExpired() bool

IsExpired will return true when your lock time has expired

func (*Mutex) TryLock

func (m *Mutex) TryLock(atMost time.Duration) (bool, error)

TryLock claims the lock if it is available. It also returns true when you already hold the lock, in which case the lock's duration is extended.

func (*Mutex) Unlock

func (m *Mutex) Unlock() error

Unlock frees the lock, removing the corresponding record in the database
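
The semantics described for TryLock and Unlock, together with lock expiry, can be modelled in memory as a rough sketch. This is not how the package talks to mongo: lockRecord, lockStore, tryLock, unlock and runLockDemo are hypothetical names, a map guarded by a sync.Mutex stands in for the atomic document operations, and the clock is passed in explicitly so the walkthrough is deterministic.

```go
package main

import (
	"fmt"
	"sync"
	"time"
)

// lockRecord is a hypothetical stand-in for a lock document.
type lockRecord struct {
	user    string
	expires time.Time
}

// lockStore is an in-memory stand-in for the locks collection.
type lockStore struct {
	mu    sync.Mutex
	locks map[string]lockRecord
}

func newLockStore() *lockStore {
	return &lockStore{locks: make(map[string]lockRecord)}
}

// tryLock claims name for user until now+atMost. It succeeds when no
// record exists, when the record has expired, or when user already
// holds the lock (in which case the expiry is extended).
func (s *lockStore) tryLock(name, user string, atMost time.Duration, now time.Time) bool {
	s.mu.Lock()
	defer s.mu.Unlock()
	rec, held := s.locks[name]
	if held && rec.user != user && now.Before(rec.expires) {
		return false // someone else holds an unexpired lock
	}
	s.locks[name] = lockRecord{user: user, expires: now.Add(atMost)}
	return true
}

// unlock removes the record, but only if user is the current holder.
// (That ownership check is an assumption of this sketch.)
func (s *lockStore) unlock(name, user string) {
	s.mu.Lock()
	defer s.mu.Unlock()
	if rec, ok := s.locks[name]; ok && rec.user == user {
		delete(s.locks, name)
	}
}

// runLockDemo walks through the cases with a simulated clock and
// returns the result of each tryLock call in order.
func runLockDemo() []bool {
	s := newLockStore()
	t0 := time.Date(2019, time.July, 4, 12, 0, 0, 0, time.UTC)
	var results []bool

	results = append(results, s.tryLock("cron", "host-a", time.Minute, t0))                     // free: claimed
	results = append(results, s.tryLock("cron", "host-b", time.Minute, t0.Add(10*time.Second))) // held by host-a: refused
	results = append(results, s.tryLock("cron", "host-a", time.Minute, t0.Add(30*time.Second))) // holder: extended to t0+90s
	results = append(results, s.tryLock("cron", "host-b", time.Minute, t0.Add(80*time.Second))) // still held: refused
	results = append(results, s.tryLock("cron", "host-b", time.Minute, t0.Add(2*time.Minute)))  // expired: claimed

	s.unlock("cron", "host-b")
	results = append(results, s.tryLock("cron", "host-a", time.Minute, t0.Add(121*time.Second))) // unlocked: claimed
	return results
}

func main() {
	fmt.Println(runLockDemo()) // prints [true false true false true true]
}
```

The fourth call is the interesting one: without the extension from the third call, the original lock would already have expired at t0+60s and host-b would have succeeded.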

type MutexCreator

type MutexCreator interface {
	NewMutex(name string, user string) *Mutex
}

MutexCreator is a specialization of the Manager struct for locking

type Task

type Task struct {
	Name         string     `bson:"_id"`
	User         *string    `bson:"user,omitempty"`
	Data         []byte     `bson:"task"`
	ExpiresAtUtc *time.Time `bson:"expires,omitempty"`
	Doc          meta       `bson:",inline"`
	// contains filtered or unexported fields
}

Task is the entity we work with to regulate jobs. It consists of a name and a payload. If a task is claimed, the user and the optional expiresAtUtc will be filled in.

func (*Task) Complete

func (t *Task) Complete() error

Complete removes the task from the database once it has been completed successfully.

func (*Task) Failed

func (t *Task) Failed() error

Failed removes our claim on the task to make it available again, so the job will be run again. If you don't want this, call Complete instead.
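
The life cycle of a task across Add, Next, Failed and Complete, including a lease that runs out, can be sketched in memory as follows. The names memTask, memQueue, describe and runTaskDemo are hypothetical, the queue hands out tasks in insertion order rather than randomly, and treating a nil lease as a claim that never expires is an assumption of this sketch, not documented behaviour.

```go
package main

import (
	"fmt"
	"time"
)

// memTask is a hypothetical in-memory stand-in for a task document.
type memTask struct {
	name    string
	data    []byte
	user    string     // empty while the task is unclaimed
	expires *time.Time // nil: the claim has no expiry (assumption)
}

// memQueue is a hypothetical stand-in for the tasks collection.
// It is not safe for concurrent use.
type memQueue struct{ tasks []*memTask }

func (q *memQueue) add(name string, data []byte) {
	q.tasks = append(q.tasks, &memTask{name: name, data: data})
}

// next claims the first task that is unclaimed or whose lease expired.
func (q *memQueue) next(user string, lease *time.Duration, now time.Time) *memTask {
	for _, t := range q.tasks {
		claimed := t.user != ""
		expired := t.expires != nil && !now.Before(*t.expires)
		if claimed && !expired {
			continue
		}
		t.user, t.expires = user, nil
		if lease != nil {
			e := now.Add(*lease)
			t.expires = &e
		}
		return t
	}
	return nil
}

// complete removes the task from the queue.
func (q *memQueue) complete(t *memTask) {
	for i, cur := range q.tasks {
		if cur == t {
			q.tasks = append(q.tasks[:i], q.tasks[i+1:]...)
			return
		}
	}
}

// failed drops the claim so another consumer can pick the task up.
func (q *memQueue) failed(t *memTask) {
	t.user, t.expires = "", nil
}

// describe renders a claim as "user:task", or "none" for no task.
func describe(t *memTask) string {
	if t == nil {
		return "none"
	}
	return t.user + ":" + t.name
}

// runTaskDemo returns what each call to next handed out, in order.
func runTaskDemo() []string {
	q := &memQueue{}
	q.add("report", []byte("payload"))
	lease := time.Hour
	t0 := time.Date(2019, time.July, 4, 12, 0, 0, 0, time.UTC)
	var events []string

	a := q.next("worker-a", &lease, t0)
	events = append(events, describe(a)) // claimed by worker-a
	events = append(events, describe(q.next("worker-b", &lease, t0.Add(time.Minute)))) // nothing free

	q.failed(a) // worker-a gives the task back
	b := q.next("worker-b", &lease, t0.Add(2*time.Minute))
	events = append(events, describe(b)) // claimed by worker-b

	// worker-b never finishes; its lease ends at t0+1h2m.
	c := q.next("worker-a", &lease, t0.Add(2*time.Hour))
	events = append(events, describe(c)) // reclaimed by worker-a

	q.complete(c) // done: the task is gone
	events = append(events, describe(q.next("worker-b", &lease, t0.Add(3*time.Hour))))
	return events
}

func main() {
	fmt.Println(runTaskDemo())
	// prints [worker-a:report none worker-b:report worker-a:report none]
}
```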

type TaskHandler

type TaskHandler interface {
	Add(name string, data []byte) error
	Next(user string, leaseTime *time.Duration) (*Task, error)
}

You can assign the Manager object to a variable of type TaskHandler to have a dedicated object for modifying tasks.
