Java8-自定义收集器的实现及源码代码调用分析
- 2020 年 2 月 18 日
- 筆記
1.引子:
Java8带给Java的最大特点就是函数式编程,其中一个重要的更新就是对于流的支持,这使得开发者可以使用一行代码就能完成从一个数据结构转化为另一个数据结构,编程效率大大提高。为了充分理解收集器Colleector接口,最好的方法就是自定义一个收集器,并分析其代码执行过程。
2.自定义收集器Collector:
CodeBlock-1:
import java.util.*; import java.util.function.BiConsumer; import java.util.function.BinaryOperator; import java.util.function.Function; import java.util.function.Supplier; import java.util.stream.Collector; /** * @author Fisherman */ public class MySetCollector<T> implements Collector<T, Set<T>, Set<T>> { @Override public Supplier<Set<T>> supplier() { System.out.println("supplier is invoked."); return HashSet::new; } @Override public BiConsumer<Set<T>, T> accumulator() { System.out.println("accumulator is invoked."); return Set<T>::add;//相当于 return (set,item) -> set.add(item); } @Override public BinaryOperator<Set<T>> combiner() { System.out.println("combiner is invoked."); return (set1, set2) -> { set1.addAll(set2); return set1; }; } @Override /** * 这是一个可选项,只有中间容器结果容器不一致才会被传递此接口实现 */ public Function<Set<T>, Set<T>> finisher() { System.out.println("finisher is invoked."); return Function.identity();//相当于 return t->t; } @Override public Set<Characteristics> characteristics() { System.out.println("characteristics is invoked."); return Collections.unmodifiableSet(EnumSet.of(Characteristics.IDENTITY_FINISH, Characteristics.UNORDERED)); } public static void main(String[] args) { List<String> list = Arrays.asList("hello", "world", "fisherman","hello"); Set<String> set = list.stream().collect(new MySetCollector<>()); System.out.println(set); } }
控制台输出:
supplier is invoked. accumulator is invoked. combiner is invoked. characteristics is invoked. characteristics is invoked. [world, hello, fisherman]
3. 代码调用分析:
分析说明:我们按照为了解释控制台输出顺序的目的来进行代码调用的分析。由结果反推运行顺序,也是一种不错的学习方法。非常重要且需要辨析的一个知识点:控制台打出来的invoked执行顺序,代表了收集器中的方法被调用,将返回值(函数式接口实现对象)作为参数传入接口内部的执行顺序。而对元素执行操作:添加到Set数据结构,只是关于accumulator()等方法的返回值(函数式接口的实现对象)的应用,此时System.out.println(“xxx is invoked.”);已经不会再被调用了。
我们使用以下语句实现了从List数据结构到Set数据结构的转变:Set<String> set = list.stream().collect(new MySetCollector<>());
stream().collect
指向了Stream
接口中的<R, A> R collect(Collector<? super T, A, R> collector);
方法,由于多态的性质,实际上此方法是由ReferencePipeline
抽象类中的 public final <R, A> R collect(Collector<? super P_OUT, A, R> collector) {...}
实现的: CodeBlock-2:
@Override @SuppressWarnings("unchecked") public final <R, A> R collect(Collector<? super P_OUT, A, R> collector) { A container; if (isParallel() && (collector.characteristics().contains(Collector.Characteristics.CONCURRENT)) && (!isOrdered() || collector.characteristics().contains(Collector.Characteristics.UNORDERED))) { container = collector.supplier().get(); BiConsumer<A, ? super P_OUT> accumulator = collector.accumulator(); forEach(u -> accumulator.accept(container, u)); } else { container = evaluate(ReduceOps.makeRef(collector)); } return collector.characteristics().contains(Collector.Characteristics.IDENTITY_FINISH) ? (R) container : collector.finisher().apply(container);
此方法中泛型说明: P_OUT:流中的单个元素类型 A:中间结果容器类型 R:返回结果容器类型
代码分析:
- 首先创造了一个中间结果容器,即:
container
; - 然后判断是否为并行流,因为此例中不是并行流,所以直接跳至
else{ }
处运行; - 运行至
makeRef()
方法,其作用是将Collector接口中的几个抽象函数式接口进行赋值: CodeBlock-3:
public static <T, I> TerminalOp<T, I> makeRef(Collector<? super T, I, ?> collector) { Supplier<I> supplier = Objects.requireNonNull(collector).supplier(); BiConsumer<I, ? super T> accumulator = collector.accumulator(); BinaryOperator<I> combiner = collector.combiner(); class ReducingSink extends Box<I> implements AccumulatingSink<T, I, ReducingSink> { @Override public void begin(long size) { state = supplier.get(); } @Override public void accept(T t) { accumulator.accept(state, t); } @Override public void combine(ReducingSink other) { state = combiner.apply(state, other.state); } } return new ReduceOp<T, I, ReducingSink>(StreamShape.REFERENCE) { @Override public ReducingSink makeSink() { return new ReducingSink(); } @Override public int getOpFlags() { return collector.characteristics().contains(Collector.Characteristics.UNORDERED) ? StreamOpFlag.NOT_ORDERED : 0; } }; }
上述代码中的以下三个语句,就实现了对三个函数式接口的赋值。
Supplier<I> supplier = Objects.requireNonNull(collector).supplier(); BiConsumer<I, ? super T> accumulator = collector.accumulator(); BinaryOperator<I> combiner = collector.combiner();
所以,我们在控制台中可以看到,首先按序输出的语句是:
supplier is invoked. accumulator is invoked. combiner is invoked.
注意:supplier.getr()方法并不会调用System.out.println("supplier is invoked.");
,其余两个接口也有这样的性质。
接下来控制台输出了两个,为什么不是一个?:
characteristics is invoked. characteristics is invoked
这是为什么呢?请看以下分析:在 CodeBlock-3: 中,最终会运行到以下语句:
public int getOpFlags() { return collector.characteristics().contains(Collector.Characteristics.UNORDERED) ? StreamOpFlag.NOT_ORDERED : 0; }
其功能是对于判断传入的collector
接口是否含有Collector.Characteristics.UNORDERED
特性,由则返回StreamOpFlag.NOT_ORDERED
,无则返回0;这里就是第一次造成了characteristics is invoked.
的输出。
那么第二次characteristics is invoked
在控制台上的输出是在哪造成的呢?可见我们依据将 CodeBlock-3: 中的代码执行完毕,所以接下来执行: CodeBlock-2: 中接下来的代码:
return collector.characteristics().contains(Collector.Characteristics.IDENTITY_FINISH) ? (R) container : collector.finisher().apply(container);
这里的return语句:首先判断传入的collector
接口是否含有Collector.Characteristics.IDENTITY_FINISH
特性,如果有,那么就将原本为A类型的中间容器 container强制转为R类型返回,否则就调用finisher().apply
方法进行中间容器到结果容器的转变,并返回。
注意事项:
- 强制类型转换这里根本就没有进行A,R类型转换是否合理的判断,如果A不是继承于R类型或者A≠R类型,那么肯定为抛出异常。所以这就是 CodeBlock-3: 中的方法被修饰为
@SuppressWarnings("unchecked")
的原因,因为类型强制转换没有被检查其合法性。 - 另一方面,为了防止抛出类型转换异常,要求只要
Collector
接口的特性被修饰为Characteristics.IDENTITY_FINISH
,那么结果容器类型R
和中间结果容器类型A
一定是相同或者A extends R
,避免抛出异常。
分析到这里我们已经把控制台输出的结构都按序做了分析,但是你是否有个疑问:为何finisher
接口实现中的System.out.println("finisher is invoked.");
语句没被执行,控制台上没打出这句话?这是因为接口有Characteristics.IDENTITY_FINISH
这个特性,所以直接返回(R) container
,而finisher
接口实现从来没被读取过!所以如果将 CodeBlock-1: 中的属性Characteristics.IDENTITY_FINISH
去掉,那么就会读取finisher
接口实现。
最后的最后在说明一下数据结构上的改变:
- List 转为 Set,原List中的重复元素只能加入到Set中一个,如”hello“被去重了,见控制台对于新数据结构的遍历输出。
- 由于收集器的特性被设置为
Characteristics.UNORDERED
所以从List到Set的转换过程中,元素的顺序被打乱了:比如原来"hello", "world", "fisherman"
的顺序,现在为"world", "hello", "fisherman"
- 函数式编程的思想:这里只是提供了一个收集器的接口实现,但是其具体如何进行数据结构的转换是不需要我们进行编写的。