本文檔介紹了如何配置虛擬機(jī)spark python開發(fā)環(huán)境,以及簡(jiǎn)要的開發(fā)指南膏蚓。
環(huán)境配置
環(huán)境配置請(qǐng)參考文檔:虛擬機(jī)配置Jupyter+Pyspark
交互式界面
Spark為我們提供了一個(gè)交互式界面運(yùn)行我們的代碼,在命令行下運(yùn)行pyspark
$ pyspark
在交互式界面下蓖租,pyspark會(huì)自動(dòng)幫我們創(chuàng)建SparkContext sc覆旱,和HiveContext作為sqlContext直接使用汛闸。sc可以用來(lái)讀取HDFS上的文件,sqlContext可以訪問(wèn)我們的Hive數(shù)據(jù)庫(kù)平痰。
#讀取文件sc.textFile("/path/to/file").first()
#讀取數(shù)據(jù)庫(kù)sqlContext.sql("select count(1) from tmp.your_table").show()
應(yīng)用開發(fā)
spark也支持提交一個(gè)py文件運(yùn)行汞舱,以下是一個(gè)spark python應(yīng)用基本結(jié)構(gòu)
"""SimpleApp.py"""
from pyspark import SparkContext
#if you need access hive
from pyspark import HiveContext
sc = SparkContext("YourAppName")
#if you need access hivesqlContext = HiveContext(sc)
#your code here
sc.stop()
運(yùn)行我們開發(fā)的py文件
spark-submit yourappname.py
引用第三方依賴
Spark允許我們?cè)谶\(yùn)行時(shí)添加我們需要的第三方依賴
#在創(chuàng)建sc時(shí)添加
sc = SparkContext(pyFiles=["xx1.py","xx2.py"])
#在運(yùn)行時(shí)新增
sc.addPyFile("jieba.zip")
一個(gè)完整的示例
如下是一個(gè)完整的示例,展示了如果通過(guò)Hive讀取數(shù)據(jù)宗雇,加載第三方依賴jieba分詞昂芜,然后使用ml包提供的kmeans方法對(duì)句子做了一個(gè)分類。
我們?cè)诩荷嫌腥缦卤怼?/p>
select * from tmp.pyspark_cluster_example order by id
+----+--------------------+
| id | sentence |
+----+--------------------+
| 1 | 小許喜歡胸大的妹子 |
| 2 | 小陳喜歡可愛的妹子 |
| 3 | 我喜歡腿長(zhǎng)的妹子 |
+----+--------------------+
完整代碼如下
# -*- coding: utf-8 -*-
from pyspark import SparkContext
from pyspark import HiveContext
from pyspark.ml.feature import Word2Vec
from pyspark.ml.clustering import KMeans
#創(chuàng)建sc的同時(shí)聲明第三方依賴jieba.zip赔蒲。
sc = SparkContext(appName="example", pyFiles=["jieba.zip"])
sqlContext = HiveContext(sc)
#導(dǎo)入jieba分詞
import jieba
#從Hive表讀取數(shù)據(jù)
df = sqlContext.sql("select * from tmp.pyspark_cluster_example order by id")
#使用結(jié)巴分詞對(duì)句子進(jìn)行分詞
wordsDf = df.map(lambda r:[r.id, r.sentence,jieba.lcut(r.sentence)]).toDF(["id","sentence", "words"])
#使用Word2Vec把文本變成向量
word2Vec = Word2Vec(minCount=0, inputCol="words", outputCol="features")
model = word2Vec.fit(wordsDf)
featureDf = model.transform(wordsDf)
#分類kmeans = KMeans(k=2)
model = kmeans.fit(featureDf)
resultDf = model.transform(featureDf).select("id", "sentence", "prediction")
centers = model.clusterCenters()
print(len(centers))
#查看結(jié)果
for r in resultDf.select("*",).take(6):
print(r.sentence.encode("utf-8"))
print(r.prediction)
通過(guò)如下代碼提交運(yùn)行
spark-submit --master=local cluster.py
運(yùn)行結(jié)果如下
小許喜歡胸大的妹子1小陳喜歡可愛的妹子1我喜歡腿長(zhǎng)的妹子1紅燒肥腸好好吃0紅燒排骨好好吃0糖醋排骨好好吃0
參考文檔
https://spark.apache.org/docs/1.6.0/api/python/index.html
https://spark.apache.org/docs/1.6.0/ml-features.html