multicast

package module
v2.0.0 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Dec 11, 2022 License: MIT Imports: 1 Imported by: 12

README

Multicast Build Status GoDoc

Multi-subscriber channels for Golang

The multicast module provides single-writer, multiple-reader semantics around Go channels. It attempts to maintain semantics similar to those offered by standard Go channels while guaranteeing parallel delivery (slow consumers won't hold up delivery to other listeners) and guaranteeing delivery to all registered listeners when a message is published.

Features

  • Simple API if you know how to use channels then you'll be able to use multicast without learning anything special.
  • Similar Semantics to core channels mean that your approach to reasoning around channels doesn't need to change.
  • Low Overheads with linear memory growth as your number of listeners increases and no buffer overheads.
  • Generics Support when using the v2 library, allowing you to statically validate the types of messages you're sending and receiving.

Example

import (
    "fmt"

    "github.com/SierraSoftworks/multicast/v2"
)

func main() {
    c := multicast.New[string]()

	go func() {
		l := c.Listen()
		for msg := range l.C {
			fmt.Printf("Listener 1: %s\n", msg)
		}
        fmt.Println("Listener 1 Closed")
	}()

	go func() {
		l := c.Listen()
		for msg := range l.C {
			fmt.Printf("Listener 2: %s\n", msg)
		}
        fmt.Println("Listener 2 Closed")
	}()

	c.C <- "Hello World!"
	c.Close()
}

Architecture

Multicast implements its channels as a linked list of listeners which automatically forward messages they receive to the next listener before exposing them on their C channel.

This approach removes the need to maintain an array of listeners belonging to a channel and greatly simplifies the implementation.

Documentation

Index

Examples

Constants

This section is empty.

Variables

This section is empty.

Functions

This section is empty.

Types

type Channel

type Channel[T any] struct {
	// C is a writable channel on which you can send messages which will
	// be delivered to all connected listeners.
	C chan<- T
	// contains filtered or unexported fields
}

Channel represents a multicast channel container which provides a writable channel C and allows multiple listeners to be connected.

func From

func From[T any](c chan T) *Channel[T]

From creates a new multicast channel which exposes messages it receives on the provided channel to all connected listeners.

Example
source := make(chan interface{})
c := multicast.From(source)
wg := sync.WaitGroup{}
wg.Add(2)

go func() {
	l := c.Listen()
	wg.Done()
	defer wg.Done()
	for msg := range l.C {
		fmt.Printf("Listener 1: %s\n", msg)
	}
}()

go func() {
	l := c.Listen()
	wg.Done()
	defer wg.Done()
	for msg := range l.C {
		fmt.Printf("Listener 2: %s\n", msg)
	}
}()

wg.Wait()
wg.Add(2)
source <- "Hello World!"
close(source)
wg.Wait()
Output:

Listener 1: Hello World!
Listener 2: Hello World!

func New

func New[T any]() *Channel[T]

New creates a new multicast channel container which can have listeners connected to it and messages sent via its C channel property.

Example
c := multicast.New[string]()
wg := sync.WaitGroup{}
wg.Add(2)

go func() {
	l := c.Listen()
	wg.Done()
	defer wg.Done()
	for msg := range l.C {
		fmt.Printf("Listener 1: %s\n", msg)
	}
}()

go func() {
	l := c.Listen()
	wg.Done()
	defer wg.Done()
	for msg := range l.C {
		fmt.Printf("Listener 2: %s\n", msg)
	}
}()

wg.Wait()
wg.Add(2)
c.C <- "Hello World!"
c.Close()
wg.Wait()
Output:

Listener 1: Hello World!
Listener 2: Hello World!

func (*Channel[T]) Close

func (c *Channel[T]) Close()

Close is a convenience function for closing the top level channel. You may also close the channel directly by using `close(c.C)`.

Example
c := multicast.New[struct{}]()
wg := sync.WaitGroup{}
wg.Add(1)

go func() {
	l := c.Listen()
	for range l.C {
	}
	fmt.Println("Listener closed")
	wg.Done()
}()

c.Close()
wg.Wait()
Output:

Listener closed

func (*Channel[T]) Listen

func (c *Channel[T]) Listen() *Listener[T]

Listen returns a new listener instance attached to this channel. Each listener will receive a single instance of each message sent to the channel.

type Listener

type Listener[T any] struct {
	C <-chan T
	// contains filtered or unexported fields
}

Listener represents a listener which will receive messages from a channel.

Example
m := multicast.New[string]()
l := m.Listen()

wg := sync.WaitGroup{}
wg.Add(1)

go func() {
	for msg := range l.C {
		fmt.Printf("Listener got: %#v\n", msg)
	}
	wg.Done()
}()

m.C <- "Hello!"
m.Close()
wg.Wait()
Output:

Listener got: "Hello!"

func NewListener

func NewListener[T any](source <-chan T) *Listener[T]

NewListener creates a new listener which will forward messages it receives on its f channel before exposing them on its C channel. You will very rarely need to use this method directly in your applications, prefer using From instead.

Example
s := make(chan interface{})

wg := sync.WaitGroup{}
wg.Add(1)

go func() {
	l := multicast.NewListener(s)
	fmt.Printf("Listener got: %s\n", <-l.C)
	wg.Done()
}()

s <- "Hello World!"
close(s)
wg.Wait()
Output:

Listener got: Hello World!

func (*Listener[T]) Chain

func (l *Listener[T]) Chain() *Listener[T]

Chain is a shortcut which updates an existing listener to forward to a new listener and then returns the new listener. You will generally not need to make use of this method in your applications.

Example
s := make(chan interface{})

wg := sync.WaitGroup{}
wg.Add(1)

l1 := multicast.NewListener(s)
go func() {
	fmt.Printf("Listener 1: %s\n", <-l1.C)
	wg.Done()
}()

wg.Add(1)

l2 := l1.Chain()
go func() {
	fmt.Printf("Listener 2: %s\n", <-l2.C)
	wg.Done()
}()

s <- "Hello World!"
close(s)

wg.Wait()
Output:

Listener 1: Hello World!
Listener 2: Hello World!

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL