bus

package
v0.7.0
Published: Jan 5, 2023 License: MIT Imports: 7 Imported by: 39

README

Bus

A bus implementation is more than a simple integration with a messaging technology.

Specifically, a bus implementation must ensure that:

  • tasks are read lazily (one at a time, only when requested)
  • lazy loading produces no side effects

Lazy Loading

Many messaging technologies are built for high throughput and will greedily retrieve the next message from a topic. Task bus readers must make sure that only a single task message is retrieved at a time, even if this means disconnecting from and reconnecting to the messaging technology each time a new task is requested. Remember that in batch processing the bottleneck is not fetching the next task but completing the task itself. For high-throughput tasks, run multiple instances of the worker.
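The lazy-loading requirement can be illustrated with a minimal sketch. The `source` and `lazyConsumer` types below are hypothetical stand-ins written for this example, not part of the package; they only show that each call to Msg() pulls exactly one message and nothing is fetched ahead of time.

```go
package main

import "fmt"

// source is a hypothetical stand-in for a messaging client that can
// fetch exactly one message on demand.
type source struct {
	pending []string
	fetched int // number of messages pulled from the backend so far
}

// fetchOne pulls a single message; ok is false when nothing is left.
func (s *source) fetchOne() (msg []byte, ok bool) {
	if len(s.pending) == 0 {
		return nil, false
	}
	msg = []byte(s.pending[0])
	s.pending = s.pending[1:]
	s.fetched++
	return msg, true
}

// lazyConsumer retrieves a message only when Msg is called,
// never ahead of time.
type lazyConsumer struct {
	src *source
}

// Msg fetches exactly one message per call. done is true once the
// source has nothing left.
func (c *lazyConsumer) Msg() (msg []byte, done bool, err error) {
	msg, ok := c.src.fetchOne()
	if !ok {
		return nil, true, nil
	}
	return msg, false, nil
}

func main() {
	src := &source{pending: []string{"task-1", "task-2", "task-3"}}
	c := &lazyConsumer{src: src}

	msg, done, _ := c.Msg()
	// Only one message has left the source; the other two are untouched.
	fmt.Printf("msg=%s done=%v fetched=%d remaining=%d\n",
		msg, done, src.fetched, len(src.pending))
}
```

A real bus would replace `fetchOne` with whatever call its messaging client offers for retrieving a single message, which may involve connecting and disconnecting per request as described above.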

Test Cases

  • Consumer messages are lazy loaded
  • Stop() can be called even when multiple calls are waiting for tasks
  • Stop() is non-blocking (even with outstanding Msg() or Send() requests)
  • Stop() is safe to call multiple times
  • Msg() is safe to call even after calling Stop()
  • Stop() is safe to call even if Msg() or Send() have not been called yet
  • Once the consumer returns done=true, all subsequent calls to Msg() also return done=true.
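One way to satisfy the Stop()-related cases above is sketched here. `stoppableConsumer` is a hypothetical channel-backed type invented for this illustration; the package's own bus implementations may be structured differently. It assumes `sync.Once` for idempotent stopping and a `quit` channel to release blocked Msg() calls.

```go
package main

import (
	"fmt"
	"sync"
)

// stoppableConsumer is a hypothetical channel-backed consumer used only
// to illustrate the Stop/Msg behaviors listed in the test cases.
type stoppableConsumer struct {
	msgs     chan []byte   // incoming messages; closed by the feeder when exhausted
	quit     chan struct{} // closed exactly once by Stop
	stopOnce sync.Once
}

func newStoppableConsumer(msgs chan []byte) *stoppableConsumer {
	return &stoppableConsumer{msgs: msgs, quit: make(chan struct{})}
}

// Msg blocks until a message arrives or Stop is called.
func (c *stoppableConsumer) Msg() (msg []byte, done bool, err error) {
	// Check quit first so a stopped consumer never hands out more messages.
	select {
	case <-c.quit:
		return nil, true, nil
	default:
	}
	select {
	case m, ok := <-c.msgs:
		if !ok {
			return nil, true, nil // source exhausted
		}
		return m, false, nil
	case <-c.quit:
		return nil, true, nil
	}
}

// Stop never blocks and is safe to call any number of times.
func (c *stoppableConsumer) Stop() error {
	c.stopOnce.Do(func() { close(c.quit) })
	return nil
}

func main() {
	c := newStoppableConsumer(make(chan []byte))

	// Two callers wait for tasks that never arrive.
	var wg sync.WaitGroup
	for i := 0; i < 2; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			c.Msg() // returns once Stop is called
		}()
	}

	c.Stop()
	c.Stop() // second call is a no-op
	wg.Wait()

	_, done, err := c.Msg() // safe after Stop
	fmt.Println(done, err)
}
```

Closing a channel inside `sync.Once` releases every waiting caller at once and keeps repeated Stop() calls from panicking on a double close.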

Documentation

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

func Topics added in v0.5.0

func Topics(opt *Options) (topics []string, err error)

Topics returns a list of the topics for the given bus.

Types

type Bus

type Bus struct {
	// contains filtered or unexported fields
}

Bus combines a consumer and producer into a single struct and implements both the Consumer and Producer interfaces.

Bus is the same as getting a Consumer and Producer separately but having them available in a single object.

Calling Stop() will stop the producer first then the consumer.
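The stop ordering can be sketched as follows. Everything in this example is hypothetical: `combined`, the local `consumer` and `producer` interfaces (Info() omitted for brevity), and the fakes are written for illustration and are not the package's implementation. Returning early when the producer fails to stop is also an assumption; the real Bus may handle errors differently.

```go
package main

import "fmt"

// Local, trimmed copies of the documented method sets (assumption:
// Info() is left out to keep the sketch short).
type consumer interface {
	Msg() (msg []byte, done bool, err error)
	Stop() error
}

type producer interface {
	Send(topic string, msg []byte) error
	Stop() error
}

// combined is a hypothetical type that wraps one consumer and one producer.
type combined struct {
	c consumer
	p producer
}

func (b *combined) Msg() ([]byte, bool, error)          { return b.c.Msg() }
func (b *combined) Send(topic string, msg []byte) error { return b.p.Send(topic, msg) }

// Stop stops the producer first, then the consumer.
// Returning early on a producer error is an assumption of this sketch.
func (b *combined) Stop() error {
	if err := b.p.Stop(); err != nil {
		return err
	}
	return b.c.Stop()
}

// recorder notes the order in which Stop is called on the fakes.
type recorder struct{ calls []string }

type fakeConsumer struct{ r *recorder }

func (f fakeConsumer) Msg() ([]byte, bool, error) { return nil, true, nil }
func (f fakeConsumer) Stop() error {
	f.r.calls = append(f.r.calls, "consumer")
	return nil
}

type fakeProducer struct{ r *recorder }

func (f fakeProducer) Send(string, []byte) error { return nil }
func (f fakeProducer) Stop() error {
	f.r.calls = append(f.r.calls, "producer")
	return nil
}

func main() {
	r := &recorder{}
	b := &combined{c: fakeConsumer{r}, p: fakeProducer{r}}
	b.Stop()
	fmt.Println(r.calls) // [producer consumer]
}
```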

func NewBus

func NewBus(opt *Options) (*Bus, error)

NewBus returns an instance of Bus.

func (*Bus) Msg

func (b *Bus) Msg() (msg []byte, done bool, err error)

func (*Bus) Send

func (b *Bus) Send(topic string, msg []byte) error

func (*Bus) Stop

func (b *Bus) Stop() error

type Consumer

type Consumer interface {
	// Msg returns the bus msg bytes. If msg is
	// known to be the last then done should be true. msg may be
	// nil and done can be true. err should never be
	// io.EOF.
	//
	// Once the last message has been received, subsequent
	// calls to Msg should not block and always return
	// msg == nil (or len == 0), done == true and err == nil.
	//
	// A call to Msg should block until either a msg
	// is received or Stop has been called.
	//
	// Once Stop has been called subsequent calls to Msg
	// should not block and immediately return with
	// msg == nil (or len == 0), done == true and err == nil.
	Msg() (msg []byte, done bool, err error)
	Stop() error
	Info() info.Consumer
}
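A read loop that respects this contract might look like the sketch below. `msgSource`, `sliceConsumer`, and `drain` are hypothetical names introduced for the example. Because msg may be non-nil on the same call that returns done == true, the loop handles the message before checking done.

```go
package main

import "fmt"

// msgSource is a local, trimmed copy of the documented Msg method
// (assumption: Stop and Info are not needed for this sketch).
type msgSource interface {
	Msg() (msg []byte, done bool, err error)
}

// sliceConsumer is a hypothetical in-memory consumer. It returns the
// last message together with done=true, which the contract permits.
type sliceConsumer struct{ msgs [][]byte }

func (s *sliceConsumer) Msg() ([]byte, bool, error) {
	if len(s.msgs) == 0 {
		return nil, true, nil
	}
	m := s.msgs[0]
	s.msgs = s.msgs[1:]
	return m, len(s.msgs) == 0, nil
}

// drain reads until done, making sure a final message that arrives
// alongside done=true is still handled.
func drain(c msgSource, handle func([]byte)) error {
	for {
		msg, done, err := c.Msg()
		if err != nil {
			return err
		}
		if len(msg) > 0 {
			handle(msg)
		}
		if done {
			return nil
		}
	}
}

func main() {
	c := &sliceConsumer{msgs: [][]byte{[]byte("a"), []byte("b")}}
	var got []string
	if err := drain(c, func(m []byte) { got = append(got, string(m)) }); err != nil {
		fmt.Println("error:", err)
		return
	}
	fmt.Println(got) // [a b]
}
```

Checking `len(msg) > 0` rather than `msg != nil` covers both forms of an empty message that the interface comment allows.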

func NewConsumer

func NewConsumer(opt *Options) (Consumer, error)

NewConsumer creates a bus consumer from the given Options.

type Options

type Options struct {
	// Possible Values:
	// - "stdio" (generic stdin, stdout)
	// - "stdin" (for consumer)
	// - "stdout" (for producer)
	// - "stderr" (for producer)
	// - "null" (for producer)
	// - "file"
	// - "nsq"
	// - "pubsub"
	// - "nop" - no-operation bus for testing
	Bus    string `toml:"bus" comment:"task message bus (nsq, pubsub, file, stdio)"`
	InBus  string `toml:"in_bus" commented:"true" comment:"set a different consumer bus type than producer (nsq, pubsub, file, stdin)"`
	OutBus string `` /* 131-byte string literal not displayed */

	// consumer topic and channel
	InTopic   string `toml:"in_topic" commented:"true" comment:"for file bus in_topic is a file name"`
	InChannel string `toml:"in_channel" commented:"true" comment:"for pubsub this is the subscription name"`

	// for "nsq" bus type
	NSQdHosts    []string `toml:"nsqd_hosts" commented:"true" comment:"nsqd host names for producer or consumer"`
	LookupdHosts []string `toml:"lookupd_hosts" commented:"true" comment:"nsq lookupd host names consumer only"`

	// for "pubsub" bus type
	PubsubHost string `toml:"pubsub_host" commented:"true" comment:"pubsub host only for emulator"`
	ProjectID  string `toml:"project_id" commented:"true" comment:"pubsub google project name"`
	JSONAuth   string `toml:"json_auth" commented:"true" comment:"pubsub json data for authentication"`

	// NopMock for "nop" bus type,
	// Can be set in order to
	// mock various return scenarios.
	//
	// Supported Values:
	// - "init_err" - returns err on initialization: either NewProducer or NewConsumer
	// - "err" - every method returns an error
	// - "send_err" - returns err when Producer.Send() is called.
	// - "msg_err" - returns err on Consumer.Msg() call.
	// - "msg_done" - returns a nil task message and done=true on Consumer.Msg() call.
	// - "msg_msg_done" - returns a non-nil task message and done=true on Consumer.Msg() call.
	// - "stop_err" - returns err on Stop() method call
	NopMock string `toml:"-"`
}

Options is a general config struct that provides all potential config values for all bus types.

func NewOptions

func NewOptions(bus string) *Options

type Producer

type Producer interface {
	Send(topic string, msg []byte) error
	Stop() error
	Info() info.Producer
}

func NewProducer

func NewProducer(opt *Options) (Producer, error)

NewProducer creates a bus producer from the given Options.
