一. 概述
上一篇我們介紹了如何將數(shù)據(jù)從 mysql 拋到 kafka,這次我們就專注于利用 storm 將數(shù)據(jù)寫入到 hdfs 的過程凰荚,由于 storm 寫入 hdfs 的可定制東西有些多燃观,我們先不從 kafka 讀取,而先自己定義一個(gè) Spout 數(shù)據(jù)充當(dāng)數(shù)據(jù)源便瑟,下章再進(jìn)行整合缆毁。這里默認(rèn)你是擁有一定的 storm 知識(shí)的基礎(chǔ),起碼知道 Spout 和 bolt 是什么到涂。
寫入 hdfs 可以有以下的定制策略:
- 自定義寫入文件的名字
- 定義寫入內(nèi)容格式
- 滿足給定條件后更改寫入的文件
- 更改寫入文件時(shí)觸發(fā)的 Action
本篇會(huì)先說明如何用 storm 寫入 HDFS脊框,寫入過程一些 API 的描述颁督,以及最后給定一個(gè)例子:
storm 每接收到 10 個(gè) Tuple 后就會(huì)改變 hdfs 寫入文件,新文件的名字就是第幾次改變浇雹。
ps:storm 版本:1.1.1 沉御。Hadoop 版本:2.7.4 。
接下來我們首先看看 Storm 如何寫入 HDFS 昭灵。
二. Storm 寫入 HDFS
Storm 官方有提供了相應(yīng)的 API 讓我們可以使用吠裆。可以通過創(chuàng)建 HdfsBolt 以及定義相應(yīng)的規(guī)則烂完,即可寫入 HDFS 试疙。
首先通過 maven 配置依賴以及插件。
<properties>
<storm.version>1.1.1</storm.version>
</properties>
<dependencies>
<dependency>
<groupId>org.apache.storm</groupId>
<artifactId>storm-core</artifactId>
<version>${storm.version}</version>
<!--<scope>provided</scope>-->
<exclusions>
<exclusion>
<groupId>org.slf4j</groupId>
<artifactId>log4j-over-slf4j</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>commons-collections</groupId>
<artifactId>commons-collections</artifactId>
<version>3.2.1</version>
</dependency>
<dependency>
<groupId>com.google.guava</groupId>
<artifactId>guava</artifactId>
<version>15.0</version>
</dependency>
<!--hadoop模塊-->
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-client</artifactId>
<version>2.7.4</version>
<exclusions>
<exclusion>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-log4j12</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-hdfs</artifactId>
<version>2.7.4</version>
<exclusions>
<exclusion>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-log4j12</artifactId>
</exclusion>
</exclusions>
</dependency>
<!-- https://mvnrepository.com/artifact/org.apache.storm/storm-hdfs -->
<dependency>
<groupId>org.apache.storm</groupId>
<artifactId>storm-hdfs</artifactId>
<version>1.1.1</version>
<!--<scope>test</scope>-->
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<version>3.5.1</version>
<configuration>
<source>1.8</source>
<target>1.8</target>
</configuration>
</plugin>
<plugin>
<groupId>org.codehaus.mojo</groupId>
<artifactId>exec-maven-plugin</artifactId>
<version>1.2.1</version>
<executions>
<execution>
<goals>
<goal>exec</goal>
</goals>
</execution>
</executions>
<configuration>
<executable>java</executable>
<includeProjectDependencies>true</includeProjectDependencies>
<includePluginDependencies>false</includePluginDependencies>
<classpathScope>compile</classpathScope>
<mainClass>com.learningstorm.kafka.KafkaTopology</mainClass>
</configuration>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-shade-plugin</artifactId>
<version>1.7</version>
<configuration>
<createDependencyReducedPom>true</createDependencyReducedPom>
</configuration>
<executions>
<execution>
<phase>package</phase>
<goals>
<goal>shade</goal>
</goals>
<configuration>
<transformers>
<transformer
implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer"/>
<transformer
implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
<mainClass></mainClass>
</transformer>
</transformers>
</configuration>
</execution>
</executions>
</plugin>
</plugins>
</build>
這里要提一下抠蚣,如果要打包部署到集群上的話效斑,打包的插件需要使用 maven-shade-plugin 這個(gè)插件,然后使用 maven Lifecycle 中的 package 打包柱徙。而不是用 Maven-assembly-plugin 插件進(jìn)行打包缓屠。
因?yàn)槭褂?Maven-assembly-plugin 的時(shí)候,會(huì)將所有依賴的包unpack护侮,然后在pack敌完,這樣就會(huì)出現(xiàn),同樣的文件被覆蓋的情況羊初。發(fā)布到集群上的時(shí)候就會(huì)報(bào) No FileSystem for scheme: hdfs 的錯(cuò) 滨溉。
然后是使用 HdfsBolt 寫入 Hdfs。這里來看看官方文檔中的例子吧长赞。
// 使用 "|" 來替代 ","晦攒,來進(jìn)行字符分割
RecordFormat format = new DelimitedRecordFormat()
.withFieldDelimiter("|");
// 每輸入 1k 后將內(nèi)容同步到 Hdfs 中
SyncPolicy syncPolicy = new CountSyncPolicy(1000);
// 當(dāng)文件大小達(dá)到 5MB ,轉(zhuǎn)換寫入文件得哆,即寫入到一個(gè)新的文件中
FileRotationPolicy rotationPolicy = new FileSizeRotationPolicy(5.0f, Units.MB);
//當(dāng)轉(zhuǎn)換寫入文件時(shí)脯颜,生成新文件的名字并使用
FileNameFormat fileNameFormat = new DefaultFileNameFormat()
.withPath("/foo/");
HdfsBolt bolt = new HdfsBolt()
.withFsUrl("hdfs://localhost:9000")
.withFileNameFormat(fileNameFormat)
.withRecordFormat(format)
.withRotationPolicy(rotationPolicy)
.withSyncPolicy(syncPolicy);
//生成該 bolt
topologyBuilder.setBolt("hdfsBolt", bolt, 5).globalGrouping("randomStrSpout");
到這里就結(jié)束了》肪荩可以將 HdfsBolt 當(dāng)作一個(gè) Storm 中特殊一些的 bolt 即可栋操。這個(gè) bolt 的作用即使根據(jù)接收信息寫入 Hdfs。
而在新建 HdfsBolt 中饱亮,Storm 為我們提供了相當(dāng)強(qiáng)的靈活性矾芙,我們可以定義一些策略,比如當(dāng)達(dá)成某個(gè)條件的時(shí)候轉(zhuǎn)換寫入文件近上,新寫入文件的名字剔宪,寫入時(shí)候的分隔符等等。
如果選擇使用的話,Storm 有提供部分接口供我們使用葱绒,但如果我們覺得不夠豐富也可以自定義相應(yīng)的類感帅。下面我們看看如何控制這些策略吧。
RecordFormat
這是一個(gè)接口哈街,允許你自由定義接收到內(nèi)容的格式留瞳。
public interface RecordFormat extends Serializable {
byte[] format(Tuple tuple);
}
Storm 提供了 DelimitedRecordFormat 拒迅,使用方法在上面已經(jīng)有了骚秦。這個(gè)類默認(rèn)的分割符是逗號(hào)",",而你可以通過 withFieldDelimiter 方法改變分隔符璧微。
如果你的初始分隔符不是逗號(hào)的話作箍,那么也可以重寫寫一個(gè)類實(shí)現(xiàn) RecordFormat 接口即可。
FileNameFormat
同樣是一個(gè)接口前硫。
public interface FileNameFormat extends Serializable {
void prepare(Map conf, TopologyContext topologyContext);
String getName(long rotation, long timeStamp);
String getPath();
}
Storm 所提供的默認(rèn)的是 org.apache.storm.hdfs.format.DefaultFileNameFormat 胞得。默認(rèn)人使用的轉(zhuǎn)換文件名有點(diǎn)長,格式是這樣的:
{prefix}{componentId}-{taskId}-{rotationNum}-{timestamp}{extension}
例如:
MyBolt-5-7-1390579837830.txt
默認(rèn)情況下屹电,前綴是空的阶剑,擴(kuò)展標(biāo)識(shí)是".txt"。
SyncPolicy
同步策略允許你將 buffered data 緩沖到 Hdfs 文件中(從而client可以讀取數(shù)據(jù))危号,通過實(shí)現(xiàn)org.apache.storm.hdfs.sync.SyncPolicy 接口:
public interface SyncPolicy extends Serializable {
boolean mark(Tuple tuple, long offset);
void reset();
}
FileRotationPolicy
這個(gè)接口允許你控制什么情況下轉(zhuǎn)換寫入文件牧愁。
public interface FileRotationPolicy extends Serializable {
boolean mark(Tuple tuple, long offset);
void reset();
}
Storm 有提供三個(gè)實(shí)現(xiàn)該接口的類:
最簡單的就是不進(jìn)行轉(zhuǎn)換的org.apache.storm.hdfs.bolt.rotation.NoRotationPolicy ,就是什么也不干外莲。
通過文件大小觸發(fā)轉(zhuǎn)換的 org.apache.storm.hdfs.bolt.rotation.FileSizeRotationPolicy猪半。
通過時(shí)間條件來觸發(fā)轉(zhuǎn)換的 org.apache.storm.hdfs.bolt.rotation.TimedRotationPolicy。
如果有更加復(fù)雜的需求也可以自己定義偷线。
RotationAction
這個(gè)主要是提供一個(gè)或多個(gè) hook 磨确,可加可不加。主要是在觸發(fā)寫入文件轉(zhuǎn)換的時(shí)候會(huì)啟動(dòng)声邦。
public interface RotationAction extends Serializable {
void execute(FileSystem fileSystem, Path filePath) throws IOException;
}
三.實(shí)現(xiàn)一個(gè)例子
了解了上面的情況后乏奥,我們會(huì)實(shí)現(xiàn)一個(gè)例子,根據(jù)寫入記錄的多少來控制寫入轉(zhuǎn)換(改變寫入的文件)亥曹,并且轉(zhuǎn)換后文件的名字表示當(dāng)前是第幾次轉(zhuǎn)換英融。
首先來看看 HdfsBolt 的內(nèi)容:
RecordFormat format = new DelimitedRecordFormat().withFieldDelimiter(" ");
// sync the filesystem after every 1k tuples
SyncPolicy syncPolicy = new CountSyncPolicy(1000);
// FileRotationPolicy rotationPolicy = new FileSizeRotationPolicy(1.0f, FileSizeRotationPolicy.Units.KB);
/** rotate file with Date,every month create a new file
* format:yyyymm.txt
*/
FileRotationPolicy rotationPolicy = new CountStrRotationPolicy();
FileNameFormat fileNameFormat = new TimesFileNameFormat().withPath("/test/");
RotationAction action = new NewFileAction();
HdfsBolt bolt = new HdfsBolt()
.withFsUrl("hdfs://127.0.0.1:9000")
.withFileNameFormat(fileNameFormat)
.withRecordFormat(format)
.withRotationPolicy(rotationPolicy)
.withSyncPolicy(syncPolicy)
.addRotationAction(action);
然后分別來看各個(gè)策略的類。
FileRotationPolicy
import org.apache.storm.hdfs.bolt.rotation.FileRotationPolicy;
import org.apache.storm.tuple.Tuple;
import java.text.SimpleDateFormat;
import java.util.Date;
/**
* 計(jì)數(shù)以改變Hdfs寫入文件的位置歇式,當(dāng)寫入10次的時(shí)候驶悟,則更改寫入文件,更改名字取決于 “TimesFileNameFormat”
* 這個(gè)類是線程安全
*/
public class CountStrRotationPolicy implements FileRotationPolicy {
private SimpleDateFormat df = new SimpleDateFormat("yyyyMM");
private String date = null;
private int count = 0;
public CountStrRotationPolicy(){
this.date = df.format(new Date());
// this.date = df.format(new Date());
}
/**
* Called for every tuple the HdfsBolt executes.
*
* @param tuple The tuple executed.
* @param offset current offset of file being written
* @return true if a file rotation should be performed
*/
@Override
public boolean mark(Tuple tuple, long offset) {
count ++;
if(count == 10) {
System.out.print("num :" +count + " ");
count = 0;
return true;
}
else {
return false;
}
}
/**
* Called after the HdfsBolt rotates a file.
*/
@Override
public void reset() {
}
@Override
public FileRotationPolicy copy() {
return new CountStrRotationPolicy();
}
}
FileNameFormat
import org.apache.storm.hdfs.bolt.format.FileNameFormat;
import org.apache.storm.task.TopologyContext;
import java.util.Map;
/**
* 決定重新寫入文件時(shí)候的名字
* 這里會(huì)返回是第幾次轉(zhuǎn)換寫入文件材失,將這個(gè)第幾次做為文件名
*/
public class TimesFileNameFormat implements FileNameFormat {
//默認(rèn)路徑
private String path = "/storm";
//默認(rèn)后綴
private String extension = ".txt";
private Long times = new Long(0);
public TimesFileNameFormat withPath(String path){
this.path = path;
return this;
}
@Override
public void prepare(Map conf, TopologyContext topologyContext) {
}
@Override
public String getName(long rotation, long timeStamp) {
times ++ ;
//返回文件名痕鳍,文件名為更換寫入文件次數(shù)
return times.toString() + this.extension;
}
public String getPath(){
return this.path;
}
}
RotationAction
import org.apache.hadoop.fs.FileContext;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.storm.hdfs.common.rotation.RotationAction;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.net.URI;
/**
當(dāng)轉(zhuǎn)換寫入文件時(shí)候調(diào)用的 hook ,這里僅寫入日志。
*/
public class NewFileAction implements RotationAction {
private static final Logger LOG = LoggerFactory.getLogger(NewFileAction.class);
@Override
public void execute(FileSystem fileSystem, Path filePath) throws IOException {
LOG.info("Hdfs change the written fileA簟熊响!");
return;
}
}
OK,這樣就大功告成了诗赌。通過上面的代碼汗茄,每接收到 10 個(gè) Tuple 后就會(huì)轉(zhuǎn)換寫入文件,新文件的名字就是第幾次轉(zhuǎn)換铭若。
完整代碼包括一個(gè)隨機(jī)生成字符串的 Spout 洪碳,可以到我的 github 上查看。