store

package
v0.0.12 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Dec 15, 2023 License: Apache-2.0 Imports: 17 Imported by: 0

Documentation

Index

Constants

View Source
const (
	RedisLivePrefix = "streamdal_live"
	RedisLiveFormat = "streamdal_live:%s:%s:%s" // K: $session_id:$node_name:$audience

	RedisAudiencePrefix    = "streamdal_audience"
	RedisAudienceKeyFormat = "streamdal_audience:%s" // K: $audience V: NONE

	// RedisRegisterFormat key resides in the RedisLivePrefix
	RedisRegisterFormat = "streamdal_live:%s:%s:register" // K: $session_id:$node_name:register; V: NONE

	RedisPipelinePrefix    = "streamdal_pipeline"
	RedisPipelineKeyFormat = "streamdal_pipeline:%s" // K: $pipeline_id V: serialized protos.Pipeline

	RedisConfigPrefix    = "streamdal_config"
	RedisConfigKeyFormat = "streamdal_config:%s:%s" // K: $audience V: $pipeline_id (string)

	RedisPausedPrefix    = "streamdal_paused"
	RedisPausedKeyFormat = "streamdal_paused:%s:%s" // K: $pipeline_id:$audience V: NONE

	RedisNotificationConfigPrefix    = "streamdal_notification_config"
	RedisNotificationConfigKeyFormat = "streamdal_notification_config:%s" // K: $config_id V: serialized protos.NotificationConfig

	RedisNotificationAssocPrefix = "streamdal_notification_assoc"
	RedisNotificationAssocFormat = "streamdal_notification_assoc:%s:%s" // K: $pipeline_id:$config_id V: NONE

	RedisSchemaPrefix = "streamdal_schema"
	RedisSchemaFormat = "streamdal_schema:%s" // K: $audience V: serialized protos.Schema

	RedisActiveTailPrefix    = "streamdal_tail"
	RedisActiveTailKeyFormat = "streamdal_tail:%s"        // K: $service_name:$tail_request_id V: serialized protos.TailRequest
	RedisPausedTailKeyFormat = "streamdal_tail_paused:%s" // K: $service_name:$tail_request_id:paused V: serialized protos.TailRequest

	// RedisActiveTailTTL is the TTL for the active tail key. While this key
	// should be automatically cleaned up when the frontend stops a Tail() request,
	// this TTL is a safety mechanism to ensure we do not leave orphaned tails.
	RedisActiveTailTTL = 10 * time.Second

	RedisTelemetryRegistrationPrefix = "streamdal_telemetry:registrations"
	RedisTelemetryRegistrationFormat = "streamdal_telemetry:registrations:%x"

	RedisTelemetryAudiencePrefix = "streamdal_telemetry:audience"
	RedisTelemetryAudienceFormat = "streamdal_telemetry:audience:%x"
)
View Source
const (
	// RedisKeyWatchPrefix is the key under which redis publishes key events.
	// The format is  __keyspace@{$database_number}__
	// We're always defaulting to db 0, so we can use this prefix to watch for key changes
	// See https://redis.io/docs/manual/keyspace-notifications/
	RedisKeyWatchPrefix = "__keyspace@0__:"

	// InstallIDKey is a unique ID for this streamdal server cluster
	// Each cluster will get a unique UUID. This is used to track the number of
	// installs for telemetry and is completely random for anonymization purposes.
	InstallIDKey = "install_id"

	RedisCreationDateKey = "streamdal_settings:creation_date"
)

Variables

View Source
var (
	ErrPipelineNotFound = errors.New("pipeline not found")
	ErrConfigNotFound   = errors.New("config not found")
)

Functions

func RedisActiveTailKey added in v0.0.9

func RedisActiveTailKey(tailID string) string

func RedisAudienceKey

func RedisAudienceKey(audience string) string

func RedisConfigKey

func RedisConfigKey(audience *protos.Audience, pipelineID string) string

func RedisLiveKey

func RedisLiveKey(session, node, audience string) string

func RedisNotificationAssocKey

func RedisNotificationAssocKey(pipelineID, configID string) string

func RedisNotificationConfigKey

func RedisNotificationConfigKey(configID string) string

func RedisPausedKey

func RedisPausedKey(audience, pipelineID string) string

func RedisPausedTailKey added in v0.0.9

func RedisPausedTailKey(tailID string) string

func RedisPipelineKey

func RedisPipelineKey(pipelineID string) string

func RedisRegisterKey

func RedisRegisterKey(session, node string) string

func RedisSchemaKey

func RedisSchemaKey(audience string) string

func RedisTelemetryAudience added in v0.0.10

func RedisTelemetryAudience(aud *protos.Audience) string

func RedisTelemetryRegistrationKey added in v0.0.10

func RedisTelemetryRegistrationKey(serviceName, os, sdk, arch string) string

Types

type IStore

type IStore interface {
	AddRegistration(ctx context.Context, req *protos.RegisterRequest) error
	DeleteRegistration(ctx context.Context, req *protos.DeregisterRequest) error
	RecordRegistration(ctx context.Context, req *protos.RegisterRequest) error
	SeenRegistration(ctx context.Context, req *protos.RegisterRequest) bool
	GetPipelines(ctx context.Context) (map[string]*protos.Pipeline, error)
	GetPipeline(ctx context.Context, pipelineID string) (*protos.Pipeline, error)
	GetConfig(ctx context.Context) (map[*protos.Audience][]string, error) // v: pipeline_id
	GetConfigByAudience(ctx context.Context, audience *protos.Audience) ([]string, error)
	GetLive(ctx context.Context) ([]*types.LiveEntry, error)
	GetPaused(ctx context.Context) (map[string]*types.PausedEntry, error)
	CreatePipeline(ctx context.Context, pipeline *protos.Pipeline) error
	AddAudience(ctx context.Context, req *protos.NewAudienceRequest) error
	DeleteAudience(ctx context.Context, req *protos.DeleteAudienceRequest) error
	DeletePipeline(ctx context.Context, pipelineID string) error
	UpdatePipeline(ctx context.Context, pipeline *protos.Pipeline) error
	AttachPipeline(ctx context.Context, req *protos.AttachPipelineRequest) error
	DetachPipeline(ctx context.Context, req *protos.DetachPipelineRequest) error
	PausePipeline(ctx context.Context, req *protos.PausePipelineRequest) error
	ResumePipeline(ctx context.Context, req *protos.ResumePipelineRequest) error
	IsPaused(ctx context.Context, audience *protos.Audience, pipelineID string) (bool, error)
	GetAudiences(ctx context.Context) ([]*protos.Audience, error)
	IsPipelineAttached(ctx context.Context, audience *protos.Audience, pipelineID string) (bool, error)
	GetNotificationConfig(ctx context.Context, req *protos.GetNotificationRequest) (*protos.NotificationConfig, error)
	GetNotificationConfigs(ctx context.Context) (map[string]*protos.NotificationConfig, error)
	GetNotificationConfigsByPipeline(ctx context.Context, pipelineID string) ([]*protos.NotificationConfig, error)
	CreateNotificationConfig(ctx context.Context, req *protos.CreateNotificationRequest) error
	UpdateNotificationConfig(ctx context.Context, req *protos.UpdateNotificationRequest) error
	DeleteNotificationConfig(ctx context.Context, req *protos.DeleteNotificationRequest) error
	AttachNotificationConfig(ctx context.Context, req *protos.AttachNotificationRequest) error
	DetachNotificationConfig(ctx context.Context, req *protos.DetachNotificationRequest) error
	GetAttachCommandsByService(ctx context.Context, serviceName string) ([]*protos.Command, error)
	GetPipelineUsage(ctx context.Context) ([]*PipelineUsage, error)
	GetActivePipelineUsage(ctx context.Context, pipelineID string) ([]*PipelineUsage, error)
	GetActiveTailCommandsByService(ctx context.Context, serviceName string) ([]*protos.Command, error)
	AddActiveTailRequest(ctx context.Context, req *protos.TailRequest) (string, error) // Returns key that tail req is stored under

	// GetAudiencesBySessionID returns all audiences for a given session id
	// This is needed to inject an inferschema pipeline for each announced audience
	// to the session via a goroutine in internal.Register()
	GetAudiencesBySessionID(ctx context.Context, sessionID string) ([]*protos.Audience, error)

	GetAudiencesByService(ctx context.Context, serviceName string) ([]*protos.Audience, error)

	// WatchKeys will watch for key changes for given key pattern; every time key
	// is updated, it will send a message on the return channel.
	// WatchKeys launches a dedicated goroutine for the watch - it can be stopped
	// via the provided context.
	WatchKeys(quitCtx context.Context, key string) chan struct{}

	AddSchema(ctx context.Context, req *protos.SendSchemaRequest) error

	GetSchema(ctx context.Context, aud *protos.Audience) (*protos.Schema, error)

	// GetInstallID returns the unique ID for this server cluster.
	// If an ID has not been set yet, a new one is generated and returned
	GetInstallID(ctx context.Context) (string, error)

	// GetCreationDate returns the creation date of this server cluster. This is used
	// for sending server_timestamp_created_seconds metric to telemetry
	GetCreationDate(ctx context.Context) (int64, error)

	// SetCreationDate sets the creation date of this server cluster
	SetCreationDate(ctx context.Context, ts int64) error

	// IsPipelineAttachedAny returns if pipeline is attached to any audience. Used for telemetry tags
	IsPipelineAttachedAny(ctx context.Context, pipelineID string) bool

	// PauseTailRequest pauses a tail request by its ID
	PauseTailRequest(ctx context.Context, req *protos.PauseTailRequest) (*protos.TailRequest, error)

	// ResumeTailRequest resumes a tail request by its ID
	ResumeTailRequest(ctx context.Context, req *protos.ResumeTailRequest) (*protos.TailRequest, error)

	// GetTailRequestById returns a TailRequest by its ID
	GetTailRequestById(ctx context.Context, tailID string) (*protos.TailRequest, error)
}

type Options

type Options struct {
	Encryption   encryption.IEncryption
	RedisBackend *redis.Client
	ShutdownCtx  context.Context
	NodeName     string
	SessionTTL   time.Duration
	Telemetry    statsd.Statter
	InstallID    string
}

type PipelineUsage

type PipelineUsage struct {
	PipelineId string
	Active     bool
	NodeName   string
	SessionId  string
	Audience   *protos.Audience
}

type Store

type Store struct {
	// contains filtered or unexported fields
}

func New

func New(opts *Options) (*Store, error)

func (*Store) AddActiveTailRequest

func (s *Store) AddActiveTailRequest(ctx context.Context, req *protos.TailRequest) (string, error)

func (*Store) AddAudience

func (s *Store) AddAudience(ctx context.Context, req *protos.NewAudienceRequest) error

func (*Store) AddRegistration

func (s *Store) AddRegistration(ctx context.Context, req *protos.RegisterRequest) error

func (*Store) AddSchema

func (s *Store) AddSchema(ctx context.Context, req *protos.SendSchemaRequest) error

func (*Store) AttachNotificationConfig

func (s *Store) AttachNotificationConfig(ctx context.Context, req *protos.AttachNotificationRequest) error

func (*Store) AttachPipeline

func (s *Store) AttachPipeline(ctx context.Context, req *protos.AttachPipelineRequest) error

func (*Store) CreateNotificationConfig

func (s *Store) CreateNotificationConfig(ctx context.Context, req *protos.CreateNotificationRequest) error

func (*Store) CreatePipeline

func (s *Store) CreatePipeline(ctx context.Context, pipeline *protos.Pipeline) error

func (*Store) DeleteAudience

func (s *Store) DeleteAudience(ctx context.Context, req *protos.DeleteAudienceRequest) error

func (*Store) DeleteNotificationConfig

func (s *Store) DeleteNotificationConfig(ctx context.Context, req *protos.DeleteNotificationRequest) error

func (*Store) DeletePipeline

func (s *Store) DeletePipeline(ctx context.Context, pipelineId string) error

func (*Store) DeleteRegistration

func (s *Store) DeleteRegistration(ctx context.Context, req *protos.DeregisterRequest) error

func (*Store) DetachNotificationConfig

func (s *Store) DetachNotificationConfig(ctx context.Context, req *protos.DetachNotificationRequest) error

func (*Store) DetachPipeline

func (s *Store) DetachPipeline(ctx context.Context, req *protos.DetachPipelineRequest) error

func (*Store) GetActivePipelineUsage

func (s *Store) GetActivePipelineUsage(ctx context.Context, pipelineID string) ([]*PipelineUsage, error)

GetActivePipelineUsage gets *ACTIVE* pipeline usage on the CURRENT node NOTE: This method is a helper for GetPipelineUsage().

func (*Store) GetActiveTailCommandsByService

func (s *Store) GetActiveTailCommandsByService(ctx context.Context, serviceName string) ([]*protos.Command, error)

func (*Store) GetAttachCommandsByService

func (s *Store) GetAttachCommandsByService(ctx context.Context, serviceName string) ([]*protos.Command, error)

func (*Store) GetAudiences

func (s *Store) GetAudiences(ctx context.Context) ([]*protos.Audience, error)

func (*Store) GetAudiencesByService

func (s *Store) GetAudiencesByService(ctx context.Context, serviceName string) ([]*protos.Audience, error)

func (*Store) GetAudiencesBySessionID

func (s *Store) GetAudiencesBySessionID(ctx context.Context, sessionID string) ([]*protos.Audience, error)

func (*Store) GetConfig

func (s *Store) GetConfig(ctx context.Context) (map[*protos.Audience][]string, error)

func (*Store) GetConfigByAudience

func (s *Store) GetConfigByAudience(ctx context.Context, audience *protos.Audience) ([]string, error)

GetConfigByAudience returns a list of pipeline IDs attached to given audience

func (*Store) GetCreationDate added in v0.0.10

func (s *Store) GetCreationDate(ctx context.Context) (int64, error)

func (*Store) GetInstallID added in v0.0.10

func (s *Store) GetInstallID(ctx context.Context) (string, error)

func (*Store) GetLive

func (s *Store) GetLive(ctx context.Context) ([]*types.LiveEntry, error)

func (*Store) GetNotificationConfig

func (s *Store) GetNotificationConfig(ctx context.Context, req *protos.GetNotificationRequest) (*protos.NotificationConfig, error)

func (*Store) GetNotificationConfigs

func (s *Store) GetNotificationConfigs(ctx context.Context) (map[string]*protos.NotificationConfig, error)

func (*Store) GetNotificationConfigsByPipeline

func (s *Store) GetNotificationConfigsByPipeline(ctx context.Context, pipelineID string) ([]*protos.NotificationConfig, error)

func (*Store) GetPaused

func (s *Store) GetPaused(ctx context.Context) (map[string]*types.PausedEntry, error)

func (*Store) GetPausedTailRequestById added in v0.0.9

func (s *Store) GetPausedTailRequestById(ctx context.Context, tailID string) (*protos.TailRequest, error)

func (*Store) GetPipeline

func (s *Store) GetPipeline(ctx context.Context, pipelineId string) (*protos.Pipeline, error)

func (*Store) GetPipelineUsage

func (s *Store) GetPipelineUsage(ctx context.Context) ([]*PipelineUsage, error)

GetPipelineUsage gets usage across the entire cluster

func (*Store) GetPipelines

func (s *Store) GetPipelines(ctx context.Context) (map[string]*protos.Pipeline, error)

func (*Store) GetSchema

func (s *Store) GetSchema(ctx context.Context, aud *protos.Audience) (*protos.Schema, error)

func (*Store) GetTailRequestById added in v0.0.9

func (s *Store) GetTailRequestById(ctx context.Context, tailID string) (*protos.TailRequest, error)

func (*Store) IsPaused

func (s *Store) IsPaused(ctx context.Context, audience *protos.Audience, pipelineID string) (bool, error)

IsPaused returns if pipeline is paused and if it exists

func (*Store) IsPipelineAttached

func (s *Store) IsPipelineAttached(ctx context.Context, audience *protos.Audience, pipelineID string) (bool, error)

func (*Store) IsPipelineAttachedAny added in v0.0.10

func (s *Store) IsPipelineAttachedAny(ctx context.Context, pipelineID string) bool

func (*Store) PausePipeline

func (s *Store) PausePipeline(ctx context.Context, req *protos.PausePipelineRequest) error

func (*Store) PauseTailRequest added in v0.0.9

func (s *Store) PauseTailRequest(ctx context.Context, req *protos.PauseTailRequest) (*protos.TailRequest, error)

func (*Store) RecordRegistration added in v0.0.10

func (s *Store) RecordRegistration(ctx context.Context, req *protos.RegisterRequest) error

func (*Store) ResumePipeline

func (s *Store) ResumePipeline(ctx context.Context, req *protos.ResumePipelineRequest) error

func (*Store) ResumeTailRequest added in v0.0.9

func (s *Store) ResumeTailRequest(ctx context.Context, req *protos.ResumeTailRequest) (*protos.TailRequest, error)

func (*Store) SeenRegistration added in v0.0.10

func (s *Store) SeenRegistration(ctx context.Context, req *protos.RegisterRequest) bool

func (*Store) SetCreationDate added in v0.0.10

func (s *Store) SetCreationDate(ctx context.Context, ts int64) error

func (*Store) UpdateNotificationConfig

func (s *Store) UpdateNotificationConfig(ctx context.Context, req *protos.UpdateNotificationRequest) error

func (*Store) UpdatePipeline

func (s *Store) UpdatePipeline(ctx context.Context, pipeline *protos.Pipeline) error

func (*Store) WatchKeys

func (s *Store) WatchKeys(quitCtx context.Context, key string) chan struct{}

WatchKeys will watch for key changes for given key pattern; every time key is updated, it will send a message on the return channel.

Directories

Path Synopsis

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL