iot

package module
v0.0.0-...-114974b Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Oct 2, 2022 License: GPL-3.0 Imports: 32 Imported by: 2

README

The IOT engine

We need a diagram here with the engine in the middle and client 'connections' on the bottom and 'connections' to a 'bridge' above.

I don't like the word 'connection' and I don't like 'socket', so I'll use 'Contact', as in lower contacts and higher contacts.

At this point there's no TCP so we're using channels of objects.

Documentation

Overview

Package iot comments. TODO: package comments for this pub/sub system.

Index

Constants

View Source
const HashTypeLen = 24

HashTypeLen is the length of a HashType in bytes; it is now 24 bytes long.

Variables

View Source
var (

	// TopicsAdded is
	TopicsAdded = promauto.NewCounter(prometheus.CounterOpts{
		Name: "look_topics_added",
		Help: "The total number new topics/subscriptions] added",
	})

	// API1GetStats is
	API1GetStats = promauto.NewCounter(
		prometheus.CounterOpts{
			Name: "api1_getstats",
			Help: "http requests.",
		},
	)
	// IotHTTP404 is used in TCPUtil.go
	IotHTTP404 = promauto.NewCounter(
		prometheus.CounterOpts{
			Name: "iot_http_404",
			Help: "http 404.",
		},
	)

	// API1PostGurus is
	API1PostGurus = promauto.NewCounter(
		prometheus.CounterOpts{
			Name: "api2_post_gurus",
			Help: "http post /api2/set.",
		},
	)

	// API1PostGurusFail is searchable
	API1PostGurusFail = promauto.NewCounter(
		prometheus.CounterOpts{
			Name: "api1_post_gurus_fail",
			Help: "http post /api2/set.",
		},
	)

	// TCPNameResolverFail1 is
	TCPNameResolverFail1 = promauto.NewCounter(
		prometheus.CounterOpts{
			Name: "tcp_name_resolver_fail1",
			Help: "failed to resolve address of guru.",
		},
	)
	// TCPNameResolverFail2 is
	TCPNameResolverFail2 = promauto.NewCounter(
		prometheus.CounterOpts{
			Name: "tcp_name_resolver_fail2",
			Help: "dial timeout looking for gurus.",
		},
	)

	// TCPNameResolverConnected is
	TCPNameResolverConnected = promauto.NewCounter(
		prometheus.CounterOpts{
			Name: "tcp_name_resolver_connected",
			Help: "normal connect happened.",
		},
	)
	//TCPServerDidntStart is
	TCPServerDidntStart = promauto.NewCounter(
		prometheus.CounterOpts{
			Name: "tcp_server_fail1",
			Help: "packet server listen fail.",
		},
	)
	//TCPServerAcceptError is
	TCPServerAcceptError = promauto.NewCounter(
		prometheus.CounterOpts{
			Name: "tcp_server_fail2",
			Help: "packet server acceptor fail.",
		},
	)

	//TCPServerConnAccept is
	TCPServerConnAccept = promauto.NewCounter(
		prometheus.CounterOpts{
			Name: "tcp_server_conn_accept",
			Help: "normal packer server connection.",
		},
	)

	//TCPServerNewConnection is
	TCPServerNewConnection = promauto.NewCounter(
		prometheus.CounterOpts{
			Name: "tcp_server_new_conn",
			Help: "normal packer server connection.",
		},
	)
	//TCPServerPacketReadError is
	TCPServerPacketReadError = promauto.NewCounter(
		prometheus.CounterOpts{
			Name: "tcp_server_packet_read_error",
			Help: "packets.ReadPacket error.",
		},
	)

	//TCPServerIotPushEror is
	TCPServerIotPushEror = promauto.NewCounter(
		prometheus.CounterOpts{
			Name: "tcp_server_packet_push_error",
			Help: "Push error.",
		},
	)
)
View Source
var (
	// HTTPServe404 is
	HTTPServe404 = promauto.NewCounter(
		prometheus.CounterOpts{
			Name: "main_http_404",
			Help: "Number of 404 main.ServeHTTP.",
		},
	)
	// ForwardsCount3100 is
	ForwardsCount3100 = promauto.NewCounter(
		prometheus.CounterOpts{
			Name: "main_3100_forwards",
			Help: "Number forwards main.startPublicServer3100.",
		},
	)
	// ForwardsCount9090 is for main.go
	ForwardsCount9090 = promauto.NewCounter(
		prometheus.CounterOpts{
			Name: "main_9090_forwards",
			Help: "http forwards main.startPublicServer9090.",
		},
	)
	// ForwardsCount8000 is
	ForwardsCount8000 = promauto.NewCounter(
		prometheus.CounterOpts{
			Name: "main_8000_forwards",
			Help: "tcp count main.startPublicServer9090.",
		},
	)
	// ForwardsDialFail8000 is
	ForwardsDialFail8000 = promauto.NewCounter(
		prometheus.CounterOpts{
			Name: "main_8000_dialfail",
			Help: "tcp dialfail main.startPublicServer9090.",
		},
	)
	// ForwardsConnectedl8000 is
	ForwardsConnectedl8000 = promauto.NewCounter(
		prometheus.CounterOpts{
			Name: "main_8000_connected",
			Help: "tcp conected main.startPublicServer9090.",
		},
	)
	// ForwardsAcceptl8000 is
	ForwardsAcceptl8000 = promauto.NewCounter(
		prometheus.CounterOpts{
			Name: "main_8000_accepted",
			Help: "tcp accepted main.startPublicServer9090.",
		},
	)
	// BadTokenRequests is
	BadTokenRequests = promauto.NewCounter(
		prometheus.CounterOpts{
			Name: "main_bad_token_requests",
			Help: "Token requests with flaws.",
		},
	)
)
View Source
var DEBUG = false

DEBUG because I don't know a better way. todo: look into conditional inclusion

View Source
var GuruNameToConfigMap map[string]*Executive

GuruNameToConfigMap for ease of unit test.

View Source
var TestLimits = ExecutiveLimits{}

TestLimits is for tests of autoscaling. These are executive limits, not to be confused with token limits. In real life the connections limit is likely to be 10k and the subscriptions limit 1e6.

Functions

func BucketClear

func BucketClear(dest *StatsWithTime)

BucketClear is

func BucketCopy

BucketCopy is

func BucketSubtract

func BucketSubtract(src *StatsWithTime, dest *StatsWithTime)

BucketSubtract is

func ConnectGuruToSuperAide

func ConnectGuruToSuperAide(guru *Executive, aide *Executive)

ConnectGuruToSuperAide is for testing a cluster with a supercluster: we need channels from guru to aide.

func HasError

func HasError(p packets.Interface) *packets.Disconnect

HasError literally asks: does this packet have an "error" option? It returns a Disconnect if p has an error.

func IsWholeMqttPacket

func IsWholeMqttPacket(data []byte) (bool, int)

IsWholeMqttPacket returns true if the data is an mqtt packet and returns the length used.

func MQTTHandlePacket

func MQTTHandlePacket(cc *mqttContact, control libmqtt.Packet)

MQTTHandlePacket is for when the packet was parsed elsewhere (like in the websocket).

func NewWithInt64Comparator

func NewWithInt64Comparator() *redblacktree.Tree

NewWithInt64Comparator for HalfHash

func PostClusterStats

func PostClusterStats(stats *ClusterStats, addr string) error

PostClusterStats makes http client

func PostUpstreamNames

func PostUpstreamNames(guruList []string, addressList []string, addr string) error

PostUpstreamNames does SetUpstreamNames the hard way. We are not going over the internet; inside a ns it should be well under 1000 ms.

func PushDownFromTop

func PushDownFromTop(looker *LookupTableStruct, p packets.Interface) error

PushDownFromTop deals with an incoming message going down. It is typically called by an upperChannel receiving a packet via the TCP connection that it dialed. TODO: upgrade and consolidate the address logic.

func PushPacketUpFromBottom

func PushPacketUpFromBottom(ssi ContactInterface, p packets.Interface) error

PushPacketUpFromBottom to deal with an incoming message on a bottom contact heading up.

func SocketSetup

func SocketSetup(tcpConn *net.TCPConn) error

SocketSetup sets common options

func SpecialPrint

func SpecialPrint(p *packets.PacketCommon, fn func())

func Text2Packet

func Text2Packet(text string) (packets.Interface, error)

Text2Packet turns badjson into a packet

func WebSocketLoop

func WebSocketLoop(wsConn *websocket.Conn, config *ContactStructConfig)

WebSocketLoop loops

Types

type BillingAccumulator

type BillingAccumulator struct {
	// contains filtered or unexported fields
}

BillingAccumulator is terse

func (*BillingAccumulator) Add

Add accumulates the stats into the BillingAccumulator

func (*BillingAccumulator) AreUnderMax

func (ba *BillingAccumulator) AreUnderMax(now uint32) (bool, string)

AreUnderMax returns if the stats are under the limits and, if not true, returns a message about it.

func (*BillingAccumulator) GetConnections

func (ba *BillingAccumulator) GetConnections(now uint32) float32

GetConnections is

func (*BillingAccumulator) GetInput

func (ba *BillingAccumulator) GetInput(now uint32) float32

GetInput - we sum up some previous buckets and divide. Do we need to sync with Add? No, because all access to this goes through a q in lookup.

func (*BillingAccumulator) GetOutput

func (ba *BillingAccumulator) GetOutput(now uint32) float32

GetOutput is

func (*BillingAccumulator) GetStats

func (ba *BillingAccumulator) GetStats(now uint32, dest *tokens.KnotFreeContactStats)

GetStats calcs them all at once into dest. dest should be zeroed before calling.

func (*BillingAccumulator) GetSubscriptions

func (ba *BillingAccumulator) GetSubscriptions(now uint32) float32

GetSubscriptions is

type ByteChan

type ByteChan struct {
	TheChan chan []byte
}

func (*ByteChan) Write

func (bc *ByteChan) Write(b []byte) (int, error)

this is a packet in bytes

type ByteCountingReader

type ByteCountingReader struct {
	// contains filtered or unexported fields
}

ByteCountingReader keeps track of how much was read.

func (*ByteCountingReader) Read

func (bcr *ByteCountingReader) Read(p []byte) (int, error)

type ByteCountingWriter

type ByteCountingWriter struct {
	// contains filtered or unexported fields
}

ByteCountingWriter keeps track of how much was written.

func (*ByteCountingWriter) Write

func (bcw *ByteCountingWriter) Write(p []byte) (int, error)

type ClusterExecutive

type ClusterExecutive struct {
	Aides []*Executive
	Gurus []*Executive

	PublicKeyTemp  *[32]byte //curve25519.PublicKey // temporary to this run not ed25519
	PrivateKeyTemp *[32]byte //curve25519.PrivateKey
	// contains filtered or unexported fields
}

ClusterExecutive is a list of Executive used for testing.

func MakeSimplestCluster

func MakeSimplestCluster(timegetter func() uint32, isTCP bool, aideCount int, suffix string) *ClusterExecutive

MakeSimplestCluster is just for testing as k8s doesn't work like this.

func MakeTCPMain

func MakeTCPMain(name string, limits *ExecutiveLimits, token string, isGuru bool) *ClusterExecutive

MakeTCPMain is called by main(s) and it news a table and contacts list and starts tcp acceptors.

func (*ClusterExecutive) GetNextAddress

func (ce *ClusterExecutive) GetNextAddress() string

GetNextAddress hands out localhost addresses starting at 9000

func (*ClusterExecutive) GetSubsCount

func (ce *ClusterExecutive) GetSubsCount() int

GetSubsCount returns count of all the subscriptions in all the lookup tables. this is really only good for test.

func (*ClusterExecutive) Heartbeat

func (ce *ClusterExecutive) Heartbeat(now uint32)

Heartbeat everyone when testing

func (*ClusterExecutive) Operate

func (ce *ClusterExecutive) Operate()

Operate is where we pretend to be an Operator and resize the cluster. This is really only for test. It only works in non-tcp mode. It does not call heartbeat or advance the time.

func (*ClusterExecutive) WaitForActions

func (ce *ClusterExecutive) WaitForActions()

WaitForActions is a utility for unit tests. We must wait for things to happen during tests; we pretend to get service from the operator.

type ClusterStats

type ClusterStats struct {
	When  uint32 // unix time
	Stats []*ExecutiveStats
}

ClusterStats is ExecutiveStats from everyone in the cluster. maybe slightly delayed

type ContactInterface

type ContactInterface interface {
	Close(err error)

	GetClosed() bool

	GetKey() HalfHash

	GetExpires() uint32
	SetExpires(when uint32)

	GetToken() *tokens.KnotFreeTokenPayload
	SetToken(*tokens.KnotFreeTokenPayload)

	GetConfig() *ContactStructConfig

	WriteDownstream(cmd packets.Interface) error

	WriteUpstream(cmd packets.Interface) error // called by LookupTableStruct.PushUp

	String() string // used as a default channel name in test

	Heartbeat(uint32) // periodic service ~= 10 sec

	Read(p []byte) (int, error)
	Write(p []byte) (int, error)

	GetRates(now uint32) (int, int, int)

	SetReader(r io.Reader)
	SetWriter(w io.Writer)
}

ContactInterface is usually supplied by a tcp connection

type ContactStruct

type ContactStruct struct {
	// contains filtered or unexported fields
}

ContactStruct is our idea of channel or socket which is downstream from us.

func AddContactStruct

func AddContactStruct(ss *ContactStruct, ssi ContactInterface, config *ContactStructConfig) *ContactStruct

AddContactStruct initializes a contact, and puts the new ss on the global list. It also increments the sequence number in ContactStructConfig. Note that you must pass the same object twice, once as a ContactStruct and once as the Interface.

func (*ContactStruct) Close

func (ss *ContactStruct) Close(err error)

Close closes the conn and the rest of the work too. doesn't send error or disconnect. needs to be overridden

func (*ContactStruct) GetClosed

func (ss *ContactStruct) GetClosed() bool

GetClosed because the contact is still referenced by looker after closed.

func (*ContactStruct) GetConfig

func (ss *ContactStruct) GetConfig() *ContactStructConfig

GetConfig is because we're passing around an interface

func (*ContactStruct) GetExpires

func (ss *ContactStruct) GetExpires() uint32

GetExpires returns when the cc should expire

func (*ContactStruct) GetKey

func (ss *ContactStruct) GetKey() HalfHash

GetKey is because we're passing around an interface

func (*ContactStruct) GetRates

func (ss *ContactStruct) GetRates(now uint32) (int, int, int)

GetRates is for peeking at the rates. Usage: in, out, dt := cc.GetRates(now)

func (*ContactStruct) GetSequence

func (ss *ContactStruct) GetSequence() uint64

GetSequence is

func (*ContactStruct) GetToken

func (ss *ContactStruct) GetToken() *tokens.KnotFreeTokenPayload

GetToken returns the verified and decoded payload, or else nil.

func (*ContactStruct) Heartbeat

func (ss *ContactStruct) Heartbeat(now uint32)

Heartbeat is a periodic service, called about every 10 sec. It is going to forward stats to the billing channel.

func (*ContactStruct) IncOutput

func (ss *ContactStruct) IncOutput(amt int)

IncOutput exists so tests can fake bytes written.

func (*ContactStruct) Read

func (ss *ContactStruct) Read(p []byte) (int, error)

func (*ContactStruct) ReadByte

func (ss *ContactStruct) ReadByte() (byte, error)

ReadByte implements BufferedReader for libmqtt

func (*ContactStruct) SetExpires

func (ss *ContactStruct) SetExpires(when uint32)

SetExpires sets when the ss will expire in unix time

func (*ContactStruct) SetReader

func (ss *ContactStruct) SetReader(r io.Reader)

SetReader allows test to monkey with the flow

func (*ContactStruct) SetToken

func (ss *ContactStruct) SetToken(t *tokens.KnotFreeTokenPayload)

SetToken sets the verified and decoded token payload for this contact.

func (*ContactStruct) SetWriter

func (ss *ContactStruct) SetWriter(w io.Writer)

SetWriter is used by helpersof_test.go.

func (*ContactStruct) String

func (ss *ContactStruct) String() string

func (*ContactStruct) Write

func (ss *ContactStruct) Write(p []byte) (int, error)

func (*ContactStruct) WriteByte

func (ss *ContactStruct) WriteByte(c byte) error

WriteByte implements BufferedWriter for libmqtt

func (*ContactStruct) WriteDownstream

func (ss *ContactStruct) WriteDownstream(cmd packets.Interface) error

WriteDownstream is often overridden. In *test* we force plain contacts on the bottom of the gurus; they just need to write.

func (*ContactStruct) WriteUpstream

func (ss *ContactStruct) WriteUpstream(cmd packets.Interface) error

WriteUpstream is used by an upper contact and will be overridden. See tcpUpperContact.

type ContactStructConfig

type ContactStructConfig struct {
	Name string // for debug
	// contains filtered or unexported fields
}

ContactStructConfig could be just a stack frame but I'd like to return it. This could be an interface that implements range and len and the callbacks. Instead we have function pointers. TODO: revisit.

func NewContactStructConfig

func NewContactStructConfig(looker *LookupTableStruct) *ContactStructConfig

NewContactStructConfig is

func (*ContactStructConfig) AccessContactsList

func (config *ContactStructConfig) AccessContactsList(fn func(config *ContactStructConfig, listOfCi *list.List))

AccessContactsList so we can disconnect them in test and stuff. be sure to always lock. Don't call close or recurse in here or it will deadlock.

func (*ContactStructConfig) GetCe

func (config *ContactStructConfig) GetCe() *ClusterExecutive

GetCe is a getter

func (*ContactStructConfig) GetContactsListCopy

func (config *ContactStructConfig) GetContactsListCopy() []ContactInterface

GetContactsListCopy copies the list.

func (*ContactStructConfig) GetLookup

func (config *ContactStructConfig) GetLookup() *LookupTableStruct

GetLookup is a getter

func (*ContactStructConfig) IsGuru

func (config *ContactStructConfig) IsGuru() bool

IsGuru exposes config.lookup.isGuru

func (*ContactStructConfig) Len

func (config *ContactStructConfig) Len() int

Len returns the count of the contacts.

type DevNull

type DevNull struct {
}

DevNull has its uses.

func (*DevNull) Read

func (null *DevNull) Read(b []byte) (int, error)

func (*DevNull) Write

func (null *DevNull) Write(b []byte) (int, error)

type Executive

type Executive struct {
	Looker *LookupTableStruct
	Config *ContactStructConfig
	Name   string

	Limits *ExecutiveLimits

	ClusterStats *ClusterStats // All the stats

	ClusterStatsString string // serialization of ClusterStats

	IAmBadError error // if something happened to simply ruin us and we're quitting.
	// contains filtered or unexported fields
}

func MakeHTTPExecutive

func MakeHTTPExecutive(ex *Executive, serverName string) *Executive

MakeHTTPExecutive sets up a http server for serving api1 and api2

func MakeMqttExecutive

func MakeMqttExecutive(ex *Executive, serverName string) *Executive

MakeMqttExecutive is a thing like a server, not the exec

func MakeTCPExecutive

func MakeTCPExecutive(ex *Executive, serverName string) *Executive

MakeTCPExecutive is a thing like a server, not the exec

func MakeTextExecutive

func MakeTextExecutive(ex *Executive, serverName string) *Executive

MakeTextExecutive is a thing like a server, not the exec

func NewExecutive

func NewExecutive(sizeEstimate int, aname string, timegetter func() uint32, isGuru bool, ce *ClusterExecutive) *Executive

NewExecutive A wrapper to hold and operate

func (*Executive) DialContactToAnyAide

func (ex *Executive) DialContactToAnyAide(isTCP bool, ce *ClusterExecutive)

DialContactToAnyAide is a utility to wait until we have a reference to an aide address and then get a tcp conn and keep it up and retry and keep it up forever. In test there is a ClusterExecutive struct that has references to all the names and addresses In k8s there is an operator that is periodically sending

func (*Executive) GetExecutiveStats

func (ex *Executive) GetExecutiveStats() *ExecutiveStats

GetExecutiveStats is fractions relative to the limits.

func (*Executive) GetHTTPAddress

func (ex *Executive) GetHTTPAddress() string

GetHTTPAddress is a getter

func (*Executive) GetMQTTAddress

func (ex *Executive) GetMQTTAddress() string

GetMQTTAddress is a getter

func (*Executive) GetSubsCount

func (ex *Executive) GetSubsCount() (int, float64)

GetSubsCount returns a count of how many names it's remembering. it also returns a fraction of buffer usage where 0.0 is empty and 1.0 is full.

func (*Executive) GetTCPAddress

func (ex *Executive) GetTCPAddress() string

GetTCPAddress is a getter

func (*Executive) GetTextAddress

func (ex *Executive) GetTextAddress() string

GetTextAddress is a getter

func (*Executive) Heartbeat

func (ex *Executive) Heartbeat(now uint32)

Heartbeat one per 10 sec?

func (*Executive) WaitForActions

func (ex *Executive) WaitForActions()

WaitForActions needs to be properly implemented. The we inject tracer packets with wait groups into q's and then wait for that.

type ExecutiveLimits

type ExecutiveLimits struct {
	tokens.KnotFreeContactStats //   in out su co
}

ExecutiveLimits will be how we tell if the ex is 'full'

type ExecutiveStats

type ExecutiveStats struct {
	// four float32 :   in out su co
	tokens.KnotFreeContactStats
	Buffers     float32 `json:"buf"`
	Name        string  `json:"name"`
	HTTPAddress string  `json:"http"`
	TCPAddress  string  `json:"tcp"`
	IsGuru      bool    `json:"guru"`

	Limits *ExecutiveLimits `json:"limits"`
}

ExecutiveStats is fractions relative to the limits. a fraction: 1.0 is 100% maxed out. 0 is idle.

func GetServerStats

func GetServerStats(addr string) (*ExecutiveStats, error)

GetServerStats asks nicely over http

func (*ExecutiveStats) DeepCopy

func (in *ExecutiveStats) DeepCopy() *ExecutiveStats

DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new ExecutiveStats.

func (*ExecutiveStats) DeepCopyInto

func (in *ExecutiveStats) DeepCopyInto(out *ExecutiveStats)

DeepCopyInto the slow way

type ExpansionDesired

type ExpansionDesired struct {
	ChangeAides int    // +1 for grow, 0 for same, -1 for shrink
	RemoveAide  string // the name of the aide to delete

	ChangeGurus int // +1 for grow, 0 for same, -1 for shrink

}

ExpansionDesired is

func CalcExpansionDesired

func CalcExpansionDesired(aides []*ExecutiveStats, gurus []*ExecutiveStats) ExpansionDesired

CalcExpansionDesired is used locally in tests and used by the operator to manage the cluster.

type HalfHash

type HalfHash uint64

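HalfHash represents a 64-bit value, used for cases when we can do with 'just' 64 bits of a HashType (see HashType.GetHalfHash).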

func (*HalfHash) String

func (a *HalfHash) String() string

type HashType

type HashType [3]uint64

HashType is for the hash table that Lookup uses.

func (*HashType) FromHashType

func (h *HashType) FromHashType(src *HashType)

FromHashType init an existing hash from another - basically a copy

func (*HashType) GetBytes

func (h *HashType) GetBytes(b []byte)

GetBytes will fill b byte array with value from h.

func (*HashType) GetFractionalBits

func (h *HashType) GetFractionalBits(n int) int

GetFractionalBits returns a slice of n bits. Values of n greater than 64 are not implemented.

func (*HashType) GetHalfHash

func (h *HashType) GetHalfHash() HalfHash

GetHalfHash is for cases when we can do with 'just' 64 bits.

func (*HashType) GetUint64

func (h *HashType) GetUint64() uint64

GetUint64 is for cases when we can do with 'just' 64 bits.

func (*HashType) HashBytes

func (h *HashType) HashBytes(s []byte)

HashBytes will initialize an existing hash from a string. The string will get hashed to provide the bits so we'll wish this was faster. It doesn't have to be crypto safe but it does need to be evenly distributed. allocates. wanted to use highwayhash.New128 but was scared of 128 bits.

func (*HashType) HashString

func (h *HashType) HashString(s string)

HashString will hash the string and init the HashType

func (*HashType) InitFromBytes

func (h *HashType) InitFromBytes(addressBytes []byte)

InitFromBytes exists because I need to convert from []byte to HashType. TODO: should it return an error? Rename?

func (*HashType) Random

func (h *HashType) Random()

Random HashType initializes with random bits. We don't need to hash these more do we?

func (*HashType) String

func (h *HashType) String() string

type LookReply

type LookReply struct {
	Level       uint32
	Count       uint32
	Null        bool
	Node        string // node name
	IsPermanent bool
}

type LookupTableStruct

type LookupTableStruct struct {
	// contains filtered or unexported fields
}

LookupTableStruct is good for message routing and address lookup.

func NewLookupTable

func NewLookupTable(projectedTopicCount int, aname string, isGuru bool, getTime func() uint32) *LookupTableStruct

NewLookupTable makes a LookupTableStruct, usually a singleton. In the tests we call here and then use the result to init a server. Starts 16 go routines that are hung on their 32 deep q's

func (*LookupTableStruct) FlushMarkerAndWait

func (me *LookupTableStruct) FlushMarkerAndWait()

FlushMarkerAndWait puts a command into the head of *all* the q's and waits for *all* of them to arrive. This way we can wait. for testing.

func (*LookupTableStruct) GetAllSubsCount

func (me *LookupTableStruct) GetAllSubsCount() (int, float64)

GetAllSubsCount returns the count of subscriptions and the average depth of the channels.

func (*LookupTableStruct) Heartbeat

func (me *LookupTableStruct) Heartbeat(now uint32)

Heartbeat is every 10 sec. now is unix seconds.

func (*LookupTableStruct) PushUp

PushUp is to send msg up to guruness. has a q per contact. this is called directly by the pub/sub/look commands. getting an error here is kinda fatal.

func (*LookupTableStruct) SetUpstreamNames

func (me *LookupTableStruct) SetUpstreamNames(names []string, addresses []string)

SetUpstreamNames is called by a cluster exec of some kind when changing the guru count. We will update upstreamRouterStruct. Names are like: guru-0f3bca46d414d506ecce3de9762df6c3. Addresses are like: 10.244.0.149:8384.

type StatsWithTime

type StatsWithTime struct {
	tokens.KnotFreeContactStats
	Start uint32 `json:"st"`
}

StatsWithTime is

type UpstreamNamesArg

type UpstreamNamesArg struct {
	Names     []string
	Addresses []string
}

UpstreamNamesArg just has the one job

type WatchedTopic

type WatchedTopic struct {
	// contains filtered or unexported fields
}

WatchedTopic is what we'll be collecting a lot of. What if *everyone* is watching this topic? Then the watchers.thetree is huge. These normally time out; see the heartbeat.

func (*WatchedTopic) DeleteOption

func (wt *WatchedTopic) DeleteOption(key string)

DeleteOption deletes the option stored under key. It returns nothing.

func (*WatchedTopic) GetOption

func (wt *WatchedTopic) GetOption(key string) ([]byte, bool)

GetOption returns the value,true to go with the key or nil,false

func (*WatchedTopic) IsBilling

func (wt *WatchedTopic) IsBilling() (*BillingAccumulator, bool)

IsBilling returns the topic's BillingAccumulator and true if it has one; presumably nil, false otherwise (TODO: confirm against the implementation).

func (*WatchedTopic) Iterator

func (wt *WatchedTopic) Iterator() *subIterator

func (*WatchedTopic) OptionSize

func (wt *WatchedTopic) OptionSize() int

Utility routines for WatchedTopic options. OptionSize returns the key count, which is the same as the value count.

func (*WatchedTopic) SetOption

func (wt *WatchedTopic) SetOption(key string, val interface{})

SetOption adds the key,value

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL