bus

package module
v0.0.0-...-9eaefed Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: May 30, 2017 License: MIT Imports: 16 Imported by: 0

README

Event Bus NSQ

  • A tiny wrapper around go-nsq topic and channel.

Installation

go get -u github.com/rafaeljesus/nsq-event-bus

Usage

The nsq-event-bus package exposes a interface for emitting and listening events.

Emitter

import "github.com/rafaeljesus/nsq-event-bus"

topic := "events"
emitter, err := bus.NewEmitter(EmitterConfig{
  Address: "localhost:4150",
  MaxInFlight: 25,
})

e := event{}
if err = emitter.Emit(topic, &e); err != nil {
  // handle failure to emit message
}

// emitting messages on a async fashion
if err = emitter.EmitAsync(topic, &e); err != nil {
  // handle failure to emit message
}

Listener

import "github.com/rafaeljesus/nsq-event-bus"

if err = bus.On(bus.ListenerConfig{
  Topic:              "topic",
  Channel:            "test_on",
  HandlerFunc:        handler,
  HandlerConcurrency: 4,
}); err != nil {
  // handle failure to listen a message
}

func handler(message *Message) (reply interface{}, err error) {
  e := event{}
  if err = message.DecodePayload(&e); err != nil {
    message.Finish()
    return
  }

  if message.Attempts > MAX_DELIVERY_ATTEMPTS {
    message.Finish()
    return
  }

  err, _ = doWork(&e)
  if err != nil {
    message.Requeue(BACKOFF_TIME)
    return
  }

  message.Finish()
  return
}

Request (Request/Reply)

import "github.com/rafaeljesus/nsq-event-bus"

topic := "user_signup"
emitter, err = bus.NewEmitter(bus.EmitterConfig{})

e := event{Login: "rafa", Password: "ilhabela_is_the_place"}
if err = bus.Request(topic, &e, handler); err != nil {
  // handle failure to listen a message
}

func handler(message *Message) (reply interface{}, err error) {
  e := event{}
  if err = message.DecodePayload(&e); err != nil {
    message.Finish()
    return
  }

  reply = &Reply{}
  message.Finish()
  return
}

Contributing

  • Fork it
  • Create your feature branch (git checkout -b my-new-feature)
  • Commit your changes (git commit -am 'Add some feature')
  • Push to the branch (git push origin my-new-feature)
  • Create new Pull Request

Badges

Build Status Go Report Card Go Doc


GitHub @rafaeljesus  ·  Medium @_jesus_rafael  ·  Twitter @_jesus_rafael

Documentation

Index

Constants

This section is empty.

Variables

View Source
var (
	// ErrTopicRequired is returned when topic is not passed as parameter.
	ErrTopicRequired = errors.New("topic is mandatory")
	// ErrHandlerRequired is returned when handler is not passed as parameter.
	ErrHandlerRequired = errors.New("handler is mandatory")
	// ErrChannelRequired is returned when channel is not passed as parameter in bus.ListenerConfig.
	ErrChannelRequired = errors.New("channel is mandatory")
)
View Source
var (
	ErrNoEventListeners      = errors.New("At least one eventListener is required")
	ErrServiceAlreadyRunning = errors.New("Service is already running")
)
View Source
var ErrTimeoutOccurred error = errors.New("Timeout reached")

Functions

func On

func On(lc ListenerConfig) (err error)

On listen to a message from a specific topic using nsq consumer, returns an error if topic and channel not passed or if an error occurred while creating nsq consumer.

Types

type Emitter

type Emitter interface {
	Emit(topic string, payload interface{}) error
	EmitAsync(topic string, payload interface{}) error
	EmitBulkAsync(topic string, payload []interface{}) error
	Request(topic string, payload interface{}, handler HandlerFunc) error
	EmitAndWaitForResultWithTimeout(topic string, payload interface{}, timeoutDuration time.Duration) (interface{}, error)
}

Emitter exposes a interface for emitting and listening for events.

func NewEmitter

func NewEmitter(ec EmitterConfig) (emitter Emitter, err error)

NewEmitter returns a new eventEmitter configured with the variables from the config parameter, or returning an non-nil err if an error occurred while creating nsq producer.

type EmitterConfig

type EmitterConfig struct {
	Address                 string
	DialTimeout             time.Duration
	ReadTimeout             time.Duration
	WriteTimeout            time.Duration
	LocalAddr               net.Addr
	LookupdPollInterval     time.Duration
	LookupdPollJitter       float64
	MaxRequeueDelay         time.Duration
	DefaultRequeueDelay     time.Duration
	BackoffStrategy         nsq.BackoffStrategy
	MaxBackoffDuration      time.Duration
	BackoffMultiplier       time.Duration
	MaxAttempts             uint16
	LowRdyIdleTimeout       time.Duration
	RDYRedistributeInterval time.Duration
	ClientID                string
	Hostname                string
	UserAgent               string
	HeartbeatInterval       time.Duration
	SampleRate              int32
	TLSV1                   bool
	TLSConfig               *tls.Config
	Deflate                 bool
	DeflateLevel            int
	Snappy                  bool
	OutputBufferSize        int64
	OutputBufferTimeout     time.Duration
	MaxInFlight             int
	MsgTimeout              time.Duration
	AuthSecret              string
}

EmitterConfig carries the different variables to tune a newly started emitter, it exposes the same configuration available from official nsq go client.

type EventListener

type EventListener struct {
	*nsq.Consumer
	// contains filtered or unexported fields
}

func NewConsumer

func NewConsumer(topic, channel string, worker HandlerFunc) *EventListener

func (*EventListener) AddMiddleware

func (el *EventListener) AddMiddleware(fn Middleware)

func (*EventListener) SetConcurrency

func (el *EventListener) SetConcurrency(concurrency int)

func (*EventListener) SetMaxAttempts

func (el *EventListener) SetMaxAttempts(attempts uint16)

func (*EventListener) SetMaxInFlight

func (el *EventListener) SetMaxInFlight(maxInFlight int)

func (*EventListener) Start

func (el *EventListener) Start(environment string) (*nsq.Consumer, error)

type HandlerFunc

type HandlerFunc func(m *Message) (interface{}, error)

type ListenerConfig

type ListenerConfig struct {
	Topic                   string
	Channel                 string
	Lookup                  []string
	HandlerFunc             HandlerFunc
	HandlerConcurrency      int
	DialTimeout             time.Duration
	ReadTimeout             time.Duration
	WriteTimeout            time.Duration
	LocalAddr               net.Addr
	LookupdPollInterval     time.Duration
	LookupdPollJitter       float64
	MaxRequeueDelay         time.Duration
	DefaultRequeueDelay     time.Duration
	BackoffStrategy         nsq.BackoffStrategy
	MaxBackoffDuration      time.Duration
	BackoffMultiplier       time.Duration
	MaxAttempts             uint16
	LowRdyIdleTimeout       time.Duration
	RDYRedistributeInterval time.Duration
	ClientID                string
	Hostname                string
	UserAgent               string
	HeartbeatInterval       time.Duration
	SampleRate              int32
	TLSV1                   bool
	TLSConfig               *tls.Config
	Deflate                 bool
	DeflateLevel            int
	Snappy                  bool
	OutputBufferSize        int64
	OutputBufferTimeout     time.Duration
	MaxInFlight             int
	MsgTimeout              time.Duration
	AuthSecret              string
}

ListenerConfig carries the different variables to tune a newly started consumer, it exposes the same configuration available from official nsq go client.

type Message

type Message struct {
	*nsq.Message
	ReplyTo string
	Payload []byte
	Context context.Context `json:"-"`
}

Message carries nsq.Message fields and methods and adds extra fields for handling messages internally.

func NewMessage

func NewMessage(payload []byte, replyTo string) *Message

NewMessage returns a new bus.Message.

func (*Message) DecodePayload

func (m *Message) DecodePayload(v interface{}) (err error)

DecodePayload deserializes data (as []byte) and creates a new struct passed by parameter.

type Middleware

type Middleware func(HandlerFunc) HandlerFunc

type Service

type Service struct {
	// contains filtered or unexported fields
}

func NewService

func NewService() *Service

func (*Service) AddListener

func (s *Service) AddListener(el *EventListener)

func (*Service) AddNSQD

func (s *Service) AddNSQD(nsqd string)

func (*Service) AddNSQLookupd

func (s *Service) AddNSQLookupd(nsqLookupd string)

func (*Service) SetEnvironment

func (s *Service) SetEnvironment(environment string)

func (*Service) Start

func (s *Service) Start() error

func (*Service) Stop

func (s *Service) Stop()

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL