architecture

package
v0.0.0-...-3658086 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Feb 23, 2024 License: MIT Imports: 30 Imported by: 0

Documentation

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

This section is empty.

Types

type Config

type Config struct {
	Replicas                    []Replica  `yaml:"replicas"`                                 // Replicas are replica of this current node
	TLSCert                     string     `yaml:"tls-cert"`                                 // TLS cert path
	TLSKey                      string     `yaml:"tls-key"`                                  // TLS cert key
	Host                        string     `yaml:"host"`                                     // Node host i.e 0.0.0.0 usually
	TLS                         bool       `default:"false" yaml:"tls"`                      // Use TLS?
	Port                        int        `yaml:"port"`                                     // Node port
	Key                         string     `yaml:"key"`                                      // Key for a cluster to communicate with the node and also used to resting data.
	MaxMemory                   uint64     `yaml:"max-memory"`                               // Default 10240MB = 10 GB (1024 * 10)
	LogMaxLines                 int        `yaml:"log-max-lines"`                            // At what point to clear logs.  Each log line start's with a [UTC TIME] LOG DATA
	Logging                     bool       `default:"false" yaml:"logging"`                  // Log to file ?
	ReplicationSyncTime         int        `yaml:"replication-sync-time"`                    // in minutes default is every 10 minutes
	ReplicationSyncTimeout      int        `yaml:"replication-sync-timeout"`                 // As your node grows in size you may want to increase.  Default is 10 minutes.
	TLSReplication              bool       `default:"false" yaml:"tls-replication"`          // If your cluster node replicas are running TLS then configure this to true
	AutomaticBackups            bool       `default:"false" yaml:"automatic-backups"`        // If for some reason a .cdat gets corrupt you can choose to have the system save a state of your .cdat file every set n amount of time.  (default is every 8 hours(480 minutes) to make a backup of your nodes data under BackupsDirectory(which the system will create inside your binary executable location) files are named like so .cdat_YYMMDDHHMMSS in your set timezone
	AutomaticBackupTime         int        `yaml:"automatic-backup-time"`                    // Automatic node backup time.  Default is 8 (hours)
	AutomaticBackupCleanup      bool       `default:"false" yaml:"automatic-backup-cleanup"` // If set true node will clean up backups that are older than AutomaticBackupCleanupTime days old
	AutomaticBackupCleanupHours int        `yaml:"automatic-backup-cleanup-hours"`           // Clean up old .cdat backups that are n amount hours old only used if AutomaticBackups is set true default is 12 hours
	Timezone                    string     `default:"Local" yaml:"timezone"`                 // i.e America/Chicago default is local system time
	Observers                   []Observer `yaml:"observers"`                                // Observer servers listening for realtime node events (insert,update,delete).  node if configured will relay successful inserts, updates, and deletes to all Observer(s)
	TLSObservers                bool       `yaml:"tls-observers"`                            // Set whether your Observers are listening on tls or not
	BackupsDirectory            string     `yaml:"backups-directory"`                        // Backups directory by default is in the execution directory /backups/ Whatever is provided the system will create the director(ies) if they doesn't exist.
}

Config is the cluster config struct

type Data

type Data struct {
	Map     map[string][]map[string]interface{} // Data hash map
	Writers map[string]*sync.RWMutex            // Collection writers
}

Data is the node data struct

type Node

type Node struct {
	TCPAddr             *net.TCPAddr             // TCPAddr represents the address of the nodes TCP end point
	TCPListener         *net.TCPListener         // TCPListener is the node TCP network listener.
	Wg                  *sync.WaitGroup          // Node WaitGroup waits for all goroutines to finish up
	SignalChannel       chan os.Signal           // Catch operating system signal
	Config              Config                   // Node  config
	TLSConfig           *tls.Config              // Node TLS config if TLS is true
	ContextCancel       context.CancelFunc       // For gracefully shutting down
	ConfigMu            *sync.RWMutex            // Node config mutex
	Data                *Data                    // Node data
	Context             context.Context          // Main looped go routine context.  This is for listeners, event loops and so forth
	LogMu               *sync.Mutex              // Log file mutex
	LogFile             *os.File                 // Opened log file
	ObserverConnections []*ObserverConnection    // ObserverConnections
	QueryQueue          []map[string]interface{} // QueryQueue is a queue of queries coming in from cluster(s) and is synced to a file which is encrypted every 100ms
	QueryQueueMu        *sync.Mutex              // QueryQueue mutex
}

func (*Node) AddToQueryQueue

func (node *Node) AddToQueryQueue(req map[string]interface{}) int

AddToQueryQueue adds to query queue and returns unique query id

func (*Node) AutomaticBackup

func (node *Node) AutomaticBackup()

AutomaticBackup automatically backs up node data every set AutomaticBackupTime hours to node backups directory under .cdat.unixtime Also will remove any old backups based on the days provided on AutomaticBackupCleanupTime

func (*Node) ConnectToObservers

func (node *Node) ConnectToObservers()

ConnectToObservers connects to Observer listeners node will send an initial Key: SHAREDKEY\r\n This is read by the Observer and accepted at which point the Node and Observer can communicate.

func (*Node) CurrentMemoryUsage

func (node *Node) CurrentMemoryUsage() uint64

func (*Node) CvFsInfo

func (node *Node) CvFsInfo(entry []fs.DirEntry) ([]fs.FileInfo, error)

func (*Node) Delete

func (node *Node) Delete(collection string, ks interface{}, vs interface{}, vol int, skip int, oprs interface{}, lock bool, conditions []interface{}, sortPos string, sortKey string) []interface{}

Delete is the node data delete method

func (*Node) DeleteKeyFromColl

func (node *Node) DeleteKeyFromColl(collection string, key string) int

DeleteKeyFromColl Deletes key from all collection documents

func (*Node) GetLogs

func (node *Node) GetLogs(r io.Reader) int

func (*Node) HandleClientConnection

func (node *Node) HandleClientConnection(conn net.Conn)

func (*Node) Insert

func (node *Node) Insert(collection string, jsonMap map[string]interface{}, conn net.Conn) error

func (*Node) LostReconnectObservers

func (node *Node) LostReconnectObservers()

LostReconnectObservers connects to lost observers or will try to.

func (*Node) PLof

func (node *Node) PLof(data, level string)

func (*Node) RemoveFromQueryQueue

func (node *Node) RemoveFromQueryQueue(id int)

func (*Node) RenewNodeConfig

func (node *Node) RenewNodeConfig() error

func (*Node) Search

func (node *Node) Search(mu *sync.RWMutex, i int, tbd *[]int, collection string, ks interface{}, vs interface{}, vol int, skip int, oprs interface{}, conditions []interface{}, del bool, update bool, objs *[]interface{})

Search checks if provided index within data collection meets conditions

func (*Node) Select

func (node *Node) Select(collection string, ks interface{}, vs interface{}, vol int, skip int, oprs interface{}, lock bool, conditions []interface{}, del bool, sortPos string, sortKey string, count bool, update bool) []interface{}

Select is the node data select method

func (*Node) SendToObservers

func (node *Node) SendToObservers(jsonStr string)

SendToObservers transmits a new insert, update, or delete event to all configured observers

func (*Node) SetupInitializeCDat

func (node *Node) SetupInitializeCDat() error

SetupInitializeCDat reads .cdat file and will reinstate data back into memory. On failure and backups are setup this method will also automatically recover if corrupt or try to.

func (*Node) SetupNodeConfig

func (node *Node) SetupNodeConfig() error

SetupNodeConfig sets up default cluster node config i.e .nodeconfig

func (*Node) SignalListener

func (node *Node) SignalListener()

SignalListener listens for system signals

func (*Node) StartRunQueryQueue

func (node *Node) StartRunQueryQueue()

StartRunQueryQueue runs all queries that were on queue before shutdown

func (*Node) StartTcp

func (node *Node) StartTcp()

func (*Node) SyncOut

func (node *Node) SyncOut()

SyncOut syncs current data to replicas at configured interval

func (*Node) SyncOutQueryQueue

func (node *Node) SyncOutQueryQueue()

SyncOutQueryQueue syncs out query queue to .qqueue file which contains a list of queries that did not finish if any. On node start up the node will finish them off.

func (*Node) Update

func (node *Node) Update(collection string, ks interface{}, vs interface{}, vol int, skip int, oprs interface{}, lock bool, conditions []interface{}, uks []interface{}, nvs []interface{}, sortPos string, sortKey string) []interface{}

Update is the node data update method

func (*Node) WriteToFile

func (node *Node) WriteToFile(backup bool)

type Observer

type Observer struct {
	Host string `yaml:"host"` // Host of Observer i.e an ip or fqdn
	Port int    `yaml:"port"` // Port of Observer
}

Observer is a FlexiKalish Observer which listens for realtime node events.

type ObserverConnection

type ObserverConnection struct {
	Conn       *net.TCPConn    // Net connection
	SecureConn *tls.Conn       // Secure connection with TLS
	Text       *textproto.Conn // For writing and reading
	Ok         bool            // Is observer ok?
	Observer   Observer        // The underlying Observer for connection
}

ObserverConnection is Node to Observer TCP or TLS connection

type Replica

type Replica struct {
	Host string `yaml:"host"` // Host of replica i.e an ip or fqdn
	Port int    `yaml:"port"` // Port of replica
}

Replica is a cluster node that current node data will be replicated/synced to

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL