package messager

import "gopkg.in/src-d/go-vitess.v1/vt/vttablet/tabletserver/messager"


Package Files

cache.go engine.go message_manager.go


var MessageDelayTimings = stats.NewMultiTimings(
    "MessageDelayTimings records total latency from queueing to client sends",

MessageDelayTimings records the total latency from when a message is queued to when it is sent to clients.

var MessageStats = stats.NewGaugesWithMultiLabels(
    "Stats for messages",
    []string{"TableName", "Metric"})

MessageStats tracks stats for messages.

type Engine Uses

type Engine struct {
    // contains filtered or unexported fields
}

Engine is the engine for handling messages.

func NewEngine Uses

func NewEngine(tsv TabletService, se *schema.Engine, config tabletenv.TabletConfig) *Engine

NewEngine creates a new Engine.

func (*Engine) Close Uses

func (me *Engine) Close()

Close closes the Engine service.

func (*Engine) GenerateAckQuery Uses

func (me *Engine) GenerateAckQuery(name string, ids []string) (string, map[string]*querypb.BindVariable, error)

GenerateAckQuery returns the query and bind vars for acking a message.

func (*Engine) GenerateLoadMessagesQuery Uses

func (me *Engine) GenerateLoadMessagesQuery(name string) (*sqlparser.ParsedQuery, error)

GenerateLoadMessagesQuery returns the ParsedQuery for loading messages by primary key. The results of the query can be used in a BuildMessageRow call.

func (*Engine) GeneratePostponeQuery Uses

func (me *Engine) GeneratePostponeQuery(name string, ids []string) (string, map[string]*querypb.BindVariable, error)

GeneratePostponeQuery returns the query and bind vars for postponing a message.

func (*Engine) GeneratePurgeQuery Uses

func (me *Engine) GeneratePurgeQuery(name string, timeCutoff int64) (string, map[string]*querypb.BindVariable, error)

GeneratePurgeQuery returns the query and bind vars for purging messages.

func (*Engine) InitDBConfig Uses

func (me *Engine) InitDBConfig(dbcfgs *dbconfigs.DBConfigs)

InitDBConfig must be called before Open.

func (*Engine) LockDB Uses

func (me *Engine) LockDB(newMessages map[string][]*MessageRow, changedMessages map[string][]string) func()

LockDB obtains db locks for all messages that need to be updated and returns the counterpart unlock function.

func (*Engine) Open Uses

func (me *Engine) Open() error

Open starts the Engine service.

func (*Engine) Subscribe Uses

func (me *Engine) Subscribe(ctx context.Context, name string, send func(*sqltypes.Result) error) (done <-chan struct{}, err error)

Subscribe subscribes to messages from the requested table. The function returns a done channel that will be closed when the subscription ends, which can be initiated by the send function returning io.EOF. The engine can also end a subscription, which is usually triggered by Close. It is the responsibility of the send function to return promptly once the done channel is closed; otherwise, the engine's Close function will hang indefinitely.

func (*Engine) UpdateCaches Uses

func (me *Engine) UpdateCaches(newMessages map[string][]*MessageRow, changedMessages map[string][]string)

UpdateCaches updates the caches for the committed changes.

type MessageRow Uses

type MessageRow struct {
    TimeNext    int64
    Epoch       int64
    TimeCreated int64
    Row         []sqltypes.Value
    // contains filtered or unexported fields
}

MessageRow represents a message row. The first column in Row is always the "id".

func BuildMessageRow Uses

func BuildMessageRow(row []sqltypes.Value) (*MessageRow, error)

BuildMessageRow builds a MessageRow for a db row.

type TabletService Uses

type TabletService interface {
    PostponeMessages(ctx context.Context, target *querypb.Target, name string, ids []string) (count int64, err error)
    PurgeMessages(ctx context.Context, target *querypb.Target, name string, timeCutoff int64) (count int64, err error)
}

TabletService defines the functions of TabletServer that the messager needs for callback.

