sqsd

package module
v1.4.2 Latest
Warning

This package is not in the latest version of its module.

Published: Nov 22, 2023 License: MIT Imports: 28 Imported by: 1

README

sqsd by golang

Motivation

This tool emulates the sqsd daemon of AWS Elastic Beanstalk worker environments.

The idea behind sqsd is to simplify worker process management for lightweight languages such as Perl and PHP
by separating the process that consumes messages from SQS from the process that runs the job.
These languages have no de facto standard worker library comparable to Ruby's Sidekiq,
so it is difficult to build a worker system or to manage it reliably.

sqsd does only two things:

  • Fetching queue messages from SQS
    • and removing each message when its job succeeds
  • Passing each message to the job process via an HTTP POST request

HTTP server libraries are stable in many languages, so users can build the worker as an HTTP server.

This project (github.com/taiyoh/sqsd) implements that concept without AWS Elastic Beanstalk.

Features

  • written in Go
    • fast, with low memory usage
  • based on protoactor-go
    • actor model
    • clear separation of internal component responsibilities
  • scoreboard can be fetched via gRPC
  • trips a circuit breaker when all worker processes are busy
    • resets automatically once the processes are no longer busy
    • unlike CSP, the actor model allows switching gracefully between running and stopped
  • can invoke a job function directly
    • accepts only implementations of the sqsd.Invoker interface

Usage

as single binary

set up a .env file

INVOKER_URL=http://local.example.com/setup/your/worker/path
QUEUE_URL=https://queue.amazonaws.com/80398EXAMPLE/MyQueue
# INVOKER_TIMEOUT=60s # default
# UNLOCK_INTERVAL=1m # default
# LOCK_EXPIRE=24h # default
# FETCHER_PARALLEL_COUNT=1 # default
# INVOKER_PARALLEL_COUNT=1 # default
# MONITORING_PORT=6969 # default
# LOG_LEVEL=info # default

run it

$ sqsd -e .env

or

$ source .env
$ sqsd

NOTE: the sqsd single binary supports HTTP invocation only.

as library

package main

import (
	"context"
	"os"
	"os/signal"
	"strconv"
	"syscall"
	"time"

	"github.com/aws/aws-sdk-go/aws/session"
	"github.com/aws/aws-sdk-go/service/sqs"
	// Import path assumed; match the protoactor-go version your module resolves.
	protoactorlog "github.com/asynkron/protoactor-go/log"
	"github.com/taiyoh/sqsd"
)

// myInvoker implements sqsd.Invoker.
type myInvoker struct{}

func (myInvoker) Invoke(ctx context.Context, q sqsd.Message) error {
	// here is your job process
	return nil
}

func main() {
	sqsd.SetLogLevel(os.Getenv("LOG_LEVEL"))

	queue := sqs.New(session.Must(session.NewSession()))

	// Parse errors are ignored for brevity; unset variables yield zero values.
	dur, _ := time.ParseDuration(os.Getenv("DEFAULT_INVOKER_TIMEOUT"))
	port, _ := strconv.ParseInt(os.Getenv("MONITORING_PORT"), 10, 64)
	fetcherParallel, _ := strconv.ParseInt(os.Getenv("FETCHER_PARALLEL_COUNT"), 10, 64)
	invokerParallel, _ := strconv.ParseInt(os.Getenv("INVOKER_PARALLEL_COUNT"), 10, 64)

	var ivk myInvoker

	sys := sqsd.NewSystem(
		sqsd.GatewayBuilder(queue, os.Getenv("QUEUE_URL"), int(fetcherParallel), dur),
		sqsd.ConsumerBuilder(ivk, int(invokerParallel)),
		sqsd.MonitorBuilder(int(port)),
	)

	logger := protoactorlog.New(protoactorlog.InfoLevel, "[your-worker]")

	// Cancel the context on SIGTERM, SIGINT, or SIGHUP so that Run can return.
	ctx, cancel := signal.NotifyContext(
		context.Background(),
		syscall.SIGTERM, syscall.SIGINT, syscall.SIGHUP)
	defer cancel()

	if err := sys.Run(ctx); err != nil {
		panic(err)
	}

	logger.Info("end process")

	// wait until log buffer is flushed
	time.Sleep(500 * time.Millisecond)
}

Documentation

Constants

const DisableMonitoring = -1

DisableMonitoring prevents the gRPC server from running.

Variables

var ErrRetainMessage = errors.New("this message should be retained")

ErrRetainMessage indicates that the message should be kept in the queue. When this error is returned, the worker must not remove the message.

var MonitoringService_ServiceDesc = grpc.ServiceDesc{
	ServiceName: "sqsd.MonitoringService",
	HandlerType: (*MonitoringServiceServer)(nil),
	Methods: []grpc.MethodDesc{
		{
			MethodName: "CurrentWorkings",
			Handler:    _MonitoringService_CurrentWorkings_Handler,
		},
	},
	Streams:  []grpc.StreamDesc{},
	Metadata: "sqsd.proto",
}

MonitoringService_ServiceDesc is the grpc.ServiceDesc for MonitoringService service. It's only intended for direct use with grpc.RegisterService, and not to be introspected or modified (even as a copy)

Functions

func NewLogger added in v1.3.0

func NewLogger(handlerOpts slog.HandlerOptions, w io.Writer, system string) *slog.Logger

NewLogger constructs a slog.Logger with a default 'system' attribute.

func RegisterMonitoringServiceServer

func RegisterMonitoringServiceServer(s grpc.ServiceRegistrar, srv MonitoringServiceServer)

func SetWithHandlerOptions added in v1.4.0

func SetWithHandlerOptions(handlerOpts slog.HandlerOptions, writers ...io.Writer)

SetWithHandlerOptions sets the default logger using the given slog handler options. If no io.Writer is supplied, os.Stderr is used.

Types

type CurrentWorkingsRequest

type CurrentWorkingsRequest struct {
	// contains filtered or unexported fields
}

func (*CurrentWorkingsRequest) Descriptor deprecated

func (*CurrentWorkingsRequest) Descriptor() ([]byte, []int)

Deprecated: Use CurrentWorkingsRequest.ProtoReflect.Descriptor instead.

func (*CurrentWorkingsRequest) ProtoMessage

func (*CurrentWorkingsRequest) ProtoMessage()

func (*CurrentWorkingsRequest) ProtoReflect

func (x *CurrentWorkingsRequest) ProtoReflect() protoreflect.Message

func (*CurrentWorkingsRequest) Reset

func (x *CurrentWorkingsRequest) Reset()

func (*CurrentWorkingsRequest) String

func (x *CurrentWorkingsRequest) String() string

type CurrentWorkingsResponse

type CurrentWorkingsResponse struct {
	Tasks []*Task `protobuf:"bytes,1,rep,name=tasks,proto3" json:"tasks,omitempty"`
	// contains filtered or unexported fields
}

func (*CurrentWorkingsResponse) Descriptor deprecated

func (*CurrentWorkingsResponse) Descriptor() ([]byte, []int)

Deprecated: Use CurrentWorkingsResponse.ProtoReflect.Descriptor instead.

func (*CurrentWorkingsResponse) GetTasks

func (x *CurrentWorkingsResponse) GetTasks() []*Task

func (*CurrentWorkingsResponse) ProtoMessage

func (*CurrentWorkingsResponse) ProtoMessage()

func (*CurrentWorkingsResponse) ProtoReflect

func (x *CurrentWorkingsResponse) ProtoReflect() protoreflect.Message

func (*CurrentWorkingsResponse) Reset

func (x *CurrentWorkingsResponse) Reset()

func (*CurrentWorkingsResponse) String

func (x *CurrentWorkingsResponse) String() string

type Gateway

type Gateway struct {
	// contains filtered or unexported fields
}

Gateway fetches and removes jobs from SQS.

func NewGateway

func NewGateway(queue *sqs.SQS, queueURL string, params ...GatewayParameter) *Gateway

NewGateway returns a Gateway object.

type GatewayParameter

type GatewayParameter func(*gatewayParams)

GatewayParameter sets a fetcher parameter using the functional options pattern.

func FetchInterval added in v1.3.5

func FetchInterval(d time.Duration) GatewayParameter

FetchInterval sets the interval between the fetcher's receive-message requests.

func FetchParallel added in v1.3.5

func FetchParallel(n int) GatewayParameter

FetchParallel sets the number of parallel processes fetching from SQS.

func FetcherMaxMessages added in v1.2.1

func FetcherMaxMessages(n int64) GatewayParameter

FetcherMaxMessages sets the SQS MaxNumberOfMessages parameter, which must be between 1 and 10. The fetcher's default value is 10. An out-of-range value is clamped: if n is less than 1 it is set to 1, and if n is greater than 10 it is set to 10.
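
The range handling described here can be sketched as a small clamp function. This is an illustration of the documented behavior, not the package's source; the name clampMaxMessages is hypothetical.

```go
package main

import "fmt"

// clampMaxMessages forces n into the range 1..10 that SQS accepts for
// MaxNumberOfMessages.
func clampMaxMessages(n int64) int64 {
	if n < 1 {
		return 1
	}
	if n > 10 {
		return 10
	}
	return n
}

func main() {
	for _, n := range []int64{-3, 0, 1, 7, 10, 25} {
		fmt.Printf("%d -> %d\n", n, clampMaxMessages(n))
	}
}
```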

func FetcherQueueLocker

func FetcherQueueLocker(l locker.QueueLocker) GatewayParameter

FetcherQueueLocker sets the QueueLocker that the Gateway uses to block duplicate queue messages.

func FetcherVisibilityTimeout added in v1.4.2

func FetcherVisibilityTimeout(d time.Duration) GatewayParameter

FetcherVisibilityTimeout sets the VisibilityTimeout of the receive-message request.

func FetcherWaitTime added in v1.3.1

func FetcherWaitTime(d time.Duration) GatewayParameter

FetcherWaitTime sets the WaitTimeSeconds of the receive-message request.

type HTTPInvoker

type HTTPInvoker struct {
	// contains filtered or unexported fields
}

HTTPInvoker invokes the worker process via an HTTP POST request.

func NewHTTPInvoker

func NewHTTPInvoker(rawurl string, dur time.Duration) (*HTTPInvoker, error)

NewHTTPInvoker returns an HTTPInvoker instance.

func (*HTTPInvoker) Invoke

func (ivk *HTTPInvoker) Invoke(ctx context.Context, q Message) error

Invoke sends an HTTP request to the assigned URL.

type Invoker

type Invoker interface {
	Invoke(context.Context, Message) error
}

Invoker invokes the worker process by any means.

type Message

type Message struct {
	ID         string
	Payload    string
	Receipt    string
	ReceivedAt time.Time
}

Message is the representation converted from sqs.Message.

type MonitoringService

type MonitoringService struct {
	UnimplementedMonitoringServiceServer
	// contains filtered or unexported fields
}

MonitoringService provides the gRPC handler for MonitoringService.

func NewMonitoringService

func NewMonitoringService(consumer *worker) *MonitoringService

NewMonitoringService returns a new MonitoringService object.

func (*MonitoringService) CurrentWorkings

CurrentWorkings handles the CurrentWorkings gRPC request using the actor system.

func (*MonitoringService) WaitUntilAllEnds

func (s *MonitoringService) WaitUntilAllEnds(timeout time.Duration) error

WaitUntilAllEnds waits until all worker tasks finish.

type MonitoringServiceClient

type MonitoringServiceClient interface {
	CurrentWorkings(ctx context.Context, in *CurrentWorkingsRequest, opts ...grpc.CallOption) (*CurrentWorkingsResponse, error)
}

MonitoringServiceClient is the client API for MonitoringService service.

For semantics around ctx use and closing/ending streaming RPCs, please refer to https://pkg.go.dev/google.golang.org/grpc/?tab=doc#ClientConn.NewStream.

type MonitoringServiceServer

type MonitoringServiceServer interface {
	CurrentWorkings(context.Context, *CurrentWorkingsRequest) (*CurrentWorkingsResponse, error)
	// contains filtered or unexported methods
}

MonitoringServiceServer is the server API for MonitoringService service. All implementations must embed UnimplementedMonitoringServiceServer for forward compatibility

type System

type System struct {
	// contains filtered or unexported fields
}

System controls the actor system of sqsd.

func NewSystem

func NewSystem(builders ...SystemBuilder) *System

NewSystem returns a System object.

func (*System) Run

func (s *System) Run(ctx context.Context) error

Run starts the actors and the gRPC server.

type SystemBuilder

type SystemBuilder func(*System)

SystemBuilder provides a constructor for the components that a System object requires.

func ConsumerBuilder

func ConsumerBuilder(invoker Invoker, parallel int) SystemBuilder

ConsumerBuilder builds the consumer for the system.

func GatewayBuilder

func GatewayBuilder(queue *sqs.SQS, queueURL string, parallel int, timeout time.Duration, params ...GatewayParameter) SystemBuilder

GatewayBuilder builds the gateway for the system.

func MonitorBuilder

func MonitorBuilder(port int) SystemBuilder

MonitorBuilder sets the monitoring server port for the system.

type Task

type Task struct {
	Id        string                 `protobuf:"bytes,1,opt,name=id,proto3" json:"id,omitempty"`
	Receipt   string                 `protobuf:"bytes,2,opt,name=receipt,proto3" json:"receipt,omitempty"`
	StartedAt *timestamppb.Timestamp `protobuf:"bytes,3,opt,name=started_at,json=startedAt,proto3" json:"started_at,omitempty"`
	// contains filtered or unexported fields
}

func (*Task) Descriptor deprecated

func (*Task) Descriptor() ([]byte, []int)

Deprecated: Use Task.ProtoReflect.Descriptor instead.

func (*Task) GetId

func (x *Task) GetId() string

func (*Task) GetReceipt

func (x *Task) GetReceipt() string

func (*Task) GetStartedAt

func (x *Task) GetStartedAt() *timestamppb.Timestamp

func (*Task) ProtoMessage

func (*Task) ProtoMessage()

func (*Task) ProtoReflect

func (x *Task) ProtoReflect() protoreflect.Message

func (*Task) Reset

func (x *Task) Reset()

func (*Task) String

func (x *Task) String() string

type UnimplementedMonitoringServiceServer

type UnimplementedMonitoringServiceServer struct {
}

UnimplementedMonitoringServiceServer must be embedded to have forward compatible implementations.

func (UnimplementedMonitoringServiceServer) CurrentWorkings

type UnsafeMonitoringServiceServer

type UnsafeMonitoringServiceServer interface {
	// contains filtered or unexported methods
}

UnsafeMonitoringServiceServer may be embedded to opt out of forward compatibility for this service. Use of this interface is not recommended, as added methods to MonitoringServiceServer will result in compilation errors.

Directories

Path Synopsis
cmd
