Java编程思想之二十 并发
- 2019 年 10 月 3 日
- 笔记
20.1 并发得多面性
并发编程令人困惑的一个主要原因:使用并发时需要解决的问题有多个,而实现并发的方法也有多种,并且在这两者之间没有明显的映射关系。
20.1.1 更快的执行
速度问题初听起来很简单:如果你需要一个程序运行得更快,那么可以将起断开为多个片段,在单个处理器上运行每个片段。
并发通常是提高运行在单个处理器上的程序的性能,但在单个处理器上运行的并发程序开销确实应该比该程序所有部分都顺序执行开销大,因为其中增加了所谓的上下文切换的代价。
如果没有任务会阻塞,那么在单处理器上使用并发就没有任何意义。
在单处理器系统中的性能提高常见示例是事件驱动的编程。
Java采取的是在顺序语言的基础上提供对线程的支持。与在多任务操作系统中分叉进程不同,线程机制是在由执行程序表示的单一进程中创建任务。
20.1.2 改进代码设计
协作多线程:Java的线程机制是抢占式的,这表示调度机制周期性的中断线程,将上下文切换到另一个线程,从而为每个线程都提供时间片,使得每个线程都会分配到数量合理得时间去驱动它得任务。在协作式系统中,每个任务都会自动得放弃控制,这要求程序员要有意识得插入某种类型得让步语句。协作式系统得优势是双重得:上下文切换的开销通常比抢占式要少得多,并且对可以同时执行的线程数量在理论上没有任何限制。
20.2 基本的线程机制
通过使用多线程机制,这些独立任务中的每一个将由执行线程来驱动,一个线程就是在进程中的一个单一顺序控制流,当个进程可以拥有多个并发执行的任务。
20.2.1 定义任务
线程可以驱动任务,因此你需要一种描述任务的方式,这可以由Runnable接口来提供。要想定义任务,只需实现Runnable接口并编写run(0方法,使得该任务可以执行你的命令。
public class LiftOff implements Runnable { protected int countDown = 10; // Default private static int taskCount = 0; private final int id = taskCount++; public LiftOff() {} public LiftOff(int countDown) { this.countDown = countDown; } public String status() { return "#" + id + "(" + (countDown > 0 ? countDown : "Liftoff!") + "), "; } public void run() { while(countDown-- > 0) { System.out.print(status()); Thread.yield(); } } } ///:~
Thread.yield()的调用是对线程调度器的以后在哪个建议,它声明:我已经执行完生命周期中最重要的部分了,此刻正是切换给其他任务执行一段时间的时机了。
public class MainThread { public static void main(String[] args) throws InterruptedException { LiftOff launch = new LiftOff(); launch.run(); } } /* Output: #0(9), #0(8), #0(7), #0(6), #0(5), #0(4), #0(3), #0(2), #0(1), #0(Liftoff!), *///:~
当从Runnable导出一个类时,它必须具有run()方法,但是这个方法并无特殊之处——它不会产生内在的线程能力。要实现线程行为,你必须显示的将一个任务附着在线程上。
20.2.2 Thread类
将Runnable对象转变为工作任务的传统方式是把它提交给一个Thread构造器:
public class BasicThreads { public static void main(String[] args) { Thread t = new Thread(new LiftOff()); t.start(); System.out.println("Waiting for LiftOff"); } } /* Output: (90% match) Waiting for LiftOff #0(9), #0(8), #0(7), #0(6), #0(5), #0(4), #0(3), #0(2), #0(1), #0(Liftoff!), *///:~
可以添加更多的线程去驱动更多的任务。
public class MoreBasicThreads { public static void main(String[] args) { for(int i = 0; i < 5; i++) new Thread(new LiftOff()).start(); System.out.println("Waiting for LiftOff"); } } /* Output: (Sample) Waiting for LiftOff #0(9), #1(9), #2(9), #3(9), #4(9), #0(8), #1(8), #2(8), #3(8), #4(8), #0(7), #1(7), #2(7), #3(7), #4(7), #0(6), #1(6), #2(6), #3(6), #4(6), #0(5), #1(5), #2(5), #3(5), #4(5), #0(4), #1(4), #2(4), #3(4), #4(4), #0(3), #1(3), #2(3), #3(3), #4(3), #0(2), #1(2), #2(2), #3(2), #4(2), #0(1), #1(1), #2(1), #3(1), #4(1), #0(Liftoff!), #1(Liftoff!), #2(Liftoff!), #3(Liftoff!), #4(Liftoff!), *///:~
当main()创建Thread对象时,它并没有捕获任何对这些对象的引用。每个Thread都注册了它自己,因此确实有一个对它的引用,而且在它的任务推出其run()并死亡之前,垃圾回收期无法清除它。
20.2.3 使用Executor
Java SE5的jav.util.concurrent包中的执行器(Executor)将为你管理Thread对象,从而简化了并发编程。Executor在客户端和任务执行之间提供了一个间接层;与客户端直接执行任务不同,这个中介对象将执行任务。Executor允许你管理异步任务的执行,而无须显示的管理线程的声明周期。
我们可以使用Executor来代替Thread对象。
import java.util.concurrent.*; public class CachedThreadPool { public static void main(String[] args) { ExecutorService exec = Executors.newCachedThreadPool(); for(int i = 0; i < 5; i++) exec.execute(new LiftOff()); exec.shutdown(); } } /* Output: (Sample) #0(9), #0(8), #1(9), #2(9), #3(9), #4(9), #0(7), #1(8), #2(8), #3(8), #4(8), #0(6), #1(7), #2(7), #3(7), #4(7), #0(5), #1(6), #2(6), #3(6), #4(6), #0(4), #1(5), #2(5), #3(5), #4(5), #0(3), #1(4), #2(4), #3(4), #4(4), #0(2), #1(3), #2(3), #3(3), #4(3), #0(1), #1(2), #2(2), #3(2), #4(2), #0(Liftoff!), #1(1), #2(1), #3(1), #4(1), #1(Liftoff!), #2(Liftoff!), #3(Liftoff!), #4(Liftoff!), *///:~
单个的Executor被用来创建和管理系统中所有任务。
对shutdown()方法的调用可以防止新任务被提交给这个Executor,当前线程将继续运行在shutdown()被调用之前提交所有任务。
FixedThreadPool使用了有限的线程集来执行所提交的任务:
import java.util.concurrent.*; public class SingleThreadExecutor { public static void main(String[] args) { ExecutorService exec = Executors.newSingleThreadExecutor(); for(int i = 0; i < 5; i++) exec.execute(new LiftOff()); exec.shutdown(); } } /* Output: #0(9), #0(8), #0(7), #0(6), #0(5), #0(4), #0(3), #0(2), #0(1), #0(Liftoff!), #1(9), #1(8), #1(7), #1(6), #1(5), #1(4), #1(3), #1(2), #1(1), #1(Liftoff!), #2(9), #2(8), #2(7), #2(6), #2(5), #2(4), #2(3), #2(2), #2(1), #2(Liftoff!), #3(9), #3(8), #3(7), #3(6), #3(5), #3(4), #3(3), #3(2), #3(1), #3(Liftoff!), #4(9), #4(8), #4(7), #4(6), #4(5), #4(4), #4(3), #4(2), #4(1), #4(Liftoff!), *///:~
有了FixedThreadPool,就可以一次性预先执行代价高昂的线程分配,因而也就可以限制线程的数量了。
CachedThreadPool在程序执行过程中通常会创建于所徐数量相同的线程,然后再它回收旧线程时停止创建新的线程,因此它是合理的Executor首选。只有当这种方式会引发问题时,才需要切换到FixedThreadPool。
SingleThreadExecutor就像是线程数量为1的FixedThreadPool。
SingleThreadExecutor会序列化所有提交给它的任务,并会维护它自己的悬挂任务队列。
import java.util.concurrent.*; public class SingleThreadExecutor { public static void main(String[] args) { ExecutorService exec = Executors.newSingleThreadExecutor(); for(int i = 0; i < 5; i++) exec.execute(new LiftOff()); exec.shutdown(); } } /* Output: #0(9), #0(8), #0(7), #0(6), #0(5), #0(4), #0(3), #0(2), #0(1), #0(Liftoff!), #1(9), #1(8), #1(7), #1(6), #1(5), #1(4), #1(3), #1(2), #1(1), #1(Liftoff!), #2(9), #2(8), #2(7), #2(6), #2(5), #2(4), #2(3), #2(2), #2(1), #2(Liftoff!), #3(9), #3(8), #3(7), #3(6), #3(5), #3(4), #3(3), #3(2), #3(1), #3(Liftoff!), #4(9), #4(8), #4(7), #4(6), #4(5), #4(4), #4(3), #4(2), #4(1), #4(Liftoff!), *///:~
20.2.4 从任务中产生返回值
Runnable是执行工作的独立任务,但是它不返回任何值。如果你希望任务再完成时能够返回一个值,可以实现Callable接口而不是Runnable接口。
//: concurrency/CallableDemo.java import java.util.concurrent.*; import java.util.*; class TaskWithResult implements Callable<String> { private int id; public TaskWithResult(int id) { this.id = id; } public String call() { return "result of TaskWithResult " + id; } } public class CallableDemo { public static void main(String[] args) { ExecutorService exec = Executors.newCachedThreadPool(); ArrayList<Future<String>> results = new ArrayList<Future<String>>(); for(int i = 0; i < 10; i++) results.add(exec.submit(new TaskWithResult(i)));//将产生Future对象 for(Future<String> fs : results) try { // get() blocks until completion: System.out.println(fs.get()); } catch(InterruptedException e) { System.out.println(e); return; } catch(ExecutionException e) { System.out.println(e); } finally { exec.shutdown(); } } } /* Output: result of TaskWithResult 0 result of TaskWithResult 1 result of TaskWithResult 2 result of TaskWithResult 3 result of TaskWithResult 4 result of TaskWithResult 5 result of TaskWithResult 6 result of TaskWithResult 7 result of TaskWithResult 8 result of TaskWithResult 9 *///:~
还可以使用isDone判断是否执行完成,如果不调用isDone,那个如果没有完成,get会被阻塞。
20.2.5 休眠
影响任务行为的一种简单方式是调用sleep(),这将使任务中止执行给定的时间。
//: concurrency/SleepingTask.java // Calling sleep() to pause for a while. import java.util.concurrent.*; public class SleepingTask extends LiftOff { public void run() { try { while(countDown-- > 0) { System.out.print(status()); // Old-style: Thread.sleep(100); // Java SE5/6-style: //TimeUnit.MILLISECONDS.sleep(100); } } catch(InterruptedException e) { System.err.println("Interrupted"); } } public static void main(String[] args) { ExecutorService exec = Executors.newCachedThreadPool(); for(int i = 0; i < 5; i++) exec.execute(new SleepingTask()); exec.shutdown(); } } /* Output: #0(9), #1(9), #2(9), #3(9), #4(9), #0(8), #1(8), #2(8), #3(8), #4(8), #0(7), #1(7), #2(7), #3(7), #4(7), #0(6), #1(6), #2(6), #3(6), #4(6), #0(5), #1(5), #2(5), #3(5), #4(5), #0(4), #1(4), #2(4), #3(4), #4(4), #0(3), #1(3), #2(3), #3(3), #4(3), #0(2), #1(2), #2(2), #3(2), #4(2), #0(1), #1(1), #2(1), #3(1), #4(1), #0(Liftoff!), #1(Liftoff!), #2(Liftoff!), #3(Liftoff!), #4(Liftoff!), *///:~
异常不能跨线程传播回main(),所以你必须在本地处理所有在任务内部产生的异常。
20.2.6 优先级
线程的优先级将该线程的重要性传递给调度器。
优先级较低的线程仅仅是执行的频率较低。
在对大多数时间里,所有线程都应该以默认的优先级运行,试图操作线程的优先级通常是一种错误。
//: concurrency/SimplePriorities.java // Shows the use of thread priorities. import java.util.concurrent.*; public class SimplePriorities implements Runnable { private int countDown = 5; private volatile double d; // No optimization private int priority; public SimplePriorities(int priority) { this.priority = priority; } public String toString() { return Thread.currentThread() + ": " + countDown; } public void run() { Thread.currentThread().setPriority(priority);//设置当前线程优先级,使用getPriority获取当前优先级 while(true) { // An expensive, interruptable operation: for(int i = 1; i < 100000; i++) { d += (Math.PI + Math.E) / (double)i; if(i % 1000 == 0) Thread.yield(); } System.out.println(this); if(--countDown == 0) return; } } public static void main(String[] args) { ExecutorService exec = Executors.newCachedThreadPool(); for(int i = 0; i < 5; i++) exec.execute( new SimplePriorities(Thread.MIN_PRIORITY)); exec.execute( new SimplePriorities(Thread.MAX_PRIORITY)); exec.shutdown(); } } /* Output: (70% match) Thread[pool-1-thread-6,10,main]: 5 Thread[pool-1-thread-6,10,main]: 4 Thread[pool-1-thread-6,10,main]: 3 Thread[pool-1-thread-6,10,main]: 2 Thread[pool-1-thread-6,10,main]: 1 Thread[pool-1-thread-3,1,main]: 5 Thread[pool-1-thread-2,1,main]: 5 Thread[pool-1-thread-1,1,main]: 5 Thread[pool-1-thread-5,1,main]: 5 Thread[pool-1-thread-4,1,main]: 5 ... *///:~
20.2.7 让步
当调用yieId()时,你也是在建议具有相同优先级的其他线程可以运行。
大体上,对于任何重要的控制或在调用整个应用时,都不能依赖yieId(),实际上,yieId()经常被误用。
21.2.8 后台线程
所谓后台线程,是指在程序运行的时候在后台提供一种通用服务的线程,并且这种线程并不属于程序中不可或缺的部分。当所有非后台线程结束时,程序也就终止了,同时会杀死进程中的所有后台线程。
//: concurrency/SimpleDaemons.java // Daemon threads don't prevent the program from ending. import java.util.concurrent.*; import static net.mindview.util.Print.*; public class SimpleDaemons implements Runnable { public void run() { try { while(true) { TimeUnit.MILLISECONDS.sleep(100); print(Thread.currentThread() + " " + this); } } catch(InterruptedException e) { print("sleep() interrupted"); } } public static void main(String[] args) throws Exception { for(int i = 0; i < 10; i++) { Thread daemon = new Thread(new SimpleDaemons()); daemon.setDaemon(true); // 必须在线程被调用之前设置setDaemon daemon.start(); } print("All daemons started"); TimeUnit.MILLISECONDS.sleep(175); } } /* Output: (Sample) All daemons started Thread[Thread-0,5,main] SimpleDaemons@530daa Thread[Thread-1,5,main] SimpleDaemons@a62fc3 Thread[Thread-2,5,main] SimpleDaemons@89ae9e Thread[Thread-3,5,main] SimpleDaemons@1270b73 Thread[Thread-4,5,main] SimpleDaemons@60aeb0 Thread[Thread-5,5,main] SimpleDaemons@16caf43 Thread[Thread-6,5,main] SimpleDaemons@66848c Thread[Thread-7,5,main] SimpleDaemons@8813f2 Thread[Thread-8,5,main] SimpleDaemons@1d58aae Thread[Thread-9,5,main] SimpleDaemons@83cc67 ... *///:~
必须在线程启动之前调用setDaemom()方法,才能把它设置为后台线程。
通过编写定制的ThreadFactory可以定制由Executor创建的线程的属性:
package net.mindview.util; import java.util.concurrent.ThreadFactory; public class DaemonThreadFactory implements ThreadFactory { public DaemonThreadFactory() { } public Thread newThread(Runnable r) { Thread t = new Thread(r); t.setDaemon(true); return t; } }
//: concurrency/DaemonFromFactory.java // Using a Thread Factory to create daemons. import java.util.concurrent.*; import net.mindview.util.*; import static net.mindview.util.Print.*; public class DaemonFromFactory implements Runnable { public void run() { try { while(true) { TimeUnit.MILLISECONDS.sleep(100); print(Thread.currentThread() + " " + this); } } catch(InterruptedException e) { print("Interrupted"); } } public static void main(String[] args) throws Exception { ExecutorService exec = Executors.newCachedThreadPool( new DaemonThreadFactory()); for(int i = 0; i < 10; i++) exec.execute(new DaemonFromFactory()); print("All daemons started"); TimeUnit.MILLISECONDS.sleep(500); // Run for a while } } /* (Execute to see output) *///:~
可以通过调用isDaemon()方法来确定线程是否是一个后台线程。如果是一个后台线程,那么它创建的任何线程将被自动设置成后台线程:
// Using a Thread Factory to create daemons. import java.util.concurrent.*; import net.mindview.util.*; import static net.mindview.util.Print.*; public class DaemonFromFactory implements Runnable { public void run() { try { while(true) { TimeUnit.MILLISECONDS.sleep(100); print(Thread.currentThread() + " " + this); } } catch(InterruptedException e) { print("Interrupted"); } } public static void main(String[] args) throws Exception { ExecutorService exec = Executors.newCachedThreadPool( new DaemonThreadFactory()); for(int i = 0; i < 10; i++) exec.execute(new DaemonFromFactory()); print("All daemons started"); TimeUnit.MILLISECONDS.sleep(500); // Run for a while } } /* (Execute to see output) *///:~
后台进程在不执行finaiiy子句的情况下就会终止其run()方法:
//: concurrency/DaemonsDontRunFinally.java // Daemon threads don't run the finally clause import java.util.concurrent.*; import static net.mindview.util.Print.*; class ADaemon implements Runnable { public void run() { try { print("Starting ADaemon"); TimeUnit.SECONDS.sleep(1); } catch(InterruptedException e) { print("Exiting via InterruptedException"); } finally { print("This should always run?"); } } } public class DaemonsDontRunFinally { public static void main(String[] args) throws Exception { Thread t = new Thread(new ADaemon()); t.setDaemon(true); t.start(); } } /* Output: Starting ADaemon *///:~
如果你注释调对setDaemon()的调用,就会看到finally子句将会执行。
当最后一个非后台线程终止时,后台线程会突然终止。因此一旦main()退出,JVM就会立即关闭所有后台线程。因为不能以优雅的方式来关闭后台线程,所以它们几乎不是一种好的思想。非后台的Executor通常是一种更好的方法,它控制的所有任务都可以同时被关闭,关闭将以有序的方式执行。
20.2.9 编码的变体
使用直接从Thread继承这种可替代的方式:
//: concurrency/SimpleThread.java // Inheriting directly from the Thread class. public class SimpleThread extends Thread { private int countDown = 5; private static int threadCount = 0; public SimpleThread() { // Store the thread name: super(Integer.toString(++threadCount)); start(); } public String toString() { return "#" + getName() + "(" + countDown + "), "; } public void run() { while(true) { System.out.print(this); if(--countDown == 0) return; } } public static void main(String[] args) { for(int i = 0; i < 5; i++) new SimpleThread(); } } /* Output: #1(5), #1(4), #1(3), #1(2), #1(1), #2(5), #2(4), #2(3), #2(2), #2(1), #3(5), #3(4), #3(3), #3(2), #3(1), #4(5), #4(4), #4(3), #4(2), #4(1), #5(5), #5(4), #5(3), #5(2), #5(1), *///:~
通过调用适当的Thread构造器为Thread对象赋予具体的名称,这个名称可以通过使用GetName()的toString()中获得。
惯用法是自管理的Runnable:
public class SelfManaged implements Runnable { private int countDown = 5; private Thread t = new Thread(this);//传入当前对象 public SelfManaged() { t.start(); } public String toString() { return Thread.currentThread().getName() + "(" + countDown + "), "; } public void run() { while(true) { System.out.print(this); if(--countDown == 0) return; } } public static void main(String[] args) { for(int i = 0; i < 5; i++) new SelfManaged(); } } /* Output: Thread-0(5), Thread-0(4), Thread-0(3), Thread-0(2), Thread-0(1), Thread-1(5), Thread-1(4), Thread-1(3), Thread-1(2), Thread-1(1), Thread-2(5), Thread-2(4), Thread-2(3), Thread-2(2), Thread-2(1), Thread-3(5), Thread-3(4), Thread-3(3), Thread-3(2), Thread-3(1), Thread-4(5), Thread-4(4), Thread-4(3), Thread-4(2), Thread-4(1), *///:~
这里实现接口使得你可以继承另一个不同的类。
通过使用内部类来将线程代码隐藏在类中:
//: concurrency/ThreadVariations.java // Creating threads with inner classes. import java.util.concurrent.*; import static net.mindview.util.Print.*; // Using a named inner class: class InnerThread1 {//创建一个扩展自Thread的匿名内部类 private int countDown = 5; private Inner inner; private class Inner extends Thread { Inner(String name) { super(name); start(); } public void run() { try { while (true) { print(this); if (--countDown == 0) return; sleep(10); } } catch (InterruptedException e) { print("interrupted"); } } public String toString() { return getName() + ": " + countDown; } } public InnerThread1(String name) {//创建这个内部类的实例 inner = new Inner(name); } } // Using an anonymous inner class: class InnerThread2 { private int countDown = 5; private Thread t; public InnerThread2(String name) {//可替换方式:在构造器中创建了一个匿名的Thread子类,并且将其向上转型为Thread引用t。 t = new Thread(name) { public void run() { try { while (true) { print(this); if (--countDown == 0) return; sleep(10); } } catch (InterruptedException e) { print("sleep() interrupted"); } } public String toString() { return getName() + ": " + countDown; } }; t.start(); } } // Using a named Runnable implementation: class InnerRunnable1 { private int countDown = 5; private Inner inner; private class Inner implements Runnable { Thread t; Inner(String name) { t = new Thread(this, name); t.start(); } public void run() { try { while (true) { print(this); if (--countDown == 0) return; TimeUnit.MILLISECONDS.sleep(10); } } catch (InterruptedException e) { print("sleep() interrupted"); } } public String toString() { return t.getName() + ": " + countDown; } } public InnerRunnable1(String name) { inner = new Inner(name); } } // Using an anonymous Runnable implementation: class InnerRunnable2 { private int countDown = 5; private Thread t; public InnerRunnable2(String name) { t = new Thread(new Runnable() { public void run() { try { while (true) { print(this); if (--countDown == 0) return; TimeUnit.MILLISECONDS.sleep(10); } } catch (InterruptedException e) { print("sleep() interrupted"); } } public String toString() { return Thread.currentThread().getName() + ": " + countDown; } }, name); t.start(); } } // A separate method to run some code as a task: class ThreadMethod {//在方法内部创建线程 private int countDown = 5; private Thread t; private String name; public ThreadMethod(String name) { this.name = name; } public void runTask() { if (t == null) { t = new Thread(name) { public void run() { try { while (true) { print(this); if (--countDown == 0) return; sleep(10); } } catch (InterruptedException e) { print("sleep() interrupted"); } } public String toString() { return getName() + ": " + countDown; } }; t.start(); } } } public class ThreadVariations { public static void main(String[] args) { new InnerThread1("InnerThread1"); new InnerThread2("InnerThread2"); new InnerRunnable1("InnerRunnable1"); new InnerRunnable2("InnerRunnable2"); new ThreadMethod("ThreadMethod").runTask(); } } /* (Execute to see output) *///:~
20.2.10 术语
你对Thread类实际没有任何控制权。你创建任务,并通过某种方式将一个线程附着到任务上,以使得这个线程可以驱动任务。
Java的线程机制基于来自C的低级的p线程方式,这是一种你必须深入研究,并且需要完全理解其所有细节的方式。
20.2.11 加入一个线程
一个线程可以在其他线程上调用join()方法,其效果是等待一段时间知道第二个线程结束才继续执行。
如果某个线程在另一个线程t上调用t.join(),此线程将被挂起,知道目标线程t结束才恢复。
也可以调用join()时带上一个超时参数,这样如果目标线程在这段时间到期时还没有结束的话,join()方式总能返回。
对join()方法的调用可以被中断,做法时在调用线程上调用interrupt()方法。
//: concurrency/Joining.java // Understanding join(). import static net.mindview.util.Print.*; class Sleeper extends Thread { private int duration; public Sleeper(String name, int sleepTime) { super(name); duration = sleepTime; start(); System.out.println(name); } public void run() { try { sleep(duration); } catch(InterruptedException e) { print(getName() + " was interrupted. " + "isInterrupted(): " + isInterrupted()); return; } print(getName() + " has awakened"); } } class Joiner extends Thread { private Sleeper sleeper; public Joiner(String name, Sleeper sleeper) { super(name); this.sleeper = sleeper; start(); System.out.println(name); } public void run() { try { sleeper.join(); } catch(InterruptedException e) { print("Interrupted"); } print(getName() + " join completed"); } } public class Joining { public static void main(String[] args) { Sleeper sleepy = new Sleeper("Sleepy", 1500), grumpy = new Sleeper("Grumpy", 1500); Joiner dopey = new Joiner("Dopey", sleepy), doc = new Joiner("Doc", grumpy); grumpy.interrupt(); } } /* Output: Grumpy was interrupted. isInterrupted(): false Doc join completed Sleepy has awakened Dopey join completed *///:~
Joiner线程将通过在Sleeper对象上调用join()方法来等待Sleeper醒来。在main()里面,每个Sleeper都有一个Joiner,这个可以在输出中发现,如果Sleeper被中断或者是正常结束,Joiner将和Sleeper一同结束。
20.2.12 创建有响应的用户界面
使用线程的动机之一就是建立有响应的用户界面:
//: concurrency/ResponsiveUI.java // User interface responsiveness. // {RunByHand} class UnresponsiveUI { private volatile double d = 1; public UnresponsiveUI() throws Exception { while(d > 0) d = d + (Math.PI + Math.E) / d; System.in.read(); // Never gets here } } public class ResponsiveUI extends Thread { private static volatile double d = 1; public ResponsiveUI() { setDaemon(true); start(); } public void run() { while(true) { d = d + (Math.PI + Math.E) / d; } } public static void main(String[] args) throws Exception { //new UnresponsiveUI(); // Must kill this process new ResponsiveUI();//作为后台运行的同时,还在等待用户的输入 System.in.read(); System.out.println("aaaaaa"); System.out.println(d); // Shows progress } } ///:~
20.2.13 线程组
线程组持有一个线程集合
20.2.14 捕获异常
由于线程的本质特性,使得你不能捕获从线程中逃逸的异常。一旦异常逃出任务的run()方法,它就会向外传播到控制台,除非你采取特殊的步骤捕获这种错误的异常。
下面的程序总是会抛出异常:
//: concurrency/ExceptionThread.java // {ThrowsException} import java.util.concurrent.*; public class ExceptionThread implements Runnable { public void run() { throw new RuntimeException(); } public static void main(String[] args) { ExecutorService exec = Executors.newCachedThreadPool(); exec.execute(new ExceptionThread()); } } ///:~
在main中放入try carth并不能抓住异常:
import java.util.concurrent.*; public class NaiveExceptionHandling { public static void main(String[] args) { try { ExecutorService exec = Executors.newCachedThreadPool(); exec.execute(new ExceptionThread()); } catch(RuntimeException ue) { // This statement will NOT execute! System.out.println("Exception has been handled!"); } } } ///:~
我们需要修改Executor产生线程的方式。Thread.UncaughtExceptionHandler是Java SE5中的新接口,它允许在每个Thread对象上都附着一个异常处理器。Thread.UncaughtExceptionHandler.uncaughtException()会在线程因为捕获的异常而临近死亡时被调用,为了使用它,我们创建一个新类型ThreadFactory,它将在每个新创建的Thread对象上附着一个Thread.UncaughtExceptionHandler:
//: concurrency/CaptureUncaughtException.java import java.util.concurrent.*; class ExceptionThread2 implements Runnable { public void run() { Thread t = Thread.currentThread(); System.out.println("run() by " + t); System.out.println( "eh = " + t.getUncaughtExceptionHandler()); throw new RuntimeException(); } } class MyUncaughtExceptionHandler implements Thread.UncaughtExceptionHandler { public void uncaughtException(Thread t, Throwable e) { System.out.println("caught " + e); } } class HandlerThreadFactory implements ThreadFactory { public Thread newThread(Runnable r) { System.out.println(this + " creating new Thread"); Thread t = new Thread(r); System.out.println("created " + t); t.setUncaughtExceptionHandler( new MyUncaughtExceptionHandler()); System.out.println( "eh = " + t.getUncaughtExceptionHandler()); return t; } } public class CaptureUncaughtException { public static void main(String[] args) { ExecutorService exec = Executors.newCachedThreadPool( new HandlerThreadFactory()); exec.execute(new ExceptionThread2()); } } /* Output: (90% match) HandlerThreadFactory@de6ced creating new Thread created Thread[Thread-0,5,main] eh = MyUncaughtExceptionHandler@1fb8ee3 run() by Thread[Thread-0,5,main] eh = MyUncaughtExceptionHandler@1fb8ee3 caught java.lang.RuntimeException *///:~
在Thread类中设置一个静态域,并将这个处理器设置为默认的为捕获异常处理器:
import java.util.concurrent.*; public class SettingDefaultHandler { public static void main(String[] args) { Thread.setDefaultUncaughtExceptionHandler( new MyUncaughtExceptionHandler()); ExecutorService exec = Executors.newCachedThreadPool(); exec.execute(new ExceptionThread()); } } /* Output: caught java.lang.RuntimeException *///:~
20.3 共享受限资源
20.3.1 不正确的访问资源
下面的任务产生一个偶数,而其他任何消费这些数字。消费者任何唯一工作就是检查偶数的有效性。
public abstract class IntGenerator { private volatile boolean canceled = false; public abstract int next(); // Allow this to be canceled: public void cancel() { canceled = true; }//修改canceled标识 public boolean isCanceled() { return canceled; }//查看该对象是否被取消 } ///:~
import java.util.concurrent.*; public class EvenChecker implements Runnable {//消费者任务 private IntGenerator generator; private final int id; public EvenChecker(IntGenerator g, int ident) { generator = g; id = ident; } public void run() { while(!generator.isCanceled()) { int val = generator.next(); if(val % 2 != 0) {//程序将检查是否是偶数,如果是奇数,那么就是另一个线程还没有执行完next()就调用了检查判断。 System.out.println(val + " not even!"); generator.cancel(); // Cancels all EvenCheckers } } } // Test any type of IntGenerator: public static void test(IntGenerator gp, int count) { System.out.println("Press Control-C to exit"); ExecutorService exec = Executors.newCachedThreadPool(); for(int i = 0; i < count; i++) exec.execute(new EvenChecker(gp, i)); exec.shutdown(); } // Default value for count: public static void test(IntGenerator gp) { test(gp, 10); } } ///:~
共享公共资源的任务可以观察该资源的终止信号。这可以消除所谓竞争条件,即两个或更多的任务竞争响应某个条件,因此产生的冲突或以一致结果:
public class EvenGenerator extends IntGenerator { private volatile int currentEvenValue = 0; public int next() {//一个任务可能在另一个任务执行第一个对currentEvenValue递增操作之后,但没有执行第二个操作之前,调用next()方法。 ++currentEvenValue; // Danger point here! ++currentEvenValue; return currentEvenValue; } public static void main(String[] args) { EvenChecker.test(new EvenGenerator()); } } /* Output: (Sample) Press Control-C to exit 89476993 not even! 89476993 not even! *///:~
递增也不是原子操作,所以,必须要保护任务。
20.3.2 解决共享资源竞争
使用线程时的一个基本问题:你永远都不知道一个线程何时在运行。
对于并发,你需要某种方式来防止两个任务访问相同的资源。
防止这种冲突的方法就是当资源被一个任务使用时,在其上加锁。
基本所有并发模式在解决线程冲突问题的时候,都是采用序列化访问共享资源的方案。这意味着在给定时刻只允许一个任务访问共享资源。通常这是通过在代码前面加上一条锁语句来实现的,这就使得在一段时间内只有一个任务可以运行这段代码。因为锁语句产生了一种互相排斥的效果,所有这种机制被称为互斥量。
Java提供关键字synchronized的形式,为防止资源冲突提供了内置支持。当任务要执行被synchronized关键字保护的代码片段的时候,它将检查锁是否可用,然后获取锁,执行代码,释放锁。
要控制对共享资源的访问,得先把它包装进一个对象,然后把所有要访问这个资源的方法标记为synchronized。
synchronized void f(){}
所有对象都自动含有单一的锁(也称为监视器)。当对象上调用其任意synchronized方法的时候,此对象都被加锁。
在使用并发时,将域设置为private是非常重要的,否则,synchronized关键字就不能防止其他任务直接访问域,这样就会产生冲突。
一个任务可以获取多个锁。
JVM负责跟踪对象被加锁的次数。如果一个对象被解锁,其计数变为0。在任务第一个给对象加锁的时候,计数变为1.每当这个相同的任务在这个对象上获取锁,计数都会递增。只有首先获得锁的任务才能允许继续获取多个锁。每当任务离开一个synchronized方法,计数递减,当计数为0的时候,锁被完全释放,此时别的任务就可以使用此资源。
针对每个类,也有一个锁,所有synchronized static方法可以在类的范围内防止对static数据的并发访问。
你应该在什么时候同步,可以运用Brian的同步规则:
如果你在写一个变量,它接下来将被另一个线程读取,或者正在读取一个上一次已经被另一个线程写过的变量,那么你必须使用同步,并且,读写线程都必须使用相同的监视器同步。
每个访问临界共享资源的方法都必须被同步,否则它们就不会正确的工作。
同步控制EvenGenerator
public class SynchronizedEvenGenerator extends IntGenerator { private int currentEvenValue = 0; public synchronized int next() { ++currentEvenValue; Thread.yield(); // Cause failure faster ++currentEvenValue; return currentEvenValue; } public static void main(String[] args) { EvenChecker.test(new SynchronizedEvenGenerator()); } } ///:~
第一个进入next()的任务将获得锁,任何试图获取锁的任务都将从其开始尝试之时被组赛,直到第一个任务释放锁。通过这种方式,任何时刻只有一个任务可以通过由互斥量看护的代码。
使用显示Lock对象
Lock对象必须被显示的创建,锁定和释放。
对于解决某些类型的问题来说,它更灵活。
import java.util.concurrent.locks.*; public class MutexEvenGenerator extends IntGenerator { private int currentEvenValue = 0; private Lock lock = new ReentrantLock(); public int next() { lock.lock(); try { ++currentEvenValue; Thread.yield(); // Cause failure faster ++currentEvenValue; return currentEvenValue; } finally { lock.unlock(); } } public static void main(String[] args) { EvenChecker.test(new MutexEvenGenerator()); } } ///:~
添加一个被互斥调用的锁,并使用lock和unlock方法在next()内创建临界资源
当你使用synchronized关键字时,需要写的代码量更少,并且用户错误出现的可能性也会降低,因此通常只有在解决特殊问题时,才能显示使用Lock对象。例如:使用synchronized关键字不能尝试获取锁且最终获取锁会失败,或者尝试着获取锁一段时间,然后放弃它,要实现这些,你必须使用concurrent类库:
//: concurrency/AttemptLocking.java // Locks in the concurrent library allow you // to give up on trying to acquire a lock. import java.util.concurrent.*; import java.util.concurrent.locks.*; public class AttemptLocking { private ReentrantLock lock = new ReentrantLock();//ReentrantLock可以让你尝试获取锁,但最终没有获取到锁 public void untimed() { boolean captured = lock.tryLock(); try { System.out.println("tryLock(): " + captured); } finally { if(captured) lock.unlock(); } } public void timed() { boolean captured = false; try { captured = lock.tryLock(2, TimeUnit.SECONDS);//尝试获取锁,在2秒后失败 } catch(InterruptedException e) { throw new RuntimeException(e); } try { System.out.println("tryLock(2, TimeUnit.SECONDS): " + captured); } finally { if(captured) lock.unlock(); } } public static void main(String[] args) { final AttemptLocking al = new AttemptLocking(); al.untimed(); // True -- lock is available al.timed(); // True -- lock is available // Now create a separate task to grab the lock: new Thread() { { setDaemon(true); } public void run() { al.lock.lock(); System.out.println("acquired"); } }.start(); Thread.yield(); // Give the 2nd task a chance al.untimed(); // False -- lock grabbed by task al.timed(); // False -- lock grabbed by task } } /* Output: tryLock(): true tryLock(2, TimeUnit.SECONDS): true acquired tryLock(): false tryLock(2, TimeUnit.SECONDS): false *///:~
20.3.3 原子性与易变性
一个常不正确的知识是“原子操作不需要进行同步控制”
通过Goetz测试你就可以使用原子性:如果你可以编写用于现代微处理器的高性能JVM,那么就有资格去考虑是否可以避免同步。
使用volatile关键字,就会获得(简单的赋值和返回操作)原子性。
在多处理器系统上,相对于单处理系统而言,可视性问题远比原子性问题多得多。一个任务做出的修改,即使在不中断的意义上讲是原子性的,对其他任务也可能是不可视的,因此不同的任务对应用的状态有不同的视图。另一方面,同步机制强制在处理系统中,一个任务做出的修改必须在应用中是可视的。如果没有同步机制,那么修改时可视将无法确定。
volatile关键字还确保了应用中的可视性。如果你将一个域声明为volatile的,那么只要对这个域产生了写操作,那么所有的读操作就都可以看到这个修改。
原子性和易变性是不同的概念。在非volatile域上的原子操作不必刷新到主存中去,因此其它读取该域的任务也不必看到这个新值。如果多个任务在同时访问某个域,那么这个域就应该是volatile的,否则,这个域应该只能经由同步来访问。同步也会导致向主存中刷新,因此如果一个域完全由syschronized方法或语句来防护,那就不必将其设置为是volatile的。
一个任务所作的任务写入操作对这个任务来说都是可视的,因此如果它只需要在这个任务内部可视,那么你就不需要将其设置为volatile的。
当一个域的值依赖于它之前的值时,volatile就无法工作了。
使用volatile而不是synchronized的唯一完全情况是类中只有一个可变域。所有,一般,你的第一选择应该是synchronized。
不要盲目的应用原子性:
import java.util.concurrent.*; public class AtomicityTest implements Runnable { private int i = 0; public int getValue() { return i; } private synchronized void evenIncrement() { i++; i++; } public void run() { while(true) evenIncrement(); } public static void main(String[] args) { ExecutorService exec = Executors.newCachedThreadPool(); AtomicityTest at = new AtomicityTest(); exec.execute(at); while(true) { int val = at.getValue(); if(val % 2 != 0) { System.out.println(val); System.exit(0); } } } }
尽管return i确实是原子性操作,但是缺少同步使得其数值可以在处于不稳定的中间状态被读取。除此之外,由于i也不是volatile的,因此还存在可视性问题。
一个产生序列数的类每当nextSerialNumber()被调用时,它必须调用者返回唯一的值:
public class SerialNumberGenerator { private static volatile int serialNumber = 0; public static int nextSerialNumber() { return serialNumber++; // Not thread-safe } }
如果一个域可能会被多个任务同时访问,或者这些任务中至少有一个是写入任务,那么你就应该将这个域设置为volatile。将一个域定义为volatile,那么它就会告诉编译器不要执行任务移除读取和写入操作的优化,这些操作的目的是用线程中的局部变量维护对这个域的精确同步。
import java.util.concurrent.*; // Reuses storage so we don't run out of memory: class CircularSet { private int[] array; private int len; private int index = 0; public CircularSet(int size) { array = new int[size]; len = size; // Initialize to a value not produced // by the SerialNumberGenerator: for(int i = 0; i < size; i++) array[i] = -1; } public synchronized void add(int i) { array[index] = i; // Wrap index and write over old elements: index = ++index % len; } public synchronized boolean contains(int val) { for(int i = 0; i < len; i++) if(array[i] == val) return true; return false; } } public class SerialNumberChecker { private static final int SIZE = 10; private static CircularSet serials = new CircularSet(1000); private static ExecutorService exec = Executors.newCachedThreadPool(); static class SerialChecker implements Runnable { public void run() { while(true) { int serial = SerialNumberGenerator.nextSerialNumber(); if(serials.contains(serial)) { System.out.println("Duplicate: " + serial); System.exit(0); } serials.add(serial); } } } public static void main(String[] args) throws Exception { for(int i = 0; i < SIZE; i++) exec.execute(new SerialChecker()); // Stop after n seconds if there's an argument: if(args.length > 0) { TimeUnit.SECONDS.sleep(new Integer(args[0])); System.out.println("No duplicates detected"); System.exit(0); } } }
上面这个程序,最终会得到重复的序列数。如果要解决这个问题,需要在nextSerialNumber()前面加上synchronized关键字。
20.3.4 原子类
Java SE5引入了诸如AtomicIntger,AtomicLong,AtomicReference等特殊的原子性变量类,它们提供下面形式的原子性条件更新操作:
booleean compareAndSet(expectedValue,updateValue)
我们可以使用AtomicInteger来重写:
import java.util.concurrent.*; import java.util.concurrent.atomic.*; import java.util.*; public class AtomicIntegerTest implements Runnable { private AtomicInteger i = new AtomicInteger(0); public int getValue() { return i.get(); } private void evenIncrement() { i.addAndGet(2); } public void run() { while(true) evenIncrement(); } public static void main(String[] args) { new Timer().schedule(new TimerTask() { public void run() { System.err.println("Aborting"); System.exit(0); } }, 5000); // Terminate after 5 seconds ExecutorService exec = Executors.newCachedThreadPool(); AtomicIntegerTest ait = new AtomicIntegerTest(); exec.execute(ait); while(true) { int val = ait.getValue(); if(val % 2 != 0) { System.out.println(val); System.exit(0); } } } }
20.3.5 临界区
只是希望防止多个线程同时访问方法内部的部分代码而不是防止访问整个方法。通过这种方式分离出来的代码段被称为临界区,它也使用synchronized关键字建立,synchronized被用来指定某个对象,此对象的锁被用来对花括号内代码进行同步控制:
synchronized(syncObject){}
这也被称为同步控制块;在进入此段代码前,必须得到syncObject对象的锁。如果其他线程已经得到这个锁,那么就得等到锁被释放以后,才能进入临界区。
如果把一个非保护类型的类,在其他类的保护和控制下,应用于多线程环境:
//: concurrency/CriticalSection.java // Synchronizing blocks instead of entire methods. Also // demonstrates protection of a non-thread-safe class // with a thread-safe one. import java.util.concurrent.*; import java.util.concurrent.atomic.*; import java.util.*; class Pair { // Not thread-safe private int x, y; public Pair(int x, int y) { this.x = x; this.y = y; } public Pair() { this(0, 0); } public int getX() { return x; } public int getY() { return y; } public void incrementX() { x++; } public void incrementY() { y++; } public String toString() { return "x: " + x + ", y: " + y; } public class PairValuesNotEqualException extends RuntimeException { public PairValuesNotEqualException() { super("Pair values not equal: " + Pair.this); } } // Arbitrary invariant -- both variables must be equal: public void checkState() { if(x != y) throw new PairValuesNotEqualException(); } } // Protect a Pair inside a thread-safe class: abstract class PairManager {//持有一个Pair对象,并控制一切对它的访问 AtomicInteger checkCounter = new AtomicInteger(0); protected Pair p = new Pair(); private List<Pair> storage = Collections.synchronizedList(new ArrayList<Pair>()); public synchronized Pair getPair() { // Make a copy to keep the original safe: return new Pair(p.getX(), p.getY()); } // Assume this is a time consuming operation protected void store(Pair p) { storage.add(p); try { TimeUnit.MILLISECONDS.sleep(50); } catch(InterruptedException ignore) {} } public abstract void increment(); } // Synchronize the entire method: class PairManager1 extends PairManager { public synchronized void increment() { p.incrementX(); p.incrementY(); store(getPair()); } } // Use a critical section: class PairManager2 extends PairManager { public void increment() { Pair temp; synchronized(this) { p.incrementX(); p.incrementY(); temp = getPair(); } store(temp); } } class PairManipulator implements Runnable { private PairManager pm; public PairManipulator(PairManager pm) { this.pm = pm; } public void run() { while(true) pm.increment(); } public String toString() { return "Pair: " + pm.getPair() + " checkCounter = " + pm.checkCounter.get(); } } class PairChecker implements Runnable { private PairManager pm; public PairChecker(PairManager pm) { this.pm = pm; } public void run() { while(true) { pm.checkCounter.incrementAndGet(); pm.getPair().checkState(); } } } public class CriticalSection { // Test the two different approaches: static void testApproaches(PairManager pman1, PairManager pman2) { ExecutorService exec = Executors.newCachedThreadPool(); PairManipulator pm1 = new PairManipulator(pman1), pm2 = new PairManipulator(pman2); PairChecker pcheck1 = new PairChecker(pman1), pcheck2 = new PairChecker(pman2); exec.execute(pm1); exec.execute(pm2); exec.execute(pcheck1); exec.execute(pcheck2); try { TimeUnit.MILLISECONDS.sleep(500); } catch(InterruptedException e) { System.out.println("Sleep interrupted"); } System.out.println("pm1: " + pm1 + "npm2: " + pm2); System.exit(0); } public static void main(String[] args) { PairManager pman1 = new PairManager1(), pman2 = new PairManager2(); testApproaches(pman1, pman2); } } /* Output: (Sample) pm1: Pair: x: 15, y: 15 checkCounter = 272565 pm2: Pair: x: 16, y: 16 checkCounter = 3956974 *///:~
交给你的一个非线程安全的Pair类,你需要在一个线程环境中使用它。通过创建PairManager类就可以实现,PairManager类持有一个Pair对象并控制对它的一切访问。
PairManager类结构,它的一些功能在基类中实现,并且一个或多个抽象方法在派生类中定义,这种结构在设计模式中称为模板方法。
对于PairChecker的检查频率,PairManager1.increment()不允许有PairManager2.increment()那样多。后者采用同步控制块进行控制的典型原因:使得其他线程能更多的访问。
使用显示的Lock对象来创建临界区:
//: concurrency/ExplicitCriticalSection.java // Using explicit Lock objects to create critical sections. import java.util.concurrent.locks.*; // Synchronize the entire method: class ExplicitPairManager1 extends PairManager { private Lock lock = new ReentrantLock(); public synchronized void increment() { lock.lock(); try { p.incrementX(); p.incrementY(); store(getPair()); } finally { lock.unlock(); } } } // Use a critical section: class ExplicitPairManager2 extends PairManager { private Lock lock = new ReentrantLock(); public void increment() { Pair temp; lock.lock(); try { p.incrementX(); p.incrementY(); temp = getPair(); } finally { lock.unlock(); } store(temp); } } public class ExplicitCriticalSection { public static void main(String[] args) throws Exception { PairManager pman1 = new ExplicitPairManager1(), pman2 = new ExplicitPairManager2(); CriticalSection.testApproaches(pman1, pman2); } } /* Output: (Sample) pm1: Pair: x: 15, y: 15 checkCounter = 174035 pm2: Pair: x: 16, y: 16 checkCounter = 2608588 *///:~
20.3.6 在其他对象上同步
synchronized块必须给定一个在其上进行同步的对象,并且最合理的方式是,使用其方法正在被调用的当前对象:synchronized(this)。
如果获得了synchronized块上的锁,那么改对象其他的synchronized方法和临界区就不能被调用了,因此,如果在this上同步,临界区的效果就会直接缩小到同步的范围内。
两个任务可以同时进入同一个对象,只要这个对象上的方法是在不同的锁上同步的即可:
//: concurrency/SyncObject.java // Synchronizing on another object. import javax.xml.crypto.Data; import java.text.SimpleDateFormat; import java.util.Date; import static net.mindview.util.Print.*; class DualSynch { private Object syncObject = new Object(); public synchronized void f() {//同步整个方法,在this上同步。 for(int i = 0; i < 5; i++) { print("f():"+new SimpleDateFormat("yyyy/MM/dd-HH:mm:ss:SSS").format(new Date())); Thread.yield(); } } public void g() {//在syncObject对象上同步 synchronized(syncObject) { for(int i = 0; i < 5; i++) { print("g():"+new SimpleDateFormat("yyyy/MM/dd-HH:mm:ss:SSS").format(new Date())); Thread.yield(); } } } } public class SyncObject { public static void main(String[] args) { final DualSynch ds = new DualSynch(); new Thread() { public void run() { ds.f(); } }.start(); ds.g(); } } /* Output: (Sample) g():2018/05/31-10:36:32:635 f():2018/05/31-10:36:32:635 f():2018/05/31-10:36:32:637 g():2018/05/31-10:36:32:637 f():2018/05/31-10:36:32:637 g():2018/05/31-10:36:32:637 f():2018/05/31-10:36:32:637 g():2018/05/31-10:36:32:637 f():2018/05/31-10:36:32:638 g():2018/05/31-10:36:32:638 *///:~
20.3.7 线程本地存储
防止任务在共享资源上产生冲突的第二种方式是根除对变量的共享。线程本地存储是一种自动化机制,可以为使用相同变量的每个不同的线程都创建不同的存储。
//: concurrency/ThreadLocalVariableHolder.java // Automatically giving each thread its own storage. import java.util.concurrent.*; import java.util.*; class Accessor implements Runnable { private final int id; public Accessor(int idn) { id = idn; } public void run() { while(!Thread.currentThread().isInterrupted()) { ThreadLocalVariableHolder.increment(); System.out.println(this); Thread.yield(); } } public String toString() { return "#" + id + ": " + ThreadLocalVariableHolder.get(); } } public class ThreadLocalVariableHolder { private static ThreadLocal<Integer> value = new ThreadLocal<Integer>() { private Random rand = new Random(47); protected synchronized Integer initialValue() { return rand.nextInt(10000); } }; public static void increment() { value.set(value.get() + 1); } public static int get() { return value.get(); } public static void main(String[] args) throws Exception { ExecutorService exec = Executors.newCachedThreadPool(); for(int i = 0; i < 5; i++) exec.execute(new Accessor(i)); TimeUnit.SECONDS.sleep(3); // Run for a while exec.shutdownNow(); // All Accessors will quit } } /* Output: (Sample) #0: 9259 #1: 556 #2: 6694 #3: 1862 #4: 962 #0: 9260 #1: 557 #2: 6695 #3: 1863 #4: 963 ... *///:~
ThreadLoca;对象通常当作静态域存储。
每个单独的线程都被分配了自己的存储,因为它们每个都需要跟踪自己的计数值。
20.4 终结任务
下面演示一个终止问题,而且还是一个资源共享的示例
20.4.1 装饰性花园
获取每天进入公园的总人数。在公园的任何一个门口都有计数器可以递增。
//: concurrency/OrnamentalGarden.java import java.util.concurrent.*; import java.util.*; import static net.mindview.util.Print.*; class Count { private int count = 0; private Random rand = new Random(47); // Remove the synchronized keyword to see counting fail: public synchronized int increment() { int temp = count; if(rand.nextBoolean()) // Yield half the time Thread.yield(); return (count = ++temp); } public synchronized int value() { return count; } } class Entrance implements Runnable { private static Count count = new Count(); private static List<Entrance> entrances = new ArrayList<Entrance>(); private int number = 0; // Doesn't need synchronization to read: private final int id; private static volatile boolean canceled = false; // Atomic operation on a volatile field: public static void cancel() { canceled = true; } public Entrance(int id) { this.id = id; // Keep this task in a list. Also prevents // garbage collection of dead tasks: entrances.add(this); } public void run() { while(!canceled) { synchronized(this) { ++number; } print(this + " Total: " + count.increment()); try { TimeUnit.MILLISECONDS.sleep(100); } catch(InterruptedException e) { print("sleep interrupted"); } } print("Stopping " + this); } public synchronized int getValue() { return number; } public String toString() { return "Entrance " + id + ": " + getValue(); } public static int getTotalCount() { return count.value(); } public static int sumEntrances() { int sum = 0; for(Entrance entrance : entrances) sum += entrance.getValue(); return sum; } } public class OrnamentalGarden { public static void main(String[] args) throws Exception { ExecutorService exec = Executors.newCachedThreadPool(); for(int i = 0; i < 5; i++) exec.execute(new Entrance(i)); // Run for a while, then stop and collect the data: TimeUnit.SECONDS.sleep(3); Entrance.cancel(); exec.shutdown(); if(!exec.awaitTermination(250, TimeUnit.MILLISECONDS)) print("Some tasks were not terminated!"); print("Total: " + Entrance.getTotalCount()); print("Sum of Entrances: " + Entrance.sumEntrances()); } } /* Output: (Sample) Entrance 0: 1 Total: 1 Entrance 2: 1 Total: 3 Entrance 1: 1 Total: 2 Entrance 4: 1 Total: 5 Entrance 3: 1 Total: 4 Entrance 2: 2 Total: 6 Entrance 4: 2 Total: 7 Entrance 0: 2 Total: 8 ... Entrance 3: 29 Total: 143 Entrance 0: 29 Total: 144 Entrance 4: 29 Total: 145 Entrance 2: 30 Total: 147 Entrance 1: 30 Total: 146 Entrance 0: 30 Total: 149 Entrance 3: 30 Total: 148 Entrance 4: 30 Total: 150 Stopping Entrance 2: 30 Stopping Entrance 1: 30 Stopping Entrance 0: 30 Stopping Entrance 3: 30 Stopping Entrance 4: 30 Total: 150 Sum of Entrances: 150 *///:~
20.4.2 在阻塞时终结
sleep()的一种情况,它使任务从执行状态变为被阻塞状态,而有时你必须终止被阻塞的任务。
线程状态
一个线程可以处于以下四种状态之一:
- 新建:当线程被创建时,它只会短暂的处于这种状态。此时它已经分配了必须的系统资源,并执行了初始化。
- 就绪:在这种状态下,只要调度器把时间片分配给线程,线程就可以运行。
- 阻塞:线程能够运行,但有某些条件阻止它的运行。当线程处于阻塞状态时,调度器讲忽略线程,不会分配给线程任何CPU时间。
- 死亡:处于死亡或终止状态的线程将不再是可调度的。任务死亡的通常方式是从run()方法返回,但是任务的线程还可以被中断。
进入阻塞状态
一个任务进入阻塞状态,可能有如下原因:
- 通过调用sleep(milliseconds)使任务进入休眠状态,在这种情况下,任务在指定的时间内不会运行。
- 你通过调用wait()使线程挂起。直到线程得到了notity()或notityAll()消息,线程才会进入就绪状态。
- 任务在等待某个输入/输出完成
- 任务试图在某个对象上调用其同步控制方法,但是对象锁不可用,因为另一个任务已经获取了这个锁。
查看的问题:希望能够终止处于阻塞状态的任务。
20.4.3 中断
在任务的run()方法中间打断,更像是抛出的异常,因此在Java线程种的这种类型的异常中断种用到了异常。
Thread类包含interrupt()方法,因此你可以终止被阻塞的任务,这个方法将设置线程的中断状态。如果一个线程已经被阻塞,或者试图执行一个阻塞操作,那么设置这个线程的中断状态将抛出InterruptedException。
在Executor上调用shutdownNow(),那么它将发送一个interrupt()调用给它启动的所有线程。
使用Executor,那么通过调用submin()而不是executor()来启动任务,就可以持有改任务的上下文。submit()将返回一个泛型Future<?>,其中有一个未修饰的参数,因为你永远不会调用上面的get()——持有这种Future的关键在于你可以在其上调用cancel(),并因此可以使用它来中断某个特定的任务。
Executor展示了基本的interrupt()用法:
//: concurrency/Interrupting.java // Interrupting a blocked thread. import java.util.concurrent.*; import java.io.*; import static net.mindview.util.Print.*; class SleepBlocked implements Runnable { public void run() { try { TimeUnit.SECONDS.sleep(10000); } catch (InterruptedException e) {//被中断时抛出异常 print("InterruptedException"); } print("Exiting SleepBlocked.run()"); } } class IOBlocked implements Runnable { private InputStream in; public IOBlocked(InputStream is) { in = is; } public void run() { try { print("Waiting for read():"); in.read();//不可被中断 } catch (IOException e) { if (Thread.currentThread().isInterrupted()) { print("Interrupted from blocked I/O"); } else { throw new RuntimeException(e); } } print("Exiting IOBlocked.run()"); } } class SynchronizedBlocked implements Runnable { public synchronized void f() {//不可被中断 while (true) // Never releases lock Thread.yield(); } public SynchronizedBlocked() { new Thread() { public void run() { f(); // Lock acquired by this thread } }.start(); } public void run() { print("Trying to call f()"); f(); print("Exiting SynchronizedBlocked.run()"); } } public class Interrupting { private static ExecutorService exec = Executors.newCachedThreadPool(); static void test(Runnable r) throws InterruptedException { Future<?> f = exec.submit(r); TimeUnit.MILLISECONDS.sleep(100); print("Interrupting " + r.getClass().getName()); f.cancel(true); // Interrupts if running print("Interrupt sent to " + r.getClass().getName()); } public static void main(String[] args) throws Exception { //test(new SleepBlocked()); //test(new IOBlocked(System.in)); test(new SynchronizedBlocked()); TimeUnit.SECONDS.sleep(3); print("Aborting with System.exit(0)"); System.exit(0); // ... since last 2 interrupts failed } } /* Output: (95% match) Interrupting SleepBlocked InterruptedException Exiting SleepBlocked.run() Interrupt sent to SleepBlocked Waiting for read(): Interrupting IOBlocked Interrupt sent to IOBlocked Trying to call f() Interrupting SynchronizedBlocked Interrupt sent to SynchronizedBlocked Aborting with System.exit(0) *///:~
你能够中断对sleep()的调用,但是不能中断正在试图获取synchronized锁或者试图执行I/O操作的线程。
对于这类问题,有一个笨拙的解决方案,即关闭任务在其上发生阻塞的底层资源:
//: concurrency/CloseResource.java // Interrupting a blocked task by // closing the underlying resource. // {RunByHand} import java.net.*; import java.util.concurrent.*; import java.io.*; import static net.mindview.util.Print.*; public class CloseResource { public static void main(String[] args) throws Exception { ExecutorService exec = Executors.newCachedThreadPool(); ServerSocket server = new ServerSocket(8080); InputStream socketInput = new Socket("localhost", 8080).getInputStream(); exec.execute(new IOBlocked(socketInput));//线程中断在关闭socket的时候 exec.execute(new IOBlocked(System.in));//线程没有中断 TimeUnit.MILLISECONDS.sleep(100); print("Shutting down all threads"); exec.shutdownNow(); TimeUnit.SECONDS.sleep(1); print("Closing " + socketInput.getClass().getName()); socketInput.close(); // Releases blocked thread TimeUnit.SECONDS.sleep(1); print("Closing " + System.in.getClass().getName()); System.in.close(); // Releases blocked thread } } /* Output: (85% match) Waiting for read(): Waiting for read(): Shutting down all threads Closing java.net.SocketInputStream Interrupted from blocked I/O Exiting IOBlocked.run() Closing java.io.BufferedInputStream Exiting IOBlocked.run() *///:~
但是各种nio类提供了更人性化的I/O中断。被阻塞的nio通道回自动的响应中断:
//: concurrency/NIOInterruption.java // Interrupting a blocked NIO channel. import java.net.*; import java.nio.*; import java.nio.channels.*; import java.util.concurrent.*; import java.io.*; import static net.mindview.util.Print.*; class NIOBlocked implements Runnable { private final SocketChannel sc; public NIOBlocked(SocketChannel sc) { this.sc = sc; } public void run() { try { print("Waiting for read() in " + this); sc.read(ByteBuffer.allocate(1)); } catch(ClosedByInterruptException e) { print("ClosedByInterruptException"); } catch(AsynchronousCloseException e) { print("AsynchronousCloseException"); } catch(IOException e) { throw new RuntimeException(e); } print("Exiting NIOBlocked.run() " + this); } } public class NIOInterruption { public static void main(String[] args) throws Exception { ExecutorService exec = Executors.newCachedThreadPool(); ServerSocket server = new ServerSocket(8080); InetSocketAddress isa = new InetSocketAddress("localhost", 8080); SocketChannel sc1 = SocketChannel.open(isa); SocketChannel sc2 = SocketChannel.open(isa); Future<?> f = exec.submit(new NIOBlocked(sc1)); exec.execute(new NIOBlocked(sc2)); exec.shutdown(); TimeUnit.SECONDS.sleep(1); // Produce an interrupt via cancel: f.cancel(true); TimeUnit.SECONDS.sleep(1); // Release the block by closing the channel: sc2.close(); } } /* Output: (Sample) Waiting for read() in NIOBlocked@7a84e4 Waiting for read() in NIOBlocked@15c7850 ClosedByInterruptException Exiting NIOBlocked.run() NIOBlocked@15c7850 AsynchronousCloseException Exiting NIOBlocked.run() NIOBlocked@7a84e4 *///:~
被互斥所阻塞
如果你尝试着在一个对象上调用其synchronized方法,而这个对象的所已经被其他任务获得,那么调用任务将被挂起,直至这个锁可获得。
示例说明了同一个互斥可以如何能被同一个任务多次获得:
import static net.mindview.util.Print.*; public class MultiLock { public synchronized void f1(int count) { if(count-- > 0) { print("f1() calling f2() with count " + count); f2(count); } } public synchronized void f2(int count) { if(count-- > 0) { print("f2() calling f1() with count " + count); f1(count); } } public static void main(String[] args) throws Exception { final MultiLock multiLock = new MultiLock(); new Thread() { public void run() { multiLock.f1(10); } }.start(); } } /* Output: f1() calling f2() with count 9 f2() calling f1() with count 8 f1() calling f2() with count 7 f2() calling f1() with count 6 f1() calling f2() with count 5 f2() calling f1() with count 4 f1() calling f2() with count 3 f2() calling f1() with count 2 f1() calling f2() with count 1 f2() calling f1() with count 0 *///:~
一个任务应该能够调用在同一个对象种的其他synchronized方法,而这个任务已经持有锁。
Java SE5并发类种添加了一个特性,即在ReentrantLock上阻塞的任务具备可以被中断的能力,这与在synchronized方法或临界区上阻塞的任务不同:
//: concurrency/Interrupting2.java // Interrupting a task blocked with a ReentrantLock. import java.util.concurrent.*; import java.util.concurrent.locks.*; import static net.mindview.util.Print.*; class BlockedMutex { private Lock lock = new ReentrantLock(); public BlockedMutex() { // Acquire it right away, to demonstrate interruption // of a task blocked on a ReentrantLock: lock.lock();//获取锁 } public void f() { try { // This will never be available to a second task lock.lockInterruptibly(); // Special call print("lock acquired in f()"); } catch(InterruptedException e) { print("Interrupted from lock acquisition in f()"); } } } class Blocked2 implements Runnable { BlockedMutex blocked = new BlockedMutex(); public void run() { print("Waiting for f() in BlockedMutex"); blocked.f(); print("Broken out of blocked call"); } } public class Interrupting2 { public static void main(String[] args) throws Exception { Thread t = new Thread(new Blocked2()); t.start(); TimeUnit.SECONDS.sleep(1); System.out.println("Issuing t.interrupt()"); t.interrupt(); } } /* Output: Waiting for f() in BlockedMutex Issuing t.interrupt() Interrupted from lock acquisition in f() Broken out of blocked call *///:~