通過自定義SparkSQL外部數(shù)據(jù)源實現(xiàn)SparkSQL讀取HBase

通過自定義SparkSQL外部數(shù)據(jù)源實現(xiàn)SparkSQL讀取HBase

標簽: SparkSQL HBase Spark External DataSource


package name: sparksql.hbase


Scala Class: HBaseRelation.scala

package sparksql.hbase
 
import java.io.Serializable

import scala.collection.JavaConversions._
import scala.collection.JavaConverters._
import scala.collection.mutable.ArrayBuffer

import org.apache.hadoop.hbase.HBaseConfiguration
import org.apache.hadoop.hbase.client.Result
import org.apache.hadoop.hbase.mapreduce.TableInputFormat
import org.apache.hadoop.hbase.util.Bytes

import org.apache.spark.sql._
import org.apache.spark.sql.sources.BaseRelation
import org.apache.spark.sql.sources.TableScan
import org.apache.spark.sql.types.DataType
import org.apache.spark.sql.types.DoubleType
import org.apache.spark.sql.types.FloatType
import org.apache.spark.sql.types.IntegerType
import org.apache.spark.sql.types.LongType
import org.apache.spark.sql.types.StringType
import org.apache.spark.sql.types.StructField
import org.apache.spark.sql.types.StructType

import sparksql.hbase.hbase._
  
  
/**
 * Translates one schema field of an HBase [[Result]] into a plain Scala value,
 * driven by the field's declared type string ("string" | "int" | "long" | "float" | "double").
 */
object Resolver extends Serializable {

  /**
   * Resolves a single field from `result`. A field named ":key" (empty column
   * family, column name "key") denotes the row key; anything else is read as a
   * regular "family:qualifier" cell.
   */
  def resolve(hbaseField: HBaseSchemaField, result: Result): Any = {
    val cfColArray = hbaseField.fieldName.split(":", -1)
    val cfName = cfColArray(0)
    val colName = cfColArray(1)
    // resolve row key otherwise resolve column
    if (cfName == "" && colName == "key") resolveRowKey(result, hbaseField.fieldType)
    else resolveColumn(result, cfName, colName, hbaseField.fieldType)
  }

  /**
   * Decodes the row key. Row-key bytes are interpreted character-by-character
   * (the key is assumed to be stored as a readable string), then parsed for
   * numeric types.
   */
  def resolveRowKey(result: Result, resultType: String): Any = {
    val raw = result.getRow.map(_.toChar).mkString
    resultType match {
      case "string" => raw
      case "int"    => raw.toInt
      case "long"   => raw.toLong
      // Previously an unsupported type threw a bare MatchError; fail with a
      // descriptive message instead.
      case other    => sys.error(s"unsupported row key type: $other")
    }
  }

  /**
   * Decodes one cell. When the cell is absent, a type-appropriate default is
   * returned ("" for strings, 0 for numerics) so a sparse row still yields a
   * full Row.
   */
  def resolveColumn(result: Result, columnFamily: String, columnName: String, resultType: String): Any = {
    // Encode family/qualifier once via HBase's Bytes helper (UTF-8) instead of
    // repeated platform-default String.getBytes calls.
    val family = Bytes.toBytes(columnFamily)
    val qualifier = Bytes.toBytes(columnName)
    if (result.containsColumn(family, qualifier)) {
      val value = result.getValue(family, qualifier)
      resultType match {
        case "string" => Bytes.toString(value)
        case "int"    => Bytes.toInt(value)
        case "long"   => Bytes.toLong(value)
        case "float"  => Bytes.toFloat(value)
        case "double" => Bytes.toDouble(value)
        case other    => sys.error(s"unsupported column type: $other")
      }
    } else {
      resultType match {
        case "string" => ""
        case "int"    => 0
        case "long"   => 0
        // BUG FIX: float/double previously fell through to MatchError when the
        // cell was missing; cover them with the same zero default.
        case "float"  => 0
        case "double" => 0
        case other    => sys.error(s"unsupported column type: $other")
      }
    }
  }
}
  
/**
   Example DDL — note the USING clause must name this package, sparksql.hbase:
   val hbaseDDL = s"""
      |CREATE TEMPORARY TABLE hbase_people
      |USING sparksql.hbase
      |OPTIONS (
      |  sparksql_table_schema   '(row_key string, name string, age int, job string)',
      |   hbase_table_name     'people',
      | hbase_table_schema '(:key , profile:name , profile:age , career:job )'
      |)""".stripMargin
 */
/**
 * A Spark SQL [[BaseRelation]] backed by a full scan of one HBase table.
 *
 * Required options: `hbase_table_name`, `hbase_table_schema` (HBase-side
 * "family:qualifier" fields, ":key" for the row key), `sparksql_table_schema`
 * (SQL-side names and types, positionally matched to the HBase fields).
 * Optional: `row_range` ("start->stop", default unbounded) and
 * `hbase_zookeeper_quorum` (ZooKeeper quorum host list).
 */
case class HBaseRelation(@transient val hbaseProps: Map[String,String])(@transient val sqlContext: SQLContext) extends BaseRelation with Serializable with TableScan{

  // Required options; fail fast with a message naming the missing option
  // (previously all three reported the misleading "not valid schema").
  val hbaseTableName = hbaseProps.getOrElse("hbase_table_name", sys.error("option 'hbase_table_name' is required"))
  val hbaseTableSchema = hbaseProps.getOrElse("hbase_table_schema", sys.error("option 'hbase_table_schema' is required"))
  val registerTableSchema = hbaseProps.getOrElse("sparksql_table_schema", sys.error("option 'sparksql_table_schema' is required"))
  val rowRange = hbaseProps.getOrElse("row_range", "->")
  // BUG FIX: the ZooKeeper quorum was hard-coded in buildScan; it is now an
  // option, keeping the previous literal as a backward-compatible default.
  val zookeeperQuorum = hbaseProps.getOrElse("hbase_zookeeper_quorum", "zookeeper-name")

  // Split "start->stop" into the scan's start and stop row keys; either side
  // may be empty (unbounded).
  val range = rowRange.split("->", -1)
  val startRowKey = range(0).trim
  val endRowKey = range(1).trim

  val tempHBaseFields = extractHBaseSchema(hbaseTableSchema) // typeless; types are filled in by feedTypes
  val registerTableFields = extractRegisterSchema(registerTableSchema)
  val tempFieldRelation = tableSchemaFieldMapping(tempHBaseFields, registerTableFields)

  val hbaseTableFields = feedTypes(tempFieldRelation)
  val fieldsRelations = tableSchemaFieldMapping(hbaseTableFields, registerTableFields)
  val queryColumns = getQueryTargetCloumns(hbaseTableFields)

  /** Copies each SQL-side field type onto its positionally matched HBase field. */
  def feedTypes(mapping: Map[HBaseSchemaField, RegisteredSchemaField]): Array[HBaseSchemaField] = {
    mapping.map { case (hbaseField, registered) =>
      hbaseField.copy(fieldType = registered.fieldType)
    }.toArray
  }

  /** True when the field addresses the row key, i.e. it was written as ":key". */
  def isRowKey(field: HBaseSchemaField): Boolean = {
    val cfColArray = field.fieldName.split(":", -1)
    cfColArray(0) == "" && cfColArray(1) == "key"
  }

  /** Space-separated column list for TableInputFormat.SCAN_COLUMNS, e.g. "f1:col1 f1:col2 f2:col1". */
  def getQueryTargetCloumns(hbaseTableFields: Array[HBaseSchemaField]): String = {
    hbaseTableFields.filterNot(isRowKey).map(_.fieldName).mkString(" ")
  }

  /** Spark SQL schema derived from the registered field names and declared types. */
  lazy val schema = {
    val fields = hbaseTableFields.map { field =>
      val name = fieldsRelations.getOrElse(field, sys.error("table schema is not match the definition.")).fieldName
      val relatedType = field.fieldType match {
        case "string" =>
          SchemaType(StringType, nullable = false)
        case "int" =>
          SchemaType(IntegerType, nullable = false)
        case "long" =>
          SchemaType(LongType, nullable = false)
        // BUG FIX: Resolver supports float/double but this match previously
        // threw MatchError for them; now supported end to end.
        case "float" =>
          SchemaType(FloatType, nullable = false)
        case "double" =>
          SchemaType(DoubleType, nullable = false)
        case other =>
          sys.error(s"unsupported field type '$other' in table definition")
      }
      StructField(name, relatedType.dataType, relatedType.nullable)
    }
    StructType(fields)
  }

  /** Positionally zips HBase fields with registered fields; the two lists must be the same length. */
  def tableSchemaFieldMapping(externalHBaseTable: Array[HBaseSchemaField], registerTable: Array[RegisteredSchemaField]): Map[HBaseSchemaField, RegisteredSchemaField] = {
    if (externalHBaseTable.length != registerTable.length) sys.error("columns size not match in definition!")
    externalHBaseTable.zip(registerTable).toMap
  }

  /**
   * Parses the SQL-side schema string, e.g.
   *   '(rowkey string, value string, column_a string)'
   */
  def extractRegisterSchema(registerTableSchema: String): Array[RegisteredSchemaField] = {
    val fieldsStr = registerTableSchema.trim.drop(1).dropRight(1)
    fieldsStr.split(",").map(_.trim).map { fieldString =>
      val splitedField = fieldString.split("\\s+", -1)
      RegisteredSchemaField(splitedField(0), splitedField(1))
    }
  }

  /** Parses the HBase-side schema string, e.g. '(:key , f1:col1)'; types are attached later by feedTypes. */
  def extractHBaseSchema(externalTableSchema: String): Array[HBaseSchemaField] = {
    val fieldsStr = externalTableSchema.trim.drop(1).dropRight(1)
    fieldsStr.split(",").map(_.trim).map(fieldString => HBaseSchemaField(fieldString, ""))
  }

  // By making this a lazy val we keep the RDD around, amortizing the cost of locating splits.
  lazy val buildScan = {

    val hbaseConf = HBaseConfiguration.create()
    hbaseConf.set("hbase.zookeeper.quorum", zookeeperQuorum)
    hbaseConf.set(TableInputFormat.INPUT_TABLE, hbaseTableName)
    hbaseConf.set(TableInputFormat.SCAN_COLUMNS, queryColumns)
    hbaseConf.set(TableInputFormat.SCAN_ROW_START, startRowKey)
    hbaseConf.set(TableInputFormat.SCAN_ROW_STOP, endRowKey)

    val hbaseRdd = sqlContext.sparkContext.newAPIHadoopRDD(
      hbaseConf,
      classOf[org.apache.hadoop.hbase.mapreduce.TableInputFormat],
      classOf[org.apache.hadoop.hbase.io.ImmutableBytesWritable],
      classOf[org.apache.hadoop.hbase.client.Result]
    )

    // One Row per HBase Result, columns in declared field order.
    hbaseRdd.map(_._2).map { result =>
      Row.fromSeq(hbaseTableFields.map(field => Resolver.resolve(field, result)).toSeq)
    }
  }

  private case class SchemaType(dataType: DataType, nullable: Boolean)
}

Scala Class: DefaultSource.scala

package sparksql.hbase
 
import org.apache.spark.sql.SQLContext
import org.apache.spark.sql.sources.RelationProvider
  
  
/**
 * Entry point for Spark SQL's data-source discovery: `format("sparksql.hbase")`
 * resolves this class and delegates to [[HBaseRelation]] with the user options.
 */
class DefaultSource extends RelationProvider {
  override def createRelation(sqlContext: SQLContext, parameters: Map[String, String]): HBaseRelation =
    HBaseRelation(parameters)(sqlContext)
}

Package Object: package.scala

package sparksql.hbase
 
import org.apache.spark.sql.SQLContext
import scala.collection.immutable.HashMap 
  
  
package object hbase {

  /** Common base type for the two kinds of schema field descriptors below. */
  abstract class SchemaField extends Serializable

  /** A field as declared on the Spark SQL side, e.g. ("name", "string"). */
  case class RegisteredSchemaField(fieldName: String, fieldType: String) extends SchemaField with Serializable

  /** A field as declared on the HBase side, e.g. ("profile:name", "string"). */
  case class HBaseSchemaField(fieldName: String, fieldType: String) extends SchemaField with Serializable

  /** Name of a data-source option key. */
  case class Parameter(name: String)

  protected val SPARK_SQL_TABLE_SCHEMA = Parameter("sparksql_table_schema")
  protected val HBASE_TABLE_NAME = Parameter("hbase_table_name")
  protected val HBASE_TABLE_SCHEMA = Parameter("hbase_table_schema")
  protected val ROW_RANGE = Parameter("row_range")

  /**
   * Adds a method, `hbaseTable`, to SQLContext that allows reading data stored in hbase table.
   */
  implicit class HBaseContext(sqlContext: SQLContext) {
    def hbaseTable(sparksqlTableSchema: String, hbaseTableName: String, hbaseTableSchema: String, rowRange: String = "->") = {
      // Build the option map in one immutable literal instead of mutating a var.
      val params = HashMap(
        SPARK_SQL_TABLE_SCHEMA.name -> sparksqlTableSchema,
        HBASE_TABLE_NAME.name -> hbaseTableName,
        HBASE_TABLE_SCHEMA.name -> hbaseTableSchema,
        ROW_RANGE.name -> rowRange
      )
      sqlContext.baseRelationToDataFrame(HBaseRelation(params)(sqlContext))
    }
  }
}

使用實例: test.scala

package test
import org.apache.spark.SparkConf
import org.apache.spark.sql.SQLContext
import org.apache.spark.SparkContext
 
 
/** Usage example: reads an HBase table through the custom data source and runs a SQL query over it. */
object SparkSqlHbaseTest {

  def main(args: Array[String]): Unit = {
    // Spark context in yarn-client mode wrapped in a SQLContext.
    val conf = new SparkConf().setMaster("yarn-client").setAppName("SparkSQL HBase Test")
    val sqlContext = new SQLContext(new SparkContext(conf))

    // Load the HBase table via the external data source (package name = format).
    val hbasetable = sqlContext.read.format("sparksql.hbase").options(Map(
      "sparksql_table_schema" -> "(key string, Sequence string)",
      "hbase_table_name" -> "tbTexpertR1",
      "hbase_table_schema" -> "(:key , info:Sequence)"
    )).load()

    hbasetable.printSchema()

    // Expose the DataFrame to SQL and pull back a small sample.
    hbasetable.registerTempTable("tbTexpertR1")
    val records = sqlContext.sql("SELECT * from tbTexpertR1 limit 10").collect
  }
}

[1] 通過自定義SparkSQL外部數(shù)據(jù)源實現(xiàn)SparkSQL讀取HBase

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
  • 序言:七十年代末,一起剝皮案震驚了整個濱河市步清,隨后出現(xiàn)的幾起案子辆它,更是在濱河造成了極大的恐慌缀去,老刑警劉巖,帶你破解...
    沈念sama閱讀 222,627評論 6 517
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件识脆,死亡現(xiàn)場離奇詭異设联,居然都是意外死亡,警方通過查閱死者的電腦和手機灼捂,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 95,180評論 3 399
  • 文/潘曉璐 我一進店門离例,熙熙樓的掌柜王于貴愁眉苦臉地迎上來,“玉大人悉稠,你說我怎么就攤上這事宫蛆。” “怎么了的猛?”我有些...
    開封第一講書人閱讀 169,346評論 0 362
  • 文/不壞的土叔 我叫張陵耀盗,是天一觀的道長。 經(jīng)常有香客問我卦尊,道長叛拷,這世上最難降的妖魔是什么? 我笑而不...
    開封第一講書人閱讀 60,097評論 1 300
  • 正文 為了忘掉前任岂却,我火速辦了婚禮忿薇,結(jié)果婚禮上,老公的妹妹穿的比我還像新娘躏哩。我一直安慰自己署浩,他們只是感情好,可當我...
    茶點故事閱讀 69,100評論 6 398
  • 文/花漫 我一把揭開白布扫尺。 她就那樣靜靜地躺著筋栋,像睡著了一般。 火紅的嫁衣襯著肌膚如雪器联。 梳的紋絲不亂的頭發(fā)上二汛,一...
    開封第一講書人閱讀 52,696評論 1 312
  • 那天婿崭,我揣著相機與錄音,去河邊找鬼肴颊。 笑死氓栈,一個胖子當著我的面吹牛,可吹牛的內(nèi)容都是我干的婿着。 我是一名探鬼主播授瘦,決...
    沈念sama閱讀 41,165評論 3 422
  • 文/蒼蘭香墨 我猛地睜開眼,長吁一口氣:“原來是場噩夢啊……” “哼竟宋!你這毒婦竟也來了提完?” 一聲冷哼從身側(cè)響起,我...
    開封第一講書人閱讀 40,108評論 0 277
  • 序言:老撾萬榮一對情侶失蹤丘侠,失蹤者是張志新(化名)和其女友劉穎徒欣,沒想到半個月后,有當?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體蜗字,經(jīng)...
    沈念sama閱讀 46,646評論 1 319
  • 正文 獨居荒郊野嶺守林人離奇死亡打肝,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點故事閱讀 38,709評論 3 342
  • 正文 我和宋清朗相戀三年,在試婚紗的時候發(fā)現(xiàn)自己被綠了挪捕。 大學(xué)時的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片粗梭。...
    茶點故事閱讀 40,861評論 1 353
  • 序言:一個原本活蹦亂跳的男人離奇死亡,死狀恐怖级零,靈堂內(nèi)的尸體忽然破棺而出断医,到底是詐尸還是另有隱情,我是刑警寧澤奏纪,帶...
    沈念sama閱讀 36,527評論 5 351
  • 正文 年R本政府宣布鉴嗤,位于F島的核電站,受9級特大地震影響亥贸,放射性物質(zhì)發(fā)生泄漏躬窜。R本人自食惡果不足惜,卻給世界環(huán)境...
    茶點故事閱讀 42,196評論 3 336
  • 文/蒙蒙 一炕置、第九天 我趴在偏房一處隱蔽的房頂上張望荣挨。 院中可真熱鬧,春花似錦朴摊、人聲如沸默垄。這莊子的主人今日做“春日...
    開封第一講書人閱讀 32,698評論 0 25
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽口锭。三九已至,卻和暖如春,著一層夾襖步出監(jiān)牢的瞬間鹃操,已是汗流浹背韭寸。 一陣腳步聲響...
    開封第一講書人閱讀 33,804評論 1 274
  • 我被黑心中介騙來泰國打工, 沒想到剛下飛機就差點兒被人妖公主榨干…… 1. 我叫王不留荆隘,地道東北人恩伺。 一個月前我還...
    沈念sama閱讀 49,287評論 3 379
  • 正文 我出身青樓,卻偏偏與公主長得像椰拒,于是被迫代替她去往敵國和親晶渠。 傳聞我的和親對象是個殘疾皇子,可洞房花燭夜當晚...
    茶點故事閱讀 45,860評論 2 361

推薦閱讀更多精彩內(nèi)容