uring

Version: v0.0.0-...-1b2ec51

Note: this package is not in the latest version of its module.

Published: Feb 9, 2021 License: MIT Imports: 5 Imported by: 1

README

Golang library for io_uring framework (without CGO)

io_uring is the new kernel interface for asynchronous IO. The best introduction is io_uring intro.

Note that this library is mostly tested on 5.8.* kernels. The core of the library doesn't use any of the newer features and will work on any kernel whose io_uring supports the IORING_SETUP_CQSIZE and IORING_SETUP_ATTACH_WQ flags and eventfd notifications (IORING_REGISTER_EVENTFD). However, some of the tests depend on the latest features and will probably fail with cryptic errors on kernels that don't support them.

Benchmarks

Benchmarks for reading a 40 GB file were collected on a 5.8.15 kernel with ext4 and a Samsung EVO 960. The file is opened with O_DIRECT. The benchmarks compare the fastest way to read a file using optimal strategies with io_uring and with os.

io_uring
  • 16 rings (one per core) with shared kernel workers
  • one thread per ring to reap completions (faster than a single thread with epoll and eventfds)
  • 4096 submission queue size
  • 8192 completion queue size
  • 100 000 concurrent readers
BenchmarkReadAt/enter_4096-16            5000000          1709 ns/op	2397.39 MB/s          34 B/op          2 allocs/op

os
  • 128 os threads (more than that hurts performance)
BenchmarkReadAt/os_4096-128              5000000          1901 ns/op	2154.84 MB/s           0 B/op          0 allocs/op

Implementation

memory ordering (atomics)

io_uring relies on StoreRelease/ReadAcquire atomic semantics to guarantee that submitted entries are visible to the kernel once the SQ tail is updated, and vice versa for completed entries and the CQ head.

Based on comments (#1, #2), Go provides sufficient (actually stronger) guarantees for this to work. However, I wasn't able to find any mention of this in the memory model specification, so it is subject to change, though that is highly unlikely.

Also, the runtime/internal/atomic package implements weaker atomics that provide exactly StoreRelease and ReadAcquire. You can link them with the go:linkname pragma, but it is unsafe, nobody should depend on it, and it doesn't provide any noticeable performance improvement to justify the hack.

import _ "unsafe" // required by go:linkname; bodyless funcs also need an empty .s file

//go:linkname LoadAcq runtime/internal/atomic.LoadAcq
func LoadAcq(ptr *uint32) uint32

//go:linkname StoreRel runtime/internal/atomic.StoreRel
func StoreRel(ptr *uint32, val uint32)

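
A minimal sketch of the publication pattern described above, assuming one producer goroutine standing in for the submitter and one consumer goroutine standing in for the kernel. The ring type is hypothetical, not this package's Ring. It uses plain sync/atomic for the indices; since Go 1.19 the memory model documents those operations as sequentially consistent, which is stronger than the release/acquire pair io_uring needs.

```go
package main

import (
	"fmt"
	"runtime"
	"sync"
	"sync/atomic"
)

// ring is a hypothetical single-producer/single-consumer ring.
type ring struct {
	entries [8]uint64
	mask    uint32 // len(entries) - 1; the size must be a power of two
	head    uint32 // advanced only by the consumer
	tail    uint32 // advanced only by the producer
}

// push writes the entry first and publishes it by storing the new tail.
// A consumer that observes the new tail also observes the entry.
func (r *ring) push(v uint64) bool {
	tail := atomic.LoadUint32(&r.tail)
	if tail-atomic.LoadUint32(&r.head) == uint32(len(r.entries)) {
		return false // full
	}
	r.entries[tail&r.mask] = v
	atomic.StoreUint32(&r.tail, tail+1) // plays the role of StoreRelease
	return true
}

// pop loads the tail before reading the entry, then hands the slot back to
// the producer by advancing head.
func (r *ring) pop() (uint64, bool) {
	head := atomic.LoadUint32(&r.head)
	if head == atomic.LoadUint32(&r.tail) { // plays the role of ReadAcquire
		return 0, false // empty
	}
	v := r.entries[head&r.mask]
	atomic.StoreUint32(&r.head, head+1)
	return v, true
}

func main() {
	r := &ring{mask: 7}
	const n = 100
	var wg sync.WaitGroup
	wg.Add(1)
	go func() { // producer
		defer wg.Done()
		for i := uint64(1); i <= n; {
			if r.push(i) {
				i++
			} else {
				runtime.Gosched()
			}
		}
	}()
	var sum uint64
	for got := 0; got < n; { // consumer
		if v, ok := r.pop(); ok {
			sum += v
			got++
		} else {
			runtime.Gosched()
		}
	}
	wg.Wait()
	fmt.Println(sum) // 5050
}
```

The same shape applies in both directions: for the CQ the kernel is the producer and the application advances the head.
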
pointers

Casting pointers to uintptr is generally unsafe, as there is no guarantee that the object will stay at the same location (or stay alive) after the cast. The documentation for unsafe.Pointer specifies the situations in which it can be done safely. Communication with io_uring, obviously, assumes that the pointer stays valid until the operation completes. Take a look at the writev syscall example:

func Writev(sqe *SQEntry, fd uintptr, iovec []syscall.Iovec, offset uint64, flags uint32) {
        sqe.opcode = IORING_OP_WRITEV
        sqe.fd = int32(fd)
        sqe.len = uint32(len(iovec)) // number of iovec entries, not bytes
        sqe.offset = offset
        sqe.opcodeFlags = flags
        // Only the address is stored; nothing keeps iovec alive or in place.
        sqe.addr = (uint64)(uintptr(unsafe.Pointer(&iovec[0])))
}

In this example, sqe.addr may become invalid as soon as the Writev helper returns. To keep the pointer in place, there is a little-known pragma, go:uintptrescapes.

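//go:uintptrescapes
func (q *Queue) Syscall(opts func(*uring.SQEntry), ptrs ...uintptr) (uring.CQEntry, error) {
        // ptrs is never read: the pragma alone keeps the memory it points to
        // on the heap and alive until Syscall returns.
        return q.Complete(opts)
}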

...

func (f *File) WriteAt(buf []byte, off int64) (int, error) {
        if len(buf) == 0 {
                return 0, nil
        }
        iovec := []syscall.Iovec{{Base: &buf[0], Len: uint64(len(buf))}}
        return ioRst(f.queue.Syscall(func(sqe *uring.SQEntry) {
                uring.Writev(sqe, f.ufd, iovec, uint64(off), 0)
                sqe.SetFlags(f.flags)
        }, uintptr(unsafe.Pointer(&iovec[0]))))
}

The ptrs arguments keep the pointed-to memory from being moved or freed until Syscall returns.

This approach has several limitations:

  • the go:uintptrescapes pragma forces heap allocation (e.g. iovec escapes to the heap in this example).
  • queue.Syscall cannot be called through an interface (the pragma only takes effect on direct calls).

It should be possible to achieve the same without heap allocation (e.g. the way syscall.Syscall/syscall.RawSyscall do it).

synchronization and goroutines

The submission queue requires synchronization when it is used concurrently by multiple goroutines, which leads to contention on machines with a large number of CPUs. The natural way to avoid contention is to set up one ring per thread; io_uring provides a handy flag, IORING_SETUP_ATTACH_WQ, that allows multiple rings to share the same kernel worker pool.

On Linux, syscall.Gettid can be used efficiently to assign work to a particular ring in a way that minimizes contention. It is also critical to ensure that the completion path doesn't have to synchronize with submission, as that introduces noticeable degradation.
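
A sketch of ring-per-thread sharding, assuming the caller supplies a stable thread id (the README uses syscall.Gettid; here it is a plain int parameter so the sketch runs anywhere). shard, shardedQueue, pick, and submit are hypothetical names, not this package's queue API, and the counter stands in for filling an SQE.

```go
package main

import (
	"fmt"
	"sync"
)

// shard stands in for one ring; mu serializes access to its submission queue.
type shard struct {
	mu        sync.Mutex
	submitted int
}

// shardedQueue is a hypothetical ring-per-thread front end.
type shardedQueue struct {
	shards []shard
}

func newShardedQueue(n int) *shardedQueue {
	return &shardedQueue{shards: make([]shard, n)}
}

// pick maps a thread id to a shard. A given thread always lands on the same
// shard, so with one shard per CPU the lock is rarely contended.
func (q *shardedQueue) pick(tid int) *shard {
	return &q.shards[tid%len(q.shards)]
}

// submit records a submission on the shard chosen for tid.
func (q *shardedQueue) submit(tid int) {
	s := q.pick(tid)
	s.mu.Lock()
	s.submitted++ // a real ring would fill and flush an SQE here
	s.mu.Unlock()
}

func main() {
	q := newShardedQueue(4)
	for tid := 0; tid < 10; tid++ {
		q.submit(tid)
	}
	counts := make([]int, len(q.shards))
	for i := range q.shards {
		counts[i] = q.shards[i].submitted
	}
	fmt.Println(counts) // [3 3 2 2]
}
```
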

Another potential unsafe improvement is to link procPin from the runtime and use it in place of syscall.Gettid. My last benchmark shows no difference (maybe a couple of ns not in favor of procPin/procUnpin).

//go:linkname procPin runtime.procPin
func procPin() int // returns the id of the P the goroutine is pinned to

//go:linkname procUnpin runtime.procUnpin
func procUnpin()

Inside the runtime, gopark/goready could be used directly; however, they are not available outside of it, so I had to use a simple channel to notify the submitter on completion. This works nicely and doesn't introduce a lot of overhead. Overall, this approach adds ~750ns at a high submission rate (including spawning a goroutine, submitting a nop uring operation, and waiting for completion).
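
One possible shape for channel-based completion notification, sketched under assumptions: completions arrive on a Go channel standing in for the reaped CQ, and user data is a counter used as a map key. dispatcher, register, and reap are hypothetical names. For brevity the map is guarded by a mutex shared by both paths, so this sketch does not meet the goal above of keeping completion free of synchronization with submission.

```go
package main

import (
	"fmt"
	"sync"
)

// completion mirrors the two CQE fields the dispatcher needs.
type completion struct {
	userData uint64
	result   int32
}

// dispatcher hands each completion to the goroutine waiting for it.
type dispatcher struct {
	mu      sync.Mutex
	next    uint64
	waiters map[uint64]chan int32
}

// register allocates user data for a new request and the channel its
// submitter will block on.
func (d *dispatcher) register() (uint64, chan int32) {
	d.mu.Lock()
	defer d.mu.Unlock()
	d.next++
	ch := make(chan int32, 1) // buffered so the reaper never blocks on it
	d.waiters[d.next] = ch
	return d.next, ch
}

// reap plays the role of the per-ring completion thread.
func (d *dispatcher) reap(cqes <-chan completion) {
	for c := range cqes {
		d.mu.Lock()
		ch := d.waiters[c.userData]
		delete(d.waiters, c.userData)
		d.mu.Unlock()
		if ch != nil {
			ch <- c.result
		}
	}
}

func main() {
	d := &dispatcher{waiters: make(map[uint64]chan int32)}
	cqes := make(chan completion)
	go d.reap(cqes)

	ud, done := d.register()
	// Stand-in for the kernel completing the request.
	go func() { cqes <- completion{userData: ud, result: 42} }()

	fmt.Println(<-done) // 42
	close(cqes)
}
```
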

Several weak points of this approach are:

  • IOPOLL and SQPOLL can't be used, as that would create a polling thread for each CPU.
  • Submissions are not batched (one syscall per operation). However, this can be improved with a batch API.
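
A hypothetical shape for such a batch API: block for the first pending request, then drain whatever else is already queued without blocking, so a single IO_URING_ENTER call could submit the whole batch. collectBatch is an illustrative name, not part of this package.

```go
package main

import "fmt"

// collectBatch blocks until one request is available, then takes up to
// limit-1 more that are already queued, without waiting for new ones.
// It returns nil if reqs is closed and empty.
func collectBatch[T any](reqs <-chan T, limit int) []T {
	first, ok := <-reqs
	if !ok {
		return nil
	}
	batch := []T{first}
	for len(batch) < limit {
		select {
		case r, ok := <-reqs:
			if !ok {
				return batch
			}
			batch = append(batch, r)
		default:
			return batch // nothing else queued right now
		}
	}
	return batch
}

func main() {
	reqs := make(chan int, 8)
	for i := 1; i <= 5; i++ {
		reqs <- i
	}
	fmt.Println(collectBatch(reqs, 3)) // [1 2 3]
	fmt.Println(collectBatch(reqs, 3)) // [4 5]
}
```
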

By introducing uring into the runtime directly, we can decrease overhead even further by removing syscall.Gettid, removing SQ synchronization, and improving gopark/goready efficiency (compared with waiting on a channel).

Documentation

Constants

const (
	IO_URING_SETUP uintptr = 425 + iota
	IO_URING_ENTER
	IO_URING_REGISTER
)

syscalls

const (
	IORING_OP_NOP uint8 = iota
	IORING_OP_READV
	IORING_OP_WRITEV
	IORING_OP_FSYNC
	IORING_OP_READ_FIXED
	IORING_OP_WRITE_FIXED
	IORING_OP_POLL_ADD
	IORING_OP_POLL_REMOVE
	IORING_OP_SYNC_FILE_RANGE
	IORING_OP_SENDMSG
	IORING_OP_RECVMSG
	IORING_OP_TIMEOUT
	IORING_OP_TIMEOUT_REMOVE
	IORING_OP_ACCEPT
	IORING_OP_ASYNC_CANCEL
	IORING_OP_LINK_TIMEOUT
	IORING_OP_CONNECT
	IORING_OP_FALLOCATE
	IORING_OP_OPENAT
	IORING_OP_CLOSE
	IORING_OP_FILES_UPDATE
	IORING_OP_STATX
	IORING_OP_READ
	IORING_OP_WRITE
	IORING_OP_FADVISE
	IORING_OP_MADVISE
	IORING_OP_SEND
	IORING_OP_RECV
	IORING_OP_OPENAT2
	IORING_OP_EPOLL_CTL
	IORING_OP_SPLICE
	IORING_OP_PROVIDE_BUFFERS
	IORING_OP_REMOVE_BUFFERS
	IORING_OP_TEE
	IORING_OP_LAST
)

operations

const (
	IOSQE_FIXED_FILE uint8 = 1 << iota
	IOSQE_IO_DRAIN
	IOSQE_IO_LINK
	IOSQE_IO_HARDLINK
	IOSQE_ASYNC
	IOSQE_BUFFER_SELECT
)

submission queue entry flags

const (
	IORING_SETUP_IOPOLL uint32 = 1 << iota
	IORING_SETUP_SQPOLL
	IORING_SETUP_SQ_AFF
	IORING_SETUP_CQSIZE
	IORING_SETUP_CLAMP
	IORING_SETUP_ATTACH_WQ
)

setup flags

const (
	IORING_OFF_SQ_RING int64 = 0
	IORING_OFF_CQ_RING int64 = 0x8000000
	IORING_OFF_SQES    int64 = 0x10000000
)

offsets for mmap

const (
	IORING_SQ_NEED_WAKEUP uint32 = 1 << iota
	IORING_SQ_CQ_OVERFLOW
)

sq ring flags

const (
	IORING_ENTER_GETEVENTS uint32 = 1 << iota
	IORING_ENTER_SQ_WAKEUP
)

enter flags

const (
	IORING_FEAT_SINGLE_MMAP uint32 = 1 << iota
	IORING_FEAT_NODROP
	IORING_FEAT_SUBMIT_STABLE
	IORING_FEAT_RW_CUR_POS
	IORING_FEAT_CUR_PERSONALITY
	IORING_FEAT_FAST_POLL
	IORING_FEAT_POLL_32BITS
)

params feature flags

const (
	IORING_REGISTER_BUFFERS uintptr = iota
	IORING_UNREGISTER_BUFFERS
	IORING_REGISTER_FILES
	IORING_UNREGISTER_FILES
	IORING_REGISTER_EVENTFD
	IORING_UNREGISTER_EVENTFD
	IORING_REGISTER_FILES_UPDATE
	IORING_REGISTER_EVENTFD_ASYNC
	IORING_REGISTER_PROBE
	IORING_REGISTER_PERSONALITY
	IORING_UNREGISTER_PERSONALITY
)
const (
	MinSize = 2
	MaxSize = 4096
)
const IORING_CQE_BUFFER_SHIFT uint32 = 16
const IORING_CQE_F_BUFFER uint32 = 1 << 0

cqe flags
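
A sketch of how the two CQE buffer constants fit together: for requests submitted with IOSQE_BUFFER_SELECT, the kernel reports the selected provided-buffer ID in the upper 16 bits of the CQE flags, valid only when IORING_CQE_F_BUFFER is set. With this package the flags would come from CQEntry.Flags(); bufferID is a hypothetical helper and the constants are local copies.

```go
package main

import "fmt"

// Local copies of the constants documented above.
const (
	IORING_CQE_F_BUFFER     uint32 = 1 << 0
	IORING_CQE_BUFFER_SHIFT uint32 = 16
)

// bufferID extracts the provided-buffer ID from CQE flags.
// ok is false when the kernel did not select a buffer for this CQE.
func bufferID(flags uint32) (id uint16, ok bool) {
	if flags&IORING_CQE_F_BUFFER == 0 {
		return 0, false
	}
	return uint16(flags >> IORING_CQE_BUFFER_SHIFT), true
}

func main() {
	fmt.Println(bufferID(7<<16 | IORING_CQE_F_BUFFER)) // 7 true
	fmt.Println(bufferID(0))                            // 0 false
}
```
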

const IORING_CQ_EVENTFD_DISABLED uint32 = 1 << 0

cqe ring flags

const IORING_FSYNC_DATASYNC uint32 = 1 << 0

sqe fsync flags

const IORING_TIMEOUT_ABS uint32 = 1 << 0

sqe timeout flags

const (
	// IO_URING_OP_SUPPORTED ...
	IO_URING_OP_SUPPORTED uint16 = 1 << 0
)
const SPLICE_F_FD_IN_FIXED uint32 = 1 << 31

sqe splice flags

Variables

This section is empty.

Functions

func Close

func Close(sqe *SQEntry, fd uintptr)

Close ...

func Fdatasync

func Fdatasync(sqe *SQEntry, fd uintptr)

Fdatasync ...

func Fsync

func Fsync(sqe *SQEntry, fd uintptr)

Fsync ...

func LinkTimeout

func LinkTimeout(sqe *SQEntry, ts *unix.Timespec, abs bool)

LinkTimeout will cancel linked operation if it doesn't complete in time.

func Nop

func Nop(sqe *SQEntry)

Nop ...

func Openat

func Openat(sqe *SQEntry, dfd int32, pathptr *byte, flags uint32, mode uint32)

Openat

func Read

func Read(sqe *SQEntry, fd uintptr, buf []byte)

Read ...

func ReadFixed

func ReadFixed(sqe *SQEntry, fd uintptr, base *byte, len, offset uint64, flags uint32, bufIndex uint16)

ReadFixed ...

func Readv

func Readv(sqe *SQEntry, fd uintptr, iovec []syscall.Iovec, offset uint64, flags uint32)

Readv

func Recv

func Recv(sqe *SQEntry, fd uintptr, buf []byte, flags uint32)

Recv ...

func Send

func Send(sqe *SQEntry, fd uintptr, buf []byte, flags uint32)

Send ...

func Timeout

func Timeout(sqe *SQEntry, ts *unix.Timespec, abs bool, count uint64)

Timeout prepares a timeout operation. If abs is true, IORING_TIMEOUT_ABS is added to timeoutFlags. count is the number of completion events to wait for.

func Write

func Write(sqe *SQEntry, fd uintptr, buf []byte)

Write ...

func WriteFixed

func WriteFixed(sqe *SQEntry, fd uintptr, base *byte, len, offset uint64, flags uint32, bufIndex uint16)

WriteFixed ...

func Writev

func Writev(sqe *SQEntry, fd uintptr, iovec []syscall.Iovec, offset uint64, flags uint32)

Writev ...

Types

type CQEntry

type CQEntry struct {
	// contains filtered or unexported fields
}

CQEntry is a completion queue entry. It is filled in by the kernel; applications should not modify it.

func (CQEntry) Flags

func (e CQEntry) Flags() uint32

Flags ...

func (CQEntry) Result

func (e CQEntry) Result() int32

Result ...

func (CQEntry) UserData

func (e CQEntry) UserData() uint64

UserData is a copy of user data from SQEntry.

type CQRingOffsets

type CQRingOffsets struct {
	Head        uint32
	Tail        uint32
	RingMask    uint32
	RingEntries uint32
	Overflow    uint32
	CQEs        uint32
	Flags       uint32
	Resv1       uint32
	Resv2       uint64
}

CQRingOffsets ...

type IOUringFilesUpdate

type IOUringFilesUpdate struct {
	Offset uint32

	Fds *int32
	// contains filtered or unexported fields
}

type IOUringParams

type IOUringParams struct {
	SQEntries    uint32
	CQEntries    uint32
	Flags        uint32
	SQThreadCPU  uint32
	SQThreadIdle uint32
	Features     uint32
	WQFd         uint32

	SQOff SQRingOffsets
	CQOff CQRingOffsets
	// contains filtered or unexported fields
}

IOUringParams can be used to control io_uring instance behavior. For more details, see the io_uring_setup(2) man page.

IORING_SETUP_* can be passed to Flags to enable specific behaviour.

CQEntries is ignored unless IORING_SETUP_CQSIZE is provided.

SQThreadIdle is in milliseconds and controls how long the SQ thread can stay idle before the kernel puts it to sleep.

SQThreadCPU, when specified together with IORING_SETUP_SQ_AFF, binds the SQ thread to a specific CPU.

WQFd can be used to share kernel thread worker pool between multiple io_uring instances. Ignored unless specified with IORING_SETUP_ATTACH_WQ. For an example see queue.ShardedQueue and relevant tests.

Other fields should be ignored, as they are filled in by the kernel. TODO: those fields could be private; SQOff and CQOff should definitely be private.
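
A sketch of how io_uring_setup(2) sizes the rings from these parameters: the requested SQ size is rounded up to a power of two, and the CQ gets twice as many entries unless IORING_SETUP_CQSIZE asks for a specific size (also rounded up). This matches the benchmark configuration earlier (4096 SQ, 8192 CQ). roundUpPow2 and ringSizes are hypothetical helpers; the kernel's validation (CQ at least as large as SQ, upper limits, IORING_SETUP_CLAMP) is omitted.

```go
package main

import (
	"fmt"
	"math/bits"
)

// Local copy of the setup flag documented above.
const IORING_SETUP_CQSIZE uint32 = 1 << 3

// roundUpPow2 returns the smallest power of two that is >= n.
func roundUpPow2(n uint32) uint32 {
	if n <= 1 {
		return 1
	}
	return 1 << bits.Len32(n-1)
}

// ringSizes approximates the SQ and CQ sizes the kernel reports back in
// SQEntries and CQEntries (validation omitted).
func ringSizes(entries, cqEntries, flags uint32) (sq, cq uint32) {
	sq = roundUpPow2(entries)
	if flags&IORING_SETUP_CQSIZE != 0 {
		return sq, roundUpPow2(cqEntries)
	}
	return sq, 2 * sq
}

func main() {
	fmt.Println(ringSizes(1000, 0, 0))                      // 1024 2048
	fmt.Println(ringSizes(4096, 8192, IORING_SETUP_CQSIZE)) // 4096 8192
}
```
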

type Probe

type Probe struct {
	LastOp uint8
	OpsLen uint8

	Ops [probeOpsSize]ProbeOp
	// contains filtered or unexported fields
}

Probe ...

func (Probe) IsSupported

func (p Probe) IsSupported(op uint8) bool

IsSupported returns true if operation is supported.
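
A sketch of the support check, following the same rule as liburing's io_uring_opcode_supported: opcodes above LastOp are unknown to the kernel; otherwise the per-op IO_URING_OP_SUPPORTED flag decides. probe and probeOp are hypothetical local mirrors (the real Probe has a fixed-size Ops array and unexported fields), and this package's IsSupported may differ in detail.

```go
package main

import "fmt"

// Local copy of the constant documented above.
const IO_URING_OP_SUPPORTED uint16 = 1 << 0

// Hypothetical mirrors of ProbeOp and Probe.
type probeOp struct {
	Op    uint8
	Flags uint16
}

type probe struct {
	LastOp uint8
	Ops    []probeOp
}

// isSupported reports whether the kernel that filled p supports op.
func (p probe) isSupported(op uint8) bool {
	if op > p.LastOp || int(op) >= len(p.Ops) {
		return false // newer than anything this kernel knows about
	}
	return p.Ops[op].Flags&IO_URING_OP_SUPPORTED != 0
}

func main() {
	p := probe{LastOp: 2, Ops: []probeOp{
		{Op: 0, Flags: IO_URING_OP_SUPPORTED},
		{Op: 1, Flags: IO_URING_OP_SUPPORTED},
		{Op: 2, Flags: 0},
	}}
	fmt.Println(p.isSupported(1), p.isSupported(2), p.isSupported(5)) // true false false
}
```
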

type ProbeOp

type ProbeOp struct {
	Op uint8

	Flags uint16
	// contains filtered or unexported fields
}

ProbeOp ...

type Ring

type Ring struct {
	// contains filtered or unexported fields
}

Ring is an interface to io_uring kernel framework. Not safe to use from multiple goroutines without additional synchronization. API is inspired mostly by liburing.

func Setup

func Setup(size uint, params *IOUringParams) (*Ring, error)

func (*Ring) CQSize

func (r *Ring) CQSize() uint32

func (*Ring) Close

func (r *Ring) Close() (err error)

func (*Ring) CloseEventfd

func (r *Ring) CloseEventfd() error

CloseEventfd unregisters the eventfd from the uring instance and closes the associated fd.

func (*Ring) Enter

func (r *Ring) Enter(submitted uint32, minComplete uint32) (uint32, error)

Enter enters the io_uring instance (IO_URING_ENTER syscall). submitted and minComplete are passed as is.

func (*Ring) Eventfd

func (r *Ring) Eventfd() uintptr

Eventfd returns the eventfd for this uring instance. Call ring.SetupEventfd() to set one up.

func (*Ring) Fd

func (r *Ring) Fd() uintptr

Fd is the io_uring fd returned by the IO_URING_SETUP syscall.

func (*Ring) Flush

func (r *Ring) Flush() uint32

Flush submission queue.

func (*Ring) GetCQEntry

func (r *Ring) GetCQEntry(minComplete uint32) (CQEntry, error)

GetCQEntry returns an entry from the completion queue, performing the IO_URING_ENTER syscall if necessary. The CQE is copied out of the mmapped region to avoid an additional sync step after it is consumed. syscall.EAGAIN is returned if there are no completed entries and minComplete is 0. syscall.EINTR is returned if IO_URING_ENTER was interrupted while waiting for completion.

func (*Ring) GetSQEntry

func (r *Ring) GetSQEntry() *SQEntry

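GetSQEntry returns the earliest available SQEntry. It may return nil if there are no available entries. The entry can be reused after Submit or Enter.

sqe := ring.GetSQEntry() // nil when the submission queue is full
uring.Nop(sqe)           // prepare the entry
ring.Submit(0)           // submit without waiting for completions

... or ...

sqe := ring.GetSQEntry()
uring.Nop(sqe)
ring.Flush()     // make prepared entries visible to the kernel
ring.Enter(1, 0) // submit 1 entry, wait for 0 completions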

func (*Ring) RegisterBuffers

func (r *Ring) RegisterBuffers(iovec []syscall.Iovec) error

RegisterBuffers ...

func (*Ring) RegisterFiles

func (r *Ring) RegisterFiles(fds []int32) error

RegisterFiles ...

func (*Ring) RegisterProbe

func (r *Ring) RegisterProbe(probe *Probe) error

RegisterProbe ...

func (*Ring) SQSize

func (r *Ring) SQSize() uint32

func (*Ring) SetupEventfd

func (r *Ring) SetupEventfd() error

SetupEventfd creates an eventfd and registers it with the current uring instance.

func (*Ring) Submit

func (r *Ring) Submit(minComplete uint32) (uint32, error)

Submit submits queued entries and waits for the specified number of completions.

func (*Ring) UnregisterBuffers

func (r *Ring) UnregisterBuffers() error

UnregisterBuffers ...

func (*Ring) UnregisterFiles

func (r *Ring) UnregisterFiles() error

UnregisterFiles ...

func (*Ring) UpdateFiles

func (r *Ring) UpdateFiles(fds []int32, off uint32) error

UpdateFiles ...

type SQEntry

type SQEntry struct {
	// contains filtered or unexported fields
}

SQEntry is a submission queue entry. In C, some of the fields are unions. Go doesn't support union declarations; instead, SQEntry provides setters with descriptive names.

func (*SQEntry) Reset

func (e *SQEntry) Reset()

Reset will reset all fields to zeros.

func (*SQEntry) SetAddr

func (e *SQEntry) SetAddr(addr uint64)

SetAddr ...

func (*SQEntry) SetAddr2

func (e *SQEntry) SetAddr2(addr2 uint64)

SetAddr2 ...

func (*SQEntry) SetBufGroup

func (e *SQEntry) SetBufGroup(group uint16)

SetBufGroup ...

func (*SQEntry) SetBufIndex

func (e *SQEntry) SetBufIndex(index uint16)

SetBufIndex ...

func (*SQEntry) SetFD

func (e *SQEntry) SetFD(fd int32)

SetFD ...

func (*SQEntry) SetFlags

func (e *SQEntry) SetFlags(flags uint8)

SetFlags ...

func (*SQEntry) SetIOPrio

func (e *SQEntry) SetIOPrio(ioprio uint16)

SetIOPrio ...

func (*SQEntry) SetLen

func (e *SQEntry) SetLen(len uint32)

SetLen ...

func (*SQEntry) SetOffset

func (e *SQEntry) SetOffset(off uint64)

SetOffset ...

func (*SQEntry) SetOpcode

func (e *SQEntry) SetOpcode(opcode uint8)

SetOpcode ...

func (*SQEntry) SetOpcodeFlags

func (e *SQEntry) SetOpcodeFlags(flags uint32)

SetOpcodeFlags ...

func (*SQEntry) SetPersonality

func (e *SQEntry) SetPersonality(personality uint16)

SetPersonality ...

func (*SQEntry) SetSpliceFdIn

func (e *SQEntry) SetSpliceFdIn(val int32)

SetSpliceFdIn ...

func (*SQEntry) SetSpliceOffIn

func (e *SQEntry) SetSpliceOffIn(val uint64)

SetSpliceOffIn ...

func (*SQEntry) SetUserData

func (e *SQEntry) SetUserData(ud uint64)

SetUserData ...

func (*SQEntry) UserData

func (e *SQEntry) UserData() uint64

type SQRingOffsets

type SQRingOffsets struct {
	Head        uint32
	Tail        uint32
	RingMask    uint32
	RingEntries uint32
	Flags       uint32
	Dropped     uint32
	Array       uint32
	Resv1       uint32
	Resv2       uint64
}

SQRingOffsets ...

type Sigset_t

type Sigset_t struct {
	Val [16]uint64
}
