broker

package
v1.7.8 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Sep 1, 2021 License: Apache-2.0 Imports: 13 Imported by: 1

README

Broker

This package includes the default brokers (Kafka and RabbitMQ). Other brokers (GCP PubSub, STOMP/AMQ) can be found in the candi plugin.

Kafka

Register Kafka broker in service config

Modify configs/configs.go in your service

package configs

import (
	"pkg.agungdp.dev/candi/broker"
...

// LoadServiceConfigs load selected dependency configuration in this service
func LoadServiceConfigs(baseCfg *config.Config) (deps dependency.Dependency) {
	
		...

		brokerDeps := broker.InitBrokers(
			broker.NewKafkaBroker(),
		)

		... 
}

If you want to use the Kafka consumer, set USE_KAFKA_CONSUMER=true in your environment variables and follow this example.

If you want to use the Kafka publisher in your usecase, follow this example code:

package usecase

import (
	"context"

	"pkg.agungdp.dev/candi/candishared"
	"pkg.agungdp.dev/candi/codebase/factory/dependency"
	"pkg.agungdp.dev/candi/codebase/factory/types"
	"pkg.agungdp.dev/candi/codebase/interfaces"
)

type usecaseImpl {
	kafkaPub interfaces.Publisher
}

// NewUsecase builds the example usecase, taking its publisher from the
// broker that deps returns for types.Kafka.
func NewUsecase(deps dependency.Dependency) Usecase {
	publisher := deps.GetBroker(types.Kafka).GetPublisher()
	return &usecaseImpl{kafkaPub: publisher}
}

// UsecaseToPublishMessage publishes the payload "hello world" to the topic
// "example-topic" through the Kafka publisher and returns whatever error
// PublishMessage reports.
func (uc *usecaseImpl) UsecaseToPublishMessage(ctx context.Context) error {
	msg := &candishared.PublisherArgument{
		Topic: "example-topic",
		Data:  "hello world",
	}
	return uc.kafkaPub.PublishMessage(ctx, msg)
}

RabbitMQ

Register RabbitMQ broker in service config

Modify configs/configs.go in your service

package configs

import (
	"pkg.agungdp.dev/candi/broker"
...

// LoadServiceConfigs load selected dependency configuration in this service
func LoadServiceConfigs(baseCfg *config.Config) (deps dependency.Dependency) {
	
		...

		brokerDeps := broker.InitBrokers(
			broker.NewRabbitMQBroker(),
		)

		... 
}

If you want to use the RabbitMQ consumer, set USE_RABBITMQ_CONSUMER=true in your environment variables and follow this example.

If you want to use the RabbitMQ publisher in your usecase, follow this example code:

package usecase

import (
	"context"

	"pkg.agungdp.dev/candi/broker"
	"pkg.agungdp.dev/candi/candishared"
	"pkg.agungdp.dev/candi/codebase/factory/dependency"
	"pkg.agungdp.dev/candi/codebase/factory/types"
	"pkg.agungdp.dev/candi/codebase/interfaces"
)

type usecaseImpl {
	rabbitmqPub interfaces.Publisher
}

// NewUsecase builds the example usecase, taking its publisher from the
// broker that deps returns for types.RabbitMQ.
func NewUsecase(deps dependency.Dependency) Usecase {
	publisher := deps.GetBroker(types.RabbitMQ).GetPublisher()
	return &usecaseImpl{rabbitmqPub: publisher}
}

func (uc *usecaseImpl) UsecaseToPublishMessage(ctx context.Context) error {
	err := uc.rabbitmqPub.PublishMessage(ctx, &candishared.PublisherArgument{
		Topic:  "example-topic",
		Data:   "hello world"
		Header: map[string]interface{}{
			broker.RabbitMQDelayHeader: 5000, // if you want set delay consume your message by active consumer for 5 seconds
		},
	})
	return err
}

Documentation

Index

Constants

View Source
// RabbitMQDelayHeader is the message header key used to request delayed
// consumption of a published message; its value is the delay in milliseconds.
const RabbitMQDelayHeader = "x-delay"

Variables

This section is empty.

Functions

func NewKafkaPublisher

func NewKafkaPublisher(client sarama.Client, async bool) interfaces.Publisher

NewKafkaPublisher sets up only the Kafka publisher, using the given client connection.

func NewRabbitMQPublisher

func NewRabbitMQPublisher(conn *amqp.Connection) interfaces.Publisher

NewRabbitMQPublisher sets up only the RabbitMQ publisher, using the given client connection.

Types

type Broker

type Broker struct {
	// contains filtered or unexported fields
}

Broker model

func InitBrokers

func InitBrokers(brokers ...interfaces.Broker) *Broker

InitBrokers registers all brokers to be used by publishers or consumers.

* For Kafka, pass NewKafkaBroker(...KafkaOptionFunc) as a parameter; the Kafka broker configuration is initialized from the environment variables KAFKA_BROKERS, KAFKA_CLIENT_ID, and KAFKA_CLIENT_VERSION.

* For RabbitMQ, pass NewRabbitMQBroker(...RabbitMQOptionFunc) as a parameter; the RabbitMQ broker configuration is initialized from the environment variables RABBITMQ_BROKER, RABBITMQ_CONSUMER_GROUP, and RABBITMQ_EXCHANGE_NAME.

func (*Broker) Disconnect

func (b *Broker) Disconnect(ctx context.Context) error

Disconnect disconnects all registered brokers.

func (*Broker) GetBrokers

func (b *Broker) GetBrokers() map[types.Worker]interfaces.Broker

GetBrokers returns all registered brokers.

func (*Broker) RegisterBroker

func (b *Broker) RegisterBroker(brokerName types.Worker, bk interfaces.Broker)

RegisterBroker registers a new broker.

type KafkaBroker

type KafkaBroker struct {
	// contains filtered or unexported fields
}

KafkaBroker configuration

func NewKafkaBroker

func NewKafkaBroker(opts ...KafkaOptionFunc) *KafkaBroker

NewKafkaBroker sets up the Kafka configuration for a publisher or consumer; pass no options to use the default configuration.

func (*KafkaBroker) Disconnect

func (k *KafkaBroker) Disconnect(ctx context.Context) error

Disconnect method

func (*KafkaBroker) GetConfiguration

func (k *KafkaBroker) GetConfiguration() interface{}

GetConfiguration method

func (*KafkaBroker) GetName added in v1.7.2

func (k *KafkaBroker) GetName() types.Worker

GetName method

func (*KafkaBroker) GetPublisher

func (k *KafkaBroker) GetPublisher() interfaces.Publisher

GetPublisher method

func (*KafkaBroker) Health

func (k *KafkaBroker) Health() map[string]error

Health method

type KafkaOptionFunc

type KafkaOptionFunc func(*KafkaBroker)

KafkaOptionFunc func type

func KafkaSetBrokerHost added in v1.7.2

func KafkaSetBrokerHost(brokers []string) KafkaOptionFunc

KafkaSetBrokerHost set custom broker host

func KafkaSetConfig

func KafkaSetConfig(cfg *sarama.Config) KafkaOptionFunc

KafkaSetConfig set custom sarama configuration

func KafkaSetPublisher

func KafkaSetPublisher(pub interfaces.Publisher) KafkaOptionFunc

KafkaSetPublisher set custom publisher

type RabbitMQBroker

type RabbitMQBroker struct {
	// contains filtered or unexported fields
}

RabbitMQBroker broker

func NewRabbitMQBroker

func NewRabbitMQBroker(opts ...RabbitMQOptionFunc) *RabbitMQBroker

NewRabbitMQBroker sets up the RabbitMQ configuration for a publisher or consumer; by default the connection is taken from the RABBITMQ_BROKER environment variable.

func (*RabbitMQBroker) Disconnect

func (r *RabbitMQBroker) Disconnect(ctx context.Context) error

Disconnect method

func (*RabbitMQBroker) GetConfiguration

func (r *RabbitMQBroker) GetConfiguration() interface{}

GetConfiguration method

func (*RabbitMQBroker) GetName added in v1.7.2

func (r *RabbitMQBroker) GetName() types.Worker

GetName method

func (*RabbitMQBroker) GetPublisher

func (r *RabbitMQBroker) GetPublisher() interfaces.Publisher

GetPublisher method

func (*RabbitMQBroker) Health

func (r *RabbitMQBroker) Health() map[string]error

Health method

type RabbitMQOptionFunc

type RabbitMQOptionFunc func(*RabbitMQBroker)

RabbitMQOptionFunc func type

func RabbitMQSetBrokerHost added in v1.7.2

func RabbitMQSetBrokerHost(brokers string) RabbitMQOptionFunc

RabbitMQSetBrokerHost set custom broker host

func RabbitMQSetChannel

func RabbitMQSetChannel(ch *amqp.Channel) RabbitMQOptionFunc

RabbitMQSetChannel set custom channel configuration

func RabbitMQSetPublisher

func RabbitMQSetPublisher(pub interfaces.Publisher) RabbitMQOptionFunc

RabbitMQSetPublisher set custom publisher

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL