Documentation ¶
Index ¶
- Variables
- func Close()
- func ConfigSaramaInterceptor(config *sarama.Config, client *Client)
- func Init(token, host string, config interface{}, options ...Option) *sarama.Config
- func InitializeNatsConnection(token, host string) error
- type Client
- type ClientConfig
- type ClientCounters
- type ClientReconnectionUpdateReq
- type ClientTypeUpdateReq
- type ClientUpdateSub
- type GetSchemaReq
- type Option
- type Options
- type RegisterReq
- type RegisterResp
- type SaramaConsumerInterceptor
- type SaramaProducerInterceptor
- type SchemaUpdateReq
- type TopicsPartitionsPerProducerConsumer
- type Update
Constants ¶
This section is empty.
Variables ¶
View Source
var BrokerConnection *nats.Conn
View Source
var Clients map[int]*Client
View Source
var JSContext nats.JetStreamContext
View Source
var NatsConnectionID string
Functions ¶
func ConfigSaramaInterceptor ¶
Types ¶
type Client ¶
// Client groups the state kept for a single client: its identifiers
// (ClientID, AccountName, NatsConnectionID), the IsConsumer/IsProducer role
// flags, learning-factor bookkeeping (LearningFactor, LearningFactorCounter,
// LearningRequestSent), the GetSchemaRequestSent flag, the producer's protobuf
// message descriptor and schema ID, a string-keyed map of consumer message
// descriptors, and the embedded ClientCounters and ClientConfig values.
//
// The struct carries no JSON tags, unlike the request/config types.
// NOTE(review): the key of ConsumerProtoDescMap is presumably a schema ID —
// confirm against the code that populates it.
type Client struct { ClientID int AccountName string NatsConnectionID string IsConsumer bool IsProducer bool LearningFactor int LearningFactorCounter int LearningRequestSent bool GetSchemaRequestSent bool ProducerProtoDesc protoreflect.MessageDescriptor ProducerSchemaID string ConsumerProtoDescMap map[string]protoreflect.MessageDescriptor Counters ClientCounters Config ClientConfig }
func (*Client) RegisterClient ¶
func (*Client) SendClientTypeUpdateReq ¶
func (*Client) SendLearningMessage ¶
func (*Client) SendRegisterSchemaReq ¶
func (c *Client) SendRegisterSchemaReq()
func (*Client) SentGetSchemaRequest ¶
func (*Client) SubscribeUpdates ¶
type ClientConfig ¶
// ClientConfig is the JSON-serializable description of a client's settings.
// It holds the client type, producer settings (Producer* fields), consumer
// settings (Consumer* fields), consumer-group settings (ConsumerGroup* fields),
// the Servers string, and per-topic partition lists for producing and
// consuming. It is the type of Client.Config and of RegisterReq.Config.
//
// NOTE(review): several spellings look accidental but are part of the wire
// format / exported API, so they are left untouched here:
//   - ConsumerMaxProcessingTime is tagged "consumer_mex_processing_time"
//     ("mex", presumably meant "max").
//   - The field ConsumerOffsetAutoCommintInterval is spelled "Commint".
//   - ProducerMaxMessageBytes is tagged "producer_max_messages_bytes"
//     (plural "messages").
//   - ConsumerTopicsPartitions is tagged "consumer_group_topics_partitions".
//
// Confirm with the receiving side before renaming any of them.
type ClientConfig struct { ClientType string `json:"client_type"` ProducerMaxMessageBytes int `json:"producer_max_messages_bytes"` ProducerRequiredAcks string `json:"producer_required_acks"` ProducerTimeout time.Duration `json:"producer_timeout"` ProducerRetryMax int `json:"producer_retry_max"` ProducerRetryBackoff time.Duration `json:"producer_retry_backoff"` ProducerReturnErrors bool `json:"producer_return_errors"` ProducerReturnSuccesses bool `json:"producer_return_successes"` ProducerFlushMaxMessages int `json:"producer_flush_max_messages"` ProducerCompressionLevel string `json:"producer_compression_level"` ConsumerFetchMin int32 `json:"consumer_fetch_min"` ConsumerFetchDefault int32 `json:"consumer_fetch_default"` ConsumerRetryBackOff time.Duration `json:"consumer_retry_backoff"` ConsumerMaxWaitTime time.Duration `json:"consumer_max_wait_time"` ConsumerMaxProcessingTime time.Duration `json:"consumer_mex_processing_time"` ConsumerReturnErrors bool `json:"consumer_return_errors"` ConsumerOffsetAutoCommitEnable bool `json:"consumer_offset_auto_commit_enable"` ConsumerOffsetAutoCommintInterval time.Duration `json:"consumer_offset_auto_commit_interval"` ConsumerOffsetsInitial int `json:"consumer_offsets_initial"` ConsumerOffsetsRetryMax int `json:"consumer_offsets_retry_max"` ConsumerGroupSessionTimeout time.Duration `json:"consumer_group_session_timeout"` ConsumerGroupHeartBeatInterval time.Duration `json:"consumer_group_heart_beat_interval"` ConsumerGroupRebalanceTimeout time.Duration `json:"consumer_group_rebalance_timeout"` ConsumerGroupRebalanceRetryMax int `json:"consumer_group_rebalance_retry_max"` ConsumerGroupRebalanceRetryBackOff time.Duration `json:"consumer_group_rebalance_retry_back_off"` ConsumerGroupRebalanceResetInvalidOffsets bool `json:"consumer_group_rebalance_reset_invalid_offsets"` ConsumerGroupId string `json:"consumer_group_id"` Servers string `json:"servers"` ProducerTopicsPartitions map[string][]int32 `json:"producer_topics_partitions"` 
ConsumerTopicsPartitions map[string][]int32 `json:"consumer_group_topics_partitions"` }
func ConfigHandler ¶
func ConfigHandler(clientType string, config *sarama.Config) ClientConfig
type ClientCounters ¶
// ClientCounters holds a client's running totals: byte totals before and after
// reduction (int64), and counts of messages successfully/unsuccessfully
// produced and consumed (int). Every field has a snake_case JSON tag.
// It is the type of Client.Counters.
//
// NOTE(review): "reduction" presumably refers to the payload-size saving
// applied by the interceptors — confirm against the code that updates these.
type ClientCounters struct { TotalBytesBeforeReduction int64 `json:"total_bytes_before_reduction"` TotalBytesAfterReduction int64 `json:"total_bytes_after_reduction"` TotalMessagesSuccessfullyProduce int `json:"total_messages_successfully_produce"` TotalMessagesSuccessfullyConsumed int `json:"total_messages_successfully_consumed"` TotalMessagesFailedProduce int `json:"total_messages_failed_produce"` TotalMessagesFailedConsume int `json:"total_messages_failed_consume"` }
type ClientTypeUpdateReq ¶
type ClientUpdateSub ¶
func (*ClientUpdateSub) SubscriptionHandler ¶
func (c *ClientUpdateSub) SubscriptionHandler() nats.MsgHandler
func (*ClientUpdateSub) UpdatesHandler ¶
func (c *ClientUpdateSub) UpdatesHandler()
type GetSchemaReq ¶
// GetSchemaReq is a request payload carrying a single schema identifier.
type GetSchemaReq struct {
// SchemaID is serialized under the JSON key "schema_id".
SchemaID string `json:"schema_id"`
}
type Options ¶
func GetDefaultOptions ¶
func GetDefaultOptions() Options
type RegisterReq ¶
// RegisterReq is a JSON request payload made of the NATS connection ID, the
// client's language and version strings, its learning factor, and the full
// ClientConfig (under the "config" key).
//
// NOTE(review): presumably the payload sent by Client.RegisterClient —
// confirm against that method's implementation.
type RegisterReq struct { NatsConnectionID string `json:"nats_connection_id"` Language string `json:"language"` Version string `json:"version"` LearningFactor int `json:"learning_factor"` Config ClientConfig `json:"config"` }
type RegisterResp ¶
type SaramaConsumerInterceptor ¶
// SaramaConsumerInterceptor ties a *Client to the consumer side; its OnConsume
// method receives each *sarama.ConsumerMessage.
//
// NOTE(review): the OnConsume signature matches what sarama's
// ConsumerInterceptor interface expects — confirm it is registered as such.
type SaramaConsumerInterceptor struct {
// Client is the client this interceptor is attached to.
Client *Client
}
func (*SaramaConsumerInterceptor) OnConsume ¶
func (s *SaramaConsumerInterceptor) OnConsume(msg *sarama.ConsumerMessage)
type SaramaProducerInterceptor ¶
// SaramaProducerInterceptor ties a *Client to the producer side; its OnSend
// method receives each *sarama.ProducerMessage.
//
// NOTE(review): the OnSend signature matches what sarama's
// ProducerInterceptor interface expects — confirm it is registered as such.
type SaramaProducerInterceptor struct {
// Client is the client this interceptor is attached to.
Client *Client
}
func (*SaramaProducerInterceptor) OnSend ¶
func (s *SaramaProducerInterceptor) OnSend(msg *sarama.ProducerMessage)
type SchemaUpdateReq ¶
Source Files ¶
Click to show internal directories.
Click to hide internal directories.