prototransform

v0.4.0
Published: Oct 4, 2023 License: Apache-2.0 Imports: 30 Imported by: 0

README

Prototransform

Convert protobuf message data to alternate formats

Use the prototransform library to simplify your data transformation & collection. Our simple package allows the caller to convert a given message data blob from one format to another by referring to a type schema on the Buf Schema Registry.

  • No need to bake in proto files
  • Supports Binary, JSON and Text formats
  • Extensible for other/custom formats

Getting started

prototransform is designed to be flexible enough to fit quickly into your development environment.

Here's an example of how you could use prototransform to transform messages received from a PubSub topic...

Transform Messages from a Topic

Whilst prototransform has various applications, converting messages off some kind of message queue is a primary use case. This can take many forms; for simplicity, we will look at it abstractly in a pub/sub model where we want to:

  1. Open a subscription to a topic with the Pub/Sub service of your choice
  2. Start a SchemaWatcher to fetch a module from the Buf Schema Registry
  3. Receive, Transform and Acknowledge messages from the topic

Opening a Subscription & Schema Watcher

import (
	"context"
	"fmt"
	"time"

	"github.com/bufbuild/prototransform"
	"gocloud.dev/pubsub"
	_ "gocloud.dev/pubsub/<driver>"
)
...
	subscription, err := pubsub.OpenSubscription(ctx, "<driver-url>")
	if err != nil {
		return fmt.Errorf("could not open topic subscription: %v", err)
	}
	defer subscription.Shutdown(ctx)
	// Supply auth credentials to the BSR
	client := prototransform.NewDefaultFileDescriptorSetServiceClient("<bsr-token>")
	// Configure the module for schema watcher
	cfg := &prototransform.SchemaWatcherConfig{
		SchemaPoller: prototransform.NewSchemaPoller(
			client,
			"buf.build/someuser/somerepo", // BSR module
			"some-tag", // tag or draft name or leave blank for "latest"
		),
	}
	schemaWatcher, err := prototransform.NewSchemaWatcher(ctx, cfg)
	if err != nil {
		return fmt.Errorf("failed to create schema watcher: %v", err)
	}
	defer schemaWatcher.Stop()
	// before we start processing messages, make sure the schema has been
	// downloaded
	ctx, cancel := context.WithTimeout(ctx, 10*time.Second)
	defer cancel()
	if err := schemaWatcher.AwaitReady(ctx); err != nil {
		return fmt.Errorf("schema watcher never became ready: %v", err)
	}
...

A SchemaWatcher is the entrypoint of prototransform. This is created first so your code can connect to the Buf Schema Registry and fetch a schema to be used to transform and/or filter payloads.

Prepare a converter

A Converter implements the functionality to convert payloads to different formats and optionally filter/mutate messages during this transformation. In the following example, we have initialized a *prototransform.Converter which expects a binary input and will return JSON.

...
    converter := &prototransform.Converter{
        Resolver:       schemaWatcher,
        InputFormat:    prototransform.BinaryInputFormat(proto.UnmarshalOptions{}),
        OutputFormat:   prototransform.JSONOutputFormat(protojson.MarshalOptions{}),
    }
...

Out of the box, you can supply proto, protojson and prototext here, but feel free to supply your own custom formats as well.

FORMAT   InputFormat                         OutputFormat
JSON     prototransform.JSONInputFormat()    prototransform.JSONOutputFormat()
Text     prototransform.TextInputFormat()    prototransform.TextOutputFormat()
Binary   prototransform.BinaryInputFormat()  prototransform.BinaryOutputFormat()

Receiving and Transforming Messages

Now that we have an active subscription, schema watcher, and converter, we can start processing messages. A simple subscriber that transforms received messages looks like this:

...
    // Loop on received messages.
    for {
        msg, err := subscription.Receive(ctx)
        if err != nil {
            log.Printf("Receiving message: %v", err)
            break
        }
        // Do transformation based on the message name
        convertedMessage, err := converter.ConvertMessage("<message-name>", msg.Body)
        if err != nil {
            log.Printf("Converting message: %v", err)
            break
        }
        fmt.Printf("Converted message: %q\n", convertedMessage)

        msg.Ack()
    }
...

For illustrative purposes, let's assume that the topic we have subscribed to is buf.connect.demo.eliza.v1 and that the corresponding module is stored on the BSR. We would configure the message name as buf.connect.demo.eliza.v1.ConverseRequest.

Options

Cache

A SchemaWatcher can be configured with a user-supplied cache implementation, to act as a fallback when fetching schemas. The interface is of the form:

type Cache interface {
    Load(ctx context.Context, key string) ([]byte, error)
    Save(ctx context.Context, key string, data []byte) error
}

This repo provides three implementations that you can use:

  1. filecache: Cache schemas in local files.
  2. rediscache: Cache schemas in a shared Redis server.
  3. memcache: Cache schemas in a shared memcached server.

Filters

In some use cases, the values within the output message should differ from the input according to a set of defined rules. For example, Personally Identifiable Information (PII) may need to be removed from a message before it is piped into a sink. For this reason, we have supplied Filters.

Here's an example where we have defined a custom annotation to mark fields as sensitive:

syntax = "proto3";
package foo.v1;
// ...
extend google.protobuf.FieldOptions {
  bool sensitive = 30000;
}
// ...
message FooMessage {
  string name = 1 [(sensitive) = true];
}

We then use prototransform.Redact() to create a filter and supply it to our converter via its Filters field:

...
isSensitive := func(in protoreflect.FieldDescriptor) bool {
    return proto.GetExtension(in.Options(), foov1.E_Sensitive).(bool)
}
filter := prototransform.Redact(isSensitive)
converter.Filters = prototransform.Filters{filter}
...

Now, any attribute marked as "sensitive" will be omitted from the output produced by the converter.

This package also includes a predicate named HasDebugRedactOption that can be used to redact data for fields that have the debug_redact standard option set (this option was introduced in protoc v22.0).

Community

For help and discussion around Protobuf, best practices, and more, join us on Slack.

Status

This project is currently in alpha. The API should be considered unstable and likely to change.

Offered under the Apache 2 license.

Documentation

Overview

Package prototransform fetches purpose-built descriptor sets on the fly and easily converts protobuf messages into human-readable formats.

Use the prototransform library to simplify your data transformation & collection. Our simple package allows the caller to convert a given message data blob from one format to another by referring to a type schema on the Buf Schema Registry.

The package supports conversion to and from Binary, JSON and Text formats out of the box, and is extensible to other/custom formats.

The Buf Schema Registry Schema API lets you build integrations that can easily make use of your protobuf messages in new ways. This package reduces your serialization down to exactly what you need, so you can forget about everything else.

Some advantages of using prototransform include: automatic version handling, no baking proto files into containers, no flaky fetching logic, and fetching only the descriptors you need.

Example
package main

import (
	"context"
	"encoding/hex"
	"fmt"
	"log"
	"time"

	"github.com/bufbuild/prototransform"
	"google.golang.org/protobuf/encoding/protojson"
	"google.golang.org/protobuf/proto"
)

var inputData = []byte(`{"sentence": "I feel happy."}`)

const (
	messageName = "buf.connect.demo.eliza.v1.SayRequest"
	moduleName  = "buf.build/bufbuild/eliza"
)

func main() {
	token, err := prototransform.BufTokenFromEnvironment(moduleName)
	if err != nil {
		log.Fatalf("Failed to get token from environment: %v\n"+
			"For help with authenticating with the Buf Schema Registry visit: https://docs.buf.build/bsr/authentication",
			err)
	}
	// Supply auth credentials to the BSR
	client := prototransform.NewDefaultFileDescriptorSetServiceClient(token)
	// Configure the module for schema watcher
	cfg := &prototransform.SchemaWatcherConfig{
		SchemaPoller: prototransform.NewSchemaPoller(
			client,
			moduleName, // BSR module
			"main",     // tag or draft name or leave blank for "latest"
		),
	}
	ctx := context.Background()
	schemaWatcher, err := prototransform.NewSchemaWatcher(ctx, cfg)
	if err != nil {
		log.Fatalf("failed to create schema watcher: %v", err)
		return
	}
	ctx, cancel := context.WithTimeout(ctx, 5*time.Second)
	defer cancel()
	if err := schemaWatcher.AwaitReady(ctx); err != nil {
		log.Fatalf("schema watcher never became ready: %v", err)
		return
	}
	converter := &prototransform.Converter{
		Resolver:     schemaWatcher,
		InputFormat:  prototransform.JSONInputFormat(protojson.UnmarshalOptions{}),
		OutputFormat: prototransform.BinaryOutputFormat(proto.MarshalOptions{}),
	}
	convertedMessage, err := converter.ConvertMessage(messageName, inputData)
	if err != nil {
		log.Fatalf("Converting message: %v\n", err)
		return
	}
	fmt.Printf("Converted message: 0x%s\n", hex.EncodeToString(convertedMessage))
}
Output:

Converted message: 0x0a0d49206665656c2068617070792e

Variables

var (
	// ErrSchemaWatcherStopped is an error returned from the AwaitReady method
	// that indicates the schema watcher was stopped before it ever became ready.
	ErrSchemaWatcherStopped = errors.New("SchemaWatcher was stopped")
	// ErrSchemaWatcherNotReady is an error returned from the various Find*
	// methods of SchemaWatcher if an initial schema has not yet been downloaded
	// (or loaded from cache).
	ErrSchemaWatcherNotReady = errors.New("SchemaWatcher not ready")
)
var ErrLeaseStateNotYetKnown = errors.New("haven't completed initial lease check yet")

ErrLeaseStateNotYetKnown is an error that may be returned by Lease.IsHeld to indicate that the leaser has not yet completed querying for the lease's initial state.

var (
	// ErrSchemaNotModified is an error that may be returned by a SchemaPoller to
	// indicate that the poller did not return any descriptors because the caller's
	// cached version is still sufficiently fresh.
	ErrSchemaNotModified = errors.New("no response because schema not modified")
)

Functions

func BufTokenFromEnvironment

func BufTokenFromEnvironment(moduleRef string) (string, error)

BufTokenFromEnvironment returns a token that can be used to download the given module from the BSR by inspecting the BUF_TOKEN environment variable. The given moduleRef can be a full module reference, with or without a version, or it can just be the domain of the BSR.

func HasDebugRedactOption

func HasDebugRedactOption(fd protoreflect.FieldDescriptor) bool

HasDebugRedactOption is a function that can be used as a predicate, with Redact, to omit fields where the `debug_redact` field option is set to true.

message UserDetails {
  int64 user_id = 1;
  string name = 2;
  string email = 4;
  string ssn = 3 [debug_redact=true]; // social security number is sensitive
}

func NewAuthInterceptor

func NewAuthInterceptor(token string) connect.Interceptor

NewAuthInterceptor accepts a token for a Buf Schema Registry (BSR) and returns an interceptor which can be used when creating a Connect client so that every RPC to the BSR is correctly authenticated.

To understand more about authenticating with the BSR visit: https://docs.buf.build/bsr/authentication

To get a token from the environment (e.g. BUF_TOKEN env var), see BufTokenFromEnvironment.

func NewDefaultFileDescriptorSetServiceClient

func NewDefaultFileDescriptorSetServiceClient(token string) reflectv1beta1connect.FileDescriptorSetServiceClient

NewDefaultFileDescriptorSetServiceClient will create an authenticated connection to the public Buf Schema Registry (BSR) at https://buf.build. If the given token is empty, the BUF_TOKEN environment variable will be consulted.

If you require a connection to a different BSR instance, create your own reflectv1beta1connect.FileDescriptorSetServiceClient. You can use NewAuthInterceptor to configure authentication credentials. Also keep in mind that BSR instances support conditional GET requests for the endpoint in question, so also use connect.WithHTTPGet to enable that, which will typically eliminate unnecessary re-downloads of a schema. (It may not eliminate them if you are filtering the schema by a large number of types such that the entire request cannot fit in the URL of a GET request.)

For help with authenticating with the Buf Schema Registry visit: https://docs.buf.build/bsr/authentication

Types

type Cache

type Cache interface {
	Load(ctx context.Context, key string) ([]byte, error)
	Save(ctx context.Context, key string, data []byte) error
}

Cache can be implemented and supplied to prototransform for added guarantees in environments where uptime is critical. If present and the API call to retrieve a schema fails, the schema will instead be loaded from this cache. Whenever a new schema is downloaded from the BSR, it will be saved to the cache. Cache can be used from multiple goroutines and thus must be thread-safe.

type Converter

type Converter struct {
	// A custom [Resolver] can be supplied with the InputFormat [Unmarshaler] and
	// OutputFormat [Marshaler] for looking up types when expanding
	// google.protobuf.Any messages. As such, this is likely only needed in cases
	// where extensions may be present. For [proto], [protojson], and [prototext]
	// marshalers and unmarshalers are already handled so there is no need to
	// provide a WithResolver method. If nil, this defaults to using
	// protoregistry.GlobalTypes.
	Resolver Resolver
	// InputFormat handles unmarshaling bytes from the expected input format.
	// You can use a [proto.Unmarshaler], [protojson.Unmarshaler], or
	// [prototext.Unmarshaler] as a value for this field. You can also supply
	// your own custom format that implements the [Unmarshaler] interface. If
	// your custom format needs a [Resolver] (e.g. to resolve types in a
	// google.protobuf.Any message or to resolve extensions), then your custom
	// type should provide a method with the following signature:
	//     WithResolver(Resolver) Unmarshaler
	// This method should return a new unmarshaler that will make use of the
	// given resolver.
	InputFormat InputFormat
	// OutputFormat handles marshaling to bytes in the desired output format.
	// You can use a [proto.Marshaler], [protojson.Marshaler], or
	// [prototext.Marshaler] as a value for this field. You can also supply
	// your own custom format that implements the [Marshaler] interface. If
	// your custom format needs a [Resolver] (e.g. to format types in a
	// google.protobuf.Any message), then your custom type should provide
	// a method with the following signature:
	//     WithResolver(Resolver) Marshaler
	// This method should return a new marshaler that will make use of the
	// given resolver.
	OutputFormat OutputFormat
	// Filters are a set of user-supplied actions which will be performed on a
	// [ConvertMessage] call before the conversion takes place, meaning the
	// output value can be modified according to some set of rules.
	Filters Filters
}

Converter allows callers to convert byte payloads from one format to another.

func (*Converter) ConvertMessage

func (c *Converter) ConvertMessage(messageName string, inputData []byte) ([]byte, error)

ConvertMessage allows the caller to convert a given message data blob from one format to another by referring to a type schema for the blob.

type Filter

Filter provides a way for user-provided logic to alter the message being converted. It can return a derived message (which could even be a different type), or it can mutate the given message and return it.

func Redact

func Redact(predicate func(protoreflect.FieldDescriptor) bool) Filter

Redact returns a Filter that will remove information from a message. It invokes the given predicate for each field in the message (including in any nested messages) and _removes_ the field and corresponding value if the predicate returns true. This can be used to remove sensitive data from a message, for example.

type Filters

type Filters []Filter

Filters is a slice of filters. When there is more than one element, they are applied in order. In other words, the first filter is evaluated first. The result of that is then provided as input to the second, and so on.
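The in-order application described above can be sketched with stand-in types. Here message is a plain map used in place of a protobuf message, and filter, filters, apply, dropField and upperCase are all hypothetical names for illustration; none of them is this package's API.

```go
package main

import (
	"fmt"
	"strings"
)

// message is a stand-in for a protobuf message in this sketch.
type message map[string]string

// filter and filters are local stand-ins for the Filter and Filters types.
type filter func(message) message
type filters []filter

// apply runs each filter in slice order, feeding each result to the next.
func (fs filters) apply(msg message) message {
	for _, f := range fs {
		msg = f(msg)
	}
	return msg
}

// dropField returns a filter that produces a derived copy without the
// named field, leaving its input untouched.
func dropField(name string) filter {
	return func(in message) message {
		out := make(message, len(in))
		for k, v := range in {
			if k != name {
				out[k] = v
			}
		}
		return out
	}
}

// upperCase mutates the given message in place and returns it.
func upperCase(in message) message {
	for k, v := range in {
		in[k] = strings.ToUpper(v)
	}
	return in
}

func main() {
	chain := filters{dropField("ssn"), upperCase}
	out := chain.apply(message{"name": "ada", "ssn": "123-45-6789"})
	fmt.Println(out) // prints map[name:ADA]
}
```

The two filters show both styles mentioned for Filter: dropField returns a derived message, while upperCase mutates and returns the one it was given.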

type InputFormat

type InputFormat interface {
	WithResolver(Resolver) Unmarshaler
}

InputFormat is the interface used to supply the Converter with an input format. The format must accept a Resolver: its WithResolver method returns an Unmarshaler that uses the supplied Resolver.

func BinaryInputFormat

func BinaryInputFormat(in proto.UnmarshalOptions) InputFormat

BinaryInputFormat is a convenience function for the binary input format.

func InputFormatWithoutResolver

func InputFormatWithoutResolver(in Unmarshaler) InputFormat

InputFormatWithoutResolver is a convenience function for an input format without a resolver.

func JSONInputFormat

func JSONInputFormat(in protojson.UnmarshalOptions) InputFormat

JSONInputFormat is a convenience function for the JSON input format.

func TextInputFormat

func TextInputFormat(in prototext.UnmarshalOptions) InputFormat

TextInputFormat is a convenience function for the text input format.

type Lease added in v0.2.0

type Lease interface {
	// IsHeld returns whether the current process holds the
	// lease. If it returns an error, then it is not known who
	// holds the lease, and the error indicates why not. Polling
	// for a schema will be suspended unless/until this method
	// returns (true, nil).
	IsHeld() (bool, error)
	// SetCallbacks configures the given functions to be called
	// when the lease is acquired or released. The initial state
	// of a lease is "not held". So if the lease is not held at
	// the time this method is invoked, neither callback is
	// invoked. But if the lease IS held at the time this method
	// is invoked, the onAcquire callback will be immediately
	// invoked. A lease must synchronize invocations of the
	// callbacks so that there will never be multiple concurrent
	// calls.
	SetCallbacks(onAcquire, onRelease func())
	// Cancel cancels this lease and frees any associated
	// resources (which may include background goroutines). If
	// the lease is currently held, it will be immediately
	// released, and any onRelease callback will be invoked.
	// IsHeld will return false from that moment. If the
	// same lease needs to be re-acquired later, use the
	// Leaser to create a new lease with the same name.
	Cancel()
}

Lease represents a long-lived distributed lease. This allows the current process to query if the lease is currently held or not as well as to configure callbacks for when the lease is acquired or released.
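To make the callback rules concrete, here is a minimal single-process sketch of the Lease contract. The Lease interface is copied locally so the example compiles on its own. The localLease type and its acquire and release helpers are hypothetical; a real implementation would derive the held state from a shared lease store rather than from manual calls.

```go
package main

import (
	"fmt"
	"sync"
)

// Lease mirrors the interface documented above so this sketch compiles on
// its own; real code would use prototransform.Lease instead.
type Lease interface {
	IsHeld() (bool, error)
	SetCallbacks(onAcquire, onRelease func())
	Cancel()
}

// localLease is a hypothetical lease whose state is flipped manually via
// acquire and release. The mutex also serializes callback invocations, so
// callbacks must not call back into the lease.
type localLease struct {
	mu        sync.Mutex
	held      bool
	cancelled bool
	onAcquire func()
	onRelease func()
}

func (l *localLease) IsHeld() (bool, error) {
	l.mu.Lock()
	defer l.mu.Unlock()
	return l.held, nil
}

func (l *localLease) SetCallbacks(onAcquire, onRelease func()) {
	l.mu.Lock()
	defer l.mu.Unlock()
	l.onAcquire, l.onRelease = onAcquire, onRelease
	// A lease that is already held reports the acquisition right away.
	if l.held && onAcquire != nil {
		onAcquire()
	}
}

// acquire marks the lease as held; it is a no-op after Cancel.
func (l *localLease) acquire() {
	l.mu.Lock()
	defer l.mu.Unlock()
	if l.cancelled || l.held {
		return
	}
	l.held = true
	if l.onAcquire != nil {
		l.onAcquire()
	}
}

// release marks the lease as not held and invokes onRelease if it was held.
func (l *localLease) release() {
	l.mu.Lock()
	defer l.mu.Unlock()
	l.releaseLocked()
}

// releaseLocked must be called with l.mu held.
func (l *localLease) releaseLocked() {
	if !l.held {
		return
	}
	l.held = false
	if l.onRelease != nil {
		l.onRelease()
	}
}

// Cancel releases the lease if held and prevents it from being re-acquired.
func (l *localLease) Cancel() {
	l.mu.Lock()
	defer l.mu.Unlock()
	l.cancelled = true
	l.releaseLocked()
}

// Compile-time check that localLease satisfies Lease.
var _ Lease = (*localLease)(nil)

func main() {
	lease := &localLease{}
	lease.SetCallbacks(
		func() { fmt.Println("acquired: start polling") },
		func() { fmt.Println("released: stop polling") },
	)
	lease.acquire()
	held, _ := lease.IsHeld()
	fmt.Println("held:", held)
	lease.Cancel()
}
```

Invoking callbacks while holding the mutex is the simplest way to guarantee they never run concurrently, at the cost of forbidding re-entrant calls from inside a callback.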

type Leaser added in v0.2.0

type Leaser interface {
	// NewLease tries to acquire the given lease name. This
	// returns a lease object, which represents the state of
	// the new lease, and whether the current process holds
	// it or not.
	//
	// Implementations should monitor the lease store so that
	// if the lease is not held but suddenly becomes available
	// (e.g. current leaseholder releases it or crashes),
	// another process can immediately pick it up. The given
	// leaseHolder bytes represent the current process and may
	// be persisted in the lease store if necessary. This is
	// particularly useful if the lease store has no other way
	// to identify connected clients or entry "owners", in which
	// case a lease implementation can compare the persisted
	// lease state to this value to determine if the current
	// client holds the lease.
	//
	// This may start background goroutines. In order to release
	// any such resources associated with the lease, callers must
	// call Lease.Cancel() or cancel the given context.
	NewLease(ctx context.Context, leaseName string, leaseHolder []byte) Lease
}

Leaser provides access to long-lived distributed leases, for leader election or distributed locking. This can be used by a SchemaWatcher so that only a single "leader" process polls the remote source for a schema and the others ("followers") just get the latest schema from a shared cache.

type Marshaler

type Marshaler interface {
	Marshal(proto.Message) ([]byte, error)
}

Marshaler marshals a proto.Message into bytes in some output format.

type OutputFormat

type OutputFormat interface {
	WithResolver(Resolver) Marshaler
}

OutputFormat is the interface used to supply the Converter with an output format. The format must accept a Resolver: its WithResolver method returns a Marshaler that uses the supplied Resolver.

func BinaryOutputFormat

func BinaryOutputFormat(in proto.MarshalOptions) OutputFormat

BinaryOutputFormat is a convenience function for the binary output format.

func JSONOutputFormat

func JSONOutputFormat(in protojson.MarshalOptions) OutputFormat

JSONOutputFormat is a convenience function for the JSON output format.

func OutputFormatWithoutResolver

func OutputFormatWithoutResolver(in Marshaler) OutputFormat

OutputFormatWithoutResolver is a convenience function for an output format without a resolver.

func TextOutputFormat

func TextOutputFormat(in prototext.MarshalOptions) OutputFormat

TextOutputFormat is a convenience function for the text output format.

type Resolver

type Resolver interface {
	protoregistry.ExtensionTypeResolver
	protoregistry.MessageTypeResolver

	// FindEnumByName looks up an enum by its full name.
	// E.g., "google.protobuf.Field.Kind".
	//
	// This returns (nil, NotFound) if not found.
	FindEnumByName(enum protoreflect.FullName) (protoreflect.EnumType, error)
}

Resolver is used to resolve symbol names and numbers into schema definitions.

type SchemaPoller

type SchemaPoller interface {
	// GetSchema polls for a schema. The given symbols may be used to filter
	// the schema to return a smaller result. The given currentVersion, if not
	// empty, indicates the version that the caller already has fetched and
	// cached. So if that is still the current version of the schema (nothing
	// newer to download), the implementation may return an ErrSchemaNotModified
	// error.
	GetSchema(ctx context.Context, symbols []string, currentVersion string) (descriptors *descriptorpb.FileDescriptorSet, version string, err error)
	// GetSchemaID returns a string that identifies the schema that it fetches.
	// For a BSR module, for example, this might be "buf.build/owner/module:version".
	GetSchemaID() string
}

SchemaPoller polls for descriptors from a remote source. See NewSchemaPoller.

func NewSchemaPoller

func NewSchemaPoller(
	client reflectv1beta1connect.FileDescriptorSetServiceClient,
	module string,
	version string,
) SchemaPoller

NewSchemaPoller returns a SchemaPoller that uses the given Buf Reflection API client to download descriptors for the given module. If the given version is non-empty, the descriptors will be downloaded from that version of the module.

The version should either be blank or indicate a tag that may change over time, such as a draft name. If a fixed tag or commit is provided, then the periodic polling is unnecessary since the schema for such a version is immutable.

To create a client that can download descriptors from the buf.build public BSR, see NewDefaultFileDescriptorSetServiceClient.

type SchemaWatcher

type SchemaWatcher struct {
	// contains filtered or unexported fields
}

SchemaWatcher watches a schema in a remote registry by periodically polling. It implements the Resolver interface using the most recently downloaded schema. As schema changes are pushed to the remote registry, the watcher will incorporate the changes by downloading each change via regular polling.

func NewSchemaWatcher

func NewSchemaWatcher(ctx context.Context, config *SchemaWatcherConfig) (*SchemaWatcher, error)

NewSchemaWatcher creates a new SchemaWatcher for the given SchemaWatcherConfig.

The config is first validated to ensure all required attributes are provided. A non-nil error is returned if the configuration is not valid.

If the configuration is valid, a SchemaWatcher is returned, and the configured SchemaPoller is used to download a schema. The schema will then be periodically re-fetched based on the configured polling period. Either the Stop() method of the SchemaWatcher must be called or the given ctx must be cancelled to release resources and stop the periodic polling.

This function returns immediately, even before a schema has been initially downloaded. If the Find* methods on the returned watcher are called before an initial schema has been downloaded, they will return ErrSchemaWatcherNotReady. Use the SchemaWatcher.AwaitReady method to make sure the watcher is ready before use.

If the SchemaWatcher.Stop() method is called or the given ctx is cancelled, polling for an updated schema aborts. The SchemaWatcher may still be used after this, but it will be "frozen" using its most recently downloaded schema. If no schema was ever successfully downloaded, it will be frozen in a bad state and methods will return ErrSchemaWatcherNotReady.

func (*SchemaWatcher) AwaitReady

func (s *SchemaWatcher) AwaitReady(ctx context.Context) error

AwaitReady returns nil when s has downloaded a schema and is ready for use. If the given context is cancelled (or has a deadline that elapses) before s is ready, a non-nil error is returned. If an error occurred while trying to download a schema, that error will be returned at that time. If no error has yet occurred (e.g. the context was cancelled before a download attempt finished), this will return the context error.

Even if an error is returned, the SchemaWatcher will still be trying to download the schema. It will keep trying/polling until s.Stop is called or until the context passed to NewSchemaWatcher is cancelled.

func (*SchemaWatcher) FindDescriptorByName

func (s *SchemaWatcher) FindDescriptorByName(name protoreflect.FullName) (protoreflect.Descriptor, error)

FindDescriptorByName looks up a descriptor by the full name.

This uses the most recently downloaded schema.

func (*SchemaWatcher) FindEnumByName

func (s *SchemaWatcher) FindEnumByName(enum protoreflect.FullName) (protoreflect.EnumType, error)

FindEnumByName looks up an enum by its full name. E.g., "google.protobuf.Field.Kind".

Implements Resolver using the most recently downloaded schema.

func (*SchemaWatcher) FindExtensionByName

func (s *SchemaWatcher) FindExtensionByName(field protoreflect.FullName) (protoreflect.ExtensionType, error)

FindExtensionByName looks up an extension field by the field's full name. Note that this is the full name of the field as determined by where the extension is declared and is unrelated to the full name of the message being extended.

Implements Resolver using the most recently downloaded schema.

func (*SchemaWatcher) FindExtensionByNumber

func (s *SchemaWatcher) FindExtensionByNumber(message protoreflect.FullName, field protoreflect.FieldNumber) (protoreflect.ExtensionType, error)

FindExtensionByNumber looks up an extension field by the field number within some parent message, identified by full name.

Implements Resolver using the most recently downloaded schema.

func (*SchemaWatcher) FindFileByPath

func (s *SchemaWatcher) FindFileByPath(path string) (protoreflect.FileDescriptor, error)

FindFileByPath looks up a file by the path.

This uses the most recently downloaded schema.

func (*SchemaWatcher) FindMessageByName

func (s *SchemaWatcher) FindMessageByName(message protoreflect.FullName) (protoreflect.MessageType, error)

FindMessageByName looks up a message by its full name. E.g., "google.protobuf.Any"

Implements Resolver using the most recently downloaded schema.

func (*SchemaWatcher) FindMessageByURL

func (s *SchemaWatcher) FindMessageByURL(url string) (protoreflect.MessageType, error)

FindMessageByURL looks up a message by a URL identifier. See documentation on google.protobuf.Any.type_url for the URL format.

Implements Resolver using the most recently downloaded schema.

func (*SchemaWatcher) IsStopped

func (s *SchemaWatcher) IsStopped() bool

func (*SchemaWatcher) LastResolved

func (s *SchemaWatcher) LastResolved() (bool, time.Time)

LastResolved returns the time that a schema was last successfully downloaded. If the boolean value is false, the watcher is not yet ready and no schema has yet been successfully downloaded. Otherwise, the returned time indicates when the schema was downloaded. If the schema is loaded from a cache, the timestamp will indicate when that cached schema was originally downloaded.

This can be used for staleness heuristics if a partition occurs that makes the remote registry unavailable. Under typical operations when no failures are occurring, the maximum age will be up to the configured polling period plus the latency of the RPC to the remote registry.

func (*SchemaWatcher) RangeFiles

func (s *SchemaWatcher) RangeFiles(f func(protoreflect.FileDescriptor) bool)

RangeFiles iterates over all registered files while f returns true. The iteration order is undefined.

This uses a snapshot of the most recently downloaded schema. So if the schema is updated (via concurrent download) while iterating, f will only see the contents of the older schema.

If s is not yet ready, this will not call f at all and instead immediately return. This does not return an error so that the signature matches the method of the same name of *protoregistry.Files, allowing *SchemaWatcher to provide the same interface.

func (*SchemaWatcher) RangeFilesByPackage

func (s *SchemaWatcher) RangeFilesByPackage(name protoreflect.FullName, f func(protoreflect.FileDescriptor) bool)

RangeFilesByPackage iterates over all registered files in a given proto package while f returns true. The iteration order is undefined.

This uses a snapshot of the most recently downloaded schema. So if the schema is updated (via concurrent download) while iterating, f will only see the contents of the older schema.

If s is not yet ready, this will not call f at all and instead immediately return. This does not return an error so that the signature matches the method of the same name of *protoregistry.Files, allowing *SchemaWatcher to provide the same interface.

func (*SchemaWatcher) ResolveNow

func (s *SchemaWatcher) ResolveNow()

ResolveNow tells the watcher to poll for a new schema immediately instead of waiting until the next scheduled time per the configured polling period.

func (*SchemaWatcher) ResolvedSchema added in v0.4.0

func (s *SchemaWatcher) ResolvedSchema() *descriptorpb.FileDescriptorSet

ResolvedSchema returns the resolved schema in the form of a FileDescriptorSet. Until AwaitReady returns a nil error, the return value of this function can be nil. The caller must not mutate the returned file descriptor set. Clone the returned file descriptor set using proto.Clone before performing mutations on it.

func (*SchemaWatcher) Stop

func (s *SchemaWatcher) Stop()

Stop the SchemaWatcher from polling the BSR for new schemas. Can be called multiple times safely.

type SchemaWatcherConfig

type SchemaWatcherConfig struct {
	// The downloader of descriptors. See [NewSchemaPoller].
	SchemaPoller SchemaPoller
	// The symbols that should be included in the downloaded schema. These must be
	// the fully-qualified names of elements in the schema, which can include
	// packages, messages, enums, extensions, services, and methods. If specified,
	// the downloaded schema will only include descriptors to describe these symbols.
	// If left empty, the entire schema will be downloaded.
	IncludeSymbols []string
	// The interval between polls of the BSR for new schema versions. The
	// duration must not be negative; if it is, [NewSchemaWatcher] will
	// return an error. If unset (left zero), a default period of 5 minutes
	// is used.
	PollingPeriod time.Duration
	// A number between 0 and 1 that represents the amount of jitter to add to
	// the polling period. A value of zero means no jitter. A value of one means
	// up to 100% jitter, so the actual period would be between 0 and 2*PollingPeriod.
	// To prevent self-synchronization (and thus thundering herds) when there are
	// multiple pollers, a value of 0.1 to 0.3 is typical.
	Jitter float64
	// If Cache is non-nil, it is used for increased robustness, even in the
	// face of the remote schema registry being unavailable. If non-nil and the
	// API call to initially retrieve a schema fails, the schema will instead
	// be loaded from this cache. Whenever a new schema is downloaded from the
	// remote registry, it will be saved to the cache. So if the process is
	// restarted and the remote registry is unavailable, the latest cached schema
	// can still be used.
	Cache Cache
	// If Leaser is non-nil, it is used to decide whether the current process
	// can poll for the schema. When Leaser is set, Cache must also be non-nil.
	// This is useful when the
	// schema source is a remote process, and the current process is replicated
	// (e.g. many instances running the same workload, for redundancy and/or
	// capacity). This prevents all the replicas from polling. Instead, a single
	// replica will "own" the lease and poll for the schema. It will then store
	// the downloaded schema in the shared cache. A replica that does not have
	// the lease will look only in the cache instead of polling the remote
	// source.
	Leaser Leaser
	// CurrentProcess is an optional identifier for the current process. This
	// is only used if Leaser is non-nil. If present, this value is used to
	// identify the current process as the leaseholder. If not present, a
	// default value will be computed using the current process's PID and the
	// host name and network addresses of the current host. If present, this
	// value must be unique among all processes that might try to acquire
	// the same lease.
	CurrentProcess []byte
	// OnUpdate is an optional callback that will be invoked when a new schema
	// is fetched. This can be used by an application to take action when a new
	// schema becomes available.
	OnUpdate func(*SchemaWatcher)
	// OnError is an optional callback that will be invoked when a schema cannot
	// be fetched. This could be due to the SchemaPoller returning an error or
	// failure to convert the fetched descriptors into a resolver.
	OnError func(*SchemaWatcher, error)
}

SchemaWatcherConfig contains the configurable attributes of the SchemaWatcher.

type Unmarshaler

type Unmarshaler interface {
	Unmarshal([]byte, proto.Message) error
}

Unmarshaler decodes serialized message data from a byte slice into the given proto.Message.

Directories

Path Synopsis
cache
filecache
Package filecache provides an implementation of prototransform.Cache that is based on the file system.
memcache
Package memcache provides an implementation of prototransform.Cache that is backed by a memcached instance: https://memcached.org/.
rediscache
Package rediscache provides an implementation of prototransform.Cache that is backed by a Redis instance: https://redis.io/.
internal
memcacheleaser
Package memcacheleaser provides an implementation of prototransform.Leaser that is backed by a memcached instance: https://memcached.org/.
redisleaser
Package redisleaser provides an implementation of prototransform.Leaser that is backed by a Redis instance: https://redis.io/.
