1.前言
本論壇發(fā)送的所有內(nèi)容,都是筆者在自己的個人的筆記上優(yōu)化后謄寫而來浇坐,希望自己所擁有的知識能夠幫助更多有學(xué)習(xí)欲望的人勺拣。但值得一提的是,由于本人接觸行業(yè)時間有限,可能會出現(xiàn)一些技術(shù)上的紕漏熙尉,如果有問題歡迎私信联逻、評論指出,大家共同進步检痰。
2.簡單介紹
在Flink對數(shù)據(jù)進行計算的時候包归,一般會按照階段的不同,將處理過程分為 source->transform->sink
借此來完成數(shù)據(jù)從讀取到計算再到寫出的全過程铅歼。
本章節(jié)當(dāng)中要介紹的FlinkJDBC其實就是Sink階段的成員之一公壤,它能夠幫助Flink達成從數(shù)據(jù)流到存儲介質(zhì)保存的全過程(存儲介質(zhì)需要支持JDBC)。如果SINK方的這個存儲介質(zhì)支持XA事務(wù)的話椎椰,那么FlinkJDBC還能夠?qū)ζ涮峁┚珳?zhǔn)一次性語義厦幅。
3.FlinkJDBC使用
3.1 引入依賴
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-jdbc</artifactId>
<version>1.15.0</version>
</dependency>
3.2 直接指定JDBCSink即可
這里以ClickHouse為例->本代碼可直接粘貼使用,因為是在文檔中手寫的,沒有用編輯器俭识,所以可能會有錯別單詞.
//1.聲明靜態(tài)方法
public static <T> SinkFunction<T> getJdbcSink(String sql){
return JdbcSink.<T>sink(
sql,
new JdbcStatementBuilder<T>(){
//這個方法主要是完成對sql語句中的數(shù)據(jù)內(nèi)容對PreparedStatement對象中占位符的賦值
@Overwrite
public void accept(PreparedStatement preparedStatement,T obj) throws SQLException{
//通過反射來完成賦值慨削,本段代碼結(jié)束之后有關(guān)于這部分內(nèi)容反射相關(guān)知識的介紹
Field[] declaredFields = obj.getClass().getDeclaredFields();
for(int i=0; i<declaredFields.length; i++){
Filed declaredField = declaredFields[i];
declaredField.setAccessible(true);
try{
Object value = declaredField.get(obj);
preparedStatement.setObject(i,value);
}catch(IllegalAccessException e){
e.printStackTrace();
}
}
}
},
JdbcExecutionOptions.bulider()
.withBatchIntervalMs(5000L) //指定多長時間發(fā)送一次
.withBatchSize(5) //指定攢夠多少條數(shù)發(fā)送一次
.build(),
new JdbcConnectionOptions.JdbcConnectionOptionsBuilder()
.withDriverName("ru.yandex.clickhouse.ClickHouseDriver") //Drvier
.withUrl("jdbc:clickhouse://hadoop102:8123/table") //url
.build()
);
}
4.反射相關(guān)知識的描述
反射機制對于我來說,實際上就是一種能夠為使用者提供針對未知對象或者未知類來進行內(nèi)容讀取的一個功能套媚。這是我個人對于反射的淺顯理解缚态,如果有錯誤歡迎指正。接下來我就要用我所理解的內(nèi)容堤瘤,編寫一個簡單的例子玫芦,來解釋3.2程序段中accept方法是如何完成實體類對占位符進行賦值的過程。
4.1 反射的小例子
思路:主程序想要通過對一個方法傳入不同的實體類本辐,來獲得所有實體類中的所有屬性的字段信息桥帆。
準(zhǔn)備:主程序(用來調(diào)用方法)、兩個不同的實體類(用來對公共方法做驗證)慎皱、泛型方法(輸出實體類中的字段信息)
//實體類1
@Data
@AllArgsConstructor
public class Student {
//用來表示學(xué)生信息
private String name;
private String banji;
private String score;
}
//實體類2
@Data
@AllArgsConstructor
public class Teacher {
//用來表示老師信息
private String dept;
private String classHeader;
}
//泛型方法
import java.lang.reflect.Field;
import java.util.ArrayList;
import java.util.List;
public class ReadInfoMethod {
//通過傳入?yún)?shù)老虫,然后循環(huán)的將作為參數(shù)的對象中的所有屬性的值信息添加到數(shù)組中
public static <T> List<T> getInfo(T t){
Class<?> aClass = t.getClass();
Field[] declaredFields = aClass.getDeclaredFields();
ArrayList<T> result = new ArrayList<T>();
for (Field declaredField : declaredFields) {
int num = 0;
declaredField.setAccessible(true);
try {
T o = (T)declaredField.get(t);
result.add(num,o);
num++;
}catch (IllegalAccessException e){
e.printStackTrace();
}
}
return result;
}
}
//主程序
public class test {
public static void main(String[] args) {
Student student = new Student("弗林克", "三年二班", "95");
Teacher teacher = new Teacher("辦公室部門", "三年二班班主任");
//調(diào)用泛型方法,獲得傳入對象的所有屬性字段的值信息的列表
List<Student> info = ReadInfoMethod.getInfo(student);
List<Teacher> teachers = ReadInfoMethod.getInfo(teacher);
System.out.println(info);
System.out.println(teachers);
}
}
泛型方法中調(diào)用的方法的方式茫多,與FlinkJDBC中的accept方法中的內(nèi)容如出一轍祈匙。二者在表現(xiàn)形式上的區(qū)別就是accept方法在對占位符進行賦值的時候,需要指定對應(yīng)字段的索引位置天揖,因此 才有了preparedStatement.setObject(i,value);的這種方式夺欲。
Flink官網(wǎng)中針對這部分內(nèi)容進行描述的地址是:https://nightlies.apache.org/flink/flink-docs-release-1.15/docs/connectors/datastream/jdbc/