本文共 17229 字,大约阅读时间需要 57 分钟。
CountDownLatch是一个计数器闭锁,通过它可以完成类似于阻塞当前线程的功能,即:一个线程或多个线程一直等待,直到其他线程执行的操作完成。当计数器值减至零时,所有因调用await()方法而处于等待状态的线程就会继续往下执行。这种现象只会出现一次,计数器不能被重置。
package com.xiaobu.JUC;import java.util.concurrent.CountDownLatch;import java.util.concurrent.TimeUnit;/** * @author xiaobu * @version JDK1.8.0_171 * @date on 2019/9/20 11:27 * @description 考试场景 20个学生参加考试 一个人老师监考,只有最后一个学生交卷 老师才算任务完成 */public class CountDownLatchDemo { private static final int count = 20; static class Teacher implements Runnable{ private CountDownLatch countDownLatch; public Teacher(){ } public Teacher(CountDownLatch countDownLatch){ this.countDownLatch = countDownLatch; } @Override public void run() { System.out.print("老师发卷子\n"); try { countDownLatch.await(); } catch (InterruptedException e) { e.printStackTrace(); } System.out.print("老师收卷子\n"); } } static class Student implements Runnable{ private CountDownLatch countDownLatch; private int num; public Student(){ } public Student(CountDownLatch countDownLatch,int num){ this.countDownLatch = countDownLatch; this.num = num; } @Override public void run() { System.out.printf("学生(%d)写卷子\n",num); System.out.printf("学生(%d)交卷子\n",num); countDownLatch.countDown(); } } public static void main(String[] args) { CountDownLatch countDownLatch = new CountDownLatch(count); Teacher teacher = new Teacher(countDownLatch); Thread teacherThread = new Thread(teacher); teacherThread.start(); try { //为了防止还没发卷子 学生就开始写卷子了 TimeUnit.SECONDS.sleep(1); } catch (InterruptedException e) { e.printStackTrace(); } for (int i = 0; i < count; i++) { Student student = new Student(countDownLatch, i); Thread studentThread = new Thread(student); studentThread.start(); } }}
package com.xiaobu.note.JUC.CountDownLatch;import java.util.concurrent.CountDownLatch;/** * @author xiaobu * @version JDK1.8.0_171 * @date on 2019/2/26 16:34 * @description V1.0 */public class Boss implements Runnable { private CountDownLatch downLatch; public Boss(CountDownLatch downLatch){ this.downLatch = downLatch; } @Override public void run() { System.out.println("老板正在等所有的工人干完活......"); try { this.downLatch.await(); } catch (InterruptedException e) { } System.out.println("工人活都干完了,老板开始检查了!"); }}
package com.xiaobu.note.JUC.CountDownLatch;import java.util.Random;import java.util.concurrent.CountDownLatch;import java.util.concurrent.TimeUnit;/** * @author xiaobu * @version JDK1.8.0_171 * @date on 2019/2/26 16:33 * @description V1.0 */public class Worker implements Runnable{ private CountDownLatch downLatch; private String name; public Worker(CountDownLatch downLatch, String name){ this.downLatch = downLatch; this.name = name; } @Override public void run() { this.doWork(); try{ TimeUnit.SECONDS.sleep(new Random().nextInt(10)); }catch(InterruptedException ie){ ie.printStackTrace(); } System.out.println(this.name + "活干完了!"); this.downLatch.countDown(); } private void doWork(){ System.out.println(this.name + "正在干活!"); }}
package com.xiaobu.note.JUC.CountDownLatch;import java.util.concurrent.*;/** * @author xiaobu * @version JDK1.8.0_171 * @date on 2019/2/26 16:34 * @description V1.0 */public class CountDownLatchDemo { public static void main(String[] args) { //两者等价但需要显现的创建线程池这样不会出现OOM的情况 ExecutorService e = Executors.newCachedThreadPool(); ExecutorService executor=new ThreadPoolExecutor(0,5,60L, TimeUnit.SECONDS, new SynchronousQueue()); CountDownLatch latch = new CountDownLatch(3); Worker w1 = new Worker(latch,"张三"); Worker w2 = new Worker(latch,"李四"); Worker w3 = new Worker(latch,"王二"); Boss boss = new Boss(latch); executor.execute(w3); executor.execute(w2); executor.execute(w1); executor.execute(boss); executor.shutdown(); }}
CyclicBarrier也是一个同步辅助类,它允许一组线程相互等待,直到到达某个公共屏障点(common barrier point)。通过它可以完成多个线程之间相互等待,只有当每个线程都准备就绪后,才能各自继续往下执行后面的操作。
package com.xiaobu.JUC;import lombok.extern.slf4j.Slf4j;import java.util.concurrent.BrokenBarrierException;import java.util.concurrent.CyclicBarrier;/** * @author xiaobu * @version JDK1.8.0_171 * @date on 2019/9/20 11:56 * @description * 最近景色宜人,公司组织去登山,大伙都来到了山脚下,登山过程自由进行。 * * 但为了在特定的地点拍集体照,规定1个小时后在半山腰集合,谁最后到的,要给大家表演一个节目。 * * 然后继续登山,在2个小时后,在山顶集合拍照,还是谁最后到的表演节目。 * * 接着开始下山了,在2个小时后在山脚下集合,点名回家,最后到的照例表演节目。 */@Slf4jpublic class CyclicBarrierDemo { private static final int count = 5; private static CyclicBarrier cyclicBarrier = new CyclicBarrier(count,new Singer()); public static void main(String[] args) { for (int i = 0; i < count; i++) { Staff staff = new Staff(cyclicBarrier, i); Thread thread = new Thread(staff); thread.start(); } } static class Singer implements Runnable{ @Override public void run() { System.out.println("为大家表演节目"); } } static class Staff implements Runnable{ private int num; private CyclicBarrier cyclicBarrier; public Staff(){ } public Staff(CyclicBarrier cyclicBarrier,int num){ this.cyclicBarrier = cyclicBarrier; this.num = num; } @Override public void run() { log.info("员工[{}]开始出发....",num); log.info("员工[{}]到达目的地一....",num); try { cyclicBarrier.await(); } catch (InterruptedException | BrokenBarrierException e) { e.printStackTrace(); } log.info("员工[{}]再次出发....",num); log.info("员工[{}]到达目的地二....",num); try { cyclicBarrier.await(); } catch (InterruptedException | BrokenBarrierException e) { e.printStackTrace(); } log.info("员工[{}]又一次出发....",num); log.info("员工[{}]到达目的地三....",num); try { cyclicBarrier.await(); } catch (InterruptedException | BrokenBarrierException e) { e.printStackTrace(); } log.info("员工[{}]结束行程....",num); } }}
Semaphore与CountDownLatch相似,不同的地方在于Semaphore的值被获取到后是可以释放的,并不像CountDownLatch那样一直减到底。它也被更多地用来限制流量,类似阀门的 功能。如果限定某些资源最多有N个线程可以访问,那么超过N个主不允许再有线程来访问,同时当现有线程结束后,就会释放,然后允许新的线程进来。
void acquire() 从信号量获取一个许可,如果无可用许可前 将一直阻塞等待,
void acquire(int permits) 获取指定数目的许可,如果无可用许可前 也将会一直阻塞等待
boolean tryAcquire() 从信号量尝试获取一个许可,如果无可用许可,直接返回false,不会阻塞
boolean tryAcquire(int permits) 尝试获取指定数目的许可,如果无可用许可直接返回false,
boolean tryAcquire(int permits, long timeout, TimeUnit unit) 在指定的时间内尝试从信号量中获取许可,如果在指定的时间内获取成功,返回true,否则返回false
void release() 释放一个许可,别忘了在finally中使用,注意:多次调用该方法,会使信号量的许可数增加,达到动态扩展的效果,如:初始permits 为1, 调用了两次release,最大许可会改变为2
int availablePermits() 获取当前信号量可用的许可
package com.xiaobu.JUC;import lombok.extern.slf4j.Slf4j;import java.util.concurrent.ExecutorService;import java.util.concurrent.Executors;import java.util.concurrent.Semaphore;/** * @author xiaobu * @version JDK1.8.0_171 * @date on 2019/7/3 11:19 * @description */@Slf4jpublic class SemaphoreDemo2 { private final static int threadCount = 5; public static void main(String[] args) throws Exception { ExecutorService exec = Executors.newCachedThreadPool(); // 每次最多三个线程获取许可 final Semaphore semaphore = new Semaphore(3); for (int i = 0; i < threadCount; i++) { final int threadNum = i; exec.execute(() -> { try { semaphore.acquire(); // 获取一个许可 test(threadNum); //把release注释掉可以看出只执行三个就不能通过了 semaphore.release(); // 释放一个许可 System.out.println("semaphore.availablePermits() = " + semaphore.availablePermits()); } catch (Exception e) { log.error("exception", e); } }); } exec.shutdown(); } private static void test(int threadNum) throws Exception { log.info("{}", threadNum); Thread.sleep(2000); }}
release()可以使信号量的许可数增加,达到动态扩展的效果
package com.xiaobu.JUC;import java.util.concurrent.Semaphore;/** * @author xiaobu * @version JDK1.8.0_171 * @date on 2019/9/27 10:18 * @description release()可以使信号量的许可数增加,达到动态扩展的效果 */public class SemaphoreDemo3 { private static Semaphore semaphore = new Semaphore(1); public static void main(String[] args) { try { System.out.println("获取前有" + semaphore.availablePermits() + "个可用许可"); semaphore.acquire(); System.out.println("获取后有" + semaphore.availablePermits() + "个可用许可"); for (int i = 0; i < 3; i++) { semaphore.release(); System.out.println("释放"+i+"个许可后,还有"+semaphore.availablePermits()+"个可用许可"); } for (int i = 0; i < 3; i++) { semaphore.acquire(); System.out.println("获取第"+i+"个许可,还有"+semaphore.availablePermits()+"可用许可"); } } catch (InterruptedException e) { e.printStackTrace(); } }}
package com.xiaobu.learn.concurrent;import lombok.extern.slf4j.Slf4j;import org.apache.commons.lang3.concurrent.BasicThreadFactory;import java.util.concurrent.*;/** * @author xiaobu * @version JDK1.8.0_171 * @date on 2019/7/3 11:19 * @description */@Slf4jpublic class SemaphoreDemo { private final static int threadCount = 5; public static void main(String[] args) throws Exception { ThreadFactory threadFactory = new BasicThreadFactory.Builder().namingPattern("thread-pool-%d").daemon(true).build(); ThreadPoolExecutor executor = new ThreadPoolExecutor(0, 30, 60L, TimeUnit.SECONDS, new SynchronousQueue<>(), threadFactory); final Semaphore semaphore = new Semaphore(2); for (int i = 0; i < threadCount; i++) { final int threadNum = i; executor.execute(() -> { try { if (semaphore.tryAcquire(1,TimeUnit.MILLISECONDS)) { // 尝试获取一个许可 log.info("[{}]获取到许可,threadNum:[{}]",Thread.currentThread().getName(),threadNum); test(threadNum); }else { log.info("[{}]没有获得许可,threadNum:[{}]",Thread.currentThread().getName(),threadNum); } } catch (Exception e) { log.error("exception", e); }finally { log.info(Thread.currentThread().getName()+"释放前semaphore.availablePermits() = {}个" , semaphore.availablePermits()); semaphore.release(); // 释放一个许可 log.info(Thread.currentThread().getName()+"释放后semaphore.availablePermits() ={}个" , semaphore.availablePermits()); } }); } //确保所有的线程都执行完 TimeUnit.SECONDS.sleep(5); executor.shutdown(); log.info("任务结束"); } private static void test(int threadNum) throws Exception { System.out.println("test 中的threadNum:"+threadNum); Thread.sleep(1000); }}
package com.xiaobu.leetcode;import lombok.SneakyThrows;import lombok.extern.slf4j.Slf4j;import java.util.concurrent.BlockingQueue;import java.util.concurrent.CountDownLatch;import java.util.concurrent.Semaphore;import java.util.concurrent.SynchronousQueue;import java.util.concurrent.locks.Condition;import java.util.concurrent.locks.Lock;import java.util.concurrent.locks.ReentrantLock;/** * @author xiaobu * @version JDK1.8.0_171 * @date on 2021/1/28 17:23 * @description 实现排序输出 按first second third来 */@Slf4jpublic class OrderSort { private static CountDownLatch secondCountDownLatch = new CountDownLatch(1); private static CountDownLatch thirdCountDownLatch = new CountDownLatch(1); private static Semaphore semaphore1 = new Semaphore(0); private static Semaphore semaphore2 = new Semaphore(0); private static BlockingQueueblockingQueue12 = new SynchronousQueue<>(); private static BlockingQueue blockingQueue23 = new SynchronousQueue<>(); private static Lock lock = new ReentrantLock(); private static int num=1; private static Condition condition1 = lock.newCondition(); private static Condition condition2 = lock.newCondition(); private static Condition condition3 = lock.newCondition(); private static void first() { System.out.println(" first"); } private static void second() { System.out.println(" second"); } private static void third() { System.out.println(" third"); } private static void test() { new Thread(new Runnable() { @Override public void run() { first(); secondCountDownLatch.countDown(); } }).start(); new Thread(new Runnable() { @SneakyThrows @Override public void run() { secondCountDownLatch.await(); second(); thirdCountDownLatch.countDown(); } }).start(); new Thread(new Runnable() { @SneakyThrows @Override public void run() { thirdCountDownLatch.await(); third(); } }).start(); } public static void main(String[] args) { //test(); //semaphoreTest(); //blockingQueueTest(); lockTest(); } private static void semaphoreTest() { new Thread(new Runnable() { @SneakyThrows @Override public void run() { first(); semaphore1.release(); } }).start(); new Thread(new Runnable() { @SneakyThrows @Override public void run() { semaphore1.acquire(); second(); semaphore2.release(); } }).start(); new Thread(new Runnable() { @SneakyThrows @Override public void run() { semaphore2.acquire(); third(); } }).start(); } private static void blockingQueueTest() { new Thread(new Runnable() { @SneakyThrows @Override public void run() { first(); blockingQueue12.put("stop"); } }).start(); new Thread(new Runnable() { @SneakyThrows @Override public void run() { blockingQueue12.take(); second(); blockingQueue23.put("stop"); } }).start(); new Thread(new Runnable() { @SneakyThrows @Override public void run() { blockingQueue23.take(); third(); } }).start(); } private static void lockTest() { new Thread(new Runnable() { @Override public void run() { lock.lock(); try { while (num != 1) { condition1.await(); } first(); num=2; condition2.signal(); } catch (InterruptedException e) { e.printStackTrace(); } finally { lock.unlock(); } } }).start(); new Thread(new Runnable() { @Override public void run() { lock.lock(); try { while (num != 2) { condition2.await(); } second(); num=3; condition3.signal(); } catch (InterruptedException e) { e.printStackTrace(); } finally { lock.unlock(); } } }).start(); new Thread(new Runnable() { @SneakyThrows @Override public void run() { lock.lock(); try { while (num != 3) { condition3.await(); } third(); } catch (InterruptedException e) { e.printStackTrace(); } finally { lock.unlock(); } } }).start(); }}
转载地址:http://qgrai.baihongyu.com/