java spark sql操作cassandar

前期準(zhǔn)備:

cassandra集群(可以參考網(wǎng)站 https://cassandrazh.github.io/

spark集群(可以參考我的文章 http://www.reibang.com/p/756209fa7078


1房官、spark中配置cassandar相應(yīng)的jar包

不配置會(huì)報(bào)如下異常:ClassNotFoundException:com.datastax.spark.connector.rdd.partitioner.CassandraPartition

怎么配置:

?$SPARK_HOME/bin/spark-shell --packages datastax:spark-cassandra-connector:1.6.0-s_2.10

1.6.0指的是spark的版本蜜猾,s_2.10是scala的版本蝴蜓。

具體版本對(duì)應(yīng)可以參看這個(gè)網(wǎng)站:https://spark-packages.org/package/datastax/spark-cassandra-connector

如果用spark-shell命令提交不成功典唇,可以直接在SPARK的spark-env.sh配置中加入SPARK_CLASSPATH

export SPARK_CLASSPATH=$SPARK_CLASSPATH:/opt/spark/spark-cassandra-lib/datastax_spark-cassandra-connector-1.6.0-s_2.10.jar:/opt/spark/spark-cassandra-lib/com.datastax.cassandra_cassandra-driver-core-3.0.0.jar:/opt/spark/spark-cassandra-lib/com.google.guava_guava-16.0.1.jar:/opt/spark/spark-cassandra-lib/com.twitter_jsr166e-1.1.0.jar

記得集群中的每臺(tái)spark都要加呕臂,具體版本具體下載。

2负芋、Eclipse新建一個(gè)maven項(xiàng)目

引入相關(guān)包(這里引入的spark包版本一定要對(duì)應(yīng)上集群環(huán)境安裝的spark版本算柳,否則序列化和反序列會(huì)失敗):

分別引入:spark-core翠勉,spark-sql妖啥,spark-cassandra-connector

groupId:org.apache.spark

artifactId:spark-core_2.10

version:1.6.0

groupId:org.apache.spark

artifactId:spark-sql_2.10

version:1.6.0

groupId:com.datastax.spark

artifactId:spark-cassandra-connector_2.10

version:1.6.0

如果出現(xiàn)因?yàn)間oogle?guava造成的錯(cuò)誤,可以在引用上面3個(gè)包的時(shí)候去掉google guava对碌,然后單獨(dú)引用一個(gè)版本.

版本不一樣的錯(cuò)誤異常:

Caused by: java.lang.RuntimeException: java.io.InvalidClassException: org.apache.spark.rpc.RpcEndpointRef; local class incompatible: stream classdesc serialVersionUID = -1223633663228316618, local class serialVersionUID = 18257903091306170

3荆虱、新建一個(gè)測(cè)試類(lèi)SparkCassandraConnector(基于spark1.6,注意spark2.x開(kāi)始不一樣,后文有提到

public class SparkCassandraConnector {

????private static final String APP_NAME = "SPARK_CASSANDRA_TESTI"; //這里隨便寫(xiě)

????private static final String MASTER = "spark://172.16.101.60:7077"; //spark集群地址

????private static final String HOST = "172.16.101.60"; //cassandra安裝主機(jī)地址

? ? private static SparkConf conf = null;

????private static SparkContext sparkContext = null;

????static {

????????conf = new SparkConf();

????????conf.setAppName(APP_NAME);

????????conf.setMaster(MASTER);

????????conf.set("spark.cassandra.connection.host", HOST);

????????sparkContext = new SparkContext(conf);? // 初始化一個(gè)sparkContext

? ? ? ? /* 這里要注意幾個(gè)地方:1怀读、1.6用的是sparkcontext,2.x開(kāi)始用sparksession诉位。

? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ?2、spark集群中每次只能有一個(gè)sparkcontext,當(dāng)然可以通過(guò)配置修改

? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ?3菜枷、基于以上苍糠,所以這里初始化一個(gè)sparkcontext,并且不關(guān)閉它(缺點(diǎn)也顯而易見(jiàn))

? ? ? ? ? ?*/

? ? }

????public static void main(String[] args) {

????????CassandraSQLContext csc = new CassandraSQLContext(sparkContext);

????????csc.setKeyspace("bi"); // 設(shè)置cassandra的keyspace 啤誊,keyspace有點(diǎn)類(lèi)似數(shù)據(jù)庫(kù)的意思

????????csc.sql("select * from test").show(); // 這里可以寫(xiě)任何spark支持的sql語(yǔ)句

? ?}

}


spark2開(kāi)始spark開(kāi)始使用spark session,而且datasetx將spark-cassandra-connector中的CassandraSQLContext類(lèi)移除了:

先得到spark session:

SparkSession spark = SparkSession.builder().appName(APP_NAME).config("spark.cassandra.connection.host",HOST).master(MASTER).getOrCreate();

然后將spark的配置文件傳遞給CassandraConnector:

CassandraConnector connector = CassandraConnector.apply(spark.sparkContext().conf());

這里可以直接通過(guò)CassandraConnector這個(gè)對(duì)象去對(duì)cassandra建表岳瞭,刪除表:

Sessionsession=connector.openSession();

session.execute("CREATE TABLE mykeyspace.people(id UUID PRIMARY KEY, username TEXT, email TEXT)");

通過(guò)spark得到spark的dataset:

Dataset dataset = spark.read().format("org.apache.spark.sql.cassandra").options(new HashMap() {

{

put("keyspace", "bi"); // cassandra keyspace

put("table", "people"); // cassandra表名

}

}).load();

Dataset dataset2 = spark.read().format("org.apache.spark.sql.cassandra").options(new HashMap() {

{

put("keyspace", "bi");

put("table", "people2");

}

}).load();

上面構(gòu)造了兩個(gè)dataset,分別是dataset和dataset2,我假設(shè)這2個(gè)dataset表結(jié)構(gòu)一樣蚊锹,現(xiàn)在對(duì)他做左連接操作:

dataset.createOrReplaceTempView("usertable");

dataset2.createOrReplaceTempView("usertable2");

Dataset dataset3 = spark.sql("select * from usertable left join usertable2 on usertable.id = usertable2.id");

dataset3.show();


不用spark集群瞳筏,用local模式:

spark是可以不用安裝集群,用local模式來(lái)寫(xiě)出demo的牡昆。

只要將上面的MASTER參數(shù)改為local就行了姚炕。

但是spark2改為local模式的時(shí)候要注意,要在配置中設(shè)置:config("spark.sql.warehouse.dir", "file:///D://333")丢烘;

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
  • 序言:七十年代末柱宦,一起剝皮案震驚了整個(gè)濱河市,隨后出現(xiàn)的幾起案子播瞳,更是在濱河造成了極大的恐慌掸刊,老刑警劉巖,帶你破解...
    沈念sama閱讀 216,544評(píng)論 6 501
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件狐史,死亡現(xiàn)場(chǎng)離奇詭異痒给,居然都是意外死亡,警方通過(guò)查閱死者的電腦和手機(jī)骏全,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 92,430評(píng)論 3 392
  • 文/潘曉璐 我一進(jìn)店門(mén),熙熙樓的掌柜王于貴愁眉苦臉地迎上來(lái)尼斧,“玉大人姜贡,你說(shuō)我怎么就攤上這事」卓茫” “怎么了楼咳?”我有些...
    開(kāi)封第一講書(shū)人閱讀 162,764評(píng)論 0 353
  • 文/不壞的土叔 我叫張陵,是天一觀的道長(zhǎng)烛恤。 經(jīng)常有香客問(wèn)我母怜,道長(zhǎng),這世上最難降的妖魔是什么缚柏? 我笑而不...
    開(kāi)封第一講書(shū)人閱讀 58,193評(píng)論 1 292
  • 正文 為了忘掉前任苹熏,我火速辦了婚禮,結(jié)果婚禮上,老公的妹妹穿的比我還像新娘轨域。我一直安慰自己袱耽,他們只是感情好,可當(dāng)我...
    茶點(diǎn)故事閱讀 67,216評(píng)論 6 388
  • 文/花漫 我一把揭開(kāi)白布干发。 她就那樣靜靜地躺著朱巨,像睡著了一般。 火紅的嫁衣襯著肌膚如雪枉长。 梳的紋絲不亂的頭發(fā)上冀续,一...
    開(kāi)封第一講書(shū)人閱讀 51,182評(píng)論 1 299
  • 那天,我揣著相機(jī)與錄音必峰,去河邊找鬼洪唐。 笑死,一個(gè)胖子當(dāng)著我的面吹牛自点,可吹牛的內(nèi)容都是我干的桐罕。 我是一名探鬼主播,決...
    沈念sama閱讀 40,063評(píng)論 3 418
  • 文/蒼蘭香墨 我猛地睜開(kāi)眼桂敛,長(zhǎng)吁一口氣:“原來(lái)是場(chǎng)噩夢(mèng)啊……” “哼功炮!你這毒婦竟也來(lái)了?” 一聲冷哼從身側(cè)響起术唬,我...
    開(kāi)封第一講書(shū)人閱讀 38,917評(píng)論 0 274
  • 序言:老撾萬(wàn)榮一對(duì)情侶失蹤薪伏,失蹤者是張志新(化名)和其女友劉穎,沒(méi)想到半個(gè)月后粗仓,有當(dāng)?shù)厝嗽跇?shù)林里發(fā)現(xiàn)了一具尸體嫁怀,經(jīng)...
    沈念sama閱讀 45,329評(píng)論 1 310
  • 正文 獨(dú)居荒郊野嶺守林人離奇死亡,尸身上長(zhǎng)有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點(diǎn)故事閱讀 37,543評(píng)論 2 332
  • 正文 我和宋清朗相戀三年借浊,在試婚紗的時(shí)候發(fā)現(xiàn)自己被綠了塘淑。 大學(xué)時(shí)的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片。...
    茶點(diǎn)故事閱讀 39,722評(píng)論 1 348
  • 序言:一個(gè)原本活蹦亂跳的男人離奇死亡蚂斤,死狀恐怖存捺,靈堂內(nèi)的尸體忽然破棺而出,到底是詐尸還是另有隱情曙蒸,我是刑警寧澤捌治,帶...
    沈念sama閱讀 35,425評(píng)論 5 343
  • 正文 年R本政府宣布,位于F島的核電站纽窟,受9級(jí)特大地震影響肖油,放射性物質(zhì)發(fā)生泄漏。R本人自食惡果不足惜臂港,卻給世界環(huán)境...
    茶點(diǎn)故事閱讀 41,019評(píng)論 3 326
  • 文/蒙蒙 一森枪、第九天 我趴在偏房一處隱蔽的房頂上張望视搏。 院中可真熱鬧,春花似錦疲恢、人聲如沸凶朗。這莊子的主人今日做“春日...
    開(kāi)封第一講書(shū)人閱讀 31,671評(píng)論 0 22
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽(yáng)棚愤。三九已至,卻和暖如春杂数,著一層夾襖步出監(jiān)牢的瞬間没隘,已是汗流浹背捆探。 一陣腳步聲響...
    開(kāi)封第一講書(shū)人閱讀 32,825評(píng)論 1 269
  • 我被黑心中介騙來(lái)泰國(guó)打工蚓再, 沒(méi)想到剛下飛機(jī)就差點(diǎn)兒被人妖公主榨干…… 1. 我叫王不留呆盖,地道東北人。 一個(gè)月前我還...
    沈念sama閱讀 47,729評(píng)論 2 368
  • 正文 我出身青樓那伐,卻偏偏與公主長(zhǎng)得像踏施,于是被迫代替她去往敵國(guó)和親。 傳聞我的和親對(duì)象是個(gè)殘疾皇子罕邀,可洞房花燭夜當(dāng)晚...
    茶點(diǎn)故事閱讀 44,614評(píng)論 2 353

推薦閱讀更多精彩內(nèi)容