kubemq

package module
v1.7.8 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Nov 10, 2022 License: Apache-2.0 Imports: 19 Imported by: 0

README

KubeMQ Go SDK

KubeMQ is an enterprise-grade message queue and broker for containers, designed for any workload and architecture running in Kubernetes. This library is the Go implementation of the KubeMQ client connection.

Install KubeMQ Community Edition

Please visit KubeMQ Community for installation steps.

Install KubeMQ Go SDK

go get github.com/kubemq-io/kubemq-go

Learn KubeMQ

Visit our Extensive KubeMQ Documentation.

Examples - Cookbook Recipes

Please visit our cookbook repository

Support

If you encounter any issues, please open an issue here. In addition, you can reach us for support by:

Documentation

Index

Constants

This section is empty.

Variables

View Source
var (
	ErrNoTransportDefined    = errors.New("no transport layer defined, create object with client instance")
	ErrNoTransportConnection = errors.New("no transport layer established, aborting")
)

Functions

This section is empty.

Types

type AckAllQueueMessagesRequest

type AckAllQueueMessagesRequest struct {
	RequestID       string
	ClientID        string
	Channel         string
	WaitTimeSeconds int32
	// contains filtered or unexported fields
}

func (*AckAllQueueMessagesRequest) AddTrace

func (req *AckAllQueueMessagesRequest) AddTrace(name string) *Trace

AddTrace - add tracing support to ack all receive queue message request

func (*AckAllQueueMessagesRequest) Complete

func (*AckAllQueueMessagesRequest) Send

Send - sending ack all queue messages request, waiting for response or timeout

func (*AckAllQueueMessagesRequest) SetChannel

SetChannel - set ack all queue message request channel - mandatory if default channel was not set

func (*AckAllQueueMessagesRequest) SetClientId

func (req *AckAllQueueMessagesRequest) SetClientId(clientId string) *AckAllQueueMessagesRequest

SetClientId - set ack all queue message request ClientId - mandatory if default client was not set

func (*AckAllQueueMessagesRequest) SetId

SetId - set ack all queue message request id, otherwise new random uuid will be set

func (*AckAllQueueMessagesRequest) SetWaitTimeSeconds

func (req *AckAllQueueMessagesRequest) SetWaitTimeSeconds(wait int) *AckAllQueueMessagesRequest

SetWaitTimeSeconds - set ack all queue message request wait timeout

func (*AckAllQueueMessagesRequest) Validate

func (req *AckAllQueueMessagesRequest) Validate() error

type AckAllQueueMessagesResponse

type AckAllQueueMessagesResponse struct {
	RequestID        string
	AffectedMessages uint64
	IsError          bool
	Error            string
}

type Client

type Client struct {
	ServerInfo *ServerInfo
	// contains filtered or unexported fields
}

func NewClient

func NewClient(ctx context.Context, op ...Option) (*Client, error)

NewClient - create a client instance to be used to communicate with the KubeMQ server

func (*Client) AQM

AQM - create an empty ack all receive queue messages request object

func (*Client) AckAllQueueMessages

func (c *Client) AckAllQueueMessages(ctx context.Context, req *AckAllQueueMessagesRequest) (*AckAllQueueMessagesResponse, error)

AckAllQueueMessages - send ack all messages in queue

func (*Client) C

func (c *Client) C() *Command

C - create an empty command object

func (*Client) Close

func (c *Client) Close() error

Close - closing client connection. Any ongoing transactions will be aborted

func (*Client) E

func (c *Client) E() *Event

E - create an empty event object

func (*Client) ES

func (c *Client) ES() *EventStore

ES - create an empty event store object

func (*Client) NewAckAllQueueMessagesRequest

func (c *Client) NewAckAllQueueMessagesRequest() *AckAllQueueMessagesRequest

NewAckAllQueueMessagesRequest - create an empty ack all receive queue messages request object

func (*Client) NewCommand

func (c *Client) NewCommand() *Command

NewCommand - create an empty command

func (*Client) NewEvent

func (c *Client) NewEvent() *Event

NewEvent - create an empty event

func (*Client) NewEventStore

func (c *Client) NewEventStore() *EventStore

NewEventStore - create an empty event store

func (*Client) NewQuery

func (c *Client) NewQuery() *Query

NewQuery - create an empty query

func (*Client) NewQueueMessage

func (c *Client) NewQueueMessage() *QueueMessage

NewQueueMessage - create an empty queue message

func (*Client) NewQueueMessages

func (c *Client) NewQueueMessages() *QueueMessages

NewQueueMessages - create an empty queue messages array

func (*Client) NewReceiveQueueMessagesRequest

func (c *Client) NewReceiveQueueMessagesRequest() *ReceiveQueueMessagesRequest

NewReceiveQueueMessagesRequest - create an empty receive queue message request object

func (*Client) NewResponse

func (c *Client) NewResponse() *Response

NewResponse - create an empty response

func (*Client) NewStreamQueueMessage

func (c *Client) NewStreamQueueMessage() *StreamQueueMessage

NewStreamQueueMessage - create an empty stream receive queue message object

func (*Client) Ping

func (c *Client) Ping(ctx context.Context) (*ServerInfo, error)

Ping - get status of current connection

func (*Client) Q

func (c *Client) Q() *Query

Q - create an empty query object

func (*Client) QM

func (c *Client) QM() *QueueMessage

QM - create an empty queue message object

func (*Client) QMB

func (c *Client) QMB() *QueueMessages

QMB - create an empty queue message array object

func (*Client) QueuesClient

func (c *Client) QueuesClient() *QueuesClient

QueuesClient - create a new QueuesClient

func (*Client) QueuesInfo

func (c *Client) QueuesInfo(ctx context.Context, filter string) (*QueuesInfo, error)

QueuesInfo - get queues detailed information

func (*Client) R

func (c *Client) R() *Response

R - create an empty response object for command or query responses

func (*Client) RQM

RQM - create an empty receive queue message request object

func (*Client) ReceiveQueueMessages

ReceiveQueueMessages - call to receive messages from a queue

func (*Client) SQM

func (c *Client) SQM() *StreamQueueMessage

SQM - create an empty stream receive queue message object

func (*Client) SendQueueMessage

func (c *Client) SendQueueMessage(ctx context.Context, msg *QueueMessage) (*SendQueueMessageResult, error)

SendQueueMessage - send single queue message

func (*Client) SendQueueMessages

func (c *Client) SendQueueMessages(ctx context.Context, msg []*QueueMessage) ([]*SendQueueMessageResult, error)

SendQueueMessages - send multiple queue messages

func (*Client) SetCommand

func (c *Client) SetCommand(cmd *Command) *Command

func (*Client) SetEvent

func (c *Client) SetEvent(e *Event) *Event

func (*Client) SetEventStore

func (c *Client) SetEventStore(es *EventStore) *EventStore

func (*Client) SetQuery

func (c *Client) SetQuery(query *Query) *Query

func (*Client) SetQueueMessage

func (c *Client) SetQueueMessage(qm *QueueMessage) *QueueMessage

func (*Client) SetResponse

func (c *Client) SetResponse(response *Response) *Response

func (*Client) StreamEvents

func (c *Client) StreamEvents(ctx context.Context, eventsCh chan *Event, errCh chan error)

StreamEvents - send stream of events in a single call

func (*Client) StreamEventsStore

func (c *Client) StreamEventsStore(ctx context.Context, eventsCh chan *EventStore, eventsResultCh chan *EventStoreResult, errCh chan error)

StreamEventsStore - send stream of events store in a single call

func (*Client) SubscribeToCommands

func (c *Client) SubscribeToCommands(ctx context.Context, channel, group string, errCh chan error) (<-chan *CommandReceive, error)

SubscribeToCommands - subscribe to commands requests by channel and group. Returns a channel of CommandReceive or an error

func (*Client) SubscribeToCommandsWithRequest

func (c *Client) SubscribeToCommandsWithRequest(ctx context.Context, request *CommandsSubscription, errCh chan error) (<-chan *CommandReceive, error)

SubscribeToCommandsWithRequest - subscribe to commands requests by channel and group, as set in the subscription request. Returns a channel of CommandReceive or an error

func (*Client) SubscribeToEvents

func (c *Client) SubscribeToEvents(ctx context.Context, channel, group string, errCh chan error) (<-chan *Event, error)

SubscribeToEvents - subscribe to events by channel and group. Returns a channel of events or an error

func (*Client) SubscribeToEventsStore

func (c *Client) SubscribeToEventsStore(ctx context.Context, channel, group string, errCh chan error, opt SubscriptionOption) (<-chan *EventStoreReceive, error)

SubscribeToEventsStore - subscribe to events store by channel and group with subscription option. Returns a channel of events or an error

func (*Client) SubscribeToEventsStoreWithRequest

func (c *Client) SubscribeToEventsStoreWithRequest(ctx context.Context, request *EventsStoreSubscription, errCh chan error) (<-chan *EventStoreReceive, error)

SubscribeToEventsStoreWithRequest - subscribe to events store by channel and group with subscription option. Returns a channel of events or an error

func (*Client) SubscribeToEventsWithRequest

func (c *Client) SubscribeToEventsWithRequest(ctx context.Context, request *EventsSubscription, errCh chan error) (<-chan *Event, error)

SubscribeToEventsWithRequest - subscribe to events by channel and group, as set in the subscription request. Returns a channel of events or an error

func (*Client) SubscribeToQueries

func (c *Client) SubscribeToQueries(ctx context.Context, channel, group string, errCh chan error) (<-chan *QueryReceive, error)

SubscribeToQueries - subscribe to queries requests by channel and group. Returns a channel of QueryReceive or an error

func (*Client) SubscribeToQueriesWithRequest

func (c *Client) SubscribeToQueriesWithRequest(ctx context.Context, request *QueriesSubscription, errCh chan error) (<-chan *QueryReceive, error)

SubscribeToQueriesWithRequest - subscribe to queries requests by channel and group, as set in the subscription request. Returns a channel of QueryReceive or an error

type Command

type Command struct {
	Id       string
	Channel  string
	Metadata string
	Body     []byte
	Timeout  time.Duration
	ClientId string
	Tags     map[string]string
	// contains filtered or unexported fields
}

func NewCommand

func NewCommand() *Command

func (*Command) AddTag

func (c *Command) AddTag(key, value string) *Command

AddTag - add key value tags to command message

func (*Command) AddTrace

func (c *Command) AddTrace(name string) *Trace

AddTrace - add tracing support to command

func (*Command) Send

func (c *Command) Send(ctx context.Context) (*CommandResponse, error)

Send - sending command , waiting for response or timeout

func (*Command) SetBody

func (c *Command) SetBody(body []byte) *Command

SetBody - set command body - mandatory if metadata field is empty

func (*Command) SetChannel

func (c *Command) SetChannel(channel string) *Command

SetChannel - set command channel - mandatory if default channel was not set

func (*Command) SetClientId

func (c *Command) SetClientId(clientId string) *Command

SetClientId - set command ClientId - mandatory if default client was not set

func (*Command) SetId

func (c *Command) SetId(id string) *Command

SetId - set command requestId, otherwise new random uuid will be set

func (*Command) SetMetadata

func (c *Command) SetMetadata(metadata string) *Command

SetMetadata - set command metadata - mandatory if body field is empty

func (*Command) SetTags

func (c *Command) SetTags(tags map[string]string) *Command

SetTags - set key value tags to command message

func (*Command) SetTimeout

func (c *Command) SetTimeout(timeout time.Duration) *Command

SetTimeout - set timeout for command to be returned. If the timeout expires, sending the command will result in an error

type CommandReceive

type CommandReceive struct {
	Id         string
	ClientId   string
	Channel    string
	Metadata   string
	Body       []byte
	ResponseTo string
	Tags       map[string]string
}

type CommandResponse

type CommandResponse struct {
	CommandId        string
	ResponseClientId string
	Executed         bool
	ExecutedAt       time.Time
	Error            string
	Tags             map[string]string
}

type CommandsClient

type CommandsClient struct {
	// contains filtered or unexported fields
}

func NewCommandsClient

func NewCommandsClient(ctx context.Context, op ...Option) (*CommandsClient, error)

func (*CommandsClient) Close

func (c *CommandsClient) Close() error

func (*CommandsClient) Response

func (c *CommandsClient) Response(ctx context.Context, response *Response) error

func (*CommandsClient) Send

func (c *CommandsClient) Send(ctx context.Context, request *Command) (*CommandResponse, error)

func (*CommandsClient) Subscribe

func (c *CommandsClient) Subscribe(ctx context.Context, request *CommandsSubscription, onCommandReceive func(cmd *CommandReceive, err error)) error

type CommandsSubscription

type CommandsSubscription struct {
	Channel  string
	Group    string
	ClientId string
}

func (*CommandsSubscription) Complete

func (cs *CommandsSubscription) Complete(opts *Options) *CommandsSubscription

func (*CommandsSubscription) Validate

func (cs *CommandsSubscription) Validate() error

type Event

type Event struct {
	Id       string
	Channel  string
	Metadata string
	Body     []byte
	ClientId string
	Tags     map[string]string
	// contains filtered or unexported fields
}

func NewEvent

func NewEvent() *Event

func (*Event) AddTag

func (e *Event) AddTag(key, value string) *Event

AddTag - add key value tags to event message

func (*Event) Send

func (e *Event) Send(ctx context.Context) error

func (*Event) SetBody

func (e *Event) SetBody(body []byte) *Event

SetBody - set event body - mandatory if metadata field was not set

func (*Event) SetChannel

func (e *Event) SetChannel(channel string) *Event

SetChannel - set event channel - mandatory if default channel was not set

func (*Event) SetClientId

func (e *Event) SetClientId(clientId string) *Event

SetClientId - set event ClientId - mandatory if default client was not set

func (*Event) SetId

func (e *Event) SetId(id string) *Event

SetId - set event id otherwise new random uuid will be set

func (*Event) SetMetadata

func (e *Event) SetMetadata(metadata string) *Event

SetMetadata - set event metadata - mandatory if body field was not set

func (*Event) SetTags

func (e *Event) SetTags(tags map[string]string) *Event

SetTags - set key value tags to event message

type EventStore

type EventStore struct {
	Id       string
	Channel  string
	Metadata string
	Body     []byte
	ClientId string
	Tags     map[string]string
	// contains filtered or unexported fields
}

func NewEventStore

func NewEventStore() *EventStore

func (*EventStore) AddTag

func (es *EventStore) AddTag(key, value string) *EventStore

AddTag - add key value tags to event store message

func (*EventStore) Send

func (es *EventStore) Send(ctx context.Context) (*EventStoreResult, error)

Send - sending events store message

func (*EventStore) SetBody

func (es *EventStore) SetBody(body []byte) *EventStore

SetBody - set event store body - mandatory if metadata field was not set

func (*EventStore) SetChannel

func (es *EventStore) SetChannel(channel string) *EventStore

SetChannel - set event store channel - mandatory if default channel was not set

func (*EventStore) SetClientId

func (es *EventStore) SetClientId(clientId string) *EventStore

SetClientId - set event store ClientId - mandatory if default client was not set

func (*EventStore) SetId

func (es *EventStore) SetId(id string) *EventStore

SetId - set event store id otherwise new random uuid will be set

func (*EventStore) SetMetadata

func (es *EventStore) SetMetadata(metadata string) *EventStore

SetMetadata - set event store metadata - mandatory if body field was not set

func (*EventStore) SetTags

func (es *EventStore) SetTags(tags map[string]string) *EventStore

SetTags - set key value tags to event store message

type EventStoreReceive

type EventStoreReceive struct {
	Id        string
	Sequence  uint64
	Timestamp time.Time
	Channel   string
	Metadata  string
	Body      []byte
	ClientId  string
	Tags      map[string]string
}

type EventStoreResult

type EventStoreResult struct {
	Id   string
	Sent bool
	Err  error
}

type EventsClient

type EventsClient struct {
	// contains filtered or unexported fields
}

func NewEventsClient

func NewEventsClient(ctx context.Context, op ...Option) (*EventsClient, error)

func (*EventsClient) Close

func (e *EventsClient) Close() error

func (*EventsClient) Send

func (e *EventsClient) Send(ctx context.Context, message *Event) error

func (*EventsClient) Stream

func (e *EventsClient) Stream(ctx context.Context, onError func(err error)) (func(msg *Event) error, error)

func (*EventsClient) Subscribe

func (e *EventsClient) Subscribe(ctx context.Context, request *EventsSubscription, onEvent func(msg *Event, err error)) error

type EventsErrorsHandler

type EventsErrorsHandler func(error)

type EventsMessageHandler

type EventsMessageHandler func(*Event)

type EventsStoreClient

type EventsStoreClient struct {
	// contains filtered or unexported fields
}

func NewEventsStoreClient

func NewEventsStoreClient(ctx context.Context, op ...Option) (*EventsStoreClient, error)

func (*EventsStoreClient) Close

func (es *EventsStoreClient) Close() error

func (*EventsStoreClient) Send

func (es *EventsStoreClient) Send(ctx context.Context, message *EventStore) (*EventStoreResult, error)

func (*EventsStoreClient) Stream

func (es *EventsStoreClient) Stream(ctx context.Context, onResult func(result *EventStoreResult, err error)) (func(msg *EventStore) error, error)

func (*EventsStoreClient) Subscribe

func (es *EventsStoreClient) Subscribe(ctx context.Context, request *EventsStoreSubscription, onEvent func(msg *EventStoreReceive, err error)) error

type EventsStoreSubscription

type EventsStoreSubscription struct {
	Channel          string
	Group            string
	ClientId         string
	SubscriptionType SubscriptionOption
}

func (*EventsStoreSubscription) Complete

func (*EventsStoreSubscription) Validate

func (es *EventsStoreSubscription) Validate() error

type EventsSubscription

type EventsSubscription struct {
	Channel  string
	Group    string
	ClientId string
}

func (*EventsSubscription) Complete

func (es *EventsSubscription) Complete(opts *Options) *EventsSubscription

func (*EventsSubscription) Validate

func (es *EventsSubscription) Validate() error

type Option

type Option interface {
	// contains filtered or unexported methods
}

func WithAddress

func WithAddress(host string, port int) Option

WithAddress - set host and port address of KubeMQ server

func WithAuthToken

func WithAuthToken(token string) Option

WithAuthToken - set KubeMQ JWT Auth token to be used for KubeMQ connection

func WithAutoReconnect

func WithAutoReconnect(value bool) Option

WithAutoReconnect - set automatic reconnection in case of lost connectivity to server

func WithCertificate

func WithCertificate(certData, serverOverrideDomain string) Option

WithCertificate - set secured TLS credentials from the input certificate data for client. serverOverrideDomain is for testing only. If set to a non-empty string, it will override the virtual host name of authority (e.g. :authority header field) in requests.

func WithCheckConnection

func WithCheckConnection(value bool) Option

WithCheckConnection - set server connectivity on client create

func WithClientId

func WithClientId(id string) Option

WithClientId - set client id to be used in all functions call with this client - mandatory

func WithCredentials

func WithCredentials(certFile, serverOverrideDomain string) Option

WithCredentials - set secured TLS credentials from the input certificate file for client. serverOverrideDomain is for testing only. If set to a non-empty string, it will override the virtual host name of authority (e.g. :authority header field) in requests.

func WithDefaultCacheTTL

func WithDefaultCacheTTL(ttl time.Duration) Option

WithDefaultCacheTTL - set default cache time to live for any query requests with any CacheKey set value

func WithDefaultChannel

func WithDefaultChannel(channel string) Option

WithDefaultChannel - set default channel for any outbound requests

func WithMaxReconnects

func WithMaxReconnects(value int) Option

WithMaxReconnects - set max reconnects before return error, default 0, never.

func WithReceiveBufferSize

func WithReceiveBufferSize(size int) Option

WithReceiveBufferSize - set length of buffered channel to be set in all subscriptions

func WithReconnectInterval

func WithReconnectInterval(duration time.Duration) Option

WithReconnectInterval - set reconnection interval duration, default is 5 seconds

func WithTransportType

func WithTransportType(transportType TransportType) Option

WithTransportType - set client transport type, currently GRPC or Rest

func WithUri

func WithUri(uri string) Option

WithUri - set uri address of KubeMQ server

type Options

type Options struct {
	// contains filtered or unexported fields
}

func GetDefaultOptions

func GetDefaultOptions() *Options

func (*Options) Validate

func (o *Options) Validate() error

type QueriesClient

type QueriesClient struct {
	// contains filtered or unexported fields
}

func NewQueriesClient

func NewQueriesClient(ctx context.Context, op ...Option) (*QueriesClient, error)

func (*QueriesClient) Close

func (q *QueriesClient) Close() error

func (*QueriesClient) Response

func (q *QueriesClient) Response(ctx context.Context, response *Response) error

func (*QueriesClient) Send

func (q *QueriesClient) Send(ctx context.Context, request *Query) (*QueryResponse, error)

func (*QueriesClient) Subscribe

func (q *QueriesClient) Subscribe(ctx context.Context, request *QueriesSubscription, onQueryReceive func(query *QueryReceive, err error)) error

type QueriesSubscription

type QueriesSubscription struct {
	Channel  string
	Group    string
	ClientId string
}

func (*QueriesSubscription) Complete

func (qs *QueriesSubscription) Complete(opts *Options) *QueriesSubscription

func (*QueriesSubscription) Validate

func (qs *QueriesSubscription) Validate() error

type Query

type Query struct {
	Id       string
	Channel  string
	Metadata string
	Body     []byte
	Timeout  time.Duration
	ClientId string
	CacheKey string
	CacheTTL time.Duration
	Tags     map[string]string
	// contains filtered or unexported fields
}

func NewQuery

func NewQuery() *Query

func (*Query) AddTag

func (q *Query) AddTag(key, value string) *Query

AddTag - add key value tags to query message

func (*Query) AddTrace

func (q *Query) AddTrace(name string) *Trace

AddTrace - add tracing support to query

func (*Query) Send

func (q *Query) Send(ctx context.Context) (*QueryResponse, error)

Send - sending query request , waiting for response or timeout

func (*Query) SetBody

func (q *Query) SetBody(body []byte) *Query

SetBody - set query body - mandatory if metadata field is empty

func (*Query) SetCacheKey

func (q *Query) SetCacheKey(cacheKey string) *Query

SetCacheKey - set cache key to retrieve already stored query response, otherwise the response for this query will be stored in cache for future query requests

func (*Query) SetCacheTTL

func (q *Query) SetCacheTTL(ttl time.Duration) *Query

SetCacheTTL - set the cache time to live for this query's cache key response; if not set, the default cacheTTL will be used

func (*Query) SetChannel

func (q *Query) SetChannel(channel string) *Query

SetChannel - set query channel - mandatory if default channel was not set

func (*Query) SetClientId

func (q *Query) SetClientId(clientId string) *Query

SetClientId - set query ClientId - mandatory if default client was not set

func (*Query) SetId

func (q *Query) SetId(id string) *Query

SetId - set query requestId, otherwise new random uuid will be set

func (*Query) SetMetadata

func (q *Query) SetMetadata(metadata string) *Query

SetMetadata - set query metadata - mandatory if body field is empty

func (*Query) SetTags

func (q *Query) SetTags(tags map[string]string) *Query

SetTags - set key value tags to query message

func (*Query) SetTimeout

func (q *Query) SetTimeout(timeout time.Duration) *Query

SetTimeout - set timeout for query to be returned. If the timeout expires, sending the query will result in an error

type QueryReceive

type QueryReceive struct {
	Id         string
	Channel    string
	ClientId   string
	Metadata   string
	Body       []byte
	ResponseTo string
	Tags       map[string]string
}

type QueryResponse

type QueryResponse struct {
	QueryId          string
	Executed         bool
	ExecutedAt       time.Time
	Metadata         string
	ResponseClientId string
	Body             []byte
	CacheHit         bool
	Error            string
	Tags             map[string]string
}

type QueueInfo

type QueueInfo struct {
	Name          string `json:"name"`
	Messages      int64  `json:"messages"`
	Bytes         int64  `json:"bytes"`
	FirstSequence int64  `json:"first_sequence"`
	LastSequence  int64  `json:"last_sequence"`
	Sent          int64  `json:"sent"`
	Subscribers   int    `json:"subscribers"`
	Waiting       int64  `json:"waiting"`
	Delivered     int64  `json:"delivered"`
}

type QueueMessage

type QueueMessage struct {
	*pb.QueueMessage
	// contains filtered or unexported fields
}

func NewQueueMessage

func NewQueueMessage() *QueueMessage

func (*QueueMessage) Ack

func (qm *QueueMessage) Ack() error

Ack - sending ack queue message in stream queue message mode

func (*QueueMessage) AddTag

func (qm *QueueMessage) AddTag(key, value string) *QueueMessage

AddTag - add key value tags to queue message

func (*QueueMessage) AddTrace

func (qm *QueueMessage) AddTrace(name string) *Trace

AddTrace - add tracing support to queue message

func (*QueueMessage) ExtendVisibility

func (qm *QueueMessage) ExtendVisibility(value int32) error

ExtendVisibility - extend the visibility time for the current receive message

func (*QueueMessage) Reject

func (qm *QueueMessage) Reject() error

Reject - sending reject queue message in stream queue message mode

func (*QueueMessage) Resend

func (qm *QueueMessage) Resend(channel string) error

Resend - sending resend

func (*QueueMessage) Send

Send - sending queue message request , waiting for response or timeout

func (*QueueMessage) SetBody

func (qm *QueueMessage) SetBody(body []byte) *QueueMessage

SetBody - set queue message body - mandatory if metadata field is empty

func (*QueueMessage) SetChannel

func (qm *QueueMessage) SetChannel(channel string) *QueueMessage

SetChannel - set queue message channel - mandatory if default channel was not set

func (*QueueMessage) SetClientId

func (qm *QueueMessage) SetClientId(clientId string) *QueueMessage

SetClientId - set queue message ClientId - mandatory if default client was not set

func (*QueueMessage) SetId

func (qm *QueueMessage) SetId(id string) *QueueMessage

SetId - set queue message id, otherwise new random uuid will be set

func (*QueueMessage) SetMetadata

func (qm *QueueMessage) SetMetadata(metadata string) *QueueMessage

SetMetadata - set queue message metadata - mandatory if body field is empty

func (*QueueMessage) SetPolicyDelaySeconds

func (qm *QueueMessage) SetPolicyDelaySeconds(sec int) *QueueMessage

SetPolicyDelaySeconds - set queue message delivery delay in seconds, 0 , immediate delivery

func (*QueueMessage) SetPolicyExpirationSeconds

func (qm *QueueMessage) SetPolicyExpirationSeconds(sec int) *QueueMessage

SetPolicyExpirationSeconds - set queue message expiration seconds, 0 never expires

func (*QueueMessage) SetPolicyMaxReceiveCount

func (qm *QueueMessage) SetPolicyMaxReceiveCount(max int) *QueueMessage

SetPolicyMaxReceiveCount - set max delivery attempts before the message is discarded or re-routed to a new queue

func (*QueueMessage) SetPolicyMaxReceiveQueue

func (qm *QueueMessage) SetPolicyMaxReceiveQueue(channel string) *QueueMessage

SetPolicyMaxReceiveQueue - set queue name to be routed once MaxReceiveCount is triggered, empty will discard the message

func (*QueueMessage) SetTags

func (qm *QueueMessage) SetTags(tags map[string]string) *QueueMessage

SetTags - set key value tags to queue message

type QueueMessageAttributes

type QueueMessageAttributes struct {
	Timestamp         int64
	Sequence          uint64
	MD5OfBody         string
	ReceiveCount      int32
	ReRouted          bool
	ReRoutedFromQueue string
	ExpirationAt      int64
	DelayedTo         int64
}

type QueueMessagePolicy

type QueueMessagePolicy struct {
	ExpirationSeconds int32
	DelaySeconds      int32
	MaxReceiveCount   int32
	MaxReceiveQueue   string
}

type QueueMessages

type QueueMessages struct {
	Messages []*QueueMessage
	// contains filtered or unexported fields
}

func (*QueueMessages) Add

func (qma *QueueMessages) Add(msg *QueueMessage) *QueueMessages

Add - adding new queue message to array of messages

func (*QueueMessages) Send

Send - sending queue messages array request , waiting for response or timeout

type QueueTransactionMessageRequest

type QueueTransactionMessageRequest struct {
	RequestID         string
	ClientID          string
	Channel           string
	VisibilitySeconds int32
	WaitTimeSeconds   int32
}

func NewQueueTransactionMessageRequest

func NewQueueTransactionMessageRequest() *QueueTransactionMessageRequest

func (*QueueTransactionMessageRequest) Complete

func (*QueueTransactionMessageRequest) SetChannel

SetChannel - set receive queue transaction message request channel - mandatory if default channel was not set

func (*QueueTransactionMessageRequest) SetClientId

SetClientId - set receive queue transaction message request ClientId - mandatory if default client was not set

func (*QueueTransactionMessageRequest) SetId

SetId - set receive queue transaction message request id, otherwise new random uuid will be set

func (*QueueTransactionMessageRequest) SetVisibilitySeconds

func (req *QueueTransactionMessageRequest) SetVisibilitySeconds(visibility int) *QueueTransactionMessageRequest

SetVisibilitySeconds - set receive queue transaction message visibility seconds for hiding message from other clients during processing

func (*QueueTransactionMessageRequest) SetWaitTimeSeconds

func (req *QueueTransactionMessageRequest) SetWaitTimeSeconds(wait int) *QueueTransactionMessageRequest

SetWaitTimeSeconds - set receive queue transaction message request wait timeout for receiving queue message

func (*QueueTransactionMessageRequest) Validate

func (req *QueueTransactionMessageRequest) Validate() error

type QueueTransactionMessageResponse

type QueueTransactionMessageResponse struct {
	Message *QueueMessage
	// contains filtered or unexported fields
}

func (*QueueTransactionMessageResponse) Ack

func (*QueueTransactionMessageResponse) ExtendVisibilitySeconds

func (qt *QueueTransactionMessageResponse) ExtendVisibilitySeconds(value int) error

func (*QueueTransactionMessageResponse) Reject

func (*QueueTransactionMessageResponse) Resend

func (qt *QueueTransactionMessageResponse) Resend(channel string) error

func (*QueueTransactionMessageResponse) ResendNewMessage

func (qt *QueueTransactionMessageResponse) ResendNewMessage(msg *QueueMessage) error

type QueuesClient

type QueuesClient struct {
	// contains filtered or unexported fields
}

func NewQueuesStreamClient

func NewQueuesStreamClient(ctx context.Context, op ...Option) (*QueuesClient, error)

func (*QueuesClient) AckAll

func (*QueuesClient) Batch

func (q *QueuesClient) Batch(ctx context.Context, messages []*QueueMessage) ([]*SendQueueMessageResult, error)

func (*QueuesClient) Close

func (q *QueuesClient) Close() error

func (*QueuesClient) Peek

func (*QueuesClient) Pull

func (*QueuesClient) QueuesInfo

func (q *QueuesClient) QueuesInfo(ctx context.Context, filter string) (*QueuesInfo, error)

func (*QueuesClient) Send

func (*QueuesClient) Subscribe

func (q *QueuesClient) Subscribe(ctx context.Context, request *ReceiveQueueMessagesRequest, onReceive func(response *ReceiveQueueMessagesResponse, err error)) (chan struct{}, error)

func (*QueuesClient) Transaction

func (*QueuesClient) TransactionStream

func (q *QueuesClient) TransactionStream(ctx context.Context, request *QueueTransactionMessageRequest, onReceive func(response *QueueTransactionMessageResponse, err error)) (chan struct{}, error)

type QueuesInfo

type QueuesInfo struct {
	TotalQueues int          `json:"total_queues"`
	Sent        int64        `json:"sent"`
	Waiting     int64        `json:"waiting"`
	Delivered   int64        `json:"delivered"`
	Queues      []*QueueInfo `json:"queues"`
}

type ReceiveQueueMessagesRequest

type ReceiveQueueMessagesRequest struct {
	RequestID           string
	ClientID            string
	Channel             string
	MaxNumberOfMessages int32
	WaitTimeSeconds     int32
	IsPeak              bool
	// contains filtered or unexported fields
}

func NewReceiveQueueMessagesRequest

func NewReceiveQueueMessagesRequest() *ReceiveQueueMessagesRequest

func (*ReceiveQueueMessagesRequest) AddTrace

func (req *ReceiveQueueMessagesRequest) AddTrace(name string) *Trace

AddTrace - add tracing support to receive queue message request

func (*ReceiveQueueMessagesRequest) Complete

func (*ReceiveQueueMessagesRequest) Send

Send - send the receive queue messages request and wait for a response or timeout

func (*ReceiveQueueMessagesRequest) SetChannel

SetChannel - set receive queue message request channel - mandatory if default channel was not set

func (*ReceiveQueueMessagesRequest) SetClientId

SetClientId - set receive queue message request ClientId - mandatory if default client was not set

func (*ReceiveQueueMessagesRequest) SetId

SetId - set receive queue message request id, otherwise new random uuid will be set

func (*ReceiveQueueMessagesRequest) SetIsPeak

SetIsPeak - set receive queue message request type: true - peek at the queue without actually dequeuing, false - dequeue from the queue

func (*ReceiveQueueMessagesRequest) SetMaxNumberOfMessages

func (req *ReceiveQueueMessagesRequest) SetMaxNumberOfMessages(max int) *ReceiveQueueMessagesRequest

SetMaxNumberOfMessages - set receive queue message request max number of messages to receive in single call

func (*ReceiveQueueMessagesRequest) SetWaitTimeSeconds

func (req *ReceiveQueueMessagesRequest) SetWaitTimeSeconds(wait int) *ReceiveQueueMessagesRequest

SetWaitTimeSeconds - set receive queue message request wait timeout for receiving all requested messages

func (*ReceiveQueueMessagesRequest) Validate

func (req *ReceiveQueueMessagesRequest) Validate() error

type ReceiveQueueMessagesResponse

type ReceiveQueueMessagesResponse struct {
	RequestID        string
	Messages         []*QueueMessage
	MessagesReceived int32
	MessagesExpired  int32
	IsPeak           bool
	IsError          bool
	Error            string
}

type Response

type Response struct {
	RequestId  string
	ResponseTo string
	Metadata   string
	Body       []byte
	ClientId   string
	ExecutedAt time.Time
	Err        error
	Tags       map[string]string
	// contains filtered or unexported fields
}

func NewResponse

func NewResponse() *Response

func (*Response) AddTrace

func (r *Response) AddTrace(name string) *Trace

AddTrace - add tracing support to response

func (*Response) Send

func (r *Response) Send(ctx context.Context) error

Send - sending response to command or query request

func (*Response) SetBody

func (r *Response) SetBody(body []byte) *Response

SetBody - set body response, for query only

func (*Response) SetClientId

func (r *Response) SetClientId(clientId string) *Response

SetClientId - set clientId response, if not set default clientId will be used

func (*Response) SetError

func (r *Response) SetError(err error) *Response

SetError - set query or command execution error

func (*Response) SetExecutedAt

func (r *Response) SetExecutedAt(executedAt time.Time) *Response

SetExecutedAt - set query or command execution time

func (*Response) SetMetadata

func (r *Response) SetMetadata(metadata string) *Response

SetMetadata - set metadata response, for query only

func (*Response) SetRequestId

func (r *Response) SetRequestId(id string) *Response

SetRequestId - set the requestId this response corresponds to - mandatory

func (*Response) SetResponseTo

func (r *Response) SetResponseTo(channel string) *Response

SetResponseTo - set response channel as received in CommandReceived or QueryReceived object - mandatory

func (*Response) SetTags

func (r *Response) SetTags(tags map[string]string) *Response

SetTags - set response tags

type SendQueueMessageResult

type SendQueueMessageResult struct {
	MessageID    string
	SentAt       int64
	ExpirationAt int64
	DelayedTo    int64
	IsError      bool
	Error        string
}

type ServerInfo

type ServerInfo struct {
	Host                string
	Version             string
	ServerStartTime     int64
	ServerUpTimeSeconds int64
}

type StreamQueueMessage

type StreamQueueMessage struct {
	RequestID string
	ClientID  string
	Channel   string
	// contains filtered or unexported fields
}

func (*StreamQueueMessage) AddTrace

func (req *StreamQueueMessage) AddTrace(name string) *Trace

AddTrace - add tracing support to stream receive queue message request

func (*StreamQueueMessage) Close

func (req *StreamQueueMessage) Close()

Close - end stream of queue messages and cancel all pending operations

func (*StreamQueueMessage) Next

func (req *StreamQueueMessage) Next(ctx context.Context, visibility, wait int32) (*QueueMessage, error)

Next - send a receive queue message request and wait for a response or timeout

func (*StreamQueueMessage) ResendWithNewMessage

func (req *StreamQueueMessage) ResendWithNewMessage(msg *QueueMessage) error

ResendWithNewMessage - resend the current received message to a new channel

func (*StreamQueueMessage) SetChannel

func (req *StreamQueueMessage) SetChannel(channel string) *StreamQueueMessage

SetChannel - set stream queue message request channel - mandatory if default channel was not set

func (*StreamQueueMessage) SetClientId

func (req *StreamQueueMessage) SetClientId(clientId string) *StreamQueueMessage

SetClientId - set stream queue message request ClientId - mandatory if default client was not set

func (*StreamQueueMessage) SetId

SetId - set stream queue message request id, otherwise new random uuid will be set

type SubscriptionOption

type SubscriptionOption interface {
	// contains filtered or unexported methods
}

func StartFromFirstEvent

func StartFromFirstEvent() SubscriptionOption

StartFromFirstEvent - replay all the stored events from the first available sequence and continue streaming new events from this point

func StartFromLastEvent

func StartFromLastEvent() SubscriptionOption

StartFromLastEvent - replay the last event and continue streaming new events from this point

func StartFromNewEvents

func StartFromNewEvents() SubscriptionOption

StartFromNewEvents - start event store subscription with only new events

func StartFromSequence

func StartFromSequence(sequence int) SubscriptionOption

StartFromSequence - replay events from a specific event sequence number and continue streaming new events from this point

func StartFromTime

func StartFromTime(since time.Time) SubscriptionOption

StartFromTime - replay events from a specific time and continue streaming new events from this point

func StartFromTimeDelta

func StartFromTimeDelta(delta time.Duration) SubscriptionOption

StartFromTimeDelta - replay events starting from the current time minus the delta duration (in seconds) and continue streaming new events from this point

type Trace

type Trace struct {
	Name string
	// contains filtered or unexported fields
}

func CreateTrace

func CreateTrace(name string) *Trace

func (*Trace) AddAnnotation

func (t *Trace) AddAnnotation(timestamp time.Time, message string) *Trace

func (*Trace) AddBoolAttribute

func (t *Trace) AddBoolAttribute(key string, value bool) *Trace

func (*Trace) AddInt64Attribute

func (t *Trace) AddInt64Attribute(key string, value int64) *Trace

func (*Trace) AddStringAttribute

func (t *Trace) AddStringAttribute(key string, value string) *Trace

type Transport

type Transport interface {
	Ping(ctx context.Context) (*ServerInfo, error)
	SendEvent(ctx context.Context, event *Event) error
	StreamEvents(ctx context.Context, eventsCh chan *Event, errCh chan error)
	SubscribeToEvents(ctx context.Context, request *EventsSubscription, errCh chan error) (<-chan *Event, error)
	SendEventStore(ctx context.Context, eventStore *EventStore) (*EventStoreResult, error)
	StreamEventsStore(ctx context.Context, eventsCh chan *EventStore, eventsResultCh chan *EventStoreResult, errCh chan error)
	SubscribeToEventsStore(ctx context.Context, request *EventsStoreSubscription, errCh chan error) (<-chan *EventStoreReceive, error)
	SendCommand(ctx context.Context, command *Command) (*CommandResponse, error)
	SubscribeToCommands(ctx context.Context, request *CommandsSubscription, errCh chan error) (<-chan *CommandReceive, error)
	SendQuery(ctx context.Context, query *Query) (*QueryResponse, error)
	SubscribeToQueries(ctx context.Context, request *QueriesSubscription, errCh chan error) (<-chan *QueryReceive, error)
	SendResponse(ctx context.Context, response *Response) error
	SendQueueMessage(ctx context.Context, msg *QueueMessage) (*SendQueueMessageResult, error)
	SendQueueMessages(ctx context.Context, msg []*QueueMessage) ([]*SendQueueMessageResult, error)
	ReceiveQueueMessages(ctx context.Context, req *ReceiveQueueMessagesRequest) (*ReceiveQueueMessagesResponse, error)
	AckAllQueueMessages(ctx context.Context, req *AckAllQueueMessagesRequest) (*AckAllQueueMessagesResponse, error)
	StreamQueueMessage(ctx context.Context, reqCh chan *pb.StreamQueueMessagesRequest, resCh chan *pb.StreamQueueMessagesResponse, errCh chan error, doneCh chan bool)
	QueuesInfo(ctx context.Context, filter string) (*QueuesInfo, error)
	GetGRPCRawClient() (pb.KubemqClient, error)
	Close() error
}

type TransportType

type TransportType int
const (
	TransportTypeGRPC TransportType = iota
	TransportTypeRest
)

Directories

Path Synopsis
examples
pkg

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL