glog2kafka

package module
v0.0.0-...-8640c57 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Nov 12, 2022 License: MIT Imports: 3 Imported by: 0

README

glog2kafka

Exporting logs to kafka

package main

import (
	"github.com/Shopify/sarama"
	log "github.com/ml444/glog"
	"github.com/ml444/glog/config"
	"github.com/ml444/glog2kafka"
)

func main() {
	cfg := sarama.NewConfig()
	endpoint, err := glog2kafka.NewKafkaEndpoint([]string{""}, "topic_name", cfg)
	if err != nil {
		return
	}
	err = log.InitLog(config.SetStreamer2Report(endpoint))
	if err != nil {
		return
	}
	defer log.Exit()

	log.Report("hello glog")
}

Modified sarama.Config

package main

import (
	"time"

	"github.com/Shopify/sarama"
	log "github.com/ml444/glog"
	"github.com/ml444/glog/config"
	"github.com/ml444/glog2kafka"
)

func GetKafkaConfig() *sarama.Config {
	cfg := sarama.NewConfig()
	cfg.ClientID = "glog2kafka"
	cfg.ChannelBufferSize = 1024

	//cfg.Producer.MaxMessageBytes = 1024 * 1024 * 5
	cfg.Producer.Return.Errors = true
	cfg.Producer.Flush.Bytes = 1024 * 1024 * 2
	cfg.Producer.Flush.Messages = 100
	cfg.Producer.Flush.MaxMessages = 1024
	cfg.Producer.Flush.Frequency = time.Millisecond * 100
	cfg.Producer.Retry.Max = 3
	return cfg
}

func main() {
	cfg := GetKafkaConfig()
	endpoint, err := glog2kafka.NewKafkaEndpoint([]string{""}, "topic_name", cfg)
	if err != nil {
		return
	}
	err = log.InitLog(config.SetStreamer2Report(endpoint))
	if err != nil {
		return
	}
	defer log.Exit()

	log.Report("hello glog")
}

Documentation

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

This section is empty.

Types

type KafkaEndPoint

type KafkaEndPoint struct {
	Cfg *sarama.Config
	// contains filtered or unexported fields
}

func NewKafkaEndpoint

func NewKafkaEndpoint(addresses []string, topic string, cfg *sarama.Config) (*KafkaEndPoint, error)

func (*KafkaEndPoint) Close

func (p *KafkaEndPoint) Close()

func (*KafkaEndPoint) Init

func (p *KafkaEndPoint) Init() error

func (*KafkaEndPoint) Ping

func (p *KafkaEndPoint) Ping() error

func (*KafkaEndPoint) Write

func (p *KafkaEndPoint) Write(msgByte []byte) (n int, err error)

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL