subscriber

package module
v0.0.0-...-273f4bd Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Apr 9, 2019 License: MIT Imports: 11 Imported by: 0

README

subscriber

GoDoc Build Status Go Report Card Test Coverage

Subscriber is a simple implementation of the subscribers in Pub/Sub pattern. It's easy to use and provide you most of the basic functionality you need.

Usage

import (
	"github.com/sirupsen/logrus"
	"github.com/streadway/amqp"
)

logger := logrus.New()
subMgr := NewSubscriberManager(logger)
subMgr.Register(
    "TestAMQPSubscriber",
    &Setup{
        URL: "amqp://root:root@rabbitmq:5672/test.amqp.exchange1/test.amqp.queue1?route=foo&route=bar&ack=false&type=direct",
        ActionFunc: func(args ...interface{}) {
            delivery := args[0].(amqp.Delivery)
            delivery.Ack(false)
        },
    },
)
subMgr.Register(
    "TestRedisSubscriber",
    &Setup{
        URL: "redis://:password@redis:6379/?channel=foo&channel=bar",
        ActionFunc: func(args ...interface{}) {
            message := args[0].(*redis.Message).Payload
            fmt.Println(message)
        },
    },
)
subMgr.Run()

// Stop the subscribers
subMgr.GracefulStop()

Roadmap

  • Add support for AMQP
  • Add support for Redis

Documentation

Index

Examples

Constants

View Source
const (
	Redis = EndpointProtocol("redis") // Redis
	AMQP  = EndpointProtocol("amqp")  // AMQP
)

Variables

This section is empty.

Functions

This section is empty.

Types

type AMQPSubscriber

type AMQPSubscriber struct {
	*Endpoint
	// contains filtered or unexported fields
}

AMQPSubscriber represents a subscriber, which consumes messages from AMQP

func (*AMQPSubscriber) Close

func (sub *AMQPSubscriber) Close()

Close closes the subscriber gracefully, it blocks until all messages are handled

func (*AMQPSubscriber) Run

func (sub *AMQPSubscriber) Run()

Run starts the subscriber and blocks until the subscriber is closed

type ActionFunc

type ActionFunc func(args ...interface{})

ActionFunc is the function that hanlding messages args is composed of context-related parameters

amqp args[0] should be amqp.Delivery

redis args[0] should be ...

type Endpoint

type Endpoint struct {
	Protocol EndpointProtocol
	Original string
	Redis    struct {
		Addr     string
		Password string
		Channels []string
	}
	AMQP struct {
		URI          string
		ExchangeName string
		QueueName    string
		RouteKey     []string
		Ack          bool
		Exclusive    bool
		Type         string
	}
}

Endpoint represents an endpoint

type EndpointProtocol

type EndpointProtocol string

EndpointProtocol is the type of protocol that the endpoint represents.

type RedisSubscriber

type RedisSubscriber struct {
	*Endpoint
	// contains filtered or unexported fields
}

RedisSubscriber represents a subscriber, which consumes messages from redis

func (*RedisSubscriber) Close

func (sub *RedisSubscriber) Close()

Close closes the subscriber gracefully, it blocks until all messages are finished

func (*RedisSubscriber) Run

func (sub *RedisSubscriber) Run()

Run starts the subscriber and blocks until the subscriber is closed

type Setup

type Setup struct {
	ActionFunc ActionFunc
	URL        string
}

type Subscriber

type Subscriber interface {
	Run()
	Close()
}

type SubscriberManager

type SubscriberManager struct {
	// contains filtered or unexported fields
}

SubscriberManager is a manager to control subscribers

Example
logger := logrus.New()
subMgr := NewSubscriberManager(logger)
subMgr.Register(
	"TestAMQPSubscriber",
	&Setup{
		URL: "amqp://root:root@rabbitmq:5672/test.amqp.exchange1/test.amqp.queue1?route=foo&route=bar&ack=false&type=direct",
		ActionFunc: func(args ...interface{}) {
			delivery := args[0].(amqp.Delivery)
			delivery.Ack(false)
		},
	},
)
subMgr.Register(
	"TestRedisSubscriber",
	&Setup{
		URL: "redis://:password@redis:6379/?channel=foo&channel=bar",
		ActionFunc: func(args ...interface{}) {
			message := args[0].(*redis.Message).Payload
			fmt.Println(message)
		},
	},
)
subMgr.Run()

// Stop the subscribers
subMgr.GracefulStop()
Output:

func NewSubscriberManager

func NewSubscriberManager(log logger) *SubscriberManager

NewSubscriberManager creates a mangager

func (*SubscriberManager) GracefulStop

func (sm *SubscriberManager) GracefulStop()

GracefulStop stops the manager gracefully. It stops the subscribers from accepting new messages and blocks until all the pending messages are finished.

func (*SubscriberManager) Register

func (sm *SubscriberManager) Register(name string, setup *Setup) error

func (*SubscriberManager) Run

func (sm *SubscriberManager) Run()

Run starts the subscribers that the manager controls

func (*SubscriberManager) Validate

func (sm *SubscriberManager) Validate(url string) error

Validate validates if a url is valid

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL