streaming

package module
v0.0.0-...-b9949c2 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Dec 15, 2023 License: Apache-2.0 Imports: 4 Imported by: 0

README

streaming

Go Report Card LICENSE LICENSE

streaming

Streaming is a client library for building stream processors whose input and output data are stored in Kafka clusters.

Introduction

Streaming is a library for writing Kafka stream processors.

Basic Usage

Installation
go get -u github.com/flyaways/streaming
Usage

Streaming Processor

package main

import (
	"log"
	"os"
	"os/signal"

	"github.com/Shopify/sarama"
	cluster "github.com/bsm/sarama-cluster"
	"github.com/flyaways/streaming"
)

// Processor is the example callback passed to streaming.NewStreaming. For a
// message consumed from "input-topic-2" it returns a single producer message
// addressed to the first entry of outTopic, carrying the same key and value.
// Messages from any other topic yield an empty result.
//
// It returns a sarama.ConfigurationError when a message has to be forwarded
// but outTopic is empty, rather than panicking on the outTopic[0] index.
func Processor(msg *sarama.ConsumerMessage, outTopic []string) ([]*sarama.ProducerMessage, error) {
	if msg.Topic != "input-topic-2" {
		// Nothing to forward; keep returning an empty, non-nil slice.
		return []*sarama.ProducerMessage{}, nil
	}
	if len(outTopic) == 0 {
		return nil, sarama.ConfigurationError("no output topic configured")
	}

	return []*sarama.ProducerMessage{{
		Topic: outTopic[0],
		Key:   sarama.ByteEncoder(msg.Key),
		Value: sarama.ByteEncoder(msg.Value),
	}}, nil
}

// main calls streaming.NewStreaming with a local broker, two input topics, two
// output topics, a consumer group ID, a default cluster config and Processor
// as the callback. It panics via log.Panic if that call returns an error, and
// otherwise blocks until the process receives os.Interrupt.
func main() {
	brokers := []string{"127.0.0.1:9092"}
	inputs := []string{"input-topic1", "input-topic-2"}
	outputs := []string{"output-topic1", "output-topic"}

	err := streaming.NewStreaming(brokers, inputs, outputs, "flyaways-streaming-group", cluster.NewConfig(), Processor)
	if err != nil {
		log.Panic(err)
	}

	// Park the main goroutine until an interrupt arrives.
	interrupt := make(chan os.Signal, 1)
	signal.Notify(interrupt, os.Interrupt)
	<-interrupt
}

Credits

Licenses

https://www.apache.org/licenses/LICENSE-2.0

FOSSA Status

Documentation

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

func NewStreaming

func NewStreaming(seedbrokers, inTopic, outTopic []string, groupID string, config *cluster.Config, cb CallBack) (err error)

NewStreaming New Streaming

func NewWithConfig

func NewWithConfig(s *Streaming) (err error)

NewWithConfig New With Config

Types

type CallBack

// CallBack is the function type accepted by NewStreaming (parameter cb) and
// stored in Streaming.CallBack. It takes a consumed message and a slice of
// strings (named outTopic in the README example) and returns a slice of
// producer messages together with an error.
// NOTE(review): presumably the library publishes the returned messages and
// passes its configured output topics as the second argument — confirm.
type CallBack func(*sarama.ConsumerMessage, []string) ([]*sarama.ProducerMessage, error)

CallBack Call Back

type Streaming

// Streaming is the configuration value taken by NewWithConfig. Most exported
// fields share names and types with the parameters of NewStreaming.
type Streaming struct {
	// SeedBrokers corresponds to NewStreaming's seedbrokers argument
	// (the README example passes "127.0.0.1:9092").
	SeedBrokers []string
	// Config corresponds to NewStreaming's config argument.
	Config      *cluster.Config
	// InTopic corresponds to NewStreaming's inTopic argument.
	// NOTE(review): presumably the topics to consume from — confirm.
	InTopic     []string
	// OutTopic corresponds to NewStreaming's outTopic argument.
	// NOTE(review): presumably forwarded to CallBack as its second argument — confirm.
	OutTopic    []string
	// GroupID corresponds to NewStreaming's groupID argument.
	GroupID     string
	// CallBack corresponds to NewStreaming's cb argument.
	CallBack    CallBack
	// Logger has no counterpart in NewStreaming's parameter list.
	// NOTE(review): behavior when left nil is not shown here — confirm.
	Logger      sarama.StdLogger
	// contains filtered or unexported fields
}

Streaming config

Directories

Path Synopsis

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL