runner

package
v0.0.0-...-ec16f60 Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Apr 15, 2021 | License: Apache-2.0 | Imports: 18 | Imported by: 6

Documentation

Overview

Copyright 2018 StreamSets Inc. Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at

http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License.

Copyright 2018 StreamSets Inc. Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at

http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License.

Copyright 2018 StreamSets Inc. Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at

http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License.

Copyright 2018 StreamSets Inc. Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at

http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License.

Copyright 2018 StreamSets Inc. Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at

http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License.

Copyright 2018 StreamSets Inc. Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at

http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License.

Copyright 2018 StreamSets Inc. Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at

http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License.

Copyright 2018 StreamSets Inc. Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at

http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License.

Copyright 2018 StreamSets Inc. Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at

http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License.

Copyright 2018 StreamSets Inc. Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at

http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License.

Index

Constants

View Source
// Stage-name and configuration/metadata key strings.
// NOTE(review): judging by the names, these relate to reporting pipeline
// statistics and time series to DPM; their usages are not visible in this
// listing — confirm against the callers before relying on that.
const (
	// Looks like a stage name (underscore-separated, Java-package style) for
	// the "stats DPM directly" target — TODO confirm where it is matched.
	STATS_DPM_DIRECTLY_TARGET        = "com_streamsets_pipeline_stage_destination_devnull_StatsDpmDirectlyDTarget"
	// Keys whose value equals their own name; presumably looked up in a stage
	// or pipeline configuration — verify against caller.
	REMOTE_TIMESERIES_URL            = "REMOTE_TIMESERIES_URL"
	PIPELINE_COMMIT_ID               = "PIPELINE_COMMIT_ID"
	JOB_ID                           = "JOB_ID"
	UPDATE_WAIT_TIME_MS              = "UPDATE_WAIT_TIME_MS"
	// Dotted keys for the pipeline commit ID and the job ID; presumably
	// metadata keys — TODO confirm.
	DPM_PIPELINE_COMMIT_ID           = "dpm.pipeline.commitId"
	DPM_JOB_ID                       = "dpm.job.id"
	// Two spellings of a time-series-analysis identifier: per the names, one
	// is a parameter ID and the other a metadata ID.
	TIME_SERIES_ANALYSIS_PARAM_ID    = "TIME_SERIES_ANALYSIS"
	TIME_SERIES_ANALYSIS_METADATA_ID = "timeSeriesAnalysis"
)
View Source
// Name suffixes, each starting with a dot.
// NOTE(review): presumably appended to a stage instance name to form the
// names of per-stage metrics (record counts, stage errors, batch processing)
// — the code that builds those names is not visible here; confirm.
const (
	InputRecords    = ".inputRecords"
	OutputRecords   = ".outputRecords"
	ErrorRecords    = ".errorRecords"
	StageErrors     = ".stageErrors"
	BatchProcessing = ".batchProcessing"
)
View Source
// Pipeline-level string constants plus one numeric limit.
const (
	// AtMostOnce and AtLeastOnce are the strings "AT_MOST_ONCE" and
	// "AT_LEAST_ONCE"; presumably the accepted delivery-guarantee values of a
	// pipeline configuration — TODO confirm.
	AtMostOnce                    = "AT_MOST_ONCE"
	AtLeastOnce                   = "AT_LEAST_ONCE"
	// Names under the "pipeline." prefix; presumably the keys under which the
	// pipeline-wide metrics are registered — verify against the metrics setup.
	PipelineBatchProcessing       = "pipeline.batchProcessing"
	PipelineBatchCount            = "pipeline.batchCount"
	PipelineBatchInputRecords     = "pipeline.batchInputRecords"
	PipelineBatchOutputRecords    = "pipeline.batchOutputRecords"
	PipelineBatchErrorRecords     = "pipeline.batchErrorRecords"
	PipelineBatchErrorMessages    = "pipeline.batchErrorMessages"
	PipelineInputRecordsPerBatch  = "pipeline.inputRecordsPerBatch"
	PipelineOutputRecordsPerBatch = "pipeline.outputRecordsPerBatch"
	PipelineErrorRecordsPerBatch  = "pipeline.errorRecordsPerBatch"
	PipelineErrorsPerBatch        = "pipeline.errorsPerBatch"
	// MaxCountInCache is the untyped integer constant 10.
	// NOTE(review): which cache it bounds is not visible here — confirm.
	MaxCountInCache               = 10
)
View Source
const (
	// IssueErrorTemplate is a format string with two %s verbs, each wrapped in
	// single quotes: the first follows "Initialization Error", the second
	// follows "on Instance :". The string ends with a trailing space.
	IssueErrorTemplate = "Initialization Error '%s' on Instance : '%s' "
)

Variables

View Source
// Lists of pipeline status values taken from the common package.
var (
	// RestOffsetDisallowedStatuses lists the FINISHING, RETRY, RUNNING,
	// STARTING and STOPPING statuses.
	// NOTE(review): "Rest" is presumably a misspelling of "Reset", i.e. the
	// statuses in which resetting the offset is refused — confirm against
	// ResetOffset; the name is exported and therefore left unchanged.
	RestOffsetDisallowedStatuses = []string{
		common.FINISHING,
		common.RETRY,
		common.RUNNING,
		common.STARTING,
		common.STOPPING,
	}
	// UpdateOffsetAllowedStatuses lists the EDITED, FINISHED and STOPPED
	// statuses; presumably the only statuses in which an offset update is
	// accepted — verify against the code that commits offsets.
	UpdateOffsetAllowedStatuses = []string{
		common.EDITED,
		common.FINISHED,
		common.STOPPED,
	}
)

Functions

func NewEdgeRunner

func NewEdgeRunner(
	pipelineId string,
	config execution.Config,
	runtimeInfo *common.RuntimeInfo,
	pipelineStoreTask pipelineStore.PipelineStoreTask,
) (execution.Runner, error)

Types

type BatchImpl

type BatchImpl struct {
	// contains filtered or unexported fields
}

func NewBatchImpl

func NewBatchImpl(instanceName string, records []api.Record, sourceOffset *string) *BatchImpl

func (*BatchImpl) GetRecords

func (b *BatchImpl) GetRecords() []api.Record

func (*BatchImpl) GetSourceOffset

func (b *BatchImpl) GetSourceOffset() *string

type BatchMakerImpl

type BatchMakerImpl struct {
	StageOutputSnapshot map[string][]api.Record
	// contains filtered or unexported fields
}

func NewBatchMakerImpl

func NewBatchMakerImpl(stagePipe StagePipe, keepSnapshot bool) *BatchMakerImpl

func (*BatchMakerImpl) AddRecord

func (b *BatchMakerImpl) AddRecord(record api.Record, outputLanes ...string)

func (*BatchMakerImpl) GetLanes

func (b *BatchMakerImpl) GetLanes() []string

func (*BatchMakerImpl) GetSize

func (b *BatchMakerImpl) GetSize() int64

func (*BatchMakerImpl) GetStageOutput

func (b *BatchMakerImpl) GetStageOutput(outputLane ...string) []api.Record

type EdgeRunner

type EdgeRunner struct {
	// contains filtered or unexported fields
}

func (*EdgeRunner) CommitOffset

func (edgeRunner *EdgeRunner) CommitOffset(sourceOffset common.SourceOffset) error

func (*EdgeRunner) GetErrorMessages

func (edgeRunner *EdgeRunner) GetErrorMessages(stageInstanceName string, size int) ([]api.ErrorMessage, error)

func (*EdgeRunner) GetErrorRecords

func (edgeRunner *EdgeRunner) GetErrorRecords(stageInstanceName string, size int) ([]api.Record, error)

func (*EdgeRunner) GetHistory

func (edgeRunner *EdgeRunner) GetHistory() ([]*common.PipelineState, error)

func (*EdgeRunner) GetMetrics

func (edgeRunner *EdgeRunner) GetMetrics() (metrics.Registry, error)

func (*EdgeRunner) GetOffset

func (edgeRunner *EdgeRunner) GetOffset() (common.SourceOffset, error)

func (*EdgeRunner) GetPipelineConfig

func (edgeRunner *EdgeRunner) GetPipelineConfig() common.PipelineConfiguration

func (*EdgeRunner) GetStatus

func (edgeRunner *EdgeRunner) GetStatus() (*common.PipelineState, error)

func (*EdgeRunner) IsRemotePipeline

func (edgeRunner *EdgeRunner) IsRemotePipeline() bool

func (*EdgeRunner) ResetOffset

func (edgeRunner *EdgeRunner) ResetOffset() error

func (*EdgeRunner) StartPipeline

func (edgeRunner *EdgeRunner) StartPipeline(
	runtimeParameters map[string]interface{},
) (*common.PipelineState, error)

func (*EdgeRunner) StopPipeline

func (edgeRunner *EdgeRunner) StopPipeline() (*common.PipelineState, error)

type FullPipeBatch

type FullPipeBatch struct {
	StageOutputSnapshot []execution.StageOutput
	// contains filtered or unexported fields
}

func (*FullPipeBatch) CompleteStage

func (b *FullPipeBatch) CompleteStage(batchMaker *BatchMakerImpl)

func (*FullPipeBatch) GetBatch

func (b *FullPipeBatch) GetBatch(pipe StagePipe) *BatchImpl

func (*FullPipeBatch) GetBatchSize

func (b *FullPipeBatch) GetBatchSize() int

func (*FullPipeBatch) GetErrorMessages

func (b *FullPipeBatch) GetErrorMessages() int64

func (*FullPipeBatch) GetErrorRecords

func (b *FullPipeBatch) GetErrorRecords() int64

func (*FullPipeBatch) GetErrorSink

func (b *FullPipeBatch) GetErrorSink() *common.ErrorSink

func (*FullPipeBatch) GetEventRecords

func (b *FullPipeBatch) GetEventRecords() int64

func (*FullPipeBatch) GetEventSink

func (b *FullPipeBatch) GetEventSink() *common.EventSink

func (*FullPipeBatch) GetInputRecords

func (b *FullPipeBatch) GetInputRecords() int64

func (*FullPipeBatch) GetOutputRecords

func (b *FullPipeBatch) GetOutputRecords() int64

func (*FullPipeBatch) GetPreviousOffset

func (b *FullPipeBatch) GetPreviousOffset() *string

func (*FullPipeBatch) GetSnapshotsOfAllStagesOutput

func (b *FullPipeBatch) GetSnapshotsOfAllStagesOutput() []execution.StageOutput

func (*FullPipeBatch) OverrideStageOutput

func (b *FullPipeBatch) OverrideStageOutput(pipe Pipe, stageOutput *execution.StageOutput)

func (*FullPipeBatch) SetNewOffset

func (b *FullPipeBatch) SetNewOffset(newOffset *string)

func (*FullPipeBatch) StartStage

func (b *FullPipeBatch) StartStage(pipe StagePipe) *BatchMakerImpl

type MetricsEventRunnable

type MetricsEventRunnable struct {
	// contains filtered or unexported fields
}

func NewMetricsEventRunnable

func NewMetricsEventRunnable(
	pipelineId string,
	pipelineConfig common.PipelineConfiguration,
	pipelineBean creation.PipelineBean,
	metricRegistry metrics.Registry,
	runtimeInfo *common.RuntimeInfo,
) *MetricsEventRunnable

func (*MetricsEventRunnable) Run

func (m *MetricsEventRunnable) Run()

func (*MetricsEventRunnable) Stop

func (m *MetricsEventRunnable) Stop()

type Pipe

// Pipe is the method set shared by pipeline pipes: lifecycle methods (Init,
// Process, Destroy), stage-kind predicates, and accessors for the instance
// name, stage context and lanes.
type Pipe interface {
	// Init returns a slice of validation issues.
	// NOTE(review): presumably empty when initialization succeeds — confirm.
	Init() []validation.Issue
	// Process takes a PipeBatch and returns an error.
	Process(pipeBatch PipeBatch) error
	// Destroy takes no arguments and returns nothing.
	Destroy()
	// IsSource, IsProcessor and IsTarget each report a boolean; per their
	// names, they tell which kind of stage the pipe wraps.
	IsSource() bool
	IsProcessor() bool
	IsTarget() bool
	// GetInstanceName returns a string; per the name, the instance name.
	GetInstanceName() string
	// GetStageContext returns an api.StageContext.
	GetStageContext() api.StageContext
	// GetOutputLanes and GetEventLanes each return a slice of lane names.
	GetOutputLanes() []string
	GetEventLanes() []string
}

func NewStagePipe

func NewStagePipe(stage StageRuntime, config execution.Config) Pipe

type PipeBatch

// PipeBatch is the batch-level method set passed to Pipe.Process: offset
// accessors, per-stage batch hand-off, error/event sinks, record counters and
// stage-output snapshot access.
type PipeBatch interface {
	// GetBatchSize returns an int; per the name, the batch size.
	GetBatchSize() int
	// GetPreviousOffset returns a *string and SetNewOffset accepts one.
	// NOTE(review): whether a nil offset is meaningful is not visible here —
	// confirm.
	GetPreviousOffset() *string
	SetNewOffset(offset *string)
	// GetBatch returns the *BatchImpl for the given stage pipe.
	GetBatch(pipe StagePipe) *BatchImpl
	// StartStage returns a *BatchMakerImpl for the given stage pipe;
	// CompleteStage accepts a *BatchMakerImpl.
	// NOTE(review): presumably the pair brackets one stage's processing of the
	// batch — confirm.
	StartStage(pipe StagePipe) *BatchMakerImpl
	CompleteStage(batchMaker *BatchMakerImpl)
	// GetErrorSink and GetEventSink return pointers to the common.ErrorSink
	// and common.EventSink.
	GetErrorSink() *common.ErrorSink
	GetEventSink() *common.EventSink
	// Counters, each returned as int64; per their names, the numbers of input,
	// output, event and error records and of error messages.
	GetInputRecords() int64
	GetOutputRecords() int64
	GetEventRecords() int64
	GetErrorRecords() int64
	GetErrorMessages() int64
	// OverrideStageOutput takes a pipe and a *execution.StageOutput.
	OverrideStageOutput(pipe Pipe, stageOutput *execution.StageOutput)
	// GetSnapshotsOfAllStagesOutput returns a slice of execution.StageOutput.
	GetSnapshotsOfAllStagesOutput() []execution.StageOutput
}

func NewFullPipeBatch

func NewFullPipeBatch(
	tracker execution.SourceOffsetTracker,
	batchSize int,
	errorSink *common.ErrorSink,
	eventSink *common.EventSink,
	snapshotStagesOutput bool,
) PipeBatch

type Pipeline

type Pipeline struct {
	MetricRegistry metrics.Registry
	// contains filtered or unexported fields
}

func NewPipeline

func NewPipeline(
	config execution.Config,
	pipelineConfig common.PipelineConfiguration,
	sourceOffsetTracker execution.SourceOffsetTracker,
	runtimeParameters map[string]interface{},
	metricRegistry metrics.Registry,
) (*Pipeline, []validation.Issue)

func (*Pipeline) GetErrorMessages

func (p *Pipeline) GetErrorMessages(stageInstanceName string, size int) ([]api.ErrorMessage, error)

func (*Pipeline) GetErrorRecords

func (p *Pipeline) GetErrorRecords(stageInstanceName string, size int) ([]api.Record, error)

func (*Pipeline) Init

func (p *Pipeline) Init() []validation.Issue

func (*Pipeline) Run

func (p *Pipeline) Run()

func (*Pipeline) Stop

func (p *Pipeline) Stop()

type ProductionPipeline

// ProductionPipeline groups a pipeline configuration, a *Pipeline and a
// metrics registry; all three fields are exported.
type ProductionPipeline struct {
	// PipelineConfig is held by value.
	PipelineConfig common.PipelineConfiguration
	// Pipeline is a pointer to the Pipeline.
	// NOTE(review): presumably the Init/Run/Stop methods delegate to it —
	// their bodies are not visible here; confirm.
	Pipeline       *Pipeline
	// MetricRegistry is a metrics.Registry.
	MetricRegistry metrics.Registry
}

func NewProductionPipeline

func NewProductionPipeline(
	pipelineId string,
	config execution.Config,
	runner execution.Runner,
	pipelineConfiguration common.PipelineConfiguration,
	runtimeParameters map[string]interface{},
) (*ProductionPipeline, []validation.Issue)

func (*ProductionPipeline) Init

func (p *ProductionPipeline) Init() []validation.Issue

func (*ProductionPipeline) Run

func (p *ProductionPipeline) Run()

func (*ProductionPipeline) Stop

func (p *ProductionPipeline) Stop()

type ProductionSourceOffsetTracker

type ProductionSourceOffsetTracker struct {
	// contains filtered or unexported fields
}

func NewProductionSourceOffsetTracker

func NewProductionSourceOffsetTracker(pipelineId string) (*ProductionSourceOffsetTracker, error)

func (*ProductionSourceOffsetTracker) CommitOffset

func (o *ProductionSourceOffsetTracker) CommitOffset() error

func (*ProductionSourceOffsetTracker) GetLastBatchTime

func (o *ProductionSourceOffsetTracker) GetLastBatchTime() time.Time

func (*ProductionSourceOffsetTracker) GetOffset

func (o *ProductionSourceOffsetTracker) GetOffset() *string

func (*ProductionSourceOffsetTracker) IsFinished

func (o *ProductionSourceOffsetTracker) IsFinished() bool

func (*ProductionSourceOffsetTracker) SetOffset

func (o *ProductionSourceOffsetTracker) SetOffset(newOffset *string)

type SDCMetrics

// SDCMetrics is a metrics payload with JSON tags on every field; the JSON
// names are timestamp, metadata, sdcId, aggregated, masterSdcId and metrics.
// NOTE(review): presumably the body sent to the remote time-series endpoint
// by MetricsEventRunnable — the sending code is not visible here; confirm.
type SDCMetrics struct {
	// Timestamp is an int64. NOTE(review): unit (seconds vs. milliseconds) is
	// not visible here — confirm.
	Timestamp   int64             `json:"timestamp"`
	// Metadata is a string-to-string map.
	Metadata    map[string]string `json:"metadata"`
	// SdcId is a string; per the name, an SDC identifier.
	SdcId       string            `json:"sdcId"`
	// Aggregated is a boolean flag.
	Aggregated  bool              `json:"aggregated"`
	// MasterSdcId is a string; per the name, a master SDC identifier.
	MasterSdcId string            `json:"masterSdcId"`
	// Metrics holds the metrics as a util.MetricsJson value.
	Metrics     util.MetricsJson  `json:"metrics"`
}

type StagePipe

type StagePipe struct {
	Stage       StageRuntime
	InputLanes  []string
	OutputLanes []string
	EventLanes  []string
	// contains filtered or unexported fields
}

func (*StagePipe) Destroy

func (s *StagePipe) Destroy()

func (*StagePipe) GetEventLanes

func (s *StagePipe) GetEventLanes() []string

func (*StagePipe) GetInstanceName

func (s *StagePipe) GetInstanceName() string

func (*StagePipe) GetOutputLanes

func (s *StagePipe) GetOutputLanes() []string

func (*StagePipe) GetStageContext

func (s *StagePipe) GetStageContext() api.StageContext

func (*StagePipe) Init

func (s *StagePipe) Init() []validation.Issue

func (*StagePipe) IsProcessor

func (s *StagePipe) IsProcessor() bool

func (*StagePipe) IsSource

func (s *StagePipe) IsSource() bool

func (*StagePipe) IsTarget

func (s *StagePipe) IsTarget() bool

func (*StagePipe) Process

func (s *StagePipe) Process(pipeBatch PipeBatch) error

type StageRuntime

type StageRuntime struct {
	// contains filtered or unexported fields
}

func NewStageRuntime

func NewStageRuntime(
	pipelineBean creation.PipelineBean,
	stageBean creation.StageBean,
	stageContext api.StageContext,
) StageRuntime

func (*StageRuntime) Destroy

func (s *StageRuntime) Destroy()

func (*StageRuntime) Execute

func (s *StageRuntime) Execute(
	previousOffset *string,
	batchSize int,
	batch *BatchImpl,
	batchMaker *BatchMakerImpl,
) (*string, error)

func (*StageRuntime) GetInstanceName

func (s *StageRuntime) GetInstanceName() string

func (*StageRuntime) Init

func (s *StageRuntime) Init() []validation.Issue

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL