bucketo

v0.0.0-...-feb7e2c
Published: Nov 23, 2023 License: MIT Imports: 7 Imported by: 0

README

bucketo

Introduction

bucketo is a package that helps you build token buckets. It's designed to be very flexible yet simple! You can get started with the following code:

package main

import (
	"fmt"
	"github.com/thepabloaguilar/bucketo"
)

func main() {
	// Creates a bucket with a capacity of 10. The Dynamic Consume strategy lets
	// each Consume call choose how many tokens to take (the default strategy
	// always takes 1, whatever the argument).
	bucket := bucketo.NewBucket(10, bucketo.WithConsumeStrategy(bucketo.NewDynamicConsume()))

	// Consumes 1 token from the bucket. We're ignoring the error return here, but you shouldn't.
	ok, _ := bucket.Consume(int64(1))
	fmt.Println("Was consumed ->", ok) // true

	currentTokens, _ := bucket.AvailableTokens()
	fmt.Println("Current tokens ->", currentTokens) // 9

	// Tries to consume 10 tokens, but only 9 are available
	ok, _ = bucket.Consume(int64(10))
	fmt.Println("Was consumed ->", ok) // false

	currentTokens, _ = bucket.AvailableTokens()
	fmt.Println("Current tokens ->", currentTokens) // 9
}

For more information on how to configure your bucket, read the next sections.

Features

Tokens Storage

Tokens Storage is the interface through which your bucket gets and sets the token amount. So far this package provides only one token storage, but you can implement others by satisfying the TokensStorage interface.
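
As an illustration of a custom storage, here is a minimal sketch of a mutex-guarded implementation. The interface is copied locally from the reference documentation so the snippet compiles without the package; LockedTokenStorage and NewLockedTokenStorage are hypothetical names that are not part of bucketo.

```go
package main

import (
	"fmt"
	"sync"
)

// TokensStorage mirrors the interface documented by bucketo so that this
// sketch compiles without importing the package.
type TokensStorage interface {
	GetTokens() (int64, error)
	SetTokens(tokens int64) error
}

// LockedTokenStorage is a hypothetical storage that guards the token count
// with a mutex. It is not part of bucketo.
type LockedTokenStorage struct {
	mu     sync.Mutex
	tokens int64
}

// NewLockedTokenStorage creates a storage holding initialTokens.
func NewLockedTokenStorage(initialTokens int64) *LockedTokenStorage {
	return &LockedTokenStorage{tokens: initialTokens}
}

// GetTokens returns the current token count.
func (s *LockedTokenStorage) GetTokens() (int64, error) {
	s.mu.Lock()
	defer s.mu.Unlock()
	return s.tokens, nil
}

// SetTokens overwrites the current token count.
func (s *LockedTokenStorage) SetTokens(tokens int64) error {
	s.mu.Lock()
	defer s.mu.Unlock()
	s.tokens = tokens
	return nil
}

// Compile-time check that the type satisfies the interface.
var _ TokensStorage = (*LockedTokenStorage)(nil)

func main() {
	storage := NewLockedTokenStorage(100)
	_ = storage.SetTokens(42)
	tokens, err := storage.GetTokens()
	fmt.Println(tokens, err) // 42 <nil>
}
```

A value like this could presumably be handed to bucketo.WithStorage, since that option accepts any TokensStorage. A storage backed by an external system would return real errors from both methods.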

InMemoryTokenStorage

This is the default token storage, used when you don't specify one. It simply stores the token amount in memory.

package main

import (
	"fmt"
	"github.com/thepabloaguilar/bucketo"
)

func main() {
	bucketCapacity := int64(100)
	bucket := bucketo.NewBucket(
		bucketCapacity,
		bucketo.WithStorage(bucketo.NewInMemoryTokenStorage(bucketCapacity)), // You can also pass it explicitly
	)

	fmt.Println(bucket.AvailableTokens()) // 100
}

Consume Strategy

The whole point of having a bucket is to consume from it, and the way you consume tokens changes depending on your requirements. By default, the bucket uses the Static Consume strategy with a static consume number of 1.

Static Consume

Static consume strategy will return the same number every time the Consume method is called.

package main

import (
	"fmt"
	"github.com/thepabloaguilar/bucketo"
)

func main() {
	consumeStrategy, _ := bucketo.NewStaticConsume(2) // It will always return 2 as the token number to consume
	bucket := bucketo.NewBucket(100, bucketo.WithConsumeStrategy(consumeStrategy))

	fmt.Println(bucket.AvailableTokens()) // 100

	// Note we're passing `nil` to the Consume method because the consume strategy does not need any info.
	_, _ = bucket.Consume(nil)
	fmt.Println(bucket.AvailableTokens()) // 98
}

Dynamic Consume

Sometimes we want to consume a non-static number of tokens depending on some variables. This is where Dynamic Consume comes into play, as it allows us to pass the number of tokens to consume.

package main

import (
	"fmt"
	"github.com/thepabloaguilar/bucketo"
)

func main() {
	bucket := bucketo.NewBucket(
		100,
		bucketo.WithConsumeStrategy(bucketo.NewDynamicConsume()),
	)

	fmt.Println(bucket.AvailableTokens()) // 100

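	// Note we're passing `int64(10)` to the Consume method; this number is passed to the consume strategy.
	// It is typed explicitly because NewDynamicConsume is documented to reject anything that is not an int64.
	_, _ = bucket.Consume(int64(10))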
	fmt.Println(bucket.AvailableTokens()) // 90
}

Expression Consume

When you want to consume tokens dynamically and can state your requirements as expressions, you can delegate this part to the Expression Consume strategy. Let's see how it works:

package main

import (
	"fmt"
	"github.com/thepabloaguilar/bucketo"
)

func main() {
	// The expressions that will be used by the consume strategy
	expressions := []bucketo.ConsumeExpression{
		{
			Expression: "my_argument == 1",
			Tokens:     50,
		},
		{
			Expression: `my_second_argument == "a"`,
			Tokens:     40,
		},
		{
			Expression: "true", // This can be considered a default case since it'll always return true
			Tokens:     10,
		},
	}
	consumeStrategy, _ := bucketo.NewExpressionsConsume(expressions)
	bucket := bucketo.NewBucket(100, bucketo.WithConsumeStrategy(consumeStrategy))

	fmt.Println(bucket.AvailableTokens()) // 100

	// Note we're passing a map to the Consume method which is passed to the consume strategy.
	// The value we pass here will be used as variables to evaluate the expressions, here we're
	// passing a map but it could be a struct.
	// The consume strategy will match the first expression in the list.
	//
	// For more information on how to use/build the expressions see https://expr.medv.io
	_, _ = bucket.Consume(map[string]int{
		"my_argument": 1,
	})
	fmt.Println(bucket.AvailableTokens()) // 50

	// Passing an empty map makes every expression fail except the last one, our default.
	_, _ = bucket.Consume(map[string]int{})
	fmt.Println(bucket.AvailableTokens()) // 40

	// It'll match the second expression
	_, _ = bucket.Consume(map[string]string{
		"my_second_argument": "a",
	})
	fmt.Println(bucket.AvailableTokens()) // 0
}

Refillers

The other important part of a bucket is returning tokens to it. You can do it manually by calling the AddTokens method, but you can also automate it. This package provides a time refiller.

Time Refiller

The Time Refiller notifies the bucket to add more tokens periodically. You specify how often to refill and how many tokens to add each time.

package main

import (
	"context"
	"fmt"
	"github.com/thepabloaguilar/bucketo"
	"time"
)

func main() {
	consumeStrategy, _ := bucketo.NewStaticConsume(10)
	timeRefiller := bucketo.NewTimeRefiller(10, time.Second) // It'll refill 10 tokens per second
	bucket := bucketo.NewBucket(
		100,
		bucketo.WithConsumeStrategy(consumeStrategy),
		bucketo.WithRefiller(timeRefiller),
	)

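	// IMPORTANT: IT'S VERY IMPORTANT TO CALL THE START METHOD
	// The Start method starts all the refillers and starts listening to them.
	_ = bucket.Start(context.Background())

	// Consumes 10 tokens (the static consume number)
	_, _ = bucket.Consume(nil)
	fmt.Println(bucket.AvailableTokens()) // 90

	// After 1 second the refiller has added 10 tokens back
	time.Sleep(time.Second + 100*time.Millisecond)
	fmt.Println(bucket.AvailableTokens()) // 100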
}

Documentation

Index

Constants

This section is empty.

Variables

var (
	ErrNoExpressionMatched         = errors.New("no expression matched")
	ErrTokensToConsumeNegative     = errors.New("tokens to consume should be positive or zero")
	ErrConsumeArgumentIsNotInteger = errors.New("consume strategy argument is not an integer")
)

Functions

This section is empty.

Types

type Bucket

type Bucket struct {
	// contains filtered or unexported fields
}

func NewBucket

func NewBucket(capacity int64, opts ...BucketOpt) *Bucket

func (*Bucket) AddTokens

func (b *Bucket) AddTokens(tokens int64) error

func (*Bucket) AvailableTokens

func (b *Bucket) AvailableTokens() (int64, error)

func (*Bucket) Consume

func (b *Bucket) Consume(args any) (bool, error)

func (*Bucket) Start

func (b *Bucket) Start(ctx context.Context) error

func (*Bucket) Stop

func (b *Bucket) Stop() error

type BucketOpt

type BucketOpt func(b *Bucket)

func WithConsumeStrategy

func WithConsumeStrategy(strategy ConsumeStrategy) BucketOpt

func WithRefiller

func WithRefiller(refiller Refiller) BucketOpt

func WithStorage

func WithStorage(storage TokensStorage) BucketOpt
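
BucketOpt has the shape of Go's functional options pattern. The source does not show how NewBucket applies its options, so the following is only a sketch of how the pattern usually works, using a hypothetical limiter type in place of Bucket.

```go
package main

import "fmt"

// limiter is a hypothetical type standing in for Bucket.
type limiter struct {
	capacity int64
	step     int64
}

// limiterOpt has the same shape as the documented BucketOpt.
type limiterOpt func(l *limiter)

// withStep overrides the default step.
func withStep(step int64) limiterOpt {
	return func(l *limiter) { l.step = step }
}

// newLimiter sets defaults first and then applies the options in order,
// so later options win over earlier ones.
func newLimiter(capacity int64, opts ...limiterOpt) *limiter {
	l := &limiter{capacity: capacity, step: 1}
	for _, opt := range opts {
		opt(l)
	}
	return l
}

func main() {
	fmt.Println(newLimiter(10).step)              // 1
	fmt.Println(newLimiter(10, withStep(2)).step) // 2
}
```

The pattern lets a constructor keep a short required argument list (here, the capacity) while leaving every other setting optional.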

type ConsumeExpression

type ConsumeExpression struct {
	Expression string
	Tokens     int64
	// contains filtered or unexported fields
}

type ConsumeStrategy

type ConsumeStrategy func(args any) (int64, error)

ConsumeStrategy is a function that determines how many tokens should be consumed.

func NewDynamicConsume

func NewDynamicConsume() ConsumeStrategy

NewDynamicConsume returns the same number it receives as its argument. If the argument is not an int64, it returns an error.

func NewExpressionsConsume

func NewExpressionsConsume(consumeExpressions []ConsumeExpression) (ConsumeStrategy, error)

NewExpressionsConsume allows you to dynamically consume tokens based on expressions. The expressions can rely on the arguments passed to the strategy, and they must always return a boolean value. Expressions are evaluated in list order, from the first to the last.

expressions := []ConsumeExpression{
	{Expression: "my_argument == 1", Tokens: 50},
	{Expression: `my_second_argument == "a"`, Tokens: 40},
	{Expression: "true", Tokens: 100}, // This can be considered a default case since it'll always return true
}

consumeStrategy, err := NewExpressionsConsume(expressions)
if err != nil {
	panic(err)
}

For more information on how to use/build the expressions see https://expr.medv.io

func NewStaticConsume

func NewStaticConsume(tokens int64) (ConsumeStrategy, error)

NewStaticConsume returns a static consume strategy, which means the same number of tokens is always returned. For example, NewStaticConsume(1) will always return 1 (one) when called to get the tokens to consume.

type InMemoryTokenStorage

type InMemoryTokenStorage struct {
	// contains filtered or unexported fields
}

func NewInMemoryTokenStorage

func NewInMemoryTokenStorage(initialTokens int64) *InMemoryTokenStorage

func (*InMemoryTokenStorage) GetTokens

func (s *InMemoryTokenStorage) GetTokens() (int64, error)

func (*InMemoryTokenStorage) SetTokens

func (s *InMemoryTokenStorage) SetTokens(tokens int64) error

type Refiller

type Refiller interface {
	StartRefiller(ctx context.Context, tokensChannel chan<- int64) error
	StopRefiller() error
}

A Refiller is typically a background job that refills a bucket automatically.

type TimeRefiller

type TimeRefiller struct {
	// contains filtered or unexported fields
}

func NewTimeRefiller

func NewTimeRefiller(refillRate int64, refillRateTime time.Duration) *TimeRefiller

func (*TimeRefiller) StartRefiller

func (r *TimeRefiller) StartRefiller(ctx context.Context, tokensChannel chan<- int64) error

StartRefiller starts a background job that, each time the refill interval elapses, sends the refill amount to the tokens channel.

func (*TimeRefiller) StopRefiller

func (r *TimeRefiller) StopRefiller() error

StopRefiller stops the background job. CAUTION: Call this method only once, and only after starting the refiller; otherwise it'll block your program.

type TokensStorage

type TokensStorage interface {
	GetTokens() (int64, error)
	SetTokens(tokens int64) error
}
