Java并发编程基础(线程间通信)
一、volatile和synchronized关键字
Java支持多个线程同时访问一个对象或者对象的成员变量,由于每一个线程可以拥有这个变量的拷贝,所以程序在执行过程中,一个线程看到的变量并不一定是最新的。
关键字volatile可以用来修饰字段,就是告诉程序任何对该变量的访问均需要从共享内存中获取,而对它的修改必须同步刷新回共享内存,他能保证所有线程对变量访问的可见性。过度使用volatile是不必要的,因为会降低程序执行的效率。
关键字synchronized可以修饰方法或者是同步块的形式来进行使用,他主要确保多个线程在同一时刻,只能有一个线程楚瑜方法或者同步块中,他保证了线程对变量访问的可见性和排他性。示例:
从图中class信息中,对于同步块的实现使用了monitorenter和monitorexit指令,而同步方法则是依靠方法修饰符上的ACC_SYNCHRONIZED来完成的。无论采用哪种方式,其本质是对一个对象的监视器(monitor)进行获取,而这个获取过程是排他的,也就是同一时刻只能有一个线程获取到由synchronized所保护对象的监视器。
任意一个对象都拥有自己的监视器,当这个对象由同步块或者这个对象的同步方法调用时,执行方法的线程必须先获取到该对象的监视器才能进入同步块或者同步方法,而没有获取到监视器的线程将会被阻塞在同步块和同步方法的入口处,进入BLOCKED状态。下图描述对象、对象的监视器、同步队列和执行线程之间的关系:
任意线程对Object(Object由synchronized保护)的访问,首先要获得Object的监视器。如果获取失败,线程进入同步队列,线程状态变为BOLCKED。当访问Object的前驱(获得了锁的线程)释放了锁,则该释放操作唤醒阻塞在同步队列中的线程,使其重新尝试对监视器的获取。
二、等待/通知机制
一个线程修改了一个对象的值,而另一个线程感知到变化,然后就行相关的操作,整个过程开始于一个线程,而最终执行的又是另一个线程。前者是生产者,后者是消费者,这种模式隔离了“做什么”和“怎么做”,在功能层面上实现了解耦,体系结构上具备良好的伸缩性,但是在Java语言中如何实现类似功能呢?
简单的办法就是让消费者线程不断循环检查变量是否符合预期,如下代码:
while(value!=desire){
Thread.sleep(1000)
}
doSomething();
while循环中设置不满足的条件,如果条件满足则退出while循环,从而完成消费者的工作。睡眠一段时间,这样做的目的是防止过快的“无效”尝试,这种方式看似能够实现所需的功能,但是却存在如下问题:
- 难以确保及时性。在睡眠时,基本不消耗处理器资源,但是过久睡眠,就不能及时发现已经变化的条件,也就是及时性难以保证。
- 难以降低开销,如果降低睡眠时间,这样消费者能够更加迅速的发现条件变化,但是却可能消耗更多的处理器资源,造成无端的浪费。
等待\通知的相关方法是任意Java对象都具备的,因为这些方法被定义在所有对象的超类java.lang.Object上:
下面代码中,创建了两个线程 WaitThread和NotifyThread,前者检查flag值是否为false,如果符合要求,进行后续操作,否则在lock上等待,后者在睡眠一段时间后对lock进行通知:
package com.wholesmart.thread4;
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.concurrent.TimeUnit;
public class WaitNotify {
static boolean flag = true;
static Object lock = new Object();
public static void main(String[] args) throws InterruptedException {
Thread waitThread = new Thread(new Wait(), "WaitThread");
waitThread.start();
TimeUnit.SECONDS.sleep(1);
Thread notifyThread = new Thread(new Notify(), "NotifyThread");
notifyThread.start();
}
static class Wait implements Runnable {
@Override
public void run() {
// 加锁,拥有lock的Monitor
synchronized (lock) {
// 当条件不满足时,继续wait,同时释放lock的锁
while (flag) {
try {
System.out.println(Thread.currentThread() + " flag is true. wait@ "
+ new SimpleDateFormat("HH:mm:ss").format(new Date()));
lock.wait();
} catch (InterruptedException e) {
}
}
// 条件满足,完成工作
System.out.println(Thread.currentThread() + " flag is false. running@ "
+ new SimpleDateFormat("HH:mm:ss").format(new Date()));
}
}
}
static class Notify implements Runnable {
@Override
public void run() {
// 加锁,拥有lock的Monitor
synchronized (lock) {
while (flag) {
// 获取lock的锁,然后进行通知,通知时不会释放lock的锁
// 直到当前线程释放了lock后,WaitThread才能从wait方法中返回
System.out.println(Thread.currentThread() + " hold lock. notify@ "
+ new SimpleDateFormat("HH:mm:ss").format(new Date()));
lock.notifyAll();
flag = false;
SleepUtils.second(5);
}
// 再次加锁
synchronized (lock) {
System.out.println(Thread.currentThread() + " hold lock again. sleep@ "
+ new SimpleDateFormat("HH:mm:ss").format(new Date()));
SleepUtils.second(5);
}
}
}
}
}
输出如下(顺序可能会变):
Thread[WaitThread,5,main] flag is true. wait@ 10:27:16
Thread[NotifyThread,5,main] hold lock. notify@ 10:27:17
Thread[NotifyThread,5,main] hold lock again. sleep@ 10:27:22
Thread[WaitThread,5,main] flag is false. running@ 10:27:27
上述示例说明在使用wait()、notifyAll()、notify()时需要注意如下细节:
- 使用wait()、notifyAll()和notify()时需要先对调用对象加锁。
- 调用wait()方法后,线程状态由RUNNING变为WAITING,并将当前线程放置到对象的等待队列。
- notifyAll()、notify()方法调用后,等待线程依旧不会从wait()返回,需要调用notifyAll()、notify()的线程释放锁之后,等待线程才有机会从wait()返回。
- notify()方法将等待队列中的一个等待线程从等待队列中移到同步队列中,而notifyAll()方法则是将等待队列中所有的线程全部移动到同步队列,被移动的线程状态由WAITING变为BLOCKED。
- 从wait()方法返回的前提是获得了调用对象的锁。
在上图中,WaitThread首先获取了对象的锁,然后调用对象的wait()方法,从而放弃了锁并进入了对象的等待队列WaitQueue中,进入等待状态。由于WaitThread释放了对象的锁,NotifyThread随后获取了对象锁,并调用对象的notify()方法,将WaitThread从WaitQueue移到SynchronizedQueue中,此时WaitThread的状态变为阻塞状态。NotifyThread释放了锁之后,WaitThread再次获取到锁并从wait()方法返回继续执行。
三、等待/通知的经典范式
等待方遵循的原则:
- 获取对象的锁
- 如果条件不满足,那么调用对象的wait()方法,被通知后仍要检查条件。
- 条件满足则执行对应的逻辑。
对应伪代码:
synchronized(对象){
while(条件不满足){
对象.wait();
}
对应的处理逻辑
}
通知方遵循如下原则:
- 获取对象的锁。
- 改变条件。
- 通知所有等待在对象上的线程。
对应伪代码:
synchronized(对象){
改变条件
对象.notifyAll();
}
四、管道输入/输出流
管道输入/输出流主要用于线程之间的数据传输,传输媒介为内存。包括4中具体实现:
PipedWriter、PipedReader用于处理字符数据,PipedOutputStream、PipedInputStream用于处理字节数据。示例代码:
package com.wholesmart.thread4;
import java.io.IOException;
import java.io.PipedReader;
import java.io.PipedWriter;
public class Piped {
public static void main(String[] args) throws IOException {
PipedWriter out = new PipedWriter();
PipedReader in = new PipedReader();
// 将输入和输出流连接
out.connect(in);
Thread printThread = new Thread(new Print(in), "PrintThread");
printThread.start();
int receive = 0;
try {
while ((receive = System.in.read()) != -1) {
out.write(receive);
}
} finally {
out.close();
}
}
static class Print implements Runnable {
private PipedReader in;
public Print(PipedReader in) {
this.in = in;
}
@Override
public void run() {
int receive = 0;
try {
while ((receive = in.read()) != -1) {
System.out.print((char) receive);
}
} catch (IOException ex) {
}
}
}
}
运行示例,在控制台输入一串字符,程序会在控制台原样输出:
Hello world
Hello world
注意:对于Piped类型的流,必须要进行绑定,也就是调用connect()方法,如果没有将输入/输出绑定起来,对于该流的访问将会抛出异常。
五、Thread.join()的使用
如果一个线程A执行了thread.join()语句,其含义是:当前线程A等待thread线程终止之后才能从thread.join()返回。线程Thread除了提供join()方法之外,还提供了join(long millis)和join(long millis,int nanos)两个具备超时特性的方法。这两个超时方法表示,如果线程thread在给定的超时时间里没有终止,那么将会从该超时方法中返回。以下示例中,创建10个线程,编号0–9,每一个线程调用前一个线程的join()方法,也就是线程0结束了,线程1才能从join方法中返回,而线程0需要等待main线程结束:
package com.wholesmart.thread4;
import java.util.concurrent.TimeUnit;
public class Join {
public static void main(String[] args) throws InterruptedException {
Thread previous = Thread.currentThread();
for (int i = 0; i < 10; i++) {
Thread thread = new Thread(new Domino(previous));
thread.start();
previous = thread;
}
TimeUnit.SECONDS.sleep(5);
System.out.println(Thread.currentThread().getName() + " terminate.");
}
static class Domino implements Runnable {
private Thread thread;
public Domino(Thread thread) {
this.thread = thread;
}
@Override
public void run() {
try {
thread.join();
} catch (InterruptedException e) {
}
System.out.println(Thread.currentThread().getName() + " terminate.");
}
}
}
输出如下:
main terminate.
Thread-0 terminate.
Thread-1 terminate.
Thread-2 terminate.
Thread-3 terminate.
Thread-4 terminate.
Thread-5 terminate.
Thread-6 terminate.
Thread-7 terminate.
Thread-8 terminate.
Thread-9 terminate.
下图是JDK源码(进行了部分调整):
六、ThreadLocal的使用
使用方式代码,该代码可以用于统计方法执行耗时:
package com.wholesmart.thread4;
import java.util.concurrent.TimeUnit;
public class Profiler {
private static final ThreadLocal<Long> TIME_THREADLOCAL = new ThreadLocal<>();
protected Long initialValue() {
return System.currentTimeMillis();
}
public static final void begin() {
TIME_THREADLOCAL.set(System.currentTimeMillis());
}
public static final long end() {
return System.currentTimeMillis() - TIME_THREADLOCAL.get();
}
public static void main(String[] args) throws InterruptedException {
Profiler.begin();
TimeUnit.SECONDS.sleep(1);
System.out.println("Cost: " + Profiler.end() + " mills.");
}
}
修道的人,大多数都把道的境界,先由自己的主观观念,建立起一个至真、至善、至美的构想。也可以说是自己首先建立起一个道的幻境,妄自追求。其实,一存此念,早已离道太远了。