internal

package
v0.6.0 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Dec 19, 2023 License: Apache-2.0 Imports: 21 Imported by: 0

Documentation

Index

Constants

View Source
const (
	LINUX_USER        = "edge-extractor"
	LINUX_BIN         = "/usr/local/bin/edge-extractor"
	LINUX_CONFIG_FILE = "/etc/edge-extractor/config.json"
	LINUX_LOG_DIR     = "/var/log/edge-extractor"
)
View Source
const (
	ProcessorStateRunning  = "RUNNING"
	ProcessorStateStarting = "STARTING"
	ProcessorStateShutdown = "SHUTDOWN"
	ProcessorStateStopped  = "STOPPED"
	ProcessorStateNotFound = "NOT_FOUND"
)
View Source
const ConfigSourceExtPipelines = "ext_pipeline_config"
View Source
const ConfigSourceLocal = "local"
View Source
const NewConfigAction = 1

const StartProcessorAction = 1

View Source
const RestartProcessorAction = 2
View Source
const StartProcessorLoopAction = 4
View Source
const StopProcessorAction = 3

Variables

View Source
var Key = ""

Functions

func DecryptString

func DecryptString(key, text string) (string, error)

func EncryptString

func EncryptString(key, text string) (string, error)

func GetBinaryDir

func GetBinaryDir() string

func PrepareLinuxServiceEnv

func PrepareLinuxServiceEnv() error

func RemoveLinuxServiceEnv

func RemoveLinuxServiceEnv() error

function removes edge-extractor linux user and group , removes edge-extractor binary from /usr/local/bin, removes config file from /etc/edge-extractor, removes log directory from /var/log/edge-extractor

func UpdateLinuxServiceBinary

func UpdateLinuxServiceBinary() error

Types

type CdfClient

type CdfClient struct {
	// contains filtered or unexported fields
}

func NewCdfClient

func NewCdfClient(projectName, cdfCluster, clientID, clientSecret string, scopes []string, azureTenantId, tokenUrl string, datasetId int) *CdfClient

func (*CdfClient) BasicUploadFileBody

func (co *CdfClient) BasicUploadFileBody(filePath, fileName, mimeType, uploadUrl string) error

func (*CdfClient) Client

func (co *CdfClient) Client() *cognite.Client

func (*CdfClient) CompareAssets

func (co *CdfClient) CompareAssets(asset1, asset2 core.Asset) bool

CompareAssets compares 2 assets and returs true if they are equal

func (*CdfClient) UploadFile

func (co *CdfClient) UploadFile(filePath, externalId, name, mimeType string, assetId uint64) error

func (*CdfClient) UploadInMemoryBody

func (co *CdfClient) UploadInMemoryBody(body []byte, fileName, mimeType, uploadUrl string) error

func (*CdfClient) UploadInMemoryFile

func (co *CdfClient) UploadInMemoryFile(body []byte, externalId, name, mimeType string, assetId uint64) error

func (*CdfClient) UploadMultipartFileBody

func (co *CdfClient) UploadMultipartFileBody(filePath, fileName, mimeType, uploadUrl string) error

UploadMultipartFileBody currently not supported by CDF

type CdfConfigObserver

type CdfConfigObserver struct {
	// contains filtered or unexported fields
}

func NewCdfConfigObserver

func NewCdfConfigObserver(extractorID string, cogClient *CdfClient, remoteConfigSource string, secretManager *SecretManager) *CdfConfigObserver

func (*CdfConfigObserver) Start

func (intgr *CdfConfigObserver) Start(reloadInterval time.Duration)

Start starts observer process using provided asset filter and reload interval. The operation is non-blocking

func (*CdfConfigObserver) Stop

func (intgr *CdfConfigObserver) Stop()

func (*CdfConfigObserver) SubscribeToConfigUpdates

func (intgr *CdfConfigObserver) SubscribeToConfigUpdates(name string, config interface{}) ConfigActionQueue

SubscribeToConfigUpdates registers Integration in config observer and returns config action queue that Integration can use to receive config updates The queue has capacity of 5 items. If queue is full , the oldest item will be dropped. This is done to avoid blocking of config observer Config events aren't filtered and it's responsibility of Integration to do change detection name - name of Integration config - pointer to Integration config struct

type ConfigAction

type ConfigAction struct {
	Name   int
	Config interface{}
	ProcId uint64
}

type ConfigActionQueue

type ConfigActionQueue chan ConfigAction

type ProcessorState

type ProcessorState struct {
	ID           uint64
	CurrentState string
	TargetState  string
}

type RemoteConfig

type RemoteConfig map[string]json.RawMessage

type SecretManager

type SecretManager struct {
	Key     string
	Secrets map[string]string // map of decrypted secrets
}

func NewSecretManager

func NewSecretManager(key string) *SecretManager

func (*SecretManager) GetEncryptedSecrets

func (sm *SecretManager) GetEncryptedSecrets() (map[string]string, error)

func (*SecretManager) GetSecret

func (sm *SecretManager) GetSecret(key string) string

returns secret either from internal secret store or from ENV variable if it is not found in the store. If secret is not found in ENV variable, returns key (plain text)

func (*SecretManager) LoadEncryptedSecrets

func (sm *SecretManager) LoadEncryptedSecrets(secrets map[string]string) error

LoadEncryptedSecrets loads secrets in encrypted form from map[string]string, decrypts them and stores in internal secret store

func (*SecretManager) LoadSecrets

func (sm *SecretManager) LoadSecrets(secrets map[string]string)

LoadSecrets loads secrets in plain text from map[string]string into internal secret store

type StateTracker

type StateTracker struct {
	// contains filtered or unexported fields
}

StateTracker keep track of current and target states for all processors

func NewStateTracker

func NewStateTracker() *StateTracker

func (*StateTracker) GetProcessorState

func (intgr *StateTracker) GetProcessorState(procId uint64) *ProcessorState

GetProcessorState public version of getProcessorState

func (*StateTracker) SetProcessorCurrentState

func (intgr *StateTracker) SetProcessorCurrentState(procId uint64, state string)

func (*StateTracker) SetProcessorTargetState

func (intgr *StateTracker) SetProcessorTargetState(procId uint64, state string)

func (*StateTracker) WaitForProcessorTargetState

func (intgr *StateTracker) WaitForProcessorTargetState(procId uint64, timeout time.Duration) bool

WaitForProcessorTargetState blocks execution untill processor reaches target or wait operation times out .

type StaticConfig

type StaticConfig struct {
	ProjectName          string
	CdfCluster           string
	AdTenantId           string
	AuthTokenUrl         string
	ClientID             string
	Secret               string
	Scopes               []string
	CdfDatasetID         int
	ExtractorID          string
	RemoteConfigSource   string // local, ext_pipeline_config
	ConfigReloadInterval time.Duration
	EnabledIntegrations  []string
	LogLevel             string
	LogDir               string

	Integrations map[string]json.RawMessage // map of integration configs (key is integration name, value is integration config)
	IsEncrypted  bool
	Secrets      map[string]string // map of encrypted secrets (key is secret name, value is encrypted secret)
}

func (*StaticConfig) DecryptSecrets

func (config *StaticConfig) DecryptSecrets(key string) error

func (*StaticConfig) EncryptSecrets

func (config *StaticConfig) EncryptSecrets(key string) error

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL