message

package
v0.0.2 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Sep 5, 2019 License: Apache-2.0 Imports: 15 Imported by: 0

Documentation

Index

Constants

View Source
const (
	// MsgPrefix is prefix keyword of message body
	MsgPrefix = "msg"
	// MsgMetaDataUUID is message metadata uuid keyword
	MsgMetaDataUUID = "uuid"
	// MsgMetaDataTS is message metadata timestamp keyword
	MsgMetaDataTS = "ts"
)

Variables

This section is empty.

Functions

func GetFuncName

func GetFuncName() string

GetFuncName get the function name of the calling function

Types

type Decoder

type Decoder interface {
	// DecodeMsg will decode the given message into out variable
	DecodeMsg(msg consumer.Message) (*Message, error)
}

Decoder is a interface that Kafka message decoders

func NewDefaultDecoder

func NewDefaultDecoder(
	jobConfig *rules.JobConfig, serviceConfig config.ServiceConfig) (decoder Decoder, err error)

NewDefaultDecoder will initialize the json decoder based on the job type

type JSONDecoder

type JSONDecoder struct{}

JSONDecoder is an implementation of Decoder interface for identity decoder

func (*JSONDecoder) DecodeMsg

func (j *JSONDecoder) DecodeMsg(msg consumer.Message) (*Message, error)

DecodeMsg will convert given JSON string to a map

type Message

type Message struct {
	// MsgInSubTS is the timestamp when the message is consumed by the subscriber
	MsgInSubTS time.Time
	// MsgMetaDataTS is defined in encoder/decoder metadata
	MsgMetaDataTS time.Time
	// RawMessage is encoded message
	RawMessage consumer.Message
	// DecodedMessage is decoded message
	DecodedMessage map[string]interface{}
}

Message contains raw message read from Kafka and the decoded message

type Parser

type Parser struct {
	// ServiceConfig is ares-subscriber configure
	ServiceConfig config.ServiceConfig
	// JobName is job name
	JobName string
	// Cluster is ares cluster name
	Cluster string
	// destinations each message will be parsed and written into
	Destination sink.Destination
	// Transformations are keyed on the output column name
	Transformations map[string]*rules.TransformationConfig
	// contains filtered or unexported fields
}

Parser holds all resources needed to parse one message into one or multiple row objects with respect to different destinations

func NewParser

func NewParser(jobConfig *rules.JobConfig, serviceConfig config.ServiceConfig) *Parser

NewParser will create a Parser for given JobConfig

func (*Parser) CheckPrimaryKeys

func (mp *Parser) CheckPrimaryKeys(destination sink.Destination, row client.Row) error

CheckPrimaryKeys returns error if the value of primary key column is nil

func (*Parser) CheckTimeColumnExistence

func (mp *Parser) CheckTimeColumnExistence(schema *metaCom.Table, columnDict map[string]int,
	destination sink.Destination, row client.Row) error

CheckTimeColumnExistence checks if time column is missing for fact table

func (*Parser) IsMessageValid

func (mp *Parser) IsMessageValid(msg map[string]interface{}, destination sink.Destination) error

IsMessageValid checks if the message is valid

func (*Parser) ParseMessage

func (mp *Parser) ParseMessage(msg map[string]interface{}, destination sink.Destination) (client.Row, error)

ParseMessage will parse given message to fit the destination

type StringMessage

type StringMessage struct {
	// contains filtered or unexported fields
}

stringMessage is an implementation of Message interface for testing.

func NewStringMessage

func NewStringMessage(topic, msg string) *StringMessage

func (*StringMessage) Ack

func (m *StringMessage) Ack()

func (*StringMessage) Cluster

func (m *StringMessage) Cluster() string

func (*StringMessage) Key

func (m *StringMessage) Key() []byte

func (*StringMessage) Nack

func (m *StringMessage) Nack()

func (*StringMessage) Offset

func (m *StringMessage) Offset() int64

func (*StringMessage) Partition

func (m *StringMessage) Partition() int32

func (*StringMessage) Topic

func (m *StringMessage) Topic() string

func (*StringMessage) Value

func (m *StringMessage) Value() []byte

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL