kafka_schema

package
v0.0.1 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Feb 24, 2020 License: Apache-2.0 Imports: 10 Imported by: 0

Documentation

Overview

Package kafka_schema provides mechanisms for accessing avro schemata stored in Kafka Schemata are addressed by uuid. All other kinds of addressing mechanisms are built on top of this.

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

This section is empty.

Types

type Alias

type Alias string

Alias is a plain text name for a schema. Aliases are used for plain text addressing of schemata.

func (Alias) String

func (a Alias) String() string

String casts Alias to string.

type AliasDTO

type AliasDTO struct {
	Alias Alias     `json:"alias"`
	UUID  uuid.UUID `json:"uuid"`
}

AliasDTO is used by the explorer to encode its response body.

type AliasListDTO

type AliasListDTO struct {
	Aliases []Alias `json:"aliases"`
	Count   int     `json:"count"`
}

AliasListDTO is used by the explorer to encode its response body.

type AliasMap

type AliasMap struct {
	catchall.ConcurrentObservable
	Map aliasMapType
}

SchemaMap is a KeyObservable map of Aliases to UUIDs

func NewAliasMap

func NewAliasMap() AliasMap

NewAliasMap constructs an empty AliasMap with no observers.

func (AliasMap) Insert

func (m AliasMap) Insert(alias Alias, schemaUUID uuid.UUID) bool

Upsert inserts or updates an Alias, UUID pair into the map. All of the map's observers are notified of this change. It returns true, if the map entry did already exist and was overwritten.

type AliasRepo

type AliasRepo interface {
	// WhoIs looks up an alias and returns the associated schema's uuid
	WhoIs(alias Alias) (uuid.UUID, bool)
	// WaitAliasReady returns a channel that can be used to wait for an alias and the associated schema to become ready.
	// This works analogous to func Repo.WaitSchemaReady.
	WaitAliasReady(alias Alias) chan bool
	// ListAliases returns a slice of all aliases that are currently available.
	// Note that this may not represent the actual state stored in Kafka, since there
	// is some lag between the time when an alias change arrives in Kafka and the
	// time the Repo implementation has consumed the change.
	ListAliases() []Alias
}

AliasRepo provides high-level access to schemata by their aliases

type AliasRequest

type AliasRequest struct {
	UUID  uuid.UUID `json:"UUID"`
	Alias string    `json:"alias"`
}

AliasRequest sets the given Alias to equal the given UUID.

type AliasesDTO

type AliasesDTO struct {
	Aliases []AliasDTO `json:"aliases"`
}

AliasesDTO is used by the explorer to encode its response body.

type Commander

type Commander struct {
	*core.Producer
}

Commander is a KafkaProducer used for writing schema updates into Kafka.

func (Commander) UpdateAlias

func (cmd Commander) UpdateAlias(alias string, schemaUUID uuid.UUID) error

func (Commander) UpdateSchema

func (cmd Commander) UpdateSchema(schemaUUID uuid.UUID, specification string) error

type LocalRepo

type LocalRepo struct {
	Schemata SchemaMap
	Aliases  AliasMap
	core.TopicRouter
}

LocalRepo is a local Kafka consumer that implements the various schema.*Repo interfaces.

func NewLocalRepo

func NewLocalRepo(broker string) (LocalRepo, error)

NewLocalRepo constructs a LocalRepo configured for the specified Kafka broker. Note that since the repo is a Consumer, it needs to be started with Run() before it starts consuming.

func (LocalRepo) Count

func (repo LocalRepo) Count() int

func (LocalRepo) Decode

func (repo LocalRepo) Decode(schema uuid.UUID, datum []byte) (interface{}, error)

func (LocalRepo) DecodeVersion

func (repo LocalRepo) DecodeVersion(schema NameVersion, datum []byte) (interface{}, error)

func (LocalRepo) Encode

func (repo LocalRepo) Encode(schema uuid.UUID, datum interface{}) ([]byte, error)

func (LocalRepo) EncodeVersion

func (repo LocalRepo) EncodeVersion(schema NameVersion, datum interface{}) ([]byte, error)

func (LocalRepo) GetSpecification

func (repo LocalRepo) GetSpecification(schema uuid.UUID) (string, bool)

func (LocalRepo) ListAliases

func (repo LocalRepo) ListAliases() []Alias

func (LocalRepo) ListSchemata

func (repo LocalRepo) ListSchemata() []uuid.UUID

func (LocalRepo) WaitAliasReady

func (repo LocalRepo) WaitAliasReady(alias Alias) chan bool

func (LocalRepo) WaitSchemaReady

func (repo LocalRepo) WaitSchemaReady(schema uuid.UUID) chan bool

func (LocalRepo) WaitVersionReady

func (repo LocalRepo) WaitVersionReady(schema NameVersion) chan bool

func (LocalRepo) WhoIs

func (repo LocalRepo) WhoIs(alias Alias) (uuid.UUID, bool)

type NameVersion

type NameVersion struct {
	// Name is the plain text address of the thing.
	Name string
	// Version is the version number of the thing.
	// The version number starts at 0 and increases linearly by 1.
	Version uint
}

NameVersion represents a plain text addressable thing that is linearly versioned.

func NewVersionOrigin

func NewVersionOrigin(alias string) NameVersion

func VersionFromAlias

func VersionFromAlias(a Alias) (NameVersion, error)

VersionFromAlias unmarshals a Version from Alias.

func VersionFromString

func VersionFromString(s string) (NameVersion, error)

VersionFromString unmarshals a Version from string. The format is {NAME}-v{VERSION}.

func (NameVersion) Alias

func (v NameVersion) Alias() Alias

func (NameVersion) GetName

func (v NameVersion) GetName() string

func (NameVersion) GetNext

func (v NameVersion) GetNext() (Version, bool)

func (NameVersion) GetPrevious

func (v NameVersion) GetPrevious() (Version, bool)

func (NameVersion) GetVersion

func (v NameVersion) GetVersion() uint

func (NameVersion) IsOrigin

func (v NameVersion) IsOrigin() bool

func (NameVersion) String

func (v NameVersion) String() string

type Repo

type Repo interface {
	// Decode decodes a datum with the given avro schema
	Decode(schema uuid.UUID, datum []byte) (interface{}, error)
	// Encode encodes a datum with the given avro schema
	Encode(schema uuid.UUID, datum interface{}) ([]byte, error)
	// WaitSchemaReady returns a channel that can be used to wait for a schema to become available.
	// Since the schemata are stored in Kafka, it might take the underlying implementation
	// a while until it has consumed all schema changes.
	WaitSchemaReady(schema uuid.UUID) chan bool
	// ListSchemata returns a slice of all uuids that are currently available.
	// Note that this may not represent the actual state stored in Kafka, since there
	// is some lag between the time when a schema change arrives in Kafka and the
	// time the Repo implementation has consumed the change.
	ListSchemata() []uuid.UUID
	// GetSpecification returns the avro specification for the given uuid in plain text.
	GetSpecification(schema uuid.UUID) (specification string, ok bool)
	// Count returns the number of schemata currently available
	Count() int
}

Repo provides high-level access to Schemata by their uuid

type SchemaDTO

type SchemaDTO struct {
	UUID          uuid.UUID `json:"uuid"`
	Specification string    `json:"spec"`
}

SchemaDTO is used by the explorer to encode its response body.

type SchemaListDTO

type SchemaListDTO struct {
	Count    int         `json:"count"`
	Schemata []uuid.UUID `json:"schemata"`
}

SchemaListDTO is used by the explorer to encode its response body.

type SchemaMap

type SchemaMap struct {
	catchall.ConcurrentObservable
	Map schemaMapType
}

SchemaMap is a KeyObservable map of UUIDs to Avro codecs

func NewSchemaMap

func NewSchemaMap() SchemaMap

NewSchemaMap constructs an empty SchemaMap with no observers.

func (SchemaMap) Upsert

func (m SchemaMap) Upsert(schemaUUID uuid.UUID, codec *goavro.Codec) bool

Upsert inserts or updates a UUID, Codec pair into the map. All of the map's observers are notified of this change. It returns true, if the map entry did already exist and was overwritten.

type SchemataDTO

type SchemataDTO struct {
	Schemata []SchemaDTO `json:"schemata"`
}

SchemataDTO is used by the explorer to encode its response body.

type UpdateRequest

type UpdateRequest struct {
	UUID uuid.UUID `json:"UUID"`
	Spec string    `json:"spec"`
}

UpdateRequest sets the given UUID to equal the given plain-text Avro spec.

type Updater

type Updater interface {
	// UpdateSchema sets the given UUID to equal the given plain-text Avro spec.
	UpdateSchema(schemaUUID uuid.UUID, specification string) error
	// UpdateAlias sets the given Alias to equal the given UUID.
	UpdateAlias(alias string, schemaUUID uuid.UUID) error
}

Updater encapsulates the methods required to update the schema repository stored in Kafka.

func NewUpdater

func NewUpdater(broker string) (Updater, error)

NewUpdater constructs an Updater that uses the given Kafka broker to write updates. This will create a new KafkaProducer.

func NewUpdaterWithProducer

func NewUpdaterWithProducer(p *core.Producer) Updater

NewUpdater constructs an Updater that uses the given KafkaProducer to produce its events. In most cases, it is fine to use NewUpdater instead and let it create a new KafkaProducer.

type Version

type Version interface {
	// IsOrigin indicates whether this is the first version of the thing.
	IsOrigin() bool
	// GetVersion returns the version number of the thing.
	GetVersion() uint
	// GetName returns the name i.e. the plain text address of the thing.
	GetName() string
	// GetPrevious returns the Version preceding this Version.
	// If this is the origin, it returns the origin unchanged.
	// The ok flag indicates whether the returned Version differs from the passed Version.
	GetPrevious() (v Version, ok bool)
	// GetNext returns the Version succeeding this Version.
	// If this is the most recent Version, it returns the Version unchanged.
	// The ok flag indicates whether the returned Version differs from the passed Version.
	GetNext() (v Version, ok bool)
	// String marshals the Version into a string.
	String() string
	// Alias marshals the Version into an Alias.
	Alias() Alias
}

Version provides a high-level interface for versioning operations on a plain-text addressable versioned thing.

type VersionedRepo

type VersionedRepo interface {
	// DecodeVersion decodes a datum using the specified schema at the specified version.
	DecodeVersion(schema NameVersion, datum []byte) (interface{}, error)
	// EncodeVersion encodes a datum using the specified schema at the specified version.
	EncodeVersion(schema NameVersion, datum interface{}) ([]byte, error)
	// WaitVersionReady returns a channel that can be used to wait for a schema to become available in the specified version.
	// This works analogous to func Repo.WaitSchemaReady.
	WaitVersionReady(schema NameVersion) chan bool
}

VersionedRepo provides high-level access to schemata by a using a versioning scheme.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL