neo4j

package module
v1.0.0 Latest
Warning

This package is not in the latest version of its module.

Published: Jul 18, 2023 License: Apache-2.0 Imports: 22 Imported by: 0

README


Neo4J Storage Driver for Layered Schemas

This Go package contains the Neo4J graph database driver used to store graphs generated by layered schemas. It also includes a command-line tool that can be paired with the layers command-line tool to ingest data and store it in a Neo4J database.

Graph Mapping

Data ingested using layered schemas is represented as a labeled property graph. There are some minor differences between the Neo4J graph representation and the more generic labeled property graph used by layered schemas, so a mapping is necessary. This section describes how that mapping is done.

  • Every node of a data graph is stored as a Neo4J node.

    • The data graph node ID is stored as the string property neo4j_id.
    • All other node properties of the data graph are stored as Neo4J node properties, with string or []string values.
    • The types associated with a data graph node are stored as Neo4J node labels.
  • Every edge of a data graph is stored as a Neo4J edge.

    • The data graph edge label is stored as the Neo4J edge label.
    • All edge properties of the data graph are stored as Neo4J edge properties, with string or []string values.

As an example:

Data Graph:
+----------------------------+                                  +---------------------------+
|  @id = node_1              |                                  |  @id = node_2             |
+----------------------------+                                  +---------------------------+
|  @type = [ type1, type2 ]  |-------- edgeLabel      --------->|  @type = [ type3, type4 ] |
|  property = value          |         edgeProperty = value     |  property2 = value2       |
+----------------------------+                                  +---------------------------+


Neo4J:
+---------------------+                                      +---------------------------+
|  :type1 :type2      |                                      |     :type3 :type4         |
+---------------------+                                      +---------------------------+
|  neo4j_id = node_1  |----------- :edgeLabel -------------->|   neo4j_id = node_2       |
|                     |             edgeProperty = value     |   property2 = value2      |
|  property = value   |                                      +---------------------------+
+---------------------+

Command Line Tool

To build the command line tool, use the Go build system:

cd lsaneo
go mod tidy
go build

That should build the lsaneo binary for your platform.

You can pair lsaneo with the layers tool to ingest data and store graphs:

layers ingest csv --schema myschema.json inputdata.csv | lsaneo create --user userName --pwd password --uri dbUri

You can also store saved graph files (in JSON-LD flattened format):

lsaneo create --user userName --pwd password --uri dbUri <fileName>

Documentation

Overview

Package neo4j is the graph storage driver for the Neo4J database.

The input graph node IDs are stored in the `neo4j_id` property. All other properties and labels are stored verbatim.


Constants

const (
	PropNodeID = "neo4j_id"
)

Variables

var DEFAULT_BATCH_SIZE = 1000

Functions

func AddLinksToThisEntity

func AddLinksToThisEntity(ctx *ls.Context, tx neo4j.ExplicitTransaction, session *Session, config Config, entityRoot *lpg.Node, nodeMap map[*lpg.Node]string) error

ID is the entity ID of the entity root.

func BuildNodePropertiesAfterLoad

func BuildNodePropertiesAfterLoad(node *lpg.Node, input map[string]interface{}, cfg Config)

BuildNodePropertiesAfterLoad is called while loading nodes from the database. It sets all node properties to PropertyValues, excluding properties that are assigned to NodeValueTerm.

func Close

func Close(closer CloseWithContext, ctx context.Context, resourceName string)

func Compact

func Compact(g *lpg.Graph) (*lpg.Graph, error)

Compact compacts a graph for storage in the database. It removes all nodes that do not have an ID and collapses all value nodes into properties.
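What Compact describes can be illustrated on a toy graph model. The node and edge types, and the assumption that a value node is a leaf carrying a single Value reached through a labeled edge, are hypothetical; the real function operates on *lpg.Graph and its exact rules may differ.

```go
package main

import "fmt"

// node is a hypothetical, simplified graph node.
type node struct {
	ID      string
	IsValue bool   // true for value nodes (assumption)
	Value   string // payload of a value node
	Props   map[string]string
}

// edge is a hypothetical directed, labeled edge.
type edge struct {
	From, To *node
	Label    string
}

// compact folds each value node into its parent as a property named after
// the connecting edge label, then drops value nodes, nodes without an ID,
// and any edge that touches a dropped node.
func compact(nodes []*node, edges []*edge) ([]*node, []*edge) {
	var remaining []*edge
	for _, e := range edges {
		if e.To.IsValue {
			if e.From.Props == nil {
				e.From.Props = map[string]string{}
			}
			e.From.Props[e.Label] = e.To.Value
			continue // the edge disappears with the value node
		}
		remaining = append(remaining, e)
	}
	keep := map[*node]bool{}
	var keptNodes []*node
	for _, n := range nodes {
		if n.ID != "" && !n.IsValue {
			keep[n] = true
			keptNodes = append(keptNodes, n)
		}
	}
	var keptEdges []*edge
	for _, e := range remaining {
		if keep[e.From] && keep[e.To] {
			keptEdges = append(keptEdges, e)
		}
	}
	return keptNodes, keptEdges
}

func main() {
	person := &node{ID: "p1"}
	name := &node{IsValue: true, Value: "Alice"}
	blank := &node{} // no ID: removed
	nodes, edges := compact(
		[]*node{person, name, blank},
		[]*edge{{From: person, To: name, Label: "name"}, {From: person, To: blank, Label: "ref"}},
	)
	fmt.Println(len(nodes), len(edges), nodes[0].Props)
}
```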

func CreateEdgesBatch

func CreateEdgesBatch(ctx *ls.Context, tx neo4j.ExplicitTransaction, session *Session, createEdges []*lpg.Edge, nodeMap map[*lpg.Node]string, cfg Config, batchSize int) error

func CreateEdgesUnwind

func CreateEdgesUnwind(ctx *ls.Context, session *Session, edges []*lpg.Edge, dbGraph *DBGraph, cfg Config) func(neo4j.ExplicitTransaction) error

func CreateNodesBatch

func CreateNodesBatch(ctx *ls.Context, tx neo4j.ExplicitTransaction, createNodes []*lpg.Node, nodeMap map[*lpg.Node]string, cfg Config, batchSize int) error

func CreateNodesUnwind

func CreateNodesUnwind(ctx *ls.Context, nodes []*lpg.Node, dbGraph *DBGraph, cfg Config) func(neo4j.ExplicitTransaction) error
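CreateNodesUnwind and CreateEdgesUnwind return transaction functions, and their names suggest they use Cypher's UNWIND clause to create many items in a single query. The sketch below shows that general technique for nodes; the row type, the grouping helper and the query text are assumptions, not the package's code. Because Cypher does not accept labels as parameters, rows are grouped by label set and one query is built per group, while the property maps travel as the $rows parameter. `SET n = row` replaces the new node's properties with the row map.

```go
package main

import (
	"fmt"
	"sort"
	"strings"
)

// row is one node to create: its labels and its property map.
type row struct {
	Labels []string
	Props  map[string]any
}

// quoteLabel wraps a label in backticks, doubling any embedded backtick,
// so arbitrary label text can be spliced into Cypher safely.
func quoteLabel(l string) string {
	return "`" + strings.ReplaceAll(l, "`", "``") + "`"
}

// unwindQueries groups rows by sorted label set and maps each UNWIND query
// to the property maps that should be passed as its $rows parameter.
func unwindQueries(rows []row) map[string][]map[string]any {
	groups := map[string][]map[string]any{}
	for _, r := range rows {
		labels := append([]string(nil), r.Labels...)
		sort.Strings(labels)
		var sb strings.Builder
		for _, l := range labels {
			sb.WriteString(":" + quoteLabel(l))
		}
		q := "UNWIND $rows AS row CREATE (n" + sb.String() + ") SET n = row"
		groups[q] = append(groups[q], r.Props)
	}
	return groups
}

func main() {
	rows := []row{
		{Labels: []string{"type1", "type2"}, Props: map[string]any{"neo4j_id": "node_1", "property": "value"}},
		{Labels: []string{"type3", "type4"}, Props: map[string]any{"neo4j_id": "node_2", "property2": "value2"}},
		{Labels: []string{"type2", "type1"}, Props: map[string]any{"neo4j_id": "node_3"}},
	}
	queries := unwindQueries(rows)
	keys := make([]string, 0, len(queries))
	for q := range queries {
		keys = append(keys, q)
	}
	sort.Strings(keys)
	for _, q := range keys {
		// With the official driver, q and map[string]any{"rows": queries[q]}
		// would be passed to tx.Run.
		fmt.Println(q, len(queries[q]))
	}
}
```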

func Execute

func Execute(ctx *ls.Context, tx neo4j.ExplicitTransaction, cfg Config, oldNodeset, newNodeset Nodeset, rootOp rootOpType, inserts, updates, deletes []NodesetData) error

func Insert

func Insert(ctx *ls.Context, session *Session, tx neo4j.ExplicitTransaction, grph *lpg.Graph, selectEntity func(*lpg.Node) bool, config Config, batch int) ([]string, error)

Insert creates a graph in the database or adds to an existing one. It does not check for existing nodes.

func LinkEntitiesByForeignKeys

func LinkEntitiesByForeignKeys(ctx *ls.Context, tx neo4j.ExplicitTransaction, session *Session, config Config, entityRoot *lpg.Node, nodeMap map[*lpg.Node]string) error

func LinkMergedEntities

func LinkMergedEntities(ctx *ls.Context, tx neo4j.ExplicitTransaction, cfg Config, delta []Delta, nodeMap map[*lpg.Node]string) error

LinkMergedEntities finds the new entities in the delta and links them.

func LinkNodesForNewEntity

func LinkNodesForNewEntity(ctx *ls.Context, tx neo4j.ExplicitTransaction, session *Session, config Config, entityRoot *lpg.Node, nodeMap map[*lpg.Node]string) error

func Neo4jValueToNativeValue

func Neo4jValueToNativeValue(val interface{}) interface{}

Neo4jValueToNativeValue converts a Neo4j value to a native Go value.

func NodesetDiff

func NodesetDiff(oldNodeset, newNodeset Nodeset) (rootOp rootOpType, insertions []NodesetData, deletions []NodesetData, updates []NodesetData)

oldNodeset is the nodeset pulled from the database.
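NodesetDiff compares the nodeset loaded from the database with a new one. A minimal sketch of such a comparison over two maps of NodesetData, keyed by ID, is below. NodesetData is copied from the type documented in this package; the rootOp result is omitted, and treating any change to labels or properties as an update is an assumption.

```go
package main

import (
	"fmt"
	"reflect"
	"sort"
)

// NodesetData mirrors the struct of the same name documented in this package.
type NodesetData struct {
	ID         string
	Labels     []string
	Properties map[string]interface{}
}

// diff classifies entries that exist only in newData as insertions, entries
// whose labels or properties changed as updates, and entries that exist only
// in oldData as deletions. Note that reflect.DeepEqual treats nil and empty
// slices or maps as different. Results are sorted by ID.
func diff(oldData, newData map[string]NodesetData) (inserts, updates, deletes []NodesetData) {
	for id, n := range newData {
		o, ok := oldData[id]
		switch {
		case !ok:
			inserts = append(inserts, n)
		case !reflect.DeepEqual(o.Labels, n.Labels) || !reflect.DeepEqual(o.Properties, n.Properties):
			updates = append(updates, n)
		}
	}
	for id, o := range oldData {
		if _, ok := newData[id]; !ok {
			deletes = append(deletes, o)
		}
	}
	for _, s := range [][]NodesetData{inserts, updates, deletes} {
		sort.Slice(s, func(i, j int) bool { return s[i].ID < s[j].ID })
	}
	return inserts, updates, deletes
}

func main() {
	old := map[string]NodesetData{
		"a": {ID: "a", Properties: map[string]interface{}{"v": "1"}},
		"b": {ID: "b"},
	}
	cur := map[string]NodesetData{
		"a": {ID: "a", Properties: map[string]interface{}{"v": "2"}},
		"c": {ID: "c"},
	}
	ins, upd, del := diff(old, cur)
	fmt.Println(len(ins), len(upd), len(del))
}
```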

func ParseNodesetData

func ParseNodesetData(cfg Config, input NodesetInput) (map[string]Nodeset, error)

func SetNodeValueAfterLoad

func SetNodeValueAfterLoad(cfg Config, node *lpg.Node, input map[string]interface{}) interface{}

SetNodeValueAfterLoad is called before SetNodeValue; input contains Neo4j native values.

func WriteableType

func WriteableType(value any) bool

WriteableType reports whether value is writeable.

Types

type CloseWithContext

type CloseWithContext interface {
	Close(context.Context) error
}

type Config

type Config struct {
	TermMappings       map[string]string            `json:"termMappings" yaml:"termMappings"`
	NamespaceMappings  map[string]string            `json:"namespaceMappings" yaml:"namespaceMappings"`
	PropertyTypes      map[string]string            `json:"propertyTypes" yaml:"propertyTypes"`
	EntityMergeActions map[string]EntityMergeAction `json:"entityMergeActions" yaml:"entityMergeActions"`
	// contains filtered or unexported fields
}

func (*Config) Expand

func (cfg *Config) Expand(short string) string

func (Config) GetNeo4jPropertyValue

func (cfg Config) GetNeo4jPropertyValue(expandedPropertyKey string, val string) (interface{}, error)

GetNeo4jPropertyValue is called while building properties for saving, when the expanded property key exists in the config.

func (Config) IsMergeEntity

func (cfg Config) IsMergeEntity(name string) bool

IsMergeEntity reports whether the entity has a merge config.

func (Config) MakeLabels

func (cfg Config) MakeLabels(types []string) string

func (Config) MakeProperties

func (cfg Config) MakeProperties(x withProperty, txVars map[string]any) string

func (Config) MakePropertiesObj

func (cfg Config) MakePropertiesObj(x withProperty) map[string]any

func (Config) Shorten

func (cfg Config) Shorten(fullName string) string

func (Config) ShortenProperties

func (cfg Config) ShortenProperties(props map[string]any) map[string]any
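Expand and Shorten presumably translate between short and full names using the configured NamespaceMappings (the package also exposes InitNamespaceTrie, which takes a *Config). The sketch below shows the idea with a plain map from a short prefix to a namespace IRI. The "prefix:local" form, the direction of the map and the example prefixes are all assumptions, and the package's exact rules may differ.

```go
package main

import (
	"fmt"
	"strings"
)

// namespaces maps a hypothetical short prefix to a namespace IRI.
var namespaces = map[string]string{
	"ex":  "https://example.org/",
	"exv": "https://example.org/vocab/",
}

// expand turns "prefix:local" into a full IRI when the prefix is known,
// and returns the input unchanged otherwise.
func expand(short string) string {
	prefix, local, ok := strings.Cut(short, ":")
	if !ok {
		return short
	}
	if ns, found := namespaces[prefix]; found {
		return ns + local
	}
	return short
}

// shorten replaces the longest matching namespace with its prefix.
func shorten(full string) string {
	bestPrefix, bestNS := "", ""
	for prefix, ns := range namespaces {
		if strings.HasPrefix(full, ns) && len(ns) > len(bestNS) {
			bestPrefix, bestNS = prefix, ns
		}
	}
	if bestNS == "" {
		return full
	}
	return bestPrefix + ":" + strings.TrimPrefix(full, bestNS)
}

func main() {
	fmt.Println(shorten("https://example.org/vocab/name"))
	fmt.Println(expand("ex:Person"))
}
```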

type CreateEdgeDelta

type CreateEdgeDelta struct {
	DBEdge *lpg.Edge
	// contains filtered or unexported fields
}

func (CreateEdgeDelta) Run

func (ce CreateEdgeDelta) Run(ctx *ls.Context, tx neo4j.ExplicitTransaction, session *Session, dbNodeIds map[*lpg.Node]string, dbEdgeIds map[*lpg.Edge]string, c Config) error

func (CreateEdgeDelta) WriteQuery

func (ce CreateEdgeDelta) WriteQuery(session *Session, dbNodeIds map[*lpg.Node]string, dbEdgeIds map[*lpg.Edge]string, c Config) DeltaQuery

type CreateEntity

type CreateEntity struct {
	Config
	*lpg.Graph
	*lpg.Node
}

func (*CreateEntity) Queue

CreateEntity.Queue finds all nodes connected to the given entity and creates them, stopping at the boundaries of other entities.

type CreateNodeDelta

type CreateNodeDelta struct {
	DBNode  *lpg.Node
	MemNode *lpg.Node
}

func (CreateNodeDelta) Run

func (cn CreateNodeDelta) Run(ctx *ls.Context, tx neo4j.ExplicitTransaction, session *Session, dbNodeIds map[*lpg.Node]string, dbEdgeIds map[*lpg.Edge]string, c Config) error

func (CreateNodeDelta) WriteQuery

func (cn CreateNodeDelta) WriteQuery(session *Session, dbNodeIds map[*lpg.Node]string, dbEdgeIds map[*lpg.Edge]string, c Config) DeltaQuery

type DBGraph

type DBGraph struct {
	G *lpg.Graph

	NodeIds map[*lpg.Node]string
	Nodes   map[string]*lpg.Node
	EdgeIds map[*lpg.Edge]string
	Edges   map[string]*lpg.Edge
}

func NewDBGraph

func NewDBGraph(g *lpg.Graph) *DBGraph

func (*DBGraph) NewEdge

func (dbg *DBGraph) NewEdge(edge *lpg.Edge, dbID string)

func (*DBGraph) NewNode

func (dbg *DBGraph) NewNode(node *lpg.Node, dbID string)

type DeleteEntity

type DeleteEntity struct {
	Config
	*lpg.Graph
	// contains filtered or unexported fields
}

func (*DeleteEntity) Queue

func (d *DeleteEntity) Queue(ctx *ls.Context, tx neo4j.ExplicitTransaction, session *Session, q *JobQueue, selectEntity func(*lpg.Node) bool) error

DeleteEntity.Queue finds all nodes in the database connected to the given entity and deletes them.

type Delta

type Delta interface {
	WriteQuery(session *Session, dbNodeIds map[*lpg.Node]string, dbEdgeIds map[*lpg.Edge]string, c Config) DeltaQuery
	Run(ctx *ls.Context, tx neo4j.ExplicitTransaction, session *Session, dbNodeIds map[*lpg.Node]string, dbEdgeIds map[*lpg.Edge]string, c Config) error
}

func Merge

func Merge(memGraph *lpg.Graph, dbGraph *DBGraph, config Config) ([]Delta, error)

func SelectDelta

func SelectDelta(in []Delta, flt func(Delta) bool) []Delta

type DeltaQuery

type DeltaQuery struct {
	Query string
	Vars  map[string]interface{}
}

func (DeltaQuery) String

func (d DeltaQuery) String() string

type Driver

type Driver struct {

	// ID(objectName)=id.
	IDEqValueFunc func(objectName, id string) string
	// ID(objectName)=varname.
	IDEqVarFunc func(objectName, varname string) string
	// ID(objectName)
	IDFunc func(objectName string) string
	// IDValue returns the actual ID value as a string or int64
	IDValue func(string) interface{}
	// contains filtered or unexported fields
}
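The exported function fields of Driver abstract how database IDs are written into Cypher, so the same query-building code can target different ID schemes. Neo4j 5 added the elementId() function, which returns a string, alongside the older id() function, which returns an integer. The sketch below fills equivalent fields for both schemes; the idDriver type and both constructors are hypothetical, and which scheme NewDriver selects is not stated on this page.

```go
package main

import (
	"fmt"
	"strconv"
)

// idDriver copies the shape of Driver's exported ID helpers.
type idDriver struct {
	IDEqValueFunc func(objectName, id string) string
	IDEqVarFunc   func(objectName, varname string) string
	IDFunc        func(objectName string) string
	IDValue       func(string) interface{}
}

// legacyIDs uses id(), whose values are int64.
func legacyIDs() idDriver {
	return idDriver{
		IDEqValueFunc: func(o, id string) string { return fmt.Sprintf("id(%s)=%s", o, id) },
		IDEqVarFunc:   func(o, v string) string { return fmt.Sprintf("id(%s)=%s", o, v) },
		IDFunc:        func(o string) string { return fmt.Sprintf("id(%s)", o) },
		IDValue: func(s string) interface{} {
			n, err := strconv.ParseInt(s, 10, 64)
			if err != nil {
				return s // leave unparsable IDs untouched
			}
			return n
		},
	}
}

// elementIDs uses elementId(), whose values are strings. Go's %q quoting is
// assumed adequate for typical element ID text.
func elementIDs() idDriver {
	return idDriver{
		IDEqValueFunc: func(o, id string) string { return fmt.Sprintf("elementId(%s)=%q", o, id) },
		IDEqVarFunc:   func(o, v string) string { return fmt.Sprintf("elementId(%s)=%s", o, v) },
		IDFunc:        func(o string) string { return fmt.Sprintf("elementId(%s)", o) },
		IDValue:       func(s string) interface{} { return s },
	}
}

func main() {
	for _, d := range []idDriver{legacyIDs(), elementIDs()} {
		fmt.Println("MATCH (n) WHERE " + d.IDEqVarFunc("n", "$nid") + " RETURN " + d.IDFunc("n"))
	}
}
```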

func NewDriver

func NewDriver(driver neo4j.DriverWithContext, databaseName string) *Driver

func (*Driver) Close

func (d *Driver) Close(ctx context.Context)

func (*Driver) NewSession

func (d *Driver) NewSession(ctx context.Context) *Session

type EntityMergeAction

type EntityMergeAction struct {
	Merge  *bool `json:"merge" yaml:"merge"`
	Create *bool `json:"create" yaml:"create"`
}

func (EntityMergeAction) GetCreate

func (e EntityMergeAction) GetCreate() bool

func (EntityMergeAction) GetMerge

func (e EntityMergeAction) GetMerge() bool

type ErrMultipleFound

type ErrMultipleFound string

func (ErrMultipleFound) Error

func (e ErrMultipleFound) Error() string

type JobQueue

type JobQueue struct {
	// contains filtered or unexported fields
}

func (*JobQueue) Run

func (q *JobQueue) Run(ctx *ls.Context, tx neo4j.ExplicitTransaction, session *Session, cfg Config, nodeMap map[*lpg.Node]string, batchSize int) error

type Neo4jCache

type Neo4jCache struct {
	// contains filtered or unexported fields
}

type Nodeset

type Nodeset struct {
	ID         string
	Labels     []string
	Properties map[string]interface{}
	// Data[x] gives NodesetData with Nodeset.ID = x
	Data map[string]NodesetData
}

func LoadNodeset

func LoadNodeset(ctx context.Context, cfg Config, tx neo4j.ExplicitTransaction, nodesetId string) (Nodeset, error)

type NodesetData

type NodesetData struct {
	ID         string
	Labels     []string
	Properties map[string]interface{}
}

type NodesetInput

type NodesetInput interface {
	ColumnNames() []string
	Next() ([]string, error)
	Reset() error
}

type Session

type Session struct {
	neo4j.SessionWithContext
	*Driver
}

func (*Session) Close

func (s *Session) Close(ctx context.Context)

func (*Session) CollectEntityDBIds

func (s *Session) CollectEntityDBIds(ctx *ls.Context, tx neo4j.ExplicitTransaction, config Config, grph *lpg.Graph, cache *Neo4jCache) (entityRootNodes []*lpg.Node, entityRootDBIds []string, entityInfo map[*lpg.Node]ls.EntityInfo, err error)

func (*Session) LoadDBGraph

func (s *Session) LoadDBGraph(ctx *ls.Context, tx neo4j.ExplicitTransaction, memGraph *lpg.Graph, config Config, cache *Neo4jCache) (*DBGraph, error)

LoadDBGraph loads a graph from the database. Entity root nodes are matched based on the entityMergeActions in the config. If the config defines an entityMergeAction for an entity, this function looks up a node that has that entity name as a label. Otherwise, it looks for an exact label match.

For instance, if an entityMergeAction is defined for X, a memory node :X:Y is looked up as :X. If X has no entityMergeAction, a memory node :X:Y is looked up as :X:Y.

func (*Session) LoadEntityNodes

func (s *Session) LoadEntityNodes(ctx *ls.Context, tx neo4j.ExplicitTransaction, grph *lpg.Graph, rootIds []string, config Config, selectEntity func(*lpg.Node) bool) error

func (*Session) LoadEntityNodesByEntityId

func (s *Session) LoadEntityNodesByEntityId(ctx *ls.Context, tx neo4j.ExplicitTransaction, grph *lpg.Graph, rootIds []string, config Config, selectEntity func(*lpg.Node) bool) error

func (*Session) Logf

func (s *Session) Logf(format string, a ...interface{})

type Trie

type Trie struct {
	// contains filtered or unexported fields
}

Trie represents a trie and holds a pointer to its root node.

func InitNamespaceTrie

func InitNamespaceTrie(cfg *Config) *Trie

func InitTrie

func InitTrie() *Trie

InitTrie creates a new Trie.

func (*Trie) Insert

func (t *Trie) Insert(word, mapping string)

Insert adds a word, with its mapping, to the trie.

func (*Trie) Search

func (t *Trie) Search(word string) (string, string, bool)

Search checks whether a word is in the trie.

type TrieNode

type TrieNode struct {
	// contains filtered or unexported fields
}

TrieNode represents a node in the trie.
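A namespace trie lets the longest registered prefix of a name be found in a single pass over its bytes. The sketch below is a generic longest-prefix trie with insert(word, mapping) and search(word) shaped like the Trie methods above. Interpreting Search's three results as (mapping, matched prefix, found) is an assumption, as is the longest-prefix rule; the lowercase types are hypothetical.

```go
package main

import "fmt"

// trieNode is one byte position in the trie.
type trieNode struct {
	children map[byte]*trieNode
	mapping  string
	terminal bool // true if an inserted word ends here
}

// trie holds a pointer to the root node.
type trie struct{ root *trieNode }

func newTrie() *trie {
	return &trie{root: &trieNode{children: map[byte]*trieNode{}}}
}

// insert stores word together with its mapping.
func (t *trie) insert(word, mapping string) {
	n := t.root
	for i := 0; i < len(word); i++ {
		child, ok := n.children[word[i]]
		if !ok {
			child = &trieNode{children: map[byte]*trieNode{}}
			n.children[word[i]] = child
		}
		n = child
	}
	n.terminal, n.mapping = true, mapping
}

// search walks word and reports the mapping and text of the longest
// inserted word that is a prefix of it.
func (t *trie) search(word string) (mapping, prefix string, found bool) {
	n := t.root
	for i := 0; i < len(word); i++ {
		child, ok := n.children[word[i]]
		if !ok {
			break
		}
		n = child
		if n.terminal {
			mapping, prefix, found = n.mapping, word[:i+1], true
		}
	}
	return mapping, prefix, found
}

func main() {
	t := newTrie()
	t.insert("https://example.org/", "ex")
	t.insert("https://example.org/vocab/", "exv")
	m, p, ok := t.search("https://example.org/vocab/name")
	fmt.Println(m, p, ok)
}
```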

type UpdateEdgeDelta

type UpdateEdgeDelta struct {
	// contains filtered or unexported fields
}

func (UpdateEdgeDelta) Run

func (ue UpdateEdgeDelta) Run(ctx *ls.Context, tx neo4j.ExplicitTransaction, session *Session, dbNodeIds map[*lpg.Node]string, dbEdgeIds map[*lpg.Edge]string, c Config) error

func (UpdateEdgeDelta) WriteQuery

func (ue UpdateEdgeDelta) WriteQuery(session *Session, dbNodeIds map[*lpg.Node]string, dbEdgeIds map[*lpg.Edge]string, c Config) DeltaQuery

type UpdateNodeDelta

type UpdateNodeDelta struct {
	// contains filtered or unexported fields
}

func (UpdateNodeDelta) Run

func (un UpdateNodeDelta) Run(ctx *ls.Context, tx neo4j.ExplicitTransaction, session *Session, dbNodeIds map[*lpg.Node]string, dbEdgeIds map[*lpg.Edge]string, c Config) error

func (UpdateNodeDelta) WriteQuery

func (un UpdateNodeDelta) WriteQuery(session *Session, dbNodeIds map[*lpg.Node]string, dbEdgeIds map[*lpg.Edge]string, c Config) DeltaQuery

Directories

Path Synopsis
cmd
