kafku

package
v0.0.0-...-6725017 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jul 8, 2020 License: Apache-2.0 Imports: 15 Imported by: 0

Documentation

Index

Constants

View Source
const (
	// General keys.
	Port          = "PORT"
	DocsFile      = "DOCS_FILE"
	RevealVars    = "REVEAL_VARS"
	ChangelogPath = "CHANGELOG_PATH"

	// Kafka* keys.
	KafkaTopics   = "KAFKA_TOPICS"
	KafkaBrokers  = "KAFKA_BROKERS"
	KafkaClientID = "KAFKA_CLIENT_ID"
	KafkaGroupID  = "KAFKA_GROUP_ID"

	// Other topic keys.
	MimeTopics    = "MIME_TOPICS"
	FallbackTopic = "FALLBACK_TOPIC"

	// GM Data address keys.
	GMDataAddr           = "GM_DATA_ADDR"
	GmDataDerivativeAddr = "GM_DATA_DERIVATIVE_ADDR"

	// Spacy* and Cogito* keys.
	SpacyURL       = "SPACY_URL"
	SpacyBodySize  = "SPACY_BODY_SIZE"
	SpacyEndpoint  = "SPACY_ENDPOINT"
	CogitoURL      = "COGITO_URL"
	CogitoBodySize = "COGITO_BODY_SIZE"

	// Internal* keys.
	InternalServerName    = "INTERNAL_SERVER_NAME"
	InternalServerCrtPath = "INTERNAL_SERVER_CRT_PATH"
	InternalServerKeyPath = "INTERNAL_SERVER_KEY_PATH"
	InternalCaCrtPath     = "INTERNAL_CA_CRT_PATH"
	InternalTLSOn         = "INTERNAL_TLS_ON"

	// ExternalSource* keys.
	ExternalSourceServerName    = "EXTERNAL_SOURCE_SERVER_NAME"
	ExternalSourceServerCrtPath = "EXTERNAL_SOURCE_SERVER_CRT_PATH"
	ExternalSourceServerKeyPath = "EXTERNAL_SOURCE_SERVER_KEY_PATH"
	ExternalSourceCaCrtPath     = "EXTERNAL_SOURCE_CA_CRT_PATH"

	// Index and type keys. Note that DocType, Derivtype and DType map to
	// values with no underscore before "TYPE", unlike RowDtype.
	IndexName = "INDEX_NAME"
	DocType   = "DOCTYPE"
	Derivtype = "DERIVTYPE"
	DType     = "DTYPE"
	RowDtype  = "ROW_DTYPE"
)

Keys are the names of the configuration variables; DefaultConfigVars uses them as its map keys.

Variables

View Source
var DefaultConfigVars = Vars{
	// Keys that ship with a non-empty default. Numeric and boolean defaults
	// are written as strings, like every other value in this literal.
	Port:           "8000",
	DocsFile:       "index.html",
	ChangelogPath:  "Changelog.md",
	SpacyBodySize:  "40000",
	CogitoBodySize: "40000",
	InternalTLSOn:  "false",

	// RevealVars defaults to a comma-separated list naming the four Kafka*
	// keys by their string values.
	RevealVars: "KAFKA_BROKERS,KAFKA_TOPICS,KAFKA_CLIENT_ID,KAFKA_GROUP_ID",

	// Keys that default to the empty string.
	KafkaTopics:                 "",
	KafkaBrokers:                "",
	KafkaClientID:               "",
	KafkaGroupID:                "",
	GMDataAddr:                  "",
	GmDataDerivativeAddr:        "",
	MimeTopics:                  "",
	FallbackTopic:               "",
	SpacyURL:                    "",
	SpacyEndpoint:               "",
	CogitoURL:                   "",
	InternalServerName:          "",
	InternalServerCrtPath:       "",
	InternalServerKeyPath:       "",
	InternalCaCrtPath:           "",
	ExternalSourceServerName:    "",
	ExternalSourceServerCrtPath: "",
	ExternalSourceServerKeyPath: "",
	ExternalSourceCaCrtPath:     "",
	IndexName:                   "",
	DocType:                     "",
	Derivtype:                   "",
	DType:                       "",
	RowDtype:                    "",
}

DefaultConfigVars are the custom config vars to be read into Viper

Functions

func AcmExists

func AcmExists(msg *sarama.ConsumerMessage) (acmExists bool)

AcmExists checks whether the ACM is neither nil nor blank

func CleanupText

func CleanupText(text string) (cleanedText []byte, err error)

CleanupText cleans out bad chars

func GenerateUniqueRandomMessage

func GenerateUniqueRandomMessage() (message string, err error)

GenerateUniqueRandomMessage is a test-only function for creating a unique mock GM data event message

func GenerateUniqueRandomMessages

func GenerateUniqueRandomMessages(numMessages int) (messages []string, err error)

GenerateUniqueRandomMessages is a test-only function for creating numMessages unique mock GM data event messages

func HandleTopics

func HandleTopics(onConsume OnConsumeFunc, group sarama.ConsumerGroup, topics []string) (err error)

HandleTopics consumes messages with a given function as part of a given consumer group

func NewConsumerGroup

func NewConsumerGroup(addrs []string, groupID string, config *sarama.Config) (group sarama.ConsumerGroup, err error)

NewConsumerGroup initializes a new sarama ConsumerGroup with the provided config values

func NewDefaultConfig

func NewDefaultConfig(clientID string) (config *sarama.Config)

NewDefaultConfig returns a Config pointer to be used by a ConsumerGroup

func ProduceZ

func ProduceZ(zfighter string)

ProduceZ ... (no further description is provided; it takes a zfighter string and returns no value)

func PublishFighterMessage

func PublishFighterMessage(p sarama.AsyncProducer, pCount int, topic string, zfighter string)

PublishFighterMessage publishes an event message when a fighter has been added to the ES index

func PublishRandomMessages

func PublishRandomMessages(p sarama.AsyncProducer, pCount int, topic string) (err error)

PublishRandomMessages is a test-only function for publishing mock GM data event messages

func RandomIndexName

func RandomIndexName(stringLength int) (name string)

RandomIndexName creates a random string

func RandomKafkaTopics

func RandomKafkaTopics(numString int, stringLength int) (names []string)

RandomKafkaTopics creates random strings to be used for kafka topic names

Types

type GroupHandler

type GroupHandler struct {
	// contains filtered or unexported fields
}

GroupHandler implements the ConsumerGroupHandler interface in order to consume Kafka messages

func (*GroupHandler) Cleanup

func (*GroupHandler) Cleanup(session sarama.ConsumerGroupSession) error

Cleanup runs when the GroupHandler is done

func (*GroupHandler) ConsumeClaim

func (h *GroupHandler) ConsumeClaim(sess sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) (err error)

ConsumeClaim runs the consume func that is passed through the GroupHandler every time a message is pulled from the claim

func (*GroupHandler) Setup

func (h *GroupHandler) Setup(session sarama.ConsumerGroupSession) error

Setup runs when the GroupHandler starts consuming messages

type OnConsumeFunc

type OnConsumeFunc func(*sarama.ConsumerMessage) error

OnConsumeFunc is a func type that handles consumer messages received from kafka

type Vars

type Vars map[string]string

Vars is a map of string keys to string values

func DefaultVars

func DefaultVars() (defaultVars Vars)

DefaultVars returns a map of default config vars to be used by viper

func NewVars

func NewVars(defaultVars map[string]string) (v Vars)

NewVars returns new config variables

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL