api

package
v0.0.0-...-ec16f60
Warning

This package is not in the latest version of its module.

Published: Apr 15, 2021 License: Apache-2.0 Imports: 12 Imported by: 0

Documentation

Overview

Copyright 2018 StreamSets Inc. Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at

http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License.

Index

Constants

const (
	EventRecordHeaderType              = "sdc.event.type"
	EventRecordHeaderVersion           = "sdc.event.version"
	EventRecordHeaderCreationTimestamp = "sdc.event.creation_timestamp"
)

Variables

This section is empty.

Functions

This section is empty.

Types

type Batch

type Batch interface {
	GetSourceOffset() *string
	GetRecords() []Record
}

Batch is the interface that wraps the basic methods for reading a batch of records.

GetSourceOffset returns the initial offset of the current batch. This return value should be treated as opaque because it is source-dependent.

GetRecords returns all the records in the batch for the current stage, as a slice. Every time this method is called, it returns a new slice with all the records in the batch.
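
A minimal sketch of an in-memory type that satisfies the Batch contract. To stay self-contained it declares local stand-ins instead of importing the package: Record is reduced to an empty interface, and the `sliceBatch` type and its fields are hypothetical. Returning a fresh copy from GetRecords is one way to honor "a new slice on every call".

```go
package main

import "fmt"

// Record is a local stand-in for api.Record; this sketch needs no methods on it.
type Record interface{}

// Batch mirrors the documented api.Batch interface.
type Batch interface {
	GetSourceOffset() *string
	GetRecords() []Record
}

// sliceBatch is a hypothetical in-memory Batch implementation.
type sliceBatch struct {
	offset  *string // opaque, source-dependent offset
	records []Record
}

// GetSourceOffset returns the offset the batch started from.
func (b *sliceBatch) GetSourceOffset() *string { return b.offset }

// GetRecords returns a fresh copy on every call, so a caller that
// modifies the returned slice cannot affect later calls.
func (b *sliceBatch) GetRecords() []Record {
	out := make([]Record, len(b.records))
	copy(out, b.records)
	return out
}

func main() {
	off := "orders.log:128" // made-up offset; its format is up to the source
	var b Batch = &sliceBatch{offset: &off, records: []Record{"r1", "r2"}}
	fmt.Println(*b.GetSourceOffset(), len(b.GetRecords()))
}
```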

type BatchMaker

type BatchMaker interface {
	GetLanes() []string
	AddRecord(record Record, outputLanes ...string)
}

BatchMaker is the interface that wraps the basic methods for adding records to the pipeline. Data Collector Edge origin stages receive an instance of BatchMaker to write the records they create or process to the pipeline.

GetLanes returns the available lane names (stream names) for the stage.

AddRecord adds a record to the BatchMaker, optionally directing it to the given output lanes.
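
The sketch below shows one way an in-memory BatchMaker could group records by lane. The interfaces are local copies (Record reduced to an empty interface), and `laneBatchMaker` is hypothetical. The page does not say what happens when AddRecord is called without lanes, so this sketch assumes the record goes to every lane of the stage, and it drops records addressed to unknown lanes; the real Data Collector Edge behavior may differ.

```go
package main

import "fmt"

// Record is a local stand-in for api.Record.
type Record interface{}

// BatchMaker mirrors the documented api.BatchMaker interface.
type BatchMaker interface {
	GetLanes() []string
	AddRecord(record Record, outputLanes ...string)
}

// laneBatchMaker is a hypothetical BatchMaker that groups records by lane.
type laneBatchMaker struct {
	lanes  []string
	byLane map[string][]Record
}

func newLaneBatchMaker(lanes ...string) *laneBatchMaker {
	return &laneBatchMaker{lanes: lanes, byLane: make(map[string][]Record)}
}

func (m *laneBatchMaker) GetLanes() []string { return m.lanes }

// AddRecord appends the record to each requested lane.
// ASSUMPTION: with no lanes given, the record goes to every lane.
func (m *laneBatchMaker) AddRecord(record Record, outputLanes ...string) {
	if len(outputLanes) == 0 {
		outputLanes = m.lanes
	}
	for _, lane := range outputLanes {
		if m.known(lane) { // records for unknown lanes are dropped in this sketch
			m.byLane[lane] = append(m.byLane[lane], record)
		}
	}
}

func (m *laneBatchMaker) known(lane string) bool {
	for _, l := range m.lanes {
		if l == lane {
			return true
		}
	}
	return false
}

func main() {
	m := newLaneBatchMaker("good", "bad")
	var bm BatchMaker = m
	bm.AddRecord("r1")        // no lanes: every lane (assumption)
	bm.AddRecord("r2", "bad") // explicit lane
	fmt.Println(bm.GetLanes(), m.byLane)
}
```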

type Destination

type Destination interface {
	Write(batch Batch) error
}

Destination is a Data Collector Edge destination stage. Destination stages receive records from origin or processor stages and write them to an external system.

Write: when running a pipeline, Data Collector Edge calls this method on the Destination stage to write a batch of records to an external system.

type ErrorMessage

type ErrorMessage struct {
	ErrorCode          string `json:"errorCode"`
	Timestamp          int64  `json:"timestamp"`
	LocalizableMessage string `json:"localized"`
	Stacktrace         string `json:"errorStackTrace"`
}

type Field

type Field struct {
	Type  string
	Value interface{}
}

func Create

func Create(fieldType string, value interface{}) (*Field, error)

func CreateBigFloatField

func CreateBigFloatField(value big.Float) (*Field, error)

func CreateBigIntField

func CreateBigIntField(value big.Int) (*Field, error)

func CreateBoolField

func CreateBoolField(value bool) (*Field, error)

func CreateByteArrayField

func CreateByteArrayField(value []byte) (*Field, error)

func CreateByteField

func CreateByteField(value byte) (*Field, error)

func CreateDateTimeField

func CreateDateTimeField(value time.Time) (*Field, error)

func CreateDoubleField

func CreateDoubleField(value float64) (*Field, error)

func CreateField

func CreateField(value interface{}) (*Field, error)
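
CreateField takes an untyped value, which suggests it picks a field type from the Go type of the value. The sketch below illustrates that idea with a type switch. It is not the package's implementation: `createField` is hypothetical, and the type-name strings ("STRING", "LONG", and so on) and the Go-type mapping are assumptions loosely based on the typed constructors listed here.

```go
package main

import (
	"fmt"
	"time"
)

// Field mirrors the documented api.Field struct.
type Field struct {
	Type  string
	Value interface{}
}

// createField is a hypothetical stand-in for CreateField. The type names
// below are ASSUMPTIONS; the real package defines its own.
func createField(value interface{}) (*Field, error) {
	var t string
	switch value.(type) {
	case bool:
		t = "BOOLEAN"
	case string:
		t = "STRING"
	case int, int32:
		t = "INTEGER"
	case int64:
		t = "LONG"
	case float64:
		t = "DOUBLE"
	case []byte:
		t = "BYTE_ARRAY"
	case time.Time:
		t = "DATETIME"
	default:
		return nil, fmt.Errorf("unsupported field value type %T", value)
	}
	return &Field{Type: t, Value: value}, nil
}

func main() {
	values := []interface{}{true, "edge", 42, int64(7), 2.5, []byte("hi"), time.Unix(0, 0), struct{}{}}
	for _, v := range values {
		f, err := createField(v)
		if err != nil {
			fmt.Println("error:", err)
			continue
		}
		fmt.Printf("%s %v\n", f.Type, f.Value)
	}
}
```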

func CreateFieldFromSDCField

func CreateFieldFromSDCField(value interface{}) (*Field, error)

func CreateFileRefField

func CreateFileRefField(value FileRef) (*Field, error)

func CreateFloatField

func CreateFloatField(value float32) (*Field, error)

func CreateFloatListField

func CreateFloatListField(listFloatValue []float64) (*Field, error)

func CreateInteger32Field

func CreateInteger32Field(value int32) (*Field, error)

func CreateIntegerField

func CreateIntegerField(value int) (*Field, error)

func CreateListField

func CreateListField(listValue []interface{}) (*Field, error)

func CreateListFieldWithListOfFields

func CreateListFieldWithListOfFields(listFields []*Field) *Field

func CreateListMapField

func CreateListMapField(listMapValue *linkedhashmap.Map) (*Field, error)

func CreateListMapFieldWithMapOfFields

func CreateListMapFieldWithMapOfFields(mapFields *linkedhashmap.Map) *Field

func CreateLongField

func CreateLongField(value int64) (*Field, error)

func CreateLongFieldU64

func CreateLongFieldU64(value uint64) (*Field, error)

func CreateMapField

func CreateMapField(mapValue map[string]interface{}) (*Field, error)

func CreateMapFieldWithMapOfFields

func CreateMapFieldWithMapOfFields(mapFields map[string]*Field) *Field

func CreateMapListField

func CreateMapListField(listValue []map[string]interface{}) (*Field, error)

func CreateShortField

func CreateShortField(value int8) (*Field, error)

func CreateStringField

func CreateStringField(value string) (*Field, error)

func CreateStringListField

func CreateStringListField(listStringValue []string) (*Field, error)

func CreateUInteger16Field

func CreateUInteger16Field(value uint16) (*Field, error)

func CreateUInteger32Field

func CreateUInteger32Field(value uint32) (*Field, error)

func (*Field) Clone

func (f *Field) Clone() *Field

func (*Field) GetValueAsFloat

func (f *Field) GetValueAsFloat() (float32, error)

type FileRef

type FileRef interface {
	CreateInputStream() (io.Reader, error)
	CloseInputStream(reader io.Reader) error
}
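
A minimal FileRef sketch, assuming the intended usage is to pass every stream obtained from CreateInputStream back to CloseInputStream. The interface is a local copy; `memFileRef` and `readAll` are hypothetical. Because a new reader is created on each call, the content can be read more than once.

```go
package main

import (
	"bytes"
	"fmt"
	"io"
)

// FileRef mirrors the documented api.FileRef interface.
type FileRef interface {
	CreateInputStream() (io.Reader, error)
	CloseInputStream(reader io.Reader) error
}

// memFileRef is a hypothetical FileRef backed by an in-memory byte slice.
type memFileRef struct {
	data []byte
}

// CreateInputStream returns a new reader on every call.
func (f *memFileRef) CreateInputStream() (io.Reader, error) {
	return bytes.NewReader(f.data), nil
}

// CloseInputStream closes the reader if it supports closing; a
// bytes.Reader does not, so this is a no-op here.
func (f *memFileRef) CloseInputStream(reader io.Reader) error {
	if c, ok := reader.(io.Closer); ok {
		return c.Close()
	}
	return nil
}

// readAll opens a stream, reads it fully, and always hands it back.
func readAll(ref FileRef) (data []byte, err error) {
	r, err := ref.CreateInputStream()
	if err != nil {
		return nil, err
	}
	defer func() {
		if cerr := ref.CloseInputStream(r); err == nil {
			err = cerr
		}
	}()
	return io.ReadAll(r)
}

func main() {
	b, err := readAll(&memFileRef{data: []byte("payload")})
	if err != nil {
		panic(err)
	}
	fmt.Println(string(b))
}
```
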

type Header

type Header interface {
	GetStageCreator() string

	GetSourceId() string

	GetTrackingId() string

	GetPreviousTrackingId() string

	GetStagesPath() string

	GetErrorDataCollectorId() string

	GetErrorPipelineName() string

	GetErrorMessage() string

	GetErrorStage() string

	GetErrorTimestamp() int64

	GetAttributeNames() []string

	GetAttributes() map[string]string

	GetAttribute(name string) interface{}

	SetAttribute(name string, value string)
}

Header represents metadata about the record.
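
The sketch below implements only the attribute-related subset of Header with a plain map and uses the package's event header constants as attribute names. `mapHeader` is hypothetical. Returning nil from GetAttribute for a missing name, and storing the event version as a decimal string, are assumptions made for illustration.

```go
package main

import (
	"fmt"
	"sort"
	"strconv"
)

// Event header attribute names, copied from the package constants.
const (
	EventRecordHeaderType              = "sdc.event.type"
	EventRecordHeaderVersion           = "sdc.event.version"
	EventRecordHeaderCreationTimestamp = "sdc.event.creation_timestamp"
)

// attributeHeader is the attribute-related subset of api.Header.
type attributeHeader interface {
	GetAttributeNames() []string
	GetAttributes() map[string]string
	GetAttribute(name string) interface{}
	SetAttribute(name string, value string)
}

// mapHeader is a hypothetical map-backed implementation of that subset.
type mapHeader struct {
	attrs map[string]string
}

func newMapHeader() *mapHeader { return &mapHeader{attrs: map[string]string{}} }

// GetAttributeNames returns the names sorted, for deterministic output.
func (h *mapHeader) GetAttributeNames() []string {
	names := make([]string, 0, len(h.attrs))
	for n := range h.attrs {
		names = append(names, n)
	}
	sort.Strings(names)
	return names
}

// GetAttributes returns a copy so callers cannot mutate the header.
func (h *mapHeader) GetAttributes() map[string]string {
	out := make(map[string]string, len(h.attrs))
	for k, v := range h.attrs {
		out[k] = v
	}
	return out
}

// GetAttribute returns nil when the attribute is absent (an assumption).
func (h *mapHeader) GetAttribute(name string) interface{} {
	if v, ok := h.attrs[name]; ok {
		return v
	}
	return nil
}

func (h *mapHeader) SetAttribute(name string, value string) { h.attrs[name] = value }

func main() {
	var h attributeHeader = newMapHeader()
	// Made-up event metadata; the version is stored as a string (assumption).
	h.SetAttribute(EventRecordHeaderType, "file-closed")
	h.SetAttribute(EventRecordHeaderVersion, strconv.Itoa(1))
	for _, n := range h.GetAttributeNames() {
		fmt.Println(n, "=", h.GetAttribute(n))
	}
}
```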

type Origin

type Origin interface {
	Produce(lastSourceOffset *string, maxBatchSize int, batchMaker BatchMaker) (*string, error)
}

Origin is a Data Collector Edge origin stage. Origin stages consume data from an external system, creating records that can be processed by processor or destination stages.

Produce: when running a pipeline, Data Collector Edge calls this method on the Origin stage to obtain a batch of records for processing. Origin stages should not block indefinitely within this method if there is no data. Instead, they should use an internal timeout after which they produce an empty batch. This lets the other stages in the pipeline know that the pipeline is still healthy even though no data is arriving, and can allow notifications to be sent to external systems.

lastSourceOffset is the offset returned by the previous call to this method, or nil if this method is being called for the first time ever.

maxBatchSize is the requested maximum batch size that a single call to this method should produce.

batchMaker collects the records created by the Origin stage; records must be added to it to be available to the rest of the pipeline.

Produce returns the offset for the next call to this method. A nil offset means the Origin stage has fully processed its data, no more data is expected, and the pipeline should finish once the current batch is fully processed. Produce returns an error if the Origin stage encountered an error while consuming data or creating records.

type Processor

type Processor interface {
	Process(batch Batch, batchMaker BatchMaker) error
}

Processor is a Data Collector Edge processor stage. Processor stages receive records from an origin or from other processor stages, perform operations on the records, and write them out so they can be processed by other processor or destination stages.

Process: when running a pipeline, Data Collector Edge calls this method on the Processor stage with a batch of records to process. The batch parameter holds the records to process. Records created by the Processor stage must be added to batchMaker to be available to the rest of the pipeline.
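
A minimal processor sketch: the hypothetical `upperProcessor` upper-cases string records and skips everything else, which shows that only records passed to batchMaker move downstream. The interfaces are local copies with Record reduced to an empty interface; `staticBatch` and `collector` are hypothetical test doubles.

```go
package main

import (
	"fmt"
	"strings"
)

type Record interface{}

type Batch interface {
	GetSourceOffset() *string
	GetRecords() []Record
}

type BatchMaker interface {
	GetLanes() []string
	AddRecord(record Record, outputLanes ...string)
}

type Processor interface {
	Process(batch Batch, batchMaker BatchMaker) error
}

// staticBatch is a hypothetical fixed input batch.
type staticBatch []Record

func (b staticBatch) GetSourceOffset() *string { return nil }
func (b staticBatch) GetRecords() []Record     { return b }

// collector is a hypothetical single-lane BatchMaker that keeps its input.
type collector struct{ out []Record }

func (c *collector) GetLanes() []string { return []string{"out"} }

func (c *collector) AddRecord(r Record, outputLanes ...string) {
	c.out = append(c.out, r)
}

// upperProcessor upper-cases string records and skips all others.
type upperProcessor struct{}

func (upperProcessor) Process(batch Batch, batchMaker BatchMaker) error {
	for _, r := range batch.GetRecords() {
		s, ok := r.(string)
		if !ok {
			continue // never added to batchMaker, so it never reaches later stages
		}
		batchMaker.AddRecord(strings.ToUpper(s))
	}
	return nil
}

func main() {
	c := &collector{}
	var p Processor = upperProcessor{}
	if err := p.Process(staticBatch{"a", 1, "b"}, c); err != nil {
		panic(err)
	}
	fmt.Println(c.out)
}
```

Silently skipping records keeps the sketch short; a real stage would more likely report them, for example through StageContext.ToError.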

type Record

type Record interface {
	GetHeader() Header
	Get(fieldPath ...string) (*Field, error)
	Set(field *Field) *Field
	SetField(fieldPath string, field *Field) (*Field, error)
	GetFieldPaths() map[string]bool
	Delete(fieldPath string) (*Field, error)
	Clone() Record
}

Record represents the unit of data Data Collector Edge pipelines process.

GetHeader method returns the metadata header of the record.

Get method returns the root data field of the record.

type Service

type Service interface {
	Init(stageContext StageContext) []validation.Issue
	Destroy() error
}

type Stage

type Stage interface {
	Init(stageContext StageContext) []validation.Issue
	Destroy() error
}

Stage is the base interface for Data Collector Edge stage implementations, defining their common context and lifecycle.

Init initializes the stage. It is called once, when the pipeline is being initialized and before any data is processed. If the stage returns an empty slice of validation.Issue, it is considered ready to process data. Otherwise, the stage is considered misconfigured or otherwise not ready to process data, and pipeline initialization is aborted.

Destroy destroys the stage. It should be used to release any resources held by the stage after initialization or processing. This method is called once when the pipeline is being shut down. After it is called, the stage will not be called to process any more data. Destroy is also called after a failed initialization, to allow releasing resources created before the initialization failed.
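
The lifecycle above can be sketched as a small driver: call Init once, abort if it reports any issues, and call Destroy exactly once in both the success and the failed-initialization cases. `Issue` and `StageContext` are local stand-ins for `validation.Issue` and `api.StageContext`; `runStage` and `fakeStage` are hypothetical.

```go
package main

import (
	"errors"
	"fmt"
)

// Issue is a local stand-in for validation.Issue.
type Issue struct{ Message string }

// StageContext is an empty stand-in for api.StageContext.
type StageContext interface{}

// Stage mirrors the documented api.Stage interface (with stand-in types).
type Stage interface {
	Init(stageContext StageContext) []Issue
	Destroy() error
}

// runStage calls Init once, aborts if it reports issues, and always
// calls Destroy exactly once, including after a failed Init.
func runStage(s Stage, ctx StageContext, process func() error) (err error) {
	defer func() {
		if derr := s.Destroy(); err == nil {
			err = derr
		}
	}()
	if issues := s.Init(ctx); len(issues) > 0 {
		return fmt.Errorf("init failed with %d issue(s): %s", len(issues), issues[0].Message)
	}
	return process()
}

// fakeStage is a hypothetical stage that counts Destroy calls.
type fakeStage struct {
	issues    []Issue
	destroyed int
}

func (f *fakeStage) Init(StageContext) []Issue { return f.issues }

func (f *fakeStage) Destroy() error {
	f.destroyed++
	return nil
}

func main() {
	good := &fakeStage{}
	err := runStage(good, nil, func() error { return nil })
	fmt.Println("healthy stage:", err, "destroy calls:", good.destroyed)

	bad := &fakeStage{issues: []Issue{{Message: "missing required config"}}}
	err = runStage(bad, nil, func() error { return errors.New("unreachable") })
	fmt.Println("misconfigured stage:", err, "destroy calls:", bad.destroyed)
}
```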

type StageContext

type StageContext interface {
	// If we plan to support ELs later, we should remove and provide in build support for this
	GetResolvedValue(configValue interface{}) (interface{}, error)
	CreateRecord(recordSourceId string, value interface{}) (Record, error)
	CreateEventRecord(recordSourceId string, value interface{}, eventType string, eventVersion int) (Record, error)
	GetMetrics() metrics.Registry
	ToError(err error, record Record)
	ToEvent(record Record)
	ReportError(err error)
	GetOutputLanes() []string
	Evaluate(value string, configName string, ctx context.Context) (interface{}, error)
	IsErrorStage() bool
	CreateConfigIssue(error string, optional ...interface{}) validation.Issue
	GetService(serviceName string) (Service, error)
	IsPreview() bool
	GetPipelineParameters() map[string]interface{}
	SetStop()
	IsStopped() bool
}