Hot off the press: this went through production verification just yesterday. I used it to export 300 million user records, it took half an hour, and performance stayed rock solid. Enough bragging, straight to the code.
MR
I looked into the various MR tools that ship with HBase and didn't find one that can do this. If such a tool exists, please let me know and I'll send you a red packet.
So we have to write the MR job ourselves. The official HBase documentation has examples of writing an HBase MR job; with a little dressing up we get exactly what we want.
The exported CSV format looks like this:
admin,22,北京
admin,23,天津
Time to write the Scala code.
Define the Mapper class
import java.lang
import java.util

import org.apache.hadoop.hbase.CellUtil
import org.apache.hadoop.hbase.client.Result
import org.apache.hadoop.hbase.io.ImmutableBytesWritable
import org.apache.hadoop.hbase.mapreduce.TableMapper
import org.apache.hadoop.hbase.util.Bytes
import org.apache.hadoop.io.Text
import org.apache.hadoop.mapreduce.Mapper

class MyMapper extends TableMapper[Text, Text] {
  val keyText = new Text()
  val valueText = new Text()

  override def map(key: ImmutableBytesWritable, value: Result, context: Mapper[ImmutableBytesWritable, Result, Text, Text]#Context): Unit = {
    val maps = result2Map(value)
    // getOrDefault guards against rows that are missing one of the columns
    keyText.set(maps.getOrDefault("userId", ""))
    valueText.set(maps.getOrDefault("regTime", ""))
    context.write(keyText, valueText)
  }

  // Convert a Result into a qualifier -> value map
  def result2Map(result: Result): util.HashMap[lang.String, lang.String] = {
    val map = new util.HashMap[lang.String, lang.String]()
    result.rawCells().foreach { cell =>
      // cloneQualifier/cloneValue copy out just this cell's bytes; checking the
      // raw backing array (getQualifierArray) is unreliable because it is a shared buffer
      val column: Array[Byte] = CellUtil.cloneQualifier(cell)
      val value: Array[Byte] = CellUtil.cloneValue(cell)
      if (column != null && column.nonEmpty) {
        if (value == null || value.length == 0) {
          map.put(Bytes.toString(column), "")
        } else {
          map.put(Bytes.toString(column), Bytes.toString(value))
        }
      }
    }
    map
  }
}
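To sanity-check result2Map without a cluster, you can assemble a Result by hand. A minimal sketch, assuming the HBase 1.x client API (CellUtil.createCell and Result.create; the row data here is made up):

import org.apache.hadoop.hbase.KeyValue

val cell = CellUtil.createCell(
  Bytes.toBytes("row1"),           // row key (hypothetical)
  Bytes.toBytes("ext"),            // column family
  Bytes.toBytes("userId"),         // qualifier
  System.currentTimeMillis(),      // timestamp
  KeyValue.Type.Put.getCode,       // cell type
  Bytes.toBytes("admin"))          // value
val fakeResult = Result.create(util.Arrays.asList(cell))
println(new MyMapper().result2Map(fakeResult)) // prints {userId=admin}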
Define the Reducer class
import java.lang

import org.apache.hadoop.io.Text
import org.apache.hadoop.mapreduce.Reducer

class MyReducer extends Reducer[Text, Text, Text, Text] {
  val mergeKey = new Text()

  override def reduce(key: Text, values: lang.Iterable[Text], context: Reducer[Text, Text, Text, Text]#Context): Unit = {
    val iter = values.iterator()
    while (iter.hasNext) {
      val tmpText = iter.next()
      // Fold the value into the key so each output record is a single CSV line
      mergeKey.set(key.toString + "," + tmpText.toString)
      // A null value makes TextOutputFormat print the key alone,
      // with no trailing tab separator
      context.write(mergeKey, null)
    }
  }
}
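Assuming userId values are unique (my assumption, not stated in the original), each reduce group carries a single value, so the reducer does nothing but glue key and value into one comma-separated line. Under that assumption, a sketch of an alternative is to skip the shuffle entirely: have the mapper emit the finished line as the key and run with zero reducers:

// Sketch: in MyMapper.map, emit the whole CSV line as the key
//   keyText.set(s"${maps.getOrDefault("userId", "")},${maps.getOrDefault("regTime", "")}")
//   context.write(keyText, null)
// and in the driver below, disable the reduce phase:
job.setNumReduceTasks(0)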
The ExportCsv core
import java.io.{File, FileInputStream}

import org.apache.hadoop.conf.Configured
import org.apache.hadoop.fs.Path
import org.apache.hadoop.hbase.HBaseConfiguration
import org.apache.hadoop.hbase.client.Scan
import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil
import org.apache.hadoop.hbase.util.Bytes
import org.apache.hadoop.io.Text
import org.apache.hadoop.mapreduce.Job
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat
import org.apache.hadoop.util.Tool

class ExportCsv extends Configured with Tool {
  override def run(args: Array[String]): Int = {
    val conf = HBaseConfiguration.create()
    conf.addResource(new FileInputStream(new File("/etc/hbase/conf/hbase-site.xml")))
    conf.set("mapreduce.job.running.map.limit", "8") // cap on concurrently running map tasks
    val job = Job.getInstance(conf, "HbaseExportCsv")
    job.setJarByClass(classOf[ExportCsv])
    val scan = new Scan()
    // Restrict the scan to just the columns we want
    scan.addFamily(Bytes.toBytes("ext"))
    scan.addColumn(Bytes.toBytes("ext"), Bytes.toBytes("userId"))
    scan.addColumn(Bytes.toBytes("ext"), Bytes.toBytes("regTime"))
    scan.setBatch(1000) // cap columns per Result, protects against very wide rows
    scan.setCacheBlocks(false) // always disable block caching for MR scans
    TableMapReduceUtil.initTableMapperJob(
      "USER_TABLE",
      scan,
      classOf[MyMapper],
      classOf[Text],
      classOf[Text],
      job
    )
    job.setReducerClass(classOf[MyReducer])
    // Declare the final output types produced by the reducer
    job.setOutputKeyClass(classOf[Text])
    job.setOutputValueClass(classOf[Text])
    // Use the new-API FileOutputFormat directly; mixing in the old-API JobConf
    // here would not propagate the output path back to the Job
    FileOutputFormat.setOutputPath(job, new Path("/tmp/hbasecsv"))
    val isDone = job.waitForCompletion(true)
    if (isDone) 0 else 1
  }
}
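The Tool still needs an entry point, which the original snippet leaves out. A minimal sketch using Hadoop's ToolRunner:

import org.apache.hadoop.util.ToolRunner

object ExportCsv {
  // Companion-object main: hand the Tool to ToolRunner and exit with its status
  def main(args: Array[String]): Unit = {
    System.exit(ToolRunner.run(new ExportCsv, args))
  }
}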
Time to run the job:
hadoop jar ExportCsv.jar
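If the jar manifest doesn't declare a main class, name the driver explicitly (class name as defined above):
hadoop jar ExportCsv.jar ExportCsv
Once the job finishes, the output lands as part-r-* files under /tmp/hbasecsv; they can be pulled into a single local CSV with:
hdfs dfs -getmerge /tmp/hbasecsv users.csv
(users.csv is just an example local file name.)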