consulbroker

package
v0.0.0-...-16a880a
Warning

This package is not in the latest version of its module.
Published: Jul 18, 2018 License: MPL-2.0 Imports: 5 Imported by: 1

Documentation

Constants

const (
	// LockWaitTime is how long we block for at a time to check if locker
	// acquisition is possible. This affects the minimum time it takes to cancel
	// a Lock acquisition.
	LockWaitTime = 2 * time.Second
)
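The relationship between the wait time and cancellation latency can be illustrated with a minimal sketch. It assumes acquisition is retried in slices of the wait time and that the stop channel is only checked between slices; the package's actual internals are not shown on this page. The names tryAcquire and acquire are hypothetical, and the wait is scaled down from 2 seconds so the demo finishes quickly.

```go
package main

import (
	"fmt"
	"time"
)

// tryAcquire is a hypothetical stand-in for one blocking acquisition attempt.
// It blocks for the whole wait period and never succeeds, so the loop in
// acquire can only end through cancellation.
func tryAcquire(wait time.Duration) bool {
	time.Sleep(wait)
	return false
}

// acquire retries tryAcquire in slices of wait and checks stopCh only between
// attempts. A cancellation sent during an attempt is therefore noticed only
// once that attempt has finished.
func acquire(stopCh <-chan struct{}, wait time.Duration) bool {
	for {
		select {
		case <-stopCh:
			return false // cancelled before the next attempt
		default:
		}
		if tryAcquire(wait) {
			return true
		}
	}
}

func main() {
	const wait = 50 * time.Millisecond // assumption: scaled down from 2 * time.Second

	stopCh := make(chan struct{})
	// Request cancellation shortly after the first attempt has started.
	time.AfterFunc(10*time.Millisecond, func() { close(stopCh) })

	start := time.Now()
	ok := acquire(stopCh, wait)

	fmt.Println("acquired:", ok)
	fmt.Println("cancel waited for the running slice:", time.Since(start) >= wait)
}
```

Under these assumptions, a shorter wait time makes cancellation more responsive at the cost of more frequent acquisition attempts.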

Variables

This section is empty.

Functions

This section is empty.

Types

type Broker

type Broker struct {
	// contains filtered or unexported fields
}

Broker can start long-running tasks (lasting from minutes to indefinitely) in a Consul cluster.

func New

func New(config *Config) (*Broker, error)

New creates a broker that can start long-running tasks in a Consul cluster.

Example
package main

import (
	"context"
	"log"
	"os"
	"os/signal"
	"syscall"
	"time"

	"github.com/monetha/go-distributed/broker/consulbroker"
)

func main() {
	defer log.Println("service stopped.")

	b, err := consulbroker.New(&consulbroker.Config{Address: "127.0.0.1:8500"})
	if err != nil {
		log.Printf("New broker: %v", err)
		return
	}

	t, err := b.NewTask("some/long/running/task", someLongRunningTask)
	if err != nil {
		log.Printf("New task: %v", err)
		return
	}
	defer t.Close()

	t2, err := b.NewTask("other/long/running/task", otherLongRunningTask)
	if err != nil {
		log.Printf("New task: %v", err)
		return
	}
	defer t2.Close()

	sigChan := make(chan os.Signal, 1)
	signal.Notify(sigChan, os.Interrupt, syscall.SIGTERM, syscall.SIGHUP, syscall.SIGQUIT)
	<-sigChan
}

func someLongRunningTask(ctx context.Context) error {
	log.Println("someLongRunningTask: I'm alive!!")

	ticker := time.NewTicker(10 * time.Second)
	defer ticker.Stop()

	for {
		select {
		case <-ticker.C:
			log.Println("someLongRunningTask: Doing some interesting stuff..")
		case <-ctx.Done():
			log.Println("someLongRunningTask: Oh, no! I should stop my work.")
			return ctx.Err()
		}
	}
}

func otherLongRunningTask(ctx context.Context) error {
	log.Println("otherLongRunningTask: BANG BANG!!")

	select {
	case <-time.After(10 * time.Second):
		log.Println("otherLongRunningTask: BOOO!!")
		return nil
	case <-ctx.Done():
		log.Println("otherLongRunningTask: good bye")
		return ctx.Err()
	}
}

func (*Broker) NewTask

func (b *Broker) NewTask(key string, fun task.Func) (*task.Task, error)

NewTask creates a new long-running task in a Consul cluster. A task makes a best effort to ensure that exactly one instance of it is executing in the cluster. The task may be restarted when needed until it has been closed.

type Config

type Config struct {
	// Address is the address of the Consul server
	Address string

	// Scheme is the URI scheme for the Consul server
	Scheme string

	// Token is used to provide a per-request ACL token
	Token string
}

Config is used to configure the creation of a broker.

type Locker

type Locker struct {
	// contains filtered or unexported fields
}

Locker wraps a Consul distributed lock and implements the Locker interface.

func NewLocker

func NewLocker(client *api.Client, key string, lockWaitTime time.Duration) *Locker

NewLocker creates a new Locker instance.

func (*Locker) Key

func (l *Locker) Key() string

Key returns the name of the locker.

func (*Locker) Lock

func (l *Locker) Lock(stopCh <-chan struct{}) (<-chan struct{}, error)

Lock attempts to acquire the locker and blocks while doing so. A non-nil stopCh can be used to abort the attempt. Lock returns either a channel that is closed if the locker is lost, or an error. This channel can be closed at any time due to session invalidation, communication errors, operator intervention, etc. It is NOT safe to assume that the locker is held until Unlock() is called; the application must be able to handle the locker being lost.
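One way a caller might honor this contract is sketched below. The locker interface copies the Key, Lock and Unlock signatures documented on this page; fakeLocker is a hypothetical in-memory stand-in (not the Consul-backed type) that grants the lock immediately and reports it lost after a fixed delay, and workWhileHeld is an illustrative helper name. The point is that the work loop watches the returned channel and stops as soon as it is closed.

```go
package main

import (
	"errors"
	"fmt"
	"time"
)

// locker mirrors the method signatures documented for *Locker.
type locker interface {
	Key() string
	Lock(stopCh <-chan struct{}) (<-chan struct{}, error)
	Unlock() error
}

// fakeLocker is a hypothetical stand-in used only for this sketch.
type fakeLocker struct {
	key       string
	loseAfter time.Duration
	held      bool
}

func (f *fakeLocker) Key() string { return f.key }

// Lock fails if stopCh is already closed; otherwise it grants the lock and
// closes the returned channel after loseAfter to simulate losing it.
func (f *fakeLocker) Lock(stopCh <-chan struct{}) (<-chan struct{}, error) {
	select {
	case <-stopCh:
		return nil, errors.New("lock attempt aborted")
	default:
	}
	f.held = true
	lost := make(chan struct{})
	time.AfterFunc(f.loseAfter, func() { close(lost) })
	return lost, nil
}

// Unlock returns an error when the lock is not currently held.
func (f *fakeLocker) Unlock() error {
	if !f.held {
		return errors.New("lock not held")
	}
	f.held = false
	return nil
}

// workWhileHeld acquires l and calls step repeatedly until the lock is lost
// or done is closed. It reports the number of completed steps and whether
// the lock was lost. Unlock is called only on a voluntary stop.
func workWhileHeld(l locker, done <-chan struct{}, step func()) (int, bool, error) {
	lostCh, err := l.Lock(done)
	if err != nil {
		return 0, false, err
	}
	steps := 0
	for {
		select {
		case <-lostCh:
			return steps, true, nil // lock lost: stop working immediately
		case <-done:
			return steps, false, l.Unlock()
		default:
			step()
			steps++
		}
	}
}

func main() {
	l := &fakeLocker{key: "some/long/running/task", loseAfter: 30 * time.Millisecond}
	steps, lost, err := workWhileHeld(l, nil, func() { time.Sleep(5 * time.Millisecond) })
	fmt.Println("key:", l.Key())
	fmt.Println("lost:", lost, "err:", err)
	fmt.Println("did some work:", steps > 0)
}
```

Keeping each step short matters in this design: the loop only notices a lost lock between steps, so a long step keeps running without the lock for its full duration.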

func (*Locker) Unlock

func (l *Locker) Unlock() error

Unlock releases the locker. It is an error to call this if the locker is not currently held.
