amqpwrp

package module
v0.0.0-...-ba9acb7 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jan 29, 2019 License: MIT Imports: 4 Imported by: 0

README

AmqpWRP

Pros: handles reconnects and keeps a pool of channels for blocking operations with the server; the channel pool also gives more throughput on a single connection. Cons: every delivery is acked individually (the plan is to ack once after the whole prefetch batch has been received). TODO: add a non-blocking, non-safe publish.

Usage

Consume
stopFunc := amqpwrp.NewConsume(&amqpwrp.ConnConfig{
		Address:     "amqp://guest:guest@localhost:5672/",
		ChannelPool: 10,
	}, &amqpwrp.ConsumeConfig{
		Prefetch: 100,
		Queue:    "queueName",
		Args:     amqp.Table{},
		Callback: func(d amqp.Delivery) (e error, b bool) {
			cl := &RouteCollection{}
			err := json.Unmarshal(d.Body, &cl)
			if err != nil {
				return err, false
			}
			if len(cl.Key) == 0 {
				return fmt.Errorf("empty collection"), false
			}
			channelToPublish <- cl
			return nil, false
		},
	})
Publish
pubCh, stopFunc := amqpwrp.NewPublish(&amqpwrp.ConnConfig{
		Address:     "amqp://guest:guest@localhost:5672/",
		ChannelPool: 10,
	}, &amqpwrp.PublishConfig{
		Exchange:  "exchangeName",
		Key:       "queueName",
	})

....

// push messages to the publish channel
for _, cp := range cityPairList {
		for _, d := range dateList {
			pubCh <- amqp.Publishing{
				ContentType: "application/json",
				Body:        cp.toJsonWithDate(d),
			}
		}
	}
	// stopFunc is a blocking operation: it publishes all remaining messages and closes resources
	stopFunc()

Documentation

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

This section is empty.

Types

type ConnConfig

type ConnConfig struct {
	Address     string
	ChannelPool int
}

type ConsumeConfig

type ConsumeConfig struct {
	Prefetch int
	Queue    string
	Consumer string

	Args     amqp.Table
	Callback func(d amqp.Delivery) (error, bool)
	// contains filtered or unexported fields
}

type PublishConfig

type PublishConfig struct {
	Exchange string
	Key      string
	// contains filtered or unexported fields
}

type StopFunc

type StopFunc func()

func NewConsume

func NewConsume(connConfig *ConnConfig, consumeConfig *ConsumeConfig) StopFunc

func NewPublish

func NewPublish(c *ConnConfig, p *PublishConfig) (chan<- amqp.Publishing, StopFunc)

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL