rabbitmq

package
v1.0.10 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jul 7, 2021 License: Apache-2.0, Apache-2.0 Imports: 12 Imported by: 0

README

rabbitmq broker driver

基于 go-micro 原 rabbitmq 驱动组件修改

用以解决 micro.NewEvent 不能设置rabbitmq持久化消息的配置

  • rabbitmq.DeliveryMode(amqp.Persistent)

修改

  • options.go

//type deliveryMode struct{}
//type priorityKey struct{}

// The previously unexported key types above were replaced by exported ones so
// that they can be set from outside the package.

// DeliveryMode is the context key type under which a message's delivery mode is stored.
type DeliveryMode struct{}
// Priority is the context key type under which a message's priority is stored.
type Priority struct{}
  • rabbitmq.go

// Publish (excerpt; the ······ lines mark code omitted from this README) shows
// how per-message settings are taken from the publish options' context and
// copied onto the outgoing message m.
func (r *rbroker) Publish(topic string, msg *broker.Message, opts ...broker.PublishOption) error {
······
	// The context is optional; when it is nil neither field is touched here.
	if options.Context != nil {
		// Only a value stored as uint8 under the DeliveryMode{} key is accepted;
		// any other type fails the assertion and is skipped.
		if value, ok := options.Context.Value(DeliveryMode{}).(uint8); ok {
			fmt.Println("DeliveryMode:", value)
			m.DeliveryMode = value
		}

		// Same rule for Priority{}: the stored value must be a uint8.
		if value, ok := options.Context.Value(Priority{}).(uint8); ok {
			fmt.Println("Priority:", value)
			m.Priority = value
		}
	}
······
}
  • examples/services/asyncRabbitmq/event/handler/publish.go

// publishSayHello publishes a DemoEvent for req on config.EVENT_B, asking the
// rabbitmq broker for a persistent message with priority 0.
//
// The broker settings travel in a context that is handed to Publish through
// client.PublishContext. The broker reads both values with a uint8 type
// assertion, so each must be stored as a uint8: an untyped constant such as 0
// would be stored as an int and silently ignored.
//
// A publish failure is logged and not propagated; the function always returns nil.
func (s *DemoServiceHandler) publishSayHello(req string) error {
	// Set the delivery mode.
	ctx := context.WithValue(context.Background(), rabbitmq.DeliveryMode{}, amqp.Persistent)
	// Set the priority; the explicit uint8 conversion is what lets the broker pick it up.
	ctx = context.WithValue(ctx, rabbitmq.Priority{}, uint8(0))

	// Publish the message.
	p := micro.NewEvent(config.EVENT_B, s.Service.Client())
	evt := &event.DemoEvent{
		City:        req,
		Timestamp:   time.Now().UTC().Unix(),
		Temperature: 28,
	}
	// PublishContext carries the broker settings prepared above.
	if err := p.Publish(context.TODO(), evt, client.PublishContext(ctx)); err != nil {
		log.Printf("[pub] failed: %v", err)
	}
	return nil
}

Documentation

Overview

Package rabbitmq provides a RabbitMQ broker

Index

Constants

This section is empty.

Variables

View Source
// Package-level defaults.
// NOTE(review): presumably each applies when the matching option is not
// supplied — confirm against the broker implementation.
var (
	// DefaultExchange is an exchange named "micro"; Durable is left at its zero value (false).
	DefaultExchange = Exchange{
		Name: "micro",
	}
	// DefaultRabbitURL addresses 127.0.0.1:5672 with the guest/guest credentials.
	DefaultRabbitURL      = "amqp://guest:guest@127.0.0.1:5672"
	// DefaultPrefetchCount is the default prefetch count (0).
	DefaultPrefetchCount  = 0
	// DefaultPrefetchGlobal is the default for the global prefetch flag (off).
	DefaultPrefetchGlobal = false
	// DefaultRequeueOnError is the default for requeue-on-error (off).
	DefaultRequeueOnError = false
)

Functions

func AckOnSuccess

func AckOnSuccess() broker.SubscribeOption

AckOnSuccess will automatically acknowledge messages when no error is returned

func DurableExchange

func DurableExchange() broker.Option

DurableExchange is an option to set the Exchange to be durable

func DurableQueue

func DurableQueue() broker.SubscribeOption

DurableQueue creates a durable queue when subscribing.

func ExchangeName

func ExchangeName(e string) broker.Option

ExchangeName is an option to set the ExchangeName

func ExternalAuth

func ExternalAuth() broker.Option

func Headers

func Headers(h map[string]interface{}) broker.SubscribeOption

Headers adds headers used by the headers exchange

func NewBroker

func NewBroker(opts ...broker.Option) broker.Broker

func PrefetchCount

func PrefetchCount(c int) broker.Option

PrefetchCount is an option to set the prefetch count to c.

func PrefetchGlobal

func PrefetchGlobal() broker.Option

PrefetchGlobal is an option to enable the global prefetch setting (see DefaultPrefetchGlobal).

func QueueArguments

func QueueArguments(h map[string]interface{}) broker.SubscribeOption

QueueArguments sets arguments for queue creation

func RequeueOnError

func RequeueOnError() broker.SubscribeOption

RequeueOnError calls Nack(multiple:false, requeue:true) on the amqp delivery when the handler returns an error

func SubscribeContext

func SubscribeContext(ctx context.Context) broker.SubscribeOption

SubscribeContext sets the context for broker.SubscribeOption

Types

type DeliveryMode

type DeliveryMode struct{}

type Exchange

// Exchange describes a rabbitmq exchange by name and durability.
type Exchange struct {
	// Name of the exchange
	Name string
	// Whether the exchange is durable (persistent)
	Durable bool
}

Exchange is the rabbitmq exchange

type ExternalAuthentication

type ExternalAuthentication struct {
}

func (*ExternalAuthentication) Mechanism

func (auth *ExternalAuthentication) Mechanism() string

func (*ExternalAuthentication) Response

func (auth *ExternalAuthentication) Response() string

type Priority

type Priority struct{}

type priorityKey struct{}

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL