topkapi

package module
v0.0.0-...-47e7506
Published: May 2, 2024 License: Apache-2.0 Imports: 20 Imported by: 0

README

= Topkapi - Kafka Topic API

== Links
* https://www.cloudkarafka.com/[Cloudkarafka - Managed Apache Kafka clusters]
* https://github.com/CloudKarafka/go-kafka-example[Apache Kafka Producer/Consumer example in Go] and https://www.cloudkarafka.com/docs/go.html[confluent_kafka_go recommendation]
* https://www.cloudkarafka.com/docs/kafkacat.html[kafkacat is a generic non-JVM producer and consumer for Apache Kafka ...]
* Example for https://www.cloudkarafka.com/docs/spring.html[Spring Boot Integration]

== Kafkacat
.kafkacat list brokers
[source,shell script]
----
kafkacat -b $KAFKA_BROKERS -X security.protocol=SASL_SSL -X sasl.mechanism=SCRAM-SHA-256 -X sasl.username=$KAFKA_SASL_USERNAME -X sasl.password=$KAFKA_SASL_PASSWORD -L
----

Optional debug args: `-v -X debug=generic,broker,security`

.kafkacat produce
[source,shell script]
----
echo "Hello Kafkakatz from AmazonLinux" | kafkacat -b $KAFKA_BROKERS -X security.protocol=SASL_SSL -X sasl.mechanism=SCRAM-SHA-256 -X sasl.username=$KAFKA_SASL_USERNAME -X sasl.password=$KAFKA_SASL_PASSWORD -P -t ${KAFKA_SASL_USERNAME}-hase
----

.kafkacat consume
[source,shell script]
----
kafkacat -b $KAFKA_BROKERS -X security.protocol=SASL_SSL -X sasl.mechanism=SCRAM-SHA-256 -X sasl.username=$KAFKA_SASL_USERNAME -X sasl.password=$KAFKA_SASL_PASSWORD -C -t ${KAFKA_SASL_USERNAME}-hase
----

== Kowl
.run Kowl https://github.com/cloudhut/kowl
[source,shell script]
----
docker run --rm -p 8080:8080 -e KAFKA_BROKERS=$KAFKA_BROKERS -e KAFKA_TLS_ENABLED=true -e KAFKA_SASL_ENABLED=true -e KAFKA_SASL_MECHANISM=SCRAM-SHA-256 -e KAFKA_SASL_USERNAME=$KAFKA_SASL_USERNAME -e KAFKA_SASL_PASSWORD=$KAFKA_SASL_PASSWORD quay.io/cloudhut/kowl:master
----

== Tips

TIP: Salted Challenge Response Authentication Mechanism (SCRAM), or SASL/SCRAM, is a family of SASL mechanisms that addresses the security concerns of traditional mechanisms that perform username/password authentication, such as PLAIN.

* https://medium.com/swlh/a-free-apache-kafka-cloud-service-and-how-to-quickly-get-started-with-it-8f14520fff35[A Free Apache Kafka Cloud Service — and how to quickly get started with it]

* https://github.com/Shopify/sarama/blob/master/tools/kafka-console-producer/kafka-console-producer.go[Sarama Code for Console Producer]

* https://github.com/Shopify/sarama/issues/994[Sample code for header creation in sync producer]

* https://documentation.freshpaint.io/frequently-asked-questions/what-should-i-name-my-events[What Should I Name My Events?]
+
[quote]
____
Use a standard naming pattern for creating events. We recommend using the Noun + Verb + Object structure, as used in the following example: `Checkout: Submit Order`. Choose a standard tense (either past or present) and regular capitalization pattern.
____
* https://cloudevents.io/[CloudEvents - A specification for describing event data in a common way]
+
----
{
    "specversion": "1.0",
    "id": "",
    "source": "example/uri",
    "type": "example.type",
    "datacontenttype": "application/json",
    "data": {
        "action": "auth:test",
        "message": "Let's have some Huggy Coffee coffee"
    }
}
----

Documentation

Index

Constants

const EventVersion = "1.0"

EventVersion is used in Kafka Record header

Variables

Functions

This section is empty.

Types

type Client

type Client struct {
	// Public
	Config *ClientConfig
	// contains filtered or unexported fields
}

func NewClient

func NewClient() *Client

NewClient creates a new client with automatic configuration based on envconfig and argv[0] as default clientId
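
A minimal usage sketch (the import path is a placeholder; configuration is read from environment variables, see ClientConfig below):

package main

import "example.com/topkapi" // placeholder import path, replace with this module's real path

func main() {
	client := topkapi.NewClient() // clientId defaults to argv[0], config comes from the environment
	defer client.Close()

	client.Verbose(false) // sarama debug logging, off by default
	client.Enable(true)   // communication on, the default; set false to only log
}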

func NewClientFromConfig

func NewClientFromConfig(config *ClientConfig) *Client

NewClientFromConfig creates a new client (note that producers will be initialized on demand, so no errors are expected)
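
A sketch of explicit configuration via a ClientConfig literal. Note that the struct-tag defaults listed below only apply when the config is populated by envconfig, so a literal should set every field it relies on:

cfg := &topkapi.ClientConfig{
	Brokers:       "broker1:9094,broker2:9094", // comma separated
	SaslEnabled:   true,
	SaslMechanism: "SCRAM-SHA-256",
	SaslUsername:  "myuser",
	SaslPassword:  "secret",
	TlsEnabled:    true,
	Enabled:       true,
	OffsetMode:    "newest",
	KafkaVersion:  "2.6.0",
	ClientId:      "my-service",
}
client := topkapi.NewClientFromConfig(cfg)
defer client.Close()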

func NewClientWithId

func NewClientWithId(clientId string) *Client

NewClientWithId creates a new client with the given clientId and default config

func (*Client) Close

func (c *Client) Close()

Close closes the client; if the syncProducer has been initialized, it is closed as well

func (*Client) Consume

func (c *Client) Consume(messageHandler MessageHandler, topicsWithoutPrefix ...string) error

Consume is a blocking function that reads messages from the given topics. TODO: add a consumeOnce function or flag to prevent the infinite loop
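
A consumer sketch, assuming a client as above and log plus sarama imports (whether the sarama import path is github.com/Shopify/sarama or github.com/IBM/sarama depends on the module version in use); the topic name is illustrative:

// consumeExample blocks until ConsumerTimeout elapses (default: forever).
func consumeExample(client *topkapi.Client) error {
	handler := func(msg *sarama.ConsumerMessage) {
		log.Printf("topic=%s partition=%d offset=%d value=%s",
			msg.Topic, msg.Partition, msg.Offset, string(msg.Value))
	}
	// Topic names are passed without the configured TopicPrefix.
	return client.Consume(handler, "hase")
}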

func (*Client) Enable

func (c *Client) Enable(enabled bool)

Enable enables or disables all communication (when disabled, functions can still be called but will only log). Default value is true

func (*Client) NewEvent

func (c *Client) NewEvent(action string, message string) *Event

NewEvent initializes a new Event with reasonable defaults

func (*Client) PublishEvent

func (c *Client) PublishEvent(event *Event, topic string) (int32, int64, error)

PublishEvent expects an Event struct, which it serializes as JSON before pushing it to the topic
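
A sketch combining NewEvent and PublishEvent; the topic name and payload strings are illustrative (taken from the CloudEvents example above):

event := client.NewEvent("auth:test", "Let's have some Huggy Coffee coffee")
event.EntityId = "4711" // optional
partition, offset, err := client.PublishEvent(event, "events")
if err != nil {
	log.Fatalf("publish failed: %v", err)
}
log.Printf("event stored in partition %d at offset %d", partition, offset)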

func (*Client) PublishMessage

func (c *Client) PublishMessage(message []byte, topic string) (int32, int64, error)

PublishMessage expects a byte message which it will push to the topic. This is the actual handler to which other publish functions such as PublishEvent delegate. See also https://github.com/IBM/sarama/blob/master/tools/kafka-console-producer/kafka-console-producer.go
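
The same sketch for a raw payload (topic name illustrative):

if _, _, err := client.PublishMessage([]byte("Hello Kafkakatz"), "hase"); err != nil {
	log.Printf("publish failed: %v", err)
}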

func (*Client) Usage

func (c *Client) Usage()

Usage prints usage information to STDOUT, delegating to envconfig

func (*Client) Verbose

func (c *Client) Verbose(enabled bool)

Verbose configures verbose logging for sarama functions. Default value is false

type ClientConfig

type ClientConfig struct {
	Brokers         string        `required:"true" desc:"Comma separated List of brokers" split_words:"true"`
	SaslUsername    string        `required:"true" desc:"User for SASL Auth" split_words:"true"`
	SaslPassword    string        `required:"true" desc:"Password for SASL Auth" split_words:"true"`
	SaslMechanism   string        `default:"SCRAM-SHA-256" required:"true" desc:"SASL Mechanism" split_words:"true"`
	TlsEnabled      bool          `default:"true" desc:"TLS Encryption active" split_words:"true"`
	SaslEnabled     bool          `default:"true" desc:"Use SASL Authentication" split_words:"true"`
	TopicPrefix     string        `default:"" desc:"Optional prefix, prepended to topic name and empty by default" split_words:"true"`
	Enabled         bool          `default:"true" desc:"Communication Enabled" split_words:"true"`
	Verbose         bool          `default:"false" desc:"Verbose Logging" split_words:"true"`
	ClientId        string        `default:"" desc:"ClientId, will be also used as default source" split_words:"true"`
	OffsetMode      string        `default:"newest" desc:"Default offset for consumer, values: newest or oldest" split_words:"true"`
	KafkaVersion    string        `default:"2.6.0" desc:"Version of Kafka, important for initiating consumer group"`
	ConsumerTimeout time.Duration `desc:"Duration how long the consumer is looping (default: forever)"`
}

ClientConfig is the client configuration, derived from envconfig
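
The struct tags translate into environment variables via envconfig's split_words option. Assuming the prefix KAFKA, consistent with the shell and Kowl examples above (call Usage() for the authoritative list), the mapping looks roughly like this sketch:

// Assumed mapping (prefix "KAFKA", split_words):
//   Brokers       -> KAFKA_BROKERS        (required, comma separated)
//   SaslUsername  -> KAFKA_SASL_USERNAME  (required)
//   SaslPassword  -> KAFKA_SASL_PASSWORD  (required)
//   SaslMechanism -> KAFKA_SASL_MECHANISM (default SCRAM-SHA-256)
//   TlsEnabled    -> KAFKA_TLS_ENABLED    (default true)
//   TopicPrefix   -> KAFKA_TOPIC_PREFIX   (default "")
//   OffsetMode    -> KAFKA_OFFSET_MODE    (default newest)
os.Setenv("KAFKA_BROKERS", "broker1:9094,broker2:9094")
os.Setenv("KAFKA_SASL_USERNAME", "myuser")
os.Setenv("KAFKA_SASL_PASSWORD", "secret")
client := topkapi.NewClient()
defer client.Close()
client.Usage() // prints the actual variable names and defaults via envconfig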

func NewConfig

func NewConfig() *ClientConfig

type Event

type Event struct {
	Action  string `json:"action,omitempty"`
	Message string `json:"message,omitempty"`
	// Time.MarshalJSON returns
	// "The time is a quoted string in RFC 3339 format, with sub-second precision added if present."
	Time     time.Time `json:"time,omitempty"`
	Source   string    `json:"source,omitempty"`
	EntityId string    `json:"entityId,omitempty"`
}
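
PublishEvent serializes this struct as JSON; a quick sketch of the payload shape (field values and the time sample are illustrative, the time is formatted per RFC 3339 as noted above):

e := topkapi.Event{
	Action:   "auth:test",
	Message:  "Let's have some Huggy Coffee coffee",
	Time:     time.Now(),
	Source:   "example/uri",
	EntityId: "4711",
}
b, _ := json.Marshal(e) // error omitted in this sketch
fmt.Println(string(b))
// {"action":"auth:test","message":"Let's have some Huggy Coffee coffee","time":"2024-05-02T10:15:04.123Z","source":"example/uri","entityId":"4711"}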

type MessageHandler

type MessageHandler func(message *sarama.ConsumerMessage)

MessageHandler is our custom handler attached to the SaramaConsumer; it is supposed to be passed in by the caller

type SaramaConsumer

type SaramaConsumer struct {
	// contains filtered or unexported fields
}

SaramaConsumer represents a Sarama consumer group consumer

func (*SaramaConsumer) Cleanup

func (consumer *SaramaConsumer) Cleanup(sarama.ConsumerGroupSession) error

Cleanup is run at the end of a session, once all ConsumeClaim goroutines have exited

func (*SaramaConsumer) ConsumeClaim

func (consumer *SaramaConsumer) ConsumeClaim(session sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error

ConsumeClaim must start a consumer loop of ConsumerGroupClaim's Messages().

func (*SaramaConsumer) Setup

func (consumer *SaramaConsumer) Setup(sarama.ConsumerGroupSession) error

Setup is run at the beginning of a new session, before ConsumeClaim

type XDGSCRAMClient

type XDGSCRAMClient struct {
	*scram.Client
	*scram.ClientConversation
	scram.HashGeneratorFcn
}

func (*XDGSCRAMClient) Begin

func (x *XDGSCRAMClient) Begin(userName, password, authzID string) (err error)

Begin calls HashGeneratorFcn.NewClient, which constructs a SCRAM client component based on the given hash.Hash factory
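
XDGSCRAMClient satisfies sarama's SCRAMClient interface. A sketch of wiring it into a plain sarama config, which is presumably what the topkapi client does internally; it assumes the github.com/xdg-go/scram package for HashGeneratorFcn plus crypto/sha256, os, sarama and this package as imports:

conf := sarama.NewConfig()
conf.Net.TLS.Enable = true
conf.Net.SASL.Enable = true
conf.Net.SASL.User = os.Getenv("KAFKA_SASL_USERNAME")
conf.Net.SASL.Password = os.Getenv("KAFKA_SASL_PASSWORD")
conf.Net.SASL.Mechanism = sarama.SASLTypeSCRAMSHA256
conf.Net.SASL.SCRAMClientGeneratorFunc = func() sarama.SCRAMClient {
	// SHA-256 based SCRAM; use sha512.New and SASLTypeSCRAMSHA512 for SCRAM-SHA-512
	return &topkapi.XDGSCRAMClient{HashGeneratorFcn: scram.HashGeneratorFcn(sha256.New)}
}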

func (*XDGSCRAMClient) Done

func (x *XDGSCRAMClient) Done() bool

func (*XDGSCRAMClient) Step

func (x *XDGSCRAMClient) Step(challenge string) (response string, err error)

type ZerologSaramaLogger

type ZerologSaramaLogger struct {
	// contains filtered or unexported fields
}

ZerologSaramaLogger is a wrapper that satisfies the sarama.StdLogger interface with a zerolog.Logger instance

func (ZerologSaramaLogger) Print

func (z ZerologSaramaLogger) Print(v ...interface{})

func (ZerologSaramaLogger) Printf

func (z ZerologSaramaLogger) Printf(format string, v ...interface{})

Printf is the most commonly used function

func (ZerologSaramaLogger) Println

func (z ZerologSaramaLogger) Println(v ...interface{})

Directories

Path Synopsis
cmd
