journal

package module
v1.1.6 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: May 31, 2020 License: MIT Imports: 24 Imported by: 1

README

Go-Journal

seperated from go-utils/journal.

Documentation

Index

Examples

Constants

View Source
const (
	// FileMode default file mode
	FileMode os.FileMode = 0664
	// DirMode default directory mode
	DirMode = os.FileMode(0775) | os.ModeDir

	// BufSize default buf file size
	BufSize = 1024 * 1024 * 4 // 4 MB
)

Variables

View Source
var (
	// ErrDuringRotate rotate error
	ErrDuringRotate = fmt.Errorf("during rotating")
)

Functions

func GenerateNewBufFName

func GenerateNewBufFName(now time.Time, oldFName string) (string, error)

GenerateNewBufFName return new buf file name depends on current time file name looks like `yyyymmddnnnn.ids`, nnnn begin from 0001 for each day

func OpenBufFile

func OpenBufFile(filepath string, preallocateBytes int64) (fp *os.File, err error)

OpenBufFile create and open file

func PrepareDir

func PrepareDir(path string) error

PrepareDir `mkdir -p`

func PrepareNewBufFile

func PrepareNewBufFile(dirPath string, oldFsStat *bufFileStat, isScan, isGz bool, sizeBytes int64) (fsStat *bufFileStat, err error)

PrepareNewBufFile create new data & id files, and update bufFileStat.

  • if `isScan=true`, will scan directory to find existing buf files, then generate new buf files.

* if `isScan=false`, keep old buf files, directly generate new file without scan directory.

Types

type BaseSerializer

type BaseSerializer struct {
	sync.Mutex
	// contains filtered or unexported fields
}

BaseSerializer base serializer

type Data

type Data struct {
	Data map[string]interface{}
	ID   int64
}

Data msgp data schema

func (*Data) DecodeMsg

func (z *Data) DecodeMsg(dc *msgp.Reader) (err error)

DecodeMsg implements msgp.Decodable

func (*Data) EncodeMsg

func (z *Data) EncodeMsg(en *msgp.Writer) (err error)

EncodeMsg implements msgp.Encodable

func (*Data) MarshalMsg

func (z *Data) MarshalMsg(b []byte) (o []byte, err error)

MarshalMsg implements msgp.Marshaler

func (*Data) Msgsize

func (z *Data) Msgsize() (s int)

Msgsize returns an upper bound estimate of the number of bytes occupied by the serialized message

func (*Data) UnmarshalMsg

func (z *Data) UnmarshalMsg(bts []byte) (o []byte, err error)

UnmarshalMsg implements msgp.Unmarshaler

type DataDecoder

type DataDecoder struct {
	BaseSerializer
	// contains filtered or unexported fields
}

DataDecoder data deserializer

func NewDataDecoder

func NewDataDecoder(fp *os.File, isCompress bool) (decoder *DataDecoder, err error)

NewDataDecoder create new DataDecoder

func (*DataDecoder) Read

func (dec *DataDecoder) Read(data *Data) (err error)

Read deserialize data from fp

type DataEncoder

type DataEncoder struct {
	BaseSerializer
	// contains filtered or unexported fields
}

DataEncoder data serializer

func NewDataEncoder

func NewDataEncoder(fp *os.File, isCompress bool) (enc *DataEncoder, err error)

NewDataEncoder create new DataEncoder

func (*DataEncoder) Close

func (enc *DataEncoder) Close() (err error)

Close close data gzip writer

func (*DataEncoder) Flush

func (enc *DataEncoder) Flush() (err error)

Flush flush buf to fp

func (*DataEncoder) Write

func (enc *DataEncoder) Write(msg *Data) (err error)

Write serialize data info fp

type IdsDecoder

type IdsDecoder struct {
	BaseSerializer
	// contains filtered or unexported fields
}

IdsDecoder ids deserializer

func NewIdsDecoder

func NewIdsDecoder(fp *os.File, isCompress bool) (decoder *IdsDecoder, err error)

NewIdsDecoder create new IdsDecoder

func (*IdsDecoder) LoadMaxId

func (dec *IdsDecoder) LoadMaxId() (maxId int64, err error)

LoadMaxId load the maxium id in all files

func (*IdsDecoder) ReadAllToBmap

func (dec *IdsDecoder) ReadAllToBmap() (ids *roaring.Bitmap, err error)

ReadAllToBmap read all ids in all files into bmap

func (*IdsDecoder) ReadAllToInt64Set

func (dec *IdsDecoder) ReadAllToInt64Set(ids Int64SetItf) (err error)

ReadAllToBmap read all ids in all files into set

type IdsEncoder

type IdsEncoder struct {
	BaseSerializer
	// contains filtered or unexported fields
}

IdsEncoder ids serializer

func NewIdsEncoder

func NewIdsEncoder(fp *os.File, isCompress bool) (enc *IdsEncoder, err error)

NewIdsEncoder create new IdsEncoder

func (*IdsEncoder) Close

func (enc *IdsEncoder) Close() (err error)

Close close ids gzip writer

func (*IdsEncoder) Flush

func (enc *IdsEncoder) Flush() (err error)

Flush flush buf to fp

func (*IdsEncoder) Write

func (enc *IdsEncoder) Write(id int64) (err error)

Write serialize id info fp

type Int64Set

type Int64Set struct {
	// contains filtered or unexported fields
}

Int64Set set depends on sync.Map. cost much more memory than bitmap

Example
s := NewInt64Set()
s.Add(5)

s.CheckAndRemove(5) // true
s.CheckAndRemove(3) // false
Output:

func NewInt64Set

func NewInt64Set() *Int64Set

NewInt64Set create new Int64Set

func (*Int64Set) Add

func (s *Int64Set) Add(i int)

Add add new number

func (*Int64Set) AddInt64

func (s *Int64Set) AddInt64(i int64)

AddInt64 add int64

func (*Int64Set) CheckAndRemove

func (s *Int64Set) CheckAndRemove(i int64) (ok bool)

CheckAndRemove return true if exists

func (*Int64Set) GetLen

func (s *Int64Set) GetLen() int

GetLen return length

type Int64SetItf

type Int64SetItf interface {
	Add(int)
	AddInt64(int64)
	CheckAndRemove(int64) bool
	GetLen() int
}

Int64SetItf set for int64

type Int64SetWithTTL

type Int64SetWithTTL struct {
	sync.RWMutex
	// contains filtered or unexported fields
}

Int64SetWithTTL int64 set with TTL

func NewInt64SetWithTTL

func NewInt64SetWithTTL(ctx context.Context, ttl time.Duration) *Int64SetWithTTL

NewInt64SetWithTTL create new int64 set with ttl

func (*Int64SetWithTTL) Add

func (s *Int64SetWithTTL) Add(id int)

Add add int

func (*Int64SetWithTTL) AddInt64

func (s *Int64SetWithTTL) AddInt64(id int64)

AddInt64 add int64

func (*Int64SetWithTTL) CheckAndRemove

func (s *Int64SetWithTTL) CheckAndRemove(id int64) (ok bool)

CheckAndRemove return true if id committed

func (*Int64SetWithTTL) Close

func (s *Int64SetWithTTL) Close()

Close close set, stop rotate

func (*Int64SetWithTTL) GetLen

func (s *Int64SetWithTTL) GetLen() (r int)

GetLen get items number of set

func (*Int64SetWithTTL) StartRotate

func (s *Int64SetWithTTL) StartRotate(ctx context.Context)

StartRotate start counter rotate

type Journal

type Journal struct {
	// RWMutex journal rwlock.
	// acquire write lock when flush/rotate journal legacy.
	// acquire read lock when read/write journal legacy.
	sync.RWMutex
	// contains filtered or unexported fields
}

Journal redo log consist by msgs and committed ids

func NewJournal

func NewJournal(opts ...OptionFunc) (j *Journal, err error)

NewJournal create new Journal

func (*Journal) Close

func (j *Journal) Close()

func (*Journal) Flush

func (j *Journal) Flush() (err error)

Flush flush journal files buffer to file

func (*Journal) GetMetric

func (j *Journal) GetMetric() map[string]interface{}

GetMetric monitor inteface

func (*Journal) IsLegacyRunning

func (j *Journal) IsLegacyRunning() bool

IsLegacyRunning check whether running legacy loading

func (*Journal) LoadLegacyBuf

func (j *Journal) LoadLegacyBuf(data *Data) (err error)

LoadLegacyBuf load legacy data one by one ⚠️Warn: should call `j.LockLegacy()` before invoke this method

func (*Journal) LoadMaxId

func (j *Journal) LoadMaxId() (int64, error)

LoadMaxId load max id from journal ids files

func (*Journal) LockLegacy

func (j *Journal) LockLegacy() bool

LockLegacy lock legacy to prevent rotate, clean

func (*Journal) Rotate

func (j *Journal) Rotate(ctx context.Context) (err error)

Rotate create new data and ids buf file. this function is not threadsafe.

func (*Journal) Start added in v1.1.0

func (j *Journal) Start(ctx context.Context) (err error)

func (*Journal) UnLockLegacy

func (j *Journal) UnLockLegacy() bool

UnLockLegacy release legacy lock

func (*Journal) WriteData

func (j *Journal) WriteData(data *Data) (err error)

WriteData write data to journal

func (*Journal) WriteId

func (j *Journal) WriteId(id int64) error

WriteId write id to journal

type LegacyLoader

type LegacyLoader struct {
	// acquire write lock during reset.
	// acquire read lock during read/write data/ids files.
	sync.RWMutex
	// contains filtered or unexported fields
}

LegacyLoader loader to handle legacy data and ids

func NewLegacyLoader

func NewLegacyLoader(ctx context.Context,
	logger *utils.LoggerType,
	dataFNames, idsFNames []string,
	isCompress bool,
	committedIDTTL time.Duration,
) *LegacyLoader

NewLegacyLoader create new LegacyLoader

func (*LegacyLoader) AddID

func (l *LegacyLoader) AddID(id int64)

AddID add id in ids

func (*LegacyLoader) CheckAndRemove added in v1.1.0

func (l *LegacyLoader) CheckAndRemove(id int64) bool

func (*LegacyLoader) Clean

func (l *LegacyLoader) Clean() error

Clean remove old legacy files

func (*LegacyLoader) GetIdsLen

func (l *LegacyLoader) GetIdsLen() int

GetIdsLen return length of ids

func (*LegacyLoader) Load

func (l *LegacyLoader) Load(data *Data) (err error)

Load load data from legacy

func (*LegacyLoader) LoadAllids

func (l *LegacyLoader) LoadAllids(ids Int64SetItf) (err error)

LoadAllids read all ids from ids file into ids set

func (*LegacyLoader) LoadMaxId

func (l *LegacyLoader) LoadMaxId() (maxId int64, err error)

LoadMaxId load max id from all ids files

func (*LegacyLoader) Reset

func (l *LegacyLoader) Reset(dataFNames, idsFNames []string)

Reset reset journal legacy link to existing files

type OptionFunc added in v1.1.0

type OptionFunc func(*option) error

func WithBufDirPath added in v1.1.0

func WithBufDirPath(path string) OptionFunc

func WithBufSizeByte added in v1.1.0

func WithBufSizeByte(bufSize int64) OptionFunc

func WithCommitIDTTL added in v1.1.0

func WithCommitIDTTL(d time.Duration) OptionFunc

func WithFlushInterval added in v1.1.0

func WithFlushInterval(d time.Duration) OptionFunc

func WithIsAggresiveGC added in v1.1.0

func WithIsAggresiveGC(is bool) OptionFunc

func WithIsCompress added in v1.1.0

func WithIsCompress(is bool) OptionFunc

func WithLogger added in v1.1.2

func WithLogger(logger *utils.LoggerType) OptionFunc

func WithName added in v1.1.0

func WithName(name string) OptionFunc

func WithRotateCheckInterval added in v1.1.0

func WithRotateCheckInterval(d time.Duration) OptionFunc

func WithRotateDuration added in v1.1.0

func WithRotateDuration(d time.Duration) OptionFunc

type Uint32Set

type Uint32Set struct {
	sync.Mutex
	// contains filtered or unexported fields
}

Uint32Set set depends on bitmap. only support uint32, so cannot support more than 4294967295 numbers.

func NewUint32Set

func NewUint32Set() *Uint32Set

NewUint32Set create new Uint32Set

func (*Uint32Set) AddInt64

func (s *Uint32Set) AddInt64(i int64)

AddInt64 add new number

func (*Uint32Set) AddUint32

func (s *Uint32Set) AddUint32(i uint32)

AddUint32 add new number

func (*Uint32Set) CheckAndRemoveInt64

func (s *Uint32Set) CheckAndRemoveInt64(i int64) (ok bool)

CheckAndRemoveInt64 return true if exists

func (*Uint32Set) CheckAndRemoveUint32

func (s *Uint32Set) CheckAndRemoveUint32(i uint32) (ok bool)

CheckAndRemoveUint32 return true if exists

func (*Uint32Set) GetLen

func (s *Uint32Set) GetLen() int

GetLen return length

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL