loafer_go

package module
v0.0.2 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Mar 23, 2023 License: MIT Imports: 13 Imported by: 0

README

loafer-go

Lib for GO with async pooling of AWS/SQS messages

Usage
import (
  "context"
  "fmt"
  
  loafer_go "github.com/justcodes/loafer-go"
)

func main() {
  c := loafer_go.Config{
    // The hostname is used only when using localstack/goaws
    Hostname:   "http://localhost:4100",
    Key:        "aws-key",
    Secret:     "aws-secret",
    Region:     "us-east-1",
    // the number of workers will be divided between the routes
    // in this example, each route will have 15 workers
    // you should use with care, because using too many workers 
    // can cause problems consuming too much resources
    WorkerPool: 30, 
  }

  // initialize the manager
  manager := loafer_go.NewManager(c)

  // there are two ways to register routes
  // this way you can register various routes in a single call to 
  // manager.RegisterRoutes
  var routes = []*loafer_go.Route{
    loafer_go.NewRoute("queuename-1", handler1, 10, 30, 10),
    loafer_go.NewRoute("queuename-2", handler2, 10, 30, 10),
  }

  manager.RegisterRoutes(routes)

  // or you can register various routes calling the manager.RegisterRoute
  // method multiple times
  manager.RegisterRoute(loafer_go.NewRoute("queuename-1", handler1, 10, 30, 10))
  manager.RegisterRoute(loafer_go.NewRoute("queuename-2", handler1, 10, 30, 10))

  // start the manager it will run until you stop it with Ctrl + C
  err := manager.Run()

  if err != nil {
    panic(err)
  }
}

func handler1(ctx context.Context, m loafer_go.Message) error {
  fmt.Printf("Message received handler1: %+v\n ", m)
  // you can return errors, if you return an error the message will be returned to the queue
  return nil
}

func handler2(ctx context.Context, m loafer_go.Message) error {
  fmt.Printf("Message received handler2: %+v\n ", m)
  return nil
}
TODO
  • Add tests
  • Add support for sending messages to SQS
  • Add support for sending messages to SNS
Acknowledgments

This lib is inspired by loafer and gosqs.

I used goaws for testing.

Documentation

Index

Constants

View Source
const DataTypeNumber = dataType("Number")

DataTypeNumber represents the Number datatype, use it when creating custom attributes

View Source
const DataTypeString = dataType("String")

DataTypeString represents the String datatype, use it when creating custom attributes

Variables

View Source
var ErrBodyOverflow = newSQSErr("message surpasses sqs limit of 262144, please truncate body")

ErrBodyOverflow AWS SQS can only hold payloads of 262144 bytes. Messages must either be routed to s3 or truncated

View Source
var ErrGetMessage = newSQSErr("unable to retrieve message")

ErrGetMessage fires when a request to retrieve messages from sqs fails

View Source
var ErrInvalidCreds = newSQSErr("invalid aws credentials")

ErrInvalidCreds invalid credentials

View Source
var ErrInvalidVal = newSQSErr("value type does not match specified datatype")

ErrInvalidVal the custom attribute value must match the type of the custom attribute Datatype

View Source
var ErrMarshal = newSQSErr("unable to marshal request")

ErrMarshal unable to marshal request

View Source
var ErrMessageProcessing = newSQSErr("processing time exceeding limit")

ErrMessageProcessing occurs when a message has exceeded the consumption time limit set by aws SQS

View Source
var ErrNoRoute = newSQSErr("message received without a route")

ErrNoRoute message received without a route

View Source
var ErrPublish = newSQSErr("message publish failure. Retrying...")

ErrPublish If there is an error publishing a message. gosqs will wait 10 seconds and try again up to the configured retry count

View Source
var ErrQueueURL = newSQSErr("undefined queueURL")

ErrQueueURL undefined queueURL

View Source
var ErrUnableToDelete = newSQSErr("unable to delete item in queue")

ErrUnableToDelete unable to delete item

View Source
var ErrUnableToExtend = newSQSErr("unable to extend message processing time")

ErrUnableToExtend unable to extend message processing time

View Source
var ErrUndefinedPublisher = newSQSErr("sqs publisher is undefined")

ErrUndefinedPublisher invalid credentials

Functions

func NewManager

func NewManager(config Config) *manager

NewManager creates a new manager with the given configuration

Types

type Config

type Config struct {
	// private key to access aws
	Key string
	// secret to access aws
	Secret string
	// region for aws and used for determining the topic ARN
	Region string
	// provided automatically by aws, but must be set for emulators or local testing
	Hostname string
	// account ID of the aws account, used for determining the topic ARN
	AWSAccountID string
	// environment name, used for determinig the topic ARN
	Env string
	// prefix of the topic, this is set as a prefix to the environment
	TopicPrefix string
	// optional address of the topic, if this is not provided it will be created using other variables
	TopicARN string
	// optional address of queue, if this is not provided it will be retrieved during setup
	QueueURL string
	// used to determine how many attempts exponential backoff should use before logging an error
	RetryCount int
	// defines the total amount of goroutines that can be run by the consumer
	WorkerPool int
	// defines the total number of processing extensions that occur. Each proccessing extension will double the
	// visibilitytimeout counter, ensuring the handler has more time to process the message. Default is 2 extensions (1m30s processing time)
	// set to 0 to turn off extension processing
	ExtensionLimit *int

	// Add custom attributes to the message. This might be a correlationId or client meta information
	// custom attributes will be viewable on the sqs dashboard as meta data
	Attributes []customAttribute

	// Add a custom logger, the default will be log.Println
	Logger Logger
}

Config defines the gosqs configuration

func (*Config) NewCustomAttribute

func (c *Config) NewCustomAttribute(dataType dataType, title string, value interface{}) error

NewCustomAttribute adds a custom attribute to SNS and SQS messages. This can include correlationIds, logIds, or any additional information you would like separate from the payload body. These attributes can be easily seen from the SQS console.

must use gosqs.DataTypeNumber of gosqs.DataTypeString for the datatype, the value must match the type provided

type Handler

type Handler func(context.Context, Message) error

type Logger

type Logger interface {
	Println(v ...interface{})
}

Logger provides a simple interface to implement your own logging platform or use the default

type Message

type Message interface {
	// Route returns the event name that is used for routing within a worker, e.g. post_published
	// Decode will unmarshal the message into a supplied output using json
	Decode(out interface{}) error
	// Attribute will return the custom attribute that was sent through out the request.
	Attribute(key string) string
	// Metadata will return the metadata that was sent through out the request.
	Metadata() map[string]*string
}

type Route

type Route struct {
	ExtensionLimit int
	// contains filtered or unexported fields
}

func NewRoute

func NewRoute(queueName string, handler Handler, maxMessages int64, visibilityTimeout int, waitTimeSeconds int) *Route

func (*Route) Logger

func (r *Route) Logger() Logger

type SQSError

type SQSError struct {
	Err string `json:"err"`
	// contains filtered or unexported fields
}

SQSError defines the error handler for the gosqs package. SQSError satisfies the error interface and can be used safely with other error handlers

func (*SQSError) Context

func (e *SQSError) Context(err error) *SQSError

Context is used for creating a new instance of the error with the contextual error attached

func (*SQSError) Error

func (e *SQSError) Error() string

Error is used for implementing the error interface, and for creating a proper error string

Directories

Path Synopsis

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL