Documentation ¶
Index ¶
- Variables
- type KafkaProtocolConfig
- type KafsarServer
- type Server
- func (s *Server) ApiVersion(c gnet.Conn, req *codec.ApiReq) (*codec.ApiResp, gnet.Action)
- func (s *Server) AuthFailed() ([]byte, gnet.Action)
- func (s *Server) Authed(context *ctx.NetworkContext) bool
- func (s *Server) Close(ctx context.Context) (err error)
- func (s *Server) ConnError(c gnet.Conn, err error)
- func (s *Server) Fetch(c gnet.Conn, req *codec.FetchReq) (*codec.FetchResp, gnet.Action)
- func (s *Server) FindCoordinator(c gnet.Conn, req *codec.FindCoordinatorReq) (*codec.FindCoordinatorResp, gnet.Action)
- func (s *Server) Heartbeat(c gnet.Conn, req *codec.HeartbeatReq) (*codec.HeartbeatResp, gnet.Action)
- func (s *Server) InvalidKafkaPacket(c gnet.Conn)
- func (s *Server) JoinGroup(c gnet.Conn, req *codec.JoinGroupReq) (*codec.JoinGroupResp, gnet.Action)
- func (s *Server) LeaveGroup(c gnet.Conn, req *codec.LeaveGroupReq) (*codec.LeaveGroupResp, gnet.Action)
- func (s *Server) ListOffsets(c gnet.Conn, req *codec.ListOffsetsReq) (*codec.ListOffsetsResp, gnet.Action)
- func (s *Server) ListOffsetsVersion(ctx *ctx.NetworkContext, req *codec.ListOffsetsReq) (*codec.ListOffsetsResp, gnet.Action)
- func (s *Server) Metadata(c gnet.Conn, req *codec.MetadataReq) (*codec.MetadataResp, gnet.Action)
- func (s *Server) OffsetCommit(c gnet.Conn, req *codec.OffsetCommitReq) (*codec.OffsetCommitResp, gnet.Action)
- func (s *Server) OffsetCommitVersion(ctx *ctx.NetworkContext, req *codec.OffsetCommitReq) (*codec.OffsetCommitResp, gnet.Action)
- func (s *Server) OffsetFetch(c gnet.Conn, req *codec.OffsetFetchReq) (*codec.OffsetFetchResp, gnet.Action)
- func (s *Server) OffsetFetchVersion(ctx *ctx.NetworkContext, req *codec.OffsetFetchReq) (*codec.OffsetFetchResp, gnet.Action)
- func (s *Server) OffsetForLeaderEpoch(c gnet.Conn, req *codec.OffsetForLeaderEpochReq) (*codec.OffsetForLeaderEpochResp, gnet.Action)
- func (s *Server) OffsetForLeaderEpochVersion(ctx *ctx.NetworkContext, req *codec.OffsetForLeaderEpochReq) (*codec.OffsetForLeaderEpochResp, gnet.Action)
- func (s *Server) OnClosed(c gnet.Conn, err error) (action gnet.Action)
- func (s *Server) OnInitComplete(server gnet.Server) (action gnet.Action)
- func (s *Server) OnOpened(c gnet.Conn) (out []byte, action gnet.Action)
- func (s *Server) Produce(c gnet.Conn, req *codec.ProduceReq) (*codec.ProduceResp, gnet.Action)
- func (s *Server) ReactApiVersion(apiRequest *codec.ApiReq) (*codec.ApiResp, gnet.Action)
- func (s *Server) ReactFetch(ctx *ctx.NetworkContext, req *codec.FetchReq) (*codec.FetchResp, gnet.Action)
- func (s *Server) ReactFindCoordinator(req *codec.FindCoordinatorReq, config *KafkaProtocolConfig) (*codec.FindCoordinatorResp, gnet.Action)
- func (s *Server) ReactHeartbeat(heartbeatReqV4 *codec.HeartbeatReq, context *ctx.NetworkContext) (*codec.HeartbeatResp, gnet.Action)
- func (s *Server) ReactJoinGroup(ctx *ctx.NetworkContext, req *codec.JoinGroupReq) (*codec.JoinGroupResp, gnet.Action)
- func (s *Server) ReactLeaveGroup(ctx *ctx.NetworkContext, req *codec.LeaveGroupReq) (*codec.LeaveGroupResp, gnet.Action)
- func (s *Server) ReactMetadata(ctx *ctx.NetworkContext, req *codec.MetadataReq, config *KafkaProtocolConfig) (*codec.MetadataResp, gnet.Action)
- func (s *Server) ReactProduce(ctx *ctx.NetworkContext, req *codec.ProduceReq, config *KafkaProtocolConfig) (*codec.ProduceResp, gnet.Action)
- func (s *Server) ReactSasl(req *codec.SaslHandshakeReq) (*codec.SaslHandshakeResp, gnet.Action)
- func (s *Server) ReactSaslHandshakeAuth(req *codec.SaslAuthenticateReq, context *ctx.NetworkContext) (*codec.SaslAuthenticateResp, gnet.Action)
- func (s *Server) ReactSyncGroup(ctx *ctx.NetworkContext, req *codec.SyncGroupReq) (*codec.SyncGroupResp, gnet.Action)
- func (s *Server) Run() error
- func (s *Server) SaslAuthenticate(c gnet.Conn, req *codec.SaslAuthenticateReq) (*codec.SaslAuthenticateResp, gnet.Action)
- func (s *Server) SaslHandshake(c gnet.Conn, req *codec.SaslHandshakeReq) (*codec.SaslHandshakeResp, gnet.Action)
- func (s *Server) SyncGroup(c gnet.Conn, req *codec.SyncGroupReq) (*codec.SyncGroupResp, gnet.Action)
- func (s *Server) UnSupportedApi(c gnet.Conn, apiKey codec.ApiCode, apiVersion int16)
Constants ¶
This section is empty.
Variables ¶
View Source
var (
	ALL_PERMISSION_TYPE      = "ALL"
	PRODUCER_PERMISSION_TYPE = "W"
	CONSUMER_PERMISSION_TYPE = "R"
)
Functions ¶
This section is empty.
Types ¶
type KafkaProtocolConfig ¶
type KafsarServer ¶
type KafsarServer interface {
	PartitionNum(addr net.Addr, topic string) (int, error)

	TopicList(addr net.Addr) ([]string, error)

	// Fetch is called after the request has already been authenticated.
	Fetch(addr net.Addr, req *codec.FetchReq) ([]*codec.FetchTopicResp, error)

	// GroupJoin is called after the request has already been authenticated.
	GroupJoin(addr net.Addr, req *codec.JoinGroupReq) (*codec.JoinGroupResp, error)

	// GroupLeave is called after the request has already been authenticated.
	GroupLeave(addr net.Addr, req *codec.LeaveGroupReq) (*codec.LeaveGroupResp, error)

	// GroupSync is called after the request has already been authenticated.
	GroupSync(addr net.Addr, req *codec.SyncGroupReq) (*codec.SyncGroupResp, error)

	// OffsetListPartition is called after the request has already been authenticated.
	OffsetListPartition(addr net.Addr, topic, clientID string, req *codec.ListOffsetsPartition) (*codec.ListOffsetsPartitionResp, error)

	// OffsetCommitPartition is called after the request has already been authenticated.
	OffsetCommitPartition(addr net.Addr, topic, clientID string, req *codec.OffsetCommitPartitionReq) (*codec.OffsetCommitPartitionResp, error)

	// OffsetFetch is called after the request has already been authenticated.
	OffsetFetch(addr net.Addr, topic, clientID, groupID string, req *codec.OffsetFetchPartitionReq) (*codec.OffsetFetchPartitionResp, error)

	// OffsetLeaderEpoch is called after the request has already been authenticated.
	OffsetLeaderEpoch(addr net.Addr, topic string, req *codec.OffsetLeaderEpochPartitionReq) (*codec.OffsetForLeaderEpochPartitionResp, error)

	// Produce is called after the request has already been authenticated.
	Produce(addr net.Addr, topic string, partition int, req *codec.ProducePartitionReq) (*codec.ProducePartitionResp, error)

	SaslAuth(addr net.Addr, req codec.SaslAuthenticateReq) (bool, codec.ErrorCode)

	SaslAuthTopic(addr net.Addr, req codec.SaslAuthenticateReq, topic, permissionType string) (bool, codec.ErrorCode)

	AuthGroupTopic(topic, groupId string) bool

	SaslAuthConsumerGroup(addr net.Addr, req codec.SaslAuthenticateReq, consumerGroup string) (bool, codec.ErrorCode)

	HeartBeat(addr net.Addr, req codec.HeartbeatReq) *codec.HeartbeatResp

	Disconnect(addr net.Addr)
}
type Server ¶
func NewServer ¶
func NewServer(config *kgnet.GnetServerConfig, kfkProtocolConfig *KafkaProtocolConfig, impl KafsarServer) (*Server, error)
func (*Server) ApiVersion ¶
func (*Server) FindCoordinator ¶
func (s *Server) FindCoordinator(c gnet.Conn, req *codec.FindCoordinatorReq) (*codec.FindCoordinatorResp, gnet.Action)
func (*Server) Heartbeat ¶
func (s *Server) Heartbeat(c gnet.Conn, req *codec.HeartbeatReq) (*codec.HeartbeatResp, gnet.Action)
func (*Server) InvalidKafkaPacket ¶
func (*Server) JoinGroup ¶
func (s *Server) JoinGroup(c gnet.Conn, req *codec.JoinGroupReq) (*codec.JoinGroupResp, gnet.Action)
func (*Server) LeaveGroup ¶
func (s *Server) LeaveGroup(c gnet.Conn, req *codec.LeaveGroupReq) (*codec.LeaveGroupResp, gnet.Action)
func (*Server) ListOffsets ¶
func (s *Server) ListOffsets(c gnet.Conn, req *codec.ListOffsetsReq) (*codec.ListOffsetsResp, gnet.Action)
func (*Server) ListOffsetsVersion ¶
func (s *Server) ListOffsetsVersion(ctx *ctx.NetworkContext, req *codec.ListOffsetsReq) (*codec.ListOffsetsResp, gnet.Action)
func (*Server) Metadata ¶
func (s *Server) Metadata(c gnet.Conn, req *codec.MetadataReq) (*codec.MetadataResp, gnet.Action)
func (*Server) OffsetCommit ¶
func (s *Server) OffsetCommit(c gnet.Conn, req *codec.OffsetCommitReq) (*codec.OffsetCommitResp, gnet.Action)
func (*Server) OffsetCommitVersion ¶
func (s *Server) OffsetCommitVersion(ctx *ctx.NetworkContext, req *codec.OffsetCommitReq) (*codec.OffsetCommitResp, gnet.Action)
func (*Server) OffsetFetch ¶
func (s *Server) OffsetFetch(c gnet.Conn, req *codec.OffsetFetchReq) (*codec.OffsetFetchResp, gnet.Action)
func (*Server) OffsetFetchVersion ¶
func (s *Server) OffsetFetchVersion(ctx *ctx.NetworkContext, req *codec.OffsetFetchReq) (*codec.OffsetFetchResp, gnet.Action)
func (*Server) OffsetForLeaderEpoch ¶
func (s *Server) OffsetForLeaderEpoch(c gnet.Conn, req *codec.OffsetForLeaderEpochReq) (*codec.OffsetForLeaderEpochResp, gnet.Action)
func (*Server) OffsetForLeaderEpochVersion ¶
func (s *Server) OffsetForLeaderEpochVersion(ctx *ctx.NetworkContext, req *codec.OffsetForLeaderEpochReq) (*codec.OffsetForLeaderEpochResp, gnet.Action)
func (*Server) OnInitComplete ¶
func (*Server) Produce ¶
func (s *Server) Produce(c gnet.Conn, req *codec.ProduceReq) (*codec.ProduceResp, gnet.Action)
func (*Server) ReactApiVersion ¶
func (*Server) ReactFetch ¶
func (*Server) ReactFindCoordinator ¶
func (s *Server) ReactFindCoordinator(req *codec.FindCoordinatorReq, config *KafkaProtocolConfig) (*codec.FindCoordinatorResp, gnet.Action)
func (*Server) ReactHeartbeat ¶
func (s *Server) ReactHeartbeat(heartbeatReqV4 *codec.HeartbeatReq, context *ctx.NetworkContext) (*codec.HeartbeatResp, gnet.Action)
func (*Server) ReactJoinGroup ¶
func (s *Server) ReactJoinGroup(ctx *ctx.NetworkContext, req *codec.JoinGroupReq) (*codec.JoinGroupResp, gnet.Action)
func (*Server) ReactLeaveGroup ¶
func (s *Server) ReactLeaveGroup(ctx *ctx.NetworkContext, req *codec.LeaveGroupReq) (*codec.LeaveGroupResp, gnet.Action)
func (*Server) ReactMetadata ¶
func (s *Server) ReactMetadata(ctx *ctx.NetworkContext, req *codec.MetadataReq, config *KafkaProtocolConfig) (*codec.MetadataResp, gnet.Action)
func (*Server) ReactProduce ¶
func (s *Server) ReactProduce(ctx *ctx.NetworkContext, req *codec.ProduceReq, config *KafkaProtocolConfig) (*codec.ProduceResp, gnet.Action)
func (*Server) ReactSasl ¶
func (s *Server) ReactSasl(req *codec.SaslHandshakeReq) (*codec.SaslHandshakeResp, gnet.Action)
func (*Server) ReactSaslHandshakeAuth ¶
func (s *Server) ReactSaslHandshakeAuth(req *codec.SaslAuthenticateReq, context *ctx.NetworkContext) (*codec.SaslAuthenticateResp, gnet.Action)
func (*Server) ReactSyncGroup ¶
func (s *Server) ReactSyncGroup(ctx *ctx.NetworkContext, req *codec.SyncGroupReq) (*codec.SyncGroupResp, gnet.Action)
func (*Server) SaslAuthenticate ¶
func (s *Server) SaslAuthenticate(c gnet.Conn, req *codec.SaslAuthenticateReq) (*codec.SaslAuthenticateResp, gnet.Action)
func (*Server) SaslHandshake ¶
func (s *Server) SaslHandshake(c gnet.Conn, req *codec.SaslHandshakeReq) (*codec.SaslHandshakeResp, gnet.Action)
Source Files ¶
- config.go
- const.go
- kafka.go
- kafka_api_versions.go
- kafka_auth.go
- kafka_fetch.go
- kafka_find_coordinator.go
- kafka_heartbeat.go
- kafka_join_group.go
- kafka_leave_group.go
- kafka_list_offsets.go
- kafka_metadata.go
- kafka_offset_commit.go
- kafka_offset_fetch.go
- kafka_offset_leader_epoch.go
- kafka_produce.go
- kafka_sasl_authenticate.go
- kafka_sasl_handshake.go
- kafka_sync_group.go
- sasl.go
Click to show internal directories.
Click to hide internal directories.