Documentation ¶
Constants ¶
const (
	// LockWaitTime is how long we block for at a time to check if locker
	// acquisition is possible. This affects the minimum time it takes to cancel
	// a Lock acquisition.
	LockWaitTime = 2 * time.Second
)
Variables ¶
This section is empty.
Functions ¶
This section is empty.
Types ¶
type Broker ¶
type Broker struct {
// contains filtered or unexported fields
}
Broker can start long-running tasks (lasting from minutes to indefinitely) in a Consul cluster.
func New ¶
New creates a broker that can start long-running tasks in a Consul cluster.
Example ¶
package main

import (
	"context"
	"log"
	"os"
	"os/signal"
	"syscall"
	"time"

	"github.com/monetha/go-distributed/broker/consulbroker"
)

func main() {
	defer log.Println("service stopped.")

	b, err := consulbroker.New(&consulbroker.Config{Address: "127.0.0.1:8500"})
	if err != nil {
		log.Printf("New broker: %v", err)
		return
	}

	t, err := b.NewTask("some/long/running/task", someLongRunningTask)
	if err != nil {
		log.Printf("New task: %v", err)
		return
	}
	defer t.Close()

	t2, err := b.NewTask("other/long/running/task", otherLongRunningTask)
	if err != nil {
		log.Printf("New task: %v", err)
		return
	}
	defer t2.Close()

	sigChan := make(chan os.Signal, 1)
	signal.Notify(sigChan, os.Interrupt, syscall.SIGTERM, syscall.SIGHUP, syscall.SIGQUIT)
	<-sigChan
}

func someLongRunningTask(ctx context.Context) error {
	log.Println("someLongRunningTask: I'm alive!!")

	ticker := time.NewTicker(10 * time.Second)
	defer ticker.Stop()

	for {
		select {
		case <-ticker.C:
			log.Println("someLongRunningTask: Doing some interesting stuff..")
		case <-ctx.Done():
			log.Println("someLongRunningTask: Oh, no! I should stop my work.")
			return ctx.Err()
		}
	}
}

func otherLongRunningTask(ctx context.Context) error {
	log.Println("otherLongRunningTask: BANG BANG!!")

	select {
	case <-time.After(10 * time.Second):
		log.Println("otherLongRunningTask: BOOO!!")
		return nil
	case <-ctx.Done():
		log.Println("otherLongRunningTask: good bye")
		return ctx.Err()
	}
}
type Config ¶
type Config struct {
	// Address is the address of the Consul server
	Address string

	// Scheme is the URI scheme for the Consul server
	Scheme string

	// Token is used to provide a per-request ACL token
	Token string
}
Config is used to configure the creation of a broker.
type Locker ¶
type Locker struct {
// contains filtered or unexported fields
}
Locker wraps a Consul distributed lock and implements the Locker interface.
func (*Locker) Lock ¶
Lock attempts to acquire the lock and blocks while doing so. Passing a non-nil stopCh allows the caller to abort the acquisition attempt. On success, Lock returns a channel that is closed if the lock is lost; on failure, it returns an error. The channel may be closed at any time due to session invalidation, communication errors, operator intervention, etc. It is NOT safe to assume that the lock is held until Unlock() is called; the application must be able to handle the lock being lost.
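The following is a minimal sketch of how a caller might react to the lock being lost while work is still in progress. The signatures of Lock and Unlock are not shown on this page, so the sketch assumes a shape similar to the Consul API client (a stop channel in, a "lost" channel and an error out). The lostLocker interface, the fakeLocker type, errLockLost, and runWhileLocked are all hypothetical names introduced for illustration and are not part of this package.

```go
package main

import (
	"errors"
	"fmt"
	"time"
)

// lostLocker is a HYPOTHETICAL interface modelling the behaviour described
// above. The real Locker's method signatures are an assumption here.
type lostLocker interface {
	Lock(stopCh <-chan struct{}) (<-chan struct{}, error)
	Unlock() error
}

// errLockLost is a hypothetical sentinel error for this sketch.
var errLockLost = errors.New("lock lost before work finished")

// runWhileLocked acquires the lock, runs work in a goroutine, and asks work
// to stop (by closing quit) as soon as the lost channel is closed.
func runWhileLocked(l lostLocker, stopCh <-chan struct{}, work func(quit <-chan struct{}) error) error {
	lostCh, err := l.Lock(stopCh)
	if err != nil {
		return fmt.Errorf("acquire lock: %w", err)
	}
	// The Unlock error is ignored: if the lock was already lost there is
	// nothing left to release.
	defer l.Unlock()

	quit := make(chan struct{})
	done := make(chan error, 1)
	go func() { done <- work(quit) }()

	select {
	case err := <-done:
		return err // work finished while the lock was still held
	case <-lostCh:
		close(quit) // tell work to stop
		<-done      // wait until it has actually stopped
		return errLockLost
	}
}

// fakeLocker is an in-memory stand-in so the sketch runs without Consul.
type fakeLocker struct {
	lost     chan struct{}
	unlocked bool
}

func (f *fakeLocker) Lock(stopCh <-chan struct{}) (<-chan struct{}, error) {
	select {
	case <-stopCh:
		return nil, errors.New("acquisition aborted")
	default:
		return f.lost, nil
	}
}

func (f *fakeLocker) Unlock() error {
	f.unlocked = true
	return nil
}

func main() {
	l := &fakeLocker{lost: make(chan struct{})}

	// Simulate the lock being lost shortly after acquisition.
	go func() {
		time.Sleep(50 * time.Millisecond)
		close(l.lost)
	}()

	err := runWhileLocked(l, nil, func(quit <-chan struct{}) error {
		<-quit // pretend to work until told to stop
		return nil
	})
	fmt.Println(err)
}
```

The key design choice in this sketch is that the work function is handed its own quit channel instead of relying on Unlock being reached, so it stops promptly whenever the lock disappears underneath it.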