message

package
v0.1.1 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Nov 12, 2018 License: MIT Imports: 6 Imported by: 0

Documentation

Index

Constants

This section is empty.

Variables

View Source
var (
	ErrAlreadyAcked  = errors.New("message already acked")
	ErrAlreadyNacked = errors.New("message already nacked")
)

Functions

This section is empty.

Types

type HandlerFunc

type HandlerFunc func(msg *Message) ([]*Message, error)

type HandlerMiddleware

type HandlerMiddleware func(h HandlerFunc) HandlerFunc

type Message

type Message struct {
	UUID string // todo - change to []byte?, change to type

	Metadata Metadata

	Payload Payload
	// contains filtered or unexported fields
}

func NewMessage

func NewMessage(uuid string, payload Payload) *Message

func (*Message) Ack

func (m *Message) Ack() error

func (*Message) Acked

func (m *Message) Acked() <-chan struct{}

func (*Message) Nack

func (m *Message) Nack() error

func (*Message) Nacked

func (m *Message) Nacked() <-chan struct{}

type Messages

type Messages []*Message

func (Messages) IDs

func (m Messages) IDs() []string

type Metadata

type Metadata map[string]string

func (Metadata) Get

func (m Metadata) Get(key string) string

func (Metadata) Set

func (m Metadata) Set(key, value string)

type Payload

type Payload []byte

type PubSub

type PubSub interface {
	Close() error
	// contains filtered or unexported methods
}

func NewPubSub

func NewPubSub(publisher Publisher, subscriber Subscriber) PubSub

type Publisher

type Publisher interface {
	Close() error
	// contains filtered or unexported methods
}

type Router

type Router struct {
	// contains filtered or unexported fields
}

func NewRouter

func NewRouter(config RouterConfig, logger watermill.LoggerAdapter) (*Router, error)

func (*Router) AddHandler

func (r *Router) AddHandler(
	handlerName string,
	subscribeTopic string,
	publishTopic string,
	pubSub PubSub,
	handlerFunc HandlerFunc,
) error

func (*Router) AddMiddleware

func (r *Router) AddMiddleware(m ...HandlerMiddleware)

AddMiddleware adds a new middleware to the router.

The order of middlewares matters. Middleware added at the beginning is executed first.

func (*Router) AddNoPublisherHandler

func (r *Router) AddNoPublisherHandler(
	handlerName string,
	subscribeTopic string,
	subscriber Subscriber,
	handlerFunc HandlerFunc,
) error

func (*Router) AddPlugin

func (r *Router) AddPlugin(p ...RouterPlugin)

func (*Router) Close

func (r *Router) Close() error

func (*Router) Logger

func (r *Router) Logger() watermill.LoggerAdapter

func (*Router) Run

func (r *Router) Run() (err error)

func (*Router) Running

func (r *Router) Running() chan struct{}

Running is closed when router is running. In other words: you can wait till router is running using

<- r.Running()

type RouterConfig

type RouterConfig struct {
	ServerName string

	CloseTimeout time.Duration
}

func (RouterConfig) Validate

func (c RouterConfig) Validate() error

type RouterPlugin

type RouterPlugin func(*Router) error

type Subscriber

type Subscriber interface {
	Close() error
	// contains filtered or unexported methods
}

Directories

Path Synopsis
router

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL