slidingwindow: github.com/RussellLuo/slidingwindow Index | Examples | Files | Directories

package slidingwindow

import "github.com/RussellLuo/slidingwindow"

Code:

lim, _ := sw.NewLimiter(time.Second, 10, func() (sw.Window, sw.StopFunc) {
    // NewLocalWindow returns an empty stop function, so it's
    // unnecessary to call it later.
    return sw.NewLocalWindow()
})

ok := lim.Allow()
fmt.Printf("ok: %v\n", ok)

Output:

ok: true

Code:

package main

import (
    "fmt"
    "strconv"
    "time"

    sw "github.com/RussellLuo/slidingwindow"
    "github.com/go-redis/redis"
)

// RedisDatastore is a reference implementation of the Redis-based datastore,
// which can be used directly if you happen to use go-redis.
type RedisDatastore struct {
    client redis.Cmdable
    ttl    time.Duration
}

func NewRedisDatastore(client redis.Cmdable, ttl time.Duration) *RedisDatastore {
    return &RedisDatastore{client: client, ttl: ttl}
}

func (d *RedisDatastore) fullKey(key string, start int64) string {
    return fmt.Sprintf("%s@%d", key, start)
}

func (d *RedisDatastore) Add(key string, start, value int64) (int64, error) {
    k := d.fullKey(key, start)
    c, err := d.client.IncrBy(k, value).Result()
    if err != nil {
        return 0, err
    }
    // Ignore the possible error from EXPIRE command.
    d.client.Expire(k, d.ttl).Result() // nolint:errcheck
    return c, err
}

func (d *RedisDatastore) Get(key string, start int64) (int64, error) {
    k := d.fullKey(key, start)
    value, err := d.client.Get(k).Result()
    if err != nil {
        if err == redis.Nil {
            // redis.Nil is not an error, it only indicates the key does not exist.
            err = nil
        }
        return 0, err
    }
    return strconv.ParseInt(value, 10, 64)
}

func main() {
    size := time.Second
    store := NewRedisDatastore(
        redis.NewClient(&redis.Options{
            Addr: "localhost:6379",
        }),
        2*size, // twice of window-size is just enough.
    )

    lim, stop := sw.NewLimiter(size, 10, func() (sw.Window, sw.StopFunc) {
        return sw.NewSyncWindow("test", sw.NewBlockingSynchronizer(store, 500*time.Millisecond))
    })
    defer stop()

    ok := lim.Allow()
    fmt.Printf("ok: %v\n", ok)

}

Index

Examples

Package Files

slidingwindow.go synchronizer.go window.go

func NewLimiter Uses

func NewLimiter(size time.Duration, limit int64, newWindow NewWindow) (*Limiter, StopFunc)

NewLimiter creates a new limiter, and returns a function to stop the possible sync behaviour within the current window.

func NewLocalWindow Uses

func NewLocalWindow() (*LocalWindow, StopFunc)

func NewSyncWindow Uses

func NewSyncWindow(key string, syncer Synchronizer) (*SyncWindow, StopFunc)

NewSyncWindow creates an instance of SyncWindow with the given synchronizer.

type BlockingSynchronizer Uses

type BlockingSynchronizer struct {
    // contains filtered or unexported fields
}

BlockingSynchronizer does synchronization in a blocking mode and consumes no extra goroutine.

It's recommended to use BlockingSynchronizer in low-concurrency scenarios, either for higher accuracy, or for less goroutine consumption.

func NewBlockingSynchronizer Uses

func NewBlockingSynchronizer(store Datastore, syncInterval time.Duration) *BlockingSynchronizer

func (*BlockingSynchronizer) Start Uses

func (s *BlockingSynchronizer) Start()

func (*BlockingSynchronizer) Stop Uses

func (s *BlockingSynchronizer) Stop()

func (*BlockingSynchronizer) Sync Uses

func (s *BlockingSynchronizer) Sync(now time.Time, makeReq MakeFunc, handleResp HandleFunc)

Sync sends the window's count to the central datastore, and then update the window's count according to the response from the datastore.

type Datastore Uses

type Datastore interface {
    // Add adds delta to the count of the window represented
    // by start, and returns the new count.
    Add(key string, start, delta int64) (int64, error)

    // Get returns the count of the window represented by start.
    Get(key string, start int64) (int64, error)
}

Datastore represents the central datastore.

type HandleFunc Uses

type HandleFunc func(SyncResponse)

type Limiter Uses

type Limiter struct {
    // contains filtered or unexported fields
}

func (*Limiter) Allow Uses

func (lim *Limiter) Allow() bool

Allow is shorthand for AllowN(time.Now(), 1).

func (*Limiter) AllowN Uses

func (lim *Limiter) AllowN(now time.Time, n int64) bool

AllowN reports whether n events may happen at time now.

func (*Limiter) Limit Uses

func (lim *Limiter) Limit() int64

Limit returns the maximum events permitted to happen during one window size.

func (*Limiter) SetLimit Uses

func (lim *Limiter) SetLimit(newLimit int64)

SetLimit sets a new Limit for the limiter.

func (*Limiter) Size Uses

func (lim *Limiter) Size() time.Duration

Size returns the time duration of one window size. Note that the size is defined to be read-only, if you need to change the size, create a new limiter with a new size instead.

type LocalWindow Uses

type LocalWindow struct {
    // contains filtered or unexported fields
}

LocalWindow represents a window that ignores sync behavior entirely and only stores counters in memory.

func (*LocalWindow) AddCount Uses

func (w *LocalWindow) AddCount(n int64)

func (*LocalWindow) Count Uses

func (w *LocalWindow) Count() int64

func (*LocalWindow) Reset Uses

func (w *LocalWindow) Reset(s time.Time, c int64)

func (*LocalWindow) Start Uses

func (w *LocalWindow) Start() time.Time

func (*LocalWindow) Sync Uses

func (w *LocalWindow) Sync(now time.Time)

type MakeFunc Uses

type MakeFunc func() SyncRequest

type NewWindow Uses

type NewWindow func() (Window, StopFunc)

NewWindow creates a new window, and returns a function to stop the possible sync behaviour within it.

type NonblockingSynchronizer Uses

type NonblockingSynchronizer struct {
    // contains filtered or unexported fields
}

NonblockingSynchronizer does synchronization in a non-blocking mode. To achieve this, it needs to spawn a goroutine to exchange data with the central datastore.

It's recommended to always use NonblockingSynchronizer in high-concurrency scenarios.

func NewNonblockingSynchronizer Uses

func NewNonblockingSynchronizer(store Datastore, syncInterval time.Duration) *NonblockingSynchronizer

func (*NonblockingSynchronizer) Start Uses

func (s *NonblockingSynchronizer) Start()

func (*NonblockingSynchronizer) Stop Uses

func (s *NonblockingSynchronizer) Stop()

func (*NonblockingSynchronizer) Sync Uses

func (s *NonblockingSynchronizer) Sync(now time.Time, makeReq MakeFunc, handleResp HandleFunc)

Sync tries to send the window's count to the central datastore, or to update the window's count according to the response from the latest synchronization. Since the exchange with the datastore is always slower than the execution of Sync, usually Sync must be called at least twice to update the window's count finally.

type StopFunc Uses

type StopFunc func()

StopFunc stops the window's sync behaviour.

type SyncRequest Uses

type SyncRequest struct {
    Key     string
    Start   int64
    Count   int64
    Changes int64
}

type SyncResponse Uses

type SyncResponse struct {
    // Whether the synchronization succeeds.
    OK    bool
    Start int64
    // The changes accumulated by the local limiter.
    Changes int64
    // The total changes accumulated by all the other limiters.
    OtherChanges int64
}

type SyncWindow Uses

type SyncWindow struct {
    LocalWindow
    // contains filtered or unexported fields
}

SyncWindow represents a window that will sync counter data to the central datastore asynchronously.

Note that for the best coordination between the window and the synchronizer, the synchronization is not automatic but is driven by the call to Sync.

func (*SyncWindow) AddCount Uses

func (w *SyncWindow) AddCount(n int64)

func (*SyncWindow) Reset Uses

func (w *SyncWindow) Reset(s time.Time, c int64)

func (*SyncWindow) Sync Uses

func (w *SyncWindow) Sync(now time.Time)

type Synchronizer Uses

type Synchronizer interface {
    // Start starts the synchronization goroutine, if any.
    Start()

    // Stop stops the synchronization goroutine, if any, and waits for it to exit.
    Stop()

    // Sync sends a synchronization request.
    Sync(time.Time, MakeFunc, HandleFunc)
}

type Window Uses

type Window interface {
    // Start returns the start boundary.
    Start() time.Time

    // Count returns the accumulated count.
    Count() int64

    // AddCount increments the accumulated count by n.
    AddCount(n int64)

    // Reset sets the state of the window with the given settings.
    Reset(s time.Time, c int64)

    // Sync tries to exchange data between the window and the central
    // datastore at time now, to keep the window's count up-to-date.
    Sync(now time.Time)
}

Window represents a fixed-window.

Directories

PathSynopsis
testutil

Package slidingwindow imports 3 packages (graph) and is imported by 1 packages. Updated 2020-05-28. Refresh now. Tools for package owners.