kubemq

package module
v1.8.1 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Apr 29, 2024 License: Apache-2.0 Imports: 20 Imported by: 197

README

KubeMQ Go SDK

KubeMQ is an enterprise-grade message queue and broker for containers, designed for any workload and architecture running in Kubernetes. This library is the Go implementation of the KubeMQ client connection.

Install KubeMQ Community Edition

Please visit KubeMQ Community for installation steps.

Install KubeMQ Go SDK

go get github.com/kubemq-io/kubemq-go

Learn KubeMQ

Visit our Extensive KubeMQ Documentation.

Examples - Cookbook Recipes

Please visit our cookbook repository

Support

If you encounter any issues, please open an issue here. In addition, you can reach us for support by:

Documentation

Index

Constants

This section is empty.

Variables

View Source
var (
	ErrNoTransportDefined    = errors.New("no transport layer defined, create object with client instance")
	ErrNoTransportConnection = errors.New("no transport layer established, aborting")
)

Functions

func CreateChannel added in v1.8.0

func CreateChannel(ctx context.Context, client *Client, clientId string, channel string, channelType string) error

func DecodeCQChannelList added in v1.8.0

func DecodeCQChannelList(dataBytes []byte) ([]*common.CQChannel, error)

func DecodePubSubChannelList added in v1.8.0

func DecodePubSubChannelList(dataBytes []byte) ([]*common.PubSubChannel, error)

func DecodeQueuesChannelList added in v1.8.0

func DecodeQueuesChannelList(dataBytes []byte) ([]*common.QueuesChannel, error)

func DeleteChannel added in v1.8.0

func DeleteChannel(ctx context.Context, client *Client, clientId string, channel string, channelType string) error

func ListCQChannels added in v1.8.0

func ListCQChannels(ctx context.Context, client *Client, clientId string, channelType string, search string) ([]*common.CQChannel, error)

func ListPubSubChannels added in v1.8.0

func ListPubSubChannels(ctx context.Context, client *Client, clientId string, channelType string, search string) ([]*common.PubSubChannel, error)

func ListQueuesChannels added in v1.8.0

func ListQueuesChannels(ctx context.Context, client *Client, clientId string, search string) ([]*common.QueuesChannel, error)

Types

type AckAllQueueMessagesRequest added in v1.2.0

type AckAllQueueMessagesRequest struct {
	RequestID       string
	ClientID        string
	Channel         string
	WaitTimeSeconds int32
	// contains filtered or unexported fields
}

func (*AckAllQueueMessagesRequest) AddTrace added in v1.2.0

func (req *AckAllQueueMessagesRequest) AddTrace(name string) *Trace

AddTrace - add tracing support to ack all receive queue message request

func (*AckAllQueueMessagesRequest) Complete added in v1.5.0

func (*AckAllQueueMessagesRequest) Send added in v1.2.0

Send - sending ack all queue messages request, waiting for response or timeout

func (*AckAllQueueMessagesRequest) SetChannel added in v1.2.0

SetChannel - set ack all queue message request channel - mandatory if default channel was not set

func (*AckAllQueueMessagesRequest) SetClientId added in v1.2.0

func (req *AckAllQueueMessagesRequest) SetClientId(clientId string) *AckAllQueueMessagesRequest

SetClientId - set ack all queue message request ClientId - mandatory if default client was not set

func (*AckAllQueueMessagesRequest) SetId added in v1.2.0

SetId - set ack all queue message request id, otherwise new random uuid will be set

func (*AckAllQueueMessagesRequest) SetWaitTimeSeconds added in v1.2.0

func (req *AckAllQueueMessagesRequest) SetWaitTimeSeconds(wait int) *AckAllQueueMessagesRequest

SetWaitTimeSeconds - set ack all queue message request wait timeout

func (*AckAllQueueMessagesRequest) Validate added in v1.5.0

func (req *AckAllQueueMessagesRequest) Validate() error

type AckAllQueueMessagesResponse added in v1.2.0

type AckAllQueueMessagesResponse struct {
	RequestID        string
	AffectedMessages uint64
	IsError          bool
	Error            string
}

type Client

type Client struct {
	ServerInfo *ServerInfo
	// contains filtered or unexported fields
}

func NewClient

func NewClient(ctx context.Context, op ...Option) (*Client, error)

NewClient - create client instance to be used to communicate with KubeMQ server

func (*Client) AQM added in v1.2.0

AQM - create an empty ack all receive queue messages request object

func (*Client) AckAllQueueMessages added in v1.2.0

func (c *Client) AckAllQueueMessages(ctx context.Context, req *AckAllQueueMessagesRequest) (*AckAllQueueMessagesResponse, error)

AckAllQueueMessages - send ack all messages in queue

func (*Client) C

func (c *Client) C() *Command

C - create an empty command object

func (*Client) Close

func (c *Client) Close() error

Close - closing client connection. Any ongoing transactions will be aborted

func (*Client) E

func (c *Client) E() *Event

E - create an empty event object

func (*Client) ES

func (c *Client) ES() *EventStore

ES - create an empty event store object

func (*Client) NewAckAllQueueMessagesRequest added in v1.2.0

func (c *Client) NewAckAllQueueMessagesRequest() *AckAllQueueMessagesRequest

NewAckAllQueueMessagesRequest - create an empty ack all receive queue messages request object

func (*Client) NewCommand added in v1.2.0

func (c *Client) NewCommand() *Command

NewCommand - create an empty command

func (*Client) NewEvent added in v1.2.0

func (c *Client) NewEvent() *Event

NewEvent - create an empty event

func (*Client) NewEventStore added in v1.2.0

func (c *Client) NewEventStore() *EventStore

NewEventStore - create an empty event store

func (*Client) NewQuery added in v1.2.0

func (c *Client) NewQuery() *Query

NewQuery - create an empty query

func (*Client) NewQueueMessage added in v1.2.0

func (c *Client) NewQueueMessage() *QueueMessage

NewQueueMessage - create an empty queue message

func (*Client) NewQueueMessages added in v1.2.0

func (c *Client) NewQueueMessages() *QueueMessages

NewQueueMessages - create an empty queue messages array

func (*Client) NewReceiveQueueMessagesRequest added in v1.2.0

func (c *Client) NewReceiveQueueMessagesRequest() *ReceiveQueueMessagesRequest

NewReceiveQueueMessagesRequest - create an empty receive queue message request object

func (*Client) NewResponse added in v1.2.0

func (c *Client) NewResponse() *Response

NewResponse - create an empty response

func (*Client) NewStreamQueueMessage added in v1.2.0

func (c *Client) NewStreamQueueMessage() *StreamQueueMessage

NewStreamQueueMessage - create an empty stream receive queue message object

func (*Client) Ping added in v1.3.5

func (c *Client) Ping(ctx context.Context) (*ServerInfo, error)

Ping - get status of current connection

func (*Client) Q

func (c *Client) Q() *Query

Q - create an empty query object

func (*Client) QM added in v1.2.0

func (c *Client) QM() *QueueMessage

QM - create an empty queue message object

func (*Client) QMB added in v1.2.0

func (c *Client) QMB() *QueueMessages

QMB - create an empty queue message array object

func (*Client) QueuesInfo added in v1.7.0

func (c *Client) QueuesInfo(ctx context.Context, filter string) (*QueuesInfo, error)

QueuesInfo - get queues detailed information

func (*Client) R

func (c *Client) R() *Response

R - create an empty response object for command or query responses

func (*Client) RQM added in v1.2.0

RQM - create an empty receive queue message request object

func (*Client) ReceiveQueueMessages added in v1.2.0

ReceiveQueueMessages - call to receive messages from a queue

func (*Client) SQM added in v1.2.0

func (c *Client) SQM() *StreamQueueMessage

SQM - create an empty stream receive queue message object

func (*Client) SendQueueMessage added in v1.2.0

func (c *Client) SendQueueMessage(ctx context.Context, msg *QueueMessage) (*SendQueueMessageResult, error)

SendQueueMessage - send single queue message

func (*Client) SendQueueMessages added in v1.2.0

func (c *Client) SendQueueMessages(ctx context.Context, msg []*QueueMessage) ([]*SendQueueMessageResult, error)

SendQueueMessages - send multiple queue messages

func (*Client) SetCommand added in v1.4.0

func (c *Client) SetCommand(cmd *Command) *Command

func (*Client) SetEvent added in v1.4.0

func (c *Client) SetEvent(e *Event) *Event

func (*Client) SetEventStore added in v1.4.0

func (c *Client) SetEventStore(es *EventStore) *EventStore

func (*Client) SetQuery added in v1.4.0

func (c *Client) SetQuery(query *Query) *Query

func (*Client) SetQueueMessage added in v1.4.0

func (c *Client) SetQueueMessage(qm *QueueMessage) *QueueMessage

func (*Client) SetResponse added in v1.4.0

func (c *Client) SetResponse(response *Response) *Response

func (*Client) StreamEvents

func (c *Client) StreamEvents(ctx context.Context, eventsCh chan *Event, errCh chan error)

StreamEvents - send stream of events in a single call

func (*Client) StreamEventsStore

func (c *Client) StreamEventsStore(ctx context.Context, eventsCh chan *EventStore, eventsResultCh chan *EventStoreResult, errCh chan error)

StreamEventsStore - send stream of events store in a single call

func (*Client) SubscribeToCommands

func (c *Client) SubscribeToCommands(ctx context.Context, channel, group string, errCh chan error) (<-chan *CommandReceive, error)

SubscribeToCommands - subscribe to commands requests by channel and group. Returns a channel of CommandReceive or an error

func (*Client) SubscribeToCommandsWithRequest added in v1.5.0

func (c *Client) SubscribeToCommandsWithRequest(ctx context.Context, request *CommandsSubscription, errCh chan error) (<-chan *CommandReceive, error)

SubscribeToCommandsWithRequest - subscribe to commands requests using the channel and group set in the CommandsSubscription request. Returns a channel of CommandReceive or an error

func (*Client) SubscribeToEvents

func (c *Client) SubscribeToEvents(ctx context.Context, channel, group string, errCh chan error) (<-chan *Event, error)

SubscribeToEvents - subscribe to events by channel and group. Returns a channel of events or an error

func (*Client) SubscribeToEventsStore

func (c *Client) SubscribeToEventsStore(ctx context.Context, channel, group string, errCh chan error, opt SubscriptionOption) (<-chan *EventStoreReceive, error)

SubscribeToEventsStore - subscribe to events store by channel and group with subscription option. Returns a channel of EventStoreReceive or an error

func (*Client) SubscribeToEventsStoreWithRequest added in v1.5.0

func (c *Client) SubscribeToEventsStoreWithRequest(ctx context.Context, request *EventsStoreSubscription, errCh chan error) (<-chan *EventStoreReceive, error)

SubscribeToEventsStoreWithRequest - subscribe to events store using the channel, group and subscription option set in the EventsStoreSubscription request. Returns a channel of EventStoreReceive or an error

func (*Client) SubscribeToEventsWithRequest added in v1.5.0

func (c *Client) SubscribeToEventsWithRequest(ctx context.Context, request *EventsSubscription, errCh chan error) (<-chan *Event, error)

SubscribeToEventsWithRequest - subscribe to events using the channel and group set in the EventsSubscription request. Returns a channel of events or an error

func (*Client) SubscribeToQueries

func (c *Client) SubscribeToQueries(ctx context.Context, channel, group string, errCh chan error) (<-chan *QueryReceive, error)

SubscribeToQueries - subscribe to queries requests by channel and group. Returns a channel of QueryReceive or an error

func (*Client) SubscribeToQueriesWithRequest added in v1.5.0

func (c *Client) SubscribeToQueriesWithRequest(ctx context.Context, request *QueriesSubscription, errCh chan error) (<-chan *QueryReceive, error)

SubscribeToQueriesWithRequest - subscribe to queries requests using the channel and group set in the QueriesSubscription request. Returns a channel of QueryReceive or an error

type Command

type Command struct {
	Id       string
	Channel  string
	Metadata string
	Body     []byte
	Timeout  time.Duration
	ClientId string
	Tags     map[string]string
	// contains filtered or unexported fields
}

func NewCommand added in v1.4.0

func NewCommand() *Command

func (*Command) AddTag added in v1.2.0

func (c *Command) AddTag(key, value string) *Command

AddTag - add key value tags to command message

func (*Command) AddTrace added in v1.2.0

func (c *Command) AddTrace(name string) *Trace

AddTrace - add tracing support to command

func (*Command) Send

func (c *Command) Send(ctx context.Context) (*CommandResponse, error)

Send - sending command, waiting for response or timeout

func (*Command) SetBody

func (c *Command) SetBody(body []byte) *Command

SetBody - set command body - mandatory if metadata field is empty

func (*Command) SetChannel

func (c *Command) SetChannel(channel string) *Command

SetChannel - set command channel - mandatory if default channel was not set

func (*Command) SetClientId

func (c *Command) SetClientId(clientId string) *Command

SetClientId - set command ClientId - mandatory if default client was not set

func (*Command) SetId

func (c *Command) SetId(id string) *Command

SetId - set command requestId, otherwise new random uuid will be set

func (*Command) SetMetadata

func (c *Command) SetMetadata(metadata string) *Command

SetMetadata - set command metadata - mandatory if body field is empty

func (*Command) SetTags added in v1.4.1

func (c *Command) SetTags(tags map[string]string) *Command

SetTags - set key value tags to command message

func (*Command) SetTimeout

func (c *Command) SetTimeout(timeout time.Duration) *Command

SetTimeout - set timeout for command to be returned. If the timeout expires, sending the command will result in an error

type CommandReceive

type CommandReceive struct {
	Id         string
	ClientId   string
	Channel    string
	Metadata   string
	Body       []byte
	ResponseTo string
	Tags       map[string]string
}

type CommandResponse

type CommandResponse struct {
	CommandId        string
	ResponseClientId string
	Executed         bool
	ExecutedAt       time.Time
	Error            string
	Tags             map[string]string
}

type CommandsClient added in v1.5.0

type CommandsClient struct {
	// contains filtered or unexported fields
}

CommandsClient represents a client that can be used to send commands to a server. It contains a reference to the underlying client that handles the communication.

func NewCommandsClient added in v1.5.0

func NewCommandsClient(ctx context.Context, op ...Option) (*CommandsClient, error)

NewCommandsClient creates a new instance of CommandsClient with the provided context and options. It returns the created CommandsClient instance and an error if any.

func (*CommandsClient) Close added in v1.5.0

func (c *CommandsClient) Close() error

Close closes the connection to the server by invoking the Close method of the underlying client. It returns an error if the close operation fails.

func (*CommandsClient) Create added in v1.8.0

func (c *CommandsClient) Create(ctx context.Context, channel string) error

Create sends a request to create a channel of type "commands" with the given channel name.

It returns an error if there was a failure in sending the create channel request, or if there was an error creating the channel.

func (*CommandsClient) Delete added in v1.8.0

func (c *CommandsClient) Delete(ctx context.Context, channel string) error

Delete deletes a channel from the commands client.

It sends a delete channel request to the KubeMQ server and returns an error if there is any. The function constructs a delete channel query, sets the required metadata and timeout, and makes the request through the client's query service. If the response contains an error message, it returns an error.

Parameters:

  • ctx: The context.Context object for the request.
  • channel: The name of the channel to be deleted.

Returns:

  • nil if the channel was deleted successfully.
  • An error if the channel deletion failed.

func (*CommandsClient) List added in v1.8.0

func (c *CommandsClient) List(ctx context.Context, search string) ([]*common.CQChannel, error)

List returns a list of CQChannels that match the given search criteria. It uses the ListCQChannels function to retrieve the data and decode it into CQChannel objects. The search parameter is optional and can be used to filter the results. List itself takes only a context and a search string; the client, client ID, and channel type that ListCQChannels also requires are not parameters of List. It returns a slice of CQChannel objects and an error if any occurred.

func (*CommandsClient) Response added in v1.5.0

func (c *CommandsClient) Response(ctx context.Context, response *Response) error

Response sets the response object in the CommandsClient and sends the response using the client's transport.

This method requires the client to be initialized.

Parameters:

ctx: The context.Context object for the request.
response: The Response object to set and send.

Returns:

  • error: An error if the client is not ready or if sending the response fails.

func (*CommandsClient) Send added in v1.5.0

func (c *CommandsClient) Send(ctx context.Context, request *Command) (*CommandResponse, error)

Send sends a command using the provided context and command request. It checks if the client is ready to send the command. It sets the transport for the request and calls the Send method on the client to send the command. It returns the command response and any error that occurred during the process.

func (*CommandsClient) Subscribe added in v1.5.0

func (c *CommandsClient) Subscribe(ctx context.Context, request *CommandsSubscription, onCommandReceive func(cmd *CommandReceive, err error)) error

Subscribe starts a subscription to receive commands from the server. It takes a context and a CommandsSubscription request as input, along with a callback function onCommandReceive that will be invoked whenever a command is received or an error occurs. It returns an error if the client is not ready, if the callback function is nil, or if the request fails validation. The Subscribe method launches a goroutine that listens for incoming commands and triggers the callback function accordingly.

type CommandsSubscription added in v1.5.0

type CommandsSubscription struct {
	Channel  string
	Group    string
	ClientId string
}

CommandsSubscription represents a subscription to commands requests by channel and group. It contains the following fields:

  • Channel: the channel to subscribe to
  • Group: the group to subscribe to
  • ClientId: the ID of the client subscribing to the commands

Usage example:

commandsSubscription := &CommandsSubscription{
  Channel:  "channel",
  Group:    "group",
  ClientId: "clientID",
}
err := commandsSubscription.Validate()
if err != nil {
  // handle validation error
}
commandsCh, err := client.SubscribeToCommandsWithRequest(context.Background(), commandsSubscription, errCh)
if err != nil {
  // handle subscribe error
}
for command := range commandsCh {
  // handle received command
}

It also has the following methods:

  • Complete(opts *Options) *CommandsSubscription: completes the commands subscription with the given options
  • Validate() error: validates the commands subscription, ensuring that it has a channel and client ID

func (*CommandsSubscription) Complete added in v1.5.0

func (cs *CommandsSubscription) Complete(opts *Options) *CommandsSubscription

Complete method sets the `ClientId` field of the `CommandsSubscription` struct if it is empty. It takes an `Options` object as a parameter, and uses the `clientId` field of the `Options` object to set the `ClientId` field of `CommandsSubscription` if it is empty. It returns a pointer to the modified `CommandsSubscription` object.

Example usage:

request := &CommandsSubscription{
    Channel: "my-channel",
    Group: "my-group",
}
options := &Options{
    clientId: "my-client-id",
}
request.Complete(options)
// Now the `ClientId` field of `request` will be set as "my-client-id" if it was empty.

func (*CommandsSubscription) Validate added in v1.5.0

func (cs *CommandsSubscription) Validate() error

Validate checks if a CommandsSubscription object has valid channel and clientId values. It returns an error if any of the required fields is empty. Otherwise, it returns nil.

type Event

type Event struct {
	Id       string
	Channel  string
	Metadata string
	Body     []byte
	ClientId string
	Tags     map[string]string
	// contains filtered or unexported fields
}

func NewEvent added in v1.4.0

func NewEvent() *Event

func (*Event) AddTag added in v1.2.0

func (e *Event) AddTag(key, value string) *Event

AddTag - add key value tags to event message

func (*Event) Send

func (e *Event) Send(ctx context.Context) error

func (*Event) SetBody

func (e *Event) SetBody(body []byte) *Event

SetBody - set event body - mandatory if metadata field was not set

func (*Event) SetChannel

func (e *Event) SetChannel(channel string) *Event

SetChannel - set event channel - mandatory if default channel was not set

func (*Event) SetClientId

func (e *Event) SetClientId(clientId string) *Event

SetClientId - set event ClientId - mandatory if default client was not set

func (*Event) SetId

func (e *Event) SetId(id string) *Event

SetId - set event id otherwise new random uuid will be set

func (*Event) SetMetadata

func (e *Event) SetMetadata(metadata string) *Event

SetMetadata - set event metadata - mandatory if body field was not set

func (*Event) SetTags added in v1.4.1

func (e *Event) SetTags(tags map[string]string) *Event

SetTags - set key value tags to event message

type EventStore

type EventStore struct {
	Id       string
	Channel  string
	Metadata string
	Body     []byte
	ClientId string
	Tags     map[string]string
	// contains filtered or unexported fields
}

func NewEventStore added in v1.4.0

func NewEventStore() *EventStore

func (*EventStore) AddTag added in v1.2.0

func (es *EventStore) AddTag(key, value string) *EventStore

AddTag - add key value tags to event store message

func (*EventStore) Send

func (es *EventStore) Send(ctx context.Context) (*EventStoreResult, error)

Send - sending events store message

func (*EventStore) SetBody

func (es *EventStore) SetBody(body []byte) *EventStore

SetBody - set event store body - mandatory if metadata field was not set

func (*EventStore) SetChannel

func (es *EventStore) SetChannel(channel string) *EventStore

SetChannel - set event store channel - mandatory if default channel was not set

func (*EventStore) SetClientId

func (es *EventStore) SetClientId(clientId string) *EventStore

SetClientId - set event store ClientId - mandatory if default client was not set

func (*EventStore) SetId

func (es *EventStore) SetId(id string) *EventStore

SetId - set event store id otherwise new random uuid will be set

func (*EventStore) SetMetadata

func (es *EventStore) SetMetadata(metadata string) *EventStore

SetMetadata - set event store metadata - mandatory if body field was not set

func (*EventStore) SetTags added in v1.4.1

func (es *EventStore) SetTags(tags map[string]string) *EventStore

SetTags - set key value tags to event store message

type EventStoreReceive

type EventStoreReceive struct {
	Id        string
	Sequence  uint64
	Timestamp time.Time
	Channel   string
	Metadata  string
	Body      []byte
	ClientId  string
	Tags      map[string]string
}

type EventStoreResult

type EventStoreResult struct {
	Id   string
	Sent bool
	Err  error
}

type EventsClient added in v1.5.0

type EventsClient struct {
	// contains filtered or unexported fields
}

EventsClient is a client for interacting with events. It encapsulates a client for making API requests.

func NewEventsClient added in v1.5.0

func NewEventsClient(ctx context.Context, op ...Option) (*EventsClient, error)

NewEventsClient creates an instance of EventsClient by calling NewClient and returning EventsClient{client}

Parameters: - ctx: The context.Context to be used in NewClient call. - op: Optional parameters of type Option to be passed to NewClient.

Returns a pointer to EventsClient and an error if NewClient call fails.

func (*EventsClient) Close added in v1.5.0

func (e *EventsClient) Close() error

Close closes the EventsClient by invoking the Close method on its underlying Client. It returns an error if there was a problem closing the client.

func (*EventsClient) Create added in v1.8.0

func (e *EventsClient) Create(ctx context.Context, channel string) error

Create creates a new event channel with the specified channel name. It sends a create-channel request to the KubeMQ server using the provided context and client. The channelType parameter specifies the type of the channel ('events' in this case). It returns an error if an error occurs during the creation of the channel.

func (*EventsClient) Delete added in v1.8.0

func (e *EventsClient) Delete(ctx context.Context, channel string) error

Delete deletes a channel from the events client. It sends a delete channel request with the specified channel ID and type to the client. Returns an error if the delete channel request fails or if there is an error deleting the channel.

Example usage:

err := eventsClient.Delete(ctx, "events.A")
if err != nil {
  log.Fatal(err)
}

func (*EventsClient) List added in v1.8.0

func (e *EventsClient) List(ctx context.Context, search string) ([]*common.PubSubChannel, error)

List retrieves a list of PubSubChannels based on the provided search string. It calls ListPubSubChannels function with the given context, EventsClient's client, client ID, channel type "events", and the search string. It returns a slice of PubSubChannel pointers and an error.

func (*EventsClient) Send added in v1.5.0

func (e *EventsClient) Send(ctx context.Context, message *Event) error

Send sends the given event message. It first checks if the client is ready.

func (*EventsClient) Stream added in v1.5.0

func (e *EventsClient) Stream(ctx context.Context, onError func(err error)) (func(msg *Event) error, error)

Stream sends events from client to server and receives events from server to client. It takes a context as input, which can be used to cancel the streaming process. It also takes an onError function callback, which will be called when an error occurs during the streaming process. The method returns a sendFunc function, which can be used to send events to the server, and an error, which will be non-nil if the client is not ready or if the onError callback is not provided. The sendFunc function takes an event message as input and returns an error. It sends the event to the server through a channel, and if the context is cancelled before the event is sent, it returns an error indicating that the context was cancelled during event message sending. The method starts two goroutines, one for sending events to the server and one for receiving events from the server. The sending goroutine sends events to the server by accepting events from the eventsCh channel. The receiving goroutine receives errors from the errCh channel and calls the onError callback for each error received. It also checks if the context is cancelled and stops the receiving goroutine if it is. The method returns the sendFunc function and a nil error.

func (*EventsClient) Subscribe added in v1.5.0

func (e *EventsClient) Subscribe(ctx context.Context, request *EventsSubscription, onEvent func(msg *Event, err error)) error

Subscribe subscribes to events using the provided EventsSubscription and callback function. It checks if the client is ready and if the callback function is provided. It validates the subscription request. It creates a channel for errors, subscribes to events with the request and initializes an events channel. It starts a goroutine to listen for events or errors and calls the callback function accordingly. If the context is canceled, it returns. It returns an error if any.

type EventsErrorsHandler added in v1.5.0

type EventsErrorsHandler func(error)

EventsErrorsHandler is a type representing a function that handles errors for events.

type EventsMessageHandler added in v1.5.0

type EventsMessageHandler func(*Event)

EventsMessageHandler is a function type that takes in a pointer to an Event object and does not return anything.

type EventsStoreClient added in v1.5.0

type EventsStoreClient struct {
	// contains filtered or unexported fields
}

EventsStoreClient is a struct that holds a client instance.

func NewEventsStoreClient added in v1.5.0

func NewEventsStoreClient(ctx context.Context, op ...Option) (*EventsStoreClient, error)

NewEventsStoreClient is a function that creates a new EventsStoreClient.

func (*EventsStoreClient) Close added in v1.5.0

func (es *EventsStoreClient) Close() error

Close is a method that closes the client connection.

func (*EventsStoreClient) Create added in v1.8.0

func (es *EventsStoreClient) Create(ctx context.Context, channel string) error

Create is a method that creates a new channel in the events store.

func (*EventsStoreClient) Delete added in v1.8.0

func (es *EventsStoreClient) Delete(ctx context.Context, channel string) error

Delete is a method that deletes a channel from the events store.

func (*EventsStoreClient) List added in v1.8.0

func (es *EventsStoreClient) List(ctx context.Context, search string) ([]*common.PubSubChannel, error)

List is a method that lists all channels in the events store.

func (*EventsStoreClient) Send added in v1.5.0

func (es *EventsStoreClient) Send(ctx context.Context, message *EventStore) (*EventStoreResult, error)

Send is a method that sends an event to the store.

func (*EventsStoreClient) Stream added in v1.5.0

func (es *EventsStoreClient) Stream(ctx context.Context, onResult func(result *EventStoreResult, err error)) (func(msg *EventStore) error, error)

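Stream is a method that streams events to the store. It returns a function for sending EventStore messages; results and errors are passed to the onResult callback.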

func (*EventsStoreClient) Subscribe added in v1.5.0

func (es *EventsStoreClient) Subscribe(ctx context.Context, request *EventsStoreSubscription, onEvent func(msg *EventStoreReceive, err error)) error

Subscribe is a method that subscribes to events from the store.

type EventsStoreSubscription added in v1.5.0

type EventsStoreSubscription struct {
	Channel          string
	Group            string
	ClientId         string
	SubscriptionType SubscriptionOption
}

EventsStoreSubscription is a struct that holds the subscription details.

func (*EventsStoreSubscription) Complete added in v1.5.0

Complete is a method that completes the subscription with the provided options.

func (*EventsStoreSubscription) Validate added in v1.5.0

func (es *EventsStoreSubscription) Validate() error

Validate is a method that validates the subscription details.

type EventsSubscription added in v1.5.0

type EventsSubscription struct {
	Channel  string
	Group    string
	ClientId string
}

EventsSubscription represents a subscription to events by channel and group.

func (*EventsSubscription) Complete added in v1.5.0

func (es *EventsSubscription) Complete(opts *Options) *EventsSubscription

Complete sets the ClientId of the EventsSubscription if it is empty. It takes an *Options argument to retrieve the clientId value. If the ClientId is already set in the EventsSubscription, it will not be overridden. It returns a pointer to the modified EventsSubscription.

func (*EventsSubscription) Validate added in v1.5.0

func (es *EventsSubscription) Validate() error

Validate checks if the EventsSubscription has a non-empty Channel and ClientId. If either of them is empty, it returns an error. Otherwise, it returns nil.

type Option

type Option interface {
	// contains filtered or unexported methods
}

func WithAddress

func WithAddress(host string, port int) Option

WithAddress - set host and port address of KubeMQ server

func WithAuthToken added in v1.3.2

func WithAuthToken(token string) Option

WithAuthToken - set KubeMQ JWT Auth token to be used for KubeMQ connection

func WithAutoReconnect added in v1.4.0

func WithAutoReconnect(value bool) Option

WithAutoReconnect - set automatic reconnection in case of lost connectivity to server

func WithCertificate added in v1.3.1

func WithCertificate(certData, serverOverrideDomain string) Option

WithCertificate - set secured TLS credentials from the input certificate data for client. serverOverrideDomain is for testing only. If set to a non-empty string, it will override the virtual host name of authority (e.g. :authority header field) in requests.

func WithCheckConnection added in v1.4.0

func WithCheckConnection(value bool) Option

WithCheckConnection - set whether to check server connectivity when the client is created

func WithClientId

func WithClientId(id string) Option

WithClientId - set client id to be used in all functions call with this client - mandatory

func WithCredentials

func WithCredentials(certFile, serverOverrideDomain string) Option

WithCredentials - set secured TLS credentials from the input certificate file for client. serverOverrideDomain is for testing only. If set to a non-empty string, it will override the virtual host name of authority (e.g. :authority header field) in requests.

func WithDefaultCacheTTL

func WithDefaultCacheTTL(ttl time.Duration) Option

WithDefaultCacheTTL - set default cache time to live for any query requests with any CacheKey set value

func WithDefaultChannel added in v1.3.1

func WithDefaultChannel(channel string) Option

WithDefaultChannel - set default channel for any outbound requests

func WithMaxReconnects added in v1.4.0

func WithMaxReconnects(value int) Option

WithMaxReconnects - set the maximum number of reconnect attempts before returning an error; default is 0, meaning never.

func WithReceiveBufferSize

func WithReceiveBufferSize(size int) Option

WithReceiveBufferSize - set length of buffered channel to be set in all subscriptions

func WithReconnectInterval added in v1.4.0

func WithReconnectInterval(duration time.Duration) Option

WithReconnectInterval - set reconnection interval duration, default is 5 seconds

func WithTransportType

func WithTransportType(transportType TransportType) Option

WithTransportType - set client transport type, currently GRPC or Rest

func WithUri added in v1.2.0

func WithUri(uri string) Option

WithUri - set uri address of KubeMQ server

type Options

type Options struct {
	// contains filtered or unexported fields
}

func GetDefaultOptions

func GetDefaultOptions() *Options

func (*Options) Validate added in v1.2.0

func (o *Options) Validate() error

type QueriesClient added in v1.5.0

type QueriesClient struct {
	// contains filtered or unexported fields
}

QueriesClient represents a client for making queries to a server. It contains a reference to the underlying client that handles the communication with the server.

func NewQueriesClient added in v1.5.0

func NewQueriesClient(ctx context.Context, op ...Option) (*QueriesClient, error)

NewQueriesClient creates a new instance of QueriesClient by calling NewClient function and returning QueriesClient with the newly created Client instance. It receives a context and an optional list of options as arguments and returns a pointer to QueriesClient and an error.

func (*QueriesClient) Close added in v1.5.0

func (q *QueriesClient) Close() error

Close closes the QueriesClient's underlying client connection. This method returns an error if the client is not initialized.

func (*QueriesClient) Create added in v1.8.0

func (q *QueriesClient) Create(ctx context.Context, channel string) error

Create creates a new channel in the QueriesClient with the given channel name.

Parameters:

  • ctx (context.Context): The context for the request.
  • channel (string): The name of the channel to create.

Returns:

  • error: An error if the channel creation fails.

func (*QueriesClient) Delete added in v1.8.0

func (q *QueriesClient) Delete(ctx context.Context, channel string) error

Delete deletes a channel.

The method receives a context and the channel name to be deleted. It invokes the DeleteChannel function passing the received channel name as well as the clientId and channelType. DeleteChannel creates a new Query instance and sets the necessary properties such as the channel, clientId, metadata, tags, and timeout. It then calls the Send method of the client to send the delete channel request. If an error occurs during the request execution, it returns an error. If the response contains an error message, it returns an error. Otherwise, it returns nil, indicating the channel was successfully deleted.

func (*QueriesClient) List added in v1.8.0

func (q *QueriesClient) List(ctx context.Context, search string) ([]*common.CQChannel, error)

List retrieves a list of channels with the specified search criteria. It returns a slice of *common.CQChannel and an error. The search criteria is passed as a string parameter. The Channels are retrieved using the ListCQChannels function, passing the context, client, client ID, channel type, and search criteria. If an error occurs during the retrieval, it is returned. If the retrieval is successful, the data is decoded into a slice of *common.CQChannel using the DecodeCQChannelList function. The decoded data and any error are returned.

func (*QueriesClient) Response added in v1.5.0

func (q *QueriesClient) Response(ctx context.Context, response *Response) error

Response sends a response to a command or query request.

The response must have a corresponding requestId and response channel, which are set using SetRequestId and SetResponseTo methods, respectively. The requestId is mandatory, while the response channel is received from either CommandReceived or QueryReceived objects.

Additional optional attributes that can be set for the response include:

  • Metadata: SetMetadata should be used to set metadata for a query response only.
  • Body: SetBody should be used to set the body for a query response only.
  • Tags: SetTags can be used to set tags for the response.
  • ClientId: SetClientId can be used to set the clientId for the response. If not set, the default clientId will be used.
  • Error: SetError can be used to set an error when executing a command or query.
  • ExecutedAt: SetExecutedAt can be used to set the execution time for a command or query.
  • Trace: AddTrace can be used to add tracing support to the response.

Once all the necessary attributes are set, call the Send method to send the response.

Example:

resp := &Response{}
resp.SetRequestId("12345")
resp.SetResponseTo("channel-name")
resp.SetMetadata("metadata")
resp.SetBody([]byte("response-body"))
resp.SetTags(map[string]string{"tag1": "value1", "tag2": "value2"})
resp.SetClientId("client-id")
resp.SetError(errors.New("some error"))
resp.SetExecutedAt(time.Now())
resp.AddTrace("trace-name")

err := resp.Send(ctx)
if err != nil {
  log.Println("Failed to send response:", err)
}

func (*QueriesClient) Send added in v1.5.0

func (q *QueriesClient) Send(ctx context.Context, request *Query) (*QueryResponse, error)

Send sends a query request using the provided context. It checks if the client is ready, sets the transport from the client, and calls the Send method on the client with the request. It returns the query response and an error, if any.

The following fields in the request are required: - transport (set from the client)

Example usage:

request := &Query{
    Id:        "123",
    Channel:   "channel1",
    Metadata:  "metadata",
    Body:      []byte("query body"),
    Timeout:   time.Second,
    ClientId:  "client1",
    CacheKey:  "cacheKey",
    CacheTTL:  time.Minute,
    Tags:      map[string]string{"tag1": "value1"},
}
response, err := client.Send(ctx, request)

func (*QueriesClient) Subscribe added in v1.5.0

func (q *QueriesClient) Subscribe(ctx context.Context, request *QueriesSubscription, onQueryReceive func(query *QueryReceive, err error)) error

Subscribe is a method of QueriesClient that allows a client to subscribe to queries. It takes a context, a QueriesSubscription request, and a callback function onQueryReceive. The context is used for cancellation, timing out, and passing values between middleware. The QueriesSubscription defines the channel, group, and clientId for the subscription. The onQueryReceive callback function will be called when a query is received or an error occurs. The method returns an error if the client is not ready, if the onQueryReceive function is nil, or if the QueriesSubscription request is invalid. The method creates a channel for receiving errors, subscribes to queries with the given request, and starts a goroutine that continuously listens for new queries or errors on the channel. When a query is received, it is passed to the onQueryReceive function with a nil error. When an error is received, it is passed to the onQueryReceive function with a nil query. If the context is canceled, the goroutine returns. The method returns with nil if the subscription is successfully set up.

type QueriesSubscription added in v1.5.0

type QueriesSubscription struct {
	Channel  string
	Group    string
	ClientId string
}

QueriesSubscription represents a subscription to queries requests by channel and group

func (*QueriesSubscription) Complete added in v1.5.0

func (qs *QueriesSubscription) Complete(opts *Options) *QueriesSubscription

Complete updates the ClientId of the QueriesSubscription if it is empty, using the clientId value from the provided Options. It returns a pointer to the modified QueriesSubscription.

func (*QueriesSubscription) Validate added in v1.5.0

func (qs *QueriesSubscription) Validate() error

Validate checks if a queries subscription is valid. It returns an error if any of the required fields are empty.

type Query

type Query struct {
	Id       string
	Channel  string
	Metadata string
	Body     []byte
	Timeout  time.Duration
	ClientId string
	CacheKey string
	CacheTTL time.Duration
	Tags     map[string]string
	// contains filtered or unexported fields
}

func NewQuery added in v1.4.0

func NewQuery() *Query

func (*Query) AddTag added in v1.2.0

func (q *Query) AddTag(key, value string) *Query

AddTag - add key value tags to query message

func (*Query) AddTrace added in v1.2.0

func (q *Query) AddTrace(name string) *Trace

AddTrace - add tracing support to query

func (*Query) Send

func (q *Query) Send(ctx context.Context) (*QueryResponse, error)

Send - send the query request and wait for a response or timeout

func (*Query) SetBody

func (q *Query) SetBody(body []byte) *Query

SetBody - set query body - mandatory if metadata field is empty

func (*Query) SetCacheKey

func (q *Query) SetCacheKey(cacheKey string) *Query

SetCacheKey - set cache key to retrieve already stored query response, otherwise the response for this query will be stored in cache for future query requests

func (*Query) SetCacheTTL

func (q *Query) SetCacheTTL(ttl time.Duration) *Query

SetCacheTTL - set the cache time to live for this query's cache key, i.e. how long the stored query response can be retrieved; if not set, the default cacheTTL will be used

func (*Query) SetChannel

func (q *Query) SetChannel(channel string) *Query

SetChannel - set query channel - mandatory if default channel was not set

func (*Query) SetClientId

func (q *Query) SetClientId(clientId string) *Query

SetClientId - set query ClientId - mandatory if default client was not set

func (*Query) SetId

func (q *Query) SetId(id string) *Query

SetId - set query requestId, otherwise new random uuid will be set

func (*Query) SetMetadata

func (q *Query) SetMetadata(metadata string) *Query

SetMetadata - set query metadata - mandatory if body field is empty

func (*Query) SetTags added in v1.4.1

func (q *Query) SetTags(tags map[string]string) *Query

SetTags - set key value tags to query message

func (*Query) SetTimeout

func (q *Query) SetTimeout(timeout time.Duration) *Query

SetTimeout - set the timeout for the query response to be returned. If the timeout expires, sending the query will result in an error

type QueryReceive

type QueryReceive struct {
	Id         string
	Channel    string
	ClientId   string
	Metadata   string
	Body       []byte
	ResponseTo string
	Tags       map[string]string
}

type QueryResponse

type QueryResponse struct {
	QueryId          string
	Executed         bool
	ExecutedAt       time.Time
	Metadata         string
	ResponseClientId string
	Body             []byte
	CacheHit         bool
	Error            string
	Tags             map[string]string
}

type QueueInfo added in v1.7.0

type QueueInfo struct {
	Name          string `json:"name"`
	Messages      int64  `json:"messages"`
	Bytes         int64  `json:"bytes"`
	FirstSequence int64  `json:"first_sequence"`
	LastSequence  int64  `json:"last_sequence"`
	Sent          int64  `json:"sent"`
	Subscribers   int    `json:"subscribers"`
	Waiting       int64  `json:"waiting"`
	Delivered     int64  `json:"delivered"`
}

type QueueMessage added in v1.2.0

type QueueMessage struct {
	*pb.QueueMessage
	// contains filtered or unexported fields
}

func NewQueueMessage added in v1.4.0

func NewQueueMessage() *QueueMessage

func (*QueueMessage) Ack added in v1.2.0

func (qm *QueueMessage) Ack() error

Ack - send an ack for the queue message in stream queue message mode

func (*QueueMessage) AddTag added in v1.2.0

func (qm *QueueMessage) AddTag(key, value string) *QueueMessage

AddTag - add key value tags to queue message

func (*QueueMessage) AddTrace added in v1.2.0

func (qm *QueueMessage) AddTrace(name string) *Trace

AddTrace - add tracing support to queue message

func (*QueueMessage) ExtendVisibility added in v1.2.0

func (qm *QueueMessage) ExtendVisibility(value int32) error

ExtendVisibility - extend the visibility time for the current receive message

func (*QueueMessage) Reject added in v1.2.0

func (qm *QueueMessage) Reject() error

Reject - send a reject for the queue message in stream queue message mode

func (*QueueMessage) Resend added in v1.2.0

func (qm *QueueMessage) Resend(channel string) error

Resend - resend the current queue message to the given channel

func (*QueueMessage) Send added in v1.2.0

Send - send the queue message request and wait for a response or timeout

func (*QueueMessage) SetBody added in v1.2.0

func (qm *QueueMessage) SetBody(body []byte) *QueueMessage

SetBody - set queue message body - mandatory if metadata field is empty

func (*QueueMessage) SetChannel added in v1.2.0

func (qm *QueueMessage) SetChannel(channel string) *QueueMessage

SetChannel - set queue message channel - mandatory if default channel was not set

func (*QueueMessage) SetClientId added in v1.2.0

func (qm *QueueMessage) SetClientId(clientId string) *QueueMessage

SetClientId - set queue message ClientId - mandatory if default client was not set

func (*QueueMessage) SetId added in v1.2.0

func (qm *QueueMessage) SetId(id string) *QueueMessage

SetId - set queue message id, otherwise new random uuid will be set

func (*QueueMessage) SetMetadata added in v1.2.0

func (qm *QueueMessage) SetMetadata(metadata string) *QueueMessage

SetMetadata - set queue message metadata - mandatory if body field is empty

func (*QueueMessage) SetPolicyDelaySeconds added in v1.2.0

func (qm *QueueMessage) SetPolicyDelaySeconds(sec int) *QueueMessage

SetPolicyDelaySeconds - set queue message delivery delay in seconds; 0 means immediate delivery

func (*QueueMessage) SetPolicyExpirationSeconds added in v1.2.0

func (qm *QueueMessage) SetPolicyExpirationSeconds(sec int) *QueueMessage

SetPolicyExpirationSeconds - set queue message expiration in seconds; 0 means the message never expires

func (*QueueMessage) SetPolicyMaxReceiveCount added in v1.2.0

func (qm *QueueMessage) SetPolicyMaxReceiveCount(max int) *QueueMessage

SetPolicyMaxReceiveCount - set the maximum number of delivery attempts before the message is discarded or re-routed to a new queue

func (*QueueMessage) SetPolicyMaxReceiveQueue added in v1.2.0

func (qm *QueueMessage) SetPolicyMaxReceiveQueue(channel string) *QueueMessage

SetPolicyMaxReceiveQueue - set queue name to be routed once MaxReceiveCount is triggered, empty will discard the message

func (*QueueMessage) SetTags added in v1.4.1

func (qm *QueueMessage) SetTags(tags map[string]string) *QueueMessage

SetTags - set key value tags to queue message

type QueueMessageAttributes added in v1.2.0

type QueueMessageAttributes struct {
	Timestamp         int64
	Sequence          uint64
	MD5OfBody         string
	ReceiveCount      int32
	ReRouted          bool
	ReRoutedFromQueue string
	ExpirationAt      int64
	DelayedTo         int64
}

type QueueMessagePolicy added in v1.2.0

type QueueMessagePolicy struct {
	ExpirationSeconds int32
	DelaySeconds      int32
	MaxReceiveCount   int32
	MaxReceiveQueue   string
}

type QueueMessages added in v1.2.0

type QueueMessages struct {
	Messages []*QueueMessage
	// contains filtered or unexported fields
}

func (*QueueMessages) Add added in v1.2.0

func (qma *QueueMessages) Add(msg *QueueMessage) *QueueMessages

Add - adding new queue message to array of messages

func (*QueueMessages) Send added in v1.2.0

Send - send the queue messages array request and wait for a response or timeout

type QueueTransactionMessageRequest added in v1.5.0

type QueueTransactionMessageRequest struct {
	RequestID         string
	ClientID          string
	Channel           string
	VisibilitySeconds int32
	WaitTimeSeconds   int32
}

QueueTransactionMessageRequest represents a request to receive a message from a queue as part of a transaction. It contains the following fields: - RequestID: The ID of the request. - ClientID: The ID of the client. - Channel: The queue channel to receive the message from. - VisibilitySeconds: The number of seconds for which the message will be hidden from other clients. - WaitTimeSeconds: The number of seconds to wait for a message to be received from the queue.

It has the following methods: - SetId: Set the request ID. - SetClientId: Set the client ID. - SetChannel: Set the channel. - SetWaitTimeSeconds: Set the wait time in seconds. - SetVisibilitySeconds: Set the visibility time in seconds. - Complete: Complete the request using the specified options. - Validate: Validate that the request is valid.

Usage example:

req := NewQueueTransactionMessageRequest().
	SetId("123").
	SetClientId("456").
	SetChannel("channel").
	SetWaitTimeSeconds(60).
	SetVisibilitySeconds(30)

err := req.Validate()
if err != nil {
	fmt.Println(err)
	return
}

client := NewClient()
resp, err := client.Transaction(ctx, req)
if err != nil {
	fmt.Println(err)
	return
}

fmt.Println(resp)

func NewQueueTransactionMessageRequest added in v1.5.0

func NewQueueTransactionMessageRequest() *QueueTransactionMessageRequest

NewQueueTransactionMessageRequest - create a new instance of QueueTransactionMessageRequest

func (*QueueTransactionMessageRequest) Complete added in v1.5.0

Complete sets the ClientID field of the QueueTransactionMessageRequest to the value of opts.clientId if the ClientID field is empty. It returns the modified QueueTransactionMessageRequest.

func (*QueueTransactionMessageRequest) SetChannel added in v1.5.0

SetChannel - set receive queue transaction message request channel - mandatory if default channel was not set

func (*QueueTransactionMessageRequest) SetClientId added in v1.5.0

SetClientId - set receive queue transaction message request ClientId - mandatory if default client was not set

func (*QueueTransactionMessageRequest) SetId added in v1.5.0

SetId - set receive queue transaction message request id, otherwise new random uuid will be set

func (*QueueTransactionMessageRequest) SetVisibilitySeconds added in v1.5.0

func (req *QueueTransactionMessageRequest) SetVisibilitySeconds(visibility int) *QueueTransactionMessageRequest

SetVisibilitySeconds - set receive queue transaction message visibility seconds for hiding message from other clients during processing. It takes an integer argument 'visibility' as the number of seconds. It sets the visibility seconds of the request to the given value. The updated QueueTransactionMessageRequest is returned.

func (*QueueTransactionMessageRequest) SetWaitTimeSeconds added in v1.5.0

func (req *QueueTransactionMessageRequest) SetWaitTimeSeconds(wait int) *QueueTransactionMessageRequest

SetWaitTimeSeconds - set receive queue transaction message request wait timeout for receiving queue message.

func (*QueueTransactionMessageRequest) Validate added in v1.5.0

func (req *QueueTransactionMessageRequest) Validate() error

Validate checks if the QueueTransactionMessageRequest is valid. It returns an error if any of the mandatory fields are empty or if any of the numeric fields are less than or equal to zero.

type QueueTransactionMessageResponse added in v1.5.0

type QueueTransactionMessageResponse struct {
	Message *QueueMessage
	// contains filtered or unexported fields
}

QueueTransactionMessageResponse represents the response returned from `Transaction` method of QueuesClient. It contains the client instance, the stream used for communication, and the Message received. It can be used to interact with the response of a transaction operation on a queue message.

func (*QueueTransactionMessageResponse) Ack added in v1.5.0

Ack sends an acknowledgment for the received message by sending the ACK request to the server. It returns an error if there is an issue with sending the ACK request or closing the stream connection.

func (*QueueTransactionMessageResponse) ExtendVisibilitySeconds added in v1.5.0

func (qt *QueueTransactionMessageResponse) ExtendVisibilitySeconds(value int) error

func (*QueueTransactionMessageResponse) Reject added in v1.5.0

Reject - rejects the queue transaction message by sending a StreamQueueMessagesRequest with StreamRequestType_RejectMessage to the Kubemq server. If an error occurs during the transaction, it will be returned. Otherwise, it will close the stream connection by calling CloseSend() on the QueueTransactionMessageResponse stream.

func (*QueueTransactionMessageResponse) Resend added in v1.5.0

func (qt *QueueTransactionMessageResponse) Resend(channel string) error

func (*QueueTransactionMessageResponse) ResendNewMessage added in v1.5.0

func (qt *QueueTransactionMessageResponse) ResendNewMessage(msg *QueueMessage) error

ResendNewMessage - resends a modified message with a new QueueMessage to the same channel as the original message.

type QueuesClient added in v1.5.0

type QueuesClient struct {
	// contains filtered or unexported fields
}

QueuesClient is a client for managing queues in a messaging system.

func NewQueuesClient added in v1.5.0

func NewQueuesClient(ctx context.Context, op ...Option) (*QueuesClient, error)

NewQueuesClient - create a new client for interacting with KubeMQ Queues. Parameters:

  • ctx: the context.Context used for the client
  • op...: options to configure the client

Returns:

  • *QueuesClient: the created Queues client instance
  • error: any error that occurred during the creation of the client

func NewQueuesStreamClient added in v1.7.1

func NewQueuesStreamClient(ctx context.Context, op ...Option) (*QueuesClient, error)

func (*QueuesClient) AckAll added in v1.5.0

func (*QueuesClient) Batch added in v1.5.0

func (q *QueuesClient) Batch(ctx context.Context, messages []*QueueMessage) ([]*SendQueueMessageResult, error)

Batch sends a batch of queue messages in a single request. It creates a new QueueMessages request with the given messages and sets the transport to the client's transport. It then calls the Send method of the QueueMessages request with the provided context and returns the result.

func (*QueuesClient) Close added in v1.5.0

func (q *QueuesClient) Close() error

func (*QueuesClient) Create added in v1.8.0

func (q *QueuesClient) Create(ctx context.Context, channel string) error

Create - creates a new channel with the specified name in the 'queues' category. The channelType is set to "queues". It sends a create channel request to the KubeMQ server using the provided client, clientId, channel, and channelType. It returns an error if the request fails or if an error occurs during channel creation. Example:

err := queuesClient.Create(ctx, "queues.A")
if err != nil {
   log.Fatal(err)
}

It is recommended to defer the closing of the queues client using `defer queuesClient.Close()`

func (*QueuesClient) Delete added in v1.8.0

func (q *QueuesClient) Delete(ctx context.Context, channel string) error

Delete deletes a channel by sending a delete channel request to the KubeMQ server. It takes a context and a channel as parameters, where the channel is the name of the channel to be deleted. It returns an error if there was a problem sending the delete channel request or if there was an error deleting the channel. Example usage:

if err := queuesClient.Delete(ctx, "queues.A"); err != nil {
    log.Fatal(err)
}

func (*QueuesClient) List added in v1.8.0

func (q *QueuesClient) List(ctx context.Context, search string) ([]*common.QueuesChannel, error)

func (*QueuesClient) Peek added in v1.5.0

Peek - retrieve messages from the queue without removing them, using the provided ReceiveQueueMessagesRequest.

func (*QueuesClient) Pull added in v1.5.0

Pull - Pulls messages from a queue using the provided request parameters. The request parameters include the channel, maximum number of messages to receive, wait time, and whether to peek at or dequeue from the queue. The method sets the transport field of the request to the QueuesClient's transport, sets the IsPeak field of the request to false, completes and validates the request, and sends the request using the provided context. It returns the response received from the request or an error if any.

func (*QueuesClient) QueuesInfo added in v1.7.0

func (q *QueuesClient) QueuesInfo(ctx context.Context, filter string) (*QueuesInfo, error)

func (*QueuesClient) Send added in v1.5.0

Send sends a queue message using the specified context and message. The transport field of the message is set to the transport of the QueuesClient. Returns the result of the SendQueueMessage method of the client, with the provided context.

ctx - The context to use for the send operation. message - The queue message to send.

func (*QueuesClient) Subscribe added in v1.5.0

func (q *QueuesClient) Subscribe(ctx context.Context, request *ReceiveQueueMessagesRequest, onReceive func(response *ReceiveQueueMessagesResponse, err error)) (chan struct{}, error)

Subscribe receives queue messages and executes the provided callback function onReceive when messages are received. It takes a context.Context object ctx, a *ReceiveQueueMessagesRequest object request, and a callback function onReceive that accepts a *ReceiveQueueMessagesResponse object and an error object. It returns a channel of type struct{} and an error. The channel can be used to signal the completion of the subscription and stop the subscription process. The function spawns a goroutine that continuously sends requests to receive queue messages until the subscription is stopped or the context is canceled. When a response is received, the onReceive callback is executed with the response and any error as arguments. If an error occurs during the request or response processing, the onReceive callback is executed with a nil response and the corresponding error.

func (*QueuesClient) Transaction added in v1.5.0

Transaction - receives a single message from a queue using transactional processing. The method sends a request to the server through a gRPC stream. If the gRPC raw client fails to connect, an error is returned. If the request fails to complete or validate, an error is returned. If the gRPC client fails to stream the queue message, an error is returned. If the request has an empty ClientID, it is assigned the default client ID from the client options. The request is sent to the gRPC stream using a StreamQueueMessagesRequest. If an error occurs while sending the request, an error is returned. The response is received from the gRPC stream using a ReceiveQueueMessagesResponse. If an error occurs while receiving the response, an error is returned. If the response has an error, an error is returned. If the response message is not in error, a QueueTransactionMessageResponse is returned, including the client, stream, and the QueueMessage. The method returns a pointer to the QueueTransactionMessageResponse and an error.

func (*QueuesClient) TransactionStream added in v1.5.0

func (q *QueuesClient) TransactionStream(ctx context.Context, request *QueueTransactionMessageRequest, onReceive func(response *QueueTransactionMessageResponse, err error)) (chan struct{}, error)

TransactionStream - opens a transaction stream for receiving queue transaction messages. The provided `onReceive` callback function will be called for each received queue transaction message in the stream. The function should have the following signature:

func(response *QueueTransactionMessageResponse, err error)

The `response` parameter will contain the received queue transaction message, or nil if an error occurred. The `err` parameter will contain any error that occurred during the receiving process. If the `onReceive` callback function is nil, an error will be returned. The `request` parameter represents the queue transaction message request. The request must be completed and validated before calling this method. The function returns a `done` channel that can be used to stop the transaction stream, and an error if one occurred.

type QueuesInfo added in v1.7.0

type QueuesInfo struct {
	TotalQueues int          `json:"total_queues"`
	Sent        int64        `json:"sent"`
	Waiting     int64        `json:"waiting"`
	Delivered   int64        `json:"delivered"`
	Queues      []*QueueInfo `json:"queues"`
}

type ReceiveQueueMessagesRequest added in v1.2.0

type ReceiveQueueMessagesRequest struct {
	RequestID           string
	ClientID            string
	Channel             string
	MaxNumberOfMessages int32
	WaitTimeSeconds     int32
	IsPeak              bool
	// contains filtered or unexported fields
}

func NewReceiveQueueMessagesRequest added in v1.5.0

func NewReceiveQueueMessagesRequest() *ReceiveQueueMessagesRequest

func (*ReceiveQueueMessagesRequest) AddTrace added in v1.2.0

func (req *ReceiveQueueMessagesRequest) AddTrace(name string) *Trace

AddTrace - add tracing support to receive queue message request

func (*ReceiveQueueMessagesRequest) Complete added in v1.5.0

func (*ReceiveQueueMessagesRequest) Send added in v1.2.0

Send - send the receive queue messages request and wait for a response or timeout

func (*ReceiveQueueMessagesRequest) SetChannel added in v1.2.0

SetChannel - set receive queue message request channel - mandatory if default channel was not set

func (*ReceiveQueueMessagesRequest) SetClientId added in v1.2.0

SetClientId - set receive queue message request ClientId - mandatory if default client was not set

func (*ReceiveQueueMessagesRequest) SetId added in v1.2.0

SetId - set receive queue message request id, otherwise new random uuid will be set

func (*ReceiveQueueMessagesRequest) SetIsPeak added in v1.2.0

SetIsPeak - set receive queue message request type: true - peek at the queue without actually dequeuing, false - dequeue from the queue

func (*ReceiveQueueMessagesRequest) SetMaxNumberOfMessages added in v1.2.0

func (req *ReceiveQueueMessagesRequest) SetMaxNumberOfMessages(max int) *ReceiveQueueMessagesRequest

SetMaxNumberOfMessages - set receive queue message request max number of messages to receive in single call

func (*ReceiveQueueMessagesRequest) SetWaitTimeSeconds added in v1.2.0

func (req *ReceiveQueueMessagesRequest) SetWaitTimeSeconds(wait int) *ReceiveQueueMessagesRequest

SetWaitTimeSeconds - set receive queue message request wait timeout for receiving all requested messages

func (*ReceiveQueueMessagesRequest) Validate added in v1.5.0

func (req *ReceiveQueueMessagesRequest) Validate() error

type ReceiveQueueMessagesResponse added in v1.2.0

type ReceiveQueueMessagesResponse struct {
	RequestID        string
	Messages         []*QueueMessage
	MessagesReceived int32
	MessagesExpired  int32
	IsPeak           bool
	IsError          bool
	Error            string
}

type Response

type Response struct {
	RequestId  string
	ResponseTo string
	Metadata   string
	Body       []byte
	ClientId   string
	ExecutedAt time.Time
	Err        error
	Tags       map[string]string
	// contains filtered or unexported fields
}

func NewResponse added in v1.4.0

func NewResponse() *Response

func (*Response) AddTrace added in v1.2.0

func (r *Response) AddTrace(name string) *Trace

AddTrace - add tracing support to response

func (*Response) Send

func (r *Response) Send(ctx context.Context) error

Send - sending response to command or query request

func (*Response) SetBody

func (r *Response) SetBody(body []byte) *Response

SetBody - set body response, for query only

func (*Response) SetClientId

func (r *Response) SetClientId(clientId string) *Response

SetClientId - set clientId response, if not set default clientId will be used

func (*Response) SetError

func (r *Response) SetError(err error) *Response

SetError - set query or command execution error

func (*Response) SetExecutedAt

func (r *Response) SetExecutedAt(executedAt time.Time) *Response

SetExecutedAt - set query or command execution time

func (*Response) SetMetadata

func (r *Response) SetMetadata(metadata string) *Response

SetMetadata - set metadata response, for query only

func (*Response) SetRequestId

func (r *Response) SetRequestId(id string) *Response

SetRequestId - set the requestId this response corresponds to - mandatory

func (*Response) SetResponseTo

func (r *Response) SetResponseTo(channel string) *Response

SetResponseTo - set response channel as received in CommandReceived or QueryReceived object - mandatory

func (*Response) SetTags added in v1.2.0

func (r *Response) SetTags(tags map[string]string) *Response

SetTags - set response tags

type SendQueueMessageResult added in v1.2.0

type SendQueueMessageResult struct {
	MessageID    string
	SentAt       int64
	ExpirationAt int64
	DelayedTo    int64
	IsError      bool
	Error        string
}

type ServerInfo added in v1.2.0

type ServerInfo struct {
	Host                string
	Version             string
	ServerStartTime     int64
	ServerUpTimeSeconds int64
}

type StreamQueueMessage added in v1.2.0

type StreamQueueMessage struct {
	RequestID string
	ClientID  string
	Channel   string
	// contains filtered or unexported fields
}

func (*StreamQueueMessage) AddTrace added in v1.2.0

func (req *StreamQueueMessage) AddTrace(name string) *Trace

AddTrace - add tracing support to stream receive queue message request

func (*StreamQueueMessage) Close added in v1.2.0

func (req *StreamQueueMessage) Close()

Close - end stream of queue messages and cancel all pending operations

func (*StreamQueueMessage) Next added in v1.2.0

func (req *StreamQueueMessage) Next(ctx context.Context, visibility, wait int32) (*QueueMessage, error)

Next - receive queue messages request, waiting for response or timeout

func (*StreamQueueMessage) ResendWithNewMessage added in v1.2.0

func (req *StreamQueueMessage) ResendWithNewMessage(msg *QueueMessage) error

ResendWithNewMessage - resend the current received message to a new channel

func (*StreamQueueMessage) SetChannel added in v1.2.0

func (req *StreamQueueMessage) SetChannel(channel string) *StreamQueueMessage

SetChannel - set stream queue message request channel - mandatory if default channel was not set

func (*StreamQueueMessage) SetClientId added in v1.2.0

func (req *StreamQueueMessage) SetClientId(clientId string) *StreamQueueMessage

SetClientId - set stream queue message request ClientId - mandatory if default client was not set

func (*StreamQueueMessage) SetId added in v1.2.0

SetId - set stream queue message request id, otherwise new random uuid will be set

type SubscriptionOption

type SubscriptionOption interface {
	// contains filtered or unexported methods
}

func StartFromFirstEvent

func StartFromFirstEvent() SubscriptionOption

StartFromFirstEvent - replay all the stored events from the first available sequence and continue streaming new events from this point

func StartFromLastEvent

func StartFromLastEvent() SubscriptionOption

StartFromLastEvent - replay the last event and continue streaming new events from this point

func StartFromNewEvents

func StartFromNewEvents() SubscriptionOption

StartFromNewEvents - start event store subscription with only new events

func StartFromSequence

func StartFromSequence(sequence int) SubscriptionOption

StartFromSequence - replay events from a specific event sequence number and continue streaming new events from this point

func StartFromTime

func StartFromTime(since time.Time) SubscriptionOption

StartFromTime - replay events from a specific time and continue streaming new events from this point

func StartFromTimeDelta

func StartFromTimeDelta(delta time.Duration) SubscriptionOption

StartFromTimeDelta - replay events starting from (current time - delta), where delta is a duration in seconds, and continue streaming new events from this point

type Trace added in v1.2.0

type Trace struct {
	Name string
	// contains filtered or unexported fields
}

func CreateTrace added in v1.2.0

func CreateTrace(name string) *Trace

func (*Trace) AddAnnotation added in v1.2.0

func (t *Trace) AddAnnotation(timestamp time.Time, message string) *Trace

func (*Trace) AddBoolAttribute added in v1.2.0

func (t *Trace) AddBoolAttribute(key string, value bool) *Trace

func (*Trace) AddInt64Attribute added in v1.2.0

func (t *Trace) AddInt64Attribute(key string, value int64) *Trace

func (*Trace) AddStringAttribute added in v1.2.0

func (t *Trace) AddStringAttribute(key string, value string) *Trace

type Transport

type Transport interface {
	Ping(ctx context.Context) (*ServerInfo, error)
	SendEvent(ctx context.Context, event *Event) error
	StreamEvents(ctx context.Context, eventsCh chan *Event, errCh chan error)
	SubscribeToEvents(ctx context.Context, request *EventsSubscription, errCh chan error) (<-chan *Event, error)
	SendEventStore(ctx context.Context, eventStore *EventStore) (*EventStoreResult, error)
	StreamEventsStore(ctx context.Context, eventsCh chan *EventStore, eventsResultCh chan *EventStoreResult, errCh chan error)
	SubscribeToEventsStore(ctx context.Context, request *EventsStoreSubscription, errCh chan error) (<-chan *EventStoreReceive, error)
	SendCommand(ctx context.Context, command *Command) (*CommandResponse, error)
	SubscribeToCommands(ctx context.Context, request *CommandsSubscription, errCh chan error) (<-chan *CommandReceive, error)
	SendQuery(ctx context.Context, query *Query) (*QueryResponse, error)
	SubscribeToQueries(ctx context.Context, request *QueriesSubscription, errCh chan error) (<-chan *QueryReceive, error)
	SendResponse(ctx context.Context, response *Response) error
	SendQueueMessage(ctx context.Context, msg *QueueMessage) (*SendQueueMessageResult, error)
	SendQueueMessages(ctx context.Context, msg []*QueueMessage) ([]*SendQueueMessageResult, error)
	ReceiveQueueMessages(ctx context.Context, req *ReceiveQueueMessagesRequest) (*ReceiveQueueMessagesResponse, error)
	AckAllQueueMessages(ctx context.Context, req *AckAllQueueMessagesRequest) (*AckAllQueueMessagesResponse, error)
	StreamQueueMessage(ctx context.Context, reqCh chan *pb.StreamQueueMessagesRequest, resCh chan *pb.StreamQueueMessagesResponse, errCh chan error, doneCh chan bool)
	QueuesInfo(ctx context.Context, filter string) (*QueuesInfo, error)
	GetGRPCRawClient() (pb.KubemqClient, error)
	Close() error
}

type TransportType

type TransportType int
const (
	TransportTypeGRPC TransportType = iota
	TransportTypeRest
)

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL