bulk

package
v0.3.0 Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Aug 31, 2022 License: Apache-2.0 Imports: 27 Imported by: 0

Documentation

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

This section is empty.

Types

type Option

type Option func(*Service) error

func SetIndexService

func SetIndexService(is *index.Service) Option

func SetIndexTypes

func SetIndexTypes(indexTypes ...string) Option

func SetPostHookService added in v0.1.10

func SetPostHookService(hooks ...domain.PostHookService) Option

type Parser

type Parser struct {
	// contains filtered or unexported fields
}

func (*Parser) AppendRDFBulkRequest

func (p *Parser) AppendRDFBulkRequest(req *Request, g *rdf.Graph) error

AppendRDFBulkRequest gathers all the triples from a BulkAction so they can be inserted in bulk.

func (*Parser) Parse

func (p *Parser) Parse(ctx context.Context, r io.Reader) error

func (*Parser) Publish

func (p *Parser) Publish(ctx context.Context, req *Request) error

func (*Parser) RDFBulkInsert

func (p *Parser) RDFBulkInsert() []error

RDFBulkInsert inserts all triples from the bulk request in a single SPARQL update statement.

type Publisher added in v0.2.1

type Publisher struct {
	BulkSize int
	// contains filtered or unexported fields
}

func NewPublisher added in v0.2.1

func NewPublisher(host, dataPath string) *Publisher

func (*Publisher) Append added in v0.2.1

func (p *Publisher) Append(request *Request) error

Append appends a bulk.Request as []byte to p.requests

When the number of entries in p.requests exceeds p.BulkSize, it submits the chunk to the endpoint.

func (*Publisher) AppendBytes added in v0.2.1

func (p *Publisher) AppendBytes(b []byte) error

AppendBytes appends a bulk.Request as []byte to p.requests

When the number of entries in p.requests exceeds p.BulkSize, it submits the chunk to the endpoint.

func (*Publisher) Do added in v0.2.1

func (p *Publisher) Do(ctx context.Context) error

Do parses the records in dataPath and submits them in chunks to the Hub3 BulkAPI endpoint.

Chunks are defined by p.BulkSize.

The expected directory structure is:

{orgId}
	/{datasetID}
		/bulk
			/{hubid}.jsonl

Each .jsonl file is expected to contain a bulk.Request serialized on a single line, so any newlines inside the struct's field values must be escaped.

It increments the revision at the start and clears orphans on the final submit.

func (*Publisher) MaxRetries added in v0.2.1

func (p *Publisher) MaxRetries(max int)

func (*Publisher) Stats added in v0.2.1

func (p *Publisher) Stats() PublisherStats

Stats returns the number of requests and submits made by the Publisher.

type PublisherStats added in v0.2.1

type PublisherStats struct {
	NrRequests uint64
	NrSubmits  uint64
}

type Request

type Request struct {
	HubID         string `json:"hubId"`
	OrgID         string `json:"orgID"`
	DatasetID     string `json:"dataset"`
	LocalID       string `json:"localID"`
	NamedGraphURI string `json:"graphUri"`
	RecordType    string `json:"type"`
	Action        string `json:"action"`
	ContentHash   string `json:"contentHash"`
	Graph         string `json:"graph"`
	GraphMimeType string `json:"graphMimeType"`
	SubjectType   string `json:"subjectType"`
	Revision      int    `json:"revision"`
	Tags          string `json:"tags,omitempty"`
}

type Service

type Service struct {
	// contains filtered or unexported fields
}

func NewService

func NewService(options ...Option) (*Service, error)

func (*Service) Handle

func (s *Service) Handle(w http.ResponseWriter, r *http.Request)

Handle receives bulk actions in JSON form (one per line) and processes them in the ingestion pipeline.

func (*Service) HandleRDF added in v0.2.1

func (s *Service) HandleRDF(w http.ResponseWriter, r *http.Request)

func (*Service) NewParser

func (s *Service) NewParser() *Parser

func (*Service) Routes added in v0.3.0

func (s *Service) Routes(pattern string, r chi.Router)

func (*Service) ServeHTTP

func (s *Service) ServeHTTP(w http.ResponseWriter, r *http.Request)

func (*Service) SetServiceBuilder added in v0.3.0

func (s *Service) SetServiceBuilder(b *domain.ServiceBuilder)

func (*Service) Shutdown

func (s *Service) Shutdown(ctx context.Context) error

type Stats

type Stats struct {
	OrgID              string `json:"orgID"`
	DatasetID          string `json:"datasetID"`
	Spec               string `json:"spec"`
	SpecRevision       uint64 `json:"specRevision"`  // version of the records stored
	TotalReceived      uint64 `json:"totalReceived"` // originally json was total_received
	RecordsStored      uint64 `json:"recordsStored"` // originally json was records_stored
	JSONErrors         uint64 `json:"jsonErrors"`
	TriplesStored      uint64 `json:"triplesStored"`
	PostHooksSubmitted uint64 `json:"postHooksSubmitted"`
}

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL