connector

package
v0.0.0-...-ad3276f Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Feb 16, 2022 License: Apache-2.0 Imports: 12 Imported by: 0

Documentation

Index

Constants

This section is empty.

Variables

View Source
var ConfigurationError = ConnectorErrorType("config")
View Source
var TimeoutError = ConnectorErrorType("timeout")
View Source
var ValidationError = ConnectorErrorType("validation")

Functions

func IsErrorOfType

func IsErrorOfType(errType string, err error) bool

func NewMessageID

func NewMessageID(prefix string, byteSize int) string

Types

type Connector

type Connector interface {
	Send(message *Message, opts *SendOptions) error
	RequestReply(request *Message, opts *SendOptions, then ReplyHandler) error
	Close() error
}

func CreateKafkaConnector

func CreateKafkaConnector(config *kbridge.Config) (Connector, error)

type ConnectorError

type ConnectorError func(message string) error

func ConnectorErrorType

func ConnectorErrorType(errorType string) ConnectorError

type JSONSerializer

type JSONSerializer struct{}

func (*JSONSerializer) Serialize

func (js *JSONSerializer) Serialize(msg *Message) ([]byte, error)

type KafkaConnector

type KafkaConnector struct {
	// contains filtered or unexported fields
}

func (*KafkaConnector) Close

func (k *KafkaConnector) Close() error

func (*KafkaConnector) RequestReply

func (k *KafkaConnector) RequestReply(request *Message, opts *SendOptions, then ReplyHandler) error

func (*KafkaConnector) Send

func (k *KafkaConnector) Send(message *Message, opts *SendOptions) error

func (*KafkaConnector) SetUp

func (k *KafkaConnector) SetUp()

type Message

type Message struct {
	ID         string
	Type       string
	Port       string
	Path       string
	Variables  map[string]string
	Parameters map[string][]string
	Headers    map[string]string
	Payload    []byte
}

func (*Message) Validate

func (m *Message) Validate() error

type MessageHeaders

type MessageHeaders map[string]interface{}

func (MessageHeaders) GetString

func (h MessageHeaders) GetString(key string) string

type MessageSerializer

type MessageSerializer interface {
	Serialize(msg *Message) ([]byte, error)
}

type ReplyHandler

type ReplyHandler func(reply []byte, headers MessageHeaders, err error)

type SendOptions

type SendOptions struct {
	Topic          string
	Partition      int
	ReplyTopic     string
	ReplyPartition int
	Passthrough    bool
}

type SerializersRegistry

type SerializersRegistry struct {
	// contains filtered or unexported fields
}

func NewSerializerRegistry

func NewSerializerRegistry() *SerializersRegistry

func (*SerializersRegistry) GetMessageTypes

func (r *SerializersRegistry) GetMessageTypes() []string

func (*SerializersRegistry) GetSerializer

func (r *SerializersRegistry) GetSerializer(messageType string) (MessageSerializer, error)

func (*SerializersRegistry) Register

func (r *SerializersRegistry) Register(messageType string, serializer MessageSerializer)

type YAMLSerializer

type YAMLSerializer struct{}

func (*YAMLSerializer) Serialize

func (ys *YAMLSerializer) Serialize(msg *Message) ([]byte, error)

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL