protonats

package module
v0.2.0

Note: this package is not in the latest version of its module.
Published: Oct 31, 2021 License: MIT Imports: 26 Imported by: 0

README

= Cloud Events NATS protocol

An updated NATS protocol binding based on https://github.com/cloudevents/sdk-go/tree/main/protocol/nats/v2

This package extends the `opentracing` package with a trace carrier.

Features:

* OpenTracing support via "github.com/d7561985/tel": traces are sent `to` NATS and tracing spans are read `from` NATS
* The producer honors `context.TopicFrom` to override the default subject
* The consumer supports a pool of subjects for a queue group
* The `Consumer` and `Sender` members of the `Protocol` struct are interfaces and can easily be replaced
* The trace is carried at the CloudEvents protocol level, which allows `TeleObservability` to be used idempotently ;)

== Enabling the trace feature

We use `TeleObservability` to read the span from NATS and pack it correctly.
It acts only as middleware, though.

All of the pack/unpack logic lives under `adapter`.

[source,go]
----
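package main

import (
	"log"

	cenats "github.com/cloudevents/sdk-go/protocol/nats/v2"
	cloudevents "github.com/cloudevents/sdk-go/v2"
	"github.com/cloudevents/sdk-go/v2/client"
	"github.com/d7561985/protonats"
	"github.com/d7561985/tel"
	"github.com/d7561985/tel/monitoring/metrics"
)

func main() {
	t := tel.New(tel.GetConfigFromEnv())

	// env.NATSServer stands for the application's own NATS URL setting (not shown here).
	p, err := protonats.NewProtocol(env.NATSServer, "-", "", cenats.NatsOptions())
	if err != nil {
		log.Fatalf("create NATS protocol: %v", err)
	}

	// Wire tel tracing and metrics into the CloudEvents client.
	metricsReader := metrics.NewCollectorMetricsReader()
	ce, err := cloudevents.NewClient(p,
		client.WithObservabilityService(protonats.NewTeleObservability(&t, metricsReader)),
	)
	if err != nil {
		log.Fatalf("create CloudEvents client: %v", err)
	}

	_ = ce // use ce to send and receive events
}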
----

== Consumer subject pool for a queue group

Pass the `WithConsumerOptions` option to the protocol:

[source,go]
----
p, err := protonats.NewProtocol(env.NATSServer, "-", "",
	cenats.NatsOptions(),
	protonats.WithConsumerOptions(
		protonats.WithQueuePoolSubscriber("MyQueue",
			"MySubject1", "MySubject2",
		),
	),
)
----

Documentation

Index

Constants

This section is empty.

Variables

var ErrEmptySubject = errors.New("empty subject list")
var (
	ErrTraceStateExtension = errors.New("cloudevents extension not contain key: " + extensions.TraceStateExtension)
)

Functions

func ExtractDistributedTracingExtension added in v0.2.0

func ExtractDistributedTracingExtension(ctx context.Context, event *cloudevents.Event) (opentracing.SpanContext, error)

ExtractDistributedTracingExtension extracts the tracecontext from the cloud event.

func InjectDistributedTracingExtension added in v0.2.0

func InjectDistributedTracingExtension(ctx context.Context, event *cloudevents.Event)

InjectDistributedTracingExtension injects the tracecontext from the context into the event as a DistributedTracingExtension

If a DistributedTracingExtension is present in the provided event, its current value is replaced with the tracecontext obtained from the context.

func NewSender

func NewSender(url, subject string, natsOpts []nats.Option, opts ...cn.SenderOption) (protocol.SendCloser, error)

NewSender creates a new protocol.Sender responsible for opening and closing the NATS connection

func NewTeleObservability added in v0.2.0

Types

type Consumer

type Consumer struct {
	NatsReceiver

	Conn       *nats.Conn
	Subject    string
	Subscriber Subscriber
	// contains filtered or unexported fields
}

func NewConsumer

func NewConsumer(url, subject string, natsOpts []nats.Option, opts ...ConsumerOption) (*Consumer, error)

func NewConsumerFromConn

func NewConsumerFromConn(conn *nats.Conn, subject string, opts ...ConsumerOption) (*Consumer, error)

func (*Consumer) Close

func (c *Consumer) Close(_ context.Context) error

func (*Consumer) OpenInbound

func (c *Consumer) OpenInbound(ctx context.Context) error

type ConsumerOption

type ConsumerOption func(*Consumer) error

func WithQueuePoolSubscriber

func WithQueuePoolSubscriber(queue string, subject ...string) ConsumerOption

WithQueuePoolSubscriber creates a pool of subjects that are subscribed under the given queue

func WithQueueSubscriber

func WithQueueSubscriber(queue string) ConsumerOption

WithQueueSubscriber configures the Consumer to join a queue group when subscribing

type DrainList

type DrainList []Dryer

DrainList is a simple cleaner: when a Drain call returns an error, the remaining drains are not called

func (DrainList) Drain

func (d DrainList) Drain() error

type Dryer

type Dryer interface {
	Drain() error
}

type NatsReceiver

type NatsReceiver interface {
	protocol.Receiver
}

func NewReceiver

func NewReceiver(ch <-chan *nats.Msg) NatsReceiver

type ObservabilityOption added in v0.2.0

type ObservabilityOption func(*TeleObservability)

func WithSpanAttributesGetter added in v0.2.0

func WithSpanAttributesGetter(attrGetter SpanAttrGetter) ObservabilityOption

WithSpanAttributesGetter appends the returned attributes from the function to the span.

func WithSpanNameFormatter added in v0.2.0

func WithSpanNameFormatter(nameFormatter SpanNameFormatter) ObservabilityOption

WithSpanNameFormatter replaces the default span name with the string returned from the function

type OpenerReceiverCloser

type OpenerReceiverCloser interface {
	protocol.Opener
	protocol.ReceiveCloser
}

type Protocol

type Protocol struct {
	Conn *nats.Conn

	Consumer OpenerReceiverCloser

	Sender protocol.SendCloser
	// contains filtered or unexported fields
}

Protocol is a reference implementation for using the CloudEvents binding integration. Protocol acts as both a NATS client and a NATS handler.

func NewProtocol

func NewProtocol(url, sendSubject, receiveSubject string, natsOpts []nats.Option, opts ...ProtocolOption) (*Protocol, error)

NewProtocol creates a new NATS protocol.

func NewProtocolFromConn

func NewProtocolFromConn(conn *nats.Conn, sendSubject, receiveSubject string, opts ...ProtocolOption) (*Protocol, error)

func (*Protocol) Close

func (p *Protocol) Close(ctx context.Context) error

Close implements Closer.Close

func (*Protocol) OpenInbound

func (p *Protocol) OpenInbound(ctx context.Context) error

func (*Protocol) Receive

func (p *Protocol) Receive(ctx context.Context) (binding.Message, error)

Receive implements Receiver.Receive

func (*Protocol) Send

func (p *Protocol) Send(ctx context.Context, in binding.Message, transformers ...binding.Transformer) error

Send implements Sender.Send

type ProtocolOption

type ProtocolOption func(*Protocol) error

ProtocolOption is the function signature required to be considered an nats.ProtocolOption.

func WithConsumerOptions

func WithConsumerOptions(opts ...ConsumerOption) ProtocolOption

type QueueSubscriber

type QueueSubscriber struct {
	Queue string
}

QueueSubscriber creates queue subscriptions

func (*QueueSubscriber) Subscribe

func (s *QueueSubscriber) Subscribe(conn *nats.Conn, subject string, cn chan *nats.Msg) (Dryer, error)

Subscribe implements Subscriber.Subscribe

type Receiver

type Receiver struct {
	// contains filtered or unexported fields
}

func (*Receiver) Receive

func (r *Receiver) Receive(ctx context.Context) (binding.Message, error)

type RegularSubscriber

type RegularSubscriber struct {
}

RegularSubscriber creates regular subscriptions

func (*RegularSubscriber) Subscribe

func (s *RegularSubscriber) Subscribe(conn *nats.Conn, subject string, cn chan *nats.Msg) (Dryer, error)

Subscribe implements Subscriber.Subscribe

type Sender

type Sender struct {
	*cn.Sender
}

func NewSenderFromConn

func NewSenderFromConn(conn *nats.Conn, subject string, opts ...cn.SenderOption) (*Sender, error)

NewSenderFromConn creates a new protocol.Sender which leaves responsibility for opening and closing the NATS connection to the caller

func (*Sender) Send

func (s *Sender) Send(ctx context.Context, in binding.Message, transformers ...binding.Transformer) (err error)

type SpanAttrGetter added in v0.2.0

type SpanAttrGetter func(cloudevents.Event) opentracing.Tags

type SpanNameFormatter added in v0.2.0

type SpanNameFormatter func(cloudevents.Event) string

type SubjectQueuePool

type SubjectQueuePool struct {
	Queue    string
	Subjects []string
}

func (*SubjectQueuePool) Subscribe

func (s *SubjectQueuePool) Subscribe(conn *nats.Conn, _ string, cn chan *nats.Msg) (Dryer, error)

type Subscriber

type Subscriber interface {
	Subscribe(conn *nats.Conn, subject string, cn chan *nats.Msg) (Dryer, error)
}

The Subscriber interface allows us to configure how the subscription is created

type TeleObservability

type TeleObservability struct {
	*tel.Telemetry

	Metrics metrics.MetricsReader
	// contains filtered or unexported fields
}

TeleObservability implements the cloudevents client.ObservabilityService with OpenTracing propagation. The flow is idempotent and not tightly coupled to NATS, so it can handle any OpenTracing flow that provides correct context values.

The producer component is handled in RecordSendingEvent and has specific context requirements.

func (*TeleObservability) GetSpanAttributes added in v0.2.0

func (t *TeleObservability) GetSpanAttributes(e cloudevents.Event, method string) opentracing.Tags

GetSpanAttributes returns the attributes that are always added to the spans

func (*TeleObservability) InboundContextDecorators

func (t *TeleObservability) InboundContextDecorators() []func(context.Context, binding.Message) context.Context

InboundContextDecorators returns a decorator function that allows enriching the context with the incoming parent trace. This method gets invoked automatically by passing the option 'WithObservabilityService' when creating the cloudevents client.

func (*TeleObservability) RecordCallingInvoker

func (t *TeleObservability) RecordCallingInvoker(_ctx context.Context, e *event.Event) (context.Context, func(errOrResult error))

RecordCallingInvoker is the consumer middleware. It expects the context to contain special data holding an opentracing.SpanReference, which the receiver should have put there. The consumer represents the invoker model for OpenTracing: in tracing terms, it begins a new span, and that span should be returned and used by others.

func (*TeleObservability) RecordReceivedMalformedEvent

func (t *TeleObservability) RecordReceivedMalformedEvent(ctx context.Context, err error)

RecordReceivedMalformedEvent is called when the received content is malformed and cannot be handled as an event

func (TeleObservability) RecordRequestEvent

func (t TeleObservability) RecordRequestEvent(ctx context.Context, _ event.Event) (context.Context, func(error, *event.Event))

func (*TeleObservability) RecordSendingEvent

func (t *TeleObservability) RecordSendingEvent(_ctx context.Context, e event.Event) (context.Context, func(errOrResult error))

RecordSendingEvent is the producer interceptor. It requires the context argument to be populated with a tel context, and it creates a new tracing branch from the OpenTracing or tel data found inside that context.
