spark dataframe操作大全

取某一列的分位數(shù)

#Python:
df.approxQuantile("x", [0.5], 0.25)
#Scala:
df.stat.approxQuantile("x", Array(0.5), 0.25)
#第一個(gè)參數(shù)是列名,第二個(gè)參數(shù)為分位數(shù)尚镰,第三個(gè)參數(shù)為精度, 0表示完全沒有誤差

取出a_df中有鹰椒,b_df中沒有的行

a_df.except(b_df)

spark json array string

//假設(shè)string長(zhǎng)成下面這樣子
        import spark.implicits._
        import org.apache.spark.sql.types._
        import org.apache.spark.sql.functions._
        val egDF = Seq("""[{"text": "哈哈哈哈哈", "no": "hehe"},{"text": "嘻嘻嘻嘻", "no": "2"}]""",
            """[{"text": "噢噢噢噢", "no": "1"},{"text": "嘎嘎嘎嘎", "no": "2"}]""").toDF("value")
        val schema = ArrayType(StructType(
            Seq(StructField("text", StringType),StructField("no", StringType))
        ))
        val jsonDF = egDF.withColumn("text_no_arr", from_json(col("value"), schema)).
            withColumn("text_no", explode(col("text_no_arr"))).select(col("text_no")("text"))

// example2
        val egDF = Seq("""{"text": "哈哈哈哈哈", "no": "hehe"}""",
            """{"text": "噢噢噢噢", "no": "1"}""").toDF("value")
        val schema = StructType(
            Seq(StructField("text", StringType),StructField("no", StringType))
        )
        val jsonDF = egDF.withColumn("text_no_arr", from_json(col("value"), schema)).
            select(col("text_no_arr")("text"))

spark sql在讀取hive表數(shù)據(jù)結(jié)構(gòu)的時(shí)候經(jīng)常會(huì)遇到struct類型的列票灰,這種情況下對(duì)于后續(xù)數(shù)據(jù)處理是非常不方便的蔬墩,下面給出一個(gè)將struct類型轉(zhuǎn)為spark里面自定義的class類型的例子

//struct類型的字段處理成case class類型 
import spark.implicits._
case class Location(lat: Double, lon: Double)
val eg_df = Seq((10, Location(35, 25)), (20, Location(45, 35))).toDF
// 方法一
val eg_ds = eg_df.map { row =>
        (row: @unchecked) match {
            case Row(a: Int, Row(b: Double, c: Double)) => (a, Location(b, c))
        }
  }
// 方法二
val eg_ds2 = eg_df.map(row => {
        (row.getInt(0), Location(row.getStruct(1).getDouble(0), row.getStruct(1).getDouble(1)))
    })

spark添加一列自增的id斤贰,使用rdd中的zipWithIndex方法


image.png
  // 在原Schema信息的基礎(chǔ)上添加一列 “id”信息
    val schema: StructType = dataframe.schema.add(StructField("id", LongType))
    // DataFrame轉(zhuǎn)RDD 然后調(diào)用 zipWithIndex
    val dfRDD: RDD[(Row, Long)] = dataframe.rdd.zipWithIndex()
    val rowRDD: RDD[Row] = dfRDD.map(tp => Row.merge(tp._1, Row(tp._2)))
    // 將添加了索引的RDD 轉(zhuǎn)化為DataFrame
    val df2 = spark.createDataFrame(rowRDD, schema)
    df2.show()

scala class和json string相互轉(zhuǎn)換

case class DeviceSeqObj(device_id: String,
                            text_id_click_seq: String,
                            text_tag_click_seq: String
                           )

val pushClickSeqFeatureDF = pushClickSeqFeature(jobConfig, spark).as[DeviceSeqObj].
            map(row => {
                val device_id = row.device_id
                val gson = new Gson
                val device_push_seq_feature = gson.toJson(row, classOf[DeviceSeqObj])

                (device_id, device_push_seq_feature)
            }).toDF("device_id", "feature")

val df = pushClickSeqFeatureDF.map(row => {
            val feature = row.getString(row.fieldIndex("feature"))
            val gson = new Gson
            val device_push_seq_feature = gson.fromJson(feature, classOf[DeviceSeqObj])

            device_push_seq_feature
        })

spark dag圖計(jì)算優(yōu)化可能會(huì)改變執(zhí)行順序

val opPushDF = deviceInfo.join(broadcast(opTextDS), "explore"). 
withColumn("isSelected", isSelectedUDF(col("flow_ratio"))).
where(col("isSelected") === 1)
由于udf依賴的列只在opTextDS中出現(xiàn)篡帕,因此代碼執(zhí)行過程中會(huì)先在opTextDS中選出一部分行殖侵,然后和deviceInfo進(jìn)行join
而事實(shí)上,我們期望的執(zhí)行的效果是對(duì)于device+text的pair對(duì)進(jìn)行隨機(jī)選擇镰烧。
這個(gè)種情況下可以冗余一個(gè)deviceInfo中的字段拢军,確保udf在join之后生效
    val opPushDF = deviceInfo.join(broadcast(opTextDS), "explore").
      withColumn("isSelected", isSelectedUDF(col("device_id"),col("flow_ratio"))).
      where(col("isSelected") === 1)
最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
  • 序言:七十年代末,一起剝皮案震驚了整個(gè)濱河市怔鳖,隨后出現(xiàn)的幾起案子茉唉,更是在濱河造成了極大的恐慌,老刑警劉巖结执,帶你破解...
    沈念sama閱讀 221,695評(píng)論 6 515
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件度陆,死亡現(xiàn)場(chǎng)離奇詭異,居然都是意外死亡献幔,警方通過查閱死者的電腦和手機(jī)懂傀,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 94,569評(píng)論 3 399
  • 文/潘曉璐 我一進(jìn)店門,熙熙樓的掌柜王于貴愁眉苦臉地迎上來蜡感,“玉大人蹬蚁,你說我怎么就攤上這事恃泪。” “怎么了犀斋?”我有些...
    開封第一講書人閱讀 168,130評(píng)論 0 360
  • 文/不壞的土叔 我叫張陵贝乎,是天一觀的道長(zhǎng)。 經(jīng)常有香客問我叽粹,道長(zhǎng)览效,這世上最難降的妖魔是什么? 我笑而不...
    開封第一講書人閱讀 59,648評(píng)論 1 297
  • 正文 為了忘掉前任球榆,我火速辦了婚禮朽肥,結(jié)果婚禮上,老公的妹妹穿的比我還像新娘持钉。我一直安慰自己衡招,他們只是感情好,可當(dāng)我...
    茶點(diǎn)故事閱讀 68,655評(píng)論 6 397
  • 文/花漫 我一把揭開白布每强。 她就那樣靜靜地躺著始腾,像睡著了一般。 火紅的嫁衣襯著肌膚如雪空执。 梳的紋絲不亂的頭發(fā)上浪箭,一...
    開封第一講書人閱讀 52,268評(píng)論 1 309
  • 那天,我揣著相機(jī)與錄音辨绊,去河邊找鬼奶栖。 笑死,一個(gè)胖子當(dāng)著我的面吹牛门坷,可吹牛的內(nèi)容都是我干的宣鄙。 我是一名探鬼主播,決...
    沈念sama閱讀 40,835評(píng)論 3 421
  • 文/蒼蘭香墨 我猛地睜開眼默蚌,長(zhǎng)吁一口氣:“原來是場(chǎng)噩夢(mèng)啊……” “哼冻晤!你這毒婦竟也來了?” 一聲冷哼從身側(cè)響起绸吸,我...
    開封第一講書人閱讀 39,740評(píng)論 0 276
  • 序言:老撾萬(wàn)榮一對(duì)情侶失蹤鼻弧,失蹤者是張志新(化名)和其女友劉穎,沒想到半個(gè)月后锦茁,有當(dāng)?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體攘轩,經(jīng)...
    沈念sama閱讀 46,286評(píng)論 1 318
  • 正文 獨(dú)居荒郊野嶺守林人離奇死亡,尸身上長(zhǎng)有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點(diǎn)故事閱讀 38,375評(píng)論 3 340
  • 正文 我和宋清朗相戀三年码俩,在試婚紗的時(shí)候發(fā)現(xiàn)自己被綠了撑刺。 大學(xué)時(shí)的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片。...
    茶點(diǎn)故事閱讀 40,505評(píng)論 1 352
  • 序言:一個(gè)原本活蹦亂跳的男人離奇死亡握玛,死狀恐怖够傍,靈堂內(nèi)的尸體忽然破棺而出甫菠,到底是詐尸還是另有隱情,我是刑警寧澤冕屯,帶...
    沈念sama閱讀 36,185評(píng)論 5 350
  • 正文 年R本政府宣布寂诱,位于F島的核電站,受9級(jí)特大地震影響安聘,放射性物質(zhì)發(fā)生泄漏痰洒。R本人自食惡果不足惜,卻給世界環(huán)境...
    茶點(diǎn)故事閱讀 41,873評(píng)論 3 333
  • 文/蒙蒙 一浴韭、第九天 我趴在偏房一處隱蔽的房頂上張望丘喻。 院中可真熱鬧,春花似錦念颈、人聲如沸泉粉。這莊子的主人今日做“春日...
    開封第一講書人閱讀 32,357評(píng)論 0 24
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽(yáng)嗡靡。三九已至,卻和暖如春窟感,著一層夾襖步出監(jiān)牢的瞬間讨彼,已是汗流浹背。 一陣腳步聲響...
    開封第一講書人閱讀 33,466評(píng)論 1 272
  • 我被黑心中介騙來泰國(guó)打工柿祈, 沒想到剛下飛機(jī)就差點(diǎn)兒被人妖公主榨干…… 1. 我叫王不留哈误,地道東北人。 一個(gè)月前我還...
    沈念sama閱讀 48,921評(píng)論 3 376
  • 正文 我出身青樓躏嚎,卻偏偏與公主長(zhǎng)得像黑滴,于是被迫代替她去往敵國(guó)和親。 傳聞我的和親對(duì)象是個(gè)殘疾皇子紧索,可洞房花燭夜當(dāng)晚...
    茶點(diǎn)故事閱讀 45,515評(píng)論 2 359

推薦閱讀更多精彩內(nèi)容