Ch 13.3 Modern Concurrency (java.util.concurrent)
Java's concurrency toolkit has evolved far beyond raw Thread management. Since Java 5, the java.util.concurrent package provides higher-level tools for building robust, efficient concurrent applications.
1. ExecutorService — Thread Pool Management
Creating and destroying threads is expensive. A thread pool pre-creates a fixed set of threads and reuses them, drastically reducing overhead.
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
public class ExecutorServiceExample {
public static void main(String[] args) throws InterruptedException {
// Fixed-size thread pool (5 threads)
ExecutorService fixedPool = Executors.newFixedThreadPool(5);
// Submit 10 tasks — 5 threads handle them in rotation
for (int i = 1; i <= 10; i++) {
final int taskId = i;
fixedPool.execute(() -> {
System.out.printf("[%s] Processing task %d%n",
Thread.currentThread().getName(), taskId);
try { Thread.sleep(500); } catch (InterruptedException e) {}
});
}
fixedPool.shutdown(); // no new tasks accepted
fixedPool.awaitTermination(30, TimeUnit.SECONDS); // wait for completion
System.out.println("All tasks complete");
}
}
Thread Pool Types
// 1. Fixed pool: ideal for CPU-bound work (set to number of CPU cores)
ExecutorService fixed = Executors.newFixedThreadPool(
Runtime.getRuntime().availableProcessors()
);
// 2. Single thread executor: guarantees sequential processing
ExecutorService single = Executors.newSingleThreadExecutor();
// 3. Cached pool: dynamically grows/shrinks; ideal for I/O-bound tasks
ExecutorService cached = Executors.newCachedThreadPool();
// 4. Scheduled pool: for delayed or repeated task execution
import java.util.concurrent.ScheduledExecutorService;
ScheduledExecutorService scheduled = Executors.newScheduledThreadPool(2);
2. Future and Callable — Returning Results
Runnable cannot return a result. Callable and Future enable getting results from asynchronous tasks.
import java.util.concurrent.*;
import java.util.ArrayList;
import java.util.List;
public class CallableFutureExample {
public static void main(String[] args) throws ExecutionException, InterruptedException {
ExecutorService executor = Executors.newFixedThreadPool(4);
// Prime number checking tasks
List<Future<Boolean>> futures = new ArrayList<>();
int[] numbersToCheck = {17, 100, 97, 49, 113};
for (int num : numbersToCheck) {
Future<Boolean> future = executor.submit(() -> {
Thread.sleep(200); // simulate complex computation
return isPrime(num);
});
futures.add(future);
}
// Collect results
for (int i = 0; i < numbersToCheck.length; i++) {
boolean isPrime = futures.get(i).get(); // blocks until result is ready
System.out.printf("%d is prime: %b%n", numbersToCheck[i], isPrime);
}
executor.shutdown();
}
static boolean isPrime(int n) {
if (n < 2) return false;
for (int i = 2; i <= Math.sqrt(n); i++) {
if (n % i == 0) return false;
}
return true;
}
}
3. CompletableFuture — Async Pipelines
CompletableFuture (Java 8+) is a powerful tool for non-blocking async programming, similar to JavaScript's Promise.
import java.util.concurrent.CompletableFuture;
public class CompletableFutureBasic {
public static void main(String[] args) throws Exception {
// 1. supplyAsync: produce a value asynchronously
CompletableFuture<String> cf = CompletableFuture
.supplyAsync(() -> {
System.out.println("[Async] DB query start: " + Thread.currentThread().getName());
try { Thread.sleep(1000); } catch (InterruptedException e) {}
return "User-Alice";
})
// 2. thenApply: transform the result (like map)
.thenApply(user -> {
System.out.println("[Transform] Processing user data");
return "Hello, " + user + "!";
})
// 3. Chain another thenApply
.thenApply(greeting -> greeting + " Have a great day!");
System.out.println("Main thread: async task started, doing other work...");
// 4. get() blocks until the result is available
String result = cf.get();
System.out.println("Final result: " + result);
}
}
thenAccept, thenRun
CompletableFuture.supplyAsync(() -> "Email sent successfully")
.thenAccept(result -> System.out.println("Consume: " + result)) // no return value
.thenRun(() -> System.out.println("Post-processing done")); // no input or output
thenCombine — Combine Two Tasks
import java.util.concurrent.CompletableFuture;
public class CompletableFutureCombine {
public static void main(String[] args) throws Exception {
// Call two APIs in parallel and combine results
CompletableFuture<String> weatherCF = CompletableFuture.supplyAsync(() -> {
try { Thread.sleep(800); } catch (InterruptedException e) {}
return "Sunny, 23 C";
});
CompletableFuture<String> newsCF = CompletableFuture.supplyAsync(() -> {
try { Thread.sleep(600); } catch (InterruptedException e) {}
return "Java 25 released!";
});
// Combine once both complete
CompletableFuture<String> combined = weatherCF.thenCombine(newsCF,
(weather, news) -> "Weather: " + weather + " | News: " + news
);
long start = System.currentTimeMillis();
System.out.println(combined.get());
System.out.println("Elapsed: " + (System.currentTimeMillis() - start) + "ms");
// ~800ms (parallel, not 1400ms)
}
}
exceptionally — Exception Handling
CompletableFuture<String> cf = CompletableFuture
.supplyAsync(() -> {
if (Math.random() > 0.5) {
throw new RuntimeException("API call failed!");
}
return "Success data";
})
.exceptionally(ex -> {
System.out.println("Exception caught: " + ex.getMessage());
return "Default fallback value"; // return fallback on error
})
.thenApply(result -> "Processed: " + result);
System.out.println(cf.get());
allOf — Wait for Multiple Tasks
import java.util.concurrent.CompletableFuture;
public class CompletableFutureAllOf {
public static void main(String[] args) throws Exception {
CompletableFuture<String> cf1 = CompletableFuture.supplyAsync(() -> {
try { Thread.sleep(1000); } catch (InterruptedException e) {}
return "User info";
});
CompletableFuture<String> cf2 = CompletableFuture.supplyAsync(() -> {
try { Thread.sleep(800); } catch (InterruptedException e) {}
return "Order list";
});
CompletableFuture<String> cf3 = CompletableFuture.supplyAsync(() -> {
try { Thread.sleep(600); } catch (InterruptedException e) {}
return "Point balance";
});
long start = System.currentTimeMillis();
// Wait for all three tasks to complete
CompletableFuture.allOf(cf1, cf2, cf3).get();
System.out.printf("My-page data loaded (elapsed: %dms)%n",
System.currentTimeMillis() - start);
System.out.println(cf1.get() + " | " + cf2.get() + " | " + cf3.get());
// elapsed: ~1000ms (limited by the slowest task)
}
}
4. Atomic Classes — Lock-Free Atomic Operations
Guarantees atomic operations without synchronized. Uses CAS (Compare-And-Swap) hardware instructions for superior performance.
import java.util.concurrent.atomic.*;
public class AtomicExample {
// Thread-safe counter without synchronized
static AtomicInteger counter = new AtomicInteger(0);
static AtomicLong totalSum = new AtomicLong(0L);
static AtomicBoolean isRunning = new AtomicBoolean(true);
public static void main(String[] args) throws InterruptedException {
Thread[] threads = new Thread[10];
for (int i = 0; i < 10; i++) {
threads[i] = new Thread(() -> {
for (int j = 0; j < 1000; j++) {
counter.incrementAndGet(); // ++counter (atomic)
totalSum.addAndGet(j); // totalSum += j
}
});
}
for (Thread t : threads) t.start();
for (Thread t : threads) t.join();
System.out.println("Counter result: " + counter.get()); // always 10000
System.out.println("Total sum: " + totalSum.get());
// compareAndSet: change only if current value matches expected
boolean updated = counter.compareAndSet(10000, 0); // if 10000, set to 0
System.out.println("Reset success: " + updated + ", current: " + counter.get());
isRunning.set(false); // atomic boolean assignment
}
}
5. ReentrantLock — Flexible Locking
Provides more functionality than synchronized:
import java.util.concurrent.locks.*;
public class ReentrantLockExample {
private final ReentrantLock lock = new ReentrantLock(true); // fair=true: FIFO order
private int count = 0;
public void increment() {
lock.lock(); // acquire lock
try {
count++;
} finally {
lock.unlock(); // always release in finally!
}
}
// tryLock: attempt to acquire without blocking
public boolean tryIncrement() {
if (lock.tryLock()) { // returns true if acquired
try {
count++;
return true;
} finally {
lock.unlock();
}
}
return false; // lock not available
}
// Condition variables (alternative to wait/notify)
private final Condition notEmpty = lock.newCondition();
private final java.util.Queue<Integer> queue = new java.util.LinkedList<>();
public void put(int item) throws InterruptedException {
lock.lock();
try {
queue.offer(item);
notEmpty.signalAll(); // wake consumers
} finally {
lock.unlock();
}
}
public int take() throws InterruptedException {
lock.lock();
try {
while (queue.isEmpty()) {
notEmpty.await(); // wait, releasing the lock
}
return queue.poll();
} finally {
lock.unlock();
}
}
}
6. Thread-Safe Collections
import java.util.concurrent.*;
import java.util.*;
public class ConcurrentCollectionsExample {
public static void main(String[] args) throws InterruptedException {
// ConcurrentHashMap: thread-safe HashMap
// Uses segment-level locking — far faster than synchronized HashMap
ConcurrentHashMap<String, Integer> concurrentMap = new ConcurrentHashMap<>();
Thread[] writers = new Thread[5];
for (int i = 0; i < 5; i++) {
writers[i] = new Thread(() -> {
for (int j = 0; j < 100; j++) {
concurrentMap.merge("key-" + (j % 10), 1, Integer::sum);
}
});
}
for (Thread t : writers) t.start();
for (Thread t : writers) t.join();
System.out.println("Map size: " + concurrentMap.size()); // 10
// CopyOnWriteArrayList: best for frequent reads, rare writes
// On each write, the entire array is copied (expensive writes, lock-free reads)
CopyOnWriteArrayList<String> cowList = new CopyOnWriteArrayList<>();
cowList.add("Item1");
cowList.add("Item2");
// No ConcurrentModificationException even if another thread modifies during iteration
for (String item : cowList) {
System.out.println(item);
cowList.add("New item"); // added to a copy, iteration is unaffected
}
// BlockingQueue: optimized for producer-consumer patterns
BlockingQueue<String> blockingQueue = new LinkedBlockingQueue<>(10);
// put: blocks if full; take: blocks if empty
blockingQueue.put("Task1");
String task = blockingQueue.take();
System.out.println("Processing task: " + task);
}
}
7. CountDownLatch, CyclicBarrier, Semaphore
import java.util.concurrent.*;
public class SynchronizerExamples {
// CountDownLatch: proceed after N tasks complete
static void countDownLatchExample() throws InterruptedException {
CountDownLatch latch = new CountDownLatch(3); // 3-count latch
for (int i = 1; i <= 3; i++) {
final int id = i;
new Thread(() -> {
try {
Thread.sleep(id * 500L);
System.out.println("Service-" + id + " ready");
latch.countDown(); // decrement count
} catch (InterruptedException e) {}
}).start();
}
System.out.println("Waiting for all services...");
latch.await(); // block until count reaches 0
System.out.println("All services ready! Starting server");
}
// CyclicBarrier: all threads wait until everyone reaches the barrier (reusable)
static void cyclicBarrierExample() throws InterruptedException {
CyclicBarrier barrier = new CyclicBarrier(3,
() -> System.out.println("=== All teams at checkpoint! Proceeding ===")
);
for (int i = 1; i <= 3; i++) {
final int teamId = i;
new Thread(() -> {
try {
Thread.sleep(teamId * 300L);
System.out.println("Team-" + teamId + " reached checkpoint");
barrier.await(); // wait for all others
System.out.println("Team-" + teamId + " proceeding");
} catch (Exception e) {}
}).start();
}
}
// Semaphore: limit concurrent access (connection pool, rate limiting)
static void semaphoreExample() throws InterruptedException {
Semaphore semaphore = new Semaphore(3); // max 3 concurrent accesses
for (int i = 1; i <= 10; i++) {
final int userId = i;
new Thread(() -> {
try {
semaphore.acquire(); // acquire permit (wait if all 3 in use)
System.out.println("User-" + userId + " connecting to DB (active: "
+ (3 - semaphore.availablePermits()) + "/3)");
Thread.sleep(1000);
System.out.println("User-" + userId + " disconnected");
} catch (InterruptedException e) {
} finally {
semaphore.release(); // always release
}
}).start();
}
}
public static void main(String[] args) throws InterruptedException {
System.out.println("=== CountDownLatch Example ===");
countDownLatchExample();
Thread.sleep(3000);
System.out.println("\n=== CyclicBarrier Example ===");
cyclicBarrierExample();
Thread.sleep(3000);
System.out.println("\n=== Semaphore Example ===");
semaphoreExample();
}
}
8. Practical Example: Parallel API Calls with CompletableFuture
Simulating a shopping mall "My Page" that loads multiple data sources in parallel:
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
class UserInfo {
String name;
String email;
UserInfo(String name, String email) { this.name = name; this.email = email; }
}
class OrderSummary {
int totalOrders;
int totalAmount;
OrderSummary(int orders, int amount) { this.totalOrders = orders; this.totalAmount = amount; }
}
class PointBalance {
int points;
PointBalance(int points) { this.points = points; }
}
class MyPageData {
UserInfo user;
OrderSummary orders;
PointBalance points;
MyPageData(UserInfo user, OrderSummary orders, PointBalance points) {
this.user = user;
this.orders = orders;
this.points = points;
}
@Override
public String toString() {
return String.format(
"=== My Page ===%n" +
"Name: %s (%s)%n" +
"Orders: %d / Total $%,d%n" +
"Points: %,d P",
user.name, user.email,
orders.totalOrders, orders.totalAmount,
points.points
);
}
}
public class ParallelApiCallExample {
static UserInfo fetchUserInfo(long userId) throws InterruptedException {
Thread.sleep(800); // simulate DB query
return new UserInfo("Alice", "alice@example.com");
}
static OrderSummary fetchOrderSummary(long userId) throws InterruptedException {
Thread.sleep(1200); // simulate order aggregation
return new OrderSummary(15, 380_000);
}
static PointBalance fetchPointBalance(long userId) throws InterruptedException {
Thread.sleep(500); // simulate point lookup
return new PointBalance(12_500);
}
public static void main(String[] args) throws Exception {
long userId = 12345L;
ExecutorService executor = Executors.newFixedThreadPool(3);
System.out.println("Loading My Page data...");
long startTime = System.currentTimeMillis();
// Call 3 APIs in parallel
CompletableFuture<UserInfo> userCF = CompletableFuture
.supplyAsync(() -> {
try { return fetchUserInfo(userId); }
catch (InterruptedException e) { throw new RuntimeException(e); }
}, executor);
CompletableFuture<OrderSummary> orderCF = CompletableFuture
.supplyAsync(() -> {
try { return fetchOrderSummary(userId); }
catch (InterruptedException e) { throw new RuntimeException(e); }
}, executor);
CompletableFuture<PointBalance> pointCF = CompletableFuture
.supplyAsync(() -> {
try { return fetchPointBalance(userId); }
catch (InterruptedException e) { throw new RuntimeException(e); }
}, executor);
// Combine all three results
CompletableFuture<MyPageData> myPageCF = userCF
.thenCombine(orderCF, (user, orders) -> new Object[]{user, orders})
.thenCombine(pointCF, (arr, points) ->
new MyPageData((UserInfo) arr[0], (OrderSummary) arr[1], points)
);
// Error handling
MyPageData result = myPageCF
.exceptionally(ex -> {
System.out.println("Data load failed: " + ex.getMessage());
return null;
})
.get();
long elapsed = System.currentTimeMillis() - startTime;
if (result != null) {
System.out.println(result);
System.out.printf("%nData loaded in %dms%n", elapsed);
// Sequential would be: 800 + 1200 + 500 = 2500ms
// Parallel: ~1200ms (limited by the slowest call)
}
executor.shutdown();
}
}
Traditional platform threads are heavyweight — creating tens of thousands of them is impractical. Virtual threads are lightweight threads managed by the JVM. You can create millions of them with no performance problem. For I/O-heavy backend servers, they deliver dramatic throughput improvements.
// Virtual thread executor (Java 21+)
try (var executor = Executors.newVirtualThreadPerTaskExecutor()) {
for (int i = 0; i < 100_000; i++) {
executor.submit(() -> {
// While waiting for I/O, other virtual threads use the CPU
Thread.sleep(1000);
});
}
} // try-with-resources auto-shuts down
Virtual threads use the same Thread API — no rewriting required. Just swap the executor and get massive concurrency.