go-queues

command module
v0.3.2 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Feb 3, 2021 License: MIT Imports: 7 Imported by: 0

README

go-queues

A generic producer-consumer service with pluggable queues written in Go

It is designed to be inherently scalable, apply concurrent processing using goroutines and has pluggable queue sources such as SQS, Kafka, etc.

Quick Start

package main

import (
	"log"
	"strconv"
	"time"

	"github.com/social9/go-queues/streams/sqs"

	"github.com/aws/aws-sdk-go/aws"
	awsSqs "github.com/aws/aws-sdk-go/service/sqs"
)

func main() {
	// Instantiate the queue with service connection
	queue, _ := sqs.NewSQS(sqs.Config{
		// aws config
		AWSRegion:  "us-east-2",
		MaxRetries: 10,

		// aws creds - if provided, auto added to env. Or you can add manually as well
		AWSKey:    "<AWS Access Key>",
		AWSSecret: "<AWS Secret>",

		// sqs config
		URL:               "https://sqs.us-east-2.amazonaws.com/..../MyQueue.fifo",
		BatchSize:         10,  // fetch 10 messages per batch
		VisibilityTimeout: 120, // hide for 2 minutes from other consumers 
		WaitSeconds:       5,   // poll for 5 seconds per batch

		// misc config
		RunInterval: 20,    // poll every 20 seconds
		RunOnce:     false, // if set to true, polled only once
		MaxHandlers: 10,    // maximum number of messages to process at a time
		BusyTimeout: 30,    // wait for 30 seconds before rechecking if handlers are freed (when max handlers reached)
	})

	// Simlulate sending the messages in batch
	queue.Enqueue(getMessagesToEnque())

	// simulate processing a request for 2 seconds
	queue.RegisterPollHandler(func(msg *awsSqs.Message) {
		log.Println("Wait 2 seconds for:", *msg.MessageId)
		wait := time.Duration(2) * time.Second
		<-time.After(wait)

		log.Println("Processing:", *msg.MessageId, *msg.Body)

		// Simulate processing time as 10 seconds
		time.Sleep(10 * time.Second)
		log.Println("Finished:", *msg.MessageId)

		queue.Delete(msg)
	})

	// Poll from the SQS queue
	queue.Poll()

}

func getMessagesToEnque() []*awsSqs.SendMessageBatchRequestEntry {
	msgs := []string{"Test message 1-1", "Test Message 2-1", "Test Message 3-1"}

	var msgBatch []*awsSqs.SendMessageBatchRequestEntry
	for i := 0; i < len(msgs); i++ {
		message := &awsSqs.SendMessageBatchRequestEntry{
			Id:                     aws.String(`test_` + strconv.Itoa(i)),
			MessageBody:            aws.String(msgs[i]),
			MessageDeduplicationId: aws.String(`dedup_` + strconv.Itoa(i)),
			MessageGroupId:         aws.String("test_group"),
		}
		msgBatch = append(msgBatch, message)
	}

	return msgBatch
}

Setup

  • Clone the repo git clone https://github.com/social9/go-queues
  • Create a development branch git checkout -b new_development_branch origin/dev

Contribution Guidelines

  • Fork this repo to your GitHub account
  • You can either create an issue or pick from the existing and seek maintainers' attention before development
  • Your Pull Request branch must be rebased with the dev branch i.e. have a linear history
  • One or more maintainers will review your PR once associated to an issue.

Do append the issue ID in the pull request title e.g. Implemented a functionality closes #20 where 20 is the issue number

License

MIT

Documentation

The Go Gopher

There is no documentation for this package.

Directories

Path Synopsis
lib
streams
sqs

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL