broker

package
v0.2.1 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Nov 10, 2021 License: MIT Imports: 10 Imported by: 3

Documentation

Overview

Package broker defines standard interface for a message broker.

Index

Constants

View Source
const (
	// ContentType is header key for content type.
	ContentType = "content-type"
	// MessageType is header key for type of message's body.
	MessageType = "message-type"
)

Variables

View Source
var File_broker_proto protoreflect.FileDescriptor

Functions

func GetMessageType added in v0.2.0

func GetMessageType(v interface{}) string

GetMessageType return full type name of the given value without pointer indicator (*).

Types

type Broker

type Broker interface {
	// Open establish connection to the target server.
	Open(ctx context.Context) error
	// Publish publish the message to the target topic.
	Publish(ctx context.Context, topic string, m *Message, opts ...PublishOption) error
	// Subscribe subscribe to the topic to consume messages.
	Subscribe(ctx context.Context, topic string, h Handler, opts ...SubscribeOption) (Subscriber, error)
	// Close flush all in-flight messages and close underlying connection.
	// Close allows a context to control the duration
	// of a flush/close call. This context should be non-nil.
	// If a deadline is not set, a default deadline of 5s will be applied.
	Close(context.Context) error
}

Broker is an interface used for asynchronous messaging.

type Event

type Event interface {
	Topic() string
	Message() *Message
	Ack() error
}

Event is given to a subscription handler for processing

type Handler

type Handler = func(Event) error

Handler is used to process messages via a subscription of a topic. The handler is passed a publication interface which contains the message and optional Ack method to acknowledge receipt of the message.

type Message

type Message struct {
	Header map[string]string `` /* 153-byte string literal not displayed */
	Body   []byte            `protobuf:"bytes,2,opt,name=body,proto3" json:"body,omitempty"`
	// contains filtered or unexported fields
}

func Must added in v0.2.1

func Must(m *Message, err error) *Message

Must panics if the given err is not nil.

func NewMessage added in v0.2.0

func NewMessage(message interface{}, contentType string, headers ...string) (*Message, error)

NewMessage create new message from the given information. Message type will be automatically retrieved. ContentType is codec name: json, proto,... which is already registered in advance via encoding.RegisterCodec.

func (*Message) Descriptor deprecated

func (*Message) Descriptor() ([]byte, []int)

Deprecated: Use Message.ProtoReflect.Descriptor instead.

func (*Message) GetBody

func (x *Message) GetBody() []byte

func (*Message) GetContentType added in v0.2.0

func (x *Message) GetContentType() string

GetContentType return content type configured in the message's header. Default to be json.

func (*Message) GetHeader

func (x *Message) GetHeader() map[string]string

func (*Message) GetMessageType added in v0.2.0

func (x *Message) GetMessageType() string

GetMessageType return message type configured in the message's header. Otherwise return empty string.

func (*Message) MarshalToBody added in v0.2.0

func (x *Message) MarshalToBody(v interface{}) error

MarshalToBody marshal the given value to the message body based on the registered content-type and the registered codec. Use json codec for marshal if content-type is empty.

func (*Message) ProtoMessage

func (*Message) ProtoMessage()

func (*Message) ProtoReflect added in v0.1.8

func (x *Message) ProtoReflect() protoreflect.Message

func (*Message) Reset

func (x *Message) Reset()

func (*Message) String

func (x *Message) String() string

func (*Message) UnmarshalBodyTo added in v0.2.0

func (x *Message) UnmarshalBodyTo(v interface{}) error

UnmarshalBodyTo try to unmarshal the body of the message to the given pointer based on the content-type in the message's header. Use json codec for unmarshal if content-type is empty.

type PublishOption

type PublishOption func(*PublishOptions)

PublishOption is a func for config publish options.

type PublishOptions

type PublishOptions struct {
}

PublishOptions is a configuration holder for publish options.

type SubscribeOption

type SubscribeOption func(*SubscribeOptions)

SubscribeOption is a func for config subscription.

func DisableAutoAck

func DisableAutoAck() SubscribeOption

DisableAutoAck will disable auto ack of messages after they have been handled.

func Queue

func Queue(name string) SubscribeOption

Queue sets the name of the queue to share messages on

type SubscribeOptions

type SubscribeOptions struct {
	// AutoAck defaults to true. When a handler returns
	// with a nil error the message is acked.
	AutoAck bool
	// Subscribers with the same queue name
	// will create a shared subscription where each
	// receives a subset of messages.
	Queue string
}

SubscribeOptions is a configuration holder for subscriptions.

func (*SubscribeOptions) Apply

func (op *SubscribeOptions) Apply(opts ...SubscribeOption)

Apply apply the options.

type Subscriber

type Subscriber interface {
	Topic() string
	Unsubscribe() error
}

Subscriber is a convenience return type for the Subscribe method

Directories

Path Synopsis
Package memory provides a message broker using memory.
Package memory provides a message broker using memory.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL