go-queues

command module
v0.1.0 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Sep 26, 2020 License: MIT Imports: 6 Imported by: 0

README

go-queues (In Development)

A generic producer-consumer service with pluggable queues written in Go

It is designed to be inherently scalable, apply concurrent processing using goroutines and has pluggable queue sources such as SQS, Kafka, etc.

Setup

  • Clone the repo git clone https://github.com/cnp96/go-consumer
  • Configurable environment variables
    • AWS_ACCESS_KEY_ID : AWS Access Key, Required
    • AWS_SECRET_ACCESS_KEY : AWS Access Secret, Required
    • AWS_REGION : The AWS region to establish service connection, (default us-east-2)
    • SQS_URL : The SQS endpoint to poll, (default "")
    • SQS_BATCH_SIZE : The maximum number of messages to receive per request, (default 10), (accepted 1-10)
    • SQS_WAIT_TIME : The maximum polling wait time in seconds, (default 20 seconds), (accepted 0-20)
    • SQS_VISIBILITY_TIMEOUT : Visiblity timout for a message after received in seconds, (default 20 seconds)
    • RUN_ONCE : Run the service one-time or in intervals, (default true)
    • RUN_INTERVAL : Run the service in the defined interval, (default 10 seconds). Works only when RUN_ONCE is set to true

Note: You can also load the env values from a file named .env stored in the root path

Quick Start


package main

import (
	"log"
	"sync"
	"time"

	"github.com/cnp96/go-queues/config"
	"github.com/cnp96/go-queues/streams/sqs"

	awsSqs "github.com/aws/aws-sdk-go/service/sqs"
)

func main() {
	// Load from .env file
	env := config.Env()

	// Instantiate a SQS instance
	queue, _ := sqs.NewSQS(sqs.SQSConfig{
		Verbosity: 0,

		// aws config
		AWSRegion:  env.AWSRegion,
		MaxRetries: 10,

		// aws creds - Env uis updated when provided
		// Or you can set the following keys your self
		// AWS_ACCESS_KEY_ID, AWS_SECRET_ACCESS_KEY
		// Read more about the credential chain [here](https://docs.aws.amazon.com/sdk-for-go/api/aws/credentials/).
		AWSKey:    env.AWSKey,
		AWSSecret: env.AWSSecret,

		// sqs config
		URL:               env.SQSURL,
		BatchSize:         env.SQSBatchSize,
		VisibilityTimeout: 120,
		WaitSeconds:       5,

		// run config
		RunInterval: 20,
		RunOnce:     env.RunOnce,
	})

	// simulate processing a request for 2 seconds
	handler := func(wg *sync.WaitGroup, msg *awsSqs.Message) {
		log.Println("Waiting:", *msg.MessageId)
		wait := time.Duration(1) * time.Second
		<-time.After(wait)

		log.Println("Processing:", *msg.MessageId, *msg.Body)

		time.Sleep(2 * time.Second)
		log.Println("Finished:", *msg.MessageId)

		err := queue.Delete(msg)
		log.Println("Delete Error:", err)

		wg.Done()
	}

	// Poll from the SQS queue
	queue.Poll(handler)
}


Contribution Guidelines

To be added soon

License

MIT

Documentation

The Go Gopher

There is no documentation for this package.

Directories

Path Synopsis
lib
streams
sqs

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL