BaseServer

package
v0.0.0-...-e11b555 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Nov 5, 2020 License: Apache-2.0 Imports: 20 Imported by: 0

Documentation

Index

Constants

View Source
const (
	SyncMap = iota
	ConcurrentMap
)
View Source
const (
	OK         = "OK"
	ErrNoKey   = "ErrNoKey"
	NoLeader   = "NoLeader"
	Duplicate  = "Duplicate"
	ReElection = "ReElection"

	ConnectError = "ConnectError" // 最特殊的一项 用于判断对端服务器是否连接成功 如果对端还未与全部服务器连接成功 则什么也不干

	OpenError       = "OpenError"
	DeleteError     = "DeleteError"
	CreateError     = "CreateError"
	AcquireError    = "AcquireError"
	ReleaseError    = "ReleaseError"
	CheckTokenError = "CheckTokenError"

	ValueCannotBeConvertedToNumber   = "ValueCannotBeConvertedToNumber"
	CurrentValueExceedsExpectedValue = "CurrentValueExceedsExpectedValue"
	CASFlagUndefined                 = "CASFlagUndefined"
	CASFailture                      = "CASFailture"
)

TODO 这里有机会全部改成数字,因为每次判断的时候都需要比较,效率较差

View Source
const (
	Opdelete = iota // 当引用计数为零时删除永久文件/目录
	Opclose         // 当引用计数为零时不删除永久文件/目录
)

用于Delete RPC

View Source
const (
	NoticeErrorValue = 0
	NoticeSucess     = 1
)
View Source
const (
	Cas = 1
	Add = 2
	Sub = 4
)
View Source
const (
	Directory     = iota
	TemporaryFile // 瞬时节点会在没有客户端打开它时自动删除
	PermanentFile
)
View Source
const (
	NotLock = iota
	ReadLock
	WriteLock
)
View Source
const (
	PathError = iota
	CheckSumError
	InstanceSeqError
	DoesNotSupportRecursiveDeletion
	CannotDeleteFilesWithZeroReferenceCount
	LockTypeError
	ReleaseBeforeAcquire
	OnlyDirectoriesCanCreateFiles
	TokenSeqError
	FileNameError
)

Variables

View Source
var RootFileOperation = InitFileOperation()

如果把这个放在Kvraft中fileSystem操作时会非常麻烦

View Source
var SHARD_COUNT = 32

TODO 桶的数量应该可以自己配置,因为xxhash的性能很优秀,所以越大的数据越适合更多的桶

Functions

func IntervalDisPlay

func IntervalDisPlay()

* @brief: Debug用,输出间隔符号

func RecursionDisplay

func RecursionDisplay(Fsn *FileSystemNode)

* @brief: Debug用,显示当前目录树的全部文件名

Types

type AcquireArgs

type AcquireArgs struct {
	ClientID    uint64
	SeqNo       int
	InstanceSeq uint64
	PathName    string
	FileName    string
	LockType    int
	Checksum    uint64
	TimeOut     uint32 // 毫秒为单位
}

type AcquireReply

type AcquireReply struct {
	Err   Err
	Token uint64
}

type BaseMap

type BaseMap struct {
	sync.Mutex
	// contains filtered or unexported fields
}

* @brief: 用于与一般ChubbyGoMap作比较

func NewBaseMap

func NewBaseMap() *BaseMap

func (*BaseMap) BaseDelete

func (myMap *BaseMap) BaseDelete(k string)

func (*BaseMap) BaseGet

func (myMap *BaseMap) BaseGet(k string) interface{}

func (*BaseMap) BaseStore

func (myMap *BaseMap) BaseStore(k string, v interface{})

type CASOp

type CASOp struct {
	ClientID  uint64 // 每个Client的ID
	Clientseq int    // 这个ClientID上目前的操作数

	Key string
	Old string // 思前想去old,new这里还是搞成string比较好,省的在守护线程里面再转换一次,减少临界区的开销
	New string

	Interval int
	// contains filtered or unexported fields
}

用于CAS

type CheckTokenArgs

type CheckTokenArgs struct {
	ClientID uint64
	SeqNo    int
	Token    uint64
	PathName string
	FileName string
}

type CheckTokenReply

type CheckTokenReply struct {
	Err Err
}

type ChubbyGoConcurrentMap

type ChubbyGoConcurrentMap struct {
	MapEntry reflect.Value
	Flag     uint32
}

func NewChubbyGoMap

func NewChubbyGoMap(flag uint32) *ChubbyGoConcurrentMap

func (*ChubbyGoConcurrentMap) ChubbyGoMapDelete

func (hs *ChubbyGoConcurrentMap) ChubbyGoMapDelete(key string)

func (*ChubbyGoConcurrentMap) ChubbyGoMapGet

func (hs *ChubbyGoConcurrentMap) ChubbyGoMapGet(key string) (string, bool)

func (*ChubbyGoConcurrentMap) ChubbyGoMapSet

func (hs *ChubbyGoConcurrentMap) ChubbyGoMapSet(key string, value string)

type ChubbyGoFileSystemError

type ChubbyGoFileSystemError int8

func (ChubbyGoFileSystemError) Error

func (err ChubbyGoFileSystemError) Error() string

type Clerk

type Clerk struct {
	ClientID uint64 // 记录当前客户端的序号
	// contains filtered or unexported fields
}

func MakeClerk

func MakeClerk(servers []*rpc.Client, IsOk *[]int32) *Clerk

在创建的时候已经知道了如何与服务端交互

func (*Clerk) Acquire

func (ck *Clerk) Acquire(pathname string, filename string, instanceseq uint64, LockType int, checksum uint64, timeout uint32) (bool, uint64)

* @brief: 对Fd目录下的Filename进行加锁,可以加读锁或者写锁,加锁不需要open * @param: 实例号和路径名来源于文件描述符;文件类型;文件名称 * @return: 返回加锁是否成功;

func (*Clerk) Append

func (ck *Clerk) Append(key string, value string)

func (*Clerk) CheckToken

func (ck *Clerk) CheckToken(pathname string, filename string, token uint64) bool

* @brief: 附带着目前持有的token,检测这个token是否还有效 * @notes: 问题的关键在于检查返回无效,其实在发出数据的一刻是有效的,但是不影响正确性

func (*Clerk) CompareAndSwap

func (ck *Clerk) CompareAndSwap(Key string, Old int, New int, Flag int) bool

* @brief: 提供一个功能更加丰富的CAS操作,见文件顶. * @notes: 操作要求key对应的值必须是数字,且现在其实没有做什么保护机制,也就是CAS操作的对象可能随时被修改成不是数字的值,这样就没办法了。 * TODO 这是后面权限的一个考虑点。

func (*Clerk) Create

func (ck *Clerk) Create(fd *FileDescriptor, Type int, filename string) (bool, *FileDescriptor)

* @brief: 在此文件描述符下创建一个文件,有三种类型目录,临时文件,文件 * @param: 实例号和路径名来源于文件描述符;文件类型;文件名称 * @return: 返回创建文件是否成功 * @notes: 对于返回值要先判断bool值再判断seq,bool为falseseq是没有意义的

func (*Clerk) Delete

func (ck *Clerk) Delete(pathname string, filename string, instanceseq uint64, opType int, checkSum uint64) bool

* @param: opType为操作类型,可以为delete或者close

func (*Clerk) FastGet

func (ck *Clerk) FastGet(key string) string

func (*Clerk) Get

func (ck *Clerk) Get(key string) string

* @brief: 因为为了保证强一致性,一个客户端一次只会跑一个操作

func (*Clerk) Open

func (ck *Clerk) Open(pathname string) (bool, *FileDescriptor)

* @brief: 要打开的文件路径;open只是为了可以在一个目录下创建文件,与加锁权限没有一点关系 * @return: 返回一个文件描述符 * @notes: 显然打开一个文件毫无意义,对于文件的操作有锁,对于内容的操作使用绝对路径为key直接get就ok

func (*Clerk) Put

func (ck *Clerk) Put(key string, value string)

func (*Clerk) PutAppend

func (ck *Clerk) PutAppend(key string, value string, op string)

func (*Clerk) Release

func (ck *Clerk) Release(pathname string, filename string, instanceseq uint64, token uint64, checksum uint64) bool

* @brief: 对特定的文件进行解锁,需要tocken是因为标记锁的版本,防止一个锁在其定义的超时范围之外进行解锁,从而解掉其他节点持有的锁 * @param: 路径名和文件名来源于文件描述符;instanceseq号;tocken号 * @return: 返回解锁是否成功;

type CloseArgs

type CloseArgs struct {
	ClientID    uint64
	SeqNo       int
	InstanceSeq uint64
	PathName    string
	FileName    string
	OpType      int
	Checksum    uint64
}

type CloseReply

type CloseReply struct {
	Err Err
}

type CompareAndSwapArgs

type CompareAndSwapArgs struct {
	ClientID uint64
	SeqNo    int
	Key      string
	Old      int
	New      int
	Flag     int
}

type CompareAndSwapReply

type CompareAndSwapReply struct {
	Err Err
}

type ConcurrentHashMap

type ConcurrentHashMap struct {
	ThreadSafeHashMap []*ConcurrentMapShared
	NowKeyNumber      uint64
	BucketNumber      uint32
}

func NewConcurrentMap

func NewConcurrentMap() *ConcurrentHashMap

* @brief: 创建一个新的map

func (ConcurrentHashMap) Count

func (m ConcurrentHashMap) Count() uint64

Returns the number of elements within the map. 返回目前map中的数据总量

func (ConcurrentHashMap) Get

func (m ConcurrentHashMap) Get(key string) (interface{}, bool)

Retrieves an element from map under given key.

func (ConcurrentHashMap) GetShard

func (m ConcurrentHashMap) GetShard(key string) *ConcurrentMapShared

* @brief: 利用key得到哈希表内的桶

func (ConcurrentHashMap) Has

func (m ConcurrentHashMap) Has(key string) bool

Looks up an item under specified key

func (ConcurrentHashMap) IsEmpty

func (m ConcurrentHashMap) IsEmpty() bool

Checks if map is empty.

func (ConcurrentHashMap) Items

func (m ConcurrentHashMap) Items() map[string]interface{}

Returns all items as map[string]interface{}

func (ConcurrentHashMap) Iter deprecated

func (m ConcurrentHashMap) Iter() <-chan Tuple

Returns an iterator which could be used in a for range loop.

Deprecated: using IterBuffered() will get a better performence

func (ConcurrentHashMap) IterBuffered

func (m ConcurrentHashMap) IterBuffered() <-chan Tuple

Returns a buffered iterator which could be used in a for range loop. 相当于所有的值都缓存在channel中

func (ConcurrentHashMap) IterCb

func (m ConcurrentHashMap) IterCb(fn IterCb)

Callback based iterator, cheapest way to read all elements in a map. 传入一个回调函数,目前map中的每个值都会调用这个回调

func (ConcurrentHashMap) Keys

func (m ConcurrentHashMap) Keys() []string

Return all keys as []string 这个貌似不对 可能channel会阻塞,因为count结束以后可能会传入值,导致这里channel阻塞

func (ConcurrentHashMap) MSet

func (m ConcurrentHashMap) MSet(data map[string]interface{})

func (ConcurrentHashMap) MarshalJSON

func (m ConcurrentHashMap) MarshalJSON() ([]byte, error)

Reviles ConcurrentMap "private" variables to json marshal. 可以用这个持久化

func (ConcurrentHashMap) Pop

func (m ConcurrentHashMap) Pop(key string) (v interface{}, exists bool)

Removes an element from the map and returns it

func (ConcurrentHashMap) Remove

func (m ConcurrentHashMap) Remove(key string)

Removes an element from the map.

func (ConcurrentHashMap) Set

func (m ConcurrentHashMap) Set(key string, value interface{})

Sets the given value under the specified key.

func (ConcurrentHashMap) SetIfAbsent

func (m ConcurrentHashMap) SetIfAbsent(key string, value interface{}) bool

Sets the given value under the specified key if no value was associated with it.

func (ConcurrentHashMap) Upsert

func (m ConcurrentHashMap) Upsert(key string, value interface{}, cb UpsertCb) (res interface{})

Insert or Update - updates existing element or inserts a new one using UpsertCb

type ConcurrentMapShared

type ConcurrentMapShared struct {
	sync.RWMutex
	// contains filtered or unexported fields
}

type CreateArgs

type CreateArgs struct {
	ClientID    uint64
	SeqNo       int
	InstanceSeq uint64
	FileType    int
	PathName    string // 用PathName在服务器解析出节点
	FileName    string // 然后使用FileName创建文件
}

type CreateReply

type CreateReply struct {
	Err         Err
	InstanceSeq uint64
	CheckSum    uint64
}

type Err

type Err string

type FileDescriptor

type FileDescriptor struct {
	ChuckSum    uint64
	InstanceSeq uint64
	PathName    string
}

* @brief: open每次结束以后如果成功会返回一个文件描述符,从而进行后面的操作,这样抽象就很符合人的使用逻辑 * @notes: 目前全部搞成public是为了调试方便

type FileOp

type FileOp struct {
	Op        string // 代表单个操作的字符串open,create等一众操作
	ClientID  uint64 // 每个Client的ID
	Clientseq int    // 这个ClientID上目前的操作数

	InstanceSeq uint64 // 每次请求的InstanceSeq,用于判断请求是否过期
	Token       uint64 // 锁的版本号

	LockOrFileOrDeleteType int // 锁定类型或者文件类型或者delete,反正三个不会一起用

	FileName string // 在open时是路径名,其他时候是文件名
	PathName string // 路径名称
	CheckSum uint64 // 校验位
	TimeOut  uint32 // 加锁超时参数

	// TODO 权限控制位,现在还没用,因为不确定到底该以什么形式来设置权限
	ReadAcl   *[]uint64
	WriteAcl  *[]uint64
	ModifyAcl *[]uint64
}

type FileOperation

type FileOperation struct {
	// contains filtered or unexported fields
}

func InitFileOperation

func InitFileOperation() *FileOperation

type FileSystemNode

type FileSystemNode struct {
	OpenReferenceCount uint64 // open的引用计数,主要用于文件夹,可以理解为文件的描述符没什么意义,文件有意义的是LockType
	// contains filtered or unexported fields
}

func InitRoot

func InitRoot() *FileSystemNode

* @brief: 用于初始化每个Cell内的根目录 * @return: 返回根目录的实体

func (*FileSystemNode) Acquire

func (Fsn *FileSystemNode) Acquire(InstanceSeq uint64, filename string, locktype int, checksum uint64) (uint64, error)

* @param: 文件描述符传来的InstanceSeq;要删除的文件的名字 * @return: 成功删除返回true,否则返回false * @notes: 需要检测name是否存在; 调用方先判断bool再看seq

func (*FileSystemNode) CheckToken

func (Fsn *FileSystemNode) CheckToken(token uint64, filename string) error

func (*FileSystemNode) Delete

func (Fsn *FileSystemNode) Delete(InstanceSeq uint64, filename string, opType int, checkSum uint64) error

* @param: 文件描述符传来的InstanceSeq,要删除的文件的名字 * @return: 成功删除返回true,否则返回false * @notes: 需要检测name是否存在;

func (*FileSystemNode) Insert

func (Fsn *FileSystemNode) Insert(InstanceSeq uint64, Type int, name string, ReadAcl *[]uint64, WriteAcl *[]uint64, ModifyAcl *[]uint64) (uint64, uint64, error)

* @brief: Type:创建的文件的类型;name:文件名称;后面则是文件初始的操作权限 * @return: 插入正确返回true;否则返回false * @notes: 文件被创建的时候默认打开,即引用计数为1,能够插入文件证明客户端的InstanceSeq是目录的

func (*FileSystemNode) Open

func (Fsn *FileSystemNode) Open(name string) (uint64, uint64)
  • @param: 文件描述符传来的InstanceSeq;要打开的文件名
  • @return: 返回当前文件的instanceSeq
  • @notes: 对于一个文件来说客户端open操作可以检查某个文件是否存在,如果存在会返回一个句柄,反之返回false; 对于一个目录来说open可以使其获取句柄后创建文件;

func (*FileSystemNode) Release

func (Fsn *FileSystemNode) Release(InstanceSeq uint64, filename string, Token uint64, checksum uint64) error

* @param: 文件描述符传来的InstanceSeq;要释放的文件(锁)的名字 * @return: 成功删除返回true,否则返回false * @notes: 需要检测name是否存在

type GetArgs

type GetArgs struct {
	Key      string
	ClientID uint64
	SeqNo    int
}

type GetReply

type GetReply struct {
	Err   Err
	Value string
}

type IterCb

type IterCb func(key string, v interface{})

Iterator callback,called for every key,value found in maps. RLock is held for all calls for a given shard therefore callback sess consistent view of a shard, but not across the shards

type KvOp

type KvOp struct {
	Key   string
	Value string
	Op    string // 代表单个操作的字符串Get,Put,Append等一众操作
	// 这样做就使得一个客户端一次只能执行一个操作了
	ClientID  uint64 // 每个Client的ID
	Clientseq int    // 这个ClientID上目前的操作数
}

type LatestReply

type LatestReply struct {
	Seq   int    // 最新的序列号
	Value string // 之所以get不直接从db中取是因为取时的最新值不一定是读时的最新值,我们需要一个严格有序的操作序列
}

type OpenArgs

type OpenArgs struct {
	ClientID uint64
	SeqNo    int
	PathName string
}

type OpenReply

type OpenReply struct {
	Err         Err
	ChuckSum    uint64
	InstanceSeq uint64
}

type PutAppendArgs

type PutAppendArgs struct {
	Key      string
	Value    string
	Op       string
	ClientID uint64
	SeqNo    int
}

type PutAppendReply

type PutAppendReply struct {
	Err Err // 定义了六种错误 足以说明状态转换了
}

type RaftKV

type RaftKV struct {
	LogIndexNotice map[int]chan struct{} // 用于服务器与raft层同步信息

	KvDictionary   *ChubbyGoConcurrentMap // 改为并发map以后性能提升了百分之三十左右
	ClientSeqCache map[int64]*LatestReply // 用作判断当前请求是否已经执行过

	// 以下两项均作为通知机制;注意,协商以0为无效值,这样可以避免读取时锁的使用,ClientInstanceSeq用作instance和token
	ClientInstanceSeq map[uint64]chan uint64 // 用作返回给客户端的InstanceSeq
	// 仅用于Open操作
	ClientInstanceCheckSum map[uint64]chan uint64 // 用作返回给客户端的CheckSum

	CASNotice map[uint64]chan bool // 用作CAS操作的通知,因为CAS并不是总是成功

	// 显然这个数的最大值就是1,也就是连接成功的时候,且只有两种情况,即0和1
	// 不使用bool的原因是Golang的atomic貌似没有像C++一样提供atomic_flag这样保证无锁的bool值
	// 如果硬用bool加锁的话又慢又不好写,因为raft和kvraft应该是共享这个值的
	ConnectIsok *int32 // 用于同步各服务器之间的服务的具体启动时间 且raft与kvraft应该使用同一个值
	// contains filtered or unexported fields
}

func StartKVServerInit

func StartKVServerInit(me uint64, persister *Persister.Persister, maxraftstate int, chubbygomap uint32) *RaftKV

func (*RaftKV) Acquire

func (kv *RaftKV) Acquire(args *AcquireArgs, reply *AcquireReply) error

func (*RaftKV) CheckToken

func (kv *RaftKV) CheckToken(args *CheckTokenArgs, reply *CheckTokenReply) error

func (*RaftKV) CompareAndSwap

func (kv *RaftKV) CompareAndSwap(args *CompareAndSwapArgs, reply *CompareAndSwapReply) error

* @brief: CAS操作只有在key对应的value值为数字时才有效

func (*RaftKV) Create

func (kv *RaftKV) Create(args *CreateArgs, reply *CreateReply) error

func (*RaftKV) Delete

func (kv *RaftKV) Delete(args *CloseArgs, reply *CloseReply) error

func (*RaftKV) FastGet

func (kv *RaftKV) FastGet(args *GetArgs, reply *GetReply) error

func (*RaftKV) Get

func (kv *RaftKV) Get(args *GetArgs, reply *GetReply) error

显然对于同一个客户端不允许并发执行多个操作,会发生死锁,这是我定义的使用规范

func (*RaftKV) GetRaft

func (kv *RaftKV) GetRaft() *Raft.Raft

用于server_handler.go,注册raft服务

func (*RaftKV) Open

func (kv *RaftKV) Open(args *OpenArgs, reply *OpenReply) error

其中重复代码很多不修改的原因很多地方类型都不一样,减少代码行数就需要反射,会降低可读性与性能;

func (*RaftKV) PutAppend

func (kv *RaftKV) PutAppend(args *PutAppendArgs, reply *PutAppendReply) error

func (*RaftKV) Release

func (kv *RaftKV) Release(args *ReleaseArgs, reply *ReleaseReply) error

func (*RaftKV) StartKVServer

func (kv *RaftKV) StartKVServer(servers []*rpc.Client)

func (*RaftKV) StartRaftServer

func (kv *RaftKV) StartRaftServer(address *[]string)

type ReleaseArgs

type ReleaseArgs struct {
	ClientID    uint64
	SeqNo       int
	InstanceSeq uint64
	PathName    string
	FileName    string
	Token       uint64
	CheckSum    uint64
}

type ReleaseReply

type ReleaseReply struct {
	Err Err
}

type Tuple

type Tuple struct {
	Key string
	Val interface{}
}

Used by the Iter & IterBuffered functions to wrap two variables together over a channel,

type UpsertCb

type UpsertCb func(exist bool, valueInMap interface{}, newValue interface{}) interface{}

Callback to return new element to be inserted into the map It is called while lock is held, therefore it MUST NOT try to access other keys in same map, as it can lead to deadlock since Go sync.RWLock is not reentrant 回调返回待插入到 map 中的新元素 这个函数当且仅当在读写锁被锁定的时候才会被调用,因此一定不允许再去尝试读取同一个 map 中的其他 key 值。因为这样会导致线程死锁。死锁的原因是 Go 中 sync.RWLock 是不可重入的。

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL