Documentation ¶
Overview ¶
Example ¶
package main import ( "fmt" "io" "log/slog" "net/http" "os" "strings" "github.com/mashiike/canyon" "github.com/mashiike/canyon/canyontest" ) func main() { slog.SetDefault(slog.New(slog.NewTextHandler(os.Stderr, &slog.HandlerOptions{Level: slog.LevelDebug}))) var messageCh = make(chan string, 1) h := http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { logger := canyon.Logger(r) if !canyon.IsWorker(r) { logger.Info("handle webhook directly", "method", r.Method, "path", r.URL.Path) // handle webhook directly messageId, err := canyon.SendToWorker(r, &canyon.SendOptions{ MessageAttributes: canyon.ToMessageAttributes( http.Header{ "User-Name": []string{"test user"}, }, ), }) if err != nil { logger.Error("failed to send sqs message", "error", err) w.WriteHeader(http.StatusInternalServerError) return } logger.Info("send sqs message", "message_id", messageId, "method", r.Method, "path", r.URL.Path) w.WriteHeader(http.StatusAccepted) return } // handle from sqs message logger.Info("handle webhook directly", "method", r.Method, "path", r.URL.Path, "message_id", r.Header.Get(canyon.HeaderSQSMessageID)) bs, err := io.ReadAll(r.Body) if err != nil { logger.Error("failed to read body", "error", err) w.WriteHeader(http.StatusInternalServerError) return } defer r.Body.Close() messageCh <- string(bs) + ":" + r.Header.Get(canyon.HeaderSQSMessageAttribute("UserName", "String")) close(messageCh) w.WriteHeader(http.StatusOK) // if return 2xx, sqs message will be deleted from queue }) r := canyontest.NewRunner(h, canyon.WithVarbose()) defer r.Close() resp, err := http.Post(r.URL, "text/plain", strings.NewReader("hello world")) if err != nil { slog.Error("post failed", "error", err) os.Exit(1) } resp.Body.Close() if resp.StatusCode != http.StatusAccepted { slog.Error("post failed", "status", resp.StatusCode) os.Exit(1) } else { slog.Info("post success", "status", resp.StatusCode) } for msg := range messageCh { slog.Info("received message", "message", msg) fmt.Println(msg) } }
Output: hello world:test user
Index ¶
- Constants
- Variables
- func BackupRequset(r *http.Request) (*http.Request, func() error, error)
- func ConnectionIsGone(err error) bool
- func DefaultRouteKeySelector(body []byte) (string, error)
- func DeleteConnection(ctx context.Context, connectionID string) error
- func EmbedIsWorkerInContext(ctx context.Context, isWorker bool) context.Context
- func EmbedWebsocketManagementAPIClient(ctx context.Context, client ManagementAPIClient) context.Context
- func EmbedWorkerSenderInContext(ctx context.Context, sender WorkerSender) context.Context
- func ErrorHasRetryAfter(err error) (int32, bool)
- func ExitsConnection(ctx context.Context, connectionID string) (bool, error)
- func GetConnection(ctx context.Context, connectionID string) (*apigatewaymanagementapi.GetConnectionOutput, error)
- func HeaderSQSAttribute(name string) string
- func HeaderSQSMessageAttribute(name, dataType string) string
- func IsWebsocket(r *http.Request) bool
- func IsWorker(r *http.Request) bool
- func Logger(r *http.Request) *slog.Logger
- func PostToConnection(ctx context.Context, connectionID string, data []byte) error
- func RegisterBackendFactory(scheme string, factory func(*url.URL) (Backend, error))
- func RestoreRequest(r *http.Request) *http.Request
- func Run(sqsQueueName string, mux http.Handler, opts ...Option) error
- func RunWithContext(ctx context.Context, sqsQueueName string, mux http.Handler, opts ...Option) error
- func SendToWorker(r *http.Request, opts *SendOptions) (string, error)
- func SetAPIGatewayWebsocketProxyHeader(r *http.Request, reqCtx *events.APIGatewayWebsocketProxyRequestContext) *http.Request
- func SetDefaultAWSConfig(cfg *aws.Config)
- func SetSQSMessageHeader(r *http.Request, message *events.SQSMessage) *http.Request
- func ToMessageAttributes(h http.Header) map[string]MessageAttributeValue
- func Used(r *http.Request) bool
- func WebsocketConnectionID(r *http.Request) string
- func WebsocketRouteKey(r *http.Request) string
- func WrapRetryAfter(err error, retryAfter int32) error
- type AppNameSetable
- type Backend
- type BackendSerializer
- type DefaultSerializer
- func (s *DefaultSerializer) Clone() *DefaultSerializer
- func (s *DefaultSerializer) Deserialize(ctx context.Context, message *events.SQSMessage) (*http.Request, error)
- func (s *DefaultSerializer) Serialize(ctx context.Context, r *http.Request) (*sqs.SendMessageInput, error)
- func (s *DefaultSerializer) WithBackend(backend Backend) Serializer
- func (s *DefaultSerializer) WithLogger(logger *slog.Logger) Serializer
- type ErrorWithRetryAfter
- type EventBridgeScheduler
- func (s *EventBridgeScheduler) RegisterSchedule(ctx context.Context, msg *sqs.SendMessageInput) error
- func (s *EventBridgeScheduler) SetAPIClient(client EventBridgeSchedulerClient)
- func (s *EventBridgeScheduler) SetGroupName(groupName string)
- func (s *EventBridgeScheduler) SetIAMRoleARN(iamRoleARN string) error
- type EventBridgeSchedulerClient
- type FileBackend
- type InMemoryBackend
- type InMemoryScheduler
- type LambdaHandlerFunc
- type LoggingableSerializer
- type ManagementAPIClient
- type MessageAttributeValue
- type Option
- func WithBackend(b Backend) Option
- func WithCanyonEnv(envPrefix string) Option
- func WithDisableServer() Option
- func WithDisableWebsocket() Option
- func WithDisableWorker() Option
- func WithInMemoryQueue(visibilityTimeout time.Duration, maxReceiveCount int32, dlq io.Writer) Option
- func WithLambdaFallbackHandler(handler interface{}) Option
- func WithListener(listener net.Listener) Option
- func WithLogger(logger *slog.Logger) Option
- func WithProxyProtocol() Option
- func WithSQSClient(sqsClient SQSClient) Option
- func WithSQSPollingDuration(pollingDuration time.Duration) Option
- func WithScheduler(scheduler Scheduler) Option
- func WithSerializer(serializer Serializer) Option
- func WithServerAddress(address string, prefix string) Option
- func WithStdin(stdin io.Reader) Option
- func WithVarbose() Option
- func WithWebsocketAddress(address string) Option
- func WithWebsocketCallbackURL(url string) Option
- func WithWebsocketListener(listener net.Listener) Option
- func WithWorkerBatchSize(batchSize int) Option
- func WithWorkerTimeoutMergin(mergin time.Duration) Option
- func WithWrokerResponseChecker(responseChecker WorkerResponseChecker) Option
- type RetryAfterSerializer
- func (s *RetryAfterSerializer) Clone() *RetryAfterSerializer
- func (s *RetryAfterSerializer) Deserialize(ctx context.Context, message *events.SQSMessage) (*http.Request, error)
- func (s *RetryAfterSerializer) Serialize(ctx context.Context, r *http.Request) (*sqs.SendMessageInput, error)
- func (s *RetryAfterSerializer) WithBackend(backend Backend) Serializer
- func (s *RetryAfterSerializer) WithLogger(logger *slog.Logger) Serializer
- type RouteKeySelector
- type S3Backend
- type S3Client
- type SQSClient
- type Scheduler
- type SendOptions
- type Serializer
- type WebsocketHTTPBridgeHandler
- func (h *WebsocketHTTPBridgeHandler) ServeHTTP(w http.ResponseWriter, req *http.Request)
- func (h *WebsocketHTTPBridgeHandler) SetLogger(logger *slog.Logger)
- func (h *WebsocketHTTPBridgeHandler) SetRouteKeySelector(selector RouteKeySelector)
- func (h *WebsocketHTTPBridgeHandler) SetVerbose(verbose bool)
- type WorkerResponseChecker
- type WorkerResponseCheckerFunc
- type WorkerResponseWriter
- type WorkerSender
- type WorkerSenderFunc
Examples ¶
Constants ¶
const ( HeaderSQSMessageID = "Sqs-Message-Id" HeaderSQSMessageGroupID = "Sqs-Message-Group-Id" HeaderSQSMessageDelaySeconds = "Sqs-Message-Delay-Seconds" HeaderSQSEventSource = "Sqs-Event-Source" HeaderSQSEventSourceArn = "Sqs-Event-Source-Arn" HeaderSQSAwsRegionHeader = "Sqs-Aws-Region" HeaderPrefixSQSAttribute = "Sqs-Attribute-" HeaderPrefixSQSMessageAttribute = "Sqs-Message-Attribute-" )
this headers are request headers, when run on worker.
const ( HeaderAPIGatewayWebsocketConnectionID = "Api-Gateway-Websocket-Connection-Id" HeaderAPIGatewayWebsocketRouteKey = "Api-Gateway-Websocket-Route-Key" HeaderAPIGatewayWebsocketAPIID = "Api-Gateway-Websocket-Api-Id" HeaderAPIGatewayWebsocketStage = "Api-Gateway-Websocket-Stage" )
API Gateway Websocket Proxy headers
const DelayedSQSMessageID = "<delayed sqs message>"
Variables ¶
var DefaultWorkerResponseChecker = WorkerResponseCheckerFunc(func(ctx context.Context, resp *http.Response) bool {
return resp.StatusCode >= 300
})
DefaultWorkerResponseChecker is a default WorkerResponseChecker. if http.Response status code is 2xx success, sqs message will be deleted. if http.Response status code is 3xx, 4xx, or 5xx sqs message will not be deleted.
var LogComponentAttributeKey = "component"
LogComponentAttributeKey is a log/slog attribute key for canyon working component name [worker or server].
Functions ¶
func BackupRequset ¶ added in v0.5.0
BackupRequestBody returns backuped request body. maybe http handler read request body, and serializer read request body.
func ConnectionIsGone ¶ added in v0.7.0
ConnectionIsGone returns true if err is GoneException.
func DefaultRouteKeySelector ¶ added in v0.7.0
DefaultRouteKeySelector is a default RouteKeySelector. it returns "action" key from request body.
func DeleteConnection ¶ added in v0.7.0
DeleteConnection deletes connectionID.
func EmbedIsWorkerInContext ¶
EmbedIsWorkerInContext embeds isWorker flag in context.
this function is for http.Handler unit testing. not for production use.
func EmbedWebsocketManagementAPIClient ¶ added in v0.7.0
func EmbedWebsocketManagementAPIClient(ctx context.Context, client ManagementAPIClient) context.Context
EmbedWebsocketManagementAPIClient embeds WebsocketManagementAPIClient in context. for testing, not for production use.
func EmbedWorkerSenderInContext ¶ added in v0.5.0
func EmbedWorkerSenderInContext(ctx context.Context, sender WorkerSender) context.Context
EmbedWorkerSenderInContext embeds WorkerSender in context. for testing, not for production use.
func ErrorHasRetryAfter ¶ added in v0.6.0
func ExitsConnection ¶ added in v0.7.0
ExitsConnection returns true if connectionID exists.
func GetConnection ¶ added in v0.7.0
func GetConnection(ctx context.Context, connectionID string) (*apigatewaymanagementapi.GetConnectionOutput, error)
GetConnection gets connectionID.
func HeaderSQSAttribute ¶
HeaderSQSAttribute returns header name for SQS attribute, when run on worker
func HeaderSQSMessageAttribute ¶
HeaderSQSMessageAttribute returns header name for SQS message attribute, when run on worker
func IsWebsocket ¶ added in v0.7.0
IsWebsocket returns true if the request is from websocket proxy.
func IsWorker ¶
IsWorker returns true if the request is from worker. if running with canyon and http.Handler called from sqs message, return true.
func Logger ¶
Logger returns slog.Logger with component attribute. if called by sqs message, component is "worker". if called original http request, component is "server".
func PostToConnection ¶ added in v0.7.0
PostToConnection posts data to connectionID.
func RegisterBackendFactory ¶ added in v0.2.0
RegisterBackendFactory registers backend factory.
func RestoreRequest ¶ added in v0.5.0
RestoreRequest restores request body from backuped request body.
func RunWithContext ¶
func SendToWorker ¶ added in v0.2.0
func SendToWorker(r *http.Request, opts *SendOptions) (string, error)
func SetAPIGatewayWebsocketProxyHeader ¶ added in v0.7.0
func SetAPIGatewayWebsocketProxyHeader(r *http.Request, reqCtx *events.APIGatewayWebsocketProxyRequestContext) *http.Request
SetAPIGatewayWebsocketProxyHeader sets API Gateway Websocket Proxy headers to Request
func SetDefaultAWSConfig ¶ added in v0.7.0
func SetSQSMessageHeader ¶ added in v0.2.0
Set SQS Message headers to Request
func ToMessageAttributes ¶
func ToMessageAttributes(h http.Header) map[string]MessageAttributeValue
ToMessageAttributes converts http.Header to SQS MessageAttributes.
func WebsocketConnectionID ¶ added in v0.7.0
WebsocketConnectionID returns connection id from API Gateway Websocket Proxy headers.
func WebsocketRouteKey ¶ added in v0.7.0
WebsocketRouteKey returns route key from API Gateway Websocket Proxy headers.
func WrapRetryAfter ¶ added in v0.6.0
Types ¶
type AppNameSetable ¶ added in v0.2.0
type AppNameSetable interface {
SetAppName(string)
}
type Backend ¶ added in v0.2.0
type Backend interface { // Save does store data. // If data is stored successfully, return URL of stored data. // If data is not stored successfully, return error. SaveRequestBody(context.Context, *http.Request) (*url.URL, error) // Load does load data from URL. // If data is loaded successfully, return data. // If data is not loaded successfully, return error. LoadRequestBody(context.Context, *url.URL) (io.ReadCloser, error) }
Backend is interface for storing and loading data.
type BackendSerializer ¶ added in v0.4.0
type BackendSerializer interface { Serializer WithBackend(backend Backend) Serializer }
type DefaultSerializer ¶ added in v0.4.0
type DefaultSerializer struct { Backend Backend Logger *slog.Logger BaseRetrySeconds int32 JitterOfRetrySeconds int32 }
DefaultSerializer is a struct for Serialize and Deserialize http.Request as SQS Message.
func NewDefaultSerializer ¶ added in v0.4.0
func NewDefaultSerializer() *DefaultSerializer
func (*DefaultSerializer) Clone ¶ added in v0.4.0
func (s *DefaultSerializer) Clone() *DefaultSerializer
func (*DefaultSerializer) Deserialize ¶ added in v0.4.0
func (s *DefaultSerializer) Deserialize(ctx context.Context, message *events.SQSMessage) (*http.Request, error)
Deserialize does deserialize SQS Message as http.Request.
func (*DefaultSerializer) Serialize ¶ added in v0.4.0
func (s *DefaultSerializer) Serialize(ctx context.Context, r *http.Request) (*sqs.SendMessageInput, error)
Serialize does serialize http.Request as SQS Message.
func (*DefaultSerializer) WithBackend ¶ added in v0.4.0
func (s *DefaultSerializer) WithBackend(backend Backend) Serializer
func (*DefaultSerializer) WithLogger ¶ added in v0.4.0
func (s *DefaultSerializer) WithLogger(logger *slog.Logger) Serializer
type ErrorWithRetryAfter ¶ added in v0.6.0
func (*ErrorWithRetryAfter) Error ¶ added in v0.6.0
func (e *ErrorWithRetryAfter) Error() string
func (*ErrorWithRetryAfter) Unwrap ¶ added in v0.6.0
func (e *ErrorWithRetryAfter) Unwrap() error
type EventBridgeScheduler ¶ added in v0.5.0
type EventBridgeScheduler struct {
// contains filtered or unexported fields
}
func NewEventBridgeScheduler ¶ added in v0.5.0
func NewEventBridgeScheduler(ctx context.Context, namePrefix string) (*EventBridgeScheduler, error)
func (*EventBridgeScheduler) RegisterSchedule ¶ added in v0.5.0
func (s *EventBridgeScheduler) RegisterSchedule(ctx context.Context, msg *sqs.SendMessageInput) error
func (*EventBridgeScheduler) SetAPIClient ¶ added in v0.5.0
func (s *EventBridgeScheduler) SetAPIClient(client EventBridgeSchedulerClient)
func (*EventBridgeScheduler) SetGroupName ¶ added in v0.5.0
func (s *EventBridgeScheduler) SetGroupName(groupName string)
func (*EventBridgeScheduler) SetIAMRoleARN ¶ added in v0.5.0
func (s *EventBridgeScheduler) SetIAMRoleARN(iamRoleARN string) error
type EventBridgeSchedulerClient ¶ added in v0.5.0
type EventBridgeSchedulerClient interface {
CreateSchedule(ctx context.Context, params *scheduler.CreateScheduleInput, optFns ...func(*scheduler.Options)) (*scheduler.CreateScheduleOutput, error)
}
type FileBackend ¶ added in v0.2.0
type FileBackend struct {
// contains filtered or unexported fields
}
FileBackend is a backend for storing request body local file system
func NewFileBackend ¶ added in v0.2.0
func NewFileBackend(path string) (*FileBackend, error)
NewFileBackend returns new FileBackend.
func (*FileBackend) LoadRequestBody ¶ added in v0.2.0
func (b *FileBackend) LoadRequestBody(ctx context.Context, backendURL *url.URL) (io.ReadCloser, error)
LoadRequestBody loads request body from local file system.
func (*FileBackend) SaveRequestBody ¶ added in v0.2.0
SaveRequestBody stores request body in local file system.
type InMemoryBackend ¶ added in v0.2.0
type InMemoryBackend struct {
// contains filtered or unexported fields
}
InMemoryBackend is a backend for storing request body in memory.
func NewInMemoryBackend ¶ added in v0.2.0
func NewInMemoryBackend() *InMemoryBackend
NewInMemoryBackend returns new InMemoryBackend.
func (*InMemoryBackend) Entries ¶ added in v0.4.0
func (b *InMemoryBackend) Entries() map[string][]byte
func (*InMemoryBackend) LoadRequestBody ¶ added in v0.2.0
func (b *InMemoryBackend) LoadRequestBody(ctx context.Context, backendURL *url.URL) (io.ReadCloser, error)
LoadRequestBody loads request body from memory.
func (*InMemoryBackend) SaveRequestBody ¶ added in v0.2.0
SaveRequestBody stores request body in memory.
type InMemoryScheduler ¶ added in v0.5.0
type InMemoryScheduler struct {
// contains filtered or unexported fields
}
func NewInMemoryScheduler ¶ added in v0.5.0
func NewInMemoryScheduler(clientFunc func() SQSClient) *InMemoryScheduler
func (*InMemoryScheduler) RegisterSchedule ¶ added in v0.5.0
func (s *InMemoryScheduler) RegisterSchedule(ctx context.Context, msg *sqs.SendMessageInput) error
type LambdaHandlerFunc ¶ added in v0.4.0
LambdaHandlerFunc is a adapter for lambda.Handler.
type LoggingableSerializer ¶ added in v0.4.0
type LoggingableSerializer interface { Serializer WithLogger(logger *slog.Logger) Serializer }
type ManagementAPIClient ¶ added in v0.7.0
type ManagementAPIClient interface { PostToConnection(context.Context, *apigatewaymanagementapi.PostToConnectionInput, ...func(*apigatewaymanagementapi.Options)) (*apigatewaymanagementapi.PostToConnectionOutput, error) DeleteConnection(context.Context, *apigatewaymanagementapi.DeleteConnectionInput, ...func(*apigatewaymanagementapi.Options)) (*apigatewaymanagementapi.DeleteConnectionOutput, error) GetConnection(context.Context, *apigatewaymanagementapi.GetConnectionInput, ...func(*apigatewaymanagementapi.Options)) (*apigatewaymanagementapi.GetConnectionOutput, error) }
ManagmentAPIBackend is a backend for sending message to websocket connection using Amazon API Gateway Management API.
func NewManagementAPIClient ¶ added in v0.7.0
func NewManagementAPIClient(awsCfg aws.Config, endpointURL string) (ManagementAPIClient, error)
NewManagementAPIClient creates a new ManagementAPIClient.
func NewManagementAPIClientWithRequest ¶ added in v0.7.0
func NewManagementAPIClientWithRequest(req *http.Request, endpointURL string) (ManagementAPIClient, error)
NewManagementAPIClientWithRequest creates a new ManagementAPIClient with request.
type MessageAttributeValue ¶ added in v0.5.0
type MessageAttributeValue struct { DataType string StringValue *string BinaryValue []byte BinaryListValues [][]byte StringListValues []string }
MessageAttributeValue is a struct for sqs message attribute.
type Option ¶
type Option func(*runOptions)
Option is a Run() and RunWtihContext() option.
func WithBackend ¶ added in v0.2.0
WithBackend returns a new Option if set this option, canyon using backend. when send to sqs message, canyon upload request to any backend. and sqs message body set backend_url.
SQS message body limit is 256KB. this option is useful for large request. for example S3Backend is request body upload to s3.
func WithCanyonEnv ¶ added in v0.2.0
WithCanyonEnv returns a new Option env swith default options. if env == "development", set varbose and in memory queue, temporary file backend. if env == "test", set in memory queue, in memory backend. otherwise, setup backend with os.Getenv("CANYON_BACKEND_URL").
if url is empty, not set backend. if url schema is s3, set s3 backend. if url schema is file, set file backend.
func WithDisableServer ¶
func WithDisableServer() Option
WithDisableServer returns a new Option that disable server. if set this option, canyon not running server.
func WithDisableWebsocket ¶ added in v0.7.0
func WithDisableWebsocket() Option
WithDisableWebsocket returns a new Option that disable websocket. if set this option, canyon not running websocket.
func WithDisableWorker ¶
func WithDisableWorker() Option
WithDisableWorker returns a new Option that disable worker. if set this option, canyon not runnning worker.
func WithInMemoryQueue ¶ added in v0.2.0
func WithInMemoryQueue(visibilityTimeout time.Duration, maxReceiveCount int32, dlq io.Writer) Option
WithInMemoryQueue returns a new Option that sets the mode of on memory queue. if run on AWS Lambda, ignore this option. if set this option, canyon not used real AWS SQS. only used on memory queue. for local development.
func WithLambdaFallbackHandler ¶ added in v0.3.0
func WithLambdaFallbackHandler(handler interface{}) Option
WithLambdaFallbackHandler returns a new Option that sets the fallback lambda handler. if set this option, call fallback lambda handler when paylaod is not sqs message or http request.
func WithListener ¶
WithContext returns a new Option that sets the local server listener. this option for testing. normally, you should not use this option. if production used, WithServerAddress() option.
func WithLogger ¶
WithLogger returns a new Option that sets the canyon logger. default is slog.Default().
func WithProxyProtocol ¶
func WithProxyProtocol() Option
WithProxyProtocol returns a new Option that enables to PROXY protocol. if you want to use proxy protocol, you should use this option. if run on AWS Lambda, ignore this option.
func WithSQSClient ¶
WithSQSClient returns a new Option that sets the sqs client. this option for testing. normally, you should not use this option. default sqs client is loaded from aws default config.
func WithSQSPollingDuration ¶
WithSQSPollingDuration returns a new Option that sets the local poller polling duration. if run on AWS Lambda, ignore this option.
func WithScheduler ¶ added in v0.5.0
WithScheduler returns a new Option that sets the scheduler. if set this option, canyon using scheduler. when send to delayed second over 900 seconds sqs message, canyon create schedule.
func WithSerializer ¶ added in v0.4.0
func WithSerializer(serializer Serializer) Option
WithSerializer returns a new Option that sets the serializer. for http.Request Serializetion format change. if can use Backend implement BackendSerializer interface
func WithServerAddress ¶
WithServerAddress returns a new Option that sets the local server address. if you want to use proxy protocol, you should use this option.
func WithStdin ¶ added in v0.3.0
WithStdin returns a new Option that sets the Stdin Stream reader. this option for testing. normally, you should not use this option. to fallback lambda handler test.
func WithVarbose ¶
func WithVarbose() Option
WithVarbose returns a new Option that sets the canyon loggign verbose. this option for debugging canyon. canyon will output many debug log.
func WithWebsocketAddress ¶ added in v0.7.0
WithWebsocketAddress returns a new Option that sets the websocket address. if you want to use proxy protocol, you should use this option.
func WithWebsocketCallbackURL ¶ added in v0.7.0
WithWebsocketCallbackURL returns a new Option that sets the websocket callback url. if set this option, canyon websocket connections callback url.
func WithWebsocketListener ¶ added in v0.7.0
WithWebsocketListener returns a new Option that sets the websocket listener. this option for testing. normally, you should not use this option. if production used, WithWebsocketAddress() option.
func WithWorkerBatchSize ¶
WithWorkerBatchSize returns a new Option that sets the local poller batch size. if run on AWS Lambda, ignore this option.
func WithWorkerTimeoutMergin ¶ added in v0.5.0
WithWorkerTimeoutMergin returns a new Option that sets the worker timeout mergin. if set this option, canyon worker timeout is visibility timeout - mergin. default mergin is 1 second. if visibility timeout is 30 seconds, worker timeout is 29 seconds.
func WithWrokerResponseChecker ¶
func WithWrokerResponseChecker(responseChecker WorkerResponseChecker) Option
WithWrokerResponseChecker returns a new Option that sets the worker response checker.
type RetryAfterSerializer ¶ added in v0.6.0
type RetryAfterSerializer struct { Serializer BaseRetrySeconds int32 JitterOfRetrySeconds int32 // contains filtered or unexported fields }
func NewRetryAfterSerializer ¶ added in v0.6.0
func NewRetryAfterSerializer(serializer Serializer, base int32, jitter int32) *RetryAfterSerializer
func (*RetryAfterSerializer) Clone ¶ added in v0.6.0
func (s *RetryAfterSerializer) Clone() *RetryAfterSerializer
func (*RetryAfterSerializer) Deserialize ¶ added in v0.6.0
func (s *RetryAfterSerializer) Deserialize(ctx context.Context, message *events.SQSMessage) (*http.Request, error)
func (*RetryAfterSerializer) Serialize ¶ added in v0.6.0
func (s *RetryAfterSerializer) Serialize(ctx context.Context, r *http.Request) (*sqs.SendMessageInput, error)
func (*RetryAfterSerializer) WithBackend ¶ added in v0.6.0
func (s *RetryAfterSerializer) WithBackend(backend Backend) Serializer
func (*RetryAfterSerializer) WithLogger ¶ added in v0.6.0
func (s *RetryAfterSerializer) WithLogger(logger *slog.Logger) Serializer
type RouteKeySelector ¶ added in v0.7.0
RouteKeySelector is a function to select route key from request body. for local.
type S3Backend ¶ added in v0.2.0
type S3Backend struct {
// contains filtered or unexported fields
}
S3Backend is a backend for saving and loading request body to/from S3.
func NewS3Backend ¶ added in v0.2.0
NewS3Backend creates a new S3Backend.
func (*S3Backend) LoadRequestBody ¶ added in v0.2.0
func (*S3Backend) SaveRequestBody ¶ added in v0.2.0
func (*S3Backend) SetAppName ¶ added in v0.2.0
SetAppName sets uploader name to S3Backend.
this value is used for metadata of S3 object.
func (*S3Backend) SetS3Client ¶ added in v0.2.0
SetS3Client sets S3Client to S3Backend. for testing.
type S3Client ¶ added in v0.2.0
type S3Client interface { manager.UploadAPIClient manager.DownloadAPIClient s3.HeadObjectAPIClient }
S3Client is a client for S3.
type SQSClient ¶
type SQSClient interface { SendMessage(ctx context.Context, params *sqs.SendMessageInput, optFns ...func(*sqs.Options)) (*sqs.SendMessageOutput, error) ReceiveMessage(ctx context.Context, params *sqs.ReceiveMessageInput, optFns ...func(*sqs.Options)) (*sqs.ReceiveMessageOutput, error) DeleteMessage(ctx context.Context, params *sqs.DeleteMessageInput, optFns ...func(*sqs.Options)) (*sqs.DeleteMessageOutput, error) GetQueueUrl(ctx context.Context, params *sqs.GetQueueUrlInput, optFns ...func(*sqs.Options)) (*sqs.GetQueueUrlOutput, error) GetQueueAttributes(ctx context.Context, params *sqs.GetQueueAttributesInput, optFns ...func(*sqs.Options)) (*sqs.GetQueueAttributesOutput, error) ChangeMessageVisibilityBatch(ctx context.Context, params *sqs.ChangeMessageVisibilityBatchInput, optFns ...func(*sqs.Options)) (*sqs.ChangeMessageVisibilityBatchOutput, error) }
SQSClient is a client for SQS.
type Scheduler ¶ added in v0.5.0
type Scheduler interface {
RegisterSchedule(ctx context.Context, msg *sqs.SendMessageInput) error
}
type SendOptions ¶ added in v0.5.0
type SendOptions struct { // MessageAttributes is a map of sqs message attributes. MessageAttributes map[string]MessageAttributeValue // MessageGroupID is a message group id for sqs message. MessageGroupID *string // DelaySeconds is a delay seconds for sqs message. DelaySeconds *int32 }
SendOptions is a struct for sending sqs message.
type Serializer ¶ added in v0.2.0
type WebsocketHTTPBridgeHandler ¶ added in v0.7.0
type WebsocketHTTPBridgeHandler struct { Handler http.Handler websocket.Upgrader // contains filtered or unexported fields }
WebsocketHTTPBridgeHandler is a http.Handler for websocket http bridge.
func NewWebsocketHTTPBridgeHandler ¶ added in v0.7.0
func NewWebsocketHTTPBridgeHandler(handler http.Handler) *WebsocketHTTPBridgeHandler
NewWebsocketHTTPBridgeHandler returns new WebsocketHTTPBridgeHandler
func (*WebsocketHTTPBridgeHandler) ServeHTTP ¶ added in v0.7.0
func (h *WebsocketHTTPBridgeHandler) ServeHTTP(w http.ResponseWriter, req *http.Request)
func (*WebsocketHTTPBridgeHandler) SetLogger ¶ added in v0.7.0
func (h *WebsocketHTTPBridgeHandler) SetLogger(logger *slog.Logger)
SetLogger set logger
func (*WebsocketHTTPBridgeHandler) SetRouteKeySelector ¶ added in v0.7.0
func (h *WebsocketHTTPBridgeHandler) SetRouteKeySelector(selector RouteKeySelector)
SetRouteKeySelector set route key selector
func (*WebsocketHTTPBridgeHandler) SetVerbose ¶ added in v0.7.0
func (h *WebsocketHTTPBridgeHandler) SetVerbose(verbose bool)
SetVerbose set verbose
type WorkerResponseChecker ¶
type WorkerResponseChecker interface { // if return true, sqs message not deleted. IsFailure(ctx context.Context, resp *http.Response) bool }
WorkerResponseChecker is a interface for checking worker's http.Response.
type WorkerResponseCheckerFunc ¶
WorkerResponseCheckerFunc is a func type for checking worker's http.Response.
type WorkerResponseWriter ¶
WorkerResponseWriter is a http.ResponseWriter for worker handler. when call worker handler, set this as http.ResponseWriter.
func NewWorkerResponseWriter ¶
func NewWorkerResponseWriter() *WorkerResponseWriter
func (*WorkerResponseWriter) Header ¶
func (w *WorkerResponseWriter) Header() http.Header
func (*WorkerResponseWriter) Response ¶
func (w *WorkerResponseWriter) Response(r *http.Request) *http.Response
func (*WorkerResponseWriter) WriteHeader ¶
func (w *WorkerResponseWriter) WriteHeader(code int)
type WorkerSender ¶ added in v0.5.0
type WorkerSender interface {
SendToWorker(r *http.Request, opts *SendOptions) (string, error)
}
WorkerSender is a interface for sending sqs message. for testing, not for production use.
type WorkerSenderFunc ¶ added in v0.5.0
type WorkerSenderFunc func(*http.Request, *SendOptions) (string, error)
WorkerSenderFunc is a func type for sending sqs message. for testing, not for production use.
func (WorkerSenderFunc) SendToWorker ¶ added in v0.5.0
func (f WorkerSenderFunc) SendToWorker(r *http.Request, opts *SendOptions) (string, error)