filer

package
v0.0.0-...-2206af0 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Apr 12, 2022 License: Apache-2.0 Imports: 26 Imported by: 0

Documentation

Index

Constants

View Source
const (
	LogFlushInterval = time.Minute
	PaginationSize   = 1024 * 256
	FilerStoreId     = "filer.store.id"
)
View Source
const (
	TopicsDir    = "/topics"
	SystemLogDir = TopicsDir + "/.system/log"
)
View Source
const (
	ManifestBatch = 1000
)
View Source
const (
	MetaOffsetPrefix = "Meta"
)

Variables

View Source
var (
	OS_UID = uint32(os.Getuid())
	OS_GID = uint32(os.Getgid())
)
View Source
var (
	ErrUnsupportedListDirectoryPrefixed = errors.New("unsupported directory prefix listing")
	ErrKvNotImplemented                 = errors.New("kv not implemented yet")
	ErrKvNotFound                       = errors.New("kv: not found")
)
View Source
var (
	Stores []FilerStore
)

Functions

func CompactFileChunks

func CompactFileChunks(lookupFileIdFn LookupFileIdFunctionType, chunks []*filer_pb.FileChunk) (compacted, garbage []*filer_pb.FileChunk)

func DoMinusChunks

func DoMinusChunks(as, bs []*filer_pb.FileChunk) (delta []*filer_pb.FileChunk)

func ETag

func ETag(entry *filer_pb.Entry) (etag string)

func ETagChunks

func ETagChunks(chunks []*filer_pb.FileChunk) (etag string)

func ETagEntry

func ETagEntry(entry *Entry) (etag string)

func EntryAttributeToPb

func EntryAttributeToPb(entry *Entry) *filer_pb.FuseAttributes

func EqualEntry

func EqualEntry(a, b *Entry) bool

func FileSize

func FileSize(entry *filer_pb.Entry) (size uint64)

func HasChunkManifest

func HasChunkManifest(chunks []*filer_pb.FileChunk) bool

func LookupByMasterClientFn

func LookupByMasterClientFn(masterClient *wdclient.MasterClient) func(vids []string) (map[string]operation.LookupResult, error)

func MaybeManifestize

func MaybeManifestize(saveFunc SaveDataAsChunkFunctionType, inputChunks []*filer_pb.FileChunk) (chunks []*filer_pb.FileChunk, err error)

func MinusChunks

func MinusChunks(lookupFileIdFn LookupFileIdFunctionType, as, bs []*filer_pb.FileChunk) (delta []*filer_pb.FileChunk, err error)

func ReadAll

func ReadAll(masterClient *wdclient.MasterClient, chunks []*filer_pb.FileChunk) ([]byte, error)

func ReadEachLogEntry

func ReadEachLogEntry(r io.Reader, sizeBuf []byte, ns int64, eachLogEntryFn func(logEntry *filer_pb.LogEntry) error) (lastTsNs int64, err error)

func Replay

func Replay(filerStore FilerStore, resp *filer_pb.SubscribeMetadataResponse) error

func ResolveChunkManifest

func ResolveChunkManifest(lookupFileIdFn LookupFileIdFunctionType, chunks []*filer_pb.FileChunk) (dataChunks, manifestChunks []*filer_pb.FileChunk, manifestResolveErr error)

func ResolveOneChunkManifest

func ResolveOneChunkManifest(lookupFileIdFn LookupFileIdFunctionType, chunk *filer_pb.FileChunk) (dataChunks []*filer_pb.FileChunk, manifestResolveErr error)

func SeparateManifestChunks

func SeparateManifestChunks(chunks []*filer_pb.FileChunk) (manifestChunks, nonManifestChunks []*filer_pb.FileChunk)

func StreamContent

func StreamContent(masterClient *wdclient.MasterClient, w io.Writer, chunks []*filer_pb.FileChunk, offset int64, size int64) error

func TotalSize

func TotalSize(chunks []*filer_pb.FileChunk) (size uint64)

func VolumeId

func VolumeId(fileId string) string

Types

type Attr

type Attr struct {
	Mtime         time.Time   // time of last modification
	Crtime        time.Time   // time of creation (OS X only)
	Mode          os.FileMode // file mode
	Uid           uint32      // owner uid
	Gid           uint32      // group gid
	Mime          string      // mime type
	Replication   string      // replication
	Collection    string      // collection name
	TtlSec        int32       // ttl in seconds
	UserName      string
	GroupNames    []string
	SymlinkTarget string
	Md5           []byte
	FileSize      uint64
}

func PbToEntryAttribute

func PbToEntryAttribute(attr *filer_pb.FuseAttributes) Attr

func (Attr) IsDirectory

func (attr Attr) IsDirectory() bool

type BucketName

type BucketName string

type BucketOption

type BucketOption struct {
	Name        BucketName
	Replication string
	// contains filtered or unexported fields
}

type ChunkReadAt

type ChunkReadAt struct {
	// contains filtered or unexported fields
}

func NewChunkReaderAtFromClient

func NewChunkReaderAtFromClient(filerClient filer_pb.FilerClient, chunkViews []*ChunkView, chunkCache chunk_cache.ChunkCache, fileSize int64) *ChunkReadAt

func (*ChunkReadAt) ReadAt

func (c *ChunkReadAt) ReadAt(p []byte, offset int64) (n int, err error)

type ChunkStreamReader

type ChunkStreamReader struct {
	// contains filtered or unexported fields
}

---------------- ChunkStreamReader ----------------------------------

func NewChunkStreamReader

func NewChunkStreamReader(filerClient filer_pb.FilerClient, chunks []*filer_pb.FileChunk) *ChunkStreamReader

func NewChunkStreamReaderFromFiler

func NewChunkStreamReaderFromFiler(masterClient *wdclient.MasterClient, chunks []*filer_pb.FileChunk) *ChunkStreamReader

func (*ChunkStreamReader) Close

func (c *ChunkStreamReader) Close()

func (*ChunkStreamReader) Read

func (c *ChunkStreamReader) Read(p []byte) (n int, err error)

func (*ChunkStreamReader) Seek

func (c *ChunkStreamReader) Seek(offset int64, whence int) (int64, error)

type ChunkView

type ChunkView struct {
	FileId      string
	Offset      int64
	Size        uint64
	LogicOffset int64 // actual offset in the file, for the data specified via [offset, offset+size) in current chunk
	ChunkSize   uint64
	CipherKey   []byte
	IsGzipped   bool
}

func ViewFromChunks

func ViewFromChunks(lookupFileIdFn LookupFileIdFunctionType, chunks []*filer_pb.FileChunk, offset int64, size int64) (views []*ChunkView)

func ViewFromVisibleIntervals

func ViewFromVisibleIntervals(visibles []VisibleInterval, offset int64, size int64) (views []*ChunkView)

func (*ChunkView) IsFullChunk

func (cv *ChunkView) IsFullChunk() bool

type Entry

type Entry struct {
	util.FullPath

	Attr
	Extended map[string][]byte

	// the following is for files
	Chunks []*filer_pb.FileChunk `json:"chunks,omitempty"`
}

func FromPbEntry

func FromPbEntry(dir string, entry *filer_pb.Entry) *Entry

func (*Entry) DecodeAttributesAndChunks

func (entry *Entry) DecodeAttributesAndChunks(blob []byte) error

func (*Entry) EncodeAttributesAndChunks

func (entry *Entry) EncodeAttributesAndChunks() ([]byte, error)

func (*Entry) Size

func (entry *Entry) Size() uint64

func (*Entry) Timestamp

func (entry *Entry) Timestamp() time.Time

func (*Entry) ToProtoEntry

func (entry *Entry) ToProtoEntry() *filer_pb.Entry

func (*Entry) ToProtoFullEntry

func (entry *Entry) ToProtoFullEntry() *filer_pb.FullEntry

type Filer

type Filer struct {
	Store        *FilerStoreWrapper
	MasterClient *wdclient.MasterClient

	GrpcDialOption grpc.DialOption
	DirBucketsPath string
	FsyncBuckets   []string

	Cipher             bool
	LocalMetaLogBuffer *log_buffer.LogBuffer

	MetaAggregator *MetaAggregator
	Signature      int32
	// contains filtered or unexported fields
}

func NewFiler

func NewFiler(masters []string, grpcDialOption grpc.DialOption,
	filerHost string, filerGrpcPort uint32, collection string, replication string, notifyFn func()) *Filer

func (*Filer) AggregateFromPeers

func (f *Filer) AggregateFromPeers(self string, filers []string)

func (*Filer) BeginTransaction

func (f *Filer) BeginTransaction(ctx context.Context) (context.Context, error)

func (*Filer) CommitTransaction

func (f *Filer) CommitTransaction(ctx context.Context) error

func (*Filer) CreateEntry

func (f *Filer) CreateEntry(ctx context.Context, entry *Entry, o_excl bool, isFromOtherCluster bool, signatures []int32) error

func (*Filer) DeleteChunks

func (f *Filer) DeleteChunks(chunks []*filer_pb.FileChunk)

func (*Filer) DeleteEntryMetaAndData

func (f *Filer) DeleteEntryMetaAndData(ctx context.Context, p util.FullPath, isRecursive, ignoreRecursiveError, shouldDeleteChunks, isFromOtherCluster bool, signatures []int32) (err error)

func (*Filer) FindEntry

func (f *Filer) FindEntry(ctx context.Context, p util.FullPath) (entry *Entry, err error)

func (*Filer) GetMaster

func (fs *Filer) GetMaster() string

func (*Filer) GetStore

func (f *Filer) GetStore() (store FilerStore)

func (*Filer) KeepConnectedToMaster

func (fs *Filer) KeepConnectedToMaster()

func (*Filer) ListDirectoryEntries

func (f *Filer) ListDirectoryEntries(ctx context.Context, p util.FullPath, startFileName string, inclusive bool, limit int, prefix string) ([]*Entry, error)

func (*Filer) LoadBuckets

func (f *Filer) LoadBuckets()

func (*Filer) LoadConfiguration

func (f *Filer) LoadConfiguration(config *viper.Viper)

func (*Filer) NotifyUpdateEvent

func (f *Filer) NotifyUpdateEvent(ctx context.Context, oldEntry, newEntry *Entry, deleteChunks, isFromOtherCluster bool, signatures []int32)

func (*Filer) ReadBucketOption

func (f *Filer) ReadBucketOption(buketName string) (replication string, fsync bool)

func (*Filer) ReadPersistedLogBuffer

func (f *Filer) ReadPersistedLogBuffer(startTime time.Time, eachLogEntryFn func(logEntry *filer_pb.LogEntry) error) (lastTsNs int64, err error)

func (*Filer) RollbackTransaction

func (f *Filer) RollbackTransaction(ctx context.Context) error

func (*Filer) SetStore

func (f *Filer) SetStore(store FilerStore)

func (*Filer) Shutdown

func (f *Filer) Shutdown()

func (*Filer) UpdateEntry

func (f *Filer) UpdateEntry(ctx context.Context, oldEntry, entry *Entry) (err error)

type FilerBuckets

type FilerBuckets struct {
	sync.RWMutex
	// contains filtered or unexported fields
}

type FilerStore

type FilerStore interface {
	// GetName gets the name to locate the configuration in filer.toml file
	GetName() string
	// Initialize initializes the file store
	Initialize(configuration util.Configuration, prefix string) error
	InsertEntry(context.Context, *Entry) error
	UpdateEntry(context.Context, *Entry) (err error)
	// err == filer2.ErrNotFound if not found
	FindEntry(context.Context, util.FullPath) (entry *Entry, err error)
	DeleteEntry(context.Context, util.FullPath) (err error)
	DeleteFolderChildren(context.Context, util.FullPath) (err error)
	ListDirectoryEntries(ctx context.Context, dirPath util.FullPath, startFileName string, includeStartFile bool, limit int) ([]*Entry, error)
	ListDirectoryPrefixedEntries(ctx context.Context, dirPath util.FullPath, startFileName string, includeStartFile bool, limit int, prefix string) ([]*Entry, error)

	BeginTransaction(ctx context.Context) (context.Context, error)
	CommitTransaction(ctx context.Context) error
	RollbackTransaction(ctx context.Context) error

	KvPut(ctx context.Context, key []byte, value []byte) (err error)
	KvGet(ctx context.Context, key []byte) (value []byte, err error)
	KvDelete(ctx context.Context, key []byte) (err error)

	Shutdown()
}

type FilerStoreWrapper

type FilerStoreWrapper struct {
	ActualStore FilerStore
}

func NewFilerStoreWrapper

func NewFilerStoreWrapper(store FilerStore) *FilerStoreWrapper

func (*FilerStoreWrapper) BeginTransaction

func (fsw *FilerStoreWrapper) BeginTransaction(ctx context.Context) (context.Context, error)

func (*FilerStoreWrapper) CommitTransaction

func (fsw *FilerStoreWrapper) CommitTransaction(ctx context.Context) error

func (*FilerStoreWrapper) DeleteEntry

func (fsw *FilerStoreWrapper) DeleteEntry(ctx context.Context, fp util.FullPath) (err error)

func (*FilerStoreWrapper) DeleteFolderChildren

func (fsw *FilerStoreWrapper) DeleteFolderChildren(ctx context.Context, fp util.FullPath) (err error)

func (*FilerStoreWrapper) FindEntry

func (fsw *FilerStoreWrapper) FindEntry(ctx context.Context, fp util.FullPath) (entry *Entry, err error)

func (*FilerStoreWrapper) GetName

func (fsw *FilerStoreWrapper) GetName() string

func (*FilerStoreWrapper) Initialize

func (fsw *FilerStoreWrapper) Initialize(configuration util.Configuration, prefix string) error

func (*FilerStoreWrapper) InsertEntry

func (fsw *FilerStoreWrapper) InsertEntry(ctx context.Context, entry *Entry) error

func (*FilerStoreWrapper) KvDelete

func (fsw *FilerStoreWrapper) KvDelete(ctx context.Context, key []byte) (err error)

func (*FilerStoreWrapper) KvGet

func (fsw *FilerStoreWrapper) KvGet(ctx context.Context, key []byte) (value []byte, err error)

func (*FilerStoreWrapper) KvPut

func (fsw *FilerStoreWrapper) KvPut(ctx context.Context, key []byte, value []byte) (err error)

func (*FilerStoreWrapper) ListDirectoryEntries

func (fsw *FilerStoreWrapper) ListDirectoryEntries(ctx context.Context, dirPath util.FullPath, startFileName string, includeStartFile bool, limit int) ([]*Entry, error)

func (*FilerStoreWrapper) ListDirectoryPrefixedEntries

func (fsw *FilerStoreWrapper) ListDirectoryPrefixedEntries(ctx context.Context, dirPath util.FullPath, startFileName string, includeStartFile bool, limit int, prefix string) ([]*Entry, error)

func (*FilerStoreWrapper) RollbackTransaction

func (fsw *FilerStoreWrapper) RollbackTransaction(ctx context.Context) error

func (*FilerStoreWrapper) Shutdown

func (fsw *FilerStoreWrapper) Shutdown()

func (*FilerStoreWrapper) UpdateEntry

func (fsw *FilerStoreWrapper) UpdateEntry(ctx context.Context, entry *Entry) error

type LookupFileIdFunctionType

type LookupFileIdFunctionType func(fileId string) (targetUrl string, err error)

func LookupFn

func LookupFn(filerClient filer_pb.FilerClient) LookupFileIdFunctionType

type MetaAggregator

type MetaAggregator struct {
	MetaLogBuffer *log_buffer.LogBuffer
	// notifying clients
	ListenersLock sync.Mutex
	ListenersCond *sync.Cond
	// contains filtered or unexported fields
}

func NewMetaAggregator

func NewMetaAggregator(filers []string, grpcDialOption grpc.DialOption) *MetaAggregator

MetaAggregator only aggregates data "on the fly". The logs are not re-persisted to disk. The old data comes from what each LocalMetadata persisted on disk.

func (*MetaAggregator) StartLoopSubscribe

func (ma *MetaAggregator) StartLoopSubscribe(f *Filer, self string)

type SaveDataAsChunkFunctionType

type SaveDataAsChunkFunctionType func(reader io.Reader, name string, offset int64) (chunk *filer_pb.FileChunk, collection, replication string, err error)

type VisibleInterval

type VisibleInterval struct {
	// contains filtered or unexported fields
}

func MergeIntoVisibles

func MergeIntoVisibles(visibles []VisibleInterval, chunk *filer_pb.FileChunk) (newVisibles []VisibleInterval)

func NonOverlappingVisibleIntervals

func NonOverlappingVisibleIntervals(lookupFileIdFn LookupFileIdFunctionType, chunks []*filer_pb.FileChunk) (visibles []VisibleInterval, err error)

NonOverlappingVisibleIntervals translates the file chunk into VisibleInterval in memory If the file chunk content is a chunk manifest

Directories

Path Synopsis
elastic
v7

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL