Redis 进程和 IO 模型

为什么 Redis 这么快

Redis 采用的是一种单线程工作模型,它能这么快主要归功于下面几个策略:

  1. 完全基于内存,绝大部分请求是纯粹的内存操作,非常快速。数据存在内存中,类似于 HashMap,HashMap 的优势就是查找和操作的时间复杂度都是 O(1);
  2. 数据结构简单,对数据操作也简单,Redis 中的数据结构是专门进行设计的;
  3. 采用单线程,避免了不必要的上下文切换和竞争条件,也不存在多进程或者多线程导致的切换而消耗 CPU,不用去考虑各种锁的问题,不存在加锁释放锁操作,没有因为可能出现死锁而导致的性能消耗;
  4. 使用底层模型不同,它们之间底层实现方式以及与客户端之间通信的应用协议不一样,Redis 直接自己构建了 VM 机制 ,因为一般的系统调用系统函数的话,会浪费一定的时间去移动和请求;
  5. 使用多路 I/O 复用模型,非阻塞 IO;
    多路 I/O 复用模型是利用 select、poll、epoll 可以同时监察多个流的 I/O 事件的能力,在空闲的时候,会把当前线程阻塞掉,当有一个或多个流有 I/O 事件时,就从阻塞态中唤醒,于是程序就会轮询一遍所有的流(epoll 是只轮询那些真正发出了事件的流),并且只依次顺序的处理就绪的流,这种做法就避免了大量的无用操作。
    Redis-Client 在操作的时候,会产生具有不同事件类型的 socket,在服务端,有一段 I/O 多路复用程序,将其置入队列之中,然后,文件事件分派器依次去队列中取,转发到不同的事件处理器中(对这个 I/O 多路复用机制,Redis 还提供了 select、epoll、evport、kqueue 等多路复用函数库)。
    这里“多路”指的是多个网络连接,“复用”指的是复用同一个线程。采用多路 I/O 复用技术可以让单个线程高效的处理多个连接请求(尽量减少网络 IO 的时间消耗),且 Redis 在内存中操作数据的速度非常快,也就是说内存内的操作不会成为影响 Redis 性能的瓶颈,主要由以上几点造就了 Redis 具有很高的吞吐量。
    多路IO复用模型

一些常见的进程模型

  1. 单进程多线程模型:MySQL、Memcached、Oracle(Windows 版本);
  2. 多进程模型:Oracle(Linux 版本);
  3. Nginx 有两类进程,一类称为 Master 进程(相当于管理进程),另一类称为 Worker 进程(实际工作进程)。启动方式有两种:
    1. 单进程启动:此时系统中仅有一个进程,该进程既充当 Master 进程的角色,也充当 Worker 进程的角色。
    2. 多进程启动:此时系统有且仅有一个 Master 进程,至少有一个 Worker 进程工作。
    3. Master 进程主要进行一些全局性的初始化工作和管理 Worker 的工作;事件处理是在 Worker 中进行的。

为什么是 NIO

对于优化单个 server 节点的网络层,多使用 NIO 方式,server 端与 client 端在多次通讯的情况下使用 TCP 长连接维持会话,比如 Redis epoll 模型,RocketMq 的 netty 模型
对于高性能 Server 节点,在处理好网络请求同时,还要保证 server 端逻辑可以快速执行完成,这就涉及到合理的数据结构与线程模型。
在 Redis 中,采用的是 Reactor 模式实现文件事件处理器:
Redis-事件处理模型

  1. IO 多路复用
    根据平台不同选择不同的 IO 复用模型,比如 Linux 就是选择 epoll,select 是备选方案,不过正常情况下根本不会采用,因为 select 效率低,且有文件描述符监听上限。
  2. 封装不同 IO 模型,为事件处理器提供统一接口

Redis单线程多路复用IO模型实现 - 事件注册

Redis服务器的初始化过程中包括了对事件处理器的初始化。
1、服务器启动期间初始化事件处理器
服务器初始化代码:redis.c/initServer
初始化事件处理器代码:ae.c/aeCreateEventLoop
2、根据系统的不同,选择不同的底层IO事件处理实现
比如linux的话,会选择epoll作为实现:epoll.c/aeApiCreate

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
/*
* 创建一个新的 epoll 实例,并将它赋值给 eventLoop
*/
static int aeApiCreate(aeEventLoop *eventLoop) {

aeApiState *state = zmalloc(sizeof(aeApiState));

if (!state) return -1;

// 初始化事件槽空间
state->events = zmalloc(sizeof(struct epoll_event)*eventLoop->setsize);
if (!state->events) {
zfree(state);
return -1;
}

// 创建 epoll 实例
state->epfd = epoll_create(1024); /* 1024 is just a hint for the kernel */
if (state->epfd == -1) {
zfree(state->events);
zfree(state);
return -1;
}

// 赋值给 eventLoop
eventLoop->apidata = state;
return 0;
}

当服务器需要监听某些事件时,会注册对这些事件的监听器,下面是注册监听器的函数:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
/*
* 根据 mask 参数的值,监听 fd 文件的状态,
* 当 fd 可用时,执行 proc 函数
*/
int aeCreateFileEvent(aeEventLoop *eventLoop, int fd, int mask,
aeFileProc *proc, void *clientData)
{
if (fd >= eventLoop->setsize) {
errno = ERANGE;
return AE_ERR;
}

if (fd >= eventLoop->setsize) return AE_ERR;

// 取出文件事件结构
aeFileEvent *fe = &eventLoop->events[fd];

// 监听指定 fd 的指定事件
if (aeApiAddEvent(eventLoop, fd, mask) == -1)
return AE_ERR;

// 设置文件事件类型,以及事件的处理器
fe->mask |= mask;
if (mask & AE_READABLE) fe->rfileProc = proc;
if (mask & AE_WRITABLE) fe->wfileProc = proc;

// 私有数据
fe->clientData = clientData;

// 如果有需要,更新事件处理器的最大 fd
if (fd > eventLoop->maxfd)
eventLoop->maxfd = fd;

return AE_OK;
}

比如Redis服务器要接受客户端的请求,就要注册一个监听连接事件,回调函数中会为客户端连接创建一个socket,并注册可读文件事件。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
void initServer() {

...

/* Create an event handler for accepting new connections in TCP and Unix
* domain sockets. */
// 为 TCP 连接关联连接应答(accept)处理器
// 用于接受并应答客户端的 connect() 调用
for (j = 0; j < server.ipfd_count; j++) {
if (aeCreateFileEvent(server.el, server.ipfd[j], AE_READABLE,
acceptTcpHandler,NULL) == AE_ERR)
{
redisPanic(
"Unrecoverable error creating server.ipfd file event.");
}
}

...
}

Redis单线程多路复用IO模型实现 - 事件循环(处理)

Redis中定义了两种事件:时间事件TimeEvents、文件事件FileEvents:

  • TimeEvents:一般都是一些定时任务,实际上现在时间事件只应用于服务器启动时注册的serverCron定时任务的执行;
  • FileEvents:socket文件的IO事件,比如上面的监听连接的事件就是文件事件。
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
    44
    45
    46
    47
    48
    49
    50
    51
    52
    53
    54
    55
    56
    57
    58
    59
    60
    61
    62
    63
    64
    65
    66
    67
    68
    69
    70
    71
    72
    73
    74
    75
    76
    77
    78
    79
    80
    81
    82
    83
    84
    85
    86
    87
    88
    89
    90
    91
    92
    93
    94
    95
    96
    97
    98
    99
    100
    101
    102
    103
    104
    105
    106
    107
    108
    109
    110
    111
    112
    113
    114
    115
    116
    117
    118
    119
    120
    121
    122
    123
    124
    125
    126
    127
    128
    129
    130
    131
    132
    133
    134
    135
    136
    137
    138
    139
    140
    141
    142
    143
    144
    145
    146
    147
    148
    149
    /*
    * 事件处理器的主循环
    */
    void aeMain(aeEventLoop *eventLoop) {

    eventLoop->stop = 0;

    while (!eventLoop->stop) {

    // 如果有需要在事件处理前执行的函数,那么运行它
    if (eventLoop->beforesleep != NULL)
    eventLoop->beforesleep(eventLoop);

    // 开始处理事件
    aeProcessEvents(eventLoop, AE_ALL_EVENTS);
    }
    }

    // 1、看是否有事件到达了执行时间
    // 2、如果有,则执行这些事件
    /* Process every pending time event, then every pending file event
    * (that may be registered by time event callbacks just processed).
    *
    * 处理所有已到达的时间事件,以及所有已就绪的文件事件。
    *
    * Without special flags the function sleeps until some file event
    * fires, or when the next time event occurs (if any).
    *
    * 如果不传入特殊 flags 的话,那么函数睡眠直到文件事件就绪,
    * 或者下个时间事件到达(如果有的话)。
    *
    * If flags is 0, the function does nothing and returns.
    * 如果 flags 为 0 ,那么函数不作动作,直接返回。
    *
    * if flags has AE_ALL_EVENTS set, all the kind of events are processed.
    * 如果 flags 包含 AE_ALL_EVENTS ,所有类型的事件都会被处理。
    *
    * if flags has AE_FILE_EVENTS set, file events are processed.
    * 如果 flags 包含 AE_FILE_EVENTS ,那么处理文件事件。
    *
    * if flags has AE_TIME_EVENTS set, time events are processed.
    * 如果 flags 包含 AE_TIME_EVENTS ,那么处理时间事件。
    *
    * if flags has AE_DONT_WAIT set the function returns ASAP until all
    * the events that's possible to process without to wait are processed.
    * 如果 flags 包含 AE_DONT_WAIT ,
    * 那么函数在处理完所有不许阻塞的事件之后,即刻返回。
    *
    * The function returns the number of events processed.
    * 函数的返回值为已处理事件的数量
    */
    int aeProcessEvents(aeEventLoop *eventLoop, int flags)
    {
    int processed = 0, numevents;

    /* Nothing to do? return ASAP */
    if (!(flags & AE_TIME_EVENTS) && !(flags & AE_FILE_EVENTS)) return 0;

    /* Note that we want call select() even if there are no
    * file events to process as long as we want to process time
    * events, in order to sleep until the next time event is ready
    * to fire.
    */
    if (eventLoop->maxfd != -1 ||
    ((flags & AE_TIME_EVENTS) && !(flags & AE_DONT_WAIT))) {
    int j;
    aeTimeEvent *shortest = NULL;
    struct timeval tv, *tvp;

    // 获取最近的时间事件
    if (flags & AE_TIME_EVENTS && !(flags & AE_DONT_WAIT))
    shortest = aeSearchNearestTimer(eventLoop);
    if (shortest) {
    // 如果时间事件存在的话
    // 那么根据最近可执行时间事件和现在时间的时间差来决定文件事件的阻塞时间
    long now_sec, now_ms;

    /* Calculate the time missing for the nearest
    * timer to fire. */
    // 计算距今最近的时间事件还要多久才能达到
    // 并将该时间距保存在 tv 结构中
    aeGetTime(&now_sec, &now_ms);
    tvp = &tv;
    tvp->tv_sec = shortest->when_sec - now_sec;
    if (shortest->when_ms < now_ms) {
    tvp->tv_usec = ((shortest->when_ms+1000) - now_ms)*1000;
    tvp->tv_sec --;
    } else {
    tvp->tv_usec = (shortest->when_ms - now_ms)*1000;
    }

    // 时间差小于 0 ,说明事件已经可以执行了,将秒和毫秒设为 0 (不阻塞)
    if (tvp->tv_sec < 0) tvp->tv_sec = 0;
    if (tvp->tv_usec < 0) tvp->tv_usec = 0;
    } else {

    // 执行到这一步,说明没有时间事件
    // 那么根据 AE_DONT_WAIT 是否设置来决定是否阻塞,以及阻塞的时间长度

    /* If we have to check for events but need to return
    * ASAP because of AE_DONT_WAIT we need to set the timeout
    * to zero */
    if (flags & AE_DONT_WAIT) {
    // 设置文件事件不阻塞
    tv.tv_sec = tv.tv_usec = 0;
    tvp = &tv;
    } else {
    /* Otherwise we can block */
    // 文件事件可以阻塞直到有事件到达为止
    tvp = NULL; /* wait forever */
    }
    }

    // 处理文件事件,阻塞时间由 tvp 决定
    numevents = aeApiPoll(eventLoop, tvp);
    for (j = 0; j < numevents; j++) {
    // 从已就绪数组中获取事件
    aeFileEvent *fe = &eventLoop->events[eventLoop->fired[j].fd];

    int mask = eventLoop->fired[j].mask;
    int fd = eventLoop->fired[j].fd;
    int rfired = 0;

    /* note the fe->mask & mask & ... code: maybe an already processed
    * event removed an element that fired and we still didn't
    * processed, so we check if the event is still valid. */
    // 读事件
    if (fe->mask & mask & AE_READABLE) {
    // rfired 确保读/写事件只能执行其中一个
    rfired = 1;
    fe->rfileProc(eventLoop,fd,fe->clientData,mask);
    }
    // 写事件
    if (fe->mask & mask & AE_WRITABLE) {
    if (!rfired || fe->wfileProc != fe->rfileProc)
    fe->wfileProc(eventLoop,fd,fe->clientData,mask);
    }

    processed++;
    }
    }

    /* Check time events */
    // 执行时间事件
    if (flags & AE_TIME_EVENTS)
    processed += processTimeEvents(eventLoop);

    return processed; /* return the number of processed file/time events */
    }