Skip to main content

Ch 13.3 Modern Concurrency (java.util.concurrent)

Java's concurrency toolkit has evolved far beyond raw Thread management. Since Java 5, the java.util.concurrent package provides higher-level tools for building robust, efficient concurrent applications.

1. ExecutorService — Thread Pool Management

Creating and destroying threads is expensive. A thread pool pre-creates a fixed set of threads and reuses them, drastically reducing overhead.

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

public class ExecutorServiceExample {
public static void main(String[] args) throws InterruptedException {
// Fixed-size thread pool (5 threads)
ExecutorService fixedPool = Executors.newFixedThreadPool(5);

// Submit 10 tasks — 5 threads handle them in rotation
for (int i = 1; i <= 10; i++) {
final int taskId = i;
fixedPool.execute(() -> {
System.out.printf("[%s] Processing task %d%n",
Thread.currentThread().getName(), taskId);
try { Thread.sleep(500); } catch (InterruptedException e) {}
});
}

fixedPool.shutdown(); // no new tasks accepted
fixedPool.awaitTermination(30, TimeUnit.SECONDS); // wait for completion
System.out.println("All tasks complete");
}
}

Thread Pool Types

// 1. Fixed pool: ideal for CPU-bound work (set to number of CPU cores)
ExecutorService fixed = Executors.newFixedThreadPool(
Runtime.getRuntime().availableProcessors()
);

// 2. Single thread executor: guarantees sequential processing
ExecutorService single = Executors.newSingleThreadExecutor();

// 3. Cached pool: dynamically grows/shrinks; ideal for I/O-bound tasks
ExecutorService cached = Executors.newCachedThreadPool();

// 4. Scheduled pool: for delayed or repeated task execution
import java.util.concurrent.ScheduledExecutorService;
ScheduledExecutorService scheduled = Executors.newScheduledThreadPool(2);

2. Future and Callable — Returning Results

Runnable cannot return a result. Callable and Future enable getting results from asynchronous tasks.

import java.util.concurrent.*;
import java.util.ArrayList;
import java.util.List;

public class CallableFutureExample {
public static void main(String[] args) throws ExecutionException, InterruptedException {
ExecutorService executor = Executors.newFixedThreadPool(4);

// Prime number checking tasks
List<Future<Boolean>> futures = new ArrayList<>();
int[] numbersToCheck = {17, 100, 97, 49, 113};

for (int num : numbersToCheck) {
Future<Boolean> future = executor.submit(() -> {
Thread.sleep(200); // simulate complex computation
return isPrime(num);
});
futures.add(future);
}

// Collect results
for (int i = 0; i < numbersToCheck.length; i++) {
boolean isPrime = futures.get(i).get(); // blocks until result is ready
System.out.printf("%d is prime: %b%n", numbersToCheck[i], isPrime);
}

executor.shutdown();
}

static boolean isPrime(int n) {
if (n < 2) return false;
for (int i = 2; i <= Math.sqrt(n); i++) {
if (n % i == 0) return false;
}
return true;
}
}

3. CompletableFuture — Async Pipelines

CompletableFuture (Java 8+) is a powerful tool for non-blocking async programming, similar to JavaScript's Promise.

import java.util.concurrent.CompletableFuture;

public class CompletableFutureBasic {
public static void main(String[] args) throws Exception {

// 1. supplyAsync: produce a value asynchronously
CompletableFuture<String> cf = CompletableFuture
.supplyAsync(() -> {
System.out.println("[Async] DB query start: " + Thread.currentThread().getName());
try { Thread.sleep(1000); } catch (InterruptedException e) {}
return "User-Alice";
})
// 2. thenApply: transform the result (like map)
.thenApply(user -> {
System.out.println("[Transform] Processing user data");
return "Hello, " + user + "!";
})
// 3. Chain another thenApply
.thenApply(greeting -> greeting + " Have a great day!");

System.out.println("Main thread: async task started, doing other work...");

// 4. get() blocks until the result is available
String result = cf.get();
System.out.println("Final result: " + result);
}
}

thenAccept, thenRun

CompletableFuture.supplyAsync(() -> "Email sent successfully")
.thenAccept(result -> System.out.println("Consume: " + result)) // no return value
.thenRun(() -> System.out.println("Post-processing done")); // no input or output

thenCombine — Combine Two Tasks

import java.util.concurrent.CompletableFuture;

public class CompletableFutureCombine {
public static void main(String[] args) throws Exception {
// Call two APIs in parallel and combine results
CompletableFuture<String> weatherCF = CompletableFuture.supplyAsync(() -> {
try { Thread.sleep(800); } catch (InterruptedException e) {}
return "Sunny, 23 C";
});

CompletableFuture<String> newsCF = CompletableFuture.supplyAsync(() -> {
try { Thread.sleep(600); } catch (InterruptedException e) {}
return "Java 25 released!";
});

// Combine once both complete
CompletableFuture<String> combined = weatherCF.thenCombine(newsCF,
(weather, news) -> "Weather: " + weather + " | News: " + news
);

long start = System.currentTimeMillis();
System.out.println(combined.get());
System.out.println("Elapsed: " + (System.currentTimeMillis() - start) + "ms");
// ~800ms (parallel, not 1400ms)
}
}

exceptionally — Exception Handling

CompletableFuture<String> cf = CompletableFuture
.supplyAsync(() -> {
if (Math.random() > 0.5) {
throw new RuntimeException("API call failed!");
}
return "Success data";
})
.exceptionally(ex -> {
System.out.println("Exception caught: " + ex.getMessage());
return "Default fallback value"; // return fallback on error
})
.thenApply(result -> "Processed: " + result);

System.out.println(cf.get());

allOf — Wait for Multiple Tasks

import java.util.concurrent.CompletableFuture;

public class CompletableFutureAllOf {
public static void main(String[] args) throws Exception {
CompletableFuture<String> cf1 = CompletableFuture.supplyAsync(() -> {
try { Thread.sleep(1000); } catch (InterruptedException e) {}
return "User info";
});
CompletableFuture<String> cf2 = CompletableFuture.supplyAsync(() -> {
try { Thread.sleep(800); } catch (InterruptedException e) {}
return "Order list";
});
CompletableFuture<String> cf3 = CompletableFuture.supplyAsync(() -> {
try { Thread.sleep(600); } catch (InterruptedException e) {}
return "Point balance";
});

long start = System.currentTimeMillis();

// Wait for all three tasks to complete
CompletableFuture.allOf(cf1, cf2, cf3).get();

System.out.printf("My-page data loaded (elapsed: %dms)%n",
System.currentTimeMillis() - start);
System.out.println(cf1.get() + " | " + cf2.get() + " | " + cf3.get());
// elapsed: ~1000ms (limited by the slowest task)
}
}

4. Atomic Classes — Lock-Free Atomic Operations

Guarantees atomic operations without synchronized. Uses CAS (Compare-And-Swap) hardware instructions for superior performance.

import java.util.concurrent.atomic.*;

public class AtomicExample {
// Thread-safe counter without synchronized
static AtomicInteger counter = new AtomicInteger(0);
static AtomicLong totalSum = new AtomicLong(0L);
static AtomicBoolean isRunning = new AtomicBoolean(true);

public static void main(String[] args) throws InterruptedException {
Thread[] threads = new Thread[10];
for (int i = 0; i < 10; i++) {
threads[i] = new Thread(() -> {
for (int j = 0; j < 1000; j++) {
counter.incrementAndGet(); // ++counter (atomic)
totalSum.addAndGet(j); // totalSum += j
}
});
}

for (Thread t : threads) t.start();
for (Thread t : threads) t.join();

System.out.println("Counter result: " + counter.get()); // always 10000
System.out.println("Total sum: " + totalSum.get());

// compareAndSet: change only if current value matches expected
boolean updated = counter.compareAndSet(10000, 0); // if 10000, set to 0
System.out.println("Reset success: " + updated + ", current: " + counter.get());

isRunning.set(false); // atomic boolean assignment
}
}

5. ReentrantLock — Flexible Locking

Provides more functionality than synchronized:

import java.util.concurrent.locks.*;

public class ReentrantLockExample {
private final ReentrantLock lock = new ReentrantLock(true); // fair=true: FIFO order
private int count = 0;

public void increment() {
lock.lock(); // acquire lock
try {
count++;
} finally {
lock.unlock(); // always release in finally!
}
}

// tryLock: attempt to acquire without blocking
public boolean tryIncrement() {
if (lock.tryLock()) { // returns true if acquired
try {
count++;
return true;
} finally {
lock.unlock();
}
}
return false; // lock not available
}

// Condition variables (alternative to wait/notify)
private final Condition notEmpty = lock.newCondition();
private final java.util.Queue<Integer> queue = new java.util.LinkedList<>();

public void put(int item) throws InterruptedException {
lock.lock();
try {
queue.offer(item);
notEmpty.signalAll(); // wake consumers
} finally {
lock.unlock();
}
}

public int take() throws InterruptedException {
lock.lock();
try {
while (queue.isEmpty()) {
notEmpty.await(); // wait, releasing the lock
}
return queue.poll();
} finally {
lock.unlock();
}
}
}

6. Thread-Safe Collections

import java.util.concurrent.*;
import java.util.*;

public class ConcurrentCollectionsExample {
public static void main(String[] args) throws InterruptedException {
// ConcurrentHashMap: thread-safe HashMap
// Uses segment-level locking — far faster than synchronized HashMap
ConcurrentHashMap<String, Integer> concurrentMap = new ConcurrentHashMap<>();

Thread[] writers = new Thread[5];
for (int i = 0; i < 5; i++) {
writers[i] = new Thread(() -> {
for (int j = 0; j < 100; j++) {
concurrentMap.merge("key-" + (j % 10), 1, Integer::sum);
}
});
}

for (Thread t : writers) t.start();
for (Thread t : writers) t.join();

System.out.println("Map size: " + concurrentMap.size()); // 10

// CopyOnWriteArrayList: best for frequent reads, rare writes
// On each write, the entire array is copied (expensive writes, lock-free reads)
CopyOnWriteArrayList<String> cowList = new CopyOnWriteArrayList<>();
cowList.add("Item1");
cowList.add("Item2");

// No ConcurrentModificationException even if another thread modifies during iteration
for (String item : cowList) {
System.out.println(item);
cowList.add("New item"); // added to a copy, iteration is unaffected
}

// BlockingQueue: optimized for producer-consumer patterns
BlockingQueue<String> blockingQueue = new LinkedBlockingQueue<>(10);

// put: blocks if full; take: blocks if empty
blockingQueue.put("Task1");
String task = blockingQueue.take();
System.out.println("Processing task: " + task);
}
}

7. CountDownLatch, CyclicBarrier, Semaphore

import java.util.concurrent.*;

public class SynchronizerExamples {

// CountDownLatch: proceed after N tasks complete
static void countDownLatchExample() throws InterruptedException {
CountDownLatch latch = new CountDownLatch(3); // 3-count latch

for (int i = 1; i <= 3; i++) {
final int id = i;
new Thread(() -> {
try {
Thread.sleep(id * 500L);
System.out.println("Service-" + id + " ready");
latch.countDown(); // decrement count
} catch (InterruptedException e) {}
}).start();
}

System.out.println("Waiting for all services...");
latch.await(); // block until count reaches 0
System.out.println("All services ready! Starting server");
}

// CyclicBarrier: all threads wait until everyone reaches the barrier (reusable)
static void cyclicBarrierExample() throws InterruptedException {
CyclicBarrier barrier = new CyclicBarrier(3,
() -> System.out.println("=== All teams at checkpoint! Proceeding ===")
);

for (int i = 1; i <= 3; i++) {
final int teamId = i;
new Thread(() -> {
try {
Thread.sleep(teamId * 300L);
System.out.println("Team-" + teamId + " reached checkpoint");
barrier.await(); // wait for all others
System.out.println("Team-" + teamId + " proceeding");
} catch (Exception e) {}
}).start();
}
}

// Semaphore: limit concurrent access (connection pool, rate limiting)
static void semaphoreExample() throws InterruptedException {
Semaphore semaphore = new Semaphore(3); // max 3 concurrent accesses

for (int i = 1; i <= 10; i++) {
final int userId = i;
new Thread(() -> {
try {
semaphore.acquire(); // acquire permit (wait if all 3 in use)
System.out.println("User-" + userId + " connecting to DB (active: "
+ (3 - semaphore.availablePermits()) + "/3)");
Thread.sleep(1000);
System.out.println("User-" + userId + " disconnected");
} catch (InterruptedException e) {
} finally {
semaphore.release(); // always release
}
}).start();
}
}

public static void main(String[] args) throws InterruptedException {
System.out.println("=== CountDownLatch Example ===");
countDownLatchExample();
Thread.sleep(3000);

System.out.println("\n=== CyclicBarrier Example ===");
cyclicBarrierExample();
Thread.sleep(3000);

System.out.println("\n=== Semaphore Example ===");
semaphoreExample();
}
}

8. Practical Example: Parallel API Calls with CompletableFuture

Simulating a shopping mall "My Page" that loads multiple data sources in parallel:

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

class UserInfo {
String name;
String email;
UserInfo(String name, String email) { this.name = name; this.email = email; }
}

class OrderSummary {
int totalOrders;
int totalAmount;
OrderSummary(int orders, int amount) { this.totalOrders = orders; this.totalAmount = amount; }
}

class PointBalance {
int points;
PointBalance(int points) { this.points = points; }
}

class MyPageData {
UserInfo user;
OrderSummary orders;
PointBalance points;

MyPageData(UserInfo user, OrderSummary orders, PointBalance points) {
this.user = user;
this.orders = orders;
this.points = points;
}

@Override
public String toString() {
return String.format(
"=== My Page ===%n" +
"Name: %s (%s)%n" +
"Orders: %d / Total $%,d%n" +
"Points: %,d P",
user.name, user.email,
orders.totalOrders, orders.totalAmount,
points.points
);
}
}

public class ParallelApiCallExample {

static UserInfo fetchUserInfo(long userId) throws InterruptedException {
Thread.sleep(800); // simulate DB query
return new UserInfo("Alice", "alice@example.com");
}

static OrderSummary fetchOrderSummary(long userId) throws InterruptedException {
Thread.sleep(1200); // simulate order aggregation
return new OrderSummary(15, 380_000);
}

static PointBalance fetchPointBalance(long userId) throws InterruptedException {
Thread.sleep(500); // simulate point lookup
return new PointBalance(12_500);
}

public static void main(String[] args) throws Exception {
long userId = 12345L;
ExecutorService executor = Executors.newFixedThreadPool(3);

System.out.println("Loading My Page data...");
long startTime = System.currentTimeMillis();

// Call 3 APIs in parallel
CompletableFuture<UserInfo> userCF = CompletableFuture
.supplyAsync(() -> {
try { return fetchUserInfo(userId); }
catch (InterruptedException e) { throw new RuntimeException(e); }
}, executor);

CompletableFuture<OrderSummary> orderCF = CompletableFuture
.supplyAsync(() -> {
try { return fetchOrderSummary(userId); }
catch (InterruptedException e) { throw new RuntimeException(e); }
}, executor);

CompletableFuture<PointBalance> pointCF = CompletableFuture
.supplyAsync(() -> {
try { return fetchPointBalance(userId); }
catch (InterruptedException e) { throw new RuntimeException(e); }
}, executor);

// Combine all three results
CompletableFuture<MyPageData> myPageCF = userCF
.thenCombine(orderCF, (user, orders) -> new Object[]{user, orders})
.thenCombine(pointCF, (arr, points) ->
new MyPageData((UserInfo) arr[0], (OrderSummary) arr[1], points)
);

// Error handling
MyPageData result = myPageCF
.exceptionally(ex -> {
System.out.println("Data load failed: " + ex.getMessage());
return null;
})
.get();

long elapsed = System.currentTimeMillis() - startTime;

if (result != null) {
System.out.println(result);
System.out.printf("%nData loaded in %dms%n", elapsed);
// Sequential would be: 800 + 1200 + 500 = 2500ms
// Parallel: ~1200ms (limited by the slowest call)
}

executor.shutdown();
}
}
Pro Tips — Virtual Threads (Java 21+)

Traditional platform threads are heavyweight — creating tens of thousands of them is impractical. Virtual threads are lightweight threads managed by the JVM. You can create millions of them with no performance problem. For I/O-heavy backend servers, they deliver dramatic throughput improvements.

// Virtual thread executor (Java 21+)
try (var executor = Executors.newVirtualThreadPerTaskExecutor()) {
for (int i = 0; i < 100_000; i++) {
executor.submit(() -> {
// While waiting for I/O, other virtual threads use the CPU
Thread.sleep(1000);
});
}
} // try-with-resources auto-shuts down

Virtual threads use the same Thread API — no rewriting required. Just swap the executor and get massive concurrency.