mongoload

package
v0.0.0-...-fa720cf Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Nov 2, 2023 | License: MIT | Imports: 13 | Imported by: 0

Documentation

Index

Constants

This section is empty.

Variables

View Source
// Cmd is the cobra command for "mongoload <graph>". It takes exactly one
// argument, the graph name, and writes vertex and edge records straight into
// the "<graph>_vertices" and "<graph>_edges" collections of the configured
// database using unordered buffered bulk inserters.
//
// Input comes from the package-level flag variables: a single vertex file, a
// single edge file, and/or a directory that is scanned for "*.vertex.json.gz"
// and "*.edge.json.gz" files. At least one of them must be set.
//
// Failures to open an input file abort the command. Bulk-write failures and
// directory scan failures are logged and loading continues, so a partially
// failing load does not abandon the serializer channel mid-stream.
var Cmd = &cobra.Command{
	Use:   "mongoload <graph>",
	Short: "Directly load data into mongodb",
	Long:  ``,
	Args:  cobra.ExactArgs(1),
	RunE: func(cmd *cobra.Command, args []string) error {
		if vertexFile == "" && edgeFile == "" && dirPath == "" {
			return fmt.Errorf("no edge or vertex files were provided")
		}

		graph = args[0]

		log.Infof("Loading data into graph: %s", graph)
		// The connection URI is deliberately kept out of error messages; it
		// may carry credentials.
		client, err := mgo.NewClient(options.Client().ApplyURI(mongoHost))
		if err != nil {
			return fmt.Errorf("creating mongo client: %w", err)
		}
		if err := client.Connect(context.TODO()); err != nil {
			return fmt.Errorf("connecting to mongo: %w", err)
		}
		// Release the connection on every return path.
		defer func() {
			if err := client.Disconnect(context.TODO()); err != nil {
				log.Errorf("Disconnecting from mongo: %v", err)
			}
		}()

		if createGraph {
			if err := mongo.AddMongoGraph(client, database, graph); err != nil {
				return fmt.Errorf("creating graph %q: %w", graph, err)
			}
		}

		vertexCol := client.Database(database).Collection(fmt.Sprintf("%s_vertices", graph))
		edgeCol := client.Database(database).Collection(fmt.Sprintf("%s_edges", graph))

		// progress reports a running total every logRate records. A
		// non-positive logRate disables progress output instead of
		// triggering an integer divide-by-zero panic.
		progress := func(kind string, loaded int) {
			if logRate > 0 && loaded%logRate == 0 {
				log.Infof("Loaded %d %s", loaded, kind)
			}
		}

		// reportWrite surfaces a bulk-write failure without aborting the
		// load, so the remaining records are still drained and attempted.
		reportWrite := func(kind string, err error) {
			if err != nil {
				log.Errorf("Bulk write of %s failed: %v", kind, err)
			}
		}

		if vertexFile != "" {
			log.Infof("Loading vertex file: %s", vertexFile)
			vertInserter := db.NewUnorderedBufferedBulkInserter(vertexCol, bulkBufferSize).
				SetBypassDocumentValidation(true).
				SetOrdered(false).
				SetUpsert(true)
			vertChan, err := util.StreamVerticesFromFile(vertexFile, workerCount)
			if err != nil {
				return fmt.Errorf("streaming vertices from %q: %w", vertexFile, err)
			}
			count := 0
			for d := range vertexSerialize(vertChan, workerCount) {
				_, werr := vertInserter.InsertRaw(d)
				reportWrite("vertices", werr)
				// Count first so the message reflects records already handed
				// to the inserter.
				count++
				progress("vertices", count)
			}
			_, ferr := vertInserter.Flush()
			reportWrite("vertices", ferr)
			log.Infof("Finished vertex file with %d vertices", count)
		}

		if edgeFile != "" {
			log.Infof("Loading edge file: %s", edgeFile)
			edgeInserter := db.NewUnorderedBufferedBulkInserter(edgeCol, bulkBufferSize).
				SetBypassDocumentValidation(true).
				SetOrdered(false).
				SetUpsert(true)
			edgeChan, err := util.StreamEdgesFromFile(edgeFile, workerCount)
			if err != nil {
				return fmt.Errorf("streaming edges from %q: %w", edgeFile, err)
			}
			count := 0
			for d := range edgeSerialize(edgeChan, workerCount) {
				_, werr := edgeInserter.InsertRaw(d)
				reportWrite("edges", werr)
				count++
				progress("edges", count)
			}
			_, ferr := edgeInserter.Flush()
			reportWrite("edges", ferr)
			log.Infof("Finished edge file with %d edges", count)
		}

		if dirPath != "" {
			// A failed scan skips that kind of file (as before) but is now
			// logged instead of being dropped silently.
			if vertexPaths, err := util.DirScan(dirPath, "*.vertex.json.gz"); err != nil {
				log.Errorf("Scanning %s for vertex files: %v", dirPath, err)
			} else {
				vertexCount := 0
				vertInserter := db.NewUnorderedBufferedBulkInserter(vertexCol, bulkBufferSize).
					SetBypassDocumentValidation(true).
					SetOrdered(false).
					SetUpsert(true)
				// path (not vertexFile) avoids shadowing the package-level flag.
				for _, path := range vertexPaths {
					log.Infof("Loading vertex file: %s", path)
					vertChan, err := util.StreamVerticesFromFile(path, workerCount)
					if err != nil {
						return fmt.Errorf("streaming vertices from %q: %w", path, err)
					}
					for d := range vertexSerialize(vertChan, workerCount) {
						_, werr := vertInserter.InsertRaw(d)
						reportWrite("vertices", werr)
						vertexCount++
						progress("vertices", vertexCount)
					}
				}
				_, ferr := vertInserter.Flush()
				reportWrite("vertices", ferr)
				log.Infof("Finished directory scan with %d vertices", vertexCount)
			}

			if edgePaths, err := util.DirScan(dirPath, "*.edge.json.gz"); err != nil {
				log.Errorf("Scanning %s for edge files: %v", dirPath, err)
			} else {
				edgeCount := 0
				edgeInserter := db.NewUnorderedBufferedBulkInserter(edgeCol, bulkBufferSize).
					SetBypassDocumentValidation(true).
					SetOrdered(false).
					SetUpsert(true)
				// path (not edgeFile) avoids shadowing the package-level flag.
				for _, path := range edgePaths {
					log.Infof("Loading edge file: %s", path)
					edgeChan, err := util.StreamEdgesFromFile(path, workerCount)
					if err != nil {
						return fmt.Errorf("streaming edges from %q: %w", path, err)
					}
					for d := range edgeSerialize(edgeChan, workerCount) {
						_, werr := edgeInserter.InsertRaw(d)
						reportWrite("edges", werr)
						edgeCount++
						progress("edges", edgeCount)
					}
				}
				_, ferr := edgeInserter.Flush()
				reportWrite("edges", ferr)
				log.Infof("Finished directory scan with %d edges", edgeCount)
			}
		}

		return nil
	},
}

Cmd is the cobra command declaration for the mongoload subcommand, which loads vertex and edge files directly into MongoDB.

Functions

This section is empty.

Types

This section is empty.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL