Redis 应用

环境准备

搭建环境

1
2
docker run --name myredis -d -p6379:6379 redis
docker exec -it myredis redis-cli

使用

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
# exists 查看某个key是否存在
exists aa
SET 创建一个key;
GET 获取一个key的值;
DEL ***一个key;
TYPE 获取一个key的类型;
EXISTS 判断一个key是否存在,0:存在,1,不存在;
# KEYS 获取给定模糊匹配的key,但要谨慎使用,因为线上的key一般非常多
keys *
keys a?
EXPIRE 设置一个key过期的秒数;
PERSTST ***一个key过期的秒数;
PEXPIRE 设置一个key过期的毫秒数;
RENAME 将一个key重命名;
RENAMENX 将一个key重命名,且新的key必须是不存在的可以;
TTL 获取key的有效时间,以秒为单位,-1表示永不过期,-2表示已过期、已转移
# dbsize 查看当前库中key数量
dbsize
# flushdb 清除数据库(内存)
flushdb
# move 移动key到另一个库
move aa 2

容量预估

在实际部署前一般都会先对所需容量进行一个评估,这样可以尽量避免在上线后容量不够还要扩容、或者容量过大造成浪费。
官方提供了一个容量预估工具,一些博客比如Redis 容量评估模型贴近 Redis 底层数据结构给出了容量的评估分析,可以作为一个参考,但是业务架构一直在变,实际的容量监控还是必须的,我们下面还会谈到这方面的工具。

使用Redis Cluster

搭建Redis Cluster集群

redis集群搭建

1、安装
到官网找到:

1
2
wget http://download.redis.io/releases/redis-4.0.8.tar.gz
make && make install # 默认安装目录为/usr/local/bin

ruby

1
2
yum install ruby
yum install rubygems

还有gem文件在此处下载,安装:

1
gem install /usr/local/redis-3.0.0.gem

2、创建 redis 节点
在一个目录(比如编译目录)下创建 redis_cluster 目录,再在这个目录下创建 7001、7002、7003、7004、7005、7006 的子目录,拷贝配置文件 redis.conf 到各个这些子目录中,并编辑以下内容

1
2
3
4
5
6
7
8
9
port 7001 //端口7001,7002,7003        
bind 本机ip //默认ip为127.0.0.1 需要改为其他节点机器可访问的ip 否则创建集群时无法访问对应的端口,无法创建集群
daemonize yes //redis后台运行
pidfile /var/run/redis_7001.pid //pidfile文件对应7001,7002,7003
logfile /tmp/redis_7001.log // 日志文件
cluster-enabled yes //开启集群 把注释#去掉
cluster-config-file nodes_7001.conf //集群的配置 配置文件首次启动自动生成 7001,7002,7003
cluster-node-timeout 15000 //请求超时 默认15秒,可自行设置
appendonly yes //aof日志开启 有需要就开启,它会每次写操作都记录一条日志

3、创建集群
先安装 ruby,因为 redis 的集群协调程序是用 ruby 写的

1
yum -y install ruby ruby-devel rubygems rpm-build

再安装gem,在编译目录下执行

1
gem install redis

运行每个redis实例:

1
redis-server redis.conf

复制编译目录下的src目录中的redis-trib.rb到/usr/local/bin,然后运行
在编译目录的src子目录下执行,其中host为各redis节点的绑定ip(如果绑定的ip是0.0.0.0则必须指定为对外开放的ip,否则会默认绑定127.0.0.1,在slot重定向时会报错),设置每个主分片有一个副本分片

1
./redis-trib.rb create --replicas 1 host1:port1 host2:port2 ...

4、测试
为了连到集群上,需要在 redis-cli 请求后加上-c 参数,比如

1
redis-cli -h 192.168.31.245 -c -p 7002

在普通set和get时,redis会自动计算出目标地址。
5、Java客户端连接

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
public class RedissonTest {

private static final Random random = new Random();

static int succeed = 0;

static int failed = 0;

public static void main(String[] args) throws InterruptedException {
Config config = new Config();
config.useClusterServers()
.setScanInterval(2000)
.addNodeAddress("redis://127.0.0.1:7001", "redis://127.0.0.1:7002", "127.0.0.1:7003")
.addNodeAddress("redis://127.0.0.1:7004", "redis://127.0.0.1:7005", "127.0.0.1:7006");
RedissonClient redissonClient = Redisson.create(config);
while (true) {
try {
RBucket<Object> bucket = redissonClient.getBucket(Integer.toString(random.nextInt()));
bucket.get();
succeed++;
log.info("调用成功, 当前 succeed:{}, failed:{}", succeed, failed);
} catch (Exception e) {
failed++;
log.info("调用失败, 当前 succeed:{}, failed:{}", succeed, failed, e.getMessage());
}
Thread.sleep(500);
}
}
}

info 命令中涉及集群部署的部分

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
# Replication
# 当前副本角色,如果实例不是任何节点的从节点,则该值是“master”,如果实例从某个节点同步数据,则是“slave”
role:master
# 已连接的从节点数
connected_slaves:2
# 每个从节点的信息,包括ID、地址、端口号、状态
slave0:ip=10.32.140.18,port=6222,state=online,offset=1745391794554,lag=0
slave1:ip=10.32.140.15,port=6212,state=online,offset=1745391807778,lag=0
master_replid:6a64bcbbcae91324e72e24745392275e2f1382ea
master_replid2:0000000000000000000000000000000000000000
master_repl_offset:1745391878919
second_repl_offset:-1
repl_backlog_active:1
repl_backlog_size:16777216
repl_backlog_first_byte_offset:1745375101704
repl_backlog_histlen:16777216

cluster info 命令

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
10.32.140.15:6212> cluster info
# ok状态表示集群可以正常接受查询请求。fail 状态表示,至少有一个哈希槽没有被绑定(说明有哈希槽没有被绑定到任意一个节点),或者在错误的状态(节点可以提供服务但是带有FAIL 标记),或者该节点无法联系到多数master节点。
cluster_state:ok
# 已分配到集群节点的哈希槽数量(不是没有被绑定的数量)。16384个哈希槽全部被分配到集群节点是集群正常运行的必要条件.
cluster_slots_assigned:16384
# 哈希槽状态不是FAIL 和 PFAIL 的数量
cluster_slots_ok:16384
# 哈希槽状态是 PFAIL的数量。只要哈希槽状态没有被升级到FAIL状态,这些哈希槽仍然可以被正常处理。PFAIL状态表示我们当前不能和节点进行交互,但这种状态只是临时的错误状态
cluster_slots_pfail:0
# 哈希槽状态是FAIL的数量。如果值不是0,那么集群节点将无法提供查询服务,除非cluster-require-full-coverage被设置为no
cluster_slots_fail:0
# 集群中节点数量,包括处于握手状态还没有成为集群正式成员的节点
cluster_known_nodes:9
# 至少包含一个哈希槽且能够提供服务的master节点数量
cluster_size:3
# 集群本地Current Epoch变量的值。这个值在节点故障转移过程时有用,它总是递增和唯一的
cluster_current_epoch:3
# 当前正在使用的节点的Config Epoch值. 这个是关联在本节点的版本值
cluster_my_epoch:2
cluster_stats_messages_ping_sent:27870482
cluster_stats_messages_pong_sent:27072640
cluster_stats_messages_meet_sent:4
cluster_stats_messages_sent:54943126
cluster_stats_messages_ping_received:27072636
cluster_stats_messages_pong_received:27865273
cluster_stats_messages_meet_received:4
cluster_stats_messages_fail_received:4
cluster_stats_messages_publish_received:7514286
cluster_stats_messages_received:62452203

cluster nodes

1
2
3
4
5
6
7
8
# 节点ID,IP地址:端口号,标识,上一次发送 ping 包的时间,上一次收到 pong 包的时间,连接状态,节点使用的哈希槽
127.0.0.1:7001> cluster nodes
fcdcafe5482daa80d0a382f675e8caced2d6ce63 127.0.0.1:7001@17001 myself,slave b2d0be87492d75bce14d3e50f687ce8a7872ef73 0 1603765953000 7 connected
75d6d1e3d4d64eb6fb85d1ac7d883ecef4ac5e7e 127.0.0.1:7002@17002 master - 0 1603765955026 2 connected 5461-10922
b2d0be87492d75bce14d3e50f687ce8a7872ef73 127.0.0.1:7004@17004 master - 0 1603765952012 7 connected 0-5460
8bf573a595a3955293e37f2e34e20c4cfb469060 127.0.0.1:7003@17003 master - 0 1603765954021 3 connected 10923-16383
5d599dea56108003633cd27d13abf87a9ef07d52 127.0.0.1:7005@17005 slave 75d6d1e3d4d64eb6fb85d1ac7d883ecef4ac5e7e 0 1603765953000 2 connected
68e6ddbf689bc6d51ebfadd24064fc6cc8204210 127.0.0.1:7006@17006 slave 8bf573a595a3955293e37f2e34e20c4cfb469060 0 1603765954000 3 connected

测试结果预期

测试一些Cluster宕机的情况,预期会有以下结论:

  1. 关闭任意一主,会导致部分写操作失败,是由于从节点不能执行写操作,在Slave升级为Master期间会有少量的失败。
  2. 关闭从节点对于整个集群没有影响。
  3. 如果半数以上 Master 处于关闭状态那么整个集群处于不可用状态。
    原因:Redis Cluster的选举需要有Master参与,如果多半的Master都挂掉了,也就不能再支持选举新Master了。
  4. 关闭任意一对主从节点会导致部分(大约为整个集群的1/3)失败。
    Master宕机了,且没有替补的Slave,则分配给这个Master的slot就不可用了。

测试 - 压测

测试 - 宕机1台Master

下面的命令将7001干掉后测试集群的主从迁移情况。
刚开始集群7001、7002、7003是Master:

1
2
3
4
5
6
7
127.0.0.1:7001> cluster nodes
5d599dea56108003633cd27d13abf87a9ef07d52 127.0.0.1:7005@17005 slave 75d6d1e3d4d64eb6fb85d1ac7d883ecef4ac5e7e 0 1603615592617 2 connected
fcdcafe5482daa80d0a382f675e8caced2d6ce63 127.0.0.1:7001@17001 myself,master - 0 1603615591000 1 connected 0-5460
b2d0be87492d75bce14d3e50f687ce8a7872ef73 127.0.0.1:7004@17004 slave fcdcafe5482daa80d0a382f675e8caced2d6ce63 0 1603615591614 1 connected
8bf573a595a3955293e37f2e34e20c4cfb469060 127.0.0.1:7003@17003 master - 0 1603615591000 3 connected 10923-16383
75d6d1e3d4d64eb6fb85d1ac7d883ecef4ac5e7e 127.0.0.1:7002@17002 master - 0 1603615592000 2 connected 5461-10922
68e6ddbf689bc6d51ebfadd24064fc6cc8204210 127.0.0.1:7006@17006 slave 8bf573a595a3955293e37f2e34e20c4cfb469060 0 1603615593620 3 connected

将7001 kill掉后,请求7001服务器的请求都会失败,大约20秒后请求恢复,且观察日志可以发现,原来7001的Slave-7004现在替补上来成为了Master:

1
2
3
4
5
6
7
8
9
10
11
18:29:28.186 [main] INFO  c.t.l.r.RedissonTest - 调用成功, 当前 succeed:15, failed:0
18:29:33.475 [main] INFO c.t.l.r.RedissonTest - 调用失败, 当前 succeed:15, failed:1
18:29:38.750 [main] INFO c.t.l.r.RedissonTest - 调用失败, 当前 succeed:15, failed:2
18:29:39.253 [main] INFO c.t.l.r.RedissonTest - 调用成功, 当前 succeed:16, failed:2
18:29:39.756 [main] INFO c.t.l.r.RedissonTest - 调用成功, 当前 succeed:17, failed:2
18:29:40.259 [main] INFO c.t.l.r.RedissonTest - 调用成功, 当前 succeed:18, failed:2
18:29:40.763 [main] INFO c.t.l.r.RedissonTest - 调用成功, 当前 succeed:19, failed:2
18:29:45.271 [redisson-netty-2-4] INFO o.r.c.MasterSlaveEntry - master 127.0.0.1/127.0.0.1:7004 used as slave
18:29:45.274 [redisson-netty-2-14] INFO o.r.c.p.PubSubConnectionPool - 1 connections initialized for 127.0.0.1/127.0.0.1:7004
18:29:45.280 [redisson-netty-2-4] WARN o.r.c.ClusterConnectionManager - slave: redis://127.0.0.1:7001 has down for slot ranges: [[0-5460]]
18:29:45.285 [redisson-netty-2-28] INFO o.r.c.p.SlaveConnectionPool - 24 connections initialized for 127.0.0.1/127.0.0.1:7004

之后重启7001后,发现7001重新加入到了集群中:

1
2
3
4
18:34:18.539 [redisson-netty-2-29] INFO  o.r.c.p.PubSubConnectionPool - 1 connections initialized for 127.0.0.1/127.0.0.1:7001
18:34:18.542 [redisson-netty-2-4] INFO o.r.c.MasterSlaveEntry - master 127.0.0.1/127.0.0.1:7004 excluded from slaves
18:34:18.542 [redisson-netty-2-4] INFO o.r.c.ClusterConnectionManager - slave: redis://127.0.0.1:7001 has up for slot ranges: [[0-5460]]
18:34:18.544 [redisson-netty-2-6] INFO o.r.c.p.SlaveConnectionPool - 24 connections initialized for 127.0.0.1/127.0.0.1:7001

测试 - 宕机2台Master

把两台Master干掉后,两个Master均进入fail状态,这时集群也会进入fail状态,选举不会成功。

1
2
3
4
5
6
7
localhost:7001> cluster nodes
fcdcafe5482daa80d0a382f675e8caced2d6ce63 127.0.0.1:7001@17001 myself,slave b2d0be87492d75bce14d3e50f687ce8a7872ef73 0 1603622378000 7 connected
75d6d1e3d4d64eb6fb85d1ac7d883ecef4ac5e7e 127.0.0.1:7002@17002 master - 0 1603622381510 2 connected 5461-10922
b2d0be87492d75bce14d3e50f687ce8a7872ef73 127.0.0.1:7004@17004 master - 0 1603622379504 7 connected 0-5460
8bf573a595a3955293e37f2e34e20c4cfb469060 127.0.0.1:7003@17003 master - 0 1603622380507 3 connected 10923-16383
5d599dea56108003633cd27d13abf87a9ef07d52 127.0.0.1:7005@17005 slave 75d6d1e3d4d64eb6fb85d1ac7d883ecef4ac5e7e 0 1603622378498 2 connected
68e6ddbf689bc6d51ebfadd24064fc6cc8204210 127.0.0.1:7006@17006 slave 8bf573a595a3955293e37f2e34e20c4cfb469060 0 1603622380000 3 connected

集群fail后,客户端之后的请求也都失败了。

分布式锁

1
2
3
set key value ex nx
do sth...
del key
  1. 为什么要加过期时间?
    怕中间遇到异常退出,del 没有执行,导致陷入死锁。
    set 命令现在支持加 ex 和 nx 参数,既保证原子性,又支持加过期时间。
  2. 过早过期释放别人的锁
    业务执行时间过长,锁过期了,可能导致其他线程先获取到了锁,这样当前线程 del 的时候相当于将别人的锁释放掉了。
    可以给 value 设置一个随机数,释放前检查一下,这是个“读后写”的过程,为了保证其原子性,一般会使用 Lua 脚本来实现(类似 Redisson 中的实现)。
    Redisson 中的解决方案是加一个“看门狗”,定时刷新过期时间。
  3. 重入性
    为了实现可重入性,一般是在客户端使用 ThreadLocal 存储当前持有锁的计数。
    在 Redisson 中,可重入锁是通过在 Lua 脚本中对客户端线程 ID 进行计数来实现的。

延时队列

异步消息队列

可以使用 list 数据结构来实现异步消息队列,使用 rpush/lpush 操作入队列,使用 lpop 和 rpop 来出队列。

1
2
3
4
5
6
rpush notify-queue a b
llen notify-queue
lpop notify-queue
llen notify-queue
lpop notify-queue
...
  1. 如果队列空了怎么办?
    客户端通过轮询队列 pop 获取消息处理,如果队列空了,那么就会陷入 pop 的死循环。
    一般的解决办法是sleep,每次 pop 后可以暂停个 1 秒。
    但是sleep同样会带来延迟增大的问题,因此,更好的解决办法是blpop / brpop,即阻塞读:
    1
    2
    // 等待一秒若还没有数据则直接返回
    blpop notify-queue 1
  2. 空闲连接
    如果线程一直阻塞,则 Redis 的客户端连接将成为闲置连接,闲置过久,服务器一般会主动断开连接,减少闲置资源占用。这个时候 blpop/brpop 会抛出异常来,这时要注意对异常的捕获和重试
  3. 锁冲突处理
    加锁失败一般有 3 种处理方式:
    • 直接抛出异常,由用户稍后重试;
      适合由用户直接发起的请求,比如商城下单,下单失败后由用户决定是否重新请求。
    • sleep
      sleep 会阻塞消息处理线程,会导致后续消息处理出现延迟,不适合高并发(锁冲突频繁)或队列消息较多的情况,并且,如果是由于死锁导致的加锁不成功,sleep 将导致线程一直处于阻塞状态、后续的消息永远得不到处理。
    • 延时队列
      将当前冲突的请求扔到另一个队列延后处理以避开冲突,适合异步消息处理的场景。

下面的实现例子来自《Redis 深度历险》:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
public class RedisDelayingQueue<T> {

static class TaskItem<T> {
public String id;
public T msg;
}

// fastjson 序列化对象中存在 generic 类型时,需要使用 TypeReference
private Type TaskType = new TypeReference<TaskItem<T>>() {
}.getType();

private Jedis jedis;
private String queueKey;

public RedisDelayingQueue(Jedis jedis, String queueKey) {
this.jedis = jedis;
this.queueKey = queueKey;
}

public void delay(T msg) {
TaskItem<T> task = new TaskItem<T>();
task.id = UUID.randomUUID().toString();
task.msg = msg;
String s = JSON.toJSONString(task);
// key是当前时间
jedis.zadd(queueKey, System.currentTimeMillis() + 5000, s); // 塞入延时队列 ,5s 后再试
}

public void loop() {
while (!Thread.interrupted()) {
// 取时间范围是0到当前时间内的一条记录
// 第3个参数0表示不记分数,
Set<String> values = jedis.zrangeByScore(queueKey, 0, System.currentTimeMillis(), 0, 1);
if (values.isEmpty()) {
try {
// 暂停一会,避免浪费CPU
Thread.sleep(500);
} catch (InterruptedException e) {
break;
}
continue;
}
String s = values.iterator().next();
// 说明抢到了
if (jedis.zrem(queueKey, s) > 0) {
TaskItem<T> task = JSON.parseObject(s, TaskType);
this.handleMsg(task.msg);
}
}
}

public void handleMsg(T msg) {
System.out.println(msg);
}

public static void main(String[] args) {
Jedis jedis = new Jedis();
RedisDelayingQueue<String> queue = new RedisDelayingQueue<>(jedis, "q-demo");
Thread producer = new Thread() {

public void run() {
for (int i = 0; i < 10; i++) {
queue.delay("codehole" + i);
}
}

};
Thread consumer = new Thread() {

public void run() {
queue.loop();
}

};
producer.start();
consumer.start();
try {
producer.join();
Thread.sleep(6000);
consumer.interrupt();
consumer.join();
} catch (InterruptedException e) {
}
}
}

一些优化点:

  1. 注意zrangeByScorezrem不是一个原子操作,可能会有多个线程争抢同一个 key,这可以通过 lua 脚本来优化。
  2. 使用 Redis 作为消息队列并不能保证 100%的可靠性,因为 rem 执行后如果客户端崩溃了消息就丢失了,或者 Redis 崩溃了消息也有可能丢失(这个可以通过主备来避免)。

阻塞队列

使用Redis实现队列可以利用lpop/rpush或rpop/lpush,但是这两组命令在遇到队列为空或满时还是会直接返回,如果要实现阻塞队列的阻塞等待能力,可以:
1、使用lua脚本轮询
lua脚本中先检测队列是否为空/满,在不空/不满的情况下才执行后续的操作。
如果为空/满,则客户端先等待一会再执行一次该lua脚本。
缺点是会有很多空轮询。
2、使用blpop命令
在list结构尚不存在元素的情况下,blpop命令会先将客户端挂起,等待:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
void blockForKeys(redisClient *c, robj **keys, int numkeys, mstime_t timeout, robj *target) {

...

// 关联阻塞客户端和键的相关信息
for (j = 0; j < numkeys; j++) {

/* If the key already exists in the dict ignore it. */
// c->bpop.keys 是一个集合(值为 NULL 的字典)
// 它记录所有造成客户端阻塞的键
// 以下语句在键不存在于集合的时候,将它添加到集合
if (dictAdd(c->bpop.keys,keys[j],NULL) != DICT_OK) continue;

incrRefCount(keys[j]);

/* And in the other "side", to map keys -> clients */
// c->db->blocking_keys 字典的键为造成客户端阻塞的键
// 而值则是一个链表,链表中包含了所有被阻塞的客户端
// 以下程序将阻塞键和被阻塞客户端关联起来
de = dictFind(c->db->blocking_keys,keys[j]);
if (de == NULL) {
// 链表不存在,新创建一个,并将它关联到字典中
int retval;

/* For every key we take a list of clients blocked for it */
l = listCreate();
retval = dictAdd(c->db->blocking_keys,keys[j],l);
incrRefCount(keys[j]);
redisAssertWithInfo(c,keys[j],retval == DICT_OK);
} else {
l = dictGetVal(de);
}
// 将客户端填接到被阻塞客户端的链表中
listAddNodeTail(l,c);
}
blockClient(c,REDIS_BLOCKED_LIST);
}

执行指令结束后,处理解除了阻塞的键。
redis.c

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
int processCommand(redisClient *c) {

...

/* Exec the command */
if (c->flags & REDIS_MULTI &&
c->cmd->proc != execCommand && c->cmd->proc != discardCommand &&
c->cmd->proc != multiCommand && c->cmd->proc != watchCommand)
{
// 在事务上下文中
// 除 EXEC 、 DISCARD 、 MULTI 和 WATCH 命令之外
// 其他所有命令都会被入队到事务队列中
queueMultiCommand(c);
addReply(c,shared.queued);
} else {
// 执行命令
call(c,REDIS_CALL_FULL);

c->woff = server.master_repl_offset;
// 处理那些解除了阻塞的键
if (listLength(server.ready_keys))
handleClientsBlockedOnLists();
}

return REDIS_OK;
}

可以看到,执行命令的末尾需要处理解除了阻塞的键,遍历这些键然后唤醒等待的客户端。
t_list.c

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
/* 这个函数会在 Redis 每次执行完单个命令、事务块或 Lua 脚本之后调用。
*
* 对所有被阻塞在某个客户端的 key 来说,只要这个 key 被执行了某种 PUSH 操作
* 那么这个 key 就会被放到 serve.ready_keys 去。
*
* 这个函数会遍历整个 serve.ready_keys 链表,
* 并将里面的 key 的元素弹出给被阻塞客户端,
* 从而解除客户端的阻塞状态。
*
* 函数会一次又一次地进行迭代,
* 因此它在执行 BRPOPLPUSH 命令的情况下也可以正常获取到正确的新被阻塞客户端。
*/
void handleClientsBlockedOnLists(void) {

// 遍历整个 ready_keys 链表
while(listLength(server.ready_keys) != 0) {
list *l;

/* Point server.ready_keys to a fresh list and save the current one
* locally. This way as we run the old list we are free to call
* signalListAsReady() that may push new elements in server.ready_keys
* when handling clients blocked into BRPOPLPUSH. */
// 备份旧的 ready_keys ,再给服务器端赋值一个新的
l = server.ready_keys;
server.ready_keys = listCreate();

while(listLength(l) != 0) {

// 取出 ready_keys 中的首个链表节点
listNode *ln = listFirst(l);

// 指向 readyList 结构
readyList *rl = ln->value;

/* First of all remove this key from db->ready_keys so that
* we can safely call signalListAsReady() against this key. */
// 从 ready_keys 中移除就绪的 key
dictDelete(rl->db->ready_keys,rl->key);

/* If the key exists and it's a list, serve blocked clients
* with data. */
// 获取键对象,这个对象应该是非空的,并且是列表
robj *o = lookupKeyWrite(rl->db,rl->key);
if (o != NULL && o->type == REDIS_LIST) {
dictEntry *de;

/* We serve clients in the same order they blocked for
* this key, from the first blocked to the last. */
// 取出所有被这个 key 阻塞的客户端
de = dictFind(rl->db->blocking_keys,rl->key);
if (de) {
list *clients = dictGetVal(de);
int numclients = listLength(clients);

while(numclients--) {
// 取出客户端
listNode *clientnode = listFirst(clients);
redisClient *receiver = clientnode->value;

// 设置弹出的目标对象(只在 BRPOPLPUSH 时使用)
robj *dstkey = receiver->bpop.target;

// 从列表中弹出元素
// 弹出的位置取决于是执行 BLPOP 还是 BRPOP 或者 BRPOPLPUSH
int where = (receiver->lastcmd &&
receiver->lastcmd->proc == blpopCommand) ?
REDIS_HEAD : REDIS_TAIL;
robj *value = listTypePop(o,where);

// 还有元素可弹出(非 NULL)
if (value) {
/* Protect receiver->bpop.target, that will be
* freed by the next unblockClient()
* call. */
if (dstkey) incrRefCount(dstkey);

// 取消客户端的阻塞状态
unblockClient(receiver);

// 将值 value 推入到造成客户端 receiver 阻塞的 key 上
if (serveClientBlockedOnList(receiver,
rl->key,dstkey,rl->db,value,
where) == REDIS_ERR)
{
/* If we failed serving the client we need
* to also undo the POP operation. */
listTypePush(o,value,where);
}

if (dstkey) decrRefCount(dstkey);
decrRefCount(value);
} else {
// 如果执行到这里,表示还有至少一个客户端被键所阻塞
// 这些客户端要等待对键的下次 PUSH
break;
}
}
}

// 如果列表元素已经为空,那么从数据库中将它删除
if (listTypeLength(o) == 0) dbDelete(rl->db,rl->key);
/* We don't call signalModifiedKey() as it was already called
* when an element was pushed on the list. */
}

/* Free this item. */
decrRefCount(rl->key);
zfree(rl);
listDelNode(l,ln);
}
listRelease(l); /* We have the new list on place at this point. */
}
}

位图

场景

位图数据结构与 java 中的 Set 类似,占用空间小,但是并不能防止冲突,适合数据离散性比较大且对数据准确性不高的场景。

  1. 统计月活
    统计月活的时候我们需要对 userId 进行去重,每个用户就可以定位到这个位图上的一个确定的位置上,0 表示不活跃,1 表示活跃,遍历一次就可以知道月活用户数有多少。

使用

1
2
3
4
set s a
getbit s 2
setbit s 6 1
get s

注意下标并不是从低位到高位递增的,而是反过来的,a 的 ASCII 码值是01100001setbit s 6 1设置第 7 位为 1 后值就变成了 c。

1
2
3
4
5
6
// 指定范围内1的个数
bitcount s 0 7
// 第一个1的位置
bitpos s 1
// 下标0到7范围内第一个1的位置
bitpos s 1 0 7

比如用位数组记录用户登录的日期,bitcount 可以用于统计用户一共签到了多少天,bitpos 可以用于查找用户从哪一天开始第一次签到。

1
2
3
4
5
6
// 从第一个位(0)开始取4个位,结果是无符号数(u)
bitfield s get u4 0
// 从第三个位(2)开始取3个位,结果是有符号数(i)
bitfield s get i3 2
// 一次性执行多个子命令
bitfield s get u4 0 get u3 2 get i4 0 get i3 2

如果 s 的值是 c,二进制值是0110 0011,取前 4 位结果是 6(0110)。

1
2
3
4
5
6
// +1,结果为0,因为溢出了
bitfield s incrby u2 1 1
// 不执行,返回nil
bitfield s overflow fail incrby u2 1 1
// 结果为3
bitfield s overflow sat incrby u2 1 1

位的增加操作有 3 种策略:

  1. 默认的 wrap:
  2. fail:失败报错不执行
  3. sat:饱和截断,如果有溢出就停留在最大最小值。

HyperLogLog

HyperLogLog 主要用于大数据量的计数,比如访问频繁的页面需要统计 UV(一天内访问的用户数),不同于 PV,UV 需要去重。
HyperLogLog 只能粗略统计,理论上会有不到 1%的误差。

使用

1
2
3
4
5
6
7
8
pfadd s user1
pfcount s
pfadd s user1
pfadd s user2
// 结果仍然为2
pfcount s
// 将两个计数器累加
pfmerge s b

布隆过滤器

HyperLogLog 可以用于计数,但是不能用于判断一个值是否存在于该结构里,这时最好使用布隆过滤器(Bloom Filter):

  1. 去重;
  2. 支持 contains 判断;
  3. 节省空间;
  4. 不精确,存在误判的可能(当布隆过滤器说某个值存在时,这个值可能不存在,当它说不存在时,那就肯定不存在,这个问题是由 hash 函数的冲突引起的);
  5. 不支持计数。

使用

插入值并且判断该值是否存在于 bf 对象内:

1
2
3
4
5
bf.add s user1
bf.add s user2
bf.madd s user3 user4
bf.exists s user1
bf.mexists s user1 user5

第一次 bf.add 会创建一个默认参数的布隆过滤器,也可以显式创建:

1
2
// 创建一个名为key的布隆过滤器,错误率0.1,预计放入的元素数量为10
bf.reserve key 0.1 10
  • 错误率越低,需要的空间越大,默认值为 0.01
    对于不需要非常精确的场合,错误率设置得稍大一点也无伤大雅。
  • 当实际数量超出预计放入数量时,误判率会上升,默认值为 100
    该值设置得过大,会浪费存储空间,估计得过小,就会影响准确率,最理想的情况下是略大于实际元素数量。

当实际元素数超出初始化大小时,应该对布隆过滤器进行重建,重新分配一个 size 更大的过滤器,再将所有的历史元素批量 add 进去(比如如果布隆过滤器保存的是 userId,那么就需要把历史 userId 重新保存到一个新的布隆过滤器中)。

原理

布隆过滤器本身是一个大型的位数组和几个不同的无偏 hash 函数。

无偏指的是该 hash 函数能把值分布得比较均匀。

  • add:使用这些 hash 函数计算 hash(key)%length(bit_arr),每个 hash 函数都可以得到一个位置,将这些位置置为 1;
  • exists:同样用这些 hash 函数计算位置,如果有一个不为 1 说明 key 不存在,如果都为 1 也不一定说明 key 一定存在,因为这些位置可能会被其他 key 设置到。

GeoHash

一种简单的方法

找一个节点附近的所有节点,可以近似看做以该节点为中心的一个矩形范围内有哪些节点,可以用类似下面的 SQL 来查询:

1
select id from positions where x0-r < x < x0+r and y0-r < y < y0+r

对 x 和 y 字段加索引后该 SQL 的性能也不会太差。

GeoHash

GeoHash 的基本思想是“降维”,将二维坐标映射到直线上的一个点,要寻找二维平面上的“附近的人”就相当于在直线上找相邻的点。
GeoHash 本质是一个 zset(带 score 的 set),底层结构是 skiplist:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
geoadd company 116.489033 40.007669 meituan
geoadd company 116.562108 39.787602 jd 116.334255 40.027400 xiaomi
// geoadd指令添加一个节点,因为geo存储结构上使用的是zset,因此可以使用zset相关的指令来操作,比如可以使用zrem指令来删除
zrem company jd
geoadd company 116.562108 39.787602 jd
// 计算两个节点之间的距离
geodist company meituan jd
// 单位km
geodist company meituan jd km
// 获取节点位置
geopos company meituan jd
// 获取元素的hash值
geohash company meituan
// 查询指定元素附近的其他元素
// 范围20公里内最多3个元素按距离正序排列,
georadiusbymember company meituan 20 km count 3 asc

GeoHash 算法的执行流程如下:

  1. 将坐标编码为一个 52 位整数,这个整数也可以还原为原坐标;
  2. zset 的 value 是元素的 key(即上面的 meituan、jd 等),score 是 GeoHash 的 52 位整数值,通过 zset 的 score 排序即可得到坐标附近的其它元素。

其中主要问题是坐标是如何编码的,编码后为什么可以通过比较大小来判断是否相邻?
GeoHash 算法的原理是按经纬度区间对半分来进行编码,比如,维度的区间是(-90, 90),如果坐标的维度值大于 0,则记 1,否则记 0,进一步的,如果坐标维度大于 45,则记 1,否则记 0,以此类推,对于39.918118来说,最后的编码值就是10111000110001011011

一般实现中还会进行 BASE64 编码,本质是一样的,只是编码后占用的空间更小了。

Scan

扫描大量 key 找到目标 key 的需求有两种实现方法:

  1. keys 命令
    不支持分页,一次性吐出所有满足条件的 key,如果数据量过大、耗时过长,可能导致服务端卡顿。

    因为 Redis 是单线程模型。

  2. scan
    和 keys 命令一样提供模式匹配,但它是通过游标扫描的,每次只扫描指定数量的数据,并将其中匹配的结果返回。

使用

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
# 从cursor=0开始,匹配"key99*",共扫描1000条
scan 0 match key99* count 1000
1) "11928"
2) 1) "key9956"
2) "key9993"
3) "key9911"
4) "key996"
5) "key9933"
6) "key9962"
7) "key991"
8) "key9981"
9) "key9990"
10) "key9946"
11) "key9971"
12) "key99"
13) "key9951"

返回值中,第一个”11928”是结果中最后一条的 cursor,下一次遍历时可以使用该值作为初始 cursor。

1
2
3
scan 11928 match key99* count 10
1) "3480"
2) (empty list or set)

结果为空集合并不意味着遍历结束了,只有 cursor=0 才是遍历结束的标识。

1
2
3
4
5
6
7
8
scan 11928 match key99* count 10000
1) "0"
2) 1) "key9952"
2) "key9961"
3) "key9988"
4) "key9931"
5) "key9998"
...忽略更多的

原理

Redis 中所有 key 都存储在一个非常大的字典中,这个字典的结构和 Java 的 HashMap 类似:
Redis-字典

  1. 一维数组大小为 2^n(n >= 0),扩容一次大小翻倍,保存的是所有 key 的下标,或者称为槽(slot);
  2. 二维链表保存的是所有的 key,不同 key 是有可能被 hash 到同一个槽上的,这时这个槽里所有元素都会被模式匹配过滤后一次性返回。

scan 的遍历并不是从 0 开始递增,而是通过二进制高位进位加法来遍历,比如遍历顺序:

  1. 0000 -> 0
  2. 1000 -> 8
  3. 0100 -> 4
  4. 1100 -> 12
  5. 以此类推

Redis-高位递增遍历
这种遍历方式的好处是可以避免扩容缩容后相同元素被反复遍历到。
因为槽数组的长度总是 2 的 n 次方,因此取模运算等价于位与操作,比如原来数组长度为 8,15(1111)会被 hash 到 7 号槽,而扩容后数组长度变成 16,增加了一个高位的 1,15(1111)会被 hash 到 15 号槽。
因此从低到高的遍历方式可能会导致重复遍历,而从高到低的方式则可以避免。

Redis 的 rehash 是渐进式的,在 rehash 的过程中,操作需要同时访问新旧的两个数组结构,如果旧的找不到就需要再到新的下面去找,scan 同理。

大 key 问题

大 key 会导致数据迁移和扩容时需要分配更大的一块内存空间,导致卡顿,删除时,同样需要一次性回收大 key,卡顿会再一次产生。
为了定位大 key,可以使用 redis-cli 的扫描功能

1
2
3
4
redis-cli -h 127.0.0.1 -p 7001 –-bigkeys
# 如果担心这个指令会大幅抬升 Redis 的 ops 导致线上报警,还可以增加一个休眠参数
# 每隔100条scan指令就会休眠0.1s
redis-cli -h 127.0.0.1 -p 7001 –-bigkeys -i 0.1