10.spark sql之快速入門

前世今生

Hive&Shark

??隨著大數(shù)據(jù)時代的來臨,Hadoop風(fēng)靡一時陨瘩。為了使熟悉RDBMS但又不理解MapReduce的技術(shù)人員快速進行大數(shù)據(jù)開發(fā),Hive應(yīng)運而生恨闪。Hive是當(dāng)時唯一運行在Hadoop上的SQL-on-Hadoop工具究飞。

??但是MapReduce計算過程中大量的中間磁盤落地過程消耗了大量的I/O置谦,降低的運行效率。為了提高SQL-on-Hadoop的效率亿傅,大量的SQL-on-Hadoop工具開始產(chǎn)生媒峡,其中表現(xiàn)較為突出的是:

  • MapR的Drill
  • Cloudera的Impala
  • Shark

??Shark是伯克利實驗室Spark生態(tài)的組件之一,它修改了Hive Driver的內(nèi)存管理葵擎、物理計劃谅阿、執(zhí)行三個模塊,使之能運行在Spark引擎上酬滤,從而使得SQL查詢的速度得到10-100倍的提升签餐。

Hive&Shark.jpg

Shark&Spark SQL

??Shark對于Hive的太多依賴(如采用Hive的語法解析器、查詢優(yōu)化器等等)盯串,制約了Spark的One Stack Rule Them All的既定方針氯檐,制約了Spark各個組件的相互集成,所以提出了SparkSQL項目体捏。

??SparkSQL拋棄原有Shark的代碼冠摄,汲取了Shark的一些優(yōu)點,如內(nèi)存列存儲(In-Memory Columnar Storage)几缭、Hive兼容性等河泳,重新開發(fā)了SparkSQL代碼。由于擺脫了對Hive的依賴性年栓,SparkSQL無論在數(shù)據(jù)兼容拆挥、性能優(yōu)化、組件擴展方面都得到了極大地提升某抓。

  • 數(shù)據(jù)兼容方面

??不但兼容Hive纸兔,還可以從RDD黄锤、parquet文件、JSON文件中獲取數(shù)據(jù)食拜,也支持獲取RDBMS數(shù)據(jù)以及cassandra等NOSQL數(shù)據(jù)。

  • 性能優(yōu)化方面

??除了采取In-Memory Columnar Storage副编、byte-code generation等優(yōu)化技術(shù)外,引進Cost Model對查詢進行動態(tài)評估负甸、獲取最佳物理計劃等。

  • 組件擴展方面

??無論是SQL的語法解析器痹届、分析器還是優(yōu)化器都可以重新定義呻待,進行擴展。

??2014年Shark停止開發(fā)队腐,團隊將所有資源放SparkSQL項目上蚕捉,至此,Shark的發(fā)展畫上了句號柴淘,但也因此發(fā)展出兩條線:SparkSQL和Hive on Spark迫淹。

Shark&SparkSQL.jpg

??其中SparkSQL作為Spark生態(tài)的一員繼續(xù)發(fā)展,而不再受限于Hive为严,只是兼容Hive敛熬;而Hive on Spark是一個Hive的發(fā)展計劃,該計劃將Spark作為Hive的底層引擎之一第股,也就是說应民,Hive將不再受限于一個引擎,可以采用Map-Reduce夕吻、Tez诲锹、Spark等引擎。

簡介

??Spark SQL是一個用于結(jié)構(gòu)化數(shù)據(jù)處理的模塊涉馅。Spark SQL賦予待處理數(shù)據(jù)一些結(jié)構(gòu)化信息归园,可以使用SQL語句或DataSet API接口與Spark SQL進行交互。

  • SQL

??Spark SQL可以使用sql讀寫Hive中的數(shù)據(jù)稚矿;也可以在編程語言中使用sql蔓倍,返回Dataset/DataFrame結(jié)果集。

  • DataSets&DataFrames

??Dataset是一個分布式數(shù)據(jù)集盐捷,它結(jié)合了RDD與SparkSQL執(zhí)行引擎的優(yōu)點偶翅。Dataset可以通過JVM對象構(gòu)造,然后使用算子操作進行處理碉渡。Java和Scala都有Dataset API聚谁;Python和R本身支持Dataset特性。

??DataFrame是一個二維結(jié)構(gòu)的DataSet滞诺,相當(dāng)于RDBMS中的表形导。DataFrame可以有多種方式構(gòu)造环疼,比如結(jié)構(gòu)化數(shù)據(jù)文件、hive表朵耕、外部數(shù)據(jù)庫炫隶、RDD等。在Scala阎曹、Java伪阶、Python及R中都有DataFrame API。

DataFrame與DataSet

DataFrame創(chuàng)建及操作

  • scala
import org.apache.spark.sql.SparkSession

// 構(gòu)造SparkSession
val spark = SparkSession
  .builder()
  .appName("Spark SQL basic example")
  .config("spark.some.config.option", "some-value")
  .getOrCreate()

// 創(chuàng)建DataFrame
val df = spark.read.json("examples/src/main/resources/people.json")

// Displays the content of the DataFrame to stdout
df.show()
// +----+-------+
// | age|   name|
// +----+-------+
// |null|Michael|
// |  30|   Andy|
// |  19| Justin|
// +----+-------+

// DataFrame操作
// This import is needed to use the $-notation
import spark.implicits._
// Print the schema in a tree format
df.printSchema()
// root
// |-- age: long (nullable = true)
// |-- name: string (nullable = true)

// Select only the "name" column
df.select("name").show()
// +-------+
// |   name|
// +-------+
// |Michael|
// |   Andy|
// | Justin|
// +-------+

// Select everybody, but increment the age by 1
df.select($"name", $"age" + 1).show()
// +-------+---------+
// |   name|(age + 1)|
// +-------+---------+
// |Michael|     null|
// |   Andy|       31|
// | Justin|       20|
// +-------+---------+

// Select people older than 21
df.filter($"age" > 21).show()
// +---+----+
// |age|name|
// +---+----+
// | 30|Andy|
// +---+----+

// Count people by age
df.groupBy("age").count().show()
// +----+-----+
// | age|count|
// +----+-----+
// |  19|    1|
// |null|    1|
// |  30|    1|
// +----+-----+
  • java
import org.apache.spark.sql.SparkSession;

//構(gòu)造SparkSession
SparkSession spark = SparkSession
  .builder()
  .appName("Java Spark SQL basic example")
  .config("spark.some.config.option", "some-value")
  .getOrCreate();
  
//創(chuàng)建DataFrame
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;

Dataset<Row> df = spark.read().json("examples/src/main/resources/people.json");

// Displays the content of the DataFrame to stdout
df.show();
// +----+-------+
// | age|   name|
// +----+-------+
// |null|Michael|
// |  30|   Andy|
// |  19| Justin|
// +----+-------+

//DataFrame操作
// col("...") is preferable to df.col("...")
import static org.apache.spark.sql.functions.col;

// Print the schema in a tree format
df.printSchema();
// root
// |-- age: long (nullable = true)
// |-- name: string (nullable = true)

// Select only the "name" column
df.select("name").show();
// +-------+
// |   name|
// +-------+
// |Michael|
// |   Andy|
// | Justin|
// +-------+

// Select everybody, but increment the age by 1
df.select(col("name"), col("age").plus(1)).show();
// +-------+---------+
// |   name|(age + 1)|
// +-------+---------+
// |Michael|     null|
// |   Andy|       31|
// | Justin|       20|
// +-------+---------+

// Select people older than 21
df.filter(col("age").gt(21)).show();
// +---+----+
// |age|name|
// +---+----+
// | 30|Andy|
// +---+----+

// Count people by age
df.groupBy("age").count().show();
// +----+-----+
// | age|count|
// +----+-----+
// |  19|    1|
// |null|    1|
// |  30|    1|
// +----+-----+
  • python
from pyspark.sql import SparkSession

# 構(gòu)造SparkSession
spark = SparkSession \
    .builder \
    .appName("Python Spark SQL basic example") \
    .config("spark.some.config.option", "some-value") \
    .getOrCreate()
    
# 創(chuàng)建DataFrame
# spark is an existing SparkSession
df = spark.read.json("examples/src/main/resources/people.json")
# Displays the content of the DataFrame to stdout
df.show()
# +----+-------+
# | age|   name|
# +----+-------+
# |null|Michael|
# |  30|   Andy|
# |  19| Justin|
# +----+-------+

# DataFrame操作
# spark, df are from the previous example
# Print the schema in a tree format
df.printSchema()
# root
# |-- age: long (nullable = true)
# |-- name: string (nullable = true)

# Select only the "name" column
df.select("name").show()
# +-------+
# |   name|
# +-------+
# |Michael|
# |   Andy|
# | Justin|
# +-------+

# Select everybody, but increment the age by 1
df.select(df['name'], df['age'] + 1).show()
# +-------+---------+
# |   name|(age + 1)|
# +-------+---------+
# |Michael|     null|
# |   Andy|       31|
# | Justin|       20|
# +-------+---------+

# Select people older than 21
df.filter(df['age'] > 21).show()
# +---+----+
# |age|name|
# +---+----+
# | 30|Andy|
# +---+----+

# Count people by age
df.groupBy("age").count().show()
# +----+-----+
# | age|count|
# +----+-----+
# |  19|    1|
# |null|    1|
# |  30|    1|
# +----+-----+

DataSet創(chuàng)建及操作

??Datasets和RDD類似处嫌,但使用專門的Encoder編碼器來序列化需要經(jīng)過網(wǎng)絡(luò)傳輸?shù)臄?shù)據(jù)對象栅贴,而不用RDD使用的Java序列化或Kryo庫。Encoder編碼器是動態(tài)生成的代碼熏迹,允許直接執(zhí)行各種算子操作檐薯,而不用反序列化。

  • scala
// Note: Case classes in Scala 2.10 can support only up to 22 fields. To work around this limit,
// you can use custom classes that implement the Product interface
case class Person(name: String, age: Long)

// Encoders are created for case classes
val caseClassDS = Seq(Person("Andy", 32)).toDS()
caseClassDS.show()
// +----+---+
// |name|age|
// +----+---+
// |Andy| 32|
// +----+---+

// Encoders for most common types are automatically provided by importing spark.implicits._
val primitiveDS = Seq(1, 2, 3).toDS()
primitiveDS.map(_ + 1).collect() // Returns: Array(2, 3, 4)

// DataFrames can be converted to a Dataset by providing a class. Mapping will be done by name
val path = "examples/src/main/resources/people.json"
val peopleDS = spark.read.json(path).as[Person]
peopleDS.show()
// +----+-------+
// | age|   name|
// +----+-------+
// |null|Michael|
// |  30|   Andy|
// |  19| Justin|
// +----+-------+
  • java
import java.util.Arrays;
import java.util.Collections;
import java.io.Serializable;

import org.apache.spark.api.java.function.MapFunction;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.Encoder;
import org.apache.spark.sql.Encoders;

public static class Person implements Serializable {
  private String name;
  private int age;

  public String getName() {
    return name;
  }

  public void setName(String name) {
    this.name = name;
  }

  public int getAge() {
    return age;
  }

  public void setAge(int age) {
    this.age = age;
  }
}

// Create an instance of a Bean class
Person person = new Person();
person.setName("Andy");
person.setAge(32);

// Encoders are created for Java beans
Encoder<Person> personEncoder = Encoders.bean(Person.class);
Dataset<Person> javaBeanDS = spark.createDataset(
  Collections.singletonList(person),
  personEncoder
);
javaBeanDS.show();
// +---+----+
// |age|name|
// +---+----+
// | 32|Andy|
// +---+----+

// Encoders for most common types are provided in class Encoders
Encoder<Integer> integerEncoder = Encoders.INT();
Dataset<Integer> primitiveDS = spark.createDataset(Arrays.asList(1, 2, 3), integerEncoder);
Dataset<Integer> transformedDS = primitiveDS.map(
    (MapFunction<Integer, Integer>) value -> value + 1,
    integerEncoder);
transformedDS.collect(); // Returns [2, 3, 4]

// DataFrames can be converted to a Dataset by providing a class. Mapping based on name
String path = "examples/src/main/resources/people.json";
Dataset<Person> peopleDS = spark.read().json(path).as(personEncoder);
peopleDS.show();
// +----+-------+
// | age|   name|
// +----+-------+
// |null|Michael|
// |  30|   Andy|
// |  19| Justin|
// +----+-------+

SQL操作

  • scala
// Register the DataFrame as a SQL temporary view
df.createOrReplaceTempView("people")
//df.createGlobalTempView("people")

val sqlDF = spark.sql("SELECT * FROM people")
sqlDF.show()
// +----+-------+
// | age|   name|
// +----+-------+
// |null|Michael|
// |  30|   Andy|
// |  19| Justin|
// +----+-------+
  • java
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;

// Register the DataFrame as a SQL temporary view
df.createOrReplaceTempView("people");
//df.createGlobalTempView("people")

Dataset<Row> sqlDF = spark.sql("SELECT * FROM people");
sqlDF.show();
// +----+-------+
// | age|   name|
// +----+-------+
// |null|Michael|
// |  30|   Andy|
// |  19| Justin|
// +----+-------+
  • python
# Register the DataFrame as a SQL temporary view
df.createOrReplaceTempView("people")
# df.createGlobalTempView("people")

sqlDF = spark.sql("SELECT * FROM people")
sqlDF.show()
# +----+-------+
# | age|   name|
# +----+-------+
# |null|Michael|
# |  30|   Andy|
# |  19| Justin|
# +----+-------+
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
  • 序言:七十年代末注暗,一起剝皮案震驚了整個濱河市坛缕,隨后出現(xiàn)的幾起案子,更是在濱河造成了極大的恐慌捆昏,老刑警劉巖祷膳,帶你破解...
    沈念sama閱讀 211,376評論 6 491
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件,死亡現(xiàn)場離奇詭異屡立,居然都是意外死亡直晨,警方通過查閱死者的電腦和手機,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 90,126評論 2 385
  • 文/潘曉璐 我一進店門膨俐,熙熙樓的掌柜王于貴愁眉苦臉地迎上來勇皇,“玉大人,你說我怎么就攤上這事焚刺×舱” “怎么了?”我有些...
    開封第一講書人閱讀 156,966評論 0 347
  • 文/不壞的土叔 我叫張陵乳愉,是天一觀的道長兄淫。 經(jīng)常有香客問我,道長蔓姚,這世上最難降的妖魔是什么捕虽? 我笑而不...
    開封第一講書人閱讀 56,432評論 1 283
  • 正文 為了忘掉前任,我火速辦了婚禮坡脐,結(jié)果婚禮上泄私,老公的妹妹穿的比我還像新娘。我一直安慰自己,他們只是感情好晌端,可當(dāng)我...
    茶點故事閱讀 65,519評論 6 385
  • 文/花漫 我一把揭開白布捅暴。 她就那樣靜靜地躺著,像睡著了一般咧纠。 火紅的嫁衣襯著肌膚如雪蓬痒。 梳的紋絲不亂的頭發(fā)上,一...
    開封第一講書人閱讀 49,792評論 1 290
  • 那天漆羔,我揣著相機與錄音梧奢,去河邊找鬼。 笑死钧椰,一個胖子當(dāng)著我的面吹牛,可吹牛的內(nèi)容都是我干的符欠。 我是一名探鬼主播嫡霞,決...
    沈念sama閱讀 38,933評論 3 406
  • 文/蒼蘭香墨 我猛地睜開眼,長吁一口氣:“原來是場噩夢啊……” “哼希柿!你這毒婦竟也來了诊沪?” 一聲冷哼從身側(cè)響起,我...
    開封第一講書人閱讀 37,701評論 0 266
  • 序言:老撾萬榮一對情侶失蹤曾撤,失蹤者是張志新(化名)和其女友劉穎端姚,沒想到半個月后,有當(dāng)?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體挤悉,經(jīng)...
    沈念sama閱讀 44,143評論 1 303
  • 正文 獨居荒郊野嶺守林人離奇死亡渐裸,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點故事閱讀 36,488評論 2 327
  • 正文 我和宋清朗相戀三年,在試婚紗的時候發(fā)現(xiàn)自己被綠了装悲。 大學(xué)時的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片昏鹃。...
    茶點故事閱讀 38,626評論 1 340
  • 序言:一個原本活蹦亂跳的男人離奇死亡,死狀恐怖诀诊,靈堂內(nèi)的尸體忽然破棺而出洞渤,到底是詐尸還是另有隱情,我是刑警寧澤属瓣,帶...
    沈念sama閱讀 34,292評論 4 329
  • 正文 年R本政府宣布载迄,位于F島的核電站,受9級特大地震影響抡蛙,放射性物質(zhì)發(fā)生泄漏护昧。R本人自食惡果不足惜婶熬,卻給世界環(huán)境...
    茶點故事閱讀 39,896評論 3 313
  • 文/蒙蒙 一塌碌、第九天 我趴在偏房一處隱蔽的房頂上張望。 院中可真熱鬧否纬,春花似錦、人聲如沸怠晴。這莊子的主人今日做“春日...
    開封第一講書人閱讀 30,742評論 0 21
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽蒜田。三九已至稿械,卻和暖如春,著一層夾襖步出監(jiān)牢的瞬間冲粤,已是汗流浹背美莫。 一陣腳步聲響...
    開封第一講書人閱讀 31,977評論 1 265
  • 我被黑心中介騙來泰國打工, 沒想到剛下飛機就差點兒被人妖公主榨干…… 1. 我叫王不留梯捕,地道東北人厢呵。 一個月前我還...
    沈念sama閱讀 46,324評論 2 360
  • 正文 我出身青樓,卻偏偏與公主長得像傀顾,于是被迫代替她去往敵國和親襟铭。 傳聞我的和親對象是個殘疾皇子,可洞房花燭夜當(dāng)晚...
    茶點故事閱讀 43,494評論 2 348

推薦閱讀更多精彩內(nèi)容