semaphore: github.com/kamilsk/semaphore Index | Examples | Files | Directories

package semaphore

import "github.com/kamilsk/semaphore"

Package semaphore provides an implementation of Semaphore pattern with timeout of lock/unlock operations based on channels.

This example shows how to limit request throughput.

Code:

limiter := func(limit int, timeout time.Duration, handler http.HandlerFunc) http.HandlerFunc {
    throughput := semaphore.New(limit)
    return func(rw http.ResponseWriter, req *http.Request) {
        deadline := semaphore.WithTimeout(timeout)

        release, err := throughput.Acquire(deadline)
        if err != nil {
            http.Error(rw, err.Error(), http.StatusTooManyRequests)
            return
        }
        defer release()

        handler.ServeHTTP(rw, req)
    }
}

var race int
ts := httptest.NewServer(limiter(1, sla, http.HandlerFunc(func(rw http.ResponseWriter, req *http.Request) {
    // do some limited work
    race++
})))
defer ts.Close()

ok, fail := sendParallelHTTPRequestsToURL(5, ts.URL)

fmt.Printf("success: %d, failure: %d, race: %t \n", ok, fail, race != 5)

Output:

success: 5, failure: 0, race: false

This example shows how to follow SLA.

Code:

limiter := semaphore.New(2)

// start http server to handle parallel requests
ts := httptest.NewServer(http.HandlerFunc(func(rw http.ResponseWriter, req *http.Request) {
    done := make(chan struct{})
    deadline := semaphore.WithTimeout(sla)

    go func() {
        release, err := limiter.Acquire(deadline)
        if err != nil {
            return
        }
        defer release()
        defer close(done)

        // do some heavy work
        time.Sleep(40 * time.Millisecond)
    }()

    // wait what happens before
    select {
    case <-deadline:
        http.Error(rw, "operation timeout", http.StatusGatewayTimeout)
    case <-done:
        // send success response
        rw.Header().Set("Content-Type", "text/plain; charset=utf-8")
        rw.WriteHeader(http.StatusOK)
    }
}))
defer ts.Close()

ok, fail := sendParallelHTTPRequestsToURL(5, ts.URL)

fmt.Printf("success: %d, failure: %d \n", ok, fail)

Output:

success: 2, failure: 3

This example shows how to create a pool of workers based on the semaphore.

Code:

package main

import (
    "fmt"
    "sync"
    "sync/atomic"

    "github.com/kamilsk/semaphore"
)

type Pool struct {
    sem  semaphore.Semaphore
    work chan func()
}

func (p *Pool) Schedule(task func()) {
    select {
    case p.work <- task:
    case release := <-p.sem.Signal(nil):
        go p.worker(task, release)
    }
}

func (p *Pool) worker(task func(), release semaphore.ReleaseFunc) {
    defer release()
    var ok bool
    for {
        task()
        task, ok = <-p.work
        if !ok {
            return
        }
    }
}

func New(size int) *Pool {
    return &Pool{
        sem:  semaphore.New(size),
        work: make(chan func()),
    }
}

// This example shows how to create a pool of workers based on the semaphore.
func main() {
    var ok, fail int32 = 0, 5

    wg := &sync.WaitGroup{}
    do := func() {
        atomic.AddInt32(&ok, 1)
        atomic.AddInt32(&fail, -1)
        wg.Done()
    }
    pool := New(int(fail / 2))

    wg.Add(int(fail))
    for i, total := 0, int(fail); i < total; i++ {
        pool.Schedule(do)
    }
    wg.Wait()

    fmt.Printf("success: %d, failure: %d \n", ok, fail)
}

This example shows how to use context and semaphore together.

Code:

deadliner := func(limit int, timeout time.Duration, handler http.HandlerFunc) http.HandlerFunc {
    throughput := semaphore.New(limit)
    return func(rw http.ResponseWriter, req *http.Request) {
        ctx := semaphore.WithContext(req.Context(), semaphore.WithTimeout(timeout))

        release, err := throughput.Acquire(ctx.Done())
        if err != nil {
            http.Error(rw, err.Error(), http.StatusGatewayTimeout)
            return
        }
        defer release()

        handler.ServeHTTP(rw, req.WithContext(ctx))
    }
}

ts := httptest.NewServer(deadliner(2, sla, http.HandlerFunc(func(rw http.ResponseWriter, req *http.Request) {
    // context deadline is expected
    select {
    case <-req.Context().Done():
        rw.WriteHeader(http.StatusGatewayTimeout)
    case <-time.After(2 * sla):
        rw.WriteHeader(http.StatusOK)
    }
})))
defer ts.Close()

ok, fail := sendParallelHTTPRequestsToURL(5, ts.URL)

fmt.Printf("success: %d, failure: %d \n", ok, fail)

Output:

success: 0, failure: 5

This example shows how to create user-specific rate limiter.

Code:

// +build go1.7

package main

import (
    "context"
    "fmt"
    "net/http"
    "net/http/httptest"
    "runtime"
    "strconv"
    "sync"
    "time"

    "github.com/kamilsk/semaphore"
)

// User represents user ID.
type User int

// Config contains abstract configuration fields.
type Config struct {
    DefaultCapacity  int
    DefaultRateLimit time.Duration
    DefaultUser      User
    Capacity         map[User]int
    RateLimit        map[User]time.Duration
    SLA              time.Duration
}

// This variables can be a part of limiter provider service.
var (
    mx       sync.RWMutex
    limiters = make(map[User]semaphore.Semaphore)
)

// LimiterForUser returns limiter for user found in request context.
func LimiterForUser(user User, cnf Config) semaphore.Semaphore {
    mx.RLock()
    limiter, ok := limiters[user]
    mx.RUnlock()
    if !ok {
        mx.Lock()
        limiter, ok = limiters[user]
        if !ok {
            c, ok := cnf.Capacity[user]
            if !ok {
                c = cnf.DefaultCapacity
            }
            limiter = semaphore.New(c)
            limiters[user] = limiter
        }
        mx.Unlock()
    }
    return limiter
}

// RateLimiter performs rate limitation.
func RateLimiter(cnf Config, handler http.HandlerFunc) http.HandlerFunc {
    return func(rw http.ResponseWriter, req *http.Request) {
        user, ok := req.Context().Value("user").(User)
        if !ok {
            user = cnf.DefaultUser
        }

        limiter := LimiterForUser(user, cnf)
        release, err := limiter.Acquire(semaphore.WithTimeout(cnf.SLA))
        if err != nil {
            http.Error(rw, "operation timeout", http.StatusGatewayTimeout)
            return
        }

        go func() { handler.ServeHTTP(rw, req) }()

        rl, ok := cnf.RateLimit[user]
        if !ok {
            rl = cnf.DefaultRateLimit
        }
        time.Sleep(rl)
        release()
    }
}

// UserToContext gets user ID from request header and puts it into request context.
func UserToContext(cnf Config, handler http.HandlerFunc) http.HandlerFunc {
    return func(rw http.ResponseWriter, req *http.Request) {
        var user User = cnf.DefaultUser

        if id := req.Header.Get("user"); id != "" {
            i, err := strconv.Atoi(id)
            if err == nil {
                user = User(i)
            }
        }

        handler.ServeHTTP(rw, req.WithContext(context.WithValue(req.Context(), "user", user)))
    }
}

// This example shows how to create user-specific rate limiter.
func main() {
    var cnf Config = Config{
        DefaultUser:      1,
        DefaultCapacity:  runtime.GOMAXPROCS(0),
        DefaultRateLimit: 10 * time.Millisecond,
        Capacity:         map[User]int{1: 1},
        RateLimit:        map[User]time.Duration{1: 100 * time.Millisecond},
        SLA:              time.Second,
    }

    ts := httptest.NewServer(RateLimiter(cnf, UserToContext(cnf, func(rw http.ResponseWriter, req *http.Request) {})))
    defer ts.Close()

    start := time.Now()
    ok, fail := sendParallelHTTPRequestsToURL(2, ts.URL)
    end := time.Now()

    fmt.Printf("success: %d, failure: %d, elapsed: ~%d ms \n", ok, fail, (end.Sub(start).Nanoseconds()/100000000)*100)
}

Index

Examples

Package Files

channel.go context.go default.go semaphore.go

func Capacity Uses

func Capacity() int

Capacity returns a capacity of the default semaphore.

func IsEmpty Uses

func IsEmpty(err error) bool

IsEmpty checks if passed error is related to call Release on empty semaphore.

func IsTimeout Uses

func IsTimeout(err error) bool

IsTimeout checks if passed error is related to call Acquire on full semaphore.

func Multiplex Uses

func Multiplex(channels ...<-chan struct{}) <-chan struct{}

Multiplex combines multiple empty struct channels into one. TODO can be leaky, https://github.com/kamilsk/semaphore/issues/133

func Occupied Uses

func Occupied() int

Occupied returns a current number of occupied slots of the default semaphore.

func Release Uses

func Release() error

Release releases the previously occupied slot of the default semaphore.

func Signal Uses

func Signal(deadline <-chan struct{}) <-chan ReleaseFunc

Signal returns a channel to send to it release function only if Acquire is successful. In any case, the channel will be closed.

func WithContext Uses

func WithContext(parent context.Context, deadline <-chan struct{}) context.Context

WithContext returns Context with cancellation based on empty struct channel.

func WithDeadline Uses

func WithDeadline(deadline time.Time) <-chan struct{}

WithDeadline returns empty struct channel above on `time.Timer` channel. TODO can be leaky, https://github.com/kamilsk/semaphore/issues/133

func WithSignal Uses

func WithSignal(s os.Signal) <-chan struct{}

WithSignal returns empty struct channel above on `os.Signal` channel. TODO can be leaky, https://github.com/kamilsk/semaphore/issues/133

func WithTimeout Uses

func WithTimeout(timeout time.Duration) <-chan struct{}

WithTimeout returns empty struct channel above on `time.Timer` channel. TODO can be leaky, https://github.com/kamilsk/semaphore/issues/133

type HealthChecker Uses

type HealthChecker interface {
    // Capacity returns a capacity of a semaphore.
    // It must be safe to call Capacity concurrently on a single semaphore.
    Capacity() int
    // Occupied returns a current number of occupied slots.
    // It must be safe to call Occupied concurrently on a single semaphore.
    Occupied() int
}

HealthChecker defines helpful methods related with semaphore status.

type ReleaseFunc Uses

type ReleaseFunc func()

ReleaseFunc tells a semaphore to release the previously occupied slot and ignore an error if it occurs.

func Acquire Uses

func Acquire(deadline <-chan struct{}) (ReleaseFunc, error)

Acquire tries to reduce the number of available slots of the default semaphore for 1. The operation can be canceled using deadline channel. In this case, it returns an appropriate error.

func (ReleaseFunc) Release Uses

func (f ReleaseFunc) Release() error

Release calls f().

type Releaser Uses

type Releaser interface {
    // Release releases the previously occupied slot.
    // If no places were occupied, then it returns an appropriate error.
    // It must be safe to call Release concurrently on a single semaphore.
    Release() error
}

Releaser defines a method to release the previously occupied semaphore.

type Semaphore Uses

type Semaphore interface {
    HealthChecker
    Releaser

    // Acquire tries to reduce the number of available slots for 1.
    // The operation can be canceled using context. In this case,
    // it returns an appropriate error.
    // It must be safe to call Acquire concurrently on a single semaphore.
    Acquire(deadline <-chan struct{}) (ReleaseFunc, error)
    // Signal returns a channel to send to it release function
    // only if Acquire is successful. In any case, the channel will be closed.
    Signal(deadline <-chan struct{}) <-chan ReleaseFunc
}

Semaphore provides the functionality of the same named pattern.

func New Uses

func New(capacity int) Semaphore

New constructs a new thread-safe Semaphore with the given capacity.

Directories

PathSynopsis
cmd/semaphore

Package semaphore imports 7 packages (graph) and is imported by 1 packages. Updated 2018-10-22. Refresh now. Tools for package owners.