Java's Stream API, introduced back in Java 8, fundamentally changed how you process collections. By enabling expressive, functional-style operations like map(), filter(), and reduce(), it helped you write more concise and readable code. However, you may have encountered situations where the fixed set of built-in operations just wasn't enough for complex data transformations.
Java 24 introduces a game-changing feature to address this: Stream Gatherers. This addition allows you to define your own custom intermediate operations, unlocking new possibilities for your stream pipelines.
What are Stream Gatherers?
Think of a standard stream pipeline as an assembly line with fixed stations: one for filtering, one for mapping, and so on. A gatherer is like a custom, programmable station you can insert anywhere in the middle of that line.
Unlike a Collector, which is a terminal operation that concludes the stream, a Gatherer is an intermediate operation. This means it can transform the stream's elements mid-flow before passing them downstream to the next operation. This distinction is crucial.
With Gatherers, you can implement sophisticated, stateful operations. They are flexible, allowing for different kinds of data transformations:
one-to-one: Transforming each element individually, similar to the familiar
map()operation.many-to-one: Reducing many elements into a single result, like summing a list of numbers.
one-to-many: Splitting a single element into several new ones, like breaking a sentence into words.
many-to-many: Processing multiple input elements to produce multiple output elements, where the relationship isn't one-to-one. For example, consuming 6 elements to produce 2 lists.
For instance, you can:
Group elements into fixed-size chunks.
Create sliding or overlapping windows of data.
Calculate running totals or other cumulative values.
Even stop (short-circuit) the stream early based on a specific condition.
This flexibility makes it much easier to tackle complex data processing tasks that were previously cumbersome or inefficient with the standard Stream API.
Built-in gatherers
Java 24 provides a utility class, java.util.stream.Gatherers, packed with several pre-built, common gatherers. Let's explore some of them.
windowFixed(int windowSize): ThewindowFixed()gatherer is a many-to-many operation that batches stream elements into non-overlapping lists of a fixed size. This is useful when you need to process data in batches (chunks).It takes one parameter:
windowSize: Anintthat specifies the size of each batch.
Imagine you have a stream of log entries and you want to process them in batches of three:
import java.util.List; import java.util.stream.Gatherers; import java.time.LocalDateTime; public class FixedWindowExample { public static void main(String[] args) { List<LogEntry> logs = List.of( new LogEntry(LocalDateTime.now(), "INFO", "User logged in"), new LogEntry(LocalDateTime.now(), "INFO", "Viewed dashboard"), new LogEntry(LocalDateTime.now(), "WARN", "API response slow"), new LogEntry(LocalDateTime.now(), "INFO", "Updated profile"), new LogEntry(LocalDateTime.now(), "ERROR", "Database connection failed"), new LogEntry(LocalDateTime.now(), "INFO", "User logged out") ); System.out.println("Processing logs in fixed batches of 3:"); logs.stream() .gather(Gatherers.windowFixed(3)) .forEach(batch -> { System.out.println("--- New Batch ---"); batch.forEach(log -> System.out.println(" " + log.message())); }); } private record LogEntry(LocalDateTime timestamp, String level, String message) {} }This code takes a list of logs and uses
windowFixed(3)to group them into lists, each containing threeLogEntryobjects.This will output:
Processing logs in fixed batches of 3: --- New Batch --- User logged in Viewed dashboard API response slow --- New Batch --- Updated profile Database connection failed User logged outwindowSliding(int windowSize): ThewindowSliding()gatherer is a many-to-many operation that creates overlapping windows of elements of a fixed size. Meaning, this gatherer takes elements from your stream and groups them into sliding, overlapping lists. Each list has exactlywindowSizeelements.Crucially, this gatherer does not emit partial windows at the start of the stream; it waits until it has enough elements to form the first full window. Meaning, it won’t give you any results until it has collected enough elements to fill the first full window. After that, each new element slides the window forward, dropping the oldest element and adding the newest. For example,
windowSliding(3)will not emit its first list until it has processed three elements.This is useful for tasks like calculating an average over a sliding window or identifying patterns in complete, sequential blocks of data.
It takes one parameter:
windowSize: Anintspecifying the maximum size of the sliding window.
Let's say you have a stream of server response times. And you want to first group them into 3-element sliding windows, and then calculate the average of each of those windows:
import java.util.List; import java.util.stream.Gatherers; public class SlidingWindowExample { public static void main(String[] args) { List<Integer> responseTimes = List.of(120, 135, 131, 145, 150, 142, 130); // --- Step 1: Create the windows and show the intermediate result --- System.out.println("Step 1: The windows created by Gatherers.windowSliding(3)"); List<List<Integer>> windows = responseTimes.stream() .gather(Gatherers.windowSliding(3)) .toList(); windows.forEach(System.out::println); // --- Step 2: Process the list of windows to get the averages --- System.out.println("\nStep 2: The calculated average of each window"); windows.stream() .map(window -> window.stream().mapToInt(Integer::intValue).average().orElse(0.0)) .forEach(avg -> System.out.printf("Average: %.2f ms%n", avg)); } }Here,
windowSliding(3)starts emitting lists only after the third element (131) is processed, creating the first window[120, 135, 131].This will output:
Step 1: The windows created by Gatherers.windowSliding(3) [120, 135, 131] [135, 131, 145] [131, 145, 150] [145, 150, 142] [150, 142, 130] Step 2: The calculated average of each window Average: 128.67 ms Average: 137.00 ms Average: 142.00 ms Average: 145.67 ms Average: 140.67 msNote: The output stream of averages is shorter than the input stream of response times. This is because
windowSliding()only produces an output once the first full window is available. To calculate a "true moving average" that has a value for every input element (including the partial windows at the start), you would need to implement a custom gatherer.scan(Supplier initial, BiFunction scanner): Thescan()gatherer is a one-to-one operation that performs a cumulative operation, producing a stream of intermediate results. This is also known as a prefix scan. It's useful when you want to see how an aggregation builds up over time.It takes two parameters:
initial: ASupplierthat provides the starting value for the accumulation.scanner: ABiFunctionthat takes the current accumulated result and the next stream element, and returns the new accumulated result.
Let's use
scan()to create a running summary of log messages:import java.util.stream.Stream; import java.util.stream.Gatherers; public class ScanExample { public static void main(String[] args) { Stream<String> messages = Stream.of("Login", "View", "Update", "Logout"); System.out.println("Running user action summary:"); messages.gather(Gatherers.scan( () -> "Actions: ", // Initial value (summary, action) -> summary + action + " -> " // Accumulator )) .forEach(System.out::println); } }What the code does:
You have a stream of strings:
"Login","View","Update","Logout".You use
Gatherers.scan()to build up a running summary of these actions as you go through the stream.After each new action, it shows the “current progress” of the summary.
scan(Supplier initial, BiFunction scanner)works like this:Initial value:
() -> "Actions: "Before anything starts, the summary string starts as
"Actions: ".
Accumulator function:
(summary, action) -> summary + action + " -> "Each time a new action comes from the stream, it appends it to the summary with
" -> "at the end.Example: if summary =
"Actions: Login -> ", and the next action is"View", then new summary ="Actions: Login -> View -> ".
This will output:
Running user action summary: Actions: Login -> Actions: Login -> View -> Actions: Login -> View -> Update -> Actions: Login -> View -> Update -> Logout ->fold(Supplier initial, BiFunction folder): Thefold()gatherer is a many-to-one operation that reduces the entire stream to a single, final result. Unlikescan()gatherer, it only outputs the final value after all stream elements have been processed.It is conceptually similar to the
reduce()terminal operation, but it functions as an intermediate operation. This means it produces a single-element stream that can be passed on to other downstream operations.It takes two parameters:
initial: ASupplierthat provides the initial value for the aggregation.folder: ABiFunctionthat combines the current aggregated result with the next stream element.
Now, let's use
fold()to join several words from a stream into a single final string:import java.util.stream.Stream; import java.util.stream.Gatherers; public class FoldExample { public static void main(String[] args) { Stream.of("Java", "Stream", "Gatherers") .gather(Gatherers.fold( StringBuilder::new, (sb, word) -> sb.append(word).append(" ") )) .forEach(System.out::println); } }Notice how the
fold()operation processes all the words from the stream but only emits a single, final result at the end.This will output:
Java Stream GatherersmapConcurrent(int maxConcurrency, Function mapper): ThemapConcurrent()gatherer is a one-to-one operation that lets you apply a function to stream elements in parallel, potentially speeding up processing for slow operations (like I/O-bound or computationally intensive tasks). In this method, you specify the maximum level of concurrency.It takes two parameters:
maxConcurrency: Anintthat sets the maximum number of tasks to run in parallel.mapper: AFunctionto apply to each stream element.
Imagine you need to enrich each log entry by calling a slow external service.
mapConcurrent()can help you do this much faster.import java.util.List; import java.util.stream.Gatherers; import java.time.LocalDateTime; public class MapConcurrentExample { // Simulate a slow network call private static String enrichLog(LogEntry log) { try { Thread.sleep(500); // Simulate 500ms delay } catch (InterruptedException e) { Thread.currentThread().interrupt(); } return "Enriched: " + log.message(); } public static void main(String[] args) { List<LogEntry> logs = List.of( new LogEntry(LocalDateTime.now(), "INFO", "User logged in"), new LogEntry(LocalDateTime.now(), "INFO", "Viewed dashboard"), new LogEntry(LocalDateTime.now(), "WARN", "API response slow"), new LogEntry(LocalDateTime.now(), "INFO", "Updated profile"), new LogEntry(LocalDateTime.now(), "ERROR", "Database connection failed"), new LogEntry(LocalDateTime.now(), "INFO", "User logged out") ); long startTime = System.currentTimeMillis(); System.out.println("Enriching logs concurrently (max 4 parallel tasks)..."); logs.stream() .gather(Gatherers.mapConcurrent(4, MapConcurrentExample::enrichLog)) .forEach(System.out::println); long duration = System.currentTimeMillis() - startTime; System.out.println("Total time: " + duration + " ms"); } private record LogEntry(LocalDateTime timestamp, String level, String message) {} }By setting the concurrency to 4, you can process up to four
enrichLog()calls simultaneously, significantly reducing the total execution time compared to a sequentialmap()operation.
Implementing a custom gatherer
While the built-in gatherers are useful, the true potential of this feature lies in creating your own. Implementing a custom Gatherer lets you define highly specific logic for your stream pipelines.
To build a custom gatherer, you first need to understand the four fundamental components that define the behavior of any gatherer. Think of them as the building blocks or instructions for a specialized worker on an assembly line (stream). When you create a custom gatherer, you define the worker's entire workflow by providing your own versions of these four key instructions (functions):
initializer()(Optional): This is the setup phase. It creates and returns the initial state object that the gatherer will use. In our assembly line analogy, this is like the worker grabbing an "empty box" before starting their work. This "box" can be anything the operation needs: an emptyListorMap, a simple counter, or even a custom object.integrator()(Required): This is the main processing loop. For each item that comes down the line, this instruction tells the worker what to do: update the state (the "box") and decide whether to send a result downstream. This is unlike a collector, which only produces a single final result, the integrator can emit intermediate results at any time. For example, after each element or after a group of elements. This flexibility is what allows for many-to-many behavior, such as running totals, sliding windows, or chunked outputs.combiner()(Optional): This handles teamwork for parallel streams. If two workers are processing items at the same time, this instruction tells a supervisor how to merge their partially filled boxes into a single, consistent one.finisher()(Optional): This is the cleanup step. When the last item has passed, this instruction tells the worker to send their final, partially filled box downstream so no work is left behind.
In our analogy, the worker's private "workspace" or "box" is technically called the state object. This is where the gatherer keeps track of information between elements.
You can implement the Gatherer interface to define custom logic tailored to your specific needs by supplying up to four functions that determine its behavior. The Gatherer<T, A, R> interface uses three generic types to define this entire operation:
T(Input Type): The type of items coming down the assembly line (e.g.,LogEntry).A(State Type): The type of the worker's box or workspace.R(Result Type): The type of the finished boxes being sent downstream (e.g., aList<LogEntry>).
Now, let's build a custom gatherer to solve a classic problem that's notoriously tricky with the traditional Stream API: grouping consecutive elements in a stream that share the same property. For example, bundling consecutive INFO logs together, then consecutive WARN logs, and so on.
Let's call our gatherer groupConsecutiveBy(). This logic is inherently sequential because it must compare each element to the one immediately before it.
Instead of implementing the Gatherer interface directly to create a custom gatherer, it's often more convenient to use one of the factory methods in the Gatherer interface. The Gatherer interface provides two main factories for this:
Gatherer.of(): For creating gatherers that are potentially parallelizable.Gatherer.ofSequential(): For creating sequential-only gatherers.
Because our groupConsecutiveBy operation is sequential (meaning they depend on the order of elements and cannot be parallelized), Gatherer.ofSequential() is the appropriate tool for the job:
import java.util.ArrayList;
import java.util.List;
import java.util.function.Function;
import java.util.stream.Gatherer;
import java.util.stream.Stream;
import java.time.LocalDateTime;
public class CustomGathererExample {
public static <T, K> Gatherer<T, ?, List<T>> groupConsecutiveBy(Function<? super T, ? extends K> classifier) {
// Private class to hold the gatherer's state
class State {
List<T> currentGroup = null;
K currentKey = null;
}
// Using `Gatherer.ofSequential()` for a gatherer that is not parallelizable
return Gatherer.ofSequential(
State::new, // Initializer: creates a new State object
(state, element, downstream) -> { // Integrator: processes each element
K key = classifier.apply(element);
if (state.currentGroup == null) {
state.currentGroup = new ArrayList<>();
state.currentKey = key;
}
if (!state.currentKey.equals(key)) {
downstream.push(List.copyOf(state.currentGroup)); // Push the completed group
state.currentGroup.clear(); // Start a new group
state.currentKey = key;
}
state.currentGroup.add(element);
return true; // Continue processing
},
(state, downstream) -> { // Finisher: handles the very last group
if (state.currentGroup != null && !state.currentGroup.isEmpty()) {
downstream.push(List.copyOf(state.currentGroup)); // Push the final group
}
}
);
}
public static void main(String[] args) {
Stream<LogEntry> logs = Stream.of(
new LogEntry(LocalDateTime.now(), "INFO", "User logged in"),
new LogEntry(LocalDateTime.now(), "INFO", "Viewed dashboard"),
new LogEntry(LocalDateTime.now(), "WARN", "API response slow"),
new LogEntry(LocalDateTime.now(), "WARN", "Deprecated method used"),
new LogEntry(LocalDateTime.now(), "WARN", "High memory usage"),
new LogEntry(LocalDateTime.now(), "INFO", "User logged out"),
new LogEntry(LocalDateTime.now(), "ERROR", "DB connection failed")
);
System.out.println("Grouping consecutive logs by level:");
logs.gather(groupConsecutiveBy(LogEntry::level))
.forEach(System.out::println);
}
private record LogEntry(LocalDateTime timestamp, String level, String message) {}
}The integrator checks if the current log's level is different from the previous one. If it is, it pushes the completed group downstream and starts a new one. The finisher ensures that the very last group is also pushed.
This will output:
Grouping consecutive logs by level:
[LogEntry[..., level=INFO, message=User logged in], LogEntry[..., level=INFO, message=Viewed dashboard]]
[LogEntry[..., level=WARN, message=API response slow], LogEntry[..., level=WARN, message=Deprecated method used], LogEntry[..., level=WARN, message=High memory usage]]
[LogEntry[..., level=INFO, message=User logged out]]
[LogEntry[..., level=ERROR, message=DB connection failed]]Composing gatherers
You can compose two or more gatherers into a single one to perform multi-stage transformations. Think of it like LEGO bricks. Instead of building one giant, complex custom gatherer, you can snap two smaller, standard gatherers together to create a new one with combined functionality.
This is done using the Gatherer.andThen(Gatherer anotherGatherer) method. The key rule to remember is that the following two statements are equivalent:
// Calling gather() sequentially
stream.gather(gathererA).gather(gathererB);
// Composing gatherers with andThen()
stream.gather(gathererA.andThen(gathererB));Using andThen() is a way to fuse a sequence of gathering operations into a single, reusable Gatherer object.
Let's create a composed gatherer that performs two steps:
Windowing: First, it groups log entries into batches of 3.
Summarizing: Second, it takes the stream of batches and creates a single summary string.
The example below first applies the windowingGatherer() and then immediately applies the summarizingGatherer() to its output, all within a single gather() operation. It also shows the equivalent sequential gather() calls to prove they produce the same result.
import java.util.List;
import java.util.stream.Gatherer;
import java.util.stream.Gatherers;
import java.util.stream.Stream;
import java.time.LocalDateTime;
public class ComposingGatherersExample {
public static void main(String[] args) {
List<LogEntry> logs = List.of(
new LogEntry(LocalDateTime.now(), "INFO", "User logged in"),
new LogEntry(LocalDateTime.now(), "INFO", "Viewed dashboard"),
new LogEntry(LocalDateTime.now(), "WARN", "API response slow"),
new LogEntry(LocalDateTime.now(), "INFO", "Updated profile"),
new LogEntry(LocalDateTime.now(), "ERROR", "DB connection failed"),
new LogEntry(LocalDateTime.now(), "INFO", "User logged out"),
new LogEntry(LocalDateTime.now(), "INFO", "Final entry")
);
// Gatherer 1: Groups logs into windows of 3
Gatherer<LogEntry, ?, List<LogEntry>> windowingGatherer =
Gatherers.windowFixed(3);
// Gatherer 2: Takes a stream of lists and creates a summary string from their sizes
Gatherer<List<LogEntry>, ?, String> summarizingGatherer =
Gatherers.fold(() -> "Batch sizes: ",
(summary, window) -> summary + window.size() + ";");
// Use the composed gatherer
String summary = logs.stream()
.gather(windowingGatherer.andThen(summarizingGatherer))
.findFirst()
.orElse("");
System.out.println("Summary from composed gatherer:");
System.out.println(summary);
// The result is identical to calling gather() sequentially
String sequentialSummary = logs.stream()
.gather(windowingGatherer) // First gather produces Stream<List<LogEntry>>
.gather(summarizingGatherer) // Second gather consumes that stream
.findFirst()
.orElse("");
System.out.println("\nSummary from sequential gather calls:");
System.out.println(sequentialSummary);
}
private record LogEntry(LocalDateTime timestamp, String level, String message) {}
}This will output:
Summary from composed gatherer:
Batch sizes: 3; 3; 1;
Summary from sequential gather calls:
Batch sizes: 3; 3; 1; As you can see, andThen() provides a clean way to encapsulate a multi-stage gathering process into a single unit of logic.
Conclusion
Stream Gatherers are a significant enhancement to the Java Stream API, providing a standardized, clean way to implement complex, stateful intermediate operations that were previously difficult or impossible to express within a single pipeline.
Below is an overview of their key characteristics:
Intermediate operations: Gatherers operate mid-stream to transform elements before passing them downstream, unlike
Collectors, which are terminal.Stateful processing: They can maintain state across elements, enabling complex logic like
windowFixed(),windowSliding(), and grouping consecutive items.Fully customizable: You can implement the
Gathererinterface to define custom logic tailored to your specific needs, solving problems not covered by standard operations.Built-in implementations: Java 24 provides the
java.util.stream.Gatherersutility class with ready-to-use solutions for common patterns.Concurrency support: You can also design gatherers for parallel streams, to boost performance. You can do that by either implementing a
combiner()to make your custom gatherers parallel-ready, or by leveraging built-in concurrent operations likemapConcurrent()for common concurrent tasks.
Because of these properties, Stream Gatherers are the ideal tool when you face stateful stream processing challenges that are awkward or inefficient to solve with standard operations like map() and filter(). Before Gatherers, these problems often required complex custom collectors or breaking the stream into multiple, less-readable steps. Gatherers provide a much cleaner, more integrated solution within the stream pipeline itself.
For a deeper dive, check out the official JEP 485 and the Java 24 documentation. Happy gathering!