core

package
v0.6.0 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Apr 8, 2024 License: Apache-2.0 Imports: 15 Imported by: 1

Documentation

Index

Constants

This section is empty.

Variables

View Source
var (
	ErrRedisConfNotInitialized = fmt.Errorf("redis config is not initialized")
	ErrConsumerNotify          = fmt.Errorf("consumer exits with notify")
)

Functions

func Exit

func Exit()

func Initialize

func Initialize(internalPc *conf.InternalProcessorConfig, pc *models.ProcessorCallbacks) error

func MetricsClient added in v0.5.7

func MetricsClient() *utils.MetricsClient

func NewConsumerGroupHandler

func NewConsumerGroupHandler(logs *log.Logger, setupFunc func() error, consumeFunc func(record *models.Record) error,
	workerNotifyChannel <-chan string, workerReadyChannel chan<- struct{}) sarama.ConsumerGroupHandler

func PassToDefaultOutputTopic

func PassToDefaultOutputTopic(ctx context.Context, record *models.Record) error

func PassToOutputTopic

func PassToOutputTopic(ctx context.Context, name string, record *models.Record) error

func Run

func Run() error

func Work

func Work(ctx context.Context, consumerConfig *conf.ConsumerConfig, workerIndex int, processFunc models.ProcessCallback,
	errCh chan<- error, wg *sync.WaitGroup, workerNotifyChannel <-chan string, workerReadyChannel chan<- struct{})

Types

type Cron added in v0.5.1

type Cron struct {
	// contains filtered or unexported fields
}

func NewCron added in v0.5.1

func NewCron(tickInterval time.Duration, windowSize int64, w *Watermark, done <-chan bool) *Cron

type JoinWorker added in v0.5.1

type JoinWorker struct {
	// contains filtered or unexported fields
}

func NewJoinWorker added in v0.5.1

func NewJoinWorker(w *Watermark, s state.StateStore, expierTime int) *JoinWorker

func (*JoinWorker) JoinWorkerProcessCallback added in v0.5.1

func (j *JoinWorker) JoinWorkerProcessCallback(ctx context.Context, record *models.Record) error

type Producer

type Producer struct {
	// contains filtered or unexported fields
}

type Watermark added in v0.5.1

type Watermark struct {
	// contains filtered or unexported fields
}

func NewWatermark added in v0.5.1

func NewWatermark(windowSize int64) *Watermark

func (*Watermark) Advance added in v0.5.1

func (w *Watermark) Advance()

func (*Watermark) Get added in v0.5.1

func (w *Watermark) Get() int64

type WorkerMeta added in v0.5.1

type WorkerMeta struct {
	WorkerId   string
	TopicIndex int
	Alive      bool
}

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL