natspubsub

package module
v0.0.4 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jul 19, 2022 License: MIT Imports: 16 Imported by: 0

Documentation

Overview

Package natspubsub provides a pubsub implementation for NATS.io. Use OpenTopic to construct a *pubsub.Topic, and/or OpenSubscription to construct a *pubsub.Subscription. This package uses gob to encode and decode driver.Message to []byte.

URLs

For pubsub.OpenTopic and pubsub.OpenSubscription, natspubsub registers for the scheme "nats". The default URL opener will connect to a default server based on the environment variable "NATS_SERVER_URL". To customize the URL opener, or for more details on the URL format, see URLOpener. See https://sraphs.github.io/gdk/concepts/urls/ for background information.

Message Delivery Semantics

NATS supports at-most-semantics; applications need not call Message.Ack, and must not call Message.Nack. See https://godoc.org/github.com/sraphs/gdk/pubsub#hdr-At_most_once_and_At_least_once_Delivery for more background.

As

natspubsub exposes the following types for As:

  • Topic: *nats.Conn
  • Subscription: *nats.Subscription
  • Message.BeforeSend: None.
  • Message.AfterSend: None.
  • Message: *nats.Msg
Example (OpenQueueSubscriptionFromURL)
package main

import (
	"context"
	"log"

	"github.com/sraphs/gdk/pubsub"
)

func main() {
	// PRAGMA: This example is used on github.com/sraphs/gdk; PRAGMA comments adjust how it is shown and can be ignored.
	// PRAGMA: On github.com/sraphs/gdk, add a blank import: _ "github.com/sraphs/gdk/pubsub/natspubsub"
	// PRAGMA: On github.com/sraphs/gdk, hide lines until the next blank line.
	ctx := context.Background()

	// pubsub.OpenSubscription creates a *pubsub.Subscription from a URL.
	// This URL will Dial the NATS server at the URL in the environment variable
	// NATS_SERVER_URL and receive messages with subject "example.my-subject"
	// This URL will be parsed and the queue attribute will be used as the Queue parameter when creating the NATS Subscription.
	subscription, err := pubsub.OpenSubscription(ctx, "nats://example.my-subject?queue=myqueue")
	if err != nil {
		log.Fatal(err)
	}
	defer subscription.Shutdown(ctx)
}
Output:

Example (OpenSubscriptionFromURL)
package main

import (
	"context"
	"log"

	"github.com/sraphs/gdk/pubsub"
)

func main() {
	// PRAGMA: This example is used on github.com/sraphs/gdk; PRAGMA comments adjust how it is shown and can be ignored.
	// PRAGMA: On github.com/sraphs/gdk, add a blank import: _ "github.com/sraphs/gdk/pubsub/natspubsub"
	// PRAGMA: On github.com/sraphs/gdk, hide lines until the next blank line.
	ctx := context.Background()

	// pubsub.OpenSubscription creates a *pubsub.Subscription from a URL.
	// This URL will Dial the NATS server at the URL in the environment variable
	// NATS_SERVER_URL and receive messages with subject "example.my-subject".
	subscription, err := pubsub.OpenSubscription(ctx, "nats://example.my-subject")
	if err != nil {
		log.Fatal(err)
	}
	defer subscription.Shutdown(ctx)
}
Output:

Example (OpenTopicFromURL)
package main

import (
	"context"
	"log"

	"github.com/sraphs/gdk/pubsub"
)

func main() {
	// PRAGMA: This example is used on github.com/sraphs/gdk; PRAGMA comments adjust how it is shown and can be ignored.
	// PRAGMA: On github.com/sraphs/gdk, add a blank import: _ "github.com/sraphs/gdk/pubsub/natspubsub"
	// PRAGMA: On github.com/sraphs/gdk, hide lines until the next blank line.
	ctx := context.Background()

	// pubsub.OpenTopic creates a *pubsub.Topic from a URL.
	// This URL will Dial the NATS server at the URL in the environment variable
	// NATS_SERVER_URL and send messages with subject "example.my-subject".
	topic, err := pubsub.OpenTopic(ctx, "nats://example.my-subject")
	if err != nil {
		log.Fatal(err)
	}
	defer topic.Shutdown(ctx)
}
Output:

Index

Examples

Constants

View Source
const Scheme = "nats"

Scheme is the URL scheme natspubsub registers its URLOpeners under on pubsub.DefaultMux.

Variables

This section is empty.

Functions

func OpenSubscription

func OpenSubscription(nc *nats.Conn, subject string, opts *SubscriptionOptions) (*pubsub.Subscription, error)

OpenSubscription returns a *pubsub.Subscription representing a NATS subscription or NATS queue subscription. The subject is the NATS Subject to subscribe to; for more info, see https://nats.io/documentation/writing_applications/subjects.

Example
// PRAGMA: This example is used on github.com/sraphs/gdk; PRAGMA comments adjust how it is shown and can be ignored.
// PRAGMA: On github.com/sraphs/gdk, hide lines until the next blank line.
ctx := context.Background()

natsConn, err := nats.Connect("nats://nats.example.com")
if err != nil {
	log.Fatal(err)
}
defer natsConn.Close()

subscription, err := natspubsub.OpenSubscription(
	natsConn,
	"example.my-subject",
	nil)
if err != nil {
	log.Fatal(err)
}
defer subscription.Shutdown(ctx)
Output:

func OpenTopic

func OpenTopic(nc *nats.Conn, subject string, _ *TopicOptions) (*pubsub.Topic, error)

OpenTopic returns a *pubsub.Topic for use with NATS. The subject is the NATS Subject; for more info, see https://nats.io/documentation/writing_applications/subjects.

Example
// PRAGMA: This example is used on github.com/sraphs/gdk; PRAGMA comments adjust how it is shown and can be ignored.
// PRAGMA: On github.com/sraphs/gdk, hide lines until the next blank line.
ctx := context.Background()

natsConn, err := nats.Connect("nats://nats.example.com")
if err != nil {
	log.Fatal(err)
}
defer natsConn.Close()

topic, err := natspubsub.OpenTopic(natsConn, "example.my-subject", nil)
if err != nil {
	log.Fatal(err)
}
defer topic.Shutdown(ctx)
Output:

Types

type SubscriptionOptions

type SubscriptionOptions struct {
	// Queue sets the subscription as a QueueSubcription.
	// For more info, see https://docs.nats.io/nats-concepts/queue.
	Queue string
}

SubscriptionOptions sets options for constructing a *pubsub.Subscription backed by NATS.

type TopicOptions

type TopicOptions struct{}

TopicOptions sets options for constructing a *pubsub.Topic backed by NATS.

type URLOpener

type URLOpener struct {
	// Connection to use for communication with the server.
	Connection *nats.Conn
	// TopicOptions specifies the options to pass to OpenTopic.
	TopicOptions TopicOptions
	// SubscriptionOptions specifies the options to pass to OpenSubscription.
	SubscriptionOptions SubscriptionOptions
}

URLOpener opens NATS URLs like "nats://my-subject".

The URL host+path is used as the subject.

No query parameters are supported.

func (*URLOpener) OpenSubscriptionURL

func (o *URLOpener) OpenSubscriptionURL(ctx context.Context, u *url.URL) (*pubsub.Subscription, error)

OpenSubscriptionURL opens a pubsub.Subscription based on u.

func (*URLOpener) OpenTopicURL

func (o *URLOpener) OpenTopicURL(ctx context.Context, u *url.URL) (*pubsub.Topic, error)

OpenTopicURL opens a pubsub.Topic based on u.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL