Java 中的 Java . util . concurrent . cyclic barrier
原文:https://www . geesforgeks . org/Java-util-concurrent-cyclic barrier-Java/
CyclicBarrier 用于使线程相互等待。当不同的线程处理一部分计算时,以及当所有线程都完成执行时,需要在父线程中组合结果时,使用它。换句话说,当多个线程执行不同的子任务,并且这些子任务的输出需要被组合以形成最终输出时,使用 CyclicBarrier。执行完成后,线程调用 await()方法,等待其他线程到达屏障。一旦所有线程都到达了,障碍就为线程的继续进行让路。
循环屏障的工作
CyclicBarriers 在 java.util.concurrent package 中定义。首先,创建一个 CyclicBarriers 的新实例,指定 barriers 应该等待的线程数。
CyclicBarrier newBarrier = new CyclicBarrier(numberOfThreads);
每个线程都进行一些计算,在完成计算后,调用 wait()方法,如图所示:
public void run()
{
// thread does the computation
newBarrier.await();
}
循环屏障的工作:
一旦调用 await()的线程数等于的线程数,屏障就会为等待的线程让路。CyclicBarrier 也可以通过一些操作来初始化,一旦所有线程都到达了该屏障,就会执行这些操作。这个动作可以组合/利用在屏障中等待的单个线程的计算结果。
Runnable action = ...
//action to be performed when all threads reach the barrier;
CyclicBarrier newBarrier = new CyclicBarrier(numberOfThreads, action);
循环屏障的重要方法:
-
getParties: Returns the number of parties required to trip this barrier. Syntax:
java public int getParties()
返回: 需要多少方才能通过这道屏障
-
reset: Resets the barrier to its initial state. Syntax:
java public void reset()
返回: 无效,但将屏障重置为初始状态。如果目前有任何一方在栅栏处等待,他们将带着一个破碎的恐惧返回。
-
isBroken: Queries if this barrier is in a broken state. Syntax:
java public boolean isBroken()
返回: 如果一方或多方由于施工或上次重置后的中断或超时,或由于异常导致屏障动作失败,则返回 true 否则为假。
-
getNumberWaiting: Returns the number of parties currently waiting at the barrier. Syntax:
java public int getNumberWaiting()
返回: 当前在等待中被阻止的方数()
-
等待:等待,直到各方在此屏障上调用了等待。 语法:
public int await() throws InterruptedException, BrokenBarrierException
返回: 当前线程的到达索引,其中索引 getParties()–1 表示第一个到达,零表示最后一个到达。
-
await: Waits until all parties have invoked await on this barrier, or the specified waiting time elapses. Syntax:
java public int await(long timeout, TimeUnit unit) throws InterruptedException, BrokenBarrierException, TimeoutException
返回: 当前线程的到达索引,其中索引 getParties()–1 表示第一个到达,零表示最后一个到达
//JAVA program to demonstrate execution on Cyclic Barrier
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CyclicBarrier;
class Computation1 implements Runnable
{
public static int product = 0;
public void run()
{
product = 2 * 3;
try
{
Tester.newBarrier.await();
}
catch (InterruptedException | BrokenBarrierException e)
{
e.printStackTrace();
}
}
}
class Computation2 implements Runnable
{
public static int sum = 0;
public void run()
{
// check if newBarrier is broken or not
System.out.println("Is the barrier broken? - " + Tester.newBarrier.isBroken());
sum = 10 + 20;
try
{
Tester.newBarrier.await(3000, TimeUnit.MILLISECONDS);
// number of parties waiting at the barrier
System.out.println("Number of parties waiting at the barrier "+
"at this point = " + Tester.newBarrier.getNumberWaiting());
}
catch (InterruptedException | BrokenBarrierException e)
{
e.printStackTrace();
}
catch (TimeoutException e)
{
e.printStackTrace();
}
}
}
public class Tester implements Runnable
{
public static CyclicBarrier newBarrier = new CyclicBarrier(3);
public static void main(String[] args)
{
// parent thread
Tester test = new Tester();
Thread t1 = new Thread(test);
t1.start();
}
public void run()
{
System.out.println("Number of parties required to trip the barrier = "+
newBarrier.getParties());
System.out.println("Sum of product and sum = " + (Computation1.product +
Computation2.sum));
// objects on which the child thread has to run
Computation1 comp1 = new Computation1();
Computation2 comp2 = new Computation2();
// creation of child thread
Thread t1 = new Thread(comp1);
Thread t2 = new Thread(comp2);
// moving child thread to runnable state
t1.start();
t2.start();
try
{
Tester.newBarrier.await();
}
catch (InterruptedException | BrokenBarrierException e)
{
e.printStackTrace();
}
// barrier breaks as the number of thread waiting for the barrier
// at this point = 3
System.out.println("Sum of product and sum = " + (Computation1.product +
Computation2.sum));
// Resetting the newBarrier
newBarrier.reset();
System.out.println("Barrier reset successful");
}
}
输出:
<Number of parties required to trip the barrier = 3
Sum of product and sum = 0
Is the barrier broken? - false
Number of parties waiting at the barrier at this point = 0
Sum of product and sum = 36
Barrier reset successful
说明:因为子线程还没有运行设置 sum 和 product 变量的值,所以(sum + product) = 0 的值打印在控制台上。接下来,控制台上会打印(sum + product) = 36,因为子线程运行时设置了 sum 和 product 的值。此外,屏障上的等待线程数达到 3,因此屏障允许所有线程通过,最终打印出 36 个。“此时在屏障处等待的参与方数量”的值= 0,因为所有三个线程都已经调用了 await()方法,因此屏障不再活动。最后,newBarrier 被重置,可以再次使用。
经纪例外
当任何等待的线程离开屏障时,屏障就会被破坏。当一个或多个等待线程被中断或等待时间结束时,会发生这种情况,因为线程调用了 await()方法,超时如下:
newBarrier.await(1000, TimeUnit.MILLISECONDS);
// thread calling this await()
// methods waits for only 1000 milliseconds.
当屏障由于一个或多个参与线程而被打破时,所有其他线程的 await()方法都会抛出一个 BrokenThreadException。而已经在屏障中等待的线程的 await()调用被终止。
循环障碍和计数障碍的区别
- CountDownLatch 在程序中只能使用一次(直到它的计数达到 0)。
- 一旦屏障中的所有线程被释放,就可以一次又一次地使用循环屏障。
参考: 甲骨文
本文由 Mayank Kumar 供稿。如果你喜欢 GeeksforGeeks 并想投稿,你也可以使用contribute.geeksforgeeks.org写一篇文章或者把你的文章邮寄到 contribute@geeksforgeeks.org。看到你的文章出现在极客博客主页上,帮助其他极客。
如果你发现任何不正确的地方,或者你想分享更多关于上面讨论的话题的信息,请写评论。
版权属于:月萌API www.moonapi.com,转载请注明出处