
package bqlog

import ""

Package bqlog provides a mechanism to asynchronously log rows to BigQuery.

It uses Pull Task Queues as a temporary buffer for rows. The main use case is logging events from online RPC handlers that are supposed to be fast and reliable (and must not depend on BigQuery latency or availability). If you need to upload a large number of events at once, or you are doing offline batch processing, it is better to use the BigQuery API directly: it will be much cheaper and faster.

This is a relatively low-level library, not a complete end-to-end solution. It doesn't make many assumptions about the nature of logged events, and some defaults are likely to need adjustment to support a specific rate and size of events.

It also doesn't try to guess how often to flush events or how to parallelize this process. Users are responsible for calling 'Flush' periodically themselves (preferably from a dedicated GAE module, since flushes are CPU intensive if there's a large number of pending events).


type Entry

type Entry struct {
    // InsertID is used to deduplicate entries.
    // Will be autogenerated if empty. All autogenerated IDs start with 'bqlog:'.
    InsertID string

    // Data is JSON-serializable body of the entry.
    // It must match the table schema, otherwise 'Flush' will skip this entry.
    Data map[string]interface{}
}

Entry is a single structured entry in the log.
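
Since 'Data' has to be JSON-serializable, it can be useful to check that before buffering an entry. The following is a minimal sketch, not part of bqlog: the 'Entry' type is a local mirror of the documented struct so the example compiles on its own, 'checkSerializable' is a hypothetical helper, and the column names are made up. It only checks that the data can be encoded as JSON; it cannot tell whether the data matches the table schema.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// Entry mirrors the documented bqlog.Entry struct so that this sketch is
// self-contained. Real code would use the type from the package.
type Entry struct {
	InsertID string
	Data     map[string]interface{}
}

// checkSerializable is a hypothetical helper (not part of bqlog). It reports
// whether the entry's Data can be encoded as JSON at all. It does not verify
// that Data matches the BigQuery table schema.
func checkSerializable(e Entry) error {
	if _, err := json.Marshal(e.Data); err != nil {
		return fmt.Errorf("entry %q: data is not JSON-serializable: %w", e.InsertID, err)
	}
	return nil
}

func main() {
	good := Entry{
		// Explicit ID. Per the docs, an empty ID would be autogenerated.
		InsertID: "request-1234",
		Data: map[string]interface{}{
			"method":     "GetThing", // hypothetical column
			"latency_ms": 42,         // hypothetical column
		},
	}
	// Functions cannot be encoded as JSON, so this entry is rejected.
	bad := Entry{Data: map[string]interface{}{"callback": func() {}}}

	fmt.Println("good:", checkSerializable(good))
	fmt.Println("bad: ", checkSerializable(bad))
}
```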

type Log

type Log struct {
    // QueueName is a name of a pull queue to use as a buffer for inserts.
    // Required. It must be defined in queue.yaml file and it must not be used by
    // any other Log object.
    QueueName string

    // ProjectID is Cloud Project that owns the dataset.
    // If empty, will be derived from the current app ID.
    ProjectID string

    // DatasetID identifies the already existing dataset that contains the table.
    // Required.
    DatasetID string

    // TableID identifies the name of the table in the dataset.
    // Required. The table must exist already.
    TableID string

    // BatchesPerRequest is how many batches of entries to send in one BQ insert.
    // A call to 'Insert' generates one batch of entries, thus BatchesPerRequest
    // essentially specifies how many 'Insert's to clump together when sending
    // data to BigQuery. If your Inserts are known to be huge, lowering this value
    // may help to avoid hitting memory limits.
    // Default is 250. It assumes your batches are very small (1-3 rows), which
    // is usually the case if events are generated by online RPC handlers.
    BatchesPerRequest int

    // MaxParallelUploads is how many parallel ops to do when flushing.
    // We limit it to avoid hitting OOM errors on GAE.
    // Default is 64.
    MaxParallelUploads int

    // FlushTimeout is maximum duration to spend in fetching from Pull Queue in
    // 'Flush'.
    // We limit it to make sure 'Flush' has a chance to finish running before
    // GAE kills it by deadline. Next time 'Flush' is called, it will resume
    // flushing from where it left off.
    // Note that 'Flush' can run for slightly longer, since it waits for all
    // pulled data to be flushed before returning.
    // Default is 1 min.
    FlushTimeout time.Duration

    // DumpEntriesToLogger makes 'Insert' log all entries (at debug level).
    DumpEntriesToLogger bool

    // DryRun disables the actual uploads (keeps the local logging though).
    DryRun bool

    // contains filtered or unexported fields
}

Log can be used to insert entries into a BigQuery table.
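
To get a feel for how 'BatchesPerRequest' and 'MaxParallelUploads' interact, a rough back-of-the-envelope estimate can help. The sketch below uses the documented defaults (250 and 64). It assumes that each parallel upload carries one full insert request, which is a simplification for illustration and not something this page confirms. The helper names are hypothetical.

```go
package main

import "fmt"

const (
	defaultBatchesPerRequest  = 250 // documented default
	defaultMaxParallelUploads = 64  // documented default
)

// rowsPerRequest estimates how many rows end up in a single BigQuery insert
// when every batch (one 'Insert' call) holds rowsPerBatch rows.
func rowsPerRequest(batchesPerRequest, rowsPerBatch int) int {
	return batchesPerRequest * rowsPerBatch
}

// rowsInFlight estimates an upper bound on rows held in memory during a
// flush, ASSUMING each parallel upload carries one full request.
func rowsInFlight(maxParallelUploads, batchesPerRequest, rowsPerBatch int) int {
	return maxParallelUploads * rowsPerRequest(batchesPerRequest, rowsPerBatch)
}

func main() {
	// 1-3 rows per batch is the case the defaults are tuned for; 100 rows per
	// batch shows why huge inserts may call for a lower BatchesPerRequest.
	for _, rows := range []int{1, 3, 100} {
		fmt.Printf("rows/batch=%d rows/request=%d rows in flight=%d\n",
			rows,
			rowsPerRequest(defaultBatchesPerRequest, rows),
			rowsInFlight(defaultMaxParallelUploads, defaultBatchesPerRequest, rows))
	}
}
```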

func (*Log) Flush

func (l *Log) Flush(ctx context.Context) (int, error)

Flush pulls buffered rows from Pull Queue and sends them to BigQuery.

Must be called periodically from some cron job. It is okay to call 'Flush' concurrently from multiple processes to speed up the upload.

It succeeds if all entries it attempted to send were successfully handled by BigQuery. If some entries are malformed, it logs the error and skips them, so they don't get stuck in the pending buffer forever. This corresponds to 'skipInvalidRows=true' in the BigQuery 'insertAll' call.

Returns the number of rows sent to BigQuery. May return both a non-zero number of rows and an error if something went wrong midway.

func (*Log) Insert

func (l *Log) Insert(ctx context.Context, entries ...Entry) (err error)

Insert adds a bunch of entries to the buffer of pending entries.

It will reuse an existing datastore transaction (if any). This allows logging entries transactionally when changing something in the datastore.

Malformed entries are silently skipped during the flush.

Package bqlog imports 24 packages and is imported by 6 packages. Updated 2018-08-19.