Documentation ¶
Index ¶
- Constants
- Variables
- func RedisActiveTailKey(tailID string) string
- func RedisAudienceCreatedByKey(audience, createdBy string) string
- func RedisAudienceKey(audience string) string
- func RedisLiveKey(session, node, audience string) string
- func RedisNotificationAssocKey(pipelineID, configID string) string
- func RedisNotificationConfigKey(configID string) string
- func RedisPausedTailKey(tailID string) string
- func RedisPipelineKey(pipelineID string) string
- func RedisRegisterKey(session, node string) string
- func RedisSchemaKey(audience string) string
- func RedisTelemetryAudience(aud *protos.Audience) string
- func RedisTelemetryRegistrationKey(serviceName, os, sdk, arch string) string
- func RedisWasmKey(wasmName, wasmID string) string
- type IStore
- type Option
- type Options
- type PipelineUsage
- type Store
- func (s *Store) AddActiveTailRequest(ctx context.Context, req *protos.TailRequest) (string, error)
- func (s *Store) AddAudience(ctx context.Context, req *protos.NewAudienceRequest) error
- func (s *Store) AddRegistration(ctx context.Context, req *protos.RegisterRequest) error
- func (s *Store) AddSchema(ctx context.Context, req *protos.SendSchemaRequest) error
- func (s *Store) CreateAudience(ctx context.Context, aud *protos.Audience) error
- func (s *Store) CreateNotificationConfig(ctx context.Context, req *protos.CreateNotificationRequest) error
- func (s *Store) CreatePipeline(ctx context.Context, pipeline *protos.Pipeline) error
- func (s *Store) DeleteAudience(ctx context.Context, req *protos.DeleteAudienceRequest) error
- func (s *Store) DeleteNotificationConfig(ctx context.Context, req *protos.DeleteNotificationRequest) error
- func (s *Store) DeletePipeline(ctx context.Context, pipelineId string) error
- func (s *Store) DeletePipelineConfig(ctx context.Context, pipelineID string) ([]*protos.Audience, error)
- func (s *Store) DeleteRegistration(ctx context.Context, req *protos.DeregisterRequest) error
- func (s *Store) DeleteWasm(ctx context.Context, name, id string) error
- func (s *Store) DeleteWasmByID(ctx context.Context, id string) error
- func (s *Store) DeleteWasmByName(ctx context.Context, name string) error
- func (s *Store) GetActivePipelineUsage(ctx context.Context, pipelineID string) ([]*PipelineUsage, error)
- func (s *Store) GetActiveTailCommandsByService(ctx context.Context, serviceName string) ([]*protos.Command, error)
- func (s *Store) GetAllWasm(ctx context.Context) ([]*shared.WasmModule, error)
- func (s *Store) GetAudienceMappings(ctx context.Context) (map[*protos.Audience]*protos.PipelineConfigs, error)
- func (s *Store) GetAudiences(ctx context.Context) ([]*protos.Audience, error)
- func (s *Store) GetAudiencesByPipelineID(ctx context.Context, pipelineID string) ([]*protos.Audience, error)
- func (s *Store) GetAudiencesByService(ctx context.Context, serviceName string) ([]*protos.Audience, error)
- func (s *Store) GetAudiencesBySessionID(ctx context.Context, sessionID string) ([]*protos.Audience, error)
- func (s *Store) GetCreationDate(ctx context.Context) (int64, error)
- func (s *Store) GetInstallID(ctx context.Context) (string, error)
- func (s *Store) GetLive(ctx context.Context) ([]*types.LiveEntry, error)
- func (s *Store) GetNotificationConfig(ctx context.Context, req *protos.GetNotificationRequest) (*protos.NotificationConfig, error)
- func (s *Store) GetNotificationConfigs(ctx context.Context) (map[string]*protos.NotificationConfig, error)
- func (s *Store) GetPausedTailRequestById(ctx context.Context, tailID string) (*protos.TailRequest, error)
- func (s *Store) GetPipeline(ctx context.Context, pipelineId string) (*protos.Pipeline, error)
- func (s *Store) GetPipelineConfigsByAudience(ctx context.Context, aud *protos.Audience) (*protos.PipelineConfigs, error)
- func (s *Store) GetPipelineUsage(ctx context.Context) ([]*PipelineUsage, error)
- func (s *Store) GetPipelines(ctx context.Context) (map[string]*protos.Pipeline, error)
- func (s *Store) GetPipelinesByAudience(ctx context.Context, aud *protos.Audience) ([]*protos.Pipeline, error)
- func (s *Store) GetPipelinesByWasmID(ctx context.Context, wasmID string) ([]*protos.Pipeline, error)
- func (s *Store) GetSchema(ctx context.Context, aud *protos.Audience) (*protos.Schema, error)
- func (s *Store) GetSessionIDs(ctx context.Context, nodeName ...string) ([]string, error)
- func (s *Store) GetSessionIDsByAudience(ctx context.Context, aud *protos.Audience, nodeName ...string) ([]string, error)
- func (s *Store) GetSessionIDsByPipelineID(ctx context.Context, pipelineID string) ([]string, error)
- func (s *Store) GetSetPipelinesCommandsByService(ctx context.Context, serviceName string) ([]*protos.Command, error)
- func (s *Store) GetTailRequestById(ctx context.Context, tailID string) (*protos.TailRequest, error)
- func (s *Store) GetWasm(ctx context.Context, name, id string) (*shared.WasmModule, error)
- func (s *Store) GetWasmByID(ctx context.Context, id string) (*shared.WasmModule, error)
- func (s *Store) GetWasmByName(ctx context.Context, name string) (*shared.WasmModule, error)
- func (s *Store) IsPipelineAttachedAny(ctx context.Context, pipelineID string) bool
- func (s *Store) PauseTailRequest(ctx context.Context, req *protos.PauseTailRequest) (*protos.TailRequest, error)
- func (s *Store) RecordRegistration(ctx context.Context, req *protos.RegisterRequest) error
- func (s *Store) ResumeTailRequest(ctx context.Context, req *protos.ResumeTailRequest) (*protos.TailRequest, error)
- func (s *Store) SeenRegistration(ctx context.Context, req *protos.RegisterRequest) bool
- func (s *Store) SetCreationDate(ctx context.Context, ts int64) error
- func (s *Store) SetPauseResume(ctx context.Context, audience *protos.Audience, pipelineID string, paused bool) (bool, error)
- func (s *Store) SetPipelines(ctx context.Context, req *protos.SetPipelinesRequest) error
- func (s *Store) SetWasm(ctx context.Context, name, id string, wasm *shared.WasmModule) error
- func (s *Store) SetWasmByID(ctx context.Context, id string, wasm *shared.WasmModule) error
- func (s *Store) SetWasmByName(ctx context.Context, name string, wasm *shared.WasmModule) error
- func (s *Store) UpdateNotificationConfig(ctx context.Context, req *protos.UpdateNotificationRequest) error
- func (s *Store) UpdatePipeline(ctx context.Context, pipeline *protos.Pipeline) error
- func (s *Store) WatchKeys(quitCtx context.Context, key string) chan struct{}
Constants ¶
// Redis key prefixes and fmt-style key formats used by the store. Each
// "...Format" constant is a format string; the K:/V: notes describe the key
// layout and the stored value.
const (
	RedisLivePrefix = "streamdal_live"
	// K: $session_id:$node_name:$audience
	RedisLiveFormat = "streamdal_live:%s:%s:%s"

	RedisAudiencePrefix = "streamdal_audience"
	// K: $audience V: SetPipelineConfig JSON
	RedisAudienceKeyFormat = "streamdal_audience:%s"

	RedisAudienceCreatedByPrefix = "streamdal_audience_created_by"
	// K: $audience:$created_by V: None
	// (two %s verbs; RedisAudienceCreatedByKey takes audience and createdBy)
	RedisAudienceCreatedByFormat = "streamdal_audience_created_by:%s:%s"

	// RedisRegisterFormat key resides in the RedisLivePrefix
	// K: $session_id:$node_name:register; V: NONE
	RedisRegisterFormat = "streamdal_live:%s:%s:register"

	RedisPipelinePrefix = "streamdal_pipeline"
	// K: $pipeline_id V: serialized protos.Pipeline
	RedisPipelineKeyFormat = "streamdal_pipeline:%s"

	RedisNotificationConfigPrefix = "streamdal_notification_config"
	// K: $config_id V: serialized protos.NotificationConfig
	RedisNotificationConfigKeyFormat = "streamdal_notification_config:%s"

	RedisNotificationAssocPrefix = "streamdal_notification_assoc"
	// K: $pipeline_id:$config_id V: NONE
	RedisNotificationAssocFormat = "streamdal_notification_assoc:%s:%s"

	RedisSchemaPrefix = "streamdal_schema"
	// K: $audience V: serialized protos.Schema
	RedisSchemaFormat = "streamdal_schema:%s"

	RedisActiveTailPrefix = "streamdal_tail"
	// K: $service_name:$tail_request_id V: serialized protos.TailRequest
	// NOTE(review): the format has a single %s and RedisActiveTailKey takes
	// only tailID - confirm whether the tail ID itself embeds the service name.
	RedisActiveTailKeyFormat = "streamdal_tail:%s"
	// K: $service_name:$tail_request_id:paused V: serialized protos.TailRequest
	// NOTE(review): the literal key is "streamdal_tail_paused:" followed by a
	// single %s (no ":paused" suffix) - confirm the intended layout.
	RedisPausedTailKeyFormat = "streamdal_tail_paused:%s"

	// RedisActiveTailTTL is the TTL for the active tail key. While this key
	// should be automatically cleaned up when the frontend stops a Tail() request,
	// this TTL is a safety mechanism to ensure we do not leave orphaned tails.
	RedisActiveTailTTL = 10 * time.Second

	RedisTelemetryRegistrationPrefix = "streamdal_telemetry:registrations"
	// The trailing %x renders its argument as hex; presumably a hash of the
	// registration fields (see RedisTelemetryRegistrationKey) - TODO confirm.
	RedisTelemetryRegistrationFormat = "streamdal_telemetry:registrations:%x"

	RedisTelemetryAudiencePrefix = "streamdal_telemetry:audience"
	// The trailing %x renders its argument as hex; presumably a hash of the
	// audience (see RedisTelemetryAudience) - TODO confirm.
	RedisTelemetryAudienceFormat = "streamdal_telemetry:audience:%x"

	RedisWasmPrefix = "streamdal_wasm"
	// k: streamdal_wasm:$wasm-name:$wasm-id v: serialized protos.Wasm
	RedisWasmKeyFormat = "streamdal_wasm:%s:%s"
)
// Fixed (non-format) redis keys and the keyspace-notification prefix.
const (
	// RedisKeyWatchPrefix is the key under which redis publishes key events.
	// The format is __keyspace@{$database_number}__
	// We're always defaulting to db 0, so we can use this prefix to watch for key changes
	// See https://redis.io/docs/manual/keyspace-notifications/
	RedisKeyWatchPrefix = "__keyspace@0__:"

	// InstallIDKey is a unique ID for this streamdal server cluster
	// Each cluster will get a unique UUID. This is used to track the number of
	// installs for telemetry and is completely random for anonymization purposes.
	InstallIDKey = "streamdal_install_id"

	// RedisCreationDateKey is the fixed key "streamdal_settings:creation_date".
	// NOTE(review): presumably where GetCreationDate/SetCreationDate keep the
	// cluster creation timestamp - confirm against those methods.
	RedisCreationDateKey = "streamdal_settings:creation_date"
)
Variables ¶
// Sentinel errors exposed by the store package; compare with errors.Is.
var (
	// ErrPipelineNotFound reports that a pipeline was not found.
	ErrPipelineNotFound = errors.New("pipeline not found")

	// ErrConfigNotFound reports that a config was not found.
	// NOTE(review): presumably refers to notification configs - confirm.
	ErrConfigNotFound = errors.New("config not found")

	// ErrMustExist reports that the object does not exist while the MustExist
	// option is set.
	ErrMustExist = errors.New("object does not exist and MustExist is set")

	// ErrNoOverwrite reports that the object already exists while the
	// NoOverwrite option is set.
	ErrNoOverwrite = errors.New("object exists and NoOverwrite is set")
)
var (
	// ErrWasmNotFound reports that a Wasm entry was not found.
	ErrWasmNotFound = errors.New("wasm not found")
)
Functions ¶
func RedisActiveTailKey ¶
func RedisAudienceCreatedByKey ¶ added in v0.0.45
func RedisAudienceKey ¶
func RedisLiveKey ¶
func RedisPausedTailKey ¶
func RedisPipelineKey ¶
func RedisRegisterKey ¶
func RedisSchemaKey ¶
func RedisTelemetryAudience ¶
func RedisWasmKey ¶ added in v0.0.41
Types ¶
type IStore ¶
type IStore interface { AddRegistration(ctx context.Context, req *protos.RegisterRequest) error DeleteRegistration(ctx context.Context, req *protos.DeregisterRequest) error RecordRegistration(ctx context.Context, req *protos.RegisterRequest) error SeenRegistration(ctx context.Context, req *protos.RegisterRequest) bool GetPipelines(ctx context.Context) (map[string]*protos.Pipeline, error) GetPipeline(ctx context.Context, pipelineID string) (*protos.Pipeline, error) GetAudienceMappings(ctx context.Context) (map[*protos.Audience]*protos.PipelineConfigs, error) GetPipelineConfigsByAudience(ctx context.Context, aud *protos.Audience) (*protos.PipelineConfigs, error) GetLive(ctx context.Context) ([]*types.LiveEntry, error) CreatePipeline(ctx context.Context, pipeline *protos.Pipeline) error AddAudience(ctx context.Context, req *protos.NewAudienceRequest) error // CreateAudience creates a new audience in the store. This method differs // from AddAudience() in that it does not create any live entry. 
CreateAudience(ctx context.Context, aud *protos.Audience) error DeleteAudience(ctx context.Context, req *protos.DeleteAudienceRequest) error DeletePipeline(ctx context.Context, pipelineID string) error UpdatePipeline(ctx context.Context, pipeline *protos.Pipeline) error SetPauseResume(ctx context.Context, audience *protos.Audience, pipelineID string, pause bool) (bool, error) GetAudiences(ctx context.Context) ([]*protos.Audience, error) GetNotificationConfig(ctx context.Context, req *protos.GetNotificationRequest) (*protos.NotificationConfig, error) GetNotificationConfigs(ctx context.Context) (map[string]*protos.NotificationConfig, error) CreateNotificationConfig(ctx context.Context, req *protos.CreateNotificationRequest) error UpdateNotificationConfig(ctx context.Context, req *protos.UpdateNotificationRequest) error DeleteNotificationConfig(ctx context.Context, req *protos.DeleteNotificationRequest) error GetPipelineUsage(ctx context.Context) ([]*PipelineUsage, error) GetActivePipelineUsage(ctx context.Context, pipelineID string) ([]*PipelineUsage, error) GetActiveTailCommandsByService(ctx context.Context, serviceName string) ([]*protos.Command, error) AddActiveTailRequest(ctx context.Context, req *protos.TailRequest) (string, error) // Returns key that tail req is stored under // GetAudiencesBySessionID returns all audiences for a given session id // This is needed to inject an inferschema pipeline for each announced audience // to the session via a goroutine in internal.Register() GetAudiencesBySessionID(ctx context.Context, sessionID string) ([]*protos.Audience, error) GetAudiencesByService(ctx context.Context, serviceName string) ([]*protos.Audience, error) // WatchKeys will watch for key changes for given key pattern; every time key // is updated, it will send a message on the return channel. // WatchKeys launches a dedicated goroutine for the watch - it can be stopped // via the provided context. 
WatchKeys(quitCtx context.Context, key string) chan struct{} AddSchema(ctx context.Context, req *protos.SendSchemaRequest) error GetSchema(ctx context.Context, aud *protos.Audience) (*protos.Schema, error) // GetInstallID returns the unique ID for this server cluster. // If an ID has not been set yet, a new one is generated and returned GetInstallID(ctx context.Context) (string, error) // GetCreationDate returns the creation date of this server cluster. This is used // for sending server_timestamp_created_seconds metric to telemetry GetCreationDate(ctx context.Context) (int64, error) // SetCreationDate sets the creation date of this server cluster SetCreationDate(ctx context.Context, ts int64) error // IsPipelineAttachedAny returns if pipeline is attached to any audience. Used for telemetry tags IsPipelineAttachedAny(ctx context.Context, pipelineID string) bool // PauseTailRequest pauses a tail request by its ID PauseTailRequest(ctx context.Context, req *protos.PauseTailRequest) (*protos.TailRequest, error) // ResumeTailRequest resumes a tail request by its ID ResumeTailRequest(ctx context.Context, req *protos.ResumeTailRequest) (*protos.TailRequest, error) // GetTailRequestById returns a TailRequest by its ID GetTailRequestById(ctx context.Context, tailID string) (*protos.TailRequest, error) // SetPipelines saves pipelines as SetPipelinesConfig json to $audience key in store SetPipelines(ctx context.Context, req *protos.SetPipelinesRequest) error // GetSetPipelinesCommandsByService returns a slice of SetPipelines commands for a given service GetSetPipelinesCommandsByService(ctx context.Context, serviceName string) ([]*protos.Command, error) // GetSessionIDs returns a slice of ALL known session ids; // accepts an optional "node name" to filter by. GetSessionIDs(ctx context.Context, nodeName ...string) ([]string, error) // GetSessionIDsByAudience returns a slice of session IDs for a given audience; // accepts an optional "node name" to filter by. 
GetSessionIDsByAudience(ctx context.Context, aud *protos.Audience, nodeName ...string) ([]string, error) // GetSessionIDsByPipelineID returns a slice of session IDs that use a pipeline ID GetSessionIDsByPipelineID(ctx context.Context, pipelineID string) ([]string, error) // GetPipelinesByAudience will fetch pipeline configs and then use the ID to // fetch the actual pipeline from store + update the paused state. GetPipelinesByAudience(ctx context.Context, aud *protos.Audience) ([]*protos.Pipeline, error) // DeletePipelineConfig deletes a pipeline config for all audiences that use // given pipeline ID; it will return a list of audiences that the pipeline id // was used by. DeletePipelineConfig(ctx context.Context, pipelineID string) ([]*protos.Audience, error) // GetAudiencesByPipelineID returns a slice of audiences that use a pipeline ID GetAudiencesByPipelineID(ctx context.Context, pipelineID string) ([]*protos.Audience, error) // BEGIN Wasm-related methods GetAllWasm(ctx context.Context) ([]*shared.WasmModule, error) GetWasm(ctx context.Context, name, id string) (*shared.WasmModule, error) GetWasmByID(ctx context.Context, id string) (*shared.WasmModule, error) GetWasmByName(ctx context.Context, name string) (*shared.WasmModule, error) SetWasm(ctx context.Context, id, name string, wasm *shared.WasmModule) error SetWasmByName(ctx context.Context, name string, wasm *shared.WasmModule) error SetWasmByID(ctx context.Context, id string, wasm *shared.WasmModule) error DeleteWasm(ctx context.Context, id, name string) error DeleteWasmByID(ctx context.Context, id string) error DeleteWasmByName(ctx context.Context, name string) error // GetPipelinesByWasmID returns a slice of pipelines that use a given Wasm ID GetPipelinesByWasmID(ctx context.Context, wasmID string) ([]*protos.Pipeline, error) }
type Option ¶ added in v0.0.41
// Option holds per-operation settings for store writes and deletes.
type Option struct {
	// When set, the store will NOT overwrite an existing object. Applies to write().
	NoOverwrite bool

	// When set, the store will ensure that the object exists before performing
	// the operation. Applies to write() and delete(). NOTE: MustExist
	// and NoOverwrite are mutually exclusive.
	// NOTE(review): this note previously named a "MustNotExist" option, which
	// is not a field of this struct; NoOverwrite appears to be the intended
	// counterpart - confirm.
	MustExist bool

	// Optional TTL to set on the key. Applies to write().
	TTL time.Duration
}
Option contains settings that can influence read, write or delete operations.
NOTE: Redis transactions do NOT function like "traditional" txns, as there is no concept of a "rollback" or "commit". Txns in redis are essentially grouped commands that are executed atomically. Because of this, Option currently does not support TXNs and there is a _small_ potential for races.
See: https://redis.io/docs/interact/transactions/ See: https://redis.com/blog/you-dont-need-transaction-rollbacks-in-redis/
type Options ¶
// Options groups the dependencies and settings of a store.
// NOTE(review): the per-field notes below are inferred from field names and
// types only - confirm against the constructor that consumes Options.
type Options struct {
	// Encryption is an encryption.IEncryption implementation.
	Encryption encryption.IEncryption

	// RedisBackend is the redis client; presumably the backing storage - confirm.
	RedisBackend *redis.Client

	// ShutdownCtx is presumably cancelled on shutdown - confirm.
	ShutdownCtx context.Context

	// NodeName is presumably the name of the current node - confirm.
	NodeName string

	// SessionTTL is presumably the TTL applied to session keys - confirm.
	SessionTTL time.Duration

	// Telemetry is a statsd.Statter.
	Telemetry statsd.Statter

	// InstallID is presumably the cluster install ID (see InstallIDKey) - confirm.
	InstallID string
}
type PipelineUsage ¶
type Store ¶
// Store is the concrete type carrying the store methods listed in this
// package's index.
// NOTE(review): presumably the redis-backed implementation of IStore - confirm.
type Store struct {
	// contains filtered or unexported fields
}
func (*Store) AddActiveTailRequest ¶
func (*Store) AddAudience ¶
func (*Store) AddRegistration ¶
func (*Store) CreateAudience ¶ added in v0.0.32
func (*Store) CreateNotificationConfig ¶
func (*Store) CreatePipeline ¶
func (*Store) DeleteAudience ¶
func (*Store) DeleteNotificationConfig ¶
func (*Store) DeletePipeline ¶
func (*Store) DeletePipelineConfig ¶ added in v0.0.23
func (*Store) DeleteRegistration ¶
func (*Store) DeleteWasm ¶ added in v0.0.41
DeleteWasm will remove a Wasm entry from the store by name and ID.
func (*Store) DeleteWasmByID ¶ added in v0.0.41
DeleteWasmByID will remove an EXISTING Wasm entry by ID. This method expects a Wasm object with the given ID to exist; if it does not, the delete will error.
func (*Store) DeleteWasmByName ¶ added in v0.0.41
DeleteWasmByName will remove an EXISTING Wasm entry by name. This method expects a Wasm object with the given name to exist; if it does not, the delete will error.
func (*Store) GetActivePipelineUsage ¶
func (s *Store) GetActivePipelineUsage(ctx context.Context, pipelineID string) ([]*PipelineUsage, error)
GetActivePipelineUsage gets *ACTIVE* pipeline usage on the CURRENT node. NOTE: This method is a helper for GetPipelineUsage().
func (*Store) GetActiveTailCommandsByService ¶
func (*Store) GetAllWasm ¶ added in v0.0.41
func (*Store) GetAudienceMappings ¶ added in v0.0.44
func (s *Store) GetAudienceMappings(ctx context.Context) (map[*protos.Audience]*protos.PipelineConfigs, error)
GetAudienceMappings returns all audience -> *protos.PipelineConfigs assignments
func (*Store) GetAudiences ¶
func (*Store) GetAudiencesByPipelineID ¶ added in v0.0.23
func (*Store) GetAudiencesByService ¶
func (*Store) GetAudiencesBySessionID ¶
func (*Store) GetNotificationConfig ¶
func (s *Store) GetNotificationConfig(ctx context.Context, req *protos.GetNotificationRequest) (*protos.NotificationConfig, error)
func (*Store) GetNotificationConfigs ¶
func (*Store) GetPausedTailRequestById ¶
func (*Store) GetPipeline ¶
func (*Store) GetPipelineConfigsByAudience ¶ added in v0.0.17
func (s *Store) GetPipelineConfigsByAudience(ctx context.Context, aud *protos.Audience) (*protos.PipelineConfigs, error)
GetPipelineConfigsByAudience returns *protos.PipelineConfigs for a given audience. TODO: Needs tests.
func (*Store) GetPipelineUsage ¶
func (s *Store) GetPipelineUsage(ctx context.Context) ([]*PipelineUsage, error)
GetPipelineUsage gets usage across the entire cluster
func (*Store) GetPipelines ¶
func (*Store) GetPipelinesByAudience ¶ added in v0.0.17
func (*Store) GetPipelinesByWasmID ¶ added in v0.0.41
func (*Store) GetSessionIDs ¶
GetSessionIDs will return ALL live session IDs; optionally filter by node name
func (*Store) GetSessionIDsByAudience ¶
func (s *Store) GetSessionIDsByAudience(ctx context.Context, aud *protos.Audience, nodeName ...string) ([]string, error)
TODO: Needs tests
func (*Store) GetSessionIDsByPipelineID ¶
func (*Store) GetSetPipelinesCommandsByService ¶
func (*Store) GetTailRequestById ¶
func (*Store) GetWasmByID ¶ added in v0.0.41
GetWasmByID will fetch Wasm from the store by ID (regardless of 'name')
func (*Store) GetWasmByName ¶ added in v0.0.41
GetWasmByName will fetch Wasm from the store by name (regardless of 'id')
func (*Store) IsPipelineAttachedAny ¶
func (*Store) PauseTailRequest ¶
func (s *Store) PauseTailRequest(ctx context.Context, req *protos.PauseTailRequest) (*protos.TailRequest, error)
func (*Store) RecordRegistration ¶
func (*Store) ResumeTailRequest ¶
func (s *Store) ResumeTailRequest(ctx context.Context, req *protos.ResumeTailRequest) (*protos.TailRequest, error)
func (*Store) SeenRegistration ¶
func (*Store) SetCreationDate ¶
func (*Store) SetPauseResume ¶
func (s *Store) SetPauseResume(ctx context.Context, audience *protos.Audience, pipelineID string, paused bool) (bool, error)
SetPauseResume sets pipeline pause status
func (*Store) SetPipelines ¶
func (*Store) SetWasm ¶ added in v0.0.41
SetWasm will store Wasm in the store by name and ID; it will overwrite an existing entry (if it exists).
func (*Store) SetWasmByID ¶ added in v0.0.41
SetWasmByID will overwrite an EXISTING Wasm entry by ID. This method requires a Wasm object with the given ID to already exist in redis and will use the discovered 'id' to overwrite the entry.
func (*Store) SetWasmByName ¶ added in v0.0.41
SetWasmByName will overwrite an EXISTING Wasm entry by name. This method requires a Wasm object with the given name to already exist in redis and will use the discovered 'name' to overwrite the entry.