multicast

v0.0.0-...-c029c77
Published: Apr 25, 2019 License: MIT Imports: 1 Imported by: 1

README

Multicast

Multi-subscriber channels for Golang

The multicast module provides single-writer, multiple-reader semantics around Go channels. It attempts to maintain semantics similar to those offered by standard Go channels while guaranteeing parallel delivery (slow consumers won't hold up delivery to other listeners) and guaranteeing delivery to all registered listeners when a message is published.

Features

  • Simple API: if you know how to use channels, you'll be able to use multicast without learning anything special.
  • Similar semantics to core channels: your approach to reasoning about channels doesn't need to change.
  • Low overhead: memory grows linearly with the number of listeners, and there is no buffer overhead.

Example

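package main

import (
	"fmt"
	"sync"

	"github.com/SierraSoftworks/multicast"
)

func main() {
	c := multicast.New()

	// ready tracks listener registration; done tracks listener shutdown.
	var ready, done sync.WaitGroup
	ready.Add(2)
	done.Add(2)

	go func() {
		defer done.Done()
		l := c.Listen()
		ready.Done()
		for msg := range l.C {
			fmt.Printf("Listener 1: %s\n", msg)
		}
		fmt.Println("Listener 1 Closed")
	}()

	go func() {
		defer done.Done()
		l := c.Listen()
		ready.Done()
		for msg := range l.C {
			fmt.Printf("Listener 2: %s\n", msg)
		}
		fmt.Println("Listener 2 Closed")
	}()

	// Publish only once both listeners are attached.
	ready.Wait()
	c.C <- "Hello World!"
	c.Close()

	// Keep main alive until both listeners have seen the close.
	done.Wait()
}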

Architecture

Multicast implements its channels as a linked list of listeners which automatically forward messages they receive to the next listener before exposing them on their C channel.

This approach removes the need to maintain an array of listeners belonging to a channel and greatly simplifies the implementation.

Documentation

Types

type Channel

type Channel struct {
	// C is a writable channel on which you can send messages which will
	// be delivered to all connected listeners.
	C chan<- interface{}
	// contains filtered or unexported fields
}

Channel represents a multicast channel container which provides a writable channel C and allows multiple listeners to be connected.

func From

func From(c chan interface{}) *Channel

From creates a new multicast channel which exposes messages it receives on the provided channel to all connected listeners.

Example
source := make(chan interface{})
c := multicast.From(source)
wg := sync.WaitGroup{}
wg.Add(2)

go func() {
	l := c.Listen()
	wg.Done()
	defer wg.Done()
	for msg := range l.C {
		fmt.Printf("Listener 1: %s\n", msg)
	}
}()

go func() {
	l := c.Listen()
	wg.Done()
	defer wg.Done()
	for msg := range l.C {
		fmt.Printf("Listener 2: %s\n", msg)
	}
}()

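// The first Wait returns once both listeners are attached. The WaitGroup is
// then re-armed so the final Wait blocks until both listeners have exited.
wg.Wait()
wg.Add(2)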
source <- "Hello World!"
close(source)
wg.Wait()
Output:

Listener 1: Hello World!
Listener 2: Hello World!

func New

func New() *Channel

New creates a new multicast channel container which can have listeners connected to it and messages sent via its C channel property.

Example
c := multicast.New()
wg := sync.WaitGroup{}
wg.Add(2)

go func() {
	l := c.Listen()
	wg.Done()
	defer wg.Done()
	for msg := range l.C {
		fmt.Printf("Listener 1: %s\n", msg)
	}
}()

go func() {
	l := c.Listen()
	wg.Done()
	defer wg.Done()
	for msg := range l.C {
		fmt.Printf("Listener 2: %s\n", msg)
	}
}()

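// The first Wait returns once both listeners are attached. The WaitGroup is
// then re-armed so the final Wait blocks until both listeners have exited.
wg.Wait()
wg.Add(2)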
c.C <- "Hello World!"
c.Close()
wg.Wait()
Output:

Listener 1: Hello World!
Listener 2: Hello World!

func (*Channel) Close

func (c *Channel) Close()

Close is a convenience method for closing the top-level channel. You may also close the channel directly by using `close(c.C)`.

Example
c := multicast.New()
wg := sync.WaitGroup{}
wg.Add(1)

go func() {
	l := c.Listen()
	for range l.C {
	}
	fmt.Println("Listener closed")
	wg.Done()
}()

c.Close()
wg.Wait()
Output:

Listener closed

func (*Channel) Listen

func (c *Channel) Listen() *Listener

Listen returns a new listener instance attached to this channel. Each listener will receive a single instance of each message sent to the channel.

type Listener

type Listener struct {
	C <-chan interface{}
	// contains filtered or unexported fields
}

Listener represents a listener which will receive messages from a channel.

Example
m := multicast.New()
l := m.Listen()

wg := sync.WaitGroup{}
wg.Add(1)

go func() {
	for msg := range l.C {
		fmt.Printf("Listener got: %#v\n", msg)
	}
	wg.Done()
}()

m.C <- "Hello!"
m.Close()
wg.Wait()
Output:

Listener got: "Hello!"

func NewListener

func NewListener(source <-chan interface{}) *Listener

NewListener creates a new listener which reads messages from the source channel and forwards them on its internal f channel (to the next listener in the chain) before exposing them on its C channel. You will very rarely need to call this function directly in your applications; prefer From instead.

Example
s := make(chan interface{})

wg := sync.WaitGroup{}
wg.Add(1)

go func() {
	l := multicast.NewListener(s)
	fmt.Printf("Listener got: %s\n", <-l.C)
	wg.Done()
}()

s <- "Hello World!"
close(s)
wg.Wait()
Output:

Listener got: Hello World!

func (*Listener) Chain

func (l *Listener) Chain() *Listener

Chain is a shortcut which updates an existing listener to forward to a new listener and then returns the new listener. You will generally not need to make use of this method in your applications.

Example
s := make(chan interface{})

wg := sync.WaitGroup{}
wg.Add(1)

l1 := multicast.NewListener(s)
go func() {
	fmt.Printf("Listener 1: %s\n", <-l1.C)
	wg.Done()
}()

wg.Add(1)

l2 := l1.Chain()
go func() {
	fmt.Printf("Listener 2: %s\n", <-l2.C)
	wg.Done()
}()

s <- "Hello World!"
close(s)

wg.Wait()
Output:

Listener 1: Hello World!
Listener 2: Hello World!
