messenger

package
v0.0.0-...-9d1ce3d Latest
Warning

This package is not in the latest version of its module.

Published: Sep 12, 2017 License: MIT Imports: 11 Imported by: 0

Documentation

Overview

Package messenger is a generated protocol buffer package.

It is generated from these files:

message.proto

It has these top-level messages:

Message
Path

Constants

This section is empty.

Variables

var Logger = logrus.New()

Logger is the package-level logrus logger. It can be replaced with another logger.

Functions

This section is empty.

Types

type KafkaMessenger

type KafkaMessenger struct {
	// contains filtered or unexported fields
}

KafkaMessenger implements Messenger using Kafka

func NewKafkaMessenger

func NewKafkaMessenger(broker string, config *KafkaMessengerConfig) (*KafkaMessenger, error)

NewKafkaMessenger returns a new KafkaMessenger

func (*KafkaMessenger) Acknowledge

func (km *KafkaMessenger) Acknowledge(message *Message) error

Acknowledge tells Kafka that the message has been received and processed

func (*KafkaMessenger) Close

func (km *KafkaMessenger) Close() error

Close stops the Kafka Messenger from sending and receiving messages

func (*KafkaMessenger) Receive

func (km *KafkaMessenger) Receive() <-chan *Message

Receive returns messages from Kafka

func (*KafkaMessenger) Send

func (km *KafkaMessenger) Send(topic string, message *Message) error

Send sends messages to Kafka

type KafkaMessengerConfig

type KafkaMessengerConfig struct {
	ConsumerGroup   string
	TopicsToConsume []string
}

type Message

type Message struct {
	Origin      *Path             `protobuf:"bytes,1,opt,name=origin" json:"origin,omitempty"`
	Return      *Path             `protobuf:"bytes,2,opt,name=return" json:"return,omitempty"`
	Destination *Path             `protobuf:"bytes,3,opt,name=destination" json:"destination,omitempty"`
	Payload     []byte            `protobuf:"bytes,4,opt,name=payload,proto3" json:"payload,omitempty"`
	Metadata    map[string][]byte `` /* 143-byte string literal not displayed */
}

Message is passed between connectors and Conduction. A Message should not have both an origin and a destination: origin is set on Messages sent from connectors to Conduction, and destination is set on Messages sent from Conduction to connectors.
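
The origin/destination rule above can be checked before a Message is sent. The following is a minimal sketch, not part of this package: Path and Message are local stand-ins that mirror the documented fields, and ValidateDirection and ErrOriginAndDestination are hypothetical names introduced only for illustration.

```go
package main

import (
	"errors"
	"fmt"
)

// Path and Message are local stand-ins mirroring the documented fields,
// so the sketch compiles without importing the real package.
type Path struct {
	Route    string
	Type     string
	Identity string
}

type Message struct {
	Origin      *Path
	Return      *Path
	Destination *Path
	Payload     []byte
	Metadata    map[string][]byte
}

// ErrOriginAndDestination is a hypothetical error; the package does not export it.
var ErrOriginAndDestination = errors.New("message has both origin and destination")

// ValidateDirection is a hypothetical helper. It rejects a Message that
// carries both an Origin and a Destination, as the documentation advises.
func ValidateDirection(m *Message) error {
	if m != nil && m.Origin != nil && m.Destination != nil {
		return ErrOriginAndDestination
	}
	return nil
}

func main() {
	// A connector-to-Conduction message: only Origin is set.
	inbound := &Message{Origin: &Path{Route: "example"}, Payload: []byte("hi")}
	// An invalid message: both Origin and Destination are set.
	invalid := &Message{Origin: &Path{}, Destination: &Path{}}

	fmt.Println(ValidateDirection(inbound))
	fmt.Println(ValidateDirection(invalid))
}
```

Whether the package itself enforces this rule is not stated in the documentation, so a check like this would live in calling code.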

func NewMessageFromSaramaConsumerMessage

func NewMessageFromSaramaConsumerMessage(consumerMessage *sarama.ConsumerMessage) (*Message, error)

NewMessageFromSaramaConsumerMessage returns a new Message built from a sarama ConsumerMessage

func (*Message) ConvertToSaramaProducerMessage

func (m *Message) ConvertToSaramaProducerMessage(topic string) (*sarama.ProducerMessage, error)

ConvertToSaramaProducerMessage returns a Kafka Producer Message from Message

func (*Message) Descriptor

func (*Message) Descriptor() ([]byte, []int)

func (*Message) GetDestination

func (m *Message) GetDestination() *Path

func (*Message) GetMetadata

func (m *Message) GetMetadata() map[string][]byte

func (*Message) GetOrigin

func (m *Message) GetOrigin() *Path

func (*Message) GetPayload

func (m *Message) GetPayload() []byte

func (*Message) GetReturn

func (m *Message) GetReturn() *Path

func (*Message) ProtoMessage

func (*Message) ProtoMessage()

func (*Message) Reset

func (m *Message) Reset()

func (*Message) SetMetadataFromConsumerMessage

func (m *Message) SetMetadataFromConsumerMessage(consumerMessage *sarama.ConsumerMessage)

SetMetadataFromConsumerMessage sets the Kafka metadata, such as partition, offset and topic, into Message
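
As a rough illustration of storing Kafka coordinates in a map[string][]byte like Message.Metadata, here is a minimal sketch. The key names ("topic", "partition", "offset") and the decimal-string encoding are assumptions, since the documentation does not say how the package encodes them. consumerMessage is a local stand-in for the Topic, Partition and Offset fields of sarama.ConsumerMessage, and kafkaMetadata is a hypothetical helper.

```go
package main

import (
	"fmt"
	"strconv"
)

// consumerMessage is a local stand-in for the three fields of
// sarama.ConsumerMessage that the description mentions.
type consumerMessage struct {
	Topic     string
	Partition int32
	Offset    int64
}

// kafkaMetadata is a hypothetical helper. The key names and the
// decimal-string encoding are assumptions, not the package's format.
func kafkaMetadata(cm consumerMessage) map[string][]byte {
	return map[string][]byte{
		"topic":     []byte(cm.Topic),
		"partition": []byte(strconv.FormatInt(int64(cm.Partition), 10)),
		"offset":    []byte(strconv.FormatInt(cm.Offset, 10)),
	}
}

func main() {
	md := kafkaMetadata(consumerMessage{Topic: "events", Partition: 2, Offset: 41})
	fmt.Printf("%s %s %s\n", md["topic"], md["partition"], md["offset"])
}
```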

func (*Message) String

func (m *Message) String() string

type Messenger

type Messenger interface {
	Send(topic string, message *Message) error
	Receive() <-chan *Message
	Acknowledge(message *Message) error
	Close() error
}

Messenger orchestrates communication between Conduction modules
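
Because Messenger is an interface, code that depends on it can be exercised without a Kafka broker. The sketch below shows one possible in-memory implementation. It redeclares Messenger and a trimmed Message locally so that it is self-contained; memoryMessenger and newMemoryMessenger are hypothetical names, and the behavior (a single buffered queue that ignores the topic) is an assumption made for illustration, not how KafkaMessenger works.

```go
package main

import (
	"errors"
	"fmt"
	"sync"
)

// Message is a trimmed local stand-in for the documented type.
type Message struct {
	Payload  []byte
	Metadata map[string][]byte
}

// Messenger is redeclared here with the documented method set.
type Messenger interface {
	Send(topic string, message *Message) error
	Receive() <-chan *Message
	Acknowledge(message *Message) error
	Close() error
}

// memoryMessenger is a hypothetical in-memory Messenger for tests.
type memoryMessenger struct {
	mu     sync.Mutex
	closed bool
	queue  chan *Message
	acked  int
}

func newMemoryMessenger(buffer int) *memoryMessenger {
	return &memoryMessenger{queue: make(chan *Message, buffer)}
}

// Send enqueues the message without blocking. The topic is ignored here.
func (mm *memoryMessenger) Send(topic string, message *Message) error {
	mm.mu.Lock()
	defer mm.mu.Unlock()
	if mm.closed {
		return errors.New("messenger is closed")
	}
	select {
	case mm.queue <- message:
		return nil
	default:
		return errors.New("buffer is full")
	}
}

// Receive exposes the queue as a receive-only channel.
func (mm *memoryMessenger) Receive() <-chan *Message { return mm.queue }

// Acknowledge only counts acknowledgements in this sketch.
func (mm *memoryMessenger) Acknowledge(message *Message) error {
	mm.mu.Lock()
	defer mm.mu.Unlock()
	mm.acked++
	return nil
}

// Close rejects further sends and closes the channel exactly once.
func (mm *memoryMessenger) Close() error {
	mm.mu.Lock()
	defer mm.mu.Unlock()
	if !mm.closed {
		mm.closed = true
		close(mm.queue)
	}
	return nil
}

// Compile-time check that memoryMessenger satisfies Messenger.
var _ Messenger = (*memoryMessenger)(nil)

func main() {
	var m Messenger = newMemoryMessenger(1)
	if err := m.Send("example-topic", &Message{Payload: []byte("hello")}); err != nil {
		fmt.Println("send failed:", err)
		return
	}
	m.Close()
	// Ranging over the closed channel drains the buffered message, then stops.
	for msg := range m.Receive() {
		fmt.Printf("received %q\n", msg.Payload)
		m.Acknowledge(msg)
	}
}
```

Acknowledging each message only after it has been handled follows the documented meaning of Acknowledge: the message has been received and processed.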

type Path

type Path struct {
	Route    string            `protobuf:"bytes,1,opt,name=route" json:"route,omitempty"`
	Type     string            `protobuf:"bytes,2,opt,name=type" json:"type,omitempty"`
	Identity string            `protobuf:"bytes,3,opt,name=identity" json:"identity,omitempty"`
	Metadata map[string][]byte `` /* 143-byte string literal not displayed */
}

func (*Path) Descriptor

func (*Path) Descriptor() ([]byte, []int)

func (*Path) GetIdentity

func (m *Path) GetIdentity() string

func (*Path) GetMetadata

func (m *Path) GetMetadata() map[string][]byte

func (*Path) GetRoute

func (m *Path) GetRoute() string

func (*Path) GetType

func (m *Path) GetType() string

func (*Path) ProtoMessage

func (*Path) ProtoMessage()

func (*Path) Reset

func (m *Path) Reset()

func (*Path) String

func (m *Path) String() string
