kafsar

package
v0.0.0-...-7abe817 Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jan 9, 2023 License: Apache-2.0 Imports: 34 Imported by: 0

Documentation

Index

Constants

View Source
const (
	// EmptyMemberId is the empty string.
	// NOTE(review): presumably the member id of a consumer that has not been
	// assigned one yet — confirm against the group coordinator code.
	EmptyMemberId = ""
)

Variables

This section is empty.

Functions

func ConvertMsgId

func ConvertMsgId(messageId pulsar.MessageID) int64

Types

type Broker

type Broker struct {
	// contains filtered or unexported fields
}

func NewKafsar

func NewKafsar(impl Server, config *Config) (*Broker, error)

func SetupKafsar

func SetupKafsar() (*Broker, int)

func (*Broker) AuthGroupTopic

func (b *Broker) AuthGroupTopic(topic, groupId string) bool

func (*Broker) Close

func (b *Broker) Close()

func (*Broker) Disconnect

func (b *Broker) Disconnect(addr net.Addr)

func (*Broker) Fetch

func (b *Broker) Fetch(addr net.Addr, req *codec.FetchReq) ([]*codec.FetchTopicResp, error)

func (*Broker) FetchPartition

func (b *Broker) FetchPartition(addr net.Addr, kafkaTopic, clientID string, req *codec.FetchPartitionReq, maxBytes int, minBytes int, maxWaitMs int, span LocalSpan) *codec.FetchPartitionResp

FetchPartition is visible for testing.

func (*Broker) GetOffsetManager

func (b *Broker) GetOffsetManager() OffsetManager

func (*Broker) GroupJoin

func (b *Broker) GroupJoin(addr net.Addr, req *codec.JoinGroupReq) (*codec.JoinGroupResp, error)

func (*Broker) GroupLeave

func (b *Broker) GroupLeave(addr net.Addr, req *codec.LeaveGroupReq) (*codec.LeaveGroupResp, error)

func (*Broker) GroupSync

func (b *Broker) GroupSync(addr net.Addr, req *codec.SyncGroupReq) (*codec.SyncGroupResp, error)

func (*Broker) HeartBeat

func (b *Broker) HeartBeat(addr net.Addr, req codec.HeartbeatReq) *codec.HeartbeatResp

func (*Broker) OffsetCommitPartition

func (b *Broker) OffsetCommitPartition(addr net.Addr, kafkaTopic, clientID string, req *codec.OffsetCommitPartitionReq) (*codec.OffsetCommitPartitionResp, error)

func (*Broker) OffsetFetch

func (b *Broker) OffsetFetch(addr net.Addr, topic, clientID, groupID string, req *codec.OffsetFetchPartitionReq) (*codec.OffsetFetchPartitionResp, error)

func (*Broker) OffsetLeaderEpoch

func (*Broker) OffsetListPartition

func (b *Broker) OffsetListPartition(addr net.Addr, kafkaTopic, clientID string, req *codec.ListOffsetsPartition) (*codec.ListOffsetsPartitionResp, error)

func (*Broker) PartitionNum

func (b *Broker) PartitionNum(addr net.Addr, kafkaTopic string) (int, error)

func (*Broker) Produce

func (b *Broker) Produce(addr net.Addr, kafkaTopic string, partition int, req *codec.ProducePartitionReq) (*codec.ProducePartitionResp, error)

func (*Broker) Run

func (b *Broker) Run() error

func (*Broker) SaslAuth

func (b *Broker) SaslAuth(addr net.Addr, req codec.SaslAuthenticateReq) (bool, codec.ErrorCode)

func (*Broker) SaslAuthConsumerGroup

func (b *Broker) SaslAuthConsumerGroup(addr net.Addr, req codec.SaslAuthenticateReq, consumerGroup string) (bool, codec.ErrorCode)

func (*Broker) SaslAuthTopic

func (b *Broker) SaslAuthTopic(addr net.Addr, req codec.SaslAuthenticateReq, topic, permissionType string) (bool, codec.ErrorCode)

func (*Broker) TopicList

func (b *Broker) TopicList(addr net.Addr) ([]string, error)

type Config

// Config is the configuration accepted by NewKafsar. It groups the Pulsar
// settings, the Kafsar settings and the tracer.
type Config struct {
	// PulsarConfig holds the Pulsar host and ports.
	PulsarConfig PulsarConfig
	// KafsarConfig holds the network, Kafka protocol and consumer group settings.
	KafsarConfig KafsarConfig
	// TraceConfig is a NoErrorTracer implementation; despite the field name it
	// is not a TraceConfig struct value.
	TraceConfig  NoErrorTracer
}

type ConsumerMetadata

type ConsumerMetadata struct {
	// contains filtered or unexported fields
}

type Group

type Group struct {
	// contains filtered or unexported fields
}

type GroupCoordinator

// GroupCoordinator is the consumer group coordination interface. Every call is
// scoped by username and groupId. In this package *GroupCoordinatorCluster and
// *GroupCoordinatorStandalone declare all of its methods.
type GroupCoordinator interface {
	// HandleJoinGroup returns the join-group response for the given member and
	// protocols, or an error.
	HandleJoinGroup(username, groupId, memberId, clientId, protocolType string, sessionTimeoutMs int,
		protocols []*codec.GroupProtocol) (*codec.JoinGroupResp, error)

	// HandleSyncGroup returns the sync-group response for the given member,
	// generation and assignments, or an error.
	HandleSyncGroup(username, groupId, memberId string, generation int,
		groupAssignments []*codec.GroupAssignment) (*codec.SyncGroupResp, error)

	// HandleLeaveGroup returns the leave-group response for the given members,
	// or an error.
	HandleLeaveGroup(username, groupId string, members []*codec.LeaveGroupMember) (*codec.LeaveGroupResp, error)

	// HandleHeartBeat returns the heartbeat response for the given member. It
	// has no error result.
	HandleHeartBeat(username, groupId, memberId string) *codec.HeartbeatResp

	// GetGroup returns the Group identified by username and groupId, or an error.
	GetGroup(username, groupId string) (*Group, error)
}

type GroupCoordinatorCluster

// GroupCoordinatorCluster is a field-less type created by
// NewGroupCoordinatorCluster. Its pointer type declares all five
// GroupCoordinator methods.
// NOTE(review): presumably the coordinator used when
// KafsarConfig.GroupCoordinatorType is Cluster — confirm.
type GroupCoordinatorCluster struct {
}

func NewGroupCoordinatorCluster

func NewGroupCoordinatorCluster() *GroupCoordinatorCluster

func (*GroupCoordinatorCluster) GetGroup

func (gcc *GroupCoordinatorCluster) GetGroup(username, groupId string) (*Group, error)

func (*GroupCoordinatorCluster) HandleHeartBeat

func (gcc *GroupCoordinatorCluster) HandleHeartBeat(username, groupId, memberId string) *codec.HeartbeatResp

func (*GroupCoordinatorCluster) HandleJoinGroup

func (gcc *GroupCoordinatorCluster) HandleJoinGroup(username, groupId, memberId, clientId, protocolType string, sessionTimeoutMs int,
	protocols []*codec.GroupProtocol) (*codec.JoinGroupResp, error)

func (*GroupCoordinatorCluster) HandleLeaveGroup

func (gcc *GroupCoordinatorCluster) HandleLeaveGroup(username, groupId string,
	members []*codec.LeaveGroupMember) (*codec.LeaveGroupResp, error)

func (*GroupCoordinatorCluster) HandleSyncGroup

func (gcc *GroupCoordinatorCluster) HandleSyncGroup(username, groupId, memberId string, generation int,
	groupAssignments []*codec.GroupAssignment) (*codec.SyncGroupResp, error)

type GroupCoordinatorStandalone

type GroupCoordinatorStandalone struct {
	// contains filtered or unexported fields
}

func NewGroupCoordinatorStandalone

func NewGroupCoordinatorStandalone(pulsarConfig PulsarConfig, kafsarConfig KafsarConfig, pulsarClient pulsar.Client) *GroupCoordinatorStandalone

func (*GroupCoordinatorStandalone) GetGroup

func (g *GroupCoordinatorStandalone) GetGroup(username, groupId string) (*Group, error)

func (*GroupCoordinatorStandalone) HandleHeartBeat

func (g *GroupCoordinatorStandalone) HandleHeartBeat(username, groupId, memberId string) *codec.HeartbeatResp

func (*GroupCoordinatorStandalone) HandleJoinGroup

func (g *GroupCoordinatorStandalone) HandleJoinGroup(username, groupId, memberId, clientId, protocolType string, sessionTimeoutMs int,
	protocols []*codec.GroupProtocol) (*codec.JoinGroupResp, error)

func (*GroupCoordinatorStandalone) HandleLeaveGroup

func (g *GroupCoordinatorStandalone) HandleLeaveGroup(username, groupId string,
	members []*codec.LeaveGroupMember) (*codec.LeaveGroupResp, error)

func (*GroupCoordinatorStandalone) HandleSyncGroup

func (g *GroupCoordinatorStandalone) HandleSyncGroup(username, groupId, memberId string, generation int,
	groupAssignments []*codec.GroupAssignment) (*codec.SyncGroupResp, error)

type GroupCoordinatorType

// GroupCoordinatorType enumerates the group coordinator kinds that can be set
// in KafsarConfig.GroupCoordinatorType.
type GroupCoordinatorType int
const (
	// Standalone is 0, the zero value, so it is what an unset
	// KafsarConfig.GroupCoordinatorType holds.
	Standalone GroupCoordinatorType = 0 + iota
	// Cluster is 1.
	Cluster
)

type GroupStatus

// GroupStatus enumerates consumer group states. The named values start at 1,
// so the zero value of GroupStatus matches none of them.
type GroupStatus int
const (
	// PreparingRebalance is 1.
	PreparingRebalance GroupStatus = 1 + iota
	// CompletingRebalance is 2.
	CompletingRebalance
	// Stable is 3.
	Stable
	// Dead is 4.
	Dead
	// Empty is 5.
	Empty
)

type KafsarConfig

// KafsarConfig holds the Kafsar settings: network, Kafka protocol, producer
// and consumer limits, and the internal Pulsar tenant/namespace/offset topic.
// It is the KafsarConfig field of Config and is also taken by NewOffsetManager
// and NewGroupCoordinatorStandalone.
type KafsarConfig struct {
	// Network configuration.
	GnetConfig kgnet.GnetServerConfig
	NeedSasl   bool
	MaxConn    int32

	// Kafka protocol configuration.
	ClusterId     string
	AdvertiseHost string
	AdvertisePort int

	// Producer limits.
	// NOTE(review): units (bytes vs. records) are not visible here — confirm.
	MaxProducerRecordSize int
	MaxBatchSize          int

	// Consumer and consumer group settings. Fields ending in Ms are
	// millisecond values, per their names.
	MaxConsumersPerGroup     int
	GroupMinSessionTimeoutMs int
	GroupMaxSessionTimeoutMs int
	ConsumerReceiveQueueSize int
	MaxFetchRecord           int
	MinFetchWaitMs           int
	MaxFetchWaitMs           int
	ContinuousOffset         bool
	// PulsarTenant is used internally by kafsar.
	PulsarTenant string
	// PulsarNamespace is used internally by kafsar.
	PulsarNamespace string
	// OffsetTopic is used to store Kafka offsets.
	OffsetTopic string
	// GroupCoordinatorType selects the coordinator: Standalone or Cluster.
	// The default is Standalone.
	GroupCoordinatorType GroupCoordinatorType
	// InitialDelayedJoinMs is a millisecond value, per its name.
	// NOTE(review): presumably the initial delay applied to group joins
	// before a rebalance — confirm.
	InitialDelayedJoinMs int
	// RebalanceTickMs is a millisecond value, per its name.
	// NOTE(review): presumably the polling interval used during rebalance —
	// confirm.
	RebalanceTickMs int
}

type LocalSpan

type LocalSpan struct {
	// contains filtered or unexported fields
}

type MemberInfo

type MemberInfo struct {
	// contains filtered or unexported fields
}

type MessageIdPair

// MessageIdPair pairs a Pulsar message id with an int64 offset. It is the
// value type committed to and returned by OffsetManager.
type MessageIdPair struct {
	// MessageId is the Pulsar message id.
	MessageId pulsar.MessageID
	// Offset is the offset paired with MessageId.
	// NOTE(review): presumably the Kafka offset exposed to clients — confirm.
	Offset    int64
}

type NoErrorTracer

// NoErrorTracer is the tracing interface held in Config.TraceConfig. None of
// its methods returns an error. In this package *OtelTracerConfig and
// *SkywalkingTracerConfig declare all of its methods.
type NoErrorTracer interface {
	// IsDisabled reports whether the tracer is disabled.
	IsDisabled() bool
	// NewProvider creates the trace driver.
	NewProvider()
	// NewSpan creates a span named operateName from ctx.
	NewSpan(ctx context.Context, operateName string, logs ...string) LocalSpan
	// SetAttribute sets the attribute k=v on span.
	SetAttribute(span LocalSpan, k, v string)
	// NewSubSpan creates a child span of span named operateName.
	NewSubSpan(span LocalSpan, operateName string, logs ...string) LocalSpan
	// EndSpan closes span.
	EndSpan(span LocalSpan, logs ...string)
}

type OffsetManager

// OffsetManager commits, looks up and removes MessageIdPair values addressed
// by username, kafkaTopic, groupId and partition. NewOffsetManager returns
// one, and *OffsetManagerImpl declares all of its methods.
type OffsetManager interface {
	// Start returns a chan bool.
	// NOTE(review): what is sent on the channel, and when, is not visible
	// here — confirm.
	Start() chan bool

	// CommitOffset commits pair for the given username, topic, group and
	// partition, or returns an error.
	CommitOffset(username, kafkaTopic, groupId string, partition int, pair MessageIdPair) error

	// AcquireOffset returns a MessageIdPair for the given username, topic,
	// group and partition, plus a bool.
	// NOTE(review): presumably the bool is false when no offset is stored —
	// confirm.
	AcquireOffset(username, kafkaTopic, groupId string, partition int) (MessageIdPair, bool)

	// RemoveOffset removes the offset for the given username, topic, group and
	// partition and returns a bool.
	// NOTE(review): presumably the bool reports whether an entry existed —
	// confirm.
	RemoveOffset(username, kafkaTopic, groupId string, partition int) bool

	// GenerateKey returns a string key for the given username, topic, group
	// and partition.
	GenerateKey(username, kafkaTopic, groupId string, partition int) string

	// RemoveOffsetWithKey removes the offset stored under key.
	// NOTE(review): presumably key is a value produced by GenerateKey —
	// confirm.
	RemoveOffsetWithKey(key string)

	// GetOffsetMap returns the offsets as a map from string key to
	// MessageIdPair.
	GetOffsetMap() map[string]MessageIdPair

	// Close closes the manager. It has no error result.
	Close()
}

func NewOffsetManager

func NewOffsetManager(client pulsar.Client, config KafsarConfig, pulsarHttpAddr string) (OffsetManager, error)

type OffsetManagerImpl

type OffsetManagerImpl struct {
	// contains filtered or unexported fields
}

func (*OffsetManagerImpl) AcquireOffset

func (o *OffsetManagerImpl) AcquireOffset(username, kafkaTopic, groupId string, partition int) (MessageIdPair, bool)

func (*OffsetManagerImpl) Close

func (o *OffsetManagerImpl) Close()

func (*OffsetManagerImpl) CommitOffset

func (o *OffsetManagerImpl) CommitOffset(username, kafkaTopic, groupId string, partition int, pair MessageIdPair) error

func (*OffsetManagerImpl) GenerateKey

func (o *OffsetManagerImpl) GenerateKey(username, kafkaTopic, groupId string, partition int) string

func (*OffsetManagerImpl) GetOffsetMap

func (o *OffsetManagerImpl) GetOffsetMap() map[string]MessageIdPair

func (*OffsetManagerImpl) RemoveOffset

func (o *OffsetManagerImpl) RemoveOffset(username, kafkaTopic, groupId string, partition int) bool

func (*OffsetManagerImpl) RemoveOffsetWithKey

func (o *OffsetManagerImpl) RemoveOffsetWithKey(key string)

func (*OffsetManagerImpl) Start

func (o *OffsetManagerImpl) Start() chan bool

type OtelTracerConfig

// OtelTracerConfig is a distinct type whose underlying type is TraceConfig.
// Its pointer type declares all of the NoErrorTracer methods.
type OtelTracerConfig TraceConfig

func (*OtelTracerConfig) EndSpan

func (ot *OtelTracerConfig) EndSpan(span LocalSpan, logs ...string)

func (*OtelTracerConfig) IsDisabled

func (ot *OtelTracerConfig) IsDisabled() bool

func (*OtelTracerConfig) NewProvider

func (ot *OtelTracerConfig) NewProvider()

func (*OtelTracerConfig) NewSpan

func (ot *OtelTracerConfig) NewSpan(ctx context.Context, opName string, logs ...string) LocalSpan

func (*OtelTracerConfig) NewSubSpan

func (ot *OtelTracerConfig) NewSubSpan(span LocalSpan, opName string, logs ...string) LocalSpan

func (*OtelTracerConfig) SetAttribute

func (ot *OtelTracerConfig) SetAttribute(span LocalSpan, k, v string)

type PulsarConfig

// PulsarConfig holds the Pulsar host and its HTTP and TCP ports. It is the
// PulsarConfig field of Config and is also taken by
// NewGroupCoordinatorStandalone.
type PulsarConfig struct {
	// Host is the Pulsar host.
	Host     string
	// HttpPort is the Pulsar HTTP port.
	HttpPort int
	// TcpPort is the Pulsar TCP port.
	TcpPort  int
}

type Server

// Server is the set of callbacks supplied by the caller of NewKafsar through
// its impl parameter.
type Server interface {
	// Auth reports whether the username/password/clientId combination is
	// accepted, or returns an error.
	Auth(username string, password string, clientId string) (bool, error)

	// AuthTopic reports whether the credentials are accepted for topic with
	// the given permissionType, or returns an error.
	AuthTopic(username string, password, clientId, topic, permissionType string) (bool, error)

	// AuthTopicGroup reports whether the credentials are accepted for
	// consumerGroup, or returns an error.
	AuthTopicGroup(username string, password, clientId, consumerGroup string) (bool, error)

	// AuthGroupTopic reports whether the topic/groupId combination is
	// accepted. It has no error result.
	AuthGroupTopic(topic, groupId string) bool

	// SubscriptionName returns a name for groupId, or an error.
	// NOTE(review): presumably the Pulsar subscription name used for the
	// Kafka group — confirm.
	SubscriptionName(groupId string) (string, error)

	// PulsarTopic returns the corresponding topic in Pulsar for username and
	// topic, or an error.
	PulsarTopic(username, topic string) (string, error)

	// PartitionNum returns the number of partitions of topic for username, or
	// an error.
	PartitionNum(username, topic string) (int, error)

	// ListTopic returns the topics for username, or an error.
	ListTopic(username string) ([]string, error)

	// HasFlowQuota reports a bool for username and topic.
	// NOTE(review): presumably true while the user still has traffic quota
	// for the topic — confirm.
	HasFlowQuota(username, topic string) bool
}

type ServerControl

type ServerControl struct {
	// contains filtered or unexported fields
}

func (*ServerControl) DisConnect

func (s *ServerControl) DisConnect(addr net.Addr) error

type SkywalkingTracerConfig

// SkywalkingTracerConfig is a distinct type whose underlying type is
// TraceConfig. Its pointer type declares all of the NoErrorTracer methods.
type SkywalkingTracerConfig TraceConfig

func (*SkywalkingTracerConfig) EndSpan

func (st *SkywalkingTracerConfig) EndSpan(span LocalSpan, logs ...string)

func (*SkywalkingTracerConfig) IsDisabled

func (st *SkywalkingTracerConfig) IsDisabled() bool

func (*SkywalkingTracerConfig) NewProvider

func (st *SkywalkingTracerConfig) NewProvider()

func (*SkywalkingTracerConfig) NewSpan

func (st *SkywalkingTracerConfig) NewSpan(ctx context.Context, operateName string, logs ...string) LocalSpan

func (*SkywalkingTracerConfig) NewSubSpan

func (st *SkywalkingTracerConfig) NewSubSpan(span LocalSpan, operateName string, logs ...string) LocalSpan

func (*SkywalkingTracerConfig) SetAttribute

func (st *SkywalkingTracerConfig) SetAttribute(span LocalSpan, k, v string)

type TraceConfig

// TraceConfig holds tracing settings. It is the underlying type of
// OtelTracerConfig and SkywalkingTracerConfig.
type TraceConfig struct {
	// DisableTracing turns tracing off when true, per its name.
	// NOTE(review): presumably what IsDisabled reports — confirm.
	DisableTracing bool
	// Host and Port are a host name and port number.
	// NOTE(review): presumably the address of the trace collector — confirm.
	Host           string
	Port           int
	// SampleRate is the sampling rate.
	// NOTE(review): presumably a fraction in [0, 1] — confirm.
	SampleRate     float64
}

type TraceType

// TraceType enumerates tracer kinds.
// NOTE(review): no declaration in this listing takes or returns a TraceType —
// confirm where it is consumed.
type TraceType int
const (
	// TraceTypeSkywalking is 0, the zero value of TraceType.
	TraceTypeSkywalking TraceType = iota
	// TraceTypeOtel is 1.
	TraceTypeOtel
)

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL