1. 스트리밍 소개
Java8 스트리밍 데이터 처리를 접했을 때 첫인상은 스트리밍을 사용하면 수집 작업이 훨씬 간단해진다는 것이었습니다. 스트리밍의 도움으로 한 줄로 완료됩니다. 예를 들어 정수가 포함된 컬렉션에서 모든 짝수를 필터링하고 이를 새 목록으로 캡슐화하려면 java8 이전에 다음 코드를 통해 구현해야 합니다.
List<Integer> evens = new ArrayList<>(); for (final Integer num : nums) { if (num % 2 == 0) { evens.add(num); } }
Java8을 통해 스트림 처리의 경우 다음과 같이 코드를 단순화할 수 있습니다.
List<Integer> evens = nums.stream().filter(num -> num % 2 == 0).collect(Collectors.toList());
위 명령문 줄의 의미를 간략하게 설명하겠습니다. stream() 작업은 컬렉션을 스트림으로 변환하고 filter()는 실행됩니다. 여기서의 필터링 프로세스는 람다 표현식을 통해 모든 짝수를 필터링하는 것입니다. 마지막으로 Collect()를 통해 결과를 캡슐화하고 Collectors.toList()를 통해 반환할 목록 컬렉션으로 캡슐화를 지정합니다.
위의 예에서 볼 수 있듯이 Java8의 스트리밍 처리는 컬렉션 작업을 크게 단순화합니다. 실제로 스트림으로 변환할 수 있는 한 배열, 파일 등을 포함하는 컬렉션이 아닙니다. 우리는 SQL 문을 작성하는 방법과 유사하게 작동하는 스트림 수식 처리를 사용할 수 있습니다. Java8은 내부 반복을 통해 스트림 처리를 구현합니다. 스트림 처리는 스트림으로의 변환, 중간 작업 및 터미널 작업의 세 부분으로 나눌 수 있습니다. 아래와 같이:
컬렉션을 예로 들면 스트리밍 작업의 경우 먼저 stream() 함수를 호출하여 이를 스트림으로 변환한 다음 해당 중간 작업을 호출합니다. 필터링, 변환 등과 같이 컬렉션에 대해 수행해야 하는 작업을 수행하고 마지막으로 터미널 작업을 통해 이전 결과를 캡슐화하고 필요한 형식을 반환합니다.
2. 중간 작업
이후 예제의 시연을 용이하게 하기 위해 먼저 간단한 학생 엔터티 클래스를 정의합니다.
public class Student { /** 学号 */ private long id; private String name; private int age; /** 年级 */ private int grade; /** 专业 */ private String major; /** 学校 */ private String school; // 省略getter和setter}
// 初始化List<Student> students = new ArrayList<Student>() { { add(new Student(20160001, "孔明", 20, 1, "土木工程", "武汉大学")); add(new Student(20160002, "伯约", 21, 2, "信息安全", "武汉大学")); add(new Student(20160003, "玄德", 22, 3, "经济管理", "武汉大学")); add(new Student(20160004, "云长", 21, 2, "信息安全", "武汉大学")); add(new Student(20161001, "翼德", 21, 2, "机械与自动化", "华中科技大学")); add(new Student(20161002, "元直", 23, 4, "土木工程", "华中科技大学")); add(new Student(20161003, "奉孝", 23, 4, "计算机科学", "华中科技大学")); add(new Student(20162001, "仲谋", 22, 3, "土木工程", "浙江大学")); add(new Student(20162002, "鲁肃", 23, 4, "计算机科学", "浙江大学")); add(new Student(20163001, "丁奉", 24, 5, "土木工程", "南京大学")); } };
2.1 필터링
필터링은 이름에서 알 수 있듯이 주어진 요구 사항에 따라 컬렉션의 조건을 충족하는 요소를 필터링하는 것입니다. java8에서 제공하는 필터링 작업에는 필터, 구별, 제한 및 건너뛰기가 포함됩니다.
filter
이전 예에서는 Stream
List<Student> whuStudents = students.stream() .filter(student -> "武汉大学".equals(student.getSchool())) .collect(Collectors.toList());
distinct
independent 이 작업은 중복 제거 처리를 위해 SQL 문을 작성할 때 추가한 DISTINCT 키워드와 유사합니다. 그런 다음 고유한 작업을 추가할 수 있습니다.
List<Integer> evens = nums.stream() .filter(num -> num % 2 == 0).distinct() .collect(Collectors.toList());
limit
제한 작업은 SQL 문의 LIMIT 키워드와 유사하지만 해당 기능은 상대적으로 약합니다. 처음 n개 요소를 포함하는 스트림. 설정 크기가 n보다 작으면 실제 길이가 반환됩니다. 예를 들어 다음 예에서는 처음 두 전공이 토목공학인 학생을 반환합니다.
List
List<Student> civilStudents = students.stream() .filter(student -> "土木工程".equals(student.getMajor())).limit(2) .collect(Collectors.toList());
제한에 관해 말하자면 또 다른 스트림 작업인 sorted를 언급해야 합니다. 이 작업은 스트림의 요소를 정렬하는 데 사용됩니다. sorted에서는 비교할 요소가 Comparable 인터페이스를 구현해야 합니다. 그렇지 않더라도 비교기를 sorted(Comparator super T> 비교) 예를 들어, 토목공학을 전공하는 학생을 선별하고, 가장 어린 학생부터 나이가 많은 학생까지 정렬하고, 가장 어린 학생 2명을 필터링하여 다음과 같이 구현할 수 있습니다.
List<Student> sortedCivilStudents = students.stream() .filter(student -> "土木工程".equals(student.getMajor())).sorted((s1, s2) -> s1.getAge() - s2.getAge()) .limit(2) .collect(Collectors.toList());
skip
skip操作与limit操作相反,如同其字面意思一样,是跳过前n个元素,比如我们希望找出排序在2之后的土木工程专业的学生,那么可以实现为:
List<Student> civilStudents = students.stream() .filter(student -> "土木工程".equals(student.getMajor())) .skip(2) .collect(Collectors.toList());
通过skip,就会跳过前面两个元素,返回由后面所有元素构造的流,如果n大于满足条件的集合的长度,则会返回一个空的集合。
2.2 映射
在SQL中,借助SELECT关键字后面添加需要的字段名称,可以仅输出我们需要的字段数据,而流式处理的映射操作也是实现这一目的,在java8的流式处理中,主要包含两类映射操作:map和flatMap。
map
举例说明,假设我们希望筛选出所有专业为计算机科学的学生姓名,那么我们可以在filter筛选的基础之上,通过map将学生实体映射成为学生姓名字符串,具体实现如下:
List<String> names = students.stream() .filter(student -> "计算机科学".equals(student.getMajor())) .map(Student::getName).collect(Collectors.toList());
除了上面这类基础的map,java8还提供了mapToDouble(ToDoubleFunction super T> mapper),mapToInt(ToIntFunction super T> mapper),mapToLong(ToLongFunction super T> mapper),这些映射分别返回对应类型的流,java8为这些流设定了一些特殊的操作,比如我们希望计算所有专业为计算机科学学生的年龄之和,那么我们可以实现如下:
int totalAge = students.stream() .filter(student -> "计算机科学".equals(student.getMajor())) .mapToInt(Student::getAge).sum();
通过将Student按照年龄直接映射为IntStream,我们可以直接调用提供的sum()方法来达到目的,此外使用这些数值流的好处还在于可以避免jvm装箱操作所带来的性能消耗。
flatMap
flatMap与map的区别在于 flatMap是将一个流中的每个值都转成一个个流,然后再将这些流扁平化成为一个流 。举例说明,假设我们有一个字符串数组String[] strs = {"java8", "is", "easy", "to", "use"};,我们希望输出构成这一数组的所有非重复字符,那么我们可能首先会想到如下实现:
List<String[]> distinctStrs = Arrays.stream(strs) .map(str -> str.split("")) // 映射成为Stream<String[]> .distinct() .collect(Collectors.toList());
在执行map操作以后,我们得到是一个包含多个字符串(构成一个字符串的字符数组)的流,此时执行distinct操作是基于在这些字符串数组之间的对比,所以达不到我们希望的目的,此时的输出为:
[j, a, v, a, 8] [i, s] [e, a, s, y] [t, o] [u, s, e]
distinct只有对于一个包含多个字符的流进行操作才能达到我们的目的,即对Stream
List<String> distinctStrs = Arrays.stream(strs) .map(str -> str.split("")) // 映射成为Stream<String[]> .flatMap(Arrays::stream) // 扁平化为Stream<String> .distinct() .collect(Collectors.toList());
flatMap将由map映射得到的Stream
与map类似,flatMap也提供了针对特定类型的映射操作:flatMapToDouble(Function super T,? extends DoubleStream> mapper),flatMapToInt(Function super T,? extends IntStream> mapper),flatMapToLong(Function super T,? extends LongStream> mapper)。
三. 终端操作
终端操作是流式处理的最后一步,我们可以在终端操作中实现对流查找、归约等操作。
3.1 查找
allMatch
allMatch用于检测是否全部都满足指定的参数行为,如果全部满足则返回true,例如我们希望检测是否所有的学生都已满18周岁,那么可以实现为:
boolean isAdult = students.stream().allMatch(student -> student.getAge() >= 18);
anyMatch
anyMatch则是检测是否存在一个或多个满足指定的参数行为,如果满足则返回true,例如我们希望检测是否有来自武汉大学的学生,那么可以实现为:
boolean hasWhu = students.stream().anyMatch(student -> "武汉大学".equals(student.getSchool()));
noneMatch
noneMatch用于检测是否不存在满足指定行为的元素,如果不存在则返回true,例如我们希望检测是否不存在专业为计算机科学的学生,可以实现如下:
boolean noneCs = students.stream().noneMatch(student -> "计算机科学".equals(student.getMajor()));
findFirst
findFirst用于返回满足条件的第一个元素,比如我们希望选出专业为土木工程的排在第一个学生,那么可以实现如下:
Optional<Student> optStu = students.stream().filter(student -> "土木工程".equals(student.getMajor())).findFirst();
findFirst不携带参数,具体的查找条件可以通过filter设置,此外我们可以发现findFirst返回的是一个Optional类型,关于该类型的具体讲解可以参考上一篇:Java8新特性 - Optional类。
findAny
findAny相对于findFirst的区别在于,findAny不一定返回第一个,而是返回任意一个,比如我们希望返回任意一个专业为土木工程的学生,可以实现如下:
Optional<Student> optStu = students.stream().filter(student -> "土木工程".equals(student.getMajor())).findAny();
实际上对于顺序流式处理而言,findFirst和findAny返回的结果是一样的,至于为什么会这样设计,是因为在下一篇我们介绍的 并行流式处理,当我们启用并行流式处理的时候,查找第一个元素往往会有很多限制,如果不是特别需求,在并行流式处理中使用findAny的性能要比findFirst好。
3.2 归约
前面的例子中我们大部分都是通过collect(Collectors.toList())对数据封装返回,如我的目标不是返回一个新的集合,而是希望对经过参数化操作后的集合进行进一步的运算,那么我们可用对集合实施归约操作。java8的流式处理提供了reduce方法来达到这一目的。
前面我们通过mapToInt将Stream
// 前面例子中的方法int totalAge = students.stream() .filter(student -> "计算机科学".equals(student.getMajor())) .mapToInt(Student::getAge).sum();// 归约操作int totalAge = students.stream() .filter(student -> "计算机科学".equals(student.getMajor())) .map(Student::getAge) .reduce(0, (a, b) -> a + b);// 进一步简化int totalAge2 = students.stream() .filter(student -> "计算机科学".equals(student.getMajor())) .map(Student::getAge) .reduce(0, Integer::sum);// 采用无初始值的重载版本,需要注意返回OptionalOptional<Integer> totalAge = students.stream() .filter(student -> "计算机科学".equals(student.getMajor())) .map(Student::getAge) .reduce(Integer::sum); // 去掉初始值
3.3 收集
前面利用collect(Collectors.toList())是一个简单的收集操作,是对处理结果的封装,对应的还有toSet、toMap,以满足我们对于结果组织的需求。这些方法均来自于java.util.stream.Collectors,我们可以称之为收集器。
3.3.1 归约
收集器也提供了相应的归约操作,但是与reduce在内部实现上是有区别的,收集器更加适用于可变容器上的归约操作,这些收集器广义上均基于Collectors.reducing()实现。
例1:求学生的总人数
long count = students.stream().collect(Collectors.counting()); // 进一步简化long count = students.stream(). count();
例2:求年龄的最大值和最小值
// 求最大年龄 Optional<Student> olderStudent = students.stream().collect(Collectors.maxBy((s1, s2) -> s1.getAge() - s2.getAge())); // 进一步简化 Optional<Student> olderStudent2 = students.stream().collect(Collectors.maxBy(Comparator.comparing(Student::getAge))); // 求最小年龄 Optional<Student> olderStudent3 = students.stream().collect(Collectors.minBy(Comparator.comparing(Student::getAge)));
例3:求年龄总和
int totalAge4 = students.stream().collect(Collectors.summingInt(Student::getAge));
对应的还有summingLong、summingDouble。
例4:求年龄的平均值
double avgAge = students.stream().collect(Collectors.averagingInt(Student::getAge));
对应的还有averagingLong、averagingDouble。
例5:一次性得到元素个数、总和、均值、最大值、最小值
IntSummaryStatistics statistics = students.stream().collect(Collectors.summarizingInt(Student::getAge));
输出:
IntSummaryStatistics{count=10, sum=220, min=20, average=22.000000, max=24}
对应的还有summarizingLong、summarizingDouble。
例6:字符串拼接
String names = students.stream().map(Student::getName).collect(Collectors.joining()); // 输出:孔明伯约玄德云长翼德元直奉孝仲谋鲁肃丁奉 String names = students.stream().map(Student::getName).collect(Collectors.joining(", ")); // 输出:孔明, 伯约, 玄德, 云长, 翼德, 元直, 奉孝, 仲谋, 鲁肃, 丁奉
3.3.2 分组
在数据库操作中,我们可以通过GROUP BY关键字对查询到的数据进行分组,java8的流式处理也为我们提供了这样的功能Collectors.groupingBy来操作集合。比如我们可以按学校对上面的学生进行分组:
Map<String, List<Student>> groups = students.stream().collect(Collectors.groupingBy(Student::getSchool));
groupingBy接收一个分类器Function super T, ? extends K> classifier,我们可以自定义分类器来实现需要的分类效果。
上面演示的是一级分组,我们还可以定义多个分类器实现 多级分组,比如我们希望在按学校分组的基础之上再按照专业进行分组,实现如下:
Map<String, Map<String, List<Student>>> groups2 = students.stream().collect( Collectors.groupingBy(Student::getSchool, // 一级分组,按学校 Collectors.groupingBy(Student::getMajor))); // 二级分组,按专业
实际上在groupingBy的第二个参数不是只能传递groupingBy,还可以传递任意Collector类型,比如我们可以传递一个Collector.counting,用以统计每个组的个数:
Map<String, Long> groups = students.stream().collect(Collectors.groupingBy(Student::getSchool, Collectors.counting()));
如果我们不添加第二个参数,则编译器会默认帮我们添加一个Collectors.toList()。
3.3.3 分区
分区可以看做是分组的一种特殊情况,在分区中key只有两种情况:true或false,目的是将待分区集合按照条件一分为二,java8的流式处理利用ollectors.partitioningBy()方法实现分区,该方法接收一个谓词,例如我们希望将学生分为武大学生和非武大学生,那么可以实现如下:
Map<Boolean, List<Student>> partition = students.stream().collect(Collectors.partitioningBy(student -> "武汉大学".equals(student.getSchool())));
分区相对分组的优势在于,我们可以同时得到两类结果,在一些应用场景下可以一步得到我们需要的所有结果,比如将数组分为奇数和偶数。
以上介绍的所有收集器均实现自接口java.util.stream.Collector,该接口的定义如下:
public interface Collector<T, A, R> { /** * A function that creates and returns a new mutable result container. * * @return a function which returns a new, mutable result container */ Supplier<A> supplier(); /** * A function that folds a value into a mutable result container. * * @return a function which folds a value into a mutable result container */ BiConsumer<A, T> accumulator(); /** * A function that accepts two partial results and merges them. The * combiner function may fold state from one argument into the other and * return that, or may return a new result container. * * @return a function which combines two partial results into a combined * result */ BinaryOperator<A> combiner(); /** * Perform the final transformation from the intermediate accumulation type * {@code A} to the final result type {@code R}. * * <p>If the characteristic {@code IDENTITY_TRANSFORM} is * set, this function may be presumed to be an identity transform with an * unchecked cast from {@code A} to {@code R}. * * @return a function which transforms the intermediate result to the final * result */ Function<A, R> finisher(); /** * Returns a {@code Set} of {@code Collector.Characteristics} indicating * the characteristics of this Collector. This set should be immutable. * * @return an immutable set of collector characteristics */ Set<Characteristics> characteristics(); }
我们也可以实现该接口来定义自己的收集器,此处不再展开。
四. 并行流式数据处理
流式处理中的很多都适合采用 分而治之 的思想,从而在处理集合较大时,极大的提高代码的性能,java8的设计者也看到了这一点,所以提供了 并行流式处理。上面的例子中我们都是调用stream()方法来启动流式处理,java8还提供了parallelStream()来启动并行流式处理,parallelStream()本质上基于java7的Fork-Join框架实现,其默认的线程数为宿主机的内核数。
启动并行流式处理虽然简单,只需要将stream()替换成parallelStream()即可,但既然是并行,就会涉及到多线程安全问题,所以在启用之前要先确认并行是否值得(并行的效率不一定高于顺序执行),另外就是要保证线程安全。此两项无法保证,那么并行毫无意义,毕竟结果比速度更加重要,以后有时间再来详细分析一下并行流式数据处理的具体实现和最佳实践。