ingester

package
v4.1.3

Warning: This package is not in the latest version of its module.
Published: Nov 17, 2022 License: MIT Imports: 14 Imported by: 0

Documentation

Constants

This section is empty.

Variables

This section is empty.

Functions

func ApplyMerge

func ApplyMerge(doc *models.Document, command *UpdateCommand, secondary []*models.Document) *models.Document

ApplyMerge executes a merge based on a specific UpdateCommand.

Types

type BulkIngestRequest

type BulkIngestRequest struct {
	UUID         string             `json:"uuid"`
	DocumentType string             `json:"documentType"`
	MergeConfig  []*merge.Config    `json:"merge"`
	Docs         []*models.Document `json:"docs"`
}

BulkIngestRequest wraps a collection of ingestion requests (multiple documents with multiple merge configs).

type BulkIngester

type BulkIngester struct {
	EsExecutor     *elasticsearch.EsExecutor
	TypedIngesters map[string]*TypedIngester
	Cache          *ttlcache.Cache
}

BulkIngester is a component that splits a BulkIngestRequest and assigns the resulting IngestRequests to dedicated TypedIngesters. As a chokepoint, it does little processing and acts only as a request router.

func NewBulkIngester

func NewBulkIngester(esExecutor *elasticsearch.EsExecutor) *BulkIngester

NewBulkIngester returns a pointer to a new BulkIngester instance

func (*BulkIngester) Ingest

func (ingester *BulkIngester) Ingest(bir BulkIngestRequest)

Ingest processes a single BulkIngestRequest. The BulkIngestRequest is split into multiple IngestRequests, each of which is then sent to a specific TypedIngester. The target TypedIngester is selected based on which document type must be updated.
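
The split-and-route step described above can be sketched as follows. This is a minimal illustration, not the package's implementation: Document and MergeConfig are simplified stand-ins for models.Document and merge.Config, splitBulk and route are hypothetical helpers, and the rule of one IngestRequest per (document, merge config) pair is an assumption that the documentation does not confirm. A plain slice stands in for each TypedIngester.

```go
package main

import "fmt"

// Document and MergeConfig are simplified stand-ins for models.Document and
// merge.Config, whose definitions are not shown on this page.
type Document struct{ ID string }
type MergeConfig struct{ Mode string }

type BulkIngestRequest struct {
	UUID         string
	DocumentType string
	MergeConfig  []*MergeConfig
	Docs         []*Document
}

type IngestRequest struct {
	BulkUUID     string
	DocumentType string
	MergeConfig  *MergeConfig
	Doc          *Document
}

// splitBulk (hypothetical) emits one IngestRequest per (document, merge config)
// pair. The pairing rule is an assumption of this sketch.
func splitBulk(bir BulkIngestRequest) []*IngestRequest {
	out := make([]*IngestRequest, 0, len(bir.Docs)*len(bir.MergeConfig))
	for _, doc := range bir.Docs {
		for _, cfg := range bir.MergeConfig {
			out = append(out, &IngestRequest{
				BulkUUID:     bir.UUID,
				DocumentType: bir.DocumentType,
				MergeConfig:  cfg,
				Doc:          doc,
			})
		}
	}
	return out
}

// route (hypothetical) appends each request to the queue of its document type,
// creating the queue on first use. A slice stands in for a TypedIngester here.
func route(queues map[string][]*IngestRequest, reqs []*IngestRequest) {
	for _, r := range reqs {
		queues[r.DocumentType] = append(queues[r.DocumentType], r)
	}
}

func main() {
	queues := map[string][]*IngestRequest{}
	route(queues, splitBulk(BulkIngestRequest{
		UUID:         "bulk-1",
		DocumentType: "order",
		MergeConfig:  []*MergeConfig{{Mode: "SELF"}},
		Docs:         []*Document{{ID: "1"}, {ID: "2"}},
	}))
	route(queues, splitBulk(BulkIngestRequest{
		UUID:         "bulk-2",
		DocumentType: "customer",
		MergeConfig:  []*MergeConfig{{Mode: "SELF"}},
		Docs:         []*Document{{ID: "9"}},
	}))
	fmt.Println(len(queues["order"]), len(queues["customer"]))
}
```

Keeping the router this thin matches the "chokepoint" description: the only work done per request is a map lookup on the document type.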

type GetQuery

type GetQuery struct {
	DocumentType string
	ID           string
}

GetQuery ...

type IndexingWorker

type IndexingWorker struct {
	TypedIngester *TypedIngester
	ID            int
	Data          chan *UpdateCommand
	Client        *elasticsearch.EsExecutor
}

IndexingWorker is the unit of processing that can be started in parallel for Elasticsearch ingestion.

func NewIndexingWorker

func NewIndexingWorker(typedIngester *TypedIngester, id int) *IndexingWorker

NewIndexingWorker returns a new IndexingWorker

func (*IndexingWorker) BulkChainedUpdate

func (worker *IndexingWorker) BulkChainedUpdate(documents [][]*UpdateCommand)

BulkChainedUpdate processes multiple groups of UpdateCommands. For each group of commands, it sequentially executes every UpdateCommand on a specific "source" document.
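
The chained behaviour described above can be illustrated with a small sketch. It makes several assumptions: Document is a stand-in for models.Document, shallowMerge is a hypothetical replacement for ApplyMerge (incoming fields simply overwrite existing ones), every command in a group is assumed to target the same document ID, and an in-memory map stands in for Elasticsearch.

```go
package main

import "fmt"

// Document is a stand-in for models.Document (real definition not shown here).
type Document struct {
	ID     string
	Source map[string]interface{}
}

// UpdateCommand is trimmed to the fields this sketch needs.
type UpdateCommand struct {
	DocumentID string
	NewDoc     *Document
}

// shallowMerge is a hypothetical stand-in for ApplyMerge: fields of the
// incoming document overwrite fields of the current one.
func shallowMerge(current *Document, cmd *UpdateCommand) *Document {
	merged := &Document{ID: cmd.DocumentID, Source: map[string]interface{}{}}
	if current != nil {
		for k, v := range current.Source {
			merged.Source[k] = v
		}
	}
	for k, v := range cmd.NewDoc.Source {
		merged.Source[k] = v
	}
	return merged
}

// chainUpdates applies each group of commands, in order, to the group's
// source document. store maps a document ID to its current state and is
// updated in place (assumption: one group targets one document ID).
func chainUpdates(store map[string]*Document, groups [][]*UpdateCommand) {
	for _, group := range groups {
		if len(group) == 0 {
			continue
		}
		id := group[0].DocumentID
		doc := store[id]
		for _, cmd := range group {
			doc = shallowMerge(doc, cmd) // each result feeds the next command
		}
		store[id] = doc
	}
}

func main() {
	store := map[string]*Document{
		"a": {ID: "a", Source: map[string]interface{}{"status": "new"}},
	}
	chainUpdates(store, [][]*UpdateCommand{{
		{DocumentID: "a", NewDoc: &Document{Source: map[string]interface{}{"owner": "bob"}}},
		{DocumentID: "a", NewDoc: &Document{Source: map[string]interface{}{"status": "done"}}},
	}})
	fmt.Println(store["a"].Source)
}
```

Folding the commands of a group before writing back means a document receiving several updates in one batch is written once, with later commands seeing the effect of earlier ones.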

func (*IndexingWorker) Run

func (worker *IndexingWorker) Run()

Run starts a worker.
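
As an illustration of workers that are started in parallel and fed through a Data channel, here is a minimal sketch. The worker type, its run method, and processAll are hypothetical; the round-robin dispatch and the callback in place of an Elasticsearch call are assumptions of this example, not the package's behaviour.

```go
package main

import (
	"fmt"
	"sync"
)

// UpdateCommand is trimmed to a single field for this sketch.
type UpdateCommand struct{ DocumentID string }

// worker is a simplified stand-in for IndexingWorker.
type worker struct {
	ID   int
	Data chan *UpdateCommand
}

// run drains the Data channel until it is closed and hands each command to
// handle. A real worker would talk to Elasticsearch at this point.
func (w *worker) run(handle func(workerID int, cmd *UpdateCommand)) {
	for cmd := range w.Data {
		handle(w.ID, cmd)
	}
}

// processAll starts n workers (n must be > 0), dispatches the commands
// round-robin, and returns the IDs that were processed, in completion order.
func processAll(n int, cmds []*UpdateCommand) []string {
	var (
		mu        sync.Mutex
		processed []string
		wg        sync.WaitGroup
	)
	workers := make([]*worker, n)
	for i := range workers {
		workers[i] = &worker{ID: i, Data: make(chan *UpdateCommand, 8)}
		wg.Add(1)
		go func(w *worker) {
			defer wg.Done()
			w.run(func(_ int, cmd *UpdateCommand) {
				mu.Lock()
				processed = append(processed, cmd.DocumentID)
				mu.Unlock()
			})
		}(workers[i])
	}
	for i, cmd := range cmds {
		workers[i%n].Data <- cmd
	}
	for _, w := range workers {
		close(w.Data) // closing the channel lets run return
	}
	wg.Wait()
	return processed
}

func main() {
	cmds := []*UpdateCommand{{"a"}, {"b"}, {"c"}, {"d"}, {"e"}}
	fmt.Println(len(processAll(3, cmds)))
}
```

Giving each worker its own channel, as the Data field on IndexingWorker suggests, lets the dispatcher decide which worker handles which command.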

type IngestRequest

type IngestRequest struct {
	UUID         string           `json:"uuid"`
	BulkUUID     string           `json:"bulkUuid"`
	DocumentType string           `json:"documentType"`
	MergeConfig  *merge.Config    `json:"merge"`
	Doc          *models.Document `json:"docs"`
}

IngestRequest wraps a single ingestion request (one document with one merge config).

type TypedIngester

type TypedIngester struct {
	DocumentType string
	Data         chan *IngestRequest
	Workers      map[int]*IndexingWorker
	// contains filtered or unexported fields
}

TypedIngester is a component that processes IngestRequests. It generates UpdateCommands, which are processed by the attached IndexingWorkers.

func NewTypedIngester

func NewTypedIngester(bulkIngester *BulkIngester, documentType string) *TypedIngester

NewTypedIngester returns a pointer to a new TypedIngester instance

func (*TypedIngester) Run

func (ingester *TypedIngester) Run()

Run is the main routine of a TypedIngester instance.

In case of Mode == SELF:
* The in-memory cache is filled with the new information
* An update command is sent to the dedicated indexer

In case of Mode == ENRICH_FROM (which may end up being the same):
* An update command is sent to the dedicated indexer

In case of Mode == ENRICH_TO (which may end up being the same):
* A dedicated "relation cache" is queried to find all the objects that must be updated
* One or more update commands are sent to the dedicated indexer
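
The three cases above can be sketched as a single dispatch function. Everything here is illustrative: the Mode type and its string constants, the planUpdates helper, and the map-based caches are assumptions of this example; only the mode names SELF, ENRICH_FROM and ENRICH_TO come from the description.

```go
package main

import "fmt"

// Mode and its constants are a hypothetical representation of the merge mode.
type Mode string

const (
	Self       Mode = "SELF"
	EnrichFrom Mode = "ENRICH_FROM"
	EnrichTo   Mode = "ENRICH_TO"
)

// IngestRequest and UpdateCommand are trimmed to what the sketch needs.
type IngestRequest struct {
	DocID string
	Mode  Mode
}

type UpdateCommand struct{ DocumentID string }

// planUpdates returns the update commands to send to the indexer.
// cache stands in for the in-memory cache; relations stands in for the
// "relation cache" and maps a document ID to the IDs that depend on it.
func planUpdates(ir IngestRequest, cache map[string]bool, relations map[string][]string) []UpdateCommand {
	switch ir.Mode {
	case Self:
		cache[ir.DocID] = true // remember the new information
		return []UpdateCommand{{DocumentID: ir.DocID}}
	case EnrichFrom:
		return []UpdateCommand{{DocumentID: ir.DocID}}
	case EnrichTo:
		targets := relations[ir.DocID]
		cmds := make([]UpdateCommand, 0, len(targets))
		for _, id := range targets {
			cmds = append(cmds, UpdateCommand{DocumentID: id})
		}
		return cmds
	default:
		return nil // unknown mode: nothing to send
	}
}

func main() {
	cache := map[string]bool{}
	relations := map[string][]string{"customer-1": {"order-7", "order-8"}}

	fmt.Println(planUpdates(IngestRequest{DocID: "order-7", Mode: Self}, cache, relations))
	fmt.Println(planUpdates(IngestRequest{DocID: "customer-1", Mode: EnrichTo}, cache, relations))
}
```

In this sketch only ENRICH_TO can fan out into several commands, because a single incoming document may be related to many documents that need updating.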

type UpdateCommand

type UpdateCommand struct {
	DocumentID   string           `json:"documentId"`
	DocumentType string           `json:"documentType"`
	NewDoc       *models.Document `json:"doc"`
	MergeConfig  *merge.Config    `json:"merge"`
}

UpdateCommand wraps all the information required to update a document in Elasticsearch.

func NewUpdateCommand

func NewUpdateCommand(documentID string, documentType string, newDoc *models.Document, mergeConfig *merge.Config) *UpdateCommand

NewUpdateCommand returns a new UpdateCommand
