msgbuzz

package module
v0.1.5 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Feb 2, 2024 License: BSD-3-Clause Imports: 6 Imported by: 0

README

go-msgbuzz

A message bus abstraction with a RabbitMQ implementation.

Usage

    package main
    
    import (
        "fmt"
        "github.com/sihendra/go-msgbuzz"
        "time"
    )   

    // main demonstrates a full publish/consume round trip: it registers a
    // consumer, publishes one message from a background goroutine, and then
    // closes the client so that StartConsuming returns.
    func main() {
        // Create msgbuzz instance
        msgBus := msgbuzz.NewRabbitMqClient("amqp://127.0.0.1:5672", 4)

        // Register consumer of some topic
        err := msgBus.On("profile.created", "reco_engine", func(confirm msgbuzz.MessageConfirm, bytes []byte) error {
            defer confirm.Ack()
            fmt.Printf("Incoming message: %s\n", string(bytes))

            return nil
        })
        if err != nil {
            fmt.Printf("Failed to register consumer: %v\n", err)
            return
        }

        go func(client *msgbuzz.RabbitMqClient) {
            // Wait for the consumer to start; if there is no consumer, no message will be saved by RabbitMQ
            time.Sleep(time.Second * 1)

            // Publish to topic (use the client passed in rather than the captured variable)
            if err := client.Publish("profile.created", []byte(`{"name":"Dodo"}`)); err != nil {
                fmt.Printf("Failed to publish message: %v\n", err)
            }

            // Wait for the consumer to pick up the message before stopping.
            // Close is called even if publishing failed, so StartConsuming below can return.
            time.Sleep(time.Second * 1)
            if err := client.Close(); err != nil {
                fmt.Printf("Failed to close client: %v\n", err)
            }
        }(msgBus)

        // Will block until msgbuzz is closed
        fmt.Println("Start Consuming")
        if err := msgBus.StartConsuming(); err != nil {
            fmt.Printf("Consuming stopped with error: %v\n", err)
        }
        fmt.Println("Finish Consuming")
    }

Testing

All Tests

Run $ make test-all to run all tests.

Unit Tests

Run $ make test-unit to run unit tests only.

Integration Tests

Run $ make test-integration to run integration tests only (requires Docker).

Documentation

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

func WithRabbitMqRoutingKey added in v0.1.4

func WithRabbitMqRoutingKey(routingKey string) func(*MessageBusOption)

Types

type MessageBus

// MessageBus is the message bus abstraction of this package; RabbitMqClient
// is the RabbitMQ implementation of it.
type MessageBus interface {
	// Publish publishes msg to the topic named topicName. Optional option
	// functions (for example WithRabbitMqRoutingKey) adjust a MessageBusOption
	// for this call.
	Publish(topicName string, msg []byte, options ...func(*MessageBusOption)) error
	// On registers handlerFunc as a consumer of the topic named topicName
	// under the given consumerName.
	On(topicName string, consumerName string, handlerFunc MessageHandler) error
}

type MessageBusOption added in v0.1.4

// MessageBusOption is the option value modified by the option functions
// passed to Publish.
type MessageBusOption struct {
	// RabbitMq holds the RabbitMQ-specific options.
	RabbitMq RabbitMqOption
}

func (*MessageBusOption) GetRabbitMqExchangeType added in v0.1.4

func (m *MessageBusOption) GetRabbitMqExchangeType() string

type MessageConfirm

// MessageConfirm is handed to a MessageHandler together with the message
// body so the handler can confirm the outcome of processing.
type MessageConfirm interface {
	// Ack acknowledges the message (the README example defers it in the handler).
	Ack() error
	// Nack presumably rejects the message — NOTE(review): requeue/dead-letter
	// behavior is not visible here; confirm against the implementation.
	Nack() error
	// Retry presumably schedules the message for redelivery after delay, up to
	// maxRetry attempts — NOTE(review): the unit of delay and what happens once
	// maxRetry is exceeded are not visible here; confirm.
	Retry(delay int64, maxRetry int) error
}

type MessageHandler

// MessageHandler is the callback type registered with On. It receives a
// MessageConfirm for the incoming message and the message body as bytes.
type MessageHandler func(MessageConfirm, []byte) error

type QueueNameGenerator

type QueueNameGenerator struct {
	// contains filtered or unexported fields
}

func NewQueueNameGenerator

func NewQueueNameGenerator(topicName string, clientGroup string) *QueueNameGenerator

func (QueueNameGenerator) DlxExchange

func (q QueueNameGenerator) DlxExchange() string

func (QueueNameGenerator) DlxQueue

func (q QueueNameGenerator) DlxQueue() string

func (QueueNameGenerator) Exchange

func (q QueueNameGenerator) Exchange() string

func (QueueNameGenerator) Queue

func (q QueueNameGenerator) Queue() string

func (QueueNameGenerator) RetryExchange

func (q QueueNameGenerator) RetryExchange() string

func (QueueNameGenerator) RetryQueue

func (q QueueNameGenerator) RetryQueue() string

type RabbitMqClient

type RabbitMqClient struct {
	// contains filtered or unexported fields
}

RabbitMqClient is the RabbitMQ implementation of MessageBus.

func NewRabbitMqClient

func NewRabbitMqClient(conn string, threadNum int) *RabbitMqClient

func (*RabbitMqClient) Close

func (m *RabbitMqClient) Close() error

func (*RabbitMqClient) On

func (m *RabbitMqClient) On(topicName string, consumerName string, handlerFunc MessageHandler) error

func (*RabbitMqClient) Publish

func (m *RabbitMqClient) Publish(topicName string, body []byte, options ...func(*MessageBusOption)) error

func (*RabbitMqClient) SetMaxPubRetry added in v0.1.2

func (m *RabbitMqClient) SetMaxPubRetry(maxPubRetry int)

func (*RabbitMqClient) SetPubRetryStepTime added in v0.1.2

func (m *RabbitMqClient) SetPubRetryStepTime(pubRetryStepTime int64)

func (*RabbitMqClient) SetRcStepTime added in v0.1.2

func (m *RabbitMqClient) SetRcStepTime(t int64)

func (*RabbitMqClient) StartConsuming

func (m *RabbitMqClient) StartConsuming() error

type RabbitMqMessageConfirm

type RabbitMqMessageConfirm struct {
	// contains filtered or unexported fields
}

func NewRabbitMqMessageConfirm

func NewRabbitMqMessageConfirm(channel *amqp.Channel, delivery *amqp.Delivery, nameGenerator *QueueNameGenerator, body []byte) *RabbitMqMessageConfirm

func (*RabbitMqMessageConfirm) Ack

func (m *RabbitMqMessageConfirm) Ack() error

func (*RabbitMqMessageConfirm) Nack

func (m *RabbitMqMessageConfirm) Nack() error

func (*RabbitMqMessageConfirm) Retry

func (m *RabbitMqMessageConfirm) Retry(delay int64, maxRetry int) error

func (*RabbitMqMessageConfirm) TotalRetried

func (m *RabbitMqMessageConfirm) TotalRetried() (int, error)

type RabbitMqOption added in v0.1.4

// RabbitMqOption groups the RabbitMQ-specific settings carried by
// MessageBusOption.
type RabbitMqOption struct {
	// RoutingKey is the RabbitMQ routing key; presumably set via
	// WithRabbitMqRoutingKey — NOTE(review): confirm how Publish uses it.
	RoutingKey string
}

Directories

Path Synopsis
examples

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL