public class Example {
    // Class (static) method
    public static int classMethod(int i, long l, float f, double d, Object o, byte b) {
        return 0;
    }

    // Instance method
    public int instanceMethod(char c, double d, short s, boolean b) {
        return 0;
    }
}
Class (static) methods and instance methods have slightly different stack frame layouts, and the figure shows the difference between them:
a class method's frame contains no implicit this reference, whereas an instance method's frame does;
Also note:
byte, char, short and boolean values are converted to int when they are stored into the local variable area, and are only converted back to their original types when written back to the heap or the method area;
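To make the figure's point concrete, here is a rough annotation of my own (not actual javap output) of how the local-variable slots of Example's two methods would be laid out; note the implicit this in slot 0 of the instance method, the two slots taken by each long and double, and the byte/short/char/boolean parameters simply occupying int-sized slots:

// Sketch of the local variable tables of Example (can be checked with: javap -v Example)
//
// static classMethod(int i, long l, float f, double d, Object o, byte b)  -- no `this`
//   slot 0: i         slots 1-2: l       slot 3: f
//   slots 4-5: d      slot 6: o          slot 7: b   (byte, held as an int)
//
// instanceMethod(char c, double d, short s, boolean b)
//   slot 0: this      slot 1: c (as int)
//   slots 2-3: d      slot 4: s (as int) slot 5: b (boolean, as int)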
When a method finishes executing it has to return to the place where it was called, so each stack frame must store a method return address. Exiting a method is essentially popping the current frame off the stack, so the work done on exit may include: restoring the caller's local variable table and operand stack, pushing the return value (if any) onto the operand stack of the caller's frame, and adjusting the PC register so that it points to the instruction following the method-invocation instruction.
Each stack frame also holds a reference into the run-time constant pool for the method the frame belongs to; this reference exists to support dynamic linking during method invocation. The constant pool of a class file contains a large number of symbolic references, and the method-invocation instructions in the bytecode take symbolic references to methods in the constant pool as operands. Some of these symbolic references are converted into direct references during class loading or on first use; this conversion is called static resolution. The rest are converted into direct references during each run, and this part is called dynamic linking.
public class GCLogTest {
    public static void main(String[] args) {
        int _1m = 1024 * 1024;
        byte[] data = new byte[_1m];
        // Set data to null so the array becomes eligible for collection
        data = null;
        System.gc();
    }
}
Run it in the IDE with the VM option -XX:+PrintGCDetails, and you get:
[GC (System.gc()) [PSYoungGen: 5051K->776K(38400K)] 5051K->784K(125952K), 0.0014035 secs] [Times: user=0.01 sys=0.00, real=0.00 secs]
[Full GC (System.gc()) Disconnected from the target VM, address: '127.0.0.1:55472', transport: 'socket'
[PSYoungGen: 776K->0K(38400K)] [ParOldGen: 8K->684K(87552K)] 784K->684K(125952K), [Metaspace: 2980K->2980K(1056768K)], 0.0040080 secs] [Times: user=0.01 sys=0.00, real=0.00 secs]
Heap
 PSYoungGen      total 38400K, used 333K [0x0000000795580000, 0x0000000798000000, 0x00000007c0000000)
  eden space 33280K, 1% used [0x0000000795580000,0x00000007955d34a8,0x0000000797600000)
  from space 5120K, 0% used [0x0000000797600000,0x0000000797600000,0x0000000797b00000)
  to   space 5120K, 0% used [0x0000000797b00000,0x0000000797b00000,0x0000000798000000)
 ParOldGen       total 87552K, used 684K [0x0000000740000000, 0x0000000745580000, 0x0000000795580000)
  object space 87552K, 0% used [0x0000000740000000,0x00000007400ab0d0,0x0000000745580000)
 Metaspace       used 2988K, capacity 4568K, committed 4864K, reserved 1056768K
  class space    used 318K, capacity 392K, committed 512K, reserved 1048576K
java.lang.OutOfMemoryError: Java heap space means the Java heap has run out of room. When the application asks for more memory and the heap can no longer satisfy the request, this error is thrown.
/* VM Args: -Xms20m -Xmx20m -XX:+HeapDumpOnOutOfMemoryError */
import java.util.ArrayList;
import java.util.List;

public class HeapOOM {
    static class OOMObject {}

    public static void main(String[] args) {
        List<OOMObject> list = new ArrayList<>();
        while (true) {
            list.add(new OOMObject());
        }
    }
}
java.lang.OutOfMemoryError: PermGen space means the permanent generation (the method area) is full. The permanent generation holds class bytecode and the constant pool; bytecode of loaded classes is stored there, separate from the heap that stores object instances. Most JVM implementations do not garbage-collect the permanent generation, so loading too many classes will trigger this error. Ordinary applications rarely hit it, but a web server may contain large numbers of JSPs: each JSP is compiled at run time into a Java servlet class and loaded into the method area, so a web project with many JSPs can produce this error.
/* VM Args: -XX:PermSize=10M -XX:MaxPermSize=10M */
import java.util.ArrayList;
import java.util.List;

public class RuntimeConstantPoolOOM {
    public static void main(String[] args) {
        // Hold references to the interned strings in a List so a Full GC cannot reclaim the constant pool entries
        List<String> list = new ArrayList<String>();
        for (int i = 0; ; i++) {
            list.add(String.valueOf(i).intern());
        }
    }
}
Use intern to test whether the run-time constant pool lives in the "permanent generation" or in the "metaspace":
String str1 = new StringBuilder("计算机").append("软件").toString();
System.out.println(str1.intern() == str1); // true on JDK 7+: the string was not in the pool yet, so intern() records this heap reference
String str2 = new StringBuilder("ja").append("va").toString();
System.out.println(str2.intern() == str2); // false: "java" was already interned during JVM startup
"New I/O worker #168" #299 daemon prio=5 os_prio=0 tid=0x00007f3a75127000 nid=0x5cf runnable [0x00007f37278f7000]
   java.lang.Thread.State: RUNNABLE
    at sun.nio.ch.EPollArrayWrapper.epollWait(Native Method)
    at sun.nio.ch.EPollArrayWrapper.poll(EPollArrayWrapper.java:269)
    at sun.nio.ch.EPollSelectorImpl.doSelect(EPollSelectorImpl.java:79)
    at sun.nio.ch.SelectorImpl.lockAndDoSelect(SelectorImpl.java:86)
    - locked <0x00000000e4669320> (a sun.nio.ch.Util$2)
    - locked <0x00000000e4669310> (a java.util.Collections$UnmodifiableSet)
    - locked <0x00000000e46691e8> (a sun.nio.ch.EPollSelectorImpl)
    at sun.nio.ch.SelectorImpl.select(SelectorImpl.java:97)
    at org.jboss.netty.channel.socket.nio.SelectorUtil.select(SelectorUtil.java:68)
    at org.jboss.netty.channel.socket.nio.AbstractNioSelector.select(AbstractNioSelector.java:434)
    at org.jboss.netty.channel.socket.nio.AbstractNioSelector.run(AbstractNioSelector.java:212)
    at org.jboss.netty.channel.socket.nio.AbstractNioWorker.run(AbstractNioWorker.java:89)
    at org.jboss.netty.channel.socket.nio.NioWorker.run(NioWorker.java:178)
    at org.jboss.netty.util.ThreadRenamingRunnable.run(ThreadRenamingRunnable.java:108)
    at org.jboss.netty.util.internal.DeadLockProofWorker$1.run(DeadLockProofWorker.java:42)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:745)
class SuperClass {
    public static int value = 123;

    static {
        System.out.println("Super!");
    }
}

class SubClass extends SuperClass {
    static {
        System.out.println("Sub!");
    }

    public static void main(String[] args) {
        // Referring to a field defined in the superclass through the subclass only initializes SuperClass,
        // so only "Super!" is printed
        System.out.println(SubClass.value);
    }
}
class ConstClass {
    static {
        System.out.println("ConstClass!");
    }

    public static final String HELLOWORLD = "hello world";
}

public class NotInitialization {
    public static void main(String[] args) {
        // HELLOWORLD is a compile-time constant, so ConstClass is never initialized
        System.out.println(ConstClass.HELLOWORLD);
    }
}
The main purpose of this phase is to ensure that the information in the class file's byte stream meets the virtual machine's requirements. It performs part of the semantic analysis, mainly to guard against dangerous operations in the bytecode (array access out of bounds, illegal casts, jumps past the end of a method, and so on); later versions of the JVM specification also require verification of the file format, symbolic references, etc. Different virtual machines may implement class verification differently, but roughly all of them perform the following four stages of verification:
File format verification checks that the byte stream conforms to the class file format specification and can be processed by the current version of the virtual machine: for example, whether the magic number is 0xCAFEBABE, whether the major and minor version numbers are within the range this virtual machine can handle, and whether the constant pool contains any unsupported constant types. The goal of this stage is to guarantee that the incoming byte stream can be parsed correctly and stored in the method area; only after passing this stage is the byte stream stored in the method area, so the remaining three verification stages operate on the method-area storage structure rather than on the raw byte stream.
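To make the magic-number and version check concrete, here is a small sketch of my own (ClassFileHeader is a made-up name, and the path of a compiled .class file is passed as the first argument) that reads the header fields the verifier starts with:

import java.io.DataInputStream;
import java.io.FileInputStream;
import java.io.IOException;

public class ClassFileHeader {
    public static void main(String[] args) throws IOException {
        try (DataInputStream in = new DataInputStream(new FileInputStream(args[0]))) {
            int magic = in.readInt();           // should be 0xCAFEBABE
            int minor = in.readUnsignedShort(); // minor version
            int major = in.readUnsignedShort(); // major version, e.g. 52 for Java 8
            System.out.printf("magic=0x%08X, version=%d.%d%n", magic, major, minor);
        }
    }
}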
The preparation phase allocates memory for class variables (static fields) and initializes them to their default values; they are allocated in the method area. It does not allocate memory for instance variables, which are allocated in the Java heap together with the object when it is instantiated. For example, with static int a = 1; the value of a is 0 after preparation and only becomes 1 during initialization; constant fields (static final), however, are assigned their user-specified values directly in the preparation phase.
We know that classes loaded by different class loaders live in different namespaces and are isolated from one another. This isolation refers only to where the classes are stored; it does not mean that a user-defined class A which uses java.util.List will fail. Class A is normally loaded by the system (application) class loader, while java.util.List is loaded by the bootstrap class loader. When class A is being loaded and java.util.List is encountered, the request goes to the system class loader, which under the parent delegation model hands the request up to its parent loaders before trying to load the class itself, so java.util.List ends up being loaded by the bootstrap class loader.
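The delegation itself is easy to picture. Below is a simplified paraphrase of the logic in java.lang.ClassLoader.loadClass, not the exact JDK source (the class name DelegatingClassLoaderSketch is made up); it shows that the parent chain is always consulted before the loader tries its own findClass():

// Simplified paraphrase of java.lang.ClassLoader.loadClass; not the exact JDK source
public abstract class DelegatingClassLoaderSketch extends ClassLoader {
    @Override
    protected Class<?> loadClass(String name, boolean resolve) throws ClassNotFoundException {
        // 1. Has this loader already loaded (and recorded) the class?
        Class<?> c = findLoadedClass(name);
        if (c == null) {
            try {
                // 2. Always delegate to the parent first; in the real JDK a null parent
                //    means the bootstrap loader handles the request, which is how
                //    java.util.List is resolved by the bootstrap class loader
                if (getParent() != null) {
                    c = getParent().loadClass(name);
                }
            } catch (ClassNotFoundException ignored) {
                // the parent chain could not find the class
            }
            if (c == null) {
                // 3. Only after the whole parent chain fails does this loader call findClass() itself
                c = findClass(name);
            }
        }
        if (resolve) {
            resolveClass(c);
        }
        return c;
    }
}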
import org.junit.Test;

class A {
    int a;
}

public class JavaTest extends A {
    int a;

    @Test
    public void test() {
        a = 1;
        // The field in JavaTest hides the one declared in A, so super.a is still 0
        System.out.println(super.a);
    }
}
Why does the code below fail to compile? There is a subtle rule in class initialization: a static initializer block may assign to a class variable declared after it, but it may not read it (an "illegal forward reference").
public class JavaTest {
    static {
        i = 0;                  // the assignment compiles fine
        System.out.println(i);  // compile error: illegal forward reference
    }

    static int i = 1;
}
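Incidentally, the restriction only applies to reads through the simple name; qualifying the read with the class name compiles and prints the default value. A small variation of the example above, of my own making:

public class JavaTest {
    static {
        i = 0;
        // Reading through the qualified name is allowed and prints the default value 0,
        // because the field initializer "i = 1" has not run yet at this point
        System.out.println(JavaTest.i);
    }

    static int i = 1;
}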
Why does it print 'A' twice? When we do new A(), memory is first allocated for the object; at that point the A object already exists, it just has not been initialized yet. Then A's constructor is invoked, and it implicitly invokes the superclass constructor first. Inside the superclass constructor, draw() is called through this, and this actually refers to the A instance being constructed (this is implicit in ordinary method calls too), so the overridden A.draw() runs both times.
class B {
    int a;

    B() {
        this.draw();
    }

    void draw() {
        System.out.println("B");
    }
}

class A extends B {
    int a;

    A() {
        draw();
    }

    @Override
    void draw() {
        System.out.println("A");
    }

    public static void main(String[] args) {
        A a = new A();
    }
}
Why is count2 0 in the final output?
class SingleTon {
    // Static fields are initialized in declaration order:
    // 1) singleTon = new SingleTon() runs the constructor, making count1 = 1 and count2 = 1
    // 2) count1 has no initializer, so it keeps the value 1
    // 3) count2 = 0 then runs and overwrites the value back to 0
    private static SingleTon singleTon = new SingleTon();
    public static int count1;
    public static int count2 = 0;

    private SingleTon() {
        count1++;
        count2++;
    }

    public static SingleTon getInstance() {
        return singleTon;
    }
}

public class Test {
    public static void main(String[] args) {
        SingleTon singleTon = SingleTon.getInstance();
        System.out.println("count1=" + singleTon.count1); // count1=1
        System.out.println("count2=" + singleTon.count2); // count2=0
    }
}
class Father {
    public static void print(String str) {
        System.out.println("father " + str);
    }

    private void show(String str) {
        System.out.println("father " + str);
    }
}

class Son extends Father {
}

public class Test {
    public static void main(String[] args) {
        Son.print("coder");      // invokes Father's print() method
        //Father fa = new Father();
        //fa.show("cooooder");   // the private method cannot be called from here
    }
}
/**
 * Overloaded methods are resolved at compile time; no run-time dispatch is needed
 */
public class StaticDispatch {
    static class Human {
    }

    static class Women extends Human {
    }

    static class Men extends Human {
    }

    public void sayHello(Human human) {
        System.out.println("say human");
    }

    public void sayHello(Women women) {
        System.out.println("say women");
    }

    public void sayHello(Men men) {
        System.out.println("say men");
    }

    public static void main(String[] args) {
        StaticDispatch ds = new StaticDispatch();
        Human women = new Women();
        Human men = new Men();
        // The compiler picks the overload taking Human, based on the static type of the arguments
        ds.sayHello(women); // say human
        ds.sayHello(men);   // say human
    }
}
public class DynamicDispatch {
    abstract static class Human {
        abstract public void sayHello();
    }

    static class Women extends Human {
        @Override
        public void sayHello() {
            System.out.println("say women");
        }
    }

    static class Men extends Human {
        @Override
        public void sayHello() {
            System.out.println("say men");
        }
    }

    public static void main(String[] args) {
        Human women = new Women();
        Human men = new Men();
        women.sayHello(); // actual type is Women, prints "say women"
        men.sayHello();   // actual type is Men, prints "say men"
    }
}
public class Dispatch {
    static class QQ {}

    static class _360 {}

    public static class Father {
        public void hardChoice(QQ arg) {
            System.out.println("father choose qq");
        }

        public void hardChoice(_360 arg) {
            System.out.println("father choose 360");
        }
    }

    public static class Son extends Father {
        public void hardChoice(QQ arg) {
            System.out.println("son choose qq");
        }

        public void hardChoice(_360 arg) {
            System.out.println("son choose 360");
        }
    }

    public static void main(String[] args) {
        Father father = new Father();
        Father son = new Son();
        // Overload selection is static (by the argument's static type);
        // which class's method runs is dynamic (by the receiver's actual type)
        father.hardChoice(new _360()); // father choose 360
        son.hardChoice(new QQ());      // son choose qq
    }
}
public class DynamicDispatchTest {

    public void print(Father father) {
        System.out.println(father);
    }

    public void print(Child child) {
        System.out.println(child);
    }

    public static class Father {
        @Override
        public String toString() {
            return "Father";
        }
    }

    public static class Child extends Father {
        @Override
        public String toString() {
            return "Child";
        }
    }

    public static void main(String[] args) {
        Father father = new Child();
        DynamicDispatchTest dynamicDispatchTest = new DynamicDispatchTest();
        // The overload print(Father) is chosen at compile time from the static type,
        // but toString() is dispatched on the actual type, so this prints "Child"
        dynamicDispatchTest.print(father);
    }
}
18:29:28.186 [main] INFO c.t.l.r.RedissonTest - 调用成功, 当前 succeed:15, failed:0
18:29:33.475 [main] INFO c.t.l.r.RedissonTest - 调用失败, 当前 succeed:15, failed:1
18:29:38.750 [main] INFO c.t.l.r.RedissonTest - 调用失败, 当前 succeed:15, failed:2
18:29:39.253 [main] INFO c.t.l.r.RedissonTest - 调用成功, 当前 succeed:16, failed:2
18:29:39.756 [main] INFO c.t.l.r.RedissonTest - 调用成功, 当前 succeed:17, failed:2
18:29:40.259 [main] INFO c.t.l.r.RedissonTest - 调用成功, 当前 succeed:18, failed:2
18:29:40.763 [main] INFO c.t.l.r.RedissonTest - 调用成功, 当前 succeed:19, failed:2
18:29:45.271 [redisson-netty-2-4] INFO o.r.c.MasterSlaveEntry - master 127.0.0.1/127.0.0.1:7004 used as slave
18:29:45.274 [redisson-netty-2-14] INFO o.r.c.p.PubSubConnectionPool - 1 connections initialized for 127.0.0.1/127.0.0.1:7004
18:29:45.280 [redisson-netty-2-4] WARN o.r.c.ClusterConnectionManager - slave: redis://127.0.0.1:7001 has down for slot ranges: [[0-5460]]
18:29:45.285 [redisson-netty-2-28] INFO o.r.c.p.SlaveConnectionPool - 24 connections initialized for 127.0.0.1/127.0.0.1:7004
After 7001 is restarted, the logs show it rejoining the cluster:
18:34:18.539 [redisson-netty-2-29] INFO o.r.c.p.PubSubConnectionPool - 1 connections initialized for 127.0.0.1/127.0.0.1:7001
18:34:18.542 [redisson-netty-2-4] INFO o.r.c.MasterSlaveEntry - master 127.0.0.1/127.0.0.1:7004 excluded from slaves
18:34:18.542 [redisson-netty-2-4] INFO o.r.c.ClusterConnectionManager - slave: redis://127.0.0.1:7001 has up for slot ranges: [[0-5460]]
18:34:18.544 [redisson-netty-2-6] INFO o.r.c.p.SlaveConnectionPool - 24 connections initialized for 127.0.0.1/127.0.0.1:7001
    /* If the key already exists in the dict ignore it. */
    // c->bpop.keys is a set (a dict whose values are NULL)
    // recording every key the client is blocking on.
    // The statement below adds the key to the set only if it is not already there.
    if (dictAdd(c->bpop.keys,keys[j],NULL) != DICT_OK) continue;

    incrRefCount(keys[j]);

    /* And in the other "side", to map keys -> clients */
    // The keys of the c->db->blocking_keys dict are the keys clients are blocked on,
    // and each value is a list of all the clients blocked on that key.
    // The code below associates the blocking key with the blocked client.
    de = dictFind(c->db->blocking_keys,keys[j]);
    if (de == NULL) {
        // The list does not exist yet: create it and attach it to the dict
        int retval;

        /* For every key we take a list of clients blocked for it */
        l = listCreate();
        retval = dictAdd(c->db->blocking_keys,keys[j],l);
        incrRefCount(keys[j]);
        redisAssertWithInfo(c,keys[j],retval == DICT_OK);
    } else {
        l = dictGetVal(de);
    }
    // Append the client to the list of blocked clients
    listAddNodeTail(l,c);
}
blockClient(c,REDIS_BLOCKED_LIST);
}
// Walk the entire server.ready_keys list
while(listLength(server.ready_keys) != 0) {
    list *l;

    /* Point server.ready_keys to a fresh list and save the current one
     * locally. This way as we run the old list we are free to call
     * signalListAsReady() that may push new elements in server.ready_keys
     * when handling clients blocked into BRPOPLPUSH. */
    // Save the old ready_keys and give the server a fresh, empty one
    l = server.ready_keys;
    server.ready_keys = listCreate();

    // ... (iterate over l; each node ln holds a readyList *rl with the db and the ready key) ...

        /* First of all remove this key from db->ready_keys so that
         * we can safely call signalListAsReady() against this key. */
        // Remove the ready key from ready_keys
        dictDelete(rl->db->ready_keys,rl->key);

        /* If the key exists and it's a list, serve blocked clients
         * with data. */
        // Look up the key object; it should be non-NULL and a list
        robj *o = lookupKeyWrite(rl->db,rl->key);
        if (o != NULL && o->type == REDIS_LIST) {
            dictEntry *de;

            /* We serve clients in the same order they blocked for
             * this key, from the first blocked to the last. */
            // Fetch all the clients blocked on this key
            de = dictFind(rl->db->blocking_keys,rl->key);
            if (de) {
                list *clients = dictGetVal(de);
                int numclients = listLength(clients);

                // ... (for each blocked client `receiver`, pop a value from the list) ...

                    // There is still an element to pop (value is non-NULL)
                    if (value) {
                        /* Protect receiver->bpop.target, that will be
                         * freed by the next unblockClient()
                         * call. */
                        if (dstkey) incrRefCount(dstkey);

                        // Clear the client's blocked state
                        unblockClient(receiver);

                        // Push the value to the key that receiver was blocked on
                        if (serveClientBlockedOnList(receiver,
                            rl->key,dstkey,rl->db,value,
                            where) == REDIS_ERR)
                        {
                            /* If we failed serving the client we need
                             * to also undo the POP operation. */
                            listTypePush(o,value,where);
                        }

                        if (dstkey) decrRefCount(dstkey);
                        decrRefCount(value);
                    } else {
                        // If we get here, at least one client is still blocked on the key
                        // and has to wait for the next PUSH against it
                        break;
                    }
                }
            }
            // If the list is now empty, delete it from the database
            if (listTypeLength(o) == 0) dbDelete(rl->db,rl->key);
            /* We don't call signalModifiedKey() as it was already called
             * when an element was pushed on the list. */
        }

        /* Free this item. */
        decrRefCount(rl->key);
        zfree(rl);
        listDelNode(l,ln);
    }
    listRelease(l); /* We have the new list on place at this point. */
}
}
Bitmaps
Use cases
The bitmap data structure is similar to a Set in Java: it takes very little space, but it cannot prevent collisions (when values are hashed into bit positions), so it suits data that is fairly scattered and scenarios that do not require perfect accuracy.
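One common use case is tracking daily active users with one bit per user id. The sketch below assumes the Jedis client and a local Redis instance; the key name and the ids are made up for illustration:

import redis.clients.jedis.Jedis;

public class BitmapExample {
    public static void main(String[] args) {
        try (Jedis jedis = new Jedis("127.0.0.1", 6379)) {
            String key = "dau:2020-01-01";              // hypothetical key: daily active users

            jedis.setbit(key, 1001L, true);             // mark user 1001 as active
            jedis.setbit(key, 2002L, true);             // mark user 2002 as active

            boolean active = jedis.getbit(key, 1001L);  // true
            long activeUsers = jedis.bitcount(key);     // 2

            System.out.println("user 1001 active: " + active + ", total active: " + activeUsers);
        }
    }
}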
// Read 4 bits starting at bit 0, interpreted as an unsigned number (u)
bitfield s get u4 0
// Read 3 bits starting at bit 2, interpreted as a signed number (i)
bitfield s get i3 2
// Several sub-commands can be executed in one call
bitfield s get u4 0 get u3 2 get i4 0 get i3 2
If the value of s is 'c', whose binary form is 0110 0011, then reading the first 4 bits gives 6 (0110).
// +1 on the 2-bit unsigned field at offset 1: the result is 0 because it overflows (default overflow is wrap)
bitfield s incrby u2 1 1
// With overflow fail the operation is not executed and nil is returned
bitfield s overflow fail incrby u2 1 1
// With overflow sat the value saturates at the maximum, so the result is 3
bitfield s overflow sat incrby u2 1 1
Whether it is a desktop application or a web application, multithreaded code is hard to get right, and getting it wrong produces a pile of hair-raising, elusive, hard-to-debug problems; in practice, once you realize you are dealing with a concurrency bug, you may have to give up on the debugger entirely and fall back to inspecting the code by hand. Given that, we naturally want to avoid concurrency problems as much as possible, ideally avoiding multithreading bugs altogether. There is no one-size-fits-all approach, but here are some practical considerations for debugging and preventing multithreading bugs:
Data consistency. Originally, system A called the interfaces of systems B and C directly; if B or C failed, it threw an exception back to A, so A knew about the failure and could roll back. With an MQ in between, A considers the job done as soon as the message is sent. But if C then fails while writing to its database, A still believes C succeeded, and the data becomes inconsistent.
import com.lmax.disruptor.RingBuffer;

import java.nio.ByteBuffer;

public class LongEventProducer {
    private final RingBuffer<LongEvent> ringBuffer;

    public LongEventProducer(RingBuffer<LongEvent> ringBuffer) {
        this.ringBuffer = ringBuffer;
    }

    public void onData(ByteBuffer bb) {
        // Grab the next sequence
        long sequence = ringBuffer.next();
        try {
            // Get the entry in the Disruptor for the sequence
            LongEvent event = ringBuffer.get(sequence);
            // Fill the entry with data
            event.set(bb.getLong(0));
        } finally {
            ringBuffer.publish(sequence);
        }
    }
}
import com.lmax.disruptor.RingBuffer;
import com.lmax.disruptor.dsl.Disruptor;

import java.nio.ByteBuffer;
import java.util.concurrent.Executor;
import java.util.concurrent.Executors;

// Class name follows the official Disruptor getting-started example; the opening declaration was missing here
public class LongEventMain {
    public static void main(String[] args) {
        // Executor that will be used to construct new threads for consumers
        Executor executor = Executors.newCachedThreadPool();

        // The factory for the event
        LongEventFactory factory = new LongEventFactory();

        // Specify the size of the ring buffer, must be power of 2.
        int bufferSize = 1024;

        // Construct the Disruptor
        Disruptor<LongEvent> disruptor = new Disruptor<>(factory, bufferSize, executor);

        // Connect the handler
        disruptor.handleEventsWith(new LongEventHandler());

        // Start the Disruptor, starts all threads running
        disruptor.start();

        // Get the ring buffer from the Disruptor to be used for publishing.
        RingBuffer<LongEvent> ringBuffer = disruptor.getRingBuffer();

        LongEventProducer producer = new LongEventProducer(ringBuffer);

        ByteBuffer bb = ByteBuffer.allocate(8);
        for (long l = 0; true; l++) {
            bb.putLong(0, l);
            producer.onData(bb);
        }
    }
}
// Check whether the run id sent by the slave matches this server's run id;
// PSYNC is only considered when they match
if (strcasecmp(master_runid, server.runid)) {
    /* Run id "?" is used by slaves that want to force a full resync. */
    // The run id supplied by the slave does not match this server's run id
    if (master_runid[0] != '?') {
        redisLog(REDIS_NOTICE,"Partial resynchronization not accepted: "
            "Runid mismatch (Client asked for runid '%s', my runid is '%s')",
            master_runid, server.runid);
    // A run id of '?' means the slave is forcing a FULL RESYNC
    } else {
        redisLog(REDIS_NOTICE,"Full resync requested by slave.");
    }
    // A full resync is needed
    goto need_full_resync;
}

// Check whether the offset the slave brings can still be found in the master's backlog;
// if not, fall back to a full resync
if (getLongLongFromObjectOrReply(c,c->argv[2],&psync_offset,NULL) != REDIS_OK)
    goto need_full_resync;

// If there is no backlog,
if (!server.repl_backlog ||
    // or psync_offset is smaller than server.repl_backlog_off
    // (the data the slave wants has already been overwritten),
    psync_offset < server.repl_backlog_off ||
    // or psync_offset is larger than the offset of the data kept in the backlog,
    psync_offset > (server.repl_backlog_off + server.repl_backlog_histlen))
{
    // then perform a FULL RESYNC
    redisLog(REDIS_NOTICE,
        "Unable to partial resync with the slave for lack of backlog (Slave request was: %lld).",
        psync_offset);
    if (psync_offset > server.master_repl_offset) {
        redisLog(REDIS_WARNING,
            "Warning: slave tried to PSYNC with an offset that is greater than the master replication offset.");
    }
    goto need_full_resync;
}
void syncCommand(redisClient *c) {
    ...
    if (!strcasecmp(c->argv[0]->ptr,"psync")) {
        // Try a partial resynchronization (PSYNC)
        if (masterTryPartialResynchronization(c) == REDIS_OK) {
            // PSYNC can be performed
            server.stat_sync_partial_ok++;
            return; /* No full resync needed, return. */
        } else {
            // PSYNC cannot be performed
            char *master_runid = c->argv[1]->ptr;

            /* Increment stats for failed PSYNCs, but only if the
             * runid is not "?", as this is used by slaves to force a full
             * resync on purpose when they are not able to partially
             * resync. */
            if (master_runid[0] != '?') server.stat_sync_partial_err++;
        }
    }
    ...
}
int masterTryPartialResynchronization(redisClient *c) {
    ...

    /* If we reached this point, we are able to perform a partial resync:
     *
     * 1) Set client state to make it a slave.
     * 2) Inform the client we can continue with +CONTINUE.
     * 3) Send the backlog data (from the offset to the end) to the slave. */
    c->flags |= REDIS_SLAVE;
    c->replstate = REDIS_REPL_ONLINE;
    c->repl_ack_time = server.unixtime;
    listAddNodeTail(server.slaves,c);

    /* We can't use the connection buffers since they are used to accumulate
     * new commands at this stage. But we are sure the socket send buffer is
     * empty so this write will never fail actually. */
    // Send +CONTINUE to the slave, signalling that the PSYNC can go ahead
    buflen = snprintf(buf,sizeof(buf),"+CONTINUE\r\n");
    if (write(c->fd,buf,buflen) != buflen) {
        freeClientAsync(c);
        return REDIS_OK;
    }

    // Send the contents of the backlog (i.e. the data the slave is missing) to the slave
    psync_len = addReplyReplicationBacklog(c,psync_offset);
    redisLog(REDIS_NOTICE,
        "Partial resynchronization request accepted. Sending %lld bytes of backlog starting from offset %lld.",
        psync_len, psync_offset);
    ...
}
void processInputBufferAndReplicate(client *c) {
    // Process the commands, then propagate them
    // If the client is not our master link, just process the commands
    if (!(c->flags & CLIENT_MASTER)) {
        processInputBuffer(c);
    } else {
        /* If the client is a master we need to compute the difference
         * between the applied offset before and after processing the buffer,
         * to understand how much of the replication stream was actually
         * applied to the master state: this quantity, and its corresponding
         * part of the replication stream, will be propagated to the
         * sub-replicas and to the replication backlog. */
        size_t prev_offset = c->reploff;
        processInputBuffer(c);
        // "applied" is how much of the replication stream was actually applied to the master state
        size_t applied = c->reploff - prev_offset;
        if (applied) {
/* Here we need to check if there is a background saving operation
 * in progress, or if it is required to start one */
// Check whether a BGSAVE is already running
if (server.rdb_child_pid != -1) {
    /* Ok a background save is in progress. Let's check if it is a good
     * one for replication, i.e. if there is another slave that is
     * registering differences since the server forked to save */
    redisClient *slave;
    listNode *ln;
    listIter li;
    if (ln) {
        /* Perfect, the server is already registering differences for
         * another slave. Set the right state, and copy the buffer. */
        // Lucky case: the RDB produced by the current BGSAVE can be reused
        copyClientOutputBuffer(c,slave);
        // Set the replication state
        c->replstate = REDIS_REPL_WAIT_BGSAVE_END;
        redisLog(REDIS_NOTICE,"Waiting for end of BGSAVE for SYNC");
    } else {
        /* No way, we need to wait for the next BGSAVE in order to
         * register differences */
        // Unlucky case: we have to wait for the next BGSAVE
        c->replstate = REDIS_REPL_WAIT_BGSAVE_START;
        redisLog(REDIS_NOTICE,"Waiting for next BGSAVE for SYNC");
    }
} else {
    /* Ok we don't have a BGSAVE in progress, let's start one */
    // No BGSAVE in progress: start a new one
    redisLog(REDIS_NOTICE,"Starting BGSAVE for SYNC");
    if (rdbSaveBackground(server.rdb_filename) != REDIS_OK) {
        redisLog(REDIS_NOTICE,"Replication failed, can't BGSAVE");
        addReplyError(c,"Unable to perform background save");
        return;
    }
    // Set the state
    c->replstate = REDIS_REPL_WAIT_BGSAVE_END;

    /* Flush the script cache for the new slave. */
    // A new slave is attaching, so flush the replication script cache
    replicationScriptCacheFlush();
}
...
# It is possible for a master to stop accepting writes if there are less than
# N slaves connected, having a lag less or equal than M seconds.
#
# The N slaves need to be in "online" state.
#
# The lag in seconds, that must be <= the specified value, is calculated from
# the last ping received from the slave, that is usually sent every second.
#
# This option does not GUARANTEES that N replicas will accept the write, but
# will limit the window of exposure for lost writes in case not enough slaves
# are available, to the specified number of seconds.
#
# For example to require at least 3 slaves with a lag <= 10 seconds use:
#
# min-slaves-to-write 3
# min-slaves-max-lag 10
#
# Setting one or the other to 0 disables the feature.
#
# By default min-slaves-to-write is set to 0 (feature disabled) and
# min-slaves-max-lag is set to 10.
// Create the epoll instance
state->epfd = epoll_create(1024); /* 1024 is just a hint for the kernel */
if (state->epfd == -1) {
    zfree(state->events);
    zfree(state);
    return -1;
}
// 1. Check whether any time events have reached their scheduled time
// 2. If so, run them
/* Process every pending time event, then every pending file event
 * (that may be registered by time event callbacks just processed).
 *
 * Without special flags the function sleeps until some file event
 * fires, or when the next time event occurs (if any).
 *
 * If flags is 0, the function does nothing and returns.
 *
 * if flags has AE_ALL_EVENTS set, all the kind of events are processed.
 *
 * if flags has AE_FILE_EVENTS set, file events are processed.
 *
 * if flags has AE_TIME_EVENTS set, time events are processed.
 *
 * if flags has AE_DONT_WAIT set the function returns ASAP until all
 * the events that's possible to process without to wait are processed.
 *
 * The function returns the number of events processed. */
int aeProcessEvents(aeEventLoop *eventLoop, int flags)
{
    int processed = 0, numevents;
    /* Nothing to do? return ASAP */
    if (!(flags & AE_TIME_EVENTS) && !(flags & AE_FILE_EVENTS)) return 0;
    /* Note that we want call select() even if there are no
     * file events to process as long as we want to process time
     * events, in order to sleep until the next time event is ready
     * to fire. */
    if (eventLoop->maxfd != -1 ||
        ((flags & AE_TIME_EVENTS) && !(flags & AE_DONT_WAIT))) {
        int j;
        aeTimeEvent *shortest = NULL;
        struct timeval tv, *tvp;
        // Find the nearest time event
        if (flags & AE_TIME_EVENTS && !(flags & AE_DONT_WAIT))
            shortest = aeSearchNearestTimer(eventLoop);
        if (shortest) {
            // A time event exists, so the gap between now and its scheduled time
            // decides how long the file-event poll may block
            long now_sec, now_ms;
            /* Calculate the time missing for the nearest
             * timer to fire. */
            // Compute how long until the nearest time event fires
            // and store that interval in the tv struct
            aeGetTime(&now_sec, &now_ms);
            tvp = &tv;
            tvp->tv_sec = shortest->when_sec - now_sec;
            if (shortest->when_ms < now_ms) {
                tvp->tv_usec = ((shortest->when_ms+1000) - now_ms)*1000;
                tvp->tv_sec --;
            } else {
                tvp->tv_usec = (shortest->when_ms - now_ms)*1000;
            }
            /* If we have to check for events but need to return
             * ASAP because of AE_DONT_WAIT we need to set the timeout
             * to zero */
            if (flags & AE_DONT_WAIT) {
                // Make the file-event poll non-blocking
                tv.tv_sec = tv.tv_usec = 0;
                tvp = &tv;
            } else {
                /* Otherwise we can block */
                // The file-event poll may block until an event arrives
                tvp = NULL; /* wait forever */
            }
        }
            int mask = eventLoop->fired[j].mask;
            int fd = eventLoop->fired[j].fd;
            int rfired = 0;
            /* note the fe->mask & mask & ... code: maybe an already processed
             * event removed an element that fired and we still didn't
             * processed, so we check if the event is still valid. */
            // Readable event
            if (fe->mask & mask & AE_READABLE) {
                // rfired records that the read handler ran, so the same handler
                // is not invoked a second time for the write event
                rfired = 1;
                fe->rfileProc(eventLoop,fd,fe->clientData,mask);
            }
            // Writable event
            if (fe->mask & mask & AE_WRITABLE) {
                if (!rfired || fe->wfileProc != fe->rfileProc)
                    fe->wfileProc(eventLoop,fd,fe->clientData,mask);
            }
            processed++;
        }
    }
    /* Check time events */
    // Process time events
    if (flags & AE_TIME_EVENTS)
        processed += processTimeEvents(eventLoop);
    return processed; /* return the number of processed file/time events */
}