nats

package module
v2.0.6
Warning

This package is not in the latest version of its module.

Published: Jul 13, 2023 License: GPL-3.0 Imports: 23 Imported by: 8

README

NATS module

Overview

Observability stack for NATS microservices. Because NATS has no built-in middleware support, we had to build our own, and we chose to wrap nats.Conn as the simplest way to do it.

Unfortunately, we could not compose with the original nats.Conn structure: not all of its functions are covered, so exposing it directly could confuse users.

The middleware concept lets you decompose business logic into small middleware components. To support this, we offer a replacement for nats.MsgHandler that takes a context and returns an error.
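To make the handler shape concrete, here is a minimal sketch of a context-aware handler that returns an error, wrapped by one small middleware. It does not use this package: Msg, Handler, countErrors, validate and run are hypothetical names invented for the illustration, and Msg only stands in for *nats.Msg so that the sketch runs without a broker.

```go
package main

import (
	"context"
	"errors"
	"fmt"
)

// Msg is a hypothetical stand-in for *nats.Msg so the sketch needs no broker.
type Msg struct {
	Subject string
	Data    []byte
}

// Handler mirrors the context-and-error handler shape described above.
type Handler func(ctx context.Context, msg *Msg) error

// countErrors is a hypothetical middleware: it calls next and counts failures.
func countErrors(failed *int, next Handler) Handler {
	return func(ctx context.Context, msg *Msg) error {
		err := next(ctx, msg)
		if err != nil {
			*failed++
		}
		return err
	}
}

// validate is the business logic; it reports its outcome only via the error.
func validate(ctx context.Context, msg *Msg) error {
	if len(msg.Data) == 0 {
		return errors.New("empty payload")
	}
	return nil
}

// run feeds each payload through the wrapped handler and returns the
// number of failures the middleware observed.
func run(payloads ...string) int {
	failed := 0
	h := countErrors(&failed, validate)
	for _, p := range payloads {
		_ = h(context.Background(), &Msg{Subject: "nats.demo", Data: []byte(p)})
	}
	return failed
}

func main() {
	fmt.Println("failed handlers:", run("HELLO", ""))
}
```

Because the handler returns an error instead of swallowing it, the middleware can observe the outcome without knowing anything about the business logic.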

How to start

go get github.com/tel-io/instrumentation/middleware/nats/v2@latest

// create a connection to NATS
con, _ := nats.Connect(addr)

// wrap it (natsmw is this package, t is a tel.Telemetry instance)
mw := natsmw.New(natsmw.WithTel(t)).Use(con)

Features

  • The decorated instance keeps a signature close to the original nats.Conn
  • Built-in Trace, Logs, Metrics, and Recovery middlewares
  • NATS Core: full support for async sub, pub, request, reply
  • NATS JetStream: partial support
  • Grafana dashboard covering sub, pub, request, reply
  • Every wrapped function that returns *nats.Subscription attaches a subscription watcher that scrapes metrics
Consumer
NATS Core

SubscribeMW and QueueSubscribeMW exist only for backward compatibility with our previous version, where the handler signature was:

type PostFn func(ctx context.Context, sub string, data []byte) ([]byte, error)

The MsgHandler-based functions are:

  • Subscribe
  • QueueSubscribe

Example:

func main() {
    // create the NATS connection
    con, _ := nats.Connect(addr)
    // create our middleware from the NATS connection
    mw := natsmw.New(natsmw.WithTel(t)).Use(con)
    // a handler that already uses the new signature
    ourHandler := func(ctx context.Context, msg *nats.Msg) error {
        // respond, returning any error to the middleware
        return msg.Respond([]byte("HELLO"))
    }

    // subscribe with a queue group through our middleware
    queueSub, _ := mw.QueueSubscribe("nats.demo", "consumer", ourHandler)

    // or without a queue group
    sub, _ := mw.Subscribe("nats.demo", ourHandler)

    // reference both subscriptions so the example compiles
    _, _ = queueSub, sub
}
QueueSubscribeSyncWithChan
func main() {
    // create the NATS connection
    con, _ := nats.Connect(addr)
    // create our middleware from the NATS connection
    mw := natsmw.New(natsmw.WithTel(t)).Use(con)
    // wrap a new-style handler so it can be called with a plain *nats.Msg
    ourHandler := mw.BuildWrappedHandler(func(ctx context.Context, msg *nats.Msg) error {
        // respond, returning any error to the middleware
        return msg.Respond([]byte("HELLO"))
    })

    // create the channel
    ch := make(chan *nats.Msg)
    // create the subscription
    _, _ = mw.QueueSubscribeSyncWithChan("sub", "queue", ch)
    // read messages and pass each one through the wrapped handler
    for msg := range ch {
        ourHandler(msg)
    }
}
JetStream

Create the JetStream context from our wrapper. For subscriptions, we cover Subscribe and QueueSubscribe, the push-based API, which is less popular because it is not optimized for horizontal scaling.

Here is an example:

func main() {
    // create the NATS connection
    con, _ := nats.Connect(addr)
    // create our middleware from the NATS connection
    mw := natsmw.New(natsmw.WithTel(t)).Use(con)

    // a handler that already uses the new signature
    ourHandler := func(ctx context.Context, msg *nats.Msg) error {
        _ = msg.Ack()
        return nil
    }

    // create the wrapped JetStream instance
    js, _ := mw.JetStream()

    // a plain JetStream subscription with our handler
    sub, _ := js.Subscribe("nats.demo", ourHandler)

    // or a JetStream subscription with a queue group
    queueSub, _ := js.QueueSubscribe("nats.demo", "consumer", ourHandler)

    // reference both subscriptions so the example compiles
    _, _ = sub, queueSub
}
BuildWrappedHandler

BuildWrappedHandler wraps a handler with the middleware stack and returns a native nats.MsgHandler, so you can apply the middleware to functions this package does not cover.

Here is an example with JetStream:

func main() {
    // some context; it could be cancelled by a signal or just be a stub
    ccx := context.TODO()

    // create a tel instance bound to ccx
    t, closer := tel.New(ccx, tel.DefaultDebugConfig())
    defer closer()

    // create the NATS connection
    con, _ := nats.Connect(addr)
    // create our middleware from the NATS connection
    mw := natsmw.New(natsmw.WithTel(t)).Use(con)

    // a handler that already uses the new signature
    ourHandler := func(ctx context.Context, msg *nats.Msg) error {
        _ = msg.Ack()
        return nil
    }

    // create the wrapped JetStream instance
    js, _ := mw.JetStream()

    // PullSubscribe takes no handler, but we still want to observe the processing
    sub, _ := js.PullSubscribe("stream.demo", "PULL")

    // so we build a func(msg *nats.Msg) that runs `ourHandler` through the middleware
    handler := mw.BuildWrappedHandler(ourHandler)

    for {
        msgs, _ := sub.Fetch(100)
        for _, msg := range msgs {
            // and here we process it
            handler(msg)
        }
    }
}
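The adapter idea behind BuildWrappedHandler can be sketched without the library. This is not the package's implementation: Msg, NativeHandler, CtxHandler, wrap and processBatch are hypothetical names, and the real middleware stack (tracing, logging, metrics, recovery) is reduced here to a single error callback.

```go
package main

import (
	"context"
	"fmt"
)

// Msg is a hypothetical stand-in for *nats.Msg.
type Msg struct {
	Subject string
	Data    []byte
}

// NativeHandler mirrors the shape of nats.MsgHandler: no context, no error.
type NativeHandler func(msg *Msg)

// CtxHandler mirrors the shape of this package's MsgHandler.
type CtxHandler func(ctx context.Context, msg *Msg) error

// wrap is a hypothetical analogue of BuildWrappedHandler: it supplies a
// context, runs h, and routes any returned error to onErr.
func wrap(h CtxHandler, onErr func(error)) NativeHandler {
	return func(msg *Msg) {
		ctx := context.Background() // a real stack would attach a span here
		if err := h(ctx, msg); err != nil {
			onErr(err)
		}
	}
}

// processBatch pushes a fetched batch through the wrapped handler and
// reports how many messages were handled and how many failed.
func processBatch(batch []string) (handled, failed int) {
	h := wrap(func(ctx context.Context, msg *Msg) error {
		handled++
		if len(msg.Data) == 0 {
			return fmt.Errorf("empty payload on %q", msg.Subject)
		}
		return nil
	}, func(error) { failed++ })

	for _, payload := range batch {
		h(&Msg{Subject: "stream.demo", Data: []byte(payload)})
	}
	return handled, failed
}

func main() {
	handled, failed := processBatch([]string{"a", "", "c"})
	fmt.Println("handled:", handled, "failed:", failed)
}
```

The caller keeps full control of the fetch loop, while everything around the handler call stays in one place.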
Producer

All our producer functions take a ctx, and their names carry the WithContext suffix.

WARNING! The context should carry the tel context, since the most important feature is continuing trace spans.

NATS Core

Here is an example:

func main() {
    // some context; it could be cancelled by a signal or just be a stub
    ccx := context.TODO()

    // create a tel instance bound to ccx
    t, closer := tel.New(ccx, tel.DefaultDebugConfig())
    defer closer()

    // attach tel to the context
    ctx := tel.WithContext(ccx, t)

    // create the NATS connection
    con, _ := nats.Connect(addr)

    // create our middleware from the NATS connection
    mw := natsmw.New(natsmw.WithTel(t)).Use(con)

    // publish with just a subject and a body
    _ = mw.PublishWithContext(ctx, "nats.test", []byte("HELLO_WORLD"))

    // or with a native nats.Msg
    _ = mw.PublishMsgWithContext(ctx, &nats.Msg{Subject: "nats.test", Data: []byte("HELLO_WORLD")})

    // or a non-blocking request: the reply is expected later on "nats.reply"
    _ = mw.PublishRequestWithContext(ctx, "nats.test", "nats.reply", []byte("HELLO_WORLD"))

    // request and wait for the reply
    reply, _ := mw.RequestWithContext(ctx, "nats.test", []byte("HELLO_WORLD"))

    // request and wait for the reply, via nats.Msg
    reply, _ = mw.RequestMsgWithContext(ctx, &nats.Msg{Subject: "nats.test", Data: []byte("HELLO_WORLD")})
    _ = reply
}
JetStream

There is no helper yet

Documentation

Overview

Example (Handler)
tele := tel.NewNull()
ctx := tele.Ctx()

conn, _ := nats.Connect("example.com")
nConn := New(WithTel(tele)).Use(conn)

// legacy backport
cbLegacy := func(ctx context.Context, sub string, data []byte) ([]byte, error) {
	return nil, nil
}

cb := func(ctx context.Context, msg *nats.Msg) error {
	return nil
}

_, _ = nConn.QueueSubscribeMW("sub", "queue", cbLegacy)
_, _ = nConn.QueueSubscribeMW("sub2", "queue", cbLegacy)

// sub
_, _ = nConn.Subscribe("sub", cb)
_, _ = nConn.QueueSubscribe("sub", "xxx", cb)

// sync sub with wrap
ourHandler := nConn.BuildWrappedHandler(func(ctx context.Context, msg *nats.Msg) error {
	// perform respond with possible error returning
	return msg.Respond([]byte("HELLO"))
})
ch := make(chan *nats.Msg)
_, _ = nConn.QueueSubscribeSyncWithChan("sub", "queue", ch)
for msg := range ch {
	ourHandler(msg)
}

// pub
_ = nConn.PublishWithContext(ctx, "sub", []byte("HELLO"))
_ = nConn.PublishMsgWithContext(ctx, &nats.Msg{})
_ = nConn.PublishRequestWithContext(ctx, "sub", "reply", []byte("HELLO"))
_, _ = nConn.RequestWithContext(ctx, "sub", []byte("HELLO"))
_, _ = nConn.RequestMsgWithContext(ctx, &nats.Msg{})

Constants

const (
	KindKey    = "kind_of"
	PayloadKey = "payload"
)
const (
	Subject  = attribute.Key("subject")
	Reply    = attribute.Key("reply")
	IsError  = attribute.Key("error")
	Kind     = attribute.Key(KindKey)
	Duration = attribute.Key("duration")
)

Attribute keys that can be added to a span.

const (
	KindUnk     = "UNK"
	KindSub     = "SUB"
	KindPub     = "PUB"
	KindRequest = "REQUEST"
	KindRespond = "RESPOND"
	KindReply   = "REPLY"
)
const (
	Count         = "nats.count"          // Incoming request count total
	ContentLength = "nats.content_length" // Incoming request bytes total
	Latency       = "nats.duration"       // Incoming end to end duration, microseconds

	SubscriptionsPendingCount = "nats.subscriptions.pending.msgs"
	SubscriptionsPendingBytes = "nats.subscriptions.pending.bytes"
	SubscriptionsDroppedMsgs  = "nats.subscriptions.dropped.count"
	SubscriptionCountMsgs     = "nats.subscriptions.send.count"
)

Server NATS metrics

Variables

var ErrMultipleMiddleWare = errors.New("not allow create multiple instances")

Functions

func ExtractAttributes added in v2.0.2

func ExtractAttributes(msg *nats.Msg, kind string, additional bool) []attribute.KeyValue

ExtractAttributes ... @additional - handle business cases

func ReplyFn added in v2.0.3

func ReplyFn(ctx context.Context, msg *nats.Msg, data []byte) error

ReplyFn is a reply helper that sends a reply wrapped with trace information.

func SemVersion

func SemVersion() string

SemVersion is the semantic version to be supplied to tracer/meter creation.

func Version

func Version() string

Version is the current release version of the otelnats instrumentation.

func WrapKindOfContext added in v2.0.2

func WrapKindOfContext(ctx context.Context, kindOf string) context.Context

WrapKindOfContext creates baggage that records the kind of event for subMiddleware in the context. It returns a ctx containing baggage with KindKey, or the untouched source ctx.
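The idea of tagging a context with the kind of event can be sketched with plain context values. This is only an analogy: the package stores the value as baggage under KindKey, whereas the sketch uses context.WithValue, and the rule "an empty kind leaves the context untouched" is an assumption made for the illustration. withKind, kindFrom and roundTrip are hypothetical names.

```go
package main

import (
	"context"
	"fmt"
)

// kindKey is a private key type so the value cannot collide with other packages.
type kindKey struct{}

// withKind stores kind in ctx. Assumption: an empty kind returns the
// source ctx untouched.
func withKind(ctx context.Context, kind string) context.Context {
	if kind == "" {
		return ctx
	}
	return context.WithValue(ctx, kindKey{}, kind)
}

// kindFrom reads the kind back, falling back to "UNK" (the value of KindUnk).
func kindFrom(ctx context.Context) string {
	if k, ok := ctx.Value(kindKey{}).(string); ok {
		return k
	}
	return "UNK"
}

// roundTrip tags a fresh context and reads the tag back.
func roundTrip(kind string) string {
	return kindFrom(withKind(context.Background(), kind))
}

func main() {
	fmt.Println(roundTrip("REQUEST"))
	fmt.Println(roundTrip(""))
}
```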

Types

type CommonPublish

type CommonPublish struct {
	Conn natsPublisher
	// contains filtered or unexported fields
}

func NewCommonPublish

func NewCommonPublish(conn *nats.Conn, interceptor Interceptor) *CommonPublish

NewCommonPublish creates an instance of the wrapped publisher.

func (*CommonPublish) PublishMsgWithContext

func (c *CommonPublish) PublishMsgWithContext(ctx context.Context, msg *nats.Msg) error

PublishMsgWithContext publishes the Msg structure, which includes the Subject, an optional Reply and an optional Data field.

func (*CommonPublish) PublishRequestWithContext

func (c *CommonPublish) PublishRequestWithContext(ctx context.Context, subj, reply string, data []byte) error

PublishRequestWithContext will perform a Publish() expecting a response on the reply subject. Use Request() for automatically waiting for a response inline.

func (*CommonPublish) PublishWithContext

func (c *CommonPublish) PublishWithContext(ctx context.Context, subj string, data []byte) error

PublishWithContext publishes the data argument to the given subject. The data argument is left untouched and needs to be correctly interpreted on the receiver.

func (*CommonPublish) RequestMsgWithContext

func (c *CommonPublish) RequestMsgWithContext(ccx context.Context, m *nats.Msg) (res *nats.Msg, err error)

RequestMsgWithContext takes a context and a message, and sends a request expecting a single response.

func (*CommonPublish) RequestWithContext

func (c *CommonPublish) RequestWithContext(ctx context.Context, subj string, data []byte) (*nats.Msg, error)

RequestWithContext will send a request payload and deliver the response message, or an error, including a timeout if no message was received properly.

type CommonSubscribe added in v2.0.2

type CommonSubscribe struct {
	*Core
	// contains filtered or unexported fields
}

func NewCommonSubscriber added in v2.0.2

func NewCommonSubscriber(conn *nats.Conn, core *Core) *CommonSubscribe

func (*CommonSubscribe) BuildWrappedHandler added in v2.0.2

func (c *CommonSubscribe) BuildWrappedHandler(next MsgHandler) nats.MsgHandler

BuildWrappedHandler lets you build your own middleware-wrapped handler, for example for batch processing.

func (CommonSubscribe) DefaultMiddleware added in v2.0.2

func (c CommonSubscribe) DefaultMiddleware() []Middleware

DefaultMiddleware subInter interceptor

func (*CommonSubscribe) QueueSubscribe added in v2.0.2

func (c *CommonSubscribe) QueueSubscribe(subj, queue string, cb MsgHandler) (*nats.Subscription, error)

QueueSubscribe creates an asynchronous queue subscriber on the given subject. All subscribers with the same queue name will form the queue group and only one member of the group will be selected to receive any given message asynchronously.

func (*CommonSubscribe) QueueSubscribeMW added in v2.0.2

func (c *CommonSubscribe) QueueSubscribeMW(subj, queue string, next PostFn) (*nats.Subscription, error)

QueueSubscribeMW subscribes with a legacy middleware callback function.

Deprecated: kept only for backward compatibility with the legacy PostFn.

func (*CommonSubscribe) QueueSubscribeSyncWithChan added in v2.0.2

func (c *CommonSubscribe) QueueSubscribeSyncWithChan(subj, queue string, ch chan *nats.Msg) (*nats.Subscription, error)

QueueSubscribeSyncWithChan will express interest in the given subject. All subscribers with the same queue name will form the queue group and only one member of the group will be selected to receive any given message, which will be placed on the channel. You should not close the channel until sub.Unsubscribe() has been called.

NOTE: only the subscription hook is applied here; use BuildWrappedHandler to run the middleware on messages read from the channel.

func (*CommonSubscribe) Subscribe added in v2.0.2

func (c *CommonSubscribe) Subscribe(subj string, cb MsgHandler) (*nats.Subscription, error)

Subscribe will express interest in the given subject. The subject can have wildcards. There are two types of wildcards: * for partial, and > for full. A subscription on subject time.*.east would receive messages sent to time.us.east and time.eu.east. A subscription on subject time.us.> would receive messages sent to time.us.east and time.us.east.atlanta, while time.us.* would only match time.us.east since it can't match more than one token. Messages will be delivered to the associated MsgHandler.
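The wildcard rules can be checked with a small matcher. This is a sketch of the rules as stated here, not the NATS server's matching code, and matchSubject is a hypothetical helper.

```go
package main

import (
	"fmt"
	"strings"
)

// matchSubject reports whether subject matches pattern, where "*" matches
// exactly one token and ">" matches one or more trailing tokens.
func matchSubject(pattern, subject string) bool {
	p := strings.Split(pattern, ".")
	s := strings.Split(subject, ".")
	for i, tok := range p {
		if tok == ">" {
			// ">" must be the last pattern token and needs at least one token to consume.
			return i == len(p)-1 && len(s) > i
		}
		if i >= len(s) {
			return false // the subject ran out of tokens
		}
		if tok != "*" && tok != s[i] {
			return false // a literal token differs
		}
	}
	// Without ">", both sides must have the same number of tokens.
	return len(p) == len(s)
}

func main() {
	for _, c := range [][2]string{
		{"time.*.east", "time.us.east"},
		{"time.us.>", "time.us.east.atlanta"},
		{"time.us.*", "time.us.east.atlanta"},
	} {
		fmt.Printf("%s vs %s: %v\n", c[0], c[1], matchSubject(c[0], c[1]))
	}
}
```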

func (*CommonSubscribe) SubscribeMW added in v2.0.2

func (c *CommonSubscribe) SubscribeMW(subj string, cb PostFn) (*nats.Subscription, error)

SubscribeMW is a backward-compatible function for the previous middleware approach.

Deprecated: kept only for backward compatibility with the legacy PostFn.

type ConnContext

type ConnContext struct {
	Publish
	Subscriber

	*Core
	// contains filtered or unexported fields
}

ConnContext is a wrapper around nats.Conn, aka the mw connection approach.

Features: exposes subscription stats by overriding functions.

func (*ConnContext) Conn

func (c *ConnContext) Conn() *nats.Conn

Conn unwraps the underlying connection.

func (ConnContext) DefaultMiddleware

func (c ConnContext) DefaultMiddleware() []Middleware

DefaultMiddleware subInter interceptor

func (*ConnContext) JetStream

func (c *ConnContext) JetStream(opts ...nats.JSOpt) (*JetStreamContext, error)

JetStream returns a JetStreamContext wrapper for consumer

type Core

type Core struct {
	// contains filtered or unexported fields
}

Core features for context

func New

func New(opts ...Option) *Core

New subMiddleware instance

func (Core) DefaultMiddleware

func (c Core) DefaultMiddleware() []Middleware

DefaultMiddleware subInter interceptor

func (*Core) Use added in v2.0.1

func (c *Core) Use(conn *nats.Conn) *ConnContext

Use connection with subMiddleware

type Interceptor added in v2.0.2

type Interceptor func(next MsgHandler) MsgHandler

Interceptor ...

func ChainInterceptor added in v2.0.2

func ChainInterceptor(interceptors ...Interceptor) Interceptor

func MiddlewareChain added in v2.0.2

func MiddlewareChain(mw ...Middleware) Interceptor

MiddlewareChain - MsgHandler decorator with subMiddleware
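How interceptors of the shape func(next MsgHandler) MsgHandler compose can be sketched as follows. The ordering shown (first listed runs outermost) is an assumption about a typical chain, not a statement about ChainInterceptor or MiddlewareChain; Msg, Handler, chain, tag and callOrder are hypothetical names.

```go
package main

import (
	"context"
	"fmt"
	"strings"
)

// Msg is a hypothetical stand-in for *nats.Msg.
type Msg struct{ Subject string }

// Handler and Interceptor mirror the shapes of MsgHandler and Interceptor.
type Handler func(ctx context.Context, msg *Msg) error
type Interceptor func(next Handler) Handler

// chain composes interceptors so that the first one listed is the outermost.
func chain(interceptors ...Interceptor) Interceptor {
	return func(next Handler) Handler {
		for i := len(interceptors) - 1; i >= 0; i-- {
			next = interceptors[i](next)
		}
		return next
	}
}

// tag returns an interceptor that records when it is entered and left.
func tag(name string, trace *[]string) Interceptor {
	return func(next Handler) Handler {
		return func(ctx context.Context, msg *Msg) error {
			*trace = append(*trace, name+":in")
			err := next(ctx, msg)
			*trace = append(*trace, name+":out")
			return err
		}
	}
}

// callOrder runs one message through two interceptors and returns the trace.
func callOrder() string {
	var trace []string
	h := chain(tag("recovery", &trace), tag("trace", &trace))(
		func(ctx context.Context, msg *Msg) error {
			trace = append(trace, "handler")
			return nil
		})
	_ = h(context.Background(), &Msg{Subject: "nats.demo"})
	return strings.Join(trace, " ")
}

func main() {
	fmt.Println(callOrder())
}
```

Wrapping from the last interceptor to the first is what makes the first one outermost, so a recovery-style interceptor placed first can cover everything inside it.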

type JetStreamContext

type JetStreamContext struct {
	*Core
	// contains filtered or unexported fields
}

func (JetStreamContext) DefaultMiddleware

func (c JetStreamContext) DefaultMiddleware() []Middleware

DefaultMiddleware subInter interceptor

func (*JetStreamContext) JS

func (j *JetStreamContext) JS() nats.JetStreamContext

JS unwrap

func (*JetStreamContext) PullSubscribe

func (j *JetStreamContext) PullSubscribe(subj, durable string, opts ...nats.SubOpt) (*nats.Subscription, error)

PullSubscribe creates a Subscription that can fetch messages. See important note in Subscribe(). Additionally, for an ephemeral pull consumer, the "durable" value must be set to an empty string.

Only wraps the Subscription to gather stats.

func (*JetStreamContext) QueueSubscribe

func (j *JetStreamContext) QueueSubscribe(subj, queue string, cb MsgHandler, opts ...nats.SubOpt) (*nats.Subscription, error)

QueueSubscribe creates a Subscription with a queue group. If no optional durable name nor binding options are specified, the queue name will be used as a durable name. See important note in Subscribe()

func (*JetStreamContext) Subscribe

func (j *JetStreamContext) Subscribe(subj string, cb MsgHandler, opts ...nats.SubOpt) (*nats.Subscription, error)

Subscribe creates an async Subscription for JetStream. The stream and consumer names can be provided with the nats.Bind() option. For creating an ephemeral (where the consumer name is picked by the server), you can provide the stream name with nats.BindStream(). If no stream name is specified, the library will attempt to figure out which stream the subscription is for. See important notes below for more details.

IMPORTANT NOTES:

  • If none of the options Bind() nor Durable() are specified, the library will send a request to the server to create an ephemeral JetStream consumer, which will be deleted after an Unsubscribe() or Drain(), or automatically by the server after a short period of time after the NATS subscription is gone.
  • If Durable() option is specified, the library will attempt to lookup a JetStream consumer with this name, and if found, will bind to it and not attempt to delete it. However, if not found, the library will send a request to create such durable JetStream consumer. Note that the library will delete the JetStream consumer after an Unsubscribe() or Drain() only if it created the durable consumer while subscribing. If the durable consumer already existed prior to subscribing it won't be deleted.
  • If Bind() option is provided, the library will attempt to lookup the consumer with the given name, and if successful, bind to it. If the lookup fails, then the Subscribe() call will return an error.

type Logs

type Logs struct {
	// contains filtered or unexported fields
}

Logs dump some payload

func NewLogs

func NewLogs(fn NameFn, dumpPayloadOnError, dumpRequest bool) *Logs

type Middleware

type Middleware interface {
	// contains filtered or unexported methods
}

type MsgHandler

type MsgHandler func(ctx context.Context, msg *nats.Msg) error

MsgHandler is our preferred way to handle subscriptions. The ctx lets the function continue traces or pass along log attachments. The error return lets subMiddleware understand what happened in the handler and react accordingly.

type NameFn

type NameFn func(kind string, msg *nats.Msg) string

NameFn operation name description

type Option

type Option interface {
	// contains filtered or unexported methods
}

Option allows configuration of the NATS middleware.

func WithDisableDefaultMiddleware added in v2.0.2

func WithDisableDefaultMiddleware() Option

WithDisableDefaultMiddleware disable default middleware usage

func WithDump added in v2.0.2

func WithDump(enable bool) Option

WithDump dumps the request as plain text to logs and traces. We could go further and add an option for encoding requests.

func WithDumpPayloadOnError

func WithDumpPayloadOnError(enable bool) Option

WithDumpPayloadOnError dumps the request and response when a fault occurs.

Default: true

func WithNameFunction

func WithNameFunction(fn NameFn) Option

func WithPostHook

func WithPostHook(cb PostHook) Option

WithPostHook sets a single hook where you can perform a post-handle operation on the data provided by the handler.

Deprecated: legacy usage only.

func WithPubMiddleware added in v2.0.2

func WithPubMiddleware(list ...Middleware) Option

WithPubMiddleware for publish

func WithReply

func WithReply(inject bool) Option

WithReply extends the middleware to automatically send a reply to NATS requests that ask for one, using the data provided. inject: wrap the nats.Msg handler with OTEL propagation data, extending traces, baggage, etc.

Deprecated: legacy usage only.

func WithSubMiddleware added in v2.0.2

func WithSubMiddleware(list ...Middleware) Option

WithSubMiddleware for subscriptions

func WithTel

func WithTel(t tel.Telemetry) Option

WithTel in some cases we should put another version

type PostFn

type PostFn func(ctx context.Context, sub string, data []byte) ([]byte, error)

PostFn is a callback function that receives a new instance of tele inside ctx, plus the message subject and data.

Deprecated: legacy function, still used through the conn wrappers QueueSubscribeMW and SubscribeMW only for backward compatibility.
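How a legacy PostFn can be served through the newer handler shape can be sketched as an adapter. This is an illustration of the idea, not the code behind SubscribeMW: Msg, MsgHandler, adapt and demoReply are hypothetical names, and the reply callback stands in for whatever actually sends the returned bytes.

```go
package main

import (
	"context"
	"fmt"
)

// Msg is a hypothetical stand-in for *nats.Msg.
type Msg struct {
	Subject string
	Data    []byte
}

// PostFn has the legacy signature; MsgHandler has the newer shape.
type PostFn func(ctx context.Context, sub string, data []byte) ([]byte, error)
type MsgHandler func(ctx context.Context, msg *Msg) error

// adapt turns a legacy PostFn into a MsgHandler. reply is a hypothetical
// callback that delivers the bytes returned by fn.
func adapt(fn PostFn, reply func(msg *Msg, out []byte) error) MsgHandler {
	return func(ctx context.Context, msg *Msg) error {
		out, err := fn(ctx, msg.Subject, msg.Data)
		if err != nil {
			return err
		}
		if out == nil {
			return nil // nothing to send back
		}
		return reply(msg, out)
	}
}

// demoReply runs one message through an adapted legacy callback and
// returns what the reply callback received.
func demoReply(payload string) string {
	var sent string
	legacy := func(ctx context.Context, sub string, data []byte) ([]byte, error) {
		return []byte("got:" + string(data)), nil
	}
	h := adapt(legacy, func(msg *Msg, out []byte) error {
		sent = string(out)
		return nil
	})
	_ = h(context.Background(), &Msg{Subject: "sub", Data: []byte(payload)})
	return sent
}

func main() {
	fmt.Println(demoReply("HELLO"))
}
```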

type PostHook

type PostHook func(ctx context.Context, msg *nats.Msg, data []byte) error

type Publish

type Publish interface {
	PublishWithContext(ctx context.Context, subj string, data []byte) error
	PublishMsgWithContext(ctx context.Context, msg *nats.Msg) error
	PublishRequestWithContext(ctx context.Context, subj, reply string, data []byte) error
	RequestWithContext(ctx context.Context, subj string, data []byte) (*nats.Msg, error)
	RequestMsgWithContext(ctx context.Context, msg *nats.Msg) (*nats.Msg, error)
}

type Recovery

type Recovery struct{}

func NewRecovery

func NewRecovery() *Recovery
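The reference does not describe what Recovery does beyond listing it among the built-in middlewares. As a rough sketch of what a recovery middleware commonly does, assuming it converts a panic inside the handler into a returned error, consider the following. Msg, Handler, recovery and safeCall are hypothetical names.

```go
package main

import (
	"context"
	"fmt"
)

// Msg is a hypothetical stand-in for *nats.Msg.
type Msg struct{ Data []byte }

// Handler mirrors the shape of MsgHandler.
type Handler func(ctx context.Context, msg *Msg) error

// recovery wraps next so that a panic becomes an ordinary error
// instead of crashing the subscriber goroutine.
func recovery(next Handler) Handler {
	return func(ctx context.Context, msg *Msg) (err error) {
		defer func() {
			if r := recover(); r != nil {
				err = fmt.Errorf("handler panicked: %v", r)
			}
		}()
		return next(ctx, msg)
	}
}

// safeCall runs a handler that may panic and reports the outcome as text.
func safeCall(shouldPanic bool) string {
	h := recovery(func(ctx context.Context, msg *Msg) error {
		if shouldPanic {
			panic("boom")
		}
		return nil
	})
	if err := h(context.Background(), &Msg{}); err != nil {
		return err.Error()
	}
	return "ok"
}

func main() {
	fmt.Println(safeCall(false))
	fmt.Println(safeCall(true))
}
```

The named return value is what lets the deferred function overwrite the result after the panic has been recovered.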

type SubMetrics

type SubMetrics struct {
	// contains filtered or unexported fields
}

SubMetrics implements the Middleware interface.

func NewMetrics

func NewMetrics(m *metrics) *SubMetrics

type Subscriber added in v2.0.2

type Subscriber interface {
	Subscribe(subj string, cb MsgHandler) (*nats.Subscription, error)
	QueueSubscribe(subj, queue string, cb MsgHandler) (*nats.Subscription, error)
	QueueSubscribeMW(subj, queue string, next PostFn) (*nats.Subscription, error)
	SubscribeMW(subj string, cb PostFn) (*nats.Subscription, error)

	QueueSubscribeSyncWithChan(subj, queue string, ch chan *nats.Msg) (*nats.Subscription, error)

	BuildWrappedHandler(next MsgHandler) nats.MsgHandler
}

type SubscriptionStatMetric

type SubscriptionStatMetric struct {
	// contains filtered or unexported fields
}

SubscriptionStatMetric is a hook that provides important subscription statistics.

func NewSubscriptionStatMetrics

func NewSubscriptionStatMetrics(opts ...Option) (*SubscriptionStatMetric, error)

func (*SubscriptionStatMetric) Hook

func (s *SubscriptionStatMetric) Hook(sub *nats.Subscription, err error) (*nats.Subscription, error)

func (*SubscriptionStatMetric) Register

func (s *SubscriptionStatMetric) Register(sub ...*nats.Subscription)

type Tracer

type Tracer struct {
	// contains filtered or unexported fields
}

Tracer for subscribers implementing Middleware

func NewTracer

func NewTracer(fn NameFn) *Tracer
