router

package
v0.0.0-...-0ade494 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Aug 26, 2022 License: AGPL-3.0 Imports: 46 Imported by: 0

Documentation

Index

Constants

This section is empty.

Variables

View Source
var (
	Diagnostics diagnostics.DiagnosticsI

	QueryFilters jobsdb.QueryFiltersT
)

Functions

func CleanFailedRecordsTableProcess

func CleanFailedRecordsTableProcess(ctx context.Context)

func Init

func Init()

func InitRouterAdmin

func InitRouterAdmin()

func PrepareJobRunIdAbortedEventsMap

func PrepareJobRunIdAbortedEventsMap(parameters json.RawMessage, jobRunIDAbortedEventsMap map[string][]*FailedEventRowT)

func RegisterAdminHandlers

func RegisterAdminHandlers(readonlyRouterDB, readonlyBatchRouterDB jobsdb.ReadonlyJobsDB)

Types

type Admin

type Admin struct {
	// contains filtered or unexported fields
}

func (*Admin) Status

func (ra *Admin) Status() interface{}

Status function is used for debug purposes by the admin interface

type DSStats

type DSStats struct {
	JobCountsByStateAndDestination []JobCountsByStateAndDestination
	ErrorCodeCountsByDestination   []ErrorCodeCountsByDestination
	JobCountByConnections          []JobCountByConnections
	LatestJobStatusCounts          []LatestJobStatusCounts
	UnprocessedJobCounts           int
}

type ErrorCodeCountsByDestination

type ErrorCodeCountsByDestination struct {
	Count         int
	ErrorCode     string
	Destination   string
	DestinationID string
}

type Factory

type Factory struct {
	Reporting        reporter
	Multitenant      tenantStats
	BackendConfig    backendconfig.BackendConfig
	RouterDB         jobsdb.MultiTenantJobsDB
	ProcErrorDB      jobsdb.JobsDB
	TransientSources transientsource.Service
	RsourcesService  rsources.JobService
}

func (*Factory) New

func (f *Factory) New(destinationDefinition backendconfig.DestinationDefinitionT) *HandleT

type FailedEventRowT

type FailedEventRowT struct {
	DestinationID string          `json:"destination_id"`
	RecordID      json.RawMessage `json:"record_id"`
}

type FailedEventsManagerI

type FailedEventsManagerI interface {
	SaveFailedRecordIDs(map[string][]*FailedEventRowT, *sql.Tx)
	DropFailedRecordIDs(jobRunID string)
	FetchFailedRecordIDs(jobRunID string) []*FailedEventRowT
	GetDBHandle() *sql.DB
}

func GetFailedEventsManager

func GetFailedEventsManager() FailedEventsManagerI

type FailedEventsManagerT

type FailedEventsManagerT struct {
	// contains filtered or unexported fields
}

func (*FailedEventsManagerT) DropFailedRecordIDs

func (fem *FailedEventsManagerT) DropFailedRecordIDs(taskRunID string)

func (*FailedEventsManagerT) FetchFailedRecordIDs

func (fem *FailedEventsManagerT) FetchFailedRecordIDs(taskRunID string) []*FailedEventRowT

func (*FailedEventsManagerT) GetDBHandle

func (fem *FailedEventsManagerT) GetDBHandle() *sql.DB

func (*FailedEventsManagerT) SaveFailedRecordIDs

func (fem *FailedEventsManagerT) SaveFailedRecordIDs(taskRunIDFailedEventsMap map[string][]*FailedEventRowT, txn *sql.Tx)

type HandleDestOAuthRespParamsT

type HandleDestOAuthRespParamsT struct {
	// contains filtered or unexported fields
}

type HandleT

type HandleT struct {
	MultitenantI tenantStats

	Reporting reporter
	// contains filtered or unexported fields
}

HandleT is the handle to this module.

func (*HandleT) ExecDisableDestination

func (rt *HandleT) ExecDisableDestination(destination *backendconfig.DestinationT, workspaceId, destResBody, rudderAccountId string) (int, string)

func (*HandleT) HandleOAuthDestResponse

func (rt *HandleT) HandleOAuthDestResponse(params *HandleDestOAuthRespParamsT) (int, string)

func (*HandleT) ResetSleep

func (rt *HandleT) ResetSleep()

ResetSleep makes the workers reset their sleep.

func (*HandleT) Setup

func (rt *HandleT) Setup(backendConfig backendconfig.BackendConfig, jobsDB jobsdb.MultiTenantJobsDB, errorDB jobsdb.JobsDB, destinationDefinition backendconfig.DestinationDefinitionT, transientSources transientsource.Service, rsourcesService rsources.JobService)

Setup initializes this module

func (*HandleT) Shutdown

func (rt *HandleT) Shutdown()

func (*HandleT) Start

func (rt *HandleT) Start()

type JSONResponseHandler

type JSONResponseHandler struct {
	// contains filtered or unexported fields
}

JSONResponseHandler is the handler for JSON responses.

func (*JSONResponseHandler) IsSuccessStatus

func (handler *JSONResponseHandler) IsSuccessStatus(respCode int, respBody string) (returnCode int)

IsSuccessStatus - returns the status code based on the response code and body

type JobCountByConnections

type JobCountByConnections struct {
	Count         int
	SourceId      string
	DestinationId string
}

type JobCountsByStateAndDestination

type JobCountsByStateAndDestination struct {
	Count       int
	State       string
	Destination string
}

type JobParametersT

type JobParametersT struct {
	SourceID                string      `json:"source_id"`
	DestinationID           string      `json:"destination_id"`
	ReceivedAt              string      `json:"received_at"`
	TransformAt             string      `json:"transform_at"`
	SourceBatchID           string      `json:"source_batch_id"`
	SourceTaskID            string      `json:"source_task_id"`
	SourceTaskRunID         string      `json:"source_task_run_id"`
	SourceJobID             string      `json:"source_job_id"`
	SourceJobRunID          string      `json:"source_job_run_id"`
	SourceDefinitionID      string      `json:"source_definition_id"`
	DestinationDefinitionID string      `json:"destination_definition_id"`
	SourceCategory          string      `json:"source_category"`
	RecordID                interface{} `json:"record_id"`
	MessageID               string      `json:"message_id"`
	WorkspaceID             string      `json:"workspaceId"`
	RudderAccountId         string      `json:"rudderAccountId"`
}

JobParametersT holds the parameters of a job, including its source and destination IDs.

type JobResponse

type JobResponse struct {
	// contains filtered or unexported fields
}

type LatestJobStatusCounts

type LatestJobStatusCounts struct {
	Count int
	State string
	Rank  int
}

type NetHandleI

type NetHandleI interface {
	SendPost(ctx context.Context, structData integrations.PostParametersT) *utils.SendPostResponse
}

NetHandleI is the network interface; it exposes SendPost.

type NetHandleT

type NetHandleT struct {
	// contains filtered or unexported fields
}

NetHandleT is the wrapper holding private variables

func (*NetHandleT) SendPost

func (network *NetHandleT) SendPost(ctx context.Context, structData integrations.PostParametersT) *utils.SendPostResponse

SendPost takes the EventPayload of a transformed job, gets the necessary values from the payload, and makes a call to the destination to push the event to it. It returns the status code, status and response body from the response of the destination call.

func (*NetHandleT) Setup

func (network *NetHandleT) Setup(destID string, netClientTimeout time.Duration)

Setup initializes the module

type ResponseHandlerI

type ResponseHandlerI interface {
	IsSuccessStatus(respCode int, respBody string) (returnCode int)
}

ResponseHandlerI handles a destination response.

func New

func New(responseRules map[string]interface{}) ResponseHandlerI

New returns a destination response handler. The result can be nil, so check it before using it.

type RouterRpcHandler

type RouterRpcHandler struct {
	// contains filtered or unexported fields
}

func (*RouterRpcHandler) GetDSFailedJobs

func (r *RouterRpcHandler) GetDSFailedJobs(arg string, result *string) (err error)

func (*RouterRpcHandler) GetDSJobCount

func (r *RouterRpcHandler) GetDSJobCount(arg string, result *string) (err error)

func (*RouterRpcHandler) GetDSList

func (r *RouterRpcHandler) GetDSList(_ string, result *string) (err error)

func (*RouterRpcHandler) GetDSStats

func (r *RouterRpcHandler) GetDSStats(dsName string, result *string) (err error)

GetDSStats collects the following: (1) job counts grouped by job_status and by custom_val; (2) all errors, i.e. distinct (error), count(*) where state=failed; (3) distinct (src_id, dst_id) pairs; (4) the router jobs status flow, ordered by rank; (5) unprocessed_params, the number of jobs not yet picked.

func (*RouterRpcHandler) GetJobByID

func (r *RouterRpcHandler) GetJobByID(arg string, result *string) (err error)

func (*RouterRpcHandler) GetJobIDStatus

func (r *RouterRpcHandler) GetJobIDStatus(arg string, result *string) (err error)

type TXTResponseHandler

type TXTResponseHandler struct {
	// contains filtered or unexported fields
}

TXTResponseHandler is the handler for text responses.

func (*TXTResponseHandler) IsSuccessStatus

func (handler *TXTResponseHandler) IsSuccessStatus(respCode int, _ string) (returnCode int)

IsSuccessStatus - returns the status code based on the response code; the response body argument is ignored.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL