CPU分时轮转调度机制:时间片即CPU分配给各个程序的时间,每个线程被分配一个时间段,称作它的时间片
,即该进程允许运行的时间,使各个程序从表面上看是同时进行的。如果在时间片结束时进程还在运行,则CPU将被剥夺并分配给另一个进程。如果进程在时间片结束前阻塞或结束,则CPU当即进行切换。而不会造成CPU资源浪费。在宏观上:我们可以同时打开多个应用程序,每个程序并行不悖,同时运行。但在微观上:由于只有一个CPU,一次只能处理程序要求的一部分,如何处理公平,一种方法就是引入时间片,每个程序轮流执行。
进程:内存中正在运行的的程序;指一个内存中运行的应用程序;每个进程都有一个独立的内存空间,一个应用程序可以同时运行多个进程;进程也是程序的一次执行过程,是系统运行程序的基本单位;系统运行一个程序即是一个进程从创建,运行到消亡的过程。
线程:线程是进程中的一个执行单元,负责当前进程中程序的执行,一个进程中至少有一个线程。进程中是可以有多个线程,这个应用程序也可以称之为多线程程序。
二者对比
管程:管程,对应的英文是Monitor。所谓管程,指的是管理共享变量以及对共享变量的操作过程,让他们支持并发。并发编程领域,有两大核心问题:一个是互斥
,即同一时刻只允许一个线程访问共享资源;另一个是同步
,即线程之间如何通信、协作。这两大问题,管程都是能够解决的。
并行:同一时刻,不同主体可以同时处理事情的能力(强调不同实体)。多核cpu可以实现并行。
并发:与单位时间相关,在单位时间内可以处理事情的能力(强调同一实体)
结论:
单核 cpu 下,多线程不能实际提高程序运行效率,只是为了能够在不同的任务之间切换(线程上下文的切换),不同线程轮流使用 cpu ,不至于一个线程总占用 cpu,别的线程没法干活
多核 cpu 可以并行跑多个线程,但能否提高程序运行效率还是要分情况的
● 有些任务,经过精心设计,将任务拆分,并行执行,当然可以提高程序的运行效率。但不是所有计算任务都能拆分(参考后文的【阿姆达尔定律】)
● 也不是所有任务都需要拆分,任务的目的如果不同,谈拆分和效率没啥意义
IO 操作不占用 cpu,只是我们一般拷贝文件使用的是【阻塞 IO】,这时相当于线程虽然不用 cpu,但需要一 直等待 IO 结束,没能充分利用线程。所以才有后面的【非阻塞 IO】和【异步 IO】优化
锁的分类:
windows
任务管理器
可以查看进程和线程数,也可以用来杀死进程tasklist
查看进程taskkill
杀死进程taskkill /F /PID pid
杀死进程linux
ps -ef | grep java
查看关键字为Java的运行进程kill - 9 pid
强制杀死进程kill -15 pid
等待任务执行完毕,再杀死进程top
动态的显示进程信息top -H -p pid
查看某进程下的所有线程信息Java
jps
显示正在运行的java进程jstack pid
查看更详细的进程信息(jdk命令)jconsole
来查看某个 Java 进程中线程的运行情况(图形界面)jconsole 远程监控配置
需要以如下方式运行你的 java 类
java -i.server.hostname=`ip地址` -Dcom.sun.management.jmxremote -
Dcom.sun.management.jmxremote.port=`连接端口` -Dcom.sun.management.jmxremote.ssl=是否安全连接 -
Dcom.sun.management.jmxremote.authenticate=是否认证 java类
线程创建的几种方式
1、继承Thread类
2、实现Runnable接口
3、实现Callable接口。
call()方法可以有返回值;
call()方法可以向上抛出异常,而run()方法仅能内部处理;
使用步骤
(1)创建Callable接口实现类,并实现call()方法,该方法将作为线程执行体,且该方法有返回值,再创建Callable实现类的实例;
(2)使用FutureTask类来包装Callable对象(构造传入Callable对象)
(3)使用FutureTask对象作为Thread对象的target创建并启动新线程;
(4)调用FutureTask对象的get()方法来获得子线程执行结束后的返回值。
4、通过线程池方式(重要)
5、线程复用
每一个 Thread 的类都有一个 start 方法。 当调用 start 启动线程时 Java 虚拟机会调用该类的 run 方法。 那么该类的 run() 方法中就是调用了 Runnable 对象的 run() 方法。 我们可以继承重写Thread 类,在其 start 方法中添加不断循环调用传递过来的 Runnable 对象。 这就是线程池的实现原理。循环方法中不断获取 Runnable 是用 Queue 实现的,在获取下一个 Runnable 之前可以是阻塞的。
package com.ma.juc;import urrent.*;
import urrent.atomic.AtomicInteger;/*** @Description* 线程创建的几种方式* 1、继承Thread类* 2、实现Runnable接口* 3、实现Callable接口* 3.1、call()方法可以有返回值;* 3.2、call()方法可以声明抛出异常;* 方式:* (1)创建Callable接口实现类,并实现call()方法,该方法将作为线程执行体,且该方法有返回值,再创建Callable实现类的实例;* (2)使用FutureTask类来包装Callable对象,该FutureTask对象封装了该Callable对象的call()方法的返回值;* (3)使用FutureTask对象作为Thread对象的target创建并启动新线程;* (4)调用FutureTask对象的get()方法来获得子线程执行结束后的返回值。* 4、通过线程池方式(重要)* wFixedThreadPool:(core=max=指定的值)创建一个固定大小的线程池,可控制并发的线程数,超出的线程会在队列中等待;* wCachedThreadPool:(core=0,max=Integer.value)创建一个可缓存的线程池,若线程数超过处理所需,缓存一段时间后会回收,若线程数不够,则新建线程;* wSingleThreadExecutor:(core=max=1)创建仅单个线程数的线程池,它可以保证任务先进先出的执行顺序;* wScheduledThreadPool:(定时任务的线程池)创建一个可以执行延迟任务的线程池;* wSingleThreadScheduledExecutor:创建一个单线程的可以执行延迟任务的线程池;* wWorkStealingPool:创建一个抢占式执行的线程池(任务执行顺序不确定)【JDK 1.8 添加】。* ThreadPoolExecutor:最原始的创建线程池的方式,它包含了 7 个参数可供设置,后面会详细讲。<br/> (非常重要!!!)* 转:[线程池的7种创建方式,强烈推荐你用它...]()* [为什么尽量不要使用Executors创建线程池](.html)*** 线程分类:* 线程可分为用户线程(user thread) 和 守护线程(daemon thread)。* 守护线程指在后台运行的线程,也称为后台线程,用于提供后台服务。* Java创建的线程默认是用户线程。* 两者的差别是,当进程中还有用户线程在运行时,进程不终止;* 当进程中只有守护线程在运行时,进程终止。* Thread类与守护线程有关的方法声明如下:* public final void setDaemon(boolean on) //若on为true,则设置为守护线程,必须在启动线程前调用* public final boolean isDaemon() //判断是否为守护线程,若是,则返回true;否则返回false* ————————————————* 版权声明:本文为CSDN博主「打你个落花流水」的原创文章,遵循CC 4.0 BY-SA版权协议,转载请附上原文出处链接及本声明。* 原文链接:* @Classname ThreadTest* @Created by Fearless* @Date 2021/8/2 19:53*/
public class ThreadTest implements Runnable {/*** 参数说明:* int corePoolSize:核心线程数 线程池中存放的已就绪的线程数量,若设置allowCoreThreadTimeOut则线程超时就会回收* int maximumPoolSize:最大线程数量 控制资源* long keepAliveTime:存活时间 线程中断到重新接受新任务之间的最大时间。* 当前线程数量大于核心线程数量时,会释放空闲的线程* (需要释放的线程数量为maximumPoolSize - corePoolSize)* TimeUnit unit:时间单位* BlockingQueue<Runnable> workQueue:阻塞队列 任务堆积时,会将任务存放到队列中去,等待空闲队列的执行* ThreadFactory threadFactory:线程工厂 负责线程的创建* RejectedExecutionHandler handler:拒绝策略 如果队列任务满了,按照指定的拒绝策略拒绝执行任务* 丢弃最老的;丢弃新来的,同时抛异常;直接利用当前线程直接调用run方法同步执行任务;丢弃新来的(不抛异常)* 工作流程:* 1)线程池创建,准备好core数量的核心线程,准备接受任务* 1.1、core满了,就将在进来的任务放入阻塞队列中,(注意是先进队列队列满了再起非核心线程提供服务)。空闲的core就会自己去阻塞队列中获取任务执行* 1.2、阻塞队列满了,就直接开新线程执行,最大开大max指定的数量* 1.3、max满了就用RejectedExecutionHandler拒绝任务* 1.4、如果线程池中线程数量大于core,将在指定的keepAliveTime以后,释放max-core数量的多余线程**/private static ThreadPoolExecutor poolExecutor = new ThreadPoolExecutor(5,200,10, TimeUnit.SECONDS,new LinkedBlockingQueue<>(100),Executors.defaultThreadFactory(),new ThreadPoolExecutor.AbortPolicy());public static void main(String[] args) throws InterruptedException, ExecutionException {/*Runnable thread1 = new Thread2();Thread thread2 = new Thread(thread1);thread2.setName("Thread2");thread2.start();Thread.sleep(60000);*/Thread3 thread3 = new Thread3();FutureTask<Integer> task = new FutureTask<Integer>(thread3);for (int j = 0; j < 50 ; ++j){System.out.println(Thread.currentThread().getName() + " 大循环的循环变量j的值为:" + j);if (j == 20){new Thread(task,"有返回值的线程" + j).start();}}try {// 阻塞等待获取子线程的返回值System.out.println("子线程的返回值:" + ());Thread.sleep(6000);} catch (ExecutionException e) {e.printStackTrace();}}
}class Thread1 extends Thread{@Overridepublic void run() {System.out.println(Thread.currentThread().getName() + "正在运行..");}}
class Thread2 implements Runnable{@Overridepublic void run() {System.out.println(Thread.currentThread().getName() + "正在运行..");}
}
class Thread3 implements Callable{@Overridepublic Object call() throws Exception {int i = 0;for (;i < 50 ; ++i){System.out.println(Thread.currentThread().getName() + " 的线程执行体内的循环变量i的值为:" + i);}return i;}
}
栈与栈帧
Java Virtual Machine Stacks (Java 虚拟机栈)
我们都知道 JVM 中由堆、栈、方法区所组成,其中栈内存是给谁用的呢?其实就是线程,每个线程启动后,虚拟机就会为其分配一块栈内存。
线程上下文切换(Thread Context Switch)
因为以下一些原因导致 cpu 不再执行当前的线程,转而执行另一个线程的代码
当 Context Switch 发生时,需要由操作系统保存当前线程的状态,并恢复另一个线程的状态,Java 中对应的概念就是程序计数器
(Program Counter Register),它的作用是记住下一条 jvm 指令的执行地址,是线程私有的
思考:线程上下文切换与线程用户态向内核态转换之间的关系?
【JUC】什么是用户态及内核态?
方法名 | static | 功能说明 | 注意 |
---|---|---|---|
start() | 启动一个新线程,在新的线程运行 run 方法中的代码 | start 方法只是让线程进入就绪,里面代码不一定立刻运行(CPU 的时间片还没分给它)。每个线程对象的start方法只能调用一次,如果调用了多次会出现IllegalThreadStateException | |
run() | 新线程启动后会调用的方法 | 如果在构造 Thread 对象时传递了 Runnable 参数,则线程启动后会调用 Runnable 中的 run 方法,否则默认不执行任何操作。但可以创建 Thread 的子类对象,来覆盖默认行为 | |
join() | 等待线程运行结束 | ||
join(long n) | 等待线程运行结束,最多等待 n毫秒 | ||
getId() | 获取线程长整型的 id | id唯一 | |
getName() | 获取线程名 | ||
setName(String) | 修改线程名 | ||
getPriority() | 获取线程优先级 | ||
setPriority(int) | 修改线程优先级 | java中规定线程优先级是1~10 的整数,较大的优先级能提高该线程被 CPU 调度的机率 | |
isInterrupted() | 判断是否被打断, | 不会清除 打断标记 | |
isAlive() | 线程是否存活(还没有运行完毕) | ||
interrupt() | 打断线程 | 如果被打断线程正在 sleep,wait,join 会导致被打断的线程抛出 InterruptedException,并清除 打断标记 ;如果打断的正在运行的线程,则会设置 打断标记 ;park 的线程被打断,也会设置 打断标记 | |
interrupted() | static | 判断当前线程是否被打断 | 会清除打断标记 |
currentThread() | static | 获取当前正在执行的线程 | |
sleep(long n) | static | 让当前执行的线程休眠n毫秒,休眠时让出 cpu 的时间片给其它线程 | |
yield() | static | 提示线程调度器让出当前线程对CPU的使用 | 主要是为了测试和调试用 |
start 与 run:调用start会通过操作系统创建出线程,之后由此线程来执行run方法(可以将run方法看做一个任务);单独调用run仅是方法调用,仍然是在当前线程中运行。
sleep(使线程阻塞)
yield(让出当前线程)
线程优先级
用于等待某个线程结束。哪个线程内调用join()方法,就等待哪个线程结束,然后再去执行其他线程。如在主线程中调用ti.join(),则是主线程等待t1线程结束,join 采用同步。
Thread t1 = new Thread();
//等待 t1 线程执行结束
t1.join();
// 最多等待 1000ms,如果 1000ms 内线程执行完毕,则会直接执行下面的语句,不会等够 1000ms
t1.join(1000);
interrupt 中断线程有两种情况,如下:
Two Phase Termination,就是考虑在一个线程T1中如何优雅地终止另一个线程T2?这里的优雅指的是给T2一个料理后事的机会(如释放锁)。
简单实现代码如下:
package com.ma.juc;slf4j.Slf4j;import urrent.TimeUnit;/*** @author mayujie@jyd* @version v1.0* @description TwoPhaseTermination 两阶段终止模式* @date 2022/6/14 15:19*/
@Slf4j
public class TwoPhaseTermination {private Thread monitor;// 开启监控线程public void start(){monitor = new Thread(() -> {while (true) {Thread current = Thread.currentThread();try {if (current.isInterrupted()) {log.info("料理后事...");break;}TimeUnit.SECONDS.sleep(1);log.info("执行监控记录...");} catch (InterruptedException e) {e.printStackTrace();current.interrupt();}}});monitor.start();}// 停止监控线程public void stop(){monitor.interrupt();}public static void main(String[] args) throws InterruptedException {TwoPhaseTermination phaseTermination = new TwoPhaseTermination();phaseTermination.start();TimeUnit.SECONDS.sleep(3);phaseTermination.stop();}
}
Balking (犹豫)模式用在一个线程发现另一个线程或本线程已经做了某一件相同的事,那么本线程就无需再做 了,直接结束返回,有点类似单例。
public class BalkingDemo {public static void main(String[] args) throws InterruptedException {Monitor monitor = new Monitor();monitor.start();monitor.start();TimeUnit.SECONDS.sleep(3);monitor.stop();}
}class Monitor {Thread monitor;// 设置标记,用于判断是否被终止了private volatile boolean stop = false;// 设置标记,用于判断是否已经启动过了private boolean starting = false;/*** 启动监视器线程*/public void start() {synchronized (this) {if (starting) {return;}starting = true;}String s = UUID.randomUUID().toString();monitor = new Thread(() -> {// 开始不停的监控while (true) {if (stop) {System.out.println(Thread.currentThread().getName() + "处理后续任务...");}System.out.println(Thread.currentThread().getName() + "监视器运行中...");try {TimeUnit.SECONDS.sleep(1);} catch (InterruptedException e) {System.out.println(Thread.currentThread().getName() + "被打断了...");}}}, s);monitor.start();;}/*** 用于停止监视器线程*/public void stop() {monitor.interrupt();// 打断线程stop = true;}
}
参考文章:sleep、yield、wait、join的区别(阿里)
默认情况下,java进程需要等待所有的线程结束后才会停止,但是有一种特殊的线程,叫做守护线程,在其他线程全部结束的时候即使守护线程还未结束代码未执行完java进程也会停止。普通线程t1可以调用 t1.setDeamon(true); 方法变成守护线程。
1)从操作系统层划分,线程有 5 种状态
2)java线程的 6 种状态及状态切换
这是从 Java API 层面来描述的,我们主要研究的就是这种。可以参考文章👉Java线程的6种状态及切换(透彻讲解)
以上可从java.lang.Thread.State
查看
测试代码如下:
package com.ma.juc;import org.junit.Test;import urrent.TimeUnit;
import urrent.locks.ReentrantLock;/*** @author mayujie@jyd* @version v1.0* @description ThreadStateTest* @date 2021/12/29 11:08*/
public class ThreadStateTest {public static void main(String[] args) {// 没调用start方法查看下thread状态 NEWthreadStateNew();// 调用start方法查看thread状态 RUNNABLEworkingThread();// thread调用join方法查看状态 TERMINATEDthreadStateTerminate();// main线程持有锁(可重入锁)的同时查看thread状态 WAITINGthreadBlockedByLock();// 多个线程同时竞争同一把锁时,未获得锁的线程处于 BLOCKEDthreadBlockedBySynchronized();threadSleep();threadWait();threadTimeWait();}private static void threadStateNew(){System.out.println("--------------------------------------");System.out.println("Never Start Thread State:");Thread thread = new Thread(() -> {// System.out.println("");}, "Thread Never Start");// print NEWSystem.out.State());System.out.println("--------------------------------------");}private static void workingThread(){System.out.println("--------------------------------------");Thread thread = new Thread(() -> {for (int i = 0; i < 10; i++) {doSomeElse();}});thread.start();doSomeElse();// printf RUNNABLESystem.out.println("Working Thread State:" + State());System.out.println("--------------------------------------");}private static void threadStateTerminate(){System.out.println("--------------------------------------");System.out.println("Finish Job Thread State:");Thread thread = new Thread(() -> {}, "Thread Finish Job");thread.start();try {// Main Thread will wait until this thread finished jobthread.join();} catch (InterruptedException e) {e.printStackTrace();}// print TERMINATESystem.out.State());System.out.println("--------------------------------------");}/*** 为啥这块是waiting而下面是blocked???*/private static void threadBlockedByLock(){System.out.println("--------------------------------------");System.out.println("Thread State Blocked By Lock:");ReentrantLock lock = new ReentrantLock();Thread thread = new Thread(lock::lock, "Blocked Thread");lock.lock();try {thread.start();doSomeElse();//print WAITINGSystem.out.State());} catch (Exception e){e.printStackTrace();} finally {lock.unlock();}System.out.println("--------------------------------------");}/*** BLOCKED状态是两个线程同时获取一把锁时,没拿到锁的线程切换到BLOCKED状态,* 有时打印出RUNNABLE可能是线程没到开始获取锁,打印线程的状态动作有些提前*/private static void threadBlockedBySynchronized(){System.out.println("--------------------------------------");System.out.println("Thread Blocked By Synchronized:");Thread thread = new Thread(() -> {synchronized (ThreadStateTest.class){System.out.println("thread成功获取synchronized锁");}}, "Blocked by Synchronized Thread");thread.start();synchronized (ThreadStateTest.class){System.out.println("main成功获取synchronized锁");doSomeElse();// print BLOCKED 怎么还能打印RUNNABLE???// 阻塞main线程3s,等待thread去获取锁try {Thread.sleep(3000);} catch (InterruptedException e) {e.printStackTrace();}System.out.println("Thread State is:" + State());System.out.println("锁释放...");}System.out.println("--------------------------------------");}private static void threadSleep(){System.out.println("--------------------------------------");System.out.println("Sleeping Thread:");Thread thread = new Thread(() -> {try {Thread.sleep(5000);} catch (InterruptedException e) {e.printStackTrace();}}, "Thread Sleep");thread.start();doSomeElse();// print TIMED_WAITINGSystem.out.State());System.out.println("--------------------------------------");}private static void threadWait(){System.out.println("--------------------------------------");System.out.println("Thread waiting");Object lock = new Object();Thread threadA = new Thread(() -> {synchronized (lock){System.out.println("threadA获得到锁!!!");try {lock.wait();for (int i = 0; i < 100; i++) {doSomeElse();}} catch (InterruptedException e) {e.printStackTrace();}}}, "Thread Waiting");Thread threadB = new Thread(() -> {synchronized (lock){System.out.println("threadB获得到锁!!!");// print WAITINGSystem.out.println("Before Notify, Thread A State:" + State());ify();// print BLOCKEDSystem.out.println("After Notify, Thread A State:" + State());}});threadA.start();doSomeElse();threadB.start();try {threadB.join();} catch (InterruptedException e) {e.printStackTrace();}// print RUNNABLESystem.out.println("After Thread B finish job, Thread A State:" + State());System.out.println("--------------------------------------");}private static void threadTimeWait(){System.out.println("--------------------------------------");System.out.println("Thread Waiting:");Object lock = new Object();Thread threadA = new Thread(() -> {synchronized (lock){try {lock.wait(1000);for (int i = 0; i < 100; i++) {doSomeElse();}} catch (InterruptedException e) {e.printStackTrace();}}}, "Thread Waiting");Thread threadB = new Thread(() -> {synchronized (lock) {// print TIMED_WAITINGSystem.out.println("Before Notify, Thread A State:" + State());ify();// print BLOCKEDSystem.out.println("After Notify, Thread A State:" + State());}});System.out.println(Thread.currentThread().getState());threadA.start();doSomeElse();threadB.start();try {threadB.join();} catch (InterruptedException e) {e.printStackTrace();}// print RUNNABLESystem.out.println("After Thread B finish job, Thread A State:" + State());System.out.println("--------------------------------------");}/*** take some times, let the thread get cpu time*/private static void doSomeElse(){double meanless = 0d;for(int i=0; i<10000; i++){meanless += Math.random();}}// 自己创建的thread线程是由main线程调用start方法通过操作系统初始化出来的,而run方法只是thread的执行任务而已// 因此start方法必定是在run方法执行之前@Testpublic void test(){new Thread(() -> {System.out.println("run执行了" + System.currentTimeMillis());}).start();System.out.println("start执行了" + System.currentTimeMillis());}}
线程出现问题的根本原因是因为线程上下文切换,导致线程里的指令没有执行完就切换执行其它线程了,最终造成数据错乱等现象。比较常见的例子有多线程情况下,i++、++i等自增自减操作时,会造成线程不安全的发生。这块需要清楚两个概念:
1)临界区 Critical Section
例如,下面代码中的临界区
static int counter = 0;static void increment()
// 临界区
{ counter++;
}static void decrement()
// 临界区
{ counter--;
}
2)竞态条件 Race Condition
多个线程在临界区内执行,由于代码的执行序列不同而导致结果无法预测,称之为发生了竞态条件
1)解决手段
为了避免临界区中的竞态条件发生,由多种手段可以达到。
现在讨论使用synchronized
关键字来进行解决,即俗称的对象锁(这指的是广义上的对象),它采用互斥的方式让同一时刻至多只有一个线程持有对象锁,其他线程如果想获取这个锁就会阻塞住,这样就能保证拥有锁的线程可以安全的执行临界区内的代码,不用担心线程上下文切换。
线程安全:多个线程访问同一个对象时,如果不用考虑这些线程在运行时环境下的调度和交替执行,也不需要进行额外的同步,或者在调用方进行任何其他操作,调用这个对象的行为都可以获得正确的结果,那么这个对象就是线程安全的。(简单来说就是多线程情况下,任意的读写操作都不会造成最终的数据错乱,则该对象就是线程安全的)
线程安全问题大多是由全局变量及静态变量引起的,局部变量逃逸也可能导致线程安全问题。
如果局部变量引用的对象逃离方法的范围,那么要考虑线程安全的,分析如下代码:
@Slf4j(topic = "c.Code_18_Test")
public class Code_18_Test {public static void main(String[] args) {// 局部引用对象unsafeTest 是否线程安全取决于内部实现UnsafeTest unsafeTest = new UnsafeTest(); for(int i = 0; i < 10; i++) {new Thread(() -> {hod1();}, "t" + i).start();}}}class UnsafeTest {List<Integer> list = new ArrayList<>(); //该List被多个线程共享读写,因此线程不安全public void method1() {for (int i = 0; i < 200; i++) {method2();method3();}}public void method2() {list.add(1);}public void method3() {ve(0);}}
如图所示,因为 list 是实例变量,则多个线程都会使用到这个共享的实例变量,就会出现线程安全问题,为什么会有安全问题呢,首先要理解 list 添加元素的几步操作,第一步会获取添加元素的下标 index,第二步对指定的 index 位置添加元素,第三步将 index 往后移。
当 t0 线程从 list 拿到 index = 0 后,t0 线程的时间片用完,出现上下文切换,t1 获取时间片开始执行,从 list 也拿到 index = 0,然后将元素添加到 index 位置,然后将 index 值加 1,然后 t0 线程获取时间片,对 index = 0 位置添加元素,此时 index = 0 已经存在元素,就会出现报错。
可以将 list 修改成局部变量,然后将 list 作为引用传入方法中,因为局部变量是每个线程私有的,不会出现共享问题,那么就不会有上述问题了。修改的代码如下:
class SafeTest {public void method1() {List<Integer> list = new ArrayList<>();for (int i = 0; i < 200; i++) {method2(list);method3(list);}}public void method2(List<Integer> list) {list.add(1);}public void method3(List<Integer> list) {ve(0);}}
在上诉代码中,其实存在线程安全的问题,因为 method2,method3 方法都是用 public 声明的,如果一个类继承 SafeTest 类,对 method2,method3 方法进行了重写,比如重写 method3 方法,代码如下:
class UnsafeSubTest extends UnsafeTest {@Overridepublic void method3(List<Integer> list) {new Thread(() -> {ve(0);}).start();}
}
可以看到重写的方法中又使用到了线程,当主线程和重写的 method3 方法的线程同时存在,此时 list 就是这两个线程的共享资源了,就会出现线程安全问题,我们可以用 private 访问修饰符解决此问题,代码实现如下:
class ThreadSafe {public final void method1(int loopNumber) {ArrayList<String> list = new ArrayList<>();for (int i = 0; i < loopNumber; i++) {method2(list);method3(list);}}private void method2(ArrayList<String> list) {list.add("1");}private void method3(ArrayList<String> list) {ve(0);}
}
class ThreadSafeSubClass extends ThreadSafe{@Overridepublic void method3(ArrayList<String> list) {new Thread(() -> {ve(0);}).start();}
}
从这个例子可以看出 private 或 final 提供【安全】的意义所在,请体会开闭原则中的【闭】。
这里说它们是线程安全的是指,多个线程调用它们同一个实例的某个方法时,是线程安全的。如:
Hashtable table = new Hashtable();
new Thread(()->{table.put("key1", "value1");
}).start();
new Thread(()->{table.put("key2", "value2");
}).start();
线程安全类方法的组合
但注意它们多个方法的组合不是原子的,看如下代码
Hashtable table = new Hashtable();
// 线程1,线程2
if( ("key") == null) {table.put("key", value);
}
如上图所示,当使用方法组合时,出现了线程安全问题,当线程 1 执行完 get(“key”) ,这是一个原子操作没出问题,但是在 get(“key”) == null 比较时,如果线程的时间片用完了,线程 2 获取时间片执行了 get(“key”) == null 操作,然后进行 put(“key”, “v2”) 操作,结束后,线程 1 被分配 cpu 时间片继续执行,执行 put 操作就会出现线程安全问题。
不可变类的线程安全
String和Integer类都是不可变的类,因为其类内部状态是不可改变的,因此它们的方法都是线程安全的,有同学或许有疑问,String 有 replace,substring 等方法【可以】改变值啊,其实调用这些方法返回的已经是一个新创建的对象了!
public String substring(int beginIndex, int endIndex) {if (beginIndex < 0) {throw new StringIndexOutOfBoundsException(beginIndex);}if (endIndex > value.length) {throw new StringIndexOutOfBoundsException(endIndex);}int subLen = endIndex - beginIndex;if (subLen < 0) {throw new StringIndexOutOfBoundsException(subLen);}return ((beginIndex == 0) && (endIndex == value.length)) ? this: new String(value, beginIndex, subLen); // 新建一个对象,然后返回,没有修改等操作,是线程安全的。}
每个同步对象都有一个自己的Monitor(监视器锁),加锁过程如下图所示:
任何一个对象都有一个Monitor与之关联,当且一个Monitor被持有后,它将处于锁定状态。Synchronized在JVM里的实现都是基于进入和退出Monitor对象来实现方法同步和代码块同步,虽然具体实现细节不一样,但是都可以通过成对的monitorenter和monitorexit指令来实现。
同步代码块:
monitorenter:每个对象都是一个监视器锁(monitor)。当monitor被占用时就会处于锁定状态,线程执行monitorenter指令时尝试获取monitor的所有权,过程如下:
a. 如果monitor的进入数为0,则该线程进入monitor,然后将进入数设置为1,该线程即为monitor的所有者;
b. 如果线程已经占有该monitor,只是重新进入,则进入monitor的进入数加1;
c. 如果其他线程已经占用了monitor,则该线程进入阻塞状态,直到monitor的进入数为0,再重新尝试获取monitor的所有权;
monitorexit:执行monitorexit的线程必须是objectref所对应的monitor的所有者。指令执行时,monitor的进入数减1,如果减1后进入数为0,那线程退出monitor,不再是这个monitor的所有者。其他被这个monitor阻塞的线程可以尝试去
获取这个 monitor 的所有权。monitorexit,指令出现了两次,第1次为同步正常退出释放锁;第2次为发生异常退出释放锁。
示例代码:
public class TestSynchronized {static final Object obj = new Object();static int i=0;public static void main(String[] args) {synchronized (obj){i++;}}
}
反编译后的字节码
public static void main(java.lang.String[]);descriptor: ([Ljava/lang/String;)Vflags: ACC_PUBLIC, ACC_STATICCode:stack=2, locals=3, args_size=10: getstatic #2 // 获取obj对象3: dup4: astore_15: monitorenter //将obj对象的markword置为monitor指针6: getstatic #3 9: iconst_110: iadd11: putstatic #3 14: aload_115: monitorexit //同步代码块正常执行时,将obj对象的markword重置,唤醒EntryList16: goto 2419: astore_220: aload_121: monitorexit //同步代码块出现异常时,将obj对象的markword重置,唤醒EntryList22: aload_223: athrow24: returnException table:from to target type6 16 19 any //监测6-16行jvm指令,如果出现异常就会到第19行19 22 19 any
通过代码描述,我们应该能很清楚的看出Synchronized的实现原理,Synchronized的语义底层是通过一个monitor的对象来完成,其实wait/notify等方法也依赖于monitor对象,这就是为什么只有在同步的块或者方法中才能调用wait/notify等方法,否则会抛出java.lang.IllegalMonitorStateException的异常的原因。
同步方法:
方法的同步并没有通过指令 monitorenter 和 monitorexit 来完成(理论上其实也可以通过这两条指令来实现),不过相对于普通方法,其常量池中多了 ACC_SYNCHRONIZED 标示符。JVM就是根据该标示符来实现方法的同步的:
示例代码:
public class TestSynchronized {static int i=0;public synchronized void add(){i++;}
}
反编译后
public synchronized void add();descriptor: ()Vflags: ACC_PUBLIC, ACC_SYNCHRONIZEDCode:stack=2, locals=1, args_size=10: getstatic #2 // Field i:I3: iconst_14: iadd5: putstatic #2 // Field i:I8: return
当方法调用时,调用指令将会检查方法的 ACC_SYNCHRONIZED 访问标志是否被设置,如果设置了,执行线程将先获取monitor,获取成功之后才能执行方法体,方法执行完后再释放monitor。在方法执行期间,其他任何线程都无法再获得同一个monitor对象。
两种同步方式本质上没有区别,只是方法的同步是一种隐式的方式来实现,无需通过字节码来完成。两个指令的执行是JVM通过调用操作系统的互斥原语mutex来实现,被阻塞的线程会被挂起、等待重新调度,会导致“用户态和内核态”两个态之间来回切换,对性能有较大影响。
说了这么多,那什么是Monitor呢??
可以把它理解为 一个同步工具,也可以描述为 一种同步机制,它通常被描述为一个对象。与一切皆对象一样,所有的Java对象是天生的Monitor,每一个Java对象都有成为Monitor的潜质,因为在Java的设计中 ,每一个Java对象自打娘胎里出来就带了一把看不见的锁,它叫做内部锁或者Monitor锁。也就是通常说Synchronized的对象锁,MarkWord锁标识位为10,其中指针指向的是Monitor对象的起始地址。在Java虚拟机(HotSpot)中,Monitor是由ObjectMonitor实现的,其主要数据结构如下(位于HotSpot虚拟机源码ObjectMonitor.hpp文件,C++实现的):
ObjectMonitor中有两个队列,_WaitSet 和 _EntryList,用来保存ObjectWaiter对象列表( 每个等待锁的线程都会被封装成ObjectWaiter对象 ),_owner指向持有ObjectMonitor对象的线程,当多个线程同时访问一段同步代码时:
当多个线程同时请求某个对象锁时,对象锁会设置⼏种状态⽤来区分请求的线程:
当⼀个线程尝试获得锁时,如果该锁已经被占⽤,则会将该线程封装成⼀个 ObjectWaiter 对象插⼊到Contention List的队列的队⾸,然后调⽤ park 函数挂起当前线程。
当线程释放锁时,会从Contention List或EntryList中挑选⼀个线程唤醒,被选中的线程叫做 Heir presumptive 即假定继承⼈,假定继承⼈被唤醒后会尝试获得锁,但 synchronized 是⾮公平的,所以假定继承⼈不⼀定能获得锁。这是因为对于重量级锁,线程先⾃旋尝试获得锁,这样做的⽬的是为了减少执⾏操作系统同步操作带来的开销。如果⾃旋不成功再进⼊等待队列。这对那些已经在等待队列中的线程来说,稍微显得不公平,还有⼀个不公平的地⽅是⾃旋线程可能会抢占了Ready线程的锁(Ready线程是干嘛的???)。线程获得锁后调⽤ Object.wait ⽅法,则会将线程加⼊到WaitSet中,当被 ify 唤醒后,会将线程从WaitSet移动到Contention List或EntryList中去。需要注意的是,当调⽤⼀个锁对象的 wait 或 notify ⽅法时,如当前锁的状态是偏向锁或轻量级锁则会先膨胀成重量级锁。
同时,Monitor对象存在于每个Java对象的对象头Mark Word中(存储的指针的指向),Synchronized锁便是通过这种方式获取锁的,也是为什么Java中任意对象可以作为锁的原因,同时notify/notifyAll/wait等方法会使用到Monitor锁对象,所以必须在同步代码块中使用。监视器Monitor通过两种方式实现同步:互斥与协作。Java 使用对象锁 ( 使用 synchronized 获得对象锁 ) 保证工作在共享的数据集上的线程互斥执行, 使用notify/notifyAll/wait 方法来协同不同线程之间的工作。这些方法在 Object 类上被定义,会被所有的 Java 对象自动继承。多线程环境下线程之间如果需要共享数据,需要解决互斥访问数据的问题,监视器可以确保监视器上的数据在同一时刻只会有一个线程在访问。
执行流程:
参考文章:
HotSpot虚拟机中,对象在内存中存储的布局可以分为三块区域:对象头(Header)、实例数据(Instance Data)和对齐填充(Padding)。
对象头:比如 hash码,对象所属的年代,对象锁,锁状态标志,偏向锁(线程)ID,偏向时间,数组长度(数组对象)等。Java对象头一般占有2个机器码(在32位虚拟机中,1个机器码等于4字节,也就是32bit,在64位虚拟机中,1个机器码是8个字节,也就是64bit),但是 如果对象是数组类型,则需要3个机器码,因为JVM虚拟机可以通过Java对象的元数据信息确定Java对象的大小,但是无法从数组的元数据来确认数组的大小,所以用一块来记录数组长度。
实例数据:存放类的属性数据信息,包括父类的属性信息;
对齐填充:由于虚拟机要求,对象起始地址必须是8字节的整数倍。填充数据不是必须存在的,仅仅是为了字节对齐
当一个线程尝试访问synchronized修饰的代码块时,它首先要获得锁,那么这个锁到底存在哪里呢?
是存在锁对象的对象头中的。
HotSpot采用instanceOopDesc和arrayOopDesc来描述对象头,
arrayOopDesc对象用来描述数组类型。
instanceOopDesc的定义的在Hotspot源码的 instanceOop.hpp 文件中,
另外,arrayOopDesc的定义对应 arrayOop.hpp 。
以 64 位虚拟机为例,普通对象的对象头结构如下,其中的 Klass Word 为指针,指向对应的 Class 对象;
其中 Mark Word 结构为
在64位虚拟机下,Mark Word是64bit大小的,其存储结构如下:
在32位虚拟机下,Mark Word是32bit大小的,其存储结构如下:
锁消除
为了保证数据的完整性,我们在进行操作时需要对这部分操作进行同步控制。但是在有些情况下,JVM检测到不可能存在共享数据竞争,这时JVM会对这些同步锁进行锁消除
。锁消除
的依据是逃逸分析的数据支持。
如果不存在竞争,为什么还需要加锁呢?所以锁消除
可以节省毫无意义的请求锁的时间。变量是否逃逸,对于虚拟机来说需要使用数据流分析来确定,但是对于我们程序员来说这还不清楚么?我们会在明明知道不存在数据竞争的代码块前加上同步吗?但是有时候程序并不是我们所想的那样?我们虽然没有显示使用锁,但是我们在使用一些JDK的内置API时 ,如StringBuffer、Vector、HashTable等,这个时候会存在隐式的加锁操作。比如StringBuffer的append()方法,Vector的add()方法。
public class VectorTest {public void test(){Vector<Integer> vector = new Vector<Integer>(); for(int i=0;i<10;i++){vector.add(i);}System.out.println(vector);}
}
在运行这段代码时, JVM可以明显检测到变量vector没有逃逸出方法VectorTest()之外,所以JVM可以大胆地将vector内部的加锁操作消除。
锁粗化
在使用同步锁的时候,需要让同步块的作用范围尽可能小,仅在共享数据的实际作用域中才进行同步,这样做的目的是为了使需要同步的操作量尽可能缩小。如果存在锁竞争,那么等待锁的线程也能尽快拿到锁。
在大多数的情况下,上述观点是正确的。但是连续加锁解锁操作可能会导致不必要的性能损耗,所以引入锁粗化
的概念。锁粗化
概念比较好理解,就是将多个连续的加锁、解锁操作连接在一起,扩展成一个范围更大的锁。如上面实例: vector每次add的时候都需要加锁操作, JVM检测到对同一个对象( vector )连续加锁、解锁操作,会合并一个更大范围的加锁、解锁操作,即加锁解锁操作会移到for循环之外。
锁膨胀(synrhonized锁的升级)
无锁–》偏向锁–》轻量级锁–》重量级锁
synchronized锁升级详细流程图(JDK1.6优化):
参考如下文章:
深入分析Synchronized原理(阿里面试题)
Synchronized锁升级过程
synchronized锁优化、自旋锁、适应性自旋锁、锁消除、锁粗化、偏向锁、轻量级锁(CAS)、重量锁(monitor)
注意:下述锁均为synchronized锁不同时刻的状态,不是单独类型的锁。另外锁升级后状态不可逆!!!
下面将阐述各种状态synchronized锁的工作原理:
线程的阻塞和唤醒需要CPU从用户态转为核心态,频繁的阻塞和唤醒对CPU来说是一件负担很重的工作,势必会给系统的并发性能带来很大的压力。同时我们发现在许多应用上面,对象锁的锁状态只会持续很短一段时间,为了这一段很短的时间频繁地阻塞和唤醒线程是非常不值得的。 所以引入自旋锁。
所谓自旋锁,就是让该线程等待一段时间,不会被立即挂起,看持有锁的线程是否会很快释放锁。怎么等待呢?执行段无意义的循环即可(自旋)。
自旋等待不能替代阻塞,虽然它可以避免线程切换带来的开销,但是它占用了处理器的时间。如果持有锁的线程很快就释放了锁,那么自旋的效率就非常好,反之,自旋的线程就会白白消耗掉处理的资源,它不会做任何有意义的工作,典型的占着茅坑不拉屎,这样反而会带来住能上的浪费。所以说,自旋等待的时间(自旋的次数)必须要有一个限度,如果自旋超过了定义的时间仍然没有获取到锁,则应该被挂起。若同步代码块执行比较耗时,即线程持有锁的时间较长,自旋锁效率下降,不建议使用。
适应性自旋锁(自旋锁的进一步优化)
假如我将自旋次数调整为10 ,但是系统很多线程都是等你刚刚退出的时候就释放了锁(假如你多自旋一两次就可以获取锁) ,你是不是很尴尬。于是JDK1.6引入自适应的自旋锁,让虚拟机会变得越来越聪明。
JDK 1.6引入了更加聪明的自旋锁,即适应性自旋锁。所谓适应性就意味着自旋的次数不再是固定的,它是由前一次在同一个锁上的自旋时间及锁的拥有者的状态来决定。它怎么做呢?线程如果自旋成功了,那么下次自旋的次数会更加多,因为虚拟机认为既然上次成功了,那么此次自旋也很有可能会再次成功,那么它就会允许自旋等待持续的次数更多。反之,如果对于某个锁,很少有自旋能够成功的。那么在以后要或者这个锁的时候自旋的次数会减少甚至省略掉自旋过程,以免浪费处理器资源。
JDK1.6 :-XX: +UseSpinning 开启;-XX: PreBlockSpin=10 为自旋次数;
JDK1.7 后,去掉此参数,由 jvm 控制;
在轻量级的锁中,我们可以发现,如果同一个线程对同一个对象进行重入锁时,也需要执行 CAS 操作,这是有点耗时的。那么 java6 开始引入了偏向锁,只有第一次使用 CAS 时将对象的 Mark Word 头设置为偏向线程 ID,之后这个入锁线程再进行重入锁时,发现线程 ID 是自己的,那么就不用再进行CAS了。(解决锁重入,实现单线程加锁的锁消除)
偏向状态,对象头格式如下:
/*** @Classname TestBiased* @Description TODO* @Created by Fearless* @Date 2022/6/16 20:47*/
@Slf4j
public class TestBiased {public static void main(String[] args) throws InterruptedException {Dog dog = new Dog();// System.out.HexString(dog.hashCode()));log.debug(ClassLayout.parseInstance(dog).toPrintable());synchronized (dog) {log.debug(ClassLayout.parseInstance(dog).toPrintable());}// 1000110101101000000000101// 00000000011ad005log.debug(ClassLayout.parseInstance(dog).toPrintable());}
}class Dog {}
运行结果:
撤销偏向
以下几种情况会使对象的偏向锁失效
轻量级锁的使用场景是:如果一个对象虽然有多个线程要对它进行加锁,但是加锁的时间是错开的(也就是没有人可以竞争的),那么可以使用轻量级锁来进行优化。轻量级锁对使用者是透明的,即语法仍然是 synchronized ,假设有两个方法同步块,利用同一个对象加锁
static final Object obj = new Object();
public static void method1() {synchronized( obj ) {// 同步块 Amethod2();}
}
public static void method2() {synchronized( obj ) {// 同步块 B}
}
每次指向到 synchronized 代码块时,都会创建锁记录(Lock Record)对象,每个线程都会包括一个锁记录的结构,锁记录内部可以储存对象的 Mark Word 和对象引用 reference
让锁记录中的 Object reference 指向对象,并且尝试用 cas(compare and sweep) 替换 Object 对象的 Mark Word ,将 Mark Word 的值存入锁记录中。
如果 cas 替换成功,那么对象的对象头储存的就是锁记录的地址和状态 00 表示轻量级锁,如下所示
如果cas失败,有两种情况
当退出 synchronized 代码块(解锁时)
如果在尝试加轻量级锁的过程中,cas 操作无法成功,这是有一种情况就是其它线程已经为这个对象加上了轻量级锁,这是就要进行锁膨胀,将轻量级锁变成重量级锁。
重量级锁就是Monitor锁,参考上文Monitor的介绍。
注:只有当对象加锁以后,才能调用 wait 和 notify 方法
什么时候适合使用wait
synchronized (lock) {while(//不满足条件,一直等待,避免虚假唤醒) {lock.wait();}//满足条件后再运行
}synchronized (lock) {//唤醒所有等待线程ifyAll();
}
即 Guarded Suspension,用在一个线程等待另一个线程的执行结果,要点:
多任务版 GuardedObject 图中 Futures 就好比居民楼一层的信箱(每个信箱有房间编号),左侧的 t0,t2,t4 就好比等待邮件的居民,右侧的 t1,t3,t5 就好比邮递员如果需要在多个类之间使用 GuardedObject 对象,作为参数传递不是很方便,因此设计一个用来解耦的中间类,这样不仅能够解耦【结果等待者】和【结果生产者】,还能够同时支持多个任务的管理。和生产者消费者模式的区别就是:这个生产者和消费者之间是一一对应的关系,但是生产者消费者模式并不是。rpc 框架的调用中就使用到了这种模式。
示例代码:
/*** 同步模式-保护性暂停 (Guarded-Suspension-pattern)*/
@Slf4j(topic = "c.Code_23_Test")
public class Code_23_Test {public static void main(String[] args) {for (int i = 0; i < 3; i++) {new People().start();}try {Thread.sleep(1000);} catch (InterruptedException e) {e.printStackTrace();}for(Integer id : Ids()) {new Postman(id, "内容 " + id).start();}}}@Slf4j(topic = "c.People")
class People extends Thread {@Overridepublic void run() {GuardedObject guardedObject = ateGuardedObject();log.info("收信的为 id: {}", Id());Object o = (5000);log.info("收到信的 id: {}, 内容: {}", Id(), o);}
}@Slf4j(topic = "c.Postman")
class Postman extends Thread {private int id;private String mail;public Postman(int id, String mail) {this.id = id;this.mail = mail;}@Overridepublic void run() {GuardedObject guardedObject = GuardedObject(id);log.info("送信的 id: {}, 内容: {}", id, mail);guardedObjectplete(mail);}
}class Mailboxes {private static int id = 1;private static Map<Integer, GuardedObject> boxes = new Hashtable<>();public static synchronized int generateId() {return id++;}// 用户会进行投信public static GuardedObject createGuardedObject() {GuardedObject guardedObject = new GuardedObject(generateId());boxes.Id(), guardedObject);return guardedObject;}// 派件员会派发信public static GuardedObject getGuardedObject(int id) {ve(id);}public static Set<Integer> getIds() {return boxes.keySet();}
}class GuardedObject {private int id;public GuardedObject(int id) {this.id = id;}public int getId() {return this.id;}private Object response;// 优化等待时间public Object get(long timeout) {synchronized (this) {long begin = System.currentTimeMillis();long passTime = 0;while (response == null) {long waitTime = timeout - passTime; // 剩余等待时间if(waitTime <= 0) {break;}try {this.wait(waitTime);} catch (InterruptedException e) {e.printStackTrace();}passTime = System.currentTimeMillis() - begin;}return response;}}public void complete(Object response) {synchronized (this) {sponse = ify();}}}
要点
“异步”的意思就是生产者产生消息之后消息没有被立刻消费,而“同步模式”中,消息在产生之后被立刻消费了。
park & unpark 是 LockSupport 线程通信工具类的静态方法。
// 暂停当前线程
LockSupport.park();
// 恢复某个线程的运行
LockSupport.unpark;
每个线程都有自己的一个 Parker 对象,由三部分组成 _counter, _cond 和 _mutex
先调用park再调用upark的过程
先调用upark再调用park的过程
线程因为某些原因,导致代码一直无法执行完毕,这种的现象叫做活跃性。
有这样的情况:一个线程需要同时获取多把锁,这时就容易发生死锁
如:t1 线程获得 A 对象锁,接下来想获取 B 对象的锁 t2 线程获得 B 对象锁,接下来想获取 A 对象的锁。
public static void main(String[] args) {final Object A = new Object();final Object B = new Object();new Thread(()->{synchronized (A) {try {Thread.sleep(2000);} catch (InterruptedException e) {e.printStackTrace();}synchronized (B) {}}}).start();new Thread(()->{synchronized (B) {try {Thread.sleep(1000);} catch (InterruptedException e) {e.printStackTrace();}synchronized (A) {}}}).start();}
发生死锁的必要条件
定位死锁的方法
检测死锁可以使用 jconsole
工具;或者使用 jps 定位进程 id,再用 jstack
根据进程 id 定位死锁。
哲学家就餐问题
有五位哲学家,围坐在圆桌旁。 他们只做两件事,思考和吃饭,思考一会吃口饭,吃完饭后接着思考。 吃饭时要用两根筷子吃,桌上共有 5 根筷子,每位哲学家左右手边各有一根筷子。 如果筷子被身边的人拿着,自己就得等待 。
当每个哲学家即线程持有一根筷子时,他们都在等待另一个线程释放锁,因此造成了死锁。这种线程没有按预期结束,执行不下去的情况,归类为【活跃性】问题,除了死锁以外,还有活锁和饥饿者两种情况。
/*** @author mayujie@jyd* @version v1.0* @description Chopstick* @date 2022/6/22 16:33*/
public class Chopstick extends ReentrantLock {String name;public Chopstick(String name) {this.name = name;}@Overridepublic String toString() {return "Chopstick{" +"name='" + name + ''' +'}';}public static void main(String[] args) {Chopstick c1 = new Chopstick("1");Chopstick c2 = new Chopstick("2");Chopstick c3 = new Chopstick("3");Chopstick c4 = new Chopstick("4");Chopstick c5 = new Chopstick("5");new Philosopher("苏格拉底", c1, c2).start();new Philosopher("柏拉图", c2, c3).start();new Philosopher("亚里士多德", c3, c4).start();new Philosopher("赫拉克利特", c4, c5).start();new Philosopher("阿基米德", c5, c1).start();}
}@Slf4j
class Philosopher extends Thread {Chopstick left;Chopstick right;public Philosopher(String name, Chopstick left, Chopstick right) {super(name);this.left = left;this.right = right;}public void eat() {log.debug(");try {TimeUnit.SECONDS.sleep(1);} catch (InterruptedException e) {e.printStackTrace();}}@Overridepublic void run() {while (true) {/*// 获取左筷子synchronized (left) {// 获取右筷子synchronized (right) {eat();}}*/// 使用ReentrantLock解决哲学家就餐问题// 公平锁会降低并发度if (Lock()) {try {if (Lock()) {try {eat();} finally {right.unlock();}}} finally {left.unlock();}}}}
}
避免死锁的方法
活锁出现在两个线程互相改变对方的结束条件,谁也无法结束。
避免活锁的方法
在线程执行时,中途给予不同的间隔时间即可。
死锁与活锁的区别
某些线程因为优先级太低,导致一直无法获得资源的现象。在使用顺序加锁时,可能会出现饥饿现象
和 synchronized 相比具有的的特点
常用API:
Condition 类和 Object 类锁方法区别区别
tryLock 和 lock 和 lockInterruptibly 的区别
Semaphore 是一种基于计数的信号量。它可以设定一个阈值,基于此,多个线程竞争获取许可信号,做完自己的申请后归还,超过阈值后,线程申请许可信号将会被阻塞。Semaphore 可以用来构建一些对象池,资源池之类的,比如数据库连接池。
/*** @author mayujie@jyd* @version v1.0* @description Semaphore* @date 2022/7/28 17:42*/
public class SemaphoreTest {public static void main(String[] args) {Semaphore semp = new Semaphore(5);try {semp.acquire(); // 申请许可System.out.println("执行业务代码...");System.out.println(semp.availablePermits()); // print 4} catch (InterruptedException e) {e.printStackTrace();} finally {lease();// 释放许可}System.out.println(semp.availablePermits()); // 释放之后可用的信号量复原}}
Semaphore 基本能完成 ReentrantLock 的所有工作,使用方法也与之类似,通过 acquire()与release()方法来获得和释放临界资源。经实测,Semaphone.acquire()方法默认为可响应中断锁,与 ReentrantLock.lockInterruptibly()作用效果一致,也就是说在等待临界资源的过程中可以被Thread.interrupt()方法中断。
此外,Semaphore 也实现了可轮询的锁请求与定时锁的功能,除了方法名 tryAcquire 与 tryLock不同,其使用方法与 ReentrantLock 几乎一致。Semaphore 也提供了公平与非公平锁的机制,也可在构造函数中进行设定。
Semaphore 的锁释放操作也由手动进行,因此与 ReentrantLock 一样,为避免线程因抛出异常而无法正常释放锁的情况发生,释放锁的操作也必须在 finally 代码块中完成。
字面意思回环栅栏,通过它可以实现让一组线程等待至某个状态之后再全部同时执行。叫做回环是因为当所有等待线程都被释放以后,CyclicBarrier 可以被重用。我们暂且把这个状态就叫做barrier,当调用 await()方法之后,线程就处于 barrier 了。CyclicBarrier 中最重要的方法就是 await 方法,它有 2 个重载版本:
CountDownLatch 类位于 urrent 包下,利用它可以实现类似计数器的功能。比如有一个任务 A,它要等待其他 4 个任务执行完毕之后才能执行,此时就可以利用 CountDownLatch来实现这种功能了。
区别:
方法类型 | 名称 | 用法介绍 |
---|---|---|
插入 | boolean add(E paramE) | 将指定元素插入此队列中(如果立即可行且不会违反容量限制),成功时返回 true,如果当前没有可用的空间,则抛出 IllegalStateException。如果该元素是 NULL,则会抛出 NullPointerException 异常。 |
插入 | boolean offer(E paramE) | 将指定元素插入此队列中(如果立即可行且不会违反容量限制),成功时返回 true,如果当前没有可用的空间,则返回 false。 |
插入 | put(E paramE) | 将指定元素插入此队列中,将阻塞等待可用的空间(如果有必要) |
插入 | offer(E o, long timeout, TimeUnit unit) | 可以设定等待的时间,如果在指定的时间内,还不能往队列中加入 BlockingQueue,则返回失败。 |
取出 | poll(time) | 取走 BlockingQueue 里排在首位的对象,若不能立即取出,则可以等 time 参数规定的时间,取不到时返回 null |
取出 | poll(long timeout, TimeUnit unit) | 从 BlockingQueue 取出一个队首的对象,如果在指定时间内,队列一旦有数据可取,则立即返回队列中的数据。否则直到时间超时还没有数据可取,返回失败 |
取出 | take() | 取走 BlockingQueue 里排在首位的对象,若 BlockingQueue 为空,阻断进入等待状态直到 BlockingQueue 有新的数据被加入。 |
取出 | drainTo() | 一次性从 BlockingQueue 获取所有可用的数据对象(还可以指定获取数据的个数),通过该方法,可以提升获取数据效率;不需要多次分批加锁或释放锁。 |
Java中的阻塞队列实现有:
Java内存模型的英文名称为Java Memory Model(JMM),其并不像JVM内存结构一样真实存在,而是一个抽象的概念。在JSR133(java规范提案,JSR是Java界的一个重要标准)里提出用来定义一致的、跨平台的内存模型。Java多线程对共享内存(主内存)进行操作的时候,会存在一些如可见性、原子性和有序性的问题,JMM就是围绕着多线程这些特征而建立的模型。而JMM定义了一些语法集,而这些语法集映射到Java语言的volatile、synchronized等关键字,来保证jvm在内存上数据读写的规则。其中需要保证三个特性:
另外,多线程对主存上变量(跨线程的共享变量)进行读写访问时,通常时copy一份变量副本到工作内存然后进行其他操作。但在多线程情况下会造成破坏上面的特性,因此需要通过使用volatile或加锁的方式来避免线程的不安全现象。
感兴趣的小伙伴可以参考文章👉
【对线面试官】Java内存模型为什么存在?
【对线面试官】深入浅出Java内存模型
public class VolatileVisibilityDemo {public static void main(String args []) {// 资源类MyData myData = new MyData();// AAA线程 实现了Runnable接口的,lambda表达式new Thread(() -> {System.out.println(Thread.currentThread().getName() + "t come in");// 线程睡眠3秒,假设在进行运算try {TimeUnit.SECONDS.sleep(3);} catch (InterruptedException e) {e.printStackTrace();}// 修改number的值myData.addTo60();// 输出修改后的值System.out.println(Thread.currentThread().getName() + "t update number value:" + myData.number);}, "AAA").start();// main线程就一直在这里等待循环,直到number的值不等于零while(myData.number == 0) {}// 按道理这个值是不可能打印出来的,因为主线程运行的时候,number的值为0,所以一直在循环// 如果能输出这句话,说明AAA线程在睡眠3秒后,更新的number的值,重新写入到主内存,并被main线程感知到了// 由于没有volatile修饰MyData类的成员变量number,main线程将会卡在while(myData.number == 0) {},不能正常结束// 若想正确结束,用volatile修饰MyData类的成员变量numberSystem.out.println(Thread.currentThread().getName() + "t mission is over");}/*** 假设是主物理内存*/static class MyData {/*** volatile 修饰的关键字,是为了增加 主线程和线程之间的可见性,只要有一个线程修改了内存中的值,其它线程也能马上感知*/volatile int number = 0;// int number = 0;// synchronized仅能保证原子性和有序性,但是可见性无法保证public synchronized void addTo60() {this.number = 60;}}
}
试问上述代码变量number 如果不加volatile修饰关键字,效果会怎么样?
虽然main线程创建的AAA线程修改了number的值,但由于主存中存放的值为0,AAA线程修改后的值并没有立刻可见,也就是不能及时的刷新到主存中(思考:不加volatile什么时候将会更新主存的值呢?),因此可以看出mission is over一直没打印出来,main处于持续的死循环中。显然,这不满足可见性的要求。
当我们加上volatile便能正常运行!!!
volatile原理:volatile修饰的变量是通过lock前缀指令,通过在读写操作前后添加内存屏障来解决可见性、有序性(这块的有序性,我理解应该是建立在满足可见性,且没有重排序的基础上来说的,具体还得分析。不过java多线程在满足这两个条件后,就能基本保证有序性的要求了)问题。内存屏障的作用有两点:禁止指令重排序来保证有序性;修改后的共享变量会立即同步到主内存中,同时让其他线程的本地内存(缓冲区/高速缓存)中该变量失效,其他线程访问共享变量时需要再从主内存加载,从而避免工作内存(缓冲区/高速缓存)与主存间的同步延迟问题。
那么volatile能否保证原子性呢?试着执行下面代码
public class VolatileAtomicityDemo {public static void main(String[] args) {MyData2 myData = new MyData2();// 创建10个线程,线程里面进行1000次循环for (int i = 0; i < 20; i++) {new Thread(() -> {// 里面for (int j = 0; j < 1000; j++) {myData.addPlusPlus();}}, String.valueOf(i)).start();}// 需要等待上面20个线程都计算完成后,在用main线程取得最终的结果值// 这里判断线程数是否大于2,为什么是2?因为默认是有两个线程的,一个main线程,一个gc线程while(Thread.activeCount() > 2) {// yield表示不执行Thread.yield();}// 查看最终的值// 假设volatile保证原子性,那么输出的值应该为: 20 * 1000 = 20000System.out.println(Thread.currentThread().getName() + "t finally number value: " + myData.number);}static class MyData2 {/*** volatile 修饰的关键字,是为了增加 主线程和线程之间的可见性,只要有一个线程修改了内存中的值,其它线程也能马上感知*/volatile int number = 0;public void addPlusPlus() {number ++;}}}
从上面代码可以看到number的值并没有像预计的那样累加到20000,因此可以得出volatile并不能保证原子性。主要原因在于 i++ 这个操作本身不是原子性的(等效于i = i + 1 可以分为读取 i,执行 i+1 操作,把 i+1 赋值给 i 这三个操作), 如果两个线程A和B,他们都读取了主存中的 i 值,A进行了 i+1 的操作,之后被阻塞,B又进行了 i+1 的操作,之后将新值赋值给 i,i 值被刷新回主存,此时由于A已经执行完了 i+1 操作,所以即使主存中的i值改变了,缓存一致性原则将A中的 i 变为新值,但是这个 i 值的改变也不会影响它将之前执行完的 i+1 得到的值赋给 i这一步,同样导致了最后的结果出错重要的点在于是否执行了 i+1 操作,如果只是读取了 i 值,在进行下一步操作之前,主存中的 i 已经变化了,那么我觉得还是会在缓存一致性原则下,刷新已经读取过的值。(如果可见性在+1之前发生就不会造成写覆盖,最终的结果出错,根本原因还是i++不是原子操作)
下面代码使用单例模式用的懒汉式写法,简单应用下volatile关键字:
public class Singleton implements Serializable {//懒汉式:某些编译器可能发生指令重排,先instance指向内存空间,后初始化对象,导致后来的线程会获得未初始化的对象/*private static volatile Singleton instance; //避免指令重排private Singleton(){//利用异常保证反射安全if (Singleton.instance != null){throw new IllegalStateException("该对象已创建!");}}public static Singleton getInstance(){//同步代码块,doubleChecking保证线程安全 双重校验可以减少同步次数// 使用双检锁的原因:首先要明白单线程下的单例模式必须要有一个if判断才能保证单例,多线程下synchronized放在if判断外及方法上,// 能保证线程安全但是锁的粒度太大,因此加锁前进行额外一次的判断操作,确保对象被创建出来后后续线程继续去竞争锁,进而提高并发度。if (instance == null){synchronized (Singleton.class){if (instance == null){// 双检锁并不一定保证线程安全,因为还存在指令重排。对象初始化过程如下:// 1、分配内存空间// 2、初始化对象// 3、将对象引用赋值给instance// 会导致2、3执行顺序交换,破坏单例instance = new Singleton();System.out.println("已创建!");}}}return instance;}// 保证反射安全private Object readResolve(){return SingletonHolder.instance;}
}
单例模式必须通过double-checking(加锁)及volatile 的方式才能确保线程安全。
as-if-serial:不管怎么重排序,单线程程序的执行结果不能被改变。编译器、处理器和系统内存都必须遵守as-if-serial语义。所以编译器和处理器不会对存在数据依赖关系的操作做重排序,因为这种重排序会改变执行结果。
happens-before:JMM通过happens-before关系对外提供跨线程的内存可见性保证(如果A线程的写操作a与B线程的读操作b之间存在happens-before关系,尽管a操作和b操作在不同的线程中执行,但JMM向程序员保证a操作将对b操作可见,即B线程能够立刻感知到A线程的修改操作)
具体规则:
urrent.atomic并发包提供了一些并发工具类,这里把它分成五类:
使用原子的方式更新基本类型
上面三个类提供的方法几乎相同,所以我们将以 AtomicInteger 为例子来介绍。通过观察源码可以发现,AtomicInteger 内部都是通过cas的原理来实现的。
常用api包括:
public static void main(String[] args) {AtomicInteger i = new AtomicInteger(0);// 获取并自增(i = 0, 结果 i = 1, 返回 0),类似于 i++System.out.AndIncrement());// 自增并获取(i = 1, 结果 i = 2, 返回 2),类似于 ++iSystem.out.println(i.incrementAndGet());// 自减并获取(i = 2, 结果 i = 1, 返回 1),类似于 --iSystem.out.println(i.decrementAndGet());// 获取并自减(i = 1, 结果 i = 0, 返回 1),类似于 i--System.out.AndDecrement());// 获取并加值(i = 0, 结果 i = 5, 返回 0)System.out.AndAdd(5));// 加值并获取(i = 5, 结果 i = 0, 返回 0)System.out.println(i.addAndGet(-5));// 获取并更新(i = 0, p 为 i 的当前值, 结果 i = -2, 返回 0)// 函数式编程接口,其中函数中的操作能保证原子,但函数需要无副作用System.out.AndUpdate(p -> p - 2));// 更新并获取(i = -2, p 为 i 的当前值, 结果 i = 0, 返回 0)// 函数式编程接口,其中函数中的操作能保证原子,但函数需要无副作用System.out.println(i.updateAndGet(p -> p + 2));// 获取并计算(i = 0, p 为 i 的当前值, x 为参数1, 结果 i = 10, 返回 0)// 函数式编程接口,其中函数中的操作能保证原子,但函数需要无副作用// getAndUpdate 如果在 lambda 中引用了外部的局部变量,要保证该局部变量是 final 的// getAndAccumulate 可以通过 参数1 来引用外部的局部变量,但因为其不在 lambda 中因此不必是 finalSystem.out.AndAccumulate(10, (p, x) -> p + x));// 计算并获取(i = 10, p 为 i 的当前值, x 为参数1值, 结果 i = 0, 返回 0)// 函数式编程接口,其中函数中的操作能保证原子,但函数需要无副作用System.out.println(i.accumulateAndGet(-10, (p, x) -> p + x));}
为什么需要原子引用类型?保证引用类型的共享变量是线程安全的(确保这个原子引用没有引用过别人)。
基本类型原子类只能更新一个变量,如果需要原子更新多个变量,需要使用引用类型原子类。
先看如下代码的问题:
class DecimalAccountUnsafe implements DecimalAccount {BigDecimal balance;public DecimalAccountUnsafe(BigDecimal balance) {this.balance = balance;}@Overridepublic BigDecimal getBalance() {return balance;}// 取款任务@Overridepublic void withdraw(BigDecimal amount) {BigDecimal balance = Balance();this.balance = balance.subtract(amount);}
}
当执行 withdraw 方法时,可能会有线程安全,我们可以加锁解决或者是使用无锁的方式 CAS 来解决,解决方式是用 AtomicReference 原子引用解决。
代码如下
class DecimalAccountCas implements DecimalAccount {private AtomicReference<BigDecimal> balance;public DecimalAccountCas(BigDecimal balance) {this.balance = new AtomicReference<>(balance);}@Overridepublic BigDecimal getBalance() {();}@Overridepublic void withdraw(BigDecimal amount) {while (true) {BigDecimal preVal = ();BigDecimal nextVal = preVal.subtract(amount);if(balancepareAndSet(preVal, nextVal)) {break;}}}
}
看如下代码:
public static AtomicReference<String> ref = new AtomicReference<>("A");public static void main(String[] args) throws InterruptedException {log.debug("");String preVal = ();other();TimeUnit.SECONDS.sleep(1);log.debug("change A->C {}", refpareAndSet(preVal, "C"));}private static void other() throws InterruptedException {new Thread(() -> {log.debug("change A->B {}", (), "B"));}, "t1").start();TimeUnit.SECONDS.sleep(1);new Thread(() -> {log.debug("change B->A {}", (), "A"));}, "t2").start();}
主线程仅能判断出共享变量的值与最初值 A 是否相同,不能感知到这种从 A 改为 B 又改回 A 的情况,如果主线程希望:只要有其它线程【动过了】共享变量,那么自己的 cas 就算失败,这时,仅比较值是不够的,需要再加一个版本号。使用AtomicStampedReference来解决。
使用 AtomicStampedReference 加 stamp (版本号或者时间戳)的方式解决 ABA 问题。代码如下:
// 两个参数,第一个:变量的值 第二个:版本号初始值public static AtomicStampedReference<String> ref = new AtomicStampedReference<>("A", 0);public static void main(String[] args) throws InterruptedException {log.debug("");String preVal = Reference();int stamp = Stamp();log.info("main 拿到的版本号 {}",stamp);other();TimeUnit.SECONDS.sleep(1);log.info("修改后的版本号 {}",Stamp());log.info("change A->C:{}", refpareAndSet(preVal, "C", stamp, stamp + 1));}private static void other() throws InterruptedException {new Thread(() -> {int stamp = Stamp();log.info("{}",stamp);log.info("change A->B:{}", Reference(), "B", stamp, stamp + 1));}).start();TimeUnit.SECONDS.sleep(1);new Thread(() -> {int stamp = Stamp();log.info("{}",stamp);log.debug("change B->A:{}", Reference(), "A",stamp,stamp + 1));}).start();}
AtomicStampedReference 可以给原子引用加上版本号,追踪原子引用整个的变化过程,如:A -> B -> A ->C,通过AtomicStampedReference,我们可以知道,引用变量中途被更改了几次。但是有时候,并不关心引用变量更改了几次,只是单纯的关心是否更改过,所以就有了AtomicMarkableReference 。
使用原子的方式更新数组里的某个元素
上面三个类提供的方法几乎相同,所以我们这里以 AtomicIntegerArray 为例子来介绍,代码如下:
public class Code_10_AtomicArrayTest {public static void main(String[] args) throws InterruptedException {/*** 结果如下:* [9934, 9938, 9940, 9931, 9935, 9933, 9944, 9942, 9939, 9940]* [10000, 10000, 10000, 10000, 10000, 10000, 10000, 10000, 10000, 10000]*/demo(() -> new int[10],(array) -> array.length,(array, index) -> array[index]++,(array) -> System.out.String(array)));TimeUnit.SECONDS.sleep(1);demo(() -> new AtomicIntegerArray(10),(array) -> array.length(),(array, index) -> AndIncrement(index),(array) -> System.out.println(array));}private static <T> void demo(Supplier<T> arraySupplier,Function<T, Integer> lengthFun,BiConsumer<T, Integer> putConsumer,Consumer<T> printConsumer) {ArrayList<Thread> ts = new ArrayList<>(); // 创建集合T array = (); // 获取数组int length = lengthFun.apply(array); // 获取数组的长度for(int i = 0; i < length; i++) {ts.add(new Thread(() -> {for (int j = 0; j < 10000; j++) {putConsumer.accept(array, j % length);}}));}ts.forEach(Thread::start);ts.forEach((thread) -> {try {thread.join();} catch (InterruptedException e) {e.printStackTrace();}});printConsumer.accept(array);}}
使用原子数组可以保证元素的线程安全。
注意:利用字段更新器,可以针对对象的某个域(Field)进行原子操作,只能配合 volatile 修饰的字段使用,否则会出现异常
Exception in thread "main" java.lang.IllegalArgumentException: Must be volatile type
代码如下:
public class Code_11_AtomicReferenceFieldUpdaterTest {public static AtomicReferenceFieldUpdater ref wUpdater(Student.class, String.class, "name");public static void main(String[] args) throws InterruptedException {Student student = new Student();new Thread(() -> {System.out.println(refpareAndSet(student, null, "list"));}).start();System.out.println(refpareAndSet(student, null, "张三"));System.out.println(student);}}class Student {public volatile String name;@Overridepublic String toString() {return "Student{" +"name='" + name + ''' +'}';}
}
字段更新器就是为了保证类中某个属性线程安全问题。
public static void main(String[] args) {for(int i = 0; i < 5; i++) {demo(() -> new AtomicLong(0), (ref) -> AndIncrement());}for(int i = 0; i < 5; i++) {demo(() -> new LongAdder(), (ref) -> ref.increment());}}private static <T> void demo(Supplier<T> supplier, Consumer<T> consumer) {ArrayList<Thread> list = new ArrayList<>();T adder = ();// 4 个线程,每人累加 50 万for (int i = 0; i < 4; i++) {list.add(new Thread(() -> {for (int j = 0; j < 500000; j++) {consumer.accept(adder);}}));}long start = System.nanoTime();list.forEach(t -> t.start());list.forEach(t -> {try {t.join();} catch (InterruptedException e) {e.printStackTrace();}});long end = System.nanoTime();System.out.println(adder + " cost:" + (end - start)/1000_000);}
执行代码后,发现使用 LongAdder 比 AtomicLong 快2,3倍,使用 LongAdder 性能提升的原因很简单,就是在有竞争时,设置多个累加单元(但不会超过cpu的核心数),Therad-0 累加 Cell[0],而 Thread-1 累加Cell[1]… 最后将结果汇总。这样它们在累加时操作的不同的 Cell 变量,因此减少了 CAS 重试失败,从而提高性能。
LongAdder 类有几个关键域
public class LongAdder extends Striped64 implements Serializable {}
下面的变量属于 Striped64 被 LongAdder 继承。
// 累加单元数组, 懒惰初始化
transient volatile Cell[] cells;
// 基础值, 如果没有竞争, 则用 cas 累加这个域
transient volatile long base;
// 在 cells 创建或扩容时, 置为 1, 表示加锁
transient volatile int cellsBusy;
public class Code_13_LockCas {public AtomicInteger state = new AtomicInteger(0); // 如果 state 值为 0 表示没上锁, 1 表示上锁public void lock() {while (true) {if(statepareAndSet(0, 1)) {break;}}}public void unlock() {log.debug(");state.set(0);}public static void main(String[] args) {Code_13_LockCas lock = new Code_13_LockCas();new Thread(() -> {log.info(");lock.lock();try {log.info("上锁成功");TimeUnit.SECONDS.sleep(1);} catch (InterruptedException e) {e.printStackTrace();} finally {lock.unlock();}}, "t1").start();new Thread(() -> {log.info(");lock.lock();try {log.info("上锁成功");TimeUnit.SECONDS.sleep(1);} catch (InterruptedException e) {e.printStackTrace();} finally {lock.unlock();}}, "t2").start();}}
其中 Cell 即为累加单元
// 防止缓存行伪共享
@sun.misc.Contended
static final class Cell {volatile long value;Cell(long x) { value = x; }// 最重要的方法, 用来 cas 方式进行累加, prev 表示旧值, next 表示新值final boolean cas(long prev, long next) {return UNSAFEpareAndSwapLong(this, valueOffset, prev, next);}// 省略不重要代码
}
下面讨论 @sun.misc.Contended 注解的重要意义
得从缓存说起,缓存与内存的速度比较
因为 CPU 与 内存的速度差异很大,需要靠预读数据至缓存来提升效率。缓存离 cpu 越近速度越快。 而缓存以缓存行为单位,每个缓存行对应着一块内存,一般是 64 byte(8 个 long),缓存的加入会造成数据副本的产生,即同一份数据会缓存在不同核心的缓存行中,CPU 要保证数据的一致性,如果某个 CPU 核心更改了数据,其它 CPU 核心对应的整个缓存行必须失效。
因为 Cell 是数组形式,在内存中是连续存储的,一个 Cell 为 24 字节(16 字节的对象头和 8 字节的 value),因 此缓存行可以存下 2 个的 Cell 对象。这样问题来了: Core-0 要修改 Cell[0],Core-1 要修改 Cell[1]
无论谁修改成功,都会导致对方 Core 的缓存行失效,比如 Core-0 中 Cell[0]=6000, Cell[1]=8000 要累加 Cell[0]=6001, Cell[1]=8000 ,这时会让 Core-1 的缓存行失效,@sun.misc.Contended 用来解决这个问题,它的原理是在使用此注解的对象或字段的前后各增加 128 字节大小的padding,从而让 CPU 将对象预读至缓存时占用不同的缓存行,这样,不会造成对方缓存行的失效
LongAdder 进行累加操作是调用 increment 方法,它又调用 add 方法。
public void increment() {add(1L);}
第一步:add 方法分析,流程图如下
源码如下:
public void add(long x) {// as 为累加单元数组, b 为基础值, x 为累加值Cell[] as; long b, v; int m; Cell a;// 进入 if 的两个条件// 1. as 有值, 表示已经发生过竞争, 进入 if// 2. cas 给 base 累加时失败了, 表示 base 发生了竞争, 进入 if// 3. 如果 as 没有创建, 然后 cas 累加成功就返回,累加到 base 中 不存在线程竞争的时候用到。if ((as = cells) != null || !casBase(b = base, b + x)) {// uncontended 表示 cell 是否有竞争,这里赋值为 true 表示有竞争boolean uncontended = true;if (// as 还没有创建as == null || (m = as.length - 1) < 0 ||// 当前线程对应的 cell 还没有被创建,a为当线程的cell(a = as[getProbe() & m]) == null ||// 给当前线程的 cell 累加失败 uncontended=false ( a 为当前线程的 cell )!(uncontended = a.cas(v = a.value, v + x))) {// 当 cells 为空时,累加操作失败会调用方法,// 当 cells 不为空,当前线程的 cell 创建了但是累加失败了会调用方法,// 当 cells 不为空,当前线程 cell 没创建会调用这个方法// 进入 cell 数组创建、cell 创建的流程longAccumulate(x, null, uncontended);}}}
第二步:longAccumulate 方法分析,流程图如下:
源码如下:
final void longAccumulate(long x, LongBinaryOperator fn,boolean wasUncontended) {int h;// 当前线程还没有对应的 cell, 需要随机生成一个 h 值用来将当前线程绑定到 cellif ((h = getProbe()) == 0) {// 初始化 probeThreadLocalRandom.current();// h 对应新的 probe 值, 用来对应 cellh = getProbe();wasUncontended = true;}// collide 为 true 表示需要扩容boolean collide = false;for (;;) {Cell[] as; Cell a; int n; long v;// 已经有了 cellsif ((as = cells) != null && (n = as.length) > 0) {// 但是还没有当前线程对应的 cellif ((a = as[(n - 1) & h]) == null) {// 为 cellsBusy 加锁, 创建 cell, cell 的初始累加值为 x// 成功则 break, 否则继续 continue 循环if (cellsBusy == 0) { // Try to attach new CellCell r = new Cell(x); // Optimistically createif (cellsBusy == 0 && casCellsBusy()) {boolean created = false;try { // Recheck under lockCell[] rs; int m, j;if ((rs = cells) != null &&(m = rs.length) > 0 &&// 判断槽位确实是空的rs[j = (m - 1) & h] == null) {rs[j] = r;created = true;}} finally {cellsBusy = 0;}if (created)break;continue; // Slot is now non-empty}}// 有竞争, 改变线程对应的 cell 来重试 caselse if (!wasUncontended)wasUncontended = true;// cas 尝试累加, fn 配合 LongAccumulator 不为 null, 配合 LongAdder 为 nullelse if (a.cas(v = a.value, ((fn == null) ? v + x : fn.applyAsLong(v, x))))break;// 如果 cells 长度已经超过了最大长度, 或者已经扩容, 改变线程对应的 cell 来重试 caselse if (n >= NCPU || cells != as)collide = false;// 确保 collide 为 false 进入此分支, 就不会进入下面的 else if 进行扩容了else if (!collide)collide = true;// 加锁else if (cellsBusy == 0 && casCellsBusy()) {// 加锁成功, 扩容continue;}// 改变线程对应的 cellh = advanceProbe(h);}// 还没有 cells, cells==as是指没有其它线程修改cells,as和cells引用相同的对象,使用casCellsBusy()尝试给 cellsBusy 加锁else if (cellsBusy == 0 && cells == as && casCellsBusy()) {// 加锁成功, 初始化 cells, 最开始长度为 2, 并填充一个 cell// 成功则 break;boolean init = false;try { // Initialize tableif (cells == as) {Cell[] rs = new Cell[2];rs[h & 1] = new Cell(x);cells = rs;init = true;}} finally {cellsBusy = 0;}if (init)break;}// 上两种情况失败, 尝试给 base 使用casBase累加else if (casBase(v = base, ((fn == null) ? v + x : fn.applyAsLong(v, x))))break;}}
获取最终结果通过 sum 方法,将各个累加单元的值加起来就得到了总的结果。
public long sum() {Cell[] as = cells; Cell a;long sum = base;if (as != null) {for (int i = 0; i < as.length; ++i) {if ((a = as[i]) != null)sum += a.value;}}return sum;}
Unsafe 对象提供了非常底层的,操作内存、线程的方法,Unsafe 对象不能直接调用,只能通过反射获得。LockSupport 的 park 方法,cas 相关的方法底层都是通过Unsafe类来实现的。
public static void main(String[] args) throws NoSuchFieldException, IllegalAccessException {// Unsafe 使用了单例模式,unsafe 对象是类中的一个私有的变量 Field theUnsafe = DeclaredField("theUnsafe");theUnsafe.setAccessible(true);Unsafe unsafe = ((null);}
public class Code_14_UnsafeTest {public static void main(String[] args) throws NoSuchFieldException, IllegalAccessException {// 创建 unsafe 对象Field theUnsafe = DeclaredField("theUnsafe");theUnsafe.setAccessible(true);Unsafe unsafe = ((null);// 拿到偏移量long idOffset = unsafe.objectFieldOffset(DeclaredField("id"));long nameOffset = unsafe.objectFieldOffset(DeclaredField("name"));// 进行 cas 操作Teacher teacher = new Teacher();unsafepareAndSwapLong(teacher, idOffset, 0, 100);unsafepareAndSwapObject(teacher, nameOffset, null, "lisi");System.out.println(teacher);}}@Data
class Teacher {private volatile int id;private volatile String name;}
public class Code_15_UnsafeAccessor {public static void main(String[] args) {Account.demo(new MyAtomicInteger(10000));}
}class MyAtomicInteger implements Account {private volatile Integer value;private static final Unsafe UNSAFE = Unsafe();private static final long valueOffset;static {try {valueOffset = UNSAFE.objectFieldOffset(DeclaredField("value"));} catch (Exception ex) { throw new Error(ex); }}public MyAtomicInteger(Integer value) {this.value = value;}public Integer get() {return value;}public void decrement(Integer amount) {while (true) {Integer preVal = this.value;Integer nextVal = preVal - amount;if(UNSAFEpareAndSwapObject(this, valueOffset, preVal, nextVal)) {break;}}}@Overridepublic Integer getBalance() {return get();}@Overridepublic void withdraw(Integer amount) {decrement(amount);}
}
public static void main(String[] args) {SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd");for (int i = 0; i < 10; i++) {new Thread(() -> {try {log.debug("{}", sdf.parse("1951-04-21"));} catch (Exception e) {("{}", e);}}).start();}}
思路 - 不可变对象
如果一个对象在不能够修改其内部状态(属性),那么它就是线程安全的,因为不存在并发修改啊!这样的对象在 Java 中有很多,例如在 Java 8 后,提供了一个新的日期格式化类 DateTimeFormatter
DateTimeFormatter dtf = DateTimeFormatter.ofPattern("yyyy-MM-dd");for (int i = 0; i < 10; i++) {new Thread(() -> {LocalDate date = dtf.parse("2018-10-01", LocalDate::from);log.debug("{}", date);}).start();}
发现该类、类中所有属性都是 final 的
但有同学会说,使用字符串时,也有一些跟修改相关的方法啊,比如 substring 等,那么下面就看一看这些方法是 如何实现的,就以 substring 为例:
public String substring(int beginIndex, int endIndex) {if (beginIndex < 0) {throw new StringIndexOutOfBoundsException(beginIndex);}if (endIndex > value.length) {throw new StringIndexOutOfBoundsException(endIndex);}int subLen = endIndex - beginIndex;if (subLen < 0) {throw new StringIndexOutOfBoundsException(subLen);}// 上面是一些校验,下面才是真正的创建新的String对象return ((beginIndex == 0) && (endIndex == value.length)) ? this: new String(value, beginIndex, subLen);}
发现其内部是调用 String 的构造方法创建了一个新字符串
public String(char value[], int offset, int count) {if (offset < 0) {throw new StringIndexOutOfBoundsException(offset);}if (count <= 0) {if (count < 0) {throw new StringIndexOutOfBoundsException(count);}if (offset <= value.length) {this.value = "".value;return;}}// Note: offset or count might be near -1>>>1.if (offset > value.length - count) {throw new StringIndexOutOfBoundsException(offset + count);}// 上面是一些安全性的校验,下面是给String对象的value赋值,新创建了一个数组来保存String对象的值this.value = pyOfRange(value, offset, offset+count);}
构造新字符串对象时,会生成新的 char[] value,对内容进行复制 。这种通过创建副本对象来避免共享的手段称之为【保护性拷贝(defensive copy)】。通过拷贝副本的方式保证每线程仅对副本操作,就能确保线程安全。
简介定义英文名称:Flyweight pattern。 当需要重用数量有限的同一类对象时,归类为:Structual patterns(属于结构型模式的一种设计模式)。
包装类
在JDK中 Boolean,Byte,Short,Integer,Long,Character 等包装类提供了 valueOf 方法。
例如 Long 的 valueOf 会缓存 -128~127 之间的 Long 对象,在这个范围之间会重用对象,大于这个范围,才会新建 Long 对象:
public static Long valueOf(long l) {final int offset = 128;if (l >= -128 && l <= 127) { // will cachereturn LongCache.cache[(int)l + offset];}return new Long(l);
}
Byte, Short, Long 缓存的范围都是 -128~127
Character 缓存的范围是 0~127
Integer 的默认范围是 -128~127,最小值不能变,但最大值可以通过调整虚拟机参数 "-Djava.lang.Integer.IntegerCache.high "来改变
Boolean 缓存了 TRUE 和 FALSE
String 池
参考如下文章:JDK1.8关于运行时常量池, 字符串常量池的要点
BigDecimal、BigInteger
本文发布于:2024-02-01 02:08:17,感谢您对本站的认可!
本文链接:https://www.4u4v.net/it/170672449733080.html
版权声明:本站内容均来自互联网,仅供演示用,请勿用于商业和其他非法用途。如果侵犯了您的权益请与我们联系,我们将在24小时内删除。
留言与评论(共有 0 条评论) |