gateway

package
v0.0.0-...-0ade494 Latest
Warning

This package is not in the latest version of its module.

Published: Aug 26, 2022 | License: AGPL-3.0 | Imports: 45 | Imported by: 0

Documentation


Constants

const (
	DELIMITER = string("<<>>")
)

Variables

var (
	ReadTimeout       time.Duration
	ReadHeaderTimeout time.Duration
	WriteTimeout      time.Duration
	IdleTimeout       time.Duration

	Diagnostics diagnostics.DiagnosticsI
)
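
The four timeout variables share their names with fields of net/http's Server type. The page does not show how the gateway consumes them, so the following is only a minimal sketch that assumes they are copied onto a server one to one. The durations and the newServer helper are illustrative, not part of the package.

```go
package main

import (
	"fmt"
	"net/http"
	"time"
)

// Local stand-ins for the package-level variables. The values are
// illustrative and are not the gateway's defaults.
var (
	ReadTimeout       = 5 * time.Second
	ReadHeaderTimeout = 2 * time.Second
	WriteTimeout      = 10 * time.Second
	IdleTimeout       = 120 * time.Second
)

// newServer is a hypothetical helper that copies the timeouts onto an
// http.Server. A nil handler means http.DefaultServeMux.
func newServer(addr string, h http.Handler) *http.Server {
	return &http.Server{
		Addr:              addr,
		Handler:           h,
		ReadTimeout:       ReadTimeout,
		ReadHeaderTimeout: ReadHeaderTimeout,
		WriteTimeout:      WriteTimeout,
		IdleTimeout:       IdleTimeout,
	}
}

func main() {
	srv := newServer(":8080", http.NewServeMux())
	fmt.Println(srv.ReadHeaderTimeout, srv.IdleTimeout)
}
```

Leaving any of these at zero disables that timeout in net/http, so a server exposed to untrusted clients would normally set at least ReadHeaderTimeout.
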
var BatchEvent = []byte(`
	{
		"batch": [
		]
	}
`)
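
BatchEvent is a JSON document with an empty "batch" array. How the package fills it in is not shown on this page. As a sketch under that caveat, a single event could be wrapped into the template using only encoding/json. The wrapInBatch helper and the sample event are hypothetical.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// batchTemplate mirrors the BatchEvent value shown above.
var batchTemplate = []byte(`
	{
		"batch": [
		]
	}
`)

// wrapInBatch is a hypothetical helper: it decodes the template, appends the
// given raw JSON events to its "batch" array, and re-encodes the result.
func wrapInBatch(events ...json.RawMessage) ([]byte, error) {
	var doc struct {
		Batch []json.RawMessage `json:"batch"`
	}
	if err := json.Unmarshal(batchTemplate, &doc); err != nil {
		return nil, err
	}
	for _, ev := range events {
		if !json.Valid(ev) {
			return nil, fmt.Errorf("invalid event JSON: %s", ev)
		}
		doc.Batch = append(doc.Batch, ev)
	}
	return json.Marshal(doc)
}

func main() {
	out, err := wrapInBatch(json.RawMessage(`{"type":"track","event":"signup"}`))
	if err != nil {
		panic(err)
	}
	fmt.Println(string(out))
}
```
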
var CustomVal string

CustomVal is used as a key in the jobsDB customval column

Functions

func Init

func Init()

func IsEnableRateLimit

func IsEnableRateLimit() bool

IsEnableRateLimit reports whether rate limiting is enabled on the gateway

func SetEnableEventSchemasFeature

func SetEnableEventSchemasFeature(b bool) bool

SetEnableEventSchemasFeature overrides the enableEventSchemasFeature configuration and returns the previous value

func SetEnableRateLimit

func SetEnableRateLimit(b bool) bool

SetEnableRateLimit overrides the enableRateLimit configuration and returns the previous value

func SetEnableSuppressUserFeature

func SetEnableSuppressUserFeature(b bool) bool

SetEnableSuppressUserFeature overrides the enableSuppressUserFeature configuration and returns the previous value
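
Because each Set* function returns the previous value, an override can be undone by passing that value back in. The sketch below shows the pattern with local stand-ins. The enableRateLimit variable, setEnableRateLimit and withRateLimit are hypothetical and only mimic the documented contract.

```go
package main

import "fmt"

// enableRateLimit and setEnableRateLimit are local stand-ins that mimic the
// documented contract: set the flag and return the previous value.
var enableRateLimit = false

func setEnableRateLimit(b bool) bool {
	prev := enableRateLimit
	enableRateLimit = b
	return prev
}

// withRateLimit is a hypothetical helper that runs fn with the flag forced to
// the given value and restores the previous value afterwards.
func withRateLimit(enabled bool, fn func()) {
	prev := setEnableRateLimit(enabled)
	defer setEnableRateLimit(prev)
	fn()
}

func main() {
	withRateLimit(true, func() {
		fmt.Println("inside:", enableRateLimit)
	})
	fmt.Println("after:", enableRateLimit)
}
```

This shape is mostly useful in tests, where a setting is forced for one case and must not leak into the next.
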

Types

type DSStats

type DSStats struct {
	SourceNums   []SourceEvents
	NumUsers     int
	AvgBatchSize float64
	TableSize    int64
	NumRows      int
}

type GatewayAdmin

type GatewayAdmin struct {
	// contains filtered or unexported fields
}

func (*GatewayAdmin) Status

func (g *GatewayAdmin) Status() interface{}

Status is used by the admin interface for debugging purposes

type GatewayRPCHandler

type GatewayRPCHandler struct {
	// contains filtered or unexported fields
}

func (*GatewayRPCHandler) GetDSFailedJobs

func (g *GatewayRPCHandler) GetDSFailedJobs(arg string, result *string) (err error)

func (*GatewayRPCHandler) GetDSJobCount

func (g *GatewayRPCHandler) GetDSJobCount(arg string, result *string) (err error)

func (*GatewayRPCHandler) GetDSList

func (g *GatewayRPCHandler) GetDSList(_ string, result *string) (err error)

func (*GatewayRPCHandler) GetDSStats

func (g *GatewayRPCHandler) GetDSStats(dsName string, result *string) (err error)

GetDSStats

TODO:
- first_event, last_event min--maxid to event: available in dsrange
- Average batch size ⇒ num_events we want per ds
- writeKey, count(*) we want source name to count per ds
- Num Distinct users per ds
- Avg Event size = Table_size / (avg Batch size * Total rows) is Table_size correct measure?
- add job status group by

	EventsBySource
	================================================================================
	│───────│───────────│───────────────────────────────│
	│ COUNT │ NAME      │ ID                            │
	│───────│───────────│───────────────────────────────│
	│     7 │ test-dev  │ "1jEZBT9aChBgbVkfKBjtLau8XAM" │
	│     1 │ and-raid  │ "1lBkol38t4m5Xz3zZAeSZ0P26QU" │
	│───────│───────────│───────────────────────────────│
	================================================================================
	NumUsers : 2
	AvgBatchSize : 1
	TableSize : 65536
	NumRows : 8

func (*GatewayRPCHandler) GetJobByID

func (g *GatewayRPCHandler) GetJobByID(arg string, result *string) (err error)

func (*GatewayRPCHandler) GetJobIDStatus

func (g *GatewayRPCHandler) GetJobIDStatus(arg string, result *string) (err error)
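
Every GatewayRPCHandler method has the shape (arg string, result *string) error, which is the method form that the standard net/rpc package accepts. The page does not say how the handler is registered or served, so the sketch below only demonstrates that calling convention with a stand-in. StubHandler, the service name "Gateway" and the returned dataset names are all made up.

```go
package main

import (
	"fmt"
	"net"
	"net/rpc"
)

// StubHandler is a stand-in with the same method shape as GatewayRPCHandler.
type StubHandler struct{}

// GetDSList returns a fixed, made-up dataset list.
func (h *StubHandler) GetDSList(_ string, result *string) (err error) {
	*result = "gw_jobs_1\ngw_jobs_2"
	return nil
}

// callGetDSList serves the stub over an in-memory connection and calls it once.
func callGetDSList() (string, error) {
	server := rpc.NewServer()
	if err := server.RegisterName("Gateway", &StubHandler{}); err != nil {
		return "", err
	}
	serverConn, clientConn := net.Pipe()
	go server.ServeConn(serverConn)

	client := rpc.NewClient(clientConn)
	defer client.Close()

	var reply string
	err := client.Call("Gateway.GetDSList", "", &reply)
	return reply, err
}

func main() {
	list, err := callGetDSList()
	if err != nil {
		panic(err)
	}
	fmt.Println(list)
}
```
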

type HandleT

type HandleT struct {
	ProcessRequestTime stats.RudderStats
	// contains filtered or unexported fields
}

HandleT is the struct returned by the Setup call

func (*HandleT) GetWebhookSourceDefName

func (gateway *HandleT) GetWebhookSourceDefName(writeKey string) (name string, ok bool)

GetWebhookSourceDefName returns the webhook source definition name by write key

func (*HandleT) IncrementAckCount

func (gateway *HandleT) IncrementAckCount(count uint64)

IncrementAckCount increments the acknowledged count for gateway requests

func (*HandleT) IncrementRecvCount

func (gateway *HandleT) IncrementRecvCount(count uint64)

IncrementRecvCount increments the received count for gateway requests

func (*HandleT) MaxReqSize

func (*HandleT) MaxReqSize() int

MaxReqSize is the maximum request body size, in bytes, accepted by gateway web handlers
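
How the gateway enforces this limit is not shown here. One common way to cap a request body in net/http is http.MaxBytesReader, sketched below with an illustrative 16-byte limit. The maxReqSize constant, limitBody handler and statusFor helper are hypothetical.

```go
package main

import (
	"fmt"
	"io"
	"net/http"
	"net/http/httptest"
	"strings"
)

// maxReqSize is an illustrative limit, not the gateway's configured value.
const maxReqSize = 16

// limitBody is a hypothetical handler that rejects bodies larger than
// maxReqSize bytes with 413 Request Entity Too Large.
func limitBody(w http.ResponseWriter, r *http.Request) {
	r.Body = http.MaxBytesReader(w, r.Body, maxReqSize)
	body, err := io.ReadAll(r.Body)
	if err != nil {
		http.Error(w, "request body too large", http.StatusRequestEntityTooLarge)
		return
	}
	fmt.Fprintf(w, "read %d bytes", len(body))
}

// statusFor sends a body of n bytes through the handler and returns the
// resulting HTTP status code.
func statusFor(n int) int {
	req := httptest.NewRequest(http.MethodPost, "/", strings.NewReader(strings.Repeat("x", n)))
	rec := httptest.NewRecorder()
	limitBody(rec, req)
	return rec.Code
}

func main() {
	fmt.Println(statusFor(8), statusFor(64))
}
```
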

func (*HandleT) ProcessWebRequest

func (gateway *HandleT) ProcessWebRequest(w *http.ResponseWriter, r *http.Request, reqType string, payload []byte, writeKey string) string

ProcessWebRequest is an interface wrapper for webhook

func (*HandleT) SetReadonlyDBs

func (gateway *HandleT) SetReadonlyDBs(readonlyGatewayDB, readonlyRouterDB, readonlyBatchRouterDB jobsdb.ReadonlyJobsDB)

func (*HandleT) Setup

func (gateway *HandleT) Setup(
	application app.Interface, backendConfig backendconfig.BackendConfig, jobsDB jobsdb.JobsDB,
	rateLimiter ratelimiter.RateLimiter, versionHandler func(w http.ResponseWriter, r *http.Request),
	rsourcesService rsources.JobService,
) error

Setup initializes this module:
- Monitors backend config for changes.
- Starts web request batching goroutine, that batches incoming messages.
- Starts web request batch db writer goroutine, that writes incoming batches to JobsDB.
- Starts debugging goroutine that prints gateway stats.

This function will block until backend config is initially received.
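
The batching goroutine and the batch writer goroutine described above form a two-stage pipeline. The package's own implementation is not shown on this page. The sketch below illustrates the general shape with channels, assuming size-based batching only (a production batcher would typically also flush on a timer). The batcher, writer and runPipeline functions are hypothetical.

```go
package main

import (
	"fmt"
	"sync"
)

// batcher groups incoming requests into slices of at most size items and
// forwards each slice. It flushes the remainder when in is closed.
func batcher(in <-chan string, out chan<- []string, size int) {
	defer close(out)
	batch := make([]string, 0, size)
	for req := range in {
		batch = append(batch, req)
		if len(batch) == size {
			out <- batch
			batch = make([]string, 0, size)
		}
	}
	if len(batch) > 0 {
		out <- batch
	}
}

// writer drains batches and hands each one to store, which stands in for a
// JobsDB write.
func writer(in <-chan []string, store func([]string)) {
	for b := range in {
		store(b)
	}
}

// runPipeline wires the two goroutines together and returns the stored batches.
func runPipeline(reqs []string, size int) [][]string {
	requests := make(chan string)
	batches := make(chan []string)
	var stored [][]string

	var wg sync.WaitGroup
	wg.Add(2)
	go func() { defer wg.Done(); batcher(requests, batches, size) }()
	go func() {
		defer wg.Done()
		writer(batches, func(b []string) { stored = append(stored, b) })
	}()

	for _, r := range reqs {
		requests <- r
	}
	close(requests)
	wg.Wait() // stored is only read after the writer goroutine has finished
	return stored
}

func main() {
	fmt.Println(runPipeline([]string{"a", "b", "c", "d", "e"}, 2))
}
```
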

func (*HandleT) Shutdown

func (gateway *HandleT) Shutdown() error

func (*HandleT) StartAdminHandler

func (gateway *HandleT) StartAdminHandler(ctx context.Context) error

StartAdminHandler starts the handler for admin operations

func (*HandleT) StartWebHandler

func (gateway *HandleT) StartWebHandler(ctx context.Context) error

StartWebHandler starts all gateway web handlers, listening on gateway port. Supports CORS from all origins. This function will block.
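
Since StartWebHandler blocks and takes a context, callers would normally run it in its own goroutine and stop it by cancelling the context. The sketch below shows that calling pattern with a stand-in. startWebHandler and runUntil are hypothetical, and whether the real function returns nil or the context error after cancellation is not stated on this page.

```go
package main

import (
	"context"
	"errors"
	"fmt"
	"time"
)

// startWebHandler is a stand-in that blocks until ctx is cancelled, as a
// listening server would until it is shut down.
func startWebHandler(ctx context.Context) error {
	<-ctx.Done()
	return ctx.Err()
}

// runUntil starts the blocking call in a goroutine, cancels it after d, and
// returns the error it reported.
func runUntil(d time.Duration) error {
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	errCh := make(chan error, 1)
	go func() { errCh <- startWebHandler(ctx) }()

	time.Sleep(d) // other startup or serving work would happen here
	cancel()
	return <-errCh
}

func main() {
	err := runUntil(10 * time.Millisecond)
	fmt.Println(errors.Is(err, context.Canceled))
}
```
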

func (*HandleT) TrackRequestMetrics

func (gateway *HandleT) TrackRequestMetrics(errorMessage string)

TrackRequestMetrics provides access to add request success/failure telemetry

func (*HandleT) UpdateSourceStats

func (gateway *HandleT) UpdateSourceStats(sourceStats map[string]int, bucket string, sourceTagMap map[string]string)

UpdateSourceStats creates a new stat for every write key and updates it with the corresponding count

type ImportRequestHandler

type ImportRequestHandler struct{}

ImportRequestHandler is an empty struct to capture import specific request handling functionality

func (*ImportRequestHandler) ProcessRequest

func (irh *ImportRequestHandler) ProcessRequest(gateway *HandleT, w *http.ResponseWriter, r *http.Request, _ string, payload []byte, writeKey string) string

ProcessRequest on ImportRequestHandler splits the payload by user, puts each part into the webrequestQ, and waits for all their responses before returning
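
The split-by-user and wait-for-all behaviour can be pictured as a fan-out followed by a join. The sketch below is not the package's code. The event type, splitByUser and processAll are hypothetical, and an empty string is assumed to mean success.

```go
package main

import (
	"fmt"
	"sync"
)

// event is a hypothetical minimal event carrying only a user ID and a name.
type event struct {
	UserID string
	Name   string
}

// splitByUser groups events by user ID, preserving per-user order.
func splitByUser(events []event) map[string][]event {
	groups := make(map[string][]event)
	for _, ev := range events {
		groups[ev.UserID] = append(groups[ev.UserID], ev)
	}
	return groups
}

// processAll submits each user's group concurrently and waits for every
// response. It returns a non-empty error message if any group failed.
func processAll(events []event, submit func(userID string, evs []event) string) string {
	groups := splitByUser(events)
	results := make(chan string, len(groups))
	var wg sync.WaitGroup
	for userID, evs := range groups {
		wg.Add(1)
		go func(userID string, evs []event) {
			defer wg.Done()
			results <- submit(userID, evs)
		}(userID, evs)
	}
	wg.Wait()
	close(results)
	for msg := range results {
		if msg != "" {
			return msg
		}
	}
	return ""
}

func main() {
	events := []event{{"u1", "login"}, {"u2", "login"}, {"u1", "purchase"}}
	fmt.Println(len(splitByUser(events)))
	fmt.Printf("%q\n", processAll(events, func(string, []event) string { return "" }))
}
```
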

type RegularRequestHandler

type RegularRequestHandler struct{}

RegularRequestHandler is an empty struct to capture non-import specific request handling functionality

func (*RegularRequestHandler) ProcessRequest

func (rrh *RegularRequestHandler) ProcessRequest(gateway *HandleT, w *http.ResponseWriter, r *http.Request, reqType string, payload []byte, writeKey string) string

ProcessRequest puts a webRequest into the queue and waits for the response before returning
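
Enqueueing a request and blocking until it is answered is commonly done by attaching a reply channel to each queued item. The sketch below shows that pattern in isolation. The webRequest fields, worker and processRequest are hypothetical and do not reflect the package's unexported types. An empty string is assumed to mean success.

```go
package main

import "fmt"

// webRequest is a hypothetical queue item: a payload plus a channel on which
// the worker sends back the result.
type webRequest struct {
	payload []byte
	done    chan string
}

// worker drains the queue and replies to each request. An empty string means
// success here, which is an assumption made for this sketch.
func worker(queue <-chan *webRequest) {
	for req := range queue {
		if len(req.payload) == 0 {
			req.done <- "empty payload"
			continue
		}
		req.done <- ""
	}
}

// processRequest enqueues the payload and blocks until the worker responds.
func processRequest(queue chan<- *webRequest, payload []byte) string {
	req := &webRequest{payload: payload, done: make(chan string, 1)}
	queue <- req
	return <-req.done
}

func main() {
	queue := make(chan *webRequest)
	go worker(queue)
	ok := processRequest(queue, []byte(`{"type":"track"}`))
	bad := processRequest(queue, nil)
	close(queue)
	fmt.Printf("%q %q\n", ok, bad)
}
```

The reply channel is buffered with capacity one so the worker never blocks on a caller that has stopped waiting.
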

type RequestHandler

type RequestHandler interface {
	ProcessRequest(gateway *HandleT, w *http.ResponseWriter, r *http.Request, reqType string, payload []byte, writeKey string) string
}

RequestHandler is an interface that abstracts server-side import request processing from the rest of the calls

type SourceEvents

type SourceEvents struct {
	Count int
	Name  string
	ID    string
}

type SqlRunner

type SqlRunner struct {
	// contains filtered or unexported fields
}
