博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
Java并发| CountDownLatch、Semaphore和CyclicBarrier
阅读量:4181 次
发布时间:2019-05-26

本文共 17229 字,大约阅读时间需要 57 分钟。

CountDownLatch

CountDownLatch是一个计数器闭锁,通过它可以完成类似于阻塞当前线程的功能,即:一个线程或多个线程一直等待,直到其他线程执行的操作完成。当计数器值减至零时,所有因调用await()方法而处于等待状态的线程就会继续往下执行。这种现象只会出现一次,计数器不能被重置。

package com.xiaobu.JUC;import java.util.concurrent.CountDownLatch;import java.util.concurrent.TimeUnit;/** * @author xiaobu * @version JDK1.8.0_171 * @date on  2019/9/20 11:27 * @description 考试场景 20个学生参加考试 一个人老师监考,只有最后一个学生交卷 老师才算任务完成 */public class CountDownLatchDemo {
private static final int count = 20; static class Teacher implements Runnable{
private CountDownLatch countDownLatch; public Teacher(){
} public Teacher(CountDownLatch countDownLatch){
this.countDownLatch = countDownLatch; } @Override public void run() {
System.out.print("老师发卷子\n"); try {
countDownLatch.await(); } catch (InterruptedException e) {
e.printStackTrace(); } System.out.print("老师收卷子\n"); } } static class Student implements Runnable{
private CountDownLatch countDownLatch; private int num; public Student(){
} public Student(CountDownLatch countDownLatch,int num){
this.countDownLatch = countDownLatch; this.num = num; } @Override public void run() {
System.out.printf("学生(%d)写卷子\n",num); System.out.printf("学生(%d)交卷子\n",num); countDownLatch.countDown(); } } public static void main(String[] args) {
CountDownLatch countDownLatch = new CountDownLatch(count); Teacher teacher = new Teacher(countDownLatch); Thread teacherThread = new Thread(teacher); teacherThread.start(); try {
//为了防止还没发卷子 学生就开始写卷子了 TimeUnit.SECONDS.sleep(1); } catch (InterruptedException e) {
e.printStackTrace(); } for (int i = 0; i < count; i++) {
Student student = new Student(countDownLatch, i); Thread studentThread = new Thread(student); studentThread.start(); } }}
package com.xiaobu.note.JUC.CountDownLatch;import java.util.concurrent.CountDownLatch;/** * @author xiaobu * @version JDK1.8.0_171 * @date on  2019/2/26 16:34 * @description V1.0 */public class Boss implements Runnable {
private CountDownLatch downLatch; public Boss(CountDownLatch downLatch){
this.downLatch = downLatch; } @Override public void run() {
System.out.println("老板正在等所有的工人干完活......"); try {
this.downLatch.await(); } catch (InterruptedException e) {
} System.out.println("工人活都干完了,老板开始检查了!"); }}
package com.xiaobu.note.JUC.CountDownLatch;import java.util.Random;import java.util.concurrent.CountDownLatch;import java.util.concurrent.TimeUnit;/** * @author xiaobu * @version JDK1.8.0_171 * @date on  2019/2/26 16:33 * @description V1.0 */public class Worker implements Runnable{
private CountDownLatch downLatch; private String name; public Worker(CountDownLatch downLatch, String name){
this.downLatch = downLatch; this.name = name; } @Override public void run() {
this.doWork(); try{
TimeUnit.SECONDS.sleep(new Random().nextInt(10)); }catch(InterruptedException ie){
ie.printStackTrace(); } System.out.println(this.name + "活干完了!"); this.downLatch.countDown(); } private void doWork(){
System.out.println(this.name + "正在干活!"); }}
package com.xiaobu.note.JUC.CountDownLatch;import java.util.concurrent.*;/** * @author xiaobu * @version JDK1.8.0_171 * @date on  2019/2/26 16:34 * @description V1.0 */public class CountDownLatchDemo {
public static void main(String[] args) {
//两者等价但需要显现的创建线程池这样不会出现OOM的情况 ExecutorService e = Executors.newCachedThreadPool(); ExecutorService executor=new ThreadPoolExecutor(0,5,60L, TimeUnit.SECONDS, new SynchronousQueue
()); CountDownLatch latch = new CountDownLatch(3); Worker w1 = new Worker(latch,"张三"); Worker w2 = new Worker(latch,"李四"); Worker w3 = new Worker(latch,"王二"); Boss boss = new Boss(latch); executor.execute(w3); executor.execute(w2); executor.execute(w1); executor.execute(boss); executor.shutdown(); }}

1589792154(1).jpg

CyclicBarrier

CyclicBarrier也是一个同步辅助类,它允许一组线程相互等待,直到到达某个公共屏障点(common barrier point)。通过它可以完成多个线程之间相互等待,只有当每个线程都准备就绪后,才能各自继续往下执行后面的操作。

package com.xiaobu.JUC;import lombok.extern.slf4j.Slf4j;import java.util.concurrent.BrokenBarrierException;import java.util.concurrent.CyclicBarrier;/** * @author xiaobu * @version JDK1.8.0_171 * @date on  2019/9/20 11:56 * @description * 最近景色宜人,公司组织去登山,大伙都来到了山脚下,登山过程自由进行。 * * 但为了在特定的地点拍集体照,规定1个小时后在半山腰集合,谁最后到的,要给大家表演一个节目。 * * 然后继续登山,在2个小时后,在山顶集合拍照,还是谁最后到的表演节目。 * * 接着开始下山了,在2个小时后在山脚下集合,点名回家,最后到的照例表演节目。 */@Slf4jpublic class CyclicBarrierDemo {
private static final int count = 5; private static CyclicBarrier cyclicBarrier = new CyclicBarrier(count,new Singer()); public static void main(String[] args) {
for (int i = 0; i < count; i++) {
Staff staff = new Staff(cyclicBarrier, i); Thread thread = new Thread(staff); thread.start(); } } static class Singer implements Runnable{
@Override public void run() {
System.out.println("为大家表演节目"); } } static class Staff implements Runnable{
private int num; private CyclicBarrier cyclicBarrier; public Staff(){
} public Staff(CyclicBarrier cyclicBarrier,int num){
this.cyclicBarrier = cyclicBarrier; this.num = num; } @Override public void run() {
log.info("员工[{}]开始出发....",num); log.info("员工[{}]到达目的地一....",num); try {
cyclicBarrier.await(); } catch (InterruptedException | BrokenBarrierException e) {
e.printStackTrace(); } log.info("员工[{}]再次出发....",num); log.info("员工[{}]到达目的地二....",num); try {
cyclicBarrier.await(); } catch (InterruptedException | BrokenBarrierException e) {
e.printStackTrace(); } log.info("员工[{}]又一次出发....",num); log.info("员工[{}]到达目的地三....",num); try {
cyclicBarrier.await(); } catch (InterruptedException | BrokenBarrierException e) {
e.printStackTrace(); } log.info("员工[{}]结束行程....",num); } }}

Semaphore

Semaphore与CountDownLatch相似,不同的地方在于Semaphore的值被获取到后是可以释放的,并不像CountDownLatch那样一直减到底。它也被更多地用来限制流量,类似阀门的 功能。如果限定某些资源最多有N个线程可以访问,那么超过N个主不允许再有线程来访问,同时当现有线程结束后,就会释放,然后允许新的线程进来。

void  acquire()   从信号量获取一个许可,如果无可用许可前 将一直阻塞等待,

void acquire(int permits)  获取指定数目的许可,如果无可用许可前  也将会一直阻塞等待

boolean tryAcquire()   从信号量尝试获取一个许可,如果无可用许可,直接返回false,不会阻塞

boolean tryAcquire(int permits)   尝试获取指定数目的许可,如果无可用许可直接返回false,

boolean tryAcquire(int permits, long timeout, TimeUnit unit)   在指定的时间内尝试从信号量中获取许可,如果在指定的时间内获取成功,返回true,否则返回false

void release()  释放一个许可,别忘了在finally中使用,注意:多次调用该方法,会使信号量的许可数增加,达到动态扩展的效果,如:初始permits 为1, 调用了两次release,最大许可会改变为2

int availablePermits() 获取当前信号量可用的许可

package com.xiaobu.JUC;import lombok.extern.slf4j.Slf4j;import java.util.concurrent.ExecutorService;import java.util.concurrent.Executors;import java.util.concurrent.Semaphore;/** * @author xiaobu * @version JDK1.8.0_171 * @date on  2019/7/3 11:19 * @description */@Slf4jpublic class SemaphoreDemo2 {
private final static int threadCount = 5; public static void main(String[] args) throws Exception {
ExecutorService exec = Executors.newCachedThreadPool(); // 每次最多三个线程获取许可 final Semaphore semaphore = new Semaphore(3); for (int i = 0; i < threadCount; i++) {
final int threadNum = i; exec.execute(() -> {
try {
semaphore.acquire(); // 获取一个许可 test(threadNum); //把release注释掉可以看出只执行三个就不能通过了 semaphore.release(); // 释放一个许可 System.out.println("semaphore.availablePermits() = " + semaphore.availablePermits()); } catch (Exception e) {
log.error("exception", e); } }); } exec.shutdown(); } private static void test(int threadNum) throws Exception {
log.info("{}", threadNum); Thread.sleep(2000); }}

SemaphoreDemo2.jpg

release()可以使信号量的许可数增加,达到动态扩展的效果

package com.xiaobu.JUC;import java.util.concurrent.Semaphore;/** * @author xiaobu * @version JDK1.8.0_171 * @date on  2019/9/27 10:18 * @description release()可以使信号量的许可数增加,达到动态扩展的效果 */public class SemaphoreDemo3 {
private static Semaphore semaphore = new Semaphore(1); public static void main(String[] args) {
try {
System.out.println("获取前有" + semaphore.availablePermits() + "个可用许可"); semaphore.acquire(); System.out.println("获取后有" + semaphore.availablePermits() + "个可用许可"); for (int i = 0; i < 3; i++) {
semaphore.release(); System.out.println("释放"+i+"个许可后,还有"+semaphore.availablePermits()+"个可用许可"); } for (int i = 0; i < 3; i++) {
semaphore.acquire(); System.out.println("获取第"+i+"个许可,还有"+semaphore.availablePermits()+"可用许可"); } } catch (InterruptedException e) {
e.printStackTrace(); } }}

1571018889(1).jpg

package com.xiaobu.learn.concurrent;import lombok.extern.slf4j.Slf4j;import org.apache.commons.lang3.concurrent.BasicThreadFactory;import java.util.concurrent.*;/** * @author xiaobu * @version JDK1.8.0_171 * @date on  2019/7/3 11:19 * @description */@Slf4jpublic class SemaphoreDemo {
private final static int threadCount = 5; public static void main(String[] args) throws Exception {
ThreadFactory threadFactory = new BasicThreadFactory.Builder().namingPattern("thread-pool-%d").daemon(true).build(); ThreadPoolExecutor executor = new ThreadPoolExecutor(0, 30, 60L, TimeUnit.SECONDS, new SynchronousQueue<>(), threadFactory); final Semaphore semaphore = new Semaphore(2); for (int i = 0; i < threadCount; i++) {
final int threadNum = i; executor.execute(() -> {
try {
if (semaphore.tryAcquire(1,TimeUnit.MILLISECONDS)) {
// 尝试获取一个许可 log.info("[{}]获取到许可,threadNum:[{}]",Thread.currentThread().getName(),threadNum); test(threadNum); }else {
log.info("[{}]没有获得许可,threadNum:[{}]",Thread.currentThread().getName(),threadNum); } } catch (Exception e) {
log.error("exception", e); }finally {
log.info(Thread.currentThread().getName()+"释放前semaphore.availablePermits() = {}个" , semaphore.availablePermits()); semaphore.release(); // 释放一个许可 log.info(Thread.currentThread().getName()+"释放后semaphore.availablePermits() ={}个" , semaphore.availablePermits()); } }); } //确保所有的线程都执行完 TimeUnit.SECONDS.sleep(5); executor.shutdown(); log.info("任务结束"); } private static void test(int threadNum) throws Exception {
System.out.println("test 中的threadNum:"+threadNum); Thread.sleep(1000); }}
package com.xiaobu.leetcode;import lombok.SneakyThrows;import lombok.extern.slf4j.Slf4j;import java.util.concurrent.BlockingQueue;import java.util.concurrent.CountDownLatch;import java.util.concurrent.Semaphore;import java.util.concurrent.SynchronousQueue;import java.util.concurrent.locks.Condition;import java.util.concurrent.locks.Lock;import java.util.concurrent.locks.ReentrantLock;/** * @author xiaobu * @version JDK1.8.0_171 * @date on  2021/1/28 17:23 * @description 实现排序输出  按first second third来 */@Slf4jpublic class OrderSort {
private static CountDownLatch secondCountDownLatch = new CountDownLatch(1); private static CountDownLatch thirdCountDownLatch = new CountDownLatch(1); private static Semaphore semaphore1 = new Semaphore(0); private static Semaphore semaphore2 = new Semaphore(0); private static BlockingQueue
blockingQueue12 = new SynchronousQueue<>(); private static BlockingQueue
blockingQueue23 = new SynchronousQueue<>(); private static Lock lock = new ReentrantLock(); private static int num=1; private static Condition condition1 = lock.newCondition(); private static Condition condition2 = lock.newCondition(); private static Condition condition3 = lock.newCondition(); private static void first() {
System.out.println(" first"); } private static void second() {
System.out.println(" second"); } private static void third() {
System.out.println(" third"); } private static void test() {
new Thread(new Runnable() {
@Override public void run() {
first(); secondCountDownLatch.countDown(); } }).start(); new Thread(new Runnable() {
@SneakyThrows @Override public void run() {
secondCountDownLatch.await(); second(); thirdCountDownLatch.countDown(); } }).start(); new Thread(new Runnable() {
@SneakyThrows @Override public void run() {
thirdCountDownLatch.await(); third(); } }).start(); } public static void main(String[] args) {
//test(); //semaphoreTest(); //blockingQueueTest(); lockTest(); } private static void semaphoreTest() {
new Thread(new Runnable() {
@SneakyThrows @Override public void run() {
first(); semaphore1.release(); } }).start(); new Thread(new Runnable() {
@SneakyThrows @Override public void run() {
semaphore1.acquire(); second(); semaphore2.release(); } }).start(); new Thread(new Runnable() {
@SneakyThrows @Override public void run() {
semaphore2.acquire(); third(); } }).start(); } private static void blockingQueueTest() {
new Thread(new Runnable() {
@SneakyThrows @Override public void run() {
first(); blockingQueue12.put("stop"); } }).start(); new Thread(new Runnable() {
@SneakyThrows @Override public void run() {
blockingQueue12.take(); second(); blockingQueue23.put("stop"); } }).start(); new Thread(new Runnable() {
@SneakyThrows @Override public void run() {
blockingQueue23.take(); third(); } }).start(); } private static void lockTest() {
new Thread(new Runnable() {
@Override public void run() {
lock.lock(); try {
while (num != 1) {
condition1.await(); } first(); num=2; condition2.signal(); } catch (InterruptedException e) {
e.printStackTrace(); } finally {
lock.unlock(); } } }).start(); new Thread(new Runnable() {
@Override public void run() {
lock.lock(); try {
while (num != 2) {
condition2.await(); } second(); num=3; condition3.signal(); } catch (InterruptedException e) {
e.printStackTrace(); } finally {
lock.unlock(); } } }).start(); new Thread(new Runnable() {
@SneakyThrows @Override public void run() {
lock.lock(); try {
while (num != 3) {
condition3.await(); } third(); } catch (InterruptedException e) {
e.printStackTrace(); } finally {
lock.unlock(); } } }).start(); }}

1569570736(1).jpg

转载地址:http://qgrai.baihongyu.com/

你可能感兴趣的文章
暴力搜索内存空间获得API的线性地址
查看>>
CTF编码
查看>>
万能密码原理和总结
查看>>
缓冲区溢出学习
查看>>
Excel高级使用技巧
查看>>
速算,以后留着教孩子
查看>>
让你变成ps高手
查看>>
在可执行jar中动态载入第三方jar(转贴)
查看>>
考虑体积重量的01背包问题—基于遗传算法
查看>>
K-means 聚类算法
查看>>
带约束的K-means聚类算法
查看>>
约束优化方法
查看>>
VRPTW建模与求解—基于粒子群算法
查看>>
数据结构与算法(1):大O表示法
查看>>
Java学习知识树
查看>>
文科生,你为啥学编程?
查看>>
使用Eclipse时出现Unhandled event loop exception错误的有效解决办法
查看>>
JAVA之路:第一章 JAVA入门初体验
查看>>
菜鸟文科生的java之路:运算符
查看>>
菜鸟文科生的java之路:变量和常量
查看>>