dbpipe

package
v0.11.4
Warning

This package is not in the latest version of its module.

Published: Jun 8, 2020 | License: MIT | Imports: 14 | Imported by: 0

Documentation

Overview

Package dbpipe contains DBMsgPublisherPipe, which is used as a publisher pipeline from an RDBMS to a downstream publisher. This package is being deprecated.

Typical workflow (error handling omitted):

// Creates DBMsgPublisherPipe.
// `downstream` specifies the sink of pipeline.
// `dialect` specifies the dialect of RDBMS.
// `db`/`table` specify which table to store message data.
pipe, _ := dbpipe.NewDBMsgPublisherPipe(downstream, dialect, db, table)

// Run transaction.
sqlh.WithTx(ctx, db, func(ctx context.Context, tx *sql.Tx) error {
    // Create a new publisher inside transaction.
    publisher := pipe.NewMsgPublisherWithTx(ctx, tx)

    // ... Some db operations ....

    // Publish takes the message data as []byte.
    publisher.Publish(ctx, "subject", []byte("data"))

    // ... Some more db operations ...

    return nil
})


Constants

This section is empty.

Variables

var (
	// DefaultMaxDelBulkSz is the default value of OptMaxDelBulkSz.
	DefaultMaxDelBulkSz = 1000
	// DefaultMaxInflight is the default value of OptMaxInflight.
	DefaultMaxInflight = 2048
	// DefaultMaxBuf is the default value of OptMaxBuf.
	DefaultMaxBuf = 512
	// DefaultRetryWait is the default value of OptRetryWait.
	DefaultRetryWait = 2 * time.Second
	// DefaultFlushWait is the default value of OptFlushWait.
	DefaultFlushWait = 15 * time.Second
)
var (
	// ErrMaxInflightAndBuf is returned if OptMaxBuf > OptMaxInflight.
	ErrMaxInflightAndBuf = errors.New("nproto.dbpipe.DBMsgPublisherPipe: MaxBuf should be <= MaxInflight")
	// ErrUnknownDialect is returned if the dialect is not supported.
	ErrUnknownDialect = func(dialect string) error {
		return fmt.Errorf("nproto.dbpipe.DBMsgPublisherPipe: Unknown dialect: %+q", dialect)
	}
)

Functions

This section is empty.

Types

type DBMsgPublisher

type DBMsgPublisher struct {
	// contains filtered or unexported fields
}

DBMsgPublisher is used to "publish" messages to the database. Implements nproto.MsgPublisher interface.

func (*DBMsgPublisher) Flush

func (p *DBMsgPublisher) Flush(ctx context.Context)

Flush is the low-level function used to flush messages to downstream. IMPORTANT: call this method ONLY after the transaction has been committed successfully.

Use NewMsgPublisherWithTx to handle it automatically if possible.

func (*DBMsgPublisher) Publish

func (p *DBMsgPublisher) Publish(ctx context.Context, subject string, msgData []byte) error

Publish implements the nproto.MsgPublisher interface. Metadata attached to `ctx` is passed unmodified to the downstream publisher.

type DBMsgPublisherPipe

type DBMsgPublisherPipe struct {
	// contains filtered or unexported fields
}

DBMsgPublisherPipe is used as a publisher pipeline from RDBMS to downstream publisher.

func NewDBMsgPublisherPipe

func NewDBMsgPublisherPipe(downstream nproto.MsgPublisher, dialect string, db *sql.DB, table string, opts ...Option) (*DBMsgPublisherPipe, error)

NewDBMsgPublisherPipe creates a new DBMsgPublisherPipe. `downstream` can be either an nproto.MsgPublisher or an nproto.MsgAsyncPublisher. `dialect` can be: "mysql". `db` is the database in which messages are stored. `table` is the name of the table in which messages are stored. If `OptNoRedeliveryLoop` is given in `opts`, the redelivery loop is not run.

func (*DBMsgPublisherPipe) Close

func (pipe *DBMsgPublisherPipe) Close()

Close closes the DBMsgPublisherPipe and waits for the redelivery loop to exit (if it is running).

func (*DBMsgPublisherPipe) NewMsgPublisher

func (pipe *DBMsgPublisherPipe) NewMsgPublisher(q sqlh.Queryer) *DBMsgPublisher

NewMsgPublisher creates a DBMsgPublisher. `q` must be connected to the same database as the DBMsgPublisherPipe.

func (*DBMsgPublisherPipe) NewMsgPublisherWithTx

func (pipe *DBMsgPublisherPipe) NewMsgPublisherWithTx(ctx context.Context, tx *sql.Tx) *DBMsgPublisher

NewMsgPublisherWithTx creates a DBMsgPublisher within sqlh.WithTx. NOTE: tx must be started from the same db as the pipe.

type Option

type Option func(*DBMsgPublisherPipe) error

Option is used when creating a DBMsgPublisherPipe.

func OptContext

func OptContext(ctx context.Context) Option

OptContext sets the base context for the pipe.

func OptFlushWait

func OptFlushWait(t time.Duration) Option

OptFlushWait sets the interval between redelivery attempts.

func OptLogger

func OptLogger(logger *zerolog.Logger) Option

OptLogger sets the structured logger.

func OptMaxBuf

func OptMaxBuf(maxBuf int) Option

OptMaxBuf sets the max number of messages buffered in memory.

func OptMaxDelBulkSz

func OptMaxDelBulkSz(bulkSize int) Option

OptMaxDelBulkSz sets the maximum number of messages deleted in one batch after they have been delivered to downstream.

func OptMaxInflight

func OptMaxInflight(maxInflight int) Option

OptMaxInflight sets the max number of messages to publish to downstream before acknowledgement.

func OptNoRedeliveryLoop

func OptNoRedeliveryLoop() Option

OptNoRedeliveryLoop prevents DBMsgPublisherPipe from launching the redelivery loop.

func OptRetryWait

func OptRetryWait(t time.Duration) Option

OptRetryWait sets the interval between attempts to get the db connection and lock.
