kafka

package
v0.1.2 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Aug 6, 2021 License: Apache-2.0 Imports: 18 Imported by: 0

Documentation

Index

Constants

View Source
const (
	KAFKA_SSL                          = "ssl"
	KAFKA_SASL                         = "sasl_ssl"
	ENV_KEY_KAFKA_AUTH_MODE            = "KAFKA_AUTH_MODE"
	ENV_KEY_KAFKA_CA_LOCATION          = "KAFKA_CA_LOCATION"
	ENV_KEY_KAFKA_CERTIFICATE_LOCATION = "KAFKA_CERTIFICATE_LOCATION"
	ENV_KEY_KAFKA_KEY_LOCATION         = "KAFKA_KEY_LOCATION"
	ENV_KEY_KAFKA_KEY_PASSWORD         = "KAFKA_KEY_PASSWORD"
	ENV_KEY_KAFKA_KEY_SASL_USER        = "KAFKA_KEY_SASL_USER"
	ENV_KEY_KAFKA_KEY_SASL_PASSWORD    = "KAFKA_KEY_SASL_PASSWORD"
	ENV_KEY_KAFKA_KEY_SASL_MECHANISM   = "KAFKA_KEY_SASL_MECHANISM"
	ENV_KEY_KAFKA_BROKER_URL           = "KAFKA_BROKER_URL"
	ENV_KEY_KAFKA_GROUP_ID             = "KAFKA_GROUP_ID"
	ENV_KEY_KAFKA_AUTO_OFF_RESET       = "KAFKA_AUTO_OFF_RESET"
	PAYMENT_TOPIC                      = "PAYMENT"
	QUOTES_TOPIC                       = "QUOTES"
	FEE_TOPIC                          = "FEE"
	TRANSACTION_TOPIC                  = "TRANSACTIONS"
	REQUEST_TOPIC                      = "_req"
	RESPONSE_TOPIC                     = "_res"
	ANCHOR_REDEMPTION_TOPIC            = "ANCHOR_REDEMPTION" + REQUEST_TOPIC
)

Variables

View Source
var LOGGER = logging.MustGetLogger("kafka")

Functions

This section is empty.

Types

type KafkaOpreations

type KafkaOpreations struct {
	BrokerURL              string
	AutoOffReset           string
	SecurityProtocol       string
	SslCaLocation          string
	SslCertificateLocation string
	SslKeyLocation         string
	SslKeyPassword         string
	SaslUsername           string
	SaslPassword           string
	SaslMechanism          string
	Producer               *kafka.Producer
	Consumers              []*kafka.Consumer
	GroupId                string
	//used only by send-service
	FundHandler      transaction.CreateFundingOpereations
	SignHandler      signing.CreateSignOperations
	WhitelistHandler whitelist_handler.ParticipantWhiteList
	DbClient         *database.PostgreDatabaseClient
	ResponseHandler  *parse.ResponseHandler
}

func Initialize

func Initialize() (*KafkaOpreations, error)

func (*KafkaOpreations) Consume

func (ops *KafkaOpreations) Consume(data chan<- []byte, topic string, consumerIndex int)

func (*KafkaOpreations) InitPaymentConsumer

func (ops *KafkaOpreations) InitPaymentConsumer(groupId string, router func(string, []byte, *KafkaOpreations)) error

func (*KafkaOpreations) InitProducer

func (ops *KafkaOpreations) InitProducer() error

func (*KafkaOpreations) Produce

func (ops *KafkaOpreations) Produce(topic string, msg []byte) error

func (*KafkaOpreations) SendErrMsg

func (op *KafkaOpreations) SendErrMsg(msgId, instructionId, standardType, reqMsgType, ofiId, rfiId string, errType int)

Send back errors happened on RFI site during request processing to OFI

func (*KafkaOpreations) SendRequestToKafka

func (op *KafkaOpreations) SendRequestToKafka(topic string, msg []byte) error

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL