Documentation ¶
Index ¶
- Constants
- Variables
- func CPUBURN()
- func CRC(input string) string
- func ConvertHistoryObjectToString(obj *HistoryObject) string
- func GetMedian(char string, bucket string, slice *[]int64, new int64, lim int, minian int64, ...) (med int64)
- func IsPow2(n int) bool
- func LOCKfunc(achan chan struct{}, src string) bool
- func LeftPad(input *string, length int)
- func NullPad(input *string, length int)
- func PrintMemoryStats()
- func PrintMemoryStatsEvery(interval time.Duration)
- func RemoveNullPad(input string) string
- func UNLOCKfunc(achan chan struct{}, src string)
- func UnixTimeMicroSec() int64
- func UnixTimeMilliSec() int64
- func UnixTimeNanoSec() int64
- func UnixTimeSec() int64
- type AHASH
- type AccessControlList
- type BATCHLOCKS
- type BLCH
- type BOLTDB_PTR
- type BQ
- type BatchLOG
- type BatchLOGGER
- type BatchOffset
- type BoltDBs
- type CCC
- type ClearCache
- type ClearCacheChan
- type HISTORY
- func (his *HISTORY) AddHistory(hobj *HistoryObject, useL1Cache bool) int
- func (his *HISTORY) BoltSync(db *bolt.DB, char string, reopen bool) error
- func (his *HISTORY) BoltSyncAll() error
- func (his *HISTORY) BootHistory(history_dir string, hashdb_dir string, useHashDB bool, boltOpts *bolt.Options, ...)
- func (his *HISTORY) BootHistoryClient(historyServer string)
- func (his *HISTORY) CLOSE_HISTORY()
- func (his *HISTORY) CrunchBatchLogs(more bool)
- func (his *HISTORY) DoCacheEvict(char string, hash string, offset int64, key string)
- func (his *HISTORY) DupeCheck(db *bolt.DB, char string, bucket string, key string, hash string, offset int64, ...) (int, error)
- func (his *HISTORY) FseekHistoryHeader(output *[]byte) (int, error)
- func (his *HISTORY) FseekHistoryLine(offset int64) (string, error)
- func (his *HISTORY) FseekHistoryMessageHash(file *os.File, offset int64, char string, bucket string, rethash *string) error
- func (his *HISTORY) GetBoltBucketStats(char string, print bool)
- func (his *HISTORY) GetBoltHashOpen() int
- func (his *HISTORY) GetBoltStat(char string, print bool) (OpenTxN int, TxN int)
- func (his *HISTORY) GetBoltStats(char string, print bool) (OpenTxN int, TxN int)
- func (his *HISTORY) GetCounter(k string) uint64
- func (his *HISTORY) IndexQuery(hash string, indexRetChan chan int, offset int64) (int, error)
- func (his *HISTORY) LockAllBatchLocks(char string)
- func (his *HISTORY) NewRConn(historyServer string) *RemoteConn
- func (his *HISTORY) PrintBoltPerformance()
- func (his *HISTORY) PrintCacheStats()
- func (his *HISTORY) RebuildHashDB() error
- func (his *HISTORY) ReplayHisDat()
- func (his *HISTORY) SET_DEBUG(debug int)
- func (his *HISTORY) Sync_upcounter(k string)
- func (his *HISTORY) Sync_upcounterN(k string, v uint64)
- func (his *HISTORY) Wait4HashDB()
- func (his *HISTORY) WatchBolt()
- type HistoryIndex
- type HistoryObject
- type HistorySettings
- type L1CACHE
- type L1CACHEMAP
- type L1ECH
- type L1ITEM
- type L1MUXER
- type L1PQ
- type L1PQItem
- type L1pqQ
- type L2CACHE
- func (l2 *L2CACHE) BootL2Cache(his *HISTORY)
- func (l2 *L2CACHE) GetHashFromOffset(offset int64, rethash *string)
- func (l2 *L2CACHE) L2Stats(statskey string) (retval uint64, retmap map[string]uint64)
- func (l2 *L2CACHE) OffsetToChar(offset int64) (retval string)
- func (l2 *L2CACHE) SetOffsetHash(offset int64, hash string, flagexpires bool)
- type L2CACHEMAP
- type L2ECH
- type L2ITEM
- type L2MUXER
- type L2PQ
- type L2PQItem
- type L2pqQ
- type L3CACHE
- func (l3 *L3CACHE) BootL3Cache(his *HISTORY)
- func (l3 *L3CACHE) GetOffsets(key string, char string, offsets *[]int64) int
- func (l3 *L3CACHE) L3Stats(statskey string) (retval uint64, retmap map[string]uint64)
- func (l3 *L3CACHE) SetOffsets(key string, char string, offsets []int64, flagexpires bool, src string)
- type L3CACHEMAP
- type L3ECH
- type L3ITEM
- type L3MUXER
- type L3PQ
- type L3PQItem
- type L3pqQ
- type Memhash
- type Offsets
- type RemoteConn
Constants ¶
const ( HashShort = 0x0B // 11 DefaultBoltINITParallel = intBoltDBs DefaultBoltSYNCParallel = intBoltDBs DefaultReplayDistance int64 = 1024 * 1024 WCBBS_UL = 0xFFFF // adaptive BatchSize => workerCharBucketBatchSize UpperLimit WCBBS_LL = 0xF // adaptive BatchSize => workerCharBucketBatchSize LowerLimit // KeyLen is used with HashShort // 1st char of hash selects boltDB // 2nd + 3rd char (+4th char: if 4K his.rootBUCKETS) of hash selects bucket in boltDB // remaining chars [3:$] are used as Key in BoltDB to store offset(s) // the key is further divided into 1st+2nd+3rd+... char as sub buckets and remainder used as key in the root.bucket.sub.bucket[3:$] // offsets lead into history.dat and point to start of a line containing the full hash MinKeyLen = 8 // goes with HashShort )
const ( // never change this FlagExpires bool = true FlagNeverExpires bool = false )
const ( CR = "\r" LF = "\n" CRLF = CR + LF DefaultSocketPath = "./history.socket" // default launches a tcp port with a telnet interface @ localhost:49119 DefaultServerTCPAddr = "[::]:49119" )
const ( ALWAYS = true // DefExpiresStr use 10 digits as spare so we can update it later without breaking offsets DefExpiresStr string = "----------" // never expires CaseLock = 0xFF // internal cache state. reply with CaseRetry while CaseLock CasePass = 0xF1 // is a reply to L1Lock and IndexQuery CaseDupes = 0x1C // is a reply and cache state CaseRetry = 0x2C // is a reply to if CaseLock or CaseWrite or if history.dat returns EOF CaseAdded = 0x3C // is a reply to WriterChan:responseChan CaseWrite = 0x4C // internal cache state. is not a reply. reply with CaseRetry while CaseWrite is happening CaseError = 0xE1 // some things drop this error ZEROPADLEN = 0xFFF // zeropads the header )
const (
FlagSearch = -1
)
const (
MinRetryWaiter = 100
)
Variables ¶
var ( DBG_BS_LOG bool // debugs BatchLOG for every batch insert! beware of the memory eating dragon! DBG_ABS1 bool // debugs adaptive batchsize in boltBucketPutBatch DBG_ABS2 bool // debugs adaptive batchsize forbatchqueue in boltDB_Worker AdaptBatch bool // automagically adjusts CharBucketBatchSize=>wCBBS=workerCharBucketBatchSize to match BatchFlushEvery BatchFlushEvery int64 = 5120 // flushes boltDB in batch every N milliseconds (1-...) BoltDB_MaxBatchDelay = 10 * time.Millisecond // default value from boltdb:db.go = 10 * time.Millisecond BoltDB_MaxBatchSize int = 1000 // default value from boltdb:db.go = 1000 CharBucketBatchSize int = 1024 // default batchsize per char:bucket batchqueues EmptyStr string // used as pointer )
var ( BoltDBreopenEveryN = 0x0 // reopens boltDB every N added (not batchins) // very experimental and not working! WatchBoltTimer int64 = 10 // prints bolts stats every N seconds. only with DEBUG NoReplayHisDat bool = false // can be set before booting to not replay history.dat // stop replay HisDat if we got this many OKs with a distance to missing // ReplayTestMax depends on bbolt.db.MaxBatchSize. ReplayTestMax should be at least 2x bbolt.db.MaxBatchSize! // if the process crashes: do NOT change the MaxBatchSize before starting! // ReplayHisDat() needs the same MaxBatchSize! ReplayDistance int64 = DefaultReplayDistance // defaults to replay at least 128K messages, more if missed ones (not in hashdb) appear. QIndexChan int = 16 // Main-indexchan can queue this QindexChans int = 16 // every sub-indexchans for a `char` can queue this BoltDB_AllocSize int // if not set defaults: 16 * 1024 * 1024 (min: 1024*1024) BoltSyncEveryS int64 = 60 // call db.sync() every seconds (only used with 'boltopts.NoSync: true') BoltSyncEveryN uint64 = 500000 // call db.sync() after N inserts (only used with 'boltopts.NoSync = true') BoltINITParallel int = DefaultBoltINITParallel // set this via 'history.BoltINITParallel = 1' before calling BootHistory. BoltSYNCParallel int = DefaultBoltSYNCParallel // set this via 'history.BoltSYNCParallel = 1' before calling BootHistory. BoltHashOpen = make(chan struct{}, intBoltDBs) // dont change this HISTORY_INDEX_LOCK = make(chan struct{}, 1) // main lock HISTORY_INDEX_LOCK16 = make(chan struct{}, intBoltDBs) // sub locks // adjust root buckets page splitting behavior // we mostly do random inserts: lower value should be better? RootBucketFillPercent = 1.0 // adjust sub buckets page splitting behavior // unsure if it does anything in sub buckets? SubBucketFillPercent = 0.25 // can be 16 | (default: 256) | 4096 !4K is insane! 
// creates this many batchQueues and more goroutines // 16 generates a lot of pagesplits in bbolt // 256 is the sweet spot // 4096 generates high load as it launches this many and more go routines and queues! RootBUCKETSperDB = 16 // KeyIndex creates 16^N sub buckets in `his.rootBUCKETS // KeyIndex cuts (shortens) the KeyLen by this to use as subb.buckets // 0 disables sub/nested buckets and uses full Keylen as Key in RootBuckets only. KeyIndex = 0 )
var ( UseArenas bool DBG_CGS bool // DEBUG_CACHE_GROW_SHRINK DefaultCacheExpires int64 = 5 // gets x2 BatchFlushEvery x2 DefaultCacheExtend int64 = 5 // extends cached items after writes DefaultCachePurge int64 = 1 // checks ttl every N seconds. affects CacheExpires/Extend max to + Purge DefaultEvictsCapacity = intBoltDBs // his.cEvCap (size of Extend chan) is normally fine as is. ClearEveryN = DefaultEvictsCapacity )
var ( BootHisCli bool DefaultHistoryServer = "[::1]:49119" // localhost:49119 // set only once before boot TCPchanQ = 128 DefaultDialTimeout = 5 // seconds DefaultRetryWaiter = 500 // milliseconds DefaultDialRetries = -1 // try N times and fail or <= 0 enables infinite retry )
var ( DEBUGL1 bool = false L1 bool = true // better not disable L1 cache... L1CacheExpires int64 = DefaultCacheExpires L1ExtendExpires int64 = DefaultCacheExtend L1Purge int64 = DefaultCachePurge L1InitSize int = 64 * 1024 // L1LockDelay: delays L1 locking by N milliseconds // L1 locking is most likely done per client-connection // setting this greater than 0 limits the number of articles a client can lock&send // 1ms is a max of 1000 messages/sec per conn // 100ms is a max of 10 messages/sec per conn // 250ms is a max of 4 messages/sec per conn // 1000ms is a max of 1 message/sec per conn // text peers mostly dont need more than 4 msg per sec L1LockDelay int = 0 )
var ( DEBUGL2 bool = false L2 bool = true L2CacheExpires int64 = DefaultCacheExpires L2ExtendExpires int64 = DefaultCacheExtend L2Purge int64 = DefaultCachePurge L2InitSize int = 64 * 1024 )
L2Cache: offset => hash; fewer requests to hisDat.
var ( DEBUGL3 bool = false L3 bool = true // do not disable! L3CacheExpires int64 = DefaultCacheExpires L3ExtendExpires int64 = DefaultCacheExtend L3Purge int64 = DefaultCachePurge L3InitSize int = 64 * 1024 )
L3Cache: key => offsets; fewer requests to boltDB. Disabling L3 is not a good idea!! Queues hold offsets which the DB does not know about! The cache keeps track of duplicate writes: duplicate keys will get an empty_offsets, and the latest write will overwrite a past write that may still be in the queue.
var ( IndexParallel int = intBoltDBs NumQueueWriteChan int = intBoltDBs HisDatWriteBuffer int = 4 * 1024 )
var ( BootVerbose = true TESTHASH0 = "0f05e27ca579892a63a256dacd657f5615fab04bf81e85f53ee52103e3a4fae8" TESTHASH1 = "f0d784ae13ce7cf1f3ab076027a6265861eb003ad80069cdfb1549dd1b8032e8" TESTHASH2 = "f0d784ae1747092974d02bd3359f044a91ed4fd0a39dc9a1feffe646e6c7ce09" TESTHASH = TESTHASH2 TESTCACKEY = "f0d784ae1" TESTKEY = "784ae1" TESTBUK = "0d" TESTDB = "f" TESTOFFSET = 123456 ROOTBUCKETS []string SUBBUCKETS []string BUFIOBUFFER = 4 * 1024 // a history line with sha256 is 102 bytes long including LF or 38 bytes of payload + hashLen History HISTORY DEBUG bool = true DEBUG0 bool = false DEBUG1 bool = false DEBUG2 bool = false DEBUG9 bool = false LOCKHISTORY = make(chan struct{}, 1) HEXCHARS = []string{"0", "1", "2", "3", "4", "5", "6", "7", "8", "9", "a", "b", "c", "d", "e", "f"} )
var (
CPUProfile bool // set before boot
)
var (
ForcedReplay bool
)
var ( // set HEX true: converts offset into hex strings to store in bbolt // dont change later once db is initialized! HEX bool = true )
Functions ¶
func ConvertHistoryObjectToString ¶
func ConvertHistoryObjectToString(obj *HistoryObject) string
func PrintMemoryStats ¶
func PrintMemoryStats()
func PrintMemoryStatsEvery ¶
func RemoveNullPad ¶
func UNLOCKfunc ¶
func UNLOCKfunc(achan chan struct{}, src string)
func UnixTimeMicroSec ¶
func UnixTimeMicroSec() int64
func UnixTimeMilliSec ¶
func UnixTimeMilliSec() int64
func UnixTimeNanoSec ¶
func UnixTimeNanoSec() int64
func UnixTimeSec ¶
func UnixTimeSec() int64
Types ¶
type AccessControlList ¶
type AccessControlList struct {
// contains filtered or unexported fields
}
var ( ACL AccessControlList DefaultACL map[string]bool // can be set before booting )
func (*AccessControlList) IsAllowed ¶
func (a *AccessControlList) IsAllowed(ip string) bool
func (*AccessControlList) SetACL ¶
func (a *AccessControlList) SetACL(ip string, val bool)
func (*AccessControlList) SetupACL ¶
func (a *AccessControlList) SetupACL()
type BATCHLOCKS ¶
type BATCHLOCKS struct {
// contains filtered or unexported fields
}
type BOLTDB_PTR ¶
type BQ ¶
type BQ struct { Maps map[string]map[string]chan *BatchOffset BootCh chan struct{} // contains filtered or unexported fields }
BatchQueue
type BatchLOGGER ¶
type BatchLOGGER struct {
// contains filtered or unexported fields
}
type BatchOffset ¶
type BatchOffset struct {
// contains filtered or unexported fields
}
used to batch write items to boltDB
type ClearCache ¶
type ClearCache struct {
// contains filtered or unexported fields
}
type ClearCacheChan ¶
type ClearCacheChan struct {
// contains filtered or unexported fields
}
type HISTORY ¶
type HISTORY struct { L1Cache L1CACHE L2Cache L2CACHE L3Cache L3CACHE Offset int64 // the actual offset for history.dat WriterChan chan *HistoryObject // history.dat writer channel IndexChan chan *HistoryIndex // main index query channel BatchLogs BatchLOGGER BatchLocks map[string]*BATCHLOCKS // used to lock char:bucket in BoltSync and boltBucketPutBatch BoltDBsMap *BoltDBs // using a ptr to a struct in the map allows updating the struct values without updating the map Counter map[string]uint64 WBR bool // WatchBoltRunning CPUfile *os.File // ptr to file for cpu profiling MEMfile *os.File // ptr to file for mem profiling // TCPchan: used to send hobj via handleRConn to a remote historyServer TCPchan chan *HistoryObject // contains filtered or unexported fields }
func (*HISTORY) AddHistory ¶
func (his *HISTORY) AddHistory(hobj *HistoryObject, useL1Cache bool) int
func (*HISTORY) BoltSync ¶
Public: BoltSync - for every DB, call this function with db=nil and char=[0-9a-f].
func (*HISTORY) BootHistory ¶
func (his *HISTORY) BootHistory(history_dir string, hashdb_dir string, useHashDB bool, boltOpts *bolt.Options, keyalgo int, keylen int)
BootHistory initializes the history component, configuring its settings and preparing it for operation. It sets up the necessary directories for history and hash databases, and opens the history data file. The function also manages the communication channels for reading and writing historical data. If the `useHashDB` parameter is set to true, it initializes the history database (HashDB) and starts worker routines. Parameters:
- history_dir: The directory where history data will be stored.
- hashdb_dir: The directory where the history database (HashDB) will be stored.
- useHashDB: If true, enables the use of the history database (HashDB).
- boltOpts: Bolt database options for configuring the HashDB.
- keyalgo: The hash algorithm used for indexing historical data.
- keylen: The length of the hash values used for indexing.
func (*HISTORY) BootHistoryClient ¶
func (*HISTORY) CLOSE_HISTORY ¶
func (his *HISTORY) CLOSE_HISTORY()
func (*HISTORY) CrunchBatchLogs ¶
func (*HISTORY) DoCacheEvict ¶
gets called in BBATCH.go:boltBucketPutBatch() after boltTX
func (*HISTORY) DupeCheck ¶
func (his *HISTORY) DupeCheck(db *bolt.DB, char string, bucket string, key string, hash string, offset int64, setempty bool, file *os.File, batchQueue chan *BatchOffset) (int, error)
DupeCheck checks for duplicate message-ID hashes in a BoltDB bucket. It manages offsets associated with message hashes and handles duplicates, ensuring the integrity of the historical data. If a hash is a duplicate, it returns 1; otherwise, it returns 0. It also handles the creation of new hash entries in the bucket when needed.
func (*HISTORY) FseekHistoryHeader ¶
func (*HISTORY) FseekHistoryLine ¶
func (*HISTORY) FseekHistoryMessageHash ¶
func (his *HISTORY) FseekHistoryMessageHash(file *os.File, offset int64, char string, bucket string, rethash *string) error
FseekHistoryMessageHash seeks to a specified offset in the history file and extracts a message-ID hash. It reads characters from the file until a tab character ('\t') is encountered, extracting the hash enclosed in curly braces. If a valid hash is found, it returns the hash as a string without curly braces. If the end of the file (EOF) is reached, it returns a special EOF marker.
func (*HISTORY) GetBoltBucketStats ¶
func (*HISTORY) GetBoltHashOpen ¶
func (*HISTORY) GetBoltStat ¶
func (*HISTORY) GetBoltStats ¶
func (*HISTORY) GetCounter ¶
func (*HISTORY) IndexQuery ¶
func (*HISTORY) LockAllBatchLocks ¶
func (*HISTORY) NewRConn ¶
func (his *HISTORY) NewRConn(historyServer string) *RemoteConn
func (*HISTORY) PrintBoltPerformance ¶
func (his *HISTORY) PrintBoltPerformance()
func (*HISTORY) PrintCacheStats ¶
func (his *HISTORY) PrintCacheStats()
func (*HISTORY) RebuildHashDB ¶
func (*HISTORY) ReplayHisDat ¶
func (his *HISTORY) ReplayHisDat()
func (*HISTORY) Sync_upcounter ¶
func (*HISTORY) Sync_upcounterN ¶
func (*HISTORY) Wait4HashDB ¶
func (his *HISTORY) Wait4HashDB()
type HistoryIndex ¶
used to query the index
type HistoryObject ¶
type HistoryObject struct { MessageIDHash string StorageToken string // "F" = flatstorage | "M" = mongodb | "X" = deleted Char string Arrival int64 Expires int64 Date int64 ResponseChan chan int // receives a 0,1,2 :: pass|duplicate|retrylater }
func ConvertStringToHistoryObject ¶
func ConvertStringToHistoryObject(parts []string) (*HistoryObject, error)
type HistorySettings ¶
type HistorySettings struct { // constant values once DBs are initialized Ka int // keyalgo Kl int // keylen Ki int // keyindex Bp int // bucketsperdb }
builds the history.dat header
type L1CACHE ¶
type L1CACHE struct { Caches map[string]*L1CACHEMAP Extend map[string]*L1ECH Muxers map[string]*L1MUXER Counter map[string]*CCC // contains filtered or unexported fields }
func (*L1CACHE) BootL1Cache ¶
The BootL1Cache method initializes the cache system. It creates cache maps, initializes them with initial sizes, and starts goroutines to periodically purge expired entries.
func (*L1CACHE) LockL1Cache ¶
The LockL1Cache method is used to LOCK a `MessageIDHash` for processing. If the value is not in the cache or has expired, it locks the cache, updates the cache with a new value, and returns the value. Possible return values:
CaseLock == already in processing; CaseWrite == already in processing; CaseDupes == is a duplicate; CasePass == not a duplicate == locked article for processing.
type L1CACHEMAP ¶
type L1CACHEMAP struct {
// contains filtered or unexported fields
}
type L2CACHE ¶
type L2CACHE struct { Caches map[string]*L2CACHEMAP Extend map[string]*L2ECH Muxers map[string]*L2MUXER Counter map[string]*CCC // contains filtered or unexported fields }
func (*L2CACHE) BootL2Cache ¶
The BootL2Cache method initializes the L2 cache. It creates cache maps, initializes them with initial sizes, and starts goroutines to periodically clean up expired entries.
func (*L2CACHE) GetHashFromOffset ¶
The GetHashFromOffset method retrieves a hash from the L2 cache using an offset as the key.
func (*L2CACHE) OffsetToChar ¶
type L2CACHEMAP ¶
type L2CACHEMAP struct {
// contains filtered or unexported fields
}
type L3CACHE ¶
type L3CACHE struct { Caches map[string]*L3CACHEMAP Extend map[string]*L3ECH Muxers map[string]*L3MUXER Counter map[string]*CCC // contains filtered or unexported fields }
func (*L3CACHE) BootL3Cache ¶
The BootL3Cache method initializes the L3 cache. It creates cache maps, initializes them with initial sizes, and starts goroutines to periodically clean up expired entries.
func (*L3CACHE) GetOffsets ¶
The GetOffsets method retrieves a slice of offsets from the L3 cache using a key and a char.
type L3CACHEMAP ¶
type L3CACHEMAP struct {
// contains filtered or unexported fields
}
type RemoteConn ¶
type RemoteConn struct {
// contains filtered or unexported fields
}
holds connection to historyServer