store

package
v0.0.50 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Apr 26, 2024 License: Apache-2.0 Imports: 18 Imported by: 0

Documentation

Index

Constants

View Source
// Redis key prefixes and fmt-style key formats. The trailing "K:" / "V:"
// notes describe how each key is laid out and what value is stored under it.
const (
	RedisLivePrefix = "streamdal_live"
	RedisLiveFormat = "streamdal_live:%s:%s:%s" // K: $session_id:$node_name:$audience

	RedisAudiencePrefix    = "streamdal_audience"
	RedisAudienceKeyFormat = "streamdal_audience:%s" // K: $audience V: SetPipelineConfig JSON

	// The created-by format has two placeholders; RedisAudienceCreatedByKey
	// takes (audience, createdBy).
	RedisAudienceCreatedByPrefix = "streamdal_audience_created_by"
	RedisAudienceCreatedByFormat = "streamdal_audience_created_by:%s:%s" // K: $audience:$created_by V: NONE

	// RedisRegisterFormat key resides in the RedisLivePrefix
	RedisRegisterFormat = "streamdal_live:%s:%s:register" // K: $session_id:$node_name:register; V: NONE

	RedisPipelinePrefix    = "streamdal_pipeline"
	RedisPipelineKeyFormat = "streamdal_pipeline:%s" // K: $pipeline_id V: serialized protos.Pipeline

	RedisNotificationConfigPrefix    = "streamdal_notification_config"
	RedisNotificationConfigKeyFormat = "streamdal_notification_config:%s" // K: $config_id V: serialized protos.NotificationConfig

	RedisNotificationAssocPrefix = "streamdal_notification_assoc"
	RedisNotificationAssocFormat = "streamdal_notification_assoc:%s:%s" // K: $pipeline_id:$config_id V: NONE

	RedisSchemaPrefix = "streamdal_schema"
	RedisSchemaFormat = "streamdal_schema:%s" // K: $audience V: serialized protos.Schema

	// Both tail formats take a single placeholder, and RedisActiveTailKey /
	// RedisPausedTailKey each accept one tailID argument. Paused keys use the
	// separate "streamdal_tail_paused:" namespace rather than a ":paused" suffix.
	// NOTE(review): an earlier note described the key as
	// $service_name:$tail_request_id - confirm what tailID actually contains.
	RedisActiveTailPrefix    = "streamdal_tail"
	RedisActiveTailKeyFormat = "streamdal_tail:%s"        // K: $tail_id V: serialized protos.TailRequest
	RedisPausedTailKeyFormat = "streamdal_tail_paused:%s" // K: $tail_id V: serialized protos.TailRequest

	// RedisActiveTailTTL is the TTL for the active tail key. While this key
	// should be automatically cleaned up when the frontend stops a Tail() request,
	// this TTL is a safety mechanism to ensure we do not leave orphaned tails.
	RedisActiveTailTTL = 10 * time.Second

	// The telemetry formats use a single %x verb, which renders its argument
	// in hexadecimal. Presumably the argument is a hash derived from the
	// registration fields / audience - TODO confirm against the key helpers.
	RedisTelemetryRegistrationPrefix = "streamdal_telemetry:registrations"
	RedisTelemetryRegistrationFormat = "streamdal_telemetry:registrations:%x"

	RedisTelemetryAudiencePrefix = "streamdal_telemetry:audience"
	RedisTelemetryAudienceFormat = "streamdal_telemetry:audience:%x"

	RedisWasmPrefix    = "streamdal_wasm"
	RedisWasmKeyFormat = "streamdal_wasm:%s:%s" // K: streamdal_wasm:$wasm-name:$wasm-id V: serialized protos.Wasm
)
View Source
// Keyspace-notification prefix and cluster-wide settings keys.
const (
	// RedisKeyWatchPrefix is the key under which redis publishes key events.
	// The format is __keyspace@{$database_number}__
	// We're always defaulting to db 0, so we can use this prefix to watch for key changes
	// See https://redis.io/docs/manual/keyspace-notifications/
	RedisKeyWatchPrefix = "__keyspace@0__:"

	// InstallIDKey is the key name for the unique ID of this streamdal server
	// cluster. Each cluster will get a unique UUID. This is used to track the
	// number of installs for telemetry and is completely random for
	// anonymization purposes.
	InstallIDKey = "streamdal_install_id"

	// RedisCreationDateKey is the key name for the cluster creation date.
	// Presumably read and written via GetCreationDate / SetCreationDate -
	// TODO confirm.
	RedisCreationDateKey = "streamdal_settings:creation_date"
)

Variables

View Source
// Sentinel errors exposed by the store package; compare with errors.Is.
var (
	// ErrPipelineNotFound reports that a pipeline was not found.
	ErrPipelineNotFound = errors.New("pipeline not found")
	// ErrConfigNotFound reports that a config was not found.
	ErrConfigNotFound   = errors.New("config not found")
	// ErrMustExist reports that the object does not exist although
	// Option.MustExist was set.
	ErrMustExist        = errors.New("object does not exist and MustExist is set")
	// ErrNoOverwrite reports that the object already exists although
	// Option.NoOverwrite was set.
	ErrNoOverwrite      = errors.New("object exists and NoOverwrite is set")
)
View Source
// ErrWasmNotFound is the sentinel error reporting that a Wasm entry was not
// found; compare with errors.Is.
var ErrWasmNotFound = errors.New("wasm not found")

Functions

func RedisActiveTailKey

func RedisActiveTailKey(tailID string) string

func RedisAudienceCreatedByKey added in v0.0.45

func RedisAudienceCreatedByKey(audience, createdBy string) string

func RedisAudienceKey

func RedisAudienceKey(audience string) string

func RedisLiveKey

func RedisLiveKey(session, node, audience string) string

func RedisNotificationAssocKey

func RedisNotificationAssocKey(pipelineID, configID string) string

func RedisNotificationConfigKey

func RedisNotificationConfigKey(configID string) string

func RedisPausedTailKey

func RedisPausedTailKey(tailID string) string

func RedisPipelineKey

func RedisPipelineKey(pipelineID string) string

func RedisRegisterKey

func RedisRegisterKey(session, node string) string

func RedisSchemaKey

func RedisSchemaKey(audience string) string

func RedisTelemetryAudience

func RedisTelemetryAudience(aud *protos.Audience) string

func RedisTelemetryRegistrationKey

func RedisTelemetryRegistrationKey(serviceName, os, sdk, arch string) string

func RedisWasmKey added in v0.0.41

func RedisWasmKey(wasmName, wasmID string) string

Types

type IStore

// IStore is the storage interface exposed by this package. The *Store type
// returned by New declares methods with the same names.
//
// For the Wasm methods that take both identifiers, the parameter order is
// (name, id); this matches the *Store methods, RedisWasmKey(wasmName, wasmID)
// and the "streamdal_wasm:$wasm-name:$wasm-id" key layout.
type IStore interface {
	AddRegistration(ctx context.Context, req *protos.RegisterRequest) error
	DeleteRegistration(ctx context.Context, req *protos.DeregisterRequest) error
	RecordRegistration(ctx context.Context, req *protos.RegisterRequest) error
	SeenRegistration(ctx context.Context, req *protos.RegisterRequest) bool
	GetPipelines(ctx context.Context) (map[string]*protos.Pipeline, error)
	GetPipeline(ctx context.Context, pipelineID string) (*protos.Pipeline, error)

	// GetAudienceMappings returns all audience -> *protos.PipelineConfigs assignments
	GetAudienceMappings(ctx context.Context) (map[*protos.Audience]*protos.PipelineConfigs, error)

	// GetPipelineConfigsByAudience returns *protos.PipelineConfigs for a given audience
	GetPipelineConfigsByAudience(ctx context.Context, aud *protos.Audience) (*protos.PipelineConfigs, error)
	GetLive(ctx context.Context) ([]*types.LiveEntry, error)
	CreatePipeline(ctx context.Context, pipeline *protos.Pipeline) error
	AddAudience(ctx context.Context, req *protos.NewAudienceRequest) error

	// CreateAudience creates a new audience in the store. This method differs
	// from AddAudience() in that it does not create any live entry.
	CreateAudience(ctx context.Context, aud *protos.Audience) error
	DeleteAudience(ctx context.Context, req *protos.DeleteAudienceRequest) error
	DeletePipeline(ctx context.Context, pipelineID string) error
	UpdatePipeline(ctx context.Context, pipeline *protos.Pipeline) error

	// SetPauseResume sets pipeline pause status
	SetPauseResume(ctx context.Context, audience *protos.Audience, pipelineID string, pause bool) (bool, error)
	GetAudiences(ctx context.Context) ([]*protos.Audience, error)
	GetNotificationConfig(ctx context.Context, req *protos.GetNotificationRequest) (*protos.NotificationConfig, error)
	GetNotificationConfigs(ctx context.Context) (map[string]*protos.NotificationConfig, error)
	CreateNotificationConfig(ctx context.Context, req *protos.CreateNotificationRequest) error
	UpdateNotificationConfig(ctx context.Context, req *protos.UpdateNotificationRequest) error
	DeleteNotificationConfig(ctx context.Context, req *protos.DeleteNotificationRequest) error

	// GetPipelineUsage gets usage across the entire cluster
	GetPipelineUsage(ctx context.Context) ([]*PipelineUsage, error)

	// GetActivePipelineUsage gets *ACTIVE* pipeline usage on the CURRENT node.
	// NOTE: This method is a helper for GetPipelineUsage().
	GetActivePipelineUsage(ctx context.Context, pipelineID string) ([]*PipelineUsage, error)
	GetActiveTailCommandsByService(ctx context.Context, serviceName string) ([]*protos.Command, error)
	AddActiveTailRequest(ctx context.Context, req *protos.TailRequest) (string, error) // Returns key that tail req is stored under

	// GetAudiencesBySessionID returns all audiences for a given session id
	// This is needed to inject an inferschema pipeline for each announced audience
	// to the session via a goroutine in internal.Register()
	GetAudiencesBySessionID(ctx context.Context, sessionID string) ([]*protos.Audience, error)

	GetAudiencesByService(ctx context.Context, serviceName string) ([]*protos.Audience, error)

	// WatchKeys will watch for key changes for given key pattern; every time key
	// is updated, it will send a message on the return channel.
	// WatchKeys launches a dedicated goroutine for the watch - it can be stopped
	// via the provided context.
	WatchKeys(quitCtx context.Context, key string) chan struct{}

	AddSchema(ctx context.Context, req *protos.SendSchemaRequest) error

	GetSchema(ctx context.Context, aud *protos.Audience) (*protos.Schema, error)

	// GetInstallID returns the unique ID for this server cluster.
	// If an ID has not been set yet, a new one is generated and returned
	GetInstallID(ctx context.Context) (string, error)

	// GetCreationDate returns the creation date of this server cluster. This is used
	// for sending server_timestamp_created_seconds metric to telemetry
	GetCreationDate(ctx context.Context) (int64, error)

	// SetCreationDate sets the creation date of this server cluster
	SetCreationDate(ctx context.Context, ts int64) error

	// IsPipelineAttachedAny returns if pipeline is attached to any audience. Used for telemetry tags
	IsPipelineAttachedAny(ctx context.Context, pipelineID string) bool

	// PauseTailRequest pauses a tail request by its ID
	PauseTailRequest(ctx context.Context, req *protos.PauseTailRequest) (*protos.TailRequest, error)

	// ResumeTailRequest resumes a tail request by its ID
	ResumeTailRequest(ctx context.Context, req *protos.ResumeTailRequest) (*protos.TailRequest, error)

	// GetTailRequestById returns a TailRequest by its ID
	GetTailRequestById(ctx context.Context, tailID string) (*protos.TailRequest, error)

	// SetPipelines saves pipelines as SetPipelinesConfig json to $audience key in store
	SetPipelines(ctx context.Context, req *protos.SetPipelinesRequest) error

	// GetSetPipelinesCommandsByService returns a slice of SetPipelines commands for a given service
	GetSetPipelinesCommandsByService(ctx context.Context, serviceName string) ([]*protos.Command, error)

	// GetSessionIDs returns a slice of ALL known session ids;
	// accepts an optional "node name" to filter by.
	GetSessionIDs(ctx context.Context, nodeName ...string) ([]string, error)

	// GetSessionIDsByAudience returns a slice of session IDs for a given audience;
	// accepts an optional "node name" to filter by.
	GetSessionIDsByAudience(ctx context.Context, aud *protos.Audience, nodeName ...string) ([]string, error)

	// GetSessionIDsByPipelineID returns a slice of session IDs that use a pipeline ID
	GetSessionIDsByPipelineID(ctx context.Context, pipelineID string) ([]string, error)

	// GetPipelinesByAudience will fetch pipeline configs and then use the ID to
	// fetch the actual pipeline from store + update the paused state.
	GetPipelinesByAudience(ctx context.Context, aud *protos.Audience) ([]*protos.Pipeline, error)

	// DeletePipelineConfig deletes a pipeline config for all audiences that use
	// given pipeline ID; it will return a list of audiences that the pipeline id
	// was used by.
	DeletePipelineConfig(ctx context.Context, pipelineID string) ([]*protos.Audience, error)

	// GetAudiencesByPipelineID returns a slice of audiences that use a pipeline ID
	GetAudiencesByPipelineID(ctx context.Context, pipelineID string) ([]*protos.Audience, error)

	// BEGIN Wasm-related methods
	GetAllWasm(ctx context.Context) ([]*shared.WasmModule, error)

	// GetWasm will fetch Wasm from the store by name and ID
	GetWasm(ctx context.Context, name, id string) (*shared.WasmModule, error)

	// GetWasmByID will fetch Wasm from the store by ID (regardless of 'name')
	GetWasmByID(ctx context.Context, id string) (*shared.WasmModule, error)

	// GetWasmByName will fetch Wasm from the store by name (regardless of 'id')
	GetWasmByName(ctx context.Context, name string) (*shared.WasmModule, error)

	// SetWasm will store Wasm in the store by name and ID; it will overwrite
	// an existing entry (if it exists). Arguments are (name, id), in that order.
	SetWasm(ctx context.Context, name, id string, wasm *shared.WasmModule) error

	// SetWasmByName will overwrite an EXISTING Wasm entry by name.
	SetWasmByName(ctx context.Context, name string, wasm *shared.WasmModule) error

	// SetWasmByID will overwrite an EXISTING Wasm entry by ID.
	SetWasmByID(ctx context.Context, id string, wasm *shared.WasmModule) error

	// DeleteWasm will remove a Wasm entry from the store by name and ID.
	// Arguments are (name, id), in that order.
	DeleteWasm(ctx context.Context, name, id string) error

	// DeleteWasmByID will remove an EXISTING Wasm entry by ID; if no Wasm
	// object with the given ID exists, the delete will error.
	DeleteWasmByID(ctx context.Context, id string) error

	// DeleteWasmByName will remove an EXISTING Wasm entry by Name; if no Wasm
	// object with the given Name exists, the delete will error.
	DeleteWasmByName(ctx context.Context, name string) error

	// GetPipelinesByWasmID returns a slice of pipelines that use a given Wasm ID
	GetPipelinesByWasmID(ctx context.Context, wasmID string) ([]*protos.Pipeline, error)
}

type Option added in v0.0.41

// Option carries per-operation settings; each field comment states which of
// the store's write()/delete() operations it applies to.
type Option struct {
	// When set, the store will NOT overwrite an existing object. Applies to write().
	NoOverwrite bool

	// When set, the store will ensure that the object exists before performing
	// the operation. Applies to write() and delete(). NOTE: MustExist
	// and NoOverwrite are mutually exclusive. (Earlier wording called the
	// counterpart "MustNotExist"; no field of that name exists on this struct.)
	MustExist bool

	// Optional TTL to set on the key. Applies to write().
	TTL time.Duration
}

Option contains settings that can influence read, write or delete operations.

NOTE: Redis transactions do NOT function like "traditional" txns as there is no concept of a "rollback" or "commit". Txns in redis are essentially grouped commands that are executed atomically. Due to this, Option currently does not support TXNs and there is a _small_ potential for races.

See: https://redis.io/docs/interact/transactions/ and https://redis.com/blog/you-dont-need-transaction-rollbacks-in-redis/

type Options

// Options holds the dependencies and settings passed to New.
// NOTE(review): the field notes below are inferred from names and types only;
// confirm against New's validation logic.
type Options struct {
	// Encryption implementation; how the store uses it is not visible here.
	Encryption   encryption.IEncryption
	// Redis client; presumably the backing store for all keys - confirm.
	RedisBackend *redis.Client
	// Presumably cancelled on shutdown to stop background work - confirm.
	ShutdownCtx  context.Context
	// Name of this node; presumably the $node_name part of live keys - confirm.
	NodeName     string
	// Presumably the TTL applied to session (live) keys - confirm.
	SessionTTL   time.Duration
	// statsd client; presumably used for telemetry metrics - confirm.
	Telemetry    statsd.Statter
	// Presumably the cluster install ID (see InstallIDKey) - confirm.
	InstallID    string
}

type PipelineUsage

// PipelineUsage is the element type returned by GetPipelineUsage and
// GetActivePipelineUsage.
// NOTE(review): the per-field notes are inferred from field names - confirm.
type PipelineUsage struct {
	// ID of the pipeline this usage entry refers to.
	PipelineId string
	// Presumably true when the usage is backed by a live session - confirm.
	Active     bool
	// Node name associated with the usage entry.
	NodeName   string
	// Session ID associated with the usage entry.
	SessionId  string
	// Audience associated with the usage entry.
	Audience   *protos.Audience
}

type Store

// Store is the concrete store type returned by New. Its exported methods
// carry the same names as those declared on IStore.
type Store struct {
	// contains filtered or unexported fields
}

func New

func New(opts *Options) (*Store, error)

func (*Store) AddActiveTailRequest

func (s *Store) AddActiveTailRequest(ctx context.Context, req *protos.TailRequest) (string, error)

func (*Store) AddAudience

func (s *Store) AddAudience(ctx context.Context, req *protos.NewAudienceRequest) error

func (*Store) AddRegistration

func (s *Store) AddRegistration(ctx context.Context, req *protos.RegisterRequest) error

func (*Store) AddSchema

func (s *Store) AddSchema(ctx context.Context, req *protos.SendSchemaRequest) error

func (*Store) CreateAudience added in v0.0.32

func (s *Store) CreateAudience(ctx context.Context, aud *protos.Audience) error

func (*Store) CreateNotificationConfig

func (s *Store) CreateNotificationConfig(ctx context.Context, req *protos.CreateNotificationRequest) error

func (*Store) CreatePipeline

func (s *Store) CreatePipeline(ctx context.Context, pipeline *protos.Pipeline) error

func (*Store) DeleteAudience

func (s *Store) DeleteAudience(ctx context.Context, req *protos.DeleteAudienceRequest) error

func (*Store) DeleteNotificationConfig

func (s *Store) DeleteNotificationConfig(ctx context.Context, req *protos.DeleteNotificationRequest) error

func (*Store) DeletePipeline

func (s *Store) DeletePipeline(ctx context.Context, pipelineId string) error

func (*Store) DeletePipelineConfig added in v0.0.23

func (s *Store) DeletePipelineConfig(ctx context.Context, pipelineID string) ([]*protos.Audience, error)

func (*Store) DeleteRegistration

func (s *Store) DeleteRegistration(ctx context.Context, req *protos.DeregisterRequest) error

func (*Store) DeleteWasm added in v0.0.41

func (s *Store) DeleteWasm(ctx context.Context, name, id string) error

DeleteWasm will remove a Wasm entry from the store by name and ID.

func (*Store) DeleteWasmByID added in v0.0.41

func (s *Store) DeleteWasmByID(ctx context.Context, id string) error

DeleteWasmByID will remove an EXISTING Wasm entry by ID. This method expects a Wasm object with the given ID to exist. If it does not, the delete will error.

func (*Store) DeleteWasmByName added in v0.0.41

func (s *Store) DeleteWasmByName(ctx context.Context, name string) error

DeleteWasmByName will remove an EXISTING Wasm entry by Name. This method expects a Wasm object with the given Name to exist. If it does not, the delete will error.

func (*Store) GetActivePipelineUsage

func (s *Store) GetActivePipelineUsage(ctx context.Context, pipelineID string) ([]*PipelineUsage, error)

GetActivePipelineUsage gets *ACTIVE* pipeline usage on the CURRENT node. NOTE: This method is a helper for GetPipelineUsage().

func (*Store) GetActiveTailCommandsByService

func (s *Store) GetActiveTailCommandsByService(ctx context.Context, serviceName string) ([]*protos.Command, error)

func (*Store) GetAllWasm added in v0.0.41

func (s *Store) GetAllWasm(ctx context.Context) ([]*shared.WasmModule, error)

func (*Store) GetAudienceMappings added in v0.0.44

func (s *Store) GetAudienceMappings(ctx context.Context) (map[*protos.Audience]*protos.PipelineConfigs, error)

GetAudienceMappings returns all audience -> *protos.PipelineConfigs assignments

func (*Store) GetAudiences

func (s *Store) GetAudiences(ctx context.Context) ([]*protos.Audience, error)

func (*Store) GetAudiencesByPipelineID added in v0.0.23

func (s *Store) GetAudiencesByPipelineID(ctx context.Context, pipelineID string) ([]*protos.Audience, error)

func (*Store) GetAudiencesByService

func (s *Store) GetAudiencesByService(ctx context.Context, serviceName string) ([]*protos.Audience, error)

func (*Store) GetAudiencesBySessionID

func (s *Store) GetAudiencesBySessionID(ctx context.Context, sessionID string) ([]*protos.Audience, error)

func (*Store) GetCreationDate

func (s *Store) GetCreationDate(ctx context.Context) (int64, error)

func (*Store) GetInstallID

func (s *Store) GetInstallID(ctx context.Context) (string, error)

func (*Store) GetLive

func (s *Store) GetLive(ctx context.Context) ([]*types.LiveEntry, error)

func (*Store) GetNotificationConfig

func (s *Store) GetNotificationConfig(ctx context.Context, req *protos.GetNotificationRequest) (*protos.NotificationConfig, error)

func (*Store) GetNotificationConfigs

func (s *Store) GetNotificationConfigs(ctx context.Context) (map[string]*protos.NotificationConfig, error)

func (*Store) GetPausedTailRequestById

func (s *Store) GetPausedTailRequestById(ctx context.Context, tailID string) (*protos.TailRequest, error)

func (*Store) GetPipeline

func (s *Store) GetPipeline(ctx context.Context, pipelineId string) (*protos.Pipeline, error)

func (*Store) GetPipelineConfigsByAudience added in v0.0.17

func (s *Store) GetPipelineConfigsByAudience(ctx context.Context, aud *protos.Audience) (*protos.PipelineConfigs, error)

GetPipelineConfigsByAudience returns *protos.PipelineConfigs for a given audience. TODO: Need tests

func (*Store) GetPipelineUsage

func (s *Store) GetPipelineUsage(ctx context.Context) ([]*PipelineUsage, error)

GetPipelineUsage gets usage across the entire cluster

func (*Store) GetPipelines

func (s *Store) GetPipelines(ctx context.Context) (map[string]*protos.Pipeline, error)

func (*Store) GetPipelinesByAudience added in v0.0.17

func (s *Store) GetPipelinesByAudience(ctx context.Context, aud *protos.Audience) ([]*protos.Pipeline, error)

func (*Store) GetPipelinesByWasmID added in v0.0.41

func (s *Store) GetPipelinesByWasmID(ctx context.Context, wasmID string) ([]*protos.Pipeline, error)

func (*Store) GetSchema

func (s *Store) GetSchema(ctx context.Context, aud *protos.Audience) (*protos.Schema, error)

func (*Store) GetSessionIDs

func (s *Store) GetSessionIDs(ctx context.Context, nodeName ...string) ([]string, error)

GetSessionIDs will return ALL live session IDs; optionally filter by node name

func (*Store) GetSessionIDsByAudience

func (s *Store) GetSessionIDsByAudience(ctx context.Context, aud *protos.Audience, nodeName ...string) ([]string, error)

TODO: Needs tests

func (*Store) GetSessionIDsByPipelineID

func (s *Store) GetSessionIDsByPipelineID(ctx context.Context, pipelineID string) ([]string, error)

func (*Store) GetSetPipelinesCommandsByService

func (s *Store) GetSetPipelinesCommandsByService(ctx context.Context, serviceName string) ([]*protos.Command, error)

func (*Store) GetTailRequestById

func (s *Store) GetTailRequestById(ctx context.Context, tailID string) (*protos.TailRequest, error)

func (*Store) GetWasm added in v0.0.41

func (s *Store) GetWasm(ctx context.Context, name, id string) (*shared.WasmModule, error)

GetWasm will fetch Wasm from the store by name and ID

func (*Store) GetWasmByID added in v0.0.41

func (s *Store) GetWasmByID(ctx context.Context, id string) (*shared.WasmModule, error)

GetWasmByID will fetch Wasm from the store by ID (regardless of 'name')

func (*Store) GetWasmByName added in v0.0.41

func (s *Store) GetWasmByName(ctx context.Context, name string) (*shared.WasmModule, error)

GetWasmByName will fetch Wasm from the store by name (regardless of 'id')

func (*Store) IsPipelineAttachedAny

func (s *Store) IsPipelineAttachedAny(ctx context.Context, pipelineID string) bool

func (*Store) PauseTailRequest

func (s *Store) PauseTailRequest(ctx context.Context, req *protos.PauseTailRequest) (*protos.TailRequest, error)

func (*Store) RecordRegistration

func (s *Store) RecordRegistration(ctx context.Context, req *protos.RegisterRequest) error

func (*Store) ResumeTailRequest

func (s *Store) ResumeTailRequest(ctx context.Context, req *protos.ResumeTailRequest) (*protos.TailRequest, error)

func (*Store) SeenRegistration

func (s *Store) SeenRegistration(ctx context.Context, req *protos.RegisterRequest) bool

func (*Store) SetCreationDate

func (s *Store) SetCreationDate(ctx context.Context, ts int64) error

func (*Store) SetPauseResume

func (s *Store) SetPauseResume(ctx context.Context, audience *protos.Audience, pipelineID string, paused bool) (bool, error)

SetPauseResume sets pipeline pause status

func (*Store) SetPipelines

func (s *Store) SetPipelines(ctx context.Context, req *protos.SetPipelinesRequest) error

func (*Store) SetWasm added in v0.0.41

func (s *Store) SetWasm(ctx context.Context, name, id string, wasm *shared.WasmModule) error

SetWasm will store Wasm in the store by name and ID; it will overwrite an existing entry (if it exists).

func (*Store) SetWasmByID added in v0.0.41

func (s *Store) SetWasmByID(ctx context.Context, id string, wasm *shared.WasmModule) error

SetWasmByID will overwrite an EXISTING Wasm entry by ID. This method requires an existing Wasm object to exist in redis and will use the discovered 'id' to overwrite the entry.

func (*Store) SetWasmByName added in v0.0.41

func (s *Store) SetWasmByName(ctx context.Context, name string, wasm *shared.WasmModule) error

SetWasmByName will overwrite an EXISTING Wasm entry by name. This method requires an existing Wasm object to exist in redis and will use the discovered 'name' to overwrite the entry.

func (*Store) UpdateNotificationConfig

func (s *Store) UpdateNotificationConfig(ctx context.Context, req *protos.UpdateNotificationRequest) error

func (*Store) UpdatePipeline

func (s *Store) UpdatePipeline(ctx context.Context, pipeline *protos.Pipeline) error

func (*Store) WatchKeys

func (s *Store) WatchKeys(quitCtx context.Context, key string) chan struct{}

WatchKeys will watch for key changes for given key pattern; every time key is updated, it will send a message on the return channel.

Directories

Path Synopsis

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL