kafka

package
v0.0.0-...-ec7ed92 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jan 17, 2024 License: BSD-3-Clause Imports: 7 Imported by: 0

README

Kafka Activity

This activity publishes messages on a topic in a Kafka cluster.

Flogo CLI
flogo install github.com/project-flogo/messaging-contrib/kafka/activity

Configuration

Settings:
Name Type Description
connection any The connection object which is use to connect to kafka - REQUIRED Connection
topic string The Kafka topic on which to place the message - REQUIRED
Input:
Name Type Description
message string The message to send
Output:
Name Type Description
partition int32 Documents the partition that the message was placed on
offSet int64 Documents the offset for the message

Examples

The below example sends Hello From Flogo to a Kafka Broker running on localhost:

{
  "id": "publish_kafka_message",
  "name": "Publish Message to Kafka",
  "activity": {
    "ref": "github.com/project-flogo/contrib/activity/kafka",
    "settings": {
      "brokerUrls" : "localhost:9092",
      "topic"      : "syslog"
    },
    "input": {
      "message"    : "Hello From Flogo"
    }
  }
}

Development

Testing

To run tests first set up the Kafka broker using the docker-compose file given below:

version: '2'
  
services:

  zookeeper:
    image: wurstmeister/zookeeper:3.4.6
    expose:
    - "2181"

  kafka:
    image: wurstmeister/kafka:2.11-2.0.0
    depends_on:
    - zookeeper
    ports:
    - "9092:9092"
    environment:
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://localhost:9092
      KAFKA_LISTENERS: PLAINTEXT://0.0.0.0:9092
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181

Then run the following command:

go test 

Documentation

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

func New

New create a new kafka activity

Types

type Activity

type Activity struct {
	// contains filtered or unexported fields
}

Activity is a kafka activity

func (*Activity) Eval

func (act *Activity) Eval(ctx activity.Context) (done bool, err error)

Eval implements the evaluation of the kafka activity

func (*Activity) Metadata

func (*Activity) Metadata() *activity.Metadata

Metadata returns the metadata for the kafka activity

type Input

type Input struct {
	Message string `md:"message,required"` // The message to send
}

func (*Input) FromMap

func (i *Input) FromMap(values map[string]interface{}) error

func (*Input) ToMap

func (i *Input) ToMap() map[string]interface{}

type Output

type Output struct {
	Partition int32 `md:"partition"` // Documents the partition that the message was placed on
	OffSet    int64 `md:"offset"`    // Documents the offset for the message
}

func (*Output) FromMap

func (o *Output) FromMap(values map[string]interface{}) error

func (*Output) ToMap

func (o *Output) ToMap() map[string]interface{}

type Settings

type Settings struct {
	Connection connection.Manager `md:"connection,required"`
	Topic      string             `md:"topic,required"` // The Kafka topic on which to place the message
}

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL