event

package
v1.0.5
Published: Jan 16, 2020 License: GPL-3.0 Imports: 8 Imported by: 0

Documentation

Overview

Package event deals with subscriptions to real-time events.

Constants

This section is empty.

Variables

var ErrMuxClosed = errors.New("event: mux closed")

ErrMuxClosed is returned when Posting on a closed TypeMux.

Functions

This section is empty.

Types

type Feed

type Feed struct {
	// contains filtered or unexported fields
}

Feed implements one-to-many subscriptions where the carrier of events is a channel. Values sent to a Feed are delivered to all subscribed channels simultaneously.

Feeds can only be used with a single type. This differs from the older TypeMux, which can carry several event types. The type is determined by the first Send or Subscribe operation. Subsequent calls to these methods panic if the type does not match.
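
The single-type rule can be pictured with a small guard that remembers the first element type it sees. This is only a sketch under assumptions: typeGuard, checkChannel, checkValue and panics are hypothetical names introduced here for illustration, and the package's internal bookkeeping may well differ.

```go
package main

import (
	"fmt"
	"reflect"
	"sync"
)

// typeGuard is a hypothetical helper, not part of package event. It records
// the element type seen first and panics on any later mismatch.
type typeGuard struct {
	mu    sync.Mutex
	etype reflect.Type
}

// check fixes the type on first use and panics if a later type differs.
func (g *typeGuard) check(t reflect.Type) {
	g.mu.Lock()
	defer g.mu.Unlock()
	if g.etype == nil {
		g.etype = t
		return
	}
	if g.etype != t {
		panic(fmt.Sprintf("type mismatch: got %v, want %v", t, g.etype))
	}
}

// checkChannel validates a would-be subscriber channel by its element type.
func (g *typeGuard) checkChannel(ch interface{}) {
	v := reflect.ValueOf(ch)
	if v.Kind() != reflect.Chan || v.Type().ChanDir()&reflect.SendDir == 0 {
		panic("not a sendable channel")
	}
	g.check(v.Type().Elem())
}

// checkValue validates a value that is about to be sent.
func (g *typeGuard) checkValue(v interface{}) { g.check(reflect.TypeOf(v)) }

// panics reports whether fn panicked.
func panics(fn func()) (p bool) {
	defer func() { p = recover() != nil }()
	fn()
	return false
}

func main() {
	var g typeGuard
	g.checkChannel(make(chan int)) // the first use fixes the type to int
	fmt.Println(panics(func() { g.checkValue(7) }))
	fmt.Println(panics(func() { g.checkValue("seven") }))
}
```

In this sketch the second check succeeds because 7 is an int, while the string value triggers the mismatch panic.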

The zero value is ready to use.

Example (AcknowledgedEvents)
package main

import (
	"fmt"

	"github.com/palletone/go-palletone/common/event"
)

func main() {
	// This example shows how the return value of Send can be used for request/reply
	// interaction between event consumers and producers.
	var feed event.Feed
	type ackedEvent struct {
		i   int
		ack chan<- struct{}
	}

	// Consumers wait for events on the feed and acknowledge processing.
	done := make(chan struct{})
	defer close(done)
	for i := 0; i < 3; i++ {
		ch := make(chan ackedEvent, 100)
		sub := feed.Subscribe(ch)
		go func() {
			defer sub.Unsubscribe()
			for {
				select {
				case ev := <-ch:
					fmt.Println(ev.i) // "process" the event
					ev.ack <- struct{}{}
				case <-done:
					return
				}
			}
		}()
	}

	// The producer sends values of type ackedEvent with increasing values of i.
	// It waits for all consumers to acknowledge before sending the next event.
	for i := 0; i < 3; i++ {
		acksignal := make(chan struct{})
		n := feed.Send(ackedEvent{i, acksignal})
		for ack := 0; ack < n; ack++ {
			<-acksignal
		}
	}
}
Output:

0
0
0
1
1
1
2
2
2

func (*Feed) Send

func (f *Feed) Send(value interface{}) (nsent int)

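Send delivers to all subscribed channels simultaneously. It returns the number of subscribers that the value was sent to.

Send does not walk the channels one by one with blocking sends, because a slow subscriber would then hold up the fast ones. Instead it uses reflection to build a SelectCase for each channel. It first attempts a non-blocking TrySend on every channel, so when no subscriber is slow the value is delivered to all of them right away. Channels on which TrySend fails are then served by calling Select in a loop. This is also the reason Feed replaces TypeMux.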
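
The two-phase delivery described for Send can be sketched with the standard reflect package alone. The function sendAll below is a hypothetical illustration, not the package's code: it omits locking, the inbox and subscription removal, and it assumes every channel accepts the value's type.

```go
package main

import (
	"fmt"
	"reflect"
)

// sendAll is a simplified sketch of the delivery strategy. It delivers value
// to every channel in chans and returns the number of successful sends.
func sendAll(chans []interface{}, value interface{}) int {
	rvalue := reflect.ValueOf(value)
	cases := make([]reflect.SelectCase, 0, len(chans))
	for _, ch := range chans {
		cases = append(cases, reflect.SelectCase{
			Dir:  reflect.SelectSend,
			Chan: reflect.ValueOf(ch),
			Send: rvalue,
		})
	}

	nsent := 0
	// Fast path: a non-blocking send succeeds on every channel that has
	// buffer space or a waiting receiver.
	for i := 0; i < len(cases); {
		if cases[i].Chan.TrySend(rvalue) {
			nsent++
			cases[i] = cases[len(cases)-1]
			cases = cases[:len(cases)-1]
		} else {
			i++
		}
	}
	// Slow path: block until any one of the remaining channels accepts the
	// value, drop that case, and repeat until none are left.
	for len(cases) > 0 {
		chosen, _, _ := reflect.Select(cases)
		nsent++
		cases[chosen] = cases[len(cases)-1]
		cases = cases[:len(cases)-1]
	}
	return nsent
}

func main() {
	fast := make(chan int, 1) // buffered: served by the fast path
	slow := make(chan int)    // unbuffered: may need the blocking path
	got := make(chan int)
	go func() { got <- <-slow }()

	n := sendAll([]interface{}{fast, slow}, 42)
	fmt.Println(n, <-fast, <-got)
}
```

Because the blocking phase selects over all remaining channels at once, whichever subscriber becomes ready first is served first, so one slow subscriber does not delay the others.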

func (*Feed) Subscribe

func (f *Feed) Subscribe(channel interface{}) Subscription

Subscribe adds a channel to the feed. Future sends will be delivered on the channel until the subscription is canceled. All channels added must have the same element type.

This differs from TypeMux, where the caller passes the types to subscribe to and the subscription code creates and returns the channel. Passing the channel in directly may be more flexible. Subscribe builds a SelectCase from the given channel and places it in the feed's inbox.

The channel should have ample buffer space to avoid blocking other subscribers. Slow subscribers are not dropped.

type ResubscribeFunc

type ResubscribeFunc func(context.Context) (Subscription, error)

A ResubscribeFunc attempts to establish a subscription.

type Subscription

type Subscription interface {
	Err() <-chan error // returns the error channel
	Unsubscribe()      // cancels sending of events, closing the error channel
}

Subscription represents a stream of events. The carrier of the events is typically a channel, but isn't part of the interface.

Subscriptions can fail while established. Failures are reported through an error channel. It receives a value if there is an issue with the subscription (e.g. the network connection delivering the events has been closed). Only one value will ever be sent.

The error channel is closed when the subscription ends successfully (i.e. when the source of events is closed). It is also closed when Unsubscribe is called.

The Unsubscribe method cancels the sending of events. You must call Unsubscribe in all cases to ensure that resources related to the subscription are released. It can be called any number of times.
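
The contract above can be illustrated with a small stand-alone type that uses only the standard library. This is a sketch under assumptions: funcSub and newSub are hypothetical names, and unlike the documented behavior this version may still deliver a producer error after Unsubscribe has been called.

```go
package main

import (
	"errors"
	"fmt"
	"sync"
)

// funcSub is a hypothetical type with the same method set as Subscription.
type funcSub struct {
	quit chan struct{} // closed by the first Unsubscribe call
	done chan struct{} // closed once the producer has returned
	err  chan error    // carries at most one error, then is closed
	once sync.Once
}

// newSub runs producer in a new goroutine and reports its error, if any.
func newSub(producer func(<-chan struct{}) error) *funcSub {
	s := &funcSub{
		quit: make(chan struct{}),
		done: make(chan struct{}),
		err:  make(chan error, 1),
	}
	go func() {
		defer close(s.done)
		defer close(s.err)
		if e := producer(s.quit); e != nil {
			s.err <- e // buffered, so this never blocks
		}
	}()
	return s
}

// Err returns the error channel.
func (s *funcSub) Err() <-chan error { return s.err }

// Unsubscribe may be called any number of times. Only the first call closes
// quit; every call waits until the producer has shut down.
func (s *funcSub) Unsubscribe() {
	s.once.Do(func() { close(s.quit) })
	<-s.done
}

func main() {
	sub := newSub(func(quit <-chan struct{}) error {
		return errors.New("connection lost")
	})
	err, ok := <-sub.Err()
	fmt.Println(err, ok)

	sub.Unsubscribe()
	sub.Unsubscribe() // repeated calls are safe
	_, ok = <-sub.Err()
	fmt.Println(ok)
}
```

A consumer therefore only needs to select on Err alongside its event channel: a received value signals failure, and a closed channel signals that the subscription has ended.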

func NewSubscription

func NewSubscription(producer func(<-chan struct{}) error) Subscription

NewSubscription runs a producer function as a subscription in a new goroutine. The channel given to the producer is closed when Unsubscribe is called. If producer returns an error, it is sent on the subscription's error channel.

Example
package main

import (
	"fmt"

	"github.com/palletone/go-palletone/common/event"
)

func main() {
	// Create a subscription that sends 10 integers on ch.
	ch := make(chan int)
	sub := event.NewSubscription(func(quit <-chan struct{}) error {
		for i := 0; i < 10; i++ {
			select {
			case ch <- i:
			case <-quit:
				fmt.Println("unsubscribed")
				return nil
			}
		}
		return nil
	})

	// This is the consumer. It reads 5 integers, then aborts the subscription.
	// Note that Unsubscribe waits until the producer has shut down.
	for i := range ch {
		fmt.Println(i)
		if i == 4 {
			sub.Unsubscribe()
			break
		}
	}
}
Output:

0
1
2
3
4
unsubscribed

func Resubscribe

func Resubscribe(backoffMax time.Duration, fn ResubscribeFunc) Subscription

Resubscribe calls fn repeatedly to keep a subscription established. When the subscription is established, Resubscribe waits for it to fail and calls fn again. This process repeats until Unsubscribe is called or the active subscription ends successfully.

Resubscribe applies backoff between calls to fn. The time between calls is adapted based on the error rate, but will never exceed backoffMax.
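
One way to picture a capped backoff is a delay that doubles after each failed attempt until it reaches the limit. The helper nextDelay below is hypothetical, and both the doubling rule and the starting value of one tenth of the maximum are assumptions made for illustration. The text above only guarantees that the delay never exceeds backoffMax.

```go
package main

import (
	"fmt"
	"time"
)

// nextDelay is a hypothetical helper. It starts at a tenth of max, doubles
// the previous delay on each call, and never returns more than max.
func nextDelay(prev, max time.Duration) time.Duration {
	if prev <= 0 {
		return max / 10
	}
	if d := prev * 2; d < max {
		return d
	}
	return max
}

func main() {
	max := 10 * time.Second
	var d time.Duration
	for i := 0; i < 6; i++ {
		d = nextDelay(d, max)
		fmt.Println(d)
	}
}
```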

type SubscriptionScope

type SubscriptionScope struct {
	// contains filtered or unexported fields
}

SubscriptionScope provides a facility to unsubscribe multiple subscriptions at once.

For code that handles more than one subscription, a scope can be used to conveniently unsubscribe all of them with a single call. The example demonstrates a typical use in a larger program.

The zero value is ready to use.

Example
package main

import (
	"fmt"
	"sync"

	"github.com/palletone/go-palletone/common/event"
)

// This example demonstrates how SubscriptionScope can be used to control the lifetime of
// subscriptions.
//
// Our example program consists of two servers, each of which performs a calculation when
// requested. The servers also allow subscribing to results of all computations.
type divServer struct{ results event.Feed }
type mulServer struct{ results event.Feed }

func (s *divServer) do(a, b int) int {
	r := a / b
	s.results.Send(r)
	return r
}

func (s *mulServer) do(a, b int) int {
	r := a * b
	s.results.Send(r)
	return r
}

// The servers are contained in an App. The app controls the servers and exposes them
// through its API.
type App struct {
	divServer
	mulServer
	scope event.SubscriptionScope
}

func (s *App) Calc(op byte, a, b int) int {
	switch op {
	case '/':
		return s.divServer.do(a, b)
	case '*':
		return s.mulServer.do(a, b)
	default:
		panic("invalid op")
	}
}

// The app's SubscribeResults method starts sending calculation results to the given
// channel. Subscriptions created through this method are tied to the lifetime of the App
// because they are registered in the scope.
func (s *App) SubscribeResults(op byte, ch chan<- int) event.Subscription {
	switch op {
	case '/':
		return s.scope.Track(s.divServer.results.Subscribe(ch))
	case '*':
		return s.scope.Track(s.mulServer.results.Subscribe(ch))
	default:
		panic("invalid op")
	}
}

// Stop stops the App, closing all subscriptions created through SubscribeResults.
func (s *App) Stop() {
	s.scope.Close()
}

func main() {
	// Create the app.
	var (
		app  App
		wg   sync.WaitGroup
		divs = make(chan int)
		muls = make(chan int)
	)

	// Run a subscriber in the background.
	divsub := app.SubscribeResults('/', divs)
	mulsub := app.SubscribeResults('*', muls)
	wg.Add(1)
	go func() {
		defer wg.Done()
		defer fmt.Println("subscriber exited")
		defer divsub.Unsubscribe()
		defer mulsub.Unsubscribe()
		for {
			select {
			case result := <-divs:
				fmt.Println("division happened:", result)
			case result := <-muls:
				fmt.Println("multiplication happened:", result)
			case <-divsub.Err():
				return
			case <-mulsub.Err():
				return
			}
		}
	}()

	// Interact with the app.
	app.Calc('/', 22, 11)
	app.Calc('*', 3, 4)

	// Stop the app. This shuts down the subscriptions, causing the subscriber to exit.
	app.Stop()
	wg.Wait()

}
Output:

division happened: 2
multiplication happened: 12
subscriber exited

func (*SubscriptionScope) Close

func (sc *SubscriptionScope) Close()

Close calls Unsubscribe on all tracked subscriptions and prevents further additions to the tracked set. Calls to Track after Close return nil.

func (*SubscriptionScope) Count

func (sc *SubscriptionScope) Count() int

Count returns the number of tracked subscriptions. It is meant to be used for debugging.

func (*SubscriptionScope) Track

func (sc *SubscriptionScope) Track(s Subscription) Subscription

Track starts tracking a subscription. If the scope is closed, Track returns nil. The returned subscription is a wrapper. Unsubscribing the wrapper removes it from the scope.
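
The behavior documented for Track, Count and Close can be sketched with a mutex-guarded set. All names below (scope, tracked, unsubscriber, fakeSub) are hypothetical and exist only for this illustration. The sketch omits the Err method and is not the package's implementation.

```go
package main

import (
	"fmt"
	"sync"
)

// unsubscriber is the only part of Subscription this sketch needs.
type unsubscriber interface{ Unsubscribe() }

// scope is a hypothetical, simplified stand-in for SubscriptionScope.
type scope struct {
	mu     sync.Mutex
	subs   map[*tracked]struct{}
	closed bool
}

// tracked wraps a subscription so that unsubscribing it also removes it
// from the scope.
type tracked struct {
	sc *scope
	s  unsubscriber
}

// Track registers s and returns the wrapper, or nil if the scope is closed.
func (sc *scope) Track(s unsubscriber) *tracked {
	sc.mu.Lock()
	defer sc.mu.Unlock()
	if sc.closed {
		return nil
	}
	if sc.subs == nil {
		sc.subs = make(map[*tracked]struct{})
	}
	t := &tracked{sc, s}
	sc.subs[t] = struct{}{}
	return t
}

// Unsubscribe removes the wrapper from its scope and cancels the inner
// subscription.
func (t *tracked) Unsubscribe() {
	t.sc.mu.Lock()
	delete(t.sc.subs, t)
	t.sc.mu.Unlock()
	t.s.Unsubscribe()
}

// Close unsubscribes everything that is tracked and rejects later additions.
func (sc *scope) Close() {
	sc.mu.Lock()
	defer sc.mu.Unlock()
	if sc.closed {
		return
	}
	sc.closed = true
	for t := range sc.subs {
		t.s.Unsubscribe()
	}
	sc.subs = nil
}

// Count returns the number of tracked subscriptions.
func (sc *scope) Count() int {
	sc.mu.Lock()
	defer sc.mu.Unlock()
	return len(sc.subs)
}

// fakeSub counts how often it was unsubscribed.
type fakeSub struct{ n int }

func (f *fakeSub) Unsubscribe() { f.n++ }

func main() {
	var sc scope
	a, b := &fakeSub{}, &fakeSub{}
	ta := sc.Track(a)
	sc.Track(b)
	fmt.Println(sc.Count())

	ta.Unsubscribe() // removes a from the scope
	fmt.Println(sc.Count())

	sc.Close() // unsubscribes b
	fmt.Println(sc.Count(), sc.Track(a) == nil, b.n)
}
```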

type TypeMux deprecated

type TypeMux struct {
	// contains filtered or unexported fields
}

A TypeMux dispatches events to registered receivers. Receivers can be registered to handle events of certain type. Any operation called after mux is stopped will return ErrMuxClosed.

The zero value is ready to use.

Deprecated: use Feed.

Example
type someEvent struct{ I int }
type otherEvent struct{ S string }
type yetAnotherEvent struct{ X, Y int }

var mux TypeMux

// Start a subscriber.
done := make(chan struct{})
sub := mux.Subscribe(someEvent{}, otherEvent{})
go func() {
	for event := range sub.Chan() {
		fmt.Printf("Received: %#v\n", event.Data)
	}
	fmt.Println("done")
	close(done)
}()

// Post some events.
mux.Post(someEvent{5})
mux.Post(yetAnotherEvent{X: 3, Y: 4})
mux.Post(someEvent{6})
mux.Post(otherEvent{"whoa"})

// Stop closes all subscription channels.
// The subscriber goroutine will print "done"
// and exit.
mux.Stop()

// Wait for subscriber to return.
<-done
Output:

Received: event.someEvent{I:5}
Received: event.someEvent{I:6}
Received: event.otherEvent{S:"whoa"}
done

func (*TypeMux) Post

func (mux *TypeMux) Post(ev interface{}) error

Post sends an event to all receivers registered for the given type. It returns ErrMuxClosed if the mux has been stopped.

func (*TypeMux) Stop

func (mux *TypeMux) Stop()

Stop closes a mux. The mux can no longer be used. Future Post calls will fail with ErrMuxClosed. Stop blocks until all current deliveries have finished.

func (*TypeMux) Subscribe

func (mux *TypeMux) Subscribe(types ...interface{}) *TypeMuxSubscription

Subscribe creates a subscription for events of the given types. A single subscription can cover several types at once. The subscription's channel is closed when it is unsubscribed or the mux is closed.
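
Type-keyed dispatch of this kind can be sketched with a map from reflect.Type to subscriber channels. The names miniMux and errClosed are hypothetical. The sketch uses plain buffered channels rather than TypeMuxEvent values, has no per-subscription Unsubscribe, and should not be read as the package's implementation.

```go
package main

import (
	"errors"
	"fmt"
	"reflect"
	"sync"
)

// errClosed is a hypothetical stand-in for ErrMuxClosed.
var errClosed = errors.New("mux closed")

// miniMux is a hypothetical, simplified type-keyed dispatcher.
type miniMux struct {
	mu      sync.RWMutex
	subs    map[reflect.Type][]chan interface{}
	stopped bool
}

// Subscribe registers one channel for the dynamic type of every argument.
func (m *miniMux) Subscribe(types ...interface{}) <-chan interface{} {
	ch := make(chan interface{}, 16)
	m.mu.Lock()
	defer m.mu.Unlock()
	if m.stopped {
		close(ch)
		return ch
	}
	if m.subs == nil {
		m.subs = make(map[reflect.Type][]chan interface{})
	}
	for _, t := range types {
		rt := reflect.TypeOf(t)
		m.subs[rt] = append(m.subs[rt], ch)
	}
	return ch
}

// Post delivers ev to every channel registered for its dynamic type.
func (m *miniMux) Post(ev interface{}) error {
	m.mu.RLock()
	defer m.mu.RUnlock()
	if m.stopped {
		return errClosed
	}
	for _, ch := range m.subs[reflect.TypeOf(ev)] {
		ch <- ev // blocks if this subscriber's buffer is full
	}
	return nil
}

// Stop closes every subscriber channel exactly once and rejects later posts.
func (m *miniMux) Stop() {
	m.mu.Lock()
	defer m.mu.Unlock()
	if m.stopped {
		return
	}
	m.stopped = true
	closed := make(map[chan interface{}]bool)
	for _, chans := range m.subs {
		for _, ch := range chans {
			if !closed[ch] {
				close(ch)
				closed[ch] = true
			}
		}
	}
	m.subs = nil
}

func main() {
	type someEvent struct{ I int }
	type otherEvent struct{ S string }

	var m miniMux
	ch := m.Subscribe(someEvent{})
	m.Post(someEvent{5})
	m.Post(otherEvent{"ignored"}) // no subscriber for this type
	m.Stop()

	for ev := range ch {
		fmt.Printf("%#v\n", ev)
	}
	fmt.Println(m.Post(someEvent{6}))
}
```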

type TypeMuxEvent

type TypeMuxEvent struct {
	Time time.Time
	Data interface{}
}

TypeMuxEvent is a time-tagged notification pushed to subscribers.

type TypeMuxSubscription

type TypeMuxSubscription struct {
	// contains filtered or unexported fields
}

TypeMuxSubscription is a subscription established through TypeMux.

func (*TypeMuxSubscription) Chan

func (s *TypeMuxSubscription) Chan() <-chan *TypeMuxEvent

func (*TypeMuxSubscription) Unsubscribe

func (s *TypeMuxSubscription) Unsubscribe()

Directories

Path Synopsis
Package filter implements event filters.
