/**
* 把數(shù)據(jù)寫入到hbase 中
*/
object KafkaToHbase {
def main(args: Array[String]): Unit = {
//testHbase()
? ? //創(chuàng)建kafka消費(fèi)者的對象
? ? val kafkaConsumer =new KafkaConsumer[String,String](PropertiesUtil.properties)
//訂閱指定的topic? 用于數(shù)據(jù)的消費(fèi)
? ? kafkaConsumer.subscribe(util.Arrays.asList(PropertiesUtil.getProperty("kafka.topics")))
println("等待消費(fèi)數(shù)據(jù)--------------")
while (true) {
//每0.1S 從指定topic中消費(fèi)數(shù)據(jù)
? ? ? val records: ConsumerRecords[String,String] = kafkaConsumer.poll(100)
//這個是scala和java集合類型之間的轉(zhuǎn)換
? ? ? for (cr <- records) {
//得到每條數(shù)據(jù)的value
? ? ? ? val str:String = cr.value()
println(str)
HbaseDao.put(str)
}
}
}
}