[SparkR]

words <- flatMap(data, function(line) {
  strsplit(line, " ")[[1]]
})
wordCount <- lapply(words, function(word) {
  list(word, 1L)
})
counts <- reduceBykey(wordCount, "+", 2L)
output <- collect(counts)
for (wordCount in output) {
  cat(wordCount[[1]], ": ", wordCount[[2]], "\n")
}
#dataframe.R
library(SparkR)

# Initialize SparkContext and SQLContext
sc <- sparkR.init(appName="SparkR-DataFrame-example")
sqlContext <- sparkRSQL.init(sc)

hadoop fs -rm /user/yyl/alldata.csv
Sys.setenv('SPARKR_SUBMIT_ARGS'='"--packages" "com.databricks:spark-csv_2.10:1.0.3" "sparkr-shell"')
# Create a DataFrame from a JSON file
path <- file.path("hdfs://hadoop-namenode1:8020/user/yyl/alldata.csv")
peopleDF <- read.text(sqlContext, path)
printSchema(peopleDF)

# Register this DataFrame as a table.
registerTempTable(peopleDF, "people")

# SQL statements can be run by using the sql methods provided by sqlContext
teenagers <- sql(sqlContext, "SELECT name FROM people WHERE age >= 13 AND age <= 19")

# Call collect to get a local data.frame
teenagersLocalDF <- collect(teenagers)

# Print the teenagers in our dataset 
print(teenagersLocalDF)
#data-manipulation.R
library(SparkR)
hadoop dfs -copyFromLocal /opt/cloudera/parcels/spark-1.6.2-bin-cdh5/data/hoho /user/yyl
args <- commandArgs(trailing = TRUE)
# Provides access to a copy of the command line arguments supplied when this R session was invoked.

if (length(args) != 1) {
  print("Usage: data-manipulation.R <path-to-flights.csv")
  print("The data can be downloaded from: http://s3-us-west-2.amazonaws.com/sparkr-data/flights.csv ")
  q("no")
}

## Initialize SparkContext
sc <- sparkR.init(appName = "SparkR-data-manipulation-example")

## Initialize SQLContext
sqlContext <- sparkRSQL.init(sc)

flightsCsvPath <- args[[1]]

# Create a local R dataframe
flights_df <- read.csv(flightsCsvPath, header = TRUE)
flights_df$date <- as.Date(flights_df$date)

## Filter flights whose destination is San Francisco and write to a local data frame
SFO_df <- flights_df[flights_df$dest == "SFO", ] 

# Convert the local data frame into a SparkR DataFrame
SFO_DF <- createDataFrame(sqlContext, SFO_df)

#  Directly create a SparkR DataFrame from the source data
flightsDF <- read.df(sqlContext, flightsCsvPath, source = "com.databricks.spark.csv", header = "true")

# Print the schema of this Spark DataFrame
printSchema(flightsDF)

# Cache the DataFrame
cache(flightsDF)

# Print the first 6 rows of the DataFrame
showDF(flightsDF, numRows = 6) ## Or
head(flightsDF)

# Show the column names in the DataFrame
columns(flightsDF)

# Show the number of rows in the DataFrame
count(flightsDF)

# Select specific columns
destDF <- select(flightsDF, "dest", "cancelled")

# Using SQL to select columns of data
# First, register the flights DataFrame as a table
registerTempTable(flightsDF, "flightsTable")
destDF <- sql(sqlContext, "SELECT dest, cancelled FROM flightsTable")

# Use collect to create a local R data frame
local_df <- collect(destDF)

# Print the newly created local data frame
head(local_df)

# Filter flights whose destination is JFK
jfkDF <- filter(flightsDF, "dest = \"JFK\"") ##OR
jfkDF <- filter(flightsDF, flightsDF$dest == "JFK")

# If the magrittr library is available, we can use it to
# chain data frame operations
if("magrittr" %in% rownames(installed.packages())) {
  library(magrittr)

  # Group the flights by date and then find the average daily delay
  # Write the result into a DataFrame
  groupBy(flightsDF, flightsDF$date) %>%
    summarize(avg(flightsDF$dep_delay), avg(flightsDF$arr_delay)) -> dailyDelayDF

  # Print the computed data frame
  head(dailyDelayDF)
}

# Stop the SparkContext now
sparkR.stop()
最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
  • 序言:七十年代末莱找,一起剝皮案震驚了整個(gè)濱河市,隨后出現(xiàn)的幾起案子,更是在濱河造成了極大的恐慌,老刑警劉巖淳衙,帶你破解...
    沈念sama閱讀 211,639評(píng)論 6 492
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件,死亡現(xiàn)場(chǎng)離奇詭異尉辑,居然都是意外死亡腐晾,警方通過查閱死者的電腦和手機(jī),發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 90,277評(píng)論 3 385
  • 文/潘曉璐 我一進(jìn)店門川尖,熙熙樓的掌柜王于貴愁眉苦臉地迎上來登下,“玉大人,你說我怎么就攤上這事叮喳”环迹” “怎么了?”我有些...
    開封第一講書人閱讀 157,221評(píng)論 0 348
  • 文/不壞的土叔 我叫張陵馍悟,是天一觀的道長畔濒。 經(jīng)常有香客問我,道長锣咒,這世上最難降的妖魔是什么侵状? 我笑而不...
    開封第一講書人閱讀 56,474評(píng)論 1 283
  • 正文 為了忘掉前任,我火速辦了婚禮宠哄,結(jié)果婚禮上壹将,老公的妹妹穿的比我還像新娘。我一直安慰自己毛嫉,他們只是感情好诽俯,可當(dāng)我...
    茶點(diǎn)故事閱讀 65,570評(píng)論 6 386
  • 文/花漫 我一把揭開白布。 她就那樣靜靜地躺著承粤,像睡著了一般暴区。 火紅的嫁衣襯著肌膚如雪。 梳的紋絲不亂的頭發(fā)上辛臊,一...
    開封第一講書人閱讀 49,816評(píng)論 1 290
  • 那天仙粱,我揣著相機(jī)與錄音,去河邊找鬼彻舰。 笑死伐割,一個(gè)胖子當(dāng)著我的面吹牛候味,可吹牛的內(nèi)容都是我干的。 我是一名探鬼主播隔心,決...
    沈念sama閱讀 38,957評(píng)論 3 408
  • 文/蒼蘭香墨 我猛地睜開眼白群,長吁一口氣:“原來是場(chǎng)噩夢(mèng)啊……” “哼!你這毒婦竟也來了硬霍?” 一聲冷哼從身側(cè)響起帜慢,我...
    開封第一講書人閱讀 37,718評(píng)論 0 266
  • 序言:老撾萬榮一對(duì)情侶失蹤,失蹤者是張志新(化名)和其女友劉穎唯卖,沒想到半個(gè)月后粱玲,有當(dāng)?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體,經(jīng)...
    沈念sama閱讀 44,176評(píng)論 1 303
  • 正文 獨(dú)居荒郊野嶺守林人離奇死亡拜轨,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點(diǎn)故事閱讀 36,511評(píng)論 2 327
  • 正文 我和宋清朗相戀三年抽减,在試婚紗的時(shí)候發(fā)現(xiàn)自己被綠了。 大學(xué)時(shí)的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片撩轰。...
    茶點(diǎn)故事閱讀 38,646評(píng)論 1 340
  • 序言:一個(gè)原本活蹦亂跳的男人離奇死亡胯甩,死狀恐怖,靈堂內(nèi)的尸體忽然破棺而出堪嫂,到底是詐尸還是另有隱情偎箫,我是刑警寧澤,帶...
    沈念sama閱讀 34,322評(píng)論 4 330
  • 正文 年R本政府宣布皆串,位于F島的核電站淹办,受9級(jí)特大地震影響,放射性物質(zhì)發(fā)生泄漏恶复。R本人自食惡果不足惜怜森,卻給世界環(huán)境...
    茶點(diǎn)故事閱讀 39,934評(píng)論 3 313
  • 文/蒙蒙 一、第九天 我趴在偏房一處隱蔽的房頂上張望谤牡。 院中可真熱鬧副硅,春花似錦、人聲如沸翅萤。這莊子的主人今日做“春日...
    開封第一講書人閱讀 30,755評(píng)論 0 21
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽套么。三九已至培己,卻和暖如春,著一層夾襖步出監(jiān)牢的瞬間胚泌,已是汗流浹背省咨。 一陣腳步聲響...
    開封第一講書人閱讀 31,987評(píng)論 1 266
  • 我被黑心中介騙來泰國打工, 沒想到剛下飛機(jī)就差點(diǎn)兒被人妖公主榨干…… 1. 我叫王不留玷室,地道東北人零蓉。 一個(gè)月前我還...
    沈念sama閱讀 46,358評(píng)論 2 360
  • 正文 我出身青樓笤受,卻偏偏與公主長得像,于是被迫代替她去往敵國和親壁公。 傳聞我的和親對(duì)象是個(gè)殘疾皇子感论,可洞房花燭夜當(dāng)晚...
    茶點(diǎn)故事閱讀 43,514評(píng)論 2 348

推薦閱讀更多精彩內(nèi)容

  • 十年就如此結(jié)束了,2016年過去之后紊册,世界如也是如往常一般運(yùn)轉(zhuǎn)吧。從03年記憶開始變得清晰快耿,每一年感覺都是切切實(shí)...
    浮動(dòng)浮士德閱讀 223評(píng)論 0 0
  • 通過Application類實(shí)現(xiàn)維護(hù)應(yīng)用全局狀態(tài)的方法 application回調(diào)函數(shù) Application對(duì)象...
    敘憶閱讀 604評(píng)論 0 0