sr

v0.0.0-...-5028cac

Warning: This package is not in the latest version of its module.
Published: Dec 5, 2022 License: BSD-3-Clause Imports: 17 Imported by: 0

Documentation

Overview

Package sr provides a schema registry client and a helper type to encode values and decode data according to the schema registry wire format.

As mentioned on the Serde type, this package does not provide schema auto-discovery or type auto-decoding. To aid in strong typing and validated encoding/decoding, you must register each schema ID and value together with the functions that encode or decode it.

The client does not automatically cache schemas. Instead, the Serde type holds the mapping from each registered ID to the functions that encode or decode it. The Client type itself simply speaks HTTP to your schema registry and returns the results.

To read more about the schema registry, see the following:

https://docs.confluent.io/platform/current/schema-registry/develop/api.html

Constants

const (
	// HideDeleted hides soft deleted schemas or subjects.
	HideDeleted = false
	// ShowDeleted shows soft deleted schemas or subjects.
	ShowDeleted = true
)

const (
	// SoftDelete performs a soft deletion.
	SoftDelete = false
	// HardDelete performs a hard deletion. Values must be soft deleted
	// before they can be hard deleted.
	HardDelete = true
)

const GlobalSubject = ""

GlobalSubject is a constant to make API usage of requesting global subjects clearer.

Variables

var (
	// ErrNotRegistered is returned from Serde when attempting to encode a
	// value or decode an ID that has not been registered, or when using
	// Decode with a missing new value function.
	ErrNotRegistered = errors.New("registration is missing for encode/decode")

	// ErrBadHeader is returned from Decode when the input slice is shorter
	// than five bytes, or if the first byte is not the magic 0 byte.
	ErrBadHeader = errors.New("5 byte header for value is missing or does no have 0 magic byte")
)
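
The five-byte header that ErrBadHeader refers to can be illustrated with a small sketch. It assumes the Confluent wire format layout of a magic 0 byte followed by the schema ID as a four-byte big-endian integer. The names appendHeader, parseHeader, and errBadHeader are hypothetical local stand-ins, not part of this package.

```go
package main

import (
	"encoding/binary"
	"errors"
	"fmt"
)

// errBadHeader is a local stand-in for sr.ErrBadHeader.
var errBadHeader = errors.New("bad header")

// appendHeader appends the magic 0 byte and the big-endian schema ID to b.
func appendHeader(b []byte, id uint32) []byte {
	return append(b, 0, byte(id>>24), byte(id>>16), byte(id>>8), byte(id))
}

// parseHeader validates the header and returns the schema ID together with
// the bytes that follow the header.
func parseHeader(b []byte) (id uint32, payload []byte, err error) {
	if len(b) < 5 || b[0] != 0 {
		return 0, nil, errBadHeader
	}
	return binary.BigEndian.Uint32(b[1:5]), b[5:], nil
}

func main() {
	msg := append(appendHeader(nil, 7), "true"...)
	id, payload, err := parseHeader(msg)
	fmt.Println(id, string(payload), err)
}
```

A slice shorter than five bytes, or one whose first byte is not 0, fails the check in parseHeader, which mirrors the two conditions listed for ErrBadHeader.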

Functions

This section is empty.

Types

type Client

type Client struct {
	// contains filtered or unexported fields
}

Client talks to a schema registry and contains helper functions to serialize and deserialize objects according to schemas.

func NewClient

func NewClient(opts ...Opt) (*Client, error)

NewClient returns a new schema registry client.

func (*Client) CheckCompatibility

func (cl *Client) CheckCompatibility(ctx context.Context, subject string, version int, s Schema) (bool, error)

CheckCompatibility checks if a schema is compatible with the given version that exists. You can use -1 to check compatibility with the latest version, and -2 to check compatibility against all versions.

func (*Client) CompatibilityLevel

func (cl *Client) CompatibilityLevel(ctx context.Context, subjects ...string) []CompatibilityResult

CompatibilityLevel returns the subject level and global level compatibility of each requested subject. The global level can be requested by using either an empty subject or by specifying no subjects.

func (*Client) CreateSchema

func (cl *Client) CreateSchema(ctx context.Context, subject string, s Schema) (SubjectSchema, error)

CreateSchema attempts to create a schema in the given subject.

func (*Client) DeleteSchema

func (cl *Client) DeleteSchema(ctx context.Context, subject string, version int, how DeleteHow) error

DeleteSchema deletes the schema at the given version. You must soft delete a schema before it can be hard deleted. You can use -1 to delete the latest version.

func (*Client) DeleteSubject

func (cl *Client) DeleteSubject(ctx context.Context, subject string, how DeleteHow) ([]int, error)

DeleteSubject deletes the subject. You must soft delete a subject before it can be hard deleted. This returns all versions that were deleted.

func (*Client) LookupSchema

func (cl *Client) LookupSchema(ctx context.Context, subject string, s Schema) (SubjectSchema, error)

LookupSchema checks to see if a schema is already registered and if so, returns its ID and version in the SubjectSchema.

func (*Client) Mode

func (cl *Client) Mode(ctx context.Context, subjects ...string) []ModeResult

Mode returns the subject and global mode of each requested subject. The global mode can be requested by using either an empty subject or by specifying no subjects.

func (*Client) ResetCompatibilityLevel

func (cl *Client) ResetCompatibilityLevel(ctx context.Context, subjects ...string) []CompatibilityResult

ResetCompatibilityLevel deletes any subject-level compatibility level and reverts to the global default.

func (*Client) ResetMode

func (cl *Client) ResetMode(ctx context.Context, subjects ...string) []ModeResult

ResetMode deletes any subject modes and reverts to the global default.

func (*Client) SchemaByVersion

func (cl *Client) SchemaByVersion(ctx context.Context, subject string, version int, deleted HideShowDeleted) (SubjectSchema, error)

SchemaByVersion returns the schema for a given subject and version. You can use -1 as the version to return the latest schema.

func (*Client) SchemaReferences

func (cl *Client) SchemaReferences(ctx context.Context, subject string, version int, deleted HideShowDeleted) ([]SubjectSchema, error)

SchemaReferences returns all schemas that reference the input subject-version. You can use -1 to check the latest version.

func (*Client) SchemaTextByID

func (cl *Client) SchemaTextByID(ctx context.Context, id int) (string, error)

SchemaTextByID returns the actual text of a schema.

For example, if the schema for an ID is

"{\"type\":\"boolean\"}"

this will return

{"type":"boolean"}
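
The relationship between the escaped and unescaped forms can be reproduced with the standard library. This is only a sketch of that relationship, not a claim about how the client decodes responses internally; unescapeSchema is a hypothetical helper.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// unescapeSchema decodes a JSON string literal into the raw text it holds.
func unescapeSchema(quoted string) (string, error) {
	var text string
	err := json.Unmarshal([]byte(quoted), &text)
	return text, err
}

func main() {
	text, err := unescapeSchema(`"{\"type\":\"boolean\"}"`)
	fmt.Println(text, err)
}
```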

func (*Client) SchemaUsagesByID

func (cl *Client) SchemaUsagesByID(ctx context.Context, id int, deleted HideShowDeleted) ([]SubjectSchema, error)

SchemaUsagesByID returns all usages of a given schema ID. A single schema can be reused in many subject-versions; this function can be used to map a schema to all subject-versions that use it.

func (*Client) Schemas

func (cl *Client) Schemas(ctx context.Context, subject string, deleted HideShowDeleted) ([]SubjectSchema, error)

Schemas returns all schemas for the given subject.

func (*Client) SetCompatibilityLevel

func (cl *Client) SetCompatibilityLevel(ctx context.Context, level CompatibilityLevel, subjects ...string) []CompatibilityResult

SetCompatibilityLevel sets the compatibility level for each requested subject. The global level can be set by either using an empty subject or by specifying no subjects. If specifying no subjects, this returns one element.

func (*Client) SetMode

func (cl *Client) SetMode(ctx context.Context, mode Mode, force bool, subjects ...string) []ModeResult

SetMode sets the mode for each requested subject. The global mode can be set by either using an empty subject or by specifying no subjects. If specifying no subjects, this returns one element. Force can be used to force setting the mode even if the registry has existing schemas.

func (*Client) Subjects

func (cl *Client) Subjects(ctx context.Context, deleted HideShowDeleted) ([]string, error)

Subjects returns subjects available in the registry.

func (*Client) SupportedTypes

func (cl *Client) SupportedTypes(ctx context.Context) ([]SchemaType, error)

SupportedTypes returns the schema types that are supported in the schema registry.

type CompatibilityLevel

type CompatibilityLevel int

CompatibilityLevel is an enum representing config compatibility levels.

const (
	CompatNone CompatibilityLevel = 1 + iota
	CompatBackward
	CompatBackwardTransitive
	CompatForward
	CompatForwardTransitive
	CompatFull
	CompatFullTransitive
)

func (CompatibilityLevel) MarshalText

func (l CompatibilityLevel) MarshalText() ([]byte, error)

func (CompatibilityLevel) String

func (l CompatibilityLevel) String() string

func (*CompatibilityLevel) UnmarshalText

func (l *CompatibilityLevel) UnmarshalText(text []byte) error

type CompatibilityResult

type CompatibilityResult struct {
	Subject string             // The subject this compatibility result is for, or empty for the global level.
	Level   CompatibilityLevel // The subject (or global) compatibility level.
	Err     error              // The error received for getting this compatibility level.
}

CompatibilityResult is the compatibility level for a subject.

type DeleteHow

type DeleteHow bool

DeleteHow is a typed bool indicating how subjects or schemas should be deleted.

type HideShowDeleted

type HideShowDeleted bool

HideShowDeleted is a typed bool indicating whether queries should show or hide soft deleted schemas / subjects.

type Mode

type Mode int

Mode is an enum representing the "mode" of the registry or a subject.

const (
	ModeImport Mode = iota
	ModeReadOnly
	ModeReadWrite
)

func (Mode) MarshalText

func (m Mode) MarshalText() ([]byte, error)

func (Mode) String

func (m Mode) String() string

func (*Mode) UnmarshalText

func (m *Mode) UnmarshalText(text []byte) error

type ModeResult

type ModeResult struct {
	Subject string // The subject this mode result is for, or empty for the global mode.
	Mode    Mode   // The subject (or global) mode.
	Err     error  // The error received for getting this mode.
}

ModeResult is the mode for a subject.

type Opt

type Opt interface {
	// contains filtered or unexported methods
}

Opt is an option to configure a client.

func BasicAuth

func BasicAuth(user, pass string) Opt

BasicAuth sets basic authorization to use for every request.

func DialTLSConfig

func DialTLSConfig(c *tls.Config) Opt

DialTLSConfig sets a tls.Config to use in the default HTTP client.

func HTTPClient

func HTTPClient(httpcl *http.Client) Opt

HTTPClient sets the http client that the schema registry client uses, overriding the default client that speaks plaintext with a timeout of 5s.

func Normalize

func Normalize() Opt

Normalize sets the client to add the "?normalize=true" query parameter when getting or creating schemas. This can help collapse duplicate schemas into one, but can also be done with a configuration parameter on the schema registry itself.

func URLs

func URLs(urls ...string) Opt

URLs sets the URLs that the client speaks to, overriding the default http://localhost:8081. This option automatically prefixes any URL that is missing an http:// or https:// prefix with http://.
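
The prefixing rule can be sketched as a small helper. The function withScheme is hypothetical, and details such as case sensitivity of the check are assumptions rather than confirmed behavior of this option.

```go
package main

import (
	"fmt"
	"strings"
)

// withScheme returns u unchanged if it already starts with http:// or
// https://, and otherwise prefixes it with http://.
func withScheme(u string) string {
	if strings.HasPrefix(u, "http://") || strings.HasPrefix(u, "https://") {
		return u
	}
	return "http://" + u
}

func main() {
	for _, u := range []string{"localhost:8081", "https://registry.example.com"} {
		fmt.Println(withScheme(u))
	}
}
```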

func UserAgent

func UserAgent(ua string) Opt

UserAgent sets the User-Agent to use in requests, overriding the default "franz-go".

type ResponseError

type ResponseError struct {
	// Method is the requested http method.
	Method string `json:"-"`
	// URL is the full path that was requested that resulted in this error.
	URL string `json:"-"`

	ErrorCode int    `json:"error_code"`
	Message   string `json:"message"`
}

ResponseError is the type returned from the schema registry for errors.

func (*ResponseError) Error

func (e *ResponseError) Error() string

type Schema

type Schema struct {
	// Schema is the actual unescaped text of a schema.
	Schema string `json:"schema"`

	// Type is the type of a schema. The default type is avro.
	Type SchemaType `json:"schemaType,omitempty"`

	// References declares other schemas this schema references. See the
	// docs on SchemaReference for more details.
	References []SchemaReference `json:"references,omitempty"`
}

Schema is the object form of a schema for the HTTP API.

type SchemaReference

type SchemaReference struct {
	Name    string `json:"name"`
	Subject string `json:"subject"`
	Version int    `json:"version"`
}

SchemaReference is a way for one schema to reference another. The details for how referencing is done are type specific; for example, JSON objects that use the key "$ref" can refer to another schema via URL. For more details on references, see the following links:

https://docs.confluent.io/platform/current/schema-registry/serdes-develop/index.html#schema-references
https://docs.confluent.io/platform/current/schema-registry/develop/api.html

type SchemaType

type SchemaType int

SchemaType is an enum representing schema types. The default schema type is avro.

const (
	TypeAvro SchemaType = iota
	TypeProtobuf
	TypeJSON
)

func (SchemaType) MarshalText

func (t SchemaType) MarshalText() ([]byte, error)

func (SchemaType) String

func (t SchemaType) String() string

func (*SchemaType) UnmarshalText

func (t *SchemaType) UnmarshalText(text []byte) error

type Serde

type Serde struct {
	// contains filtered or unexported fields
}

Serde encodes and decodes values according to the schema registry wire format. A Serde itself does not perform schema auto-discovery or type auto-decoding. To aid in strong typing and validated encoding/decoding, you must register each schema ID and value together with the functions that encode or decode it.

To use a Serde for encoding, you must pre-register schema ids and values you will encode, and then you can use the encode functions.

To use a Serde for decoding, you can either pre-register schema ids and values you will consume, or you can discover the schema every time you receive an ErrNotRegistered error from decode.
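
The registration idea can be illustrated with a toy, standard-library-only stand-in. This is not the Serde implementation: the registry type, its methods, and the hard-coded JSON encoding are all assumptions made for the sketch (a real Serde takes encode and decode functions as options). It shows how a type-to-ID mapping lets encoding fail fast for unregistered values and how an ID-to-type mapping lets decoding create a new value.

```go
package main

import (
	"encoding/binary"
	"encoding/json"
	"errors"
	"fmt"
	"reflect"
)

var (
	errNotRegistered = errors.New("registration is missing for encode/decode")
	errBadHeader     = errors.New("bad header")
)

// registry is a hypothetical stand-in that maps Go types to schema IDs for
// encoding and schema IDs back to Go types for decoding.
type registry struct {
	ids   map[reflect.Type]uint32
	types map[uint32]reflect.Type
}

func (r *registry) register(id uint32, v any) {
	if r.ids == nil {
		r.ids = make(map[reflect.Type]uint32)
		r.types = make(map[uint32]reflect.Type)
	}
	t := reflect.TypeOf(v)
	r.ids[t] = id
	r.types[id] = t
}

// encode prepends the magic byte and schema ID to the JSON form of v.
func (r *registry) encode(v any) ([]byte, error) {
	id, ok := r.ids[reflect.TypeOf(v)]
	if !ok {
		return nil, errNotRegistered
	}
	body, err := json.Marshal(v)
	if err != nil {
		return nil, err
	}
	hdr := []byte{0, byte(id >> 24), byte(id >> 16), byte(id >> 8), byte(id)}
	return append(hdr, body...), nil
}

// decodeNew looks up the ID in the header and decodes into a new value of
// the registered type, returning a pointer to it.
func (r *registry) decodeNew(b []byte) (any, error) {
	if len(b) < 5 || b[0] != 0 {
		return nil, errBadHeader
	}
	t, ok := r.types[binary.BigEndian.Uint32(b[1:5])]
	if !ok {
		return nil, errNotRegistered
	}
	v := reflect.New(t).Interface()
	return v, json.Unmarshal(b[5:], v)
}

type greeting struct {
	Text string `json:"text"`
}

func main() {
	var r registry
	r.register(3, greeting{})
	b, err := r.encode(greeting{Text: "hi"})
	if err != nil {
		fmt.Println("encode:", err)
		return
	}
	v, err := r.decodeNew(b)
	fmt.Printf("%x %+v %v\n", b[:5], v, err)
}
```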

func (*Serde) AppendEncode

func (s *Serde) AppendEncode(b []byte, v any) ([]byte, error)

AppendEncode appends an encoded value to b according to the schema registry wire format and returns it. If EncodeFn was not used, this returns ErrNotRegistered.

func (*Serde) Decode

func (s *Serde) Decode(b []byte, v any) error

Decode decodes b into v. If DecodeFn option was not used, this returns ErrNotRegistered.

Serde does not handle references in schemas; it is up to you to register the full decode function for any top-level ID, regardless of how many other schemas that top-level ID references.

func (*Serde) DecodeNew

func (s *Serde) DecodeNew(b []byte) (any, error)

DecodeNew is the same as Decode, but decodes into a new value rather than the input value. If DecodeFn was not used, this returns ErrNotRegistered. GenerateFn can be used to control the instantiation of a new value, otherwise this uses reflect.New(reflect.TypeOf(v)).Interface().

func (*Serde) Encode

func (s *Serde) Encode(v any) ([]byte, error)

Encode encodes a value according to the schema registry wire format and returns it. If EncodeFn was not used, this returns ErrNotRegistered.

func (*Serde) MustAppendEncode

func (s *Serde) MustAppendEncode(b []byte, v any) []byte

MustAppendEncode returns the value of AppendEncode, panicking on error. This is a shortcut for when your encode function cannot return an error.

func (*Serde) MustEncode

func (s *Serde) MustEncode(v any) []byte

MustEncode returns the value of Encode, panicking on error. This is a shortcut for when your encode function cannot return an error.

func (*Serde) Register

func (s *Serde) Register(id int, v any, opts ...SerdeOpt)

Register registers a schema ID and the value it corresponds to, as well as the encoding or decoding functions. You need to register functions depending on whether you are only encoding, only decoding, or both.

func (*Serde) SetDefaults

func (s *Serde) SetDefaults(opts ...SerdeOpt)

SetDefaults sets default options to apply to every registered type. These options are always applied first, so you can override them as necessary when registering.

This can be useful if you always want to use the same encoding or decoding functions.

type SerdeOpt

type SerdeOpt interface {
	// contains filtered or unexported methods
}

SerdeOpt is an option to configure a Serde.

func AppendEncodeFn

func AppendEncodeFn(fn func([]byte, any) ([]byte, error)) SerdeOpt

AppendEncodeFn allows Serde to encode a value to an existing slice. This can be more efficient than EncodeFn; this function is used if it exists.

func DecodeFn

func DecodeFn(fn func([]byte, any) error) SerdeOpt

DecodeFn allows Serde to decode into a value.

func EncodeFn

func EncodeFn(fn func(any) ([]byte, error)) SerdeOpt

EncodeFn allows Serde to encode a value.

func GenerateFn

func GenerateFn(fn func() any) SerdeOpt

GenerateFn returns a new(Value) that can be decoded into. This function can be used to control the instantiation of a new type for DecodeNew.

func Index

func Index(index ...int) SerdeOpt

Index attaches a message index to a value. A single schema ID can be registered multiple times with different indices.

This option supports schemas that encode many different values from the same schema (namely, protobuf). The index into the schema to encode a particular message is specified with `index`.

NOTE: this option must be used for protobuf schemas.

For more information, see where `message-indexes` are described in:

https://docs.confluent.io/platform/current/schema-registry/serdes-develop/index.html#wire-format
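
As a rough sketch of how message indexes might appear on the wire, the Confluent description linked above encodes them as zigzag varints prefixed by the array length, with the single index [0] shortened to one 0 byte. The helper appendIndexes is hypothetical and is not confirmed to match this package's internals byte for byte.

```go
package main

import (
	"encoding/binary"
	"fmt"
)

// appendIndexes appends a message index array to b: the array length and
// each index as a zigzag varint, with [0] shortened to a single 0 byte.
// An empty index leaves b unchanged.
func appendIndexes(b []byte, index []int) []byte {
	if len(index) == 0 {
		return b
	}
	if len(index) == 1 && index[0] == 0 {
		return append(b, 0)
	}
	var tmp [binary.MaxVarintLen64]byte
	n := binary.PutVarint(tmp[:], int64(len(index)))
	b = append(b, tmp[:n]...)
	for _, i := range index {
		n = binary.PutVarint(tmp[:], int64(i))
		b = append(b, tmp[:n]...)
	}
	return b
}

func main() {
	fmt.Printf("%x\n", appendIndexes(nil, []int{0}))
	fmt.Printf("%x\n", appendIndexes(nil, []int{1, 0}))
}
```

In the wire format these bytes sit between the five-byte header and the encoded protobuf message.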

type SubjectSchema

type SubjectSchema struct {
	// Subject is the subject for this schema. This usually corresponds to
	// a Kafka topic, and whether this is for a key or value. For example,
	// "foo-key" would be the subject for the foo topic for serializing the
	// key field of a record.
	Subject string `json:"subject"`

	// Version is the version of this subject.
	Version int `json:"version"`

	// ID is the globally unique ID of the schema.
	ID int `json:"id"`

	Schema
}

SubjectSchema pairs the subject, global identifier, and version of a schema with the schema itself.

func CommSubjectSchemas

func CommSubjectSchemas(l, r []SubjectSchema) (luniq, runiq, common []SubjectSchema)

CommSubjectSchemas splits l and r into three sets: what is unique in l, what is unique in r, and what is common in both. Duplicates in either input are eliminated.
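
A three-way split of this kind can be sketched with maps. The local subjectSchema type, equality on subject, version, and ID, and the first-seen output order are all assumptions for illustration; the package's own comparison and ordering may differ.

```go
package main

import "fmt"

// subjectSchema is a hypothetical, comparable stand-in for SubjectSchema.
type subjectSchema struct {
	Subject string
	Version int
	ID      int
}

// comm returns what is only in l, what is only in r, and what is in both,
// dropping duplicates and keeping first-seen order.
func comm(l, r []subjectSchema) (luniq, runiq, common []subjectSchema) {
	inR := make(map[subjectSchema]bool, len(r))
	for _, s := range r {
		inR[s] = true
	}
	seen := make(map[subjectSchema]bool, len(l)+len(r))
	for _, s := range l {
		if seen[s] {
			continue
		}
		seen[s] = true
		if inR[s] {
			common = append(common, s)
		} else {
			luniq = append(luniq, s)
		}
	}
	// Anything in r that was not already seen while walking l is unique to r.
	for _, s := range r {
		if seen[s] {
			continue
		}
		seen[s] = true
		runiq = append(runiq, s)
	}
	return luniq, runiq, common
}

func main() {
	a := subjectSchema{"a-value", 1, 10}
	b := subjectSchema{"b-value", 1, 11}
	c := subjectSchema{"c-value", 1, 12}
	luniq, runiq, common := comm([]subjectSchema{a, a, b}, []subjectSchema{b, c})
	fmt.Println(luniq, runiq, common)
}
```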
