beam: github.com/apache/beam/sdks/go/pkg/beam/core/graph/coder Index | Files | Directories

package coder

import "github.com/apache/beam/sdks/go/pkg/beam/core/graph/coder"

Package coder contains coder representation and utilities. Coders describe how to serialize and deserialize pipeline data and may be provided by users.

Index

Package Files

bool.go bytes.go coder.go double.go int.go iterable.go map.go registry.go row.go row_decoder.go row_encoder.go stringutf8.go time.go varint.go windows.go

Variables

var ErrVarIntTooLong = errors.New("varint too long")

ErrVarIntTooLong indicates a data corruption issue that needs special handling by callers of decode. TODO(herohde): have callers perform this special handling.

func DecodeBool Uses

func DecodeBool(r io.Reader) (bool, error)

DecodeBool decodes a boolean according to the beam protocol.

func DecodeByte Uses

func DecodeByte(r io.Reader) (byte, error)

DecodeByte decodes a single byte.

func DecodeBytes Uses

func DecodeBytes(r io.Reader) ([]byte, error)

DecodeBytes decodes a length prefixed []byte according to the beam protocol.

func DecodeDouble Uses

func DecodeDouble(r io.Reader) (float64, error)

DecodeDouble decodes a float64 in big endian format.

func DecodeEventTime Uses

func DecodeEventTime(r io.Reader) (typex.EventTime, error)

DecodeEventTime decodes an EventTime.

func DecodeInt32 Uses

func DecodeInt32(r io.Reader) (int32, error)

DecodeInt32 decodes an int32 in big endian format.

func DecodeStringUTF8 Uses

func DecodeStringUTF8(r io.Reader) (string, error)

DecodeStringUTF8 decodes a length prefixed UTF8 string.

func DecodeUint32 Uses

func DecodeUint32(r io.Reader) (uint32, error)

DecodeUint32 decodes an uint32 in big endian format.

func DecodeUint64 Uses

func DecodeUint64(r io.Reader) (uint64, error)

DecodeUint64 decodes an uint64 in big endian format.

func DecodeVarInt Uses

func DecodeVarInt(r io.Reader) (int64, error)

DecodeVarInt decodes an int64.

func DecodeVarUint64 Uses

func DecodeVarUint64(r io.Reader) (uint64, error)

DecodeVarUint64 decodes an uint64.

func DecoderForSlice Uses

func DecoderForSlice(rt reflect.Type) (func(io.Reader) (interface{}, error), error)

DecoderForSlice returns a decoding function that decodes the beam row encoding into the given type.

Returns an error if the given type is invalid or not decodable from a beam schema row.

func EncodeBool Uses

func EncodeBool(v bool, w io.Writer) error

EncodeBool encodes a boolean according to the beam protocol.

func EncodeByte Uses

func EncodeByte(v byte, w io.Writer) error

EncodeByte encodes a single byte.

func EncodeBytes Uses

func EncodeBytes(v []byte, w io.Writer) error

EncodeBytes encodes a []byte with a length prefix per the beam protocol.

func EncodeDouble Uses

func EncodeDouble(value float64, w io.Writer) error

EncodeDouble encodes a float64 in big endian format.

func EncodeEventTime Uses

func EncodeEventTime(et typex.EventTime, w io.Writer) error

EncodeEventTime encodes an EventTime as an uint64. The encoding is millis-since-epoch, but shifted so that the byte representation of negative values are lexicographically ordered before the byte representation of positive values.

func EncodeInt32 Uses

func EncodeInt32(value int32, w io.Writer) error

EncodeInt32 encodes an int32 in big endian format.

func EncodeStringUTF8 Uses

func EncodeStringUTF8(s string, w io.Writer) error

EncodeStringUTF8 encodes a UTF8 string with a length prefix.

func EncodeUint32 Uses

func EncodeUint32(value uint32, w io.Writer) error

EncodeUint32 encodes an uint32 in big endian format.

func EncodeUint64 Uses

func EncodeUint64(value uint64, w io.Writer) error

EncodeUint64 encodes an uint64 in big endian format.

func EncodeVarInt Uses

func EncodeVarInt(value int64, w io.Writer) error

EncodeVarInt encodes an int64.

func EncodeVarUint64 Uses

func EncodeVarUint64(value uint64, w io.Writer) error

EncodeVarUint64 encodes an uint64.

func EncoderForSlice Uses

func EncoderForSlice(rt reflect.Type) (func(interface{}, io.Writer) error, error)

EncoderForSlice returns an encoding function that encodes a struct type or a pointer to a struct type using the beam row encoding.

Returns an error if the given type is invalid or not encodable to a beam schema row.

func IsCoGBK Uses

func IsCoGBK(c *Coder) bool

IsCoGBK returns true iff the coder is for a CoGBK type.

func IsFieldNil Uses

func IsFieldNil(nils []byte, f int) bool

IsFieldNil examines the passed in packed bits nils buffer and returns true if the field at that index wasn't encoded and can be skipped in decoding.

func IsKV Uses

func IsKV(c *Coder) bool

IsKV returns true iff the coder is for key-value pairs.

func IsW Uses

func IsW(c *Coder) bool

IsW returns true iff the coder is for a WindowedValue.

func ReadRowHeader Uses

func ReadRowHeader(r io.Reader) (int, []byte, error)

ReadRowHeader handles the field header for row decodings.

This returns the number of encoded fileds, the raw bitpacked bytes and any error during decoding. Each bit only needs only needs to be examined once during decoding using the IsFieldNil helper function.

If there are no nil fields encoded,the byte array will be nil, and no encoded fields will be nil.

func ReadSimpleRowHeader Uses

func ReadSimpleRowHeader(fields int, r io.Reader) error

ReadSimpleRowHeader is a convenience function to read Beam Schema Row Headers for values that do not have any nil fields. Reads and validates the number of fields total (returning an error for mismatches, and checks that there are no nils encoded as a bit field.

func RegisterCoder Uses

func RegisterCoder(t reflect.Type, enc, dec interface{})

RegisterCoder registers a user defined coder for a given type, and will be used if there is no beam coder for that type. Must be called prior to beam.Init(), preferably in an init() function.

Coders are encoder and decoder pairs, and operate around []bytes.

The coder used for a given type follows this ordering:

1. Coders for Known Beam types.
2. Coders registered for specific types
3. Coders registered for interfaces types
4. Default coder (JSON)

Types of kind Interface, are handled specially by the registry, so they may be iterated over to check if element types implement them.

Repeated registrations of the same type overrides prior ones.

func RegisterSchemaProviders Uses

func RegisterSchemaProviders(rt reflect.Type, enc, dec interface{})

RegisterSchemaProviders Register Custom Schema providers.

func RequireAllFieldsExported Uses

func RequireAllFieldsExported(require bool)

RequireAllFieldsExported when set to true will have the default coder buildings using RowEncoderForStruct and RowDecoderForStruct fail if there are any unexported fields. When set false, unexported fields in default destination structs will be silently ignored when coding. This has no effect on types with registered coder providers.

func RowDecoderForStruct Uses

func RowDecoderForStruct(rt reflect.Type) (func(io.Reader) (interface{}, error), error)

RowDecoderForStruct returns a decoding function that decodes the beam row encoding into the given type.

Returns an error if the given type is invalid or not decodable from a beam schema row.

func RowEncoderForStruct Uses

func RowEncoderForStruct(rt reflect.Type) (func(interface{}, io.Writer) error, error)

RowEncoderForStruct returns an encoding function that encodes a struct type or a pointer to a struct type using the beam row encoding.

Returns an error if the given type is invalid or not encodable to a beam schema row.

func Types Uses

func Types(list []*Coder) []typex.FullType

Types returns a slice of types used by the supplied coders.

func WriteRowHeader Uses

func WriteRowHeader(n int, isNil func(int) bool, w io.Writer) error

WriteRowHeader handles the field header for row encodings.

func WriteSimpleRowHeader Uses

func WriteSimpleRowHeader(fields int, w io.Writer) error

WriteSimpleRowHeader is a convenience function to write Beam Schema Row Headers for values that do not have any nil fields. Writes the number of fields total and a 0 len byte slice to indicate no fields are nil.

type Coder Uses

type Coder struct {
    Kind Kind
    T    typex.FullType

    Components []*Coder     // WindowedValue, KV, CoGBK
    Custom     *CustomCoder // Custom
    Window     *WindowCoder // WindowedValue

    ID  string // (optional) This coder's ID if translated from a pipeline proto.
}

Coder is a description of how to encode and decode values of a given type. Except for the "custom" kind, they are built in and must adhere to the (unwritten) Beam specification.

func CoderFrom Uses

func CoderFrom(c *CustomCoder) *Coder

CoderFrom is a helper that creates a Coder from a CustomCoder.

func NewBool Uses

func NewBool() *Coder

NewBool returns a new bool coder using the built-in scheme.

func NewBytes Uses

func NewBytes() *Coder

NewBytes returns a new []byte coder using the built-in scheme. It is always nested, for now.

func NewCoGBK Uses

func NewCoGBK(components []*Coder) *Coder

NewCoGBK returns a coder for CoGBK elements.

func NewDouble Uses

func NewDouble() *Coder

NewDouble returns a new double coder using the built-in scheme.

func NewI Uses

func NewI(c *Coder) *Coder

NewI returns an iterable coder in the form of a slice.

func NewKV Uses

func NewKV(components []*Coder) *Coder

NewKV returns a coder for key-value pairs.

func NewPW Uses

func NewPW(c *Coder, w *WindowCoder) *Coder

NewPW returns a ParamWindowedValue coder for the window of elements.

func NewR Uses

func NewR(t typex.FullType) *Coder

NewR returns a schema row coder for the type.

func NewString Uses

func NewString() *Coder

NewString returns a new string coder using the built-in scheme.

func NewT Uses

func NewT(c *Coder, w *WindowCoder) *Coder

NewT returns a timer coder for the window of elements.

func NewVarInt Uses

func NewVarInt() *Coder

NewVarInt returns a new int64 coder using the built-in scheme.

func NewW Uses

func NewW(c *Coder, w *WindowCoder) *Coder

NewW returns a WindowedValue coder for the window of elements.

func SkipW Uses

func SkipW(c *Coder) *Coder

SkipW returns the data coder used by a WindowedValue, or returns the coder. This allows code to seamlessly traverse WindowedValues without additional conditional code.

func (*Coder) Equals Uses

func (c *Coder) Equals(o *Coder) bool

Equals returns true iff the two coders are equal. It assumes that functions with the same name and types are identical.

func (*Coder) String Uses

func (c *Coder) String() string

type CustomCoder Uses

type CustomCoder struct {
    // Name is the coder name. Informational only.
    Name string
    // Type is the underlying concrete type that is being coded. It is
    // available to Enc and Dec. It must be a concrete type.
    Type reflect.Type

    // Enc is the encoding function : T -> []byte. It may optionally take a
    // reflect.Type parameter and return an error as well.
    Enc *funcx.Fn
    // Dec is the decoding function: []byte -> T. It may optionally take a
    // reflect.Type parameter and return an error as well.
    Dec *funcx.Fn

    ID  string // (optional) This coder's ID if translated from a pipeline proto.
}

CustomCoder contains possibly untyped encode/decode user functions that are type-bound at runtime. Universal coders can thus be used for many different types, but each CustomCoder instance will be bound to a specific type.

func LookupCustomCoder Uses

func LookupCustomCoder(t reflect.Type) *CustomCoder

LookupCustomCoder returns the custom coder for the type if any, first checking for a specific matching type, and then iterating through registered interface coders in reverse registration order.

func NewCustomCoder Uses

func NewCustomCoder(id string, t reflect.Type, encode, decode interface{}) (*CustomCoder, error)

NewCustomCoder creates a coder for the supplied parameters defining a particular encoding strategy.

func (*CustomCoder) Equals Uses

func (c *CustomCoder) Equals(o *CustomCoder) bool

Equals returns true iff the two custom coders are equal. It assumes that functions with the same name and types are identical.

func (*CustomCoder) String Uses

func (c *CustomCoder) String() string

type ElementDecoder Uses

type ElementDecoder interface {
    Decode(r io.Reader) (interface{}, error)
}

ElementDecoder encapsulates being able to decode an element from a reader.

type ElementEncoder Uses

type ElementEncoder interface {
    Encode(element interface{}, w io.Writer) error
}

ElementEncoder encapsulates being able to encode an element into a writer.

type Kind Uses

type Kind string

Kind represents the type of coder used.

const (
    Custom             Kind = "Custom" // Implicitly length-prefixed
    Bytes              Kind = "bytes"  // Implicitly length-prefixed as part of the encoding
    String             Kind = "string" // Implicitly length-prefixed as part of the encoding.
    Bool               Kind = "bool"
    VarInt             Kind = "varint"
    Double             Kind = "double"
    Row                Kind = "R"
    Timer              Kind = "T"
    WindowedValue      Kind = "W"
    ParamWindowedValue Kind = "PW"
    Iterable           Kind = "I"
    KV                 Kind = "KV"
    LP                 Kind = "LP" // Explicitly length prefixed, likely at the runner's direction.

    Window Kind = "window" // A debug wrapper around a window coder.

    // CoGBK is currently equivalent to either
    //
    //     KV<X,Iterable<Y>>         (if GBK)
    //     KV<X,Iterable<KV<int,Y>>> (if CoGBK, using a tagged union encoding)
    //
    // It requires special handling in translation to the model pipeline in the latter case
    // to add the incoming index for each input.
    //
    // TODO(BEAM-490): once this JIRA is done, this coder should become the new thing.
    CoGBK Kind = "CoGBK"
)

Tags for the various Beam encoding strategies. https://beam.apache.org/documentation/programming-guide/#coders documents the usage of coders in the Beam environment.

type RowDecoderBuilder Uses

type RowDecoderBuilder struct {

    // RequireAllFieldsExported when set to true will have the default decoder building fail if
    // there are any unexported fields. When set false, unexported fields in default
    // destination structs will be silently ignored when decoding.
    // This has no effect on types with registered decoder providers.
    RequireAllFieldsExported bool
    // contains filtered or unexported fields
}

RowDecoderBuilder allows one to build Beam Schema row encoders for provided types.

func (*RowDecoderBuilder) Build Uses

func (b *RowDecoderBuilder) Build(rt reflect.Type) (func(io.Reader) (interface{}, error), error)

Build constructs a Beam Schema coder for the given type, using any providers registered for itself or it's fields.

func (*RowDecoderBuilder) Register Uses

func (b *RowDecoderBuilder) Register(rt reflect.Type, f interface{})

Register accepts a provider to decode schema encoded values of that type.

When decoding values, decoder functions produced by this builder will first check for exact type matches, then interfaces implemented by the type in recency order of registration, and then finally the default Beam Schema encoding behavior.

TODO(BEAM-9615): Add final factory types. This interface is subject to change. Currently f must be a function func(reflect.Type) (func(io.Reader) (interface{}, error), error)

type RowEncoderBuilder Uses

type RowEncoderBuilder struct {

    // RequireAllFieldsExported when set to true will have the default decoder building fail if
    // there are any unexported fields. When set false, unexported fields in default
    // destination structs will be silently ignored when decoding.
    // This has no effect on types with registered decoder providers.
    RequireAllFieldsExported bool
    // contains filtered or unexported fields
}

RowEncoderBuilder allows one to build Beam Schema row encoders for provided types.

func (*RowEncoderBuilder) Build Uses

func (b *RowEncoderBuilder) Build(rt reflect.Type) (func(interface{}, io.Writer) error, error)

Build constructs a Beam Schema coder for the given type, using any providers registered for itself or it's fields.

func (*RowEncoderBuilder) Register Uses

func (b *RowEncoderBuilder) Register(rt reflect.Type, f interface{})

Register accepts a provider for the given type to schema encode values of that type.

When generating encoding functions, this builder will first check for exact type matches, then against interfaces with registered factories in recency order of registration, and then finally use the default Beam Schema encoding behavior.

TODO(BEAM-9615): Add final factory types. This interface is subject to change. Currently f must be a function of the type func(reflect.Type) func(T, io.Writer) (error).

type WindowCoder Uses

type WindowCoder struct {
    Kind    WindowKind
    Payload string // Payload is only populated for parameterized window coders.
}

WindowCoder represents a Window coder.

func NewGlobalWindow Uses

func NewGlobalWindow() *WindowCoder

NewGlobalWindow returns a window coder for the global window.

func NewIntervalWindow Uses

func NewIntervalWindow() *WindowCoder

NewIntervalWindow returns a window coder for interval windows.

func (*WindowCoder) Equals Uses

func (w *WindowCoder) Equals(o *WindowCoder) bool

Equals returns whether passed in WindowCoder has the same Kind and Payload as this WindowCoder.

func (*WindowCoder) String Uses

func (w *WindowCoder) String() string

type WindowKind Uses

type WindowKind string

WindowKind represents a kind of window coder.

const (
    GlobalWindow   WindowKind = "GWC"
    IntervalWindow WindowKind = "IWC"
)

Available window coders. The same coder could be used for more than one kind of windowing strategy.

Directories

PathSynopsis
testutilPackage testutil contains helpers to test and validate custom Beam Schema coders.

Package coder imports 12 packages (graph) and is imported by 11 packages. Updated 2021-01-25. Refresh now. Tools for package owners.