Documentation ¶
Index ¶
- Constants
- Variables
- type Consumer
- func (c *Consumer) BootstrapStatus() bool
- func (c *Consumer) CheckIfQueuesAreDrained() error
- func (c *Consumer) ClearEventStats()
- func (c *Consumer) CloseAllRunningDcpFeeds()
- func (c *Consumer) ConsumerName() string
- func (c *Consumer) DcpEventsRemainingToProcess() uint64
- func (c *Consumer) EventingNodeUUIDs() []string
- func (c *Consumer) EventsProcessedPSec() *cm.EventProcessingStats
- func (c *Consumer) GetAssignedVbs(workerName string) ([]uint16, error)
- func (c *Consumer) GetEventProcessingStats() map[string]uint64
- func (c *Consumer) GetExecutionStats() map[string]interface{}
- func (c *Consumer) GetFailureStats() map[string]interface{}
- func (c *Consumer) GetInsight() *common.Insight
- func (c *Consumer) GetLcbExceptionsStats() map[string]uint64
- func (c *Consumer) GetMetaStoreStats() map[string]uint64
- func (c *Consumer) GetOwner() *common.Owner
- func (c *Consumer) GetPrevRebalanceInCompleteStatus() bool
- func (c *Consumer) GetRebalanceStatus() bool
- func (c *Consumer) HandleV8Worker() error
- func (c *Consumer) HostPortAddr() string
- func (c *Consumer) Index() int
- func (c *Consumer) InternalVbDistributionStats() []uint16
- func (c *Consumer) NodeUUID() string
- func (c *Consumer) NotifyClusterChange()
- func (c *Consumer) NotifyPrepareTopologyChange(keepNodes, ejectNodes []string)
- func (c *Consumer) NotifyRebalanceStop()
- func (c *Consumer) NotifySettingsChange()
- func (c *Consumer) NotifyWorker()
- func (c *Consumer) PauseConsumer()
- func (c *Consumer) Pid() int
- func (c *Consumer) RebalanceStatus() bool
- func (c *Consumer) RebalanceTaskProgress() *cm.RebalanceProgress
- func (c *Consumer) RemoveSupervisorToken() error
- func (c *Consumer) ResetBootstrapDone()
- func (c *Consumer) ResetCounters()
- func (c *Consumer) ResolveHostname(instance common.DebuggerInstance) string
- func (c *Consumer) SendAssignedVbs()
- func (c *Consumer) SendDeleteCidMsg(cid uint32, partition uint16, seqNo uint64)
- func (c *Consumer) SendNoOp(seqNo uint64, partition uint16)
- func (c *Consumer) Serve()
- func (c *Consumer) SetBootstrapStatus(status bool)
- func (c *Consumer) SetConnHandle(conn net.Conn)
- func (c *Consumer) SetFeatureMatrix(featureMatrix uint32)
- func (c *Consumer) SetFeedbackConnHandle(conn net.Conn)
- func (c *Consumer) SetRebalanceStatus(status bool)
- func (c *Consumer) SignalBootstrapFinish()
- func (c *Consumer) SignalConnected()
- func (c *Consumer) SignalFeedbackConnected()
- func (c *Consumer) SignalStopDebugger() error
- func (c *Consumer) SpawnCompilationWorker(appCode, appContent, appName, eventingPort string, ...) (*common.CompileStatus, error)
- func (c *Consumer) Stop(context string)
- func (c *Consumer) String() string
- func (c *Consumer) TimerDebugStats() map[int]map[string]interface{}
- func (c *Consumer) UpdateEncryptionLevel(enforceTLS, encryptOn bool)
- func (c *Consumer) UpdateWorkerQueueMemCap(quota int64)
- func (c *Consumer) VbDcpEventsRemainingToProcess() map[int]int64
- func (c *Consumer) VbEventingNodeAssignMapUpdate(vbEventingNodeAssignMap map[uint16]string)
- func (c *Consumer) VbProcessingStats() map[uint16]map[string]interface{}
- func (c *Consumer) VbSeqnoStats() map[int]map[string]interface{}
- func (c *Consumer) WorkerVbMapUpdate(workerVbucketMap map[string][]uint16)
- type DcpStatsLog
- type DcpVbStats
- type LogKey
- type OwnershipEntry
- type ResponseStat
Constants ¶
const ( XATTR_EVENTING = "_eventing" XATTR_SYNC = "_sync" XATTR_MOU = "_mou" XATTR_CHKPT = "_checkpoints" )
const ( Unused3 int8 Unused4 )
const (
AggChanSizeMultiplier = 8
)
Note: Should be a multiple of the number of dcpFeeds, which we might not know while initialising the consumer. Hence, assuming 8 KV dcpFeeds for an average of 8 KV nodes.
const (
// ClusterChangeNotifChBufSize limits buffer size for cluster change notif from producer
ClusterChangeNotifChBufSize = 10
)
Variables ¶
var (
	StateInit = state("StreamInit")
	StateStreamEnd = state("StreamEnd")
	// Caller should provide the request that it's making and not this value
	StateStreamRequest = state("StreamRequest")
)
Functions ¶
This section is empty.
Types ¶
type Consumer ¶
Consumer is responsible for interacting with the C++ V8 worker over a local TCP port
func NewConsumer ¶
func NewConsumer(hConfig *common.HandlerConfig, pConfig *common.ProcessConfig, rConfig *common.RebalanceConfig, index int, uuid, nsServerPort string, eventingNodeUUIDs []string, vbnos []uint16, app *common.AppConfig, dcpConfig map[string]interface{}, p common.EventingProducer, s common.EventingSuperSup, cursorRegistry common.CursorRegistryMgr, numVbuckets int, retryCount *int64, vbEventingNodeAssignMap map[uint16]string, workerVbucketMap map[string][]uint16, featureMatrix uint32) *Consumer
NewConsumer called by producer to create consumer handle
func (*Consumer) BootstrapStatus ¶
BootstrapStatus returns state of bootstrap for consumer instance
func (*Consumer) CheckIfQueuesAreDrained ¶
CheckIfQueuesAreDrained looks at all queues to make sure no events are left, to avoid potential loss of events - especially during rebalance out and pausing of execution of a function
func (*Consumer) ClearEventStats ¶
func (c *Consumer) ClearEventStats()
ClearEventStats flushes event processing stats
func (*Consumer) CloseAllRunningDcpFeeds ¶
func (c *Consumer) CloseAllRunningDcpFeeds()
CloseAllRunningDcpFeeds drops all socket connections to DCP producer
func (*Consumer) ConsumerName ¶
ConsumerName returns the consumer name, e.g. <event_handler_name>_worker_1
func (*Consumer) DcpEventsRemainingToProcess ¶
DcpEventsRemainingToProcess reports cached value for dcp events remaining to producer
func (*Consumer) EventingNodeUUIDs ¶
EventingNodeUUIDs returns the list of known eventing node uuids
func (*Consumer) EventsProcessedPSec ¶
func (c *Consumer) EventsProcessedPSec() *cm.EventProcessingStats
EventsProcessedPSec reports dcp + timer events triggered per sec
func (*Consumer) GetAssignedVbs ¶
func (*Consumer) GetEventProcessingStats ¶
GetEventProcessingStats exposes dcp/timer processing stats
func (*Consumer) GetExecutionStats ¶
GetExecutionStats returns OnUpdate/OnDelete success/failure stats for event handlers from cpp world
func (*Consumer) GetFailureStats ¶
GetFailureStats returns failure stats for event handlers from cpp world
func (*Consumer) GetInsight ¶
func (*Consumer) GetLcbExceptionsStats ¶
GetLcbExceptionsStats returns libcouchbase exception stats from CPP workers
func (*Consumer) GetMetaStoreStats ¶
GetMetaStoreStats exposes timer store related stat counters
func (*Consumer) GetPrevRebalanceInCompleteStatus ¶
GetPrevRebalanceInCompleteStatus returns rebalance status for consumer instance
func (*Consumer) GetRebalanceStatus ¶
GetRebalanceStatus returns rebalance status for consumer instance
func (*Consumer) HandleV8Worker ¶
HandleV8Worker sets up CPP V8 worker post its bootstrap
func (*Consumer) HostPortAddr ¶
HostPortAddr returns the HostPortAddr combination of current eventing node e.g. 127.0.0.1:25000
func (*Consumer) Index ¶
Index returns the index of consumer among all consumers designated for specific handler on an eventing node
func (*Consumer) InternalVbDistributionStats ¶
InternalVbDistributionStats returns internal state of vbucket ownership distribution on local eventing node
func (*Consumer) NotifyClusterChange ¶
func (c *Consumer) NotifyClusterChange()
NotifyClusterChange is called by the producer handle to notify each consumer instance about a StartTopologyChange rpc call from cbauth service.Manager
func (*Consumer) NotifyPrepareTopologyChange ¶
NotifyPrepareTopologyChange is called by producer instance to notify about updated list of node uuids
func (*Consumer) NotifyRebalanceStop ¶
func (c *Consumer) NotifyRebalanceStop()
NotifyRebalanceStop is called by producer to signal stopping of rebalance operation
func (*Consumer) NotifySettingsChange ¶
func (c *Consumer) NotifySettingsChange()
NotifySettingsChange signals consumer instance of settings update
func (*Consumer) NotifyWorker ¶
func (c *Consumer) NotifyWorker()
func (*Consumer) PauseConsumer ¶
func (c *Consumer) PauseConsumer()
This, being the very first consumer level function involved in pause, also holds the responsibility to try and refresh its metadata handle in case of an encryption level change
func (*Consumer) RebalanceStatus ¶
RebalanceStatus returns state of rebalance for consumer instance
func (*Consumer) RebalanceTaskProgress ¶
func (c *Consumer) RebalanceTaskProgress() *cm.RebalanceProgress
RebalanceTaskProgress reports progress to producer
func (*Consumer) RemoveSupervisorToken ¶
func (*Consumer) ResetBootstrapDone ¶
func (c *Consumer) ResetBootstrapDone()
ResetBootstrapDone to unset bootstrap flag
func (*Consumer) ResetCounters ¶
func (c *Consumer) ResetCounters()
func (*Consumer) ResolveHostname ¶
func (c *Consumer) ResolveHostname(instance common.DebuggerInstance) string
ResolveHostname returns the external IP address of this node. Looks through the alternate addresses as well. Returns the alt addr if found. In case of failure, returns 127.0.0.1
func (*Consumer) SendAssignedVbs ¶
func (c *Consumer) SendAssignedVbs()
func (*Consumer) SendDeleteCidMsg ¶
func (*Consumer) SetBootstrapStatus ¶
SetBootstrapStatus updates bootstrapping status for consumer instance
func (*Consumer) SetConnHandle ¶
SetConnHandle sets the tcp connection handle for CPP V8 worker
func (*Consumer) SetFeatureMatrix ¶
func (*Consumer) SetFeedbackConnHandle ¶
SetFeedbackConnHandle initialises the socket connection for the data channel from eventing-consumer
func (*Consumer) SetRebalanceStatus ¶
SetRebalanceStatus updates rebalance status for consumer instance
func (*Consumer) SignalBootstrapFinish ¶
func (c *Consumer) SignalBootstrapFinish()
SignalBootstrapFinish is leveraged by Eventing.Producer instance to know if corresponding Eventing.Consumer instance has finished bootstrap
func (*Consumer) SignalConnected ¶
func (c *Consumer) SignalConnected()
SignalConnected notifies consumer routine when CPP V8 worker has connected to tcp listener instance
func (*Consumer) SignalFeedbackConnected ¶
func (c *Consumer) SignalFeedbackConnected()
SignalFeedbackConnected notifies consumer routine when CPP V8 worker has connected to data channel
func (*Consumer) SignalStopDebugger ¶
SignalStopDebugger signals the C++ consumer to stop the debugger
func (*Consumer) SpawnCompilationWorker ¶
func (c *Consumer) SpawnCompilationWorker(appCode, appContent, appName, eventingPort string, handlerHeaders, handlerFooters []string) (*common.CompileStatus, error)
SpawnCompilationWorker brings up a CPP worker to compile the user supplied handler code
func (*Consumer) TimerDebugStats ¶
TimerDebugStats captures timer related stats to assist in debugging mismatches during rebalance
func (*Consumer) UpdateEncryptionLevel ¶
func (*Consumer) UpdateWorkerQueueMemCap ¶
UpdateWorkerQueueMemCap revises the memory cap for cpp worker, dcp and timer queues
func (*Consumer) VbDcpEventsRemainingToProcess ¶
VbDcpEventsRemainingToProcess reports cached dcp events remaining broken down to vbucket level
func (*Consumer) VbEventingNodeAssignMapUpdate ¶
VbEventingNodeAssignMapUpdate captures updated node to vbucket assignment
func (*Consumer) VbProcessingStats ¶
VbProcessingStats exposes consumer vb metadata to producer
func (*Consumer) VbSeqnoStats ¶
VbSeqnoStats returns seq no stats, which can be useful in figuring out missed events during rebalance
func (*Consumer) WorkerVbMapUpdate ¶
WorkerVbMapUpdate captures updated mapping of active consumers to vbuckets they should handle as per static planner
type DcpStatsLog ¶
type DcpStatsLog interface { AddDcpLog(vb uint16, key LogKey, value string) DeletePartition(vb uint16) }
func NewDcpStatsLog ¶
func NewDcpStatsLog(pollingInterval time.Duration, logSuffix string, stopChannel chan struct{}) DcpStatsLog
type DcpVbStats ¶
type DcpVbStats struct {
	CurrentState state `json:"state"`
	Ts string `json:"ts"`
	LastRequest string `json:"last_request"`
	StreamResponse map[string]*ResponseStat `json:"response,omitempty"`
	// contains filtered or unexported fields
}
func NewDcpVbStat ¶
func NewDcpVbStat() *DcpVbStats
func (*DcpVbStats) AddStat ¶
func (stat *DcpVbStats) AddStat(key LogKey, value string)
type OwnershipEntry ¶
type OwnershipEntry struct { AssignedWorker string `json:"assigned_worker"` CurrentVBOwner string `json:"current_vb_owner"` Operation string `json:"operation"` SeqNo uint64 `json:"seq_no"` Timestamp string `json:"timestamp"` }
OwnershipEntry captures the state of vbucket within the metadata blob
type ResponseStat ¶
func NewResponseStat ¶
func NewResponseStat() *ResponseStat