Documentation ¶
Index ¶
- type Config
- type Data
- type Node
- func (node *Node) AddToQueryQueue(req map[string]interface{}) int
- func (node *Node) AutomaticBackup()
- func (node *Node) ConnectToObservers()
- func (node *Node) CurrentMemoryUsage() uint64
- func (node *Node) CvFsInfo(entry []fs.DirEntry) ([]fs.FileInfo, error)
- func (node *Node) Delete(collection string, ks interface{}, vs interface{}, vol int, skip int, ...) []interface{}
- func (node *Node) DeleteKeyFromColl(collection string, key string) int
- func (node *Node) GetLogs(r io.Reader) int
- func (node *Node) HandleClientConnection(conn net.Conn)
- func (node *Node) Insert(collection string, jsonMap map[string]interface{}, conn net.Conn) error
- func (node *Node) LostReconnectObservers()
- func (node *Node) PLof(data, level string)
- func (node *Node) RemoveFromQueryQueue(id int)
- func (node *Node) RenewNodeConfig() error
- func (node *Node) Search(mu *sync.RWMutex, i int, tbd *[]int, collection string, ks interface{}, ...)
- func (node *Node) Select(collection string, ks interface{}, vs interface{}, vol int, skip int, ...) []interface{}
- func (node *Node) SendToObservers(jsonStr string)
- func (node *Node) SetupInitializeCDat() error
- func (node *Node) SetupNodeConfig() error
- func (node *Node) SignalListener()
- func (node *Node) StartRunQueryQueue()
- func (node *Node) StartTcp()
- func (node *Node) SyncOut()
- func (node *Node) SyncOutQueryQueue()
- func (node *Node) Update(collection string, ks interface{}, vs interface{}, vol int, skip int, ...) []interface{}
- func (node *Node) WriteToFile(backup bool)
- type Observer
- type ObserverConnection
- type Replica
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
This section is empty.
Types ¶
type Config ¶
type Config struct {
    Replicas []Replica `yaml:"replicas"` // Replicas of this node
    TLSCert string `yaml:"tls-cert"` // TLS cert path
    TLSKey string `yaml:"tls-key"` // TLS cert key
    Host string `yaml:"host"` // Node host, usually 0.0.0.0
    TLS bool `default:"false" yaml:"tls"` // Use TLS?
    Port int `yaml:"port"` // Node port
    Key string `yaml:"key"` // Key a cluster uses to communicate with the node; also used for data at rest
    MaxMemory uint64 `yaml:"max-memory"` // Default 10240MB = 10 GB (1024 * 10)
    LogMaxLines int `yaml:"log-max-lines"` // At what point to clear logs. Each log line starts with a [UTC TIME] LOG DATA
    Logging bool `default:"false" yaml:"logging"` // Log to file?
    ReplicationSyncTime int `yaml:"replication-sync-time"` // In minutes. Default is every 10 minutes
    ReplicationSyncTimeout int `yaml:"replication-sync-timeout"` // As your node grows in size you may want to increase this. Default is 10 minutes
    TLSReplication bool `default:"false" yaml:"tls-replication"` // Set to true if your cluster node replicas are running TLS
    // If a .cdat file becomes corrupt, you can have the system save a copy of your
    // .cdat file at a set interval (default is every 8 hours, i.e. 480 minutes).
    // Backups of your node's data go under BackupsDirectory, which the system
    // creates inside your binary executable location. Files are named
    // .cdat_YYMMDDHHMMSS in your set timezone.
    AutomaticBackups bool `default:"false" yaml:"automatic-backups"`
    AutomaticBackupTime int `yaml:"automatic-backup-time"` // Automatic node backup time. Default is 8 (hours)
    AutomaticBackupCleanup bool `default:"false" yaml:"automatic-backup-cleanup"` // If true, the node cleans up backups older than AutomaticBackupCleanupHours hours
    AutomaticBackupCleanupHours int `yaml:"automatic-backup-cleanup-hours"` // Clean up .cdat backups more than n hours old. Only used if AutomaticBackups is true. Default is 12 hours
    Timezone string `default:"Local" yaml:"timezone"` // e.g. America/Chicago. Default is local system time
    // Observer servers listening for realtime node events (insert, update, delete).
    // If configured, the node relays successful inserts, updates, and deletes to all Observers.
    Observers []Observer `yaml:"observers"`
    TLSObservers bool `yaml:"tls-observers"` // Set whether your Observers are listening on TLS or not
    // Backups directory. By default it is /backups/ in the execution directory.
    // Whatever is provided, the system will create the directories if they don't exist.
    BackupsDirectory string `yaml:"backups-directory"`
}
Config is the cluster config struct
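The comments on Config state several defaults. The sketch below shows one way those defaults could be applied to zero-valued fields. The nodeDefaults type and the withDefaults function are hypothetical names used only for illustration; they are not part of the package, and the package may apply its defaults differently.

```go
package main

import "fmt"

// nodeDefaults is a hypothetical, trimmed-down stand-in for Config. It keeps
// only the fields whose defaults are stated in the Config comments.
type nodeDefaults struct {
	MaxMemory                   uint64 // MB
	ReplicationSyncTime         int    // minutes
	ReplicationSyncTimeout      int    // minutes
	AutomaticBackupTime         int    // hours
	AutomaticBackupCleanupHours int    // hours
	Timezone                    string
}

// withDefaults returns a copy of c with every zero-valued field replaced by
// the default documented in the Config comments.
func withDefaults(c nodeDefaults) nodeDefaults {
	if c.MaxMemory == 0 {
		c.MaxMemory = 10240 // 10 GB
	}
	if c.ReplicationSyncTime == 0 {
		c.ReplicationSyncTime = 10
	}
	if c.ReplicationSyncTimeout == 0 {
		c.ReplicationSyncTimeout = 10
	}
	if c.AutomaticBackupTime == 0 {
		c.AutomaticBackupTime = 8
	}
	if c.AutomaticBackupCleanupHours == 0 {
		c.AutomaticBackupCleanupHours = 12
	}
	if c.Timezone == "" {
		c.Timezone = "Local"
	}
	return c
}

func main() {
	// Only AutomaticBackupTime is set explicitly; the rest fall back to defaults.
	c := withDefaults(nodeDefaults{AutomaticBackupTime: 4})
	fmt.Printf("%+v\n", c)
}
```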
type Data ¶
type Data struct {
    Map map[string][]map[string]interface{} // Data hash map
    Writers map[string]*sync.RWMutex // Collection writers
}
Data is the node data struct
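To illustrate how the two fields of Data relate, here is a minimal sketch that appends a document to a collection while holding that collection's mutex. The Data type is copied from the definition above; newData, appendDoc, and countDocs are hypothetical helpers, not package functions, and the package's own locking strategy may differ.

```go
package main

import (
	"fmt"
	"sync"
)

// Data mirrors the struct documented above.
type Data struct {
	Map     map[string][]map[string]interface{} // collection name -> documents
	Writers map[string]*sync.RWMutex            // collection name -> lock
}

// newData returns an empty Data value with both maps initialized.
func newData() *Data {
	return &Data{
		Map:     make(map[string][]map[string]interface{}),
		Writers: make(map[string]*sync.RWMutex),
	}
}

// appendDoc adds doc to collection under the collection's write lock.
// Assumption: the Writers map itself is not guarded here, so creating new
// collections from several goroutines at once would need extra locking.
func appendDoc(d *Data, collection string, doc map[string]interface{}) {
	mu, ok := d.Writers[collection]
	if !ok {
		mu = &sync.RWMutex{}
		d.Writers[collection] = mu
	}
	mu.Lock()
	defer mu.Unlock()
	d.Map[collection] = append(d.Map[collection], doc)
}

// countDocs returns the number of documents in collection under a read lock.
func countDocs(d *Data, collection string) int {
	mu, ok := d.Writers[collection]
	if !ok {
		return 0
	}
	mu.RLock()
	defer mu.RUnlock()
	return len(d.Map[collection])
}

func main() {
	d := newData()
	appendDoc(d, "users", map[string]interface{}{"name": "alice"})
	appendDoc(d, "users", map[string]interface{}{"name": "bob"})
	fmt.Println(countDocs(d, "users"))
}
```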
type Node ¶
type Node struct {
    TCPAddr *net.TCPAddr // TCPAddr represents the address of the node's TCP endpoint
    TCPListener *net.TCPListener // TCPListener is the node TCP network listener
    Wg *sync.WaitGroup // Node WaitGroup waits for all goroutines to finish up
    SignalChannel chan os.Signal // Catches operating system signals
    Config Config // Node config
    TLSConfig *tls.Config // Node TLS config if TLS is true
    ContextCancel context.CancelFunc // For gracefully shutting down
    ConfigMu *sync.RWMutex // Node config mutex
    Data *Data // Node data
    Context context.Context // Context for the main looping goroutines, such as listeners and event loops
    LogMu *sync.Mutex // Log file mutex
    LogFile *os.File // Opened log file
    ObserverConnections []*ObserverConnection // ObserverConnections
    QueryQueue []map[string]interface{} // QueryQueue is a queue of queries coming in from cluster(s). It is synced every 100ms to an encrypted file
    QueryQueueMu *sync.Mutex // QueryQueue mutex
}
func (*Node) AddToQueryQueue ¶
func (node *Node) AddToQueryQueue(req map[string]interface{}) int
AddToQueryQueue adds to query queue and returns unique query id
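A mutex-guarded queue that hands out ids can be sketched as follows. This is only an illustration of the idea behind AddToQueryQueue and RemoveFromQueryQueue: the queryQueue type, the counter-based id, and the "id" map key are all assumptions, and the package may generate and store ids differently.

```go
package main

import (
	"fmt"
	"sync"
)

// queryQueue is a hypothetical stand-in for the node's QueryQueue and
// QueryQueueMu fields.
type queryQueue struct {
	mu     sync.Mutex
	nextID int
	items  []map[string]interface{}
}

// add stores a copy of req on the queue and returns its id.
// Assumption: ids come from a simple counter.
func (q *queryQueue) add(req map[string]interface{}) int {
	q.mu.Lock()
	defer q.mu.Unlock()
	q.nextID++
	// Copy the request so the caller's map is not mutated.
	entry := make(map[string]interface{}, len(req)+1)
	for k, v := range req {
		entry[k] = v
	}
	entry["id"] = q.nextID
	q.items = append(q.items, entry)
	return q.nextID
}

// remove drops the entry with the given id, if present.
func (q *queryQueue) remove(id int) {
	q.mu.Lock()
	defer q.mu.Unlock()
	for i, e := range q.items {
		if e["id"] == id {
			q.items = append(q.items[:i], q.items[i+1:]...)
			return
		}
	}
}

func main() {
	q := &queryQueue{}
	a := q.add(map[string]interface{}{"action": "insert"})
	b := q.add(map[string]interface{}{"action": "update"})
	q.remove(a)
	fmt.Println(a != b, len(q.items))
}
```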
func (*Node) AutomaticBackup ¶
func (node *Node) AutomaticBackup()
AutomaticBackup backs up node data every AutomaticBackupTime hours to the node backups directory under .cdat.unixtime. It also removes old backups based on the hours set in AutomaticBackupCleanupHours.
func (*Node) ConnectToObservers ¶
func (node *Node) ConnectToObservers()
ConnectToObservers connects to Observer listeners. The node sends an initial Key: SHAREDKEY\r\n line, which the Observer reads and accepts, at which point the Node and Observer can communicate.
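The initial exchange can be sketched with net/textproto over an in-memory pipe. The example only shows the line framing: sendKey, readKey, and handshake are hypothetical helpers, and how the shared key is encoded or validated on the wire is not covered by this page, so the plain-text key here is an assumption.

```go
package main

import (
	"fmt"
	"net"
	"net/textproto"
	"strings"
)

// sendKey writes the initial "Key: <sharedKey>\r\n" line, as the node would.
func sendKey(conn net.Conn, sharedKey string) error {
	return textproto.NewConn(conn).PrintfLine("Key: %s", sharedKey)
}

// readKey reads the first line, as the observer would, and returns the value
// that follows the "Key: " prefix.
func readKey(conn net.Conn) (string, error) {
	line, err := textproto.NewConn(conn).ReadLine()
	if err != nil {
		return "", err
	}
	if !strings.HasPrefix(line, "Key: ") {
		return "", fmt.Errorf("unexpected handshake line %q", line)
	}
	return strings.TrimPrefix(line, "Key: "), nil
}

// handshake runs both sides over net.Pipe and returns the key the observer saw.
func handshake(sharedKey string) (string, error) {
	nodeSide, observerSide := net.Pipe()
	defer nodeSide.Close()
	defer observerSide.Close()

	// net.Pipe is synchronous, so the write must run concurrently with the read.
	errc := make(chan error, 1)
	go func() { errc <- sendKey(nodeSide, sharedKey) }()

	key, err := readKey(observerSide)
	if err != nil {
		return "", err
	}
	if err := <-errc; err != nil {
		return "", err
	}
	return key, nil
}

func main() {
	key, err := handshake("SHAREDKEY")
	if err != nil {
		fmt.Println("handshake failed:", err)
		return
	}
	fmt.Println("observer received key:", key)
}
```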
func (*Node) CurrentMemoryUsage ¶
func (node *Node) CurrentMemoryUsage() uint64
func (*Node) Delete ¶
func (node *Node) Delete(collection string, ks interface{}, vs interface{}, vol int, skip int, oprs interface{}, lock bool, conditions []interface{}, sortPos string, sortKey string) []interface{}
Delete is the node data delete method
func (*Node) DeleteKeyFromColl ¶
func (node *Node) DeleteKeyFromColl(collection string, key string) int
DeleteKeyFromColl deletes a key from all documents in a collection
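Given the document shape used by Data.Map, removing a key from every document in a collection could look like the sketch below. deleteKey is a hypothetical helper, and returning the number of changed documents is an assumption; this page does not say what the int returned by DeleteKeyFromColl represents.

```go
package main

import "fmt"

// deleteKey removes key from every document in docs and returns how many
// documents were changed. Locking is omitted for brevity.
func deleteKey(docs []map[string]interface{}, key string) int {
	changed := 0
	for _, doc := range docs {
		if _, ok := doc[key]; ok {
			delete(doc, key)
			changed++
		}
	}
	return changed
}

func main() {
	users := []map[string]interface{}{
		{"name": "alice", "age": 30},
		{"name": "bob"},
		{"name": "carol", "age": 41},
	}
	fmt.Println(deleteKey(users, "age")) // 2
	fmt.Println(users)
}
```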
func (*Node) HandleClientConnection ¶
func (node *Node) HandleClientConnection(conn net.Conn)
func (*Node) LostReconnectObservers ¶
func (node *Node) LostReconnectObservers()
LostReconnectObservers attempts to reconnect to lost observers.
func (*Node) RemoveFromQueryQueue ¶
func (node *Node) RemoveFromQueryQueue(id int)
func (*Node) RenewNodeConfig ¶
func (node *Node) RenewNodeConfig() error
func (*Node) Search ¶
func (node *Node) Search(mu *sync.RWMutex, i int, tbd *[]int, collection string, ks interface{}, vs interface{}, vol int, skip int, oprs interface{}, conditions []interface{}, del bool, update bool, objs *[]interface{})
Search checks if provided index within data collection meets conditions
func (*Node) Select ¶
func (node *Node) Select(collection string, ks interface{}, vs interface{}, vol int, skip int, oprs interface{}, lock bool, conditions []interface{}, del bool, sortPos string, sortKey string, count bool, update bool) []interface{}
Select is the node data select method
func (*Node) SendToObservers ¶
func (node *Node) SendToObservers(jsonStr string)
SendToObservers transmits a new insert, update, or delete event to all configured observers
func (*Node) SetupInitializeCDat ¶
func (node *Node) SetupInitializeCDat() error
SetupInitializeCDat reads the .cdat file and reinstates its data into memory. If the file is corrupt and backups are set up, this method also attempts to recover automatically.
func (*Node) SetupNodeConfig ¶
func (node *Node) SetupNodeConfig() error
SetupNodeConfig sets up the default cluster node config, i.e. .nodeconfig
func (*Node) SignalListener ¶
func (node *Node) SignalListener()
SignalListener listens for system signals
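The Node struct carries a SignalChannel and a ContextCancel, which suggests a pattern like the one sketched here: wait for a signal, then cancel the shared context so other goroutines can shut down. listen and cancelledAfterSignal are hypothetical helpers, and the choice of SIGINT and SIGTERM is an assumption; the real method may watch other signals and do more work on shutdown.

```go
package main

import (
	"context"
	"fmt"
	"os"
	"os/signal"
	"syscall"
)

// listen blocks until a signal arrives on sig or ctx is done.
// On a signal it calls cancel so that goroutines watching ctx can stop.
func listen(ctx context.Context, cancel context.CancelFunc, sig <-chan os.Signal) {
	select {
	case s := <-sig:
		fmt.Println("received signal:", s)
		cancel()
	case <-ctx.Done():
	}
}

// cancelledAfterSignal simulates a signal by placing it on the channel
// directly and reports whether the context ended up cancelled.
func cancelledAfterSignal() bool {
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	sig := make(chan os.Signal, 1)
	signal.Notify(sig, syscall.SIGINT, syscall.SIGTERM)
	defer signal.Stop(sig)

	sig <- os.Interrupt // stands in for a real signal from the OS
	listen(ctx, cancel, sig)
	return ctx.Err() != nil
}

func main() {
	fmt.Println("context cancelled:", cancelledAfterSignal())
}
```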
func (*Node) StartRunQueryQueue ¶
func (node *Node) StartRunQueryQueue()
StartRunQueryQueue runs all queries that were on queue before shutdown
func (*Node) SyncOut ¶
func (node *Node) SyncOut()
SyncOut syncs current data to replicas at configured interval
func (*Node) SyncOutQueryQueue ¶
func (node *Node) SyncOutQueryQueue()
SyncOutQueryQueue syncs the query queue out to the .qqueue file, which holds the list of queries that did not finish, if any. On startup the node finishes them off.
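Persisting the queue and restoring it on startup can be sketched as a write and a read of the same file. The JSON encoding and the saveQueue, loadQueue, and roundTrip helpers are assumptions made for illustration; the Node struct comment says the real file is encrypted, which this sketch leaves out.

```go
package main

import (
	"encoding/json"
	"fmt"
	"os"
	"path/filepath"
)

// saveQueue writes the queue to path as JSON.
// Assumption: JSON is used here for simplicity; encryption is omitted.
func saveQueue(path string, queue []map[string]interface{}) error {
	b, err := json.Marshal(queue)
	if err != nil {
		return err
	}
	return os.WriteFile(path, b, 0o600)
}

// loadQueue reads back a queue previously written by saveQueue.
func loadQueue(path string) ([]map[string]interface{}, error) {
	b, err := os.ReadFile(path)
	if err != nil {
		return nil, err
	}
	var queue []map[string]interface{}
	if err := json.Unmarshal(b, &queue); err != nil {
		return nil, err
	}
	return queue, nil
}

// roundTrip saves queue to a .qqueue file in a temporary directory and loads
// it again, as a node would across a restart.
func roundTrip(queue []map[string]interface{}) ([]map[string]interface{}, error) {
	dir, err := os.MkdirTemp("", "qqueue")
	if err != nil {
		return nil, err
	}
	defer os.RemoveAll(dir)

	path := filepath.Join(dir, ".qqueue")
	if err := saveQueue(path, queue); err != nil {
		return nil, err
	}
	return loadQueue(path)
}

func main() {
	pending := []map[string]interface{}{{"id": 1, "action": "insert"}}
	restored, err := roundTrip(pending)
	if err != nil {
		fmt.Println("round trip failed:", err)
		return
	}
	fmt.Println("queries to finish on startup:", len(restored))
}
```

Note that encoding/json decodes numbers into float64 when the target is interface{}, so an id stored as an int comes back as a float64 in this sketch.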
func (*Node) Update ¶
func (node *Node) Update(collection string, ks interface{}, vs interface{}, vol int, skip int, oprs interface{}, lock bool, conditions []interface{}, uks []interface{}, nvs []interface{}, sortPos string, sortKey string) []interface{}
Update is the node data update method
func (*Node) WriteToFile ¶
func (node *Node) WriteToFile(backup bool)
type Observer ¶
type Observer struct {
    Host string `yaml:"host"` // Host of Observer, i.e. an IP or FQDN
    Port int `yaml:"port"` // Port of Observer
}
Observer is a FlexiKalish Observer which listens for realtime node events.
type ObserverConnection ¶
type ObserverConnection struct {
    Conn *net.TCPConn // Net connection
    SecureConn *tls.Conn // Secure connection with TLS
    Text *textproto.Conn // For writing and reading
    Ok bool // Is observer ok?
    Observer Observer // The underlying Observer for connection
}
ObserverConnection is a Node-to-Observer TCP or TLS connection