processors

package
v0.0.0-...-9acc2fa Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: May 4, 2024 | License: Apache-2.0 | Imports: 20 | Imported by: 0

Documentation

Index

Constants

This section is empty.

Variables

View Source
// Prometheus metric vectors exported by this package.
var (
	// CommitNeo4jRecordsCounts is a counter vector published as
	// "neo4j_commit_records_total". Each series is keyed by the labels
	// "worker", "status" and "namespace".
	CommitNeo4jRecordsCounts = prometheus.NewCounterVec(prometheus.CounterOpts{
		Name: "neo4j_commit_records_total",
		Help: "Total number of records committed to neo4j",
	}, []string{"worker", "status", "namespace"})
	// KafkaTopicsLag is a gauge vector published as
	// "kafka_consumer_group_lag". Each series is keyed by the labels
	// "topic" and "namespace".
	KafkaTopicsLag = prometheus.NewGaugeVec(prometheus.GaugeOpts{
		Name: "kafka_consumer_group_lag",
		Help: "Kafka consumer group lag per topic",
	}, []string{"topic", "namespace"})
)

Functions

func NewBulkBuffer

func NewBulkBuffer() *bulKBuffer

func NewKafkaProcessors

func NewKafkaProcessors(namespace string) map[string]*BulkProcessor

func Process

func Process(s *BulkProcessor, tenantID string, b []byte) error

func StartGetLagByTopic

func StartGetLagByTopic(ctx context.Context, kafkaBrokers []string, groupID string, kgoLogger kgo.Logger) error

Types

type BulkProcessor

type BulkProcessor struct {
	// contains filtered or unexported fields
}

func NewBulkProcessor

func NewBulkProcessor(name string, ns string, fn commitFn) *BulkProcessor

func NewBulkProcessorWithSize

func NewBulkProcessorWithSize(name string, ns string, fn commitFn, size int) *BulkProcessor

func (*BulkProcessor) Add

func (s *BulkProcessor) Add(b BulkRequest)

func (*BulkProcessor) BulkActions

func (s *BulkProcessor) BulkActions(bulkActions int) *BulkProcessor

func (*BulkProcessor) Close

func (p *BulkProcessor) Close() error

func (*BulkProcessor) Flush

func (p *BulkProcessor) Flush()

func (*BulkProcessor) FlushInterval

func (s *BulkProcessor) FlushInterval(interval time.Duration) *BulkProcessor

func (*BulkProcessor) Start

func (p *BulkProcessor) Start(ctx context.Context) error

func (*BulkProcessor) Stop

func (p *BulkProcessor) Stop() error

func (*BulkProcessor) Workers

func (s *BulkProcessor) Workers(num int) *BulkProcessor

type BulkRequest

// BulkRequest pairs a namespace with a raw byte payload. It is the value
// accepted by (*BulkProcessor).Add.
type BulkRequest struct {
	// NameSpace is the namespace associated with Data.
	// NOTE(review): presumably the tenant namespace — confirm against callers.
	NameSpace string
	// Data is the raw payload bytes; the encoding is not visible from here.
	Data      []byte
}

func NewBulkRequest

func NewBulkRequest(namespace string, data []byte) BulkRequest

type Ingester

type Ingester struct {
	// contains filtered or unexported fields
}

func NewIngester

func NewIngester(ns directory.NamespaceID, cfg wtils.Config, cancel context.CancelFunc) (Ingester, error)

func (*Ingester) AddAuditLog

func (i *Ingester) AddAuditLog(record *kgo.Record)

func (*Ingester) Start

func (i *Ingester) Start(ctx context.Context)

func (*Ingester) StartAuditLogProcessor

func (i *Ingester) StartAuditLogProcessor(ctx context.Context) error

func (*Ingester) StartKafkaConsumers

func (i *Ingester) StartKafkaConsumers(ctx context.Context, kgoLogger kgo.Logger) error

func (*Ingester) Stop

func (i *Ingester) Stop()

type Mappable

// Mappable is implemented by types that can represent themselves as a
// string-keyed map.
type Mappable interface {
	// ToMap returns the receiver's contents as a map[string]interface{}.
	ToMap() map[string]interface{}
}

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL