kafka

package
v0.0.0-...-2a2fe1c Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Aug 19, 2022 License: Apache-2.0 Imports: 21 Imported by: 0

Documentation

Index

Constants

View Source
const (
	// EncodingJSON is used for spans encoded as Protobuf-based JSON.
	EncodingJSON = "json"
	// EncodingProto is used for spans encoded as Protobuf.
	EncodingProto = "protobuf"
	// EncodingZipkinThrift is used for spans encoded as Zipkin Thrift.
	EncodingZipkinThrift = "zipkin-thrift"
)

Variables

View Source
var (
	// AllEncodings is a list of all supported encodings.
	AllEncodings = []string{EncodingJSON, EncodingProto, EncodingZipkinThrift}
)

Functions

This section is empty.

Types

type Factory

type Factory struct {
	producer.Builder
	// contains filtered or unexported fields
}

Factory implements storage.Factory and creates write-only storage components backed by kafka.

func NewFactory

func NewFactory() *Factory

NewFactory creates a new Factory.

func (*Factory) AddFlags

func (f *Factory) AddFlags(flagSet *flag.FlagSet)

AddFlags implements plugin.Configurable

func (*Factory) Close

func (f *Factory) Close() error

Close closes the resources held by the factory

func (*Factory) CreateDependencyReader

func (f *Factory) CreateDependencyReader() (dependencystore.Reader, error)

CreateDependencyReader implements storage.Factory

func (*Factory) CreateSpanReader

func (f *Factory) CreateSpanReader() (spanstore.Reader, error)

CreateSpanReader implements storage.Factory

func (*Factory) CreateSpanWriter

func (f *Factory) CreateSpanWriter() (spanstore.Writer, error)

CreateSpanWriter implements storage.Factory

func (*Factory) InitFromOptions

func (f *Factory) InitFromOptions(o Options)

InitFromOptions initializes factory from options.

func (*Factory) InitFromViper

func (f *Factory) InitFromViper(v *viper.Viper, logger *zap.Logger)

InitFromViper implements plugin.Configurable

func (*Factory) Initialize

func (f *Factory) Initialize(metricsFactory metrics.Factory, logger *zap.Logger) error

Initialize implements storage.Factory

type JSONUnmarshaller

type JSONUnmarshaller struct{}

JSONUnmarshaller implements Unmarshaller

func NewJSONUnmarshaller

func NewJSONUnmarshaller() *JSONUnmarshaller

NewJSONUnmarshaller constructs a JSONUnmarshaller

func (*JSONUnmarshaller) Unmarshal

func (h *JSONUnmarshaller) Unmarshal(msg []byte) (*model.Span, error)

Unmarshal decodes a json byte array to a span

type Marshaller

type Marshaller interface {
	Marshal(*model.Span) ([]byte, error)
}

Marshaller encodes a span into a byte array to be sent to Kafka

type Options

type Options struct {
	Config   producer.Configuration `mapstructure:",squash"`
	Topic    string                 `mapstructure:"topic"`
	Encoding string                 `mapstructure:"encoding"`
}

Options stores the configuration options for Kafka

func (*Options) AddFlags

func (opt *Options) AddFlags(flagSet *flag.FlagSet)

AddFlags adds flags for Options

func (*Options) InitFromViper

func (opt *Options) InitFromViper(v *viper.Viper)

InitFromViper initializes Options with properties from viper

type ProtobufUnmarshaller

type ProtobufUnmarshaller struct{}

ProtobufUnmarshaller implements Unmarshaller

func NewProtobufUnmarshaller

func NewProtobufUnmarshaller() *ProtobufUnmarshaller

NewProtobufUnmarshaller constructs a ProtobufUnmarshaller

func (*ProtobufUnmarshaller) Unmarshal

func (h *ProtobufUnmarshaller) Unmarshal(msg []byte) (*model.Span, error)

Unmarshal decodes a protobuf byte array to a span

type SpanWriter

type SpanWriter struct {
	// contains filtered or unexported fields
}

SpanWriter writes spans to kafka. Implements spanstore.Writer

func NewSpanWriter

func NewSpanWriter(
	producer sarama.AsyncProducer,
	marshaller Marshaller,
	topic string,
	factory metrics.Factory,
	logger *zap.Logger,
) *SpanWriter

NewSpanWriter initiates and returns a new kafka spanwriter

func (*SpanWriter) Close

func (w *SpanWriter) Close() error

Close closes SpanWriter by closing producer

func (*SpanWriter) WriteSpan

func (w *SpanWriter) WriteSpan(ctx context.Context, span *model.Span) error

WriteSpan writes the span to kafka.

type Unmarshaller

type Unmarshaller interface {
	Unmarshal([]byte) (*model.Span, error)
}

Unmarshaller decodes a byte array to a span

type ZipkinThriftUnmarshaller

type ZipkinThriftUnmarshaller struct{}

ZipkinThriftUnmarshaller implements Unmarshaller

func NewZipkinThriftUnmarshaller

func NewZipkinThriftUnmarshaller() *ZipkinThriftUnmarshaller

NewZipkinThriftUnmarshaller constructs a zipkinThriftUnmarshaller

func (*ZipkinThriftUnmarshaller) Unmarshal

func (h *ZipkinThriftUnmarshaller) Unmarshal(msg []byte) (*model.Span, error)

Unmarshal decodes a json byte array to a span

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL