概述
序列化(Serialization):事件A必须在事件B之前发生。
互斥(Mutual exclusion):事件A和B不能同时发生。
同步方式
使用消息同步
线程A:
线程B:
B会等待A发来消息后再执行后续的指令。
共享变量
1、并发写
线程A
线程B
这两个线程并发执行,最后打印出来的结果不确定是5还是7。
2、并发更新
线程A:
线程B:
两个线程的操作都是读后写,可能就会发生同时读出旧值然后都+1,最终结果并没有+2的情况。
3、通过发消息互斥执行
通过发消息保证共享变量的安全更新。
信号量
支持PV操作,P原子地减少信号量值,当值为0时阻塞,V原子地增加信号量值。
信号量的优点:
- 信号量的约定使得代码更不容易出错;
- 信号量在很多系统都有实现,使用信号量是可移植的。
基础同步模式
发信号(Signaling)
一个线程发消息给另一个线程告知某件事情的发生。
线程A:
1 2
| statement a1 sem.signal()
|
线程B:
只有A signal发出消息后,B才能从wait离开继续执行。
在Java中,发信号的功能可以通过Object的wait/notify、Lock的Condition、LockSupport、Semaphore实现。
Rendezvous
不知道怎么翻译,叫做汇聚?作者给出的是类似下面这样的例子:
线程A:
1 2 3 4
| statement a1 aArrived.signal() bArrived.wait() statement a2
|
线程B:
1 2 3 4
| statement b1 bArrived.signal() aArrived.wait() statement b2
|
注意signal和wait不要写反了,写反了会死锁。
在Java中,可以通过CyclicBarrier
实现。
互斥量(Mutex)
使用信号量可以实现互斥量,实际上互斥量可以看作Semaphore(1)
,使用以下代码就可以实现两个线程的互斥执行:
线程A:
1 2 3
| mutex.wait() count = count + 1 mutex.signal()
|
线程B:
1 2 3
| mutex.wait() count = count + 1 mutex.signal()
|
上边mutex包围的代码就称为临界区代码(critical section)。
多路复用(Multiplex)
将上边的互斥量泛化,我们让多个线程可以同时执行一块临界区代码。
其实就是用Semaphore(n)
就可以实现n个线程同时执行了。
栅栏(Barrier)
只有所有线程都到达某个位置才能一块继续执行下去,栅栏可以通过以下代码实现(有bug,会出现死锁):
1 2 3
| count = 0 mutex = Semaphore(1) barrier = Semaphore(0)
|
1 2 3 4 5 6 7 8 9 10 11
| rendezvous // 汇聚
mutex.wait() count = count + 1 mutex.signal()
if count == n barrier.signal()
barrier.wait() 其他代码
|
如果直接拿来执行,容易发现只有1个线程能执行下去,因为:
- 假设有n=5,即5个线程并发执行;
- 前4个线程到了
barrier.wait
后barrier的值变为-4;
- 第5个线程
barrier.signal
释放了1个,barrier的值变为-3,此时只有一个线程被放过去了,另外还有3个线程仍阻塞,且第5个线程随后也会进入阻塞状态。
修改后的代码如下:
1 2 3 4 5 6 7 8 9 10 11 12
| rendezvous // 汇聚
mutex.wait() count = count + 1 mutex.signal()
if count == n barrier.signal()
barrier.wait() barrier.sign 其他代码
|
并发问题解题模型
分析问题时:
- 寻找角色,每个角色对应一个独立线程;
- 寻找共享资源,每个共享资源对应一个信号量(或其他并发控制类);
- 按场景描述进行模拟;
交错打印
两个线程交替打印数组的功能,虽然比较简单,但是面试时问的还蛮多的,如果用Semaphore实现会比较简单,用Java的wait/notify或Condition实现则会稍微麻烦一点。
Semaphore实现交错打印
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35
| public class OneByOneTest {
static int count = 0; static Semaphore first = new Semaphore(1); static Semaphore second = new Semaphore(0);
public static void main(String[] args) { Thread t1 = new Thread(() -> { while(true) { try { first.acquire(); Thread.sleep(10); System.out.println("线程1: " + count++); second.release(); }catch (Exception e) { e.printStackTrace(); } } }); Thread t2 = new Thread(() -> { while(true) { try { second.acquire(); Thread.sleep(10); System.out.println("线程2: " + count++); first.release(); }catch (Exception e) { e.printStackTrace(); } } }); t1.start(); t2.start(); } }
|
Condition
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47
| public class OneByOneConditionTest {
private static int counter = 0; private static Lock lock = new ReentrantLock(); private static Condition first = lock.newCondition(); private static Condition second = lock.newCondition(); private static int currentPrinter = 0;
public static void main(String[] args) { Thread t1 = new Thread(() -> { while(true) { try { lock.lock(); while(currentPrinter == 1) { first.await(); } Thread.sleep(100); System.out.println("线程1: " + counter++); second.signal(); currentPrinter = 1; lock.unlock(); } catch (Exception e) { e.printStackTrace(); } } }); Thread t2 = new Thread(() -> { while(true) { try { lock.lock(); while(currentPrinter == 0) { second.await(); } Thread.sleep(100); System.out.println("线程2: " + counter++); first.signal(); currentPrinter = 0; lock.unlock(); } catch (Exception e) { e.printStackTrace(); } } }); t1.start(); t2.start(); } }
|
wait/notify
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63
| public class WaitNotiftTest {
private static int count = 1; private static Object lock = new Object(); private static int cur = 1;
static class A implements Runnable {
@Override public void run() { while (true) { try { synchronized (lock) { if (count > 100) { return; } while (cur != 1) { lock.wait(); } System.out.println("第一个线程输出: " + count++); Thread.sleep(100); cur = 2; lock.notifyAll(); } } catch (InterruptedException e) { e.printStackTrace(); } } } }
static class B implements Runnable {
@Override public void run() { while (true) { try { synchronized (lock) { if (count > 100) { return; } while (cur != 2) { lock.wait(); } System.out.println("第二个线程输出: " + count++); Thread.sleep(100); cur = 1; lock.notifyAll(); } } catch (InterruptedException e) { e.printStackTrace(); } } } }
public static void main(String[] args) { Thread a = new Thread(new A()); Thread b = new Thread(new B()); a.start(); b.start(); } }
|
Producer-Consumer
解决生产者/消费者问题需要维护一个队列,生产者向队列添加,消费者从队列获取,同步问题出现在队列为空或满的情况,因此我们需要对队列进行同步化。
为了简化问题,可以使用 juc 引入的 BlockingQueue(阻塞队列),这种数据结构能在下面两种情况下阻塞当前线程
- 当队列为空时,调用 take 或 poll
- 当队列满时,调用 put 或 offer
使用 Semaphore 实现Producer/Consumer代码
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55
| public class ProducerConsumerTest {
static Semaphore mutex = new Semaphore(1); static Semaphore items = new Semaphore(0); static Semaphore spaces = new Semaphore(10);
static int resource = 0;
static class Producer implements Runnable {
@Override public void run() { int count = 5; while (count-- > 0) { try { spaces.acquire(); mutex.acquire(); resource++; System.out.println("Producer添加一个,现在resource=" + resource); mutex.release(); items.release(); } catch (InterruptedException e) { e.printStackTrace(); } } } }
static class Consumer implements Runnable {
@Override public void run() { int count = 5; while (count-- > 0) { try { items.acquire(); mutex.acquire(); resource--; System.out.println("Consumer消费一个,现在resource=" + resource); mutex.release(); spaces.release(); } catch (InterruptedException e) { e.printStackTrace(); } } } }
public static void main(String[] args) { Thread a = new Thread(new Producer()); Thread b = new Thread(new Consumer()); a.start(); b.start(); } }
|
使用 BlockingQueue 实现生产者/消费者代码
下面是使用 BlockingQueue 实现的生产者/消费者代码
注意要使用put/take这组方法,而不是offer/poll,因为后者会在满/空时直接返回(而非阻塞等待)。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57
| public class Producer implements Runnable { private final BlockingQueue<String> queue;
public Producer(BlockingQueue<String> queue) { this.queue = queue; }
@Override public void run() { for(int i = 0; i < 10; i++) { try { queue.put("Course" + (i + 1)); System.out.println("Complete production:Course" + (i + 1)); Thread.sleep(100); } catch (InterruptedException e) { e.printStackTrace(); } } } }
public class Consumer implements Runnable { private final BlockingQueue<String> queue;
public Consumer(BlockingQueue<String> queue) { this.queue = queue; }
@Override public void run() { for(int i = 0; i < 10; i++) { try { String course = queue.take(); System.out.println("Complete consumption:" + course); Thread.sleep(100); } catch (InterruptedException e) { e.printStackTrace(); } } } }
public class Main { public static void main(String[] args) { BlockingQueue<String> queue = new ArrayBlockingQueue<String>(2); Thread t1 = new Thread(new Producer(queue)); Thread t2 = new Thread(new Consumer(queue)); t1.start(); t2.start(); try { t1.join(); t2.join(); } catch (InterruptedException e) { e.printStackTrace(); } } }
|
Readers-writers
读写问题中有两类线程:
- 读线程:多个读线程可以同时在临界区;
- 写线程:多个写线程之间互斥,与读线程也互斥。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61
| public class ReaderWriterTest {
// 当前有多少读线程正在读 static int readers = 0; // 保护readers计数器 static Semaphore mutex = new Semaphore(1); // 0表示有线程正在临界区,1表示没有 static Semaphore rootEmpty = new Semaphore(1);
static Runnable reader = () -> { try { mutex.acquire(); readers++; // 第一个来的需要等锁释放,其他reader会在mutex那里等着 if(readers == 1) { rootEmpty.acquire(); } mutex.release(); System.out.println("读取开始, readers:" + readers); Thread.sleep(1000); System.out.println("读取结束, readers:" + readers); mutex.acquire(); readers--; if(readers == 0) { rootEmpty.release(); } mutex.release(); } catch (InterruptedException e) { e.printStackTrace(); } };
static Runnable writer = () -> { try { rootEmpty.acquire(); System.out.println("写入开始"); Thread.sleep(3000); System.out.println("写入结束"); rootEmpty.release(); } catch (InterruptedException e) { e.printStackTrace(); } };
public static void main(String[] args) { Thread reader1 = new Thread(reader); Thread reader2 = new Thread(reader); Thread reader3 = new Thread(reader); Thread reader4 = new Thread(reader); Thread reader5 = new Thread(reader); Thread reader6 = new Thread(reader); Thread writer1 = new Thread(writer); reader1.start(); reader2.start(); reader3.start(); reader4.start(); reader5.start(); reader6.start(); writer1.start(); } }
|
上面的代码有个问题,就是写线程可能被饿死,因为第一个读线程通过rootEmpty.acquire
进来后,后续的读线程都不必再等待,可以直接进入临界区,而同时执行的写线程就永远都等在rootEmpty.acquire
上了。
改成如下的方式:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67
| public class ReaderWriterTest {
// 当前有多少读线程正在读 static int readers = 0; // 保护readers计数器 static Semaphore mutex = new Semaphore(1); // 0表示有线程正在临界区,1表示没有 static Semaphore rootEmpty = new Semaphore(1); // 控制reader和writer获取锁 static Semaphore turnstile = new Semaphore(1);
static Runnable reader = () -> { try { turnstile.acquire(); turnstile.release(); mutex.acquire(); readers++; // 第一个来的需要等锁释放,其他reader会在mutex那里等着 if(readers == 1) { rootEmpty.acquire(); } mutex.release(); System.out.println("读取开始, readers:" + readers); Thread.sleep(1000); System.out.println("读取结束, readers:" + readers); mutex.acquire(); readers--; if(readers == 0) { rootEmpty.release(); } mutex.release(); } catch (InterruptedException e) { e.printStackTrace(); } };
static Runnable writer = () -> { try { turnstile.acquire(); rootEmpty.acquire(); System.out.println("写入开始"); Thread.sleep(3000); System.out.println("写入结束"); turnstile.release(); rootEmpty.release(); } catch (InterruptedException e) { e.printStackTrace(); } };
public static void main(String[] args) { Thread reader1 = new Thread(reader); Thread reader2 = new Thread(reader); Thread reader3 = new Thread(reader); Thread reader4 = new Thread(reader); Thread reader5 = new Thread(reader); Thread reader6 = new Thread(reader); Thread writer1 = new Thread(writer); reader1.start(); reader2.start(); reader3.start(); reader4.start(); reader5.start(); reader6.start(); writer1.start(); } }
|
Dining philosophers(哲学家就餐)
我们先来看下最开始最直观的一种错误解法,这种解法会导致死锁:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56
| public class DiningPhilosophersTest {
private static final int count = 2;
private Semaphore[] forks = new Semaphore[count];
{ for (int i = 0; i < forks.length; i++) { forks[i] = new Semaphore(1); } }
private int left(int i) { return i; }
private int right(int i) { return (i + 1) % forks.length; }
private void getForks(int i) throws InterruptedException { forks[left(i)].acquire(); forks[right(i)].acquire(); }
private void putForks(int i) { forks[left(i)].release(); forks[right(i)].release(); }
public void doDining() { Thread[] threads = new Thread[count]; for (int i = 0; i < threads.length; i++) { int finalI = i; threads[i] = new Thread(() -> { while (true) { try { getForks(finalI); System.out.println(finalI + " 开始就餐"); Thread.sleep(10); System.out.println(finalI + " 结束就餐"); putForks(finalI); } catch (Exception e) { e.printStackTrace(); } } }); threads[i].start(); } }
public static void main(String[] args) { DiningPhilosophersTest test = new DiningPhilosophersTest(); test.doDining(); } }
|
显然,n位哲学家刚开始都没有处于就餐状态,如果他们同时拿起左边的叉子,然后尝试取右边的叉子,就会直接导致死锁。
减少同时获取叉子的哲学家数量
注意,上面发生死锁的必要条件是“n位哲学家同时就餐”,如果n位无法同时就餐,那这个问题也就迎刃而解了,所以我们额外引入一个footman
信号量,它的数量控制在n - 1
:
1 2 3 4 5 6 7 8 9 10 11 12 13
| private Semaphore footman = new Semaphore(count - 1);
private void getForks(int i) throws InterruptedException { footman.acquire(); forks[left(i)].acquire(); forks[right(i)].acquire(); }
private void putForks(int i) { forks[left(i)].release(); forks[right(i)].release(); footman.release(); }
|
同时存在左撇子(先拿左手边叉子的)和右撇子(先拿右手边叉子的)
另外一种解决办法是让一个哲学家先获取右边的叉子再获取左边的叉子,这样其实解除了环路等待条件:假设有5个哲学家,其中4个哲学家拿到左手的叉子后,第五个哲学家会尝试取第一个叉子,也就是第一个哲学家左手的叉子,他们两个不满足死锁的条件。
这种思路的代码比较简单,就先忽略了。
Tanenbaum的解
这是一种相对比较极端的解,每个哲学家都需要等两边的人不在就餐的情况下才能就餐,否则他什么都不做:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51
| private static final int count = 2;
private Semaphore mutex = new Semaphore(1);
/** * state为0表示正在思考(thinking),1表示准备就餐(hungry),2表示正在就餐(eating) */ private int[] states = new int[count];
private Semaphore[] forks = new Semaphore[count];
{ for (int i = 0; i < forks.length; i++) { forks[i] = new Semaphore(0); states[i] = 0; } }
private int left(int i) { return i; }
private int right(int i) { return (i + 1) % forks.length; }
private void test(int i) { // 如果自己准备就餐且两边的人都不在就餐,则自己可以就餐 if(states[i] == 1 && states[left(i)] != 2 && states[right(i)] != 2) { states[i] = 2; forks[i].release(); } }
private void getForks(int i) throws InterruptedException { mutex.acquire(); states[i] = 1; test(i); mutex.release(); forks[i].acquire(); }
private void putForks(int i) throws InterruptedException { mutex.acquire(); states[i] = 0; test(right(i)); test(left(i)); mutex.release(); }
|
这种解存在的主要问题是会发生饥饿,比如2个哲学家的情况下,可能1号会一直处于就餐状态,2号一直处于循环检测的状态,于是就发生了饥饿。
Cigarette smokers
The dining savages(野人就餐)
The barbershop(理发师问题)
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97
| #define N 10 //最多10个顾客 typedef struct queue{ int nums[N]; int front; int rear; }queue; int isFull(queue *q){ return q->front + 1 == q->rear; } int isEmpty(queue *q){ return q->front == q->rear; } //返回顾客标志 int dequeue(queue *q){ if(isEmpty(q)){ puts("error: empty!"); return 0; } else{ int num = q->nums[q->rear]; q->rear = (q->rear + 1) % N;
return num; } } void enqueue(queue *q, int num){ if(isFull(q)){ puts("error: full queue!"); return ; } else{ q->nums[q->front] = num; q->front = (q->front + 1) % N; q->front++; } }
//顾客每次等待,若队列已满将会被忽略 void customerWait(semaphore *barber, int num){ if(barber->busy == no){ barber->busy = yes; enqueue(&barber->customers, num); printf("这个顾客开始接受服务"); } else{ //将当前顾客加入等待队列,似乎不是原子操作? enqueue(&barber->customers, num); printf("加入顾客%d", num); } } //理发师每次等待新用户, void barberSignal(semaphore *barber){ if(isEmpty(&barber->customers)){ barber->busy = no; } else{ int num = dequeue(&barber->customers); printf("顾客%d开始交易", num); sleep(3000);//每个顾客睡三秒 printf("结束交易"); } }
/* 用一个主函数开启一个理发师进程,理发师进程等待新顾客,对每一个顾客sleep(3000)作为服务时间,然后signal, 主函数等待用户输入用户id,对每一个用户id开启一个进程, */
semaphore barber; void simulate(){ barber.busy = no; barber.customers.front = barber.customers.rear = 0; int pid = fork(); if(pid == 0){ //说明是理发师 while(1){ barberSignal(&barber); } } else{ int num; while(1){ scanf("%d", &num); printf("%d", num); //int pid1 = fork(); //if(pid1 != 0){ //顾客进程 customerWait(&barber, num); // printf("哈哈哈"); // return ; //} //父进程继续运行 } } }
|
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96
| #include <stdio.h> #include <pthread.h> #include <semaphore.h>
#define N 10 //最多10个顾客 /***********************信号量**************************/ sem_t barbers; sem_t customers; sem_t mutex; int customerCount = 0;
void haircut(){ printf("理发师剪头中\n"); sleep(3);//服务时间 } void get_haircut(){ sleep(3); } void *barber(void *arg){ while(1){ if(customerCount == 0){ printf("理发师打瞌睡\n"); sem_wait(&customers);//等顾客 printf("理发师被叫醒了\n"); } else{ sem_wait(&customers); }
sem_post(&barbers);//唤醒理发师 haircut();//开始服务 } } void *customer(void *arg){ while(1){ if(customerCount > 0){ printf("顾客等理发师\n"); sem_wait(&barbers);//等理发师 sem_wait(&mutex); customerCount--; sem_post(&mutex); get_haircut();//接受服务 printf("理完头这个顾客离开了\n"); } } } void *customer_arrive(void *arg){ int num; while(1){ scanf("%d", &num); printf("来了一个顾客\n"); sem_wait(&mutex); if(customerCount < N){//如果还有空位 sem_post(&mutex); sem_post(&customers);//添加一个顾客资源 customerCount++;//顾客增加 } else{ sem_post(&mutex); printf("没椅子了,顾客离开了\n"); } } }
void simulate(){ pthread_t barber_t, customer_t, customer_arrive_t; if(sem_init(&barbers, 0, 1) != 0){ printf("sem init failed"); } if(sem_init(&customers, 0, 0) != 0){ printf("sem init failed"); } if(sem_init(&mutex, 0, 1) != 0){ printf("sem init failed"); }
printf("begin barber_t\n"); pthread_create(&barber_t, NULL, barber, NULL); printf("begin customer_t\n"); pthread_create(&customer_t, NULL, customer, NULL); printf("begin customer_arrive_t\n"); pthread_create(&customer_arrive_t, NULL, customer_arrive, NULL); while(1){} }
int main(void ){ simulate();
return 0; }
|
卖票问题
一个火车站有多个窗口,它们同时卖票,而票数使用一个 ticket 变量进行计算,对票数有查询和修改两个操作,这两个操作不能同时进行,并且写操作可能不是原子的,两个写操作也不能同时进行
- 使用 Atom 类型来保存票数,这样写之间就不需要进行同步了
参考
- The Little Book of Semaphores