Java WebFlux 響應(yīng)式編程 Springboot 2.0

函數(shù)式編程的概念

簡單說,"函數(shù)式編程"是一種"編程范式"(programming paradigm)力试,也就是如何編寫程序的方法論。
它屬于"結(jié)構(gòu)化編程"的一種,主要思想是把運算過程盡量寫成一系列嵌套的函數(shù)調(diào)用

為什么要使用函數(shù)式編程糯笙,和命令是編程有什么不同點和優(yōu)點呢?

最重要的就是他們的關(guān)注點不一樣撩银,命令式編程里面我們關(guān)注的是怎么樣做给涕,而函數(shù)式編程里我們關(guān)注的是做什么,也就是說在命令式編程里你要告訴程序要怎么樣做才能實現(xiàn)一個功能额获,而函數(shù)式編程里面你不需要告訴它怎么樣做够庙,你只告訴它你要實現(xiàn)什么樣的功能,而不需要關(guān)注實現(xiàn)的細(xì)節(jié)抄邀,這樣說還是有些抽象耘眨,我么們來以一個具體里例子來說明一下。

  • 假設(shè)我們現(xiàn)在有個需求境肾,需要在一堆數(shù)字里面找到最小的一個剔难,你會怎么做呢胆屿?
命令式編程
public class LombokTest {
    public static void main(String[] args) {
        int[] nums = {5,52,99,68,3,-12};

        int min = Integer.MAX_VALUE;
        for(int i : nums){
            if(i < min){
                min = i;
            }
        }
        System.out.println("最小值="+min);
    }
}

結(jié)果

最小值=-12

這種解決思路就是命令式編程,我們要定義變量偶宫,我們要用 for 循環(huán),要把每個數(shù)字跟最小值對比一下非迹,如果符合還要把最小值在更新一下,這就是命令式編程纯趋,我們要告訴程序所有實現(xiàn)的細(xì)節(jié)彻秆。

函數(shù)式編程
        // jdk 1.8
        int asInt = IntStream.of(nums).min().getAsInt();
        System.out.println("jdk1.8 函數(shù)式編程="+asInt);

結(jié)果

jdk1.8 函數(shù)式編程=-12

我們可以看到結(jié)果是一樣
在比如這一堆數(shù)字比較大,可能有幾億條结闸,在命令式編程里面唇兑,使用一個 for 循環(huán)性能可能不達(dá)標(biāo),我們可能要使用多線程桦锄,要定義線程池扎附,要把數(shù)據(jù)拆分,最后得到一個最小值结耀,這要做還是比較麻煩的我們所有的工作都需要自己去做留夜,而在函數(shù)式編程里面在這種并行的環(huán)境下你不需要做這么多細(xì)節(jié)谋减,只需要調(diào)用一個函數(shù)丽声,就可以得到想要的結(jié)果

        // 并行
        int pInt = IntStream.of(nums).parallel().min().getAsInt();
        System.out.println("parallel result = "+pInt);

結(jié)果

parallel result = -12

我們看到結(jié)果還是一樣的 ,這樣寫是不是爽多了骆膝?

函數(shù)式編程還有一個比較明顯的有點黑毅,使用 lombok 表達(dá)式可以使代碼更加的簡短嚼摩,好讀。

  • 比如線程的創(chuàng)建
    jdk1.8 之前
        new Thread(new Runnable(){
            @Override
            public void run() {
                System.out.println("jdk 1.8 之前創(chuàng)建線程");
            }
        }).start();

jdk1.8 之后

new Thread(() -> {
            System.out.println("jdk 1.8 之后創(chuàng)建線程");
        }).start();
函數(shù)式帶輸入輸出參數(shù)的寫法

所謂的函數(shù)式接口矿瘦,是指只有一個抽象方法(這里注意不是只有一個方法枕面,1.8以及后 interface中可以有默認(rèn)實現(xiàn)的方法關(guān)鍵字是default,如果他有兩個及以上抽象方法,就不是函數(shù)式接口)的接口缚去,jdk8幫我們提供了一個注解潮秘,幫助我們檢查編譯時是否合格 @FunctionInterface,允許有端個 default 修飾的非抽象方法,default關(guān)鍵字在API中的解釋為易结,意思是默認(rèn)方法能夠向庫的接口添加新功能枕荞,并確保與為這些接口的舊版本編寫的代碼兼容。并且不用在其子類進(jìn)行逐個實現(xiàn)搞动。

@FunctionalInterface
interface NumsFunciton{
    int numSums(int i);

    // 默認(rèn)實現(xiàn)的方法  不需要在子類中實現(xiàn)
    default int bool(int b){
        return b * 5;
    }
    default int boola(int b){
        return b * 8;
    }
}
public class LombokTest2 {
    public static void main(String[] args) {
        // 一
        NumsFunciton n = (i) -> i * 5;
        // 二  由于只有一個參數(shù)  括號可以省略  這種最常見
        NumsFunciton n1 = i -> i * 6;
        // 三
        NumsFunciton n2 = (int i) -> i * 7;
        // 四
        NumsFunciton n4 = (int i) -> {
            return i * 8;
        };
        System.out.println(n.bool(9));
    }
}

注意函數(shù)式接口多層繼承 default 默認(rèn)方法覆蓋問題
@FunctionalInterface
interface NumsFunciton{
    int numSums(int i);

    // 默認(rèn)實現(xiàn)的方法  不需要在子類中實現(xiàn)
    default int add(){
        return 18;
    }
}
@FunctionalInterface
interface NumsFunciton2{
    int numSums(int i);

    // 默認(rèn)實現(xiàn)的方法  不需要在子類中實現(xiàn)
    default int add(){
        return 19;
    }
}
@FunctionalInterface
interface NumsFunciton3 extends NumsFunciton,NumsFunciton2{

    @Override
    default int add() {
        return NumsFunciton.super.add();
    }
}

這里可以指定調(diào)用哪個函數(shù)接口的默認(rèn)方法

java 8 提供的函數(shù)式接口

interface IMoneyFormat{
    // 格式化
    String format(int i);
}

class MyMoney{
    private final int money;
    public MyMoney(int money){
        this.money = money;
    }
    // 打印
    public void printMoney(IMoneyFormat moneyFormat){
        System.out.println("我的小目標(biāo):"+moneyFormat.format(money));
    }
}
public class LombokTest3 {
    public static void main(String[] args) {
        // 初始化
        MyMoney myMoney = new MyMoney(999999999);
        // 使用函數(shù)式
        myMoney.printMoney(i -> new DecimalFormat("#,###").format(i));
    }
}

Loamb 表達(dá)式其實不關(guān)心實現(xiàn)的那個接口躏精,所以他不需要知道接口的名字,也不需要知道它的方法滋尉,它是需需要知道輸入什么玉控,輸出是什么飞主,這一輸入的是 int 類型狮惜,輸出的是 String 類型高诺,所有我們不需要定義接口,只需要輸入是什么輸出是什么

java 8 提供的函數(shù)式接口優(yōu)化
class MyMoney{
    private final int money;
    public MyMoney(int money){
        this.money = money;
    }
    // 使用 jdk 1.8 函數(shù)式接口 Function
    public void printMoney(Function<Integer,String> moneyFormat){
        System.out.println("我的小目標(biāo):"+moneyFormat.apply(this.money));
    }
}

public class LombokTest3 {
    public static void main(String[] args) {
        // 初始化
        MyMoney myMoney = new MyMoney(999999999);
        // 使用函數(shù)式
        myMoney.printMoney(i -> new DecimalFormat("#,###").format(i));
    }
}

其結(jié)果是一樣的碾篡,除了不需要定義接口外虱而,還可以支持連式編程

        // 初始化
        MyMoney myMoney = new MyMoney(999999999);
        // 使用函數(shù)式
       // myMoney.printMoney(i -> new DecimalFormat("#,###").format(i));

        Function<Integer,String> moneyFormat = i -> new DecimalFormat("#,###").format(i);
        // 函數(shù)式接口的鏈?zhǔn)骄幊?        myMoney.printMoney(moneyFormat.andThen(s -> "人民幣:"+s));

結(jié)果

我的小目標(biāo):人民幣:999,999,999

常見的函數(shù)式接口

ZAG(JGQ@[ZUG1J$T@YQR]M3.png
函數(shù)式接口簡單示例
        System.out.println("===================  斷言函數(shù)接口  ===================");
        Predicate<Integer> predicate = i -> i > 10;
        System.out.println(predicate.test(9));

        System.out.println("===================  消費函數(shù)接口  沒有返回值  ===================");
        Consumer<String> consumer = s -> System.out.println(s);
        consumer.accept("哈哈哈");

        System.out.println("===================  該接口就一個抽象方法get方法,該接口在JAVA8之函數(shù)式接口返回實例篇中第一個示例就是利用的該接口  ===================");
        System.out.println("===================  不用傳入任何參數(shù),直接返回一個泛型T的實例.就如同無參構(gòu)造一樣  ===================");
        Supplier<LombokTest4> supplier = LombokTest4 :: new;
        LombokTest4 r1 = supplier.get();
        LombokTest4 r2 = supplier.get();
        System.out.println("實例1="+r1);
        System.out.println("實例2="+r2);
        System.out.println("實例2=實例2,"+(r1==r2));

        System.out.println("===================  消費函數(shù)接口  沒有返回值  ===================");
        BiFunction<Integer,Integer,String> biFunction = (x,y) -> {
         return x+y+"";
        };
        System.out.println(biFunction.apply(5, 8));

結(jié)果

===================  斷言函數(shù)接口  ===================
false
===================  消費函數(shù)接口  沒有返回值  ===================
哈哈哈
===================  該接口就一個抽象方法get方法,該接口在JAVA8之函數(shù)式接口返回實例篇中第一個示例就是利用的該接口  ===================
===================  不用傳入任何參數(shù),直接返回一個泛型T的實例.就如同無參構(gòu)造一樣  ===================
實例1=com.lombok.LombokTest4@76fb509a
實例2=com.lombok.LombokTest4@300ffa5d
實例2=實例2,false
===================  消費函數(shù)接口  沒有返回值  ===================
13

方法的引用

class Dog{

    private String name = "哮天犬";
    // 默認(rèn)十斤狗糧
    private int food = 100;

    public Dog(){}
    public Dog(String name){
        this.name = name;
    }

    /**
     * 狗叫的靜態(tài)方法
     * @param dog
     */
    public static void bark(Dog dog){
        System.out.println(dog.name + "叫了");
    }

    /**
     * 吃狗糧
     * jdk 默認(rèn)會把當(dāng)前實例傳入到非靜態(tài)方法,參數(shù)名為 this,位置是第一個
     * 所以子在使用類名引用的時候?qū)嶋H上是有兩個參數(shù)
     * 所以在費靜態(tài)方法中可以使用 this 關(guān)鍵字
     * @param num
     * @return
     */
    public int eat(Dog this,int num){
        System.out.println("吃了"+num+"斤狗糧");
        this.food -= num;
        return this.food;
    }

    @Override
    public String toString() {
        return this.name;
    }
}

public class LombokTest5 {

    public static void main(String[] args) {
        // 方法調(diào)用
        Consumer<String> consumer = s -> System.out.println(s);
        consumer.accept("-----");
        // lombok 是一個匿名的函數(shù)开泽,左邊是方法參數(shù)牡拇,右邊是執(zhí)行體
        // 當(dāng)執(zhí)行體的參數(shù)和箭頭左邊的類型是一樣的時候就可以縮寫,寫成方法引用的形式
        Consumer<String> consumer1 = System.out::println;
        consumer1.accept("這是方法調(diào)用");

        System.out.println("=================  靜態(tài)方法的方法引用 類名+方法名 ================");
        Consumer<Dog> dog = Dog::bark;
        dog.accept(new Dog());

        System.out.println("=================  非靜態(tài)方法的方法引用 對象實例+方法名 ================");
        Dog d = new Dog();
        Function<Integer,Integer> function = d::eat;
        System.out.println("還剩下"+function.apply(3)+"斤狗糧");
        System.out.println("=================  可以看到方法的輸入輸出類型一樣穆律,可以使用一元函數(shù)式接口 對象實例+方法名 ================");
        UnaryOperator<Integer> unaryOperator = d::eat;
        System.out.println("還剩下"+unaryOperator.apply(5)+"斤狗糧");
        System.out.println("=================  還可以使用基本函數(shù)式接口 對象實例+方法名 ================");
        IntUnaryOperator intUnaryOperator = d::eat;
        System.out.println("還剩下"+intUnaryOperator.applyAsInt(1)+"斤狗糧");
        System.out.println("=================  非靜態(tài)方法的方法引用 類名+方法名 ================");
        System.out.println("=================  jdk 默認(rèn)會把當(dāng)前實例傳入到非靜態(tài)方法惠呼,參數(shù)名為 this,位置是第一個 ================");
        System.out.println("=================  所以子在使用類名引用的時候?qū)嶋H上是有兩個參數(shù) ================");
        System.out.println("=================  所以在費靜態(tài)方法中可以使用 this 關(guān)鍵字 ================");
        System.out.println();
        System.out.println("                   public int eat(Dog this,int num){ ");
        System.out.println("                        System.out.println(\"吃了\"+num+\"斤狗糧\");");
        System.out.println("                        this.food -= num; ");
        System.out.println("                        this.food -= num;");
        System.out.println("                        return this.food;");
        System.out.println("                   }  ");
        BiFunction<Dog,Integer,Integer> biFunction = Dog::eat;
        System.out.println("還剩下"+biFunction.apply(new Dog(), 8)+"斤狗糧");
        System.out.println("=================  沒有參數(shù)的構(gòu)造函數(shù)的方法引用 ================");
        Supplier<Dog> supplier = Dog::new;
        System.out.println("創(chuàng)建新的對象:"+supplier.get());
        System.out.println("=================  帶參數(shù)的構(gòu)造函數(shù)的方法引用 ================");
        Function<String,Dog> func = Dog::new;
        System.out.println("創(chuàng)建新的對象:"+func.apply("旺財"));
    }
}

結(jié)果

-----
這是方法調(diào)用
=================  靜態(tài)方法的方法引用 類名+方法名 ================
哮天犬叫了
=================  非靜態(tài)方法的方法引用 對象實例+方法名 ================
吃了3斤狗糧
還剩下97斤狗糧
=================  可以看到方法的輸入輸出類型一樣,可以使用一元函數(shù)式接口 對象實例+方法名 ================
吃了5斤狗糧
還剩下92斤狗糧
=================  還可以使用基本函數(shù)式接口 對象實例+方法名 ================
吃了1斤狗糧
還剩下91斤狗糧
=================  非靜態(tài)方法的方法引用 類名+方法名 ================
=================  jdk 默認(rèn)會把當(dāng)前實例傳入到非靜態(tài)方法峦耘,參數(shù)名為 this,位置是第一個 ================
=================  所以子在使用類名引用的時候?qū)嶋H上是有兩個參數(shù) ================
=================  所以在費靜態(tài)方法中可以使用 this 關(guān)鍵字 ================

                   public int eat(Dog this,int num){ 
                        System.out.println("吃了"+num+"斤狗糧");
                        this.food -= num; 
                        this.food -= num;
                        return this.food;
                   }  
吃了8斤狗糧
還剩下92斤狗糧
=================  沒有參數(shù)的構(gòu)造函數(shù)的方法引用 ================
創(chuàng)建新的對象:哮天犬
=================  帶參數(shù)的構(gòu)造函數(shù)的方法引用 ================
創(chuàng)建新的對象:旺財

類型推斷

lombok 表達(dá)式是一個匿名函數(shù)剔蹋,它最終返回了一個實現(xiàn)了指定接口的對象,所以要告訴它實現(xiàn)了哪一個接口辅髓,否則就會報錯泣崩,這就是類型推斷

@FunctionalInterface
interface IMath{
    int add(int x,int y);
}

@FunctionalInterface
interface IMath2{
    int add(int x,int y);
}

public class LombokTeset6 {
    public static void main(String[] args) {

        // 變量類型定義
        IMath iMath = (x,y) -> x+y;
        System.out.println(iMath.add(1,3));

        // 數(shù)組里
        IMath[] iMaths = {(x,y)->x+y,(x,y)->x*y};
        System.out.println(iMaths[0].add(1,2));
        System.out.println(iMaths[1].add(3,4));

        // 強轉(zhuǎn)
        Object obj = (IMath)(x,y) -> x+y;

        // 通過類型返回
        IMath iMath1 = createIMath();

        // 通過方法傳入 等價于 變量類型定義
        LombokTeset6 lombokTeset6 = new LombokTeset6();
        // 注意如果有方法重載需要指定調(diào)用哪一個方法,強轉(zhuǎn)即可
        lombokTeset6.test((IMath) (x,y) -> x+y);
        lombokTeset6.test((IMath2) (x,y) -> x+y);
    }

    public void test(IMath math){}
    public void test(IMath2 math){}

    public static IMath createIMath(){
        return (x,y) -> x / y;
    }
}

變量引用

        String str = "aaa";
        Consumer<String> consumer = s -> System.out.println(s+str);

這樣寫是沒有問題的洛口,大家都知道 jdk 1.8 之前匿名類應(yīng)用外部變量必須是 final 的類型矫付,jdk 1.8 之后就不用了嗎?實際上也是需要寫的第焰,只是在 jdk 1.8 默認(rèn)可以不寫买优,不加 final 關(guān)鍵字,是實際上變量也是不能修改的挺举,試著改一下


image.png

提示已經(jīng)很清晰了而叼,必須是一個 final 或者實際上使 final 就是說初始化之后就不能最修改了。
那么為什么匿名類引用外部變量不能修改呢豹悬?實際上在 java 里面參數(shù)傳參傳的是值而不是引用葵陵,外部變量和引用的變量都指向了同一個對象,當(dāng)匿名類應(yīng)用外部外部變量的時候瞻佛,如果外部變量改變了那么其實應(yīng)用的變量和外部變量是沒有任何關(guān)系了脱篙,那么就有可能導(dǎo)致程序的不準(zhǔn)確性,那么這時引用外部變量又有什么意義呢伤柄?所以外部變量是不能修改的绊困。

級聯(lián)表達(dá)式和柯里化

/**
 * 級聯(lián)表達(dá)式
 */
public class LombokTest8 {
    public static void main(String[] args) {

        System.out.println("====================  級聯(lián)表達(dá)式  ==================");

        /**
         * 例如: x -> y -> x+y
         * 我們知道 lombda 表達(dá)式 左邊為參數(shù),右邊是函數(shù)适刀,所有者個表倒是最終會返回一個這樣的函數(shù)
         * 這里暫且認(rèn)為是 Integer 類型
         * Function<Integer,Function<Integer,Integer>> function = x ->y -> x+y;
         */
        // 這就是級聯(lián)表達(dá)式
        Function<Integer,Function<Integer,Integer>> function = x ->y -> x+y;
        System.out.println(function.apply(1).apply(2));

        System.out.println("====================  柯里化  ==================");
        System.out.println("====================  柯里化:把多個參數(shù)的函數(shù)轉(zhuǎn)換為只有一個參數(shù)的函數(shù)  ==================");
        System.out.println("====================  柯里化的目的:函數(shù)的標(biāo)準(zhǔn)化  ==================");

        Function<Integer,Function<Integer,Function<Integer ,Integer>>> f = x -> y -> z -> x * y * z;
        System.out.println(f.apply(1).apply(2).apply(3));

        System.out.println("====================  柯里化的循環(huán)調(diào)用  ==================");
        int num[] = {4,5,6};
        for (int i = 0; i < num.length; i++) {
            if(f instanceof Function){
                Object obj = f.apply(num[i]);
                if(obj instanceof  Function){
                    f =(Function) obj;
                }else{
                    System.out.println("調(diào)用結(jié)果為:"+obj);
                }
            }
        }
    }
}

結(jié)果

====================  級聯(lián)表達(dá)式  ==================
3
====================  柯里化  ==================
====================  柯里化:把多個參數(shù)的函數(shù)轉(zhuǎn)換為只有一個參數(shù)的函數(shù)  ==================
====================  柯里化的目的:函數(shù)的標(biāo)準(zhǔn)化  ==================
6
====================  柯里化的循環(huán)調(diào)用  ==================
調(diào)用結(jié)果為:120

stream 流編程

概念

首先它是一個高級的迭代器(Iterator),它不是一個數(shù)據(jù)結(jié)構(gòu)秤朗,不是一個集合,它不會存放數(shù)據(jù)笔喉,stream 關(guān)注的是怎樣把數(shù)據(jù)高效的處理取视,它其實就是把數(shù)據(jù)在流水線里面處理硝皂,它和普通的 Iterator 不同的是,它可以并行遍歷作谭,普通的 Iterator 只能是串行稽物,在一個線程中執(zhí)行

外部迭代和內(nèi)部迭代

使用for等進(jìn)行迭代我們叫做外部迭代,使用stream流迭代叫做內(nèi)部迭代折欠,內(nèi)部迭代有什么好處贝或,當(dāng)數(shù)量很大是我們不需要對數(shù)據(jù)進(jìn)行拆分

        int[] nums = {4,5,6};

        // 外部迭代
        int sum = 0;
        for (int num : nums) {
            sum += num;
        }
        System.out.println("結(jié)果是:"+sum);

        // 使用 stream 的內(nèi)部迭代
        // 這個 of 可以理解為 new
        int sum2 = IntStream.of(nums).sum();
        System.out.println("結(jié)果是:"+sum2);

結(jié)果都是一樣的

中間操作/種植操作和惰性求值

中間操作:返回stream的操作
終止操作:得到特定的結(jié)果
惰性求值:終止沒有調(diào)用的情況下,中間操作不會執(zhí)行

public class StreamTest {
    public static void main(String[] args) {
        int[] nums = {4,5,6};

        // 外部迭代
        int sum = 0;
        for (int num : nums) {
            sum += num;
        }
        System.out.println("結(jié)果是:"+sum);

        // 使用 stream 的內(nèi)部迭代
        // 這個 of 可以理解為 new
        int sum2 = IntStream.of(nums).sum();
        System.out.println("結(jié)果是:"+sum2);

        // map 就是中間操作锐秦,返回 stream 的的操作
        // sum 就是總之操作
        int result = IntStream.of(nums).map(StreamTest::mulitplyNum).sum();
        System.out.println("結(jié)果為:"+result);
        System.out.println("=====================  惰性求值就是始終沒有調(diào)用的情況下咪奖,中間操作就會執(zhí)行  =====================");
        // 這一沒有調(diào)用 sum 終止操作不會執(zhí)行
        IntStream.of(nums).map(StreamTest::mulitplyNum);
    }


    public static int mulitplyNum(int i){
        System.out.println("執(zhí)行了乘以 8 的操作");
        return i * 8;
    }
}
流的創(chuàng)建
image.png
        List<String> list = new ArrayList<String>();
        list.add("a");
        list.add("b");
        list.add("c");
        // 從集合創(chuàng)建
        list.stream().forEach(System.out::print);
        list.parallelStream();

        // 從數(shù)組創(chuàng)建
        Arrays.stream(new int[]{3,6,9});

        // 創(chuàng)建數(shù)字流
        IntStream.of(4,5,6);
        IntStream.rangeClosed(1,10);

        // 使用 random 創(chuàng)建一個無限流
        new Random().ints().limit(10);

        // 自己生產(chǎn)流
        Random ran = new Random();
        Stream.generate(() -> ran.nextInt()).limit(20);
中間操作
image.png
無狀態(tài),有狀態(tài)操作

無狀態(tài)操作就是表示當(dāng)前的操作跟其他元素沒有依賴關(guān)系酱床,有狀態(tài)就是表示當(dāng)前的結(jié)果需要依賴有些其他的元素赡艰,好比一個排序操作,就需要依賴所有的元素都計算完畢斤葱,它才有一個最終的排序結(jié)果慷垮,這就是一個有狀態(tài)的操作。

    public static void main(String[] args) {

        String str = "my name is 007 v";

        System.out.println("=================  把每個單詞的長度調(diào)出來  =================");
        // filter 里是一個斷言揍堕,比如只打印長度大于 1 的長度
        Stream.of(str.split(" ")).filter(s -> s.length() > 1)
                .map(s -> s+":"+s.length()+",").forEach(System.out::print);

        System.out.println();

        // flatMap A -> B 屬性(是個集合)料身,最終得到所有的 A 元素里面的所有 B 屬性集合
        // intStream / longStream 并不是 Stream 的子類,所以要進(jìn)行裝箱 boxed
        Stream.of(str.split(" ")).flatMap(s -> s.chars().boxed()).forEach(i -> System.out.print(i+","));
        System.out.println();
        // 數(shù)組 char 類型
        Stream.of(str.split(" ")).flatMap(s -> s.chars().boxed()).forEach(i -> System.out.print((char)i.intValue()+","));
        System.out.println();
        System.out.println("=================  peek 用戶 debug ,是個中間操作衩茸,和 forEach 是終止操作  =================");
        // 這里這里會打印兩次芹血,peek 一次  ,forEach 一次
        Stream.of(str.split(" ")).peek(System.out::println).forEach(System.out::println);

        System.out.println("=================  limit 使用楞慈,主要用于無限流  =================");
        // 創(chuàng)建一個無限流幔烛,不出出錯,但是不會停止
        // new Random().ints().forEach(System.out::println);
        // 創(chuàng)建 10 個 大于 100 小于 1000 的數(shù)
        new Random().ints().filter(i -> i > 100 && i < 1000).limit(10)
                .forEach(System.out::println);
    }

結(jié)果

=================  把每個單詞的長度調(diào)出來  =================
my:2,name:4,is:2,007:3,
109,121,110,97,109,101,105,115,48,48,55,118,
m,y,n,a,m,e,i,s,0,0,7,v,
=================  peek 用戶 debug ,是個中間操作囊蓝,和 forEach 是終止操作  =================
my
my
name
name
is
is
007
007
v
v
=================  limit 使用饿悬,主要用戶無限流  =================
[681, 341, 299, 861, 934, 365, 277, 747, 759, 690]
終止操作
image.png
    public static void main(String[] args) {
        String str = "my name is 007";
        System.out.println();
        System.out.println("================= 使用并行 (順序被打亂了) =================");
        str.chars().parallel().forEach(i -> System.out.print((char)i));
        System.out.println();
        System.out.println("================= 使用forEachOrdered 保證順序 =================");
        str.chars().parallel().forEachOrdered(i -> System.out.print((char)i));
        System.out.println();
        System.out.println("================= 收集到 List =================");
        List<String> collect = Stream.of(str.split(" ")).collect(Collectors.toList());
        System.out.println(collect.toString());
        System.out.println("================= 使用 reduce 拼接字符串 =================");
        Optional<String> reduce = Stream.of(str.split(" ")).reduce((x, y) -> x + "-" + y);
        // 這種如果返回空 會拋異常 throw new NoSuchElementException("No value present");
        System.out.println(reduce.get());
        // 如果沒有返回空串
        System.out.println(reduce.orElse(""));
        // 帶初始值化值得 reduce
        String reduce2 = Stream.of(str.split(" ")).reduce("", (x, y) -> x + "|" + y);
        System.out.println(reduce2);
        // 獲取單詞總長度
        Integer reduce1 = Stream.of(str.split(" ")).map(s -> s.length()).reduce(0, (x, y) -> x + y);
        System.out.println("獲取字符串長度:"+reduce1);
        System.out.println("================= max 的使用 =================");
        Optional<String> max = Stream.of(str.split(" ")).max((s1, s2) -> s1.length() - s2.length());
        System.out.println("長度最長的單詞= "+max.get());
        System.out.println("================= 使用 findFirst 短路操作 =================");
        // ints() 是無限流 是不會終止的 使用 短路操作會使其終止
        OptionalInt first = new Random().ints().findFirst();
        System.out.println(first.getAsInt());
    }

結(jié)果

================= 使用并行 (順序被打亂了) =================
 is07 0namemy 
================= 使用forEachOrdered 保證順序 =================
my name is 007
================= 收集到 List =================
[my, name, is, 007]
================= 使用 reduce 拼接字符串 =================
my-name-is-007
my-name-is-007
|my|name|is|007
獲取字符串長度:11
================= max 的使用 =================
長度最長的單詞= name
================= 使用 findFirst 短路操作 =================
-555829999
并行流
public class StreamTest5 {
    public static void main(String[] args) {

        System.out.println("============  peek 串行流 單線程  ==============");
        // 單線程
        IntStream.range(1,5).peek(StreamTest5::debug).count();
        System.out.println("============  parallel 并行流 多線程  ==============");
        // 多線程并行調(diào)用 parallel
        IntStream.range(1,5 ).parallel().peek(StreamTest5::debug).count();
        System.out.println("============  現(xiàn)在要實現(xiàn)一個這樣的效果,先并行聚霜,在串行  ==============");
        System.out.println("============  多次調(diào)用 parallel / sequential ,以最后此一次調(diào)用為準(zhǔn)  ==============");
        IntStream.range(1,5)
                //調(diào)用 parallel 產(chǎn)生并行流
                .parallel().peek(StreamTest5::debug)
                // 調(diào)用 sequential 產(chǎn)生串行流
                .sequential().peek(StreamTest5::debug)
                .count();

        System.out.println("============  并行流使用的線程池是:ForkJoinPool.commonPool  ==============");
        System.out.println("============  默認(rèn)線程數(shù)是當(dāng)前 CPU 的個數(shù)  ==============");
        System.out.println("============  使用 System.setProperty(\"java.util.concurrent.ForkJoinPoll.common.parallelism\",\"20\") 修改默認(rèn)線程數(shù)  ==============");
        System.setProperty("java.util.concurrent.ForkJoinPoll.common.parallelism","20");
        IntStream.range(1,5).parallel().peek(StreamTest5::debug2).count();

        System.out.println("============  使用自己的線程池狡恬,不使用默認(rèn)的線程池,防止任務(wù)被阻塞  ==============");
        ForkJoinPool pool = new ForkJoinPool(20);
        pool.submit(() -> IntStream.range(1,5).parallel()
        .peek(StreamTest5::debug2).count());
        
        // 關(guān)閉線程池
        pool.shutdown();

        // 讓守護(hù)線程等待蝎宇,防止主線程運行完 看不到效果
        synchronized (pool){
            try {
                pool.wait();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }

    public static void debug(int i){
        System.out.println("debug:"+i);
        try {
            TimeUnit.SECONDS.sleep(1);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }

    public static void debug2(int i){
        System.out.println(Thread.currentThread().getName()+" debug:"+i);
        try {
            TimeUnit.SECONDS.sleep(1);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}

結(jié)果

============  peek 串行流 單線程  ==============
debug:1
debug:2
debug:3
debug:4
============  parallel 并行流 多線程  ==============
debug:3
debug:2
debug:1
debug:4
============  現(xiàn)在要實現(xiàn)一個這樣的效果弟劲,先并行,在串行  ==============
============  多次調(diào)用 parallel / sequential ,以最后此一次調(diào)用為準(zhǔn)  ==============
debug:1
debug:1
debug:2
debug:2
debug:3
debug:3
debug:4
debug:4
============  并行流使用的線程池是:ForkJoinPool.commonPool  ==============
============  默認(rèn)線程數(shù)是當(dāng)前 CPU 的個數(shù)  ==============
============  使用 System.setProperty("java.util.concurrent.ForkJoinPoll.common.parallelism","20") 修改默認(rèn)線程數(shù)  ==============
main debug:3
ForkJoinPool.commonPool-worker-0 debug:2
main debug:4
ForkJoinPool.commonPool-worker-0 debug:1
============  使用自己的線程池姥芥,不使用默認(rèn)的線程池兔乞,防止任務(wù)被阻塞  ==============
ForkJoinPool-1-worker-25 debug:3
ForkJoinPool-1-worker-18 debug:2
ForkJoinPool-1-worker-4 debug:1
ForkJoinPool-1-worker-11 debug:4
收集器
class Student{
    private int id;        // 編號
    private String name;   // 年齡
    private int grade;    // 班級
    public int getId() {
        return id;
    }

    public void setId(int id) {
        this.id = id;
    }

    public String getName() {
        return name;
    }

    public void setName(String name) {
        this.name = name;
    }

    public int getGrade() {
        return grade;
    }

    public void setGrade(int grade) {
        this.grade = grade;
    }

    @Override
    public String toString() {
        return "Student{" +
                "id=" + id +
                ", name='" + name + '\'' +
                ", grade=" + grade +
                '}';
    }

    public Student(int id, String name, int grade) {
        this.id = id;
        this.name = name;
        this.grade = grade;
    }
}
public class StreamTest6 {
    public StreamTest6() {
    }

    public static void main(String[] args) {
        List<Student> students = Arrays.asList(
                new Student(1,"張三",1),
                new Student(2,"李四",2),
                new Student(3,"王五",3),
                new Student(4,"趙六",3),
                new Student(5,"張三",2));
        System.out.println("=====================  得到所有學(xué)生的 id  ====================");
        System.out.println("=====================  s -> s.getAge() == Student::getAge  ====================");
        System.out.println("=====================  Student::getAge 不會多生成一個類似 lambda$0 這樣的函數(shù),推薦使用 ====================");
        // 轉(zhuǎn) List
        List<Integer> ids = students.stream().map(Student::getId).collect(Collectors.toList());
        System.out.println("id列表:"+ids);
        System.out.println("=====================  得到所有學(xué)生的 姓名  ====================");
        // 轉(zhuǎn) Set
        Set<String> names = students.stream().map(Student::getName).collect(Collectors.toSet());
        System.out.println("姓名列表:"+names);
        // 轉(zhuǎn) TreeSet
        students.stream().map(Student::getName).collect(Collectors.toCollection(TreeSet::new));
        System.out.println("姓名列表:"+names);
        System.out.println("=====================  統(tǒng)計匯總信息  ====================");
        IntSummaryStatistics summaryStatistics = students.stream().collect(Collectors.summarizingInt(Student::getId));
        System.out.println("年齡匯總信息:"+summaryStatistics);
        System.out.println("=====================  分塊(根據(jù)條件分成,true/false 兩塊)  ====================");
        Map<Boolean, List<Student>> collect = students.stream().collect(Collectors.partitioningBy(s -> s.getName() == "趙六"));
        System.out.println(collect);
        System.out.println("=====================  分組  ====================");
        Map<Integer, List<Student>> group = students.stream().collect(Collectors.groupingBy(Student::getGrade));
        System.out.println(group);
        System.out.println("=====================  分組 得到每個班級學(xué)生的列表 ====================");
        Map<Integer, Long> groupCount = students.stream().collect(Collectors.groupingBy(Student::getGrade, Collectors.counting()));
        System.out.println("每個班級學(xué)生的人數(shù):"+groupCount);
    }
}

結(jié)果

=====================  得到所有學(xué)生的 id  ====================
=====================  s -> s.getAge() == Student::getAge  ====================
=====================  Student::getAge 不會多生成一個類似 lambda$0 這樣的函數(shù)庸追,推薦使用 ====================
id列表:[1, 2, 3, 4, 5]
=====================  得到所有學(xué)生的 姓名  ====================
姓名列表:[李四, 張三, 王五, 趙六]
姓名列表:[李四, 張三, 王五, 趙六]
=====================  統(tǒng)計匯總信息  ====================
年齡匯總信息:IntSummaryStatistics{count=5, sum=15, min=1, average=3.000000, max=5}
=====================  分塊(根據(jù)條件分成霍骄,true/false 兩塊)  ====================
{false=[Student{id=1, name='張三', grade=1}, Student{id=2, name='李四', grade=2}, Student{id=3, name='王五', grade=3}, Student{id=5, name='張三', grade=2}], true=[Student{id=4, name='趙六', grade=3}]}
=====================  分組  ====================
{1=[Student{id=1, name='張三', grade=1}], 2=[Student{id=2, name='李四', grade=2}, Student{id=5, name='張三', grade=2}], 3=[Student{id=3, name='王五', grade=3}, Student{id=4, name='趙六', grade=3}]}
=====================  分組 得到每個班級學(xué)生的列表 ====================
每個班級學(xué)生的人數(shù):{1=1, 2=2, 3=2}

JDK9 Reactive Stream

概念

Reactive Stream (響應(yīng)式流/反應(yīng)流) 是JDK9引入的一套標(biāo)準(zhǔn),是一套基于發(fā)布/訂閱模式的數(shù)據(jù)處理規(guī)范锚国。響應(yīng)式流從2013年開始腕巡,作為提供非阻塞背壓的異步流處理標(biāo)準(zhǔn)的倡議玄坦。 它旨在解決處理元素流的問題——如何將元素流從發(fā)布者傳遞到訂閱者血筑,而不需要發(fā)布者阻塞,或訂閱者有無限制的緩沖區(qū)或丟棄煎楣。更確切地說豺总,Reactive流目的是“找到最小的一組接口,方法和協(xié)議择懂,用來描述必要的操作和實體以實現(xiàn)這樣的目標(biāo):以非阻塞背壓方式實現(xiàn)數(shù)據(jù)的異步流”喻喳。

被壓

“背壓(反壓)back pressure”概念很關(guān)鍵。首先異步消費者會向生產(chǎn)者訂閱接收消息困曙,然后當(dāng)有新的信息可用時表伦,消費者會通過之前訂閱時提供的回調(diào)函數(shù)被再次激活調(diào)用。如果生產(chǎn)者發(fā)出的信息比消費者能夠處理消息最大量還要多慷丽,消費者可能會被迫一直在抓消息蹦哼,耗費越來越多的資源,埋下潛在的崩潰風(fēng)險要糊。為了防止這一點纲熏,需要有一種機(jī)制使消費者可以通知生產(chǎn)者,降低消息的生成速度锄俄。生產(chǎn)者可以采用多種策略來實現(xiàn)這一要求局劲,這種機(jī)制稱為背壓。

響應(yīng)式流模型非常簡單——訂閱者向發(fā)布者發(fā)送多個元素的異步請求奶赠,發(fā)布者向訂閱者異步發(fā)送多個或稍少的元素鱼填。響應(yīng)式流會在pull模型和push模型流處理機(jī)制之間動態(tài)切換。 當(dāng)訂閱者較慢時毅戈,它使用pull模型剔氏,當(dāng)訂閱者更快時使用push模型。

簡單來說竹祷,在響應(yīng)式流下訂閱者可以與發(fā)布者溝通谈跛,如果使用JMS就應(yīng)該知道,訂閱者只能被動接收發(fā)布者所產(chǎn)生的消息數(shù)據(jù)塑陵。這就好比沒有水龍頭的水管一樣感憾,我只能被動接收水管里流過來的水,無法關(guān)閉也無法減少。而響應(yīng)式流就相當(dāng)于給水管加了個水龍頭阻桅,在消費者這邊可以控制水流的增加凉倚、減少及關(guān)閉。

Reactive Stream 主要接口

JDK9 通過java.util.concurrent.Flow 和java.util.concurrent.SubmissionPublisher 類來實現(xiàn)響應(yīng)式流嫂沉。在JDK9里Reactive Stream的主要接口聲明在Flow類里稽寒,F(xiàn)low 類中定義了四個嵌套的靜態(tài)接口,用于建立流量控制的組件趟章,發(fā)布者在其中生成一個或多個供訂閱者使用的數(shù)據(jù)項:

  • Publisher:數(shù)據(jù)項發(fā)布者杏糙、生產(chǎn)者
  • Subscriber:數(shù)據(jù)項訂閱者、消費者
  • Subscription:發(fā)布者與訂閱者之間的關(guān)系紐帶蚓土,訂閱令牌
  • Processor:數(shù)據(jù)處理器只能被動接收水管里流過來的水宏侍,無法關(guān)閉也無法減少。而響應(yīng)式流就相當(dāng)于給水管加了個水龍頭蜀漆,在消費者這邊可以控制水流的增加谅河、減少及關(guān)閉。
具體案例
public class Jdk9Test {
    public static void main(String[] args) throws InterruptedException {
        // 1. 定義發(fā)布者确丢,發(fā)布的數(shù)據(jù)類型是 Integer
        // 直接使用 jdk 自帶的 SubmissionPublisher,他實現(xiàn)了 Publisher 接口
        SubmissionPublisher<Integer> publisher = new SubmissionPublisher<Integer>();

        // 2. 定義訂閱者
        Flow.Subscriber<Integer> subscriber = new Flow.Subscriber<Integer>(){

            private Flow.Subscription subscription;

            @Override
            public void onSubscribe(Flow.Subscription subscription) {
                // 保存訂閱關(guān)系绷耍,需要用它來給發(fā)布者響應(yīng)
                this.subscription = subscription;
                // 請求一個數(shù)據(jù)
                this.subscription.request(1);
            }

            @Override
            public void onNext(Integer item) {
                // 接受一個數(shù)據(jù),處理
                System.out.println("接受到數(shù)據(jù):"+item);
                try {
                    TimeUnit.SECONDS.sleep(3);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                // 處理完調(diào)用 request 在請求一個數(shù)據(jù)
                this.subscription.request(1);
                // 或者已經(jīng)達(dá)到了目標(biāo)鲜侥,調(diào)用 cancel 告訴發(fā)布者不要在接受數(shù)據(jù)了
                //this.subscription.cancel();
            }

            @Override
            public void onError(Throwable throwable) {
                // 出現(xiàn)了異常(例如處理數(shù)據(jù)的時候產(chǎn)生了異常)
                throwable.printStackTrace();
                // 我們也可以告訴發(fā)布者褂始,后面不需要在接收數(shù)據(jù)了
                this.subscription.cancel();
            }

            @Override
            public void onComplete() {
                // 全部數(shù)據(jù)處理完了(發(fā)布者關(guān)閉了)
                System.out.println("處理完了!");
            }
        };

        // 3. 發(fā)布者和訂閱者建立訂閱關(guān)系
        publisher.subscribe(subscriber);

        // 4. 產(chǎn)生數(shù)據(jù)剃毒,并發(fā)布
        //    這里忽略數(shù)據(jù)產(chǎn)生的過程
        for (int i = 0; i < 2; i++) {
            System.out.println("生成數(shù)據(jù):"+i);
            // submit 是個 block 方法
            publisher.submit(i);
        }

        // 5. 結(jié)束后 關(guān)閉發(fā)布者
        // 正式環(huán)境病袄,應(yīng)該放 finally 或者 try-resouce 確保關(guān)閉
        publisher.close();

        // 主線程延遲停止,否則數(shù)據(jù)沒有消費就退出了
        Thread.currentThread().join(1000);
        // debug 的時候赘阀,下面這行需要有斷點
        // 否則主線程結(jié)束無法 debug
        System.out.println();
    }
}
結(jié)果
生成數(shù)據(jù):0
生成數(shù)據(jù):1
接受到數(shù)據(jù):0

接受到數(shù)據(jù):1
處理完了益缠!
/**
 * 帶 process 的 flow demo
 */
class MyPocessor extends SubmissionPublisher<String> implements Flow.Processor<Integer,String> {

    private Flow.Subscription subscription;

    @Override
    public void onSubscribe(Flow.Subscription subscription) {
        // 保存訂閱關(guān)系,需要用他們來給發(fā)布者效應(yīng)
        this.subscription = subscription;
        // 請求一個數(shù)據(jù)
        this.subscription.request(1);
    }

    @Override
    public void onNext(Integer item) {
        // 接受到一個數(shù)據(jù)基公,處理
        System.out.println("處理器接收到了數(shù)據(jù):"+item);
        // 過濾掉小于 0 的數(shù)據(jù)幅慌,然后發(fā)布出去
        if(item > 0){
            this.submit("轉(zhuǎn)換后的數(shù)據(jù):"+item);
        }
        // 處理完調(diào)用 request 在請求一個數(shù)據(jù)
        this.subscription.request(1);
        // 或者已經(jīng)達(dá)到了目標(biāo),調(diào)用 cancel 告訴發(fā)布者不要在接受數(shù)據(jù)了
        // this.subscription.cancel();
    }

    @Override
    public void onError(Throwable throwable) {
        // 出現(xiàn)了異常(例如處理數(shù)據(jù)的時候產(chǎn)生了異常)
        throwable.printStackTrace();
        // 我們可以告訴發(fā)布者轰豆,后面不在接收數(shù)據(jù)了
        this.subscription.cancel();
    }

    @Override
    public void onComplete() {
        // 全部數(shù)據(jù)處理完了(發(fā)布者關(guān)閉了)
        System.out.println("處理器處理完了胰伍!");
        // 關(guān)不發(fā)布者
        this.close();
    }
}

public class Jdk9Test2 {
    public static void main(String[] args) throws InterruptedException {
        // 1. 定義發(fā)布者,發(fā)布數(shù)據(jù)類型是 Integer
        // 直接使用 jdk 自帶的 SubmissionPublisher
        SubmissionPublisher<Integer> publisher = new SubmissionPublisher<Integer>();
        // 2. 定義處理器酸休,對數(shù)據(jù)進(jìn)行過濾骂租,并轉(zhuǎn)換為 String 類型
        MyPocessor pocessor = new MyPocessor();
        // 3. 發(fā)布者和處理器建立訂閱關(guān)系
        publisher.subscribe(pocessor);
        // 4. 定于最終訂閱者,消費 String 數(shù)據(jù)類型
        Flow.Subscriber<String> subscriber = new Flow.Subscriber<String>(){

            private Flow.Subscription subscription;

            @Override
            public void onSubscribe(Flow.Subscription subscription) {
                // 保存訂閱關(guān)系斑司,需要用它來給發(fā)布者發(fā)布響應(yīng)
                this.subscription = subscription;
                // 請求一個數(shù)據(jù)
                this.subscription.request(1);
            }

            @Override
            public void onNext(String item) {
                // 接收到一個數(shù)據(jù)渗饮,處理
                System.out.println("接收到數(shù)據(jù):"+item);
                // 處理完調(diào)用 request 在請求一個數(shù)據(jù)
                this.subscription.request(1);
                // 或者 已經(jīng)達(dá)到了目標(biāo),調(diào)用 cancel 告訴發(fā)布者不要在接收數(shù)據(jù)了
                // this.subscription.cancel();
            }

            @Override
            public void onError(Throwable throwable) {
                // 出現(xiàn)了異常(例如處理數(shù)據(jù)的時候產(chǎn)生了異常)
                throwable.printStackTrace();
                // 我們可以告訴發(fā)布者,后面不在接收數(shù)據(jù)了
                this.subscription.cancel();
            }

            @Override
            public void onComplete() {
                // 全部數(shù)據(jù)處理完(發(fā)布者關(guān)閉了)
                System.out.println("處理完了互站!");
            }
        };

        // 5. 處理器和最終訂閱者建立訂閱關(guān)系
        pocessor.subscribe(subscriber);
        // 6. 產(chǎn)生數(shù)據(jù)私蕾,并發(fā)布
        // 這里忽略數(shù)據(jù)產(chǎn)生過程
        publisher.submit(999);
        publisher.submit(666);
        // 7. 結(jié)束后,關(guān)閉發(fā)布者
        // 正式環(huán)境應(yīng)該放 finally 或者 try-resouce 確保關(guān)閉
        publisher.close();
        // 主線程延遲停止胡桃,否則數(shù)據(jù)沒有消費就退出
        Thread.currentThread().join(1000);
        System.out.println();
    }
}
結(jié)果
處理器接收到了數(shù)據(jù):999
處理器接收到了數(shù)據(jù):666
處理器處理完了踩叭!
接收到數(shù)據(jù):轉(zhuǎn)換后的數(shù)據(jù):999
接收到數(shù)據(jù):轉(zhuǎn)換后的數(shù)據(jù):666
處理完了!

Spring WebFlux

image.png
概念

它是 spring5 提出的一種新的開發(fā) web 的技術(shù)站翠胰,它是非阻塞的開發(fā)模式容贝,它運行的 netty 或者說 servlet3.1 的容器里面,它可以支持非常多的并發(fā)量亡容,也就是說我們現(xiàn)在開發(fā) web 服務(wù)多了一個選擇嗤疯,可以使用以前的 mvc 開發(fā)模式冤今,也可以使用現(xiàn)在新的 webFlux 開發(fā)模式闺兢。

WebFlux 和 Spring MVC 關(guān)系
  • 首先WebFlux 是 non-blocking 非阻塞的開發(fā)模式而傳統(tǒng)的 mvc 是同步的阻塞開發(fā)模式,非阻塞就是我們可以在一個線程里面處理更加多的請求戏罢,而以前老的模式是一個請求對應(yīng)容器里的一個線程屋谭,這是最大的不同點
  • 第二個不通點就是運行的環(huán)境不一樣,老的開發(fā)模式是基于 Servlet API 所以它必須運行在 Servlet 容器里面龟糕,而 WebFlux 開發(fā)模式是基于Reactive Streams 響應(yīng)式流桐磁,它可以運行在 Servlet3.1 之后的容器,也就是支持一部 Servlet 的容器或者說運行在 netty 上面讲岁,其實 Spring 5 默認(rèn)的容器就是 netty我擂,
  • 第三個不同點,現(xiàn)在新的 WebFlux 都是不支持關(guān)系型數(shù)據(jù)庫的缓艳,我們以前用的 mysql ,oracle等校摩,這些數(shù)據(jù)庫都暫時無法使用
優(yōu)勢

最大的優(yōu)勢就是可以支持非常高的并發(fā)量,我們的應(yīng)用隨著發(fā)展并發(fā)量可能會越來越大阶淘,并發(fā)量高了之后衙吩,以前的應(yīng)用可能就承受不了了,這個時候我們就要進(jìn)行擴(kuò)展溪窒,擴(kuò)展分為兩種坤塞,一種叫水平擴(kuò)展,一種叫垂直擴(kuò)展澈蚌,簡單來說水平擴(kuò)展就是加人摹芙,垂直擴(kuò)展就是加班,家人當(dāng)人的比較容易的宛瞄,好比你以前一個節(jié)點處理不了這個請求浮禾,那么我們就加幾個節(jié)點,在處理不了就在加,一個加上去伐厌,那么垂直擴(kuò)展呢承绸,就是你人還是一個人,對應(yīng)我們的技術(shù)就是線程還是這么多線程但是你要處理更加多的請求挣轨,那么我們就可以采用 WebFlux 這種異步的模式军熏,可以讓他在相同的線程下面支持更加多的一個請求就可以達(dá)到一個垂直擴(kuò)展的目的,所以它是非常重要的一個卷扮,因為你水平擴(kuò)展它對資源的要求會比較高荡澎,所以需要更多的機(jī)器,花更多的錢晤锹,但我們的垂直擴(kuò)展它機(jī)器還是那么多機(jī)器摩幔,所以我們 SE 在設(shè)計框架的時候,當(dāng)你的請求量上去了之后鞭铆,我們第一步就可以把以前的同步 servlet 或衡,就是以前的 mvc 這種模式改成我們現(xiàn)在新的 WebFlux 模式,然后在進(jìn)行水平擴(kuò)展车遂,先垂直擴(kuò)展在水平擴(kuò)展封断,這就更加好的利用我們現(xiàn)有的資源,處理更加高的并發(fā)量舶担。

異步 Servlet

為什么要使用異步 setvlet坡疼? 同步的 servlet 阻塞了什么?
  • 同步 servlet
@WebServlet(urlPatterns = "/asynServlet")
public class SyncServelt extends HttpServlet {

    @Override
    protected void doGet(HttpServletRequest req, HttpServletResponse resp) throws ServletException, IOException {
        doPost(req, resp);
    }

    @Override
    protected void doPost(HttpServletRequest req, HttpServletResponse resp) throws ServletException, IOException {
        long startTime = System.currentTimeMillis();
        // 執(zhí)行業(yè)務(wù)代碼
        doSomeThing(req,resp);
        long endTime = System.currentTimeMillis();
        System.out.println("總耗時:"+(endTime-startTime)+"毫秒");
    }

    // 業(yè)務(wù)代碼
    private  void doSomeThing(HttpServletRequest request,HttpServletResponse response){
        try {
            // 睡眠 5 秒
            TimeUnit.SECONDS.sleep(5);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        try {
            response.getWriter().append("done");
        } catch (IOException e) {
            e.printStackTrace();
        }
    }
}

結(jié)果

總耗時:5007毫秒

可以看到同步的 servlet 從請求開始到請求結(jié)束總共耗時了 5 秒左右衣陶,從這里我們就可知道同步 servlet 到底阻塞了什么柄瑰?其實就是阻塞了 tomcat 容器的 servlet 線程,當(dāng)一個請求到達(dá) tomcat 容器之后剪况,tomcat 容器會給每一個請求開啟一個線程去處理教沾,而線程里面會調(diào)用具體的 servlet 去處理,當(dāng)你使用同步 servlet 的時候拯欧,你的業(yè)務(wù)代碼話費多長時間详囤,servlet 線程就要等待多長時間,這就是阻塞

  • 異步 servlet
@WebServlet(asyncSupported = true,urlPatterns = "/asyncServlet")
public class AsyncServlet extends HttpServlet {
    @Override
    protected void doGet(HttpServletRequest req, HttpServletResponse resp) throws ServletException, IOException {
        doPost(req,resp);
    }

    @Override
    protected void doPost(HttpServletRequest req, HttpServletResponse resp) throws ServletException, IOException {
        long startTime = System.currentTimeMillis();

        // 開啟異步
        AsyncContext asyncContext = req.startAsync();

        // 執(zhí)行業(yè)務(wù)代碼
        CompletableFuture.runAsync(() -> doSomeThing(asyncContext,asyncContext.getRequest(),asyncContext.getResponse()));


        long endTime = System.currentTimeMillis();
        System.out.println("總耗時:"+(endTime-startTime)+"毫秒");
    }

    // 業(yè)務(wù)代碼
    private  void doSomeThing(AsyncContext asyncContext, ServletRequest request, ServletResponse response){
        try {
            // 睡眠 5 秒
            TimeUnit.SECONDS.sleep(5);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        try {
            response.getWriter().append("done");
        } catch (IOException e) {
            e.printStackTrace();
        }
        // 業(yè)務(wù)代碼處理完畢镐作,通知結(jié)束
        asyncContext.complete();
    }
}

結(jié)果

總耗時:0毫秒

從這里可以看出藏姐,異步的 servlet 幾乎沒有好使,這就是異步 servlet 的作用该贾,它主要作用就是不會阻塞 tomcat servlet 線程羔杨,它可以把一些耗時的業(yè)務(wù)放在一個獨立的線程池里面,那么我們的 servlet 線程就會立馬返回杨蛋,可以去處理下一個請求兜材,所以它就會使用比較少的線程來達(dá)到一個比較高的吞吐量理澎,這就是異步 servlet 的工作機(jī)制

總結(jié)

同步和異步都是后臺服務(wù)端的概念,服務(wù)器后臺才有異步這個概念曙寡,對于瀏覽器來說所有都是同步糠爬,不管后臺是同步 servlet 還是異步 servlet 它前臺都要花費相同的時間,第二個就是同步 servlet 到底阻塞了什么举庶,它其實是阻塞了 tomcat servlet 的線程执隧,使用異步 servlet 之后 ,servlet 線程就會立馬返回處理下一個請求户侥,所以它就可以達(dá)到高并發(fā)镀琉。

異步 servlet 是怎樣工作的

WebFlux 開發(fā)

reactor = jdk 8 stream + jdk 9 reactive stream
Mono 0 - 1 個元素
Flux 0 - N 個元素
例子

    public static void main(String[] args) {

        Subscriber<Integer> subscriber = new Subscriber<Integer>(){

            private Subscription subscription;

            @Override
            public void onSubscribe(Subscription subscription) {
                // 保存訂閱關(guān)系,需要用它來給發(fā)布者響應(yīng)
                this.subscription = subscription;
                // 請求一個數(shù)據(jù)
                this.subscription.request(1);
            }

            @Override
            public void onNext(Integer item) {
                // 接受一個數(shù)據(jù)蕊唐,處理
                System.out.println("接受到數(shù)據(jù):"+item);
                try {
                    TimeUnit.SECONDS.sleep(3);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                // 處理完調(diào)用 request 在請求一個數(shù)據(jù)
                this.subscription.request(1);
                // 或者已經(jīng)達(dá)到了目標(biāo)屋摔,調(diào)用 cancel 告訴發(fā)布者不要在接受數(shù)據(jù)了
                //this.subscription.cancel();
            }

            @Override
            public void onError(Throwable throwable) {
                // 出現(xiàn)了異常(例如處理數(shù)據(jù)的時候產(chǎn)生了異常)
                throwable.printStackTrace();
                // 我們也可以告訴發(fā)布者,后面不需要在接收數(shù)據(jù)了
                this.subscription.cancel();
            }

            @Override
            public void onComplete() {
                // 全部數(shù)據(jù)處理完了(發(fā)布者關(guān)閉了)
                System.out.println("處理完了替梨!");
            }
        };

        // reactor = jdk8 stream + jdk9 reactive stream
        // Mono 0-1 個元素
        // Flux 0-N 個元素

        String[] strs = {"1","2","3"};

        // 這里就是 jdk 8 的 Stream
        Flux.fromArray(strs).map(s -> Integer.parseInt(s))
        // 最終操作
        // 這里就是 jdk9 的 reactive stream
        .subscribe(subscriber);

    }

reactor 是一個流钓试,也是一個發(fā)布者,而它的最終操作就是訂閱

下面這兩個請求究竟有什么區(qū)別呢耙替?
@RestController
@Slf4j
public class TestController {

    @GetMapping("/1")
    public String get1(){
        log.info("get1 開始");
        String str = createStr();
        log.info("get1 結(jié)束");
        return str;
    }

    @GetMapping("/2")
    public Mono<String> get2(){
        log.info("get2 開始");
        Mono<String> str = Mono.fromSupplier(() -> createStr());
        log.info("get2 結(jié)束");
        return str;
    }

    private String createStr(){
        try {
            TimeUnit.SECONDS.sleep(3);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        return "create STR";
    }
}
get1 請求
image.png

可以開到總共花的大概 3 秒鐘時間

get2 請求
image.png

可以看到 新的 WebFlux 基本上是沒有耗時的亚侠,從這里就能說明新的 WebFlux 返回的 Mono 實際上是一個流曹体,由于它沒有調(diào)用最終操作所以不會阻塞線程俗扇,而老的模式調(diào)用的操作花了多久,它在 Controller 里占了多久箕别,而新的模式它其實是一個惰性求值所以它這個 Controller 不會占用那么久铜幽,這就是新的 WebFlux 返回 Mono 的意義。

SSE ( Server-Sent Events)

我們知道 Flux 可以返回多次數(shù)據(jù)串稀,但是 http 協(xié)議是一問一答的形式除抛,它是做到如何多次返回的呢?實際上它用的就是 Html5 SSE

示例
@WebServlet("/sse")
public class SSE extends HttpServlet {
    @Override
    protected void doGet(HttpServletRequest req, HttpServletResponse resp) throws ServletException, IOException {
        doPost(req,resp);
    }

    @Override
    protected void doPost(HttpServletRequest req, HttpServletResponse resp) throws ServletException, IOException {
        // 必須
        resp.setContentType("text/event-stream");
        resp.setCharacterEncoding("UTF-8");

        for (int i = 0; i < 5; i++) {
            // 格式 : data: + 數(shù)據(jù) + 2個回車
            resp.getWriter().write("data:"+i+"\n\n");
            resp.getWriter().flush();
            try {
                TimeUnit.SECONDS.sleep(1);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }
}

結(jié)果


image.png

可以看到結(jié)果逐條返回

前端示例
<!DOCTYPE html>
<html lang="en">
<head>
    <meta charset="UTF-8">
    <title>Title</title>
</head>
<script type="text/javascript">
    // 初始化母截,參數(shù)為 url
    // 依賴 H5 
    var sse = new EventSource("sse");
    sse.onmessage = function(e){
        console.log("message",e.data,e);
    }
</script>
<body>
    <H1>SSE</H1>
</body>
</html>

結(jié)果


image.png

SSE 有個特點會自動的重連到忽,所以會不停的輸出

  • 另一種寫法
    后端
            // 另一種寫法 指定 標(biāo)識世間
            resp.getWriter().write("event:me\n");

前段

    sse.addEventListener("me", function (e) {
        console.log("me event",e.data,e)
    });

結(jié)果


image.png

結(jié)果也是一樣,也會從新連接清寇,那么如何關(guān)閉呢喘漏?
就可以加一個判斷

sse.addEventListener("me", function (e) {
        console.log("me event",e.data,e)
        if(e.data == 3){
            sse.close();
        }
    });

WebFlux 示例(使用 mongodb 數(shù)據(jù)庫)

  • 添加 mongodb 數(shù)據(jù)庫依賴
        <!--  mongodb -->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-data-mongodb-reactive</artifactId>
        </dependency>
  • springboot 啟動類添加 mongodb 支持
@EnableReactiveMongoRepositories   // 開啟mongodb
  • 定義對象
@Document(collection = "user")
@Data
public class User {

    @Id
    private String id;
    private String name;
    private int age;
}
  • 創(chuàng)建倉庫
@Repository
public interface UserRepository extends ReactiveMongoRepository<User,String> {
}
  • 編寫 controller
@RestController
@RequestMapping("/user")
public class UserController {

    // 以前 mvn 注入在 WebFlux 中不推薦,官方推薦使用構(gòu)造方法注入
    /*@Autowired
    private UserRepository userRepository;*/

    private final UserRepository repository;

    // 官方推薦這種注入方式华烟,這種和 spring 的耦合度會更加的底
    public UserController(UserRepository repository){
        this.repository = repository;
    }

    @GetMapping("/")
    public Flux<User> getAll(){
        return repository.findAll();
    }

    /**
     * 這里推薦在返回 Flux ,就是返回多個數(shù)據(jù)的時候翩迈,我們都寫兩個方法
     * 第一個就是一次性返回的
     * 第二種就是 SSE 向流一樣的
     * @return
     */
    @GetMapping(value = "/stream/all",produces = MediaType.TEXT_EVENT_STREAM_VALUE)
    public Flux<User> streamGetAll(){
        return repository.findAll();
    }
}
  • 啟動 mongodb
  • 調(diào)試


    image.png

    可以看到是通的

完整的 CRUD
@RestController
@RequestMapping("/user")
public class UserController {

    // 以前 mvn 注入在 WebFlux 中不推薦,官方推薦使用構(gòu)造方法注入
    /*@Autowired
    private UserRepository userRepository;*/

    private final UserRepository repository;

    // 官方推薦這種注入方式盔夜,這種和 spring 的耦合度會更加的底
    public UserController(UserRepository repository){
        this.repository = repository;
    }

    /**
     * 以數(shù)組形式 一次性返回數(shù)據(jù)
     * @return
     */
    @GetMapping("/")
    public Flux<User> getAll(){
        return repository.findAll();
    }

    /**
     * 這里推薦在返回 Flux ,就是返回多個數(shù)據(jù)的時候负饲,我們都寫兩個方法
     * 第一個就是一次性返回的
     * 第二種就是 SSE 向流一樣的
     * @return
     */
    @GetMapping(value = "/stream/all",produces = MediaType.TEXT_EVENT_STREAM_VALUE)
    public Flux<User> streamGetAll(){
        return repository.findAll();
    }

    /**
     *
     * 添加
     * @param user
     * @return
     */
    @PostMapping("/")
    public Mono<User> createUser(@RequestBody User user){
        // 這里要注意 在 spring data jpa 里面
        // 新增和修改都是 save 方法堤魁,有 id 是修改,沒有 id 是新增
        user.setId(null);
        return this.repository.save(user);
    }

    /**
     * 刪除
     * @param id
     * @return
     */
    @DeleteMapping("/{id}")
    public Mono<ResponseEntity<Void>> deleteUser(@PathVariable("id")String id){
        // deleteById 方法是沒有返回值的返十,不能判斷數(shù)據(jù)是否存在
        // this.repository.deleteById(id)
        return this.repository.findById(id)
                // 當(dāng)你要操作數(shù)據(jù)妥泉,并且要返回一個 Mono 這個時候使用 flatMap
                // 如果不操作只是裝換數(shù)據(jù),使用 map
                .flatMap(user -> this.repository.delete(user)
                .then(Mono.just(new ResponseEntity<Void>(HttpStatus.OK))))
                .defaultIfEmpty(new ResponseEntity<>(HttpStatus.NOT_FOUND));
    }

    /**
     * 修改數(shù)據(jù)洞坑、
     * 存在的時候返回 200 和修改的數(shù)據(jù)涛漂,不存在的時候返回 404
     * @param id
     * @param user
     * @return
     */
    @PutMapping("/{id}")
    public Mono<ResponseEntity<User>> updateUser(@PathVariable("id")String id,
                                                 @RequestBody User user){
                // 先根據(jù)用戶 id 查詢是否存在
        return this.repository.findById(id)
                // flatMap 操作數(shù)據(jù)
                .flatMap(u -> {
                    u.setAge(user.getAge());
                    u.setName(user.getName());
                    return this.repository.save(u);
                })
                // map : 轉(zhuǎn)換數(shù)據(jù)
                .map(u -> new ResponseEntity<User>(u,HttpStatus.OK))
                .defaultIfEmpty(new ResponseEntity<>(HttpStatus.NOT_FOUND));
    }


    /**
     * 根據(jù) id 查找用戶
     * 存在返回用戶信息,不存在返回 404
     * @param id
     * @return
     */
    @GetMapping("/{id}")
    public Mono<ResponseEntity<User>> findUserById(@PathVariable("id")String id){
        return this.repository.findById(id)
                // 存在返回用戶信息
                .map(u -> new ResponseEntity<User>(u,HttpStatus.OK))
                // 不存在返回 404
                .defaultIfEmpty(new ResponseEntity<User>(HttpStatus.NOT_FOUND));
    }
}

根據(jù)年齡區(qū)間查詢用戶信息

數(shù)據(jù)訪問層 Repository

    /**
     * 根據(jù)年齡查找用戶信息
     * @param start
     * @param end
     * @return
     */
    Flux<User> findByAgeBetween(int start,int end);

    @Query("{'age':{'$gte':20,'$lte':30}}")
    Flux<User> findUserBySQL();

controller 層

    /**
     * 根據(jù)年齡查找用戶(SSE 以流的形式返回)
     * @param start
     * @param end
     * @return
     */
    @GetMapping(value="/age/stream/{start}/{end}",produces = MediaType.TEXT_EVENT_STREAM_VALUE)
    public Flux<User> streamFindUserByAge(@PathVariable("start")int start,
                                          @PathVariable("end")int end){
        return this.repository.findByAgeBetween(start,end);
    }

    /**
     * 手寫 sql
     * @return
     */
    @GetMapping(value="/age/sql")
    public Flux<User> findUserBySQL(){
        return this.repository.findUserBySQL();
    }
參數(shù)校驗

User 類加上检诗,校驗注解

@Document(collection = "user")
@Data
public class User {

    @Id
    private String id;
    @NotBlank
    private String name;
    // 年齡最小為 10 卵酪,最大為 100
    @Range(min = 10,max = 100)
    private int age;
}

為添加方法加上校驗注解

    /**
     *
     * 添加
     * @param user
     * @return
     */
    @PostMapping("/")
    public Mono<User> createUser(@RequestBody @Validated User user){
        // 這里要注意 在 spring data jpa 里面
        // 新增和修改都是 save 方法丈屹,有 id 是修改,沒有 id 是新增
        user.setId(null);
        return this.repository.save(user);
    }

編寫異常處理類

/**
 * 異常處理切面
 */
@ControllerAdvice
public class CheckAdvice {


    @ExceptionHandler(MethodArgumentNotValidException.class)
    public ResponseEntity<String> handleBindingException(MethodArgumentNotValidException e){
        return new ResponseEntity<String>(toStr(e), HttpStatus.BAD_REQUEST);
    }

    /**
     * 將異常轉(zhuǎn)換為 字符串
     * @param ex
     * @return
     */
    private String toStr(MethodArgumentNotValidException ex) {

        return ex.getBindingResult().getFieldErrors().stream()
                // 將異常轉(zhuǎn)換為字符串調(diào)用 map
                .map(e -> e.getField()+":"+e.getDefaultMessage())
                // 將數(shù)組轉(zhuǎn)換為字符串調(diào)用 reduce
                .reduce("",(s1,s2) -> s1+"\n"+s2);
    }

}

測試

{
    "name":"",
    "age":9
}
name:不能為空
age:需要在10和100之間
自定義異常(校驗用戶名是否合法)

編寫校驗邏輯類

public class CheckUtil {

    // 定義不允許的常量
    private static final String[] INVALID_NAMES = {"admin","administrator"};

    /**
     * 校驗用戶名是否合法,不合法拋出異常
     * @param value
     */
    public static void checkName(String value) {
        Stream.of(INVALID_NAMES).filter(name -> name.equalsIgnoreCase(value))
                // findAny 找到任何一個芬失, ifPresent 如果存在就拋出異常
                .findAny().ifPresent(v -> {
                    throw new CheckException("name", v);
        });
    }
}

為添加用戶方法添加校驗邏輯

    @PostMapping("/")
    public Mono<User> createUser(@RequestBody @Validated User user){
        // 這里要注意 在 spring data jpa 里面
        // 新增和修改都是 save 方法,有 id 是修改揖赴,沒有 id 是新增
        user.setId(null);

        // 檢查用戶名是否合法
        CheckUtil.checkName(user.getName());

        return this.repository.save(user);
    }

自定義異常類

@Data
public class CheckException extends RuntimeException {


    private String fieldName;  // 字段名
    private String fieldValue; // 錯誤內(nèi)容

    public CheckException(String fieldName, String fieldValue) {
        super();
        this.fieldName = fieldName;
        this.fieldValue = fieldValue;
    }

    public CheckException() {
        super();
    }

    public CheckException(String message) {
        super(message);
    }

    public CheckException(String message, Throwable cause) {
        super(message, cause);
    }


    public CheckException(Throwable cause) {
        super(cause);
    }


    protected CheckException(String message, Throwable cause, boolean enableSuppression, boolean writableStackTrace) {
        super(message, cause, enableSuppression, writableStackTrace);
    }
}

異常處理類中捕獲自定義異常

    /**
     * 用戶名校驗自定義異常
     * @param e
     * @return
     */
    @ExceptionHandler(CheckException.class)
    public ResponseEntity<String> handleBindingCheckNameException(CheckException e){
        return new ResponseEntity<String>(toStr(e),HttpStatus.BAD_REQUEST);
    }

    private String toStr(CheckException ex) {

        return ex.getFieldName()+":"+"錯誤的值"+ ex.getFieldValue();
    }

測試

{
    "name":"admin",
    "age":25
}
name:錯誤的值admin

使用 Router Functions 開發(fā)

WebFlux 可以運行在以前老的 Servlet 容器诡延,也可以運行在 Servlet 3.1 之后的容器,或者運行在 Netty 上面忙菠,那么他第一步要把這連個容器的一些共同點抽離出來

  • ServletRequest 對應(yīng) HttpServletRequet
  • ServletResponse 對應(yīng) HttpServletResponse
使用 Router Functions 開發(fā)一般需要以下幾步
  • 開發(fā) HandlerFunction ,這個 HandlerFunction 是輸入 ServletRequest 返回 ServletResponse
  • 接著要開發(fā) Router Funciton 它是把我們的請求 url 和 HandlerFunction 對應(yīng)起來
  • 接著我們會把 RouterFunction 包裝成 HttpHandler
  • 最后交給 Server 處理何鸡,這里的 Server 指的就是 Servlet3.1 之后的容器,或者 Netty
案例
  • 實體類
@Document(collection = "user")
@Data
public class User {

    @Id
    private String id;
    @NotBlank
    private String name;
    // 年齡最小為 10 牛欢,最大為 100
    @Range(min = 10,max = 100)
    private int age;
}
  • 數(shù)據(jù)訪問層
@Repository
public interface UserRepository extends ReactiveMongoRepository<User,String> {
    /**
     * 根據(jù)年齡查找用戶信息
     * @param start
     * @param end
     * @return
     */
    Flux<User> findByAgeBetween(int start, int end);

    @Query("{'age':{'$gte':20,'$lte':30}}")
    Flux<User> findUserBySQL();
}
  • HandlerFunction
@Component
public class UserHandler {

    private final UserRepository repository;

    public UserHandler(UserRepository repository){
        this.repository = repository;
    }

    /**
     * 得到所有用戶
     * @param request
     * @return
     */
    public Mono<ServerResponse> getAllUser(ServerRequest request){
        return ServerResponse.ok().contentType(MediaType.APPLICATION_JSON)
                .body(this.repository.findAll(), User.class);
    }

    /**
     * 創(chuàng)建用戶
     * @param request
     * @return
     */
    public Mono<ServerResponse> createUser(ServerRequest request){
        // 獲取用戶提交的數(shù)據(jù)
        Mono<User> userMono = request.bodyToMono(User.class);
        return userMono.flatMap(u -> {
            // 校驗用戶信息
            CheckUtil.checkName(u.getName());
            return ServerResponse.ok().contentType(MediaType.APPLICATION_JSON)
                    .body(this.repository.save(u), User.class);
        });
    }

    public Mono<ServerResponse> deleteByUserId(ServerRequest request){
        // 獲取 rest 參數(shù) id
        // 相當(dāng)于獲取 url 中 @PathVariable("id)
        String id = request.pathVariable("id");
                // 查詢是否存在
        return this.repository.findById(id)
                // 如果存在刪除
                .flatMap(user -> this.repository.delete(user)
                 // 返回
                .then(ServerResponse.ok().build()))
                // 不存在
                .switchIfEmpty(ServerResponse.notFound().build());
    }
}
  • router Function 把 url 和 Handler Function 對應(yīng)起來
@Configuration
public class AllRouters {

    /***
     * spring-boot-starter-web 不能和 spring-boot-starter-webflux 同時引入否則轉(zhuǎn)發(fā)無效
     * @param userHandler
     * @return
     */
    @Bean
    RouterFunction<ServerResponse> userRouter(UserHandler userHandler){
        return RouterFunctions.nest(
                // 相當(dāng)于類上面的 @RequestMapping()
                RequestPredicates.path("/router/user"),
                // 得到所有用戶
                RouterFunctions.route(RequestPredicates.GET("/"), userHandler::getAllUser)
                // 創(chuàng)建用戶
                .andRoute(
                        RequestPredicates.POST("/").and(RequestPredicates.accept(MediaType.APPLICATION_JSON)),
                        userHandler::createUser)
                // 刪除用戶
                .andRoute(RequestPredicates.DELETE("/{id}"), userHandler::deleteByUserId)

        );
    }
}
自定義異常校驗
  • 用戶名校驗類
public class CheckUtil {

    // 定義不允許的常量
    private static final String[] INVALID_NAMES = {"admin","administrator"};

    /**
     * 校驗用戶名是否合法骡男,不合法拋出異常
     * @param value
     */
    public static void checkName(String value) {
        Stream.of(INVALID_NAMES).filter(name -> name.equalsIgnoreCase(value))
                // findAny 找到任何一個, ifPresent 如果存在就拋出異常
                .findAny().ifPresent(v -> {
                    throw new CheckException("name", v);
        });
    }
}
  • 自定義異常類
@Data
public class CheckException extends RuntimeException {


    private String fieldName;  // 字段名
    private String fieldValue; // 錯誤內(nèi)容

    public CheckException(String fieldName, String fieldValue) {
        super();
        this.fieldName = fieldName;
        this.fieldValue = fieldValue;
    }

    public CheckException() {
        super();
    }

    public CheckException(String message) {
        super(message);
    }

    public CheckException(String message, Throwable cause) {
        super(message, cause);
    }


    public CheckException(Throwable cause) {
        super(cause);
    }


    protected CheckException(String message, Throwable cause, boolean enableSuppression, boolean writableStackTrace) {
        super(message, cause, enableSuppression, writableStackTrace);
    }
}
  • 統(tǒng)一異常處理
/**
 * 異常處理切面
 */
@Component
// 由于默認(rèn)里有多個異常處理傍睹,所以我們要把我們的異常處理器優(yōu)先級別調(diào)高
// 否則不會工作
@Order(-2)
public class CheckAdvice implements WebExceptionHandler {


    @Override
    public Mono<Void> handle(ServerWebExchange serverWebExchange, Throwable throwable) {
        ServerHttpResponse response = serverWebExchange.getResponse();
        // 設(shè)置響應(yīng)頭 400
        response.setStatusCode(HttpStatus.BAD_REQUEST);
        // 設(shè)置返回異常
        response.getHeaders().setContentType(MediaType.TEXT_PLAIN);
        // 異常信息
        String errorMsg = toStr(throwable);
        DataBuffer db = response.bufferFactory().wrap(errorMsg.getBytes());
        return response.writeWith(Mono.just(db));
    }

    private String toStr(Throwable ex) {
        // 已知異常
        if(ex instanceof CheckException){
            CheckException e = (CheckException) ex;
            return e.getFieldName()+": invalid value "+e.getFieldValue();
        }else{
            // 未知異常
            ex.printStackTrace();
            return ex.toString();
        }
    }
}

WebFlux 客戶端聲明式 RestClient 框架開發(fā)

添加依賴
        <!--  WebFlux -->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-webflux</artifactId>
        </dependency>

        <!--  lombok -->
        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
            <optional>true</optional>
        </dependency>
創(chuàng)建框架基礎(chǔ)類

創(chuàng)建注解隔盛,保存服務(wù)端地址信息

/**
 * 服務(wù)器相關(guān)的信息
 */
@Target(ElementType.TYPE)
@Retention(RetentionPolicy.RUNTIME)
public @interface ApiServer {
    // 保存服務(wù)端請求接口
    String value() default "";
}

創(chuàng)建方法調(diào)用信息類

/**
 * 方法調(diào)用信息類
 */
@Data
@Builder
@AllArgsConstructor
@NoArgsConstructor
public class MethodInfo {

    /**
     * 請求url
     */
    private String url;
    /**
     * 請求方法
     */
    private HttpMethod method;
    /**
     * 請求參數(shù) (url 上)
     */
    private Map<String,Object> params;
    /**
     * 請求 body
     */
    private Mono<?> body;

    private Class<?> bodyElementType;

    /**
     * 返回是 flux 還是 mono
     */
    private boolean returnFlux;

    /**
     * 返回對象的類型
     */
    private Class<?> returnElementType;
}

創(chuàng)建服務(wù)器信息類

/**
 * 服務(wù)器信息類
 */
@Data
@Builder
@AllArgsConstructor
@NoArgsConstructor
public class ServerInfo {

    /**
     * 服務(wù)器 url
     */
    private String url;
}

創(chuàng)建動態(tài)代理類接口

/**
 * 創(chuàng)建代理類接口
 */
public interface ProxyCreator {

    /**
     * 創(chuàng)建代理類
     * @param type
     * @return
     */
    Object createProxy(Class<?> type);
}

創(chuàng)建請求類接口 Handler

/**
 * 請求調(diào)用 handler
 */
public interface RestHandler {
    /**
     * 初始化服務(wù)器信息
     * @param serverInfo
     */
    void init(ServerInfo serverInfo);

    /**
     * 調(diào)用 rest 請求,返回接口
     * @param methodInfo
     */
    Object invokeRest(MethodInfo methodInfo);
}

創(chuàng)建 jdk 動態(tài)代理類實現(xiàn)動態(tài)代理接口類

/**
 * 使用 jdk 動態(tài)代理實現(xiàn)代理類
 */
@Slf4j
public class JDKProxyCreator implements ProxyCreator {
    /**
     * 創(chuàng)建代理類
     *
     * @param type
     * @return
     */
    @Override
    public Object createProxy(Class<?> type) {

        log.info("createProxy:"+type);

        // 根據(jù)接口獲取得到 api 服務(wù)器信息
        ServerInfo serverInfo = extractServerInfo(type);

        log.info("serverInfo:"+serverInfo);
        // 給每一個代理類一個實現(xiàn)
        RestHandler restHandler = new WebClientRestHandler();

        // 初始化服務(wù)器信息(初始化 webclient)
        restHandler.init(serverInfo);

        return Proxy.newProxyInstance(this.getClass().getClassLoader(), new Class[]{type}, new InvocationHandler() {
            @Override
            public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
                // 根據(jù)方法和參數(shù)得到調(diào)用信息
                MethodInfo methodInfo = extractMethodInfo(method,args);

                log.info("methodInfo:"+methodInfo);

                // 調(diào)用  rest
                return restHandler.invokeRest(methodInfo);
            }

            /**
             * 根據(jù)方法定義和調(diào)用參數(shù)得到調(diào)用的相關(guān)信息
             * @param method
             * @param args
             * @return
             */
            private MethodInfo extractMethodInfo(Method method, Object[] args) {

                MethodInfo methodInfo = new MethodInfo();

                // 得到請求 url 和 方法
                extractUrlAndMethod(method,methodInfo);

                // 得到請求的 param 和 body
                extractRequestParamAndBody(method,args,methodInfo);

                // 提取返回對象信息
                extractReturnInfo(method,methodInfo);

                return methodInfo;
            }

            /**
             * 提取返回對象信息
             * @param method
             * @param methodInfo
             */
            private void extractReturnInfo(Method method, MethodInfo methodInfo) {
                // 返回是  Flux 還是 Mono
                // isAssignableFrom 判斷類型是否是某個類的子類
                // instanceof 判斷實例是否是某個類的子類
                boolean isFlux = method.getReturnType().isAssignableFrom(Flux.class);
                methodInfo.setReturnFlux(isFlux);

                // 得到返回對象的實例類型
                Class<?> elementType = extractElementType(method.getGenericReturnType());
                methodInfo.setReturnElementType(elementType);
            }

            /**
             * 得到泛型類型的實際類型
             * @param genericReturnType
             * @return
             */
            private Class<?> extractElementType(Type genericReturnType) {
                Type[] actualTypeArguments = ((ParameterizedType) genericReturnType).getActualTypeArguments();
                return (Class<?>) actualTypeArguments[0];
            }

            /**
             * 得到請求的 param 和 body
             * @param method
             * @param args
             * @param methodInfo
             */
            private void extractRequestParamAndBody(Method method, Object[] args, MethodInfo methodInfo) {
                // 參數(shù)和值對應(yīng)的 map
                Map<String,Object> params = new LinkedHashMap<String, Object>();
                methodInfo.setParams(params);
                // 得到調(diào)用的參數(shù)或者 body
                Parameter[] parameters = method.getParameters();
                for (int i = 0; i < parameters.length; i++) {
                    // 參數(shù)是否帶 @PathVariabole 注解
                    PathVariable able = parameters[i].getAnnotation(PathVariable.class);
                    if(able != null){
                        params.put(able.value(), args[i]);
                    }

                    // 是否帶了 requestBody
                    RequestBody requestBody = parameters[i].getAnnotation(RequestBody.class);
                    if(requestBody != null){
                        methodInfo.setBody((Mono<?>)args[i]);

                        // 得到對象的實際類型
                        methodInfo.setBodyElementType(extractElementType(parameters[i].getParameterizedType()));
                    }
                }
            }

            /**
             * 得到請求的 url 和方法
             * @param method
             * @param methodInfo
             */
            private void extractUrlAndMethod(Method method,MethodInfo methodInfo){
                // 得到請求 url 和請求方法
                Annotation[] annotations = method.getAnnotations();
                for (Annotation annotation : annotations){
                    // GET
                    if(annotation instanceof GetMapping){
                        GetMapping getMapping = (GetMapping) annotation;
                        methodInfo.setUrl(getMapping.value()[0]);
                        methodInfo.setMethod(HttpMethod.GET);
                    }else if(annotation instanceof PostMapping){
                        // POST
                        PostMapping postMapping = (PostMapping) annotation;
                        methodInfo.setUrl(postMapping.value()[0]);
                        methodInfo.setMethod(HttpMethod.POST);
                    }else if(annotation instanceof DeleteMapping){
                        // DELETE
                        DeleteMapping deleteMapping = (DeleteMapping) annotation;
                        methodInfo.setUrl(deleteMapping.value()[0]);
                        methodInfo.setMethod(HttpMethod.DELETE);
                    }
                }
            }
        });
    }

    /**
     * 提取服務(wù)器信息
     * @param type
     * @return
     */
    private ServerInfo extractServerInfo(Class<?> type) {
        ServerInfo serverInfo = new ServerInfo();
        ApiServer apiServer = type.getAnnotation(ApiServer.class);
        // 設(shè)置服務(wù)器 url
        serverInfo.setUrl(apiServer.value());
        return serverInfo;
    }
}

創(chuàng)建用戶類

@Data
@Builder
@AllArgsConstructor
@NoArgsConstructor
public class User {

    private String id;
    private String name;
    private int age;
}

創(chuàng)建服務(wù)端請求接口類(地址是上面 WebFlux 完整 CRUD 項目)

/**
 * 服務(wù)端請求接口
 */
@ApiServer("http://localhost:8080/user")
public interface IUserApi {

    @GetMapping("/")
    Flux<User> getAllUser();

    @GetMapping("/{id}")
    Mono<User> getUserById(@PathVariable("id")String id);

    @DeleteMapping("/{id}")
    Mono<Void> deleteUserById(@PathVariable("id")String id);

    @PostMapping("/")
    Mono<User> createUser(@RequestBody Mono<User> user);
}

創(chuàng)建請求調(diào)用接口類實現(xiàn)類拾稳,這里使用 WebClient

/**
 * WebClientRestHandler
 */
public class WebClientRestHandler implements RestHandler {

    private WebClient webClient;

    /**
     * 初始化 webClient
     * @param serverInfo
     */
    @Override
    public void init(ServerInfo serverInfo) {
        this.webClient = WebClient.create(serverInfo.getUrl());
    }

    /**
     * 處理 rest 請求
     * @param methodInfo
     */
    @Override
    public Object invokeRest(MethodInfo methodInfo) {
        //  返回結(jié)果
        Object result = null;

        WebClient.RequestBodySpec request = this.webClient
                // 請求方法類型
                .method(methodInfo.getMethod())
                // 請求 url
                // 不帶參數(shù)
                //.uri(methodInfo.getUrl())
                // 帶參數(shù)
                .uri(methodInfo.getUrl(), methodInfo.getParams())
                // 請求類型
                .accept(MediaType.APPLICATION_JSON);

                WebClient.ResponseSpec retrieve = null;

                // 發(fā)出請求
                // 判斷是否帶了 body
                if(methodInfo.getBody() != null){
                    retrieve = request.body(methodInfo.getBody(), methodInfo.getBodyElementType()).retrieve();
                }else{
                    retrieve = request.retrieve();
                }

                // 異常處理
                retrieve.onStatus(status -> status.value() == 404, response -> Mono.just(new RuntimeException("not found")));


                // 處理  body
                if(methodInfo.isReturnFlux()){
                    result = retrieve.bodyToFlux(methodInfo.getReturnElementType());
                }else{
                    result = retrieve.bodyToMono(methodInfo.getReturnElementType());
                }
                return result;
    }
}

創(chuàng)建配置類完成代理類初始化

@Configuration
public class ProxyConfiguration {
    /**
     * 創(chuàng)建jdk 工具類
     * @return
     */
    @Bean
    ProxyCreator jdkProxyCreator(){
        return new JDKProxyCreator();
    }

    @Bean
    FactoryBean<IUserApi> userApi(ProxyCreator proxyCreator){
        return new FactoryBean<IUserApi>() {
            // 返回代理對象
            @Override
            public IUserApi getObject() throws Exception {
                return (IUserApi) proxyCreator.createProxy(this.getObjectType());
            }

            @Override
            public Class<?> getObjectType() {
                return IUserApi.class;
            }
        };
    }
}

創(chuàng)建測試類 吮炕,測試 CRUD

@RestController
public class TestController {

    @Autowired
    private IUserApi userApi;

    @GetMapping("/")
    public void test(){
        // 獲取所有用戶信息
        //userApi.getAllUser().subscribe(System.out::println);

        // 根據(jù) id 查詢用戶信息
        String id = "5f2a8077432a9634dba7b65c";
        //userApi.getUserById(id).subscribe(System.out::println);

        // 創(chuàng)建用戶信息
        //userApi.createUser(Mono.just(User.builder().name("張四豐").age(18).build()))
        //.subscribe(System.out::println);

        // 修改用戶
        userApi.updateUser(id, Mono.just(User.builder().name("張五峰").age(19).build())).subscribe(System.out::println);
        userApi.getAllUser().subscribe(System.out::println);
    }
}
最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
  • 序言:七十年代末,一起剝皮案震驚了整個濱河市访得,隨后出現(xiàn)的幾起案子龙亲,更是在濱河造成了極大的恐慌,老刑警劉巖悍抑,帶你破解...
    沈念sama閱讀 218,941評論 6 508
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件鳄炉,死亡現(xiàn)場離奇詭異,居然都是意外死亡传趾,警方通過查閱死者的電腦和手機(jī)迎膜,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 93,397評論 3 395
  • 文/潘曉璐 我一進(jìn)店門,熙熙樓的掌柜王于貴愁眉苦臉地迎上來浆兰,“玉大人磕仅,你說我怎么就攤上這事珊豹。” “怎么了榕订?”我有些...
    開封第一講書人閱讀 165,345評論 0 356
  • 文/不壞的土叔 我叫張陵店茶,是天一觀的道長。 經(jīng)常有香客問我劫恒,道長贩幻,這世上最難降的妖魔是什么? 我笑而不...
    開封第一講書人閱讀 58,851評論 1 295
  • 正文 為了忘掉前任两嘴,我火速辦了婚禮丛楚,結(jié)果婚禮上,老公的妹妹穿的比我還像新娘憔辫。我一直安慰自己趣些,他們只是感情好,可當(dāng)我...
    茶點故事閱讀 67,868評論 6 392
  • 文/花漫 我一把揭開白布贰您。 她就那樣靜靜地躺著坏平,像睡著了一般。 火紅的嫁衣襯著肌膚如雪锦亦。 梳的紋絲不亂的頭發(fā)上舶替,一...
    開封第一講書人閱讀 51,688評論 1 305
  • 那天,我揣著相機(jī)與錄音杠园,去河邊找鬼顾瞪。 笑死,一個胖子當(dāng)著我的面吹牛返劲,可吹牛的內(nèi)容都是我干的玲昧。 我是一名探鬼主播,決...
    沈念sama閱讀 40,414評論 3 418
  • 文/蒼蘭香墨 我猛地睜開眼篮绿,長吁一口氣:“原來是場噩夢啊……” “哼!你這毒婦竟也來了吕漂?” 一聲冷哼從身側(cè)響起亲配,我...
    開封第一講書人閱讀 39,319評論 0 276
  • 序言:老撾萬榮一對情侶失蹤,失蹤者是張志新(化名)和其女友劉穎惶凝,沒想到半個月后吼虎,有當(dāng)?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體,經(jīng)...
    沈念sama閱讀 45,775評論 1 315
  • 正文 獨居荒郊野嶺守林人離奇死亡苍鲜,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點故事閱讀 37,945評論 3 336
  • 正文 我和宋清朗相戀三年思灰,在試婚紗的時候發(fā)現(xiàn)自己被綠了。 大學(xué)時的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片混滔。...
    茶點故事閱讀 40,096評論 1 350
  • 序言:一個原本活蹦亂跳的男人離奇死亡洒疚,死狀恐怖歹颓,靈堂內(nèi)的尸體忽然破棺而出,到底是詐尸還是另有隱情油湖,我是刑警寧澤巍扛,帶...
    沈念sama閱讀 35,789評論 5 346
  • 正文 年R本政府宣布,位于F島的核電站乏德,受9級特大地震影響撤奸,放射性物質(zhì)發(fā)生泄漏。R本人自食惡果不足惜喊括,卻給世界環(huán)境...
    茶點故事閱讀 41,437評論 3 331
  • 文/蒙蒙 一胧瓜、第九天 我趴在偏房一處隱蔽的房頂上張望。 院中可真熱鬧郑什,春花似錦贷痪、人聲如沸。這莊子的主人今日做“春日...
    開封第一講書人閱讀 31,993評論 0 22
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽。三九已至强胰,卻和暖如春舱沧,著一層夾襖步出監(jiān)牢的瞬間,已是汗流浹背偶洋。 一陣腳步聲響...
    開封第一講書人閱讀 33,107評論 1 271
  • 我被黑心中介騙來泰國打工熟吏, 沒想到剛下飛機(jī)就差點兒被人妖公主榨干…… 1. 我叫王不留,地道東北人玄窝。 一個月前我還...
    沈念sama閱讀 48,308評論 3 372
  • 正文 我出身青樓牵寺,卻偏偏與公主長得像,于是被迫代替她去往敵國和親恩脂。 傳聞我的和親對象是個殘疾皇子帽氓,可洞房花燭夜當(dāng)晚...
    茶點故事閱讀 45,037評論 2 355