baker: github.com/AdRoll/baker

package baker

import "github.com/AdRoll/baker"

Package baker provides types and functions to build a pipeline for the processing of structured data.

Structured data is represented by the Record interface. LogLine implements that interface and represents a CSV record.

Using the functions in the package one can build and run a Topology, reading its configuration from a TOML file.

The package doesn't include any component. They can be found in their respective packages (baker/input, baker/filter, baker/output and baker/upload).

The README file in the project repository provides additional information and examples: https://github.com/AdRoll/baker/blob/main/README.md

Index

Package Files

api.go baker.go baker_cli.go config.go desc.go help.go help_config.go help_markdown.go help_text.go logline.go metrics.go metrics_client.go nop_metrics.go record.go stats.go term.go test_helper.go topology.go

Constants

const (
    // LogLineNumFields is the maximum number of standard fields in a log line.
    LogLineNumFields FieldIndex = 3000
    // NumFieldsBaker is an additional list of custom fields, not present
    // in the input logline nor in the output, that can be set during processing.
    // Its main purpose is to quickly exchange values between filters (and possibly
    // outputs) on a per-record basis.
    NumFieldsBaker FieldIndex = 100

    // DefaultLogLineFieldSeparator defines the default field separator, which is the comma
    DefaultLogLineFieldSeparator byte = 44
)

func CheckRequiredFields Uses

func CheckRequiredFields(cfg interface{}) string

CheckRequiredFields checks that all fields tagged as required in cfg's type have been set to a value other than the zero value of the field type. If not, CheckRequiredFields returns the name of the first required field that is not set. It returns an empty string if all required fields are set or if the struct doesn't have any required fields (or any fields at all).

CheckRequiredFields doesn't support struct embedding other structs.
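A check of this kind can be sketched with the standard reflect package. The sketch below is not Baker's implementation: firstMissingRequired and exampleConfig are hypothetical names, and the tag spelling `required:"true"` is an assumption based on the description of RequiredFields.

```go
package main

import (
	"fmt"
	"reflect"
)

// exampleConfig is a hypothetical component configuration.
type exampleConfig struct {
	Listener string `required:"true"`
	Timeout  int
}

// firstMissingRequired returns the name of the first field tagged
// required:"true" that still holds its zero value, or "" if there is none.
// It is an illustrative stand-in, not Baker's CheckRequiredFields.
func firstMissingRequired(cfg interface{}) string {
	v := reflect.ValueOf(cfg)
	if v.Kind() == reflect.Ptr {
		v = v.Elem()
	}
	if v.Kind() != reflect.Struct {
		return ""
	}
	t := v.Type()
	for i := 0; i < t.NumField(); i++ {
		f := t.Field(i)
		if f.Tag.Get("required") == "true" && v.Field(i).IsZero() {
			return f.Name
		}
	}
	return ""
}

func main() {
	fmt.Printf("%q\n", firstMissingRequired(&exampleConfig{}))
	fmt.Printf("%q\n", firstMissingRequired(&exampleConfig{Listener: ":6000"}))
}
```

Like the documented function, this sketch only looks at the top-level fields and does not descend into embedded structs.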

func GenerateMarkdownHelp Uses

func GenerateMarkdownHelp(w io.Writer, desc interface{}) error

GenerateMarkdownHelp generates markdown-formatted textual help for a Baker component from its description structure. Markdown is written into w.

func GenerateTextHelp Uses

func GenerateTextHelp(w io.Writer, desc interface{}) error

GenerateTextHelp generates non-formatted textual help for a Baker component from its description structure, into w.

func Main Uses

func Main(cfg *Config) error

Main runs the topology corresponding to the provided configuration. Depending on the input, it either blocks forever (daemon) or terminates when all the records have been processed (batch).

func MainCLI Uses

func MainCLI(components Components) error

MainCLI provides a handy way to quickly create a command-line interface to Baker by providing the list of components available to build and run a topology.

The function includes many utilities that can be configured by command line arguments:

-help: print the available options and components
-v: verbose logging (not compatible with -q)
-q: quiet logging (not compatible with -v)
-pretty: log in textual format instead of JSON
-pprof: run a pprof server on the provided host:port address

The function also expects the first positional (non-flag) argument to be the path to the Baker topology file.

func PrintHelp Uses

func PrintHelp(w io.Writer, name string, comp Components, format HelpFormat) error

PrintHelp prints the help message for the given component, identified by its name. When name is '*' it shows the help messages for all components.

The help message includes the component's description as well as the help messages for all component's configuration keys.

An example of usage is:

var flagPrintHelp = flag.String("help", "", "show help for a `component` ('*' for all)")
flag.Parse()
comp := baker.Components{ /* add all your baker components here */ }
baker.PrintHelp(os.Stderr, *flagPrintHelp, comp, baker.HelpFormatRaw)

Help output example:

$ ./baker-bin -help TCP

=============================================
Input: TCP
=============================================
This input relies on a TCP connection to receive records in the usual format
Configure it with a host and port that you want to accept connection from.
By default it listens on port 6000 for any connection
It never exits.

Keys available in the [input.config] section:

Name               | Type               | Default                    | Help
----------------------------------------------------------------------------------------------------
Listener           | string             |                            | Host:Port to bind to
----------------------------------------------------------------------------------------------------

func RecordConformanceTest Uses

func RecordConformanceTest(t *testing.T, create func() Record)

RecordConformanceTest is a test helper that verifies that a Record implementation conforms to a set of requirements.

func RenderHelpMarkdown Uses

func RenderHelpMarkdown(w io.Writer, name string, comp Components) error

RenderHelpMarkdown prints markdown formatted help for a single component or for all of them (with name = '*'), and renders it so that it can be printed on a terminal.

func RequiredFields Uses

func RequiredFields(cfg interface{}) []string

RequiredFields returns the names of the fields of the underlying configuration structure that are tagged as required. To tag a field as required, a "required" struct tag must be present and set to true.

RequiredFields doesn't support struct embedding other structs.

type Cache Uses

type Cache map[string]interface{}

Cache is a per-record cache.

func (*Cache) Clear Uses

func (c *Cache) Clear()

Clear clears all the entries in the cache.

func (*Cache) Del Uses

func (c *Cache) Del(key string)

Del removes the given cache entry.

func (*Cache) Get Uses

func (c *Cache) Get(key string) (val interface{}, ok bool)

Get fetches the value with the given key. If the key is not present in the cache Get returns (nil, false).

func (*Cache) Set Uses

func (c *Cache) Set(key string, val interface{})

Set assigns the given value to a specific key.
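The four Cache methods can be exercised with a small sketch. The Cache type below is a local stand-in with the documented shape (a map with pointer-receiver methods), written so the example runs on its own. Its method bodies are assumptions, and parsedPort and the "port" key are hypothetical.

```go
package main

import (
	"fmt"
	"strconv"
)

// Cache is a local stand-in mirroring the documented type and methods.
type Cache map[string]interface{}

func (c *Cache) Get(key string) (interface{}, bool) {
	v, ok := (*c)[key]
	return v, ok
}

func (c *Cache) Set(key string, val interface{}) {
	if *c == nil {
		*c = make(Cache)
	}
	(*c)[key] = val
}

func (c *Cache) Del(key string) { delete(*c, key) }

func (c *Cache) Clear() { *c = nil }

// parsedPort converts raw to an int, reusing the cached value when present.
func parsedPort(c *Cache, raw []byte) (int, error) {
	if v, ok := c.Get("port"); ok {
		return v.(int), nil // the caller must know the cached type
	}
	n, err := strconv.Atoi(string(raw))
	if err != nil {
		return 0, err
	}
	c.Set("port", n)
	return n, nil
}

func main() {
	var c Cache
	n, err := parsedPort(&c, []byte("6000"))
	_, cached := c.Get("port")
	fmt.Println(n, err, cached)
}
```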

type ComponentParams Uses

type ComponentParams struct {
    DecodedConfig  interface{}                     // decoded component-specific struct (from configuration file)
    CreateRecord   func() Record                   // factory function to create new empty records
    FieldByName    func(string) (FieldIndex, bool) // translates field names to Record indexes
    FieldNames     []string                        // FieldNames holds field names, indexed by their FieldIndex
    ValidateRecord ValidationFunc                  // function to validate a record
    Metrics        MetricsClient                   // Metrics allows components to add code instrumentation and have metrics exported to the configured backend, if any
}

ComponentParams holds the common configuration parameters passed to components of all kinds.

type Components Uses

type Components struct {
    Inputs  []InputDesc  // Inputs represents the list of available inputs
    Filters []FilterDesc // Filters represents the list of available filters
    Outputs []OutputDesc // Outputs represents the list of available outputs
    Uploads []UploadDesc // Uploads represents the list of available uploads

    Metrics []MetricsDesc // Metrics represents the list of available metrics clients
    User    []UserDesc    // User represents the list of user-defined configurations

    ShardingFuncs map[FieldIndex]ShardingFunc // ShardingFuncs are functions to calculate sharding based on field index
    Validate      ValidationFunc              // Validate is the function used to validate a Record
    CreateRecord  func() Record               // CreateRecord creates a new record

    FieldByName func(string) (FieldIndex, bool) // FieldByName gets a field index by its name
    FieldNames  []string                        // FieldNames holds field names, indexed by their FieldIndex
}

Components holds the descriptions of all components one can use to build a topology.

type Config Uses

type Config struct {
    Input       ConfigInput
    FilterChain ConfigFilterChain
    Filter      []ConfigFilter
    Output      ConfigOutput
    Upload      ConfigUpload

    General ConfigGeneral
    Fields  ConfigFields
    Metrics ConfigMetrics
    CSV     ConfigCSV
    User    []ConfigUser
    // contains filtered or unexported fields
}

A Config specifies the configuration for a topology.

func NewConfigFromToml Uses

func NewConfigFromToml(f io.Reader, comp Components) (*Config, error)

NewConfigFromToml creates a Config from a reader reading from a TOML configuration. comp describes all the existing components.

func (*Config) String Uses

func (c *Config) String() string

String returns a string representation of the exported fields of c.

type ConfigCSV Uses

type ConfigCSV struct {
    // FieldSeparator defines the fields separator used in the records
    FieldSeparator string `toml:"field_separator"`
}

ConfigCSV defines configuration for CSV records

type ConfigFields Uses

type ConfigFields struct {
    Names []string
}

ConfigFields specifies names for record fields. In addition to being a list of names, the position of each name in the slice also indicates the FieldIndex for that name. In other words, if Names[0] = "address", then FieldIndex 0 refers to that field, and "address" is its name.
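The relation between a name's position and its FieldIndex can be sketched as follows. FieldIndex is redeclared locally so the example is self-contained, and makeFieldByName is a hypothetical helper whose return type matches the FieldByName signature used in Components. It is not a Baker function.

```go
package main

import "fmt"

// FieldIndex is a local stand-in for the documented type.
type FieldIndex int

// makeFieldByName builds a name lookup from a positional list of names:
// the position of a name in the slice is its FieldIndex.
func makeFieldByName(names []string) func(string) (FieldIndex, bool) {
	byName := make(map[string]FieldIndex, len(names))
	for i, n := range names {
		byName[n] = FieldIndex(i)
	}
	return func(name string) (FieldIndex, bool) {
		idx, ok := byName[name]
		return idx, ok
	}
}

func main() {
	names := []string{"address", "city", "zip"}
	fieldByName := makeFieldByName(names)
	if idx, ok := fieldByName("city"); ok {
		fmt.Println(idx, names[idx])
	}
}
```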

type ConfigFilter Uses

type ConfigFilter struct {
    Name          string
    DecodedConfig interface{}

    Config *toml.Primitive
    // contains filtered or unexported fields
}

ConfigFilter specifies the configuration for a single filter component.

type ConfigFilterChain Uses

type ConfigFilterChain struct {
    // Procs specifies the number of baker filters running concurrently.
    // When set to a value greater than 1, filtering may be faster but
    // record ordering is not guaranteed anymore.
    // The default value is 16
    Procs int
}

ConfigFilterChain specifies the configuration for the whole filter chain.

type ConfigGeneral Uses

type ConfigGeneral struct {
    // DontValidateFields reports whether records validation is skipped (by not calling Components.Validate)
    DontValidateFields bool `toml:"dont_validate_fields"`
}

A ConfigGeneral specifies general configuration for the whole topology.

type ConfigInput Uses

type ConfigInput struct {
    Name          string
    ChanSize      int // ChanSize represents the size of the channel to send records from the input to the filters, the default value is 1024
    DecodedConfig interface{}

    Config *toml.Primitive
    // contains filtered or unexported fields
}

ConfigInput specifies the configuration for the input component.

type ConfigMetrics Uses

type ConfigMetrics struct {
    Name          string
    DecodedConfig interface{}

    Config *toml.Primitive
    // contains filtered or unexported fields
}

ConfigMetrics holds metrics configuration.

type ConfigOutput Uses

type ConfigOutput struct {
    Name string
    // Procs defines the number of baker outputs running concurrently.
    // Only set Procs to a value greater than 1 if the output is concurrent safe.
    Procs         int
    ChanSize      int      // ChanSize represents the size of the channel to send records to the output component(s), the default value is 16384
    Sharding      string   // Sharding is the name of the field used for sharding
    Fields        []string // Fields holds the name of the record fields the output receives
    DecodedConfig interface{}

    Config *toml.Primitive
    // contains filtered or unexported fields
}

ConfigOutput specifies the configuration for the output component.

type ConfigUpload Uses

type ConfigUpload struct {
    Name          string
    DecodedConfig interface{}

    Config *toml.Primitive
    // contains filtered or unexported fields
}

ConfigUpload specifies the configuration for the upload component.

type ConfigUser Uses

type ConfigUser struct {
    Name   string
    Config *toml.Primitive
}

A ConfigUser defines a user-specific configuration entry.

type Data Uses

type Data struct {
    Bytes []byte   // Bytes is the slice of raw bytes read by an input
    Meta  Metadata // Meta is filled by the input and holds metadata that will be associated to the records parsed from Bytes
}

Data represents raw data consumed by a baker input, possibly containing multiple records before they're parsed.

type ErrorRequiredField Uses

type ErrorRequiredField struct {
    Field string // Field is the name of the missing field
}

ErrorRequiredField describes the absence of a required field in a component configuration.

func (ErrorRequiredField) Error Uses

func (e ErrorRequiredField) Error() string

type FieldIndex Uses

type FieldIndex int

FieldIndex is the index uniquely identifying a field in a Record.

type Filter Uses

type Filter interface {
    // Process processes a single Record, and then optionally sends it to
    // next filter in the chain.
    // Process might mutate the Record, adding/modifying/removing fields,
    // and might decide to throw it away, or pass it to next filter in chain
    // by calling the next() function. In some cases, a filter might generate
    // multiple Record in output, by calling next() multiple times.
    // next() is guaranteed to be non-nil; for the last filter of the chain,
    // it points to a function that wraps up the filtering chain and sends
    // the Record to the output.
    Process(l Record, next func(Record))

    // Stats returns stats about the filter
    Stats() FilterStats
}

Filter represents a data filter; a filter is a function that processes records. A filter can discard, transform, forward and even create records.
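The Process contract (forward a record by calling next, discard it by not calling next) can be sketched with reduced local types. Record here exposes only Get and Set, mapRecord is a toy implementation, and dropEmpty is a hypothetical filter. None of them is part of Baker, and the Stats method is left out.

```go
package main

import (
	"fmt"
	"sync/atomic"
)

type FieldIndex int

// Record is a reduced stand-in exposing only what this sketch needs.
type Record interface {
	Get(FieldIndex) []byte
	Set(FieldIndex, []byte)
}

// mapRecord is a toy Record backed by a map.
type mapRecord map[FieldIndex][]byte

func (m mapRecord) Get(f FieldIndex) []byte    { return m[f] }
func (m mapRecord) Set(f FieldIndex, b []byte) { m[f] = b }

// dropEmpty discards records whose field is empty and forwards the rest.
// Counters are updated atomically because several filter goroutines may run
// concurrently (see ConfigFilterChain.Procs).
type dropEmpty struct {
	processed int64
	filtered  int64
	field     FieldIndex
}

func (f *dropEmpty) Process(r Record, next func(Record)) {
	atomic.AddInt64(&f.processed, 1)
	if len(r.Get(f.field)) == 0 {
		atomic.AddInt64(&f.filtered, 1)
		return // not calling next discards the record
	}
	next(r)
}

func main() {
	f := &dropEmpty{field: 0}
	forwarded := 0
	next := func(Record) { forwarded++ }
	f.Process(mapRecord{0: []byte("x")}, next)
	f.Process(mapRecord{}, next)
	fmt.Println(forwarded, f.processed, f.filtered)
}
```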

type FilterDesc Uses

type FilterDesc struct {
    Name   string                             // Name of the filter
    New    func(FilterParams) (Filter, error) // New is the constructor-like function called by the topology to create a new filter
    Config interface{}                        // Config is the component configuration
    Help   string                             // Help string
}

FilterDesc describes a Filter component to the topology.

type FilterParams Uses

type FilterParams struct {
    ComponentParams
}

FilterParams holds the parameters passed to Filter constructor.

type FilterStats Uses

type FilterStats struct {
    NumProcessedLines int64
    NumFilteredLines  int64
    Metrics           MetricsBag
}

FilterStats contains statistics about the filter components, ready for export to the metric client and to print debug info

type HelpFormat Uses

type HelpFormat int

HelpFormat represents the possible formats for baker help.

const (
    // HelpFormatRaw is for raw-formatted help.
    HelpFormatRaw HelpFormat = iota

    // HelpFormatMarkdown is for markdown formatted help.
    HelpFormatMarkdown
)

type Input Uses

type Input interface {
    // Start fetching data and pushing it into the channel.
    // If this call blocks forever, the topology is permanent and
    // acts like a long-running daemon; if this call exits after
    // it has finished, the topology is meant to be run as a task
    // to process a fixed-size input, and baker will cleanly shutdown
    // after all inputs have been fully processed.
    Run(output chan<- *Data) error

    // Force the input to stop as cleanly as possible, at a good boundary.
    // This is usually issued at the user's request of exiting the process.
    // For instance, it might make sense to finish processing the current
    // batch of data or the current file, and then save in stable storage
    // the checkpoint to resume it later.
    Stop()

    // Return stats about the input
    Stats() InputStats

    // This function is called when the filter is finished with
    // the memory received through the input channel. Since the
    // memory was allocated by Input, it is returned to it
    // so that it might be recycled.
    FreeMem(data *Data)
}

Input is an interface representing an object that produces (fetches) data for the filters.
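A batch-style input, one whose Run returns once everything has been sent, might look like the sketch below. Data and Metadata are redeclared locally with the documented fields, sliceInput and the "chunk" metadata key are hypothetical, and Stats is omitted to keep the example short.

```go
package main

import (
	"fmt"
	"sync"
)

type Metadata map[string]interface{}

type Data struct {
	Bytes []byte
	Meta  Metadata
}

// sliceInput is a hypothetical batch input that emits pre-loaded chunks.
type sliceInput struct {
	chunks [][]byte
	stop   chan struct{}
	once   sync.Once
}

func newSliceInput(chunks ...[]byte) *sliceInput {
	return &sliceInput{chunks: chunks, stop: make(chan struct{})}
}

// Run sends every chunk and then returns, which marks the end of a batch.
func (in *sliceInput) Run(output chan<- *Data) error {
	for i, c := range in.chunks {
		d := &Data{Bytes: c, Meta: Metadata{"chunk": i}}
		select {
		case output <- d:
		case <-in.stop:
			return nil // stop requested: exit at a chunk boundary
		}
	}
	return nil
}

// Stop asks Run to return at the next chunk boundary; it is safe to call twice.
func (in *sliceInput) Stop() { in.once.Do(func() { close(in.stop) }) }

// FreeMem is a no-op here because the chunks are not pooled.
func (in *sliceInput) FreeMem(*Data) {}

func main() {
	in := newSliceInput([]byte("a,b\n"), []byte("c,d\n"))
	out := make(chan *Data, 2)
	if err := in.Run(out); err != nil {
		fmt.Println("input error:", err)
	}
	close(out)
	for d := range out {
		fmt.Printf("%q %v\n", d.Bytes, d.Meta["chunk"])
	}
}
```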

type InputDesc Uses

type InputDesc struct {
    Name   string                           // Name of the input
    New    func(InputParams) (Input, error) // New is the constructor-like function called by the topology to create a new input
    Config interface{}                      // Config is the component configuration
    Help   string                           // Help string
}

InputDesc describes an Input component to the topology.

type InputParams Uses

type InputParams struct {
    ComponentParams
}

InputParams holds the parameters passed to Input constructor.

type InputStats Uses

type InputStats struct {
    NumProcessedLines int64
    CustomStats       map[string]string
    Metrics           MetricsBag
}

InputStats contains statistics about the input component, ready for export to the metric client and to print debug info.

type LogLine Uses

type LogLine struct {

    // FieldSeparator is the byte used to separate fields value.
    FieldSeparator byte
    // contains filtered or unexported fields
}

LogLine represents a CSV text line whose fields are separated by FieldSeparator (the comma by default, see DefaultLogLineFieldSeparator). It implements Record.

In memory, it is kept in a format optimized for very fast parsing and low memory-consumption. The vast majority of fields are never accessed during the lifetime of an object, as a filter usually reads or writes just a handful of fields; it thus makes sense to do the quickest possible initial parsing, deferring as much as possible to when a field is actually accessed.

It is also possible to modify a LogLine in memory, as it gets processed. Modifications can be done through the Set() method, and can be done to any field, both those that had a parsed value, and those that were empty.

func (*LogLine) Cache Uses

func (l *LogLine) Cache() *Cache

Cache returns the cache that is local to the current log line.

func (*LogLine) Clear Uses

func (l *LogLine) Clear()

Clear clears the logline

func (*LogLine) Copy Uses

func (l *LogLine) Copy() Record

Copy creates and returns a copy of the current log line.

func (*LogLine) Get Uses

func (l *LogLine) Get(f FieldIndex) []byte

Get the value of a field (either standard or custom)

func (*LogLine) Meta Uses

func (l *LogLine) Meta(key string) (interface{}, bool)

Meta returns the metadata having the given specific key, if any.

func (*LogLine) Parse Uses

func (l *LogLine) Parse(text []byte, meta Metadata) error

Parse finds the next newline in text and parses the log line fields from it into the current LogLine.

This is the moral equivalent of bytes.Split(), but without memory allocations.

NOTE: this function is meant to be called onto a just-constructed LogLine instance. For performance reasons, it doesn't reset all the writable fields of the line. If you want to use Parse over an already parsed LogLine, use Clear before.
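The general idea of splitting without allocating can be sketched by recording where each field ends and slicing the original buffer on access. This is only an illustration of the technique, not LogLine's actual layout: the line type, maxFields and both methods are hypothetical, and newline handling is left out.

```go
package main

import "fmt"

const maxFields = 8 // small bound chosen for the sketch

// line keeps the raw text plus the end offset of each field.
type line struct {
	text []byte
	ends [maxFields]int
	n    int
}

// parse records where each field ends; it copies and allocates nothing.
// Separators beyond the field bound are left inside the last field.
func (l *line) parse(text []byte, sep byte) {
	l.text, l.n = text, 0
	for i, b := range text {
		if b == sep && l.n < maxFields-1 {
			l.ends[l.n] = i
			l.n++
		}
	}
	l.ends[l.n] = len(text)
	l.n++
}

// get returns field i as a sub-slice of the original text, or nil if the
// index is out of range.
func (l *line) get(i int) []byte {
	if i < 0 || i >= l.n {
		return nil
	}
	start := 0
	if i > 0 {
		start = l.ends[i-1] + 1
	}
	return l.text[start:l.ends[i]]
}

func main() {
	var l line
	l.parse([]byte("a,bb,,c"), ',')
	for i := 0; i < l.n; i++ {
		fmt.Printf("%d: %q\n", i, l.get(i))
	}
}
```

Because get returns sub-slices of the input, the caller must keep the original buffer alive and unmodified for as long as the fields are in use.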

func (*LogLine) Set Uses

func (l *LogLine) Set(f FieldIndex, data []byte)

Set changes the value of a field (either standard or custom) to a new value

func (*LogLine) ToText Uses

func (l *LogLine) ToText(buf []byte) []byte

ToText converts the LogLine back to textual format and appends it to the specified buffer. If called on a default constructed LogLine (zero-value), ToText returns nil, which is a useless but syntactically valid buffer.

type Metadata Uses

type Metadata map[string]interface{}

Metadata holds metadata about the input data; each Input populates this map directly as appropriate. Consumers (filters) access it through Record.Meta.

type MetricsBag Uses

type MetricsBag map[string]interface{}

A MetricsBag is a collection of metrics. Every Baker component reports these metrics through its Stats method. Stats() is called once per second, and its result contains a MetricsBag filled with values relative to that last second.

func (MetricsBag) AddDeltaCounter Uses

func (bag MetricsBag) AddDeltaCounter(name string, delta int64)

AddDeltaCounter adds a count of something that happened in the last second

func (MetricsBag) AddGauge Uses

func (bag MetricsBag) AddGauge(name string, value float64)

AddGauge takes a snapshot of a value.

func (MetricsBag) AddHistogram Uses

func (bag MetricsBag) AddHistogram(name string, values []float64)

AddHistogram adds a set of values to track their statistical distribution.

func (MetricsBag) AddRawCounter Uses

func (bag MetricsBag) AddRawCounter(name string, value int64)

AddRawCounter adds a counter that always increments.

func (MetricsBag) AddTimings Uses

func (bag MetricsBag) AddTimings(name string, values []time.Duration)

AddTimings adds a set of timings to track their statistical distribution.

func (MetricsBag) Merge Uses

func (bag MetricsBag) Merge(other MetricsBag)

Merge merges another MetricsBag into this 'bag'.

type MetricsClient Uses

type MetricsClient interface {

    // Gauge sets the value of a metric of type gauge. A Gauge represents a
    // single numerical data point that can arbitrarily go up and down.
    Gauge(name string, value float64)

    // GaugeWithTags sets the value of a metric of type gauge and associates
    // that value with a set of tags.
    GaugeWithTags(name string, value float64, tags []string)

    // RawCount sets the value of a metric of type counter. A counter is a
    // cumulative metric that can only increase. RawCount sets the current
    // value of the counter.
    RawCount(name string, value int64)

    // RawCountWithTags sets the value of a metric of type counter and associates
    // that value with a set of tags.
    RawCountWithTags(name string, value int64, tags []string)

    // DeltaCount increments the value of a metric of type counter by delta.
    // delta must be positive.
    DeltaCount(name string, delta int64)

    // DeltaCountWithTags increments the value of a metric of type counter and
    // associates that value with a set of tags.
    DeltaCountWithTags(name string, delta int64, tags []string)

    // Histogram adds a sample to a metric of type histogram. A histogram
    // samples observations and counts them in different 'buckets' in order
    // to track and show the statistical distribution of a set of values.
    Histogram(name string, value float64)

    // HistogramWithTags adds a sample to a histogram and associates that
    // sample with a set of tags.
    HistogramWithTags(name string, value float64, tags []string)

    // Duration adds a duration to a metric of type histogram. A histogram
    // samples observations and counts them in different 'buckets'. Duration
    // is basically a histogram that samples values of type time.Duration.
    Duration(name string, value time.Duration)

    // DurationWithTags adds a duration to a histogram and associates that
    // duration with a set of tags.
    DurationWithTags(name string, value time.Duration, tags []string)
}

A MetricsClient lets components instrument their code and send the resulting metrics to the metrics backend configured in Baker.

New metrics backends must implement this interface and register their description as a MetricsDesc in Components.Metrics. See ./examples/metrics.

type MetricsDesc Uses

type MetricsDesc struct {
    Name   string                                   // Name of the metrics interface
    Config interface{}                              // Config is the metrics client specific configuration
    New    func(interface{}) (MetricsClient, error) // New is the constructor-like function called by the topology to create a new metrics client
}

MetricsDesc describes a Metrics interface to the topology.

type NopMetrics Uses

type NopMetrics struct{}

NopMetrics implements a MetricsClient that does nothing.

func (NopMetrics) DeltaCount Uses

func (NopMetrics) DeltaCount(name string, delta int64)

func (NopMetrics) DeltaCountWithTags Uses

func (NopMetrics) DeltaCountWithTags(name string, delta int64, tags []string)

func (NopMetrics) Duration Uses

func (NopMetrics) Duration(name string, value time.Duration)

func (NopMetrics) DurationWithTags Uses

func (NopMetrics) DurationWithTags(name string, value time.Duration, tags []string)

func (NopMetrics) Gauge Uses

func (NopMetrics) Gauge(name string, value float64)

func (NopMetrics) GaugeWithTags Uses

func (NopMetrics) GaugeWithTags(name string, value float64, tags []string)

func (NopMetrics) Histogram Uses

func (NopMetrics) Histogram(name string, value float64)

func (NopMetrics) HistogramWithTags Uses

func (NopMetrics) HistogramWithTags(name string, value float64, tags []string)

func (NopMetrics) RawCount Uses

func (NopMetrics) RawCount(name string, value int64)

func (NopMetrics) RawCountWithTags Uses

func (NopMetrics) RawCountWithTags(name string, value int64, tags []string)

type Output Uses

type Output interface {
    // Run processes the OutputRecord data coming through a channel.
    // Run must block until in channel has been closed and it has processed
    // all records.
    // It can send filenames via upch, they will be handled by an Upload if one
    // is present in the topology.
    // TODO: since Run must be blocking, it could return an error, useful
    // for the topology to acknowledge the correct processing if nil, or
    // end the whole topology in case non-nil.
    Run(in <-chan OutputRecord, upch chan<- string) error

    // Stats returns stats about the output.
    Stats() OutputStats

    // CanShard returns true if this output supports sharding.
    CanShard() bool
}

Output is the final end of a topology; it processes the records that have reached the end of the filter chain and performs the final action (storing, sending through the wire, counting, etc.).

type OutputDesc Uses

type OutputDesc struct {
    Name   string                             // Name of the output
    New    func(OutputParams) (Output, error) // New is the constructor-like function called by the topology to create a new output
    Config interface{}                        // Config is the component configuration
    Raw    bool                               // Raw reports whether the output accepts a raw record
    Help   string                             // Help string
}

OutputDesc describes an Output component to the topology.

type OutputParams Uses

type OutputParams struct {
    ComponentParams
    Index  int          // tells the index of the output, in case multiple parallel output procs are used
    Fields []FieldIndex // fields of the record that will be sent to the output
}

OutputParams holds the parameters passed to Output constructor.

type OutputRecord Uses

type OutputRecord struct {
    Fields []string // Fields are the fields sent to a Baker output.
    Record []byte   // Record is the data representation of a Record (obtained with Record.ToText())
}

OutputRecord is the data structure sent to baker output components.

It represents a Record in two possible formats:

* a list of pre-parsed fields, extracted from the record (as string).
  This is useful when the output only cares about specific fields and does
  not need the full record.
* the whole record, as processed and possibly modified by baker filters (as []byte).

Fields sent to the output are described in the topology. This was designed so that an output can work in different modes, processing different fields under the control of the user. Some fields might be required, and this validation should be performed by the Output itself. The topology can also declare no fields, in which case the Fields slice will be empty.

Record is non-nil only if the output declares itself as a raw output (see OutputDesc.Raw). This is done for performance reasons, as recreating the whole record requires allocations and memory copies, and is not always required.
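An output's Run loop consuming OutputRecord values might be sketched as follows. OutputRecord is redeclared locally with the documented fields, and countingOutput is a hypothetical output that only counts what it receives. Stats and CanShard are omitted.

```go
package main

import "fmt"

// OutputRecord is a local stand-in with the documented fields.
type OutputRecord struct {
	Fields []string
	Record []byte
}

// countingOutput is a hypothetical output that counts records and raw bytes.
type countingOutput struct {
	records  int64
	rawBytes int64
}

// Run drains in until the channel is closed, as the Output contract requires.
// This sketch produces no files, so upch is never used.
func (o *countingOutput) Run(in <-chan OutputRecord, upch chan<- string) error {
	for rec := range in {
		o.records++
		// Record is nil unless the output is declared as raw.
		o.rawBytes += int64(len(rec.Record))
	}
	return nil
}

func main() {
	in := make(chan OutputRecord, 2)
	in <- OutputRecord{Fields: []string{"alice", "rome"}}
	in <- OutputRecord{Record: []byte("bob,paris")}
	close(in)

	o := &countingOutput{}
	if err := o.Run(in, nil); err != nil {
		fmt.Println("output error:", err)
	}
	fmt.Println(o.records, o.rawBytes)
}
```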

type OutputStats Uses

type OutputStats struct {
    NumProcessedLines int64
    NumErrorLines     int64
    CustomStats       map[string]string
    Metrics           MetricsBag
}

OutputStats contains statistics about the output component, ready for export to the metric client and to print debug info

type Record Uses

type Record interface {
    // Parse decodes a buffer representing a record in its data format into
    // the current record instance.
    //
    // The given Metadata will be attached to that record. Record
    // implementations should also accept a nil in case the record has no
    // Metadata attached.
    Parse([]byte, Metadata) error

    // ToText returns the reconstructed data format of a record.
    //
    // In case a big enough buf is passed, it will be used to serialize the
    // record.
    ToText(buf []byte) []byte

    // Copy creates and returns a copy of the current record.
    //
    // The copied record could have been obtained by:
    //  var dst Record
    //  dst.Parse(src.ToText(nil), nil)
    //
    // but Record implementations should provide a more efficient way.
    Copy() Record

    // Clear clears the record internal state, making it empty.
    Clear()

    // Get the value of a field.
    Get(FieldIndex) []byte

    // Set the value of a field.
    Set(FieldIndex, []byte)

    // Meta returns the value of the attached metadata for the given key, if any.
    //
    // Records implementers may implement that method by declaring:
    //  type MyRecord struct {
    // 	      meta baker.Metadata
    //  }
    //
    //  func (r *MyRecord) Meta(key string) (interface{}, bool) {
    //  	v, ok := r.meta[key]
    //  	return v, ok
    //  }
    Meta(key string) (v interface{}, ok bool)

    // Cache holds a cache which is local to the record. It may be used to
    // speed up parsing of specific fields by caching the result. When
    // accessing a field and parsing its value, we want to try caching as much
    // as possible the parsing we do, to avoid redoing it later when
    // the same record is processed by different code.
    // Since cached values are interfaces it's up to who fetches a value to
    // know the underlying type of the cached value and perform a type assertion.
    //
    //  var ll Record
    //  val, ok := ll.Cache().Get("mykey")
    //  if !ok {
    //  	// long computation/parsing...
    //  	val = "14/07/1789"
    //  	ll.Cache().Set("mykey", val)
    //  }
    //
    //  // do something with the result
    //  result := val.(string)
    Cache() *Cache
}

Record is the basic object being processed by baker components. Types implementing Record hold the in-memory representation of a single record.

type ShardingFunc Uses

type ShardingFunc func(Record) uint64

A ShardingFunc calculates a sharding value for a record.

Sharding functions are silent about errors in the specified fields. If a field is corrupt, they will probably ignore it and still compute the best possible sharding value. A badly corrupted field (e.g. an empty one) could result in uneven sharding.
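A sharding function for a single field could be sketched with the standard hash/fnv package. The types are redeclared locally, and hashField and the fields type are hypothetical. Picking an output with a modulo over the number of output procs is an assumption made for the demonstration, not a statement about how Baker distributes records.

```go
package main

import (
	"fmt"
	"hash/fnv"
)

type FieldIndex int

// Record is a reduced stand-in exposing only Get.
type Record interface {
	Get(FieldIndex) []byte
}

type ShardingFunc func(Record) uint64

// hashField returns a ShardingFunc that hashes the raw bytes of field f
// with FNV-1a. An empty field still yields a value, so errors stay silent.
func hashField(f FieldIndex) ShardingFunc {
	return func(r Record) uint64 {
		h := fnv.New64a()
		h.Write(r.Get(f)) // Write on a hash.Hash never returns an error
		return h.Sum64()
	}
}

// fields is a toy Record backed by a slice.
type fields [][]byte

func (fs fields) Get(f FieldIndex) []byte {
	if f < 0 || int(f) >= len(fs) {
		return nil
	}
	return fs[f]
}

func main() {
	shard := hashField(0)
	const procs = 4 // assumed number of parallel outputs
	for _, id := range []string{"alice", "bob", ""} {
		v := shard(fields{[]byte(id)})
		fmt.Printf("%q -> output %d\n", id, v%procs)
	}
}
```

All records with an empty field hash to the same value and therefore land on the same output, which is one way the uneven sharding described above can arise.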

type StatsDumper Uses

type StatsDumper struct {
    // contains filtered or unexported fields
}

A StatsDumper gathers statistics about all baker components of a topology.

func NewStatsDumper Uses

func NewStatsDumper(t *Topology) (sd *StatsDumper)

NewStatsDumper creates and initializes a StatsDumper using the given topology and writing stats on standard output. It also exports metrics via the Metrics interface configured with the Topology, if any.

func (*StatsDumper) Run Uses

func (sd *StatsDumper) Run() (stop func())

Run starts dumping stats every second on standard output. Call stop() to end the periodic dump; stats are printed one last time when it is called.

func (*StatsDumper) SetWriter Uses

func (sd *StatsDumper) SetWriter(w io.Writer)

SetWriter sets the writer into which stats are written. SetWriter must be called before Run().

type Topology Uses

type Topology struct {
    Input   Input
    Filters []Filter
    Output  []Output
    Upload  Upload
    // contains filtered or unexported fields
}

Topology defines the baker topology, that is, how to retrieve records (input), how to process them (filter), and where to send the results (output and upload).

func NewTopologyFromConfig Uses

func NewTopologyFromConfig(cfg *Config) (*Topology, error)

NewTopologyFromConfig takes a baker configuration and returns a Topology.

func (*Topology) Error Uses

func (t *Topology) Error() error

Error returns the global (sticky) error state of the topology. Calling it only makes sense after Wait() has returned (before that, it is potentially subject to races). Errors from the input components are returned here because they are considered fatal for the topology; all other errors (such as transient network failures during output) are not considered fatal and are expected to be handled within the components themselves.

func (*Topology) Start Uses

func (t *Topology) Start()

Start starts the Topology, that is, it starts all components. It also intercepts the interrupt signal (Ctrl+C) to begin a graceful shutdown by calling Topology.Stop().

func (*Topology) Stop Uses

func (t *Topology) Stop()

Stop requests that the currently running topology stop safely, but as soon as possible. The stop request is forwarded to the input, which triggers the chain of stops from the components (managed in Topology.Wait).

func (*Topology) Wait Uses

func (t *Topology) Wait()

Wait blocks until the topology shuts itself down. This can happen because the input component exits (in a batch topology) or in response to a SIGINT signal, which is handled as a clean shutdown request.
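The call order implied by Start, Wait and Error can be sketched as below. The runner interface simply lists the three method signatures documented above, so a *Topology would satisfy it; runToCompletion, fakeTopology and errInput are hypothetical names used to keep the example self-contained.

```go
package main

import (
	"errors"
	"fmt"
)

// runner lists the Topology methods this sketch relies on, with the
// signatures documented above.
type runner interface {
	Start()
	Wait()
	Error() error
}

// runToCompletion is a hypothetical helper. It reads the sticky error only
// after Wait has returned, since reading it earlier may race.
func runToCompletion(t runner) error {
	t.Start()
	t.Wait()
	return t.Error()
}

// errInput simulates a fatal error coming from an input component.
var errInput = errors.New("input failed")

// fakeTopology records the calls it receives; it stands in for a real
// topology so that the sketch runs without any configuration.
type fakeTopology struct {
	calls []string
	err   error
}

func (f *fakeTopology) Start() { f.calls = append(f.calls, "Start") }
func (f *fakeTopology) Wait()  { f.calls = append(f.calls, "Wait") }
func (f *fakeTopology) Error() error {
	f.calls = append(f.calls, "Error")
	return f.err
}

func main() {
	t := &fakeTopology{err: errInput}
	fmt.Println(runToCompletion(t), t.calls)
	// prints: input failed [Start Wait Error]
}
```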

type Upload Uses

type Upload interface {
    // Run processes the output result as it comes through the channel.
    // Run must block forever.
    // upch will receive filenames that Output wants to see uploaded.
    Run(upch <-chan string) error

    // Stop forces the upload to stop as cleanly as possible, which usually
    // means to finish up all the existing uploads.
    Stop()

    // Stats returns stats about the upload process
    Stats() UploadStats
}

Upload uploads files created by the topology output to a configured location.
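A minimal type with the same three methods might look like the sketch below. UploadStats is a trimmed local stand-in, and funcUpload, rejectEmpty and errSend are hypothetical names. The sketch assumes the channel is eventually closed when no more files are coming, at which point Run returns; counting a failed file instead of aborting is a choice made for this example.

```go
package main

import (
	"errors"
	"fmt"
	"sync/atomic"
)

// UploadStats is a trimmed local stand-in for the struct documented on this
// page; only the two counters are kept.
type UploadStats struct {
	NumProcessedFiles int64
	NumErrorFiles     int64
}

var errSend = errors.New("send failed")

// funcUpload is a hypothetical upload component that hands every filename to
// a user-supplied send function. The counters come first in the struct so
// that they stay 64-bit aligned for atomic access on 32-bit platforms.
type funcUpload struct {
	processed int64
	failed    int64
	send      func(path string) error
}

// Run blocks, consuming filenames until upch is closed (an assumption of
// this sketch). A failed send is counted and does not stop the loop.
func (u *funcUpload) Run(upch <-chan string) error {
	for path := range upch {
		if err := u.send(path); err != nil {
			atomic.AddInt64(&u.failed, 1)
			continue
		}
		atomic.AddInt64(&u.processed, 1)
	}
	return nil
}

// Stop has nothing to flush here because send is synchronous.
func (u *funcUpload) Stop() {}

// Stats may be called while Run is still going, hence the atomic loads.
func (u *funcUpload) Stats() UploadStats {
	return UploadStats{
		NumProcessedFiles: atomic.LoadInt64(&u.processed),
		NumErrorFiles:     atomic.LoadInt64(&u.failed),
	}
}

// rejectEmpty is a toy send function that fails on empty filenames.
func rejectEmpty(path string) error {
	if path == "" {
		return errSend
	}
	return nil
}

func main() {
	u := &funcUpload{send: rejectEmpty}
	upch := make(chan string, 3)
	upch <- "a.csv.gz"
	upch <- ""
	upch <- "b.csv.gz"
	close(upch)
	fmt.Println(u.Run(upch), u.Stats())
}
```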

type UploadDesc Uses

type UploadDesc struct {
    Name   string                             // Name of the upload component
    New    func(UploadParams) (Upload, error) // New is the constructor-like function called by the topology to create a new upload
    Config interface{}                        // Config is the component configuration
    Help   string                             // Help string
}

UploadDesc describes an Upload component to the topology.

type UploadParams Uses

type UploadParams struct {
    ComponentParams
}

UploadParams is the struct passed to the Upload constructor.

type UploadStats Uses

type UploadStats struct {
    NumProcessedFiles int64
    NumErrorFiles     int64
    CustomStats       map[string]string
    Metrics           MetricsBag
}

UploadStats contains statistics about the upload component, ready to be exported to the metrics client and printed as debug info.

type UserDesc Uses

type UserDesc struct {
    Name   string
    Config interface{}
}

UserDesc describes user-specific configuration sections.

type ValidationFunc Uses

type ValidationFunc func(Record) (bool, FieldIndex)

ValidationFunc checks the validity of a record, returning true if it's valid. If a validation error is found it returns false and the index of the field that failed validation.
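For instance, a validation function that rejects records with empty required fields could be written as in the sketch below. FieldIndex and Record are local stand-ins, and requireNonEmpty and sliceRecord are hypothetical names. The index returned for a valid record is not specified above, so the sketch returns 0 and expects callers to ignore it.

```go
package main

import "fmt"

// FieldIndex and Record are minimal local stand-ins for the baker types of
// the same name, reduced to what this sketch needs.
type FieldIndex int

type Record interface {
	Get(FieldIndex) []byte
}

// ValidationFunc mirrors the signature documented above.
type ValidationFunc func(Record) (bool, FieldIndex)

// requireNonEmpty is a hypothetical helper: it builds a ValidationFunc that
// fails on the first listed field found empty. For a valid record the
// returned index is 0 and carries no meaning.
func requireNonEmpty(idxs ...FieldIndex) ValidationFunc {
	return func(r Record) (bool, FieldIndex) {
		for _, i := range idxs {
			if len(r.Get(i)) == 0 {
				return false, i
			}
		}
		return true, 0
	}
}

// sliceRecord is a toy Record used only to exercise the sketch.
type sliceRecord [][]byte

func (s sliceRecord) Get(i FieldIndex) []byte {
	if i < 0 || int(i) >= len(s) {
		return nil
	}
	return s[i]
}

func main() {
	validate := requireNonEmpty(0, 2)
	full := sliceRecord{[]byte("a"), nil, []byte("c")}
	short := sliceRecord{[]byte("a"), []byte("b")}
	fmt.Println(validate(full))  // prints: true 0
	fmt.Println(validate(short)) // prints: false 2
}
```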

Directories

Path                Synopsis
filter              Package filter provides filter components.
filter/filtertest
input               Package input provides input components
input/inputtest
input/inpututils
metrics             Package metrics provides the available metrics implementations
metrics/datadog     Package datadog provides types and functions to export metrics and logs to Datadog via a statsd client.
output              Package output provides output components
output/outputtest
output/websocket
pkg/awsutils        Package awsutils provides AWS-specific types and functions.
pkg/buffercache     Package buffercache provides the BufferCache type, a kind of map[string][]byte that is optimized for appending new bytes to the cache values.
pkg/splitwriter     Package splitwriter provides a WriteCloser that writes to a file and splits it into smaller files when it's closed.
testutil
upload              Package upload provides upload components
upload/uploadtest

Package baker imports 24 packages and is imported by 13 packages. Updated 2021-01-22.