Phaser
Comprehensive guide with executable examples to using Phaser, an advanced and sophisticated synchronization barrier construct.
If you are interviewing, consider buying our number#1 course for Java Multithreading Interviews.
Overview
The Phaser
class is an extension of the functionality offered by CyclicBarrier
and CountDownLatch
classes and is more flexible in use. One stark difference is that the Phaser
class allows the number of registered parties that synchronize on a phaser to vary over time. The Phaser
can be repeatedly awaited similar to a CyclicBarrier
.
Example
Apart from specifying the number of threads/tasks to synchronize in the constructor, threads/tasks can also register with an instance of Phaser
using the register()
or the bulkRegister(int)
methods. Note, that if a thread register()
-s with an instance of Phaser
there’s no way for the thread to query the instance to determine if it registered with the instance, i.e. there’s no internal book-keeping maintained by the Phaser
instance. However, if such behavior is desired the Phaser
class can be subclassed and the book-keeping functionality added.
The program below exercises some of the APIs exposed by Phaser
to register threads with the barrier. Run the program and study the comments before we discuss them.
import java.util.concurrent.*;class Demostration {public static void main( String args[] ) throws Exception {// create an executor serviceExecutorService executorService = Executors.newFixedThreadPool(5);// create an instance of Phaser class and register only a single that will arrive// at the barrierPhaser phaser = new Phaser(1);try {// a thread registers with the Phaser post construction of the instanceexecutorService.submit(new Runnable() {@Overridepublic void run() {phaser.register();}});// main thread bulk-registers two more partiesphaser.bulkRegister(2);// main thread registering one more party.phaser.register();// we now have 5 parties registered with the Phaser instance// we instantiate four threads and have them arrive at the barrierfor (int i = 0; i < 4; i++) {executorService.submit(new Runnable() {@Overridepublic void run() {phaser.arriveAndAwaitAdvance();System.out.println(Thread.currentThread().getName() + " moving past barrier.");}});}// sleep for a while so that previous threads can arrive at the barrierThread.sleep(2000);// before arriving at the barrier, print the counts of partiesSystem.out.println(Thread.currentThread().getName() + " just before arrived. \n Arrived parties: " + phaser.getArrivedParties() +"\n Registered parties: " + phaser.getRegisteredParties() +"\n Unarrived parties: " + phaser.getUnarrivedParties());phaser.arriveAndAwaitAdvance();} finally {// remember to shutdown the executor in a finally blockexecutorService.shutdown();executorService.awaitTermination(1, TimeUnit.HOURS);}// main thread prints party counts for the barrierSystem.out.println(Thread.currentThread().getName() + " exiting. \n Arrived parties: " + phaser.getArrivedParties() +"\n Registered parties: " + phaser.getRegisteredParties() +"\n Unarrived parties: " + phaser.getUnarrivedParties());}}
Notice that the main thread is responsible for registering 3 parties with the Phaser
instance after the instance has been constructed but arrives at the barrier only once, i.e. it is not necessary that the thread that invokes register()
must also be the same thread that arrives at the barrier.
Arriving and Deregistering
Consider a scenario where we want all the spawned threads/tasks to wait until the main thread has finished initialization or performed some tasks before we want the spawned threads to proceed. We could initialize the Phaser
with a count of one more than the number of threads we plan to spawn, and then have the main thread do the required work. Finally, the main thread arrives at and deregisters with the barrier at the same time. This releases the spawned threads that have already been waiting at the barrier and reduces the number of parties required to synchronize at the barrier by one for future. The described example appears in the program below.
import java.util.concurrent.*;class Demonstration {public static void main( String args[] ) throws Exception {// create an executor serviceExecutorService executorService = Executors.newFixedThreadPool(15);// create an instance of Phaser with 3 registered partiesPhaser phaser = new Phaser(3);try {for (int i = 0; i < 2; i++) {executorService.execute(new Runnable() {@Overridepublic void run() {System.out.println(Thread.currentThread().getName() + " about to arrive at the barrier");phaser.arriveAndAwaitAdvance();System.out.println("Thread " + Thread.currentThread().getName() + " moving past the phaser once");phaser.arriveAndAwaitAdvance();System.out.println(Thread.currentThread().getName() + " moving past the phaser twice");}});}// sleep for a while to simulate work that the main thread needs to get done before// letting the spawn threads proceed forward.Thread.sleep(5000);phaser.arriveAndDeregister();System.out.println(Thread.currentThread().getName() + " past the barrier. \n Arrived parties: " + phaser.getArrivedParties() +"\n Registered parties: " + phaser.getRegisteredParties() +"\n Unarrived parties: " + phaser.getUnarrivedParties());} finally {// remember to shutdown the barrier in a finally blockexecutorService.shutdown();// wait for spawned threads to finishexecutorService.awaitTermination(1, TimeUnit.HOURS);}System.out.println("Program exiting");}}
From ...
Create a free account to view this lesson.
By signing up, you agree to Educative's Terms of Service and Privacy Policy