This example is mainly about learning big-data basics. The whole exercise covers:
- crawling web data with Nutch;
- filtering the data stored in HBase;
- Chinese word segmentation and MapReduce statistics.
First, Nutch 2 and HBase need to be installed. Nutch 2 can only be installed by building from source, and the build is configured to use HBase as the default store for crawled data, so install HBase before building Nutch. HBase in turn can live on HDFS, which makes it easy to grow storage and to run MapReduce jobs, so it is best to set up a Hadoop 2 environment first. If this is only for testing, a single-node setup is enough. The installation order is:
- Hadoop -- there are plenty of installation guides online, so they are not repeated here;
- HBase -- just follow the quick start on the official site;
- Nutch 2 -- the official site also documents the build. The build takes a long time, so be patient; apart from occasional repository connection problems it rarely fails.
Next, crawl the site with Nutch. Nutch 2 can run in two modes, deploy and local; for this test we use local mode. Note that Nutch only fetches static page content; if you need server-side or dynamically generated data you will have to look elsewhere. Switch to $NUTCH_HOME/runtime/local/ first. Taking the Sina Finance stock page as an example, start by adding a seed URL:
$ mkdir -p urls
$ echo http://finance.sina.com.cn/stock/ > urls/seed.txt
Next, configure the URL filter rules, normally in local/conf/regex-urlfilter.txt. The entries to pay attention to are:
# skip file: ftp: and mailto: urls   (this skips file/ftp/mailto links)
-^(file|ftp|mailto):
# skip image and other suffixes we can't yet parse
# for a more extensive coverage use the urlfilter-suffix plugin   (this skips resource files and JS)
-\.(gif|GIF|jpg|JPG|png|PNG|ico|ICO|css|CSS|sit|SIT|eps|EPS|wmf|WMF|zip|ZIP|ppt|PPT|mpg|MPG|xls|XLS|gz|GZ|rpm|RPM|tgz|TGZ|mov|MOV|exe|EXE|jpeg|JPEG|bmp|BMP|js|JS)$
# skip URLs containing certain characters as probable queries, etc.   (this skips any URL containing the characters below)
-[?*!@=]
# skip URLs with slash-delimited segment that repeats 3+ times, to break loops
-.*(/[^/]+)/[^/]+\1/[^/]+\1/
# accept anything else   (this accepts everything else you want crawled)
+.
In short, '-' means discard the URL and '+' means accept it; add filters to suit your own needs.
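For example, if you want the crawl to stay inside the Sina finance site, you could replace the catch-all '+.' at the end with a domain rule like the sketch below (the regex is illustrative; remember the filter uses the first rule that matches, so order matters):
# accept only links under finance.sina.com.cn (example rule)
+^http://finance\.sina\.com\.cn/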
The next step is to start Hadoop and HBase:
$ start-all.sh
$ cd $HBASE_HOME
$ bin/start-hbase.sh
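To confirm everything came up, jps should list the Hadoop daemons (NameNode, DataNode, ResourceManager, NodeManager and so on) plus HMaster for HBase; the exact set depends on your versions and on whether you run in distributed mode.
$ jps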
Then switch to nutch2/runtime/local. Nutch writes the crawled data into HBase, so the HBase jars also need to be copied into local/lib.
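One way to do the copy (a sketch; adjust the paths to your own install locations):
$ cp $HBASE_HOME/lib/hbase-*.jar $NUTCH_HOME/runtime/local/lib/
Then kick off the crawl: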
$ bin/crawl urls stock 3
The crawl script then starts fetching; the fetched data goes into HBase by default (decided when Nutch was built). Here urls is the seed directory, stock is the crawl id, which is used to name the HBase table, and 3 is the number of iterations. The crawl takes quite a while.
Once the crawl is done you can inspect the results in the HBase shell:
$ bin/hbase shell
hbase> list
list shows that HBase now has a table named "stock_webpage". Next, let's look at its structure.
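To get a quick look without dumping the whole table, describe plus a limited scan is enough (standard hbase shell commands; the columns picked here are just examples from the mapping shown below):
hbase> describe 'stock_webpage'
hbase> scan 'stock_webpage', {LIMIT => 1, COLUMNS => ['p:t', 'f:st']}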
You can view it with scan 'stock_webpage', but a full scan prints far too much to pick out the important parts. In fact, Nutch parses every page after fetching it, and the table layout is defined in the Gora mapping file:
$ more $NUTCH_HOME/runtime/local/conf/gora-hbase-mapping.xml
<gora-orm>
    <table name="webpage">
        <family name="p" maxVersions="1"/>
        <family name="f" maxVersions="1"/>
        <family name="s" maxVersions="1"/>
        <family name="il" maxVersions="1"/>
        <family name="ol" maxVersions="1"/>
        <family name="h" maxVersions="1"/>
        <family name="mtdt" maxVersions="1"/>
        <family name="mk" maxVersions="1"/>
    </table>
    <class table="webpage" keyClass="java.lang.String" name="org.apache.nutch.storage.WebPage">
        <!-- fetch fields -->
        <field name="baseUrl" family="f" qualifier="bas"/>
        <field name="status" family="f" qualifier="st"/>
        <field name="prevFetchTime" family="f" qualifier="pts"/>
        <field name="fetchTime" family="f" qualifier="ts"/>
        <field name="fetchInterval" family="f" qualifier="fi"/>
        <field name="retriesSinceFetch" family="f" qualifier="rsf"/>
        <field name="reprUrl" family="f" qualifier="rpr"/>
        <field name="content" family="f" qualifier="cnt"/>
        <field name="contentType" family="f" qualifier="typ"/>
        <field name="protocolStatus" family="f" qualifier="prot"/>
        <field name="modifiedTime" family="f" qualifier="mod"/>
        <field name="prevModifiedTime" family="f" qualifier="pmod"/>
        <field name="batchId" family="f" qualifier="bid"/>
        <!-- parse fields -->
        <field name="title" family="p" qualifier="t"/>
        <field name="text" family="p" qualifier="c"/>
        <field name="parseStatus" family="p" qualifier="st"/>
        <field name="signature" family="p" qualifier="sig"/>
        <field name="prevSignature" family="p" qualifier="psig"/>
        <!-- score fields -->
        <field name="score" family="s" qualifier="s"/>
        <field name="headers" family="h"/>
        <field name="inlinks" family="il"/>
        <field name="outlinks" family="ol"/>
        <field name="metadata" family="mtdt"/>
        <field name="markers" family="mk"/>
    </class>
    <table name="host">
        <family name="mtdt" maxVersions="1"/>
        <family name="il" maxVersions="1"/>
        <family name="ol" maxVersions="1"/>
    </table>
    <class table="host" keyClass="java.lang.String" name="org.apache.nutch.storage.Host">
        <field name="metadata" family="mtdt"/>
        <field name="inlinks" family="il"/>
        <field name="outlinks" family="ol"/>
    </class>
</gora-orm>
HBase stores data as rowkey:family:column:value. What does that mean? Each rowkey has several column families, each family has several columns, and each column holds a value. With that in mind, look at the mapping file again.
The table name is webpage; with the crawl id prefix it becomes 'stock_webpage'. It has a number of column families: p, f, s and so on.
The fetch fields hold the information recorded when a page is fetched; these field names all belong to the 'f' family.
The parse fields hold the information produced by parsing; these all belong to the 'p' family.
So, to see the text extracted from http://finance.sina.com.cn/stock/ we only need to look up:
http://finance.sina.com.cn/stock p:c:value
In other words, the value whose rowkey is this URL, whose family is 'p' and whose column is 'c'. There is of course an API for this, which comes up below.
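As a preview, here is a minimal single-row lookup sketch (the class name ReadPageText and the exact rowkey are my own; it uses the older HTable client to match the HBase API in the MapReduce code below, and depending on the Nutch/Gora version the stored rowkey may actually be a reversed form of the URL, so adjust the key to whatever a scan really shows):
import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.util.Bytes;

public class ReadPageText {
    public static void main(String[] args) throws IOException {
        Configuration conf = HBaseConfiguration.create();
        // table created by the crawl above: <crawlId>_webpage
        HTable table = new HTable(conf, "stock_webpage");
        try {
            // rowkey as written above -- replace with the key your scan shows
            Get get = new Get(Bytes.toBytes("http://finance.sina.com.cn/stock/"));
            get.addColumn(Bytes.toBytes("p"), Bytes.toBytes("c"));
            Result result = table.get(get);
            byte[] text = result.getValue(Bytes.toBytes("p"), Bytes.toBytes("c"));
            System.out.println(text == null ? "NULL" : new String(text, "UTF-8"));
        } finally {
            table.close();
        }
    }
}
The extraction below does the same kind of lookup, but inside a MapReduce TableMapper so that every matching row is processed.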
With this storage layout in mind, the analysis step can find the text it needs. For the analysis part, I first extract the text of every row whose rowkey contains 'finance' and dump it locally; it could just as well go to HDFS or straight into another HBase table.
import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.filter.CompareFilter;
import org.apache.hadoop.hbase.filter.Filter;
import org.apache.hadoop.hbase.filter.RowFilter;
import org.apache.hadoop.hbase.filter.SubstringComparator;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
import org.apache.hadoop.hbase.mapreduce.TableMapper;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;

public class HBaseToHdfs {

    public static void main(String[] args) throws Exception {
        Configuration conf = HBaseConfiguration.create();
        Job job = Job.getInstance(conf, HBaseToHdfs.class.getSimpleName());
        job.setJarByClass(HBaseToHdfs.class);
        job.setMapperClass(HBaseToHdfsMapper.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(Text.class);
        // map-only job, no reducers
        job.setNumReduceTasks(0);

        // only scan rows whose rowkey contains "finance"
        Scan scan = new Scan();
        Filter filter = new RowFilter(CompareFilter.CompareOp.EQUAL,
                new SubstringComparator("finance"));
        scan.setFilter(filter);

        TableMapReduceUtil.initTableMapperJob("stock_webpage", scan,
                HBaseToHdfsMapper.class, Text.class, Text.class, job);
        job.setOutputFormatClass(TextOutputFormat.class);
        FileOutputFormat.setOutputPath(job, new Path("/stock"));
        job.waitForCompletion(true);
    }

    public static class HBaseToHdfsMapper extends TableMapper<Text, Text> {
        private Text outKey = new Text();
        private Text outValue = new Text();

        @Override
        protected void map(ImmutableBytesWritable key, Result value, Context context)
                throws IOException, InterruptedException {
            // the key here is the HBase rowkey
            outKey.set(key.get());
            // read the value of family 'p', column 'c' (the parsed page text)
            byte[] name = null;
            Cell cell = value.getColumnLatestCell(Bytes.toBytes("p"), Bytes.toBytes("c"));
            if (cell != null) {
                name = CellUtil.cloneValue(cell);
            }
            String temp = (name == null || name.length == 0) ? "NULL" : new String(name, "UTF-8");
            outValue.set(temp);
            context.write(outKey, outValue);
        }
    }
}
Now my data sits in workspace/HBaseToHdfs/stock. Next, put it on HDFS and run MapReduce over it: a simple Chinese word segmentation followed by a word count.
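The upload might look like this (a sketch: the local path is where the previous job wrote its map output, and /input matches the path hard-coded in the job below):
$ hdfs dfs -mkdir -p /input
$ hdfs dfs -put workspace/HBaseToHdfs/stock/part-m-* /input/
The word-count job itself: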
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.io.Reader;

import org.wltea.analyzer.core.IKSegmenter;
import org.wltea.analyzer.core.Lexeme;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;

public class ChineseCount {

    public static class TokenizerMapper extends Mapper<Object, Text, Text, IntWritable> {
        private final static IntWritable one = new IntWritable(1);
        private Text word = new Text();

        public void map(Object key, Text value, Context context)
                throws IOException, InterruptedException {
            // Text.getBytes() may return a padded buffer, so limit it to getLength()
            byte[] bt = value.getBytes();
            InputStream ip = new ByteArrayInputStream(bt, 0, value.getLength());
            Reader read = new InputStreamReader(ip, "UTF-8");
            // segment the line into Chinese words with IK before emitting <word, 1> pairs
            IKSegmenter iks = new IKSegmenter(read, true);
            Lexeme t;
            while ((t = iks.next()) != null) {
                word.set(t.getLexemeText());
                context.write(word, one);
            }
        }
    }

    public static class IntSumReducer extends Reducer<Text, IntWritable, Text, IntWritable> {
        private IntWritable result = new IntWritable();

        public void reduce(Text key, Iterable<IntWritable> values, Context context)
                throws IOException, InterruptedException {
            int sum = 0;
            for (IntWritable val : values) {
                sum += val.get();
            }
            result.set(sum);
            context.write(key, result);
        }
    }

    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        conf.set("fs.default.name", "hdfs://localhost:9000");
        // hard-coded HDFS input/output paths
        String[] args1 = new String[] { "/input", "/output" };
        String[] otherArgs = new GenericOptionsParser(conf, args1).getRemainingArgs();
        if (otherArgs.length != 2) {
            System.err.println("Usage: wordcount <in> <out>");
            System.exit(2);
        }
        Job job = Job.getInstance(conf, "word count");
        job.setJarByClass(ChineseCount.class);
        job.setMapperClass(TokenizerMapper.class);
        job.setCombinerClass(IntSumReducer.class);
        job.setReducerClass(IntSumReducer.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);
        FileInputFormat.addInputPath(job, new Path(otherArgs[0]));
        FileOutputFormat.setOutputPath(job, new Path(otherArgs[1]));
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}
And that's it: the segmentation results can now be checked in /output on HDFS. Further analysis will follow in a later post.
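A quick way to look at them (assuming the default single reducer, i.e. one part-r-00000 file):
$ hdfs dfs -cat /output/part-r-00000 | head -n 20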