storage

package
v0.0.3 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Sep 9, 2022 License: GPL-3.0 Imports: 26 Imported by: 0

Documentation

Index

Constants

View Source
// Untyped integer level constants numbered 0 through 3.
// NOTE(review): the names suggest log verbosity levels, presumably the values
// accepted by Logger.SetLevel — confirm against the implementation.
const (
	Silent = 0
	Error  = 1
	Warn   = 2
	Info   = 3
)
View Source
// DBFileName is the fixed file name "0.db".
// NOTE(review): presumably the database file created inside a working
// directory — confirm against the callers.
const DBFileName = "0.db"
View Source
// HeaderSize is 0x100 (256). The comment on LogProgress.Offset states that
// HeaderSize should be used as that field's initial value.
// NOTE(review): presumably the number of bytes reserved for the FileHeader at
// the start of a log file — confirm.
const HeaderSize = 0x100
View Source
// SyncInterval is a fixed duration of one minute.
// NOTE(review): presumably the period between periodic flushes/syncs —
// confirm where it is consumed.
const SyncInterval = time.Minute
View Source
// WalFileName is the fixed file name "0.wal".
// NOTE(review): presumably the write-ahead log file created inside a working
// directory — confirm against the callers.
const WalFileName = "0.wal"

Variables

View Source
// Package-level error values whose messages carry the "proto:" prefix:
//
//   - ErrInvalidLengthProto: a negative length was found during unmarshaling.
//   - ErrIntOverflowProto: an integer overflow occurred.
//   - ErrUnexpectedEndOfGroupProto: an unexpected end of group was found.
//
// NOTE(review): presumably returned by the generated Unmarshal methods of
// FileHeader, LogEntry and LogOperation — confirm.
var (
	ErrInvalidLengthProto        = fmt.Errorf("proto: negative length found during unmarshaling")
	ErrIntOverflowProto          = fmt.Errorf("proto: integer overflow")
	ErrUnexpectedEndOfGroupProto = fmt.Errorf("proto: unexpected end of group")
)
View Source
// Op_name maps the numeric value of each Op constant (Op_None, Op_Modify,
// Op_Del, Op_Discard) to its name. It is the inverse of Op_value.
var Op_name = map[int32]string{
	0: "None",
	1: "Modify",
	2: "Del",
	3: "Discard",
}
View Source
// Op_value maps the name of each Op constant ("None", "Modify", "Del",
// "Discard") to its numeric value. It is the inverse of Op_name.
var Op_value = map[string]int32{
	"None":    0,
	"Modify":  1,
	"Del":     2,
	"Discard": 3,
}

Functions

func GenUUID

func GenUUID() (string, error)

func IsDir

func IsDir(filename string) bool

func IsFile

func IsFile(filename string) bool

func SetLogger

func SetLogger(l Logger)

func ToAbs

func ToAbs(p string) (string, error)

Types

type BinLog

// BinLog is a field-less type whose methods (WriteHeader, IsValidFile,
// ReadHeader, AppendEntry, ReadEntry) have the same signatures as the methods
// of the LogFormat interface.
// NOTE(review): the name suggests a binary on-disk log encoding — confirm.
type BinLog struct {
}

func (*BinLog) AppendEntry

func (l *BinLog) AppendEntry(f File, pos int64, entry *LogEntry) (int64, error)

写失败将破坏文件数据

func (*BinLog) IsValidFile

func (l *BinLog) IsValidFile(f File) (bool, error)

TODO: 更多校验

func (*BinLog) ReadEntry

func (l *BinLog) ReadEntry(f File, pos int64, entry *LogEntry) (int64, error)

func (*BinLog) ReadHeader

func (l *BinLog) ReadHeader(f File) (*FileHeader, error)

func (*BinLog) WriteHeader

func (l *BinLog) WriteHeader(f File, header *FileHeader) error

写失败将破坏文件数据

type ChangeCount

// ChangeCount is a map from string keys to int32 counts. Its Scan and Value
// methods are documented as implementing the Scanner and driver Valuer
// interfaces, and DBRecord stores it in the change_count column.
// NOTE(review): keys are presumably machine IDs (DBRecord.AddChange and
// DBRecord.Changes take a machineID) — confirm.
type ChangeCount map[string]int32

func (*ChangeCount) Scan

func (n *ChangeCount) Scan(value any) error

Scan implements the Scanner interface.

func (ChangeCount) Value

func (n ChangeCount) Value() (driver.Value, error)

Value implements the driver Valuer interface.

type DBRecord

// DBRecord is the GORM-mapped row type handled by the node storage interfaces
// (ReadOnlyNodeStorage, NodeStorage).
//
// Column mapping, as declared by the tags:
//   - Key is stored in the indexed column "key"; Value in "value".
//   - CurrentLogGid is stored in column "gid" with a unique index;
//     PrevLogGid in "prev_log_gid".
//   - MachineID / PrevMachineID, Offset, Seq, Num / PrevNum map to the
//     columns machine_id / prev_machine_id, offset, seq, num / prev_num.
//   - IsDiscarded / IsDeleted map to is_discarded / is_deleted. A record with
//     either flag set can be removed from storage at any time.
//   - MachineChangeCount is stored in column "change_count".
//   - DeletedAt is indexed.
//
// Num and PrevNum use the explicit "column:" tag key like every other field;
// the names equal GORM's default snake_case column names for these fields.
// NOTE(review): the "Prev*" fields presumably point at the record this one
// supersedes — confirm against the log runner.
type DBRecord struct {
	Key                string      `gorm:"index;column:key"`
	Value              string      `gorm:"column:value"`
	MachineID          string      `gorm:"column:machine_id"`
	Offset             int64       `gorm:"column:offset"`
	PrevMachineID      string      `gorm:"column:prev_machine_id"`
	Seq                uint64      `gorm:"column:seq"`
	CurrentLogGid      string      `gorm:"uniqueIndex;column:gid"`
	PrevLogGid         string      `gorm:"column:prev_log_gid"`
	IsDiscarded        bool        `gorm:"column:is_discarded"`
	IsDeleted          bool        `gorm:"column:is_deleted"`
	MachineChangeCount ChangeCount `gorm:"column:change_count"`
	Num                int64       `gorm:"column:num"`
	PrevNum            int64       `gorm:"column:prev_num"`
	CreatedAt          time.Time
	UpdatedAt          time.Time
	DeletedAt          sql.NullTime `gorm:"index"`
}

Records with is_deleted or is_discarded set can be removed from storage at any time.

func (*DBRecord) AddChange

func (r *DBRecord) AddChange(machineID string, changes int32) map[string]int32

func (*DBRecord) Changes

func (r *DBRecord) Changes(machineID string) int32

func (*DBRecord) Visible

func (r *DBRecord) Visible() bool

type File

// File is the file abstraction taken by the LogFormat methods: it combines
// io.ReadWriteCloser and io.Seeker with Flush and Path.
//
// Do not rely on the position maintained internally by the file.
type File interface {
	io.ReadWriteCloser
	io.Seeker
	// Flush returns an error on failure.
	// NOTE(review): presumably forces buffered writes out to the underlying
	// file — confirm against the OpenFile implementation.
	Flush() error
	// Path returns a string.
	// NOTE(review): presumably the path the file was opened with — confirm.
	Path() string
}

不要依赖文件内部维护的position

func OpenFile added in v0.0.3

func OpenFile(filename string, readonly bool) (File, error)

type FileHeader added in v0.0.2

// FileHeader is a protobuf message (see the protobuf struct tags) that the
// LogFormat methods ReadHeader and WriteHeader read and write.
//
// Protobuf fields: Id (1, string), FileEnd (2, varint), LastEntryId
// (3, string) and EntryNum (4, varint). The XXX_ fields are excluded from
// JSON output (json:"-").
// NOTE(review): FileEnd is presumably the offset of the end of valid data and
// EntryNum the number of entries written — confirm against the Wal code.
type FileHeader struct {
	Id                   string   `protobuf:"bytes,1,opt,name=id,proto3" json:"id,omitempty"`
	FileEnd              int64    `protobuf:"varint,2,opt,name=file_end,json=fileEnd,proto3" json:"file_end,omitempty"`
	LastEntryId          string   `protobuf:"bytes,3,opt,name=last_entry_id,json=lastEntryId,proto3" json:"last_entry_id,omitempty"`
	EntryNum             int64    `protobuf:"varint,4,opt,name=entry_num,json=entryNum,proto3" json:"entry_num,omitempty"`
	XXX_NoUnkeyedLiteral struct{} `json:"-"`
	XXX_unrecognized     []byte   `json:"-"`
	XXX_sizecache        int32    `json:"-"`
}

func (*FileHeader) Descriptor added in v0.0.2

func (*FileHeader) Descriptor() ([]byte, []int)

func (*FileHeader) GetEntryNum added in v0.0.2

func (m *FileHeader) GetEntryNum() int64

func (*FileHeader) GetFileEnd added in v0.0.2

func (m *FileHeader) GetFileEnd() int64

func (*FileHeader) GetId added in v0.0.2

func (m *FileHeader) GetId() string

func (*FileHeader) GetLastEntryId added in v0.0.2

func (m *FileHeader) GetLastEntryId() string

func (*FileHeader) Marshal added in v0.0.2

func (m *FileHeader) Marshal() (dAtA []byte, err error)

func (*FileHeader) MarshalTo added in v0.0.2

func (m *FileHeader) MarshalTo(dAtA []byte) (int, error)

func (*FileHeader) MarshalToSizedBuffer added in v0.0.2

func (m *FileHeader) MarshalToSizedBuffer(dAtA []byte) (int, error)

func (*FileHeader) ProtoMessage added in v0.0.2

func (*FileHeader) ProtoMessage()

func (*FileHeader) Reset added in v0.0.2

func (m *FileHeader) Reset()

func (*FileHeader) Size added in v0.0.2

func (m *FileHeader) Size() (n int)

func (*FileHeader) String added in v0.0.2

func (m *FileHeader) String() string

func (*FileHeader) Unmarshal added in v0.0.2

func (m *FileHeader) Unmarshal(dAtA []byte) error

func (*FileHeader) XXX_DiscardUnknown added in v0.0.2

func (m *FileHeader) XXX_DiscardUnknown()

func (*FileHeader) XXX_Marshal added in v0.0.2

func (m *FileHeader) XXX_Marshal(b []byte, deterministic bool) ([]byte, error)

func (*FileHeader) XXX_Merge added in v0.0.2

func (m *FileHeader) XXX_Merge(src proto.Message)

func (*FileHeader) XXX_Size added in v0.0.2

func (m *FileHeader) XXX_Size() int

func (*FileHeader) XXX_Unmarshal added in v0.0.2

func (m *FileHeader) XXX_Unmarshal(b []byte) error

type Iterator

// Iterator is a single-method interface.
type Iterator interface {
	// Next returns a bool.
	// NOTE(review): presumably reports whether another element is available
	// (WalIterator.Next names its result hasNext) — confirm.
	Next() bool
}

type JsonLog

// JsonLog is a field-less type whose methods (WriteHeader, IsValidFile,
// ReadHeader, AppendEntry, ReadEntry) have the same signatures as the methods
// of the LogFormat interface.
// NOTE(review): the name suggests a JSON on-disk log encoding — confirm.
type JsonLog struct {
}

func (*JsonLog) AppendEntry

func (l *JsonLog) AppendEntry(f File, pos int64, entry *LogEntry) (int64, error)

写失败将破坏文件数据

func (*JsonLog) IsValidFile

func (l *JsonLog) IsValidFile(f File) (bool, error)

TODO: 更多校验

func (*JsonLog) ReadEntry

func (l *JsonLog) ReadEntry(f File, pos int64, entry *LogEntry) (int64, error)

func (*JsonLog) ReadHeader

func (l *JsonLog) ReadHeader(f File) (*FileHeader, error)

func (*JsonLog) WriteHeader

func (l *JsonLog) WriteHeader(f File, header *FileHeader) error

写失败将破坏文件数据

type LogEntry added in v0.0.2

// LogEntry is a protobuf message holding a repeated list of LogOperation
// values (protobuf field 1). It is the unit that LogFormat.AppendEntry and
// LogFormat.ReadEntry take. The XXX_ fields are excluded from JSON output
// (json:"-").
type LogEntry struct {
	Ops                  []*LogOperation `protobuf:"bytes,1,rep,name=ops,proto3" json:"ops,omitempty"`
	XXX_NoUnkeyedLiteral struct{}        `json:"-"`
	XXX_unrecognized     []byte          `json:"-"`
	XXX_sizecache        int32           `json:"-"`
}

func (*LogEntry) Descriptor added in v0.0.2

func (*LogEntry) Descriptor() ([]byte, []int)

func (*LogEntry) GetOps added in v0.0.2

func (m *LogEntry) GetOps() []*LogOperation

func (*LogEntry) Marshal added in v0.0.2

func (m *LogEntry) Marshal() (dAtA []byte, err error)

func (*LogEntry) MarshalTo added in v0.0.2

func (m *LogEntry) MarshalTo(dAtA []byte) (int, error)

func (*LogEntry) MarshalToSizedBuffer added in v0.0.2

func (m *LogEntry) MarshalToSizedBuffer(dAtA []byte) (int, error)

func (*LogEntry) ProtoMessage added in v0.0.2

func (*LogEntry) ProtoMessage()

func (*LogEntry) Reset added in v0.0.2

func (m *LogEntry) Reset()

func (*LogEntry) Size added in v0.0.2

func (m *LogEntry) Size() (n int)

func (*LogEntry) String added in v0.0.2

func (m *LogEntry) String() string

func (*LogEntry) Unmarshal added in v0.0.2

func (m *LogEntry) Unmarshal(dAtA []byte) error

func (*LogEntry) XXX_DiscardUnknown added in v0.0.2

func (m *LogEntry) XXX_DiscardUnknown()

func (*LogEntry) XXX_Marshal added in v0.0.2

func (m *LogEntry) XXX_Marshal(b []byte, deterministic bool) ([]byte, error)

func (*LogEntry) XXX_Merge added in v0.0.2

func (m *LogEntry) XXX_Merge(src proto.Message)

func (*LogEntry) XXX_Size added in v0.0.2

func (m *LogEntry) XXX_Size() int

func (*LogEntry) XXX_Unmarshal added in v0.0.2

func (m *LogEntry) XXX_Unmarshal(b []byte) error

type LogFormat

// LogFormat is the set of header and entry operations on a File. BinLog and
// JsonLog both declare methods with these signatures.
type LogFormat interface {
	// WriteHeader writes header to f. The implementations' documentation
	// warns that a failed write corrupts the file data.
	WriteHeader(f File, header *FileHeader) error
	// IsValidFile returns a validity flag for f, or an error. The
	// implementations carry a TODO to add more validation.
	IsValidFile(f File) (bool, error)
	// ReadHeader returns the FileHeader read from f, or an error.
	ReadHeader(f File) (*FileHeader, error)
	// AppendEntry takes a file, a position and an entry, and returns an int64
	// and an error. The implementations' documentation warns that a failed
	// write corrupts the file data.
	// NOTE(review): the returned int64 is presumably the position following
	// the written entry — confirm.
	AppendEntry(f File, pos int64, entry *LogEntry) (int64, error)
	// ReadEntry takes a file, a position and a destination entry, and returns
	// an int64 and an error.
	// NOTE(review): the returned int64 is presumably the position following
	// the entry that was read — confirm.
	ReadEntry(f File, pos int64, entry *LogEntry) (int64, error)
}

type LogInput

type LogInput struct {
	// contains filtered or unexported fields
}

type LogOperation added in v0.0.2

// LogOperation is a protobuf message describing one operation inside a
// LogEntry.
//
// Protobuf fields: Op (1), Key (2), Value (3), Gid (4), PrevGid (5),
// PrevValue (6), Seq (7), MachineId (8), PrevMachineId (9), Num (11) and
// PrevNum (12). Changes is a map from string to int32 whose tag is not
// displayed in this listing. The XXX_ fields are excluded from JSON output
// (json:"-").
//
// The documentation of Wal.Append states that it generates and populates Num
// and Gid for each operation.
// NOTE(review): Op presumably holds one of the Op constants (Op_None,
// Op_Modify, Op_Del, Op_Discard) — confirm.
type LogOperation struct {
	Op                   int32            `protobuf:"varint,1,opt,name=op,proto3" json:"op,omitempty"`
	Key                  string           `protobuf:"bytes,2,opt,name=key,proto3" json:"key,omitempty"`
	Value                string           `protobuf:"bytes,3,opt,name=value,proto3" json:"value,omitempty"`
	Gid                  string           `protobuf:"bytes,4,opt,name=gid,proto3" json:"gid,omitempty"`
	PrevGid              string           `protobuf:"bytes,5,opt,name=prev_gid,json=prevGid,proto3" json:"prev_gid,omitempty"`
	PrevValue            string           `protobuf:"bytes,6,opt,name=prev_value,json=prevValue,proto3" json:"prev_value,omitempty"`
	Seq                  uint64           `protobuf:"varint,7,opt,name=seq,proto3" json:"seq,omitempty"`
	MachineId            string           `protobuf:"bytes,8,opt,name=machine_id,json=machineId,proto3" json:"machine_id,omitempty"`
	PrevMachineId        string           `protobuf:"bytes,9,opt,name=prev_machine_id,json=prevMachineId,proto3" json:"prev_machine_id,omitempty"`
	Changes              map[string]int32 `` /* 157-byte string literal not displayed */
	Num                  int64            `protobuf:"varint,11,opt,name=num,proto3" json:"num,omitempty"`
	PrevNum              int64            `protobuf:"varint,12,opt,name=prev_num,json=prevNum,proto3" json:"prev_num,omitempty"`
	XXX_NoUnkeyedLiteral struct{}         `json:"-"`
	XXX_unrecognized     []byte           `json:"-"`
	XXX_sizecache        int32            `json:"-"`
}

func (*LogOperation) Descriptor added in v0.0.2

func (*LogOperation) Descriptor() ([]byte, []int)

func (*LogOperation) GetChanges added in v0.0.2

func (m *LogOperation) GetChanges() map[string]int32

func (*LogOperation) GetGid added in v0.0.2

func (m *LogOperation) GetGid() string

func (*LogOperation) GetKey added in v0.0.2

func (m *LogOperation) GetKey() string

func (*LogOperation) GetMachineId added in v0.0.2

func (m *LogOperation) GetMachineId() string

func (*LogOperation) GetNum added in v0.0.2

func (m *LogOperation) GetNum() int64

func (*LogOperation) GetOp added in v0.0.2

func (m *LogOperation) GetOp() int32

func (*LogOperation) GetPrevGid added in v0.0.2

func (m *LogOperation) GetPrevGid() string

func (*LogOperation) GetPrevMachineId added in v0.0.2

func (m *LogOperation) GetPrevMachineId() string

func (*LogOperation) GetPrevNum added in v0.0.2

func (m *LogOperation) GetPrevNum() int64

func (*LogOperation) GetPrevValue added in v0.0.2

func (m *LogOperation) GetPrevValue() string

func (*LogOperation) GetSeq added in v0.0.2

func (m *LogOperation) GetSeq() uint64

func (*LogOperation) GetValue added in v0.0.2

func (m *LogOperation) GetValue() string

func (*LogOperation) Marshal added in v0.0.2

func (m *LogOperation) Marshal() (dAtA []byte, err error)

func (*LogOperation) MarshalTo added in v0.0.2

func (m *LogOperation) MarshalTo(dAtA []byte) (int, error)

func (*LogOperation) MarshalToSizedBuffer added in v0.0.2

func (m *LogOperation) MarshalToSizedBuffer(dAtA []byte) (int, error)

func (*LogOperation) ProtoMessage added in v0.0.2

func (*LogOperation) ProtoMessage()

func (*LogOperation) Reset added in v0.0.2

func (m *LogOperation) Reset()

func (*LogOperation) Size added in v0.0.2

func (m *LogOperation) Size() (n int)

func (*LogOperation) String added in v0.0.2

func (m *LogOperation) String() string

func (*LogOperation) Unmarshal added in v0.0.2

func (m *LogOperation) Unmarshal(dAtA []byte) error

func (*LogOperation) XXX_DiscardUnknown added in v0.0.2

func (m *LogOperation) XXX_DiscardUnknown()

func (*LogOperation) XXX_Marshal added in v0.0.2

func (m *LogOperation) XXX_Marshal(b []byte, deterministic bool) ([]byte, error)

func (*LogOperation) XXX_Merge added in v0.0.2

func (m *LogOperation) XXX_Merge(src proto.Message)

func (*LogOperation) XXX_Size added in v0.0.2

func (m *LogOperation) XXX_Size() int

func (*LogOperation) XXX_Unmarshal added in v0.0.2

func (m *LogOperation) XXX_Unmarshal(b []byte) error

type LogProgress

// LogProgress is a GORM-mapped row with the columns offset, num, gid and
// machine_id; machine_id carries a unique index and DeletedAt is indexed.
// Instances should be created with newLogProgress, so that Offset starts at
// HeaderSize.
// NOTE(review): presumably records how far a given machine's log has been
// consumed — confirm against LogRunner.
type LogProgress struct {
	Offset    int64  `gorm:"column:offset"` // HeaderSize should be used as initial value
	Num       int64  `gorm:"column:num"`
	Gid       string `gorm:"column:gid"`
	MachineID string `gorm:"uniqueIndex;column:machine_id"`
	CreatedAt time.Time
	UpdatedAt time.Time
	DeletedAt sql.NullTime `gorm:"index"`
}

Call newLogProgress to create an instance.

type LogProgressMgr

type LogProgressMgr struct {
	// contains filtered or unexported fields
}

func (*LogProgressMgr) Get

func (m *LogProgressMgr) Get(machineID string) *LogProgress

func (*LogProgressMgr) Init

func (m *LogProgressMgr) Init(progress ...*LogProgress)

func (*LogProgressMgr) Set

func (m *LogProgressMgr) Set(machineID string, progress *LogProgress)

type LogRunner

type LogRunner struct {
	// contains filtered or unexported fields
}

func (*LogRunner) Init

func (r *LogRunner) Init(machineID string, s NodeStorage) error

func (*LogRunner) Run

func (r *LogRunner) Run(i ...*LogInput) (*RunLogResult, error)

type Logger

// Logger is the logging interface accepted by SetLogger.
//
// SetLevel, Category and AddSkip each return a Logger, so calls can be
// chained. Warn, Error and Info take a format string followed by arguments.
// NOTE(review): the level passed to SetLevel is presumably one of Silent,
// Error, Warn or Info, and the returned Logger may be either the receiver or
// a derived logger — confirm against the implementation.
type Logger interface {
	// SetLevel takes an integer level and returns a Logger.
	SetLevel(level int) Logger
	// Category takes a category name and returns a Logger.
	Category(category string) Logger
	// AddSkip takes a skip count and returns a Logger.
	// NOTE(review): presumably the number of extra stack frames to skip when
	// reporting the caller — confirm.
	AddSkip(skip int) Logger
	// Warn takes a format string and its arguments.
	Warn(format string, args ...any)
	// Error takes a format string and its arguments.
	Error(format string, args ...any)
	// Info takes a format string and its arguments.
	Info(format string, args ...any)
}

type NetworkInfo

type NetworkInfo struct {
	// contains filtered or unexported fields
}

func (*NetworkInfo) Add

func (n *NetworkInfo) Add(name string) *ParticipantInfo

func (*NetworkInfo) Init

func (n *NetworkInfo) Init(wd string) error

type NodeStorage

// NodeStorage extends ReadOnlyNodeStorage with the mutating operations Add,
// Replace and Merge. NodeStorageImpl and SqliteAdapter both declare methods
// with these signatures, and LogRunner.Init takes a NodeStorage.
type NodeStorage interface {
	ReadOnlyNodeStorage
	// Add takes a record and returns an error.
	Add(record *DBRecord) error
	// Replace takes an identifier of the old record and the new record.
	// NOTE(review): old is presumably the gid of the record being replaced —
	// confirm.
	Replace(old string, new *DBRecord) error
	// Merge takes another read-only storage and returns an error.
	// NOTE(review): presumably copies the records of other into this
	// storage — confirm.
	Merge(other ReadOnlyNodeStorage) error
}

type NodeStorageImpl

type NodeStorageImpl struct {
	// contains filtered or unexported fields
}

func (*NodeStorageImpl) Add

func (n *NodeStorageImpl) Add(record *DBRecord) error

func (*NodeStorageImpl) AllNodes

func (n *NodeStorageImpl) AllNodes() ([]*DBRecord, error)

func (*NodeStorageImpl) GetByGid

func (n *NodeStorageImpl) GetByGid(gid string) (*DBRecord, error)

func (*NodeStorageImpl) GetByKey

func (n *NodeStorageImpl) GetByKey(key string) ([]*DBRecord, error)

func (*NodeStorageImpl) Init

func (n *NodeStorageImpl) Init()

func (*NodeStorageImpl) Merge

func (n *NodeStorageImpl) Merge(other ReadOnlyNodeStorage) error

func (*NodeStorageImpl) Replace

func (n *NodeStorageImpl) Replace(old string, new *DBRecord) error

type Op added in v0.0.2

// Op is an int32-based enumeration with EnumDescriptor and String methods.
type Op int32

// Op values. The names and numbers match the entries of Op_name and Op_value.
const (
	Op_None    Op = 0
	Op_Modify  Op = 1
	Op_Del     Op = 2
	Op_Discard Op = 3
)

func (Op) EnumDescriptor added in v0.0.2

func (Op) EnumDescriptor() ([]byte, []int)

func (Op) String added in v0.0.2

func (x Op) String() string

type Participant

type Participant struct {
	// contains filtered or unexported fields
}

Participant ...

func (*Participant) Accept

func (p *Participant) Accept(v *Value, seq int) error

func (*Participant) All

func (p *Participant) All() ([]*Value, error)

func (*Participant) AllConflicts

func (p *Participant) AllConflicts() ([]*Value, error)

func (*Participant) Close

func (p *Participant) Close()

func (*Participant) Del

func (p *Participant) Del(key string) error

func (*Participant) Has

func (p *Participant) Has(key string) (bool, error)

func (*Participant) Init

func (p *Participant) Init(wd string, machineID string) (err error)

func (*Participant) Load

func (p *Participant) Load(key string) (*Value, error)

func (*Participant) Save

func (p *Participant) Save(key string, value string) error

type ParticipantInfo

type ParticipantInfo struct {
	// contains filtered or unexported fields
}

func (*ParticipantInfo) Init

func (p *ParticipantInfo) Init(wd string, name string, n *NetworkInfo)

type ReadOnlyNodeStorage

// ReadOnlyNodeStorage is the read-only lookup interface over DBRecord values.
// It is embedded by NodeStorage and is the parameter type of Merge.
type ReadOnlyNodeStorage interface {
	// GetByKey returns a slice of records for key, or an error.
	GetByKey(key string) ([]*DBRecord, error)
	// GetByGid returns a single record for gid, or an error.
	GetByGid(gid string) (*DBRecord, error)
	// AllNodes returns a slice of records, or an error.
	AllNodes() ([]*DBRecord, error)
}

type RunLogContext

type RunLogContext struct {
	// contains filtered or unexported fields
}

func (*RunLogContext) Init

func (c *RunLogContext) Init(i ...*LogInput)

func (*RunLogContext) Progress

func (c *RunLogContext) Progress(machineID string) *LogProgress

type RunLogError

type RunLogError struct {
	// contains filtered or unexported fields
}

func (*RunLogError) Error

func (e *RunLogError) Error() string

type RunLogResult

type RunLogResult struct {
	// contains filtered or unexported fields
}

func (*RunLogResult) Error

func (r *RunLogResult) Error() error

func (*RunLogResult) Init

func (r *RunLogResult) Init(s map[string]*LogProgress, e *RunLogError)

func (*RunLogResult) Process

func (r *RunLogResult) Process(machineID string) *LogProgress

type RunLogWorker

type RunLogWorker struct {
	// contains filtered or unexported fields
}

type SqliteAdapter

type SqliteAdapter struct {
	// contains filtered or unexported fields
}

SqliteAdapter ...

func (*SqliteAdapter) Add

func (s *SqliteAdapter) Add(record *DBRecord) error

func (*SqliteAdapter) AllNodes

func (s *SqliteAdapter) AllNodes() ([]*DBRecord, error)

func (*SqliteAdapter) Close

func (s *SqliteAdapter) Close() error

func (*SqliteAdapter) GetByGid

func (s *SqliteAdapter) GetByGid(gid string) (*DBRecord, error)

func (*SqliteAdapter) GetByKey

func (s *SqliteAdapter) GetByKey(key string) ([]*DBRecord, error)

func (*SqliteAdapter) Has

func (s *SqliteAdapter) Has(gid string) (bool, error)

func (*SqliteAdapter) Init

func (s *SqliteAdapter) Init(dbFile string) error

func (*SqliteAdapter) Merge

func (s *SqliteAdapter) Merge(other ReadOnlyNodeStorage) error

func (*SqliteAdapter) Processes

func (s *SqliteAdapter) Processes() ([]*LogProgress, error)

func (*SqliteAdapter) Replace

func (s *SqliteAdapter) Replace(old string, new *DBRecord) error

func (*SqliteAdapter) Transaction

func (s *SqliteAdapter) Transaction(f func(s *SqliteAdapter) error) error

type Storage

// Storage is a string key/value interface whose reads return *Value.
// Participant declares methods with all of these signatures.
type Storage interface {
	// Save takes a key and a value and returns an error.
	Save(key string, value string) error
	// Del takes a key and returns an error.
	Del(key string) error
	// Has returns a bool for key, or an error.
	Has(key string) (bool, error)
	// Load returns the *Value for key, or an error.
	Load(key string) (val *Value, err error)
	// All returns a slice of *Value, or an error.
	All() ([]*Value, error)
	// Accept takes a *Value and a sequence number and returns an error.
	// NOTE(review): presumably selects the version identified by seq (see
	// Value.ValidSeq) as the accepted one — confirm.
	Accept(v *Value, seq int) error
}

Storage ...

type Value

type Value struct {
	// contains filtered or unexported fields
}

func (*Value) Branches

func (v *Value) Branches() []*ValueVersion

func (*Value) Main

func (v *Value) Main() *ValueVersion

func (*Value) String

func (v *Value) String() string

func (*Value) ValidSeq

func (v *Value) ValidSeq(seq int) bool

func (*Value) Versions

func (v *Value) Versions() []*ValueVersion

type ValueVersion

type ValueVersion struct {
	// contains filtered or unexported fields
}

func (*ValueVersion) String

func (v *ValueVersion) String() string

type Wal

type Wal struct {
	// contains filtered or unexported fields
}

func (*Wal) Append

func (w *Wal) Append(logOp ...*LogOperation) (string, int64, error)

Append appends the given operations as a single log entry and returns the gid of the last operation. It generates and populates .Num and .Gid for each operation.

func (*Wal) AppendRaw

func (w *Wal) AppendRaw(logOp ...*LogOperation) error

func (*Wal) Close

func (w *Wal) Close() error

func (*Wal) Flush

func (w *Wal) Flush() error

func (*Wal) Init

func (w *Wal) Init(filename string, l LogFormat, readonly bool) (err error)

func (*Wal) Iterator

func (w *Wal) Iterator() *WalIterator

func (*Wal) IteratorFrom

func (w *Wal) IteratorFrom(start string, inclusive bool) (*WalIterator, error)

func (*Wal) IteratorOffset

func (w *Wal) IteratorOffset(offset int64) *WalIterator

func (*Wal) Offset

func (w *Wal) Offset() int64

func (*Wal) RangeIterator

func (w *Wal) RangeIterator(start string, end string, includeStart bool, includeEnd bool) (*WalIterator, error)

type WalHelper

type WalHelper struct {
	// contains filtered or unexported fields
}

func (*WalHelper) Append

func (w *WalHelper) Append(logOp ...*LogOperation) (string, int64, error)

func (*WalHelper) Close

func (w *WalHelper) Close()

func (*WalHelper) Init

func (w *WalHelper) Init(filename string, l LogFormat, writeCount int)

func (*WalHelper) Offset

func (w *WalHelper) Offset() (int64, error)

type WalIterator

type WalIterator struct {
	// contains filtered or unexported fields
}

func (*WalIterator) Init

func (i *WalIterator) Init(w *Wal)

func (*WalIterator) InitWithOffset

func (i *WalIterator) InitWithOffset(w *Wal, offset int64)

func (*WalIterator) LogOp

func (i *WalIterator) LogOp() *LogOperation

func (*WalIterator) Next

func (i *WalIterator) Next() (hasNext bool)

func (*WalIterator) Offset

func (i *WalIterator) Offset() int64

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL