consumer

package
v0.0.0-...-b56df8a Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jul 24, 2017 License: MIT Imports: 7 Imported by: 0

Documentation

Overview

GoogleCloudPubsub is a package that aims to give you easy to test GooglePubsub Consumer interface

## Usage

First of all, you need a Handler struct that will implement the business logic for the Gcloud pubsub subscription consuming:

type ExampleHandler struct {
	// you can use use fields to store messages if you want for example bulk processing
	// messages []consumer.Message
}

// HandleMessage method will be called after a message had beed fetched from the pubsub.
// return error will Nack the message
func (eh *ExampleHandler) HandleMessage(msg consumer.Message) error {
	// eh.messages = append(eh.messages, msg)
	// single element processing can be implemented here
	return nil
}

// Finish method will be called before all messages should be acked
// With this method, you can do bulk actions after the HandleMessage Collected all the elements
func (eh *ExampleHandler) Finish() error {
	defer eh.wg.Done()
	// heavy bulk actions can be implemented here
	return nil
}

// The return value should be the consumer.Handler interface,
// not the actual struct pointer
func NewExampleHandler() consumer.Handler {
	return &ExampleHandler{}
}

Now with your new fancy struct and with it's constructor function, you can begin to use the Consumer

ctx := context.Background()
c := consumer.New(ctx, "example-subscription-name", NewExampleHandler)

If you want to specify further options for the consumer, you can do so with option setters.

ctx := context.Background()
cons := consumer.New(ctx, "example", NewExampleHandler,
		// you can configure the new consumer to use given amount of BatchSize
		// This is the amount that will be passed for the HandleMessage method for a single Handler object
		consumer.SetBatchSizeTo(amount),

		// This will set the Google Pubsub Message Iterators MaxExtensionDuration
		consumer.SetMaxExtensionDurationTo(10*time.Minute),

		// This will configure the consumer to how manny parallel worker should pull from the subscription
		consumer.SetWorkersCountTo(runtime.NumCPU()))

## Testing

When You test your application, Before the Consumer is being initialized, you should turn on Mock mod. When Mock mod enabled, not the original but a Mock consumer will be created when the New method called. It's behavior is alike, but remove the Dependency to use Google Pubsub Emulated Host, and increase the speed for your tests.

Make even the Benchmarking more valuable

func TestConsumerMockingAllPerfect(t *testing.T) {
	consumer.TurnMockModOn()
	defer consumer.TurnMockModOff()

	// consumer creation is the same, and not required to be happen here,
	// this is just an example , that it should be created after the mock mod enabled
	ctx := context.Background()
	c := consumer.New(ctx, "example-subscription-name", NewExampleHandler)
	c.Start()
	defer c.Stop()

	// And this is how you Send Messages to the Mock Consumer
	consumer.MockMessageFeeder["example-subscription-name"] <- []byte(`Hello World!`)

	// super complex business logic testing here

}

Index

Constants

View Source
const (
	// DefaultWorkerCount used to determine the default worker count on start
	DefaultWorkerCount = 1

	// DefaultBatchSize used to determine the default message batch and prefetch count
	DefaultBatchSize = 500
)

Variables

View Source
var MockMessageFeeder map[string]chan []byte

MockMessageFeeder is a channel that can take []byte messages that will be feeded to the mock consumers

Functions

func TurnMockModOff

func TurnMockModOff()

TurnMockModOff will disable the consumer New method to return with Mock struct instead of the real one. This is only for testing purpose!

func TurnMockModOn

func TurnMockModOn()

TurnMockModOn will enable the consumer New method to return with Mock struct instead of the real one. This is only for testing purpose!

Types

type Consumer

type Consumer interface {
	Start()
	Stop()
}

Consumer is a struct that can pull messages from a subscription, and pass values to a handler that can be constucted with a given handlerConstructor

Handler must implement the HandlerConstructor interface and the return value must implement the Handler interface

func New

func New(parent context.Context, subscriptionName string, handlerConstructor HandlerConstructor, opts ...Options) Consumer

New will create a new consumer structure that can handle work

type Handler

type Handler interface {
	HandleMessage(Message) error
	Finish() error
}

Handler is the public interface that must be implemented when using the Consumer object With that the code that use the Consumer can focuse on the business logic instead of the pubsub implementation

type HandlerConstructor

type HandlerConstructor func() Handler

HandlerConstructor function is the required function, that can return a new Handler object that will consume messages during the processing

type Message

type Message interface {
	Done(bool)
	Data() []byte
}

Message is the interface that must be implemented by the Handler HandleMessage method

type MessageWrapper

type MessageWrapper struct {
	// contains filtered or unexported fields
}

MessageWrapper is the wrapper struct for *pubsub.Message so it can be easily tested without the need of the Proxy Server

func (*MessageWrapper) Data

func (mw *MessageWrapper) Data() []byte

Data is the proxy function to retrive *pubsub.Message.Data content

func (*MessageWrapper) Done

func (mw *MessageWrapper) Done(ackType bool)

Done is the proxy function for *pubsub.Message object

type Options

type Options interface {
	Configure(config *config)
}

Options is the public interface when working with options

func SetBatchSizeTo

func SetBatchSizeTo(count int) Options

SetBatchSizeTo will return an options object that can configure the prefetch and message bulk size for the Consumer This is the amount that will be passed for the HandleMessage method for a single Handler object before calling Finish

func SetMaxExtensionDurationTo

func SetMaxExtensionDurationTo(maxExtension time.Duration) Options

SetMaxExtensionDurationTo will return an option that can set the maxExtension duration for google pubsub message iterator

func SetWorkersCountTo

func SetWorkersCountTo(count int) Options

SetWorkersCountTo will return an options object that can configure the workers count for a Consumer

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL