broker

package
v1.17.5 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: May 1, 2024 License: Apache-2.0 Imports: 17 Imported by: 0

README

Broker

Includes the default brokers (Kafka and RabbitMQ). Other brokers (GCP PubSub, STOMP/AMQ) can be found in the candi plugin.

Kafka

Register Kafka broker in service config

Modify configs/configs.go in your service

package configs

import (
	"github.com/golangid/candi/broker"
...

// LoadServiceConfigs loads the dependency configuration selected for this service.
// The elided parts ("...") stand for the rest of the service's own setup.
func LoadServiceConfigs(baseCfg *config.Config) (deps dependency.Dependency) {
	
		...

		// Register the Kafka broker; with no options it uses the default configuration.
		brokerDeps := broker.InitBrokers(
			broker.NewKafkaBroker(),
		)

		... 
}

To use the Kafka consumer, set USE_KAFKA_CONSUMER=true in your environment variables and follow this example.

To use the Kafka publisher in your usecase, follow this example code:

package usecase

import (
	"context"

	"github.com/golangid/candi/candishared"
	"github.com/golangid/candi/codebase/factory/dependency"
	"github.com/golangid/candi/codebase/factory/types"
	"github.com/golangid/candi/codebase/interfaces"
)

type usecaseImpl {
	kafkaPub interfaces.Publisher
}

// NewUsecase builds the usecase and wires in the publisher of the broker
// registered under types.Kafka.
func NewUsecase(deps dependency.Dependency) Usecase {
	kafkaBroker := deps.GetBroker(types.Kafka)

	uc := new(usecaseImpl)
	uc.kafkaPub = kafkaBroker.GetPublisher()
	return uc
}

// UsecaseToPublishMessage publishes "hello world" to "example-topic" through
// the Kafka publisher and returns whatever error the publisher reports.
func (uc *usecaseImpl) UsecaseToPublishMessage(ctx context.Context) error {
	msg := &candishared.PublisherArgument{
		Topic: "example-topic",
		Data:  "hello world",
	}
	return uc.kafkaPub.PublishMessage(ctx, msg)
}

RabbitMQ

Register RabbitMQ broker in service config

Modify configs/configs.go in your service

package configs

import (
	"github.com/golangid/candi/broker"
...

// LoadServiceConfigs loads the dependency configuration selected for this service.
// The elided parts ("...") stand for the rest of the service's own setup.
func LoadServiceConfigs(baseCfg *config.Config) (deps dependency.Dependency) {
	
		...

		// Register the RabbitMQ broker; by default it connects using the RABBITMQ_BROKER environment variable.
		brokerDeps := broker.InitBrokers(
			broker.NewRabbitMQBroker(),
		)

		... 
}

To use the RabbitMQ consumer, set USE_RABBITMQ_CONSUMER=true in your environment variables and follow this example.

To use the RabbitMQ publisher in your usecase, follow this example code:

package usecase

import (
	"context"

	"github.com/golangid/candi/broker"
	"github.com/golangid/candi/candishared"
	"github.com/golangid/candi/codebase/factory/dependency"
	"github.com/golangid/candi/codebase/factory/types"
	"github.com/golangid/candi/codebase/interfaces"
)

type usecaseImpl {
	rabbitmqPub interfaces.Publisher
}

// NewUsecase builds the usecase and wires in the publisher of the broker
// registered under types.RabbitMQ.
func NewUsecase(deps dependency.Dependency) Usecase {
	rabbitBroker := deps.GetBroker(types.RabbitMQ)

	uc := new(usecaseImpl)
	uc.rabbitmqPub = rabbitBroker.GetPublisher()
	return uc
}

func (uc *usecaseImpl) UsecaseToPublishMessage(ctx context.Context) error {
	err := uc.rabbitmqPub.PublishMessage(ctx, &candishared.PublisherArgument{
		Topic:  "example-topic",
		Data:   "hello world"
		Header: map[string]interface{}{
			broker.RabbitMQDelayHeader: 5000, // if you want set delay consume your message by active consumer for 5 seconds
		},
	})
	return err
}

Documentation

Index

Constants

View Source
const (
	// RabbitMQDelayHeader header key, value in millisecond
	RabbitMQDelayHeader = "x-delay"
)
View Source
const (
	// RedisBrokerKey key constant
	RedisBrokerKey = "dynamic_scheduling"
)

Variables

This section is empty.

Functions

func GenerateKeyDeleteRedisPubSubMessage added in v1.14.9

func GenerateKeyDeleteRedisPubSubMessage(topic string, message interface{}) string

GenerateKeyDeleteRedisPubSubMessage delete redis key pubsub message pattern

func GetDefaultKafkaConfig added in v1.11.23

func GetDefaultKafkaConfig(additionalConfigFunc ...func(*sarama.Config)) *sarama.Config

GetDefaultKafkaConfig construct default kafka config

func NewKafkaPublisher

func NewKafkaPublisher(client sarama.Client, async bool) interfaces.Publisher

NewKafkaPublisher setup only kafka publisher with client connection

func NewRabbitMQPublisher

func NewRabbitMQPublisher(conn *amqp.Connection, exchange string) *rabbitMQPublisher

NewRabbitMQPublisher setup only rabbitmq publisher with client connection

Types

type Broker

type Broker struct {
	// contains filtered or unexported fields
}

Broker model

func InitBrokers

func InitBrokers(brokers ...interfaces.Broker) *Broker

InitBrokers registers all brokers to be used for publishing or consuming.

* For Kafka, pass NewKafkaBroker(...KafkaOptionFunc) as a parameter; the Kafka broker configuration is initialized from the environment variables KAFKA_BROKERS, KAFKA_CLIENT_ID, and KAFKA_CLIENT_VERSION.

* For RabbitMQ, pass NewRabbitMQBroker(...RabbitMQOptionFunc) as a parameter; the RabbitMQ broker configuration is initialized from the environment variables RABBITMQ_BROKER, RABBITMQ_CONSUMER_GROUP, and RABBITMQ_EXCHANGE_NAME.

func (*Broker) Disconnect

func (b *Broker) Disconnect(ctx context.Context) error

Disconnect disconnects all registered brokers.

func (*Broker) GetBrokers

func (b *Broker) GetBrokers() map[types.Worker]interfaces.Broker

GetBrokers returns all registered brokers.

func (*Broker) RegisterBroker

func (b *Broker) RegisterBroker(brokerName types.Worker, bk interfaces.Broker)

RegisterBroker registers a new broker.

type KafkaBroker

type KafkaBroker struct {
	WorkerType types.Worker
	BrokerHost []string
	Config     *sarama.Config
	Client     sarama.Client
	// contains filtered or unexported fields
}

KafkaBroker configuration

func NewKafkaBroker

func NewKafkaBroker(opts ...KafkaOptionFunc) *KafkaBroker

NewKafkaBroker sets up the Kafka configuration for a publisher or consumer. Pass no options to use the default configuration (the default worker type is types.Kafka).

func (*KafkaBroker) Disconnect

func (k *KafkaBroker) Disconnect(ctx context.Context) error

Disconnect method

func (*KafkaBroker) GetName added in v1.7.2

func (k *KafkaBroker) GetName() types.Worker

GetName method

func (*KafkaBroker) GetPublisher

func (k *KafkaBroker) GetPublisher() interfaces.Publisher

GetPublisher method

func (*KafkaBroker) Health

func (k *KafkaBroker) Health() map[string]error

Health method

type KafkaOptionFunc

type KafkaOptionFunc func(*KafkaBroker)

KafkaOptionFunc func type

func KafkaSetBrokerHost added in v1.7.2

func KafkaSetBrokerHost(brokers []string) KafkaOptionFunc

KafkaSetBrokerHost set custom broker host

func KafkaSetConfig

func KafkaSetConfig(cfg *sarama.Config) KafkaOptionFunc

KafkaSetConfig set custom sarama configuration

func KafkaSetPublisher

func KafkaSetPublisher(pub interfaces.Publisher) KafkaOptionFunc

KafkaSetPublisher set custom publisher

func KafkaSetWorkerType added in v1.17.2

func KafkaSetWorkerType(workerType types.Worker) KafkaOptionFunc

KafkaSetWorkerType set worker type

type RabbitMQBroker

type RabbitMQBroker struct {
	WorkerType types.Worker
	BrokerHost string
	Exchange   string
	Conn       *amqp.Connection
	Channel    *amqp.Channel
	// contains filtered or unexported fields
}

RabbitMQBroker broker

func NewRabbitMQBroker

func NewRabbitMQBroker(opts ...RabbitMQOptionFunc) *RabbitMQBroker

NewRabbitMQBroker sets up the RabbitMQ configuration for a publisher or consumer. By default the connection is taken from the RABBITMQ_BROKER environment variable (the default worker type is types.RabbitMQ).

func (*RabbitMQBroker) Disconnect

func (r *RabbitMQBroker) Disconnect(ctx context.Context) error

Disconnect method

func (*RabbitMQBroker) GetName added in v1.7.2

func (r *RabbitMQBroker) GetName() types.Worker

GetName method

func (*RabbitMQBroker) GetPublisher

func (r *RabbitMQBroker) GetPublisher() interfaces.Publisher

GetPublisher method

func (*RabbitMQBroker) Health

func (r *RabbitMQBroker) Health() map[string]error

Health method

type RabbitMQOptionFunc

type RabbitMQOptionFunc func(*RabbitMQBroker)

RabbitMQOptionFunc func type

func RabbitMQSetBrokerHost added in v1.7.2

func RabbitMQSetBrokerHost(brokers string) RabbitMQOptionFunc

RabbitMQSetBrokerHost set custom broker host

func RabbitMQSetChannel

func RabbitMQSetChannel(ch *amqp.Channel) RabbitMQOptionFunc

RabbitMQSetChannel set custom channel configuration

func RabbitMQSetExchange added in v1.17.2

func RabbitMQSetExchange(exchange string) RabbitMQOptionFunc

RabbitMQSetExchange set exchange

func RabbitMQSetPublisher

func RabbitMQSetPublisher(pub interfaces.Publisher) RabbitMQOptionFunc

RabbitMQSetPublisher set custom publisher

func RabbitMQSetWorkerType added in v1.17.2

func RabbitMQSetWorkerType(workerType types.Worker) RabbitMQOptionFunc

RabbitMQSetWorkerType set worker type

type RedisBroker added in v1.14.9

type RedisBroker struct {
	WorkerType types.Worker
	Pool       *redis.Pool
	// contains filtered or unexported fields
}

func NewRedisBroker added in v1.14.9

func NewRedisBroker(pool *redis.Pool, opts ...RedisOptionFunc) *RedisBroker

NewRedisBroker sets up Redis for publishing messages (the default worker type is types.RedisSubscriber).

func (*RedisBroker) Disconnect added in v1.14.9

func (r *RedisBroker) Disconnect(ctx context.Context) error

Disconnect method

func (*RedisBroker) GetName added in v1.14.9

func (r *RedisBroker) GetName() types.Worker

GetName method

func (*RedisBroker) GetPublisher added in v1.14.9

func (r *RedisBroker) GetPublisher() interfaces.Publisher

GetPublisher method

func (*RedisBroker) Health added in v1.14.9

func (r *RedisBroker) Health() map[string]error

Health method

func (*RedisBroker) InitPubSubConn added in v1.17.2

func (r *RedisBroker) InitPubSubConn() *redis.PubSubConn

InitPubSubConn method, return redis pubsub connection

func (*RedisBroker) PublishMessage added in v1.14.9

func (r *RedisBroker) PublishMessage(ctx context.Context, args *candishared.PublisherArgument) (err error)

PublishMessage method

type RedisMessage added in v1.14.9

type RedisMessage struct {
	HandlerName string `json:"h"`
	Key         string `json:"key"`
	Message     string `json:"message,omitempty"`
	EventID     string `json:"id,omitempty"`
}

RedisMessage messaging model for redis subscriber key

func ParseRedisPubSubKeyTopic added in v1.14.9

func ParseRedisPubSubKeyTopic(key []byte) (redisMessage RedisMessage)

ParseRedisPubSubKeyTopic parse key to redis message

type RedisOptionFunc added in v1.14.9

type RedisOptionFunc func(*RedisBroker)

RedisOptionFunc func type

func RedisSetConfigCommands added in v1.14.9

func RedisSetConfigCommands(commands ...string) RedisOptionFunc

RedisSetConfigCommands set config commands

func RedisSetSubscribeChannels added in v1.14.9

func RedisSetSubscribeChannels(channels ...string) RedisOptionFunc

RedisSetSubscribeChannels set channels

func RedisSetWorkerType added in v1.17.2

func RedisSetWorkerType(workerType types.Worker) RedisOptionFunc

RedisSetWorkerType set worker type

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL