flink sql自定義函數(shù) 多個參數(shù)調(diào)用http接口 并且實時調(diào)用

1. 使用通過flink sql 自動定義函數(shù)(udf)將實時接入的數(shù)據(jù)輸出到 http 接口當中 甲葬。

2. 環(huán)境

flink mysql 接口
flink 14.5 5.20 spring boot 接口

3. 環(huán)境

3.1 mysql 語句


CREATE TABLE `Flink_cdc` (
  `id` bigint(64) NOT NULL AUTO_INCREMENT,
  `name` varchar(64) DEFAULT NULL,
  `age` int(20) DEFAULT NULL,
  `birthday` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP,
  `ts` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP,
  PRIMARY KEY (`id`)
) ENGINE=InnoDB AUTO_INCREMENT=73 DEFAULT CHARSET=utf8mb4;

/*Data for the table `Flink_cdc` */

insert  into `Flink_cdc`(`id`,`name`,`age`,`birthday`,`ts`) values 
(69,'flink',21,'2022-12-09 22:57:15','2022-12-09 22:57:17'),
(70,'flink sql',22,'2022-12-09 23:01:43','2022-12-09 23:01:46'),
(71,'flk sql',23,'2022-12-09 23:43:04','2022-12-09 23:43:07'),
(72,'hbase',55,'2023-04-05 16:40:38','2023-04-05 16:40:40');

4.flink sql 自定義函數(shù)

4.1 實現(xiàn)自定義函數(shù)接口, 上傳jar 包到flink lib 中 一定要重啟flink 然后注冊flink sql 的函數(shù)

4.2 自定義函數(shù)的實現(xiàn)

package com.wudl.flink.fun;

import com.alibaba.fastjson.JSON;
import com.wudl.flink.util.HttpClientUtils;
import org.apache.flink.table.functions.FunctionContext;
import org.apache.flink.table.functions.ScalarFunction;

import java.util.Map;

/**
 * @Author: wudl
 * @Description:
 * @ClassName: MyHttpFunction
 * @Date: 2023/4/5 12:41
 */
public class MyHttpFunction extends ScalarFunction {

    public String eval(Map<String, String> map) {
        String str = JSON.toJSONString(map);
        String s = HttpClientUtils.doPostJson("http://192.168.244.1:8081/tUser/savedta", str);
        return s;
    }


}

4.2.1 需要的工具類

package com.wudl.flink.util;

import com.alibaba.fastjson.JSON;
import com.wudl.flink.bean.User;
import org.apache.http.NameValuePair;
import org.apache.http.client.entity.UrlEncodedFormEntity;
import org.apache.http.client.methods.CloseableHttpResponse;
import org.apache.http.client.methods.HttpGet;
import org.apache.http.client.methods.HttpPost;
import org.apache.http.client.utils.URIBuilder;
import org.apache.http.entity.StringEntity;
import org.apache.http.impl.client.CloseableHttpClient;
import org.apache.http.impl.client.HttpClients;
import org.apache.http.message.BasicNameValuePair;
import org.apache.http.util.EntityUtils;

import java.io.IOException;
import java.net.URI;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

/**
 * @Author: wudl
 * @Description:
 * @ClassName: HttpClientUtils
 * @Date: 2023/4/5 12:54
 */
public class HttpClientUtils {
    public static void main(String[] args) {

//        String s = HttpClientUtils.doGet("http://localhost:8081/tUser/get/getList");
//        System.out.println(s);
        User  u = new User();
        u.setId(12l);
        u.setName("flinksql");
        String s = JSON.toJSONString(u);
        System.out.println(s);
        String s1 = HttpClientUtils.doPostJson("http://192.168.1.109:8081/tUser/savedta", s);
        System.out.println(s1);
    }

    public static String doGet(String url) {
        return doGet(url, null);
    }
    public static String doGet(String url, Map<String, String> params) {
        // 創(chuàng)建Httpclient對象
        CloseableHttpClient httpclient = HttpClients.createDefault();
        String result = "";
        CloseableHttpResponse response = null;
        try {
            // 創(chuàng)建uri
            URIBuilder builder = new URIBuilder(url);
            if (params != null) {
                for (String key : params.keySet()) {
                    builder.addParameter(key, params.get(key));
                }
            }
            URI uri = builder.build();
            // 創(chuàng)建http GET請求
            HttpGet httpGet = new HttpGet(uri);
            // 執(zhí)行請求
            response = httpclient.execute(httpGet);
            // 判斷返回狀態(tài)是否為200
            if (response.getStatusLine().getStatusCode() == 200) {
                result = EntityUtils.toString(response.getEntity(), "UTF-8");
            }
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            try {
                if (response != null) {
                    response.close();
                }
                httpclient.close();
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
        return result;
    }
    public static String doPost(String url) {
        return doPost(url, null);
    }
    public static String doPost(String url, Map<String, String> params) {
        // 創(chuàng)建Httpclient對象
        CloseableHttpClient httpClient = HttpClients.createDefault();
        CloseableHttpResponse response = null;
        String result = "";
        try {
            // 創(chuàng)建Http Post請求
            HttpPost httpPost = new HttpPost(url);
            // 創(chuàng)建參數(shù)列表
            if (params != null) {
                List<NameValuePair> paramList = new ArrayList<>();
                for (String key : params.keySet()) {
                    paramList.add(new BasicNameValuePair(key, params.get(key)));
                }
                // 模擬表單
                UrlEncodedFormEntity entity = new UrlEncodedFormEntity(paramList,"utf-8");
                httpPost.setEntity(entity);
            }
            // 執(zhí)行http請求
            response = httpClient.execute(httpPost);
            result = EntityUtils.toString(response.getEntity(), "utf-8");
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            try {
                response.close();
            } catch (IOException e) {
                // TODO Auto-generated catch block
                e.printStackTrace();
            }
        }
        return result;
    }
    public static String doPostJson(String url, String json) {
        // 創(chuàng)建Httpclient對象
        CloseableHttpClient httpClient = HttpClients.createDefault();
        CloseableHttpResponse response = null;
        String result = "";
        try {
            // 創(chuàng)建Http Post請求
            HttpPost httpPost = new HttpPost(url);
            // 創(chuàng)建請求內(nèi)容
            StringEntity entity = new StringEntity(json.toString(),"UTF-8");//解決中文亂碼問題
            entity.setContentEncoding("UTF-8");
            entity.setContentType("application/json");
            httpPost.setEntity(entity);
            // 執(zhí)行http請求
            response = httpClient.execute(httpPost);
            result = EntityUtils.toString(response.getEntity(), "utf-8");
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            try {
                response.close();
            } catch (IOException e) {
                // TODO Auto-generated catch block
                e.printStackTrace();
            }
        }
        return result;
    }
}

4.2.2 需要的maven 環(huán)境

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <parent>
        <groupId>com.wudl.flink</groupId>
        <artifactId>Flink-learning</artifactId>
        <version>1.0-SNAPSHOT</version>
    </parent>

    <artifactId>wudl-flink-14.5</artifactId>


    <!-- 指定倉庫位置,依次為aliyun镜雨、apache和cloudera倉庫 -->
    <repositories>
        <repository>
            <id>aliyun</id>
            <url>http://maven.aliyun.com/nexus/content/groups/public/</url>
        </repository>
        <repository>
            <id>apache</id>
            <url>https://repository.apache.org/content/repositories/snapshots/</url>
        </repository>
        <repository>
            <id>cloudera</id>
            <url>https://repository.cloudera.com/artifactory/cloudera-repos/</url>
        </repository>

        <repository>
            <id>spring-plugin</id>
            <url>https://repo.spring.io/plugins-release/</url>
        </repository>

    </repositories>

    <properties>
        <encoding>UTF-8</encoding>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        <maven.compiler.source>1.8</maven.compiler.source>
        <maven.compiler.target>1.8</maven.compiler.target>
        <java.version>1.8</java.version>
        <scala.version>2.12</scala.version>
        <flink.version>1.14.5</flink.version>
    </properties>
    <dependencies>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-core</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>commons-httpclient</groupId>
            <artifactId>commons-httpclient</artifactId>
            <version>3.1</version>
        </dependency>
        <dependency>
            <groupId>org.apache.httpcomponents</groupId>
            <artifactId>httpclient</artifactId>
            <version>4.5.5</version>
        </dependency>


        <dependency>
            <groupId>com.alibaba</groupId>
            <artifactId>fastjson</artifactId>
            <version>1.2.83</version>
        </dependency>




        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-jdbc_2.12</artifactId>
            <version>1.10.3</version>
        </dependency>

        <!--依賴Scala語言-->


        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-clients_2.12</artifactId>
            <version>${flink.version}</version>
            <exclusions>
                <exclusion>
                    <artifactId>slf4j-api</artifactId>
                    <groupId>org.slf4j</groupId>
                </exclusion>
            </exclusions>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-scala_2.12</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-java</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-scala_2.12</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-java_2.12</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table-api-scala-bridge_2.12</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table-api-java-bridge_2.12</artifactId>
            <version>${flink.version}</version>
        </dependency>

        <!-- blink執(zhí)行計劃,1.11+默認的-->
<!--        <dependency>-->
<!--            <groupId>org.apache.flink</groupId>-->
<!--            <artifactId>flink-table-planner-blink_2.12</artifactId>-->
<!--            <version>${flink.version}</version>-->
<!--            <exclusions>-->
<!--                <exclusion>-->
<!--                    <artifactId>slf4j-api</artifactId>-->
<!--                    <groupId>org.slf4j</groupId>-->
<!--                </exclusion>-->
<!--            </exclusions>-->
<!--        </dependency>-->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table-common</artifactId>
            <version>${flink.version}</version>
        </dependency>

        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-cep_2.12</artifactId>
            <version>${flink.version}</version>
        </dependency>

        <!-- flink連接器-->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-kafka_2.12</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-sql-connector-kafka_2.12</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-jdbc_2.12</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-csv</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-json</artifactId>
            <version>${flink.version}</version>
        </dependency>



        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-hive_2.12</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.hive</groupId>
            <artifactId>hive-metastore</artifactId>
            <version>2.1.0</version>
            <exclusions>
                <exclusion>
                    <artifactId>hadoop-hdfs</artifactId>
                    <groupId>org.apache.hadoop</groupId>
                </exclusion>
                <exclusion>
                    <artifactId>slf4j-api</artifactId>
                    <groupId>org.slf4j</groupId>
                </exclusion>
            </exclusions>
        </dependency>
        <dependency>
            <groupId>org.apache.hive</groupId>
            <artifactId>hive-exec</artifactId>
            <version>2.1.0</version>
            <exclusions>
                <exclusion>
                    <artifactId>slf4j-api</artifactId>
                    <groupId>org.slf4j</groupId>
                </exclusion>
            </exclusions>
        </dependency>

        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-shaded-hadoop-2-uber</artifactId>
            <version>2.7.5-10.0</version>
            <exclusions>
                <exclusion>
                    <artifactId>slf4j-log4j12</artifactId>
                    <groupId>org.slf4j</groupId>
                </exclusion>
            </exclusions>
        </dependency>

        <dependency>
            <groupId>org.apache.hbase</groupId>
            <artifactId>hbase-client</artifactId>
            <version>2.1.0</version>
            <exclusions>
                <exclusion>
                    <artifactId>slf4j-api</artifactId>
                    <groupId>org.slf4j</groupId>
                </exclusion>
                <exclusion>
                    <artifactId>slf4j-log4j12</artifactId>
                    <groupId>org.slf4j</groupId>
                </exclusion>
            </exclusions>
        </dependency>
        <dependency>
            <groupId>mysql</groupId>
            <artifactId>mysql-connector-java</artifactId>
            <version>5.1.38</version>
            <!--<version>8.0.20</version>-->
        </dependency>

        <!-- 高性能異步組件:Vertx-->
        <dependency>
            <groupId>io.vertx</groupId>
            <artifactId>vertx-core</artifactId>
            <version>3.9.0</version>
        </dependency>
        <dependency>
            <groupId>io.vertx</groupId>
            <artifactId>vertx-jdbc-client</artifactId>
            <version>3.9.0</version>
        </dependency>
        <dependency>
            <groupId>io.vertx</groupId>
            <artifactId>vertx-redis-client</artifactId>
            <version>3.9.0</version>
        </dependency>

        <!-- 日志 -->
        <dependency>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-log4j12</artifactId>
            <version>1.7.7</version>
            <scope>runtime</scope>
        </dependency>
        <dependency>
            <groupId>log4j</groupId>
            <artifactId>log4j</artifactId>
            <version>1.2.17</version>
            <scope>runtime</scope>
        </dependency>


        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
            <version>1.18.2</version>
            <scope>provided</scope>
        </dependency>

        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-hdfs</artifactId>
            <version>3.1.3</version>
        </dependency>

        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-common</artifactId>
            <version>3.1.3</version>
            <exclusions>
                <exclusion>
                    <artifactId>slf4j-api</artifactId>
                    <groupId>org.slf4j</groupId>
                </exclusion>
                <exclusion>
                    <artifactId>slf4j-log4j12</artifactId>
                    <groupId>org.slf4j</groupId>
                </exclusion>
            </exclusions>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-hbase_2.11</artifactId>
            <version>1.10.1</version>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-assembly-plugin</artifactId>
                <version>3.0.0</version>
                <configuration>
                    <descriptorRefs>
                        <descriptorRef>jar-with-dependencies</descriptorRef>
                    </descriptorRefs>
                </configuration>
                <executions>
                    <execution>
                        <id>make-assembly</id>
                        <phase>package</phase>
                        <goals>
                            <goal>single</goal>
                        </goals>
                    </execution>
                </executions>
            </plugin>
        </plugins>
    </build>

</project>

4.3 接口實現(xiàn)

@Api
@RestController
@RequestMapping("/tUser")
public class TUserController {

    @Autowired
    private TUserService tUserService;

    @PostMapping("/savedta")
    public  String saveData(@RequestBody TUser user)
    {
        System.out.println(user.toString());
        return user.toString();
    }

4.4需要注冊自定義函數(shù)---在flink sql client 中實現(xiàn)

CREATE FUNCTION myhttp AS 'com.wudl.flink.fun.MyHttpFunction';

4.5 執(zhí)行

 CREATE TABLE source_mysql (
  id BIGINT PRIMARY KEY NOT ENFORCED,
  name STRING,
  age INT,
  birthday TIMESTAMP(3),
  ts TIMESTAMP(3)
) WITH (
'connector' = 'mysql-cdc',
'hostname' = '192.168.1.180',
'port' = '3306',
'username' = 'root',
'password' = '123456',
'server-time-zone' = 'Asia/Shanghai',
'debezium.snapshot.mode' = 'initial',
'database-name' = 'test',
'table-name' = 'Flink_cdc'
);


CREATE TABLE output (
     t string
 ) WITH (
    'connector' = 'print'
 );


 insert into output  select myhttp(MAP['id',cast(id as string),'name',name])  from source_mysql;


6查看結果

flink sql-自定義函數(shù).png
接口.png

這樣就實時實現(xiàn)了 flink sql sql 實時調(diào)用接口函數(shù)了崖瞭。

?著作權歸作者所有,轉載或內(nèi)容合作請聯(lián)系作者
  • 序言:七十年代末洞辣,一起剝皮案震驚了整個濱河市,隨后出現(xiàn)的幾起案子雕拼,更是在濱河造成了極大的恐慌,老刑警劉巖粘招,帶你破解...
    沈念sama閱讀 216,591評論 6 501
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件啥寇,死亡現(xiàn)場離奇詭異,居然都是意外死亡洒扎,警方通過查閱死者的電腦和手機辑甜,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 92,448評論 3 392
  • 文/潘曉璐 我一進店門,熙熙樓的掌柜王于貴愁眉苦臉地迎上來袍冷,“玉大人磷醋,你說我怎么就攤上這事『” “怎么了邓线?”我有些...
    開封第一講書人閱讀 162,823評論 0 353
  • 文/不壞的土叔 我叫張陵,是天一觀的道長煌恢。 經(jīng)常有香客問我骇陈,道長,這世上最難降的妖魔是什么瑰抵? 我笑而不...
    開封第一講書人閱讀 58,204評論 1 292
  • 正文 為了忘掉前任缩歪,我火速辦了婚禮,結果婚禮上谍憔,老公的妹妹穿的比我還像新娘匪蝙。我一直安慰自己,他們只是感情好习贫,可當我...
    茶點故事閱讀 67,228評論 6 388
  • 文/花漫 我一把揭開白布逛球。 她就那樣靜靜地躺著,像睡著了一般苫昌。 火紅的嫁衣襯著肌膚如雪颤绕。 梳的紋絲不亂的頭發(fā)上,一...
    開封第一講書人閱讀 51,190評論 1 299
  • 那天,我揣著相機與錄音奥务,去河邊找鬼物独。 笑死,一個胖子當著我的面吹牛氯葬,可吹牛的內(nèi)容都是我干的挡篓。 我是一名探鬼主播,決...
    沈念sama閱讀 40,078評論 3 418
  • 文/蒼蘭香墨 我猛地睜開眼帚称,長吁一口氣:“原來是場噩夢啊……” “哼官研!你這毒婦竟也來了?” 一聲冷哼從身側響起闯睹,我...
    開封第一講書人閱讀 38,923評論 0 274
  • 序言:老撾萬榮一對情侶失蹤戏羽,失蹤者是張志新(化名)和其女友劉穎,沒想到半個月后楼吃,有當?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體始花,經(jīng)...
    沈念sama閱讀 45,334評論 1 310
  • 正文 獨居荒郊野嶺守林人離奇死亡,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點故事閱讀 37,550評論 2 333
  • 正文 我和宋清朗相戀三年孩锡,在試婚紗的時候發(fā)現(xiàn)自己被綠了衙荐。 大學時的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片。...
    茶點故事閱讀 39,727評論 1 348
  • 序言:一個原本活蹦亂跳的男人離奇死亡浮创,死狀恐怖忧吟,靈堂內(nèi)的尸體忽然破棺而出,到底是詐尸還是另有隱情斩披,我是刑警寧澤溜族,帶...
    沈念sama閱讀 35,428評論 5 343
  • 正文 年R本政府宣布,位于F島的核電站垦沉,受9級特大地震影響煌抒,放射性物質發(fā)生泄漏。R本人自食惡果不足惜厕倍,卻給世界環(huán)境...
    茶點故事閱讀 41,022評論 3 326
  • 文/蒙蒙 一寡壮、第九天 我趴在偏房一處隱蔽的房頂上張望。 院中可真熱鬧讹弯,春花似錦况既、人聲如沸。這莊子的主人今日做“春日...
    開封第一講書人閱讀 31,672評論 0 22
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽。三九已至臭胜,卻和暖如春莫其,著一層夾襖步出監(jiān)牢的瞬間癞尚,已是汗流浹背。 一陣腳步聲響...
    開封第一講書人閱讀 32,826評論 1 269
  • 我被黑心中介騙來泰國打工乱陡, 沒想到剛下飛機就差點兒被人妖公主榨干…… 1. 我叫王不留浇揩,地道東北人。 一個月前我還...
    沈念sama閱讀 47,734評論 2 368
  • 正文 我出身青樓憨颠,卻偏偏與公主長得像胳徽,于是被迫代替她去往敵國和親。 傳聞我的和親對象是個殘疾皇子烙心,可洞房花燭夜當晚...
    茶點故事閱讀 44,619評論 2 354

推薦閱讀更多精彩內(nèi)容