Stream Head的構(gòu)建(一)
Stream 的中間操作(二)
上兩篇完成了源頭和中間流的構(gòu)造岩喷,這篇看看終止操作forEach是如何將整個(gè)過程串起來的谣光。
List<String> names= Arrays.asList("one", "two", "three", "four");
names.stream()
.filter(s -> s.length() > 2)
.map(String::toUpperCase)
.forEach(System.out::println);
從上文可以看出forEach是在stream2上調(diào)用的
ReferencePipeline.java
public void forEach(Consumer<? super P_OUT> action) {
evaluate(ForEachOps.makeRef(action, false));
}
首先工具類ForEachOps使用Consumer構(gòu)建一個(gè)終止操作作為evaluate參數(shù)
ForEachOps.java
public static <T> TerminalOp<T, Void> makeRef(Consumer<? super T> action, boolean ordered) {
...
//OfRef說明是引用類型厌秒,還有OfInt等基本類型淌铐,都是對ForEachOp在特定數(shù)據(jù)類型上定制
return new ForEachOp.OfRef<>(action, ordered);
}
//OfRef 是ForEachOp子類
OfRef(Consumer<? super T> consumer, boolean ordered) {
super(ordered);
this.consumer = consumer;
}
ForEachOp和OfRef都比較簡潔,看幾個(gè)主要方法
ForEachOp實(shí)現(xiàn)了TerminalOp的evaluateSequential和TerminalSink的get方法
public <S> Void evaluateSequential(PipelineHelper<T> helper,Spliterator<S> spliterator){
//this指的就是ForEachOp.OfRef
//下文能看到helper.wrapAndCopyInto(this, spliterator)返回的就是this
//因?yàn)椴恍枰祷刂担詆et方法返回的就是下面的null,。
return helper.wrapAndCopyInto(this, spliterator).get();
}
public Void get() {
return null;
}
//OfRef 實(shí)現(xiàn)TerminalSink的accept
public void accept(T t) {
//此例consumer就是打印的方法
consumer.accept(t);
}
接下來是evaluate方法的調(diào)用
AbstractPipeline.java
final <R> R evaluate(TerminalOp<E_OUT, R> terminalOp) {
...
return isParallel()
? terminalOp.evaluateParallel(this, sourceSpliterator(terminalOp.getOpFlags()))
//非并行,走的是這個(gè)方法
:terminalOp.evaluateSequential(this,sourceSpliterator(terminalOp.getOpFlags()));
}
先看sourceSpliterator方法
private Spliterator<?> sourceSpliterator(int terminalFlags) {
Spliterator<?> spliterator = null;
if (sourceStage.sourceSpliterator != null) {
// 上篇說為什么中間流要有源頭的引用占哟,作用在這里,通過源頭的引用酿矢,獲取迭代器.
spliterator = sourceStage.sourceSpliterator;
//置為空榨乎,下次再調(diào)用進(jìn)入else拋出異常
sourceStage.sourceSpliterator = null;
}
else if (sourceStage.sourceSupplier != null) {
...
}
else {
//防止再次調(diào)用
throw new IllegalStateException(MSG_CONSUMED);
}
...
return spliterator;
}
接下來帶著兩個(gè)參數(shù)進(jìn)入上文所說的evaluateSequential方法,真正實(shí)現(xiàn)方法是helper.wrapAndCopyInto(this, spliterator)
AbstractPipeline.java
final <P_IN, S extends Sink<E_OUT>> S wrapAndCopyInto(S sink, Spliterator<P_IN> spliterator) {
copyInto(wrapSink(Objects.requireNonNull(sink)), spliterator);
return sink;
}
public final <P_IN> Sink<P_IN> wrapSink(Sink<E_OUT> sink) {
...
//此處AbstractPipeline類型的 p就是map方法返回的stream2,
for ( AbstractPipeline p=AbstractPipeline.this; p.depth > 0; p=p.previousStage) {
//參數(shù)sink就是上面的ForEachOp.OfRef
//循環(huán)起來形成的就是單向鏈表
sink = p.opWrapSink(..., sink);
}
return (Sink<P_IN>) sink;
}
再看一下雙向鏈表的數(shù)據(jù)結(jié)構(gòu)瘫筐,Sink是這樣串起來的
stream wrap_sink.png
回到中間流的opWrapSink方法蜜暑,發(fā)現(xiàn)返回的實(shí)例是ChainedReference
Sink.java
//Sink接口是Comsume的子類,ChainedReference實(shí)現(xiàn)了Sink接口策肝,里面的方法先不用管
static abstract class ChainedReference<T, E_OUT> implements Sink<T> {
protected final Sink<? super E_OUT> downstream;
//以下游Sink為參數(shù)肛捍,并持有引用
public ChainedReference(Sink<? super E_OUT> downstream) {
this.downstream = Objects.requireNonNull(downstream);
}
@Override
public void begin(long size) {
downstream.begin(size);
}
@Override
public void end() {
downstream.end();
}
...
}
到此中間流對源元素的操作方法就從尾到頭串起來了,只差最后一步copyInto(sink)
AbstractPipeline.java
@Override
//wrappedSink就是上面wrapSink返回值
final <P_IN> void copyInto(Sink<P_IN> wrappedSink, Spliterator<P_IN> spliterator) {
...
if (!StreamOpFlag.SHORT_CIRCUIT.isKnown(getStreamAndOpFlags())) {
//這里才開始所有的計(jì)算。begin 先通知各Sink開始準(zhǔn)備
wrappedSink.begin(spliterator.getExactSizeIfKnown());
//如第一篇所述驳糯,調(diào)用迭代器此方法發(fā)射元素
spliterator.forEachRemaining(wrappedSink);
//通知結(jié)束
wrappedSink.end();
}
else {
...
}
}
//wrappedSink.begin 就是filter方法中Sink.ChainedReference復(fù)寫的begin
public void begin(long size) {
downstream.begin(-1);
}
//然后調(diào)用map方法中Sink.ChainedReference的begin ,沒有復(fù)寫
public void begin(long size) {
downstream.begin(size);
}
//最后是OfRef繼承的begin
default void begin(long size) {}
返回看看ArrayListSpliterator迭代器的遍歷
ArrayList.java
public void forEachRemaining(Consumer<? super E> action) {
int i, hi, mc; // hoist accesses and checks from loop
ArrayList<E> lst; Object[] a;
if (action == null)
throw new NullPointerException();
if ((lst = list) != null && (a = lst.elementData) != null) {
if ((hi = fence) < 0) {
mc = lst.modCount;
hi = lst.size;
}
else
mc = expectedModCount;
if ((i = index) >= 0 && (index = hi) <= a.length) {
for (; i < hi; ++i) {
@SuppressWarnings("unchecked") E e = (E) a[i];
//把元素交給Sink
action.accept(e);
}
if (lst.modCount == mc)
return;
}
}
throw new ConcurrentModificationException();
}
如前面所述Sink是Consumer子類氢橙,實(shí)現(xiàn)了accept方法
//wrappedSink.accept就是filter方法中Sink.ChainedReference復(fù)寫的accept
public void accept(P_OUT u) {
//如果滿足條件就會調(diào)用下游map的同名方法
if (predicate.test(u))
downstream.accept(u);
}
//然后是map的accept 酝枢,先映射,再調(diào)用下游accept
public void accept(P_OUT u) {
downstream.accept(mapper.apply(u));
}
//最后是OfRef的accept悍手,
public void accept(T t) {
//到此被打印出來
consumer.accept(t);
}
//雖然accept方法屬于不同流帘睦,卻能被一次調(diào)用,這是性能所在坦康。
至此竣付,Java 流式構(gòu)建和操作的基本框架就顯示出來了。