natspubsub

package module
v0.1.3 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Apr 1, 2024 License: Apache-2.0 Imports: 17 Imported by: 2

README

natspubsub

A go-cloud pubsub plugin for nats io supporting jetstream as well.

Tests

Package natspubsub provides a pubsub implementation for NATS.io and Jetstream. Use OpenTopic to construct a *pubsub.Topic, and/or OpenSubscription to construct a *pubsub.Subscription. This package uses gob to encode and decode driver.Message to []byte.

URLs

For pubsub.OpenTopic and pubsub.OpenSubscription, natspubsub registers for the scheme "nats".

The default URL opener will connect to a default server based on the environment variable "NATS_SERVER_URL".

This implementation supports (NATS Server 2.2.0 or later), messages can be encoded using native NATS message headers, and native message content providing full support for non-Go clients. Operating in this manner is more efficient than using gob.Encoder. Using older versions of the server will result in errors and untested scenarios.

In the event the user cannot upgrade the nats server, we recommend the use of the in tree implementation instead: https://github.com/google/go-cloud/tree/master/pubsub/natspubsub

To customize the URL opener, or for more details on the URL format, see URLOpener. See https://gocloud.dev/concepts/urls/ for background information.

Message Delivery Semantics

NATS supports : See https://godoc.org/gocloud.dev/pubsub#hdr-At_most_once_and_At_least_once_Delivery for more background.

  1. at-most-once-semantics; applications need not call Message.Ack, applications must also not call Message.Nack.

  2. at-least-once-semantics; this mode is possible with jetstream enabled. applications are supposed to call Message.Ack to remove processed messages from the stream. Enabling this the jetstream mode requires adding the jetstream parameter in the url query.

Subscription options

All the subscription options are passed in as query parameters to the nats url supplied.

Comprehensive definitions of the options can be found here :


Option Default value Description
jetstream true Enables
subject A string of characters that form a name which the publisher and subscriber can use to find each other.
stream_name Name of a stream, Names cannot contain whitespace, ., *, >, path separators (forward or backwards slash), and non-printable characters.
stream_description A short explanation of what the stream is about
stream_subjects [] A list of subjects to bind. Wildcards are supported. Cannot be set for mirror streams.
consumer_max_count 10 How many Consumers can be defined for a given Stream, -1 for unlimited
consumer_durable If set, clients can have subscriptions bind to the consumer and resume until the consumer is explicitly deleted. A durable name cannot contain whitespace, ., *, >, path separators (forward or backwards slash), and non-printable characters.
consumer_max_waiting The maximum number of waiting pull requests.
consumer_max_request_expires_ms 30000 The maximum duration a single pull request will wait for messages to be available to pull.
consumer_request_batch 50 The maximum batch size a single pull request can make. When set with MaxRequestMaxBytes, the batch size will be constrained by whichever limit is hit first.
consumer_request_max_batch_bytes 0 The maximum total bytes that can be requested in a given batch. When set with MaxRequestBatch, the batch size will be constrained by whichever limit is hit first.
consumer_request_timeout_ms 1000 Duration the consumer waits for messages to buffer when pulling a batch
consumer_ack_wait_timeout_ms 300000 The duration that the server will wait for an ack for any individual message once it has been delivered to a consumer. If an ack is not received in time, the message will be redelivered.
consumer_max_ack_pending 100 Defines the maximum number of messages, without an acknowledgement, that can be outstanding. Once this limit is reached message delivery will be suspended. This limit applies across all of the consumer's bound subscriptions. A value of -1 means there can be any number of pending acks (i.e. no flow control). This does not apply when the AckNone policy is used.

Publishing options


Option Default value Description
Subject An identifier that publishers send messages to subscribers of interest

Documentation

Overview

Example (OpenSubscriptionFromURL)
package main

import (
	"context"
	"log"

	"gocloud.dev/pubsub"
)

func main() {
	// PRAGMA: This example is used on gocloud.dev; PRAGMA comments adjust how it is shown and can be ignored.
	// PRAGMA: On gocloud.dev, add a blank import: _ "gocloud.dev/pubsub/natspubsub"
	// PRAGMA: On gocloud.dev, hide lines until the next blank line.
	ctx := context.Background()

	// pubsub.OpenSubscription creates a *pubsub.Subscription from a URL.
	// This URL will Dial the NATS server at the URL in the environment variable
	// NATS_SERVER_URL and receive messages with subject "example.mysubject".
	// This URL will be parsed and the natsv2 attribute will be used to
	// use NATS v2.2.0+ native message headers as the message metadata.
	subscription, err := pubsub.OpenSubscription(ctx, "nats://nats.example.com/example.mysubject?jetstream=true")
	if err != nil {
		log.Fatal(err)
	}
	defer func(subscription *pubsub.Subscription, ctx context.Context) {
		_ = subscription.Shutdown(ctx)
	}(subscription, ctx)
}
Output:

Example (OpenTopicFromURL)
package main

import (
	"context"
	"log"

	"gocloud.dev/pubsub"
)

func main() {
	// PRAGMA: This example is used on gocloud.dev; PRAGMA comments adjust how it is shown and can be ignored.
	// PRAGMA: On gocloud.dev, add a blank import: _ "gocloud.dev/pubsub/natspubsub"
	// PRAGMA: On gocloud.dev, hide lines until the next blank line.
	ctx := context.Background()

	// pubsub.OpenTopic creates a *pubsub.Connection from a URL.
	// This URL will Dial the NATS server at the URL in the environment variable
	// NATS_SERVER_URL and send messages with subject "example.mysubject".
	// This URL will be parsed and the natsv2 attribute will be used to
	// use NATS v2.2.0+ native message headers as the message metadata.
	topic, err := pubsub.OpenTopic(ctx, "nats://nats.example.com/example.mysubject")
	if err != nil {
		log.Fatal(err)
	}
	defer func(topic *pubsub.Topic, ctx context.Context) {
		_ = topic.Shutdown(ctx)
	}(topic, ctx)
}
Output:

Index

Examples

Constants

View Source
const Scheme = "nats"

Scheme is the URL scheme natspubsub registers its URLOpeners under on pubsub.DefaultMux.

Variables

This section is empty.

Functions

func OpenSubscription

OpenSubscription returns a *pubsub.Subscription representing a NATS subscription or NATS queue subscription for use with NATS at least version 2.2.0. This changes the encoding of the message as, starting with version 2.2.0, NATS supports message headers. In previous versions the message headers were encoded along with the message content using gob.Encoder, which limits the subscribers only to Go clients. This implementation uses native NATS message headers, and native message content, which provides full support for non-Go clients.

Example
// PRAGMA: This example is used on gocloud.dev; PRAGMA comments adjust how it is shown and can be ignored.
// PRAGMA: On gocloud.dev, add a blank import: _ "gocloud.dev/pubsub/natspubsub"
// PRAGMA: On gocloud.dev, hide lines until the next blank line.
ctx := context.Background()
natsConn, err := nats.Connect("nats://nats.example.com")
if err != nil {
	log.Fatal(err)
}
defer natsConn.Close()
conn := connections.NewPlain(natsConn)

subscription, err := natspubsub.OpenSubscription(
	ctx, conn, &connections.SubscriptionOptions{Subjects: []string{"example.mysubject"}})
if err != nil {
	log.Fatal(err)
}
defer func(subscription *pubsub.Subscription, ctx context.Context) {
	_ = subscription.Shutdown(ctx)
}(subscription, ctx)
Output:

func OpenTopic

OpenTopic returns a *pubsub.Topic for use with NATS at least version 2.2.0. This changes the encoding of the message as, starting with version 2.2.0, NATS supports message headers. In previous versions the message headers were encoded along with the message content using gob.Encoder, which limits the subscribers only to Go clients. This implementation uses native NATS message headers, and native message content, which provides full support for non-Go clients.

Example
// PRAGMA: This example is used on gocloud.dev; PRAGMA comments adjust how it is shown and can be ignored.
// PRAGMA: On gocloud.dev, add a blank import: _ "gocloud.dev/pubsub/natspubsub"
// PRAGMA: On gocloud.dev, hide lines until the next blank line.
ctx := context.Background()

natsConn, err := nats.Connect("nats://nats.example.com")
if err != nil {
	log.Fatal(err)
}
defer natsConn.Close()

js, err := jetstream.New(natsConn)
if err != nil {
	log.Fatal(err)
}

conn := connections.NewJetstream(js)

topic, err := natspubsub.OpenTopic(ctx, conn, &connections.TopicOptions{Subject: "example.mysubject"})
if err != nil {
	log.Fatal(err)
}
defer func(topic *pubsub.Topic, ctx context.Context) {
	_ = topic.Shutdown(ctx)
}(topic, ctx)
Output:

Types

type URLOpener

type URLOpener struct {
	Connection connections.Connection
	// TopicOptions specifies the options to pass to OpenTopic.
	TopicOptions connections.TopicOptions
	// SubscriptionOptions specifies the options to pass to OpenSubscription.
	SubscriptionOptions connections.SubscriptionOptions
}

URLOpener opens NATS URLs like "nats://mysubject".

The URL host+path is used as the subject.

No query parameters are supported.

func (*URLOpener) OpenSubscriptionURL

func (o *URLOpener) OpenSubscriptionURL(ctx context.Context, u *url.URL) (*pubsub.Subscription, error)

OpenSubscriptionURL opens a pubsub.Subscription based on url supplied.

 A subscription also creates the required underlaying queue or streams
 There are many more parameters checked in this case compared to the publish topic section.
 If required the list of parameters can be extended but for now only a subset is defined and
 the remaining ones utilize the sensible defaults that nats comes with.

	The list of parameters include :

		- subject,
		- stream_name,
		- stream_description,
		- stream_subjects,
		- consumer_max_count,
		- consumer_queue
		- consumer_max_waiting
		-

func (*URLOpener) OpenTopicURL

func (o *URLOpener) OpenTopicURL(ctx context.Context, u *url.URL) (*pubsub.Topic, error)

OpenTopicURL opens a pubsub.Topic based on a url supplied.

A topic can be specified in the subject and suffixed by the url path
These definitions will yield the subject shown infront of them

	- nats://host:8934?subject=foo --> foo
	- nats://host:8934/bar?subject=foo --> foo/bar
	- nats://host:8934/bar --> /bar
	- nats://host:8934?no_subject=foo --> [this yields an error]

Directories

Path Synopsis

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL