In multi-threaded programming, a prevalent challenge is managing dependencies between threads, explicitly ensuring that a particular thread doesn't proceed until specific other threads have completed their tasks. The inherent complexity arises from the independent nature of thread execution, where each thread typically completes its tasks at its own pace and in an unpredictable order. Java provides a suite of synchronization mechanisms to effectively manage these inter-thread dependencies, including synchronized blocks, locks, and semaphores. These tools offer diverse ways to coordinate the execution of threads, enabling more controlled and predictable multi-threaded behavior.
Mechanism
In multi-threaded programming, we often encounter scenarios where the execution of specific threads depends on other threads' completion of tasks. Here are some typical situations:
- Single thread dependency. Imagine a "DataLoader" thread responsible for loading data from a file. Multiple "DataProcessor" threads are awaiting the data load completion to initiate their processing.
- Multiple thread dependency. Consider a scenario where we're searching for the maximum value in a large array of numbers. The task is divided among a group of worker threads, each searching a segment of the array. Another thread awaits the completion of all worker threads to aggregate and process the results.
- Operation count dependency. Picture a server capable of handling a fixed number of requests simultaneously. The server thread needs to wait not for a particular group of threads to finish but for an operation to be completed a specific number of times.
Handling these cases requires a robust synchronization mechanism, and that's where CountDownLatch comes into play. The core concept of CountDownLatch is to provide a synchronization aid that allows one or more threads to wait until a set of operations performed in other threads completes. This mechanism ensures that specific tasks are completed before others proceed, thereby effectively coordinating the execution of multiple threads.
The operational principle of CountDownLatch hinges on a count-based mechanism. The latch is initialized with a specific count, representing the number of tasks to be completed or threads to finish. As each task concludes, the respective thread invokes the countDown() method on the latch, decrementing the count. Threads that are dependent on the completion of these tasks call the await() method. This call blocks the execution of the dependent threads until the count reaches zero.
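The counting mechanism described above can be sketched in a few lines. This is a minimal illustration rather than a definitive implementation; the class name LatchBasics and the method runTasks are hypothetical names chosen for this sketch.

```java
import java.util.concurrent.CountDownLatch;

class LatchBasics {
    // Starts `tasks` threads, waits for all of them, and returns the final latch count.
    static long runTasks(int tasks) {
        CountDownLatch latch = new CountDownLatch(tasks); // one count per task
        for (int i = 0; i < tasks; i++) {
            int id = i;
            new Thread(() -> {
                System.out.println("Task " + id + " finished");
                latch.countDown(); // signal that this task is complete
            }).start();
        }
        try {
            latch.await(); // blocks until the count reaches zero
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // preserve the interrupt status
            throw new IllegalStateException("Interrupted while waiting", e);
        }
        return latch.getCount();
    }

    public static void main(String[] args) {
        System.out.println("Remaining count: " + runTasks(3)); // Remaining count: 0
    }
}
```

The order of the "Task ... finished" lines may vary between runs, but the final line is printed only after all three tasks have counted down.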
A critical aspect of CountDownLatch is its irreversibility: once the count reaches zero, it can't be reset. This characteristic makes it particularly useful for "one-off" synchronization scenarios where the waiting happens only once and the latch doesn't need to be reused. By employing CountDownLatch, developers can effectively orchestrate the execution of threads, ensuring that crucial code sections are executed in a specific sequence or that certain prerequisites are fulfilled before proceeding.
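The one-shot behavior can be observed directly. The sketch below, with the hypothetical class name OneShotLatch, shows that extra countDown() calls leave the count at zero and that await() returns at once on an already open latch.

```java
import java.util.concurrent.CountDownLatch;

class OneShotLatch {
    // Counts down twice on a latch of size 1 and returns the resulting count.
    static long countAfterExtraCountDown() {
        CountDownLatch latch = new CountDownLatch(1);
        latch.countDown(); // 1 -> 0, the latch is now open
        latch.countDown(); // no effect: the count stays at zero
        return latch.getCount();
    }

    // Returns true if await() on an already open latch returned normally.
    static boolean awaitOnOpenLatch() {
        CountDownLatch latch = new CountDownLatch(1);
        latch.countDown();
        try {
            latch.await(); // does not block because the count is already zero
            return true;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }

    public static void main(String[] args) {
        System.out.println(countAfterExtraCountDown()); // 0
        System.out.println(awaitOnOpenLatch());         // true
    }
}
```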
API
The CountDownLatch class in Java is a synchronization utility that uses a countdown mechanism to enable threads to wait for specific operations to complete. The key components of CountDownLatch are its constructor and several methods:
Here's the constructor:
CountDownLatch(int count): this constructor initializes a new CountDownLatch instance with a specified count.
import java.util.concurrent.CountDownLatch;
...
CountDownLatch latch = new CountDownLatch(1);
And here are the methods:
void await() throws InterruptedException: causes the calling thread to wait until the latch's count reaches zero. This is a blocking call: the thread waits indefinitely until the count reaches zero or the thread is interrupted.
boolean await(long timeout, TimeUnit unit) throws InterruptedException: also causes the calling thread to wait until the latch's count reaches zero, but only up to the specified waiting time. It returns true if the count reached zero and false if the waiting time elapsed first.
import java.util.concurrent.TimeUnit;
...
latch.await(); // blocking call
latch.await(600, TimeUnit.SECONDS);
void countDown(): decrements the count of the latch, releasing all waiting threads if the count reaches zero.
long getCount(): returns the current count.
String toString(): returns a string identifying this latch, along with its state. The state includes the string "Count =" followed by the current count.
latch.countDown(); // count down from 1 to 0
latch.getCount(); // 0
latch.toString(); // java.util.concurrent.CountDownLatch@65ab7765[Count = 0]
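The timed variant of await() reports whether the latch actually opened, which is easy to overlook. Here is a small sketch of that behavior; TimedAwaitDemo and waitBriefly are hypothetical names used only for illustration.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

class TimedAwaitDemo {
    // Returns true if the latch opened within the timeout, false otherwise.
    static boolean waitBriefly(CountDownLatch latch, long millis) {
        try {
            return latch.await(millis, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // preserve the interrupt status
            return false;
        }
    }

    public static void main(String[] args) {
        CountDownLatch latch = new CountDownLatch(1);
        System.out.println(waitBriefly(latch, 100)); // false: nobody has counted down yet
        latch.countDown();
        System.out.println(waitBriefly(latch, 100)); // true: the count is already zero
    }
}
```

Checking the returned boolean lets the caller distinguish "all work finished" from "gave up waiting", which the untimed await() never needs to do.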
It's crucial to remember that CountDownLatch is a one-time synchronization aid: once the count reaches zero, it can't be reset. If you require a reusable synchronization aid, consider using CyclicBarrier or Semaphore.
Use cases
Here are some common use cases for CountDownLatch:
- Coordinating thread startup: in a system where multiple threads need to perform initialization tasks before the main processing, CountDownLatch can be employed to ensure all threads complete their initialization before the system proceeds further.
- Managing parallel tasks: when a set of tasks is to be executed concurrently by multiple threads, and it's necessary to wait for all tasks to complete before moving forward, CountDownLatch serves as a synchronization tool to manage these threads.
- Testing multi-threaded code: CountDownLatch is a valuable tool in testing scenarios involving multi-threaded code. It enables the simulation of critical sections and ensures that the code under test behaves as expected under these conditions.
- Synchronization in producer-consumer scenarios: in scenarios involving multiple producer threads generating data and multiple consumer threads processing it, CountDownLatch can ensure all producers have completed their tasks before the consumers begin processing.
- Parallel computation and aggregation: for tasks that can be divided into parallel executable subtasks, CountDownLatch can be used to ensure all subtasks are complete before the results are aggregated. This synchronization is crucial in scenarios where the final result depends on the completion of all subtasks.
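The first use case, coordinating thread startup, can be sketched with a pair of latches: one counts finished initializations and one acts as a start gate. This is one possible arrangement under simplifying assumptions, not the only way to do it; StartupGateDemo, runWorkers, and the latch names are hypothetical.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicInteger;

class StartupGateDemo {
    // Starts `workers` threads, lets them begin the main work only after all
    // of them have initialized, and returns how many completed the main work.
    static int runWorkers(int workers) {
        CountDownLatch ready = new CountDownLatch(workers); // finished initializations
        CountDownLatch start = new CountDownLatch(1);       // gate opened by the coordinator
        CountDownLatch done = new CountDownLatch(workers);  // finished workers
        AtomicInteger processed = new AtomicInteger();

        for (int i = 0; i < workers; i++) {
            new Thread(() -> {
                try {
                    // ... per-thread initialization would go here ...
                    ready.countDown();           // report that initialization is done
                    start.await();               // wait until every thread is ready
                    processed.incrementAndGet(); // stand-in for the main processing
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                } finally {
                    done.countDown();
                }
            }).start();
        }

        try {
            ready.await();     // wait until every worker has initialized
            start.countDown(); // open the gate for all workers at once
            done.await();      // wait for the main processing to finish
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("Interrupted while coordinating workers", e);
        }
        return processed.get();
    }

    public static void main(String[] args) {
        System.out.println("Workers that ran: " + runWorkers(4)); // Workers that ran: 4
    }
}
```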
In the following sections, we'll illustrate the practical use of CountDownLatch through code examples representing common scenarios in multi-threaded programming.
Single thread dependency example
Consider a situation where a "DataLoader" thread is tasked with loading data from a file. Meanwhile, several "DataProcessor" threads are poised to process this data. However, they must wait until the DataLoader has fully loaded the data before commencing their operations.
We start with a SharedData class that holds the loaded data:
class SharedData {
    private String data;

    void load(String data) {
        this.data = data;
    }

    String get() {
        return data;
    }
}
In this scenario, we have two types of threads. The first type, represented by dataLoaderThread, performs the task of loading data:
import java.util.concurrent.CountDownLatch;

class WorkerDataLoader implements Runnable {
    private final SharedData data;
    private final CountDownLatch latch;

    WorkerDataLoader(SharedData data, CountDownLatch latch) {
        this.data = data;
        this.latch = latch;
    }

    @Override
    public void run() {
        System.out.println("DataLoader is loading data...");
        try {
            Thread.sleep(3000); // Simulating time taken to load data
            data.load("Loaded Data");
            // Report success only if loading was not interrupted
            System.out.println("DataLoader has completed loading data.");
        } catch (InterruptedException e) {
            e.printStackTrace();
        } finally {
            latch.countDown(); // always release the waiting threads
        }
    }
}
The second type of thread in this scenario is dataProcessorThread. These threads are responsible for processing the data that the DataLoader thread loads:
import java.util.concurrent.CountDownLatch;

class WorkerDataProcessor implements Runnable {
    private final SharedData data;
    private final CountDownLatch latch;

    public WorkerDataProcessor(SharedData data, CountDownLatch latch) {
        this.data = data;
        this.latch = latch;
    }

    @Override
    public void run() {
        try {
            System.out.println("DataProcessor is waiting for data to be loaded...");
            latch.await(); // blocks until the loader counts down
            System.out.println("DataProcessor has received data: " + data.get());
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}
The main class, DataLoaderProcessorExample, orchestrates the execution of the DataLoader and DataProcessor threads:
import java.util.concurrent.CountDownLatch;

public class DataLoaderProcessorExample {
    public static void main(String[] args) throws InterruptedException {
        CountDownLatch latch = new CountDownLatch(1); // a single event: data loaded
        SharedData data = new SharedData();

        Thread[] dataProcessorThreads = new Thread[3];
        for (int i = 0; i < dataProcessorThreads.length; i++) {
            dataProcessorThreads[i] = new Thread(new WorkerDataProcessor(data, latch));
            dataProcessorThreads[i].start();
        }

        Thread dataLoaderThread = new Thread(new WorkerDataLoader(data, latch));
        dataLoaderThread.start();

        dataLoaderThread.join();
        for (Thread thread : dataProcessorThreads) {
            thread.join();
        }
    }
}
In this code, the DataLoader thread emulates the task of data loading by sleeping for 3 seconds. Upon data loading completion, it invokes countDown() on CountDownLatch, decrementing the counter.
Simultaneously, the DataProcessor threads invoke await() on the CountDownLatch, causing them to pause execution until the counter reaches zero. Once the DataLoader thread completes data loading (signaled by the counter reaching zero), the DataProcessor threads are unblocked and proceed with data processing.
Multiple thread dependency example
Let's say we're searching through a large array of numbers for the maximum value, and we want to do this in parallel by having each worker thread search through a part of the array.
Here is our worker thread:
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicInteger;

class Worker implements Runnable {
    private final CountDownLatch latch;
    private final int[] numbers;
    private final int start; // inclusive
    private final int end;   // exclusive
    private final AtomicInteger globalMax;

    public Worker(CountDownLatch latch, int[] numbers, int start, int end, AtomicInteger globalMax) {
        this.latch = latch;
        this.numbers = numbers;
        this.start = start;
        this.end = end;
        this.globalMax = globalMax;
    }

    @Override
    public void run() {
        // find the maximum within this worker's segment
        int localMax = Integer.MIN_VALUE;
        for (int j = start; j < end; j++) {
            localMax = Math.max(localMax, numbers[j]);
        }
        // update global max; retry if another thread changed it in the meantime
        int currentGlobalMax;
        do {
            currentGlobalMax = globalMax.get();
            if (localMax <= currentGlobalMax) {
                break;
            }
        } while (!globalMax.compareAndSet(currentGlobalMax, localMax));
        latch.countDown();
    }
}
The main class, ParallelSearchExample, orchestrates the execution of the worker threads:
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicInteger;

public class ParallelSearchExample {
    private static final int[] numbers = {1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20};
    private static final int NUM_THREADS = 4;

    public static void main(String[] args) throws InterruptedException {
        CountDownLatch latch = new CountDownLatch(NUM_THREADS);
        AtomicInteger globalMax = new AtomicInteger(Integer.MIN_VALUE);
        int segmentSize = numbers.length / NUM_THREADS;

        for (int i = 0; i < NUM_THREADS; i++) {
            int start = i * segmentSize;
            int end = (i + 1) * segmentSize;
            if (i == NUM_THREADS - 1) { // last segment should go to the end of the array
                end = numbers.length;
            }
            Thread worker = new Thread(new Worker(latch, numbers, start, end, globalMax));
            worker.start();
        }

        latch.await(); // wait for all workers to finish
        System.out.println("Maximum number is " + globalMax.get());
    }
}
In this example, we divide the array into segments and assign each segment to a worker thread. Each worker finds the maximum in its segment and then updates a global maximum if its local maximum is larger. We use an AtomicInteger for the global maximum to ensure that the update is thread-safe. The master thread (the main thread in this case) waits for all workers to finish by calling await() on CountDownLatch. Once all workers finish and the latch count reaches zero, the master thread prints out the global maximum.
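The manual compare-and-set loop in the worker can also be written with AtomicInteger.accumulateAndGet, which retries internally. The following is an alternative sketch, assuming Java 8 or later, and is not how the example above is written; AtomicMaxDemo and updateMax are hypothetical names.

```java
import java.util.concurrent.atomic.AtomicInteger;

class AtomicMaxDemo {
    // Atomically raises globalMax to localMax if localMax is larger.
    static void updateMax(AtomicInteger globalMax, int localMax) {
        globalMax.accumulateAndGet(localMax, Math::max);
    }

    public static void main(String[] args) {
        AtomicInteger globalMax = new AtomicInteger(Integer.MIN_VALUE);
        updateMax(globalMax, 7);
        updateMax(globalMax, 3);  // smaller value: the maximum stays at 7
        updateMax(globalMax, 12);
        System.out.println(globalMax.get()); // 12
    }
}
```

Both versions give the same result; the explicit loop makes the retry logic visible, while accumulateAndGet keeps the worker shorter.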
Operation count dependency example
In this example, a server thread waits for all requests to be completed before proceeding. A separate thread processes each request, and once a request is completed, it calls countDown() on the CountDownLatch, decrementing the count. The server thread calls await() on the CountDownLatch, causing it to block until the count reaches zero:
import java.util.concurrent.CountDownLatch;

class WorkerServerThread implements Runnable {
    private final CountDownLatch latch;

    public WorkerServerThread(CountDownLatch latch) {
        this.latch = latch;
    }

    @Override
    public void run() {
        try {
            System.out.println("Server is waiting for all requests to be completed...");
            latch.await(); // blocks until every request has counted down
            System.out.println("All requests are completed. Server can now proceed to the next batch.");
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}
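Once all requests are completed and the count reaches zero, the server thread is released from its blocking state and can proceed to the next batch of requests. Since a latch can't be reset, each new batch would need its own CountDownLatch. Each request is processed by a separate request thread: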
import java.util.concurrent.CountDownLatch;

class WorkerRequestThread implements Runnable {
    private final CountDownLatch latch;

    public WorkerRequestThread(CountDownLatch latch) {
        this.latch = latch;
    }

    @Override
    public void run() {
        try {
            System.out.println("Request " + Thread.currentThread().getName() + " is being processed.");
            Thread.sleep(2000); // Simulating time taken to process the request
            System.out.println("Request " + Thread.currentThread().getName() + " is completed.");
        } catch (InterruptedException e) {
            e.printStackTrace();
        } finally {
            // Count down even if interrupted; otherwise the server would wait forever
            latch.countDown();
        }
    }
}
The main class, ServerExample, orchestrates the execution of the server and request threads:
import java.util.concurrent.CountDownLatch;

public class ServerExample {
    private static final int MAX_REQUESTS = 5;

    public static void main(String[] args) throws InterruptedException {
        CountDownLatch latch = new CountDownLatch(MAX_REQUESTS); // one count per request

        Thread[] requestThreads = new Thread[MAX_REQUESTS];
        for (int i = 0; i < requestThreads.length; i++) {
            requestThreads[i] = new Thread(new WorkerRequestThread(latch));
            requestThreads[i].start();
        }

        Thread serverThread = new Thread(new WorkerServerThread(latch));
        serverThread.start();

        serverThread.join();
        for (Thread thread : requestThreads) {
            thread.join();
        }
    }
}
Code testing example
CountDownLatch is also often used in testing multi-threaded code. It lets a test detect when the threads it started have finished, so that assertions run only after the concurrent work is done. For example, you might have a test that spawns multiple threads, and you want to ensure that all of these threads complete a particular task before the test proceeds. Suppose the code under test is a simple thread-safe Counter class:
class Counter {
    private int value = 0;

    synchronized void increment() {
        value++;
    }

    synchronized int getValue() {
        return value;
    }
}
Here is our task:
import java.util.concurrent.CountDownLatch;

class CountingTask implements Runnable {
    private final Counter counter;
    private final CountDownLatch doneSignal;

    CountingTask(Counter counter, CountDownLatch doneSignal) {
        this.counter = counter;
        this.doneSignal = doneSignal;
    }

    @Override
    public void run() {
        counter.increment();
        doneSignal.countDown(); // tell the test that this task is finished
    }
}
By using CountDownLatch, you can make the test wait until all threads have completed the task, ensuring that your test only proceeds when the multi-threaded part of your code has completed its work.
public class TestingExample {
    @Test
    public void testMultiThreadedCode() throws InterruptedException {
        int numberOfThreads = 3;
        CountDownLatch doneSignal = new CountDownLatch(numberOfThreads);
        Counter counter = new Counter();
        for (int i = 0; i < numberOfThreads; ++i) {
            new Thread(new CountingTask(counter, doneSignal)).start();
        }
        doneSignal.await();
        assertEquals(numberOfThreads, counter.getValue());
    }
}
CountDownLatch vs. Semaphore
CountDownLatch and Semaphore are both synchronization aids in Java that can be used to control and manage thread execution, but they are used for different purposes.
- CountDownLatch
A CountDownLatch is used when one thread (or a set of threads) needs to wait for one or more other threads to complete their tasks before it can continue. The latch is initialized with a count, and the await() method will block until the count reaches zero. Other threads can decrease the count by calling the countDown() method. Once the count reaches zero, all calls to await() return immediately. Note that the count can't be reset: once it reaches zero, the latch stays open, and another round of waiting requires a new CountDownLatch.
- Semaphore
On the other hand, a Semaphore is used to control access to a shared resource. It maintains a set of permits, and threads can acquire permits (if available) to access the shared resource. When a thread is done with the resource, it releases the permit. If no permits are available, the acquire() method will block until a permit becomes available. Unlike CountDownLatch, a Semaphore can be used repeatedly. When a permit is released, it goes back into the pool and can be acquired by another thread.
- CyclicBarrier
A CyclicBarrier is similar to a CountDownLatch, but it's used when a group of threads must reach a common barrier point before continuing. When a thread reaches the barrier point (by calling await()), it blocks until all other threads reach the barrier point. Once all threads have reached the barrier point, they can all continue. Unlike CountDownLatch, CyclicBarrier can be reused after all threads have passed the barrier.
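To make the reusability contrast concrete, here is a short sketch in which the same CyclicBarrier is used for several rounds and a Semaphore permit is taken and returned. The class ReusableAidsDemo and its method names are hypothetical and serve only as an illustration.

```java
import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.Semaphore;
import java.util.concurrent.atomic.AtomicInteger;

class ReusableAidsDemo {
    // Runs `parties` threads through `rounds` barrier points and
    // returns how many times the barrier tripped.
    static int barrierTrips(int parties, int rounds) {
        AtomicInteger trips = new AtomicInteger();
        // The barrier action runs once each time all parties have arrived.
        CyclicBarrier barrier = new CyclicBarrier(parties, trips::incrementAndGet);
        Thread[] threads = new Thread[parties];
        for (int i = 0; i < parties; i++) {
            threads[i] = new Thread(() -> {
                try {
                    for (int r = 0; r < rounds; r++) {
                        barrier.await(); // the same barrier is reused in every round
                    }
                } catch (InterruptedException | BrokenBarrierException e) {
                    Thread.currentThread().interrupt();
                }
            });
            threads[i].start();
        }
        for (Thread t : threads) {
            try {
                t.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }
        return trips.get();
    }

    // Acquires and releases one permit and returns the permits available afterwards.
    static int permitsAfterRoundTrip(int permits) {
        Semaphore semaphore = new Semaphore(permits);
        semaphore.acquireUninterruptibly(); // take one permit
        semaphore.release();                // return it to the pool
        return semaphore.availablePermits();
    }

    public static void main(String[] args) {
        System.out.println(barrierTrips(3, 2));       // 2
        System.out.println(permitsAfterRoundTrip(2)); // 2
    }
}
```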
So, when to use which?
- Use CountDownLatch when you must ensure that one or more threads wait for other threads to complete their tasks. It's a synchronization aid that allows one or more threads to wait until a set of operations being performed in other threads is complete.
- Use Semaphore when you need to control access to a shared resource, especially when the resource has a limit to how many threads can access it simultaneously. It mainly protects an expensive or critical resource by limiting the number of threads that can access it concurrently.
- Use CyclicBarrier when you have a group of threads that must all wait for each other to reach a common barrier point. This is often used in parallel algorithms that involve divide-and-conquer strategies, where an algorithm is broken into sub-parts that are solved in parallel. The solutions are combined only when all parts have been solved.
Remember that the choice between CountDownLatch, Semaphore, and CyclicBarrier depends on the specifics of your problem. Analyze your situation and make sure you understand the semantics of these classes before deciding which to use.
Conclusion
Here's a quick recap of what you learned in this topic:
- The CountDownLatch class in Java is a powerful concurrency utility that enables synchronization between multiple threads. It allows one or more threads to wait until a set of operations in other threads completes, providing a simple yet effective mechanism for managing inter-thread dependencies.
- The CountDownLatch constructor initializes a new instance with a specified count. It uses the countDown() and await() methods for controlling threads.
- The choice between CountDownLatch, CyclicBarrier and Semaphore depends on the specific problem you're trying to solve. Always ensure you understand your scenario well and choose the appropriate tool for the job.