queues_stream

package
v1.8.1 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Apr 29, 2024 License: Apache-2.0 Imports: 15 Imported by: 12

Documentation

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

This section is empty.

Types

type AckAllRequest

type AckAllRequest struct {
	ClientID        string
	Channel         string
	WaitTimeSeconds int32
	// contains filtered or unexported fields
}

func NewAckAllRequest

func NewAckAllRequest() *AckAllRequest

func (*AckAllRequest) SetChannel

func (req *AckAllRequest) SetChannel(channel string) *AckAllRequest

SetChannel - set ack all queue message request channel - mandatory if default channel was not set

func (*AckAllRequest) SetClientId

func (req *AckAllRequest) SetClientId(clientId string) *AckAllRequest

func (*AckAllRequest) SetWaitTimeSeconds

func (req *AckAllRequest) SetWaitTimeSeconds(wait int) *AckAllRequest

SetWaitTimeSeconds - set ack all queue message request wait timout

type AckAllResponse

type AckAllResponse struct {
	RequestID        string
	AffectedMessages uint64
	IsError          bool
	Error            string
}

type GrpcClient

type GrpcClient struct {
	pb.KubemqClient
	// contains filtered or unexported fields
}

func NewGrpcClient

func NewGrpcClient(ctx context.Context, op ...Option) (*GrpcClient, error)

func (*GrpcClient) Close

func (g *GrpcClient) Close() error

func (*GrpcClient) GlobalClientId

func (g *GrpcClient) GlobalClientId() string

func (*GrpcClient) Ping

func (g *GrpcClient) Ping(ctx context.Context) (*pb.PingResult, error)

type Option

type Option interface {
	// contains filtered or unexported methods
}

func WithAddress

func WithAddress(host string, port int) Option

WithAddress - set host and port address of KubeMQ server

func WithAuthToken

func WithAuthToken(token string) Option

WithAuthToken - set KubeMQ JWT Auth token to be used for KubeMQ connection

func WithAutoReconnect

func WithAutoReconnect(value bool) Option

WithAutoReconnect - set automatic reconnection in case of lost connectivity to server

func WithCertificate

func WithCertificate(certData, serverOverrideDomain string) Option

WithCertificate - set secured TLS credentials from the input certificate data for grpcClient. serverNameOverride is for testing only. If set to a non empty string, it will override the virtual host name of authority (e.g. :authority header field) in requests.

func WithCheckConnection

func WithCheckConnection(value bool) Option

WithCheckConnection - set server connectivity on grpcClient create

func WithClientId

func WithClientId(id string) Option

WithClientId - set grpcClient id to be used in all functions call with this grpcClient - mandatory

func WithConnectionNotificationFunc added in v1.7.2

func WithConnectionNotificationFunc(fn func(msg string)) Option

WithConnectionNotificationFunc - set on connection activity messages

func WithCredentials

func WithCredentials(certFile, serverOverrideDomain string) Option

WithCredentials - set secured TLS credentials from the input certificate file for grpcClient. serverNameOverride is for testing only. If set to a non empty string, it will override the virtual host name of authority (e.g. :authority header field) in requests.

func WithDefaultCacheTTL

func WithDefaultCacheTTL(ttl time.Duration) Option

WithDefaultCacheTTL - set default cache time to live for any query requests with any CacheKey set value

func WithDefaultChannel

func WithDefaultChannel(channel string) Option

WithDefaultChannel - set default channel for any outbound requests

func WithMaxReconnects

func WithMaxReconnects(value int) Option

WithMaxReconnects - set max reconnects before return error, default 0, never.

func WithReceiveBufferSize

func WithReceiveBufferSize(size int) Option

WithReceiveBufferSize - set length of buffered channel to be set in all subscriptions

func WithReconnectInterval

func WithReconnectInterval(duration time.Duration) Option

WithReconnectInterval - set reconnection interval duration, default is 5 seconds

type Options

type Options struct {
	// contains filtered or unexported fields
}

func GetDefaultOptions

func GetDefaultOptions() *Options

func (*Options) Validate

func (o *Options) Validate() error

type PollRequest

type PollRequest struct {
	Channel     string `json:"Channel"`
	MaxItems    int    `json:"max_items"`
	WaitTimeout int    `json:"wait_timeout"`
	AutoAck     bool   `json:"auto_ack"`
	OnErrorFunc func(err error)
	OnComplete  func()
}

PollRequest - Request parameters for Poll function

func NewPollRequest

func NewPollRequest() *PollRequest

func (*PollRequest) SetAutoAck

func (p *PollRequest) SetAutoAck(autoAck bool) *PollRequest

func (*PollRequest) SetChannel

func (p *PollRequest) SetChannel(channel string) *PollRequest

func (*PollRequest) SetMaxItems

func (p *PollRequest) SetMaxItems(maxItems int) *PollRequest

func (*PollRequest) SetOnComplete

func (p *PollRequest) SetOnComplete(onComplete func()) *PollRequest

func (*PollRequest) SetOnErrorFunc

func (p *PollRequest) SetOnErrorFunc(onErrorFunc func(err error)) *PollRequest

func (*PollRequest) SetWaitTimeout

func (p *PollRequest) SetWaitTimeout(waitTimeout int) *PollRequest

type PollResponse

type PollResponse struct {
	Messages []*QueueMessage
	// contains filtered or unexported fields
}

func (PollResponse) AckAll

func (r PollResponse) AckAll() error

func (PollResponse) AckOffsets

func (r PollResponse) AckOffsets(offsets ...int64) error

func (PollResponse) Close

func (r PollResponse) Close() error

func (*PollResponse) HasMessages

func (p *PollResponse) HasMessages() bool

func (PollResponse) NAckAll

func (r PollResponse) NAckAll() error

func (PollResponse) NAckOffsets

func (r PollResponse) NAckOffsets(offsets ...int64) error

func (PollResponse) ReQueueAll

func (r PollResponse) ReQueueAll(channel string) error

func (PollResponse) ReQueueOffsets

func (r PollResponse) ReQueueOffsets(channel string, offsets ...int64) error

type QueueInfo added in v1.7.0

type QueueInfo struct {
	Name          string `json:"name"`
	Messages      int64  `json:"messages"`
	Bytes         int64  `json:"bytes"`
	FirstSequence int64  `json:"first_sequence"`
	LastSequence  int64  `json:"last_sequence"`
	Sent          int64  `json:"sent"`
	Subscribers   int    `json:"subscribers"`
	Waiting       int64  `json:"waiting"`
	Delivered     int64  `json:"delivered"`
}

type QueueMessage

type QueueMessage struct {
	*pb.QueueMessage
	// contains filtered or unexported fields
}

func NewQueueMessage

func NewQueueMessage() *QueueMessage

func (*QueueMessage) Ack

func (qm *QueueMessage) Ack() error

func (QueueMessage) AckAll

func (r QueueMessage) AckAll() error

func (QueueMessage) AckOffsets

func (r QueueMessage) AckOffsets(offsets ...int64) error

func (*QueueMessage) AddTag

func (qm *QueueMessage) AddTag(key, value string) *QueueMessage

AddTag - add key value tags to query message

func (QueueMessage) Close

func (r QueueMessage) Close() error

func (*QueueMessage) NAck

func (qm *QueueMessage) NAck() error

func (QueueMessage) NAckAll

func (r QueueMessage) NAckAll() error

func (QueueMessage) NAckOffsets

func (r QueueMessage) NAckOffsets(offsets ...int64) error

func (*QueueMessage) ReQueue

func (qm *QueueMessage) ReQueue(channel string) error

func (QueueMessage) ReQueueAll

func (r QueueMessage) ReQueueAll(channel string) error

func (QueueMessage) ReQueueOffsets

func (r QueueMessage) ReQueueOffsets(channel string, offsets ...int64) error

func (*QueueMessage) SetBody

func (qm *QueueMessage) SetBody(body []byte) *QueueMessage

SetBody - set queue message body - mandatory if metadata field is empty

func (*QueueMessage) SetChannel

func (qm *QueueMessage) SetChannel(channel string) *QueueMessage

SetChannel - set queue message Channel - mandatory if default Channel was not set

func (*QueueMessage) SetClientId

func (qm *QueueMessage) SetClientId(clientId string) *QueueMessage

SetClientId - set queue message ClientId - mandatory if default grpcClient was not set

func (*QueueMessage) SetId

func (qm *QueueMessage) SetId(id string) *QueueMessage

SetId - set queue message id, otherwise new random uuid will be set

func (*QueueMessage) SetMetadata

func (qm *QueueMessage) SetMetadata(metadata string) *QueueMessage

SetMetadata - set queue message metadata - mandatory if body field is empty

func (*QueueMessage) SetPolicyDelaySeconds

func (qm *QueueMessage) SetPolicyDelaySeconds(sec int) *QueueMessage

SetPolicyDelaySeconds - set queue message delivery delay in seconds, 0 , immediate delivery

func (*QueueMessage) SetPolicyExpirationSeconds

func (qm *QueueMessage) SetPolicyExpirationSeconds(sec int) *QueueMessage

SetPolicyExpirationSeconds - set queue message expiration seconds, 0 never expires

func (*QueueMessage) SetPolicyMaxReceiveCount

func (qm *QueueMessage) SetPolicyMaxReceiveCount(max int) *QueueMessage

SetPolicyMaxReceiveCount - set max delivery attempts before message will discard or re-route to a new queue

func (*QueueMessage) SetPolicyMaxReceiveQueue

func (qm *QueueMessage) SetPolicyMaxReceiveQueue(channel string) *QueueMessage

SetPolicyMaxReceiveQueue - set queue name to be routed once MaxReceiveCount is triggered, empty will discard the message

func (*QueueMessage) SetTags

func (qm *QueueMessage) SetTags(tags map[string]string) *QueueMessage

SetTags - set key value tags to queue message

type QueuesInfo added in v1.7.0

type QueuesInfo struct {
	TotalQueues int          `json:"total_queues"`
	Sent        int64        `json:"sent"`
	Waiting     int64        `json:"waiting"`
	Delivered   int64        `json:"delivered"`
	Queues      []*QueueInfo `json:"queues"`
}

type QueuesStreamClient

type QueuesStreamClient struct {
	sync.Mutex
	// contains filtered or unexported fields
}

QueuesStreamClient is a client for streaming queues operations. It manages the connection to the Kubemq server and provides methods for interacting with queues.

func NewQueuesStreamClient

func NewQueuesStreamClient(ctx context.Context, op ...Option) (*QueuesStreamClient, error)

NewQueuesStreamClient is a function that creates a new QueuesStreamClient instance. It takes a context and an optional list of options. It returns a QueuesStreamClient and an error. The function creates a new GrpcClient using the provided context and options. If the creation of the GrpcClient fails, an error is returned. Otherwise, a new QueuesStreamClient is created with the client context and client. The QueuesStreamClient is then returned along with a nil error.

func (*QueuesStreamClient) AckAll

func (q *QueuesStreamClient) AckAll(ctx context.Context, request *AckAllRequest) (*AckAllResponse, error)

AckAll sends an acknowledgment for all messages in the specified channel. It validates the request, creates a new AckAllQueueMessagesRequest, and sends it to the client's AckAllQueueMessages method. If successful, it creates a new AckAllResponse with the response data and returns it.

func (*QueuesStreamClient) Close

func (q *QueuesStreamClient) Close() error

Close closes the QueuesStreamClient by closing the upstream and downstream connections and then closing the underlying GrpcClient connection. It also sleeps for 100 milliseconds before closing the connections. Returns an error if any of the close operations encounter an error, or if closing the underlying GrpcClient encounters an error.

func (*QueuesStreamClient) Create added in v1.8.0

func (q *QueuesStreamClient) Create(ctx context.Context, channel string) error

Create sends a create channel request to the Kubemq server. It creates a new channel of type "queues" with the specified channel name. It returns an error if the request fails or if there is an error creating the channel.

func (*QueuesStreamClient) Delete added in v1.8.0

func (q *QueuesStreamClient) Delete(ctx context.Context, channel string) error

Delete deletes a channel in the QueuesStreamClient. It sends a request to the server to delete the specified channel. If the request encounters an error while sending or if the response contains an error message, an error is returned.

Parameters: - ctx: the context.Context for the request - channel: the name of the channel to delete

Returns: - error: an error if the delete channel request fails, otherwise nil

func (*QueuesStreamClient) List added in v1.8.0

func (q *QueuesStreamClient) List(ctx context.Context, search string) ([]*common.QueuesChannel, error)

List returns a list of QueuesChannels based on the given search string. It sends a request to the Kubemq server to retrieve the list of channels. The search string is used to filter the channels. It returns the list of QueuesChannels and any error encountered.

func (*QueuesStreamClient) Poll

func (q *QueuesStreamClient) Poll(ctx context.Context, request *PollRequest) (*PollResponse, error)

Poll retrieves messages from the QueuesStreamClient. It checks if the downstream connection is ready and then calls downstream.poll()

func (*QueuesStreamClient) QueuesInfo added in v1.7.0

func (q *QueuesStreamClient) QueuesInfo(ctx context.Context, filter string) (*QueuesInfo, error)

QueuesInfo returns information about queues based on the provided filter. It sends a request to the gRPC client to retrieve QueuesInfoResponse. The response is then transformed into a QueuesInfo struct and returned.

func (*QueuesStreamClient) Send

func (q *QueuesStreamClient) Send(ctx context.Context, messages ...*QueueMessage) (*SendResult, error)

Send sends one or more QueueMessages to the QueuesStreamClient. It acquires a lock to ensure thread safety. If there is no upstream connection, a new upstream connection is created. If the upstream connection is not ready, an error is returned. If there are no messages to send, an error is returned. The messages are converted to a list of pb.QueueMessage structs. A new QueuesUpstreamRequest is created with a unique RequestID and the list of messages. The request is sent to the upstream server using the send method of the upstream connection. The response is received through a select statement, handling both the response and the cancellation of the request. If the response indicates an error, an error is returned. Otherwise, a new SendResult is created from the response and returned along with no error. If the context is canceled before receiving the response, the transaction is canceled and an error is returned. If any other error occurs during the execution, it is returned.

type SendResult

type SendResult struct {
	Results []*pb.SendQueueMessageResult
}

type SubscribeRequest

type SubscribeRequest struct {
	Channels    []string
	MaxItems    int  `json:"max_items"`
	WaitTimeout int  `json:"wait_timeout"`
	AutoAck     bool `json:"auto_ack"`
}

func NewSubscribeRequest

func NewSubscribeRequest() *SubscribeRequest

func (*SubscribeRequest) SetAutoAck

func (s *SubscribeRequest) SetAutoAck(autoAck bool) *SubscribeRequest

func (*SubscribeRequest) SetChannels

func (s *SubscribeRequest) SetChannels(channels ...string) *SubscribeRequest

func (*SubscribeRequest) SetMaxItems

func (s *SubscribeRequest) SetMaxItems(maxItems int) *SubscribeRequest

func (*SubscribeRequest) SetWaitTimeout

func (s *SubscribeRequest) SetWaitTimeout(waitTimeout int) *SubscribeRequest

type SubscribeResponse

type SubscribeResponse struct {
	Messages []*QueueMessage
	// contains filtered or unexported fields
}

func NewSubscribeResponse

func NewSubscribeResponse() *SubscribeResponse

func (SubscribeResponse) AckAll

func (r SubscribeResponse) AckAll() error

func (SubscribeResponse) AckOffsets

func (r SubscribeResponse) AckOffsets(offsets ...int64) error

func (SubscribeResponse) Close

func (r SubscribeResponse) Close() error

func (SubscribeResponse) NAckAll

func (r SubscribeResponse) NAckAll() error

func (SubscribeResponse) NAckOffsets

func (r SubscribeResponse) NAckOffsets(offsets ...int64) error

func (SubscribeResponse) ReQueueAll

func (r SubscribeResponse) ReQueueAll(channel string) error

func (SubscribeResponse) ReQueueOffsets

func (r SubscribeResponse) ReQueueOffsets(channel string, offsets ...int64) error

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL