Documentation ¶
Index ¶
Examples ¶
Constants ¶
const ( Redis = EndpointProtocol("redis") // Redis AMQP = EndpointProtocol("amqp") // AMQP )
Variables ¶
This section is empty.
Functions ¶
This section is empty.
Types ¶
type AMQPSubscriber ¶
type AMQPSubscriber struct { *Endpoint // contains filtered or unexported fields }
AMQPSubscriber represents a subscriber, which consumes messages from AMQP
func (*AMQPSubscriber) Close ¶
func (sub *AMQPSubscriber) Close()
Close closes the subscriber gracefully; it blocks until all messages are handled.
func (*AMQPSubscriber) Run ¶
func (sub *AMQPSubscriber) Run()
Run starts the subscriber and blocks until the subscriber is closed
type ActionFunc ¶
type ActionFunc func(args ...interface{})
ActionFunc is the function that handles messages. args is composed of context-related parameters:
amqp args[0] should be amqp.Delivery
redis args[0] should be *redis.Message
type Endpoint ¶
type Endpoint struct { Protocol EndpointProtocol Original string Redis struct { Addr string Password string Channels []string } AMQP struct { URI string ExchangeName string QueueName string RouteKey []string Ack bool Exclusive bool Type string } }
Endpoint represents an endpoint
type EndpointProtocol ¶
type EndpointProtocol string
EndpointProtocol is the type of protocol that the endpoint represents.
type RedisSubscriber ¶
type RedisSubscriber struct { *Endpoint // contains filtered or unexported fields }
RedisSubscriber represents a subscriber, which consumes messages from redis
func (*RedisSubscriber) Close ¶
func (sub *RedisSubscriber) Close()
Close closes the subscriber gracefully; it blocks until all messages are finished.
func (*RedisSubscriber) Run ¶
func (sub *RedisSubscriber) Run()
Run starts the subscriber and blocks until the subscriber is closed
type Setup ¶
type Setup struct { ActionFunc ActionFunc URL string }
type Subscriber ¶
type Subscriber interface { Run() Close() }
type SubscriberManager ¶
type SubscriberManager struct {
// contains filtered or unexported fields
}
SubscriberManager is a manager to control subscribers
Example ¶
logger := logrus.New() subMgr := NewSubscriberManager(logger) subMgr.Register( "TestAMQPSubscriber", &Setup{ URL: "amqp://root:root@rabbitmq:5672/test.amqp.exchange1/test.amqp.queue1?route=foo&route=bar&ack=false&type=direct", ActionFunc: func(args ...interface{}) { delivery := args[0].(amqp.Delivery) delivery.Ack(false) }, }, ) subMgr.Register( "TestRedisSubscriber", &Setup{ URL: "redis://:password@redis:6379/?channel=foo&channel=bar", ActionFunc: func(args ...interface{}) { message := args[0].(*redis.Message).Payload fmt.Println(message) }, }, ) subMgr.Run() // Stop the subscribers subMgr.GracefulStop()
Output:
func NewSubscriberManager ¶
func NewSubscriberManager(log logger) *SubscriberManager
NewSubscriberManager creates a manager
func (*SubscriberManager) GracefulStop ¶
func (sm *SubscriberManager) GracefulStop()
GracefulStop stops the manager gracefully. It stops the subscribers from accepting new messages and blocks until all the pending messages are finished.
func (*SubscriberManager) Register ¶
func (sm *SubscriberManager) Register(name string, setup *Setup) error
func (*SubscriberManager) Run ¶
func (sm *SubscriberManager) Run()
Run starts the subscribers that the manager controls
func (*SubscriberManager) Validate ¶
func (sm *SubscriberManager) Validate(url string) error
Validate validates if a url is valid