kafka

package module
v1.0.0
Published: Oct 17, 2023 License: Apache-2.0 Imports: 28 Imported by: 0

README

tRPC-Go kafka plugin

English | 中文

Wraps the community sarama library for use with tRPC.

producer client

client:                                            # Backend configuration for client calls.
  service:                                         # Configuration for the backend.
    - name: trpc.app.server.producer               # Producer service name, defined by yourself.
      target: kafka://ip1:port1,ip2:port2?topic=YOUR_TOPIC&clientid=xxx&compression=xxx
      timeout: 800                                 # Maximum processing time for the request, in milliseconds.
package main

import (
  "context"

  "github.com/Shopify/sarama"
  "trpc.group/trpc-go/trpc-database/kafka"
)

func (s *server) SayHello(ctx context.Context, req *pb.ReqBody, rsp *pb.RspBody) (err error) {
  // The service name is user-defined and is mainly used for monitoring
  // reporting and addressing configuration items.
  proxy := kafka.NewClientProxy("trpc.app.server.producer")

  // kafka function call (key and value are the message key and value bytes).
  if err = proxy.Produce(ctx, key, value); err != nil {
    return err
  }

  // Native producer interface; returns the partition and offset.
  partition, offset, err := proxy.SendSaramaMessage(ctx, sarama.ProducerMessage{
    Topic: "your_topic",
    Value: sarama.ByteEncoder(msg),
  })
  _, _ = partition, offset // use as needed

  // Business logic
  // ...
}

consumer service

server:                                                                                   # Server configuration.
  service:                                                                                # Services provided by the business; there can be multiple.
    - name: trpc.app.server.consumer                                                      # Service route name; see Q1 below for naming rules.
      address: ip1:port1,ip2:port2?topics=topic1,topic2&group=xxx&version=x.x.x.x         # Kafka consumer broker addresses; version defaults to 1.1.1.0, and some ckafka versions require 0.10.2.0.
      protocol: kafka                                                                     # Application layer protocol.
      timeout: 1000                                                                       # Maximum request processing time, in milliseconds. Framework configuration, unrelated to sarama configuration.

package main

import (
  "context"

  "trpc.group/trpc-go/trpc-database/kafka"
  trpc "trpc.group/trpc-go/trpc-go"
  "github.com/Shopify/sarama"
)

func main() {
  s := trpc.NewServer()
  // To use a custom addr, it needs to be called before starting the server
  /*
    cfg := kafka.GetDefaultConfig()
    cfg.ClientID = "newClientID"
    kafka.RegisterAddrConfig("address", cfg)
  */
  // When starting multiple consumers, configure multiple services and match each
  // one here with kafka.RegisterKafkaHandlerService(s.Service("name"), handle).
  // If no name is specified, all services share the same handler.
  kafka.RegisterKafkaHandlerService(s, handle)
  s.Serve()
}

// Consumption is acknowledged only when nil is returned.
// On a non-nil return, whether the message is consumed again depends on the consumer configuration (see maxRetry).
func handle(ctx context.Context, msg *sarama.ConsumerMessage) error {
  return nil
}
batch consumer

package main

import (
  "context"
 
  "trpc.group/trpc-go/trpc-database/kafka"
  trpc "trpc.group/trpc-go/trpc-go"
  "github.com/Shopify/sarama"
)

func main() {
  s := trpc.NewServer()
  kafka.RegisterBatchHandlerService(s, handle) 
  s.Serve()
}

// Note: the batch parameter (>0) must be configured on the address; otherwise the
// handler will not match and consumption will fail (an address example follows below).
// For complete usage examples, see examples/batchconsumer.
// Consumption is acknowledged only when nil is returned; if an error is returned,
// the whole batch of messages is consumed again.
func handle(ctx context.Context, msgArray []*sarama.ConsumerMessage) error {
  // ...
  return nil
}
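For example, the batch and batchFlush parameters (see the consumer parameter table below; the values here are illustrative) are set on the consumer address:

address: ip1:port1,ip2:port2?topics=topic1,topic2&group=xxx&batch=10&batchFlush=2000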

If you need to configure your own parameters, you can use:

cfg := kafka.GetDefaultConfig()
// Update own cfg properties.
kafka.RegisterAddrConfig("address", cfg) // address is the address filled in your configuration.
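A slightly fuller sketch of overriding defaults (the field values are illustrative; MaxRetry and RetryInterval are fields of UserConfig documented below, and the time package must be imported):

cfg := kafka.GetDefaultConfig()
cfg.ClientID = "my_client"                 // illustrative client ID
cfg.MaxRetry = 5                           // maximum retries on failure
cfg.RetryInterval = 200 * time.Millisecond // interval between retries
kafka.RegisterAddrConfig("address", cfg)   // address is the address used in your configuration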

Parameter Description

producer
client:
  service:
    - name: trpc.app.server.producer
      target: kafka://ip1:port1,ip2:port2?topic=YOUR_TOPIC&clientid=xxx&compression=xxx
      timeout: 800
    - name: trpc.app.server.producer1   # Polaris naming access does not support dynamic changes; if Polaris IPs change, the service must be restarted to take effect. To get all Polaris nodes, see Q14.
      target: kafka://YOUR_SERVICE_NAME?topic=YOUR_TOPIC&clientid=xxx&compression=xxx&discover=polaris&namespace=Development 
      timeout: 800 
| parameter | meaning | optional values & description |
| --- | --- | --- |
| ip:port | address list | Separate multiple addresses with commas (','). Supports domain:port and ip:port. cl5 is not supported yet. |
| clientid | producer ID | For [wormhole] kafka (PCG internal version), it needs to be registered on the management page. |
| topic | producer topic | Required for Produce calls. For [wormhole] kafka, it needs to be registered on the management page. |
| version | client version | Version numbers in two formats are supported: 0.1.1.0 / 1.1.0 |
| partitioner | message partitioning | random: sarama.NewRandomPartitioner (default); roundrobin: sarama.NewRoundRobinPartitioner; hash: sarama.NewHashPartitioner (no custom hash method yet) |
| compression | compression method | none: sarama.CompressionNone; gzip: sarama.CompressionGZIP (default); snappy: sarama.CompressionSnappy; lz4: sarama.CompressionLZ4; zstd: sarama.CompressionZSTD |
| maxMessageBytes | maximum message length | Defaults to 131072. |
| requiredAcks | acknowledgment mode | The broker ack mode when producing messages; supports three values (0/1/-1): 0: NoResponse, no need to wait for the broker response; 1: WaitForLocal, wait for the local (leader node) response; -1: WaitForAll, wait for all nodes (leader and all in-sync replica followers) to respond (default). |
| maxRetry | max retries on failure | Maximum number of retries for producing messages, default 3. Must be >= 0, otherwise an error is reported. |
| retryInterval | retry interval on failure | In milliseconds, default 100ms. |
| trpcMeta | transfer trpc meta to sarama headers | true enables transfer, false disables it; default false. |
| discover | service discovery type | For example: polaris |
| namespace | service namespace | For example: Development |
| idempotent | enable idempotent producer | true/false, default false. |
consumer
server:
  service:
    - name: trpc.app.server.consumer
      address: ip1:port1,ip2:port2?topics=topic1,topic2&group=xxx&version=x.x.x.x
      protocol: kafka
      timeout: 1000
    - name: trpc.app.server.consumer1   # Polaris naming access does not support dynamic changes; if Polaris IPs change, the service must be restarted to take effect. To get all Polaris nodes, see Q14.
      address: YOUR_SERVICE_NAME?topics=topic1,topic2&group=xxx&version=x.x.x.x&discover=polaris&namespace=Development
      protocol: kafka
      timeout: 1000    # Maximum request processing time, in milliseconds.
| parameter | meaning | optional values & description |
| --- | --- | --- |
| ip:port | address list | Separate multiple addresses with commas (','). Supports domain:port and ip:port. cl5 is not supported yet. |
| group | consumer group | For [wormhole] kafka, it needs to be registered on the management page. |
| clientid | client ID | Client ID used to connect to Kafka. |
| topics | consumed topics | Multiple topics are separated by commas. |
| compression | compression method | |
| strategy | rebalance strategy | sticky: sarama.BalanceStrategySticky; range: sarama.BalanceStrategyRange; roundrobin: sarama.BalanceStrategyRoundRobin |
| fetchDefault | default fetch size (bytes) | If the actual message size exceeds this value, memory must be reallocated, which hurts performance. Equivalent to sarama.Config.Consumer.Fetch.Default. Default 524288. |
| fetchMax | maximum fetch size (bytes) | If the actual message size exceeds this value, an error is reported directly. Equivalent to sarama.Config.Consumer.Fetch.Max. Default 1048576. |
| batch | batch size | Required when using batch consumption. If batch is not set when a batch handler is registered, the handler will not match and consumption will fail. See examples/batchconsumer. |
| batchFlush | batch flush interval | In milliseconds, default 2000ms. The interval at which a partial batch is force-flushed when the batch size is not reached. |
| initial | initial consume position | Where a new consumer group connects to the cluster for the first time: newest (latest position) or oldest (oldest position). |
| maxWaitTime | maximum wait time for a single fetch request | Waits only when there is no new data. Default 1s. |
| maxRetry | max retries on failure | Once exceeded, the message is acknowledged and the next one is consumed. Default 0: no limit, retry forever. A negative value means no retry, acknowledge immediately; a positive value means the handler runs at most maxRetry+1 times. |
| netMaxOpenRequests | maximum simultaneous requests | Network layer configuration. Default 5. |
| maxProcessingTime | maximum processing time per message | In milliseconds, default 1000ms. |
| netDailTimeout | dial timeout | Network layer configuration, in milliseconds, default 30000ms. |
| netReadTimeout | read timeout | Network layer configuration, in milliseconds, default 30000ms. |
| netWriteTimeout | write timeout | Network layer configuration, in milliseconds, default 30000ms. |
| groupSessionTimeout | consumer group session timeout | In milliseconds, default 10000ms. |
| groupRebalanceTimeout | consumer group rebalance timeout | In milliseconds, default 60000ms. |
| mechanism | SASL mechanism used with password authentication | Optional values: SCRAM-SHA-512 / SCRAM-SHA-256 |
| user | username | |
| password | password | |
| retryInterval | retry interval | In milliseconds, default 3000ms. |
| isolationLevel | isolation level | Optional values: ReadUncommitted / ReadCommitted |
| trpcMeta | transfer trpc meta (read sarama headers to set trpc meta) | true enables transfer, false disables it; default false. |
| discover | service discovery type | For example: polaris |
| namespace | service namespace | For example: Development |

Q&A

  • Q1: How should the consumer's service.name be written?
  • A1: If there is only one consumer service, the name can be arbitrary (the trpc framework registers the implementation to all services in the server by default); see the complete example in examples/consumer.
server:                                                                  
  service:                                                               
    - name: trpc.anyname.will.works                                
      address: 127.0.0.1:9092?topics=test_topic&group=uzuki_consumer 
      protocol: kafka                                                    
      timeout: 1000                                                      
s := trpc.NewServer()
kafka.RegisterKafkaHandlerService(s, handle) 

If there are multiple services, you need to register with the same name as in the configuration file; see the complete example in examples/consumer_with_mulit_service.

server:                                                                   
  service:                                                                
    - name: trpc.databaseDemo.kafka.consumer1                             
      address: 127.0.0.1:9092?topics=test_topic&group=uzuki_consumer1 
      protocol: kafka                                                     
      timeout: 1000                                                       
    - name: trpc.databaseDemo.kafka.consumer2                             
      address: 127.0.0.1:9092?topics=test_topic&group=uzuki_consumer2 
      protocol: kafka                                                     
      timeout: 1000     
s := trpc.NewServer()    
kafka.RegisterKafkaConsumerService(s.Service("trpc.databaseDemo.kafka.consumer1"), &Consumer{})
kafka.RegisterKafkaConsumerService(s.Service("trpc.databaseDemo.kafka.consumer2"), &Consumer{})
  • Q2: What happens if handle returns a non-nil error during consumption?

  • A2: Consumption resumes after a 3s sleep. Relying on this is not recommended; if processing fails, the business should implement its own retry logic, as in the sketch below.
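A minimal sketch of business-side retries inside the handler; the attempts bound and the processMessage helper are illustrative assumptions, and log is trpc-go's log package:

import (
  "context"

  "github.com/Shopify/sarama"
  "trpc.group/trpc-go/trpc-go/log"
)

// handle retries processing in-process, then acknowledges the message so the
// framework does not redeliver it endlessly.
func handle(ctx context.Context, msg *sarama.ConsumerMessage) error {
  const attempts = 3 // illustrative bound, not a plugin setting
  var err error
  for i := 0; i < attempts; i++ {
    if err = processMessage(ctx, msg); err == nil { // processMessage: hypothetical business helper
      return nil // success: the message is acknowledged
    }
  }
  // Returning nil still acknowledges the message; a real service might first
  // divert it to a dead-letter topic instead of dropping it.
  log.Errorf("drop message %s/%d/%d after %d attempts: %v",
    msg.Topic, msg.Partition, msg.Offset, attempts, err)
  return nil
}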

  • Q3: When using ckafka to produce messages, the following error is reported:

err:type:framework, code:141, msg:kafka client transport SendMessage: kafka server: Message contents does not match its CRC.
  • A3: gzip compression is enabled by default; prefer adding compression=none to the target:
target: kafka://ip1:port1,ip2:port2?clientid=xxx&compression=none
  • Q4: When using ckafka to consume messages, the following error is reported:
kafka server transport: consume fail:kafka: client has run out of available brokers to talk to (Is your cluster reachable?)
  • A4: First check that the brokers are reachable, then check the Kafka client version supported by the cluster and add the version parameter to the address in the configuration file, for example version=0.10.2.0:
address: ip1:port1,ip2:port2?topics=topic1,topic2&group=xxx&version=0.10.2.0 
  • Q5: When consuming messages, the following error is reported:
kafka server transport: consume fail:kafka server: The provider group protocol type is incompatible with the other members.
  • A5: Clients in the same consumer group are using different rebalance strategies. Modify the strategy parameter; optional values: sticky (default), range, roundrobin:
address: ip1:port1,ip2:port2?topics=topic1,topic2&group=xxx&strategy=range
  • Q6: How to configure ordered production for the same user?
  • A6: Add the partitioner parameter on the client; optional values: random (default), roundrobin, hash (partitioned by message key). See the sketch after the target example below.
target: kafka://ip1:port1,ip2:port2?clientid=xxx&partitioner=hash
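With partitioner=hash, messages that share a key are routed to the same partition, which preserves per-user ordering. A minimal sketch (the user ID and payload parameters are illustrative):

import (
  "context"

  "trpc.group/trpc-go/trpc-database/kafka"
)

func produceOrdered(ctx context.Context, userID string, payload []byte) error {
  proxy := kafka.NewClientProxy("trpc.app.server.producer")
  // Use the user ID as the message key: with partitioner=hash, all messages
  // for this user hash to the same partition and stay ordered.
  return proxy.Produce(ctx, []byte(userID), payload)
}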
  • Q7: How to produce asynchronously?
  • A7: Add the parameter async=1 to the client target; a sketch follows the example below.
target: kafka://ip1:port1,ip2:port2?clientid=xxx&async=1
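With async=1, production does not block on the broker; delivery results surface through the callbacks described in Q10. The Client interface also offers AsyncSendMessage for an explicit topic. A minimal sketch (the topic and payload are illustrative):

import (
  "context"

  "trpc.group/trpc-go/trpc-database/kafka"
)

func produceAsync(ctx context.Context, payload []byte) error {
  proxy := kafka.NewClientProxy("trpc.app.server.producer")
  // Enqueue the message and return immediately; the outcome is reported via
  // kafka.AsyncProducerSuccCallback / kafka.AsyncProducerErrorCallback.
  return proxy.AsyncSendMessage(ctx, "your_topic", nil, payload)
}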
  • Q8: Polaris routing reports "Polaris-1006(ErrCodeServerError)" / "not found service"
  • A8: Make sure the service.name in the trpc configuration has the form trpc.${app}.${server}.AnyUniqNameWillWork (with the ${app} and ${server} placeholders filled in) rather than the literal trpc.app.server.AnyUniqNameWillWork. Error site:
type:framework, code:131, msg:client Select: get source service route rule err: Polaris-1006(ErrCodeServerError): Response from {ID: 2079470528, Service: {ServiceKey: {namespace: "Polaris", service: "polaris.discover"}, ClusterType: discover}, Address: 127.0.0.1:8081}: not found service
  • Q9: How to use a username and password?
  • A9: Configure the SASL mechanism, username, and password in the connection parameters. For example:
address: ip1:port1,ip2:port2?topics=topic1,topic2&mechanism=SCRAM-SHA-512&user={user}&password={password}
  • Q10: How to use the asynchronous-production write callbacks?
  • A10: Override the success/failure callbacks for asynchronous production in code, for example:
import ( 
  // ... 
  "trpc.group/trpc-go/trpc-database/kafka"
  // ...
)

func init() {
  // Override the default asynchronous production write data error callback
  kafka.AsyncProducerErrorCallback = func(err error, topic string, key, value []byte, headers []sarama.RecordHeader) {
    // do something if async producer occurred error.
  }

  // Override the default asynchronous production write data success callback
  kafka.AsyncProducerSuccCallback = func(topic string, key, value []byte, headers []sarama.RecordHeader) {
    // do something if async producer succeed.
  }
}
  • Q11: How to inject custom configuration (remote configuration)?
  • A11: Configure a fake_address in trpc_go.yaml, then inject the real configuration with the kafka.RegisterAddrConfig method. The trpc_go.yaml configuration is as follows:
address: fake_address 

Before the service starts, inject the custom configuration:

func main() {
  s := trpc.NewServer()
  // Use a custom addr, which needs to be injected before starting the server
  cfg := kafka.GetDefaultConfig()
  cfg.Brokers = []string{"127.0.0.1:9092"}
  cfg.Topics = []string{"test_topic"}
  kafka.RegisterAddrConfig("fake_address", cfg)
  kafka.RegisterKafkaConsumerService(s, &Consumer{})

  s.Serve()
}
  • Q12: How to transfer trpc metadata?
  • A12: Add the parameter trpcMeta=true on both producers and consumers; transparent transmission is disabled by default. If the original production interface already sets headers, pay attention to header conflicts, duplication, and overwriting. A sketch follows the consumer example below.

producer

target: kafka://127.0.0.1:9092?clientid=test_producer&partitioner=hash&topic=test_topic&trpcMeta=true

consumer

address: 127.0.0.1:9092?topics=test_topic&group=test_group&trpcMeta=true     #kafka consumer dsn
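A hedged end-to-end sketch, assuming the standard trpc-go metadata APIs client.WithMetaData and trpc.Message(ctx).ServerMetaData() (the meta key and value are illustrative):

import (
  "context"

  "github.com/Shopify/sarama"
  trpc "trpc.group/trpc-go/trpc-go"
  "trpc.group/trpc-go/trpc-database/kafka"
  "trpc.group/trpc-go/trpc-go/client"
)

func produceWithMeta(ctx context.Context, key, value []byte) error {
  // With trpcMeta=true on the target, this metadata is carried in the Kafka
  // record headers.
  proxy := kafka.NewClientProxy("trpc.app.server.producer",
    client.WithMetaData("trace-id", []byte("12345"))) // illustrative meta key/value
  return proxy.Produce(ctx, key, value)
}

func handleWithMeta(ctx context.Context, msg *sarama.ConsumerMessage) error {
  // With trpcMeta=true on the address, headers are mapped back into the tRPC
  // server metadata.
  traceID := trpc.Message(ctx).ServerMetaData()["trace-id"]
  _ = traceID // use the metadata in business logic
  return nil
}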
  • Q13: How to get the context information of the underlying sarama?
  • A13: The underlying sarama ConsumerGroupSession and ConsumerGroupClaim can be obtained through kafka.GetRawSaramaContext. These interfaces are exposed only so users can log and monitor; only read methods should be used. Calling any write method is undefined behavior and may lead to unknown results.
// RawSaramaContext stores the sarama ConsumerGroupSession and ConsumerGroupClaim.
// It is exported so users can implement monitoring; the contents are read-only,
// and calling any write method is undefined behavior.
type RawSaramaContext struct {
    Session sarama.ConsumerGroupSession
    Claim   sarama.ConsumerGroupClaim
}

Usage example:

func example(ctx context.Context){
    if rawContext, ok := kafka.GetRawSaramaContext(ctx); ok {
        log.Infof("InitialOffset:%d", rawContext.Claim.InitialOffset())
    }
}

  • Q14: How to obtain all nodes in server-side Polaris addressing?
  • A14: Enable service_router.need_return_all_nodes=true in trpc_go.yaml:
plugins:     # plugin configuration
  selector:
    polaris:
      service_router:
        need_return_all_nodes: true

Documentation

Overview

Package kafka wraps github.com/Shopify/sarama. Producers send through trpc.Client; consumer logic is implemented through trpc.Service.

The package also provides SASL/SCRAM security verification.

Index

Constants

const DefaultClientID = "trpcgo"

DefaultClientID default client id

const SASLTypeSSL = "SASL_SSL"

SASLTypeSSL represents the SASL_SSL security protocol.

Variables

var (
	DefaultServerCodec = &ServerCodec{}
	DefaultClientCodec = &ClientCodec{}
)

default codec

var AsyncProducerErrorCallback = func(err error, topic string, key, value []byte, headers []sarama.RecordHeader) {
	log.Errorf("asyncProduce failed. topic:%s, key:%s, value:%s. err:%v", topic, key,
		value, err)
}

AsyncProducerErrorCallback is the asynchronous-production failure callback. The default implementation only logs the error; users can override it to capture send failures.

var AsyncProducerSuccCallback = func(topic string, key, value []byte, headers []sarama.RecordHeader) {}

AsyncProducerSuccCallback is the asynchronous-production success callback. It does nothing by default; users can override it to capture successful sends.

var BatchConsumerServiceDesc = server.ServiceDesc{
	ServiceName: "trpc.kafka.consumer.service",
	HandlerType: ((*BatchConsumer)(nil)),
	Methods: []server.Method{
		{
			Name: "/trpc.kafka.consumer.service/handle",
			Func: BatchConsumerHandle,
		},
	},
}

BatchConsumerServiceDesc descriptor for server.RegisterService

var ContinueWithoutCommitError = &errs.Error{
	Type: errs.ErrorTypeBusiness,
	Code: errs.RetUnknown,
	Msg:  "Error:Continue to consume message without committing ack",
}

ContinueWithoutCommitError indicates continuing to consume messages without committing the ack. Scenario: when producing, a message body may exceed Kafka's size limit, so the original body is split into multiple chunks, each wrapped in a Kafka message and delivered; when consuming, business processing can only start after all chunks have arrived. If the consumer's Handle method or msg.ServerRspErr returns this error, it means consumption should continue without committing the ack, and the return is not treated as a failure. For the specific scenario see issue: https://git.woa.com/trpc-go/trpc-database/issues/251
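A minimal consumer sketch of this pattern; the assemble and process helpers are illustrative assumptions standing in for chunk reassembly and business logic:

import (
	"context"

	"github.com/Shopify/sarama"
	"trpc.group/trpc-go/trpc-database/kafka"
)

// handle buffers chunks of an oversized message and defers the ack until the
// last chunk has arrived.
func handle(ctx context.Context, msg *sarama.ConsumerMessage) error {
	payload, done := assemble(msg) // assemble: hypothetical chunk-reassembly helper
	if !done {
		// Keep consuming without committing the ack for this message.
		return kafka.ContinueWithoutCommitError
	}
	return process(ctx, payload) // process: hypothetical business logic
}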

var DefaultClientTransport = NewClientTransport()

DefaultClientTransport default client kafka transport

var DefaultServerTransport = NewServerTransport()

DefaultServerTransport ServerTransport default implement

var KafkaConsumerServiceDesc = server.ServiceDesc{
	ServiceName: "trpc.kafka.consumer.service",
	HandlerType: ((*KafkaConsumer)(nil)),
	Methods: []server.Method{{
		Name: "/trpc.kafka.consumer.service/handle",
		Func: KafkaConsumerHandle,
	}},
}

KafkaConsumerServiceDesc descriptor

var NewClientProxy = func(name string, opts ...client.Option) Client {
	c := &kafkaCli{
		ServiceName: name,
		Client:      client.DefaultClient,
	}

	c.opts = make([]client.Option, 0, len(opts)+2)
	c.opts = append(c.opts, client.WithProtocol("kafka"), client.WithDisableServiceRouter())
	c.opts = append(c.opts, opts...)
	return c
}

NewClientProxy creates a new Kafka backend request proxy. The required parameter is the Kafka service name, e.g. trpc.kafka.producer.service.

var SHA256 scram.HashGeneratorFcn = func() hash.Hash { return sha256.New() }

SHA256 hash protocol

var SHA512 scram.HashGeneratorFcn = func() hash.Hash { return sha512.New() }

SHA512 hash protocol

var Timeout = 2 * time.Second

Timeout is the global timeout configuration of Kafka, the default is 2s, and users can modify it if necessary.

Functions

func BatchConsumerHandle

func BatchConsumerHandle(svr interface{}, ctx context.Context, f server.FilterFunc) (interface{}, error)

BatchConsumerHandle batch consumer service handler wrapper

func IsCWCError

func IsCWCError(err error) bool

IsCWCError checks whether an error is a ContinueWithoutCommitError (CWC: Continue Without Commit).

func KafkaConsumerHandle

func KafkaConsumerHandle(svr interface{}, ctx context.Context, f server.FilterFunc) (interface{}, error)

KafkaConsumerHandle consumer service handler wrapper

func NewClientTransport

func NewClientTransport(opt ...transport.ClientTransportOption) transport.ClientTransport

NewClientTransport builds a Kafka client transport.

func NewServerTransport

func NewServerTransport(opt ...transport.ServerTransportOption) transport.ServerTransport

NewServerTransport builds a server transport.

func RegisterAddrConfig

func RegisterAddrConfig(address string, cfg *UserConfig)

RegisterAddrConfig registers user-defined configuration; address is the corresponding address in the configuration file.

func RegisterBatchHandlerService

func RegisterBatchHandlerService(
	s server.Service,
	handle func(ctx context.Context, msgArray []*sarama.ConsumerMessage) error,
)

RegisterBatchHandlerService registers a batch consumer handler function.

func RegisterKafkaConsumerService

func RegisterKafkaConsumerService(s server.Service, svr KafkaConsumer)

RegisterKafkaConsumerService registers a consumer service.

func RegisterKafkaHandlerService

func RegisterKafkaHandlerService(s server.Service,
	handle func(ctx context.Context, msg *sarama.ConsumerMessage) error,
)

RegisterKafkaHandlerService registers a consumer handler function.

Types

type BatchConsumer

type BatchConsumer interface {
	// Handle callback function when a message is received
	Handle(ctx context.Context, msgArray []*sarama.ConsumerMessage) error
}

BatchConsumer batch consumer

type Client

type Client interface {
	Produce(ctx context.Context, key, value []byte,
		headers ...sarama.RecordHeader) error
	SendMessage(ctx context.Context, topic string, key, value []byte,
		headers ...sarama.RecordHeader) (partition int32, offset int64, err error)
	AsyncSendMessage(ctx context.Context, topic string, key, value []byte,
		headers ...sarama.RecordHeader) (err error)
	// SendSaramaMessage produce sarama native messages directly
	SendSaramaMessage(ctx context.Context, sMsg sarama.ProducerMessage) (partition int32, offset int64, err error)
}

Client kafka interface
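A hedged usage sketch of the Client methods obtained via NewClientProxy (the service name, topic, and payloads are illustrative):

import (
	"context"

	"github.com/Shopify/sarama"
	"trpc.group/trpc-go/trpc-database/kafka"
)

func produceAll(ctx context.Context, key, value []byte) error {
	proxy := kafka.NewClientProxy("trpc.app.server.producer")

	// Produce to the topic configured on the target.
	if err := proxy.Produce(ctx, key, value); err != nil {
		return err
	}

	// Produce to an explicit topic and observe where the message landed.
	if _, _, err := proxy.SendMessage(ctx, "your_topic", key, value); err != nil {
		return err
	}

	// Produce a native sarama message directly.
	_, _, err := proxy.SendSaramaMessage(ctx, sarama.ProducerMessage{
		Topic: "your_topic",
		Value: sarama.ByteEncoder(value),
	})
	return err
}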

type ClientCodec

type ClientCodec struct{}

ClientCodec decode kafka client requests

func (*ClientCodec) Decode

func (c *ClientCodec) Decode(kafkaMsg codec.Msg, _ []byte) ([]byte, error)

Decode parses the metadata in the Kafka client response packet.

func (*ClientCodec) Encode

func (c *ClientCodec) Encode(kafkaMsg codec.Msg, _ []byte) ([]byte, error)

Encode sets metadata for Kafka client requests.

type ClientConversation

type ClientConversation interface {
	Step(challenge string) (response string, err error)
	Done() bool
}

ClientConversation implements the client-side of an authentication conversation with a server. go:generate mockgen -destination=./mockkafka/scram_mock.go -package=mockkafka . ClientConversation

type ClientTransport

type ClientTransport struct {
	// contains filtered or unexported fields
}

ClientTransport implements the trpc.ClientTransport interface and encapsulates the producer.

func (*ClientTransport) AsyncProduce

func (ct *ClientTransport) AsyncProduce(producer sarama.AsyncProducer)

AsyncProduce produces and processes captured messages asynchronously.

func (*ClientTransport) GetAsyncProducer

func (ct *ClientTransport) GetAsyncProducer(address string, timeout time.Duration) (*Producer, error)

GetAsyncProducer gets an asynchronous producer and starts a goroutine to process produced data and messages.

func (*ClientTransport) GetProducer

func (ct *ClientTransport) GetProducer(address string, timeout time.Duration) (*Producer, error)

GetProducer gets the producer.

func (*ClientTransport) RoundTrip

func (ct *ClientTransport) RoundTrip(
	ctx context.Context, _ []byte, callOpts ...transport.RoundTripOption,
) ([]byte, error)

RoundTrip sends and receives Kafka packets; the Kafka response is placed in ctx, so there is no need to return a response buffer.

type Config

type Config struct {
	MaxRequestSize  int32 `yaml:"max_request_size"`  // global maximum request body size
	MaxResponseSize int32 `yaml:"max_response_size"` // global maximum response body size
	RewriteLog      bool  `yaml:"rewrite_log"`       // whether to rewrite logs to log
}

Config is kafka proxy configuration

type KafkaConsumer

type KafkaConsumer interface {
	Handle(ctx context.Context, msg *sarama.ConsumerMessage) error
}

KafkaConsumer consumer interface

type LSCRAMClient

type LSCRAMClient struct {
	*scram.Client                 // client
	ClientConversation            // client session layer
	scram.HashGeneratorFcn        // hash value generating function
	User                   string // user
	Password               string // password
	Mechanism              string // encryption protocol type
	Protocol               string // encryption protocol
}

LSCRAMClient scram authentication client configuration

func (*LSCRAMClient) Begin

func (s *LSCRAMClient) Begin(userName, password, authzID string) (err error)

Begin SCRAM authentication start interface

func (*LSCRAMClient) Done

func (s *LSCRAMClient) Done() bool

Done SCRAM authentication end interface

func (*LSCRAMClient) Parse

func (s *LSCRAMClient) Parse(vals []string)

Parse parses the SCRAM configuration locally.

func (*LSCRAMClient) Step

func (s *LSCRAMClient) Step(challenge string) (string, error)

Step SCRAM authentication step interface

type LogReWriter

type LogReWriter struct{}

LogReWriter redirect log

func (LogReWriter) Print

func (LogReWriter) Print(v ...interface{})

Print sarama.Logger interface

func (LogReWriter) Printf

func (LogReWriter) Printf(format string, v ...interface{})

Printf sarama.Logger interface

func (LogReWriter) Println

func (LogReWriter) Println(v ...interface{})

Println sarama.Logger interface

type Plugin

type Plugin struct{}

Plugin implements the default plugin initialization, used to load the Kafka proxy connection parameter configuration.

func (*Plugin) Setup

func (k *Plugin) Setup(name string, configDesc plugin.Decoder) error

Setup plugin initialization

func (*Plugin) Type

func (k *Plugin) Type() string

Type plugin type

type Producer

type Producer struct {
	// contains filtered or unexported fields
}

Producer encapsulates producer information.

type RateLimitConfig

type RateLimitConfig struct {
	Rate  float64 // token production rate
	Burst int     // token bucket capacity
}

RateLimitConfig is the rate limit configuration.

type RawSaramaContext

type RawSaramaContext struct {
	Session sarama.ConsumerGroupSession
	Claim   sarama.ConsumerGroupClaim
}

RawSaramaContext stores the sarama ConsumerGroupSession and ConsumerGroupClaim. It is exported so users can implement monitoring; the contents are read-only, and calling any write method is undefined behavior.

func GetRawSaramaContext

func GetRawSaramaContext(ctx context.Context) (*RawSaramaContext, bool)

GetRawSaramaContext gets the raw sarama context information, including ConsumerGroupSession and ConsumerGroupClaim. Only read methods should be used on the result; calling any write method is undefined behavior.

type Request

type Request struct {
	Topic     string
	Key       []byte
	Value     []byte
	Async     bool // to produce asynchronously or not
	Partition int32
	Headers   []sarama.RecordHeader // Deprecated: use Message.Headers instead
	Message   sarama.ProducerMessage
}

Request kafka request body

type Response

type Response struct {
	Partition int32
	Offset    int64
}

Response kafka response body

type ServerCodec

type ServerCodec struct{}

ServerCodec Server codec

func (*ServerCodec) Decode

func (s *ServerCodec) Decode(_ codec.Msg, _ []byte) ([]byte, error)

Decode unpacks the binary request data received from the client into the request body; the service handler automatically creates a new empty msg as the initial general message body.

func (*ServerCodec) Encode

func (s *ServerCodec) Encode(_ codec.Msg, _ []byte) ([]byte, error)

Encode packs the response body into binary and sends it back to the client.

type ServerTransport

type ServerTransport struct {
	// contains filtered or unexported fields
}

ServerTransport kafka consumer transport

func (*ServerTransport) ListenAndServe

func (s *ServerTransport) ListenAndServe(ctx context.Context, opts ...transport.ListenServeOption) (err error)

ListenAndServe starts the listener and returns an error if listening fails.

type UserConfig

type UserConfig struct {
	Brokers         []string // cluster address
	Topics          []string // for consumers
	Topic           string   // for producers
	Group           string   // consumer group
	Async           int      // Whether to produce asynchronously, 0 is synchronous and 1 is asynchronous
	ClientID        string   // Client ID
	Compression     sarama.CompressionCodec
	Version         sarama.KafkaVersion
	Strategy        sarama.BalanceStrategy
	Partitioner     func(topic string) sarama.Partitioner
	Initial         int64 // The location where the new consumer group first connects to the cluster consumer
	FetchDefault    int
	FetchMax        int
	MaxWaitTime     time.Duration
	RequiredAcks    sarama.RequiredAcks
	ReturnSuccesses bool
	Timeout         time.Duration
	// When producing asynchronously
	MaxMessageBytes   int           // the maximum number of bytes in the local cache queue
	FlushMessages     int           // the maximum number of messages sent by the local cache broker
	FlushMaxMessages  int           // the maximum number of messages in the local cache queue
	FlushBytes        int           // the maximum number of bytes sent by the local cache broker
	FlushFrequency    time.Duration // In asynchronous production, the maximum time sent by the local cache broker
	BatchConsumeCount int           // Maximum number of messages for batch consumption
	BatchFlush        time.Duration // batch consumption flush interval
	ScramClient       *LSCRAMClient // LSCRAM safety certification
	// The maximum number of retries on failure;
	// default 0: retry forever, <0 means no retry.
	MaxRetry               int
	NetMaxOpenRequests     int // Maximum number of requests
	MaxProcessingTime      time.Duration
	NetDailTimeout         time.Duration
	NetReadTimeout         time.Duration
	NetWriteTimeout        time.Duration
	GroupSessionTimeout    time.Duration
	GroupRebalanceTimeout  time.Duration
	GroupRebalanceRetryMax int
	IsolationLevel         sarama.IsolationLevel
	RetryInterval          time.Duration // Retry Interval Works with MaxRetry
	ProducerRetry          struct {
		Max           int           // Maximum number of retries
		RetryInterval time.Duration // RetryInterval retry interval
	}
	TrpcMeta   bool
	Idempotent bool // If enabled, the producer will ensure that exactly one copy of each message is written.

	RateLimitConfig *RateLimitConfig // token bucket limit configuration
	// contains filtered or unexported fields
}

UserConfig configuration parsed from address

func GetDefaultConfig

func GetDefaultConfig() *UserConfig

GetDefaultConfig gets the default configuration.

func ParseAddress

func ParseAddress(address string) (*UserConfig, error)

ParseAddress parses an address in the format ip1:port1,ip2:port2?clientid=xx&topics=topic1,topic2&group=xxx&compression=gzip.
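A small sketch of parsing a DSN into a UserConfig (the address is illustrative; the fields come from the UserConfig type above):

import (
	"fmt"

	"trpc.group/trpc-go/trpc-database/kafka"
)

func inspectAddress() error {
	cfg, err := kafka.ParseAddress("127.0.0.1:9092?clientid=demo&topics=t1,t2&group=g1&compression=gzip")
	if err != nil {
		return err
	}
	// Query parameters are mapped onto the corresponding UserConfig fields.
	fmt.Println(cfg.Brokers, cfg.Topics, cfg.Group, cfg.ClientID)
	return nil
}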

Directories

Path Synopsis
examples
  batchconsumer                Package main is the main package.
  consumer                     Package main is the main package.
  consumer_with_mulit_service  Package main is the main package.
  producer                     Package main is the main package.
  producer_ex                  Package main is the main package.
mockkafka                      Package mockkafka is a generated GoMock package.
