datahub

package module
v0.1.7 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Apr 8, 2024 License: Apache-2.0 Imports: 21 Imported by: 4

README

Data Hub Client SDK for Go

This is a Client SDK in Go for interacting with MIMIRO data hub instances. Documentation on MIMIRO data hub, set up and configuration can be found here.

The full go package documentation can be seen online.

Installation

To use the SDK in your Go project, you can use the go get command:

go get github.com/mimiro-io/datahub-client-sdk-go

Usage

To use the SDK in a Go project import the following:

import (
    datahub "github.com/mimiro-io/datahub-client-sdk-go"
)

The following sections highlight the core patterns and ways of using the SDK for common tasks.

Creating a client

To create a client, you need to provide the base URL of the data hub instance you want to connect to.

For example:

client := datahub.NewClient("http://localhost:8080")
Authenticating

There are several supported authentication mechanisms; admin authentication using key and secret, client credentials towards an external identity provider using key and secret, and lastly public key authentication using a client certificate to sign the authentication request.

To authenticate and setup the authentication approach for the duration of the client lifecycle the client can be configured differently depending on the desired approach.

To authenticate with client credentials against an external identity provider that supports the OAuth2 flow configure the client in the following way:

client, err := NewClient("http://localhost:8080")
client.WithClientKeyAndSecretAuth("authorizer URL", "audience", "key", "secret")
err = client.Authenticate()

To authenticate with admin authentication configure the client in the following way:

client, err := NewClient(testConfig.DataHubUrl)
client.WithAdminAuth("admin user key", "admin user secret")
err = client.Authenticate()

To authenticate with client certificate it is assumed that the server has been configured with this client id and corresponding public key. The client should be configured in the following way:

client, err := NewClient(testConfig.DataHubUrl)
client.WithPublicKeyAuth(clientId, privateKey)
err = client.Authenticate()
Add Dataset

To Add a dataset to the datahub.

err := client.AddDataset("datasetName", nil)
Get Datasets

The list of datasets that a client has access to can be retrieved with the GetDatasets function.

datasets := client.GetDatasets()
Store Entities

Stores entities into a named dataset. Makes use of the Entity Graph Data Model package. Build a EntityCollection either directly, or by parsing JSON, then call StoreEntities with the collection and the name of the dataset.

namespaceManager := egdm.NewNamespaceContext()
prefixedId, err := namespaceManager.AssertPrefixFromURI("http://data.example.com/things/entity1")
ec := egdm.NewEntityCollection(namespaceManager)
entity := egdm.NewEntity().SetID(prefixedId)
err = ec.AddEntity(entity)
err = client.StoreEntities(datasetName, ec)

The StoreEntityStream function can be used to deliver a stream of entities to the server. This is useful when streaming many entities from a file or some other source.

Get Changes
GetChanges(dataset string, since string, take int, latestOnly bool, reverse bool, expandURIs bool)

Since should be the empty string unless this is a subsequent call. Take should be -1 to get all (up to the server limit). LatestOnly indicates if only the latest version of a changed entity should be returned (recommended). Reverse will return the changes with the most recent first. ExpandURIs will returned entities with all namespace prefixes resolved.

changes, err := client.GetChanges("people", "", -1, true, false, true)

The entities that have been changed are in the entities property. The Continuation property has a token that can be used as the since value in a subsequent call.

Get Jobs

To return a list of all the job configurations use the GetJobs function.

jobs, err := client.GetJobs()
Add Job

To add a job use the JobBuilder to construct a Job definition, then call AddJob with that as a parameter. There are many job options that can be set using the JobBuilder.

// build the job definition
jb := NewJobBuilder("myjob", "job1")
jb.WithDescription("my description")
jb.WithTags([]string{"tag1", "tag2"})
jb.WithDatasetSource("my-source-dataset", true)
jb.WithDatasetSink("my-sink-dataset")

js := base64.StdEncoding.EncodeToString([]byte("function transform(record) { return record; }"))
jb.WithJavascriptTransform(js, 0)

triggerBuilder := NewJobTriggerBuilder()
triggerBuilder.WithCron("0 0 0 * *")
triggerBuilder.WithIncremental()
triggerBuilder.AddLogErrorHandler(10)

jb.AddTrigger(triggerBuilder.Build())

// build and add the job
err := client.AddJob(jb.Build())

Use UpdateJob and DeleteJob to manage the set of jobs being executed.

Run Job

As well as jobs running according to their schedule they can be controlled on demand. There are several job control functions, Run, Pause, Resume, Kill, Reset Token. They are invoked using the Job id.

When running a job it can be run as full sync or incremental.

err = client.RunJobAsFullSync(jobId)
Get Job Statuses

To get the current execution status of running jobs.

statuses, err := client.GetJobStatuses()
Query for Entity

To query for an entity use the QueryBuilder then call RunQuery.

qb := NewQueryBuilder()
qb.WithEntityId("http://data.example.com/things/entity1")
query := qb.Build()

results, err := client.RunQuery(query)

To query for related entities (to traverse the graph) again use the query builder and then call RunQuery.

qb := NewQueryBuilder()
qb.WithEntityId("http://data.example.com/things/entity2")
qb.WithInverse(true)
query := qb.Build()
Javascript Query

To execute a Javascript function it must be encoded as base64. The use the RunJavascriptQuery function.

javascriptQuery := `function do_query() {
							WriteQueryResult({key1: "value1"});
							WriteQueryResult({key1: "value2"});
							WriteQueryResult({key1: "value3"});
						}`

// base64 encode the query
javascriptQuery = base64.StdEncoding.EncodeToString([]byte(javascriptQuery))

results, err := client.RunJavascriptQuery(javascriptQuery)

// iterate results
obj, err := results.Next()

Documentation

Overview

package datahub provides a sdk for interacting with MIMIRO data hub instances.

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

This section is empty.

Types

type AccessControl

type AccessControl struct {
	// Resource is a URL of the resource to which the access control rule applies
	Resource string
	// Action is the action that is allowed or denied. The value can be "read" or "write"
	Action string
	// Deny is a boolean value that indicates whether the action is allowed or denied
	Deny bool
}

AccessControl is a struct that represents a single access control rule for a single resource

type AuthType

type AuthType int
const (
	// AuthTypeNone used for connecting to unsercured datahub instances
	AuthTypeNone AuthType = iota
	// AuthTypeBasic used for connecting as admin user with username and password
	AuthTypeBasic
	// AuthTypeClientKeyAndSecret used for OAuth flow with client key and secret
	AuthTypeClientKeyAndSecret
	// AuthTypePublicKey Used for OAuth flow with signed JWT authentication request
	AuthTypePublicKey
	// AuthTypeUser Used the OAuth User flow - Not yet supported
	AuthTypeUser
)

type AuthenticationError

type AuthenticationError struct {
	Err error
	Msg string
}

AuthenticationError is an error that occurs when there is an issue authenticating with the server. Check the inner error for more details.

func (*AuthenticationError) Error

func (e *AuthenticationError) Error() string

func (*AuthenticationError) Unwrap

func (e *AuthenticationError) Unwrap() error

type Client

type Client struct {
	AuthConfig *authConfig
	AuthToken  *oauth2.Token
	Server     string
}

Client is the main entry point for the data hub client sdk

func NewClient

func NewClient(server string) (*Client, error)

NewClient creates a new client instance. Specify the data hub server url as the parameter. Use the withXXX functions to configure options returns a ParameterError if the server url is empty or invalid URL

func (*Client) AddClient

func (c *Client) AddClient(clientID string, publicKey *rsa.PublicKey) error

AddClient stores the client ID and optional public key of a client. clientID is the unique id of the client to be added. publicKey is the client public key (optional). returns an AuthenticationError if the client is unable to authenticate. returns a ParameterError if the clientID is empty returns a RequestError if the request fails. returns a ClientProcessingError if the response cannot be processed.

func (*Client) AddDataset

func (c *Client) AddDataset(name string, namespaces []string) error

AddDataset creates a dataset if it does not exist. returns an AuthenticationError if the client is unable to authenticate. returns a ParameterError if the dataset name is empty. returns a RequestError if the request fails. returns a ClientProcessingError if the response cannot be processed.

func (*Client) AddJob

func (c *Client) AddJob(job *Job) error

AddJob adds a job to the data hub Use the JobBuilder to create valid jobs returns an AuthenticationError if the client is unable to authenticate. returns a ParameterError if the job is nil, the job id is empty or the job title is empty. returns a RequestError if the request fails.

func (*Client) AddProxyDataset

func (c *Client) AddProxyDataset(name string, namespaces []string, remoteDatasetURL string, authProviderName string) error

AddProxyDataset creates a proxy dataset if it does not exist, or updates the namespaces, remoteDatasetURL and authProviderName if it does. returns an error if the dataset could not be created or updated. returns an AuthenticationError if the client is unable to authenticate. returns a ParameterError if the dataset name is empty. returns a RequestError if the request fails. returns a ClientProcessingError if the response cannot be processed.

func (*Client) AddTokenProvider

func (c *Client) AddTokenProvider(tokenProviderConfig *ProviderConfig) error

AddTokenProvider returns the access control rules for the specified client. tokenProviderConfig is a single token provider configuration to be added. returns an AuthenticationError if the client is unable to authenticate. returns a ParameterError if the tokenProviderConfig is nil returns a RequestError if the request fails.

func (*Client) Authenticate

func (c *Client) Authenticate() error

Authenticate attempts to authenticate the client with the configured authentication type returns an AuthenticationError if authentication fails

func (*Client) DeleteClient

func (c *Client) DeleteClient(id string) error

DeleteClient deletes the specific client. clientID is the unique id of the client to be added. returns an AuthenticationError if the client is unable to authenticate. returns a ParameterError if the clientID is empty returns a RequestError if the request fails.

func (*Client) DeleteDataset

func (c *Client) DeleteDataset(dataset string) error

DeleteDataset deletes a named dataset. returns an AuthenticationError if the client is unable to authenticate. returns a ParameterError if the dataset name is empty. returns a RequestError if the request fails. returns a ClientProcessingError if the response cannot be processed.

func (*Client) DeleteJob

func (c *Client) DeleteJob(id string) error

DeleteJob deletes a job from the data hub id is the id of the job to delete returns an AuthenticationError if the client is unable to authenticate. returns a ParameterError if the job id is empty. returns a RequestError if the request fails.

func (*Client) DeleteTokenProvider

func (c *Client) DeleteTokenProvider(name string) error

DeleteTokenProvider deletes the specified token provider. name is the name of the token provider to be deleted. returns an AuthenticationError if the client is unable to authenticate. returns a ParameterError if the name is empty returns a RequestError if the request fails.

func (*Client) GenerateKeypair

func (c *Client) GenerateKeypair() (*rsa.PrivateKey, *rsa.PublicKey, error)

GenerateKeypair generates a new RSA keypair

func (*Client) GetChanges

func (c *Client) GetChanges(dataset string, since string, take int, latestOnly bool, reverse bool, expandURIs bool) (*egdm.EntityCollection, error)

GetChanges gets changes for a dataset. returns an EntityCollection for the named dataset. since parameter is an optional token to get changes since. take parameter is an optional limit on the number of changes to return. latestOnly parameter is an optional flag to only return the latest version of each entity. reverse parameter is an optional flag to reverse the order of the changes. expandURIs parameter is an optional flag to expand Entity URIs in the response. returns an AuthenticationError if the client is unable to authenticate. returns a ParameterError if the dataset name is empty. returns a RequestError if the request fails. returns a ClientProcessingError if the response cannot be processed.

func (*Client) GetChangesStream added in v0.1.3

func (c *Client) GetChangesStream(dataset string, since string, latestOnly bool, take int, reverse bool, expandURIs bool) (EntityIterator, error)

GetChangesStream gets entities for a dataset as a stream from the since position defined. returns an EntityIterator over the changes for the named dataset. since parameter is an optional token to get changes since. take parameter is an optional limit on the number of changes to return in each batch. reverse parameter is an optional flag to reverse the order of the changes. latestOnly parameter is an optional flag to only return the latest version of each entity. expandURIs parameter is an optional flag to expand Entity URIs in the response. returns an AuthenticationError if the client is unable to authenticate. returns a ParameterError if the dataset name is empty. returns a RequestError if the request fails. returns a ClientProcessingError if the response cannot be processed.

func (*Client) GetClientAcl

func (c *Client) GetClientAcl(clientID string) ([]AccessControl, error)

GetClientAcl returns the access control rules for the specified client. clientID is the unique id of the client to be added. returns a slice of AccessControl structs that represent the access control rules. returns an AuthenticationError if the client is unable to authenticate. returns a ParameterError if the clientID is empty returns a RequestError if the request fails. returns a ClientProcessingError if the response cannot be processed.

func (*Client) GetClients

func (c *Client) GetClients() (map[string]ClientInfo, error)

GetClients returns a map of client IDs to ClientInfo structs returns an AuthenticationError if the client is unable to authenticate. returns a RequestError if the request fails. returns a ClientProcessingError if the response cannot be processed.

func (*Client) GetDataset

func (c *Client) GetDataset(name string) (*Dataset, error)

GetDataset gets a dataset by name. returns a dataset if it exists, or an error if it does not. returns an AuthenticationError if the client is unable to authenticate. returns a ParameterError if the dataset name is empty. returns a RequestError if the request fails. returns a ClientProcessingError if the response cannot be processed.

func (*Client) GetDatasetEntity

func (c *Client) GetDatasetEntity(name string) (*egdm.Entity, error)

GetDatasetEntity gets a dataset entity by name. returns an Entity if it exists, or an error if it does not. returns an AuthenticationError if the client is unable to authenticate. returns a ParameterError if the dataset name is empty. returns a RequestError if the request fails. returns a ClientProcessingError if the response cannot be processed.

func (*Client) GetDatasets

func (c *Client) GetDatasets() ([]*Dataset, error)

GetDatasets gets list of datasets. returns []*Dataset for the named dataset. returns an AuthenticationError if the client is unable to authenticate. returns a RequestError if the request fails. returns a ClientProcessingError if the response cannot be processed.

func (*Client) GetEntities

func (c *Client) GetEntities(dataset string, from string, take int, reverse bool, expandURIs bool) (*egdm.EntityCollection, error)

GetEntities gets entities for a dataset. returns an EntityCollection for the named dataset. from parameter is an optional token to get changes since. take parameter is an optional limit on the number of changes to return. reverse parameter is an optional flag to reverse the order of the changes. expandURIs parameter is an optional flag to expand Entity URIs in the response. returns an AuthenticationError if the client is unable to authenticate. returns a ParameterError if the dataset name is empty. returns a RequestError if the request fails. returns a ClientProcessingError if the response cannot be processed.

func (*Client) GetEntitiesStream added in v0.1.1

func (c *Client) GetEntitiesStream(dataset string, from string, take int, reverse bool, expandURIs bool) (EntityIterator, error)

GetEntitiesStream gets entities for a dataset as a stream from the start position defined. returns an EntityIterator over the entities in the named dataset. from parameter is an optional token to get changes since. take parameter is an optional limit on the number of changes to return. reverse parameter is an optional flag to reverse the order of the changes. expandURIs parameter is an optional flag to expand Entity URIs in the response. returns an AuthenticationError if the client is unable to authenticate. returns a ParameterError if the dataset name is empty. returns a RequestError if the request fails. returns a ClientProcessingError if the response cannot be processed.

func (*Client) GetJob

func (c *Client) GetJob(id string) (*Job, error)

GetJob gets a job from the data hub id is the id of the job to get returns an AuthenticationError if the client is unable to authenticate. returns a ParameterError if the job id is empty. returns a RequestError if the request fails. returns a ClientProcessingError if the response cannot be processed.

func (*Client) GetJobStatus

func (c *Client) GetJobStatus(id string) (*JobStatus, error)

GetJobStatus gets the status of a job from the data hub id is the id of the job to get the status for returns an AuthenticationError if the client is unable to authenticate. returns a ParameterError if the job id is empty. returns a RequestError if the request fails. returns a ClientProcessingError if the response cannot be processed.

func (*Client) GetJobStatuses

func (c *Client) GetJobStatuses() ([]*JobStatus, error)

GetJobStatuses gets the status of all running jobs from the data hub returns an AuthenticationError if the client is unable to authenticate. returns a RequestError if the request fails. returns a ClientProcessingError if the response cannot be processed.

func (*Client) GetJobs

func (c *Client) GetJobs() ([]*Job, error)

GetJobs gets a list of jobs from the data hub returns an AuthenticationError if the client is unable to authenticate. returns a RequestError if the request fails. returns a ClientProcessingError if the response cannot be processed.

func (*Client) GetJobsHistory

func (c *Client) GetJobsHistory() ([]*JobResult, error)

GetJobsHistory gets the history of all jobs from the data hub returns an AuthenticationError if the client is unable to authenticate. returns a RequestError if the request fails. returns a ClientProcessingError if the response cannot be processed.

func (*Client) GetJobsSchedule

func (c *Client) GetJobsSchedule() (*ScheduleEntries, error)

GetJobsSchedule gets the schedule for all scheduled jobs from the data hub returns an AuthenticationError if the client is unable to authenticate. returns a RequestError if the request fails. returns a ClientProcessingError if the response cannot be processed.

func (*Client) GetTokenProvider

func (c *Client) GetTokenProvider(name string) (*ProviderConfig, error)

GetTokenProvider returns the specified token provider. name is the name of the token provider to be returned. returns an AuthenticationError if the client is unable to authenticate. returns a ParameterError if the name is empty returns a RequestError if the request fails. returns a ClientProcessingError if the response cannot be processed.

func (*Client) GetTokenProviders

func (c *Client) GetTokenProviders() ([]*ProviderConfig, error)

GetTokenProviders returns a slice of ProviderConfig structs. returns an AuthenticationError if the client is unable to authenticate. returns a ParameterError if the tokenProviderConfig is nil returns a RequestError if the request fails. returns a ClientProcessingError if the response cannot be processed.

func (*Client) KillJob

func (c *Client) KillJob(id string) error

KillJob kills a job in the data hub id is the id of the job to kill returns an AuthenticationError if the client is unable to authenticate. returns a ParameterError if the job id is empty. returns a RequestError if the request fails.

func (*Client) LoadKeypair

func (c *Client) LoadKeypair(location string) (*rsa.PrivateKey, *rsa.PublicKey, error)

LoadKeypair loads an RSA keypair from the specified location. Names of the key files are node_key and node_key.pub

func (*Client) PauseJob

func (c *Client) PauseJob(id string) error

PauseJob pauses a job in the data hub id is the id of the job to pause returns an AuthenticationError if the client is unable to authenticate. returns a ParameterError if the job id is empty. returns a RequestError if the request fails.

func (*Client) ProcessTransaction

func (c *Client) ProcessTransaction(transaction *Transaction) error

ProcessTransaction sends a transaction to the datahub returns a ParameterError if the transaction is nil or cannot be serialiased returns an AuthenticationError if the client is not authenticated returns a RequestError if the transaction could not be processed Example usage: (error handling omitted for brevity)

	txn := NewTransaction()
	entityId, err := txn.NamespaceManager.AssertPrefixFromURI("http://data.example.io/entity1")
	entity := egdm.NewEntity().SetID(entityId)
	txn.DatasetEntities[datasetId1] = append(txn.DatasetEntities[datasetId1], entity)
	err = client.ProcessTransaction(txn)
 	create another entity
 	entityId2, err := txn.NamespaceManager.AssertPrefixFromURI("http://data.example.io/entity2")
 	entity2 := egdm.NewEntity().SetID(entityId2)
 	txn.DatasetEntities[datasetId2] = append(txn.DatasetEntities[datasetId2], entity2)
 	err = client.ProcessTransaction(txn)

func (*Client) ResetJobSinceToken

func (c *Client) ResetJobSinceToken(id string, token string) error

ResetJobSinceToken resets the job since token id is the id of the job to reset token is the since token to reset to returns an AuthenticationError if the client is unable to authenticate. returns a ParameterError if the job id is empty. returns a RequestError if the request fails.

func (*Client) ResumeJob

func (c *Client) ResumeJob(id string) error

ResumeJob resumes a job in the data hub id is the id of the job to resume returns an AuthenticationError if the client is unable to authenticate. returns a ParameterError if the job id is empty. returns a RequestError if the request fails.

func (*Client) RunHopQuery added in v0.1.2

func (c *Client) RunHopQuery(entityId string, predicate string, datasets []string, inverse bool, limit int) (EntityIterator, error)

func (*Client) RunJavascriptQuery

func (c *Client) RunJavascriptQuery(query string) (*QueryResultIterator, error)

RunJavascriptQuery executes a javascript query on the server. The query is a base64 encoded string of the javascript code to execute. returns a QueryResultIterator that can be used to iterate over the results. returns an AuthenticationError if the client is not authenticated. returns a ParameterError if the query is empty. returns a RequestError if there is an issue executing the query.

func (*Client) RunJobAsFullSync

func (c *Client) RunJobAsFullSync(id string) error

RunJobAsFullSync runs a job as a full sync job id is the id of the job to run returns an AuthenticationError if the client is unable to authenticate. returns a ParameterError if the job id is empty. returns a RequestError if the request fails.

func (*Client) RunJobAsIncremental

func (c *Client) RunJobAsIncremental(id string) error

RunJobAsIncremental runs a job as an incremental job id is the id of the job to run returns an AuthenticationError if the client is unable to authenticate. returns a ParameterError if the job id is empty. returns a RequestError if the request fails.

func (*Client) RunQuery

func (c *Client) RunQuery(query *Query) ([]any, error)

func (*Client) RunStreamingQuery added in v0.1.2

func (c *Client) RunStreamingQuery(query *Query) (EntityIterator, error)

func (*Client) SaveKeypair

func (c *Client) SaveKeypair(location string, privateKey *rsa.PrivateKey, publicKey *rsa.PublicKey) error

SaveKeypair saves the specified RSA keypair to the specified location. Names of the key files are node_key and node_key.pub

func (*Client) SetClientAcl

func (c *Client) SetClientAcl(clientID string, acls []AccessControl) error

SetClientAcl sets the access control rules for the specified client. clientID is the unique id of the client to be added. acls is a slice of AccessControl structs that represent the access control rules to be set. returns an AuthenticationError if the client is unable to authenticate. returns a ParameterError if the clientID is empty returns a RequestError if the request fails.

func (*Client) SetTokenProvider

func (c *Client) SetTokenProvider(name string, tokenProviderConfig *ProviderConfig) error

SetTokenProvider sets the specified token provider. name is the name of the token provider to be set. tokenProviderConfig is the token provider configuration to be set. returns an AuthenticationError if the client is unable to authenticate. returns a ParameterError if the name is empty or the tokenProviderConfig is nil returns a RequestError if the request fails.

func (*Client) StoreEntities

func (c *Client) StoreEntities(dataset string, entityCollection *egdm.EntityCollection) error

StoreEntities stores the entities in a named dataset. dataset is the name of the dataset to be updated. entityCollection is the set of entities to store. returns an AuthenticationError if the client is unable to authenticate. returns a ParameterError if the dataset name is empty or entityCollection is nil. returns a RequestError if the request fails. returns a ClientProcessingError if the response cannot be processed.

func (*Client) StoreEntityStream

func (c *Client) StoreEntityStream(dataset string, data io.Reader) error

StoreEntityStream stores the entities in a named dataset. dataset is the name of the dataset to be updated. data is the stream of entities to store. returns an AuthenticationError if the client is unable to authenticate. returns a ParameterError if the dataset name is empty or entityCollection is nil. returns a RequestError if the request fails. returns a ClientProcessingError if the response cannot be processed.

func (*Client) UpdateDatasetEntity

func (c *Client) UpdateDatasetEntity(dataset string, datasetEntity *egdm.Entity) error

UpdateDatasetEntity updates the dataset entity for a named dataset. returns an AuthenticationError if the client is unable to authenticate. returns a ParameterError if the dataset name is empty or the dataset entity is nil. returns a RequestError if the request fails. returns a ClientProcessingError if the response cannot be processed.

func (*Client) UpdateJob

func (c *Client) UpdateJob(job *Job) error

UpdateJob updates a job in the data hub Use the JobBuilder to create valid jobs returns an AuthenticationError if the client is unable to authenticate. returns a ParameterError if the job is nil, the job id is empty or the job title is empty. returns a RequestError if the request fails.

func (*Client) WithAdminAuth

func (c *Client) WithAdminAuth(username string, password string) *Client

WithAdminAuth sets the authentication type to basic authentication. username and password are the credentials of the admin user

func (*Client) WithClientKeyAndSecretAuth

func (c *Client) WithClientKeyAndSecretAuth(authorizer string, audience string, clientKey string, clientSecret string) *Client

WithClientKeyAndSecretAuth sets the authentication type to client key and secret OAuth2 authentication flow authorizer is the url of the authorizer service audience is the audience identifier clientKey is the client key clientSecret is the client secret

func (*Client) WithExistingToken

func (c *Client) WithExistingToken(token *oauth2.Token) *Client

WithExistingToken sets the authentication token to use. This is useful if you have a reconstituted a stored token from a previous session

func (*Client) WithPublicKeyAuth

func (c *Client) WithPublicKeyAuth(clientID string, privateKey *rsa.PrivateKey) *Client

WithPublicKeyAuth sets the authentication type to public key authentication. Sets the client id and private key

func (*Client) WithUserAuth

func (c *Client) WithUserAuth(authorizer string, audience string) *Client

WithUserAuth sets the authentication type to user authentication and sets the authorizer url and audience NOT SUPPORTED YET

type ClientInfo

type ClientInfo struct {
	// ClientId is the unique ID of the client on the server
	ClientId string
	// PublicKey is the public key of the client
	PublicKey []byte
	// Deleted is a boolean value that indicates whether the client is deleted
	Deleted bool
}

ClientInfo is a struct that represents a single client, including the client ID and public key

type ClientProcessingError

type ClientProcessingError struct {
	Err error
	Msg string
}

ClientProcessingError is an error that occurs when there is an issue processing the response from the server. Check the inner error for more details.

func (*ClientProcessingError) Error

func (e *ClientProcessingError) Error() string

func (*ClientProcessingError) Unwrap

func (e *ClientProcessingError) Unwrap() error

type Dataset

type Dataset struct {
	Name     string
	Metadata map[string]any
}

Dataset represents a dataset in the data hub. Name is a unique identifier for the dataset for a given data hub instance. Metadata is a map of metadata properties for the dataset.

type EntitiesStream added in v0.1.1

type EntitiesStream struct {
	// contains filtered or unexported fields
}

func (*EntitiesStream) Context added in v0.1.1

func (e *EntitiesStream) Context() *egdm.Context

func (*EntitiesStream) Next added in v0.1.1

func (e *EntitiesStream) Next() (*egdm.Entity, error)

func (*EntitiesStream) Token added in v0.1.1

func (e *EntitiesStream) Token() *egdm.Continuation

type EntityIterator

type EntityIterator interface {
	Context() *egdm.Context
	Next() (*egdm.Entity, error)
	Token() *egdm.Continuation
}

type Job

type Job struct {
	Title       string                 `json:"title"`
	Id          string                 `json:"id"`
	Description string                 `json:"description"`
	Tags        []string               `json:"tags,omitempty"`
	Source      map[string]interface{} `json:"source,omitempty"`
	Sink        map[string]interface{} `json:"sink,omitempty"`
	Transform   *Transform             `json:"transform,omitempty"`
	Triggers    []*JobTrigger          `json:"triggers,omitempty"`
	Paused      bool                   `json:"paused"`
	BatchSize   int                    `json:"batchSize"`
}

Job is a datahub job

type JobBuilder

type JobBuilder struct {
	// contains filtered or unexported fields
}

JobBuilder is a builder for Job

func NewJobBuilder

func NewJobBuilder(title string, id string) *JobBuilder

NewJobBuilder creates a new JobBuilder. Use the build functions to build the Job after title and id must be provided, by non-empty and be unique

func (*JobBuilder) AddTrigger

func (jb *JobBuilder) AddTrigger(trigger *JobTrigger) *JobBuilder

AddTrigger adds a trigger to the job. Use the JobTriggerBuilder to construct valid triggers

func (*JobBuilder) Build

func (jb *JobBuilder) Build() *Job

Build builds the Job

func (*JobBuilder) WithBatchSize

func (jb *JobBuilder) WithBatchSize(batchSize int) *JobBuilder

WithBatchSize adds a batch size to the job

func (*JobBuilder) WithDatasetSink

func (jb *JobBuilder) WithDatasetSink(name string) *JobBuilder

WithDatasetSink adds a dataset sink to the job name is the name of the dataset

func (*JobBuilder) WithDatasetSource

func (jb *JobBuilder) WithDatasetSource(name string, latestOnly bool) *JobBuilder

WithDatasetSource adds a dataset source to the job name is the name of the dataset latestOnly is a flag to indicate whether only the latest version of the entities should be used

func (*JobBuilder) WithDescription

func (jb *JobBuilder) WithDescription(description string) *JobBuilder

WithDescription adds a description to the job

func (*JobBuilder) WithHttpSink

func (jb *JobBuilder) WithHttpSink(url string) *JobBuilder

WithHttpSink adds an http sink to the job url is the url to the sink

func (*JobBuilder) WithHttpSource

func (jb *JobBuilder) WithHttpSource(url string, latestOnly bool) *JobBuilder

WithHttpSource adds an http source to the job url is the url to the source latestOnly is a flag to indicate whether only the latest version of the entities should be used

func (*JobBuilder) WithJavascriptTransform

func (jb *JobBuilder) WithJavascriptTransform(code string, parallelism int) *JobBuilder

WithJavascriptTransform adds a JavascriptTransform to the job. Code is the javascript to be executed encoded as a base64 string. Parallelism is the number of parallel workers to use

func (*JobBuilder) WithPaused

func (jb *JobBuilder) WithPaused(paused bool) *JobBuilder

WithPaused adds a paused flag to the job

func (*JobBuilder) WithSecureHttpSink

func (jb *JobBuilder) WithSecureHttpSink(url string, tokenProvider string) *JobBuilder

WithSecureHttpSink adds a secure http sink to the job url is the url to the sink tokenProvider is the name of the token provider to use

func (*JobBuilder) WithSecureHttpSource

func (jb *JobBuilder) WithSecureHttpSource(url string, latestOnly bool, tokenProvider string) *JobBuilder

WithSecureHttpSource adds a secure http source to the job url is the url to the source latestOnly is a flag to indicate whether only the latest version of the entities should be used tokenProvider is the name of the token provider to use

func (*JobBuilder) WithSink

func (jb *JobBuilder) WithSink(sink map[string]interface{}) *JobBuilder

WithSink adds a sink to the job. See data hub documentation on valid sinks Use of the WithXXXSink simplifies most use cases

func (*JobBuilder) WithSource

func (jb *JobBuilder) WithSource(source map[string]interface{}) *JobBuilder

WithSource adds a source to the job. See data hub documentation on valid sources Use of the WithXXXSource simplifies most use cases

func (*JobBuilder) WithTags

func (jb *JobBuilder) WithTags(tags []string) *JobBuilder

WithTags adds tags to the job

func (*JobBuilder) WithTransform

func (jb *JobBuilder) WithTransform(transform *Transform) *JobBuilder

WithTransform adds a transform to the job. See data hub documentation on valid transforms Use of the WithXXXTransform simplifies most use cases

func (*JobBuilder) WithTriggers

func (jb *JobBuilder) WithTriggers(triggers []*JobTrigger) *JobBuilder

WithTriggers adds triggers to the job. See data hub documentation on valid triggers

func (*JobBuilder) WithUnionDatasetSource

func (jb *JobBuilder) WithUnionDatasetSource(contributingDatasets []string, latestOnly bool) *JobBuilder

WithUnionDatasetSource adds a UnionDatasetSource to the job. name is the name of the union dataset. contributingDatasets is a list of dataset names that contribute to the union. latestOnly indicates whether the union should only contain the latest version of an entity from each source.

type JobResult

type JobResult struct {
	ID        string    `json:"id"`
	Title     string    `json:"title"`
	Start     time.Time `json:"start"`
	End       time.Time `json:"end"`
	LastError string    `json:"lastError"`
	Processed int       `json:"processed"`
}

JobResult represents the history of job runs

type JobStatus

type JobStatus struct {
	JobId    string    `json:"jobId"`
	JobTitle string    `json:"jobTitle"`
	Started  time.Time `json:"started"`
}

JobStatus represents the status of a running job

type JobTrigger

type JobTrigger struct {
	TriggerType      string                   `json:"triggerType"`
	JobType          string                   `json:"jobType"`
	Schedule         string                   `json:"schedule"`
	MonitoredDataset string                   `json:"monitoredDataset,omitempty"`
	OnError          []map[string]interface{} `json:"onError,omitempty"`
}

JobTrigger represents a trigger for a job TriggerType can be cron or onchange JobType can be incremental or fullsync Schedule is the cron schedule MonitoredDataset is the dataset to monitor for changes OnError is a list of error handlers

type JobTriggerBuilder

type JobTriggerBuilder struct {
	// contains filtered or unexported fields
}

JobTriggerBuilder is a builder for JobTrigger

func NewJobTriggerBuilder

func NewJobTriggerBuilder() *JobTriggerBuilder

NewJobTriggerBuilder creates a new JobTriggerBuilder. Use the build functions to build the JobTrigger after calling the configuration functions.

func (*JobTriggerBuilder) AddLogErrorHandler

func (jtb *JobTriggerBuilder) AddLogErrorHandler(maxItems int) *JobTrigger

AddLogErrorHandler adds a log error handler to the JobTrigger maxItems is the maximum number of items to log

func (*JobTriggerBuilder) AddRerunErrorHandler

func (jtb *JobTriggerBuilder) AddRerunErrorHandler(retryDelay int, maxRetries int) *JobTrigger

AddRerunErrorHandler adds a kill error handler to the JobTrigger retryDelay is the delay in seconds before retrying maxRetries is the maximum number of retries that should be attempted

func (*JobTriggerBuilder) Build

func (jtb *JobTriggerBuilder) Build() *JobTrigger

Build builds the JobTrigger

func (*JobTriggerBuilder) WithCron

func (jtb *JobTriggerBuilder) WithCron(schedule string) *JobTriggerBuilder

WithCron configures the JobTrigger as a cron trigger schedule is the cron schedule

func (*JobTriggerBuilder) WithFullSync

func (jtb *JobTriggerBuilder) WithFullSync() *JobTriggerBuilder

WithFullSync configures the JobTrigger as a full sync job

func (*JobTriggerBuilder) WithIncremental

func (jtb *JobTriggerBuilder) WithIncremental() *JobTriggerBuilder

WithIncremental configures the JobTrigger as an incremental job

func (*JobTriggerBuilder) WithOnChange

func (jtb *JobTriggerBuilder) WithOnChange(dataset string) *JobTriggerBuilder

WithOnChange configures the JobTrigger as an onchange trigger dataset is the dataset to monitor for changes

type ParameterError

type ParameterError struct {
	Err error
	Msg string
}

ParameterError is an error that occurs when there is an issue with the parameters passed to the client function. Check the inner error for more details.

func (*ParameterError) Error

func (e *ParameterError) Error() string

func (*ParameterError) Unwrap

func (e *ParameterError) Unwrap() error

type ProviderConfig

type ProviderConfig struct {
	Name         string       `json:"name"`
	Type         string       `json:"type"`
	User         *ValueReader `json:"user,omitempty"`
	Password     *ValueReader `json:"password,omitempty"`
	ClientId     *ValueReader `json:"key,omitempty"`
	ClientSecret *ValueReader `json:"secret,omitempty"`
	Audience     *ValueReader `json:"audience,omitempty"`
	Endpoint     *ValueReader `json:"endpoint,omitempty"`
}

type Query

type Query struct {
	EntityID         string   `json:"entityId"`
	StartingEntities []string `json:"startingEntities"`
	Predicate        string   `json:"predicate"`
	Inverse          bool     `json:"inverse"`
	Datasets         []string `json:"datasets"`
	Details          bool     `json:"details"`
	Limit            int      `json:"limit"`
	Continuations    []string `json:"continuations"`
	NoPartialMerging bool     `json:"noPartialMerging"`
}

type QueryBuilder

type QueryBuilder struct {
	// contains filtered or unexported fields
}

func NewQueryBuilder

func NewQueryBuilder() *QueryBuilder

func (*QueryBuilder) Build

func (qb *QueryBuilder) Build() *Query

func (*QueryBuilder) WithContinuations

func (qb *QueryBuilder) WithContinuations(continuations []string) *QueryBuilder

func (*QueryBuilder) WithDatasets

func (qb *QueryBuilder) WithDatasets(datasets []string) *QueryBuilder

func (*QueryBuilder) WithDetails

func (qb *QueryBuilder) WithDetails(details bool) *QueryBuilder

func (*QueryBuilder) WithEntityId

func (qb *QueryBuilder) WithEntityId(entityId string) *QueryBuilder

func (*QueryBuilder) WithInverse

func (qb *QueryBuilder) WithInverse(inverse bool) *QueryBuilder

func (*QueryBuilder) WithLimit

func (qb *QueryBuilder) WithLimit(limit int) *QueryBuilder

func (*QueryBuilder) WithNoPartialMerging

func (qb *QueryBuilder) WithNoPartialMerging(noPartialMerging bool) *QueryBuilder

func (*QueryBuilder) WithPredicate

func (qb *QueryBuilder) WithPredicate(predicate string) *QueryBuilder

func (*QueryBuilder) WithStartingEntities

func (qb *QueryBuilder) WithStartingEntities(startingEntities []string) *QueryBuilder

type QueryResultEntitiesStream added in v0.1.2

type QueryResultEntitiesStream struct {
	// contains filtered or unexported fields
}

func (*QueryResultEntitiesStream) Context added in v0.1.2

func (e *QueryResultEntitiesStream) Context() *egdm.Context

func (*QueryResultEntitiesStream) Next added in v0.1.2

func (*QueryResultEntitiesStream) Token added in v0.1.2

type QueryResultIterator

type QueryResultIterator struct {
	// contains filtered or unexported fields
}

QueryResultIterator is used to iterate over the results of a javascript query.

func (*QueryResultIterator) Close

func (qri *QueryResultIterator) Close() error

Close closes the query result iterator. This must be called when the iterator is no longer needed. returns a ClientProcessingError if there is an issue closing the data stream.

func (*QueryResultIterator) Next

func (qri *QueryResultIterator) Next() (map[string]interface{}, error)

Next returns the next object in the query result iterator. returns a ClientProcessingError if there is an issue decoding the data stream. returns nil if there are no more objects. returns the object if there are no errors.

type RequestError

type RequestError struct {
	Err error
	Msg string
}

RequestError is an error that occurs when there is an issue making the request or with the request data. Check the inner error for more details.

func (*RequestError) Error

func (e *RequestError) Error() string

func (*RequestError) Unwrap

func (e *RequestError) Unwrap() error

type ScheduleEntries

type ScheduleEntries struct {
	Entries []ScheduleEntry `json:"entries"`
}

ScheduleEntries is a container for all scheduled jobs

type ScheduleEntry

type ScheduleEntry struct {
	ID       int       `json:"id"`
	JobID    string    `json:"jobId"`
	JobTitle string    `json:"jobTitle"`
	Next     time.Time `json:"next"`
	Prev     time.Time `json:"prev"`
}

ScheduleEntry is information about a scheduled job

type Transaction

type Transaction struct {
	NamespaceManager *egdm.NamespaceContext
	DatasetEntities  map[string][]*egdm.Entity
}

func NewTransaction

func NewTransaction() *Transaction

NewTransaction creates a new transaction initialize the transaction with a namespace manage that will be used to generate prefixed URIs

type Transform

type Transform struct {
	Type        string `json:"Type"`
	Code        string `json:"Code"`
	Parallelism int    `json:"Parallelism"`
}

func NewJavascriptTransform

func NewJavascriptTransform(code string, parallelism int) *Transform

NewJavascriptTransform creates a new JavascriptTransform code is the javascript to be executed encoded as a base64 string

type ValueReader

type ValueReader struct {
	Type  string `json:"type"`
	Value string `json:"value"`
}

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL