推荐阅读
彻底理解Runnable和Thread的区别_thread runnable-CSDN博客
HashMap为什么线程不安全 - 掘金 (juejin.cn)
协程的概念,为什么要用协程,以及协程的使用_协程的作用,优缺点-CSDN博客
并发编程
并发编程的目的是为了让程序运行得更快,但是,并不是启动更多的线程就能让程序最大限度地并发执行。在进行并发编程时,如果希望通过多线程执行任务让程序运行得更快,会面临非常多的挑战,比如上下文切换的问题、死锁的问题,以及受限于硬件和软件的资源限制问题。
线程间通信
虚假唤醒
this.wait()会释放锁
this.notifyAll()
自旋(while)一下就不会虚假唤醒
Lock lock = new ReentrantLock()
Condition condition = lock.newCondition()
condition.await()
condition.signal()
集合
List
ArrayList
线程不安全
Vector
线程安全
Collections.synchronizedList()
线程安全
CopyOnWriteArrayList()
写时复制技术
Set
HashSet
线程不安全
CopyOnWriteArraySet()
线程安全
Map
HashMap
线程不安全
ConcurrentHashMap
线程安全
并发容器
ConcurrentHashMap
ConcurrentHashMap使用了锁分段技术,就是每一把锁只锁其中一部分数据。
ConcurrentHashMap是由Segment数组结构和HashEntry数组结构组成。
ConcurrentLinkedQueue
CAS实现
线程池
线程池(thread pool) :一种线程使用模式。线程过多会带来调度开销,进而影响缓存局部性和整体性能。而线程池维护着多个线程,等待着监督管理者分配可并发执行的任务。这避免了在处理短时间任务时创建与销毁线程的代价。线程池不仅能够保证内核的充分利用,还能防止过分调度。
特点:
- 降低资源消耗:通过重复利用已创建的线程降低线程创建和销毁造成的销耗。
- 提高响应速度:当任务到达时,任务可以不需要等待线程创建就能立即执行。
- 提高线程的可管理性:线程是稀缺资源,如果无限制的创建,不仅会销耗系统资源,还会降低系统的稳定性,使用线程池可以进行统一的分配,调优和监控。
线程池使用方式
都使用了ThreadPoolExecutor
public class ThreadPoolTest {
public static void main(String[] args) {
//一池五线程
ExecutorService threadPool = Executors.newFixedThreadPool(5);
for (int i = 0; i < 10; i++) {
threadPool.execute(() -> {
System.out.println(Thread.currentThread().getName() + "\t" + "办理业务");
});
}
threadPool.shutdown();
//一池一线程
ExecutorService singleThreadExecutor = Executors.newSingleThreadExecutor();
for (int i = 0; i < 10; i++) {
singleThreadExecutor.execute(() -> {
System.out.println(Thread.currentThread().getName() + "\t" + "办理业务");
});
}
singleThreadExecutor.shutdown();
//一池可扩容
ExecutorService cachedThreadPool = Executors.newCachedThreadPool();
for (int i = 0; i < 10; i++) {
cachedThreadPool.execute(() -> {
System.out.println(Thread.currentThread().getName() + "\t" + "办理业务");
});
}
cachedThreadPool.shutdown();
}
}
ThreadPoolExecutor
七个参数
- int corePoolSize 常驻线程数量
- int maximumPoolSize 最大线程数量
- long keepAliveTime, TimeUnit unit 线程存活时间,后面为单位
- BlockingQueue
workQueue 阻塞对列 - ThreadFactory threadFactory 线程工厂
- RejectedExecutionHandler handler 拒绝策略
- 抛出异常
- 调用者运行
- 抛弃等待最久的任务
- 不做处理
自定义线程池
一般使用自定义线程池
多线程编程
实现线程方式:
继承Thread类
实现Runnable接口
使用了静态代理
//优先使用,java单个继承、多个实现
class Threadtest implements Runnable {
//实现run方法
}
new Thread(new Threadtest()).start();
- 实现Callable接口
可以有返回值,可以抛出异常
class Threadtest implements Callable<Object> {
//实现call方法
}
//创建执行服务:
ExecutorService ser = Executors.newFixedThreadPool(1);
//提交执行
Future<Object> r1 = ser.submit(new Threadtest());
//获取结果
Object rs1 = r1.get();
ser.shutdown();
- 线程池
守护线程
用户线程:是系统的工作线程,会完成该程序需要完成的业务操作
守护线程:是一种特殊的线程,为某个用户线程服务,再后台默默完成系统性的服务
默认都是用户线程,守护线程是为其他线程服务的。
new Thread().setDaemon(true);//设置为守护线程 必须在start之前设置
//GC线程就是一个守护线程
一个主线程 一个守护线程 : 主线程结束 用户线程结束
一个主线程 一个用户线程: 主线程的结束,不会使用户线程结束
Future
Future接口(FutureTask实现类)定义了操作异步任务执行一些方法,如获取异步任务的执行结果、取消任务的执行、判断任务是否被取消、判断任务执行是否完毕等。
FutureTask
集成了:多线程,有返回值,异步,Callable注入,抛出异常,是Future的实现类
public static void main(String[] args) {
FutureTask<String> futureTask = new FutureTask<>(new Callable<String>() {
@Override
public String call() throws Exception {
Thread.sleep(2000);
return "你好";
}
});
new Thread(futureTask).start();
try {
System.out.println("会阻塞吗?");//get方法会阻塞等待结果
System.out.println(futureTask.get());
} catch (InterruptedException e) {
e.printStackTrace();
} catch (ExecutionException e) {
e.printStackTrace();
}
}
try{Timeunit.SECONDs.sleep(t3);} catch (InterruptedException e){e.printstackTrace();}
优点
Future + 线程池异步多线程任务配合,能显著提高程序的执行效率。
缺点
get方法容易导致阻塞,一般建议放在程序最后,当然可以设置超时时间。
轮循容易导致CPU空转
FutureTask<String> futureTask = new FutureTask<>(new Callable<String>() {
@Override
public String call() throws Exception {
Thread.sleep(4000);
return "你好";
}
});
new Thread(futureTask).start();
try {
System.out.println("会阻塞吗?");
while (true) {
if (futureTask.isDone()) {
System.out.println(futureTask.get());
break;
} else {
//暂停防止轮循
Thread.sleep(1000);
System.out.println("正在访问结果...");
}
}
} catch (InterruptedException e) {
e.printStackTrace();
} catch (ExecutionException e) {
e.printStackTrace();
}
Future对于结果的获取不是很友好,只能通过阻塞或轮询的方式得到任务的结果。
CompletableFuture
针对Future缺点,提出了CompletableFuture
对于真正的异步处理我们希望是可以通过传入回调函数,在Future结束时自动调用该回调函数,这样,我们就不用等待结果。
阻塞的方式和异步编程的设计理念相违背,而轮询的方式会耗费无谓的CPU资源。因此,JDK8设计出CompletableFuture。
CompletableFuture提供了一种观察者模式类似的机制,可以让任务执行完成后通知监听的一方。
CompletionStage
- CompletionStage代表异步计算过程中的某一个阶段,一个阶段完成以后可能会触发另外一个阶段
- 一个阶段的计算执行可以是一-个Function, Consumer或者Runnable。 比如: stage.thenApply(x -> square()).thenAccept(X -> System.out.print()).thenRun(O -> System.out.println()
- 一个阶段的执行可能是被单个阶段的完成触发,也可能是由多个阶段一起触发
代表异步计算过程中的某一个阶段,一个阶段完成以后可能会触发另外一个阶段,有些类似Linux系统的管道分隔符传参数。
该接口方法
getNow("xxxx")//未完成,立即返回xxxx
complete("xxxx")//是否打断get/join方法,打断get/join就立即返回xxxx
//thenApply
ExecutorService threadPool = Executors.newFixedThreadPool(3);
CompletableFuture.supplyAsync(() -> {
System.out.println(1);
return 1;
}, threadPool).thenApply(f -> {
System.out.println(2);
return 2 + f;
});
threadPool.shutdown();
//handle 出错了可以往下走
ExecutorService threadPool = Executors.newFixedThreadPool(3);
CompletableFuture.supplyAsync(() -> {
System.out.println(1);
return 1;
}, threadPool).handle((f, e) -> {
System.out.println(1/0);;
System.out.println(2);
return 2 + f;
}).handle((f, e) -> {
System.out.println(3);
return 3 + f;
}).whenComplete((v, e) -> {
if (e == null) {
System.out.println(v);
}
}).exceptionally(e -> {
e.printStackTrace();
System.out.println(e.getMessage());
return null;
});
threadPool.shutdown();
//thenAccept 接收任务处理结果,并消费处理,无返回结果
ExecutorService threadPool = Executors.newFixedThreadPool(3);
CompletableFuture.supplyAsync(() -> {
return 1;
}, threadPool).thenApply((f) -> {
return 2 + f;
}).thenAccept(r -> {
System.out.println(r);
});
threadPool.shutdown();
//thenRun 方法无返回值,其他方法对它无影响
ExecutorService threadPool = Executors.newFixedThreadPool(3);
System.out.println(CompletableFuture.supplyAsync(() -> {return 1;}, threadPool).thenRun(() -> {}).join());;
threadPool.shutdown();
//applyToEither判断谁快
ExecutorService threadPool = Executors.newFixedThreadPool(3);
CompletableFuture<Integer> cf1 = CompletableFuture.supplyAsync(() -> {
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println(1);
return 1;
}, threadPool);
CompletableFuture<Integer> cf2 = CompletableFuture.supplyAsync(() -> {
System.out.println(2);
return 2;
}, threadPool);
CompletableFuture<String> res = cf1.applyToEither(cf2, f -> {
return f + " is winer";
});
System.out.println(res.join());
threadPool.shutdown();
//thenCombine结果合并
CompletableFuture<Integer> res = cf1.thenCombine(cf2, (x, y) -> {
System.out.println("join");
return x + y;
});
System.out.println(res.join());
核心静态方法
推荐用这些方法创建CompletableFuture
runAsync无返回值
public static CompletableFuture<Void> runAsync(Runnable runnable)
public static CompletableFuture<Void> runAsync(Runnable runnable, Executor executo)
ExecutorService threadPool = Executors.newFixedThreadPool(3);
CompletableFuture<Void> completableFuture = CompletableFuture.runAsync(() -> {
System.out.println(Thread.currentThread().getName());
}, threadPool);
System.out.println(completableFuture.get());
threadPool.shutdown();
supplyAsync有返回值
public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier)
public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier, Executor executor)//没有指定线程池,用默认
ExecutorService threadPool = Executors.newFixedThreadPool(3);
CompletableFuture<String> completableFuture = CompletableFuture.supplyAsync(() -> {
System.out.println(Thread.currentThread().getName());
return "hello";
}, threadPool);
System.out.println(completableFuture.get());
threadPool.shutdown();
从Java8开始引入了CompletableFuture,它是Future的功能增强版,减少阻塞和轮询可以传入回调对象,
当异步任务完成或者发生异常时,自动调用回调对象的回调方法。
//不用使用get阻塞接收
ExecutorService threadPool = Executors.newFixedThreadPool(3);
CompletableFuture<String> completableFuture = CompletableFuture.supplyAsync(() -> {
System.out.println(Thread.currentThread().getName() + "come in");
//System.out.println(1/0);
}, threadPool).whenComplete((r, e) -> {
//自动监听任务是否完成任务
if (e == null) {
System.out.println("get result: " + r);
}
}).exceptionally(e -> {
//任务出现异常时执行
e.printStackTrace();
System.out.println("error: " + e.getCause() + "\t" + e.getMessage());
return null;
});
threadPool.shutdown();
主线程结束时,CompletableFuture默认使用的线程池会立刻关闭,建议使用自定义线程池,并手动关闭。
Lambda表达式+Stream流式调用+Chain链式调用+Java8函数式接口
函数式接口名称 | 方法名称 | 参数 | 返回值 |
---|---|---|---|
Runnable | run | 无参数 | 无返回值 |
Function | apply | 1个参数 | 有返回值 |
Consume | accept | 1个参数 | 无返回值 |
Supplier | get | 没有参数 | 有返回值 |
BiConsumer | accept | 2个参数 | 无返回值 |
join()与get()类似,只是前者不会再编译期间检查异常,运行是报错再说
锁
悲观锁
认为自己在使用数据的时候一定有别的线程来修改数据,因此在获取数据的时候会先加锁,确保数据不会被别的线程修改,synchronized关键字和Lock的实现类都是悲观锁,适应写操作频繁的场景。
乐观锁
认为自己在使用数据时不会有别的线程修改数据或资源,所以不会添加锁。
在Java中是通过使用无锁编程来实现,只是在更新数据的时候去判断,之前有没有别的线程更新了这个数据。
如果这个数据没有被更新,当前线程将自己修改的数据成功写入。
如果这个数据已经被其它线程更新,则根据不同的实现执行不同的操作,比如放弃修改、重试抢锁等等
判断规则
- 版本号机制
- CAS算法
适合读频繁的场景
synchronized
synchronized使用的三种方式:
作用于实例方法,当前实例加锁,进入同步代码前要获得当前实例的锁。
调用指令将会检查方法的ACC_SYNCHRONIZED访问标志是否被设置。如果设置了,执行线程会将先持有monitor锁,然后再执行方法,最后在方法完成(无论是正常完成还是非正常完成)时释放monitor
作用于代码块,对括号里配置的对象加锁。
synchronized同步代码块实现使用的是monitorenter和monitorexit指令,来保证持有锁,释放锁。会而外多加一个monitorexit,为了出现异常时也能释放锁。
作用于静态方法,当前类加锁,进去同步代码前要获得当前类对象的锁。
ACC_STATIC,ACC_SYNCHRONIZED访问标志区分该方法是否静态同步方法
为什么任何一个对象都能成为一个锁呢?
objectMonitor.java -> objectMonitor.cpp -> objectMonitor.hpp
因为monitor采用objectMonitor实现的
什么是管程monitor
管程(英语: Monitors, 也称为监视器)是一种程序结构,结构内的多个子程序(对象或模块)形成的多个工作线程互斥访问共享资源。
这些共享资源一般是硬件设备或一群变量。对共享变量能够进行的所有操作集中在一个模块中。(把信号量及其操作原语“封装”在一个对象内部)
管程实现了在一个时间点, 最多只有一个线程在执行管程的某个子程序。管程提供了一种机制,管程可以看做一个软件模块,它是将共享的变量和对于这些共享变量的操作封装起来,形成一个具有一定接口的功能模块,进程可以调用管程来实现进程级别的并发控制。
class Phone {
public synchronized void sendEmail() {
try {
TimeUnit.SECONDS.sleep(2);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("----sendEmail");
}
public synchronized void sendSMS() {
try {
TimeUnit.SECONDS.sleep(1);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("-----sendSMS");
}
public void hello() {
System.out.println("-------he11o");
}
}
public class JucTest {
public static void main(String[] args) {
Phone p1 = new Phone();
Phone p2 = new Phone();
new Thread(() -> {
p1.sendEmail();
}).start();
new Thread(() -> {
p1.sendSMS();
}).start();
}
}
先Email,后SMS
一个对象里面如果有多个synchronized方法,某一个时刻内, 只要一个线程去调用其中的一个synchronized方法了,其它的线程都只能等待,换句话说,某一个时刻内, 只能有唯一的一 个线程去访间这些synchronized方法。
锁的是当前对象this,被锁定后,其它的线程都不能进入到当前对象的其它的synchronized方法。
public class JucTest {
public static void main(String[] args) {
Phone p1 = new Phone();
Phone p2 = new Phone();
new Thread(() -> {
p1.sendEmail();
}).start();
new Thread(() -> {
p1.hello();
}).start();
}
}
先hello,后Email
普通方法与同步锁无关
public class JucTest {
public static void main(String[] args) {
Phone p1 = new Phone();
Phone p2 = new Phone();
new Thread(() -> {
p1.sendEmail();
}).start();
new Thread(() -> {
p2.sendSMS();
}).start();
}
}
先SMS,后Email
不是同一把锁
class Phone {
public static synchronized void sendEmail() {
try {
TimeUnit.SECONDS.sleep(2);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("----sendEmail");
}
public static synchronized void sendSMS() {
try {
TimeUnit.SECONDS.sleep(1);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("-----sendSMS");
}
public void hello() {
System.out.println("-------he11o");
}
}
public class JucTest {
public static void main(String[] args) {
Phone p1 = new Phone();
Phone p2 = new Phone();
new Thread(() -> {
p1.sendEmail();
}).start();
new Thread(() -> {
p1.sendSMS();
}).start();
}
}
先Email,后SMS
public class JucTest {
public static void main(String[] args) {
Phone p1 = new Phone();
Phone p2 = new Phone();
new Thread(() -> {
p1.sendEmail();
}).start();
new Thread(() -> {
p2.sendSMS();
}).start();
}
}
先Email,后SMS
上述两种情况,加的是类锁
对F普通同步方法,锁的是当前实例对象,通常指this,具体的一部部手机,所有的普通同步方法用的都是同把锁->实例对象本身,对于静态同步方法, 锁的是当前类的Class对象, 如Phone.class唯的一 个模板对于同步方法块,锁的是synchronized括号内的对象。
class Phone {
public static synchronized void sendEmail() {
try {
TimeUnit.SECONDS.sleep(2);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("----sendEmail");
}
public synchronized void sendSMS() {
try {
TimeUnit.SECONDS.sleep(1);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("-----sendSMS");
}
public void hello() {
System.out.println("-------he11o");
}
}
public class JucTest {
public static void main(String[] args) {
Phone p1 = new Phone();
Phone p2 = new Phone();
new Thread(() -> {
p1.sendEmail();
}).start();
new Thread(() -> {
p1.sendSMS();
}).start();
}
}
先SMS,后Email
前者为类锁,后者为对象锁
public class JucTest {
public static void main(String[] args) {
Phone p1 = new Phone();
Phone p2 = new Phone();
new Thread(() -> {
p1.sendEmail();
}).start();
new Thread(() -> {
p2.sendSMS();
}).start();
}
}
先SMS,后Email
上述两种情况:当个线程试图访问同步代码时 它直先必须得到锁,正常退出或抛出异常时必须释放锁。
所有的普通同步方法用的都是同一把锁一实例对象本身,就是new出来的具体实例对象本身,本类this也就是说如果一个实例对象的普通同步方法获取锁后,该实例对象的其他普通同步方法必须等待获取锁的方法释放锁后才能获取锁。
所有的静态同步方法用的也是同一把锁一类对象本身,就是我们说过的唯一模板Class
具体实例对象this和唯一模板class, 这两把锁是两个不同的对象,所以静态同步方法与普通同步方法之间是不会有竞态条件的但是一旦一个静态同步方法获取锁后,其他的静态同步方法都必须等待该方法释放锁后才能获取锁。
公平锁
是指多个线程按照申请锁的顺序来获取锁,这里类似排队买票,先来的人先买后来的人在队尾排着,这是公平的。
Lock lock = new ReentrantLock(true)
非公平锁
是指多个线程获取锁的顺序并不是按照申请锁的顺序,有可能后申请的线程比先申请的线程优先获取锁,在高并
发环境下,有可能造成优先级翻转或者饥饿的状态(某个线程一直得不到锁)。
Lock lock = new ReentrantLock(false)
Lock lock = new ReentrantLock()//默认非公平锁
恢复挂起的线程到真正锁的获取还是有时间差的,从开发人员来看这个时间微乎其微,但是从CPU的角度来看,这个时间差存在的还是很明显的。所以非公平锁能更充分的利用CPU的时间片,尽量减少CPU空闲状态时间。
使用多线程很重要的考量点是线程切换的开销,当采用非公平锁时,当1个线程请求锁获取同步状态,然后释放同步状态,所以刚释放锁的线程在此刻再次获取同步状态的概率就变得非常大,所以就减少了线程的开销。
非公平锁线程切换开销大。
可重入锁又名递归锁
ReentrantLock 和 synchronized都是可重入锁
是指在同一个线程在外层方法获取锁的时候,再进入该线程的内层方法会自动获取锁(前提,锁对象得是同一个对 象),不会因为之前已经获取过还没释放而阻塞。
隐式锁(synchronized)
默认就是可重入锁
在一个synchronized修饰的方法或代码块的内部调用本类的其他synchronized修饰的方法或代码块时,是永远可以得到锁的
public class JucTest {
public static void main(String[] args) {
reEntryM1();
}
private static void reEntryM1() {
final Object object = new Object();
new Thread(() -> {
synchronized (object){
System. out . println(Thread.currentThread(). getName()+"\t ---外层调用");
synchronized (object){
System. out . println(Thread. currentThread(). getName()+"\t ----中层调用");
synchronized (object){
System. out . println( Thread.currentThread(). getName()+"\t ----内层调用");
}
}
}
}, "t1"). start();
}
}
public class JucTest {
public static void main(String[] args) {
new Thread(() -> {
new JucTest().m1();
}).start();
}
public synchronized void m1() {
//指的是可重复可递归调用的锁,在外层使用锁之后,在内层仍然可以使用,并且不发生死锁,这样的锁就叫做可重入
System.out.println(Thread.currentThread().getName()+"\t ----come in");
m2();
System.out.println(Thread.currentThread().getName()+"\t ----end m1");
}
public synchronized void m2(){
System.out.println(Thread.currentThread().getName()+"\t ----come in");
m3();
}
public synchronized void m3() {
System.out.println(Thread.currentThread(). getName()+"\t ----come in");
}
}
显示锁(ReentrantLock)
显示指定lock() unlock()
加锁几次就要解锁几次
public class JucTest {
static Lock lock = new ReentrantLock();
public static void main(String[] args) {
new Thread(() -> {
lock.lock();
try {
System.out.println(Thread.currentThread().getName()+"\t ----come in外层调用");
lock.lock();
try {
System.out.println(Thread.currentThread().getName()+"\t ----come in内层调用");
}finally {
lock.unlock();
}
}finally {
lock.unlock();
}
}, "t1").start();
}
}
死锁
死锁是指两个或两个以上的线程在执行过程中,因争夺资源而造成的一种互相等待的现象,若无外力干涉那它们都将无法推进下去,如果系统资源充足,进程的资源请求都能够得到满足,死锁出现的可能性就很低,否则就会因争夺有限的资源而陷入死锁。
//死锁代码
public class JucTest {
public static void main(String[] args) {
final Object objectA = new Object();
final Object objectB = new Object();
new Thread(() -> {
synchronized (objectA) {
System.out.println(Thread.currentThread().getName() + "\t自己持有A锁,希望获得B锁");
try {
TimeUnit.SECONDS.sleep(1);
} catch (InterruptedException e) {
e.printStackTrace();
}
synchronized (objectB) {
System.out.println(Thread.currentThread().getName() + "\t成功获得B锁");
}
}
}, "A").start();
new Thread(() -> {
synchronized (objectB) {
System.out.println(Thread.currentThread().getName() + "\t自己持有B锁,希望获得A锁");
try {
TimeUnit.SECONDS.sleep(1);
} catch (InterruptedException e) {
e.printStackTrace();
}
synchronized (objectA) {
System.out.println(Thread.currentThread().getName() + "\t成功获得A锁");
}
}
}, "B").start();
}
}
排查死锁
1.纯命令
jps -l
jstack 进程编号
2.jconsole图形化界面
中断机制
协商中断
首先一个线程不应该由其他线程来强制中断或停止,而是应该由线程自己自行停止,自己来决定自己的命运。
所以,Thread.stop,Thread.suspend,Thread.resume都已经被废弃了。
其次在Java中没有办法立即停止一条线程,然而停止线程却显得尤为重要,如取消一个耗时操作。
因此,Java提供了一种用于停止线程的协商机制—–中断,也即中断标识协商机制。
中断只是一种协作协商机制,Java没 有给中断增加任何语法,中断的过程完全需要程序员自己实现。
若要中断一个线程,你需要手动调用该线程的interrupt方法,该方法也仅仅是将线程对象的中断标识设成true
接着你需要自己写代码不断地检测当前线程的标识位,如果为true,表示别的线程请求这条线程中断,此时究竟该做什么需要你自己写代码实现。
每个线程对象中都有一个中断标识位,用于表示线程是否被中断;该标识位为true表示中断,为false表示未中断;
通过调用线程对象的interrupt方法将该线程的标识位设为true;可以在别的线程中调用,也可以在自己的线程中调用。
如何实现线程中断停止
//1.使用volatile共享变量
public class JucTest {
static volatile boolean isStop = false;
public static void main(String[] args) {
new Thread(() -> {
while (true) {
if (isStop) {
System.out.println(Thread.currentThread().getName() + "\t" + "thread stop");
break;
}
System.out.println("t1 -------hello volatile");
}
}, "t1").start();
try {
TimeUnit.MICROSECONDS.sleep(20);
} catch (InterruptedException e) {
e.printStackTrace();
}
new Thread(() -> {
isStop = true;
}, "t2").start();
}
}
//使用原子类
public class JucTest {
static AtomicBoolean atomicBoolean = new AtomicBoolean(false);
public static void main(String[] args) {
new Thread(() -> {
while (true) {
if (atomicBoolean.get()) {
System.out.println(Thread.currentThread().getName() + "\t" + "thread stop");
break;
}
System.out.println("t1 -------hello volatile");
}
}, "t1").start();
try {
TimeUnit.MICROSECONDS.sleep(20);
} catch (InterruptedException e) {
e.printStackTrace();
}
new Thread(() -> {
atomicBoolean.set(true);
}, "t2").start();
}
}
通过线程自带的中断API实列方法实现
Thread.interrupted()
返回当前线程的中断状态,测试当前线程是否已被中断
将当前线程的中断状态清零并重新设为false,清除线程的中断状态Thread.interrupt()
没有返回值
仅仅是设置线程的中断状态为true,发起一个协商而不会立刻停止线程
①如果线程处于正常活动状态,那么会将该线程的中断标志设置为true,仅此而已。被设置中断标志的线程将继续正常运行,不受影响。所以,interrupt() 并不能真正的中断线程,需要被调用的线程自己进行配合才行。
②如果线程处于被阻塞状态( 例如处于sleep, wait, join等状态),在别的线程中调用当前线程对象的interrupt方法,那么线程将立即退出被阻塞状态,中断状态将被清除,并抛出一个InterruptedException异常。(可能程序会无限循环,解决方法须在异常catch处再调用interrupt方法)Thread.isInterrupted()
判断当前线程是否被中断(通过检查中断标志位),不清除线程的中断状态。
//通过线程自带的中断API实列方法实现
public class JucTest {
public static void main(String[] args) {
Thread t1 = new Thread(() -> {
while (true) {
if (Thread.currentThread().isInterrupted()) {
System.out.println(Thread.currentThread().getName() + "\t" + "thread stop");
break;
}
System.out.println("t1 -------hello volatile");
}
}, "t1");
t1.start();
try {
TimeUnit.MICROSECONDS.sleep(10);
} catch (InterruptedException e) {
e.printStackTrace();
}
new Thread(() -> {
t1.interrupt();
}, "t2").start();
}
}
LockSupport
线程的等待和唤醒
- 使用Object中的wait()方法让线程等待,使用Object中的notify()方法唤醒线程,使用必须持有锁
public class JucTest {
public static void main(String[] args) {
Object obj = new Object();
new Thread(() -> {
synchronized (obj) {
System.out.println(Thread.currentThread().getName() + "\t-------come in");
try {
obj.wait();//释放了锁???
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println(Thread.currentThread().getName() + "\t-------waked");
}
}, "t1").start();
try {
TimeUnit.SECONDS.sleep(1);
} catch (InterruptedException e) {
e.printStackTrace();
}
new Thread(() -> {
synchronized (obj) {
obj.notify();
System.out.println(Thread.currentThread().getName() + "\t-------notify");
}
}, "t2").start();
}
}
//wait 须在 notify前面
- 使用JUC包中Condition的await()方法让线程等待,使用signal()方法唤醒线程,使用必须持有锁
public class JucTest {
public static void main(String[] args) {
Lock lock = new ReentrantLock();
Condition condition = lock.newCondition();
new Thread(() -> {
lock.lock();
System.out.println(Thread.currentThread().getName() + "\t-------come in");
try {
condition.await();
System.out.println(Thread.currentThread().getName() + "\t-------waked");
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
lock.unlock();
}
}, "t1").start();
try {
TimeUnit.SECONDS.sleep(1);
} catch (InterruptedException e) {
e.printStackTrace();
}
new Thread(() -> {
lock.lock();
condition.signal();
System.out.println(Thread.currentThread().getName() + "\t-------notify");
lock.unlock();
}, "t2").start();
}
}
//先await() 后signal
- LockSupport类可以park阻塞当前线程以及unpark唤醒指定被阻塞的线程,
//park unpark 无顺序要求。因为park发了许可证,线程没使用,会保留,许可证不会积累,最多只有一个
public class JucTest {
public static void main(String[] args) {
Thread t1 = new Thread(() -> {
System.out.println(Thread.currentThread().getName() + "\t-------come in");
LockSupport.park();
System.out.println(Thread.currentThread().getName() + "\t-------waked");
}, "t1");
t1.start();
try {
TimeUnit.SECONDS.sleep(1);
} catch (InterruptedException e) {
e.printStackTrace();
}
new Thread(() -> {
LockSupport.unpark(t1);
System.out.println(Thread.currentThread().getName() + "\t-------notify");
}, "t2").start();
}
}
JAVA内存模型(JMM)
CPU的运行并不是直接操作内存而是先把内存里边的数据读到缓存,而内存的读和写操作的时候就会造成不一致的问题。
JVM规范中试图定义一种Java内存模型(java Memory Model,简称JMM)来屏蔽掉各种硬件和操作系统的内存访问差异,以实现让Java程序在各种平台下都能达到一致的内存访问效果。
JMM(Java内存模型Java Memory Model,简称JMM)本身是一种抽象的概念并不真实存在它仅仅描述的是一组约定或规范,通过这组规范定义了程序中(尤其是多线程)各个变量的读写访问方式并决定一个线程对共享变量的写入何时以及如何变成对另一个线程可见,关键技术点都是围绕多线程的原子性、可见性和有序性展开的。
作用:
通过JMM来实现线程和主内存之间的抽象关系。
屏蔽各个硬件平台和操作系统的内存访问差异以实现让Java程序在各种平台下都能达到一致的内存访问效果。
三大特性
可见性
是指当一个线程修改了某一个共享变量的值,其他线程是否能够立即知道该变更,JMM规定了所有的变量都存储在主内存中。
原子性
指一个操作是不可打断的,即多线程环境下,操作不能被其他线程干扰。
有序性
指令重排
JVM能根据处理器特性(CPU多级缓存系统、多核处理器等)适当的对机器指令进行重排序,使机器指令能更符合CPU的执行特性,最大限度的发挥机器性能。但是,指令重排可以保证串行语义一致,但没有义务保证多线程间的语义也一致(即可能产生”脏读”), 简单说,两行以上不相干的代码在执行的时候有可能先执行的不是第一条, 不见得是从上到下顺序执行,执行顺序会被优化。
多线程对变量的读写过程
(读取赋值等)必须在工作内存中进行,首先要将变量从主内存拷贝到 线程自己的工作内存空间,然后对变量进行操作,操作完成后再将变量写回主内存,不能直接操作主内存中的变量,各个线程中的工作内存中存储着主内存中的变量副本拷贝,因此不同的线程间无法访问对方的工作内存,线程间的通信(传值)必须通过主内存来完成,其简要访问过程如下图:
先行发生原则
包含可见性和有序性的约束
如果Java内存模型中所有的有序性都仅靠volatile和synchronized来完成,那么有很多操作都将会变得非常罗嗦,但是我们在编写Java并发代码的时候并没有察觉到这一点。
我们没有时时、处处、次次,添加volatile和synchronized来完成程序,这是因为Java语言中JMM原则下有一个“先行发生”(Happens-Before)的原则限制和规矩,给你立好了规矩!这个原则非常重要:
它是判断数据是否存在竞争,线程是否安全的非常有用的手段。依赖这个原则,我们可以通过几条简单规则一揽子解决并发环境下两个操作之间是否可能存在冲突的所有问题,而不需要陷入Java内存模型苦涩难懂的底层编译原理之中。
**happens-before **总原则
在JMM中,如果一个操作执行的结果需要对另一个操作可见性或者代码重排序,那么这两个操作之间必须存在happens-before(先行发生)原则。
如果一个操作happens-before另一个操作,那么第一个操作的执行结果将对第二个操作可见,而且第一个操作的执行顺序排在第二个操作之前。
两个操作之间存在happens-before关系,并不意味着一定要按照happens-before原则制定的顺序来执行。如果重排序之后的执行结果与按照happens-before关系来执行的结果一致,那么这种重排序并不非法。
八条
次序规则
一个线程内,按照代码顺序,写在前面的操作先行发生于写在后面的操作;
前一个操作的结果可以被后续的操作获取。讲白点就是前面一个操作把变量X赋值为1,那后面一个操作肯定能知道X已经变成了1。
锁定规则
一个unLock操作先行发生于后面 (这里的“后面”是指时间上的先后) 对同一个锁的Iock操作。
直白来讲 就是同一个锁只能有一个类获取 , A获取锁,B想要获取,只能等待A unLock该锁
可重入锁是同一线程的递归锁 不同线程 只能等待锁释放
变量规则
对一个volatile变量的写操作先行发生于后面对这个变量的读操作,前面的写对后面的读是可见的,这里的”后面”同样是指时间上的先后。
传递规则
如果操作A先行发生于操作B,而操作B又先行发生于操作C,则可以得出操作A先行发生于操作C;
线程启动规则(Thread Start Rule)
Thread对象的start()方法先行发生于此线程的每一个动作
线程中断规则(Thread Interruption Rule)
对线程interrupt()方法的调用先行发生于被中断线程的代码检测到中断事件的发生
可以通过Thread.interrupted()检测到是否发生中断
也就是说你要先调用interrupt()方法设置过中断标志位,我才能检测到中断发送。线程终止规则(Thread Termination Rule)
线程中的所有操作都先行发生于对此线程的终止检测,我们可以通过isAlive()等手段检测线程是否已经终止执行。
对象终结规则(Finalizer Rule)
一个对象的初始化完成(构造函数执行结束)先行发生于它的finalize()方法的开始
对象生成之前,是不能调用finalize()的
JMM(Java Memory Model)和volatile是Java语言中两个不同的概念,但它们之间有一定的关系。
JMM是一种内存模型,用于定义多线程环境下数据读写的规则。它规定了线程之间共享变量的可见性和顺序性,以确保在并发访问时不会出现数据不一致的问题。
Volatile变量和JMM之间的关系在于,volatile变量的可见性保证是基于JMM的。在JMM中,线程之间的共享变量存在于主内存中,每个线程都有一块私有的本地内存。当一个线程修改了共享变量的值后,JMM会要求将该变量立即刷新回主内存中,以便其他线程可以立即看到最新的值。而对于volatile变量,Java语言规定当一个线程修改了它的值后,JMM会立即将该变量的值刷新回主内存中,并且会强制其他线程重新从主内存中读取该变量的值,以保证可见性。
因此,可以说volatile变量的可见性保证是基于JMM的,它们之间的关系在于共同维护多线程环境下的数据一致性和可见性。
volatile(可见和禁重排)
满足:可见性,有序性(禁重排)
可见性:保证不同线程对某个变量完成操作后结果及时可见,即该共享变量一旦改变所有线程立即可见。
当写一个volatile变量时,JMM会把该线程对应的本地内存中的共享变量值立即刷新回主内存中,并及时发出通知,大家可以去主内存拿最新版,前面的修改对后面所有线程可见。
当读一个volatile变量时,JMM会把该线程对应的本地内存设置为无效,重新回到主内存中读取最新共享变量所以volatile的写内存语义是直接刷新到主内存中,读的内存语义是直接从主内存中读取。
怎么保证可见性,有序性:内存屏障
内存屏障
内存屏障(也称内存栅栏,屏障指令等,是一类同步屏障指令,是CPU或编译器在对内存随机访问的操作中的一个同步点,使得此点之前的所有读写操作都执行后才可以开始执行此点之后的操作),避免代码重排序。内存屏障其实就是一种JVM指令,Java内存模型的重排规则会要求Java编译器在生成JVM指令时插入特定的内存屏障指令,通过这些内存屏障指令,volatile实现了Java内存模型中的可见性和有序性(禁重排),但volatile无法保证原子性。
内存屏障之前的所有写操作都要回写到主内存,
内存屏障之后的所有读操作都能获得内存屏障之前的所有写操作的最新结果(实现了可见性)。
写屏障( Store Memory Barrier) :告诉处理器在写屏障之前将所有存储在缓存(store bufferes)中的数据同步到主内存。也就是说当看到Store屏障指令,就必须把该指令之前所有写入指令执行完毕才能继续往下执行。在写指令之后插入写屏障,强制把写缓冲区的数据刷回到主内存中。
读屏障(Load Memory Barrier):处理器在读屏障之后的读操作,都在读屏障之后执行。也就是说在Load屏障指令之后就能够保证后面的读取数据指令一定能够读取到最新的数据。即在读指令之前插入读屏障,让工作内存或CPU高速缓存当中的缓存数据失效,重新回到主内存中获取最新数据。
四大屏障
屏障类型 | 指令示例 | 说明 |
---|---|---|
LoadLoad | Load1; LoadLoad; Load2 | 保证load1的读取操作在load2及后续读取操作之前执行 |
StoreStore | Store1; StoreStore; Store2 | 在store2及其后的写操作执行前, 保证store1的写操作已刷新到主内存 |
LoadStore | Load1; LoadStore; Store2 | 在stroe2及其后的写操作执行前, 保证load1的读操作已读取结束 |
StoreLoad | Store1; StoreLoad; Load2 | 保证store1的写操作已刷新到主内存之 后,load2及 其后的读操作才能执行 |
public class JucTest {
//static boolean flag = true;
static volatile boolean flag = true;
public static void main(String[] args) {
Thread t1 = new Thread(() -> {
System.out.println(Thread.currentThread().getName() + "\t-------come in");
while (flag);
System.out.println(Thread.currentThread().getName() + "\t-------out");
}, "t1");
t1.start();
try {
TimeUnit.SECONDS.sleep(2);
} catch (InterruptedException e) {
e.printStackTrace();
}
flag = false;
System.out.println("修改完成");
}
}
1.主线程修改了flag之后没有将其刷新到主内存,所以t1线程看不到。
2.主线程将flag刷新到了主内存,但是t1一直读取的是自己工作内存中flag的值, 没有去主内存中更新获取flag最新的值。
上述原因需要volatile解决
volatile没有原子性: 可以被多线程打扰
对于volatile变量具备可见性,JVM只是保证从主内存加载到线程工作内存的值是最新的,也仅是数据加载时是最新的。但是多线程环境下,”数据计算”和”数据赋值”操作可能多次出现,若数据在加载之后,若主内存volatile修饰变量发生修改之后,线程工作内存中的操作将会作废去读主内存最新值,操作出现写丢失问题。即各线程私有内存和主内存公共内存中变量不同步,进而导致数据不一致。由此可见volatile解决的是变量读时的可见性问题,但无法保证原子性,对于多线程修改主内存共享变量的场景必须使用加锁同步。
如:i++
volatile可以禁重排
数据依赖性:若两个操作访问同一变量,且这两个操作中有一个为写操作,此时两操作间就存在数据依赖性。
不存在数据依赖关系,可以重排序。
volatile使用场景
单一赋值
布尔状态标志,判断业务是否结束
开销较低的读,写策略
DCL双端锁的发布
//单列模式
class SafeDoubleCheckSingleton {
//private static SafeDoubleCheckSingleton singleton;
private volatile static SafeDoubleCheckSingleton singleton;//解决下面隐患
private SafeDoubleCheckSingleton(){}
//双重锁设计
public static SafeDoubleCheckSingleton getInstance() {
if (singleton == null) {
synchronized (SafeDoubleCheckSingleton.class) {
if (singleton == null) {
//隐患:多线程环境下,由于重排序,该对象可能还未完成初始化就被其他线程读取
//1.分配内存
//2.初始化对象
//3.指向内存
singleton = new SafeDoubleCheckSingleton();
}
}
}
return singleton;
}
}
CAS
在多线程环境下可以,使用原子类保证线程安全,无需加锁保证
compare and swap的缩写,中文翻译成比较并交换,实现并发算法时常用到的一种技术。
它包含三个操作数一一内存位置、预期原值及更新值。
执行CAS操作的时候,将内存位置的值与预期原值比较:(类似乐观锁)
如果相匹配,那么处理器会自动将该位置值更新为新值.
如果不匹配,处理器不做任何操作或者重来当它重来重试的这种行为成为–自旋!!
多个线程同时执行CAS操作只有一个会成功
CAS是JDK提供的非阻塞原子性操作,它通过硬件保证了比较更新的原子性。
它是非阻塞的且自身具有原子性,也就是说这玩意效率更高且通过硬件保证,说明这玩意更可靠。
CAS是一条CPU的原子指令(cmpxchg指令),不会造成所谓的数据不一致问题,Unsafe提供的
CAS方法(如compareAndSwapXXX)底层实现即为CPU指令cmpxchg。
执行cmpxchg指令的时候,会判断当前系统是否为多核系统,如果是就给总线加锁,只有一个线程会对总线加锁成功,加锁成功之后会执行cas操作,也就是说CAS的原子性实际上是CPU实现独占的,比起用synchronized重量级锁,这里的排他时间要短很多,所以在多线程情况下性能会比较好。
1 Unsafe是CAS的核心类,由于Java方法无法直接访问底层系统,需要通过本地(native)方法来访问,Unsafe相当于一个后门,基于该类可以直接操作特定内存的数据。Unsafe类存在于sun.misc包中,其内部方法操作可以像C的指针一样直接操作内存,因为Java中CAS操作的执行依赖于Unsafe类的方法。
注意Jnsafe类中的历所有方法都是native修饰的, 也就是说Jnsafe类中的方法都直接湖用操作系统底层资源热相应任务AtomicInteger类主要利用CAS(compare and swap)+volatile和native方法来保证原子操作,从而避免synchronized的高开销,执行效率大为提升。
原子引用
@AllArgsConstructor
@Data
class User {
String username;
int age;
}
public class JucTest {
public static void main(String[] args) {
AtomicReference<User> atomicReference = new AtomicReference<>();
User tom = new User("tom", 22);
User cat = new User("cat", 22);
atomicReference.set(tom);
System.out.println(atomicReference.compareAndSet(tom, cat) + "\t" + atomicReference.get().toString());
System.out.println(atomicReference.compareAndSet(tom, cat) + "\t" + atomicReference.get().toString());
}
}
自旋锁
CAS是实现自旋锁的基础,CAS 利用CPU指令保证了操作的原子性,以达到锁的效果,至于自旋呢,看字面意思也很明白,自己旋转。是指尝试获取锁的线程不会立即阻寒,而是采用循环的方式去尝试获取锁,当线程发现锁被占用时,会不断循环判断锁的状态,直到获取。这样的好处是减少线程上下文切换的消耗,缺点是循环会消耗CPU。
//实现自旋锁
public class JucTest {
AtomicReference<Thread> atomicReference = new AtomicReference<>();
public void lock() {
Thread thread = Thread.currentThread();
System.out.println(Thread.currentThread().getName() + "\t" + "come in");
while (!atomicReference.compareAndSet(null, thread)) {
}
}
public void unLock() {
Thread thread = Thread.currentThread();
atomicReference.compareAndSet(thread, null);
System.out.println(Thread.currentThread().getName() + "\t" + "task over, unLock");
}
public static void main(String[] args) {
JucTest jucTest = new JucTest();
new Thread(() -> {
jucTest.lock();
try {
TimeUnit.SECONDS.sleep(5);
} catch (InterruptedException e) {
e.printStackTrace();
}
jucTest.unLock();
}, "A").start();
//保证A先于B
try {
TimeUnit.MICROSECONDS.sleep(500);
} catch (InterruptedException e) {
e.printStackTrace();
}
new Thread(() -> {
jucTest.lock();
jucTest.unLock();
}, "B").start();
}
}
缺点
- 空轮询,CPU消耗大 这是自旋自带的缺点
- ABA问题
AtomicStampedReference
//该类可以添加版本号 避免ABA问题
@AllArgsConstructor
@Data
class Book {
private int id;
private String bookName;
}
public class JucTest {
public static void main(String[] args) {
Book javaBook = new Book(1, "javaBook");
AtomicStampedReference<Book> stampedReference = new AtomicStampedReference<>(javaBook, 1);
System.out.println(stampedReference.getReference() + "\t" + stampedReference.getStamp());
Book mysqlBook = new Book(2, "mysqlBook");
boolean b = stampedReference.compareAndSet(
javaBook,
mysqlBook,
stampedReference.getStamp(),
stampedReference.getStamp() + 1);
System.out.println(b + "\t" + stampedReference.getReference() + "\t" + stampedReference.getStamp());
b = stampedReference.compareAndSet(
mysqlBook,
javaBook,
stampedReference.getStamp(),
stampedReference.getStamp() + 1);
System.out.println(b + "\t" + stampedReference.getReference() + "\t" + stampedReference.getStamp());
}
}
原子类
AtomicInteger
CountDownLatch用法
countDown()
await()
AtomicIntegerArray
引用类型原子类
AtomicStampedReference,version号,+1;
AtomicMarkableReference,一次,解决一次性问题(动没动过)
对象的属性修改原子类
使用目的:以一种线程安全的方式操作非线程安全对象内的某些字段
使用要求:更新的对象属性必须使用public volatile修饰符。因为对象的属性修改类型原子类都是抽象类,所以每次使用都必须使用静态方法newUpdater()创建一个更新器,并且需要设置想要更新的类和属性。
AtomicIntegerFieldUpdater
class BankAccount {
String bankName = "CCB";
public volatile int money = 0;
// public void add() {
// money++;
// }
AtomicIntegerFieldUpdater<BankAccount> fieldUpdater =
AtomicIntegerFieldUpdater.newUpdater(BankAccount.class, "money");
public void transMoney() {
fieldUpdater.getAndIncrement(this);
}
}
public class JucTest {
public static void main(String[] args) {
BankAccount bankAccount = new BankAccount();
CountDownLatch countDownLatch = new CountDownLatch(10);
for (int i = 0; i < 10; i++) {
new Thread(() -> {
try {
for (int j = 0; j < 1000; j++) {
bankAccount.transMoney();
}
}finally {
countDownLatch.countDown();
}
}).start();
}
try {
countDownLatch.await();
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println(bankAccount.money);
}
}
AtomicReferenceFieldUpdater
class MyCar {
public volatile Boolean isInit = Boolean.FALSE;
AtomicReferenceFieldUpdater<MyCar, Boolean> referenceFieldUpdater =
AtomicReferenceFieldUpdater.newUpdater(MyCar.class, Boolean.class, "isInit");
public void init() {
if (referenceFieldUpdater.compareAndSet(this, Boolean.FALSE, Boolean.TRUE)) {
System.out.println("success init");
} else {
System.out.println("had been inited by other thread");
}
}
}
public class JucTest {
public static void main(String[] args) {
MyCar myCar = new MyCar();
for (int i = 0; i < 5; i++) {
new Thread(() -> {
myCar.init();
}).start();
}
}
}
原子操作增强类
LongAdder() 只能计算加法,且从0开始,比AtomicLong(底层为CAS,空轮询造成性能降低,但能保证精度)性能好。
LongAccumulator() 功能更强大
LongAdder底层原理:
Stripend64类
transient volatile cell[] cells;
transient volatile long base;
为什么快:
LongAdder的基本思路就是分散热点,将value值分散到一个Cell数组中,不同线程会命中到数组的不同槽中,各个线程只对自己槽中的那个值进行CAS操作,这样热点就被分散了,冲突的概率就小很多。如果要获取真正的long值,只要将各个槽中的变量值累加返回。
sum()会将所有Cell数组中的value和base累加作为返回值,核心的思想就是将之前AtomicLong个value的更新压力分散到多个value中去,从而降级更新热点。(sum不能保证精度,只能保证最终一致性)
LongAdder源码分析:
LongAdder在无竞争的情况,跟AtomicLong一样,对同一个base进行操作,当出现竞争关系时则是采用化整为零、分散热点的做法,用空间换时间用一个数组cells,将一个value拆分进这个数组cells。多个线程需要同时对value进行操作时候,可以对线程id进行hash得到hash值,再根据nash值映射到这个数组cellsi的某个下标,再对该下标所对应的值进行自增操作。当所有线程操作完毕,将数组cells的所有值和base都加起来作为最终结果
public void increment() {
add(1L);
}
/*
as表示cell[]引用
b表示获取的base值
v表示期望值,
m表示cells数组的长度
a表示当前线程命中的cell单元格
*/
/*
1.最初无竞争时只更新base;
2.如果更新base失败后,首次新建一个Cell数组;
3.当多个线程竞争同一个Cell比较激烈时,可能就要扩容cell数组
*/
public void add(long x) {
Cell[] as; long b, v; int m; Cell a;
/*
先判断cell[]是否存在,并发量小的初始值为null,所以前面的为false
之后对long base进行CAS(比较并交换)操作,操作成功直接逃过该方法,并发量过大时,交换失败进入if循环,开始创建cell[]
*/
if ((as = cells) != null || !casBase(b = base, b + x)) {
boolean uncontended = true;
/*
as == null、(m = as.length - 1) < 0 判断cell[]是否存在
as[getProbe() & m] 类似Hash寻找对应槽位
(a = as[getProbe() & m]) == null 寻找cell数组对应下标是否有值 如果没有值,就会进入到if语句 执行longAccumulate(x, null, uncontended);
!(uncontended = a.cas(v = a.value, v + x)) 对cell数组对应数组下标的值进行CAS操作
*/
if (as == null || (m = as.length - 1) < 0 ||
(a = as[getProbe() & m]) == null ||
!(uncontended = a.cas(v = a.value, v + x)))
longAccumulate(x, null, uncontended);
}
}
/*
long x 需要增加的值,一般默认都是 1
LongBinaryOperator fn 默认传递的是null
wasUncontended竞争标识,如果是false则代表有竞争。只有cells初始化之后,并且当前线程CAS竞争修改失败,才会是false
*/
final void longAccumulate(long x, LongBinaryOperator fn,
boolean wasUncontended) {
// 存储线程probe值
int h;
// 如果getProbe()方法返回0,说明随机数未初始化
if ((h = getProbe()) == 0) {
//使用ThreadLocalRandom.为当前线程重新计算一个hash值,强制初始化
ThreadLocalRandom.current(); // 强制进行初始化
// 重新获取probe值,hash值被重置就好比一个全新的线程一样,所以设置了wasUncontended竞争状态true。
h = getProbe();
//重新计算了当前线程hash后认为此次不算是一次竞争,都未初始化,肯定还不存在竞争激wasuncontended竞争状态true
wasUncontended = true;
}
boolean collide = false; // True if last slot nonempty
for (;;) {
Cell[] as; Cell a; int n; long v;
/*
ce11s已经被初始化了
Cel数组不再为空且可能存在Cell数组扩容
*/
/*
(as = cells) != null、(n = as.length) > 0 cell数组存在
*/
if ((as = cells) != null && (n = as.length) > 0) {
// 进行位于运算,如果为空,说明cells数组中存在还未初始化的坑位,开始初始化
if ((a = as[(n - 1) & h]) == null) {
// 两次的cellsBusy == 0进行双端检索,防止多线程下高并发导致数据不一致
if (cellsBusy == 0) { // Try to attach new Cell
Cell r = new Cell(x); // 对cell进行初始化
if (cellsBusy == 0 && casCellsBusy()) { // casCellsBusy() 判断获取锁是否成功
boolean created = false;
try { // Recheck under lock
Cell[] rs; int m, j;
if ((rs = cells) != null && // 数组存在
(m = rs.length) > 0 &&
rs[j = (m - 1) & h] == null) { // rs[j = (m - 1) & h] == null cells数组某个下标为空
// 开始进行初始化
rs[j] = r;
created = true;
}
} finally {
// 解除锁占用
cellsBusy = 0;
}
if (created)
break;
continue; // 重新开始循环
}
}
collide = false;
}
// 进行位于运算,如果为空 cells数组对应的坑位已经实例化
else if (!wasUncontended) // CAS already known to fail
wasUncontended = true; // Continue after rehash
// 运行CAS将新的数值写进去
else if (a.cas(v = a.value, ((fn == null) ? v + x :
fn.applyAsLong(v, x))))
break;
// 如果n大于等于cpu的个数,或者
else if (n >= NCPU || cells != as)
collide = false; // 就停止扩容
//
else if (!collide)
collide = true;
// 开始进行cells数组扩容
else if (cellsBusy == 0 && casCellsBusy()) {
try {
if (cells == as) { // Expand table unless stale
Cell[] rs = new Cell[n << 1]; // 将cell扩大到原先的2倍
for (int i = 0; i < n; ++i) // 将原cells数组中的数据拷贝到当前的数组当中去
rs[i] = as[i];
cells = rs;
}
} finally {
cellsBusy = 0;
}
collide = false;
continue; // Retry with expanded table
}
h = advanceProbe(h);
}
/*
cellsBusy:初始化cells或者扩容cells需要获取锁,0:表示无锁状态 1:表示其他线程已经持有了锁
casCellsBusy():通过CAS操作修改cellsBusy的值,CAS成功代表获取锁,返回true
*/
/*
ce1ls没有加锁且没有初始化,则尝试对它进行加锁,并初始化ce11s数组
cellsBusy == 0 初始化cells需要获取锁,0:表示无锁状态
cells == as
casCellsBusy() 成功获取锁
*/
else if (cellsBusy == 0 && cells == as && casCellsBusy()) {
boolean init = false;
try { // 初始化数组
if (cells == as) {
Cell[] rs = new Cell[2];
rs[h & 1] = new Cell(x); // 为数组对应下标进行赋值
cells = rs;
init = true;
}
} finally {
cellsBusy = 0;
}
if (init)
break;
}
/*
cells正在进行初始化,则尝试直接在基数base上进行累加操作
多个线程尝试CAS修改失败的线程会走到这个分支
*/
else if (casBase(v = base, ((fn == null) ? v + x :
fn.applyAsLong(v, x))))
break; // Fall back on using base
}
}
SUM求和
public long sum() {
Cell[] as = cells; Cell a;
long sum = base;
if (as != null) {
for (int i = 0; i < as.length; ++i) {
if ((a = as[i]) != null)
sum += a.value;
}
}
return sum;
}
ThreadLocal
记录线程独有的数据,不需要写回主线程内存
实现每一个线程都有自己专属的本地变量副本(自己用自己的变量不麻烦别人,不和其他人共享,人人有份,人各一份),主要解决了让每个线程绑定自己的值,通过使用get()和set()方法,获取默认值或将其值更改为当前线程所存的副本的值从而避免了线程安全问题,比如我们之前讲解的8锁案例,资源类是使用同一部手机,多个线程抢夺同一部手机使用,假如人手一份是不是天下太平?
必须回收自定义的ThreadLocal变量,尤其在线程池场景下,线程经常会被复用,如果不清理自定义的ThreadLocal变量,可能会影响后续业务逻辑和造成内存泄露等问题。尽量在代理中使用try-finally 块进行回收。
class House {
int saleCount = 0;
public synchronized void saleHouse() {
++saleCount;
}
// ThreadLocal<Integer> saleVolume = new ThreadLocal<Integer>() {
// @Override
// protected Integer initialValue() {
// return 0;
// }
// };
ThreadLocal<Integer> saleVolume = ThreadLocal.withInitial(() -> 0);
public void saleVolumeByThreadLocal() {
saleVolume.set(1 + saleVolume.get());
}
}
public class JucTest {
public static void main(String[] args) {
House house = new House();
for (int i = 0; i < 5; i++) {
new Thread(() -> {
int size = new Random().nextInt(5) + 1;
try {
for (int j = 1; j<= size; j++) {
house.saleHouse();
house.saleVolumeByThreadLocal();
}
System.out.println(Thread.currentThread().getName() + "\t" + "号销售卖出:" + house.saleVolume.get());
} finally {
//必须回收
house.saleVolume.remove();
}
}, String.valueOf(i)).start();
}
//暂停亳秒
try {
TimeUnit.MILLISECONDS. sleep(300);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println(Thread.currentThread( ).getName() + "\t" + "共计卖出多少套: " + house.saleCount);
}
}
底层原理
Thread调用了ThreadLocal.ThreadLocalMap;ThreadLocal包含了静态内部类ThreadLocalMap
threadLocalMap实际上就是一个以threadLocal实例为key,任意对象为value的Entry对象。
JVM内部维护了一个线程版的Map<ThreadLocal,Value>(通过ThreadLocal对象的set方法,结果把ThreadLocal对象自己当做key,放进了ThreadLoalMap中),每个线程要用到这个T的时候,用当前的线程去Map里面获取,通过这样让每个线程都拥有了自己独立的变量,人手一份,竞争条件被彻底消除,在并发模式下是绝对安全的变量。
强引用
OOM都不会被回收
软引用
当内存不够了,软引用才会被自动回收
class MyObject {
public MyObject(){}
@Override
protected void finalize() throws Throwable {
super.finalize();
System.out.println("-----该对象被gc了");
}
}
public class JucTest {
public static void main(String[] args) {
//当内存不够了,软引用才会被自动回收
//-Xms10m -Xmx10m 最大内存为10m
SoftReference<MyObject> softReference = new SoftReference<>(new MyObject());
System.gc();
try {
TimeUnit.SECONDS.sleep(1);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("内存够用: " + softReference.get());
try {
byte[] bytes = new byte[1024 * 1024 * 20];
} catch (Exception e) {
e.printStackTrace();
} finally {
System.out.println("------gc after内存不够用: " + softReference.get());
}
}
}
弱引用
弱引用会被gc回收,不管内存空间是否充足
class MyObject {
public MyObject(){}
@Override
protected void finalize() throws Throwable {
super.finalize();
System.out.println("-----该对象被gc了");
}
}
public class JucTest {
public static void main(String[] args) {
//弱引用会被回收,不管内存空间是否充足
//-Xms10m -Xmx10m 最大内存为10m
WeakReference<MyObject> weakReference = new WeakReference<>(new MyObject());
System.out.println("内存够用: " + weakReference.get());
System.gc();
try {
TimeUnit.SECONDS.sleep(1);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
虚引用
- 虚引用必须和引用队列(ReferenceQueue)联合使用
虚引用需要java.lang.ref.PhantomReference类来实现,顾名思义,就是形同虚设,与其他几种引用都不同,虚引用并不会决定对象的生命周期。如果一个对象仅持有虚引用,那么它就和没有任何引用一样,在任何时候都可能被垃圾回收器回收,它不能单独使用也不能通过它访问对象,虚引用必须和引用队(ReferenceQueue)联合使用。 - PhantomReference的get方法总是返回null
虚引用的主要作用是跟踪对象被垃圾回收的状态。仅仅是提供了一种确保对象被finalize以后,做某些事情的通知机制。PhantomReference的get方法总是返回null,因此无法访问对应的引用对象。 - 处理监控通知使用
换句话说,设置虚引用关联对象的唯一目的,就是在这个对象被收集器回收的时候收到一个系统通知或者后续添加进一步的处理,用来实现比finalize机制更灵活的回收操作
为什么ThreadLocal.ThreadMap.Entry使用弱引用
强引用(new的),弱引用(ThreadLocalMap key指向)都指向ThreadLocal对象,强引用销毁了,弱引用就会被gc回收。
当function01方法执行完毕后,栈帧销毁强引用tl也就没有了。但此时线程的ThreadLocalMap里某个entry的key引用还指向这个对象,若这个key引用是强引用,就会导致key指向的ThreadLocal对象及v指向的对象不能被gc回收,造成内存泄漏:若这个key引用是弱引用就大概率会减少内存泄漏的问题(还有一个key为null的雷,第2个坑后面讲)。使用弱引用,就可以使ThreadLocal对象在方法执行完毕后顺利被回收且Entry的key用指向为null。(解决了ThreadLocal对象的回收问题,但是没有解决Map中的value的回收问题)
当我们为threadLocal变量赋值,实际上就是当前的Entry(threadLocal实例为key,值为value)往这个threadLocalMap中存放。Entry中的key是弱引用,当threadLocal外部强引用被置为null(tl=null),那么系统GC的时候,根据可达性分析,这个threadLocal实例就没有任何一条链路能够引用至它,这个ThreadLoca势必会被回收。这样一来,ThreadLocalMap中就会出现key为null的Entry,就没有办法访问这些key为null的Entry的value。如果当前线程再迟迟不结束的话,这key为null的Entry的value就会一直存在一条强引用链:Thread Ref->Thread->ThreaLocalMap->Entry
此后我们调用get,set或remove方法时,就会尝试删除key为null的entry,可以释放value对象所占用的内存。
虽然弱引用,保证了key指向的ThreadLocal对象能被及时回收,但是v指向的value对象是需要ThreadLocalMap调用get、set时发现key为null时才会去回收整个entry、value,因此弱引用不能100%保证内存不泄露。我们要在不使用某个ThreadLocal对象后,手动调用remoev方法来删除它,尤其是在线程池中,不仅仅是内存泄露的问题,因为线程池中的线程是重复使用的,意味着这个线程的ThreadLocalMap对象也是重复使用的,如果我们不手动调用remove方法,那么后面的线程就有可能获取倒上个线程遗留下来的value值,造成bug。
清除脏的Entry
<null, value>
set(),get(),remove()方法被调用时,key为null的会被清除。
对象内存布局
在HotSpot虚拟机里,对象在堆内存中的存储布局可以划分为三个部分:对象头(Header) 、实例数据(Instance Data)和对齐填充(Padding)。
对其填充:保证java字节是8的倍数
对象头
组成:
对象标记Mark Word
默认存储对象的HashCode、 分代年龄和锁标志位等信息。
这些信息都是与对象自身定义无关的数据,所以MarkWord被设计成一个非固定的数据结构以便在极小的空间内存存储尽量多的数据。它会根据对象的状态复用自己的存储空间,也就是说在运行期间MarkWord里存储的数据会随着锁标志位的变化时变化。类元信息(类型指针)
对象指向它的类元数据的指针(方法区),虚拟机通过这个指针来确定这个对象是哪个类的实例。
默认启用压缩指针。
对象中的实际数据
存放类的属性(Field)数据信息,包括父类的属性信息
对齐填充
虚拟机要求对象起始地址必须是8字节的整数倍。填充数据不是必须存在的,仅仅是为了字节对齐这部分内存按8字节补充对齐。
64位处理器每次处理8字节的数据,并且只能按照一种特殊的方式进行访问,要么是0-7,要么是8-F,这个是硬件造成的,没有办法,也就是按照固定的索引从0开始,每次访问8个偏移量。那么如果没有对齐填充就可能会存在数据跨内存地址区域存储的情况。32位也是一样的,只不过是每次读取4字节的数据。
对齐填充存在的意义就是为了提高CPU访问数据的效率,这是一种以空间换时间的做法;虽然访问效率提高了(减少了内存访问次数),但是在0x07处产生了1bit的空间浪费。
JOL工具
分析对象在JVM虚拟机的大小和布局
<!-- JOL工具 -->
<dependency>
<groupId>org.openjdk.jol</groupId>
<artifactId>jol-core</artifactId>
<version>0.9</version>
</dependency>
class Customer {
int id;
boolean flag = false;
}
public class JucTest {
public static void main(String[] args) {
Object o = new Object();//16 bytes
//System.out.println(ClassLayout.parseInstance(o).toPrintable());
//1. Customer 只有对象头没有任何Field 16 bytes
//2. int+boolean 默认满足对齐填充24
Customer c1 = new Customer();
System.out.println(ClassLayout.parseInstance(c1).toPrintable());
}
}
Synchronized与锁升级
锁的升级过程
无锁 -> 偏向锁 -> 轻量级锁 -> 重量级锁
重量级锁,假如锁的竞争比较激烈的话,性能减低
java的线程是映射到操作系统原生线程之上的,如果要阻塞或唤醒一个线程就需要操作系统介入,需要在用户态与核心态之间切换,这种切换会消耗大量的系统资源,因为用户态与内核态都有各自专用的内存空间,专用的寄存器等,用户态切换至内核态需要传递给许多变量、参数给内核,内核也需要保护好用户态在切换时的一些寄存器值、变量等,以便内核态调用结束后切换回用户态继续工作。
在Java早期版本中,synchronized属于重量级锁,效率低下,因为监视器锁(monitor)是依赖于底层的操作系统的Mutex Lock(系统互斥量)来实现的,挂起线程和恢复线程都需要转入内核态去完成,阻塞或唤醒一个Java线程需要操作系统切换CPU状态来完成,这种状态切换需要耗费处理器时间,如果同步代码块中内容过于简单,这种切换的时间可能比用户代码执行的时间还长”,时间成本相对较高,这也是为什么早期的synchronized效率低的原因Java6之后,为了减少获得锁和释放锁所带来的性能消耗,引入了轻量级锁和偏向锁
锁指向
偏向锁:MarkWord存储的是偏向的线程ID。
轻量锁:MarkWord存储的是指向线程栈中LockRecord的指针。
重量锁:MarkWord存储的是指向堆中的monitor对象的指针。
Monitor
Monitor可以理解为一种同步工具,也可理解为一种同步机制, 常常被描述为一个Java对 象。Java对象是天生的Monitor,每一个Java对象都有成为Monitor的潜质,因为在Java的设计中,每一个Java对象自打娘胎里出来就带了一把看不见的锁,它叫做内部锁或者Monitor锁。
Mutex Lock
Monitor是在jvm底层实现的,底层代码是c++。本质是依赖于底层操作系统的Mutex Lock实现,操作系统实现线程之间的切换需要从用户态到内核态的转换,状态转换需要耗费很多的处理器时间成本非常高。所synchronized是Java语言中的一个重量级操作。
Monitor与java对象以及线程是如何关联?
1.如果一个java对象被某个线程锁住,则该java对象的Mark Word字段中LockWord指向monitor的起始地址
2.Monitor的Owner字段会存放拥有相关联对象锁的线程id
synchronized用的锁是存在Java对象头里的Mark Word中锁升级功能主要依赖MarkWord中锁标志位和释放偏向锁标志位
偏向锁
单线程竞争
当线程A第一次竞争到锁时,通过操作修改Mark Word中的偏向线程ID、偏向模式。如果不存在其他线程竞争,那么持有偏向锁的线程将永远不需要进行同步
Hotspot的作者经过研究发现,大多数情况下:
多线程的情况下,锁不仅不存在多线程竞争,还存在锁由同一个线程多次获得的情况,偏向锁就是在这种情况下出现的,它的出现是为了解决只有在一个线程执行同步时提高性能。
偏向锁会偏向于第一个访问锁的线程,如果在接下来的运行过程中,该锁没有被其他的线程访问,则持有偏向锁的线程将永远不需要触发同步。也即偏向锁在资源没有竞争情况下消除了同步语句,懒的连CAS操作都不做了,直接提高程序性能。
在实际应用运行过程中发现,“锁总是同一个线程持有,很少发生竞争”,也就是说锁总是被第一个占用他的线程拥有,这个线程就是锁的偏向线程。
那么只需要在锁第一次被拥有的时候,记录下偏向线程ID。这样偏向线程就一直持有着锁(后续这个线程进入和退出这段加了同出锁的代码块时,不需要再次加锁和释放锁。而是直接会去检查锁的MarkWord.里面是不是放的自己的线程ID)。
如果相等,表示偏向锁是偏向于当前线程的,就不需要再尝试获得锁了,直到竞争发生才释放锁。以后每次同步,检查锁的偏向线程ID与当前线程ID是否一致,如果一致直接进入同步。无需每次加锁解锁都去CAS更新对象头。如果自始至终使用锁的线程只有一个,很显偏向锁几乎没有额外开销,性能极高。
如果不等,表示发生了竞争,锁已经不是总是偏向于同一个线程了,这个时候会尝试使用CAS来替换MarkWord.里面的线程ID为新线程的ID
竞争成功,表示之前的线程不存在了,MarkWord.里面的线程ID为新线程的ID,锁不会升级,仍然为偏向锁;
竞争失败,这时候可能需要升级变为轻量级锁,才能保证线程间公平竞争锁。
- 竞争失败表示,当前线程在操作某一个同步代码块时,上一个线程也在使用,这时候需要进行自旋访问,根据锁升级原理,最好使用轻量级锁
注意,偏向锁只有遇到其他线程尝试竞争偏向锁时,持有偏向锁的线程才会释放锁,线程是不会主动释放偏向锁的。
技术实现:
一个synchronized方法被一个线程抢到了锁时,那这个方法所在的对象就会在其所在的Mark Word中将偏向锁修改状态位,同时还会有占用前54位来存储线程指针作为标识。若该线程再次访问同一个synchronized方法时,该线程只需去对象头的Mark Word中去类断一下是否有偏向锁指向本身的ID,无需再进入Monitor去竞争对象了。
偏向锁的持有
一个synchronized方法被一个线程抢到 了锁时,那这个方法所在的对象就会在其所在的Mark Word中将偏向锁修改状态位,同时还会有占用前54位来存储线程指针作为标识。若该线程再次访问同一个synchronized方法时,该线程只需去对象头的Mark Word中去判断一下是否有偏向锁指向本身的ID,无需再进入Monitor去竞争对象了。
偏向锁,不主动释放锁,偏向指向的ID,如果是当前指向线程ID,当前线程就会直接进入临界区,无需加锁操作,无需沟通内核。
参数说明:
偏向锁在JDK1.6以上默认开启,开启后程序启动几秒后才会被激活,可以使用JVM参数来关闭延迟-XX:BiasedLocking StartupDelay=O如果确定锁通常处于竞争状态则可通过JVM参数-x:-UseBiasedLocking关闭偏向锁,那么默认会进入轻量级锁
偏向锁撤销
竞争激烈,其他线程CAS操作替换线程ID,原持有偏向锁的线程未执行完毕,则升级轻量级锁
Java15后废弃偏向锁
轻量级锁
多线程竞争,但是任意时刻最多只有一个线程竞争,即不存在锁竞争太过激烈的情况,也就没有线程阻塞。 本质就是 CAS 锁
轻量级锁是为了在线程近乎交替执行同步块时提高性能。
主要目的:在没有多线程竞争的前提下,通过CAS减少重量级锁使用操作系统互斥量产生的性能消耗,说白了先自旋,不行才升级阻塞。
升级时机:当关闭偏向锁功能或多线程竞争偏向锁会导致偏向锁升级为轻量级锁
假如线程A已经拿到锁,这时线程B又来抢该对象的锁,由于该对象的锁已经被线程A拿到,当前该锁已是偏向锁了。而线程B在争抢时发现对象头Mark Word中的线程ID不是线程B自己的线程ID(而是线程A),并且获取偏向锁失败,那线程B就会进行CAS操作希望能获得锁。
此时线程B操作中有两种情况:
如果锁获取成功,直接替换Mark Word中的线程ID为B自己的ID(A→B),重新偏向于其他线程(即将偏向锁交给其他线程,相当于当前线程”被”释放了锁),该锁会保持偏向锁状态,A线程Over,B线程上位;
如果锁获取失败,则偏向锁升级为轻量级锁(设置偏向锁标识为0并设置锁标志位为00),此时轻量级锁由原持有偏向锁的线程持有,继续执行其同步代码,而正在竞争的线程B会进入自旋等待获得该轻量级锁。
轻量级锁的加锁
JVM会为每个线程在当前线程的栈帧中创建用于存储锁记录的空间,官方成为Displaced Mark Word。若一个线程获得锁时发现是轻量级锁,会把锁的Mark Word复制到自己的Displaced Mark Word里面。然后线程尝试用CAS将锁的Mark Word替换为指向锁记录的指针。如果成功,当前线程获得锁,如果失败,表示Mark Word已经被替换成了其他线程的锁记录,说明在与其它线程竞争锁,当前线程就尝试使用自旋来获取锁。
轻量级锁的释放
在释放锁时,当前线程会使用CAS操作将Displaced Mark Word的内容复制回锁的Mark Word里面。如果没有发生竞争,那么这个复制的操作会成功。如果有其他线程因为自旋多次导致轻量级锁升级成了重量级锁,那么CAS操作会失败,此时会释放锁并唤醒被阻塞的线程。
轻量级锁和偏向锁的区别:
争夺轻量级锁失败时,自旋尝试抢占锁轻量级锁每次退出同步块都需要释放锁,
而偏向锁是在竞争发生时才释放锁
重量级锁
自旋达到一定次数(自适应调节次数)时,升级锁。
自适应:线程如果自旋成功了,那下次自旋的最大次数会增加,因为JVM认为既然上次成功了,那么这一次也很大概率会成功。
Java中synchronized的重量级锁,是基于进入和退出Monitor对象实现的。在编译时会将同步块的开始位置插入monitor enter指令,在结束位置插入monitor exit指令。
当线程执行到monitor enter指令时,会尝试获取对象所对应的Monitor所有权,如果获取到了,即获取到了锁,会在Monitor的owner中存放当前线程的id,这样它将处于锁定状态,除非退出同步块,否则其他线程无法获取到这个Monitor。
升级为轻量级锁时,JVM会在当前线程的栈帧中创建个锁记录(Lock Record)空间,用于存储锁对象的Mark Word拷贝,该拷贝中可以包含identity hash code,所以轻量级锁可以和identity hash code共存,哈希码和GC年龄自然保存在此,释放锁后会将这些信息写回到对象头。
一个对象计算过hash code时无法进入偏向锁,直接进入轻量级锁
锁升级发生后,hashcode去哪啦
锁升级为轻量级或重量级锁后,Mark Word中保存的分别是线程栈帧里的锁记录指针和重量级锁指针,己经没有位置再保存哈希码,GC年龄了,那么这些信息被移动到哪里去了呢?
- 锁和hashcode的关系:
- 在java语言中,一个对象如果已经计算过哈希值,就应该一直保持不变,绝大多数对象哈希值来源就时Object.HashCode,它通过在对象头中存储计算结果来保证第一次计算后,再次调用该方法湖区道德hash值不会发生改变。所以,当一个对象已经获取对应的HashCode后,他就再也无法进入到偏向锁的状态了,如果一个对象进入到偏向锁的状态之后,有收到计算其哈希值的要求,会直接退出偏向锁的状态,直接进入到重量锁状态。在重量级锁的实现中,对象头指向了重量级锁的位置,代表重量级锁的ObjectMonitor类中有字段可以记录非加锁状态下的Mark Word,其中自然可以存储原来的哈希码。
- 对于偏向锁,在线程获取偏向锁时,会用Thread ID和epoch值覆盖identity hash code所在的位置。如果一个对象的hashCode(O方法己经被调用过一次之后,这个对象不能被设置偏向锁。因为如果可以的话,那Mark Word中的identity hash code必然会被偏向线程ld给覆盖,这就会造成同一个对象前后两次调用hashCode()方法得到的结果不一致。
- 升级为轻量级锁时,JVM会在当前线程的栈帧中创建一个锁记录(Lock Record空间,用于存储锁对象的Mark Word拷贝,该拷贝中可以包含identity hash code,所以轻量级锁可以和identity hash code共存,哈希码和GC年龄自然保存在此,释放锁后会将这些信息写回到对象头。
- 升级为重量级锁后,Mark Word保存的重量级锁指针,代表重量级锁的ObjectMonitor类里有字段记录非加锁状态下的Mark Wo,锁释放后也会将信息写回到对象头。
总结
synchronized锁升级过程总结:一句话,就是先自旋,不行再阻塞。
JIT编译器对锁的优化
JIT即时编译器会优化无效的锁(锁消除)
JIT会融合一些锁(锁粗化)
没有锁:自由自在
偏向锁:唯我独尊
轻量锁:楚汉争霸
重量锁:群雄逐鹿
AQS
就是一套线程阻塞等待以及唤醒时锁分配的机制
AQS 抽象的队列同步器
AQS是juc内容中的重要基石
实现了AQS的锁有:自旋锁、互斥锁、读锁写锁、条件产量、信号量、栅栏都是AQS的衍生物
AbstractQueuedSynchronizer简称为AQS,抽象的对列同步器。
是用来实现锁或者其它同步器组件的公共基础部分的抽象实现,是重量级基础框架及整个JUC体系的基石,主要用于解决锁分配给”谁”的问题。
整体就是一个抽象的FIFO队列(双向循环列表)来完成资源获取线程的排队工作,并通过一个int类变量表示持有锁的状态。
AQA靠一个 先进先出的队列,和一个 资源状态值state 来实现
CLH队列:Craig、Landin and Hagersten队列,是一个单项列表,AQS中的队列时CLH变体的虚拟双向队列FIFO
AbstractQueuedSynchronizer类内部类Node,属性state为同步状态。
Java并发大神DougLee,提出统一规范并简化了锁的实现,将其抽象出来屏蔽了同步状态管理、同步队列的管理和维护、阻塞线程排队和通知、唤醒机制等,是一切锁和同步组件实现的–公共基础部分 AQS
如果共享资源被占用,就需要一定的阻塞等待唤醒机制来保证锁分配。这个机制主要用的是CLH队列的变体实现的,将暂时获取不到锁的线程加入到队列中,这个队列就是AQS同步队列的抽象表现。它将要请求共享资源的线程及自身的等待状态封装成队列的结点对象(Node),通过CAS、自旋以及LockSupport.park()的方式,维护state变量的状态,使并发达到同步的效果。
AQS使用一个volatile的it类型的成员变量来表示同步状态,通过内置的FIFo队列来完成资源获取的排队工作将每条要去抢占资源的线程封装成一个Node节点来实现锁的分配,通过CAS完成对State值的修改。
Lock接口的实现类,基本都是通过【聚合】了一个【队列同步器】的子类完成线程访问控制的
AQS 源码分析
从ReentrantLock开始解读AQS源码
从最简单的lock方法开始看看公平和非公平
以非公平锁作为案例突破口
双向链表中,第一个节点为虚节点(也叫哨兵节点),其实并不存储任何信息,只是占位。
真正的第一个有数据的节点,是从第二个节点开始的。
等待队列
上锁 阻塞
public class ReentrantLock implements Lock, java.io.Serializable {
public ReentrantLock() {
sync = new NonfairSync();
}
static final class NonfairSync extends Sync {
/**
* Performs lock. Try immediate barge, backing up to normal
* acquire on failure.
*/
final void lock() {
// 第一个线程进入,抢占
if (compareAndSetState(0, 1))
// set 独占所有者线程 将当前线程独占该资源
setExclusiveOwnerThread(Thread.currentThread());
else
// 第二个线程以及后续线程抢占
acquire(1);
}
protected final boolean tryAcquire(int acquires) {
return nonfairTryAcquire(acquires);
}
final boolean nonfairTryAcquire(int acquires) {
// 获取当前线程
final Thread current = Thread.currentThread();
// h获取当前锁的状态 (1:被占用 0:空闲)
int c = getState();
if (c == 0) {
if (compareAndSetState(0, acquires)) {
setExclusiveOwnerThread(current);
return true;
}
}
// 如果当前线城是 独占所有者线程
else if (current == getExclusiveOwnerThread()) {
int nextc = c + acquires;
if (nextc < 0) // overflow
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
return false;
}
}
}
public abstract class AbstractQueuedSynchronizer extends AbstractOwnableSynchronizer implements java.io. Serializable {
public final void acquire(int arg) {
// 进行抢锁, 如果成功直接进入 selfInterrupt(); 如果失败,则进入等待区间addWaiter(Node.EXCLUSIVE), arg)
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}
protected boolean tryAcquire(int arg) {
throw new UnsupportedOperationException();
}
// mode 排他模式
private Node addWaiter(Node mode) {
Node node = new Node(Thread.currentThread(), mode);
// Try the fast path of enq; backup to full enq on failure
Node pred = tail;
// 判断尾指针是否为null 正常情况下,第一次应该为null
if (pred != null) {
node.prev = pred;
if (compareAndSetTail(pred, node)) {
pred.next = node;
return node;
}
}
// 第一次入队的时候掉用
enq(node);
return node;
}
private Node enq(final Node node) {
for (;;) {
Node t = tail;
if (t == null) { // Must initialize
if (compareAndSetHead(new Node()))
tail = head;
} else {
node.prev = t;
if (compareAndSetTail(t, node)) {
t.next = node;
return t;
}
}
}
final boolean acquireQueued(final Node node, int arg) {
boolean failed = true;
try {
boolean interrupted = false;
for (;;) {
final Node p = node.predecessor();
if (p == head && tryAcquire(arg)) {
setHead(node);
p.next = null; // help GC
failed = false;
return interrupted;
}
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;
}
} finally {
if (failed)
cancelAcquire(node);
}
}
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
int ws = pred.waitStatus;
if (ws == Node.SIGNAL)
return true;
if (ws > 0) {
do {
node.prev = pred = pred.prev;
} while (pred.waitStatus > 0);
pred.next = node;
} else {
compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
}
return false;
}
private final boolean parkAndCheckInterrupt() {
// 使得该线程停留在该队列中,等待资源的释放
LockSupport.park(this);
return Thread.interrupted();
}
}
unlock 释放
public class ReentrantLock implements Lock, java.io.Serializable {
public void unlock() {
sync.release(1);
}
public final boolean release(int arg) {
if (tryRelease(arg)) {
Node h = head;
if (h != null && h.waitStatus != 0)
unparkSuccessor(h);
return true;
}
return false;
}
protected final boolean tryRelease(int releases) {
int c = getState() - releases; // 1 - 1
// 当前线程不是 占用资源独占锁的线程
if (Thread.currentThread() != getExclusiveOwnerThread())
throw new IllegalMonitorStateException();
boolean free = false;
if (c == 0) {
free = true;
// 释放锁
setExclusiveOwnerThread(null);
}
// 修改当前锁的状态为零 ,标识没有线程占用
setState(c);
return free;
}
}
public abstract class AbstractQueuedSynchronizer {
private void unparkSuccessor(Node node) {
int ws = node.waitStatus;
if (ws < 0)
compareAndSetWaitStatus(node, ws, 0);
Node s = node.next;
if (s == null || s.waitStatus > 0) {
s = null;
for (Node t = tail; t != null && t != node; t = t.prev)
if (t.waitStatus <= 0)
s = t;
}
if (s != null)
LockSupport.unpark(s.thread);
}
}
读写锁
ReentrantReadWriteLock
悲观锁
读写锁定义为:一个资源能够被多个读线程访问,或者被一个写线程访问,但是不能同时存在读写线程。
- 读读可以共享,多线程并发可以访问,大面积的可以容许多个线程来读取读多写少的时候,读写锁优先。
- 读写、写写互斥
readLock()
writeLock()
缺点:
写饥饿: 在线程拥有读锁的时候无法在获取写锁,写锁的线程只能一直等待,造成饥饿
锁降级(不算缺点吧)
写锁降级为读锁
- 如果同一个线程持有了写锁,在没有释放写锁的情况下,它还可以继续获得读锁。这就是写锁的降级,降级成为了读锁。
- 规则惯例,先获取写锁,然后获取读锁,再释放写锁的次序。
- 如果释放了写锁,那么就完全转换为读锁。
public class JucTest {
public static void main(String[] args) {
ReentrantReadWriteLock readWriteLock = new ReentrantReadWriteLock();
ReentrantReadWriteLock.ReadLock readLock = readWriteLock.readLock();
ReentrantReadWriteLock.WriteLock writeLock = readWriteLock.writeLock();
writeLock.lock();
System.out.println("----wirte");
readLock.lock();
System.out.println("----read");
writeLock.unlock();
readLock.unlock();
}
}
锁降级设计思想
//锁降级设计思想,保证数据可见性。写锁降级为读锁,本质相当于重入,这种机制专门为缓存设计的。
Object data;
volatile boolean cacheValid;
final ReentrantReadWriteLock rwl = new ReentrantReadWriteLock();
void processCachedData() {
rwl.readLock().lock();
if (!cacheValid) {
// Must release read lock before acquiring write lock
rwl.readLock().unlock();
rwl.writeLock().lock();
try {
// Recheck state because another thread might have
// acquired write lock and changed state before we did.
if (!cacheValid) {
data = ...
cacheValid = true;
}
// Downgrade by acquiring read lock before releasing write lock
rwl.readLock().lock();
} finally {
rwl.writeLock().unlock(); // Unlock write, still hold read
}
}
try {
use(data);
} finally {
rwl.readLock().unlock();
}
}
StampedLock
是一种乐观锁
ReentrantReadWriteLock的读锁被占用的时候,其他线程尝试获取写锁的时候会被阻塞。但是,StampedLock采取乐观获取锁后,其他线程尝试获取写锁时不会被阻塞,这其实是对读锁的优化,所以,在获取乐观读锁后,还需要对结果进行校验。
解决锁饥饿问题
- 公平锁,牺牲吞吐量
- 乐观读锁(读的过程允许写的介入)
缺点
- StampedLock不支特重入,没有Re开头
- StampedLock的悲观读锁和写锁都不支持条件变量(Condition),这个也需要注意。
- 使用StampedLock一定不要调用中断操作,即不要调用interrupt()方法
public class JucTest {
static int nuumber = 17;
static StampedLock stampedLock = new StampedLock();
public void write() {
System.out.println(Thread.currentThread().getName() + "\t" + "wirte thread prepare to wirte");
long stamp = stampedLock.writeLock();
try {
nuumber = nuumber + 13;
} finally {
stampedLock.unlockWrite(stamp);
}
System.out.println(Thread.currentThread().getName() + "\t" + "wirte thread end to wirte");
}
//悲观读
public void read() {
System.out.println(Thread.currentThread().getName() + "\t" + "wirte thread prepare to read");
long stamp = stampedLock.readLock();
for (int i = 0; i < 4; i++) {
try {
TimeUnit.SECONDS.sleep(1);
System.out.println("reading...");
} catch (InterruptedException e) {
e.printStackTrace();
}
}
try {
int result = nuumber;
System.out.println("read: " + result);
} finally {
stampedLock.unlockRead(stamp);
}
System.out.println(Thread.currentThread().getName() + "\t" + "wirte thread end to read");
}
//乐观读
public void tryOptimisticRead() {
long stamp = stampedLock.tryOptimisticRead();
int result = nuumber;
System.out.println("判断是否有修改" + "\t" + stampedLock.validate(stamp));
for (int i = 0; i < 4; i++) {
try {
TimeUnit.SECONDS.sleep(1);
System.out.println("reading stampedLockValidate: " + stampedLock.validate(stamp));
} catch (InterruptedException e) {
e.printStackTrace();
}
}
if (!stampedLock.validate(stamp)) {
System.out.println("value modified by someone");
stamp = stampedLock.readLock();
try {
System.out.println("乐观读升级为悲观读");
result = nuumber;
System.out.println("悲观读后: " + result);
} finally {
stampedLock.unlockRead(stamp);
}
}
System.out.println("finally value: " + result);
}
public static void main(String[] args) {
JucTest jucTest = new JucTest();
new Thread(() -> {
jucTest.tryOptimisticRead();
}, "readThread").start();
try {
TimeUnit.SECONDS.sleep(1);
} catch (InterruptedException e) {
e.printStackTrace();
}
new Thread(() -> {
jucTest.write();
}, "writeThread").start();
System.out.println(Thread.currentThread().getName() + "\t" + nuumber);
}
}