process

package
v0.4.0
Warning: This package is not in the latest version of its module.
Published: Apr 24, 2023 License: Apache-2.0 Imports: 14 Imported by: 0

Documentation

Constants

This section is empty.

Variables

var (
	// MaxDelaySeconds is the maximum number of seconds to randomly wait in
	// response to BigQuery errors.
	MaxDelaySeconds = 60
	// QueryRetries is the maximum number of times to retry a query.
	QueryRetries = 2
	// ErrCorrupt may be returned by a processor implementation if the file
	// content should be considered corrupt and not included in the output archive.
	ErrCorrupt = errors.New("file content is corrupt")
)

Functions

This section is empty.

Types

type Copier

type Copier struct {
	Jobs    *jobs.Client
	Process Renamer
}

Copier manages bulk rename operations.

func (*Copier) ProcessDate

func (c *Copier) ProcessDate(ctx context.Context, date string) error

ProcessDate applies the renamer to the given date.

type Manager

type Manager[Row any] struct {
	Jobs              *jobs.Client
	Process           Processor[Row]
	QueryClient       query.Querier
	Query             string
	RetryQueryOnError bool
}

Manager uses a Processor to act on every result returned by the Querier. Manager is parameterized by the Row type, which is shared by the query result rows and the Processor.

func (*Manager[Row]) ProcessDate

func (r *Manager[Row]) ProcessDate(ctx context.Context, date string) error

ProcessDate processes all archives found on a given date.

func (*Manager[Row]) ProcessRow

func (r *Manager[Row]) ProcessRow(ctx context.Context, date string, row Row) error

ProcessRow acts on a single row for the given date. Typically the row represents a source archive with additional metadata needed for processing every file in the archive.

type Processor

type Processor[Row any] interface {
	// Init sets up the processor for processing the given date, e.g. downloading daily databases.
	Init(ctx context.Context, date string)
	// Source creates a new archive source to read archive files to process.
	Source(ctx context.Context, row Row) *archive.Source
	// File processes the given file content. File should only return ErrCorrupt
	// if the content is corrupt. If the file content cannot be processed for other
	// reasons, then return the original data with no error.
	File(h *tar.Header, b []byte) ([]byte, error)
	// Finish concludes an archive after all files have been processed.
	Finish(ctx context.Context, out *archive.Target) error
}

A Processor is used by the process Manager to act on the content of every file of every row archive. Processor uses a type parameter for the specific query row type.

type Renamer

type Renamer interface {
	List(ctx context.Context, date string) ([]string, error)
	Rename(ctx context.Context, url string) (string, error)
}

Renamer is an interface for types that support renaming.
