oni

package module
v1.0.1 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jul 6, 2023 License: MIT Imports: 10 Imported by: 0

README

Oni Kafka Framework

coffeehaze codecov Sourcegraph Go Report Card GoDoc License

Oni is a Kafka framework written in Go (Golang) that makes it easy to consume and produce Kafka messages through a robust API wrapper around kafka-go (thanks to segmentio). Its usage is very similar to the Gin / Echo web frameworks.

Oni Mask art by @inksyndromeartwork

Contents

Installation
  1. Make sure Go is installed on your machine
go version
  1. Get oni and kafka-go
go get -u github.com/coffeehaze/oni
go get -u github.com/segmentio/kafka-go
  1. Import oni
import "github.com/coffeehaze/oni"
Quick Start
  1. Create a package named model, add a foo.go file to it, and place this code in the file
package model

// Foo is the example payload used throughout this guide: the producer marshals
// it to JSON and the consumer binds the received message value back into it.
type Foo struct {
	// FooContent is encoded under the JSON key "foo_content".
	FooContent string `json:"foo_content"`
}
  1. Create a package named consumer, add a main.go file to it, and place this code in the file
package main

import (
	"context"
	"fmt"
	"github.com/coffeehaze/oni"
	"github.com/your/projectname/model"
	"github.com/segmentio/kafka-go"
	"syscall"
	"time"
)

// main registers a handler for the "create.foo" event key on a consumer of the
// "foos" topic and hands that consumer to an oni.Runner configured with a
// 15-second timeout and the SIGINT, SIGTERM and SIGHUP signals.
func main() {
	// use a cancellable context and cancel it when main returns; deferring
	// ctx.Done() would only fetch a channel and never cancel anything
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	// initialize consumer
	stream := oni.NewStream(kafka.ReaderConfig{
		Brokers: []string{
			"localhost:8097", // kafka broker 1
			"localhost:8098", // kafka broker 2
			"localhost:8099", // kafka broker 3; listing a single broker is also fine
		},
		Topic:   "foos", // topic you want to listen to
		GroupID: "consumer-group-foos",
	})
	foosConsumer := oni.NewConsumer(stream)
	foosConsumer.Handler(
		"create.foo", // event key you want to map to a specific handler function
		func(ctx oni.Context) error {
			var foo model.Foo
			// bind message value to struct
			if err := ctx.ShouldBindJSON(&foo); err != nil {
				return err
			}
			// %+v prints the struct together with its field names
			fmt.Printf("key=%s value=%s foo=%+v\n", ctx.KeyString(), ctx.ValueString(), foo)
			return nil
		},
	)

	// initialize oni runner
	// to help start and gracefully shut down all producers and consumers you defined
	oniRunner := oni.Runner{
		Context: ctx,
		Timeout: 15 * time.Second,
		Syscall: oni.SyscallOpt(
			syscall.SIGINT,
			syscall.SIGTERM,
			syscall.SIGHUP,
		),
		Consumers: oni.ConsumerOpt(foosConsumer),
	}
	oniRunner.Start()
}

  1. Create a package named producer, add a main.go file to it, and place this code in the file
package main

import (
	"context"
	"encoding/json"
	"fmt"
	"github.com/your/projectname/model"
	"github.com/segmentio/kafka-go"
	"time"
)

// main publishes a single model.Foo, encoded as JSON, to the "foos" topic
// under the "create.foo" event key and reports whether the write succeeded.
func main() {
	foos := &kafka.Writer{
		Addr:  kafka.TCP("localhost:8097"), // kafka broker
		Topic: "foos",                      // target topic you want to send to
	}
	// close the writer before exiting (see kafka-go Writer.Close) and surface
	// any error it reports
	defer func() {
		if err := foos.Close(); err != nil {
			fmt.Println("closing writer:", err)
		}
	}()

	// create json object using json.Marshal
	fooObj := model.Foo{
		FooContent: fmt.Sprintf("This is new foo %d", time.Now().Unix()),
	}
	fooByte, err := json.Marshal(fooObj)
	if err != nil {
		fmt.Println("marshaling foo:", err)
		return
	}

	err = foos.WriteMessages(context.Background(), kafka.Message{
		Key:   []byte("create.foo"), // target event key you want to send to
		Value: fooByte,              // fooObj marshal result as the value
	})
	if err != nil {
		fmt.Println("writing message:", err)
		return
	}
	fmt.Println("message sent")
}

  1. Run consumer/main.go, then run producer/main.go separately
API Examples
Stream
  • oni.NewStream(cfg kafka.ReaderConfig)

    // example for oni.NewStream(cfg kafka.ReaderConfig)
    stream := oni.NewStream(kafka.ReaderConfig{
        Brokers: []string{"localhost:8097"},
        Topic: "foos",
        GroupID: "consumer-group-foos",
    })
    
  • end

Consumer
  • oni.NewConsumer(stream *oni.Stream) IConsumer

    // example for oni.NewConsumer(stream *oni.Stream)
    // default consume mode is implicit
    consumer := oni.NewConsumer(stream)
    
  • IConsumer.ErrorHandler(callbackFunc ErrorCallbackFunc)

    // set global error handler which will be invoked when oni.HandlerFunc returns error
    // this function should be called before handler creation and only called once 
    consumer.ErrorHandler(func (err error) {
        if err != nil {
            // do error handling logic such as logging
            // or handle special error cases
        }
    })
    
  • IConsumer.Implicit()

    // set consume mode to implicit which means every message
    // received by *oni.Stream will be automatically ack or committed
    // this function should be called before handler creation
    consumer.Implicit()
    
  • IConsumer.Explicit()

    // set consume mode to explicit which means every message 
    // received by *oni.Stream will be ack or committed manually using Context.Ack()
    // this function should be called before handler creation
    consumer.Explicit()
    
  • IConsumer.Group(keyGroup string) *Consumer

    // create a new consumer group that shares an event key prefix. for example, the prefix
    // `event.notification.blast` can be followed by suffixes such as `email.channel` and
    // `sms.channel`: register the handlers with the keys `email.channel` and `sms.channel`,
    // and the actual event key for each handler will look like this
    // `event.notification.blast.email.channel`
    // `event.notification.blast.sms.channel`
    // the new group inherits the behavior of the previous *oni.Stream
    notificationBlastEvent := consumer.Group("event.notification.blast")
    notificationBlastEvent.Handler("email.channel", func (ctx oni.Context) error {})
    notificationBlastEvent.Handler("sms.channel", func (ctx oni.Context) error {})
    
  • IConsumer.Handler(key string, handlerFunc ...HandlerFunc)

    // create handler function for specific key event, for this example is `event.send.email`
    // message that produced to `event.send.email` key will be received by handler function
    // defined in this handler creation and message value and details will be propagated through
    // oni.Context
    consumer.Handler("event.send.email", func (ctx oni.Context) error {
        // put business logic here
        return nil
    })
    
  • IConsumer.Producer(name string, producerFunc ProducerFunc)

    // register a producer that can be accessed by its name through oni.Context functions,
    // so business logic can look it up and use it to send messages to the targeted topic.
    // multiple producers can be defined; each one is only created when it is requested
    // through an oni.Context function and is closed after the message has been sent
    consumer.Producer("notification_producer", func() *kafka.Writer {
        // you can define using *kafka.Writer struct or you can use templates from oni
        // by returning this oni.BasicWriter(addr net.Addr, topic string) function.
        return &kafka.Writer{
            Addr:  kafka.TCP("localhost:8097"),
            Topic: "notification",
        }
    })
    
  • end

Context
  • Context.ShouldBindJSON(v interface{}) error

    func (ctx oni.Context) error {
        var foo model.Foo
    
        // binding received message value into model.Foo struct
        err := ctx.ShouldBindJSON(&foo)
        if err != nil {
            return err
        }
    
        return nil
    }
    
  • Context.ShouldRetryWith(producerFuncName string) error

    func (ctx oni.Context) error {
        // send back message to `retries` topic to be re-processed in side `main` topic  
        err := ctx.ShouldRetryWith("producer_name")
        if err != nil {
            return err
        }
    
        return nil
    }
    
  • Context.ShouldErrorWith(producerFuncName string) error

    func (ctx oni.Context) error {
        // send back message to `failures` topic to be marked as invalid request format 
        // or any system failures possible
        err := ctx.ShouldErrorWith("producer_name")
        if err != nil {
            return err
        }
    
        return nil
    }
    
  • Context.ShouldReturnWith(producerFuncName string) error

    func (ctx oni.Context) error {
        // send back message from `retries` topic to `main` topic to be re-processed
        // this function should be called only inside `retries` topic oni.HandlerFunc 
        err := ctx.ShouldReturnWith("producer_name")
        if err != nil {
            return err
        }
    
        return nil
    }
    
  • Context.Ack() error

    func (ctx oni.Context) error {
        // acknowledge (commit) the message; this function is only valid in explicit
        // consume mode, because explicit mode doesn't automatically commit messages
        err := ctx.Ack()
        if err != nil {
            return err
        }
    
        return nil
    }
    
  • Context.ValueBytes() []byte

    func (ctx oni.Context) error {
        // returns message value as []byte
        ctx.ValueBytes()
        return nil
    }
    
  • Context.ValueString() string

    func (ctx oni.Context) error {
        // returns message value as string
        ctx.ValueString()
        return nil
    }
    
  • Context.KeyBytes() []byte

    func (ctx oni.Context) error {
        // returns message key as []byte
        ctx.KeyBytes()
        return nil
    }
    
  • Context.KeyString() string

    func (ctx oni.Context) error {
        // returns message key as string
        ctx.KeyString()
        return nil
    }
    
  • Context.Message() kafka.Message

    func (ctx oni.Context) error {
        // returns all message details
        ctx.Message()
        return nil
    }
    
  • Context.ReaderStats() kafka.ReaderStats

    func (ctx oni.Context) error {
        // returns all reader stats
        ctx.ReaderStats()
        return nil
    }
    
  • Context.ReaderConfig() kafka.ReaderConfig

    func (ctx oni.Context) error {
        // returns all reader configurations
        ctx.ReaderConfig()
        return nil
    }
    
  • Context.GetProducer(producerFuncName string) *kafka.Writer

    func (ctx oni.Context) error {
        // find a producer by the name it was registered under with
        // IConsumer.Producer(name string, producerFunc ProducerFunc)
        // so it can be used to send messages to the topic you want
        ctx.GetProducer("producer_name")
        return nil
    }
    
  • Context.OuterContext() context.Context

    func (ctx oni.Context) error {
        // return outer context that signed in the first creation of consumer
        // can be used for getting key-value data inside it
        ctx.OuterContext()
        return nil
    }
    
  • Context.FindKey(key string) interface{}

    func (ctx oni.Context) error {
        // find key inside outer context and return it as interface{}
        ctx.FindKey("context_key_name")
        return nil
    }
    
  • Context.CreateKeyVal(key string, val interface{})

    func (ctx oni.Context) error {
        // create key with its value into outer context
        ctx.CreateKeyVal("key_name", "this is value put whatever you want inside here")
        return nil
    }
    
  • end

Documentation

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

func BasicWriter

func BasicWriter(addr net.Addr, topic string) *kafka.Writer

BasicWriter returns a writer template that sends messages directly to the given topic, using the default round-robin balancer and required acks set to 0, which means the writer does not wait for acknowledgement. Highly recommended for sending basic messages to a topic.

func BatchTimeoutWriter

func BatchTimeoutWriter(addr net.Addr, topic string, duration time.Duration) *kafka.Writer

BatchTimeoutWriter returns a writer template whose messages are consumed only after the defined duration, using the default round-robin balancer and required acks set to 0, which means the writer does not wait for acknowledgement. Highly recommended for sending messages to a retry topic, to avoid a fast ping-pong effect between the main and retry topics.

func SyscallOpt

func SyscallOpt(syscall ...os.Signal) []os.Signal

Types

type Consumer

// Consumer is created by NewConsumer from a *Stream; handlers, an error
// handler, named producers and key groups are registered through its methods.
type Consumer struct {
	// contains filtered or unexported fields
}

func ConsumerOpt

func ConsumerOpt(consumers ...*Consumer) []*Consumer

func NewConsumer

func NewConsumer(stream *Stream) *Consumer

func (*Consumer) ErrorHandler

func (c *Consumer) ErrorHandler(callbackFunc ErrorCallbackFunc)

func (*Consumer) Explicit

func (c *Consumer) Explicit()

func (*Consumer) Group

func (c *Consumer) Group(keyGroup string) *Consumer

func (*Consumer) Handler

func (c *Consumer) Handler(key string, handlerFunc ...HandlerFunc)

func (*Consumer) Implicit

func (c *Consumer) Implicit()

func (*Consumer) Producer

func (c *Consumer) Producer(name string, producerFunc ProducerFunc)

type Context

// Context is the argument passed to every HandlerFunc. It gives access to the
// received message, the reader, registered producers and the outer context.
type Context interface {
	// ShouldBindJSON binds the received message value into v.
	ShouldBindJSON(v interface{}) error
	// ShouldRetryWith sends the message to the `retries` topic so it can be
	// re-processed; producerFuncName names the producer to use.
	ShouldRetryWith(producerFuncName string) error
	// ShouldErrorWith sends the message to the `failures` topic, marking it as
	// an invalid request format or a system failure.
	ShouldErrorWith(producerFuncName string) error
	// ShouldReturnWith sends the message from the `retries` topic back to the
	// `main` topic to be re-processed; it should only be called inside a
	// HandlerFunc of the `retries` topic.
	ShouldReturnWith(producerFuncName string) error

	// Ack acknowledges (commits) the message; only valid in explicit consume
	// mode, which does not commit messages automatically.
	Ack() error
	// ValueBytes returns the message value as []byte.
	ValueBytes() []byte
	// ValueString returns the message value as string.
	ValueString() string
	// KeyBytes returns the message key as []byte.
	KeyBytes() []byte
	// KeyString returns the message key as string.
	KeyString() string

	// Message returns all message details.
	Message() kafka.Message
	// ReaderStats returns the reader stats.
	ReaderStats() kafka.ReaderStats
	// ReaderConfig returns the reader configuration.
	ReaderConfig() kafka.ReaderConfig
	// GetProducer finds a producer by the name it was registered under with
	// IConsumer.Producer.
	GetProducer(producerFuncName string) *kafka.Writer

	// OuterContext returns the outer context of the consumer; it can be used
	// to read key-value data stored inside it.
	OuterContext() context.Context
	// FindKey looks up key inside the outer context and returns its value.
	FindKey(key string) interface{}
	// CreateKeyVal stores val under key in the outer context.
	CreateKeyVal(key string, val interface{})
}

type ErrorCallbackFunc

// ErrorCallbackFunc is the callback type accepted by Consumer.ErrorHandler; it
// is invoked with the error returned by a HandlerFunc.
type ErrorCallbackFunc func(err error)

type HandlerFunc

// HandlerFunc handles a message received for a registered event key; the
// message and its details are available through ctx.
type HandlerFunc func(ctx Context) error

type IConsumer

// IConsumer lists the consumer operations: registering handlers, an error
// handler and named producers, creating key groups, and choosing the consume
// mode.
type IConsumer interface {
	// Handler registers handlerFunc for messages produced with the given event key.
	Handler(key string, handlerFunc ...HandlerFunc)
	// ErrorHandler sets the global callback invoked when a HandlerFunc returns
	// an error; it should be called once, before handlers are created.
	ErrorHandler(callbackFunc ErrorCallbackFunc)
	// Producer registers a producer under name so it can be reached through
	// Context functions.
	Producer(name string, producerFunc ProducerFunc)
	// Group creates a consumer whose handler keys are prefixed with keyGroup.
	Group(keyGroup string) *Consumer

	// Explicit selects explicit consume mode: messages are committed manually
	// with Context.Ack. Call it before creating handlers.
	Explicit()
	// Implicit selects implicit consume mode (the default): received messages
	// are committed automatically. Call it before creating handlers.
	Implicit()
	// contains filtered or unexported methods
}

type IStream

// IStream declares no exported methods.
type IStream interface {
	// contains filtered or unexported methods
}

type ProducerFunc

// ProducerFunc builds the *kafka.Writer for a producer registered with
// Consumer.Producer.
type ProducerFunc func() *kafka.Writer

type Runner

// Runner holds the settings used by Start; the README describes it as the
// helper that starts and gracefully shuts down the producers and consumers
// you defined.
type Runner struct {
	// Context is the context supplied by the caller.
	Context   context.Context
	// NOTE(review): presumably the graceful-shutdown deadline — confirm.
	Timeout   time.Duration
	// Syscall is the list of OS signals, typically built with SyscallOpt.
	// NOTE(review): presumably these trigger the shutdown — confirm.
	Syscall   []os.Signal
	// Consumers is the list of consumers, typically built with ConsumerOpt.
	Consumers []*Consumer
}

func (*Runner) Start

func (r *Runner) Start()

type Stream

// Stream is created by NewStream from a kafka.ReaderConfig and is passed to
// NewConsumer.
type Stream struct {
	// contains filtered or unexported fields
}

func NewStream

func NewStream(config kafka.ReaderConfig) *Stream

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL