network

package
v0.0.0-...-7abe817 Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jan 9, 2023 License: Apache-2.0 Imports: 9 Imported by: 0

Documentation

Index

Constants

This section is empty.

Variables

View Source
// Permission type identifiers, expressed as short string codes:
// "ALL", "W" (named for producers) and "R" (named for consumers).
//
// NOTE(review): presumably these are the values expected in the
// permissionType argument of KafsarServer.SaslAuthTopic — confirm against
// the implementation. They are declared as variables, not constants, so
// they are technically reassignable by importers.
var (
	ALL_PERMISSION_TYPE      = "ALL"
	PRODUCER_PERMISSION_TYPE = "W"
	CONSUMER_PERMISSION_TYPE = "R"
)

Functions

This section is empty.

Types

type KafkaProtocolConfig

// KafkaProtocolConfig holds the Kafka-protocol-level settings of a Server.
// A pointer to it is accepted by NewServer and by the ReactFindCoordinator,
// ReactMetadata and ReactProduce methods of Server.
//
// NOTE(review): the field descriptions below are inferred from the field
// names only; how each value is consumed is not visible here — confirm
// against the implementation.
type KafkaProtocolConfig struct {
	// ClusterId is presumably the Kafka cluster identifier reported to clients.
	ClusterId     string
	// NodeId is presumably the broker/node identifier of this server.
	NodeId        int32
	// AdvertiseHost is presumably the host name advertised to clients.
	AdvertiseHost string
	// AdvertisePort is presumably the port advertised to clients.
	AdvertisePort int
	// NeedSasl presumably toggles whether SASL authentication is required.
	NeedSasl      bool
	// MaxConn is presumably an upper bound on concurrent connections.
	MaxConn       int32
}

type KafsarServer

// KafsarServer is the interface whose implementation is handed to NewServer
// (as the impl argument). Its methods mirror individual Kafka operations.
//
// Most methods receive a net.Addr as first argument.
// NOTE(review): presumably this is the remote address of the client
// connection that issued the request — confirm against the caller.
type KafsarServer interface {
	// PartitionNum returns an int for the given topic, or an error.
	// NOTE(review): judging by the name, the int is the topic's partition
	// count — confirm.
	PartitionNum(addr net.Addr, topic string) (int, error)

	// TopicList returns a list of topic names, or an error.
	// NOTE(review): presumably the topics visible to addr — confirm.
	TopicList(addr net.Addr) ([]string, error)

	// Fetch handles a fetch request and returns one response entry per topic.
	// It is called only after the caller has already been authenticated.
	Fetch(addr net.Addr, req *codec.FetchReq) ([]*codec.FetchTopicResp, error)

	// GroupJoin handles a join-group request.
	// It is called only after the caller has already been authenticated.
	GroupJoin(addr net.Addr, req *codec.JoinGroupReq) (*codec.JoinGroupResp, error)

	// GroupLeave handles a leave-group request.
	// It is called only after the caller has already been authenticated.
	GroupLeave(addr net.Addr, req *codec.LeaveGroupReq) (*codec.LeaveGroupResp, error)

	// GroupSync handles a sync-group request.
	// It is called only after the caller has already been authenticated.
	GroupSync(addr net.Addr, req *codec.SyncGroupReq) (*codec.SyncGroupResp, error)

	// OffsetListPartition handles the list-offsets request for a single
	// partition of topic.
	// It is called only after the caller has already been authenticated.
	OffsetListPartition(addr net.Addr, topic, clientID string, req *codec.ListOffsetsPartition) (*codec.ListOffsetsPartitionResp, error)

	// OffsetCommitPartition handles the offset-commit request for a single
	// partition of topic.
	// It is called only after the caller has already been authenticated.
	OffsetCommitPartition(addr net.Addr, topic, clientID string, req *codec.OffsetCommitPartitionReq) (*codec.OffsetCommitPartitionResp, error)

	// OffsetFetch handles the offset-fetch request for a single partition of
	// topic on behalf of groupID.
	// It is called only after the caller has already been authenticated.
	OffsetFetch(addr net.Addr, topic, clientID, groupID string, req *codec.OffsetFetchPartitionReq) (*codec.OffsetFetchPartitionResp, error)

	// OffsetLeaderEpoch handles the offset-for-leader-epoch request for a
	// single partition of topic.
	// It is called only after the caller has already been authenticated.
	OffsetLeaderEpoch(addr net.Addr, topic string, req *codec.OffsetLeaderEpochPartitionReq) (*codec.OffsetForLeaderEpochPartitionResp, error)

	// Produce handles the produce request for one partition of topic.
	// It is called only after the caller has already been authenticated.
	Produce(addr net.Addr, topic string, partition int, req *codec.ProducePartitionReq) (*codec.ProducePartitionResp, error)

	// SaslAuth evaluates a SASL authenticate request and returns a bool
	// together with a codec.ErrorCode.
	// NOTE(review): presumably the bool is true when authentication
	// succeeds — confirm.
	SaslAuth(addr net.Addr, req codec.SaslAuthenticateReq) (bool, codec.ErrorCode)

	// SaslAuthTopic evaluates a SASL authenticate request against a topic
	// and a permission type, returning a bool and a codec.ErrorCode.
	// NOTE(review): permissionType presumably takes one of the package-level
	// *_PERMISSION_TYPE values — confirm.
	SaslAuthTopic(addr net.Addr, req codec.SaslAuthenticateReq, topic, permissionType string) (bool, codec.ErrorCode)

	// AuthGroupTopic returns a bool for a (topic, groupId) pair.
	// NOTE(review): presumably whether the group may access the topic —
	// confirm. Unlike the other methods it takes no net.Addr.
	AuthGroupTopic(topic, groupId string) bool

	// SaslAuthConsumerGroup evaluates a SASL authenticate request against a
	// consumer group, returning a bool and a codec.ErrorCode.
	SaslAuthConsumerGroup(addr net.Addr, req codec.SaslAuthenticateReq, consumerGroup string) (bool, codec.ErrorCode)

	// HeartBeat handles a heartbeat request; it returns only a response and
	// has no error result.
	HeartBeat(addr net.Addr, req codec.HeartbeatReq) *codec.HeartbeatResp

	// Disconnect takes the address of a connection and returns nothing.
	// NOTE(review): presumably invoked when the connection at addr closes so
	// the implementation can release per-connection state — confirm.
	Disconnect(addr net.Addr)
}

type Server

// Server is the type returned by NewServer. Its method set includes Run and
// Close, connection-event methods (OnOpened, OnClosed, OnInitComplete) and
// one method per Kafka API request (for example Fetch, Produce, Metadata).
//
// Server contains sync.Map fields, so it must not be copied after first use;
// all of its methods use pointer receivers.
type Server struct {
	// ConnMap is an exported sync.Map; its key and value types are not
	// visible here.
	// NOTE(review): presumably tracks open connections — confirm.
	ConnMap sync.Map
	// SaslMap is an exported sync.Map; its key and value types are not
	// visible here.
	// NOTE(review): presumably holds per-connection SASL state — confirm.
	SaslMap sync.Map
	// contains filtered or unexported fields
}

func NewServer

func NewServer(config *kgnet.GnetServerConfig, kfkProtocolConfig *KafkaProtocolConfig, impl KafsarServer) (*Server, error)

func (*Server) ApiVersion

func (s *Server) ApiVersion(c gnet.Conn, req *codec.ApiReq) (*codec.ApiResp, gnet.Action)

func (*Server) AuthFailed

func (s *Server) AuthFailed() ([]byte, gnet.Action)

func (*Server) Authed

func (s *Server) Authed(context *ctx.NetworkContext) bool

func (*Server) Close

func (s *Server) Close(ctx context.Context) (err error)

func (*Server) ConnError

func (s *Server) ConnError(c gnet.Conn, err error)

func (*Server) Fetch

func (s *Server) Fetch(c gnet.Conn, req *codec.FetchReq) (*codec.FetchResp, gnet.Action)

func (*Server) FindCoordinator

func (s *Server) FindCoordinator(c gnet.Conn, req *codec.FindCoordinatorReq) (*codec.FindCoordinatorResp, gnet.Action)

func (*Server) Heartbeat

func (s *Server) Heartbeat(c gnet.Conn, req *codec.HeartbeatReq) (*codec.HeartbeatResp, gnet.Action)

func (*Server) InvalidKafkaPacket

func (s *Server) InvalidKafkaPacket(c gnet.Conn)

func (*Server) JoinGroup

func (s *Server) JoinGroup(c gnet.Conn, req *codec.JoinGroupReq) (*codec.JoinGroupResp, gnet.Action)

func (*Server) LeaveGroup

func (s *Server) LeaveGroup(c gnet.Conn, req *codec.LeaveGroupReq) (*codec.LeaveGroupResp, gnet.Action)

func (*Server) ListOffsets

func (s *Server) ListOffsets(c gnet.Conn, req *codec.ListOffsetsReq) (*codec.ListOffsetsResp, gnet.Action)

func (*Server) ListOffsetsVersion

func (s *Server) ListOffsetsVersion(ctx *ctx.NetworkContext, req *codec.ListOffsetsReq) (*codec.ListOffsetsResp, gnet.Action)

func (*Server) Metadata

func (s *Server) Metadata(c gnet.Conn, req *codec.MetadataReq) (*codec.MetadataResp, gnet.Action)

func (*Server) OffsetCommit

func (s *Server) OffsetCommit(c gnet.Conn, req *codec.OffsetCommitReq) (*codec.OffsetCommitResp, gnet.Action)

func (*Server) OffsetCommitVersion

func (s *Server) OffsetCommitVersion(ctx *ctx.NetworkContext, req *codec.OffsetCommitReq) (*codec.OffsetCommitResp, gnet.Action)

func (*Server) OffsetFetch

func (s *Server) OffsetFetch(c gnet.Conn, req *codec.OffsetFetchReq) (*codec.OffsetFetchResp, gnet.Action)

func (*Server) OffsetFetchVersion

func (s *Server) OffsetFetchVersion(ctx *ctx.NetworkContext, req *codec.OffsetFetchReq) (*codec.OffsetFetchResp, gnet.Action)

func (*Server) OffsetForLeaderEpoch

func (s *Server) OffsetForLeaderEpoch(c gnet.Conn, req *codec.OffsetForLeaderEpochReq) (*codec.OffsetForLeaderEpochResp, gnet.Action)

func (*Server) OffsetForLeaderEpochVersion

func (s *Server) OffsetForLeaderEpochVersion(ctx *ctx.NetworkContext, req *codec.OffsetForLeaderEpochReq) (*codec.OffsetForLeaderEpochResp, gnet.Action)

func (*Server) OnClosed

func (s *Server) OnClosed(c gnet.Conn, err error) (action gnet.Action)

func (*Server) OnInitComplete

func (s *Server) OnInitComplete(server gnet.Server) (action gnet.Action)

func (*Server) OnOpened

func (s *Server) OnOpened(c gnet.Conn) (out []byte, action gnet.Action)

func (*Server) Produce

func (s *Server) Produce(c gnet.Conn, req *codec.ProduceReq) (*codec.ProduceResp, gnet.Action)

func (*Server) ReactApiVersion

func (s *Server) ReactApiVersion(apiRequest *codec.ApiReq) (*codec.ApiResp, gnet.Action)

func (*Server) ReactFetch

func (s *Server) ReactFetch(ctx *ctx.NetworkContext, req *codec.FetchReq) (*codec.FetchResp, gnet.Action)

func (*Server) ReactFindCoordinator

func (s *Server) ReactFindCoordinator(req *codec.FindCoordinatorReq, config *KafkaProtocolConfig) (*codec.FindCoordinatorResp, gnet.Action)

func (*Server) ReactHeartbeat

func (s *Server) ReactHeartbeat(heartbeatReqV4 *codec.HeartbeatReq, context *ctx.NetworkContext) (*codec.HeartbeatResp, gnet.Action)

func (*Server) ReactJoinGroup

func (s *Server) ReactJoinGroup(ctx *ctx.NetworkContext, req *codec.JoinGroupReq) (*codec.JoinGroupResp, gnet.Action)

func (*Server) ReactLeaveGroup

func (s *Server) ReactLeaveGroup(ctx *ctx.NetworkContext, req *codec.LeaveGroupReq) (*codec.LeaveGroupResp, gnet.Action)

func (*Server) ReactMetadata

func (s *Server) ReactMetadata(ctx *ctx.NetworkContext, req *codec.MetadataReq, config *KafkaProtocolConfig) (*codec.MetadataResp, gnet.Action)

func (*Server) ReactProduce

func (s *Server) ReactProduce(ctx *ctx.NetworkContext, req *codec.ProduceReq, config *KafkaProtocolConfig) (*codec.ProduceResp, gnet.Action)

func (*Server) ReactSasl

func (*Server) ReactSaslHandshakeAuth

func (s *Server) ReactSaslHandshakeAuth(req *codec.SaslAuthenticateReq, context *ctx.NetworkContext) (*codec.SaslAuthenticateResp, gnet.Action)

func (*Server) ReactSyncGroup

func (s *Server) ReactSyncGroup(ctx *ctx.NetworkContext, req *codec.SyncGroupReq) (*codec.SyncGroupResp, gnet.Action)

func (*Server) Run

func (s *Server) Run() error

func (*Server) SaslAuthenticate

func (s *Server) SaslAuthenticate(c gnet.Conn, req *codec.SaslAuthenticateReq) (*codec.SaslAuthenticateResp, gnet.Action)

func (*Server) SaslHandshake

func (s *Server) SaslHandshake(c gnet.Conn, req *codec.SaslHandshakeReq) (*codec.SaslHandshakeResp, gnet.Action)

func (*Server) SyncGroup

func (s *Server) SyncGroup(c gnet.Conn, req *codec.SyncGroupReq) (*codec.SyncGroupResp, gnet.Action)

func (*Server) UnSupportedApi

func (s *Server) UnSupportedApi(c gnet.Conn, apiKey codec.ApiCode, apiVersion int16)

Directories

Path Synopsis

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL