Kafka Trigger

This trigger subscribes to a topic on a Kafka cluster and listens for messages.

Flogo CLI
flogo install github.com/project-flogo/contrib/trigger/kafka

Configuration

Settings:

Name       | Type   | Description
brokerUrls | string | The brokers of the Kafka cluster to connect to - REQUIRED
user       | string | If connecting to a SASL-enabled port, the user id to use for authentication
password   | string | If connecting to a SASL-enabled port, the password to use for authentication
trustStore | string | If connecting to a TLS-secured port, the directory containing the certificates representing the trust chain for the connection. This is usually just the CA cert used to sign the server's certificate

Handler Settings:

Name       | Type   | Description
topic      | string | The Kafka topic on which to listen for messages - REQUIRED
partitions | string | The specific partitions to consume messages from
offset     | int64  | The offset to use when starting to consume messages (defaults to the newest offset)

Output:

Name    | Type   | Description
message | string | The message that was consumed

Examples

{
  "triggers": [
    {
      "id": "flogo-kafka",
      "ref": "github.com/project-flogo/contrib/trigger/kafka",
      "settings": {
        "brokerUrls" : "localhost:9092",
        "trustStore" : "" 
      },
      "handlers": [
        {
          "settings": {
            "topic": "syslog",
          },
          "action": {
            "ref": "github.com/project-flogo/flow",
            "settings": {
              "flowURI": "res://flow:my_flow"
            }
          }
        }
      ]
    }
  ]
}
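
The ref above resolves because, as is usual for project-flogo/contrib triggers, the package registers its factory with the Flogo engine in its init(). Apps built with the flogo CLI get the import generated for them; if you assemble the engine binary yourself, a blank import is enough. A minimal sketch (engine wiring omitted):

package main

import (
	// The blank import runs this package's init(), which registers the
	// kafka trigger factory so the "ref" in the app JSON can be resolved
	// at runtime.
	_ "github.com/project-flogo/contrib/trigger/kafka"
)

func main() {
	// Build and run the Flogo engine from the app JSON here, for example
	// with the main generated by `flogo build`; omitted in this sketch.
}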

Development

Testing

To run the tests, first set up a Kafka broker using the docker-compose file below:

version: '2'
  
services:

  zookeeper:
    image: wurstmeister/zookeeper:3.4.6
    expose:
    - "2181"

  kafka:
    image: wurstmeister/kafka:2.11-2.0.0
    depends_on:
    - zookeeper
    ports:
    - "9092:9092"
    environment:
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://localhost:9092
      KAFKA_LISTENERS: PLAINTEXT://0.0.0.0:9092
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181

Then run the following command:

go test 
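
To see the trigger consume something end to end, it can help to publish a test message to the broker from the compose file first. A hypothetical helper using the same Shopify/sarama client the trigger is built on (the broker address and the "syslog" topic match the example above and are assumptions, not part of the package):

package main

import (
	"log"

	"github.com/Shopify/sarama"
)

func main() {
	cfg := sarama.NewConfig()
	cfg.Producer.Return.Successes = true // required by SyncProducer

	producer, err := sarama.NewSyncProducer([]string{"localhost:9092"}, cfg)
	if err != nil {
		log.Fatal(err)
	}
	defer producer.Close()

	// Publish a single message for the trigger (or the tests) to consume.
	_, _, err = producer.SendMessage(&sarama.ProducerMessage{
		Topic: "syslog",
		Value: sarama.StringEncoder("hello from the test producer"),
	})
	if err != nil {
		log.Fatal(err)
	}
}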

Documentation

Constants

This section is empty.

Variables

This section is empty.

Functions

This section is empty.

Types

type Factory

type Factory struct {
}

Factory is a kafka trigger factory

func (*Factory) Metadata

func (*Factory) Metadata() *trigger.Metadata

Metadata implements trigger.Factory.Metadata

func (*Factory) New

func (*Factory) New(config *trigger.Config) (trigger.Trigger, error)

New implements trigger.Factory.New
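
The engine instantiates the trigger through this factory; Metadata describes the settings, handler settings and output documented in the Configuration section. A small sketch that only inspects the metadata (New additionally needs a populated *trigger.Config, so it is left out here):

package main

import (
	"fmt"

	"github.com/project-flogo/contrib/trigger/kafka"
)

func main() {
	f := &kafka.Factory{}
	// The returned *trigger.Metadata mirrors the Settings, HandlerSettings
	// and Output structs documented below.
	fmt.Printf("%+v\n", f.Metadata())
}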

type Handler

type Handler struct {
	// contains filtered or unexported fields
}

Handler is a kafka topic handler

func NewKafkaHandler

func NewKafkaHandler(logger log.Logger, handler trigger.Handler, consumer sarama.Consumer) (*Handler, error)

NewKafkaHandler creates a new kafka handler to handle a topic

func (*Handler) Start

func (h *Handler) Start() error

Start starts the handler

func (*Handler) Stop

func (h *Handler) Stop() error

Stop stops the handler

type HandlerSettings

type HandlerSettings struct {
	Topic        string `md:"topic,required"` // The Kafka topic on which to listen for messages
	Partitions   string `md:"partitions"`     // The specific partitions to consume messages from
	Offset       int64  `md:"offset"`         // The offset to use when starting to consume messages, default is set to Newest
	Deserializer string `md:"deserializer"`   // Content deserialization
}
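
The md tags are what tie these fields to the handler settings in the app JSON. A minimal sketch of that decoding, assuming the metadata.MapToStruct helper from project-flogo/core that contrib triggers commonly use (the values are illustrative):

package main

import (
	"fmt"
	"log"

	"github.com/project-flogo/contrib/trigger/kafka"
	"github.com/project-flogo/core/data/metadata"
)

func main() {
	// Handler settings as they would appear in the app JSON.
	raw := map[string]interface{}{
		"topic":      "syslog",
		"partitions": "0,1",     // only consume partitions 0 and 1
		"offset":     int64(-2), // e.g. sarama.OffsetOldest; the newest offset is the default
	}

	s := &kafka.HandlerSettings{}
	// MapToStruct populates the struct fields via their `md` tags.
	if err := metadata.MapToStruct(raw, s, true); err != nil {
		log.Fatal(err)
	}
	fmt.Printf("%+v\n", s)
}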

type KafkaConnection

type KafkaConnection struct {
	// contains filtered or unexported fields
}

func (*KafkaConnection) Connection

func (c *KafkaConnection) Connection() sarama.Consumer

func (*KafkaConnection) Stop

func (c *KafkaConnection) Stop() error

type Output

type Output struct {
	Content interface{} `md:"content"` // The content of the JSON message received
	Message string      `md:"message"` // The message that was consumed
}

func (*Output) FromMap

func (o *Output) FromMap(values map[string]interface{}) error

func (*Output) ToMap

func (o *Output) ToMap() map[string]interface{}
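
FromMap and ToMap convert between the typed Output and the generic map form the engine works with. A small round-trip sketch (the payload is illustrative):

package main

import (
	"fmt"
	"log"

	"github.com/project-flogo/contrib/trigger/kafka"
)

func main() {
	out := &kafka.Output{}

	// Populate the typed output from a generic map, as the engine would
	// after a message is consumed.
	if err := out.FromMap(map[string]interface{}{
		"message": `{"level":"info","msg":"hello"}`,
	}); err != nil {
		log.Fatal(err)
	}

	// And back again: the map form is what the handler passes to the action.
	fmt.Println(out.ToMap()["message"])
}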

type Settings

type Settings struct {
	BrokerUrls string `md:"brokerUrls,required"` // The Kafka cluster to connect to
	User       string `md:"user"`                // If connecting to a SASL enabled port, the user id to use for authentication
	Password   string `md:"password"`            // If connecting to a SASL enabled port, the password to use for authentication
	TrustStore string `md:"trustStore"`          // If connecting to a TLS secured port, the directory containing the certificates representing the trust chain for the connection. This is usually just the CACert used to sign the server's certificate
}

type Trigger

type Trigger struct {
	// contains filtered or unexported fields
}

Trigger is a kafka trigger

func (*Trigger) Initialize

func (t *Trigger) Initialize(ctx trigger.InitContext) error

Initialize initializes the trigger

func (*Trigger) Start

func (t *Trigger) Start() error

Start starts the kafka trigger

func (*Trigger) Stop

func (t *Trigger) Stop() error

Stop implements ext.Trigger.Stop
