Spark、BulkLoad Hbase弄贿、單列春锋、多列

背景

之前的博客:Spark:DataFrame寫HFile (Hbase)一個列族、一個列擴展一個列族差凹、多個列

用spark 1.6.0 和 hbase 1.2.0 版本實現(xiàn)過spark BulkLoad Hbase的功能,并且擴展了其只能操作單列的不便性期奔。

現(xiàn)在要用spark 2.3.2 和 hbase 2.0.2 來實現(xiàn)相應(yīng)的功能豆拨;
本以為會很簡單,兩個框架經(jīng)過大版本的升級能庆,API變化很大施禾;
官網(wǎng)的案例其實有點難實現(xiàn),且網(wǎng)上的資料要么老舊搁胆,要么復(fù)制黏貼實在是感人弥搞,所以花了點時間重新實現(xiàn)了該功能;
同時記錄了在這個過程中遇到的很多問題渠旁。


版本信息

工具 版本
spark 2.3.2
hbase 2.0.2

配置文件

hdfs.properties
# zookeeper的信息
zk=slave01:2181,slave02:2181,slave03:2181,slave04:2181,slave05:2181
zk.host=slave01,slave02,slave03,slave04,slave05
zk.port=2181

maven 依賴

    <properties>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        <scala.version>2.11</scala.version>
        <spark.version>2.3.2</spark.version>
        <hbase.version>2.0.2</hbase.version>
        <hadoop.version>3.1.1</hadoop.version>
    </properties>
    
    <dependencies>

        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-sql_2.11</artifactId>
            <version>${spark.version}</version>
        </dependency>

        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-core_2.11</artifactId>
            <version>${spark.version}</version>
        </dependency>

        <dependency>
            <groupId>org.apache.hbase</groupId>
            <artifactId>hbase-it</artifactId>
            <version>${hbase.version}</version>
        </dependency>

    </dependencies>

實現(xiàn)代碼

模版方法
package com.aaa.base

import org.apache.spark.storage.StorageLevel
import org.slf4j.{Logger, LoggerFactory}

/**
 * @author lillcol 
 *         create_time  2019/6/14-14:25
 *         description :使用模板方法模式創(chuàng)建任務(wù)執(zhí)行流程攀例,保證所有任務(wù)的流程統(tǒng)一,所有非流處理任務(wù)需要實現(xiàn)此接口
 */
trait ExportToHbaseTemplate {
  val logger: Logger = LoggerFactory.getLogger(getClass.getSimpleName)
  //任務(wù)狀態(tài)
  val PERSIST_LEVEL: StorageLevel = StorageLevel.MEMORY_AND_DISK_SER


  /**
   * 任務(wù)模板
   *
   * @param args
   */
  def runWork(args: Array[String]): Unit = {
    try {
      //      initTepmlate(args) // 模板初始化信息
      init(args) // 初始化信息
      //      printfTepmlate //輸出模板初始化結(jié)果
      printf //輸出初始化結(jié)果
      workFlow //數(shù)據(jù)處理流
    } catch {
      case e: Exception =>
        e.printStackTrace
    } finally {
      //      spark.sparkContext.stop()
    }
  }


  /**
   * 初始化信息
   *
   * @param args
   */
  def init(args: Array[String])

  /**
   * 輸出初始化結(jié)果
   */
  def printf()

  /**
   * 數(shù)據(jù)處理流
   */
  def workFlow()

  /**
   * 模板初始化
   *
   * @param args
   */
  def initTepmlate(args: Array[String]): Unit = {
  }

  /**
   * 輸出模板初始化結(jié)果
   */
  def printfTepmlate(): Unit = {
  }

}


讀取配置文件方法
package com.aaa.util

import java.io.FileInputStream
import java.util.Properties

/**
 * 讀取.properties配置文件
 *
 * @param path
 */
class ReadProperties(path: String) {
  /**
   * 讀取顾腊、加載指定路徑配置文件
   *
   * @return Properties 實例
   */
  def getProInstance(): Properties = {
    val pro = new Properties()
    pro.load(new FileInputStream(path))
    pro
  }
}

/**
 * 伴生對象
 */
object ReadProperties {
  def getInstance(path: String): ReadProperties = {
    new ReadProperties(path)
  }
}


實現(xiàn)主體
package com.aaa.test

import com.aaa.base.{ExportToHbaseTemplate}
import com.aaa.util.ReadProperties
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.permission.{FsAction, FsPermission}
import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.hadoop.hbase.client.{Connection, ConnectionFactory, RegionLocator, Table}
import org.apache.hadoop.hbase.io.ImmutableBytesWritable
import org.apache.hadoop.hbase.mapreduce.HFileOutputFormat2
import org.apache.hadoop.hbase.tool.LoadIncrementalHFiles
import org.apache.hadoop.hbase.util.Bytes
import org.apache.hadoop.hbase.{HBaseConfiguration, KeyValue, TableName}
import org.apache.hadoop.mapreduce.Job
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{DataFrame, SparkSession}

object TestHbase extends ExportToHbaseTemplate {

  val proPath = "/root/lillcol/hdfs.properties" //配置文件路徑
  var cf: String = "info" //列族
  var defKey: String = "UID" //默認(rèn)key
  val proInstance = ReadProperties.getInstance(proPath).getProInstance
  var partition: String = "20190918"
  var conf: Configuration = _
  var SourceDataFrame: DataFrame = _
  var outPutTable: String = "outPutTable"
  var savePath: String = s"/tmp/hbase/$outPutTable" //臨時HFile保存路徑
  val spark: SparkSession = SparkSession
    .builder()
    //    .master("local")
    .appName("ExportToHBase")
    .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
    .getOrCreate()

  import spark.implicits._

  def main(args: Array[String]): Unit = {
    runWork(args)
  }

  /**
   * 初始化信息
   *
   * @param args
   */
  override def init(args: Array[String]): Unit = {
    conf = HBaseConfiguration.create() //Hbase配置信息
    conf.set("hbase.zookeeper.quorum", proInstance.getProperty("zk")) //Hbase zk信息
    conf.set("hbase.mapreduce.hfileoutputformat.table.name", outPutTable) //Hbase 輸出表
    conf.set("hbase.unsafe.stream.capability.enforce", "false") //hbase  根目錄設(shè)定  (有時候會報錯粤铭,具體看錯誤處理部分)
    conf.set("zookeeper.znode.parent", "/hbase")
    conf.set("hbase.mapreduce.bulkload.max.hfiles.perRegion.perFamily", "400")
  }

  /**
   * 數(shù)據(jù)處理流
   */
  override def workFlow(): Unit = {
    getDataset()
    val hfileRDD: RDD[(ImmutableBytesWritable, KeyValue)] = getHFileRDD
    saveHfile(hfileRDD)
    loadHFileToHbase()
  }

  /**
   * 獲取源數(shù)據(jù)表
   */
  def getDataset() = {
    SourceDataFrame = spark.read.parquet("/warehouse/data/lillcol/test.parquet")
  }

  /**
   * 將dataset處理成Hbase的數(shù)據(jù)格式
   * 注:
   * 默認(rèn)API只能處理一個列族一個列的情況
   * 此處擴展了該功能:
   * 用var kvlist: Seq[KeyValue] = List()
   * 和rdd.flatMapValues(_.iterator) 方式自適應(yīng)列名
   * 處理后的結(jié)果為:一個列族多個列
   *
   * @return
   */
  def getHFileRDD(): RDD[(ImmutableBytesWritable, KeyValue)] = {
    //key:全局變量不能在 map  內(nèi)部使用  所以創(chuàng)建一個局部變量
    //注:如果不做會出現(xiàn)奇怪的異常比如類初始化失敗,spark為初始化等杂靶,目前還沒發(fā)現(xiàn)具體原因梆惯,后續(xù)去跟蹤
    val key = defKey
    //列族
    val clounmFamily: String = cf
    //獲取列名 第一個為key
    val columnsName: Array[String] = SourceDataFrame.columns.sorted

    val result1: RDD[(ImmutableBytesWritable, Seq[KeyValue])] = SourceDataFrame
      .repartition(200, $"$key") //如果數(shù)據(jù)量大,可以根據(jù)key進(jìn)行分區(qū)操作
      .rdd
      .map(row => {
        var kvlist: Seq[KeyValue] = List() //存儲多個列
        var kv: KeyValue = null
        val cf: Array[Byte] = clounmFamily.getBytes //列族
        val rowKey = Bytes.toBytes(row.getAs[Int](key) + "")
        val immutableRowKey: ImmutableBytesWritable = new ImmutableBytesWritable(rowKey)
        for (i <- 0 to (columnsName.length - 1)) {
          //將rdd轉(zhuǎn)換成HFile需要的格式,
          //我們上面定義了Hfile的key是ImmutableBytesWritable,
          //那么我們定義的RDD也是要以ImmutableBytesWritable的實例為key
          var value: Array[Byte] = null
          try {
            //數(shù)據(jù)是字符串的都映射成String
            value = Bytes.toBytes(row.getAs[String](columnsName(i)))
          } catch {
            case e: ClassCastException =>
              //出現(xiàn)數(shù)據(jù)類型轉(zhuǎn)換異常則說明是數(shù)字,都映射成BigInt
              value = Bytes.toBytes(row.getAs[BigInt](columnsName(i)) + "")
            case e: Exception =>
              e.printStackTrace()
          }
          //封裝KeyValue
          kv = new KeyValue(rowKey, cf, Bytes.toBytes(columnsName(i)), value)
          //將新的kv加在kvlist后面(不能反 需要整體有序)
          kvlist = kvlist :+ kv
        }
        (immutableRowKey, kvlist)
      })

    val hfileRDD: RDD[(ImmutableBytesWritable, KeyValue)] = result1
      .flatMapValues(_.iterator)
    hfileRDD
  }

  /**
   * 保存生成的HFile文件
   * 注:bulk load  生成的HFile文件需要落地
   * 然后再通過LoadIncrementalHFiles類load進(jìn)Hbase
   * 此處關(guān)于  sortBy 操作詳解:
   * 0. Hbase查詢是根據(jù)rowkey進(jìn)行查詢的吗垮,并且rowkey是有序垛吗,
   * 某種程度上來說rowkey就是一個索引,這是Hbase查詢高效的一個原因烁登,
   * 這就要求我們在插入數(shù)據(jù)的時候怯屉,要插在rowkey該在的位置。
   * 1. Put方式插入數(shù)據(jù)饵沧,會有WAL锨络,同時在插入Hbase的時候會根據(jù)RowKey的值選擇合適的位置,此方式本身就可以保證RowKey有序
   * 2. bulk load 方式?jīng)]有WAL狼牺,它更像是hive通過load方式直接將底層文件HFile移動到制定的Hbase路徑下羡儿,所以,在不東HFile的情況下锁右,要保證本身有序才行
   * 之前寫的時候只要rowkey有序即可失受,但是2.0.2版本的時候發(fā)現(xiàn)clounm也要有序,所以會有sortBy(x => (x._1, x._2.getKeyString), true)
   *
   * @param hfileRDD
   */
  def saveHfile(hfileRDD: RDD[(ImmutableBytesWritable, KeyValue)]) = {
    //刪除可能存在的文件咏瑟,否則回報文件已存在異常
    delete_hdfspath(savePath)

    //生成的HFile保存到指定目錄
    hfileRDD
      .sortBy(x => (x._1, x._2.getKeyString), true) //要保持 整體有序
      .saveAsNewAPIHadoopFile(savePath,
        classOf[ImmutableBytesWritable],
        classOf[KeyValue],
        classOf[HFileOutputFormat2],
        conf)
  }

  /**
   * HFile 導(dǎo)入HBase
   */
  def loadHFileToHbase() = {
    //開始即那個HFile導(dǎo)入到Hbase,此處都是hbase的api操作
    val load: LoadIncrementalHFiles = new LoadIncrementalHFiles(conf)

    //創(chuàng)建hbase的鏈接,利用默認(rèn)的配置文件,實際上讀取的hbase的master地址
    val conn: Connection = ConnectionFactory.createConnection(conf)

    //根據(jù)表名獲取表
    val table: Table = conn.getTable(TableName.valueOf(outPutTable))

    //獲取hbase表的region分布
    val regionLocator: RegionLocator = conn.getRegionLocator(TableName.valueOf(outPutTable))

    //創(chuàng)建一個hadoop的mapreduce的job
    val job: Job = Job.getInstance(conf)

    //設(shè)置job名稱
    job.setJobName(s"$outPutTable LoadIncrementalHFiles")

    //此處最重要,需要設(shè)置文件輸出的key,因為我們要生成HFil,所以outkey要用ImmutableBytesWritable
    job.setMapOutputKeyClass(classOf[ImmutableBytesWritable])

    //輸出文件的內(nèi)容KeyValue
    job.setMapOutputValueClass(classOf[KeyValue])

    //配置HFileOutputFormat2的信息
    HFileOutputFormat2.configureIncrementalLoad(job, table, regionLocator)

    //開始導(dǎo)入
    load.doBulkLoad(new Path(savePath), conn.getAdmin, table, regionLocator)
    spark.stop()
  }

  /**
   * 輸出初始化結(jié)果
   */
  override def printf(): Unit = {
  }

  /**
   * 刪除hdfs下的文件
   *
   * @param url 需要刪除的路徑
   */
  def delete_hdfspath(url: String) {
    val hdfs: FileSystem = FileSystem.get(new Configuration)
    val path: Path = new Path(url)
    if (hdfs.exists(path)) {
      val filePermission = new FsPermission(FsAction.ALL, FsAction.ALL, FsAction.READ)
      hdfs.delete(path, true)
    }
  }
}


打包及執(zhí)行命令

執(zhí)行命令:

spark-submit \
--master yarn-client \
--driver-memory 2G \
--executor-memory 4G \
--executor-cores 4 \
--num-executors 4 \
--conf spark.yarn.executor.memoryOverhead=8192 \
--class com.aaa.test.TestHbase \
/home/apps/lillcol/TestHbase.jar \

注:已有Hbase表“outPutTable”拂到,想要查看hbase數(shù)據(jù)除了hbase shell 還可以關(guān)聯(lián)hive表,
參考:Spark:DataFrame批量導(dǎo)入Hbase的兩種方式(HFile码泞、Hive)


異常和錯誤

非法循環(huán)引用

scala.reflect.internal.Symbols$CyclicReference: illegal cyclic reference

Exception in thread "main" scala.reflect.internal.Symbols$CyclicReference: illegal cyclic reference involving object InterfaceAudience
    at scala.reflect.internal.Symbols$Symbol$$anonfun$info$3.apply(Symbols.scala:1502)
    at scala.reflect.internal.Symbols$Symbol$$anonfun$info$3.apply(Symbols.scala:1500)
    at scala.Function0$class.apply$mcV$sp(Function0.scala:34)
    at scala.runtime.AbstractFunction0.apply$mcV$sp(AbstractFunction0.scala:12)
    at scala.reflect.internal.Symbols$Symbol.lock(Symbols.scala:546)
    at scala.reflect.internal.Symbols$Symbol.info(Symbols.scala:1500)
    at scala.reflect.runtime.SynchronizedSymbols$SynchronizedSymbol$$anon$2.scala$reflect$runtime$SynchronizedSymbols$SynchronizedSymbol$$super$info(SynchronizedSymbols.scala:171)
    at scala.reflect.runtime.SynchronizedSymbols$SynchronizedSymbol$$anonfun$info$1.apply(SynchronizedSymbols.scala:127)
    at scala.reflect.runtime.SynchronizedSymbols$SynchronizedSymbol$$anonfun$info$1.apply(SynchronizedSymbols.scala:127)
    at scala.reflect.runtime.Gil$class.gilSynchronized(Gil.scala:19)
    at scala.reflect.runtime.JavaUniverse.gilSynchronized(JavaUniverse.scala:16)
    at scala.reflect.runtime.SynchronizedSymbols$SynchronizedSymbol$class.gilSynchronizedIfNotThreadsafe(SynchronizedSymbols.scala:123)
    at scala.reflect.runtime.SynchronizedSymbols$SynchronizedSymbol$$anon$2.gilSynchronizedIfNotThreadsafe(SynchronizedSymbols.scala:171)
    at scala.reflect.runtime.SynchronizedSymbols$SynchronizedSymbol$class.info(SynchronizedSymbols.scala:127)
    at scala.reflect.runtime.SynchronizedSymbols$SynchronizedSymbol$$anon$2.info(SynchronizedSymbols.scala:171)
    at scala.reflect.runtime.JavaMirrors$JavaMirror.scala$reflect$runtime$JavaMirrors$JavaMirror$$coreLookup$1(JavaMirrors.scala:992)
    at scala.reflect.runtime.JavaMirrors$JavaMirror.scala$reflect$runtime$JavaMirrors$JavaMirror$$lookupClass$1(JavaMirrors.scala:998)
    at scala.reflect.runtime.JavaMirrors$JavaMirror.scala$reflect$runtime$JavaMirrors$JavaMirror$$classToScala1(JavaMirrors.scala:1003)
    at scala.reflect.runtime.JavaMirrors$JavaMirror$$anonfun$classToScala$1.apply(JavaMirrors.scala:980)
    at scala.reflect.runtime.JavaMirrors$JavaMirror$$anonfun$classToScala$1.apply(JavaMirrors.scala:980)
    at scala.reflect.runtime.JavaMirrors$JavaMirror$$anonfun$toScala$1.apply(JavaMirrors.scala:97)
    at scala.reflect.runtime.TwoWayCaches$TwoWayCache$$anonfun$toScala$1.apply(TwoWayCaches.scala:38)
    at scala.reflect.runtime.Gil$class.gilSynchronized(Gil.scala:19)
    at scala.reflect.runtime.JavaUniverse.gilSynchronized(JavaUniverse.scala:16)
    at scala.reflect.runtime.TwoWayCaches$TwoWayCache.toScala(TwoWayCaches.scala:33)
    at scala.reflect.runtime.JavaMirrors$JavaMirror.toScala(JavaMirrors.scala:95)
    at scala.reflect.runtime.JavaMirrors$JavaMirror.classToScala(JavaMirrors.scala:980)
    at scala.reflect.runtime.JavaMirrors$JavaMirror$JavaAnnotationProxy.<init>(JavaMirrors.scala:163)
    at scala.reflect.runtime.JavaMirrors$JavaMirror$JavaAnnotationProxy$.apply(JavaMirrors.scala:162)
    at scala.reflect.runtime.JavaMirrors$JavaMirror$JavaAnnotationProxy$.apply(JavaMirrors.scala:162)
    at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
    at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
    at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
    at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:186)
    at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
    at scala.collection.mutable.ArrayOps$ofRef.map(ArrayOps.scala:186)
    at scala.reflect.runtime.JavaMirrors$JavaMirror.scala$reflect$runtime$JavaMirrors$JavaMirror$$copyAnnotations(JavaMirrors.scala:683)
    at scala.reflect.runtime.JavaMirrors$JavaMirror$FromJavaClassCompleter.load(JavaMirrors.scala:733)
    at scala.reflect.runtime.JavaMirrors$JavaMirror$FromJavaClassCompleter.complete(JavaMirrors.scala:744)
    at scala.reflect.internal.Symbols$Symbol.info(Symbols.scala:1514)
    at scala.reflect.runtime.SynchronizedSymbols$SynchronizedSymbol$$anon$2.scala$reflect$runtime$SynchronizedSymbols$SynchronizedSymbol$$super$info(SynchronizedSymbols.scala:171)
    at scala.reflect.runtime.SynchronizedSymbols$SynchronizedSymbol$$anonfun$info$1.apply(SynchronizedSymbols.scala:127)
    at scala.reflect.runtime.SynchronizedSymbols$SynchronizedSymbol$$anonfun$info$1.apply(SynchronizedSymbols.scala:127)
    at scala.reflect.runtime.Gil$class.gilSynchronized(Gil.scala:19)
    at scala.reflect.runtime.JavaUniverse.gilSynchronized(JavaUniverse.scala:16)
    at scala.reflect.runtime.SynchronizedSymbols$SynchronizedSymbol$class.gilSynchronizedIfNotThreadsafe(SynchronizedSymbols.scala:123)
    at scala.reflect.runtime.SynchronizedSymbols$SynchronizedSymbol$$anon$2.gilSynchronizedIfNotThreadsafe(SynchronizedSymbols.scala:171)
    at scala.reflect.runtime.SynchronizedSymbols$SynchronizedSymbol$class.info(SynchronizedSymbols.scala:127)
    at scala.reflect.runtime.SynchronizedSymbols$SynchronizedSymbol$$anon$2.info(SynchronizedSymbols.scala:171)
    at scala.reflect.runtime.JavaMirrors$JavaMirror.scala$reflect$runtime$JavaMirrors$JavaMirror$$coreLookup$1(JavaMirrors.scala:992)
    at scala.reflect.runtime.JavaMirrors$JavaMirror.scala$reflect$runtime$JavaMirrors$JavaMirror$$lookupClass$1(JavaMirrors.scala:998)
    at scala.reflect.runtime.JavaMirrors$JavaMirror.scala$reflect$runtime$JavaMirrors$JavaMirror$$classToScala1(JavaMirrors.scala:1003)
    at scala.reflect.runtime.JavaMirrors$JavaMirror$$anonfun$classToScala$1.apply(JavaMirrors.scala:980)
    at scala.reflect.runtime.JavaMirrors$JavaMirror$$anonfun$classToScala$1.apply(JavaMirrors.scala:980)
    at scala.reflect.runtime.JavaMirrors$JavaMirror$$anonfun$toScala$1.apply(JavaMirrors.scala:97)
    at scala.reflect.runtime.TwoWayCaches$TwoWayCache$$anonfun$toScala$1.apply(TwoWayCaches.scala:38)
    at scala.reflect.runtime.Gil$class.gilSynchronized(Gil.scala:19)
    at scala.reflect.runtime.JavaUniverse.gilSynchronized(JavaUniverse.scala:16)
    at scala.reflect.runtime.TwoWayCaches$TwoWayCache.toScala(TwoWayCaches.scala:33)
    at scala.reflect.runtime.JavaMirrors$JavaMirror.toScala(JavaMirrors.scala:95)
    at scala.reflect.runtime.JavaMirrors$JavaMirror.classToScala(JavaMirrors.scala:980)
    at scala.reflect.runtime.JavaMirrors$JavaMirror$JavaAnnotationProxy.<init>(JavaMirrors.scala:163)
    at scala.reflect.runtime.JavaMirrors$JavaMirror$JavaAnnotationProxy$.apply(JavaMirrors.scala:162)
    at scala.reflect.runtime.JavaMirrors$JavaMirror$JavaAnnotationProxy$.apply(JavaMirrors.scala:162)
    at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
    at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
    at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
    at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:186)
    at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
    at scala.collection.mutable.ArrayOps$ofRef.map(ArrayOps.scala:186)
    at scala.reflect.runtime.JavaMirrors$JavaMirror.scala$reflect$runtime$JavaMirrors$JavaMirror$$copyAnnotations(JavaMirrors.scala:683)
    at scala.reflect.runtime.JavaMirrors$JavaMirror$FromJavaClassCompleter.load(JavaMirrors.scala:733)
    at scala.reflect.runtime.SynchronizedSymbols$SynchronizedSymbol$$anonfun$typeParams$1.apply(SynchronizedSymbols.scala:142)
    at scala.reflect.runtime.SynchronizedSymbols$SynchronizedSymbol$$anonfun$typeParams$1.apply(SynchronizedSymbols.scala:133)
    at scala.reflect.runtime.Gil$class.gilSynchronized(Gil.scala:19)
    at scala.reflect.runtime.JavaUniverse.gilSynchronized(JavaUniverse.scala:16)
    at scala.reflect.runtime.SynchronizedSymbols$SynchronizedSymbol$class.gilSynchronizedIfNotThreadsafe(SynchronizedSymbols.scala:123)
    at scala.reflect.runtime.SynchronizedSymbols$SynchronizedSymbol$$anon$8.gilSynchronizedIfNotThreadsafe(SynchronizedSymbols.scala:168)
    at scala.reflect.runtime.SynchronizedSymbols$SynchronizedSymbol$class.typeParams(SynchronizedSymbols.scala:132)
    at scala.reflect.runtime.SynchronizedSymbols$SynchronizedSymbol$$anon$8.typeParams(SynchronizedSymbols.scala:168)
    at scala.reflect.internal.Types$NoArgsTypeRef.typeParams(Types.scala:1926)
    at scala.reflect.internal.Types$NoArgsTypeRef.isHigherKinded(Types.scala:1925)
    at scala.reflect.internal.transform.UnCurry$class.scala$reflect$internal$transform$UnCurry$$expandAlias(UnCurry.scala:22)
    at scala.reflect.internal.transform.UnCurry$$anon$2.apply(UnCurry.scala:26)
    at scala.reflect.internal.transform.UnCurry$$anon$2.apply(UnCurry.scala:24)
    at scala.collection.immutable.List.loop$1(List.scala:173)
    at scala.collection.immutable.List.mapConserve(List.scala:189)
    at scala.reflect.internal.tpe.TypeMaps$TypeMap.mapOver(TypeMaps.scala:115)
    at scala.reflect.internal.transform.UnCurry$$anon$2.apply(UnCurry.scala:46)
    at scala.reflect.internal.transform.Transforms$class.transformedType(Transforms.scala:43)
    at scala.reflect.internal.SymbolTable.transformedType(SymbolTable.scala:16)
    at scala.reflect.internal.Types$TypeApiImpl.erasure(Types.scala:225)
    at scala.reflect.internal.Types$TypeApiImpl.erasure(Types.scala:218)
    at o?rg.apache.spark.sql.catalyst.ScalaReflection$class.getClassNameFromType(ScalaReflection.scala:853)
    at org.apache.spark.sql.catalyst.ScalaReflection$.getClassNameFromType(ScalaReflection.scala:39)
    at org.apache.spark.sql.catalyst.ScalaReflection$$anonfun$org$apache$spark$sql$catalyst$ScalaReflection$$dataTypeFor$1.apply(ScalaReflection.scala:78)
    at org.apache.spark.sql.catalyst.ScalaReflection$$anonfun$org$apache$spark$sql$catalyst$ScalaReflection$$dataTypeFor$1.apply(ScalaReflection.scala:65)
    at scala.reflect.internal.tpe.TypeConstraints$UndoLog.undo(TypeConstraints.scala:56)
    at org.apache.spark.sql.catalyst.ScalaReflection$class.cleanUpReflectionObjects(ScalaReflection.scala:824)
    at org.apache.spark.sql.catalyst.ScalaReflection$.cleanUpReflectionObjects(ScalaReflection.scala:39)
    at org.apache.spark.sql.catalyst.ScalaReflection$.org$apache$spark$sql$catalyst$ScalaReflection$$dataTypeFor(ScalaReflection.scala:64)
    at org.apache.spark.sql.catalyst.ScalaReflection$.dataTypeFor(ScalaReflection.scala:62)
    at org.apache.spark.sql.catalyst.encoders.ExpressionEncoder$.apply(ExpressionEncoder.scala:63)
    at org.apache.spark.sql.Encoders$.product(Encoders.scala:275)
    at org.apache.spark.sql.LowPrioritySQLImplicits$class.newProductEncoder(SQLImplicits.scala:248)
    at org.apache.spark.sql.SQLImplicits.newProductEncoder(SQLImplicits.scala:34)
    at com.aaa.TestHbase$.main(TestHbase.scala:40)
    at com.aaa.TestHbase.main(TestHbase.scala)

這個錯誤的意思是非法的循環(huán)引用兄旬,
目前我沒搞明白我循環(huán)引用了啥,不過大概摸清了出現(xiàn)異常的情況。
異常出現(xiàn)的代碼塊:

val result1 : RDD[(ImmutableBytesWritable, Seq[KeyValue])] = TM_D
      .map(row => {
        var kvlist: Seq[KeyValue] = List()
        var kv: KeyValue = null
        val cf: Array[Byte] = clounmFamily.getBytes //列族
        val rowKey = Bytes.toBytes(row.getAs[Int]("ID"))
        val immutableRowKey = new ImmutableBytesWritable(rowKey)
        for (i <- 1 to (columnsName.length - 1)) {
          //          將rdd轉(zhuǎn)換成HFile需要的格式,
          //          我們上面定義了Hfile的key是ImmutableBytesWritable,
          //          那么我們定義的RDD也是要以ImmutableBytesWritable的實例為key
          kv = new KeyValue(rowKey, cf, Bytes.toBytes(columnsName(i)), Bytes.toBytes(row.get(i) + ""))
          //          將新的kv加在kvlist后面(不能反 需要整體有序)
          kvlist = kvlist :+ kv
        }
        //(rowKey, kvlist.length)-----1
        //(rowKey, kvlist)-----2
        //(immutableRowKey, kvlist.length)-----3
        //(immutableRowKey, kvlist)-----4
      })

如上面的代碼所示:
如果最后的返回值是2领铐、3悯森、4中的一個,那么就會報這個非法循環(huán)引用的錯誤绪撵,
他們的共同點是都是對象(雖然scala萬物皆可對象瓢姻,但是還是沒搞懂);
如果返回的是1則沒有問題音诈,但是這并不是我們要的答案幻碱。

網(wǎng)上一堆說scala版本問題,JDK版本問題,廣播變量等都沒有解決细溅,只能自己慢慢搗鼓褥傍。

通過觀察數(shù)據(jù)類型發(fā)現(xiàn)TM_D是DataFrame/Dataset[Row]
進(jìn)行map操作后還是DataFrame/Dataset[Row],但是編譯期間沒有報錯喇聊;
有可能因為是DataFrame/Dataset[Row]map操作有我不知道的限制恍风,所以果斷DataFrame/Dataset[Row]轉(zhuǎn)RDD試試。
嗯......~誓篱,還真的給我試出來了朋贬,運氣成分,我現(xiàn)在也不知道啥原因燕鸽,也許是天選之子吧兄世。
關(guān)于轉(zhuǎn)換的操作可以參考我的博客Spark 讀寫數(shù)據(jù)啼辣、抽象轉(zhuǎn)換 拾遺
修改后的代碼(未優(yōu)化):

   val result1: RDD[(ImmutableBytesWritable, Seq[KeyValue])] = TM_D
      .rdd  //轉(zhuǎn)換rdd
      .map(row => {
        var kvlist: Seq[KeyValue] = List()
        var kv: KeyValue = null
        val cf: Array[Byte] = clounmFamily.getBytes //列族
        val rowKey = Bytes.toBytes(row.getAs[Int]("ID"))
        val immutableRowKey = new ImmutableBytesWritable(rowKey)
        for (i <- 1 to (columnsName.length - 1)) {
          kv = new KeyValue(rowKey, cf, Bytes.toBytes(columnsName(i)), Bytes.toBytes(row.get(i) + ""))
          kvlist = kvlist :+ kv
        }
        (immutableRowKey, kvlist)
      })

key排序

Added a key not lexically larger than previous

Caused by: java.io.IOException: Added a key not lexically larger than previous. Current cell = \x00\x00\xE4h/cf:CN_TAG/1568255140650/Put/vlen=3/seqid=0, lastCell = \x00\x00\xE4h/cf:FIRST_DT/1568255140650/Put/vlen=6/seqid=0
        at org.apache.hadoop.hbase.io.hfile.HFileWriterImpl.checkKey(HFileWriterImpl.java:245)
        at org.apache.hadoop.hbase.io.hfile.HFileWriterImpl.append(HFileWriterImpl.java:731)
        at org.apache.hadoop.hbase.regionserver.StoreFileWriter.append(StoreFileWriter.java:234)
        at org.apache.hadoop.hbase.mapreduce.HFileOutputFormat2$1.write(HFileOutputFormat2.java:344)
        at org.apache.hadoop.hbase.mapreduce.HFileOutputFormat2$1.write(HFileOutputFormat2.java:231)
        at org.apache.spark.internal.io.HadoopMapReduceWriteConfigUtil.write(SparkHadoopWriter.scala:356)
        at org.apache.spark.internal.io.SparkHadoopWriter$$anonfun$4.apply(SparkHadoopWriter.scala:130)
        at org.apache.spark.internal.io.SparkHadoopWriter$$anonfun$4.apply(SparkHadoopWriter.scala:127)
        at org.apache.spark.util.Utils$.tryWithSafeFinallyAndFailureCallbacks(Utils.scala:1415)
        at org.apache.spark.internal.io.SparkHadoopWriter$.org$apache$spark$internal$io$SparkHadoopWriter$$executeTask(SparkHadoopWriter.scala:139)

Hbase查詢是根據(jù)rowkey進(jìn)行查詢的啊研,并且rowkey是有序,某種程度上來說rowkey就是一個索引鸥拧,這是Hbase查詢高效的一個原因党远。
一開始代碼中只是對key排序,在舊的版本測試沒問題富弦,但是2.0.2出問題了沟娱。
此處報錯的意思是當(dāng)前列CN_TAG 比 上一列FIRST_DT小,
猜測同一個key下clounm也需要有序腕柜,
于是對key济似,clounm排序解決了這個問題。
(之前的博客中應(yīng)該是因為一開始對列排了序 所以沒出問題)盏缤。

解決方法:

 hfileRDD
      .sortBy(x => (x._1, x._2.getKeyString), true) //要保持 整體有序
      .saveAsNewAPIHadoopFile(savePath,
        classOf[ImmutableBytesWritable],
        classOf[KeyValue],
        classOf[HFileOutputFormat2],
        conf)

HBase 根目錄不存在

java.util.concurrent.ExecutionException: org.apache.phoenix.shaded.org.apache.zookeeper.KeeperException$NoNodeException: KeeperErrorCode = NoNode for /hbase/hbaseid
        at java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:357)
        at java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1895)
        at org.apache.hadoop.hbase.client.ConnectionImplementation.retrieveClusterId(ConnectionImplementation.java:549)
        at org.apache.hadoop.hbase.client.ConnectionImplementation.<init>(ConnectionImplementation.java:287)
        at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
        at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
        at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
        at java.lang.reflect.Constructor.newInstance(Constructor.java:423)
        at org.apache.hadoop.hbase.client.ConnectionFactory.createConnection(ConnectionFactory.java:219)
        at org.apache.hadoop.hbase.client.ConnectionFactory.createConnection(ConnectionFactory.java:114)
        at com.aaa.TestHbase$.main(TestHbase.scala:99)
        at com.aaa.TestHbase.main(TestHbase.scala)
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:498)
        at org.apache.spark.deploy.JavaMainApplication.start(SparkApplication.scala:52)
        at org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:904)
        at org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:198)
        at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:228)
        at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:137)
        at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)

默認(rèn)為:/hbase
如果修改了需要指定砰蠢,否則找不到該路徑

修改方式有兩個:


  • 修改配置文件bhase-site.xml
<configuration>
  <property>
    <name>hbase.unsafe.stream.capability.enforce</name>
    <value>false</value>
  </property>

  <property>
    <name>zookeeper.znode.parent</name>
    <value>/hbase</value>
  </property>
</configuration>
  • 代碼中設(shè)置參數(shù)
    代碼中執(zhí)行要使用此方法
conf.set("hbase.unsafe.stream.capability.enforce", "false") //hbase  根目錄設(shè)定
conf.set("zookeeper.znode.parent", "/hbase") //設(shè)置成真實的值

一個family下超過了默認(rèn)的32個hfile

Exception in thread "main" java.io.IOException: Trying to load more than 32 hfiles to one family of one region
        at org.apache.hadoop.hbase.mapreduce.LoadIncrementalHFiles.doBulkLoad(LoadIncrementalHFiles.java:288)
        at org.apache.hadoop.hbase.mapreduce.LoadIncrementalHFiles.run(LoadIncrementalHFiles.java:842)
        at org.apache.hadoop.util.ToolRunner.run(ToolRunner.java:70)
        at org.apache.hadoop.util.ToolRunner.run(ToolRunner.java:84)
        at org.apache.hadoop.hbase.mapreduce.LoadIncrementalHFiles.main(LoadIncrementalHFiles.java:847)

解決辦法有兩個:

  • 修改配置文件bhase-site.xml
  <property>
    <name>hbase.mapreduce.bulkload.max.hfiles.perRegion.perFamily</name>
    <value>400</value>
  </property>
  • 代碼中設(shè)置參數(shù)
    代碼中執(zhí)行要使用此方法
conf.set("hbase.mapreduce.bulkload.max.hfiles.perRegion.perFamily", "400")

內(nèi)存溢出

19/09/17 15:25:17 ERROR YarnScheduler: 
Lost executor 8 on slave2: Container killed by YARN for exceeding memory limits. 
11.0 GB of 11 GB physical memory used. Consider boosting spark.yarn.executor.memoryOverhead.

excutor 內(nèi)存不夠,這個就要根據(jù)自己的代碼進(jìn)行調(diào)整了唉铜,
加大內(nèi)存總量不一定有用台舱,也不存在萬能的方法,但是可以根據(jù)下面的思路去嘗試潭流。

  1. spark.yarn.executor.memoryOverhead設(shè)置為最大值竞惋,可以考慮一下4096柜去。這個數(shù)值一般都是2的次冪。
  2. 加大rdd拆宛、DataFrame分區(qū)嗓奢,像我repartition(200),前提是數(shù)據(jù)是均勻分布的浑厚,否則可能會出現(xiàn)數(shù)據(jù)傾斜蔓罚。
  3. 減少將spark.executor.core如:從8設(shè)置為4。將core的個數(shù)調(diào)小瞻颂。
  4. 增加將spark.executor.memory如:從8g設(shè)置為12g豺谈。將內(nèi)存調(diào)大。
  • spark.yarn.executor.memoryOverhead計算方式
E = max(MEMORY_OVERHEAD_MIN,MEMORY_OVERHEAD_FACTOR*executorMemory)

MEMORY_OVERHEAD_FACTOR默認(rèn)為0.1;
executorMemory為設(shè)置的executor-memory;
MEMORY_OVERHEAD_MIN默認(rèn)為384m;
參數(shù)MEMORY_OVERHEAD_FACTOR和MEMORY_OVERHEAD_MIN一般不能直接修改贡这,是Spark代碼中直接寫死的

  • executor可用內(nèi)存的計算方式:
E = (driver-memory+spark.yarn.executor.memoryOverhead)

本文為原創(chuàng)文章茬末,轉(zhuǎn)載請注明出處!8墙谩丽惭!

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
  • 序言:七十年代末,一起剝皮案震驚了整個濱河市辈双,隨后出現(xiàn)的幾起案子责掏,更是在濱河造成了極大的恐慌,老刑警劉巖湃望,帶你破解...
    沈念sama閱讀 212,454評論 6 493
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件换衬,死亡現(xiàn)場離奇詭異,居然都是意外死亡证芭,警方通過查閱死者的電腦和手機瞳浦,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 90,553評論 3 385
  • 文/潘曉璐 我一進(jìn)店門,熙熙樓的掌柜王于貴愁眉苦臉地迎上來废士,“玉大人叫潦,你說我怎么就攤上這事」傧酰” “怎么了矗蕊?”我有些...
    開封第一講書人閱讀 157,921評論 0 348
  • 文/不壞的土叔 我叫張陵,是天一觀的道長氢架。 經(jīng)常有香客問我傻咖,道長,這世上最難降的妖魔是什么达箍? 我笑而不...
    開封第一講書人閱讀 56,648評論 1 284
  • 正文 為了忘掉前任没龙,我火速辦了婚禮,結(jié)果婚禮上,老公的妹妹穿的比我還像新娘硬纤。我一直安慰自己解滓,他們只是感情好,可當(dāng)我...
    茶點故事閱讀 65,770評論 6 386
  • 文/花漫 我一把揭開白布筝家。 她就那樣靜靜地躺著洼裤,像睡著了一般。 火紅的嫁衣襯著肌膚如雪溪王。 梳的紋絲不亂的頭發(fā)上腮鞍,一...
    開封第一講書人閱讀 49,950評論 1 291
  • 那天,我揣著相機與錄音莹菱,去河邊找鬼移国。 笑死,一個胖子當(dāng)著我的面吹牛道伟,可吹牛的內(nèi)容都是我干的迹缀。 我是一名探鬼主播,決...
    沈念sama閱讀 39,090評論 3 410
  • 文/蒼蘭香墨 我猛地睜開眼蜜徽,長吁一口氣:“原來是場噩夢啊……” “哼祝懂!你這毒婦竟也來了?” 一聲冷哼從身側(cè)響起拘鞋,我...
    開封第一講書人閱讀 37,817評論 0 268
  • 序言:老撾萬榮一對情侶失蹤砚蓬,失蹤者是張志新(化名)和其女友劉穎,沒想到半個月后盆色,有當(dāng)?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體灰蛙,經(jīng)...
    沈念sama閱讀 44,275評論 1 303
  • 正文 獨居荒郊野嶺守林人離奇死亡,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點故事閱讀 36,592評論 2 327
  • 正文 我和宋清朗相戀三年傅事,在試婚紗的時候發(fā)現(xiàn)自己被綠了缕允。 大學(xué)時的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片。...
    茶點故事閱讀 38,724評論 1 341
  • 序言:一個原本活蹦亂跳的男人離奇死亡蹭越,死狀恐怖,靈堂內(nèi)的尸體忽然破棺而出教届,到底是詐尸還是另有隱情响鹃,我是刑警寧澤,帶...
    沈念sama閱讀 34,409評論 4 333
  • 正文 年R本政府宣布案训,位于F島的核電站买置,受9級特大地震影響,放射性物質(zhì)發(fā)生泄漏强霎。R本人自食惡果不足惜忿项,卻給世界環(huán)境...
    茶點故事閱讀 40,052評論 3 316
  • 文/蒙蒙 一、第九天 我趴在偏房一處隱蔽的房頂上張望。 院中可真熱鬧轩触,春花似錦寞酿、人聲如沸。這莊子的主人今日做“春日...
    開封第一講書人閱讀 30,815評論 0 21
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽。三九已至榨为,卻和暖如春惨好,著一層夾襖步出監(jiān)牢的瞬間,已是汗流浹背随闺。 一陣腳步聲響...
    開封第一講書人閱讀 32,043評論 1 266
  • 我被黑心中介騙來泰國打工日川, 沒想到剛下飛機就差點兒被人妖公主榨干…… 1. 我叫王不留,地道東北人矩乐。 一個月前我還...
    沈念sama閱讀 46,503評論 2 361
  • 正文 我出身青樓逗鸣,卻偏偏與公主長得像,于是被迫代替她去往敵國和親绰精。 傳聞我的和親對象是個殘疾皇子撒璧,可洞房花燭夜當(dāng)晚...
    茶點故事閱讀 43,627評論 2 350

推薦閱讀更多精彩內(nèi)容