semaphore

package module
v0.0.0-...-a5ea77e Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Feb 19, 2019 License: MIT Imports: 7 Imported by: 0

README

🚦 semaphore

A channel-based implementation of the semaphore pattern with timeout support for lock/unlock operations.

Awesome Patreon Build Status Code Coverage Code Quality GoDoc Research License

Usage

Quick start
limiter := semaphore.New(1000)

http.HandleFunc("/", func(rw http.ResponseWriter, _ *http.Request) {
	if _, err := limiter.Acquire(semaphore.WithTimeout(time.Minute)); err != nil {
		http.Error(rw, http.StatusText(http.StatusTooManyRequests), http.StatusTooManyRequests)
		return
	}
	defer limiter.Release()
	// handle request
})

log.Fatal(http.ListenAndServe(":80", nil))
Console tool for command execution in parallel

This example shows how to execute many console commands in parallel.

$ semaphore create 2
$ semaphore add -- docker build
$ semaphore add -- vagrant up
$ semaphore add -- ansible-playbook
$ semaphore wait --timeout=1m --notify

asciicast

See more details here.

HTTP response time limitation

This example shows how to enforce a response-time SLA.

sla := 100 * time.Millisecond
sem := semaphore.New(1000)

http.Handle("/do-with-timeout", http.HandlerFunc(func(rw http.ResponseWriter, req *http.Request) {
	done := make(chan struct{})
	deadline := semaphore.WithTimeout(sla)

	go func() {
		release, err := sem.Acquire(deadline)
		if err != nil {
			return
		}
		defer release()
		defer close(done)

		// do some heavy work
	}()

	// wait what happens before
	select {
	case <-deadline:
		http.Error(rw, "operation timeout", http.StatusGatewayTimeout)
	case <-done:
		// send success response
	}
}))
HTTP request throughput limitation

This example shows how to limit request throughput.

limiter := func(limit int, timeout time.Duration, handler http.HandlerFunc) http.HandlerFunc {
	throughput := semaphore.New(limit)
	return func(rw http.ResponseWriter, req *http.Request) {
		deadline := semaphore.WithTimeout(timeout)

		release, err := throughput.Acquire(deadline)
		if err != nil {
			http.Error(rw, err.Error(), http.StatusTooManyRequests)
			return
		}
		defer release()

		handler.ServeHTTP(rw, req)
	}
}

http.HandleFunc("/do-with-limit", limiter(1000, time.Minute, func(rw http.ResponseWriter, req *http.Request) {
	// do some limited work
}))
HTTP personal rate limitation

This example shows how to create a user-specific rate limiter.

func LimiterForUser(user User, cnf Config) semaphore.Semaphore {
	mx.RLock()
	limiter, ok := limiters[user]
	mx.RUnlock()
	if !ok {
		mx.Lock()
		// handle negative case
		mx.Unlock()
	}
	return limiter
}

func RateLimiter(cnf Config, handler http.HandlerFunc) http.HandlerFunc {
	return func(rw http.ResponseWriter, req *http.Request) {
		user, ok := // get user from request context

		limiter := LimiterForUser(user, cnf)
		release, err := limiter.Acquire(semaphore.WithTimeout(cnf.SLA))
		if err != nil {
			http.Error(rw, err.Error(), http.StatusGatewayTimeout)
			return
		}

		// handle the request in separated goroutine because the current will be held
		go func() { handler.ServeHTTP(rw, req) }()

		// hold the place for a required time
		rl, ok := cnf.RateLimit[user]
		if !ok {
			rl = cnf.DefaultRateLimit
		}
		time.Sleep(rl)
		release()
		// rate limit = semaphore capacity / rate limit time, e.g. 10 request per second 
	}
}

http.HandleFunc("/do-with-rate-limit", RateLimiter(cnf, func(rw http.ResponseWriter, req *http.Request) {
	// do some rate limited work
}))
Use context for cancellation

This example shows how to use context and semaphore together.

deadliner := func(limit int, timeout time.Duration, handler http.HandlerFunc) http.HandlerFunc {
	throughput := semaphore.New(limit)
	return func(rw http.ResponseWriter, req *http.Request) {
		ctx := semaphore.WithContext(req.Context(), semaphore.WithTimeout(timeout))

		release, err := throughput.Acquire(ctx.Done())
		if err != nil {
			http.Error(rw, err.Error(), http.StatusGatewayTimeout)
			return
		}
		defer release()

		handler.ServeHTTP(rw, req.WithContext(ctx))
	}
}

http.HandleFunc("/do-with-deadline", deadliner(1000, time.Minute, func(rw http.ResponseWriter, req *http.Request) {
	// do some limited work
}))
A pool of workers

This example shows how to create a pool of workers based on semaphore.

type Pool struct {
	sem  semaphore.Semaphore
	work chan func()
}

func (p *Pool) Schedule(task func()) {
	select {
	case p.work <- task: // delay the task to already running workers
	case release, ok := <-p.sem.Signal(nil): if ok { go p.worker(task, release) } // ok is always true in this case
	}
}

func (p *Pool) worker(task func(), release semaphore.ReleaseFunc) {
	defer release()
	var ok bool
	for {
		task()
		task, ok = <-p.work
		if !ok { return }
	}
}

func New(size int) *Pool {
	return &Pool{
		sem:  semaphore.New(size),
		work: make(chan func()),
	}
}

func main() {
	pool := New(2)
	pool.Schedule(func() { /* do some work */ })
	...
	pool.Schedule(func() { /* do some work */ })
}
Interrupt execution
interrupter := semaphore.Multiplex(
	semaphore.WithTimeout(time.Second),
	semaphore.WithSignal(os.Interrupt),
)
sem := semaphore.New(runtime.GOMAXPROCS(0))
_, err := sem.Acquire(interrupter)
if err == nil {
	panic("press Ctrl+C")
}
// successful interruption

Installation

$ go get github.com/kamilsk/semaphore
$ # or use mirror
$ egg bitbucket.org/kamilsk/semaphore

egg[1] is an extended go get.

Update

This library uses SemVer for versioning, and it is not BC-safe. Therefore, do not use go get -u to update it; use dep, glide, or a similar tool instead.

[1] The project is still in the prototyping stage.


Gitter

made with ❤️ by OctoLab

Documentation

Overview

Package semaphore provides an implementation of Semaphore pattern with timeout of lock/unlock operations based on channels.

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

func Capacity

func Capacity() int

Capacity returns the capacity of the default semaphore.

func IsEmpty

func IsEmpty(err error) bool

IsEmpty reports whether the passed error is related to calling Release on an empty semaphore.

func IsTimeout

func IsTimeout(err error) bool

IsTimeout reports whether the passed error is related to calling Acquire on a full semaphore.

func Multiplex

func Multiplex(channels ...<-chan struct{}) <-chan struct{}

Multiplex combines multiple empty struct channels into one. TODO: can be leaky, see https://github.com/kamilsk/semaphore/issues/133

func Occupied

func Occupied() int

Occupied returns the current number of occupied slots of the default semaphore.

func Release

func Release() error

Release releases the previously occupied slot of the default semaphore.

func Signal

func Signal(deadline <-chan struct{}) <-chan ReleaseFunc

Signal returns a channel on which a release function is sent only if Acquire is successful. In any case, the channel will be closed.

func WithContext

func WithContext(parent context.Context, deadline <-chan struct{}) context.Context

WithContext returns a Context whose cancellation is based on an empty struct channel.

func WithDeadline

func WithDeadline(deadline time.Time) <-chan struct{}

WithDeadline returns an empty struct channel built on top of a `time.Timer` channel. TODO: can be leaky, see https://github.com/kamilsk/semaphore/issues/133

func WithSignal

func WithSignal(s os.Signal) <-chan struct{}

WithSignal returns an empty struct channel built on top of an `os.Signal` channel. TODO: can be leaky, see https://github.com/kamilsk/semaphore/issues/133

func WithTimeout

func WithTimeout(timeout time.Duration) <-chan struct{}

WithTimeout returns an empty struct channel built on top of a `time.Timer` channel. TODO: can be leaky, see https://github.com/kamilsk/semaphore/issues/133

Types

type HealthChecker

type HealthChecker interface {
	// Capacity returns a capacity of a semaphore.
	// It must be safe to call Capacity concurrently on a single semaphore.
	Capacity() int
	// Occupied returns a current number of occupied slots.
	// It must be safe to call Occupied concurrently on a single semaphore.
	Occupied() int
}

HealthChecker defines helpful methods related to semaphore status.

type ReleaseFunc

type ReleaseFunc func()

ReleaseFunc tells a semaphore to release the previously occupied slot, ignoring an error if one occurs.

func Acquire

func Acquire(deadline <-chan struct{}) (ReleaseFunc, error)

Acquire tries to reduce the number of available slots of the default semaphore by 1. The operation can be canceled using the deadline channel. In this case, it returns an appropriate error.

func (ReleaseFunc) Release

func (f ReleaseFunc) Release() error

Release calls f().

type Releaser

type Releaser interface {
	// Release releases the previously occupied slot.
	// If no places were occupied, then it returns an appropriate error.
	// It must be safe to call Release concurrently on a single semaphore.
	Release() error
}

Releaser defines a method to release the previously occupied semaphore.

type Semaphore

type Semaphore interface {
	HealthChecker
	Releaser

	// Acquire tries to reduce the number of available slots by 1.
	// The operation can be canceled using the deadline channel. In this case,
	// it returns an appropriate error.
	// It must be safe to call Acquire concurrently on a single semaphore.
	Acquire(deadline <-chan struct{}) (ReleaseFunc, error)
	// Signal returns a channel to send to it release function
	// only if Acquire is successful. In any case, the channel will be closed.
	Signal(deadline <-chan struct{}) <-chan ReleaseFunc
}

Semaphore provides the functionality of the same named pattern.

func New

func New(capacity int) Semaphore

New constructs a new thread-safe Semaphore with the given capacity.

Directories

Path Synopsis
cmd

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL