server

package
v0.0.0-...-c9cb8e9 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jun 7, 2023 License: MIT Imports: 23 Imported by: 0

Documentation

Index

Constants

View Source
// Option names for the kafka ext fields.
// NOTE(review): presumably these are the values accepted in
// ServerConfig.ExtFields (yaml "ext_fields") — confirm against the loader.
const (
	EXT_IP              = "ip"
	EXT_UA              = "ua"
	EXT_REQUEST_ID      = "rid"
	EXT_COOKIE          = "ck"
	EXT_EVENT_TIMESTAMP = "ts"
)

options for kafka ext fields

View Source
const (
	// KafkaBatchSize is a batch size of 32 (1 << 5).
	// NOTE(review): presumably the number of kafka messages handled per
	// batch — confirm against its users.
	KafkaBatchSize = 1 << 5
)

Variables

View Source
// KafkaWriter is the package-level kafka writer.
// NOTE(review): presumably set up by InitKafka — confirm; nil until then.
var KafkaWriter *kafka.Writer

Functions

func BufMsg

func BufMsg(msgs ...*kafka.Message)

func CacheMsg

func CacheMsg(ctx context.Context)

CacheMsg caches msgs to local files.

func GenMessage

func GenMessage(topic string, data *map[string]interface{}) (*kafka.Message, error)

func InitKafka

func InitKafka(ctx context.Context)

func MustLoadConfig

func MustLoadConfig(file *string)

func ResendMsg

func ResendMsg(ctx context.Context)

ResendMsg resends cached msgs to kafka

func RushStageData

func RushStageData(ctx context.Context)

RushStageData rushes to save data to a local file before the process dies.

func SendMsg

func SendMsg(ctx context.Context)

SendMsg sends msgs to kafka

func Sentinel

func Sentinel(ctx context.Context)

Sentinel monitors kafka status and provides advice.

Types

type Config

// Config is the top-level configuration. Each field maps to one YAML
// section: "server", "kafka", "data" and "log".
type Config struct {
	Server ServerConfig  `yaml:"server"`
	Kafka  KafkaConfig   `yaml:"kafka"`
	Data   DataConfig    `yaml:"data"`
	Log    TLoggerConfig `yaml:"log"`
}
// GlbConfig is the package-level Config value.
// NOTE(review): presumably populated by MustLoadConfig — confirm.
var GlbConfig Config

type DataConfig

// DataConfig holds the settings under the "data" YAML section.
type DataConfig struct {
	DataDir string `yaml:"data_dir"` // cache data directory
}

type Direction

// Direction is an int8-based type.
// NOTE(review): its defined values and meaning are not visible here — document
// them next to the constants that use this type.
type Direction int8

type FieldsSetter

// FieldsSetter is a function that returns a slice of zap fields for the given
// context. Values of this type are accepted by TLogger.AddFieldsSetter.
type FieldsSetter func(ctx context.Context) []zap.Field

type FileCache

// FileCache is a cache type that embeds a sync.Mutex.
//
// NOTE(review): FileCache also declares its own Lock(tag string) and
// Unlock() error methods, which shadow the Lock/Unlock promoted from the
// embedded sync.Mutex; use c.Mutex.Lock()/c.Mutex.Unlock() to reach the mutex
// itself.
type FileCache struct {
	sync.Mutex
	// contains filtered or unexported fields
}

func NewCache

func NewCache() (*FileCache, error)

func (*FileCache) Clear

func (c *FileCache) Clear() error

func (*FileCache) FlushCache

func (c *FileCache) FlushCache(maxSize int) error

FlushCache flushes cache data into the cache file.

func (*FileCache) Lock

func (c *FileCache) Lock(tag string)

Lock locks the cache file.

func (*FileCache) Unlock

func (c *FileCache) Unlock() error

Unlock unlocks the cache file.

func (*FileCache) WriteCache

func (c *FileCache) WriteCache(data []byte)

WriteCache continuously writes cache data.

func (*FileCache) WriteToKafka

func (c *FileCache) WriteToKafka(ctx context.Context) error

WriteToKafka reads cache data, decodes the kafka msgs and writes them to kafka.

type GrpcConfig

// GrpcConfig holds the settings under the "grpc" key of ServerConfig.
type GrpcConfig struct {
	Addr string `yaml:"addr"` // grpc server listening address, e.g. 0.0.0.0:8000
}

type HttpConfig

// HttpConfig holds the settings under the "http" key of ServerConfig.
type HttpConfig struct {
	Addr string `yaml:"addr"` // http server listening address, e.g. 0.0.0.0:8000
}

type KafkaConfig

// KafkaConfig holds the settings under the "kafka" YAML section.
type KafkaConfig struct {
	Broker       []string `yaml:"broker"`        // broker addresses
	PartitionCnt int      `yaml:"partition_cnt"` // topic partition count
	AckPolicy    int      `yaml:"ack_policy"`    // acknowledgement policy for writing msg successfully
	WriteTimeout int      `yaml:"write_timeout"` // write timeout, in milliseconds
}

type LogConfig

// LogConfig holds a log directory and a log level.
//
// NOTE(review): Config.Log is declared as TLoggerConfig, not LogConfig, so
// this type is not referenced by Config — check whether it is still used.
type LogConfig struct {
	LogDir   string `yaml:"log_dir"`
	LogLevel string `yaml:"log_level"`
}

type ServerConfig

// ServerConfig holds the settings under the "server" YAML section.
type ServerConfig struct {
	Http                 HttpConfig `yaml:"http"`
	Grpc                 GrpcConfig `yaml:"grpc"`
	ExtFields            []string   `yaml:"ext_fields"`      // ext fields to patch into kafka data
	TopicFile            string     `yaml:"topic_whitelist"` // topic whitelist file
	// EnableTopicWhitelist and TopicWhitelist carry no yaml tag.
	// NOTE(review): presumably they are derived from TopicFile after loading
	// rather than read from the config file — confirm.
	EnableTopicWhitelist bool
	TopicWhitelist       *goset.Set
}

type TLogger

// TLogger embeds a *zap.Logger, so the zap logger's methods are promoted
// onto it, and adds context-aware helpers (Debugf, Infof, Warnf, Errorf,
// Panicf, Fatalf) plus AddFieldsSetter.
type TLogger struct {
	*zap.Logger
	// contains filtered or unexported fields
}
// ServerLogger is the package-level logger.
// NOTE(review): it is a nil pointer until assigned — presumably from
// NewLogger during startup; confirm.
var ServerLogger *TLogger

func NewLogger

func NewLogger(cfg *TLoggerConfig) (*TLogger, error)

func (*TLogger) AddFieldsSetter

func (l *TLogger) AddFieldsSetter(s FieldsSetter)

func (*TLogger) Debugf

func (l *TLogger) Debugf(ctx context.Context, msg string, v ...interface{})

func (*TLogger) Errorf

func (l *TLogger) Errorf(ctx context.Context, err error, msg string, v ...interface{})

func (*TLogger) Fatalf

func (l *TLogger) Fatalf(ctx context.Context, err error, msg string, v ...interface{})

func (*TLogger) Infof

func (l *TLogger) Infof(ctx context.Context, msg string, v ...interface{})

func (*TLogger) Panicf

func (l *TLogger) Panicf(ctx context.Context, err error, msg string, v ...interface{})

func (*TLogger) Warnf

func (l *TLogger) Warnf(ctx context.Context, msg string, v ...interface{})

type TLoggerConfig

// TLoggerConfig is the logger configuration accepted by NewLogger and used
// as the "log" section of Config.
type TLoggerConfig struct {
	// Dir is the directory where log files are stored
	Dir string `yaml:"dir"`
	// FileName is the log file name
	FileName string `yaml:"file_name"`
	// Level is the min log level recorded in log file
	Level string `yaml:"level"`
	// MaxSize is the maximum size in megabytes of the log file before it gets rotated.
	MaxSize int `yaml:"max_size"`
	// MaxDays is the maximum number of days to retain old log files based on the
	// timestamp encoded in their filename.  Note that a day is defined as 24
	// hours and may not exactly correspond to calendar days due to daylight
	// savings, leap seconds, etc. The default is not to remove old log files
	// based on age.
	MaxDays int `yaml:"max_days"`
	// MaxBackups is the maximum number of old log files to retain.  The default
	// is to retain all old log files (though MaxDays may still cause them to get
	// deleted.)
	MaxBackups int `yaml:"max_backups"`
	// Compress determines if the rotated log files should be compressed
	// using gzip. The default is not to perform compression.
	Compress bool `yaml:"compress"`
}

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL