deliver

Version: v0.0.0-...-7cc7cea
Published: Jul 5, 2019 License: MIT Imports: 7 Imported by: 0

README

deliver

go get -u github.com/tomwright/deliver

Publish + consume messages with standard interfaces.

Quick Start

This quick start assumes you have already created your Subscriber or Publisher.

For information on creating those objects, see Implementations below.

Message

Below is an example message that may be published when a user has been created.

import "encoding/json"

// UserCreated implements the Message interface.
type UserCreated struct {
	Username string `json:"username"`
}

func (m *UserCreated) Type() string {
	return "user.created"
}

func (m *UserCreated) Payload() ([]byte, error) {
	return json.Marshal(m)
}

func (m *UserCreated) WithPayload(bytes []byte) error {
	return json.Unmarshal(bytes, m)
}

Publishing
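
A minimal sketch of publishing the UserCreated message through a Publisher. So that it compiles on its own, it redeclares the Message and Publisher interfaces exactly as they appear in the documentation; memoryPublisher and publishUserCreated are hypothetical names introduced for illustration. In real code the publisher would presumably come from deliver.NewKafkaPublisher.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// Message and Publisher mirror the interfaces documented by this package.
// They are redeclared here only so the sketch is self-contained.
type Message interface {
	Type() string
	Payload() ([]byte, error)
	WithPayload(payload []byte) error
}

type Publisher interface {
	Publish(m Message) error
	Close() error
}

// UserCreated is the example message from the Message section.
type UserCreated struct {
	Username string `json:"username"`
}

func (m *UserCreated) Type() string { return "user.created" }

func (m *UserCreated) Payload() ([]byte, error) { return json.Marshal(m) }

func (m *UserCreated) WithPayload(b []byte) error { return json.Unmarshal(b, m) }

// memoryPublisher is a hypothetical in-memory stand-in for a real Publisher.
type memoryPublisher struct {
	published []Message
}

func (p *memoryPublisher) Publish(m Message) error {
	p.published = append(p.published, m)
	return nil
}

func (p *memoryPublisher) Close() error { return nil }

// publishUserCreated publishes a UserCreated message and wraps any error.
// Per the Publisher docs, a returned error means the message was not published.
func publishUserCreated(p Publisher, username string) error {
	if err := p.Publish(&UserCreated{Username: username}); err != nil {
		return fmt.Errorf("publish user.created: %w", err)
	}
	return nil
}

func main() {
	publisher := &memoryPublisher{}
	defer publisher.Close()

	if err := publishUserCreated(publisher, "tom"); err != nil {
		fmt.Println("error:", err)
		return
	}
	fmt.Println("published", len(publisher.published), "message(s)")
}
```

Because publishUserCreated depends only on the Publisher interface, the same call works unchanged with any implementation, including ObservedPublisher in tests.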

Subscribing

Implementations

Kafka

  • The value returned by Message.Type() is used as the topic.
  • Messages are marked as consumed before the given ConsumeFn is executed.

Setup:

brokers := []string{"cmg-local-kafka:9092"}
publisher, err := deliver.NewKafkaPublisher(brokers)
if err != nil {
	log.Fatalf("could not create kafka publisher: %v", err)
}
subscriber := deliver.NewKafkaSubscriber(brokers)

Documentation

Functions

func SubscribeNonBlocking

func SubscribeNonBlocking(ctx context.Context, options SubscribeOptions, subscriber Subscriber, wg *sync.WaitGroup, consumerCount int) chan error

SubscribeNonBlocking starts consumerCount consumers with the same SubscribeOptions without blocking the caller. You must read from the returned error channel (errChan): if nothing reads from it, sends to it will block and you will not be notified if any consumers failed to start.

Types

type ConsumeFn

type ConsumeFn func(messageType string, messageBytes []byte) error

ConsumeFn is a function to handle a consumed message.

type Message

type Message interface {
	// Type returns the type of the message.
	Type() string

	// Payload returns the message payload.
	Payload() ([]byte, error)

	// WithPayload validates and sets the given payload on the message.
	WithPayload(payload []byte) error
}

Message is a message that can be published and consumed.

type ObservedPublisher

type ObservedPublisher struct {
	// contains filtered or unexported fields
}

ObservedPublisher can be used to test the way an application publishes messages.

func NewObservedPublisher

func NewObservedPublisher() *ObservedPublisher

NewObservedPublisher returns a new publisher that can be used when testing.

func (*ObservedPublisher) Clear

func (p *ObservedPublisher) Clear() error

Clear clears out any stored messages.

func (*ObservedPublisher) Close

func (p *ObservedPublisher) Close() error

Close performs no action.

func (*ObservedPublisher) Messages

func (p *ObservedPublisher) Messages() []Message

Messages returns all stored messages.

func (*ObservedPublisher) Publish

func (p *ObservedPublisher) Publish(m Message) error

Publish stores the given message internally so it can be retrieved later on.
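
A sketch of how a publisher of this kind might be used in a test. The observedPublisher type below is a hypothetical re-implementation based only on the documented method set (Publish, Messages, Clear, Close); the internals of the real ObservedPublisher are not shown in this documentation. textMessage and registerUser are also invented for the example.

```go
package main

import "fmt"

// Message and Publisher mirror the documented interfaces.
type Message interface {
	Type() string
	Payload() ([]byte, error)
	WithPayload(payload []byte) error
}

type Publisher interface {
	Publish(m Message) error
	Close() error
}

// textMessage is a hypothetical message with a plain-text payload.
type textMessage struct {
	kind string
	body string
}

func (m *textMessage) Type() string { return m.kind }

func (m *textMessage) Payload() ([]byte, error) { return []byte(m.body), nil }

func (m *textMessage) WithPayload(b []byte) error {
	m.body = string(b)
	return nil
}

// observedPublisher is a hypothetical stand-in that records messages.
type observedPublisher struct {
	messages []Message
}

func (p *observedPublisher) Publish(m Message) error {
	p.messages = append(p.messages, m)
	return nil
}

func (p *observedPublisher) Messages() []Message { return p.messages }

func (p *observedPublisher) Clear() error {
	p.messages = nil
	return nil
}

func (p *observedPublisher) Close() error { return nil }

// registerUser is hypothetical application code under test.
func registerUser(p Publisher, username string) error {
	return p.Publish(&textMessage{kind: "user.created", body: username})
}

func main() {
	p := &observedPublisher{}
	if err := registerUser(p, "tom"); err != nil {
		fmt.Println("error:", err)
		return
	}

	// Inspect what the application published.
	for _, m := range p.Messages() {
		payload, _ := m.Payload()
		fmt.Println(m.Type(), string(payload))
	}

	// Reset between test cases.
	_ = p.Clear()
	fmt.Println("stored after clear:", len(p.Messages()))
}
```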

type Publisher

type Publisher interface {
	// Publish publishes the given message.
	// If an error is returned, the message has not been published.
	Publish(m Message) error

	// Close closes any open connections.
	Close() error
}

Publisher can be used to publish messages.

func NewKafkaPublisher

func NewKafkaPublisher(brokerList []string) (Publisher, error)

NewKafkaPublisher returns a Publisher that will publish messages to kafka.

type SubscribeOptions

type SubscribeOptions struct {
	// ConsumeFn is the function to handle the consumed messages.
	ConsumeFn ConsumeFn
	// A message will only be consumed once per group.
	Group string
	// Types is the set of message types to subscribe the ConsumeFn to.
	Types []string
	// IgnoreErrors defines whether or not errors returned from ConsumeFn will be written to Errors.
	// If this is false, a value must be provided for Errors.
	IgnoreErrors bool
	// Errors will receive any errors returned from ConsumeFn, if IgnoreErrors is false.
	Errors chan<- error
}

SubscribeOptions contains a set of options used when subscribing to Messages.

func (*SubscribeOptions) Validate

func (x *SubscribeOptions) Validate() error

Validate makes sure we have a set of valid options and applies defaults.
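
The one rule stated in the field comments (Errors must be set when IgnoreErrors is false) could be checked along these lines. This is a sketch, not the package's Validate: validateOptions is a hypothetical function, the nil ConsumeFn check is an assumption, and the defaults that the real method applies are not documented here, so none are shown.

```go
package main

import (
	"errors"
	"fmt"
)

// ConsumeFn and SubscribeOptions mirror the documented declarations.
type ConsumeFn func(messageType string, messageBytes []byte) error

type SubscribeOptions struct {
	ConsumeFn    ConsumeFn
	Group        string
	Types        []string
	IgnoreErrors bool
	Errors       chan<- error
}

var (
	errMissingConsumeFn = errors.New("a ConsumeFn is required")
	errMissingErrors    = errors.New("an Errors channel is required when IgnoreErrors is false")
)

// validateOptions is a hypothetical check, not the library's Validate.
func validateOptions(o *SubscribeOptions) error {
	// Assumption: a nil handler is never valid.
	if o.ConsumeFn == nil {
		return errMissingConsumeFn
	}
	// Documented rule: without IgnoreErrors, Errors must be provided.
	if !o.IgnoreErrors && o.Errors == nil {
		return errMissingErrors
	}
	return nil
}

func main() {
	handler := func(messageType string, messageBytes []byte) error { return nil }

	opts := SubscribeOptions{ConsumeFn: handler}
	fmt.Println(validateOptions(&opts))

	opts.IgnoreErrors = true
	fmt.Println(validateOptions(&opts))
}
```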

type Subscriber

type Subscriber interface {
	// Subscribe starts a consumer with the given context.
	//
	// If an error is returned then the consumer has not been started, otherwise you should listen
	// on options.Errors and handle any consumer errors.
	//
	// The consumer will be stopped when the given context is cancelled.
	Subscribe(ctx context.Context, options SubscribeOptions) error
}

Subscriber defines an interface that can be used to consume messages.

func NewKafkaSubscriber

func NewKafkaSubscriber(brokers []string) Subscriber

NewKafkaSubscriber returns a Subscriber that will consume from kafka.

Directories

Path Synopsis
examples
