新鮮文章，昨天剛經過線上驗證，使用它導出了3億條用戶數據，花了半個小時，性能還是穩穩的。好了不吹牛皮了，直接上代碼吧。
MR
考查了HBase自帶的各種MR，沒有發現哪一個能實現這個需求，如果有請通知我，我給他發紅包。
所以我們只能自己來寫一個HBase的MR，官方文檔上也有相應的例子。
我們拿來稍加改造就得到我們想要的了。
導出的CSV格式示例（每行一條記錄，字段以逗號分隔）：
admin,22,北京
admin,23,天津
下面開始寫Scala代碼。
定義Map轉換類
/**
 * Mapper that turns each scanned HBase row into a `(userId, regTime)` pair.
 *
 * Rows that have no `userId` qualifier are skipped instead of crashing the task
 * (`Text.set(null)` throws a NullPointerException). A missing `regTime` is emitted
 * as an empty field rather than the literal string "null".
 */
class MyMapper extends TableMapper[Text, Text] {
  // Writable instances reused across map() calls to avoid a per-row allocation.
  val keyText = new Text()
  val valueText = new Text()

  override def map(key: ImmutableBytesWritable, value: Result, context: Mapper[ImmutableBytesWritable, Result, Text, Text]#Context): Unit = {
    val maps = result2Map(value)
    Option(maps.get("userId")).foreach { userId =>
      keyText.set(userId)
      valueText.set(Option(maps.get("regTime")).getOrElse(""))
      context.write(keyText, valueText)
    }
  }

  /**
   * Converts a [[Result]] into a `qualifier -> value` map.
   *
   * Only the qualifier is used as the key (the column family is ignored), so equal
   * qualifiers from different families overwrite each other. Empty values are stored
   * as "". Cells with an empty qualifier are skipped.
   *
   * @param result the row returned by the scan; may contain no cells
   * @return a new mutable map, empty when the row has no cells
   */
  def result2Map(result: Result): util.HashMap[lang.String, lang.String] = {
    val map = new util.HashMap[lang.String, lang.String]()
    // rawCells() can be null for an empty Result.
    Option(result.rawCells()).foreach(_.foreach { cell =>
      val qualifier: Array[Byte] = CellUtil.cloneQualifier(cell)
      // Check the copied qualifier itself: getQualifierArray returns the cell's whole
      // backing array, which is never empty and therefore says nothing about the qualifier.
      if (qualifier != null && qualifier.nonEmpty) {
        val bytes: Array[Byte] = CellUtil.cloneValue(cell)
        val text = if (bytes == null || bytes.isEmpty) "" else Bytes.toString(bytes)
        map.put(Bytes.toString(qualifier), text)
      }
    })
    map
  }
}
定義Reducer類
/**
 * Reducer that turns every `(key, value)` pair into one CSV line `key,value`.
 *
 * Each value of a key produces its own line, so duplicate keys are preserved.
 * The whole line is written as the output key and the output value is `null`.
 * TextOutputFormat (the Hadoop default; the job visible in this file does not override it)
 * then writes only the key, without the key/value separator. That separator is a tab by default.
 * An empty `Text` value would still produce it, leaving a trailing tab on every CSV line.
 */
class MyReducer extends Reducer[Text, Text, Text, Text] {
  // Reused across writes to avoid allocating a new Text per output record.
  private val line = new Text()

  override def reduce(key: Text, values: lang.Iterable[Text], context: Reducer[Text, Text, Text, Text]#Context): Unit = {
    val prefix = key.toString + ","
    val iter = values.iterator()
    while (iter.hasNext) {
      line.set(prefix + iter.next().toString)
      // null value: only the key (the full CSV line) is written, with no separator.
      context.write(line, null)
    }
  }
}
ExportCsv核心
/**
 * Hadoop [[Tool]] that exports the `ext:userId` and `ext:regTime` columns of an HBase
 * table to comma-separated text files using [[MyMapper]] and [[MyReducer]].
 *
 * Optional arguments (defaults reproduce the original hard-coded values):
 *  - `args(0)`: output directory, default `/tmp/hbasecsv` (must not already exist)
 *  - `args(1)`: table name, default `USER_TABLE`
 *
 * @return 0 when the job succeeds, 1 otherwise
 */
class ExportCsv extends Configured with Tool {
  override def run(args: Array[String]): Int = {
    val outputDir = args.headOption.getOrElse("/tmp/hbasecsv")
    val tableName = args.drop(1).headOption.getOrElse("USER_TABLE")

    // Build on the configuration injected by ToolRunner (if any) so -D options are honoured.
    val conf = Option(getConf)
      .map(c => HBaseConfiguration.create(c))
      .getOrElse(HBaseConfiguration.create())
    // Path resources are read from the local filesystem; no stream to manage.
    conf.addResource(new Path("/etc/hbase/conf/hbase-site.xml"))
    conf.set("mapreduce.job.running.map.limit", "8") // max number of map tasks running at the same time

    val job = Job.getInstance(conf, "HbaseExportCsv")
    job.setJarByClass(classOf[ExportCsv])

    val family = Bytes.toBytes("ext")
    val scan = new Scan()
    // addColumn alone restricts the scan to these two columns; a preceding addFamily is redundant.
    scan.addColumn(family, Bytes.toBytes("userId"))
    scan.addColumn(family, Bytes.toBytes("regTime"))
    // Fetch many rows per RPC. setBatch is deliberately not used: it can split one row
    // across several Results, so a mapper could see regTime without userId.
    scan.setCaching(1000)
    // Do not populate the RegionServer block cache from this full-table scan.
    scan.setCacheBlocks(false)

    TableMapReduceUtil.initTableMapperJob(
      tableName,
      scan,
      classOf[MyMapper],
      classOf[Text],
      classOf[Text],
      job
    )
    job.setReducerClass(classOf[MyReducer])
    job.setOutputKeyClass(classOf[Text])
    job.setOutputValueClass(classOf[Text])
    // Set the output path on the Job itself; configuring a separate JobConf copy
    // would not propagate back to this job.
    org.apache.hadoop.mapreduce.lib.output.FileOutputFormat.setOutputPath(job, new Path(outputDir))

    if (job.waitForCompletion(true)) 0 else 1
  }
}
運行任務（假設jar的Main-Class通過ToolRunner調用ExportCsv）：
hadoop jar ExportCsv.jar