Phaser

A comprehensive guide, with executable examples, to using Phaser, an advanced and flexible synchronization barrier construct.


Overview

The Phaser class extends the functionality offered by the CyclicBarrier and CountDownLatch classes and is more flexible to use. One notable difference is that the number of parties registered to synchronize on a Phaser can vary over time. Like a CyclicBarrier, a Phaser can be awaited repeatedly.
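As a minimal sketch of this reuse (the class name PhaseReuseSketch and the helper runPhases are illustrative names, not part of the lesson), the program below has three threads wait on the same Phaser for four consecutive phases and then reads the phase number with getPhase():

```java
import java.util.concurrent.Phaser;

class PhaseReuseSketch {

    // Hypothetical helper: 'workers' threads each wait on the same Phaser
    // 'rounds' times; returns the phase number once all threads have finished.
    static int runPhases(int workers, int rounds) {
        Phaser phaser = new Phaser(workers);
        Thread[] threads = new Thread[workers];
        for (int i = 0; i < workers; i++) {
            threads[i] = new Thread(() -> {
                for (int r = 0; r < rounds; r++) {
                    // blocks until every registered party has arrived; the
                    // same Phaser instance is then reused for the next phase
                    phaser.arriveAndAwaitAdvance();
                }
            });
            threads[i].start();
        }
        try {
            for (Thread t : threads) {
                t.join();
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return phaser.getPhase();
    }

    public static void main(String[] args) {
        // phases are numbered from 0, so four completed phases leave the Phaser in phase 4
        System.out.println("Phase after 4 rounds: " + runPhases(3, 4));
    }
}
```

No reset step is needed between rounds: each time all parties arrive, the Phaser advances to the next phase on its own.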

Example

Apart from specifying the number of threads/tasks to synchronize in the constructor, threads/tasks can also register with an instance of Phaser using the register() or bulkRegister(int) methods. Note that once a thread calls register() on a Phaser instance, there’s no way for the thread to query the instance to determine whether it is registered, i.e. the Phaser instance maintains no internal book-keeping of which threads have registered. However, if such behavior is desired, the Phaser class can be subclassed and the book-keeping functionality added.
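One way such book-keeping might be added is sketched below. TrackingPhaser and its isRegistered(Thread) method are hypothetical names introduced for illustration; the sketch only tracks threads that call register() themselves and assumes they later leave through arriveAndDeregister(). Parties added through the constructor or bulkRegister(int) are not attributed to any thread.

```java
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Phaser;

class TrackingPhaserSketch {

    // Hypothetical subclass that remembers which threads called register()
    static class TrackingPhaser extends Phaser {
        private final Set<Thread> registered = ConcurrentHashMap.newKeySet();

        @Override
        public int register() {
            int phase = super.register();
            registered.add(Thread.currentThread());
            return phase;
        }

        @Override
        public int arriveAndDeregister() {
            registered.remove(Thread.currentThread());
            return super.arriveAndDeregister();
        }

        // the query that the plain Phaser class does not offer
        boolean isRegistered(Thread thread) {
            return registered.contains(thread);
        }
    }

    public static void main(String[] args) {
        TrackingPhaser phaser = new TrackingPhaser();
        Thread self = Thread.currentThread();

        System.out.println("Before register(): " + phaser.isRegistered(self));
        phaser.register();
        System.out.println("After register(): " + phaser.isRegistered(self));
        phaser.arriveAndDeregister();
        System.out.println("After arriveAndDeregister(): " + phaser.isRegistered(self));
    }
}
```

A concurrent set is used because register() may be called from several threads at once.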

The program below exercises some of the APIs exposed by Phaser to register threads with the barrier. Run the program and study the comments before we discuss them.

import java.util.concurrent.*;

class Demonstration {
    public static void main(String[] args) throws Exception {
        // create an executor service
        ExecutorService executorService = Executors.newFixedThreadPool(5);
        // create an instance of the Phaser class with a single registered party
        // that will arrive at the barrier
        Phaser phaser = new Phaser(1);
        try {
            // a thread registers with the Phaser after the instance has been constructed
            Future<?> registration = executorService.submit(new Runnable() {
                @Override
                public void run() {
                    phaser.register();
                }
            });
            // main thread bulk-registers two more parties
            phaser.bulkRegister(2);
            // main thread registers one more party
            phaser.register();
            // wait for the pooled thread's registration to complete, so that all
            // 5 parties are registered before any thread arrives at the barrier
            registration.get();
            // we now have 5 parties registered with the Phaser instance;
            // we submit four tasks and have them arrive at the barrier
            for (int i = 0; i < 4; i++) {
                executorService.submit(new Runnable() {
                    @Override
                    public void run() {
                        phaser.arriveAndAwaitAdvance();
                        System.out.println(Thread.currentThread().getName() + " moving past barrier.");
                    }
                });
            }
            // sleep for a while so that the previous threads can arrive at the barrier
            Thread.sleep(2000);
            // before arriving at the barrier, print the counts of parties
            System.out.println(Thread.currentThread().getName() + " just before arriving. \n Arrived parties: " + phaser.getArrivedParties() +
                    "\n Registered parties: " + phaser.getRegisteredParties() +
                    "\n Unarrived parties: " + phaser.getUnarrivedParties());
            phaser.arriveAndAwaitAdvance();
        } finally {
            // remember to shut down the executor in a finally block
            executorService.shutdown();
            executorService.awaitTermination(1, TimeUnit.HOURS);
        }
        // main thread prints party counts for the barrier
        System.out.println(Thread.currentThread().getName() + " exiting. \n Arrived parties: " + phaser.getArrivedParties() +
                "\n Registered parties: " + phaser.getRegisteredParties() +
                "\n Unarrived parties: " + phaser.getUnarrivedParties());
    }
}

Notice that the main thread registers 3 parties with the Phaser instance after the instance has been constructed but arrives at the barrier only once, i.e. the thread that invokes register() need not be the thread that arrives at the barrier.
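The same point can be shown in isolation. In the sketch below (RegisterElsewhereSketch and registerHereArriveThere are illustrative names), the calling thread registers two parties and a different thread performs both arrivals using arrive(), a non-blocking Phaser method that the program above does not use:

```java
import java.util.concurrent.Phaser;

class RegisterElsewhereSketch {

    // Hypothetical helper: the calling thread registers two parties and a
    // separate thread performs both arrivals.
    // Returns { phase before the arrivals, phase after the arrivals }.
    static int[] registerHereArriveThere() {
        Phaser phaser = new Phaser();   // no parties registered yet
        phaser.bulkRegister(2);         // the calling thread registers 2 parties
        int before = phaser.getPhase();

        Thread arriver = new Thread(() -> {
            phaser.arrive();            // first arrival, returns immediately
            phaser.arrive();            // second arrival completes the phase
        });
        arriver.start();
        try {
            arriver.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return new int[] { before, phaser.getPhase() };
    }

    public static void main(String[] args) {
        int[] phases = registerHereArriveThere();
        System.out.println("Phase before: " + phases[0] + ", phase after: " + phases[1]);
    }
}
```

The Phaser only counts registrations and arrivals; it does not tie either to a particular thread.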

Arriving and Deregistering

Consider a scenario where we want all the spawned threads/tasks to wait until the main thread has finished initialization or some other work before they proceed. We can initialize the Phaser with a count of one more than the number of threads we plan to spawn, and then have the main thread do the required work. Finally, the main thread arrives at and deregisters from the barrier in a single step. This releases the spawned threads already waiting at the barrier and reduces the number of parties required to synchronize at the barrier by one for future phases. The program below implements this scenario.

import java.util.concurrent.*;

class Demonstration {
    public static void main(String[] args) throws Exception {
        // create an executor service
        ExecutorService executorService = Executors.newFixedThreadPool(15);
        // create an instance of Phaser with 3 registered parties
        Phaser phaser = new Phaser(3);
        try {
            for (int i = 0; i < 2; i++) {
                executorService.execute(new Runnable() {
                    @Override
                    public void run() {
                        System.out.println(Thread.currentThread().getName() + " about to arrive at the barrier");
                        phaser.arriveAndAwaitAdvance();
                        System.out.println(Thread.currentThread().getName() + " moving past the phaser once");
                        phaser.arriveAndAwaitAdvance();
                        System.out.println(Thread.currentThread().getName() + " moving past the phaser twice");
                    }
                });
            }
            // sleep for a while to simulate work that the main thread needs to get done before
            // letting the spawned threads proceed
            Thread.sleep(5000);
            phaser.arriveAndDeregister();
            // the counts below are a snapshot; the spawned threads may already be
            // arriving at the barrier for the second time
            System.out.println(Thread.currentThread().getName() + " past the barrier. \n Arrived parties: " + phaser.getArrivedParties() +
                    "\n Registered parties: " + phaser.getRegisteredParties() +
                    "\n Unarrived parties: " + phaser.getUnarrivedParties());
        } finally {
            // remember to shut down the executor in a finally block
            executorService.shutdown();
            // wait for spawned threads to finish
            executorService.awaitTermination(1, TimeUnit.HOURS);
        }
        System.out.println("Program exiting");
    }
}
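The effect of arriveAndDeregister() on the party counts can also be checked without any timing involved. The single-threaded sketch below (DeregisterSketch and arriveAndDeregisterCounts are illustrative names) stands in for the two spawned threads with two non-blocking arrive() calls, then performs the final arrival with arriveAndDeregister():

```java
import java.util.concurrent.Phaser;

class DeregisterSketch {

    // Hypothetical helper. Returns
    // { registered parties before, registered parties after, phase after }.
    static int[] arriveAndDeregisterCounts() {
        Phaser phaser = new Phaser(3);

        // two of the three parties arrive without blocking
        phaser.arrive();
        phaser.arrive();
        int registeredBefore = phaser.getRegisteredParties();

        // the last party arrives and deregisters in one step: the phase
        // advances and one fewer party is required from now on
        phaser.arriveAndDeregister();

        return new int[] {
            registeredBefore,
            phaser.getRegisteredParties(),
            phaser.getPhase()
        };
    }

    public static void main(String[] args) {
        int[] counts = arriveAndDeregisterCounts();
        System.out.println("Registered before: " + counts[0]
                + ", registered after: " + counts[1]
                + ", phase after: " + counts[2]);
    }
}
```

With the main thread's party gone, the two remaining parties can complete later phases on their own, which is why the spawned threads in the program above get past their second arriveAndAwaitAdvance() call.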

From ...
