mmio

package module
v1.0.0 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jan 24, 2022 License: MIT Imports: 18 Imported by: 0

README

mmio

High performance async network io library

go report license Maintenance PRs Welcome Ask Me Anything !

First

if you have any good feature suggestions or bug fixed , any Pull Request or Issues are welcome!

Usage

# install mmio 
go get -u github.com/ooopSnake/mmio@latest
// import 
import "github.com/ooopSnake/mmio"

Overview

  • World Based On Event Loop 😎
  • UnBuffered/Buffered Stream 🚀
  • Timers: IO Timeout, Delay... ⏰
  • DNS Resolve 🌐
  • Lots Of Unix Socket API Toolbox 🔧
  • Thread(Goroutine) Safe! 👍

Backend

Platform Backend Support
Linux family Epoll Done 🎉
mac OS
(BSD family)
Kqueue Done 🎉
Windows IOCP Maybe Never... 😢
POSIX Like Poll Maybe Never... 🥺
we already have epoll
POSIX Like Select Coming Soon 🤡

mmio using interface Poller and Watcher as abstraction for any backend.

Getting Started

Create Pure EventLoop:
loop,err := mmio.NewEventLoop()
merrors.AssertError(err, "new pure event loop")
Create IO EventLoop:
loop, err := mmio.NewIOEvtLoop(1024 * 4)
merrors.AssertError(err, "new io event loop")
Loop Lifecycle:

exit a loop

// just call loop.Break in anywhere
loop.Break()

📌Loop.'Close' can't stop a loop but Loop.'Break' can.

📌Loop.'Close' use to cleanup a loop

Cleanup a loop

loop ,err := mmio.NewEventLoop()
merrors.AssertError(err, "new event loop")
defer loop.Close()

Run loop synchronously

// block until break loop called
loop.Run()

Run loop asynchronously😂😂😂

go func(){ loop.Run() }()
Create Listener:
// create listen fd first!
listenerFd, err := mmio.NewListenerFd(
  "127.0.0.1:12345", // serve at
  1024,              // backlog
  true,              // enable reuse addr
  true,              // enable reuse port
)
merrors.AssertError(err, "new listener fd")
// new listener
listener := mmio.NewListener(loop, int(listenerFd), onAccept)
listener.Start()
Accept New Conn Stream:
// 📌Note: in accept callback
stream := mmio.NewConnStream(
  ln.Loop().(*mmio.IOEvtLoop), // cast Loop to IOEventLoop 
  newFd,                         // incoming fd
  onStreamRead,                  // read callback
  )
stream.SetOnClose(onStreamClose) // register close callback
stream.Start()
Create Client Stream:
cliFd, err := mmio.NewConnFd(addr)
merrors.AssertError(err, "new client fd failed")
stream := mmio.NewConnStream(loop, int(cliFd), nil)
stream.SetOnConnect(func(sw mmio.StreamWriter, err error) {
  sw.Write([]byte("hello world!"), true)
})
stream.SetOnClose(func(sw mmio.StreamWriter, err error) {
  log.Println("client close :", err)
  // break loop...
  loop.Break()
})
stream.Start()

📌Stream.'Close' is safe to invoke multi times

📌Anytime you can't find out whether if Stream is 'Closing' or really been 'Closed',Just invoke Stream.'Close'

Example: Simple Read/Write/Close

package main

import (
	"github.com/guestin/mob/merrors"
	"github.com/guestin/mob/mio"
	"github.com/ooopSnake/mmio"
	"log"
)

func onRemoteStreamRead(sw mmio.StreamWriter, data []byte, len int) {
	log.Println("read remote conn:", string(data[:len]))
	_ = sw.Close()
}

func onRemoteStreamClose(sw mmio.StreamWriter, err error) {
	log.Println("remote conn closed,err:", err)
	// it's safe to call Close multi times
	_ = sw.Close() // close remote client
}

func onAccept(ln *mmio.Listener, newFd int, err error) {
	if err != nil {
		log.Printf("listener got error:%v\n", err)
		return
	}
	stream := mmio.NewConnStream(ln.Loop().(*mmio.IOEvtLoop), newFd, onRemoteStreamRead)
	stream.SetOnClose(onRemoteStreamClose)
	stream.Start()
}

func simpleClient(loop *mmio.IOEvtLoop, addr string) {
	cliFd, err := mmio.NewConnFd(addr)
	merrors.AssertError(err, "new client fd failed")
	stream := mmio.NewConnStream(loop, int(cliFd), nil)
	stream.SetOnConnect(func(sw mmio.StreamWriter, err error) {
		sw.Write([]byte("hello world!"), true)
	})
	stream.SetOnClose(func(sw mmio.StreamWriter, err error) {
		log.Println("client close :", err)
		_ = sw.Close() // dont forgot close stream itself
		// break loop...
		loop.Break()
	})
	stream.Start()
}

func main() {
	log.Println("simple listener start...")
	loop, err := mmio.NewIOEvtLoop(1024 * 4)
	merrors.AssertError(err, "new event loop")
	defer mio.CloseIgnoreErr(loop)
	// create listen fd first!
	listenerFd, err := mmio.NewListenerFd(
		"127.0.0.1:12345", // serve at
		1024,              // backlog
		true,              // enable reuse addr
		true,              // enable reuse port
	)
	merrors.AssertError(err, "new listener fd")
	// new listener
	listener := mmio.NewListener(loop, int(listenerFd), onAccept)
	defer mio.CloseIgnoreErr(listener)
	listener.Start()
	// start simple client
	simpleClient(loop, "127.0.0.1:12345")
	//
	loop.Run(nil)
	log.Println("simple listener exit...")
}

License

Released under the MIT License

Documentation

Index

Constants

View Source
const (
	TmFdNonblock     = unix.O_NONBLOCK
	TmFdCloexec      = unix.O_CLOEXEC
	TmFdTimerAbstime = 1 << 0
)
View Source
const DefaultIOEvtLoopBufferSize = 1024 * 4
View Source
const MSG_NOSIGNAL = unix.MSG_NOSIGNAL
View Source
const Readable = unix.EPOLLIN
View Source
const Writeable = unix.EPOLLOUT

Variables

View Source
var Debug = false

Functions

func ClockGetTime

func ClockGetTime(clockId ClockId) (*unix.Timespec, error)

func MakeIpcSockpair

func MakeIpcSockpair(nonblock bool) (fds [2]int, err error)

fd[0] for parent process fd[1] for child process nonblock : set socket nonblock

func Socket

func Socket(domain, typ, proto int) (fd int, err error)

func Spawn

func Spawn(exePath string, extraFd int) (*exec.Cmd, error)

func TimerFdCreate

func TimerFdCreate(clockId ClockId, flags int) (int, error)

func TimerFdGetTime

func TimerFdGetTime(fd int, curr *ITimerSpec) error

func TimerFdSetTime

func TimerFdSetTime(fd int, flags int, new *ITimerSpec, old *ITimerSpec) error

func WOULDBLOCK

func WOULDBLOCK(err error) bool

Types

type BaseUserData

type BaseUserData struct {
	// contains filtered or unexported fields
}

func (*BaseUserData) GetUserData

func (this *BaseUserData) GetUserData() interface{}

func (*BaseUserData) SetUserData

func (this *BaseUserData) SetUserData(data interface{})

type Bucket

type Bucket mapset.Set // type std.Set<BucketEntry>

type BucketEntry

type BucketEntry interface {
	io.Closer
	GetRefCounter() *int32
}

type BufferedStream

type BufferedStream struct {
	*Stream
	// contains filtered or unexported fields
}

func NewBufferedClientStream

func NewBufferedClientStream(loop *IOEvtLoop, fd int, onRead BufferedStreamOnRead) *BufferedStream

func NewBufferedConnStream

func NewBufferedConnStream(loop *IOEvtLoop, fd int, onRead BufferedStreamOnRead) *BufferedStream

type BufferedStreamOnRead

type BufferedStreamOnRead func(sw StreamWriter, buf mio.ReadableBuffer)

type ClockId

type ClockId int
const (
	ClockRealtime  ClockId = unix.CLOCK_REALTIME
	ClockMonotonic ClockId = unix.CLOCK_MONOTONIC
)

type Epoll

type Epoll struct {
	// contains filtered or unexported fields
}

func (*Epoll) AddFd

func (this *Epoll) AddFd(fd int, event uint32, watcher EventWatcher) error

func (*Epoll) Close

func (this *Epoll) Close() error

func (*Epoll) DelFd

func (this *Epoll) DelFd(fd int) error

func (*Epoll) ModFd

func (this *Epoll) ModFd(fd int, event uint32) error

func (*Epoll) Poll

func (this *Epoll) Poll(msec int) error

func (*Epoll) WatcherCtl

func (this *Epoll) WatcherCtl(action PollerAction, watcher EventWatcher) error

type EventLoop

type EventLoop interface {
	io.Closer
	RunInLoop(cb func())
	Notify()
	Run(ctx context.Context)
	Break()
	Poller() Poller
}

func NewEventLoop

func NewEventLoop() (EventLoop, error)

func NewEventLoop2

func NewEventLoop2(poller Poller, builder LoopNotifyBuilder) (EventLoop, error)

type EventSizeType

type EventSizeType = uint32

type EventWatcher

type EventWatcher interface {
	io.Closer
	GetFd() int
	GetEvent() EventSizeType
	SetEvent(event EventSizeType)
	Update(inLoop bool)
	OnEvent(event EventSizeType)
}

type Fd

type Fd int

func (Fd) Cloexec

func (this Fd) Cloexec(enable bool) error

best way to set cloexec

func (Fd) Close

func (this Fd) Close() error

func (Fd) FcntlGetFlag

func (this Fd) FcntlGetFlag() (flags int, err error)

func (Fd) FcntlSetFlag

func (this Fd) FcntlSetFlag(flag int) (err error)

func (Fd) NoneBlock

func (this Fd) NoneBlock(enable bool) error

type FdWatcher

type FdWatcher struct {
	BaseUserData
	// contains filtered or unexported fields
}

func NewFdWatcher

func NewFdWatcher(loop EventLoop, fd int, watcher IOWatcher) *FdWatcher

func (*FdWatcher) Close

func (this *FdWatcher) Close() error

func (*FdWatcher) DisableRW

func (this *FdWatcher) DisableRW() (update bool)

func (*FdWatcher) DisableRead

func (this *FdWatcher) DisableRead() (update bool)

func (*FdWatcher) DisableWrite

func (this *FdWatcher) DisableWrite() (update bool)

func (*FdWatcher) GetEvent

func (this *FdWatcher) GetEvent() EventSizeType

func (*FdWatcher) GetFd

func (this *FdWatcher) GetFd() int

func (*FdWatcher) Loop

func (this *FdWatcher) Loop() EventLoop

func (*FdWatcher) SetEvent

func (this *FdWatcher) SetEvent(event EventSizeType)

func (*FdWatcher) SetWatcher

func (this *FdWatcher) SetWatcher(watcher IOWatcher)

helper for driven class

func (*FdWatcher) Start

func (this *FdWatcher) Start()

func (*FdWatcher) Update

func (this *FdWatcher) Update(inLoop bool)

func (*FdWatcher) WantRead

func (this *FdWatcher) WantRead() (update bool)

func (*FdWatcher) WantWrite

func (this *FdWatcher) WantWrite() (update bool)

type FdWatcherMap

type FdWatcherMap struct {
	// contains filtered or unexported fields
}

func NewFdWatcherMap

func NewFdWatcherMap() *FdWatcherMap

func (*FdWatcherMap) Close

func (this *FdWatcherMap) Close() error

func (*FdWatcherMap) GetWatcher

func (this *FdWatcherMap) GetWatcher(fd int) EventWatcher

func (*FdWatcherMap) RmFd

func (this *FdWatcherMap) RmFd(fd int)

func (*FdWatcherMap) SetFd

func (this *FdWatcherMap) SetFd(fd int, watcher EventWatcher)

type GenericLoopNotify

type GenericLoopNotify struct {
	*FdWatcher
	// contains filtered or unexported fields
}

func (*GenericLoopNotify) Close

func (this *GenericLoopNotify) Close() error

func (*GenericLoopNotify) GetEvent

func (this *GenericLoopNotify) GetEvent() EventSizeType

func (*GenericLoopNotify) Notify

func (this *GenericLoopNotify) Notify()

func (*GenericLoopNotify) OnEvent

func (this *GenericLoopNotify) OnEvent(event EventSizeType)

func (*GenericLoopNotify) SetEvent

func (this *GenericLoopNotify) SetEvent(event EventSizeType)

type IOEvtLoop

type IOEvtLoop struct {
	EventLoop
	// contains filtered or unexported fields
}

func NewIOEvtLoop

func NewIOEvtLoop(ioBufferSize int) (*IOEvtLoop, error)

type IOWatcher

type IOWatcher interface {
	EventWatcher
	Loop() EventLoop
	WantRead() (update bool)
	DisableRead() (update bool)
	WantWrite() (update bool)
	DisableWrite() (update bool)
	DisableRW() (update bool)
}

type ITimerSpec

type ITimerSpec struct {
	ItInterval unix.Timespec
	ItValue    unix.Timespec
}

type Listener

type Listener struct {
	*FdWatcher
	// contains filtered or unexported fields
}

func NewListener

func NewListener(loop EventLoop, fd int, onAccept ListenerOnAccept) *Listener

func (*Listener) OnEvent

func (this *Listener) OnEvent(event EventSizeType)

type ListenerOnAccept

type ListenerOnAccept func(ln *Listener, newFd int, err error)

type LoopNotify

type LoopNotify interface {
	EventWatcher
	Notify()
}

func NewGenericLoopNotify

func NewGenericLoopNotify(loop EventLoop, wakeupCb func()) (LoopNotify, error)

func NewNotifyWatcher

func NewNotifyWatcher(loop EventLoop, wakeupCb func()) (LoopNotify, error)

type LoopNotifyBuilder

type LoopNotifyBuilder func(EventLoop, func()) (LoopNotify, error)
var DefaultLoopNotifyCreator LoopNotifyBuilder

type NotifyWatcher

type NotifyWatcher struct {
	*FdWatcher
	// contains filtered or unexported fields
}

func (*NotifyWatcher) GetEvent

func (this *NotifyWatcher) GetEvent() EventSizeType

func (*NotifyWatcher) Notify

func (this *NotifyWatcher) Notify()

func (*NotifyWatcher) OnEvent

func (this *NotifyWatcher) OnEvent(event EventSizeType)

func (*NotifyWatcher) SetEvent

func (this *NotifyWatcher) SetEvent(event EventSizeType)

type Poller

type Poller interface {
	io.Closer
	WatcherCtl(action PollerAction, watcher EventWatcher) error
	Poll(msec int) error
}

func NewEpoll

func NewEpoll(pollSize int) (Poller, error)

type PollerAction

type PollerAction int
const (
	Add PollerAction = iota
	Mod
	Del
)

type PollerBuilder

type PollerBuilder func(size int) (Poller, error)
var DefaultPollerCreator PollerBuilder

type SockFd

type SockFd int

func NewConnFd

func NewConnFd(addrS string) (SockFd, error)

func NewConnFd2

func NewConnFd2(version int, sockAddr unix.Sockaddr) (SockFd, error)

func NewListenerFd

func NewListenerFd(addrS string, backLog int, reuseAddr, reusePort bool) (SockFd, error)

fd with nonblock, cloexec default

func NewListenerFd2

func NewListenerFd2(version int, sockAddr unix.Sockaddr, backLog int, reuseAddr, reusePort bool) (SockFd, error)

fd with nonblock, cloexec default

func NewTcpSocketFd

func NewTcpSocketFd(version int, nonblock bool, cloexec bool) (SockFd, error)

create new socket , cloexec by default

func (SockFd) Accept

func (this SockFd) Accept(flags int) (nfd int, sa unix.Sockaddr, err error)

func (SockFd) Bind

func (this SockFd) Bind(sockAddr unix.Sockaddr) error

func (SockFd) Connect

func (this SockFd) Connect(addr unix.Sockaddr) error

func (SockFd) Listen

func (this SockFd) Listen(backLog int) error

func (SockFd) ReuseAddr

func (this SockFd) ReuseAddr(enable bool) error

func (SockFd) ReusePort

func (this SockFd) ReusePort(enable bool) error

type SpinLock

type SpinLock struct {
	// contains filtered or unexported fields
}

func NewLockedSpinLock

func NewLockedSpinLock() *SpinLock

func NewSpinLock

func NewSpinLock() *SpinLock

func (*SpinLock) Lock

func (this *SpinLock) Lock()

func (*SpinLock) Unlock

func (this *SpinLock) Unlock()

type Stream

type Stream struct {
	*FdWatcher
	// contains filtered or unexported fields
}

func NewClientStream

func NewClientStream(loop *IOEvtLoop, fd int, rcb StreamOnRead) *Stream

func NewConnStream

func NewConnStream(loop *IOEvtLoop, fd int, rcb StreamOnRead) *Stream

func (*Stream) Close

func (this *Stream) Close() error

it's safe to invoke Close multi times. ensure underlay resource has been cleanup

func (*Stream) OnEvent

func (this *Stream) OnEvent(event EventSizeType)

func (*Stream) SetOnClose

func (this *Stream) SetOnClose(cb StreamOnClose)

func (*Stream) SetOnConnect

func (this *Stream) SetOnConnect(cb StreamOnConnect)

func (*Stream) Write

func (this *Stream) Write(data []byte, inLoop bool)

type StreamMode

type StreamMode int
const (
	ModeConn StreamMode = iota
	ModeClient
)

type StreamOnClose

type StreamOnClose func(sw StreamWriter, err error)

type StreamOnConnect

type StreamOnConnect func(sw StreamWriter, err error)

type StreamOnRead

type StreamOnRead func(sw StreamWriter, data []byte, len int)

type StreamWriter

type StreamWriter interface {
	io.Closer
	Write(data []byte, inLoop bool)
	UserDataStorage
}

type SyscallSockAddr

type SyscallSockAddr struct {
	unix.Sockaddr
	Version int
}

func ResolveTcpAddr

func ResolveTcpAddr(addrS string) (*SyscallSockAddr, error)

type TimeWheel

type TimeWheel struct {
	// contains filtered or unexported fields
}

func NewTimeWheel

func NewTimeWheel(partTimeout, partCount uint16) *TimeWheel

func (*TimeWheel) Entries

func (this *TimeWheel) Entries() chan<- BucketEntry

func (*TimeWheel) Execute

func (this *TimeWheel) Execute(ctx context.Context)

type Timer

type Timer struct {
	*FdWatcher
	// contains filtered or unexported fields
}

func NewTimerWatcher

func NewTimerWatcher(loop EventLoop, clockId ClockId, onTick TimerOnTick) (*Timer, error)

func (*Timer) OnEvent

func (this *Timer) OnEvent(event EventSizeType)

func (*Timer) StartTimer

func (this *Timer) StartTimer(delayMs int, intervalMs int) error

func (*Timer) Stop

func (this *Timer) Stop() error

type TimerOnTick

type TimerOnTick func(*Timer)

type UnknownAFError

type UnknownAFError string

func (UnknownAFError) Error

func (e UnknownAFError) Error() string

type UserDataStorage

type UserDataStorage interface {
	GetUserData() interface{}
	SetUserData(interface{})
}

Directories

Path Synopsis

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL