Mysql 流增量寫入 Hdfs(二) --Storm + hdfs 的流式處理

一. 概述

上一篇我們介紹了如何將數(shù)據(jù)從 mysql 拋到 kafka,這次我們就專注于利用 storm 將數(shù)據(jù)寫入到 hdfs 的過程凰荚,由于 storm 寫入 hdfs 的可定制東西有些多燃观,我們先不從 kafka 讀取,而先自己定義一個(gè) Spout 數(shù)據(jù)充當(dāng)數(shù)據(jù)源便瑟,下章再進(jìn)行整合缆毁。這里默認(rèn)你是擁有一定的 storm 知識(shí)的基礎(chǔ),起碼知道 Spout 和 bolt 是什么到涂。

寫入 hdfs 可以有以下的定制策略:

  1. 自定義寫入文件的名字
  2. 定義寫入內(nèi)容格式
  3. 滿足給定條件后更改寫入的文件
  4. 更改寫入文件時(shí)觸發(fā)的 Action

本篇會(huì)先說明如何用 storm 寫入 HDFS脊框,寫入過程一些 API 的描述颁督,以及最后給定一個(gè)例子:

storm 每接收到 10 個(gè) Tuple 后就會(huì)改變 hdfs 寫入文件,新文件的名字就是第幾次改變浇雹。

ps:storm 版本:1.1.1 沉御。Hadoop 版本:2.7.4 。

接下來我們首先看看 Storm 如何寫入 HDFS 昭灵。

二. Storm 寫入 HDFS

Storm 官方有提供了相應(yīng)的 API 讓我們可以使用吠裆。可以通過創(chuàng)建 HdfsBolt 以及定義相應(yīng)的規(guī)則烂完,即可寫入 HDFS 试疙。

首先通過 maven 配置依賴以及插件。


    <properties>
        <storm.version>1.1.1</storm.version>
    </properties>

    <dependencies>

        <dependency>
            <groupId>org.apache.storm</groupId>
            <artifactId>storm-core</artifactId>
            <version>${storm.version}</version>
            <!--<scope>provided</scope>-->
            <exclusions>
                <exclusion>
                    <groupId>org.slf4j</groupId>
                    <artifactId>log4j-over-slf4j</artifactId>
                </exclusion>
            </exclusions>
        </dependency>
        <dependency>
            <groupId>commons-collections</groupId>
            <artifactId>commons-collections</artifactId>
            <version>3.2.1</version>
        </dependency>
        <dependency>
            <groupId>com.google.guava</groupId>
            <artifactId>guava</artifactId>
            <version>15.0</version>
        </dependency>

        <!--hadoop模塊-->
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-client</artifactId>
            <version>2.7.4</version>
            <exclusions>
                <exclusion>
                    <groupId>org.slf4j</groupId>
                    <artifactId>slf4j-log4j12</artifactId>
                </exclusion>
            </exclusions>
        </dependency>
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-hdfs</artifactId>
            <version>2.7.4</version>
            <exclusions>
                <exclusion>
                    <groupId>org.slf4j</groupId>
                    <artifactId>slf4j-log4j12</artifactId>
                </exclusion>
            </exclusions>
        </dependency>

        <!-- https://mvnrepository.com/artifact/org.apache.storm/storm-hdfs -->
        <dependency>
            <groupId>org.apache.storm</groupId>
            <artifactId>storm-hdfs</artifactId>
            <version>1.1.1</version>
            <!--<scope>test</scope>-->
        </dependency>

    </dependencies>


    <build>
        <plugins>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <version>3.5.1</version>
                <configuration>
                    <source>1.8</source>
                    <target>1.8</target>
                </configuration>
            </plugin>
            <plugin>
                <groupId>org.codehaus.mojo</groupId>
                <artifactId>exec-maven-plugin</artifactId>
                <version>1.2.1</version>
                <executions>
                    <execution>
                        <goals>
                            <goal>exec</goal>
                        </goals>
                    </execution>
                </executions>
                <configuration>
                    <executable>java</executable>
                    <includeProjectDependencies>true</includeProjectDependencies>
                    <includePluginDependencies>false</includePluginDependencies>
                    <classpathScope>compile</classpathScope>
                    <mainClass>com.learningstorm.kafka.KafkaTopology</mainClass>
                </configuration>
            </plugin>
   
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-shade-plugin</artifactId>
                <version>1.7</version>
                <configuration>
                    <createDependencyReducedPom>true</createDependencyReducedPom>
                </configuration>
                <executions>
                    <execution>
                        <phase>package</phase>
                        <goals>
                            <goal>shade</goal>
                        </goals>
                        <configuration>
                            <transformers>
                                <transformer
                                        implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer"/>
                                <transformer
                                        implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
                                    <mainClass></mainClass>
                                </transformer>
                            </transformers>
                        </configuration>
                    </execution>
                </executions>
            </plugin>
        </plugins>
    </build>

這里要提一下抠蚣,如果要打包部署到集群上的話效斑,打包的插件需要使用 maven-shade-plugin 這個(gè)插件,然后使用 maven Lifecycle 中的 package 打包柱徙。而不是用 Maven-assembly-plugin 插件進(jìn)行打包缓屠。

因?yàn)槭褂?Maven-assembly-plugin 的時(shí)候,會(huì)將所有依賴的包unpack护侮,然后在pack敌完,這樣就會(huì)出現(xiàn),同樣的文件被覆蓋的情況羊初。發(fā)布到集群上的時(shí)候就會(huì)報(bào) No FileSystem for scheme: hdfs 的錯(cuò) 滨溉。

然后是使用 HdfsBolt 寫入 Hdfs。這里來看看官方文檔中的例子吧长赞。

// 使用 "|" 來替代 ","晦攒,來進(jìn)行字符分割
RecordFormat format = new DelimitedRecordFormat()
        .withFieldDelimiter("|");

// 每輸入 1k 后將內(nèi)容同步到 Hdfs 中
SyncPolicy syncPolicy = new CountSyncPolicy(1000);

// 當(dāng)文件大小達(dá)到 5MB ,轉(zhuǎn)換寫入文件得哆,即寫入到一個(gè)新的文件中
FileRotationPolicy rotationPolicy = new FileSizeRotationPolicy(5.0f, Units.MB);

//當(dāng)轉(zhuǎn)換寫入文件時(shí)脯颜,生成新文件的名字并使用
FileNameFormat fileNameFormat = new DefaultFileNameFormat()
        .withPath("/foo/");

HdfsBolt bolt = new HdfsBolt()
        .withFsUrl("hdfs://localhost:9000")
        .withFileNameFormat(fileNameFormat)
        .withRecordFormat(format)
        .withRotationPolicy(rotationPolicy)
        .withSyncPolicy(syncPolicy);

//生成該 bolt
topologyBuilder.setBolt("hdfsBolt", bolt, 5).globalGrouping("randomStrSpout");
        

到這里就結(jié)束了》肪荩可以將 HdfsBolt 當(dāng)作一個(gè) Storm 中特殊一些的 bolt 即可栋操。這個(gè) bolt 的作用即使根據(jù)接收信息寫入 Hdfs。

而在新建 HdfsBolt 中饱亮,Storm 為我們提供了相當(dāng)強(qiáng)的靈活性矾芙,我們可以定義一些策略,比如當(dāng)達(dá)成某個(gè)條件的時(shí)候轉(zhuǎn)換寫入文件近上,新寫入文件的名字剔宪,寫入時(shí)候的分隔符等等。

如果選擇使用的話,Storm 有提供部分接口供我們使用葱绒,但如果我們覺得不夠豐富也可以自定義相應(yīng)的類感帅。下面我們看看如何控制這些策略吧。

RecordFormat

這是一個(gè)接口哈街,允許你自由定義接收到內(nèi)容的格式留瞳。

public interface RecordFormat extends Serializable {
    byte[] format(Tuple tuple);
}

Storm 提供了 DelimitedRecordFormat 拒迅,使用方法在上面已經(jīng)有了骚秦。這個(gè)類默認(rèn)的分割符是逗號(hào)",",而你可以通過 withFieldDelimiter 方法改變分隔符璧微。
如果你的初始分隔符不是逗號(hào)的話作箍,那么也可以重寫寫一個(gè)類實(shí)現(xiàn) RecordFormat 接口即可。

FileNameFormat

同樣是一個(gè)接口前硫。

public interface FileNameFormat extends Serializable {
    void prepare(Map conf, TopologyContext topologyContext);
    String getName(long rotation, long timeStamp);
    String getPath();
}

Storm 所提供的默認(rèn)的是 org.apache.storm.hdfs.format.DefaultFileNameFormat 胞得。默認(rèn)人使用的轉(zhuǎn)換文件名有點(diǎn)長,格式是這樣的:

{prefix}{componentId}-{taskId}-{rotationNum}-{timestamp}{extension}

例如:

MyBolt-5-7-1390579837830.txt

默認(rèn)情況下屹电,前綴是空的阶剑,擴(kuò)展標(biāo)識(shí)是".txt"。

SyncPolicy

同步策略允許你將 buffered data 緩沖到 Hdfs 文件中(從而client可以讀取數(shù)據(jù))危号,通過實(shí)現(xiàn)org.apache.storm.hdfs.sync.SyncPolicy 接口:

public interface SyncPolicy extends Serializable {
    boolean mark(Tuple tuple, long offset);
    void reset();
}

FileRotationPolicy

這個(gè)接口允許你控制什么情況下轉(zhuǎn)換寫入文件牧愁。

public interface FileRotationPolicy extends Serializable {
    boolean mark(Tuple tuple, long offset);
    void reset();
}

Storm 有提供三個(gè)實(shí)現(xiàn)該接口的類:

  • 最簡單的就是不進(jìn)行轉(zhuǎn)換的org.apache.storm.hdfs.bolt.rotation.NoRotationPolicy ,就是什么也不干外莲。

  • 通過文件大小觸發(fā)轉(zhuǎn)換的 org.apache.storm.hdfs.bolt.rotation.FileSizeRotationPolicy猪半。

  • 通過時(shí)間條件來觸發(fā)轉(zhuǎn)換的 org.apache.storm.hdfs.bolt.rotation.TimedRotationPolicy。

如果有更加復(fù)雜的需求也可以自己定義偷线。

RotationAction

這個(gè)主要是提供一個(gè)或多個(gè) hook 磨确,可加可不加。主要是在觸發(fā)寫入文件轉(zhuǎn)換的時(shí)候會(huì)啟動(dòng)声邦。

public interface RotationAction extends Serializable {
    void execute(FileSystem fileSystem, Path filePath) throws IOException;
}

三.實(shí)現(xiàn)一個(gè)例子

了解了上面的情況后乏奥,我們會(huì)實(shí)現(xiàn)一個(gè)例子,根據(jù)寫入記錄的多少來控制寫入轉(zhuǎn)換(改變寫入的文件)亥曹,并且轉(zhuǎn)換后文件的名字表示當(dāng)前是第幾次轉(zhuǎn)換英融。

首先來看看 HdfsBolt 的內(nèi)容:

        RecordFormat format = new DelimitedRecordFormat().withFieldDelimiter(" ");
        // sync the filesystem after every 1k tuples
        SyncPolicy syncPolicy = new CountSyncPolicy(1000);
//        FileRotationPolicy rotationPolicy = new FileSizeRotationPolicy(1.0f, FileSizeRotationPolicy.Units.KB);
        /** rotate file with Date,every month create a new file
         * format:yyyymm.txt
         */
        FileRotationPolicy rotationPolicy = new CountStrRotationPolicy();
        FileNameFormat fileNameFormat = new TimesFileNameFormat().withPath("/test/");
        RotationAction action = new NewFileAction();
        HdfsBolt bolt = new HdfsBolt()
                .withFsUrl("hdfs://127.0.0.1:9000")
                .withFileNameFormat(fileNameFormat)
                .withRecordFormat(format)
                .withRotationPolicy(rotationPolicy)
                .withSyncPolicy(syncPolicy)
                .addRotationAction(action);

然后分別來看各個(gè)策略的類。

FileRotationPolicy

import org.apache.storm.hdfs.bolt.rotation.FileRotationPolicy;
import org.apache.storm.tuple.Tuple;

import java.text.SimpleDateFormat;
import java.util.Date;

/**
 * 計(jì)數(shù)以改變Hdfs寫入文件的位置歇式,當(dāng)寫入10次的時(shí)候驶悟,則更改寫入文件,更改名字取決于 “TimesFileNameFormat”
 * 這個(gè)類是線程安全
 */

public class CountStrRotationPolicy implements FileRotationPolicy {


    private SimpleDateFormat df = new SimpleDateFormat("yyyyMM");

    private String date =  null;

    private int count = 0;

    public CountStrRotationPolicy(){
        this.date =  df.format(new Date());
//        this.date = df.format(new Date());
    }


    /**
     * Called for every tuple the HdfsBolt executes.
     *
     * @param tuple  The tuple executed.
     * @param offset current offset of file being written
     * @return true if a file rotation should be performed
     */
    @Override
    public boolean mark(Tuple tuple, long offset) {
        count ++;
        if(count == 10) {
            System.out.print("num :" +count + "   ");
            count = 0;
            return true;

        }
        else {
            return false;
        }
    }

    /**
     * Called after the HdfsBolt rotates a file.
     */
    @Override
    public void reset() {

    }

    @Override
    public FileRotationPolicy copy() {
        return new CountStrRotationPolicy();
    }


}

FileNameFormat


import org.apache.storm.hdfs.bolt.format.FileNameFormat;
import org.apache.storm.task.TopologyContext;

import java.util.Map;

/**
 * 決定重新寫入文件時(shí)候的名字
 * 這里會(huì)返回是第幾次轉(zhuǎn)換寫入文件材失,將這個(gè)第幾次做為文件名
 */
public class TimesFileNameFormat implements FileNameFormat {
    //默認(rèn)路徑
    private String path = "/storm";
    //默認(rèn)后綴
    private String extension = ".txt";
    private Long times = new Long(0);

    public TimesFileNameFormat withPath(String path){
        this.path = path;
        return this;
    }

    @Override
    public void prepare(Map conf, TopologyContext topologyContext) {
    }


    @Override
    public String getName(long rotation, long timeStamp) {
        times ++ ;
        //返回文件名痕鳍,文件名為更換寫入文件次數(shù)
        return times.toString() + this.extension;
    }

    public String getPath(){
        return this.path;
    }
}

RotationAction


import org.apache.hadoop.fs.FileContext;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.storm.hdfs.common.rotation.RotationAction;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.IOException;
import java.net.URI;
/**
    當(dāng)轉(zhuǎn)換寫入文件時(shí)候調(diào)用的 hook ,這里僅寫入日志。
 */
public class NewFileAction implements RotationAction {
    private static final Logger LOG = LoggerFactory.getLogger(NewFileAction.class);



    @Override
    public void execute(FileSystem fileSystem, Path filePath) throws IOException {
        LOG.info("Hdfs change the written fileA簟熊响!");

        return;
    }
}

OK,這樣就大功告成了诗赌。通過上面的代碼汗茄,每接收到 10 個(gè) Tuple 后就會(huì)轉(zhuǎn)換寫入文件,新文件的名字就是第幾次轉(zhuǎn)換铭若。

完整代碼包括一個(gè)隨機(jī)生成字符串的 Spout 洪碳,可以到我的 github 上查看。

StormHdfsDemo:https://github.com/shezhiming/StormHdfsDemo

?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
  • 序言:七十年代末叼屠,一起剝皮案震驚了整個(gè)濱河市瞳腌,隨后出現(xiàn)的幾起案子,更是在濱河造成了極大的恐慌镜雨,老刑警劉巖嫂侍,帶你破解...
    沈念sama閱讀 216,591評(píng)論 6 501
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件,死亡現(xiàn)場離奇詭異荚坞,居然都是意外死亡挑宠,警方通過查閱死者的電腦和手機(jī),發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 92,448評(píng)論 3 392
  • 文/潘曉璐 我一進(jìn)店門颓影,熙熙樓的掌柜王于貴愁眉苦臉地迎上來各淀,“玉大人,你說我怎么就攤上這事瞭空【景ⅲ” “怎么了?”我有些...
    開封第一講書人閱讀 162,823評(píng)論 0 353
  • 文/不壞的土叔 我叫張陵咆畏,是天一觀的道長南捂。 經(jīng)常有香客問我,道長旧找,這世上最難降的妖魔是什么溺健? 我笑而不...
    開封第一講書人閱讀 58,204評(píng)論 1 292
  • 正文 為了忘掉前任,我火速辦了婚禮钮蛛,結(jié)果婚禮上鞭缭,老公的妹妹穿的比我還像新娘。我一直安慰自己魏颓,他們只是感情好岭辣,可當(dāng)我...
    茶點(diǎn)故事閱讀 67,228評(píng)論 6 388
  • 文/花漫 我一把揭開白布。 她就那樣靜靜地躺著甸饱,像睡著了一般沦童。 火紅的嫁衣襯著肌膚如雪仑濒。 梳的紋絲不亂的頭發(fā)上,一...
    開封第一講書人閱讀 51,190評(píng)論 1 299
  • 那天偷遗,我揣著相機(jī)與錄音墩瞳,去河邊找鬼。 笑死氏豌,一個(gè)胖子當(dāng)著我的面吹牛喉酌,可吹牛的內(nèi)容都是我干的。 我是一名探鬼主播泵喘,決...
    沈念sama閱讀 40,078評(píng)論 3 418
  • 文/蒼蘭香墨 我猛地睜開眼泪电,長吁一口氣:“原來是場噩夢(mèng)啊……” “哼!你這毒婦竟也來了涣旨?” 一聲冷哼從身側(cè)響起歪架,我...
    開封第一講書人閱讀 38,923評(píng)論 0 274
  • 序言:老撾萬榮一對(duì)情侶失蹤股冗,失蹤者是張志新(化名)和其女友劉穎霹陡,沒想到半個(gè)月后,有當(dāng)?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體止状,經(jīng)...
    沈念sama閱讀 45,334評(píng)論 1 310
  • 正文 獨(dú)居荒郊野嶺守林人離奇死亡烹棉,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點(diǎn)故事閱讀 37,550評(píng)論 2 333
  • 正文 我和宋清朗相戀三年,在試婚紗的時(shí)候發(fā)現(xiàn)自己被綠了怯疤。 大學(xué)時(shí)的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片浆洗。...
    茶點(diǎn)故事閱讀 39,727評(píng)論 1 348
  • 序言:一個(gè)原本活蹦亂跳的男人離奇死亡,死狀恐怖集峦,靈堂內(nèi)的尸體忽然破棺而出伏社,到底是詐尸還是另有隱情,我是刑警寧澤塔淤,帶...
    沈念sama閱讀 35,428評(píng)論 5 343
  • 正文 年R本政府宣布摘昌,位于F島的核電站,受9級(jí)特大地震影響高蜂,放射性物質(zhì)發(fā)生泄漏聪黎。R本人自食惡果不足惜,卻給世界環(huán)境...
    茶點(diǎn)故事閱讀 41,022評(píng)論 3 326
  • 文/蒙蒙 一备恤、第九天 我趴在偏房一處隱蔽的房頂上張望稿饰。 院中可真熱鬧,春花似錦露泊、人聲如沸喉镰。這莊子的主人今日做“春日...
    開封第一講書人閱讀 31,672評(píng)論 0 22
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽侣姆。三九已至,卻和暖如春,著一層夾襖步出監(jiān)牢的瞬間铺敌,已是汗流浹背汇歹。 一陣腳步聲響...
    開封第一講書人閱讀 32,826評(píng)論 1 269
  • 我被黑心中介騙來泰國打工, 沒想到剛下飛機(jī)就差點(diǎn)兒被人妖公主榨干…… 1. 我叫王不留偿凭,地道東北人产弹。 一個(gè)月前我還...
    沈念sama閱讀 47,734評(píng)論 2 368
  • 正文 我出身青樓,卻偏偏與公主長得像弯囊,于是被迫代替她去往敵國和親痰哨。 傳聞我的和親對(duì)象是個(gè)殘疾皇子,可洞房花燭夜當(dāng)晚...
    茶點(diǎn)故事閱讀 44,619評(píng)論 2 354

推薦閱讀更多精彩內(nèi)容