I. Introduction
        This experiment is based on Spark: The Definitive Guide, Chapter 9: Spark SQL.
II. Experiment Content
        Use spark-sql to work through the following objects and statements:
1.Databases
    1)Show Databases
    2)Creating Databases
    3)Use Databases
    4)Dropping Databases
2.Tables
    1)Creating Tables
    2)Inserting Into Tables
    3)Describing Table Metadata
    4)Refreshing Table Metadata
    5)Dropping Tables
3.Views
    1)Creating Views
    2)Dropping Views
4.Select Statements
    1)Case When Then Statements
5.Advanced Topics
    1)Complex Types
        Structs
        Sets and Lists
    2)Functions
        SYSTEM FUNCTIONS
        User Defined Functions
    3)Spark Managed Tables
        Creating External Tables
        Dropping Unmanaged Tables
    4)Subqueries
        Uncorrelated Predicate Subqueries
        Correlated Predicate Subqueries
        Uncorrelated Scalar Queries
III. Preparation
1.Experiment Data
    See the "Preparation" section of "好玩的大數(shù)據(jù)之43:Spark實驗1(用Spark-Shell讀寫外部數(shù)據(jù)源)" (Spark Experiment 1: reading and writing external data sources with spark-shell).
2.Changing the Log Output Level
    See "Changing the log output level" in the "Preparation" section of the same article.
IV. Startup
1.Start spark-sql
    spark-sql
    Wait until you see the spark-sql prompt.
Note: I have already changed the console log level in conf/log4j.properties to ERROR:
    log4j.rootCategory=ERROR, console
2.A quick first try
    select 1+1;
V. Experiment Procedure
Unless otherwise noted, everything below is entered at the spark-sql (default)> prompt; the code to type lies between the "begin" and "end" markers.
Note: every statement must end with a semicolon ";" before it will run.
1.Databases
===================== Databases begin =====================
1)Show Databases
show databases;
2)Creating Databases
create database testdb;
show databases;
3)Use Databases
use testdb;
4)Dropping Databases
create database testdb2;
show databases;
drop database testdb2;
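Note that a database must be empty before it can be dropped; if it still contains tables, a plain DROP fails unless you add CASCADE. A minimal sketch, not part of the book's walkthrough (testdb3 is a throwaway name):
create database testdb3;
create table testdb3.t1(id int);
drop database testdb3;            -- fails: testdb3 still contains t1
drop database testdb3 cascade;    -- drops the database together with its tables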
5)Behind the scenes
In fact, the output of all these commands ends up in the Hadoop file system. The exact location is determined by the following property in the Hive configuration file conf/hive-site.xml:
<property>
    <name>hive.metastore.warehouse.dir</name>
    <value>/mylab/soft/apache-hive-3.1.2-bin/working/metastore.warehouse</value>
    <description>location of default database for the warehouse</description>
</property>
Open a new terminal:
hadoop fs -ls /mylab/soft/apache-hive-3.1.2-bin/working/metastore.warehouse
hadoop fs -ls /mylab/soft/apache-hive-3.1.2-bin/working/metastore.warehouse/testdb.db
===================== Databases end =====================
2.Tables
===================== Tables begin =====================
1)Creating Tables
# Create a table the ordinary way
create table student(
    id int,
    name string
);
# Create a table with USING
CREATE TABLE flights (
DEST_COUNTRY_NAME STRING,
ORIGIN_COUNTRY_NAME STRING,
count LONG)
USING JSON
OPTIONS (
path '/mylab/mydata/spark/spark_guide_data/flight-data/json/2015-summary.json');
# Create a table with USING, adding a column comment
CREATE TABLE flights_csv (
DEST_COUNTRY_NAME STRING,
ORIGIN_COUNTRY_NAME STRING COMMENT "remember, the US will be most prevalent",
count LONG)
USING csv
OPTIONS (
inferSchema true,
header true,
path '/mylab/mydata/spark/spark_guide_data/flight-data/csv/2015-summary.csv');
show tables;
Run the following commands in the terminal to check that the files are there:
hadoop fs -ls /mylab/mydata/spark/spark_guide_data/flight-data/json/2015-summary.json
hadoop fs -ls /mylab/mydata/spark/spark_guide_data/flight-data/csv/2015-summary.csv
hadoop fs -ls /mylab/soft/apache-hive-3.1.2-bin/working/metastore.warehouse/testdb.db
This shows that with USING, the table keeps reading the original data file in place; nothing is copied into the warehouse directory.
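You can verify this from inside spark-sql too. DESCRIBE TABLE EXTENDED prints the table's type (EXTERNAL, because the table was defined over an explicit path) and its location; the exact output layout varies with the Spark version:
DESCRIBE TABLE EXTENDED flights;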
# Create a table from a query
CREATE TABLE flights_from_select
AS
SELECT * FROM flights;
CREATE TABLE IF NOT EXISTS flights_from_select
AS
SELECT * FROM flights
LIMIT 5;
Since flights_from_select already exists, IF NOT EXISTS makes this second statement a no-op: the LIMIT 5 result is not written, and the table keeps its original contents.
show tables;
Run the following commands in the terminal to check that the files are there:
hadoop fs -ls /mylab/soft/apache-hive-3.1.2-bin/working/metastore.warehouse/testdb.db
hadoop fs -ls /mylab/soft/apache-hive-3.1.2-bin/working/metastore.warehouse/testdb.db/flights_from_select
hadoop fs -cat /mylab/soft/apache-hive-3.1.2-bin/working/metastore.warehouse/testdb.db/flights_from_select/part-00000-5bfd19b2-ceb4-4eca-adbe-19118cc4bcd0-c000
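By contrast, a table created from a query without an explicit path is a managed table whose data files live under the warehouse directory; DESCRIBE TABLE EXTENDED should report its type as MANAGED:
DESCRIBE TABLE EXTENDED flights_from_select;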
# Create a partitioned table
CREATE TABLE partitioned_flights
USING parquet
PARTITIONED BY (DEST_COUNTRY_NAME)
AS
SELECT DEST_COUNTRY_NAME, ORIGIN_COUNTRY_NAME, count FROM flights
LIMIT 5 ;
select * from flights limit 5;
select * from partitioned_flights;
Run in the terminal:
hadoop fs -ls /mylab/soft/apache-hive-3.1.2-bin/working/metastore.warehouse/testdb.db/partitioned_flights
After partitioning on a column, a separate subdirectory is created for each distinct value of the partition column.
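The partition list can also be read from spark-sql directly:
SHOW PARTITIONS partitioned_flights;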
2)Show Tables
    show tables;
    show tables in testdb;
3)Inserting Into Tables
# Insert into an ordinary table
insert into student values(1,'zhang');
insert into student values(2,'zhao');
insert into student values(3,'wang');
insert into student values(4,'li');
select * from student;
hadoop fs -ls /mylab/soft/apache-hive-3.1.2-bin/working/metastore.warehouse/testdb.db/student
hadoop fs -cat /mylab/soft/apache-hive-3.1.2-bin/working/metastore.warehouse/testdb.db/student/*
hadoop fs -cat /mylab/soft/apache-hive-3.1.2-bin/working/metastore.warehouse/testdb.db/student/part-00000-4d48fb12-7412-46b0-9460-27ba674bfce9-c000
# Insert into a table created with USING
insert into flights_csv values("beijing","shanghai",15);
# Insert into a partitioned table
# Insert via VALUES
INSERT INTO partitioned_flights
PARTITION (DEST_COUNTRY_NAME="United States")
values('China',1);
INSERT INTO partitioned_flights
PARTITION (DEST_COUNTRY_NAME="China")
values('japan',10);
INSERT INTO partitioned_flights
PARTITION (DEST_COUNTRY_NAME="China")
values('Singapore',15);
# Insert via SELECT
INSERT INTO partitioned_flights
PARTITION (DEST_COUNTRY_NAME="United States")
SELECT ORIGIN_COUNTRY_NAME,count FROM flights
WHERE DEST_COUNTRY_NAME='United States'
LIMIT 1;
select * from partitioned_flights;
Run in the terminal:
hadoop fs -ls /mylab/soft/apache-hive-3.1.2-bin/working/metastore.warehouse/testdb.db/partitioned_flights
hadoop fs -ls /mylab/soft/apache-hive-3.1.2-bin/working/metastore.warehouse/testdb.db/partitioned_flights/DEST_COUNTRY_NAME=China
4)Describing Table
DESCRIBE TABLE flights;
DESC TABLE flights;
DESC TABLE partitioned_flights;
Run in the terminal:
hadoop fs -ls /mylab/soft/apache-hive-3.1.2-bin/working/metastore.warehouse/testdb.db/partitioned_flights
5)Refreshing Table
    Two ways: REFRESH TABLE re-reads the table's cached metadata and data, while MSCK REPAIR TABLE rescans the table's directory and registers any partitions the metastore does not yet know about.
        REFRESH TABLE partitioned_flights;
        MSCK REPAIR TABLE partitioned_flights;
6)Dropping Tables
    DROP TABLE flights_csv;
    DROP TABLE IF EXISTS flights_csv;
Dropping a managed table deletes both the metadata and the data files; dropping an unmanaged table (such as flights_csv, which was defined over an explicit path) removes only the table definition and leaves the underlying files alone.
===================== Tables end =====================
3.Views
===================== Views begin =====================
1)Show Views
    show views;
    show views in testdb;
2)Creating Views
CREATE OR REPLACE VIEW just_usa_view AS
SELECT *
FROM flights
WHERE dest_country_name = 'United States';
CREATE OR REPLACE TEMP VIEW just_usa_view_temp AS
SELECT *
FROM flights
WHERE dest_country_name = 'United States';
CREATE OR REPLACE GLOBAL TEMP VIEW just_usa_global_view_temp AS
SELECT *
FROM flights
WHERE dest_country_name = 'United States';
SELECT * FROM just_usa_view;
SELECT * FROM just_usa_view_temp;
SELECT * FROM global_temp.just_usa_global_view_temp;
(The unqualified name fails because global temporary views are registered under the global_temp database; they must be referenced as global_temp.<view_name>.)
3)Dropping Views
    drop view just_usa_view;
    drop view just_usa_view_temp;
    drop view global_temp.just_usa_global_view_temp;
    (Again, the global temporary view must be qualified with the global_temp database, otherwise the drop fails.)
===================== Views end =====================
4.Select Statements
===================== Select begin =====================
1)SELECT syntax
SELECT [ALL|DISTINCT] named_expression[, named_expression, ...]
    FROM relation[, relation, ...]
    [lateral_view[, lateral_view, ...]]
    [WHERE boolean_expression]
    [aggregation [HAVING boolean_expression]]
    [ORDER BY sort_expressions]
    [CLUSTER BY expressions]
    [DISTRIBUTE BY expressions]
    [SORT BY sort_expressions]
    [WINDOW named_window[, WINDOW named_window, ...]]
    [LIMIT num_rows]

named_expression:
    : expression [AS alias]

relation:
    | join_relation
    | (table_name|query|relation) [sample] [AS alias]
    : VALUES (expressions)[, (expressions), ...]
      [AS (column_name[, column_name, ...])]

expressions:
    : expression[, expression, ...]

sort_expressions:
    : expression [ASC|DESC][, expression [ASC|DESC], ...]
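As a small illustration of several of these clauses together (an aggregation with HAVING, ORDER BY, and LIMIT, run against the flights table created earlier):
SELECT DEST_COUNTRY_NAME, sum(count) AS total
FROM flights
GROUP BY DEST_COUNTRY_NAME
HAVING sum(count) > 10
ORDER BY total DESC
LIMIT 5;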
2)Case When Then Statements
SELECT
CASE
WHEN DEST_COUNTRY_NAME = 'UNITED STATES' THEN 1
WHEN DEST_COUNTRY_NAME = 'Egypt' THEN 0
ELSE -1 END
FROM
partitioned_flights;
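Like any other expression, the CASE expression can be given an alias so the result column has a readable name:
SELECT DEST_COUNTRY_NAME,
CASE WHEN DEST_COUNTRY_NAME = 'United States' THEN 1 ELSE 0 END AS is_us
FROM partitioned_flights;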
===================== Select end =====================
5.Advanced Topics
===================== Advanced Topics begin =====================
1) Complex Types
    1.1)Structs
CREATE VIEW IF NOT EXISTS
nested_data
AS
SELECT
(DEST_COUNTRY_NAME, ORIGIN_COUNTRY_NAME) as country,
count
FROM flights;
SELECT * FROM nested_data limit 5;
SELECT country.DEST_COUNTRY_NAME, count
FROM nested_data;
SELECT country.*, count
FROM nested_data;
    1.2)Sets and Lists
SELECT
DEST_COUNTRY_NAME as new_name,
collect_list(count) as flight_counts,
collect_set(ORIGIN_COUNTRY_NAME) as origin_set
FROM
partitioned_flights
GROUP BY
DEST_COUNTRY_NAME;
SELECT
DEST_COUNTRY_NAME as new_name,
collect_list(count)[0]
FROM
partitioned_flights
GROUP BY
DEST_COUNTRY_NAME;
CREATE OR REPLACE TEMP VIEW flights_agg
AS
SELECT
DEST_COUNTRY_NAME,
collect_list(count) as collected_counts
FROM
partitioned_flights
GROUP BY
DEST_COUNTRY_NAME;
select * from flights_agg;
SELECT explode(collected_counts), DEST_COUNTRY_NAME
FROM flights_agg;
2)Functions
    2.1)ALL FUNCTIONS
        SHOW FUNCTIONS;
        See the appendix for the full list of functions.
    2.2)SYSTEM FUNCTIONS
        SHOW SYSTEM FUNCTIONS;
    2.3)User Defined Functions
        SHOW USER FUNCTIONS;
    2.4)Showing a subset of functions
        SHOW FUNCTIONS "s*";
        SHOW FUNCTIONS LIKE "collect*";
    2.5)Creating a user-defined function
        Switch to spark-shell:
            def power3(number:Double):Double = {
                number * number * number
            }
            spark.udf.register("power3", power3(_:Double):Double)
        Switch back to spark-sql and try it (this fails there: the UDF was registered in the spark-shell session's SparkSession, and the spark-sql CLI is a separate session that cannot see it; run the query through spark.sql(...) inside spark-shell instead):
    SELECT count, power3(count)
    FROM flights;
3)Spark Managed Tables
    Creating External Tables (see the sketch below)
    Dropping Unmanaged Tables (see the sketch below)
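A minimal sketch covering both topics, modeled on the corresponding example in the book (the LOCATION path is illustrative; point it at a directory that actually holds comma-separated files):
CREATE EXTERNAL TABLE hive_flights (
DEST_COUNTRY_NAME STRING,
ORIGIN_COUNTRY_NAME STRING,
count LONG)
ROW FORMAT DELIMITED FIELDS TERMINATED BY ','
LOCATION '/mylab/mydata/spark/spark_guide_data/flight-data-hive/';
DROP TABLE IF EXISTS hive_flights;
Because hive_flights has an explicit LOCATION it is unmanaged: dropping it removes only the metadata, and the files under LOCATION are left in place.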
4)Subqueries
    Uncorrelated Predicate Subqueries
    Correlated Predicate Subqueries
    Uncorrelated Scalar Queries
    (sketches for all three follow below)
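Minimal sketches of the three subquery forms, following the book's examples against the flights table:
# Uncorrelated predicate subquery: filter on the five biggest destination countries
SELECT * FROM flights
WHERE ORIGIN_COUNTRY_NAME IN (
SELECT DEST_COUNTRY_NAME FROM flights
GROUP BY DEST_COUNTRY_NAME ORDER BY sum(count) DESC LIMIT 5);
# Correlated predicate subquery: countries with flights in both directions
SELECT * FROM flights f1
WHERE EXISTS (SELECT 1 FROM flights f2
WHERE f1.DEST_COUNTRY_NAME = f2.ORIGIN_COUNTRY_NAME)
AND EXISTS (SELECT 1 FROM flights f2
WHERE f2.DEST_COUNTRY_NAME = f1.ORIGIN_COUNTRY_NAME);
# Uncorrelated scalar query: attach the global maximum count to every row
SELECT *, (SELECT max(count) FROM flights) AS maximum FROM flights;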
===================== Advanced Topics end =====================
exit;    (quit spark-sql)
VI. Appendix: SYSTEM FUNCTIONS
!
!=
%
&
*
+
-
/
<
<=
<=>
<>
=
==
>
>=
^
abs
acos
acosh
add_months
aggregate
and
any
approx_count_distinct
approx_percentile
array
array_contains
array_distinct
array_except
array_intersect
array_join
array_max
array_min
array_position
array_remove
array_repeat
array_sort
array_union
arrays_overlap
arrays_zip
ascii
asin
asinh
assert_true
atan
atan2
atanh
avg
base64
between
bigint
bin
binary
bit_and
bit_count
bit_length
bit_or
bit_xor
bool_and
bool_or
boolean
bround
cardinality
case
cast
cbrt
ceil
ceiling
char
char_length
character_length
chr
coalesce
collect_list
collect_set
concat
concat_ws
conv
corr
cos
cosh
cot
count
count_if
count_min_sketch
covar_pop
covar_samp
crc32
cube
cume_dist
current_database
current_date
current_timestamp
date
date_add
date_format
date_part
date_sub
date_trunc
datediff
day
dayofmonth
dayofweek
dayofyear
decimal
decode
degrees
dense_rank
div
double
e
element_at
elt
encode
every
exists
exp
explode
explode_outer
expm1
extract
factorial
filter
find_in_set
first
first_value
flatten
float
floor
forall
format_number
format_string
from_csv
from_json
from_unixtime
from_utc_timestamp
get_json_object
greatest
grouping
grouping_id
hash
hex
hour
hypot
if
ifnull
in
initcap
inline
inline_outer
input_file_block_length
input_file_block_start
input_file_name
instr
int
isnan
isnotnull
isnull
java_method
json_tuple
kurtosis
lag
last
last_day
last_value
lcase
lead
least
left
length
levenshtein
like
ln
locate
log
log10
log1p
log2
lower
lpad
ltrim
make_date
make_interval
make_timestamp
map
map_concat
map_entries
map_filter
map_from_arrays
map_from_entries
map_keys
map_values
map_zip_with
max
max_by
md5
mean
min
min_by
minute
mod
monotonically_increasing_id
month
months_between
named_struct
nanvl
negative
next_day
not
now
ntile
nullif
nvl
nvl2
octet_length
or
overlay
parse_url
percent_rank
percentile
percentile_approx
pi
pmod
posexplode
posexplode_outer
position
positive
pow
power
printf
quarter
radians
rand
randn
random
rank
reflect
regexp_extract
regexp_replace
repeat
replace
reverse
right
rint
rlike
rollup
round
row_number
rpad
rtrim
schema_of_csv
schema_of_json
second
sentences
sequence
sha
sha1
sha2
shiftleft
shiftright
shiftrightunsigned
shuffle
sign
signum
sin
sinh
size
skewness
slice
smallint
some
sort_array
soundex
space
spark_partition_id
split
sqrt
stack
std
stddev
stddev_pop
stddev_samp
str_to_map
string
struct
substr
substring
substring_index
sum
tan
tanh
timestamp
tinyint
to_csv
to_date
to_json
to_timestamp
to_unix_timestamp
to_utc_timestamp
transform
transform_keys
transform_values
translate
trim
trunc
typeof
ucase
unbase64
unhex
unix_timestamp
upper
uuid
var_pop
var_samp
variance
version
weekday
weekofyear
when
window
xpath
xpath_boolean
xpath_double
xpath_float
xpath_int
xpath_long
xpath_number
xpath_short
xpath_string
xxhash64
year
zip_with
|
~