Mainly covers DataFrame-related operations
and RDD-related operations
-
File
// Read files
import org.apache.spark.sql.{SparkSession, Dataset, DataFrame}

val spark = SparkSession.builder().appName("Example").master("local[*]").getOrCreate()
val sc = spark.sparkContext

val data: Dataset[String] = spark.read.textFile("data")
val df: DataFrame = data.toDF()

val df0: DataFrame = spark.read.csv("data.csv")
val df1: DataFrame = spark.read.format("csv")
  .option("sep", ";")
  .option("inferSchema", "true")
  .option("header", "true")
  .load("examples/src/main/resources/people.csv")

val df2: DataFrame = spark.read.json("data.json")
val df3: DataFrame = spark.read.format("json").load("data.json")

// Write out as CSV (the target path must not already exist unless a save mode is set)
df0.write.format("csv").save("data.csv")
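Parquet round trips follow the same reader/writer pattern; a minimal sketch, assuming illustrative paths data.parquet / out.parquet (not taken from the snippet above):

// Parquet is the default format for load/save, so .format("parquet") can be omitted
val dfp: DataFrame = spark.read.parquet("data.parquet")
dfp.write.mode("overwrite").parquet("out.parquet")   // overwrite avoids "path already exists" errors
val dfp2: DataFrame = spark.read.load("out.parquet") // load() defaults to Parquet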
-
data
// Create RDDs with sc.parallelize / sc.makeRDD
// DataFrame == Dataset[Row]
import org.apache.spark.rdd.RDD
import org.apache.spark.ml.linalg.{Vectors, Vector}
import org.apache.spark.sql.{Row, Dataset, DataFrame}
import spark.implicits._  // needed for .toDF on local Seqs and RDDs

val ds: Dataset[Row] = df.select("a").distinct().limit(3)
val rdd1: RDD[Int] = sc.makeRDD(Seq(1, 2, 3))
val rdd2: RDD[Int] = sc.parallelize(Seq(1, 2, 3))
val data: DataFrame = sc.parallelize(Seq(Tuple1.apply(28), Tuple1.apply(26))).toDF("a")
val denseVector: Vector = Vectors.dense(1.0, 0.0, 3.0)
val df: DataFrame = spark.createDataFrame(Seq(
  (0, "a", 18.0, Vectors.dense(1.0, 0.1, -8.0)),
  (1, "b", 5.0, Vectors.dense(2.0, 1.0, -4.0)),
  (2, "c", 9.0, Vectors.dense(4.0, 10.0, 8.0))
)).toDF("id", "cate", "hour", "features")
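When column types need to be fixed explicitly, a DataFrame can also be built from an RDD[Row] plus a StructType schema; a minimal sketch with illustrative column names (id, name):

import org.apache.spark.sql.Row
import org.apache.spark.sql.types.{StructType, StructField, IntegerType, StringType}

// Illustrative schema and rows
val schema = StructType(Seq(
  StructField("id", IntegerType, nullable = false),
  StructField("name", StringType, nullable = true)
))
val rowRdd = sc.parallelize(Seq(Row(1, "a"), Row(2, "b")))
val dfFromRows: DataFrame = spark.createDataFrame(rowRdd, schema)
dfFromRows.printSchema()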
-
na
Handle missing (null) values
df.na.fill("0", Seq("a", "b"))                                   // fill nulls in columns a and b with "0"
df.na.fill(Map("a" -> "0", "b" -> "1", "c" -> "2", "d" -> "3"))  // per-column fill values
df.na.drop(4)                  // drop rows with fewer than 4 non-null values
df.na.drop(Seq("1", "3"))      // drop rows with a null in column "1" or "3"
df.na.drop(2, Seq("1", "3", "4")) // drop rows with fewer than 2 non-null values among columns "1", "3", "4"
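Besides filling and dropping nulls, df.na can replace specific sentinel values; a small sketch, assuming column "a" is a string column and the mapping below is illustrative:

// Replace sentinel strings in column "a" (illustrative values)
val cleaned = df.na.replace("a", Map("N/A" -> "0", "unknown" -> "0"))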
-
withColumn
Feature processing: add new columns
import org.apache.spark.sql.functions._
import spark.implicits._  // for the $"col" syntax

df.withColumn("a", lit(12))      // add a constant column with lit
df.withColumn("b", $"a" * 2)     // derive a column from an existing one
df.withColumn("b", col("a") * 2) // same, using col()

// Splitting and merging columns
val df1 = df.withColumn("ab", split(col("t"), ",")).select(
  col("ab").getItem(0).as("a"),
  col("ab").getItem(1).as("b")
)
val df2 = df.withColumn("ab", split(col("t"), ","))
  .withColumn("a", col("ab").getItem(0))
  .withColumn("b", col("ab").getItem(1))
  .drop("ab")
val df3 = df.withColumn("ab", split(col("t"), ",")).withColumn("k", explode(col("ab"))) // explode: one row becomes multiple rows
val df4 = df.groupBy("n")
  .agg(collect_list(when(col("a") === 1, col("a"))).as("t")) // collect_list skips the nulls produced by when()
val df5 = df.withColumn("t", concat(col("a"), lit(","), col("b"))) // join columns with a separator
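Renaming, casting and conditional columns often go together with withColumn; a short sketch that reuses column "a" and introduces illustrative output names:

val dfRenamed = df.withColumnRenamed("a", "a2")                 // rename a column
val dfCast = df.withColumn("a_double", col("a").cast("double")) // change the column type
val dfFlag = df.withColumn("is_big", when(col("a") > 10, lit(1)).otherwise(lit(0))) // conditional flag column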
-
Statistics
Add aggregate statistical features
import org.apache.spark.sql.functions.{col, count, lit, format_number, min, max, avg, stddev, sum}

val n = df.count()
val df1 = df.groupBy(col("t")).count().orderBy(col("t")).withColumn("p", col("count") / n) // proportion of each value
val tmp = df.groupBy(col("t"))
  .agg(count(lit(1)).as("t-c"),
       format_number(avg(col("t")), 2).as("t-avg"),
       format_number(stddev(col("t")), 2).as("t-std"),
       min(col("t")).as("t-min"),
       max(col("t")).as("t-max"),
       sum(col("t")).as("t-sum"))
  .na.fill(0)
val df2 = df.join(tmp, Seq("t"), "left")
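For quick exploratory statistics there are also describe and approxQuantile; a small sketch on column "t", with illustrative quantile probabilities and relative error:

df.describe("t").show() // count, mean, stddev, min, max
// approximate 25th/50th/75th percentiles with 1% relative error
val quantiles: Array[Double] = df.stat.approxQuantile("t", Array(0.25, 0.5, 0.75), 0.01)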
-
Window
Window functions
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions.{row_number, count, lit, col, collect_list}

// Row number within each partition of "a", ordered by "b" descending
val df1 = df.withColumn("t", row_number().over(Window.partitionBy(col("a")).orderBy(col("b").desc)))
// Running count over the same window
val df2 = df.withColumn("t", count(lit(1)).over(Window.partitionBy(col("a")).orderBy(col("b").desc)))
// Collect the previous 100 values of "c" (rows -100 to -1 relative to the current row)
val df3 = df.withColumn("t", collect_list(col("c")).over(Window.partitionBy("a").orderBy(col("b")).rowsBetween(-100, -1)))
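Other frequently used window functions are lag, lead and rank; a short sketch over the same partitioning, with an illustrative offset of 1:

import org.apache.spark.sql.functions.{lag, lead, rank}

val w = Window.partitionBy(col("a")).orderBy(col("b"))
val df4 = df
  .withColumn("prev_b", lag(col("b"), 1).over(w))  // value of b on the previous row
  .withColumn("next_b", lead(col("b"), 1).over(w)) // value of b on the next row
  .withColumn("rnk", rank().over(w))               // rank within the partition, ties share a rank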
-
Row
Process RDD data, or read values out of a DataFrame through Row
import org.apache.spark.sql.Row
import org.apache.spark.ml.linalg.{DenseVector, Vectors}
import spark.implicits._  // encoders for the mapped tuples

// Pattern-match on Row (pattern variables must be lower-case)
df.rdd.map { case Row(a: String, b: Int) => (a, b) }

// Or read fields by name with getAs
df.map(row => (
  row.getAs[Int]("a"),
  row.getAs[Double]("b"),
  Vectors.dense(row.getAs[Seq[Double]]("c").toArray)
))
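Row fields can also be read positionally with a null check; a small sketch that assumes column 0 is an Int and column 1 a Double, as in the getAs example above:

df.rdd.map { row =>
  val a = if (row.isNullAt(0)) -1 else row.getInt(0) // positional access with a null guard
  val b = row.getDouble(1)
  (a, b)
}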
-
udf
import org.apache.spark.sql.Row
import org.apache.spark.sql.functions.{udf, col, lit}
import org.apache.spark.sql.expressions.UserDefinedFunction

val plus: UserDefinedFunction = udf((a: Int, b: Int) => a + b)
val subStr = udf((c: String, i: Int) => c.split(" ")(i))
val df1 = df.withColumn("a+b", plus(col("a"), col("b")))
  .withColumn("c-0", subStr(col("c"), lit(0)))

// Sort an array-of-struct column by the second field and keep the first
val sortUdf: UserDefinedFunction = udf((rows: Seq[Row]) => {
  rows.map { case Row(a: Int, b: Int) => (a, b) }
    .sortBy { case (_, b) => b }
    .map { case (a, _) => a }
})
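A UDF can also be registered by name and called from Spark SQL; a minimal sketch, where the function name plusOne and the view name t are illustrative:

spark.udf.register("plusOne", (x: Int) => x + 1) // register for use in SQL
df.createOrReplaceTempView("t")
spark.sql("SELECT plusOne(a) AS a1 FROM t").show()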
-
Feature processing (spark.ml.feature)
import org.apache.spark.ml.linalg.{Vector, Vectors}
import org.apache.spark.ml.feature._

val df: DataFrame = spark.createDataFrame(Seq(
  (0, "a", 18.0, Vectors.dense(1.0, 0.1, -8.0)),
  (1, "b", 5.0, Vectors.dense(2.0, 1.0, -4.0)),
  (2, "c", 9.0, Vectors.dense(4.0, 10.0, 8.0))
)).toDF("id", "cate", "hour", "features")

// Binarization
val binarizer: Binarizer = new Binarizer().setInputCol("features").setOutputCol("binFeature").setThreshold(0.5)
val df1: DataFrame = binarizer.transform(df)

// Bucketing into [a, b) intervals, works on a single Double or a Vector column
val bucket: Array[Double] = Array(Double.NegativeInfinity, 6.0, 12.0, Double.PositiveInfinity)
val bucketizer: Bucketizer = new Bucketizer().setInputCol("hour").setOutputCol("bucketHour").setSplits(bucket)
val df2: DataFrame = bucketizer.transform(df1)

// Max-abs scaling to [-1, 1], per feature: x / max(abs(x))
val maxAbs: MaxAbsScaler = new MaxAbsScaler().setInputCol("features").setOutputCol("maxAbs")
val df3: DataFrame = maxAbs.fit(df2).transform(df2)

// Min-max scaling on ml.linalg.Vector features: (x - min) / (max - min)
val minMax: MinMaxScaler = new MinMaxScaler().setInputCol("features").setOutputCol("minMax")
val df4: DataFrame = minMax.fit(df3).transform(df3)

// Standardization on ml.linalg.Vector features: (x - mean) / std, see .setWithStd(true) / .setWithMean(false)
val ssc: StandardScaler = new StandardScaler().setInputCol("features").setOutputCol("ssc")
val df5: DataFrame = ssc.fit(df4).transform(df4)

// Normalization, applied per sample: x / ||x||_p
val norm: Normalizer = new Normalizer().setInputCol("features").setOutputCol("normFeatures").setP(1.0)
val df6: DataFrame = norm.transform(df5)

// StringIndexer / IndexToString
val s2i: StringIndexer = new StringIndexer().setInputCol("cate").setOutputCol("cid")
val df7: DataFrame = s2i.fit(df6).transform(df6)
val i2s: IndexToString = new IndexToString().setInputCol("cid").setOutputCol("cid2O")
val df8: DataFrame = i2s.transform(df7)

// OneHotEncoder (note: in Spark 3.x it is an Estimator, so use .fit(df8).transform(df8))
val encoder: OneHotEncoder = new OneHotEncoder().setInputCol("cid").setOutputCol("cidVec")
val df9: DataFrame = encoder.transform(df8)

// Polynomial expansion: degree = 2, (x, y) => (x, x*x, y, x*y, y*y)
val poly: PolynomialExpansion = new PolynomialExpansion().setInputCol("features").setOutputCol("poly").setDegree(2)
val df10: DataFrame = poly.transform(df9)

// Quantile binning
val qdisc: QuantileDiscretizer = new QuantileDiscretizer().setInputCol("hour").setOutputCol("hqtd").setNumBuckets(3)
val df11: DataFrame = qdisc.fit(df10).transform(df10)
df11.show(false)

// SQLTransformer
import org.apache.spark.ml.feature.SQLTransformer
val dfq: DataFrame = spark.createDataFrame(Seq((0, 1.0, 3.0), (2, 2.0, 5.0))).toDF("id", "v1", "v2")
val sqlTrans: SQLTransformer = new SQLTransformer().setStatement("SELECT *, (v1 + v2) AS v3, (v1 * v2) AS v4 FROM __THIS__")
val dfq1: DataFrame = sqlTrans.transform(dfq)

// VectorAssembler: merge several columns into one feature vector
val assembler: VectorAssembler = new VectorAssembler()
  .setInputCols(Array("v1", "v2", "v3", "v4"))
  .setOutputCol("features")
val dfq2: DataFrame = assembler.transform(dfq1)
dfq2.show(false)
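In practice these transformers are usually chained in an ml Pipeline instead of being applied one by one; a minimal sketch that reuses a few of the stages defined above (the stage selection is illustrative):

import org.apache.spark.ml.{Pipeline, PipelineStage}

// Chain a few of the stages above; fit() fits the estimators, transform() applies everything
val pipeline = new Pipeline().setStages(Array[PipelineStage](binarizer, bucketizer, maxAbs, s2i))
val model = pipeline.fit(df)
model.transform(df).show(false)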
-
Machine Learning