stream

package
v0.0.0-...-fa720cf Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Nov 2, 2023 License: MIT Imports: 6 Imported by: 0

Documentation

Index

Constants

This section is empty.

Variables

View Source
// Cmd is the cobra command that streams vertices and edges from Kafka into a
// graph. It takes exactly one argument, the graph name, which is stored in
// the package-level graph variable.
//
// RunE connects to the server at host, opens a Kafka consumer against the
// kafka broker address, and reads partition 0 of vertexTopic and edgeTopic
// starting from the oldest available offset. Each message is decoded from
// protobuf JSON and added to the graph; messages that fail to decode or to
// insert are logged and skipped. RunE blocks until both partition message
// channels have been closed, and returns an error if the connection or any
// consumer cannot be set up.
var Cmd = &cobra.Command{
	Use:   "stream <graph>",
	Short: "Stream data into a graph from Kafka",
	Long:  ``,
	Args:  cobra.ExactArgs(1),
	RunE: func(cmd *cobra.Command, args []string) error {
		graph = args[0]
		// Start-up notice: informational, not an error.
		log.WithFields(log.Fields{"kafka": kafka, "graph": graph}).Info("Streaming data from Kafka into graph")

		conn, err := gripql.Connect(rpc.ConfigWithDefaults(host), true)
		if err != nil {
			return err
		}

		consumer, err := sarama.NewConsumer([]string{kafka}, nil)
		if err != nil {
			// Report the failure through cobra instead of crashing the process.
			return err
		}
		// Deferred calls run last-in-first-out, so the partition consumers
		// registered below are closed before the parent consumer.
		defer func() {
			if cerr := consumer.Close(); cerr != nil {
				log.WithFields(log.Fields{"error": cerr}).Error("closing kafka consumer")
			}
		}()

		vertexConsumer, err := consumer.ConsumePartition(vertexTopic, 0, sarama.OffsetOldest)
		if err != nil {
			return err
		}
		defer func() {
			if cerr := vertexConsumer.Close(); cerr != nil {
				log.WithFields(log.Fields{"error": cerr}).Error("closing vertex consumer")
			}
		}()

		edgeConsumer, err := consumer.ConsumePartition(edgeTopic, 0, sarama.OffsetOldest)
		if err != nil {
			return err
		}
		defer func() {
			if cerr := edgeConsumer.Close(); cerr != nil {
				log.WithFields(log.Fields{"error": cerr}).Error("closing edge consumer")
			}
		}()

		// Each loader goroutine sends once on done when its message channel
		// is closed; RunE waits for both signals before returning.
		done := make(chan bool)

		go func() {
			count := 0
			for msg := range vertexConsumer.Messages() {
				v := gripql.Vertex{}
				err := protojson.Unmarshal(msg.Value, &v)
				if err != nil {
					log.WithFields(log.Fields{"error": err}).Error("vertex consumer: unmarshal error")
					continue
				}
				err = conn.AddVertex(graph, &v)
				if err != nil {
					log.WithFields(log.Fields{"error": err}).Error("vertex consumer: add error")
					continue
				}
				count++
				// Progress report every 1000 successfully loaded vertices.
				if count%1000 == 0 {
					log.Infof("Loaded %d vertices", count)
				}
			}
			done <- true
		}()

		go func() {
			count := 0
			for msg := range edgeConsumer.Messages() {
				e := gripql.Edge{}
				err := protojson.Unmarshal(msg.Value, &e)
				if err != nil {
					log.WithFields(log.Fields{"error": err}).Error("edge consumer: unmarshal error")
					continue
				}
				err = conn.AddEdge(graph, &e)
				if err != nil {
					log.WithFields(log.Fields{"error": err}).Error("edge consumer: add error")
					continue
				}
				count++
				// Progress report every 1000 successfully loaded edges.
				if count%1000 == 0 {
					log.Infof("Loaded %d edges", count)
				}
			}
			done <- true
		}()
		<-done
		<-done
		return nil
	},
}

Cmd is the base command called by the cobra command line system; it streams vertex and edge data from Kafka into the given graph.

Functions

This section is empty.

Types

This section is empty.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL