发号器

发号器几乎是最简单的一个中间件了,它旨在生成一个全局唯一ID,用于在业务领域内标识一个对象。

发号器方案

单纯的生成全局 ID 并不是什么难题,但是生成的 ID 通常要满足高并发、分布式等要求:

  1. 全局唯一;
  2. 不要太长,最好 64bit。使用 long 比较好操作,如果是 96bit,那就要各种移位相当的不方便,还有可能有些组件不能支持这么大的 ID;
  3. 独立,业务不需要冠心生成规则;
  4. 安全,至少不能暴露生成规律;
  5. 递增性,比如以时间为序,或者 ID 里包含时间,这样一是可以少一个索引,二是冷热数据容易分离;
  6. 可以控制 ShardingId。比如某一个用户的文章要放在同一个分片内,这样查询效率高,修改也容易。
  7. 不能有单点问题。

简单解决方案

UUID、MySQL自增、Redis-incr都是老生常谈了,Redis方案还需要在数据库里多维护一张号段表来增加可用性,这些方案都相对简单,适合在一些小场景中使用,不适合作为一个发号器的实现核心。

Flicker

因为 MySQL 本身支持 auto_increment 操作,很自然地,我们会想到借助这个特性来实现这个功能。
Flicker 在解决全局 ID 生成方案里就采用了 MySQL 自增长 ID 的机制(auto_increment + replace into + MyISAM)。一个生成 64 位 ID 方案具体就是这样的:

  1. 创建单独的数据库 (eg:ticket),然后创建一个表:
    1
    2
    3
    4
    5
    6
    CREATE TABLE Tickets64 (
    id bigint(20) unsigned NOT NULL auto_increment,
    stub char(1) NOT NULL default '',
    PRIMARY KEY (id),
    UNIQUE KEY stub (stub)
    ) ENGINE=MyISAM
  2. 当我们插入记录后,执行SELECT * from Tickets64,查询结果就是这样的:
    1
    2
    3
    4
    5
    +-------------------+------+
    | id | stub |
    +-------------------+------+
    | 72157623227190423 | a |
    +-------------------+------+
    在我们的应用端需要做下面这两个操作,在一个事务会话里提交:
    1
    2
    REPLACE INTO Tickets64 (stub) VALUES ('a');
    SELECT LAST_INSERT_ID();
    这样我们就能拿到不断增长且不重复的 ID 了。
  3. 我们只是在单台数据库上生成 ID,从高可用角度考虑,接下来就要解决单点故障问题:Flicker 启用了两台数据库服务器来生成 ID,通过区分 auto_increment 的起始值和步长来生成奇偶数的 ID。
    1
    2
    3
    4
    5
    6
    7
    TicketServer1:
    auto-increment-increment = 2
    auto-increment-offset = 1

    TicketServer2:
    auto-increment-increment = 2
    auto-increment-offset = 2
  4. 在客户端只需要通过轮询方式取 ID。

优点:充分借助数据库的自增 ID 机制,提供高可靠性,生成的 ID 有序。
缺点:占用两个独立的 MySQL 实例,有些浪费资源,成本较高。

Twitter - Snowflake

背景:Twitter 在把存储系统从 MySQL 迁移到 Cassandra 的过程中由于 Cassandra 没有顺序 ID 生成机制,于是自己开发了一套全局唯一 ID 生成服务:Snowflake。

  • 41 位的时间序列(精确到毫秒,41 位的长度可以使用 69 年)10 位的机器标识(10 位的长度最多支持部署 1024 个节点)
  • 12 位的计数顺序号(12 位的计数顺序号支持每个节点每毫秒产生 4096 个 ID 序号) 最高位是符号位,始终为 0。
    维度1:时间:用前面 41 bit 来表示时间,精确到毫秒,可以表示69年的数据
    维度2:机器 ID:用10 bit 来表示,也就是说可以部署 1024 台机器
    维度3:序列数:用 12 bit 来表示,意味着每台机器,每毫秒最多可以生成 4096个 ID

优点:高性能,低延迟;独立的应用;按时间有序。充分把信息保存到 ID 里。
缺点:需要独立的开发和部署,结构略复杂。

附:官方的代码,参考下

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
package com.github.arrayedu;


import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.Random;

/**
* Snowflake idworker ,from twitter snowflake project .
*/
public class IdWorker {

protected static final Logger LOGGER = LoggerFactory.getLogger(IdWorker.class);
private long workerId;
private long datacenterId;
private long sequence = 0L;

private long twepoch = 1451577600000L;

private long workerIdBits = 5L;
private long datacenterIdBits = 5L;
private long maxWorkerId = ~(-1L << workerIdBits);
private long maxDatacenterId = ~(-1L << datacenterIdBits);
private long sequenceBits = 12L;

private long workerIdShift = sequenceBits;
private long datacenterIdShift = sequenceBits + workerIdBits;
private long timestampLeftShift = sequenceBits + workerIdBits + datacenterIdBits;
private long sequenceMask = ~(-1L << sequenceBits);

private long lastTimestamp = -1L;

private Random random = new Random();

/**
*
*/
public IdWorker() {
this.workerId = -1;
this.datacenterId = -1;
}

/**
* @param workerId 10 bits workerId,from 0 to 1023
*/
public IdWorker(long workerId) {
this((workerId & ~(-1 << 5)), workerId >> 5);
}

/**
* @param workerId 5 bits worker id ,from 0 to 31
* @param datacenterId 5 bits data center id ,from 0 to 31
*/
public IdWorker(long workerId, long datacenterId) {
// sanity check for workerId
if (workerId > maxWorkerId || workerId < 0) {
throw new IllegalArgumentException(String.format("worker Id can't be greater than %d or less than 0", maxWorkerId));
}
if (datacenterId > maxDatacenterId || datacenterId < 0) {
throw new IllegalArgumentException(String.format("datacenter Id can't be greater than %d or less than 0", maxDatacenterId));
}
this.workerId = workerId;
this.datacenterId = datacenterId;
LOGGER.info(String.format("worker starting. timestamp left shift %d, datacenter id bits %d, worker id bits %d, sequence bits %d, workerid %d", timestampLeftShift, datacenterIdBits, workerIdBits, sequenceBits, workerId));
}

public synchronized long nextId() {
long timestamp = timeGen();

if (timestamp < lastTimestamp) {
LOGGER.error(String.format("clock is moving backwards. Rejecting requests until %d.", lastTimestamp));
throw new RuntimeException(String.format("Clock moved backwards. Refusing to generate id for %d milliseconds", lastTimestamp - timestamp));
}

if (lastTimestamp == timestamp) {
sequence = (sequence + 1) & sequenceMask;
if (sequence == 0) {
timestamp = tilNextMillis(lastTimestamp);
}
} else {
// sequence = 0L;//this is twitter version
sequence = random.nextInt(10);// optimize the random number of 0~9 to prevent the id distribution is not balanced
}

lastTimestamp = timestamp;

long identity;
if (datacenterId != -1 && workerId != -1) {
identity = (datacenterId << datacenterIdShift) | (workerId << workerIdShift);
} else {
//get id from InstanceIdManager
identity = InstanceIdProxy.getInstanceId();
}

return ((timestamp - twepoch) << timestampLeftShift) | identity | sequence;
}
/**
* 保证返回的毫秒数在参数lastTimestamp之后
*/
private long tilNextMillis(long lastTimestamp) {
long timestamp = timeGen();
while (timestamp <= lastTimestamp) {
timestamp = timeGen();
}
return timestamp;
}
/**
* 系统当前毫秒数
*/
private long timeGen() {
return System.currentTimeMillis();
}
}

Instagram

Instagram 参考了 Flickr 的方案,再结合 Twitter 的经验,利用 Postgres 数据库的特性,实现了一个更简单可靠的 ID 生成服务。

  • 使用 41 bit 来存放时间,精确到毫秒,可以使用41年。
  • 使用 13 bit 来存放逻辑分片 ID。
  • 使用 10 bit 来存放自增长 ID,意味着每台机器,每毫秒最多可以生成 1024 个 ID

以 Instagram 举的例子为说明:
假定时间是 September 9th, 2011, at 5:00pm,则毫秒数是 1387263000(直接使用系统得到的从 1970 年开始的毫秒数)。

  1. 把时间数据放到 ID 里:
    id = 1387263000 <<(64-41)
  2. 分片 ID 放到ID里,假定用户 ID 是 61672,有 1298 个逻辑分片,则分片 ID 是 61672 % 1298-> 666
    id |= 666 <<(64-41-13)
  3. 把自增序列放 ID 里,假定前一个序列是 6000,则新的序列是 6001:
    id |= (6001 % 1024)
    这样就得到了一个全局的分片 ID。

优势:充分把信息保存到 ID 里。 充分利用数据库自身的机制,程序完全不用额外处理,直接插入到对应的分片的表即可。

ShardingJDBC中的发号器

RocketMQ中的消息ID生成

消息发送前,给消息设置一个唯一ID:MessageClientIDSetter#setUniqID

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
public static void setUniqID(final Message msg) {
if (msg.getProperty(MessageConst.PROPERTY_UNIQ_CLIENT_MESSAGE_ID_KEYIDX) == null) {
msg.putProperty(MessageConst.PROPERTY_UNIQ_CLIENT_MESSAGE_ID_KEYIDX, createUniqID());
}
}

public static String createUniqID() {
StringBuilder sb = new StringBuilder(LEN * 2);
sb.append(FIX_STRING);
sb.append(UtilAll.bytes2string(createUniqIDBuffer()));
return sb.toString();
}

private static byte[] createUniqIDBuffer() {
ByteBuffer buffer = ByteBuffer.allocate(4 + 2);
long current = System.currentTimeMillis();
if (current >= nextStartTime) {
setStartTime(current);
}
// 4个字节的时间戳
buffer.putInt((int) (System.currentTimeMillis() - startTime));
// 2个字节的计数器
buffer.putShort((short) COUNTER.getAndIncrement());
return buffer.array();
}