gonyexpress

package module
v1.1.3 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jun 11, 2021 License: MIT Imports: 7 Imported by: 0

README

Gony Express

The gonyexpress is a Proof of Concept package to quickly create golang AMQP enabled Components. This package implements the "Routing Slip Pattern", and does all the heavy lifting, from:

  • Receiving a Message
  • Unpacking a Message
  • <your magic here>
  • Re-packaging the Message
  • Advancing a Message / or Retrying a failed Message
  • Acknowledging a Message has been handled

All a Gony Express Component needs is to do it's own bit of magic.

Example

package main

import (
	ge "github.com/SebastiaanPasterkamp/gonyexpress"
	. "github.com/SebastiaanPasterkamp/gonyexpress/payload"
)

func main() {
	c := ge.NewConsumer(
        "amqp://guest:guest@127.0.0.1:5672/",
        "example", // incoming queue
        4,         // worker count
        magic,     // your magic
    )

    forever := make(chan bool)
    c.Run()
	defer c.Shutdown()
	<-forever
}

func magic(
	traceID string, md MetaData, args Arguments, docs Documents,
) (*Documents, *MetaData, error) {
    // Work with the incoming general metadata, your step specific arguments,
    // and the attached documents.

    // Attach more documents and/or inject more metadata to the next message
	out := Documents{
		"example": NewDocument("your result here", "text/plain", ""),
	}

    // Return
	return &out, nil, nil
}

Future work

The Gony Express will work on extra features, such as opentracing support, diagnostics, improved logging, configuration, and more.

Documentation

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

This section is empty.

Types

type Component

type Component struct {
	// Broker is a utility wrapper around a RabbitMQ connection.
	Broker broker.Broker
	// contains filtered or unexported fields
}

Component is a RabbitMQ consumer / producer to do the heavy lifting for routing

func NewConsumer

func NewConsumer(URI, qname string, workers int, operator Operator) Component

NewConsumer creates a Consumer Component instance ready to connect to the rabbitmq + queue and execute the operator function for every recieved message.

func NewProducer

func NewProducer(URI, qname string) Component

NewProducer creates a Component instance ready to Connect to the rabbitmq, but it does not launch any workers and doesn't automatically consume incoming messages. The queue can be empty if the Producer is not interested in the result or progress updates, otherwise it is recommended to set the queue to: 'amq.rabbitmq.reply-to' (AMQPReplyTo) for a Direct Reply-To.

func (*Component) Close

func (c *Component) Close()

Close terminates the RabbitMQ channel and connection. Should be used when running a Producer, after Connect is called. Automatically called after Shutdown for a running Consumer.

func (*Component) Connect

func (c *Component) Connect() (<-chan amqp.Delivery, error)

Connect opens up a RabbitMQ connection and returns a channel through which Messages are delivered.

func (*Component) IsShuttingDown

func (c *Component) IsShuttingDown() <-chan bool

IsShuttingDown returns a channel to be closed when the Consumer is shutting down.

func (*Component) Run

func (c *Component) Run() error

Run launches the Component as a background service.

func (*Component) SendMessage

func (c *Component) SendMessage(msg payload.Message) error

SendMessage sends a message onto the message's current Slip queue

func (*Component) Shutdown

func (c *Component) Shutdown()

Shutdown will notify all workers to stop, and wait for all to finish.

type Operator

type Operator func(
	traceID string, md pl.MetaData, args pl.Arguments, docs pl.Documents,
) (*pl.Documents, *pl.MetaData, error)

Operator is a function to be executed for every message received by the Consumer. The returned payload is advanced to the next step on the routing slip.

Directories

Path Synopsis
bin

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL