Documentation ¶
Index ¶
- Constants
- Variables
- func RedisActiveTailKey(tailID string) string
- func RedisAudienceKey(audience string) string
- func RedisConfigKey(audience *protos.Audience, pipelineID string) string
- func RedisLiveKey(session, node, audience string) string
- func RedisNotificationAssocKey(pipelineID, configID string) string
- func RedisNotificationConfigKey(configID string) string
- func RedisPausedKey(audience, pipelineID string) string
- func RedisPausedTailKey(tailID string) string
- func RedisPipelineKey(pipelineID string) string
- func RedisRegisterKey(session, node string) string
- func RedisSchemaKey(audience string) string
- func RedisTelemetryAudience(aud *protos.Audience) string
- func RedisTelemetryRegistrationKey(serviceName, os, sdk, arch string) string
- type IStore
- type Options
- type PipelineUsage
- type Store
- func (s *Store) AddActiveTailRequest(ctx context.Context, req *protos.TailRequest) (string, error)
- func (s *Store) AddAudience(ctx context.Context, req *protos.NewAudienceRequest) error
- func (s *Store) AddRegistration(ctx context.Context, req *protos.RegisterRequest) error
- func (s *Store) AddSchema(ctx context.Context, req *protos.SendSchemaRequest) error
- func (s *Store) AttachNotificationConfig(ctx context.Context, req *protos.AttachNotificationRequest) error
- func (s *Store) AttachPipeline(ctx context.Context, req *protos.AttachPipelineRequest) error
- func (s *Store) CreateNotificationConfig(ctx context.Context, req *protos.CreateNotificationRequest) error
- func (s *Store) CreatePipeline(ctx context.Context, pipeline *protos.Pipeline) error
- func (s *Store) DeleteAudience(ctx context.Context, req *protos.DeleteAudienceRequest) error
- func (s *Store) DeleteNotificationConfig(ctx context.Context, req *protos.DeleteNotificationRequest) error
- func (s *Store) DeletePipeline(ctx context.Context, pipelineId string) error
- func (s *Store) DeleteRegistration(ctx context.Context, req *protos.DeregisterRequest) error
- func (s *Store) DetachNotificationConfig(ctx context.Context, req *protos.DetachNotificationRequest) error
- func (s *Store) DetachPipeline(ctx context.Context, req *protos.DetachPipelineRequest) error
- func (s *Store) GetActivePipelineUsage(ctx context.Context, pipelineID string) ([]*PipelineUsage, error)
- func (s *Store) GetActiveTailCommandsByService(ctx context.Context, serviceName string) ([]*protos.Command, error)
- func (s *Store) GetAttachCommandsByService(ctx context.Context, serviceName string) ([]*protos.Command, error)
- func (s *Store) GetAudiences(ctx context.Context) ([]*protos.Audience, error)
- func (s *Store) GetAudiencesByService(ctx context.Context, serviceName string) ([]*protos.Audience, error)
- func (s *Store) GetAudiencesBySessionID(ctx context.Context, sessionID string) ([]*protos.Audience, error)
- func (s *Store) GetConfig(ctx context.Context) (map[*protos.Audience][]string, error)
- func (s *Store) GetConfigByAudience(ctx context.Context, audience *protos.Audience) ([]string, error)
- func (s *Store) GetCreationDate(ctx context.Context) (int64, error)
- func (s *Store) GetInstallID(ctx context.Context) (string, error)
- func (s *Store) GetLive(ctx context.Context) ([]*types.LiveEntry, error)
- func (s *Store) GetNotificationConfig(ctx context.Context, req *protos.GetNotificationRequest) (*protos.NotificationConfig, error)
- func (s *Store) GetNotificationConfigs(ctx context.Context) (map[string]*protos.NotificationConfig, error)
- func (s *Store) GetNotificationConfigsByPipeline(ctx context.Context, pipelineID string) ([]*protos.NotificationConfig, error)
- func (s *Store) GetPaused(ctx context.Context) (map[string]*types.PausedEntry, error)
- func (s *Store) GetPausedTailRequestById(ctx context.Context, tailID string) (*protos.TailRequest, error)
- func (s *Store) GetPipeline(ctx context.Context, pipelineId string) (*protos.Pipeline, error)
- func (s *Store) GetPipelineUsage(ctx context.Context) ([]*PipelineUsage, error)
- func (s *Store) GetPipelines(ctx context.Context) (map[string]*protos.Pipeline, error)
- func (s *Store) GetSchema(ctx context.Context, aud *protos.Audience) (*protos.Schema, error)
- func (s *Store) GetTailRequestById(ctx context.Context, tailID string) (*protos.TailRequest, error)
- func (s *Store) IsPaused(ctx context.Context, audience *protos.Audience, pipelineID string) (bool, error)
- func (s *Store) IsPipelineAttached(ctx context.Context, audience *protos.Audience, pipelineID string) (bool, error)
- func (s *Store) IsPipelineAttachedAny(ctx context.Context, pipelineID string) bool
- func (s *Store) PausePipeline(ctx context.Context, req *protos.PausePipelineRequest) error
- func (s *Store) PauseTailRequest(ctx context.Context, req *protos.PauseTailRequest) (*protos.TailRequest, error)
- func (s *Store) RecordRegistration(ctx context.Context, req *protos.RegisterRequest) error
- func (s *Store) ResumePipeline(ctx context.Context, req *protos.ResumePipelineRequest) error
- func (s *Store) ResumeTailRequest(ctx context.Context, req *protos.ResumeTailRequest) (*protos.TailRequest, error)
- func (s *Store) SeenRegistration(ctx context.Context, req *protos.RegisterRequest) bool
- func (s *Store) SetCreationDate(ctx context.Context, ts int64) error
- func (s *Store) UpdateNotificationConfig(ctx context.Context, req *protos.UpdateNotificationRequest) error
- func (s *Store) UpdatePipeline(ctx context.Context, pipeline *protos.Pipeline) error
- func (s *Store) WatchKeys(quitCtx context.Context, key string) chan struct{}
Constants ¶
View Source
// Redis key prefixes and fmt-style key formats used by the store.
// The trailing "K:"/"V:" notes describe the key layout and the stored value.
const (
	RedisLivePrefix = "streamdal_live"
	RedisLiveFormat = "streamdal_live:%s:%s:%s" // K: $session_id:$node_name:$audience

	RedisAudiencePrefix    = "streamdal_audience"
	RedisAudienceKeyFormat = "streamdal_audience:%s" // K: $audience V: NONE

	// RedisRegisterFormat key resides in the RedisLivePrefix
	RedisRegisterFormat = "streamdal_live:%s:%s:register" // K: $session_id:$node_name:register; V: NONE

	RedisPipelinePrefix    = "streamdal_pipeline"
	RedisPipelineKeyFormat = "streamdal_pipeline:%s" // K: $pipeline_id V: serialized protos.Pipeline

	// NOTE(review): the format below has two placeholders but the key note
	// lists only $audience; presumably the key is $audience:$pipeline_id,
	// matching RedisConfigKey(audience, pipelineID) - confirm.
	RedisConfigPrefix    = "streamdal_config"
	RedisConfigKeyFormat = "streamdal_config:%s:%s" // K: $audience V: $pipeline_id (string)

	// NOTE(review): RedisPausedKey takes (audience, pipelineID) while the key
	// note lists $pipeline_id first - confirm the actual segment order.
	RedisPausedPrefix    = "streamdal_paused"
	RedisPausedKeyFormat = "streamdal_paused:%s:%s" // K: $pipeline_id:$audience V: NONE

	RedisNotificationConfigPrefix    = "streamdal_notification_config"
	RedisNotificationConfigKeyFormat = "streamdal_notification_config:%s" // K: $config_id V: serialized protos.NotificationConfig

	RedisNotificationAssocPrefix = "streamdal_notification_assoc"
	RedisNotificationAssocFormat = "streamdal_notification_assoc:%s:%s" // K: $pipeline_id:$config_id V: NONE

	RedisSchemaPrefix = "streamdal_schema"
	RedisSchemaFormat = "streamdal_schema:%s" // K: $audience V: serialized protos.Schema

	// NOTE(review): both tail formats have a single placeholder, and
	// RedisActiveTailKey / RedisPausedTailKey take only a tail ID; the key
	// notes below still mention $service_name (and a ":paused" suffix that the
	// paused format does not contain) - confirm and update the notes.
	RedisActiveTailPrefix    = "streamdal_tail"
	RedisActiveTailKeyFormat = "streamdal_tail:%s"        // K: $service_name:$tail_request_id V: serialized protos.TailRequest
	RedisPausedTailKeyFormat = "streamdal_tail_paused:%s" // K: $service_name:$tail_request_id:paused V: serialized protos.TailRequest

	// RedisActiveTailTTL is the TTL for the active tail key. While this key
	// should be automatically cleaned up when the frontend stops a Tail() request,
	// this TTL is a safety mechanism to ensure we do not leave orphaned tails.
	RedisActiveTailTTL = 10 * time.Second

	// Telemetry key formats use the %x verb, i.e. the formatted argument is
	// rendered in hexadecimal.
	RedisTelemetryRegistrationPrefix = "streamdal_telemetry:registrations"
	RedisTelemetryRegistrationFormat = "streamdal_telemetry:registrations:%x"

	RedisTelemetryAudiencePrefix = "streamdal_telemetry:audience"
	RedisTelemetryAudienceFormat = "streamdal_telemetry:audience:%x"
)
View Source
const (
	// RedisKeyWatchPrefix is the key under which redis publishes key events.
	// The format is __keyspace@{$database_number}__
	// We're always defaulting to db 0, so we can use this prefix to watch for key changes
	// See https://redis.io/docs/manual/keyspace-notifications/
	RedisKeyWatchPrefix = "__keyspace@0__:"

	// InstallIDKey is the key for the unique ID of this streamdal server cluster.
	// Each cluster will get a unique UUID. This is used to track the number of
	// installs for telemetry and is completely random for anonymization purposes.
	InstallIDKey = "install_id"

	// RedisCreationDateKey is the settings key for the cluster creation date.
	// NOTE(review): presumably the key used by GetCreationDate/SetCreationDate - confirm.
	RedisCreationDateKey = "streamdal_settings:creation_date"
)
Variables ¶
View Source
// Sentinel errors exported by the package; compare against them with errors.Is.
var (
	// ErrPipelineNotFound is the "pipeline not found" sentinel error.
	ErrPipelineNotFound = errors.New("pipeline not found")

	// ErrConfigNotFound is the "config not found" sentinel error.
	ErrConfigNotFound = errors.New("config not found")
)
Functions ¶
func RedisActiveTailKey ¶ added in v0.0.9
func RedisAudienceKey ¶
func RedisLiveKey ¶
func RedisPausedKey ¶
func RedisPausedTailKey ¶ added in v0.0.9
func RedisPipelineKey ¶
func RedisRegisterKey ¶
func RedisSchemaKey ¶
func RedisTelemetryAudience ¶ added in v0.0.10
func RedisTelemetryRegistrationKey ¶ added in v0.0.10
Types ¶
type IStore ¶
// IStore is the storage interface of this package. *Store declares a method
// for every entry below.
//
// NOTE(review): *Store additionally exposes GetPausedTailRequestById, which is
// not part of this interface - confirm whether that is intentional.
type IStore interface {
	AddRegistration(ctx context.Context, req *protos.RegisterRequest) error
	DeleteRegistration(ctx context.Context, req *protos.DeregisterRequest) error
	RecordRegistration(ctx context.Context, req *protos.RegisterRequest) error
	SeenRegistration(ctx context.Context, req *protos.RegisterRequest) bool
	GetPipelines(ctx context.Context) (map[string]*protos.Pipeline, error)
	GetPipeline(ctx context.Context, pipelineID string) (*protos.Pipeline, error)
	GetConfig(ctx context.Context) (map[*protos.Audience][]string, error) // v: pipeline_id
	// GetConfigByAudience returns a list of pipeline IDs attached to the given audience.
	GetConfigByAudience(ctx context.Context, audience *protos.Audience) ([]string, error)
	GetLive(ctx context.Context) ([]*types.LiveEntry, error)
	GetPaused(ctx context.Context) (map[string]*types.PausedEntry, error)
	CreatePipeline(ctx context.Context, pipeline *protos.Pipeline) error
	AddAudience(ctx context.Context, req *protos.NewAudienceRequest) error
	DeleteAudience(ctx context.Context, req *protos.DeleteAudienceRequest) error
	DeletePipeline(ctx context.Context, pipelineID string) error
	UpdatePipeline(ctx context.Context, pipeline *protos.Pipeline) error
	AttachPipeline(ctx context.Context, req *protos.AttachPipelineRequest) error
	DetachPipeline(ctx context.Context, req *protos.DetachPipelineRequest) error
	PausePipeline(ctx context.Context, req *protos.PausePipelineRequest) error
	ResumePipeline(ctx context.Context, req *protos.ResumePipelineRequest) error
	// IsPaused reports whether the pipeline is paused and whether it exists.
	IsPaused(ctx context.Context, audience *protos.Audience, pipelineID string) (bool, error)
	GetAudiences(ctx context.Context) ([]*protos.Audience, error)
	IsPipelineAttached(ctx context.Context, audience *protos.Audience, pipelineID string) (bool, error)
	GetNotificationConfig(ctx context.Context, req *protos.GetNotificationRequest) (*protos.NotificationConfig, error)
	GetNotificationConfigs(ctx context.Context) (map[string]*protos.NotificationConfig, error)
	GetNotificationConfigsByPipeline(ctx context.Context, pipelineID string) ([]*protos.NotificationConfig, error)
	CreateNotificationConfig(ctx context.Context, req *protos.CreateNotificationRequest) error
	UpdateNotificationConfig(ctx context.Context, req *protos.UpdateNotificationRequest) error
	DeleteNotificationConfig(ctx context.Context, req *protos.DeleteNotificationRequest) error
	AttachNotificationConfig(ctx context.Context, req *protos.AttachNotificationRequest) error
	DetachNotificationConfig(ctx context.Context, req *protos.DetachNotificationRequest) error
	GetAttachCommandsByService(ctx context.Context, serviceName string) ([]*protos.Command, error)
	// GetPipelineUsage gets usage across the entire cluster.
	GetPipelineUsage(ctx context.Context) ([]*PipelineUsage, error)
	// GetActivePipelineUsage gets *ACTIVE* pipeline usage on the CURRENT node.
	// It is a helper for GetPipelineUsage().
	GetActivePipelineUsage(ctx context.Context, pipelineID string) ([]*PipelineUsage, error)
	GetActiveTailCommandsByService(ctx context.Context, serviceName string) ([]*protos.Command, error)
	AddActiveTailRequest(ctx context.Context, req *protos.TailRequest) (string, error) // Returns key that tail req is stored under

	// GetAudiencesBySessionID returns all audiences for a given session id
	// This is needed to inject an inferschema pipeline for each announced audience
	// to the session via a goroutine in internal.Register()
	GetAudiencesBySessionID(ctx context.Context, sessionID string) ([]*protos.Audience, error)
	GetAudiencesByService(ctx context.Context, serviceName string) ([]*protos.Audience, error)

	// WatchKeys will watch for key changes for given key pattern; every time key
	// is updated, it will send a message on the return channel.
	// WatchKeys launches a dedicated goroutine for the watch - it can be stopped
	// via the provided context.
	WatchKeys(quitCtx context.Context, key string) chan struct{}

	AddSchema(ctx context.Context, req *protos.SendSchemaRequest) error
	GetSchema(ctx context.Context, aud *protos.Audience) (*protos.Schema, error)

	// GetInstallID returns the unique ID for this server cluster.
	// If an ID has not been set yet, a new one is generated and returned
	GetInstallID(ctx context.Context) (string, error)

	// GetCreationDate returns the creation date of this server cluster. This is used
	// for sending server_timestamp_created_seconds metric to telemetry
	GetCreationDate(ctx context.Context) (int64, error)

	// SetCreationDate sets the creation date of this server cluster
	SetCreationDate(ctx context.Context, ts int64) error

	// IsPipelineAttachedAny returns if pipeline is attached to any audience. Used for telemetry tags
	IsPipelineAttachedAny(ctx context.Context, pipelineID string) bool

	// PauseTailRequest pauses a tail request by its ID
	PauseTailRequest(ctx context.Context, req *protos.PauseTailRequest) (*protos.TailRequest, error)

	// ResumeTailRequest resumes a tail request by its ID
	ResumeTailRequest(ctx context.Context, req *protos.ResumeTailRequest) (*protos.TailRequest, error)

	// GetTailRequestById returns a TailRequest by its ID
	GetTailRequestById(ctx context.Context, tailID string) (*protos.TailRequest, error)
}
type Options ¶
// Options groups the dependencies and settings of the store.
// NOTE(review): the constructor that consumes Options is not visible in this
// listing; the per-field notes below are inferred from names and types only -
// confirm against the implementation.
type Options struct {
	Encryption   encryption.IEncryption // presumably used to encrypt/decrypt stored values - confirm
	RedisBackend *redis.Client          // Redis client used as the storage backend
	ShutdownCtx  context.Context        // presumably cancelled on server shutdown - confirm
	NodeName     string                 // presumably the $node_name segment of the live/register keys - confirm
	SessionTTL   time.Duration          // presumably the TTL applied to session (live) keys - confirm
	Telemetry    statsd.Statter         // statsd client for telemetry
	InstallID    string                 // presumably the cluster install ID (see InstallIDKey) - confirm
}
type PipelineUsage ¶
type Store ¶
// Store is the concrete store type of this package; its pointer receiver
// declares every method listed in IStore.
type Store struct {
	// contains filtered or unexported fields
}
func (*Store) AddActiveTailRequest ¶
func (*Store) AddAudience ¶
func (*Store) AddRegistration ¶
func (*Store) AttachNotificationConfig ¶
func (*Store) AttachPipeline ¶
func (*Store) CreateNotificationConfig ¶
func (*Store) CreatePipeline ¶
func (*Store) DeleteAudience ¶
func (*Store) DeleteNotificationConfig ¶
func (*Store) DeletePipeline ¶
func (*Store) DeleteRegistration ¶
func (*Store) DetachNotificationConfig ¶
func (*Store) DetachPipeline ¶
func (*Store) GetActivePipelineUsage ¶
func (s *Store) GetActivePipelineUsage(ctx context.Context, pipelineID string) ([]*PipelineUsage, error)
GetActivePipelineUsage gets *ACTIVE* pipeline usage on the CURRENT node. NOTE: This method is a helper for GetPipelineUsage().
func (*Store) GetActiveTailCommandsByService ¶
func (*Store) GetAttachCommandsByService ¶
func (*Store) GetAudiences ¶
func (*Store) GetAudiencesByService ¶
func (*Store) GetAudiencesBySessionID ¶
func (*Store) GetConfigByAudience ¶
func (s *Store) GetConfigByAudience(ctx context.Context, audience *protos.Audience) ([]string, error)
GetConfigByAudience returns a list of pipeline IDs attached to the given audience.
func (*Store) GetCreationDate ¶ added in v0.0.10
func (*Store) GetInstallID ¶ added in v0.0.10
func (*Store) GetNotificationConfig ¶
func (s *Store) GetNotificationConfig(ctx context.Context, req *protos.GetNotificationRequest) (*protos.NotificationConfig, error)
func (*Store) GetNotificationConfigs ¶
func (*Store) GetNotificationConfigsByPipeline ¶
func (*Store) GetPausedTailRequestById ¶ added in v0.0.9
func (*Store) GetPipeline ¶
func (*Store) GetPipelineUsage ¶
func (s *Store) GetPipelineUsage(ctx context.Context) ([]*PipelineUsage, error)
GetPipelineUsage gets usage across the entire cluster
func (*Store) GetPipelines ¶
func (*Store) GetTailRequestById ¶ added in v0.0.9
func (*Store) IsPaused ¶
func (s *Store) IsPaused(ctx context.Context, audience *protos.Audience, pipelineID string) (bool, error)
IsPaused reports whether the pipeline is paused and whether it exists.
func (*Store) IsPipelineAttached ¶
func (*Store) IsPipelineAttachedAny ¶ added in v0.0.10
func (*Store) PausePipeline ¶
func (*Store) PauseTailRequest ¶ added in v0.0.9
func (s *Store) PauseTailRequest(ctx context.Context, req *protos.PauseTailRequest) (*protos.TailRequest, error)
func (*Store) RecordRegistration ¶ added in v0.0.10
func (*Store) ResumePipeline ¶
func (*Store) ResumeTailRequest ¶ added in v0.0.9
func (s *Store) ResumeTailRequest(ctx context.Context, req *protos.ResumeTailRequest) (*protos.TailRequest, error)
func (*Store) SeenRegistration ¶ added in v0.0.10
func (*Store) SetCreationDate ¶ added in v0.0.10
func (*Store) UpdateNotificationConfig ¶
func (*Store) UpdatePipeline ¶
Click to show internal directories.
Click to hide internal directories.