Linux 与 IO
总结Linux中的5种IO模型,其中最常用的是IO多路复用,特别是epoll是各种网络框架的底层IO框架。
5 种 IO 模型
Blocking
- 用户程序调用系统调用 read(),应用程序阻塞;
- 当内核把数据准备好后,将数据从内核拷贝到用户内存;
- 内核返回,用户线程解除 block 状态。
Non-Blocking
- 用户进程轮询调用 recvfrom,直到内核将数据准备好;
- 轮询期间用户可以去做别的事;
- 当内核中的数据准备好了,再次接收到 recvfrom 调用时会将数据拷贝到用户内存并返回。
IO Multiplexing(多路复用)
又称为事件驱动 IO。
- 内核会同时监听多个 select 负责的 socket,当其中任何一个 socket 中的数据准备好了,select 就会返回;
- 当数据准备好了,用户进程再调用 read、内核会将数据从内核拷贝到用户进程。
优势:
- 能同时监听多个 socket;
缺点:
- 如果 socket 较少,因为有两次系统调用,性能甚至不如 BIO;
select
原理:如上所述
缺点:
- select 最大的缺陷是单个进程打开的 fd 数量有限(因为是用数组保存的);
- 对多个 socket 的扫描是线性轮询扫描的,效率较低,当 socket 较多时,select 会浪费很多 CPU 时间,且 socket 越多,这个现象会越明显;
- 需要一个大数据结构来存放打开的 fd,这样会使得用户空间和内核空间在传递该结构时复制开销大。
poll
原理:本质上与 select 没有区别,只是存储 fd 的结构是链表。
优点(相对 select):
- 没有最大连接数的限制
缺点:
- 大量 fd 组成的链表被整体复制于用户态和内核地址空间之间,很多时候这样的复制没有意义;
epoll
原理:
- 更灵活,使用一个文件描述符管理多个描述符,将用户关系的文件描述符的事件存放到内核的一个事件表中,这样在用户空间和内核空间的 copy 只需一次;
- 通过回调通知进程,而不是轮询。
优点:
- 没有描述符限制,epoll 会把所有创建的描述符放到一个事件表(文件)中由一个描述符统一管理;
- 效率更高、IO 的效率不会随着监视 fd 的数量的增长而下降,fd 变为活跃时才会调用 callback 函数通知用户进程,而不是通过轮询的方式;
- 使用 mmap()文件映射内存加速与内核空间的消息传递,减少了复制的开销。
进程调度原理
- 进程刚开始处于运行中状态,这时进程处于工作队列内,进程调度程序会分时执行各个运行状态的进程;
- 进程调用了
recv
这种需要阻塞的系统调用,这时,进程从运行状态变为等待状态,会被添加到对应socket中的等待队列内,这时进程调度程序不会分配时间片给该进程,因此也不会占用任何 CPU 资源; - socket 接收到数据后,操作系统将该 socket等待队列上的进程重新放回到工作队列,该进程变成运行状态,继续执行代码。也由于 socket 的接收缓冲区已经有了数据,recv 可以返回接收到的数据。
操作系统如何知道网络数据对应于哪个 socket
如上面进程调度原理所述,操作系统接收数据后需要唤醒 socket 上等待队列里的进程,但是操作系统接收数据后怎么知道网络数据属于哪个 socket 呢?
不妨回忆下我们平时创建 socket 时给出的参数:
1 | new Socket(serverHost, serverPort) |
实际上因为每个 socket 都对应着一个端口号,而网络数据包中包含了 ip 和端口的信息,内核可以通过端口号找到对应的 socket。
而且为了提高处理速度,操作系统会维护端口号到 socket 的索引结构。
多路复用的基石-select 及其存在的问题
前面我们讨论了操作系统如何唤醒进程,如果进程和 socket 是一一对应的,那么我们就实现了 BIO。
不过 epoll 等模型的特点是一个进程同时监听多个 socket,即一对多的多路复用。
最普通的 select 模型中,进程将所有监听的 socket 放到一个列表内,只要有一个 socket 监听到数据到,操作系统会唤醒该进程,然后程序需要遍历一次该列表,就可以拿到所有就绪的 socket。
这种方式的缺点主要是:
- 有两次遍历过程:每次 socket 读取到数据后需要遍历该 socket 的等待队列;每次进程被唤醒后,都需要遍历一次它监听的 socket 列表。
这里涉及了两次遍历,而且每次都要将整个 fds 列表传递给内核,有一定的开销。正是因为遍历操作开销大,出于效率的考量,才会规定 select 的最大监视数量,默认只能监视 1024 个 socket。
poll 与 select 没有本质区别,只不过 poll 使用链表来保存监听的 socket,没有最大监听数量限制。
为什么使用 epoll
- 功能分离
传统 select 每次调用 select 都需要将运行队列中的进程添加到 socket 的等待队列,唤醒时再移除,然而大多数应用场景中,需要监视的 socket 相对固定,并不需要每次都修改。
epoll 的改进是只添加一次等待队列,之后每次都只需要阻塞进程,如下图所示: - 就绪列表
epoll 中,操作系统唤醒进程后,进程不需要遍历所有监听的 socket,因为收到数据的 socket 会被添加到一个就绪队列内,然后唤醒等待队列中的所有进程,这时进程只需要遍历就绪队列即可拿到所有接收到数据的 socket。
kqueue
与 epoll 非常相似,注册一批 socket 描述符到 kqueue 以后,当其中的描述符状态发生变化时,kqueue 将一次性通知应用程序哪些描述符可读、可写或出错了。
可能是因为不同 Linux 发行版之间存在接口兼容性问题,所以在已有 epoll 的情况下又有人开发了 kqueue,kqueue 主要应用在 freebsd 和 macosx 内核的操作系统上。
Reactor
Reactor 是 Java 中的一种设计模式:
- 事件驱动;
- 一次性处理一个或多个事件(输入源);
- 通过 Service Handler 同步的将输入事件(Event)采用多路复用分发给相应的 Request Handler(多个)处理。
Reactor 的实现上有 3 种模式:
- 单 Reactor 单线程模型如代码可知,Reactor 线程启动后监听客户端请求、多路分离 socket,将 connect 事件分发给 Acceptor 处理,有 IO 读写事件后交给 Handler 处理,分发过程中并没有创建新线程。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115@Slf4j
public class Reactor implements Runnable {
private Selector selector;
public Reactor(int port) throws Exception {
ServerSocketChannel server = ServerSocketChannel.open();
ServerSocket serverSocket = server.socket();
serverSocket.bind(new InetSocketAddress(port));
// 设置为非阻塞模式
server.configureBlocking(false);
selector = Selector.open();
SelectionKey sk = server.register(selector,
SelectionKey.OP_ACCEPT);
// attach Acceptor 处理新连接
sk.attach(new Acceptor(server, selector));
}
@Override
public void run() {
try {
while (!Thread.interrupted()) {
selector.select();
Set selected = selector.selectedKeys();
Iterator it = selected.iterator();
while (it.hasNext()) {
it.remove();
//分发事件处理
dispatch((SelectionKey) (it.next()));
}
}
} catch (IOException ex) {
log.info("异常", ex);
}
}
void dispatch(SelectionKey k) {
// 若是连接事件获取是acceptor
// 若是IO读写事件获取是handler
Runnable runnable = (Runnable) (k.attachment());
if (runnable != null) {
runnable.run();
}
}
}
@Slf4j
public class Acceptor implements Runnable {
private ServerSocketChannel serverSocket;
private Selector selector;
private ExecutorService tp = Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors() * 2);
public Acceptor(ServerSocketChannel serverSocket,
Selector selector) {
this.serverSocket = serverSocket;
this.selector = selector;
}
@Override
public void run() {
try {
SocketChannel c = serverSocket.accept();
if (c != null) {
// 每个连接一个Handler
tp.submit(new SimpleHandler(c, selector));
}
} catch (Exception e) {
log.info("异常", e);
}
}
}
@Slf4j
public class SimpleHandler implements Runnable {
public static final int READING = 0, WRITING = 1;
int state;
private SocketChannel socketChannel;
private SelectionKey sk;
public SimpleHandler(SocketChannel socketChannel, Selector selector) throws IOException {
this.state = READING;
this.socketChannel = socketChannel;
sk = socketChannel.register(selector, SelectionKey.OP_READ);
sk.attach(this);
socketChannel.configureBlocking(false);
}
@Override
public void run() {
if (state == READING) {
read();
} else if (state == WRITING) {
write();
}
}
private void read() {
// TODO 处理读
//下一步处理写事件
sk.interestOps(SelectionKey.OP_WRITE);
this.state = WRITING;
}
private void write() {
// TODO 处理写
//下一步处理读事件
sk.interestOps(SelectionKey.OP_READ);
this.state = READING;
}
}
这种单线程模型不能充分利用多核资源,所以实际使用不多。 - 单 Reactor 多线程模型
与上面的区别是 Handler 通过多线程来执行读写任务:
1 | public class MultiThreadHandler implements Runnable { |
- 多 Reactor 多线程模型
相对上面两种来说,第三种将 Reactor 分成了两部分:- mainReactor 负责监听 server socket,用来处理新连接的建立,将建立的 socketChannel 指定注册给 subReactor。
- subReactor 维护自己的 selector, 基于 mainReactor 注册的 socketChannel 多路分离 IO 读写事件,读写网络数据,对业务处理的功能,将其扔给 worker 线程池来完成。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108/**
* 多work 连接事件Acceptor,处理连接事件
*/
public class MultiWorkThreadAcceptor implements Runnable {
// cpu线程数相同多work线程
private int workCount =Runtime.getRuntime().availableProcessors();
private SubReactor[] workThreadHandlers = new SubReactor[workCount];
private volatile int nextHandler = 0;
public MultiWorkThreadAcceptor() {
this.init();
}
public void init() {
nextHandler = 0;
for (int i = 0; i < workThreadHandlers.length; i++) {
try {
workThreadHandlers[i] = new SubReactor();
} catch (Exception e) {
}
}
}
@Override
public void run() {
try {
SocketChannel c = serverSocket.accept();
if (c != null) {// 注册读写
synchronized (c) {
// 顺序获取SubReactor,然后注册channel
SubReactor work = workThreadHandlers[nextHandler];
work.registerChannel(c);
nextHandler++;
if (nextHandler >= workThreadHandlers.length) {
nextHandler = 0;
}
}
}
} catch (Exception e) {
}
}
}
/**
* 多work线程处理读写业务逻辑
*/
@Slf4j
public class SubReactor implements Runnable {
private final Selector mySelector;
//多线程处理业务逻辑
private int workCount = Runtime.getRuntime().availableProcessors();
private ExecutorService executorService = Executors.newFixedThreadPool(workCount);
public SubReactor() throws Exception {
// 每个SubReactor 一个selector
this.mySelector = SelectorProvider.provider().openSelector();
}
/**
* 注册chanel
*/
public void registerChannel(SocketChannel sc) throws Exception {
sc.register(mySelector, SelectionKey.OP_READ | SelectionKey.OP_CONNECT);
}
@Override
public void run() {
while (true) {
try {
//每个SubReactor 自己做事件分派处理读写事件
selector.select();
Set<SelectionKey> keys = selector.selectedKeys();
Iterator<SelectionKey> iterator = keys.iterator();
while (iterator.hasNext()) {
SelectionKey key = iterator.next();
iterator.remove();
if (key.isReadable()) {
read();
} else if (key.isWritable()) {
write();
}
}
} catch (Exception e) {
log.warn("出错", e);
}
}
}
private void read() {
//任务异步处理
executorService.submit(() -> process());
}
private void write() {
//任务异步处理
executorService.submit(() -> process());
}
/**
* task 业务处理
*/
public void process() {
//do IO ,task,queue something
}
}
Signal Driven IO(信号驱动的 IO)
用户注册一个信号处理函数,一旦数据准备好,由内核生成一个 SIGIO 信号,通知数据已经准备好的事件,之后用户进程可以调用 recvfrom 把数据从内核拷贝出来并返回结果。
AIO
与信号驱动 IO 的主要区别是:在信号驱动的 I/O 中,内核告诉我们何时可以启动 I/O 操作,但是异步 I/O 时,内核告诉我们何时 I/O 操作完成。
Proactor
5 种 IO 模型的比较
阻塞
1 | int main() |
- clib库函数recv会执行到recvfrom系统调用;
- 进入系统调用后,用户进程就进入到了内核态,通过执行一系列的内核协议层函数,然后到 socket 对象的接收队列中查看是否有数据,没有的话就把自己添加到 socket 对应的等待队列里。最后让出CPU,操作系统会选择下一个就绪状态的进程来执行。
如下图的第4、5步即阻塞:将当前进程添加到socket的等待队列中、并修改当前进程的状态1
2
3
4
5
6
7
8
9
10
11
12
13//file: kernel/wait.c
void
prepare_to_wait(wait_queue_head_t *q, wait_queue_t *wait, int state)
{
unsigned long flags;
wait->flags &= ~WQ_FLAG_EXCLUSIVE;
spin_lock_irqsave(&q->lock, flags);
if (list_empty(&wait->task_list))
__add_wait_queue(q, wait);
set_current_state(state);
spin_unlock_irqrestore(&q->lock, flags);
} - 当有数据到达的时候,内核首先将数据包放到该 socket 的接收队列中。然后扫描一下 socket 等待队列,唤醒正等待该socket的进程
1
2
3
4
5
6
7
8
9
10
11
12
13
14//file: kernel/sched/core.c
static void __wake_up_common(wait_queue_head_t *q, unsigned int mode,
int nr_exclusive, int wake_flags, void *key)
{
wait_queue_t *curr, *next;
list_for_each_entry_safe(curr, next, &q->task_list, task_list) {
unsigned flags = curr->flags;
if (curr->func(curr, mode, wake_flags, key) &&
(flags & WQ_FLAG_EXCLUSIVE) && !--nr_exclusive)
break;
}
}
优化
pageCache
- Linux里所有处理文件的IO请求均需经过pageCache,对于操作系统来说,磁盘文件都是由一系列的数据块顺序组成。
比如要读的磁盘位置若未加载到pageCache(pageCache中的每一个数据块都设置了文件以及偏移量地址信息),则发起一次缺页中断,由操作系统加载该页到pageCache中的一个空闲块,并注册到页表,然后再copy到用户缓冲区中。换进新页面时会通过页面置换算法将老页面换出,Linux中一般是基于LRU实现的。 - 预读取(局部性原理)
page cache本身也会对数据文件进行预读取,对于每个文件的第一个读请求操作,系统在读入所请求页面的同时会读入紧随其后的少数几个页面。
PageCache机制也不是完全无缺点的,当遇到OS进行脏页回写,内存回收,内存swap等情况时,就会引起较大的消息读写延迟,优化方式主要包括内存预分配,文件预热和mlock系统调用。
零拷贝
零拷贝技术主要有以下几种:
- 直接 I/O:对于这种数据传输方式来说,应用程序可以直接访问硬件存储,操作系统内核只是辅助数据传输:这类零拷贝技术针对的是操作系统内核并不需要对数据进行直接处理的情况,数据可以在应用程序地址空间的缓冲区和磁盘之间直接进行传输,完全不需要 Linux 操作系统内核提供的页缓存的支持。
- 在数据传输的过程中,避免数据在操作系统内核地址空间的缓冲区和用户应用程序地址空间的缓冲区之间进行拷贝。有的时候,应用程序在数据进行传输的过程中不需要对数据进行访问,那么,将数据从 Linux 的pageCache拷贝到用户进程的缓冲区中就可以完全避免,传输的数据在pageCache中就可以得到处理。在某些特殊的情况下,这种零拷贝技术可以获得较好的性能。Linux 中提供类似的系统调用主要有
mmap()
,sendfile()
以及splice()
。 - 对数据在 Linux 的页缓存和用户进程的缓冲区之间的传输过程进行优化。该零拷贝技术侧重于灵活地处理数据在用户进程的缓冲区和操作系统的页缓存之间的拷贝操作。这种方法延续了传统的通信方式,但是更加灵活。在 Linux 中,该方法主要利用了写时复制技术。
当应用程序访问某块数据时,操作系统首先会检查,是不是最近访问过此文件,文件内容是否缓存在内核缓冲区,如果是,操作系统则直接根据 read 系统调用提供的 buf 地址,将内核缓冲区的内容拷贝到 buf 所指定的用户空间缓冲区中去。如果不是,操作系统则首先将磁盘上的数据拷贝的内核缓冲区,这一步目前主要依靠 DMA 来传输,然后再把内核缓冲区上的内容拷贝到用户缓冲区中。 接下来,write 系统调用再把用户缓冲区的内容拷贝到网络堆栈相关的内核缓冲区中,最后 socket 再把内核缓冲区的内容发送到网卡上。
这期间,一共进行了 4 次数据拷贝,且用户态和内核态也发生了多次上下文切换。
从代码层面上来看,从硬盘上将文件读入内存,都要经过文件系统进行数据拷贝,并且数据拷贝操作是由文件系统和硬件驱动实现的,理论上来说,拷贝数据的效率是一样的。
但是通过内存映射的方法访问硬盘文件效率会比 read 和 write 系统调用高:
- read()是系统调用,首先将文件从硬盘拷贝到内核空间的一个缓冲区,再将这些数据拷贝到用户空间,实际上进行了两次数据拷贝;
- map()也是系统调用,但没有进行数据拷贝,当缺页中断发生时,直接将文件从硬盘拷贝到用户空间,只进行了一次数据拷贝。
mmap
mmap是零拷贝的实现方案,mmap 将内核缓冲区与应用程序缓冲区共享,减少了 2 次拷贝及拷贝过程中必经的上下文切换过程。
Java 中的零拷贝
Java 中的 NIO 相对 BIO 的优势之一就是 NIO 提供了零拷贝提高了 IO 速度,实现是MappedByteBuffer
,核心方法是map()
,该方法把文件映射到内存中,获得内存地址 addr,然后通过这个 addr 构造MappedByteBuffer
类,以暴露各种文件操作 API。
1 | public class MappedByteBufferTest { |
MappedByteBuffer
本身是一个抽象类,它申请的是堆外内存,因此不受 Minor GC 控制,只能在发生 Full GC 时才能被回收。而DirectByteBuffer
改善了这一情况,原理是在MappedByteBuffer
的基础上,维护了一个 Cleaner 对象来完成内存的回收工作,因此它既可以通过 Full GC 来回收内存,也可以调用 clean()方法来进行回收。
1 | if (address == 0) { |
读取内存和写入数据都是直接调用 Unsafe 的对应接口 getByte 和 putByte,通过(address + 偏移量)获取指定内存的数据。:
1 | public byte get(int i) { |
- 第一次访问 address 所指向的内存区域,导致缺页中断,中断响应函数会在交换区中查找相对应的页面,如果找不到(也就是该文件从来没有被读入内存的情况),则从硬盘上将文件指定页读取到物理内存中(非 jvm 堆内存)。
- 如果在拷贝数据时,发现物理内存不够用,则会通过虚拟内存机制(swap)将暂时不用的物理页面交换到硬盘的虚拟内存中。
MappedByteBuffer 还有一个兄弟 HeapByteBuffer,它在堆中申请内存,本质上是一个数组,由于位于堆中,因此可受 GC 管控,易于回收。
C10K问题
两个限制:
- 可打开文件句柄数的限制
可以通过ulimit命令调整。 - 线程数的限制
注意内存容量限制,线程本身会占用堆栈空间内存。
解决C10K问题采用的IO模型:epoll
epoll具体原理描述见之前的小节。
参考
- 为什么说NIO是同步非阻塞的?和BIO有什么区别?
非阻塞指的是:通过轮询selector监听socket是否来数据,而不是阻塞等待。
同步指的是:一个socket来数据后需要创建一个线程读取,读取过程是同步的。
异步指的是:读取数据由操作系统做好,业务线程等着数据读取完毕的通知。 - 多路复用机制如何支持海量连接?
多路复用可以用很少的线程监听很多客户端的请求。
因此多路复用常用于解决C10K问题。