flink

package
v0.2.2 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Dec 7, 2023 License: Apache-2.0 Imports: 36 Imported by: 0

Documentation

Index

Constants

View Source
const (
	KindFlinkCluster = "FlinkCluster"
	// Flyte flink task type
	FlinkTaskType = "flink"
)

Variables

This section is empty.

Functions

func FlinkClusterTaskLogs added in v0.2.0

func FlinkClusterTaskLogs(ctx context.Context, config *Config, fi FlinkTaskLogsInput) ([]*core.TaskLog, error)

func GetJobArtifacts added in v0.2.0

func GetJobArtifacts(job *flinkIdl.FlinkJob) []string

func GroupByScheme

func GroupByScheme(artifacts []string) map[string][]string

func KubeClientConfig

func KubeClientConfig(host string, auth Auth) (*restclient.Config, error)

KubeClientConfig ...

func NewAnnotationPatch added in v0.1.12

func NewAnnotationPatch(key string, value string) (client.Patch, error)

func NewFlinkCluster

func NewFlinkCluster(config *Config, taskCtx FlinkTaskContext) (*flinkOp.FlinkCluster, error)

func Validate

func Validate(job *flinkIdl.FlinkJob) error

func ValidateRegEx

func ValidateRegEx(value string, r *regexp.Regexp) error

Types

type Annotations

type Annotations map[string]string

func GetDefaultAnnotations

func GetDefaultAnnotations(taskCtx pluginsCore.TaskExecutionMetadata) Annotations

type Auth

type Auth struct {
	TokenPath  string `json:"tokenPath" pflag:", Token path"`
	CaCertPath string `json:"caCertPath" pflag:", Certificate path"`
}

func (Auth) GetCA

func (auth Auth) GetCA() ([]byte, error)

func (Auth) GetToken

func (auth Auth) GetToken() (string, error)

type ClusterConfig

type ClusterConfig struct {
	Name     string `json:"name" pflag:",Friendly name of the remote cluster"`
	Endpoint string `json:"endpoint" pflag:", Remote K8s cluster endpoint"`
	Auth     Auth   `json:"auth" pflag:"-, Auth setting for the cluster"`
	Enabled  bool   `json:"enabled" pflag:", Boolean flag to enable or disable"`
}

type ClusterName

type ClusterName string

func NewClusterName

func NewClusterName(name string) (ClusterName, error)

func (ClusterName) String

func (cn ClusterName) String() string

func (ClusterName) Validate

func (cn ClusterName) Validate() error

type Config

type Config struct {
	DefaultFlinkCluster     flinkOp.FlinkCluster `json:"defaultFlinkCluster"`
	FlinkPropertiesOverride map[string]string    `json:"flinkPropertiesOverride" pflag:",Key value pairs of flink properties to be overridden in every FlinkJob"`
	LogConfig               logs.LogConfig       `json:"logs"`
	GeneratedNameMaxLength  *int                 `json:"generatedNameMaxLength" pflag:"Specifies the length of TaskExecutionID generated name. default: 50"`
	RemoteClusterConfig     ClusterConfig        `json:"remoteClusterConfig" pflag:"Configuration of remote K8s cluster for array jobs"`
	NonRetryableExitCodes   []int32              `json:"nonRetryableExitCodes" pfFlag:"Defines which job submitter exit codes should not be retried"`
	NonRetryableFlyteCode   *string              `json:"nonRetryableFlyteCode,omitempty" pfFlag:"Defines which code should be returned in case of nonRetryable exit codes"`
}

Config ... Flink-specific configs

func GetFlinkConfig

func GetFlinkConfig() *Config

type ContainerTemplateData added in v0.1.4

type ContainerTemplateData struct {
	ArtifactsByScheme map[string][]string
	Artifacts         []string
}

func NewContainerTemplateData added in v0.1.4

func NewContainerTemplateData(artifacts []string) *ContainerTemplateData

type FlinkCluster

type FlinkCluster flinkOp.FlinkCluster

type FlinkPropertiesTemplateData added in v0.1.5

type FlinkPropertiesTemplateData struct {
	Namespace   string
	ClusterName ClusterName
	Labels      map[string]string
}

func NewFlinkPropertiesTemplateData added in v0.1.5

func NewFlinkPropertiesTemplateData(namespace string, clusterName ClusterName, labels map[string]string) *FlinkPropertiesTemplateData

type FlinkTaskContext

type FlinkTaskContext struct {
	ClusterName ClusterName
	Namespace   string
	Annotations map[string]string
	Labels      map[string]string
	Job         flinkIdl.FlinkJob
}

func NewFlinkTaskContext

func NewFlinkTaskContext(ctx context.Context, taskCtx FlinkTaskExecContext) (*FlinkTaskContext, error)

type FlinkTaskExecContext added in v0.2.0

type FlinkTaskExecContext interface {
	TaskReader() pluginsCore.TaskReader
	TaskExecutionMetadata() pluginsCore.TaskExecutionMetadata
	InputReader() io.InputReader
}

type FlinkTaskLogsInput added in v0.2.0

type FlinkTaskLogsInput struct {
	ClusterName string
	Namespace   string
}

type Labels

type Labels map[string]string

func GetDefaultLabels

func GetDefaultLabels(taskCtx pluginsCore.TaskExecutionMetadata) Labels

type ObjectForPatch added in v0.1.12

type ObjectForPatch struct {
	Metadata ObjectMetaForPatch `json:"metadata"`
}

type ObjectMetaForPatch added in v0.1.12

type ObjectMetaForPatch struct {
	Annotations map[string]interface{} `json:"annotations"`
}

type Properties

type Properties map[string]string

func MergeProperties

func MergeProperties(maps ...Properties) Properties

func (Properties) GetInt

func (p Properties) GetInt(key string) (int, error)

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL