Sparklyr是rstudio 社區(qū)維護(hù)的一個(gè)spark的接口晨横。
文檔
Sparklyr 文檔:https://spark.rstudio.com
安裝
Sparklyr: sparklyr::spark_install(version = "2.3.0", hadoop_version = "2.7")蒜胖,不依賴于 Spark 版本,spark 2.X 完美兼容 1.X卒煞。
Spark 環(huán)境配置需要注意的問題:
- 下載和 Hadoop 對(duì)應(yīng)版本號(hào)的發(fā)行版蹬叭,具體可以通過 sparklyr::spark_available_versions() 查詢可用的 spark 版本
- JAVA_HOME/SPARK_HOME/HADOOP_HOME 是必須要指定的環(huán)境變量,建議使用 JDK8/spark2.x/hadoop2.7
- yarn-client/yarn-cluster 模式需要設(shè)置環(huán)境變量 Sys.setenv("HADOOP_CONF_DIR"="/etc/hadoop/conf")
- 連接 Hive 需要提供 Hive 鏈接配置, 在 spark-connection 初始化時(shí)指定對(duì)應(yīng) hive-site.xml 文件
初始化
sc <- sparklyr::spark_connect(master = "yarn-client",
spark_home = "/data/FinanceR/Spark",
version = "2.2.0",
config = sparklyr::spark_config())
數(shù)據(jù)輸入輸出
以寫 Parquet 文件為例, 同理你可以用 SparkR::write.()/sparklyr::spark_write_()
等寫入其他格式文件到HDFS
上, 比如csv/text
役耕。
什么是 Parquet 文件? Parquet 是一種高性能列式存儲(chǔ)文件格式聪廉,比 CSV 文件強(qiáng)在內(nèi)建索引瞬痘,可以快速查詢數(shù)據(jù),目前普遍應(yīng)用在模型訓(xùn)練過程锄列。
df <- sparklyr::copy_to(sc,faithful,"df")
sparklyr::spark_write_parquet(df,path="/user/FinanceR",mode="overwrite",partition_by = "dt")
數(shù)據(jù)清洗
library(sparklyr)
library(dplyr)
# 在 mutate 中支持 Hive UDF
remote_df = dplyr::tbl(sc,from = "db.financer_tbl") # 定義數(shù)據(jù)源表
# 或者 remote_df = dplyr::tbl(sc,from = dplyr::sql("select * from db.financer_tbl limit 10")) #
remote_df %>%
mutate(a = b+2) %>% # 在 mutate 中支持 Hive UDF
filter(a > 2)%>%
group_by(key)%>%
summarize(count = n())%>%
select(cnt = count)%>%
order_by(cnt)%>%
arrange(desc(cnt))%>%
na.omit() ->
pipeline
pipeline %>% sdf_persist() # 大數(shù)據(jù)集 緩存在集群上
pipeline %>% head() %>% collect() # 小數(shù)據(jù) 加載到本地
SQL
df <- sc %>%
dplyr::tbl(dplyr::sql('SELECT * FROM financer_tbl WHERE dt = "20180318"'))
sc %>% DBI::dbGetQuery('SELECT * FROM financer_tbl WHERE dt = "20180318" limit 10') # 直接將數(shù)據(jù) collect 到本地, 與操作MySQL完全一樣
df %>% dbplyr::sql_render() # 將 pipeline 自動(dòng)翻譯為 SQL
# SELECT * FROM financer_tbl WHERE dt = "20180318"
分發(fā) R 代碼
分發(fā)機(jī)制:
系統(tǒng)會(huì)將本地依賴文件壓縮打包上傳到 HDFS 路徑上图云,通過 Spark 動(dòng)態(tài)分發(fā)到執(zhí)行任務(wù)的機(jī)器上解壓縮。 執(zhí)行任務(wù)的機(jī)器本地獨(dú)立的線程邻邮、內(nèi)存中執(zhí)行代碼竣况,最后匯總計(jì)算結(jié)果到主要節(jié)點(diǎn)機(jī)器上實(shí)現(xiàn) R 代碼的分發(fā)。
func <- function(x){x + runif(1) } # 原生 R代碼
sparklyr::spark_apply(x = df,packages=T,name = c("key","value"),func =func,group = "key")
流式計(jì)算
什么是流式計(jì)算? 流式計(jì)算是介于實(shí)時(shí)與離線計(jì)算之間的一種計(jì)算方式筒严,以亞秒級(jí)準(zhǔn)實(shí)時(shí)的方式小批量計(jì)算數(shù)據(jù)丹泉,廣泛應(yīng)用在互聯(lián)網(wǎng)廣告、推薦等場(chǎng)景鸭蛙。
Sparklyr: 暫時(shí)不支持流式計(jì)算摹恨,功能開發(fā)中。
統(tǒng)計(jì)之都原文:
https://cosx.org/2018/05/sparkr-vs-sparklyr
學(xué)習(xí)資源
https://spark.rstudio.com/
https://github.com/rstudio/cheatsheets/raw/master/translations/chinese/sparklyr-cheatsheet_zh_CN.pdf