bspgraph

package
v0.0.0-...-8b501b0
Published: Jul 6, 2023 License: MIT Imports: 6 Imported by: 0

Documentation

Constants

This section is empty.

Variables

var (
	// ErrUnknownEdgeSource is returned by AddEdge when the source vertex
	// is not present in the graph.
	ErrUnknownEdgeSource = xerrors.New("source vertex is not part of the graph")

	// ErrDestinationIsLocal is returned by Relayer instances to indicate
	// that a message destination is actually owned by the local graph.
	ErrDestinationIsLocal = xerrors.New("message destination is assigned to the local graph")

	// ErrInvalidMessageDestination is returned by calls to SendMessage and
	// BroadcastToNeighbors when the destination cannot be resolved to any
	// (local or remote) vertex.
	ErrInvalidMessageDestination = xerrors.New("invalid message destination")
)

Functions

This section is empty.

Types

type Aggregator

type Aggregator interface {
	// Type returns the type of this aggregator.
	Type() string

	// Set the aggregator to the specified value.
	Set(val interface{})

	// Get the current aggregator value.
	Get() interface{}

	// Aggregate updates the aggregator's value based on the provided value.
	Aggregate(val interface{})

	// Delta returns the change in the aggregator's value since the last
	// call to Delta. In distributed aggregator use-cases, the delta values
	// can be used to reduce local, partially-aggregated values into a
	// single value across nodes by feeding them into the Aggregate method
	// of a top-level aggregator.
	//
	// For example, in a distributed counter scenario, each node maintains
	// its own local counter instance. At the end of each step, the master
	// node calls Delta on each local counter and aggregates the values
	// to obtain the correct total, which is then pushed back to the workers.
	Delta() interface{}
}

Aggregator is implemented by types that provide concurrent-safe aggregation primitives (e.g. counters, min/max, topN).
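
As an illustration of the Delta-based reduction described in the interface comments, the following is a minimal sketch of a counter aggregator. IntCounter and reduce are hypothetical names that are not part of this package, and the Aggregator interface is redeclared locally so the example compiles on its own. Having Set also reset the delta baseline is an assumption made for this sketch.

```go
package main

import (
	"fmt"
	"sync"
)

// Aggregator mirrors the interface documented above so that this sketch is
// self-contained.
type Aggregator interface {
	Type() string
	Set(val interface{})
	Get() interface{}
	Aggregate(val interface{})
	Delta() interface{}
}

// IntCounter is a hypothetical, mutex-protected aggregator that sums ints.
type IntCounter struct {
	mu   sync.Mutex
	cur  int // current sum
	prev int // sum observed by the last call to Delta
}

var _ Aggregator = (*IntCounter)(nil)

func (c *IntCounter) Type() string { return "IntCounter" }

// Set overwrites the sum and (by assumption) resets the delta baseline.
func (c *IntCounter) Set(val interface{}) {
	c.mu.Lock()
	defer c.mu.Unlock()
	c.cur = val.(int)
	c.prev = c.cur
}

func (c *IntCounter) Get() interface{} {
	c.mu.Lock()
	defer c.mu.Unlock()
	return c.cur
}

// Aggregate adds val (an int) to the running sum.
func (c *IntCounter) Aggregate(val interface{}) {
	c.mu.Lock()
	defer c.mu.Unlock()
	c.cur += val.(int)
}

// Delta returns the change since the previous call to Delta.
func (c *IntCounter) Delta() interface{} {
	c.mu.Lock()
	defer c.mu.Unlock()
	d := c.cur - c.prev
	c.prev = c.cur
	return d
}

// reduce feeds the delta of every local aggregator into the global one and
// returns the new global value.
func reduce(global Aggregator, locals ...Aggregator) interface{} {
	for _, l := range locals {
		global.Aggregate(l.Delta())
	}
	return global.Get()
}

func main() {
	w1, w2 := &IntCounter{}, &IntCounter{}
	w1.Aggregate(3)
	w2.Aggregate(4)

	global := &IntCounter{}
	fmt.Println(reduce(global, w1, w2)) // 7

	w1.Aggregate(1)
	fmt.Println(reduce(global, w1, w2)) // 8
}
```

Because only deltas are fed to the top-level aggregator, a local value is never counted twice across steps.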

type ComputeFunc

type ComputeFunc func(g *Graph, v *Vertex, msgIt message.Iterator) error

ComputeFunc is a function that a graph instance invokes on each vertex when executing a superstep.

type Edge

type Edge struct {
	// contains filtered or unexported fields
}

Edge represents a directed edge in the Graph.

func (*Edge) DstID

func (e *Edge) DstID() string

DstID returns the vertex ID that corresponds to this edge's target endpoint.

func (*Edge) SetValue

func (e *Edge) SetValue(val interface{})

SetValue sets the value associated with this edge.

func (*Edge) Value

func (e *Edge) Value() interface{}

Value returns the value associated with this edge.

type Executor

type Executor struct {
	// contains filtered or unexported fields
}

Executor wraps a Graph instance and provides an orchestration layer for executing supersteps until an error occurs or an exit condition is met. Users can optionally provide callbacks to be executed before and after each superstep.

func NewExecutor

func NewExecutor(g *Graph, cb ExecutorCallbacks) *Executor

NewExecutor returns an Executor instance for graph g that invokes the provided callbacks on each iteration of the execution loop.

func (*Executor) Graph

func (ex *Executor) Graph() *Graph

Graph returns the graph instance associated with this executor.

func (*Executor) RunSteps

func (ex *Executor) RunSteps(ctx context.Context, numSteps int) error

RunSteps executes at most numSteps supersteps unless the context expires, an error occurs or the PostStepKeepRunning callback specified at configuration time returns false.

func (*Executor) RunToCompletion

func (ex *Executor) RunToCompletion(ctx context.Context) error

RunToCompletion keeps executing supersteps until the context expires, an error occurs or the PostStepKeepRunning callback specified at configuration time returns false.

func (*Executor) Superstep

func (ex *Executor) Superstep() int

Superstep returns the current graph superstep.

type ExecutorCallbacks

type ExecutorCallbacks struct {
	// PreStep, if defined, is invoked before running the next superstep.
	// This is a good place to initialize variables, aggregators etc. that
	// will be used for the next superstep.
	PreStep func(ctx context.Context, g *Graph) error

	// PostStep, if defined, is invoked after running a superstep.
	PostStep func(ctx context.Context, g *Graph, activeInStep int) error

	// PostStepKeepRunning, if defined, is invoked after running a superstep
	// to decide whether the stop condition for terminating the run has
	// been met. The number of vertices that were active in the last step
	// is passed as the activeInStep argument.
	PostStepKeepRunning func(ctx context.Context, g *Graph, activeInStep int) (bool, error)
}

ExecutorCallbacks encapsulates a series of callbacks that are invoked by an Executor instance on a graph. All callbacks are optional and will be ignored if not specified.

type ExecutorFactory

type ExecutorFactory func(*Graph, ExecutorCallbacks) *Executor

ExecutorFactory is a function that creates new Executor instances.

type Graph

type Graph struct {
	// contains filtered or unexported fields
}

Graph implements a parallel graph processor based on the concepts described in the Pregel paper.

func NewGraph

func NewGraph(cfg GraphConfig) (*Graph, error)

NewGraph creates a new Graph instance using the specified configuration. It is important for callers to invoke Close() on the returned graph instance when they are done using it.

func (*Graph) AddEdge

func (g *Graph) AddEdge(srcID, dstID string, initValue interface{}) error

AddEdge inserts a directed edge from srcID to dstID and annotates it with the specified initValue. By design, edges are owned by their source vertices (destinations can be either local or remote), so srcID must resolve to a local vertex. Otherwise, AddEdge returns ErrUnknownEdgeSource.

func (*Graph) AddVertex

func (g *Graph) AddVertex(id string, initValue interface{})

AddVertex inserts a new vertex with the specified id and initial value into the graph. If the vertex already exists, AddVertex will just overwrite its value with the provided initValue.

func (*Graph) Aggregator

func (g *Graph) Aggregator(name string) Aggregator

Aggregator returns the aggregator with the specified name or nil if the aggregator does not exist.

func (*Graph) Aggregators

func (g *Graph) Aggregators() map[string]Aggregator

Aggregators returns a map of all currently registered aggregators where the key is the aggregator's name.

func (*Graph) BroadcastToNeighbors

func (g *Graph) BroadcastToNeighbors(v *Vertex, msg message.Message) error

BroadcastToNeighbors is a helper function that broadcasts a single message to each neighbor of a particular vertex. Messages are queued for delivery and will be processed by recipients in the next superstep.

func (*Graph) Close

func (g *Graph) Close() error

Close releases any resources associated with the graph.

func (*Graph) RegisterAggregator

func (g *Graph) RegisterAggregator(name string, aggr Aggregator)

RegisterAggregator adds an aggregator with the specified name into the graph.

func (*Graph) RegisterRelayer

func (g *Graph) RegisterRelayer(relayer Relayer)

RegisterRelayer configures a Relayer that the graph will invoke when attempting to deliver a message to a vertex that is not known locally but could potentially be owned by a remote graph instance.

func (*Graph) Reset

func (g *Graph) Reset() error

Reset resets the state of the graph by removing any existing vertices and aggregators and resetting the superstep counter.

func (*Graph) SendMessage

func (g *Graph) SendMessage(dstID string, msg message.Message) error

SendMessage attempts to deliver a message to the vertex with the specified destination ID. Messages are queued for delivery and will be processed by recipients in the next superstep.

If the destination ID is not known by this graph, it might still be a valid ID for a vertex that is owned by a remote graph instance. If the client has provided a Relayer when configuring the graph, SendMessage will delegate message delivery to it.

On the other hand, if no Relayer is defined or the configured Relayer returns an ErrDestinationIsLocal error, SendMessage will first check whether an UnknownVertexHandler has been provided at configuration time and invoke it. Otherwise, an ErrInvalidMessageDestination is returned to the caller.

func (*Graph) Superstep

func (g *Graph) Superstep() int

Superstep returns the current superstep value.

func (*Graph) Vertices

func (g *Graph) Vertices() map[string]*Vertex

Vertices returns the graph vertices as a map where the key is the vertex ID.

type GraphConfig

type GraphConfig struct {
	// QueueFactory is used by the graph to create message queue instances
	// for each vertex that is added to the graph. If not specified, the
	// default in-memory queue will be used instead.
	QueueFactory message.QueueFactory

	// ComputeFn is the compute function that will be invoked for each graph
	// vertex when executing a superstep. A valid ComputeFunc instance is
	// required for the config to be valid.
	ComputeFn ComputeFunc

	// ComputeWorkers specifies the number of workers to use for invoking
	// the registered ComputeFunc when executing each superstep. If not
	// specified, a single worker will be used.
	ComputeWorkers int
}

GraphConfig encapsulates the configuration options for creating graphs.

type Relayer

type Relayer interface {
	// Relay a message to a vertex that is not known locally. Calls to
	// Relay must return ErrDestinationIsLocal if the provided dst value is
	// not a valid remote destination.
	Relay(dst string, msg message.Message) error
}

Relayer is implemented by types that can relay messages to vertices that are managed by a remote graph instance.

type RelayerFunc

type RelayerFunc func(string, message.Message) error

The RelayerFunc type is an adapter to allow the use of ordinary functions as Relayers. If f is a function with the appropriate signature, RelayerFunc(f) is a Relayer that calls f.

func (RelayerFunc) Relay

func (f RelayerFunc) Relay(dst string, msg message.Message) error

Relay calls f(dst, msg).
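
This is the same adapter pattern as http.HandlerFunc in the standard library. The sketch below redeclares the types locally so that it compiles on its own. Message is a stand-in for message.Message and newRecorder is a hypothetical helper.

```go
package main

import "fmt"

// Message is a stand-in for message.Message.
type Message interface{}

// Relayer and RelayerFunc mirror the declarations documented above.
type Relayer interface {
	Relay(dst string, msg Message) error
}

type RelayerFunc func(string, Message) error

// Relay calls f(dst, msg).
func (f RelayerFunc) Relay(dst string, msg Message) error { return f(dst, msg) }

// newRecorder returns a Relayer built from a plain closure, together with
// the log that the closure appends to.
func newRecorder() (Relayer, *[]string) {
	log := &[]string{}
	r := RelayerFunc(func(dst string, msg Message) error {
		*log = append(*log, fmt.Sprintf("%s<-%v", dst, msg))
		return nil
	})
	return r, log
}

func main() {
	r, log := newRecorder()
	_ = r.Relay("v42", "ping")
	fmt.Println(*log) // [v42<-ping]
}
```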

type Vertex

type Vertex struct {
	// contains filtered or unexported fields
}

Vertex represents a vertex in the Graph.

func (*Vertex) Edges

func (v *Vertex) Edges() []*Edge

Edges returns the list of outgoing edges from this vertex.

func (*Vertex) Freeze

func (v *Vertex) Freeze()

Freeze marks the vertex as inactive. Inactive vertices will not be processed in subsequent supersteps unless they receive a message, in which case they will be re-activated.
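
The freeze and re-activation rule can be illustrated with a toy superstep loop. The vertex type, superstep function and runDemo helper are simplified stand-ins written for this sketch. Messages are plain ints and destinations are slice indices.

```go
package main

import "fmt"

type vertex struct {
	id     string
	active bool
	inbox  []int // messages delivered for the current superstep
}

func (v *vertex) freeze() { v.active = false }

// superstep runs compute on every vertex that is active or has pending
// messages and returns how many vertices were processed. Messages sent
// during the step only become visible in the following superstep.
func superstep(vs []*vertex, compute func(v *vertex, send func(dst, msg int))) int {
	next := make([][]int, len(vs))
	send := func(dst, msg int) { next[dst] = append(next[dst], msg) }

	processed := 0
	for _, v := range vs {
		if len(v.inbox) > 0 {
			v.active = true // an incoming message re-activates the vertex
		}
		if !v.active {
			continue
		}
		compute(v, send)
		processed++
	}
	for i, v := range vs {
		v.inbox = next[i]
	}
	return processed
}

// runDemo runs three supersteps on a two-vertex graph and returns the
// number of vertices processed in each one.
func runDemo() []int {
	vs := []*vertex{{id: "a", active: true}, {id: "b", active: true}}
	step := 0
	compute := func(v *vertex, send func(dst, msg int)) {
		if step == 0 && v.id == "a" {
			send(1, 42) // "a" messages "b" during the first superstep
		}
		v.freeze() // every vertex votes to halt after computing
	}

	var counts []int
	for ; step < 3; step++ {
		counts = append(counts, superstep(vs, compute))
	}
	return counts
}

func main() {
	fmt.Println(runDemo()) // [2 1 0]
}
```

In the second superstep only "b" runs, because the message from "a" woke it up. In the third, every vertex is frozen and no messages are pending, so nothing is processed.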

func (*Vertex) ID

func (v *Vertex) ID() string

ID returns the vertex ID.

func (*Vertex) SetValue

func (v *Vertex) SetValue(val interface{})

SetValue sets the value associated with this vertex.

func (*Vertex) Value

func (v *Vertex) Value() interface{}

Value returns the value associated with this vertex.
