Sylph is a one-stop big data stream computing platform: it compiles Stream SQL into distributed programs (such as Apache Flink jobs) and runs them on an Apache YARN cluster.
Sylph repository: https://github.com/harbby/sylph/
The work below is based on Sylph 0.5.0.
Development goal: the stream source types Sylph currently ships are only Kafka and a test source. To support ingesting a data stream from HDFS, we need to implement a custom HDFS source.
Development flow: data source configuration class -->> custom source class (extend one class, implement one interface) -->> define the field types of the stream's Row -->> read the HDFS file and emit the stream in the run() method -->> load the source -->> define a sample job -->> rebuild Sylph -->> run and check the results.
Notes: the custom source class must declare implements Source<DataStream<org.apache.flink.types.Row>>. Reportedly, from Sylph 0.6 alpha3 onward a Plugin.class is also required; I have not looked into that yet, and Sylph 0.5 only enforces this inheritance constraint. The source must also declare its schema explicitly, i.e. define public TypeInformation<Row> getProducedType().
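These constraints can be summarized by the minimal skeleton below. It is only a sketch of the contract the two full implementations later in this post follow: SkeletonSource / SkeletonFunction and the dummy rows are placeholder names of mine, and Types.STRING / Types.LONG are just Flink shorthands equivalent to the TypeExtractor calls used in the real code.

package ideal.sylph.plugins.hdfs;

import ideal.sylph.annotation.Description;
import ideal.sylph.annotation.Name;
import ideal.sylph.annotation.Version;
import ideal.sylph.etl.api.Source;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
import org.apache.flink.api.java.typeutils.RowTypeInfo;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.RichSourceFunction;
import org.apache.flink.types.Row;

@Name("SkeletonSource")
@Version("1.0.0")
@Description("minimal shape of a Sylph flink source plugin (sketch)")
public class SkeletonSource implements Source<DataStream<Row>> {
    private static final long serialVersionUID = 1L;
    private final transient DataStream<Row> stream;

    // Sylph constructs the plugin with the Flink environment and the PluginConfig subclass;
    // config carries the with(...) properties of the job (unused in this skeleton).
    // The full implementations below wrap this in a memoized Guava Supplier instead.
    public SkeletonSource(StreamExecutionEnvironment execEnv, HdfsSourceConfig config) {
        this.stream = execEnv.addSource(new SkeletonFunction());
    }

    @Override
    public DataStream<Row> getSource() {
        return stream;
    }

    private static class SkeletonFunction extends RichSourceFunction<Row> implements ResultTypeQueryable<Row> {
        private volatile boolean running = true;

        @Override
        public void run(SourceContext<Row> ctx) throws Exception {
            // a real source reads external data here and emits it via ctx.collect(...)
            while (running) {
                ctx.collect(Row.of("key0", "hello", System.currentTimeMillis()));
                Thread.sleep(1000);
            }
        }

        @Override
        public void cancel() {
            running = false;
        }

        // the mandatory schema declaration
        @Override
        public TypeInformation<Row> getProducedType() {
            return new RowTypeInfo(
                    new TypeInformation<?>[]{Types.STRING, Types.STRING, Types.LONG},
                    new String[]{"key", "message", "event_time"});
        }
    }
}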
The implementation code follows. Running effect: once the sample job is started, it periodically reads the HDFS file and prints its content to the console in the format (random key, one line of the file content, event time).
Data source configuration class:
package ideal.sylph.plugins.hdfs;

import ideal.sylph.annotation.Description;
import ideal.sylph.annotation.Name;
import ideal.sylph.etl.PluginConfig;

public class HdfsSourceConfig extends PluginConfig {
    private static final long serialVersionUID = 2L;

    private HdfsSourceConfig() {
    }

    @Name("fs.defaultFS")
    @Description("this is fs.defaultFS")
    private String fsAddress;

    @Name("hdfs_read_dir")
    @Description("this is the hdfs source file dir")
    private String hdfsFileDir;

    @Name("hdfs_time_interval")
    @Description("this is the hdfs source file update time interval (ms)")
    private Long hdfsTimeInterval;

    public String getFsAddress()
    {
        return fsAddress;
    }

    public String getHdfsFileDir()
    {
        return hdfsFileDir;
    }

    public Long getTimeInterval()
    {
        return hdfsTimeInterval;
    }
}
First implementation of the custom source:
Point it at a single text file on HDFS and re-read that file's content at a fixed interval as the data source.
package ideal.sylph.plugins.hdfs;

import ideal.sylph.annotation.Description;
import ideal.sylph.annotation.Name;
import ideal.sylph.annotation.Version;
import ideal.sylph.etl.api.Source;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
import org.apache.flink.api.java.typeutils.RowTypeInfo;
import org.apache.flink.api.java.typeutils.TypeExtractor;
import org.apache.flink.shaded.guava18.com.google.common.base.Supplier;
import org.apache.flink.shaded.guava18.com.google.common.base.Suppliers;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.RichSourceFunction;
import org.apache.flink.types.Row;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

import java.io.*;
import java.util.Random;
import java.util.concurrent.TimeUnit;

import static java.util.Objects.requireNonNull;

/**
 * (Periodic read) The source is a single HDFS file, specified down to the file name.
 * Example job settings:
 * type = 'HdfsSource',
 * fs.defaultFS = 'localhost:8020',
 * hdfs_read_dir = '/data/test.txt',
 * hdfs_time_interval = 10000
 */
@Name(value = "HdfsSource")
@Version("1.0.0")
@Description("this flink Hdfs source inputStream")
public class HdfsSource1 implements Source<DataStream<Row>> {
    private static final long serialVersionUID = 2L;
    private final transient Supplier<DataStream<Row>> loadStream;
    private static final String[] HDFS_COLUMNS = new String[]{"key", "message", "event_time"};

    public HdfsSource1(StreamExecutionEnvironment execEnv, HdfsSourceConfig config) {
        requireNonNull(execEnv, "execEnv is null");
        requireNonNull(config, "config is null");
        // load the data source lazily
        // this.loadStream = Suppliers.memoize(() -> execEnv.addSource(new MyDataSource()));
        this.loadStream = Suppliers.memoize(() -> this.createSource(execEnv, config));
    }

    public DataStream<Row> createSource(StreamExecutionEnvironment execEnv, HdfsSourceConfig config) {
        requireNonNull(execEnv, "execEnv is null");
        requireNonNull(config, "config is null");
        return execEnv.addSource(new MyDataSource(config));
    }

    @Override
    public DataStream<Row> getSource() {
        return loadStream.get();
    }

    /**
     * Custom source function.
     **/
    public static class MyDataSource
            extends RichSourceFunction<Row>
            implements ResultTypeQueryable<Row> {
        /** Flag indicating whether the consumer is still running. */
        private volatile boolean running = true;
        private HdfsSourceConfig hdfsSourceConfig;

        private MyDataSource(HdfsSourceConfig hdfsSourceConfig) {
            this.hdfsSourceConfig = hdfsSourceConfig;
        }

        // Flink calls run() once; it keeps producing data until cancel() is called
        @Override
        public void run(SourceContext<Row> ctx) throws Exception {
            Configuration conf = new Configuration();
            // file path, including the file name
            String file = hdfsSourceConfig.getHdfsFileDir();
            // fs.defaultFS must point at the NameNode address, not at the file itself
            conf.set("fs.defaultFS", "hdfs://" + hdfsSourceConfig.getFsAddress());
            Random random = new Random();
            // the event is treated as already produced, with up to 10 seconds of delay;
            // note it is computed once, so every row emitted by this source carries the same event_time
            long eventTime = System.currentTimeMillis() - random.nextInt(10 * 1000);
            while (running) {
                try {
                    FileSystem fileSystem = FileSystem.get(conf);
                    Path path = new Path(file);
                    if (!fileSystem.exists(path)) {
                        System.out.println("File " + file + " does not exist");
                        return;
                    }
                    // open the file on HDFS
                    FSDataInputStream in = fileSystem.open(path);
                    // read it line by line
                    BufferedReader bufferedReader = null;
                    String lineTxt = null;
                    bufferedReader = new BufferedReader(new InputStreamReader(in));
                    try {
                        while ((lineTxt = bufferedReader.readLine()) != null) {
                            // build a Row for each line according to the schema
                            Row row = Row.of("key" + random.nextInt(10), lineTxt, eventTime);
                            // collect() emits the record into the continuous stream
                            ctx.collect(row);
                        }
                    } catch (Exception e) {
                        e.printStackTrace();
                    } finally {
                        if (bufferedReader != null) {
                            try {
                                bufferedReader.close();
                                in.close();
                                fileSystem.close();
                            } catch (IOException e) {
                                e.printStackTrace();
                            }
                        }
                    }
                    // wait before re-reading the file
                    TimeUnit.MILLISECONDS.sleep(hdfsSourceConfig.getTimeInterval());
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }
        }

        // type definitions of the Row fields
        @Override
        public TypeInformation<Row> getProducedType() {
            TypeInformation<?>[] types = new TypeInformation<?>[]{
                    //createTypeInformation[String]
                    TypeExtractor.createTypeInfo(String.class),
                    TypeExtractor.createTypeInfo(String.class),
                    //createTypeInformation[long]
                    TypeExtractor.createTypeInfo(long.class)
            };
            // declare the schema: createTypeInformation[Row]
            RowTypeInfo rowTypeInfo = new RowTypeInfo(types, HDFS_COLUMNS);
            return rowTypeInfo;
        }

        @Override
        public void cancel() {
            running = false;
        }

        @Override
        public void close()
                throws Exception {
            this.cancel();
            super.close();
        }
    }
}
Second implementation of the custom source:
Point it at a directory on HDFS and re-read the content of all files under that directory at a fixed interval as the data source.
package ideal.sylph.plugins.hdfs;

import ideal.sylph.annotation.Description;
import ideal.sylph.annotation.Name;
import ideal.sylph.annotation.Version;
import ideal.sylph.etl.api.Source;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
import org.apache.flink.api.java.typeutils.RowTypeInfo;
import org.apache.flink.api.java.typeutils.TypeExtractor;
import org.apache.flink.shaded.guava18.com.google.common.base.Supplier;
import org.apache.flink.shaded.guava18.com.google.common.base.Suppliers;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.RichSourceFunction;
import org.apache.flink.types.Row;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.*;

import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.net.URI;
import java.util.Random;
import java.util.concurrent.TimeUnit;

import static java.util.Objects.requireNonNull;

/**
 * (Periodic read) The source is all files under one HDFS directory; it does not recurse into subdirectories.
 * Example job settings:
 * type = 'HdfsSource2',
 * fs.defaultFS = 'localhost:8020',
 * hdfs_read_dir = '/data/',
 * hdfs_time_interval = 10000
 */
@Name(value = "HdfsSource2")
@Version("1.0.0")
@Description("this flink Hdfs source inputStream")
public class HdfsSource2 implements Source<DataStream<Row>> {
    private static final long serialVersionUID = 2L;
    private final transient Supplier<DataStream<Row>> loadStream;
    private static final String[] HDFS_COLUMNS = new String[]{"key", "message", "event_time"};

    public HdfsSource2(StreamExecutionEnvironment execEnv, HdfsSourceConfig config) {
        requireNonNull(execEnv, "execEnv is null");
        requireNonNull(config, "config is null");
        // load the data source lazily
        // this.loadStream = Suppliers.memoize(() -> execEnv.addSource(new MyDataSource()));
        this.loadStream = Suppliers.memoize(() -> this.createSource(execEnv, config));
    }

    public DataStream<Row> createSource(StreamExecutionEnvironment execEnv, HdfsSourceConfig config) {
        requireNonNull(execEnv, "execEnv is null");
        requireNonNull(config, "config is null");
        return execEnv.addSource(new HdfsSource2.MyDataSource(config));
    }

    @Override
    public DataStream<Row> getSource() {
        return loadStream.get();
    }

    /**
     * Custom source function.
     **/
    public static class MyDataSource
            extends RichSourceFunction<Row>
            implements ResultTypeQueryable<Row> {
        /** Flag indicating whether the consumer is still running. */
        private volatile boolean running = true;
        private HdfsSourceConfig hdfsSourceConfig;

        private MyDataSource(HdfsSourceConfig hdfsSourceConfig) {
            this.hdfsSourceConfig = hdfsSourceConfig;
        }

        // Flink calls run() once; it keeps producing data until cancel() is called
        @Override
        public void run(SourceContext<Row> ctx) throws Exception {
            Configuration conf = new Configuration();
            URI uri = URI.create("hdfs://" + hdfsSourceConfig.getFsAddress());
            // directory to read
            String fileDir = hdfsSourceConfig.getHdfsFileDir();
            Random random = new Random();
            // the event is treated as already produced, with up to 10 seconds of delay;
            // note it is computed once, so every row emitted by this source carries the same event_time
            long eventTime = System.currentTimeMillis() - random.nextInt(10 * 1000);
            while (running) {
                try {
                    // enumerate and read the files under the directory
                    FileSystem fileSystem = FileSystem.get(uri, conf);
                    Path path = new Path(fileDir);
                    BufferedReader bufferedReader = null;
                    // resolve the configured path (globStatus also accepts wildcard patterns);
                    // for a plain directory path it returns that directory itself
                    FileStatus[] files = fileSystem.globStatus(path);
                    if (files == null) {
                        System.out.println("Path " + fileDir + " does not exist");
                        return;
                    }
                    for (FileStatus file : files) {
                        if (file.isDirectory()) {
                            System.out.println("this is a directory");
                            System.out.println(file.getPath());
                            // listFiles() returns a RemoteIterator over the files below the given path;
                            // the second argument controls recursion (false: do not descend into subdirectories)
                            RemoteIterator<LocatedFileStatus> iterator = fileSystem.listFiles(file.getPath(), false);
                            while (iterator.hasNext()) {
                                LocatedFileStatus fileStatus = iterator.next();
                                // file name
                                System.out.println(fileStatus.getPath().getName());
                                // the iterator already yields the full path of each file
                                Path filePath = fileStatus.getPath();
                                // open the file on HDFS
                                FSDataInputStream in = fileSystem.open(filePath);
                                // read it line by line
                                String lineTxt = null;
                                bufferedReader = new BufferedReader(new InputStreamReader(in));
                                try {
                                    while ((lineTxt = bufferedReader.readLine()) != null) {
                                        // build a Row for each line according to the schema
                                        Row row = Row.of("key" + random.nextInt(10), lineTxt, eventTime);
                                        // collect() emits the record into the continuous stream
                                        ctx.collect(row);
                                    }
                                } catch (Exception e) {
                                    e.printStackTrace();
                                } finally {
                                    // close the reader of the current file before moving to the next one
                                    bufferedReader.close();
                                }
                            }
                        } else if (file.isFile()) {
                            System.out.println("this is a regular file");
                            System.out.println(file.getPath());
                            return;
                        } else if (file.isSymlink()) {
                            System.out.println("this is a symlink");
                            System.out.println(file.getPath());
                            return;
                        } else {
                            System.out.println("this is something else");
                            System.out.println(file.getPath());
                            return;
                        }
                    }
                    if (bufferedReader != null) {
                        try {
                            bufferedReader.close();
                            fileSystem.close();
                        } catch (IOException e) {
                            e.printStackTrace();
                        }
                    }
                    // wait before re-reading the directory
                    TimeUnit.MILLISECONDS.sleep(hdfsSourceConfig.getTimeInterval());
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }
        }

        // type definitions of the Row fields
        @Override
        public TypeInformation<Row> getProducedType() {
            TypeInformation<?>[] types = new TypeInformation<?>[]{
                    //createTypeInformation[String]
                    TypeExtractor.createTypeInfo(String.class),
                    TypeExtractor.createTypeInfo(String.class),
                    //createTypeInformation[long]
                    TypeExtractor.createTypeInfo(long.class)
            };
            // declare the schema: createTypeInformation[Row]
            RowTypeInfo rowTypeInfo = new RowTypeInfo(types, HDFS_COLUMNS);
            return rowTypeInfo;
        }

        @Override
        public void cancel() {
            running = false;
        }

        @Override
        public void close()
                throws Exception {
            this.cancel();
            super.close();
        }
    }
}
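A design note on the directory scan above: fileSystem.globStatus(path) is mainly useful when hdfs_read_dir contains a wildcard pattern; for a plain directory path it returns the directory itself, which is why the code then calls listFiles(...) on it. If hdfs_read_dir is always a plain directory (an assumption, not something the original code enforces), the directory branch could be condensed into a small helper like the sketch below. readDirOnce and its parameters are hypothetical names of mine, and the method is meant to sit inside MyDataSource of HdfsSource2, so that SourceContext<Row> and the existing imports resolve.

        // Hypothetical simplification of the directory branch in run() (sketch only):
        // read every file directly under 'dir' once and emit one Row per line.
        private void readDirOnce(FileSystem fileSystem, Path dir, long eventTime, Random random, SourceContext<Row> ctx)
                throws IOException {
            // second argument false: do not recurse into subdirectories
            RemoteIterator<LocatedFileStatus> iterator = fileSystem.listFiles(dir, false);
            while (iterator.hasNext()) {
                LocatedFileStatus fileStatus = iterator.next();
                try (BufferedReader reader = new BufferedReader(
                        new InputStreamReader(fileSystem.open(fileStatus.getPath())))) {
                    String line;
                    while ((line = reader.readLine()) != null) {
                        ctx.collect(Row.of("key" + random.nextInt(10), line, eventTime));
                    }
                }
            }
        }

With such a helper, the while (running) loop in run() would reduce to calling readDirOnce(fileSystem, new Path(fileDir), eventTime, random, ctx) followed by the same sleep.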
Define a sample job:
./sylph/sylph-dist/src/jobs/hdfs_test/job.flow
create function get_json_object as 'ideal.sylph.runner.flink.udf.UDFJson';
create source table topic1(
    key varchar,
    message varchar,
    event_time bigint
) with (
    type = 'HdfsSource',
    fs.defaultFS = 'localhost:8020',
    hdfs_read_dir = '/data/test.txt',
    hdfs_time_interval = 10000
);
-- define where the output data stream goes
create sink table event_log(
    key varchar,
    message varchar,
    event_time bigint
) with (
    type = 'console', -- print to console
    other = 'demo001'
);
insert into event_log
select key, message, event_time from topic1
./sylph/sylph-dist/src/jobs/hdfs_test/job.type
---
type: "StreamSql"
config:
  taskManagerMemoryMb: 2048
  taskManagerCount: 2
  taskManagerSlots: 2
  jobManagerMemoryMb: 1024
  checkpointInterval: -1
  checkpointTimeout: 600000
  parallelism: 4
  queue: "default"
  appTags:
    - "Sylph"
    - "Flink"