函數(shù)式編程的概念
簡單說,"函數(shù)式編程"是一種"編程范式"(programming paradigm)力试,也就是如何編寫程序的方法論。
它屬于"結(jié)構(gòu)化編程"的一種,主要思想是把運算過程盡量寫成一系列嵌套的函數(shù)調(diào)用
為什么要使用函數(shù)式編程糯笙,和命令是編程有什么不同點和優(yōu)點呢?
最重要的就是他們的關(guān)注點不一樣撩银,命令式編程里面我們關(guān)注的是怎么樣做给涕,而函數(shù)式編程里我們關(guān)注的是做什么,也就是說在命令式編程里你要告訴程序要怎么樣做才能實現(xiàn)一個功能额获,而函數(shù)式編程里面你不需要告訴它怎么樣做够庙,你只告訴它你要實現(xiàn)什么樣的功能,而不需要關(guān)注實現(xiàn)的細(xì)節(jié)抄邀,這樣說還是有些抽象耘眨,我么們來以一個具體里例子來說明一下。
- 假設(shè)我們現(xiàn)在有個需求境肾,需要在一堆數(shù)字里面找到最小的一個剔难,你會怎么做呢胆屿?
命令式編程
public class LombokTest {
public static void main(String[] args) {
int[] nums = {5,52,99,68,3,-12};
int min = Integer.MAX_VALUE;
for(int i : nums){
if(i < min){
min = i;
}
}
System.out.println("最小值="+min);
}
}
結(jié)果
最小值=-12
這種解決思路就是命令式編程,我們要定義變量偶宫,我們要用 for 循環(huán),要把每個數(shù)字跟最小值對比一下非迹,如果符合還要把最小值在更新一下,這就是命令式編程纯趋,我們要告訴程序所有實現(xiàn)的細(xì)節(jié)彻秆。
函數(shù)式編程
// jdk 1.8
int asInt = IntStream.of(nums).min().getAsInt();
System.out.println("jdk1.8 函數(shù)式編程="+asInt);
結(jié)果
jdk1.8 函數(shù)式編程=-12
我們可以看到結(jié)果是一樣
在比如這一堆數(shù)字比較大,可能有幾億條结闸,在命令式編程里面唇兑,使用一個 for 循環(huán)性能可能不達(dá)標(biāo),我們可能要使用多線程桦锄,要定義線程池扎附,要把數(shù)據(jù)拆分,最后得到一個最小值结耀,這要做還是比較麻煩的我們所有的工作都需要自己去做留夜,而在函數(shù)式編程里面在這種并行的環(huán)境下你不需要做這么多細(xì)節(jié)谋减,只需要調(diào)用一個函數(shù)丽声,就可以得到想要的結(jié)果
// 并行
int pInt = IntStream.of(nums).parallel().min().getAsInt();
System.out.println("parallel result = "+pInt);
結(jié)果
parallel result = -12
我們看到結(jié)果還是一樣的 ,這樣寫是不是爽多了骆膝?
函數(shù)式編程還有一個比較明顯的有點黑毅,使用 lombok 表達(dá)式可以使代碼更加的簡短嚼摩,好讀。
- 比如線程的創(chuàng)建
jdk1.8 之前
new Thread(new Runnable(){
@Override
public void run() {
System.out.println("jdk 1.8 之前創(chuàng)建線程");
}
}).start();
jdk1.8 之后
new Thread(() -> {
System.out.println("jdk 1.8 之后創(chuàng)建線程");
}).start();
函數(shù)式帶輸入輸出參數(shù)的寫法
所謂的函數(shù)式接口矿瘦,是指只有一個抽象方法(這里注意不是只有一個方法枕面,1.8以及后 interface中可以有默認(rèn)實現(xiàn)的方法關(guān)鍵字是default,如果他有兩個及以上抽象方法,就不是函數(shù)式接口)的接口缚去,jdk8幫我們提供了一個注解潮秘,幫助我們檢查編譯時是否合格 @FunctionInterface,允許有端個 default 修飾的非抽象方法,default關(guān)鍵字在API中的解釋為易结,意思是默認(rèn)方法能夠向庫的接口添加新功能枕荞,并確保與為這些接口的舊版本編寫的代碼兼容。并且不用在其子類進(jìn)行逐個實現(xiàn)搞动。
@FunctionalInterface
interface NumsFunciton{
int numSums(int i);
// 默認(rèn)實現(xiàn)的方法 不需要在子類中實現(xiàn)
default int bool(int b){
return b * 5;
}
default int boola(int b){
return b * 8;
}
}
public class LombokTest2 {
public static void main(String[] args) {
// 一
NumsFunciton n = (i) -> i * 5;
// 二 由于只有一個參數(shù) 括號可以省略 這種最常見
NumsFunciton n1 = i -> i * 6;
// 三
NumsFunciton n2 = (int i) -> i * 7;
// 四
NumsFunciton n4 = (int i) -> {
return i * 8;
};
System.out.println(n.bool(9));
}
}
注意函數(shù)式接口多層繼承 default 默認(rèn)方法覆蓋問題
@FunctionalInterface
interface NumsFunciton{
int numSums(int i);
// 默認(rèn)實現(xiàn)的方法 不需要在子類中實現(xiàn)
default int add(){
return 18;
}
}
@FunctionalInterface
interface NumsFunciton2{
int numSums(int i);
// 默認(rèn)實現(xiàn)的方法 不需要在子類中實現(xiàn)
default int add(){
return 19;
}
}
@FunctionalInterface
interface NumsFunciton3 extends NumsFunciton,NumsFunciton2{
@Override
default int add() {
return NumsFunciton.super.add();
}
}
這里可以指定調(diào)用哪個函數(shù)接口的默認(rèn)方法
java 8 提供的函數(shù)式接口
interface IMoneyFormat{
// 格式化
String format(int i);
}
class MyMoney{
private final int money;
public MyMoney(int money){
this.money = money;
}
// 打印
public void printMoney(IMoneyFormat moneyFormat){
System.out.println("我的小目標(biāo):"+moneyFormat.format(money));
}
}
public class LombokTest3 {
public static void main(String[] args) {
// 初始化
MyMoney myMoney = new MyMoney(999999999);
// 使用函數(shù)式
myMoney.printMoney(i -> new DecimalFormat("#,###").format(i));
}
}
Loamb 表達(dá)式其實不關(guān)心實現(xiàn)的那個接口躏精,所以他不需要知道接口的名字,也不需要知道它的方法滋尉,它是需需要知道輸入什么玉控,輸出是什么飞主,這一輸入的是 int 類型狮惜,輸出的是 String 類型高诺,所有我們不需要定義接口,只需要輸入是什么輸出是什么
java 8 提供的函數(shù)式接口優(yōu)化
class MyMoney{
private final int money;
public MyMoney(int money){
this.money = money;
}
// 使用 jdk 1.8 函數(shù)式接口 Function
public void printMoney(Function<Integer,String> moneyFormat){
System.out.println("我的小目標(biāo):"+moneyFormat.apply(this.money));
}
}
public class LombokTest3 {
public static void main(String[] args) {
// 初始化
MyMoney myMoney = new MyMoney(999999999);
// 使用函數(shù)式
myMoney.printMoney(i -> new DecimalFormat("#,###").format(i));
}
}
其結(jié)果是一樣的碾篡,除了不需要定義接口外虱而,還可以支持連式編程
// 初始化
MyMoney myMoney = new MyMoney(999999999);
// 使用函數(shù)式
// myMoney.printMoney(i -> new DecimalFormat("#,###").format(i));
Function<Integer,String> moneyFormat = i -> new DecimalFormat("#,###").format(i);
// 函數(shù)式接口的鏈?zhǔn)骄幊? myMoney.printMoney(moneyFormat.andThen(s -> "人民幣:"+s));
結(jié)果
我的小目標(biāo):人民幣:999,999,999
常見的函數(shù)式接口
函數(shù)式接口簡單示例
System.out.println("=================== 斷言函數(shù)接口 ===================");
Predicate<Integer> predicate = i -> i > 10;
System.out.println(predicate.test(9));
System.out.println("=================== 消費函數(shù)接口 沒有返回值 ===================");
Consumer<String> consumer = s -> System.out.println(s);
consumer.accept("哈哈哈");
System.out.println("=================== 該接口就一個抽象方法get方法,該接口在JAVA8之函數(shù)式接口返回實例篇中第一個示例就是利用的該接口 ===================");
System.out.println("=================== 不用傳入任何參數(shù),直接返回一個泛型T的實例.就如同無參構(gòu)造一樣 ===================");
Supplier<LombokTest4> supplier = LombokTest4 :: new;
LombokTest4 r1 = supplier.get();
LombokTest4 r2 = supplier.get();
System.out.println("實例1="+r1);
System.out.println("實例2="+r2);
System.out.println("實例2=實例2,"+(r1==r2));
System.out.println("=================== 消費函數(shù)接口 沒有返回值 ===================");
BiFunction<Integer,Integer,String> biFunction = (x,y) -> {
return x+y+"";
};
System.out.println(biFunction.apply(5, 8));
結(jié)果
=================== 斷言函數(shù)接口 ===================
false
=================== 消費函數(shù)接口 沒有返回值 ===================
哈哈哈
=================== 該接口就一個抽象方法get方法,該接口在JAVA8之函數(shù)式接口返回實例篇中第一個示例就是利用的該接口 ===================
=================== 不用傳入任何參數(shù),直接返回一個泛型T的實例.就如同無參構(gòu)造一樣 ===================
實例1=com.lombok.LombokTest4@76fb509a
實例2=com.lombok.LombokTest4@300ffa5d
實例2=實例2,false
=================== 消費函數(shù)接口 沒有返回值 ===================
13
方法的引用
class Dog{
private String name = "哮天犬";
// 默認(rèn)十斤狗糧
private int food = 100;
public Dog(){}
public Dog(String name){
this.name = name;
}
/**
* 狗叫的靜態(tài)方法
* @param dog
*/
public static void bark(Dog dog){
System.out.println(dog.name + "叫了");
}
/**
* 吃狗糧
* jdk 默認(rèn)會把當(dāng)前實例傳入到非靜態(tài)方法,參數(shù)名為 this,位置是第一個
* 所以子在使用類名引用的時候?qū)嶋H上是有兩個參數(shù)
* 所以在費靜態(tài)方法中可以使用 this 關(guān)鍵字
* @param num
* @return
*/
public int eat(Dog this,int num){
System.out.println("吃了"+num+"斤狗糧");
this.food -= num;
return this.food;
}
@Override
public String toString() {
return this.name;
}
}
public class LombokTest5 {
public static void main(String[] args) {
// 方法調(diào)用
Consumer<String> consumer = s -> System.out.println(s);
consumer.accept("-----");
// lombok 是一個匿名的函數(shù)开泽,左邊是方法參數(shù)牡拇,右邊是執(zhí)行體
// 當(dāng)執(zhí)行體的參數(shù)和箭頭左邊的類型是一樣的時候就可以縮寫,寫成方法引用的形式
Consumer<String> consumer1 = System.out::println;
consumer1.accept("這是方法調(diào)用");
System.out.println("================= 靜態(tài)方法的方法引用 類名+方法名 ================");
Consumer<Dog> dog = Dog::bark;
dog.accept(new Dog());
System.out.println("================= 非靜態(tài)方法的方法引用 對象實例+方法名 ================");
Dog d = new Dog();
Function<Integer,Integer> function = d::eat;
System.out.println("還剩下"+function.apply(3)+"斤狗糧");
System.out.println("================= 可以看到方法的輸入輸出類型一樣穆律,可以使用一元函數(shù)式接口 對象實例+方法名 ================");
UnaryOperator<Integer> unaryOperator = d::eat;
System.out.println("還剩下"+unaryOperator.apply(5)+"斤狗糧");
System.out.println("================= 還可以使用基本函數(shù)式接口 對象實例+方法名 ================");
IntUnaryOperator intUnaryOperator = d::eat;
System.out.println("還剩下"+intUnaryOperator.applyAsInt(1)+"斤狗糧");
System.out.println("================= 非靜態(tài)方法的方法引用 類名+方法名 ================");
System.out.println("================= jdk 默認(rèn)會把當(dāng)前實例傳入到非靜態(tài)方法惠呼,參數(shù)名為 this,位置是第一個 ================");
System.out.println("================= 所以子在使用類名引用的時候?qū)嶋H上是有兩個參數(shù) ================");
System.out.println("================= 所以在費靜態(tài)方法中可以使用 this 關(guān)鍵字 ================");
System.out.println();
System.out.println(" public int eat(Dog this,int num){ ");
System.out.println(" System.out.println(\"吃了\"+num+\"斤狗糧\");");
System.out.println(" this.food -= num; ");
System.out.println(" this.food -= num;");
System.out.println(" return this.food;");
System.out.println(" } ");
BiFunction<Dog,Integer,Integer> biFunction = Dog::eat;
System.out.println("還剩下"+biFunction.apply(new Dog(), 8)+"斤狗糧");
System.out.println("================= 沒有參數(shù)的構(gòu)造函數(shù)的方法引用 ================");
Supplier<Dog> supplier = Dog::new;
System.out.println("創(chuàng)建新的對象:"+supplier.get());
System.out.println("================= 帶參數(shù)的構(gòu)造函數(shù)的方法引用 ================");
Function<String,Dog> func = Dog::new;
System.out.println("創(chuàng)建新的對象:"+func.apply("旺財"));
}
}
結(jié)果
-----
這是方法調(diào)用
================= 靜態(tài)方法的方法引用 類名+方法名 ================
哮天犬叫了
================= 非靜態(tài)方法的方法引用 對象實例+方法名 ================
吃了3斤狗糧
還剩下97斤狗糧
================= 可以看到方法的輸入輸出類型一樣,可以使用一元函數(shù)式接口 對象實例+方法名 ================
吃了5斤狗糧
還剩下92斤狗糧
================= 還可以使用基本函數(shù)式接口 對象實例+方法名 ================
吃了1斤狗糧
還剩下91斤狗糧
================= 非靜態(tài)方法的方法引用 類名+方法名 ================
================= jdk 默認(rèn)會把當(dāng)前實例傳入到非靜態(tài)方法峦耘,參數(shù)名為 this,位置是第一個 ================
================= 所以子在使用類名引用的時候?qū)嶋H上是有兩個參數(shù) ================
================= 所以在費靜態(tài)方法中可以使用 this 關(guān)鍵字 ================
public int eat(Dog this,int num){
System.out.println("吃了"+num+"斤狗糧");
this.food -= num;
this.food -= num;
return this.food;
}
吃了8斤狗糧
還剩下92斤狗糧
================= 沒有參數(shù)的構(gòu)造函數(shù)的方法引用 ================
創(chuàng)建新的對象:哮天犬
================= 帶參數(shù)的構(gòu)造函數(shù)的方法引用 ================
創(chuàng)建新的對象:旺財
類型推斷
lombok 表達(dá)式是一個匿名函數(shù)剔蹋,它最終返回了一個實現(xiàn)了指定接口的對象,所以要告訴它實現(xiàn)了哪一個接口辅髓,否則就會報錯泣崩,這就是類型推斷
@FunctionalInterface
interface IMath{
int add(int x,int y);
}
@FunctionalInterface
interface IMath2{
int add(int x,int y);
}
public class LombokTeset6 {
public static void main(String[] args) {
// 變量類型定義
IMath iMath = (x,y) -> x+y;
System.out.println(iMath.add(1,3));
// 數(shù)組里
IMath[] iMaths = {(x,y)->x+y,(x,y)->x*y};
System.out.println(iMaths[0].add(1,2));
System.out.println(iMaths[1].add(3,4));
// 強轉(zhuǎn)
Object obj = (IMath)(x,y) -> x+y;
// 通過類型返回
IMath iMath1 = createIMath();
// 通過方法傳入 等價于 變量類型定義
LombokTeset6 lombokTeset6 = new LombokTeset6();
// 注意如果有方法重載需要指定調(diào)用哪一個方法,強轉(zhuǎn)即可
lombokTeset6.test((IMath) (x,y) -> x+y);
lombokTeset6.test((IMath2) (x,y) -> x+y);
}
public void test(IMath math){}
public void test(IMath2 math){}
public static IMath createIMath(){
return (x,y) -> x / y;
}
}
變量引用
String str = "aaa";
Consumer<String> consumer = s -> System.out.println(s+str);
這樣寫是沒有問題的洛口,大家都知道 jdk 1.8 之前匿名類應(yīng)用外部變量必須是 final 的類型矫付,jdk 1.8 之后就不用了嗎?實際上也是需要寫的第焰,只是在 jdk 1.8 默認(rèn)可以不寫买优,不加 final 關(guān)鍵字,是實際上變量也是不能修改的挺举,試著改一下
提示已經(jīng)很清晰了而叼,必須是一個 final 或者實際上使 final 就是說初始化之后就不能最修改了。
那么為什么匿名類引用外部變量不能修改呢豹悬?實際上在 java 里面參數(shù)傳參傳的是值而不是引用葵陵,外部變量和引用的變量都指向了同一個對象,當(dāng)匿名類應(yīng)用外部外部變量的時候瞻佛,如果外部變量改變了那么其實應(yīng)用的變量和外部變量是沒有任何關(guān)系了脱篙,那么就有可能導(dǎo)致程序的不準(zhǔn)確性,那么這時引用外部變量又有什么意義呢伤柄?所以外部變量是不能修改的绊困。
級聯(lián)表達(dá)式和柯里化
/**
* 級聯(lián)表達(dá)式
*/
public class LombokTest8 {
public static void main(String[] args) {
System.out.println("==================== 級聯(lián)表達(dá)式 ==================");
/**
* 例如: x -> y -> x+y
* 我們知道 lombda 表達(dá)式 左邊為參數(shù),右邊是函數(shù)适刀,所有者個表倒是最終會返回一個這樣的函數(shù)
* 這里暫且認(rèn)為是 Integer 類型
* Function<Integer,Function<Integer,Integer>> function = x ->y -> x+y;
*/
// 這就是級聯(lián)表達(dá)式
Function<Integer,Function<Integer,Integer>> function = x ->y -> x+y;
System.out.println(function.apply(1).apply(2));
System.out.println("==================== 柯里化 ==================");
System.out.println("==================== 柯里化:把多個參數(shù)的函數(shù)轉(zhuǎn)換為只有一個參數(shù)的函數(shù) ==================");
System.out.println("==================== 柯里化的目的:函數(shù)的標(biāo)準(zhǔn)化 ==================");
Function<Integer,Function<Integer,Function<Integer ,Integer>>> f = x -> y -> z -> x * y * z;
System.out.println(f.apply(1).apply(2).apply(3));
System.out.println("==================== 柯里化的循環(huán)調(diào)用 ==================");
int num[] = {4,5,6};
for (int i = 0; i < num.length; i++) {
if(f instanceof Function){
Object obj = f.apply(num[i]);
if(obj instanceof Function){
f =(Function) obj;
}else{
System.out.println("調(diào)用結(jié)果為:"+obj);
}
}
}
}
}
結(jié)果
==================== 級聯(lián)表達(dá)式 ==================
3
==================== 柯里化 ==================
==================== 柯里化:把多個參數(shù)的函數(shù)轉(zhuǎn)換為只有一個參數(shù)的函數(shù) ==================
==================== 柯里化的目的:函數(shù)的標(biāo)準(zhǔn)化 ==================
6
==================== 柯里化的循環(huán)調(diào)用 ==================
調(diào)用結(jié)果為:120
stream 流編程
概念
首先它是一個高級的迭代器(Iterator),它不是一個數(shù)據(jù)結(jié)構(gòu)秤朗,不是一個集合,它不會存放數(shù)據(jù)笔喉,stream 關(guān)注的是怎樣把數(shù)據(jù)高效的處理取视,它其實就是把數(shù)據(jù)在流水線里面處理硝皂,它和普通的 Iterator 不同的是,它可以并行遍歷作谭,普通的 Iterator 只能是串行稽物,在一個線程中執(zhí)行
外部迭代和內(nèi)部迭代
使用for等進(jìn)行迭代我們叫做外部迭代,使用stream流迭代叫做內(nèi)部迭代折欠,內(nèi)部迭代有什么好處贝或,當(dāng)數(shù)量很大是我們不需要對數(shù)據(jù)進(jìn)行拆分
int[] nums = {4,5,6};
// 外部迭代
int sum = 0;
for (int num : nums) {
sum += num;
}
System.out.println("結(jié)果是:"+sum);
// 使用 stream 的內(nèi)部迭代
// 這個 of 可以理解為 new
int sum2 = IntStream.of(nums).sum();
System.out.println("結(jié)果是:"+sum2);
結(jié)果都是一樣的
中間操作/種植操作和惰性求值
中間操作:返回stream的操作
終止操作:得到特定的結(jié)果
惰性求值:終止沒有調(diào)用的情況下,中間操作不會執(zhí)行
public class StreamTest {
public static void main(String[] args) {
int[] nums = {4,5,6};
// 外部迭代
int sum = 0;
for (int num : nums) {
sum += num;
}
System.out.println("結(jié)果是:"+sum);
// 使用 stream 的內(nèi)部迭代
// 這個 of 可以理解為 new
int sum2 = IntStream.of(nums).sum();
System.out.println("結(jié)果是:"+sum2);
// map 就是中間操作锐秦,返回 stream 的的操作
// sum 就是總之操作
int result = IntStream.of(nums).map(StreamTest::mulitplyNum).sum();
System.out.println("結(jié)果為:"+result);
System.out.println("===================== 惰性求值就是始終沒有調(diào)用的情況下咪奖,中間操作就會執(zhí)行 =====================");
// 這一沒有調(diào)用 sum 終止操作不會執(zhí)行
IntStream.of(nums).map(StreamTest::mulitplyNum);
}
public static int mulitplyNum(int i){
System.out.println("執(zhí)行了乘以 8 的操作");
return i * 8;
}
}
流的創(chuàng)建
List<String> list = new ArrayList<String>();
list.add("a");
list.add("b");
list.add("c");
// 從集合創(chuàng)建
list.stream().forEach(System.out::print);
list.parallelStream();
// 從數(shù)組創(chuàng)建
Arrays.stream(new int[]{3,6,9});
// 創(chuàng)建數(shù)字流
IntStream.of(4,5,6);
IntStream.rangeClosed(1,10);
// 使用 random 創(chuàng)建一個無限流
new Random().ints().limit(10);
// 自己生產(chǎn)流
Random ran = new Random();
Stream.generate(() -> ran.nextInt()).limit(20);
中間操作
無狀態(tài),有狀態(tài)操作
無狀態(tài)操作就是表示當(dāng)前的操作跟其他元素沒有依賴關(guān)系酱床,有狀態(tài)就是表示當(dāng)前的結(jié)果需要依賴有些其他的元素赡艰,好比一個排序操作,就需要依賴所有的元素都計算完畢斤葱,它才有一個最終的排序結(jié)果慷垮,這就是一個有狀態(tài)的操作。
public static void main(String[] args) {
String str = "my name is 007 v";
System.out.println("================= 把每個單詞的長度調(diào)出來 =================");
// filter 里是一個斷言揍堕,比如只打印長度大于 1 的長度
Stream.of(str.split(" ")).filter(s -> s.length() > 1)
.map(s -> s+":"+s.length()+",").forEach(System.out::print);
System.out.println();
// flatMap A -> B 屬性(是個集合)料身,最終得到所有的 A 元素里面的所有 B 屬性集合
// intStream / longStream 并不是 Stream 的子類,所以要進(jìn)行裝箱 boxed
Stream.of(str.split(" ")).flatMap(s -> s.chars().boxed()).forEach(i -> System.out.print(i+","));
System.out.println();
// 數(shù)組 char 類型
Stream.of(str.split(" ")).flatMap(s -> s.chars().boxed()).forEach(i -> System.out.print((char)i.intValue()+","));
System.out.println();
System.out.println("================= peek 用戶 debug ,是個中間操作衩茸,和 forEach 是終止操作 =================");
// 這里這里會打印兩次芹血,peek 一次 ,forEach 一次
Stream.of(str.split(" ")).peek(System.out::println).forEach(System.out::println);
System.out.println("================= limit 使用楞慈,主要用于無限流 =================");
// 創(chuàng)建一個無限流幔烛,不出出錯,但是不會停止
// new Random().ints().forEach(System.out::println);
// 創(chuàng)建 10 個 大于 100 小于 1000 的數(shù)
new Random().ints().filter(i -> i > 100 && i < 1000).limit(10)
.forEach(System.out::println);
}
結(jié)果
================= 把每個單詞的長度調(diào)出來 =================
my:2,name:4,is:2,007:3,
109,121,110,97,109,101,105,115,48,48,55,118,
m,y,n,a,m,e,i,s,0,0,7,v,
================= peek 用戶 debug ,是個中間操作囊蓝,和 forEach 是終止操作 =================
my
my
name
name
is
is
007
007
v
v
================= limit 使用饿悬,主要用戶無限流 =================
[681, 341, 299, 861, 934, 365, 277, 747, 759, 690]
終止操作
public static void main(String[] args) {
String str = "my name is 007";
System.out.println();
System.out.println("================= 使用并行 (順序被打亂了) =================");
str.chars().parallel().forEach(i -> System.out.print((char)i));
System.out.println();
System.out.println("================= 使用forEachOrdered 保證順序 =================");
str.chars().parallel().forEachOrdered(i -> System.out.print((char)i));
System.out.println();
System.out.println("================= 收集到 List =================");
List<String> collect = Stream.of(str.split(" ")).collect(Collectors.toList());
System.out.println(collect.toString());
System.out.println("================= 使用 reduce 拼接字符串 =================");
Optional<String> reduce = Stream.of(str.split(" ")).reduce((x, y) -> x + "-" + y);
// 這種如果返回空 會拋異常 throw new NoSuchElementException("No value present");
System.out.println(reduce.get());
// 如果沒有返回空串
System.out.println(reduce.orElse(""));
// 帶初始值化值得 reduce
String reduce2 = Stream.of(str.split(" ")).reduce("", (x, y) -> x + "|" + y);
System.out.println(reduce2);
// 獲取單詞總長度
Integer reduce1 = Stream.of(str.split(" ")).map(s -> s.length()).reduce(0, (x, y) -> x + y);
System.out.println("獲取字符串長度:"+reduce1);
System.out.println("================= max 的使用 =================");
Optional<String> max = Stream.of(str.split(" ")).max((s1, s2) -> s1.length() - s2.length());
System.out.println("長度最長的單詞= "+max.get());
System.out.println("================= 使用 findFirst 短路操作 =================");
// ints() 是無限流 是不會終止的 使用 短路操作會使其終止
OptionalInt first = new Random().ints().findFirst();
System.out.println(first.getAsInt());
}
結(jié)果
================= 使用并行 (順序被打亂了) =================
is07 0namemy
================= 使用forEachOrdered 保證順序 =================
my name is 007
================= 收集到 List =================
[my, name, is, 007]
================= 使用 reduce 拼接字符串 =================
my-name-is-007
my-name-is-007
|my|name|is|007
獲取字符串長度:11
================= max 的使用 =================
長度最長的單詞= name
================= 使用 findFirst 短路操作 =================
-555829999
并行流
public class StreamTest5 {
public static void main(String[] args) {
System.out.println("============ peek 串行流 單線程 ==============");
// 單線程
IntStream.range(1,5).peek(StreamTest5::debug).count();
System.out.println("============ parallel 并行流 多線程 ==============");
// 多線程并行調(diào)用 parallel
IntStream.range(1,5 ).parallel().peek(StreamTest5::debug).count();
System.out.println("============ 現(xiàn)在要實現(xiàn)一個這樣的效果,先并行聚霜,在串行 ==============");
System.out.println("============ 多次調(diào)用 parallel / sequential ,以最后此一次調(diào)用為準(zhǔn) ==============");
IntStream.range(1,5)
//調(diào)用 parallel 產(chǎn)生并行流
.parallel().peek(StreamTest5::debug)
// 調(diào)用 sequential 產(chǎn)生串行流
.sequential().peek(StreamTest5::debug)
.count();
System.out.println("============ 并行流使用的線程池是:ForkJoinPool.commonPool ==============");
System.out.println("============ 默認(rèn)線程數(shù)是當(dāng)前 CPU 的個數(shù) ==============");
System.out.println("============ 使用 System.setProperty(\"java.util.concurrent.ForkJoinPoll.common.parallelism\",\"20\") 修改默認(rèn)線程數(shù) ==============");
System.setProperty("java.util.concurrent.ForkJoinPoll.common.parallelism","20");
IntStream.range(1,5).parallel().peek(StreamTest5::debug2).count();
System.out.println("============ 使用自己的線程池狡恬,不使用默認(rèn)的線程池,防止任務(wù)被阻塞 ==============");
ForkJoinPool pool = new ForkJoinPool(20);
pool.submit(() -> IntStream.range(1,5).parallel()
.peek(StreamTest5::debug2).count());
// 關(guān)閉線程池
pool.shutdown();
// 讓守護(hù)線程等待蝎宇,防止主線程運行完 看不到效果
synchronized (pool){
try {
pool.wait();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
public static void debug(int i){
System.out.println("debug:"+i);
try {
TimeUnit.SECONDS.sleep(1);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
public static void debug2(int i){
System.out.println(Thread.currentThread().getName()+" debug:"+i);
try {
TimeUnit.SECONDS.sleep(1);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
結(jié)果
============ peek 串行流 單線程 ==============
debug:1
debug:2
debug:3
debug:4
============ parallel 并行流 多線程 ==============
debug:3
debug:2
debug:1
debug:4
============ 現(xiàn)在要實現(xiàn)一個這樣的效果弟劲,先并行,在串行 ==============
============ 多次調(diào)用 parallel / sequential ,以最后此一次調(diào)用為準(zhǔn) ==============
debug:1
debug:1
debug:2
debug:2
debug:3
debug:3
debug:4
debug:4
============ 并行流使用的線程池是:ForkJoinPool.commonPool ==============
============ 默認(rèn)線程數(shù)是當(dāng)前 CPU 的個數(shù) ==============
============ 使用 System.setProperty("java.util.concurrent.ForkJoinPoll.common.parallelism","20") 修改默認(rèn)線程數(shù) ==============
main debug:3
ForkJoinPool.commonPool-worker-0 debug:2
main debug:4
ForkJoinPool.commonPool-worker-0 debug:1
============ 使用自己的線程池姥芥,不使用默認(rèn)的線程池兔乞,防止任務(wù)被阻塞 ==============
ForkJoinPool-1-worker-25 debug:3
ForkJoinPool-1-worker-18 debug:2
ForkJoinPool-1-worker-4 debug:1
ForkJoinPool-1-worker-11 debug:4
收集器
class Student{
private int id; // 編號
private String name; // 年齡
private int grade; // 班級
public int getId() {
return id;
}
public void setId(int id) {
this.id = id;
}
public String getName() {
return name;
}
public void setName(String name) {
this.name = name;
}
public int getGrade() {
return grade;
}
public void setGrade(int grade) {
this.grade = grade;
}
@Override
public String toString() {
return "Student{" +
"id=" + id +
", name='" + name + '\'' +
", grade=" + grade +
'}';
}
public Student(int id, String name, int grade) {
this.id = id;
this.name = name;
this.grade = grade;
}
}
public class StreamTest6 {
public StreamTest6() {
}
public static void main(String[] args) {
List<Student> students = Arrays.asList(
new Student(1,"張三",1),
new Student(2,"李四",2),
new Student(3,"王五",3),
new Student(4,"趙六",3),
new Student(5,"張三",2));
System.out.println("===================== 得到所有學(xué)生的 id ====================");
System.out.println("===================== s -> s.getAge() == Student::getAge ====================");
System.out.println("===================== Student::getAge 不會多生成一個類似 lambda$0 這樣的函數(shù),推薦使用 ====================");
// 轉(zhuǎn) List
List<Integer> ids = students.stream().map(Student::getId).collect(Collectors.toList());
System.out.println("id列表:"+ids);
System.out.println("===================== 得到所有學(xué)生的 姓名 ====================");
// 轉(zhuǎn) Set
Set<String> names = students.stream().map(Student::getName).collect(Collectors.toSet());
System.out.println("姓名列表:"+names);
// 轉(zhuǎn) TreeSet
students.stream().map(Student::getName).collect(Collectors.toCollection(TreeSet::new));
System.out.println("姓名列表:"+names);
System.out.println("===================== 統(tǒng)計匯總信息 ====================");
IntSummaryStatistics summaryStatistics = students.stream().collect(Collectors.summarizingInt(Student::getId));
System.out.println("年齡匯總信息:"+summaryStatistics);
System.out.println("===================== 分塊(根據(jù)條件分成,true/false 兩塊) ====================");
Map<Boolean, List<Student>> collect = students.stream().collect(Collectors.partitioningBy(s -> s.getName() == "趙六"));
System.out.println(collect);
System.out.println("===================== 分組 ====================");
Map<Integer, List<Student>> group = students.stream().collect(Collectors.groupingBy(Student::getGrade));
System.out.println(group);
System.out.println("===================== 分組 得到每個班級學(xué)生的列表 ====================");
Map<Integer, Long> groupCount = students.stream().collect(Collectors.groupingBy(Student::getGrade, Collectors.counting()));
System.out.println("每個班級學(xué)生的人數(shù):"+groupCount);
}
}
結(jié)果
===================== 得到所有學(xué)生的 id ====================
===================== s -> s.getAge() == Student::getAge ====================
===================== Student::getAge 不會多生成一個類似 lambda$0 這樣的函數(shù)庸追,推薦使用 ====================
id列表:[1, 2, 3, 4, 5]
===================== 得到所有學(xué)生的 姓名 ====================
姓名列表:[李四, 張三, 王五, 趙六]
姓名列表:[李四, 張三, 王五, 趙六]
===================== 統(tǒng)計匯總信息 ====================
年齡匯總信息:IntSummaryStatistics{count=5, sum=15, min=1, average=3.000000, max=5}
===================== 分塊(根據(jù)條件分成霍骄,true/false 兩塊) ====================
{false=[Student{id=1, name='張三', grade=1}, Student{id=2, name='李四', grade=2}, Student{id=3, name='王五', grade=3}, Student{id=5, name='張三', grade=2}], true=[Student{id=4, name='趙六', grade=3}]}
===================== 分組 ====================
{1=[Student{id=1, name='張三', grade=1}], 2=[Student{id=2, name='李四', grade=2}, Student{id=5, name='張三', grade=2}], 3=[Student{id=3, name='王五', grade=3}, Student{id=4, name='趙六', grade=3}]}
===================== 分組 得到每個班級學(xué)生的列表 ====================
每個班級學(xué)生的人數(shù):{1=1, 2=2, 3=2}
JDK9 Reactive Stream
概念
Reactive Stream (響應(yīng)式流/反應(yīng)流) 是JDK9引入的一套標(biāo)準(zhǔn),是一套基于發(fā)布/訂閱模式的數(shù)據(jù)處理規(guī)范锚国。響應(yīng)式流從2013年開始腕巡,作為提供非阻塞背壓的異步流處理標(biāo)準(zhǔn)的倡議玄坦。 它旨在解決處理元素流的問題——如何將元素流從發(fā)布者傳遞到訂閱者血筑,而不需要發(fā)布者阻塞,或訂閱者有無限制的緩沖區(qū)或丟棄煎楣。更確切地說豺总,Reactive流目的是“找到最小的一組接口,方法和協(xié)議择懂,用來描述必要的操作和實體以實現(xiàn)這樣的目標(biāo):以非阻塞背壓方式實現(xiàn)數(shù)據(jù)的異步流”喻喳。
被壓
“背壓(反壓)back pressure”概念很關(guān)鍵。首先異步消費者會向生產(chǎn)者訂閱接收消息困曙,然后當(dāng)有新的信息可用時表伦,消費者會通過之前訂閱時提供的回調(diào)函數(shù)被再次激活調(diào)用。如果生產(chǎn)者發(fā)出的信息比消費者能夠處理消息最大量還要多慷丽,消費者可能會被迫一直在抓消息蹦哼,耗費越來越多的資源,埋下潛在的崩潰風(fēng)險要糊。為了防止這一點纲熏,需要有一種機(jī)制使消費者可以通知生產(chǎn)者,降低消息的生成速度锄俄。生產(chǎn)者可以采用多種策略來實現(xiàn)這一要求局劲,這種機(jī)制稱為背壓。
響應(yīng)式流模型非常簡單——訂閱者向發(fā)布者發(fā)送多個元素的異步請求奶赠,發(fā)布者向訂閱者異步發(fā)送多個或稍少的元素鱼填。響應(yīng)式流會在pull模型和push模型流處理機(jī)制之間動態(tài)切換。 當(dāng)訂閱者較慢時毅戈,它使用pull模型剔氏,當(dāng)訂閱者更快時使用push模型。
簡單來說竹祷,在響應(yīng)式流下訂閱者可以與發(fā)布者溝通谈跛,如果使用JMS就應(yīng)該知道,訂閱者只能被動接收發(fā)布者所產(chǎn)生的消息數(shù)據(jù)塑陵。這就好比沒有水龍頭的水管一樣感憾,我只能被動接收水管里流過來的水,無法關(guān)閉也無法減少。而響應(yīng)式流就相當(dāng)于給水管加了個水龍頭阻桅,在消費者這邊可以控制水流的增加凉倚、減少及關(guān)閉。
Reactive Stream 主要接口
JDK9 通過java.util.concurrent.Flow 和java.util.concurrent.SubmissionPublisher 類來實現(xiàn)響應(yīng)式流嫂沉。在JDK9里Reactive Stream的主要接口聲明在Flow類里稽寒,F(xiàn)low 類中定義了四個嵌套的靜態(tài)接口,用于建立流量控制的組件趟章,發(fā)布者在其中生成一個或多個供訂閱者使用的數(shù)據(jù)項:
- Publisher:數(shù)據(jù)項發(fā)布者杏糙、生產(chǎn)者
- Subscriber:數(shù)據(jù)項訂閱者、消費者
- Subscription:發(fā)布者與訂閱者之間的關(guān)系紐帶蚓土,訂閱令牌
- Processor:數(shù)據(jù)處理器只能被動接收水管里流過來的水宏侍,無法關(guān)閉也無法減少。而響應(yīng)式流就相當(dāng)于給水管加了個水龍頭蜀漆,在消費者這邊可以控制水流的增加谅河、減少及關(guān)閉。
具體案例
public class Jdk9Test {
public static void main(String[] args) throws InterruptedException {
// 1. 定義發(fā)布者确丢,發(fā)布的數(shù)據(jù)類型是 Integer
// 直接使用 jdk 自帶的 SubmissionPublisher,他實現(xiàn)了 Publisher 接口
SubmissionPublisher<Integer> publisher = new SubmissionPublisher<Integer>();
// 2. 定義訂閱者
Flow.Subscriber<Integer> subscriber = new Flow.Subscriber<Integer>(){
private Flow.Subscription subscription;
@Override
public void onSubscribe(Flow.Subscription subscription) {
// 保存訂閱關(guān)系绷耍,需要用它來給發(fā)布者響應(yīng)
this.subscription = subscription;
// 請求一個數(shù)據(jù)
this.subscription.request(1);
}
@Override
public void onNext(Integer item) {
// 接受一個數(shù)據(jù),處理
System.out.println("接受到數(shù)據(jù):"+item);
try {
TimeUnit.SECONDS.sleep(3);
} catch (InterruptedException e) {
e.printStackTrace();
}
// 處理完調(diào)用 request 在請求一個數(shù)據(jù)
this.subscription.request(1);
// 或者已經(jīng)達(dá)到了目標(biāo)鲜侥,調(diào)用 cancel 告訴發(fā)布者不要在接受數(shù)據(jù)了
//this.subscription.cancel();
}
@Override
public void onError(Throwable throwable) {
// 出現(xiàn)了異常(例如處理數(shù)據(jù)的時候產(chǎn)生了異常)
throwable.printStackTrace();
// 我們也可以告訴發(fā)布者褂始,后面不需要在接收數(shù)據(jù)了
this.subscription.cancel();
}
@Override
public void onComplete() {
// 全部數(shù)據(jù)處理完了(發(fā)布者關(guān)閉了)
System.out.println("處理完了!");
}
};
// 3. 發(fā)布者和訂閱者建立訂閱關(guān)系
publisher.subscribe(subscriber);
// 4. 產(chǎn)生數(shù)據(jù)剃毒,并發(fā)布
// 這里忽略數(shù)據(jù)產(chǎn)生的過程
for (int i = 0; i < 2; i++) {
System.out.println("生成數(shù)據(jù):"+i);
// submit 是個 block 方法
publisher.submit(i);
}
// 5. 結(jié)束后 關(guān)閉發(fā)布者
// 正式環(huán)境病袄,應(yīng)該放 finally 或者 try-resouce 確保關(guān)閉
publisher.close();
// 主線程延遲停止,否則數(shù)據(jù)沒有消費就退出了
Thread.currentThread().join(1000);
// debug 的時候赘阀,下面這行需要有斷點
// 否則主線程結(jié)束無法 debug
System.out.println();
}
}
結(jié)果
生成數(shù)據(jù):0
生成數(shù)據(jù):1
接受到數(shù)據(jù):0
接受到數(shù)據(jù):1
處理完了益缠!
/**
* 帶 process 的 flow demo
*/
class MyPocessor extends SubmissionPublisher<String> implements Flow.Processor<Integer,String> {
private Flow.Subscription subscription;
@Override
public void onSubscribe(Flow.Subscription subscription) {
// 保存訂閱關(guān)系,需要用他們來給發(fā)布者效應(yīng)
this.subscription = subscription;
// 請求一個數(shù)據(jù)
this.subscription.request(1);
}
@Override
public void onNext(Integer item) {
// 接受到一個數(shù)據(jù)基公,處理
System.out.println("處理器接收到了數(shù)據(jù):"+item);
// 過濾掉小于 0 的數(shù)據(jù)幅慌,然后發(fā)布出去
if(item > 0){
this.submit("轉(zhuǎn)換后的數(shù)據(jù):"+item);
}
// 處理完調(diào)用 request 在請求一個數(shù)據(jù)
this.subscription.request(1);
// 或者已經(jīng)達(dá)到了目標(biāo),調(diào)用 cancel 告訴發(fā)布者不要在接受數(shù)據(jù)了
// this.subscription.cancel();
}
@Override
public void onError(Throwable throwable) {
// 出現(xiàn)了異常(例如處理數(shù)據(jù)的時候產(chǎn)生了異常)
throwable.printStackTrace();
// 我們可以告訴發(fā)布者轰豆,后面不在接收數(shù)據(jù)了
this.subscription.cancel();
}
@Override
public void onComplete() {
// 全部數(shù)據(jù)處理完了(發(fā)布者關(guān)閉了)
System.out.println("處理器處理完了胰伍!");
// 關(guān)不發(fā)布者
this.close();
}
}
public class Jdk9Test2 {
public static void main(String[] args) throws InterruptedException {
// 1. 定義發(fā)布者,發(fā)布數(shù)據(jù)類型是 Integer
// 直接使用 jdk 自帶的 SubmissionPublisher
SubmissionPublisher<Integer> publisher = new SubmissionPublisher<Integer>();
// 2. 定義處理器酸休,對數(shù)據(jù)進(jìn)行過濾骂租,并轉(zhuǎn)換為 String 類型
MyPocessor pocessor = new MyPocessor();
// 3. 發(fā)布者和處理器建立訂閱關(guān)系
publisher.subscribe(pocessor);
// 4. 定于最終訂閱者,消費 String 數(shù)據(jù)類型
Flow.Subscriber<String> subscriber = new Flow.Subscriber<String>(){
private Flow.Subscription subscription;
@Override
public void onSubscribe(Flow.Subscription subscription) {
// 保存訂閱關(guān)系斑司,需要用它來給發(fā)布者發(fā)布響應(yīng)
this.subscription = subscription;
// 請求一個數(shù)據(jù)
this.subscription.request(1);
}
@Override
public void onNext(String item) {
// 接收到一個數(shù)據(jù)渗饮,處理
System.out.println("接收到數(shù)據(jù):"+item);
// 處理完調(diào)用 request 在請求一個數(shù)據(jù)
this.subscription.request(1);
// 或者 已經(jīng)達(dá)到了目標(biāo),調(diào)用 cancel 告訴發(fā)布者不要在接收數(shù)據(jù)了
// this.subscription.cancel();
}
@Override
public void onError(Throwable throwable) {
// 出現(xiàn)了異常(例如處理數(shù)據(jù)的時候產(chǎn)生了異常)
throwable.printStackTrace();
// 我們可以告訴發(fā)布者,后面不在接收數(shù)據(jù)了
this.subscription.cancel();
}
@Override
public void onComplete() {
// 全部數(shù)據(jù)處理完(發(fā)布者關(guān)閉了)
System.out.println("處理完了互站!");
}
};
// 5. 處理器和最終訂閱者建立訂閱關(guān)系
pocessor.subscribe(subscriber);
// 6. 產(chǎn)生數(shù)據(jù)私蕾,并發(fā)布
// 這里忽略數(shù)據(jù)產(chǎn)生過程
publisher.submit(999);
publisher.submit(666);
// 7. 結(jié)束后,關(guān)閉發(fā)布者
// 正式環(huán)境應(yīng)該放 finally 或者 try-resouce 確保關(guān)閉
publisher.close();
// 主線程延遲停止胡桃,否則數(shù)據(jù)沒有消費就退出
Thread.currentThread().join(1000);
System.out.println();
}
}
結(jié)果
處理器接收到了數(shù)據(jù):999
處理器接收到了數(shù)據(jù):666
處理器處理完了踩叭!
接收到數(shù)據(jù):轉(zhuǎn)換后的數(shù)據(jù):999
接收到數(shù)據(jù):轉(zhuǎn)換后的數(shù)據(jù):666
處理完了!
Spring WebFlux
概念
它是 spring5 提出的一種新的開發(fā) web 的技術(shù)站翠胰,它是非阻塞的開發(fā)模式容贝,它運行的 netty 或者說 servlet3.1 的容器里面,它可以支持非常多的并發(fā)量亡容,也就是說我們現(xiàn)在開發(fā) web 服務(wù)多了一個選擇嗤疯,可以使用以前的 mvc 開發(fā)模式冤今,也可以使用現(xiàn)在新的 webFlux 開發(fā)模式闺兢。
WebFlux 和 Spring MVC 關(guān)系
- 首先WebFlux 是 non-blocking 非阻塞的開發(fā)模式而傳統(tǒng)的 mvc 是同步的阻塞開發(fā)模式,非阻塞就是我們可以在一個線程里面處理更加多的請求戏罢,而以前老的模式是一個請求對應(yīng)容器里的一個線程屋谭,這是最大的不同點
- 第二個不通點就是運行的環(huán)境不一樣,老的開發(fā)模式是基于 Servlet API 所以它必須運行在 Servlet 容器里面龟糕,而 WebFlux 開發(fā)模式是基于Reactive Streams 響應(yīng)式流桐磁,它可以運行在 Servlet3.1 之后的容器,也就是支持一部 Servlet 的容器或者說運行在 netty 上面讲岁,其實 Spring 5 默認(rèn)的容器就是 netty我擂,
- 第三個不同點,現(xiàn)在新的 WebFlux 都是不支持關(guān)系型數(shù)據(jù)庫的缓艳,我們以前用的 mysql ,oracle等校摩,這些數(shù)據(jù)庫都暫時無法使用
優(yōu)勢
最大的優(yōu)勢就是可以支持非常高的并發(fā)量,我們的應(yīng)用隨著發(fā)展并發(fā)量可能會越來越大阶淘,并發(fā)量高了之后衙吩,以前的應(yīng)用可能就承受不了了,這個時候我們就要進(jìn)行擴(kuò)展溪窒,擴(kuò)展分為兩種坤塞,一種叫水平擴(kuò)展,一種叫垂直擴(kuò)展澈蚌,簡單來說水平擴(kuò)展就是加人摹芙,垂直擴(kuò)展就是加班,家人當(dāng)人的比較容易的宛瞄,好比你以前一個節(jié)點處理不了這個請求浮禾,那么我們就加幾個節(jié)點,在處理不了就在加,一個加上去伐厌,那么垂直擴(kuò)展呢承绸,就是你人還是一個人,對應(yīng)我們的技術(shù)就是線程還是這么多線程但是你要處理更加多的請求挣轨,那么我們就可以采用 WebFlux 這種異步的模式军熏,可以讓他在相同的線程下面支持更加多的一個請求就可以達(dá)到一個垂直擴(kuò)展的目的,所以它是非常重要的一個卷扮,因為你水平擴(kuò)展它對資源的要求會比較高荡澎,所以需要更多的機(jī)器,花更多的錢晤锹,但我們的垂直擴(kuò)展它機(jī)器還是那么多機(jī)器摩幔,所以我們 SE 在設(shè)計框架的時候,當(dāng)你的請求量上去了之后鞭铆,我們第一步就可以把以前的同步 servlet 或衡,就是以前的 mvc 這種模式改成我們現(xiàn)在新的 WebFlux 模式,然后在進(jìn)行水平擴(kuò)展车遂,先垂直擴(kuò)展在水平擴(kuò)展封断,這就更加好的利用我們現(xiàn)有的資源,處理更加高的并發(fā)量舶担。
異步 Servlet
為什么要使用異步 setvlet坡疼? 同步的 servlet 阻塞了什么?
- 同步 servlet
@WebServlet(urlPatterns = "/asynServlet")
public class SyncServelt extends HttpServlet {
@Override
protected void doGet(HttpServletRequest req, HttpServletResponse resp) throws ServletException, IOException {
doPost(req, resp);
}
@Override
protected void doPost(HttpServletRequest req, HttpServletResponse resp) throws ServletException, IOException {
long startTime = System.currentTimeMillis();
// 執(zhí)行業(yè)務(wù)代碼
doSomeThing(req,resp);
long endTime = System.currentTimeMillis();
System.out.println("總耗時:"+(endTime-startTime)+"毫秒");
}
// 業(yè)務(wù)代碼
private void doSomeThing(HttpServletRequest request,HttpServletResponse response){
try {
// 睡眠 5 秒
TimeUnit.SECONDS.sleep(5);
} catch (InterruptedException e) {
e.printStackTrace();
}
try {
response.getWriter().append("done");
} catch (IOException e) {
e.printStackTrace();
}
}
}
結(jié)果
總耗時:5007毫秒
可以看到同步的 servlet 從請求開始到請求結(jié)束總共耗時了 5 秒左右衣陶,從這里我們就可知道同步 servlet 到底阻塞了什么柄瑰?其實就是阻塞了 tomcat 容器的 servlet 線程,當(dāng)一個請求到達(dá) tomcat 容器之后剪况,tomcat 容器會給每一個請求開啟一個線程去處理教沾,而線程里面會調(diào)用具體的 servlet 去處理,當(dāng)你使用同步 servlet 的時候拯欧,你的業(yè)務(wù)代碼話費多長時間详囤,servlet 線程就要等待多長時間,這就是阻塞
- 異步 servlet
@WebServlet(asyncSupported = true,urlPatterns = "/asyncServlet")
public class AsyncServlet extends HttpServlet {
@Override
protected void doGet(HttpServletRequest req, HttpServletResponse resp) throws ServletException, IOException {
doPost(req,resp);
}
@Override
protected void doPost(HttpServletRequest req, HttpServletResponse resp) throws ServletException, IOException {
long startTime = System.currentTimeMillis();
// 開啟異步
AsyncContext asyncContext = req.startAsync();
// 執(zhí)行業(yè)務(wù)代碼
CompletableFuture.runAsync(() -> doSomeThing(asyncContext,asyncContext.getRequest(),asyncContext.getResponse()));
long endTime = System.currentTimeMillis();
System.out.println("總耗時:"+(endTime-startTime)+"毫秒");
}
// 業(yè)務(wù)代碼
private void doSomeThing(AsyncContext asyncContext, ServletRequest request, ServletResponse response){
try {
// 睡眠 5 秒
TimeUnit.SECONDS.sleep(5);
} catch (InterruptedException e) {
e.printStackTrace();
}
try {
response.getWriter().append("done");
} catch (IOException e) {
e.printStackTrace();
}
// 業(yè)務(wù)代碼處理完畢镐作,通知結(jié)束
asyncContext.complete();
}
}
結(jié)果
總耗時:0毫秒
從這里可以看出藏姐,異步的 servlet 幾乎沒有好使,這就是異步 servlet 的作用该贾,它主要作用就是不會阻塞 tomcat servlet 線程羔杨,它可以把一些耗時的業(yè)務(wù)放在一個獨立的線程池里面,那么我們的 servlet 線程就會立馬返回杨蛋,可以去處理下一個請求兜材,所以它就會使用比較少的線程來達(dá)到一個比較高的吞吐量理澎,這就是異步 servlet 的工作機(jī)制
總結(jié)
同步和異步都是后臺服務(wù)端的概念,服務(wù)器后臺才有異步這個概念曙寡,對于瀏覽器來說所有都是同步糠爬,不管后臺是同步 servlet 還是異步 servlet 它前臺都要花費相同的時間,第二個就是同步 servlet 到底阻塞了什么举庶,它其實是阻塞了 tomcat servlet 的線程执隧,使用異步 servlet 之后 ,servlet 線程就會立馬返回處理下一個請求户侥,所以它就可以達(dá)到高并發(fā)镀琉。
異步 servlet 是怎樣工作的
WebFlux 開發(fā)
reactor = jdk 8 stream + jdk 9 reactive stream
Mono 0 - 1 個元素
Flux 0 - N 個元素
例子
public static void main(String[] args) {
Subscriber<Integer> subscriber = new Subscriber<Integer>(){
private Subscription subscription;
@Override
public void onSubscribe(Subscription subscription) {
// 保存訂閱關(guān)系,需要用它來給發(fā)布者響應(yīng)
this.subscription = subscription;
// 請求一個數(shù)據(jù)
this.subscription.request(1);
}
@Override
public void onNext(Integer item) {
// 接受一個數(shù)據(jù)蕊唐,處理
System.out.println("接受到數(shù)據(jù):"+item);
try {
TimeUnit.SECONDS.sleep(3);
} catch (InterruptedException e) {
e.printStackTrace();
}
// 處理完調(diào)用 request 在請求一個數(shù)據(jù)
this.subscription.request(1);
// 或者已經(jīng)達(dá)到了目標(biāo)屋摔,調(diào)用 cancel 告訴發(fā)布者不要在接受數(shù)據(jù)了
//this.subscription.cancel();
}
@Override
public void onError(Throwable throwable) {
// 出現(xiàn)了異常(例如處理數(shù)據(jù)的時候產(chǎn)生了異常)
throwable.printStackTrace();
// 我們也可以告訴發(fā)布者,后面不需要在接收數(shù)據(jù)了
this.subscription.cancel();
}
@Override
public void onComplete() {
// 全部數(shù)據(jù)處理完了(發(fā)布者關(guān)閉了)
System.out.println("處理完了替梨!");
}
};
// reactor = jdk8 stream + jdk9 reactive stream
// Mono 0-1 個元素
// Flux 0-N 個元素
String[] strs = {"1","2","3"};
// 這里就是 jdk 8 的 Stream
Flux.fromArray(strs).map(s -> Integer.parseInt(s))
// 最終操作
// 這里就是 jdk9 的 reactive stream
.subscribe(subscriber);
}
reactor 是一個流钓试,也是一個發(fā)布者,而它的最終操作就是訂閱
下面這兩個請求究竟有什么區(qū)別呢耙替?
@RestController
@Slf4j
public class TestController {
@GetMapping("/1")
public String get1(){
log.info("get1 開始");
String str = createStr();
log.info("get1 結(jié)束");
return str;
}
@GetMapping("/2")
public Mono<String> get2(){
log.info("get2 開始");
Mono<String> str = Mono.fromSupplier(() -> createStr());
log.info("get2 結(jié)束");
return str;
}
private String createStr(){
try {
TimeUnit.SECONDS.sleep(3);
} catch (InterruptedException e) {
e.printStackTrace();
}
return "create STR";
}
}
get1 請求
可以開到總共花的大概 3 秒鐘時間
get2 請求
可以看到 新的 WebFlux 基本上是沒有耗時的亚侠,從這里就能說明新的 WebFlux 返回的 Mono 實際上是一個流曹体,由于它沒有調(diào)用最終操作所以不會阻塞線程俗扇,而老的模式調(diào)用的操作花了多久,它在 Controller 里占了多久箕别,而新的模式它其實是一個惰性求值所以它這個 Controller 不會占用那么久铜幽,這就是新的 WebFlux 返回 Mono 的意義。
SSE ( Server-Sent Events)
我們知道 Flux 可以返回多次數(shù)據(jù)串稀,但是 http 協(xié)議是一問一答的形式除抛,它是做到如何多次返回的呢?實際上它用的就是 Html5 SSE
示例
@WebServlet("/sse")
public class SSE extends HttpServlet {
@Override
protected void doGet(HttpServletRequest req, HttpServletResponse resp) throws ServletException, IOException {
doPost(req,resp);
}
@Override
protected void doPost(HttpServletRequest req, HttpServletResponse resp) throws ServletException, IOException {
// 必須
resp.setContentType("text/event-stream");
resp.setCharacterEncoding("UTF-8");
for (int i = 0; i < 5; i++) {
// 格式 : data: + 數(shù)據(jù) + 2個回車
resp.getWriter().write("data:"+i+"\n\n");
resp.getWriter().flush();
try {
TimeUnit.SECONDS.sleep(1);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
結(jié)果
可以看到結(jié)果逐條返回
前端示例
<!DOCTYPE html>
<html lang="en">
<head>
<meta charset="UTF-8">
<title>Title</title>
</head>
<script type="text/javascript">
// 初始化母截,參數(shù)為 url
// 依賴 H5
var sse = new EventSource("sse");
sse.onmessage = function(e){
console.log("message",e.data,e);
}
</script>
<body>
<H1>SSE</H1>
</body>
</html>
結(jié)果
SSE 有個特點會自動的重連到忽,所以會不停的輸出
- 另一種寫法
后端
// 另一種寫法 指定 標(biāo)識世間
resp.getWriter().write("event:me\n");
前段
sse.addEventListener("me", function (e) {
console.log("me event",e.data,e)
});
結(jié)果
結(jié)果也是一樣,也會從新連接清寇,那么如何關(guān)閉呢喘漏?
就可以加一個判斷
sse.addEventListener("me", function (e) {
console.log("me event",e.data,e)
if(e.data == 3){
sse.close();
}
});
WebFlux 示例(使用 mongodb 數(shù)據(jù)庫)
- 添加 mongodb 數(shù)據(jù)庫依賴
<!-- mongodb -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-data-mongodb-reactive</artifactId>
</dependency>
- springboot 啟動類添加 mongodb 支持
@EnableReactiveMongoRepositories // 開啟mongodb
- 定義對象
@Document(collection = "user")
@Data
public class User {
@Id
private String id;
private String name;
private int age;
}
- 創(chuàng)建倉庫
@Repository
public interface UserRepository extends ReactiveMongoRepository<User,String> {
}
- 編寫 controller
@RestController
@RequestMapping("/user")
public class UserController {
// 以前 mvn 注入在 WebFlux 中不推薦,官方推薦使用構(gòu)造方法注入
/*@Autowired
private UserRepository userRepository;*/
private final UserRepository repository;
// 官方推薦這種注入方式华烟,這種和 spring 的耦合度會更加的底
public UserController(UserRepository repository){
this.repository = repository;
}
@GetMapping("/")
public Flux<User> getAll(){
return repository.findAll();
}
/**
* 這里推薦在返回 Flux ,就是返回多個數(shù)據(jù)的時候翩迈,我們都寫兩個方法
* 第一個就是一次性返回的
* 第二種就是 SSE 向流一樣的
* @return
*/
@GetMapping(value = "/stream/all",produces = MediaType.TEXT_EVENT_STREAM_VALUE)
public Flux<User> streamGetAll(){
return repository.findAll();
}
}
- 啟動 mongodb
-
調(diào)試
image.png
可以看到是通的
完整的 CRUD
@RestController
@RequestMapping("/user")
public class UserController {
// 以前 mvn 注入在 WebFlux 中不推薦,官方推薦使用構(gòu)造方法注入
/*@Autowired
private UserRepository userRepository;*/
private final UserRepository repository;
// 官方推薦這種注入方式盔夜,這種和 spring 的耦合度會更加的底
public UserController(UserRepository repository){
this.repository = repository;
}
/**
* 以數(shù)組形式 一次性返回數(shù)據(jù)
* @return
*/
@GetMapping("/")
public Flux<User> getAll(){
return repository.findAll();
}
/**
* 這里推薦在返回 Flux ,就是返回多個數(shù)據(jù)的時候负饲,我們都寫兩個方法
* 第一個就是一次性返回的
* 第二種就是 SSE 向流一樣的
* @return
*/
@GetMapping(value = "/stream/all",produces = MediaType.TEXT_EVENT_STREAM_VALUE)
public Flux<User> streamGetAll(){
return repository.findAll();
}
/**
*
* 添加
* @param user
* @return
*/
@PostMapping("/")
public Mono<User> createUser(@RequestBody User user){
// 這里要注意 在 spring data jpa 里面
// 新增和修改都是 save 方法堤魁,有 id 是修改,沒有 id 是新增
user.setId(null);
return this.repository.save(user);
}
/**
* 刪除
* @param id
* @return
*/
@DeleteMapping("/{id}")
public Mono<ResponseEntity<Void>> deleteUser(@PathVariable("id")String id){
// deleteById 方法是沒有返回值的返十,不能判斷數(shù)據(jù)是否存在
// this.repository.deleteById(id)
return this.repository.findById(id)
// 當(dāng)你要操作數(shù)據(jù)妥泉,并且要返回一個 Mono 這個時候使用 flatMap
// 如果不操作只是裝換數(shù)據(jù),使用 map
.flatMap(user -> this.repository.delete(user)
.then(Mono.just(new ResponseEntity<Void>(HttpStatus.OK))))
.defaultIfEmpty(new ResponseEntity<>(HttpStatus.NOT_FOUND));
}
/**
* 修改數(shù)據(jù)洞坑、
* 存在的時候返回 200 和修改的數(shù)據(jù)涛漂,不存在的時候返回 404
* @param id
* @param user
* @return
*/
@PutMapping("/{id}")
public Mono<ResponseEntity<User>> updateUser(@PathVariable("id")String id,
@RequestBody User user){
// 先根據(jù)用戶 id 查詢是否存在
return this.repository.findById(id)
// flatMap 操作數(shù)據(jù)
.flatMap(u -> {
u.setAge(user.getAge());
u.setName(user.getName());
return this.repository.save(u);
})
// map : 轉(zhuǎn)換數(shù)據(jù)
.map(u -> new ResponseEntity<User>(u,HttpStatus.OK))
.defaultIfEmpty(new ResponseEntity<>(HttpStatus.NOT_FOUND));
}
/**
* 根據(jù) id 查找用戶
* 存在返回用戶信息,不存在返回 404
* @param id
* @return
*/
@GetMapping("/{id}")
public Mono<ResponseEntity<User>> findUserById(@PathVariable("id")String id){
return this.repository.findById(id)
// 存在返回用戶信息
.map(u -> new ResponseEntity<User>(u,HttpStatus.OK))
// 不存在返回 404
.defaultIfEmpty(new ResponseEntity<User>(HttpStatus.NOT_FOUND));
}
}
根據(jù)年齡區(qū)間查詢用戶信息
數(shù)據(jù)訪問層 Repository
/**
* 根據(jù)年齡查找用戶信息
* @param start
* @param end
* @return
*/
Flux<User> findByAgeBetween(int start,int end);
@Query("{'age':{'$gte':20,'$lte':30}}")
Flux<User> findUserBySQL();
controller 層
/**
* 根據(jù)年齡查找用戶(SSE 以流的形式返回)
* @param start
* @param end
* @return
*/
@GetMapping(value="/age/stream/{start}/{end}",produces = MediaType.TEXT_EVENT_STREAM_VALUE)
public Flux<User> streamFindUserByAge(@PathVariable("start")int start,
@PathVariable("end")int end){
return this.repository.findByAgeBetween(start,end);
}
/**
* 手寫 sql
* @return
*/
@GetMapping(value="/age/sql")
public Flux<User> findUserBySQL(){
return this.repository.findUserBySQL();
}
參數(shù)校驗
User 類加上检诗,校驗注解
@Document(collection = "user")
@Data
public class User {
@Id
private String id;
@NotBlank
private String name;
// 年齡最小為 10 卵酪,最大為 100
@Range(min = 10,max = 100)
private int age;
}
為添加方法加上校驗注解
/**
*
* 添加
* @param user
* @return
*/
@PostMapping("/")
public Mono<User> createUser(@RequestBody @Validated User user){
// 這里要注意 在 spring data jpa 里面
// 新增和修改都是 save 方法丈屹,有 id 是修改,沒有 id 是新增
user.setId(null);
return this.repository.save(user);
}
編寫異常處理類
/**
* 異常處理切面
*/
@ControllerAdvice
public class CheckAdvice {
@ExceptionHandler(MethodArgumentNotValidException.class)
public ResponseEntity<String> handleBindingException(MethodArgumentNotValidException e){
return new ResponseEntity<String>(toStr(e), HttpStatus.BAD_REQUEST);
}
/**
* 將異常轉(zhuǎn)換為 字符串
* @param ex
* @return
*/
private String toStr(MethodArgumentNotValidException ex) {
return ex.getBindingResult().getFieldErrors().stream()
// 將異常轉(zhuǎn)換為字符串調(diào)用 map
.map(e -> e.getField()+":"+e.getDefaultMessage())
// 將數(shù)組轉(zhuǎn)換為字符串調(diào)用 reduce
.reduce("",(s1,s2) -> s1+"\n"+s2);
}
}
測試
{
"name":"",
"age":9
}
name:不能為空
age:需要在10和100之間
自定義異常(校驗用戶名是否合法)
編寫校驗邏輯類
public class CheckUtil {
// 定義不允許的常量
private static final String[] INVALID_NAMES = {"admin","administrator"};
/**
* 校驗用戶名是否合法,不合法拋出異常
* @param value
*/
public static void checkName(String value) {
Stream.of(INVALID_NAMES).filter(name -> name.equalsIgnoreCase(value))
// findAny 找到任何一個芬失, ifPresent 如果存在就拋出異常
.findAny().ifPresent(v -> {
throw new CheckException("name", v);
});
}
}
為添加用戶方法添加校驗邏輯
@PostMapping("/")
public Mono<User> createUser(@RequestBody @Validated User user){
// 這里要注意 在 spring data jpa 里面
// 新增和修改都是 save 方法,有 id 是修改揖赴,沒有 id 是新增
user.setId(null);
// 檢查用戶名是否合法
CheckUtil.checkName(user.getName());
return this.repository.save(user);
}
自定義異常類
@Data
public class CheckException extends RuntimeException {
private String fieldName; // 字段名
private String fieldValue; // 錯誤內(nèi)容
public CheckException(String fieldName, String fieldValue) {
super();
this.fieldName = fieldName;
this.fieldValue = fieldValue;
}
public CheckException() {
super();
}
public CheckException(String message) {
super(message);
}
public CheckException(String message, Throwable cause) {
super(message, cause);
}
public CheckException(Throwable cause) {
super(cause);
}
protected CheckException(String message, Throwable cause, boolean enableSuppression, boolean writableStackTrace) {
super(message, cause, enableSuppression, writableStackTrace);
}
}
異常處理類中捕獲自定義異常
/**
* 用戶名校驗自定義異常
* @param e
* @return
*/
@ExceptionHandler(CheckException.class)
public ResponseEntity<String> handleBindingCheckNameException(CheckException e){
return new ResponseEntity<String>(toStr(e),HttpStatus.BAD_REQUEST);
}
private String toStr(CheckException ex) {
return ex.getFieldName()+":"+"錯誤的值"+ ex.getFieldValue();
}
測試
{
"name":"admin",
"age":25
}
name:錯誤的值admin
使用 Router Functions 開發(fā)
WebFlux 可以運行在以前老的 Servlet 容器诡延,也可以運行在 Servlet 3.1 之后的容器,或者運行在 Netty 上面忙菠,那么他第一步要把這連個容器的一些共同點抽離出來
- ServletRequest 對應(yīng) HttpServletRequet
- ServletResponse 對應(yīng) HttpServletResponse
使用 Router Functions 開發(fā)一般需要以下幾步
- 開發(fā) HandlerFunction ,這個 HandlerFunction 是輸入 ServletRequest 返回 ServletResponse
- 接著要開發(fā) Router Funciton 它是把我們的請求 url 和 HandlerFunction 對應(yīng)起來
- 接著我們會把 RouterFunction 包裝成 HttpHandler
- 最后交給 Server 處理何鸡,這里的 Server 指的就是 Servlet3.1 之后的容器,或者 Netty
案例
- 實體類
@Document(collection = "user")
@Data
public class User {
@Id
private String id;
@NotBlank
private String name;
// 年齡最小為 10 牛欢,最大為 100
@Range(min = 10,max = 100)
private int age;
}
- 數(shù)據(jù)訪問層
@Repository
public interface UserRepository extends ReactiveMongoRepository<User,String> {
/**
* 根據(jù)年齡查找用戶信息
* @param start
* @param end
* @return
*/
Flux<User> findByAgeBetween(int start, int end);
@Query("{'age':{'$gte':20,'$lte':30}}")
Flux<User> findUserBySQL();
}
- HandlerFunction
@Component
public class UserHandler {
private final UserRepository repository;
public UserHandler(UserRepository repository){
this.repository = repository;
}
/**
* 得到所有用戶
* @param request
* @return
*/
public Mono<ServerResponse> getAllUser(ServerRequest request){
return ServerResponse.ok().contentType(MediaType.APPLICATION_JSON)
.body(this.repository.findAll(), User.class);
}
/**
* 創(chuàng)建用戶
* @param request
* @return
*/
public Mono<ServerResponse> createUser(ServerRequest request){
// 獲取用戶提交的數(shù)據(jù)
Mono<User> userMono = request.bodyToMono(User.class);
return userMono.flatMap(u -> {
// 校驗用戶信息
CheckUtil.checkName(u.getName());
return ServerResponse.ok().contentType(MediaType.APPLICATION_JSON)
.body(this.repository.save(u), User.class);
});
}
public Mono<ServerResponse> deleteByUserId(ServerRequest request){
// 獲取 rest 參數(shù) id
// 相當(dāng)于獲取 url 中 @PathVariable("id)
String id = request.pathVariable("id");
// 查詢是否存在
return this.repository.findById(id)
// 如果存在刪除
.flatMap(user -> this.repository.delete(user)
// 返回
.then(ServerResponse.ok().build()))
// 不存在
.switchIfEmpty(ServerResponse.notFound().build());
}
}
- router Function 把 url 和 Handler Function 對應(yīng)起來
@Configuration
public class AllRouters {
/***
* spring-boot-starter-web 不能和 spring-boot-starter-webflux 同時引入否則轉(zhuǎn)發(fā)無效
* @param userHandler
* @return
*/
@Bean
RouterFunction<ServerResponse> userRouter(UserHandler userHandler){
return RouterFunctions.nest(
// 相當(dāng)于類上面的 @RequestMapping()
RequestPredicates.path("/router/user"),
// 得到所有用戶
RouterFunctions.route(RequestPredicates.GET("/"), userHandler::getAllUser)
// 創(chuàng)建用戶
.andRoute(
RequestPredicates.POST("/").and(RequestPredicates.accept(MediaType.APPLICATION_JSON)),
userHandler::createUser)
// 刪除用戶
.andRoute(RequestPredicates.DELETE("/{id}"), userHandler::deleteByUserId)
);
}
}
自定義異常校驗
- 用戶名校驗類
public class CheckUtil {
// 定義不允許的常量
private static final String[] INVALID_NAMES = {"admin","administrator"};
/**
* 校驗用戶名是否合法骡男,不合法拋出異常
* @param value
*/
public static void checkName(String value) {
Stream.of(INVALID_NAMES).filter(name -> name.equalsIgnoreCase(value))
// findAny 找到任何一個, ifPresent 如果存在就拋出異常
.findAny().ifPresent(v -> {
throw new CheckException("name", v);
});
}
}
- 自定義異常類
@Data
public class CheckException extends RuntimeException {
private String fieldName; // 字段名
private String fieldValue; // 錯誤內(nèi)容
public CheckException(String fieldName, String fieldValue) {
super();
this.fieldName = fieldName;
this.fieldValue = fieldValue;
}
public CheckException() {
super();
}
public CheckException(String message) {
super(message);
}
public CheckException(String message, Throwable cause) {
super(message, cause);
}
public CheckException(Throwable cause) {
super(cause);
}
protected CheckException(String message, Throwable cause, boolean enableSuppression, boolean writableStackTrace) {
super(message, cause, enableSuppression, writableStackTrace);
}
}
- 統(tǒng)一異常處理
/**
* 異常處理切面
*/
@Component
// 由于默認(rèn)里有多個異常處理傍睹,所以我們要把我們的異常處理器優(yōu)先級別調(diào)高
// 否則不會工作
@Order(-2)
public class CheckAdvice implements WebExceptionHandler {
@Override
public Mono<Void> handle(ServerWebExchange serverWebExchange, Throwable throwable) {
ServerHttpResponse response = serverWebExchange.getResponse();
// 設(shè)置響應(yīng)頭 400
response.setStatusCode(HttpStatus.BAD_REQUEST);
// 設(shè)置返回異常
response.getHeaders().setContentType(MediaType.TEXT_PLAIN);
// 異常信息
String errorMsg = toStr(throwable);
DataBuffer db = response.bufferFactory().wrap(errorMsg.getBytes());
return response.writeWith(Mono.just(db));
}
private String toStr(Throwable ex) {
// 已知異常
if(ex instanceof CheckException){
CheckException e = (CheckException) ex;
return e.getFieldName()+": invalid value "+e.getFieldValue();
}else{
// 未知異常
ex.printStackTrace();
return ex.toString();
}
}
}
WebFlux 客戶端聲明式 RestClient 框架開發(fā)
添加依賴
<!-- WebFlux -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-webflux</artifactId>
</dependency>
<!-- lombok -->
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<optional>true</optional>
</dependency>
創(chuàng)建框架基礎(chǔ)類
創(chuàng)建注解隔盛,保存服務(wù)端地址信息
/**
* 服務(wù)器相關(guān)的信息
*/
@Target(ElementType.TYPE)
@Retention(RetentionPolicy.RUNTIME)
public @interface ApiServer {
// 保存服務(wù)端請求接口
String value() default "";
}
創(chuàng)建方法調(diào)用信息類
/**
* 方法調(diào)用信息類
*/
@Data
@Builder
@AllArgsConstructor
@NoArgsConstructor
public class MethodInfo {
/**
* 請求url
*/
private String url;
/**
* 請求方法
*/
private HttpMethod method;
/**
* 請求參數(shù) (url 上)
*/
private Map<String,Object> params;
/**
* 請求 body
*/
private Mono<?> body;
private Class<?> bodyElementType;
/**
* 返回是 flux 還是 mono
*/
private boolean returnFlux;
/**
* 返回對象的類型
*/
private Class<?> returnElementType;
}
創(chuàng)建服務(wù)器信息類
/**
* 服務(wù)器信息類
*/
@Data
@Builder
@AllArgsConstructor
@NoArgsConstructor
public class ServerInfo {
/**
* 服務(wù)器 url
*/
private String url;
}
創(chuàng)建動態(tài)代理類接口
/**
* 創(chuàng)建代理類接口
*/
public interface ProxyCreator {
/**
* 創(chuàng)建代理類
* @param type
* @return
*/
Object createProxy(Class<?> type);
}
創(chuàng)建請求類接口 Handler
/**
* 請求調(diào)用 handler
*/
public interface RestHandler {
/**
* 初始化服務(wù)器信息
* @param serverInfo
*/
void init(ServerInfo serverInfo);
/**
* 調(diào)用 rest 請求,返回接口
* @param methodInfo
*/
Object invokeRest(MethodInfo methodInfo);
}
創(chuàng)建 jdk 動態(tài)代理類實現(xiàn)動態(tài)代理接口類
/**
* 使用 jdk 動態(tài)代理實現(xiàn)代理類
*/
@Slf4j
public class JDKProxyCreator implements ProxyCreator {
/**
* 創(chuàng)建代理類
*
* @param type
* @return
*/
@Override
public Object createProxy(Class<?> type) {
log.info("createProxy:"+type);
// 根據(jù)接口獲取得到 api 服務(wù)器信息
ServerInfo serverInfo = extractServerInfo(type);
log.info("serverInfo:"+serverInfo);
// 給每一個代理類一個實現(xiàn)
RestHandler restHandler = new WebClientRestHandler();
// 初始化服務(wù)器信息(初始化 webclient)
restHandler.init(serverInfo);
return Proxy.newProxyInstance(this.getClass().getClassLoader(), new Class[]{type}, new InvocationHandler() {
@Override
public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
// 根據(jù)方法和參數(shù)得到調(diào)用信息
MethodInfo methodInfo = extractMethodInfo(method,args);
log.info("methodInfo:"+methodInfo);
// 調(diào)用 rest
return restHandler.invokeRest(methodInfo);
}
/**
* 根據(jù)方法定義和調(diào)用參數(shù)得到調(diào)用的相關(guān)信息
* @param method
* @param args
* @return
*/
private MethodInfo extractMethodInfo(Method method, Object[] args) {
MethodInfo methodInfo = new MethodInfo();
// 得到請求 url 和 方法
extractUrlAndMethod(method,methodInfo);
// 得到請求的 param 和 body
extractRequestParamAndBody(method,args,methodInfo);
// 提取返回對象信息
extractReturnInfo(method,methodInfo);
return methodInfo;
}
/**
* 提取返回對象信息
* @param method
* @param methodInfo
*/
private void extractReturnInfo(Method method, MethodInfo methodInfo) {
// 返回是 Flux 還是 Mono
// isAssignableFrom 判斷類型是否是某個類的子類
// instanceof 判斷實例是否是某個類的子類
boolean isFlux = method.getReturnType().isAssignableFrom(Flux.class);
methodInfo.setReturnFlux(isFlux);
// 得到返回對象的實例類型
Class<?> elementType = extractElementType(method.getGenericReturnType());
methodInfo.setReturnElementType(elementType);
}
/**
* 得到泛型類型的實際類型
* @param genericReturnType
* @return
*/
private Class<?> extractElementType(Type genericReturnType) {
Type[] actualTypeArguments = ((ParameterizedType) genericReturnType).getActualTypeArguments();
return (Class<?>) actualTypeArguments[0];
}
/**
* 得到請求的 param 和 body
* @param method
* @param args
* @param methodInfo
*/
private void extractRequestParamAndBody(Method method, Object[] args, MethodInfo methodInfo) {
// 參數(shù)和值對應(yīng)的 map
Map<String,Object> params = new LinkedHashMap<String, Object>();
methodInfo.setParams(params);
// 得到調(diào)用的參數(shù)或者 body
Parameter[] parameters = method.getParameters();
for (int i = 0; i < parameters.length; i++) {
// 參數(shù)是否帶 @PathVariabole 注解
PathVariable able = parameters[i].getAnnotation(PathVariable.class);
if(able != null){
params.put(able.value(), args[i]);
}
// 是否帶了 requestBody
RequestBody requestBody = parameters[i].getAnnotation(RequestBody.class);
if(requestBody != null){
methodInfo.setBody((Mono<?>)args[i]);
// 得到對象的實際類型
methodInfo.setBodyElementType(extractElementType(parameters[i].getParameterizedType()));
}
}
}
/**
* 得到請求的 url 和方法
* @param method
* @param methodInfo
*/
private void extractUrlAndMethod(Method method,MethodInfo methodInfo){
// 得到請求 url 和請求方法
Annotation[] annotations = method.getAnnotations();
for (Annotation annotation : annotations){
// GET
if(annotation instanceof GetMapping){
GetMapping getMapping = (GetMapping) annotation;
methodInfo.setUrl(getMapping.value()[0]);
methodInfo.setMethod(HttpMethod.GET);
}else if(annotation instanceof PostMapping){
// POST
PostMapping postMapping = (PostMapping) annotation;
methodInfo.setUrl(postMapping.value()[0]);
methodInfo.setMethod(HttpMethod.POST);
}else if(annotation instanceof DeleteMapping){
// DELETE
DeleteMapping deleteMapping = (DeleteMapping) annotation;
methodInfo.setUrl(deleteMapping.value()[0]);
methodInfo.setMethod(HttpMethod.DELETE);
}
}
}
});
}
/**
* 提取服務(wù)器信息
* @param type
* @return
*/
private ServerInfo extractServerInfo(Class<?> type) {
ServerInfo serverInfo = new ServerInfo();
ApiServer apiServer = type.getAnnotation(ApiServer.class);
// 設(shè)置服務(wù)器 url
serverInfo.setUrl(apiServer.value());
return serverInfo;
}
}
創(chuàng)建用戶類
@Data
@Builder
@AllArgsConstructor
@NoArgsConstructor
public class User {
private String id;
private String name;
private int age;
}
創(chuàng)建服務(wù)端請求接口類(地址是上面 WebFlux 完整 CRUD 項目)
/**
* 服務(wù)端請求接口
*/
@ApiServer("http://localhost:8080/user")
public interface IUserApi {
@GetMapping("/")
Flux<User> getAllUser();
@GetMapping("/{id}")
Mono<User> getUserById(@PathVariable("id")String id);
@DeleteMapping("/{id}")
Mono<Void> deleteUserById(@PathVariable("id")String id);
@PostMapping("/")
Mono<User> createUser(@RequestBody Mono<User> user);
}
創(chuàng)建請求調(diào)用接口類實現(xiàn)類拾稳,這里使用 WebClient
/**
* WebClientRestHandler
*/
public class WebClientRestHandler implements RestHandler {
private WebClient webClient;
/**
* 初始化 webClient
* @param serverInfo
*/
@Override
public void init(ServerInfo serverInfo) {
this.webClient = WebClient.create(serverInfo.getUrl());
}
/**
* 處理 rest 請求
* @param methodInfo
*/
@Override
public Object invokeRest(MethodInfo methodInfo) {
// 返回結(jié)果
Object result = null;
WebClient.RequestBodySpec request = this.webClient
// 請求方法類型
.method(methodInfo.getMethod())
// 請求 url
// 不帶參數(shù)
//.uri(methodInfo.getUrl())
// 帶參數(shù)
.uri(methodInfo.getUrl(), methodInfo.getParams())
// 請求類型
.accept(MediaType.APPLICATION_JSON);
WebClient.ResponseSpec retrieve = null;
// 發(fā)出請求
// 判斷是否帶了 body
if(methodInfo.getBody() != null){
retrieve = request.body(methodInfo.getBody(), methodInfo.getBodyElementType()).retrieve();
}else{
retrieve = request.retrieve();
}
// 異常處理
retrieve.onStatus(status -> status.value() == 404, response -> Mono.just(new RuntimeException("not found")));
// 處理 body
if(methodInfo.isReturnFlux()){
result = retrieve.bodyToFlux(methodInfo.getReturnElementType());
}else{
result = retrieve.bodyToMono(methodInfo.getReturnElementType());
}
return result;
}
}
創(chuàng)建配置類完成代理類初始化
@Configuration
public class ProxyConfiguration {
/**
* 創(chuàng)建jdk 工具類
* @return
*/
@Bean
ProxyCreator jdkProxyCreator(){
return new JDKProxyCreator();
}
@Bean
FactoryBean<IUserApi> userApi(ProxyCreator proxyCreator){
return new FactoryBean<IUserApi>() {
// 返回代理對象
@Override
public IUserApi getObject() throws Exception {
return (IUserApi) proxyCreator.createProxy(this.getObjectType());
}
@Override
public Class<?> getObjectType() {
return IUserApi.class;
}
};
}
}
創(chuàng)建測試類 吮炕,測試 CRUD
@RestController
public class TestController {
@Autowired
private IUserApi userApi;
@GetMapping("/")
public void test(){
// 獲取所有用戶信息
//userApi.getAllUser().subscribe(System.out::println);
// 根據(jù) id 查詢用戶信息
String id = "5f2a8077432a9634dba7b65c";
//userApi.getUserById(id).subscribe(System.out::println);
// 創(chuàng)建用戶信息
//userApi.createUser(Mono.just(User.builder().name("張四豐").age(18).build()))
//.subscribe(System.out::println);
// 修改用戶
userApi.updateUser(id, Mono.just(User.builder().name("張五峰").age(19).build())).subscribe(System.out::println);
userApi.getAllUser().subscribe(System.out::println);
}
}