lib

package
v0.0.7 Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Feb 26, 2024 License: Apache-2.0 Imports: 23 Imported by: 0

Documentation

Index

Constants

View Source
// PipelineRunning is the string "running".
// NOTE(review): presumably the state label of a running pipeline; its use
// sites are not visible here — confirm against callers.
const PipelineRunning = "running"
View Source
// RequestDeviceId is the string "deviceId".
// NOTE(review): presumably a filter-type or request-field identifier for
// device inputs; its use sites are not visible here — confirm against callers.
const RequestDeviceId = "deviceId"
View Source
// RequestOperatorId is the string "operatorId".
// NOTE(review): presumably a filter-type or request-field identifier for
// operator inputs; its use sites are not visible here — confirm against callers.
const RequestOperatorId = "operatorId"

Variables

View Source
// ErrNotFound is a sentinel error whose message is "not found".
// Callers should test for it with errors.Is rather than comparing messages.
var ErrNotFound = errors.New("not found")
View Source
// ErrSomethingWentWrong is a sentinel error for an unspecified failure; its
// message attributes the failure to the container API.
// Callers should test for it with errors.Is rather than comparing messages.
var ErrSomethingWentWrong = errors.New("container API - something went wrong")
View Source
// ErrWorkloadNotFound is a sentinel error whose message states that an
// operator could not be deleted because its workload was not found.
// Callers should test for it with errors.Is rather than comparing messages.
var ErrWorkloadNotFound = errors.New("container API - could not delete operator: workload not found")

Functions

func CloseMQTTConnection

func CloseMQTTConnection()

func ConnectMQTTBroker

func ConnectMQTTBroker()

func GenerateFogOperatorStartCommand added in v0.0.4

func GenerateFogOperatorStartCommand(operator Operator, pipelineID string, inputTopics []operatorLib.InputTopic) operatorLib.StartOperatorControlCommand

func GetEnv

func GetEnv(key, fallback string) string

func IntInSlice

func IntInSlice(a int, list []int) bool

func StringInSlice

func StringInSlice(a string, list []string) bool

func ToJson

func ToJson(resp string) map[string]interface{}

Types

type Claims

// Claims holds the two token claims this package decodes: "sub" and
// "realm_access". Both are omitted from JSON output when empty.
//
// NOTE(review): the type has a Valid() error method, so it presumably serves
// as the claims type for a JWT library — confirm which one.
type Claims struct {
	// Sub is the "sub" claim (presumably the user ID — confirm).
	Sub         string              `json:"sub,omitempty"`
	// RealmAccess is the "realm_access" claim: string keys mapped to string lists.
	RealmAccess map[string][]string `json:"realm_access,omitempty"`
}

func (Claims) Valid

func (c Claims) Valid() error

type DownstreamConfig added in v0.0.4

// DownstreamConfig is the type of Operator.DownstreamConfig (JSON key
// "downstream"). Its fields have no json tags, so encoding/json uses the Go
// field names "Enabled" and "InstanceID" as keys.
type DownstreamConfig struct {
	// Enabled switches the downstream configuration on or off.
	Enabled    bool
	// InstanceID identifies an instance.
	// NOTE(review): presumably the kafka2mqtt instance ID — confirm.
	InstanceID string
}

type Driver

// Driver is the interface NewFlowEngine takes as its first argument. It
// covers creating and deleting operators for a pipeline and querying a
// pipeline's status.
// NOTE(review): presumably implemented once per container platform — the
// implementations are not visible here.
type Driver interface {
	// CreateOperators receives the pipeline ID, the operators to create and the
	// pipeline-wide configuration; it returns an error on failure.
	CreateOperators(pipelineId string, input []Operator, pipelineConfig PipelineConfig) error
	// DeleteOperator receives the pipeline ID and the operator to delete; it
	// returns an error on failure.
	DeleteOperator(pipelineId string, input Operator) error
	// GetPipelineStatus returns the PipelineStatus for the given pipeline ID.
	GetPipelineStatus(pipelineId string) (PipelineStatus, error)
}

type FlowEngine

type FlowEngine struct {
	// contains filtered or unexported fields
}

func NewFlowEngine

func NewFlowEngine(
	driver Driver,
	parsingService ParsingApiService,
	permissionService PermissionApiService,
	kafak2mqttService Kafka2MqttApiService) *FlowEngine

func (*FlowEngine) DeletePipeline

func (f *FlowEngine) DeletePipeline(id string, userId string, token string) (err error)

func (*FlowEngine) GetPipelineStatus

func (f *FlowEngine) GetPipelineStatus(id, userId, token string) (PipelineStatus, error)

func (*FlowEngine) StartPipeline

func (f *FlowEngine) StartPipeline(pipelineRequest PipelineRequest, userId string, token string) (pipeline Pipeline, err error)

func (*FlowEngine) UpdatePipeline

func (f *FlowEngine) UpdatePipeline(pipelineRequest PipelineRequest, userId string, token string) (pipeline Pipeline, err error)

type InputSelection

// InputSelection is the element type of Operator.InputSelections and
// PipelineNode.InputSelections. It names an input and carries aspect,
// function, characteristic and selectable IDs. All fields are omitted from
// JSON output when empty.
// NOTE(review): the meaning of the individual IDs is defined outside this
// file — confirm against the consuming services.
type InputSelection struct {
	InputName         string   `json:"inputName,omitempty"`
	AspectId          string   `json:"aspectId,omitempty"`
	FunctionId        string   `json:"functionId,omitempty"`
	CharacteristicIds []string `json:"characteristicIds,omitempty"`
	SelectableId      string   `json:"selectableId,omitempty"`
}

type InputTopic

// InputTopic is the element type of Operator.InputTopics and
// OperatorRequestConfig.InputTopics. It holds a topic name, a filter (type
// plus up to two values) and a list of field mappings. All fields are omitted
// from JSON output when empty.
//
// This is a different type from operatorLib.InputTopic, which
// GenerateFogOperatorStartCommand takes.
type InputTopic struct {
	Name         string    `json:"name,omitempty"`
	FilterType   string    `json:"filterType,omitempty"`
	FilterValue  string    `json:"filterValue,omitempty"`
	FilterValue2 string    `json:"filterValue2,omitempty"`
	Mappings     []Mapping `json:"mappings,omitempty"`
}

type Kafka2MqttApiService added in v0.0.4

// Kafka2MqttApiService is the client interface NewFlowEngine takes for the
// kafka2mqtt service. It starts and removes kafka2mqtt instances on behalf of
// a user.
type Kafka2MqttApiService interface {
	// StartOperatorInstance starts an instance for the named operator of the
	// given pipeline, acting as userID with the supplied token, and returns the
	// resulting kafka2mqtt_api.Instance.
	StartOperatorInstance(operatorName, operatorID, pipelineID, userID, token string) (kafka2mqtt_api.Instance, error)
	// RemoveInstance removes the instance with the given id that belongs to
	// pipelineID, acting as userID with the supplied token.
	RemoveInstance(id, pipelineID, userID, token string) error
}

type LoggerMiddleWare

type LoggerMiddleWare struct {
	// contains filtered or unexported fields
}

func NewLogger

func NewLogger(handler http.Handler, logLevel string) *LoggerMiddleWare

func (*LoggerMiddleWare) ServeHTTP

func (this *LoggerMiddleWare) ServeHTTP(w http.ResponseWriter, r *http.Request)

type Mapping

// Mapping is the element type of InputTopic.Mappings: a pair of destination
// and source names. Both fields are omitted from JSON output when empty.
type Mapping struct {
	Dest   string `json:"dest,omitempty"`
	Source string `json:"source,omitempty"`
}

type NodeConfig

// NodeConfig is the element type of PipelineNode.Config: one name/value
// setting. Both fields are omitted from JSON output when empty.
type NodeConfig struct {
	Name  string `json:"name,omitempty"`
	Value string `json:"value,omitempty"`
}

type NodeInput

// NodeInput is the element type of PipelineNode.Inputs. It holds a filter, a
// topic name and the values to read. All fields are omitted from JSON output
// when empty.
type NodeInput struct {
	FilterType string      `json:"filterType,omitempty"`
	// NOTE(review): FilterIds is a single string despite the plural name;
	// presumably a delimited list of IDs — confirm the format with callers.
	FilterIds  string      `json:"filterIds,omitempty"`
	TopicName  string      `json:"topicName,omitempty"`
	Values     []NodeValue `json:"values,omitempty"`
}

type NodeValue

// NodeValue is the element type of NodeInput.Values: a name paired with a
// path. Both fields are omitted from JSON output when empty.
type NodeValue struct {
	Name string `json:"name,omitempty"`
	Path string `json:"path,omitempty"`
}

type Operator

// Operator describes one operator of a pipeline. It is the element type of
// Pipeline.Operators and the unit that Driver.CreateOperators and
// Driver.DeleteOperator work on.
//
// JSON encoding notes:
//   - InputTopics has no json tag, so it is encoded under the Go field name
//     "InputTopics", unlike OperatorRequestConfig.InputTopics, which uses
//     "inputTopics".
//   - Cost has no omitempty option and is always emitted, even when 0.
//   - UpstreamConfig and DownstreamConfig are struct values; encoding/json
//     never treats a struct as empty, so omitempty has no effect on them and
//     "upstream"/"downstream" are always emitted.
//   - NOTE(review): if uuid.UUID is an array type, omitempty likewise has no
//     effect on ApplicationId — confirm against the uuid package imported.
type Operator struct {
	Id               string            `json:"id,omitempty"`
	Name             string            `json:"name,omitempty"`
	ApplicationId    uuid.UUID         `json:"applicationId,omitempty"`
	ImageId          string            `json:"imageId,omitempty"`
	DeploymentType   string            `json:"deploymentType,omitempty"`
	OperatorId       string            `json:"operatorId,omitempty"`
	Config           map[string]string `json:"config,omitempty"`
	OutputTopic      string            `json:"outputTopic,omitempty"`
	PersistData      bool              `json:"persistData,omitempty"`
	InputTopics      []InputTopic
	InputSelections  []InputSelection `json:"inputSelections,omitempty"`
	Cost             uint             `json:"cost"`
	UpstreamConfig   UpstreamConfig   `json:"upstream,omitempty"`
	DownstreamConfig DownstreamConfig `json:"downstream,omitempty"`
}

type OperatorRequestConfig

// OperatorRequestConfig bundles an operator's string configuration map with
// its input topics. Both fields are omitted from JSON output when empty.
// NOTE(review): presumably the payload sent when requesting an operator
// start — its use sites are not visible here.
type OperatorRequestConfig struct {
	Config      map[string]string `json:"config,omitempty"`
	InputTopics []InputTopic      `json:"inputTopics,omitempty"`
}

type ParsingApiService

// ParsingApiService is the client interface NewFlowEngine takes for looking up
// pipeline definitions from the parsing API.
type ParsingApiService interface {
	// GetPipeline returns the parsing_api.Pipeline for the given id, queried on
	// behalf of userId with the supplied authorization value.
	GetPipeline(id string, userId string, authorization string) (p parsing_api.Pipeline, err error)
}

type PermissionApiService

// PermissionApiService is the client interface NewFlowEngine takes for
// permission checks.
type PermissionApiService interface {
	// UserHasDevicesReadAccess reports whether the caller identified by
	// authorization has read access to the devices with the given ids.
	// NOTE(review): presumably true only if access holds for every id — confirm
	// against the implementation.
	UserHasDevicesReadAccess(ids []string, authorization string) (bool, error)
}

type Pipeline

// Pipeline is the value returned by FlowEngine.StartPipeline and
// FlowEngine.UpdatePipeline: the pipeline's identity, its settings and its
// operators.
//
// All fields carry omitempty, so false booleans and a zero WindowTime are
// dropped from JSON output.
// NOTE(review): if uuid.UUID is an array type, omitempty has no effect on Id
// and "id" is always emitted — confirm against the uuid package imported.
type Pipeline struct {
	Id                 uuid.UUID  `json:"id,omitempty"`
	FlowId             string     `json:"flowId,omitempty"`
	Name               string     `json:"name,omitempty"`
	Description        string     `json:"description,omitempty"`
	Image              string     `json:"image,omitempty"`
	WindowTime         int        `json:"windowTime,omitempty"`
	MergeStrategy      string     `json:"mergeStrategy,omitempty"`
	ConsumeAllMessages bool       `json:"consumeAllMessages,omitempty"`
	Metrics            bool       `json:"metrics,omitempty"`
	Operators          []Operator `json:"operators,omitempty"`
}

type PipelineConfig

// PipelineConfig carries the pipeline-wide settings passed to
// Driver.CreateOperators alongside the operators. Its fields have no json
// tags.
// NOTE(review): the unit of WindowTime and the accepted values of
// MergeStrategy and ConsumerOffset are not visible here — confirm against the
// driver implementations.
type PipelineConfig struct {
	WindowTime     int
	MergeStrategy  string
	Metrics        bool
	ConsumerOffset string
	FlowId         string
	PipelineId     string
	UserId         string
}

type PipelineNode

// PipelineNode is the element type of PipelineRequest.Nodes: the per-node
// part of a pipeline request, holding the node's inputs, configuration and
// input selections. All fields are omitted from JSON output when empty.
type PipelineNode struct {
	// The tag must not have a space after the comma: encoding/json matches
	// options exactly, so " omitempty" is not recognised and go vet flags it.
	NodeId          string           `json:"nodeId,omitempty"`
	Inputs          []NodeInput      `json:"inputs,omitempty"`
	Config          []NodeConfig     `json:"config,omitempty"`
	InputSelections []InputSelection `json:"inputSelections,omitempty"`
	PersistData     bool             `json:"persistData,omitempty"`
}

type PipelineRequest

// PipelineRequest is the input to FlowEngine.StartPipeline and
// FlowEngine.UpdatePipeline: the flow to run, the pipeline's settings and the
// per-node configuration.
//
// Id is a plain string here, whereas Pipeline.Id is a uuid.UUID. All fields
// carry omitempty, so false booleans and a zero WindowTime are dropped from
// JSON output.
type PipelineRequest struct {
	Id                 string         `json:"id,omitempty"`
	FlowId             string         `json:"flowId,omitempty"`
	Name               string         `json:"name,omitempty"`
	Description        string         `json:"description,omitempty"`
	WindowTime         int            `json:"windowTime,omitempty"`
	MergeStrategy      string         `json:"mergeStrategy,omitempty"`
	ConsumeAllMessages bool           `json:"consumeAllMessages,omitempty"`
	Metrics            bool           `json:"metrics,omitempty"`
	Nodes              []PipelineNode `json:"nodes,omitempty"`
}

type PipelineResponse

// PipelineResponse is a response body that carries only a pipeline ID under
// the JSON key "id".
// NOTE(review): if uuid.UUID is an array type, omitempty has no effect and
// "id" is always emitted — confirm against the uuid package imported.
type PipelineResponse struct {
	Id uuid.UUID `json:"id,omitempty"`
}

type PipelineStatus added in v0.0.5

// PipelineStatus is the value returned by Driver.GetPipelineStatus and
// FlowEngine.GetPipelineStatus. None of its tags use omitempty, so all three
// keys are always present in JSON output.
type PipelineStatus struct {
	// Running reports whether the pipeline is running.
	Running       bool   `json:"running"`
	// Transitioning reports whether the pipeline is changing state.
	Transitioning bool   `json:"transitioning"`
	// Message is a free-text status message.
	Message       string `json:"message"`
}

type Response

// Response is a generic response body that carries a single message. The
// "message" key is omitted from JSON output when Message is empty.
type Response struct {
	Message string `json:"message,omitempty"`
}

type UpstreamConfig added in v0.0.4

// UpstreamConfig is the type of Operator.UpstreamConfig (JSON key
// "upstream"). Its field has no json tag, so encoding/json uses the Go field
// name "Enabled" as the key.
type UpstreamConfig struct {
	// Enabled switches the upstream configuration on or off.
	Enabled bool
}

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL