pubsub

package
v0.0.0-...-bf83fb3
Warning

This package is not in the latest version of its module.

Published: Oct 14, 2023 License: MIT Imports: 14 Imported by: 0

README

go-lib/pubsub

This package allows the creation of publishers (pubs) and subscribers (subs). A sub can be linked to multiple pubs. When a pub receives new or updated data, it can update its linked subs either immediately or in batches. It is up to each sub to define what to do with the published data. Published data must specify a type and an id, which together identify a unique record for that pub. Optionally, a JSON representation of the pub object can also be provided. Deleted pub data records are handled using soft deletes. An internal versioning system tracks the latest version of each record. A subscriber may send the same version more than once, so sub data handlers must handle duplicate deliveries, for example by being idempotent.
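One way to make a sub data handler tolerate repeated deliveries is to skip any version that is not newer than the last one it applied. The sketch below assumes the handler keeps the highest applied version per type/id in memory. `dedupeHandler` and `eventKey` are hypothetical names and are not part of pubsub. A real handler would more likely persist this state alongside the data it writes.

```go
package main

import "fmt"

// eventKey identifies a pub data record by its type and id (hypothetical helper type).
type eventKey struct {
	Type string
	ID   string
}

// dedupeHandler remembers the highest version applied per record so that
// redelivered or stale versions can be skipped (hypothetical, in-memory only).
type dedupeHandler struct {
	applied map[eventKey]int
}

func newDedupeHandler() *dedupeHandler {
	return &dedupeHandler{applied: make(map[eventKey]int)}
}

// Handle reports whether the given version is new and records it if so.
// Versions less than or equal to the last applied version are ignored.
func (h *dedupeHandler) Handle(dataType, id string, version int) bool {
	k := eventKey{Type: dataType, ID: id}
	if last, ok := h.applied[k]; ok && version <= last {
		return false // duplicate or stale delivery: already applied
	}
	h.applied[k] = version
	return true
}

func main() {
	h := newDedupeHandler()
	fmt.Println(h.Handle("data-type", "data-id", 1)) // true
	fmt.Println(h.Handle("data-type", "data-id", 1)) // false (redelivered)
	fmt.Println(h.Handle("data-type", "data-id", 2)) // true
}
```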

Usage

Publish data

	version, err := pubsub.Publish(db, pubsub.PublishParam{
		PublishID: 1,
		Type:      "data-type",
		ID:        "data-id",
		Deleted:   false,
		JSON:      []byte(`{"type":"data-type","id":"data-id"}`),
	})

Mark as deleted

	version, err := pubsub.Publish(db, pubsub.PublishParam{
		PublishID: 1,
		Type:      "data-type",
		ID:        "data-id",
		Deleted:   true,
	})

Subscriber that both listens for updates and processes all new/updated records once

	// batchHandler implements pubsub.SubBatchHandler
	type batchHandler struct {
		s    *pubsub.Subscriber
		time time.Time
	}

	// Push pushes a batch of publish events to the subscriber's destination
	func (bh *batchHandler) Push(events []*pubsub.Event) (err error) {
		// Informational log - not required
		total, success, retry, fail := bh.s.GetStats()
		log.Info().Msgf("time: %v, %d total, %d success, %d retry, %d fail",
			time.Since(bh.time), total, success, retry, fail)

		// Look up the actual data for each event and push it; return an error if it fails

		return nil
	}

	// listenHandler implements pubsub.SubDataListener
	type listenHandler struct {
		time time.Time
	}

	// Send returns a hash built from the data id and version, and no JSON
	func (lh *listenHandler) Send(ev *pubsub.Event) (hash string, jsonBytes []byte, err error) {
		return "test." + ev.ID + ".v" + strconv.Itoa(ev.Version), nil, nil
	}

	// runSubscriber is an illustrative wrapper: db is assumed to be an open
	// *sql.Connection and cp the *sql.ConnParam used to listen for events
	func runSubscriber(db *sql.Connection, cp *sql.ConnParam) error {
		// Initialize the subscriber
		s, err := pubsub.NewSubscriber(db, "subscriber-example")
		if err != nil {
			return err
		}

		// Initialize the listen handler
		lh := &listenHandler{
			time: time.Now(),
		}

		// Listen for pub events
		if err := s.Listen(cp, lh); err != nil {
			return err
		}

		// Initialize the batch data handler
		bh := &batchHandler{
			time: time.Now(),
			s:    s,
		}

		batchSize := 100   // The number of records to send to a subscriber at a time
		batchLimit := 1000 // The total number of records to process in the batch

		// Process all new/updated records once
		if err := s.RunBatch(bh, batchSize, batchLimit); err != nil {
			return err
		}

		// Call s.Close() when finished to stop listening and clean up
		return nil
	}

  • The sub data handler receives a notification of new or updated pub data containing: the pub id, data type, data id, deleted flag, JSON representation of the pub data (if set), previous hash (if set), and the new pub data version (incremented every time a record is published).
  • The subscriber has a built-in retry system. When a sub data handler returns an error, the subscriber increments the record's retry count and tries again the next time it is called. Once the retry count exceeds the subscriber's configured retry limit, the record is marked as failed and, if set, the error handler is called. The subscriber can be configured with 0 retries, in which case a record fails on its first error.
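The retry rules above can be modelled as a small state transition. According to the SubBatchHandler documentation, a Push error is applied to every pending record in the batch. The sketch below follows that rule. `record`, `status` and `markFailure` are hypothetical names. The real subscriber keeps this state in the database rather than in memory.

```go
package main

import (
	"errors"
	"fmt"
)

type status string

const (
	statusPending status = "pending"
	statusFailed  status = "failed"
)

// record is a hypothetical in-memory stand-in for a sub data record.
type record struct {
	ID      string
	Retries int
	Status  status
	LastErr error
}

// markFailure applies a handler error to every record in the batch: each
// record's retry count is incremented, and once it exceeds maxRetries the
// record is marked failed and onFail (if non-nil) is called.
func markFailure(batch []*record, err error, maxRetries int, onFail func(error)) {
	for _, r := range batch {
		r.Retries++
		r.LastErr = err
		if r.Retries > maxRetries {
			r.Status = statusFailed
			if onFail != nil {
				onFail(err)
			}
		}
	}
}

func main() {
	r := &record{ID: "data-id", Status: statusPending}
	// With maxRetries = 0, the first error marks the record as failed.
	markFailure([]*record{r}, errors.New("push failed"), 0, func(err error) {
		fmt.Println("failed:", err)
	})
	fmt.Println(r.Status, r.Retries) // failed 1
}
```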
Missing features to be implemented:
  • API to create publishers, subscribers and pub/sub links. Currently, they must be created manually in the database
  • Optionally utilize the process package to parallelize processing of sub data
  • Optionally guarantee that no duplicates are sent. This will slow down processing, since each record must be confirmed as processed before continuing

Documentation

Constants

const (
	// Error constants
	ECode070101 = e.Code0701 + "01"
	ECode070102 = e.Code0701 + "02"
	ECode070103 = e.Code0701 + "03"
	ECode070104 = e.Code0701 + "04"
	ECode070105 = e.Code0701 + "05"
)
const (
	// Error constants
	ECode070D01 = e.Code070D + "01"
	ECode070D02 = e.Code070D + "02"
	ECode070D03 = e.Code070D + "03"
	ECode070D04 = e.Code070D + "04"
	ECode070D05 = e.Code070D + "05"
)
const (

	// Error constants
	ECode070201 = e.Code0702 + "01"
	ECode070202 = e.Code0702 + "02"
)
const (
	// Error constants
	ECode070A01 = e.Code070A + "01"
	ECode070A02 = e.Code070A + "02"
	ECode070A03 = e.Code070A + "03"
	ECode070A04 = e.Code070A + "04"
	ECode070A05 = e.Code070A + "05"
	ECode070A06 = e.Code070A + "06"
	ECode070A07 = e.Code070A + "07"
	ECode070A08 = e.Code070A + "08"
	ECode070A09 = e.Code070A + "09"
	ECode070A0A = e.Code070A + "0A"
	ECode070A0B = e.Code070A + "0B"
	ECode070A0C = e.Code070A + "0C"
)
const (
	// Error constants
	ECode070B01 = e.Code070B + "01"
	ECode070B02 = e.Code070B + "02"
	ECode070B03 = e.Code070B + "03"
	ECode070B04 = e.Code070B + "04"
	ECode070B05 = e.Code070B + "05"
)
const (

	// Error constants
	ECode070301 = e.Code0703 + "01"
	ECode070302 = e.Code0703 + "02"
	ECode070303 = e.Code0703 + "03"
	ECode070304 = e.Code0703 + "04"
	ECode070305 = e.Code0703 + "05"
	ECode070306 = e.Code0703 + "06"
	ECode070307 = e.Code0703 + "07"
	ECode070308 = e.Code0703 + "08"
	ECode070309 = e.Code0703 + "09"
	ECode07030A = e.Code0703 + "0A"
	ECode07030B = e.Code0703 + "0B"
	ECode07030C = e.Code0703 + "0C"
)
const (
	// Error constants
	ECode070401 = e.Code0704 + "01"
	ECode070402 = e.Code0704 + "02"
)
const (
	// Error constants
	ECode070C01 = e.Code070C + "01"
)
const (
	MIGRATION_CODE = "pubsub"
)

Variables

This section is empty.

Functions

func GetMigrationList

func GetMigrationList() (ml *migration.List)

GetMigrationList returns this package's migration list.

func GetPublisher

func GetPublisher(db *sql.Connection, code string) (p *model.Pub, err error)

GetPublisher returns the pub record if it exists

func Publish

func Publish(db *sql.Connection, p PublishParam) (version int, err error)

Publish upserts a pub data record for the specified data type/id. If the record already exists, it updates the deleted field and the JSON value and increments the version.
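The upsert behaviour can be illustrated with an in-memory map. This sketch assumes that a newly published record starts at version 1; the real starting value is not documented here. `store`, `dataRecord` and `publish` are hypothetical names. The real function writes to the database.

```go
package main

import "fmt"

// key identifies a pub data record by type and id (hypothetical).
type key struct{ Type, ID string }

// dataRecord is a hypothetical in-memory pub data record.
type dataRecord struct {
	Deleted bool
	JSON    []byte
	Version int
}

// store is a hypothetical in-memory stand-in for the pub data table.
type store map[key]*dataRecord

// publish inserts a record at version 1 (assumption) or, if it already
// exists, overwrites Deleted and JSON and increments Version.
func (s store) publish(dataType, id string, deleted bool, payload []byte) int {
	k := key{dataType, id}
	rec, ok := s[k]
	if !ok {
		rec = &dataRecord{}
		s[k] = rec
	}
	rec.Deleted = deleted
	rec.JSON = payload
	rec.Version++
	return rec.Version
}

func main() {
	s := store{}
	fmt.Println(s.publish("data-type", "data-id", false, []byte(`{}`))) // 1
	fmt.Println(s.publish("data-type", "data-id", true, nil))           // 2 (soft delete)
}
```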

func PublishList

func PublishList(db *sql.Connection, list []PublishParam) (successCount int, err error)

PublishList upserts the list of pub data records. If a record already exists, it updates the deleted field and the JSON value and increments the version for that record. If the list contains duplicate records, it is not guaranteed which one will be saved. This only matters if the duplicates differ in their deleted or JSON values.
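Which duplicate PublishList saves is not guaranteed. If that matters to a caller, the caller can collapse duplicates before making the call. The sketch below keeps the last occurrence of each type/id pair and preserves first-seen order. `publishParam` is a local copy of the documented PublishParam fields, and `dedupeLast` is a hypothetical helper.

```go
package main

import "fmt"

// publishParam mirrors the documented pubsub.PublishParam fields.
type publishParam struct {
	PublishID int
	Type      string
	ID        string
	Deleted   bool
	JSON      []byte
}

// dedupeLast removes duplicate type/id entries, keeping the last occurrence
// of each while preserving the order in which each pair was first seen.
func dedupeLast(list []publishParam) []publishParam {
	index := make(map[[2]string]int, len(list)) // type/id -> position in out
	out := make([]publishParam, 0, len(list))
	for _, p := range list {
		k := [2]string{p.Type, p.ID}
		if i, ok := index[k]; ok {
			out[i] = p // a later entry replaces the earlier one
			continue
		}
		index[k] = len(out)
		out = append(out, p)
	}
	return out
}

func main() {
	list := []publishParam{
		{PublishID: 1, Type: "data-type", ID: "a", Deleted: false},
		{PublishID: 1, Type: "data-type", ID: "b"},
		{PublishID: 1, Type: "data-type", ID: "a", Deleted: true},
	}
	for _, p := range dedupeLast(list) {
		fmt.Println(p.ID, p.Deleted)
	}
	// a true
	// b false
}
```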

Types

type BatchPublisher

type BatchPublisher struct {
	// contains filtered or unexported fields
}

BatchPublisher is a helper for publishing records in batches. Initialize it and set the batch size, then call Add; records are committed automatically when the batch size is reached. Call Flush to commit any pending records and Close when finished.

func NewBatchPublisher

func NewBatchPublisher(db *sql.Connection, p *BatchPublisherParam) (bp *BatchPublisher)

NewBatchPublisher creates a new batch publisher that upserts pub data records in batches.

func (*BatchPublisher) Add

func (bp *BatchPublisher) Add(pp PublishParam) (commitCount int, err error)

Add adds the record to the pending publish list. If the size of the list exceeds the batch size, then it will automatically commit the pending records.

func (*BatchPublisher) Close

func (bp *BatchPublisher) Close() (errList []error)

Close closes any open database statements. This should be called when finished with the batch publishing.

func (*BatchPublisher) Flush

func (bp *BatchPublisher) Flush() (commitCount int, err error)

Flush commits any pending records.

func (*BatchPublisher) GetBatchSize

func (bp *BatchPublisher) GetBatchSize() (batchSize uint)

GetBatchSize returns the currently set batch size.

func (*BatchPublisher) GetList

func (bp *BatchPublisher) GetList() (mList []*model.Data)

GetList returns the current list of records.

func (*BatchPublisher) SetBatchSize

func (bp *BatchPublisher) SetBatchSize(size uint) (actualBatchSize uint)

SetBatchSize sets the batch size. The actual batch size may be reduced if it exceeds the maximum number of rows allowed per insert, which depends on the number of columns in the bulk insert.
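The row cap follows from a per-statement limit on bind parameters. A bulk insert with c columns and n rows uses n × c parameters, so n can be at most limit / c. The sketch below assumes a limit of 65535, which is the PostgreSQL wire protocol limit. The column count the package actually uses is not documented here, so `columns` is a placeholder. `capBatchSize` is a hypothetical name.

```go
package main

import "fmt"

// maxBindParams is an assumed per-statement parameter limit
// (the PostgreSQL wire protocol allows at most 65535).
const maxBindParams = 65535

// capBatchSize returns size, reduced if necessary so that
// rows*columns stays within maxBindParams.
func capBatchSize(size, columns uint) uint {
	if columns == 0 {
		return size
	}
	maxRows := maxBindParams / columns
	if size > maxRows {
		return maxRows
	}
	return size
}

func main() {
	fmt.Println(capBatchSize(100, 6))   // 100
	fmt.Println(capBatchSize(50000, 6)) // 10922
}
```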

func (*BatchPublisher) SetPostCommit

func (bp *BatchPublisher) SetPostCommit(f func(commitCount int) error)

SetPostCommit sets the post-commit function, which is called right after each commit. Set it to nil to disable.

func (*BatchPublisher) SetPreCommit

func (bp *BatchPublisher) SetPreCommit(f func() error)

SetPreCommit sets the pre-commit function, which is called right before each commit. Set it to nil to disable.

type BatchPublisherParam

type BatchPublisherParam struct {
	BatchSize  uint                           // The size of each batch
	PreCommit  func() error                   // Called right before a commit of records
	PostCommit func(committedCount int) error // Called right after a commit of records
}

BatchPublisherParam holds the parameters passed to NewBatchPublisher.
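The Add/Flush cycle, together with the PreCommit and PostCommit hooks, can be sketched as a generic in-memory batcher. The sketch makes three assumptions:

- `batcher` and its `commit` callback are hypothetical names.
- It commits once the pending list reaches the batch size. The docs describe this trigger both as "reached" and as "exceeds", so the exact boundary is an assumption.
- The real BatchPublisher performs a bulk upsert rather than calling a callback.

```go
package main

import "fmt"

// batcher is a hypothetical stand-in for BatchPublisher.
type batcher[T any] struct {
	size       int
	pending    []T
	commit     func([]T) error           // writes the records (hypothetical)
	preCommit  func() error              // optional, called before each commit
	postCommit func(committed int) error // optional, called after each commit
}

// Add queues an item and commits automatically when the batch is full.
func (b *batcher[T]) Add(item T) (int, error) {
	b.pending = append(b.pending, item)
	if len(b.pending) >= b.size {
		return b.Flush()
	}
	return 0, nil
}

// Flush commits any pending items and returns how many were committed.
func (b *batcher[T]) Flush() (int, error) {
	if len(b.pending) == 0 {
		return 0, nil
	}
	if b.preCommit != nil {
		if err := b.preCommit(); err != nil {
			return 0, err
		}
	}
	if err := b.commit(b.pending); err != nil {
		return 0, err // pending items are kept so a later Flush can retry
	}
	n := len(b.pending)
	b.pending = nil // start a fresh slice so commit may keep the old one
	if b.postCommit != nil {
		if err := b.postCommit(n); err != nil {
			return n, err
		}
	}
	return n, nil
}

func main() {
	b := &batcher[string]{
		size:       2,
		commit:     func(items []string) error { fmt.Println("commit", items); return nil },
		postCommit: func(n int) error { fmt.Println("committed", n); return nil },
	}
	for _, id := range []string{"a", "b", "c"} {
		if _, err := b.Add(id); err != nil {
			fmt.Println(err)
		}
	}
	b.Flush() // commits the remaining "c"
}
```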

type Event

type Event struct {
	PubID        int    `json:"pubId"`
	Type         string `json:"dataType"`
	ID           string `json:"dataId"`
	Deleted      bool   `json:"deleted"`
	Version      int    `json:"version"`
	PreviousHash string `json:"-"`
	NewHash      string `json:"-"`
	NewJSON      []byte `json:"-"`
	// contains filtered or unexported fields
}

Event is the expected JSON from a skyrin_dps_notify call.
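Only the JSON-tagged fields of Event are part of the notify payload. With encoding/json, fields tagged `json:"-"` (the hashes and NewJSON) are never filled in by decoding. The sketch below decodes a payload into a local struct that copies the documented tags. `eventPayload` is a hypothetical name, and the sample payload is illustrative rather than captured from the package.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// eventPayload copies the JSON-tagged fields of pubsub.Event.
type eventPayload struct {
	PubID   int    `json:"pubId"`
	Type    string `json:"dataType"`
	ID      string `json:"dataId"`
	Deleted bool   `json:"deleted"`
	Version int    `json:"version"`
}

// decodeEvent parses a notify payload into an eventPayload.
func decodeEvent(payload []byte) (eventPayload, error) {
	var ev eventPayload
	err := json.Unmarshal(payload, &ev)
	return ev, err
}

func main() {
	ev, err := decodeEvent([]byte(`{"pubId":1,"dataType":"data-type","dataId":"data-id","deleted":false,"version":3}`))
	if err != nil {
		fmt.Println(err)
		return
	}
	fmt.Printf("%+v\n", ev)
}
```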

func (*Event) Error

func (ev *Event) Error(err error)

Error sets an error for the event. If an error is set, it is automatically saved with the event when the status is updated.

func (*Event) GetEventJSON

func (ev *Event) GetEventJSON(db *sql.Connection) (b []byte, err error)

GetEventJSON retrieves the new JSON from the event record.

type PublishParam

type PublishParam struct {
	PublishID int    // The publisher id
	Type      string // The data type
	ID        string // The data id
	Deleted   bool   // Indicates if the item was deleted or not
	JSON      []byte // Optional JSON bytes representing the object
}

PublishParam holds the parameters for the Publish func.

type SubBatchHandler

type SubBatchHandler interface {
	// Push is called for each batch of sub data to send to a subscriber. The subscriber should load and process
	// the data as needed. If an error is returned, all the pending records will be marked with this error and
	// their retries will be incremented. If any record has reached its retry limit, it will be marked as failed
	// and will not be pushed again until it is changed or it is manually reset to pending.
	Push([]*Event) (err error)
}

SubBatchHandler defines the logic to send the publish events for the specific subscriber.

type SubDataListener

type SubDataListener interface {
	// Send should send the publish event for the subscriber, returning the hash of the
	// object sent and optionally a JSON representation of the object. If it failed, it
	// should return an error.
	Send(ev *Event) (hash string, jsonBytes []byte, err error)
}

SubDataListener defines the logic to send the publish event for a listening subscriber.
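Send must return a hash of the object it sent. The README example builds that hash from the data id and version. If a content hash suits your subscriber better, one option is a SHA-256 digest of the JSON that was sent. `hashJSON` is a hypothetical helper and is not part of pubsub.

```go
package main

import (
	"crypto/sha256"
	"encoding/hex"
	"fmt"
)

// hashJSON returns a hex-encoded SHA-256 digest of the JSON bytes, usable
// as the hash value returned from a SubDataListener's Send method.
func hashJSON(b []byte) string {
	sum := sha256.Sum256(b)
	return hex.EncodeToString(sum[:])
}

func main() {
	body := []byte(`{"type":"data-type","id":"data-id"}`)
	fmt.Println(hashJSON(body))
}
```

The digest depends on the exact bytes, so differences in key order or whitespace produce different hashes even when the objects are logically equal.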

type Subscriber

type Subscriber struct {
	// contains filtered or unexported fields
}

Use NewSubscriber to initialize a Subscriber, then either listen for pub data or process new/updated pub data in the skyrin_dps_pub table.

func NewSubscriber

func NewSubscriber(db *sql.Connection, code string) (s *Subscriber, err error)

NewSubscriber initializes the subscriber and processes any pending sub data records

func (*Subscriber) Close

func (s *Subscriber) Close() (err error)

Close stops listening and cleans up.

func (*Subscriber) GetStats

func (s *Subscriber) GetStats() (total, success, retry, fail int)

GetStats returns counts of how many records have been processed:
  • total: total number of records
  • success: number of records sent successfully
  • retry: number of records that failed but are marked to be retried
  • fail: number of records that failed and will not be retried automatically, because the retry limit has been exceeded

func (*Subscriber) Listen

func (s *Subscriber) Listen(cp *sql.ConnParam, sdl SubDataListener) (err error)

Listen listens for change events on records in the skyrin_dps_data table. When an insert or update occurs, an event is triggered with a JSON string. The subscriber checks whether the pubId matches a linked publisher and, if it does, processes that record.
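The flow Listen describes has three steps: decode the JSON payload, ignore events from publishers that are not linked, and send the rest. The sketch below models that flow with a channel standing in for the database notifications. `handleNotifications`, `notification`, the channel source and the `linked` set are all hypothetical. This is not how the package receives events internally.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// notification holds the payload fields this sketch needs.
type notification struct {
	PubID   int    `json:"pubId"`
	Type    string `json:"dataType"`
	ID      string `json:"dataId"`
	Version int    `json:"version"`
}

// handleNotifications decodes each payload, skips events whose pubId is not
// in linked, and passes the rest to send. Decode and send errors go to onErr
// without stopping the loop, which returns when payloads is closed.
func handleNotifications(payloads <-chan []byte, linked map[int]bool,
	send func(notification) error, onErr func(error)) {
	for p := range payloads {
		var n notification
		if err := json.Unmarshal(p, &n); err != nil {
			onErr(err)
			continue
		}
		if !linked[n.PubID] {
			continue // not a publisher this subscriber is linked to
		}
		if err := send(n); err != nil {
			onErr(err)
		}
	}
}

func main() {
	ch := make(chan []byte, 2)
	ch <- []byte(`{"pubId":1,"dataType":"t","dataId":"a","version":1}`)
	ch <- []byte(`{"pubId":2,"dataType":"t","dataId":"b","version":1}`)
	close(ch)
	handleNotifications(ch, map[int]bool{1: true},
		func(n notification) error { fmt.Println("send", n.ID, n.Version); return nil },
		func(err error) { fmt.Println("error:", err) })
}
```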

func (*Subscriber) Populate

func (s *Subscriber) Populate() (err error)

Populate creates missing sub data records and updates existing ones.

func (*Subscriber) PopulateAndRun

func (s *Subscriber) PopulateAndRun(sbh SubBatchHandler, batchSize, batchLimit int) (err error)

PopulateAndRun is a helper that calls Populate and then Run.

func (*Subscriber) Run

func (s *Subscriber) Run(sbh SubBatchHandler, batchSize, batchLimit int) (err error)

Run runs the batch process for the specified number of records (batchLimit).

func (*Subscriber) RunBatch

func (s *Subscriber) RunBatch(sbh SubBatchHandler, batchSize, batchLimit int) (err error)

RunBatch runs the batch process for the specified number of records (batchLimit).

func (*Subscriber) SetErrorHandler

func (s *Subscriber) SetErrorHandler(f func(err error))

SetErrorHandler sets the error handler. If a record still fails after the subscriber's configured number of retries, it is marked as failed and this handler, if set, is called.

func (*Subscriber) SetMaxGoRoutines

func (s *Subscriber) SetMaxGoRoutines(max uint)

SetMaxGoRoutines sets the maximum number of goroutines to use while processing the subscriber data.

Directories

Path Synopsis
internal
