ddio

package module
v0.1.0 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Nov 30, 2022 License: MIT Imports: 20 Imported by: 1

README

ddio

ddio是nyan内部使用的高性能事件库,它关注海量长/短连接下的性能

Thanks

感谢这些项目,本项目的一些想法从这些项目借鉴

Documentation

Overview

Package event_poll 负责管理和处理epoll事件的包

Index

Constants

View Source
const (
	ONCE_MAX_EVENTS = 1024
	BUFFER_SIZE     = 4096
)
View Source
const (
	DEFAULT_BLOCK              = 4096                 // 一个内存块的默认大小
	DEFAULT_POOL_SIZE          = DEFAULT_BLOCK * 8192 // 默认池大小 (DEFAULT_BLOCK * 8192) B
	FreeBufferZeroBase uintptr = math.MaxInt
)
View Source
const (
	// MAX_MASTER_LOOP_SIZE 负责监听接收新连接的主Reactor的goroutine最大数量
	MAX_MASTER_LOOP_SIZE = 32
	// MAX_SLAVE_LOOP_SIZE 主Reactor绑定的负责处理连接事件的从Reactor的goroutine最大数量
	MAX_SLAVE_LOOP_SIZE = 64
	// MAX_POLLER_ONCE_EVENTS 各底层Poller一次最多响应的就绪事件
	MAX_POLLER_ONCE_EVENTS = 1024
	// EVENT_LOOP_SLEEP 事件循环阻塞等待就绪事件的默认超时时间
	EVENT_LOOP_SLEEP = time.Second
)
View Source
const (
	TCP_V4 = 0x80
	TCP_V6 = 0x90
)
View Source
const (
	EPOLLIN      = unix.EPOLLIN
	EPOLLET      = unix.EPOLLET
	EPOLLONESHOT = unix.EPOLLONESHOT
)

Epoll Flags

Variables

View Source
var (
	ErrorEpollClosed = errors.New("epoll is closed")
	ErrRead          = errors.New("read error: ")
	ErrWrite         = errors.New("write error: ")
)

一些错误

View Source
var (
	// ErrTimerClosed 时间堆已经被关闭
	ErrTimerClosed = errors.New("timer is closed")
	// ErrTimerFull 时间堆的定时任务数量已经到达设定的最大值
	ErrTimerFull = errors.New("time is full")
)
View Source
var (
	DefaultConfig = ConnConfig{
		OnDataNBlock:                     1,
		MaxReadSysCallNumberOnEventLoop:  1024,
		MaxWriteSysCallNumberOnEventLoop: 1024,
	}
)
View Source
var (
	ErrConnClosed = errors.New("conn is closed")
)

Functions

func NewEpoll

func NewEpoll() (*epoll, error)

func NewPoller

func NewPoller() (*poller, error)

Types

type AfterHandler

type AfterHandler func(fd int) (error, bool)

AfterHandler 是一对回调函数,bool用于指示在后续的声明周期中是否还需要调用 否则立即调用result-handler,用于Non-Block IO

type AfterResultHandler

type AfterResultHandler func(fd int, err error) error

type Balanced

type Balanced interface {
	// Name 负载均衡器的名字或者其算法名
	Name() string
	// Target 输入的Seek为ConnectionEventHandler的数量
	// 负载均衡器需要给出一个正确的目标
	// connLen为子Reactor的数量,fd表示新接收连接的文件描述符的值
	Target(connLen, fd int) int
}

Balanced 自定义负载均衡器的接口

type Conn

type Conn interface {
	// TakeReadBytes 拿出所有读取的字节
	TakeReadBytes() []byte
	// WriteBytes 该接口针对小数据量
	WriteBytes(p []byte)
	/*
		RegisterAfterHandler 注册处理后续读数据的处理器。
		注册的after-handler不会影响写缓冲区的数据发送,写缓冲区的数据写入完成
		发生在after-handler被调用之前。如果希望after-handler处理完之后触发写之类
		的操作,可以注册after-result-handler,该处理器在after-handler由于出错或者无错完成的时候被触发
	*/
	// TODO 是否可用于将之后的数据交给ZeroCopy系列函数来处理
	RegisterAfterHandler(ahd AfterHandler, rhd AfterResultHandler)
	// Next 设置了OnDataNBlock时读不到完整的数据时
	// 可以调用该方法接着读取N * Block的内容
	// 调用该方法会使OnData事件重新触发
	Next(nBlock int)
	// Close 关闭连接
	// 非立即关闭,采取延迟关闭的策略
	Close() error
	// Addr 获取兼容net包的Socket Addr
	Addr() net.Addr
	SetDeadLine(deadline time.Duration) error
	SetTimeout(timeout time.Duration) error
}

type ConnConfig

type ConnConfig struct {
	// 触发OnData数据最多需要多少个Buffer Block
	// 一个Block大小为4KB
	OnDataNBlock int
	// 尝试Non-Block read()的最大次数
	MaxReadSysCallNumberOnEventLoop int
	// 尝试Non-Block write()的最大次数
	MaxWriteSysCallNumberOnEventLoop int
}

type ConnMultiEventDispatcher

type ConnMultiEventDispatcher struct {
	// contains filtered or unexported fields
}

ConnMultiEventDispatcher 从多路事件派发器

func NewConnMultiEventDispatcher

func NewConnMultiEventDispatcher(ctx context.Context, wg *sync.WaitGroup, handler ConnectionEventHandler, connConfig ConnConfig) (*ConnMultiEventDispatcher, error)

func (*ConnMultiEventDispatcher) AddConnEvent

func (p *ConnMultiEventDispatcher) AddConnEvent(ev *Event) error

type ConnectionEventHandler

type ConnectionEventHandler interface {
	// OnInit 初始化连接的一些参数
	// 注意: 该方式是在读事件就绪时被触发
	OnInit() ConnConfig

	// OnData 接收到完整数据时触发
	// 接收的数据量可以被设置
	OnData(conn *TCPConn) error

	// OnClose 对端关闭事件触发
	OnClose(ev Event) error

	// OnError 错误事件触发
	OnError(ev Event, err error)
}

ConnectionEventHandler 连接事件处理器

type Engine

type Engine struct {
	// contains filtered or unexported fields
}

Engine 实例

func NewEngine

func NewEngine(handler ListenerEventHandler, config *EngineConfig) (*Engine, error)

func (*Engine) Close

func (e *Engine) Close() error

type EngineConfig

type EngineConfig struct {
	// 连接处理器
	ConnHandler ConnectionEventHandler
	// 负载均衡器的工厂函数
	NBalance NewBalance
	// 绑定的地址
	// Protocol://ip:port?level=n
	// level设定地址的优先级,优先级越高为该地址分配的监听线程和处理线程就越多
	// Level总量是10,设置的数量是为该地址分配的监听线程的百分比,最少为1个
	//   Example: tcp://127.0.0.1:8080?level=5
	MultiAddr []string
}

type Event

type Event struct {
	// contains filtered or unexported fields
}

func (Event) Flags

func (e Event) Flags() EventFlags

type EventFlags

type EventFlags int

EventFlags 通用的事件掩码

const (
	EVENT_READ     EventFlags = 0x01   // 监听可读事件
	EVENT_WRITE    EventFlags = 0x10   // 监听可写事件
	EVENT_CLOSE    EventFlags = 0x100  // 监听连接关闭事件
	EVENT_LISTENER EventFlags = 0x04   // 监听连接建立事件
	EVENT_ERROR    EventFlags = 0x1000 // 监听错误事件
)

type EventLoop

type EventLoop interface {
	// Exec 开启事件循环
	// 参数的一些要求,否则可能会出现索引越界等异常
	// Receiver Cap >= maxEvent
	// 各底层Poller最多支持一次性处理 MAX_POLLER_ONCE_EVENTS 个事件
	// nEvent代表发生了多少个事件,如果有错误它为0,同时err != nil
	Exec(receiver []Event, timeOut time.Duration) (nEvent int, err error)

	// Exit 退出事件循环
	Exit() error

	// With 往轮询器中添加事件
	With(event Event) error

	// Modify 修改轮询器中事件的属性
	// 一般用于Epoll OnceShot事件
	Modify(event Event) error

	// Cancel 取消事件的监听
	Cancel(event Event) error

	// AllEvents 获取所有监听的事件
	AllEvents() []Event
}

EventLoop 事件循环要实现的接口

type ListenerConfig

type ListenerConfig struct {
	ConnEHd       ConnectionEventHandler
	Balance       Balanced
	NetPollConfig *NetPollConfig
}

type ListenerEventHandler

type ListenerEventHandler interface {
	// OnInit 初始化监听者事件处理器时调用的方法
	OnInit(config *NetPollConfig) (*Event, error)
	// OnAccept 有新连接到来时调用的方法
	OnAccept(ev Event) (connFd int, err error)
	// OnClose 客户端退出建立连接阶段调用的方法
	OnClose(ev Event) error
	// OnError 事件循环出错时调用的方法
	OnError(ev Event, err error)
}

ListenerEventHandler 监听事件处理器

type ListenerMultiEventDispatcher

type ListenerMultiEventDispatcher struct {
	// contains filtered or unexported fields
}

ListenerMultiEventDispatcher 主多路事件派发器

func NewListenerMultiEventDispatcher

func NewListenerMultiEventDispatcher(ctx context.Context, wg *sync.WaitGroup, handler ListenerEventHandler, config *ListenerConfig) (*ListenerMultiEventDispatcher, error)

type MemoryPool

type MemoryPool struct {
	// contains filtered or unexported fields
}

func NewBufferPool

func NewBufferPool(block, size int) *MemoryPool

NewBufferPool block 是块大小,会转换为2的N次方 size 是池能容纳的块数量,会转换为2的N次方 -1则使用默认配置

func (*MemoryPool) AllocBuffer

func (p *MemoryPool) AllocBuffer(n int) ([]byte, bool)

AllocBuffer variable == n * 256

func (*MemoryPool) BlockSize

func (p *MemoryPool) BlockSize() int

func (*MemoryPool) FreeBuffer

func (p *MemoryPool) FreeBuffer(ptr *[]byte)

FreeBuffer 释放分配出去的Buffer内存

func (*MemoryPool) Grow

func (p *MemoryPool) Grow(ptr *[]byte, nBlock int) bool

Grow 扩容原来的Buffer,可指定的扩容大小为n * p.block 扩容之后的Buffer长度被设置为跟未扩容前的Buffer一致,这是为了兼容性考虑 Example

pool := NewBufferPool(13,10)
pool.Grow(&buffer,2)
fmt.Println(cap(buffer))

Output:

16384

func (*MemoryPool) IsAlloc

func (p *MemoryPool) IsAlloc(buf []byte) bool

func (*MemoryPool) Size

func (p *MemoryPool) Size() int

type NetPollConfig

type NetPollConfig struct {
	// 程序监听了多个地址
	IsMultiAddr bool
	Protocol    int
	IP          net.IP
	Port        int
}

type NewBalance

type NewBalance func() Balanced

NewBalance 派生负载均衡器的工厂方法

type RoundBalanced

type RoundBalanced struct {
	// contains filtered or unexported fields
}

func (*RoundBalanced) Name

func (r *RoundBalanced) Name() string

func (*RoundBalanced) Target

func (r *RoundBalanced) Target(connLen, fd int) int

type TCPConn

type TCPConn struct {
	// contains filtered or unexported fields
}

func (*TCPConn) Addr

func (T *TCPConn) Addr() net.Addr

func (*TCPConn) Close

func (T *TCPConn) Close() error

Close 设置一个关闭标志,事件循环会审查这个标志,在写入完缓存区的数据或者出错时会将其关闭

func (*TCPConn) Next

func (T *TCPConn) Next(nBlock int)

func (*TCPConn) RegisterAfterHandler

func (T *TCPConn) RegisterAfterHandler(hd AfterHandler)

func (*TCPConn) SetDeadLine

func (T *TCPConn) SetDeadLine(deadline time.Duration) error

func (*TCPConn) SetTimeout

func (T *TCPConn) SetTimeout(timeout time.Duration) error

func (*TCPConn) TakeReadBytes

func (T *TCPConn) TakeReadBytes() []byte

func (*TCPConn) TakeWriteBuffer

func (T *TCPConn) TakeWriteBuffer() *[]byte

func (*TCPConn) WriteBytes

func (T *TCPConn) WriteBytes(p []byte)

type TCPListener

type TCPListener struct {
	// contains filtered or unexported fields
}

TCPListener Main Reactor 负责监听事件

func NewTCPListener

func NewTCPListener(event EventFlags) *TCPListener

func (*TCPListener) OnAccept

func (T *TCPListener) OnAccept(ev Event) (connFd int, err error)

func (*TCPListener) OnClose

func (T *TCPListener) OnClose(ev Event) error

func (*TCPListener) OnError

func (T *TCPListener) OnError(ev Event, err error)

func (*TCPListener) OnInit

func (T *TCPListener) OnInit(config *NetPollConfig) (*Event, error)

type TimerTask

type TimerTask func(data interface{}, timeOut time.Duration)

TimerTask 定时器的描述

Directories

Path Synopsis
example

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL