Documentation ¶
Overview ¶
Package reconciler syncs the state of resource bundles between the cloud and the databroker.
Index ¶
- Constants
- Variables
- func ApplyChanges(ctx context.Context, client databroker.DataBrokerServiceClient, ...) error
- func EqualRecord(a, b *databroker.Record) bool
- func GetDatabrokerRecords(ctx context.Context, client databroker.DataBrokerServiceClient, types []string) (databroker.RecordSetBundle, error)
- func ReadBundleRecords(src io.Reader) (databroker.RecordSetBundle, error)
- func Run(ctx context.Context, opts ...Option) error
- type BundleCacheEntry
- func (r *BundleCacheEntry) Equals(other *BundleCacheEntry) bool
- func (r *BundleCacheEntry) FromAny(any *anypb.Any) error
- func (r *BundleCacheEntry) GetDownloadConditional() *zero_sdk.DownloadConditional
- func (r *BundleCacheEntry) GetRecordTypes() []string
- func (r *BundleCacheEntry) ToAny() (*anypb.Any, error)
- func (r *BundleCacheEntry) Validate() error
- type BundleQueue
- type DatabrokerChangeSet
- type Option
- func WithAPI(client *sdk.API) Option
- func WithCheckForUpdateIntervalWhenConnected(interval time.Duration) Option
- func WithCheckForUpdateIntervalWhenDisconnected(interval time.Duration) Option
- func WithDataBrokerClient(client databroker.DataBrokerServiceClient) Option
- func WithDatabrokerRPSLimit(rps int) Option
- func WithDownloadHTTPClient(client *http.Client) Option
- func WithSyncBackoffMaxInterval(interval time.Duration) Option
- func WithTemporaryDirectory(path string) Option
- type ReadWriteSeekCloser
Constants ¶
const ( // BundleStatusFailureDatabrokerError indicates a failure due to a databroker error BundleStatusFailureDatabrokerError = cluster_api.DatabrokerError // BundleStatusFailureDownloadError indicates a failure due to a download error BundleStatusFailureDownloadError = cluster_api.DownloadError // BundleStatusFailureInvalidBundle indicates a failure due to an invalid bundle BundleStatusFailureInvalidBundle = cluster_api.InvalidBundle // BundleStatusFailureIO indicates a failure due to an IO error BundleStatusFailureIO = cluster_api.IoError // BundleStatusFailureUnknownError indicates a failure due to an unknown error BundleStatusFailureUnknownError = cluster_api.UnknownError )
const (
// BundleCacheEntryRecordType is the databroker record type for BundleCacheEntry
BundleCacheEntryRecordType = "pomerium.io/BundleCacheEntry"
)
Variables ¶
var ErrBundleCacheEntryNotFound = errors.New("bundle cache entry not found")
ErrBundleCacheEntryNotFound is returned when a bundle cache entry is not found
Functions ¶
func ApplyChanges ¶
func ApplyChanges(ctx context.Context, client databroker.DataBrokerServiceClient, changes *DatabrokerChangeSet) error
ApplyChanges applies the changes to the databroker.
func EqualRecord ¶
func EqualRecord(a, b *databroker.Record) bool
EqualRecord returns true if the databroker records are equal.
func GetDatabrokerRecords ¶
func GetDatabrokerRecords( ctx context.Context, client databroker.DataBrokerServiceClient, types []string, ) (databroker.RecordSetBundle, error)
GetDatabrokerRecords gets all databroker records of the given types.
func ReadBundleRecords ¶
func ReadBundleRecords(src io.Reader) (databroker.RecordSetBundle, error)
ReadBundleRecords reads records in a protobuf wire format from src. Each record is expected to be a databroker.Record.
Types ¶
type BundleCacheEntry ¶
type BundleCacheEntry struct { zero_sdk.DownloadConditional RecordTypes []string }
BundleCacheEntry is a cache entry for a bundle that is kept in the databroker to avoid downloading the same bundle multiple times.
by using the ETag and LastModified headers, we do not need to keep caches of the bundles themselves, which can be large.
also it works in case of multiple instances, as it uses the databroker database as a shared cache.
func (*BundleCacheEntry) Equals ¶
func (r *BundleCacheEntry) Equals(other *BundleCacheEntry) bool
Equals returns true if the two cache entries are equal
func (*BundleCacheEntry) FromAny ¶
func (r *BundleCacheEntry) FromAny(any *anypb.Any) error
FromAny unmarshals an anypb.Any into a BundleCacheEntry
func (*BundleCacheEntry) GetDownloadConditional ¶
func (r *BundleCacheEntry) GetDownloadConditional() *zero_sdk.DownloadConditional
GetDownloadConditional returns conditional download information
func (*BundleCacheEntry) GetRecordTypes ¶
func (r *BundleCacheEntry) GetRecordTypes() []string
GetRecordTypes returns the record types
func (*BundleCacheEntry) ToAny ¶
func (r *BundleCacheEntry) ToAny() (*anypb.Any, error)
ToAny marshals a BundleCacheEntry into an anypb.Any
func (*BundleCacheEntry) Validate ¶
func (r *BundleCacheEntry) Validate() error
Validate validates a BundleCacheEntry
type BundleQueue ¶
BundleQueue is a priority queue of bundles to sync.
func (*BundleQueue) GetNextBundleToSync ¶
func (b *BundleQueue) GetNextBundleToSync() (string, bool)
GetNextBundleToSync returns the ID of the next bundle to sync and whether there is one.
func (*BundleQueue) MarkForSync ¶
func (b *BundleQueue) MarkForSync(id string)
MarkForSync marks the bundle with the given ID for syncing.
func (*BundleQueue) MarkForSyncLater ¶
func (b *BundleQueue) MarkForSyncLater(id string)
MarkForSyncLater marks the bundle with the given ID for syncing later (after all other bundles).
func (*BundleQueue) Set ¶
func (b *BundleQueue) Set(bundles []string)
Set sets the bundles to be synced. This will reset the sync status of all bundles.
type DatabrokerChangeSet ¶
type DatabrokerChangeSet struct {
// contains filtered or unexported fields
}
DatabrokerChangeSet is a set of databroker changes.
func NewDatabrokerChangeSet ¶
func NewDatabrokerChangeSet() *DatabrokerChangeSet
NewDatabrokerChangeSet creates a new databroker change set.
func (*DatabrokerChangeSet) Remove ¶
func (cs *DatabrokerChangeSet) Remove(typ string, id string)
Remove adds a record to the change set.
func (*DatabrokerChangeSet) Upsert ¶
func (cs *DatabrokerChangeSet) Upsert(record *databroker.Record)
Upsert adds a record to the change set.
type Option ¶
type Option func(*reconcilerConfig)
Option configures the resource bundles reconciler
func WithCheckForUpdateIntervalWhenConnected ¶
WithCheckForUpdateIntervalWhenConnected configures the interval at which the reconciler will check for updates when connected to the cloud.
func WithCheckForUpdateIntervalWhenDisconnected ¶
WithCheckForUpdateIntervalWhenDisconnected configures the interval at which the reconciler will check for updates when disconnected from the cloud.
func WithDataBrokerClient ¶
func WithDataBrokerClient(client databroker.DataBrokerServiceClient) Option
WithDataBrokerClient configures the databroker client.
func WithDatabrokerRPSLimit ¶
WithDatabrokerRPSLimit configures the maximum number of requests per second to the databroker.
func WithDownloadHTTPClient ¶
WithDownloadHTTPClient configures the http client used for downloading files.
func WithSyncBackoffMaxInterval ¶
WithSyncBackoffMaxInterval configures the maximum interval between sync attempts.
func WithTemporaryDirectory ¶
WithTemporaryDirectory configures the resource bundles client to use a temporary directory for downloading files.
type ReadWriteSeekCloser ¶
type ReadWriteSeekCloser interface { io.ReadWriteSeeker io.Closer }
ReadWriteSeekCloser is a file that can be read, written, seeked, and closed.