Pipe - Event Grid and Message Router
This project allows you to listen to a variety of message sources and perform an action when a message is received. The documentation and the project is currently work in progress however curretly supported providers are:
- Nats.io - read and write to nats streaming
- HTTP - receive and send events over http
The project is built around a provider model where plugable elements can be added to the server to allow support for a variety of message sources.
Planned providers:
- Log files - read and write to log files
- SQS - AWS Simple Message Queue
- PubSub - Google pub sub
- Kafka
- And more.
Configuration
To configure pipes HCL configuration file is used...
# Input block, will listen for nats messages on defined queue
input "nats_queue" "nq_in" {
server = "nats://nats.service.consul:4222"
cluster_id = "test-cluster"
queue = "testmessagequeue"
}
# Output block, defines a http output
output "http" "nq_out" {
protocol = "http"
server = "localhost"
port = 8080
path = "/message"
}
pipe "accept_nats" {
# Name of the input block
input = "nq_in"
# Do not handle messages older than
expiration = "1h"
# Action to perform when a new message is received
action {
# Name of the output
output = "nq_out"
# Transform the initial message
template = <<EOF
{
"text": "Hey a picture from selfi drone",
"image": "{{ .JSON.Data }}"
}
EOF
}
# Called when action succeedes
on_success {
output = "success"
}
on_success {
output = "success"
}
# Called when the action fails
on_fail {
output = "fail"
}
}
Template values
.Raw
Return raw binary data as an array of bytes from the message
.JSON
If the message type is application/json return an object which allows access to elements
i.e.
Given:
{
"Pets": [
{"name": "fido"}
]
}
Then:
{{ .JSON.Pets[0].name }} // fido
Note .JSON does not convert the output to JSON format, writing the direct output of .JSON.Pets would produce a go formatted
object. To output json see the template function tojson
.
Template functions
base64encode
Base64 encode []byte
input_template: |
{
"image": "{{ base64encode .Raw }}"
}
base64decode
Base64 decode a string
input_template: |
{{ base64decode .JSON.Image }}
tojson
Convert to valid json
input_template: |
{{ tojson .JSON.Pets }}
Metrics
Metrics are exported using StatsD to import metrics into Prometheus please use the prometheus StatsD exporter https://hub.docker.com/r/prom/statsd-exporter/
Running the queue
To run the listener you can use the build docker container and provide a configuration file as a volume mount.
docker run -it \
-v $(shell pwd)/examples:/etc/config \
quay.io/nicholasjackson/faas-nats:latest \
-config /etc/config/examples
Testing
There is a simple test harness in ./testharness/main.go which can be used to validate the subscription and transformations.
TODO
[x] Implement monitoring and metrics with StatsD
[ ] Finish documentation
[ ] Write more examples
[ ] Finish basic provider implementation