bqloader

package module
v1.3.11 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jan 4, 2024 License: MIT Imports: 21 Imported by: 2

README

bqloader

PkgGoDev Main Branch Workflow Go Report Card codecov GitHub GitHub tag (latest SemVer)

bqloader is a simple ETL framework to load data from Cloud Storage into BigQuery.

Installation

go get -u go.nownabe.dev/bqloader

Getting Started with Pre-configured Handlers

See the example for full instructions.

To load certain CSV formats, you can use pre-configured handlers. See the full list.

package myfunc

import (
	"context"

	"go.nownabe.dev/bqloader"
	"go.nownabe.dev/bqloader/contrib/handlers"
)

var loader bqloader.BQLoader

// init builds the package-level loader and registers the pre-configured
// handlers at package initialization.
func init() {
	var err error
	// Fail fast: do not register handlers on a loader that failed to build.
	if loader, err = bqloader.New(); err != nil {
		panic(err)
	}

	// t produces the BigQuery table argument for a handler from a table ID,
	// using the project and dataset IDs taken from the environment.
	t := handlers.TableGenerator(os.Getenv("BIGQUERY_PROJECT_ID"), os.Getenv("BIGQUERY_DATASET_ID"))
	// n is the Slack notifier shared by all handlers registered below.
	n := &bqloader.SlackNotifier{
		Token:   os.Getenv("SLACK_TOKEN"),
		Channel: os.Getenv("SLACK_CHANNEL"),
	}

	handlers.MustAddHandlers(context.Background(), loader,
		/*
			Each call below builds a handler that loads CSVs. It takes four arguments:
			the handler name, a pattern matching file paths on Cloud Storage,
			a BigQuery table and a notifier.
		*/
		handlers.SBISumishinNetBankStatement("SBI Bank", `^sbi_bank/`, t("sbi_bank"), n),
		handlers.SMBCCardStatement("SMBC Card", `^smbc_card/`, t("smbc_card"), n),
	)
}

// BQLoad is the Cloud Functions entrypoint. It forwards the received event
// to the package-level loader and returns the error the loader reports.
func BQLoad(ctx context.Context, e bqloader.Event) error {
	err := loader.Handle(ctx, e)
	return err
}

Getting Started with Custom Handlers

(See the Quickstart example for full instructions.)

To load other CSVs, import the package go.nownabe.dev/bqloader and write your custom handler.

package myfunc

import (
	"context"
	"os"
	"regexp"
	"strings"
	"time"

	"golang.org/x/text/encoding/japanese"
	"golang.org/x/xerrors"

	"go.nownabe.dev/bqloader"
)

var loader bqloader.BQLoader

// init builds the package-level loader and registers the custom handler at
// package initialization.
func init() {
	var err error
	// Fail fast: do not register a handler on a loader that failed to build.
	if loader, err = bqloader.New(); err != nil {
		panic(err)
	}

	loader.MustAddHandler(context.Background(), newHandler())
}

// newHandler builds the handler for files whose path matches "^example_bank/".
// The handler reads Shift JIS encoded CSV, skips one leading (header) row,
// rewrites the date in the first column of each row and writes to the BigQuery
// table named by the BIGQUERY_* environment variables. Results are notified
// to the Slack channel named by the SLACK_* environment variables.
func newHandler() *bqloader.Handler {
	/*
		This projector converts date fields formatted as "2006/01/02"
		in the first column into strings like "2006-01-02", which satisfy
		the BigQuery date type. The record is modified in place and returned.
	*/
	projector := func(_ context.Context, r []string) ([]string, error) {
		// Guard against empty records so that r[0] returns an error instead of panicking.
		if len(r) == 0 {
			return nil, xerrors.New("record has no columns")
		}

		t, err := time.Parse("2006/01/02", r[0])
		if err != nil {
			return nil, xerrors.Errorf("Column 0 cannot parse as a date: %w", err)
		}

		r[0] = t.Format("2006-01-02")

		return r, nil
	}

	return &bqloader.Handler{
		Name:     "quickstart",                         // Handler name used in logs and notifications.
		Pattern:  regexp.MustCompile("^example_bank/"), // This handler processes files matched to this pattern.
		Encoding: japanese.ShiftJIS,                    // Source file encoding.
		Parser:   bqloader.CSVParser(),                 // Parser parses source file into records.
		Notifier: &bqloader.SlackNotifier{
			Token:   os.Getenv("SLACK_TOKEN"),
			Channel: os.Getenv("SLACK_CHANNEL"),
		},
		Projector:       projector, // Projector transforms each row.
		SkipLeadingRows: 1,         // Skip header row.

		// Destination.
		Project: os.Getenv("BIGQUERY_PROJECT_ID"),
		Dataset: os.Getenv("BIGQUERY_DATASET_ID"),
		Table:   os.Getenv("BIGQUERY_TABLE_ID"),
	}
}

// BQLoad is the Cloud Functions entrypoint. It forwards the received event
// to the package-level loader and returns the error the loader reports.
func BQLoad(ctx context.Context, e bqloader.Event) error {
	err := loader.Handle(ctx, e)
	return err
}

Diagram

diagram

Documentation

Overview

Package bqloader is a simple ETL framework running on Cloud Functions to load data from Cloud Storage into BigQuery.

Getting started with pre-configured handlers

See the example for full instructions: https://github.com/nownabe/go-bqloader/tree/main/examples/pre_configured_handlers

To load some types of CSV formats, you can use pre-configured handlers. See the full list on GitHub. https://github.com/nownabe/go-bqloader/tree/main/contrib/handlers

package myfunc

import (
	"context"

	"go.nownabe.dev/bqloader"
	"go.nownabe.dev/bqloader/contrib/handlers"
)

var loader bqloader.BQLoader

// init builds the package-level loader and registers the pre-configured
// handlers at package initialization.
func init() {
	var err error
	// Fail fast: do not register handlers on a loader that failed to build.
	if loader, err = bqloader.New(); err != nil {
		panic(err)
	}

	// t produces the BigQuery table argument for a handler from a table ID,
	// using the project and dataset IDs taken from the environment.
	t := handlers.TableGenerator(os.Getenv("BIGQUERY_PROJECT_ID"), os.Getenv("BIGQUERY_DATASET_ID"))
	// n is the Slack notifier shared by all handlers registered below.
	n := &bqloader.SlackNotifier{
		Token:   os.Getenv("SLACK_TOKEN"),
		Channel: os.Getenv("SLACK_CHANNEL"),
	}

	handlers.MustAddHandlers(context.Background(), loader,
		// Each call below builds a handler that loads CSVs. It takes four arguments:
		// the handler name, a pattern matching file paths on Cloud Storage,
		// a BigQuery table and a notifier.
		handlers.SBISumishinNetBankStatement("SBI Bank", `^sbi_bank/`, t("sbi_bank"), n),
		handlers.SMBCCardStatement("SMBC Card", `^smbc_card/`, t("smbc_card"), n),
	)
}

// BQLoad is the Cloud Functions entrypoint. It forwards the received event
// to the package-level loader and returns the error the loader reports.
func BQLoad(ctx context.Context, e bqloader.Event) error {
	err := loader.Handle(ctx, e)
	return err
}

Getting started with custom handlers

See the Quickstart example for full instructions: https://github.com/nownabe/go-bqloader/tree/main/examples/quickstart

To simply transform and load CSVs, import the package `go.nownabe.dev/bqloader` and write your own handler.

package myfunc

import (
	"context"
	"os"
	"regexp"
	"strings"
	"time"

	"golang.org/x/text/encoding/japanese"
	"golang.org/x/xerrors"

	"go.nownabe.dev/bqloader"
)

var loader bqloader.BQLoader

// init builds the package-level loader and registers the custom handler at
// package initialization.
func init() {
	var err error
	// Fail fast: do not register a handler on a loader that failed to build.
	if loader, err = bqloader.New(); err != nil {
		panic(err)
	}

	loader.MustAddHandler(context.Background(), newHandler())
}

// newHandler builds the handler for files whose path matches "^example_bank/".
// The handler reads Shift JIS encoded CSV, skips one leading (header) row,
// rewrites the date in the first column of each row and writes to the BigQuery
// table named by the BIGQUERY_* environment variables. Results are notified
// to the Slack channel named by the SLACK_* environment variables.
func newHandler() *bqloader.Handler {
	// This projector converts date fields formatted as "2006/01/02"
	// in the first column into strings like "2006-01-02", which satisfy
	// the BigQuery date type. The record is modified in place and returned.
	projector := func(_ context.Context, r []string) ([]string, error) {
		// Guard against empty records so that r[0] returns an error instead of panicking.
		if len(r) == 0 {
			return nil, xerrors.New("record has no columns")
		}

		t, err := time.Parse("2006/01/02", r[0])
		if err != nil {
			return nil, xerrors.Errorf("Column 0 cannot parse as a date: %w", err)
		}

		r[0] = t.Format("2006-01-02")

		return r, nil
	}

	return &bqloader.Handler{
		Name:     "quickstart",                         // Handler name used in logs and notifications.
		Pattern:  regexp.MustCompile("^example_bank/"), // This handler processes files matched to this pattern.
		Encoding: japanese.ShiftJIS,                    // Source file encoding.
		Parser:   bqloader.CSVParser(),                 // Parser parses source file into records.
		Notifier: &bqloader.SlackNotifier{
			Token:   os.Getenv("SLACK_TOKEN"),
			Channel: os.Getenv("SLACK_CHANNEL"),
		},
		Projector:       projector, // Projector transforms each row.
		SkipLeadingRows: 1,         // Skip header row.

		// Destination.
		Project: os.Getenv("BIGQUERY_PROJECT_ID"),
		Dataset: os.Getenv("BIGQUERY_DATASET_ID"),
		Table:   os.Getenv("BIGQUERY_TABLE_ID"),
	}
}

// BQLoad is the Cloud Functions entrypoint. It forwards the received event
// to the package-level loader and returns the error the loader reports.
func BQLoad(ctx context.Context, e bqloader.Event) error {
	err := loader.Handle(ctx, e)
	return err
}

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

This section is empty.

Types

type BQLoader

// BQLoader loads data from Cloud Storage into a BigQuery table.
// Handlers are added with AddHandler or MustAddHandler, and Cloud Storage
// events are passed to Handle.
type BQLoader interface {
	// AddHandler adds a handler to the loader and reports failure as an error.
	AddHandler(context.Context, *Handler) error
	// Handle handles the given Cloud Storage event.
	Handle(context.Context, Event) error
	// MustAddHandler adds a handler like AddHandler but has no error return.
	// NOTE(review): presumably it panics on failure, per the Go Must convention — confirm.
	MustAddHandler(context.Context, *Handler)
}

BQLoader loads data from Cloud Storage into a BigQuery table.

func New

func New(opts ...Option) (BQLoader, error)

New builds a new BQLoader.

type Event

// Event is an event from Cloud Storage.
type Event struct {
	// Name is decoded from the "name" JSON field.
	// NOTE(review): presumably the storage object name — confirm.
	Name        string    `json:"name"`
	// Bucket is decoded from the "bucket" JSON field.
	// NOTE(review): presumably the bucket holding the object — confirm.
	Bucket      string    `json:"bucket"`
	// TimeCreated is decoded from the "timeCreated" JSON field.
	TimeCreated time.Time `json:"timeCreated"`
	// contains filtered or unexported fields
}

Event is an event from Cloud Storage.

func (*Event) FullPath

func (e *Event) FullPath() string

FullPath returns the full path of the storage object, beginning with gs://.

type Extractor added in v1.3.0

// Extractor extracts data from a source such as Cloud Storage.
type Extractor interface {
	// Extract returns a reader for the data of the given event, a func(), and an error.
	// NOTE(review): the func() is presumably a cleanup callback to call when
	// reading is done — confirm.
	Extract(context.Context, Event) (io.Reader, func(), error)
}

Extractor extracts data from source such as Cloud Storage.

type Handler

// Handler defines how to handle events which match the specified pattern.
type Handler struct {
	// Name is the handler's name. It is used in logs and notifications.
	Name string

	Pattern         *regexp.Regexp    // Pattern selects the files this handler processes.
	Encoding        encoding.Encoding // Encoding is the source file encoding.
	Parser          Parser            // Parser parses the source file into records.
	Notifier        Notifier          // Notifier notifies the result of each event.
	Projector       Projector         // Projector transforms each source record.
	SkipLeadingRows uint              // SkipLeadingRows skips leading rows, e.g. 1 to skip a header row.
	Preprocessor    Preprocessor      // Preprocessor preprocesses the event and returns a context.

	// BatchSize specifies how many records are processed in a goroutine.
	// Default is 10000.
	BatchSize int

	// Project specifies the GCP project name of the destination BigQuery table.
	Project string

	// Dataset specifies the BigQuery dataset ID of the destination table.
	Dataset string

	// Table specifies the BigQuery table ID of the destination.
	Table string

	Extractor Extractor // Extractor extracts data from the source, such as Cloud Storage.
	Loader    Loader    // Loader loads projected data into the destination, such as BigQuery.
	// contains filtered or unexported fields
}

Handler defines how to handle events which match specified pattern.

func (*Handler) Handle added in v1.3.0

func (h *Handler) Handle(ctx context.Context, e Event) error

Handle handles events.

func (*Handler) SetConcurrency added in v1.3.0

func (h *Handler) SetConcurrency(n int)

SetConcurrency sets the handler's concurrency directly. Normally, set the concurrency on BQLoader with the WithConcurrency option instead.

type Loader added in v1.3.0

// Loader loads projected data into a destination such as BigQuery.
type Loader interface {
	// Load loads the given records, each record being a slice of string fields.
	Load(context.Context, [][]string) error
}

Loader loads projected data into a destination such as BigQuery.

type Notifier added in v0.4.0

// Notifier notifies results for each event.
type Notifier interface {
	// Notify notifies the given Result and reports failure as an error.
	Notify(context.Context, *Result) error
}

Notifier notifies results for each event.

type Option

// Option configures BQLoader. Options are passed to New; they are created
// with WithConcurrency, WithLogLevel and WithPrettyLogging.
type Option interface {
	// contains filtered or unexported methods
}

Option configures BQLoader.

func WithConcurrency added in v0.5.0

func WithConcurrency(n int) Option

WithConcurrency configures the concurrency of projectors. Before setting this, confirm GOMAXPROCS.

func WithLogLevel

func WithLogLevel(l string) Option

WithLogLevel configures log level to print logs. Allowed values are trace, debug, info, warn, error, fatal or panic.

func WithPrettyLogging

func WithPrettyLogging() Option

WithPrettyLogging configures BQLoader to print human friendly logs.

type Parser

// Parser parses files from storage: it reads the source from the io.Reader
// and returns the parsed records, each record being a slice of string fields.
type Parser func(context.Context, io.Reader) ([][]string, error)

Parser parses files from storage.

func CSVParser

func CSVParser() Parser

CSVParser provides a parser to parse CSV files.

type Preprocessor added in v1.2.0

// Preprocessor preprocesses an event and returns a context and an error.
// NOTE(review): the returned context presumably carries the data stored by
// the preprocessor for later stages — confirm.
type Preprocessor func(context.Context, Event) (context.Context, error)

Preprocessor preprocesses an event and stores data into a map.

type Projector

// Projector transforms a source record into a record for the destination.
// It receives one record as a slice of string fields and returns the
// transformed record or an error.
type Projector func(context.Context, []string) ([]string, error)

Projector transforms source records into records for destination.

type Result added in v0.4.0

// Result is the result for a single event. It is what a Notifier receives.
type Result struct {
	Event   Event    // Event is the event this result belongs to.
	Handler *Handler // Handler is the handler associated with this result.
	Error   error    // Error is the error for this event; presumably nil on success — confirm.
}

Result is a result for each event.

type SlackNotifier added in v0.4.0

// SlackNotifier is a notifier for Slack. It requires a bot token and
// permissions; the recommended permissions are chat:write,
// chat:write.customize and chat:write.public.
type SlackNotifier struct {
	Channel string // Channel is the Slack channel to notify.
	Token   string // Token is the Slack bot token.

	// IconEmoji is optional.
	IconEmoji string

	// Username is optional.
	Username string

	// HTTPClient is optional.
	HTTPClient *http.Client
	// contains filtered or unexported fields
}

SlackNotifier is a notifier for Slack. SlackNotifier requires bot token and permissions. Recommended permissions are chat:write, chat:write.customize and chat:write.public.

func (*SlackNotifier) Notify added in v0.4.0

func (n *SlackNotifier) Notify(ctx context.Context, r *Result) error

Notify notifies results to Slack channel.

Directories

Path Synopsis
contrib
handlers
Package handlers includes pre-configured handlers for bqloader.
Package handlers includes pre-configured handlers for bqloader.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL