mq

package
v1.1.14 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Apr 2, 2024 License: Apache-2.0 Imports: 44 Imported by: 0

Documentation

Index

Constants

View Source
// Sentinel errors exposed by the mq package. Each one is a constant of type
// utils.Error initialised from the message string shown next to it.
const (
	// ErrDuplicatedSubscriberName: a subscriber name is duplicated.
	ErrDuplicatedSubscriberName utils.Error = "duplicated mq subscriber name"
	// ErrDuplicatedPublisherName: a publisher name is duplicated.
	ErrDuplicatedPublisherName  utils.Error = "duplicated mq publisher name"
	// ErrDuplicatedRouterName: a router name is duplicated.
	ErrDuplicatedRouterName     utils.Error = "duplicated mq router name"
	// ErrEventHandlerConflict: an event handler and a message handler conflict.
	// NOTE(review): presumably raised when both kinds are registered on one router — confirm.
	ErrEventHandlerConflict     utils.Error = "conflict with event handler and message handler"
	// ErrNotImplement: the requested mq functionality is not implemented.
	ErrNotImplement             utils.Error = "mq not implement"
)

Variables

This section is empty.

Functions

func AppName

func AppName(name string) utils.OptionFunc[useOption]

func Async

func Async(strategies ...strategy.Strategy) utils.OptionFunc[pubOption]

func ChannelLen

func ChannelLen(channelLength int) utils.OptionFunc[subOption]

func Construct

func Construct(ctx context.Context, confs map[string]*Conf, opts ...utils.OptionExtender) func()

func EventHandler

func EventHandler[T eventual](hdr eventHandler[T]) eventHandler[T]

func EventHandlerWithMsg

func EventHandlerWithMsg[T eventual](hdr eventHandlerWithMsg[T]) eventHandlerWithMsg[T]

func Events

func Events[T eventual](events ...Event[T]) utils.OptionFunc[eventPubOption[T]]

func Messages

func Messages(messages ...Message) utils.OptionFunc[pubOption]

func NewEventPublisherDI

func NewEventPublisherDI[T eventual](name string, opts ...utils.OptionExtender) func() EventPublisher[T]

func NewEventSubscriberDI

func NewEventSubscriberDI[T eventual](name string, opts ...utils.OptionExtender) func() EventSubscriber[T]

func Objects

func Objects[T any](objectUUIDGenFunc func(T) string, objects ...any) utils.OptionFunc[pubOption]

Types

type Conf

type Conf struct {
	// Topic, backend type and producer/consumer role settings.
	// Producer carries a `default:"true"` tag; Consumer has no default tag.
	// mqType and endpointConf are unexported types declared elsewhere in the package.
	Topic               string        `yaml:"topic" json:"topic" toml:"topic"`
	Type                mqType        `yaml:"type" json:"type" toml:"type"`
	Producer            bool          `yaml:"producer" json:"producer" toml:"producer" default:"true"`
	Consumer            bool          `yaml:"consumer" json:"consumer" toml:"consumer"`
	ConsumerGroup       string        `yaml:"consumer_group" json:"consumer_group" toml:"consumer_group"`
	ConsumerConcurrency int           `yaml:"consumer_concurrency" json:"consumer_concurrency" toml:"consumer_concurrency"`
	Endpoint            *endpointConf `yaml:"endpoint" json:"endpoint" toml:"endpoint"`
	Persistent          bool          `yaml:"persistent" json:"persistent" toml:"persistent"`
	SerializeType       string        `yaml:"serialize_type" json:"serialize_type" toml:"serialize_type"`
	CompressType        string        `yaml:"compress_type" json:"compress_type" toml:"compress_type"`

	// Logging settings. Per the `default` tags: logging is off, the logger type is
	// customlogger.mqLogger, and the log instance is named "default".
	EnableLogger bool   `yaml:"enable_logger" json:"enable_logger" toml:"enable_logger" default:"false"`
	Logger       string `yaml:"logger" json:"logger" toml:"logger" default:"github.com/wfusion/gofusion/log/customlogger.mqLogger"`
	LogInstance  string `yaml:"log_instance" json:"log_instance" toml:"log_instance" default:"default"`

	// mongo, mysql, mariadb option
	// The defaults are the watermill_* names in the tags.
	// NOTE(review): presumably table/collection names used by those backends — confirm.
	MessageScheme  string `yaml:"message_scheme" json:"message_scheme" toml:"message_scheme" default:"watermill_message"`
	SeriesScheme   string `yaml:"series_scheme" json:"series_scheme" toml:"series_scheme" default:"watermill_series"`
	ConsumerScheme string `yaml:"consumer_scheme" json:"consumer_scheme" toml:"consumer_scheme" default:"watermill_subscriber"`

	// ConsumeMiddlewares holds middleware configuration entries (middlewareConf is
	// declared elsewhere). NOTE(review): presumably applied on the consume path — confirm.
	ConsumeMiddlewares []*middlewareConf `yaml:"consume_middlewares" json:"consume_middlewares" toml:"consume_middlewares"`
}

Conf is the mq configuration. nolint: revive // struct tag too long issue

type Event

// Event is a generic event whose payload has type T (constrained by eventual,
// which is declared elsewhere in the package).
type Event[T eventual] interface {
	// ID returns the event identifier as a string.
	ID() string
	// Type returns the event type as a string.
	// NOTE(review): presumably distinguishes created/updated/deleted events — confirm.
	Type() string
	// CreatedAt, UpdatedAt and DeletedAt return the event's timestamps.
	// NOTE(review): presumably zero-valued when not set by the constructor used — confirm.
	CreatedAt() time.Time
	UpdatedAt() time.Time
	DeletedAt() time.Time
	// Payload returns the typed payload.
	Payload() T
	// Context returns the context associated with the event.
	Context() context.Context
	// Ack and Nack each return a bool.
	// NOTE(review): presumably reports whether the (negative) acknowledgement was accepted — confirm.
	Ack() bool
	Nack() bool
}

func EventCreated

func EventCreated[T eventual](id string, createdAt time.Time, payload T) Event[T]

func EventDeleted

func EventDeleted[T eventual](id string, deletedAt time.Time, payload T) Event[T]

func EventUpdated

func EventUpdated[T eventual](id string, updatedAt time.Time, payload T) Event[T]

func NewEvent

func NewEvent[T eventual](id string, createdAt, updatedAt, deletedAt time.Time, payload T) Event[T]

func UntimedEvent

func UntimedEvent[T eventual](id string, payload T) Event[T]

type EventPublisher

// EventPublisher publishes events whose payload has type T.
type EventPublisher[T eventual] interface {
	// PublishEvent publishes the provided events to the topic.
	// The signature takes no topic or event arguments.
	// NOTE(review): presumably both come from configuration and opts (e.g. Events) — confirm.
	//
	// PublishEvent can be synchronous or asynchronous; it depends on the implementation.
	//
	// Most publisher implementations don't support atomic publishing of messages.
	// This means that if publishing one of the messages fails, the following messages will not be published.
	//
	// PublishEvent must be thread-safe.
	PublishEvent(ctx context.Context, opts ...utils.OptionExtender) error
}

func NewEventPublisher

func NewEventPublisher[T eventual](name string, opts ...utils.OptionExtender) EventPublisher[T]

type EventSubscriber

// EventSubscriber delivers events whose payload has type T.
type EventSubscriber[T eventual] interface {
	// SubscribeEvent returns an output channel with events from the subscribed topic.
	// The channel is closed when Close() is called on the subscriber.
	// NOTE(review): Close() is not part of this interface as shown — confirm where it is exposed.
	//
	// When the provided ctx is cancelled, the subscriber stops subscribing and closes the output channel.
	// The provided ctx is set on all produced messages.
	SubscribeEvent(ctx context.Context, opts ...utils.OptionExtender) (<-chan Event[T], error)
}

func NewEventSubscriber

func NewEventSubscriber[T eventual](name string, opts ...utils.OptionExtender) EventSubscriber[T]

type HandlerFunc

// HandlerFunc is the signature of a function that receives a Message and returns an error.
// NOTE(review): presumably one of the handler kinds accepted by IRouter.Handle — confirm.
type HandlerFunc func(msg Message) error

type IRouter

// IRouter is the router interface returned by Use.
type IRouter interface {
	// Handle takes a handler name, a handler value typed as any, and options.
	// NOTE(review): presumably registers hdr under handlerName and accepts a HandlerFunc or a
	// value built by EventHandler/EventHandlerWithMsg — confirm the accepted types.
	Handle(handlerName string, hdr any, opts ...utils.OptionExtender)
	// Serve returns an error.
	// NOTE(review): presumably runs the router and blocks until it stops — confirm.
	Serve() error
	// Start returns nothing.
	// NOTE(review): presumably the non-blocking counterpart of Serve — confirm.
	Start()
	// Running returns a receive-only signal channel.
	// NOTE(review): presumably closed once the router is running — confirm.
	Running() <-chan struct{}
	// contains filtered or unexported methods
}

func Use

func Use(name string, opts ...utils.OptionExtender) IRouter

type Message

// Message is a single mq message as seen by publishers, subscribers and handlers.
type Message interface {
	// ID returns the message identifier as a string.
	ID() string
	// Payload returns the message body as bytes.
	Payload() []byte
	// RawMessage returns a value typed as any.
	// NOTE(review): presumably the underlying backend message — confirm.
	RawMessage() any
	// Context returns the context associated with the message.
	Context() context.Context
	// Object returns a value typed as any.
	// NOTE(review): presumably the deserialized object when published via Objects — confirm.
	Object() any
	// Ack and Nack each return a bool.
	// NOTE(review): presumably reports whether the (negative) acknowledgement was accepted — confirm.
	Ack() bool
	Nack() bool
}

func NewMessage

func NewMessage(uuid string, payload []byte) Message

type Publisher

// Publisher publishes messages; instances are obtained through Pub.
type Publisher interface {
	// Publish publishes the provided messages to the topic.
	//
	// Publish can be synchronous or asynchronous; it depends on the implementation.
	//
	// Most publisher implementations don't support atomic publishing of messages.
	// This means that if publishing one of the messages fails, the following messages will not be published.
	//
	// Publish must be thread-safe.
	Publish(ctx context.Context, opts ...utils.OptionExtender) error

	// PublishRaw publishes the provided raw messages to the topic.
	//
	// PublishRaw can be synchronous or asynchronous; it depends on the implementation.
	//
	// Most publisher implementations don't support atomic publishing of messages.
	// This means that if publishing one of the messages fails, the following messages will not be published.
	//
	// PublishRaw must be thread-safe.
	PublishRaw(ctx context.Context, opts ...utils.OptionExtender) error
	// contains filtered or unexported methods
}

func Pub

func Pub(name string, opts ...utils.OptionExtender) Publisher

type Subscriber

// Subscriber delivers messages; instances are obtained through Sub.
type Subscriber interface {
	// Subscribe returns an output channel with messages from the subscribed topic.
	// The channel is closed when Close() is called on the subscriber.
	// NOTE(review): Close() is not among the exported methods shown — presumably one of the
	// unexported/filtered methods; confirm.
	//
	// When the provided ctx is cancelled, the subscriber stops subscribing and closes the output channel.
	// The provided ctx is set on all produced messages.
	Subscribe(ctx context.Context, opts ...utils.OptionExtender) (<-chan Message, error)

	// SubscribeRaw returns an output channel with the original messages from the subscribed topic.
	// The channel is closed when Close() is called on the subscriber.
	//
	// When the provided ctx is cancelled, the subscriber stops subscribing and closes the output channel.
	// The provided ctx is set on all produced messages.
	SubscribeRaw(ctx context.Context, opts ...utils.OptionExtender) (<-chan Message, error)
	// contains filtered or unexported methods
}

func Sub

func Sub(name string, opts ...utils.OptionExtender) Subscriber

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL