broker

package
v0.1.2 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Aug 7, 2023 License: MIT Imports: 12 Imported by: 3

Documentation

Index

Constants

View Source
const (
	MessageAct    = "Message-Act"
	MessageId     = "Message-Id"
	MessageFrom   = "Massage-From"
	MeggageTo     = "Massage-To"
	Identifier    = "Identifier"
	Failed        = "Failed"
	MaggageToken  = "Massage-Token"
	ContextCodec  = "Context-Codec"
	TimeStampFrom = "TimeStampFrom"
	TimeStampTo   = "TimeStampTo"

	Host             = "Host"
	Connection       = "Connection"
	ContentType      = "Content-Type"
	TransferEncoding = "Transfer-Encoding"
	AcceptEncoding   = "Accept-Encoding"
	Authorization    = "Authorization"
	ContentLength    = "Content-Length"
	SessionId        = "SessionId"
)

/////////////////////////////////////////////////////////////////////////////

Variables

View Source
var (
	DefaultCodec encoding.Codec = nil
)

Functions

func Marshal

func Marshal(codec encoding.Codec, msg Any) ([]byte, error)

func Unmarshal

func Unmarshal(codec encoding.Codec, inputData []byte, outValue interface{}) error

Types

type Any

type Any interface{}

type Binder

type Binder func() Any

type Broker

type Broker interface {
	Name() string
	Options() Options
	Address() string

	Init(...Option) error

	Connect() error
	ConnectRetry() error
	Disconnect() error

	Publish(topic string, msg Any, opts ...PublishOption) error
	PublishRaw(topic string, msg []byte, opts ...PublishOption) error

	Subscribe(topic string, handler Handler, binder Binder, opts ...SubscribeOption) (Subscriber, error)
}

type EncodeErrorFunc

type EncodeErrorFunc func(error) RespEvent

EncodeErrorFunc is encode error func.

type Event

type Event interface {
	Topic() string
	Message() *Message
	Ack() error
	Error() error
	Data() Any
	Raw() []byte
}

type Handler

type Handler func(context.Context, Event) error

type Headers

type Headers map[string]string

type Message

type Message struct {
	Headers Headers
	Body    Any
}

func (Message) GetHeader

func (m Message) GetHeader(key string) string

func (Message) GetHeaders

func (m Message) GetHeaders() Headers

type Option

type Option func(*Options)

func OptionContextWithValue

func OptionContextWithValue(k, v interface{}) Option

func WithAddress

func WithAddress(addressList ...string) Option

WithAddress set broker address

func WithClientId

func WithClientId(clientId string) Option

WithClientId set broker clientId

func WithCodec

func WithCodec(name string) Option

WithCodec set codec, support: json, proto.

func WithDisconnect

func WithDisconnect(f func()) Option

func WithEnableSecure

func WithEnableSecure(enable bool) Option

func WithErrorHandler

func WithErrorHandler(handler Handler) Option

func WithGlobalPropagator

func WithGlobalPropagator() Option

func WithGlobalTracerProvider

func WithGlobalTracerProvider() Option

func WithLogger

func WithLogger(logger log.Logger) Option

func WithOnConnect

func WithOnConnect(f func()) Option

func WithOptionContext

func WithOptionContext(ctx context.Context) Option

func WithPropagator

func WithPropagator(propagators propagation.TextMapPropagator) Option

func WithTLSConfig

func WithTLSConfig(config *tls.Config) Option

func WithTracerProvider

func WithTracerProvider(provider trace.TracerProvider, tracerName string) Option

type Options

type Options struct {
	Addrs    []string
	ClientId string // 检查是不是发给自己的
	FromTo   bool

	Codec encoding.Codec

	ErrorHandler Handler

	Secure    bool
	TLSConfig *tls.Config

	Context context.Context

	Tracings []tracing.Option

	OnConnect  func()
	Disconncet func()

	Log *log.Helper
	// contains filtered or unexported fields
}

func NewOptions

func NewOptions() Options

func NewOptionsAndApply

func NewOptionsAndApply(opts ...Option) Options

func (*Options) Apply

func (o *Options) Apply(opts ...Option)

func (*Options) Logger

func (o *Options) Logger() *log.Helper

type PublishOption

type PublishOption func(*PublishOptions)

func PublishContextWithValue

func PublishContextWithValue(k, v interface{}) PublishOption

func WithPublishContext

func WithPublishContext(ctx context.Context) PublishOption

type PublishOptions

type PublishOptions struct {
	Context context.Context
}

func NewPublishOptions

func NewPublishOptions(opts ...PublishOption) PublishOptions

func (*PublishOptions) Apply

func (o *PublishOptions) Apply(opts ...PublishOption)

type RespEvent

type RespEvent interface {
	GetBody() Any
}

type SubscribeOption

type SubscribeOption func(*SubscribeOptions)

func DisableAutoAck

func DisableAutoAck() SubscribeOption

func SubscribeContextWithValue

func SubscribeContextWithValue(k, v interface{}) SubscribeOption

func WithQueueName

func WithQueueName(name string) SubscribeOption

func WithSubscribAct

func WithSubscribAct(Act int) SubscribeOption

func WithSubscribeContext

func WithSubscribeContext(ctx context.Context) SubscribeOption

type SubscribeOptions

type SubscribeOptions struct {
	AutoAck bool
	Act     int // 0 :  ,1 : req, 2 : resp, 3 : upload
	Queue   string
	Context context.Context
}

func NewSubscribeOptions

func NewSubscribeOptions(opts ...SubscribeOption) SubscribeOptions

func (*SubscribeOptions) Apply

func (o *SubscribeOptions) Apply(opts ...SubscribeOption)

type Subscriber

type Subscriber interface {
	Options() SubscribeOptions
	Topic() string
	Unsubscribe() error
}

Directories

Path Synopsis
mqtt module

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL