Documentation ¶
Index ¶
- Constants
- Variables
- func GenerateNewBufFName(now time.Time, oldFName string) (string, error)
- func OpenBufFile(filepath string, preallocateBytes int64) (fp *os.File, err error)
- func PrepareDir(path string) error
- func PrepareNewBufFile(dirPath string, oldFsStat *bufFileStat, isScan, isGz bool, sizeBytes int64) (fsStat *bufFileStat, err error)
- type BaseSerializer
- type Data
- type DataDecoder
- type DataEncoder
- type IdsDecoder
- type IdsEncoder
- type Int64Set
- type Int64SetItf
- type Int64SetWithTTL
- type Journal
- func (j *Journal) Close()
- func (j *Journal) Flush() (err error)
- func (j *Journal) GetMetric() map[string]interface{}
- func (j *Journal) IsLegacyRunning() bool
- func (j *Journal) LoadLegacyBuf(data *Data) (err error)
- func (j *Journal) LoadMaxId() (int64, error)
- func (j *Journal) LockLegacy() bool
- func (j *Journal) Rotate(ctx context.Context) (err error)
- func (j *Journal) Start(ctx context.Context) (err error)
- func (j *Journal) UnLockLegacy() bool
- func (j *Journal) WriteData(data *Data) (err error)
- func (j *Journal) WriteId(id int64) error
- type LegacyLoader
- func (l *LegacyLoader) AddID(id int64)
- func (l *LegacyLoader) CheckAndRemove(id int64) bool
- func (l *LegacyLoader) Clean() error
- func (l *LegacyLoader) GetIdsLen() int
- func (l *LegacyLoader) Load(data *Data) (err error)
- func (l *LegacyLoader) LoadAllids(ids Int64SetItf) (err error)
- func (l *LegacyLoader) LoadMaxId() (maxId int64, err error)
- func (l *LegacyLoader) Reset(dataFNames, idsFNames []string)
- type OptionFunc
- func WithBufDirPath(path string) OptionFunc
- func WithBufSizeByte(bufSize int64) OptionFunc
- func WithCommitIDTTL(d time.Duration) OptionFunc
- func WithFlushInterval(d time.Duration) OptionFunc
- func WithIsAggresiveGC(is bool) OptionFunc
- func WithIsCompress(is bool) OptionFunc
- func WithLogger(logger *utils.LoggerType) OptionFunc
- func WithName(name string) OptionFunc
- func WithRotateCheckInterval(d time.Duration) OptionFunc
- func WithRotateDuration(d time.Duration) OptionFunc
- type Uint32Set
Examples ¶
Constants ¶
Variables ¶
var ( // ErrDuringRotate rotate error ErrDuringRotate = fmt.Errorf("during rotating") )
var Logger *utils.LoggerType
Functions ¶
func GenerateNewBufFName ¶
GenerateNewBufFName returns a new buf file name based on the current time. The file name looks like `yyyymmddnnnn.ids`, where nnnn starts from 0001 for each day.
func OpenBufFile ¶
OpenBufFile create and open file
func PrepareNewBufFile ¶
func PrepareNewBufFile(dirPath string, oldFsStat *bufFileStat, isScan, isGz bool, sizeBytes int64) (fsStat *bufFileStat, err error)
PrepareNewBufFile creates new data & id files, and updates bufFileStat.
- if `isScan=true`, it scans the directory to find existing buf files, then generates new buf files.
- if `isScan=false`, it keeps the old buf files and directly generates new files without scanning the directory.
Types ¶
type BaseSerializer ¶
BaseSerializer base serializer
type Data ¶
Data msgp data schema
func (*Data) MarshalMsg ¶
MarshalMsg implements msgp.Marshaler
type DataDecoder ¶
type DataDecoder struct { BaseSerializer // contains filtered or unexported fields }
DataDecoder data deserializer
func NewDataDecoder ¶
func NewDataDecoder(fp *os.File, isCompress bool) (decoder *DataDecoder, err error)
NewDataDecoder create new DataDecoder
func (*DataDecoder) Read ¶
func (dec *DataDecoder) Read(data *Data) (err error)
Read deserialize data from fp
type DataEncoder ¶
type DataEncoder struct { BaseSerializer // contains filtered or unexported fields }
DataEncoder data serializer
func NewDataEncoder ¶
func NewDataEncoder(fp *os.File, isCompress bool) (enc *DataEncoder, err error)
NewDataEncoder create new DataEncoder
func (*DataEncoder) Close ¶
func (enc *DataEncoder) Close() (err error)
Close close data gzip writer
func (*DataEncoder) Write ¶
func (enc *DataEncoder) Write(msg *Data) (err error)
Write serializes data into fp
type IdsDecoder ¶
type IdsDecoder struct { BaseSerializer // contains filtered or unexported fields }
IdsDecoder ids deserializer
func NewIdsDecoder ¶
func NewIdsDecoder(fp *os.File, isCompress bool) (decoder *IdsDecoder, err error)
NewIdsDecoder create new IdsDecoder
func (*IdsDecoder) LoadMaxId ¶
func (dec *IdsDecoder) LoadMaxId() (maxId int64, err error)
LoadMaxId loads the maximum id in all files
func (*IdsDecoder) ReadAllToBmap ¶
func (dec *IdsDecoder) ReadAllToBmap() (ids *roaring.Bitmap, err error)
ReadAllToBmap read all ids in all files into bmap
func (*IdsDecoder) ReadAllToInt64Set ¶
func (dec *IdsDecoder) ReadAllToInt64Set(ids Int64SetItf) (err error)
ReadAllToInt64Set reads all ids in all files into set
type IdsEncoder ¶
type IdsEncoder struct { BaseSerializer // contains filtered or unexported fields }
IdsEncoder ids serializer
func NewIdsEncoder ¶
func NewIdsEncoder(fp *os.File, isCompress bool) (enc *IdsEncoder, err error)
NewIdsEncoder create new IdsEncoder
func (*IdsEncoder) Write ¶
func (enc *IdsEncoder) Write(id int64) (err error)
Write serializes id into fp
type Int64Set ¶
type Int64Set struct {
// contains filtered or unexported fields
}
Int64Set is a set that depends on sync.Map. It costs much more memory than a bitmap.
Example ¶
s := NewInt64Set() s.Add(5) s.CheckAndRemove(5) // true s.CheckAndRemove(3) // false
Output:
func (*Int64Set) CheckAndRemove ¶
CheckAndRemove return true if exists
type Int64SetItf ¶
Int64SetItf set for int64
type Int64SetWithTTL ¶
Int64SetWithTTL int64 set with TTL
func NewInt64SetWithTTL ¶
func NewInt64SetWithTTL(ctx context.Context, ttl time.Duration) *Int64SetWithTTL
NewInt64SetWithTTL create new int64 set with ttl
func (*Int64SetWithTTL) CheckAndRemove ¶
func (s *Int64SetWithTTL) CheckAndRemove(id int64) (ok bool)
CheckAndRemove return true if id committed
func (*Int64SetWithTTL) GetLen ¶
func (s *Int64SetWithTTL) GetLen() (r int)
GetLen get items number of set
func (*Int64SetWithTTL) StartRotate ¶
func (s *Int64SetWithTTL) StartRotate(ctx context.Context)
StartRotate start counter rotate
type Journal ¶
type Journal struct { // RWMutex journal rwlock. // acquire write lock when flush/rotate journal legacy. // acquire read lock when read/write journal legacy. sync.RWMutex // contains filtered or unexported fields }
Journal is a redo log consisting of msgs and committed ids
func NewJournal ¶
func NewJournal(opts ...OptionFunc) (j *Journal, err error)
NewJournal create new Journal
func (*Journal) IsLegacyRunning ¶
IsLegacyRunning check whether running legacy loading
func (*Journal) LoadLegacyBuf ¶
LoadLegacyBuf loads legacy data one by one. ⚠️Warn: you should call `j.LockLegacy()` before invoking this method.
func (*Journal) LockLegacy ¶
LockLegacy lock legacy to prevent rotate, clean
func (*Journal) UnLockLegacy ¶
UnLockLegacy release legacy lock
type LegacyLoader ¶
type LegacyLoader struct { // acquire write lock during reset. // acquire read lock during read/write data/ids files. sync.RWMutex // contains filtered or unexported fields }
LegacyLoader loader to handle legacy data and ids
func NewLegacyLoader ¶
func NewLegacyLoader(ctx context.Context, logger *utils.LoggerType, dataFNames, idsFNames []string, isCompress bool, committedIDTTL time.Duration, ) *LegacyLoader
NewLegacyLoader create new LegacyLoader
func (*LegacyLoader) CheckAndRemove ¶ added in v1.1.0
func (l *LegacyLoader) CheckAndRemove(id int64) bool
func (*LegacyLoader) GetIdsLen ¶
func (l *LegacyLoader) GetIdsLen() int
GetIdsLen return length of ids
func (*LegacyLoader) Load ¶
func (l *LegacyLoader) Load(data *Data) (err error)
Load load data from legacy
func (*LegacyLoader) LoadAllids ¶
func (l *LegacyLoader) LoadAllids(ids Int64SetItf) (err error)
LoadAllids read all ids from ids file into ids set
func (*LegacyLoader) LoadMaxId ¶
func (l *LegacyLoader) LoadMaxId() (maxId int64, err error)
LoadMaxId load max id from all ids files
func (*LegacyLoader) Reset ¶
func (l *LegacyLoader) Reset(dataFNames, idsFNames []string)
Reset reset journal legacy link to existing files
type OptionFunc ¶ added in v1.1.0
type OptionFunc func(*option) error
func WithBufDirPath ¶ added in v1.1.0
func WithBufDirPath(path string) OptionFunc
func WithBufSizeByte ¶ added in v1.1.0
func WithBufSizeByte(bufSize int64) OptionFunc
func WithCommitIDTTL ¶ added in v1.1.0
func WithCommitIDTTL(d time.Duration) OptionFunc
func WithFlushInterval ¶ added in v1.1.0
func WithFlushInterval(d time.Duration) OptionFunc
func WithIsAggresiveGC ¶ added in v1.1.0
func WithIsAggresiveGC(is bool) OptionFunc
func WithIsCompress ¶ added in v1.1.0
func WithIsCompress(is bool) OptionFunc
func WithLogger ¶ added in v1.1.2
func WithLogger(logger *utils.LoggerType) OptionFunc
func WithName ¶ added in v1.1.0
func WithName(name string) OptionFunc
func WithRotateCheckInterval ¶ added in v1.1.0
func WithRotateCheckInterval(d time.Duration) OptionFunc
func WithRotateDuration ¶ added in v1.1.0
func WithRotateDuration(d time.Duration) OptionFunc
type Uint32Set ¶
Uint32Set is a set that depends on a bitmap. It only supports uint32, so it cannot hold numbers larger than 4294967295.
func (*Uint32Set) CheckAndRemoveInt64 ¶
CheckAndRemoveInt64 return true if exists
func (*Uint32Set) CheckAndRemoveUint32 ¶
CheckAndRemoveUint32 return true if exists