kafka

package
v0.2.2-0...-13ba25a Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Oct 27, 2020 License: Apache-2.0 Imports: 14 Imported by: 0

Documentation

Index

Constants

This section is empty.

Variables

View Source
// DefaultCLIHome is the default CLI home path: "$HOME/.kafka" with $HOME
// substituted from the process environment when the package is initialised.
var DefaultCLIHome = os.Expand("$HOME/.kafka", os.Getenv)

DefaultCLIHome is the default CLI home path, $HOME/.kafka, with $HOME expanded from the environment.

View Source
// ModuleCdc is the module codec. It is declared without an initializer, so it
// is nil until it is assigned elsewhere.
var ModuleCdc *codec.Codec

ModuleCdc is the module codec.

View Source
// SleepRoutine is the time the Kafka messages are to be taken in: 2.5 seconds.
var SleepRoutine = 2500 * time.Millisecond

SleepRoutine is the time (2.5 seconds) over which the Kafka messages are to be taken in.

View Source
// SleepTimer is the time the Kafka messages are to be taken in: one second.
var SleepTimer = time.Second

SleepTimer is the time (1 second) over which the Kafka messages are to be taken in.

View Source
// TicketIDAtomicCounter is a package-level int64 counter that starts at zero.
// NOTE(review): the name suggests it is updated through sync/atomic — confirm
// against the code that increments it.
var TicketIDAtomicCounter int64

TicketIDAtomicCounter is a counter that is incremented each time a function is called.

View Source
// Topics is the list of topics; it holds the single entry "Topic".
var Topics = []string{"Topic"}

Topics is the list of topics.

Functions

func AddResponseToDB

func AddResponseToDB(ticketID Ticket, response []byte, kafkaDB *dbm.GoLevelDB, cdc *codec.Codec)

AddResponseToDB updates the response stored in the DB for the given ticket ID.

func CliCtxFromKafkaMsg

func CliCtxFromKafkaMsg(msg KafkaMsg, cliCtx context.CLIContext) context.CLIContext

CliCtxFromKafkaMsg : sets the txctx and clictx again to consume

func GetResponseFromDB

func GetResponseFromDB(ticketID Ticket, kafkaDB *dbm.GoLevelDB, cdc *codec.Codec) []byte

GetResponseFromDB returns the response stored in the DB for the given ticket ID.

func KafkaAdmin

func KafkaAdmin(kafkaPorts []string) sarama.ClusterAdmin

KafkaAdmin returns the cluster admin used to create topics.

func KafkaProducerDeliverMessage

func KafkaProducerDeliverMessage(msg KafkaMsg, topic string, producer sarama.SyncProducer, cdc *codec.Codec) error

KafkaProducerDeliverMessage : delivers messages to kafka

func NewConsumer

func NewConsumer(kafkaPorts []string) sarama.Consumer

NewConsumer returns the consumer that is needed to create child consumers to consume topics.

func NewProducer

func NewProducer(kafkaPorts []string) sarama.SyncProducer

NewProducer returns a producer used to send messages to Kafka.

func PartitionConsumers

func PartitionConsumers(consumer sarama.Consumer, topic string) sarama.PartitionConsumer

PartitionConsumers returns a child consumer for the given topic.

func QueryDB

func QueryDB(cdc *codec.Codec, r *mux.Router, kafkaDB *dbm.GoLevelDB) http.HandlerFunc

QueryDB returns a REST handler that outputs info from the DB.

func RegisterCodec

func RegisterCodec(cdc *codec.Codec)

RegisterCodec registers concrete types on the codec.

func SendToKafka

func SendToKafka(msg KafkaMsg, kafkaState KafkaState, cdc *codec.Codec) []byte

SendToKafka : handles sending message to kafka

func SetTicketIDtoDB

func SetTicketIDtoDB(ticketID Ticket, kafkaDB *dbm.GoLevelDB, cdc *codec.Codec, msg []byte)

SetTicketIDtoDB initialises the ticket ID in the database.

func TopicsInit

func TopicsInit(admin sarama.ClusterAdmin, topic string)

TopicsInit : is needed to initialise topics

Types

type KafkaCliCtx

// KafkaCliCtx carries client-context settings without a codec. It is held in
// KafkaMsg as the KafkaCli field.
// NOTE(review): the fields presumably mirror those of context.CLIContext so
// that CliCtxFromKafkaMsg can restore them — confirm against that function.
type KafkaCliCtx struct {
	OutputFormat  string
	Height        int64
	NodeURI       string
	From          string
	TrustNode     bool
	UseLedger     bool
	BroadcastMode string
	VerifierHome  string
	Simulate      bool
	GenerateOnly  bool
	FromAddress   sdk.AccAddress
	FromName      string
	Indent        bool
	SkipConfirm   bool
}

KafkaCliCtx is the client context (cliCtx) without the codec.

type KafkaMsg

// KafkaMsg is the message that can be stored in Kafka queues. It bundles the
// SDK message with its ticket ID, the REST base request, the client-context
// settings, and the password and mode strings.
// NOTE(review): Password carries a json tag, so it is presumably serialised
// along with the rest of the message — confirm this is intended.
type KafkaMsg struct {
	Msg         sdk.Msg      `json:"msg"`
	TicketID    Ticket       `json:"ticketID"`
	BaseRequest rest.BaseReq `json:"base_req"`
	KafkaCli    KafkaCliCtx  `json:"kafkaCliCtx"`
	Password    string       `json:"password"`
	Mode        string       `json:"mode"`
}

KafkaMsg is a message that can be stored in Kafka queues.

func KafkaTopicConsumer

func KafkaTopicConsumer(topic string, consumers map[string]sarama.PartitionConsumer, cdc *codec.Codec) KafkaMsg

KafkaTopicConsumer takes a consumer and makes it consume one topic message at a time.

func NewKafkaMsgFromRest

func NewKafkaMsgFromRest(msg sdk.Msg, ticketID Ticket, baseRequest rest.BaseReq, cliCtx context.CLIContext, mode string, password string) KafkaMsg

NewKafkaMsgFromRest : makes a msg to send to kafka queue

type KafkaState

// KafkaState shows the state of Kafka: the LevelDB handle, the cluster admin,
// the consumer and its partition consumers, the sync producer, and the list of
// topics.
// NOTE(review): Consumers is presumably keyed by topic name — confirm against
// NewKafkaState.
type KafkaState struct {
	KafkaDB   *dbm.GoLevelDB
	Admin     sarama.ClusterAdmin
	Consumer  sarama.Consumer
	Consumers map[string]sarama.PartitionConsumer
	Producer  sarama.SyncProducer
	Topics    []string
}

KafkaState : is a struct showing the state of kafka

func NewKafkaState

func NewKafkaState(kafkaPorts []string) KafkaState

NewKafkaState : returns a kafka state

type Ticket

// Ticket is a named type whose underlying type is string.
type Ticket string

Ticket is a named type whose underlying type is string.

func TicketIDGenerator

func TicketIDGenerator(prefix string) Ticket

TicketIDGenerator is a random unique ticket ID generator; its output is a Ticket (a string type).

type TicketIDResponse

// TicketIDResponse is the JSON structure used to send a TicketID to the user.
// The `valid` tag marks TicketID as required and attaches a message to each
// rule; the length message names TicketID, the field it is attached to
// (it previously named an unrelated "RelayerAddress").
type TicketIDResponse struct {
	TicketID Ticket `json:"TicketID" valid:"required~TicketID is mandatory,length(20)~TicketID length should be 20" `
}

TicketIDResponse is the JSON structure used to send the TicketID to the user.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL