Ch 13.3 현대적 동시성 (java.util.concurrent)
자바에서는 단순히 Thread 인스턴스를 직접 생성하여 관리하는 단계를 넘어, 더욱 효율적이고 현대적인 동시성 환경을 제공하는 도구들이 발전해왔습니다. 자바 5부터 추가된 java.util.concurrent 패키지가 그 핵심입니다.
1. ExecutorService - 쓰레드 풀 관리
쓰레드를 생성하고 소멸시키는 작업은 시스템 리소스를 크게 소모합니다. 쓰레드 풀(Thread Pool) 은 미리 쓰레드를 만들어 풀에 담아두고 재사용하는 방식입니다.
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
public class ExecutorServiceExample {
public static void main(String[] args) throws InterruptedException {
// 고정 크기 쓰레드 풀 (5개 쓰레드)
ExecutorService fixedPool = Executors.newFixedThreadPool(5);
// 작업 10개 제출 (쓰레드 5개가 번갈아 처리)
for (int i = 1; i <= 10; i++) {
final int taskId = i;
fixedPool.execute(() -> {
System.out.printf("[%s] 작업 %d 처리 중%n",
Thread.currentThread().getName(), taskId);
try { Thread.sleep(500); } catch (InterruptedException e) {}
});
}
fixedPool.shutdown(); // 더 이상 새 작업 받지 않음
fixedPool.awaitTermination(30, TimeUnit.SECONDS); // 완료 대기
System.out.println("모든 작업 완료");
}
}
쓰레드 풀 종류
// 1. 고정 크기 풀: CPU 바운드 작업에 적합 (코어 수와 동일하게 설정)
ExecutorService fixed = Executors.newFixedThreadPool(
Runtime.getRuntime().availableProcessors()
);
// 2. 단일 쓰레드 풀: 순서 보장이 필요한 작업
ExecutorService single = Executors.newSingleThreadExecutor();
// 3. 캐시 풀: I/O 바운드 작업, 동적으로 쓰레드 생성/제거
ExecutorService cached = Executors.newCachedThreadPool();
// 4. 스케줄 풀: 지연/반복 실행
import java.util.concurrent.ScheduledExecutorService;
ScheduledExecutorService scheduled = Executors.newScheduledThreadPool(2);
2. Future와 Callable - 결과값 반환
Runnable은 작업 결과를 반환할 수 없습니다. Callable과 Future를 사용하면 비동기 작업의 결과를 받을 수 있습니다.
import java.util.concurrent.*;
import java.util.ArrayList;
import java.util.List;
public class CallableFutureExample {
public static void main(String[] args) throws ExecutionException, InterruptedException {
ExecutorService executor = Executors.newFixedThreadPool(4);
// 소수 판별 Callable 작업
List<Future<Boolean>> futures = new ArrayList<>();
int[] numbersToCheck = {17, 100, 97, 49, 113};
for (int num : numbersToCheck) {
Future<Boolean> future = executor.submit(() -> {
Thread.sleep(200); // 복잡한 계산 시뮬레이션
return isPrime(num);
});
futures.add(future);
}
// 결과 수집
for (int i = 0; i < numbersToCheck.length; i++) {
boolean isPrime = futures.get(i).get(); // 완료될 때까지 대기
System.out.printf("%d는 소수: %b%n", numbersToCheck[i], isPrime);
}
executor.shutdown();
}
static boolean isPrime(int n) {
if (n < 2) return false;
for (int i = 2; i <= Math.sqrt(n); i++) {
if (n % i == 0) return false;
}
return true;
}
}
3. CompletableFuture - 비동기 파이프라인
CompletableFuture는 자바 8에서 도입된 강력한 비동기 프로그래밍 도구입니다. 자바스크립트의 Promise와 유사합니다.
import java.util.concurrent.CompletableFuture;
public class CompletableFutureBasic {
public static void main(String[] args) throws Exception {
// 1. supplyAsync: 비동기로 값 생산
CompletableFuture<String> cf = CompletableFuture
.supplyAsync(() -> {
System.out.println("[비동기] DB 조회 시작: " + Thread.currentThread().getName());
try { Thread.sleep(1000); } catch (InterruptedException e) {}
return "사용자-홍길동";
})
// 2. thenApply: 결과를 변환 (map과 유사)
.thenApply(user -> {
System.out.println("[변환] 사용자 데이터 가공");
return "안녕하세요, " + user + "님!";
})
// 3. thenApply 체이닝
.thenApply(greeting -> greeting + " 오늘도 좋은 하루 되세요.");
System.out.println("메인 쓰레드: 비동기 작업 시작됨, 다른 일 수행 중...");
// 4. get()으로 최종 결과 수신
String result = cf.get();
System.out.println("최종 결과: " + result);
}
}
thenAccept, thenRun
CompletableFuture.supplyAsync(() -> "이메일 전송 완료")
.thenAccept(result -> System.out.println("소비: " + result)) // 반환값 없음
.thenRun(() -> System.out.println("후처리 완료")); // 결과값도 없음
thenCombine - 두 작업 결합
import java.util.concurrent.CompletableFuture;
public class CompletableFutureCombine {
public static void main(String[] args) throws Exception {
// 두 API를 병렬 호출하여 결과 합치기
CompletableFuture<String> weatherCF = CompletableFuture.supplyAsync(() -> {
try { Thread.sleep(800); } catch (InterruptedException e) {}
return "맑음, 23°C";
});
CompletableFuture<String> newsCF = CompletableFuture.supplyAsync(() -> {
try { Thread.sleep(600); } catch (InterruptedException e) {}
return "Java 25 발표!";
});
// 두 결과가 모두 나오면 합치기
CompletableFuture<String> combined = weatherCF.thenCombine(newsCF,
(weather, news) -> "날씨: " + weather + " | 뉴스: " + news
);
long start = System.currentTimeMillis();
System.out.println(combined.get());
System.out.println("소요시간: " + (System.currentTimeMillis() - start) + "ms");
// 약 800ms (병렬이므로 1400ms가 아닌 800ms)
}
}
exceptionally - 예외 처리
CompletableFuture<String> cf = CompletableFuture
.supplyAsync(() -> {
if (Math.random() > 0.5) {
throw new RuntimeException("API 호출 실패!");
}
return "성공 데이터";
})
.exceptionally(ex -> {
System.out.println("예외 발생: " + ex.getMessage());
return "기본값으로 대체"; // 예외 시 대체값 반환
})
.thenApply(result -> "처리 완료: " + result);
System.out.println(cf.get());
allOf - 여러 작업 모두 완료 대기
import java.util.concurrent.CompletableFuture;
public class CompletableFutureAllOf {
public static void main(String[] args) throws Exception {
CompletableFuture<String> cf1 = CompletableFuture.supplyAsync(() -> {
try { Thread.sleep(1000); } catch (InterruptedException e) {}
return "유저 정보";
});
CompletableFuture<String> cf2 = CompletableFuture.supplyAsync(() -> {
try { Thread.sleep(800); } catch (InterruptedException e) {}
return "주문 목록";
});
CompletableFuture<String> cf3 = CompletableFuture.supplyAsync(() -> {
try { Thread.sleep(600); } catch (InterruptedException e) {}
return "포인트 잔액";
});
long start = System.currentTimeMillis();
// 세 작업이 모두 완료될 때까지 기다림
CompletableFuture.allOf(cf1, cf2, cf3).get();
System.out.printf("마이페이지 데이터 로드 완료 (소요: %dms)%n",
System.currentTimeMillis() - start);
System.out.println(cf1.get() + " | " + cf2.get() + " | " + cf3.get());
// 소요시간: ~1000ms (가장 오래 걸린 작업 기준)
}
}
4. Atomic 클래스 - Lock-Free 원자 연산
synchronized 없이 원자적 연산을 보장합니다. CAS(Compare-And-Swap) 하드웨어 연산을 사용해 성능이 뛰어납니다.
import java.util.concurrent.atomic.*;
public class AtomicExample {
// synchronized 없이 스레드 안전한 카운터
static AtomicInteger counter = new AtomicInteger(0);
static AtomicLong totalSum = new AtomicLong(0L);
static AtomicBoolean isRunning = new AtomicBoolean(true);
public static void main(String[] args) throws InterruptedException {
Thread[] threads = new Thread[10];
for (int i = 0; i < 10; i++) {
threads[i] = new Thread(() -> {
for (int j = 0; j < 1000; j++) {
counter.incrementAndGet(); // ++counter (원자적)
totalSum.addAndGet(j); // totalSum += j
}
});
}
for (Thread t : threads) t.start();
for (Thread t : threads) t.join();
System.out.println("카운터 결과: " + counter.get()); // 항상 10000
System.out.println("합계: " + totalSum.get());
// compareAndSet: 예상값과 같을 때만 변경
boolean updated = counter.compareAndSet(10000, 0); // 10000이면 0으로 변경
System.out.println("리셋 성공: " + updated + ", 현재값: " + counter.get());
isRunning.set(false); // 원자적 boolean 설정
}
}
5. ReentrantLock - 유연한 락
synchronized보다 더 많은 기능을 제공합니다.
import java.util.concurrent.locks.*;
public class ReentrantLockExample {
private final ReentrantLock lock = new ReentrantLock(true); // fair=true: 대기 순서 보장
private int count = 0;
public void increment() {
lock.lock(); // 락 획득
try {
count++;
} finally {
lock.unlock(); // 반드시 finally에서 해제!
}
}
// tryLock: 락 획득 시도 (대기 없이 즉시 반환)
public boolean tryIncrement() {
if (lock.tryLock()) { // 락 획득 성공 시 true
try {
count++;
return true;
} finally {
lock.unlock();
}
}
return false; // 락 획득 실패
}
// 조건 변수 활용 (wait/notify 대체)
private final Condition notEmpty = lock.newCondition();
private final java.util.Queue<Integer> queue = new java.util.LinkedList<>();
public void put(int item) throws InterruptedException {
lock.lock();
try {
queue.offer(item);
notEmpty.signalAll(); // 소비자 깨우기
} finally {
lock.unlock();
}
}
public int take() throws InterruptedException {
lock.lock();
try {
while (queue.isEmpty()) {
notEmpty.await(); // 대기 (락 반납)
}
return queue.poll();
} finally {
lock.unlock();
}
}
}
6. 쓰레드 안전 컬렉션
import java.util.concurrent.*;
import java.util.*;
public class ConcurrentCollectionsExample {
public static void main(String[] args) throws InterruptedException {
// ConcurrentHashMap: HashMap의 스레드 안전 버전
// synchronized Map과 달리 버킷 단위 락으로 성능 우수
ConcurrentHashMap<String, Integer> concurrentMap = new ConcurrentHashMap<>();
Thread[] writers = new Thread[5];
for (int i = 0; i < 5; i++) {
final int id = i;
writers[i] = new Thread(() -> {
for (int j = 0; j < 100; j++) {
concurrentMap.merge("key-" + (j % 10), 1, Integer::sum);
}
});
}
for (Thread t : writers) t.start();
for (Thread t : writers) t.join();
System.out.println("맵 크기: " + concurrentMap.size()); // 10
// CopyOnWriteArrayList: 읽기 빈번, 쓰기 적은 경우
// 쓰기 시 전체 배열을 복사 (쓰기 비용 높지만 읽기는 락 불필요)
CopyOnWriteArrayList<String> cowList = new CopyOnWriteArrayList<>();
cowList.add("항목1");
cowList.add("항목2");
// 이터레이션 중 다른 쓰레드가 수정해도 ConcurrentModificationException 없음
for (String item : cowList) {
System.out.println(item);
cowList.add("새항목"); // 복사본에 추가됨
}
// BlockingQueue: 생산자-소비자 패턴에 최적화
BlockingQueue<String> blockingQueue = new LinkedBlockingQueue<>(10);
// put: 꽉 차면 대기, take: 비어있으면 대기
blockingQueue.put("작업1");
String task = blockingQueue.take();
System.out.println("처리할 작업: " + task);
}
}
7. CountDownLatch, CyclicBarrier, Semaphore
import java.util.concurrent.*;
public class SynchronizerExamples {
// CountDownLatch: N개 작업 완료 후 다음 단계 진행
static void countDownLatchExample() throws InterruptedException {
CountDownLatch latch = new CountDownLatch(3); // 3개 카운트
for (int i = 1; i <= 3; i++) {
final int id = i;
new Thread(() -> {
try {
Thread.sleep(id * 500L);
System.out.println("서비스-" + id + " 준비 완료");
latch.countDown(); // 카운트 감소
} catch (InterruptedException e) {}
}).start();
}
System.out.println("모든 서비스 준비 대기 중...");
latch.await(); // 카운트가 0이 될 때까지 대기
System.out.println("모든 서비스 준비 완료! 서버 시작");
}
// CyclicBarrier: 모든 쓰레드가 특정 지점에 도달할 때까지 대기 (재사용 가능)
static void cyclicBarrierExample() throws InterruptedException {
CyclicBarrier barrier = new CyclicBarrier(3,
() -> System.out.println("=== 모든 팀 집결! 다음 단계 시작 ===")
);
for (int i = 1; i <= 3; i++) {
final int teamId = i;
new Thread(() -> {
try {
Thread.sleep(teamId * 300L);
System.out.println("팀-" + teamId + " 체크포인트 도달");
barrier.await(); // 다른 쓰레드들 기다림
System.out.println("팀-" + teamId + " 다음 단계 진행");
} catch (Exception e) {}
}).start();
}
}
// Semaphore: 동시 접근 수 제한 (DB 커넥션 풀, API Rate Limiting)
static void semaphoreExample() throws InterruptedException {
Semaphore semaphore = new Semaphore(3); // 동시에 3개만 허용
for (int i = 1; i <= 10; i++) {
final int userId = i;
new Thread(() -> {
try {
semaphore.acquire(); // 허가 획득 (3개 초과 시 대기)
System.out.println("사용자-" + userId + " DB 접속 중 (현재 접속: "
+ (3 - semaphore.availablePermits()) + "/3)");
Thread.sleep(1000);
System.out.println("사용자-" + userId + " DB 연결 해제");
} catch (InterruptedException e) {
} finally {
semaphore.release(); // 반드시 해제
}
}).start();
}
}
public static void main(String[] args) throws InterruptedException {
System.out.println("=== CountDownLatch 예제 ===");
countDownLatchExample();
Thread.sleep(3000);
System.out.println("\n=== CyclicBarrier 예제 ===");
cyclicBarrierExample();
Thread.sleep(3000);
System.out.println("\n=== Semaphore 예제 ===");
semaphoreExample();
}
}
8. 실전 예제: CompletableFuture로 여러 API 병렬 호출
쇼핑몰 마이페이지에서 여러 데이터를 병렬로 조회하는 시뮬레이션입니다.
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
// DTO 역할 클래스들
class UserInfo {
String name;
String email;
UserInfo(String name, String email) { this.name = name; this.email = email; }
}
class OrderSummary {
int totalOrders;
int totalAmount;
OrderSummary(int orders, int amount) { this.totalOrders = orders; this.totalAmount = amount; }
}
class PointBalance {
int points;
PointBalance(int points) { this.points = points; }
}
class MyPageData {
UserInfo user;
OrderSummary orders;
PointBalance points;
MyPageData(UserInfo user, OrderSummary orders, PointBalance points) {
this.user = user;
this.orders = orders;
this.points = points;
}
@Override
public String toString() {
return String.format(
"=== 마이페이지 ===%n" +
"이름: %s (%s)%n" +
"주문: %d건 / 총 %,d원%n" +
"포인트: %,d P",
user.name, user.email,
orders.totalOrders, orders.totalAmount,
points.points
);
}
}
public class ParallelApiCallExample {
// API 서비스 (실제로는 HTTP 호출, 여기서는 지연 시뮬레이션)
static UserInfo fetchUserInfo(long userId) throws InterruptedException {
Thread.sleep(800); // DB 조회 시뮬레이션
return new UserInfo("홍길동", "hong@example.com");
}
static OrderSummary fetchOrderSummary(long userId) throws InterruptedException {
Thread.sleep(1200); // 주문 집계 시뮬레이션
return new OrderSummary(15, 380_000);
}
static PointBalance fetchPointBalance(long userId) throws InterruptedException {
Thread.sleep(500); // 포인트 조회 시뮬레이션
return new PointBalance(12_500);
}
public static void main(String[] args) throws Exception {
long userId = 12345L;
ExecutorService executor = Executors.newFixedThreadPool(3);
System.out.println("마이페이지 데이터 로드 시작...");
long startTime = System.currentTimeMillis();
// 3개 API 병렬 호출
CompletableFuture<UserInfo> userCF = CompletableFuture
.supplyAsync(() -> {
try { return fetchUserInfo(userId); }
catch (InterruptedException e) { throw new RuntimeException(e); }
}, executor);
CompletableFuture<OrderSummary> orderCF = CompletableFuture
.supplyAsync(() -> {
try { return fetchOrderSummary(userId); }
catch (InterruptedException e) { throw new RuntimeException(e); }
}, executor);
CompletableFuture<PointBalance> pointCF = CompletableFuture
.supplyAsync(() -> {
try { return fetchPointBalance(userId); }
catch (InterruptedException e) { throw new RuntimeException(e); }
}, executor);
// 세 결과를 합쳐서 MyPageData 생성
CompletableFuture<MyPageData> myPageCF = userCF
.thenCombine(orderCF, (user, orders) -> new Object[]{user, orders})
.thenCombine(pointCF, (arr, points) ->
new MyPageData((UserInfo) arr[0], (OrderSummary) arr[1], points)
);
// 예외 처리
MyPageData result = myPageCF
.exceptionally(ex -> {
System.out.println("데이터 로드 실패: " + ex.getMessage());
return null;
})
.get();
long elapsed = System.currentTimeMillis() - startTime;
if (result != null) {
System.out.println(result);
System.out.printf("%n데이터 로드 완료 (소요: %dms)%n", elapsed);
// 순차 처리였다면: 800 + 1200 + 500 = 2500ms
// 병렬 처리: ~1200ms (가장 오래 걸린 작업 기준)
}
executor.shutdown();
}
}
팁
고수 팁 - 가상 쓰레드 (Virtual Threads, Java 21+)
전통적인 플랫폼 쓰레드는 무거워서 수만 개 생성이 어렵지만, 가상 쓰레드는 JVM이 관리하는 경량 쓰레드로 수백만 개를 동시에 생성해도 문제없습니다. I/O 대기가 많은 백엔드 서버에서 혁신적인 성능 향상을 가져옵니다.
// 가상 쓰레드 풀 사용
try (var executor = Executors.newVirtualThreadPerTaskExecutor()) {
for (int i = 0; i < 100_000; i++) {
executor.submit(() -> {
// I/O 대기 중 다른 가상 쓰레드가 CPU 사용
Thread.sleep(1000);
});
}
} // try-with-resources로 자동 shutdown