How Disruptor Works

Advantages of Disruptor over the conventional (ordinary) approach

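  1. Lock-free
    Compared with a Lock, this is more efficient: threads are never suspended, and each update is a single compare-and-swap (CAS) on memory, which is fast. The trade-off is that CAS introduces the ABA problem, and under heavy multi-threaded contention failed attempts cause threads to spin.
  2. Every accessor records its own sequence number, which lets multiple producers and multiple consumers share the same data structure.
  3. Each object (ring buffer, claim strategy, producers, and consumers) tracks its own sequence number; combined with the "magic" of cache line padding, this means there is no false sharing and no unexpected contention.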
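
The lock-free claim described in point 1 can be sketched with nothing but the JDK. This is a minimal illustration, not Disruptor's own code: the class `CasSequenceClaim` and its methods are hypothetical names made up for this example. Several threads claim sequence numbers through a CAS retry loop; a failed CAS simply retries, which is the spinning mentioned above. Because the cursor here only ever increases, it never returns to an earlier value, so the ABA problem does not show up in this particular sketch.

```java
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical toy class for illustration; not part of the Disruptor library.
final class CasSequenceClaim {
    // Last sequence handed out; -1 means nothing has been claimed yet.
    private final AtomicLong cursor = new AtomicLong(-1);

    // Claim the next sequence without taking a lock.
    long claimNext() {
        while (true) {
            long current = cursor.get();
            long next = current + 1;
            // Succeeds only if no other thread moved the cursor in between.
            if (cursor.compareAndSet(current, next)) {
                return next;
            }
            // The CAS lost the race: loop and try again (this is the busy spin).
        }
    }

    long lastClaimed() {
        return cursor.get();
    }

    // Starts `threads` producers, each claiming `perThread` sequences,
    // and returns the set of distinct sequences that were claimed.
    static Set<Long> runProducers(CasSequenceClaim claim, int threads, int perThread) {
        Set<Long> seen = ConcurrentHashMap.newKeySet();
        Thread[] workers = new Thread[threads];
        for (int i = 0; i < threads; i++) {
            workers[i] = new Thread(() -> {
                for (int j = 0; j < perThread; j++) {
                    seen.add(claim.claimNext());
                }
            });
            workers[i].start();
        }
        for (Thread worker : workers) {
            try {
                worker.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException("interrupted while waiting", e);
            }
        }
        return seen;
    }

    public static void main(String[] args) {
        CasSequenceClaim claim = new CasSequenceClaim();
        Set<Long> seen = runProducers(claim, 4, 10_000);
        System.out.println("distinct sequences claimed: " + seen.size());
        System.out.println("last claimed sequence: " + claim.lastClaimed());
    }
}
```

If every claim is unique, the number of distinct sequences equals threads × claims per thread, and no thread was ever parked on a lock to get there.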

How to Use Disruptor

// The event object that is preallocated in each ring buffer slot.
public class LongEvent {

    private long value;

    public void set(long value) {
        this.value = value;
    }

    public long getValue() {
        return this.value;
    }
}
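import com.lmax.disruptor.EventFactory;

// Factory the Disruptor uses to fill the ring buffer with events up front.
public class LongEventFactory implements EventFactory<LongEvent> {

    @Override
    public LongEvent newInstance() {
        return new LongEvent();
    }
}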
import com.lmax.disruptor.EventHandler;

// Consumer: called for every event that has been published to the ring buffer.
public class LongEventHandler implements EventHandler<LongEvent> {

    @Override
    public void onEvent(LongEvent event, long sequence, boolean endOfBatch) throws Exception {
        System.out.println("Event: " + event.getValue() + " sequence:" + sequence + " endOfBatch:" + endOfBatch);
    }
}
import com.lmax.disruptor.RingBuffer;
import java.nio.ByteBuffer;

public class LongEventProducer {

    private final RingBuffer<LongEvent> ringBuffer;

    public LongEventProducer(RingBuffer<LongEvent> ringBuffer) {
        this.ringBuffer = ringBuffer;
    }

    public void onData(ByteBuffer bb) {
        // Grab the next sequence
        long sequence = ringBuffer.next();
        try {
            // Get the entry in the Disruptor for the sequence
            LongEvent event = ringBuffer.get(sequence);
            // Fill with data
            event.set(bb.getLong(0));
        } finally {
            // Always publish, even on failure, so the claimed slot is released
            ringBuffer.publish(sequence);
        }
    }
}
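
The producer above follows a two-phase pattern: claim a sequence, fill the slot, then publish. The following JDK-only sketch shows one way such a claim/publish handshake can work. It is a toy under stated assumptions, not the LMAX `RingBuffer`: the class `ToyRing` and its methods are hypothetical, it stores plain `long` values, and it supports exactly one producer thread and one consumer thread.

```java
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical toy ring for illustration; one producer and one consumer only.
final class ToyRing {
    private final long[] slots;
    private final int mask;
    private long claimed = -1;                               // touched only by the producer
    private final AtomicLong published = new AtomicLong(-1); // highest sequence visible to the consumer
    private final AtomicLong consumed = new AtomicLong(-1);  // highest sequence the consumer has finished

    ToyRing(int size) {
        if (Integer.bitCount(size) != 1) {
            throw new IllegalArgumentException("size must be a power of 2");
        }
        slots = new long[size];
        mask = size - 1;
    }

    // Phase 1: claim the next sequence, waiting while the ring is full.
    long next() {
        long sequence = claimed + 1;
        while (sequence - consumed.get() > slots.length) {
            Thread.yield(); // the slot still holds an unconsumed value
        }
        claimed = sequence;
        return sequence;
    }

    // Write into the claimed slot; not yet visible to the consumer.
    void set(long sequence, long value) {
        slots[(int) (sequence & mask)] = value;
    }

    // Phase 2: publish, which makes the slot visible to the consumer.
    void publish(long sequence) {
        published.set(sequence);
    }

    // Consumer side: wait until the sequence is published, then read it.
    long take(long sequence) {
        while (published.get() < sequence) {
            Thread.yield();
        }
        long value = slots[(int) (sequence & mask)];
        consumed.set(sequence); // frees the slot for reuse by the producer
        return value;
    }

    // Sends the values 0..count-1 through a ring of the given size and
    // returns the sum of everything the consumer received.
    static long transferSum(int size, int count) {
        ToyRing ring = new ToyRing(size);
        Thread producer = new Thread(() -> {
            for (int i = 0; i < count; i++) {
                long sequence = ring.next();
                try {
                    ring.set(sequence, i);
                } finally {
                    ring.publish(sequence);
                }
            }
        });
        producer.start();
        long sum = 0;
        for (long sequence = 0; sequence < count; sequence++) {
            sum += ring.take(sequence);
        }
        try {
            producer.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return sum;
    }

    public static void main(String[] args) {
        System.out.println("sum of consumed values: " + transferSum(8, 1000));
    }
}
```

The `try`/`finally` around the slot write mirrors `LongEventProducer`: in this sketch the consumer waits on the published sequence, so a claimed sequence that was never published would stall it indefinitely.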
import com.lmax.disruptor.RingBuffer;
import com.lmax.disruptor.dsl.Disruptor;
import java.nio.ByteBuffer;
import java.util.concurrent.Executor;
import java.util.concurrent.Executors;

public class LongEventMain {

    public static void main(String[] args) {
        // Executor that will be used to construct new threads for consumers
        Executor executor = Executors.newCachedThreadPool();

        // The factory for the event
        LongEventFactory factory = new LongEventFactory();

        // Specify the size of the ring buffer, must be power of 2.
        int bufferSize = 1024;

        // Construct the Disruptor (this Executor-based constructor is the
        // Disruptor 3.x form; later releases favor a ThreadFactory)
        Disruptor<LongEvent> disruptor = new Disruptor<>(factory, bufferSize, executor);

        // Connect the handler
        disruptor.handleEventsWith(new LongEventHandler());

        // Start the Disruptor, starts all threads running
        disruptor.start();

        // Get the ring buffer from the Disruptor to be used for publishing.
        RingBuffer<LongEvent> ringBuffer = disruptor.getRingBuffer();

        LongEventProducer producer = new LongEventProducer(ringBuffer);

        // Publish an increasing counter forever; the handler prints each value.
        ByteBuffer bb = ByteBuffer.allocate(8);
        for (long l = 0; true; l++) {
            bb.putLong(0, l);
            producer.onData(bb);
        }
    }
}
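
The comment in `main` says the buffer size must be a power of 2. A common reason for that constraint in ring buffers, and as far as I know the one that applies here, is that an ever-growing sequence can then be mapped to a slot with a single bitwise AND instead of a modulo. The sketch below shows the arithmetic; `IndexMask` and `indexFor` are hypothetical names for this example only.

```java
// Hypothetical helper for illustration; not part of the Disruptor library.
final class IndexMask {

    // Maps a sequence to a slot index, assuming bufferSize is a power of 2.
    static int indexFor(long sequence, int bufferSize) {
        if (bufferSize <= 0 || (bufferSize & (bufferSize - 1)) != 0) {
            throw new IllegalArgumentException("bufferSize must be a power of 2");
        }
        // For a power of 2, (bufferSize - 1) is all ones in the low bits,
        // so the AND keeps exactly the bits that (sequence % bufferSize) would.
        return (int) (sequence & (bufferSize - 1));
    }

    public static void main(String[] args) {
        int bufferSize = 1024;
        // Sequences around the wrap point land on slots 1022, 1023, 0, 1, 2.
        for (long sequence = 1022; sequence <= 1026; sequence++) {
            System.out.println("sequence " + sequence + " -> slot " + indexFor(sequence, bufferSize));
        }
    }
}
```

With a size such as 1000 the mask trick no longer matches the modulo, which is why the helper rejects it.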

QA

  1. "并发框架 Disruptor 译文" (Chinese translation of the articles on the Disruptor concurrency framework)