superstream

package module
v1.0.4 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Mar 14, 2024 License: Apache-2.0 Imports: 12 Imported by: 0

Documentation

Index

Constants

This section is empty.

Variables

View Source
var BrokerConnection *nats.Conn
View Source
var Clients map[int]*Client
View Source
var JSContext nats.JetStreamContext
View Source
var NatsConnectionID string

Functions

func Close

func Close()

func ConfigSaramaInterceptor

func ConfigSaramaInterceptor(config *sarama.Config, client *Client)

func Init

func Init(token, host string, config interface{}, options ...Option) *sarama.Config

func InitializeNatsConnection

func InitializeNatsConnection(token, host string) error

Types

type Client

type Client struct {
	ClientID              int
	AccountName           string
	NatsConnectionID      string
	IsConsumer            bool
	IsProducer            bool
	LearningFactor        int
	LearningFactorCounter int
	LearningRequestSent   bool
	GetSchemaRequestSent  bool
	ProducerProtoDesc     protoreflect.MessageDescriptor
	ProducerSchemaID      string
	ConsumerProtoDescMap  map[string]protoreflect.MessageDescriptor
	Counters              ClientCounters
	Config                ClientConfig
}

func (*Client) RegisterClient

func (c *Client) RegisterClient() error

func (*Client) SendClientTypeUpdateReq

func (c *Client) SendClientTypeUpdateReq(clientType string)

func (*Client) SendLearningMessage

func (c *Client) SendLearningMessage(msg []byte)

func (*Client) SendRegisterSchemaReq

func (c *Client) SendRegisterSchemaReq()

func (*Client) SentGetSchemaRequest

func (c *Client) SentGetSchemaRequest(schemaID string) error

func (*Client) SubscribeUpdates

func (c *Client) SubscribeUpdates() error

type ClientConfig

type ClientConfig struct {
	ClientType                                string             `json:"client_type"`
	ProducerMaxMessageBytes                   int                `json:"producer_max_messages_bytes"`
	ProducerRequiredAcks                      string             `json:"producer_required_acks"`
	ProducerTimeout                           time.Duration      `json:"producer_timeout"`
	ProducerRetryMax                          int                `json:"producer_retry_max"`
	ProducerRetryBackoff                      time.Duration      `json:"producer_retry_backoff"`
	ProducerReturnErrors                      bool               `json:"producer_return_errors"`
	ProducerReturnSuccesses                   bool               `json:"producer_return_successes"`
	ProducerFlushMaxMessages                  int                `json:"producer_flush_max_messages"`
	ProducerCompressionLevel                  string             `json:"producer_compression_level"`
	ConsumerFetchMin                          int32              `json:"consumer_fetch_min"`
	ConsumerFetchDefault                      int32              `json:"consumer_fetch_default"`
	ConsumerRetryBackOff                      time.Duration      `json:"consumer_retry_backoff"`
	ConsumerMaxWaitTime                       time.Duration      `json:"consumer_max_wait_time"`
	ConsumerMaxProcessingTime                 time.Duration      `json:"consumer_mex_processing_time"`
	ConsumerReturnErrors                      bool               `json:"consumer_return_errors"`
	ConsumerOffsetAutoCommitEnable            bool               `json:"consumer_offset_auto_commit_enable"`
	ConsumerOffsetAutoCommintInterval         time.Duration      `json:"consumer_offset_auto_commit_interval"`
	ConsumerOffsetsInitial                    int                `json:"consumer_offsets_initial"`
	ConsumerOffsetsRetryMax                   int                `json:"consumer_offsets_retry_max"`
	ConsumerGroupSessionTimeout               time.Duration      `json:"consumer_group_session_timeout"`
	ConsumerGroupHeartBeatInterval            time.Duration      `json:"consumer_group_heart_beat_interval"`
	ConsumerGroupRebalanceTimeout             time.Duration      `json:"consumer_group_rebalance_timeout"`
	ConsumerGroupRebalanceRetryMax            int                `json:"consumer_group_rebalance_retry_max"`
	ConsumerGroupRebalanceRetryBackOff        time.Duration      `json:"consumer_group_rebalance_retry_back_off"`
	ConsumerGroupRebalanceResetInvalidOffsets bool               `json:"consumer_group_rebalance_reset_invalid_offsets"`
	ConsumerGroupId                           string             `json:"consumer_group_id"`
	Servers                                   string             `json:"servers"`
	ProducerTopicsPartitions                  map[string][]int32 `json:"producer_topics_partitions"`
	ConsumerTopicsPartitions                  map[string][]int32 `json:"consumer_group_topics_partitions"`
}

func ConfigHandler

func ConfigHandler(clientType string, config *sarama.Config) ClientConfig

type ClientCounters

type ClientCounters struct {
	TotalBytesBeforeReduction         int64 `json:"total_bytes_before_reduction"`
	TotalBytesAfterReduction          int64 `json:"total_bytes_after_reduction"`
	TotalMessagesSuccessfullyProduce  int   `json:"total_messages_successfully_produce"`
	TotalMessagesSuccessfullyConsumed int   `json:"total_messages_successfully_consumed"`
	TotalMessagesFailedProduce        int   `json:"total_messages_failed_produce"`
	TotalMessagesFailedConsume        int   `json:"total_messages_failed_consume"`
}

type ClientReconnectionUpdateReq

type ClientReconnectionUpdateReq struct {
	NewNatsConnectionID string `json:"new_nats_connection_id"`
	ClientID            int    `json:"client_id"`
}

type ClientTypeUpdateReq

type ClientTypeUpdateReq struct {
	ClientID int    `json:"client_id"`
	Type     string `json:"type"`
}

type ClientUpdateSub

type ClientUpdateSub struct {
	ClientID     int
	Subscription *nats.Subscription
	UpdateCahn   chan Update
}

func (*ClientUpdateSub) SubscriptionHandler

func (c *ClientUpdateSub) SubscriptionHandler() nats.MsgHandler

func (*ClientUpdateSub) UpdatesHandler

func (c *ClientUpdateSub) UpdatesHandler()

type GetSchemaReq

type GetSchemaReq struct {
	SchemaID string `json:"schema_id"`
}

type Option

type Option func(*Options) error

func ConsumerGroup

func ConsumerGroup(consumerGroup string) Option

func LearningFactor

func LearningFactor(learningFactor int) Option

func Servers

func Servers(servers []string) Option

type Options

type Options struct {
	Host           string
	LearningFactor int
	ConsumerGroup  string
	Servers        string
}

func GetDefaultOptions

func GetDefaultOptions() Options

type RegisterReq

type RegisterReq struct {
	NatsConnectionID string       `json:"nats_connection_id"`
	Language         string       `json:"language"`
	Version          string       `json:"version"`
	LearningFactor   int          `json:"learning_factor"`
	Config           ClientConfig `json:"config"`
}

type RegisterResp

type RegisterResp struct {
	ClientID       int    `json:"client_id"`
	AccountName    string `json:"account_name"`
	LearningFactor int    `json:"learning_factor"`
}

type SaramaConsumerInterceptor

type SaramaConsumerInterceptor struct {
	Client *Client
}

func (*SaramaConsumerInterceptor) OnConsume

type SaramaProducerInterceptor

type SaramaProducerInterceptor struct {
	Client *Client
}

func (*SaramaProducerInterceptor) OnSend

type SchemaUpdateReq

type SchemaUpdateReq struct {
	MasterMsgName string `json:"master_msg_name"`
	FileName      string `json:"file_name"`
	SchemaID      string `json:"schema_id"`
	Desc          []byte `json:"desc"`
}

type TopicsPartitionsPerProducerConsumer

type TopicsPartitionsPerProducerConsumer struct {
	ProducerTopicsPartitions map[string][]int32 `json:"producer_topics_partitions"`
	ConsumerTopicsPartitions map[string][]int32 `json:"consumer_group_topics_partitions"`
}

type Update

type Update struct {
	Type    string
	Payload []byte
}

Directories

Path Synopsis

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL