ioengine

package module
v0.0.0-...-3db53f6 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Feb 9, 2020 License: Apache-2.0 Imports: 12 Imported by: 0

README

ioengine

IO engine library supports BufferedIO/DirectIO/AsyncIO and provides a unified common unix-like system file operation interface.

usage

Import package:

import (
	"github.com/silentsharer/ioengine"
)
go get github.com/silentsharer/ioengine

example

// create AIO ioengine option
opt := ioengine.DefaultOptions
opt.IOEngine = AIO

fd, err := ioengine.Open("/tmp/test", opt)
if err != nil {
	handler(err)
}
defer fd.Close()

data1, err := ioengine.MemAlign(4*ioengine.BlockSize)
if err != nil {
	handler(err)
}
copy(data1, []byte("hello"))

data2, err := ioengine.MemAlign(4*ioengine.BlockSize)
if err != nil {
	handler(err)
}
copy(data1, []byte("world"))

b := NewBuffers()
b.Write(data1).Write(data2)

fd.WriteAtv(*b, 0)
fd.Append(*b)

license

Apache (see LICENSE file)

Documentation

Index

Constants

View Source
const (
	// AlignSize size to align the buffer
	AlignSize = 512
	// BlockSize direct IO minimum number of bytes to write
	BlockSize = 4096
)

Variables

View Source
var (
	ErrNotInit           = errors.New("Not initialized")
	ErrWaitAllFailed     = errors.New("Failed to wait for all requests to complete")
	ErrNilCallback       = errors.New("The kernel returned a nil callback iocb structure")
	ErrUntrackedEventKey = errors.New("The kernel returned an event key we weren't tracking")
	ErrInvalidEventPtr   = errors.New("The kernel returned an invalid callback event pointer")
	ErrReqIDNotFound     = errors.New("The requestID not found")
	ErrNotDone           = errors.New("Request not finished")
)
View Source
var DefaultOptions = Options{
	IOEngine:      StandardIO,
	Flag:          os.O_RDWR | os.O_CREATE | os.O_SYNC,
	Perm:          0644,
	FileLock:      None,
	MmapSize:      1<<30 - 1,
	MmapWritable:  false,
	AIO:           Libaio,
	AIOQueueDepth: 1024,
	AIOTimeout:    0,
}

DefaultOptions is recommended options, you can modify these to suit your needs.

View Source
var ShardCount = 32

ShardCount map shard count

Functions

func Lock

func Lock(b []byte) error

Lock locks the maped slice, preventing it from being swapped out.

func Madvise

func Madvise(b []byte) error

Madvise advises the kernel about how to handle the mapped slice.

func MemAlign

func MemAlign(blockSize uint) ([]byte, error)

MemAlign mem align

func MemAlignWithBase

func MemAlignWithBase(blockSize, alignSize uint) ([]byte, error)

MemAlignWithBase like linux posix_memalign. block start address must be a multiple of AlignSize. block size also must be a multiple of AlignSize.

func Mmap

func Mmap(fd *os.File, offset int64, length int, writable bool) ([]byte, error)

Mmap use the mmap system call to memory mapped file or device.

func Munmap

func Munmap(b []byte) error

Munmap unmaps mapped slice, this will also flush any remaining changes.

func OpenFileWithDIO

func OpenFileWithDIO(name string, flag int, perm os.FileMode) (*os.File, error)

OpenFileWithDIO open files with O_DIRECT flag

func Sync

func Sync(b []byte) error

Sync flushes mmap slice's all changes back to the device.

func Unlock

func Unlock(b []byte) error

Unlock unlocks the mapped slice, allowing it to swap out again.

Types

type AIOMode

type AIOMode int

AIOMode specifies aio mode, default Libaio.

const (
	// Libaio linux kernel disk async IO solution
	Libaio AIOMode = iota
	// IOUring linux kernel new async IO with v5.1
	IOUring
)

type AsyncIO

type AsyncIO struct {
	sync.RWMutex
	// contains filtered or unexported fields
}

AsyncIO async IO maybe we can implement a simplified posix file system by implement an own disk allocator? Give it a try?

func (*AsyncIO) Append

func (aio *AsyncIO) Append(bs [][]byte) (int, error)

func (*AsyncIO) Close

func (aio *AsyncIO) Close() error

Close will wait for all submitted IO to completed.

func (*AsyncIO) FLock

func (aio *AsyncIO) FLock() error

FLock async IO not impl Flock

func (*AsyncIO) FUnlock

func (aio *AsyncIO) FUnlock() error

FUnlock async IO not impl FUnlock

func (*AsyncIO) Fd

func (aio *AsyncIO) Fd() uintptr

Fd wraps *os.File Fd to impl File Fd func

func (*AsyncIO) IsDone

func (aio *AsyncIO) IsDone(id RequestID) (bool, error)

func (*AsyncIO) Option

func (aio *AsyncIO) Option() Options

Option return options

func (*AsyncIO) Read

func (aio *AsyncIO) Read(b []byte) (int, error)

func (*AsyncIO) ReadAt

func (aio *AsyncIO) ReadAt(b []byte, offset int64) (int, error)

func (*AsyncIO) Seek

func (aio *AsyncIO) Seek(offset int64, whence int) (int64, error)

func (*AsyncIO) Stat

func (aio *AsyncIO) Stat() (os.FileInfo, error)

Stat wraps *os.File Stat to impl File Stat func

func (*AsyncIO) Sync

func (aio *AsyncIO) Sync() error

Sync will wait for all submitted jobs to finish and then sync the file descriptor. Because the Linux kernel does not actually support Sync via the AIO interface we just issue a plain old sync via userland. No async here. Sync don't ack outstanding requests

func (*AsyncIO) Truncate

func (aio *AsyncIO) Truncate(size int64) error

Truncate will wait for all submitted jobs to finish trunctate the file to the designated size.

func (*AsyncIO) WaitFor

func (aio *AsyncIO) WaitFor(id RequestID) (int, error)

WaitFor will block until the given RequestId is done

func (*AsyncIO) Write

func (aio *AsyncIO) Write(b []byte) (int, error)

Write simulate write by writeAt, it is a async IO. the buffer cannot change before the write completes.

func (*AsyncIO) WriteAt

func (aio *AsyncIO) WriteAt(b []byte, offset int64) (int, error)

func (*AsyncIO) WriteAtv

func (aio *AsyncIO) WriteAtv(bs [][]byte, offset int64) (int, error)

type Buffers

type Buffers [][]byte

Buffers contains zero or more runs of bytes to write. this is applied to readv, writev, preadv, pwritev.

func NewBuffers

func NewBuffers() *Buffers

NewBuffers init buffer slice by default cap 128

func (*Buffers) Length

func (v *Buffers) Length() (n int)

Length return buffers byte total length

func (*Buffers) Read

func (v *Buffers) Read(b []byte) (n int, err error)

func (*Buffers) Write

func (v *Buffers) Write(b []byte) *Buffers

func (*Buffers) WriteTo

func (v *Buffers) WriteTo(w io.Writer) (n int64, err error)

WriteTo direct write to writer

type ConcurrentMap

type ConcurrentMap []*ConcurrentMapShared

ConcurrentMap A "thread" safe map of type string:Anything. To avoid lock bottlenecks this map is dived to several (ShardCount) map shards.

func NewConcurrentMap

func NewConcurrentMap() ConcurrentMap

NewConcurrentMap Creates a new concurrent map.

func (ConcurrentMap) Count

func (m ConcurrentMap) Count() int

Count returns the number of elements within the map.

func (ConcurrentMap) Get

func (m ConcurrentMap) Get(key string) (interface{}, bool)

Get retrieves an element from map under given key.

func (ConcurrentMap) GetShard

func (m ConcurrentMap) GetShard(key string) *ConcurrentMapShared

GetShard returns shard under given key

func (ConcurrentMap) Has

func (m ConcurrentMap) Has(key string) bool

Has Looks up an item under specified key

func (ConcurrentMap) IsEmpty

func (m ConcurrentMap) IsEmpty() bool

IsEmpty checks if map is empty.

func (ConcurrentMap) Items

func (m ConcurrentMap) Items() map[string]interface{}

Items returns all items as map[string]interface{}

func (ConcurrentMap) Iter

func (m ConcurrentMap) Iter() <-chan Tuple

Iter returns an iterator which could be used in a for range loop. Deprecated: using IterBuffered() will get a better performence

func (ConcurrentMap) IterBuffered

func (m ConcurrentMap) IterBuffered() <-chan Tuple

IterBuffered returns a buffered iterator which could be used in a for range loop.

func (ConcurrentMap) IterCb

func (m ConcurrentMap) IterCb(fn IterCb)

IterCb Callback based iterator, cheapest way to read all elements in a map.

func (ConcurrentMap) Keys

func (m ConcurrentMap) Keys() []string

Keys returns all keys as []string

func (ConcurrentMap) MSet

func (m ConcurrentMap) MSet(data map[string]interface{})

MSet batch set

func (ConcurrentMap) MarshalJSON

func (m ConcurrentMap) MarshalJSON() ([]byte, error)

MarshalJSON reviles ConcurrentMap "private" variables to json marshal.

func (ConcurrentMap) Pop

func (m ConcurrentMap) Pop(key string) (v interface{}, exists bool)

Pop removes an element from the map and returns it

func (ConcurrentMap) RandomPop

func (m ConcurrentMap) RandomPop() (k string, v interface{}, exists bool)

RandomPop random removes an element from the map and returns it

func (ConcurrentMap) Remove

func (m ConcurrentMap) Remove(key string)

Remove removes an element from the map.

func (ConcurrentMap) RemoveCb

func (m ConcurrentMap) RemoveCb(key string, cb RemoveCb) bool

RemoveCb locks the shard containing the key, retrieves its current value and calls the callback with those params If callback returns true and element exists, it will remove it from the map Returns the value returned by the callback (even if element was not present in the map)

func (ConcurrentMap) Set

func (m ConcurrentMap) Set(key string, value interface{})

Set the given value under the specified key.

func (ConcurrentMap) SetIfAbsent

func (m ConcurrentMap) SetIfAbsent(key string, value interface{}) bool

SetIfAbsent Sets the given value under the specified key if no value was associated with it.

func (ConcurrentMap) Upsert

func (m ConcurrentMap) Upsert(key string, value interface{}, cb UpsertCb) (res interface{})

Upsert Insert or Update - updates existing element or inserts a new one using UpsertCb

type ConcurrentMapShared

type ConcurrentMapShared struct {
	sync.RWMutex // Read Write mutex, guards access to internal map.
	// contains filtered or unexported fields
}

ConcurrentMapShared A "thread" safe string to anything map.

type DirectIO

type DirectIO struct {
	*os.File
	*FileLock
	// contains filtered or unexported fields
}

DirectIO dio mode

func (*DirectIO) Append

func (dio *DirectIO) Append(bs [][]byte) (int, error)

Append write data to the end of file.

func (*DirectIO) Close

func (dio *DirectIO) Close() error

Close impl standard File Close method

func (*DirectIO) FLock

func (dio *DirectIO) FLock() (err error)

FLock a file lock is a recommended lock. if file lock not init, we will init once.

func (*DirectIO) FUnlock

func (dio *DirectIO) FUnlock() error

FUnlock file unlock

func (*DirectIO) Option

func (dio *DirectIO) Option() Options

Option return File options

func (*DirectIO) WriteAtv

func (dio *DirectIO) WriteAtv(bs [][]byte, off int64) (int, error)

WriteAtv like linux pwritev, write to the specifies offset and dose not change the file offset.

type Event

type Event struct {
	Data unsafe.Pointer
	Obj  *Iocb
	Res  int64
	Res2 int64
}

type File

type File interface {
	// Fd returns the Unix fd or Windows handle referencing the open file.
	// The fd is valid only until f.Close is called or f is garbage collected.
	Fd() uintptr

	// Stat returns the FileInfo structure describing file.
	// The MMap mode returns the native file state instead of the memory slice.
	Stat() (os.FileInfo, error)

	// Read reads up to len(b) bytes from the File.
	// It returns the number of bytes read and any error encountered.
	// At end of file, Read returns io.EOF.
	Read(b []byte) (int, error)

	// ReadAt reads len(b) bytes from the File starting at byte offset off.
	// It returns the number of bytes read and the error, if any.
	// ReadAt always returns a non-nil error when n < len(b).
	// At end of file, that error is io.EOF.
	ReadAt(b []byte, off int64) (int, error)

	// Write writes len(b) bytes to the File.
	// It returns the number of bytes written and an error, if any.
	// Write returns a non-nil error when n != len(b).
	Write(b []byte) (int, error)

	// WriteAt writes len(b) bytes to the File starting at byte offset off.
	// It returns the number of bytes written and an error, if any.
	// WriteAt returns a non-nil error when n != len(b).
	WriteAt(b []byte, off int64) (int, error)

	// WriteAtv write multiple discrete discontinuous mem block
	// on AIO mode, it's impled by pwritev syscall
	// on other mode, it's impled by multi call pwrite syscall
	WriteAtv(bs [][]byte, off int64) (int, error)

	// Append write data at the end of file
	// We do not guarantee atomicity of concurrent append writes.
	// Note: we should avoid O_APPEND here due to ta the following bug:
	// POSIX requires that opening a file with the O_APPEND flag should
	// have no affect on the location at which pwrite() writes data.
	// However, on Linux, if a file is opened with O_APPEND, pwrite()
	// appends data to the end of the file, regardless of the value of
	// offset. on darwin, there is no this Bug.
	// More info here: https://linux.die.net/man/2/pwrite
	Append(bs [][]byte) (int, error)

	// Seek sets the offset for the next Read or Write on file to offset, interpreted
	// according to whence: 0 means relative to the origin of the file, 1 means
	// relative to the current offset, and 2 means relative to the end.
	// It returns the new offset and an error, if any.
	// The behavior of Seek on a file opened with O_APPEND is not specified.
	Seek(offset int64, whence int) (int64, error)

	// Truncate changes the size of the file.
	// It does not change the I/O offset.
	// If there is an error, it will be of type *PathError.
	Truncate(size int64) error

	// FLock the lock is suggested and exclusive
	FLock() error

	// FUnlock unlock the file lock
	// it will be atomic release when file close.
	FUnlock() error

	// Sync commits the current contents of the file to stable storage.
	// Typically, this means flushing the file system's in-memory copy
	// of recently written data to disk.
	Sync() error

	// Close closes the File, rendering it unusable for I/O.
	Close() error

	// Option return IO engine options
	Option() Options
}

File a unified common file operation interface

func Open

func Open(name string, opt Options) (File, error)

Open opens the named file for reading

type FileIO

type FileIO struct {
	*os.File
	*FileLock
	// contains filtered or unexported fields
}

FileIO Standrad I/O mode

func (*FileIO) Append

func (fi *FileIO) Append(bs [][]byte) (int, error)

Append write data to the end of file.

func (*FileIO) Close

func (fi *FileIO) Close() error

Close impl standard File Close method

func (*FileIO) FLock

func (fi *FileIO) FLock() (err error)

FLock a file lock is a recommended lock. if file lock not init, we will init once.

func (*FileIO) FUnlock

func (fi *FileIO) FUnlock() error

FUnlock file unlock

func (*FileIO) Option

func (fi *FileIO) Option() Options

Option return File options

func (*FileIO) WriteAtv

func (fi *FileIO) WriteAtv(bs [][]byte, off int64) (int, error)

WriteAtv like linux pwritev, write to the specifies offset and dose not change the file offset.

type FileLock

type FileLock struct {
	// contains filtered or unexported fields
}

FileLock holds a lock on a hidden file inside.

func NewFileLock

func NewFileLock(path string, writable bool) (*FileLock, error)

NewFileLock return internal file lock example: The file lock format for the file name 'test' is .test-flock

func (*FileLock) FLock

func (fl *FileLock) FLock() error

FLock a block file lock

func (*FileLock) FUnlock

func (fl *FileLock) FUnlock() error

FUnlock file unlock

func (*FileLock) Release

func (fl *FileLock) Release() error

Release deletes the pid file and auto releases lock on the file

type FileLockMode

type FileLockMode int

FileLockMode specifies file lock mode, default None.

const (
	// None indicates that open file without file lock
	None FileLockMode = iota
	// ReadWrite indicates that open file with file rwlock
	ReadWrite
	// ReadOnly indicates that open file with file rlock
	ReadOnly
)

type IOContext

type IOContext uint

func NewIOContext

func NewIOContext(maxEvents int) (IOContext, error)

func (IOContext) Cancel

func (ioctx IOContext) Cancel(iocbs []Iocb, events []Event) (int, error)

func (IOContext) Destroy

func (ioctx IOContext) Destroy() error

func (IOContext) GetEvents

func (ioctx IOContext) GetEvents(minnr, nr int, events []Event, timeout Timespec) (int, error)

func (IOContext) Submit

func (ioctx IOContext) Submit(iocbs []*Iocb) (int, error)

type IOMode

type IOMode int

IOMode specifies disk I/O mode, default StandardIO.

const (
	// StandardIO indicates that disk I/O using standard buffered I/O
	StandardIO IOMode = iota
	// MMap indicates that disk I/O using memory mapped
	MMap
	// DIO indicates that disk I/O using Direct I/O
	DIO
	// AIO indicates that disk I/O using Async I/O by libaio or io_uring
	AIO
)

type Iocb

type Iocb struct {
	Data   unsafe.Pointer
	Key    uint64
	Opcode int16
	Prio   int16
	Fd     uint32
	Buf    unsafe.Pointer
	Nbytes uint64
	Offset int64

	Flags uint32
	Resfd uint32
	// contains filtered or unexported fields
}

func NewIocb

func NewIocb(fd uint32) *Iocb

func (*Iocb) OpCode

func (iocb *Iocb) OpCode() IocbCmd

func (*Iocb) PrepFDSync

func (iocb *Iocb) PrepFDSync()

func (*Iocb) PrepFSync

func (iocb *Iocb) PrepFSync()

func (*Iocb) PrepPread

func (iocb *Iocb) PrepPread(buf []byte, bufLen int, offset int64)

func (*Iocb) PrepPreadv

func (iocb *Iocb) PrepPreadv(bs [][]byte, offset int64)

func (*Iocb) PrepPwrite

func (iocb *Iocb) PrepPwrite(buf []byte, bufLen int, offset int64)

func (*Iocb) PrepPwritev

func (iocb *Iocb) PrepPwritev(bs [][]byte, offset int64)

func (*Iocb) SetEventFd

func (iocb *Iocb) SetEventFd(eventfd int)

type IocbCmd

type IocbCmd int16
const (
	IOCmdPread IocbCmd = iota
	IOCmdPwrite
	IOCmdFSync
	IOCmdFDSync
	IOCmdPoll
	IOCmdNoop
	IOCmdPreadv
	IOCmdPwritev
)

type IterCb

type IterCb func(key string, v interface{})

IterCb Iterator callback,called for every key,value found in maps. RLock is held for all calls for a given shard therefore callback sess consistent view of a shard, but not across the shards

type MemoryMap

type MemoryMap struct {
	*os.File
	*FileLock
	// contains filtered or unexported fields
}

MemoryMap disk IO mode page faults and dirty page writes can degrade mmap performance we impl ReadAt by mmap, other API impled by standardIO.

func (*MemoryMap) Append

func (mmap *MemoryMap) Append(bs [][]byte) (int, error)

Append write data to the end of file.

func (*MemoryMap) Close

func (mmap *MemoryMap) Close() error

Close closes the File

func (*MemoryMap) FLock

func (mmap *MemoryMap) FLock() (err error)

FLock a file lock is a recommended lock. if file lock not init, we will init once

func (*MemoryMap) FUnlock

func (mmap *MemoryMap) FUnlock() error

FUnlock file unlock

func (*MemoryMap) Option

func (mmap *MemoryMap) Option() Options

Option return file options

func (*MemoryMap) ReadAt

func (mmap *MemoryMap) ReadAt(b []byte, off int64) (int, error)

ReadAt like any io.ReaderAt, clients can execute parallel ReadAt calls.

func (*MemoryMap) WriteAtv

func (mmap *MemoryMap) WriteAtv(bs [][]byte, off int64) (int, error)

WriteAtv like linux pwritev, write to the specifies offset and dose not change the file offset.

type Options

type Options struct {
	// IOEngine io mode
	IOEngine IOMode

	// Flag the file open mode
	Flag int

	// Perm the file perm
	Perm os.FileMode

	// FileLock file lock mode, default none
	FileLock FileLockMode

	// MmapSize mmap file size in memory
	MmapSize int

	// MmapWritable whether to allow mmap write
	// if true, it will be use mmap write instead of standardIO write, not implemented yet.
	MmapWritable bool

	// AIO async IO mode, defaul libaio, the io_uring isn't implemented yet.
	AIO AIOMode

	// AIOQueueDepth libaio max events, it's also use to control client IO number.
	AIOQueueDepth int

	// AIOTimeout unit ms, libaio timeout, 0 means no timeout.
	AIOTimeout int
}

Options are params for creating IOEngine.

type RemoveCb

type RemoveCb func(key string, v interface{}, exists bool) bool

RemoveCb is a callback executed in a map.RemoveCb() call, while Lock is held If returns true, the element will be removed from the map

type RequestID

type RequestID uint

RequestID aio submit request id

type Timespec

type Timespec struct {
	Sec  int
	Nsec int
}

type Tuple

type Tuple struct {
	Key string
	Val interface{}
}

Tuple used by the Iter & IterBuffered functions to wrap two variables together over a channel,

type UpsertCb

type UpsertCb func(exist bool, valueInMap interface{}, newValue interface{}) interface{}

UpsertCb Callback to return new element to be inserted into the map It is called while lock is held, therefore it MUST NOT try to access other keys in same map, as it can lead to deadlock since Go sync.RWLock is not reentrant

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL