stgstorelog

package
v0.0.0-...-130f5e9 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Apr 25, 2018 License: Apache-2.0 Imports: 36 Imported by: 3

README

smartgostore

smartgostore is ...

Read the docs

Documentation

Overview

Copyright (c) 2015-2018 All rights reserved. 本软件源代码版权归 my.oschina.net/tantexian 所有,允许复制与学习借鉴. Author: tantexian, <tantexian@qq.com> Since: 2017/8/7

Copyright (c) 2015-2018 All rights reserved. 本软件源代码版权归 my.oschina.net/tantexian 所有,允许复制与学习借鉴. Author: tantexian, <tantexian@qq.com> Since: 2017/8/6

Copyright (c) 2015-2018 All rights reserved. 本软件源代码版权归 my.oschina.net/tantexian 所有,允许复制与学习借鉴. Author: tantexian, <tantexian@qq.com> Since: 2017/8/5

Copyright (c) 2015-2018 All rights reserved. 本软件源代码版权归 my.oschina.net/tantexian 所有,允许复制与学习借鉴. Author: tantexian, <tantexian@qq.com> Since: 2017/8/7

Copyright (c) 2015-2018 All rights reserved. 本软件源代码版权归 my.oschina.net/tantexian 所有,允许复制与学习借鉴. Author: tantexian, <tantexian@qq.com> Since: 2017/8/6

Index

Constants

View Source
const (
	WaitTimeOut              = 1000 * 5
	DEFAULT_INITIAL_CAPACITY = 11
)
View Source
// Magic code constants.
// NOTE(review): presumably MessageMagicCode tags a stored message record and
// BlankMagicCode tags blank filler — confirm against the code that uses them.
//
// Evaluation order: in Go the binary operators ^ and + share one precedence
// level and associate left to right, so each expression below is computed as
// (0x... ^ 1880681586) + 8, NOT as 0x... ^ (1880681586 + 8).
// NOTE(review): languages such as Java bind + tighter than ^; if these values
// must match data produced by an implementation in such a language, verify
// that the resulting numbers agree.
const (
	MessageMagicCode = 0xAABBCCDD ^ 1880681586 + 8
	BlankMagicCode   = 0xBBCCDDEE ^ 1880681586 + 8
)
View Source
// Per-field size constants. The trailing comments number the fields 1-14 in
// declaration order.
// NOTE(review): presumably these are the byte widths of the fields of a
// stored message record, in on-disk order — confirm against the serializer.
const (
	// END_FILE_MIN_BLANK_LENGTH is the sum of two 4-unit fields (4 + 4 = 8).
	END_FILE_MIN_BLANK_LENGTH   = 4 + 4
	TOTALSIZE                   = 4 // 1 TOTALSIZE
	MAGICCODE                   = 4 // 2 MAGICCODE
	BODYCRC                     = 4 // 3 BODYCRC
	QUEUE_ID                    = 4 // 4 QUEUEID
	FLAG                        = 4 // 5 FLAG
	QUEUE_OFFSET                = 8 // 6 QUEUEOFFSET
	PHYSICAL_OFFSET             = 8 // 7 PHYSICALOFFSET
	SYSFLAG                     = 4 // 8 SYSFLAG
	BORN_TIMESTAMP              = 8 // 9 BORNTIMESTAMP
	BORN_HOST                   = 8 // 10 BORNHOST
	STORE_TIMESTAMP             = 8 // 11 STORETIMESTAMP
	STORE_HOST_ADDRESS          = 8 // 12 STOREHOSTADDRESS
	RE_CONSUME_TIMES            = 4 // 13 RECONSUMETIMES
	PREPARED_TRANSACTION_OFFSET = 8 // 14 Prepared Transaction Offset
	// The three constants below are not numbered in the original comments.
	// NOTE(review): presumably the widths of the length prefixes for the
	// body, topic and properties — confirm.
	BODY_LENGTH                 = 4
	TOPIC_LENGTH                = 1
	PROPERTIES_LENGTH           = 2
)
View Source
const (
	// TotalPhysicalMemorySize is 24 * 2^30 = 25769803776 (24 GiB if the unit
	// is bytes). It is a hard-coded constant, not a value measured at runtime.
	TotalPhysicalMemorySize = 1024 * 1024 * 1024 * 24
	// LongMinValue is the smallest value representable in a signed 64-bit
	// integer (equal to math.MinInt64).
	LongMinValue            = -9223372036854775808
)
View Source
const (
	// OS_PAGE_SIZE is 4096 (4 KiB). It is a fixed constant, not queried from
	// the operating system.
	OS_PAGE_SIZE       = 1024 * 4
	// MMAPED_ENTIRE_FILE is the sentinel value -1.
	// NOTE(review): presumably passed as a length to request mapping a whole
	// file — confirm against the mmap call sites.
	MMAPED_ENTIRE_FILE = -1
)
View Source
// Status bit flags; each constant occupies a distinct bit (bits 0 through 4),
// so they can be combined in a single integer.
const (
	NotReadableBit           = 1      // read permission disabled
	NotWriteableBit          = 1 << 1 // write permission disabled
	WriteLogicsQueueErrorBit = 1 << 2 // whether an error occurred on the logic queue
	WriteIndexFileErrorBit   = 1 << 3 // whether an error occurred on the index file
	DiskFullBit              = 1 << 4 // insufficient disk space
)
View Source
const (
	SCHEDULE_TOPIC     = "SCHEDULE_TOPIC_XXX"
	FIRST_DELAY_TIME   = int64(1000)
	DELAY_FOR_A_WHILE  = int64(100)
	DELAY_FOR_A_PERIOD = int64(10000)
)
View Source
const (
	FrequencyOfSampling  = 1000
	MaxRecordsOfSampling = 60 * 10
	PrintTPSInterval     = 60 * 1
)
View Source
const (
	CQStoreUnitSize = 20 // size of one storage unit
)

ConsumeQueue 消费队列实现 Author zhoufei Since 2017/9/7

View Source
const (
	FlushRetryTimesOver = 3
)
View Source
const (
	GroupCommitRequestHighWater = 600000
)
View Source
const (
	MaxManualDeleteFileTimes = 20 // maximum number of deletions per manual trigger
)
View Source
const (
	ReadSocketMaxBufferSize = 1024 * 1024
)
View Source
const (
	RetryTimesOver = 3
)
View Source
const (
	SUBSCRIPTION_ALL = "*"
)

Variables

View Source
var (
	HASH_SLOT_SIZE int32 = 4
	INDEX_SIZE     int32 = 20
	INVALID_INDEX  int32 = 0
)
View Source
// Index header layout values.
// NOTE(review): the *_INDEX values (0, 8, 16, 24, 32, 36) together with
// INDEX_HEADER_SIZE (40) look like byte offsets of four 8-byte fields followed
// by two 4-byte fields inside a 40-byte header — confirm against IndexHeader.
// These are declared as variables, not constants, so they are mutable at
// package level.
var (
	INDEX_HEADER_SIZE    int32 = 40
	BEGINTIMESTAMP_INDEX int32 = 0
	ENDTIMESTAMP_INDEX   int32 = 8
	BEGINPHYOFFSET_INDEX int32 = 16
	ENDPHYOFFSET_INDEX   int32 = 24
	HASHSLOTCOUNT_INDEX  int32 = 32
	INDEXCOUNT_INDEX     int32 = 36
)

Functions

func GetHome

func GetHome() string

func GetParentDirectory

func GetParentDirectory(dir string) string

func GetPathSeparator

func GetPathSeparator() string

func GetUnixHome

func GetUnixHome() string

func GetWindowsHome

func GetWindowsHome() string

func HashCode

func HashCode(s string) int64

func PathExists

func PathExists(path string) (bool, error)

func TagsString2tagsCode

func TagsString2tagsCode(filterType stgcommon.TopicFilterType, tags string) int64

Types

type AcceptSocketService

type AcceptSocketService struct {
	// contains filtered or unexported fields
}

AcceptSocketService Author zhoufei Since 2017/10/19

func NewAcceptSocketService

func NewAcceptSocketService(port int32, haService *HAService) *AcceptSocketService

func (*AcceptSocketService) Shutdown

func (self *AcceptSocketService) Shutdown(interrupt bool)

type AllocateMapedFileService

type AllocateMapedFileService struct {
	// contains filtered or unexported fields
}

func NewAllocateMapedFileService

func NewAllocateMapedFileService() *AllocateMapedFileService

func (*AllocateMapedFileService) Shutdown

func (self *AllocateMapedFileService) Shutdown()

func (*AllocateMapedFileService) Start

func (self *AllocateMapedFileService) Start()

type AllocateRequest

type AllocateRequest struct {
	// contains filtered or unexported fields
}

func NewAllocateRequest

func NewAllocateRequest(filePath string, fileSize int64) *AllocateRequest

type AppendMessageCallback

type AppendMessageCallback interface {
	// contains filtered or unexported methods
}

AppendMessageCallback 写消息回调接口 Author: tantexian, <tantexian@qq.com> Since: 2017/8/6

type AppendMessageResult

type AppendMessageResult struct {
	Status         AppendMessageStatus
	WroteOffset    int64
	WroteBytes     int64
	MsgId          string
	StoreTimestamp int64
	LogicsOffset   int64
}

AppendMessageResult 写入commitlog返回结果集 Author gaoyanlei Since 2017/8/16

type AppendMessageStatus

type AppendMessageStatus int

AppendMessageStatus 写入commitlog 返回code Author gaoyanlei Since 2017/8/16

const (
	APPENDMESSAGE_PUT_OK AppendMessageStatus = iota
	END_OF_FILE
	MESSAGE_SIZE_EXCEEDED
	APPENDMESSAGE_UNKNOWN_ERROR
)

func (AppendMessageStatus) AppendMessageString

func (status AppendMessageStatus) AppendMessageString() string

type CallSnapshot

type CallSnapshot struct {
	// contains filtered or unexported fields
}

func NewCallSnapshot

func NewCallSnapshot(timestamp, callTimesTotal int64) *CallSnapshot

type CleanCommitLogService

type CleanCommitLogService struct {
	// contains filtered or unexported fields
}

CleanCommitLogService 清理物理文件服务 Author zhoufei Since 2017/10/13

func NewCleanCommitLogService

func NewCleanCommitLogService(defaultMessageStore *DefaultMessageStore) *CleanCommitLogService

type CleanConsumeQueueService

type CleanConsumeQueueService struct {
	// contains filtered or unexported fields
}

CleanConsumeQueueService 清理逻辑文件服务 Author zhoufei Since 2017/10/13

func NewCleanConsumeQueueService

func NewCleanConsumeQueueService(defaultMessageStore *DefaultMessageStore) *CleanConsumeQueueService

type CleanReferenceResource

type CleanReferenceResource interface {
	// contains filtered or unexported methods
}

type CommitLog

type CommitLog struct {
	MapedFileQueue        *MapedFileQueue
	DefaultMessageStore   *DefaultMessageStore
	GroupCommitService    *GroupCommitService
	FlushRealTimeService  *FlushRealTimeService
	AppendMessageCallback *DefaultAppendMessageCallback
	TopicQueueTable       map[string]int64
	// contains filtered or unexported fields
}

func NewCommitLog

func NewCommitLog(defaultMessageStore *DefaultMessageStore) *CommitLog

func (*CommitLog) Load

func (self *CommitLog) Load() bool

func (*CommitLog) Shutdown

func (self *CommitLog) Shutdown()

func (*CommitLog) Start

func (self *CommitLog) Start()

type ConsumeQueue

type ConsumeQueue struct {
	// contains filtered or unexported fields
}

func NewConsumeQueue

func NewConsumeQueue(topic string, queueId int32, storePath string, mapedFileSize int64, defaultMessageStore *DefaultMessageStore) *ConsumeQueue

type ConsumeQueueTable

type ConsumeQueueTable struct {
	// contains filtered or unexported fields
}

func NewConsumeQueueTable

func NewConsumeQueueTable() *ConsumeQueueTable

type DefaultAppendMessageCallback

type DefaultAppendMessageCallback struct {
	// contains filtered or unexported fields
}

func NewDefaultAppendMessageCallback

func NewDefaultAppendMessageCallback(size int32, commitLog *CommitLog) *DefaultAppendMessageCallback

type DefaultMessageFilter

type DefaultMessageFilter struct {
}

DefaultMessageFilter 消息过滤规则实现 Author zhoufei Since 2017/9/6

func (*DefaultMessageFilter) IsMessageMatched

func (df *DefaultMessageFilter) IsMessageMatched(subscriptionData *heartbeat.SubscriptionData, tagsCode int64) bool

type DefaultMessageStore

// DefaultMessageStore groups the store configuration, the commit log and the
// service objects listed below in one struct.
type DefaultMessageStore struct {
	MessageFilter      *DefaultMessageFilter // message filtering
	MessageStoreConfig *MessageStoreConfig   // storage configuration
	CommitLog          *CommitLog

	FlushConsumeQueueService *FlushConsumeQueueService // flush-to-disk service for the logic queues
	CleanCommitLogService    *CleanCommitLogService    // service that cleans up physical files
	CleanConsumeQueueService *CleanConsumeQueueService // service that cleans up logic files
	DispatchMessageService   *DispatchMessageService   // service that dispatches message indexes
	IndexService             *IndexService             // message index service
	// NOTE(review): the original comment on AllocateMapedFileService is
	// word-for-word the same as the one on ReputMessageService and is
	// probably a copy-paste; the type name suggests file allocation — confirm.
	AllocateMapedFileService *AllocateMapedFileService // parses messages from the physical queue and re-sends them to the logic queue (literal translation)
	ReputMessageService      *ReputMessageService      // parses messages from the physical queue and re-sends them to the logic queue
	HAService                *HAService                // HA service
	ScheduleMessageService   *ScheduleMessageService   // scheduling (timed) service
	TransactionStateService  *TransactionStateService  // distributed transaction service
	TransactionCheckExecuter *TransactionCheckExecuter // transaction check-back interface
	StoreStatsService        *StoreStatsService        // runtime statistics
	RunningFlags             *RunningFlags             // runtime flag bits
	SystemClock              *stgcommon.SystemClock    // reduces the cost of reading the time; 1 ms precision
	// NOTE(review): the field name suggests "true means shut down", while the
	// original comment reads "whether the storage service is started" —
	// confirm which sense the code uses.
	ShutdownFlag             bool                      // whether the storage service is started (literal translation)
	StoreCheckpoint          *StoreCheckpoint
	BrokerStatsManager       *stats.BrokerStatsManager
	// contains filtered or unexported fields
}

DefaultMessageStore 存储层对外提供的接口 Author zhoufei Since 2017/9/6

func NewDefaultMessageStore

func NewDefaultMessageStore(messageStoreConfig *MessageStoreConfig, brokerStatsManager *stats.BrokerStatsManager) *DefaultMessageStore

func (*DefaultMessageStore) AppendToCommitLog

func (self *DefaultMessageStore) AppendToCommitLog(startOffset int64, data []byte) bool

AppendToCommitLog 向CommitLog追加数据,并分发至各个Consume Queue Author: zhoufei Since: 2017/10/24

func (*DefaultMessageStore) CheckInDiskByConsumeOffset

func (self *DefaultMessageStore) CheckInDiskByConsumeOffset(topic string, queueId int32, consumeOffset int64) bool

CheckInDiskByConsumeOffset 判断消息是否在磁盘 Author: zhoufei Since: 2017/9/20

func (*DefaultMessageStore) CleanExpiredConsumerQueue

func (self *DefaultMessageStore) CleanExpiredConsumerQueue()

CleanExpiredConsumerQueue 清除失效的消费队列 Author: zhoufei Since: 2017/9/21

func (*DefaultMessageStore) CleanUnusedTopic

func (self *DefaultMessageStore) CleanUnusedTopic(topics []string) int32

CleanUnusedTopic 清除未使用Topic

func (*DefaultMessageStore) Destroy

func (self *DefaultMessageStore) Destroy()

func (*DefaultMessageStore) GetCommitLogData

func (self *DefaultMessageStore) GetCommitLogData(offset int64) *SelectMapedBufferResult

GetCommitLogData 数据复制使用:获取CommitLog数据 Author: zhoufei Since: 2017/10/23

func (*DefaultMessageStore) GetEarliestMessageTime

func (self *DefaultMessageStore) GetEarliestMessageTime(topic string, queueId int32) int64

GetEarliestMessageTime 获取队列中最早的消息时间,如果找不到对应时间,则返回-1 Author: zhoufei Since: 2017/9/21

func (*DefaultMessageStore) GetMaxOffsetInQueue

func (self *DefaultMessageStore) GetMaxOffsetInQueue(topic string, queueId int32) int64

GetMaxOffsetInQueue 获取指定队列最大Offset 如果队列不存在,返回-1 Author: zhoufei Since: 2017/9/20

func (*DefaultMessageStore) GetMaxPhyOffset

func (self *DefaultMessageStore) GetMaxPhyOffset() int64

GetMaxPhyOffset 获取物理队列最大offset Author: zhoufei Since: 2017/10/24

func (*DefaultMessageStore) GetMessage

func (self *DefaultMessageStore) GetMessage(group string, topic string, queueId int32, offset int64, maxMsgNums int32, subscriptionData *heartbeat.SubscriptionData) *GetMessageResult

func (*DefaultMessageStore) GetMessageIds

func (self *DefaultMessageStore) GetMessageIds(topic string, queueId int32, minOffset, maxOffset int64, storeHost string) map[string]int64

GetMessageIds 批量获取MessageId Author: zhoufei Since: 2017/9/21

func (*DefaultMessageStore) GetMessageStoreTimeStamp

func (self *DefaultMessageStore) GetMessageStoreTimeStamp(topic string, queueId int32, offset int64) int64

GetMessageStoreTimeStamp 获取队列中存储时间,如果找不到对应时间,则返回-1 Author: zhoufei Since: 2017/9/21

func (*DefaultMessageStore) GetMinOffsetInQueue

func (self *DefaultMessageStore) GetMinOffsetInQueue(topic string, queueId int32) int64

GetMinOffsetInQueue 获取指定队列最小Offset 如果队列不存在,返回-1 Author: zhoufei Since: 2017/9/20

func (*DefaultMessageStore) GetOffsetInQueueByTime

func (self *DefaultMessageStore) GetOffsetInQueueByTime(topic string, queueId int32, timestamp int64) int64

GetOffsetInQueueByTime 根据消息时间获取某个队列中对应的offset 1、如果指定时间(包含之前之后)有对应的消息,则获取距离此时间最近的offset(优先选择之前) 2、如果指定时间无对应消息,则返回0 Author: zhoufei Since: 2017/9/21

func (*DefaultMessageStore) GetRuntimeInfo

func (self *DefaultMessageStore) GetRuntimeInfo() map[string]string

GetRuntimeInfo 获取运行时统计数据 Author: zhoufei Since: 2017/9/21

func (*DefaultMessageStore) Load

func (self *DefaultMessageStore) Load() bool

func (*DefaultMessageStore) LookMessageByOffset

func (self *DefaultMessageStore) LookMessageByOffset(commitLogOffset int64) *message.MessageExt

LookMessageByOffset 通过物理队列Offset,查询消息。 如果发生错误,则返回null Author: zhoufei Since: 2017/9/20

func (*DefaultMessageStore) Now

func (self *DefaultMessageStore) Now() int64

func (*DefaultMessageStore) PutMessage

func (*DefaultMessageStore) QueryMessage

func (self *DefaultMessageStore) QueryMessage(topic string, key string, maxNum int32, begin int64, end int64) *QueryMessageResult

func (*DefaultMessageStore) SelectOneMessageByOffset

func (self *DefaultMessageStore) SelectOneMessageByOffset(commitLogOffset int64) *SelectMapedBufferResult

SelectOneMessageByOffset 通过物理队列Offset,查询消息。 如果发生错误,则返回null Author: zhoufei Since: 2017/9/20

func (*DefaultMessageStore) SelectOneMessageByOffsetAndSize

func (self *DefaultMessageStore) SelectOneMessageByOffsetAndSize(commitLogOffset int64, msgSize int32) *SelectMapedBufferResult

SelectOneMessageByOffsetAndSize 通过物理队列Offset、size,查询消息。 如果发生错误,则返回null Author: zhoufei Since: 2017/9/20

func (*DefaultMessageStore) Shutdown

func (self *DefaultMessageStore) Shutdown()

func (*DefaultMessageStore) SlaveFallBehindMuch

func (self *DefaultMessageStore) SlaveFallBehindMuch() int64

SlaveFallBehindMuch Slave落后Master多少byte Author: zhoufei Since: 2017/9/21

func (*DefaultMessageStore) Start

func (self *DefaultMessageStore) Start() error

func (*DefaultMessageStore) UpdateHaMasterAddress

func (self *DefaultMessageStore) UpdateHaMasterAddress(newAddr string)

UpdateHaMasterAddress 更新HaMaster地址 Author: zhoufei Since: 2017/9/21

type DispatchMessageService

type DispatchMessageService struct {
	// contains filtered or unexported fields
}

func NewDispatchMessageService

func NewDispatchMessageService(putMsgIndexHightWater int32, defaultMessageStore *DefaultMessageStore) *DispatchMessageService

func (*DispatchMessageService) Shutdown

func (self *DispatchMessageService) Shutdown()

func (*DispatchMessageService) Start

func (self *DispatchMessageService) Start()

type DispatchRequest

type DispatchRequest struct {
	// contains filtered or unexported fields
}

type Files

type Files []os.FileInfo

func (Files) Len

func (self Files) Len() int

func (Files) Less

func (self Files) Less(i, j int) bool

func (Files) Swap

func (self Files) Swap(i, j int)

type FlushCommitLogService

type FlushCommitLogService interface {
	// contains filtered or unexported methods
}

type FlushConsumeQueueService

type FlushConsumeQueueService struct {
	// contains filtered or unexported fields
}

func NewFlushConsumeQueueService

func NewFlushConsumeQueueService(defaultMessageStore *DefaultMessageStore) *FlushConsumeQueueService

func (*FlushConsumeQueueService) Shutdown

func (self *FlushConsumeQueueService) Shutdown()

func (*FlushConsumeQueueService) Start

func (self *FlushConsumeQueueService) Start()

type FlushRealTimeService

type FlushRealTimeService struct {
	// contains filtered or unexported fields
}

func NewFlushRealTimeService

func NewFlushRealTimeService(commitLog *CommitLog) *FlushRealTimeService

type GetMessageResult

// GetMessageResult is the result returned when fetching messages.
type GetMessageResult struct {
	// Collection of multiple consecutive messages.
	MessageMapedList list.List

	// Used to transfer messages to the Consumer.
	MessageBufferList list.List

	// Enum value: the outcome of the fetch.
	Status GetMessageStatus

	// After filtering, the offset at which the next fetch should begin.
	NextBeginOffset int64

	// Minimum offset in the logic queue.
	MinOffset int64

	// Maximum offset in the logic queue.
	MaxOffset int64

	// Total number of bytes in the buffers ("ByteBuffer" in the original comment).
	BufferTotalSize int

	// Whether pulling messages from the slave is suggested.
	SuggestPullingFromSlave bool
}

GetMessageResult 访问消息返回结果 Author gaoyanlei Since 2017/8/17

func (*GetMessageResult) GetMessageCount

func (self *GetMessageResult) GetMessageCount() int

GetMessageCount 获取message个数 Author gaoyanlei Since 2017/8/17

func (*GetMessageResult) Release

func (self *GetMessageResult) Release()

type GetMessageStatus

type GetMessageStatus int

GetMessageStatus 访问消息返回的状态码 Author gaoyanlei Since 2017/8/17

// GetMessageStatus values, numbered from 0 via iota in declaration order.
const (
	// Message found.
	FOUND GetMessageStatus = iota
	// Offset is valid, but no message matched after filtering.
	NO_MATCHED_MESSAGE
	// Offset is valid, but the message in the physical queue is being deleted.
	MESSAGE_WAS_REMOVING
	// Offset is valid, but nothing was found in the logic queue; it may be in the middle of being deleted.
	OFFSET_FOUND_NULL
	// Offset is invalid: badly overflowed.
	OFFSET_OVERFLOW_BADLY
	// Offset is invalid: overflowed by one.
	OFFSET_OVERFLOW_ONE
	// Offset is invalid: too small.
	OFFSET_TOO_SMALL
	// No matching logic queue.
	NO_MATCHED_LOGIC_QUEUE
	// The queue contains no messages at all.
	NO_MESSAGE_IN_QUEUE
)

func (GetMessageStatus) String

func (self GetMessageStatus) String() string

type GroupCommitRequest

type GroupCommitRequest struct {
	// contains filtered or unexported fields
}

GroupCommitRequest Author zhoufei Since 2017/10/18

func NewGroupCommitRequest

func NewGroupCommitRequest(nextOffset int64) *GroupCommitRequest

type GroupCommitService

type GroupCommitService struct {
	// contains filtered or unexported fields
}

GroupCommitService Author zhoufei Since 2017/10/18

func NewGroupCommitService

func NewGroupCommitService(commitLog *CommitLog) *GroupCommitService

type GroupTransferService

type GroupTransferService struct {
	// contains filtered or unexported fields
}

GroupTransferService 同步进度监听服务,如果达到应用层的写入偏移量,则通知应用层该同步已经完成。 Author zhoufei Since 2017/10/18

func NewGroupTransferService

func NewGroupTransferService(haService *HAService) *GroupTransferService

type HAClient

type HAClient struct {
	// contains filtered or unexported fields
}

HAClient HA高可用客户端 Author zhoufei Since 2017/10/18

func NewHAClient

func NewHAClient(haService *HAService) *HAClient

func (*HAClient) Shutdown

func (self *HAClient) Shutdown()

type HAConnection

type HAConnection struct {
	// contains filtered or unexported fields
}

HAConnection Author zhoufei Since 2017/10/19

func NewHAConnection

func NewHAConnection(haService *HAService, connection *net.TCPConn) *HAConnection

type HAService

type HAService struct {
	// contains filtered or unexported fields
}

HAService HA高可用服务 Author zhoufei Since 2017/10/18

func NewHAService

func NewHAService(defaultMessageStore *DefaultMessageStore) *HAService

func (*HAService) Shutdown

func (self *HAService) Shutdown()

func (*HAService) Start

func (self *HAService) Start()

type IndexFile

type IndexFile struct {
	// contains filtered or unexported fields
}

func NewIndexFile

func NewIndexFile(fileName string, hashSlotNum, indexNum int32, endPhyOffset, endTimestamp int64) *IndexFile

type IndexHeader

type IndexHeader struct {
	// contains filtered or unexported fields
}

func NewIndexHeader

func NewIndexHeader(mappedByteBuffer *MappedByteBuffer) *IndexHeader

type IndexService

type IndexService struct {
	// contains filtered or unexported fields
}

func NewIndexService

func NewIndexService(messageStore *DefaultMessageStore) *IndexService

func (*IndexService) Load

func (self *IndexService) Load(lastExitOK bool) bool

func (*IndexService) Shutdown

func (self *IndexService) Shutdown()

func (*IndexService) Start

func (self *IndexService) Start()

type MapedFile

// MapedFile embeds ReferenceResource and exposes two counters.
type MapedFile struct {
	ReferenceResource
	// Total size of the virtual memory currently mapped.
	TotalMapedVitualMemory int64
	// Number of mmap handles currently held (the original comment says
	// "in the current JVM"; NOTE(review): presumably means the current
	// process in this Go port).
	TotalMapedFiles int32
	// contains filtered or unexported fields
}

MapedFile 封装mapedfile类用于操作commitlog文件及consumelog文件 Author: tantexian, <tantexian@qq.com> Since: 2017/8/5

func NewMapedFile

func NewMapedFile(filePath string, filesize int64) (*MapedFile, error)

NewMapedFile 根据文件名新建mapedfile Author: tantexian, <tantexian@qq.com> Since: 2017/8/5

func (*MapedFile) AppendMessageWithCallBack

func (self *MapedFile) AppendMessageWithCallBack(msg interface{}, appendMessageCallback AppendMessageCallback) *AppendMessageResult

AppendMessageWithCallBack 向MapedBuffer追加消息 Return: appendNums 成功添加消息字节数 Author: tantexian, <tantexian@qq.com> Since: 2017/8/5

func (*MapedFile) Commit

func (self *MapedFile) Commit(flushLeastPages int32) (flushPosition int64)

Commit 消息提交刷盘 Params: flushLeastPages 一次刷盘最少个数 Return: flushPosition 当前刷盘位置 Author: tantexian, <tantexian@qq.com> Since: 2017/8/6

func (*MapedFile) Flush

func (self *MapedFile) Flush()

func (*MapedFile) Unmap

func (self *MapedFile) Unmap()

type MapedFileQueue

type MapedFileQueue struct {
	// Maximum number of files deleted each time file deletion is triggered.
	DeleteFilesBatchMax int
	// contains filtered or unexported fields
}

func NewMapedFileQueue

func NewMapedFileQueue(storePath string, mapedFileSize int64,
	allocateMapedFileService *AllocateMapedFileService) *MapedFileQueue

type MappedByteBuffer

type MappedByteBuffer struct {
	MMapBuf  mmap.MMap
	ReadPos  int // read at &buf[ReadPos]
	WritePos int // write at &buf[WritePos]
	Limit    int // MMapBuf's max Size
}

func NewMappedByteBuffer

func NewMappedByteBuffer(mMap mmap.MMap) *MappedByteBuffer

func (*MappedByteBuffer) Bytes

func (m *MappedByteBuffer) Bytes() []byte

func (*MappedByteBuffer) Read

func (m *MappedByteBuffer) Read(data []byte) (n int, err error)

Read reads the next len(data) bytes from the buffer or until the buffer is drained. The return value n is the number of bytes read. If the buffer has no data to return, err is io.EOF (unless len(data) is zero); otherwise it is nil.

func (*MappedByteBuffer) ReadInt16

func (self *MappedByteBuffer) ReadInt16() (i int16)

func (*MappedByteBuffer) ReadInt32

func (self *MappedByteBuffer) ReadInt32() (i int32)

func (*MappedByteBuffer) ReadInt64

func (self *MappedByteBuffer) ReadInt64() (i int64)

func (*MappedByteBuffer) ReadInt8

func (self *MappedByteBuffer) ReadInt8() (i int8)

func (*MappedByteBuffer) Write

func (m *MappedByteBuffer) Write(data []byte) (n int, err error)

Write appends the contents of data to the buffer

func (*MappedByteBuffer) WriteInt16

func (self *MappedByteBuffer) WriteInt16(i int16) (mappedByteBuffer *MappedByteBuffer)

func (*MappedByteBuffer) WriteInt32

func (self *MappedByteBuffer) WriteInt32(i int32) (mappedByteBuffer *MappedByteBuffer)

func (*MappedByteBuffer) WriteInt64

func (self *MappedByteBuffer) WriteInt64(i int64) (mappedByteBuffer *MappedByteBuffer)

func (*MappedByteBuffer) WriteInt8

func (self *MappedByteBuffer) WriteInt8(i int8) (mappedByteBuffer *MappedByteBuffer)

type MessageExtBrokerInner

type MessageExtBrokerInner struct {
	message.MessageExt
	PropertiesString string
	TagsCode         int64
}

MessageExtBrokerInner 存储内部使用的Message对象 Author gaoyanlei Since 2017/8/16

type MessageFilter

type MessageFilter interface {
	IsMessageMatched(subscriptionData *heartbeat.SubscriptionData, tagsCode int64) bool
}

MessageFilter 消息过滤接口 Author zhoufei Since 2017/9/6

type MessageStore

// MessageStore is the interface the storage layer exposes to callers.
type MessageStore interface {
	Load() bool
	Start() error
	Shutdown() // shut down the storage service
	Destroy()
	PutMessage(msg *MessageExtBrokerInner) *PutMessageResult
	GetMessage(group string, topic string, queueId int32, offset int64, maxMsgNums int32, subscriptionData *heartbeat.SubscriptionData) *GetMessageResult
	GetMaxOffsetInQueue(topic string, queueId int32) int64 // maximum offset of the given queue; returns -1 if the queue does not exist
	GetMinOffsetInQueue(topic string, queueId int32) int64 // minimum offset of the given queue; returns -1 if the queue does not exist
	GetCommitLogOffsetInQueue(topic string, queueId int32, cqOffset int64) int64
	GetOffsetInQueueByTime(topic string, queueId int32, timestamp int64) int64                     // look up the offset in a queue that corresponds to a message time
	LookMessageByOffset(commitLogOffset int64) *message.MessageExt                                 // look up a message by physical-queue offset; returns nil on error
	SelectOneMessageByOffset(commitLogOffset int64) *SelectMapedBufferResult                       // look up a message by physical-queue offset; returns nil on error
	SelectOneMessageByOffsetAndSize(commitLogOffset int64, msgSize int32) *SelectMapedBufferResult // look up a message by physical-queue offset and size; returns nil on error
	GetRunningDataInfo() string
	GetRuntimeInfo() map[string]string // fetch runtime statistics
	GetMaxPhyOffset() int64            // maximum offset of the physical queue
	GetMinPhyOffset() int64
	GetEarliestMessageTime(topic string, queueId int32) int64 // time of the earliest message in the queue
	GetMessageStoreTimeStamp(topic string, queueId int32, offset int64) int64
	GetMessageTotalInQueue(topic string, queueId int32) int64
	GetCommitLogData(offset int64) *SelectMapedBufferResult // used for data replication: fetch CommitLog data
	AppendToCommitLog(startOffset int64, data []byte) bool  // used for data replication: append data to the CommitLog and dispatch it to each Consume Queue
	ExcuteDeleteFilesManualy()
	QueryMessage(topic string, key string, maxNum int32, begin int64, end int64) *QueryMessageResult
	UpdateHaMasterAddress(newAddr string)
	SlaveFallBehindMuch() int64 // how far the Slave lags behind the Master, in bytes
	Now() int64
	CleanUnusedTopic(topics []string) int32
	CleanExpiredConsumerQueue()                                                                               // remove expired consume queues
	GetMessageIds(topic string, queueId int32, minOffset, maxOffset int64, storeHost string) map[string]int64 // fetch message IDs in bulk
	CheckInDiskByConsumeOffset(topic string, queueId int32, consumeOffset int64) bool                         // report whether the message is on disk
}

MessageStore 存储层对外提供的接口 Author zhoufei Since 2017/9/6

type MessageStoreConfig

// MessageStoreConfig holds the storage-layer configuration. Every field is
// JSON-tagged with its own Go field name.
type MessageStoreConfig struct {
	StorePathRootDir                       string                     `json:"StorePathRootDir"`        // storage root directory
	StorePathCommitLog                     string                     `json:"StorePathCommitLog"`      // CommitLog storage directory
	StorePathConsumeQueue                  string                     `json:"StorePathConsumeQueue"`   // ConsumeQueue storage directory
	StorePathIndex                         string                     `json:"StorePathIndex"`          // index file storage directory
	// NOTE(review): the original comment on StoreCheckpoint is identical to
	// the one on AbortFile and may be a copy-paste — confirm its meaning.
	StoreCheckpoint                        string                     `json:"StoreCheckpoint"`         // file produced on abnormal exit (literal translation)
	AbortFile                              string                     `json:"AbortFile"`               // file produced on abnormal exit
	TranStateTableStorePath                string                     `json:"TranStateTableStorePath"` // distributed transaction configuration
	TranStateTableMapedFileSize            int32                      `json:"TranStateTableMapedFileSize"`
	TranRedoLogStorePath                   string                     `json:"TranRedoLogStorePath"`
	TranRedoLogMapedFileSize               int32                      `json:"TranRedoLogMapedFileSize"`
	CheckTransactionMessageAtleastInterval int64                      `json:"CheckTransactionMessageAtleastInterval"` // minimum interval between transaction check-backs
	CheckTransactionMessageTimerInterval   int64                      `json:"CheckTransactionMessageTimerInterval"`   // timer interval for transaction check-backs
	CheckTransactionMessageEnable          bool                       `json:"CheckTransactionMessageEnable"`          // whether the transaction check process is enabled; may be turned off during peak events such as Double 11
	MapedFileSizeCommitLog                 int32                      `json:"MapedFileSizeCommitLog"`                 // size of each CommitLog file, 1G
	MapedFileSizeConsumeQueue              int32                      `json:"MapedFileSizeConsumeQueue"`              // size of each ConsumeQueue file; by default holds 300,000 messages
	FlushIntervalCommitLog                 int32                      `json:"FlushIntervalCommitLog"`                 // CommitLog flush interval (milliseconds)
	FlushCommitLogTimed                    bool                       `json:"FlushCommitLogTimed"`                    // whether to flush on a timer; the default is real-time flushing
	FlushIntervalConsumeQueue              int32                      `json:"FlushIntervalConsumeQueue"`              // ConsumeQueue flush interval (milliseconds)
	CleanResourceInterval                  int32                      `json:"CleanResourceInterval"`                  // resource cleanup interval (milliseconds)
	DeleteCommitLogFilesInterval           int32                      `json:"DeleteCommitLogFilesInterval"`           // interval between deletions when removing multiple CommitLog files (milliseconds)
	DeleteConsumeQueueFilesInterval        int32                      `json:"DeleteConsumeQueueFilesInterval"`        // interval between deletions when removing multiple ConsumeQueue files (milliseconds)
	DestroyMapedFileIntervalForcibly       int32                      `json:"DestroyMapedFileIntervalForcibly"`       // interval for forcibly deleting files (milliseconds)
	RedeleteHangedFileInterval             int32                      `json:"RedeleteHangedFileInterval"`             // interval for periodically checking hanged files (milliseconds)
	DeleteWhen                             string                     `json:"DeleteWhen"`                             // when to trigger file deletion; by default files are deleted at 4 a.m.
	DiskMaxUsedSpaceRatio                  int32                      `json:"DiskMaxUsedSpaceRatio"`                  // maximum disk space usage ratio
	FileReservedTime                       int64                      `json:"FileReservedTime"`
	PutMsgIndexHightWater                  int32                      `json:"PutMsgIndexHightWater"`             // high-water mark of the buffer used when writing message indexes to the ConsumeQueue; flow control starts once exceeded
	MaxMessageSize                         int32                      `json:"MaxMessageSize"`                    // maximum message size, default 512K
	CheckCRCOnRecover                      bool                       `json:"CheckCRCOnRecover"`                 // whether to verify the CRC on restart
	FlushCommitLogLeastPages               int32                      `json:"FlushCommitLogLeastPages"`          // minimum number of pages per CommitLog flush
	FlushConsumeQueueLeastPages            int32                      `json:"FlushConsumeQueueLeastPages"`       // minimum number of pages per ConsumeQueue flush
	FlushCommitLogThoroughInterval         int32                      `json:"FlushCommitLogThoroughInterval"`    // interval between thorough CommitLog flushes
	FlushConsumeQueueThoroughInterval      int32                      `json:"FlushConsumeQueueThoroughInterval"` // interval between thorough ConsumeQueue flushes
	MaxTransferBytesOnMessageInMemory      int32                      `json:"MaxTransferBytesOnMessageInMemory"` // maximum bytes pulled when the messages are in memory
	MaxTransferCountOnMessageInMemory      int32                      `json:"MaxTransferCountOnMessageInMemory"` // maximum number of messages pulled when the messages are in memory
	MaxTransferBytesOnMessageInDisk        int32                      `json:"MaxTransferBytesOnMessageInDisk"`   // maximum bytes pulled when the messages are on disk
	MaxTransferCountOnMessageInDisk        int32                      `json:"MaxTransferCountOnMessageInDisk"`   // maximum number of messages pulled when the messages are on disk
	AccessMessageInMemoryMaxRatio          int64                      `json:"AccessMessageInMemoryMaxRatio"`     // maximum ratio for treating messages as being in memory
	MessageIndexEnable                     bool                       `json:"MessageIndexEnable"`                // whether the message index feature is enabled
	MaxHashSlotNum                         int32                      `json:"MaxHashSlotNum"`
	MaxIndexNum                            int32                      `json:"MaxIndexNum"`
	MaxMsgsNumBatch                        int32                      `json:"MaxMsgsNumBatch"`
	MessageIndexSafe                       bool                       `json:"MessageIndexSafe"` // whether to use the safe (reliable) message index mode; reliable mode recovers slowly after a crash, non-reliable mode recovers quickly
	HaListenPort                           int32                      `json:"HaListenPort"`     // HA feature
	HaSendHeartbeatInterval                int32                      `json:"HaSendHeartbeatInterval"`
	HaHousekeepingInterval                 int32                      `json:"HaHousekeepingInterval"`
	HaTransferBatchSize                    int32                      `json:"HaTransferBatchSize"`
	HaMasterAddress                        string                     `json:"HaMasterAddress"`      // if unset, the Master HA service address is obtained from the NameServer
	HaSlaveFallbehindMax                   int32                      `json:"HaSlaveFallbehindMax"` // if the Slave lags behind the Master by more than this value, it is considered abnormal
	BrokerRole                             config.BrokerRole          `json:"BrokerRole"`
	FlushDiskType                          config.FlushDiskType       `json:"FlushDiskType"`
	SyncFlushTimeout                       int32                      `json:"SyncFlushTimeout"`  // timeout for synchronous flushing
	MessageDelayLevel                      string                     `json:"MessageDelayLevel"` // related to timed (delayed) messages
	FlushDelayOffsetInterval               int64                      `json:"FlushDelayOffsetInterval"`
	CleanFileForciblyEnable                bool                       `json:"CleanFileForciblyEnable"` // when disk usage exceeds the 90% warning level, automatically start deleting files
	SynchronizationType                    config.SynchronizationType `json:"SynchronizationType"`     // master-slave data synchronization type
}

MessageStoreConfig 存储层配置文件类 Author zhoufei Since 2017/9/6

func NewMessageStoreConfig

func NewMessageStoreConfig() *MessageStoreConfig

type PutMessageResult

// PutMessageResult is the result returned from writing (putting) a message.
type PutMessageResult struct {
	// PutMessageStatus is the status code of the write; see the
	// PutMessageStatus constants for the possible values.
	PutMessageStatus    PutMessageStatus
	// AppendMessageResult is a pointer and may therefore be nil.
	// NOTE(review): presumably carries the detail of the underlying append
	// operation — confirm against the code that builds this result.
	AppendMessageResult *AppendMessageResult
}

PutMessageResult 写入消息返回结果 Author gaoyanlei Since 2017/8/16

type PutMessageStatus

type PutMessageStatus int

PutMessageStatus is the status returned by the message-write process (写入消息过程的返回结果). Author: gaoyanlei. Since: 2017/8/16.

// Enumerated PutMessageStatus values. They are numbered consecutively from 0
// by iota, so reordering or inserting entries changes every later value.
// NOTE(review): the exact condition behind each status is not visible here;
// the names suggest their meaning, but confirm against the code that returns
// them before relying on it.
const (
	// PUTMESSAGE_PUT_OK has the value 0 (the zero value of PutMessageStatus).
	PUTMESSAGE_PUT_OK PutMessageStatus = iota
	// FLUSH_DISK_TIMEOUT has the value 1.
	FLUSH_DISK_TIMEOUT
	// FLUSH_SLAVE_TIMEOUT has the value 2.
	FLUSH_SLAVE_TIMEOUT
	// SLAVE_NOT_AVAILABLE has the value 3.
	SLAVE_NOT_AVAILABLE
	// SERVICE_NOT_AVAILABLE has the value 4.
	SERVICE_NOT_AVAILABLE
	// CREATE_MAPEDFILE_FAILED has the value 5.
	CREATE_MAPEDFILE_FAILED
	// MESSAGE_ILLEGAL has the value 6.
	MESSAGE_ILLEGAL
	// PUTMESSAGE_UNKNOWN_ERROR has the value 7.
	PUTMESSAGE_UNKNOWN_ERROR
)

func (PutMessageStatus) PutMessageString

func (status PutMessageStatus) PutMessageString() string

type QueryMessageResult

// QueryMessageResult is the result returned from querying messages by key.
type QueryMessageResult struct {
	MessageMapedList         []*SelectMapedBufferResult // a collection of multiple consecutive messages
	MessageBufferList        []*MappedByteBuffer        // used to transfer messages to the Consumer
	IndexLastUpdateTimestamp int64
	IndexLastUpdatePhyoffset int64
	BufferTotalSize          int32 // total number of bytes in the ByteBuffer
}

QueryMessageResult 通过Key查询消息,返回结果 Author zhoufei Since 2017/9/6

func NewQueryMessageResult

func NewQueryMessageResult() *QueryMessageResult

func (*QueryMessageResult) AddMessage

func (qmr *QueryMessageResult) AddMessage(mapedBuffer *SelectMapedBufferResult)

type ReadSocketService

type ReadSocketService struct {
	// contains filtered or unexported fields
}

ReadSocketService Author zhoufei Since 2017/10/19

func NewReadSocketService

func NewReadSocketService(connection *net.TCPConn, haConnection *HAConnection) *ReadSocketService

type ReferenceResource

// ReferenceResource embeds CleanReferenceResource (declared elsewhere in the
// package), so that type's exported members are promoted onto
// ReferenceResource. All of its own fields are unexported.
// NOTE(review): the name suggests reference counting of a releasable
// resource — confirm against the implementation.
type ReferenceResource struct {
	CleanReferenceResource
	// contains filtered or unexported fields
}

func NewReferenceResource

func NewReferenceResource() *ReferenceResource

type ReputMessageService

type ReputMessageService struct {
	// contains filtered or unexported fields
}

func NewReputMessageService

func NewReputMessageService(defaultMessageStore *DefaultMessageStore) *ReputMessageService

type RunningFlags

type RunningFlags struct {
	// contains filtered or unexported fields
}

type ScheduleMessageService

type ScheduleMessageService struct {
	// contains filtered or unexported fields
}

func NewScheduleMessageService

func NewScheduleMessageService(defaultMessageStore *DefaultMessageStore) *ScheduleMessageService

func (*ScheduleMessageService) Encode

func (self *ScheduleMessageService) Encode() string

func (*ScheduleMessageService) Load

func (self *ScheduleMessageService) Load() bool

func (*ScheduleMessageService) Shutdown

func (self *ScheduleMessageService) Shutdown()

func (*ScheduleMessageService) Start

func (self *ScheduleMessageService) Start()

type SelectMapedBufferResult

// SelectMapedBufferResult is the result returned from a page cache query.
//
// NOTE(review): the four exported fields mirror the parameters of
// NewSelectMapedBufferResult (startOffset, mappedByteBuffer, size, mapedFile)
// and are presumably populated from them — confirm in the constructor.
type SelectMapedBufferResult struct {
	// StartOffset: presumably the offset at which the selected region
	// begins — TODO confirm what it is measured relative to.
	StartOffset      int64
	// MappedByteBuffer is a pointer and may therefore be nil.
	MappedByteBuffer *MappedByteBuffer
	// Size: presumably the length in bytes of the selected region — TODO confirm.
	Size             int32
	// MapedFile is a pointer and may therefore be nil.
	MapedFile        *MapedFile
	// contains filtered or unexported fields
}

SelectMapedBufferResult 查询Pagecache返回结果 Author zhoufei Since 2017/9/6

func NewSelectMapedBufferResult

func NewSelectMapedBufferResult(startOffset int64, mappedByteBuffer *MappedByteBuffer, size int32, mapedFile *MapedFile) *SelectMapedBufferResult

func (*SelectMapedBufferResult) Release

func (self *SelectMapedBufferResult) Release()

type StoreCheckpoint

type StoreCheckpoint struct {
	// contains filtered or unexported fields
}

func NewStoreCheckpoint

func NewStoreCheckpoint(scpPath string) (*StoreCheckpoint, error)

type StoreStatsService

type StoreStatsService struct {
	// contains filtered or unexported fields
}

func NewStoreStatsService

func NewStoreStatsService() *StoreStatsService

func (*StoreStatsService) GetGetMessageTransferedMsgCount

func (self *StoreStatsService) GetGetMessageTransferedMsgCount() int64

func (*StoreStatsService) GetPutMessageTimesTotal

func (self *StoreStatsService) GetPutMessageTimesTotal() int64

func (*StoreStatsService) GetRuntimeInfo

func (self *StoreStatsService) GetRuntimeInfo() map[string]string

func (*StoreStatsService) Shutdown

func (self *StoreStatsService) Shutdown()

func (*StoreStatsService) Start

func (self *StoreStatsService) Start()

type TransactionCheckExecuter

type TransactionCheckExecuter interface {
	// contains filtered or unexported methods
}

type TransactionStateService

type TransactionStateService struct {
}

func NewTransactionStateService

func NewTransactionStateService(defaultMessageStore *DefaultMessageStore) *TransactionStateService

func (*TransactionStateService) Start

func (self *TransactionStateService) Start()

type WaitNotifyObject

type WaitNotifyObject struct {
	// contains filtered or unexported fields
}

WaitNotifyObject is used for asynchronous notification between threads (用来做线程之间异步通知). Author: zhoufei. Since: 2017/10/23.

func NewWaitNotifyObject

func NewWaitNotifyObject() *WaitNotifyObject

type WriteSocketService

type WriteSocketService struct {
	// contains filtered or unexported fields
}

WriteSocketService Author zhoufei Since 2017/10/19

func NewWriteSocketService

func NewWriteSocketService(connection *net.TCPConn, haConnection *HAConnection) *WriteSocketService

Directories

Path Synopsis

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL