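I. Reading from MySQL
1. Defining a MySQLDataSource class via JDBC
1.1 First, add the JDBC dependencies (the Flink JDBC module and the MySQL driver)
1.2 Define the JDBCInputFormat
1.3 Obtain a DataStreamSource of type Row
1.4 Convert the DataStream<Row> into a DataStream<Student>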
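// JDBCInputFormat comes from the flink-jdbc module (Flink 1.10 and earlier).
// Student, PropertiesConstants and ExecutionEnvUtil are classes from the author's own project.
import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.io.jdbc.JDBCInputFormat;
import org.apache.flink.api.java.typeutils.RowTypeInfo;
import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.types.Row;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class MySQLDataSource {
    private static final Logger log = LoggerFactory.getLogger(MySQLDataSource.class);

    public static DataStream<Student> readFromDb(StreamExecutionEnvironment streamExecutionEnvironment) throws Exception {
        // Connection settings, loaded the same way as in JdbcReader2 below
        ParameterTool parameterTool = ExecutionEnvUtil.PARAMETER_TOOL;
        // 1. Define the field types (same order as the SELECT column list)
        TypeInformation<?>[] fieldTypes = new TypeInformation<?>[]{BasicTypeInfo.INT_TYPE_INFO, BasicTypeInfo.STRING_TYPE_INFO, BasicTypeInfo.STRING_TYPE_INFO, BasicTypeInfo.INT_TYPE_INFO};
        // 2. Define the field names
        String[] fieldNames = new String[]{"id", "name", "password", "age"};
        // 3. Define the Row type
        RowTypeInfo rowTypeInfo = new RowTypeInfo(fieldTypes, fieldNames);
        String jdbcUrl = parameterTool.get(PropertiesConstants.MYSQL_JDBC_URL);
        log.info(jdbcUrl);
        // 4. Define the JDBCInputFormat
        JDBCInputFormat jdbcInputFormat = JDBCInputFormat
                .buildJDBCInputFormat()
                .setDrivername("com.mysql.jdbc.Driver")
                .setDBUrl(jdbcUrl)
                .setUsername(parameterTool.get(PropertiesConstants.MYSQL_USERNAME))
                .setPassword(parameterTool.get(PropertiesConstants.MYSQL_PASSWORD))
                .setQuery("select id, name, password, age from student")
                .setRowTypeInfo(rowTypeInfo)
                .finish();
        // 5. Read the MySQL data through the JDBCInputFormat
        DataStreamSource<Row> dataStreamSourceRow = streamExecutionEnvironment.createInput(jdbcInputFormat);
        // Intermediate check that the rows are read correctly
        dataStreamSourceRow.print();
        // 6. Convert the Row stream into a stream of Student entities (positional access)
        DataStream<Student> dataStream = dataStreamSourceRow.map(new RichMapFunction<Row, Student>() {
            @Override
            public Student map(Row value) throws Exception {
                Student s = new Student();
                s.setId((Integer) value.getField(0));
                s.setName((String) value.getField(1));
                s.setPassword((String) value.getField(2));
                s.setAge((Integer) value.getField(3));
                return s;
            }
        });
        log.info("read datasource end");
        return dataStream;
    }
}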
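The positional casts in step 6 only work if fieldTypes, fieldNames and the SELECT column list agree in order. The sketch below shows one way to keep them in sync. It uses plain Java with nothing beyond the JDK: it generates the query from the same field-name array, and an Object[] stands in for Flink's Row. StudentRowMapping, buildQuery and fromRow are hypothetical names and are not part of Flink.

```java
/**
 * Plain-Java sketch (no Flink dependency): the SELECT column list and the
 * positional Row-to-Student mapping are both driven by FIELD_NAMES.
 */
public class StudentRowMapping {

    // Same order as the fieldNames array passed to RowTypeInfo
    static final String[] FIELD_NAMES = {"id", "name", "password", "age"};

    // Minimal stand-in for the article's Student entity
    static class Student {
        int id;
        String name;
        String password;
        int age;
    }

    // Builds "select id, name, password, age from <table>" from FIELD_NAMES
    static String buildQuery(String table) {
        return "select " + String.join(", ", FIELD_NAMES) + " from " + table;
    }

    // Positional mapping; the Object[] plays the role of Flink's Row
    static Student fromRow(Object[] row) {
        if (row.length != FIELD_NAMES.length) {
            throw new IllegalArgumentException(
                    "expected " + FIELD_NAMES.length + " fields, got " + row.length);
        }
        Student s = new Student();
        s.id = (Integer) row[0];
        s.name = (String) row[1];
        s.password = (String) row[2];
        s.age = (Integer) row[3];
        return s;
    }

    public static void main(String[] args) {
        // prints: select id, name, password, age from student
        System.out.println(buildQuery("student"));
        Student s = fromRow(new Object[]{1, "tom", "secret", 18});
        // prints: 1 tom 18
        System.out.println(s.id + " " + s.name + " " + s.age);
    }
}
```

With this approach, adding a column means editing only FIELD_NAMES and the matching fieldTypes entry. The query text can no longer silently disagree with the Row layout.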
2. Via a custom DataSource
- Extend RichSourceFunction<T> (an abstract class), where T is the element type of the resulting DataStream
- Template:
  - open() initializes data used throughout the function, such as the PreparedStatement (comparable to a constructor or JUnit's @Before)
  - run()
    - usually keeps fetching data in a while loop
    - the data fetched in the loop must be emitted as a stream; just call SourceContext.collect(yourData)
    - what the SourceContext collects can be a single record (one Student) or a List<Student>; to emit lists, set the type parameter of RichSourceFunction<T> to List<Student>
  - cancel() stops the while loop, i.e. stops fetching data
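The template above can be sketched without Flink classes. In this minimal sketch, SourceContext is a hypothetical one-method stand-in for Flink's SourceFunction.SourceContext, and a Supplier replaces the JDBC query. What matters is the control flow: run() polls until the volatile isRunning flag is cleared, and cancel(), which in Flink is called from a different thread than run(), does nothing except clear that flag.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.Supplier;

public class PollingSourceSketch {

    // Hypothetical stand-in for Flink's SourceFunction.SourceContext (only collect() is modeled)
    interface SourceContext<T> {
        void collect(T element);
    }

    static class PollingSource<T> {
        private final Supplier<List<T>> query;   // stands in for ps.executeQuery()
        private final long intervalMillis;
        // volatile: cancel() runs on a different thread than run()
        private volatile boolean isRunning = true;

        PollingSource(Supplier<List<T>> query, long intervalMillis) {
            this.query = query;
            this.intervalMillis = intervalMillis;
        }

        void open() {
            // Acquire connections / prepare statements here
        }

        void run(SourceContext<T> ctx) throws InterruptedException {
            while (isRunning) {
                for (T element : query.get()) {
                    ctx.collect(element);        // emit one record at a time
                }
                Thread.sleep(intervalMillis);    // wait before the next poll
            }
        }

        void cancel() {
            isRunning = false;                   // only stop the loop; release resources in close()
        }

        void close() {
            // Release connections / statements here
        }
    }

    public static void main(String[] args) throws Exception {
        List<String> received = new ArrayList<>();
        PollingSource<String> source = new PollingSource<>(() -> Arrays.asList("a", "b"), 10);
        source.open();
        Thread runner = new Thread(() -> {
            try {
                source.run(received::add);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        runner.start();
        Thread.sleep(50);      // let a few polls happen
        source.cancel();       // called from another thread, as Flink does
        runner.join();
        source.close();
        System.out.println("emitted " + received.size() + " records");
    }
}
```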
import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.source.RichSourceFunction;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.ResultSet;

/**
 * Returns a DataStream<Student> through a RichSourceFunction, re-reading the MySQL DB every 5 s.
 */
public class JdbcReader2 extends RichSourceFunction<Student> {
    private static final Logger logger = LoggerFactory.getLogger(JdbcReader2.class);
    private Connection connection = null;
    private PreparedStatement ps = null;
    // volatile: cancel() is called from a different thread than run()
    private volatile boolean isRunning = true;

    // Opens the database connection; the settings come from the project's ExecutionEnvUtil/ParameterTool
    @Override
    public void open(Configuration parameters) throws Exception {
        super.open(parameters);
        DriverManager.registerDriver(new com.mysql.jdbc.Driver());
        ParameterTool parameterTool = ExecutionEnvUtil.PARAMETER_TOOL;
        String jdbcUrl = parameterTool.get(PropertiesConstants.MYSQL_JDBC_URL);
        // Get the connection (credentials from the configuration instead of hard-coded values)
        connection = DriverManager.getConnection(jdbcUrl,
                parameterTool.get(PropertiesConstants.MYSQL_USERNAME),
                parameterTool.get(PropertiesConstants.MYSQL_PASSWORD));
        ps = connection.prepareStatement("select id, name, password, age, flag from student where flag='true'");
    }

    // Runs the query and emits the results; exceptions propagate so that Flink can fail or restart the job
    @Override
    public void run(SourceContext<Student> ctx) throws Exception {
        // List<Student> students = new ArrayList<>();
        while (isRunning) {
            // try-with-resources closes the ResultSet after every poll
            try (ResultSet resultSet = ps.executeQuery()) {
                while (resultSet.next()) {
                    Student student = new Student();
                    student.setId(resultSet.getInt(1));
                    student.setName(resultSet.getString(2));
                    student.setPassword(resultSet.getString(3));
                    student.setAge(resultSet.getInt(4));
                    String flag = resultSet.getString(5);
                    student.setFlag(flag);
                    // Redundant with the WHERE clause, kept as a safeguard
                    if (Boolean.parseBoolean(flag)) {
                        //students.add(student);
                        // Emit data one Student at a time
                        ctx.collect(student);
                        logger.info("student >>>>>> {}", student);
                    }
                }
            }
            Thread.sleep(1000 * 5);
        }
    }

    // Only stops the polling loop; resources are released in close()
    @Override
    public void cancel() {
        isRunning = false;
    }

    // Closes the database resources (statement first, then connection)
    @Override
    public void close() throws Exception {
        super.close();
        if (ps != null) {
            ps.close();
        }
        if (connection != null) {
            connection.close();
        }
    }
}
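The query returns every row with flag='true' on each poll, so JdbcReader2 re-emits the same students every 5 seconds. One common workaround is to remember the largest id already emitted and bind it as a query parameter. This assumes id is an auto-increment key that is never reused and that rows are not updated after insertion. The plain-Java sketch below simulates this with an in-memory list of ids. IncrementalPollSketch and its members are hypothetical. In a real Flink source, lastId would also have to be kept in checkpointed state to survive restarts.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

public class IncrementalPollSketch {

    // A JDBC version would call ps.setLong(1, lastId) before each executeQuery()
    static final String QUERY =
            "select id, name, password, age, flag from student where flag='true' and id > ? order by id";

    // Largest id emitted so far (hypothetical; not checkpointed in this sketch)
    private long lastId = 0L;

    // Simulates running QUERY against the ids of rows that currently match flag='true'
    List<Integer> poll(List<Integer> matchingIds) {
        List<Integer> fresh = new ArrayList<>();
        for (int id : matchingIds) {
            if (id > lastId) {
                fresh.add(id);                 // "id > ?"
            }
        }
        Collections.sort(fresh);               // "order by id"
        if (!fresh.isEmpty()) {
            lastId = fresh.get(fresh.size() - 1);
        }
        return fresh;
    }

    public static void main(String[] args) {
        IncrementalPollSketch source = new IncrementalPollSketch();
        System.out.println(source.poll(Arrays.asList(1, 2, 3)));        // [1, 2, 3]
        System.out.println(source.poll(Arrays.asList(1, 2, 3, 4, 5)));  // [4, 5]
        System.out.println(source.poll(Arrays.asList(1, 2, 3, 4, 5)));  // []
    }
}
```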
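3. Comparing the two approaches
- The JDBC approach needs a separate dependency (the Flink JDBC module); the custom DataSource approach needs no Flink connector dependency, only the MySQL driver
- The JDBCInputFormat reads bounded (batch) data: the query runs once and the source then finishes. RichSourceFunction and its parent interface SourceFunction are streaming interfaces, so a custom source can keep polling the database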
II. Writing to MySQL
1. Via JDBC
The Table API offers a way to write to MySQL through JDBC:
- Obtain a Table (it can be converted from a DataStream): table
- Register the table with the environment as a temporary view: tempView
- Create an inner-dest-table -> out-dest-table mapping: inner-dest-table is a Flink-internal table, and inserting data into inner-dest-table writes it into out-dest-table
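import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.Table;
// Flink 1.10 package; from Flink 1.11 on it is org.apache.flink.table.api.bridge.java.StreamTableEnvironment
import org.apache.flink.table.api.java.StreamTableEnvironment;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

// The class name is illustrative; the original shows only the body of the job
public class MySQLWriteJob {
    private static final Logger log = LoggerFactory.getLogger(MySQLWriteJob.class);

    public static void main(String[] args) throws Exception {
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // 1. Add the data source
        DataStream<Student> studentDataStream = env.addSource(new JdbcReader2());
        EnvironmentSettings settings = EnvironmentSettings.newInstance()
                //.useBlinkPlanner()
                .inStreamingMode()
                .build();
        StreamTableEnvironment streamTableEnvironment = StreamTableEnvironment.create(env, settings);
        // 2. Get a Table from the DataStream and register it as the temporary view temp_table
        Table table = streamTableEnvironment.fromDataStream(studentDataStream);
        streamTableEnvironment.createTemporaryView("temp_table", table);
        // 3. Create the Flink-internal sink table (mapped to the MySQL table)
        String destSql = FileUtil.readSourceFile("destination.sql");
        streamTableEnvironment.sqlUpdate(destSql);
        // 4. Insert into the internal table, which writes the rows to the outer system
        String insertSql = FileUtil.readSourceFile("insert.sql");
        streamTableEnvironment.sqlUpdate(insertSql);
        env.execute("sort-streaming-data");
        log.info("end");
    }
}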
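- The SQL files are stored in the resources directory and read with FileUtil (this only externalizes the SQL; it could just as well be written directly in the code)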
# destination.sql
CREATE TABLE student_dest (
id INT,
name VARCHAR,
password VARCHAR,
age INT
) WITH (
'connector.type' = 'jdbc', -- use the jdbc connector
'connector.url' = 'jdbc:mysql://localhost:3306/flink_demo', -- jdbc url
'connector.table' = 'student_2', -- table name
'connector.username' = 'root', -- user name
'connector.password' = 'abc123456', -- password
'connector.write.flush.max-rows' = '1' -- defaults to 5000 rows; set to 1 for this demo
)
# insert.sql (temp_table is the temporary view registered in the code)
INSERT INTO student_dest
SELECT
id,
name,
password,
age
FROM temp_table
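FileUtil.readSourceFile is a helper from the author's project, and its implementation is not shown. It may simply load a file from the classpath; in a standard Maven build, src/main/resources ends up on the classpath. Under that assumption, a JDK-only equivalent could look like this. SqlResourceReader is a hypothetical name.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.nio.charset.StandardCharsets;

public class SqlResourceReader {

    // Hypothetical equivalent of FileUtil.readSourceFile: loads a file from the classpath
    static String readSourceFile(String name) throws IOException {
        InputStream in = SqlResourceReader.class.getClassLoader().getResourceAsStream(name);
        if (in == null) {
            throw new IOException("resource not found on classpath: " + name);
        }
        try (InputStream stream = in) {
            return readAll(stream);
        }
    }

    // Reads a whole stream as UTF-8 text (Java 8 compatible; avoids InputStream.readAllBytes)
    static String readAll(InputStream in) throws IOException {
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        byte[] buffer = new byte[4096];
        int n;
        while ((n = in.read(buffer)) != -1) {
            out.write(buffer, 0, n);
        }
        return new String(out.toByteArray(), StandardCharsets.UTF_8);
    }

    public static void main(String[] args) throws IOException {
        // Expects destination.sql under src/main/resources
        System.out.println(readSourceFile("destination.sql"));
    }
}
```

Failing loudly on a missing resource is a deliberate choice here. Returning null or an empty string would only surface later as a confusing SQL parse error.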
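2. Via a custom Sink
- Extend RichSinkFunction (an abstract class)
- Steps (the same lifecycle pattern as RichSourceFunction)
  - open() initializes data
  - invoke() is called for every incoming record and processes and stores it in the outer system
  - close() cleans up and releases resources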
import org.apache.commons.dbcp2.BasicDataSource;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.SQLException;

public class MySQLSink extends RichSinkFunction<Student> {
    private static final Logger log = LoggerFactory.getLogger(MySQLSink.class);
    private PreparedStatement ps;
    private BasicDataSource dataSource;
    private Connection connection;

    /**
     * Establishes the connection in open(), so it does not have to be created and released on every invoke() call.
     *
     * @param parameters the Flink configuration
     * @throws Exception if no connection can be obtained (fail fast instead of silently dropping records)
     */
    @Override
    public void open(Configuration parameters) throws Exception {
        super.open(parameters);
        dataSource = new BasicDataSource();
        connection = getConnection(dataSource);
        String sql = "insert into student_2(id, name, password, age) values(?, ?, ?, ?)";
        ps = connection.prepareStatement(sql);
    }

    @Override
    public void close() throws Exception {
        super.close();
        // Close and release resources in reverse order of creation: statement, connection, pool
        if (ps != null) {
            ps.close();
        }
        if (connection != null) {
            connection.close();
        }
        if (dataSource != null) {
            dataSource.close();
        }
    }

    /**
     * invoke() is called once for every record to insert.
     *
     * @param value   the record to insert
     * @param context the sink context
     * @throws Exception if the insert fails
     */
    @Override
    public void invoke(Student value, Context context) throws Exception {
        Student student = value;
        // For a RichSinkFunction<List<Student>>, iterate over the list instead:
        //for (Student student : value) {
        ps.setInt(1, student.getId());
        ps.setString(2, student.getName());
        ps.setString(3, student.getPassword());
        ps.setInt(4, student.getAge());
        ps.addBatch();
        //}
        // Execute the batch; with one Student per invoke() the batch holds a single row
        int[] count = ps.executeBatch();
        log.info("Inserted {} row(s)", count.length);
    }

    private static Connection getConnection(BasicDataSource dataSource) throws SQLException {
        dataSource.setDriverClassName("com.mysql.jdbc.Driver");
        // Note: replace with your own local MySQL address, user name and password
        dataSource.setUrl("jdbc:mysql://localhost:3306/flink_demo");
        dataSource.setUsername("root");
        dataSource.setPassword("abc123456");
        // Connection-pool settings
        dataSource.setInitialSize(10);
        dataSource.setMaxTotal(50);
        dataSource.setMinIdle(2);
        // Let SQLException propagate so the job fails instead of running without a connection
        Connection con = dataSource.getConnection();
        log.info("Obtained connection from pool: {}", con);
        return con;
    }
}
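MySQLSink calls executeBatch() inside every invoke(), so each JDBC batch holds exactly one row. That costs essentially as many round trips as calling executeUpdate() per record. The 'connector.write.flush.max-rows' option in destination.sql reflects the alternative: buffer rows and flush them in groups. Below is a plain-Java sketch of that buffering. flushAction is a hypothetical stand-in for "addBatch() per row, then executeBatch()". In a real Flink sink, buffered rows would also need to be flushed when a checkpoint is taken, for example via CheckpointedFunction.snapshotState; otherwise they are lost on failure.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

public class BufferedSinkSketch<T> {
    private final int maxRows;                      // plays the role of 'connector.write.flush.max-rows'
    private final Consumer<List<T>> flushAction;    // stands in for addBatch() per row + executeBatch()
    private final List<T> buffer = new ArrayList<>();

    public BufferedSinkSketch(int maxRows, Consumer<List<T>> flushAction) {
        if (maxRows <= 0) {
            throw new IllegalArgumentException("maxRows must be positive");
        }
        this.maxRows = maxRows;
        this.flushAction = flushAction;
    }

    // Called once per record, like RichSinkFunction.invoke()
    public void invoke(T value) {
        buffer.add(value);
        if (buffer.size() >= maxRows) {
            flush();
        }
    }

    // Hands the buffered rows to flushAction as one batch, then empties the buffer
    public void flush() {
        if (buffer.isEmpty()) {
            return;
        }
        flushAction.accept(new ArrayList<>(buffer));
        buffer.clear();
    }

    // Called on shutdown, like RichSinkFunction.close(): write out the remainder
    public void close() {
        flush();
    }

    public static void main(String[] args) {
        List<List<Integer>> batches = new ArrayList<>();
        BufferedSinkSketch<Integer> sink = new BufferedSinkSketch<>(2, batches::add);
        for (int i = 1; i <= 5; i++) {
            sink.invoke(i);
        }
        sink.close();
        System.out.println(batches); // [[1, 2], [3, 4], [5]]
    }
}
```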
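3. Comparing the JDBC and Sink approaches
- The JDBC approach runs every query and INSERT/UPDATE through sqlQuery() and sqlUpdate(): you only need to collect the CREATE TABLE and INSERT statements as SQL, and the INSERT statement writes the data into the outer system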