pubsubsse

package module
v0.0.0-...-a727c88 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Nov 29, 2023 License: MIT Imports: 8 Imported by: 0

README

PubSub-SSE

Overview

PubSub-SSE is a Go-based server-sent events (SSE) publication and subscription service. It provides real-time data streaming from a server to connected clients using HTTP. This service is particularly useful for applications that require live data updates, such as dashboards, live feeds, or any real-time monitoring system.

Features

  • Real-Time Data Streaming: Utilizes SSE to push live data updates to clients.
  • Topic-Based Subscriptions: Supports public, private, and group topics for targeted data distribution.
  • Dynamic Topic Handling: Add, remove, subscribe, and unsubscribe from topics at runtime.
  • Flexible Topic Hierarchy: Topics can have nested subtopics for granular control.
  • Client Management: Add and remove clients dynamically.

How It Works

The service uses Go's net/http package to handle SSE connections. Clients receive JSON-formatted data, consisting of system events (sys) and data updates (updates). The data format includes information about topics (public, private, group), subscribed topics, and updated data for subscribed topics. Topic Types

  • Public Topics: Visible and subscribable by all clients.
  • Private Topics: Exclusive to individual clients.
  • Group Topics: Shared among clients in the same group.

Topic Subscription Hierarchy

  • Subscribing to a topic also includes all its subtopics.
  • Topics and subtopics can be nested indefinitely.

Contribute

Contributions to extend or improve the PubSub-SSE are welcome. Please follow standard Go coding practices and provide documentation for new features.

Dev Docs

Server side (golang)

Usage
func main() {
	// Create a new SSEPubSubService
	ssePubSub := NewSSEPubSubService()

	// Handle endpoints
	// You can write your own endpoints if you want. Just have a look at the examples and modify them to your needs.
	http.HandleFunc("/add/user", func(w http.ResponseWriter, r *http.Request) { AddClient(ssePubSub, w, r) })                 // Add client endpoint
	http.HandleFunc("/add/topic/public/", func(w http.ResponseWriter, r *http.Request) { AddPublicTopic(ssePubSub, w, r) })   // Add topic endpoint
	http.HandleFunc("/add/topic/private/", func(w http.ResponseWriter, r *http.Request) { AddPrivateTopic(ssePubSub, w, r) }) // Add topic endpoint
	http.HandleFunc("/sub", func(w http.ResponseWriter, r *http.Request) { Subscribe(ssePubSub, w, r) })                      // Subscribe endpoint
	http.HandleFunc("/unsub", func(w http.ResponseWriter, r *http.Request) { Unsubscribe(ssePubSub, w, r) })                  // Unsubscribe endpoint
	http.HandleFunc("/event", func(w http.ResponseWriter, r *http.Request) { Event(ssePubSub, w, r) })                        // Event SSE endpoint
	go func() {
		log.Fatal(http.ListenAndServe(":8080", nil)) // Start http server
	}()

	// Create a new client and get it by id
	client := ssePubSub.NewClient()
	client, _ = ssePubSub.GetClientByID(client.GetID())
	fmt.Println("Client ID:", client.GetID())

	// Create a public topic
	pubTopic := ssePubSub.NewPublicTopic("server/status")

	// Get topic by name. 3 ways to get a public topic:
	pubTopic, _ = ssePubSub.GetPublicTopicByName("server/status")
	pubTopic, _ = client.GetTopicByName("server/status")
	pubTopic, _ = client.GetPublicTopicByName("server/status")

	// Subscribe to the topic
	client.Sub(pubTopic)

	// Send data to the topic
	pubTopic.Pub(TestData{Testdata: "testdata"})

	// Unsubscribe from topic
	client.Unsub(pubTopic)

	// Remove public topic
	ssePubSub.RemovePublicTopic(pubTopic)

	// Create a private topic
	privTopic := client.NewPrivateTopic("test/server")

	// Get topic by name. 2 ways to get a private topic:
	privTopic, _ = client.GetTopicByName("test/server")
	privTopic, _ = client.GetPrivateTopicByName("test/server")

	// Subscribe to the topic
	client.Sub(privTopic)

	// Send data to the topic
	privTopic.Pub(TestData{Testdata: "testdata"})

	// Unsubscribe from topic
	client.Unsub(privTopic)

	// Remove private topic
	client.RemovePrivateTopic(privTopic)

	// Remove client
	ssePubSub.RemoveClient(client)

    //----------------------------
    // PLANED FOR FUTURE VERSIONS
    //----------------------------

	// // Create a group
	// group := ssePubSub.NewGroup("testgroup")
	// group, _ = ssePubSub.GetGroupByName("testgroup")

	// // Add client to group
	// group.AddClient(client)

	// // Get group from client
	// group, _ = client.GetGroupByName("testgroup")

	// // Create a group topic
	// groupTopic := group.NewTopic("test/group")

	// // Get topic by name. 3 ways to get a group topic:
	// groupTopic, _ = group.GetTopicByName("test/group")
	// groupTopic, _ = client.GetTopicByName("test/group")
	// groupTopic, _ = client.GetGroupTopicByName("test/group")

	// // Subscribe to the topic
	// client.Sub(groupTopic)

	// // Send data to the topic
	// groupTopic.Pub(TestData{Testdata: "testdata"})

	// // Unsubscribe from topic
	// client.Unsub(groupTopic)

	// // Remove group topic
	// group.RemoveTopic(groupTopic)

	// // Remove client from group
	// group.RemoveClient(client)

	// // Remove group
	// client.RemoveGroup(group)

    //----------------------------
    // PLANED FOR FUTURE VERSIONS
    //----------------------------

	time.Sleep(500 * time.Second)
}
Code structure

Browser/Client side

Explanation of Data Received by the Browser Client via SSE:

The SSE (Server-Sent Events) mechanism in this service sends real-time updates to the browser client in JSON format. The structure of the data received by the client is divided into two main parts: 'sys' and 'updates'.

1. 'sys' (System Events):

  • This section provides metadata about the topics and the client's subscription status.
  • It contains arrays of topics categorized by their type: 'topics', 'subscribed', and 'unsubscribed'. a. 'topics': Lists all available topics (public, private, and group). b. 'subscribed': Event which indicates topics the client has recently subscribed to. c. 'unsubscribed': Event which indicates topics the client has recently unsubscribed from.
  • Each topic in these lists includes its 'name'.
  • The 'topics' list also includes the 'type' of each topic, which can be 'public', 'private', or 'group'.

2. 'updates' (Data Updates):

  • This part contains the actual data updates for the topics the client is subscribed to.
  • It is an array of objects, each representing an update for a specific topic.
  • Each update object includes: a. 'topic': The name of the topic being updated. b. 'data': The new data for the topic, encapsulated in a nested JSON object.

3. Note on Topics:

  • Topics are case sensitive and adhere to a naming convention that includes alphabets, numbers, and underscores.
  • Topics support hierarchical structuring using slashes ('/'), allowing nested subtopics.
  • Subscribing to a higher-level topic automatically subscribes the client to all its nested subtopics.

4. Note on Data Transmission:

  • Only changes are sent to the client to minimize data transfer.
  • When a topic is added or removed, the entire updated 'sys' list is sent.
  • Subscriptions and unsubscriptions are communicated through respective 'sys' lists.
  • Updates are sent only for those topics which have new data.

Examples of JSON messages received by the client:

1. Example: Empty:

{"sys": null, "updates": null}
  • Indicates no system updates or data updates are available at the moment.

2. Example: Data Update:

{
  "sys": null,
  "updates": [
    {
      "topic": "exampleTopic",
      "data": {"key": "value"}
    }
  ]
}
  • Demonstrates a scenario where there is a new data update for a subscribed topic.
  • The data field can contain any valid JSON object.

3. Example: Subscribing to a Topic

  • When a client subscribes to a topic, the system updates the 'sys.subscribed' list and sends it to the client.
  • Example JSON message upon subscription:
  {
    "sys": {
      "subscribed": [
        {"name": "exampleTopic", "type": "public"}
      ]
    },
    "updates": null
  }
  • This indicates the client has successfully subscribed to "exampleTopic".
  • If the client is subscribed to multiple topics, they are not shown in this message. This message only shows the topic the client has subscribed to most recently.

4. Example: Unsubscribing from a Topic

  • Upon unsubscribing from a topic, the system updates the 'sys.unsubscribed' list.
  • Example JSON message upon unsubscription:
  {
    "sys": {
      "unsubscribed": [
        {"name": "exampleTopic", "type": "public"}
      ]
    },
    "updates": null
  }
  • This indicates the client has unsubscribed from "exampleTopic".
  • This message shows the topic the client has unsubscribed from most recently.

5. Example: Creating a New Topic

  • When a new topic is created, the full list of topics is updated and sent to the client.
  • Example JSON message upon new topic creation:
  {
    "sys": {
      "topics": [
        {"name": "newTopic", "type": "public"},
        {"name": "existingTopic", "type": "private"}
        ... other existing topics
      ]
    },
    "updates": null
  }
  • This shows "newTopic" has been added to the list of available topics.

6. Example: Deleting a Topic

  • When a topic is deleted, the system updates the 'sys.topics' list excluding the deleted topic.
  • Additionally, if any clients were subscribed to the deleted topic, it will appear in their 'sys.unsubscribed' list.
  • Example JSON message upon topic deletion:
  {
    "sys": {
      "topics": [
        List of remaining topics after deletion
      ],
      "unsubscribed": [
        {"name": "deletedTopic", "type": "public"}
      ]
    },
    "updates": null
  }
  • This indicates "deletedTopic" has been removed, and clients subscribed to it are informed of the unsubscription.

These examples demonstrate the JSON structure the client receives in different scenarios related to topic management. The design ensures that clients are always informed about their subscription status and the availability of topics, enabling dynamic and responsive interactions with the SSEPubSubService.

Documentation

Index

Constants

View Source
const (
	Waiting status = iota
	Receving
)
View Source
const (
	TPublic  topicType = "public"
	TPrivate topicType = "private"
	TGroup   topicType = "group"
)

Variables

This section is empty.

Functions

func AddClient

func AddClient(s *SSEPubSubService, w http.ResponseWriter, r *http.Request)

AddClient handles HTTP requests for adding a new client.

func AddPrivateTopic

func AddPrivateTopic(s *SSEPubSubService, w http.ResponseWriter, r *http.Request)

AddPrivateTopic handles HTTP requests for adding a new private topic.

func AddPublicTopic

func AddPublicTopic(s *SSEPubSubService, w http.ResponseWriter, r *http.Request)

AddPublicTopic handles HTTP requests for adding a new public topic.

func Event

Event

func Subscribe

func Subscribe(s *SSEPubSubService, w http.ResponseWriter, r *http.Request)

Subscribe handles HTTP requests for client subscriptions.

func Unsubscribe

func Unsubscribe(s *SSEPubSubService, w http.ResponseWriter, r *http.Request)

Unsubscribe handles HTTP requests for client unsubscriptions.

Types

type Client

type Client struct {
	// contains filtered or unexported fields
}

Client represents a subscriber with a channel to send messages.

func (*Client) GetAllTopics

func (c *Client) GetAllTopics() map[string]*Topic

Get all topics

func (*Client) GetGroupByName

func (c *Client) GetGroupByName(name string) (*Group, bool)

Get group by name

func (*Client) GetGroups

func (c *Client) GetGroups() map[string]*Group

Get groups

func (*Client) GetID

func (c *Client) GetID() string

Get ID

func (*Client) GetPrivateTopicByName

func (c *Client) GetPrivateTopicByName(name string) (*Topic, bool)

Get private topic by name

func (*Client) GetPrivateTopics

func (c *Client) GetPrivateTopics() map[string]*Topic

Get private topics

func (*Client) GetPublicTopicByName

func (c *Client) GetPublicTopicByName(name string) (*Topic, bool)

Get public topic by name

func (*Client) GetPublicTopics

func (c *Client) GetPublicTopics() map[string]*Topic

Get public topics

func (*Client) GetStatus

func (c *Client) GetStatus() status

Get Status

func (*Client) GetSubscribedTopics

func (c *Client) GetSubscribedTopics() map[string]*Topic

Get subscribed topics

func (*Client) GetTopicByName

func (c *Client) GetTopicByName(name string) (*Topic, bool)

Get topic by name

func (*Client) NewPrivateTopic

func (c *Client) NewPrivateTopic(name string) *Topic

New private topic 0. Check if topic already exists, return it if it does 1. Create a new private topic 2. Add the topic to the client 3. Inform the client about the new topic

func (*Client) RemovePrivateTopic

func (c *Client) RemovePrivateTopic(t *Topic)

Remove private topic 0. Check if topic exists, return error if it does not 1. Unsubscribe from the topic 2. Remove the topic from the client 3. Inform the client about the removed topic by sending the new topic list

func (*Client) Start

func (c *Client) Start(ctx context.Context, onEvent OnEventFunc) error

Start the client 0. Check if client is already receiving 1. Set status to Receving and create stop channel 2. Send init message to client 3. Keep the connection open 4. Send message to client if new data is published over the stream 5. Stop the client if the stop channel is closed

func (*Client) Sub

func (c *Client) Sub(topic *Topic) error

Subscribe to a topic 1. If client can subscribe to this topic, add client to topic and return nil 2. Inform the client about the new topic by sending this topic as subscribed

func (*Client) Unsub

func (c *Client) Unsub(topic *Topic) error

Unsubscribe from a topic 1. If client is subscribed to this topic, remove client from topic and return nil 2. Inform the client about the new topic by sending this topic as unsubscribed

type Group

type Group struct {
	// contains filtered or unexported fields
}

Group is a collection of topics.

func (*Group) AddClient

func (g *Group) AddClient(c *Client)

AddClient adds a client to the group. 0. Check if client already exists in the group 1. Add client to the group 2. Add group to client

func (*Group) GetClientByID

func (g *Group) GetClientByID(id string) (*Client, bool)

Get client by ID

func (*Group) GetClients

func (g *Group) GetClients() map[string]*Client

GetClients returns a map of clients.

func (*Group) GetID

func (g *Group) GetID() string

GetID returns the ID of the group.

func (*Group) GetName

func (g *Group) GetName() string

GetName returns the name of the group.

func (*Group) GetTopicByName

func (g *Group) GetTopicByName(name string) (*Topic, bool)

Get topic by name

func (*Group) GetTopics

func (g *Group) GetTopics() map[string]*Topic

GetTopics returns a map of topics.

func (*Group) NewTopic

func (g *Group) NewTopic(name string) *Topic

AddTopic adds a topic to the group. 1. Check if topic already exists, return it if it does 2. Add the topic to the group 3. Inform all clients about the new topic

func (*Group) RemoveClient

func (g *Group) RemoveClient(c *Client)

RemoveClient removes a client from the group. 0. Check if client exists in the group 1. Unsubscribe client from all group topics 2. Remove client from the group 3. Remove group from client 4. Inform client about the removed topic

func (*Group) RemoveTopic

func (g *Group) RemoveTopic(t *Topic)

RemoveTopic removes a topic from the group. 0. Check if topic is a group topic 1. Check if topic exists in the group 2. Unsuscribe all clients from the topic 3. Remove topic from the group 4. Inform all clients about the removed topic

type OnEventFunc

type OnEventFunc func(string)

type SSEPubSubService

type SSEPubSubService struct {
	// contains filtered or unexported fields
}

SSEPubSubService represents the SSE publisher and subscriber system.

func NewSSEPubSubService

func NewSSEPubSubService() *SSEPubSubService

NewSSEPubSub creates a new sSEPubSubService instance.

func (*SSEPubSubService) GetClientByID

func (s *SSEPubSubService) GetClientByID(id string) (*Client, bool)

Get client by ID

func (*SSEPubSubService) GetClients

func (s *SSEPubSubService) GetClients() map[string]*Client

Get clients

func (*SSEPubSubService) GetGroupByName

func (s *SSEPubSubService) GetGroupByName(name string) (*Group, bool)

Get group by name

func (*SSEPubSubService) GetGroups

func (s *SSEPubSubService) GetGroups() map[string]*Group

Get groups

func (*SSEPubSubService) GetPublicTopicByName

func (s *SSEPubSubService) GetPublicTopicByName(name string) (*Topic, bool)

Get public topic by name

func (*SSEPubSubService) GetPublicTopics

func (s *SSEPubSubService) GetPublicTopics() map[string]*Topic

Get public topics

func (*SSEPubSubService) NewClient

func (s *SSEPubSubService) NewClient() *Client

Create new client

func (*SSEPubSubService) NewGroup

func (s *SSEPubSubService) NewGroup(name string) *Group

Add Group 0. Check if group already exists, return it if it does 1. Create a new group 2. Add the group to the sSEPubSubService

func (*SSEPubSubService) NewPublicTopic

func (s *SSEPubSubService) NewPublicTopic(name string) *Topic

Create new public topic 0. Check if topic already exists, return it if it does 1. Create a new public topic 2. Add the topic to the sSEPubSubService 3. Inform all clients about the new topic

func (*SSEPubSubService) OnNewClient

func (s *SSEPubSubService) OnNewClient(f funcClient) string

Event: When client is created

func (*SSEPubSubService) RemoveClient

func (s *SSEPubSubService) RemoveClient(c *Client)

Remove client 1. Unsubscribe from all topics 2. Remove all private topics 3. Stop the client 4. Remove client from sSEPubSubService

func (*SSEPubSubService) RemoveGroup

func (s *SSEPubSubService) RemoveGroup(g *Group)

Remove group 0. Check if group exists in sSEPubSubService 1. Remove all topics from the group 2. Remove all clients from the group 3. Remove group from sSEPubSubService

func (*SSEPubSubService) RemoveOnNewClient

func (s *SSEPubSubService) RemoveOnNewClient(id string)

Remove Event: When client is created

func (*SSEPubSubService) RemovePublicTopic

func (s *SSEPubSubService) RemovePublicTopic(t *Topic)

Remove public topic 0. Check if topic is public 1. Unsubscribe all clients from the topic 2. Check if topic exists in sSEPubSubService 3. Remove topic from sSEPubSubService 4. Inform all clients about the removed topic by sending the new topic list

type Topic

type Topic struct {
	// contains filtered or unexported fields
}

Topic represents a messaging Topic in the SSE pub-sub system.

func (*Topic) GetClients

func (t *Topic) GetClients() map[string]*Client

Get all clients in the topic

func (*Topic) GetID

func (t *Topic) GetID() string

Get ID

func (*Topic) GetName

func (t *Topic) GetName() string

Get Name

func (*Topic) GetType

func (t *Topic) GetType() string

Get Type

func (*Topic) IsSubscribed

func (t *Topic) IsSubscribed(c *Client) bool

Check if a client is subscribed to the topic

func (*Topic) Pub

func (t *Topic) Pub(msg interface{}) error

Publish a message to all clients in the topic

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL