messages

Version: v0.0.0-...-7d2ac21
Published: Dec 5, 2018 License: Apache-2.0 Imports: 9 Imported by: 0

README

messages

Vitess messages client

Documentation

Overview

Package messages defines a pubsub style of interfacing with the Vitess Messaging feature. The goal is to provide a simple and performant way to use message queues without needing to worry about the underlying SQL and system architecture. All operations are safe for concurrent use.

More details can be found at http://vitess.io/advanced/messaging/

Queues

Queues are the core component of messaging, and require the MySQL tables to be set up before they can be used. They are treated like any other table in terms of vschema and sharding. The definition requires strict field ordering for required fields, followed by N payload fields. There is no restriction on the payload field types.

create table my_important_messages(
	-- required fields for Vitess - order matters
	time_scheduled bigint,
	id bigint,
	time_next bigint,
	epoch bigint,
	time_created bigint,
	time_acked bigint,

	-- whatever payload fields your queue needs, including potential sharding keys
	my_data_field_1 varchar(128),
	my_data_field_2 bigint,
	my_data_field_3 json,

	-- recommended indexes for common Vitess queries
	primary key(time_scheduled, id),
	unique index id_idx(id),
	index next_idx(time_next, epoch)

	-- Vitess specific settings are set in the comments
) comment 'vitess_message,vt_ack_wait=30,vt_purge_after=86400,vt_batch_size=10,vt_cache_size=10000,vt_poller_interval=30'

Inserting and Acknowledging Messages

Perhaps the most valuable feature of Vitess messages is that you can atomically add, ack or fail messages in the same transaction as your actual data commits. The Execer interface lets you optionally provide a sql.DB / sql.Tx that these commands will be executed on. There is no requirement that these need to use the Vitess driver. You can perform any of these operations without needing to run Queue.Open.
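The following is a minimal sketch of that pattern, not the package's own code. It re-declares Execer locally and defines a small `adder` interface copied from the Add signature so that the example compiles on its own; in real code you would use the package's Execer and *Queue types instead. The `orders` table, the `createOrder` helper, and both fakes are hypothetical, and the fake queue issues a placeholder statement because the real SQL is generated by the package.

```go
package main

import (
	"context"
	"database/sql"
	"database/sql/driver"
	"fmt"
)

// Execer mirrors the interface documented by this package.
type Execer interface {
	ExecContext(ctx context.Context, query string, args ...interface{}) (sql.Result, error)
}

// *sql.DB and *sql.Tx both satisfy Execer, so either can be passed below.
var (
	_ Execer = (*sql.DB)(nil)
	_ Execer = (*sql.Tx)(nil)
)

// adder is the subset of *Queue used here, copied from the Add signature.
type adder interface {
	Add(ctx context.Context, e Execer, messageID int64, data ...interface{}) error
}

// createOrder writes a row and enqueues a message on the same Execer.
// When e is a *sql.Tx, both statements commit or roll back together.
// The orders table and its columns are hypothetical.
func createOrder(ctx context.Context, e Execer, q adder, orderID int64, sku string) error {
	if _, err := e.ExecContext(ctx, "insert into orders(id, sku) values (?, ?)", orderID, sku); err != nil {
		return fmt.Errorf("insert order: %w", err)
	}
	if err := q.Add(ctx, e, orderID, sku); err != nil {
		return fmt.Errorf("enqueue message: %w", err)
	}
	return nil
}

// fakeExecer and fakeQueue are stand-ins so the sketch runs without a database.
type fakeExecer struct{ queries []string }

func (f *fakeExecer) ExecContext(_ context.Context, query string, _ ...interface{}) (sql.Result, error) {
	f.queries = append(f.queries, query)
	return driver.RowsAffected(1), nil
}

type fakeQueue struct{}

// Add issues a placeholder statement on the Execer it was given.
func (fakeQueue) Add(ctx context.Context, e Execer, messageID int64, _ ...interface{}) error {
	_, err := e.ExecContext(ctx, "placeholder: insert into my_important_messages", messageID)
	return err
}

// demo runs createOrder against the fakes and returns the statements issued.
func demo() ([]string, error) {
	e := &fakeExecer{}
	err := createOrder(context.Background(), e, fakeQueue{}, 42, "sku-1")
	return e.queries, err
}

func main() {
	queries, err := demo()
	fmt.Println(len(queries), err)
}
```

With a real database, the caller would start a transaction with `db.BeginTx`, pass the resulting `*sql.Tx` as the Execer, and call `Commit` only if `createOrder` returns nil, so the data row and the message become visible together.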

Subscribing to Messages

Subscribing relies on custom Vitess syntax and connection parameters, so the queue itself creates a sql.DB that it uses under the hood to stream messages. The max concurrency definition determines how many messages will be leased out and kept in memory at any given time. No connection is made until Queue.Open is run.
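A consumer loop over a subscribed queue might look roughly like the sketch below. It is written against a local `getter` interface copied from the Get signature, with a fake queue so it runs without Vitess. The layout of `dest` (message id followed by a single string payload) is an assumption made for illustration, as are the `consume` helper and the `message` type; check the package source for the actual scan order.

```go
package main

import (
	"context"
	"fmt"
)

// getter is the subset of *Queue used here, copied from the Get signature.
type getter interface {
	Get(ctx context.Context, dest ...interface{}) error
}

// consume calls Get in a loop and hands each message to handle.
// ASSUMPTION: dest receives the message id followed by one string payload
// field; the real scan order is not documented above.
// handle is expected to Ack, Nack or Fail the message itself.
func consume(ctx context.Context, q getter, handle func(id int64, payload string)) (int, error) {
	handled := 0
	for {
		var id int64
		var payload string
		if err := q.Get(ctx, &id, &payload); err != nil {
			if ctx.Err() != nil {
				return handled, nil // cancelled: treat as a clean shutdown
			}
			return handled, fmt.Errorf("get message: %w", err)
		}
		handle(id, payload)
		handled++
	}
}

// message is a hypothetical in-memory representation used only by the fake.
type message struct {
	id      int64
	payload string
}

// fakeQueue serves canned messages, then blocks until ctx is done.
type fakeQueue struct{ pending []message }

func (f *fakeQueue) Get(ctx context.Context, dest ...interface{}) error {
	if len(f.pending) == 0 {
		<-ctx.Done()
		return ctx.Err()
	}
	m := f.pending[0]
	f.pending = f.pending[1:]
	*dest[0].(*int64) = m.id
	*dest[1].(*string) = m.payload
	return nil
}

// demo consumes two canned messages and cancels the context afterwards.
func demo() ([]string, error) {
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()
	q := &fakeQueue{pending: []message{{1, "a"}, {2, "b"}}}
	var seen []string
	_, err := consume(ctx, q, func(id int64, payload string) {
		seen = append(seen, payload)
		if len(seen) == 2 {
			cancel() // stop once both canned messages are handled
		}
	})
	return seen, err
}

func main() {
	seen, err := demo()
	fmt.Println(seen, err)
}
```

Because only a bounded number of messages is leased at a time, a handler that never acknowledges its messages would eventually stall the loop, so each call to `handle` should settle the message it receives.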

Constants

This section is empty.

Variables

This section is empty.

Functions

This section is empty.

Types

type Execer

type Execer interface {
	ExecContext(ctx context.Context, query string, args ...interface{}) (sql.Result, error)
}

Execer lets functions accept a DB or a Tx without knowing the difference.

type Queue

type Queue struct {
	Name string
	// contains filtered or unexported fields
}

A Queue represents a Vitess message queue.

func NewQueue

func NewQueue(name string, fieldNames []string) *Queue

NewQueue returns a queue definition but doesn't make any network requests.

func (*Queue) Ack

func (q *Queue) Ack(ctx context.Context, e Execer, messageID int64) error

Ack marks a message as successfully completed.

func (*Queue) Add

func (q *Queue) Add(ctx context.Context, e Execer, messageID int64, data ...interface{}) error

Add adds a task to the queue.

func (*Queue) AddScheduled

func (q *Queue) AddScheduled(ctx context.Context, e Execer, messageID, timeScheduled int64, data ...interface{}) error

AddScheduled adds a task to the queue to be executed at the specified time. timeScheduled must be in Unix nanoseconds.
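A small sketch of how a Unix-nanosecond value for timeScheduled could be computed with the standard `time` package. The helper names `scheduleAfter` and `epochPlusOneSecond` are hypothetical and exist only for illustration.

```go
package main

import (
	"fmt"
	"time"
)

// scheduleAfter returns the Unix-nanosecond timestamp that lies delay after
// now, suitable for the timeScheduled argument of AddScheduled.
func scheduleAfter(now time.Time, delay time.Duration) int64 {
	return now.Add(delay).UnixNano()
}

// epochPlusOneSecond is a fixed example so the result is easy to check by hand.
func epochPlusOneSecond() int64 {
	return scheduleAfter(time.Unix(0, 0), time.Second)
}

func main() {
	// One second after the Unix epoch is 1000000000 nanoseconds.
	fmt.Println(epochPlusOneSecond())

	// A timestamp five minutes from now.
	fmt.Println(scheduleAfter(time.Now(), 5*time.Minute))
}
```

Note that `time.Now().Unix()` returns seconds, not nanoseconds; passing it as timeScheduled would be off by a factor of 10^9.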

func (*Queue) Close

func (q *Queue) Close() error

Close drains the processing channel and closes the connection to the database. TODO: Nack all remaining messages.

func (*Queue) Fail

func (q *Queue) Fail(ctx context.Context, e Execer, messageID int64) error

Fail marks a task as failed; it will not be queued again until manual action is taken.

func (*Queue) Get

func (q *Queue) Get(ctx context.Context, dest ...interface{}) error

Get returns the next available message. It blocks until either a message is available or the context is cancelled.

func (*Queue) Nack

func (q *Queue) Nack(ctx context.Context, e Execer, messageID int64) error

Nack marks a message as unsuccessfully completed.
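One way to choose between Ack, Nack and Fail after handling a message is sketched below. The `settler` interface copies the three documented signatures, and Execer is re-declared locally so the example stands alone. The `errPermanent` sentinel, the `settle` helper and the fake queue are hypothetical. The sketch also assumes that a nacked message becomes eligible for redelivery, which the documentation above does not state explicitly.

```go
package main

import (
	"context"
	"database/sql"
	"errors"
	"fmt"
)

// Execer mirrors the interface documented by this package.
type Execer interface {
	ExecContext(ctx context.Context, query string, args ...interface{}) (sql.Result, error)
}

// settler is the subset of *Queue used here, copied from the signatures above.
type settler interface {
	Ack(ctx context.Context, e Execer, messageID int64) error
	Nack(ctx context.Context, e Execer, messageID int64) error
	Fail(ctx context.Context, e Execer, messageID int64) error
}

// errPermanent is a hypothetical sentinel for errors that retrying cannot fix.
var errPermanent = errors.New("permanent failure")

// settle maps a handler result onto Ack, Nack or Fail.
// ASSUMPTION: a nacked message may be delivered again later.
func settle(ctx context.Context, q settler, e Execer, id int64, handlerErr error) error {
	switch {
	case handlerErr == nil:
		return q.Ack(ctx, e, id)
	case errors.Is(handlerErr, errPermanent):
		return q.Fail(ctx, e, id)
	default:
		return q.Nack(ctx, e, id)
	}
}

// fakeQueue records which method was called for each message id.
type fakeQueue struct{ calls []string }

func (f *fakeQueue) record(op string, id int64) error {
	f.calls = append(f.calls, fmt.Sprintf("%s:%d", op, id))
	return nil
}

func (f *fakeQueue) Ack(_ context.Context, _ Execer, id int64) error  { return f.record("ack", id) }
func (f *fakeQueue) Nack(_ context.Context, _ Execer, id int64) error { return f.record("nack", id) }
func (f *fakeQueue) Fail(_ context.Context, _ Execer, id int64) error { return f.record("fail", id) }

// demo settles three messages: a success, a transient error, a permanent error.
func demo() []string {
	q := &fakeQueue{}
	ctx := context.Background()
	var e Execer = (*sql.DB)(nil) // never used by the fake
	_ = settle(ctx, q, e, 1, nil)
	_ = settle(ctx, q, e, 2, errors.New("timeout talking to upstream"))
	_ = settle(ctx, q, e, 3, fmt.Errorf("bad payload: %w", errPermanent))
	return q.calls
}

func main() {
	fmt.Println(demo())
}
```

Wrapping permanent errors with `%w` keeps the decision in one place: handlers describe what went wrong, and `settle` decides how the queue should treat the message.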

func (*Queue) Open

func (q *Queue) Open(ctx context.Context, address, target string) error

Open connects to an underlying Vitess cluster using the vitessdriver for database/sql and streams messages. The queue buffers up to the defined max concurrent number of messages in memory and then blocks until one of the messages is acknowledged.

Only a single connection is opened and it remains open until Close is called. Context cancellation is respected.
