groupcache


groupcache is a caching and cache-filling library, intended as a replacement for memcached in many cases.

For API docs and examples, see http://godoc.org/github.com/tochemey/groupcache/v2

Note: This repository is read-only. New features are handled in a new repository.


Overview

A modified version of groupcache with support for:

  • an upgraded protobuf API
  • service discovery
  • a simple logger interface, with a default logger based on Uber's zap library
  • proto definitions reorganised into a folder called protos (see Protobuf stub Generation below for how to generate the stubs)
Modifications from original library

In addition to the changes listed above:

  • a Logger interface to help plug in a custom logging framework
  • Service Discovery to discover other groupcache nodes automatically. At the moment the Kubernetes, mDNS and NATS providers are implemented (see Built-in Discovery Providers below)
  • HTTPPoolOpts is no longer exposed to the caller; it has been renamed to httpPoolOpts. The same applies to HTTPPool. This allows groupcache to be started with a service discovery provider
  • an upgraded protocol buffer API

Comparing Groupcache to memcached

Like memcached, groupcache:
  • shards by key to select which peer is responsible for that key
Unlike memcached, groupcache:
  • does not require running a separate set of servers, thus massively reducing deployment/configuration pain. groupcache is a client library as well as a server. It connects to its own peers.

  • comes with a cache filling mechanism. Whereas memcached just says "Sorry, cache miss", often resulting in a thundering herd of database (or whatever) loads from an unbounded number of clients (which has resulted in several fun outages), groupcache coordinates cache fills such that only one load in one process of an entire replicated set of processes populates the cache, then multiplexes the loaded value to all callers.

  • does not support versioned values. If key "foo" is value "bar", key "foo" must always be "bar".

Loading process

In a nutshell, a groupcache lookup of Get("foo") looks like:

(On machine #5 of a set of N machines running the same code)

  1. Is the value of "foo" in local memory because it's super hot? If so, use it.

  2. Is the value of "foo" in local memory because peer #5 (the current peer) is the owner of it? If so, use it.

  3. Amongst all the peers in my set of N, am I the owner of the key "foo"? (e.g. does it consistent hash to 5?) If so, load it. If other callers come in, via the same process or via RPC requests from peers, they block waiting for the load to finish and get the same answer. If not, RPC to the peer that's the owner and get the answer. If the RPC fails, just load it locally (still with local dup suppression).
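The coordination in step 3 is easy to observe in code. Below is a minimal sketch (the group name, sizes, and simulated slow backend are illustrative, not part of the library): ten concurrent Get calls for the same key should result in a single backend load.

var loads int64

group := groupcache.NewGroup("demo", 1<<20, groupcache.GetterFunc(
    func(ctx context.Context, key string, dest groupcache.Sink) error {
        atomic.AddInt64(&loads, 1)         // count actual backend loads
        time.Sleep(100 * time.Millisecond) // simulate a slow backend
        return dest.SetBytes([]byte("value-for-"+key), time.Time{})
    },
))

var wg sync.WaitGroup
for i := 0; i < 10; i++ {
    wg.Add(1)
    go func() {
        defer wg.Done()
        var s string
        _ = group.Get(context.Background(), "foo", groupcache.StringSink(&s))
    }()
}
wg.Wait()
fmt.Println("backend loads:", atomic.LoadInt64(&loads)) // expected: 1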

Example

import (
    "context"
    "fmt"
    "log"
    "time"

    "github.com/tochemey/groupcache/v2"
    "github.com/tochemey/groupcache/v2/discovery"
    "github.com/tochemey/groupcache/v2/discovery/kubernetes"
)

func ExampleUsage() {

    // NOTE: It is important that each node running groupcache has the env vars properly set:
    // GROUP_PORT, NODE_NAME and NODE_IP
    // so that the service discovery can properly identify the running instance

    // Create an instance of the discovery service.
    // For instance let us use kubernetes
    provider := kubernetes.New()

    // Create the discovery options
    // For kubernetes we only need the namespace and the application name
    application := "users"
    namespace := "default"
	 
    options := discovery.Config{
        kubernetes.ApplicationName: application,
        kubernetes.Namespace:       namespace,
    }

    // Create an instance of the service discovery
    serviceDiscovery := discovery.NewServiceDiscovery(provider, options)

    // Create an instance of the cluster
    ctx := context.Background()
    node, err := groupcache.NewNode(ctx, serviceDiscovery)
    if err != nil {
        log.Fatal(err)
    }

    // Start the cluster node
    if err := node.Start(ctx); err != nil {
        log.Fatal(err)
    }

    // Stop the cluster node when the function returns
    defer node.Stop(ctx)
	 
    // Create a new group cache with a max cache size of 3MB
    group := groupcache.NewGroup("users", 3000000, groupcache.GetterFunc(
        func(ctx context.Context, id string, dest groupcache.Sink) error {

            // Returns a protobuf struct `User`
            user, err := fetchUserFromMongo(ctx, id)
            if err != nil {
                return err
            }

            // Set the user in the groupcache to expire after 5 minutes
            return dest.SetProto(&user, time.Now().Add(time.Minute*5))
        },
    ))

    user := new(User)

    ctx, cancel := context.WithTimeout(context.Background(), time.Millisecond*500)
    defer cancel()

    if err := group.Get(ctx, "12345", groupcache.ProtoSink(user)); err != nil {
        log.Fatal(err)
    }

    fmt.Printf("-- User --\n")
    fmt.Printf("Id: %s\n", user.Id)
    fmt.Printf("Name: %s\n", user.Name)
    fmt.Printf("Age: %d\n", user.Age)
    fmt.Printf("IsSuper: %t\n", user.IsSuper)

    // Remove the key from the groupcache
    if err := group.Remove(ctx, "12345"); err != nil {
        log.Fatal(err)
    }
}

Clustering

The cluster engine depends upon the discovery mechanism to find other nodes in the cluster.

At the moment the following providers are implemented: Kubernetes, mDNS and NATS (see Built-in Discovery Providers below).

Note: One can add additional discovery providers by implementing the discovery provider interface exposed by the discovery package.

In addition, one needs to set the following environment variables irrespective of the discovery provider to help identify the host node on which the cluster service is running:

  • NODE_NAME: the node name. For instance in kubernetes one can just get it from the metadata.name
  • NODE_IP: the node host address. For instance in kubernetes one can just get it from the status.podIP
  • GROUP_PORT: the port used by the discovery provider to communicate.

Note: Depending upon the discovery provider implementation, the NODE_NAME and NODE_IP can be the same.
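Before starting the node, each process can fail fast when any of these variables is missing; a minimal sketch (the check itself is illustrative, not part of the library):

for _, name := range []string{"NODE_NAME", "NODE_IP", "GROUP_PORT"} {
    if os.Getenv(name) == "" {
        log.Fatalf("required environment variable %s is not set", name)
    }
}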

Built-in Discovery Providers
Kubernetes Discovery Provider Setup

To get the kubernetes discovery working as expected, the following pod labels need to be set:

  • app.kubernetes.io/part-of: set this label with the actor system name
  • app.kubernetes.io/component: set this label with the application name
  • app.kubernetes.io/name: set this label with the application name

In addition, each node is required to have the following port open, with the port name below, for the cluster engine to work as expected:

  • group-port: used by the gossip protocol engine. This is the kubernetes discovery port
Get Started
const (
    namespace = "default"
    applicationName = "accounts"
)
// instantiate the k8 discovery provider
disco := kubernetes.NewDiscovery()
// define the discovery options
discoOptions := discovery.Config{
    kubernetes.ApplicationName: applicationName,
    kubernetes.Namespace:       namespace,
}
// define the service discovery
serviceDiscovery := discovery.NewServiceDiscovery(disco, discoOptions)
// create and start the cluster node (see NewNode and Start in the API docs below)
ctx := context.Background()
node, err := groupcache.NewNode(ctx, serviceDiscovery)
if err != nil {
    log.Fatal(err)
}
if err := node.Start(ctx); err != nil {
    log.Fatal(err)
}
defer node.Stop(ctx)
Role Based Access

You’ll also have to grant the Service Account that your pods run under access to list pods. The following configuration can be used as a starting point. It creates a Role, pod-reader, which grants access to query pod information. It then binds the default Service Account to the Role by creating a RoleBinding. Adjust as necessary:

kind: Role
apiVersion: rbac.authorization.k8s.io/v1
metadata:
  name: pod-reader
rules:
  - apiGroups: [""] # "" indicates the core API group
    resources: ["pods"]
    verbs: ["get", "watch", "list"]
---
kind: RoleBinding
apiVersion: rbac.authorization.k8s.io/v1
metadata:
  name: read-pods
subjects:
  # Uses the default service account. Consider creating a new one.
  - kind: ServiceAccount
    name: default
roleRef:
  kind: Role
  name: pod-reader
  apiGroup: rbac.authorization.k8s.io
mDNS Discovery Provider Setup

To use the mDNS discovery provider one needs to provide the following:

  • Service Name: the service name
  • Domain: the mDNS discovery domain
  • Port: the mDNS discovery port
  • IPv6: whether to look up IPv6 addresses
NATS Discovery Provider Setup

To use the NATS discovery provider one needs to provide the following:

  • NATS Server Address: the NATS Server address
  • NATS Subject: the NATS subject to use
  • Application Name: the application name
const (
    natsServerAddr = "nats://localhost:4248"
    natsSubject = "groupcache-gossip"
    applicationName = "accounts"
)
// instantiate the NATS discovery provider
disco := nats.NewDiscovery()
// define the discovery options
discoOptions := discovery.Config{
    nats.ApplicationName: applicationName,
    nats.NatsServer:      natsServerAddr,
    nats.NatsSubject:     natsSubject,
}
// define the service discovery
serviceDiscovery := discovery.NewServiceDiscovery(disco, discoOptions)
// create and start the cluster node as shown in the kubernetes example above

Contribution

Contributions are welcome! The project adheres to Semantic Versioning and Conventional Commits. This repo uses Earthly.

To contribute please:

  • Fork the repository
  • Create a feature branch
  • Submit a pull request
Protobuf stub Generation

To generate stubs for a proto definition, add the definition to the protos folder and run the following command:

earthly +protogen

Documentation

Overview

Package groupcache provides a data loading mechanism with caching and de-duplication that works across a set of peer processes.

Each data Get first consults its local cache, otherwise delegates to the requested key's canonical owner, which then checks its cache or finally gets the data. In the common case, many concurrent cache misses across a set of peers for the same key result in just one cache fill.

Index

Constants

This section is empty.

Variables

var NowFunc lru.NowFunc = time.Now

NowFunc returns the current time which is used by the LRU to determine if the value has expired. This can be overridden by tests to ensure items are evicted when expired.
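For instance, a test can pin the clock and then advance it past an entry's expiry to assert eviction deterministically; a minimal sketch (the frozen value is illustrative):

// Pin the clock so expiry checks are deterministic in tests.
frozen := time.Now()
groupcache.NowFunc = func() time.Time { return frozen }

// Later, jump past an entry's expiry to force eviction on the next access.
groupcache.NowFunc = func() time.Time { return frozen.Add(10 * time.Minute) }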

Functions

func DeregisterGroup

func DeregisterGroup(name string)

DeregisterGroup removes the named group from the group pool.

func RegisterNewGroupHook

func RegisterNewGroupHook(fn func(*Group))

RegisterNewGroupHook registers a hook that is run each time a group is created.

func RegisterPeerPicker

func RegisterPeerPicker(fn func() PeerPicker)

RegisterPeerPicker registers the peer initialization function. It is called once, when the first group is created. Either RegisterPeerPicker or RegisterPerGroupPeerPicker should be called exactly once, but not both.

func RegisterPerGroupPeerPicker

func RegisterPerGroupPeerPicker(fn func(groupName string) PeerPicker)

RegisterPerGroupPeerPicker registers the peer initialization function, which takes the groupName, to be used in choosing a PeerPicker. It is called once, when the first group is created. Either RegisterPeerPicker or RegisterPerGroupPeerPicker should be called exactly once, but not both.

func RegisterServerStart

func RegisterServerStart(fn func())

RegisterServerStart registers a hook that is run when the first group is created.

Types

type AtomicInt

type AtomicInt int64

An AtomicInt is an int64 to be accessed atomically.

func (*AtomicInt) Add

func (i *AtomicInt) Add(n int64)

Add atomically adds n to i.

func (*AtomicInt) Get

func (i *AtomicInt) Get() int64

Get atomically gets the value of i.

func (*AtomicInt) Store

func (i *AtomicInt) Store(n int64)

Store atomically stores n to i.

func (*AtomicInt) String

func (i *AtomicInt) String() string

type ByteView

type ByteView struct {
	// contains filtered or unexported fields
}

A ByteView holds an immutable view of bytes. Internally it wraps either a []byte or a string, but that detail is invisible to callers.

A ByteView is meant to be used as a value type, not a pointer (like a time.Time).
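A ByteView is typically obtained through ByteViewSink (documented below); a minimal sketch, assuming group and ctx already exist:

var v groupcache.ByteView
if err := group.Get(ctx, "foo", groupcache.ByteViewSink(&v)); err != nil {
    log.Fatal(err)
}
fmt.Println(v.Len(), v.String()) // read the length and contents of the cached value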

func (ByteView) At

func (v ByteView) At(i int) byte

At returns the byte at index i.

func (ByteView) ByteSlice

func (v ByteView) ByteSlice() []byte

ByteSlice returns a copy of the data as a byte slice.

func (ByteView) Copy

func (v ByteView) Copy(dest []byte) int

Copy copies b into dest and returns the number of bytes copied.

func (ByteView) Equal

func (v ByteView) Equal(b2 ByteView) bool

Equal returns whether the bytes in b are the same as the bytes in b2.

func (ByteView) EqualBytes

func (v ByteView) EqualBytes(b2 []byte) bool

EqualBytes returns whether the bytes in b are the same as the bytes in b2.

func (ByteView) EqualString

func (v ByteView) EqualString(s string) bool

EqualString returns whether the bytes in b are the same as the bytes in s.

func (ByteView) Expire

func (v ByteView) Expire() time.Time

Expire returns the expire time associated with this view.

func (ByteView) Len

func (v ByteView) Len() int

Len returns the view's length.

func (ByteView) ReadAt

func (v ByteView) ReadAt(p []byte, off int64) (n int, err error)

ReadAt implements io.ReaderAt on the bytes in v.

func (ByteView) Reader

func (v ByteView) Reader() io.ReadSeeker

Reader returns an io.ReadSeeker for the bytes in v.

func (ByteView) Slice

func (v ByteView) Slice(from, to int) ByteView

Slice slices the view between the provided from and to indices.

func (ByteView) SliceFrom

func (v ByteView) SliceFrom(from int) ByteView

SliceFrom slices the view from the provided index until the end.

func (ByteView) String

func (v ByteView) String() string

String returns the data as a string, making a copy if necessary.

func (ByteView) WriteTo

func (v ByteView) WriteTo(w io.Writer) (n int64, err error)

WriteTo implements io.WriterTo on the bytes in v.

type CacheStats

type CacheStats struct {
	Bytes     int64
	Items     int64
	Gets      int64
	Hits      int64
	Evictions int64
}

CacheStats are returned by stats accessors on Group.

type CacheType

type CacheType int

CacheType represents a type of cache.

const (
	// The MainCache is the cache for items that this peer is the
	// owner for.
	MainCache CacheType = iota + 1

	// The HotCache is the cache for items that seem popular
	// enough to replicate to this node, even though it's not the
	// owner.
	HotCache
)

type ErrNotFound

type ErrNotFound struct {
	Msg string
}

ErrNotFound should be returned from an implementation of `GetterFunc` to indicate the requested value is not available. When remote HTTP calls are made to retrieve values from other groupcache instances, returning this error will indicate to groupcache that the value requested is not available, and it should NOT attempt to call `GetterFunc` locally.

func (*ErrNotFound) Error

func (e *ErrNotFound) Error() string

func (*ErrNotFound) Is

func (e *ErrNotFound) Is(target error) bool
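Because the type implements Is, callers can detect a definitive miss with errors.Is; a minimal sketch (getUser and the User proto are illustrative, following the README example):

func getUser(ctx context.Context, group *groupcache.Group, id string) (*User, error) {
    user := new(User)
    err := group.Get(ctx, id, groupcache.ProtoSink(user))
    if errors.Is(err, &groupcache.ErrNotFound{}) {
        // the value does not exist anywhere in the cluster; don't retry
        return nil, err
    }
    if err != nil {
        return nil, err
    }
    return user, nil
}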

type ErrRemoteCall

type ErrRemoteCall struct {
	Msg string
}

ErrRemoteCall is returned from `group.Get()` when a remote GetterFunc returns an error. When this happens `group.Get()` does not attempt to retrieve the value via our local GetterFunc.

func (*ErrRemoteCall) Error

func (e *ErrRemoteCall) Error() string

func (*ErrRemoteCall) Is

func (e *ErrRemoteCall) Is(target error) bool

type Getter

type Getter interface {
	// Get returns the value identified by key, populating dest.
	//
	// The returned data must be un-versioned. That is, key must
	// uniquely describe the loaded data, without an implicit
	// current time, and without relying on cache expiration
	// mechanisms.
	Get(ctx context.Context, key string, dest Sink) error
}

A Getter loads data for a key.

type GetterFunc

type GetterFunc func(ctx context.Context, key string, dest Sink) error

A GetterFunc implements Getter with a function.

func (GetterFunc) Get

func (f GetterFunc) Get(ctx context.Context, key string, dest Sink) error

type Group

type Group struct {

	// Stats are statistics on the group.
	Stats Stats
	// contains filtered or unexported fields
}

A Group is a cache namespace and its associated data, spread over a group of one or more machines.

func GetGroup

func GetGroup(name string) *Group

GetGroup returns the named group previously created with NewGroup, or nil if there's no such group.

func NewGroup

func NewGroup(name string, cacheBytes int64, getter Getter) *Group

NewGroup creates a coordinated group-aware Getter from a Getter.

The returned Getter tries (but does not guarantee) to run only one Get call at once for a given key across an entire set of peer processes. Concurrent callers both in the local process and in other processes receive copies of the answer once the original Get completes.

The group name must be unique for each getter.

func (*Group) CacheStats

func (g *Group) CacheStats(which CacheType) CacheStats

CacheStats returns stats about the provided cache within the group.
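For example, a process can periodically log hit ratios for both caches; a minimal sketch assuming an existing group:

main := group.CacheStats(groupcache.MainCache)
hot := group.CacheStats(groupcache.HotCache)
fmt.Printf("main cache: %d/%d hits, %d bytes, %d evictions\n",
    main.Hits, main.Gets, main.Bytes, main.Evictions)
fmt.Printf("hot cache:  %d/%d hits\n", hot.Hits, hot.Gets)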

func (*Group) Get

func (g *Group) Get(ctx context.Context, key string, dest Sink) error

func (*Group) Name

func (g *Group) Name() string

Name returns the name of the group.

func (*Group) Remove

func (g *Group) Remove(ctx context.Context, key string) error

Remove clears the key from our cache then forwards the remove request to all peers.

func (*Group) Set

func (g *Group) Set(ctx context.Context, key string, value []byte, expire time.Time, hotCache bool) error
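Set can be used to prime the cache ahead of any Get; a minimal sketch assuming group and ctx exist:

// Store the value under "motd" for one hour; hotCache=true also
// writes the value to this node's hot cache.
err := group.Set(ctx, "motd", []byte("hello"), time.Now().Add(time.Hour), true)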

type Interface

type Interface interface {
	// Start starts the Node engine
	Start(ctx context.Context) error
	// Stop stops the Node engine
	Stop(ctx context.Context) error
}

Interface defines the Node interface

type NoPeers

type NoPeers struct{}

NoPeers is an implementation of PeerPicker that never finds a peer.

func (NoPeers) GetAll

func (NoPeers) GetAll() []ProtoGetter

func (NoPeers) PickPeer

func (NoPeers) PickPeer(key string) (peer ProtoGetter, ok bool)

type Node

type Node struct {
	// contains filtered or unexported fields
}

func NewNode

func NewNode(ctx context.Context, sd *discovery.Discovery, opts ...Option) (*Node, error)

NewNode creates an instance of the cluster node.

func (*Node) Start

func (x *Node) Start(ctx context.Context) error

Start starts the groupcache cluster node

func (*Node) Stop

func (x *Node) Stop(ctx context.Context) error

Stop shuts down the cluster node.

type Option

type Option interface {
	// Apply sets the Option value of a config.
	Apply(node *Node)
}

func WithBasePath

func WithBasePath(path string) Option

WithBasePath sets the base path.

func WithHashFn

func WithHashFn(hashFn consistenthash.Hash) Option

WithHashFn sets the custom hash function

func WithLogger

func WithLogger(logger log.Logger) Option

WithLogger sets the logger

func WithReplicaCount

func WithReplicaCount(count int) Option

WithReplicaCount sets the total number of replicas.

func WithShutdownTimeout

func WithShutdownTimeout(timeout time.Duration) Option

WithShutdownTimeout sets the shutdown timeout

type OptionFunc

type OptionFunc func(node *Node)

OptionFunc implements the Option interface.

func (OptionFunc) Apply

func (f OptionFunc) Apply(node *Node)

Apply applies the Node's option

type PeerPicker

type PeerPicker interface {
	// PickPeer returns the peer that owns the specific key
	// and true to indicate that a remote peer was nominated.
	// It returns nil, false if the key owner is the current peer.
	PickPeer(key string) (peer ProtoGetter, ok bool)
	// GetAll returns all the peers in the group
	GetAll() []ProtoGetter
}

PeerPicker is the interface that must be implemented to locate the peer that owns a specific key.
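As an illustration, a fixed picker that always nominates one remote peer could be registered with RegisterPeerPicker before the first group is created; a hedged sketch (staticPicker and its ProtoGetter peer are assumptions, not library code):

// staticPicker always nominates the same remote peer.
type staticPicker struct {
    peer groupcache.ProtoGetter // some ProtoGetter implementation
}

func (p staticPicker) PickPeer(key string) (groupcache.ProtoGetter, bool) {
    return p.peer, true // returning nil, false would mean this node owns the key
}

func (p staticPicker) GetAll() []groupcache.ProtoGetter {
    return []groupcache.ProtoGetter{p.peer}
}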

type ProtoGetter

type ProtoGetter interface {
	Get(context context.Context, in *pb.GetRequest, out *pb.GetResponse) error
	Remove(context context.Context, in *pb.GetRequest) error
	Set(context context.Context, in *pb.SetRequest) error
	// GetURL returns the peer URL
	GetURL() string
}

ProtoGetter is the interface that must be implemented by a peer.

type Sink

type Sink interface {
	// SetString sets the value to s.
	SetString(s string, e time.Time) error

	// SetBytes sets the value to the contents of v.
	// The caller retains ownership of v.
	SetBytes(v []byte, e time.Time) error

	// SetProto sets the value to the encoded version of m.
	// The caller retains ownership of m.
	SetProto(m proto.Message, e time.Time) error
	// contains filtered or unexported methods
}

A Sink receives data from a Get call.

Implementations of Getter must call exactly one of the Set methods on success.

`e` sets an optional time in the future when the value will expire. If you don't want expiration, pass the zero value for `time.Time` (for instance, `time.Time{}`).
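For instance, a getter can cache some keys forever and expire others; a minimal sketch (the key scheme is illustrative):

groupcache.GetterFunc(func(ctx context.Context, key string, dest groupcache.Sink) error {
    // "static:" keys never expire (the zero time.Time disables expiration) ...
    if strings.HasPrefix(key, "static:") {
        return dest.SetString("immutable-"+key, time.Time{})
    }
    // ... while everything else expires after one minute.
    return dest.SetString("volatile-"+key, time.Now().Add(time.Minute))
})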

func AllocatingByteSliceSink

func AllocatingByteSliceSink(dst *[]byte) Sink

AllocatingByteSliceSink returns a Sink that allocates a byte slice to hold the received value and assigns it to *dst. The memory is not retained by groupcache.

func ByteViewSink

func ByteViewSink(dst *ByteView) Sink

ByteViewSink returns a Sink that populates a ByteView.

func ProtoSink

func ProtoSink(m proto.Message) Sink

ProtoSink returns a sink that unmarshals binary proto values into m.

func StringSink

func StringSink(sp *string) Sink

StringSink returns a Sink that populates the provided string pointer.

func TruncatingByteSliceSink

func TruncatingByteSliceSink(dst *[]byte) Sink

TruncatingByteSliceSink returns a Sink that writes up to len(*dst) bytes to *dst. If more bytes are available, they're silently truncated. If fewer bytes are available than len(*dst), *dst is shrunk to fit the number of bytes available.
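The difference between the two byte-slice sinks is easiest to see side by side; a minimal sketch assuming group and ctx exist:

// AllocatingByteSliceSink: groupcache allocates a fresh slice for the value.
var data []byte
_ = group.Get(ctx, "foo", groupcache.AllocatingByteSliceSink(&data))

// TruncatingByteSliceSink: reuse a fixed buffer; longer values are truncated.
buf := make([]byte, 16)
_ = group.Get(ctx, "foo", groupcache.TruncatingByteSliceSink(&buf))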

type Stats

type Stats struct {
	Gets                     AtomicInt // any Get request, including from peers
	CacheHits                AtomicInt // either cache was good
	GetFromPeersLatencyLower AtomicInt // slowest duration to request value from peers
	PeerLoads                AtomicInt // either remote load or remote cache hit (not an error)
	PeerErrors               AtomicInt
	Loads                    AtomicInt // (gets - cacheHits)
	LoadsDeduped             AtomicInt // after singleflight
	LocalLoads               AtomicInt // total good local loads
	LocalLoadErrs            AtomicInt // total bad local loads
	ServerRequests           AtomicInt // gets that came over the network from peers
}

Stats are per-group statistics.

Directories

Path Synopsis
consistenthash — Package consistenthash provides an implementation of a ring hash.
discovery — Package discovery provides the interface to discover other groupcache nodes in the cluster.
discovery/kubernetes — Package kubernetes defines the kubernetes discovery provider.
discovery/mdns — Package mdns defines the mdns discovery provider.
example
pb
lru — Package lru implements an LRU cache.
singleflight — Package singleflight provides a duplicate function call suppression mechanism.
