kafka

package
v0.0.0-...-ec7ed92 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jan 17, 2024 License: BSD-3-Clause Imports: 12 Imported by: 0

README

Kafka Trigger

This trigger subscribes to a topic on Kafka cluster and listens for the messages.

Flogo CLI
flogo install github.com/project-flogo/messaging-contrib/kafka/trigger

Configuration

Setting :
Name Type Description
connection any The connection object which is use to connect to Kafka - REQUIRED Connection
HandlerSettings:
Name Type Description
topic string The Kafka topic on which to listen for messages
partitions string The specific partitions to consume messages from
offset int64 The offset to use when starting to consume messages
Output:
Name Type Description
message string The message that was consumed

Examples

{
  "triggers": [
    {
      "id": "flogo-kafka",
      "ref": "github.com/project-flogo/contrib/trigger/kafka",
      "settings": {
        "brokerUrls" : "localhost:9092",
        "trustStore" : "" 
      },
      "handlers": [
        {
          "settings": {
            "topic": "syslog",
          },
          "action": {
            "ref": "github.com/project-flogo/flow",
            "settings": {
              "flowURI": "res://flow:my_flow"
            }
          }
        }
      ]
    }
  ]
}

Development

Testing

To run tests first set up the kafka broker using the docker-compose file given below:

version: '2'
  
services:

  zookeeper:
    image: wurstmeister/zookeeper:3.4.6
    expose:
    - "2181"

  kafka:
    image: wurstmeister/kafka:2.11-2.0.0
    depends_on:
    - zookeeper
    ports:
    - "9092:9092"
    environment:
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://localhost:9092
      KAFKA_LISTENERS: PLAINTEXT://0.0.0.0:9092
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181

Then run the following command:

go test 

Documentation

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

This section is empty.

Types

type Factory

type Factory struct {
}

Factory is a kafka trigger factory

func (*Factory) Metadata

func (*Factory) Metadata() *trigger.Metadata

Metadata implements trigger.Factory.Metadata

func (*Factory) New

func (*Factory) New(config *trigger.Config) (trigger.Trigger, error)

New implements trigger.Factory.New

type Handler

type Handler struct {
	// contains filtered or unexported fields
}

Handler is a kafka topic handler

func NewKafkaHandler

func NewKafkaHandler(logger log.Logger, handler trigger.Handler, consumer sarama.Consumer) (*Handler, error)

NewKafkaHandler creates a new kafka handler to handle a topic

func (*Handler) Start

func (h *Handler) Start() error

Start starts the handler

func (*Handler) Stop

func (h *Handler) Stop() error

Stop stops the handler

type HandlerSettings

type HandlerSettings struct {
	Topic      string `md:"topic,required"` // The Kafka topic on which to listen for messageS
	Partitions string `md:"partitions"`     // The specific partitions to consume messages from
	Offset     int64  `md:"offset"`         // The offset to use when starting to consume messages, default is set to Newest
}

type Output

type Output struct {
	Message string `md:"message"` // The message that was consumed
}

func (*Output) FromMap

func (o *Output) FromMap(values map[string]interface{}) error

func (*Output) ToMap

func (o *Output) ToMap() map[string]interface{}

type Settings

type Settings struct {
	Connection connection.Manager `md:"connection,required"`
}

type Trigger

type Trigger struct {
	// contains filtered or unexported fields
}

Trigger is a kafka trigger

func (*Trigger) Initialize

func (t *Trigger) Initialize(ctx trigger.InitContext) error

Initialize initializes the trigger

func (*Trigger) Start

func (t *Trigger) Start() error

Start starts the kafka trigger

func (*Trigger) Stop

func (t *Trigger) Stop() error

Stop implements ext.Trigger.Stop

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL