Redisson

与 Jedis 的区别

  1. Jedis 提供了对 Redis-API 的简单封装,使用 Jedis 时,需要关注 Redis 服务器的部署细节,而 Redisson 屏蔽了这些细节,使得使用者可以将精力更集中地放到自己希望实现的功能上。
  2. Jedis 只提供简单的 API 调用,并不关注用户如何使用这些 API,比如 string 可以实现原子变量,不过需要用户手动封装,而 Redisson 中已经有了现成的 AtomicLong。
  3. Jedis 不支持 Cluster 环境下的事务、Lua

Sentinel 模式获取连接

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
public class RedissonSentinelConnectionTest {

RedissonClient redisson;
RedisSentinelConnection connection;
RedisRunner.RedisProcess master;
RedisRunner.RedisProcess slave1;
RedisRunner.RedisProcess slave2;
RedisRunner.RedisProcess sentinel1;
RedisRunner.RedisProcess sentinel2;
RedisRunner.RedisProcess sentinel3;

@Before
public void before() throws FailedToStartRedisException, IOException, InterruptedException {
master = new RedisRunner()
.nosave()
.randomDir()
.run();
slave1 = new RedisRunner()
.port(6380)
.nosave()
.randomDir()
.slaveof("127.0.0.1", 6379)
.run();
slave2 = new RedisRunner()
.port(6381)
.nosave()
.randomDir()
.slaveof("127.0.0.1", 6379)
.run();
sentinel1 = new RedisRunner()
.nosave()
.randomDir()
.port(26379)
.sentinel()
.sentinelMonitor("myMaster", "127.0.0.1", 6379, 2)
.run();
sentinel2 = new RedisRunner()
.nosave()
.randomDir()
.port(26380)
.sentinel()
.sentinelMonitor("myMaster", "127.0.0.1", 6379, 2)
.run();
sentinel3 = new RedisRunner()
.nosave()
.randomDir()
.port(26381)
.sentinel()
.sentinelMonitor("myMaster", "127.0.0.1", 6379, 2)
.run();

Thread.sleep(5000);

Config config = new Config();
config.useSentinelServers()
.setLoadBalancer(new RandomLoadBalancer())
.addSentinelAddress(sentinel3.getRedisServerAddressAndPort()).setMasterName("myMaster");
redisson = Redisson.create(config);

RedissonConnectionFactory factory = new RedissonConnectionFactory(redisson);
connection = factory.getSentinelConnection();
}

@After
public void after() {
sentinel1.stop();
sentinel2.stop();
sentinel3.stop();
master.stop();
slave1.stop();
slave2.stop();

redisson.shutdown();
}

@Test
public void testMasters() {
Collection<RedisServer> masters = connection.masters();
assertThat(masters).hasSize(1);
}

@Test
public void testSlaves() {
Collection<RedisServer> masters = connection.masters();
Collection<RedisServer> slaves = connection.slaves(masters.iterator().next());
assertThat(slaves).hasSize(2);
}

@Test
public void testRemove() {
Collection<RedisServer> masters = connection.masters();
connection.remove(masters.iterator().next());
}

@Test
public void testMonitor() {
Collection<RedisServer> masters = connection.masters();
RedisServer master = masters.iterator().next();
master.setName(master.getName() + ":");
connection.monitor(master);
}

@Test
public void testFailover() throws InterruptedException {
Collection<RedisServer> masters = connection.masters();
connection.failover(masters.iterator().next());

Thread.sleep(10000);

RedisServer newMaster = connection.masters().iterator().next();
assertThat(masters.iterator().next().getPort()).isNotEqualTo(newMaster.getPort());
}
}
  1. 确定 Sentinel 集群内的所有节点地址
    创建连接管理器(ConnectionManager)时读取所有 Master、Slave 和 Sentinel 节点的地址(org.redisson.connection.SentinelConnectionManager#SentinelConnectionManager)
    Sentinel 通过监听 master 可以得到所有节点的地址。
    可以从SentinelConnectionManager中看到,客户端会定时(默认1秒)地刷新服务端状态,即使集群暂时不可用,也可以通过这种刷新来恢复连接。
  2. 尝试连接一个 Sentinel
    只要有一个 Sentinel 能通过 PING-PONG 校验,则返回对该 Sentinel 的连接。
  3. 执行操作
    获取连接(org.redisson.command.RedisExecutor#getConnection)。
    如果是只读的操作,会从 slave 中通过负载均衡选一个操作(org.redisson.connection.MasterSlaveConnectionManager#connectionReadOp);
    如果是非只读操作,从 master 里选一个操作(org.redisson.connection.ConnectionManager#connectionWriteOp)。

Cluster 模式获取连接

  1. 添加节点初始化 Redisson
    对于客户端来说,Cluster 模式可以看做几个 Master、Slave 的组合(org.redisson.ClusterRunner#addNode)。
  2. 连接时从节点中选一个
    以Buckets.get操作为例,跟踪代码直到CommandAsyncService#readAsync(String key, Codec codec, RedisCommand<T> command, Object ... params)
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    @Override
    public int calcSlot(String key) {
    ...

    // slot的计算方法,注意这里的MAX_SLOT是固定的
    int result = CRC16.crc16(key.getBytes()) % MAX_SLOT;
    log.debug("slot {} for {}", result, key);
    return result;
    }

    private NodeSource getNodeSource(String key) {
    // 计算该key属于哪个slot
    int slot = connectionManager.calcSlot(key);
    // 计算该slot属于哪个节点
    MasterSlaveEntry entry = connectionManager.getEntry(slot);
    return new NodeSource(entry);
    }

    @Override
    public <T, R> RFuture<R> readAsync(String key, Codec codec, RedisCommand<T> command, Object... params) {
    RPromise<R> mainPromise = connectionManager.newPromise();
    // 获取key所在的节点
    NodeSource source = getNodeSource(key);
    async(true, source, codec, command, params, mainPromise, 0);
    return mainPromise;
    }
    先计算 key 属于哪个 slot(org.redisson.cluster.ClusterConnectionManager#calcSlot);
    再由 slot 计算应该请求哪对主从(org.redisson.connection.MasterSlaveConnectionManager#getEntry)。
  3. 重连
    Redisson启动后会创建一个定时任务每5秒更新一次节点状态,所以就算节点挂掉了,之后重启时客户端是可以感知到服务器的启动的。
    org.redisson.cluster.ClusterConnectionManager#scheduleClusterChangeCheck

分布式锁

文档:8. 分布式锁和同步器
下面记录一下 Redisson 中对应功能所在的代码位置和基本思路。

锁的特性

  • 互斥性
    任意时刻,只会有一个客户端持有锁
  • 不会发生死锁
    即使客户端在持有锁期间崩溃而没有释放锁,也能保证其他客户端能获取到锁。
  • 容错性
    锁服务的某个节点不可用时,客户端还能继续加解锁。
  • 可重入性
    一个客户端可以重复加锁,期间其他客户端无法获取这个锁。

可重入锁(Reentrant Lock)

测试代码见:org.redisson.RedissonLockTest#testGetHoldCount
源码主要为:org.redisson.RedissonLock

可重入性是通过加锁时传的 threadId 实现的,下面是 Redisson 中用于加锁的 lua 脚本(org.redisson.RedissonLock#tryLockInnerAsync):

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
-- KEYS[1]: RedissonObject中的name字段,这里表示锁的名字
-- ARGV[1]: leaseTime,过期时间,这里为30000
-- ARGV[2]: 锁的value,这里为threadId

-- 未加过锁的情况
"if (redis.call('exists', KEYS[1]) == 0) then " +
"redis.call('hset', KEYS[1], ARGV[2], 1); " +
"redis.call('pexpire', KEYS[1], ARGV[1]); " +
"return nil; " +
"end; " +
-- 当前线程已加过锁的情况
"if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then " +
"redis.call('hincrby', KEYS[1], ARGV[2], 1); " +
"redis.call('pexpire', KEYS[1], ARGV[1]); " +
"return nil; " +
"end; " +
-- 其他线程加过锁的情况
"return redis.call('pttl', KEYS[1]);"

所有锁都保存在一个 key 为 lock 的 hash 对象下,第一次加锁时保存的结果为<lock, {threadId}, 1>,过期时间为 30s,同一线程第二次加锁时,更新为<lock, {threadId}, 2>,且过期时间被刷新。
当加锁成功时(包括同一线程调用重入多次)返回 null,而加锁失败时,返回锁的剩余过期时间,根据返回值是否为空可以判断加锁是否成功,当还未获取到锁时,客户端会轮询检查(org.redisson.RedissonLock#lock(long leaseTime, TimeUnit unit, boolean interruptibly)中的 while 循环),也就是说这种加锁方式并不是公平的
加锁监控保证了当业务执行时间超过加锁时间时,不会因为锁过期而让其他线程进入临界区,在 Redisson 中是通过一个 TimerTask 每隔 10s(即加锁时间 / 3)刷新一次锁的过期时间来实现的(org.redisson.RedissonLock#renewExpiration)。

另外,由于加锁时保存了 threadId,unlock时同样会传 threadId、只能释放当前线程加上的锁,下面是用于释放锁的 lua 脚本(org.redisson.RedissonLock#unlockInnerAsync):

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
-- KEYS[1]: {lockName}
-- KEYS[2]:
-- ARGS[1]: UNLOCK_MESSAGE
-- ARGS[2]: 加锁时间,默认30s
-- ARGS[3]: {threadId}

-- 检查当前线程是否有加该锁
"if (redis.call('hexists', KEYS[1], ARGV[3]) == 0) then " +
"return nil;" +
"end; " +
-- 计数器-1
"local counter = redis.call('hincrby', KEYS[1], ARGV[3], -1); " +
-- 如果计数器未减完,说明重入了多次,且这里刷新了一次过期时间
"if (counter > 0) then " +
"redis.call('pexpire', KEYS[1], ARGV[2]); " +
"return 0; " +
-- 计数器减完,删除该锁,并通知其他正在等待的线程
"else " +
"redis.call('del', KEYS[1]); " +
"redis.call('publish', KEYS[2], ARGV[1]); " +
"return 1; "+
"end; " +
"return nil;"

公平锁(Fair Lock)

测试代码见:org.redisson.RedissonFairLockTest#testIsLockedOtherThread
源码主要为:org.redisson.RedissonFairLock

公平性是通过队列实现的,(org.redisson.RedissonFairLock#tryLockInnerAsync):

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
// remove stale threads
"while true do " +
"local firstThreadId2 = redis.call('lindex', KEYS[2], 0);" +
"if firstThreadId2 == false then " +
"break;" +
"end;" +

"local timeout = tonumber(redis.call('zscore', KEYS[3], firstThreadId2));" +
"if timeout <= tonumber(ARGV[4]) then " +
// remove the item from the queue and timeout set
// NOTE we do not alter any other timeout
"redis.call('zrem', KEYS[3], firstThreadId2);" +
"redis.call('lpop', KEYS[2]);" +
"else " +
"break;" +
"end;" +
"end;" +

// check if the lock can be acquired now
"if (redis.call('exists', KEYS[1]) == 0) " +
"and ((redis.call('exists', KEYS[2]) == 0) " +
"or (redis.call('lindex', KEYS[2], 0) == ARGV[2])) then " +

// remove this thread from the queue and timeout set
"redis.call('lpop', KEYS[2]);" +
"redis.call('zrem', KEYS[3], ARGV[2]);" +

// decrease timeouts for all waiting in the queue
"local keys = redis.call('zrange', KEYS[3], 0, -1);" +
"for i = 1, #keys, 1 do " +
"redis.call('zincrby', KEYS[3], -tonumber(ARGV[3]), keys[i]);" +
"end;" +

// acquire the lock and set the TTL for the lease
"redis.call('hset', KEYS[1], ARGV[2], 1);" +
"redis.call('pexpire', KEYS[1], ARGV[1]);" +
"return nil;" +
"end;" +

// check if the lock is already held, and this is a re-entry
"if redis.call('hexists', KEYS[1], ARGV[2]) == 1 then " +
"redis.call('hincrby', KEYS[1], ARGV[2],1);" +
"redis.call('pexpire', KEYS[1], ARGV[1]);" +
"return nil;" +
"end;" +

// the lock cannot be acquired
// check if the thread is already in the queue
"local timeout = redis.call('zscore', KEYS[3], ARGV[2]);" +
"if timeout ~= false then " +
// the real timeout is the timeout of the prior thread
// in the queue, but this is approximately correct, and
// avoids having to traverse the queue
"return timeout - tonumber(ARGV[3]) - tonumber(ARGV[4]);" +
"end;" +

// add the thread to the queue at the end, and set its timeout in the timeout set to the timeout of
// the prior thread in the queue (or the timeout of the lock if the queue is empty) plus the
// threadWaitTime
"local lastThreadId = redis.call('lindex', KEYS[2], -1);" +
"local ttl;" +
"if lastThreadId ~= false and lastThreadId ~= ARGV[2] then " +
"ttl = tonumber(redis.call('zscore', KEYS[3], lastThreadId)) - tonumber(ARGV[4]);" +
"else " +
"ttl = redis.call('pttl', KEYS[1]);" +
"end;" +
"local timeout = ttl + tonumber(ARGV[3]) + tonumber(ARGV[4]);" +
"if redis.call('zadd', KEYS[3], timeout, ARGV[2]) == 1 then " +
"redis.call('rpush', KEYS[2], ARGV[2]);" +
"end;" +
"return ttl;"

联锁(MultiLock)

源码位置:org.redisson.RedissonMultiLock

联锁是对批量加锁的封装,其关键是如何实现死锁避免,其中的关键代码如下(org.redisson.RedissonMultiLock#tryLock(long waitTime, long leaseTime, TimeUnit unit)):

1
2
3
4
5
6
7
8
9
10
11
// 将已经获取的锁释放掉
unlockInner(acquiredLocks);
if (waitTime == -1) {
return false;
}
failedLocksLimit = failedLocksLimit();
acquiredLocks.clear();
// 重置iterator,重新加一遍锁
while (iterator.hasPrevious()) {
iterator.previous();
}

红锁(RedLock)

测试代码:org.redisson.RedissonRedLockTest#testLockLeasetime
源码位置:org.redisson.RedissonRedLock

红锁实际上是联锁的子类,原理基本一致,它和联锁的区别主要是:

  • 联锁不允许加锁失败(org.redisson.RedissonMultiLock#failedLocksLimit),而红锁允许少于半数次的加锁失败(org.redisson.RedissonRedLock#failedLocksLimit)。
  • 使用时,红锁的加锁目标最好包含多个 Redis 实例,从而实现高可用。

    如果一个Redis实例加多次锁,那么这个Redis挂掉了就会导致全部加锁请求都失败了。

红锁执行流程

  1. 获取当前时间戳;
  2. 开始获取锁:Client按顺序从每台Redis实例上获取锁;
    注意每台服务器都有一个获取的截止时间,超过一段时间获取不到就放弃,而且这个截止时间要比总的获取锁的TTL时间要短很多,避免由于等待部分已停机的Redis实例时间过长而导致获取锁失败了。
    比如总TTL为5s,那么每台Redis实例的获取时间就可以定为1s。
    因为是顺序获取的,所以每台实例上锁的过期时间也是不一样的。
  3. 怎么样算获取成功:过半数,且未超时
    • 过半数:比如总共有5个Redis实例的情况下,需要有至少3个实例成功获取到锁才算获取成功;
    • 未超时:(总TTL) - (每台服务器获取锁花费的时间之和)需要大于0,如果获取成功,锁的真正有效时间就是这个时间差。
  4. 获取失败释放锁
    不满足获取成功条件的情况下,把之前获取过锁的Redis实例都给释放掉。

锁续期 - 看门狗

看门狗原理,下图来自于这里
红锁-看门狗
加锁时启动定时任务刷新锁的过期时间:
org.redisson.RedissonLock#tryAcquireOnceAsync
-> org.redisson.RedissonLock#scheduleExpirationRenewal
释放锁时关掉该定时任务:
org.redisson.RedissonLock#unlock
-> org.redisson.RedissonLock#cancelExpirationRenewal

可重入性

可重入性原理,下图来自于这里
红锁-可重入性

红锁存在的问题

  1. 一般Redis集群都是多主多从,但是使用多主多从的情况下,锁是加到主服务器上的,而主从复制是异步完成的,如果在客户端获取到锁之后,主复制锁到从的过程中崩溃了,导致没有复制到从Redis中,那么之后即使再选举出一个从升级为主,主服务器里也是没有锁的,并且能够成功被获取到锁,导致互斥失效。
    所以,使用红锁时Redis集群一般都是单节点,而不是主从的。
  2. 5主无从的情况下,如果一个客户端获取到锁之后,所有Redis重启,这时其他客户端又可以获取到锁了,显然违背了锁的互斥原则;如果Redis实例开启了AOF持久化存储,在持久化间隔时间内断电,照样会导致数据丢失。

    显然AOF不能开启Always(每个命令都同步到硬盘),这样会造成性能急剧下降。

读写锁(ReadWriteLock)

TODO

信号量(Semaphore)

TODO

可过期性信号量(PermitExpirableSemaphore)

TODO

闭锁(CountDownLatch)

TODO

异常情况分析

  • 因为主从同步导致锁被重复获取
    Redis集群如果采用Cluster集群或Master-Slave主从复制的方式,就会存在key刚写完Master、但是在同步到Slave之前Master挂掉的情况,这时如果发生主从切换,就有可能会出现多个线程同时持有锁的情况。
  • 因为GC导致锁被重复获取
    如果出现GC停顿时间过长,或者其他情况导致客户端和Redis连接断开,也有可能出现多个线程同时持有一个锁的情况。

参考

  1. Redlock(redis分布式锁)原理分析