同步指调用者必须等到方法调用返回后,才能继续后续的行为;异步通常会在另一个线程中执行
并发偏重于多个任务交替执行,并行是真正意义上的“同时执行”(多核操作系统中)
临界区用来表示一种公共资源或者说是共享数据,而且可以被多个线程使用,但是每次只有一个线程使用它,一旦临界区资源被占用,其他线程想要使用这个资源,就必须等待
如果一个线程占用了临界区资源,那么其他所有需要这个资源的线程就必须在这个临界区中进行等待。等待会导致挂起,这种情况就是阻塞。非阻塞就是相反,强调没有一个线程可以妨碍其他线程执行
死锁是最糟糕的情况:互相占用了其他线程的资源
饥饿:指一个或者多个线程因为种种原因无法获得资源
活锁:实际可以理解为另一种形式的饥饿,活锁在获取不到资源时候会不断地尝试(通过不断的改变自身的状态)
大致上可以分为阻塞、无饥饿、无障碍、无锁、无等待
当我们使用synchronized关键字或者重入锁时,我们得到的就是阻塞的线程。因为上面两种都会试图获取临界区的锁,如果得不到,线程就会被挂起等待。
下面的几种都是非阻塞算法
如果锁是公平的,满足先来后到,就不会产生饥饿现象
无障碍是一种最弱的非阻塞调度,线程不会因为获取临界区的问题导致其他线程挂起
如何防止破坏数据:会检测数据是否已经被破坏,如果被破坏会回滚
其实是一种乐观的调度策略
无障碍的多线程并不能一定会顺畅的进行。因为当临界区中出现严重的冲突时,所有的线程可能都会不断地回滚自己的操作,从而没有一个线程能够离开临界区(因为修改的数据可能是部分的,有互相依赖)
无障碍的实现一般通过一个“一致性标记”实现
无锁的并行都是无障碍的。所有的线程对能尝试对临界区的访问,无锁的并发保证必然有一个线程能够在有限步内完成操作离开临界区
无锁尝试修改变量一会判断是否有冲突,如果有冲突会继续尝试。无锁的并行总能保证有一个线程是胜出的
示意代码:
while(!atomicVarpareAndSet(localVar,localVar+1)){
localVar = ();
}
无锁只要求一个线程可以在有限步内完成操作,而无等待则在无锁的基础上更进一步进行扩展,他要求所有的线程都必须在有限步内完成,这样就不会引起饥饿问题
定义系统的加速比:优化钱系统耗时/优化后系统耗时
当程序的串行比一定时,加速比是有上限的,不能光靠堆叠CPU就能提升效率
如果并行代码所占的比例足够多,那么加速比就能随着CPU的数量线性增加
JMM是Java的内存模型
JMM的关键技术就是围绕多线程的原子性、可见性和有序性来建立的
是指一个操作是不可中断的。即使多个线程一起执行的时候,一个操作一旦开始,就不会被其他线程干扰。比如int型变量的赋值就是一个原子性操作,但是long型的赋值就不是这样,因为long是64位,对于总线为32位的机器,存在被其他线程干扰的可能性。案例:
public class MultiThreadLong {public static long t = 0;public static class ChangeT implements Runnable {private long to;public ChangeT(long to) { = to;}@Overridepublic void run() {while (true) {MultiThreadLong.t = to;Thread.yield();}}}public static class ReadT implements Runnable {@Overridepublic void run() {while (true) {long temp = MultiThreadLong.t;if (temp != 111L && temp != -999L && temp != 333L && temp != -444L) {System.out.println(temp);}Thread.yield();}}}public static void main(String args[]) {//打印的数据会出现前32位和后32位和其他赋的值串了new Thread(new ChangeT(111L)).start();new Thread(new ChangeT(-999L)).start();new Thread(new ChangeT(333L)).start();new Thread(new ChangeT(-444L)).start();new Thread(new ReadT()).start();}
}
可见性是指当一个线程修改了一个变量的值后,另一个线程是否知道该值已经被修改。由于变量可能会放入寄存器或者缓存里导致变量的更新不能够实时感知。和可见性有关的:缓存优化或者硬件优化、指令重排以及编译器的优化。
当然对于一个线程来说,执行顺序是一致的,但是对于多线程来说,可能存在重排。指令重排的作用是减少执行流水线的中断,提高执行的效率(利用重排在CPU空闲等待的时候执行有意义的指令)。
并不是所有的指令都可以重排,指令重排需要满足Happen-Before规则
进程是系统进行资源分配和调度的基本单位,进程是线程的容器。线程是轻量级的进程,是程序执行的最小单位。线程有以下几种状态:
NEW 表示刚刚创建,调用start方法可以执行
RUNNABLE 表示所需的一切资源已经准备好了
BLOCKED 如果线程遇到了同步块,就会进入阻塞状态,直到获得锁
WATITING 等待 无限制
TIMED-WAITING 有限制等待
TERMINATED 终止
Thread t = new Thread();
t.start()
注意,不能调用Thread的run方法来启动线程,这种调用只会把run当成Thread的一个普通的方法来调用,而不是新建一个线程,因为这不符合我们之前说的线程的生命周期。
也可以实现Runnable接口并作为构造参数传递给Thread。实际上Thread本身就是实现了Runnable接口,而且Thread.run()方法就是实现了该接口的方法。
Thread有一个stop方法能够结束线程,但是这种是暴力终止,可能会破坏数据:该方法会直接终止进程,并且会立即释放这个线程所持有的锁,而这些锁恰恰是用来维护对象一致性的。
stop方法破坏数据的案例:
public class StopThreadUnsafe {public static User user = new User();public static class User {private int id;private String name;...@Overridepublic String toString() {return "User{" +"name='" + name + ''' +", id=" + id +'}';}}public static class ChangeObjectThread extends Thread {public void run() {while (true) {synchronized (user) {int v = (int) (System.currentTimeMillis() / 1000);user.setId(v);try {Thread.sleep(100);} catch (InterruptedException e) {e.printStackTrace();}user.setName(v + "");}Thread.yield();}}}public static class ReadObjectThread extends Thread {public void run() {while (true) {synchronized (user) {if (Id() != Integer.Name())) {System.out.String());}}Thread.yield();}}}public static void main(String args[]) throws InterruptedException {new ReadObjectThread().start();while (true) {Thread thread = new ChangeObjectThread();thread.start();Thread.sleep(150);//直接停止会导致只修改了name和id这两个数据中的某个数据thread.stop();}}
}
正确的做法是由我们自己决定退出时机(当达到某个条件自动退出while循环)
线程终端并不会立即退出,而是会给线程发出通知,告诉它有人希望它退出,线程可以根据该信息作出自己的行为,如果没有,中断不起作用。注意区分和中断有关的三个方法:
Thread.interrupt() 中断,并设置标志位
Thread.isInterrupted() 判断是否被中断
Thread.interrupted() 判断是否已被中断,并清除当前中断状态
在waitf方法和sleep方法中,如果产生中断,会抛出InterruptedException
中断标志位如果不清除,那么在下一次循环开始时,就无法捕捉这个中断,因此一般在中断异常中要重新设置中断标记位
- notify只会随机唤醒一个等待的线程,而notifyAll会唤醒所有等待的线程
- 调用wait或者notify方法必须先获得对象的监视器,一般用在要同步的代码中
- wait方法会释放目标对象的锁,而sleep方法不会
suspend和resume都不会释放锁,因此JDK并不推荐使用这个两个方法
suspend会暂停线程,然后通过调用resume来恢复
join是让当前线程等待join的线程执行完之后再继续执行,主要有两个API
join()
join(long mills)
在JDK中,join是通过wait来实现的,也就是当加入一个线程之后,当前线程是处于wait状态的,等待notify通知。
yield:使当前线程让出CPU,但这并不是当前线程线程不执行,它还会进行一些资源的争夺,但是否能够再次被分配到,就不一定了。可以在适当的时候调用yield方法,给予其他重要的线程更多的工作机会。
volatile能够保证变量的可见性,即变量的修改能够被其他的线程所感知。它通知虚拟机该变量可能会被频繁修改,需要添加一些措施来保证程序的正常运行。可以用volatile来修饰long型变量,来保证变量赋值的原子性。但是volatil不能保证一系列符合操作的原子性,即不能替代锁。注意的是,使用volatile来修饰long,当存在多个线程修改这个long变量是,仍存在冲突的情况。
可以新建线程的时候,传参ThreadGroup设置线程组,这样能够通过线程组查看线程组的活跃线程的个数等其他功能。
垃圾回收、JIT线程可以理解为守护线程,与之相对应的是用户线程。当一个Java应用,只要一个守护线程存在时,程序就会退出。
通过Thread.setDaemon方法来设置守护线程,注意的是设置守护线程需要在start方法之前调用
在Java中,用1到10表示线程优先级,有三个内置的静态来表示,分别代表优先级1、5、10,数字越大优先级越高
通过调用Thread.setPriority来设置优先级
synchronized能够保证线程安全。比如在对i++操作进行同步,在一个线程进行修改的时候,其他的线程不能修改,同时也不能读。synchronized的三种用法:
- 指定加锁对象:进入同步代码块需要获取该对象的锁
- 直接作用于实例方法:相当于对当前实例加锁
- 直接作用于静态方法:相当于对当前类加锁
需要注意的是在多线程之间进行加锁同步的时候,要确保加锁的对象是同一个对象
除了线程同步,保证线程安全之外,synchronized还可以保证县线程间的可见性和有序性
由于并发问题的隐蔽性导致很难定位,因此需要写代码的时候尽量避免
ArrayList是一个线程不安全的容器,在多线程中会遇到难以预见的问题,不如抛出数组越界异常以及size大小不正确等,因此推荐使用Vector
并发下的HashMap也会遇到ArrayList同样的问题,不同的是HashMap可能会造成死循环,因为HashMap内部维护了一个链表,有并发下数据结构造成破坏有可能形成环,在Put时会造成死循环,程序无法停止。
在下面的程序中,由于Integer是不可变类型,因此在自增的时候回新建一个Integer,会导致加锁失效。
public class ThreadGroupName implements Runnable {@Overridepublic void run() {String groupAndName = Thread.currentThread().getThreadGroup().getName() + "-" + Thread.currentThread().getName();while (true) {System.out.println("I am " + groupAndName);try {Thread.sleep(3000);} catch (InterruptedException e) {e.printStackTrace();}}}public static void main(String args[]) {ThreadGroup tg = new ThreadGroup("PrintGroup");Thread t1 = new Thread(tg, new ThreadGroupName(), "T1");Thread t2 = new Thread(tg, new ThreadGroupName(), "T2");t1.start();t2.start();System.out.println(tg.activeCount());Thread t3 = new Thread(tg, new ThreadGroupName(), "T3");t3.start();System.out.println(tg.activeCount());tg.list();}
}
ReentrantLock不仅能实现synchronized的功能,并扩展了其他的功能。JDK5早起版本,重入锁的性能很差,从JDK6.0开始两者的性能差距并不大。
重入锁是可以反复进入的,但是这只对一个线程来说。一个线程两次获取同一把锁是允许的(调用两次lock),同时相应的unlock也需要调用两次,如果释放的次数比获取的次数多,将会抛出IllegalMonitorStateException
下面的代码采用重入锁来保护临界区资源i
public class ReenterLock implements Runnable {public static ReentrantLock lock = new ReentrantLock();public static int i = 0;@Overridepublic void run() {for (int j = 0; j < 1000000; j++) {lock.lock();try {i++;} finally {lock.unlock();}}}public static void main(String args[]) throws InterruptedException {ReenterLock reenterLock = new ReenterLock();Thread thread1 = new Thread(reenterLock);Thread thread2 = new Thread(reenterLock);thread1.start();thread2.start();thread1.join();thread2.join();System.out.println(i);}
}
可重入锁支持中断响应,也就是在程序遇到死锁的时候,可以中断线程,让线程无需等待锁。
下面的代码构造了死锁的场景,并通过interrupt方法解决了死锁。
“`java
public class IntLock implements Runnable {
public static ReentrantLock lock1 = new ReentrantLock();
public static ReentrantLock lock2 = new ReentrantLock();
int lock;
public IntLock(int lock) {this.lock = lock;
}@Override
public void run() {try {if (lock == 1) {//这里使用lockInterruptibly是为了能够响应中断lock1.lockInterruptibly();Thread.sleep(500);lock2.lockInterruptibly();} else {lock2.lockInterruptibly();Thread.sleep(500);lock1.lockInterruptibly();}} catch (InterruptedException e) {e.printStackTrace();} finally {if (lock1.isHeldByCurrentThread()) {lock1.unlock();}if (lock2.isHeldByCurrentThread()) {lock2.unlock();}System.out.println(Thread.currentThread().getId() + ":id");}}
public static void main(String args[]) throws InterruptedException {IntLock r1 = new IntLock(1);IntLock r2 = new IntLock(2);Thread thread1 = new Thread(r1);Thread thread2 = new Thread(r2);thread1.start();thread2.start();Thread.sleep(1000);thread2.interrupt();}
}
“`
锁申请时间等待限时:tryLock
public class TimeLock implements Runnable {public static ReentrantLock lock = new ReentrantLock();@Overridepublic void run() {try {//最多等待5s,等不到就会返回falseif (Lock(5, TimeUnit.SECONDS)) {System.out.println(Thread.currentThread().getName());System.out.println("get lock success");Thread.sleep(6000);} else {System.out.println(Thread.currentThread().getName());System.out.println("get lock failed");}} catch (InterruptedException e) {e.printStackTrace();} finally {if (lock.isHeldByCurrentThread()) {lock.unlock();}}}public static void main(String args[]) {TimeLock timeLock = new TimeLock();Thread thread1 = new Thread(timeLock);Thread thread2 = new Thread(timeLock);thread1.start();thread2.start();}
}
tryLock()也可以不带参数,一般用在循环里面,它会马上去获取锁,获取不到就会返回false,案例如下:
public class TryLock implements Runnable {public static ReentrantLock lock1 = new ReentrantLock();public static ReentrantLock lock2 = new ReentrantLock();int lock;public TryLock(int lock) {this.lock = lock;}@Overridepublic void run() {if (lock == 1) {while (true) {if (Lock()) {try {try {Thread.sleep(500);} catch (InterruptedException e) {e.printStackTrace();}if (Lock()) {try {System.out.println(Thread.currentThread().getId() + ":My Job done;");return;} finally {lock2.unlock();}}} finally {lock1.unlock();}}}} else {while (true) {if (Lock()) {try {try {Thread.sleep(500);} catch (InterruptedException e) {e.printStackTrace();}if (Lock()) {try {System.out.println(Thread.currentThread().getId() + ":My Job done;");return;} finally {lock1.unlock();}}} finally {lock2.unlock();}}}}}/*** 上面代码中采用了非常容易死锁的加锁顺序,导致thread1和thread2由于锁的竞争而互相等待从而引起死锁** 使用了tryLock后,线程不会一直等待而是不停的尝试去获得锁资源,只需要等待一定的时间,线程最终会获得所需要的资源** @param args*/public static void main(String args[]) {TryLock r1 = new TryLock(1);TryLock r2 = new TryLock(2);Thread thread1 = new Thread(r1);Thread thread2 = new Thread(r2);thread1.start();thread2.start();}}
公平锁功能:新建重入锁时可以传一个是否为公平锁的布尔参数
公平所对线程的调用按照先来后到的策略,内部维护了一个队列,因此成本比非公平锁要高一点
非公平锁有一定的随机性,并且一个线程倾向于再次获取已经持有的锁,虽然这种分配方式是高效的,但是不是公平的。
对ReentrantLock的几个总结:
- lock():获得锁,如果锁已经被占用,则等待
- lockIntertruptibly:获得锁,优先响应中断
- tryLock():尝试获得锁,如果成功,返回true,失败返回false。该方法不等待,立即返回
- tryLock(long time, TimeUnit unit):在给定时间内尝试获得锁
- unlock():释放锁
在可重入锁实现中,主要包含三个因素:
- 原子状态:使用CAS操作来存储原子状态
- 等待队列:所有没有请求到锁的线程,会进入等待队列进行等待
- 阻塞原语park()和unpark(),用来挂起和恢复线程。没有得到锁的线程将会被挂起。
Condition的使用类似于wait和notify的配合使用:
Condition提供的基本方法如下:
await()
awawitUninterruptibly() 和上面类似,不过不会响应中断
awaitNanos(long nanosTimeout)
await(long time, TimeUnit unit)
awaitUntil(Date deadline)
signal()
signalAll()
await()方法会使当前线程等待,同时释放锁,当其他线程调用sinal或者signalAll()方法,线程会重新获得锁并继续执行。或者当线程被中断时,也能跳出等待。和wait方法类似
signal和notify方法类似
案例:
public class ReenterLockCondition implements Runnable {public static ReentrantLock lock = new ReentrantLock();public static Condition condition = wCondition();@Overridepublic void run() {try {lock.lock();condition.await();System.out.println("Thread is going on");} catch (InterruptedException e) {e.printStackTrace();} finally {lock.unlock();}}public static void main(String args[]) throws InterruptedException {ReenterLockCondition reenterLockCondition = new ReenterLockCondition();Thread thread1 = new Thread(reenterLockCondition);thread1.start();System.out.println("睡眠2秒钟");Thread.sleep(2000);lock.lock();condition.signal();lock.unlock();}
}
无论是synchronized或者重入锁,都只能每次让一个线程访问一个资源
信号量允许多个线程访问临界资源,Semaphore的构造函数:
Semaphore(int permits)
Semaphore(int permits, boolean fair) 第二个参数指定是否公平
信号量的主要逻辑方法:
acquire() 尝试获取准入,若无法获得线程会等待,直到有线程释放一个准入或者当前线程被中断
acquireUniterruptibly() 和上面类似,但是不响应中断
tryAcquire() 尝试获取,如果成功返回true
tryAcquire(long timeout, TimeUnit unit)
release() 使用acquire申请后必须调用release,不然该信号量不能被其他人获取到
案例:
public class SemapDemo implements Runnable {final Semaphore semp = new Semaphore(5);@Overridepublic void run() {try {semp.acquire();Thread.sleep(2000);System.out.println(Thread.currentThread().getId() + ":done!");lease();} catch (InterruptedException e) {e.printStackTrace();}}/*** 总共20个线程,系统会以5个线程一组为单位,依次执行并输出** @param args*/public static void main(String args[]) {ExecutorService executorService = wFixedThreadPool(20);final SemapDemo demo = new SemapDemo();for (int i = 0; i < 20; i++) {executorService.submit(demo);}}
}
读写分离锁可以有效地帮助减少竞争。因为如果使用重入锁或者内部锁,理论上所有的读之间、写之间、读写之间都要进行锁的竞争和等待。而读写锁允许多个线程同时读,只有写写操作和读写操作间依然需要相互等待和竞争锁。
读-读:不互斥
读-写:读阻塞写,写也会阻塞读
写-写:写写阻塞
public class ReadWriteLockDemo {private static Lock lock = new ReentrantLock();private static ReentrantReadWriteLock reentrantReadWriteLock = new ReentrantReadWriteLock();private static Lock readLock = adLock();private static Lock = reentrantReadWriteLock.writeLock();private int value;public Object handleRead(Lock lock) throws InterruptedException {try {lock.lock();Thread.sleep(1000);//模拟读操作System.out.println("读操作:" + value);return value;} finally {lock.unlock();}}public void handleWrite(Lock lock, int index) throws InterruptedException {try {lock.lock();Thread.sleep(1000);//模拟写操作System.out.println("写操作:" + value);value = index;} finally {lock.unlock();}}public static void main(String args[]) {final ReadWriteLockDemo demo = new ReadWriteLockDemo();Runnable readRunnable = new Runnable() {@Overridepublic void run() {//分别使用两种锁来运行,性能差别很直观的就体现出来,使用读写锁后读操作可以并行,节省了大量时间try {demo.handleRead(readLock);//demo.handleRead(lock);} catch (InterruptedException e) {e.printStackTrace();}}};Runnable writeRunnable = new Runnable() {@Overridepublic void run() {//分别使用两种锁来运行,性能差别很直观的就体现出来try {demo.handleWrite(writeLock, new Random().nextInt(100));//demo.handleWrite(lock, new Random().nextInt(100));} catch (InterruptedException e) {e.printStackTrace();}}};for (int i = 0; i < 18; i++) {new Thread(readRunnable).start();}for (int i = 18; i < 20; i++) {new Thread(writeRunnable).start();}}
}
Latch英文为门闩的意思,这里指把门锁起来,不让线程跑出来。可以类比一下火箭发射,在倒计时时,只有所有的检查都正常后才会发射,下面是一个案例:
public class CountDownLatchDemo implements Runnable {//表示需要10个线程来完成任务static final CountDownLatch end = new CountDownLatch(10);static final CountDownLatchDemo demo = new CountDownLatchDemo();@Overridepublic void run() {try {Thread.sleep(new Random().nextInt(3) * 1000);System.out.println("check complete");//倒计时减一untDown();} catch (InterruptedException e) {e.printStackTrace();}}public static void main(String args[]) throws InterruptedException {ExecutorService executorService = wFixedThreadPool(10);for (int i = 0; i < 10; i++) {executorService.submit(demo);}//等待检查end.await();//发射火箭System.out.println("Fire!");executorService.shutdown();}
}
和CountDownLatch类似,不过功能更强大。使用场景:司令下达命令,要求10个士兵一起去完成一项任务,就会要求10个士兵先集合报道,人齐后才出发。当10个人把任务执行完了,才能宣布任务完成。案例:
await()方法会抛出两个异常一个是中断异常一个是BrokenBarrierException表示当前的CyclicBarrier已经破损了,可能系统已经没办法等到设定的线程数
public class CyclicBarrierDemo {public static class Soldier implements Runnable {private String soldier;private final CyclicBarrier cyclicBarrier;public Soldier(CyclicBarrier cyclicBarrier, String soldier) {this.soldier = licBarrier = cyclicBarrier;}@Overridepublic void run() {try {//等待10个线程cyclicBarrier.await();//工作代码doWork();//再次等待10个线程,检查工作结果cyclicBarrier.await();} catch (InterruptedException e) {e.printStackTrace();} catch (BrokenBarrierException e) {e.printStackTrace();}}void doWork() {try {Thread.sleep(Math.abs(new Random().nextInt() % 10000));} catch (InterruptedException e) {e.printStackTrace();}System.out.println(soldier + ":任务完成");}}public static class BarrierRun implements Runnable {boolean flag;int N;public BarrierRun(boolean flag, int N) {this.flag = flag;this.N = N;}@Overridepublic void run() {if (flag) {System.out.println("司令:[士兵" + N + "个,任务完成!");} else {System.out.println("司令:[士兵" + N + "个,集合完毕!");flag = true;}}}public static void main(String args[]) {final int N = 10;Thread[] allSoldier = new Thread[N];boolean flag = false;CyclicBarrier cyclicBarrier = new CyclicBarrier(N, new BarrierRun(flag, N));System.out.println("集合队伍!");for (int i = 0; i < N; i++) {System.out.println("士兵" + i + "报道!");allSoldier[i] = new Thread(new Soldier(cyclicBarrier, "士兵" + i));allSoldier[i].start();}}
}
Thread的suspend和resume方法也能够实现阻塞和恢复,不过suspend和resume方法在多线程中调用顺序可能会相反,这会导致程序永远挂起的问题。而使用LockSupport不会出现该问题,它不需要获取锁,也不会抛出InterruptedException的情况。Thread的suspend和resume在这里对应park和unpark
其静态方法park()方法能够阻塞当前线程,类似的还有parkNanos等带时间等待。
park和unpark方法的顺序不会影响程序的工作,原因是这里采用了类似信号量的技术,即park消耗一个信号量,unpark生产一个信号量
另外调用suspend,线程仍是Running状态,而调用park线程是WAITING状态
public class LockSupportDemo {public static Object u = new Object();static ChangeObjectThread t1 = new ChangeObjectThread("t1");static ChangeObjectThread t2 = new ChangeObjectThread("t2");public static class ChangeObjectThread extends Thread {public ChangeObjectThread(String name) {super.setName(name);}public void run() {synchronized (u) {System.out.println("in " + getName());LockSupport.park();}}}public static void main(String args[]) throws InterruptedException {t1.start();Thread.sleep(100);t2.start();LockSupport.unpark(t1);LockSupport.unpark(t2);t1.join();t2.join();}
}
park方法还可以设置阻塞对象,在线程Dump中可以查看阻塞的对象
park能够支持中断,但是不会抛出中断异常,我们可以从Thread.interrupted()方法判断
public class LockSupportIntDemo {public static Object u = new Object();static ChangeObjectThread t1 = new ChangeObjectThread("t1");static ChangeObjectThread t2 = new ChangeObjectThread("t2");public static class ChangeObjectThread extends Thread {public ChangeObjectThread(String name) {super.setName(name);}public void run() {synchronized (u) {System.out.println("in " + getName());LockSupport.park();if (Thread.interrupted()) {}System.out.println(getName() + "被中断");}System.out.println(getName() + "继续执行");}}public static void main(String args[]) throws InterruptedException {t1.start();Thread.sleep(100);t2.start();LockSupport.unpark(t1);LockSupport.unpark(t2);t1.join();t2.join();}}
正常Thread会在run方法结束后进行自动回收。为了避免频繁的创建和销毁线程可以采用线程池技术。JDK提供了一套Excutor框架实现了线程池。
ThreadPoolExecutor表示线程池,Executors扮演者线程池工厂的角色,通过它可以获得特定功能的线程池,任何Runnable的对象都可以被ThreadPoolExecutor线程池调度。
Executor框架提供了各种线程池,主要有以下的工厂方法:
ExecutorService newFixedThreadPool(int nThreads) 固定线程数量的线程池
ExecutorService newSingleThreadExecutor() 只有一个线程的线程池,按照先入先出的顺序执行
ExecutorServcie newCacheThreadPool() 若所有线程均在工作,又有新的任务,会创建新的线程处理
ScheduledExecutorService newSingleThreadScheduledExecutor() 具有周期执行能力的线程
ScheduledExecutorService newScheduledThreadPool(int corePoolSize) 指定线程池的数量
案例:
public class ThreadPoolDemo {public static class MyTask implements Runnable {@Overridepublic void run() {//能够获取线程ID,前五个和后五个ID一致System.out.println(System.currentTimeMillis() + "Thread ID:" + Thread.currentThread().getId());try {Thread.sleep(1000);} catch (InterruptedException e) {e.printStackTrace();}}}public static void main(String args[]) {MyTask myTask = new MyTask();ExecutorService executorService = wFixedThreadPool(5);for (int i = 0; i < 20; i++) {executorService.submit(myTask);}}
}
SheduledExecutorService有下列的调度方法:
shedule(Runnable command, long delay, TimeUnit unit);
scheduleAtFixedRate(Runnable command, long initialDelay, long peroid, TimeUnit unit);
sheduleWithFixedDelay(Runnable command, long initialDelay, long delay, TimeUnit unit); 频率一定,和TImer的中的固定频率和固定周期类似
另一个值得注意的问题是,调度程序实际上并不保证任务会无限期的持续调用,如果中间产生异常,那么后续的所有执行都会中断
Executor的工厂方法创建线程池,内部都是用了ThreadPoolExecutor类,它的构造函数:
public ThreadPoolExecutor(int corePoolSize, 线程数量int maximumPoolSize, 最大线程数量long keepAliveTime, 当线程池数量超过corePoolSize时,多余的空闲线程的存活时间TimeUnit unit, 时间单位BlockingQueue<Runnable> workQueue, 任务队列,被提交但尚未执行当然任务ThreadFactory threadFactory,线程工厂RejectedExecutionHandler handler 当任务太多来不及处理,如何拒绝任务)
其中BlockingQueue用于存放Runnable对象,有一下几种BlockingQueue
直接提交队列:实际是SynchronousQueue,它是一种特殊的BlockingQueue,没有容量,每一个插入都要等待一个删除,反之,每一个删除都要等待对应的插入操作。因此提交的任务没有真的被保存,而总是将新任务提交给线程执行,如果没有空闲的线程,则尝试创建新的线程,如果进程数量已经达到最大值,则执行拒绝策略。因此SynchronousQueue通常要设置很大的maximumPoolSize。(该队列倾向于创建更多的线程)
有界的任务队列:使用LinkedBlockingQueue实现,必须带有一个容量的构造函数,表示队列的最大容量。若实际线程池大于corePoolSieze,则会将新任务加入等待队列。若等待队列已满,则在总线程数不大于maximumPoolSize的前提下,创建新的进程执行任务。若大于,则执行拒绝策略。因此有界队列在当任务队列满时,才可能将线程数提升到corePoolSize以上。
优先任务队列:通过PriorityBlockingQueue实现,可以控制任务的执行先后顺序,它是一个特殊的无界队列。前两种队列都是先入先出的。
对于newCachedThreadPool(),如果同时有大量任务被提交,而任务的执行又不那么快时,那么系统会开启等量的线程处理,这种做法会很快耗尽系统的资源。
ThreadPoolExecutor的调度逻辑:
任务提交—>小于coreSize:分配线程处理;大于coreSize:提交到等待队列—>成功:执行成功;失败提交线程池—>达到max,提交失败:拒绝执行;提交成功:分配线程执行
JDK内置四种决绝策略:
- AbortPolicy:直接抛出异常,阻止继续工作
- CallerRunsPolicy:直接在调用者线程中运行当前被丢弃的任务,会影响任务提交的性能
- DiscardOledestPolciy:丢弃一个老的请求,也就是即将被执行的一个任务,并尝试再次提交当前任务
- DiscardPolicy:默默丢弃无法处理的任务,不与任何处理。
以上策略均实现了RejectedExecutionHandler接口
下面是自定义线程池和决绝策略的使用:
public class RejectThreadPoolDemo {public static class MyTask implements Runnable {@Overridepublic void run() {System.out.println(System.currentTimeMillis() + ":Thread ID:" + Thread.currentThread().getId());try {Thread.sleep(100);} catch (InterruptedException e) {e.printStackTrace();}}}public static void main(String args[]) throws InterruptedException {MyTask myTask = new MyTask();ExecutorService executorService = new ThreadPoolExecutor(5, 5, 0L, TimeUnit.SECONDS, new LinkedBlockingDeque<Runnable>(10), Executors.defaultThreadFactory(), new RejectedExecutionHandler() {@Overridepublic void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {System.out.String() + " is discard");}});for (int i = 0; i < 100; i++) {executorService.submit(myTask);Thread.sleep(10);}}
}
自定义线程池可以跟踪线程池究竟创建了多少线程,也可以自定义线程的名称、组以及优先级信息
public class CustomizedThreadFactory {public static class MyTask implements Runnable {@Overridepublic void run() {System.out.println(System.currentTimeMillis() + "Thread ID:" + Thread.currentThread().getId());try {Thread.sleep(100);} catch (InterruptedException e) {e.printStackTrace();}}}public static void main(String[] args) throws InterruptedException {MyTask myTask = new MyTask();ExecutorService es = new ThreadPoolExecutor(5, 5, 0L, TimeUnit.MILLISECONDS, new SynchronousQueue<Runnable>(), new ThreadFactory() {@Overridepublic Thread newThread(Runnable r) {Thread t = new Thread(r);t.setDaemon(true);System.out.println("create " + t);return t;}});for (int i = 0; i < 5; i++) {es.submit(myTask);}Thread.sleep(2000);}
}
ThreadPoolExecutor也是一个可扩展的线程池。它提供了beforeExecute(),afterExecute(),terminated()三个接口对线程池进行控制
ThreadPoolExecutor.Worker是ThreadPoolExecutor的内部类,它是一个实现了Runnable接口的类。ThreadPoolExecutor线程池中的工作线程也正是Worker实例。Worker.runTask()方法会被线程池以多线程模式异步调用。下面是一个使用案例:
public class ExtThreadPool {public static class MyTask implements Runnable {public String name;public MyTask(String name) {this.name = name;}@Overridepublic void run() {System.out.println("正在执行:Thread ID:" + Thread.currentThread().getId() + ",Task Name:" + name);try {Thread.sleep(100);} catch (InterruptedException e) {e.printStackTrace();}}}public static void main(String args[]) throws InterruptedException {ExecutorService executorService = new ThreadPoolExecutor(5, 5, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingDeque<Runnable>()) {protected void beforeExecute(Thread t, Runnable r) {System.out.println("准备执行:" + ((MyTask) r).name);}protected void afterExecute(Thread t, Runnable r) {System.out.println("执行完成:" + ((MyTask) r).name);}protected void terminated() {System.out.println("线程池退出!");}};for (int i = 0; i < 5; i++) {MyTask task = new MyTask("TASK-GEYM-" + i);//这里使用了ute(task);Thread.sleep(10);}//shutdown关闭线程池,这是一个比较安全的方法,不会立即暴力终止所有的任务,只是线程池不能接受新的任务了executorService.shutdown();}
}
主要有三个区别:
- 接收的参数不一样:submit能接收Callable和Runnable对象,而execute只能接收Runnable对象
- submit有返回值,execute没有
- submit能够获取异常的信息(通过get)
通过submit提交的线程任务会吞掉异常信息,从而产生莫名其妙的结果,可以使用下面的方法打印出异常堆栈:
用ute(new MyTask)来代替:pools.submit(new MyTask()),或者使用:
Future re = pools.submit(new MyTask())
re.get();
然而即使得到了异常堆栈信息,也拿不到任务提交有关的堆栈信息,只能获取从Thread.run开始的线程运行的堆栈信息。这是就需要提前保存一下堆栈信息:
public class TraceThreadPoolExecutor extends ThreadPoolExecutor {public TraceThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue) {super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue);}public void execute(Runnable task) {ute(wrap(task, clientTrace(), Thread.currentThread().getName()));}private Runnable wrap(final Runnable task, final Exception clientTrace, String name) {return new Runnable() {@Overridepublic void run() {try {task.run();} catch (Exception e) {clientTrace.printStackTrace();throw e;}}};}private Exception clientTrace() {return new Exception("Client stack trace");}
}
Fork/Join框架是一种分而治之的概念,能够分解任务,并最后收集这些任务的结果。fork是创建子线程的意思,join是等待子线程运行。JDK有一个ForkJoin框架,能够接受并分解任务,内有每一个线程有一个任务队列,如果线程A已经执行完,而线程B还有任务,线程A会帮助线程B执行,并且拿任务的时候线程A是从队列的底部拿,而A是从顶部拿。API:
ForkJoinTask submit(ForkJoinTask task)
ForkJoinTask有两个重要的子类:RecursiveaAction和RecursiveTask,前者无返回值,后者有返回值。下面是一个计算数列和的例子:
public class CountTask extends RecursiveTask {private static final int THRESHOLD = 10000;private long start;private long end;public CountTask(long start, long end) {this.start = d = end;}//该方法可以递归调用@Overrideprotected Long compute() {long sum = 0;boolean canCompute = (end - start) < THRESHOLD;if (canCompute) {for (long i = start; i <= end; i++) {sum += i;}} else {long step = (start + end) / 100;ArrayList<CountTask> subTasks = new ArrayList<CountTask>();long pos = start;for (int i = 0; i < 100; i++) {long lastOne = pos + step;if (lastOne > end) {lastOne = end;}CountTask subTask = new CountTask(pos, lastOne);pos += step + 1;subTasks.add(subTask);subTask.fork();}for (CountTask t : subTasks) {//等待子任务sum += (Long) t.join();}}return sum;}public static void main(String args[]) {ForkJoinPool forkJoinPool = new ForkJoinPool();CountTask task = new CountTask(0, 200000L);ForkJoinTask<Long> result = forkJoinPool.submit(task);long res = 0;try {//等待返回结果res = ();System.out.println("sum=" + res);} catch (InterruptedException e) {e.printStackTrace();} catch (ExecutionException e) {e.printStackTrace();}}
}
需要注意的是,如果子任务划分的过细,或者层次过深会导致堆栈溢出异常,虽然ForkJoin框架内部也缓存了空闲线程堆栈以备其他任务使用。
Vector是线程安全的,但是效率不高,ArrayList和LinkedList都不是线程安全的
总体简介:
CocurrentHashMap:高效的并发HashMap,是线程安全的
CopyOnWriteArrayList:用在读多写少的情况下
ConcurrentLinkedQueue:高效的并发队列,使用链表实现,可以看成线程安全的LinkedList
BlockingQueue:这是一个接口,JDK内部通过链表、数组等方式实现了该接口,表示阻塞队列
CocurrentSkipListMap:跳表,使用跳表的数据结构进行快速查找
另外,Collections工具类可以帮助我们将任意集合包装成线程安全的类
Collections.synchronizedMap(new HashMap) 能够生成线程安全的HashMap:SynchronizedMap
内部通过synchronized一个mutex对象进行同步,但是在并发量比较大的时候不推荐,可以用CocurrentHashMap代理
和上面同样,使用Collections.synchronizedList能够得到线程安全的List
是高并发是性能最好的队列,内部使用了CAS来设置当前节点和下一个节点值。它允许运行时链表处于多个不同的状态,即当前的链表状态不是最实时的状态。 实现相对较为复杂
在某些场景下,读操作远远多于写操作,因此我们希望读操作尽量快,而写操作即使慢一些也没关系。
JDK提供了CopyOnWriteArrayList类,读取不用加锁,读和写也不会等待,只有写和写会等待
写的时候会先写副本,在写回。
这是一个接口,主要有两个实现:ArrayBlockingQueue和LinkedBlockingQueue,之所以能够作为数据共享通道,是在于Blocking,有两个重要的方法put和take,对于put来说如果队列满了,它会等待notFull的通知,对于take来说,如果队列为空,它会等待notEmpty的通知,其实现是通过重入锁的await和signal来实现。
跳表是拿空间换时间的一个List,它和平衡树的复杂度类似,使用跳表实现的Map是有序的,内部是通过CAS实现
在高并发情况下,激烈的锁的竞争会导致性能下降,所以需要在实际使用中需要关注锁的优化
同步的代码范围应该保证尽量下,只在必要的时候进行同步,这样就能明显减少锁的持有时间,提高系统的吞吐量
减少锁粒度也是一种削弱多线程锁竞争的有效手段。比如ConcurrentHashMap中的实现中,最重要的的两个方法是get和put,一种自然的想法就是对整个HashMap进行同步,但是这样效率不高,因此内部对HashMap进行了分段,默认情况下是16个段,也就是16个HashMap,从而大量减少锁的竞争。但是引入锁颗粒度会有一个问题,当需要获取全局锁的时候,需要获取每个Segment的锁,从而导致资源消耗较多,比如求ConcurrentHashMap的size。
比如之前讨论的ReadWriteLock,允许多个线程同时读
一个典型的案例就是LikedBlockingQueue类,其中的take实现了从队列中获取数据,put实现了从队列中增加数据的功能,由于两者的操作分别作用于队列的前端后尾端,理论上两者是不冲突的,一次将一个锁拆成了两个锁,使take和take之间竞争,put和put之间竞争。
这个是和减少锁持有的时间相反的优化,两者需要一个trade-off。也就是当虚拟机遇到一连串连续地对同一锁不断进行请求和释放的操作时,便会把所有的锁整合成锁的一次请求。
针对加锁操作的优化手段,核心思想是: 如果一个线程获得了锁。那么锁就进入偏向模式。当这个线程再次请求锁时,无需再做任何同步操作。对于几乎没有锁竞争的场合,偏向锁有比较好的优化结果,因为连续多次连续可能是同一个线程请求相同的锁。但是对于在竞争激烈的场合,偏向锁效率不高。
如果偏向锁请求失败,虚拟机并不会立即挂起线程,他还会使用一种轻量级锁的优化手段。如果轻量级锁竞争失败,那么它会膨胀为重量级锁。
锁膨胀后,虚拟机为了避免虚拟机真实地在操作系统层面挂起,虚拟机还会在做最后的努力-自旋锁。系统会进行一次赌注:它会假设在不久的将来,线程可以获取这把锁,因此虚拟机会让当前线程做几个空循环,如果得到锁,会进入临界区。
这是针对不当使用某些线程安全的类的情况,比如非并发情况下使用Vector类等,虚拟机会进行优化,清除不必要的锁。那么现在有个问题,怎么判断这些类可以不用加锁?一般如果变量的作用域仅限于方法内部,就不用进行加锁,因为这是线程栈上的私有变量,不能够被其他线程访问到。这种分析方法也称为逃逸分析。
ThreadLocal是存储线程的局部变量,只有当前线程才能够访问。
Thread内部有一个变量是threadLocals:
ThreadLocal.ThreadLocalMap threadLocals
其类型是ThreadLocalMap,注意的是ThreadLocalMap并不是一个Map,它是一个内部类,部分代码如下,可以看到,映射的数据是存在Entry中的,key就为ThreadLocal本身,value是要存的对象:
static class Entry extends WeakReference<ThreadLocal<?>> {/** The value associated with this ThreadLocal. */Object value;Entry(ThreadLocal<?> k, Object v) {super(k);value = v;}
}
通过ThreadLocal的set和get方法能够设置和获取ThreadLocalMap。
在Thread要退出的时候,会将threadLocals置为null,这样会清理ThreadLocalMap,但是由于目前很多线程都是采用线程池的方式,线程不会退出,因此会存在内存泄漏的问题(这里的含义是:存的对象已经没人用了,但是还存在ThreadLocalMap中)。如果希望及时回收对象,可以调用:
来清理数据。
同时为了避免这种内存泄漏的问题,ThreadLocalMap中使用了WeakReference,即弱引用,相对于强引用,弱引用在GC的时候会被回收。数据引用关系如下:
ThreadLocal—>ThreadLocalMap—>Entry—>(key,value)
当ThreadLocal被置为null时,强引用消失,此时弱引用在GC的是就很有可能会被回收。
内存泄漏问题
由于Thread采用线程池的关系,当前线程不会退出,因此内部的变量threadLocals不会被置为null(线程退出的时候会显式将该变量置为null)也就是不会对Entry进行内存回收,由于ThreadLocal变量一般定义在业务侧,代码执行完了,这些ThreadLocal变量会变成null,强引用消失。由于key定义的是弱引用,在GC的时候会回收,导致Entry的key为null,这样会导致永远取不到value对象,导致内存泄漏。虽然ThreadLocal在get和set的时候会清除key为null的entry,但是也有可能是在get和set之后才产生的GC,因此还是有内存泄漏的风险。
对于并发控制而言,锁是一种悲观策略,它总是假设会有冲突。无锁的策略一般使用的是一种叫做比较交换的技术(CAS)
基本的策略方法如下:
CAS(V,E,N)
其中V是需要跟新的变量,E是期望值,N是新值
当V值和E值相等才会更新,不相等继续做spin。在硬件层面,现在大多数CAS都是原子操作。
为了让JAVA程序员能够受益CAS等CPU指令,JDK并发包中有一个atomic包,里面实现了一些直接使用CAS操作的线程安全的类型,其中比较常用的就是AtomicInteger、AtomicLong、AtomicBoolean、AtomicReference等
比如对于AtomicInteger来说,incrementAndGet调用了compareAndSet方法,有调用了native类Unsafe的compareAndSwapInt方法来实现CAS。Unsafe类是java封装的操作指针的方法,通过AtomicInteger类中定义好的偏移量来获取和设置内部定义的int型字段值。Unsafe采用了工厂模式,除了jdk内部,我们自己不能使用(会抛异常)。前面讨论的是JDK1.7的实现,对于JDK1.8来说实现又不相同。
AtomicReference和AtomicInteger类似,不同之处是AtomicInteger是对整数的封装,而AtomicReference是对引用的封装,也就是保证在修改对象引用时的线程安全性,注意是引用,因此一般会用在不可变类上。
需要注意的是,如果CAS中实际值经过两次修改又和期望值一致时,CAS并不能识别数据已经修改,这在某些情况下会影响程序的正确性,使用AtomicReference也会遇到这种情况,JDK提供了AtomicStampedReference类来解决这个问题
AtomicStampedReference不仅维护了对象的值,也维护了对象的时间戳,因此需要对象值和时间戳都要满足期望值才能更新
JDK提供了一些可用的原子数组:AtomicIntegerArray,AtomicLongArray,AtomicReferenceArray,分别表示整数数组、long型数组和普通的对象数组
根据数据类型的不同,这种原子操作的Updater有三种:AtomicIntegerFieldUpdater, AtomicLongFieldUpdater,AtomicReferenceFieldUpdater。常用在将现有代码中某些变量的操作优化为线程安全的。
public class AtomicIntegerFieldUpdaterDemo {public static class Candidate {int id;volatile int score;}public final static AtomicIntegerFieldUpdater<Candidate> scoreUpdater = wUpdater(Candidate.class, "score");//检查Updater是否工作正确public static AtomicInteger allScore = new AtomicInteger(0);public static void main(String args[]) throws InterruptedException {final Candidate stu = new Candidate();Thread[] t = new Thread[10000];for (int i = 0; i < 10000; i++) {t[i] = new Thread() {public void run() {if (Math.random() > 0.4) {scoreUpdater.incrementAndGet(stu);allScore.incrementAndGet();}}};t[i].start();}for (int i = 0; i < 10000; i++) {t[i].join();}//正常运行后,下面两个结果相同System.out.println("score=" + stu.score);System.out.println("allScore=" + allScore);}
}
需要注意的是:
Updater只能修改它可见范围内的变量,因为Updater是反射获取的,如果上面讲score设置为private就不可行
为了确保变量被正确读取,它必须是volatile类型的
CAS操作会通过对象实例中的偏移量直接进行赋值,因此,它不支持static字段
SynchronousQueue是一个特殊的等待队列,当让容量为0时,任何的写需要等待一个队SynchronousQueue的读,反之亦然。因此SynchronousQueue与其说是一个队列,不如说是一个数据交换通道。
下面是模拟哲学家吃饭的死锁问题
public class DeadLock extends Thread {protected Object tool;static Object fork1 = new Object();static Object fork2 = new Object();public DeadLock(Object object) {l = object;if (tool == fork1) {this.setName("哲学家A");}if (tool == fork2) {this.setName("哲学家B");}}public void run() {if (tool == fork1) {synchronized (fork1) {try {Thread.sleep(500);} catch (InterruptedException e) {e.printStackTrace();}synchronized (fork2) {System.out.println("哲学家A开始吃饭了");}}}if (tool == fork2) {synchronized (fork2) {try {Thread.sleep(500);} catch (InterruptedException e) {e.printStackTrace();}synchronized (fork1) {System.out.println("哲学家B开始吃饭了");}}}}public static void main(String args[]) throws InterruptedException {DeadLock A = new DeadLock(fork1);DeadLock B = new DeadLock(fork2);A.start();B.start();Thread.sleep(1000);}
}
遇到这种情况通常相关的进程就不在工作,并且CPU利用率为0,这时可以用jstack查看进程内所有线程的堆栈,可以查看死锁的线程。
如何避免死锁:
1.考虑使用无锁的函数
2.使用之前讨论的重入锁,通过重入锁的中断和限时有效规避死锁的问题。
单例模式的优点:
- 对于频繁使用的对象,可以省略new操作花费的时间
- 由于new的次数减少,也能减少GC的时间
效率最高的单例:
public class Singleton{private Singleton(){System.out.println("Singleton is create");}private static Singleton instance = new Singleton();public static Singleton getInstance(){return instance;}
}
但是这里会有一个问题,任何访问Singleton的静态成员和方法都会初始化这个instance,因此不能控制创建时间。这是可以采用懒汉模式:
public class LazySingleton{private LazySingleton(){System.out.println("LazySingleton is create");}private static LazySingleton instance = null;public static synchronized LazySingleton getInstance(){if(instance==null){instance = new LazySingleton();}return instance;}
}
这里加锁是为了避免重复创建,还有一个结合两者优势的单例模式,既能够懒加载也避免加锁:
public class StaticSingleton{private StaticSingleton(){System.out.println("StaticSingleton is create");}private static class SingletonHolder{private static StaticSingleton instance = new StaticSingleton();}public static StaticSingleton getInstance(){return SingletonHolder.instance;}
}
依靠不变模式,可以确保其在没有同步操作的多线程环境中依然始终保持内部状态的一致性和正确性。在不变模式中,final起到了关键作用
生产者-消费者模式是一个经典的多线程模式,生产者和消费者是通过共享内存缓冲区进行通信,共享内存缓冲区也是该模式的核心所在,下面是一个基于该模式的求整数平方的并行程序:
public class Producer implements Runnable{private volatile boolean isRunning = true;private BlockingQueue<PCData> queue;private static AtomicInteger count = new AtomicInteger();private static final int SLEEPTIME = 1000;public Producer(BlockingQueue<PCData> queue) {this.queue = queue;}@Overridepublic void run() {PCData data = null;Random r = new Random();System.out.println("start producer");try{while (isRunning) {Thread.Int(SLEEPTIME));data = new PCData(count.incrementAndGet());System.out.println("data: " + data);if (!queue.offer(data, 2, TimeUnit.SECONDS)) {System.out.println("failed to put data:" + data);}}} catch (InterruptedException e) {e.printStackTrace();Thread.currentThread().interrupt();}}public void stop() {isRunning = false;}
}
public class Consumer implements Runnable {private BlockingQueue<PCData> queue;private static final int SLEEPTIME = 1000;public Consumer(BlockingQueue<PCData> queue) {this.queue = queue;}@Overridepublic void run() {System.out.println("start consumer:" + Thread.currentThread().getId());Random r = new Random();try{while (true) {PCData data = new PCData(1);if (data != null) {int re = IntData() * IntData();System.out.println(MessageFormat.format("{0}*{1}={2}", IntData(), IntData()));Thread.Int(SLEEPTIME));}}}catch (InterruptedException e) {e.printStackTrace();Thread.currentThread().interrupt();}}public static void main(String[] args) throws InterruptedException {BlockingQueue<PCData> queue = new LinkedBlockingDeque<>(0);Producer producer1 = new Producer(queue);Producer producer2 = new Producer(queue);Producer producer3 = new Producer(queue);Consumer consumer1 = new Consumer(queue);Consumer consumer2 = new Consumer(queue);Consumer consumer3 = new Consumer(queue);ExecutorService service = wCachedThreadPool();ute(producer1);ute(producer2);ute(producer3);ute(consumer1);ute(consumer2);ute(consumer3);Thread.sleep(10 * 1000);producer1.stop();producer2.stop();producer3.stop();Thread.sleep(3000);service.shutdown();}
}
上面是采用BlockingQueue实现的,但是由于BlockingQueue采用了锁和阻塞来实现同步,在高并发的场合,性能不是很优越。如果采用无锁的CAS来实现生产者和消费者模式将会更高效,而Disruptor框架就是采用这样的实现的框架。下面一个案例:
数据类和上面的类似,首先是消费者:
//对数据进行处理,onEvent是框架的回调方法
public class Consumer implements WorkHandler<PCData> {@Overridepublic void onEvent(PCData pcData) throws Exception {System.out.println(Thread.currentThread().getId() + ":Event:--" + Value() * Value() + "--");}
}
生产者:
public class Producer {//框架提供的环形队列,能够快速定位数据,另外由于采用环形可以进行内存复用private final RingBuffer<PCData> ringBuffer;public Producer(RingBuffer<PCData> ringBuffer) {this.ringBuffer = ringBuffer;}public void pushData(ByteBuffer byteBuffer) {long sequence = ();try{//获取可以插入的数据,并复制PCData event = (sequence);event.Long(0));}finally {ringBuffer.publish(sequence);}}public static void main(String[] args) throws InterruptedException {Executor executor = wCachedThreadPool();//PCData的工厂类 PCDataFactory factory = new PCDataFactory();//缓冲区大小为2的整数次幂,方面定位数据位置int bufferSize = 1024;Disruptor<PCData> disruptor = new Disruptor<PCData>(factory, bufferSize, executor, ProducerType.MULTI, new BlockingWaitStrategy());disruptor.handleEventsWithWorkerPool(new Consumer(), new Consumer(), new Consumer(), new Consumer());disruptor.start();RingBuffer<PCData> ringBuffer = RingBuffer();Producer producer = new Producer(ringBuffer);ByteBuffer bb = ByteBuffer.allocate(8);for(long l=0;true;l++) {bb.putLong(0, 1);producer.pushData(bb);Thread.sleep(100);System.out.println("add data" + 1);}}
}
当缓冲区有新数据产生时,消费者是如何知道有数据的了,这就是上面构造Disruptor时传入的BlockingWaitStrategy()起的作用,该类实现了WaitStrategy接口,它有这个几个实现类:
- BlockingWaitStrategy:这是默认的策略,内部使用了锁和Condition对数据进行了监控,这种策略最节省CPU,但是并发性能也差
- SleepingWaitStrategy:这个策略同样不会消耗多少CPU,它会在循环中不断等待数据。首先会进行自旋,如果不成功,会使用Thread.yield()让出CPU,并最终使用LockSupport.parkNanos(1)进行线程休眠,以确保不太占用CPU数据
- YieldingWaitStrategy:这个策略适合低时延的场合。消费者线程会不断循环监控缓冲区变化,在循环内部,会使用Thread.yield()让出CPU给别的线程执行时间,因此,最好有多于消费者线程数量的逻辑CPU数量。
- BusySpinWaitStrategy:这个是最疯狂的等待策略。它就是一个死循环!消费者线程会尽最大努力疯狂监控缓冲区变化,因此会吃掉所有CPU的资源,一般需要物理CPU数量必须大于消费者线程数。
Disruptor不仅应用了CAS进行优化,也尝试解决CPU缓存的伪共享问题。我们知道CPU有一个告诉缓存Cache,读写单位为行,它是从主从复制到缓存的最小单位。
如果两个CPU的Cache某行都缓存了x和y的变量,如果CPU1更新了x,它会导致CPU2上的缓存行失效,同理CPU2上的更新y也会导致CPU1上的缓存行失效,缓存行失效会导致缓存未命中,这会导致系统的吞吐量下降。优化的方法就是在变量的前后都填充一些数据,从而使一个变量一个缓存行,这样就不会受到其他变量更新的影响。下面是一个实现的案例:
public class FalseSharing implements Runnable {//和CPU的个数一致public final static int NUM_THREADS = 2;public final static long ITERATIONS = 500L * 1000L * 1000L;private final int arrayIndex;private static VolatileLong[] longs = new VolatileLong[NUM_THREADS];static {for (int i = 0; i < longs.length; i++) {longs[i] = new VolatileLong();}}public FalseSharing(final int arrayIndex) {this.arrayIndex = arrayIndex;}public static void main(String[] args) throws InterruptedException {final long start = System.currentTimeMillis();runTest();System.out.println("duration:" + (System.currentTimeMillis() - start));}private static void runTest() throws InterruptedException {Thread[] threads = new Thread[NUM_THREADS];for (int i = 0; i < threads.length; i++) {threads[i] = new Thread(new FalseSharing(i));}for (Thread t : threads) {t.start();}for (Thread t : threads) {t.join();}}@Overridepublic void run() {long i = ITERATIONS + 1;//多线程在不停的修改value值while (0 != --i) {//每个线程的arrayIndex不一致longs[arrayIndex].value = i;}}public final static class VolatileLong{//value才是真正存的数据,其他用来填充的public volatile long value = 0L;public long p1,p2,p3,p4,p5,p6, p7;}
}
Future模式就是一个异步调用的设计模式,异步调用的优点不多赘述。下面介绍一下Future模式的主要角色:
- Client:能开开启ClientThead装配RealData,能立即返回futureData对象
- RealDData:实现了Data接口,真是数据,其构造是比较慢的
- FutureData:同样实现了Data接口,是虚拟的数据,构造很快、
下面是Future模式实现的一个案例:
首先是Data接口:
public interface Data {public String getResult();
}
接着是FutureData的实现:
public class FutureData implements Data {protected RealData realData = null;protected boolean isReady = false;public synchronized void setRealData(RealData realData) {if (isReady) {return;}alData = realData;isReady = true;notifyAll();//RealData已经被注入,通知getResult}//在没有ready的时候调用该方法会一致等待,直到有数据@Overridepublic synchronized String getResult() {while (!isReady) {try {wait();} catch (InterruptedException e) {e.printStackTrace();}}sult;}
}
接着是RealData的实现:
public class RealData implements Data {protected final String result;public RealData(String param) {//RealData的构造很慢,需要用户等待很久,这里用sleep模拟StringBuffer stringBuffer = new StringBuffer();for (int i = 0; i < 10; i++) {stringBuffer.append(param);try {Thread.sleep(100);} catch (InterruptedException e) {e.printStackTrace();}}result = String();}@Overridepublic String getResult() {return result;}
}
Client的实现:
public class Client {public Data request(final String queryStr) {final FutureData futureData = new FutureData();new Thread() {public void run() {//RealData的构造很慢,所以在单独的线程中运行RealData realData = new RealData(queryStr);futureData.setRealData(realData);}}.start();return futureData;}
}
Main:
public class Main {public static void main(String args[]) {Client client = new Client();//这里会立即返回,因为得到的是FutureData而不是RealDataData data = quest("name");System.out.println("请求完毕");try {//这里用一个sleep代替了对其他业务逻辑的处理//在处理这些业务逻辑的过程中,RealData被创建,从而充分利用了等待时间Thread.sleep(2000);} catch (InterruptedException e) {e.printStackTrace();}//使用真实的数据System.out.println("数据=" + Result());}
}
JDK中的Future模式要比上面的实现要复杂的多,主要组成:
- FutureTask实现了RunnableFuture接口,RunnableFuture继承了Future和Runnable接口
- Runnable的run方法用于构造数据
- RutureTask有一个Sync类,做一些实质性的工作,会委托Sync类实现,Sync类会调用Callable接口,完成实际数据的组装工作。
下面是一个案例:
public class FutureMain {public static void main(String args[]) throws ExecutionException, InterruptedException {//构造FutureTaskFutureTask<String> futureTask = new FutureTask<String>(new RealData2("a"));ExecutorService executorService = wFixedThreadPool(1);//执行FutureTask,相当于前一个例子中的quest("a")发送请求//在这里开启线程进行RealData的call()执行executorService.submit(futureTask);System.out.println("请求完毕");try {//这里依然可以做额外的数据操作,使用sleep代替其他业务逻辑的处理Thread.sleep(2000);} catch (InterruptedException e) {e.printStackTrace();}//相当于前一个例子中的Result(),取得call()方法的返回值//如果此时call()方法没有执行完成,则依然会等待System.out.println("数据=" + ());}
}
RealData的实现,用来构造数据:
public class RealData2 implements Callable<String> {private String data;public RealData2(String data) {this.data = data;}@Overridepublic String call() throws Exception {StringBuffer stringBuffer = new StringBuffer();for (int i = 0; i < 10; i++) {stringBuffer.append(data);Thread.sleep(100);}String();}
}
并行算法能够充分发挥多核CPU的性能,但是,并非所有的计算都可以改造成并发的方式,比如计算(B+C)*B/2。但是对于大量这种计算,完全可以改成并行流水线方式,充分利用所有的CPU,下面是一个案例:
首先我们需要一个在线程间携带结果进行信息交换的载体:
public class Msg {public double i;public double j;public String orgStr = null;
}
计算加法的流水线:
public class Plus implements Runnable {public static BlockingDeque<Msg> blockingDeque = new LinkedBlockingDeque<Msg>();@Overridepublic void run() {while (true) {Msg msg = null;try {msg = blockingDeque.take();msg.i = msg.j + msg.i;Multiply.blockingDeque.add(msg);} catch (InterruptedException e) {e.printStackTrace();}}}
}
计算乘法的流水线:
public class Multiply implements Runnable {public static BlockingDeque<Msg> blockingDeque = new LinkedBlockingDeque<Msg>();@Overridepublic void run() {while (true) {Msg msg = null;try {msg = blockingDeque.take();msg.i = msg.j * msg.i;Div.blockingDeque.add(msg);} catch (InterruptedException e) {e.printStackTrace();}}}
}
计算出发的流水线
public class Div implements Runnable {public static BlockingDeque<Msg> blockingDeque = new LinkedBlockingDeque<Msg>();@Overridepublic void run() {while (true) {Msg msg = null;try {msg = blockingDeque.take();msg.i = msg.i / 2;System.out.println(msgStr + "=" + msg.i);} catch (InterruptedException e) {e.printStackTrace();}}}
}
最后是主线程:
public class PStreamMain {public static void main(String args[]) {new Thread(new Plus()).start();new Thread(new Multiply()).start();new Thread(new Div()).start();for (int i = 0; i <= 1000; i++) {for (int j = 0; j <= 1000; j++) {Msg msg = new Msg();msg.i = i;msg.j = j;msgStr = "((" + i + "+" + j + ")*" + i + ")/2";//给加操作队列添加数据Plus.blockingDeque.add(msg);}}}
}
并行搜索的思想上就是通过多线程分段搜索从而提高查找效率,下面是一个案例:
public class ParallelSearch {static int[] arr;static ExecutorService pool = wCachedThreadPool();static final int Thread_Num = 2;static AtomicInteger result = new AtomicInteger(-1);//搜索的实现public static int search(int searchValue, int beginPos, int endpos) {int i = 0;for (int j = beginPos; j < endpos; j++) {if (() >= 0) {();}if (arr[i] == searchValue) {if (!resultpareAndSet(-1, i)) {();}}}return -1;}//搜索任务static class SearchTask implements Callable<Integer> {int begin,end,searchValue;public SearchTask(int begin, int end, int searchValue) {this.begin = d = end;this.searchValue = searchValue;}@Overridepublic Integer call() throws Exception {int re = search(searchValue, begin, end);return re;}}//任务分摊以及结果回收public static int pSearche(int searcheValue) throws ExecutionException, InterruptedException {int subArrSize = arr.length / Thread_Num + 1;List<Future<Integer>> re = new ArrayList<>();for (int i = 0; i < arr.length; i += subArrSize) {int end = i + subArrSize;if (end >= arr.length) {end = arr.length;}re.add(pool.submit(new SearchTask(searcheValue, i, end)));}for (Future<Integer> fu : re) {if (fu.get() >= 0) {();}}return -1;}
}
常见的排序,如冒泡排序、希尔排序都是串行的,因为排序的过程中涉及到数据交换,而这些交换跟数据的位置有关,不能串行化。不过,可以采用一些特殊的算法可以让排序并行化。
比如对于冒泡排序,我们可以将其拆分成奇排序和偶排序,其并行程序如下:
public class POddEvenSort {static int exchFlag = 1;static synchronized void setExchFlag(int v) {exchFlag = v;}static synchronized int getExchFlag() {return exchFlag;}//具体去实施的任务public static class OddEvenSortTask implements Runnable {int i;CountDownLatch latch;int arr[];public OddEvenSortTask(int i, CountDownLatch latch, int[] arr) {this.i = i;this.latch = latch;this.arr = arr;}@Overridepublic void run() {if (arr[i] > arr[i + 1]) {int temp = arr[i];arr[i] = arr[i + 1];arr[i + 1] = temp;setExchFlag(1);}untDown();}}public static void pOddEvenSort(int[] arr) throws InterruptedException {int start = 0;ExecutorService pool = wCachedThreadPool();//start为0是偶交换,为1是奇交换//只要发生了交换就要继续下去,只有没发生交换且当前是偶交换才停止while (getExchFlag() == 1 || start == 1) {setExchFlag(0);//start=1时,只有len/2-1个线程CountDownLatch latch = new CountDownLatch(arr.length / 2 - (arr.length % 2 == 0 ? start : 0));for (int i = start; i < arr.length - 1; i += 2) {//比较交换的工作让线程去做pool.submit(new OddEvenSortTask(i, latch,arr));}latch.wait();if (start == 0) {start = 1;} else {start = 0;}}}
}
如奇偶排序类似的是,希尔排序也是等间隔交换的,因此也可以进行并行化
我们知道矩阵的乘法是可以分块的,因此可以应用并行计算
对于标准的网络IO来说,我们会使用Socket进行网络的读写。为了让服务器可以支撑更多的客户端连接,通常的做法是为每一个客户端连接开启一个线程。
这里以Echo服务器为例,它会读取客户端的一个输入,并将这个输入原封不动的返回给客户端。下面是代码的实现:
public class MutiThreadEchoServer {private static ExecutorService tp = wCachedThreadPool();//处理请求任务static class HandleMsg implements Runnable {Socket clientSocket;public HandleMsg(Socket clientSocket) {this.clientSocket = clientSocket;}@Overridepublic void run() {BufferedReader is = null;PrintWriter os = null;try {//读输入并写输出is = new BufferedReader(new InputStream()));os = new OutputStream(), true);String input = null;long b = System.currentTimeMillis();while ((input = is.readLine()) != null) {os.print(input);}long e = System.currentTimeMillis();System.out.println("spend:" + (e - b) + "ms");} catch (IOException e) {e.printStackTrace();}finally {try {if (is != null) {is.close();}if (os != null) {os.close();}clientSocket.close();} catch (IOException e) {e.printStackTrace();}}}}public static void main(String[] args) {ServerSocket echoServer = null;Socket clientSocket = null;try {echoServer = new ServerSocket(8000);} catch (IOException e) {e.printStackTrace();}while (true) {try {clientSocket = echoServer.accept();System.out.RemoteSocketAddress() + " connect!");tp.execute(new HandleMsg(clientSocket));} catch (IOException e) {e.printStackTrace();}}}
}
客户端的实现:
public class EchoClient {public static void main(String[] args) throws IOException {Socket client = null;PrintWriter writer = null;BufferedReader reader = null;try {client = new Socket();t(new InetSocketAddress("localhost", 8000));writer = new OutputStream(), true);writer.print("Hello!");writer.flush();reader = new BufferedReader(new InputStream()));System.out.println("from server:" + adLine());} catch (IOException e) {e.printStackTrace();}finally {if (writer != null) {writer.close();}if (reader != null) {reader.close();}if (client != null) {client.close();}}}
}
上面的交互方式如果输入的IO操作非常缓慢会极大的占用CPU资源,在高并发的场合并不适用,因此一般采用NIO编程
NIO的原理不多描述,这里说一下NIO的几个关键组成部分:
- Channel:Channel类似于流,一个Channel可以和文件和网络Socket对应,此时往这个Channel中写数据,就等同于向Socket中写入数据。
- Buffer:可以理解成一个内存区域或者byte数组。数据需要包装成Buffer的形式才能和Channel交互(写入或者读取)
- Selector:众多Selector实现中有一个SelectableChannel实现,表示可选择的通道,它可以注册到Selector中,这样就能被Selector管理。一个Selector可以管理多个SelectableChannel(轮询),当Channel中的数据准备好时,Selector就会接到通知,得到那些已经准备好的数据。
下面是使用NIO改造的Echo服务器
public class NIOEchoServer {private Selector selector;private ExecutorService tp = wCachedThreadPool();public static Map<Socket, Long> time_start = new HashMap<>();private void startServer() throws IOException {selector = SelectorProvider.provider().openSelector();//获取服务端的SocketChannel实例ServerSocketChannel ssc = ServerSocketChannel.open();//设置为费阻塞模式figureBlocking(false);//绑定端口InetSocketAddress isa = new LocalHost(), 8000);ssc.socket().bind(isa);//绑定到Selector上,注册感兴趣的事件为Accept//注册后Selector就能为这个Channel服务了//SelectionKey表示一对Selector和Channel的关系SelectionKey acceptKey = ister(selector, SelectionKey.OP_ACCEPT);for(;;) {//是一个阻塞方法,如果当前数据没有准备好就一直等待。selector.select();Set readKeys = selector.selectedKeys();Iterator i = readKeys.iterator();long e = 0;while (i.hasNext()) {SelectionKey sk = (SelectionKey) i.next();//当处理一个SelectionKey之后务必要删除,以免重复处理i.remove();if (sk.isAcceptable()) {//进行客户端的接收doAccept(sk);} else if (sk.isValid() && sk.isReadable()) {//判断Channel是否已经可读if (!ainsKey(((SocketChannel) sk.channel()).socket())) {time_start.put(((SocketChannel) sk.channel()).socket(), System.currentTimeMillis());}doRead(sk);} else if (sk.isValid() && sk.isWritable()) {//是否准备好进行写doWrite(sk);e = System.currentTimeMillis();long b = ve(((SocketChannel) sk.channel()).socket());System.out.println("spend:" + (e - b) + "ms");}}}}private void doWrite(SelectionKey sk) {SocketChannel channel = (SocketChannel) sk.channel();NIOEchClient echClient = (NIOEchClient) sk.attachment();LinkedList<ByteBuffer> outq = Outq();ByteBuffer bb = Last();try {int len = channel.write(bb);if (len == -1) {disconnet(sk);return;}if (bb.remaining() == 0) {veLast();}} catch (IOException e) {System.out.println("Failed to write to client.");e.printStackTrace();disconnet(sk);}if (outq.size() == 0) {sk.interestOps(SelectionKey.OP_READ);}}private void doRead(SelectionKey sk) {SocketChannel channel = (SocketChannel) sk.channel();ByteBuffer bb = ByteBuffer.allocate(8192);int len;try {len = ad(bb);if (len < 0) {disconnet(sk);}} catch (IOException e) {System.out.println("Failed to read from clent.");e.printStackTrace();disconnet(sk);}bb.flip();tp.execute(new HandleMsg(sk,bb));}class HandleMsg implements Runnable {SelectionKey sk;ByteBuffer bb;public HandleMsg(SelectionKey sk, ByteBuffer bb) {this.sk = sk;this.bb = bb;}@Overridepublic void run() {NIOEchClient echoClient = (NIOEchClient) sk.attachment();queue(bb);sk.interestOps(SelectionKey.OP_READ | SelectionKey.OP_WRITE);selector.wakeup();}}private void doAccept(SelectionKey sk) {//与客户端建立连接ServerSocketChannel server = (ServerSocketChannel) sk.channel();SocketChannel clientChannel;try {clientChannel = server.accept();//配置为非阻塞模式figureBlocking(false);//表示对读操作感兴趣SelectionKey clientKey = ister(selector, SelectionKey.OP_READ);//共享client变量NIOEchClient echoClient = new NIOEchClient();clientKey.attach(echoClient);InetAddress clientAddress = clientChannel.socket().getInetAddress();System.out.println("Accepted connection from:" + HostAddress());} catch (IOException e) {e.printStackTrace();}}private class NIOEchClient {private LinkedList<ByteBuffer> outq;public NIOEchClient() {outq = new LinkedList<>();}public LinkedList<ByteBuffer> getOutq() {return outq;}public void enqueue(ByteBuffer byteBuffer) {outq.addFirst(byteBuffer);}}}
使用NIO编程的客户端:
public class NIOClient {private Selector selector;public void init(String ip, int port) {try {SocketChannel channel = SocketChannel.open();figureBlocking(false);this.selector = SelectorProvider.provider().openSelector();t(new InetSocketAddress(ip, port));ister(selector, SelectionKey.OP_CONNECT);} catch (IOException e) {e.printStackTrace();}}public void working() throws IOException {while (true) {if (!selector.isOpen()) {break;}selector.select();Iterator<SelectionKey> iterator = this.selector.selectedKeys().iterator();while (iterator.hasNext()) {SelectionKey key = ();ve();if (key.isConnectable()) {connect(key);} else if (key.isReadable()) {read(key);}}}}private void read(SelectionKey key) throws IOException {SocketChannel channel = (SocketChannel) key.channel();ByteBuffer buffer = ByteBuffer.allocate(100);ad(buffer);byte[] array = buffer.array();String msg = new String(array).trim();System.out.println("客户端收到:" + msg);channel.close();key.selector().close();}public void connect(SelectionKey key) throws IOException {SocketChannel channel = (SocketChannel) key.channel();if (channel.isConnectionPending()) {channel.finishConnect();}figureBlocking(false);channel.write(ByteBuffer.wrap(new String("hello server").getBytes()));ister(this.selector, SelectionKey.OP_READ);}
}
AIO是异步IO的缩写。虽然在网络操作系统中提供了非阻塞的方法,但是NIO的IO行为还是同步的。对于NIO来说,我们的业务线程是在IO操作准备好时,得到通知,接着由这个线程进行IO操作,IO操作本身还是同步的。对于AIO来说,则更加进了一步,它不是在IO准备好时再通知线程,而是在IO操作已经完后后,再给线程发出通知。因此AIO是完全不会阻塞的。此时,我们的业务逻辑将变成一个回调函数,等待IO操作完成后,由系统出发。下面是AIO实现Echo服务器的案例:
public class AIOEchoServer {public final static int PORT = 8000;private AsynchronousServerSocketChannel server;public AIOEchoServer() throws IOException {server = AsynchronousServerSocketChannel.open().bind(new InetSocketAddress(PORT));}public void start() {System.out.println("Server listen on " + PORT);//注册事件和事件完成后的处理器server.accept(null, new CompletionHandler<AsynchronousSocketChannel, Object>() {final ByteBuffer buffer = ByteBuffer.allocate(1024);@Overridepublic void completed(AsynchronousSocketChannel result, Object attachment) {System.out.println(Thread.currentThread().getName());Future<Integer> writeResult = null;buffer.clear();try {ad(buffer).get(100, TimeUnit.SECONDS);buffer.flip();writeResult = result.write(buffer);} catch (InterruptedException | ExecutionException | TimeoutException e) {e.printStackTrace();} finally {try {server.accept(null, this);();result.close();} catch (InterruptedException | ExecutionException | IOException e) {e.printStackTrace();}}}@Overridepublic void failed(Throwable exc, Object attachment) {System.out.println("failed: " + exc);}});}
}
AIO Echo客户端的实现
public class AIOClient {public static void main(String[] args) throws IOException, InterruptedException {AsynchronousSocketChannel client = AsynchronousSocketChannel.open();t(new InetSocketAddress("localhost", 8000), null, new CompletionHandler<Void, Object>() {@Overridepublic void completed(Void result, Object attachment) {client.write(ByteBuffer.wrap("Hello".getBytes()), null, new CompletionHandler<Integer, Object>() {@Overridepublic void completed(Integer result, Object attachment) {try {ByteBuffer buffer = ByteBuffer.allocate(1024);ad(buffer, buffer, new CompletionHandler<Integer, ByteBuffer>() {@Overridepublic void completed(Integer result, ByteBuffer attachment) {buffer.flip();System.out.println(new String(buffer.array()));try {client.close();} catch (IOException e) {e.printStackTrace();}}@Overridepublic void failed(Throwable exc, ByteBuffer attachment) {}});} catch (Exception e) {e.printStackTrace();}}@Overridepublic void failed(Throwable exc, Object attachment) {}});}@Overridepublic void failed(Throwable exc, Object attachment) {}});Thread.sleep(1000);}
}
Java是面向对象的编程方式,同对于其他某些语言来说,经常会采用函数式编程,也就是可以将函数作为参数传递给另外一个函数,这种方式存在很多的优势
所谓副作用就是指函数修改了外部变量和全局变量,函数式编程中要尽量避免这种修改。
函数式编程是申明式编程,而不是命令式编程。也就是采用特定语法申明要做什么操作,而不是使用具体的命令执行对应的操作。
函数式编程中,几乎所有传递的对象都不会被轻易修改,因此函数式编程更加易于并行,甚至完全不用担心线程安全的问题。
函数式编程更加简洁,能够减少代码量
Java8提供了函数式接口的概念,所谓函数式接口,简单来说,就是只定义了一个抽象方法的接口,并添加了@FunctionalInterface接口。实际上只要接口符合函数式接口的定义,不加该注解也能被识别,就和@Override一样。需要注意的是:
- 函数式接口只能有一个抽象方法,但不是只能有一个方法。Java 8中接口存在的实例方法(接口默认方法),和任何Object类实现的方法都不能算抽象方法
因此下面的接口符合函数式接口要求:
@FunctionalInterface
public interface IntHandler {void handle(int i);boolean equals(Object o);
}
@FunctionalInterface
public interface IntHandler {void handle(int i);boolean equals(Object o);default void print(){ }
}
在Java8之前的版本,接口只能包含抽象方法,但从Java8后,接口也可以包含若干个实例方法。如下所示:
public interface IHorse {void eat();default void run(){System.out.println("horse run");}
}
这弥补了Java单一继承的不便,使类拥有了多继承的能力。但是和其他多继承会出现同样的问题,如果多个接口的方法名相同,那么调用时会出错。解决方法就是在调用时显式调用父接口:
public class Mule implements IHorese, IDonkey,IAnimal{@Overridepublic void run(){IHorse.super.run();}
}
Java8中的Comparator接口就实现了默认方法。
lambada表达式可以说是函数式编程的核心,lambda表达式即匿名函数。比如:
List<Integer> numbers = Arrays.asList(1, 2, 3, 4);
numbers.forEach((Integer value) -> System.out.println(value));
lambda表达式也可以访问外部的局部变量,这些外部的局部变量一般定义为final,即使没有显式定义为final,lambada也会默认为final,如果期间产生修改会抛异常。
final int num = 2;
Function<Integer, Integer> stringConverter = (from) -> from * num;
System.out.println(stringConverter.apply(3));
方法引用是用来简化lambda表达式的一种手段。他通过类名和方法名来定位到一个静态方法或者实例方法
静态方法引用:ClassName::methodName
实例上的实例方法:instanceReference::methodName
父类上的实例方法引用:super::methodName
类型上的实例方法引用:ClassName::methodName
构造方法引用:Class::new
数组构造方法引用::TypeName[]:new
案例:
List<User> users = new ArrayList<>();
for(int i=1;i<10;i==){users.add(new User(i,"billy"String(i)));
}
//java会自动识别流中的元素,识别是作为参数还是调用者
//如果类中有同名的静态和非静态方法,此时编译器识别不了stream中的元素是作为参数(对于静态方法是作为参数)还是调用者,因此会报错
users.stream().map(User::getName).forEeach(System.out::println)
使用构造函数:
public class ConstrMethodRef{@FunctionalInterfaceinterface UserFactory<U extends User>{U create(int id, String name);}static UserFactory<User> uf = User:new;
}
UserFactory作为User的工厂类,是一个函数式接口。当使用User:new创建接口实例时,系统会根据ate()的函数签名来选择合适的User构造函数。
下面将介绍java8函数式编程式怎么一步步演进的:
使用stream流简单实现打印数组:
Arrays.stream(arr).forEach(new IntConsumer(){@Overridepublic void accept(int value){System.out.println(value);}
});
Arrays.steam()返回了一个流对象。类似于结合或者数组,流对象也是一个集合,它将给予我们遍历处理流内元素的能力。由于我们数组存的是Integer,所以forEach方法可以接受一个处理int的IntConsumer接口,这里采用匿名类实现处理。
由于forEach中的参数是可以从上下文中推导出来的,因此可以简化为:
Arrays.stream(arr).forEach((final)int x->{System.out.println(x);
});
同时由于参数的类型也是可以推导的,再简化为:
Arrays.stream(arr).forEach(x->{System.out.println(x)
});
进一步简化写法,去掉花括号:
Arrays.stream(arr).forEach((x)->System.out.println(x))
实际上如果反编译代码看,lambada表达式实际就是通过匿名类来实现的。同时,由于Java 8还支持了方法引用 ,通过引用的推导,甚至连参数申明和传递都可以省略:
Arrays.stream(arr).forEach(System.out::println);
下面的例子进行了两次输出,一次输出到标准错误,一次输出到标准输出中:
IntConsumer outprintln = System.out::println;
IntConsumer errprintln = ::println;
//将两个IntConsumer结合,合成一个新的IntConsumer
Arrays.stream(arr).forEach(outprintln.andThen(errprintln));
Java8中,可以在接口不变的情况下,将流改为并行流。这样,就可以自然地使用多线程集合中的数据处理。
首先是串行方式,有一个PrimeUtil类接收一个int参数,并判断是否为质数。串行的函数式编程:
IntSream.range(1,1000).filter(PrimeUtil::isPrime).count();
改造成并行:
IntStream.range(1,10000).parallel().filter(PrimeUtil:isPrime).count();
通过parallel得到并行流,此时isPrime方法将会被并行调用。
下面这段代码统计集合内所有学生的平均分:
List<Student> ss = new ArrayList<Student>();
double ave = ss.stream().mapToInt(s->s.score).average().getAsDouble();
并行化:
double ave = ss.parallelStream().mapToInt(s->s.score).average().getAsDouble();
Arrays.parallelSort(arr)
另外Arrrays还增加了用于数组赋值的方法:
Random r = new Random();
Arrays.setAll(arr,(i)-&Int());
并行化:
Arrays.parallelSetAll(arr,(i)-&Int());
CompletableFuture是Java8新增的一个超大型工具类。它同时实现了Future接口和CompletionStage接口,CompletionStage接口是Java8中新增的,有多达40多种方法,是为了函数式编程中的流式调用准备的,所以是超大型工具类。通过CompletionStage接口我们可以在一个执行结果上进行多次流式调用:
stage.thenApply(x->square(x)).thenAccept(x->System.out.print(x)).thenRun(()->System.out.println())
和Future一样,CompletableFuture可以作为函数调用的契约。如果你想CompletableFuture请求一个数据,如果数据还没有准备好,线程救护等待。但是通过CompletableFuture可以手动设置完成的状态,如下:
public class AskThread implements Runnable {CompletableFuture<Integer> re = null;public AskThread(CompletableFuture<Integer> re) { = re;}@Overridepublic void run() {int myRe = 0;try {//刚开始会阻塞myRe = re.get() * re.get();} catch (InterruptedException | ExecutionException e) {e.printStackTrace();}System.out.println(myRe);}public static void main(String[] args) throws InterruptedException {final CompletableFuture<Integer> future = new CompletableFuture<>();new Thread(new AskThread(future)).start();Thread.sleep(1000);//发通知完成futureplete(60);}
}
CompletableFuture接口有一些提供异步调用的接口,比如如下案例:
public class AsynCal {public static Integer calc(Integer para) {try {Thread.sleep(1000);} catch (InterruptedException e) {e.printStackTrace();}return para * para;}public static void main(String[] args) throws ExecutionException, InterruptedException {//这里创建了一个异步调用final CompletableFuture<Integer> future = CompletableFuture.supplyAsync(() -> calc(50));//获取异步调用的结果System.out.());}
}
public class StreamCalc {public static Integer calc(Integer value) {try {Thread.sleep(1000);} catch (InterruptedException e) {e.printStackTrace();}return value * value;}public static void main(String[] args) throws ExecutionException, InterruptedException {CompletableFuture<Void> fu = CompletableFuture.supplyAsync(() -> calc(50)).thenAccept((i) -> String(i)).thenApply((str) -> """ + str + """).thenAccept(System.out::println);fu.get();}
}
通过exceptionally来进行一场处理
public class ExecptionCalc {public static Integer calc(Integer para) {return para / 0;}public static void main(String[] args) throws ExecutionException, InterruptedException {CompletableFuture<Void> fu = CompletableFuture.supplyAsync(() -> calc(50)).exceptionally(ex -> {System.out.String());return 0;}).thenApply((i) -> String(i)).thenApply((str) -> "str:" + str).thenAccept(System.out::println);fu.get();}
}
通过thenCompse方法组合多个CompletableFuture进行组合:
public class ComposeCalc {public static Integer calc(Integer para) {return para / 2;}public static void main(String[] args) throws ExecutionException, InterruptedException {CompletableFuture<Void> future = CompletableFuture.supplyAsync(() -> calc(50)).thenCompose((i) -> CompletableFuture.supplyAsync(() -> calc(i))).thenApply((str) -> "str" + str).thenAccept(System.out::println);(); }
}
另一种组合的方法是thenCombine(),thenCombine首先完成当前CompletableFuture和other的执行,接着将两者的执行结果传递给BiFunction(第二个参数):
public class CombineCalc {public static Integer calc(Integer para) {return para / 2;}public static void main(String[] args) throws ExecutionException, InterruptedException {CompletableFuture<Integer> intFuture = CompletableFuture.supplyAsync(() -> calc(50));CompletableFuture<Integer> intFuture2 = CompletableFuture.supplyAsync(() -> calc(25));CompletableFuture<Void> future = intFuture.thenCombine(intFuture2, (i, j) -> (i + j)).thenApply((str) -> "str:" + str).thenAccept(System.out::println);();}
}
StampedLock是Java8中引入的一种新的锁机制。简单的理解,可以认为它是读写锁的一个改进版本。读写锁虽然分离了读和写的功能,使得读和读之间可以完全并发。但是读和写之间仍然是冲突的。读锁会完全阻塞写锁,它使用的依然是悲观的锁策略。而StampedLock则提供了一种乐观的读策略。这种乐观的锁非常类似无锁的操作,使得乐观锁完全不会阻塞写线程。
class Point {private double x, y;private final StampedLock sl = new StampedLock();void move(double deltaX, double deltaY) { // 独占锁方法long stamp = sl.writeLock();try {x += deltaX;y += deltaY;} finally {//释放锁sl.unlockWrite(stamp);}}double distanceFromOrigin() { // 只读方法//尝试读,返回一个类似时间戳的变量long stamp = sl.tryOptimisticRead();double currentX = x, currentY = y;//判断stamp是否被修改if (!sl.validate(stamp)) {//有修改,使用悲观锁stamp = sl.readLock();try {currentX = x;currentY = y;} finally {//记得释放锁sl.unlockRead(stamp);}}//没有被修改return Math.sqrt(currentX * currentX + currentY * currentY);}void moveIfAtOrigin(double newX, double newY) { // upgrade// Could instead start with optimistic, not read modelong stamp = sl.readLock();try {while (x == 0.0 && y == 0.0) {long ws = sl.tryConvertToWriteLock(stamp);if (ws != 0L) {stamp = ws;x = newX;y = newY;break;}else {sl.unlockRead(stamp);stamp = sl.writeLock();}}} finally {sl.unlock(stamp);}}
}
StampedLock内部实现时,使用类似CAS操作的死循环反复尝试测策略。在挂起线程时,使用的是Unsage.park()函数,而park()函数在遇到线程中断时,会直接返回,不会抛出异常。而StampedLock的死循环逻辑中,没有处理有关中断逻辑,因此中断后会再次进入循环。而当退出条件得不到满足时,就会出现疯狂占用CPU的情况。下面是一个案例:
public class StampedLockCPUDemo {static Thread[] hodCpuThreads = new Thread[3];static final StampedLock lock = new StampedLock();public static void main(String[] args) throws InterruptedException {new Thread(){//占用写锁public void run() {long readLong = lock.writeLock();LockSupport.parkNanos(6000000000L);lock.unlockWrite(readLong);} }.start();Thread.sleep(100);for (int i = 0; i < 3; i++) {hodCpuThreads[i] = new Thread(new HoldCPUReadThread());//请求读锁会失败,被中断后会进入循环hodCpuThreads[i].start();}Thread.sleep(10000);for (int i = 0; i < 3; i++) {//中断线程,CPU占用飙升hodCpuThreads[i].interrupt();}}private static class HoldCPUReadThread implements Runnable {@Overridepublic void run() {long lockr = adLock();System.out.println(Thread.currentThread().getName() + " 获得锁");lock.unlockRead(lockr);}}
}
StampedLock的内部实现时基于CLH锁的。CLH锁是一种自旋锁,它保证没有饥饿发生,并且可以保证FIFO的服务顺序。
基本思想:内部维护一个等待线程队列,所有申请锁但是没有成功的线程都加入这个队列。当试图获取锁的时候会自旋判断前一个节点是否释放锁,如果释放就会获取到锁。
之前我们说过,无锁的原子类操作使用系统的CAS指令,有着远远超越锁的性能。Java8中引入了LongAdder类,这个类也在atomic包下,因此也是使用了CAS指令,它的性能更好。
在大量修改失败时,原子操作会进行多次循环操作,因此性能会受到影响。那么当竞争激烈的时候,我们应该如何进一步提高系统的性能呢,一种基本的方案就是可以使用热点分离。比如可以将AtomicInteger中的内部核心数据value分离成一个数组,当然这种分离不是一开始就分离而是有冲突的时候会分离。下面是几种方式多线程累加数据的比较,结果是使用LongAdder更快:
public class LongAddrDemo {private static final int MAX_THREADS = 3;private static final int TASK_COUNT = 3;private static final int TARGET_COUNT = 100000000;private AtomicLong acount = new AtomicLong(0L);private LongAdder lacount = new LongAdder();private long count = 0;static CountDownLatch cdlsyn = new CountDownLatch(TASK_COUNT);static CountDownLatch cdlatomic = new CountDownLatch(TASK_COUNT);static CountDownLatch cdladdr = new CountDownLatch(TASK_COUNT);protected synchronized long inc() {return ++count;}protected synchronized long getCount() {return count;}public class SynThread implements Runnable {protected String name;protected long starttime;LongAddrDemo out;public SynThread(long starttime, LongAddrDemo out) {this.starttime = starttime;this.out = out;}@Overridepublic void run() {long v = Count();while (v < TARGET_COUNT) {v = out.inc();}long endtime = System.currentTimeMillis();System.out.println("SynThread spend:" + (endtime - starttime) + "ms");untDown();}}//采用传统同步方式public void testSync() throws InterruptedException{ExecutorService exe = wFixedThreadPool(MAX_THREADS);long starttime = System.currentTimeMillis();SynThread synThread = new SynThread(starttime, this);for (int i = 0; i < TARGET_COUNT; i++) {exe.submit(synThread);}cdlsyn.await();exe.shutdown();}public class AtomicThread implements Runnable {protected String name;protected long starttime;public AtomicThread(long starttime) {this.starttime = starttime;}@Overridepublic void run() {long v = ();while (v < TARGET_COUNT) {v = acount.incrementAndGet();}long endtime = System.currentTimeMillis();System.out.println("AtomicThread spend: " + (endtime - starttime));untDown();}//采用传统的原子类public void testAtomci() throws InterruptedException{ExecutorService exe = wFixedThreadPool(MAX_THREADS);long starttime = System.currentTimeMillis();AtomicThread atomic = new AtomicThread(starttime);for (int i = 0; i < TASK_COUNT; i++) {exe.submit(atomic);}cdlatomic.await();exe.shutdown();}public class LongAddrThread implements Runnable {protected String name;protected long starttime;public LongAddrThread(long starttime) {this.starttime = starttime;}@Overridepublic void run() {long v = lacount.sum();while (v < TARGET_COUNT) {lacount.increment();v = lacount.sum();}long endtime = System.currentTimeMillis();System.out.println("LongAddr spend: " + (endtime - starttime) + "ms");untDown();}}//采用LongAdderpublic void testAtomiclong() throws InterruptedException {ExecutorService exe = wFixedThreadPool(MAX_THREADS);long starttime = System.currentTimeMillis();LongAddrThread atomic = new LongAddrThread(starttime);for (int i = 0; i < TASK_COUNT; i++) {exe.submit(atomic);}cdladdr.await();exe.shutdown();}}}
LongAccumulator是LongAdder的增强版,他们有公共的父类Stripe64,因此优化方式是一样的。对于LongAdder来说,每次只能对给定的整数执行一次加法,而LongAccumulator则可以实现任意函数操作。
public class LongAccumulatorDemo {public static void main(String[] args) throws InterruptedException {LongAccumulator accumulator = new LongAccumulator(Long::max, Long.MAX_VALUE);Thread[] ts = new Thread[1000];for (int i = 0; i < 1000; i++) {ts[i] = new Thread(()->{//具体操作的实现Random random = new Random();long value = Long();accumulator.accumulate(value);});ts[i].start();}for (int i = 0; i < 1000; i++) {ts[i].join();}System.out.println(accumulator.longValue());}
}
Akka提供了一种Actor的并发模型,其颗粒度比线程小,这意味着在系统中可以启用大量的Actor。其次Akka提供了一套容错机制,允许在Actor出现异常时进行一些恢复或者重置操作。最后Akka不仅可以在单机上构建并发程序,也可以在网络中构建分布式程序,并提供位置透明的Actor定位服务。
Akka中以Actor为执行单元而不是线程。在Actor模型中,我们失去了对象的方法的调用,我们并不是通过Actor对象的某一个方法来告诉Actor需要做什么,而是给Actor发送一条消息。当一个Actor收到消息后,它有可能会根据消息的内容做出某些行为,包括更改自身状态。但是这种更改不是被动的而是主动的。
下面是一个入门案例:
第一个Actor的实现:
//UntypedAbstractActor表示无类型的Actor,还有一种是有类型的
public class Gretter extends UntypedAbstractActor {//定义了两种消息类型public static enum Msg{GREET,DONE}//接收到消息后的回调函数@Overridepublic void onReceive(Object message) throws Throwable //接收Greet消息并打印Helloworldif (message == Msg.GREET) {System.out.println("Hello world");//发送DONE消息getSender().tell(Msg.DONE, getSelf());} else {unhandled(message);}}
}
与Greeter交互的Actor是:
public class HelloWorld extends UntypedActor {ActorRef greeter;@Overridepublic void onReceive(Object message) throws Throwable {if (message == Gretter.Msg.DONE) {//又发送了一次GREET,会打印ll(Gretter.Msg.GREET, getSelf());//同时关闭自己getContext().stop(getSelf());}else {unhandled(message);}}//Actor启动前会被框架调用@Overridepublic void preStart() throws Exception {//使用HelloWorld的上下文创建,因此它属于HelloWorld的子Actorgreeter = getContext().ate(Gretter.class), "greeter");System.out.println("Greeter Actor Path:" + greeter.path());//发送Greet消息,会打印ll(Gretter.Msg.GREET, getSelf());}public static void main(String[] args) {//f只配置了日志级别//第一个参数是系统名ActorSystem system = ate("Hello", ConfigFactory.load(f"));ActorRef a = system.ate(HelloWorld.class), "helloWorld");System.out.println("HelloWorld Actor Path:" + a.path());}
}
输出:
HelloWorld Actor Path:akka://Hello/user/helloWorld ##Actor创建的时候打印的
Greeter Actor Path:akka://Hello/user/helloWorld/greeter
Hello world
Hello world
Akka会自动在线程池中选择线程来执行我们的Actor,因此,多个不同的Actor有可能会被同一个线程执行,同时,一个Actor也有可能被不同的线程执行。
整个Akka应用是由消息驱动的。消息是除了Actor之外最重要的核心组件。作为在并发程序中的核心组件,在Actor之间传递的消息应该满足不可变性,也就是不变模式,因为可变的消息无法高效的在并发环境中使用。
对于消息投递有三种不同的策略:
1. 至多一次投递,可能会出现消息丢失
2. 至少一次投递,可能会出现多次,直到成功
3. 精确的消息投递,确保一次成功投递
此外,Actor 可以保证两个actor消息投递的顺序性
Actor在actorOf函数被调用后开始建立,Actor实例建立后,会调用preStart方法,这个方法可以进行一些资源的初始化工作。如果Actor出现一些异常会需要重启,此时会调用preRestart()方法,接着会创建一个新的Actor的实例,当新实例创建后,会调用postRestart()方法,表示启动完成,同时新实例会替代老实例。可以调用stop方法停止Actor,或者给Actor发送一个PosionPill。Actor停止时,postStop方法会被调用。同时这个Actor的监视者会收到一个Terminated消息。常见生命周期方法:
public class MyWorker extends UntypedAbstractActor{private final LoggingAdapter log = Logger(getContext().system(), this);public static enum Msg{WORKING,DONE,CLOSE}@Overridepublic void onReceive(Object message) throws Throwable {if (message == Msg.WORKING) {System.out.println("i am working");}if (message == Msg.DONE) {System.out.println("Stop working");}if (message == Msg.CLOSE) {System.out.println("i will shutdown");getSender().tell(Msg.CLOSE, getSelf());getContext().stop(getSelf());}else {unhandled(message);}}//资源初始化@Overridepublic void preStart() throws Exception {System.out.println("MyWorker is starting");}//资源释放@Overridepublic void postStop() throws Exception {System.out.println("MyWorker is stopping");}
}
创建一个Myworker的监视者,在Myworker终止的时候监视者会收到消息:
public class WatchActor extends UntypedActor{private final LoggingAdapter log = Logger(getContext().system(), this);public WatchActor(ActorRef ref) {getContext().watch(ref);}@Overridepublic void onReceive(Object message) throws Throwable {//接收Myworker中终止的消息if (message instanceof Terminated) {System.out.println("shuting down system");getContext().system().terminate();}else {unhandled(message);}}public static void main(String[] args) {ActorSystem system = ate("deadwatch", ConfigFactory.load(f"));ActorRef worker = system.ate(MyWorker.class), "worker");system.ate(WatchActor.class, worker), "watcher");ll(MyWorker.Msg.WORKING, Sender());ll(MyWorker.Msg.DONE, Sender());Instance(), Sender());}
}
在Akka框架内,父Actor可以对子Actor进行监督,监控Actor的行为是否异常。大体上监督策略可以分为两种:一种是OneForOneStrategy,另一种是AllForOneStrategy
OneForOneStrategy:父Actor只会对出问题的子Actor进行处理
AllForOneStrategy:父Actor会对出问题的子Actor以及它所有的兄弟都进行处理
案例如下:
首先构造抛出异常的Actor:
public class RestartActor extends UntypedActor {public enum Msg{DONE,RESTART}@Overridepublic void onReceive(Object message) throws Throwable {if (message == Msg.DONE) {getContext().stop(getSelf());} else if (message == Msg.RESTART) {System.out.println(((Object) null).toString());double a = 0 / 0;}unhandled(message);}
}
构造监督类:
public class Supervisor extends UntypedActor {//创建异常处理的策略private static SupervisorStrategy strategy = new OneForOneStrategy(3, ate(1, TimeUnit.MINUTES), new Function<Throwable, SupervisorStrategy.Directive>() {@Overridepublic SupervisorStrategy.Directive apply(Throwable t) throws Exception {if(t instanceof ArithmeticException){System.out.println("meet ArithmeticException, just resume");sume();} else if (t instanceof NullPointerException) {System.out.println("meet NullPointException,restart");start();} else if (t instanceof IllegalArgumentException) {return SupervisorStrategy.stop();} else {return SupervisorStrategy.escalate();}}});@Overridepublic void onReceive(Object message) throws Throwable {if (message instanceof Props) {//用来新建一个名为restartActor的子Actor,这个字Actor就由当前的Supervisor进行监督getContext().actorOf((Props) message, "restartActor");} else {unhandled(message);}}@Overridepublic SupervisorStrategy supervisorStrategy() {return strategy;}public static void main(String[] args) {ActorSystem system = ate("lifecycle", ConfigFactory.load(f"));customStrategy(system);}private static void customStrategy(ActorSystem system) {//创建了SupervisorActorRef a = system.ate(Supervisor.class), "Supervisor");//发送了一个RestartActor的Props,使得Supervisor创建ate(RestartActor.class), Sender());ActorSelection sel = system.actorSelection("akka://lifecycle/user/Supervisor/restartActor");for (int i = 0; i < 100; i++) {//向RestartActor发送了100多条RESTART消息ll(RestartActor.Msg.RESTART, Sender());}}
}
在一个ActorSystem中,可能存在大量的Actor,如何才能有效进行批量的管理和通信,Akka给我们提供了一个ActorSelection类,能够批量进行消息发送:
for(int i = 0;i<WORKER_COUNT;i++){workers.add(system.ate(MyWorker.class,i),"worker_"+i));
}
//批量发送
ActorSelection selection = getContext().actorSelection("/user/worker_*");
ll(5,getSelf());
使用收件箱,可以很方便的对Actor进行消息的整体发送和接收
public static void main(String[] args) throws TimeoutException {ActorSystem system = ate("inboxdemo", ConfigFactory.load(f"));ActorRef worker = system.ate(MyWorker.class), "worker");//创建一个Inbox,监视Myworker,能够再起停止后收到消息 final Inbox inbox = ate(system);inbox.watch(worker);inbox.send(worker, Msg.WORKING);inbox.send(worker, Msg.DONE);inbox.send(worker, Msg.CLOSE);while (true) {Object msg = ate(1, TimeUnit.SECONDS));if (msg == Msg.CLOSE) {System.out.println("myworker is closing");} else if (msg instanceof Terminated) {System.out.println("myworkder is dead");inate();break;} else {System.out.println(msg);}}}
有时候,我们也许会使用一组Actor,而不是一个Actor来提供服务。这一组Actor中是对等的。也就是说你可以找任何一个Actor的为你服务。这种情况下,我们可以使用一些负载均衡的策略:
public class WatchActor extends UntypedActor {private final LoggingAdapter log = Logger(getContext().system(), this);public Router router;{List<Routee> routees = new ArrayList<>();for (int i = 0; i < 5; i++) {ActorRef worker = getContext().ate(MyWorker.class), "worker_" + i);getContext().watch(worker);routees.add(new ActorRefRoutee(worker));}//轮询的路由策略router = new Router(new RoundRobinRoutingLogic(), routees);}@Overridepublic void onReceive(Object message) throws Throwable {if (message instanceof MyWorker.Msg) {ute(message, getSender());} else if (message instanceof Terminated) {router = veRoutee(((Terminated) message).actor());System.out.println("worker is closed");if (utees().size() == 0) {System.out.println("close system");//RouteMain.flag.send(false)getContext().system().terminate();}}else {unhandled(message);}}
}
如果Actor需要根据不同的状态对同一条消息作出不同的反应该怎么处理呢,Akka给我们提供了一套方案,下面是案例:
public class BabyActor extends UntypedActor {private final LoggingAdapter log = Logger(getContext().system(), this);public static enum Msg{SLEEP,PLAY,CLOSE}//angry状态Procedure<Object> angry = new Procedure<Object>() {@Overridepublic void apply(Object param) throws Exception {System.out.println("angryApply:" + param);if (param == Msg.SLEEP) {getSender().tell("i am already angry", getSelf());System.out.println("I am already angry");} else if (param == Msg.PLAY) {System.out.println("i like playing");getContext().become(happy);}}};//happy状态Procedure<Object> happy = (Procedure<Object>) param -> {System.out.println("happy apply");if (param == Msg.PLAY) {getSender().tell("I am already happy", getSelf());System.out.println("i am already happy");} else if (param == Msg.SLEEP) {System.out.println("I dont want to sleep");getContext().become(angry);}};@Overridepublic void onReceive(Object message) throws Throwable {//根据消息转换状态if (message == Msg.SLEEP) {getContext().become(angry);} else if (message == Msg.PLAY) {getContext().become(happy);} else {unhandled(message);}}
}
Actor都是通过异步消息通信的,同样也支持Future模式:
//通过Patterns.ask来想worker发消息,这个传的是Integer,值为5,1500是超时时间
Future<Object> f = ask(worker,5,1500);
//获取异步值
int re = (sult(ate(6,TimeUnit.SECONDS));
上面的方法在获取值时可能需要等待,Akka可以重定向结果到其他的Actor:
f = ask(worker,6,1500)
//printer是另一个Actor,会打印相关的信息
pipe(f,system.dispatcher()).to(printer)
在Akka的编程模型中,Actor之间主要通过消息进行信息传递,因此,很少发生Actor需要访问同一个共享变量的情况。在Akka中也提供了通过Agent对一个变量的异步更新,每一个对Agent的变更称为action,多个action同时下发会进行并发调度执行,在任意时刻,一个Agent最多只能执行一个action,对于某一个线程来说,它执行action的顺序与它的发生顺序一致。Agent可以使用send或者alter方法来发送修改动作,前者没有返回值,后者有返回值(future)
某一项工作可能由多个Actor协作完成,如果某一个actor失败了,要做回滚操作.Akka支持这种内存中事务操作
本文发布于:2024-02-03 07:16:07,感谢您对本站的认可!
本文链接:https://www.4u4v.net/it/170691576749475.html
版权声明:本站内容均来自互联网,仅供演示用,请勿用于商业和其他非法用途。如果侵犯了您的权益请与我们联系,我们将在24小时内删除。
留言与评论(共有 0 条评论) |