public enum State {/*** 新生*/NEW,/*** 运行*/RUNNABLE,/*** 阻塞*/BLOCKED,/*** 等待(不会超时的等待)*/WAITING,/*** 超时等待(会超时的等待)*/TIMED_WAITING,/*** 终止*/TERMINATED;}
public ReentrantLock() {sync = new NonfairSync();}/*** Creates an instance of {@code ReentrantLock} with the* given fairness policy.** @param fair {@code true} if this lock should use a fair ordering policy*/public ReentrantLock(boolean fair) {sync = fair ? new FairSync() : new NonfairSync();}
synchronized 和Lock的区别
1、Synchronized内置的Java关键字,Lock是一个Java类
2、Synchronized 无法判断获取锁的状态,Lock 可以判断是否获取到了锁
3、Synchronized 会自动释放锁,lock必须要手动释放锁!如果不释放锁,死锁
4、Synchronized 线程1(获得锁,阻塞)、线程2(等待,傻傻的等) ;Lock锁就不一定会等待下去;
5、Synchronized 可重入锁,不可以中断的,非公平;Lock,可重入锁,可以判断锁,非公平(可以自己设置);
6、Synchronized适合锁少量的代码同步问题,Lock适合锁大量的同步代码!
public class PC {public static void main(String[] args) {Data data = new Data();new Thread(()->{for (int i = 0; i < 10; i++) {data.decrement();}},"A").start();new Thread(()->{for (int i = 0; i < 10; i++) {data.increment();}},"B").start();new Thread(()->{for (int i = 0; i < 10; i++) {data.decrement();}},"C").start();new Thread(()->{for (int i = 0; i < 10; i++) {data.increment();}},"D").start();}
}class Data {private int number = 0;public synchronized void increment() {if (number > 0) {try {this.wait();} catch (InterruptedException e) {e.printStackTrace();}}number++;System.out.println(Thread.currentThread().getName() + "=>" + number);ifyAll();}public synchronized void decrement() {if (number <= 0) {try {wait();} catch (InterruptedException e) {e.printStackTrace();}}number--;System.out.println(Thread.currentThread().getName() + "=>" + number);ifyAll();}
}
public class PC {public static void main(String[] args) {Data data = new Data();new Thread(()->{for (int i = 0; i < 10; i++) {data.decrement();}},"A").start();new Thread(()->{for (int i = 0; i < 10; i++) {data.increment();}},"B").start();new Thread(()->{for (int i = 0; i < 10; i++) {data.decrement();}},"C").start();new Thread(()->{for (int i = 0; i < 10; i++) {data.increment();}},"D").start();}
}class Data {private int number = 0;public synchronized void increment() {while (number > 0) {try {this.wait();} catch (InterruptedException e) {e.printStackTrace();}}number++;System.out.println(Thread.currentThread().getName() + "=>" + number);ifyAll();}public synchronized void decrement() {while (number <= 0) {try {wait();} catch (InterruptedException e) {e.printStackTrace();}}number--;System.out.println(Thread.currentThread().getName() + "=>" + number);ifyAll();}
}
import urrent.locks.Condition;
import urrent.locks.Lock;
import urrent.locks.ReentrantLock;public class PC {public static void main(String[] args) {Data data = new Data();new Thread(()->{for (int i = 0; i < 10; i++) {data.decrement();}},"A").start();new Thread(()->{for (int i = 0; i < 10; i++) {data.increment();}},"B").start();new Thread(()->{for (int i = 0; i < 10; i++) {data.decrement();}},"C").start();new Thread(()->{for (int i = 0; i < 10; i++) {data.increment();}},"D").start();}
}class Data {private int number = 0;Lock lock = new ReentrantLock();Condition condition = wCondition();public void increment() {lock.lock();try {while (number > 0) {try {condition.await();} catch (InterruptedException e) {e.printStackTrace();}}number++;System.out.println(Thread.currentThread().getName() + "=>" + number);condition.signalAll();} finally {lock.unlock();}}public void decrement() {lock.lock();try {while (number <= 0) {try {condition.await();} catch (InterruptedException e) {e.printStackTrace();}}number--;System.out.println(Thread.currentThread().getName() + "=>" + number);condition.signalAll();} finally {lock.unlock();}}
}
Condition 的优势 精准的通知 唤醒线程
import urrent.locks.Condition;
import urrent.locks.Lock;
import urrent.locks.ReentrantLock;
public class ConditionTest {public static void main(String[] args) {ConditionTest01 conditionTest01 = new ConditionTest01();new Thread(() -> {for (int i = 0; i < 10; i++) {hod01();}},"AAAA").start();new Thread(() -> {for (int i = 0; i < 10; i++) {hod02();}},"BBBB").start();new Thread(() -> {for (int i = 0; i < 10; i++) {hod03();}},"CCCC").start();}
}class ConditionTest01 {Lock lock = new ReentrantLock();Condition condition1 = wCondition();Condition condition2 = wCondition();Condition condition3 = wCondition();private int number = 2;public void method01() {lock.lock();try {
// 这里为了测试是否真的精准唤醒不会虚假唤醒而使用if 实际使用上就应该按上文说的用whileif (number != 1){try {condition1.await();} catch (InterruptedException e) {e.printStackTrace();}}number = 2;System.out.println(Thread.currentThread().getName()+"唤醒了condition2");condition2.signal();} finally {lock.unlock();}}public void method02() {lock.lock();try {if (number != 2){condition2.await();}number = 3;System.out.println(Thread.currentThread().getName()+"唤醒了condition3");condition3.signal();} catch (Exception e) {e.printStackTrace();} finally {lock.unlock();}}public void method03() {lock.lock();try {if (number != 3){condition3.await();}number = 1;System.out.println(Thread.currentThread().getName()+"唤醒了condition1");condition1.signal();} catch (Exception e) {e.printStackTrace();} finally {lock.unlock();}}
}
package com.ambition;import urrent.TimeUnit;/*** 同一个对象 两个线程 两个同步方法 谁先执行?**/public class Question1 {public static void main(String[] args) {Phone phone = new Phone();new Thread(() -> phone.sendMsg()).start();
// 延迟的目的是控制哪个线程先拿到锁try {TimeUnit.SECONDS.sleep(1);} catch (InterruptedException e) {e.printStackTrace();}new Thread(() -> phone.call()).start();}
}class Phone {
// synchronized 锁的对象是方法的调用者
// 两个方法用的是同一个锁 谁先拿到谁先执行public synchronized void sendMsg() {try {TimeUnit.SECONDS.sleep(4);} catch (InterruptedException e) {e.printStackTrace();}System.out.println("发短信");}public synchronized void call() {System.out.println("打电话");}
}
小结
new this 锁的具体的对象
static 锁的是 Class 全局唯一的模板
import java.util.*;
import urrent.CopyOnWriteArrayList;public class ListTest {
// java.util.ConcurrentModificationException 并发修改异常public static void main(String[] args) {/** 解决方案* 1.List<String> list = new Vector<>();* 2.List<String> list = Collections.synchronizedList(new ArrayList<>());* 3.List<String> list = new CopyOnWriteArrayList<>();* COW 读写分离 在写入的时候复制一份再编辑 再塞回去* */List<String> list = new CopyOnWriteArrayList<>();for (int i = 1; i <= 10; i++) {new Thread(() -> {list.add(UUID.randomUUID().toString().substring(0, 5));System.out.println(list);}, String.valueOf(i)).start();}}
}
import java.util.Collections;
import java.util.HashSet;
import java.util.Set;
import java.util.UUID;
import urrent.CopyOnWriteArraySet;public class SetTest {public static void main(String[] args) {
// Set set = new HashSet();
// Set set = Collections.synchronizedSet(new HashSet());Set set = new CopyOnWriteArraySet();for (int i = 1; i <= 20; i++) {new Thread(() ->{set.add(UUID.randomUUID().toString().substring(0, 5));System.out.println(set);},String.valueOf(i)).start();}}
}
// 就是一个mappublic HashSet() {map = new HashMap<>();}public boolean add(E e) {return map.put(e, PRESENT)==null;}
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.UUID;
import urrent.ConcurrentHashMap;public class MapTest {public static void main(String[] args) {
// Map<String, String> map = new HashMap<>();
// Map<String, String> map = Collections.synchronizedMap(new HashMap<>());Map<String, String> map = new ConcurrentHashMap<>();for (int i = 1; i <= 20; i++) {new Thread(() -> {map.put(Thread.currentThread().getName(), UUID.randomUUID().toString().substring(0, 5));System.out.println(map);},String.valueOf(i)).start();}}
}
import urrent.Callable;
import urrent.ExecutionException;
import urrent.FutureTask;public class CallTest {public static void main(String[] args) {MyThread myThread = new MyThread();FutureTask<Integer> futureTask = new FutureTask(myThread);new Thread(futureTask).start();new Thread(futureTask,"B").start();//就算新建两个线程也只会执行一次 执行结果会被缓存try {Integer result = ();//如果是耗时操作 可能会阻塞 因为获取结果需要等待System.out.println(result);} catch (InterruptedException e) {e.printStackTrace();} catch (ExecutionException e) {e.printStackTrace();}}
}class MyThread implements Callable<Integer> {@Overridepublic Integer call() throws Exception {System.out.println("call……");return 1024;}
}
import urrent.CountDownLatch;public class CountDownLatchDemo {public static void main(String[] args) throws InterruptedException {CountDownLatch countDownLatch = new CountDownLatch(6);for (int i = 1; i <= 6; i++) {new Thread(()-> {System.out.println(Thread.currentThread().getName() + "Go out");
// 计数器-untDown();},String.valueOf(i)).start();}
// 等到计数器归零 才继续往下执行countDownLatch.await();System.out.println("Close door");}
}
import urrent.BrokenBarrierException;
import urrent.CyclicBarrier;public class CyclicBarrierDemo {public static void main(String[] args) {CyclicBarrier cyclicBarrier = new CyclicBarrier(7,() -> System.out.println("召唤神龙!!!!!!!"));for (int i = 1; i <= 7; i++) {final int temp = i;new Thread(() -> {System.out.println("收集到了第"+ temp + "颗龙珠!");try {cyclicBarrier.await();} catch (InterruptedException e) {e.printStackTrace();} catch (BrokenBarrierException e) {e.printStackTrace();}}).start();}}
}
import urrent.Semaphore;
import urrent.TimeUnit;
public class SemaphoreDemo {public static void main(String[] args) {
// 作用:多个共享资源的互斥使用 高并发下限流Semaphore semaphore = new Semaphore(3);for (int i = 1; i <= 6; i++) {new Thread(()-> {try {
// 获取信号量,如果信号量满了线程等待semaphore.acquire();System.out.println(Thread.currentThread().getName()+ "抢到了车位");TimeUnit.SECONDS.sleep(3);System.out.println(Thread.currentThread().getName()+ "离开了车位");
// 释放当前信号量 唤醒等待的线程lease();} catch (InterruptedException e) {e.printStackTrace();}},String.valueOf(i)).start();}}
}
import java.util.HashMap;
import java.util.Map;
import urrent.locks.ReadWriteLock;
import urrent.locks.ReentrantReadWriteLock;public class ReadWriteLockDemo {public static void main(String[] args) {MyCache myCache = new MyCache();for (int i = 1; i <= 5; i++) {new Thread(() -> {myCache.put(Thread.currentThread().getName(),"Hello World");},String.valueOf(i)).start();}for (int i = 1; i <= 5; i++) {new Thread(() -> {(Thread.currentThread().getName());},String.valueOf(i)).start();}}
}class MyCache {private Map<String,String> map = new HashMap<>();ReadWriteLock readWriteLock = new ReentrantReadWriteLock();public void put(String key, String value) {
// 写锁(独占锁) 一次只能被一个线程占有readWriteLock.writeLock().lock();try {System.out.println(Thread.currentThread().getName()+ "写入");map.put(key,value);System.out.println(Thread.currentThread().getName()+ "写入成功");} finally {readWriteLock.writeLock().unlock();}}public void get(String key) {try {
// 读锁(共享锁) 可以同时被多个线程占有adLock().lock();System.out.println(Thread.currentThread().getName()+ "读取");(key);System.out.println(Thread.currentThread().getName()+ "读取成功");} finally {adLock().unlock();}}
}
方式 | 抛出异常 | 有返回值不抛异常 | 阻塞等待 | 超时等待 |
---|---|---|---|---|
添加 | add | offer() | put | offer(E e, long timeout, TimeUnit unit) |
移除 | remove | poll() | take | poll(long timeout, TimeUnit unit) |
判断队列首 | element | peek | - | - |
package com.ambition.queen;import urrent.ArrayBlockingQueue;
import urrent.TimeUnit;/*** @program: springboot-08-shiro* @description:* @author: zhengzx* @create: 2022-03-08 15:54**/public class BlockingQueueDemo {public static void main(String[] args) {try {test04();} catch (InterruptedException e) {e.printStackTrace();}}/*** 抛出异常*/public static void test01() {ArrayBlockingQueue<Integer> queue = new ArrayBlockingQueue<>(3);System.out.println(queue.add(1));System.out.println(queue.add(2));System.out.println(queue.add(3));System.out.println("========");System.out.ve());System.out.ve());System.out.ve());System.out.println(queue.element());System.out.ve());}/*** 有返回值,不抛出异常*/public static void test02() {ArrayBlockingQueue<Integer> queue = new ArrayBlockingQueue<>(3);System.out.println(queue.offer(1));System.out.println(queue.offer(2));System.out.println(queue.offer(3));System.out.println(queue.offer(4));System.out.println("========");System.out.println(queue.poll());System.out.println(queue.poll());System.out.println(queue.poll());System.out.println(queue.peek());System.out.println(queue.poll());}/*** 等待,一直阻塞* @throws InterruptedException*/public static void test03() throws InterruptedException {ArrayBlockingQueue<Integer> queue = new ArrayBlockingQueue<>(3);queue.put(1);queue.put(2);queue.put(3);
// queue.put(4);System.out.println(queue.take());System.out.println(queue.take());System.out.println(queue.take());System.out.println(queue.take());}/*** 超时等待*/public static void test04() throws InterruptedException {ArrayBlockingQueue<Integer> queue = new ArrayBlockingQueue<>(3);System.out.println(queue.offer(1));System.out.println(queue.offer(2));System.out.println(queue.offer(3));System.out.println(queue.offer(4,2, TimeUnit.SECONDS));System.out.println("========");System.out.println(queue.poll());System.out.println(queue.poll());System.out.println(queue.poll());System.out.println(queue.poll(2, TimeUnit.SECONDS));}
}
package com.ambition.queen;import urrent.SynchronousQueue;
import urrent.TimeUnit;/*** @program: springboot-08-shiro* @description:* @author: zhengzx* @create: 2022-03-08 16:45**/public class SynchronousQueueDemo {public static void main(String[] args) {SynchronousQueue<String> queue = new SynchronousQueue();new Thread(() -> {try {System.out.println(Thread.currentThread().getName() + " PUT A");queue.put("A");System.out.println(Thread.currentThread().getName() + " PUT B");queue.put("B");System.out.println(Thread.currentThread().getName() + " PUT C");queue.put("C");} catch (InterruptedException e) {e.printStackTrace();}},"T1").start();new Thread(() -> {try {TimeUnit.SECONDS.sleep(3);System.out.println(Thread.currentThread().getName() + " TAKE " + queue.take());TimeUnit.SECONDS.sleep(3);System.out.println(Thread.currentThread().getName() + " TAKE " + queue.take());TimeUnit.SECONDS.sleep(3);System.out.println(Thread.currentThread().getName() + " TAKE " + queue.take());} catch (InterruptedException e) {e.printStackTrace();}},"T2").start();}
}
package com.ambition.pool;import urrent.ExecutorService;
import urrent.Executors;/*** @program: springboot-08-shiro* @description:* @author: zhengzx* @create: 2022-03-08 17:47**/public class Demo1 {public static void main(String[] args) {
// ExecutorService threadPool = wSingleThreadExecutor();//单个线程
// ExecutorService threadPool = wFixedThreadPool(3);//固定大小的线程池ExecutorService threadPool = wCachedThreadPool();//可伸缩的线程池try {for (int i = 0; i < 10; i++) {ute(() -> {System.out.println(Thread.currentThread().getName() + "OK");});}} catch (Exception e) {e.printStackTrace();} finally {threadPool.shutdown();}}
}
public ThreadPoolExecutor(int corePoolSize,//核心线程数int maximumPoolSize,//最大线程数long keepAliveTime,//线程未被使用时存活时间TimeUnit unit,//时间单位BlockingQueue<Runnable> workQueue,//阻塞队列 未获取到线程的等待队列ThreadFactory threadFactory,//线程工厂 一般不用动RejectedExecutionHandler handler//拒绝策略) {if (corePoolSize < 0 ||maximumPoolSize <= 0 ||maximumPoolSize < corePoolSize ||keepAliveTime < 0)throw new IllegalArgumentException();if (workQueue == null || threadFactory == null || handler == null)throw new NullPointerException();this.acc = SecurityManager() == null ?null :Context();PoolSize = corePoolSize;this.maximumPoolSize = maximumPoolSize;this.workQueue = workQueue;this.keepAliveTime = Nanos(keepAliveTime);this.threadFactory = threadFactory;this.handler = handler;}
4. 【强制】线程池不允许使用 Executors 去创建,而是通过 ThreadPoolExecutor 的方式,这样
的处理方式让写的同学更加明确线程池的运行规则,规避资源耗尽的风险。
说明:Executors 返回的线程池对象的弊端如下: 1)FixedThreadPool 和 SingleThreadPool:
允许的请求队列长度为 Integer.MAX_VALUE,可能会堆积大量的请求,从而导致 OOM。 2)CachedThreadPool 和 ScheduledThreadPool:
允许的创建线程数量为 Integer.MAX_VALUE,可能会创建大量的线程,从而导致 OOM。
ExecutorService threadPool = new ThreadPoolExecutor(2, 5,3L, TimeUnit.SECONDS,new LinkedBlockingQueue<Runnable>(3),Executors.defaultThreadFactory(),new ThreadPoolExecutor.AbortPolicy());//等待队列满后的拒绝策略
/*** new ThreadPoolExecutor.AbortPolicy() 满员后直接抛出异常* new ThreadPoolExecutor.CallerRunsPolicy() 满员后 新来的哪个线程发起的 就丢回去让哪个线程执行* new ThreadPoolExecutor.DiscardPolicy() 满员后直接丢掉新来的 不抛出异常* new ThreadPoolExecutor.DiscardOldestPolicy() 满员后尝试与更早的线程竞争 竞争失败也不抛出异常**/
函数式接口:只有一个方法的接口
import java.util.function.Function;public class FunctionTest {public static void main(String[] args) {Function<String, String> function = new Function<String, String>() {/*** Applies this function to the given argument.** @param s the function argument* @return the function result*/@Overridepublic String apply(String s) {return s+":hhhh";}};System.out.println(function.apply("123"));Function<String, String> function2 = (a) ->{ return a + ":hhhh";};System.out.println(function2.apply("456"));}
}
import java.util.function.Predicate;public class PredicateTest {public static void main(String[] args) {
// 断定型接口 有一个入参,返回值只能是布尔值Predicate<String> predicate = new Predicate<String>() {/*** Evaluates this predicate on the given argument.** @param s the input argument* @return {@code true} if the input argument matches the predicate,* otherwise {@code false}*/@Overridepublic boolean test(String s) {return s != null && s.equals("HELLO");}};System.out.st("WORLD"));Predicate<String> predicate2 = a -> a != null && a.equals("HELLO");System.out.st("HELLO"));}
}
import java.util.function.Consumer;public class ConsumerTest {public static void main(String[] args) {
// 消费型接口 只有输入没有返回Consumer<String> consumer = new Consumer<String>() {@Overridepublic void accept(String s) {System.out.println(s);}};consumer.accept("Hello");Consumer<String> consumer2 = System.out::print;consumer2.accept("World");}
}
import java.util.function.Supplier;
public class SupplierTest {public static void main(String[] args) {
// 供给型接口 只返回 不输入Supplier<Integer> supplier = new Supplier<Integer>() {@Overridepublic Integer get() {System.out.println("get");return 1025;}};System.out.());Supplier<Integer> supplier2 = () -> {System.out.println("get");return 1024;};System.out.());}
}
起始于jdk1.7 用于并行执行任务
package com.ambition.forkjoin;import urrent.ForkJoinTask;
import urrent.RecursiveTask;public class ForkJoinDemo extends RecursiveTask<Long> {private Long start;private Long end;private static final Long MAX_NUMBER = 100000L;public ForkJoinDemo(Long start, Long end) {this.start = d = end;}/*** The main computation performed by this task.** @return the result of the computation*/@Overrideprotected Long compute() {Long result = 0L;if ((end - start) < MAX_NUMBER) {for (Long i = start;i <= end;i++) {result += i;}return result;} else {long mid = (start + end) / 2;ForkJoinDemo task1 = new ForkJoinDemo(start, mid);ForkJoinTask<Long> result1 = task1.fork();ForkJoinDemo task2 = new ForkJoinDemo(mid + 1, end);ForkJoinTask<Long> result2 = task2.fork();return result1.join() + result2.join();}}
}package com.ambition.forkjoin;import urrent.ExecutionException;
import urrent.ForkJoinPool;
import urrent.ForkJoinTask;
import java.util.stream.LongStream;public class ForkJoinTest {public static void main(String[] args) throws ExecutionException, InterruptedException {
// test01();//7470
// test02();//3966test03();//164}public static void test01() {long start = System.currentTimeMillis();Long sum = 0L;for (Long i = 0L; i < 10_0000_0000;i++) {sum += i;}long end = System.currentTimeMillis();System.out.println("sum=" + sum + ",cost:" + (end - start));}public static void test02() throws ExecutionException, InterruptedException {long start = System.currentTimeMillis();ForkJoinDemo forkJoinDemo = new ForkJoinDemo(0L,10_0000_0000L);ForkJoinPool forkJoinPool = new ForkJoinPool();ForkJoinTask<Long> submit = forkJoinPool.submit(forkJoinDemo);Long sum = submit.join();long end = System.currentTimeMillis();System.out.println("sum=" + sum + ",cost:" + (end - start));}public static void test03() throws ExecutionException, InterruptedException {long start = System.currentTimeMillis();long sum = LongStream.rangeClosed(0L, 10_0000_0000L).parallel().reduce(0L, Long::sum);long end = System.currentTimeMillis();System.out.println("sum=" + sum + ",cost:" + (end - start));}
}
package com.ambition.future;import urrent.CompletableFuture;
import urrent.ExecutionException;
import urrent.Future;
import urrent.TimeUnit;/*** 子线程是异步执行的,主线程休眠等待子线程执行完成,子线程执行完成后唤醒主线程,主线程获取任务执行结果后退出。**/public class FutureTest {public static void main(String[] args) throws ExecutionException, InterruptedException {
// 没有返回值的异步任务/*CompletableFuture<Void> completableFuture = CompletableFuture.runAsync(() -> {try {TimeUnit.SECONDS.sleep(2);System.out.println(Thread.currentThread().getName() + "=>runAsync");} catch (InterruptedException e) {e.printStackTrace();}});System.out.println("=======");completableFuture.join();System.out.println("||||||");*/
// 有返回值的异步回调CompletableFuture<Integer> cf1 = CompletableFuture.supplyAsync(() -> {System.out.println(Thread.currentThread().getName() + "=>supplyAsync1111");return 1024;});CompletableFuture<Integer> cf2 = CompletableFuture.supplyAsync(() -> {System.out.println(Thread.currentThread().getName() + "=>supplyAsync2222");return 2048;});
// cf1 和 cf2 都执行完后执行给定任务 cf1 cf2的执行结果作为入参
/* CompletableFuture<Integer> cf3 = cf1.thenCombine(cf2, (a, b) -> {System.out.println("a=>" + a);System.out.println("b=>" + b);return a + b;});*/// 回调执行结果 t相当于 success u相当于errorcf2.whenComplete((t,u)-> {System.out.println("t=>"+t);System.out.println("u=>"+u);}).exceptionally((e)-> {System.out.Message());return 404;});cf1.whenComplete((t,u)-> {System.out.println("t=>"+t);System.out.println("u=>"+u);}).exceptionally((e)-> {System.out.Message());return 404;});}
}
本文发布于:2024-02-01 10:43:49,感谢您对本站的认可!
本文链接:https://www.4u4v.net/it/170675542936063.html
版权声明:本站内容均来自互联网,仅供演示用,请勿用于商业和其他非法用途。如果侵犯了您的权益请与我们联系,我们将在24小时内删除。
留言与评论(共有 0 条评论) |