fluent

package module
v0.1.36 Latest
Published: Jun 10, 2020 License: MIT Imports: 22 Imported by: 0

README

Fluent AMQP

A library that provides a fluent, easy-to-use wrapper over the streadway-amqp API. It adds features such as:

  • Reconnecting, with all defined infrastructure restored
  • Non-blocking processing of messages
  • Optional auto-requeue (with delay)
  • Signing and verifying messages with a public/private key pair

API documentation

Signing and verification

Signature algorithm (x509): SHA512 with RSA. Signing digest: SHA512.

The signer (producer) should use the private key to sign the message body and message ID.

The validator (consumer) should use the public certificate to validate the message body and message ID against the signature, and should drop invalid or duplicated messages.

The signature should be put in a header (X-Signature by default; see the DefaultSignatureHeader constant in godoc) as a binary object (not hex or base64 encoded).

DATA = BYTES(ID) ... BYTES(BODY)
# SIGN via PKCS#1 v1.5
SIGN_HEADER_BODY = SIGN_SHA512(PRIVATE_KEY, DATA)
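
The scheme above can be sketched with the Go standard library alone. This is a minimal illustration, not the library's own code: the helper names (signedData, sign, verify, demo) are hypothetical, and it assumes the signed data is the message ID bytes followed directly by the body bytes.

```go
package main

import (
	"crypto"
	"crypto/rand"
	"crypto/rsa"
	"crypto/sha512"
	"fmt"
)

// signedData builds the byte sequence that gets signed.
// Assumption: the message ID bytes are followed directly by the body bytes.
func signedData(id string, body []byte) []byte {
	data := make([]byte, 0, len(id)+len(body))
	data = append(data, id...)
	return append(data, body...)
}

// sign returns the raw (binary) PKCS#1 v1.5 signature over the SHA-512
// digest of the message ID and body.
func sign(key *rsa.PrivateKey, id string, body []byte) ([]byte, error) {
	digest := sha512.Sum512(signedData(id, body))
	return rsa.SignPKCS1v15(rand.Reader, key, crypto.SHA512, digest[:])
}

// verify reports whether sig matches the given message ID and body.
func verify(pub *rsa.PublicKey, id string, body, sig []byte) bool {
	digest := sha512.Sum512(signedData(id, body))
	return rsa.VerifyPKCS1v15(pub, crypto.SHA512, digest[:], sig) == nil
}

// demo signs a message with a throwaway key, then checks the original
// message and a tampered copy against the same signature.
func demo() (valid, tampered bool) {
	key, err := rsa.GenerateKey(rand.Reader, 2048)
	if err != nil {
		panic(err)
	}
	sig, err := sign(key, "msg-1", []byte("hello"))
	if err != nil {
		panic(err)
	}
	valid = verify(&key.PublicKey, "msg-1", []byte("hello"), sig)
	tampered = verify(&key.PublicKey, "msg-1", []byte("hellO"), sig)
	return valid, tampered
}

func main() {
	valid, tampered := demo()
	fmt.Println("original verifies:", valid)
	fmt.Println("tampered verifies:", tampered)
}
```

The raw signature bytes returned by sign are what would go into the signature header, without hex or base64 encoding.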

States

(image: states diagram)

Default message

Message by default has:

  • Delivery type - persistent
  • Time - current time in UTC

Templates

amqp-recv supports templated output via -o template. The template content is read from STDIN.

The root template object is an amqp.Delivery, with functions from Sprig plus additional helpers such as:

  • asText - converts bytes to string

  1. Basic example:

Prints the same output as -o plain.

echo '{{- .Body | asText -}}' | amqp-recv -o template ...

  2. Notification to Telegram

Uses a combination of basic CLI utilities and templates.

TOKEN="xxxyyyy"        # BotFather token for Telegram (see here: https://t.me/BotFather)
CHAT_ID="1234567"      # Target Telegram chat ID (find yours: https://t.me/MyTelegramID_bot)
QUEUE="notification"   # queue name must be set if persistence is required
EXCHANGE="amqp.topic"  # source of notification
TYPE="topic"           # exchange type
TOPIC="#"              # specify subject that will be sent over telegram (# - everything)

while true; do
  echo -n -e 'Subject: {{.RoutingKey}}\n\n{{.Body | asText}}' | amqp-recv -o template -Q $QUEUE -e $EXCHANGE -k $TYPE "$TOPIC" > message.txt
  curl -f -X POST --data "text=$(cat message.txt)" --data "chat_id=${CHAT_ID}" "https://api.telegram.org/bot${TOKEN}/sendMessage" || exit 1
done
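
How such a template is evaluated can be sketched with Go's text/template package. This is an illustration under stated assumptions rather than the amqp-recv implementation: the delivery struct is a hypothetical stand-in for amqp.Delivery carrying only the two fields used above, render is a made-up helper, and the Sprig functions are left out.

```go
package main

import (
	"fmt"
	"strings"
	"text/template"
)

// delivery is a hypothetical stand-in for amqp.Delivery with only the
// fields referenced by the templates above.
type delivery struct {
	RoutingKey string
	Body       []byte
}

// render executes tpl against msg, exposing an asText helper that
// converts bytes to a string.
func render(tpl string, msg delivery) (string, error) {
	funcs := template.FuncMap{
		"asText": func(b []byte) string { return string(b) },
	}
	t, err := template.New("message").Funcs(funcs).Parse(tpl)
	if err != nil {
		return "", err
	}
	var out strings.Builder
	if err := t.Execute(&out, msg); err != nil {
		return "", err
	}
	return out.String(), nil
}

func main() {
	msg := delivery{RoutingKey: "alerts.disk", Body: []byte("disk is almost full")}
	text, err := render("Subject: {{.RoutingKey}}\n\n{{.Body | asText}}", msg)
	if err != nil {
		panic(err)
	}
	fmt.Println(text)
}
```

Without asText, a []byte body would be printed as a list of numbers, which is why the helper is piped after .Body.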

Command line utilities

  • amqp-exec - CGI-like daemon that listens for messages and runs an executable (and sends a reply)
  • amqp-recv - Receives messages from AMQP (like the cat command)
  • amqp-send - Sends messages to AMQP (like the wall command)

Installation

Get it from the Snap Store

  • snapcraft: fluent-amqp

    After installation, commands are available with the fluent-amqp. prefix (e.g. fluent-amqp.amqp-exec)

  • pre-built binaries for all major platforms

  • From the bintray repository for most Debian-based distributions (trusty, xenial, bionic, buster, wheezy):

sudo apt-key adv --keyserver hkp://keyserver.ubuntu.com:80 --recv-keys 379CE192D401AB61
echo "deb https://dl.bintray.com/reddec/debian {distribution} main" | sudo tee -a /etc/apt/sources.list
sudo apt install amqp-send amqp-exec amqp-recv

Documentation

Index

Constants

const DefaultSignatureHeader = "X-Signature"
const LastErrorHeader = "x-fluent-amqp-last-message-error"

Variables

This section is empty.

Functions

func BoolHeader added in v0.0.11

func BoolHeader(msg *amqp.Delivery, param string, def bool) bool

func FloatHeader added in v0.0.11

func FloatHeader(msg *amqp.Delivery, param string, def float64) float64

func IntHeader added in v0.0.11

func IntHeader(msg *amqp.Delivery, param string, def int64) int64

func SignalContext

func SignalContext(parent context.Context) context.Context

func StringHeader added in v0.0.11

func StringHeader(msg *amqp.Delivery, param string, def string) string

Types

type BrokerConfig

type BrokerConfig struct {
	// contains filtered or unexported fields
}

func Broker

func Broker(urls ...string) *BrokerConfig

func (*BrokerConfig) Context

func (bc *BrokerConfig) Context(ctx context.Context) *BrokerConfig

func (*BrokerConfig) Interval

func (bc *BrokerConfig) Interval(tm time.Duration) *BrokerConfig

func (*BrokerConfig) Logger

func (bc *BrokerConfig) Logger(logger Logger) *BrokerConfig

func (*BrokerConfig) OnExpired added in v0.0.11

func (bc *BrokerConfig) OnExpired(handler DefaultSinkExpiredHandler) *BrokerConfig

Default handler for the OnExpired event in a sink

func (*BrokerConfig) OnTooMuchRetries added in v0.0.11

func (bc *BrokerConfig) OnTooMuchRetries(threshold int64, handler DefaultSinkExpiredHandler) *BrokerConfig

Default handler for the OnTooMuchRetries event in a sink

func (*BrokerConfig) Retries added in v0.0.11

func (bc *BrokerConfig) Retries(num int) *BrokerConfig

Default maximum number of retries for sinks with TransactHandlers (can be changed in a Sink). A negative value means no limit. Default is 10.

func (*BrokerConfig) Start

func (bc *BrokerConfig) Start() *Server

func (*BrokerConfig) StdLogger added in v0.0.2

func (bc *BrokerConfig) StdLogger(prefix string) *BrokerConfig

func (*BrokerConfig) Timeout

func (bc *BrokerConfig) Timeout(tm time.Duration) *BrokerConfig

func (*BrokerConfig) Verbose added in v0.1.30

func (bc *BrokerConfig) Verbose(verbose bool) *BrokerConfig

type DefaultSinkExpiredHandler added in v0.0.11

type DefaultSinkExpiredHandler func(ctx context.Context, sinkName string, msg amqp.Delivery, retries int64) bool

Handler for sinks to catch expired messages (see SinkConfig OnExpired and OnTooMuchRetries)

type Exchange

type Exchange struct {
	// contains filtered or unexported fields
}

func (*Exchange) Attr

func (exc *Exchange) Attr(name string, value interface{}) *Exchange

func (*Exchange) Direct

func (exc *Exchange) Direct(name string) *Exchange

func (*Exchange) Fanout

func (exc *Exchange) Fanout(name string) *Exchange

func (*Exchange) Handler added in v0.0.10

func (exc *Exchange) Handler(obj SimpleHandler) *Server

func (*Exchange) HandlerFunc

func (exc *Exchange) HandlerFunc(fn SinkHandlerFunc) *Server

func (*Exchange) Key

func (exc *Exchange) Key(routingKeys ...string) *Exchange

func (*Exchange) Topic

func (exc *Exchange) Topic(name string) *Exchange

func (*Exchange) Transact

func (exc *Exchange) Transact(fn TransactionHandler) *Server

func (*Exchange) TransactFunc

func (exc *Exchange) TransactFunc(fn TransactionHandlerFunc) *Server

type Logger

type Logger interface {
	Println(items ...interface{})
}

type Message

type Message struct {
	// contains filtered or unexported fields
}

func (*Message) Bytes

func (msg *Message) Bytes(content []byte) *Message

func (*Message) ContentType added in v0.0.11

func (msg *Message) ContentType(mime string) *Message

func (*Message) Exchange

func (msg *Message) Exchange(name string) *Message

func (*Message) Header

func (msg *Message) Header(name string, data interface{}) *Message

func (*Message) ID

func (msg *Message) ID(id string) *Message

func (*Message) JSON

func (msg *Message) JSON(obj interface{}) *Message

func (*Message) JSONContent added in v0.0.11

func (msg *Message) JSONContent(data []byte) *Message

func (*Message) Key

func (msg *Message) Key(name string) *Message

func (*Message) Publish added in v0.0.10

func (ms *Message) Publish(ctx context.Context) (<-chan struct{}, error)

func (*Message) PublishWait added in v0.0.10

func (ms *Message) PublishWait(ctx context.Context) error

func (*Message) Raw added in v0.0.11

func (msg *Message) Raw() *amqp.Publishing

func (*Message) Reply

func (msg *Message) Reply(correlationId, queueName string) *Message

func (*Message) ReplyTo added in v0.0.4

func (msg *Message) ReplyTo(correlationId, queueName string) *Message

func (*Message) Send

func (ms *Message) Send() <-chan struct{}

func (*Message) SendContext added in v0.0.9

func (ms *Message) SendContext(ctx context.Context) error

func (*Message) String

func (msg *Message) String(content string) *Message

func (*Message) TTL

func (msg *Message) TTL(tm time.Duration) *Message

func (*Message) Time added in v0.0.11

func (msg *Message) Time(stamp time.Time) *Message

func (*Message) TrySend

func (ms *Message) TrySend() error

func (*Message) Type

func (msg *Message) Type(contentType string) *Message

type RPC added in v0.0.11

type RPC struct {
	// contains filtered or unexported fields
}

func BuildRPC added in v0.0.11

func BuildRPC(broker *Server, consumerQueue string, targetExchange, targetRoutingKey string) *RPC

func (*RPC) Bytes added in v0.1.14

func (rpc *RPC) Bytes(data []byte) *Reply

func (*RPC) Content added in v0.1.14

func (rpc *RPC) Content(data []byte, contentType string) *Reply

func (*RPC) JSON added in v0.1.14

func (rpc *RPC) JSON(obj interface{}) *Reply

func (*RPC) Raw added in v0.1.14

func (rpc *RPC) Raw(msg *amqp.Publishing) *Reply

type ReQueueConfig

type ReQueueConfig struct {
	// contains filtered or unexported fields
}

func (*ReQueueConfig) Create

func (rq *ReQueueConfig) Create() Requeue

func (*ReQueueConfig) Queue

func (rq *ReQueueConfig) Queue(name string) *ReQueueConfig

func (*ReQueueConfig) Timeout

func (rq *ReQueueConfig) Timeout(tm time.Duration) *ReQueueConfig

type ReadHandlerFunc added in v0.1.34

type ReadHandlerFunc func() error

type ReceiverHandler

type ReceiverHandler interface {
	Handle(msg *amqp.Delivery) bool
}

func NewCertValidator

func NewCertValidator(cert []byte, header string, log Logger) (ReceiverHandler, error)

Creates a new handler that validates messages against the signature header. Important! The application MUST drop duplicated (by message ID) messages by itself; otherwise the same message can simply be resent multiple times.
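
The duplicate check required above is left to the application. One way to approach it is a small in-memory set of seen message IDs, sketched below. The seenIDs type and its methods are hypothetical names, and the sketch assumes an unbounded set is acceptable, which a long-running service would likely replace with expiry or persistent storage.

```go
package main

import (
	"fmt"
	"sync"
)

// seenIDs remembers message IDs that were already processed.
// Assumption: an unbounded in-memory set is enough for illustration.
type seenIDs struct {
	mu  sync.Mutex
	ids map[string]struct{}
}

// newSeenIDs returns an empty, ready-to-use set.
func newSeenIDs() *seenIDs {
	return &seenIDs{ids: make(map[string]struct{})}
}

// firstTime records id and reports true only on its first occurrence.
// It is safe to call from several goroutines.
func (s *seenIDs) firstTime(id string) bool {
	s.mu.Lock()
	defer s.mu.Unlock()
	if _, dup := s.ids[id]; dup {
		return false
	}
	s.ids[id] = struct{}{}
	return true
}

func main() {
	seen := newSeenIDs()
	for _, id := range []string{"a", "b", "a"} {
		fmt.Println(id, "first time:", seen.firstTime(id))
	}
}
```

In a message handler, firstTime would typically be called with the MessageId field of the amqp.Delivery, and the message skipped when it returns false.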

func NewCertValidatorFromFile

func NewCertValidatorFromFile(certFile string, header string, log Logger) (ReceiverHandler, error)

Creates a new handler (see NewCertValidator) with the key from a public ASN.1 DER certificate. The certificate should contain a --CERTIFICATE-- section.

type Reply added in v0.0.11

type Reply struct {
	// contains filtered or unexported fields
}

func (*Reply) Bytes added in v0.1.13

func (rp *Reply) Bytes() ([]byte, error)

func (*Reply) Data added in v0.0.11

func (rp *Reply) Data() chan<- *amqp.Delivery

func (*Reply) ID added in v0.0.11

func (rp *Reply) ID() string

func (*Reply) JSON added in v0.1.13

func (rp *Reply) JSON(target interface{}) error

type Requeue

type Requeue interface {
	Requeue(original *amqp.Delivery) error
	RequeueWithError(original *amqp.Delivery, err error) error
}

type SenderHandler

type SenderHandler interface {
	Handle(msg *amqp.Publishing) bool
}

func NewSigner

func NewSigner(privateKey []byte, header string) (SenderHandler, error)

Creates a new PKCS#1 v1.5 SHA512 signer handler.

func NewSignerFromFile

func NewSignerFromFile(privateKeyFile string, header string) (SenderHandler, error)

Loads a private key from a PKCS#8 file and creates a new PKCS#1 v1.5 SHA512 signer handler. The file should contain a --PRIVATE KEY-- section.
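
For orientation, reading a PKCS#8 key from a PEM PRIVATE KEY section can be sketched with the standard library as follows. This is not the library's loader: parsePrivateKey, encodePrivateKey and roundTrip are hypothetical helpers, and the sketch assumes an unencrypted RSA key.

```go
package main

import (
	"crypto/rand"
	"crypto/rsa"
	"crypto/x509"
	"encoding/pem"
	"errors"
	"fmt"
)

// parsePrivateKey decodes a PEM "PRIVATE KEY" block holding an
// unencrypted PKCS#8 RSA key.
func parsePrivateKey(pemBytes []byte) (*rsa.PrivateKey, error) {
	block, _ := pem.Decode(pemBytes)
	if block == nil || block.Type != "PRIVATE KEY" {
		return nil, errors.New("no PRIVATE KEY section found")
	}
	parsed, err := x509.ParsePKCS8PrivateKey(block.Bytes)
	if err != nil {
		return nil, err
	}
	key, ok := parsed.(*rsa.PrivateKey)
	if !ok {
		return nil, errors.New("key is not an RSA private key")
	}
	return key, nil
}

// encodePrivateKey produces the PEM form of key; it exists only to keep
// this example self-contained.
func encodePrivateKey(key *rsa.PrivateKey) ([]byte, error) {
	der, err := x509.MarshalPKCS8PrivateKey(key)
	if err != nil {
		return nil, err
	}
	return pem.EncodeToMemory(&pem.Block{Type: "PRIVATE KEY", Bytes: der}), nil
}

// roundTrip generates a throwaway key, encodes it, parses it back, and
// reports whether the parsed key has the same modulus.
func roundTrip() (bool, error) {
	key, err := rsa.GenerateKey(rand.Reader, 2048)
	if err != nil {
		return false, err
	}
	pemBytes, err := encodePrivateKey(key)
	if err != nil {
		return false, err
	}
	parsed, err := parsePrivateKey(pemBytes)
	if err != nil {
		return false, err
	}
	return parsed.N.Cmp(key.N) == 0, nil
}

func main() {
	same, err := roundTrip()
	if err != nil {
		panic(err)
	}
	fmt.Println("key survived PEM round trip:", same)
}
```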

type Server

type Server struct {
	// contains filtered or unexported fields
}

Server keeps the broker configuration and all declared objects (queues, exchanges, etc.) so they can be re-declared after a restart

func (*Server) Publisher

func (brk *Server) Publisher() *WriterConfig

Publisher creates new AMQP producer

func (*Server) Requeue

func (brk *Server) Requeue(originalQueue string) *ReQueueConfig

Requeue creates a new queue for requeueing. By default the new name is constructed as [originalQueue]/requeue. For example, if the original queue is SAMPLE, then the requeue queue is SAMPLE/requeue

func (*Server) Sink

func (brk *Server) Sink(queueName string) *SinkConfig

Sink creates a new AMQP consumer with an optional queue name. If the queue name is empty, an autogenerated one is used without persistence. Max retries is 10 by default

func (*Server) WaitToFinish

func (brk *Server) WaitToFinish()

WaitToFinish blocks the calling thread until all allocated resources are freed

type SimpleHandler added in v0.0.10

type SimpleHandler interface {
	Handle(ctx context.Context, msg amqp.Delivery)
}

type SinkConfig

type SinkConfig struct {
	// contains filtered or unexported fields
}

func (*SinkConfig) Attr

func (snk *SinkConfig) Attr(name string, value interface{}) *SinkConfig

func (*SinkConfig) DeadLetter

func (snk *SinkConfig) DeadLetter(exchange, routingKey string) *SinkConfig

func (*SinkConfig) Direct

func (snk *SinkConfig) Direct(name string) *Exchange

func (*SinkConfig) Fanout

func (snk *SinkConfig) Fanout(name string) *Exchange

func (*SinkConfig) Handler added in v0.0.10

func (snk *SinkConfig) Handler(obj SimpleHandler) *Server

func (*SinkConfig) HandlerFunc added in v0.0.2

func (snk *SinkConfig) HandlerFunc(fn SinkHandlerFunc) *Server

func (*SinkConfig) KeepDead added in v0.1.25

func (snk *SinkConfig) KeepDead() *SinkConfig

func (*SinkConfig) Lazy

func (snk *SinkConfig) Lazy() *SinkConfig

func (*SinkConfig) ManualAck

func (snk *SinkConfig) ManualAck() *SinkConfig

func (*SinkConfig) Name added in v0.0.11

func (snk *SinkConfig) Name(name string) *SinkConfig

Name of the sink. Just a tag for future identification (as in default handlers)

func (*SinkConfig) OnExpired added in v0.0.11

func (snk *SinkConfig) OnExpired(handler SinkExpiredHandlerFunc) *SinkConfig

Handler for expired (reached retries limit) messages

func (*SinkConfig) OnTooMuchRetries added in v0.0.11

func (snk *SinkConfig) OnTooMuchRetries(threshold int64, handler SinkExpiredHandlerFunc) *SinkConfig

Handler for messages that have reached the defined retries threshold. The threshold can be less than (in case of a positive retries limit) or equal to the retries limit, and the handler executes before OnExpired (if applicable). The handler is invoked each time after the threshold is reached. If the handler returns false, the message is dropped. When the threshold is the same as the retries limit, at least one false (from OnExpired or from OnTooMuchRetries) is required to drop the message.

func (*SinkConfig) Ready added in v0.1.34

func (snk *SinkConfig) Ready(fn ReadHandlerFunc) *SinkConfig

Add handler to catch when channel is ready

func (*SinkConfig) Requeue added in v0.0.5

func (snk *SinkConfig) Requeue(interval time.Duration) *SinkConfig

func (*SinkConfig) Retries

func (snk *SinkConfig) Retries(count int) *SinkConfig

func (*SinkConfig) Topic

func (snk *SinkConfig) Topic(name string) *Exchange

func (*SinkConfig) Transact added in v0.0.2

func (snk *SinkConfig) Transact(fn TransactionHandler) *Server

func (*SinkConfig) TransactFunc added in v0.0.2

func (snk *SinkConfig) TransactFunc(fn TransactionHandlerFunc) *Server

func (*SinkConfig) Use

func (snk *SinkConfig) Use(handler ReceiverHandler) *SinkConfig

func (*SinkConfig) Validate

func (snk *SinkConfig) Validate(certFile string) *SinkConfig

func (*SinkConfig) Verbose added in v0.1.30

func (snk *SinkConfig) Verbose(verbose bool) *SinkConfig

type SinkExpiredHandlerFunc added in v0.0.11

type SinkExpiredHandlerFunc func(ctx context.Context, msg amqp.Delivery, retries int64) bool

Handler for messages that have reached the retries limit. Returning false requests that the message be dropped

type SinkHandlerFunc

type SinkHandlerFunc func(ctx context.Context, msg amqp.Delivery)

type StateHandler

type StateHandler interface {
	ChannelReady(ctx context.Context, ch *amqp.Channel) error
}

type TransactionHandler

type TransactionHandler interface {
	Handle(ctx context.Context, msg amqp.Delivery) error
}

type TransactionHandlerFunc

type TransactionHandlerFunc func(ctx context.Context, msg amqp.Delivery) error

type Writer

type Writer struct {
	// contains filtered or unexported fields
}

func (*Writer) Prepare

func (writer *Writer) Prepare() *Message

func (*Writer) Reply

func (writer *Writer) Reply(msg *amqp.Delivery) *Message

type WriterConfig

type WriterConfig struct {
	// contains filtered or unexported fields
}

func (*WriterConfig) Create

func (wc *WriterConfig) Create() *Writer

func (*WriterConfig) DefaultDirect

func (wc *WriterConfig) DefaultDirect(name string) *WriterConfig

func (*WriterConfig) DefaultFanout

func (wc *WriterConfig) DefaultFanout(name string) *WriterConfig

func (*WriterConfig) DefaultKey

func (wc *WriterConfig) DefaultKey(routingKey string) *WriterConfig

func (*WriterConfig) DefaultTopic

func (wc *WriterConfig) DefaultTopic(name string) *WriterConfig

func (*WriterConfig) Overflow added in v0.1.33

func (wc *WriterConfig) Overflow(maxQueueSize int, handler func(int)) *WriterConfig

func (*WriterConfig) Sign

func (wc *WriterConfig) Sign(privateFile string) *WriterConfig

Signs the body and adds the signature to the DefaultSignatureHeader header. Panics if the private key cannot be read

func (*WriterConfig) Use

func (wc *WriterConfig) Use(handler SenderHandler) *WriterConfig

Directories

Path Synopsis
cmd
example
