ratebroker

Version: v0.0.0-...-1d0540f
Published: Oct 21, 2023 License: MIT Imports: 16 Imported by: 0

README

RateBroker

RateBroker is a rate limiting library written in Go. It uses a Limiter to enforce rate limits and a MessageBroker to allow for distributed limiting.

The library also provides a way to configure the RateBroker with various options, such as setting the maximum number of requests, the time window for the rate limit, and the NTP server for time synchronization.

Features

  • Configurable rate limiting options
  • NTP server support for accurate time synchronization
  • Message broker integration for distributed rate limiting
  • In-memory caching for efficient rate limit tracking
  • A standalone Limiter subpackage for rate limiting without the broker

Distributed Use Case

A message broker allows rate limiting across applications and servers. Because events travel through a message broker, the limiting is not atomic and should be considered approximate.

The advantage of this approach is that the actual rate limiting happens in the server memory/application versus requiring an external network call like traditional solutions.

This makes checking a user's rate very fast, and when a user is blocked, not even a publish call is made over the network (refer to the sequence diagram). Returning a 429 therefore adds very little latency, which helps shed load during DoS attacks.
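The flow described above can be sketched with only the standard library. This is a minimal illustration, not the library's implementation: `localLimiter`, `handle`, and the `publish` callback are hypothetical names, and the time window is ignored for brevity.

```go
package main

import "fmt"

// localLimiter is a hypothetical stand-in for an in-memory limiter.
// It counts accepted requests per key and ignores the time window.
type localLimiter struct {
	max    int
	counts map[string]int
}

// tryAccept consults memory only; no network call is involved.
func (l *localLimiter) tryAccept(key string) bool {
	if l.counts[key] >= l.max {
		return false
	}
	l.counts[key]++
	return true
}

// handle returns the HTTP status for one request. publish stands in for a
// message broker call and runs only when the request was accepted.
func handle(l *localLimiter, key string, publish func(key string)) int {
	if !l.tryAccept(key) {
		return 429 // rejected locally, nothing is published
	}
	publish(key)
	return 200
}

func main() {
	published := 0
	l := &localLimiter{max: 2, counts: map[string]int{}}
	for i := 0; i < 4; i++ {
		fmt.Println(handle(l, "user1", func(string) { published++ }))
	}
	fmt.Println("published:", published)
}
```

In this sketch the third and fourth requests are rejected without touching `publish`, which is the property that keeps the blocked path cheap.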

Rate Broker is best suited to server-side applications. It can also be used client side, but the benefit there is smaller.

Request Sequence

[Diagram: request sequence]

Example Diagram w/ Redis Streams

[Diagram: Redis distributed architecture]

Usage

Non Distributed Example
import (
	"context"
	"log"
	"time"

	"github.com/parkerroan/ratebroker"
)

// Create a rate broker with a maximum rate of 100 requests per minute.
rb := ratebroker.NewRateBroker(
    ratebroker.WithMaxRequests(100),
    ratebroker.WithWindow(1 * time.Minute),
)

// TryAccept reports whether a new request for the given key fits within the current rate limit.
ok, details := rb.TryAccept(context.Background(), "user1")
if !ok {
    log.Printf("Rate limit exceeded. Max requests: %d, Window: %s", details.MaxRequests, details.Window)
} else {
    log.Println("Request accepted")
}
Distributed HTTP Server Example
import (
	"context"
	"log"
	"net/http"
	"time"

	"github.com/go-redis/redis/v8"
	"github.com/gorilla/mux"
	"github.com/parkerroan/ratebroker"
)

func main() {
	// Placeholder limits; load these from your own configuration.
	cfg := struct {
		MaxRequests int
		Window      time.Duration
	}{MaxRequests: 100, Window: time.Minute}

	rdb := redis.NewClient(&redis.Options{
		Addr: "localhost:6379",
	})

	// Create the Redis-backed message broker
	redisBroker := ratebroker.NewRedisMessageBroker(rdb)

	// Create a rate broker w/ ring limiter
	rateBroker := ratebroker.NewRateBroker(
		ratebroker.WithBroker(redisBroker),
		ratebroker.WithMaxRequests(cfg.MaxRequests),
		ratebroker.WithWindow(cfg.Window),
		// ratebroker.WithNTPServer("pool.ntp.org"), // optional NTP server feature
	)

	ctx := context.Background()
	rateBroker.Start(ctx)

	// This function generates a key (in this case, the client's IP address)
	// that the rate limiter uses to identify unique clients.
	keyGetter := func(r *http.Request) string {
		// You might want to improve this method to handle IP-forwarding, etc.
		return r.RemoteAddr
	}

	// Create a new router
	r := mux.NewRouter() // or http.NewServeMux()

	// Create a new rate limited HTTP handler using your middleware
	r.Use(ratebroker.HttpMiddleware(rateBroker, keyGetter))

	r.HandleFunc("/", func(w http.ResponseWriter, r *http.Request) {
		// Handle the request, i.e., serve content, call other functions, etc.
		w.Write([]byte("Hello, World!"))
	})

	log.Fatal(http.ListenAndServe(":8080", r))
}
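
The comment in keyGetter suggests handling IP forwarding. One possible refinement, sketched with the standard library only, is shown below. The names `clientKey` and `keyFrom` are hypothetical, and trusting X-Forwarded-For is an assumption that only holds when the server sits behind a proxy you control, since clients can set that header themselves.

```go
package main

import (
	"fmt"
	"net"
	"net/http"
	"strings"
)

// keyFrom returns the first address in an X-Forwarded-For value when one is
// present, otherwise the host part of remoteAddr (which is "host:port").
func keyFrom(forwardedFor, remoteAddr string) string {
	if forwardedFor != "" {
		first, _, _ := strings.Cut(forwardedFor, ",")
		return strings.TrimSpace(first)
	}
	host, _, err := net.SplitHostPort(remoteAddr)
	if err != nil {
		return remoteAddr // not in host:port form, use it unchanged
	}
	return host
}

// clientKey has the signature expected for a keyGetter.
func clientKey(r *http.Request) string {
	return keyFrom(r.Header.Get("X-Forwarded-For"), r.RemoteAddr)
}

func main() {
	r := &http.Request{Header: http.Header{}, RemoteAddr: "10.0.0.7:51234"}
	fmt.Println(clientKey(r))
	r.Header.Set("X-Forwarded-For", "203.0.113.9, 10.0.0.1")
	fmt.Println(clientKey(r))
}
```

Stripping the port matters because r.RemoteAddr changes with every new connection from the same client, which would otherwise split one client across many keys.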

Development

TODO

Contributing

Contributions are welcome. Please submit a pull request or create an issue to discuss the changes.

License

This project is licensed under the MIT License.

Documentation

Overview

Package ratebroker manages rate limiting across various clients/servers, enabling control over request frequency per user within a specific time frame and maximum request count.

One feature of ratebroker is its compatibility with message brokers to distribute rate limits across multiple subscribers on the same topic/channel/stream. It uses an interface, allowing adaptability with any message broker that follows the prescribed method signature. This package includes support for Redis Streams, detailed at https://redis.io/topics/streams-intro.

Without an external message broker, ratebroker employs a local limiter. Usage example:

import (
	"context"
	"time"

	"github.com/parkerroan/ratebroker"
)

func main() {
	// Set up ratebroker with a local limiter.
	rb := ratebroker.NewRateBroker(
		ratebroker.WithMaxRequests(10),
		ratebroker.WithWindow(10*time.Second),
	)

	allowed, detail := rb.TryAccept(context.Background(), "userKey")
	// ... other code ...
}

The package utilizes 'limiters' to enforce rate limits. By default, a ring buffer limiter is used.

Two standalone limiters are also available, separate from the main ratebroker functionality, for scenarios not requiring distributed or user-specific rate limits:

1. Ring: The default, a ring buffer limiter suitable where slight leniency is acceptable. Details at https://github.com/parkerroan/ratebroker/limiter/ring.

2. Heap: A min-heap limiter for contexts needing higher precision, sorting requests by timestamps. Though more accurate, it requires more resources. See https://github.com/parkerroan/ratebroker/limiter/heap.

The choice between Ring and Heap depends on the precision and resource constraints of your application.
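
To make the ring buffer idea concrete, here is a minimal sketch of a fixed-size circular buffer of timestamps. It is an illustration written for this page, not the code in the limiter/ring package; `ringLimiter`, `tryAccept`, and `demoRing` are hypothetical names, and the sketch assumes max is greater than zero.

```go
package main

import (
	"fmt"
	"time"
)

// ringLimiter keeps the timestamps of the last max accepted requests
// in a fixed-size circular buffer.
type ringLimiter struct {
	times  []time.Time
	next   int // index of the oldest slot, which is overwritten next
	window time.Duration
}

func newRingLimiter(max int, window time.Duration) *ringLimiter {
	return &ringLimiter{times: make([]time.Time, max), window: window}
}

// tryAccept accepts when the oldest recorded request has left the window,
// then overwrites that slot with the new timestamp.
func (l *ringLimiter) tryAccept(now time.Time) bool {
	oldest := l.times[l.next]
	if !oldest.IsZero() && now.Sub(oldest) < l.window {
		return false
	}
	l.times[l.next] = now
	l.next = (l.next + 1) % len(l.times)
	return true
}

// demoRing allows 2 requests per second and tries four requests
// at 0ms, 100ms, 200ms, and 1000ms.
func demoRing() []bool {
	base := time.Unix(1_700_000_000, 0)
	l := newRingLimiter(2, time.Second)
	var out []bool
	for _, ms := range []int{0, 100, 200, 1000} {
		out = append(out, l.tryAccept(base.Add(time.Duration(ms)*time.Millisecond)))
	}
	return out
}

func main() {
	fmt.Println(demoRing()) // [true true false true]
}
```

Each check in this sketch is a single comparison against one slot, which is the kind of constant-time, fixed-memory behavior a ring buffer offers.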

Constants

const (
	// RequestAccepted is the event type for a request that was accepted.
	RequestAccepted = "REQUEST_ACCEPTED"
)

Variables

This section is empty.

Functions

func HttpMiddleware

func HttpMiddleware(rb *RateBroker, keyGetter func(r *http.Request) string) func(next http.Handler) http.Handler

HttpMiddleware creates a new middleware function for rate limiting. This function is compatible with both standard net/http and mux handlers.
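
A middleware with the same shape can be sketched as follows. This is not the package's implementation: `rateLimit` and `statusFor` are hypothetical names, `accept` stands in for a call such as rb.TryAccept, and responding with 429 and a plain-text body is an assumption about reasonable behavior.

```go
package main

import (
	"fmt"
	"net/http"
	"net/http/httptest"
)

// rateLimit mirrors the shape of HttpMiddleware. accept is a hypothetical
// stand-in for the rate broker's decision for a given key.
func rateLimit(accept func(key string) bool, keyGetter func(r *http.Request) string) func(next http.Handler) http.Handler {
	return func(next http.Handler) http.Handler {
		return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
			if !accept(keyGetter(r)) {
				http.Error(w, "rate limit exceeded", http.StatusTooManyRequests)
				return
			}
			next.ServeHTTP(w, r)
		})
	}
}

// statusFor runs one request through the middleware and returns the status.
func statusFor(allow bool) int {
	h := rateLimit(
		func(string) bool { return allow },
		func(r *http.Request) string { return r.RemoteAddr },
	)(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
		w.Write([]byte("ok"))
	}))
	rec := httptest.NewRecorder()
	h.ServeHTTP(rec, httptest.NewRequest(http.MethodGet, "/", nil))
	return rec.Code
}

func main() {
	fmt.Println(statusFor(true), statusFor(false)) // 200 429
}
```

Because the returned value is a plain func(http.Handler) http.Handler, it can wrap a standard http.Handler directly or be passed to a router's Use method.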

func WithInitLoadOffset

func WithInitLoadOffset(offset time.Duration) func(*RedisMessageBroker)

WithInitLoadOffset sets how far back in time the broker reads messages from the stream on startup. Use it to avoid losing client request history across restarts.
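
Redis stream entry IDs have the form "<milliseconds>-<sequence>", so a time offset can be turned into a starting ID. The sketch below shows that conversion only; whether RedisMessageBroker computes its start position this way is an assumption, and `startID` and `demoStartID` are hypothetical names.

```go
package main

import (
	"fmt"
	"time"
)

// startID returns a Redis stream ID that points offset before now.
// Reading from this ID yields entries added at or after that moment.
func startID(now time.Time, offset time.Duration) string {
	return fmt.Sprintf("%d-0", now.Add(-offset).UnixMilli())
}

// demoStartID uses a fixed clock so the result is deterministic.
func demoStartID() string {
	now := time.UnixMilli(1_700_000_000_000)
	return startID(now, 5*time.Minute)
}

func main() {
	fmt.Println(demoStartID()) // 1699999700000-0
}
```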

func WithStream

func WithStream(stream string) func(*RedisMessageBroker)

WithStream sets the Redis stream name. A good value is the name of your application. Default: "ratebroker".

Types

type LimitDetails

type LimitDetails struct {
	MaxRequests int
	Window      time.Duration
}

LimitDetails is a struct that contains the max requests and window for a single limiter.

type Message

type Message struct {
	BrokerID  string    `json:"broker_id"` // The ID of the broker
	Event     string    `json:"event"`     // Type of event, e.g., "REQUEST_ACCEPTED"
	Timestamp time.Time `json:"timestamp"` // When the event occurred
	Key       string    `json:"key"`       // The key of the request, e.g., IP, UserID, etc.
}

Message represents the structure of the data that will be sent through the broker.
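
The struct tags determine the JSON field names. The sketch below redeclares Message locally so it runs on its own and prints the encoded form; that the Redis broker puts exactly this JSON on the stream is an assumption, and the values and the `encodeExample` helper are made up for illustration.

```go
package main

import (
	"encoding/json"
	"fmt"
	"time"
)

// Message is a local copy of the documented struct, so the example
// does not depend on the ratebroker module.
type Message struct {
	BrokerID  string    `json:"broker_id"`
	Event     string    `json:"event"`
	Timestamp time.Time `json:"timestamp"`
	Key       string    `json:"key"`
}

// encodeExample marshals one illustrative message.
func encodeExample() (string, error) {
	msg := Message{
		BrokerID:  "pod-1",
		Event:     "REQUEST_ACCEPTED",
		Timestamp: time.Date(2023, 10, 21, 12, 0, 0, 0, time.UTC),
		Key:       "user1",
	}
	b, err := json.Marshal(msg)
	return string(b), err
}

func main() {
	s, err := encodeExample()
	if err != nil {
		panic(err)
	}
	fmt.Println(s)
	// {"broker_id":"pod-1","event":"REQUEST_ACCEPTED","timestamp":"2023-10-21T12:00:00Z","key":"user1"}
}
```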

type MessageBroker

type MessageBroker interface {
	Publish(ctx context.Context, msg Message) error
	Consume(ctx context.Context, handlerFunc func(Message)) error
}

MessageBroker is an interface that defines the methods that a broker must implement. The broker is responsible for publishing and consuming messages and could be implemented in any message broker, e.g., Redis, Kafka, etc.
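
As an illustration of how small an implementation can be, here is an in-process broker backed by a channel. It redeclares Message locally so it runs without the ratebroker module. `chanBroker`, `newChanBroker`, and `roundTrip` are hypothetical names and not part of the package. Note that a single channel delivers each message to only one consumer, so this sketch suits tests rather than real distribution, where every replica needs to see every event.

```go
package main

import (
	"context"
	"fmt"
	"time"
)

// Message is a local copy of the documented struct.
type Message struct {
	BrokerID  string
	Event     string
	Timestamp time.Time
	Key       string
}

// chanBroker is a hypothetical in-process broker backed by a buffered channel.
type chanBroker struct {
	ch chan Message
}

func newChanBroker(buffer int) *chanBroker {
	return &chanBroker{ch: make(chan Message, buffer)}
}

// Publish enqueues msg, or returns the context error if ctx ends first.
func (b *chanBroker) Publish(ctx context.Context, msg Message) error {
	select {
	case b.ch <- msg:
		return nil
	case <-ctx.Done():
		return ctx.Err()
	}
}

// Consume calls handlerFunc for each message until ctx is cancelled.
func (b *chanBroker) Consume(ctx context.Context, handlerFunc func(Message)) error {
	for {
		select {
		case msg := <-b.ch:
			handlerFunc(msg)
		case <-ctx.Done():
			return ctx.Err()
		}
	}
}

// roundTrip publishes one message, consumes it, then stops the consumer.
func roundTrip() string {
	b := newChanBroker(1)
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()
	_ = b.Publish(ctx, Message{Key: "user1", Timestamp: time.Now()})
	var got string
	_ = b.Consume(ctx, func(m Message) { got = m.Key; cancel() })
	return got
}

func main() {
	fmt.Println(roundTrip()) // user1
}
```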

type NewLimiterFunc

type NewLimiterFunc func(int, time.Duration) limiter.Limiter

NewLimiterFunc is a function that creates a new limiter.

type Option

type Option func(*RateBroker)

Option is a function that can be passed into NewRateBroker to configure the RateBroker.
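
This is the functional options pattern. A self-contained sketch of how such options are typically applied is shown below; the `config` type, the lowercase option names, and the default values are hypothetical and chosen only for illustration.

```go
package main

import (
	"fmt"
	"time"
)

// config is a hypothetical stand-in for the fields an option can set.
type config struct {
	maxRequests int
	window      time.Duration
}

// option mutates a config, in the same way Option mutates a RateBroker.
type option func(*config)

func withMaxRequests(n int) option {
	return func(c *config) { c.maxRequests = n }
}

func withWindow(d time.Duration) option {
	return func(c *config) { c.window = d }
}

// newConfig starts from defaults (illustrative values) and applies each
// option in order, so later options override earlier ones.
func newConfig(opts ...option) *config {
	c := &config{maxRequests: 100, window: time.Minute}
	for _, opt := range opts {
		opt(c)
	}
	return c
}

func main() {
	c := newConfig(withMaxRequests(5))
	fmt.Println(c.maxRequests, c.window) // 5 1m0s
}
```

The pattern lets a constructor grow new settings without changing its signature, and callers only mention the settings they want to override.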

func WithBroker

func WithBroker(broker MessageBroker) Option

WithBroker sets the broker for the RateBroker.

If no broker is provided, the RateBroker will not publish events to the message broker.

Instead it will use the local limiter to enforce rate limits without distribution.

func WithID

func WithID(id string) Option

WithID sets the ID for the RateBroker. The ID identifies messages published by the RateBroker and should be unique per pod/replica.

func WithLimiterContructorFunc

func WithLimiterContructorFunc(limiterFunc NewLimiterFunc) Option

WithLimiterContructorFunc sets the function used to create a new limiter. The default is limiter.NewRingLimiterConstructorFunc(). To use a different limiter, pass in a function that creates it. For example, limiter.NewHeapLimiterConstructorFunc() is a supplied limiter that uses a heap instead of a ring buffer.

func WithMaxRequests

func WithMaxRequests(max int) Option

WithMaxRequests sets the maximum number of requests allowed within the supplied window for the RateBroker.

func WithMaxThreads

func WithMaxThreads(maxThreads int) Option

WithMaxThreads sets the maximum number of threads allowed for publishing events to the message broker.
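
One common way to cap concurrent work in Go is a buffered channel used as a semaphore. The sketch below shows that technique; whether RateBroker bounds its publishers this way is an assumption, and `publishAll` is a hypothetical helper.

```go
package main

import (
	"fmt"
	"sync"
	"sync/atomic"
)

// publishAll calls publish once per key using at most maxThreads goroutines
// at a time, and returns the highest concurrency it observed.
func publishAll(keys []string, maxThreads int, publish func(key string)) int32 {
	sem := make(chan struct{}, maxThreads)
	var wg sync.WaitGroup
	var running, peak int32
	for _, k := range keys {
		sem <- struct{}{} // blocks while maxThreads publishes are in flight
		wg.Add(1)
		go func(key string) {
			defer wg.Done()
			defer func() { <-sem }() // release the slot when done
			n := atomic.AddInt32(&running, 1)
			for { // record a new peak if this is the highest count so far
				p := atomic.LoadInt32(&peak)
				if n <= p || atomic.CompareAndSwapInt32(&peak, p, n) {
					break
				}
			}
			publish(key)
			atomic.AddInt32(&running, -1)
		}(k)
	}
	wg.Wait()
	return peak
}

func main() {
	var sent int32
	keys := []string{"a", "b", "c", "d", "e", "f"}
	peak := publishAll(keys, 2, func(string) { atomic.AddInt32(&sent, 1) })
	fmt.Println("sent:", sent, "within limit:", peak <= 2)
}
```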

func WithNTPServer

func WithNTPServer(server string) Option

WithNTPServer is an option to set the NTP server for the RateBroker. If this option is not used, the RateBroker will use the system's local time.

func WithWindow

func WithWindow(window time.Duration) Option

WithWindow sets the time window for the RateBroker.

type RateBroker

type RateBroker struct {
	// contains filtered or unexported fields
}

RateBroker is the main structure that will use a Limiter to enforce rate limits.

func NewRateBroker

func NewRateBroker(opts ...Option) *RateBroker

NewRateBroker creates a RateBroker configured with the provided options.

func (*RateBroker) Now

func (rb *RateBroker) Now() time.Time

Now tries to get the time from the NTP server if available; otherwise, it uses the local time.

func (*RateBroker) Start

func (rb *RateBroker) Start(ctx context.Context)

Start is a method on RateBroker that starts the broker consuming messages and handling them in the background.

func (*RateBroker) TryAccept

func (rb *RateBroker) TryAccept(ctx context.Context, key string) (bool, LimitDetails)

TryAccept is a method on RateBroker that checks a new request for the given key against the current rate limit. It returns whether the request is accepted, along with the limit details.

type RedisMessageBroker

type RedisMessageBroker struct {
	// contains filtered or unexported fields
}

RedisMessageBroker is an implementation of the MessageBroker interface that uses Redis as the message broker.

func NewRedisMessageBroker

func NewRedisMessageBroker(rdb *redis.Client, opts ...func(*RedisMessageBroker)) *RedisMessageBroker

func (*RedisMessageBroker) Consume

func (r *RedisMessageBroker) Consume(ctx context.Context, handlerFunc func(Message)) error

Consume listens to messages on a Redis stream and processes them with handlerFunc

func (*RedisMessageBroker) Publish

func (r *RedisMessageBroker) Publish(ctx context.Context, message Message) error

Publish publishes a message to a Redis stream

Directories

Path Synopsis
cmd
