canyon

package module
v0.7.1
Published: Feb 22, 2024 License: MIT Imports: 51 Imported by: 3

README

canyon

Go net/http integration for job queue worker pattern with AWS Lambda and AWS SQS

Example

package main

import (
    "context"
    "io"
    "log/slog"
    "net/http"
    "os"
    "os/signal"
    "syscall"

    "github.com/mashiike/canyon"
)

func main() {
    slog.SetDefault(slog.New(slog.NewJSONHandler(os.Stderr, &slog.HandlerOptions{Level: slog.LevelDebug})))
    ctx, cancel := signal.NotifyContext(context.Background(), syscall.SIGTERM, syscall.SIGINT, syscall.SIGQUIT)
    defer cancel()

    opts := []canyon.Option{
        canyon.WithServerAddress(":8080", "/"),
    }
    err := canyon.RunWithContext(ctx, "your-sqs-queue-name", http.HandlerFunc(handler), opts...)
    if err != nil {
        slog.Error("failed to run canyon", "error", err)
        os.Exit(1)
    }
}

func handler(w http.ResponseWriter, r *http.Request) {
    logger := canyon.Logger(r)
    if !canyon.IsWorker(r) {
        logger.Info("server process", slog.String("request", r.URL.Path))
        // handle webhook directly
        messageId, err := canyon.SendToWorker(r, nil)
        if err != nil {
            logger.Error("failed to send sqs message", "error", err)
            w.WriteHeader(http.StatusInternalServerError)
            return
        }
        logger.Info("send sqs message", slog.String("message_id", messageId), slog.String("method", r.Method), slog.String("path", r.URL.Path))
        w.WriteHeader(http.StatusAccepted)
        return
    }

    // handle from sqs message
    logger.Info("worker process", slog.String("request", r.URL.Path))
    bs, err := io.ReadAll(r.Body)
    if err != nil {
        logger.Error("failed to read body", "error", err)
        w.WriteHeader(http.StatusInternalServerError)
        return
    }
    logger.Info("event request body", slog.String("body", string(bs)))
    w.WriteHeader(http.StatusOK) // if 2xx is success, sqs message will be deleted
}

An example Lambda function is in the lambda/simple directory.

canyon.RunWithContext(ctx, sqs_queue_name, handler, opts...)

canyon.RunWithContext(ctx, sqs_queue_name, handler, opts...) works as follows.

  • If the process is running on Lambda (the AWS_EXECUTION_ENV or AWS_LAMBDA_RUNTIME_API environment variable is defined):
    • Call lambda.Start().
    • If the Lambda invoke request has a Records field, call handler as a worker.
    • If the Lambda invoke request is an HTTP integration event, call handler as a server.
  • Otherwise, start two goroutines:
    • The HTTP server, a net/http server.
    • The SQS worker, a long-polling worker for sqs_queue_name.
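
The mode selection described in the list above can be sketched with the standard library alone. This is only an illustration, not canyon's implementation: runningOnLambda is a hypothetical helper, and it takes the lookup function as a parameter purely so that it can be exercised without touching the real environment.

```go
package main

import (
	"fmt"
	"os"
)

// runningOnLambda is a hypothetical helper (not part of canyon). It reports
// whether either environment variable named in the list above is defined.
func runningOnLambda(lookup func(string) (string, bool)) bool {
	for _, key := range []string{"AWS_EXECUTION_ENV", "AWS_LAMBDA_RUNTIME_API"} {
		if _, ok := lookup(key); ok {
			return true
		}
	}
	return false
}

func main() {
	if runningOnLambda(os.LookupEnv) {
		fmt.Println("mode: lambda")
		return
	}
	fmt.Println("mode: local (HTTP server and SQS poller)")
}
```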

canyon.IsWorker(r)

canyon.IsWorker(r) returns true if the request comes from the SQS worker.

If it returns false, the handler is acting as the webhook-handling server. In that case, canyon.SendToWorker(r, nil) sends the request to the SQS queue.

If it returns true, the handler is acting as the worker. canyon converts the SQS event into an HTTP request and sets the Sqs-Message-Id and Sqs-Message-Attribute-... headers on it.

canyon.SendToWorker(r, opts)

canyon.SendToWorker(r, opts) sends the request to the worker through the SQS queue. It can only be called for requests where canyon.IsWorker(r) == false. It wraps sqsClient.SendMessage(ctx, &sqs.SendMessageInput{...}) and returns SendMessageOutput.MessageId and an error.

If opts is nil, the SQS message has no message attributes. To attach attributes, set the MessageAttributes field of canyon.SendOptions to a map[string]canyon.MessageAttributeValue. The helper canyon.ToMessageAttributes(...) converts an http.Header to that map.

Advanced Usage

Change Message Visibility on Worker Failure
package main

//...

func handler(w http.ResponseWriter, r *http.Request) {
    logger := canyon.Logger(r)
    if !canyon.IsWorker(r) {
        // ... 
        return
    }
    // anything done in the worker process, for example reading the body
    bs, err := io.ReadAll(r.Body)
    // ...
    if err != nil {
        w.Header().Set("Retry-After", "60") // set retry-after header
        w.WriteHeader(http.StatusServiceUnavailable)
        return
    }
    logger.Info("event request body", slog.String("body", string(bs)))
    w.WriteHeader(http.StatusOK) // if 2xx is success, sqs message will be deleted
}

In the worker process, set the Retry-After header to control the SQS message visibility. If the Retry-After header is set, canyon sets the message visibility timeout to the Retry-After value plus the current processing time. If it is not set, canyon does not change the message visibility, so the queue's default visibility timeout applies.
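
The arithmetic in the paragraph above can be illustrated as follows. This is a sketch, not canyon's code: visibilityTimeoutSeconds is a hypothetical helper, and it assumes the header carries a plain number of seconds (the HTTP-date form of Retry-After is not handled) and that unparseable values are ignored.

```go
package main

import (
	"fmt"
	"strconv"
)

// visibilityTimeoutSeconds is a hypothetical helper (not part of canyon).
// retryAfter is the raw Retry-After header value and elapsedSeconds is how
// long the worker has already spent on the message.
func visibilityTimeoutSeconds(retryAfter string, elapsedSeconds int) (int, bool) {
	if retryAfter == "" {
		return 0, false // header not set: the queue default stays in effect
	}
	seconds, err := strconv.Atoi(retryAfter)
	if err != nil || seconds < 0 {
		return 0, false // assumption: invalid values are ignored
	}
	return seconds + elapsedSeconds, true
}

func main() {
	timeout, ok := visibilityTimeoutSeconds("60", 5)
	fmt.Println(timeout, ok) // prints "65 true"
}
```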

To customize worker response behavior, use canyon.WithWrokerResponseChecker (the option name is spelled this way in the API)
package main

//...

func main() {
//...
    opts := []canyon.Option{
        canyon.WithServerAddress(":8080", "/"),
        canyon.WithWrokerResponseChecker(canyon.WorkerResponseCheckerFunc(
            func(_ context.Context, r *http.Response) bool {
                // this function called end of worker process
                return r.StatusCode != http.StatusOK //return isFailed flag
            },
        )),
    }
    err := canyon.RunWithContext(ctx, "your-sqs-queue-name", http.HandlerFunc(handler), opts...)
    if err != nil {
        slog.Error("failed to run canyon", "error", err)
        os.Exit(1)
    }
}

If the checker returns true, the worker run is treated as failed and the SQS message is not deleted.
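
To make the "is failed" flag concrete, the sketch below compares the documented default rule (status code of 300 or above counts as a failure) with one possible custom rule. Both functions are hypothetical stand-ins that work on a bare status code so the example stays free of dependencies; a real checker would receive the *http.Response as in the code above.

```go
package main

import "fmt"

// defaultIsFailed mirrors the documented default rule: any status code of
// 300 or above counts as a failure, so the message is kept for retry.
func defaultIsFailed(status int) bool { return status >= 300 }

// serverErrorsOnly is a hypothetical custom rule: only 5xx responses keep
// the message in the queue; everything else lets it be deleted.
func serverErrorsOnly(status int) bool { return status >= 500 }

func main() {
	for _, status := range []int{200, 202, 400, 503} {
		fmt.Printf("%d default=%t custom=%t\n",
			status, defaultIsFailed(status), serverErrorsOnly(status))
	}
}
```

With a rule like serverErrorsOnly, a 400 response would delete the message instead of retrying it, which may suit payloads that can never succeed.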

Large Payload, Request Body upload to S3
package main

//...

func main() {
//...
    b, err := canyon.NewS3Backend("s3://bucket-name/prefix")
    if err != nil {
        slog.Error("failed to create s3 backend", "error", err)
        os.Exit(1)
    }
    b.SetAppName("your-app-name") // if not set, default is "canyon"
    opts := []canyon.Option{
        canyon.WithServerAddress(":8080", "/"),
        canyon.WithBackend(b),
    }
    err = canyon.RunWithContext(ctx, "your-sqs-queue-name", http.HandlerFunc(handler), opts...)
    if err != nil {
        slog.Error("failed to run canyon", "error", err)
        os.Exit(1)
    }
}

If the request body is larger than 256KB, the SQS SendMessage API returns an error. In that case, use the canyon.WithBackend option. With this option set, when a canyon.IsWorker(r) == false request is sent to the worker, its body is uploaded to the Backend; for canyon.IsWorker(r) == true requests, the body is downloaded from the Backend.

canyon.NewS3Backend("s3://bucket-name/prefix") returns a canyon.S3Backend instance, an implementation of the canyon.Backend interface backed by AWS S3.

Environment switch option canyon.WithCanyonEnv(envPrefix)

canyon.WithCanyonEnv(envPrefix) is a helper option that switches settings by environment. The following describes the case where envPrefix is CANYON_.

If CANYON_ENV=development, it returns multiple options (canyon.WithInMemoryQueue(), canyon.WithFileBackend(), and canyon.WithVarbose()). The file backend is set up in a temporary directory.

If CANYON_ENV=test, it returns multiple options (canyon.WithInMemoryQueue() and canyon.WithInMemoryBackend()).

Any other value is treated the same as CANYON_ENV=production. In production mode, CANYON_BACKEND_URL is enabled. This environment variable is the backend URL. For example, with s3://bucket-name/prefix, canyon sets up canyon.NewS3Backend("s3://bucket-name/prefix") and the canyon.WithBackend(...) option. If CANYON_BACKEND_SAVE_APP_NAME is also set, canyon.S3Backend.SetAppName(...) is called.

If the backend URL is file:///tmp/canyon, canyon sets up canyon.NewFileBackend("/tmp/canyon") and the canyon.WithBackend(...) option.

The default usage is, for example:

package main

//...

func main() {
//...
    opts := []canyon.Option{
        canyon.WithServerAddress(":8080", "/"),
        canyon.WithCanyonEnv("CANYON_"),
    }
    err := canyon.RunWithContext(ctx, "your-sqs-queue-name", http.HandlerFunc(handler), opts...)
    if err != nil {
        slog.Error("failed to run canyon", "error", err)
        os.Exit(1)
    }
}

Set it as the last of the options.

$ CANYON_ENV=development go run main.go

This runs in local development mode, using the in-memory queue and a temporary file backend.

$ CANYON_ENV=production go run main.go

This runs in production mode, using AWS SQS and AWS S3.

Lambda Fallback Handler canyon.WithLambdaFallbackHandler(handler)

canyon.WithLambdaFallbackHandler(handler) sets a fallback handler. If the Lambda payload is neither an SQS event nor an HTTP event, canyon calls this handler.

package main

//...

func main() {
//...
    opts := []canyon.Option{
        canyon.WithServerAddress(":8080", "/"),
        canyon.WithLambdaFallbackHandler(func(ctx context.Context, event json.RawMessage) (interface{}, error) {
            // your fallback handler code
            // call if lambda payload is not SQS Event or HTTP Event
            fmt.Println("fallback handler called:", string(event))
            return nil, nil
        }),
    }
    err := canyon.RunWithContext(ctx, "your-sqs-queue-name", http.HandlerFunc(handler), opts...)
    if err != nil {
        slog.Error("failed to run canyon", "error", err)
        os.Exit(1)
    }
}

In local development, if a Lambda fallback handler is set, canyon parses os.Stdin as the Lambda payload and calls the handler.

$ echo '{"foo":"bar"}' | go run main.go
<... few lines ...>
fallback handler called: {"foo":"bar"}
<... continue program ...>

Long Delayed Message with EventBridge Scheduler

An SQS delayed message can be delayed by at most 15 minutes. For longer delays, use EventBridge Scheduler: canyon then calls the CreateSchedule API with an at expression.

package main

//...

func main() {
//...
    scheduler, err := canyon.NewEventBridgeScheduler(ctx, "schedule-name-prefix.")
    if err != nil {
        slog.Error("failed to create eventbridge scheduler", "error", err)
        os.Exit(1)
    }
    opts := []canyon.Option{
        canyon.WithServerAddress(":8080", "/"),
        canyon.WithScheduler(scheduler),
    }
    err = canyon.RunWithContext(ctx, "your-sqs-queue-name", http.HandlerFunc(handler), opts...)
    if err != nil {
        slog.Error("failed to run canyon", "error", err)
        os.Exit(1)
    }
}

If you use the canyon.WithCanyonEnv option and the <env prefix>SCHEDULER environment variable is set to true, EventBridge Scheduler is used.
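
The decision between a plain SQS delay and a schedule can be sketched as below. This is an illustration under stated assumptions, not canyon's implementation: needsScheduler and atExpression are hypothetical helpers, the 900-second limit is the 15-minute SQS maximum mentioned above, and the at(yyyy-mm-ddThh:mm:ss) format is the one-time schedule expression syntax of EventBridge Scheduler, rendered here in UTC.

```go
package main

import (
	"fmt"
	"time"
)

// maxSQSDelaySeconds is the 15-minute SQS delay limit mentioned above.
const maxSQSDelaySeconds = 900

// needsScheduler is a hypothetical helper: it reports whether a delay is
// too long to be expressed as an SQS delayed message.
func needsScheduler(delaySeconds int) bool {
	return delaySeconds > maxSQSDelaySeconds
}

// atExpression is a hypothetical helper that formats a one-time schedule
// expression for the given instant, in UTC.
func atExpression(t time.Time) string {
	return "at(" + t.UTC().Format("2006-01-02T15:04:05") + ")"
}

func main() {
	now := time.Date(2024, 2, 22, 12, 0, 0, 0, time.UTC)
	delay := 3600 // one hour, well over the SQS limit
	if needsScheduler(delay) {
		fireAt := now.Add(time.Duration(delay) * time.Second)
		fmt.Println(atExpression(fireAt)) // prints "at(2024-02-22T13:00:00)"
	}
}
```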

For testing

The canyontest package is a helper package for testing, similar to the httptest package. For example:

func TestXXX(t *testing.T) {
    h := http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
        // your handler code
        // can use canyon.IsWorker(r) and canyon.SendToWorker(r, nil)
    })
    r := canyontest.NewRunner(h)
    defer r.Close()

    resp, err := http.Post(r.URL, "application/json", strings.NewReader(`{"foo":"bar baz"}`))
    if err != nil {
        t.Fatal(err)
    }
    defer resp.Body.Close() // also keeps resp in use so the snippet compiles
    // your test code
}

If you only want to test the handler, use canyontest.AsServer(h) and canyontest.AsWorker(h). These are middleware for handler testing; they do not start a real HTTP server or SQS worker.

func TestServerLogic(t *testing.T) {
    h := http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
        // your server logic code
        // canyon.SendToWorker(r, nil)
        // canyon.IsWorker(r) == false
    })
    sender := canyon.WorkerSenderFunc(func(r *http.Request, m canyon.MessageAttributes) (string, error) {
        // call from canyon.SendToWorker()
        return "message-id", nil
    })
    h = canyontest.AsServer(h)
    r := httptest.NewRequest(http.MethodPost, "/", strings.NewReader(`{"foo":"bar baz"}`))
    w := httptest.NewRecorder()
    h.ServeHTTP(w, r)
    // your test code
}

func TestWorkerLogic(t *testing.T) {
    h := http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
        // your worker logic code
        // canyon.IsWorker(r) == true
        // r.Header with Sqs-Message-Id, Sqs-Message-Attributes-... headers
    })
    h = canyontest.AsWorker(h)
    r := httptest.NewRequest(http.MethodPost, "/", strings.NewReader(`{"foo":"bar baz"}`))
    w := httptest.NewRecorder()
    h.ServeHTTP(w, r)
    // your test code
}

LICENSE

MIT

Documentation

Overview

Example
package main

import (
	"fmt"
	"io"
	"log/slog"
	"net/http"
	"os"
	"strings"

	"github.com/mashiike/canyon"
	"github.com/mashiike/canyon/canyontest"
)

func main() {
	slog.SetDefault(slog.New(slog.NewTextHandler(os.Stderr, &slog.HandlerOptions{Level: slog.LevelDebug})))
	var messageCh = make(chan string, 1)
	h := http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
		logger := canyon.Logger(r)
		if !canyon.IsWorker(r) {
			logger.Info("handle webhook directly", "method", r.Method, "path", r.URL.Path)
			// handle webhook directly
			messageId, err := canyon.SendToWorker(r, &canyon.SendOptions{
				MessageAttributes: canyon.ToMessageAttributes(
					http.Header{
						"User-Name": []string{"test user"},
					},
				),
			})
			if err != nil {
				logger.Error("failed to send sqs message", "error", err)
				w.WriteHeader(http.StatusInternalServerError)
				return
			}
			logger.Info("send sqs message", "message_id", messageId, "method", r.Method, "path", r.URL.Path)
			w.WriteHeader(http.StatusAccepted)
			return
		}
		// handle from sqs message
		logger.Info("handle from sqs message", "method", r.Method, "path", r.URL.Path, "message_id", r.Header.Get(canyon.HeaderSQSMessageID))
		bs, err := io.ReadAll(r.Body)
		if err != nil {
			logger.Error("failed to read body", "error", err)
			w.WriteHeader(http.StatusInternalServerError)
			return
		}
		defer r.Body.Close()
		messageCh <- string(bs) + ":" + r.Header.Get(canyon.HeaderSQSMessageAttribute("UserName", "String"))
		close(messageCh)
		w.WriteHeader(http.StatusOK) // if return 2xx, sqs message will be deleted from queue
	})
	r := canyontest.NewRunner(h, canyon.WithVarbose())
	defer r.Close()
	resp, err := http.Post(r.URL, "text/plain", strings.NewReader("hello world"))
	if err != nil {
		slog.Error("post failed", "error", err)
		os.Exit(1)
	}
	resp.Body.Close()
	if resp.StatusCode != http.StatusAccepted {
		slog.Error("post failed", "status", resp.StatusCode)
		os.Exit(1)
	} else {
		slog.Info("post success", "status", resp.StatusCode)
	}
	for msg := range messageCh {
		slog.Info("received message", "message", msg)
		fmt.Println(msg)
	}
}
Output:

hello world:test user

Constants

const (
	HeaderSQSMessageID              = "Sqs-Message-Id"
	HeaderSQSMessageGroupID         = "Sqs-Message-Group-Id"
	HeaderSQSMessageDelaySeconds    = "Sqs-Message-Delay-Seconds"
	HeaderSQSEventSource            = "Sqs-Event-Source"
	HeaderSQSEventSourceArn         = "Sqs-Event-Source-Arn"
	HeaderSQSAwsRegionHeader        = "Sqs-Aws-Region"
	HeaderPrefixSQSAttribute        = "Sqs-Attribute-"
	HeaderPrefixSQSMessageAttribute = "Sqs-Message-Attribute-"
)

These headers are set on the request when running as a worker.

const (
	HeaderAPIGatewayWebsocketConnectionID = "Api-Gateway-Websocket-Connection-Id"
	HeaderAPIGatewayWebsocketRouteKey     = "Api-Gateway-Websocket-Route-Key"
	HeaderAPIGatewayWebsocketAPIID        = "Api-Gateway-Websocket-Api-Id"
	HeaderAPIGatewayWebsocketStage        = "Api-Gateway-Websocket-Stage"
)

API Gateway Websocket Proxy headers

const DelayedSQSMessageID = "<delayed sqs message>"

Variables

var DefaultWorkerResponseChecker = WorkerResponseCheckerFunc(func(ctx context.Context, resp *http.Response) bool {
	return resp.StatusCode >= 300
})

DefaultWorkerResponseChecker is the default WorkerResponseChecker. If the http.Response status code is 2xx, the SQS message is deleted. If it is 3xx, 4xx, or 5xx, the SQS message is not deleted.

var LogComponentAttributeKey = "component"

LogComponentAttributeKey is a log/slog attribute key for canyon working component name [worker or server].

Functions

func BackupRequset added in v0.5.0

func BackupRequset(r *http.Request) (*http.Request, func() error, error)

BackupRequset backs up the request body so that it can be read more than once, since both the http handler and the serializer may read it.

func ConnectionIsGone added in v0.7.0

func ConnectionIsGone(err error) bool

ConnectionIsGone returns true if err is GoneException.

func DefaultRouteKeySelector added in v0.7.0

func DefaultRouteKeySelector(body []byte) (string, error)

DefaultRouteKeySelector is a default RouteKeySelector. it returns "action" key from request body.
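
A selector with the same signature can be sketched with encoding/json. This is not the library's implementation: actionRouteKey is a hypothetical function, and returning an error when the "action" key is missing or empty is an assumption made for the example.

```go
package main

import (
	"encoding/json"
	"errors"
	"fmt"
)

// actionRouteKey is a hypothetical selector with the RouteKeySelector
// signature. It reads the top-level "action" string from a JSON body.
func actionRouteKey(body []byte) (string, error) {
	var payload struct {
		Action string `json:"action"`
	}
	if err := json.Unmarshal(body, &payload); err != nil {
		return "", err
	}
	if payload.Action == "" {
		// Assumption: a missing or empty key is reported as an error.
		return "", errors.New(`request body has no "action" key`)
	}
	return payload.Action, nil
}

func main() {
	key, err := actionRouteKey([]byte(`{"action":"sendmessage","data":"hi"}`))
	fmt.Println(key, err) // prints "sendmessage <nil>"
}
```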

func DeleteConnection added in v0.7.0

func DeleteConnection(ctx context.Context, connectionID string) error

DeleteConnection deletes connectionID.

func EmbedIsWorkerInContext

func EmbedIsWorkerInContext(ctx context.Context, isWorker bool) context.Context

EmbedIsWorkerInContext embeds isWorker flag in context.

this function is for http.Handler unit testing.
not for production use.

func EmbedWebsocketManagementAPIClient added in v0.7.0

func EmbedWebsocketManagementAPIClient(ctx context.Context, client ManagementAPIClient) context.Context

EmbedWebsocketManagementAPIClient embeds WebsocketManagementAPIClient in context. for testing, not for production use.

func EmbedWorkerSenderInContext added in v0.5.0

func EmbedWorkerSenderInContext(ctx context.Context, sender WorkerSender) context.Context

EmbedWorkerSenderInContext embeds WorkerSender in context. for testing, not for production use.

func ErrorHasRetryAfter added in v0.6.0

func ErrorHasRetryAfter(err error) (int32, bool)

func ExitsConnection added in v0.7.0

func ExitsConnection(ctx context.Context, connectionID string) (bool, error)

ExitsConnection returns true if connectionID exists.

func GetConnection added in v0.7.0

func GetConnection(ctx context.Context, connectionID string) (*apigatewaymanagementapi.GetConnectionOutput, error)

GetConnection gets connectionID.

func HeaderSQSAttribute

func HeaderSQSAttribute(name string) string

HeaderSQSAttribute returns header name for SQS attribute, when run on worker

func HeaderSQSMessageAttribute

func HeaderSQSMessageAttribute(name, dataType string) string

HeaderSQSMessageAttribute returns header name for SQS message attribute, when run on worker

func IsWebsocket added in v0.7.0

func IsWebsocket(r *http.Request) bool

IsWebsocket returns true if the request is from websocket proxy.

func IsWorker

func IsWorker(r *http.Request) bool

IsWorker returns true if the request is from the worker, that is, if canyon is running and the http.Handler was called for an SQS message.

func Logger

func Logger(r *http.Request) *slog.Logger

Logger returns a slog.Logger with the component attribute. If called for an SQS message, component is "worker". If called for an original HTTP request, component is "server".
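
The effect of the component attribute can be reproduced with log/slog alone. The sketch below is not how canyon builds its logger: componentName is a hypothetical helper, and the "component" key is taken from the LogComponentAttributeKey default shown earlier on this page.

```go
package main

import (
	"log/slog"
	"os"
)

// componentName is a hypothetical helper that maps the worker flag to the
// component values described above.
func componentName(isWorker bool) string {
	if isWorker {
		return "worker"
	}
	return "server"
}

func main() {
	base := slog.New(slog.NewTextHandler(os.Stdout, nil))
	// Every record from this logger carries component=worker.
	logger := base.With("component", componentName(true))
	logger.Info("worker process", "request", "/")
}
```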

func PostToConnection added in v0.7.0

func PostToConnection(ctx context.Context, connectionID string, data []byte) error

PostToConnection posts data to connectionID.

func RegisterBackendFactory added in v0.2.0

func RegisterBackendFactory(scheme string, factory func(*url.URL) (Backend, error))

RegisterBackendFactory registers backend factory.

func RestoreRequest added in v0.5.0

func RestoreRequest(r *http.Request) *http.Request

RestoreRequest restores request body from backuped request body.

func Run

func Run(sqsQueueName string, mux http.Handler, opts ...Option) error

func RunWithContext

func RunWithContext(ctx context.Context, sqsQueueName string, mux http.Handler, opts ...Option) error

func SendToWorker added in v0.2.0

func SendToWorker(r *http.Request, opts *SendOptions) (string, error)

func SetAPIGatewayWebsocketProxyHeader added in v0.7.0

func SetAPIGatewayWebsocketProxyHeader(r *http.Request, reqCtx *events.APIGatewayWebsocketProxyRequestContext) *http.Request

SetAPIGatewayWebsocketProxyHeader sets API Gateway Websocket Proxy headers to Request

func SetDefaultAWSConfig added in v0.7.0

func SetDefaultAWSConfig(cfg *aws.Config)

func SetSQSMessageHeader added in v0.2.0

func SetSQSMessageHeader(r *http.Request, message *events.SQSMessage) *http.Request

SetSQSMessageHeader sets SQS message headers on the request.

func ToMessageAttributes

func ToMessageAttributes(h http.Header) map[string]MessageAttributeValue

ToMessageAttributes converts http.Header to SQS MessageAttributes.

func Used added in v0.7.0

func Used(r *http.Request) bool

Used returns true if the request is handled by canyon.

func WebsocketConnectionID added in v0.7.0

func WebsocketConnectionID(r *http.Request) string

WebsocketConnectionID returns connection id from API Gateway Websocket Proxy headers.

func WebsocketRouteKey added in v0.7.0

func WebsocketRouteKey(r *http.Request) string

WebsocketRouteKey returns route key from API Gateway Websocket Proxy headers.

func WrapRetryAfter added in v0.6.0

func WrapRetryAfter(err error, retryAfter int32) error

Types

type AppNameSetable added in v0.2.0

type AppNameSetable interface {
	SetAppName(string)
}

type Backend added in v0.2.0

type Backend interface {
	// Save does store data.
	// If data is stored successfully, return URL of stored data.
	// If data is not stored successfully, return error.
	SaveRequestBody(context.Context, *http.Request) (*url.URL, error)

	// Load does load data from URL.
	// If data is loaded successfully, return data.
	// If data is not loaded successfully, return error.
	LoadRequestBody(context.Context, *url.URL) (io.ReadCloser, error)
}

Backend is an interface for storing and loading request bodies.
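
As a rough idea of what an implementation of this method set involves, here is a map-based sketch. It declares a local copy of the interface so that it compiles without importing canyon; mapBackend, roundTrip, and the "map" URL scheme are all made up for this example and are not part of the library.

```go
package main

import (
	"bytes"
	"context"
	"fmt"
	"io"
	"net/http"
	"net/url"
	"strconv"
	"strings"
	"sync"
)

// backend is a local copy of the method set shown above.
type backend interface {
	SaveRequestBody(context.Context, *http.Request) (*url.URL, error)
	LoadRequestBody(context.Context, *url.URL) (io.ReadCloser, error)
}

// mapBackend is a hypothetical backend that keeps bodies in a map.
type mapBackend struct {
	mu      sync.Mutex
	next    int
	entries map[string][]byte
}

func newMapBackend() *mapBackend { return &mapBackend{entries: map[string][]byte{}} }

// SaveRequestBody reads the whole body and returns a URL that identifies it.
func (b *mapBackend) SaveRequestBody(_ context.Context, r *http.Request) (*url.URL, error) {
	body, err := io.ReadAll(r.Body)
	if err != nil {
		return nil, err
	}
	b.mu.Lock()
	defer b.mu.Unlock()
	b.next++
	key := strconv.Itoa(b.next)
	b.entries[key] = body
	return &url.URL{Scheme: "map", Host: "bodies", Path: "/" + key}, nil
}

// LoadRequestBody returns the body previously stored under the URL.
func (b *mapBackend) LoadRequestBody(_ context.Context, u *url.URL) (io.ReadCloser, error) {
	b.mu.Lock()
	defer b.mu.Unlock()
	body, ok := b.entries[strings.TrimPrefix(u.Path, "/")]
	if !ok {
		return nil, fmt.Errorf("no stored body for %s", u)
	}
	return io.NopCloser(bytes.NewReader(body)), nil
}

// roundTrip saves payload through b and loads it back.
func roundTrip(b backend, payload string) (string, error) {
	req, err := http.NewRequest(http.MethodPost, "http://example.invalid/", strings.NewReader(payload))
	if err != nil {
		return "", err
	}
	u, err := b.SaveRequestBody(context.Background(), req)
	if err != nil {
		return "", err
	}
	rc, err := b.LoadRequestBody(context.Background(), u)
	if err != nil {
		return "", err
	}
	defer rc.Close()
	got, err := io.ReadAll(rc)
	return string(got), err
}

func main() {
	got, err := roundTrip(newMapBackend(), "hello world")
	fmt.Println(got, err) // prints "hello world <nil>"
}
```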

func NewBackend added in v0.2.0

func NewBackend(u *url.URL) (Backend, error)

NewBackend returns a new Backend created by the backend factory registered for the URL scheme.

type BackendSerializer added in v0.4.0

type BackendSerializer interface {
	Serializer
	WithBackend(backend Backend) Serializer
}

type DefaultSerializer added in v0.4.0

type DefaultSerializer struct {
	Backend              Backend
	Logger               *slog.Logger
	BaseRetrySeconds     int32
	JitterOfRetrySeconds int32
}

DefaultSerializer is a struct for Serialize and Deserialize http.Request as SQS Message.

func NewDefaultSerializer added in v0.4.0

func NewDefaultSerializer() *DefaultSerializer

func (*DefaultSerializer) Clone added in v0.4.0

func (*DefaultSerializer) Deserialize added in v0.4.0

func (s *DefaultSerializer) Deserialize(ctx context.Context, message *events.SQSMessage) (*http.Request, error)

Deserialize does deserialize SQS Message as http.Request.

func (*DefaultSerializer) Serialize added in v0.4.0

Serialize does serialize http.Request as SQS Message.

func (*DefaultSerializer) WithBackend added in v0.4.0

func (s *DefaultSerializer) WithBackend(backend Backend) Serializer

func (*DefaultSerializer) WithLogger added in v0.4.0

func (s *DefaultSerializer) WithLogger(logger *slog.Logger) Serializer

type ErrorWithRetryAfter added in v0.6.0

type ErrorWithRetryAfter struct {
	Err        error
	RetryAfter int32
}

func (*ErrorWithRetryAfter) Error added in v0.6.0

func (e *ErrorWithRetryAfter) Error() string

func (*ErrorWithRetryAfter) Unwrap added in v0.6.0

func (e *ErrorWithRetryAfter) Unwrap() error
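
The Error and Unwrap methods make this type usable with the standard errors package. The sketch below shows that general pattern with a local type of the same shape; retryAfterError, wrapRetryAfter, and retryAfterOf are hypothetical names, and nothing here is claimed to match how WrapRetryAfter or ErrorHasRetryAfter are implemented.

```go
package main

import (
	"errors"
	"fmt"
)

// retryAfterError is a local type with the same fields as the struct above.
type retryAfterError struct {
	Err        error
	RetryAfter int32
}

func (e *retryAfterError) Error() string {
	return fmt.Sprintf("%v (retry after %d seconds)", e.Err, e.RetryAfter)
}

// Unwrap lets errors.Is and errors.As see the wrapped error.
func (e *retryAfterError) Unwrap() error { return e.Err }

// wrapRetryAfter attaches a retry delay to err.
func wrapRetryAfter(err error, seconds int32) error {
	return &retryAfterError{Err: err, RetryAfter: seconds}
}

// retryAfterOf walks the error chain and extracts the retry delay, if any.
func retryAfterOf(err error) (int32, bool) {
	var target *retryAfterError
	if errors.As(err, &target) {
		return target.RetryAfter, true
	}
	return 0, false
}

func main() {
	base := errors.New("upstream busy")
	err := fmt.Errorf("process message: %w", wrapRetryAfter(base, 60))
	seconds, ok := retryAfterOf(err)
	fmt.Println(seconds, ok, errors.Is(err, base)) // prints "60 true true"
}
```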

type EventBridgeScheduler added in v0.5.0

type EventBridgeScheduler struct {
	// contains filtered or unexported fields
}

func NewEventBridgeScheduler added in v0.5.0

func NewEventBridgeScheduler(ctx context.Context, namePrefix string) (*EventBridgeScheduler, error)

func (*EventBridgeScheduler) RegisterSchedule added in v0.5.0

func (s *EventBridgeScheduler) RegisterSchedule(ctx context.Context, msg *sqs.SendMessageInput) error

func (*EventBridgeScheduler) SetAPIClient added in v0.5.0

func (s *EventBridgeScheduler) SetAPIClient(client EventBridgeSchedulerClient)

func (*EventBridgeScheduler) SetGroupName added in v0.5.0

func (s *EventBridgeScheduler) SetGroupName(groupName string)

func (*EventBridgeScheduler) SetIAMRoleARN added in v0.5.0

func (s *EventBridgeScheduler) SetIAMRoleARN(iamRoleARN string) error

type EventBridgeSchedulerClient added in v0.5.0

type EventBridgeSchedulerClient interface {
	CreateSchedule(ctx context.Context, params *scheduler.CreateScheduleInput, optFns ...func(*scheduler.Options)) (*scheduler.CreateScheduleOutput, error)
}

type FileBackend added in v0.2.0

type FileBackend struct {
	// contains filtered or unexported fields
}

FileBackend is a backend for storing request body local file system

func NewFileBackend added in v0.2.0

func NewFileBackend(path string) (*FileBackend, error)

NewFileBackend returns new FileBackend.

func (*FileBackend) LoadRequestBody added in v0.2.0

func (b *FileBackend) LoadRequestBody(ctx context.Context, backendURL *url.URL) (io.ReadCloser, error)

LoadRequestBody loads request body from local file system.

func (*FileBackend) SaveRequestBody added in v0.2.0

func (b *FileBackend) SaveRequestBody(ctx context.Context, req *http.Request) (*url.URL, error)

SaveRequestBody stores request body in local file system.

type InMemoryBackend added in v0.2.0

type InMemoryBackend struct {
	// contains filtered or unexported fields
}

InMemoryBackend is a backend for storing request body in memory.

func NewInMemoryBackend added in v0.2.0

func NewInMemoryBackend() *InMemoryBackend

NewInMemoryBackend returns new InMemoryBackend.

func (*InMemoryBackend) Entries added in v0.4.0

func (b *InMemoryBackend) Entries() map[string][]byte

func (*InMemoryBackend) LoadRequestBody added in v0.2.0

func (b *InMemoryBackend) LoadRequestBody(ctx context.Context, backendURL *url.URL) (io.ReadCloser, error)

LoadRequestBody loads request body from memory.

func (*InMemoryBackend) SaveRequestBody added in v0.2.0

func (b *InMemoryBackend) SaveRequestBody(ctx context.Context, req *http.Request) (*url.URL, error)

SaveRequestBody stores request body in memory.

type InMemoryScheduler added in v0.5.0

type InMemoryScheduler struct {
	// contains filtered or unexported fields
}

func NewInMemoryScheduler added in v0.5.0

func NewInMemoryScheduler(clientFunc func() SQSClient) *InMemoryScheduler

func (*InMemoryScheduler) RegisterSchedule added in v0.5.0

func (s *InMemoryScheduler) RegisterSchedule(ctx context.Context, msg *sqs.SendMessageInput) error

type LambdaHandlerFunc added in v0.4.0

type LambdaHandlerFunc func(ctx context.Context, event []byte) ([]byte, error)

LambdaHandlerFunc is an adapter for lambda.Handler.

func (LambdaHandlerFunc) Invoke added in v0.4.0

func (f LambdaHandlerFunc) Invoke(ctx context.Context, event []byte) ([]byte, error)

Invoke invokes LambdaHandlerFunc.

type LoggingableSerializer added in v0.4.0

type LoggingableSerializer interface {
	Serializer
	WithLogger(logger *slog.Logger) Serializer
}

type ManagementAPIClient added in v0.7.0

ManagementAPIClient is a client for sending messages to websocket connections using the Amazon API Gateway Management API.

func NewManagementAPIClient added in v0.7.0

func NewManagementAPIClient(awsCfg aws.Config, endpointURL string) (ManagementAPIClient, error)

NewManagementAPIClient creates a new ManagementAPIClient.

func NewManagementAPIClientWithRequest added in v0.7.0

func NewManagementAPIClientWithRequest(req *http.Request, endpointURL string) (ManagementAPIClient, error)

NewManagementAPIClientWithRequest creates a new ManagementAPIClient with request.

type MessageAttributeValue added in v0.5.0

type MessageAttributeValue struct {
	DataType         string
	StringValue      *string
	BinaryValue      []byte
	BinaryListValues [][]byte
	StringListValues []string
}

MessageAttributeValue is a struct for sqs message attribute.

type Option

type Option func(*runOptions)

Option is an option for Run() and RunWithContext().

func WithBackend added in v0.2.0

func WithBackend(b Backend) Option

WithBackend returns a new Option that makes canyon use a backend. When sending an SQS message, canyon uploads the request body to the backend and sets backend_url in the SQS message body.

The SQS message body limit is 256KB, so this option is useful for large requests. For example, S3Backend uploads the request body to S3.

func WithCanyonEnv added in v0.2.0

func WithCanyonEnv(envPrefix string) Option

WithCanyonEnv returns a new Option that switches default options by environment. If env == "development", it sets verbose logging, the in-memory queue, and a temporary file backend. If env == "test", it sets the in-memory queue and the in-memory backend. Otherwise, it sets up the backend from os.Getenv("CANYON_BACKEND_URL").

If the URL is empty, no backend is set.
If the URL scheme is s3, the S3 backend is set.
If the URL scheme is file, the file backend is set.
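
The scheme rules above amount to a switch on the parsed URL. The following sketch shows that idea with net/url only; backendKind is a hypothetical helper, and rejecting every other scheme is an assumption of the example (the library also offers RegisterBackendFactory for registering factories by scheme).

```go
package main

import (
	"fmt"
	"net/url"
	"strings"
)

// backendKind is a hypothetical helper that classifies a backend URL by
// its scheme, following the three rules listed above.
func backendKind(raw string) (string, error) {
	if raw == "" {
		return "none", nil
	}
	u, err := url.Parse(raw)
	if err != nil {
		return "", err
	}
	switch u.Scheme {
	case "s3":
		return "s3 bucket=" + u.Host + " prefix=" + strings.TrimPrefix(u.Path, "/"), nil
	case "file":
		return "file path=" + u.Path, nil
	default:
		// Assumption: anything else is rejected in this sketch.
		return "", fmt.Errorf("unsupported backend scheme %q", u.Scheme)
	}
}

func main() {
	for _, raw := range []string{"", "s3://bucket-name/prefix", "file:///tmp/canyon"} {
		kind, err := backendKind(raw)
		fmt.Printf("%q -> %s (err=%v)\n", raw, kind, err)
	}
}
```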

func WithDisableServer

func WithDisableServer() Option

WithDisableServer returns a new Option that disables the server. If this option is set, canyon does not run the server.

func WithDisableWebsocket added in v0.7.0

func WithDisableWebsocket() Option

WithDisableWebsocket returns a new Option that disables websocket. If this option is set, canyon does not run the websocket server.

func WithDisableWorker

func WithDisableWorker() Option

WithDisableWorker returns a new Option that disables the worker. If this option is set, canyon does not run the worker.

func WithInMemoryQueue added in v0.2.0

func WithInMemoryQueue(visibilityTimeout time.Duration, maxReceiveCount int32, dlq io.Writer) Option

WithInMemoryQueue returns a new Option that enables the in-memory queue mode. This option is ignored when running on AWS Lambda. If it is set, canyon does not use real AWS SQS, only the in-memory queue. It is intended for local development.

func WithLambdaFallbackHandler added in v0.3.0

func WithLambdaFallbackHandler(handler interface{}) Option

WithLambdaFallbackHandler returns a new Option that sets the fallback Lambda handler. If this option is set, the fallback Lambda handler is called when the payload is not an SQS message or an HTTP request.

func WithListener

func WithListener(listener net.Listener) Option

WithListener returns a new Option that sets the local server listener. This option is for testing; normally you should not use it. In production, use the WithServerAddress() option instead.

func WithLogger

func WithLogger(logger *slog.Logger) Option

WithLogger returns a new Option that sets the canyon logger. default is slog.Default().

func WithProxyProtocol

func WithProxyProtocol() Option

WithProxyProtocol returns a new Option that enables the PROXY protocol. If you want to use the PROXY protocol, you should use this option. It is ignored when running on AWS Lambda.

func WithSQSClient

func WithSQSClient(sqsClient SQSClient) Option

WithSQSClient returns a new Option that sets the SQS client. This option is for testing; normally you should not use it. The default SQS client is loaded from the AWS default config.

func WithSQSPollingDuration

func WithSQSPollingDuration(pollingDuration time.Duration) Option

WithSQSPollingDuration returns a new Option that sets the local poller polling duration. It is ignored when running on AWS Lambda.

func WithScheduler added in v0.5.0

func WithScheduler(scheduler Scheduler) Option

WithScheduler returns a new Option that sets the scheduler. If this option is set, canyon creates a schedule when sending an SQS message with a delay of more than 900 seconds.

func WithSerializer added in v0.4.0

func WithSerializer(serializer Serializer) Option

WithSerializer returns a new Option that sets the serializer, which changes the http.Request serialization format. To use a Backend, the serializer should implement the BackendSerializer interface.

func WithServerAddress

func WithServerAddress(address string, prefix string) Option

WithServerAddress returns a new Option that sets the local server address. if you want to use proxy protocol, you should use this option.

func WithStdin added in v0.3.0

func WithStdin(stdin io.Reader) Option

WithStdin returns a new Option that sets the stdin stream reader. This option is for testing the fallback Lambda handler; normally you should not use it.

func WithVarbose

func WithVarbose() Option

WithVarbose returns a new Option that enables verbose canyon logging. This option is for debugging canyon, which will then output many debug logs.

func WithWebsocketAddress added in v0.7.0

func WithWebsocketAddress(address string) Option

WithWebsocketAddress returns a new Option that sets the websocket address. if you want to use proxy protocol, you should use this option.

func WithWebsocketCallbackURL added in v0.7.0

func WithWebsocketCallbackURL(url string) Option

WithWebsocketCallbackURL returns a new Option that sets the websocket callback URL. If this option is set, canyon uses it as the callback URL for websocket connections.

func WithWebsocketListener added in v0.7.0

func WithWebsocketListener(listener net.Listener) Option

WithWebsocketListener returns a new Option that sets the websocket listener. This option is intended for testing; normally you should not use it. In production, use the WithWebsocketAddress() option.

func WithWorkerBatchSize

func WithWorkerBatchSize(batchSize int) Option

WithWorkerBatchSize returns a new Option that sets the batch size of the local poller. This option is ignored when running on AWS Lambda.

func WithWorkerTimeoutMergin added in v0.5.0

func WithWorkerTimeoutMergin(mergin time.Duration) Option

WithWorkerTimeoutMergin returns a new Option that sets the worker timeout margin. If this option is set, the canyon worker timeout is the visibility timeout minus the margin. The default margin is 1 second: if the visibility timeout is 30 seconds, the worker timeout is 29 seconds.

func WithWrokerResponseChecker

func WithWrokerResponseChecker(responseChecker WorkerResponseChecker) Option

WithWrokerResponseChecker returns a new Option that sets the worker response checker.

type RetryAfterSerializer added in v0.6.0

type RetryAfterSerializer struct {
	Serializer
	BaseRetrySeconds     int32
	JitterOfRetrySeconds int32
	// contains filtered or unexported fields
}

func NewRetryAfterSerializer added in v0.6.0

func NewRetryAfterSerializer(serializer Serializer, base int32, jitter int32) *RetryAfterSerializer

func (*RetryAfterSerializer) Clone added in v0.6.0

func (*RetryAfterSerializer) Deserialize added in v0.6.0

func (s *RetryAfterSerializer) Deserialize(ctx context.Context, message *events.SQSMessage) (*http.Request, error)

func (*RetryAfterSerializer) Serialize added in v0.6.0

func (*RetryAfterSerializer) WithBackend added in v0.6.0

func (s *RetryAfterSerializer) WithBackend(backend Backend) Serializer

func (*RetryAfterSerializer) WithLogger added in v0.6.0

func (s *RetryAfterSerializer) WithLogger(logger *slog.Logger) Serializer

type RouteKeySelector added in v0.7.0

type RouteKeySelector func(body []byte) (string, error)

RouteKeySelector is a function that selects the route key from the request body. It is used when running locally.
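As an illustration, a selector could read the route key from an "action" field in a JSON message body, which is the convention commonly used with API Gateway WebSocket APIs. This is a minimal sketch: actionSelector is a hypothetical name, and the RouteKeySelector type is redeclared locally only so the example compiles without the canyon module.

```go
package main

import (
	"encoding/json"
	"errors"
	"fmt"
)

// RouteKeySelector mirrors the documented canyon type so this sketch
// compiles on its own.
type RouteKeySelector func(body []byte) (string, error)

// actionSelector is a hypothetical selector that uses the "action" field of
// a JSON message as the route key.
func actionSelector(body []byte) (string, error) {
	var msg struct {
		Action string `json:"action"`
	}
	if err := json.Unmarshal(body, &msg); err != nil {
		return "", fmt.Errorf("decode body: %w", err)
	}
	if msg.Action == "" {
		return "", errors.New("missing action field")
	}
	return msg.Action, nil
}

func main() {
	var selector RouteKeySelector = actionSelector
	key, err := selector([]byte(`{"action":"sendmessage","data":"hi"}`))
	fmt.Println(key, err) // sendmessage <nil>
}
```

A function with this signature is what WebsocketHTTPBridgeHandler.SetRouteKeySelector accepts.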

type S3Backend added in v0.2.0

type S3Backend struct {
	// contains filtered or unexported fields
}

S3Backend is a backend for saving and loading request body to/from S3.

func NewS3Backend added in v0.2.0

func NewS3Backend(s3URLPrefix string) (*S3Backend, error)

NewS3Backend creates a new S3Backend.

func (*S3Backend) LoadRequestBody added in v0.2.0

func (b *S3Backend) LoadRequestBody(ctx context.Context, u *url.URL) (io.ReadCloser, error)

func (*S3Backend) SaveRequestBody added in v0.2.0

func (b *S3Backend) SaveRequestBody(ctx context.Context, req *http.Request) (*url.URL, error)

func (*S3Backend) SetAppName added in v0.2.0

func (b *S3Backend) SetAppName(name string)

SetAppName sets the uploader name on the S3Backend.

This value is used in the metadata of the S3 object.

func (*S3Backend) SetS3Client added in v0.2.0

func (b *S3Backend) SetS3Client(s3Client S3Client)

SetS3Client sets the S3Client on the S3Backend. Intended for testing.

type S3Client added in v0.2.0

S3Client is a client for S3.

type SQSClient

type SQSClient interface {
	SendMessage(ctx context.Context, params *sqs.SendMessageInput, optFns ...func(*sqs.Options)) (*sqs.SendMessageOutput, error)
	ReceiveMessage(ctx context.Context, params *sqs.ReceiveMessageInput, optFns ...func(*sqs.Options)) (*sqs.ReceiveMessageOutput, error)
	DeleteMessage(ctx context.Context, params *sqs.DeleteMessageInput, optFns ...func(*sqs.Options)) (*sqs.DeleteMessageOutput, error)
	GetQueueUrl(ctx context.Context, params *sqs.GetQueueUrlInput, optFns ...func(*sqs.Options)) (*sqs.GetQueueUrlOutput, error)
	GetQueueAttributes(ctx context.Context, params *sqs.GetQueueAttributesInput, optFns ...func(*sqs.Options)) (*sqs.GetQueueAttributesOutput, error)
	ChangeMessageVisibilityBatch(ctx context.Context, params *sqs.ChangeMessageVisibilityBatchInput, optFns ...func(*sqs.Options)) (*sqs.ChangeMessageVisibilityBatchOutput, error)
}

SQSClient is a client for SQS.

type Scheduler added in v0.5.0

type Scheduler interface {
	RegisterSchedule(ctx context.Context, msg *sqs.SendMessageInput) error
}

type SendOptions added in v0.5.0

type SendOptions struct {
	// MessageAttributes is a map of sqs message attributes.
	MessageAttributes map[string]MessageAttributeValue

	// MessageGroupID is a message group id for sqs message.
	MessageGroupID *string

	// DelaySeconds is a delay seconds for sqs message.
	DelaySeconds *int32
}

SendOptions is a struct for sending sqs message.

type Serializer added in v0.2.0

type Serializer interface {
	Serialize(ctx context.Context, r *http.Request) (*sqs.SendMessageInput, error)
	Deserialize(ctx context.Context, message *events.SQSMessage) (*http.Request, error)
}

type WebsocketHTTPBridgeHandler added in v0.7.0

type WebsocketHTTPBridgeHandler struct {
	Handler http.Handler

	websocket.Upgrader
	// contains filtered or unexported fields
}

WebsocketHTTPBridgeHandler is a http.Handler for websocket http bridge.

func NewWebsocketHTTPBridgeHandler added in v0.7.0

func NewWebsocketHTTPBridgeHandler(handler http.Handler) *WebsocketHTTPBridgeHandler

NewWebsocketHTTPBridgeHandler returns a new WebsocketHTTPBridgeHandler.

func (*WebsocketHTTPBridgeHandler) ServeHTTP added in v0.7.0

func (*WebsocketHTTPBridgeHandler) SetLogger added in v0.7.0

func (h *WebsocketHTTPBridgeHandler) SetLogger(logger *slog.Logger)

SetLogger sets the logger.

func (*WebsocketHTTPBridgeHandler) SetRouteKeySelector added in v0.7.0

func (h *WebsocketHTTPBridgeHandler) SetRouteKeySelector(selector RouteKeySelector)

SetRouteKeySelector sets the route key selector.

func (*WebsocketHTTPBridgeHandler) SetVerbose added in v0.7.0

func (h *WebsocketHTTPBridgeHandler) SetVerbose(verbose bool)

SetVerbose enables or disables verbose output.

type WorkerResponseChecker

type WorkerResponseChecker interface {
	// If IsFailure returns true, the SQS message is not deleted.
	IsFailure(ctx context.Context, resp *http.Response) bool
}

WorkerResponseChecker is an interface for checking the worker's http.Response.

type WorkerResponseCheckerFunc

type WorkerResponseCheckerFunc func(ctx context.Context, resp *http.Response) bool

WorkerResponseCheckerFunc is a func type for checking worker's http.Response.

func (WorkerResponseCheckerFunc) IsFailure

func (f WorkerResponseCheckerFunc) IsFailure(ctx context.Context, resp *http.Response) bool

type WorkerResponseWriter

type WorkerResponseWriter struct {
	bytes.Buffer
	// contains filtered or unexported fields
}

WorkerResponseWriter is an http.ResponseWriter for the worker handler. When canyon calls the worker handler, it passes this as the http.ResponseWriter.

func NewWorkerResponseWriter

func NewWorkerResponseWriter() *WorkerResponseWriter

func (*WorkerResponseWriter) Header

func (w *WorkerResponseWriter) Header() http.Header

func (*WorkerResponseWriter) Response

func (w *WorkerResponseWriter) Response(r *http.Request) *http.Response

func (*WorkerResponseWriter) WriteHeader

func (w *WorkerResponseWriter) WriteHeader(code int)

type WorkerSender added in v0.5.0

type WorkerSender interface {
	SendToWorker(r *http.Request, opts *SendOptions) (string, error)
}

WorkerSender is an interface for sending SQS messages. It is intended for testing, not for production use.

type WorkerSenderFunc added in v0.5.0

type WorkerSenderFunc func(*http.Request, *SendOptions) (string, error)

WorkerSenderFunc is a func type for sending SQS messages. It is intended for testing, not for production use.

func (WorkerSenderFunc) SendToWorker added in v0.5.0

func (f WorkerSenderFunc) SendToWorker(r *http.Request, opts *SendOptions) (string, error)

Directories

Path Synopsis
internal
lambda
