Sylph平臺自定義數(shù)據(jù)源hdfs數(shù)據(jù)源

Sylph 是一個一站式的大數(shù)據(jù)流計算平臺光涂,通過編譯Stream SQL,sylph會自動生成Apache Flink等分布式程序到Apache Yarn集群運行觉鼻。
Sylph地址:https://github.com/harbby/sylph/
以下開發(fā)基于Sylph 0.5.0版本
開發(fā)目標:由于當前Sylph提供的數(shù)據(jù)流接入類型僅有kafka及一個test類型盅惜,希望可以支持從hdfs接入數(shù)據(jù)流,因此需自定義一個hdfs的數(shù)據(jù)源
開發(fā)流程:數(shù)據(jù)源配置類-->>自定義數(shù)據(jù)源對象(繼承一個類诫咱,實現(xiàn)一個接口)-->>定義數(shù)據(jù)流Row的屬性類型-->>在run方法中讀取hdfs文件、發(fā)射數(shù)據(jù)流-->>加載數(shù)據(jù)源-->>定義job示例-->>重新編譯sylph-->>運行查看結果
注意點:定義數(shù)據(jù)源對象時繼承關系必須是implements Source<DataStream<org.apache.flink.types.
Row>>继找,據(jù)說從sylph0.6 alpha3開始還需要一個Plugin.class類遂跟,這個還沒研究逃沿,sylph0.5版本只有繼承關系的約束婴渡。約束必須指明schema,即定義public TypeInformation<Row> getProducedType()
以下是實現(xiàn)代碼凯亮,運行效果是:啟動job示例边臼,該job會定時讀取hdfs文件,將文件內容打印到console假消,格式為(隨機key柠并,文件內容的一行,事件時間)

數(shù)據(jù)源配置類:

package ideal.sylph.plugins.hdfs;

import ideal.sylph.annotation.Description;
import ideal.sylph.annotation.Name;
import ideal.sylph.etl.PluginConfig;

public class HdfsSourceConfig extends PluginConfig {
    private static final long serialVersionUID = 2L;

    private HdfsSourceConfig() {
    }

    @Name("fs.defaultFS")
    @Description("this is fs.defaultFS")
    private String fsAddress;

    @Name("hdfs_read_dir")
    @Description("this is the hdfs source file dir")
    private String hdfsFileDir;

    @Name("hdfs_time_interval")
    @Description("this is the hdfs source file update time interval (ms)")
    private Long hdfsTimeInterval;

    public String getFsAddress()
    {
        return fsAddress;
    }

    public String getHdfsFileDir()
    {
        return hdfsFileDir;
    }

    public Long getTimeInterval()
    {
        return hdfsTimeInterval;
    }
}

自定義數(shù)據(jù)源對象第一種實現(xiàn)方式:
指定一個hdfs上的文本文件富拗,隔一段時間讀取一次該文件的內容臼予,作為數(shù)據(jù)源

package ideal.sylph.plugins.hdfs;

import ideal.sylph.annotation.Description;
import ideal.sylph.annotation.Name;
import ideal.sylph.annotation.Version;
import ideal.sylph.etl.api.Source;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
import org.apache.flink.api.java.typeutils.RowTypeInfo;
import org.apache.flink.api.java.typeutils.TypeExtractor;
import org.apache.flink.shaded.guava18.com.google.common.base.Supplier;
import org.apache.flink.shaded.guava18.com.google.common.base.Suppliers;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.RichSourceFunction;

import org.apache.flink.types.Row;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

import java.io.*;
import java.util.Random;
import java.util.concurrent.TimeUnit;

import static java.util.Objects.requireNonNull;


@Name(value = "HdfsSource")
@Version("1.0.0")
@Description("this flink Hdfs source inputStream")
/**
 * (定時讀取)數(shù)據(jù)源為hdfs的一個文件啃沪,具體到文件名
 * job設置示例:
 * type = 'HdfsSource',
 fs.defaultFS = 'localhost:8020',
 hdfs_read_dir = '/data/test.txt',
 hdfs_time_interval = 10000
 */
public class HdfsSource1 implements Source<DataStream<Row>> {
    private static final long serialVersionUID = 2L;
    private final transient Supplier<DataStream<Row>> loadStream;

    private static final String[] HDFS_COLUMNS = new String[]{"key", "message", "event_time"};

    public HdfsSource1(StreamExecutionEnvironment execEnv, HdfsSourceConfig config) {
        requireNonNull(execEnv, "execEnv is null");
        requireNonNull(config, "config is null");
        // 加載數(shù)據(jù)源
//        this.loadStream = Suppliers.memoize(() -> execEnv.addSource(new MyDataSource()));
        this.loadStream = Suppliers.memoize(() -> this.createSource(execEnv, config));
    }

    public DataStream<Row> createSource(StreamExecutionEnvironment execEnv, HdfsSourceConfig config) {
        requireNonNull(execEnv, "execEnv is null");
        requireNonNull(config, "config is null");
        return execEnv.addSource(new MyDataSource(config));
    }

    @Override
    public DataStream<Row> getSource() {
        return loadStream.get();
    }

    /**
     * 自定義數(shù)據(jù)源
     **/
    public static class MyDataSource
            extends RichSourceFunction<Row>
            implements ResultTypeQueryable<Row> {

        /** Flag indicating whether the consumer is still running. */
        private volatile boolean running = true;
        private HdfsSourceConfig hdfsSourceConfig;

        private MyDataSource(HdfsSourceConfig hdfsSourceConfig) {
            this.hdfsSourceConfig = hdfsSourceConfig;
        }
        // DataStream 調用一次 run() 方法用來獲取數(shù)據(jù)
        @Override
        public void run(SourceContext<Row> ctx) throws Exception {
            Configuration conf = new Configuration();
            // 文件路徑及文件名
            String file = hdfsSourceConfig.getHdfsFileDir();
            // hdfs文件全路徑
            String hdfsPath = "hdfs://" + hdfsSourceConfig.getFsAddress() + file;
            conf.set("fs.defaultFS", hdfsPath);

            Random random = new Random();
            //表示數(shù)據(jù)已經(jīng)產(chǎn)生了 但是會有10秒以內的延遲
            long eventTime = System.currentTimeMillis() - random.nextInt(10 * 1000);

            while (running) {
                try {
                    FileSystem fileSystem = FileSystem.get(conf);

                    Path path = new Path(file);
                    if (!fileSystem.exists(path)) {
                        System.out.println("File " + file + " does not exists");
                        return;
                    }

                    // 打開hdfs上的文件
                    FSDataInputStream in = fileSystem.open(path);
                    // 按行讀取
                    BufferedReader bufferedReader = null;
                    String lineTxt = null;
                    bufferedReader = new BufferedReader(new InputStreamReader(in));

                    try {
                        while ((lineTxt = bufferedReader.readLine()) != null) {
                            // 逐行讀取文件粘拾,并按schema構造Row
                            Row row = Row.of("key" + random.nextInt(10), lineTxt, eventTime);
                            // 調用collect()來發(fā)射連續(xù)的數(shù)據(jù)流
                            ctx.collect(row);
                        }
                    } catch (Exception e) {
                        e.printStackTrace();
                    } finally {
                        if (bufferedReader != null) {
                            try {
                                bufferedReader.close();
                                in.close();
                                fileSystem.close();
                            } catch (IOException e) {
                                e.printStackTrace();
                            }
                        }
                    }
                    // 每隔一定時間重新讀取文件
                    TimeUnit.MILLISECONDS.sleep(hdfsSourceConfig.getTimeInterval());
                } catch (Exception e) {
                    e.printStackTrace();
                }

            }
        }

        // Row的各個屬性的類型定義
        @Override
        public TypeInformation<Row> getProducedType() {
            TypeInformation<?>[] types = new TypeInformation<?>[]{
                    //createTypeInformation[String]
                    TypeExtractor.createTypeInfo(String.class),
                    TypeExtractor.createTypeInfo(String.class),
                    //createTypeInformation[long]
                    TypeExtractor.createTypeInfo(long.class)
            };

            // 指明schema  createTypeInformation[Row]
            RowTypeInfo rowTypeInfo = new RowTypeInfo(types, HDFS_COLUMNS);
            return rowTypeInfo;
        }

        @Override
        public void cancel() {
            running = false;
        }

        @Override
        public void close()
                throws Exception {
            this.cancel();
            super.close();
        }
    }
}

自定義數(shù)據(jù)源對象第二種實現(xiàn)方式:
指定一個hdfs上的目錄,隔一段時間讀取該目錄下所有文件的內容创千,作為數(shù)據(jù)源

package ideal.sylph.plugins.hdfs;

import ideal.sylph.annotation.Description;
import ideal.sylph.annotation.Name;
import ideal.sylph.annotation.Version;
import ideal.sylph.etl.api.Source;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
import org.apache.flink.api.java.typeutils.RowTypeInfo;
import org.apache.flink.api.java.typeutils.TypeExtractor;
import org.apache.flink.shaded.guava18.com.google.common.base.Supplier;
import org.apache.flink.shaded.guava18.com.google.common.base.Suppliers;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.RichSourceFunction;
import org.apache.flink.types.Row;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.*;

import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.net.URI;
import java.util.Random;
import java.util.concurrent.TimeUnit;

import static java.util.Objects.requireNonNull;

@Name(value = "HdfsSource2")
@Version("1.0.0")
@Description("this flink Hdfs source inputStream")
/**
 * (定時讀如止汀)數(shù)據(jù)源為hdfs的一個目錄下的所有文件入偷,未遞歸到二級目錄
 * job設置示例:
 * type = 'HdfsSource',
 fs.defaultFS = 'localhost:8020',
 hdfs_read_dir = '/data/',
 hdfs_time_interval = 10000
 */
public class HdfsSource2 implements Source<DataStream<Row>> {
    private static final long serialVersionUID = 2L;
    private final transient Supplier<DataStream<Row>> loadStream;

    private static final String[] HDFS_COLUMNS = new String[]{"key", "message", "event_time"};

    public HdfsSource2(StreamExecutionEnvironment execEnv, HdfsSourceConfig config) {
        requireNonNull(execEnv, "execEnv is null");
        requireNonNull(config, "config is null");
        // 加載數(shù)據(jù)源
//        this.loadStream = Suppliers.memoize(() -> execEnv.addSource(new MyDataSource()));
        this.loadStream = Suppliers.memoize(() -> this.createSource(execEnv, config));
    }

    public DataStream<Row> createSource(StreamExecutionEnvironment execEnv, HdfsSourceConfig config) {
        requireNonNull(execEnv, "execEnv is null");
        requireNonNull(config, "config is null");
        return execEnv.addSource(new HdfsSource2.MyDataSource(config));
    }

    @Override
    public DataStream<Row> getSource() {
        return loadStream.get();
    }

    /**
     * 自定義數(shù)據(jù)源
     **/
    public static class MyDataSource
            extends RichSourceFunction<Row>
            implements ResultTypeQueryable<Row> {

        /** Flag indicating whether the consumer is still running. */
        private volatile boolean running = true;
        private HdfsSourceConfig hdfsSourceConfig;

        private MyDataSource(HdfsSourceConfig hdfsSourceConfig) {
            this.hdfsSourceConfig = hdfsSourceConfig;
        }
        // DataStream 調用一次 run() 方法用來獲取數(shù)據(jù)
        @Override
        public void run(SourceContext<Row> ctx) throws Exception {
            Configuration conf = new Configuration();
            URI uri = URI.create("hdfs://" + hdfsSourceConfig.getFsAddress() );
            // 文件路徑
            String fileDir = hdfsSourceConfig.getHdfsFileDir();

            Random random = new Random();
            //表示數(shù)據(jù)已經(jīng)產(chǎn)生了 但是會有10秒以內的延遲
            long eventTime = System.currentTimeMillis() - random.nextInt(10 * 1000);

            while (running) {
                try {
                    // 循環(huán)讀取目錄下的文件
                    FileSystem fileSystem = FileSystem.get(uri, conf);
                    Path path = new Path(fileDir);
                    BufferedReader bufferedReader = null;
                    // 列出目錄下的所有文件
                    FileStatus[] files = fileSystem.globStatus(path);
                    for (FileStatus file : files) {
                        if (file.isDirectory()) {
                            System.out.println("這是文件夾");
                            System.out.println(file.getPath());
                            // 通過fs的listFiles方法可以自動實現(xiàn)遞歸(自帶遞歸)列出文件類型,返回的是一個遠程可迭代對象
                            // 需要傳入兩個參數(shù)械哟,第一個參數(shù)是服務器路徑疏之,第二個參數(shù)是否遞歸
                            RemoteIterator<LocatedFileStatus> iterator = fileSystem.listFiles(file.getPath(), false);
                            while (iterator.hasNext()) {
                                LocatedFileStatus fileStatus = iterator.next();
                                // 文件名
                                System.out.println(fileStatus.getPath().getName());
                                Path filePath = new Path(fileDir + fileStatus.getPath().getName());
                                // 打開hdfs上的文件
                                FSDataInputStream in = fileSystem.open(filePath);
                                // 按行讀取
                                String lineTxt = null;
                                bufferedReader = new BufferedReader(new InputStreamReader(in));

                                try {
                                    while ((lineTxt = bufferedReader.readLine()) != null) {
                                        // 逐行讀取文件,并按schema構造Row
                                        Row row = Row.of("key" + random.nextInt(10), lineTxt, eventTime);
                                        // 調用collect()來發(fā)射連續(xù)的數(shù)據(jù)流
                                        ctx.collect(row);
                                    }
                                } catch (Exception e) {
                                    e.printStackTrace();
                                }
                            }
                        } else if(file.isFile()){
                            System.out.println("這是文件");
                            System.out.println(file.getPath());
                            return;
                        } else if(file.isSymlink()) {
                            System.out.println("這是鏈接文件");
                            System.out.println(file.getPath());
                            return;
                        } else {
                            System.out.println("這是其他");
                            System.out.println(file.getPath());
                            return;
                        }
                    }
                    if (bufferedReader != null) {
                        try {
                            bufferedReader.close();
                            fileSystem.close();
                        } catch (IOException e) {
                            e.printStackTrace();
                        }
                    }
                    // 每隔一定時間重新讀取文件
                    TimeUnit.MILLISECONDS.sleep(hdfsSourceConfig.getTimeInterval());
                } catch (Exception e) {
                    e.printStackTrace();
                }

            }
        }

        // Row的各個屬性的類型定義
        @Override
        public TypeInformation<Row> getProducedType() {
            TypeInformation<?>[] types = new TypeInformation<?>[]{
                    //createTypeInformation[String]
                    TypeExtractor.createTypeInfo(String.class),
                    TypeExtractor.createTypeInfo(String.class),
                    //createTypeInformation[long]
                    TypeExtractor.createTypeInfo(long.class)
            };

            // 指明schema  createTypeInformation[Row]
            RowTypeInfo rowTypeInfo = new RowTypeInfo(types, HDFS_COLUMNS);
            return rowTypeInfo;
        }

        @Override
        public void cancel() {
            running = false;
        }

        @Override
        public void close()
                throws Exception {
            this.cancel();
            super.close();
        }
    }
}

定義一個job示例:
./sylph/sylph-dist/src/jobs/hdfs_test/job.flow

create function get_json_object as 'ideal.sylph.runner.flink.udf.UDFJson';

create source table topic1(
    key varchar,
    message varchar,
    event_time bigint
) with (
    type = 'HdfsSource',
    fs.defaultFS = 'localhost:8020',
    hdfs_read_dir = '/data/test.txt',
    hdfs_time_interval = 10000
);

-- 定義數(shù)據(jù)流輸出位置
create sink table event_log(
    key varchar,
    message varchar,
    event_time bigint
) with (
    type = 'console',   -- print console
    other = 'demo001'
);

insert into event_log
select key,message,event_time from topic1

./sylph/sylph-dist/src/jobs/hdfs_test/job.type

---
type: "StreamSql"
config:
  taskManagerMemoryMb: 2048
  taskManagerCount: 2
  taskManagerSlots: 2
  jobManagerMemoryMb: 1024
  checkpointInterval: -1
  checkpointTimeout: 600000
  parallelism: 4
  queue: "default"
  appTags:
  - "Sylph"
  - "Flink"
?著作權歸作者所有,轉載或內容合作請聯(lián)系作者
  • 序言:七十年代末暇咆,一起剝皮案震驚了整個濱河市锋爪,隨后出現(xiàn)的幾起案子,更是在濱河造成了極大的恐慌爸业,老刑警劉巖几缭,帶你破解...
    沈念sama閱讀 219,188評論 6 508
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件,死亡現(xiàn)場離奇詭異沃呢,居然都是意外死亡年栓,警方通過查閱死者的電腦和手機,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 93,464評論 3 395
  • 文/潘曉璐 我一進店門薄霜,熙熙樓的掌柜王于貴愁眉苦臉地迎上來某抓,“玉大人,你說我怎么就攤上這事惰瓜》窀保” “怎么了?”我有些...
    開封第一講書人閱讀 165,562評論 0 356
  • 文/不壞的土叔 我叫張陵崎坊,是天一觀的道長备禀。 經(jīng)常有香客問我,道長奈揍,這世上最難降的妖魔是什么曲尸? 我笑而不...
    開封第一講書人閱讀 58,893評論 1 295
  • 正文 為了忘掉前任,我火速辦了婚禮男翰,結果婚禮上另患,老公的妹妹穿的比我還像新娘。我一直安慰自己蛾绎,他們只是感情好昆箕,可當我...
    茶點故事閱讀 67,917評論 6 392
  • 文/花漫 我一把揭開白布。 她就那樣靜靜地躺著租冠,像睡著了一般鹏倘。 火紅的嫁衣襯著肌膚如雪。 梳的紋絲不亂的頭發(fā)上顽爹,一...
    開封第一講書人閱讀 51,708評論 1 305
  • 那天纤泵,我揣著相機與錄音,去河邊找鬼话原。 笑死夕吻,一個胖子當著我的面吹牛诲锹,可吹牛的內容都是我干的。 我是一名探鬼主播涉馅,決...
    沈念sama閱讀 40,430評論 3 420
  • 文/蒼蘭香墨 我猛地睜開眼归园,長吁一口氣:“原來是場噩夢啊……” “哼!你這毒婦竟也來了稚矿?” 一聲冷哼從身側響起庸诱,我...
    開封第一講書人閱讀 39,342評論 0 276
  • 序言:老撾萬榮一對情侶失蹤,失蹤者是張志新(化名)和其女友劉穎晤揣,沒想到半個月后桥爽,有當?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體,經(jīng)...
    沈念sama閱讀 45,801評論 1 317
  • 正文 獨居荒郊野嶺守林人離奇死亡昧识,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內容為張勛視角 年9月15日...
    茶點故事閱讀 37,976評論 3 337
  • 正文 我和宋清朗相戀三年钠四,在試婚紗的時候發(fā)現(xiàn)自己被綠了。 大學時的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片跪楞。...
    茶點故事閱讀 40,115評論 1 351
  • 序言:一個原本活蹦亂跳的男人離奇死亡缀去,死狀恐怖,靈堂內的尸體忽然破棺而出甸祭,到底是詐尸還是另有隱情缕碎,我是刑警寧澤,帶...
    沈念sama閱讀 35,804評論 5 346
  • 正文 年R本政府宣布池户,位于F島的核電站咏雌,受9級特大地震影響,放射性物質發(fā)生泄漏校焦。R本人自食惡果不足惜赊抖,卻給世界環(huán)境...
    茶點故事閱讀 41,458評論 3 331
  • 文/蒙蒙 一、第九天 我趴在偏房一處隱蔽的房頂上張望斟湃。 院中可真熱鬧熏迹,春花似錦檐薯、人聲如沸凝赛。這莊子的主人今日做“春日...
    開封第一講書人閱讀 32,008評論 0 22
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽墓猎。三九已至,卻和暖如春赚楚,著一層夾襖步出監(jiān)牢的瞬間毙沾,已是汗流浹背。 一陣腳步聲響...
    開封第一講書人閱讀 33,135評論 1 272
  • 我被黑心中介騙來泰國打工宠页, 沒想到剛下飛機就差點兒被人妖公主榨干…… 1. 我叫王不留左胞,地道東北人寇仓。 一個月前我還...
    沈念sama閱讀 48,365評論 3 373
  • 正文 我出身青樓,卻偏偏與公主長得像烤宙,于是被迫代替她去往敵國和親遍烦。 傳聞我的和親對象是個殘疾皇子,可洞房花燭夜當晚...
    茶點故事閱讀 45,055評論 2 355

推薦閱讀更多精彩內容

  • 【什么是大數(shù)據(jù)躺枕、大數(shù)據(jù)技術】 大數(shù)據(jù)服猪,又稱巨量資料,指的是所涉及的數(shù)據(jù)資料量規(guī)模巨大到無法在合理時間內通過傳統(tǒng)的應...
    kimibob閱讀 2,745評論 0 51
  • Zookeeper用于集群主備切換拐云。 YARN讓集群具備更好的擴展性罢猪。 Spark沒有存儲能力。 Spark的Ma...
    Yobhel閱讀 7,277評論 0 34
  • HDFS入門 hadoop架構 Hadoop 1.0中的資源管理方案 Hadoop 1.0指的是版本為Apache...
    依天立業(yè)閱讀 1,057評論 0 1
  • 婆婆養(yǎng)了一只母貓叉瘩,黑白相間的毛色膳帕,光溜溜的皮毛倒挺漂亮,給它叫個花麗的名字薇缅,“花麗”我嫁進門就養(yǎng)了幾年了备闲。因為打小...
    流云碧草閱讀 277評論 1 7
  • 又是四季輪轉,又是春暖花開捅暴,又是鑼鼓喧天……道口古會終于在盼望中恬砂,在期待中,在等待中蓬痒,款款向我們走來泻骤!…… 3...
    檸檬的清香521閱讀 329評論 0 1