In this chapter, you’ll learn how to adopt the Streams API. First, you’ll gain an understanding behind the motivation for the Streams API, and then you’ll learn exactly what a stream is and what it’s used for. Next, you’ll learn about various operations and data processing patterns using the Streams API, and about Collectors, which let you write more sophisticated queries. You’ll then look at a practical refactoring example. Finally, you’ll learn about parallel streams.
The Collections API is one of the most important parts of the Java API. Nearly every Java application creates and processes collections. But despite its importance, the processing of collections in Java is still unsatisfactory in many aspects.
First, many alternative programming languages and libraries let you express typical data processing patterns in a declarative way. Think of SQL, where you can select from a table, filter values given a condition, and also group elements in some form. There’s no need to detail how to implement the query—the database figures it out for you. The benefit is that your code is easier to understand. Unfortunately, in Java you don’t get this. You have to implement the low-level details of a data processing query using control flow constructs.
Second, how can you process really large collections efficiently? Ideally, to speed up the processing, you want to leverage multicore architectures. However, writing parallel code is hard and error-prone.
The Streams API addresses both of these issues. It introduces a new abstraction called Stream that lets you process data in a declarative way. Furthermore, streams can leverage multicore architectures without you having to deal with low-level constructs such as threads, locks, condition variables, and volatile variables.
For example, say you need to filter a list of invoices to find those related to a specific customer, sort them by amount, and then extract their IDs. Using the Streams API, you can express this simply with the following query:
List<Integer> ids
    = invoices.stream()
              .filter(inv -> inv.getCustomer() == Customer.ORACLE)
              .sorted(comparingDouble(Invoice::getAmount))
              .map(Invoice::getId)
              .collect(Collectors.toList());
You’ll see how this code works in more detail later in this chapter.
So what is a stream? Informally, you can think of it as a “fancy iterator” that supports database-like operations. Technically, it’s a sequence of elements from a source that supports aggregate operations. Here’s a breakdown of the more formal definition:
A stream provides an interface to a sequenced set of values of a specific element type. However, streams don’t actually store elements; they’re computed on demand.
Streams consume from a data-providing source such as collections, arrays, or I/O resources.
Streams support database-like operations and common operations from functional programming languages, such as filter, map, reduce, findFirst, allMatch, sorted, and so on.
Furthermore, stream operations have two additional fundamental characteristics that differentiate them from collections:
Many stream operations return a stream themselves. This allows operations to be chained to form a larger pipeline. This style enables certain optimizations such as laziness, short-circuiting, and loop fusion.
In contrast to collections, which are iterated explicitly (external iteration), stream operations do the iteration behind the scenes for you.
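To make the contrast concrete, here is a minimal self-contained sketch (using a plain list of numbers rather than the chapter’s Invoice class) showing the same transformation written with external iteration and with internal iteration:

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;

public class IterationDemo {
    public static void main(String[] args) {
        List<Integer> numbers = Arrays.asList(3, 1, 4, 1, 5);

        // External iteration: you write and control the loop yourself.
        List<Integer> doubledExternal = new ArrayList<>();
        for (int n : numbers) {
            doubledExternal.add(n * 2);
        }

        // Internal iteration: the stream walks the source for you.
        List<Integer> doubledInternal = numbers.stream()
                                               .map(n -> n * 2)
                                               .collect(Collectors.toList());

        System.out.println(doubledExternal); // [6, 2, 8, 2, 10]
        System.out.println(doubledInternal); // [6, 2, 8, 2, 10]
    }
}
```

Both produce the same list; the difference is who drives the iteration.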
The Stream interface (java.util.stream.Stream) defines many operations, which can be grouped into two categories:
Operations such as filter, sorted, and map, which can be connected together to form a pipeline

Operations such as collect, findFirst, and allMatch, which terminate the pipeline and return a result
Stream operations that can be connected are called intermediate operations. They can be connected together because their return type is a Stream. Intermediate operations are “lazy” and can often be optimized. Operations that terminate a stream pipeline are called terminal operations. They produce a result from a pipeline, such as a List, an Integer, or even void (i.e., any nonstream type).
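Laziness is easy to observe with a side-effecting predicate. The sketch below (a self-contained example with a counter, not part of the chapter’s invoice code) shows that merely building a pipeline evaluates nothing; only the terminal operation does:

```java
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;
import java.util.stream.Stream;

public class LazinessDemo {
    public static void main(String[] args) {
        AtomicInteger predicateCalls = new AtomicInteger();

        // Building the pipeline runs no user code: filter is a lazy,
        // intermediate operation.
        Stream<Integer> pipeline = Arrays.asList(1, 2, 3, 4, 5).stream()
                .filter(n -> {
                    predicateCalls.incrementAndGet();
                    return n % 2 == 0;
                });
        System.out.println(predicateCalls.get()); // 0: nothing evaluated yet

        // The terminal operation triggers the whole pipeline.
        List<Integer> evens = pipeline.collect(Collectors.toList());
        System.out.println(evens);                // [2, 4]
        System.out.println(predicateCalls.get()); // 5: predicate ran once per element
    }
}
```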
Let’s take a tour of some of the operations available on streams. Refer to the java.util.stream.Stream interface for the complete list.
There are several operations that can be used to filter elements from a stream:

filter: Takes a Predicate object as an argument and returns a stream including all elements that match the predicate.

distinct: Returns a stream with unique elements (according to the implementation of equals for a stream element).

limit: Returns a stream that is no longer than a given maximum size.

skip: Returns a stream with the first n elements discarded.

For example, the following code selects at most five invoices whose amount is greater than 10,000:
List<Invoice> expensiveInvoices
    = invoices.stream()
              .filter(inv -> inv.getAmount() > 10_000)
              .limit(5)
              .collect(Collectors.toList());
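The remaining filtering operations, distinct and skip, can be sketched with a self-contained list of numbers (the Invoice class isn’t needed here):

```java
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;

public class DistinctSkipDemo {
    public static void main(String[] args) {
        List<Integer> numbers = Arrays.asList(1, 2, 1, 3, 3, 2, 4);

        List<Integer> result = numbers.stream()
                                      .distinct() // keeps 1, 2, 3, 4 (first occurrences)
                                      .skip(1)    // discards the first remaining element
                                      .collect(Collectors.toList());

        System.out.println(result); // [2, 3, 4]
    }
}
```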
A common data processing pattern is determining whether some elements match a given property. You can use the anyMatch, allMatch, and noneMatch operations to help you do this. They all take a predicate as an argument and return a boolean as the result. For example, you can use allMatch to check that all elements in a stream of invoices have an amount higher than 1,000:
boolean expensive
    = invoices.stream()
              .allMatch(inv -> inv.getAmount() > 1_000);
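anyMatch and noneMatch follow the same shape. Here is a small self-contained sketch with a list of plain doubles standing in for invoice amounts:

```java
import java.util.Arrays;
import java.util.List;

public class MatchDemo {
    public static void main(String[] args) {
        List<Double> amounts = Arrays.asList(500.0, 2_500.0, 9_000.0);

        boolean anyLarge = amounts.stream().anyMatch(a -> a > 5_000);   // at least one matches
        boolean noneHuge = amounts.stream().noneMatch(a -> a > 50_000); // no element matches
        boolean allCheap = amounts.stream().allMatch(a -> a < 1_000);   // fails on 2,500 and 9,000

        System.out.println(anyLarge); // true
        System.out.println(noneHuge); // true
        System.out.println(allCheap); // false
    }
}
```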
In addition, the Stream interface provides the operations findFirst and findAny for retrieving arbitrary elements from a stream. They can be used in conjunction with other stream operations such as filter. Both findFirst and findAny return an Optional object (which we discussed in Chapter 1):
Optional<Invoice> invoice
    = invoices.stream()
              .filter(inv -> inv.getCustomer() == Customer.ORACLE)
              .findAny();
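Because the result is an Optional, the caller must decide what happens when no element matches. Here is a minimal sketch using strings (hypothetical values, not the invoice domain):

```java
import java.util.Optional;
import java.util.stream.Stream;

public class FindDemo {
    public static void main(String[] args) {
        Optional<String> match = Stream.of("alpha", "beta", "gamma")
                                       .filter(s -> s.startsWith("b"))
                                       .findFirst();

        // Supply a default instead of calling get() and risking an exception.
        System.out.println(match.orElse("none")); // beta

        Optional<String> noMatch = Stream.of("alpha", "gamma")
                                         .filter(s -> s.startsWith("b"))
                                         .findFirst();
        System.out.println(noMatch.orElse("none")); // none
    }
}
```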
Streams support the method map, which takes a Function object as an argument to turn the elements of a stream into another type. The function is applied to each element, “mapping” it into a new element.
For example, you might want to use it to extract information from each element of a stream. This code returns a list of the IDs from a list of invoices:
List<Integer> ids
    = invoices.stream()
              .map(Invoice::getId)
              .collect(Collectors.toList());
Another common pattern is combining elements from a source to produce a single value. For example, “calculate the invoice with the highest amount” or “calculate the sum of all invoices’ amounts.” This is possible using the reduce operation on streams, which repeatedly applies an operation to each element until a result is produced.
As an example of the reduce pattern, it helps to first look at how you could calculate the sum of a list using a for loop:
int sum = 0;
for (int x : numbers) {
    sum += x;
}
Each element of the list of numbers is combined iteratively using the addition operator to produce a result, essentially reducing the list of numbers into one number. There are two parameters in this code: the initial value of the sum variable (in this case 0) and the operation for combining all the elements of the list (in this case addition).
Using the reduce method on streams, you can sum all the elements of a stream as shown here:
int sum = numbers.stream().reduce(0, (a, b) -> a + b);
The reduce method takes two arguments:

An initial value; here, 0.

A BinaryOperator<T> to combine two elements and produce a new value.

The reduce method essentially abstracts the pattern of repeated application. Other queries, such as “calculate the product” or “calculate the maximum,” become special cases of the reduce method:
int product = numbers.stream().reduce(1, (a, b) -> a * b);
int max = numbers.stream().reduce(Integer.MIN_VALUE, Integer::max);
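A related overload of reduce takes no initial value and returns an Optional, because the stream might be empty. A self-contained sketch:

```java
import java.util.Arrays;
import java.util.Optional;
import java.util.stream.Stream;

public class ReduceOptionalDemo {
    public static void main(String[] args) {
        // With no identity value there is nothing to return for an empty
        // stream, so this overload wraps the result in an Optional.
        Optional<Integer> max = Arrays.asList(4, 7, 2).stream()
                                      .reduce(Integer::max);
        System.out.println(max.orElse(Integer.MIN_VALUE)); // 7

        Optional<Integer> empty = Stream.<Integer>empty().reduce(Integer::max);
        System.out.println(empty.isPresent()); // false
    }
}
```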
The operations you have seen so far either return another stream (i.e., intermediate operations) or return a value, such as a boolean, an int, or an Optional object (i.e., terminal operations). The collect method is another terminal operation. It lets you accumulate the elements of a stream into a summary result.
The argument passed to collect is an object of type java.util.stream.Collector. A Collector object essentially describes a recipe for accumulating the elements of a stream into a final result. The factory method Collectors.toList() used earlier returns a Collector object describing how to accumulate a stream into a List. However, there are many similar built-in collectors available, which you can see in the class Collectors. For example, you can group invoices by customer using Collectors.groupingBy, as shown here:
Map<Customer, List<Invoice>> customerToInvoices
    = invoices.stream().collect(Collectors.groupingBy(Invoice::getCustomer));
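Collectors.groupingBy also accepts a second, “downstream” collector that summarizes each group. Since the Invoice class isn’t available in a standalone example, this sketch groups words by their length and counts them:

```java
import java.util.Map;
import java.util.stream.Collectors;
import java.util.stream.Stream;

public class GroupingDemo {
    public static void main(String[] args) {
        Map<Integer, Long> countByLength = Stream.of("map", "java", "list", "stream")
                .collect(Collectors.groupingBy(String::length, Collectors.counting()));

        System.out.println(countByLength.get(3)); // 1  ("map")
        System.out.println(countByLength.get(4)); // 2  ("java", "list")
        System.out.println(countByLength.get(6)); // 1  ("stream")
    }
}
```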
Here’s a step-by-step example so you can practice refactoring old-style Java code to use the Streams API. The following code filters invoices that are from a specific customer and related to training, sorts the resulting invoices by amount, and finally extracts the first five IDs:
List<Invoice> oracleAndTrainingInvoices = new ArrayList<>();
List<Integer> ids = new ArrayList<>();
List<Integer> firstFiveIds = new ArrayList<>();

for (Invoice inv : invoices) {
    if (inv.getCustomer() == Customer.ORACLE) {
        if (inv.getTitle().contains("Training")) {
            oracleAndTrainingInvoices.add(inv);
        }
    }
}

Collections.sort(oracleAndTrainingInvoices, new Comparator<Invoice>() {
    @Override
    public int compare(Invoice inv1, Invoice inv2) {
        return Double.compare(inv1.getAmount(), inv2.getAmount());
    }
});

for (Invoice inv : oracleAndTrainingInvoices) {
    ids.add(inv.getId());
}

for (int i = 0; i < 5; i++) {
    firstFiveIds.add(ids.get(i));
}
Now you’ll refactor this code step-by-step using the Streams API.
First, you may notice that the code uses an intermediate container to store invoices that have the customer Customer.ORACLE and "Training" in the title. This is a use case for the filter operation:
Stream<Invoice> oracleAndTrainingInvoices
    = invoices.stream()
              .filter(inv -> inv.getCustomer() == Customer.ORACLE)
              .filter(inv -> inv.getTitle().contains("Training"));
Next, you need to sort the invoices by their amount. You can use the utility method Comparator.comparingDouble together with the method sorted, as shown in the previous chapter:
Stream<Invoice> sortedInvoices
    = oracleAndTrainingInvoices.sorted(comparingDouble(Invoice::getAmount));
Next, you need to extract the IDs. This is a pattern for the map operation:
Stream<Integer> ids = sortedInvoices.map(Invoice::getId);
Finally, you’re only interested in the first five invoices. You can use the operation limit to stop after those five. Once you tidy up the code and use the collect operation, the final code is as follows:
List<Integer> firstFiveIds
    = invoices.stream()
              .filter(inv -> inv.getCustomer() == Customer.ORACLE)
              .filter(inv -> inv.getTitle().contains("Training"))
              .sorted(comparingDouble(Invoice::getAmount))
              .map(Invoice::getId)
              .limit(5)
              .collect(Collectors.toList());
You can observe that in the old-style Java code, each stage stored its result in a local variable that was written once and then read once by the next stage. Using the Streams API, these throwaway local variables are eliminated.
The Streams API supports easy data parallelism. In other words, you can explicitly ask for a stream pipeline to be performed in parallel without thinking about low-level implementation details. Behind the scenes, the Streams API will use the Fork/Join framework, which will leverage the multiple cores of your machine.
All you need to do is exchange stream() with parallelStream(). For example, here’s how to filter expensive invoices in parallel:
List<Invoice> expensiveInvoices
    = invoices.parallelStream()
              .filter(inv -> inv.getAmount() > 10_000)
              .collect(Collectors.toList());
Alternatively, you can convert an existing Stream into a parallel one by using the parallel method:
Stream<Invoice> expensiveInvoices
    = invoices.stream()
              .filter(inv -> inv.getAmount() > 10_000);
List<Invoice> result
    = expensiveInvoices.parallel()
                       .collect(Collectors.toList());
Nonetheless, it’s not always a good idea to use parallel streams. There are several factors you need to take into consideration to gain a performance benefit:
The internal implementation of parallel streams relies on how easily the source data structure can be split so different threads can work on different parts. Data structures such as arrays are easily splittable, but others, such as LinkedList or files, offer poor splittability.
The more expensive it is to calculate an element of the stream, the more benefit from parallelism you can get.
It is preferable to use primitives instead of objects if possible, as they have lower memory footprint and better cache locality.
A larger number of data elements can produce better results because the parallel setup cost will be amortized over the processing of many elements, and the parallel speedup will outweigh the setup cost. This also depends on the processing cost per element, just mentioned.
Typically, the more cores available, the more parallelism you can get.
In practice, I advise that you benchmark and profile your code if you want a performance improvement. Java Microbenchmark Harness (JMH) is a popular framework maintained by Oracle that can help you with that. Without care, you could get poorer performance by simply switching to parallel streams.
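As a sanity check rather than a benchmark, the sketch below shows that for an associative operation like summing, a parallel pipeline must produce the same answer as the sequential one; measuring whether it is actually faster is a job for JMH:

```java
import java.util.stream.IntStream;

public class ParallelDemo {
    public static void main(String[] args) {
        // IntStream.rangeClosed produces an easily splittable source,
        // which is the favorable case for parallel streams.
        long sequential = IntStream.rangeClosed(1, 1_000_000).asLongStream().sum();
        long parallel   = IntStream.rangeClosed(1, 1_000_000).parallel().asLongStream().sum();

        System.out.println(sequential);             // 500000500000
        System.out.println(sequential == parallel); // true
    }
}
```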
Here are the most important takeaways from this chapter:
A stream is a sequence of elements from a source that supports aggregate operations.
There are two types of stream operations: intermediate and terminal operations.
Intermediate operations can be connected together to form a pipeline.
Intermediate operations include filter, map, distinct, and sorted.
Terminal operations process a stream pipeline to return a result.
Terminal operations include allMatch, collect, and forEach.
Collectors are recipes for accumulating the elements of a stream into a summary result, including containers such as List and Map.
A stream pipeline can be executed in parallel.
There are various factors to consider when using parallel streams for enhanced performance, including splittability, cost per element, packing, data size, and number of cores available.