Documentation ¶
Index ¶
- Constants
- Variables
- func IntervalDisPlay()
- func RecursionDisplay(Fsn *FileSystemNode)
- type AcquireArgs
- type AcquireReply
- type BaseMap
- type CASOp
- type CheckTokenArgs
- type CheckTokenReply
- type ChubbyGoConcurrentMap
- type ChubbyGoFileSystemError
- type Clerk
- func (ck *Clerk) Acquire(pathname string, filename string, instanceseq uint64, LockType int, ...) (bool, uint64)
- func (ck *Clerk) Append(key string, value string)
- func (ck *Clerk) CheckToken(pathname string, filename string, token uint64) bool
- func (ck *Clerk) CompareAndSwap(Key string, Old int, New int, Flag int) bool
- func (ck *Clerk) Create(fd *FileDescriptor, Type int, filename string) (bool, *FileDescriptor)
- func (ck *Clerk) Delete(pathname string, filename string, instanceseq uint64, opType int, ...) bool
- func (ck *Clerk) FastGet(key string) string
- func (ck *Clerk) Get(key string) string
- func (ck *Clerk) Open(pathname string) (bool, *FileDescriptor)
- func (ck *Clerk) Put(key string, value string)
- func (ck *Clerk) PutAppend(key string, value string, op string)
- func (ck *Clerk) Release(pathname string, filename string, instanceseq uint64, token uint64, ...) bool
- type CloseArgs
- type CloseReply
- type CompareAndSwapArgs
- type CompareAndSwapReply
- type ConcurrentHashMap
- func (m ConcurrentHashMap) Count() uint64
- func (m ConcurrentHashMap) Get(key string) (interface{}, bool)
- func (m ConcurrentHashMap) GetShard(key string) *ConcurrentMapShared
- func (m ConcurrentHashMap) Has(key string) bool
- func (m ConcurrentHashMap) IsEmpty() bool
- func (m ConcurrentHashMap) Items() map[string]interface{}
- func (m ConcurrentHashMap) Iter() <-chan Tupledeprecated
- func (m ConcurrentHashMap) IterBuffered() <-chan Tuple
- func (m ConcurrentHashMap) IterCb(fn IterCb)
- func (m ConcurrentHashMap) Keys() []string
- func (m ConcurrentHashMap) MSet(data map[string]interface{})
- func (m ConcurrentHashMap) MarshalJSON() ([]byte, error)
- func (m ConcurrentHashMap) Pop(key string) (v interface{}, exists bool)
- func (m ConcurrentHashMap) Remove(key string)
- func (m ConcurrentHashMap) Set(key string, value interface{})
- func (m ConcurrentHashMap) SetIfAbsent(key string, value interface{}) bool
- func (m ConcurrentHashMap) Upsert(key string, value interface{}, cb UpsertCb) (res interface{})
- type ConcurrentMapShared
- type CreateArgs
- type CreateReply
- type Err
- type FileDescriptor
- type FileOp
- type FileOperation
- type FileSystemNode
- func (Fsn *FileSystemNode) Acquire(InstanceSeq uint64, filename string, locktype int, checksum uint64) (uint64, error)
- func (Fsn *FileSystemNode) CheckToken(token uint64, filename string) error
- func (Fsn *FileSystemNode) Delete(InstanceSeq uint64, filename string, opType int, checkSum uint64) error
- func (Fsn *FileSystemNode) Insert(InstanceSeq uint64, Type int, name string, ReadAcl *[]uint64, ...) (uint64, uint64, error)
- func (Fsn *FileSystemNode) Open(name string) (uint64, uint64)
- func (Fsn *FileSystemNode) Release(InstanceSeq uint64, filename string, Token uint64, checksum uint64) error
- type GetArgs
- type GetReply
- type IterCb
- type KvOp
- type LatestReply
- type OpenArgs
- type OpenReply
- type PutAppendArgs
- type PutAppendReply
- type RaftKV
- func (kv *RaftKV) Acquire(args *AcquireArgs, reply *AcquireReply) error
- func (kv *RaftKV) CheckToken(args *CheckTokenArgs, reply *CheckTokenReply) error
- func (kv *RaftKV) CompareAndSwap(args *CompareAndSwapArgs, reply *CompareAndSwapReply) error
- func (kv *RaftKV) Create(args *CreateArgs, reply *CreateReply) error
- func (kv *RaftKV) Delete(args *CloseArgs, reply *CloseReply) error
- func (kv *RaftKV) FastGet(args *GetArgs, reply *GetReply) error
- func (kv *RaftKV) Get(args *GetArgs, reply *GetReply) error
- func (kv *RaftKV) GetRaft() *Raft.Raft
- func (kv *RaftKV) Open(args *OpenArgs, reply *OpenReply) error
- func (kv *RaftKV) PutAppend(args *PutAppendArgs, reply *PutAppendReply) error
- func (kv *RaftKV) Release(args *ReleaseArgs, reply *ReleaseReply) error
- func (kv *RaftKV) StartKVServer(servers []*rpc.Client)
- func (kv *RaftKV) StartRaftServer(address *[]string)
- type ReleaseArgs
- type ReleaseReply
- type Tuple
- type UpsertCb
Constants ¶
const ( SyncMap = iota ConcurrentMap )
const ( OK = "OK" ErrNoKey = "ErrNoKey" NoLeader = "NoLeader" Duplicate = "Duplicate" ReElection = "ReElection" ConnectError = "ConnectError" // 最特殊的一项 用于判断对端服务器是否连接成功 如果对端还未与全部服务器连接成功 则什么也不干 OpenError = "OpenError" DeleteError = "DeleteError" CreateError = "CreateError" AcquireError = "AcquireError" ReleaseError = "ReleaseError" CheckTokenError = "CheckTokenError" ValueCannotBeConvertedToNumber = "ValueCannotBeConvertedToNumber" CurrentValueExceedsExpectedValue = "CurrentValueExceedsExpectedValue" CASFlagUndefined = "CASFlagUndefined" CASFailture = "CASFailture" )
TODO 这里有机会全部改成数字,因为每次判断的时候都需要比较,效率较差
const ( Opdelete = iota // 当引用计数为零时删除永久文件/目录 Opclose // 当引用计数为零时不删除永久文件/目录 )
用于Delete RPC
const ( NoticeErrorValue = 0 NoticeSucess = 1 )
const ( Cas = 1 Add = 2 Sub = 4 )
const ( Directory = iota TemporaryFile // 瞬时节点会在没有客户端打开它时自动删除 PermanentFile )
const ( NotLock = iota ReadLock WriteLock )
const ( PathError = iota CheckSumError InstanceSeqError DoesNotSupportRecursiveDeletion CannotDeleteFilesWithZeroReferenceCount LockTypeError ReleaseBeforeAcquire OnlyDirectoriesCanCreateFiles TokenSeqError FileNameError )
Variables ¶
var RootFileOperation = InitFileOperation()
如果把这个放在Kvraft中fileSystem操作时会非常麻烦
var SHARD_COUNT = 32
TODO 桶的数量应该可以自己配置,因为xxhash的性能很优秀,所以越大的数据越适合更多的桶
Functions ¶
Types ¶
type AcquireArgs ¶
type AcquireReply ¶
type BaseMap ¶
* @brief: 用于与一般ChubbyGoMap作比较
func NewBaseMap ¶
func NewBaseMap() *BaseMap
func (*BaseMap) BaseDelete ¶
type CASOp ¶
type CASOp struct { ClientID uint64 // 每个Client的ID Clientseq int // 这个ClientID上目前的操作数 Key string Old string // 思前想去old,new这里还是搞成string比较好,省的在守护线程里面再转换一次,减少临界区的开销 New string Interval int // contains filtered or unexported fields }
用于CAS
type CheckTokenArgs ¶
type CheckTokenReply ¶
type CheckTokenReply struct {
Err Err
}
type ChubbyGoConcurrentMap ¶
func NewChubbyGoMap ¶
func NewChubbyGoMap(flag uint32) *ChubbyGoConcurrentMap
func (*ChubbyGoConcurrentMap) ChubbyGoMapDelete ¶
func (hs *ChubbyGoConcurrentMap) ChubbyGoMapDelete(key string)
func (*ChubbyGoConcurrentMap) ChubbyGoMapGet ¶
func (hs *ChubbyGoConcurrentMap) ChubbyGoMapGet(key string) (string, bool)
func (*ChubbyGoConcurrentMap) ChubbyGoMapSet ¶
func (hs *ChubbyGoConcurrentMap) ChubbyGoMapSet(key string, value string)
type ChubbyGoFileSystemError ¶
type ChubbyGoFileSystemError int8
func (ChubbyGoFileSystemError) Error ¶
func (err ChubbyGoFileSystemError) Error() string
type Clerk ¶
type Clerk struct { ClientID uint64 // 记录当前客户端的序号 // contains filtered or unexported fields }
func (*Clerk) Acquire ¶
func (ck *Clerk) Acquire(pathname string, filename string, instanceseq uint64, LockType int, checksum uint64, timeout uint32) (bool, uint64)
* @brief: 对Fd目录下的Filename进行加锁,可以加读锁或者写锁,加锁不需要open * @param: 实例号和路径名来源于文件描述符;文件类型;文件名称 * @return: 返回加锁是否成功;
func (*Clerk) CheckToken ¶
* @brief: 附带着目前持有的token,检测这个token是否还有效 * @notes: 问题的关键在于检查返回无效,其实在发出数据的一刻是有效的,但是不影响正确性
func (*Clerk) CompareAndSwap ¶
* @brief: 提供一个功能更加丰富的CAS操作,见文件顶. * @notes: 操作要求key对应的值必须是数字,且现在其实没有做什么保护机制,也就是CAS操作的对象可能随时被修改成不是数字的值,这样就没办法了。 * TODO 这是后面权限的一个考虑点。
func (*Clerk) Create ¶
func (ck *Clerk) Create(fd *FileDescriptor, Type int, filename string) (bool, *FileDescriptor)
* @brief: 在此文件描述符下创建一个文件,有三种类型目录,临时文件,文件 * @param: 实例号和路径名来源于文件描述符;文件类型;文件名称 * @return: 返回创建文件是否成功 * @notes: 对于返回值要先判断bool值再判断seq,bool为falseseq是没有意义的
func (*Clerk) Delete ¶
func (ck *Clerk) Delete(pathname string, filename string, instanceseq uint64, opType int, checkSum uint64) bool
* @param: opType为操作类型,可以为delete或者close
type CloseReply ¶
type CloseReply struct {
Err Err
}
type CompareAndSwapArgs ¶
type CompareAndSwapReply ¶
type CompareAndSwapReply struct {
Err Err
}
type ConcurrentHashMap ¶
type ConcurrentHashMap struct { ThreadSafeHashMap []*ConcurrentMapShared NowKeyNumber uint64 BucketNumber uint32 }
func (ConcurrentHashMap) Count ¶
func (m ConcurrentHashMap) Count() uint64
Returns the number of elements within the map. 返回目前map中的数据总量
func (ConcurrentHashMap) Get ¶
func (m ConcurrentHashMap) Get(key string) (interface{}, bool)
Retrieves an element from map under given key.
func (ConcurrentHashMap) GetShard ¶
func (m ConcurrentHashMap) GetShard(key string) *ConcurrentMapShared
* @brief: 利用key得到哈希表内的桶
func (ConcurrentHashMap) Has ¶
func (m ConcurrentHashMap) Has(key string) bool
Looks up an item under specified key
func (ConcurrentHashMap) IsEmpty ¶
func (m ConcurrentHashMap) IsEmpty() bool
Checks if map is empty.
func (ConcurrentHashMap) Items ¶
func (m ConcurrentHashMap) Items() map[string]interface{}
Returns all items as map[string]interface{}
func (ConcurrentHashMap) Iter
deprecated
func (m ConcurrentHashMap) Iter() <-chan Tuple
Returns an iterator which could be used in a for range loop.
Deprecated: using IterBuffered() will get a better performence
func (ConcurrentHashMap) IterBuffered ¶
func (m ConcurrentHashMap) IterBuffered() <-chan Tuple
Returns a buffered iterator which could be used in a for range loop. 相当于所有的值都缓存在channel中
func (ConcurrentHashMap) IterCb ¶
func (m ConcurrentHashMap) IterCb(fn IterCb)
Callback based iterator, cheapest way to read all elements in a map. 传入一个回调函数,目前map中的每个值都会调用这个回调
func (ConcurrentHashMap) Keys ¶
func (m ConcurrentHashMap) Keys() []string
Return all keys as []string 这个貌似不对 可能channel会阻塞,因为count结束以后可能会传入值,导致这里channel阻塞
func (ConcurrentHashMap) MSet ¶
func (m ConcurrentHashMap) MSet(data map[string]interface{})
func (ConcurrentHashMap) MarshalJSON ¶
func (m ConcurrentHashMap) MarshalJSON() ([]byte, error)
Reviles ConcurrentMap "private" variables to json marshal. 可以用这个持久化
func (ConcurrentHashMap) Pop ¶
func (m ConcurrentHashMap) Pop(key string) (v interface{}, exists bool)
Removes an element from the map and returns it
func (ConcurrentHashMap) Remove ¶
func (m ConcurrentHashMap) Remove(key string)
Removes an element from the map.
func (ConcurrentHashMap) Set ¶
func (m ConcurrentHashMap) Set(key string, value interface{})
Sets the given value under the specified key.
func (ConcurrentHashMap) SetIfAbsent ¶
func (m ConcurrentHashMap) SetIfAbsent(key string, value interface{}) bool
Sets the given value under the specified key if no value was associated with it.
func (ConcurrentHashMap) Upsert ¶
func (m ConcurrentHashMap) Upsert(key string, value interface{}, cb UpsertCb) (res interface{})
Insert or Update - updates existing element or inserts a new one using UpsertCb
type ConcurrentMapShared ¶
type ConcurrentMapShared struct { // contains filtered or unexported fields }
type CreateArgs ¶
type CreateReply ¶
type FileDescriptor ¶
* @brief: open每次结束以后如果成功会返回一个文件描述符,从而进行后面的操作,这样抽象就很符合人的使用逻辑 * @notes: 目前全部搞成public是为了调试方便
type FileOp ¶
type FileOp struct { Op string // 代表单个操作的字符串open,create等一众操作 ClientID uint64 // 每个Client的ID Clientseq int // 这个ClientID上目前的操作数 InstanceSeq uint64 // 每次请求的InstanceSeq,用于判断请求是否过期 Token uint64 // 锁的版本号 LockOrFileOrDeleteType int // 锁定类型或者文件类型或者delete,反正三个不会一起用 FileName string // 在open时是路径名,其他时候是文件名 PathName string // 路径名称 CheckSum uint64 // 校验位 TimeOut uint32 // 加锁超时参数 // TODO 权限控制位,现在还没用,因为不确定到底该以什么形式来设置权限 ReadAcl *[]uint64 WriteAcl *[]uint64 ModifyAcl *[]uint64 }
type FileOperation ¶
type FileOperation struct {
// contains filtered or unexported fields
}
func InitFileOperation ¶
func InitFileOperation() *FileOperation
type FileSystemNode ¶
type FileSystemNode struct { OpenReferenceCount uint64 // open的引用计数,主要用于文件夹,可以理解为文件的描述符没什么意义,文件有意义的是LockType // contains filtered or unexported fields }
func (*FileSystemNode) Acquire ¶
func (Fsn *FileSystemNode) Acquire(InstanceSeq uint64, filename string, locktype int, checksum uint64) (uint64, error)
* @param: 文件描述符传来的InstanceSeq;要删除的文件的名字 * @return: 成功删除返回true,否则返回false * @notes: 需要检测name是否存在; 调用方先判断bool再看seq
func (*FileSystemNode) CheckToken ¶
func (Fsn *FileSystemNode) CheckToken(token uint64, filename string) error
func (*FileSystemNode) Delete ¶
func (Fsn *FileSystemNode) Delete(InstanceSeq uint64, filename string, opType int, checkSum uint64) error
* @param: 文件描述符传来的InstanceSeq,要删除的文件的名字 * @return: 成功删除返回true,否则返回false * @notes: 需要检测name是否存在;
func (*FileSystemNode) Insert ¶
func (Fsn *FileSystemNode) Insert(InstanceSeq uint64, Type int, name string, ReadAcl *[]uint64, WriteAcl *[]uint64, ModifyAcl *[]uint64) (uint64, uint64, error)
* @brief: Type:创建的文件的类型;name:文件名称;后面则是文件初始的操作权限 * @return: 插入正确返回true;否则返回false * @notes: 文件被创建的时候默认打开,即引用计数为1,能够插入文件证明客户端的InstanceSeq是目录的
type IterCb ¶
type IterCb func(key string, v interface{})
Iterator callback,called for every key,value found in maps. RLock is held for all calls for a given shard therefore callback sess consistent view of a shard, but not across the shards
type LatestReply ¶
type PutAppendArgs ¶
type PutAppendReply ¶
type PutAppendReply struct {
Err Err // 定义了六种错误 足以说明状态转换了
}
type RaftKV ¶
type RaftKV struct { LogIndexNotice map[int]chan struct{} // 用于服务器与raft层同步信息 KvDictionary *ChubbyGoConcurrentMap // 改为并发map以后性能提升了百分之三十左右 ClientSeqCache map[int64]*LatestReply // 用作判断当前请求是否已经执行过 // 以下两项均作为通知机制;注意,协商以0为无效值,这样可以避免读取时锁的使用,ClientInstanceSeq用作instance和token ClientInstanceSeq map[uint64]chan uint64 // 用作返回给客户端的InstanceSeq // 仅用于Open操作 ClientInstanceCheckSum map[uint64]chan uint64 // 用作返回给客户端的CheckSum CASNotice map[uint64]chan bool // 用作CAS操作的通知,因为CAS并不是总是成功 // 显然这个数的最大值就是1,也就是连接成功的时候,且只有两种情况,即0和1 // 不使用bool的原因是Golang的atomic貌似没有像C++一样提供atomic_flag这样保证无锁的bool值 // 如果硬用bool加锁的话又慢又不好写,因为raft和kvraft应该是共享这个值的 ConnectIsok *int32 // 用于同步各服务器之间的服务的具体启动时间 且raft与kvraft应该使用同一个值 // contains filtered or unexported fields }
func StartKVServerInit ¶
func (*RaftKV) Acquire ¶
func (kv *RaftKV) Acquire(args *AcquireArgs, reply *AcquireReply) error
func (*RaftKV) CheckToken ¶
func (kv *RaftKV) CheckToken(args *CheckTokenArgs, reply *CheckTokenReply) error
func (*RaftKV) CompareAndSwap ¶
func (kv *RaftKV) CompareAndSwap(args *CompareAndSwapArgs, reply *CompareAndSwapReply) error
* @brief: CAS操作只有在key对应的value值为数字时才有效
func (*RaftKV) Create ¶
func (kv *RaftKV) Create(args *CreateArgs, reply *CreateReply) error
func (*RaftKV) PutAppend ¶
func (kv *RaftKV) PutAppend(args *PutAppendArgs, reply *PutAppendReply) error
func (*RaftKV) Release ¶
func (kv *RaftKV) Release(args *ReleaseArgs, reply *ReleaseReply) error
func (*RaftKV) StartKVServer ¶
func (*RaftKV) StartRaftServer ¶
type ReleaseArgs ¶
type ReleaseReply ¶
type ReleaseReply struct {
Err Err
}
type Tuple ¶
type Tuple struct { Key string Val interface{} }
Used by the Iter & IterBuffered functions to wrap two variables together over a channel,
type UpsertCb ¶
type UpsertCb func(exist bool, valueInMap interface{}, newValue interface{}) interface{}
Callback to return new element to be inserted into the map It is called while lock is held, therefore it MUST NOT try to access other keys in same map, as it can lead to deadlock since Go sync.RWLock is not reentrant 回调返回待插入到 map 中的新元素 这个函数当且仅当在读写锁被锁定的时候才会被调用,因此一定不允许再去尝试读取同一个 map 中的其他 key 值。因为这样会导致线程死锁。死锁的原因是 Go 中 sync.RWLock 是不可重入的。