gomongostreams

v0.1.2-alpha
Published: Jul 11, 2021 License: MIT Imports: 11 Imported by: 0

README

What is go-mongo-streams?

Go Mongo Streams is a small library for integrating MongoDB change streams into a Go project. It was developed to work with GQLGen subscriptions. It uses a publisher-subscriber pattern in which multiple subscribers can wait for MongoDB change stream events.
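
The publisher-subscriber pattern described above can be illustrated with a minimal, standard-library-only sketch. The Subscriber interface mirrors the one documented by this package; the fanOut and recorder types and their methods are hypothetical names made up for the illustration and are not part of the library.

```go
package main

import (
	"fmt"
	"sync"
)

// Subscriber mirrors the interface documented by the package.
type Subscriber interface {
	OnEvent(data interface{}) error
}

// fanOut is a hypothetical stand-in for a publisher: it keeps a list of
// subscribers and forwards every event to each of them.
type fanOut struct {
	mu   sync.Mutex
	subs []Subscriber
}

// add registers a subscriber.
func (f *fanOut) add(s Subscriber) {
	f.mu.Lock()
	defer f.mu.Unlock()
	f.subs = append(f.subs, s)
}

// publish delivers one event to every subscriber and returns the number
// of subscribers whose OnEvent call succeeded.
func (f *fanOut) publish(event interface{}) int {
	f.mu.Lock()
	defer f.mu.Unlock()
	delivered := 0
	for _, s := range f.subs {
		if err := s.OnEvent(event); err == nil {
			delivered++
		}
	}
	return delivered
}

// recorder is a hypothetical subscriber that stores what it receives.
type recorder struct {
	seen []interface{}
}

func (r *recorder) OnEvent(data interface{}) error {
	r.seen = append(r.seen, data)
	return nil
}

func main() {
	pub := &fanOut{}
	a, b := &recorder{}, &recorder{}
	pub.add(a)
	pub.add(b)

	n := pub.publish("task updated")
	fmt.Println(n, len(a.seen), len(b.seen)) // prints "2 1 1"
}
```

In the real library the events would come from a MongoDB change stream rather than from a direct publish call; the sketch only shows how one source can serve several subscribers.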

How to Use?

Install the package and follow the steps below.

1 ) Create a SubscriptionManager :

Create a new SubscriptionManager using the NewSubscriptionManager function. It takes a pointer to a MongoDB database as its argument. Typically, one SubscriptionManager is enough for a MongoDB database instance. A SubscriptionManager holds multiple publishers, each of which is responsible for listening to a single change stream.

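// ctx is passed to Subscribe in step 4; calling cancel() ends the subscription.
ctx, cancel := context.WithCancel(context.Background())

// Connect to MongoDB; replace "dbURI" and "dbName" with your own values.
client, err := mongo.Connect(context.Background(), options.Client().ApplyURI("dbURI").SetMaxPoolSize(1).SetConnectTimeout(15*time.Second))
if err != nil {
	panic(err)
}
instance := client.Database("dbName")

// One SubscriptionManager per database is typically enough.
subscriptionManager := gomongostreams.NewSubscriptionManager(instance)

2 ) Create a Publisher :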

Create a publisher by calling the GetPublisher function on the subscription manager instance. Pass the name of the collection the publisher should listen on and a MongoDB filter pipeline. If no filter is required, use mongo.Pipeline{} as the second argument. Note: GetPublisher is idempotent and returns the same publisher instance for the same collection name and filter combination. This allows a single publisher to serve multiple subscribers listening for the same data.

// Parse the hex string into the ObjectID of the document to watch.
tid, err := primitive.ObjectIDFromHex("60e8eecdea69f2f6cf10530f")
if err != nil {
	return
}

// Match only change events whose full document has this _id.
matchID := bson.D{
	{Key: "$match", Value: bson.M{"fullDocument._id": tid}},
}

pipeline := mongo.Pipeline{matchID}

publisher := subscriptionManager.GetPublisher("tasks", pipeline)

3 ) Implement the Subscriber interface:

To subscribe to the publisher, you need a struct that implements the Subscriber interface. This allows the publisher to call the subscriber's OnEvent method whenever there is a new event.

type TaskSubscriber struct {
	channel chan *Task
}

// OnEvent converts the change stream data into a Task and sends it on the channel.
func (t *TaskSubscriber) OnEvent(data interface{}) error {
	var task Task
	// Round-trip through BSON to decode the generic data into a Task.
	bsonBytes, err := bson.Marshal(data)
	if err != nil {
		log.Println(err.Error())
		return err
	}
	if err = bson.Unmarshal(bsonBytes, &task); err != nil {
		log.Println(err.Error())
		return err
	}
	t.channel <- &task
	return nil
}

4 ) Subscribe to the Publisher:

The final step is to subscribe to the publisher by calling its Subscribe method and passing in the subscriber instance.

Documentation

Overview

Package gomongostreams provides a publisher-subscriber model for MongoDB change streams. The Subscriber interface can be extended to return a channel, which can be used in GraphQL subscriptions. To learn more about MongoDB change streams, see https://www.mongodb.com/basics/change-streams. Note: MongoDB change streams require a replica set or sharded cluster and do not work on standalone servers.
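
One way to extend a subscriber so that it returns a channel is sketched below. This is an illustration under stated assumptions, not the library's own code: the Task fields, the newTaskSubscriber constructor, and the Events method are hypothetical, and encoding/json is used in place of BSON so that the sketch needs only the standard library.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// Task is a hypothetical document type; its fields are assumptions.
type Task struct {
	ID    string `json:"_id"`
	Title string `json:"title"`
}

// taskSubscriber implements OnEvent and exposes its events on a channel.
type taskSubscriber struct {
	channel chan *Task
}

// newTaskSubscriber creates a subscriber whose channel holds up to
// buffer pending tasks.
func newTaskSubscriber(buffer int) *taskSubscriber {
	return &taskSubscriber{channel: make(chan *Task, buffer)}
}

// Events returns a receive-only view of the channel, which is the shape
// a GraphQL subscription resolver usually hands back to its caller.
func (t *taskSubscriber) Events() <-chan *Task {
	return t.channel
}

// OnEvent decodes the generic event data into a Task and sends it on
// the channel. Decoding errors are returned instead of being ignored.
func (t *taskSubscriber) OnEvent(data interface{}) error {
	raw, err := json.Marshal(data)
	if err != nil {
		return err
	}
	var task Task
	if err := json.Unmarshal(raw, &task); err != nil {
		return err
	}
	t.channel <- &task
	return nil
}

func main() {
	sub := newTaskSubscriber(1)
	err := sub.OnEvent(map[string]interface{}{"_id": "60e8", "title": "write docs"})
	if err != nil {
		panic(err)
	}
	task := <-sub.Events()
	fmt.Println(task.ID, task.Title) // prints "60e8 write docs"
}
```

Returning a receive-only channel keeps the sending side private to the subscriber, so consumers cannot send on or close the channel by mistake.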

Constants

const Version = "0.1.0"

Version of the package

Types

type Publisher

type Publisher struct {
	// contains filtered or unexported fields
}

Publisher listens to change stream events generated on a MongoDB collection. To run, the publisher needs the collection object on which it listens and a filter of type mongo.Pipeline, which can be used to listen for specific events on the collection.

func (*Publisher) Subscribe

func (p *Publisher) Subscribe(ctx context.Context, subscriber *Subscriber)

Subscribe adds the subscriber to the publisher's list; the publisher then notifies it of new events by calling the subscriber's OnEvent method. The subscriber must implement the Subscriber interface; refer to the examples for more information. To remove the subscription, call the cancel() function of the context.

type Subscriber

type Subscriber interface {
	OnEvent(data interface{}) error
}

The Subscriber interface must be implemented in order to subscribe to the publisher. The publisher calls the OnEvent method of the subscriber and provides the data retrieved from the MongoDB change stream.

type SubscriptionManager

type SubscriptionManager struct {
	// contains filtered or unexported fields
}

SubscriptionManager holds a map of publishers. The key for each map entry is a combination of the collection name and the filter, which allows publishers to be reused. Note: one instance of SubscriptionManager is enough for a database.

func NewSubscriptionManager

func NewSubscriptionManager() *SubscriptionManager

NewSubscriptionManager creates a new SubscriptionManager.

func (*SubscriptionManager) GetPublisher

func (s *SubscriptionManager) GetPublisher(collection *mongo.Collection, filter mongo.Pipeline) (*Publisher, error)

GetPublisher creates or retrieves a Publisher. It builds a key for the publisher from the combination of the collection name and the filter, which allows publishers to be reused. If there is no publisher matching the key, a new publisher is created.
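
The create-or-retrieve behaviour can be sketched as a map lookup guarded by a mutex. Everything in this sketch is hypothetical: the registry and publisher types, the getPublisher method, and in particular the makeKey format, which is an assumption and not necessarily how the library derives its keys.

```go
package main

import (
	"fmt"
	"sync"
)

// publisher is a hypothetical placeholder for the real Publisher type.
type publisher struct {
	key string
}

// registry caches publishers by key, similar in spirit to the map of
// publishers that SubscriptionManager is documented to hold.
type registry struct {
	mu         sync.Mutex
	publishers map[string]*publisher
}

func newRegistry() *registry {
	return &registry{publishers: make(map[string]*publisher)}
}

// makeKey combines the collection name with the printed form of the
// filter. This key format is an assumption made for the sketch.
func makeKey(collection string, filter interface{}) string {
	return fmt.Sprintf("%s|%v", collection, filter)
}

// getPublisher returns the cached publisher for the key, or creates and
// stores a new one when no publisher matches.
func (r *registry) getPublisher(collection string, filter interface{}) *publisher {
	key := makeKey(collection, filter)

	r.mu.Lock()
	defer r.mu.Unlock()
	if p, ok := r.publishers[key]; ok {
		return p
	}
	p := &publisher{key: key}
	r.publishers[key] = p
	return p
}

func main() {
	r := newRegistry()
	filter := []map[string]interface{}{
		{"$match": map[string]interface{}{"status": "open"}},
	}

	first := r.getPublisher("tasks", filter)
	second := r.getPublisher("tasks", filter)
	other := r.getPublisher("users", filter)

	fmt.Println(first == second, first == other) // prints "true false"
}
```

Holding the lock across both the lookup and the insert ensures that two concurrent callers asking for the same key cannot each create their own publisher.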

func (*SubscriptionManager) Shutdown

func (s *SubscriptionManager) Shutdown()

Shutdown stops all the publishers.
