connector

package
v4.6.2
Published: Apr 30, 2024 License: MIT Imports: 24 Imported by: 3

Documentation

Index

Constants

This section is empty.

Variables

var ReloaderComponentNotFoundErr = errors.New("component not found")

Functions

func ApplyFieldForceUpdate added in v4.2.9

func ApplyFieldForceUpdate(fieldUpdate []string, enricherSource map[string]interface{}, outputSource map[string]interface{})

ApplyFieldForceUpdate applies FieldForceUpdate merging configuration on input documents
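
A minimal usage sketch (the field name and map contents are hypothetical; the assumed behaviour is that the listed fields are forced from the enricher source into the output source):

	enricherSource := map[string]interface{}{"status": "closed", "owner": "alice"}
	outputSource := map[string]interface{}{"status": "open"}

	// Assumed semantics: force the value of "status" from enricherSource into outputSource.
	connector.ApplyFieldForceUpdate([]string{"status"}, enricherSource, outputSource)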

func ApplyFieldKeepEarliest added in v4.2.9

func ApplyFieldKeepEarliest(fieldKeepEarliest []string, enricherSource map[string]interface{}, outputSource map[string]interface{})

ApplyFieldKeepEarliest applies all FieldKeepEarliest merging configuration on input documents

func ApplyFieldKeepLatest added in v4.2.9

func ApplyFieldKeepLatest(fieldKeepLatest []string, enricherSource map[string]interface{}, outputSource map[string]interface{})

ApplyFieldKeepLatest applies all FieldKeepLatest merging configuration on input documents

func ApplyFieldMath added in v4.2.9

func ApplyFieldMath(config []FieldMath, newDoc *models.Document, existingDoc *models.Document, outputSource map[string]interface{})

ApplyFieldMath applies all FieldMath merging configuration on input documents

func ApplyFieldMerge added in v4.2.9

func ApplyFieldMerge(fieldMerge []string, enricherSource map[string]interface{}, outputSource map[string]interface{})

ApplyFieldMerge applies all FieldMerge merging configuration on input documents

func ApplyFieldReplace added in v4.2.9

func ApplyFieldReplace(fieldReplace []string, enricherSource map[string]interface{}, outputSource map[string]interface{})

ApplyFieldReplace applies all FieldReplace merging configuration on input documents

func ApplyFieldReplaceIfMissing added in v4.2.9

func ApplyFieldReplaceIfMissing(fieldReplace []string, enricherSource map[string]interface{}, outputSource map[string]interface{})

ApplyFieldReplaceIfMissing applies all FieldReplaceIfMissing merging configuration on input documents

func DoubleUnescapeUnicode

func DoubleUnescapeUnicode(s string) ([]byte, error)

DoubleUnescapeUnicode is a special function to extract Avro binaries from a JSON-encoded string. This function has been built for a very specific usage and may not work on other messages
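
A usage sketch (escaped stands for a hypothetical JSON string field carrying a doubly escaped Avro binary):

	avroBinary, err := connector.DoubleUnescapeUnicode(escaped)
	if err != nil {
		log.Fatal(err)
	}
	// avroBinary can then be handed to AvroToJSONTransformer.AvroBinaryToTextual, for example.
	_ = avroBinary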

func FilterHeaders added in v4.2.1

func FilterHeaders(filters []FilterHeaderOption, headers []*sarama.RecordHeader) (bool, string)

func LookupNestedMap added in v4.5.7

func LookupNestedMap(pathParts []string, data interface{}) (interface{}, bool)

LookupNestedMap looks up the value corresponding to the exact specified path inside a map

func LookupNestedMapFullPaths added in v4.5.7

func LookupNestedMapFullPaths(data interface{}, paths [][]string, separator string) (interface{}, bool)

LookupNestedMapFullPaths searches for the values of all paths in data and concatenates them with a separator
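
A minimal sketch of both lookups (the map contents are hypothetical):

	data := map[string]interface{}{
		"user": map[string]interface{}{
			"firstname": "Jane",
			"lastname":  "Doe",
		},
	}

	// Exact-path lookup: expected to return "Jane", true.
	value, found := connector.LookupNestedMap([]string{"user", "firstname"}, data)

	// Multi-path lookup: expected to concatenate the values with the separator, e.g. "Jane Doe".
	fullName, ok := connector.LookupNestedMapFullPaths(data,
		[][]string{{"user", "firstname"}, {"user", "lastname"}}, " ")
	_, _, _, _ = value, found, fullName, ok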

Types

type AvroToJSONTransformer

type AvroToJSONTransformer struct {
	// contains filtered or unexported fields
}

AvroToJSONTransformer :

func NewAvroToJSONTransformer

func NewAvroToJSONTransformer(schemaRegistryEndpoint string, ttlCacheDuration time.Duration) (*AvroToJSONTransformer, error)

NewAvroToJSONTransformer is the transformer constructor. TODO: manage multiple schemaRegistryEndpoint values in case of server failure?

func (AvroToJSONTransformer) AvroBinaryToNative

func (transformer AvroToJSONTransformer) AvroBinaryToNative(avroBinary []byte) (interface{}, error)

AvroBinaryToNative :

func (AvroToJSONTransformer) AvroBinaryToTextual

func (transformer AvroToJSONTransformer) AvroBinaryToTextual(avroBinary []byte) ([]byte, error)

AvroBinaryToTextual :

func (AvroToJSONTransformer) Transform

func (transformer AvroToJSONTransformer) Transform(msg Message) (Message, error)

Transform is the converter transformer; it decodes the Avro message into a byte message (JSONMessage)
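
A sketch of the transformer in a pipeline (the schema registry endpoint and cache TTL are placeholders; rawAvro stands for a hypothetical Avro payload read from Kafka):

	transformer, err := connector.NewAvroToJSONTransformer("http://localhost:8081", 10*time.Minute)
	if err != nil {
		log.Fatal(err)
	}

	jsonMsg, err := transformer.Transform(connector.KafkaMessage{Data: rawAvro})
	if err != nil {
		log.Fatal(err)
	}
	fmt.Println(jsonMsg.String())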

type BatchSink

type BatchSink struct {
	TargetURL    string
	Send         chan Message
	Client       *retryablehttp.Client
	BufferSize   int
	FlushTimeout time.Duration
	FormatToBIRs FormatToBIRs // TODO: Change to be more generic ? (sending []byte or interface{})
	DryRun       bool
}

BatchSink ..

func NewBatchSink

func NewBatchSink(targetURL string, client *retryablehttp.Client, bufferSize int, flushTimeout time.Duration,
	formatToBIRs FormatToBIRs, dryRun bool) *BatchSink

NewBatchSink is the constructor for BatchSink

func (*BatchSink) AddMessageToQueue

func (sink *BatchSink) AddMessageToQueue(message Message)

func (*BatchSink) SendToIngester

func (sink *BatchSink) SendToIngester(ctx context.Context, bir *BulkIngestRequest) error

func (*BatchSink) Sender

func (sink *BatchSink) Sender(ctx context.Context)
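
A sketch of a BatchSink setup (target URL, buffer size, flush timeout and the FormatToBIRs function are placeholders):

	formatToBIRs := func(msgs []connector.Message) []*connector.BulkIngestRequest {
		// Hypothetical formatter: a real connector would build Documents from the messages.
		return []*connector.BulkIngestRequest{{UUID: "batch-1", DocumentType: "mydocument"}}
	}

	sink := connector.NewBatchSink("http://localhost:9000/ingester", retryablehttp.NewClient(),
		1000, 10*time.Second, formatToBIRs, false)

	// Sender is assumed to flush the buffer when it is full or when the flush timeout expires.
	go sink.Sender(context.Background())

	sink.AddMessageToQueue(connector.KafkaMessage{Data: []byte(`{"hello":"world"}`)})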

type BulkIngestRequest

type BulkIngestRequest struct {
	UUID         string     `json:"uuid"`
	DocumentType string     `json:"documentType"`
	MergeConfig  []Config   `json:"merge"`
	Docs         []Document `json:"docs"`
}

type Config

type Config struct {
	Mode             Mode    `json:"mode"`
	ExistingAsMaster bool    `json:"existingAsMaster"`
	Type             string  `json:"type,omitempty"`
	LinkKey          string  `json:"linkKey,omitempty"`
	Groups           []Group `json:"groups,omitempty"`
}

Config wraps all rules for document merging

func (*Config) Apply added in v4.2.9

func (config *Config) Apply(newDoc *models.Document, existingDoc *models.Document) *models.Document

Apply returns a pre-built merge function, configured with a specific merge config. Merge is done in the following order: FieldMath, FieldReplace, FieldMerge
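
A sketch of building a merge Config in code and applying it (field names are hypothetical; newDoc and existingDoc stand for *models.Document values produced upstream):

	cfg := connector.Config{
		Mode:             connector.Self,
		ExistingAsMaster: true,
		Groups: []connector.Group{{
			FieldReplace:      []string{"status"},
			FieldMerge:        []string{"tags"},
			FieldKeepEarliest: []string{"firstSeen"},
		}},
	}

	merged := cfg.Apply(newDoc, existingDoc)
	_ = merged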

type ConsumerProcessor added in v4.2.0

type ConsumerProcessor interface {
	Process(message *sarama.ConsumerMessage)
}

type DecodedKafkaMessage added in v4.5.6

type DecodedKafkaMessage struct {
	Data map[string]interface{}
}

DecodedKafkaMessage holds a JSON-decoded version of the Kafka message. It can be used to avoid decoding the data multiple times (which is very expensive)

func (DecodedKafkaMessage) String added in v4.5.6

func (msg DecodedKafkaMessage) String() string

type DefaultConsumer added in v4.2.0

type DefaultConsumer struct {
	Ready chan bool
	// contains filtered or unexported fields
}

DefaultConsumer represents a Sarama consumer group consumer

func NewDefaultConsumer added in v4.2.0

func NewDefaultConsumer(processor ConsumerProcessor) DefaultConsumer

func (*DefaultConsumer) Cleanup added in v4.2.0

func (consumer *DefaultConsumer) Cleanup(sarama.ConsumerGroupSession) error

Cleanup is run at the end of a session, once all ConsumeClaim goroutines have exited

func (*DefaultConsumer) ConsumeClaim added in v4.2.0

func (consumer *DefaultConsumer) ConsumeClaim(session sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error

ConsumeClaim must start a consumer loop of ConsumerGroupClaim's Messages().

func (*DefaultConsumer) Setup added in v4.2.0

func (consumer *DefaultConsumer) Setup(sarama.ConsumerGroupSession) error

Setup is run at the beginning of a new session, before ConsumeClaim
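
A sketch of plugging a ConsumerProcessor into a Sarama consumer group via DefaultConsumer (broker address, group and topic names are placeholders):

	type logProcessor struct{}

	func (logProcessor) Process(message *sarama.ConsumerMessage) {
		fmt.Printf("topic=%s partition=%d offset=%d\n", message.Topic, message.Partition, message.Offset)
	}

	func consume(ctx context.Context) error {
		cfg := sarama.NewConfig()
		cfg.Version = sarama.V2_0_0_0

		group, err := sarama.NewConsumerGroup([]string{"localhost:9092"}, "my-group", cfg)
		if err != nil {
			return err
		}
		defer group.Close()

		consumer := connector.NewDefaultConsumer(logProcessor{})
		for ctx.Err() == nil {
			// DefaultConsumer implements sarama.ConsumerGroupHandler (Setup, ConsumeClaim, Cleanup).
			if err := group.Consume(ctx, []string{"my-topic"}, &consumer); err != nil {
				return err
			}
		}
		return nil
	}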

type DefaultMultiConsumer added in v4.2.0

type DefaultMultiConsumer struct {
	Ready chan bool
	// contains filtered or unexported fields
}

DefaultMultiConsumer represents a Sarama consumer group consumer

func NewDefaultMultiConsumer added in v4.2.0

func NewDefaultMultiConsumer(processors map[string]ConsumerProcessor) DefaultMultiConsumer

func (*DefaultMultiConsumer) Cleanup added in v4.2.0

func (consumer *DefaultMultiConsumer) Cleanup(sarama.ConsumerGroupSession) error

Cleanup is run at the end of a session, once all ConsumeClaim goroutines have exited

func (*DefaultMultiConsumer) ConsumeClaim added in v4.2.0

func (consumer *DefaultMultiConsumer) ConsumeClaim(session sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error

ConsumeClaim must start a consumer loop of ConsumerGroupClaim's Messages().

func (*DefaultMultiConsumer) Setup added in v4.2.0

func (consumer *DefaultMultiConsumer) Setup(sarama.ConsumerGroupSession) error

Setup is run at the beginning of a new session, before ConsumeClaim

type Document

type Document struct {
	ID        string                 `json:"id"`
	Index     string                 `json:"index"`
	IndexType string                 `json:"type"`
	Source    map[string]interface{} `json:"source"`
}

Document represents an Elasticsearch document

type FieldMath

type FieldMath struct {
	Expression  string `json:"expression"`
	OutputField string `json:"outputField"`
}

FieldMath specifies a merge rule using a math expression

type FilterHeaderOption added in v4.2.1

type FilterHeaderOption struct {
	Key       string
	Value     string
	Values    []string
	Condition string
}

type FilteredJsonMessage

type FilteredJsonMessage struct {
	Data map[string]interface{}
}

FilteredJsonMessage is the output once we've filtered the myrtea fields from the Kafka messages

func (FilteredJsonMessage) String

func (msg FilteredJsonMessage) String() string

type FormatToBIRs added in v4.1.6

type FormatToBIRs func([]Message) []*BulkIngestRequest

type Group

type Group struct {
	Condition             string      `json:"condition,omitempty"`
	FieldReplace          []string    `json:"fieldReplace,omitempty"`
	FieldReplaceIfMissing []string    `json:"fieldReplaceIfMissing,omitempty"`
	FieldMerge            []string    `json:"fieldMerge,omitempty"`
	FieldMath             []FieldMath `json:"fieldMath,omitempty"`
	FieldKeepLatest       []string    `json:"fieldKeepLatest,omitempty"`
	FieldKeepEarliest     []string    `json:"fieldKeepEarliest,omitempty"`
	FieldForceUpdate      []string    `json:"fieldForceUpdate,omitempty"`
}

Group groups a set of merge fields and defines an optional condition under which the merge fields are applied

type JSONMapper

type JSONMapper struct {
	// contains filtered or unexported fields
}

JSONMapper :

func NewJSONMapper

func NewJSONMapper(name, path string) (*JSONMapper, error)

NewJSONMapper :

func (JSONMapper) DecodeDocument added in v4.5.6

func (mapper JSONMapper) DecodeDocument(msg Message) (Message, error)

DecodeDocument is not implemented here; it only uses msg

func (JSONMapper) FilterDocument

func (mapper JSONMapper) FilterDocument(msg Message) (bool, string)

func (JSONMapper) MapToDocument

func (mapper JSONMapper) MapToDocument(msg Message) (Message, error)

MapToDocument :

type JSONMapperConfigItem

type JSONMapperConfigItem struct {
	Mandatory    bool
	FieldType    string
	DefaultValue interface{}
	DateFormat   string
	Paths        [][]string
	OtherPaths   [][]string
	ArrayPath    []string
	Separator    string
}

JSONMapperConfigItem :

type JSONMapperFilterItem

type JSONMapperFilterItem struct {
	Keep         bool
	FieldType    string
	DefaultValue string
	Paths        [][]string
	Condition    string
	Value        string
	Values       []string
	Separator    string
}

type JSONMapperJsoniter added in v4.2.4

type JSONMapperJsoniter struct {
	// contains filtered or unexported fields
}

JSONMapperJsoniter :

func NewJSONMapperJsoniter added in v4.2.4

func NewJSONMapperJsoniter(name, path string) (*JSONMapperJsoniter, error)

NewJSONMapperJsoniter :

func (JSONMapperJsoniter) DecodeDocument added in v4.5.6

func (mapper JSONMapperJsoniter) DecodeDocument(msg Message) (Message, error)

DecodeDocument returns a DecodedKafkaMessage and contains a map with json decoded data

func (JSONMapperJsoniter) FilterDocument added in v4.2.4

func (mapper JSONMapperJsoniter) FilterDocument(msg Message) (bool, string)

FilterDocument checks whether the document is filtered or not; it returns whether the document is valid and, if invalid, the reason

func (JSONMapperJsoniter) MapToDocument added in v4.2.4

func (mapper JSONMapperJsoniter) MapToDocument(msg Message) (Message, error)

MapToDocument maps data to a document
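
A sketch of the mapper flow (the mapper name and configuration path are hypothetical; their exact meaning depends on the mapping definition files):

	mapper, err := connector.NewJSONMapperJsoniter("mydefinition", "config/")
	if err != nil {
		log.Fatal(err)
	}

	msg := connector.KafkaMessage{Data: rawJSON}

	keep, reason := mapper.FilterDocument(msg)
	if !keep {
		log.Println("document filtered:", reason)
		return
	}

	doc, err := mapper.MapToDocument(msg)
	if err != nil {
		log.Fatal(err)
	}
	_ = doc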

type KafkaMessage

type KafkaMessage struct {
	Data []byte
}

KafkaMessage ...

func (KafkaMessage) GetData

func (kMessage KafkaMessage) GetData() []byte

GetData is a getter for the message data

func (KafkaMessage) String

func (kMessage KafkaMessage) String() string

type Lookup

type Lookup func(path string, value string, index string) ([]string, error)

type Mapper

type Mapper interface {
	// FilterDocument checks whether the document is filtered or not; it returns whether the
	// document is valid and, if invalid, the reason. It uses the given filters.
	FilterDocument(msg Message) (bool, string)

	// MapToDocument Maps data to document
	MapToDocument(msg Message) (Message, error)

	// DecodeDocument is a function that just decodes a document and returns it.
	// You can use it if you want to decode a message only "once" and not in each
	// individual function.
	DecodeDocument(msg Message) (Message, error)
}

type Message

type Message interface {
	String() string
}

Message ...

type MessageWithOptions

type MessageWithOptions struct {
	Data    map[string]interface{}
	Options map[string]interface{}
}

MessageWithOptions is the output once we've filtered the myrtea fields from the Kafka messages

func (MessageWithOptions) String

func (msg MessageWithOptions) String() string

type Mode

type Mode int

Mode ...

const (
	// Self ...
	Self Mode = iota + 1
	// EnrichTo ...
	EnrichTo
	// EnrichFrom ...
	EnrichFrom
)

func (Mode) MarshalJSON

func (s Mode) MarshalJSON() ([]byte, error)

MarshalJSON marshals the enum as a quoted json string

func (Mode) String

func (s Mode) String() string

func (*Mode) UnmarshalJSON

func (s *Mode) UnmarshalJSON(b []byte) error

UnmarshalJSON unmarshals a quoted json string to the enum value
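
A small sketch of the Mode JSON round trip (the exact quoted string is whatever Mode.String() returns; the value shown in the comment is an assumption):

	b, _ := json.Marshal(connector.EnrichFrom) // produces a quoted string, e.g. "enrichfrom"
	var m connector.Mode
	_ = json.Unmarshal(b, &m) // m == connector.EnrichFrom again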

type Processor

type Processor interface {
	Process(msg Message) ([]Message, error)
}

Processor is a general interface for message processors
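
A minimal Processor implementation sketch (a pass-through that re-emits the incoming message unchanged):

	type passThroughProcessor struct{}

	func (passThroughProcessor) Process(msg connector.Message) ([]connector.Message, error) {
		// No transformation: forward the message as-is.
		return []connector.Message{msg}, nil
	}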

type Reloader added in v4.4.9

type Reloader struct {
	// contains filtered or unexported fields
}

func NewReloader added in v4.4.9

func NewReloader(action func(string) error, cooldown time.Duration, apiKey string) *Reloader

func (*Reloader) BindEndpoint added in v4.5.4

func (re *Reloader) BindEndpoint(rg chi.Router)

BindEndpoint binds the reload endpoint to an existing router
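
A sketch of exposing the reload endpoint on a chi router (the reload action, cooldown and API key are placeholders):

	reloader := connector.NewReloader(func(component string) error {
		// Hypothetical action: reload the named component's configuration.
		return nil
	}, 30*time.Second, "my-api-key")

	router := chi.NewRouter()
	reloader.BindEndpoint(router)
	log.Fatal(http.ListenAndServe(":8080", router))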

type Restarter added in v4.5.0

type Restarter struct {
	// contains filtered or unexported fields
}

func NewRestarter added in v4.5.0

func NewRestarter(doneChan chan os.Signal, apiKey string) *Restarter

func (*Restarter) BindEndpoint added in v4.5.4

func (re *Restarter) BindEndpoint(rg chi.Router)

BindEndpoint binds the restart endpoint to an existing router

type SaramaDebugLogger added in v4.2.0

type SaramaDebugLogger struct {
	// contains filtered or unexported fields
}

SaramaDebugLogger wraps zap.Logger with the sarama.StdLogger interface

func NewSaramaDebugLogger added in v4.2.0

func NewSaramaDebugLogger(zl *zap.Logger) *SaramaDebugLogger

NewSaramaDebugLogger initializes a new SaramaDebugLogger with a zap.Logger

func (*SaramaDebugLogger) Print added in v4.2.0

func (s *SaramaDebugLogger) Print(v ...interface{})

func (*SaramaDebugLogger) Printf added in v4.2.0

func (s *SaramaDebugLogger) Printf(format string, v ...interface{})

func (*SaramaDebugLogger) Println added in v4.2.0

func (s *SaramaDebugLogger) Println(v ...interface{})

type SaramaLogger added in v4.2.0

type SaramaLogger struct {
	// contains filtered or unexported fields
}

SaramaLogger wraps zap.Logger with the sarama.StdLogger interface

func NewSaramaLogger added in v4.2.0

func NewSaramaLogger(zl *zap.Logger) *SaramaLogger

NewSaramaLogger initializes a new SaramaLogger with a zap.Logger

func (*SaramaLogger) Print added in v4.2.0

func (s *SaramaLogger) Print(v ...interface{})

func (*SaramaLogger) Printf added in v4.2.0

func (s *SaramaLogger) Printf(format string, v ...interface{})

func (*SaramaLogger) Println added in v4.2.0

func (s *SaramaLogger) Println(v ...interface{})
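
A sketch of routing Sarama's logging through zap (sarama.Logger is Sarama's package-level logger variable; sarama.DebugLogger exists in recent Sarama versions):

	zl, _ := zap.NewProduction()

	sarama.Logger = connector.NewSaramaLogger(zl)
	sarama.DebugLogger = connector.NewSaramaDebugLogger(zl)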

type Sink

type Sink interface {
	AddMessageToQueue(message Message)
}

type Transformer

type Transformer interface {
	Transform(Message) (Message, error)
}

Transformer ..

type TypedDataMessage added in v4.2.4

type TypedDataMessage struct {
	Strings map[string]string
	Ints    map[string]int64
	Bools   map[string]bool
	Times   map[string]time.Time
}

func (TypedDataMessage) String added in v4.2.4

func (m TypedDataMessage) String() string
