history

Version: v0.0.0-...-5bd4fe7
Published: Oct 24, 2023 License: MIT Imports: 22 Imported by: 0

README

nntp-history - work in progress and every commit makes it worse! xD

nntp-history is a Go module designed for managing and storing NNTP (Network News Transfer Protocol) history records efficiently.

It provides a way to store and retrieve historical message records in a simple and scalable manner.

This module is suitable for building applications related to Usenet news servers or any system that requires managing message history efficiently.

go get github.com/go-while/nntp-history
   # bbolt code and test with latest commits from github
   # git clone https://github.com/etcd-io/bbolt in 'src/go.etcd.io/'

Code Hints

BootHistory Function

The BootHistory function in this Go code is responsible for initializing and booting a history management system.

It provides essential configuration options and prepares the system for historical data storage and retrieval.

Usage

To use the BootHistory function, follow these steps:

  1. Call the BootHistory function with the desired configuration options.

  2. The history management system will be initialized and ready for use.

history.History.WriterChan

  • history.History.WriterChan is a Go channel used for sending and processing historical data entries.

  • It is primarily responsible for writing data to a historical data storage system, using a HashDB (BoltDB) to avoid duplicate entries.

  • To send data for writing, you create a HistoryObject and send it through the channel.

  • If the ResponseChan channel is provided, it receives one of the following (int) values:

  /*
  0: Indicates "not a duplicate."
  1: Indicates "duplicate."
  2: Indicates "retry later."
  */
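
The request/response handshake described above can be sketched as follows. This is a minimal, self-contained illustration that does not import the real package: `historyObject` is a local stand-in that mirrors only the `MessageIDHash` and `ResponseChan` fields of the documented `HistoryObject`, and `fakeWriter`, `submit` and `describe` are hypothetical helpers. Only the channel pattern and the 0/1/2 codes are taken from the text.

```go
package main

import "fmt"

// historyObject is a local stand-in mirroring two fields of the documented HistoryObject.
type historyObject struct {
	MessageIDHash string
	ResponseChan  chan int
}

// fakeWriter is a hypothetical replacement for the real writer goroutine.
// It remembers hashes in a map and answers 0 (not a duplicate) or 1 (duplicate).
func fakeWriter(in <-chan *historyObject) {
	seen := make(map[string]bool)
	for obj := range in {
		code := 0
		if seen[obj.MessageIDHash] {
			code = 1
		}
		seen[obj.MessageIDHash] = true
		if obj.ResponseChan != nil {
			obj.ResponseChan <- code
		}
	}
}

// describe translates the documented response codes into text.
func describe(code int) string {
	switch code {
	case 0:
		return "not a duplicate"
	case 1:
		return "duplicate"
	case 2:
		return "retry later"
	}
	return "unknown"
}

// submit sends one hash through the channel and waits for the answer.
func submit(writerChan chan<- *historyObject, hash string) int {
	resp := make(chan int, 1)
	writerChan <- &historyObject{MessageIDHash: hash, ResponseChan: resp}
	return <-resp
}

func main() {
	writerChan := make(chan *historyObject, 4)
	go fakeWriter(writerChan)
	fmt.Println(describe(submit(writerChan, "1a2b3c4d5e6f0")))
	fmt.Println(describe(submit(writerChan, "1a2b3c4d5e6f0")))
	close(writerChan)
}
```

A caller that receives 2 ("retry later") would typically wait and resend the same object; the sketch never produces that code because the stand-in writer has no locking.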

history.History.IndexChan

  • The history.History.IndexChan is a Go channel with a dual purpose.

  • Its primary function is to facilitate the detection of duplicate message-ID hashes within the history file.

  • When the offset is set to -1, the channel performs a check for duplicate hashes but does not add the hash to the BoltDB database.

  • When the offset is set to a value greater than zero, the channel functions as a mechanism for adding these message-ID hashes to the BoltDB database.

  • Beware: message-ID hashes are normally added via history.History.WriterChan, which also writes the history file.

  • If desired, you can use only the IndexChan and skip writing the history file. In that case, use the full length of the hash as KeyLen and provide a unique, increasing counter as the offset.

  • If the IndexRetChan channel is provided, it receives one of the following (int) values:

  /*
  0: Indicates "not a duplicate."
  1: Indicates "duplicate."
  2: Indicates "retry later."
  */
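
The two modes of the index channel (check only with offset -1, add with an offset greater than zero) can be sketched like this. The code is a self-contained illustration, not the package itself: `historyIndex` is a local stand-in for the documented `HistoryIndex` fields, `flagSearch` copies the value of the documented `FlagSearch` constant, and `fakeIndexWorker` is a hypothetical in-memory replacement for the BoltDB-backed worker.

```go
package main

import "fmt"

const flagSearch = -1 // same value as the documented FlagSearch constant

// historyIndex is a local stand-in for the documented HistoryIndex struct.
type historyIndex struct {
	Hash         string
	Offset       int64
	IndexRetChan chan int
}

// fakeIndexWorker is a hypothetical stand-in for the real index worker.
// Offset -1 only checks for a duplicate; an offset greater than zero also
// stores the hash if it was not known before.
func fakeIndexWorker(in <-chan *historyIndex) {
	offsets := make(map[string]int64)
	for q := range in {
		_, known := offsets[q.Hash]
		code := 0
		if known {
			code = 1
		} else if q.Offset > 0 {
			offsets[q.Hash] = q.Offset
		}
		if q.IndexRetChan != nil {
			q.IndexRetChan <- code
		}
	}
}

// query sends one request and waits for the 0/1/2 reply.
func query(indexChan chan<- *historyIndex, hash string, offset int64) int {
	ret := make(chan int, 1)
	indexChan <- &historyIndex{Hash: hash, Offset: offset, IndexRetChan: ret}
	return <-ret
}

func main() {
	indexChan := make(chan *historyIndex, 4)
	go fakeIndexWorker(indexChan)
	fmt.Println(query(indexChan, "1a2b3c", flagSearch)) // check only, nothing stored
	fmt.Println(query(indexChan, "1a2b3c", 1))          // unknown hash, gets stored
	fmt.Println(query(indexChan, "1a2b3c", flagSearch)) // now reported as duplicate
	close(indexChan)
}
```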

Message-ID Hash Splitting with BoltDB

KeyAlgo (HashShort)

  • The standard key algorithm used is HashShort (-keyalgo=11).

Database Organization

To improve processing speed and optimize data storage, we organize data into multiple BoltDB databases based on the first character of the hash:

  • We create 16 separate BoltDB databases, each corresponding to one of the characters (0-9 and a-f).

  • Each of these 16 DBs is further divided into 256 buckets using the 2nd and 3rd character of the hash.

Key Structure

The remaining portion of the hash (after the first three characters) is used as the key to store offsets in the database:

  • The length of this remaining hash used as a key can be customized based on the KeyLen setting.
  • A lower KeyLen results in more "multioffsets" stored per key, which can be beneficial for reducing storage space.
  • However, a lower KeyLen also results in more frequent file seeks when accessing data.
  • The recommended KeyLen is 6. The minimum KeyLen is 1. The maximum KeyLen is the length of the hash minus 3.
  • Reasonable values for KeyLen range from 4 to 6; use more if you expect more than 100M messages.
  • Choose wisely. You cannot change KeyLen or KeyAlgo later.

*** The numbers below are outdated: the code has since been updated to use sub buckets and KeyIndex ***

# Test   1M inserts with `KeyLen` = 3 (`i` hashes): appoffset=29256 (~3%) trymultioffsets=57928 (~6%)
# Test  10M inserts with `KeyLen` = 3 (`i` hashes): appoffset=2467646 (24%!!) trymultioffsets=4491877 (44%!!)
# Test 100M inserts with `KeyLen` = 3 (`i` hashes): appoffset=XXXX trymultioffsets=XXXX

# Test   1M inserts with `KeyLen` = 4 (`i` hashes): appoffset=1850 (<0.2%) trymultioffsets=3698 (<0.4%)
# Test  10M inserts with `KeyLen` = 4 (`i` hashes): appoffset=183973 (<2%) trymultioffsets=365670 (<4%)
# Test 100M inserts with `KeyLen` = 4 (`i` hashes): appoffset=XXXX trymultioffsets=XXXX

# Test   1M inserts with `KeyLen` = 5 (`i` hashes): appoffset=114 trymultioffsets=228
# Test  10M inserts with `KeyLen` = 5 (`i` hashes): appoffset=11667 (~0.1%) trymultioffsets=23321 (~0.2%)
# Test 100M inserts with `KeyLen` = 5 (`i` hashes): appoffset=XXXX trymultioffsets=XXXX

# Test   1M inserts with `KeyLen` = 6 (`i` hashes): appoffset=4 trymultioffsets=8
# Test  10M inserts with `KeyLen` = 6 (`i` hashes): appoffset=748 trymultioffsets=1496
# Test 100M inserts with `KeyLen` = 6 (`i` hashes): appoffset=16511583 trymultioffsets=XXXX

# Test   1M inserts with `KeyLen` = 8 (`i` hashes): appoffset=0 trymultioffsets=0
# Test  10M inserts with `KeyLen` = 8 (`i` hashes): appoffset=4 trymultioffsets=8
# Test 100M inserts with `KeyLen` = 8 (`i` hashes): appoffset=XXXX trymultioffsets=XXXX
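
As a rough cross-check of how KeyLen influences shared keys, the following sketch estimates how many of n inserts land on a key that is already occupied. It assumes uniformly distributed hashes and a key space of 16 databases x 256 buckets x 16^KeyLen keys, and it assumes that `appoffset` counts inserts appended to an existing key; both are assumptions of this sketch, not statements from the package. Under those assumptions the estimate for KeyLen = 3 and 4 comes out close to the (outdated) appoffset figures listed above.

```go
package main

import (
	"fmt"
	"math"
)

// keySpace returns the number of distinct (db, bucket, key) slots:
// 16 databases x 256 buckets x 16^keyLen keys.
func keySpace(keyLen int) float64 {
	return 16 * 256 * math.Pow(16, float64(keyLen))
}

// expectedShared estimates how many of n uniformly random inserts hit a slot
// that is already occupied: n - m*(1 - exp(-n/m)), with m = keySpace(keyLen).
func expectedShared(n float64, keyLen int) float64 {
	m := keySpace(keyLen)
	return n + m*math.Expm1(-n/m)
}

func main() {
	for _, keyLen := range []int{3, 4, 5, 6, 8} {
		for _, n := range []float64{1e6, 1e7} {
			fmt.Printf("KeyLen=%d inserts=%.0f expected shared keys ~ %.0f\n",
				keyLen, n, expectedShared(n, keyLen))
		}
	}
}
```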

Example

Let's illustrate this approach with an example:

Suppose you have a Message-ID hash of "1a2b3c4d5e6f0...":

  • We have 16 Databases: [0-9a-f]. The first character "1" selects the database "1".
  • Each database holds 256 buckets. The next two characters "a2" select the bucket "a2" within the "1" database.
  • The remaining hash "b3c4d5e6f0..." is used as the key in the "a2" bucket based on the specified KeyLen.

By following this approach, you can efficiently organize and retrieve data based on Message-ID hashes while benefiting from the performance and storage optimizations provided by BoltDB.

Feel free to customize the KeyLen setting to meet your specific performance and storage requirements.

Smaller KeyLen values save space but may result in more disk access, while larger KeyLen values reduce disk access but consume more space.
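
The split described in this example can be sketched as a small helper. `splitHash` is a hypothetical function written for illustration, not part of the package; it assumes 256 buckets per database (two bucket characters) and no sub buckets.

```go
package main

import (
	"errors"
	"fmt"
)

// splitHash is a hypothetical helper: the 1st character selects the database,
// the 2nd and 3rd select the bucket, and the next keyLen characters form the key.
func splitHash(hash string, keyLen int) (db, bucket, key string, err error) {
	if keyLen < 1 || len(hash) < 3+keyLen {
		return "", "", "", errors.New("invalid KeyLen or hash too short")
	}
	return hash[0:1], hash[1:3], hash[3 : 3+keyLen], nil
}

func main() {
	db, bucket, key, err := splitHash("1a2b3c4d5e6f0", 6)
	if err != nil {
		fmt.Println("error:", err)
		return
	}
	fmt.Println(db, bucket, key)
}
```

Applied to the TESTHASH2 value from the package variables with a key length of 6, the same split yields the documented TESTDB "f", TESTBUK "0d" and TESTKEY "784ae1".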

File Descriptors: 33 (+ 3 /dev/pts + 1 anon_inode: + 2 pipe: ?) = 39

  • Writer History.dat: 1
  • HashDB: 16 + 16 Fseeks
ls -lha /proc/$(pidof nntp-history-test)/fd

# counts all open FDs
ls -l /proc/$(pidof nntp-history-test)/fd | wc -l

# count open history.dat
ls -lha /proc/$(pidof nntp-history-test)/fd|grep history.dat$|wc -l

# count open hashdb
ls -lha /proc/$(pidof nntp-history-test)/fd|grep history.dat.hash|wc -l

BBoltDB Statistics

When you retrieve and examine the statistics (e.g., by using the Stats method) of a BoltDB (bbolt) database in Go, you can gather valuable information about the database's performance, resource usage, and structure. The statistics provide insights into how the database is operating and can be useful for optimizing your application. Here are some of the key metrics you can see from BoltDB database statistics:

Number of Buckets: The total number of buckets in the database. Each bucket is essentially a separate namespace for key-value pairs.

Number of Keys: The total number of keys stored in the database. This metric can help you understand the size of your dataset.

Number of Data Pages: The total number of data pages used by the database. Data pages store key-value pairs.

Number of Leaf Pages: The total number of leaf pages in the database. Leaf pages contain key-value pairs directly.

Number of Branch Pages: The total number of branch pages in the database. Branch pages are used for indexing and navigating to leaf pages.

Page Size: The size of each page in bytes. This information can be helpful for understanding memory usage.

Bucket Page Size: The average size of bucket pages in bytes. This metric can provide insights into how efficiently your buckets are organized.

Leaf Page Size: The average size of leaf pages in bytes. This helps you understand the size of the data itself.

Branch Page Size: The average size of branch pages in bytes. This metric can be useful for optimizing indexing.

Allocated Pages: The total number of pages allocated by the database. This can be helpful for monitoring resource usage.

Freed Pages: The total number of pages that have been freed or released. This metric can indicate how efficiently the database manages space.

Page Rebalance: The number of times pages have been rebalanced between sibling branches. This is relevant for understanding how the database maintains a balanced tree structure.

Transaction Stats: Information about transactions, including the number of started and committed transactions.

Page Cache Stats: Metrics related to the page cache, including the number of hits and misses, and the size of the cache.

Free Page Ns: The number of free pages per page allocation size. This provides insight into the fragmentation of free space.

These statistics can help you monitor and optimize your BoltDB database. For example, you can use them to identify performance bottlenecks, understand resource usage patterns, and assess the efficiency of your data organization. Depending on your application's specific requirements, you may want to focus on certain metrics more than others.

Contributing

Contributions to this code are welcome.

If you have suggestions for improvements or find issues, please feel free to open an issue or submit a pull request.

License

This code is provided under the MIT License. See the LICENSE file for details.

Benchmark pure writes (no dupe check via hashdb) to history file with 4K bufio.

  • Not using L1Cache results in 4 million lines written, as there are 4 jobs running.
./nntp-history-test -useHashDB=false -useL1Cache=true -todo=1000000

Inserting 4.000.000 i hashes (75% duplicates) to history and hashdb

./nntp-history-test -todo=1000000 -DBG_BS_LOG=true
...

Checking 4.000.000 i hashes (75% duplicates) vs hashdb

./nntp-history-test -todo=1000000 -DBG_BS_LOG=false
...

Inserting 400.000.000 i hashes (75% duplicates) to history and hashdb (adaptive batchsize)

# history.DBG_BS_LOG = true // debugs BatchLOG for every batch insert!
# history.DBG_GOB_TEST = true // costly check: test decodes gob encoded data
#
./nntp-history-test -todo 100000000
...

Inserting 400.000.000 i hashes (75% duplicates) to history and hashdb (NO adaptive batchsize)

# history.DBG_BS_LOG = false // debugs BatchLOG for every batch insert!
# history.DBG_GOB_TEST = false // costly check: test decodes gob encoded data
#
./nntp-history-test -todo=100000000
...
...
...

Checking 400.000.000 i hashes (75% duplicates) vs hashdb

# history.DBG_BS_LOG = true // debugs BatchLOG for every batch insert!
#
./nntp-history-test -todo=100000000
...

Sizes with 25M hashes inserted

  • Filesystem: ZFS@linux
  • ZFS compression: lz4
  • ZFS blocksize=128K

Documentation

Constants

const (
	HashShort                     = 0x0B // 11
	DefaultBoltINITParallel       = intBoltDBs
	DefaultBoltSYNCParallel       = intBoltDBs
	DefaultReplayDistance   int64 = 1024 * 1024
	WCBBS_UL                      = 0xFFFF // adaptive BatchSize => workerCharBucketBatchSize UpperLimit
	WCBBS_LL                      = 0xF    // adaptive BatchSize => workerCharBucketBatchSize LowerLimit
	// KeyLen is used with HashShort
	//  1st char of hash selects boltDB
	//  2nd + 3rd char (+4th char: if 4K his.rootBUCKETS) of hash selects bucket in boltDB
	//  remaining chars [3:$] are used as Key in BoltDB to store offset(s)
	// the key is further divided into 1st+2nd+3rd+... char as sub buckets and remainder used as key in the root.bucket.sub.bucket[3:$]
	//  offsets lead into history.dat and point to start of a line containing the full hash
	MinKeyLen = 8 // goes with HashShort
)
const (
	// never change this
	FlagExpires      bool = true
	FlagNeverExpires bool = false
)
const (
	CR                = "\r"
	LF                = "\n"
	CRLF              = CR + LF
	DefaultSocketPath = "./history.socket"
	// default launches a tcp port with a telnet interface @ localhost:49119
	DefaultServerTCPAddr = "[::]:49119"
)
const (
	ALWAYS = true
	// DefExpiresStr use 10 digits as spare so we can update it later without breaking offsets
	DefExpiresStr string = "----------" // never expires
	CaseLock             = 0xFF         // internal cache state. reply with CaseRetry while CaseLock
	CasePass             = 0xF1         // is a reply to L1Lock and IndexQuery
	CaseDupes            = 0x1C         // is a reply and cache state
	CaseRetry            = 0x2C         // is a reply to if CaseLock or CaseWrite or if history.dat returns EOF
	CaseAdded            = 0x3C         // is a reply to WriterChan:responseChan
	CaseWrite            = 0x4C         // internal cache state. is not a reply. reply with CaseRetry while CaseWrite is happening
	CaseError            = 0xE1         // some things drop this error
	ZEROPADLEN           = 0xFFF        // zeropads the header

)
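
The comments on the Case constants imply a small mapping from internal cache state to the reply a caller sees. The sketch below illustrates that mapping with local copies of the constant values; `replyFor` and `caseName` are hypothetical helpers, and passing every state other than CaseLock and CaseWrite through unchanged is an assumption of this sketch.

```go
package main

import "fmt"

// Values copied from the constant block above.
const (
	caseLock  = 0xFF
	casePass  = 0xF1
	caseDupes = 0x1C
	caseRetry = 0x2C
	caseAdded = 0x3C
	caseWrite = 0x4C
	caseError = 0xE1
)

// replyFor maps an internal cache state to a reply. Following the comments,
// CaseLock and CaseWrite are answered with CaseRetry. Returning all other
// values unchanged is an assumption made for this illustration.
func replyFor(state int) int {
	switch state {
	case caseLock, caseWrite:
		return caseRetry
	default:
		return state
	}
}

// caseName returns a readable name for a code.
func caseName(code int) string {
	switch code {
	case caseLock:
		return "CaseLock"
	case casePass:
		return "CasePass"
	case caseDupes:
		return "CaseDupes"
	case caseRetry:
		return "CaseRetry"
	case caseAdded:
		return "CaseAdded"
	case caseWrite:
		return "CaseWrite"
	case caseError:
		return "CaseError"
	}
	return "unknown"
}

func main() {
	for _, state := range []int{caseLock, caseWrite, caseDupes, casePass} {
		fmt.Printf("%s -> %s\n", caseName(state), caseName(replyFor(state)))
	}
}
```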
const (
	FlagSearch = -1
)
const (
	MinRetryWaiter = 100
)

Variables

var (
	DBG_BS_LOG           bool                           // debugs BatchLOG for every batch insert! beware of the memory eating dragon!
	DBG_ABS1             bool                           // debugs adaptive batchsize in boltBucketPutBatch
	DBG_ABS2             bool                           // debugs adaptive batchsize forbatchqueue in boltDB_Worker
	AdaptBatch           bool                           // automagically adjusts CharBucketBatchSize=>wCBBS=workerCharBucketBatchSize to match BatchFlushEvery
	BatchFlushEvery      int64  = 5120                  // flushes boltDB in batch every N milliseconds (1-...)
	BoltDB_MaxBatchDelay        = 10 * time.Millisecond // default value from boltdb:db.go = 10 * time.Millisecond
	BoltDB_MaxBatchSize  int    = 1000                  // default value from boltdb:db.go = 1000
	CharBucketBatchSize  int    = 1024                  // default batchsize per char:bucket batchqueues
	EmptyStr             string                         // used as pointer
)
var (
	BoltDBreopenEveryN       = 0x0   // reopens boltDB every N added (not batchins) // very experimental and not working!
	WatchBoltTimer     int64 = 10    // prints bolts stats every N seconds. only with DEBUG
	NoReplayHisDat     bool  = false // can be set before booting to not replay history.dat
	// stop replay HisDat if we got this many OKs with a distance to missing
	// ReplayTestMax depends on bbolt.db.MaxBatchSize. ReplayTestMax should be at least 2x bbolt.db.MaxBatchSize!
	// if the process crashes: do NOT change the MaxBatchSize before starting!
	// ReplayHisDat() needs the same MaxBatchSize!
	ReplayDistance       int64  = DefaultReplayDistance           // defaults to replay at least 128K messages, more if missed ones (not in hashdb) appear.
	QIndexChan           int    = 16                              // Main-indexchan can queue this
	QindexChans          int    = 16                              // every sub-indexchans for a `char` can queue this
	BoltDB_AllocSize     int                                      // if not set defaults: 16 * 1024 * 1024 (min: 1024*1024)
	BoltSyncEveryS       int64  = 60                              // call db.sync() every seconds (only used with 'boltopts.NoSync: true')
	BoltSyncEveryN       uint64 = 500000                          // call db.sync() after N inserts (only used with 'boltopts.NoSync = true')
	BoltINITParallel     int    = DefaultBoltINITParallel         // set this via 'history.BoltINITParallel = 1' before calling BootHistory.
	BoltSYNCParallel     int    = DefaultBoltSYNCParallel         // set this via 'history.BoltSYNCParallel = 1' before calling BootHistory.
	BoltHashOpen                = make(chan struct{}, intBoltDBs) // dont change this
	HISTORY_INDEX_LOCK          = make(chan struct{}, 1)          // main lock
	HISTORY_INDEX_LOCK16        = make(chan struct{}, intBoltDBs) // sub locks

	// adjust root buckets page splitting behavior
	// we mostly do random inserts: lower value should be better?
	RootBucketFillPercent = 1.0

	// adjust sub buckets page splitting behavior
	// unsure if it does anything in sub buckets?
	SubBucketFillPercent = 0.25

	// can be 16 | (default: 256) | 4096 !4K is insane!
	// creates this many batchQueues and more goroutines
	// 16 generates a lot of pagesplits in bbolt
	// 256 is the sweet spot
	// 4096 generates high load as it launches this many and more go routines and queues!
	RootBUCKETSperDB = 16

	// KeyIndex creates 16^N sub buckets in `his.rootBUCKETS
	// KeyIndex cuts (shortens) the KeyLen by this to use as subb.buckets
	// 0 disables sub/nested buckets and uses full Keylen as Key in RootBuckets only.
	KeyIndex = 0
)
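
The bucket arithmetic behind RootBUCKETSperDB and KeyIndex can be sketched as follows. The helpers are hypothetical. The mapping of 256 buckets to two hash characters and 4096 to three follows the comments in the constants; mapping 16 buckets to a single character is an assumption of this sketch.

```go
package main

import (
	"errors"
	"fmt"
)

const boltDBs = 16 // one database per hex character, as described in the README

// bucketChars returns how many hash characters select the root bucket.
// The value for 16 is assumed; 256 and 4096 follow the package comments.
func bucketChars(rootBucketsPerDB int) (int, error) {
	switch rootBucketsPerDB {
	case 16:
		return 1, nil
	case 256:
		return 2, nil
	case 4096:
		return 3, nil
	}
	return 0, errors.New("RootBUCKETSperDB must be 16, 256 or 4096")
}

// subBuckets returns 16^keyIndex, the number of nested buckets per root bucket.
// A keyIndex of 0 gives 1, meaning keys live directly in the root bucket.
func subBuckets(keyIndex int) int {
	n := 1
	for i := 0; i < keyIndex; i++ {
		n *= 16
	}
	return n
}

func main() {
	for _, root := range []int{16, 256, 4096} {
		chars, err := bucketChars(root)
		if err != nil {
			fmt.Println("error:", err)
			continue
		}
		fmt.Printf("RootBUCKETSperDB=%d: %d bucket chars, %d root buckets in total\n",
			root, chars, boltDBs*root)
	}
	for keyIndex := 0; keyIndex <= 2; keyIndex++ {
		fmt.Printf("KeyIndex=%d: %d sub buckets per root bucket\n", keyIndex, subBuckets(keyIndex))
	}
}
```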
var (
	UseArenas             bool
	DBG_CGS               bool               // DEBUG_CACHE_GROW_SHRINK
	DefaultCacheExpires   int64 = 5          // gets x2 BatchFlushEvery x2
	DefaultCacheExtend    int64 = 5          // extends cached items after writes
	DefaultCachePurge     int64 = 1          // checks ttl every N seconds. affects CacheExpires/Extend max to + Purge
	DefaultEvictsCapacity       = intBoltDBs // his.cEvCap (size of Extend chan) is normally fine as is.
	ClearEveryN                 = DefaultEvictsCapacity
)
var (
	BootHisCli           bool
	DefaultHistoryServer = "[::1]:49119" // localhost:49119
	// set only once before boot
	TCPchanQ           = 128
	DefaultDialTimeout = 5   // seconds
	DefaultRetryWaiter = 500 // milliseconds
	DefaultDialRetries = -1  // try N times and fail or <= 0 enables infinite retry
)
var (
	DEBUGL1         bool  = false
	L1              bool  = true // better not disable L1 cache...
	L1CacheExpires  int64 = DefaultCacheExpires
	L1ExtendExpires int64 = DefaultCacheExtend
	L1Purge         int64 = DefaultCachePurge
	L1InitSize      int   = 64 * 1024

	// L1LockDelay: delays L1 locking by N milliseconds
	// L1 locking is most likely done per client-connection
	// setting this greater than 0 limits the number of articles a client can lock and send
	//    1ms is a max of 1000 messages/sec per conn
	//  100ms is a max of   10 messages/sec per conn
	//  250ms is a max of    4 messages/sec per conn
	// 1000ms is a max of    1 message /sec per conn
	// text peers mostly dont need more than 4 msg per sec
	L1LockDelay int = 0
)
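
The rate limits listed in the L1LockDelay comment follow from a simple division. A minimal sketch, with `maxLocksPerSecond` as a hypothetical helper name:

```go
package main

import "fmt"

// maxLocksPerSecond returns the upper bound of messages per second that one
// connection can lock when every lock is delayed by delayMs milliseconds.
// It returns 0 when the delay imposes no limit (delayMs <= 0).
func maxLocksPerSecond(delayMs int) int {
	if delayMs <= 0 {
		return 0
	}
	return 1000 / delayMs
}

func main() {
	for _, delay := range []int{1, 100, 250, 1000} {
		fmt.Printf("%4dms delay: at most %d messages/sec per connection\n",
			delay, maxLocksPerSecond(delay))
	}
}
```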
var (
	DEBUGL2         bool  = false
	L2              bool  = true
	L2CacheExpires  int64 = DefaultCacheExpires
	L2ExtendExpires int64 = DefaultCacheExtend
	L2Purge         int64 = DefaultCachePurge
	L2InitSize      int   = 64 * 1024
)

L2Cache: offset => hash (fewer requests to hisDat)

var (
	DEBUGL3         bool  = false
	L3              bool  = true // do not disable!
	L3CacheExpires  int64 = DefaultCacheExpires
	L3ExtendExpires int64 = DefaultCacheExtend
	L3Purge         int64 = DefaultCachePurge
	L3InitSize      int   = 64 * 1024
)

L3Cache: key => offsets

  • fewer requests to boltDB
  • disabling L3 is not a good idea!
  • queues hold offsets which the DB does not know about
  • the cache keeps track of duplicate writes
  • duplicate keys will get an empty_offsets
  • the latest write will overwrite a past write that may still be in the queue

var (
	IndexParallel     int = intBoltDBs
	NumQueueWriteChan int = intBoltDBs
	HisDatWriteBuffer int = 4 * 1024
)
var (
	BootVerbose = true
	TESTHASH0   = "0f05e27ca579892a63a256dacd657f5615fab04bf81e85f53ee52103e3a4fae8"
	TESTHASH1   = "f0d784ae13ce7cf1f3ab076027a6265861eb003ad80069cdfb1549dd1b8032e8"
	TESTHASH2   = "f0d784ae1747092974d02bd3359f044a91ed4fd0a39dc9a1feffe646e6c7ce09"
	TESTHASH    = TESTHASH2
	TESTCACKEY  = "f0d784ae1"
	TESTKEY     = "784ae1"
	TESTBUK     = "0d"
	TESTDB      = "f"
	TESTOFFSET  = 123456
	ROOTBUCKETS []string
	SUBBUCKETS  []string
	BUFIOBUFFER = 4 * 1024 // a history line with sha256 is 102 bytes long including LF or 38 bytes of payload + hashLen
	History     HISTORY
	DEBUG       bool = true
	DEBUG0      bool = false
	DEBUG1      bool = false
	DEBUG2      bool = false
	DEBUG9      bool = false
	LOCKHISTORY      = make(chan struct{}, 1)
	HEXCHARS         = []string{"0", "1", "2", "3", "4", "5", "6", "7", "8", "9", "a", "b", "c", "d", "e", "f"}
)
var (
	CPUProfile bool // set before boot

)
var (
	ForcedReplay bool
)
var (
	// set HEX true: converts offset into hex strings to store in bbolt
	// dont change later once db is initialized!
	HEX bool = true
)

Functions

func CPUBURN

func CPUBURN()

func CRC

func CRC(input string) string

func ConvertHistoryObjectToString

func ConvertHistoryObjectToString(obj *HistoryObject) string

func GetMedian

func GetMedian(char string, bucket string, slice *[]int64, new int64, lim int, minian int64, maxian int64, print bool) (med int64)

func IsPow2

func IsPow2(n int) bool

func LOCKfunc

func LOCKfunc(achan chan struct{}, src string) bool

func LeftPad

func LeftPad(input *string, length int)

func NullPad

func NullPad(input *string, length int)

func PrintMemoryStats

func PrintMemoryStats()

func PrintMemoryStatsEvery

func PrintMemoryStatsEvery(interval time.Duration)

func RemoveNullPad

func RemoveNullPad(input string) string

func UNLOCKfunc

func UNLOCKfunc(achan chan struct{}, src string)

func UnixTimeMicroSec

func UnixTimeMicroSec() int64

func UnixTimeMilliSec

func UnixTimeMilliSec() int64

func UnixTimeNanoSec

func UnixTimeNanoSec() int64

func UnixTimeSec

func UnixTimeSec() int64

Types

type AHASH

type AHASH struct {
	// contains filtered or unexported fields
}

type AccessControlList

type AccessControlList struct {
	// contains filtered or unexported fields
}
var (
	ACL        AccessControlList
	DefaultACL map[string]bool // can be set before booting
)

func (*AccessControlList) IsAllowed

func (a *AccessControlList) IsAllowed(ip string) bool

func (*AccessControlList) SetACL

func (a *AccessControlList) SetACL(ip string, val bool)

func (*AccessControlList) SetupACL

func (a *AccessControlList) SetupACL()

type BATCHLOCKS

type BATCHLOCKS struct {
	// contains filtered or unexported fields
}

type BLCH

type BLCH struct {
	// contains filtered or unexported fields
}

BATCHLOCKCHAN

type BOLTDB_PTR

type BOLTDB_PTR struct {
	BoltDB *bolt.DB
	// contains filtered or unexported fields
}

type BQ

type BQ struct {
	Maps   map[string]map[string]chan *BatchOffset
	BootCh chan struct{}
	// contains filtered or unexported fields
}

BatchQueue

type BatchLOG

type BatchLOG struct {
	// contains filtered or unexported fields
}

type BatchLOGGER

type BatchLOGGER struct {
	// contains filtered or unexported fields
}

type BatchOffset

type BatchOffset struct {
	// contains filtered or unexported fields
}

used to batch write items to boltDB

type BoltDBs

type BoltDBs struct {
	// contains filtered or unexported fields
}

type CCC

type CCC struct {
	Counter map[string]uint64 // counter key: value
}

CharCacheCounter

type ClearCache

type ClearCache struct {
	// contains filtered or unexported fields
}

type ClearCacheChan

type ClearCacheChan struct {
	// contains filtered or unexported fields
}

type HISTORY

type HISTORY struct {
	L1Cache L1CACHE
	L2Cache L2CACHE
	L3Cache L3CACHE

	Offset int64 // the actual offset for history.dat

	WriterChan chan *HistoryObject // history.dat writer channel
	IndexChan  chan *HistoryIndex  // main index query channel

	BatchLogs  BatchLOGGER
	BatchLocks map[string]*BATCHLOCKS // used to lock char:bucket in BoltSync and boltBucketPutBatch
	BoltDBsMap *BoltDBs               // using a ptr to a struct in the map allows updating the struct values without updating the map

	Counter map[string]uint64

	WBR bool // WatchBoltRunning

	CPUfile *os.File // ptr to file for cpu profiling
	MEMfile *os.File // ptr to file for mem profiling
	// TCPchan: used to send hobj via handleRConn to a remote historyServer
	TCPchan chan *HistoryObject
	// contains filtered or unexported fields
}

func (*HISTORY) AddHistory

func (his *HISTORY) AddHistory(hobj *HistoryObject, useL1Cache bool) int

func (*HISTORY) BoltSync

func (his *HISTORY) BoltSync(db *bolt.DB, char string, reopen bool) error

Public: BoltSync. For every DB, call this function with db=nil and char set to one of [0-9a-f].

func (*HISTORY) BoltSyncAll

func (his *HISTORY) BoltSyncAll() error

Public: BoltSyncAll boltDBs

func (*HISTORY) BootHistory

func (his *HISTORY) BootHistory(history_dir string, hashdb_dir string, useHashDB bool, boltOpts *bolt.Options, keyalgo int, keylen int)

BootHistory initializes the history component, configuring its settings and preparing it for operation. It sets up the necessary directories for history and hash databases, and opens the history data file. The function also manages the communication channels for reading and writing historical data. If the `useHashDB` parameter is set to true, it initializes the history database (HashDB) and starts worker routines.

Parameters:

  • history_dir: The directory where history data will be stored.
  • hashdb_dir: The directory where the history database (HashDB) will be stored.
  • useHashDB: If true, enables the use of the history database (HashDB).
  • boltOpts: Bolt database options for configuring the HashDB.
  • keyalgo: The hash algorithm used for indexing historical data.
  • keylen: The length of the hash values used for indexing.

func (*HISTORY) BootHistoryClient

func (his *HISTORY) BootHistoryClient(historyServer string)

func (*HISTORY) CLOSE_HISTORY

func (his *HISTORY) CLOSE_HISTORY()

func (*HISTORY) CrunchBatchLogs

func (his *HISTORY) CrunchBatchLogs(more bool)

func (*HISTORY) DoCacheEvict

func (his *HISTORY) DoCacheEvict(char string, hash string, offset int64, key string)

gets called in BBATCH.go:boltBucketPutBatch() after boltTX

func (*HISTORY) DupeCheck

func (his *HISTORY) DupeCheck(db *bolt.DB, char string, bucket string, key string, hash string, offset int64, setempty bool, file *os.File, batchQueue chan *BatchOffset) (int, error)

DupeCheck checks for duplicate message-ID hashes in a BoltDB bucket. It manages offsets associated with message hashes and handles duplicates, ensuring the integrity of the historical data. If a hash is a duplicate, it returns 1, otherwise, it returns 0. It also handles the creation of new hash entries in the bucket when needed.

func (*HISTORY) FseekHistoryHeader

func (his *HISTORY) FseekHistoryHeader(output *[]byte) (int, error)

func (*HISTORY) FseekHistoryLine

func (his *HISTORY) FseekHistoryLine(offset int64) (string, error)

func (*HISTORY) FseekHistoryMessageHash

func (his *HISTORY) FseekHistoryMessageHash(file *os.File, offset int64, char string, bucket string, rethash *string) error

FseekHistoryMessageHash seeks to a specified offset in the history file and extracts a message-ID hash. It reads characters from the file until a tab character ('\t') is encountered, extracting the hash enclosed in curly braces. If a valid hash is found, it returns the hash as a string without curly braces. If the end of the file (EOF) is reached, it returns a special EOF marker.
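
The read pattern described here (seek to an offset, read up to the first tab, strip the curly braces) can be sketched with the standard library alone. `readHashAt` and `hashAt` are hypothetical helpers, not the package's implementation, and the sample lines in `main` are made up; only the "{hash}" followed by a tab layout is taken from the description.

```go
package main

import (
	"bufio"
	"errors"
	"fmt"
	"io"
	"strings"
)

// readHashAt reads from r starting at offset until the first tab and returns
// the text between the curly braces. It returns io.EOF if the data ends
// before a tab is found.
func readHashAt(r io.ReaderAt, offset int64) (string, error) {
	br := bufio.NewReader(io.NewSectionReader(r, offset, 1<<20))
	field, err := br.ReadString('\t')
	if err != nil {
		return "", err
	}
	field = strings.TrimSuffix(field, "\t")
	if len(field) < 3 || field[0] != '{' || field[len(field)-1] != '}' {
		return "", errors.New("no {hash} found at offset")
	}
	return field[1 : len(field)-1], nil
}

// hashAt is a convenience wrapper for in-memory data, used by the demo below.
func hashAt(data string, offset int64) (string, error) {
	return readHashAt(strings.NewReader(data), offset)
}

func main() {
	// Two made-up lines; the fields after the tab are placeholders.
	first := "{aaa111}\tplaceholder\n"
	data := first + "{bbb222}\tplaceholder\n"

	for _, off := range []int64{0, int64(len(first)), int64(len(data))} {
		hash, err := hashAt(data, off)
		fmt.Printf("offset=%d hash=%q err=%v\n", off, hash, err)
	}
}
```

Because `*os.File` implements `io.ReaderAt`, the same helper would work on a file opened with `os.Open`.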

func (*HISTORY) GetBoltBucketStats

func (his *HISTORY) GetBoltBucketStats(char string, print bool)

func (*HISTORY) GetBoltHashOpen

func (his *HISTORY) GetBoltHashOpen() int

func (*HISTORY) GetBoltStat

func (his *HISTORY) GetBoltStat(char string, print bool) (OpenTxN int, TxN int)

func (*HISTORY) GetBoltStats

func (his *HISTORY) GetBoltStats(char string, print bool) (OpenTxN int, TxN int)

func (*HISTORY) GetCounter

func (his *HISTORY) GetCounter(k string) uint64

func (*HISTORY) IndexQuery

func (his *HISTORY) IndexQuery(hash string, indexRetChan chan int, offset int64) (int, error)

func (*HISTORY) LockAllBatchLocks

func (his *HISTORY) LockAllBatchLocks(char string)

func (*HISTORY) NewRConn

func (his *HISTORY) NewRConn(historyServer string) *RemoteConn

func (*HISTORY) PrintBoltPerformance

func (his *HISTORY) PrintBoltPerformance()

func (*HISTORY) PrintCacheStats

func (his *HISTORY) PrintCacheStats()

func (*HISTORY) RebuildHashDB

func (his *HISTORY) RebuildHashDB() error

func (*HISTORY) ReplayHisDat

func (his *HISTORY) ReplayHisDat()

func (*HISTORY) SET_DEBUG

func (his *HISTORY) SET_DEBUG(debug int)

func (*HISTORY) Sync_upcounter

func (his *HISTORY) Sync_upcounter(k string)

func (*HISTORY) Sync_upcounterN

func (his *HISTORY) Sync_upcounterN(k string, v uint64)

func (*HISTORY) Wait4HashDB

func (his *HISTORY) Wait4HashDB()

func (*HISTORY) WatchBolt

func (his *HISTORY) WatchBolt()

type HistoryIndex

type HistoryIndex struct {
	Hash         string
	Char         string
	Offset       int64
	IndexRetChan chan int
}

used to query the index

type HistoryObject

type HistoryObject struct {
	MessageIDHash string
	StorageToken  string // "F" = flatstorage | "M" = mongodb | "X" = deleted
	Char          string
	Arrival       int64
	Expires       int64
	Date          int64
	ResponseChan  chan int // receives a 0,1,2 :: pass|duplicate|retrylater
}

func ConvertStringToHistoryObject

func ConvertStringToHistoryObject(parts []string) (*HistoryObject, error)

type HistorySettings

type HistorySettings struct {
	// constant values once DBs are initialized
	Ka int // keyalgo
	Kl int // keylen
	Ki int // keyindex
	Bp int // bucketsperdb
}

builds the history.dat header

type L1CACHE

type L1CACHE struct {
	Caches  map[string]*L1CACHEMAP
	Extend  map[string]*L1ECH
	Muxers  map[string]*L1MUXER
	Counter map[string]*CCC
	// contains filtered or unexported fields
}

func (*L1CACHE) BootL1Cache

func (l1 *L1CACHE) BootL1Cache(his *HISTORY)

The BootL1Cache method initializes the cache system. It creates cache maps, initializes them with initial sizes, and starts goroutines to periodically purge expired entries.

func (*L1CACHE) L1Stats

func (l1 *L1CACHE) L1Stats(statskey string) (retval uint64, retmap map[string]uint64)

func (*L1CACHE) LockL1Cache

func (l1 *L1CACHE) LockL1Cache(hash string, char string, value int, useHashDB bool) int

The LockL1Cache method is used to LOCK a `MessageIDHash` for processing. If the value is not in the cache or has expired, it locks the cache, updates the cache with a new value, and returns the value. Possible return values:

CaseLock == already in processing
CaseWrite == already in processing
CaseDupes == is a duplicate
CasePass == not a duplicate == locked article for processing
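
The locking behaviour described for LockL1Cache can be illustrated with a mutex-protected map. This is a simplified sketch under stated assumptions: `lockCache` and its methods are hypothetical, expiry is left out entirely, and the constants are local copies of the documented Case values.

```go
package main

import (
	"fmt"
	"sync"
)

// Local copies of the documented Case values.
const (
	caseLock  = 0xFF
	casePass  = 0xF1
	caseDupes = 0x1C
	caseWrite = 0x4C
)

// lockCache is a hypothetical, much simplified stand-in for one L1 cache map.
type lockCache struct {
	mu    sync.Mutex
	state map[string]int
}

func newLockCache() *lockCache {
	return &lockCache{state: make(map[string]int)}
}

// lock returns casePass and marks the hash as caseLock if it was unknown.
// Otherwise it returns the state that is already stored for the hash.
func (c *lockCache) lock(hash string) int {
	c.mu.Lock()
	defer c.mu.Unlock()
	if s, ok := c.state[hash]; ok {
		return s
	}
	c.state[hash] = caseLock
	return casePass
}

// set overwrites the stored state, for example with caseDupes after a write.
func (c *lockCache) set(hash string, value int) {
	c.mu.Lock()
	c.state[hash] = value
	c.mu.Unlock()
}

func main() {
	c := newLockCache()
	fmt.Printf("%#x\n", c.lock("f0d784ae1")) // unknown hash: locked for processing
	fmt.Printf("%#x\n", c.lock("f0d784ae1")) // second caller sees the lock
	c.set("f0d784ae1", caseDupes)
	fmt.Printf("%#x\n", c.lock("f0d784ae1")) // now known as duplicate
}
```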

func (*L1CACHE) Set

func (l1 *L1CACHE) Set(hash string, char string, value int, flagexpires bool)

The Set method is used to set a value in the cache. If the cache size is close to its maximum, it grows the cache.

type L1CACHEMAP

type L1CACHEMAP struct {
	// contains filtered or unexported fields
}

type L1ECH

type L1ECH struct {
	// contains filtered or unexported fields
}

L1ExtendChan

type L1ITEM

type L1ITEM struct {
	// contains filtered or unexported fields
}

type L1MUXER

type L1MUXER struct {
	// contains filtered or unexported fields
}

type L1PQ

type L1PQ []L1PQItem

type L1PQItem

type L1PQItem struct {
	Key     string
	Expires int64
}

type L1pqQ

type L1pqQ struct {
	// contains filtered or unexported fields
}

func (*L1pqQ) Pop

func (pq *L1pqQ) Pop() (*L1PQItem, int)

func (*L1pqQ) Push

func (pq *L1pqQ) Push(item L1PQItem)

type L2CACHE

type L2CACHE struct {
	Caches map[string]*L2CACHEMAP
	Extend map[string]*L2ECH
	Muxers map[string]*L2MUXER

	Counter map[string]*CCC
	// contains filtered or unexported fields
}

func (*L2CACHE) BootL2Cache

func (l2 *L2CACHE) BootL2Cache(his *HISTORY)

The BootL2Cache method initializes the L2 cache. It creates cache maps, initializes them with initial sizes, and starts goroutines to periodically clean up expired entries.

func (*L2CACHE) GetHashFromOffset

func (l2 *L2CACHE) GetHashFromOffset(offset int64, rethash *string)

The GetHashFromOffset method retrieves a hash from the L2 cache using an offset as the key.

func (*L2CACHE) L2Stats

func (l2 *L2CACHE) L2Stats(statskey string) (retval uint64, retmap map[string]uint64)

func (*L2CACHE) OffsetToChar

func (l2 *L2CACHE) OffsetToChar(offset int64) (retval string)

func (*L2CACHE) SetOffsetHash

func (l2 *L2CACHE) SetOffsetHash(offset int64, hash string, flagexpires bool)

The SetOffsetHash method sets a cache item in the L2 cache using an offset as the key and a hash as the value. It also dynamically grows the cache when necessary.
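
An offset-to-hash cache with optional expiry can be sketched as below. This is an illustrative stand-in, not the package's L2 cache: `offsetCache`, `entry` and the explicit `now` parameter are assumptions chosen to keep the example deterministic, and the sharding, extend channels and automatic growth of the real cache are omitted.

```go
package main

import (
	"fmt"
	"sync"
	"time"
)

// entry holds one cached hash; expires is in unix seconds, 0 means never.
type entry struct {
	hash    string
	expires int64
}

// offsetCache is a hypothetical, simplified offset => hash cache.
type offsetCache struct {
	mu    sync.Mutex
	ttl   int64
	items map[int64]entry
}

func newOffsetCache(ttlSeconds int64) *offsetCache {
	return &offsetCache{ttl: ttlSeconds, items: make(map[int64]entry)}
}

// set stores hash under offset; with flagExpires the entry expires ttl seconds after now.
func (c *offsetCache) set(offset int64, hash string, flagExpires bool, now int64) {
	var exp int64
	if flagExpires {
		exp = now + c.ttl
	}
	c.mu.Lock()
	c.items[offset] = entry{hash: hash, expires: exp}
	c.mu.Unlock()
}

// get returns the cached hash for offset, if present.
func (c *offsetCache) get(offset int64) (string, bool) {
	c.mu.Lock()
	defer c.mu.Unlock()
	e, ok := c.items[offset]
	return e.hash, ok
}

// purge removes entries whose expiry time has passed and returns how many were removed.
func (c *offsetCache) purge(now int64) int {
	c.mu.Lock()
	defer c.mu.Unlock()
	removed := 0
	for off, e := range c.items {
		if e.expires != 0 && e.expires <= now {
			delete(c.items, off)
			removed++
		}
	}
	return removed
}

func main() {
	c := newOffsetCache(5)
	now := time.Now().Unix()
	c.set(123456, "f0d784ae1747092974d02bd3359f044a91ed4fd0a39dc9a1feffe646e6c7ce09", true, now)
	hash, ok := c.get(123456)
	fmt.Println(ok, hash)
	fmt.Println("purged:", c.purge(now+5))
}
```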

type L2CACHEMAP

type L2CACHEMAP struct {
	// contains filtered or unexported fields
}

type L2ECH

type L2ECH struct {
	// contains filtered or unexported fields
}

L2ExtendChan

type L2ITEM

type L2ITEM struct {
	// contains filtered or unexported fields
}

type L2MUXER

type L2MUXER struct {
	// contains filtered or unexported fields
}

type L2PQ

type L2PQ []L2PQItem

type L2PQItem

type L2PQItem struct {
	Key     int64
	Expires int64
}

type L2pqQ

type L2pqQ struct {
	// contains filtered or unexported fields
}

func (*L2pqQ) Pop

func (pq *L2pqQ) Pop() (*L2PQItem, int)

func (*L2pqQ) Push

func (pq *L2pqQ) Push(item L2PQItem)

type L3CACHE

type L3CACHE struct {
	Caches map[string]*L3CACHEMAP
	Extend map[string]*L3ECH
	Muxers map[string]*L3MUXER

	Counter map[string]*CCC
	// contains filtered or unexported fields
}

func (*L3CACHE) BootL3Cache

func (l3 *L3CACHE) BootL3Cache(his *HISTORY)

The BootL3Cache method initializes the L3 cache. It creates cache maps, initializes them with initial sizes, and starts goroutines to periodically clean up expired entries.

func (*L3CACHE) GetOffsets

func (l3 *L3CACHE) GetOffsets(key string, char string, offsets *[]int64) int

The GetOffsets method retrieves a slice of offsets from the L3 cache using a key and a char.

func (*L3CACHE) L3Stats

func (l3 *L3CACHE) L3Stats(statskey string) (retval uint64, retmap map[string]uint64)

func (*L3CACHE) SetOffsets

func (l3 *L3CACHE) SetOffsets(key string, char string, offsets []int64, flagexpires bool, src string)

The SetOffsets method sets a cache item in the L3 cache using a key, char and a slice of offsets as the value. It also dynamically grows the cache when necessary.

type L3CACHEMAP

type L3CACHEMAP struct {
	// contains filtered or unexported fields
}

type L3ECH

type L3ECH struct {
	// contains filtered or unexported fields
}

L3ExtendChan

type L3ITEM

type L3ITEM struct {
	// contains filtered or unexported fields
}

type L3MUXER

type L3MUXER struct {
	// contains filtered or unexported fields
}

type L3PQ

type L3PQ []L3PQItem

type L3PQItem

type L3PQItem struct {
	Key     string
	Expires int64
}

type L3pqQ

type L3pqQ struct {
	// contains filtered or unexported fields
}

func (*L3pqQ) Pop

func (pq *L3pqQ) Pop() (*L3PQItem, int)

func (*L3pqQ) Push

func (pq *L3pqQ) Push(item L3PQItem)

type Memhash

type Memhash struct {
	// contains filtered or unexported fields
}

type Offsets

type Offsets struct {
	// contains filtered or unexported fields
}

type RemoteConn

type RemoteConn struct {
	// contains filtered or unexported fields
}

holds connection to historyServer
