storage

package
v0.0.0-...-a6492d2 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Oct 20, 2022 License: Apache-2.0 Imports: 13 Imported by: 2

Documentation

Index

Constants

This section is empty.

Variables

View Source
var (
	NoLease = &Lease{
		ID: clientv3.NoLease,
	}
)

Functions

func NewBoltdb

func NewBoltdb(dir string, service string) (*boltdb, error)

func NewEtcddb

func NewEtcddb(service string, containerId string, client etcdutil.EtcdWrapper) (*etcddb, error)

Types

type Lease

type Lease struct {
	// ID rb在bridge和guard两个阶段grant的lease存储在这里,
	// 没有采用存在BridgeLeaseID和GuardLeaseID两个属性的设计,这种设计会导致逻辑使用ID的时候要有选择,没有侧重,随时需要了解所在rb的子阶段
	ID clientv3.LeaseID `json:"id"`

	// Expire 过期时间点,ID本身不是单调递增,LeaseID = memberID + timestamp + counter,如果etcd集群有变动,leaseID可能会减小,
	// leaseID 设计目的是全局唯一即可。
	// 1 leaseID的ttl在sm中如果要缩短,需要停集群超过历史最大leaseID ttl,回收所有历史leaseID,增大leaseID的ttl没有问题
	// 2 带有更新时间点的leaseID,会导致shard被放弃掉
	Expire int64 `json:"expire"`
}

func (*Lease) EqualTo

func (l *Lease) EqualTo(lease *Lease) bool

func (*Lease) IsExpired

func (l *Lease) IsExpired() bool

func (*Lease) String

func (l *Lease) String() string

type MockedStorage

type MockedStorage struct {
	mock.Mock
}

func (*MockedStorage) Add

func (m *MockedStorage) Add(shard *ShardSpec) error

func (*MockedStorage) Clear

func (m *MockedStorage) Clear() error

func (*MockedStorage) Close

func (m *MockedStorage) Close() error

func (*MockedStorage) Delete

func (m *MockedStorage) Delete(k []byte) error

func (*MockedStorage) Drop

func (m *MockedStorage) Drop(ids []string) error

func (*MockedStorage) DropByLease

func (m *MockedStorage) DropByLease(exclude bool, leaseID clientv3.LeaseID) error

func (*MockedStorage) ForEach

func (m *MockedStorage) ForEach(visitor func(shardID string, dv *ShardKeeperDbValue) error) error

func (*MockedStorage) Get

func (m *MockedStorage) Get(k []byte) ([]byte, error)

func (*MockedStorage) MigrateLease

func (m *MockedStorage) MigrateLease(from, to clientv3.LeaseID) error

func (*MockedStorage) Put

func (m *MockedStorage) Put(shardID string, dv *ShardKeeperDbValue) error

func (*MockedStorage) Remove

func (m *MockedStorage) Remove(shardID string) error

func (*MockedStorage) Reset

func (m *MockedStorage) Reset() error

func (*MockedStorage) Update

func (m *MockedStorage) Update(k, v []byte) error

type ShardKeeperDbValue

type ShardKeeperDbValue struct {
	// Spec 分片基础信息
	Spec *ShardSpec `json:"spec"`

	// Disp 标记是否已经下发成功
	Disp bool `json:"disp"`

	// Drop 软删除,在异步协程中清理
	Drop bool `json:"drop"`
}

ShardKeeperDbValue 存储分片数据和管理信息

func (*ShardKeeperDbValue) NeedDrop

func (dv *ShardKeeperDbValue) NeedDrop(exclude bool, leaseID clientv3.LeaseID) bool

func (*ShardKeeperDbValue) SoftMigrate

func (dv *ShardKeeperDbValue) SoftMigrate(from, to clientv3.LeaseID) bool

func (*ShardKeeperDbValue) String

func (dv *ShardKeeperDbValue) String() string

type ShardSpec

type ShardSpec struct {
	// Id 方法传递的时候可以内容可以自识别,否则,添加分片相关的方法的生命一般是下面的样子:
	// newShard(id string, spec *apputil.ShardSpec)
	Id string `json:"id"`

	// Service 标记自己所在服务,不需要去etcd路径中解析,增加spec的描述性质
	Service string `json:"service"`

	// Task service管理的分片任务内容
	Task string `json:"task"`

	UpdateTime int64 `json:"updateTime"`

	// 通过api可以给shard主动分配到某个container
	ManualContainerId string `json:"manualContainerId"`

	// Group 同一个service需要区分不同种类的shard,
	// 这些shard之间不相关的balance到现有container上
	Group string `json:"group"`

	// WorkerGroup shard只能分配到属于WorkerGroup的container上
	WorkerGroup string `json:"workerGroup"`

	// Lease Add时带上guard lease,存储时可能存bridge和guard
	Lease *Lease `json:"lease"`
}

func (*ShardSpec) String

func (ss *ShardSpec) String() string

func (*ShardSpec) Validate

func (ss *ShardSpec) Validate() error

type Storage

type Storage interface {
	Close() error

	// Add
	// 添加分片
	Add(shard *ShardSpec) error

	// Drop
	// bridge阶段,drop掉本地的命中shard
	Drop(ids []string) error

	// ForEach
	// 遍历所有shard
	ForEach(visitor func(shardID string, dv *ShardKeeperDbValue) error) error

	// MigrateLease
	// 1 从旧的guard lease迁移到bridge lease
	// 2 从bridge lease迁移到新的guard lease
	MigrateLease(from, to clientv3.LeaseID) error

	// DropByLease
	// bridge阶段应对批量删除场景
	DropByLease(exclude bool, leaseID clientv3.LeaseID) error

	// Reset
	// 标记所有shard待下发,shardkeeper启动时使用
	Reset() error

	// Put
	// 区分于Update,允许包含特定于实现的逻辑
	Put(shardID string, dv *ShardKeeperDbValue) error

	// Remove
	// 区分于Delete,语序包含特定于实现的逻辑
	Remove(shardID string) error

	// Update
	// for unittest
	Update(k, v []byte) error
	// Delete
	// for unittest
	Delete(k []byte) error
	// Get
	// for unittest
	Get(k []byte) ([]byte, error)
	// Clear
	// for unittest
	Clear() error
}

Storage bolt引入的初衷是让shard提交给app和sm的shard分配解耦: 1 尽可能提高shard rb的稳定性(成功率) 2 提交给app这个环节,具备补偿能力,sync goroutine不断尝试能够让app感知明确(错误日志/报警)

type StorageType

type StorageType int
const (
	Boltdb StorageType = iota
	Etcd
)

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL