Documentation ¶
Overview ¶
Package dbpipe contains DBMsgPublisherPipe, which is used as a publisher pipeline from an RDBMS to a downstream publisher (deprecating).
Typical workflow (ignore error handling):
    // Creates DBMsgPublisherPipe.
    // `downstream` specifies the sink of pipeline.
    // `dialect` specifies the dialect of RDBMS.
    // `db`/`table` specify which table to store message data.
    pipe, _ := dbpipe.NewDBMsgPublisherPipe(downstream, dialect, db, table)

    // Run transaction.
    sqlh.WithTx(ctx, db, func(ctx context.Context, tx *sql.Tx) error {
        // Create a new publisher inside transaction.
        publisher := pipe.NewMsgPublisherWithTx(ctx, tx)

        // ... Some db operations ....
        publisher.Publish(ctx, "subject", "data")
        // ... Some more db operations ...
        return nil
    })
Index ¶
- Variables
- type DBMsgPublisher
- type DBMsgPublisherPipe
- type Option
- func OptContext(ctx context.Context) Option
- func OptFlushWait(t time.Duration) Option
- func OptLogger(logger *zerolog.Logger) Option
- func OptMaxBuf(maxBuf int) Option
- func OptMaxDelBulkSz(bulkSize int) Option
- func OptMaxInflight(maxInflight int) Option
- func OptNoRedeliveryLoop() Option
- func OptRetryWait(t time.Duration) Option
Constants ¶
This section is empty.
Variables ¶
var (
    // DefaultMaxDelBulkSz is the default value of OptMaxDelBulkSz.
    DefaultMaxDelBulkSz = 1000
    // DefaultMaxInflight is the default value of OptMaxInflight.
    DefaultMaxInflight = 2048
    // DefaultMaxBuf is the default value of OptMaxBuf.
    DefaultMaxBuf = 512
    // DefaultRetryWait is the default value of OptRetryWait.
    DefaultRetryWait = 2 * time.Second
    // DefaultFlushWait is the default value of OptFlushWait.
    DefaultFlushWait = 15 * time.Second
)
var (
    // ErrMaxInflightAndBuf is returned if OptMaxBuf > OptMaxInflight.
    ErrMaxInflightAndBuf = errors.New("nproto.dbpipe.DBMsgPublisherPipe: MaxBuf should be <= MaxInflight")
    // ErrUnknownDialect is returned if the dialect is not supported.
    ErrUnknownDialect = func(dialect string) error {
        return fmt.Errorf("nproto.dbpipe.DBMsgPublisherPipe: Unknown dialect: %+q", dialect)
    }
)
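Note that ErrUnknownDialect is a function that builds an error, not a sentinel value, so it cannot be compared with `==` the way ErrMaxInflightAndBuf can. The sketch below reproduces that shape using only the standard library. `errUnknownDialect` and `checkDialect` are hypothetical names for illustration, and the check itself is an assumption based on "mysql" being the only dialect this page lists.

```go
package main

import "fmt"

// errUnknownDialect mirrors the shape of ErrUnknownDialect: a function value
// that formats a fresh error for the offending dialect name.
var errUnknownDialect = func(dialect string) error {
	return fmt.Errorf("nproto.dbpipe.DBMsgPublisherPipe: Unknown dialect: %+q", dialect)
}

// checkDialect is a hypothetical helper (not part of dbpipe). It accepts only
// "mysql", the single dialect named in the documentation.
func checkDialect(dialect string) error {
	if dialect == "mysql" {
		return nil
	}
	return errUnknownDialect(dialect)
}

func main() {
	fmt.Println(checkDialect("mysql"))
	fmt.Println(checkDialect("postgres"))
}
```

Because every call produces a new error value, a caller that needs to detect this case would have to inspect the message or rely on the constructor's return value alone.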
Functions ¶
This section is empty.
Types ¶
type DBMsgPublisher ¶
type DBMsgPublisher struct {
// contains filtered or unexported fields
}
DBMsgPublisher is used to "publish" messages to the database. Implements nproto.MsgPublisher interface.
func (*DBMsgPublisher) Flush ¶
func (p *DBMsgPublisher) Flush(ctx context.Context)
Flush is the low-level function used to flush messages to downstream. IMPORTANT: call this method ONLY after the transaction has been committed successfully.
Use NewMsgPublisherWithTx to handle it automatically if possible.
type DBMsgPublisherPipe ¶
type DBMsgPublisherPipe struct {
// contains filtered or unexported fields
}
DBMsgPublisherPipe is used as a publisher pipeline from an RDBMS to a downstream publisher.
func NewDBMsgPublisherPipe ¶
func NewDBMsgPublisherPipe(downstream nproto.MsgPublisher, dialect string, db *sql.DB, table string, opts ...Option) (*DBMsgPublisherPipe, error)
NewDBMsgPublisherPipe creates a new DBMsgPublisherPipe. `downstream` can be either an nproto.MsgPublisher or an nproto.MsgAsyncPublisher. `dialect` can be: "mysql". `db` is the database in which messages are stored. `table` is the name of the table in which messages are stored. If `OptNoRedeliveryLoop` is given in `opts`, the redelivery loop will not be run.
func (*DBMsgPublisherPipe) Close ¶
func (pipe *DBMsgPublisherPipe) Close()
Close closes the DBMsgPublisherPipe and waits for the redelivery loop to exit (if it is running).
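A common way to get "close and wait for the loop to exit" semantics in Go is a cancellable context paired with a done channel. The following is a minimal sketch of that technique, not the package's code: `loopRunner`, `startLoop`, `shutdown`, and `demo` are hypothetical names, and the ticker-driven loop is an assumption about how a periodic redelivery loop might be structured.

```go
package main

import (
	"context"
	"fmt"
	"time"
)

// loopRunner is a hypothetical stand-in for the part of a pipe that owns
// a background redelivery loop.
type loopRunner struct {
	cancel context.CancelFunc
	done   chan struct{}
}

// startLoop runs redeliver every interval until shutdown is called.
func startLoop(interval time.Duration, redeliver func()) *loopRunner {
	ctx, cancel := context.WithCancel(context.Background())
	r := &loopRunner{cancel: cancel, done: make(chan struct{})}
	go func() {
		defer close(r.done) // signals that the loop has fully exited
		ticker := time.NewTicker(interval)
		defer ticker.Stop()
		for {
			select {
			case <-ctx.Done():
				return
			case <-ticker.C:
				if ctx.Err() != nil {
					return // cancelled while a tick was pending
				}
				redeliver()
			}
		}
	}()
	return r
}

// shutdown cancels the loop and blocks until it has exited.
func (r *loopRunner) shutdown() {
	r.cancel()
	<-r.done
}

// demo waits for one redelivery pass, shuts down, and reports whether
// the loop goroutine had exited by the time shutdown returned.
func demo() bool {
	first := make(chan struct{}, 1)
	r := startLoop(time.Millisecond, func() {
		select {
		case first <- struct{}{}:
		default:
		}
	})
	<-first
	r.shutdown()
	select {
	case <-r.done:
		return true
	default:
		return false
	}
}

func main() {
	fmt.Println("loop exited before shutdown returned:", demo())
}
```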
func (*DBMsgPublisherPipe) NewMsgPublisher ¶
func (pipe *DBMsgPublisherPipe) NewMsgPublisher(q sqlh.Queryer) *DBMsgPublisher
NewMsgPublisher creates a DBMsgPublisher. `q` must be connected to the same database as the DBMsgPublisherPipe.
func (*DBMsgPublisherPipe) NewMsgPublisherWithTx ¶
func (pipe *DBMsgPublisherPipe) NewMsgPublisherWithTx(ctx context.Context, tx *sql.Tx) *DBMsgPublisher
NewMsgPublisherWithTx creates a DBMsgPublisher within sqlh.WithTx. NOTE: tx must be started from the same db as the pipe.
type Option ¶
type Option func(*DBMsgPublisherPipe) error
Option is used when creating DBMsgPublisherPipe.
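The Option signature follows Go's functional-options pattern: each option is a function that mutates the object under construction and may return an error. The sketch below shows the pattern on a hypothetical `pipeConfig` type, since the real struct's fields are unexported. The default values are taken from the Variables section. Where the MaxBuf <= MaxInflight check sits, and the rejection of non-positive values, are assumptions.

```go
package main

import (
	"errors"
	"fmt"
)

// pipeConfig is a hypothetical stand-in for the unexported fields of
// DBMsgPublisherPipe.
type pipeConfig struct {
	maxInflight int
	maxBuf      int
}

// option has the same shape as dbpipe.Option, but targets the stand-in type.
type option func(*pipeConfig) error

var errMaxInflightAndBuf = errors.New("MaxBuf should be <= MaxInflight")

func optMaxInflight(n int) option {
	return func(c *pipeConfig) error {
		if n <= 0 {
			return errors.New("maxInflight must be positive") // assumed validation
		}
		c.maxInflight = n
		return nil
	}
}

func optMaxBuf(n int) option {
	return func(c *pipeConfig) error {
		if n <= 0 {
			return errors.New("maxBuf must be positive") // assumed validation
		}
		c.maxBuf = n
		return nil
	}
}

// newPipeConfig starts from the documented defaults, applies each option in
// order, then checks the cross-field constraint once all options have run.
func newPipeConfig(opts ...option) (*pipeConfig, error) {
	c := &pipeConfig{maxInflight: 2048, maxBuf: 512}
	for _, opt := range opts {
		if err := opt(c); err != nil {
			return nil, err
		}
	}
	if c.maxBuf > c.maxInflight {
		return nil, errMaxInflightAndBuf
	}
	return c, nil
}

func main() {
	c, err := newPipeConfig(optMaxInflight(100), optMaxBuf(50))
	fmt.Println(c.maxInflight, c.maxBuf, err)

	_, err = newPipeConfig(optMaxBuf(4096))
	fmt.Println(err)
}
```

Validating the cross-field constraint after the loop means the order in which options are passed does not matter.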
func OptContext ¶
func OptContext(ctx context.Context) Option
OptContext sets the base context for the pipe.
func OptFlushWait ¶
func OptFlushWait(t time.Duration) Option
OptFlushWait sets the interval between redelivery attempts.
func OptMaxDelBulkSz ¶
func OptMaxDelBulkSz(bulkSize int) Option
OptMaxDelBulkSz sets the maximum number of messages deleted in one batch after they have been delivered to downstream.
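Capping the size of each delete keeps individual statements and their lock footprint bounded. As a sketch of the general technique only: `chunkIDs` and `deleteStmt` are hypothetical helpers, and the `id` column and the `IN (...)` statement form are assumptions, since this page does not describe the table schema or the SQL the package issues.

```go
package main

import (
	"fmt"
	"strings"
)

// chunkIDs splits ids into consecutive batches of at most bulkSize elements.
func chunkIDs(ids []int64, bulkSize int) [][]int64 {
	if bulkSize <= 0 {
		return nil
	}
	var batches [][]int64
	for len(ids) > 0 {
		n := bulkSize
		if len(ids) < n {
			n = len(ids)
		}
		batches = append(batches, ids[:n])
		ids = ids[n:]
	}
	return batches
}

// deleteStmt builds a parameterised delete for n ids (n must be >= 1).
// The column name "id" is hypothetical; the table name is interpolated
// as-is and must come from trusted configuration.
func deleteStmt(table string, n int) string {
	placeholders := strings.TrimSuffix(strings.Repeat("?,", n), ",")
	return fmt.Sprintf("DELETE FROM %s WHERE id IN (%s)", table, placeholders)
}

func main() {
	delivered := []int64{11, 12, 13, 14, 15}
	for _, batch := range chunkIDs(delivered, 2) {
		fmt.Println(deleteStmt("msgs", len(batch)), batch)
	}
}
```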
func OptMaxInflight ¶
func OptMaxInflight(maxInflight int) Option
OptMaxInflight sets the maximum number of messages that may be published to downstream while awaiting acknowledgement.
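One standard way to bound the number of unacknowledged messages is a buffered channel used as a counting semaphore. The sketch below shows that technique in isolation. `publishAll` is a hypothetical function, and treating the return of `publish` as the acknowledgement is a simplifying assumption. It is not a description of how the package enforces the limit.

```go
package main

import (
	"fmt"
	"sync"
)

// publishAll calls publish once per message with at most maxInflight calls
// outstanding at any moment. It returns the number of messages delivered and
// the peak number that were in flight together.
func publishAll(msgs []string, maxInflight int, publish func(string)) (delivered, peak int) {
	sem := make(chan struct{}, maxInflight) // one slot per in-flight message
	var (
		mu       sync.Mutex
		inflight int
		wg       sync.WaitGroup
	)
	for _, m := range msgs {
		sem <- struct{}{} // blocks while maxInflight messages are unacknowledged
		wg.Add(1)
		go func(m string) {
			defer wg.Done()
			mu.Lock()
			inflight++
			if inflight > peak {
				peak = inflight
			}
			mu.Unlock()

			publish(m) // returning stands in for the acknowledgement

			mu.Lock()
			inflight--
			delivered++
			mu.Unlock()
			<-sem // free the slot only after the acknowledgement
		}(m)
	}
	wg.Wait()
	return delivered, peak
}

func main() {
	msgs := []string{"a", "b", "c", "d", "e", "f"}
	delivered, peak := publishAll(msgs, 2, func(string) {})
	fmt.Println("delivered:", delivered, "peak within limit:", peak <= 2)
}
```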
func OptNoRedeliveryLoop ¶
func OptNoRedeliveryLoop() Option
OptNoRedeliveryLoop prevents DBMsgPublisherPipe from launching the redelivery loop.
func OptRetryWait ¶
func OptRetryWait(t time.Duration) Option
OptRetryWait sets the interval between attempts to get a db connection and lock.