Java中的函数式编程(六)流Stream基础

写在前面

如果说函数式接口和lambda表达式是Java中函数式编程的基石,那么stream就是在基石上的最富丽堂皇的大厦。

只有熟悉了stream,你才能说熟悉了Java 的函数式编程。

本文主要介绍Stream的基础概念和基本操作,让大家对Stream有一个初步的理解。

本文的示例代码可从gitee上获取://gitee.com/cnmemset/javafp

stream的概念

首先,看一个典型的stream例子:

public static void simpleStream() {
    List<String> words = Arrays.asList("hello", "world", "I", "love", "you");
    int letterCount = words.stream()
            .filter(s -> s.length() > 3)  // 过滤掉长度小于等于3的单词
            .mapToInt(String::length)     // 将每个单词映射为单词长度
            .sum();  // 计算总长度 5(hello) + 5(world) + 4(love) = 14
 
    // 输出为 14
    System.out.println(letterCount);
}

在上述例子中,我们将字符串列表 words 作为stream的数据源,然后执行了 filter-map-reduce 的系列操作(sum方法属于 reduce 操作),后面会详细介绍map和reduce 操作。如果你有大数据的编程经验,会更容易理解map和reduce的含义。

stream的定义比较晦涩,大致可以理解为是一个支持串行或并行操作的数据元素序列。它具备以下几个特点:

  • 首先,stream不是一种数据结构,它并不存储数据。stream是某个数据源之上的数据视图。数据源可以是一个数组,或者是一个Collection类,甚至还可以是I/O channel。它通过一个计算管道(a pipeline of computational operations),对数据源的数据进行filter-map-reduce的操作。
  • 其次,stream天生支持函数式编程。函数式编程的一个重要特点就是不会修改变量的值(没有“副作用”)。而对stream的任何操作,都不会修改数据源中的数据。例如,对一个数据源为Collection的stream进行filter操作,只会生成一个新的stream对象,而不会真的删除底层数据源中的元素。
  • 第三,stream的许多操作都是惰性求值的(laziness-seeking)。惰性求值是指该操作只是对stream的一个描述,并不会马上执行。这类惰性的操作在stream中被称为中间操作(intermediate operations)。
  • 第四,stream呈现的数据可以是无限的。例如Stream.generate可以生成一个无限的流。我们可以通过 limit(n) 方法来将一个无限流转换为有限流,或者通过 findFirst() 方法终止一个无限流。
  • 最后,stream中的元素只能被消费1次。和迭代器 Iterator 相似,当需要重复访问某个元素时,需要重新生成一个新的stream。

stream的操作可以分成两类,中间操作(intermediate operations)和终止操作(terminal operations)。一个stream管道(stream pipeline)是由一个数据源 + 0个或多个中间操作 + 1个终止操作组成的。

中间操作:
中间操作(intermediate operations)指的是将一个stream转换为另一个stream的操作,譬如filter和map操作。中间操作都是惰性的,它们的作用仅仅是描述了一个新的stream,不会马上被执行。

终止操作:
终止操作(terminal operations)则指的是那些会产生一个新值或副作用(side-effect)的操作,譬如count 和 forEach 操作。只有遇到终止操作时,之前定义的中间操作才会真正被执行。需要注意,当一个stream执行了一个终止操作后,它的状态会变成“已消费”,不能再被使用。

为了证实“中间操作都是惰性的”,我们设计了一个实验性的示例代码:

public static void intermediateOperations() {
    List<String> words = Arrays.asList("hello", "world", "I", "love", "you");
 
    System.out.println("start: " + System.currentTimeMillis());
 
    Stream<String> interStream = words.stream()
            .filter(s -> {
                try {
                    Thread.sleep(1000);
                } catch (InterruptedException e) {
                    // do nothing
                }
                return s.length() > 3;
            });
    IntStream intStream = interStream.mapToInt(s -> {
        try {
            Thread.sleep(1000);
        } catch (InterruptedException e) {
            // do nothing
        }
        return s.length();
    });
 
    // 因为 filter 和 map 操作都属于中间操作,并不会真正执行,
    // 所以它们不受 Thread.sleep 的影响,耗时很短
    System.out.println("after filter && map: " + System.currentTimeMillis());
 
    int letterCount = intStream.sum();
 
    // sum 属于终止操作,会执行之前定义的中间操作,
    // Thread.sleep 被真正执行了,耗时为 5(filter) + 3(mapToInt) = 8秒
    System.out.println("after sum: " + System.currentTimeMillis());
 
    // 输出为 14
    System.out.println(letterCount);
}

上述代码的输出类似:

start: 1633438922526
after filter && map: 1633438922588
after sum: 1633438930620
14

可以看到,上述代码验证了“中间操作都是惰性的”:打印“start”和打印“after filter && map”之间只隔了几十毫秒,而打印“after sum”则在8秒之后,证明了只有在遇到 sum 操作后,filter 和 map 中定义的函数才真正被执行。

生成一个stream对象

Java 8中,引入了4个stream的接口:Stream、IntStream、LongStream、DoubleStream,分别对应Object类型,以及基础类型int、long和double。如下图所示:

在Java中,与stream相关的操作基本都是通过上述的4个接口来实现的,不会涉及到具体的stream实现类。要得到一个stream,通常不会手动创建,而是调用对应的工具方法。

常用的工具方法包括:

  1. Collection方法:Collection.stream() 或 Collection.parallelStream()
  2. 数组方法:Arrays.stream(Object[])
  3. 工厂方法:Stream.of(Object[]), IntStream.range(int, int) 或 Stream.iterate(Object, UnaryOperator) 等等
  4. 读取文件方法:BufferedReader.lines()
  5. 类 java.nio.file.Files 中,也提供了Stream相关的API,例如 Files.list, Files.walk 等等

Stream的基本操作

我们以接口Stream为例,先介绍stream的一些基本操作。

forEach()

Stream中的forEach方法和Collection中的forEach方法相似,都是对每个元素执行指定的操作。

forEach方法签名为:

void forEach(Consumer<? super T> action)

forEach方法是一个终止操作,意味着在它之前的所有中间操作都将会被执行,然后再马上执行 action 。

filter()

filter方法的方法签名是:

Stream<T> filter(Predicate<? super T> predicate)

filter方法是一个中间操作,它的作用是根据参数 predicate 过滤元素,返回一个只包含满足predicate条件元素的Stream。

示例代码:

public static void filterStream() {
    List<String> words = Arrays.asList("hello", "world", "I", "love", "you");
    words.stream()
            .filter(s -> s.length() > 3)  // 过滤掉长度小于等于3的单词
            .forEach(s -> System.out.println(s));
}

上述代码输出为:

hello
world
love

limit()

limit方法签名为:

Stream<T> limit(long maxSize);

limit方法是一个短路型(short-circuiting)的中间操作,作用是将当前的Stream截断,只留下最多 maxSize 个元素组成一个新的Stream。短路型(short-circuiting)的含义是指将一个无限元素的Stream转换为一个有限元素的Stream。

例如,Random.ints 可以生成一个近似无限的随机整数流,我们可以通过limit方法限制生成随机整数的个数。示例代码:

public static void limitStream() {
    Random random = new Random();
 
    // 打印左闭右开区间中 [1, 100) 中的 5 个随机整数
    random.ints(1, 100)
            .limit(5)
            .forEach(System.out::println);
}

上述代码的输出类似:

90
31
31
52
63

distinct()

distinct的方法签名是:

Stream<T> distinct();

distinct是一个中间操作,作用是返回一个去除重复元素后的Stream。

作者曾遇到过一个有趣的场景:要生成10个不重复的随机数字。可以结合Random.ints (Random.ints 可以生成一个近似无限的随机整数流)方法来实现这个需求。示例代码如下:

public static void distinctStream() {
    Random random = new Random();
 
    // 在左闭右开区间中 [1, 100) 随机生成 10 个不重复的数字
    random.ints(1, 100)
            .distinct()
            .limit(10)
            .forEach(System.out::println);
 
    /*
    // 一个有趣的问题,如果 limit 方法放在 distinct 前面,
    // 结果和上面的代码有什么区别吗?
    // 欢迎加群讨论。
    random.ints(1, 100)
            .limit(10)
            .distinct()
            .forEach(System.out::println);
    */
}

sorted()

sorted的方法签名有两个,分别是:

Stream<T> sorted();

Stream<T> sorted(Comparator<? super T> comparator);

前者是按照自然顺序排序,后者是根据指定的比较器进行排序。

sorted方法是一个中间操作,和Collection.sort方法作用相似。

示例代码如下:

public static void sortedStream() {
    List<String> list = Arrays.asList("Guangdong", "Fujian", "Hunan", "Guangxi");
 
    // 自然排序
    list.stream().sorted().forEach(System.out::println);
 
    System.out.println("===============");
 
    // 对省份进行排序,首先按照长度排序,如果长度一样,则按照字母顺序排序
    list.stream().sorted((first, second) -> {
        int lenDiff = first.length() - second.length();
        return lenDiff == 0 ? first.compareTo(second) : lenDiff;
    }).forEach(System.out::println);
}

上述代码的输出为:

Fujian
Guangdong
Guangxi
Hunan
===============
Hunan
Fujian
Guangxi
Guangdong

结语

欢迎来到 Java 的函数式编程世界!!!

本文介绍了 Stream 的概念和基本操作。大家尤其要理解中间操作和终止操作的概念。

认真阅读完本文后,你应该对 Stream 有了一个初步的认识,但这只是 Stream 编程的入门,更有趣更有挑战性更有可玩性的还是随后即将要介绍的 map-reduce 操作。