nsq-event-bus: github.com/rafaeljesus/nsq-event-bus

package bus

import "github.com/rafaeljesus/nsq-event-bus"

Index

Package Files

config.go emitter.go listener.go message.go

Variables

var (
    // ErrTopicRequired is returned when topic is not passed as parameter.
    ErrTopicRequired = errors.New("topic is mandatory")
    // ErrHandlerRequired is returned when handler is not passed as parameter.
    ErrHandlerRequired = errors.New("handler is mandatory")
    // ErrChannelRequired is returned when channel is not passed as parameter in bus.ListenerConfig.
    ErrChannelRequired = errors.New("channel is mandatory")
)

func On

func On(lc ListenerConfig) error

On listens for messages on a specific topic using an nsq consumer. It returns an error if the topic or channel is not passed, or if an error occurred while creating the nsq consumer.

type Breaker

type Breaker struct {
    // Interval is the cyclic period of the closed state after which CircuitBreaker clears the internal counts.
    // If Interval is 0, CircuitBreaker doesn't clear the internal counts during the closed state.
    Interval time.Duration
    // Timeout is the period of the open state, after which the state of CircuitBreaker becomes half-open.
    // If Timeout is 0, the timeout value of CircuitBreaker is set to 60 seconds.
    Timeout time.Duration
    // Threshold is the number of failures after which future calls to the broker will not run.
    // While in this state, the circuit breaker periodically allows a call to run and, if it succeeds,
    // resumes running the function. Default value is 5.
    Threshold uint32
    // OnStateChange is called whenever the state of CircuitBreaker changes.
    OnStateChange func(name, from, to string)
}

Breaker carries the configuration for the circuit breaker.

type Emitter

type Emitter struct {
    // contains filtered or unexported fields
}

Emitter is the emitter wrapper over nsq.

func NewEmitter

func NewEmitter(ec EmitterConfig) (*Emitter, error)

NewEmitter returns a new Emitter configured with the variables from the config parameter, or a non-nil error if an error occurred while creating the nsq producer.

func (*Emitter) Emit

func (e *Emitter) Emit(topic string, payload interface{}) error

Emit emits a message to a specific topic using nsq producer, returning an error if encoding payload fails or if an error occurred while publishing the message.

func (*Emitter) EmitAsync

func (e *Emitter) EmitAsync(topic string, payload interface{}) error

EmitAsync emits a message to a specific topic using the nsq producer, but does not wait for the response from `nsqd`. It returns an error if encoding the payload fails, and logs to the console if an error occurred while publishing the message.

func (*Emitter) Request

func (e *Emitter) Request(topic string, payload interface{}, handler HandlerFunc) error

Request is an RPC-like method that implements the request/reply pattern using an nsq producer and consumer. It returns a non-nil error if an error occurred while creating or listening to the internal reply topic, if encoding the message payload fails, or if publishing the message fails.
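The request/reply flow can be illustrated with a toy in-memory broker. This is a sketch under stated assumptions, not how the package talks to NSQ: memBus, message, and handlerFunc are hypothetical local types, delivery here is synchronous, and the ".reply" topic naming is invented for the example.

```go
package main

import "fmt"

// message is a hypothetical stand-in for bus.Message.
type message struct {
	ReplyTo string
	Payload []byte
}

// handlerFunc mirrors the shape of bus.HandlerFunc.
type handlerFunc func(m *message) (interface{}, error)

// memBus is a toy in-memory broker; real NSQ delivery is asynchronous.
type memBus struct {
	handlers map[string]handlerFunc
}

// on registers a handler for a topic.
func (b *memBus) on(topic string, h handlerFunc) { b.handlers[topic] = h }

// emit delivers the message and, when ReplyTo is set, forwards the
// handler's return value to the reply topic.
func (b *memBus) emit(topic string, m *message) error {
	h, ok := b.handlers[topic]
	if !ok {
		return fmt.Errorf("no listener on %q", topic)
	}
	reply, err := h(m)
	if err != nil {
		return err
	}
	if m.ReplyTo == "" || reply == nil {
		return nil
	}
	return b.emit(m.ReplyTo, &message{Payload: []byte(fmt.Sprint(reply))})
}

// request registers the caller's handler on a reply topic, then emits the
// request with ReplyTo pointing at it.
func (b *memBus) request(topic string, payload []byte, h handlerFunc) error {
	replyTo := topic + ".reply" // naming scheme is an assumption
	b.on(replyTo, h)
	return b.emit(topic, &message{ReplyTo: replyTo, Payload: payload})
}

func main() {
	b := &memBus{handlers: map[string]handlerFunc{}}

	// Responder: its return value becomes the reply payload.
	b.on("greet", func(m *message) (interface{}, error) {
		return "hello " + string(m.Payload), nil
	})

	// Requester: the handler receives the reply.
	b.request("greet", []byte("nsq"), func(m *message) (interface{}, error) {
		fmt.Println(string(m.Payload)) // hello nsq
		return nil, nil
	})
}
```

The key idea is that the request carries the name of a reply topic, and the responder's handler result is published back to it.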

type EmitterConfig

type EmitterConfig struct {
    Address                 string
    DialTimeout             time.Duration
    ReadTimeout             time.Duration
    WriteTimeout            time.Duration
    LocalAddr               net.Addr
    LookupdPollInterval     time.Duration
    LookupdPollJitter       float64
    MaxRequeueDelay         time.Duration
    DefaultRequeueDelay     time.Duration
    BackoffStrategy         nsq.BackoffStrategy
    MaxBackoffDuration      time.Duration
    BackoffMultiplier       time.Duration
    MaxAttempts             uint16
    LowRdyIdleTimeout       time.Duration
    RDYRedistributeInterval time.Duration
    ClientID                string
    Hostname                string
    UserAgent               string
    HeartbeatInterval       time.Duration
    SampleRate              int32
    TLSV1                   bool
    TLSConfig               *tls.Config
    Deflate                 bool
    DeflateLevel            int
    Snappy                  bool
    OutputBufferSize        int64
    OutputBufferTimeout     time.Duration
    MaxInFlight             int
    MsgTimeout              time.Duration
    AuthSecret              string
    // Breaker circuit breaker configuration
    Breaker
}

EmitterConfig carries the different variables to tune a newly started emitter. It exposes the same configuration available from the official nsq Go client.

type HandlerFunc

type HandlerFunc func(m *Message) (interface{}, error)

HandlerFunc is the handler function to handle the message.

type ListenerConfig

type ListenerConfig struct {
    Topic                   string
    Channel                 string
    Lookup                  []string
    HandlerFunc             HandlerFunc
    HandlerConcurrency      int
    DialTimeout             time.Duration
    ReadTimeout             time.Duration
    WriteTimeout            time.Duration
    LocalAddr               net.Addr
    LookupdPollInterval     time.Duration
    LookupdPollJitter       float64
    MaxRequeueDelay         time.Duration
    DefaultRequeueDelay     time.Duration
    BackoffStrategy         nsq.BackoffStrategy
    MaxBackoffDuration      time.Duration
    BackoffMultiplier       time.Duration
    MaxAttempts             uint16
    LowRdyIdleTimeout       time.Duration
    RDYRedistributeInterval time.Duration
    ClientID                string
    Hostname                string
    UserAgent               string
    HeartbeatInterval       time.Duration
    SampleRate              int32
    TLSV1                   bool
    TLSConfig               *tls.Config
    Deflate                 bool
    DeflateLevel            int
    Snappy                  bool
    OutputBufferSize        int64
    OutputBufferTimeout     time.Duration
    MaxInFlight             int
    MsgTimeout              time.Duration
    AuthSecret              string
}

ListenerConfig carries the different variables to tune a newly started consumer. It exposes the same configuration available from the official nsq Go client.

type Message

type Message struct {
    *nsq.Message
    ReplyTo string
    Payload []byte
}

Message carries nsq.Message fields and methods and adds extra fields for handling messages internally.

func NewMessage

func NewMessage(p []byte, r string) *Message

NewMessage returns a new bus.Message.

func (*Message) DecodePayload

func (m *Message) DecodePayload(v interface{}) (err error)

DecodePayload deserializes the message payload (a []byte) into the value passed as parameter.
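A handler would typically decode the payload into its own struct. The sketch below shows the idea with hypothetical local types (message, userCreated) and assumes the payload is JSON, which the documentation above does not state.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// message is a hypothetical stand-in for bus.Message, reduced to the payload.
type message struct {
	Payload []byte
}

// decodePayload assumes the payload is JSON; the package documentation
// does not name the encoding.
func (m *message) decodePayload(v interface{}) error {
	return json.Unmarshal(m.Payload, v)
}

// userCreated is an example event type invented for this sketch.
type userCreated struct {
	ID   int    `json:"id"`
	Name string `json:"name"`
}

func main() {
	m := &message{Payload: []byte(`{"id":7,"name":"ada"}`)}

	var evt userCreated
	// Pass a pointer so the decoder can fill in the value.
	if err := m.decodePayload(&evt); err != nil {
		fmt.Println("decode failed:", err)
		return
	}
	fmt.Printf("%+v\n", evt) // {ID:7 Name:ada}
}
```

Passing a non-pointer value would make the decode step fail, so the destination should always be addressable.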

Package bus imports 14 packages. Updated 2019-03-27. This is an inactive package (no imports and no commits in at least two years).