archiver

package
v0.0.0-...-092b941 Latest
Warning

This package is not in the latest version of its module.

Published: Aug 26, 2017 License: MIT Imports: 14 Imported by: 0

README

archiver

This library demonstrates reading from the nats.io event bus and storing Frame events to files on disk.

Documentation


Constants

This section is empty.

Variables

var DefaultCaPath string = "certs/my-demo-ca.pem"

DefaultCaPath: default location to look for the Certificate Authority certificate.

var DefaultCertDir string = "/etc/ssl"

DefaultCertDir: where to look for certs by default (change this to match your installation).

var DefaultCertPath string = "certs/my-demo-cert.crt"

DefaultCertPath: default location to look for this node's certificate.

var DefaultKeyPath string = "private/my-demo-cert.key"

DefaultKeyPath: default location to look for this node's private key.

var NoLockErr = fmt.Errorf("no flock lock was obtained")

NoLockErr is returned by LockFile when it fails to lock the file.

var ServiceName = "service-name"

ServiceName is how this service is addressed on the Nats bus.

Functions

func DirExists

func DirExists(name string) bool

func ExtractStreamFromSubject

func ExtractStreamFromSubject(subj string) (stream string, id string)

func FileExists

func FileExists(name string) bool

func HaveLock

func HaveLock(f *os.File) bool

HaveLock reports whether we hold an exclusive flock(2) lock on the file. Note that flock(2) locks can be inherited by child processes.

func LockFile

func LockFile(f *os.File) error

LockFile returns nil if the lock was obtained. If the lock was not obtained, NoLockErr is returned.

func UnlockFile

func UnlockFile(f *os.File) error

UnlockFile calls flock with LOCK_UN and returns any error. Typically the returned error is nil whether or not the file was actually unlocked. Note that the operating system automatically releases flock locks when the process exits.

func XtraDirs

func XtraDirs(id string) string

XtraDirs allows better scalability of per-directory file counts by adding a three-level-deep directory hierarchy.

Types

type ArchiverConfig

type ArchiverConfig struct {
	WriteDir   string
	ServerList string
	TlsDir     string
}

func (*ArchiverConfig) DefineFlags

func (c *ArchiverConfig) DefineFlags(fs *flag.FlagSet)

Call DefineFlags before myflags.Parse().

func (*ArchiverConfig) ValidateConfig

func (c *ArchiverConfig) ValidateConfig() error

Call c.ValidateConfig() after myflags.Parse().
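The define, parse, validate ordering described above can be sketched with the standard flag package. The struct below only mirrors the field names of ArchiverConfig; the flag names ("writedir", "servers", "tlsdir") and the validation rules are assumptions made for this example, not the package's actual flags.

```go
package main

import (
	"errors"
	"flag"
	"fmt"
)

// demoConfig mirrors the fields of ArchiverConfig for illustration only.
type demoConfig struct {
	WriteDir   string
	ServerList string
	TlsDir     string
}

// defineFlags registers flags on fs. Flag names and defaults are assumptions.
func (c *demoConfig) defineFlags(fs *flag.FlagSet) {
	fs.StringVar(&c.WriteDir, "writedir", "", "directory to write archive files into")
	fs.StringVar(&c.ServerList, "servers", "", "comma-separated list of NATS server URLs")
	fs.StringVar(&c.TlsDir, "tlsdir", "", "directory holding TLS certificates")
}

// validate checks the parsed values. The rules here are assumptions.
func (c *demoConfig) validate() error {
	if c.WriteDir == "" {
		return errors.New("-writedir is required")
	}
	if c.ServerList == "" {
		return errors.New("-servers is required")
	}
	return nil
}

// parseArgs runs the three steps in order: define, parse, validate.
func parseArgs(args []string) (*demoConfig, error) {
	fs := flag.NewFlagSet("archiver-demo", flag.ContinueOnError)
	cfg := &demoConfig{}
	cfg.defineFlags(fs)
	if err := fs.Parse(args); err != nil {
		return nil, err
	}
	if err := cfg.validate(); err != nil {
		return nil, err
	}
	return cfg, nil
}

func main() {
	cfg, err := parseArgs([]string{"-writedir", "/tmp/archive", "-servers", "nats://127.0.0.1:4222"})
	if err != nil {
		fmt.Println("config error:", err)
		return
	}
	fmt.Printf("%+v\n", *cfg)
}
```

Validating only after Parse means the checks see the final values, whether they came from defaults or from the command line.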

type Date

type Date struct {
	Year  int
	Month int
	Day   int
}

Date is a calendar date, computed on UTC day boundaries.

func GetYearMonthDayString

func GetYearMonthDayString(tm time.Time) (string, Date)

func GetYearMonthString

func GetYearMonthString(tm time.Time) (string, Date)

func TimeToDate

func TimeToDate(tm time.Time) Date

type File

type File struct {
	Key        string
	Date       Date
	StreamName string
	Path       string
	Fd         *os.File
	EndOffset  int64
	LastErr    error
	LastWrite  time.Time
}

File is an open file in our cache of file handles.

type FileMgr

type FileMgr struct {
	Cfg   ArchiverConfig
	Files map[string]*File

	Ready   chan bool
	ReqStop chan bool
	Done    chan bool

	NatsAsyncErrCh   chan asyncErr
	NatsConnClosedCh chan *nats.Conn
	NatsConnDisconCh chan *nats.Conn
	NatsConnReconCh  chan *nats.Conn
	ArchiveReqCh     chan *nats.Msg

	SignalInterruptCh chan os.Signal
	// contains filtered or unexported fields
}

func NewFileMgr

func NewFileMgr(cfg *ArchiverConfig) *FileMgr

func (*FileMgr) CloseUnusedFiles

func (fm *FileMgr) CloseUnusedFiles(olderThan time.Duration)

CloseUnusedFiles closes unaccessed files to keep the file handle count low.

func (*FileMgr) GetDateNameString

func (fm *FileMgr) GetDateNameString(tm time.Time, streamName string) (string, Date)

GetDateNameString provides the index (key) into the Files map.

func (*FileMgr) GetPath

func (fm *FileMgr) GetPath(tm time.Time, streamName string) string

func (*FileMgr) Run

func (fm *FileMgr) Run() error

Run blocks until done.

func (*FileMgr) Stop

func (fm *FileMgr) Stop()

func (*FileMgr) Store

func (fm *FileMgr) Store(tm time.Time, streamName string, data []byte) (f *File, err error)

func (*FileMgr) Subscribe

func (fm *FileMgr) Subscribe(subj string) error

func (*FileMgr) Sync

func (fm *FileMgr) Sync()

Sync syncs all open files.

func (*FileMgr) SyncAndCloseAllFiles

func (fm *FileMgr) SyncAndCloseAllFiles()
