repository

package
v0.5.0 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Mar 7, 2024 License: MIT Imports: 63 Imported by: 0

Documentation

Index

Constants

View Source
const (
	PermissionCheckDenied = iota
	PermissionCheckAllowed
	PermissionCheckNoSuchDocument
)
View Source
const (
	ScopeDocumentAdmin        = "doc_admin"
	ScopeDocumentReadAll      = "doc_read_all"
	ScopeDocumentRead         = "doc_read"
	ScopeDocumentDelete       = "doc_delete"
	ScopeDocumentWrite        = "doc_write"
	ScopeMetaDocumentWriteAll = "meta_doc_write_all"
	ScopeDocumentImport       = "doc_import"
	ScopeEventlogRead         = "eventlog_read"
	ScopeMetricsAdmin         = "metrics_admin"
	ScopeMetricsWrite         = "metrics_write"
	ScopeReportAdmin          = "report_admin"
	ScopeReportRun            = "report_run"
	ScopeSchemaAdmin          = "schema_admin"
	ScopeSchemaRead           = "schema_read"
	ScopeWorkflowAdmin        = "workflow_admin"
)
View Source
const (
	LockSigningKeys        = elephantCRC + 1
	LockLogicalReplication = elephantCRC + 2
)
View Source
const (
	SQLCodeObjectInUse = "55006"
)

PostgreSQL error codes, see https://www.postgresql.org/docs/current/errcodes-appendix.html#ERRCODES-TABLE

Variables

This section is empty.

Functions

func DocStoreErrorf

func DocStoreErrorf(code DocStoreErrorCode, format string, a ...any) error

func EnsureCoreSchema

func EnsureCoreSchema(ctx context.Context, store SchemaStore) error

func EnsureSchema

func EnsureSchema(
	ctx context.Context, store SchemaStore,
	name string, version string, schema revisor.ConstraintSet,
) error

func EntityRefToRPC

func EntityRefToRPC(ref []revisor.EntityRef) []*repository.EntityRef

func EventToRPC

func EventToRPC(evt Event) *repository.EventlogItem

func IsDocStoreErrorCode

func IsDocStoreErrorCode(err error, code DocStoreErrorCode) bool

func IsValidPermission added in v0.4.6

func IsValidPermission(p Permission) bool

func ListenAndServe

func ListenAndServe(ctx context.Context, addr string, h http.Handler) error

func ReportToRPC

func ReportToRPC(r Report) *repository.Report

func RequireAnyScope

func RequireAnyScope(ctx context.Context, scopes ...string) (*elephantine.AuthInfo, error)

func S3Client

func S3Client(
	ctx context.Context, opts S3Options,
) (*s3.Client, error)

func SetUpRouter

func SetUpRouter(
	router *httprouter.Router,
	opts ...RouterOption,
) error

func Subscope

func Subscope(scope string, resource ...string) string

func ToMetricAggregation

func ToMetricAggregation(a Aggregation) (repository.MetricAggregation, error)

func ValidateLabel

func ValidateLabel(label string) error

func ValueProcessingFromRPC

func ValueProcessingFromRPC(
	r []*repository.ReportValue,
) (map[string][]ReportValueProcess, error)

func ValueProcessingToRPC

func ValueProcessingToRPC(
	v map[string][]ReportValueProcess,
) []*repository.ReportValue

Types

type ACLEntry

type ACLEntry struct {
	URI         string   `json:"uri"`
	Permissions []string `json:"permissions"`
}

type Aggregation

type Aggregation int16
const (
	AggregationNone      Aggregation = 0
	AggregationReplace   Aggregation = 1
	AggregationIncrement Aggregation = 2
)

type ArchiveEventType

type ArchiveEventType int
const (
	ArchiveEventTypeStatus ArchiveEventType = iota
	ArchiveEventTypeVersion
)

type ArchiveSignature

type ArchiveSignature struct {
	KeyID     string
	Hash      [sha256.Size]byte
	Signature []byte
}

func NewArchiveSignature

func NewArchiveSignature(
	key *SigningKey, hash [sha256.Size]byte,
) (*ArchiveSignature, error)

func ParseArchiveSignature

func ParseArchiveSignature(sg string) (*ArchiveSignature, error)

func (*ArchiveSignature) String

func (as *ArchiveSignature) String() string

func (*ArchiveSignature) Verify

func (as *ArchiveSignature) Verify(key *SigningKey) error

type ArchivedDocumentStatus

type ArchivedDocumentStatus struct {
	UUID             uuid.UUID       `json:"uuid"`
	Name             string          `json:"name"`
	ID               int64           `json:"id"`
	Version          int64           `json:"version"`
	Created          time.Time       `json:"created"`
	CreatorURI       string          `json:"creator_uri"`
	Meta             json.RawMessage `json:"meta,omitempty"`
	ParentSignature  string          `json:"parent_signature,omitempty"`
	VersionSignature string          `json:"version_signature"`
}

type ArchivedDocumentVersion

type ArchivedDocumentVersion struct {
	UUID            uuid.UUID       `json:"uuid"`
	Version         int64           `json:"version"`
	Created         time.Time       `json:"created"`
	CreatorURI      string          `json:"creator_uri"`
	Meta            json.RawMessage `json:"meta,omitempty"`
	DocumentData    json.RawMessage `json:"document_data"`
	ParentSignature string          `json:"parent_signature,omitempty"`
}

type ArchivedEvent

type ArchivedEvent struct {
	Type ArchiveEventType `json:"type"`
	UUID uuid.UUID        `json:"uuid"`
	// Version is the version of a document or the ID of a status.
	Version int64  `json:"version"`
	Name    string `json:"name,omitempty"`
}

type Archiver

type Archiver struct {
	// contains filtered or unexported fields
}

Archiver reads unarchived document versions and statuses, and writes a copy of each to S3. It does this using SELECT ... FOR UPDATE SKIP LOCKED.

func NewArchiver

func NewArchiver(opts ArchiverOptions) (*Archiver, error)

func (*Archiver) Run

func (a *Archiver) Run(ctx context.Context) error

func (*Archiver) Stop

func (a *Archiver) Stop()

type ArchiverOptions

type ArchiverOptions struct {
	Logger            *slog.Logger
	S3                *s3.Client
	Bucket            string
	DB                *pgxpool.Pool
	MetricsRegisterer prometheus.Registerer
}

type CheckPermissionRequest

type CheckPermissionRequest struct {
	UUID        uuid.UUID
	GranteeURIs []string
	Permissions []Permission
}

type CheckPermissionResult

type CheckPermissionResult int

type DeleteRequest

type DeleteRequest struct {
	UUID      uuid.UUID
	Updated   time.Time
	Updater   string
	Meta      newsdoc.DataMap
	IfMatch   int64
	LockToken string
}

type DocStore

type DocStore interface {
	GetDocumentMeta(
		ctx context.Context, uuid uuid.UUID) (*DocumentMeta, error)
	GetDocument(
		ctx context.Context, uuid uuid.UUID, version int64,
	) (*newsdoc.Document, int64, error)
	GetVersion(
		ctx context.Context, uuid uuid.UUID, version int64,
	) (DocumentUpdate, error)
	GetVersionHistory(
		ctx context.Context, uuid uuid.UUID,
		before int64, count int,
	) ([]DocumentUpdate, error)
	Update(
		ctx context.Context,
		workflows WorkflowProvider,
		update []*UpdateRequest,
	) ([]DocumentUpdate, error)
	Delete(ctx context.Context, req DeleteRequest) error
	CheckPermissions(
		ctx context.Context, req CheckPermissionRequest,
	) (CheckPermissionResult, error)
	GetMetaTypeForDocument(
		ctx context.Context, uuid uuid.UUID,
	) (DocumentMetaType, error)
	RegisterMetaType(
		ctx context.Context, metaType string, exclusive bool,
	) error
	RegisterMetaTypeUse(
		ctx context.Context, mainType string, metaType string,
	) error
	GetEventlog(
		ctx context.Context, after int64, limit int32,
	) ([]Event, error)
	GetLastEvent(
		ctx context.Context,
	) (*Event, error)
	GetLastEventID(
		ctx context.Context,
	) (int64, error)
	GetCompactedEventlog(
		ctx context.Context, req GetCompactedEventlogRequest,
	) ([]Event, error)
	OnEventlog(
		ctx context.Context, ch chan int64,
	)
	GetStatusHistory(
		ctx context.Context, uuid uuid.UUID,
		name string, before int64, count int,
	) ([]Status, error)
	GetDocumentACL(
		ctx context.Context, uuid uuid.UUID,
	) ([]ACLEntry, error)
	Lock(
		ctx context.Context, req LockRequest,
	) (LockResult, error)
	UpdateLock(
		ctx context.Context, req UpdateLockRequest,
	) (LockResult, error)
	Unlock(
		ctx context.Context, uuid uuid.UUID, token string,
	) error
}

type DocStoreError

type DocStoreError struct {
	// contains filtered or unexported fields
}

func (DocStoreError) Error

func (e DocStoreError) Error() string

func (DocStoreError) Unwrap

func (e DocStoreError) Unwrap() error

type DocStoreErrorCode

type DocStoreErrorCode string

DocStoreErrorCode TODO: Rename to StoreErrorCode and consistently rename all dependent types and methods.

const (
	NoErrCode                 DocStoreErrorCode = ""
	ErrCodeNotFound           DocStoreErrorCode = "not-found"
	ErrCodeNoSuchLock         DocStoreErrorCode = "no-such-lock"
	ErrCodeOptimisticLock     DocStoreErrorCode = "optimistic-lock"
	ErrCodeDeleteLock         DocStoreErrorCode = "delete-lock"
	ErrCodeBadRequest         DocStoreErrorCode = "bad-request"
	ErrCodeExists             DocStoreErrorCode = "exists"
	ErrCodePermissionDenied   DocStoreErrorCode = "permission-denied"
	ErrCodeFailedPrecondition DocStoreErrorCode = "failed-precondition"
	ErrCodeDocumentLock       DocStoreErrorCode = "document-lock"
)

func GetDocStoreErrorCode

func GetDocStoreErrorCode(err error) DocStoreErrorCode

type DocumentMeta

type DocumentMeta struct {
	Created        time.Time
	Modified       time.Time
	CurrentVersion int64
	ACL            []ACLEntry
	Statuses       map[string]Status
	Deleting       bool
	Lock           Lock
	MainDocument   string
}

type DocumentMetaType added in v0.4.6

type DocumentMetaType struct {
	MetaType       string
	IsMetaDocument bool
	Exists         bool
}

type DocumentStatus

type DocumentStatus struct {
	Name     string
	Disabled bool
}

type DocumentUpdate

type DocumentUpdate struct {
	UUID    uuid.UUID
	Version int64
	Creator string
	Created time.Time
	Meta    newsdoc.DataMap
}

type DocumentValidator

type DocumentValidator interface {
	ValidateDocument(ctx context.Context, document *newsdoc.Document) []revisor.ValidationResult
}

type DocumentsService

type DocumentsService struct {
	// contains filtered or unexported fields
}

func NewDocumentsService

func NewDocumentsService(
	store DocStore,
	validator DocumentValidator,
	workflows WorkflowProvider,
	defaultLanguage string,
) *DocumentsService

func (*DocumentsService) BulkUpdate added in v0.4.0

BulkUpdate implements repository.Documents.

func (*DocumentsService) CompactedEventlog added in v0.4.0

CompactedEventlog implements repository.Documents.

func (*DocumentsService) Delete

Delete implements repository.Documents.

func (*DocumentsService) Eventlog

Eventlog returns document update events, optionally waiting for new events.

func (*DocumentsService) ExtendLock

ExtendLock extends the expiration of an existing lock.

func (*DocumentsService) Get

Get implements repository.Documents.

func (*DocumentsService) GetHistory

GetHistory implements repository.Documents.

func (*DocumentsService) GetMeta

GetMeta implements repository.Documents.

func (*DocumentsService) GetPermissions

GetPermissions returns the permissions you have for the document.

func (*DocumentsService) GetStatusHistory

GetStatusHistory returns the history of a status for a document.

func (*DocumentsService) Lock

func (*DocumentsService) Unlock

func (*DocumentsService) Update

Update implements repository.Documents.

func (*DocumentsService) Validate

Validate implements repository.Documents.

type Event

type Event struct {
	ID           int64      `json:"id"`
	Event        EventType  `json:"event"`
	UUID         uuid.UUID  `json:"uuid"`
	Timestamp    time.Time  `json:"timestamp"`
	Updater      string     `json:"updater"`
	Type         string     `json:"type"`
	Language     string     `json:"language"`
	OldLanguage  string     `json:"old_language,omitempty"`
	MainDocument *uuid.UUID `json:"main_document,omitempty"`
	Version      int64      `json:"version,omitempty"`
	StatusID     int64      `json:"status_id,omitempty"`
	Status       string     `json:"status,omitempty"`
	ACL          []ACLEntry `json:"acl,omitempty"`
}

func RPCToEvent

func RPCToEvent(evt *repository.EventlogItem) (Event, error)

type EventType

type EventType string
const (
	TypeEventIgnored    EventType = ""
	TypeDocumentVersion EventType = "document"
	TypeNewStatus       EventType = "status"
	TypeACLUpdate       EventType = "acl"
	TypeDeleteDocument  EventType = "delete_document"
)

type FanOut

type FanOut[T any] struct {
	// contains filtered or unexported fields
}

func NewFanOut

func NewFanOut[T any]() *FanOut[T]

func (*FanOut[T]) Listen

func (f *FanOut[T]) Listen(ctx context.Context, l chan T, test func(v T) bool)

func (*FanOut[T]) Notify

func (f *FanOut[T]) Notify(msg T)

type GetCompactedEventlogRequest added in v0.4.0

type GetCompactedEventlogRequest struct {
	After  int64
	Until  int64
	Type   string
	Limit  *int32
	Offset int32
}

type Lock

type Lock struct {
	Token   string
	URI     string
	Created time.Time
	Expires time.Time
	App     string
	Comment string
}

type LockRequest

type LockRequest struct {
	UUID    uuid.UUID
	URI     string
	TTL     int32
	App     string
	Comment string
}

type LockResult

type LockResult struct {
	Token   string
	Created time.Time
	Expires time.Time
}

type Metric

type Metric struct {
	UUID  uuid.UUID
	Kind  string
	Label string
	Value int64
}

type MetricKind

type MetricKind struct {
	Name        string
	Aggregation Aggregation
}

type MetricStore

type MetricStore interface {
	RegisterMetricKind(
		ctx context.Context, name string, aggregation Aggregation,
	) error
	DeleteMetricKind(
		ctx context.Context, name string,
	) error
	GetMetricKind(
		ctx context.Context, name string,
	) (*MetricKind, error)
	GetMetricKinds(
		ctx context.Context,
	) ([]*MetricKind, error)
	RegisterOrReplaceMetric(
		ctx context.Context, metric Metric,
	) error
	RegisterOrIncrementMetric(
		ctx context.Context, metric Metric,
	) error
}

type MetricsService

type MetricsService struct {
	// contains filtered or unexported fields
}

func NewMetricsService

func NewMetricsService(store MetricStore) *MetricsService

func (*MetricsService) DeleteKind

DeleteKind implements repository.Metrics.

func (*MetricsService) GetKinds

GetKinds implements repository.Metrics.

func (*MetricsService) RegisterKind

RegisterKind implements repository.Metrics.

func (*MetricsService) RegisterMetric

RegisterMetric implements repository.Metrics.

type NotifyChannel

type NotifyChannel string
const (
	NotifySchemasUpdated   NotifyChannel = "schemas"
	NotifyArchived         NotifyChannel = "archived"
	NotifyWorkflowsUpdated NotifyChannel = "workflows"
	NotifyEventlog         NotifyChannel = "eventlog"
)

type PGDocStore

type PGDocStore struct {
	// contains filtered or unexported fields
}

func NewPGDocStore

func NewPGDocStore(
	logger *slog.Logger, pool *pgxpool.Pool,
	options PGDocStoreOptions,
) (*PGDocStore, error)

func (*PGDocStore) ActivateSchema

func (s *PGDocStore) ActivateSchema(
	ctx context.Context, name, version string,
) error

ActivateSchema implements SchemaStore.

func (*PGDocStore) CheckPermissions added in v0.4.6

func (s *PGDocStore) CheckPermissions(
	ctx context.Context, req CheckPermissionRequest,
) (CheckPermissionResult, error)

CheckPermissions implements DocStore.

func (*PGDocStore) DeactivateSchema

func (s *PGDocStore) DeactivateSchema(ctx context.Context, name string) error

DeactivateSchema implements SchemaStore.

func (*PGDocStore) Delete

func (s *PGDocStore) Delete(ctx context.Context, req DeleteRequest) error

Delete implements DocStore.

func (*PGDocStore) DeleteMetricKind

func (s *PGDocStore) DeleteMetricKind(
	ctx context.Context, name string,
) error

func (*PGDocStore) DeleteReport added in v0.4.5

func (s *PGDocStore) DeleteReport(
	ctx context.Context, name string,
) error

func (*PGDocStore) DeleteStatusRule

func (s *PGDocStore) DeleteStatusRule(
	ctx context.Context, name string,
) error

func (*PGDocStore) GetActiveSchemas

func (s *PGDocStore) GetActiveSchemas(
	ctx context.Context,
) ([]*Schema, error)

GetActiveSchemas implements SchemaStore.

func (*PGDocStore) GetCompactedEventlog added in v0.4.0

func (s *PGDocStore) GetCompactedEventlog(
	ctx context.Context, req GetCompactedEventlogRequest,
) ([]Event, error)

func (*PGDocStore) GetDocument

func (s *PGDocStore) GetDocument(
	ctx context.Context, uuid uuid.UUID, version int64,
) (*newsdoc.Document, int64, error)

GetDocument implements DocStore.

func (*PGDocStore) GetDocumentACL

func (s *PGDocStore) GetDocumentACL(
	ctx context.Context, uuid uuid.UUID,
) ([]ACLEntry, error)

func (*PGDocStore) GetDocumentMeta

func (s *PGDocStore) GetDocumentMeta(
	ctx context.Context, docID uuid.UUID,
) (*DocumentMeta, error)

GetDocumentMeta implements DocStore.

func (*PGDocStore) GetEventlog

func (s *PGDocStore) GetEventlog(
	ctx context.Context, after int64, limit int32,
) ([]Event, error)

func (*PGDocStore) GetLastEvent added in v0.4.0

func (s *PGDocStore) GetLastEvent(
	ctx context.Context,
) (*Event, error)

func (*PGDocStore) GetLastEventID added in v0.4.0

func (s *PGDocStore) GetLastEventID(ctx context.Context) (int64, error)

GetLastEventID implements DocStore.

func (*PGDocStore) GetMetaTypeForDocument added in v0.4.6

func (s *PGDocStore) GetMetaTypeForDocument(
	ctx context.Context, uuid uuid.UUID,
) (DocumentMetaType, error)

GetMetaTypeForDocument implements DocStore.

func (*PGDocStore) GetMetricKind

func (s *PGDocStore) GetMetricKind(
	ctx context.Context, name string,
) (*MetricKind, error)

func (*PGDocStore) GetMetricKinds

func (s *PGDocStore) GetMetricKinds(
	ctx context.Context,
) ([]*MetricKind, error)

func (*PGDocStore) GetReport

func (s *PGDocStore) GetReport(
	ctx context.Context, name string,
) (*StoredReport, error)

func (*PGDocStore) GetSchema

func (s *PGDocStore) GetSchema(
	ctx context.Context, name string, version string,
) (*Schema, error)

GetSchema implements SchemaStore.

func (*PGDocStore) GetSchemaVersions

func (s *PGDocStore) GetSchemaVersions(
	ctx context.Context,
) (map[string]string, error)

func (*PGDocStore) GetSinkPosition

func (s *PGDocStore) GetSinkPosition(ctx context.Context, name string) (int64, error)

func (*PGDocStore) GetStatusHistory

func (s *PGDocStore) GetStatusHistory(
	ctx context.Context, uuid uuid.UUID,
	name string, before int64, count int,
) ([]Status, error)

func (*PGDocStore) GetStatusRules

func (s *PGDocStore) GetStatusRules(ctx context.Context) ([]StatusRule, error)

func (*PGDocStore) GetStatuses

func (s *PGDocStore) GetStatuses(ctx context.Context) ([]DocumentStatus, error)

func (*PGDocStore) GetVersion

func (s *PGDocStore) GetVersion(
	ctx context.Context, uuid uuid.UUID, version int64,
) (DocumentUpdate, error)

func (*PGDocStore) GetVersionHistory

func (s *PGDocStore) GetVersionHistory(
	ctx context.Context, uuid uuid.UUID,
	before int64, count int,
) ([]DocumentUpdate, error)

func (*PGDocStore) ListReports added in v0.4.5

func (s *PGDocStore) ListReports(
	ctx context.Context,
) ([]ReportListItem, error)

func (*PGDocStore) Lock

func (s *PGDocStore) Lock(ctx context.Context, req LockRequest) (LockResult, error)

func (*PGDocStore) OnArchivedUpdate

func (s *PGDocStore) OnArchivedUpdate(
	ctx context.Context, ch chan ArchivedEvent,
)

OnArchivedUpdate notifies the channel ch of all archived status updates. Subscription is automatically cancelled once the context is cancelled.

Note that we don't provide any delivery guarantees for these events. A non-blocking send is used on ch, so if the channel is unbuffered, events will be discarded whenever the receiver is busy.

func (*PGDocStore) OnEventlog

func (s *PGDocStore) OnEventlog(
	ctx context.Context, ch chan int64,
)

OnEventlog notifies the channel ch of all new eventlog IDs. Subscription is automatically cancelled once the context is cancelled.

Note that we don't provide any delivery guarantees for these events. A non-blocking send is used on ch, so if the channel is unbuffered, events will be discarded whenever the receiver is busy.

func (*PGDocStore) OnSchemaUpdate

func (s *PGDocStore) OnSchemaUpdate(
	ctx context.Context, ch chan SchemaEvent,
)

OnSchemaUpdate notifies the channel ch of all schema updates. Subscription is automatically cancelled once the context is cancelled.

Note that we don't provide any delivery guarantees for these events. A non-blocking send is used on ch, so if the channel is unbuffered, events will be discarded whenever the receiver is busy.

func (*PGDocStore) OnWorkflowUpdate

func (s *PGDocStore) OnWorkflowUpdate(
	ctx context.Context, ch chan WorkflowEvent,
)

OnWorkflowUpdate notifies the channel ch of all workflow updates. Subscription is automatically cancelled once the context is cancelled.

Note that we don't provide any delivery guarantees for these events. A non-blocking send is used on ch, so if the channel is unbuffered, events will be discarded whenever the receiver is busy.

func (*PGDocStore) RegisterMetaType added in v0.4.6

func (s *PGDocStore) RegisterMetaType(
	ctx context.Context, metaType string, exclusive bool,
) error

RegisterMetaType implements DocStore.

func (*PGDocStore) RegisterMetaTypeUse added in v0.4.6

func (s *PGDocStore) RegisterMetaTypeUse(ctx context.Context, mainType string, metaType string) error

RegisterMetaTypeUse implements DocStore.

func (*PGDocStore) RegisterMetricKind

func (s *PGDocStore) RegisterMetricKind(
	ctx context.Context, name string, aggregation Aggregation,
) error

func (*PGDocStore) RegisterOrIncrementMetric

func (s *PGDocStore) RegisterOrIncrementMetric(ctx context.Context, metric Metric) error

RegisterOrIncrementMetric implements MetricStore.

func (*PGDocStore) RegisterOrReplaceMetric

func (s *PGDocStore) RegisterOrReplaceMetric(ctx context.Context, metric Metric) error

RegisterOrReplaceMetric implements MetricStore.

func (*PGDocStore) RegisterSchema

func (s *PGDocStore) RegisterSchema(
	ctx context.Context, req RegisterSchemaRequest,
) error

RegisterSchema implements SchemaStore.

func (*PGDocStore) RunCleaner added in v0.3.0

func (s *PGDocStore) RunCleaner(ctx context.Context, period time.Duration)

func (*PGDocStore) RunListener

func (s *PGDocStore) RunListener(ctx context.Context)

RunListener opens a connection to the database and subscribes to all store notifications.

func (*PGDocStore) SetSinkPosition

func (s *PGDocStore) SetSinkPosition(ctx context.Context, name string, pos int64) error

func (*PGDocStore) Unlock

func (s *PGDocStore) Unlock(ctx context.Context, uuid uuid.UUID, token string) error

func (*PGDocStore) Update

func (s *PGDocStore) Update(
	ctx context.Context, workflows WorkflowProvider,
	requests []*UpdateRequest,
) ([]DocumentUpdate, error)

Update implements DocStore.

func (*PGDocStore) UpdateLock

func (s *PGDocStore) UpdateLock(ctx context.Context, req UpdateLockRequest) (LockResult, error)

func (*PGDocStore) UpdateReport

func (s *PGDocStore) UpdateReport(
	ctx context.Context, report Report, enabled bool,
) (time.Time, error)

func (*PGDocStore) UpdateStatus

func (s *PGDocStore) UpdateStatus(
	ctx context.Context, req UpdateStatusRequest,
) error

func (*PGDocStore) UpdateStatusRule

func (s *PGDocStore) UpdateStatusRule(
	ctx context.Context, rule StatusRule,
) error

type PGDocStoreOptions

type PGDocStoreOptions struct {
	DeleteTimeout time.Duration
}

type PGReplication

type PGReplication struct {
	// contains filtered or unexported fields
}

func NewPGReplication

func NewPGReplication(
	logger *slog.Logger,
	pool *pgxpool.Pool,
	dbURI string,
	slotName string,
	metricsRegisterer prometheus.Registerer,
) (*PGReplication, error)

func (*PGReplication) Run

func (pr *PGReplication) Run(ctx context.Context)

func (*PGReplication) Started

func (pr *PGReplication) Started() <-chan bool

Started emits true as a signal every time replication has started.

func (*PGReplication) Stop

func (pr *PGReplication) Stop()

type Permission

type Permission string
const (
	ReadPermission      Permission = "r"
	WritePermission     Permission = "w"
	MetaWritePermission Permission = "m"
)

func (Permission) Name

func (p Permission) Name() string

type Queryer

type Queryer interface {
	Query(ctx context.Context, sql string, args ...any) (pgx.Rows, error)
}

type RegisterSchemaRequest

type RegisterSchemaRequest struct {
	Name          string
	Version       string
	Specification revisor.ConstraintSet
	Activate      bool
}

type Report

type Report struct {
	Name           string        `json:"name"`
	Title          string        `json:"title"`
	CronExpression string        `json:"cron_expression"`
	CronTimezone   string        `json:"cron_timezone"`
	GenerateSheet  bool          `json:"generate_sheet,omitempty"`
	Queries        []ReportQuery `json:"queries"`
	SlackChannels  []string      `json:"slack_channels,omitempty"`
}

func ReportFromRPC

func ReportFromRPC(r *repository.Report) (Report, error)

func (*Report) NextTick

func (r *Report) NextTick() (time.Time, error)

type ReportListItem added in v0.4.5

type ReportListItem struct {
	Name           string `json:"name"`
	Title          string `json:"title"`
	CronExpression string `json:"cron_expression"`
	CronTimezone   string `json:"cron_timezone"`
}

type ReportObject

type ReportObject struct {
	Specification Report    `json:"specification"`
	Tables        []string  `json:"tables"`
	Created       time.Time `json:"created"`
}

type ReportQuery

type ReportQuery struct {
	Name            string                          `json:"name"`
	SQL             string                          `json:"sql"`
	ValueProcessing map[string][]ReportValueProcess `json:"value_processing,omitempty"`
	Summarise       []int                           `json:"summarize,omitempty"`
}

type ReportResult

type ReportResult struct {
	Tables      []string
	Spreadsheet *bytes.Buffer
}

func GenerateReport

func GenerateReport(
	ctx context.Context, logger *slog.Logger, r Report,
	conn Queryer,
) (*ReportResult, error)

type ReportRunner

type ReportRunner struct {
	// contains filtered or unexported fields
}

func NewReportRunner

func NewReportRunner(opts ReportRunnerOptions) (*ReportRunner, error)

func (*ReportRunner) Run

func (r *ReportRunner) Run(ctx context.Context)

func (*ReportRunner) Stop

func (r *ReportRunner) Stop()

type ReportRunnerOptions

type ReportRunnerOptions struct {
	Logger *slog.Logger
	S3     *s3.Client
	Bucket string
	// ReportQueryer should be a read-only connection to the database with
	// access to the tables `document`, `delete_record`, `document_version`,
	// `document_status`, `status_heads`, `acl`, `acl_audit`.
	ReportQueryer Queryer
	// DB should be a normal database connection with full repository
	// access.
	DB                *pgxpool.Pool
	MetricsRegisterer prometheus.Registerer
}

type ReportStore

type ReportStore interface {
	ListReports(
		ctx context.Context,
	) ([]ReportListItem, error)
	UpdateReport(
		ctx context.Context, report Report, enabled bool,
	) (time.Time, error)
	GetReport(
		ctx context.Context, name string,
	) (*StoredReport, error)
	DeleteReport(
		ctx context.Context, name string,
	) error
}

type ReportValueProcess

type ReportValueProcess string
const (
	ReportHTMLDecode ReportValueProcess = "html_decode"
)

type Reporter

type Reporter interface {
	AddHeader(q ReportQuery, columns []string) error
	AddRow(values []any) error
	QueryDone() error
}

type ReportsService

type ReportsService struct {
	// contains filtered or unexported fields
}

func NewReportsService

func NewReportsService(
	logger *slog.Logger, store ReportStore, reportingDB Queryer,
) *ReportsService

func (*ReportsService) Delete added in v0.4.5

Delete a report.

func (*ReportsService) Get

Get a report.

func (*ReportsService) List added in v0.4.5

List all reports.

func (*ReportsService) Run

Run a report. This will run the report and return the results instead of sending them to any outputs.

func (*ReportsService) Test

func (*ReportsService) Update

Update or create a report.

type RouterOption

type RouterOption func(router *httprouter.Router) error

func WithDocumentsAPI

func WithDocumentsAPI(
	service repository.Documents,
	opts ServerOptions,
) RouterOption

func WithJWKSEndpoint

func WithJWKSEndpoint(jwtKey *ecdsa.PrivateKey) RouterOption

func WithMetricsAPI

func WithMetricsAPI(
	service repository.Metrics,
	opts ServerOptions,
) RouterOption

func WithReportsAPI

func WithReportsAPI(
	service repository.Reports,
	opts ServerOptions,
) RouterOption

func WithSSE added in v0.4.4

func WithSSE(
	handler http.Handler,
	opt ServerOptions,
) RouterOption

func WithSchemasAPI

func WithSchemasAPI(
	service repository.Schemas,
	opts ServerOptions,
) RouterOption

func WithTokenEndpoint

func WithTokenEndpoint(
	jwtKey *ecdsa.PrivateKey, sharedSecret string,
) RouterOption

func WithWorkflowsAPI

func WithWorkflowsAPI(
	service repository.Workflows,
	opts ServerOptions,
) RouterOption

type S3Options

type S3Options struct {
	Endpoint        string
	AccessKeyID     string
	AccessKeySecret string
	DisableHTTPS    bool
	HTTPClient      *http.Client
}

type SSE added in v0.4.4

type SSE struct {
	// contains filtered or unexported fields
}

func NewSSE added in v0.4.4

func NewSSE(ctx context.Context, logger *slog.Logger, store DocStore) (*SSE, error)

func (*SSE) HTTPHandler added in v0.4.4

func (s *SSE) HTTPHandler() http.Handler

func (*SSE) Run added in v0.4.4

func (s *SSE) Run(ctx context.Context)

func (*SSE) Stop added in v0.4.4

func (s *SSE) Stop()

type Schema

type Schema struct {
	Name          string
	Version       string
	Specification revisor.ConstraintSet
}

type SchemaEvent

type SchemaEvent struct {
	Type SchemaEventType `json:"type"`
	Name string          `json:"name"`
}

type SchemaEventType

type SchemaEventType int
const (
	SchemaEventTypeActivation SchemaEventType = iota
	SchemaEventTypeDeactivation
)

type SchemaLoader

type SchemaLoader interface {
	GetActiveSchemas(ctx context.Context) ([]*Schema, error)
	OnSchemaUpdate(ctx context.Context, ch chan SchemaEvent)
}

type SchemaStore

type SchemaStore interface {
	RegisterSchema(
		ctx context.Context, req RegisterSchemaRequest,
	) error
	ActivateSchema(
		ctx context.Context, name, version string,
	) error
	DeactivateSchema(
		ctx context.Context, name string,
	) error
	GetSchema(
		ctx context.Context, name, version string,
	) (*Schema, error)
	GetActiveSchemas(ctx context.Context) ([]*Schema, error)
	GetSchemaVersions(ctx context.Context) (map[string]string, error)
	OnSchemaUpdate(ctx context.Context, ch chan SchemaEvent)
	RegisterMetaType(
		ctx context.Context, metaType string, exclusive bool,
	) error
	RegisterMetaTypeUse(
		ctx context.Context, mainType string, metaType string,
	) error
}

type SchemasService

type SchemasService struct {
	// contains filtered or unexported fields
}

func NewSchemasService

func NewSchemasService(logger *slog.Logger, store SchemaStore) *SchemasService

func (*SchemasService) Get

Get retrieves a schema.

func (*SchemasService) GetAllActive

GetAllActive returns the currently active schemas.

func (*SchemasService) Register

Register registers a new validation schema version.

func (*SchemasService) RegisterMetaType added in v0.4.6

RegisterMetaType implements repository.Schemas.

func (*SchemasService) RegisterMetaTypeUse added in v0.4.6

RegisterMetaTypeUse implements repository.Schemas.

func (*SchemasService) SetActive

SetActive activates schema versions.

type ServerOptions

type ServerOptions struct {
	Hooks          *twirp.ServerHooks
	AuthMiddleware func(
		w http.ResponseWriter, r *http.Request, next http.Handler,
	) error
}

func (*ServerOptions) SetJWTValidation

func (so *ServerOptions) SetJWTValidation(jwtKey *ecdsa.PrivateKey)

type SigningKey

type SigningKey struct {
	Spec *jwk.KeySpec `json:"spec"`

	// Key timestamps
	IssuedAt  time.Time `json:"iat"`
	NotBefore time.Time `json:"nbf"`
}

type SigningKeySet

type SigningKeySet struct {
	Keys []SigningKey `json:"keys"`
	// contains filtered or unexported fields
}

func (*SigningKeySet) CurrentKey

func (s *SigningKeySet) CurrentKey(t time.Time) *SigningKey

func (*SigningKeySet) GetKeyByID

func (s *SigningKeySet) GetKeyByID(kid string) *SigningKey

func (*SigningKeySet) LatestKey

func (s *SigningKeySet) LatestKey() *SigningKey

func (*SigningKeySet) Replace

func (s *SigningKeySet) Replace(keys []SigningKey)

type SpreadsheetReporter

// SpreadsheetReporter exposes an excelize file as File and provides the
// AddHeader, AddRow and QueryDone methods — the same method set that
// TableReporter offers. Instances are created with NewSpreadsheetReporter.
type SpreadsheetReporter struct {
	File *excelize.File
	// contains filtered or unexported fields
}

func NewSpreadsheetReporter

func NewSpreadsheetReporter() *SpreadsheetReporter

func (*SpreadsheetReporter) AddHeader

func (sr *SpreadsheetReporter) AddHeader(q ReportQuery, columns []string) error

func (*SpreadsheetReporter) AddRow

func (sr *SpreadsheetReporter) AddRow(values []any) error

func (*SpreadsheetReporter) QueryDone

func (sr *SpreadsheetReporter) QueryDone() error

type Status

// Status describes a single status record: an int64 ID and Version, the
// Creator and Created time, a Meta data map and an int64 MetaDocVersion.
// NOTE(review): Version presumably is the document version the status
// points at and MetaDocVersion the version of an associated meta document —
// confirm.
type Status struct {
	ID             int64
	Version        int64
	Creator        string
	Created        time.Time
	Meta           newsdoc.DataMap
	MetaDocVersion int64
}

type StatusRule

// StatusRule is a named rule with a Description and an Expression string.
// AppliesTo and ForTypes are string lists and AccessRule is a flag.
// NOTE(review): presumably AppliesTo lists status names, ForTypes lists
// document types, and AccessRule corresponds to
// StatusRuleViolation.AccessViolation — confirm.
type StatusRule struct {
	Name        string
	Description string
	AccessRule  bool
	AppliesTo   []string
	ForTypes    []string
	Expression  string
}

type StatusRuleError

// StatusRuleError carries a list of status rule violations. Its Error method
// is declared on the value receiver, so both StatusRuleError and
// *StatusRuleError satisfy the error interface.
type StatusRuleError struct {
	Violations []StatusRuleViolation
}

func (StatusRuleError) Error

func (err StatusRuleError) Error() string

type StatusRuleInput

// StatusRuleInput is the argument type of EvaluateRules. It groups a Name
// and Status, the DocumentUpdate and Document involved, a VersionMeta data
// map, a string-keyed map of Status values (Heads) and the JWT claims of the
// User.
// NOTE(review): Heads is presumably keyed by status name — confirm.
type StatusRuleInput struct {
	Name        string
	Status      Status
	Update      DocumentUpdate
	Document    newsdoc.Document
	VersionMeta newsdoc.DataMap
	Heads       map[string]Status
	User        elephantine.JWTClaims
}

type StatusRuleViolation

// StatusRuleViolation is the element type returned by EvaluateRules and
// collected in StatusRuleError.Violations. It carries a Name, a Description,
// an Error string and an AccessViolation flag.
// NOTE(review): Name and Description presumably echo the violated
// StatusRule — confirm.
type StatusRuleViolation struct {
	Name            string
	Description     string
	Error           string
	AccessViolation bool
}

type StatusUpdate

// StatusUpdate names a status together with an int64 Version and a Meta data
// map. It is the element type of UpdateRequest.Status and of the slice that
// RPCToStatusUpdate returns for a list of repository.StatusUpdate messages.
type StatusUpdate struct {
	Name    string
	Version int64
	Meta    newsdoc.DataMap
}

func RPCToStatusUpdate

func RPCToStatusUpdate(update []*repository.StatusUpdate) []StatusUpdate

type StoredReport

// StoredReport wraps a Report together with an Enabled flag and a
// NextExecution timestamp.
type StoredReport struct {
	Report        Report
	Enabled       bool
	NextExecution time.Time
}

type TableReporter

// TableReporter exposes a slice of table writers as Tables and provides the
// AddHeader, AddRow and QueryDone methods — the same method set that
// SpreadsheetReporter offers. Instances are created with NewTableReporter.
// NOTE(review): presumably one table writer is appended per query — confirm.
type TableReporter struct {
	Tables []table.Writer
	// contains filtered or unexported fields
}

func NewTableReporter

func NewTableReporter() *TableReporter

func (*TableReporter) AddHeader

func (tr *TableReporter) AddHeader(q ReportQuery, columns []string) error

func (*TableReporter) AddRow

func (tr *TableReporter) AddRow(values []any) error

func (*TableReporter) QueryDone

func (tr *TableReporter) QueryDone() error

type TokenResponse

// TokenResponse serialises to JSON with the keys "access_token",
// "token_type", "expires_in" and "refresh_token".
// NOTE(review): the key names match an OAuth 2.0 token response, where
// expires_in is a lifetime in seconds — confirm the unit used here.
type TokenResponse struct {
	AccessToken  string `json:"access_token"`
	TokenType    string `json:"token_type"`
	ExpiresIn    int    `json:"expires_in"`
	RefreshToken string `json:"refresh_token"`
}

type TupleDecoder

// TupleDecoder exposes no public fields; create one with NewTupleDecoder.
// RegisterRelation accepts a pglogrepl relation message, GetRelation looks
// one up by its uint32 id, and DecodeValues takes a relation id plus tuple
// data and returns the relation message, a string-keyed map of values and
// an error.
// NOTE(review): the map is presumably keyed by column name — confirm.
type TupleDecoder struct {
	// contains filtered or unexported fields
}

func NewTupleDecoder

func NewTupleDecoder() *TupleDecoder

func (*TupleDecoder) DecodeValues

func (td *TupleDecoder) DecodeValues(
	relation uint32, tuple *pglogrepl.TupleData,
) (*pglogrepl.RelationMessage, map[string]interface{}, error)

func (*TupleDecoder) GetRelation

func (td *TupleDecoder) GetRelation(id uint32) (*pglogrepl.RelationMessage, bool)

func (*TupleDecoder) RegisterRelation

func (td *TupleDecoder) RegisterRelation(rel *pglogrepl.RelationMessage)

type UpdateLockRequest

// UpdateLockRequest carries a UUID, an int32 TTL and a Token string.
// NOTE(review): the UUID presumably identifies the locked document and the
// unit of TTL is not visible here (seconds?) — confirm.
type UpdateLockRequest struct {
	UUID  uuid.UUID
	TTL   int32
	Token string
}

type UpdateRequest

// UpdateRequest describes an update keyed by UUID, with the Updated time and
// Updater, a Meta data map, ACL and DefaultACL entry lists and a list of
// status updates. Document and MainDocument are pointers and may therefore
// be nil.
// NOTE(review): IfMatch presumably is an optimistic-concurrency version
// check and LockToken the token of a held lock — confirm.
type UpdateRequest struct {
	UUID         uuid.UUID
	Updated      time.Time
	Updater      string
	Meta         newsdoc.DataMap
	ACL          []ACLEntry
	DefaultACL   []ACLEntry
	Status       []StatusUpdate
	Document     *newsdoc.Document
	MainDocument *uuid.UUID
	IfMatch      int64
	LockToken    string
}

type UpdateStatusRequest

// UpdateStatusRequest is the argument type of WorkflowStore.UpdateStatus:
// the Name of a status and a Disabled flag.
type UpdateStatusRequest struct {
	Name     string
	Disabled bool
}

type Validator

// Validator exposes no public fields; NewValidator builds one from a
// context, a logger and a SchemaLoader. GetValidator returns a
// *revisor.Validator and ValidateDocument returns a slice of revisor
// validation results for a newsdoc document.
type Validator struct {
	// contains filtered or unexported fields
}

func NewValidator

func NewValidator(
	ctx context.Context, logger *slog.Logger, loader SchemaLoader,
) (*Validator, error)

func (*Validator) GetValidator

func (v *Validator) GetValidator() *revisor.Validator

func (*Validator) ValidateDocument

func (v *Validator) ValidateDocument(ctx context.Context, document *newsdoc.Document) []revisor.ValidationResult

type WorkflowEvent

// WorkflowEvent serialises to JSON with the keys "type" and "name". It is
// the element type of the channel passed to WorkflowLoader.OnWorkflowUpdate.
type WorkflowEvent struct {
	Type WorkflowEventType `json:"type"`
	Name string            `json:"name"`
}

type WorkflowEventType

// WorkflowEventType is an int-based enumeration used as the type of
// WorkflowEvent.Type.
type WorkflowEventType int
// Enumeration values assigned by iota: WorkflowEventTypeStatusChange is 0
// and WorkflowEventTypeStatusRuleChange is 1.
const (
	WorkflowEventTypeStatusChange WorkflowEventType = iota
	WorkflowEventTypeStatusRuleChange
)

type WorkflowLoader

// WorkflowLoader is the dependency accepted by NewWorkflows. It returns the
// document statuses and status rules, and OnWorkflowUpdate takes a channel
// of WorkflowEvent values.
// NOTE(review): presumably the implementation sends on ch whenever a status
// or status rule changes, until ctx is cancelled — confirm.
type WorkflowLoader interface {
	GetStatuses(ctx context.Context) ([]DocumentStatus, error)
	GetStatusRules(ctx context.Context) ([]StatusRule, error)
	OnWorkflowUpdate(ctx context.Context, ch chan WorkflowEvent)
}

type WorkflowProvider

// WorkflowProvider answers whether a named status exists and evaluates
// status rules for a given input, returning the resulting violations.
// *Workflows declares both methods with matching signatures and therefore
// satisfies this interface.
type WorkflowProvider interface {
	HasStatus(name string) bool
	EvaluateRules(input StatusRuleInput) []StatusRuleViolation
}

type WorkflowStore

// WorkflowStore is the dependency accepted by NewWorkflowsService. It covers
// updating a status, listing statuses, updating a status rule, deleting a
// status rule by name and listing status rules. Every method takes a
// context and reports failure through an error return.
type WorkflowStore interface {
	UpdateStatus(
		ctx context.Context, req UpdateStatusRequest,
	) error
	GetStatuses(ctx context.Context) ([]DocumentStatus, error)
	UpdateStatusRule(
		ctx context.Context, rule StatusRule,
	) error
	DeleteStatusRule(
		ctx context.Context, name string,
	) error
	GetStatusRules(ctx context.Context) ([]StatusRule, error)
}

type Workflows

// Workflows exposes no public fields; NewWorkflows builds one from a
// context, a logger and a WorkflowLoader. Its HasStatus and EvaluateRules
// methods match the WorkflowProvider interface.
type Workflows struct {
	// contains filtered or unexported fields
}

func NewWorkflows

func NewWorkflows(
	ctx context.Context, logger *slog.Logger, loader WorkflowLoader,
) (*Workflows, error)

func (*Workflows) EvaluateRules

func (w *Workflows) EvaluateRules(
	input StatusRuleInput,
) []StatusRuleViolation

func (*Workflows) HasStatus

func (w *Workflows) HasStatus(name string) bool

type WorkflowsService

// WorkflowsService exposes no public fields; NewWorkflowsService builds one
// from a WorkflowStore. Its methods are CreateStatusRule, DeleteStatusRule,
// GetStatusRules, GetStatuses and UpdateStatus.
type WorkflowsService struct {
	// contains filtered or unexported fields
}

func NewWorkflowsService

func NewWorkflowsService(store WorkflowStore) *WorkflowsService

func (*WorkflowsService) CreateStatusRule

CreateStatusRule creates or updates a status rule that should be applied when setting statuses.

func (*WorkflowsService) DeleteStatusRule

DeleteStatusRule removes a status rule.

func (*WorkflowsService) GetStatusRules

GetStatusRules returns all status rules.

func (*WorkflowsService) GetStatuses

GetStatuses lists all enabled statuses.

func (*WorkflowsService) UpdateStatus

UpdateStatus creates or updates a status that can be used for documents.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL