object HbaseDao {
private val sdf1 =new SimpleDateFormat("yyyy-MM-dd HH:mm:ss")
private val sdf2 =new SimpleDateFormat("yyyyMMddHHmmss")
//實(shí)現(xiàn)批量添加
? private val cacheList =new util.ArrayList[Put]
//創(chuàng)建Habse環(huán)境對象
? var conf: Configuration = HBaseConfiguration.create()
//讀取region數(shù)
? private val regions: Integer = Integer.valueOf(PropertiesUtil.getProperty("hbase.calllog.regions"))
//讀取命名空間
? private val nameSpace:String = PropertiesUtil.getProperty("hbase.calllog.namespace")
//讀取表名
? private val tableName:String = PropertiesUtil.getProperty("hbase.calllog.tablename")
var table:HTable =null
? //首先創(chuàng)建命名空間,在創(chuàng)建表
? if(!HBaseUtil.isExistTable(conf,tableName)){
HBaseUtil.initNamespace(conf,nameSpace)
HBaseUtil.createTable(conf,tableName,regions,"f1","f2")
}
/**
? * 把數(shù)據(jù)寫入到hbase
? * ori數(shù)據(jù)樣式: 18576581848,17269452013,2017-08-14 13:38:31,1761
? * rowkey樣式:01_18576581848_20170814133831_17269452013_1_1761
? * HBase表的列:call1? call2? build_time? build_time_ts? flag? duration
*
*
*/
? def put(str:String) = {
if(cacheList.size ==0){
val conn = ConnectionInstance.getConnection(conf)
table = conn.getTable(TableName.valueOf(tableName)).asInstanceOf[HTable]
table.setAutoFlushTo(false)
table.setWriteBufferSize(2 *1024 *1024)
}
//對數(shù)據(jù)進(jìn)行切割
? ? val splitOri = str.split(",")
val call1:String = splitOri(0)
val call1_name:String = splitOri(1)
val call2:String = splitOri(2)
val call2_name:String = splitOri(3)
val buildTime:String = splitOri(4)
val duration:String = splitOri(5)
val flag:String = splitOri(6)
//獲取region編碼
? ? val regionCode = HBaseUtil.genRegionCode(call1, buildTime,regions)
//建立通話時(shí)間
? ? val buildTimeReplace =sdf2.format(sdf1.parse(buildTime))
val buildTimeTs:String = String.valueOf(sdf1.parse(buildTime).getTime)
//生成rowKey
? ? val rowkey = HBaseUtil.genRowKey(regionCode, call1, buildTimeReplace, call2, flag, duration)
//向表中插入數(shù)據(jù)
? ? val put: Put =new Put(Bytes.toBytes(rowkey))
//主叫號碼
? ? put.addColumn(Bytes.toBytes("f1"),Bytes.toBytes("call1"),Bytes.toBytes(call1))
//主叫名稱
? ? put.addColumn(Bytes.toBytes("f1"),Bytes.toBytes("call1_name"),Bytes.toBytes(call1_name))
//被叫號碼
? ? put.addColumn(Bytes.toBytes("f1"),Bytes.toBytes("call2"),Bytes.toBytes(call2))
//被叫名稱
? ? put.addColumn(Bytes.toBytes("f1"),Bytes.toBytes("call2_name"),Bytes.toBytes(call2_name))
//通話日期
? ? put.addColumn(Bytes.toBytes("f1"),Bytes.toBytes("build_time"),Bytes.toBytes(buildTime))
//通話時(shí)間
? ? put.addColumn(Bytes.toBytes("f1"),Bytes.toBytes("build_time_ts"),Bytes.toBytes(buildTimeTs))
//通話標(biāo)識
? ? put.addColumn(Bytes.toBytes("f1"),Bytes.toBytes("flag"),Bytes.toBytes(flag))
//通話時(shí)間
? ? put.addColumn(Bytes.toBytes("f1"),Bytes.toBytes("duration"),Bytes.toBytes(duration))
cacheList.add(put)
if(cacheList.size >0){
table.put(cacheList)
table.flushCommits()
cacheList.clear()
}
}