messagequeue

package
v0.0.1 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Mar 15, 2022 License: MIT Imports: 13 Imported by: 0

Documentation

Index

Constants

View Source
// Default sizing values exported by the messagequeue package.
const (
	// DefaultMaxBatchBytes is 1e6 (an untyped floating-point constant). Going
	// by its name it is a default cap on batch size in bytes — TODO confirm
	// where it is consumed.
	DefaultMaxBatchBytes                  = 1e6
	// DefaultUpdatePublishChannelBufferSize is 1000. Presumably the default
	// buffer capacity of the channel used to publish updates — verify against
	// the caller.
	DefaultUpdatePublishChannelBufferSize = 1000
)
View Source
const (
	// InitResponseTimeout is one minute. Presumably the maximum time to wait
	// for a response to an init request — TODO confirm against callers.
	InitResponseTimeout = 1 * time.Minute
)

Variables

This section is empty.

Functions

func CreateTopics

func CreateTopics(
	brokerAddress string, dialer *kafka.Dialer, topics []string) error

func DeleteTopics

func DeleteTopics(
	brokerAddress string, dialer *kafka.Dialer, topics []string) error

[TODO?] deleteTopics() is not called in CloseMessageQueues() because deleting the topics would disconnect subscribers from the topics created by a publisher if that publisher is restarted.

func GetGroupId

func GetGroupId(idElements []string) string

func HasTopics

func HasTopics(
	brokerAddress string, dialer *kafka.Dialer, topics []string) (
	bool, error)

func IsInitRequestforFinish

func IsInitRequestforFinish(command int8) bool

func IsInitRequestforStart

func IsInitRequestforStart(command int8) bool

Types

type InitCommand

type InitCommand struct {
	// contains filtered or unexported fields
}

func NewInitCommand

func NewInitCommand(groupId string) *InitCommand

func (*InitCommand) CreateInitRequestForFinish

func (ic *InitCommand) CreateInitRequestForFinish() (*kafka.Message, error)

func (*InitCommand) CreateInitRequestforStart

func (ic *InitCommand) CreateInitRequestforStart() (*kafka.Message, error)

func (*InitCommand) CreateInitResponseForCreateTopic

func (ic *InitCommand) CreateInitResponseForCreateTopic(
	errMessage string) (*kafka.Message, error)

func (*InitCommand) CreateInitResponseForNotFindData

func (ic *InitCommand) CreateInitResponseForNotFindData() (
	*kafka.Message, error)

func (*InitCommand) GetGroupId

func (ic *InitCommand) GetGroupId() string

func (*InitCommand) IsInitResponseforCreateTopic

func (ic *InitCommand) IsInitResponseforCreateTopic(
	message *kafka.Message) (bool, error)

func (*InitCommand) IsInitResponseforNotFindData

func (ic *InitCommand) IsInitResponseforNotFindData(
	message *kafka.Message) (bool, error)

type InitCommandMessage

// InitCommandMessage is the JSON-serializable form of an init command.
// NOTE(review): presumably this is the payload that DecodeInitCommand extracts
// from a Kafka message — confirm against the implementation.
type InitCommandMessage struct {
	// Command is the command code, serialized under the JSON key "command".
	Command    int8   `json:"command"`
	// GroupId is serialized under the JSON key "filePath", not "groupId".
	// NOTE(review): the field/key mismatch looks unintentional, but changing
	// the tag would alter the wire format — confirm before touching it.
	GroupId    string `json:"filePath"`
	// ErrMessage is serialized under the JSON key "errMessage" and is omitted
	// from the output when empty.
	ErrMessage string `json:"errMessage,omitempty"`
}

func DecodeInitCommand

func DecodeInitCommand(
	message *kafka.Message) (*InitCommandMessage, error)

type MessageQueueConfig

type MessageQueueConfig struct {
	// contains filtered or unexported fields
}

func GetMessageQueueConfig

func GetMessageQueueConfig(
	dataClient coreclientset.Interface, kubeClient kubernetes.Interface,
	messageQueueRef *corev1.ObjectReference) (*MessageQueueConfig, error)

func NewMessageQueueConfig

func NewMessageQueueConfig(
	brokers []string, user string, password string) *MessageQueueConfig

func (*MessageQueueConfig) CreateSaslDialer

func (mqc *MessageQueueConfig) CreateSaslDialer() (*kafka.Dialer, error)

func (*MessageQueueConfig) GetBrokers

func (mqc *MessageQueueConfig) GetBrokers() []string

func (*MessageQueueConfig) GetPassword

func (mqc *MessageQueueConfig) GetPassword() string

func (*MessageQueueConfig) GetUser

func (mqc *MessageQueueConfig) GetUser() string

type MessageQueueInitTopics

// MessageQueueInitTopics groups three string values: Init, Request and
// Response. It is the type returned by MessageQueueTopic.CreateInitTopics.
// NOTE(review): presumably each field holds a Kafka topic name used during
// initialization — confirm against CreateInitTopics.
type MessageQueueInitTopics struct {
	// Init — presumably the base init topic name; TODO confirm.
	Init     string
	// Request — presumably the topic on which init requests are sent; TODO confirm.
	Request  string
	// Response — presumably the topic on which init responses are sent; TODO confirm.
	Response string
}

type MessageQueueTopic

type MessageQueueTopic struct {
	// contains filtered or unexported fields
}

func NewMessageQueueFileSystemTopic

func NewMessageQueueFileSystemTopic(nameElements []string) *MessageQueueTopic

func NewMessageQueueRdbTopic

func NewMessageQueueRdbTopic(nameElements []string) *MessageQueueTopic

func (*MessageQueueTopic) CreateInitTopics

func (mqt *MessageQueueTopic) CreateInitTopics() (
	*MessageQueueInitTopics, error)

func (*MessageQueueTopic) CreateUpdateTopic

func (mqt *MessageQueueTopic) CreateUpdateTopic() (string, error)

Directories

Path Synopsis

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL