Fork me on GitHub

Java并发编程

名词解释

竞态条件

在并发编程中,由于不恰当的执行时序而出现不正确的结果;

重排序

Java内存模型允许编译器对操作顺序进行重排序,允许CPU对操作顺序进行重排序;

内存屏障

硬件层内存屏障

硬件层的内存屏障分为两种:Load BarrierStore Barrier即读屏障和写屏障。

内存屏障有两个作用:

  1. 阻止屏障两侧的指令重排序;
  2. 强制把写缓冲区/高速缓存中的脏数据等写回主内存,让缓存中相应的数据失效。

对于Load Barrier来说,在指令前插入Load Barrier,可以让高速缓存中的数据失效,强制从新从主内存加载数据;
对于Store Barrier来说,在指令后插入Store Barrier,能让写入缓存中的最新数据更新写入主内存,让其他线程可见。

java内存屏障

java的内存屏障通常所谓的四种即LoadLoad,StoreStore,LoadStore,StoreLoad实际上也是上述两种的组合,完成一系列的屏障和数据同步功能。

  • LoadLoad屏障:对于这样的语句Load1; LoadLoad; Load2,在Load2及后续读取操作要读取的数据被访问前,保证Load1要读取的数据被读取完毕。
  • StoreStore屏障:对于这样的语句Store1; StoreStore; Store2,在Store2及后续写入操作执行前,保证Store1的写入操作对其它处理器可见。
  • LoadStore屏障:对于这样的语句Load1; LoadStore; Store2,在Store2及后续写入操作被刷出前,保证Load1要读取的数据被读取完毕。
  • StoreLoad屏障:对于这样的语句Store1; StoreLoad; Load2,在Load2及后续所有读取操作执行前,保证Store1的写入对所有处理器可见。它的开销是四种屏障中最大的。在大多数处理器的实现中,这个屏障是个万能屏障,兼具其它三种内存屏障的功能

Compare And Swap(CAS)

比较并交换。

CAS 操作包含三个操作数 —— 内存位置(V)、预期原值(A)和新值(B)。如果内存位置的值与预期原值相匹配,那么处理器会自动将该位置值更新为新值。否则,处理器不做任何操作。无论哪种情况,它都会在 CAS 指令之前返回该位置的值。CAS 有效地说明了“我认为位置 V 应该包含值 A;如果包含该值,则将 B 放到这个位置;否则,不要更改该位置,只告诉我这个位置现在的值即可。”

long和double的非原子性协定

64位数据的读写操作划分为2次32位的操作来进行;在实际开发中,目前各种平台下的商业虚拟机几乎都选择把64位数据的读写操作作为原子操作来对待;

线程封闭

某个对象封闭在一个线程中时,这种用法将自动实现线程安全性;Swing、JDBC;

栈封闭

通过局部变量才能访问对象,是线程封闭的一种特例,易于维护,更加健壮

线程

什么是线程

线程,有时被称为轻量级进程,是程序执行流的最小单元。每个线程都是独立的,线程会共享进程范围内的资源。一个程序中的多个线程可以被同时调度到多个CPU上运行。一个程序至少有一个进程,一个进程至少有一个线程。

为什么要使用并发

在早期的计算机中时没有使用并发的,它们从头到尾只执行一个程序,这对于计算机资源来说是一种浪费

  1. 资源利用率:在一些情况下,程序要等待一些外部操作的完成,比如输入、输出、访问数据库等造成的阻塞,在阻塞的时候,程序无法进行任何别的操作,要是在等待的时候可以运行另一个程序,无疑会极大提升程序利用率
  2. 公平性:所有的用户和线程对计算机资源有着相同的使用权,应该通过时间分片,而不是等一个任务执行完了再执行另一个任务
  3. 便利性:不同任务相互协作

线程的优势

  1. 发挥多处理器的能力:由于通过时钟频率来提高性能已经越来越困难,产商开始向多核处理器转变;目前多核处理器比比皆是,16核,32核的都有很多,如果是单线程跑程序就不能发挥多核处理器的优势,造成很大的资源浪费;
  2. 阻塞时:很多时候会产生阻塞比如数据输出,访问数据库,访问接口;要是单线程的话会一直等待,而多线程可以切换成别的线程运行,提高了计算机资源的利用率

线程的风险

  1. 提高了开发人员的技术要求:开发人员必须要了解线程方面的内容
  2. 安全性问题:这是一个很大,很复杂的问题,多线程的操作顺序是不可预测的,不正确的使用可能会导致出现莫名其妙的结果,测试时很难发现问题,并且排查问题的难度很大。

线程调度方式

协同式线程调度

线程的执行时间由线程本身来控制,线程把自己的工作执行完了之后,要主动通知系统切换到另一个线程上;

优点

实现简单

缺点

不稳定,程序可能会阻塞甚至崩溃

抢占式线程调度

线程执行时间由系统分配

优点

线程执行时间可控,不会导致进程阻塞

线程状态

  1. 新建(New):创建后尚未启动的线程处于这种状态
  2. 运行(Runable):Runable 包括了操作系统线程状态中的 Running 和 Ready,处于此状态的线程可能正在执行,也可能正在等待 CPU 为它分配执行时间;
    3.无期限等待(Waiting):处于这种状态的线程不会被分配 CPU 执行时间,它们要等待被其他线程显式地唤醒;以下方法会让线程陷入无期限的等待状态:
    没有设置Timeout参数的Object.wait() 方法
    没有设置Timeout参数的Thread.join()方法
  3. 期限等待(Timed Waiting):处于这种状态的线程不会被分配CPU执行时间,但它不需要被其他线程显式唤醒,在一定时间之后会由系统自动唤醒;以下方法会让线程进入期限等待状态:
    Thread.sleep()方法;
    设置了Timeout参数的Object.wait() 方法
    设置了Timeout参数的Thread.join()方法
  4. 阻塞(Blocked):线程被阻塞;在程序进入同步区域的时候,线程将进入这种状态;
    阻塞与等待区别:”阻塞状态”在等待获取一个排他锁(在另外一个线程放弃这个锁的时候发生),等待状态是在等待一段时间或者唤醒动作的发生;
  5. 结束(Terminated):线程已经执行结束

线程安全

线程安全:多个线程访问一个对象时,调用这个对象的行为都能获得正确的结果

线程安全的实现方法:

  1. 互斥同步(阻塞同步,悲观的并发策略,重量级锁):synchronized关键字;synchronized关键字经过编译之后,会在同步块的前后分别形成monitorenter和monitorexit这两个字节码指令(上锁,解锁,锁计数器+_1),这两个字节码都需要一个reference类型的参数来指明要锁定和解锁的对象。如果Java程序中的synchronized明确指定了对象参数,那就是这个对象的reference;如果没有明确指定,那就根据synchronized修饰的是实例方法还是类方法,去取对应的对象实例或Class对象来作为锁对象;
    synchronized同步块对同一条线程来说是可重入的,不会出现自己把自己锁死的问题;
    Java如果要阻塞或唤醒一个线程都需要从用户态转换到核心态中,很耗时;要避免频繁切入到核心态之中;
    公平锁:多个线程在等待同一个锁时,按照申请锁的时间顺序来依次获得锁;synchronized中的锁是非公平的;

  2. 非阻塞同步(乐观的并发策略)

  3. 无同步方案
    可重入代码:不依赖储存在堆上的数据和公用的系统资源,用到的状态量都由参数传入,不调用非可重入的方法等;
    线程本地储存:把共享数据的可见范围限制在同一个线程之内;一个请求对应一个服务器线程

    竞态条件

    在同一程序中运行多个线程本身不会导致问题,问题在于多个线程访问了相同的资源。并且向这些资源做了写操作时才有可能发生问题,只要资源没有发生变化,多个线程读取相同的资源就是安全的。
    当两个线程竞争同一资源时,如果对资源的访问顺序敏感,就称存在竞态条件。导致竞态条件发生的代码区称作临界区。下例中add()方法就是一个临界区,它会产生竞态条件。在临界区中使用适当的同步就可以避免竞态条件。

    1
    2
    3
    4
    5
    6
    public class Counter {
    protected long count = 0;
    public void add(long value) {
    this.count = this.count + value;
    }
    }

假定线程A和线程B同时执行add方法,两个线程分别加了2和3到count变量上,两个线程执行结束后count变量的值应该等于5。然而由于两个线程是交叉执行的,两个线程从内存中读出的初始值都是0。然后各自加了2和3,并分别写回内存。最终的值并不是期望的5,而是最后写回内存的那个线程的值,上面例子中最后写回内存的是线程A,但实际中也可能是线程B。如果没有采用合适的同步机制,线程间的交叉执行情况就无法预料。
add()方法就是一个临界区,它会产生竞态条件。

延迟初始化中的竞态条件:
线程A和B同时执行getInstance ,可能会导致两次调用getInstance得到不同的结果;

1
2
3
4
5
6
7
8
9
10
11
12
@NotThreadSafe
public class LazyInitRace {
private ExpensiveObject instance = null;
public ExpensiveObject getInstance () {
if (instance == null) {
instance = new ExpensiveObject();
}
return instance;
}
}

线程a和线程b同时执行getInstance(),线程a看到instance为空,创建了一个新的instance对象,此时线程b也需要判断instance是否为空,此时的instance是否为空取决于不可预测的时序:包括线程a创建instance对象需要多长时间以及线程的调度方式,如果b检测时,instance为空,那么b也会创建一个instance对象。

和大多数并发错误一样,竞态条件不总是会产生问题,还需要不恰当的执行时序

同步机制

synchronized

synchronized 使用场景

synchronized包裹代码块
  • synchronized(对象){}
  • synchronized(类名.class){}
  • synchronized(this){}
synchronized修饰方法
  • public synchronized void memberMethod(){};
  • public static synchronized void staticMethod(){};

相关赘述

内置锁

每个java对象都可以用做一个实现同步的锁,这些锁成为内置锁。线程进入同步代码块或方法的时候会自动获得该锁,在退出同步代码块或方法时会释放该锁。获得内置锁的唯一途径就是进入这个锁的保护的同步代码块或方法。

对象锁

假设类对象instance的某段代码块被synchronized(obj){}包裹,线程访问该段代码块时便会拿到obj对象的内置锁。在obj对象的内置锁释放前,其他线程仍然可以访问instance对象非同步的方法和代码块(现象一),
但是:

  1. 不能进入任何也以obj为锁的代码块;(现象二
  2. 当obj与instance是同一个对象时,也不能进入任何instance对象的同步方法(现象三

由此可见,第一个线程拿到obj对象的内置锁其实就相当于给instance这个对象加上了一个用于同步的独占排他锁(可重入),我们称obj对象为对象锁。

特别说明:对于不同的两个对象,只有当请求的对象锁相同时,线程间才会产生竞争!其他情况下, 并不会互相影响。(现象四

类锁

是相对于对象锁而抽象出来的一种独占排他锁。当线程拿到一个类A的类锁时,其他线程无法访问以类A为锁(A.class)的同步代码块,也无法进入类A的所有静态同步方法。

类锁其实并不真实存在。所谓类锁,实际上是限制对象仅为Class类对象的对象锁。因此当我们以类A为类锁锁定类A,以类A对象a为对象锁锁定对象a时,两个线程分别访问类A锁定的代码块、对象a锁定的代码块时,并不会产生竞争。(现象五)

线程进入同步成员方法,需要申请的是该同步成员方法对应对象的内置锁作为对象锁。而线程进入同步静态方法,需要申请的是该类对应的Class类对象的内置锁。这使得synchronized(A.class){}代码块 与 A类的同步静态方法前后被多线程访问时,需要申请同一个锁,于是便产生了竞争。(现象六

实例分析

无论修饰的是代码块还是方法,只要指定的锁相同就一定会引起多线程的竞争和部分线程阻塞!
判断的关键便是从形式多变的synchronized关键词中挖掘出锁。
现在假设有类A,另有Object类对象obj。得下表:

synchronized形式(均写在在类A中) 锁类型
synchronized(obj){} obj 对象锁
synchronized(A.class){} A.class 类锁
synchronized(this){} 当前类对象 对象锁
synchronized void memberMethod(){} 当前类对象 对象锁
static synchronized void staticMethod(){} 当前类 类锁

实例分析:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
public class SynchronizedVerify {
/**-------------------------------------包裹代码块------------------------------**/
Object objA = new Object();
Object objB = new Object();
public void blockNormal(){ track("blockNormal");}
public void blockStyle(){
synchronized (objA) { track("blockStyle"); }
}
/**相同场景下对比**/
public void blockContrast( ){
synchronized (objA) { track("blockContrast");}
}
public void blockDiffObj(){
synchronized (objB) { track("blockDiffObj");}
}
public void blockOneself(){
synchronized (this) { track("blockOneself");}
}
public void blockClass(){
synchronized (SynchronizedVerify.class) { track("blockClass");}
}
/**-------------------------------------修饰方法------------------------------**/
public void methodNormal(){ track("methodNormal");}
public synchronized void methodStyle(){ track("methodStyle");}
public synchronized void methodContrast(){ track("methodContrast");}/**相同场景下对比**/
public static void methodStaticNormal(){ track("methodStaticNormal");}
public synchronized static void methodStatic(){ track("methodStatic");}
public synchronized static void methodStaticContrast(){ track("methodStaticContrast");}
/**循环输出一段内容**/
private static void track(String callerName){
for(int i = 0;i < 3 ;i++){
System.out.println(
callerName+":"+i+"\t|"+
Thread.currentThread().getName());
Thread.yield();
}
}
}

验证现象一
1
2
3
4
5
6
7
8
9
public static void main(String[] args){
final SynchronizedVerify sv = new SynchronizedVerify();
new Thread(){//定为线程一
public void run() { sv.blockStyle(); };
}.start();
new Thread(){//定为线程二
public void run() { sv.blockNormal(); };
}.start();
}

sv.blockStyle() 以私有成员ObjA为对象锁

  • sv.blockNormal() 非同步方法且不含同步代码块
    输出(无竞争):
    blockStyle:0 |Thread-0
    blockNormal:0 |Thread-1
    blockStyle:1 |Thread-0
    blockNormal:1 |Thread-1
    blockNormal:2 |Thread-1
    blockStyle:2 |Thread-0
验证现象二
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
public static void main(String[] args){
final SynchronizedVerify sv = new SynchronizedVerify();
new Thread(){//定为线程一
public void run() { sv.blockStyle(); };
}.start();
new Thread(){//定为线程二
public void run() { sv.blockContrast(); };
}.start();
}
sv.blockStyle() 以私有成员ObjA为对象锁
+ sv.blockContrast() 以私有成员ObjA为对象锁
输出(竞争):
blockStyle:0 |Thread-0
blockStyle:1 |Thread-0
blockStyle:2 |Thread-0
blockContrast:0 |Thread-1
blockContrast:1 |Thread-1
blockContrast:2 |Thread-1

将线程二的调用改为sv.blockDiffObj() 以私有成员ObjB为对象锁
sv.blockStyle() + sv.blockDiffObj()输出(无竞争。多尝试几遍,或者加大输出量):
blockStyle:0 |Thread-0
blockStyle:1 |Thread-0
blockDiffObj:0 |Thread-1
blockStyle:2 |Thread-0
blockDiffObj:1 |Thread-1
blockDiffObj:2 |Thread-1

组合sv.blockStyle() + sv.blockDiffObj()中,两个方法体都用synchronized包裹。但所施加的对象锁一个是objA一个是objB,这是两个对象,所以线程进入两个方法的同步代码块时并不会产生竞争。

验证现象三
1
2
3
4
5
6
7
8
9
public static void main(String[] args){
final SynchronizedVerify sv = new SynchronizedVerify();
new Thread(){//定为线程一
public void run() { sv.blockStyle(); };
}.start();
new Thread(){//定为线程二
public void run() { sv.methodStyle(); };
}.start();
}

sv.blockStyle() 以私有成员ObjA为对象锁

  • sv.methodStyle() 同步成员方法
    输出(无竞争):
    blockStyle:0 |Thread-0
    methodStyle:0 |Thread-1
    blockStyle:1 |Thread-0
    methodStyle:1 |Thread-1
    methodStyle:2 |Thread-1
    blockStyle:2 |Thread-0

线程一调用方法改为sv.blockOneself() 以this为对象锁
sv.blockOneself() + sv.methodStyle()输出(竞争):
blockOneself:0 |Thread-0
blockOneself:1 |Thread-0
blockOneself:2 |Thread-0
methodStyle:0 |Thread-1
methodStyle:1 |Thread-1
methodStyle:2 |Thread-1

当线程进入某个对象的同步方法时,实际上是拿这个对象的内置锁作为对象锁。
blockOneself()以this为锁锁住代码块,等同于拿当前对象的内置锁作为对象锁。所以当其他线程再进入这个对象的blockOneself()时,因为第一个线程已经独占了对象锁,随后的线程只能阻塞。

验证现象四

在这里使用类SameObjLockVerify来验证,类定义如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
public class SameObjLockVerify {
public static Object staticObj = new Object();
public void objLock(Object obj){
synchronized (obj) { track("objLock");}
}
public void staticObjLock(){
synchronized (SameObjLockVerify.staticObj) { track("staticObjLock");}
}
/**循环输出一段内容**/
private static void track(String callerName){
for(int i = 0;i < 3 ;i++)
System.out.println(
callerName+":"+i+"\t|"+
Thread.currentThread().getName());
Thread.yield();
}
}

如果有若干个不同的对象,这些对象中没有一个同步代码块使用与其他对象相同的对象锁。那么多线程访问这些对象,线程间无论如何都不会相互影响。(自行验证)
这里反向模拟现象四,让我们来看看同一个对象obj作对象锁时,不同线程进入不同对象solv1、solv2时有无竞争?

为上述SameObjLockVerify类添加如下main方法:

1
2
3
4
5
6
7
8
9
10
11
12
public static void main(String[] args){
final SameObjLockVerify solv1 = new SameObjLockVerify();
final SameObjLockVerify solv2 = new SameObjLockVerify();
final Object objLock = new Object();
new Thread(){//定为线程一
public void run() { solv1.objLock(objLock); };
}.start();
new Thread(){//定为线程二
public void run() { solv2.objLock(objLock); };
}.start();
}

solv1.objLock(objLock)+solv2.objLock(objLock)输出(竞争):
objLock:0 |Thread-0
objLock:1 |Thread-0
objLock:2 |Thread-0
objLock:0 |Thread-1
objLock:1 |Thread-1
objLock:2 |Thread-1

或者这样:

1
2
3
4
5
6
7
8
9
10
public static void main(String[] args){
final SameObjLockVerify solv1 = new SameObjLockVerify();
final SameObjLockVerify solv2 = new SameObjLockVerify();
new Thread(){//定为线程一
public void run() { solv1.staticObjLock(); };
}.start();
new Thread(){//定为线程二
public void run() { solv2.staticObjLock(); };
}.start();
}

solv1.staticObjLock() + solv2.staticObjLock()输出(竞争):
staticObjLock:0 |Thread-0
staticObjLock:1 |Thread-0
staticObjLock:2 |Thread-0
staticObjLock:0 |Thread-1
staticObjLock:1 |Thread-1
staticObjLock:2 |Thread-1

即使不是同一个类对象,只要synchronized(对象锁){}使用了相同的对象锁,就会造成多线程的资源竞争。

验证现象五
1
2
3
4
5
6
7
8
9
10
public static void main(String[] args){
final SynchronizedVerify sv = new SynchronizedVerify();
new Thread(){//定为线程一
public void run() { sv.blockOneself(); };
}.start();
new Thread(){//定为线程二
public void run() { sv.blockClass(); };
}.start();
}

sv.blockOneself() + sv.blockClass()输出(无竞争):
blockOneself:0 |Thread-0
blockClass:0 |Thread-1
blockClass:1 |Thread-1
blockClass:2 |Thread-1
blockOneself:1 |Thread-0
blockOneself:2 |Thread-0

验证现象六
1
2
3
4
5
6
7
8
9
10
public static void main(String[] args){
final SynchronizedVerify sv = new SynchronizedVerify();
new Thread(){//定为线程一
public void run() { sv.blockClass(); };
}.start();
new Thread(){//定为线程二
public void run() { SynchronizedVerify.methodStatic(); };
}.start();
}

sv.blockClass() + SynchronizedVerify.methodStatic()输出(竞争):
blockClass:0 |Thread-0
blockClass:1 |Thread-0
blockClass:2 |Thread-0
methodStatic:0 |Thread-1
methodStatic:1 |Thread-1
methodStatic:2 |Thread-1

关键词volatile

关键词volatile:Java虚拟机提供的最轻量级的同步机制;不依赖变量原值时可以使用,比如说set()

特性一:保证此变量对所有线程的可见性(一条线程修改了此变量的值,新值对所有线程可以立即得知;普通变量需要先写入主内存,别的线程再从主内存读取该线程才能新值可见);
特性二:volatile变量的运算在并发下一样是不安全的(Java里面的运算并非原子操作);只能保证可见性;当volatile变量被取到栈顶时,其他线程可能已经改变该变量值,导致该变量栈顶值过期;
特性三:volatile变量禁止指令重排序优化(内存屏障);

volatile的使用要求满足下面的两个条件:

  1. 对变量或者引用的写操作不依赖于变量或者引用的当前值(如果只有特定的单个线程修改共享变量,那么修改操作也是可以依赖于当前值);
  2. 该变量或者引用没有包含在其它的不变式条件中;

volatile最常见的错误使用场景是使用volatile来实现并发 i++; 错误的原因是,该操作依赖于 i 变量的当前值,他是在 i 变量的当前值的基础上加一,所以说他依赖于 i 的当前值。多个线程执行 i++; 会丢失更新。比如两个线程同时读到 i 的当前值8,都进行加一,然后写回,最终 i 的结果是 9,而不是我们期待的10,丢失了更新。

Volatile修饰的成员变量在每次被线程访问时,都强迫从共享内存中重读该成员变量的值。而且,当成员变量发生变化时,强迫线程将变化值回写到共享内存。这样在任何时刻,两个不同的线程总是看到某个成员变量的值是相同的,更简单一点理解就是volatile修饰的变量值发生变化时对于另外的线程是可见的。

为什么要使用Volatile(好处)

Volatile变量修饰符如果使用恰当的话,它比synchronized的使用和执行成本会更低,因为它不会引起线程上下文的切换和调度。
有volatile变量修饰的共享变量进行写操作的时候会多第二行汇编代码
0x01a3de24: lock addl $0x0,(%esp);
lock前缀的指令在多核处理器下会引发了两件事情。

  • 将当前处理器缓存行的数据会写回到系统内存。

  • 这个写回内存的操作会引起在其他CPU里缓存了该内存地址的数据无效。

处理器为了提高处理速度,不直接和内存进行通讯,而是先将系统内存的数据读到内部缓存(L1,L2或其他)后再进行操作,但操作完之后不知道何时会写到内存,如果对声明了Volatile变量进行写操作,JVM就会向处理器发送一条Lock前缀的指令,将这个变量所在缓存行的数据写回到系统内存。但是就算写回到内存,如果其他处理器缓存的值还是旧的,再执行计算操作就会有问题,所以在多处理器下,为了保证各个处理器的缓存是一致的,就会实现缓存一致性协议,每个处理器通过嗅探在总线上传播的数据来检查自己缓存的值是不是过期了,当处理器发现自己缓存行对应的内存地址被修改,就会将当前处理器的缓存行设置成无效状态,当处理器要对这个数据进行修改操作的时候,会强制重新从系统内存里把数据读到处理器缓存里。
Lock前缀指令会引起处理器缓存回写到内存。Lock前缀指令导致在执行指令期间,声言处理器的 LOCK# 信号。在多处理器环境中,LOCK# 信号确保在声言该信号期间,处理器可以独占使用任何共享内存。(因为它会锁住总线,导致其他CPU不能访问总线,不能访问总线就意味着不能访问系统内存),但是在最近的处理器里,LOCK#信号一般不锁总线,而是锁缓存,毕竟锁总线开销比较大。在8.1.4章节有详细说明锁定操作对处理器缓存的影响,对于Intel486和Pentium处理器,在锁操作时,总是在总线上声言LOCK#信号。但在P6和最近的处理器中,如果访问的内存区域已经缓存在处理器内部,则不会声言LOCK#信号。相反地,它会锁定这块内存区域的缓存并回写到内存,并使用缓存一致性机制来确保修改的原子性,此操作被称为“缓存锁定”,缓存一致性机制会阻止同时修改被两个以上处理器缓存的内存区域数据
一个处理器的缓存回写到内存会导致其他处理器的缓存无效。IA-32处理器和Intel 64处理器使用MESI(修改,独占,共享,无效)控制协议去维护内部缓存和其他处理器缓存的一致性。在多核处理器系统中进行操作的时候,IA-32 和Intel 64处理器能嗅探其他处理器访问系统内存和它们的内部缓存。它们使用嗅探技术保证它的内部缓存,系统内存和其他处理器的缓存的数据在总线上保持一致。例如在Pentium和P6 family处理器中,如果通过嗅探一个处理器来检测其他处理器打算写内存地址,而这个地址当前处理共享状态,那么正在嗅探的处理器将无效它的缓存行,在下次访问相同内存地址时,强制执行缓存行填充。
对一个volatile变量的读,总是能看到(任意线程)对这个volatile变量最后的写入。
当读一个volatile变量时,JMM会把该线程对应的本地内存置为无效。线程接下来将从主内存中读取共享变量。
什么时候读?就是每次使用这个变量的时候,也就是说,每次用到这个变量的时候都会取到最新值

volatile内存语义的实现

为了实现volatile内存语义,JMM( Java Memory Model(Java内存模型) )会分别限制编译器重排序和处理器重排序

volatile重排序规则表:

  1. 当第一个操作为普通的读或写时,如果第二个操作为volatile写,则编译器不能重排序这两个操作(1,3)
  2. 当第一个操作是volatile读时,不管第二个操作是什么,都不能重排序。这个规则确保volatile读之后的操作不会被编译器重排序到volatile读之前(第二行)
  3. 当第一个操作是volatile写,第二个操作是volatile读时,不能重排序(3,2)
  4. 当第二个操作是volatile写时,不管第一个操作是什么,都不能重排序(第三列)

为了实现volatile的内存语义,编译器在生成字节码时,会在指令序列中插入内存屏障来禁止特定类型的处理器重排序。
对于编译器来说,发现一个最优布置来最小化插入屏障的总数几乎不可能,为此,JMM采取保守策略。下面是基于保守策略的JMM内存屏障插入策略:

  • 在每个volatile写操作的前面插入一个StoreStore屏障。
  • 在每个volatile写操作的后面插入一个StoreLoad屏障。
  • 在每个volatile读操作的后面插入一个LoadLoad屏障。
  • 在每个volatile读操作的后面插入一个LoadStore屏障。

上面的优化是针对任意处理器平台,由于不同的处理器有不同“松紧度”的处理器内存模型,内存屏障的插入还可以根据具体的处理器内存模型继续优化。以x86处理器为例,上图中除最后的StoreLoad屏障外,其它的屏障都会被省略。
x86处理器仅会对写-读操作做重排序。X86不会对读-读,读-写和写-写操作做重排序,因此在x86处理器中会省略掉这三种操作类型对应的内存屏障。


上图的StoreStore屏障可以保证在volatile写之前,其前面的所有普通写操作已经对任意处理器可见了。因为StoreStore屏障将保障上面所有的普通写在volatile写之前刷新到主内存


x86处理器仅仅会对写-读操作做重排序,因此会省略掉读-读、读-写和写-写操作做重排序的内存屏障。在x86中,JMM仅需在volatile后面插入一个StoreLoad屏障即可正确实现volatile写-读的内存语义,这意味着在x86处理器中,volatile写的开销比volatile读的大,因为StoreLoad屏障开销比较大。

原子变量和原子引用

原子变量和引用都是使用compareAndSwap(CAS指令)来实现:依赖当前值的原子修改的
而且他们的实现都是使用volatileUnsafe:volatile保证可见性,而Unsafe保证原子性。

从Java 1.5开始引入了原子变量和原子引用:
java.util.concurrent.atomic.AtomicBoolean
java.util.concurrent.atomic.AtomicInteger
java.util.concurrent.atomic.AtomicLong
java.util.concurrent.atomic.AtomicReference

以及他们对应的数组:
java.util.concurrent.atomic.AtomicIntegerArray
java.util.concurrent.atomic.AtomicLongArray
java.util.concurrent.atomic.AtomicReferenceArray

AtomicReference的源码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
public class AtomicReference<V> implements java.io.Serializable {
private static final long serialVersionUID = -1848883965231344442L;
// 获取Unsafe对象,Unsafe的作用是提供CAS操作
private static final Unsafe unsafe = Unsafe.getUnsafe();
private static final long valueOffset;
static {
try {
//获得AtomicReference<V>实例化对象中的 value 属性的在该对象在堆内存的偏移 valueOffset 位置
valueOffset = unsafe.objectFieldOffset
(AtomicReference.class.getDeclaredField("value"));
} catch (Exception ex) { throw new Error(ex); }
}
//使用了volatile类型来保证 value 的可见性;
private volatile V value;
public AtomicReference(V initialValue) {
value = initialValue;
}
public AtomicReference() {
}
public final V get() {
return value;
}
public final void set(V newValue) {
value = newValue;
}
public final void lazySet(V newValue) {
unsafe.putOrderedObject(this, valueOffset, newValue);
}
//通过比较valueOffset处的内存的值是否为expect,是的话就更新替换成新值update,这个操作是原子性的。
//CAS操作有3个操作数,内存值M,预期值E,新值U,如果M==E,则将内存值修改为B,否则啥都不做。Unsafe的实现主要是通过编译器,利用CPU的一些原子指令来实现的。
public final boolean compareAndSet(V expect, V update) {
return unsafe.compareAndSwapObject(this, valueOffset, expect, update);
}
public final boolean weakCompareAndSet(V expect, V update) {
return unsafe.compareAndSwapObject(this, valueOffset, expect, update);
}
public final V getAndSet(V newValue) {
while (true) {
V x = get();
if (compareAndSet(x, newValue))
return x;
}
}
public String toString() {
return String.valueOf(get());
}
}

示例:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
// AtomicReferenceTest.java的源码
import java.util.concurrent.atomic.AtomicReference;
public class AtomicReferenceTest {
public static void main(String[] args){
// 创建两个Person对象,它们的id分别是101和102。
Person p1 = new Person(101);
Person p2 = new Person(102);
// 新建AtomicReference对象,初始化它的值为p1对象
AtomicReference ar = new AtomicReference(p1);
// 通过CAS设置ar。如果ar的值为p1的话,则将其设置为p2。
ar.compareAndSet(p1, p2);
Person p3 = (Person)ar.get();
System.out.println("p3 is "+p3);
System.out.println("p3.equals(p1)="+p3.equals(p1));
}
}
class Person {
volatile long id;
public Person(long id) {
this.id = id;
}
public String toString() {
return "id:"+id;
}
}

运行结果:

1
2
p3 is id:102
p3.equals(p1)=false

AtomicInteger源码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
/*
* Written by Doug Lea with assistance from members of JCP JSR-166
* Expert Group and released to the public domain, as explained at
* http://creativecommons.org/publicdomain/zero/1.0/
*/
package java.util.concurrent.atomic;
import sun.misc.Unsafe;
/**
* An {@code int} value that may be updated atomically. See the
* {@link java.util.concurrent.atomic} package specification for
* description of the properties of atomic variables. An
* {@code AtomicInteger} is used in applications such as atomically
* incremented counters, and cannot be used as a replacement for an
* {@link java.lang.Integer}. However, this class does extend
* {@code Number} to allow uniform access by tools and utilities that
* deal with numerically-based classes.
*
* @since 1.5
* @author Doug Lea
*/
public class AtomicInteger extends Number implements java.io.Serializable {
private static final long serialVersionUID = 6214790243416807050L;
// setup to use Unsafe.compareAndSwapInt for updates
private static final Unsafe unsafe = Unsafe.getUnsafe();
private static final long valueOffset;//value值的偏移地址
static {
try {
valueOffset = unsafe.objectFieldOffset
(AtomicInteger.class.getDeclaredField("value"));
} catch (Exception ex) { throw new Error(ex); }
}
private volatile int value;//实际的值
/**
* Creates a new AtomicInteger with the given initial value.
*
* @param initialValue the initial value
*/
public AtomicInteger(int initialValue) {
value = initialValue;//初始化
}
/**
* Creates a new AtomicInteger with initial value {@code 0}.
*/
public AtomicInteger() {
}
/**
* Gets the current value.
*
* @return the current value
*/
public final int get() {
return value;//返回value值
}
/**
* Sets to the given value.
*
* @param newValue the new value
*/
public final void set(int newValue) {
value = newValue;//设置新值,因为没有判断oldvalue,所以此操作是非线程安全的
}
/**
* Eventually sets to the given value.
*
* @param newValue the new value
* @since 1.6
*/
public final void lazySet(int newValue) {
unsafe.putOrderedInt(this, valueOffset, newValue);//与set操作效果一样,只是采用的是unsafe对象中通过偏移地址来设置值的方式
}
/**
* Atomically sets to the given value and returns the old value.
*
* @param newValue the new value
* @return the previous value
*/
public final int getAndSet(int newValue) {//原子操作,设置新值,返回老值
for (;;) {
int current = get();
if (compareAndSet(current, newValue))//通过CAS算法,比较current的值和实际值是否一致,如果一致则设置为newValue
return current;
}
}
/**
* Atomically sets the value to the given updated value
* if the current value {@code ==} the expected value.
*
* @param expect the expected value
* @param update the new value
* @return true if successful. False return indicates that
* the actual value was not equal to the expected value.
*/
public final boolean compareAndSet(int expect, int update) {
return unsafe.compareAndSwapInt(this, valueOffset, expect, update);
}
/**
* Atomically sets the value to the given updated value
* if the current value {@code ==} the expected value.
*
* <p>May <a href="package-summary.html#Spurious">fail spuriously</a>
* and does not provide ordering guarantees, so is only rarely an
* appropriate alternative to {@code compareAndSet}.
*
* @param expect the expected value
* @param update the new value
* @return true if successful.
*/
public final boolean weakCompareAndSet(int expect, int update) {
return unsafe.compareAndSwapInt(this, valueOffset, expect, update);
}
/**
* Atomically increments by one the current value.
*
* @return the previous value
*/
public final int getAndIncrement() {//i++操作
for (;;) {
int current = get();//获取当前值
int next = current + 1;//当前值+1
if (compareAndSet(current, next))//比较current值和实际的值是否一致,如不一致,则继续循环
return current;
}
}
/**
* Atomically decrements by one the current value.
*
* @return the previous value
*/
public final int getAndDecrement() {
for (;;) {
int current = get();
int next = current - 1;
if (compareAndSet(current, next))
return current;
}
}
/**
* Atomically adds the given value to the current value.
*
* @param delta the value to add
* @return the previous value
*/
public final int getAndAdd(int delta) {//例如:当我们统计接口的响应时间时,可以利用此方法
for (;;) {
int current = get();
int next = current + delta;
if (compareAndSet(current, next))
return current;
}
}
/**
* Atomically increments by one the current value.
*
* @return the updated value
*/
public final int incrementAndGet() {
for (;;) {
int current = get();
int next = current + 1;
if (compareAndSet(current, next))
return next;
}
}
/**
* Atomically decrements by one the current value.
*
* @return the updated value
*/
public final int decrementAndGet() {
for (;;) {
int current = get();
int next = current - 1;
if (compareAndSet(current, next))
return next;
}
}
/**
* Atomically adds the given value to the current value.
*
* @param delta the value to add
* @return the updated value
*/
public final int addAndGet(int delta) {
for (;;) {
int current = get();
int next = current + delta;
if (compareAndSet(current, next))
return next;
}
}
/**
* Returns the String representation of the current value.
* @return the String representation of the current value.
*/
public String toString() {
return Integer.toString(get());
}
public int intValue() {
return get();
}
public long longValue() {
return (long)get();
}
public float floatValue() {
return (float)get();
}
public double doubleValue() {
return (double)get();
}
}

首先毫无疑问,在没有锁的机制下可能需要借助volatile原语,保证线程间的数据是可见的(共享的)。
这样才获取变量的值的时候才能直接读取。

1
2
3
public final int get() {
return value;//返回value值
}

然后来看看++i是怎么做到的。

1
2
3
4
5
6
7
8
public final int incrementAndGet() {
for (;;) {
int current = get();
int next = current + 1;
if (compareAndSet(current, next))
return next;
}
}

在这里采用了CAS操作,每次从内存中读取数据然后将此数据和+1后的结果进行CAS操作,如果成功就返回结果,否则重试直到成功为止。
而compareAndSet利用JNI来完成CPU指令的操作。

1
2
3
public final boolean compareAndSet(int expect, int update) {
return unsafe.compareAndSwapInt(this, valueOffset, expect, update);
}

整体的过程就是这样子的,利用CPU的CAS指令,同时借助JNI来完成Java的非阻塞算法。其它原子操作都是利用类似的特性完成的。
其中

1
unsafe.compareAndSwapInt(this, valueOffset, expect, update);

类似:

1
2
3
4
5
6
if (this == expect) {
this = update
return true;
} else {
return false;
}

那么问题就来了,成功过程中需要2个步骤:比较this == expect,替换this = updatecompareAndSwapInt如何这两个步骤的原子性呢? 参考CAS的原理。

compareAndSwap(CAS指令)

比较并交换。

CAS 操作包含三个操作数 —— 内存位置(V)、预期原值(A)和新值(B)。如果内存位置的值与预期原值相匹配,那么处理器会自动将该位置值更新为新值。否则,处理器不做任何操作。无论哪种情况,它都会在 CAS 指令之前返回该位置的值。CAS 有效地说明了“我认为位置 V 应该包含值 A;如果包含该值,则将 B 放到这个位置;否则,不要更改该位置,只告诉我这个位置现在的值即可。”

Java并发包(java.util.concurrent)中大量使用了CAS操作,涉及到并发的地方都调用了sun.misc.Unsafe类方法进行CAS操作。java.util.concurrent包完全建立在CAS之上的,没有CAS就不会有此包。java.util.concurrent包中借助CAS实现了区别于synchronouse同步锁的一种乐观锁。

CAS的原理

CAS通过调用JNI的代码实现的。JNI:java Native Interface为JAVA本地调用,允许java调用其他语言。
而compareAndSwapObject就是借助C来调用CPU底层指令实现的。
程序会根据当前处理器的类型来决定是否为cmpxchg指令添加lock前缀。如果程序是在多处理器上运行,就为cmpxchg指令加上lock前缀(lock cmpxchg)。反之,如果程序是在单处理器上运行,就省略lock前缀(单处理器自身会维护单处理器内的顺序一致性,不需要lock前缀提供的内存屏障效果)。

lock前缀

intel的手册对lock前缀的说明如下:

  • 确保对内存的读-改-写操作原子执行。在Pentium及Pentium之前的处理器中,带有lock前缀的指令在执行期间会锁住总线,使得其他处理器暂时无法通过总线访问内存。很显然,这会带来昂贵的开销。从Pentium 4,Intel Xeon及P6处理器开始,intel在原有总线锁的基础上做了一个很有意义的优化:如果要访问的内存区域(area of memory)在lock前缀指令执行期间已经在处理器内部的缓存中被锁定(即包含该内存区域的缓存行当前处于独占或以修改状态),并且该内存区域被完全包含在单个缓存行(cache line)中,那么处理器将直接执行该指令。由于在指令执行期间该缓存行会一直被锁定,其它处理器无法读/写该指令要访问的内存区域,因此能保证指令执行的原子性。这个操作过程叫做缓存锁定(cache locking),缓存锁定将大大降低lock前缀指令的执行开销,但是当多处理器之间的竞争程度很高或者指令访问的内存地址未对齐时,仍然会锁住总线。
  • 禁止该指令与之前和之后的读和写指令重排序。
  • 把写缓冲区中的所有数据刷新到内存中。

备注知识:

关于CPU的锁有如下3种:

  • 处理器自动保证基本内存操作的原子性
      首先处理器会自动保证基本的内存操作的原子性。处理器保证从系统内存当中读取或者写入一个字节是原子的,意思是当一个处理器读取一个字节时,其他处理器不能访问这个字节的内存地址。奔腾6和最新的处理器能自动保证单处理器对同一个缓存行里进行16/32/64位的操作是原子的,但是复杂的内存操作处理器不能自动保证其原子性,比如跨总线宽度,跨多个缓存行,跨页表的访问。但是处理器提供总线锁定和缓存锁定两个机制来保证复杂内存操作的原子性。

  • 使用总线锁保证原子性
      第一个机制是通过总线锁保证原子性。如果多个处理器同时对共享变量进行读改写(i++就是经典的读改写操作)操作,那么共享变量就会被多个处理器同时进行操作,这样读改写操作就不是原子的,操作完之后共享变量的值会和期望的不一致,举个例子:如果i=1,我们进行两次i++操作,我们期望的结果是3,但是有可能结果是2。如下图

    原因是有可能多个处理器同时从各自的缓存中读取变量i,分别进行加一操作,然后分别写入系统内存当中。那么想要保证读改写共享变量的操作是原子的,就必须保证CPU1读改写共享变量的时候,CPU2不能操作缓存了该共享变量内存地址的缓存。
      处理器使用总线锁就是来解决这个问题的。所谓总线锁就是使用处理器提供的一个LOCK#信号,当一个处理器在总线上输出此信号时,其他处理器的请求将被阻塞住,那么该处理器可以独占使用共享内存。   

  • 使用缓存锁保证原子性
      第二个机制是通过缓存锁定保证原子性。在同一时刻我们只需保证对某个内存地址的操作是原子性即可,但总线锁定把CPU和内存之间通信锁住了,这使得锁定期间,其他处理器不能操作其他内存地址的数据,所以总线锁定的开销比较大,最近的处理器在某些场合下使用缓存锁定代替总线锁定来进行优化。
      频繁使用的内存会缓存在处理器的L1,L2和L3高速缓存里,那么原子操作就可以直接在处理器内部缓存中进行,并不需要声明总线锁,在奔腾6和最近的处理器中可以使用“缓存锁定”的方式来实现复杂的原子性。所谓“缓存锁定”就是如果缓存在处理器缓存行中内存区域在LOCK操作期间被锁定,当它执行锁操作回写内存时,处理器不在总线上声言LOCK#信号,而是修改内部的内存地址,并允许它的缓存一致性机制来保证操作的原子性,因为缓存一致性机制会阻止同时修改被两个以上处理器缓存的内存区域数据,当其他处理器回写已被锁定的缓存行的数据时会起缓存行无效,在例1中,当CPU1修改缓存行中的i时使用缓存锁定,那么CPU2就不能同时缓存了i的缓存行。
      但是有两种情况下处理器不会使用缓存锁定。第一种情况是:当操作的数据不能被缓存在处理器内部,或操作的数据跨多个缓存行(cache line),则处理器会调用总线锁定。第二种情况是:有些处理器不支持缓存锁定。对于Inter486和奔腾处理器,就算锁定的内存区域在处理器的缓存行中也会调用总线锁定。

  以上两个机制我们可以通过Inter处理器提供了很多LOCK前缀的指令来实现。比如位测试和修改指令BTS,BTR,BTC,交换指令XADD,CMPXCHG和其他一些操作数和逻辑指令,比如ADD(加),OR(或)等,被这些指令操作的内存区域就会加锁,导致其他处理器不能同时访问它。   

CAS缺点

CAS虽然很高效的解决原子操作,但是CAS仍然存在三大问题:
ABA问题,循环时间长开销大和只能保证一个共享变量的原子操作

  1. ABA问题。因为CAS需要在操作值的时候检查下值有没有发生变化,如果没有发生变化则更新,但是如果一个值原来是A,变成了B,又变成了A,那么使用CAS进行检查时会发现它的值没有发生变化,但是实际上却变化了。ABA问题的解决思路就是使用版本号。在变量前面追加上版本号,每次变量更新的时候把版本号加一,那么A-B-A 就会变成1A-2B-3A。
    从Java1.5开始JDK的atomic包里提供了一个类AtomicStampedReference来解决ABA问题。这个类的compareAndSet方法作用是首先检查当前引用是否等于预期引用,并且当前标志是否等于预期标志,如果全部相等,则以原子方式将该引用和该标志的值设置为给定的更新值。
    关于ABA问题参考文档: http://blog.hesey.Net/2011/09/resolve-aba-by-atomicstampedreference.html

  2. 循环时间长开销大。自旋CAS如果长时间不成功,会给CPU带来非常大的执行开销。如果JVM能支持处理器提供的pause指令那么效率会有一定的提升,pause指令有两个作用,第一它可以延迟流水线执行指令(de-pipeline),使CPU不会消耗过多的执行资源,延迟的时间取决于具体实现的版本,在一些处理器上延迟时间是零。第二它可以避免在退出循环的时候因内存顺序冲突(memory order violation)而引起CPU流水线被清空(CPU pipeline flush),从而提高CPU的执行效率。

  3. 只能保证一个共享变量的原子操作。当对一个共享变量执行操作时,我们可以使用循环CAS的方式来保证原子操作,但是对多个共享变量操作时,循环CAS就无法保证操作的原子性,这个时候就可以用锁,或者有一个取巧的办法,就是把多个共享变量合并成一个共享变量来操作。比如有两个共享变量i=2,j=a,合并一下ij=2a,然后用CAS来操作ij。

Java并发工具包(java.util.concurrent)

Java 5 添加了一个新的包到 Java 平台,java.util.concurrent 包。这个包包含有一系列能够让 Java 的并发编程变得更加简单轻松的类。

阻塞队列 BlockingQueue

BlockingQueue 通常用于一个线程生产对象,而另外一个线程消费这些对象的场景。下图是对这个原理的阐述:

一个线程往里边放,另外一个线程从里边取的一个 BlockingQueue。
一个线程将会持续生产新对象并将其插入到队列之中,直到队列达到它所能容纳的临界点。也就是说,它是有限的。如果该阻塞队列到达了其临界点,负责生产的线程将会在往里边插入新对象时发生阻塞。它会一直处于阻塞之中,直到负责消费的线程从队列中拿走一个对象。
负责消费的线程将会一直从该阻塞队列中拿出对象。如果消费线程尝试去从一个空的队列中提取对象的话,这个消费线程将会处于阻塞之中,直到一个生产线程把一个对象丢进队列。
| || 抛异常 | 特定值 | 阻塞 | 超时 |
| —— | —— | —— | —— | —— |
| 插入 | add(o) | offer(o) | put(o) | offer(o, timeout, timeunit) |
| 移除 | remove(o) | poll() | take(o) | poll(timeout, timeunit) |
| 检查 | element(o) | peek(o) | | 对象锁 |
四组不同的行为方式解释:

  • 抛异常:如果试图的操作无法立即执行,抛一个异常。
  • 特定值:如果试图的操作无法立即执行,返回一个特定的值(常常是 true / false)。
  • 阻塞:如果试图的操作无法立即执行,该方法调用将会发生阻塞,直到能够执行。
  • 超时:如果试图的操作无法立即执行,该方法调用将会发生阻塞,直到能够执行,但等待时间不会超过给定值。返回一个特定值以告知该操作是否成功(典型的是 true / false)。

无法向一个 BlockingQueue 中插入 null。如果你试图插入 null,BlockingQueue 将会抛出一个 NullPointerException。

数组阻塞队列 ArrayBlockingQueue

ArrayBlockingQueue 类实现了 BlockingQueue 接口。
ArrayBlockingQueue 是一个有界的阻塞队列,其内部实现是将对象放到一个数组里。有界也就意味着,它不能够存储无限多数量的元素。它有一个同一时间能够存储元素数量的上限。你可以在对其初始化的时候设定这个上限,但之后就无法对这个上限进行修改了(因为它是基于数组实现的,也就具有数组的特性:一旦初始化,大小就无法修改)。
ArrayBlockingQueue 内部以 FIFO(先进先出)的顺序对元素进行存储。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
package com.yh.applet.thread;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import org.junit.Test;
public class BlockingQueueTest {
@Test
public void getResult() throws InterruptedException {
BlockingQueue<String> bq = new ArrayBlockingQueue<String>(1024);
Thread t1 = new Thread(new Producer(bq));
Thread t2 = new Thread(new Consumer(bq));
t1.start();
t2.start();
t1.join();
t2.join();
System.out.println("Test thread is finished");
}
class Producer implements Runnable {
BlockingQueue<String> bq = null;
public Producer(BlockingQueue<String> bq) {
this.bq = bq;
}
@Override
public void run() {
int count = 0;
while (true) {
try {
bq.put("" + count++ );
System.out.println("Producer: " + count);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
class Consumer implements Runnable {
BlockingQueue<String> bq = null;
public Consumer(BlockingQueue<String> bq) {
this.bq = bq;
}
@Override
public void run() {
try {
while (true) {
System.out.println("Consumer: " + bq.take());
}
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}

延迟队列 DelayQueue

链阻塞队列 LinkedBlockingQueue

LinkedBlockingQueue 类实现了 BlockingQueue 接口。
LinkedBlockingQueue 内部以一个链式结构(链接节点)对其元素进行存储。如果需要的话,这一链式结构可以选择一个上限。如果没有定义上限,将使用 Integer.MAX_VALUE 作为上限。
LinkedBlockingQueue 内部以 FIFO(先进先出)的顺序对元素进行存储。队列中的头元素在所有元素之中是放入时间最久的那个,而尾元素则是最短的那个。

ArrayBlockingQueue和LinkedBlockingQueue的区别:

  1. 队列中锁的实现不同
    ArrayBlockingQueue实现的队列中的锁是没有分离的,即生产和消费用的是同一个锁;
    LinkedBlockingQueue实现的队列中的锁是分离的,即生产用的是putLock,消费是takeLock(能够高效地处理并发数据)
  2. 在生产或消费时操作不同
    ArrayBlockingQueue实现的队列中在生产和消费的时候,是直接将枚举对象插入或移除的;
    LinkedBlockingQueue实现的队列中在生产和消费的时候,需要把枚举对象转换为Node进行插入或移除,会影响性能
  3. 队列大小初始化方式不同
    ArrayBlockingQueue实现的队列中必须指定队列的大小;
    LinkedBlockingQueue实现的队列中可以不指定队列的大小,但是默认是Integer.MAX_VALUE

具有优先级的阻塞队列 PriorityBlockingQueue

同步队列 SynchronousQueue

SynchronousQueue 类实现了 BlockingQueue 接口。
SynchronousQueue 是一个特殊的队列,它的内部同时只能够容纳单个元素。如果该队列已有一元素的话,试图向队列中插入一个新元素的线程将会阻塞,直到另一个线程将该元素从队列中抽走。同样,如果该队列为空,试图向队列中抽取一个元素的线程将会阻塞,直到另一个线程向队列中插入了一条新的元素。
据此,把这个类称作一个队列显然是夸大其词了。它更多像是一个汇合点。

阻塞双端队列 BlockingDeque

线程在双端队列的两端都可以插入和提取元素

并发 Map(映射) ConcurrentMap

ConcurrentHashMap

ConcurrentHashMap 和 java.util.HashTable 类很相似,但 ConcurrentHashMap 能够提供比 HashTable 更好的并发性能。在你从中读取对象的时候 ConcurrentHashMap 并不会把整个 Map 锁住。此外,在你向其中写入对象的时候,ConcurrentHashMap 也不会锁住整个 Map。它的内部只是把 Map 中正在被写入的部分进行锁定。

1
2
3
ConcurrentMap concurrentMap = new ConcurrentHashMap();
concurrentMap.put("key", "value");
Object value = concurrentMap.get("key");

闭锁 CountDownLatch

java.util.concurrent.CountDownLatch 是一个并发构造,它允许一个或多个线程等待一系列指定操作的完成。
CountDownLatch 以一个给定的数量初始化。countDown() 每被调用一次,这一数量就减一。通过调用 await() 方法之一,线程可以阻塞等待这一数量到达零。
以下是一个简单示例。Decrementer 三次调用 countDown() 之后,等待中的 Waiter 才会从 await() 调用中释放出来。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
package com.yh.applet.thread;
import java.util.concurrent.CountDownLatch;
import org.junit.Test;
public class CountDownLatchTest {
@Test
public void getResult () throws InterruptedException {
CountDownLatch latch = new CountDownLatch(3);
Waiter waiter = new Waiter(latch);
Decrementer decrementer = new Decrementer(latch);
Thread t1 = new Thread(waiter);
Thread t2 = new Thread(decrementer);
t1.start();
t2.start();
t1.join();
t2.join();
}
class Waiter implements Runnable{
CountDownLatch latch = null;
public Waiter(CountDownLatch latch) {
this.latch = latch;
}
public void run() {
try {
latch.await();
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("Waiter Released");
}
}
class Decrementer implements Runnable {
CountDownLatch latch = null;
public Decrementer(CountDownLatch latch) {
this.latch = latch;
}
public void run() {
try {
Thread.sleep(1000);
this.latch.countDown();
Thread.sleep(1000);
this.latch.countDown();
Thread.sleep(1000);
this.latch.countDown();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}

栅栏 CyclicBarrier

java.util.concurrent.CyclicBarrier 类是一种同步机制,它能够对处理一些算法的线程实现同步。换句话讲,它就是一个所有线程必须等待的一个栅栏,直到所有线程都到达这里,然后所有线程才可以继续做其他事情。图示如下:

两个线程在栅栏旁等待对方。
通过调用 CyclicBarrier 对象的 await() 方法,两个线程可以实现互相等待。一旦 N 个线程在等待 CyclicBarrier 达成,所有线程将被释放掉去继续运行。

1
2
CyclicBarrier barrier = new CyclicBarrier(2);
barrier.await();

Executor框架

http://www.cnblogs.com/MOBIN/p/5436482.html
Eexecutor作为灵活且强大的异步执行框架,其支持多种不同类型的任务执行策略,提供了一种标准的方法将任务的提交过程和执行过程解耦开发,基于生产者-消费者模式,其提交任务的线程相当于生产者,执行任务的线程相当于消费者,并用Runnable来表示任务,Executor的实现还提供了对生命周期的支持,以及统计信息收集,应用程序管理机制和性能监视等机制。

Exexctor简介

Executor的UML图:(常用的几个接口和子类)

  • Executor:一个接口,其定义了一个接收Runnable对象的方法executor,其方法签名为executor(Runnable command),
  • ExecutorService:是一个比Executor使用更广泛的子类接口,其提供了生命周期管理的方法,以及可跟踪一个或多个异步任务执行状况返回Future的方法
  • AbstractExecutorService:ExecutorService执行方法的默认实现
  • ScheduledExecutorService:一个可定时调度任务的接口
  • ScheduledThreadPoolExecutor:ScheduledExecutorService的实现,一个可定时调度任务的线程池
  • ThreadPoolExecutor:线程池,可以通过调用Executors以下静态工厂方法来创建线程池并返回一个ExecutorService对象:

Java四种线程池的使用(基于ThreadPoolExecutor)

CachedThreadPool

1
2
3
4
5
6
public static ExecutorService newCachedThreadPool() {
return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
60L, TimeUnit.SECONDS,
//使用同步队列,将任务直接提交给线程
new SynchronousQueue<Runnable>());
}

创建一个可缓存线程池,如果线程池长度超过处理需要,可灵活回收空闲线程,若无可回收,则新建线程。示例代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
package test;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
public class ThreadPoolExecutorTest {
public static void main(String[] args) {
ExecutorService cachedThreadPool = Executors.newCachedThreadPool();
for (int i = 0; i < 10; i++) {
final int index = i;
try {
Thread.sleep(index * 1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
cachedThreadPool.execute(new Runnable() {
public void run() {
System.out.println(index);
}
});
}
}
}

线程池为无限大,当执行第二个任务时第一个任务已经完成,会复用执行第一个任务的线程,而不用每次新建线程。

FixedThreadPool

1
2
3
4
5
6
public static ExecutorService newFixedThreadPool(int nThreads) {
return new ThreadPoolExecutor(nThreads, nThreads,
0L, TimeUnit.MILLISECONDS,
//使用一个基于FIFO排序的阻塞队列,在所有corePoolSize线程都忙时新任务将在队列中等待
new LinkedBlockingQueue<Runnable>());
}

创建一个定长线程池,可控制线程最大并发数,超出的线程会在队列中等待。示例代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
package test;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
public class ThreadPoolExecutorTest {
public static void main(String[] args) {
ExecutorService fixedThreadPool = Executors.newFixedThreadPool(3);
for (int i = 0; i < 10; i++) {
final int index = i;
fixedThreadPool.execute(new Runnable() {
public void run() {
try {
System.out.println(index);
Thread.sleep(2000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
});
}
}
}

因为线程池大小为3,每个任务输出index后sleep 2秒,所以每两秒打印3个数字。
定长线程池的大小最好根据系统资源进行设置。如Runtime.getRuntime().availableProcessors()

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadPoolExecutor;
import org.junit.Before;
import org.junit.Test;
public class ThreadPoolTest {
private ThreadPoolExecutor pool = (ThreadPoolExecutor) Executors.newFixedThreadPool(10);
private List<Integer> list = new ArrayList<Integer>();
@Before
public void init() {
for (int i = 0; i < 10000; i++) {
list.add(i);
}
}
@Test
public void threadtest() {
for (final Integer i : list) {
pool.submit(new Runnable() {
@Override
public void run() {
try {
System.out.println(i);
} catch (Exception e) {
e.printStackTrace();
}
}
});
}
}
}

ScheduledThreadPool

1
2
3
4
5
6
7
8
public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize) {
return new ScheduledThreadPoolExecutor(corePoolSize);
}
public ScheduledThreadPoolExecutor(int corePoolSize) {
super(corePoolSize, Integer.MAX_VALUE, 0, TimeUnit.NANOSECONDS,
new DelayedWorkQueue());
}

创建一个定长线程池,支持定时及周期性任务执行。延迟执行示例代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
package test;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
public class ThreadPoolExecutorTest {
public static void main(String[] args) {
ScheduledExecutorService scheduledThreadPool = Executors.newScheduledThreadPool(5);
scheduledThreadPool.schedule(new Runnable() {
public void run() {
System.out.println("delay 3 seconds");
}
}, 3, TimeUnit.SECONDS);
}
}

表示延迟3秒执行。
定期执行示例代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
package test;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
public class ThreadPoolExecutorTest {
public static void main(String[] args) {
ScheduledExecutorService scheduledThreadPool = Executors.newScheduledThreadPool(5);
scheduledThreadPool.scheduleAtFixedRate(new Runnable() {
public void run() {
System.out.println("delay 1 seconds, and excute every 3 seconds");
}
}, 1, 3, TimeUnit.SECONDS);
}
}

SingleThreadExecutor

1
2
3
4
5
6
7
public static ExecutorService newSingleThreadExecutor() {
return new FinalizableDelegatedExecutorService
//corePoolSize和maximumPoolSize都等于,表示固定线程池大小为1
(new ThreadPoolExecutor(1, 1,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>()));
}

创建一个单线程化的线程池,它只会用唯一的工作线程来执行任务,保证所有任务按照指定顺序(FIFO, LIFO, 优先级)执行。示例代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
package test;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
public class ThreadPoolExecutorTest {
public static void main(String[] args) {
ExecutorService singleThreadExecutor = Executors.newSingleThreadExecutor();
for (int i = 0; i < 10; i++) {
final int index = i;
singleThreadExecutor.execute(new Runnable() {
public void run() {
try {
System.out.println(index);
Thread.sleep(2000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
});
}
}
}

结果依次输出,相当于顺序执行各个任务。

Executor的生命周期

ExecutorService提供了管理Eecutor生命周期的方法,ExecutorService的生命周期包括了:运行 关闭和终止三种状态。

ExecutorService在初始化创建时处于运行状态。
shutdown方法等待提交的任务执行完成并不再接受新任务,在完成全部提交的任务后关闭
shutdownNow方法将强制终止所有运行中的任务并不再允许提交新任务

可以将一个Runnable 或 Callable 提交给ExecutorService的submit方法执行,最终返回一个Future用来获得任务的执行结果或取消任务

1
2
3
4
5
6
7
8
9
10
11
public class CallableAndFuture {
public static void main(String[] args) throws ExecutionException, InterruptedException {
ExecutorService executor = Executors.newSingleThreadExecutor();
Future<String> future = executor.submit(new Callable<String>() { //接受一上callable实例
public String call() throws Exception {
return "MOBIN";
}
});
System.out.println("任务的执行结果:"+future.get());
}
}

输出:
任务的执行结果:MOBIN

ExecutorCompletionService

实现了CompletionService,将执行完成的任务放到阻塞队列中,通过take或poll方法来获得执行结果

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
public class CompletionServiceTest {
public static void main(String[] args) throws InterruptedException, ExecutionException {
ExecutorService executor = Executors.newFixedThreadPool(10); //创建含10.条线程的线程池
CompletionService completionService = new ExecutorCompletionService(executor);
for (int i =1; i <=10; i ++) {
final int result = i;
completionService.submit(new Callable() {
public Object call() throws Exception {
Thread.sleep(new Random().nextInt(5000)); //让当前线程随机休眠一段时间
return result;
}
});
}
System.out.println(completionService.take().get()); //获取执行结果
}
}

输出结果可能每次都不同(在1到10之间)
3

通过Executor来设计应用程序可以简化开发过程,提高开发效率,并有助于实现并发,在开发中如果需要创建线程可优先考虑使用Executor

ForkJoinPool

ForkJoinPool 在 Java 7 中被引入。它和 ExecutorService 很相似,除了一点不同。ForkJoinPool 让我们可以很方便地把任务分裂成几个更小的任务,这些分裂出来的任务也将会提交给 ForkJoinPool。任务可以继续分割成更小的子任务,只要它还能分割。可能听起来有些抽象,因此本节中我们将会解释 ForkJoinPool 是如何工作的,还有任务分割是如何进行的。

分叉

一个使用了分叉和合并原理的任务可以将自己分叉(分割)为更小的子任务,这些子任务可以被并发执行。如下图所示:

通过把自己分割成多个子任务,每个子任务可以由不同的 CPU 并行执行,或者被同一个 CPU 上的不同线程执行。
只有当给的任务过大,把它分割成几个子任务才有意义。把任务分割成子任务有一定开销,因此对于小型任务,这个分割的消耗可能比每个子任务并发执行的消耗还要大。
什么时候把一个任务分割成子任务是有意义的,这个界限也称作一个阀值。这要看每个任务对有意义阀值的决定。很大程度上取决于它要做的工作的种类。

合并

当一个任务将自己分割成若干子任务之后,该任务将进入等待所有子任务的结束之中。
一旦子任务执行结束,该任务可以把所有结果合并到同一个结果。图示如下:

当然,并非所有类型的任务都会返回一个结果。如果这个任务并不返回一个结果,它只需等待所有子任务执行完毕。也就不需要结果的合并啦。

RecursiveAction

RecursiveAction 是一种没有任何返回值的任务。它只是做一些工作,比如写数据到磁盘,然后就退出了。

RecursiveTask

RecursiveTask 是一种会返回结果的任务。它可以将自己的工作分割为若干更小任务,并将这些子任务的执行结果合并到一个集体结果。可以有几个水平的分割和合并。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
package com.yh.applet.thread;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.ForkJoinTask;
import java.util.concurrent.RecursiveAction;
import java.util.concurrent.RecursiveTask;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.atomic.AtomicLong;
import org.junit.Test;
public class ForkJoinPoolTest {
@Test
public void getResult() {
//定义你期望的并行级别(这里的4)。并行级别表示你想要传递给 ForkJoinPool 的任务所需的线程或 CPU 数量。
ForkJoinPool forkJoinPool = new ForkJoinPool(4);
MyRecursiveAction myRecursiveAction = new MyRecursiveAction(24);
forkJoinPool.invoke(myRecursiveAction);
}
@Test
public void makeMoneyTest() throws InterruptedException, ExecutionException {
//默认并行级别:运行时计算机的处理器数量
ForkJoinPool pool = new ForkJoinPool();
ForkJoinTask<Integer> task = pool.submit(new MakeMoneyTask(1000000));
// do {
// try {
// TimeUnit.MILLISECONDS.sleep(5);
// }catch (InterruptedException e){
// e.printStackTrace();
// }
// }while (!task.isDone());
task.join();
pool.shutdown();
System.out.println(task.get());
}
//RecursiveAction 是一种没有任何返回值的任务。它只是做一些工作,比如写数据到磁盘,然后就退出了。
class MyRecursiveAction extends RecursiveAction {
private static final long serialVersionUID = 1L;
private long workLoad = 0;
public MyRecursiveAction(long workLoad) {
this.workLoad = workLoad;
}
@Override
protected void compute() {
//if work is above threshold, break tasks up into smaller tasks
if(this.workLoad > 16) {
System.out.println("Splitting workLoad : " + this.workLoad);
List<MyRecursiveAction> subtasks =
new ArrayList<MyRecursiveAction>();
subtasks.addAll(createSubtasks());
for(RecursiveAction subtask : subtasks){
subtask.fork();
}
} else {
System.out.println("Doing workLoad myself: " + this.workLoad);
}
}
private List<MyRecursiveAction> createSubtasks() {
List<MyRecursiveAction> subtasks =
new ArrayList<MyRecursiveAction>();
MyRecursiveAction subtask1 = new MyRecursiveAction(this.workLoad / 2);
MyRecursiveAction subtask2 = new MyRecursiveAction(this.workLoad / 2);
subtasks.add(subtask1);
subtasks.add(subtask2);
return subtasks;
}
}
//RecursiveTask 是一种会返回结果的任务。它可以将自己的工作分割为若干更小任务,并将这些子任务的执行结果合并到一个集体结果。
public static class MakeMoneyTask extends RecursiveTask<Integer>{
private static final long serialVersionUID = 1L;
private static final int MIN_GOAL_MONEY = 100000;
private int goalMoney;
private String name;
private static final AtomicLong employeeNo = new AtomicLong();
public MakeMoneyTask(int goalMoney){
this.goalMoney = goalMoney;
this.name = "员工" + employeeNo.getAndIncrement() + "号";
}
@Override
protected Integer compute() {
if (this.goalMoney < MIN_GOAL_MONEY){
System.out.println(name + ": 老板交代了,要赚 " + goalMoney + " 元,为了买车买房,加油吧....");
return makeMoney();
}else{
int subThreadCount = ThreadLocalRandom.current().nextInt(10) + 2;
System.out.println(name + ": 上级要我赚 " + goalMoney + ", 有点小多,没事让我" + subThreadCount + "个手下去完成吧," +
"每人赚个 " + Math.ceil(goalMoney * 1.0 / subThreadCount) + "元应该没问题...");
List<MakeMoneyTask> tasks = new ArrayList<MakeMoneyTask>();
for (int i = 0; i < subThreadCount; i ++){
tasks.add(new MakeMoneyTask(goalMoney / subThreadCount));
}
Collection<MakeMoneyTask> makeMoneyTasks = invokeAll(tasks);
int sum = 0;
for (MakeMoneyTask moneyTask : makeMoneyTasks){
try {
sum += moneyTask.get();
} catch (Exception e) {
e.printStackTrace();
}
}
System.out.println(name + ": 嗯,不错,效率还可以,终于赚到 " + sum + "元,赶紧邀功去....");
return sum;
}
}
private Integer makeMoney(){
int sum = 0;
int day = 1;
try {
while (true){
Thread.sleep(ThreadLocalRandom.current().nextInt(500));
int money = ThreadLocalRandom.current().nextInt(MIN_GOAL_MONEY / 3);
System.out.println(name + ": 在第 " + (day ++) + " 天赚了" + money);
sum += money;
if (sum >= goalMoney){
System.out.println(name + ": 终于赚到 " + sum + " 元, 可以交差了...");
break;
}
}
} catch (InterruptedException e) {
e.printStackTrace();
}
return sum;
}
}
}

ForkJoinPool有利有弊,使用之前可以了解利弊,比如:《一个 Java 分叉-合并 带来的灾祸

锁 Lock

可重入锁

在学习ReentrantLock之前我们先来了解一下可重入锁。
在JAVA环境下 ReentrantLock 和synchronized 都是 可重入锁。

可重入锁,也叫做递归锁,指的是同一线程 外层函数获得锁之后 ,内层递归函数仍然有获取该锁的代码,但不受影响。简单的说:当一个线程请求得到一个对象锁后再次请求此对象锁,可以再次得到该对象锁,就是说在一个synchronized方法/块或ReentrantLock的内部调用本类的其他synchronized方法/块或ReentrantLock时,是永远可以拿到锁。
java.util.concurrent.locks.Lock 是一个类似于 synchronized 块的线程同步机制。但是 Lock 比 synchronized 块更加灵活、精细。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
package com.yh.applet.thread;
import java.util.ArrayList;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import org.junit.Test;
public class ReentrantLockTest {
Lock lock = new ReentrantLock(); //注意这个地方
private ArrayList<Integer> arrayList = new ArrayList<Integer>();
@Test
public void getResult() {
Thread t0 = new Thread(){
public void run() {
insert(Thread.currentThread());
};
};
Thread t1 = new Thread(){
public void run() {
insert(Thread.currentThread());
};
};
t0.start();
t1.start();
try {
t0.join();
t1.join();
} catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
public void insert(Thread thread) {
lock.lock();
try {
System.out.println(thread.getName()+"得到了锁");
Thread.sleep(1000);
for(int i=0;i<5;i++) {
arrayList.add(i);
}
} catch (Exception e) {
// TODO: handle exception
}finally {
System.out.println(thread.getName()+"释放了锁");
lock.unlock();
}
}
}

一个 Lock 对象和一个 synchronized 代码块之间的主要不同点是:

  • synchronized 代码块不能够保证进入访问等待的线程的先后顺序。
  • 你不能够传递任何参数给一个 synchronized 代码块的入口。因此,对于synchronized 代码块的访问等待设置超时时间是不可能的事情。
  • synchronized 块必须被完整地包含在单个方法里。而一个 Lock 对象可以把它的 lock() 和 unlock() 方法的调用放在不同的方法里。

读写锁 ReadWriteLock

java.util.concurrent.locks.ReadWriteLock 读写锁是一种先进的线程锁机制。它能够允许多个线程在同一时间对某特定资源进行读取,但同一时间内只能有一个线程对其进行写入。
读写锁的理念在于多个线程能够对一个共享资源进行读取,而不会导致并发问题。并发问题的发生场景在于对一个共享资源的读和写操作的同时进行,或者多个写操作并发进行。

ReadWriteLock 锁规则

一个线程在对受保护资源在读或者写之前对 ReadWriteLock 锁定的规则如下:

ReadWriteLock 代码示例

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
package com.yh.applet.thread;
import java.util.Date;
import java.util.Random;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import org.junit.Test;
public class ReadWriteLockTest {
@Test
public void getResult(){
final ReadWriteData a = new ReadWriteData();
Thread t0 = new Thread() {
public void run() {
while (true) {
a.get();
}
}
};
Thread t1 = new Thread() {
public void run() {
while (true) {
a.get();
}
}
};
Thread t2 = new Thread() {
public void run() {
while (true) {
a.get();
}
}
};
Thread t3 = new Thread() {
public void run() {
while (true) {
a.put(new Random().nextInt(10000));
}
}
};
t0.start();
t1.start();
t2.start();
t3.start();
try {
t0.join();
t1.join();
t2.join();
t3.join();
} catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
class ReadWriteData {
private Object data = null;// 共享数据,只能有一个线程能写该数据,但可以有多个线程同时读该数据。
ReadWriteLock rwl = new ReentrantReadWriteLock();
public void get() {
rwl.readLock().lock();
try {
System.out.println(Thread.currentThread().getName()
+ " be ready to read data!");
Thread.sleep(1000);
System.out.println(new Date() + Thread.currentThread().getName()
+ "have read data :" + data);
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
rwl.readLock().unlock();
}
}
public void put(Object data) {
rwl.writeLock().lock();
try {
System.out.println(Thread.currentThread().getName()
+ " be ready to write data!");
Thread.sleep(1000);
this.data = data;
System.out.println(new Date() + Thread.currentThread().getName()
+ " have write data: " + data);
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
rwl.writeLock().unlock();
}
}
}
}

结果:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
Thread-0 be ready to read data!
Thread-2 be ready to read data!
Thread-1 be ready to read data!
Fri Dec 22 11:26:11 CST 2017Thread-0have read data :null
Fri Dec 22 11:26:11 CST 2017Thread-2have read data :null
Fri Dec 22 11:26:11 CST 2017Thread-1have read data :null
Thread-3 be ready to write data!
Fri Dec 22 11:26:12 CST 2017Thread-3 have write data: 2270
Thread-0 be ready to read data!
Thread-2 be ready to read data!
Thread-1 be ready to read data!
Fri Dec 22 11:26:13 CST 2017Thread-2have read data :2270
Fri Dec 22 11:26:13 CST 2017Thread-0have read data :2270
Fri Dec 22 11:26:13 CST 2017Thread-1have read data :2270
Thread-3 be ready to write data!
Fri Dec 22 11:26:14 CST 2017Thread-3 have write data: 5420
Thread-0 be ready to read data!
Thread-2 be ready to read data!
Thread-1 be ready to read data!
Fri Dec 22 11:26:15 CST 2017Thread-2have read data :5420
Fri Dec 22 11:26:15 CST 2017Thread-0have read data :5420
Fri Dec 22 11:26:15 CST 2017Thread-1have read data :5420
Thread-3 be ready to write data!
Fri Dec 22 11:26:16 CST 2017Thread-3 have write data: 5673
Thread-3 be ready to write data!
Fri Dec 22 11:26:17 CST 2017Thread-3 have write data: 759
Thread-3 be ready to write data!
Fri Dec 22 11:26:18 CST 2017Thread-3 have write data: 1592
Thread-3 be ready to write data!
Fri Dec 22 11:26:19 CST 2017Thread-3 have write data: 1582
Thread-3 be ready to write data!

  • 读锁:如果没有任何写操作线程锁定 ReadWriteLock,并且没有任何写操作线程要求一个写锁(但还没有获得该锁)。因此,可以有多个读操作线程对该锁进行锁定。
  • 写锁:如果没有任何读操作或者写操作。因此,在写操作的时候,只能有一个线程对该锁进行锁定。

    原子类型

    上面有过介绍

锁优化

自旋锁

线程执行一个忙循环(自旋);共享数据锁定时间短,为了这段时间去挂起和恢复线程不值得

自适应自旋

自旋时间不固定的自旋锁

锁消除

虚拟机即时编译器在运行时,对一些代码上要求同步,但被检测到不可能存在共享数据竞争的锁进行消除。

锁粗化

一系列的操作都对同一个对象反复加锁和解锁,甚至加锁操作出现在循环体中,性能损耗大;如果虚拟机探测到有这样一串零碎的操作都对同一个对象加锁,会把加锁同步的范围扩展(粗化)到整个操作序列的外部

轻量级锁

JDK1.6之中加入的新型锁机制;在无竞争的情况下使用CAS操作去消除同步使用的互斥量;

偏向锁

JDK1.6之中引入的锁优化;在无竞争的情况下消除同步;

「真诚赞赏,手留余香」