broker

package
v1.1.3 Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Mar 9, 2024 License: MIT Imports: 13 Imported by: 34

Documentation

Index

Constants

This section is empty.

Variables

View Source
var (
	DefaultCodec encoding.Codec = nil
)

Functions

func Marshal

func Marshal(codec encoding.Codec, msg Any) ([]byte, error)

func Unmarshal

func Unmarshal(codec encoding.Codec, inputData []byte, outValue interface{}) error

Types

type Any

type Any interface{}

type Binder

type Binder func() Any

type Broker

type Broker interface {
	Name() string

	Options() Options

	Address() string

	Init(...Option) error

	Connect() error

	Disconnect() error

	Publish(ctx context.Context, topic string, msg Any, opts ...PublishOption) error

	Subscribe(topic string, handler Handler, binder Binder, opts ...SubscribeOption) (Subscriber, error)
}

type Event

type Event interface {
	Topic() string

	Message() *Message
	RawMessage() interface{}

	Ack() error

	Error() error
}

type Handler

type Handler func(ctx context.Context, evt Event) error

type Headers

type Headers map[string]string

type Message

type Message struct {
	Headers Headers
	Body    Any
}

func (Message) GetHeader

func (m Message) GetHeader(key string) string

func (Message) GetHeaders

func (m Message) GetHeaders() Headers

type Option

type Option func(*Options)

func OptionContextWithValue

func OptionContextWithValue(k, v interface{}) Option

func WithAddress

func WithAddress(addressList ...string) Option

WithAddress sets the broker address list.

func WithCodec

func WithCodec(name string) Option

WithCodec sets the codec by name. Supported names: json, proto.

func WithEnableSecure

func WithEnableSecure(enable bool) Option

func WithErrorHandler

func WithErrorHandler(handler Handler) Option

func WithGlobalPropagator

func WithGlobalPropagator() Option

func WithGlobalTracerProvider

func WithGlobalTracerProvider() Option

func WithOptionContext

func WithOptionContext(ctx context.Context) Option

func WithPropagator

func WithPropagator(propagators propagation.TextMapPropagator) Option

func WithTLSConfig

func WithTLSConfig(config *tls.Config) Option

func WithTracerProvider

func WithTracerProvider(provider trace.TracerProvider, tracerName string) Option

type Options

type Options struct {
	Addrs []string

	Codec encoding.Codec

	ErrorHandler Handler

	Secure    bool
	TLSConfig *tls.Config

	Context context.Context

	Tracings []tracing.Option
}

func NewOptions

func NewOptions() Options

func NewOptionsAndApply

func NewOptionsAndApply(opts ...Option) Options

func (*Options) Apply

func (o *Options) Apply(opts ...Option)

type PublishOption

type PublishOption func(*PublishOptions)

func PublishContextWithValue

func PublishContextWithValue(k, v interface{}) PublishOption

func WithPublishContext

func WithPublishContext(ctx context.Context) PublishOption

type PublishOptions

type PublishOptions struct {
	Context context.Context
}

func NewPublishOptions

func NewPublishOptions(opts ...PublishOption) PublishOptions

func (*PublishOptions) Apply

func (o *PublishOptions) Apply(opts ...PublishOption)

type SubscribeOption

type SubscribeOption func(*SubscribeOptions)

func DisableAutoAck

func DisableAutoAck() SubscribeOption

func SubscribeContextWithValue

func SubscribeContextWithValue(k, v interface{}) SubscribeOption

func WithQueueName

func WithQueueName(name string) SubscribeOption

func WithSubscribeContext

func WithSubscribeContext(ctx context.Context) SubscribeOption

type SubscribeOptions

type SubscribeOptions struct {
	AutoAck bool
	Queue   string
	Context context.Context
}

func NewSubscribeOptions

func NewSubscribeOptions(opts ...SubscribeOption) SubscribeOptions

func (*SubscribeOptions) Apply

func (o *SubscribeOptions) Apply(opts ...SubscribeOption)

type Subscriber

type Subscriber interface {
	// Options returns the SubscribeOptions of this subscriber.
	Options() SubscribeOptions

	// Topic returns the subscriber's topic as a string.
	Topic() string

	// Unsubscribe .
	Unsubscribe(removeFromManager bool) error
}

Subscriber exposes a subscription's options and topic, and provides a method to unsubscribe.

func Subscribe added in v1.0.9

func Subscribe[T any](broker Broker, topic string, handler func(context.Context, string, Headers, *T) error, opts ...SubscribeOption) (Subscriber, error)

type SubscriberMap added in v1.1.0

type SubscriberMap map[string]Subscriber

type SubscriberSyncMap added in v1.1.0

type SubscriberSyncMap struct {
	sync.RWMutex
	// contains filtered or unexported fields
}

func NewSubscriberSyncMap added in v1.1.0

func NewSubscriberSyncMap() *SubscriberSyncMap

func (*SubscriberSyncMap) Add added in v1.1.0

func (sm *SubscriberSyncMap) Add(topic string, sub Subscriber)

func (*SubscriberSyncMap) Clear added in v1.1.0

func (sm *SubscriberSyncMap) Clear()

func (*SubscriberSyncMap) ForceClear added in v1.1.0

func (sm *SubscriberSyncMap) ForceClear()

func (*SubscriberSyncMap) Foreach added in v1.1.0

func (sm *SubscriberSyncMap) Foreach(fnc func(topic string, sub Subscriber))

func (*SubscriberSyncMap) Get added in v1.1.0

func (sm *SubscriberSyncMap) Get(topic string) Subscriber

func (*SubscriberSyncMap) Remove added in v1.1.0

func (sm *SubscriberSyncMap) Remove(topic string) error

func (*SubscriberSyncMap) RemoveOnly added in v1.1.3

func (sm *SubscriberSyncMap) RemoveOnly(topic string) bool

Directories

Path Synopsis
kafka module
mqtt module
nats module
nsq module
pulsar module
rabbitmq module
redis module
rocketmq module
stomp module

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL