golibmsg

package module
v1.0.0 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Oct 2, 2023 License: MIT Imports: 10 Imported by: 0

README

Golib Message Bus

Note: We are moving away from GitLab. All packages have been migrated to github.com/golibs-starter/*. Please consider updating.

Message bus solutions for Go projects. Kafka is now supported.

Setup instruction

For the base setup, see the GoLib Instruction.

Both go get and go mod are supported.

go get github.com/golibs-starter/golib-message-bus
Usage

Use fx.Option to include the dependencies for injection.

package main

import (
	"github.com/golibs-starter/golib-message-bus"
	"github.com/golibs-starter/golib-message-bus/kafka/core"
	"github.com/golibs-starter/golib-message-bus/testutil"
	"go.uber.org/fx"
)

// main assembles the message-bus fx options into a single list,
// builds the fx application from them and runs it.
func main() {
	options := []fx.Option{
		// Required.
		golibmsg.KafkaCommonOpt(),

		// Include when topics should be created if they do not exist yet.
		golibmsg.KafkaAdminOpt(),

		// Include when the application produces messages to Kafka.
		golibmsg.KafkaProducerOpt(),

		// Include when the application consumes messages from Kafka.
		golibmsg.KafkaConsumerOpt(),

		// Include when all consumers must be ready before the application starts.
		// Helpful in integration tests, see the example here:
		// https://github.com/golibs-starter/golib-sample/blob/develop/src/worker/testing/handler/send_order_to_delivery_provider_test.go
		golibmsg.KafkaConsumerReadyWaitOpt(),

		// Include to enable graceful shutdown.
		// OnStop hooks will run in reverse order.
		golibmsg.OnStopProducerOpt(),
		golibmsg.OnStopConsumerOpt(),

		// Registers a consumer.
		// The consumer has to implement core.ConsumerHandler.
		golibmsg.ProvideConsumer(NewCustomConsumer),

		// ==================== TEST UTILS =================
		// Useful in tests when the Kafka consumer group should be
		// reset (removed) on every test run.
		// Eg: https://github.com/golibs-starter/golib-sample/-/tree/develop/src/worker/testing/test_suite.go
		golibmsgTestUtil.ResetKafkaConsumerGroupOpt(),

		// Useful when messages should be collected during a test.
		// This option has to be accompanied by some configuration:
		// app.kafka.consumer:
		//      handlerMappings:
		//         MessageCollectorHandler:
		//             topics: # List of topics that messages will be collected.
		//                 - c1.http.request-completed.test
		//                 - c1.order.order-created.test
		//             groupId: c1.MessageCollectorHandler.test
		//             enable: true
		// Eg: https://github.com/golibs-starter/golib-sample/-/tree/develop/src/worker/testing/test_suite.go
		golibmsgTestUtil.MessageCollectorOpt(),
	}

	fx.New(options...).Run()
}

// CustomConsumer is a sample implementation of core.ConsumerHandler.
// It carries no state of its own.
type CustomConsumer struct{}

// NewCustomConsumer allocates a CustomConsumer and returns a pointer to it
// as a core.ConsumerHandler.
func NewCustomConsumer() core.ConsumerHandler {
	handler := new(CustomConsumer)
	return handler
}

// HandlerFunc receives a consumed message. It is left empty in this example;
// message processing logic belongs here.
func (c CustomConsumer) HandlerFunc(message *core.ConsumerMessage) {
	// Runs when a message arrives.
}

// Close is the shutdown callback of the consumer. It is left empty in this
// example; resource cleanup belongs here.
func (c CustomConsumer) Close() {
	// Runs when the application stops.
}

Configuration
app:
    kafka:
        # Configuration for KafkaCommonOpt()
        # This is the global configuration; admin, producer and consumer fall back to it for any field they do not configure themselves.
        # ========================================
        # ======== START GLOBAL CONFIG ===========
        # ========================================
        # Kafka brokers to connect to.
        # Separate with commas. By default, localhost:9092 is used.
        bootstrapServers: kafka1:9092,kafka2:9092

        # Whether to use TLS when connecting to the broker.
        # By default, unsecured connection is used (leave empty).
        securityProtocol: TLS

        # A user-provided string sent with every request to the brokers for logging, debugging, and auditing purposes.
        clientId: golib

        # TLS configuration when securityProtocol=TLS
        tls:
            # File containing the public key (certificate) of the public/private key pair.
            # The file must contain PEM encoded data.
            certFileLocation: "config/certs/test.dev-cert.pem"

            # File containing the private key of the public/private key pair.
            # The file must contain PEM encoded data.
            keyFileLocation: "config/certs/test.dev-key.pem"

            # File containing the root certificate authorities
            # that clients use when verifying server certificates.
            caFileLocation: "config/certs/test.dev-ca.pem"

            # Controls whether a client verifies
            # the server's certificate chain and host name.
            insecureSkipVerify: false
        # ========================================
        # ========= END GLOBAL CONFIG ============
        # ========================================

        # Configuration for KafkaAdminOpt()
        # Fields that exist in the global config
        # can be overridden as shown below.
        admin:
            bootstrapServers: kafka1:9092,kafka2:9092
            securityProtocol: TLS
            clientId: golib
            tls:
                certFileLocation: "config/certs/test.dev-cert.pem"
                keyFileLocation: "config/certs/test.dev-key.pem"
                caFileLocation: "config/certs/test.dev-ca.pem"
                insecureSkipVerify: false
            topics:
                -   name: c1.http-request # Topic name when auto create topics is enabled
                    partitions: 1 # The number of partitions when topic is created. Default: 1.
                    replicaFactor: 1 # The number of copies of a topic in a Kafka cluster. Default: 1
                    retention: 72h # The period of time the topic will retain old log segments before deleting or compacting them. Default 72h.
                -   name: c1.order.order-created
                    partitions: 1
                    replicaFactor: 1
                    retention: 72h

        # Configuration for KafkaProducerOpt()
        # Fields that exist in the global config
        # can be overridden as shown below.
        producer:
            bootstrapServers: kafka1:9092,kafka2:9092
            securityProtocol: TLS
            clientId: golib
            tls:
                certFileLocation: "config/certs/test.dev-cert.pem"
                keyFileLocation: "config/certs/test.dev-key.pem"
                caFileLocation: "config/certs/test.dev-ca.pem"
                insecureSkipVerify: false
            flushMessages: 1
            flushFrequency: 1s
            eventMappings:
                RequestCompletedEvent:
                    topicName: c1.http-request # Defines the topic that event will be sent to.
                    transactional: false # Enable/disable transactional when sending event message.
                    disable: false # Enable/disable send event message
                OrderCreatedEvent:
                    topicName: c1.order.order-created
                    transactional: false
                    disable: true

        # Configuration for KafkaConsumerOpt()
        # Fields that exist in the global config
        # can be overridden as shown below.
        consumer:
            bootstrapServers: kafka1:9092,kafka2:9092
            securityProtocol: TLS
            clientId: golib
            tls:
                certFileLocation: "config/certs/test.dev-cert.pem"
                keyFileLocation: "config/certs/test.dev-key.pem"
                caFileLocation: "config/certs/test.dev-ca.pem"
                insecureSkipVerify: false
            handlerMappings:
                PushRequestCompletedToElasticSearchHandler: # Must be equal to the struct name of the consumer
                    topic: c1.http-request # The topic consumed by the consumer
                    groupId: c1.http-request.PushRequestCompletedEsHandler.local # The consumer group used by the consumer
                    enable: true # Enable/disable consumer
                PushOrderToElasticSearchHandler:
                    topic: c1.order.order-created
                    groupId: c1.order.order-created.PushRequestCompletedEsHandler.local
                    enable: true
                MessageCollectorHandler:
                    topics: # When you want to consume multiple topics
                        - c1.http.request-completed.test
                        - c1.order.order-created.test
                    groupId: c1.MessageCollectorHandler.test
                    enable: true

Documentation

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

func KafkaAdminOpt

func KafkaAdminOpt() fx.Option

func KafkaCommonOpt

func KafkaCommonOpt() fx.Option

func KafkaConsumerOpt

func KafkaConsumerOpt() fx.Option

func KafkaConsumerReadyWaitOpt

func KafkaConsumerReadyWaitOpt() fx.Option

func KafkaProducerOpt

func KafkaProducerOpt() fx.Option

func NewSaramaConsumers

func NewSaramaConsumers(in KafkaConsumersIn) (core.Consumer, error)

func OnStartConsumerHook

func OnStartConsumerHook(lc fx.Lifecycle, consumer core.Consumer, golibCtx context.Context)

func OnStopConsumerHook

func OnStopConsumerHook(in OnStopConsumerIn)

func OnStopConsumerOpt

func OnStopConsumerOpt() fx.Option

func OnStopProducerHook

func OnStopProducerHook(in OnStopProducerIn)

func OnStopProducerOpt

func OnStopProducerOpt() fx.Option

func ProvideConsumer

func ProvideConsumer(handler interface{}) fx.Option

Types

type KafkaConsumersIn

// KafkaConsumersIn is the parameter struct taken by NewSaramaConsumers.
// It embeds fx.In, so its fields are populated by fx dependency injection.
type KafkaConsumersIn struct {
	fx.In
	GlobalProps   *properties.Client
	ConsumerProps *properties.KafkaConsumer
	SaramaMapper  *impl.SaramaMapper
	// Handlers is filled from the fx value group "kafka_consumer_handler".
	// NOTE(review): presumably the group that ProvideConsumer feeds — confirm.
	Handlers      []core.ConsumerHandler `group:"kafka_consumer_handler"`
}

type OnStopConsumerIn

// OnStopConsumerIn is the parameter struct taken by OnStopConsumerHook.
// It embeds fx.In, so its fields are populated by fx dependency injection.
type OnStopConsumerIn struct {
	fx.In
	Lc            fx.Lifecycle
	// ConsumerGroup is tagged optional: fx leaves it at its zero value (nil)
	// when no core.Consumer has been provided.
	ConsumerGroup core.Consumer `optional:"true"`
}

type OnStopProducerIn

// OnStopProducerIn is the parameter struct taken by OnStopProducerHook.
// It embeds fx.In, so its fields are populated by fx dependency injection.
// Every producer-related field is tagged optional, so fx leaves it at its
// zero value (nil) when the corresponding dependency has not been provided.
type OnStopProducerIn struct {
	fx.In
	Lc             fx.Lifecycle
	// ProducerClient is resolved by the fx name "sarama_producer_client".
	ProducerClient sarama.Client      `name:"sarama_producer_client" optional:"true"`
	SyncProducer   core.SyncProducer  `optional:"true"`
	AsyncProducer  core.AsyncProducer `optional:"true"`
}

Directories

Path Synopsis
kafka
log

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL