filewatch

package
v4.0.0-...-5981c31 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Apr 30, 2024 License: BSD-2-Clause, BSD-2-Clause Imports: 26 Imported by: 0

Documentation

Overview

Package filewatch implements advanced utilities for tracking file changes within directories.

Index

Constants

View Source
const (
	MAX_QUEUED_EVENTS_PATH = "/proc/sys/fs/inotify/max_queued_events"
	EVENT_QUEUE_BUFFER     = 100000
)
View Source
const (
	LineEngine  int = 0
	RegexEngine int = 1
)
View Source
const (
	RENAME_COUNT_MAX = 128
)

Variables

View Source
var (
	ErrNotReady         = errors.New("fsnotify watcher is not ready")
	ErrLocationNotDir   = errors.New("Watched Location is not a directory")
	ErrNoDirsWatched    = errors.New("No locations have been added to the watch list")
	ErrInvalidStateFile = errors.New("State file exists and is not a regular file")
	ErrAlreadyStarted   = errors.New("WatchManager already started")
	ErrFailedSeek       = errors.New("Failed to seek to the start of the states file")
	ErrFsNotifyOverflow = errors.New("FSNotify kernel event buffer overflow")
)
View Source
var (
	ErrNotRunning = errors.New("Not running")
)

Functions

func EncodeStateFile

func EncodeStateFile(sf string, states []FileState) (err error)

func ExtractFilters

func ExtractFilters(ff string) ([]string, error)

func NewFollower

func NewFollower(cfg FollowerConfig) (*follower, error)

func ReadStateFile

func ReadStateFile(p string) (states map[string]int64, err error)

Types

type FileId

type FileId struct {
	Major uint64
	Minor uint64
}

type FileName

type FileName struct {
	BaseName string
	FilePath string
}

a unique name that allows multiple IDs pointing at the same file

type FileState

type FileState struct {
	BaseName string
	FilePath string
	State    int64
}

func DecodeStateFile

func DecodeStateFile(sf string) (states []FileState, err error)

type FilterManager

type FilterManager struct {
	// contains filtered or unexported fields
}

func NewFilterManager

func NewFilterManager(stateFile string) (*FilterManager, error)

func (*FilterManager) AddFilter

func (f *FilterManager) AddFilter(bname, loc string, mtchs []string, lh handler, ecfg FollowerEngineConfig) error

func (*FilterManager) CatchupFile

func (f *FilterManager) CatchupFile(wf watchedFile, qc chan os.Signal) (bool, error)

CatchupFile will synchronously consume all outstanding data from the file. This function is typically used at startup so that we can linearly process outstanding data from files one at a time before turning on all our file followers. It is a pre-optimization to deal with scenarios where the file follower has been offline for an extended period of time or a user is attempting import a large amount of data during a migration. Catchup will also check the last mod time of a file and use that as the indicator to consume the final bytes if there is not terminating delimiter

the returned boolean indicates that the quitchan (qc) has fired, this allows us to pass up that the process has been asked to quit.

func (*FilterManager) Close

func (fm *FilterManager) Close() (err error)

func (*FilterManager) Filters

func (fm *FilterManager) Filters() int

Filters returns the current number of installed filters

func (*FilterManager) FlushStates

func (fm *FilterManager) FlushStates() error

FlushStates flushes the current state of followed files to the disk periodically flushing states is a good idea, incase the device crashes, or the process is abruptly killed

func (*FilterManager) Followed

func (fm *FilterManager) Followed() int

Followed returns the current number of following handles if a file matches multiple filters, it will be followed multiple times. So this is NOT the number of files, but the number of follows

func (*FilterManager) IsWatched

func (f *FilterManager) IsWatched(fpath string) bool

func (*FilterManager) LoadFile

func (f *FilterManager) LoadFile(fpath string) (bool, error)

func (*FilterManager) LoadFileList

func (f *FilterManager) LoadFileList(lst []watchedFile) error

func (*FilterManager) NewFollower

func (f *FilterManager) NewFollower(fpath string) (bool, error)

func (*FilterManager) RemoveDirectory

func (f *FilterManager) RemoveDirectory(path string) error

func (*FilterManager) RemoveFollower

func (f *FilterManager) RemoveFollower(fpath string) (bool, error)

func (*FilterManager) RenameFollower

func (f *FilterManager) RenameFollower(fpath string) error

RenameFollower is designed to rename a file that is currently being followed We first grab the file id that matches the given fpath Then we scan the base directory for ALL files and attempt to match the fileId if a match is found, we check if it matches the current filter, if not, we delete the follower if it does, we update the name and leave. If no match is found, we delete the follower

func (*FilterManager) SetLogger

func (fm *FilterManager) SetLogger(lgr ingest.IngestLogger)

func (*FilterManager) SetMaxFilesWatched

func (fm *FilterManager) SetMaxFilesWatched(max int)

type FollowerConfig

type FollowerConfig struct {
	FollowerEngineConfig
	BaseName string
	FilePath string
	State    *int64
	FilterID int
	Handler  handler
}

type FollowerEngineConfig

type FollowerEngineConfig struct {
	Engine     int
	EngineArgs string
}

type LineReader

type LineReader struct {
	// contains filtered or unexported fields
}

func NewLineReader

func NewLineReader(cfg ReaderConfig) (*LineReader, error)

func (*LineReader) Close

func (br *LineReader) Close() error

func (*LineReader) Index

func (br *LineReader) Index() int64

func (*LineReader) Name

func (br *LineReader) Name() string

func (*LineReader) ReadEntry

func (lr *LineReader) ReadEntry() (ln []byte, ok bool, wasEOF bool, err error)

func (*LineReader) ReadRemaining

func (lr *LineReader) ReadRemaining() (ln []byte, err error)

func (*LineReader) SeekFile

func (br *LineReader) SeekFile(offset int64) error

type LogHandler

type LogHandler struct {
	LogHandlerConfig
	// contains filtered or unexported fields
}

func NewLogHandler

func NewLogHandler(cfg LogHandlerConfig, w logWriter) (*LogHandler, error)

func (*LogHandler) HandleLog

func (lh *LogHandler) HandleLog(b []byte, catchts time.Time, fname string) error

func (*LogHandler) Tag

func (lh *LogHandler) Tag() string

type LogHandlerConfig

type LogHandlerConfig struct {
	TagName                 string
	Tag                     entry.EntryTag
	Src                     net.IP
	IgnoreTS                bool
	AssumeLocalTZ           bool
	IgnorePrefixes          []string
	IgnoreGlobs             []string
	TimestampFormatOverride string
	TimezoneOverride        string
	UserTimeRegex           string
	UserTimeFormat          string
	Logger                  logger
	Debugger                debugOut
	Ctx                     context.Context
	TimeFormat              config.CustomTimeFormat
	AttachFilename          bool
	Trim                    bool // run trim space on entries
}

type Reader

type Reader interface {
	SeekFile(int64) error
	ReadEntry() ([]byte, bool, bool, error)
	ReadRemaining() ([]byte, error)
	Index() int64
	Close() error
}

func NewReader

func NewReader(cfg ReaderConfig) (Reader, error)

NewReader creates a new reader based on either the regex engine or line reader engine the Linux version of file follow does NOT support EVTX engines

type ReaderConfig

type ReaderConfig struct {
	Fin        *os.File
	MaxLineLen int
	StartIndex int64
	Engine     int
	EngineArgs string
}

type RegexReader

type RegexReader struct {
	// contains filtered or unexported fields
}

func NewRegexReader

func NewRegexReader(cfg ReaderConfig) (*RegexReader, error)

func (*RegexReader) Close

func (br *RegexReader) Close() error

func (*RegexReader) Index

func (br *RegexReader) Index() int64

func (*RegexReader) Name

func (br *RegexReader) Name() string

func (*RegexReader) ReadEntry

func (rr *RegexReader) ReadEntry() (ln []byte, ok bool, wasEOF bool, err error)

func (*RegexReader) ReadRemaining

func (rr *RegexReader) ReadRemaining() (ln []byte, err error)

func (*RegexReader) SeekFile

func (br *RegexReader) SeekFile(offset int64) error

type WatchConfig

type WatchConfig struct {
	FollowerEngineConfig
	ConfigName string
	BaseDir    string
	FileFilter string
	Hnd        handler
	Recursive  bool
}

type WatchManager

type WatchManager struct {
	// contains filtered or unexported fields
}

func NewWatcher

func NewWatcher(stateFilePath string) (*WatchManager, error)

func (*WatchManager) Add

func (wm *WatchManager) Add(c WatchConfig) error

func (*WatchManager) Catchup

func (wm *WatchManager) Catchup(qc chan os.Signal) (bool, error)

Catchup is used to synchronously process files that have outstanding work to be done. The purpose of this is so that when the file follower first starts with a large number of outstanding files to be processed, it can more intelligently process them one at a time. The real purpose is so that the usecase where a user points the follower at a massive number of files during an improt scenario we don't start grabbing things all willy nilly and with high concurrency we are better off ordering the work to be done and doing it synchronously

the input parameter is a quit channel, basically wired to the signal handler the return values are a shouldQuit(booL) and error the boolean value is true when the signal handler fired, telling us that the ingester should exit

func (*WatchManager) CheckNewDirectory

func (wm *WatchManager) CheckNewDirectory(dir string) error

func (*WatchManager) Close

func (wm *WatchManager) Close() error

func (*WatchManager) Context

func (wm *WatchManager) Context() context.Context

func (*WatchManager) Dump

func (wm *WatchManager) Dump() string

Returns a string containing information about the WatchManager

func (*WatchManager) Filters

func (wm *WatchManager) Filters() int

func (*WatchManager) Followers

func (wm *WatchManager) Followers() int

func (*WatchManager) Remove

func (wm *WatchManager) Remove(dir string) error

func (*WatchManager) SetLogger

func (wm *WatchManager) SetLogger(lgr ingest.IngestLogger)

func (*WatchManager) SetMaxFilesWatched

func (wm *WatchManager) SetMaxFilesWatched(max int)

func (*WatchManager) Start

func (wm *WatchManager) Start() error

Directories

Path Synopsis
tools

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL