SheXiang_mq

package module
v0.3.4 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Apr 10, 2023 License: Apache-2.0 Imports: 10 Imported by: 0

README

Shexiang-mq

Shexiang-mq 是一个充分利用 Go 高并发特性的本地 MQ。

目录

Install

go get -u github.com/dingguangyi0/SheXiang-mq

Usage

1 消息模型(Message Model)

Shexiang-mq 主要由 Producer、Consumer、MessageQueue 三部分组成。其中 Producer 负责生产消息,Consumer 负责并发消费消息,MessageQueue 负责存储消息;每个 Topic 中的消息地址存储于多个 MessageQueue 中。

2 消息生产者(Producer)

负责生产消息,一般由业务系统负责生产消息。

3 消息消费者(Consumer)

负责消费消息,一般是后台系统负责异步消费。启动消费者后,消费者会启动监听 MessageQueue队列。

4 主题(Topic)

表示一类消息的集合,每个主题包含若干条消息,每条消息只能属于一个主题

Examples & Demos

package main

import (
	"fmt"
	m "github.com/dingguangyi0/SheXiang-mq"
	"math/rand"
	"strconv"
	"time"
)

// main demonstrates the basic SheXiang-mq workflow: build a configuration
// with one listener per topic, start the MQ, send one message to each topic,
// then unsubscribe and shut the producer, consumer and monitor down.
func main() {
	// Build the MQ configuration: consumer pool size plus one listener per topic.
	config := &m.MqConfig{
		ConsumerConfig: &m.ConsumerConfig{
			PoolSize: 10,
			MessageListeners: map[string]func(message m.Message) m.ConsumeConcurrentlyStatus{
				"topic-test": func(message m.Message) m.ConsumeConcurrentlyStatus {
					fmt.Println("Consumed topic-test")
					// Simulate work lasting between 0 and 4 seconds.
					time.Sleep(time.Duration(rand.Int63n(5)) * time.Second)
					return m.ConsumeSuccess
				},
				"topic-test2": func(message m.Message) m.ConsumeConcurrentlyStatus {
					fmt.Println("Consumed topic-test2")
					// Simulate work lasting between 0 and 4 seconds.
					time.Sleep(time.Duration(rand.Int63n(5)) * time.Second)
					return m.ConsumeSuccess
				},
			},
		},
		Selector: m.Random,
	}

	mq, err := config.NewStart()
	if err != nil {
		// The returned *Mq is meaningless when err is non-nil, so stop here
		// instead of dereferencing it below.
		fmt.Println("start mq failed:", err)
		return
	}
	consumer := mq.Consumer
	producer := mq.Producer
	listener := mq.MonitorListener
	listener.TurnMonitor()

	// Send one message per topic for every iteration. A failed send is
	// reported and the remaining sends are still attempted.
	for i := 0; i < 1; i++ {
		for _, topic := range []string{"topic-test", "topic-test2"} {
			err := producer.Send(m.Message{
				Topic: topic,
				Body:  []byte(strconv.Itoa(i)),
			})
			if err != nil {
				fmt.Println("send to", topic, "failed:", err)
			}
		}
	}
	fmt.Println("Produ")

	// Sending is finished: unsubscribe from both topics.
	consumer.Unsubscribe("topic-test", "topic-test2")
	// Shut the producer down.
	producer.Shutdown()
	// Shut the consumer down, passing a callback that reports the shutdown.
	consumer.ShutdownCallback(func() {
		fmt.Println("消费者关闭")
	})
	// Turn the monitor off.
	listener.CloseMonitor()
}

Documentation

Index

Constants

View Source
// Default group names.
// NOTE(review): presumably applied when no producer/consumer group name is
// configured — confirm against the factory code.
const (
	// DefaultProducerGroup is the producer group name "DEFAULT_PRODUCER".
	DefaultProducerGroup = "DEFAULT_PRODUCER"
	// DefaultConsumerGroup is the consumer group name "DEFAULT_CONSUMER".
	DefaultConsumerGroup = "DEFAULT_CONSUMER"
)
View Source
// Default sizing values.
// NOTE(review): the names mirror the ToPicConfig fields MessageQueueLength,
// MessageCapLength and PoolSize, so these are presumably the fallbacks used
// when those fields are left unset — confirm.
const (
	DefaultMessageQueueLength = 5
	DefaultMessageCapLength   = 1
	DefaultPoolSize           = 5
)

Variables

View Source
var (
	// TopicEmpty is the error asking the caller to specify a topic when
	// sending a message.
	TopicEmpty = errors.New("please specify a topic to send messages ")

	// ServiceStateNoStart is an error about the producer service state.
	// NOTE(review): the message suggests it is returned when a send is
	// attempted while the producer is still in the CreateJust state — confirm.
	ServiceStateNoStart = errors.New("producer service start createJust for not send messages")
)
View Source
var (
	// ErrShutdownAlready reports that the producer service state is not OK,
	// maybe because it was already started once.
	ErrShutdownAlready = errors.New("the producer service state not OK, maybe started once")
	// ConsumerConfigEmpty reports that the consumer configuration required
	// for the MQ is empty.
	ConsumerConfigEmpty = errors.New("consumer configuration required for MQ is empty")
	// ConsumerConfigMessageListenersEmpty reports that the MessageListeners
	// of the consumer configuration required for the MQ are empty.
	ConsumerConfigMessageListenersEmpty = errors.New("consumer configuration MessageListeners required for MQ is empty")
)

Functions

func TrimSuffix

func TrimSuffix(s, suffix string) string

Types

type AllocateMessageQueueStrategy

type AllocateMessageQueueStrategy interface {
	// Name The strategy name
	Name() string

	// Allocate To allocate result of given strategy
	Allocate() []MessageQueue
}

AllocateMessageQueueStrategy Strategy Algorithm for message allocating between consumers

type ConsumeConcurrentlyStatus

type ConsumeConcurrentlyStatus int
const (
	// ConsumeSuccess Success consumption
	ConsumeSuccess ConsumeConcurrentlyStatus = iota
	// ReconsumeLater Failure consumption,later try to consume
	ReconsumeLater
)

type Consumer

// Consumer is the interface of a message consumer: lifecycle control,
// topic subscription and listener registration.
type Consumer interface {
	// Start starts the consumer and returns immediately.
	Start() error

	// Shutdown stops the consumer and returns immediately.
	Shutdown()

	// ShutdownCallback takes a callback function.
	// NOTE(review): presumably shuts the consumer down and invokes callback
	// once shutdown has completed — confirm against the implementation.
	ShutdownCallback(callback func())

	// Subscribe subscribes to the given topic and returns immediately.
	// NOTE(review): the meaning of the bool result is not documented — confirm.
	Subscribe(topic string) bool

	// Unsubscribe unsubscribes from the given topics and returns immediately.
	Unsubscribe(topics ...string)

	// RegisterMessageListener takes a topic and a listener function l and
	// returns immediately.
	// NOTE(review): presumably l becomes the listener invoked for messages
	// of that topic — confirm.
	RegisterMessageListener(topic string, l func(message Message) ConsumeConcurrentlyStatus)
}

type ConsumerConfig

type ConsumerConfig struct {
	PoolSize         int
	ConsumerGroup    string
	MessageListeners map[string]func(message Message) ConsumeConcurrentlyStatus
}

type DefaultConsumer

type DefaultConsumer struct {
	ConsumerGroup string
	ServiceState  ServiceState
	Listeners     map[string]func(message Message) ConsumeConcurrentlyStatus
	// contains filtered or unexported fields
}

func (*DefaultConsumer) Callback

func (c *DefaultConsumer) Callback(f func())

func (*DefaultConsumer) RegisterMessageListener

func (c *DefaultConsumer) RegisterMessageListener(topic string, listener func(message Message) ConsumeConcurrentlyStatus)

func (*DefaultConsumer) Shutdown

func (c *DefaultConsumer) Shutdown()

func (*DefaultConsumer) ShutdownCallback

func (c *DefaultConsumer) ShutdownCallback(callback func())

func (*DefaultConsumer) Start

func (c *DefaultConsumer) Start() error

func (*DefaultConsumer) Subscribe

func (c *DefaultConsumer) Subscribe(topic string) bool

func (*DefaultConsumer) Unsubscribe

func (c *DefaultConsumer) Unsubscribe(topics ...string)

type DefaultProducer

type DefaultProducer struct {
	ProducerGroup string
	ServiceState  ServiceState
	MqFactory     *MqFactory
}

func (*DefaultProducer) Send

func (p *DefaultProducer) Send(msg Message) error

func (*DefaultProducer) Shutdown

func (p *DefaultProducer) Shutdown()

func (*DefaultProducer) Start

func (p *DefaultProducer) Start() error

type FactoryConfig

// FactoryConfig is the configuration passed to Instance to obtain an MqFactory.
type FactoryConfig struct {
	// ToPicConfigs lists the per-topic configurations.
	ToPicConfigs []*ToPicConfig
	// Selector is the topic queue selector. It is optional and defaults to
	// random; a selector set in ToPicConfigs takes precedence.
	Selector MsgQueueSelector
}

type Message

// Message is a single message sent by a Producer and handed to a listener.
type Message struct {
	// Topic is the name of the topic the message belongs to.
	Topic      string
	// Body is the message content as raw bytes.
	Body       []byte
	// Properties is a string-keyed map of arbitrary values.
	// NOTE(review): presumably optional user-defined metadata — confirm.
	Properties map[string]any
}

type MessageQueue

type MessageQueue struct {
	QueueId string
	Message chan Message
}

type MessageQueueSelector

type MessageQueueSelector interface {
	Select(messageQueues []*MessageQueue) *MessageQueue
}

func MessageQueueSelectorCreate

func MessageQueueSelectorCreate(selector MsgQueueSelector) MessageQueueSelector

type Monitor

type Monitor struct {
	// contains filtered or unexported fields
}

func NewMonitor added in v0.3.0

func NewMonitor(mqFactory *MqFactory) *Monitor

func (*Monitor) CloseMonitor

func (m *Monitor) CloseMonitor()

func (*Monitor) Info

func (m *Monitor) Info() map[string]*Msg

func (*Monitor) Print added in v0.3.0

func (m *Monitor) Print()

func (*Monitor) Surround added in v0.3.0

func (m *Monitor) Surround(listener func(message Message) ConsumeConcurrentlyStatus, msg Message)

func (*Monitor) TurnMonitor

func (m *Monitor) TurnMonitor()

type MonitorListener

type MonitorListener interface {
	Surround(messageListener func(message Message) ConsumeConcurrentlyStatus, msg Message)

	Print()

	Info() map[string]*Msg

	TurnMonitor()

	CloseMonitor()
}

type Mq

type Mq struct {
	Consumer        Consumer
	Producer        Producer
	MonitorListener MonitorListener
}

type MqConfig

// MqConfig is the top-level configuration from which New and NewStart
// create an Mq.
type MqConfig struct {
	// Selector is the topic queue selector; it defaults to random.
	// NOTE(review): the original comment suggests that a ToPicConfig without
	// its own selector falls back to this MQ-level one — confirm.
	Selector             MsgQueueSelector
	// ProducerConfig is the producer configuration.
	ProducerConfig       *ProducerConfig
	// ConsumerConfig is the consumer configuration.
	ConsumerConfig       *ConsumerConfig
	// ToPicConfigs lists the per-topic configurations.
	ToPicConfigs         []*ToPicConfig
	// NOTE(review): presumably the pool size given to topics that have no
	// explicit ToPicConfig — confirm.
	DefaultTopicPoolSize int
}

func (*MqConfig) New

func (mq *MqConfig) New() (*Mq, error)

func (*MqConfig) NewStart

func (mq *MqConfig) NewStart() (*Mq, error)

NewStart 快捷创建 mq,并默认启动 consumer 和 producer。

type MqFactory

type MqFactory struct {
	TopicPublishInfoTable map[string]*TopicPublishInfo
	// contains filtered or unexported fields
}

func Instance

func Instance(config *FactoryConfig) *MqFactory

func (*MqFactory) GetConsumer

func (m *MqFactory) GetConsumer() Consumer

GetConsumer 获取一个topic 对应的消费者 采用延迟初始化

func (*MqFactory) GetConsumerByConfig

func (m *MqFactory) GetConsumerByConfig(config *ConsumerConfig) Consumer

func (*MqFactory) GetConsumerByGroup

func (m *MqFactory) GetConsumerByGroup(groupName string) Consumer

GetConsumerByGroup 获取一个 topic 对应的消费者,采用延迟初始化。

func (*MqFactory) GetProducer

func (m *MqFactory) GetProducer() Producer

GetProducer 获取一个topic 对应的生产者 采用延迟初始化

func (*MqFactory) GetProducerByConfig

func (m *MqFactory) GetProducerByConfig(config *ProducerConfig) Producer

func (*MqFactory) GetProducerByGroup

func (m *MqFactory) GetProducerByGroup(groupName string) Producer

GetProducerByGroup 获取一个 topic 对应的生产者,采用延迟初始化。

type Msg

type Msg struct {
	// topicName name for monitoring
	TopicName string
	// consumerGroup name for monitoring
	ConsumerGroup string
	// taskCount task count for monitoring
	TaskCount int64
	// timeCount time count for monitoring
	TimeCount int64
	// maxTime max time task  for monitoring
	MaxTime int64
	// minTime min time task for monitoring
	MinTime int64

	Running int64

	Waiting int64
	// started
	StartTime time.Time
	//
	LastTaskTime int64
	// contains filtered or unexported fields
}

type MsgQueueSelector

// MsgQueueSelector identifies the strategy used to select a message queue.
type MsgQueueSelector int
const (
	// Random selects a message queue at random.
	Random MsgQueueSelector = iota
	// Polling selects a message queue by polling.
	Polling
)

type Pool added in v0.3.0

type Pool struct {
	Pool ants.Pool
}

type Producer

type Producer interface {
	Start() error

	Shutdown()

	Send(msg Message) error
}

Producer Message

type ProducerConfig

type ProducerConfig struct {
	ProducerGroupName string
}

type SelectMessageQueueByPolling

type SelectMessageQueueByPolling struct {
	// contains filtered or unexported fields
}

func (*SelectMessageQueueByPolling) Select

func (s *SelectMessageQueueByPolling) Select(messageQueues []*MessageQueue) *MessageQueue

Select 通过轮询选择消息队列

type SelectMessageQueueByRandom

type SelectMessageQueueByRandom struct{}

func (*SelectMessageQueueByRandom) Select

func (s *SelectMessageQueueByRandom) Select(messageQueues []*MessageQueue) *MessageQueue

Select 随机选择消息队列

type ServiceState

type ServiceState int
const (
	// CreateJust Service just created,not start
	CreateJust ServiceState = iota
	// Running Service Running
	Running
	// ShutdownAlready Service shutdown
	ShutdownAlready
	// StartFailed Service Start failure
	StartFailed
)

type ToPicConfig

// ToPicConfig is the configuration of a single topic.
type ToPicConfig struct {
	// TopicName is the name of the topic.
	TopicName string
	// MessageQueueLength is the queue length.
	MessageQueueLength int
	// MessageCapLength is the queue buffer capacity.
	MessageCapLength int
	// Selector is the topic queue selector; it defaults to random.
	Selector MsgQueueSelector
	// PoolSize is the number of available threads.
	PoolSize int
	// contains filtered or unexported fields
}

ToPicConfig Topic 配置

func NewToPicConfig

func NewToPicConfig(topicName string) *ToPicConfig

func NewToPicConfigByPoolSize added in v0.3.0

func NewToPicConfigByPoolSize(topicName string, DefaultTopicPoolSize int) *ToPicConfig

type TopicPublishInfo

type TopicPublishInfo struct {
	ToPicConfig          *ToPicConfig
	MessageQueueSelector MessageQueueSelector
	// contains filtered or unexported fields
}

func (*TopicPublishInfo) TopicBlockageMessageQueueCount

func (topic *TopicPublishInfo) TopicBlockageMessageQueueCount() int64

Directories

Path Synopsis

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL