store

package
v0.0.0-...-1897b02 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Feb 20, 2018 License: Apache-2.0 Imports: 6 Imported by: 0

Documentation

Overview

Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License.

Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License.

Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License.

Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License.

Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License.

Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License.

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

This section is empty.

Types

type AppendMessageResult

type AppendMessageResult struct {
	Status         AppendMessageStatus
	WroteOffset    int64
	WroteBytes     int64
	MsgId          string
	StoreTimestamp int64
	LogicsOffset   int64
}

AppendMessageResult 写入commitlong返回结果集 Author gaoyanlei Since 2017/8/16

type AppendMessageStatus

type AppendMessageStatus int

AppendMessageStatus 写入commitlog 返回code Author gaoyanlei Since 2017/8/16

const (
	APPENDMESSAGE_PUT_OK AppendMessageStatus = iota
	END_OF_FILE
	MESSAGE_SIZE_EXCEEDED
	APPENDMESSAGE_UNKNOWN_ERROR
)

func (AppendMessageStatus) String

func (status AppendMessageStatus) String() string

type BufferResult

type BufferResult interface {
	Release()
	Buffer() ByteBuffer
	Size() int
}

type ByteBuffer

type ByteBuffer interface {
	Bytes() []byte

	Write([]byte) (int, error)
	WriteInt8(int8) error
	WriteInt16(int16) error
	WriteInt32(int32) error
	WriteInt64(int64) error

	Read([]byte) (int, error)
	ReadInt8() int8
	ReadInt16() int16
	ReadInt32() int32
	ReadInt64() int64
}

ByteBuffer

type GetMessageResult

type GetMessageResult struct {
	// 多个连续的消息集合
	MessageMapedList list.List

	// 用来向Consumer传送消息
	MessageBufferList list.List

	// 枚举变量,取消息结果
	Status GetMessageStatus

	// 当被过滤后,返回下一次开始的Offset
	NextBeginOffset int64

	// 逻辑队列中的最小Offset
	MinOffset int64

	// 逻辑队列中的最大Offset
	MaxOffset int64

	// ByteBuffer 总字节数
	BufferTotalSize int

	// 是否建议从slave拉消息
	SuggestPullingFromSlave bool
}

GetMessageResult 访问消息返回结果 Author gaoyanlei Since 2017/8/17

func (*GetMessageResult) AddMessage

func (gmr *GetMessageResult) AddMessage(bufferResult BufferResult)

func (*GetMessageResult) GetMessageCount

func (gmr *GetMessageResult) GetMessageCount() int

GetMessageCount 获取message个数 Author gaoyanlei Since 2017/8/17

func (*GetMessageResult) Release

func (gmr *GetMessageResult) Release()

Release

type GetMessageStatus

type GetMessageStatus int

GetMessageStatus 访问消息返回的状态码 Author gaoyanlei Since 2017/8/17

const (
	// 找到消息
	FOUND GetMessageStatus = iota
	// offset正确,但是过滤后没有匹配的消息
	NO_MATCHED_MESSAGE
	// offset正确,但是物理队列消息正在被删除
	MESSAGE_WAS_REMOVING
	// offset正确,但是从逻辑队列没有找到,可能正在被删除
	OFFSET_FOUND_NULL
	// offset错误,严重溢出
	OFFSET_OVERFLOW_BADLY
	// offset错误,溢出1个
	OFFSET_OVERFLOW_ONE
	// offset错误,太小了
	OFFSET_TOO_SMALL
	// 没有对应的逻辑队列
	NO_MATCHED_LOGIC_QUEUE
	// 队列中一条消息都没有
	NO_MESSAGE_IN_QUEUE
)

func (GetMessageStatus) String

func (gms GetMessageStatus) String() string

type MessageExtInner

type MessageExtInner struct {
	message.MessageExt
	PropertiesString string
	TagsCode         int64
}

MessageExtInner 存储内部使用的Message对象 Author gaoyanlei Since 2017/8/16

func (*MessageExtInner) IsWaitStoreMsgOK

func (mebi *MessageExtInner) IsWaitStoreMsgOK() bool

type MessageStore

type MessageStore interface {
	Load() bool                                        //
	Start() error                                      //
	Shutdown()                                         // 关闭存储服务
	Destroy()                                          //
	PutMessage(msg *MessageExtInner) *PutMessageResult //
	GetMessage(group string, topic string, queueId int32, offset int64, maxMsgNums int32, subscriptionData *heartbeat.SubscriptionData) *GetMessageResult
	MaxOffsetInQueue(topic string, queueId int32) int64                                              // 获取指定队列最大Offset 如果队列不存在,返回-1
	MinOffsetInQueue(topic string, queueId int32) int64                                              // 获取指定队列最小Offset 如果队列不存在,返回-1
	CommitLogOffsetInQueue(topic string, queueId int32, cqOffset int64) int64                        //
	OffsetInQueueByTime(topic string, queueId int32, timestamp int64) int64                          // 根据消息时间获取某个队列中对应的offset
	LookMessageByOffset(commitLogOffset int64) *message.MessageExt                                   // 通过物理队列Offset,查询消息。 如果发生错误,则返回null
	SelectOneMessageByOffset(commitLogOffset int64) BufferResult                                     // 通过物理队列Offset,查询消息。 如果发生错误,则返回null
	SelectOneMessageByOffsetAndSize(commitLogOffset int64, msgSize int32) BufferResult               // 通过物理队列Offset、size,查询消息。 如果发生错误,则返回null
	RunningDataInfo() string                                                                         //
	RuntimeInfo() map[string]string                                                                  // 取运行时统计数据
	MaxPhyOffset() int64                                                                             //获取物理队列最大offset
	MinPhyOffset() int64                                                                             //
	EarliestMessageTime(topic string, queueId int32) int64                                           // 获取队列中最早的消息时间
	MessageStoreTimeStamp(topic string, queueId int32, offset int64) int64                           //
	MessageTotalInQueue(topic string, queueId int32) int64                                           //
	GetCommitLogData(offset int64) BufferResult                                                      // 数据复制使用:获取CommitLog数据
	AppendToCommitLog(startOffset int64, data []byte) bool                                           // 数据复制使用:向CommitLog追加数据,并分发至各个Consume Queue
	ExcuteDeleteFilesManualy()                                                                       //
	QueryMessage(topic string, key string, maxNum int32, begin int64, end int64) *QueryMessageResult //
	UpdateHaMasterAddress(newAddr string)                                                            //
	SlaveFallBehindMuch() int64                                                                      // Slave落后Master多少,单位字节
	CleanUnusedTopic(topics []string) int32
	CleanExpiredConsumerQueue()                                                                            // 清除失效的消费队列
	MessageIds(topic string, queueId int32, minOffset, maxOffset int64, storeHost string) map[string]int64 // 批量获取 messageId
	CheckInDiskByConsumeOffset(topic string, queueId int32, consumeOffset int64) bool                      //判断消息是否在磁盘
	EncodeScheduleMsg() string
	StoreStats() stats.StoreStats
	BrokerStats() stats.BrokerStats
}

MessageStore 存储层对外提供的接口 Author zhoufei Since 2017/9/6

type PutMessageResult

type PutMessageResult struct {
	Status PutMessageStatus
	Result *AppendMessageResult
}

PutMessageResult 写入消息返回结果 Author gaoyanlei Since 2017/8/16

func (*PutMessageResult) IsOk

func (pms *PutMessageResult) IsOk() bool

type PutMessageStatus

type PutMessageStatus int

PutMessageStatus 写入消息过程的返回结果 Author gaoyanlei Since 2017/8/16

const (
	PUTMESSAGE_PUT_OK PutMessageStatus = iota
	FLUSH_DISK_TIMEOUT
	FLUSH_SLAVE_TIMEOUT
	SLAVE_NOT_AVAILABLE
	SERVICE_NOT_AVAILABLE
	CREATE_MAPPED_FILE_FAILED
	MESSAGE_ILLEGAL
	PUTMESSAGE_UNKNOWN_ERROR
)

func (PutMessageStatus) String

func (status PutMessageStatus) String() string

type QueryMessageResult

type QueryMessageResult struct {
	MessageMapedList         []BufferResult // 多个连续的消息集合
	MessageBufferList        []ByteBuffer   // 用来向Consumer传送消息
	IndexLastUpdateTimestamp int64
	IndexLastUpdatePhyoffset int64
	BufferTotalSize          int32 // ByteBuffer 总字节数
}

QueryMessageResult 通过Key查询消息,返回结果 Author zhoufei Since 2017/9/6

func NewQueryMessageResult

func NewQueryMessageResult() *QueryMessageResult

func (*QueryMessageResult) AddMessage

func (qmr *QueryMessageResult) AddMessage(bufferResult BufferResult)

Directories

Path Synopsis
Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
mmap
Package mmap allows mapping files into memory.
Package mmap allows mapping files into memory.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL