Apache Spark 2.2.0 中文文檔 - SparkR (R on Spark) | ApacheCN

SparkR (R on Spark)

概述

SparkDataFrame

啟動(dòng): SparkSession

從 RStudio 來(lái)啟動(dòng)

創(chuàng)建 SparkDataFrames

從本地的 data frames 來(lái)創(chuàng)建 SparkDataFrames

從 Data Sources(數(shù)據(jù)源)創(chuàng)建 SparkDataFrame

從 Hive tables 來(lái)創(chuàng)建 SparkDataFrame

SparkDataFrame 操作

Selecting rows(行), columns(列)

Grouping, Aggregation(分組, 聚合)

Operating on Columns(列上的操作)

應(yīng)用 User-Defined Function(UDF 用戶自定義函數(shù))

Run a given function on a large dataset usingdapplyordapplyCollect

dapply

dapplyCollect

Run a given function on a large dataset grouping by input column(s) and usinggapplyorgapplyCollect(在一個(gè)大的 dataset 上通過(guò) input colums(輸入列)來(lái)進(jìn)行 grouping(分組)并且使用gapplyorgapplyCollect來(lái)運(yùn)行一個(gè)指定的函數(shù))

gapply

gapplyCollect

使用spark.lapply分發(fā)運(yùn)行一個(gè)本地的 R 函數(shù)

spark.lapply

SparkR 中運(yùn)行 SQL 查詢

機(jī)器學(xué)習(xí)

算法

分類

回歸

聚類

協(xié)同過(guò)濾

頻繁模式挖掘

統(tǒng)計(jì)

模型持久化

R和Spark之間的數(shù)據(jù)類型映射

Structured Streaming

R 函數(shù)名沖突

遷移指南

SparkR 1.5.x 升級(jí)至 1.6.x

SparkR 1.6.x 升級(jí)至 2.0

升級(jí)至 SparkR 2.1.0

升級(jí)至 SparkR 2.2.0

概述

SparkR 是一個(gè) R package, 它提供了一個(gè)輕量級(jí)的前端以從 R 中使用 Apache Spark. 在 Spark 2.2.0 中, SparkR 提供了一個(gè)分布式的 data frame, 它實(shí)現(xiàn)了像 selection, filtering, aggregation etc 一系列所支持的操作.(dplyr與 R data frames 相似) ), 除了可用于海量數(shù)據(jù)上之外. SparkR 還支持使用 MLlib 來(lái)進(jìn)行分布式的 machine learning(機(jī)器學(xué)習(xí)).

SparkDataFrame

SparkDataFrame 是一個(gè)分布式的, 將數(shù)據(jù)映射到有名稱的 colums(列)的集合. 在概念上 相當(dāng)于關(guān)系數(shù)據(jù)庫(kù)中的table表或 R 中的 data frame嘱么,但在該引擎下有更多的優(yōu)化. SparkDataFrames 可以從各種來(lái)源構(gòu)造狮含,例如: 結(jié)構(gòu)化的數(shù)據(jù)文件,Hive 中的表曼振,外部數(shù)據(jù)庫(kù)或現(xiàn)有的本地 R data frames.

All of the examples on this page use sample data included in R or the Spark distribution and can be run using the./bin/sparkRshell.

啟動(dòng): SparkSession

SparkR 的入口點(diǎn)是SparkSession, 它會(huì)連接您的 R 程序到 Spark 集群中. 您可以使用sparkR.session來(lái)創(chuàng)建SparkSession, 并傳遞諸如應(yīng)用程序名稱, 依賴的任何 spark 軟件包等選項(xiàng), 等等. 此外几迄,還可以通過(guò)SparkSession來(lái)與SparkDataFrames一起工作。 如果您正在使用sparkRshell冰评,那么SparkSession應(yīng)該已經(jīng)被創(chuàng)建了映胁,你不需要再調(diào)用sparkR.session.

sparkR.session()

從 RStudio 來(lái)啟動(dòng)

您可以從 RStudio 中來(lái)啟動(dòng) SparkR. 您可以從 RStudio, R shell, Rscript 或者 R IDEs 中連接你的 R 程序到 Spark 集群中去. 要開始, 確保已經(jīng)在環(huán)境變量中設(shè)置好 SPARK_HOME (您可以檢測(cè)下Sys.getenv), 加載 SparkR package, 并且像下面一樣調(diào)用sparkR.session. 它將檢測(cè) Spark 的安裝, 并且, 如果沒(méi)有發(fā)現(xiàn), 它將自動(dòng)的下載并且緩存起來(lái). 當(dāng)然,您也可以手動(dòng)的運(yùn)行install.spark.

為了調(diào)用sparkR.session, 您也可以指定某些 Spark driver 的屬性. 通常哪些應(yīng)用程序?qū)傩?/a>和運(yùn)行時(shí)環(huán)境不能以編程的方式來(lái)設(shè)置, 這是因?yàn)?driver 的 JVM 進(jìn)程早就已經(jīng)啟動(dòng)了, 在這種情況下 SparkR 會(huì)幫你做好準(zhǔn)備. 要設(shè)置它們, 可以像在sparkConfig參數(shù)中的其它屬性一樣傳遞它們到sparkR.session()中去.

if(nchar(Sys.getenv("SPARK_HOME"))<1){Sys.setenv(SPARK_HOME="/home/spark")}library(SparkR,lib.loc=c(file.path(Sys.getenv("SPARK_HOME"),"R","lib")))sparkR.session(master="local[*]",sparkConfig=list(spark.driver.memory="2g"))

下面的 Spark driver 屬性可以 從 RStudio 的 sparkR.session 的 sparkConfig 中進(jìn)行設(shè)置:

Property Name<(屬性名稱)Property group(屬性分組)spark-submitequivalent

spark.masterApplication Properties--master

spark.yarn.keytabApplication Properties--keytab

spark.yarn.principalApplication Properties--principal

spark.driver.memoryApplication Properties--driver-memory

spark.driver.extraClassPathRuntime Environment--driver-class-path

spark.driver.extraJavaOptionsRuntime Environment--driver-java-options

spark.driver.extraLibraryPathRuntime Environment--driver-library-path

創(chuàng)建 SparkDataFrames

有了一個(gè)SparkSession之后, 可以從一個(gè)本地的 R data frame,Hive 表, 或者其它的data sources中來(lái)創(chuàng)建SparkDataFrame應(yīng)用程序.

從本地的 data frames 來(lái)創(chuàng)建 SparkDataFrames

要?jiǎng)?chuàng)建一個(gè) data frame 最簡(jiǎn)單的方式是去轉(zhuǎn)換一個(gè)本地的 R data frame 成為一個(gè) SparkDataFrame. 我們明確的使用as.DataFrame或createDataFrame并且經(jīng)過(guò)本地的 R data frame 中以創(chuàng)建一個(gè) SparkDataFrame. 例如, 下面的例子基于 R 中已有的faithful來(lái)創(chuàng)建一個(gè)SparkDataFrame.

df<-as.DataFrame(faithful)# 展示第一個(gè) SparkDataFrame 的內(nèi)容head(df)##? eruptions waiting##1? ? 3.600? ? ? 79##2? ? 1.800? ? ? 54##3? ? 3.333? ? ? 74

從 Data Sources(數(shù)據(jù)源)創(chuàng)建 SparkDataFrame

SparkR 支持通過(guò)SparkDataFrame接口對(duì)各種 data sources(數(shù)據(jù)源)進(jìn)行操作. 本節(jié)介紹使用數(shù)據(jù)源加載和保存數(shù)據(jù)的常見(jiàn)方法. 您可以查看 Spark Sql 編程指南的specific options部分以了解更多可用于內(nèi)置的 data sources(數(shù)據(jù)源)內(nèi)容.

從數(shù)據(jù)源創(chuàng)建 SparkDataFrames 常見(jiàn)的方法是read.df. 此方法將加載文件的路徑和數(shù)據(jù)源的類型甲雅,并且將自動(dòng)使用當(dāng)前活動(dòng)的 SparkSession. SparkR 天生就支持讀取 JSON, CSV 和 Parquet 文件, 并且通過(guò)可靠來(lái)源的軟件包第三方項(xiàng)目, 您可以找到 Avro 等流行文件格式的 data source connectors(數(shù)據(jù)源連接器). 可以用spark-submit或sparkR命令指定--packages來(lái)添加這些包, 或者在交互式 R shell 或從 RStudio 中使用sparkPackages參數(shù)初始化SparkSession.

sparkR.session(sparkPackages="com.databricks:spark-avro_2.11:3.0.0")

We can see how to use data sources using an example JSON input file. Note that the file that is used here isnota typical JSON file. Each line in the file must contain a separate, self-contained valid JSON object. For more information, please seeJSON Lines text format, also called newline-delimited JSON. As a consequence, a regular multi-line JSON file will most often fail.

我們可以看看如何使用 JSON input file 的例子來(lái)使用數(shù)據(jù)源. 注意, 這里使用的文件是not一個(gè)經(jīng)典的 JSON 文件. 文件中的每行都必須包含一個(gè)單獨(dú)的解孙,獨(dú)立的有效的JSON對(duì)象

people<-read.df("./examples/src/main/resources/people.json","json")head(people)##? age? ? name##1? NA Michael##2? 30? ? Andy##3? 19? Justin# SparkR 自動(dòng)從 JSON 文件推斷出 schema(模式)printSchema(people)# root#? |-- age: long (nullable = true)#? |-- name: string (nullable = true)# 同樣, 使用? read.json 讀取多個(gè)文件people<-read.json(c("./examples/src/main/resources/people.json","./examples/src/main/resources/people2.json"))

該 data sources API 原生支持 CSV 格式的 input files(輸入文件). 要了解更多信息請(qǐng)參閱 SparkRread.dfAPI 文檔.

df<-read.df(csvPath,"csv",header="true",inferSchema="true",na.strings="NA")

該 data sources API 也可用于將 SparkDataFrames 存儲(chǔ)為多個(gè) file formats(文件格式). 例如, 我們可以使用write.df把先前的示例的 SparkDataFrame 存儲(chǔ)為一個(gè) Parquet 文件.

write.df(people,path="people.parquet",source="parquet",mode="overwrite")

從 Hive tables 來(lái)創(chuàng)建 SparkDataFrame

您也可以從 Hive tables(表)來(lái)創(chuàng)建 SparkDataFrames. 為此,我們需要?jiǎng)?chuàng)建一個(gè)具有 Hive 支持的 SparkSession抛人,它可以訪問(wèn) Hive MetaStore 中的 tables(表). 請(qǐng)注意, Spark 應(yīng)該使用Hive support來(lái)構(gòu)建弛姜,更多細(xì)節(jié)可以在SQL 編程指南中查閱.

sparkR.session()sql("CREATE TABLE IF NOT EXISTS src (key INT, value STRING)")sql("LOAD DATA LOCAL INPATH 'examples/src/main/resources/kv1.txt' INTO TABLE src")# Queries can be expressed in HiveQL.results<-sql("FROM src SELECT key, value")# results is now a SparkDataFramehead(results)##? key? value## 1 238 val_238## 2? 86? val_86## 3 311 val_311

SparkDataFrame 操作

SparkDataFrames 支持一些用于結(jié)構(gòu)化數(shù)據(jù)處理的 functions(函數(shù)). 這里我們包括一些基本的例子,一個(gè)完整的列表可以在API文檔中找到:

Selecting rows(行), columns(列)

# Create the SparkDataFramedf<-as.DataFrame(faithful)# 獲取關(guān)于 SparkDataFrame 基礎(chǔ)信息df## SparkDataFrame[eruptions:double, waiting:double]# Select only the "eruptions" columnhead(select(df,df$eruptions))##? eruptions##1? ? 3.600##2? ? 1.800##3? ? 3.333# You can also pass in column name as stringshead(select(df,"eruptions"))# Filter the SparkDataFrame to only retain rows with wait times shorter than 50 minshead(filter(df,df$waiting<50))##? eruptions waiting##1? ? 1.750? ? ? 47##2? ? 1.750? ? ? 47##3? ? 1.867? ? ? 48

Grouping, Aggregation(分組, 聚合)

SparkR data frames 支持一些常見(jiàn)的, 用于在 grouping(分組)數(shù)據(jù)后進(jìn)行 aggregate(聚合)的函數(shù). 例如, 我們可以在faithfuldataset 中計(jì)算waiting時(shí)間的直方圖, 如下所示.

# We use the `n` operator to count the number of times each waiting time appearshead(summarize(groupBy(df,df$waiting),count=n(df$waiting)))##? waiting count##1? ? ? 70? ? 4##2? ? ? 67? ? 1##3? ? ? 69? ? 2# We can also sort the output from the aggregation to get the most common waiting timeswaiting_counts<-summarize(groupBy(df,df$waiting),count=n(df$waiting))head(arrange(waiting_counts,desc(waiting_counts$count)))##? waiting count##1? ? ? 78? ? 15##2? ? ? 83? ? 14##3? ? ? 81? ? 13

Operating on Columns(列上的操作)

SparkR 還提供了一些可以直接應(yīng)用于列進(jìn)行數(shù)據(jù)處理和 aggregatation(聚合)的函數(shù). 下面的例子展示了使用基本的算術(shù)函數(shù).

# Convert waiting time from hours to seconds.# Note that we can assign this to a new column in the same SparkDataFramedf$waiting_secs<-df$waiting*60head(df)##? eruptions waiting waiting_secs##1? ? 3.600? ? ? 79? ? ? ? 4740##2? ? 1.800? ? ? 54? ? ? ? 3240##3? ? 3.333? ? ? 74? ? ? ? 4440

應(yīng)用 User-Defined Function(UDF 用戶自定義函數(shù))

在 SparkR 中, 我們支持幾種 User-Defined Functions:

Run a given function on a large dataset usingdapplyordapplyCollect

dapply

應(yīng)用一個(gè) function(函數(shù))到SparkDataFrame的每個(gè) partition(分區(qū)). 應(yīng)用于SparkDataFrame每個(gè) partition(分區(qū))的 function(函數(shù))應(yīng)該只有一個(gè)參數(shù), 它中的data.frame對(duì)應(yīng)傳遞的每個(gè)分區(qū). 函數(shù)的輸出應(yīng)該是一個(gè)data.frame. Schema 指定生成的SparkDataFramerow format. 它必須匹配返回值的data types.

# Convert waiting time from hours to seconds.# Note that we can apply UDF to DataFrame.schema<-structType(structField("eruptions","double"),structField("waiting","double"),structField("waiting_secs","double"))df1<-dapply(df,function(x){x<-cbind(x,x$waiting*60)},schema)head(collect(df1))##? eruptions waiting waiting_secs##1? ? 3.600? ? ? 79? ? ? ? 4740##2? ? 1.800? ? ? 54? ? ? ? 3240##3? ? 3.333? ? ? 74? ? ? ? 4440##4? ? 2.283? ? ? 62? ? ? ? 3720##5? ? 4.533? ? ? 85? ? ? ? 5100##6? ? 2.883? ? ? 55? ? ? ? 3300

dapplyCollect

像dapply那樣, 應(yīng)用一個(gè)函數(shù)到SparkDataFrame的每個(gè)分區(qū)并且手機(jī)返回結(jié)果. 函數(shù)的輸出應(yīng)該是一個(gè)data.frame. 但是, 不需要傳遞 Schema. 注意, 如果運(yùn)行在所有分區(qū)上的函數(shù)的輸出不能 pulled(拉)到 driver 的內(nèi)存中過(guò)去, 則dapplyCollect會(huì)失敗.

# Convert waiting time from hours to seconds.# Note that we can apply UDF to DataFrame and return a R's data.frameldf<-dapplyCollect(df,function(x){x<-cbind(x,"waiting_secs"=x$waiting*60)})head(ldf,3)##? eruptions waiting waiting_secs##1? ? 3.600? ? ? 79? ? ? ? 4740##2? ? 1.800? ? ? 54? ? ? ? 3240##3? ? 3.333? ? ? 74? ? ? ? 4440

Run a given function on a large dataset grouping by input column(s) and usinggapplyorgapplyCollect(在一個(gè)大的 dataset 上通過(guò) input colums(輸入列)來(lái)進(jìn)行 grouping(分組)并且使用gapplyorgapplyCollect來(lái)運(yùn)行一個(gè)指定的函數(shù))

gapply

應(yīng)用給一個(gè)函數(shù)到SparkDataFrame的每個(gè) group. 該函數(shù)被應(yīng)用到SparkDataFrame的每個(gè) group, 并且應(yīng)該只有兩個(gè)參數(shù): grouping key 和 Rdata.frame對(duì)應(yīng)的 key. 該 groups 從SparkDataFrame的 columns(列)中選擇. 函數(shù)的輸出應(yīng)該是data.frame. Schema 指定生成的SparkDataFramerow format. 它必須在 Sparkdata types 數(shù)據(jù)類型的基礎(chǔ)上表示 R 函數(shù)的輸出 schema(模式). 用戶可以設(shè)置返回的data.frame列名.

# Determine six waiting times with the largest eruption time in minutes.schema<-structType(structField("waiting","double"),structField("max_eruption","double"))result<-gapply(df,"waiting",function(key,x){y<-data.frame(key,max(x$eruptions))},schema)head(collect(arrange(result,"max_eruption",decreasing=TRUE)))##? ? waiting? max_eruption##1? ? ? 64? ? ? 5.100##2? ? ? 69? ? ? 5.067##3? ? ? 71? ? ? 5.033##4? ? ? 87? ? ? 5.000##5? ? ? 63? ? ? 4.933##6? ? ? 89? ? ? 4.900

gapplyCollect

像gapply那樣, 將函數(shù)應(yīng)用于SparkDataFrame的每個(gè)分區(qū)妖枚,并將結(jié)果收集回 R data.frame. 函數(shù)的輸出應(yīng)該是一個(gè)data.frame. 但是廷臼,不需要傳遞 schema(模式). 請(qǐng)注意,如果在所有分區(qū)上運(yùn)行的 UDF 的輸出無(wú)法 pull(拉)到 driver 的內(nèi)存, 那么gapplyCollect可能會(huì)失敗.

# Determine six waiting times with the largest eruption time in minutes.result<-gapplyCollect(df,"waiting",function(key,x){y<-data.frame(key,max(x$eruptions))colnames(y)<-c("waiting","max_eruption")y})head(result[order(result$max_eruption,decreasing=TRUE),])##? ? waiting? max_eruption##1? ? ? 64? ? ? 5.100##2? ? ? 69? ? ? 5.067##3? ? ? 71? ? ? 5.033##4? ? ? 87? ? ? 5.000##5? ? ? 63? ? ? 4.933##6? ? ? 89? ? ? 4.900

使用spark.lapply分發(fā)運(yùn)行一個(gè)本地的 R 函數(shù)

spark.lapply

類似于本地 R 中的lapply,spark.lapply在元素列表中運(yùn)行一個(gè)函數(shù)绝页,并使用 Spark 分發(fā)計(jì)算. 以類似于doParallel或lapply的方式應(yīng)用于列表的元素. 所有計(jì)算的結(jié)果應(yīng)該放在一臺(tái)機(jī)器上. 如果不是這樣, 他們可以像df < - createDataFrame(list)這樣做, 然后使用dapply.

# Perform distributed training of multiple models with spark.lapply. Here, we pass# a read-only list of arguments which specifies family the generalized linear model should be.families<-c("gaussian","poisson")train<-function(family){model<-glm(Sepal.Length~Sepal.Width+Species,iris,family=family)summary(model)}# Return a list of model's summariesmodel.summaries<-spark.lapply(families,train)# Print the summary of each modelprint(model.summaries)

SparkR 中運(yùn)行 SQL 查詢

A SparkDataFrame can also be registered as a temporary view in Spark SQL and that allows you to run SQL queries over its data. Thesqlfunction enables applications to run SQL queries programmatically and returns the result as aSparkDataFrame.

# Load a JSON filepeople<-read.df("./examples/src/main/resources/people.json","json")# Register this SparkDataFrame as a temporary view.createOrReplaceTempView(people,"people")# SQL statements can be run by using the sql methodteenagers<-sql("SELECT name FROM people WHERE age >= 13 AND age <= 19")head(teenagers)##? ? name##1 Justin

機(jī)器學(xué)習(xí)

算法

SparkR 現(xiàn)支持下列機(jī)器學(xué)習(xí)算法:

分類

spark.logit:邏輯回歸 Logistic Regression

spark.mlp:多層感知 (MLP)

spark.naiveBayes:樸素貝葉斯

spark.svmLinear:線性支持向量機(jī)

回歸

spark.survreg:加速失敗時(shí)間生存模型 Accelerated Failure Time (AFT) Survival Model

spark.glmorglm:廣義線性模型 Generalized Linear Model (GLM)

spark.isoreg:保序回歸

spark.gbt:梯度提升樹 for回歸and分類

spark.randomForest:隨機(jī)森林 for回歸and分類

聚類

spark.bisectingKmeans:二分k均值

spark.gaussianMixture:高斯混合模型 (GMM)

spark.kmeans:K-Means

spark.lda:隱含狄利克雷分布 (LDA)

協(xié)同過(guò)濾

spark.als:交替最小二乘 (ALS)

頻繁模式挖掘

spark.fpGrowth:FP-growth

統(tǒng)計(jì)

spark.kstest:柯爾莫哥洛夫-斯米爾諾夫檢驗(yàn)

SparkR 底層實(shí)現(xiàn)使用 MLlib 來(lái)訓(xùn)練模型. 有關(guān)示例代碼荠商,請(qǐng)參閱MLlib用戶指南的相應(yīng)章節(jié). 用戶可以調(diào)用summary輸出擬合模型的摘要, 利用模型對(duì)數(shù)據(jù)進(jìn)行預(yù)測(cè), 并且使用write.ml/read.ml來(lái) 保存/加載擬合的模型 . SparkR 支持對(duì)模型擬合使用部分R的公式運(yùn)算符, 包括 ‘~’, ‘.’, ‘:’, ‘+’, 和 ‘-‘.

模型持久化

下面的例子展示了SparkR如何 保存/加載 機(jī)器學(xué)習(xí)模型.

training<-read.df("data/mllib/sample_multiclass_classification_data.txt",source="libsvm")# Fit a generalized linear model of family "gaussian" with spark.glmdf_list<-randomSplit(training,c(7,3),2)gaussianDF<-df_list[[1]]gaussianTestDF<-df_list[[2]]gaussianGLM<-spark.glm(gaussianDF,label~features,family="gaussian")# Save and then load a fitted MLlib modelmodelPath<-tempfile(pattern="ml",fileext=".tmp")write.ml(gaussianGLM,modelPath)gaussianGLM2<-read.ml(modelPath)# Check model summarysummary(gaussianGLM2)# Check model predictiongaussianPredictions<-predict(gaussianGLM2,gaussianTestDF)head(gaussianPredictions)unlink(modelPath)

Find full example code at "examples/src/main/r/ml/ml.R" in the Spark repo.

R和Spark之間的數(shù)據(jù)類型映射

RSpark

bytebyte

integerinteger

floatfloat

doubledouble

numericdouble

characterstring

stringstring

binarybinary

rawbinary

logicalboolean

POSIXcttimestamp

POSIXlttimestamp

Datedate

arrayarray

listarray

envmap

Structured Streaming

SparkR 支持 Structured Streaming API (測(cè)試階段). Structured Streaming 是一個(gè) 構(gòu)建于SparkSQL引擎之上的易拓展、可容錯(cuò)的流式處理引擎. 更多信息請(qǐng)參考 R APIStructured Streaming Programming Guide

R 函數(shù)名沖突

當(dāng)在R中加載或引入(attach)一個(gè)新package時(shí), 可能會(huì)發(fā)生函數(shù)名沖突,一個(gè)函數(shù)掩蓋了另一個(gè)函數(shù)

下列函數(shù)是被SparkR所掩蓋的:

被掩蓋函數(shù)如何獲取

covinpackage:statsstats::cov(x, y = NULL, use = "everything",

method = c("pearson", "kendall", "spearman"))

filterinpackage:statsstats::filter(x, filter, method = c("convolution", "recursive"),

sides = 2, circular = FALSE, init)

sampleinpackage:basebase::sample(x, size, replace = FALSE, prob = NULL)

由于SparkR的一部分是在dplyr軟件包上建模的续誉,因此SparkR中的某些函數(shù)與dplyr中同名. 根據(jù)兩個(gè)包的加載順序, 后加載的包會(huì)掩蓋先加載的包的部分函數(shù). 在這種情況下, 可以在函數(shù)名前指定包名前綴, 例如:SparkR::cume_dist(x)ordplyr::cume_dist(x).

你可以在 R 中使用search()檢查搜索路徑

遷移指南

SparkR 1.5.x 升級(jí)至 1.6.x

在Spark 1.6.0 之前, 寫入模式默認(rèn)值為append. 在 Spark 1.6.0 改為error匹配 Scala API.

SparkSQL 將R 中的NA轉(zhuǎn)換為null,反之亦然.

SparkR 1.6.x 升級(jí)至 2.0

table方法已經(jīng)移除并替換為tableToDF.

類DataFrame已改名為SparkDataFrame避免名稱沖突.

Spark的SQLContext和HiveContext已經(jīng)過(guò)時(shí)并替換為SparkSession. 相應(yīng)的摒棄sparkR.init()而通過(guò)調(diào)用sparkR.session()來(lái)實(shí)例化SparkSession. 一旦實(shí)例化完成, 當(dāng)前的SparkSession即可用于SparkDataFrame 操作(注釋:spark2.0開始所有的driver實(shí)例通過(guò)sparkSession來(lái)進(jìn)行構(gòu)建).

sparkR.session不支持sparkExecutorEnv參數(shù).要為executors設(shè)置環(huán)境莱没,請(qǐng)使用前綴”spark.executorEnv.VAR_NAME”設(shè)置Spark配置屬性,例如”spark.executorEnv.PATH”, -sqlContext不再需要下列函數(shù):createDataFrame,as.DataFrame,read.json,jsonFile,read.parquet,parquetFile,read.text,sql,tables,tableNames,cacheTable,uncacheTable,clearCache,dropTempTable,read.df,loadDF,createExternalTable.

registerTempTable方法已經(jīng)過(guò)期并且替換為createOrReplaceTempView.

dropTempTable方法已經(jīng)過(guò)期并且替換為dropTempView.

scSparkContext 參數(shù)不再需要下列函數(shù):setJobGroup,clearJobGroup,cancelJobGroup

升級(jí)至 SparkR 2.1.0

join不再執(zhí)行笛卡爾積計(jì)算, 使用crossJoin來(lái)進(jìn)行笛卡爾積計(jì)算.

升級(jí)至 SparkR 2.2.0

createDataFrame和as.DataFrame添加numPartitions參數(shù). 數(shù)據(jù)分割時(shí), 分區(qū)位置計(jì)算已經(jīng)與scala計(jì)算相一致.

方法createExternalTable已經(jīng)過(guò)期并且替換為createTable. 可以調(diào)用這兩種方法來(lái)創(chuàng)建外部或托管表. 已經(jīng)添加額外的 catalog 方法.

默認(rèn)情況下酷鸦,derby.log現(xiàn)在已保存到tempdir()目錄中. 當(dāng)實(shí)例化SparkSession且選項(xiàng)enableHiveSupport 為TRUE,會(huì)創(chuàng)建derby.log .

更正spark.lda錯(cuò)誤設(shè)置優(yōu)化器的bug.

更新模型概況輸出coefficientsasmatrix. 更新的模型概況包括spark.logit,spark.kmeans,spark.glm.spark.gaussianMixture的模型概況已經(jīng)添加對(duì)數(shù)概度(log-likelihood)loglik.

我們一直在努力

apachecn/spark-doc-zh

原文地址: http://spark.apachecn.org/docs/cn/2.2.0/sparkr.html

網(wǎng)頁(yè)地址: http://spark.apachecn.org/

github: https://github.com/apachecn/spark-doc-zh(覺(jué)得不錯(cuò)麻煩給個(gè) Star郊愧,謝謝!~)

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
  • 序言:七十年代末井佑,一起剝皮案震驚了整個(gè)濱河市属铁,隨后出現(xiàn)的幾起案子,更是在濱河造成了極大的恐慌躬翁,老刑警劉巖焦蘑,帶你破解...
    沈念sama閱讀 219,366評(píng)論 6 508
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件,死亡現(xiàn)場(chǎng)離奇詭異盒发,居然都是意外死亡例嘱,警方通過(guò)查閱死者的電腦和手機(jī)狡逢,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 93,521評(píng)論 3 395
  • 文/潘曉璐 我一進(jìn)店門,熙熙樓的掌柜王于貴愁眉苦臉地迎上來(lái)拼卵,“玉大人奢浑,你說(shuō)我怎么就攤上這事∫溉” “怎么了雀彼?”我有些...
    開封第一講書人閱讀 165,689評(píng)論 0 356
  • 文/不壞的土叔 我叫張陵,是天一觀的道長(zhǎng)即寡。 經(jīng)常有香客問(wèn)我徊哑,道長(zhǎng),這世上最難降的妖魔是什么聪富? 我笑而不...
    開封第一講書人閱讀 58,925評(píng)論 1 295
  • 正文 為了忘掉前任莺丑,我火速辦了婚禮,結(jié)果婚禮上墩蔓,老公的妹妹穿的比我還像新娘梢莽。我一直安慰自己,他們只是感情好奸披,可當(dāng)我...
    茶點(diǎn)故事閱讀 67,942評(píng)論 6 392
  • 文/花漫 我一把揭開白布蟹漓。 她就那樣靜靜地躺著,像睡著了一般源内。 火紅的嫁衣襯著肌膚如雪。 梳的紋絲不亂的頭發(fā)上份殿,一...
    開封第一講書人閱讀 51,727評(píng)論 1 305
  • 那天膜钓,我揣著相機(jī)與錄音,去河邊找鬼卿嘲。 笑死颂斜,一個(gè)胖子當(dāng)著我的面吹牛,可吹牛的內(nèi)容都是我干的拾枣。 我是一名探鬼主播沃疮,決...
    沈念sama閱讀 40,447評(píng)論 3 420
  • 文/蒼蘭香墨 我猛地睜開眼,長(zhǎng)吁一口氣:“原來(lái)是場(chǎng)噩夢(mèng)啊……” “哼梅肤!你這毒婦竟也來(lái)了司蔬?” 一聲冷哼從身側(cè)響起,我...
    開封第一講書人閱讀 39,349評(píng)論 0 276
  • 序言:老撾萬(wàn)榮一對(duì)情侶失蹤姨蝴,失蹤者是張志新(化名)和其女友劉穎俊啼,沒(méi)想到半個(gè)月后,有當(dāng)?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體左医,經(jīng)...
    沈念sama閱讀 45,820評(píng)論 1 317
  • 正文 獨(dú)居荒郊野嶺守林人離奇死亡授帕,尸身上長(zhǎng)有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點(diǎn)故事閱讀 37,990評(píng)論 3 337
  • 正文 我和宋清朗相戀三年同木,在試婚紗的時(shí)候發(fā)現(xiàn)自己被綠了。 大學(xué)時(shí)的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片跛十。...
    茶點(diǎn)故事閱讀 40,127評(píng)論 1 351
  • 序言:一個(gè)原本活蹦亂跳的男人離奇死亡彤路,死狀恐怖,靈堂內(nèi)的尸體忽然破棺而出芥映,到底是詐尸還是另有隱情洲尊,我是刑警寧澤,帶...
    沈念sama閱讀 35,812評(píng)論 5 346
  • 正文 年R本政府宣布屏轰,位于F島的核電站颊郎,受9級(jí)特大地震影響,放射性物質(zhì)發(fā)生泄漏霎苗。R本人自食惡果不足惜姆吭,卻給世界環(huán)境...
    茶點(diǎn)故事閱讀 41,471評(píng)論 3 331
  • 文/蒙蒙 一、第九天 我趴在偏房一處隱蔽的房頂上張望唁盏。 院中可真熱鬧内狸,春花似錦、人聲如沸厘擂。這莊子的主人今日做“春日...
    開封第一講書人閱讀 32,017評(píng)論 0 22
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽(yáng)刽严。三九已至昂灵,卻和暖如春,著一層夾襖步出監(jiān)牢的瞬間舞萄,已是汗流浹背眨补。 一陣腳步聲響...
    開封第一講書人閱讀 33,142評(píng)論 1 272
  • 我被黑心中介騙來(lái)泰國(guó)打工, 沒(méi)想到剛下飛機(jī)就差點(diǎn)兒被人妖公主榨干…… 1. 我叫王不留倒脓,地道東北人撑螺。 一個(gè)月前我還...
    沈念sama閱讀 48,388評(píng)論 3 373
  • 正文 我出身青樓,卻偏偏與公主長(zhǎng)得像崎弃,于是被迫代替她去往敵國(guó)和親甘晤。 傳聞我的和親對(duì)象是個(gè)殘疾皇子,可洞房花燭夜當(dāng)晚...
    茶點(diǎn)故事閱讀 45,066評(píng)論 2 355

推薦閱讀更多精彩內(nèi)容