cache

package
v0.0.0-...-e951c9a Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Oct 6, 2022 License: AGPL-3.0 Imports: 42 Imported by: 0

Documentation

Index

Constants

View Source
const (
	DefaultPurgeInterval = 1 * time.Minute
)
View Source
const (
	GroupcacheRingKey = "groupcache"
)

Variables

View Source
var (
	ErrGroupcacheMiss = errors.New("cache miss")
)

Functions

func ExtractCacheGenNumber

func ExtractCacheGenNumber(ctx context.Context) string

ExtractCacheGenNumber gets the cache gen from the context.

func HashKey

func HashKey(key string) string

HashKey hashes key into something you can store in memcached.

func InjectCacheGenNumber

func InjectCacheGenNumber(ctx context.Context, cacheGen string) context.Context

InjectCacheGenNumber returns a derived context containing the cache gen.

func IsCacheConfigured

func IsCacheConfigured(cfg Config) bool

IsCacheConfigured determines if memcached, redis, or embedded-cache have been configured

func IsEmbeddedCacheSet

func IsEmbeddedCacheSet(cfg Config) bool

func IsEmptyTieredCache

func IsEmptyTieredCache(cache Cache) bool

IsEmptyTieredCache is used to determine whether the given Cache is implemented by an empty tiered cache.

func IsMemcacheSet

func IsMemcacheSet(cfg Config) bool

IsMemcacheSet returns whether a non-empty Memcache config is set or not, based on the configured host or addresses.

Internally, this function is used to set Memcache as the cache storage to be used.

func IsRedisSet

func IsRedisSet(cfg Config) bool

IsRedisSet returns whether a non-empty Redis config is set or not, based on the configured endpoint.

Internally, this function is used to set Redis as the cache storage to be used.

func StringToBytes

func StringToBytes(s string) []byte

StringToBytes converts string to byte slice. (copied from vendor/github.com/go-redis/redis/v8/internal/util/unsafe.go)

Types

type BackgroundConfig

type BackgroundConfig struct {
	WriteBackGoroutines int `yaml:"writeback_goroutines"`
	WriteBackBuffer     int `yaml:"writeback_buffer"`
}

BackgroundConfig is config for a Background Cache.

func (*BackgroundConfig) RegisterFlagsWithPrefix

func (cfg *BackgroundConfig) RegisterFlagsWithPrefix(prefix string, description string, f *flag.FlagSet)

RegisterFlagsWithPrefix adds the flags required to config this to the given FlagSet

type Cache

type Cache interface {
	Store(ctx context.Context, key []string, buf [][]byte) error
	Fetch(ctx context.Context, keys []string) (found []string, bufs [][]byte, missing []string, err error)
	Stop()
	// GetCacheType returns a string indicating the cache "type" for the purpose of grouping cache usage statistics
	GetCacheType() stats.CacheType
}

Cache byte arrays by key.

func CollectStats

func CollectStats(cache Cache) Cache

CollectStats returns a new Cache that keeps various statistics on cache usage.

func Instrument

func Instrument(name string, cache Cache, reg prometheus.Registerer) Cache

Instrument returns an instrumented cache.

func New

func New(cfg Config, reg prometheus.Registerer, logger log.Logger, cacheType stats.CacheType) (Cache, error)

New creates a new Cache using Config.

func NewBackground

func NewBackground(name string, cfg BackgroundConfig, cache Cache, reg prometheus.Registerer) Cache

NewBackground returns a new Cache that does stores on background goroutines.

func NewCacheGenNumMiddleware

func NewCacheGenNumMiddleware(downstreamCache Cache) Cache

NewCacheGenNumMiddleware creates a new GenNumMiddleware.

func NewMockCache

func NewMockCache() Cache

NewMockCache makes a new MockCache.

func NewNoopCache

func NewNoopCache() Cache

NewNoopCache returns a no-op cache.

func NewSnappy

func NewSnappy(next Cache, logger log.Logger) Cache

NewSnappy makes a new snappy encoding cache wrapper.

func NewTiered

func NewTiered(caches []Cache) Cache

NewTiered makes a new tiered cache.

func StopOnce

func StopOnce(cache Cache) Cache

StopOnce wraps a Cache and ensures it is only stopped once.

type Config

type Config struct {
	EnableFifoCache bool `yaml:"enable_fifocache"`

	DefaultValidity time.Duration `yaml:"default_validity"`

	Background     BackgroundConfig      `yaml:"background"`
	Memcache       MemcachedConfig       `yaml:"memcached"`
	MemcacheClient MemcachedClientConfig `yaml:"memcached_client"`
	Redis          RedisConfig           `yaml:"redis"`
	EmbeddedCache  EmbeddedCacheConfig   `yaml:"embedded_cache"`
	Fifocache      FifoCacheConfig       `yaml:"fifocache"` // deprecated

	// This is to name the cache metrics properly.
	Prefix string `yaml:"prefix" doc:"hidden"`

	// For tests to inject specific implementations.
	Cache Cache `yaml:"-"`

	// AsyncCacheWriteBackConcurrency specifies the number of goroutines to use when asynchronously writing chunks fetched from the store to the chunk cache.
	AsyncCacheWriteBackConcurrency int `yaml:"async_cache_write_back_concurrency"`
	// AsyncCacheWriteBackBufferSize specifies the maximum number of fetched chunks to buffer for writing back to the chunk cache.
	AsyncCacheWriteBackBufferSize int `yaml:"async_cache_write_back_buffer_size"`
}

Config for building Caches.

func (*Config) RegisterFlagsWithPrefix

func (cfg *Config) RegisterFlagsWithPrefix(prefix string, description string, f *flag.FlagSet)

RegisterFlagsWithPrefix adds the flags required to config this to the given FlagSet

func (*Config) Validate

func (cfg *Config) Validate() error

type EmbeddedCacheConfig

type EmbeddedCacheConfig struct {
	Distributed bool          `yaml:"distributed,omitempty"`
	Enabled     bool          `yaml:"enabled,omitempty"`
	MaxSizeMB   int64         `yaml:"max_size_mb"`
	TTL         time.Duration `yaml:"ttl"`

	// PurgeInterval tell how often should we remove keys that are expired.
	// by default it takes `DefaultPurgeInterval`
	PurgeInterval time.Duration `yaml:"-"`
}

EmbeddedCacheConfig represents in-process embedded cache config. It can also be distributed, sharding keys across peers when run with microservices or SSD mode.

func (*EmbeddedCacheConfig) IsEnabledWithDistributed

func (cfg *EmbeddedCacheConfig) IsEnabledWithDistributed() bool

func (*EmbeddedCacheConfig) IsEnabledWithoutDistributed

func (cfg *EmbeddedCacheConfig) IsEnabledWithoutDistributed() bool

func (*EmbeddedCacheConfig) RegisterFlagsWithPrefix

func (cfg *EmbeddedCacheConfig) RegisterFlagsWithPrefix(prefix, description string, f *flag.FlagSet)

type EmbeddedCacheSingletonConfig

type EmbeddedCacheSingletonConfig struct {
	// distributed cache configs. Have no meaning if `Distributed=false`.
	ListenPort int     `yaml:"listen_port,omitempty"`
	Ring       RingCfg `yaml:"ring,omitempty"`

	// Default capacity if none provided while creating each "Group".
	MaxSizeMB int64 `yaml:"max_size__mb,omitempty"`

	// Different timeouts
	HeartbeatInterval time.Duration `yaml:"heartbeat_interval,omitempty"`
	HeartbeatTimeout  time.Duration `yaml:"heartbeat_timeout,omitempty"`
	WriteByteTimeout  time.Duration `yaml:"write_timeout,omitempty"`
}

EmbeddedCacheSingletonConfig defines the global singleton configuration needed by the embedded cache (particularly when used in a distributed fashion).

func (*EmbeddedCacheSingletonConfig) RegisterFlagsWithPrefix

func (cfg *EmbeddedCacheSingletonConfig) RegisterFlagsWithPrefix(prefix, description string, f *flag.FlagSet)

type FifoCache

type FifoCache struct {
	// contains filtered or unexported fields
}

FifoCache is a simple string -> interface{} cache which uses a fifo slide to manage evictions. O(1) inserts and updates, O(1) gets.

func NewFifoCache

func NewFifoCache(name string, cfg FifoCacheConfig, reg prometheus.Registerer, logger log.Logger, cacheType stats.CacheType) *FifoCache

NewFifoCache returns a new initialised FifoCache of size.

func (*FifoCache) Fetch

func (c *FifoCache) Fetch(ctx context.Context, keys []string) (found []string, bufs [][]byte, missing []string, err error)

Fetch implements Cache.

func (*FifoCache) Get

func (c *FifoCache) Get(ctx context.Context, key string) ([]byte, bool)

Get returns the value stored against the key and a boolean reporting whether the key was found.

func (*FifoCache) GetCacheType

func (c *FifoCache) GetCacheType() stats.CacheType

func (*FifoCache) Stop

func (c *FifoCache) Stop()

Stop implements Cache.

func (*FifoCache) Store

func (c *FifoCache) Store(ctx context.Context, keys []string, values [][]byte) error

Store implements Cache.

type FifoCacheConfig

type FifoCacheConfig struct {
	MaxSizeBytes string        `yaml:"max_size_bytes"`
	MaxSizeItems int           `yaml:"max_size_items"` // deprecated
	TTL          time.Duration `yaml:"ttl"`

	DeprecatedValidity time.Duration `yaml:"validity"`
	DeprecatedSize     int           `yaml:"size"`

	PurgeInterval time.Duration
}

FifoCacheConfig holds config for the FifoCache.

func (*FifoCacheConfig) RegisterFlagsWithPrefix

func (cfg *FifoCacheConfig) RegisterFlagsWithPrefix(prefix, description string, f *flag.FlagSet)

RegisterFlagsWithPrefix adds the flags required to config this to the given FlagSet

func (*FifoCacheConfig) Validate

func (cfg *FifoCacheConfig) Validate() error

type GenNumMiddleware

type GenNumMiddleware struct {
	// contains filtered or unexported fields
}

GenNumMiddleware adds gen number to keys from context. Expected size of gen numbers is up to 2 digits. If we start seeing problems with keys exceeding the length limit, we need to look into resetting gen numbers.

func (GenNumMiddleware) Fetch

func (c GenNumMiddleware) Fetch(ctx context.Context, keys []string) (found []string, bufs [][]byte, missing []string, err error)

Fetch adds cache gen number to keys before calling Fetch method of downstream cache. It also removes gen number before responding back with found and missing keys to make sure consumer of response gets to see same keys.

func (GenNumMiddleware) GetCacheType

func (c GenNumMiddleware) GetCacheType() stats.CacheType

func (GenNumMiddleware) Stop

func (c GenNumMiddleware) Stop()

Stop calls Stop method of downstream cache.

func (GenNumMiddleware) Store

func (c GenNumMiddleware) Store(ctx context.Context, keys []string, buf [][]byte) error

Store adds cache gen number to keys before calling Store method of downstream cache.

type GroupCache

type GroupCache struct {
	// contains filtered or unexported fields
}

func NewGroupCache

func NewGroupCache(rm ringManager, config GroupCacheConfig, logger log.Logger, reg prometheus.Registerer) (*GroupCache, error)

func (*GroupCache) NewGroup

func (c *GroupCache) NewGroup(name string, cfg *GroupConfig, ct stats.CacheType) Cache

func (*GroupCache) Stats

func (c *GroupCache) Stats() *groupcache.Stats

type GroupCacheConfig

type GroupCacheConfig struct {
	Enabled           bool          `yaml:"enabled,omitempty"`
	Ring              RingCfg       `yaml:"ring,omitempty"`
	MaxSizeMB         int64         `yaml:"max_size_mb,omitempty"`
	ListenPort        int           `yaml:"listen_port,omitempty"`
	HeartbeatInterval time.Duration `yaml:"heartbeat_interval,omitempty"`
	HeartbeatTimeout  time.Duration `yaml:"heartbeat_timeout,omitempty"`
	WriteByteTimeout  time.Duration `yaml:"write_timeout,omitempty"`

	Cache Cache `yaml:"-"`
}

type GroupConfig

type GroupConfig struct {
	MaxSizeMB int64 `yaml:"max_size_mb,omitempty"`
}

GroupConfig represents config per Group.

type GroupcacheRingManager

type GroupcacheRingManager struct {
	services.Service
	// contains filtered or unexported fields
}

GroupcacheRingManager is a component instantiated before all the others and is responsible for the ring setup.

func NewGroupcacheRingManager

func NewGroupcacheRingManager(cfg GroupCacheConfig, log log.Logger, registerer prometheus.Registerer) (*GroupcacheRingManager, error)

NewGroupcacheRingManager is the recommended way of instantiating a GroupcacheRingManager.

The other functions will assume the GroupcacheRingManager was instantiated through this function.

func (*GroupcacheRingManager) Addr

func (rm *GroupcacheRingManager) Addr() string

func (*GroupcacheRingManager) OnRingInstanceHeartbeat

func (rm *GroupcacheRingManager) OnRingInstanceHeartbeat(_ *ring.BasicLifecycler, _ *ring.Desc, _ *ring.InstanceDesc)

func (*GroupcacheRingManager) OnRingInstanceRegister

func (rm *GroupcacheRingManager) OnRingInstanceRegister(_ *ring.BasicLifecycler, ringDesc ring.Desc, instanceExists bool, instanceID string, instanceDesc ring.InstanceDesc) (ring.InstanceState, ring.Tokens)

func (*GroupcacheRingManager) OnRingInstanceStopping

func (rm *GroupcacheRingManager) OnRingInstanceStopping(_ *ring.BasicLifecycler)

func (*GroupcacheRingManager) OnRingInstanceTokens

func (rm *GroupcacheRingManager) OnRingInstanceTokens(_ *ring.BasicLifecycler, _ ring.Tokens)

func (*GroupcacheRingManager) Ring

func (rm *GroupcacheRingManager) Ring() ring.ReadRing

func (*GroupcacheRingManager) ServeHTTP

func (rm *GroupcacheRingManager) ServeHTTP(w http.ResponseWriter, req *http.Request)

ServeHTTP serves the HTTP route /groupcache/ring.

type Memcached

type Memcached struct {
	// contains filtered or unexported fields
}

Memcached type caches chunks in memcached

func NewMemcached

func NewMemcached(cfg MemcachedConfig, client MemcachedClient, name string, reg prometheus.Registerer, logger log.Logger, cacheType stats.CacheType) *Memcached

NewMemcached makes a new Memcached.

func (*Memcached) Fetch

func (c *Memcached) Fetch(ctx context.Context, keys []string) (found []string, bufs [][]byte, missed []string, err error)

Fetch gets keys from the cache. The keys that are found must be in the order of the keys requested.

func (*Memcached) GetCacheType

func (c *Memcached) GetCacheType() stats.CacheType

func (*Memcached) Stop

func (c *Memcached) Stop()

Stop does nothing.

func (*Memcached) Store

func (c *Memcached) Store(ctx context.Context, keys []string, bufs [][]byte) error

Store stores the key in the cache.

type MemcachedClient

type MemcachedClient interface {
	GetMulti(keys []string) (map[string]*memcache.Item, error)
	Set(item *memcache.Item) error
}

MemcachedClient interface exists for mocking memcacheClient.

func NewMemcachedClient

func NewMemcachedClient(cfg MemcachedClientConfig, name string, r prometheus.Registerer, logger log.Logger) MemcachedClient

NewMemcachedClient creates a new MemcachedClient that gets its server list from SRV and updates the server list on a regular basis.

type MemcachedClientConfig

type MemcachedClientConfig struct {
	Host           string        `yaml:"host"`
	Service        string        `yaml:"service"`
	Addresses      string        `yaml:"addresses"` // EXPERIMENTAL.
	Timeout        time.Duration `yaml:"timeout"`
	MaxIdleConns   int           `yaml:"max_idle_conns"`
	MaxItemSize    int           `yaml:"max_item_size"`
	UpdateInterval time.Duration `yaml:"update_interval"`
	ConsistentHash bool          `yaml:"consistent_hash"`
	CBFailures     uint          `yaml:"circuit_breaker_consecutive_failures"`
	CBTimeout      time.Duration `yaml:"circuit_breaker_timeout"`  // reset error count after this long
	CBInterval     time.Duration `yaml:"circuit_breaker_interval"` // remain closed for this long after CBFailures errors
}

MemcachedClientConfig defines how a MemcachedClient should be constructed.

func (*MemcachedClientConfig) RegisterFlagsWithPrefix

func (cfg *MemcachedClientConfig) RegisterFlagsWithPrefix(prefix, description string, f *flag.FlagSet)

RegisterFlagsWithPrefix adds the flags required to config this to the given FlagSet

type MemcachedConfig

type MemcachedConfig struct {
	Expiration time.Duration `yaml:"expiration"`

	BatchSize   int `yaml:"batch_size"`
	Parallelism int `yaml:"parallelism"`
}

MemcachedConfig is config to make a Memcached

func (*MemcachedConfig) RegisterFlagsWithPrefix

func (cfg *MemcachedConfig) RegisterFlagsWithPrefix(prefix, description string, f *flag.FlagSet)

RegisterFlagsWithPrefix adds the flags required to config this to the given FlagSet

type MemcachedJumpHashSelector

type MemcachedJumpHashSelector struct {
	// contains filtered or unexported fields
}

MemcachedJumpHashSelector implements the memcache.ServerSelector interface. MemcachedJumpHashSelector utilizes a jump hash to distribute keys to servers.

While adding or removing servers only requires 1/N keys to move, servers are treated as a stack and can only be pushed/popped. Therefore, MemcachedJumpHashSelector works best for servers with consistent DNS names where the naturally sorted order is predictable.

func DefaultMemcachedJumpHashSelector

func DefaultMemcachedJumpHashSelector() *MemcachedJumpHashSelector

func NewMemcachedJumpHashSelector

func NewMemcachedJumpHashSelector(resolveUnixAddr UnixResolver, resolveTCPAddr TCPResolver) *MemcachedJumpHashSelector

func (*MemcachedJumpHashSelector) Each

func (s *MemcachedJumpHashSelector) Each(f func(net.Addr) error) error

Each iterates over each server and calls the given function. If f returns a non-nil error, iteration will stop and that error will be returned.

func (*MemcachedJumpHashSelector) PickServer

func (s *MemcachedJumpHashSelector) PickServer(key string) (net.Addr, error)

PickServer returns the server address that a given item should be sharded onto.

func (*MemcachedJumpHashSelector) SetServers

func (s *MemcachedJumpHashSelector) SetServers(servers ...string) error

SetServers changes a MemcachedJumpHashSelector's set of servers at runtime and is safe for concurrent use by multiple goroutines.

Each server is given equal weight. A server is given more weight if it's listed multiple times.

SetServers returns an error if any of the server names fail to resolve. No attempt is made to connect to the server. If any error occurs, no changes are made to the internal server list.

To minimize the number of rehashes for keys when scaling the number of servers in subsequent calls to SetServers, servers are stored in natural sort order.

type RedisCache

type RedisCache struct {
	// contains filtered or unexported fields
}

RedisCache type caches chunks in redis

func NewRedisCache

func NewRedisCache(name string, redisClient *RedisClient, logger log.Logger, cacheType stats.CacheType) *RedisCache

NewRedisCache creates a new RedisCache

func (*RedisCache) Fetch

func (c *RedisCache) Fetch(ctx context.Context, keys []string) (found []string, bufs [][]byte, missed []string, err error)

Fetch gets keys from the cache. The keys that are found must be in the order of the keys requested.

func (*RedisCache) GetCacheType

func (c *RedisCache) GetCacheType() stats.CacheType

func (*RedisCache) Stop

func (c *RedisCache) Stop()

Stop stops the redis client.

func (*RedisCache) Store

func (c *RedisCache) Store(ctx context.Context, keys []string, bufs [][]byte) error

Store stores the key in the cache.

type RedisClient

type RedisClient struct {
	// contains filtered or unexported fields
}

func NewRedisClient

func NewRedisClient(cfg *RedisConfig) (*RedisClient, error)

NewRedisClient creates Redis client

func (*RedisClient) Close

func (c *RedisClient) Close() error

func (*RedisClient) MGet

func (c *RedisClient) MGet(ctx context.Context, keys []string) ([][]byte, error)

func (*RedisClient) MSet

func (c *RedisClient) MSet(ctx context.Context, keys []string, values [][]byte) error

func (*RedisClient) Ping

func (c *RedisClient) Ping(ctx context.Context) error

type RedisConfig

type RedisConfig struct {
	Endpoint           string         `yaml:"endpoint"`
	MasterName         string         `yaml:"master_name"`
	Timeout            time.Duration  `yaml:"timeout"`
	Expiration         time.Duration  `yaml:"expiration"`
	DB                 int            `yaml:"db"`
	PoolSize           int            `yaml:"pool_size"`
	Username           string         `yaml:"username"`
	Password           flagext.Secret `yaml:"password"`
	EnableTLS          bool           `yaml:"tls_enabled"`
	InsecureSkipVerify bool           `yaml:"tls_insecure_skip_verify"`
	IdleTimeout        time.Duration  `yaml:"idle_timeout"`
	MaxConnAge         time.Duration  `yaml:"max_connection_age"`
}

RedisConfig defines how a RedisCache should be constructed.

func (*RedisConfig) RegisterFlagsWithPrefix

func (cfg *RedisConfig) RegisterFlagsWithPrefix(prefix, description string, f *flag.FlagSet)

RegisterFlagsWithPrefix adds the flags required to config this to the given FlagSet

type RingCfg

type RingCfg struct {
	lokiutil.RingConfig `yaml:",inline"`
}

RingCfg is a wrapper for the Groupcache ring configuration plus the replication factor.

type TCPResolver

type TCPResolver func(network, address string) (*net.TCPAddr, error)

type UnixResolver

type UnixResolver func(network, address string) (*net.UnixAddr, error)

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL