Skip to main content
Advertisement

14.5 Parallel Streams and Primitive Streams

1. Parallel Streams

parallelStream() processes stream operations in parallel using multiple CPU cores via the ForkJoinPool.

List<Integer> numbers = IntStream.rangeClosed(1, 1_000_000)
.boxed().collect(Collectors.toList());

// Sequential
long sum1 = numbers.stream().mapToLong(Integer::longValue).sum();

// Parallel
long sum2 = numbers.parallelStream().mapToLong(Integer::longValue).sum();

When to Use Parallel Streams

Good FitBad Fit
Very large data setsSmall data sets
CPU-intensive operationsSimple/fast operations
Independent operations (order doesn't matter)Order-sensitive operations
Splittable structures (ArrayList)Hard-to-split structures (LinkedList)

Warning: No Shared Mutable State

// ❌ Dangerous! Shared state modification
List<Integer> shared = new ArrayList<>();
IntStream.range(0, 1000).parallel().forEach(shared::add); // ConcurrentModificationException!

// ✅ Safe with collect
List<Integer> safe = IntStream.range(0, 1000).parallel()
.boxed().collect(Collectors.toList());

2. Primitive Streams

IntStream, LongStream, DoubleStream work directly with primitive types, avoiding boxing overhead.

IntStream

// Creation
IntStream.of(1, 2, 3, 4, 5);
IntStream.range(1, 6); // 1,2,3,4,5 (end exclusive)
IntStream.rangeClosed(1, 5); // 1,2,3,4,5 (end inclusive)
"Hello".chars(); // int codes of each character

// Statistics
int sum = IntStream.rangeClosed(1, 100).sum(); // 5050
double avg = IntStream.rangeClosed(1, 100).average().orElse(0); // 50.5
int max = IntStream.of(3, 1, 4, 1, 5, 9).max().orElse(0); // 9

IntSummaryStatistics stats = IntStream.rangeClosed(1, 100).summaryStatistics();
System.out.println(stats.getSum()); // 5050
System.out.println(stats.getAverage()); // 50.5

// To array
int[] arr = IntStream.rangeClosed(1, 5).toArray(); // [1, 2, 3, 4, 5]

// Box to Stream<Integer>
List<Integer> list = IntStream.rangeClosed(1, 5).boxed().collect(Collectors.toList());

LongStream and DoubleStream

// Large numbers with LongStream
long factorial = LongStream.rangeClosed(1, 20).reduce(1L, (a, b) -> a * b);

// Double statistics
double[] prices = {100.0, 200.0, 300.0, 150.0};
DoubleSummaryStatistics priceStats = DoubleStream.of(prices).summaryStatistics();
System.out.printf("Average: %.2f%n", priceStats.getAverage());

mapToInt, mapToLong, mapToDouble

record Product(String name, int price, int quantity) {}

List<Product> products = List.of(
new Product("Apple", 1000, 50),
new Product("Banana", 800, 30),
new Product("Berry", 2000, 20)
);

int totalValue = products.stream()
.mapToInt(p -> p.price() * p.quantity())
.sum();
System.out.println("Total inventory value: " + totalValue);

OptionalDouble avgPrice = products.stream()
.mapToDouble(Product::price)
.average();

3. Advanced Stream Creation

// Stream.generate - infinite stream (always use limit!)
Stream<Double> randoms = Stream.generate(Math::random).limit(5);
Stream<String> hellos = Stream.generate(() -> "hello").limit(3);

// Stream.iterate - initial value + function
Stream<Integer> evens = Stream.iterate(0, n -> n + 2).limit(10); // 0,2,4,...,18

// With termination condition (Java 9+)
Stream<Integer> under100 = Stream.iterate(1, n -> n < 100, n -> n * 2); // 1,2,4,...,64

// Stream.concat
Stream<String> combined = Stream.concat(Stream.of("A","B"), Stream.of("C","D"));

// Stream.builder
Stream.Builder<String> builder = Stream.builder();
builder.add("A"); builder.add("B");
Stream<String> built = builder.build();

Pro Tip

Parallel stream real-world guide:

  1. Measure first: Parallel streams aren't always faster. For small data or simple operations, they're often slower due to overhead. Benchmark with System.nanoTime() before applying.

  2. ForkJoinPool size: By default uses availableProcessors - 1 threads. In a Spring web server, excessive parallelStream() can exhaust the shared thread pool.

  3. Prefer primitive streams: Replace Stream<Integer> with IntStream where possible. The difference in numerical aggregation is significant.

// Slow: boxing overhead
long sum1 = Stream.iterate(1, n -> n + 1).limit(1_000_000)
.mapToLong(Integer::longValue).sum();

// Fast: primitive stream from the start
long sum2 = LongStream.rangeClosed(1, 1_000_000).sum();
Advertisement