inputkafka

package
Version: v0.0.0-...-c13075e
Published: Apr 20, 2024 License: MIT Imports: 13 Imported by: 0

README

gogstash input kafka

Synopsis

input:
  # type must be "kafka"
  - type: kafka

    # Kafka version (required)
    version: 0.10.2.0

    # Kafka brokers as host:port (required)
    brokers:
      - 127.0.0.1:9092

    # topics for the Kafka client to consume (required)
    topics:
      - testTopic

    # consumer group (required)
    group: log_center

    # start consuming from the oldest available offset
    offset_oldest: true

    # consumer group partition assignment strategy (range, roundrobin)
    assignor: roundrobin

    # use SASL authentication (optional)
    security_protocol: SASL
    sasl_username: your-username
    sasl_password: your-password
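
The comments in the synopsis mark version, brokers, topics and group as required and list two assignor values. A minimal sketch of a pre-flight check along those lines follows. The `synopsisOptions` struct and `validate` function are hypothetical names for illustration, not part of this package, and rejecting an empty assignor is an assumption of the sketch rather than documented behavior.

```go
package main

import (
	"errors"
	"fmt"
)

// synopsisOptions is a hypothetical struct holding only the keys that the
// synopsis marks as required, plus the assignor.
type synopsisOptions struct {
	Version  string
	Brokers  []string
	Topics   []string
	Group    string
	Assignor string
}

// validate reports the first missing required key or an unknown assignor.
// Rejecting an empty assignor is an assumption of this sketch.
func validate(o synopsisOptions) error {
	switch {
	case o.Version == "":
		return errors.New("kafka input: version is required")
	case len(o.Brokers) == 0:
		return errors.New("kafka input: at least one broker is required")
	case len(o.Topics) == 0:
		return errors.New("kafka input: at least one topic is required")
	case o.Group == "":
		return errors.New("kafka input: group is required")
	}
	switch o.Assignor {
	case "range", "roundrobin":
		return nil
	default:
		return fmt.Errorf("kafka input: unknown assignor %q", o.Assignor)
	}
}

func main() {
	opts := synopsisOptions{
		Version:  "0.10.2.0",
		Brokers:  []string{"127.0.0.1:9092"},
		Topics:   []string{"testTopic"},
		Group:    "log_center",
		Assignor: "roundrobin",
	}
	if err := validate(opts); err != nil {
		fmt.Println("invalid:", err)
		return
	}
	fmt.Println("config looks complete")
}
```

Running a check like this before starting gogstash can surface a missing key early; the plugin's own validation may differ.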

Documentation

Constants

const ModuleName = "kafka"

ModuleName is the name used in the config file

Functions

func InitHandler

func InitHandler(
	ctx context.Context,
	raw config.ConfigRaw,
	control config.Control,
) (config.TypeInputConfig, error)

InitHandler initializes the input plugin

Types

type InputConfig

type InputConfig struct {
	config.InputConfig
	Version          string   `json:"version"`                     // Kafka cluster version, e.g. 0.10.2.0
	Brokers          []string `json:"brokers"`                     // Kafka bootstrap brokers to connect to, as a list of host:port entries
	Topics           []string `json:"topics"`                      // Kafka topics to be consumed, as a list
	Group            string   `json:"group"`                       // Kafka consumer group name
	OffsetOldest     bool     `json:"offset_oldest"`               // start consuming from the oldest available offset
	Assignor         string   `json:"assignor"`                    // consumer group partition assignment strategy (range, roundrobin)
	SecurityProtocol string   `json:"security_protocol,omitempty"` // set to use SASL authentication
	SaslMechanism    string   `json:"sasl_mechanism,omitempty"`    // SASL mechanism to use
	User             string   `json:"sasl_username,omitempty"`     // SASL authentication username
	Password         string   `json:"sasl_password,omitempty"`     // SASL authentication password
	// contains filtered or unexported fields
}

InputConfig holds the configuration JSON fields and internal objects
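
The struct tags above tie each config key to a field. A minimal sketch of that mapping follows, using only the standard library. `kafkaOptions` is a hypothetical mirror of the exported fields (it is not this package's type), and gogstash's own config loader is not shown here, so `encoding/json` stands in for it as an assumption.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// kafkaOptions is a hypothetical mirror of the exported InputConfig fields.
// It copies the json tags from the documentation and nothing else.
type kafkaOptions struct {
	Version          string   `json:"version"`
	Brokers          []string `json:"brokers"`
	Topics           []string `json:"topics"`
	Group            string   `json:"group"`
	OffsetOldest     bool     `json:"offset_oldest"`
	Assignor         string   `json:"assignor"`
	SecurityProtocol string   `json:"security_protocol,omitempty"`
	SaslMechanism    string   `json:"sasl_mechanism,omitempty"`
	User             string   `json:"sasl_username,omitempty"`
	Password         string   `json:"sasl_password,omitempty"`
}

// parseOptions decodes a JSON document into kafkaOptions.
// Keys without a matching tag (such as "type") are ignored by encoding/json.
func parseOptions(data []byte) (kafkaOptions, error) {
	var opts kafkaOptions
	err := json.Unmarshal(data, &opts)
	return opts, err
}

func main() {
	raw := []byte(`{
		"type": "kafka",
		"version": "0.10.2.0",
		"brokers": ["127.0.0.1:9092"],
		"topics": ["testTopic"],
		"group": "log_center",
		"offset_oldest": true,
		"assignor": "roundrobin"
	}`)
	opts, err := parseOptions(raw)
	if err != nil {
		fmt.Println("decode error:", err)
		return
	}
	fmt.Printf("%+v\n", opts)
}
```

Note that brokers and topics decode from lists, not from a single comma-separated string.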

func DefaultInputConfig

func DefaultInputConfig() InputConfig

DefaultInputConfig returns an InputConfig struct with default values

func (*InputConfig) Start

func (t *InputConfig) Start(ctx context.Context, msgChan chan<- logevent.LogEvent) (err error)

Start wraps the actual function starting the plugin
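
The signature of Start takes a context and a send-only channel of events. A minimal sketch of that shape follows, assuming a stand-in `event` type in place of `logevent.LogEvent` and a hypothetical `start` function that emits fixed messages; it does not reproduce what the plugin does internally, only how a producer with this signature can honor cancellation while sending.

```go
package main

import (
	"context"
	"fmt"
)

// event is a hypothetical stand-in for logevent.LogEvent.
type event struct {
	Message string
}

// start mimics the documented signature shape: it sends one event per
// message and returns early with ctx.Err() if the context is cancelled.
func start(ctx context.Context, messages []string, msgChan chan<- event) error {
	for _, m := range messages {
		select {
		case <-ctx.Done():
			return ctx.Err()
		case msgChan <- event{Message: m}:
		}
	}
	return nil
}

// collect runs start in a goroutine and gathers everything it sends.
func collect(messages []string) ([]string, error) {
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	ch := make(chan event)
	errc := make(chan error, 1) // buffered so the goroutine never blocks on it
	go func() {
		errc <- start(ctx, messages, ch)
		close(ch)
	}()

	var got []string
	for ev := range ch {
		got = append(got, ev.Message)
	}
	return got, <-errc
}

func main() {
	got, err := collect([]string{"first", "second"})
	fmt.Println(got, err)
}
```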

type SCRAMClient

type SCRAMClient struct {
	*scram.Client
	*scram.ClientConversation
	scram.HashGeneratorFcn
}

func (*SCRAMClient) Begin

func (x *SCRAMClient) Begin(userName, password, authzID string) (err error)

func (*SCRAMClient) Done

func (x *SCRAMClient) Done() bool

func (*SCRAMClient) Step

func (x *SCRAMClient) Step(challenge string) (response string, err error)
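
The three methods form a conversation: Begin once, then Step repeatedly until Done reports true. A minimal sketch of a caller driving that loop follows. The `scramConversation` interface, the `stubClient` type and the `exchange` callback are hypothetical names for illustration. The stub performs no real SCRAM and simply finishes after three steps, and how this package wires SCRAMClient into its Kafka client is not shown here.

```go
package main

import (
	"errors"
	"fmt"
)

// scramConversation lists the three methods documented on SCRAMClient.
type scramConversation interface {
	Begin(userName, password, authzID string) error
	Step(challenge string) (string, error)
	Done() bool
}

// stubClient is a stand-in that finishes after three steps.
// It performs no real SCRAM computation.
type stubClient struct {
	started bool
	steps   int
}

func (s *stubClient) Begin(userName, password, authzID string) error {
	if userName == "" {
		return errors.New("empty user name")
	}
	s.started = true
	s.steps = 0
	return nil
}

func (s *stubClient) Step(challenge string) (string, error) {
	if !s.started {
		return "", errors.New("Step called before Begin")
	}
	s.steps++
	return fmt.Sprintf("response-%d", s.steps), nil
}

func (s *stubClient) Done() bool { return s.steps >= 3 }

// runConversation calls Begin, then alternates between sending the client's
// message through exchange and feeding the reply back into Step until Done.
// It returns the number of round trips made through exchange.
func runConversation(c scramConversation, user, pass string, exchange func(string) (string, error)) (int, error) {
	if err := c.Begin(user, pass, ""); err != nil {
		return 0, err
	}
	msg, err := c.Step("")
	if err != nil {
		return 0, err
	}
	trips := 0
	for !c.Done() {
		challenge, err := exchange(msg)
		if err != nil {
			return trips, err
		}
		trips++
		if msg, err = c.Step(challenge); err != nil {
			return trips, err
		}
	}
	return trips, nil
}

func main() {
	// echo is a fake broker that answers every message with a fixed prefix.
	echo := func(msg string) (string, error) { return "challenge-for-" + msg, nil }
	trips, err := runConversation(&stubClient{}, "user", "secret", echo)
	fmt.Println(trips, err)
}
```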
