processor

package
v0.0.0-...-0ade494 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Aug 26, 2022 License: AGPL-3.0 Imports: 42 Imported by: 0

Documentation

Index

Constants

View Source
const (
	METRICKEYDELIMITER  = "!<<#>>!"
	USER_TRANSFORMATION = "USER_TRANSFORMATION"
	DEST_TRANSFORMATION = "DEST_TRANSFORMATION"
	EVENT_FILTER        = "EVENT_FILTER"
)

Variables

View Source
var (
	GWCustomVal string
)

Functions

func ConvertToFilteredTransformerResponse

func ConvertToFilteredTransformerResponse(events []transformer.TransformerEventT, filter bool) transformer.ResponseT

func Init

func Init()

func RegisterAdminHandlers

func RegisterAdminHandlers(readonlyProcErrorDB jobsdb.ReadonlyJobsDB)

func SetFeaturesRetryAttempts

func SetFeaturesRetryAttempts(overrideAttempts int)

Types

type DestStatT

type DestStatT struct {
	// contains filtered or unexported fields
}

type HandleT

type HandleT struct {
	// contains filtered or unexported fields
}

HandleT is a handle to this object used in main.go

func NewProcessor

func NewProcessor() *HandleT

NewProcessor creates a new Processor instance

func (*HandleT) Setup

func (proc *HandleT) Setup(
	backendConfig backendconfig.BackendConfig, gatewayDB, routerDB jobsdb.JobsDB,
	batchRouterDB, errorDB jobsdb.JobsDB, clearDB *bool, reporting types.ReportingI,
	multiTenantStat multitenant.MultiTenantI, transientSources transientsource.Service,
	rsourcesService rsources.JobService,
)

Setup initializes the module

func (*HandleT) Shutdown

func (proc *HandleT) Shutdown()

func (*HandleT) Start

func (proc *HandleT) Start(ctx context.Context) error

Start starts this processor's main loops.

func (*HandleT) Status

func (proc *HandleT) Status() interface{}

func (*HandleT) Store

func (proc *HandleT) Store(in *storeMessage)

type LifecycleManager

type LifecycleManager struct {
	HandleT *HandleT

	MultitenantStats multitenant.MultiTenantI // need not initialize again
	ReportingI       types.ReportingI         // need not initialize again
	BackendConfig    backendconfig.BackendConfig
	Transformer      transformer.Transformer
	// contains filtered or unexported fields
}

func New

func New(ctx context.Context, clearDb *bool, gwDb, rtDb, brtDb, errDb *jobsdb.HandleT,
	tenantDB multitenant.MultiTenantI, reporting types.ReportingI, transientSources transientsource.Service,
	rsourcesService rsources.JobService,
) *LifecycleManager

New creates a new Processor instance

func (*LifecycleManager) Start

func (proc *LifecycleManager) Start() error

Start starts a processor, this is not a blocking call. If the processor is not completely started and the data started coming then also it will not be problematic as we are assuming that the DBs will be up.

func (*LifecycleManager) Stop

func (proc *LifecycleManager) Stop()

Stop stops the processor, this is a blocking call.

type MetricMetadata

type MetricMetadata struct {
	// contains filtered or unexported fields
}

type ParametersT

type ParametersT struct {
	SourceID                string      `json:"source_id"`
	DestinationID           string      `json:"destination_id"`
	ReceivedAt              string      `json:"received_at"`
	TransformAt             string      `json:"transform_at"`
	MessageID               string      `json:"message_id"`
	GatewayJobID            int64       `json:"gateway_job_id"`
	SourceBatchID           string      `json:"source_batch_id"`
	SourceTaskID            string      `json:"source_task_id"`
	SourceTaskRunID         string      `json:"source_task_run_id"`
	SourceJobID             string      `json:"source_job_id"`
	SourceJobRunID          string      `json:"source_job_run_id"`
	EventName               string      `json:"event_name"`
	EventType               string      `json:"event_type"`
	SourceDefinitionID      string      `json:"source_definition_id"`
	DestinationDefinitionID string      `json:"destination_definition_id"`
	SourceCategory          string      `json:"source_category"`
	RecordID                interface{} `json:"record_id"`
	WorkspaceId             string      `json:"workspaceId"`
}

type SourceIDT

type SourceIDT string

type TrackingPlanStatT

type TrackingPlanStatT struct {
	// contains filtered or unexported fields
}

type TransformRequestT

type TransformRequestT struct {
	Event          []transformer.TransformerEventT
	Stage          string
	ProcessingTime float64
	Index          int
}

type WriteKeyT

type WriteKeyT string

Directories

Path Synopsis

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL