Documentation ¶
Index ¶
- Constants
- func AppName(name string) utils.OptionFunc[useOption]
- func Async(strategies ...strategy.Strategy) utils.OptionFunc[pubOption]
- func ChannelLen(channelLength int) utils.OptionFunc[subOption]
- func Construct(ctx context.Context, confs map[string]*Conf, opts ...utils.OptionExtender) func()
- func EventHandler[T eventual](hdr eventHandler[T]) eventHandler[T]
- func EventHandlerWithMsg[T eventual](hdr eventHandlerWithMsg[T]) eventHandlerWithMsg[T]
- func Events[T eventual](events ...Event[T]) utils.OptionFunc[eventPubOption[T]]
- func Messages(messages ...Message) utils.OptionFunc[pubOption]
- func NewEventPublisherDI[T eventual](name string, opts ...utils.OptionExtender) func() EventPublisher[T]
- func NewEventSubscriberDI[T eventual](name string, opts ...utils.OptionExtender) func() EventSubscriber[T]
- func Objects[T any](objectUUIDGenFunc func(T) string, objects ...any) utils.OptionFunc[pubOption]
- type Conf
- type Event
- func EventCreated[T eventual](id string, createdAt time.Time, payload T) Event[T]
- func EventDeleted[T eventual](id string, deletedAt time.Time, payload T) Event[T]
- func EventUpdated[T eventual](id string, updatedAt time.Time, payload T) Event[T]
- func NewEvent[T eventual](id string, createdAt, updatedAt, deletedAt time.Time, payload T) Event[T]
- func UntimedEvent[T eventual](id string, payload T) Event[T]
- type EventPublisher
- type EventSubscriber
- type HandlerFunc
- type IRouter
- type Message
- type Publisher
- type Subscriber
Constants ¶
View Source
const ( ErrDuplicatedSubscriberName utils.Error = "duplicated mq subscriber name" ErrDuplicatedPublisherName utils.Error = "duplicated mq publisher name" ErrDuplicatedRouterName utils.Error = "duplicated mq router name" ErrEventHandlerConflict utils.Error = "conflict with event handler and message handler" ErrNotImplement utils.Error = "mq not implement" )
Variables ¶
This section is empty.
Functions ¶
func AppName ¶
func AppName(name string) utils.OptionFunc[useOption]
func ChannelLen ¶
func ChannelLen(channelLength int) utils.OptionFunc[subOption]
func EventHandler ¶
func EventHandler[T eventual](hdr eventHandler[T]) eventHandler[T]
func EventHandlerWithMsg ¶
func EventHandlerWithMsg[T eventual](hdr eventHandlerWithMsg[T]) eventHandlerWithMsg[T]
func Events ¶
func Events[T eventual](events ...Event[T]) utils.OptionFunc[eventPubOption[T]]
func Messages ¶
func Messages(messages ...Message) utils.OptionFunc[pubOption]
func NewEventPublisherDI ¶
func NewEventPublisherDI[T eventual](name string, opts ...utils.OptionExtender) func() EventPublisher[T]
func NewEventSubscriberDI ¶
func NewEventSubscriberDI[T eventual](name string, opts ...utils.OptionExtender) func() EventSubscriber[T]
Types ¶
type Conf ¶
type Conf struct { Topic string `yaml:"topic" json:"topic" toml:"topic"` Type mqType `yaml:"type" json:"type" toml:"type"` Producer bool `yaml:"producer" json:"producer" toml:"producer" default:"true"` Consumer bool `yaml:"consumer" json:"consumer" toml:"consumer"` ConsumerGroup string `yaml:"consumer_group" json:"consumer_group" toml:"consumer_group"` ConsumerConcurrency int `yaml:"consumer_concurrency" json:"consumer_concurrency" toml:"consumer_concurrency"` Endpoint *endpointConf `yaml:"endpoint" json:"endpoint" toml:"endpoint"` Persistent bool `yaml:"persistent" json:"persistent" toml:"persistent"` SerializeType string `yaml:"serialize_type" json:"serialize_type" toml:"serialize_type"` CompressType string `yaml:"compress_type" json:"compress_type" toml:"compress_type"` EnableLogger bool `yaml:"enable_logger" json:"enable_logger" toml:"enable_logger" default:"false"` Logger string `yaml:"logger" json:"logger" toml:"logger" default:"github.com/wfusion/gofusion/log/customlogger.mqLogger"` LogInstance string `yaml:"log_instance" json:"log_instance" toml:"log_instance" default:"default"` // mongo, mysql, mariadb option MessageScheme string `yaml:"message_scheme" json:"message_scheme" toml:"message_scheme" default:"watermill_message"` SeriesScheme string `yaml:"series_scheme" json:"series_scheme" toml:"series_scheme" default:"watermill_series"` ConsumerScheme string `yaml:"consumer_scheme" json:"consumer_scheme" toml:"consumer_scheme" default:"watermill_subscriber"` ConsumeMiddlewares []*middlewareConf `yaml:"consume_middlewares" json:"consume_middlewares" toml:"consume_middlewares"` }
Conf mq config nolint: revive // struct tag too long issue
type Event ¶
type Event[T eventual] interface { ID() string Type() string CreatedAt() time.Time UpdatedAt() time.Time DeletedAt() time.Time Payload() T Context() context.Context Ack() bool Nack() bool }
func EventCreated ¶
func EventDeleted ¶
func EventUpdated ¶
func UntimedEvent ¶
type EventPublisher ¶
type EventPublisher[T eventual] interface { // PublishEvent publishes provided messages to given topic. // // PublishEvent can be synchronous or asynchronous - it depends on the implementation. // // Most publishers implementations don't support atomic publishing of messages. // This means that if publishing one of the messages fails, the next messages will not be published. // // PublishEvent must be thread safe. PublishEvent(ctx context.Context, opts ...utils.OptionExtender) error }
func NewEventPublisher ¶
func NewEventPublisher[T eventual](name string, opts ...utils.OptionExtender) EventPublisher[T]
type EventSubscriber ¶
type EventSubscriber[T eventual] interface { // SubscribeEvent returns output channel with events from provided topic. // Channel is closed, when Close() was called on the subscriber. // // When provided ctx is cancelled, subscriber will close subscribe and close output channel. // Provided ctx is set to all produced messages. SubscribeEvent(ctx context.Context, opts ...utils.OptionExtender) (<-chan Event[T], error) }
func NewEventSubscriber ¶
func NewEventSubscriber[T eventual](name string, opts ...utils.OptionExtender) EventSubscriber[T]
type HandlerFunc ¶
type IRouter ¶
type IRouter interface { Handle(handlerName string, hdr any, opts ...utils.OptionExtender) Serve() error Start() Running() <-chan struct{} // contains filtered or unexported methods }
type Message ¶
type Message interface { ID() string Payload() []byte RawMessage() any Context() context.Context Object() any Ack() bool Nack() bool }
func NewMessage ¶
type Publisher ¶
type Publisher interface { // Publish publishes provided messages to given topic. // // Publish can be synchronous or asynchronous - it depends on the implementation. // // Most publishers implementations don't support atomic publishing of messages. // This means that if publishing one of the messages fails, the next messages will not be published. // // Publish must be thread safe. Publish(ctx context.Context, opts ...utils.OptionExtender) error // PublishRaw publishes provided raw messages to given topic. // // PublishRaw can be synchronous or asynchronous - it depends on the implementation. // // Most publishers implementations don't support atomic publishing of messages. // This means that if publishing one of the messages fails, the next messages will not be published. // // PublishRaw must be thread safe. PublishRaw(ctx context.Context, opts ...utils.OptionExtender) error // contains filtered or unexported methods }
type Subscriber ¶
type Subscriber interface { // Subscribe returns output channel with messages from provided topic. // Channel is closed, when Close() was called on the subscriber. // // When provided ctx is cancelled, subscriber will close subscribe and close output channel. // Provided ctx is set to all produced messages. Subscribe(ctx context.Context, opts ...utils.OptionExtender) (<-chan Message, error) // SubscribeRaw returns output channel with original messages from provided topic. // Channel is closed, when Close() was called on the subscriber. // // When provided ctx is cancelled, subscriber will close subscribe and close output channel. // Provided ctx is set to all produced messages. SubscribeRaw(ctx context.Context, opts ...utils.OptionExtender) (<-chan Message, error) // contains filtered or unexported methods }
func Sub ¶
func Sub(name string, opts ...utils.OptionExtender) Subscriber
Click to show internal directories.
Click to hide internal directories.