cache

package
v0.0.0-...-de40865 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Apr 11, 2024 License: Apache-2.0 Imports: 30 Imported by: 10

Documentation

Index

Constants

View Source
// Identifiers of the cache backends known to this package.
// NOTE(review): presumably these are the accepted values of
// BackendConfig.Backend — confirm against BackendConfig.Validate.
const (
	BackendMemcached = "memcached"
	BackendRedis     = "redis"
)
View Source
const (
	// CompressionSnappy is the identifier of the snappy compression algorithm.
	// NOTE(review): presumably the CompressionConfig.Compression value that
	// selects the snappy wrapper — confirm against CompressionConfig.Validate.
	CompressionSnappy = "snappy"
)

Variables

View Source
// Sentinel errors describing invalid memcached client settings:
//   - ErrNoMemcachedAddresses: no memcached addresses were provided.
//   - ErrMemcachedMaxAsyncConcurrencyNotPositive: the max async concurrency
//     is not positive.
//   - ErrInvalidWriteBufferSizeBytes: the write buffer size is not greater
//     than 0.
//   - ErrInvalidReadBufferSizeBytes: the read buffer size is not greater
//     than 0.
//
// ErrMemcachedMaxAsyncConcurrencyNotPositive has the same message text as
// ErrRedisMaxAsyncConcurrencyNotPositive, so distinguish them with errors.Is
// rather than by comparing messages.
// NOTE(review): presumably returned by MemcachedClientConfig.Validate — confirm.
var (
	ErrNoMemcachedAddresses                    = errors.New("no memcached addresses provided")
	ErrMemcachedMaxAsyncConcurrencyNotPositive = errors.New("max async concurrency must be positive")
	ErrInvalidWriteBufferSizeBytes             = errors.New("invalid write buffer size specified (must be greater than 0)")
	ErrInvalidReadBufferSizeBytes              = errors.New("invalid read buffer size specified (must be greater than 0)")
)
View Source
// Sentinel errors describing invalid Redis client settings:
//   - ErrRedisConfigNoEndpoint: no Redis endpoint was provided.
//   - ErrRedisMaxAsyncConcurrencyNotPositive: the max async concurrency is
//     not positive.
//
// ErrRedisMaxAsyncConcurrencyNotPositive has the same message text as
// ErrMemcachedMaxAsyncConcurrencyNotPositive, so distinguish them with
// errors.Is rather than by comparing messages.
// NOTE(review): presumably returned by RedisClientConfig.Validate — confirm.
var (
	ErrRedisConfigNoEndpoint               = errors.New("no redis endpoint provided")
	ErrRedisMaxAsyncConcurrencyNotPositive = errors.New("max async concurrency must be positive")
)

Functions

This section is empty.

Types

type AddressProvider

// AddressProvider performs node address resolution given a list of clusters.
type AddressProvider interface {
	// Resolve resolves the provided list of memcached clusters to the actual
	// nodes. It returns an error if resolution fails.
	Resolve(context.Context, []string) error

	// Addresses returns the nodes.
	// NOTE(review): presumably the result of the most recent Resolve call — confirm.
	Addresses() []string
}

AddressProvider performs node address resolution given a list of clusters.

type Allocator

// Allocator allows memory for cache result values to be managed by callers
// instead of by a cache client itself. For example, callers can use it to
// implement arena-style memory management for request-centric workloads.
type Allocator interface {
	// Get returns a byte slice with at least sz capacity. Length of the slice is
	// not guaranteed and so must be asserted by callers (cache clients).
	Get(sz int) *[]byte
	// Put returns the byte slice to the underlying allocator. The cache clients
	// will only call this method during error handling when allocated values are
	// not returned to the caller as cache results.
	Put(b *[]byte)
}

Allocator allows memory for cache result values to be managed by callers instead of by a cache client itself. For example, this can be used by callers to implement arena-style memory management if a workload tends to be request-centric.

type BackendConfig

// BackendConfig combines the backend selector with the client configuration
// of each backend. The fields are read from the "backend", "memcached" and
// "redis" YAML keys respectively.
// NOTE(review): presumably Backend is one of BackendMemcached or BackendRedis
// and only the matching section is used — confirm against Validate and
// CreateClient.
type BackendConfig struct {
	Backend   string                `yaml:"backend"`
	Memcached MemcachedClientConfig `yaml:"memcached"`
	Redis     RedisClientConfig     `yaml:"redis"`
}

func (*BackendConfig) Validate

func (cfg *BackendConfig) Validate() error

Validate the config.

type Cache

// Cache is a generic key-value cache of byte slices. It supports asynchronous
// stores, batched fetches and deletion of single keys.
type Cache interface {
	// StoreAsync writes data into the cache asynchronously, using ttl as the
	// time-to-live of the stored entries.
	//
	// Note that individual byte buffers may be retained by the cache!
	StoreAsync(data map[string][]byte, ttl time.Duration)

	// Fetch fetches multiple keys from the cache and returns a map of input
	// keys to data. If a key isn't in the map, data for that key was not found.
	// One or more Option instances may be passed to modify the behavior of
	// this Fetch call.
	Fetch(ctx context.Context, keys []string, opts ...Option) map[string][]byte

	// Delete deletes the cache entry with the given key if it exists.
	Delete(ctx context.Context, key string) error

	// Name returns the name of this particular cache instance.
	Name() string
}

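Cache is a generic interface for a key-value cache of byte slices: it supports asynchronous stores, batched fetches, and deletion of individual keys.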

func CreateClient

func CreateClient(cacheName string, cfg BackendConfig, logger log.Logger, reg prometheus.Registerer) (Cache, error)

func NewCompression

func NewCompression(cfg CompressionConfig, next Cache, logger log.Logger) Cache

func NewSnappy

func NewSnappy(next Cache, logger log.Logger) Cache

NewSnappy makes a new snappy encoding cache wrapper.

func NewSpanlessTracingCache

func NewSpanlessTracingCache(cache Cache, logger log.Logger, resolver spanlogger.TenantResolver) Cache

type CompressionConfig

// CompressionConfig holds the compression setting of a cache, read from the
// "compression" YAML key.
// NOTE(review): CompressionSnappy looks like the supported value; whether an
// empty string disables compression is not visible here — confirm against
// Validate and NewCompression.
type CompressionConfig struct {
	Compression string `yaml:"compression"`
}

func (*CompressionConfig) RegisterFlagsWithPrefix

func (cfg *CompressionConfig) RegisterFlagsWithPrefix(f *flag.FlagSet, prefix string)

RegisterFlagsWithPrefix registers flags with provided prefix.

func (*CompressionConfig) Validate

func (cfg *CompressionConfig) Validate() error

type InstrumentedMockCache

type InstrumentedMockCache struct {
	// contains filtered or unexported fields
}

InstrumentedMockCache is a mocked cache implementation which also tracks the number of times its functions are called.

func NewInstrumentedMockCache

func NewInstrumentedMockCache() *InstrumentedMockCache

NewInstrumentedMockCache makes a new InstrumentedMockCache.

func (*InstrumentedMockCache) CountDeleteCalls

func (m *InstrumentedMockCache) CountDeleteCalls() int

func (*InstrumentedMockCache) CountFetchCalls

func (m *InstrumentedMockCache) CountFetchCalls() int

func (*InstrumentedMockCache) CountStoreCalls

func (m *InstrumentedMockCache) CountStoreCalls() int

func (*InstrumentedMockCache) Delete

func (m *InstrumentedMockCache) Delete(ctx context.Context, key string) error

func (*InstrumentedMockCache) Fetch

func (m *InstrumentedMockCache) Fetch(ctx context.Context, keys []string, opts ...Option) map[string][]byte

func (*InstrumentedMockCache) Flush

func (m *InstrumentedMockCache) Flush()

func (*InstrumentedMockCache) GetItems

func (m *InstrumentedMockCache) GetItems() map[string]Item

func (*InstrumentedMockCache) Name

func (m *InstrumentedMockCache) Name() string

func (*InstrumentedMockCache) StoreAsync

func (m *InstrumentedMockCache) StoreAsync(data map[string][]byte, ttl time.Duration)

type Item

// Item pairs a cached value (Data) with the time at which it expires
// (ExpiresAt). It is the value type of the map returned by the mock caches'
// GetItems methods.
// NOTE(review): presumably ExpiresAt is the store time plus the TTL passed to
// StoreAsync — confirm against the mock cache implementation.
type Item struct {
	Data      []byte
	ExpiresAt time.Time
}

type LRUCache

type LRUCache struct {
	// contains filtered or unexported fields
}

func WrapWithLRUCache

func WrapWithLRUCache(c Cache, name string, reg prometheus.Registerer, lruSize int, defaultTTL time.Duration) (*LRUCache, error)

WrapWithLRUCache wraps a given `Cache` c with an LRU cache. The LRU cache will always store items in both caches. However, it will only fetch items from the underlying cache if the LRU cache doesn't have the item. Items fetched from the underlying cache will be stored in the LRU cache with a default TTL. The LRU cache will also remove items from the underlying cache if they are expired. The LRU cache is limited in number of items using `lruSize`. This means this cache is not tailored for large items or items that have a big variation in size.

func (*LRUCache) Delete

func (l *LRUCache) Delete(ctx context.Context, key string) error

func (*LRUCache) Fetch

func (l *LRUCache) Fetch(ctx context.Context, keys []string, opts ...Option) (result map[string][]byte)

func (*LRUCache) Name

func (l *LRUCache) Name() string

func (*LRUCache) StoreAsync

func (l *LRUCache) StoreAsync(data map[string][]byte, ttl time.Duration)

type MemcachedClient

type MemcachedClient struct {
	// contains filtered or unexported fields
}

func NewMemcachedClientWithConfig

func NewMemcachedClientWithConfig(logger log.Logger, name string, config MemcachedClientConfig, reg prometheus.Registerer) (*MemcachedClient, error)

NewMemcachedClientWithConfig makes a new MemcachedClient.

func (*MemcachedClient) CompareAndSwap

func (c *MemcachedClient) CompareAndSwap(ctx context.Context, key string, value []byte, ttl time.Duration) error

func (*MemcachedClient) Decrement

func (c *MemcachedClient) Decrement(ctx context.Context, key string, delta uint64) (uint64, error)

func (*MemcachedClient) Delete

func (c *MemcachedClient) Delete(ctx context.Context, key string) error

func (*MemcachedClient) FlushAll

func (c *MemcachedClient) FlushAll(ctx context.Context) error

func (*MemcachedClient) GetMulti

func (c *MemcachedClient) GetMulti(ctx context.Context, keys []string, opts ...Option) map[string][]byte

func (*MemcachedClient) Increment

func (c *MemcachedClient) Increment(ctx context.Context, key string, delta uint64) (uint64, error)

func (*MemcachedClient) Name

func (c *MemcachedClient) Name() string

func (*MemcachedClient) SetAsync

func (c *MemcachedClient) SetAsync(key string, value []byte, ttl time.Duration)

func (*MemcachedClient) SetMultiAsync

func (c *MemcachedClient) SetMultiAsync(data map[string][]byte, ttl time.Duration)

func (*MemcachedClient) Stop

func (c *MemcachedClient) Stop()

func (*MemcachedClient) Touch

func (c *MemcachedClient) Touch(ctx context.Context, key string, ttl time.Duration) error

type MemcachedClientConfig

// MemcachedClientConfig is the config accepted by NewMemcachedClientWithConfig.
type MemcachedClientConfig struct {
	// Addresses specifies the list of memcached addresses. The addresses get
	// resolved with the DNS provider.
	Addresses flagext.StringSliceCSV `yaml:"addresses"`

	// Timeout specifies the socket read/write timeout.
	Timeout time.Duration `yaml:"timeout"`

	// ConnectTimeout specifies the connection timeout.
	ConnectTimeout time.Duration `yaml:"connect_timeout"`

	// WriteBufferSizeBytes specifies the size of the write buffer (in bytes). The buffer
	// is allocated for each connection.
	WriteBufferSizeBytes int `yaml:"write_buffer_size_bytes" category:"experimental"`

	// ReadBufferSizeBytes specifies the size of the read buffer (in bytes). The buffer
	// is allocated for each connection.
	ReadBufferSizeBytes int `yaml:"read_buffer_size_bytes" category:"experimental"`

	// MinIdleConnectionsHeadroomPercentage specifies the minimum number of idle connections
	// to keep open as a percentage of the number of recently used idle connections.
	// If negative, idle connections are kept open indefinitely.
	MinIdleConnectionsHeadroomPercentage float64 `yaml:"min_idle_connections_headroom_percentage" category:"advanced"`

	// MaxIdleConnections specifies the maximum number of idle connections that
	// will be maintained per address. For better performance, this should be
	// set to a number higher than your peak parallel requests.
	MaxIdleConnections int `yaml:"max_idle_connections" category:"advanced"`

	// MaxAsyncConcurrency specifies the maximum number of SetAsync goroutines.
	MaxAsyncConcurrency int `yaml:"max_async_concurrency" category:"advanced"`

	// MaxAsyncBufferSize specifies the queue buffer size for SetAsync operations.
	MaxAsyncBufferSize int `yaml:"max_async_buffer_size" category:"advanced"`

	// MaxGetMultiConcurrency specifies the maximum number of concurrent GetMulti() operations.
	// If set to 0, concurrency is unlimited.
	MaxGetMultiConcurrency int `yaml:"max_get_multi_concurrency" category:"advanced"`

	// MaxGetMultiBatchSize specifies the maximum number of keys a single underlying
	// GetMulti() should run. If more keys are specified, internally keys are split
	// into multiple batches and fetched concurrently, honoring MaxGetMultiConcurrency parallelism.
	// If set to 0, the max batch size is unlimited.
	MaxGetMultiBatchSize int `yaml:"max_get_multi_batch_size" category:"advanced"`

	// MaxItemSize specifies the maximum size of an item stored in memcached, in bytes.
	// Items bigger than MaxItemSize are skipped. If set to 0, no maximum size is enforced.
	MaxItemSize int `yaml:"max_item_size" category:"advanced"`

	// TLSEnabled enables connecting to Memcached with TLS.
	TLSEnabled bool `yaml:"tls_enabled" category:"advanced"`

	// TLS is the TLS client configuration used to connect to the Memcached
	// server. Its fields are inlined into the enclosing YAML section.
	TLS dstls.ClientConfig `yaml:",inline"`
}

MemcachedClientConfig is the config accepted by NewMemcachedClientWithConfig.

func (*MemcachedClientConfig) RegisterFlagsWithPrefix

func (c *MemcachedClientConfig) RegisterFlagsWithPrefix(prefix string, f *flag.FlagSet)

func (*MemcachedClientConfig) Validate

func (c *MemcachedClientConfig) Validate() error

type MemcachedJumpHashSelector

type MemcachedJumpHashSelector struct {
	// contains filtered or unexported fields
}

MemcachedJumpHashSelector implements the memcache.ServerSelector interface, utilizing a jump hash to distribute keys to servers.

While adding or removing servers only requires 1/N keys to move, servers are treated as a stack and can only be pushed/popped. Therefore, MemcachedJumpHashSelector works best for servers with consistent DNS names where the naturally sorted order is predictable (i.e. Kubernetes StatefulSets).

func (*MemcachedJumpHashSelector) Each

func (s *MemcachedJumpHashSelector) Each(f func(net.Addr) error) error

Each iterates over each server and calls the given function. If f returns a non-nil error, iteration will stop and that error will be returned.

func (*MemcachedJumpHashSelector) PickServer

func (s *MemcachedJumpHashSelector) PickServer(key string) (net.Addr, error)

PickServer returns the server address that a given item should be sharded onto.

func (*MemcachedJumpHashSelector) SetServers

func (s *MemcachedJumpHashSelector) SetServers(servers ...string) error

SetServers changes a MemcachedJumpHashSelector's set of servers at runtime and is safe for concurrent use by multiple goroutines.

Each server is given equal weight. A server is given more weight if it's listed multiple times.

SetServers returns an error if any of the server names fail to resolve. No attempt is made to connect to the server. If any error occurs, no changes are made to the internal server list.

To minimize the number of rehashes for keys when scaling the number of servers in subsequent calls to SetServers, servers are stored in natural sort order.

type MockCache

type MockCache struct {
	// contains filtered or unexported fields
}

func NewMockCache

func NewMockCache() *MockCache

func (*MockCache) Delete

func (m *MockCache) Delete(_ context.Context, key string) error

func (*MockCache) Fetch

func (m *MockCache) Fetch(_ context.Context, keys []string, _ ...Option) map[string][]byte

func (*MockCache) Flush

func (m *MockCache) Flush()

func (*MockCache) GetItems

func (m *MockCache) GetItems() map[string]Item

func (*MockCache) Name

func (m *MockCache) Name() string

func (*MockCache) StoreAsync

func (m *MockCache) StoreAsync(data map[string][]byte, ttl time.Duration)

type Option

// Option is a callback used to modify the Options that a particular client
// method uses. It receives a pointer to the Options and mutates it in place.
type Option func(opts *Options)

Option is a callback used to modify the Options that a particular client method uses.

func WithAllocator

func WithAllocator(alloc Allocator) Option

WithAllocator creates a new Option that makes use of a specific memory Allocator for cache result values.

type Options

// Options are used to modify the behavior of an individual call to get
// results from a cache backend. They are constructed by applying Option
// callbacks passed to a client method to a default Options instance.
type Options struct {
	// Alloc is the Allocator used for cache result values; WithAllocator
	// creates an Option for it.
	// NOTE(review): behavior when Alloc is nil is not visible here — confirm.
	Alloc Allocator
}

Options are used to modify the behavior of an individual call to get results from a cache backend. They are constructed by applying Option callbacks passed to a client method to a default Options instance.

type RedisClient

type RedisClient struct {
	// contains filtered or unexported fields
}

func NewRedisClient

func NewRedisClient(logger log.Logger, name string, config RedisClientConfig, reg prometheus.Registerer) (*RedisClient, error)

NewRedisClient makes a new RedisClient.

func (*RedisClient) Delete

func (c *RedisClient) Delete(ctx context.Context, key string) error

Delete implements RemoteCacheClient.

func (*RedisClient) GetMulti

func (c *RedisClient) GetMulti(ctx context.Context, keys []string, _ ...Option) map[string][]byte

GetMulti implements RemoteCacheClient.

func (*RedisClient) Name

func (c *RedisClient) Name() string

func (*RedisClient) SetAsync

func (c *RedisClient) SetAsync(key string, value []byte, ttl time.Duration)

SetAsync implements RemoteCacheClient.

func (*RedisClient) SetMultiAsync

func (c *RedisClient) SetMultiAsync(data map[string][]byte, ttl time.Duration)

SetMultiAsync implements RemoteCacheClient.

func (*RedisClient) Stop

func (c *RedisClient) Stop()

Stop implements RemoteCacheClient.

type RedisClientConfig

// RedisClientConfig is the config accepted by RedisClient.
type RedisClientConfig struct {
	// Endpoint specifies the endpoint of the Redis server.
	Endpoint flagext.StringSliceCSV `yaml:"endpoint"`

	// Username is used to authenticate the current connection
	// with one of the connections defined in the ACL list when connecting
	// to a Redis 6.0 instance, or greater, that is using the Redis ACL system.
	Username string `yaml:"username"`

	// Password is optional. It must match the password specified in the
	// requirepass server configuration option (if connecting to a Redis 5.0 instance, or lower),
	// or the User Password when connecting to a Redis 6.0 instance, or greater,
	// that is using the Redis ACL system.
	Password flagext.Secret `yaml:"password"`

	// DB is the database to be selected after connecting to the server.
	DB int `yaml:"db"`

	// MasterName is the Redis Sentinel master name. Use an empty string for
	// Redis Server or Redis Cluster.
	MasterName string `yaml:"master_name" category:"advanced"`

	// DialTimeout specifies the client dial timeout.
	DialTimeout time.Duration `yaml:"dial_timeout" category:"advanced"`

	// ReadTimeout specifies the client read timeout.
	ReadTimeout time.Duration `yaml:"read_timeout" category:"advanced"`

	// WriteTimeout specifies the client write timeout.
	WriteTimeout time.Duration `yaml:"write_timeout" category:"advanced"`

	// ConnectionPoolSize is the maximum number of socket connections.
	ConnectionPoolSize int `yaml:"connection_pool_size" category:"advanced"`

	// ConnectionPoolTimeout is the amount of time the client waits for a
	// connection if all connections are busy before returning an error.
	// Default is ReadTimeout + 1 second.
	ConnectionPoolTimeout time.Duration `yaml:"connection_pool_timeout" category:"advanced"`

	// MinIdleConnections specifies the minimum number of idle connections, which is useful when
	// establishing a new connection is slow.
	MinIdleConnections int `yaml:"min_idle_connections" category:"advanced"`

	// IdleTimeout is the amount of time after which the client closes idle connections.
	// Should be less than server's timeout.
	// -1 disables idle timeout check.
	IdleTimeout time.Duration `yaml:"idle_timeout" category:"advanced"`

	// MaxConnectionAge is the connection age at which the client retires (closes) the connection.
	// Default 0 is to not close aged connections.
	MaxConnectionAge time.Duration `yaml:"max_connection_age" category:"advanced"`

	// MaxItemSize specifies the maximum size of an item stored in Redis.
	// Items bigger than MaxItemSize are skipped.
	// If set to 0, no maximum size is enforced.
	MaxItemSize int `yaml:"max_item_size" category:"advanced"`

	// MaxAsyncConcurrency specifies the maximum number of SetAsync goroutines.
	MaxAsyncConcurrency int `yaml:"max_async_concurrency" category:"advanced"`

	// MaxAsyncBufferSize specifies the queue buffer size for SetAsync operations.
	MaxAsyncBufferSize int `yaml:"max_async_buffer_size" category:"advanced"`

	// MaxGetMultiConcurrency specifies the maximum number of concurrent GetMulti() operations.
	// If set to 0, concurrency is unlimited.
	MaxGetMultiConcurrency int `yaml:"max_get_multi_concurrency" category:"advanced"`

	// MaxGetMultiBatchSize specifies the maximum size per batch for mget.
	MaxGetMultiBatchSize int `yaml:"max_get_multi_batch_size" category:"advanced"`

	// TLSEnabled enables TLS for the Redis connection.
	TLSEnabled bool `yaml:"tls_enabled" category:"advanced"`

	// TLS is the TLS client configuration used to connect to the Redis
	// server. Its fields are inlined into the enclosing YAML section.
	TLS dstls.ClientConfig `yaml:",inline"`
}

RedisClientConfig is the config accepted by RedisClient.

func (*RedisClientConfig) RegisterFlagsWithPrefix

func (c *RedisClientConfig) RegisterFlagsWithPrefix(prefix string, f *flag.FlagSet)

RegisterFlagsWithPrefix adds the flags required to configure this to the given FlagSet.

func (*RedisClientConfig) Validate

func (c *RedisClientConfig) Validate() error

type RemoteCacheAdapter

type RemoteCacheAdapter struct {
	// contains filtered or unexported fields
}

RemoteCacheAdapter is an implementation of the Cache interface that wraps RemoteCacheClient instances. This adapter will eventually be removed when those interfaces are unified.

func NewRemoteCacheAdapter

func NewRemoteCacheAdapter(remoteClient RemoteCacheClient) *RemoteCacheAdapter

func (*RemoteCacheAdapter) Delete

func (c *RemoteCacheAdapter) Delete(ctx context.Context, key string) error

func (*RemoteCacheAdapter) Fetch

func (c *RemoteCacheAdapter) Fetch(ctx context.Context, keys []string, opts ...Option) map[string][]byte

func (*RemoteCacheAdapter) Name

func (c *RemoteCacheAdapter) Name() string

func (*RemoteCacheAdapter) StoreAsync

func (c *RemoteCacheAdapter) StoreAsync(data map[string][]byte, ttl time.Duration)

type RemoteCacheClient

// RemoteCacheClient is a high level client to interact with remote cache.
type RemoteCacheClient interface {
	// GetMulti fetches multiple keys at once from a cache. In case of error,
	// an empty map is returned and the error tracked/logged. One or more Option
	// instances may be passed to modify the behavior of this GetMulti call.
	GetMulti(ctx context.Context, keys []string, opts ...Option) map[string][]byte

	// SetAsync enqueues an asynchronous operation to store a key into a cache.
	// In case the underlying async operation fails, the error will be tracked/logged.
	SetAsync(key string, value []byte, ttl time.Duration)

	// SetMultiAsync enqueues asynchronous operations to store keys and values
	// into a cache. In case the underlying async operations fail, the error will
	// be tracked/logged.
	SetMultiAsync(data map[string][]byte, ttl time.Duration)

	// Delete deletes a key from a cache.
	// This is a synchronous operation. If an asynchronous set operation for key is still
	// pending to be processed, it will wait for it to complete before performing deletion.
	Delete(ctx context.Context, key string) error

	// Stop stops the client and releases underlying resources.
	Stop()

	// Name returns the name of this particular cache instance.
	Name() string
}

RemoteCacheClient is a high level client to interact with remote cache.

type SpanlessTracingCache

type SpanlessTracingCache struct {
	// contains filtered or unexported fields
}

SpanlessTracingCache wraps a Cache and logs Fetch operations in the parent span.

func (SpanlessTracingCache) Delete

func (t SpanlessTracingCache) Delete(ctx context.Context, key string) error

func (SpanlessTracingCache) Fetch

func (t SpanlessTracingCache) Fetch(ctx context.Context, keys []string, opts ...Option) (result map[string][]byte)

func (SpanlessTracingCache) Name

func (t SpanlessTracingCache) Name() string

func (SpanlessTracingCache) StoreAsync

func (t SpanlessTracingCache) StoreAsync(data map[string][]byte, ttl time.Duration)

type Versioned

type Versioned struct {
	// contains filtered or unexported fields
}

Versioned cache adds a version prefix to the keys. This allows cache keys to be changed in a newer version of the code (after a bugfix or a cached data format change).

func NewVersioned

func NewVersioned(c Cache, version uint) Versioned

NewVersioned creates a new Versioned cache.

func (Versioned) Delete

func (c Versioned) Delete(ctx context.Context, key string) error

func (Versioned) Fetch

func (c Versioned) Fetch(ctx context.Context, keys []string, opts ...Option) map[string][]byte

func (Versioned) Name

func (c Versioned) Name() string

func (Versioned) StoreAsync

func (c Versioned) StoreAsync(data map[string][]byte, ttl time.Duration)

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL