contentpubsub

package module
v1.0.4
Published: Dec 9, 2021 License: MIT Imports: 25 Imported by: 0

README

ScoutSubs-FastDelivery

For more context about this system check here.

PubSub Interface

To use this pubsub middleware, call the following functions:

// Geographical region of the user
region := "PT"
// Number of nodes this peer may support in FastDelivery
capacity := 100
// Create a pubsub instance from an existing kad-dht instance
// (kadDHT is assumed to be a *kaddht.IpfsDHT created beforehand)
pubsub := NewPubSub(kadDHT, DefaultConfig(region, capacity))

// Make a subscription (ScoutSubs)
pubsub.MySubscribe("waves T/height R 1 2/period R 10 12")

// Publish an event (ScoutSubs)
pubsub.MyPublish("Waves are pumping at Ericeira, sets of 2 m with a nice 12 second period",
	"waves T/Ericeira T/height R 2 2/period R 12 12")

// Search and subscribe to multicastGroups of interest (FastDelivery)
pubsub.MySearchAndPremiumSub("waves T/height R 1 2")
// Subscribe to a known premium publisher (FastDelivery)
publisherAddress := "localhost:8888"
groupPredicate := "waves T/height R 0 5"
pubsub.MyPremiumSubscribe("waves T/height R 1 2", publisherAddress, groupPredicate)

// Create and advertise a multicast group and publish an event on it (FastDelivery)
pubsub.CreateMulticastGroup("portugal T")
pubsub.MyPremiumPublish("portugal T",
	"Portugal is one of the European countries with more hours of sun per year", "portugal T")

Instructions to run and test this codebase

  • Before running the tests, set the TestgroundReady flag to false in the utils.go file
  • The tests should be run one by one because of how ports and peer IDs are assigned
  • Most of the tests need to be confirmed by analyzing the output

Latest versions of each variant prepared for testground

Active
  • v0.0.18 >> Base-Unreliable
  • v0.1.22 >> Redirect-Unreliable
  • v0.4.17 >> Base-Rv-Reliable
  • v0.5.22 >> Redirect-Rv-Reliable and latest FastDelivery
Abandoned
  • v0.2.13 >> Redirect-Reliable
  • v0.3.9 >> Base-Reliable
Auxiliary
  • v0.13s >> Used for Debugging

Documentation

Constants

This section is empty.

Variables

This section is empty.

Functions

This section is empty.

Types

type AckUp

type AckUp struct {
	// contains filtered or unexported fields
}

type Attribute

type Attribute struct {
	// contains filtered or unexported fields
}

AttributeType can be Topic or Range:
Topic >> name: oil
Range >> name: price, rangeQuery: [120,140]

func (*Attribute) String

func (a *Attribute) String() string

type AttributeType

type AttributeType int
const (
	Topic AttributeType = iota
	Range
)
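As a minimal sketch of the two attribute kinds, the example below models a Topic and a Range attribute and renders them in the "name T" / "name R lower upper" notation used by the README predicates. The attribute struct, its fields, and the describe helper are hypothetical, since the real Attribute fields are unexported.

```go
package main

import "fmt"

// AttributeType mirrors the exported enum: Topic is 0 and Range is 1 via iota.
type AttributeType int

const (
	Topic AttributeType = iota
	Range
)

// attribute is a hypothetical stand-in for the package's Attribute,
// whose real fields are unexported.
type attribute struct {
	name         string
	attrType     AttributeType
	lower, upper int // only meaningful when attrType == Range
}

// describe renders an attribute in the "name T" / "name R lower upper"
// notation used by the predicate strings in the README.
func describe(a attribute) string {
	if a.attrType == Range {
		return fmt.Sprintf("%s R %d %d", a.name, a.lower, a.upper)
	}
	return a.name + " T"
}

func main() {
	fmt.Println(describe(attribute{name: "oil", attrType: Topic}))
	fmt.Println(describe(attribute{name: "price", attrType: Range, lower: 120, upper: 140}))
}
```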

type ByCapacity

type ByCapacity []*SubData

func (ByCapacity) Len

func (a ByCapacity) Len() int

func (ByCapacity) Less

func (a ByCapacity) Less(i, j int) bool

func (ByCapacity) Swap

func (a ByCapacity) Swap(i, j int)
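Len, Less and Swap together satisfy sort.Interface, so a ByCapacity value can be passed to sort.Sort. The sketch below shows the pattern with a hypothetical subData type; sorting larger capacities first is an assumption of this example, not something the documentation states.

```go
package main

import (
	"fmt"
	"sort"
)

// subData is a hypothetical stand-in for the package's SubData.
type subData struct {
	addr     string
	capacity int
}

// byCapacity implements sort.Interface over a slice of subscribers.
type byCapacity []*subData

func (a byCapacity) Len() int      { return len(a) }
func (a byCapacity) Swap(i, j int) { a[i], a[j] = a[j], a[i] }

// Less orders larger capacities first (an assumption of this sketch).
func (a byCapacity) Less(i, j int) bool { return a[i].capacity > a[j].capacity }

// sortByCapacity sorts the slice in place using the interface above.
func sortByCapacity(subs []*subData) {
	sort.Sort(byCapacity(subs))
}

func main() {
	subs := []*subData{{"a:1", 10}, {"b:1", 50}, {"c:1", 30}}
	sortByCapacity(subs)
	for _, s := range subs {
		fmt.Println(s.addr, s.capacity)
	}
}
```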

type EventLedger

type EventLedger struct {
	// contains filtered or unexported fields
}

EventLedger keeps track of all acknowledgements received for a specific event

func NewEventLedger

func NewEventLedger(eID string, log map[string]bool, addr string, e *pb.Event, dest string) *EventLedger
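To illustrate the idea of an acknowledgement ledger, here is a simplified sketch that records which peers have acknowledged an event and reports when none are pending. The ackLedger type and its methods are hypothetical; the real EventLedger has unexported fields and may work differently.

```go
package main

import "fmt"

// ackLedger is a hypothetical, simplified ledger: one entry per peer that
// must acknowledge the event, set to true once its ack has arrived.
type ackLedger struct {
	eventID string
	acked   map[string]bool
	pending int
}

// newAckLedger registers every distinct peer as not yet acknowledged.
func newAckLedger(eventID string, peers []string) *ackLedger {
	l := &ackLedger{eventID: eventID, acked: make(map[string]bool, len(peers))}
	for _, p := range peers {
		if _, seen := l.acked[p]; !seen {
			l.acked[p] = false
			l.pending++
		}
	}
	return l
}

// ack records an acknowledgement and reports whether the ledger is complete.
// Unknown peers and repeated acks are ignored.
func (l *ackLedger) ack(peer string) bool {
	if done, known := l.acked[peer]; known && !done {
		l.acked[peer] = true
		l.pending--
	}
	return l.pending == 0
}

func main() {
	l := newAckLedger("event-1", []string{"peerA", "peerB"})
	fmt.Println(l.ack("peerA")) // false: peerB is still pending
	fmt.Println(l.ack("peerA")) // false: repeated ack changes nothing
	fmt.Println(l.ack("peerB")) // true: every peer has acknowledged
}
```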

type EventRecord

type EventRecord struct {
	// contains filtered or unexported fields
}

type FilterTable

type FilterTable struct {
	// contains filtered or unexported fields
}

FilterTable keeps the filter information of all peers by storing each peer's routeStats and redirect support

func NewFilterTable

func NewFilterTable(dht *dht.IpfsDHT, addrOption bool) *FilterTable

func (*FilterTable) PrintFilterTable

func (ft *FilterTable) PrintFilterTable()

type ForwardAdvert

type ForwardAdvert struct {
	// contains filtered or unexported fields
}

type ForwardEvent

type ForwardEvent struct {
	// contains filtered or unexported fields
}

type ForwardSubRequest

type ForwardSubRequest struct {
	// contains filtered or unexported fields
}

type HelperTracker

type HelperTracker struct {
	// contains filtered or unexported fields
}

type HistoryRecord

type HistoryRecord struct {
	// contains filtered or unexported fields
}

func NewHistoryRecord

func NewHistoryRecord() *HistoryRecord

func (*HistoryRecord) CorrectnessStats

func (r *HistoryRecord) CorrectnessStats(expected []string) (int, int)

CorrectnessStats returns the number of events missing and the number received more than once, by comparing against an array of the events that should have been received
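The comparison described above can be sketched as follows. This is a standalone illustration, not the package's implementation: the correctnessStats function and its received parameter are hypothetical, and counting a repeated event once regardless of how many extra copies arrived is an assumption.

```go
package main

import "fmt"

// correctnessStats compares the events actually received against the events
// that were expected and returns (missing, duplicated). An expected event
// received several times counts as one duplicate (an assumption).
func correctnessStats(received, expected []string) (missing, duplicated int) {
	counts := make(map[string]int, len(received))
	for _, e := range received {
		counts[e]++
	}
	for _, e := range expected {
		switch n := counts[e]; {
		case n == 0:
			missing++
		case n > 1:
			duplicated++
		}
	}
	return missing, duplicated
}

func main() {
	received := []string{"a", "b", "b"}
	expected := []string{"a", "b", "c"}
	missing, duplicated := correctnessStats(received, expected)
	fmt.Println(missing, duplicated) // 1 1
}
```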

func (*HistoryRecord) EventStats

func (r *HistoryRecord) EventStats() []int

EventStats returns the travel time of every event

func (*HistoryRecord) SaveReceivedEvent

func (r *HistoryRecord) SaveReceivedEvent(eScource string, eBirth string, eData string)

SaveReceivedEvent registers the time an event took to reach the subscriber

func (*HistoryRecord) SaveTimeToSub

func (r *HistoryRecord) SaveTimeToSub(start string)

SaveTimeToSub registers the time it took to confirm a subscription

func (*HistoryRecord) SubStats added in v0.13.75

func (r *HistoryRecord) SubStats() []int

SubStats returns the time to completion of every subscription and deletes the saved values

type MulticastGroup

type MulticastGroup struct {
	// contains filtered or unexported fields
}

MulticastGroup contains all the support metadata a premium publisher needs to function properly

func NewMulticastGroup

func NewMulticastGroup(p *Predicate, addr string, maxSubReg int, powerSubs int) *MulticastGroup

func (*MulticastGroup) AddSubToGroup

func (mg *MulticastGroup) AddSubToGroup(addr string, cap int, region string, pred *Predicate) error

AddSubToGroup adds a sub to the multicastGroup. It analyzes the current state of the group and decides whether the publisher should simply add the sub to its own infrastructure, recruit a sub as a helper and delegate this sub and others to that new helper, or delegate the sub to a node that is already helping and can still receive more. To use helpers properly, subs are ordered by geographical region to minimize latency between helper nodes and their delegated nodes
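The three possible outcomes can be pictured with the decision sketch below. It is only an illustration under stated assumptions: the groupState type, its fields, and the order in which the checks are made are hypothetical and are not taken from the package source.

```go
package main

import "fmt"

type decision string

const (
	addDirectly      decision = "add to publisher"
	delegateToHelper decision = "delegate to existing helper"
	recruitHelper    decision = "recruit new helper"
)

// groupState is a hypothetical summary of one region of a multicast group.
type groupState struct {
	directSubs    int   // subs served directly by the publisher in this region
	maxDirectSubs int   // limit before the publisher needs help
	helperFree    []int // remaining capacity of each existing helper
}

// decide picks one of the three outcomes: serve the sub directly while there
// is room, otherwise use a helper with spare capacity, otherwise recruit.
func decide(g groupState) decision {
	if g.directSubs < g.maxDirectSubs {
		return addDirectly
	}
	for _, free := range g.helperFree {
		if free > 0 {
			return delegateToHelper
		}
	}
	return recruitHelper
}

func main() {
	fmt.Println(decide(groupState{directSubs: 1, maxDirectSubs: 5}))
	fmt.Println(decide(groupState{directSubs: 5, maxDirectSubs: 5, helperFree: []int{0, 3}}))
	fmt.Println(decide(groupState{directSubs: 5, maxDirectSubs: 5, helperFree: []int{0}}))
}
```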

func (*MulticastGroup) AddSubToHelper

func (mg *MulticastGroup) AddSubToHelper(sub *SubData, addr string) error

AddSubToHelper delegates a sub to an existing helper of the group

func (*MulticastGroup) AddToRangeTrees

func (mg *MulticastGroup) AddToRangeTrees(sub *SubData)

AddToRangeTrees adds the sub to the multicast fetching structure so that, when an event is published, interested nodes can be retrieved efficiently

func (*MulticastGroup) AddrsToPublishEvent

func (mg *MulticastGroup) AddrsToPublishEvent(p *Predicate) []*SubData

AddrsToPublishEvent returns all the subs within the publisher's responsibility that are interested in a certain event

func (*MulticastGroup) RecruitHelper

func (mg *MulticastGroup) RecruitHelper(helper *SubData, subs []*SubData) error

RecruitHelper requests a helper to provide support to some subs of the group

func (*MulticastGroup) RemoveFromRangeTrees

func (mg *MulticastGroup) RemoveFromRangeTrees(sub *SubData)

RemoveFromRangeTrees removes subs from the multicast fetching-structure

func (*MulticastGroup) RemoveSubFromGroup

func (mg *MulticastGroup) RemoveSubFromGroup(sub *pb.PremiumSubscription) error

RemoveSubFromGroup removes a sub from the multicastGroup, whether it is delegated to a helper, is itself a helper, or is a node under the publisher's responsibility. When a helper is removed, the subs delegated to it must be taken over by the publisher, a new helper, or an existing one. When a sub delegated to a helper unsubscribes, the publisher removes it from its group data and informs the helper so that it also removes that sub

func (*MulticastGroup) RemoveSubFromList

func (mg *MulticastGroup) RemoveSubFromList(sub *SubData)

RemoveSubFromList removes subs from the fetching list

func (*MulticastGroup) StopDelegating

func (mg *MulticastGroup) StopDelegating(tracker *HelperTracker, add bool)

StopDelegating erases the data structure of an unsubscribed or failed helper

type Node

type Node struct {
	// contains filtered or unexported fields
}

func NewNode

func NewNode(upperLimit int, lowerLimit int) *Node

func (*Node) DeleteSubFromNode

func (n *Node) DeleteSubFromNode(upper int, lower int, sub *SubData)

DeleteSubFromNode is called recursively to delete a sub throughout the tree

func (*Node) GetSubsOfEvent

func (n *Node) GetSubsOfEvent(value int) []*SubData

GetSubsOfEvent returns all subs interested in a specific value by calling itself recursively

func (*Node) InsertSub

func (n *Node) InsertSub(upper int, lower int, sub *SubData)

InsertSub inserts a sub into every node where it belongs by recursively placing it throughout the tree
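One common way to build such a recursive structure is a segment tree over the attribute's value range. The sketch below follows that approach as an assumption: it is not the package's implementation, subscribers are reduced to plain strings, and the lowercase names are hypothetical. The argument order (upper, lower) mirrors the signatures above.

```go
package main

import "fmt"

// node covers the inclusive interval [lower, upper] of attribute values.
type node struct {
	lower, upper int
	subs         []string // subscribers whose range covers this whole interval
	left, right  *node
}

func newNode(upper, lower int) *node {
	return &node{lower: lower, upper: upper}
}

// insertSub stores sub at every maximal node fully covered by [lower, upper],
// creating child nodes lazily when the overlap is only partial.
func (n *node) insertSub(upper, lower int, sub string) {
	if lower > upper || lower > n.upper || upper < n.lower {
		return // invalid range or no overlap with this node
	}
	if lower <= n.lower && n.upper <= upper {
		n.subs = append(n.subs, sub)
		return
	}
	if n.left == nil {
		mid := n.lower + (n.upper-n.lower)/2
		n.left = newNode(mid, n.lower)
		n.right = newNode(n.upper, mid+1)
	}
	n.left.insertSub(upper, lower, sub)
	n.right.insertSub(upper, lower, sub)
}

// getSubsOfEvent collects the subscribers stored on the path from this node
// down to the leaf interval that contains value.
func (n *node) getSubsOfEvent(value int) []string {
	if n == nil || value < n.lower || value > n.upper {
		return nil
	}
	out := append([]string(nil), n.subs...)
	out = append(out, n.left.getSubsOfEvent(value)...)
	out = append(out, n.right.getSubsOfEvent(value)...)
	return out
}

func main() {
	root := newNode(15, 0)
	root.insertSub(12, 10, "alice")
	root.insertSub(15, 0, "bob")
	root.insertSub(11, 4, "carol")
	fmt.Println(root.getSubsOfEvent(11)) // all three subscribers cover 11
	fmt.Println(root.getSubsOfEvent(13)) // only bob covers 13
}
```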

type Predicate

type Predicate struct {
	// contains filtered or unexported fields
}

Predicate is an expression that categorizes an event or subscription, composed of one or more attributes

func NewPredicate

func NewPredicate(rawPredicate string, maxAttr int) (*Predicate, error)

NewPredicate creates a predicate. Example of a rawPredicate: "laptop T/RAM R 16 32/price R 0 1000"
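To make the raw predicate format concrete, here is a hedged parsing sketch. It assumes attributes are separated by "/", that a topic is written "name T", that a range is written "name R lower upper" with integer bounds, and that names contain no spaces. The attr type and parsePredicate function are hypothetical and do not reproduce the package's own parser or its error messages.

```go
package main

import (
	"fmt"
	"strconv"
	"strings"
)

// attr is a hypothetical parsed attribute.
type attr struct {
	name         string
	isRange      bool
	lower, upper int
}

// parsePredicate splits "name T/name R lower upper/..." into attributes and
// rejects predicates with more than maxAttr attributes.
func parsePredicate(raw string, maxAttr int) ([]attr, error) {
	parts := strings.Split(raw, "/")
	if len(parts) > maxAttr {
		return nil, fmt.Errorf("too many attributes: %d > %d", len(parts), maxAttr)
	}
	attrs := make([]attr, 0, len(parts))
	for _, part := range parts {
		f := strings.Fields(part)
		switch {
		case len(f) == 2 && f[1] == "T":
			attrs = append(attrs, attr{name: f[0]})
		case len(f) == 4 && f[1] == "R":
			lo, err1 := strconv.Atoi(f[2])
			hi, err2 := strconv.Atoi(f[3])
			if err1 != nil || err2 != nil || lo > hi {
				return nil, fmt.Errorf("invalid range in %q", part)
			}
			attrs = append(attrs, attr{name: f[0], isRange: true, lower: lo, upper: hi})
		default:
			return nil, fmt.Errorf("invalid attribute %q", part)
		}
	}
	return attrs, nil
}

func main() {
	attrs, err := parsePredicate("laptop T/RAM R 16 32/price R 0 1000", 5)
	if err != nil {
		fmt.Println("error:", err)
		return
	}
	for _, a := range attrs {
		fmt.Printf("%+v\n", a)
	}
}
```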

func (*Predicate) Equal

func (p *Predicate) Equal(pred *Predicate) bool

func (*Predicate) SimpleAdvMatch

func (p *Predicate) SimpleAdvMatch(pAdv *Predicate) bool

SimpleAdvMatch is used to check if a Premium Publisher advertisement is of the interest of a Premium Subscriber

func (*Predicate) SimplePredicateMatch

func (p *Predicate) SimplePredicateMatch(pEvent *Predicate) bool

SimplePredicateMatch evaluates whether an event predicate matches a sub predicate; it is also used to check whether one predicate encompasses another.

Special Note >> an event's range is treated as a single value, meaning that an event has equal lower and upper values for a range attribute (Ex: "price R 15 15")
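A possible reading of this matching rule is sketched below, using the subscription and event from the README. The assumptions are that every attribute of the subscription must appear in the event with the same kind, and that a range matches when the event's bounds lie inside the subscription's bounds. The predicate map and the matches function are hypothetical.

```go
package main

import "fmt"

// attr is a hypothetical attribute: a topic when isRange is false.
type attr struct {
	isRange      bool
	lower, upper int
}

// predicate maps attribute names to their definition.
type predicate map[string]attr

// matches reports whether event satisfies sub: each attribute of sub must be
// present in event, and range bounds of event must lie inside those of sub.
// With lower == upper in the event this is a single-value check.
func matches(sub, event predicate) bool {
	for name, want := range sub {
		got, ok := event[name]
		if !ok || got.isRange != want.isRange {
			return false
		}
		if want.isRange && (got.lower < want.lower || got.upper > want.upper) {
			return false
		}
	}
	return true
}

func main() {
	// "waves T/height R 1 2/period R 10 12"
	sub := predicate{"waves": {}, "height": {true, 1, 2}, "period": {true, 10, 12}}
	// "waves T/Ericeira T/height R 2 2/period R 12 12"
	event := predicate{"waves": {}, "Ericeira": {}, "height": {true, 2, 2}, "period": {true, 12, 12}}
	fmt.Println(matches(sub, event)) // true
}
```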

func (*Predicate) String

func (p *Predicate) String() string

func (*Predicate) ToString

func (p *Predicate) ToString() string

ToString returns the input representation of a predicate

func (*Predicate) TryMergePredicates

func (p *Predicate) TryMergePredicates(pOther *Predicate) (bool, *Predicate)

TryMergePredicates is used in FilterSummarizing to attempt merging two different predicates. If the result is false it means they are exclusive, otherwise it will return the merge of both predicates
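For intuition, the sketch below merges two integer ranges of a single attribute and reports false when they are exclusive, mirroring the (bool, result) return shape. Treating adjacent ranges such as [1,2] and [3,5] as mergeable is an assumption, and the interval type and tryMerge function are hypothetical; the package merges whole predicates, which this example does not attempt.

```go
package main

import "fmt"

// interval is an inclusive integer range [lower, upper].
type interval struct{ lower, upper int }

// tryMerge returns (true, union) when the two ranges overlap or are adjacent,
// and (false, zero value) when they are disjoint.
func tryMerge(a, b interval) (bool, interval) {
	if a.lower > b.lower {
		a, b = b, a // make a the range that starts first
	}
	if b.lower > a.upper+1 {
		return false, interval{}
	}
	if b.upper > a.upper {
		a.upper = b.upper
	}
	return true, a
}

func main() {
	fmt.Println(tryMerge(interval{0, 5}, interval{3, 9})) // true {0 9}
	fmt.Println(tryMerge(interval{0, 2}, interval{5, 9})) // false {0 0}
}
```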

type PubEventState

type PubEventState struct {
	// contains filtered or unexported fields
}

type PubSub

type PubSub struct {
	pb.UnimplementedScoutHubServer
	// contains filtered or unexported fields
}

PubSub supports all the middleware logic

func NewPubSub

func NewPubSub(dht *kaddht.IpfsDHT, cfg *SetupPubSub) *PubSub

func (*PubSub) AckOp

func (ps *PubSub) AckOp(ctx context.Context, ack *pb.Ack) (*pb.Ack, error)

AckOp receives confirmation of an operation and stops it from being resent

func (*PubSub) AckToTracker

func (ps *PubSub) AckToTracker(ctx context.Context, ack *pb.EventAck) (*pb.Ack, error)

AckToTracker is the remote call the Rv node uses to communicate received event acknowledgements to the tracker

func (*PubSub) AckUp

func (ps *PubSub) AckUp(ctx context.Context, ack *pb.EventAck) (*pb.Ack, error)

AckUp processes an event acknowledgement and, if it was the last missing ack, returns its own acknowledgement upstream

func (*PubSub) AdvertiseGroup

func (ps *PubSub) AdvertiseGroup(ctx context.Context, adv *pb.AdvertRequest) (*pb.Ack, error)

AdvertiseGroup is the remote call used to propagate an advertisement to the rendezvous

func (*PubSub) BackupRefresh

func (ps *PubSub) BackupRefresh(stream pb.ScoutHub_BackupRefreshServer) error

BackupRefresh refreshes the filter table the backup keeps of the peer

func (*PubSub) CreateMulticastGroup

func (ps *PubSub) CreateMulticastGroup(pred string) error

func (*PubSub) DelegateSubToHelper

func (ps *PubSub) DelegateSubToHelper(ctx context.Context, sub *pb.DelegateSub) (*pb.Ack, error)

DelegateSubToHelper is a remote call used by the premium publisher of a multicast group to delegate a sub to a sub that is already helping it

func (*PubSub) GroupSearchRequest

func (ps *PubSub) GroupSearchRequest(ctx context.Context, req *pb.SearchRequest) (*pb.SearchReply, error)

GroupSearchRequest is a piggybacked remote call that delivers to the myGroupSearchRequest caller all the multicastGroups in the receiving node's AdvertiseBoard that comply with the caller's search predicate

func (*PubSub) HelpNewRv added in v0.5.20

func (ps *PubSub) HelpNewRv(ctx context.Context, event *pb.Event) (*pb.Ack, error)

func (*PubSub) LogToTracker

func (ps *PubSub) LogToTracker(ctx context.Context, log *pb.EventLog) (*pb.Ack, error)

LogToTracker is the remote call a tracker receives from the Rv node with an event log for it to start tracking

func (*PubSub) MyPremiumPublish

func (ps *PubSub) MyPremiumPublish(grpPred string, event string, eventInfo string) error

MyPremiumPublish is the operation a premium publisher runs when it wants to publish in one of its MulticastGroups

func (*PubSub) MyPremiumSubscribe

func (ps *PubSub) MyPremiumSubscribe(info string, pubAddr string, pubPredicate string) error

MyPremiumSubscribe is the operation a subscriber performs in order to belong to a certain MulticastGroup of a certain premium publisher and predicate

func (*PubSub) MyPremiumUnsubscribe

func (ps *PubSub) MyPremiumUnsubscribe(pubPred string, pubAddr string) error

MyPremiumUnsubscribe is the operation a premium subscriber performs when it wants to leave a multicastGroup

func (*PubSub) MyPublish

func (ps *PubSub) MyPublish(data string, info string) error

MyPublish is used to publish an event on the overlay. data is the message to publish and info is the predicate that represents that event data. The publish operation is made towards the rendezvous of every attribute in order to find the way to all subscribers

func (*PubSub) MySearchAndPremiumSub

func (ps *PubSub) MySearchAndPremiumSub(pred string) error

MySearchAndPremiumSub asks the closest rendezvous of the wished group predicate for MulticastGroups of interest

func (*PubSub) MySubscribe

func (ps *PubSub) MySubscribe(info string) error

MySubscribe subscribes to certain event(s) and saves the subscription in myFilters, both for later resubscribing operations and to assess whether the node is interested in the events it receives

func (*PubSub) MyUnsubscribe

func (ps *PubSub) MyUnsubscribe(info string) error

MyUnsubscribe deletes a specific predicate from the mySubs list, which stops the refreshing of that sub and stops the delivery of the matching events to the user

func (*PubSub) Notify

func (ps *PubSub) Notify(ctx context.Context, event *pb.Event) (*pb.Ack, error)

Notify is a remote function called by an external peer to send an Event downstream

func (*PubSub) PremiumPublish

func (ps *PubSub) PremiumPublish(ctx context.Context, event *pb.PremiumEvent) (*pb.Ack, error)

PremiumPublish is the remote call used not only by the premium publisher to forward its events to the helpers and interested subs, but also by the helpers to forward them to their delegated subs

func (*PubSub) PremiumSubscribe

func (ps *PubSub) PremiumSubscribe(ctx context.Context, sub *pb.PremiumSubscription) (*pb.Ack, error)

PremiumSubscribe is the remote call used by MyPremiumSubscribe to hand the premium subscription to the premium publisher for processing

func (*PubSub) PremiumUnsubscribe

func (ps *PubSub) PremiumUnsubscribe(ctx context.Context, sub *pb.PremiumSubscription) (*pb.Ack, error)

PremiumUnsubscribe is the remote call used by the subscriber to tell the premium publisher that it wants to unsubscribe from a multicastGroup

func (*PubSub) Publish

func (ps *PubSub) Publish(ctx context.Context, event *pb.Event) (*pb.Ack, error)

Publish is a remote function called by an external peer to send an Event upstream

func (*PubSub) RequestHelp

func (ps *PubSub) RequestHelp(ctx context.Context, req *pb.HelpRequest) (*pb.Ack, error)

RequestHelp is the remote call the premium publisher of a MulticastGroup makes to one of its subs to recruit it as a helper

func (*PubSub) ReturnCorrectnessStats

func (ps *PubSub) ReturnCorrectnessStats(expected []string) (int, int)

ReturnCorrectnessStats returns the number of events missing and duplicated

func (*PubSub) ReturnEventStats

func (ps *PubSub) ReturnEventStats() []int

ReturnEventStats returns the time it took to receive each event

func (*PubSub) ReturnOpStats added in v0.13.75

func (ps *PubSub) ReturnOpStats(opName string) int

ReturnOpStats returns the number of times an operation was executed

func (*PubSub) ReturnSubStats

func (ps *PubSub) ReturnSubStats() []int

ReturnSubStats returns the time it took to receive confirmation of subscription completion

func (*PubSub) SetHasOldPeer

func (ps *PubSub) SetHasOldPeer()

SetHasOldPeer's only goal is to set the peer as old in a testing scenario

func (*PubSub) Subscribe

func (ps *PubSub) Subscribe(ctx context.Context, sub *pb.Subscription) (*pb.Ack, error)

Subscribe is a remote function called by an external peer to send subscriptions towards the rendezvous node

func (*PubSub) TerminateService

func (ps *PubSub) TerminateService()

TerminateService closes the PubSub service

func (*PubSub) TrackerRefresh

func (ps *PubSub) TrackerRefresh(ctx context.Context, req *pb.RecruitTrackerMessage) (*pb.Ack, error)

TrackerRefresh is an RPC that a new tracker sends to the rv neighbourhood in order to refresh itself with their event ledgers

func (*PubSub) UpdateBackup

func (ps *PubSub) UpdateBackup(ctx context.Context, update *pb.Update) (*pb.Ack, error)

UpdateBackup sends a new filter of the filter table to the backup

type RangeAttributeTree

type RangeAttributeTree struct {
	// contains filtered or unexported fields
}

RangeAttributeTree is a structure that organizes the subscribers by their interest in a specific value of an attribute, for faster and more efficient subscriber diffusion

func NewRangeAttributeTree

func NewRangeAttributeTree(attr *Attribute) *RangeAttributeTree

func (*RangeAttributeTree) AddSubToTree

func (rt *RangeAttributeTree) AddSubToTree(sub *SubData)

AddSubToTree adds a sub to the tree translating the attribute values for tree insertion

func (*RangeAttributeTree) AddSubToTreeRoot

func (rt *RangeAttributeTree) AddSubToTreeRoot(sub *SubData)

AddSubToTreeRoot adds a sub to a tree root

func (*RangeAttributeTree) DeleteSubFromTree

func (rt *RangeAttributeTree) DeleteSubFromTree(sub *SubData)

DeleteSubFromTree removes a sub from the tree

func (*RangeAttributeTree) GetInterestedSubs

func (rt *RangeAttributeTree) GetInterestedSubs(value int) []*SubData

GetInterestedSubs collects the subs that are interested in a specific value of an attribute

func (*RangeAttributeTree) RemoveSubFromTreeRoot

func (rt *RangeAttributeTree) RemoveSubFromTreeRoot(sub *SubData)

RemoveSubFromTreeRoot removes a sub from the tree root

type RegionData

type RegionData struct {
	// contains filtered or unexported fields
}

type RouteStats

type RouteStats struct {
	// contains filtered or unexported fields
}

RouteStats keeps the filters of each pubsub peer it is connected to, together with that peer's backups in case of its failure

func NewRouteStats

func NewRouteStats(addr string) *RouteStats

func (*RouteStats) IsInterested

func (rs *RouteStats) IsInterested(p *Predicate) bool

IsInterested checks if there are any filters compatible to a specific predicate inside a routeStat and returns true if there are

func (*RouteStats) SimpleAddSummarizedFilter

func (rs *RouteStats) SimpleAddSummarizedFilter(p *Predicate, backups []string) (bool, *Predicate)

SimpleAddSummarizedFilter is called upon receiving a subscription filter to decide whether it should be added as exclusive, merged with others, or whether it encompasses or is encompassed by others

func (*RouteStats) SimpleSubtractFilter

func (rs *RouteStats) SimpleSubtractFilter(p *Predicate)

SimpleSubtractFilter removes all the filters encompassed by the info string predicate, ignoring partial encompassing for now

type SetupPubSub

type SetupPubSub struct {

	// Maximum number of subscribers the publisher can have in a geographical region
	MaxSubsPerRegion int

	// Number of nodes of a region that are not delegated, to keep the most powerful nodes recruitable for the future
	PowerSubsPoolSize int

	// Time to wait before resending an operation that was not acknowledged
	OpResendRate time.Duration

	// Number of backups, localized tolerable faults
	FaultToleranceFactor int

	// How many parallel operations of each type can be supported
	ConcurrentProcessingFactor int

	// Maximum allowed number of attributes per predicate
	MaxAttributesPerPredicate int

	// Frequency in which a subscriber needs to resub
	SubRefreshRateMin time.Duration

	// Time the publisher waits for rv ack until it resends event
	TimeToCheckDelivery time.Duration

	// Geographic region of the peer
	Region string

	// Number of peers it may help in FastDelivery
	Capacity int

	// True to activate redirect mechanism
	RedirectMechanism bool

	// True to activate the tracking mechanism and operation acknowledgement
	ReliableMechanisms bool

	// Should be true if we are running with our testground testbed
	TestgroundReady bool

	// timeout of each rpc
	RPCTimeout time.Duration
}

func DefaultConfig

func DefaultConfig(region string, cap int) *SetupPubSub

type SubData

type SubData struct {
	// contains filtered or unexported fields
}

type SubGroupView

type SubGroupView struct {
	// contains filtered or unexported fields
}

SubGroupView is where a helper keeps track of its delegated peers

func (*SubGroupView) AddSub

func (sg *SubGroupView) AddSub(sub *pb.MinimalSubData) error

AddSub adds a sub to a helper's responsibility

func (*SubGroupView) AddToRangeTrees

func (sg *SubGroupView) AddToRangeTrees(sub *SubData)

AddToRangeTrees adds the sub to the helper's fetching-structure tree

func (*SubGroupView) AddrsToPublishEvent

func (sg *SubGroupView) AddrsToPublishEvent(p *Predicate) []*SubData

AddrsToPublishEvent fetches the subs interested in an event the helper has received

func (*SubGroupView) RemoveFromRangeTrees

func (sg *SubGroupView) RemoveFromRangeTrees(sub *SubData)

RemoveFromRangeTrees removes a sub from the helper's fetching-structure tree

func (*SubGroupView) RemoveSub

func (sg *SubGroupView) RemoveSub(sub *pb.PremiumSubscription) error

RemoveSub removes a sub from the helper's responsibility

func (*SubGroupView) RemoveSubFromList

func (sg *SubGroupView) RemoveSubFromList(sub *SubData)

RemoveSubFromList removes a sub from the helper's fetching-structure list

func (*SubGroupView) SetHasHelper

func (sg *SubGroupView) SetHasHelper(req *pb.HelpRequest) error

SetHasHelper turns a previously normal sub into a helper and adds subs to its responsibility

type SubState

type SubState struct {
	// contains filtered or unexported fields
}

type Tracker

type Tracker struct {
	// contains filtered or unexported fields
}

Tracker keeps "track" of the acknowledgement chain of a rendezvous's forwarded events, so that if not all confirmations are received after a certain time, it forwards a resend request for certain peers back to the rendezvous

func NewTracker

func NewTracker(leader bool, attr string, ps *PubSub, timeToCheckDelivery time.Duration) *Tracker
