handlers

package
v4.13.1 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Nov 4, 2022 License: MIT Imports: 16 Imported by: 0

Documentation

Index

Constants

This section is empty.

Variables

View Source
var (
	// APIRequestTask marks a task as an API request task
	APIRequestTask queue.TaskType = "api-request"
)
View Source
var (
	// ErrNoHandlerFound occurs when dispatcher can'f find a registered handler for a task type
	ErrNoHandlerFound = errors.New("no handler found")
)
View Source
var (
	ErrSerializingHearbeat = errors.New("failed to serialize progress payload while sending heartbeat")
)

Functions

func NewAPIRequestHandler

func NewAPIRequestHandler(tokenHeaderName string, tokenCreator tokens.Creator, client *http.Client) queue.TaskHandler

NewAPIRequestHandler creates a task handler that makes an HTTP request to a target API. The response from the request must be valid JSON or a stream of new line-separated JSON objects, otherwise the task will fail.

func NewDispatchHandler

func NewDispatchHandler(handlers map[queue.TaskType]queue.TaskHandler) queue.TaskHandler

NewDispatchHandler creates a task handler that will dispatch tasks to other handlers

func NewJSONAPIHandler added in v4.4.0

func NewJSONAPIHandler(client clients.BaseAPIClient) queue.TaskHandler

NewJSONAPIHandler creates a task handler that makes an JSON HTTP request to a target API using the provided BaseAPIClient.

The response from the request must be valid JSON or a stream of new line-separated JSON objects, otherwise the task will fail

The BaseAPIClient is responsible for bringing its own TokenProvider.

The NewAPIRequestHandler can be preferred if the request is not a JSON payload.

The NewJSONAPIHandler can be preferred because it is easier to mock the BaseAPIClient for tests.

Example usage:

client := clients.NewBaseAPIClient(
	"", // use an empty baseURL because the task spec will hold the URL
	"X-Auth",
	clients.TokenProviderFromCreator(&creator, "apiRequestTask", tokens.Options{}),
	http.DefaultClient,
	false,
)
handler := NewJSONAPIHandler(client)

Alternatively, use it within your custom task handler, this is required if the client behavior is dependent on the task spec:

type customHandler struct {
	tracing.Tracer
}
func (h customHandler) Process(ctx context.Context, task queue.Task, heartbeats chan<- queue.Progress) (err error) {
	span, ctx := h.StartSpan(ctx, "Process")
	defer func() {
		close(heartbeats)
		heartbeats = nil
		h.FinishSpan(span, err)
	}()

	var spec tasks.CustomSpec
	err = json.Unmarshal(task.Spec, &spec)
	if err != nil {
		return err
	}

	creator := specSpecificTokenCreator{
		projectID: spec.ProjectID,
	}
	client := clients.NewBaseAPIClient(
		"", // use an empty baseURL because the task spec will hold the URL
		"Auth",
		clients.TokenProviderFromCreator(&creator, "apiRequestTask", tokens.Options{}),
		http.DefaultClient,
		false,
	)
	client = clients.WithRetry(client, maxAttempts, backoff.Exponential())
	taskHandler := handlers.NewJSONAPIHandler(client)

	return taskHandler.Process(ctx, task, heartbeats)
}

func NewJSONAPIHandlerWithErrorChecker added in v4.7.0

func NewJSONAPIHandlerWithErrorChecker(client clients.BaseAPIClient, checker CheckForErrorFunction) queue.TaskHandler

func NewSQLTaskHandler

func NewSQLTaskHandler(name string, db *sql.DB) queue.TaskHandler

NewSQLTaskHandler creates a sqlTaskHandler handler instance with the given tracing name

Types

type APIRequestProgress

type APIRequestProgress struct {
	// Stage is the current stage of the API request task
	Stage APIRequestStage `json:"stage,omitempty"`
	// Duration of the HTTP request
	Duration *time.Duration `json:"duration,omitempty"`
	// ReturnedStatus is a status returned from the target endpoint
	ReturnedStatus *int `json:"returnedStatus,omitempty"`
	// ReturnedBody is a body returned from the target endpoint
	ReturnedBody *string `json:"returnedBody,omitempty"`
	// ErrorMessage contains an error message string if it occurs during the update process
	ErrorMessage *string `json:"errorMessage,omitempty"`
}

APIRequestProgress describes the progress of the API request task stored during the heartbeat handling

type APIRequestStage

type APIRequestStage string
var (
	// RequestPreparing means the task is preparing the request parameters and the body
	RequestPreparing APIRequestStage = "preparing"
	// RequestPending means the request was sent, awaiting the response
	RequestPending APIRequestStage = "pending"
	// RequestResponse means the response was received
	RequestResponse APIRequestStage = "response"
)

type APIRequestTaskSpec

type APIRequestTaskSpec struct {
	// Method to use for the API request
	Method string `json:"method"`
	// URL is the target URL for the request.
	// Must be an absolute URL that contains the scheme and the host components.
	URL string `json:"url"`
	// RequestBody to send
	RequestBody string `json:"requestBody"`
	// RequestHeaders to send
	RequestHeaders map[string]string `json:"requestHeaders"`
	// Authorized if `true` the task will send a header with the
	// signed JWT token as a part of the request
	Authorized bool `json:"authorized"`
	// ExpectedStatus is an HTTP status expected as a response.
	// If it does not match the actual status the task fails
	ExpectedStatus int `json:"expectedStatus"`
}

APIRequestTaskSpec describes the specification of the API request task

type CheckForErrorFunction added in v4.7.0

type CheckForErrorFunction func(m json.RawMessage) error

CheckForErrorStatus checks if the response contains an error status

type SQLExecTaskSpec

type SQLExecTaskSpec struct {
	// SQL is the actual sql that will be run
	SQL string `json:"sql"`
}

SQLExecTaskSpec defines a task that simply executes a single SQL statement. This can be used for simple CRON cleanup tasks, for example.

type SQLTaskProgress

type SQLTaskProgress struct {
	// Duration of the HTTP request in milliseconds
	Duration *int64 `json:"duration,omitempty"`
	// RowsAffected
	RowsAffected *int64 `json:"rowsAffected,omitempty"`
	// ErrorMessage contains an error message string if it occurs during the update process
	ErrorMessage *string `json:"errorMessage,omitempty"`
}

SQLTaskProgress contains the generic progress information for a sql task

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL