datacommunicator

package
v0.0.0-...-81c02da
Published: Sep 1, 2023 License: Apache-2.0 Imports: 18 Imported by: 12

Documentation

Overview

Package datacommunicator ...

Constants

const (
	KAFKA                = "Kafka"        // KAFKA as Messaging Platform, Please use this ID
	REDISSTREAMS         = "RedisStreams" // REDISSTREAMS as Messaging Platform
	EVENTREADERGROUPNAME = "eventreaders_grp"
)

BrokerType defines the underlying MQ platform to be selected for the messages. The Kafka and RedisStreams platforms are supported.

const (
	// DefaultTLSMinVersion is default minimum version for tls
	DefaultTLSMinVersion = tls.VersionTLS12
	// TimeoutErrMsg is the connection time out error message
	TimeoutErrMsg string = " connection timed out"
)

Variables

This section is empty.

Functions

func CloseAll

func CloseAll()

CloseAll disconnects the Kafka connection. Call it when the client is closing its Kafka connection completely, covering both Reader and Writer objects. It is not meant for closing a single channel subscription; a separate API, Remove, is defined for that.

func Decode

func Decode(d []byte, a interface{}) error

Decode converts the byte stream into data (DECODE). The data is wrapped as an interface value before it is sent to the Consumer or Requester.

func Encode

func Encode(d interface{}) ([]byte, error)

Encode converts the interface into Byte stream (ENCODE).
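The documentation does not state which wire format Encode and Decode use. As a rough sketch, assuming JSON as the byte encoding (an assumption, not something this page confirms), a round trip through two functions with the same signatures might look like the following. The names encodeSketch, decodeSketch and event are hypothetical and exist only for this illustration.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// event is a hypothetical payload type used only for this illustration.
type event struct {
	ID   string `json:"id"`
	Body string `json:"body"`
}

// encodeSketch mirrors the Encode signature; JSON is an assumed wire format.
func encodeSketch(d interface{}) ([]byte, error) {
	return json.Marshal(d)
}

// decodeSketch mirrors the Decode signature; a must be a non-nil pointer
// to the value that should receive the decoded data.
func decodeSketch(d []byte, a interface{}) error {
	return json.Unmarshal(d, a)
}

func main() {
	raw, err := encodeSketch(event{ID: "42", Body: "hello"})
	if err != nil {
		fmt.Println("encode failed:", err)
		return
	}

	var out event
	if err := decodeSketch(raw, &out); err != nil {
		fmt.Println("decode failed:", err)
		return
	}
	fmt.Printf("%+v\n", out)
}
```

Whatever format the package actually uses, the caller-side pattern is likely the same: pass a pointer to the destination value when decoding.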

func SetConfiguration

func SetConfiguration(filePath string) error

SetConfiguration reads the client-side configuration file at filePath. Any configuration data that must or should be provided by the MQ user is taken directly from the user by asking them to fill in a structure. The details of this data should be defined as part of the interface definition.

func TLS

func TLS(cCert, cKey, caCert string) (*tls.Config, error)

TLS creates the TLS configuration object to be used by any broker for authentication and encryption. The certificate and key files are created from JKS format files generated with Java Keytool; see the README for more information. In the case of Kafka, the server certificate files are generated in JKS format, and the same is done for clients. Those files are then converted into PEM format.
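For orientation, here is a minimal sketch of how a client certificate, key and CA file in PEM format can be turned into a *tls.Config using only the standard library. It is not the package's implementation: buildTLSConfig is a hypothetical helper that merely takes its parameters in the same order as TLS, and the file paths in main are placeholders.

```go
package main

import (
	"crypto/tls"
	"crypto/x509"
	"errors"
	"fmt"
	"os"
)

// buildTLSConfig is a hypothetical helper, not the package's TLS function.
// All three files are expected to be PEM encoded.
func buildTLSConfig(certFile, keyFile, caFile string) (*tls.Config, error) {
	// Load the client certificate together with its private key.
	cert, err := tls.LoadX509KeyPair(certFile, keyFile)
	if err != nil {
		return nil, fmt.Errorf("loading client key pair: %w", err)
	}

	// Read the CA certificate and add it to a fresh trust pool.
	caPEM, err := os.ReadFile(caFile)
	if err != nil {
		return nil, fmt.Errorf("reading CA file: %w", err)
	}
	pool := x509.NewCertPool()
	if !pool.AppendCertsFromPEM(caPEM) {
		return nil, errors.New("no certificates found in CA file")
	}

	return &tls.Config{
		Certificates: []tls.Certificate{cert},
		RootCAs:      pool,
		MinVersion:   tls.VersionTLS12, // same value as DefaultTLSMinVersion
	}, nil
}

func main() {
	// Placeholder paths; replace them with real PEM files.
	if _, err := buildTLSConfig("client.crt", "client.key", "ca.crt"); err != nil {
		fmt.Println("TLS setup failed:", err)
	}
}
```

Setting MinVersion explicitly keeps the sketch aligned with the DefaultTLSMinVersion constant listed above.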

Types

type KafkaF

type KafkaF struct {

	// KServersInfo defines the list of Kafka Server URI/Nodename:port. DEFAULT = [localhost:9092]
	KServersInfo []string `toml:"KServersInfo"`
	// KTimeout defines the timeout for Kafka Server connection.
	// DEFAULT = 10 (in seconds)
	KTimeout int `toml:"KTimeout"`
	// KAFKACertFile defines the TLS Certificate File for KAFKA. No DEFAULT
	KAFKACertFile string `toml:"KAFKACertFile"`
	// KAFKAKeyFile defines the TLS Key File for KAFKA. No DEFAULT
	KAFKAKeyFile string `toml:"KAFKAKeyFile"`
	// KAFKACAFile defines the KAFKA Certification Authority. No DEFAULT
	KAFKACAFile string `toml:"KAFKACAFile"`
}

KafkaF defines the Kafka server connection configuration. This structure will be extended once TLS authentication and message encoding capabilities are added.
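The field comments name defaults for KServersInfo and KTimeout, but this page does not say whether the package applies them automatically. The sketch below shows one way a caller could apply those defaults itself. kafkaConf, withDefaults and timeout are hypothetical names, and kafkaConf is only a local stand-in for KafkaF so the example compiles on its own.

```go
package main

import (
	"fmt"
	"time"
)

// kafkaConf is a local stand-in for KafkaF with only the two defaulted fields.
type kafkaConf struct {
	KServersInfo []string
	KTimeout     int // seconds
}

// withDefaults fills unset fields with the defaults named in the field comments.
func withDefaults(c kafkaConf) kafkaConf {
	if len(c.KServersInfo) == 0 {
		c.KServersInfo = []string{"localhost:9092"}
	}
	if c.KTimeout <= 0 {
		c.KTimeout = 10
	}
	return c
}

// timeout converts the integer seconds value into a time.Duration.
func (c kafkaConf) timeout() time.Duration {
	return time.Duration(c.KTimeout) * time.Second
}

func main() {
	c := withDefaults(kafkaConf{})
	fmt.Println(c.KServersInfo, c.timeout())
}
```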

type KafkaPacket

type KafkaPacket struct {
	// All common base function objects are defined in this object. This
	// object will support only Publishing and Subscriptions based on KAFKA
	// support. We use KAFKA 2.2.0 with Scala 2.12.
	Packet

	// DialerConn defines the member which can be used for single connection
	// towards KAFKA
	DialerConn *kafka.Dialer

	// ServersInfo defines the list of KAFKA servers with their ports
	ServersInfo []string
	// contains filtered or unexported fields
}

KafkaPacket defines the Kafka message object. It contains all the kafka-go related identifiers required to maintain connections with Kafka servers. Publishing and consuming use two different connections towards Kafka, because the Reader and Writer IO streams are integrated with RPC calls. Because of the way Kafka communication works, these IO objects are stored as values in a map, keyed by the channel name for which they were created. Apart from the Reader and Writer maps, KafkaPacket also maintains the Dialer object for the initial Kafka connection, as well as the name of the currently active server.

func (*KafkaPacket) Accept

func (kp *KafkaPacket) Accept(fn MsgProcess) error

Accept implements the Consumer or Subscriber functionality for Kafka. If a Reader object for the specified pipe is not available, a new Reader object is created. Accept then invokes the Read goroutine to handle incoming messages.

func (*KafkaPacket) Close

func (kp *KafkaPacket) Close() error

Close terminates the write connection created for the topic. It checks only the Writer map for the connection object.

func (*KafkaPacket) Distribute

func (kp *KafkaPacket) Distribute(d interface{}) error

Distribute implements the Producer / Publisher role. A Writer is created for each pipe that comes in for communication; if a Writer already exists, that connection is reused for the call. Before a message is published to the specified pipe, it is converted into a byte stream using the Encode API. Messages are encrypted in transit via TLS.
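The create-once, reuse-afterwards behaviour described for Writers can be sketched as a small mutex-guarded map keyed by pipe name. This is only an illustration of the pattern: writer and writerRegistry are hypothetical types, and the real package stores kafka-go objects in unexported fields whose layout is not shown on this page.

```go
package main

import (
	"fmt"
	"sync"
)

// writer is a hypothetical stand-in for a per-topic Kafka writer.
type writer struct{ topic string }

// writerRegistry caches one writer per pipe name.
type writerRegistry struct {
	mu      sync.Mutex
	writers map[string]*writer
}

func newWriterRegistry() *writerRegistry {
	return &writerRegistry{writers: make(map[string]*writer)}
}

// get returns the cached writer for pipe, creating it on first use.
func (r *writerRegistry) get(pipe string) *writer {
	r.mu.Lock()
	defer r.mu.Unlock()
	if w, ok := r.writers[pipe]; ok {
		return w
	}
	w := &writer{topic: pipe}
	r.writers[pipe] = w
	return w
}

// remove drops the writer for pipe and reports whether one was present.
func (r *writerRegistry) remove(pipe string) bool {
	r.mu.Lock()
	defer r.mu.Unlock()
	_, ok := r.writers[pipe]
	delete(r.writers, pipe)
	return ok
}

func main() {
	reg := newWriterRegistry()
	a := reg.get("events")
	b := reg.get("events")
	fmt.Println(a == b) // prints true: the second call reuses the cached writer
}
```

The mutex matters if several goroutines publish concurrently, which is an assumption about how callers use the bus.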

func (*KafkaPacket) Get

func (kp *KafkaPacket) Get(pipe string, d interface{}) interface{}

Get is not currently supported for Kafka from the message bus side, due to limitations in the Go library implementation. It will be taken up in the future.

func (*KafkaPacket) Read

func (kp *KafkaPacket) Read(fn MsgProcess) error

Read accesses Kafka messages in an infinite loop. Callback-based access exists only in the "goka" library; it is not available in "kafka-go".
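Since kafka-go offers no callback API, a read loop has to fetch each message and invoke the callback itself. The sketch below shows that shape under stated assumptions: fetchFunc is a hypothetical stand-in for the broker's blocking read call, and readLoop, sliceSource, runDemo and errDrained are illustrative names that do not come from the package.

```go
package main

import (
	"context"
	"errors"
	"fmt"
)

// msgProcess repeats the MsgProcess callback type so the sketch stands alone.
type msgProcess func(d interface{})

// fetchFunc is a hypothetical blocking fetch standing in for the broker read.
type fetchFunc func(ctx context.Context) (interface{}, error)

// errDrained is a hypothetical sentinel returned when the demo source is empty.
var errDrained = errors.New("source drained")

// readLoop fetches messages forever and hands each one to fn.
// It returns only when fetch reports an error, including cancellation.
func readLoop(ctx context.Context, fetch fetchFunc, fn msgProcess) error {
	for {
		msg, err := fetch(ctx)
		if err != nil {
			return err
		}
		fn(msg)
	}
}

// sliceSource returns a fetchFunc that yields items in order, then errDrained.
func sliceSource(items []string) fetchFunc {
	i := 0
	return func(ctx context.Context) (interface{}, error) {
		if err := ctx.Err(); err != nil {
			return nil, err
		}
		if i >= len(items) {
			return nil, errDrained
		}
		item := items[i]
		i++
		return item, nil
	}
}

// runDemo drives readLoop over two messages and returns what the callback saw.
func runDemo() ([]interface{}, error) {
	var seen []interface{}
	err := readLoop(context.Background(), sliceSource([]string{"a", "b"}), func(d interface{}) {
		seen = append(seen, d)
	})
	return seen, err
}

func main() {
	seen, err := runDemo()
	fmt.Println(seen, err)
}
```

In a long-running service such a loop would typically run in its own goroutine, with the context used to stop it.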

func (*KafkaPacket) Remove

func (kp *KafkaPacket) Remove() error

Remove removes only the existing subscription. It checks only the Reader map, because distributing (publishing) messages does not require a subscription.

type MQBus

type MQBus interface {
	Distribute(data interface{}) error
	Accept(fn MsgProcess) error
	Get(pipe string, d interface{}) interface{}
	Remove() error
	Close() error
}

MQBus defines the process interface functions, which are the only functions a user should call. These functions are implemented as part of the Packet struct.

Distribute - Publishes messages into the specified pipe (topic / subject)
Accept - Consumes the incoming message if subscribed by that component
Get - Initiates a blocking call to a remote process to get a response
Close - Disconnects the connection with the middleware
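To make the call flow concrete, the following sketch implements the same method set with an in-memory stand-in instead of a broker. The bus interface repeats the MQBus methods so the example compiles without the package; memBus and its behaviour (synchronous delivery, an error after Close) are assumptions made for illustration, not a description of the Kafka or Redis implementations.

```go
package main

import (
	"errors"
	"fmt"
)

// msgProcess repeats the MsgProcess callback type so the sketch stands alone.
type msgProcess func(d interface{})

// bus repeats the MQBus method set.
type bus interface {
	Distribute(data interface{}) error
	Accept(fn msgProcess) error
	Get(pipe string, d interface{}) interface{}
	Remove() error
	Close() error
}

// memBus is a hypothetical in-memory stand-in for a broker-backed packet.
type memBus struct {
	handler msgProcess
	closed  bool
}

// Distribute delivers data straight to the registered handler, if any.
func (m *memBus) Distribute(data interface{}) error {
	if m.closed {
		return errors.New("bus is closed")
	}
	if m.handler != nil {
		m.handler(data)
	}
	return nil
}

// Accept registers the callback that receives distributed messages.
func (m *memBus) Accept(fn msgProcess) error { m.handler = fn; return nil }

// Get is a no-op here, matching the "not supported" status in the docs.
func (m *memBus) Get(pipe string, d interface{}) interface{} { return nil }

// Remove drops the subscription but leaves the bus usable for publishing.
func (m *memBus) Remove() error { m.handler = nil; return nil }

// Close marks the bus as closed; later Distribute calls fail.
func (m *memBus) Close() error { m.closed = true; return nil }

func main() {
	var b bus = &memBus{}
	_ = b.Accept(func(d interface{}) { fmt.Println("received:", d) })
	_ = b.Distribute("hello")
	_ = b.Remove()
	_ = b.Close()
	fmt.Println("after close:", b.Distribute("late"))
}
```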

func Communicator

func Communicator(bt string, messageQueueConfigPath, pipe string) (MQBus, error)

Communicator selects the broker platform middleware and creates the corresponding communication object used to send and receive messages. The broker type is stored as part of the connection object, Packet. TODO: look into the Kafka synchronous communication API to support a synchronous communication model in the message bus.
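The selection step can be pictured as a switch over the broker type string. The sketch below is an assumption about the general shape only: connection and newConnection are hypothetical, the real function returns an MQBus and also takes a configuration path, and its behaviour for unknown broker types is not documented on this page.

```go
package main

import "fmt"

// These values match the KAFKA and REDISSTREAMS constants listed above.
const (
	kafka        = "Kafka"
	redisStreams = "RedisStreams"
)

// connection is a hypothetical result type; the real function returns MQBus.
type connection struct {
	brokerType string
	pipe       string
}

// newConnection picks a broker by name and records it on the connection.
// Rejecting unknown names is an assumption made for this sketch.
func newConnection(bt, pipe string) (*connection, error) {
	switch bt {
	case kafka, redisStreams:
		return &connection{brokerType: bt, pipe: pipe}, nil
	default:
		return nil, fmt.Errorf("broker type %q is not supported", bt)
	}
}

func main() {
	c, err := newConnection(kafka, "events")
	if err != nil {
		fmt.Println("setup failed:", err)
		return
	}
	fmt.Println(c.brokerType, c.pipe)
}
```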

type MQF

type MQF struct {
	KafkaF       *KafkaF       `toml:"KAFKA"`
	RedisStreams *RedisStreams `toml:"RedisStreams"`
}

MQF defines the configuration file content for the supported message queues (Kafka and Redis Streams) as a Go structure. These configurations are embedded into the MQF structure for direct access to the data.

var MQ MQF

MQ creates both the MQF and KafkaPacket objects. MQF is used to store all configuration information, including server URL, port, user credentials, and other settings reserved for future expansion.

type MsgProcess

type MsgProcess func(d interface{})

MsgProcess defines the function type for processing accepted messages. Any client that wants to accept and handle events, notifications, or messages should implement a function of this type and pass it to the message bus as the callback for handling incoming messages.
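Because the callback receives an interface{} value, a handler usually starts by inspecting the dynamic type of its argument. This page does not say which concrete type the bus passes in, so the sketch below treats that as unknown and handles a few plausible cases. describe and logHandler are hypothetical names.

```go
package main

import "fmt"

// msgProcess repeats the MsgProcess callback type so the sketch stands alone.
type msgProcess func(d interface{})

// describe is a hypothetical helper that summarises a payload by its type.
func describe(d interface{}) string {
	switch v := d.(type) {
	case []byte:
		return fmt.Sprintf("bytes(%d)", len(v))
	case string:
		return "string: " + v
	default:
		return fmt.Sprintf("unhandled %T", v)
	}
}

// logHandler has the msgProcess shape and can be passed as a callback.
func logHandler(d interface{}) {
	fmt.Println(describe(d))
}

func main() {
	var fn msgProcess = logHandler
	fn("hello")
	fn([]byte{1, 2, 3})
}
```

Keeping the type switch in a separate function such as describe makes the handler easy to unit test without a running broker.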

type Packet

type Packet struct {
	// BrokerType defines the underlying MQ platform
	BrokerType string
}

Packet defines the message-related information that a Producer or Consumer needs for message transactions. Both Producer and Consumer use this same structure.

BrokerType - Refer to the constants defined above for possible values
DataResponder - Refer to the HandleResponse type description

type RedisStreams

type RedisStreams struct {
	RedisServerAddress             string `toml:"RedisServerAddress"`
	RedisServerPort                string `toml:"RedisServerPort"`
	SentinalAddress                string `toml:"SentinalAddress"`
	RedisCertFile                  string `toml:"RedisCertFile"`
	RedisKeyFile                   string `toml:"RedisKeyFile"`
	RedisCAFile                    string `toml:"RedisCAFile"`
	RSAPrivateKeyPath              string `toml:"RSAPrivateKeyPath"`
	RedisInMemoryEncryptedPassword string `toml:"RedisInMemoryEncryptedPassword"`
	RSAPrivateKey                  []byte
	RedisInMemoryPassword          []byte
}

RedisStreams defines the Redis connection configurations.

type RedisStreamsPacket

type RedisStreamsPacket struct {
	Packet
	// contains filtered or unexported fields
}

RedisStreamsPacket defines the Redis Streams message packet object. Apart from the base Packet, it contains the Redis connection object.

func (*RedisStreamsPacket) Accept

func (rp *RedisStreamsPacket) Accept(fn MsgProcess) error

Accept implementation needs to be added.

func (*RedisStreamsPacket) Close

func (rp *RedisStreamsPacket) Close() error

Close implementation needs to be added.

func (*RedisStreamsPacket) Distribute

func (rp *RedisStreamsPacket) Distribute(data interface{}) error

Distribute implements the Producer / Publisher role. A Writer is created for each pipe that comes in for communication; if a Writer already exists, that connection is reused for the call. Before a message is published to the specified pipe, it is converted into a byte stream using

func (*RedisStreamsPacket) Get

func (rp *RedisStreamsPacket) Get(pipe string, d interface{}) interface{}

Get is not currently supported for RedisStreams from the message bus side, due to limitations.

func (*RedisStreamsPacket) Ping

func (rp *RedisStreamsPacket) Ping() bool

Ping tests the database connection using the ping command.

func (*RedisStreamsPacket) Read

func (rp *RedisStreamsPacket) Read(fn MsgProcess) error

Read implementation needs to be added.

func (*RedisStreamsPacket) Remove

func (rp *RedisStreamsPacket) Remove() error

Remove implementation needs to be added.
