filesessions

package
v0.0.0-...-5c79d48 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Feb 15, 2024 License: AGPL-3.0 Imports: 23 Imported by: 0

Documentation

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

func GetOpenFileFunc

func GetOpenFileFunc() utils.OpenFileWithFlagsFunc

GetOpenFileFunc gets the OpenFileWithFlagsFunc set in the package.

TODO(gabrielcorado): remove this global variable.

func NewStreamer

func NewStreamer(dir string) (*events.ProtoStreamer, error)

NewStreamer creates a streamer sending uploads to disk

func SetOpenFileFunc

func SetOpenFileFunc(f utils.OpenFileWithFlagsFunc)

SetOpenFileFunc sets the OpenFileWithFlagsFunc used by the package.

TODO(gabrielcorado): remove this global variable.

Types

type Config

type Config struct {
	// Directory is a directory with files
	Directory string
	// OnBeforeComplete can be used to inject failures during tests
	OnBeforeComplete func(ctx context.Context, upload events.StreamUpload) error
}

Config is a file uploader configuration

func (*Config) CheckAndSetDefaults

func (s *Config) CheckAndSetDefaults() error

CheckAndSetDefaults checks and sets default values of file handler config

type Handler

type Handler struct {
	// Config is a file sessions config
	Config
	// Entry is a file entry
	*log.Entry
}

Handler uploads and downloads sessions archives by reading and writing files to directory, useful for NFS setups and tests

func NewHandler

func NewHandler(cfg Config) (*Handler, error)

NewHandler returns new file sessions handler

func (*Handler) Close

func (l *Handler) Close() error

Closer releases connection and resources associated with log if any

func (*Handler) CompleteUpload

func (h *Handler) CompleteUpload(ctx context.Context, upload events.StreamUpload, parts []events.StreamPart) error

CompleteUpload completes the upload

func (*Handler) CreateUpload

func (h *Handler) CreateUpload(ctx context.Context, sessionID session.ID) (*events.StreamUpload, error)

CreateUpload creates a multipart upload

func (*Handler) Download

func (l *Handler) Download(ctx context.Context, sessionID session.ID, writer io.WriterAt) error

Download downloads session recording from storage, in case of file handler reads the file from local directory

func (*Handler) GetUploadMetadata

func (h *Handler) GetUploadMetadata(s session.ID) events.UploadMetadata

GetUploadMetadata gets the metadata for session upload

func (*Handler) ListParts

func (h *Handler) ListParts(ctx context.Context, upload events.StreamUpload) ([]events.StreamPart, error)

ListParts lists upload parts

func (*Handler) ListUploads

func (h *Handler) ListUploads(ctx context.Context) ([]events.StreamUpload, error)

ListUploads lists uploads that have been initiated but not completed with earlier uploads returned first

func (*Handler) ReserveUploadPart

func (h *Handler) ReserveUploadPart(ctx context.Context, upload events.StreamUpload, partNumber int64) error

ReserveUploadPart reserves an upload part.

func (*Handler) Upload

func (l *Handler) Upload(ctx context.Context, sessionID session.ID, reader io.Reader) (string, error)

Upload uploads session recording to file storage, in case of file handler, writes the file to local directory

func (*Handler) UploadPart

func (h *Handler) UploadPart(ctx context.Context, upload events.StreamUpload, partNumber int64, partBody io.ReadSeeker) (*events.StreamPart, error)

UploadPart uploads part

type ScanStats

type ScanStats struct {
	// Scanned is how many uploads have been scanned
	Scanned int
	// Started is how many uploads have been started
	Started int
	// Corrupted is how many corrupted uploads have been
	// moved out of the scan dir.
	Corrupted int
}

ScanStats provides scan statistics, used in tests

type Uploader

type Uploader struct {
	// contains filtered or unexported fields
}

Uploader periodically scans session records in a folder.

Once it finds the sessions it opens parallel upload streams to the streaming server.

It keeps checkpoints of the upload state and resumes the upload that have been aborted.

It marks corrupted session files to skip their processing.

func NewUploader

func NewUploader(cfg UploaderConfig) (*Uploader, error)

NewUploader creates new disk based session logger

func (*Uploader) Close

func (u *Uploader) Close()

func (*Uploader) Scan

func (u *Uploader) Scan(ctx context.Context) (*ScanStats, error)

Scan scans the streaming directory and uploads recordings

func (*Uploader) Serve

func (u *Uploader) Serve(ctx context.Context) error

Serve runs the uploader until stopped

type UploaderConfig

type UploaderConfig struct {
	// ScanDir is data directory with the uploads
	ScanDir string
	// CorruptedDir is the directory to store corrupted uploads in.
	CorruptedDir string
	// Clock is the clock replacement
	Clock clockwork.Clock
	// ScanPeriod is a uploader dir scan period
	ScanPeriod time.Duration
	// ConcurrentUploads sets up how many parallel uploads to schedule
	ConcurrentUploads int
	// Streamer is upstream streamer to upload events to
	Streamer events.Streamer
	// EventsC is an event channel used to signal events
	// used in tests
	EventsC chan events.UploadEvent
	// Component is used for logging purposes
	Component string
}

UploaderConfig sets up configuration for uploader service

func (*UploaderConfig) CheckAndSetDefaults

func (cfg *UploaderConfig) CheckAndSetDefaults() error

CheckAndSetDefaults checks and sets default values of UploaderConfig

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL