spark基礎(chǔ)數(shù)據(jù)操作

主要涉及 DataFrame 相關(guān)操作
RDD相關(guān)操作

  • File

    // 讀取文件
    val spark = SparkSession.builder().appName("Example").master("local[*]").getOrCreate()
    val sc = spark.sparkContext
    
    val data: Dataset[String] = spark.read.textFile("data")
    val df: DataFrame = data.toDF()
    val df0: DataFrame = spark.read.csv("data.csv")
    val df1: DataFrame = spark.read.format("csv")
        .option("sep", ";")
        .option("inferSchema", "true")
        .option("header", "true")
        .load("examples/src/main/resources/people.csv")
    val df2: DataFrame = spark.read.json("data.json")
    val df3: DataFrame = spark.read.format("json").load("data.json")
    df0.write.format("csv").save("data.csv")
    
  • data

    // sc.parallelize / makeRDD 創(chuàng)建生成 rdd 數(shù)據(jù)
    // DataFrame == Dataset[Row]
    import org.apache.spark.ml.linalg.{Vectors, Vector}
    import org.apache.spark.sql.{Row, Dataset, DataFrame}
    val data: Dataset[Row] = df.select("a").distinct().limit(3)
    
    val data: RDD[Int] = sc.makeRDD(Seq(1, 2, 3))
    val data: RDD[Int] = sc.parallelize(Seq(1, 2, 3))
    val data: DataFrame = sc.parallelize(Seq(Tuple1.apply(28), Tuple1.apply(26))).toDF("a")
    
    val denseVector: Vector = Vectors.dense(1.0, 0.0, 3.0)
    val df: DataFrame = spark.createDataFrame(Seq(
            (0, "a", 18.0, Vectors.dense(1.0, 0.1, -8.0)),
            (1, "b", 5.0, Vectors.dense(2.0, 1.0, -4.0)),
            (2, "c", 9.0, Vectors.dense(4.0, 10.0, 8.0))
        )).toDF("id", "cate", "hour", "features")
    
  • na

    對缺省值進(jìn)行處理

    df.na.fill("0", Seq("a","b"))
    df.na.fill(Map(("a"->"0"),("b","1"),("c","2"),("d","3")))
    df.na.drop(4)  // 過濾掉有效值小于4個(gè)的行
    df.na.drop(Seq("1", "3"))
    df.na.drop(2,Seq("1", "3", "4"))
    
  • withColumn

    特征處理,添加新的列

      import org.apache.spark.sql.functions
      df.withColumn("a", lit(12))        // 增加常量 lit 
      df.withColumn("b", $"a"*2)    // 根據(jù)已有列進(jìn)行變換
      df.withColumn("b", col("a")*2)    // 根據(jù)已有列進(jìn)行變換
    
      // 數(shù)據(jù)分割及合并
      val df1 = df.withColumn("ab", split(col("t"), ",")).select(
                              col("ab").getItem(0).as("a"),
                              col("ab").getItem(1).as("b")
                  ).drop("ab")
      val df2 = df.withColumn("ab", split(col("t"), ","))
                            .withColumn("a", col("ab").getItem(0))
                              .withColumn("b", col("ab").getItem(1))
                              .drop("ab")
      val df3 = df.withColumn("ab", split(col("t"),",")).withColumn("k", explode(col("ab")))   // 一行轉(zhuǎn)多列
      val df4 = df.groupBy("n").withColumn("t", collect_list(when(col("a") === 1, col("a")).otherwise(lit(null)))
      val df5 = df.withColumn("t", concat(col("a"), ",", col("b")))
    
  • 統(tǒng)計(jì)

    添加相關(guān)統(tǒng)計(jì)特征

    import org.apache.spark.sql.functions.{format_number, min, max, avg, stddev, sum}
    val n = df.count()
    val df1 = df.groupBy(col("t")).count().orderBy(col("t")).withColumn("p", col("count")/n)  // 百分比
    
    val tmp = df.groupBy(col("t"))
                .agg(count(lit(1)).as("t-c"),
                    format_number(avg(col("t")), 2).as("t-avg"),
                    format_number(stddev(col("t")), 2).as("t-std"), 
                    min(col("t")).as("t-min"), 
                    max(col("t")).as("t-max"), 
                    max(col("t")).as("t-sum"))
                .na.fill(0)
    val df2 = df.join(tmp, Seq("t"), "left")
    
  • 窗口

    窗口函數(shù)相關(guān)

    import org.apache.spark.sql.expressions.Window
    val df1 = df.withColumn("t", row_number().over(Window.partitionBy(col("a")).orderBy(col("b").desc)))
    val df2 = df.withColumn("t", count(lit(1)).over(Window.partitionBy(col("a")).orderBy(col("b").desc)))
    val df3 = df.withColumn("t", collect_list(col("c")).over(Window.partitionBy("a").orderBy(col("b")).rowsBetween(-100, -1)))
    
  • Row

    對rdd數(shù)據(jù)進(jìn)行處理,或通過row來獲得dataFrame表的值

    import org.apache.spark.sql.Row
    import org.apache.spark.ml.linalg.{DenseVector, Vectors}
    df.rdd.map({case Row(A:String, B:Int) => (A, B)})
    df.map(row => {
            (row.getAs[Int]("a"), 
             row.getAs[Double]("b"), 
             Vectors.dense(row.getAs[Seq[Double]]("c").toArray
            )
        })
    
  • udf

    import org.apache.spark.sql.expressions.{UserDefinedFunction, Window}
    val plus: UserDefinedFunction = udf((a: Int, b: Int) => a + b)
    val subStr = udf((c:String, i:Int) => c.split(" ")(i))
    val df1 = df.withColumn("a+b", plus(col("a"), col("b")))
                .withColumn("c-0", subStr(col("c"), lit(0)))
    
    val sortUdf: UserDefinedFunction = udf((rows: Seq[Row]) => {
      rows.map { case Row(a: Int, b: Int) => (a, b) }
        .sortBy { case (_, b) => b }
        .map { case (a, _) => a }
    })
    
  • 特征處理

    import org.apache.spark.ml.linalg.{Vector, Vectors}
    import org.apache.spark.ml.feature._
    
    val df: DataFrame = spark.createDataFrame(Seq(
            (0, "a", 18.0, Vectors.dense(1.0, 0.1, -8.0)),
            (1, "b", 5.0, Vectors.dense(2.0, 1.0, -4.0)),
            (2, "c", 9.0, Vectors.dense(4.0, 10.0, 8.0))
    )).toDF("id", "cate", "hour", "features")
    
    // 二值化
    val binarizer: Binarizer = new Binarizer().setInputCol("features").setOutputCol("binFeature").setThreshold(0.5)
    val df1: DataFrame = binarizer.transform(df)
    
    // 分桶, [a, b), 對單獨(dú)值或vector進(jìn)行處理
    val bucket: Array[Double] = Array(Double.NegativeInfinity, 6.0, 12.0, Double.PositiveInfinity)
    val bucketizer: Bucketizer = new Bucketizer().setInputCol("hour").setOutputCol("bucketHour").setSplits(bucket)
    val df2: DataFrame = bucketizer.transform(df1)
    
    // 絕對值最大最小化, 歸一化到[-1, 1], 對特征處理   x/max(abs(x))
    val maxAbs: MaxAbsScaler = new MaxAbsScaler().setInputCol("features").setOutputCol("maxAbs")
    val df3: DataFrame = maxAbs.fit(df2).transform(df2)
    
    // 最大最小化  ml.lina's.Vector, 對特征處理   (x-min)/(max-min)
    val minMax: MinMaxScaler = new MinMaxScaler().setInputCol("features").setOutputCol("minMax")
    val df4: DataFrame = minMax.fit(df3).transform(df3)
    
    // 標(biāo)準(zhǔn)化  ml.linalg.Vector 對特征處理   (x-mean)/std     .setWithStd(true) .setWithMean(false)
    val ssc: StandardScaler = new StandardScaler().setInputCol("features").setOutputCol("ssc")
    val df5: DataFrame = ssc.fit(df4).transform(df4)
    
    // 正則化   對樣本處理  x / pn
    val norm: Normalizer = new Normalizer().setInputCol("features").setOutputCol("normFeatures").setP(1.0)
    val df6: DataFrame = norm.transform(df5)
    
    // IndexToString
    val s2i: StringIndexer = new StringIndexer().setInputCol("cate").setOutputCol("cid")
    val df7: DataFrame = s2i.fit(df6).transform(df6)
    val i2s: IndexToString = new IndexToString().setInputCol("cid").setOutputCol("cid2O")
    val df8: DataFrame = i2s.transform(df7)
    
    // OneHotEncoder
    val encoder: OneHotEncoder = new OneHotEncoder().setInputCol("cid").setOutputCol("cidVec")
    val df9: DataFrame = encoder.transform(df8)
    
    // degree=2, (x, y) => (x, xx, y, xy, yy)
    val poly: PolynomialExpansion = new PolynomialExpansion().setInputCol("features").setOutputCol("poly").setDegree(2)
    val df10: DataFrame = poly.transform(df9)
    
    // 分箱
    val qdisc: QuantileDiscretizer = new QuantileDiscretizer().setInputCol("hour").setOutputCol("hqtd").setNumBuckets(3)
    val df11: DataFrame = qdisc.fit(df10).transform(df10)
    df11.show(false)
    
    // sql
    import org.apache.spark.ml.feature.SQLTransformer
    val dfq: DataFrame = spark.createDataFrame(Seq((0, 1.0, 3.0), (2, 2.0, 5.0))).toDF("id", "v1", "v2")
    val sqlTrans: SQLTransformer = new SQLTransformer().setStatement("SELECT *, (v1 + v2) AS v3, (v1 * v2) AS v4 FROM __THIS__")
    val dfq1: DataFrame = sqlTrans.transform(dfq)
    
    // VectorAssembler   合并特征
    val assembler: VectorAssembler = new VectorAssembler()
            .setInputCols(Array("v1", "v2", "v3", "v4"))
            .setOutputCol("features")
    val dfq2: DataFrame = assembler.transform(dfq1)
    dfq2.show(false)
    
  • Machine Learning

?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
  • 序言:七十年代末蟆盹,一起剝皮案震驚了整個(gè)濱河市吹害,隨后出現(xiàn)的幾起案子,更是在濱河造成了極大的恐慌妆够,老刑警劉巖,帶你破解...
    沈念sama閱讀 212,383評論 6 493
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件,死亡現(xiàn)場離奇詭異书斜,居然都是意外死亡,警方通過查閱死者的電腦和手機(jī)酵使,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 90,522評論 3 385
  • 文/潘曉璐 我一進(jìn)店門荐吉,熙熙樓的掌柜王于貴愁眉苦臉地迎上來,“玉大人口渔,你說我怎么就攤上這事样屠。” “怎么了缺脉?”我有些...
    開封第一講書人閱讀 157,852評論 0 348
  • 文/不壞的土叔 我叫張陵痪欲,是天一觀的道長。 經(jīng)常有香客問我攻礼,道長业踢,這世上最難降的妖魔是什么? 我笑而不...
    開封第一講書人閱讀 56,621評論 1 284
  • 正文 為了忘掉前任礁扮,我火速辦了婚禮知举,結(jié)果婚禮上,老公的妹妹穿的比我還像新娘太伊。我一直安慰自己雇锡,他們只是感情好,可當(dāng)我...
    茶點(diǎn)故事閱讀 65,741評論 6 386
  • 文/花漫 我一把揭開白布僚焦。 她就那樣靜靜地躺著锰提,像睡著了一般。 火紅的嫁衣襯著肌膚如雪芳悲。 梳的紋絲不亂的頭發(fā)上立肘,一...
    開封第一講書人閱讀 49,929評論 1 290
  • 那天,我揣著相機(jī)與錄音名扛,去河邊找鬼赛不。 笑死,一個(gè)胖子當(dāng)著我的面吹牛罢洲,可吹牛的內(nèi)容都是我干的踢故。 我是一名探鬼主播文黎,決...
    沈念sama閱讀 39,076評論 3 410
  • 文/蒼蘭香墨 我猛地睜開眼,長吁一口氣:“原來是場噩夢啊……” “哼殿较!你這毒婦竟也來了耸峭?” 一聲冷哼從身側(cè)響起,我...
    開封第一講書人閱讀 37,803評論 0 268
  • 序言:老撾萬榮一對情侶失蹤淋纲,失蹤者是張志新(化名)和其女友劉穎劳闹,沒想到半個(gè)月后,有當(dāng)?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體洽瞬,經(jīng)...
    沈念sama閱讀 44,265評論 1 303
  • 正文 獨(dú)居荒郊野嶺守林人離奇死亡本涕,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點(diǎn)故事閱讀 36,582評論 2 327
  • 正文 我和宋清朗相戀三年,在試婚紗的時(shí)候發(fā)現(xiàn)自己被綠了伙窃。 大學(xué)時(shí)的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片菩颖。...
    茶點(diǎn)故事閱讀 38,716評論 1 341
  • 序言:一個(gè)原本活蹦亂跳的男人離奇死亡,死狀恐怖为障,靈堂內(nèi)的尸體忽然破棺而出晦闰,到底是詐尸還是另有隱情,我是刑警寧澤鳍怨,帶...
    沈念sama閱讀 34,395評論 4 333
  • 正文 年R本政府宣布呻右,位于F島的核電站,受9級特大地震影響鞋喇,放射性物質(zhì)發(fā)生泄漏声滥。R本人自食惡果不足惜,卻給世界環(huán)境...
    茶點(diǎn)故事閱讀 40,039評論 3 316
  • 文/蒙蒙 一侦香、第九天 我趴在偏房一處隱蔽的房頂上張望醒串。 院中可真熱鬧,春花似錦鄙皇、人聲如沸。這莊子的主人今日做“春日...
    開封第一講書人閱讀 30,798評論 0 21
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽。三九已至膘壶,卻和暖如春错蝴,著一層夾襖步出監(jiān)牢的瞬間,已是汗流浹背颓芭。 一陣腳步聲響...
    開封第一講書人閱讀 32,027評論 1 266
  • 我被黑心中介騙來泰國打工顷锰, 沒想到剛下飛機(jī)就差點(diǎn)兒被人妖公主榨干…… 1. 我叫王不留,地道東北人亡问。 一個(gè)月前我還...
    沈念sama閱讀 46,488評論 2 361
  • 正文 我出身青樓官紫,卻偏偏與公主長得像肛宋,于是被迫代替她去往敵國和親。 傳聞我的和親對象是個(gè)殘疾皇子束世,可洞房花燭夜當(dāng)晚...
    茶點(diǎn)故事閱讀 43,612評論 2 350

推薦閱讀更多精彩內(nèi)容