amqp

package
v0.0.0-...-fefe0ed Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Oct 5, 2019 License: BSD-3-Clause Imports: 12 Imported by: 0

README


title: AMQP weight: 15709

amqp

This trigger provides your flogo application the ability to start a flow via AMQP

Installation

flogo install https://github.com/mtorre-iot/flogo-contrib/trigger/amqp

Schema

Settings, Outputs and Endpoint:

{
  "settings":[
    {
      "name": "requestHostName",
      "type": "string",
      "required": true
    },
    {
      "name": "requestPort",
      "type": "string",
      "required": true
    },
    {
      "name": "requestExchangeName",
      "type": "string",
      "required": true
    },
    {
      "name": "requestExchangeType",
      "type": "string",
      "required": true
    },
    {
      "name": "requestRoutingKey",
      "type": "string",
      "required": true
    },
    {
      "name": "requestUser",
      "type": "string",
      "required": true
    },    
    {
      "name": "requestPassword",
      "type": "string",
      "required": true
    },
    {
      "name": "requestDurable",
      "type": "string",
      "required": false
    },
    {
      "name": "requestAutoDelete",
      "type": "string",
      "required": false
    },
    {
      "name": "requestReliable",
      "type": "string",
      "required": false
    },
    {
      "name": "responseHostName",
      "type": "string",
      "required": false
    },
    {
      "name": "responsePort",
      "type": "string",
      "required": false
    },
    {
      "name": "responseExchangeName",
      "type": "string",
      "required": false
    },
    {
      "name": "responseExchangeType",
      "type": "string",
      "required": false
    },
    {
      "name": "responseRoutingKey",
      "type": "string",
      "required": false
    },
    {
      "name": "responseUser",
      "type": "string",
      "required": false
    },    
    {
      "name": "responsePassword",
      "type": "string",
      "required": false
    },
    {
      "name": "responseDurable",
      "type": "string",
      "required": false
    },
    {
      "name": "responseAutoDelete",
      "type": "string",
      "required": false
    },
    {
      "name": "responseReliable",
      "type": "string",
      "required": false
    }
  ],
  "output": [
    {
      "name": "message",
      "type": "string"
    }
  ],
  "reply": [
    {
      "name": "data",
      "type": "object"
    }
  ],
  "handler": {
    "settings": [
      {
        "name": "topic",
        "type": "string",
        "required": true
      }
    ]
  }
}

Example Configurations

Triggers are configured via the triggers.json of your application. The following are some example configuration of the AMQP Trigger.

Start a flow

Configure the Trigger to start "myflow". "settings" "topic" is the topic it uses to listen for incoming messages. So in this case the "endpoints" "settings" "topic" is "test_start" will start "myflow" flow. The incoming message payload has to define "replyTo" which is the the topic used to reply on.

{
  "triggers": [
    {
      "id": "receive_amqp_message",
      "ref": "github.com/mtorre-iot/flogo-contrib/trigger/amqp",
      "name": "Receive AMQP Message",
      "description": "Simple AMQP Trigger",
      "settings": {
        "requestHostName": "localhost",
        "requestPort": "5672",
        "requestExchangeName": "AMQPRequestExchange",
        "requestExchangeType": "topic",
        "requestRoutingKey": "#",
        "requestUser": "guest",
        "requestPassword": "guest",
        "responseHostName": "localhost",
        "responsePort": "5672",
        "responseExchangeName": "AMQPResponseExchange",
        "responseExchangeType": "topic",
        "responseRoutingKey": "#",
        "responseUser": "guest",
        "responsePassword": "guest"
      },
      "handlers": [
        {
          "action": {
            "ref": "github.com/TIBCOSoftware/flogo-contrib/action/flow",
            "data": {
              "flowURI": "res://flow:amqp_application_test"
            },
            "mappings": {
              "input": [
                {
                  "mapTo": "message",
                  "type": "assign",
                  "value": "$.message"
                }
              ]
            }
          },
          "settings": {
            "topic": "update"
          }
        }
      ]
    }
  ]
}

Documentation

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

func NewFactory

func NewFactory(md *trigger.Metadata) trigger.Factory

NewFactory create a new Trigger factory

Types

type AMQPExchange

type AMQPExchange struct {
	Uri          string
	HostName     string
	ExchangeName string
	ExchangeType string
	QueueName    string
	RoutingKey   string
	UserName     string
	Password     string
	Durable      bool
	AutoDelete   bool
	Reliable     bool
	Connection   *amqp.Connection
	Channel      *amqp.Channel
	Queue        *amqp.Queue
	Messages     []string
	Confirms     chan amqp.Confirmation
	IsOpen       bool
}

AMQPExchange contains all parameters required to create or open an exchenge

func AMQPExchangeNew

func AMQPExchangeNew(hostName string, port int, exchangeName string, exchangeType string, queueName string, routingKey string,
	userName string, password string, durable bool, autoDelete bool, reliable bool) *AMQPExchange

AMQPExchangeNew creates a new exchange in the Broker

func (*AMQPExchange) Close

func (exch *AMQPExchange) Close() error

Close closes the exchange

func (*AMQPExchange) Open

func (exch *AMQPExchange) Open(isQueued bool) error

Open opens a previosly created Exchange

func (*AMQPExchange) PrepareReceiveFunc

func (exch *AMQPExchange) PrepareReceiveFunc(f func(msgs <-chan amqp.Delivery)) error

PrepareReceiveFunc Prepares exchange/queue to receive messages

func (*AMQPExchange) Publish

func (exch *AMQPExchange) Publish(body string) error

Publish publishes a new message into an existing exchange

func (*AMQPExchange) PublishObject

func (exch *AMQPExchange) PublishObject(obj interface{}) error

PublishObject serializes objects (JSON) and publish to existing exchange

func (*AMQPExchange) ReadMessages

func (exch *AMQPExchange) ReadMessages() ([]string, error)

ReadMessages reads the messajes accumulated in the queue

type AMQPFactory

type AMQPFactory struct {
	// contains filtered or unexported fields
}

AMQPFactory AMQP Trigger factory

func (*AMQPFactory) New

func (t *AMQPFactory) New(config *trigger.Config) trigger.Trigger

New Creates a new trigger instance for a given id

type AmqpTrigger

type AmqpTrigger struct {
	// contains filtered or unexported fields
}

AmqpTrigger is simple AMQP trigger

func (*AmqpTrigger) Initialize

func (t *AmqpTrigger) Initialize(ctx trigger.InitContext) error

Initialize implements trigger.Initializable.Initialize

func (*AmqpTrigger) Metadata

func (t *AmqpTrigger) Metadata() *trigger.Metadata

Metadata implements trigger.Trigger.Metadata

func (*AmqpTrigger) RunHandler

func (t *AmqpTrigger) RunHandler(handler *trigger.Handler, payload string)

RunHandler runs the handler and associated action

func (*AmqpTrigger) Start

func (t *AmqpTrigger) Start() error

Start implements trigger.Trigger.Start

func (*AmqpTrigger) Stop

func (t *AmqpTrigger) Stop() error

Stop implements ext.Trigger.Stop

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL