core: go.gazette.dev/core/broker/fragment

package fragment

import "go.gazette.dev/core/broker/fragment"

Package fragment is a broker-only package concerned with the mapping of journal offsets to protocol.Fragments, and from there to corresponding local or remote journal content.

It implements file-like operations over the FragmentStore schemes supported by Gazette, such as listing, opening, signing, persisting, and removing fragments. See FileStoreConfig, S3StoreConfig, and GSStoreConfig for further configuration of store operations.

The package implements a Fragment wrapper type which composes a protocol.Fragment with an open file descriptor, and an Index over local or remote Fragments which maps a journal offset to a best-covering Fragment.

Spool is a Fragment which is in the process of being constructed from an ongoing broker Replicate RPC. It is the transactional "memory" of brokers which are participating in the replication of a journal. Once closed, or "rolled", a Spool Fragment is persisted to its configured FragmentStore by a Persister.

Index

Package Files

cover_set.go doc.go fragment.go index.go persister.go spool.go spool_unix.go store_fs.go store_gcs.go store_s3.go stores.go

Variables

var FileSystemStoreRoot = "/dev/null/must/configure/file/store/root"

FileSystemStoreRoot is the filesystem path which roots fragment ContentPaths of a file:// fragment store. It must be set at program startup prior to use.

func List

func List(ctx context.Context, store pb.FragmentStore, name pb.Journal, callback func(pb.Fragment)) error

List Fragments of the FragmentStore for a given journal. |callback| is invoked with each listed Fragment, and any returned error aborts the listing.

func Open

func Open(ctx context.Context, fragment pb.Fragment) (io.ReadCloser, error)

Open a Reader of the Fragment on the store. The returned ReadCloser does not perform any applicable client-side decompression, but does request server decompression in the case of GZIP_OFFLOAD_DECOMPRESSION.

func Persist

func Persist(ctx context.Context, spool Spool, spec *pb.JournalSpec) error

Persist a Spool to the JournalSpec's store. If the Spool Fragment is already present, this is a no-op. If the Spool has not been compressed incrementally, it will be compressed before being persisted.

func Remove

func Remove(ctx context.Context, fragment pb.Fragment) error

Remove |fragment| from its BackingStore.

func SignGetURL

func SignGetURL(fragment pb.Fragment, d time.Duration) (string, error)

SignGetURL returns a URL authenticating the bearer to perform a GET operation of the Fragment for the provided Duration from the current time.

type CoverSet

type CoverSet []Fragment

CoverSet maintains Fragments ordered on |Begin| and |End|, with the invariant that no Fragment is fully overlapped by another single Fragment in the set (though it may be overlapped by a combination of other Fragments). Intuitively, CoverSet represents the set of offsets which are "covered" by a collection of Fragments, and is able to map each byte offset to a "best" covering Fragment. It employs a heuristic of preferring larger Fragments (and will replace spans of overlapped smaller Fragments).

An implication of the invariant is that no two Fragments have the same |Begin| or |End| (as one would then fully overlap the other). Both are strictly increasing in the set: set[0].Begin is the minimum offset, and set[len(set)-1].End is the maximum offset.
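The ordering implied by the invariant can be checked mechanically. The following is a minimal sketch, not the package's implementation: `span` and `isCoverSet` are hypothetical names, and `span` stands in for a Fragment's byte range, assumed here to be half-open `[Begin, End)`.

```go
package main

import "fmt"

// span is a hypothetical stand-in for a Fragment's [Begin, End) byte range.
type span struct{ Begin, End int64 }

// isCoverSet reports whether spans satisfies the ordering implied by the
// CoverSet invariant: Begin and End both strictly increase, which also
// means that no span is fully contained within another.
func isCoverSet(spans []span) bool {
	for i := 1; i < len(spans); i++ {
		if spans[i].Begin <= spans[i-1].Begin || spans[i].End <= spans[i-1].End {
			return false
		}
	}
	return true
}

func main() {
	ok := []span{{0, 100}, {50, 200}, {200, 300}}
	bad := []span{{0, 100}, {10, 90}} // {10, 90} is fully overlapped by {0, 100}.
	fmt.Println(isCoverSet(ok), isCoverSet(bad))
}
```

Note that partially overlapping spans such as `{0, 100}` and `{50, 200}` are allowed; only full containment violates the invariant.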

func CoverSetDifference

func CoverSetDifference(a, b CoverSet) CoverSet

CoverSetDifference returns the subset of Fragments in |a| which cover byte offsets not also covered by Fragments in |b|.
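To illustrate the semantics, here is a simplified sketch of such a difference over plain byte ranges. It is an assumption-laden illustration rather than the package's algorithm: `span` and `difference` are hypothetical names, ranges are taken to be half-open `[Begin, End)`, and |b| is assumed to be sorted on Begin.

```go
package main

import "fmt"

// span is a hypothetical stand-in for a Fragment's [Begin, End) byte range.
type span struct{ Begin, End int64 }

// difference returns the spans of a which cover at least one byte offset
// not covered by any span of b. b must be sorted on Begin.
func difference(a, b []span) []span {
	var out []span
	for _, f := range a {
		// covered is the offset up to which f is known to be covered by b.
		covered := f.Begin
		for _, g := range b {
			if g.Begin > covered {
				break // Gap: no later span of b can cover this offset.
			}
			if g.End > covered {
				covered = g.End
			}
		}
		if covered < f.End {
			out = append(out, f)
		}
	}
	return out
}

func main() {
	a := []span{{0, 10}, {10, 20}, {20, 30}}
	b := []span{{0, 10}, {5, 18}}
	fmt.Println(difference(a, b))
}
```

In this example `{0, 10}` is fully covered by |b| and is dropped, while `{10, 20}` (bytes 18 and 19 are uncovered) and `{20, 30}` are retained.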

func WalkAllStores

func WalkAllStores(ctx context.Context, name pb.Journal, stores []pb.FragmentStore) (CoverSet, error)

WalkAllStores enumerates Fragments from each of |stores| into the returned CoverSet, or returns an encountered error.

func (CoverSet) Add

func (s CoverSet) Add(fragment Fragment) (CoverSet, bool)

Add the Fragment to the CoverSet. The CoverSet is returned, along with an indication of whether an offset span was updated to reflect the Fragment. All updates occur in-place.

func (CoverSet) BeginOffset

func (s CoverSet) BeginOffset() int64

BeginOffset returns the first (lowest) Begin offset of any Fragment in the CoverSet.

func (CoverSet) EndOffset

func (s CoverSet) EndOffset() int64

EndOffset returns the last (largest) End offset of any Fragment in the set.

func (CoverSet) LongestOverlappingFragment

func (s CoverSet) LongestOverlappingFragment(offset int64) (ind int, found bool)

LongestOverlappingFragment finds and returns the index |ind| of the Fragment covering |offset| which also has the most content following |offset|. If no fragment covers |offset|, the index of the next Fragment beginning after |offset| is returned (which may be beyond the current CoverSet range). |found| indicates whether an overlapping Fragment was found.
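Because both |Begin| and |End| increase across the set, the covering Fragment with the most content following |offset| is the last one whose Begin is at or before |offset|. The sketch below shows one way to express that lookup with a binary search. It is illustrative only: `span` and `longestOverlapping` are hypothetical names, and ranges are assumed to be half-open `[Begin, End)`.

```go
package main

import (
	"fmt"
	"sort"
)

// span is a hypothetical stand-in for a Fragment's [Begin, End) byte range.
type span struct{ Begin, End int64 }

// longestOverlapping returns the index of the span covering offset which
// extends furthest beyond it. If no span covers offset, it returns the index
// of the next span beginning after offset (possibly len(set)) and false.
// set must have strictly increasing Begin and End.
func longestOverlapping(set []span, offset int64) (int, bool) {
	// next is the first index whose Begin is strictly greater than offset.
	next := sort.Search(len(set), func(i int) bool { return set[i].Begin > offset })

	// Since End increases with index, set[next-1] has the largest End of all
	// spans beginning at or before offset.
	if next > 0 && set[next-1].End > offset {
		return next - 1, true
	}
	return next, false
}

func main() {
	set := []span{{0, 10}, {5, 20}, {30, 40}}
	for _, offset := range []int64{7, 25, 50} {
		ind, found := longestOverlapping(set, offset)
		fmt.Println(offset, ind, found)
	}
}
```

For offset 7 both `{0, 10}` and `{5, 20}` cover the offset, and index 1 is chosen because it has more content following it. Offset 25 falls in a gap, so the index of `{30, 40}` is returned with found == false, and offset 50 yields an index one past the end of the set.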

type File

type File interface {
    io.ReaderAt
    io.Seeker
    io.WriterAt
    io.Writer
    io.Closer
}

File is the subset of os.File used in backing Fragments with local files.

type FileStoreConfig

type FileStoreConfig struct {
    RewriterConfig
}

FileStoreConfig configures a Fragment store of the "file://" scheme. It is initialized from parsed URL parameters of the pb.FragmentStore.

type Fragment

type Fragment struct {
    protocol.Fragment
    // Local uncompressed file of the Fragment, or nil iff the Fragment is remote.
    File File
}

Fragment wraps the protocol.Fragment type with a nil-able backing local File.

type GSStoreConfig

type GSStoreConfig struct {
    RewriterConfig
    // contains filtered or unexported fields
}

GSStoreConfig configures a Fragment store of the "gs://" scheme. It is initialized from parsed URL parameters of the pb.FragmentStore.

type Index

type Index struct {
    // contains filtered or unexported fields
}

Index maintains a queryable index of local and remote journal Fragments.

func NewIndex

func NewIndex(ctx context.Context) *Index

NewIndex returns a new, empty Index.

func (*Index) EndOffset

func (fi *Index) EndOffset() int64

EndOffset returns the last (largest) End offset in the index.

func (*Index) FirstRefreshCh

func (fi *Index) FirstRefreshCh() <-chan struct{}

FirstRefreshCh returns a channel which signals once the Index has been refreshed with a remote store(s) listing at least once.
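A common Go idiom for this kind of one-shot signal is a channel which is closed exactly once, so that every current and future receiver unblocks. The sketch below demonstrates the idiom in isolation. It is an assumption about how such a channel can be built, not a description of the Index internals, and `refreshSignal`, `markRefreshed`, and `isRefreshed` are hypothetical names.

```go
package main

import (
	"fmt"
	"sync"
)

// refreshSignal is a hypothetical helper, not part of the fragment package.
type refreshSignal struct {
	once sync.Once
	ch   chan struct{}
}

func newRefreshSignal() *refreshSignal {
	return &refreshSignal{ch: make(chan struct{})}
}

// markRefreshed closes the channel on the first call. Later calls are no-ops.
func (r *refreshSignal) markRefreshed() {
	r.once.Do(func() { close(r.ch) })
}

// firstRefreshCh returns a channel which is closed after the first refresh.
func (r *refreshSignal) firstRefreshCh() <-chan struct{} { return r.ch }

// isRefreshed polls the channel without blocking.
func isRefreshed(ch <-chan struct{}) bool {
	select {
	case <-ch:
		return true
	default:
		return false
	}
}

func main() {
	sig := newRefreshSignal()
	fmt.Println(isRefreshed(sig.firstRefreshCh())) // No refresh has happened yet.

	sig.markRefreshed()
	sig.markRefreshed() // Safe to repeat: the channel is only closed once.
	fmt.Println(isRefreshed(sig.firstRefreshCh()))
}
```

A caller which must wait for the first refresh would typically select over this channel alongside its context's Done channel.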

func (*Index) Inspect

func (fi *Index) Inspect(ctx context.Context, callback func(CoverSet) error) error

Inspect invokes the callback with a snapshot of all fragments in the Index. The callback must not modify the CoverSet, and during callback invocation no changes will be made to it. If an initial refresh of remote fragment store(s) hasn't yet been applied, Inspect will first block until one has been (or until the context is cancelled).

func (*Index) Query

func (fi *Index) Query(ctx context.Context, req *pb.ReadRequest) (*pb.ReadResponse, File, error)

Query the Index for a Fragment matching the ReadRequest.

func (*Index) ReplaceRemote

func (fi *Index) ReplaceRemote(set CoverSet)

ReplaceRemote replaces all remote Fragments in the index with |set|.

func (*Index) SpoolCommit

func (fi *Index) SpoolCommit(frag Fragment)

SpoolCommit adds local Spool Fragment |frag| to the index.

type Persister

type Persister struct {
    // contains filtered or unexported fields
}

Persister asynchronously persists completed Fragments to their backing pb.FragmentStore.

func NewPersister

func NewPersister(ks *keyspace.KeySpace) *Persister

NewPersister returns an empty, initialized Persister.

func (*Persister) Finish

func (p *Persister) Finish()

func (*Persister) Serve

func (p *Persister) Serve()

func (*Persister) SpoolComplete

func (p *Persister) SpoolComplete(spool Spool, primary bool)

type RewriterConfig

type RewriterConfig struct {
    // Find is the string to replace in the unmodified journal name.
    Find string
    // Replace is the string with which Find is replaced in the constructed store path.
    Replace string
}

RewriterConfig rewrites the path under which journal fragments are stored by finding and replacing a portion of the journal's name in the final constructed path. Its use is uncommon and not recommended, but it can help in the implementation of new journal naming taxonomies which don't disrupt journal fragments that are already written.

var cfg = RewriterConfig{
    Replace: "/old-path/page-views/",
    Find:    "/bar/v1/page-views/",
}
// Remaps journal name => fragment store URL:
//  "/foo/bar/v1/page-views/part-000" => "s3://my-bucket/foo/old-path/page-views/part-000" // Matched.
//  "/foo/bar/v2/page-views/part-000" => "s3://my-bucket/foo/bar/v2/page-views/part-000"   // Not matched.
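The remapping shown above can be reproduced with a plain string replacement. This is a minimal sketch under stated assumptions: `rewriterConfig` is a local stand-in for RewriterConfig, `rewritePath` and its `storePrefix` parameter are hypothetical, and replacing only the first occurrence of Find is an assumption rather than documented behavior.

```go
package main

import (
	"fmt"
	"strings"
)

// rewriterConfig is a local stand-in mirroring the fields of RewriterConfig.
type rewriterConfig struct {
	Find    string
	Replace string
}

// rewritePath is a hypothetical helper which joins a store prefix with the
// journal name, substituting the first occurrence of Find with Replace.
// An empty Find leaves the journal name unmodified.
func (cfg rewriterConfig) rewritePath(storePrefix, journal string) string {
	if cfg.Find == "" {
		return storePrefix + journal
	}
	return storePrefix + strings.Replace(journal, cfg.Find, cfg.Replace, 1)
}

func main() {
	cfg := rewriterConfig{
		Find:    "/bar/v1/page-views/",
		Replace: "/old-path/page-views/",
	}
	fmt.Println(cfg.rewritePath("s3://my-bucket", "/foo/bar/v1/page-views/part-000")) // Matched.
	fmt.Println(cfg.rewritePath("s3://my-bucket", "/foo/bar/v2/page-views/part-000")) // Not matched.
}
```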

type S3StoreConfig

type S3StoreConfig struct {
    RewriterConfig
    // AWS Profile to extract credentials from the shared credentials file.
    // For details, see:
    //   https://aws.amazon.com/blogs/security/a-new-and-standardized-way-to-manage-credentials-in-the-aws-sdks/
    // If empty, the default credentials are used.
    Profile string
    // Endpoint to connect to S3. If empty, the default S3 service is used.
    Endpoint string
    // ACL applied when persisting new fragments. By default, this is
    // s3.ObjectCannedACLBucketOwnerFullControl.
    ACL string
    // Storage class applied when persisting new fragments. By default,
    // this is s3.ObjectStorageClassStandard.
    StorageClass string
    // SSE is the server-side encryption type to be applied (eg, "AES256").
    // By default, encryption is not used.
    SSE string
    // contains filtered or unexported fields
}

S3StoreConfig configures a Fragment store of the "s3://" scheme. It is initialized from parsed URL parameters of the pb.FragmentStore.

type Spool

type Spool struct {
    // Fragment at time of last commit.
    Fragment
    // FirstAppendTime is the UTC Time of the first commit of the Spool Fragment.
    FirstAppendTime time.Time
    // Registers of the journal.
    Registers pb.LabelSet
    // contains filtered or unexported fields
}

Spool is a Fragment which is in the process of being created, backed by a local *os.File. As commits occur and the file extent is updated, the Spool Fragment is also updated to reflect the new committed extent. At all times, the Spool Fragment is a consistent, valid Fragment.

func NewSpool

func NewSpool(journal pb.Journal, observer SpoolObserver) Spool

NewSpool returns an empty Spool of |journal|.

func (*Spool) Apply

func (s *Spool) Apply(r *pb.ReplicateRequest, primary bool) (pb.ReplicateResponse, error)

Apply the ReplicateRequest to the Spool, returning any encountered error.

func (*Spool) MustApply

func (s *Spool) MustApply(r *pb.ReplicateRequest)

MustApply applies the ReplicateRequest, and panics if a !OK status is returned or an error occurs. MustApply is a convenience for cases such as rollbacks, where the request is derived from the Spool itself and cannot reasonably fail.

func (*Spool) Next

func (s *Spool) Next() pb.Fragment

Next returns the next Fragment which can be committed by the Spool.

func (Spool) String

func (s Spool) String() string

String returns a debugging representation of the Spool.

type SpoolObserver

type SpoolObserver interface {
    // SpoolCommit is called when the Spool Fragment is extended.
    SpoolCommit(Fragment)
    // SpoolComplete is called when the Spool has been completed.
    SpoolComplete(_ Spool, primary bool)
}

SpoolObserver is notified of important events in the Spool lifecycle.
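The shape of an observer can be sketched without the broker itself. The example below uses local stand-ins (`fragment`, `spool`, `spoolObserver`) for the package's types, and a hypothetical `recordingObserver` which records each event it receives. The sequence of calls in `main` is assumed for illustration and is not taken from the broker.

```go
package main

import "fmt"

// fragment and spool are simplified local stand-ins for Fragment and Spool.
type fragment struct{ Begin, End int64 }
type spool struct{ fragment }

// spoolObserver mirrors the method set of SpoolObserver over the stand-ins.
type spoolObserver interface {
	SpoolCommit(fragment)
	SpoolComplete(s spool, primary bool)
}

// recordingObserver is a hypothetical observer which records each event.
type recordingObserver struct{ events []string }

func (o *recordingObserver) SpoolCommit(f fragment) {
	o.events = append(o.events, fmt.Sprintf("commit [%d, %d)", f.Begin, f.End))
}

func (o *recordingObserver) SpoolComplete(s spool, primary bool) {
	o.events = append(o.events,
		fmt.Sprintf("complete [%d, %d) primary=%t", s.Begin, s.End, primary))
}

func main() {
	obs := &recordingObserver{}
	var o spoolObserver = obs

	// Assumed lifecycle: the spool is extended by two commits, then completed.
	o.SpoolCommit(fragment{0, 10})
	o.SpoolCommit(fragment{0, 25})
	o.SpoolComplete(spool{fragment{0, 25}}, true)

	for _, e := range obs.events {
		fmt.Println(e)
	}
}
```

Going by the signatures listed in this package, Index provides a matching SpoolCommit method and Persister a matching SpoolComplete method, which suggests that an observer may be composed from the two.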

Package fragment imports 35 packages and is imported by 8 packages. Updated 2020-07-15.