pipeline

package
v0.10.4
Warning

This package is not in the latest version of its module.

Published: Jul 22, 2021 License: Apache-2.0 Imports: 32 Imported by: 0

Documentation

Constants

This section is empty.

Variables

This section is empty.

Functions

func NewRepo

func NewRepo(db *sql.DB) *sqlRepo

func NewSubscription

func NewSubscription(cfg *config.Config) (*pubsub.Subscription, error)

func PublishFiles

func PublishFiles(pub XferPublisher, xfer *client.Transfer, files []*ach.File) error

PublishFiles attempts to upload every file to the Pipeline and returns all errors as a base.ErrorList.

Publishing is attempted for all files because downstream processors are expected to de-duplicate files.
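
The attempt-everything behavior described above can be sketched as follows. This is a minimal, self-contained illustration rather than the package's implementation: `file`, `publisher`, `flakyPublisher` and `publishAll` are hypothetical stand-ins, and the standard library's `errors.Join` (Go 1.20+) is used in place of `base.ErrorList`.

```go
package main

import (
	"errors"
	"fmt"
)

// file is a hypothetical stand-in for *ach.File.
type file struct{ ID string }

// publisher is a hypothetical, reduced version of XferPublisher.
type publisher interface {
	Upload(f file) error
}

// flakyPublisher fails for the IDs listed in failFor and records every attempt.
type flakyPublisher struct {
	failFor   map[string]bool
	attempted []string
}

func (p *flakyPublisher) Upload(f file) error {
	p.attempted = append(p.attempted, f.ID)
	if p.failFor[f.ID] {
		return fmt.Errorf("upload of %s failed", f.ID)
	}
	return nil
}

// publishAll tries every file, even after a failure, and returns all
// collected errors joined together (nil when every upload succeeded).
func publishAll(pub publisher, files []file) error {
	var errs []error
	for _, f := range files {
		if err := pub.Upload(f); err != nil {
			errs = append(errs, err)
		}
	}
	return errors.Join(errs...)
}

func main() {
	pub := &flakyPublisher{failFor: map[string]bool{"b": true}}
	err := publishAll(pub, []file{{ID: "a"}, {ID: "b"}, {ID: "c"}})
	fmt.Println(len(pub.attempted), err)
}
```

Continuing past a failed upload is only safe if a later retry of the whole batch is harmless, which is why the de-duplication expectation on downstream processors matters.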

Types

type CanceledTransfer

type CanceledTransfer struct {
	TransferID string `json:"transferID"`
}

type CutoffCallback

type CutoffCallback func() error

CutoffCallback is a function called before cutoff processing is performed.

type MockPublisher

type MockPublisher struct {
	Xfers   map[string]Xfer
	Cancels map[string]CanceledTransfer

	Err error
}

func NewMockPublisher

func NewMockPublisher() *MockPublisher

func (*MockPublisher) Cancel

func (p *MockPublisher) Cancel(msg CanceledTransfer) error

func (*MockPublisher) Shutdown

func (p *MockPublisher) Shutdown(ctx context.Context)

func (*MockPublisher) Upload

func (p *MockPublisher) Upload(xfer Xfer) error

type MockXferMerging

type MockXferMerging struct {
	LatestXfer   *Xfer
	LatestCancel *CanceledTransfer

	Err error
	// contains filtered or unexported fields
}

func (*MockXferMerging) HandleCancel

func (merge *MockXferMerging) HandleCancel(cancel CanceledTransfer) error

func (*MockXferMerging) HandleXfer

func (merge *MockXferMerging) HandleXfer(xfer Xfer) error

func (*MockXferMerging) WithEachMerged

func (merge *MockXferMerging) WithEachMerged(func(*ach.File) error) (*processedTransfers, error)

type Repository

type Repository interface {
	MarkTransfersAsProcessed(transferIDs []string) error
}

type Xfer

type Xfer struct {
	Transfer *client.Transfer `json:"transfer"`
	File     *ach.File        `json:"file"`
}

type XferAggregator

type XferAggregator struct {
	// contains filtered or unexported fields
}

XferAggregator ...

The aggregator runs a loop that is triggered on each cutoff warning, e.g. 10 minutes before the 30 minutes before cutoff (10 minutes is Moov's window, 30 minutes is the ODFI's).

On each trigger it consumes as many transfers as possible, then uploads them.
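
The loop described above can be sketched with plain channels. This is an illustration under assumptions, not the aggregator's code: the `cutoffs` and `in` channels, the `drain` and `run` helpers, and the use of string transfer IDs are all hypothetical, and the timing of the cutoff warning itself is not modeled.

```go
package main

import (
	"context"
	"fmt"
)

// drain receives everything currently buffered on in without blocking.
func drain(in <-chan string) []string {
	var batch []string
	for {
		select {
		case id := <-in:
			batch = append(batch, id)
		default:
			return batch
		}
	}
}

// run waits for cutoff warnings. On each one it consumes as many pending
// transfers as possible and hands them to upload as a single batch.
func run(ctx context.Context, cutoffs <-chan struct{}, in <-chan string, upload func([]string)) {
	for {
		select {
		case <-ctx.Done():
			return
		case <-cutoffs:
			upload(drain(in))
		}
	}
}

func main() {
	ctx, cancel := context.WithCancel(context.Background())
	cutoffs := make(chan struct{})
	in := make(chan string, 8)
	batches := make(chan []string)
	done := make(chan struct{})

	go func() {
		defer close(done)
		run(ctx, cutoffs, in, func(b []string) { batches <- b })
	}()

	in <- "t1"
	in <- "t2"
	cutoffs <- struct{}{} // first cutoff warning
	fmt.Println(<-batches)

	in <- "t3"
	cutoffs <- struct{}{} // second cutoff warning
	fmt.Println(<-batches)

	cancel()
	<-done
}
```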

func NewAggregator

func NewAggregator(
	cfg *config.Config,
	agent upload.Agent,
	repo Repository,
	merger XferMerging,
	sub *pubsub.Subscription,
	cutoffCallbacks []CutoffCallback,
) (*XferAggregator, error)

func (*XferAggregator) RegisterRoutes

func (xfagg *XferAggregator) RegisterRoutes(svc *admin.Server)

func (*XferAggregator) Shutdown

func (xfagg *XferAggregator) Shutdown()

func (*XferAggregator) Start

func (xfagg *XferAggregator) Start(ctx context.Context, cutoffs *schedule.CutoffTimes)

type XferMerging

type XferMerging interface {
	HandleXfer(xfer Xfer) error
	HandleCancel(cancel CanceledTransfer) error

	WithEachMerged(func(*ach.File) error) (*processedTransfers, error)
}

XferMerging represents logic for accepting ACH files to be merged together.

The idea is to take Xfers and store them on a filesystem (or other durable storage) prior to a cutoff window. The specific storage location could be based on the FileHeader.

On the cutoff trigger, WithEachMerged is called to merge files together and offer each merged file for upload.

func NewMerging

func NewMerging(logger log.Logger, cfg config.Pipeline) (XferMerging, error)

type XferPublisher

type XferPublisher interface {
	Upload(xfer Xfer) error
	Cancel(msg CanceledTransfer) error
	Shutdown(ctx context.Context)
}

XferPublisher is an interface for pushing Transfers (and their ACH files) to be uploaded to an ODFI. Implementations may push Transfers onto streams (e.g. Kafka, RabbitMQ) or keep them in memory (the default in our OSS PayGate).

func NewPublisher

func NewPublisher(cfg config.Pipeline) (XferPublisher, error)
