recvs

package
v1.13.3 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: May 27, 2020 License: MIT Imports: 23 Imported by: 0

Documentation

Overview

Package recvs defines different kind of receivers.

recvs are components applied in acceptor. Each recv can receiving specific kind of messages. All recv should satisfy `libs.AcceptorRecvItf`.

Index

Constants

View Source
const (
	// RandomValOperator set this val in meta will replaced by random string
	RandomValOperator = "@RANDOM_STRING"
)

Variables

This section is empty.

Functions

func GetKafkaRewriteTag

func GetKafkaRewriteTag(rewriteTag, env string) string

func NewRsyslogSrv

func NewRsyslogSrv(addr string) (*syslog.Server, syslog.LogPartsChannel, error)

Types

type AcceptorRecvItf

type AcceptorRecvItf interface {
	SetSyncOutChan(chan<- *libs.FluentMsg)
	SetAsyncOutChan(chan<- *libs.FluentMsg)
	SetMsgPool(*sync.Pool)
	SetCounter(libs.CounterIft)
	Run(context.Context)
	GetName() string
}

type BaseRecv

type BaseRecv struct {
	// contains filtered or unexported fields
}

func (*BaseRecv) SetAsyncOutChan

func (r *BaseRecv) SetAsyncOutChan(outchan chan<- *libs.FluentMsg)

func (*BaseRecv) SetCounter

func (r *BaseRecv) SetCounter(counter libs.CounterIft)

func (*BaseRecv) SetMsgPool

func (r *BaseRecv) SetMsgPool(msgPool *sync.Pool)

func (*BaseRecv) SetSyncOutChan

func (r *BaseRecv) SetSyncOutChan(outchan chan<- *libs.FluentMsg)

type FluentdRecv

type FluentdRecv struct {
	*BaseRecv
	*FluentdRecvCfg
	// contains filtered or unexported fields
}

FluentdRecv recv for fluentd format

func NewFluentdRecv

func NewFluentdRecv(cfg *FluentdRecvCfg) (r *FluentdRecv)

NewFluentdRecv create new FluentdRecv

func (*FluentdRecv) GetName

func (r *FluentdRecv) GetName() string

GetName return the name of this recv

func (*FluentdRecv) ProcessMsg added in v1.12.0

func (r *FluentdRecv) ProcessMsg(msg *libs.FluentMsg)

ProcessMsg process msg

func (*FluentdRecv) Run

func (r *FluentdRecv) Run(ctx context.Context)

Run starting this recv

func (*FluentdRecv) SendMsg

func (r *FluentdRecv) SendMsg(msg *libs.FluentMsg)

SendMsg put msg into downstream

type FluentdRecvCfg

type FluentdRecvCfg struct {
	Name,

	Addr,

	TagKey,

	LBKey string

	// NFork fork concators
	NFork,
	ConcatorBufSize int
	ConcatorWait time.Duration

	// if IsRewriteTagFromTagKey, set `msg.Tag = msg.Message[OriginRewriteTagKey]`
	IsRewriteTagFromTagKey bool
	OriginRewriteTagKey    string

	ConcatMaxLen int
	ConcatCfg    map[string]interface{}
}

FluentdRecvCfg configuration of FluentdRecv

type HTTPRecv

type HTTPRecv struct {
	*BaseRecv
	*HTTPRecvCfg
}

HTTPRecv recv for HTTP

func NewHTTPRecv

func NewHTTPRecv(cfg *HTTPRecvCfg) *HTTPRecv

NewHTTPRecv return new HTTPRecv

func (*HTTPRecv) BadRequest

func (r *HTTPRecv) BadRequest(ctx *gin.Context, msg string)

BadRequest set bad http response

func (*HTTPRecv) GetName

func (r *HTTPRecv) GetName() string

GetName get current HTTPRecv instance's name

func (*HTTPRecv) HTTPLogHandler

func (r *HTTPRecv) HTTPLogHandler(ctx *gin.Context)

HTTPLogHandler process log received by HTTP

func (*HTTPRecv) Run

func (r *HTTPRecv) Run(ctx context.Context)

Run useless, just capatable for RecvItf

type HTTPRecvCfg

type HTTPRecvCfg struct {
	HTTPSrv     *gin.Engine
	MaxBodySize int64
	// Name: recv name
	// Path: url endpoint
	Name, Path, Env string

	// Tag: set `msg.Tag = Tag`
	// OrigTag & TagKey: set `msg.Message[TagKey] = OrigTag`
	OrigTag, Tag, TagKey, MsgKey string

	// TSRegexp: validate time string
	// TimeKey: load time string from `msg.Message[TimeKey].(string)`
	// TimeFormat: `time.Parse(ts, TimeFormat)`
	TimeKey, TimeFormat string
	TSRegexp            *regexp.Regexp

	// SigKey: load signature from `msg.Message[SigKey].([]byte)`
	// SigSalt: calculate signature by `md5(ts + SigSalt)`
	SigKey  string
	SigSalt []byte

	MaxAllowedDelaySec, MaxAllowedAheadSec time.Duration
}

HTTPRecvCfg is the configuration for HTTPRecv

type KafkaCfg

type KafkaCfg struct {
	KafkaCommitCfg
	Topics, Brokers                  []string
	Group, Tag, MsgKey, TagKey, Name string
	NConsumer                        int
	KMsgPool                         *sync.Pool
	Meta                             map[string]interface{}
	IsJSONFormat                     bool
	JSONTagKey                       string
	RewriteTag                       string
	ReconnectInterval                time.Duration
}

KafkaCfg kafka client configuration

Args:

IsJSONFormat: unmarshal json into `msg.Message`
MsgKey: put kafka msg body into `msg.Message[MsgKey]`
TagKey: set tag into `msg.Message[TagKey]`
Name: name of this recv plugin
KMsgPool: sync.Pool for `*utils.kafka.KafkaMsg`
Meta: add new field and value into `msg.Message`
JSONTagKey: load tag from kafka message(only work when IsJSONFormat is true)
RewriteTag: rewrite `msg.Tag`, `msg.Message["tag"]` will keep origin value
ReconnectInterval: restart consumer periodically

type KafkaCommitCfg

type KafkaCommitCfg struct {
	IntervalNum      int
	IntervalDuration time.Duration
}

type KafkaRecv

type KafkaRecv struct {
	BaseRecv
	*KafkaCfg
}

func NewKafkaRecv

func NewKafkaRecv(cfg *KafkaCfg) *KafkaRecv

func (*KafkaRecv) GetName

func (r *KafkaRecv) GetName() string

func (*KafkaRecv) Run

func (r *KafkaRecv) Run(ctx context.Context)

type PendingMsg added in v1.10.4

type PendingMsg struct {
	// contains filtered or unexported fields
}

PendingMsg is the message wait tobe concatenate

type RsyslogCfg

type RsyslogCfg struct {
	RewriteTags map[string]string
	TimeShift   time.Duration
	Name, Addr, TagKey, MsgKey,
	Tag,
	NewTimeFormat, TimeKey, NewTimeKey string
}

type RsyslogRecv

type RsyslogRecv struct {
	*BaseRecv
	*RsyslogCfg
}

RsyslogRecv

func NewRsyslogRecv

func NewRsyslogRecv(cfg *RsyslogCfg) *RsyslogRecv

func (*RsyslogRecv) GetName

func (r *RsyslogRecv) GetName() string

func (*RsyslogRecv) Run

func (r *RsyslogRecv) Run(ctx context.Context)

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL