概述
官方地址 http://spark.apache.org/sql/
Spark SQL是Spark用來處理結(jié)構(gòu)化數(shù)據(jù)
的一個模塊笛质,它提供了2個編程抽象:DataFrame和DataSet,并且作為分布式SQL查詢引擎的作用何缓。
Spark SQL可以很好地支持SQL查詢福荸,一方面蕴坪,可以編寫Spark應(yīng)用程序使用SQL語句進行數(shù)據(jù)查詢,另一方面逞姿,也可以使用標準的數(shù)據(jù)庫連接器(比如JDBC或ODBC)連接Spark進行SQL查詢辞嗡。
DataFrame
SparkSQL使用的數(shù)據(jù)抽象是DataFrame ,DataFrame讓Spark具備了處理大數(shù)據(jù)結(jié)構(gòu)化數(shù)據(jù)的能力,它不僅比原來的RDD轉(zhuǎn)換方式更加簡單易用滞造,而且獲得了更高的計算能力续室。Spark 能夠輕松實現(xiàn)從Mysql到DataFrame的轉(zhuǎn)化,并且支持SQL查詢谒养。
DataFrame是一種以RDD為基礎(chǔ)的分布式數(shù)據(jù)集挺狰,提供了詳細的數(shù)據(jù)信息,就相當于關(guān)系數(shù)據(jù)庫的一張表买窟,每個RDD元素都是一個Java對象丰泊,即Person對象,但是無法知道Person對象的內(nèi)部結(jié)構(gòu)信息始绍,而采用DataFrame時瞳购,Person對象內(nèi)部機構(gòu)信息一目了然。它包含Name,Age,并且知道每個字段的數(shù)據(jù)類型亏推。
DataFrame創(chuàng)建
從Spark2.0以上版本開始学赛,Spark使用全新的SparkSession接口替代Spark1.6中的SQLContext及HiveContext接口來實現(xiàn)其對數(shù)據(jù)加載年堆、轉(zhuǎn)換、處理等功能盏浇。SparkSession實現(xiàn)了SQLContext及HiveContext所有功能变丧。
SparkSession支持從不同的數(shù)據(jù)源加載數(shù)據(jù),并把數(shù)據(jù)轉(zhuǎn)換成DataFrame绢掰,并且支持把DataFrame轉(zhuǎn)換成SQLContext自身中的表痒蓬,然后使用SQL語句來操作數(shù)據(jù)。SparkSession亦提供了HiveQL以及其他依賴于Hive的功能的支持滴劲。
HelloWorld
在當前工程目錄底下創(chuàng)建input目錄 創(chuàng)建測試數(shù)據(jù)文件 people.json
{"name":"Michael"}
{"name":"Andy", "age":30}
{"name":"Justin", "age":19}
maven 配置
https://mvnrepository.com/search?q=org.apache.spark
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>cn.bx.spark</groupId>
<artifactId>SparkNote</artifactId>
<version>1.0-SNAPSHOT</version>
<properties>
<scala.version>2.11.12</scala.version>
<spark.version>2.2.3</spark.version>
</properties>
<dependencies>
<dependency>
<groupId>org.scala-lang</groupId>
<artifactId>scala-library</artifactId>
<version>${scala.version}</version>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-core_2.11</artifactId>
<version>${spark.version}</version>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-sql_2.11</artifactId>
<version>${spark.version}</version>
</dependency>
</dependencies>
<build>
<sourceDirectory>src/main/scala</sourceDirectory>
<plugins>
<plugin>
<groupId>org.scala-tools</groupId>
<artifactId>maven-scala-plugin</artifactId>
<version>2.15.0</version>
<executions>
<execution>
<goals>
<goal>compile</goal>
</goals>
<configuration>
<args>
<arg>-dependencyfile</arg>
<arg>${project.build.directory}/.scala_dependencies</arg>
</args>
</configuration>
</execution>
</executions>
</plugin>
</plugins>
</build>
</project>
SparkSQL.scala
package cn.bx.spark
import org.apache.spark.sql.{DataFrame, SparkSession}
object SparkSQLNote {
def main(args: Array[String]): Unit = {
val sparkSession: SparkSession = SparkSession.builder().master("local[*]").getOrCreate()
val df: DataFrame = sparkSession.read.json("input/people.json")
df.show()
df.printSchema()
df.select(df("name"),df("age")).show()
df.filter(df("age") > 20 ).show()
df.groupBy("name").count().show()
df.sort(df("age").desc).show()
df.sort(df("age").desc, df("name").asc).show()
df.select(df("name").as("user_name"),df("age").as("user_age")).show()
sparkSession.stop()
}
}