
core-processor

The project provides a framework for consuming Kafka messages.

It aims to simplify the logic of consuming and processing data, and to provide a configurable, efficient pipeline for doing so.

With core and core-processor, we can do this:

package main

import (
    "context"
    "encoding/json"
    "fmt"
    
    "github.com/DoNewsCode/core"
    processor "github.com/DoNewsCode/core-processor"
    "github.com/DoNewsCode/core/di"
    "github.com/DoNewsCode/core/otkafka"
    "github.com/segmentio/kafka-go"
)

type Handler struct{}

func NewHandlerOut() processor.Out {
    return processor.NewOut(
        &Handler{},
    )
}

type Data struct {
    ID   int    `json:"id"`
    Name string `json:"name"`
}

func (h *Handler) Info() *processor.Info {
    return &processor.Info{
        Name:      "default", // the reader name is default
        BatchSize: 3,
    }
}

func (h *Handler) Handle(ctx context.Context, msg *kafka.Message) (interface{}, error) {
    e := &Data{}
    if err := json.Unmarshal(msg.Value, e); err != nil {
        return nil, err
    }
    return e, nil
}

func (h *Handler) Batch(ctx context.Context, data []interface{}) error {
    for _, e := range data {
        fmt.Println(e.(*Data))
    }
    return nil
}

func main() {
    // prepare config and dependencies
    c := core.New(
        core.WithInline("kafka.reader.default.brokers", []string{"127.0.0.1:9092"}),
        core.WithInline("kafka.reader.default.topic", "test"),
        core.WithInline("kafka.reader.default.groupID", "test"),
        core.WithInline("kafka.reader.default.startOffset", kafka.FirstOffset),
    )
    defer c.Shutdown()
    c.ProvideEssentials()
    c.Provide(otkafka.Providers())
    c.AddModuleFunc(processor.New)

    // provide your handlers
    c.Provide(di.Deps{
        NewHandlerOut,
    })
    
    // start server
    err := c.Serve(context.Background())
    if err != nil {
        panic(err)
    }
}

Beyond the setup above, we only need to write handlers and provide their constructors to core. processor.Info lets us flexibly tune how the processor runs, as the sketch below illustrates.
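For example, a hypothetical TunedHandler (the type name and every value below are illustrative, not part of the package; the snippet also needs the time import) could scale each stage independently:

func (h *TunedHandler) Info() *processor.Info {
    return &processor.Info{
        Name:              "default",        // reader name from the otkafka config
        ReadWorker:        2,                // goroutines fetching from Kafka
        HandleWorker:      4,                // goroutines running Handle
        BatchWorker:       1,                // keep at 1 if batch order matters
        BatchSize:         100,              // flush Batch after 100 handled items
        ChanSize:          200,              // buffer between the stages
        AutoBatchInterval: 10 * time.Second, // flush a partial batch on this interval
    }
}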

Have fun!

Documentation

Constants

This section is empty.

Variables

This section is empty.

Functions

func IsFatalErr added in v0.2.2

func IsFatalErr(err error) bool

func NewFatalErr added in v0.2.2

func NewFatalErr(err error) error

NewFatalErr wraps the original err to create a FatalErr, which signals the processor to exit.
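IsFatalErr reports whether err is such a fatal error. A minimal sketch of a handler that treats undecodable payloads as fatal, so the processor exits instead of skipping them (the condition itself is illustrative):

	func (h *Handler) Handle(ctx context.Context, msg *kafka.Message) (interface{}, error) {
		e := &Data{}
		if err := json.Unmarshal(msg.Value, e); err != nil {
			// Wrap the error so the processor treats it as fatal and exits.
			return nil, processor.NewFatalErr(err)
		}
		return e, nil
	}

Elsewhere, processor.IsFatalErr(err) distinguishes these fatal errors from ordinary ones.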

Types

type BatchFunc

type BatchFunc func(ctx context.Context, data []interface{}) error

BatchFunc is the function type of BatchHandler.Batch.

type BatchHandler

type BatchHandler interface {
	Handler
	// Batch processing the results returned by Handler.Handle.
	Batch(ctx context.Context, data []interface{}) error
}

BatchHandler has one more method than Handler: Batch.

type FatalErr added in v0.2.2

type FatalErr interface {
	Fatal() bool
}

FatalErr marks an error as fatal; raise it to make the processor exit.

type HandleFunc

type HandleFunc func(ctx context.Context, msg *kafka.Message) (interface{}, error)

HandleFunc is the function type of Handler.Handle.
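Both function types mirror the corresponding interface methods, so a plain function or closure can satisfy them; a small sketch (the bodies are illustrative):

	var handle processor.HandleFunc = func(ctx context.Context, msg *kafka.Message) (interface{}, error) {
		return string(msg.Value), nil // pass the raw payload through as a string
	}

	var batch processor.BatchFunc = func(ctx context.Context, data []interface{}) error {
		fmt.Println(len(data)) // e.g. report the batch size
		return nil
	}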

type Handler

type Handler interface {
	// Info set the topic name and some config.
	Info() *Info
	// Handle for *kafka.Message.
	Handle(ctx context.Context, msg *kafka.Message) (interface{}, error)
}

Handler includes only the Info and Handle functions.

type Info

type Info struct {
	// used to get reader from otkafka.ReaderMaker.
	// default: "default"
	Name string
	// reader workers count.
	// default: 1
	ReadWorker int
	// batch workers count.
	// default: 1
	BatchWorker int
	// data size for batch processing.
	// default: 1
	BatchSize int
	// handler workers count.
	HandleWorker int
	// the size of the data channel.
	// default: 100
	ChanSize int
	// run the batch function automatically at the specified interval,
	// so a partial batch is still processed when BatchSize is not reached.
	// default: 30s
	AutoBatchInterval time.Duration
}

Info is the configuration info of a BatchHandler.

Note:

If processing order matters, make sure every worker count is one; multiple goroutines cannot guarantee the order in which data is processed (see the sketch below).
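A sketch of an Info pinned for ordered processing (OrderedHandler and the concrete values are illustrative):

	func (h *OrderedHandler) Info() *processor.Info {
		return &processor.Info{
			Name:         "default",
			ReadWorker:   1, // one reader: messages arrive in offset order
			HandleWorker: 1, // one handler: results keep that order
			BatchWorker:  1, // one batcher: batches are built and committed in order
			BatchSize:    10,
		}
	}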

type Out

type Out struct {
	di.Out

	Handlers []Handler `group:"ProcessorHandler,flatten"`
}

Out provides Handlers to in.Handlers.

func NewOut

func NewOut(handlers ...Handler) Out

NewOut creates an Out that provides Handlers to in.Handlers.

Usage:
	func newHandlerA(logger log.Logger) processor.Out {
		return processor.NewOut(
			&HandlerA{logger: logger},
		)
	}
Or
	func newHandlers(logger log.Logger) processor.Out {
		return processor.NewOut(
			&HandlerA{logger: logger},
			&HandlerB{logger: logger},
		)
	}

type Processor

type Processor struct {
	// contains filtered or unexported fields
}

Processor dispatches Handlers.

func New

func New(i in) (*Processor, error)

New creates the *Processor module.

func (*Processor) ProvideRunGroup

func (e *Processor) ProvideRunGroup(group *run.Group)

ProvideRunGroup runs the workers:

  1. Fetch messages from the *kafka.Reader.
  2. Handle each message with Handler.Handle.
  3. Batch the results with BatchHandler.Batch; if the batch succeeds, commit the messages (see the sketch below).
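Conceptually the stages form a channel pipeline. The following is a simplified, hypothetical sketch of that flow, not the package's actual implementation; it assumes reader, handler, batcher, and info are in scope, and it omits worker pools, fatal-error handling, the auto-batch timer, and the bookkeeping that maps batched results back to messages for the commit:

	msgs := make(chan kafka.Message, info.ChanSize)
	results := make(chan interface{}, info.ChanSize)

	go func() { // 1. fetch from the reader
		defer close(msgs)
		for {
			m, err := reader.FetchMessage(ctx)
			if err != nil {
				return
			}
			msgs <- m
		}
	}()

	go func() { // 2. handle each message
		defer close(results)
		for m := range msgs {
			if v, err := handler.Handle(ctx, &m); err == nil {
				results <- v
			}
		}
	}()

	go func() { // 3. batch, then commit on success
		buf := make([]interface{}, 0, info.BatchSize)
		for v := range results {
			buf = append(buf, v)
			if len(buf) >= info.BatchSize {
				if err := batcher.Batch(ctx, buf); err == nil {
					// reader.CommitMessages(ctx, ...) for the messages in this batch
				}
				buf = buf[:0]
			}
		}
	}()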
