引子
將行為作為數(shù)據(jù)傳遞
怎樣在一行代碼里同時(shí)計(jì)算一個(gè)列表的和幔翰、最大值漩氨、最小值、平均值遗增、元素個(gè)數(shù)叫惊、奇偶分組、指數(shù)贡定、排序呢赋访?
答案是思維反轉(zhuǎn)!將行為作為數(shù)據(jù)傳遞。 文藝青年的代碼如下所示:
publicclassFunctionUtil{
publicstaticListmultiGetResult(List<Function<List<T>,?R>>?functions,?List<T>?list){
returnfunctions.stream().map(f?->?f.apply(list)).collect(Collectors.toList());
}
publicstaticvoidmain(String[]?args){
System.out.println(multiGetResult(
Arrays.asList(
list?->?list.stream().collect(Collectors.summarizingInt(x->x)),
list?->?list.stream().filter(x?->?x?<50).sorted().collect(Collectors.toList()),
list?->?list.stream().collect(Collectors.groupingBy(x->(x%2==0?"even":"odd"))),
list?->?list.stream().sorted().collect(Collectors.toList()),
list?->?list.stream().sorted().map(Math::sqrt).collect(Collectors.toMap(x->x,?y->Math.pow(2,y)))),
Arrays.asList(64,49,25,16,9,4,1,81,36)));
}
}
呃蚓耽,有點(diǎn)賣弄小聰明渠牲。 不過要是能將行為作為數(shù)據(jù)自由傳遞和施加于數(shù)據(jù)集產(chǎn)生結(jié)果,那么其代碼表達(dá)能力將如莊子之言步悠,恣意瀟灑而無所極限签杈。
行為就是數(shù)據(jù)。
Java8函數(shù)框架解讀
函數(shù)編程的最直接的表現(xiàn)鼎兽,莫過于將函數(shù)作為數(shù)據(jù)自由傳遞答姥,結(jié)合泛型推導(dǎo)能力,使代碼表達(dá)能力獲得飛一般的提升谚咬。那么鹦付,Java8是怎么支持函數(shù)編程的呢?主要有三個(gè)核心概念:
函數(shù)接口(Function)
流(Stream)
聚合器(Collector)
函數(shù)接口
關(guān)于函數(shù)接口择卦,需要記住的就是兩件事:
函數(shù)接口是行為的抽象敲长;
函數(shù)接口是數(shù)據(jù)轉(zhuǎn)換器。
最直接的支持就是 java.util.Function 包秉继。定義了四個(gè)最基礎(chǔ)的函數(shù)接口:
Supplier
: 數(shù)據(jù)提供器祈噪,可以提供 T 類型對(duì)象;無參的構(gòu)造器尚辑,提供了 get 方法辑鲤;
Function
: 數(shù)據(jù)轉(zhuǎn)換器,接收一個(gè) T 類型的對(duì)象杠茬,返回一個(gè) R類型的對(duì)象月褥; 單參數(shù)單返回值的行為接口;提供了 apply, compose, andThen, identity 方法澈蝙;
Consumer
: 數(shù)據(jù)消費(fèi)器吓坚, 接收一個(gè) T類型的對(duì)象撵幽,無返回值灯荧,通常用于根據(jù)T對(duì)象做些處理; 單參數(shù)無返回值的行為接口盐杂;提供了 accept, andThen 方法逗载;
Predicate
: 條件測(cè)試器,接收一個(gè) T 類型的對(duì)象链烈,返回布爾值厉斟,通常用于傳遞條件函數(shù); 單參數(shù)布爾值的條件性接口强衡。提供了 test (條件測(cè)試) , and-or- negate(與或非) 方法擦秽。
其中, compose, andThen, and, or, negate 用來組合函數(shù)接口而得到更強(qiáng)大的函數(shù)接口。
其它的函數(shù)接口都是通過這四個(gè)擴(kuò)展而來。
在參數(shù)個(gè)數(shù)上擴(kuò)展: 比如接收雙參數(shù)的感挥,有 Bi 前綴缩搅, 比如 BiConsumer
, BiFunction?;
在類型上擴(kuò)展: 比如接收原子類型參數(shù)的,有 [Int|Double|Long][Function|Consumer|Supplier|Predicate]
特殊常用的變形: 比如 BinaryOperator 触幼, 是同類型的雙參數(shù) BiFunction
硼瓣,二元操作符 ; UnaryOperator 是 Function?一元操作符置谦。
那么堂鲤,這些函數(shù)接口可以接收哪些值呢?
類/對(duì)象的靜態(tài)方法引用媒峡、實(shí)例方法引用瘟栖。引用符號(hào)為雙冒號(hào) ::
類的構(gòu)造器引用,比如 Class::new
lambda表達(dá)式
在博文“使用函數(shù)接口和枚舉實(shí)現(xiàn)配置式編程(Java與Scala實(shí)現(xiàn))”, “精練代碼:一次Java函數(shù)式編程的重構(gòu)之旅” 給出了基本的例子谅阿。后面還有更多例子慢宗。重在練習(xí)和嘗試。
聚合器
先說聚合器奔穿。每一個(gè)流式計(jì)算的末尾總有一個(gè)類似 collect(Collectors.toList()) 的方法調(diào)用镜沽。collect 是 Stream 的方法,而參數(shù)則是聚合器Collector贱田。已有的聚合器定義在Collectors 的靜態(tài)方法里缅茉。 那么這個(gè)聚合器是怎么實(shí)現(xiàn)的呢?
Reduce
大部分聚合器都是基于 Reduce 操作實(shí)現(xiàn)的男摧。 Reduce 蔬墩,名曰推導(dǎo),含有三個(gè)要素: 初始值 init, 二元操作符 BinaryOperator, 以及一個(gè)用于聚合結(jié)果的數(shù)據(jù)源S耗拓。
Reduce 的算法如下:
STEP1: 初始化結(jié)果 R = init 拇颅;
STEP2: 每次從 S 中取出一個(gè)值 v,通過二元操作符施加到 R 和 v 乔询,產(chǎn)生一個(gè)新值賦給 R = BinaryOperator(R, v)樟插;重復(fù) STEP2, 直到 S 中沒有值可取為止竿刁。
比如一個(gè)列表求和黄锤,Sum([1,2,3]) , 那么定義一個(gè)初始值 0 以及一個(gè)二元加法操作 BO = a + b ,通過三步完成 Reduce 操作:step1: R = 0; step2: v=1, R = 0+v = 1; step2: v=2, R = 1 + v = 3 ; step3: v = 3, R = 3 + v = 6食拜。
四要素
一個(gè)聚合器的實(shí)現(xiàn)鸵熟,通常需要提供四要素:
一個(gè)結(jié)果容器的初始值提供器 supplier ;
一個(gè)用于將每次二元操作的中間結(jié)果與結(jié)果容器的值進(jìn)行操作并重新設(shè)置結(jié)果容器的累積器 accumulator 负甸;
一個(gè)用于對(duì)Stream元素和中間結(jié)果進(jìn)行操作的二元操作符 combiner 流强;
一個(gè)用于對(duì)結(jié)果容器進(jìn)行最終聚合的轉(zhuǎn)換器 finisher(可選) 痹届。
Collectors.CollectorImpl 的實(shí)現(xiàn)展示了這一點(diǎn):
staticclassCollectorImplimplementsCollector{
privatefinalSupplier?supplier;
privatefinalBiConsumer?accumulator;
privatefinalBinaryOperator?combiner;
privatefinalFunction?finisher;
privatefinalSet?characteristics;
CollectorImpl(Supplier?supplier,
BiConsumer?accumulator,
BinaryOperator?combiner,
Function?finisher,
Set?characteristics)?{
this.supplier?=?supplier;
this.accumulator?=?accumulator;
this.combiner?=?combiner;
this.finisher?=?finisher;
this.characteristics?=?characteristics;
}
}
列表類聚合器
列表類聚合器實(shí)現(xiàn),基本是基于Reduce 操作完成的打月。 看如下代碼:
publicstatic
Collector>?toList()?{
returnnewCollectorImpl<>((Supplier>)?ArrayList::new,?List::add,
(left,?right)?->?{?left.addAll(right);returnleft;?},
CH_ID);
首先使用 ArrayList::new 創(chuàng)造一個(gè)空列表短纵; 然后 List:add 將Stream累積操作的中間結(jié)果加入到這個(gè)列表;第三個(gè)函數(shù)則將兩個(gè)列表元素進(jìn)行合并成一個(gè)結(jié)果列表中僵控。 就是這么簡(jiǎn)單香到。集合聚合器 toSet(), 字符串連接器 joining(),以及列表求和(summingXXX)报破、最大(maxBy)悠就、最小值(minBy)等都是這個(gè)套路。
映射類聚合器
映射類聚合器基于Map合并來完成充易」Fⅲ看這段代碼:
privatestatic>
BinaryOperatormapMerger(BinaryOperator<V>?mergeFunction){
return(m1,?m2)?->?{
for(Map.Entry?e?:?m2.entrySet())
m1.merge(e.getKey(),?e.getValue(),?mergeFunction);
returnm1;
};
}
根據(jù)指定的值合并函數(shù) mergeFunction, 返回一個(gè)map合并器,用來合并兩個(gè)map里相同key的值盹靴。mergeFunction用來對(duì)兩個(gè)map中相同key的值進(jìn)行運(yùn)算得到新的value值炸茧,如果value值為null,會(huì)移除相應(yīng)的key稿静,否則使用value值作為對(duì)應(yīng)key的值梭冠。這個(gè)方法是私有的,主要為支撐 toMap改备,groupingBy 而生控漠。
toMap的實(shí)現(xiàn)很簡(jiǎn)短,實(shí)際上就是將指定stream的每個(gè)元素分別使用給定函數(shù)keyMapper, valueMapper進(jìn)行映射得到 newKey, newValue悬钳,從而形成新的MapnewKey,newValue, 再使用mapMerger(mergeFunction) 生成的 map 合并器將其合并到 mapSupplier (2)盐捷。如果只傳 keyMapper, valueMapper,那么就只得到結(jié)果(1)默勾。
publicstatic>
Collector?toMap(Function?keyMapper,
Function?valueMapper,
BinaryOperator?mergeFunction,
Supplier?mapSupplier)?{
BiConsumer?accumulator
=?(map,?element)?->?map.merge(keyMapper.apply(element),
valueMapper.apply(element),?mergeFunction);
returnnewCollectorImpl<>(mapSupplier,?accumulator,?mapMerger(mergeFunction),?CH_ID);
}
toMap 的一個(gè)示例見如下代碼:
List?list?=?Arrays.asList(1,2,3,4,5);
Supplier>?mapSupplier?=?()?->?list.stream().collect(Collectors.toMap(x->x,?y->?y?*?y));
Map?mapValueAdd?=?list.stream().collect(Collectors.toMap(x->x,?y->y,?(v1,v2)?->?v1+v2,?mapSupplier));
System.out.println(mapValueAdd);
將一個(gè) List 轉(zhuǎn)成 map[1=1,2=2,3=3,4=4,5=5]碉渡,然后與另一個(gè)map[1=1,2=4,3=9,4=16,5=25]的相同key的value進(jìn)行相加。注意到, toMap 的最后一個(gè)參數(shù)是 Supplier
自定義聚合器
讓我們仿照 Collectors.toList() 做一個(gè)自定義的聚合器母剥。實(shí)現(xiàn)一個(gè)含N個(gè)數(shù)的斐波那契序列 List滞诺。由于 Reduce 每次都從流中取一個(gè)數(shù),因此需要生產(chǎn)一個(gè)含N個(gè)數(shù)的stream媳搪;可使用 Arrays.asList(1,2,3,4,5,6,7,8,9,10).stream() 铭段, 亦可使用 IntStream.range(1,11) 骤宣,不過兩者的 collector 方法是不一樣的秦爆。這里我們?nèi)∏罢摺?/p>
現(xiàn)在,需要構(gòu)造四要素:
可變的結(jié)果容器提供器 Supplier
<list
> = () -> [0, 1] 憔披; 注意這里不能使用 Arrays.asList , 因?yàn)樵摲椒ㄉ傻牧斜硎遣豢勺兊摹?lt;/list
累積器 BiConsumer
<list
, Integer> accumulator(): 這里流的元素未用等限,僅僅用來使計(jì)算進(jìn)行和終止爸吮。新的元素從結(jié)果容器中取最后兩個(gè)相加后產(chǎn)生新的結(jié)果放到結(jié)果容器中。</list
組合器 BinaryOperator
<list
> combiner() : 照葫蘆畫瓢望门,目前沒看出這步是做什么用形娇;直接 return null; 也是OK的。</list
最終轉(zhuǎn)換器 Function
<list
, List
> finisher() :在最終轉(zhuǎn)換器中筹误,移除初始設(shè)置的兩個(gè)值 0, 1 桐早。
</list
代碼如下:
/**
*?Created?by?shuqin?on?17/12/5.
*/
publicclassFiboCollectorimplementsCollector,List>{
publicSupplier>?supplier()?{
return()?->?{
List?result?=newArrayList<>();
result.add(0);?result.add(1);
returnresult;
};
}
@Override
publicBiConsumer,?Integer>?accumulator()?{
return(res,?num)?->?{
Integer?next?=?res.get(res.size()-1)?+?res.get(res.size()-2);
res.add(next);
};
}
@Override
publicBinaryOperator>?combiner()?{
returnnull;
//return?(left,?right)?->?{?left.addAll(right);?return?left;?};
}
@Override
publicFunction,?List>?finisher()?{
returnres?->?{?res.remove(0);?res.remove(1);returnres;?};
}
@Override
publicSetcharacteristics(){
returnCollections.emptySet();
}
}
List?fibo?=?Arrays.asList(1,2,3,4,5,6,7,8,9,10).stream().collect(newFiboCollector());
System.out.println(fibo);
流
流(Stream)是Java8對(duì)函數(shù)式編程的重要支撐。大部分函數(shù)式工具都圍繞Stream展開厨剪。
Stream的接口
Stream 主要有四類接口:
流到流之間的轉(zhuǎn)換:比如 filter(過濾), map(映射轉(zhuǎn)換), mapToInt|Long|Double, flatMap(高維結(jié)構(gòu)平鋪)哄酝,flatMapTo[Int|Long|Double], sorted(排序),distinct(不重復(fù)值)祷膳,peek(執(zhí)行某種操作陶衅,流不變,可用于調(diào)試)直晨,limit(限制到指定元素?cái)?shù)量), skip(跳過若干元素) 搀军;
流到終值的轉(zhuǎn)換: 比如 toArray(轉(zhuǎn)為數(shù)組),reduce(推導(dǎo)結(jié)果)勇皇,collect(聚合結(jié)果)罩句,min(最小值), max(最大值), count (元素個(gè)數(shù)), anyMatch (任一匹配), allMatch(所有都匹配)敛摘, noneMatch(一個(gè)都不匹配)的止, findFirst(選擇首元素),findAny(任選一元素) 着撩;
直接遍歷: forEach (不保序遍歷诅福,比如并行流), forEachOrdered(保序遍歷) ;
構(gòu)造流: empty (構(gòu)造空流)拖叙,of (單個(gè)元素的流及多元素順序流)氓润,iterate (無限長(zhǎng)度的有序順序流),generate (將數(shù)據(jù)提供器轉(zhuǎn)換成無限非有序的順序流)薯鳍, concat (流的連接)咖气, Builder (用于構(gòu)造流的Builder對(duì)象)
除了 Stream 本身自帶的生成Stream 的方法,數(shù)組和容器及StreamSupport都有轉(zhuǎn)換為流的方法挖滤。比如 Arrays.stream , [List|Set|Collection].[stream|parallelStream] , StreamSupport.[int|long|double|]stream崩溪;
流的類型主要有:Reference(對(duì)象流), IntStream (int元素流), LongStream (long元素流)斩松, Double (double元素流) 伶唯,定義在類 StreamShape 中,主要將操作適配于類型系統(tǒng)惧盹。
flatMap 的一個(gè)例子見如下所示乳幸,將一個(gè)二維數(shù)組轉(zhuǎn)換為一維數(shù)組:
List?nums?=?Arrays.asList(Arrays.asList(1,2,3),?Arrays.asList(1,4,9),?Arrays.asList(1,8,27))
.stream().flatMap(x?->?x.stream()).collect(Collectors.toList());
System.out.println(nums);
collector實(shí)現(xiàn)
這里我們僅分析串行是怎么實(shí)現(xiàn)的瞪讼。入口在類 java.util.stream.ReferencePipeline 的 collect 方法:
container?=?evaluate(ReduceOps.makeRef(collector));
returncollector.characteristics().contains(Collector.Characteristics.IDENTITY_FINISH)
??(R)?container?:?collector.finisher().apply(container);
這里的關(guān)鍵是 ReduceOps.makeRef(collector)。 點(diǎn)進(jìn)去:
publicstaticTerminalOp
makeRef(Collector?collector)
{
Supplier?supplier?=?Objects.requireNonNull(collector).supplier();
BiConsumer?accumulator?=?collector.accumulator();
BinaryOperator?combiner?=?collector.combiner();
classReducingSinkextendsBox
implementsAccumulatingSink
{
@Override
publicvoidbegin(longsize){
state?=?supplier.get();
}
@Override
publicvoidaccept(T?t){
accumulator.accept(state,?t);
}
@Override
publicvoidcombine(ReducingSink?other){
state?=?combiner.apply(state,?other.state);
}
}
returnnewReduceOp(StreamShape.REFERENCE)?{
@Override
publicReducingSinkmakeSink(){
returnnewReducingSink();
}
@Override
publicintgetOpFlags(){
returncollector.characteristics().contains(Collector.Characteristics.UNORDERED)
??StreamOpFlag.NOT_ORDERED
:0;
}
};
}
privatestaticabstractclassBox{
U?state;
Box()?{}//?Avoid?creation?of?special?accessor
publicUget(){
returnstate;
}
}
Box 是一個(gè)結(jié)果值的持有者粹断; ReducingSink 用begin, accept, combine 三個(gè)方法定義了要進(jìn)行的計(jì)算符欠;ReducingSink是有狀態(tài)的流數(shù)據(jù)消費(fèi)的計(jì)算抽象,閱讀Sink接口文檔可知瓶埋。ReduceOps.makeRef(collector) 返回了一個(gè)封裝了Reduce操作的ReduceOps對(duì)象希柿。注意到,這里都是聲明要執(zhí)行的計(jì)算养筒,而不涉及計(jì)算的實(shí)際過程狡汉。展示了表達(dá)與執(zhí)行分離的思想。真正的計(jì)算過程啟動(dòng)在 ReferencePipeline.evaluate 方法里:
finalRevaluate(TerminalOp<E_OUT,?R>?terminalOp){
assertgetOutputShape()==?terminalOp.inputShape();
if(linkedOrConsumed)
thrownewIllegalStateException(MSG_STREAM_LINKED);
linkedOrConsumed?=true;
returnisParallel()
??terminalOp.evaluateParallel(this,?sourceSpliterator(terminalOp.getOpFlags()))
:?terminalOp.evaluateSequential(this,?sourceSpliterator(terminalOp.getOpFlags()));
}
使用 IDE 的 go to implementations 功能闽颇, 跟進(jìn)去盾戴,可以發(fā)現(xiàn),最終在 AbstractPipeLine 中定義了:
@Override
finalvoidcopyInto(Sink<P_IN>?wrappedSink,?Spliterator<P_IN>?spliterator){
Objects.requireNonNull(wrappedSink);
if(!StreamOpFlag.SHORT_CIRCUIT.isKnown(getStreamAndOpFlags()))?{
wrappedSink.begin(spliterator.getExactSizeIfKnown());
spliterator.forEachRemaining(wrappedSink);
wrappedSink.end();
}
else{
copyIntoWithCancel(wrappedSink,?spliterator);
}
}
Spliterator 用來對(duì)流中的元素進(jìn)行分區(qū)和遍歷以及施加Sink指定操作兵多,可以用于并發(fā)計(jì)算尖啡。Spliterator的具體實(shí)現(xiàn)類定義在 Spliterators 的靜態(tài)類和靜態(tài)方法中。其中有:
數(shù)組Spliterator:
staticfinalclassArraySpliteratorimplementsSpliterator
staticfinalclassIntArraySpliteratorimplementsSpliterator.OfInt
staticfinalclassLongArraySpliteratorimplementsSpliterator.OfLong
staticfinalclassDoubleArraySpliteratorimplementsSpliterator.OfDouble
迭代Spliterator:
staticclassIteratorSpliteratorimplementsSpliterator
staticfinalclassIntIteratorSpliteratorimplementsSpliterator.OfInt
staticfinalclassLongIteratorSpliteratorimplementsSpliterator.OfLong
staticfinalclassDoubleIteratorSpliteratorimplementsSpliterator.OfDouble
抽象Spliterator:
publicstaticabstractclassAbstractSpliteratorimplementsSpliterator
privatestaticabstractclassEmptySpliterator,C>
publicstaticabstractclassAbstractIntSpliteratorimplementsSpliterator.OfInt
publicstaticabstractclassAbstractLongSpliteratorimplementsSpliterator.OfLong
publicstaticabstractclassAbstractDoubleSpliteratorimplementsSpliterator.OfDouble
每個(gè)具體類都實(shí)現(xiàn)了trySplit剩膘,forEachRemaining衅斩,tryAdvance,estimateSize怠褐,characteristics畏梆, getComparator。 trySplit 用于拆分流奈懒,提供并發(fā)能力奠涌;forEachRemaining,tryAdvance 用于遍歷和消費(fèi)流中的數(shù)據(jù)磷杏。下面展示了IteratorSpliterator的forEachRemaining溜畅,tryAdvance 兩個(gè)方法的實(shí)現(xiàn)〖觯可以看到慈格,木有特別的地方,就是遍歷元素并將指定操作施加于元素遥金。
@Override
publicvoidforEachRemaining(Consumer?action){
if(action?==null)thrownewNullPointerException();
Iterator?i;
if((i?=?it)?==null)?{
i?=?it?=?collection.iterator();
est?=?(long)collection.size();
}
i.forEachRemaining(action);
}
@Override
publicbooleantryAdvance(Consumer?action){
if(action?==null)thrownewNullPointerException();
if(it?==null)?{
it?=?collection.iterator();
est?=?(long)?collection.size();
}
if(it.hasNext())?{
action.accept(it.next());
returntrue;
}
returnfalse;
}
整體流程就是這樣浴捆。回顧一下:
Collector 定義了必要的聚合操作函數(shù)稿械;
ReduceOps.makeRef 將 Collector 封裝成一個(gè)計(jì)算對(duì)象 ReduceOps 选泻,依賴的 ReducingSink 定義了具體的流數(shù)據(jù)消費(fèi)過程;
Spliterator 用于對(duì)流中的元素進(jìn)行分區(qū)和遍歷以及施加Sink指定的操作溜哮。
Pipeline
那么滔金,Spliterator 又是從哪里來的呢色解?是通過類 java.util.stream.AbstractPipeline 的方法 sourceSpliterator 拿到的:
privateSpliterator?sourceSpliterator(intterminalFlags)?{
//?Get?the?source?spliterator?of?the?pipeline
Spliterator?spliterator?=null;
if(sourceStage.sourceSpliterator?!=null)?{
spliterator?=?sourceStage.sourceSpliterator;
sourceStage.sourceSpliterator?=null;
}
elseif(sourceStage.sourceSupplier?!=null)?{
spliterator?=?(Spliterator)?sourceStage.sourceSupplier.get();
sourceStage.sourceSupplier?=null;
}
else{
thrownewIllegalStateException(MSG_CONSUMED);
}
//?code?for?isParallel
returnspliterator;
}
這里的 sourceStage 是一個(gè) AbstractPipeline茂嗓。 Pipeline 是實(shí)現(xiàn)流式計(jì)算的流水線抽象餐茵,也是Stream的實(shí)現(xiàn)類∈鑫可以看到忿族,java.util.stream 定義了四種 pipeline: DoublePipeline, IntPipeline, LongPipeline, ReferencePipeline◎蛎可以重點(diǎn)看 ReferencePipeline 的實(shí)現(xiàn)道批。比如 filter, map
abstractclassReferencePipeline
extendsAbstractPipeline>
implementsStream
@Override
publicfinalStreamfilter(Predicatepredicate)
{
Objects.requireNonNull(predicate);
returnnewStatelessOp(this,?StreamShape.REFERENCE,
StreamOpFlag.NOT_SIZED)?{
@Override
SinkopWrapSink(intflags,?Sink?sink){
returnnewSink.ChainedReference(sink)?{
@Override
publicvoidbegin(longsize){
downstream.begin(-1);
}
@Override
publicvoidaccept(P_OUT?u){
if(predicate.test(u))
downstream.accept(u);
}
};
}
};
}
@Override
@SuppressWarnings("unchecked")
publicfinalStreammap(Function?mapper){
Objects.requireNonNull(mapper);
returnnewStatelessOp(this,?StreamShape.REFERENCE,
StreamOpFlag.NOT_SORTED?|?StreamOpFlag.NOT_DISTINCT)?{
@Override
SinkopWrapSink(intflags,?Sink?sink){
returnnewSink.ChainedReference(sink)?{
@Override
publicvoidaccept(P_OUT?u){
downstream.accept(mapper.apply(u));
}
};
}
};
}
套路基本一樣,關(guān)鍵點(diǎn)在于 accept 方法入撒。filter 只在滿足條件時(shí)將值傳給下一個(gè) pipeline, 而 map 將計(jì)算的值傳給下一個(gè) pipeline. StatelessOp 沒有什么邏輯隆豹,JDK文檔解釋是:Base class for a stateless intermediate stage of a Stream。相應(yīng)還有一個(gè) StatefulOp, Head茅逮。 這些都是 ReferencePipeline 璃赡,負(fù)責(zé)將值在 pipeline 之間傳遞,交給 Sink 去計(jì)算献雅。
staticclassHeadextendsReferencePipeline
abstractstaticclassStatelessOpextendsReferencePipeline
abstractstaticclassStatefulOpextendsReferencePipeline
至此碉考,我們對(duì)整個(gè)流計(jì)算過程有了更清晰的認(rèn)識(shí)。 細(xì)節(jié)可以再逐步推敲挺身。
擴(kuò)展閱讀
來源:http://www.cnblogs.com/lovesqcc/p/7965387.html