New features of Java 8 - stream data processing

1. Introduction to stream processing

When I first encountered Java 8 stream processing, my impression was that it makes collection operations much simpler: work that usually takes several lines of code can often be done in one. For example, suppose we want to select all even numbers from a collection of integers and return them in a new List. Before Java 8, we would implement it like this:

List<Integer> evens = new ArrayList<>();
for (final Integer num : nums) {
    if (num % 2 == 0) {
        evens.add(num);
    }
}

With Java 8 stream processing, the code can be simplified to:

List<Integer> evens = nums.stream().filter(num -> num % 2 == 0).collect(Collectors.toList());

Let me briefly explain this statement. stream() converts the collection into a stream, and filter() applies our custom filtering; here a lambda expression keeps only the even numbers. Finally, collect() packages the result, and Collectors.toList() specifies that it is packaged into a List and returned.
As the example shows, Java 8 stream processing greatly simplifies collection operations. It is not limited to collections either: arrays, files, and anything else that can be converted into a stream can be processed this way, in a style similar to writing SQL statements. Java 8 implements stream processing through internal iteration. A stream pipeline can be divided into three parts: conversion to a stream, intermediate operations, and a terminal operation.


Taking a collection as an example, a stream operation first calls stream() to convert the collection into a stream, then calls the intermediate operations that do the required work on the elements, such as filtering and mapping, and finally uses a terminal operation to package the results and return them in the form we need.
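
The three stages can be seen in a small, self-contained sketch. The class and method names below (PipelineSketch, squaresOfEvens, sumOfEvens) are illustrative and not part of the original examples; the second method assumes an int array as the source, to show that arrays can be streamed too.

```java
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;

class PipelineSketch {
    // Source -> intermediate operations -> terminal operation.
    static List<Integer> squaresOfEvens(List<Integer> nums) {
        return nums.stream()                      // 1. convert the collection to a stream
                   .filter(n -> n % 2 == 0)       // 2. intermediate: keep even numbers
                   .map(n -> n * n)               // 2. intermediate: square each one
                   .collect(Collectors.toList()); // 3. terminal: package into a List
    }

    // Arrays can be turned into streams as well, via Arrays.stream.
    static int sumOfEvens(int[] values) {
        return Arrays.stream(values).filter(v -> v % 2 == 0).sum();
    }

    public static void main(String[] args) {
        System.out.println(squaresOfEvens(Arrays.asList(1, 2, 3, 4, 5, 6))); // [4, 16, 36]
        System.out.println(sumOfEvens(new int[] {1, 2, 3, 4}));              // 6
    }
}
```

Nothing is evaluated until the terminal operation (collect or sum here) is invoked; the intermediate operations only describe the pipeline.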

2. Intermediate operations

To facilitate the demonstration of the following examples, we first define a simple student entity class:

public class Student {
    private long id;        // student ID
    private String name;
    private int age;
    private int grade;      // year of study
    private String major;   // field of study
    private String school;  // university
    // constructor, getters and setters omitted
}

// Initialize the sample data
List<Student> students = new ArrayList<Student>() {
    {
        add(new Student(20160001, "孔明", 20, 1, "土木工程", "武汉大学"));
        add(new Student(20160002, "伯约", 21, 2, "信息安全", "武汉大学"));
        add(new Student(20160003, "玄德", 22, 3, "经济管理", "武汉大学"));
        add(new Student(20160004, "云长", 21, 2, "信息安全", "武汉大学"));
        add(new Student(20161001, "翼德", 21, 2, "机械与自动化", "华中科技大学"));
        add(new Student(20161002, "元直", 23, 4, "土木工程", "华中科技大学"));
        add(new Student(20161003, "奉孝", 23, 4, "计算机科学", "华中科技大学"));
        add(new Student(20162001, "仲谋", 22, 3, "土木工程", "浙江大学"));
        add(new Student(20162002, "鲁肃", 23, 4, "计算机科学", "浙江大学"));
        add(new Student(20163001, "丁奉", 24, 5, "土木工程", "南京大学"));
    }
};

2.1 Filtering

Filtering, as the name suggests, selects the elements of a collection that satisfy given conditions. The filtering operations provided by Java 8 are filter, distinct, limit, and skip.


The earlier example already demonstrated filter, which is defined as Stream<T> filter(Predicate<? super T> predicate). filter accepts a Predicate, through which we define the filtering condition. As mentioned when introducing lambda expressions, Predicate is a functional interface with a test(T t) method that returns boolean. Suppose we want to select all Wuhan University (武汉大学) students from the students collection; we pass the filtering condition to filter as a parameter:

List<Student> whuStudents = students.stream()
                                    .filter(student -> "武汉大学".equals(student.getSchool()))
                                    .collect(Collectors.toList());


The distinct operation is similar to the DISTINCT keyword in a SQL statement and removes duplicates. distinct is implemented based on Object.equals(Object). Returning to the original example, if we want all distinct even numbers, we can add the distinct operation:

List<Integer> evens = nums.stream()
                        .filter(num -> num % 2 == 0).distinct()
                        .collect(Collectors.toList());


The limit operation is similar to the LIMIT keyword in SQL, though less powerful. limit returns a stream containing the first n elements; when the stream has fewer than n elements, all of them are returned. For example, the following returns the first two students majoring in civil engineering (土木工程):

List<Student> civilStudents = students.stream()
                                    .filter(student -> "土木工程".equals(student.getMajor())).limit(2)
                                    .collect(Collectors.toList());

Speaking of limit, another stream operation deserves mention: sorted, which sorts the elements of the stream. The no-argument sorted() requires the elements to implement the Comparable interface; if they do not, we can pass a comparator to sorted(Comparator<? super T> comparator). For example, to select the students majoring in civil engineering, sort them by age in ascending order, and keep the two youngest, we can write:

List<Student> sortedCivilStudents = students.stream()
                                            .filter(student -> "土木工程".equals(student.getMajor())).sorted((s1, s2) -> s1.getAge() - s2.getAge())
                                            .limit(2).collect(Collectors.toList());
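
A subtraction-based comparator is fine for small non-negative values such as ages, but it can overflow for large values of opposite sign; Comparator.comparingInt expresses the same ordering without arithmetic. The sketch below is self-contained, so it assumes a hypothetical Person class in place of the Student entity, and the method name youngest is illustrative.

```java
import java.util.Arrays;
import java.util.Comparator;
import java.util.List;
import java.util.stream.Collectors;

class SortedLimitSketch {
    // Hypothetical stand-in for the Student entity, reduced to two fields.
    static final class Person {
        final String name;
        final int age;
        Person(String name, int age) { this.name = name; this.age = age; }
    }

    // Returns the names of the n youngest people, youngest first.
    static List<String> youngest(List<Person> people, int n) {
        return people.stream()
                     .sorted(Comparator.comparingInt((Person p) -> p.age)) // ascending by age
                     .limit(n)                                             // keep the first n
                     .map(p -> p.name)
                     .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        List<Person> people = Arrays.asList(
                new Person("A", 22), new Person("B", 20), new Person("C", 21));
        System.out.println(youngest(people, 2)); // [B, C]
    }
}
```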



List<Student> civilStudents = students.stream()
                                    .filter(student -> "土木工程".equals(student.getMajor()))
                                    .collect(Collectors.toList());
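
skip, the last of the four filtering operations listed at the start of this section, discards the first n elements and is the natural counterpart of limit. The sketch below assumes a plain list of integers and a hypothetical page helper that combines the two; it is an illustration, not code from the original examples.

```java
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;

class SkipSketch {
    // Hypothetical helper: drop the first `offset` elements, then keep at most `size`.
    static List<Integer> page(List<Integer> items, int offset, int size) {
        return items.stream()
                    .skip(offset)   // discard the first `offset` elements
                    .limit(size)    // keep at most `size` of the rest
                    .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        System.out.println(page(Arrays.asList(1, 2, 3, 4, 5, 6), 2, 3)); // [3, 4, 5]
    }
}
```

When the offset exceeds the number of elements, skip yields an empty stream rather than failing.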


2.2 Mapping




List<String> names = students.stream()
                            .filter(student -> "计算机科学".equals(student.getMajor()))
                            .map(Student::getName).collect(Collectors.toList());

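Besides the basic map shown above, Java 8 also provides mapToDouble(ToDoubleFunction<? super T> mapper), mapToInt(ToIntFunction<? super T> mapper), and mapToLong(ToLongFunction<? super T> mapper). Each returns a stream of the corresponding primitive type, and Java 8 defines some special operations for these streams. For example, to compute the total age of all students majoring in computer science (计算机科学), we can write: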

int totalAge = students.stream()
                    .filter(student -> "计算机科学".equals(student.getMajor()))
                    .mapToInt(Student::getAge).sum();
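
The special operations on primitive streams include sum, max, and average on IntStream. A minimal sketch, assuming a plain list of ages instead of the Student entity (the class and method names are illustrative):

```java
import java.util.Arrays;
import java.util.List;
import java.util.OptionalDouble;
import java.util.OptionalInt;

class PrimitiveStreamSketch {
    // sum() returns a plain int; an empty stream sums to 0.
    static int total(List<Integer> ages) {
        return ages.stream().mapToInt(Integer::intValue).sum();
    }

    // max() returns an OptionalInt because an empty stream has no maximum.
    static OptionalInt oldest(List<Integer> ages) {
        return ages.stream().mapToInt(Integer::intValue).max();
    }

    // average() returns an OptionalDouble for the same reason.
    static OptionalDouble average(List<Integer> ages) {
        return ages.stream().mapToInt(Integer::intValue).average();
    }

    public static void main(String[] args) {
        System.out.println(total(Arrays.asList(23, 23))); // 46
    }
}
```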



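The difference between flatMap and map is that flatMap converts each value in a stream into a stream of its own and then flattens all of those streams into a single stream. For example, suppose we have a string array String[] strs = {"java8", "is", "easy", "to", "use"}; and we want to output all the distinct characters that make up this array. The first implementation that comes to mind might be: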

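List<String[]> distinctStrs = Arrays.stream(strs)
                                .map(str -> str.split(""))  // maps to Stream<String[]>
                                .distinct().collect(Collectors.toList());

This does not produce the desired result: distinct compares the String[] arrays themselves rather than the characters inside them, so every array is kept: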


[j, a, v, a, 8]
[i, s]
[e, a, s, y]
[t, o]
[u, s, e]


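distinct only works as intended on a stream of individual characters, that is, on a Stream<String>. flatMap gets us there:

List<String> distinctStrs = Arrays.stream(strs)
                                .map(str -> str.split(""))  // maps to Stream<String[]>
                                .flatMap(Arrays::stream)  // flattens to Stream<String>
                                .distinct().collect(Collectors.toList());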

Like map, flatMap also provides type-specific variants: flatMapToDouble(Function<? super T, ? extends DoubleStream> mapper), flatMapToInt(Function<? super T, ? extends IntStream> mapper), and flatMapToLong(Function<? super T, ? extends LongStream> mapper).
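
As a sketch of one type-specific variant, flatMapToInt can flatten each word into the IntStream of character codes returned by String.chars(). The class and method names are illustrative, and counting the characters rather than listing them is a simplification made for this example.

```java
import java.util.Arrays;

class FlatMapToIntSketch {
    // Counts the distinct characters across all words.
    static long distinctChars(String[] words) {
        return Arrays.stream(words)
                     .flatMapToInt(String::chars) // each word -> IntStream of its chars
                     .distinct()                  // now compares individual chars
                     .count();
    }

    public static void main(String[] args) {
        String[] strs = {"java8", "is", "easy", "to", "use"};
        System.out.println(distinctChars(strs)); // 11
    }
}
```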

3. Terminal operations


3.1 Searching



boolean isAdult = students.stream().allMatch(student -> student.getAge() >= 18);



boolean hasWhu = students.stream().anyMatch(student -> "武汉大学".equals(student.getSchool()));



boolean noneCs = students.stream().noneMatch(student -> "计算机科学".equals(student.getMajor()));



Optional<Student> optStu = students.stream().filter(student -> "土木工程".equals(student.getMajor())).findFirst();

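findFirst takes no parameters; the search condition can be set through filter. Note also that findFirst returns an Optional. For a detailed explanation of that type, see the previous article, "New features of Java8 - Optional class".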



Optional<Student> optStu = students.stream().filter(student -> "土木工程".equals(student.getMajor())).findAny();

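In practice, for sequential stream processing, findFirst and findAny return the same result. The reason both exist is parallel stream processing, introduced in the next article: when a stream runs in parallel, finding the first element carries many restrictions, so unless the first element is specifically required, findAny performs better than findFirst in parallel streams.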
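
A minimal sketch of consuming the Optional returned by these operations, assuming a plain list of integers (the class and method names are illustrative). findFirst respects encounter order, while findAny on a parallel stream may return any matching element, so the caller should not rely on which one it gets.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Optional;

class FindSketch {
    // First even number in encounter order, or -1 when there is none.
    static int firstEven(List<Integer> nums) {
        return nums.stream().filter(n -> n % 2 == 0).findFirst().orElse(-1);
    }

    // Some even number; on a parallel stream it need not be the first one.
    static Optional<Integer> anyEven(List<Integer> nums) {
        return nums.parallelStream().filter(n -> n % 2 == 0).findAny();
    }

    public static void main(String[] args) {
        System.out.println(firstEven(Arrays.asList(1, 4, 6)));        // 4
        System.out.println(anyEven(Arrays.asList(1, 3)).isPresent()); // false
    }
}
```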

3.2 Reduction


// The approach from the earlier example
int totalAge = students.stream()
                .filter(student -> "计算机科学".equals(student.getMajor()))
                .mapToInt(Student::getAge).sum();
// The same result as a reduction (map to ages first, then reduce)
int totalAge1 = students.stream()
                .filter(student -> "计算机科学".equals(student.getMajor()))
                .map(Student::getAge).reduce(0, (a, b) -> a + b);
// Simplified further with a method reference
int totalAge2 = students.stream()
                .filter(student -> "计算机科学".equals(student.getMajor()))
                .map(Student::getAge).reduce(0, Integer::sum);
// The overload without an initial value returns an Optional
Optional<Integer> totalAge3 = students.stream()
                .filter(student -> "计算机科学".equals(student.getMajor()))
                .map(Student::getAge).reduce(Integer::sum);  // no initial value
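
To make the role of the initial value concrete: reduce(identity, accumulator) starts from the identity and folds each element into the running result, so an empty stream yields the identity, whereas the overload without an identity has nothing to return for an empty stream and therefore returns an Optional. A minimal sketch on plain integers (the class and method names are illustrative):

```java
import java.util.Arrays;
import java.util.List;
import java.util.Optional;

class ReduceSketch {
    // Folds from the identity 0: ((0 + a) + b) + c ...
    static int sum(List<Integer> nums) {
        return nums.stream().reduce(0, Integer::sum);
    }

    // No identity: the result is empty when the stream is empty.
    static Optional<Integer> max(List<Integer> nums) {
        return nums.stream().reduce(Integer::max);
    }

    public static void main(String[] args) {
        System.out.println(sum(Arrays.asList(1, 2, 3)));       // 6
        System.out.println(max(Arrays.asList(3, 9, 4)).get()); // 9
    }
}
```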

3.3 Collecting


3.3.1 Reduction



long count = students.stream().collect(Collectors.counting());
// Simplified further
long count2 = students.stream().count();


// Student with the maximum age
Optional<Student> olderStudent = students.stream().collect(Collectors.maxBy((s1, s2) -> s1.getAge() - s2.getAge()));

// Simplified further
Optional<Student> olderStudent2 = students.stream().collect(Collectors.maxBy(Comparator.comparing(Student::getAge)));

// Student with the minimum age
Optional<Student> youngestStudent = students.stream().collect(Collectors.minBy(Comparator.comparing(Student::getAge)));


int totalAge4 = students.stream().collect(Collectors.summingInt(Student::getAge));



double avgAge = students.stream().collect(Collectors.averagingInt(Student::getAge));



IntSummaryStatistics statistics = students.stream().collect(Collectors.summarizingInt(Student::getAge));


IntSummaryStatistics{count=10, sum=220, min=20, average=22.000000, max=24}



String names = students.stream().map(Student::getName).collect(Collectors.joining());
// Output: 孔明伯约玄德云长翼德元直奉孝仲谋鲁肃丁奉
String names2 = students.stream().map(Student::getName).collect(Collectors.joining(", "));
// Output: 孔明, 伯约, 玄德, 云长, 翼德, 元直, 奉孝, 仲谋, 鲁肃, 丁奉

3.3.2 Grouping

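In database operations, the GROUP BY keyword groups the queried data. Java 8 stream processing offers the same capability for collections through Collectors.groupingBy. For example, we can group the students above by school: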

Map<String, List<Student>> groups = students.stream().collect(Collectors.groupingBy(Student::getSchool));

groupingBy accepts a classifier, Function<? super T, ? extends K> classifier, and we can define a custom classifier to achieve whatever grouping we need.
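
As a sketch of a custom classifier, the lambda below buckets ages into decade labels such as "20s". It assumes a plain list of ages rather than the Student entity, and the class name, method name, and label format are all illustrative.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

class GroupingSketch {
    // Custom classifier: the map key is computed from each element.
    static Map<String, List<Integer>> byDecade(List<Integer> ages) {
        return ages.stream()
                   .collect(Collectors.groupingBy(age -> (age / 10 * 10) + "s"));
    }

    public static void main(String[] args) {
        Map<String, List<Integer>> groups = byDecade(Arrays.asList(19, 20, 24, 31));
        System.out.println(groups.get("20s")); // [20, 24]
    }
}
```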

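The example above is single-level grouping. We can also supply multiple classifiers to achieve multi-level grouping. For example, to group by school and then by major within each school: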

Map<String, Map<String, List<Student>>> groups2 = students.stream().collect(
                Collectors.groupingBy(Student::getSchool,  // first level: by school
                Collectors.groupingBy(Student::getMajor)));  // second level: by major


Map<String, Long> groups = students.stream().collect(Collectors.groupingBy(Student::getSchool, Collectors.counting()));


3.3.3 Partitioning


Map<Boolean, List<Student>> partition = students.stream().collect(Collectors.partitioningBy(student -> "武汉大学".equals(student.getSchool())));
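
partitioningBy can be read as a special case of grouping whose key is a boolean produced by a predicate, and like groupingBy it accepts a downstream collector. A minimal sketch on plain integers (the class and method names are illustrative):

```java
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

class PartitionSketch {
    // true -> even numbers, false -> odd numbers.
    static Map<Boolean, List<Integer>> evensAndOdds(List<Integer> nums) {
        return nums.stream().collect(Collectors.partitioningBy(n -> n % 2 == 0));
    }

    // With a downstream collector: count the elements in each partition.
    static Map<Boolean, Long> countEvensAndOdds(List<Integer> nums) {
        return nums.stream()
                   .collect(Collectors.partitioningBy(n -> n % 2 == 0, Collectors.counting()));
    }

    public static void main(String[] args) {
        System.out.println(evensAndOdds(Arrays.asList(1, 2, 3, 4)).get(true)); // [2, 4]
    }
}
```

Unlike groupingBy, the resulting map always contains both the true and the false key, even when one partition is empty.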



public interface Collector<T, A, R> {
    /**
     * A function that creates and returns a new mutable result container.
     * @return a function which returns a new, mutable result container
     */
    Supplier<A> supplier();

    /**
     * A function that folds a value into a mutable result container.
     * @return a function which folds a value into a mutable result container
     */
    BiConsumer<A, T> accumulator();

    /**
     * A function that accepts two partial results and merges them.  The
     * combiner function may fold state from one argument into the other and
     * return that, or may return a new result container.
     * @return a function which combines two partial results into a combined
     * result
     */
    BinaryOperator<A> combiner();

    /**
     * Perform the final transformation from the intermediate accumulation type
     * {@code A} to the final result type {@code R}.
     * <p>If the characteristic {@code IDENTITY_TRANSFORM} is
     * set, this function may be presumed to be an identity transform with an
     * unchecked cast from {@code A} to {@code R}.
     * @return a function which transforms the intermediate result to the final
     * result
     */
    Function<A, R> finisher();

    /**
     * Returns a {@code Set} of {@code Collector.Characteristics} indicating
     * the characteristics of this Collector.  This set should be immutable.
     * @return an immutable set of collector characteristics
     */
    Set<Characteristics> characteristics();
}
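
The four functions of this interface can be supplied directly through the Collector.of factory method. The sketch below is one possible custom collector, not something taken from the original examples: it accumulates into an ArrayList and finishes by wrapping the result in an unmodifiable view. The class name and the method name toUnmodifiableListSketch are hypothetical.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.stream.Collector;
import java.util.stream.Stream;

class CollectorSketch {
    // T = element type, A = List<T> (mutable container), R = List<T> (final result).
    static <T> Collector<T, List<T>, List<T>> toUnmodifiableListSketch() {
        return Collector.of(
                ArrayList::new,                  // supplier: new mutable container
                List::add,                       // accumulator: fold one element in
                (left, right) -> {               // combiner: merge two partial results
                    left.addAll(right);
                    return left;
                },
                Collections::unmodifiableList);  // finisher: A -> R
    }

    public static void main(String[] args) {
        List<String> result = Stream.of("a", "b").collect(toUnmodifiableListSketch());
        System.out.println(result); // [a, b]
    }
}
```

The combiner is only exercised when the stream runs in parallel, but it still has to be correct, because the same collector may be used with either kind of stream.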



4. Parallel stream processing

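Many stream operations are well suited to a divide-and-conquer approach, which can greatly improve performance when processing large collections. The designers of Java 8 recognized this and provided parallel stream processing. All the examples above call stream() to start stream processing; Java 8 also provides parallelStream() to start parallel stream processing. parallelStream() is built on the Fork/Join framework introduced in Java 7, and its default thread count is determined by the number of processor cores on the host.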
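
A minimal sketch of switching between the two modes, assuming a plain list of integers (the class and method names are illustrative). The computation is stateless and its result does not depend on element order, which is what makes it safe to run in parallel; whether it is actually faster depends on the data size and the machine.

```java
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.ForkJoinPool;
import java.util.stream.Stream;

class ParallelSketch {
    // Same pipeline, sequential or parallel depending on the flag.
    static long sumOfSquares(List<Integer> nums, boolean parallel) {
        Stream<Integer> stream = parallel ? nums.parallelStream() : nums.stream();
        return stream.mapToLong(n -> n.longValue() * n).sum();
    }

    public static void main(String[] args) {
        System.out.println(sumOfSquares(Arrays.asList(1, 2, 3), true)); // 14
        // Parallel streams run on the common Fork/Join pool by default.
        System.out.println(ForkJoinPool.commonPool().getParallelism());
    }
}
```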

