consumer

package
v0.0.0-...-32ff608 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Mar 28, 2023 License: Apache-2.0 Imports: 29 Imported by: 0

Documentation

Index

Constants

This section is empty.

Variables

View Source
var (
	ErrNoConnectorPlugin    = errors.New("no connector plugin found")
	ErrNewConsumerConnector = errors.New("create consumer connector err")
	ErrNewProducerConnector = errors.New("create producer connector err")
)
View Source
var (
	ConsumerGroupWaitingRequestThreshold = 1000

	ErrRequestReachMaxThreshold = errors.New("request reach the max threshold")
)
View Source
var (
	ErrNoConsumerClient = errors.New("no consumer group client")
)
View Source
var (
	ErrNoProtocolFound = errors.New("no protocol type found in event message")
)
View Source
var (
	ErrProtocolPluginNotFound = fmt.Errorf("protocol plugin not found")
)

Functions

This section is empty.

Types

type BaseConsumerGroupTopicOption

type BaseConsumerGroupTopicOption struct {
	// contains filtered or unexported fields
}

BaseConsumerGroupTopicOption refers to ConsumerGroupTopicConfig

func (*BaseConsumerGroupTopicOption) ConsumerGroup

func (b *BaseConsumerGroupTopicOption) ConsumerGroup() string

func (*BaseConsumerGroupTopicOption) DeregisterClient

func (b *BaseConsumerGroupTopicOption) DeregisterClient() DeregisterClient

func (*BaseConsumerGroupTopicOption) GRPCType

func (*BaseConsumerGroupTopicOption) RegisterClient

func (b *BaseConsumerGroupTopicOption) RegisterClient() RegisterClient

func (*BaseConsumerGroupTopicOption) SubscriptionMode

func (*BaseConsumerGroupTopicOption) Topic

type ConsumerGroupConfig

type ConsumerGroupConfig struct {
	ConsumerGroup string

	// key is topic, value is  ConsumerGroupTopicConfig
	ConsumerGroupTopicConfigs *sync.Map
}

type ConsumerGroupMetadata

type ConsumerGroupMetadata struct {
	ConsumerGroup              string
	ConsumerGroupTopicMetadata *sync.Map
}

type ConsumerGroupStateEvent

type ConsumerGroupStateEvent struct {
	ConsumerGroup            string
	ConsumerGroupConfig      *ConsumerGroupConfig
	ConsumerGroupStateAction StateAction
}

type ConsumerGroupTopicConfChangeEvent

type ConsumerGroupTopicConfChangeEvent struct {
	Action                   StateAction
	ConsumerGroup            string
	ConsumerGroupTopicConfig *ConsumerGroupTopicConfig
}

type ConsumerGroupTopicConfig

type ConsumerGroupTopicConfig struct {
	ConsumerGroup    string
	Topic            string
	SubscriptionMode pb.Subscription_SubscriptionItem_SubscriptionMode
	GRPCType         consts.GRPCType
	// IDCWebhookURLs webhook urls seperated by IDC
	// key is IDC, value is vector.Vector
	IDCWebhookURLs *sync.Map

	// AllURLs all webhook urls, ignore idc
	AllURLs *set.Set
}

type ConsumerGroupTopicMetadata

type ConsumerGroupTopicMetadata struct {
	ConsumerGroup string
	Topic         string
	AllURLs       *set.Set
}

type ConsumerGroupTopicOption

type ConsumerGroupTopicOption interface {
	ConsumerGroup() string
	Topic() string
	SubscriptionMode() pb.Subscription_SubscriptionItem_SubscriptionMode
	GRPCType() consts.GRPCType
	RegisterClient() RegisterClient
	DeregisterClient() DeregisterClient
	IDCURLs() *sync.Map
	AllURLs() *set.Set
	AllEmiters() *set.Set
	IDCEmiters() *sync.Map
	Size() int
}

type ConsumerManager

type ConsumerManager interface {
	GetConsumer(consumerGroup string) (EventMeshConsumer, error)
	RegisterClient(cli *GroupClient) error
	DeRegisterClient(cli *GroupClient) error
	UpdateClientTime(cli *GroupClient)
	RestartConsumer(consumerGroup string) error
	Start() error
	Stop() error
}

func NewConsumerManager

func NewConsumerManager() (ConsumerManager, error)

NewConsumerManager create new consumer manager

type ConsumerService

type ConsumerService struct {
	pb.UnimplementedConsumerServiceServer
	// contains filtered or unexported fields
}

ConsumerService grpc service

func NewConsumerServiceServer

func NewConsumerServiceServer(consumerManager ConsumerManager, producerManager producer.ProducerManager) (*ConsumerService, error)

func (*ConsumerService) Subscribe

func (c *ConsumerService) Subscribe(ctx context.Context, sub *pb.Subscription) (*pb.Response, error)

func (*ConsumerService) SubscribeStream

SubscribeStream handle stream request with two groutine for Recv() and Send() message for Recv() goroutine if got err==io.EOF as the client close the stream(即客户端关闭stream) for Send() refers to https://github.com/grpc/grpc-go/issues/444

func (*ConsumerService) Unsubscribe

func (c *ConsumerService) Unsubscribe(ctx context.Context, sub *pb.Subscription) (*pb.Response, error)

type DeregisterClient

type DeregisterClient func(*GroupClient)

type EventMeshConsumer

type EventMeshConsumer interface {
	Init() error
	Start() error
	ServiceState() consts.ServiceState
	RegisterClient(cli *GroupClient) bool
	DeRegisterClient(cli *GroupClient) bool
	Shutdown() error
}

func NewEventMeshConsumer

func NewEventMeshConsumer(consumerGroup string) (EventMeshConsumer, error)

type GroupClient

type GroupClient struct {
	ENV              string
	IDC              string
	ConsumerGroup    string
	Topic            string
	GRPCType         consts.GRPCType
	URL              string
	SubscriptionMode pb.Subscription_SubscriptionItem_SubscriptionMode
	SYS              string
	IP               string
	PID              string
	Hostname         string
	APIVersion       string
	LastUPTime       time.Time
	Emiter           emitter.EventEmitter
}

GroupClient consumer group client details

func DefaultStreamGroupClient

func DefaultStreamGroupClient() *GroupClient

func DefaultWebhookGroupClient

func DefaultWebhookGroupClient() *GroupClient

type MessageContext

type MessageContext struct {
	MsgRandomNo      string
	SubscriptionMode pb.Subscription_SubscriptionItem_SubscriptionMode
	GrpcType         consts.GRPCType
	ConsumerGroup    string
	Event            *cloudv2.Event
	TopicConfig      ConsumerGroupTopicOption
}

type MessageHandler

type MessageHandler interface {
	Handler(mctx *MessageContext) error
}

func NewMessageHandler

func NewMessageHandler(consumerGroup string) (MessageHandler, error)

type Processor

type Processor interface {
	Subscribe(consumerMgr ConsumerManager, msg *pb.Subscription) (*pb.Response, error)
	UnSubscribe(consumerMgr ConsumerManager, msg *pb.Subscription) (*pb.Response, error)
	SubscribeStream(consumerMgr ConsumerManager, emiter emitter.EventEmitter, msg *pb.Subscription) error
	Heartbeat(consumerMgr ConsumerManager, msg *pb.Heartbeat) (*pb.Response, error)
	ReplyMessage(ctx context.Context, producerMgr producer.ProducerManager, emiter emitter.EventEmitter, msg *pb.SimpleMessage) error
}

type RegisterClient

type RegisterClient func(*GroupClient)

type Request

type Request struct {
	*retry.Retry

	MessageContext *MessageContext
	CreateTime     time.Time
	LastPushTime   time.Time
	Complete       *atomic.Bool
	SimpleMessage  *pb.SimpleMessage
	Try            func() error
}

func NewRequest

func NewRequest(mctx *MessageContext) (*Request, error)

func (*Request) Timeout

func (r *Request) Timeout() bool

type Response

type Response struct {
	RetCode string `json:"retCode"`
	ErrMsg  string `json:"errMsg"`
}

type StateAction

type StateAction string
const (
	NEW    StateAction = "NEW"
	CHANGE StateAction = "CHANGE"
	DELETE StateAction = "DELETE"
)

type StreamGroupTopicOption

type StreamGroupTopicOption struct {
	*BaseConsumerGroupTopicOption
	// contains filtered or unexported fields
}

StreamGroupTopicOption topic option for subscribe with stream

func (*StreamGroupTopicOption) AllEmiters

func (b *StreamGroupTopicOption) AllEmiters() *set.Set

func (*StreamGroupTopicOption) AllURLs

func (b *StreamGroupTopicOption) AllURLs() *set.Set

func (*StreamGroupTopicOption) DeregisterClient

func (b *StreamGroupTopicOption) DeregisterClient() DeregisterClient

func (*StreamGroupTopicOption) IDCEmiters

func (b *StreamGroupTopicOption) IDCEmiters() *sync.Map

func (*StreamGroupTopicOption) IDCURLs

func (b *StreamGroupTopicOption) IDCURLs() *sync.Map

func (*StreamGroupTopicOption) RegisterClient

func (b *StreamGroupTopicOption) RegisterClient() RegisterClient

func (*StreamGroupTopicOption) Size

func (b *StreamGroupTopicOption) Size() int

type StreamRequest

type StreamRequest struct {
	*Request
	// contains filtered or unexported fields
}

func NewStreamRequest

func NewStreamRequest(mctx *MessageContext) (*StreamRequest, error)

type WebhookGroupTopicOption

type WebhookGroupTopicOption struct {
	*BaseConsumerGroupTopicOption
	// contains filtered or unexported fields
}

WebhookGroupTopicOption topic option for subscribe with webhook

func (*WebhookGroupTopicOption) AllEmiters

func (b *WebhookGroupTopicOption) AllEmiters() *set.Set

func (*WebhookGroupTopicOption) AllURLs

func (b *WebhookGroupTopicOption) AllURLs() *set.Set

func (*WebhookGroupTopicOption) IDCEmiters

func (b *WebhookGroupTopicOption) IDCEmiters() *sync.Map

func (*WebhookGroupTopicOption) IDCURLs

func (b *WebhookGroupTopicOption) IDCURLs() *sync.Map

func (*WebhookGroupTopicOption) Size

func (b *WebhookGroupTopicOption) Size() int

type WebhookRequest

type WebhookRequest struct {
	*Request
	// IDCWebhookURLs webhook urls seperated by IDC
	// key is IDC, value is set.Set
	IDCWebhookURLs *sync.Map

	// AllURLs all webhook urls, ignore idc
	AllURLs *set.Set
	// contains filtered or unexported fields
}

func NewWebhookRequest

func NewWebhookRequest(mctx *MessageContext) (*WebhookRequest, error)

Directories

Path Synopsis

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL