安徽seo网站推广外贸网站推广如何做
- 作者: 五速梦信息网
- 时间: 2026年03月21日 10:01
当前位置: 首页 > news >正文
安徽seo网站推广,外贸网站推广如何做,pc端网站营销,高端网站建设公司哪家服务好大纲
1.等待多线程完成的CountDownLatch介绍
2.CountDownLatch.await()方法源码
3.CountDownLatch.coutDown()方法源码
4.CountDownLatch总结
5.控制并发线程数的Semaphore介绍
6.Semaphore的令牌获取过程
7.Semaphore的令牌释放过程
8.同步屏障CyclicBarrier介绍
9.C…大纲
1.等待多线程完成的CountDownLatch介绍
2.CountDownLatch.await()方法源码
3.CountDownLatch.coutDown()方法源码
4.CountDownLatch总结
5.控制并发线程数的Semaphore介绍
6.Semaphore的令牌获取过程
7.Semaphore的令牌释放过程
8.同步屏障CyclicBarrier介绍
9.CyclicBarrier的await()方法源码
10.使用CountDownLatch等待注册的完成
11.使用CyclicBarrier将工作任务多线程分而治之
12.使用CyclicBarrier聚合服务接口的返回结果
13.使用Semaphore等待指定数量线程完成任务 volatile、synchronized、CAS、AQS、读写锁、锁优化和锁故障、并发集合、线程池、同步组件 1.等待多线程完成的CountDownLatch
(1)CountDownLatch的简介
(2)CountDownLatch的应用
(3)CountDownLatch的例子 (1)CountDownLatch的简介
CountDownLatch允许一个或多个线程等待其他线程完成操作。CountDownLatch提供了两个核心方法分别是await()方法和countDown()方法。CountDownLatch.await()方法让调用线程进行阻塞进入等待状态CountDownLatch.countDown()方法用于对计数器进行递减。 CountDownLatch在构造时需要传入一个正整数作为计数器初始值。线程每调用一次countDown()方法都会对该计数器减一。当计数器为0时会唤醒所有执行await()方法时被阻塞的线程。 (2)CountDownLatch的应用
应用一
使用多线程去解析一个Excel里多个sheet的数据每个线程解析一个sheet里的数据等所有sheet解析完再提示处理完成。此时便可以使用CountDownLatch来实现当然可以使用Thread.join()方法。 注意Thread.join()方法是基于wait()和notify()来实现的。在main线程里开启一个线程Amain线程如果执行了线程A的join()方法那么就会导致main线程被阻塞main线程会等待线程A执行完毕才会继续往下执行。 应用二
微服务注册中心的register-client为了在注册线程执行成功后才发送心跳。可以使用CountDownLatch当然也可以使用Thread.join()方法。 应用三
可以通过CountDownLatch实现类似并发的效果。把CountDownLatch的计数器设置为1然后让1000个线程调用await()方法。当1000个线程初始化完成后在main线程调用countDown()让计数器归零。这样这1000个线程就会在一个for()循环中依次被唤醒。 (3)CountDownLatch的例子
public class CountDownLatchDemo {public static void main(String[] args) throws Exception {final CountDownLatch latch new CountDownLatch(2);new Thread() {public void run() {try {Thread.sleep(1000);System.out.println(线程1开始执行休眠2秒…);Thread.sleep(1000);System.out.println(线程1准备执行countDown操作…);latch.countDown();System.out.println(线程1完成执行countDown操作…);} catch (Exception e) {e.printStackTrace();}}}.start();new Thread() {public void run() {try {Thread.sleep(1000);System.out.println(线程2开始执行休眠2秒…);Thread.sleep(1000);System.out.println(线程2准备执行countDown操作…);latch.countDown();System.out.println(线程2完成执行countDown操作…);} catch (Exception e) {e.printStackTrace();}}}.start();System.out.println(main线程准备执行countDownLatch的await操作将会同步阻塞等待…);latch.await();System.out.println(所有线程都完成countDown操作结束同步阻塞等待…);}
} 2.CountDownLatch.await()方法源码
(1)CountDownLatch.await()方法的阻塞流程
(2)CountDownLatch.await()方法的唤醒流程
(3)CountDownLatch.await()方法的阻塞总结 (1)CountDownLatch.await()方法的阻塞流程
CountDownLatch是基于AQS中的共享锁来实现的。从CountDownLatch的构造方法可知CountDownLatch的count就是AQS的state。 调用CountDownLatch的await()方法时会先调用AQS的acquireSharedInterruptibly()模版方法然后会调用CountDownLatch的内部类Sync实现的tryAcquireShared()方法。tryAcquireShared()方法会判断state的值是否为0如果为0才返回1否则返回-1。 当调用CountDownLatch内部类Sync的tryAcquireShared()方法获得的返回值是-1时才会调用AQS的doAcquireSharedInterruptibly()方法将当前线程封装成Node结点加入等待队列然后挂起当前线程进行阻塞。
//A synchronization aid that allows one or more threads to wait until a set of operations being performed in other threads completes.
public class CountDownLatch {private final Sync sync;public CountDownLatch(int count) {if (count 0) {throw new IllegalArgumentException(count 0);}this.sync new Sync(count);}//Synchronization control For CountDownLatch.//Uses AQS state to represent count.private static final class Sync extends AbstractQueuedSynchronizer {Sync(int count) {setState(count);}int getCount() {return getState();}protected int tryAcquireShared(int acquires) {return (getState() 0) ? 1 : -1;}protected boolean tryReleaseShared(int releases) {//Decrement count; signal when transition to zerofor (;;) {int c getState();if (c 0) {return false;}int nextc c-1;if (compareAndSetState(c, nextc)) {return nextc 0;}}}}//Causes the current thread to wait until the latch has counted down to zero, //unless the thread is Thread#interrupt interrupted.public void await() throws InterruptedException {//执行AQS的acquireSharedInterruptibly()方法sync.acquireSharedInterruptibly(1);}…
}public abstract class AbstractQueuedSynchronizer extends AbstractOwnableSynchronizer implements java.io.Serializable {…//Acquires in shared mode, aborting if interrupted.//Implemented by first checking interrupt status, then invoking at least once #tryAcquireShared, returning on success.//Otherwise the thread is queued, possibly repeatedly blocking and unblocking,//invoking #tryAcquireShared until success or the thread is interrupted.public final void acquireSharedInterruptibly(int arg) throws InterruptedException {if (Thread.interrupted()) {throw new InterruptedException();}//执行CountDownLatch的内部类Sync实现的tryAcquireShared()方法抢占共享锁if (tryAcquireShared(arg) 0) {//执行AQS的doAcquireSharedInterruptibly()方法doAcquireSharedInterruptibly(arg);}}//Acquires in shared interruptible mode.private void doAcquireSharedInterruptibly(int arg) throws InterruptedException {final Node node addWaiter(Node.SHARED);//封装当前线程为Shared类型的Node结点boolean failed true;try {//第一次循环r -1所以会执行AQS的shouldParkAfterFailedAcquire()方法//将node结点的有效前驱结点的状态设置为SIGNALfor (;;) {final Node p node.predecessor();//node结点的前驱结点if (p head) {int r tryAcquireShared(arg);if (r 0) {setHeadAndPropagate(node, r);p.next null; // help GCfailed false;return;}}//执行shouldParkAfterFailedAcquire()方法设置node结点的前驱结点的状态为SIGNAL//执行parkAndCheckInterrupt()方法挂起当前线程if (shouldParkAfterFailedAcquire(p, node) parkAndCheckInterrupt()) {throw new InterruptedException();}}} finally {if (failed) {cancelAcquire(node);}}}//Checks and updates status for a node that failed to acquire.//Returns true if thread should block. This is the main signal control in all acquire loops.private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {int ws pred.waitStatus;if (ws Node.SIGNAL) {//This node has already set status asking a release to signal it, so it can safely park.return true;}if (ws 0) {//Predecessor was cancelled. Skip over predecessors and indicate retry.do {node.prev pred pred.prev;} while (pred.waitStatus 0);pred.next node;} else {//waitStatus must be 0 or PROPAGATE. //Indicate that we need a signal, but dont park yet. //Caller will need to retry to make sure it cannot acquire before parking.compareAndSetWaitStatus(pred, ws, Node.SIGNAL);}return false;}//设置头结点和唤醒后续线程//Sets head of queue, and checks if successor may be waiting in shared mode, //if so propagating if either propagate 0 or PROPAGATE status was set.private void setHeadAndPropagate(Node node, int propagate) {Node h head;setHead(node);//将node结点设置为头结点if (propagate 0 || h null || h.waitStatus 0 || (h head) null || h.waitStatus 0) {Node s node.next;if (s null || s.isShared()) {doReleaseShared();}}}private void setHead(Node node) {head node;node.thread null;node.prev null;}…
}
(2)CountDownLatch.await()方法的唤醒流程
调用await()方法时首先会将当前线程封装成Node结点并添加到等待队列中然后在执行第一次for循环时会设置该Node结点的前驱结点状态为SIGNAL接着在执行第二次for循环时才会将当前线程进行挂起阻塞。 当该线程后续被唤醒时该线程又会进入下一次for循环。如果该线程对应的node结点的前驱结点是等待队列的头结点且state值已为0那么就执行AQS的setHeadAndPropagate()方法设置头结点 唤醒后续线程。 其中setHeadAndPropagate()方法有两个工作(设置头结点 唤醒传递)
工作一设置当前被唤醒线程对应的结点为头结点
工作二当满足如下这两个条件的时候需要调用doReleaseShared()方法唤醒后续的线程
条件一propagate 0表示当前是共享锁需要进行唤醒传递
条件二s.isShared()判断当前结点为共享模式 CountDownLatch的实现中会在以下两个场景调用doReleaseShared()方法
场景一state为1时调用的countDown()方法会调用doReleaseShared()方法
场景二当阻塞的线程被唤醒时会调用setHeadAndPropagate()方法进而调用doReleaseShared()方法这样可以提升唤醒共享结点的速度 (3)CountDownLatch.await()方法的阻塞总结
只要state ! 0就会进行如下处理
一.将当前线程封装成一个Node结点然后添加到AQS的等待队列中
二.调用LockSupport.park()方法挂起当前线程 3.CountDownLatch.coutDown()方法源码
(1)CountDownLatch.coutDown()的唤醒流程
(2)CountDownLatch.tryReleaseShared()
(3)AQS的doReleaseShared()方法 (1)CountDownLatch.coutDown()的唤醒流程
调用CountDownLatch的countDown()方法时会先调用AQS的releaseShared()模版方法然后会执行CountDownLatch的内部类Sync实现的tryReleaseShared()方法。 如果tryReleaseShared()方法返回true则执行AQS的doReleaseShared()方法通过AQS的doReleaseShared()方法唤醒共享锁模式下的等待队列中的线程。
//A synchronization aid that allows one or more threads to wait until a set of operations being performed in other threads completes.
public class CountDownLatch {private final Sync sync;public CountDownLatch(int count) {if (count 0) {throw new IllegalArgumentException(count 0);}this.sync new Sync(count);}//Synchronization control For CountDownLatch.//Uses AQS state to represent count.private static final class Sync extends AbstractQueuedSynchronizer {Sync(int count) {setState(count);}int getCount() {return getState();}protected int tryAcquireShared(int acquires) {return (getState() 0) ? 1 : -1;}protected boolean tryReleaseShared(int releases) {//Decrement count; signal when transition to zerofor (;;) {int c getState();if (c 0) {return false;}int nextc c-1;if (compareAndSetState(c, nextc)) {return nextc 0;}}}}//Decrements the count of the latch, releasing all waiting threads if the count reaches zero.public void countDown() {//执行AQS的releaseShared()方法sync.releaseShared(1);}…
}public abstract class AbstractQueuedSynchronizer extends AbstractOwnableSynchronizer implements java.io.Serializable {…//Releases in shared mode. //Implemented by unblocking one or more threads if #tryReleaseShared returns true.public final boolean releaseShared(int arg) {//执行CountDownLatch的内部类Sync实现的tryReleaseShared()方法释放共享锁if (tryReleaseShared(arg)) {//执行AQS的doReleaseShared()方法doReleaseShared();return true;}return false;}//Release action for shared mode – signals successor and ensures propagation. //Note: For exclusive mode, release just amounts to calling unparkSuccessor of head if it needs signal.private void doReleaseShared() {for (;;) {//每次循环时头结点都会发生变化//因为调用unparkSuccessor()方法会唤醒doAcquireSharedInterruptibly()方法中阻塞的线程//然后阻塞的线程会在执行setHeadAndPropagate()方法时通过setHead()修改头结点Node h head;//获取最新的头结点if (h ! null h ! tail) {//等待队列中存在挂起线程的结点int ws h.waitStatus;if (ws Node.SIGNAL) {//头结点的状态正常表示对应的线程可以被唤醒if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0)) {continue;//loop to recheck cases}//唤醒头结点的后继结点//唤醒的线程会在doAcquireSharedInterruptibly()方法中执行setHeadAndPropagate()方法修改头结点unparkSuccessor(h);} else if (ws 0 !compareAndSetWaitStatus(h, 0, Node.PROPAGATE)) {//如果ws 0表示初始状态则修改结点为PROPAGATE状态continue;//loop on failed CAS}}if (h head) {//判断头结点是否有变化break;//loop if head changed}}}//Wakes up nodes successor, if one exists.private void unparkSuccessor(Node node) {int ws node.waitStatus;if (ws 0) {compareAndSetWaitStatus(node, ws, 0);}Node s node.next;if (s null || s.waitStatus 0) {s null;for (Node t tail; t ! null t ! node; t t.prev) {if (t.waitStatus 0) {s t;}}}if (s ! null) {LockSupport.unpark(s.thread);}}…
}
(2)CountDownLatch.tryReleaseShared()
从tryReleaseShared()方法可知每次countDown()其实就是把AQS的state值减1然后通过CAS更新state值。如果CAS设置成功那么就判断当前state值是否为0。如果是0那么就返回true如果不是0那么就返回false。返回true的时候会调用AQS的doReleaseShared()方法唤醒等待队列中的线程。 (3)AQS的doReleaseShared()方法
该方法要从AQS的等待队列中唤醒头结点的后继结点需要满足
条件一等待队列中要存在挂起线程的结点(h ! null h ! tail)
条件二等待队列的头结点的状态正常(h.waitStatus Node.SIGNAL) 在共享锁模式下state为0时需要通过唤醒传递把所有挂起的线程都唤醒。首先doReleaseShared()方法会通过for(;;)进行自旋操作每次循环都会通过Node h head来获取等待队列中最新的头结点然后通过if (h head)来判断等待队列中的头结点是否发生变化。如果没有变化则退出自旋。 注意在共享锁模式下被unparkSuccessor()唤醒的等待队列中的线程会继续在在doAcquireSharedInterruptibly()方法中执行setHeadAndPropagate()方法修改头结点从而实现唤醒传递。 4.CountDownLatch总结
假设有两个线程A和B分别调用了CountDownLatch的await()方法此时state所表示的计数器不为0。所以线程A和B会被封装成SHARED类型的结点并添加到AQS的等待队列中。 当线程C调用CountDownLatch的coutDown()方法后如果state被递减到0那么就会调用doReleaseShared()方法唤醒等待队列中的线程。然后被唤醒的线程会继续调用setHeadAndPropagate()方法实现唤醒传递从而继续在doReleaseShared()方法中唤醒所有在等待队列中的被阻塞的线程。 5.控制并发线程数的Semaphore介绍
(1)Semaphore的作用
(2)Semaphore的方法
(3)Semaphore原理分析 (1)Semaphore的作用
Semaphore信号量用来控制同时访问特定资源的线程数量有两核心方法。
方法一acquire()方法获取一个令牌
方法二release()方法释放一个令牌 多个线程访问某限制访问流量的资源时可先调用acquire()获取访问令牌。如果能够正常获得则表示允许访问。如果令牌不够则会阻塞当前线程。当某个获得令牌的线程通过release()方法释放一个令牌后被阻塞在acquire()方法的线程就有机会获得这个释放的令牌。
public class SemaphoreDemo {public static void main(String[] args) throws InterruptedException {Semaphore semaphore new Semaphore(10, true);//初始化10个资源使用公平锁 semaphore.acquire();//每次获取一个资源如果获取不到线程就会阻塞semaphore.release();//释放一个资源}
}
(2)Semaphore的方法
Semaphore实际上并没有一个真实的令牌发给线程Semaphore只是对一个可分配数量进行计数维护或者说进行许可证管理。Semaphore可以在公共资源有限的场景下实现流量控制如数据库连接。
一.Semaphore(permits, fair)permits表示令牌数fair表示公平性
二.acquire(permits)获取指定数量的令牌如果数量不足则阻塞当前线程
三.tryAcquire(permits)尝试获取指定数量的令牌此过程是非阻塞的成功返回true失败返回false
四.release(permits)释放指定数量的令牌
五.drainPermits()当前线程获得剩下的所有令牌
六.hasQueuedThread()判断当前Semaphore实例上是否存在等待令牌的线程
(3)Semaphore原理分析
Semaphore也是基于AQS中的共享锁来实现的。在创建Semaphore实例时传递的参数permits其实就是AQS中的state属性。每次调用Semaphore的acquire()方法都会对state值进行递减。 所以从根本上说Semaphore是通过重写AQS的两个方法来实现的
方法一tryAcquireShared()抢占共享锁
方法二tryReleaseShared()释放共享锁
public class Semaphore implements java.io.Serializable {private final Sync sync;//Creates a Semaphore with the given number of permits and nonfair fairness setting.public Semaphore(int permits) {sync new NonfairSync(permits);}static final class NonfairSync extends Sync {NonfairSync(int permits) {super(permits);}protected int tryAcquireShared(int acquires) {return nonfairTryAcquireShared(acquires);}}//Acquires a permit from this semaphore, blocking until one is available, //or the thread is Thread#interrupt interrupted.public void acquire() throws InterruptedException {//执行AQS的模版方法acquireSharedInterruptibly()sync.acquireSharedInterruptibly(1);}//Releases a permit, returning it to the semaphore.public void release() {//执行AQS的模版方法releaseShared()sync.releaseShared(1);}//Synchronization implementation for semaphore. //Uses AQS state to represent permits. Subclassed into fair and nonfair versions.abstract static class Sync extends AbstractQueuedSynchronizer {Sync(int permits) {//设置state的值为传入的令牌数setState(permits);}final int getPermits() {return getState();}final int nonfairTryAcquireShared(int acquires) {for (;;) {int available getState();int remaining available - acquires;if (remaining 0 || compareAndSetState(available, remaining)) {return remaining;}}}protected final boolean tryReleaseShared(int releases) {for (;;) {int current getState();int next current releases;if (next current) {throw new Error(Maximum permit count exceeded);}if (compareAndSetState(current, next)) {return true;}}}…}…
}public abstract class AbstractQueuedSynchronizer extends AbstractOwnableSynchronizer implements java.io.Serializable {…//Acquires in shared mode, aborting if interrupted.//Implemented by first checking interrupt status, then invoking at least once #tryAcquireShared, returning on success.//Otherwise the thread is queued, possibly repeatedly blocking and unblocking,//invoking #tryAcquireShared until success or the thread is interrupted.public final void acquireSharedInterruptibly(int arg) throws InterruptedException {if (Thread.interrupted()) {throw new InterruptedException();}//执行Semaphore的内部类Sync的子类实现的tryAcquireShared()方法抢占共享锁if (tryAcquireShared(arg) 0) {//执行AQS的doAcquireSharedInterruptibly()方法doAcquireSharedInterruptibly(arg);}}//Releases in shared mode. //Implemented by unblocking one or more threads if #tryReleaseShared returns true.public final boolean releaseShared(int arg) {//执行Semaphore的内部类Sync实现的tryReleaseShared()方法释放共享锁if (tryReleaseShared(arg)) {//执行AQS的doReleaseShared()方法doReleaseShared();return true;}return false;}…
} 6.Semaphore的令牌获取过程
(1)Semaphore的令牌获取过程
(2)Semaphore的公平策略
(3)Semaphore的非公平策略
(4)tryAcquireShared()后的处理 (1)Semaphore的令牌获取过程
在调用Semaphore的acquire()方法获取令牌时首先会执行AQS的模版方法acquireSharedInterruptibly()然后执行Sync子类实现的tryAcquireShared()方法来抢占锁。如果抢占锁失败则执行AQS的doAcquireSharedInterruptibly()方法。该方法会将当前线程封装成Node结点并加入等待队列然后挂起线程。 (2)Semaphore的公平策略
在执行Sync子类FairSync的tryAcquireShared()方法尝试获取令牌时先通过AQS的hasQueuedPredecessors()判断是否已有线程在等待队列中。如果已经有线程在等待队列中那么当前线程获取令牌就必然失败。否则就递减state的值 判断state是否小于0 CAS设置state的值。 (3)Semaphore的非公平策略
在执行Sync子类NonfairSync的tryAcquireShared()方法尝试获取令牌时则会直接执行Sync的nonfairTryAcquireShared()方法来获取令牌也就是递减state的值 判断state是否小于0 CAS设置state的值。 (4)tryAcquireShared()后的处理
不管公平策略还是非公平策略对应的tryAcquireShared()方法都是通过自旋来抢占令牌(CAS设置state)直到令牌数不够时才会让tryAcquireShared()方法返回小于0的数值。然后触发执行AQS的doAcquireSharedInterruptibly()方法该方法会将当前线程封装成Node结点并加入等待队列然后挂起线程。
public class Semaphore implements java.io.Serializable {private final Sync sync;//Creates a Semaphore with the given number of permits and nonfair fairness setting.public Semaphore(int permits) {sync new NonfairSync(permits);}static final class NonfairSync extends Sync {NonfairSync(int permits) {super(permits);}//以非公平锁的方式获取令牌protected int tryAcquireShared(int acquires) {//执行Sync的nonfairTryAcquireShared()方法return nonfairTryAcquireShared(acquires);}}static final class FairSync extends Sync {FairSync(int permits) {super(permits);}//以公平锁的方式获取令牌protected int tryAcquireShared(int acquires) {for (;;) {//如果已经有线程在等待队列中那么就说明获取令牌必然失败if (hasQueuedPredecessors()) {return -1;}int available getState();int remaining available - acquires;if (remaining 0 || compareAndSetState(available, remaining)) {return remaining;}}}}//Acquires a permit from this semaphore, blocking until one is available, //or the thread is Thread#interrupt interrupted.public void acquire() throws InterruptedException {//执行AQS的模版方法acquireSharedInterruptibly()sync.acquireSharedInterruptibly(1);}//Synchronization implementation for semaphore. //Uses AQS state to represent permits. Subclassed into fair and nonfair versions.abstract static class Sync extends AbstractQueuedSynchronizer {Sync(int permits) {//设置state的值为传入的令牌数setState(permits);}final int getPermits() {return getState();}final int nonfairTryAcquireShared(int acquires) {for (;;) {int available getState();int remaining available - acquires;if (remaining 0 || compareAndSetState(available, remaining)) {return remaining;}}}…}…
}public abstract class AbstractQueuedSynchronizer extends AbstractOwnableSynchronizer implements java.io.Serializable {…//Acquires in shared mode, aborting if interrupted.//Implemented by first checking interrupt status, then invoking at least once #tryAcquireShared, returning on success.//Otherwise the thread is queued, possibly repeatedly blocking and unblocking,//invoking #tryAcquireShared until success or the thread is interrupted.public final void acquireSharedInterruptibly(int arg) throws InterruptedException {if (Thread.interrupted()) {throw new InterruptedException();}//执行Semaphore的内部类Sync的子类实现的tryAcquireShared()方法抢占共享锁if (tryAcquireShared(arg) 0) {//执行AQS的doAcquireSharedInterruptibly()方法doAcquireSharedInterruptibly(arg);}}//Queries whether any threads have been waiting to acquire longer than the current thread.public final boolean hasQueuedPredecessors() {Node t tail; // Read fields in reverse initialization orderNode h head;Node s;return h ! t ((s h.next) null || s.thread ! Thread.currentThread());}//Acquires in shared interruptible mode.private void doAcquireSharedInterruptibly(int arg) throws InterruptedException {final Node node addWaiter(Node.SHARED);//封装当前线程为Shared类型的Node结点boolean failed true;try {//第一次循环r -1所以会执行AQS的shouldParkAfterFailedAcquire()方法//将node结点的有效前驱结点的状态设置为SIGNALfor (;;) {final Node p node.predecessor();//node结点的前驱结点if (p head) {int r tryAcquireShared(arg);if (r 0) {setHeadAndPropagate(node, r);p.next null; // help GCfailed false;return;}}//执行shouldParkAfterFailedAcquire()方法设置node结点的前驱结点的状态为SIGNAL//执行parkAndCheckInterrupt()方法挂起当前线程if (shouldParkAfterFailedAcquire(p, node) parkAndCheckInterrupt()) {throw new InterruptedException();}}} finally {if (failed) {cancelAcquire(node);}}}…
} 7.Semaphore的令牌释放过程
(1)Semaphore的令牌释放过程
(2)Semaphore的令牌释放本质 (1)Semaphore的令牌释放过程
在调用Semaphore的release()方法去释放令牌时首先会执行AQS的模版方法releaseShared()然后执行Sync实现的tryReleaseShared()方法来释放锁(累加state值)。如果释放锁成功则执行AQS的doReleaseShared()方法去唤醒线程。 (2)Semaphore的令牌释放本质
Semaphore的release()方法释放令牌的本质就是对state字段进行累加然后唤醒等待队列头结点的后继结点 唤醒传递来唤醒等待的线程。 注意并非一定要执行acquire()方法的线程才能调用release()方法任意一个线程都可以调用release()方法也可以通过reducePermits()方法来减少令牌数。
public class Semaphore implements java.io.Serializable {private final Sync sync;//Creates a Semaphore with the given number of permits and nonfair fairness setting.public Semaphore(int permits) {sync new NonfairSync(permits);}//Releases a permit, returning it to the semaphore.public void release() {//执行AQS的模版方法releaseShared()sync.releaseShared(1);}//Synchronization implementation for semaphore. //Uses AQS state to represent permits. Subclassed into fair and nonfair versions.abstract static class Sync extends AbstractQueuedSynchronizer {Sync(int permits) {//设置state的值为传入的令牌数setState(permits);}//尝试释放锁也就是对state值进行累加protected final boolean tryReleaseShared(int releases) {for (;;) {int current getState();int next current releases;if (next current) {throw new Error(Maximum permit count exceeded);}if (compareAndSetState(current, next)) {return true;}}}…}…
}public abstract class AbstractQueuedSynchronizer extends AbstractOwnableSynchronizer implements java.io.Serializable {… //Releases in shared mode. //Implemented by unblocking one or more threads if #tryReleaseShared returns true.public final boolean releaseShared(int arg) {//执行Semaphore的内部类Sync实现的tryReleaseShared()方法释放共享锁if (tryReleaseShared(arg)) {//执行AQS的doReleaseShared()方法唤醒等待队列中的线程doReleaseShared();return true;}return false;}//Release action for shared mode – signals successor and ensures propagation. //Note: For exclusive mode, release just amounts to calling unparkSuccessor of head if it needs signal.private void doReleaseShared() {for (;;) {//每次循环时头结点都会发生变化//因为调用unparkSuccessor()方法会唤醒doAcquireSharedInterruptibly()方法中阻塞的线程//然后阻塞的线程会在执行setHeadAndPropagate()方法时通过setHead()修改头结点Node h head;//获取最新的头结点if (h ! null h ! tail) {//等待队列中存在挂起线程的结点int ws h.waitStatus;if (ws Node.SIGNAL) {//头结点的状态正常表示对应的线程可以被唤醒if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0)) {continue;//loop to recheck cases}//唤醒头结点的后继结点//唤醒的线程会在doAcquireSharedInterruptibly()方法中执行setHeadAndPropagate()方法修改头结点unparkSuccessor(h);} else if (ws 0 !compareAndSetWaitStatus(h, 0, Node.PROPAGATE)) {//如果ws 0表示初始状态则修改结点为PROPAGATE状态continue;//loop on failed CAS}}if (h head) {//判断头结点是否有变化break;//loop if head changed}}}//Wakes up nodes successor, if one exists.private void unparkSuccessor(Node node) {int ws node.waitStatus;if (ws 0) {compareAndSetWaitStatus(node, ws, 0);}Node s node.next;if (s null || s.waitStatus 0) {s null;for (Node t tail; t ! null t ! node; t t.prev) {if (t.waitStatus 0) {s t;}}}if (s ! null) {LockSupport.unpark(s.thread);}}…
} 8.同步屏障CyclicBarrier介绍
(1)CyclicBarrier的作用
(2)CyclicBarrier的基本原理 (1)CyclicBarrier的作用
CyclicBarrier的字面意思就是可循环使用的屏障。CyclicBarrier的主要作用就是让一组线程到达一个屏障时被阻塞直到最后一个线程到达屏障时屏障才会打开接着才让所有被屏障拦截的线程一起继续往下执行。线程进入屏障是通过CyclicBarrier的await()方法来实现的。 (2)CyclicBarrier的基本原理
假设有3个线程在运行中都会调用CyclicBarrier的await()方法而每个线程从开始执行到执行await()方法所用时间可能不一样最终当执行时间最长的线程到达屏障时会唤醒其他较早到达屏障的线程继续往下执行。 CyclicBarrier包含两个层面的意思
一是Barrier屏障点线程调用await()方法都会阻塞在屏障点直到所有线程都到达屏障点后再放行。
二是Cyclic循环当所有线程通过当前屏障点后又可以进入下一轮的屏障点进行等待可以不断循环。 9.CyclicBarrier的await()方法源码
(1)CyclicBarrier的成员变量
(2)CyclicBarrier的await()方法源码
(3)CountDownLatch和CyclicBarrier对比 (1)CyclicBarrier的成员变量
//A synchronization aid that allows a set of threads to all wait for each other to reach a common barrier point.
//CyclicBarriers are useful in programs involving a fixed sized party of threads that must occasionally wait for each other.
//The barrier is called cyclic because it can be re-used after the waiting threads are released.
public class CyclicBarrier {…private static class Generation {boolean broken false;}private final ReentrantLock lock new ReentrantLock();private final Condition trip lock.newCondition();//用于线程之间相互唤醒private final int parties;//参与的线程数量private int count;//初始值是parties每调用一次await()就减1private final Runnable barrierCommand;//回调任务private Generation generation new Generation();…
}
CyclicBarrier是基于ReentrantLock Condition来实现的。 parties表示每次要求到达屏障点的线程数只有到达屏障点的线程数满足指定的parties数量所有线程才会被唤醒。 count是一个初始值为parties的计数器每个线程调用await()方法会对count减1当count为0时会唤醒所有线程并且结束当前的屏障周期generation然后所有线程进入下一个屏障周期而且count会恢复成parties。 (2)CyclicBarrier的await()方法源码
线程调用CyclicBarrier的await()方法时会触发调用CyclicBarrier的dowait()方法。 CyclicBarrier的dowait()方法会对count计数器进行递减。如果count递减到0则会调用CyclicBarrier的nextGeneration()唤醒所有线程同时如果异步回调任务barrierCommand不为空则会执行该任务。如果count还没递减到0则调用Condition的await()方法阻塞当前线程。 被阻塞的线程除了会被CyclicBarrier的nextGeneration()方法唤醒外还会被Thread的interrupt()方法唤醒、被中断异常唤醒而这些唤醒会调用CyclicBarrier的breakBarrier()方法。 在CyclicBarrier的nextGeneration()方法和CyclicBarrier的breakBarrier()方法中都会通过Condition的signalAll()方法唤醒所有被阻塞等待的线程。
//A synchronization aid that allows a set of threads to all wait for each other to reach a common barrier point.
//CyclicBarriers are useful in programs involving a fixed sized party of threads that must occasionally wait for each other.
//The barrier is called cyclic because it can be re-used after the waiting threads are released.
public class CyclicBarrier {…private static class Generation {boolean broken false;//用来标记屏障是否被中断}private final ReentrantLock lock new ReentrantLock();private final Condition trip lock.newCondition();//用于线程之间相互唤醒private final int parties;//参与的线程数量private int count;//初始值是parties每调用一次await()就减1private final Runnable barrierCommand;//回调任务private Generation generation new Generation();public CyclicBarrier(int parties, Runnable barrierAction) {if (parties 0) throw new IllegalArgumentException();this.parties parties;this.count parties;this.barrierCommand barrierAction;}public CyclicBarrier(int parties) {this(parties, null);}//Waits until all #getParties have invoked await on this barrier.public int await() throws InterruptedException, BrokenBarrierException {try {//执行CyclicBarrier的dowait()方法return dowait(false, 0L);} catch (TimeoutException toe) {throw new Error(toe);}}//Main barrier code, covering the various policies.private int dowait(boolean timed, long nanos) throws InterruptedException, BrokenBarrierException, TimeoutException {final ReentrantLock lock this.lock;lock.lock();//使用Condition需要先获取锁try {//获取当前的generationfinal Generation g generation;//确认当前generation的barrier是否有效如果generation的broken为true则抛出屏障中断异常if (g.broken) {throw new BrokenBarrierException();}if (Thread.interrupted()) {breakBarrier();throw new InterruptedException();}//统计已经到达当前generation的线程数量int index –count;//如果index为0则表示所有线程都到达了屏障点if (index 0) {boolean ranAction false;try {final Runnable command barrierCommand;if (command ! null) {//触发回调command.run();}ranAction true;//执行nextGeneration()方法唤醒所有线程同时进入下一个屏障周期nextGeneration();return 0;} finally {if (!ranAction) {breakBarrier();}}}//loop until tripped, broken, interrupted, or timed out//如果index 0则阻塞当前线程for (;;) {try {if (!timed) {//通过Condition的await()方法在阻塞当前线程的同时释放锁//这样其他线程就能获取到锁执行上面的index –counttrip.await();} else if (nanos 0L) {nanos trip.awaitNanos(nanos);}} catch (InterruptedException ie) {if (g generation ! g.broken) {breakBarrier();throw ie;} else {Thread.currentThread().interrupt();}}if (g.broken) {throw new BrokenBarrierException();}if (g ! generation) {return index;}if (timed nanos 0L) {//中断屏障设置generation.broken为truebreakBarrier();throw new TimeoutException();}}} finally {lock.unlock();}}//Updates state on barrier trip and wakes up everyone.//Called only while holding lock.private void nextGeneration() {//通过Condition的signalAll()唤醒所有等待的线程trip.signalAll();//还原countcount parties;//进入新的generationgeneration new Generation();}//Sets current barrier generation as broken and wakes up everyone.//Called only while holding lock.private void breakBarrier() {generation.broken true;count parties;//通过Condition的signalAll()唤醒所有等待的线程trip.signalAll();}…
}
(3)CountDownLatch和CyclicBarrier对比
一.CyclicBarrier可以被重用、可以响应中断
二.CountDownLatch的计数器只能使用一次但可以通过reset()方法重置 10.使用CountDownLatch等待注册的完成
Hadoop HDFS(分布式存储系统)的NameNode分为主备两个节点各个DataNode在启动时都会向两个NameNode进行注册此时就可以使用CountDownLatch等待向主备节点注册的完成。
//DataNode启动类
public class DataNode {//是否还在运行private volatile Boolean shouldRun;//负责和一组NameNode(主NameNode 备NameNode)通信的组件private NameNodeGroupOfferService offerService;//初始化DataNodeprivate void initialize() {this.shouldRun true;this.offerService new NameNodeGroupOfferService();this.offerService.start(); }//运行DataNodeprivate void run() {try {while(shouldRun) {Thread.sleep(10000); } } catch (Exception e) {e.printStackTrace();}}public static void main(String[] args) {DataNode datanode new DataNode();datanode.initialize();datanode.run(); }
}//负责某个NameNode进行通信的线程组件
public class NameNodeServiceActor {//向某个NameNode进行注册public void register(CountDownLatch latch) {Thread registerThread new RegisterThread(latch);registerThread.start(); }//负责注册的线程传入一个CountDownLatchclass RegisterThread extends Thread {CountDownLatch latch;public RegisterThread(CountDownLatch latch) {this.latch latch;}Overridepublic void run() {try {//发送rpc接口调用请求到NameNode去进行注册System.out.println(发送请求到NameNode进行注册…);Thread.sleep(1000); latch.countDown(); } catch (Exception e) {e.printStackTrace();}}}
}//负责跟一组NameNode(主NameNode 备NameNode)进行通信的线程组件
public class NameNodeGroupOfferService {//负责跟NameNode主节点通信的ServiceActor组件private NameNodeServiceActor activeServiceActor;//负责跟NameNode备节点通信的ServiceActor组件private NameNodeServiceActor standbyServiceActor;//构造函数public NameNodeGroupOfferService() {this.activeServiceActor new NameNodeServiceActor();this.standbyServiceActor new NameNodeServiceActor();}//启动OfferService组件public void start() {//直接使用两个ServiceActor组件分别向主备两个NameNode节点进行注册register();}//向主备两个NameNode节点进行注册private void register() {try {CountDownLatch latch new CountDownLatch(2); this.activeServiceActor.register(latch); this.standbyServiceActor.register(latch); latch.await();//阻塞等待主备都完成注册System.out.println(主备NameNode全部注册完毕…);} catch (Exception e) {e.printStackTrace(); }}
} 11.使用CyclicBarrier将工作任务多线程分而治之
//输出结果
//线程1执行自己的一部分工作…
//线程2执行自己的一部分工作…
//线程3执行自己的一部分工作…
//所有线程都完成自己的任务可以合并结果了…
//最终结果合并完成线程3可以退出…
//最终结果合并完成线程1可以退出…
//最终结果合并完成线程2可以退出…
public class CyclicBarrierDemo {public static void main(String[] args) {final CyclicBarrier barrier new CyclicBarrier(3, new Runnable() {public void run() {System.out.println(所有线程都完成自己的任务可以合并结果了…);}});new Thread() {public void run() {try {System.out.println(线程1执行自己的一部分工作…);barrier.await();System.out.println(最终结果合并完成线程1可以退出…);} catch (Exception e) {e.printStackTrace();}}}.start();new Thread() {public void run() {try {System.out.println(线程2执行自己的一部分工作…);barrier.await();System.out.println(最终结果合并完成线程2可以退出…);} catch (Exception e) {e.printStackTrace();}}}.start();new Thread() {public void run() {try {System.out.println(线程3执行自己的一部分工作…);barrier.await();System.out.println(最终结果合并完成线程3可以退出…);} catch (Exception e) {e.printStackTrace();}}}.start();}
} 12.使用CyclicBarrier聚合服务接口的返回结果
当然也可以使用CountDownLatch来实现聚合服务接口的返回结果
public class ApiServiceDemo {public MapString, Object queryOrders() throws Exception {final ListObject results new ArrayListObject();final MapString, Object map new ConcurrentHashMapString, Object();CyclicBarrier barrier new CyclicBarrier(3, new Runnable() {Overridepublic void run() {map.put(price, results.get(0)); map.put(order, results.get(1)); map.put(stats, results.get(2)); }});//请求价格接口new Thread() {public void run() {try {System.out.println(请求价格服务…); Thread.sleep(1000); results.add(new Object()); barrier.await();} catch (Exception e) {e.printStackTrace(); } };}.start();//请求订单接口new Thread() {public void run() {try {System.out.println(请求订单服务…); Thread.sleep(1000); results.add(new Object()); barrier.await();} catch (Exception e) {e.printStackTrace(); } };}.start();//请求统计接口new Thread() {public void run() {try {System.out.println(请求订单统计服务…); Thread.sleep(1000); results.add(new Object()); barrier.await();} catch (Exception e) {e.printStackTrace(); } };}.start();while(map.size() 3) {Thread.sleep(100); }return map;}
} 13.使用Semaphore等待指定数量线程完成任务
可以通过Semaphore实现等待指定数量的线程完成任务才往下执行。
//输出结果如下
//线程2执行一个计算任务
//等待1个线程完成任务即可…
//线程1执行一个计算任务
public class SemaphoreDemo {public static void main(String[] args) throws Exception {final Semaphore semaphore new Semaphore(0);new Thread() {public void run() {try {Thread.sleep(2000);System.out.println(线程1执行一个计算任务);semaphore.release();} catch (Exception e) {e.printStackTrace();}}}.start();new Thread() {public void run() {try {Thread.sleep(1000);System.out.println(线程2执行一个计算任务);semaphore.release();} catch (Exception e) {e.printStackTrace();}}}.start();semaphore.acquire(1);System.out.println(等待1个线程完成任务即可…);}
}
相关文章
-
安福县住房和城乡建设局网站wordpress插件如何破解版
安福县住房和城乡建设局网站wordpress插件如何破解版
- 技术栈
- 2026年03月21日
-
安福网站建设上海微信网站公司哪家好
安福网站建设上海微信网站公司哪家好
- 技术栈
- 2026年03月21日
-
安防公司手机网站asp网站变慢
安防公司手机网站asp网站变慢
- 技术栈
- 2026年03月21日
-
安徽创誉建设工程有限公司网站精品课程网站建设现状
安徽创誉建设工程有限公司网站精品课程网站建设现状
- 技术栈
- 2026年03月21日
-
安徽公司网站建设网站建设平台硬件要求
安徽公司网站建设网站建设平台硬件要求
- 技术栈
- 2026年03月21日
-
安徽海川建设公司网站一级消防工程师考试资料
安徽海川建设公司网站一级消防工程师考试资料
- 技术栈
- 2026年03月21日






