前期準(zhǔn)備:
cassandra集群(可以參考網(wǎng)站 https://cassandrazh.github.io/)
spark集群(可以參考我的文章 http://www.reibang.com/p/756209fa7078)
1房官、spark中配置cassandar相應(yīng)的jar包
不配置會(huì)報(bào)如下異常:ClassNotFoundException:com.datastax.spark.connector.rdd.partitioner.CassandraPartition
怎么配置:
?$SPARK_HOME/bin/spark-shell --packages datastax:spark-cassandra-connector:1.6.0-s_2.10
1.6.0指的是spark的版本蜜猾,s_2.10是scala的版本蝴蜓。
具體版本對(duì)應(yīng)可以參看這個(gè)網(wǎng)站:https://spark-packages.org/package/datastax/spark-cassandra-connector
如果用spark-shell命令提交不成功典唇,可以直接在SPARK的spark-env.sh配置中加入SPARK_CLASSPATH
export SPARK_CLASSPATH=$SPARK_CLASSPATH:/opt/spark/spark-cassandra-lib/datastax_spark-cassandra-connector-1.6.0-s_2.10.jar:/opt/spark/spark-cassandra-lib/com.datastax.cassandra_cassandra-driver-core-3.0.0.jar:/opt/spark/spark-cassandra-lib/com.google.guava_guava-16.0.1.jar:/opt/spark/spark-cassandra-lib/com.twitter_jsr166e-1.1.0.jar
記得集群中的每臺(tái)spark都要加呕臂,具體版本具體下載。
2负芋、Eclipse新建一個(gè)maven項(xiàng)目
引入相關(guān)包(這里引入的spark包版本一定要對(duì)應(yīng)上集群環(huán)境安裝的spark版本算柳,否則序列化和反序列會(huì)失敗):
分別引入:spark-core翠勉,spark-sql妖啥,spark-cassandra-connector
groupId:org.apache.spark
artifactId:spark-core_2.10
version:1.6.0
groupId:org.apache.spark
artifactId:spark-sql_2.10
version:1.6.0
groupId:com.datastax.spark
artifactId:spark-cassandra-connector_2.10
version:1.6.0
如果出現(xiàn)因?yàn)間oogle?guava造成的錯(cuò)誤,可以在引用上面3個(gè)包的時(shí)候去掉google guava对碌,然后單獨(dú)引用一個(gè)版本.
版本不一樣的錯(cuò)誤異常:
Caused by: java.lang.RuntimeException: java.io.InvalidClassException: org.apache.spark.rpc.RpcEndpointRef; local class incompatible: stream classdesc serialVersionUID = -1223633663228316618, local class serialVersionUID = 18257903091306170
3荆虱、新建一個(gè)測(cè)試類(lèi)SparkCassandraConnector(基于spark1.6,注意spark2.x開(kāi)始不一樣,后文有提到)
public class SparkCassandraConnector {
????private static final String APP_NAME = "SPARK_CASSANDRA_TESTI"; //這里隨便寫(xiě)
????private static final String MASTER = "spark://172.16.101.60:7077"; //spark集群地址
????private static final String HOST = "172.16.101.60"; //cassandra安裝主機(jī)地址
? ? private static SparkConf conf = null;
????private static SparkContext sparkContext = null;
????static {
????????conf = new SparkConf();
????????conf.setAppName(APP_NAME);
????????conf.setMaster(MASTER);
????????conf.set("spark.cassandra.connection.host", HOST);
????????sparkContext = new SparkContext(conf);? // 初始化一個(gè)sparkContext
? ? ? ? /* 這里要注意幾個(gè)地方:1怀读、1.6用的是sparkcontext,2.x開(kāi)始用sparksession诉位。
? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ?2、spark集群中每次只能有一個(gè)sparkcontext,當(dāng)然可以通過(guò)配置修改
? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ?3菜枷、基于以上苍糠,所以這里初始化一個(gè)sparkcontext,并且不關(guān)閉它(缺點(diǎn)也顯而易見(jiàn))
? ? ? ? ? ?*/
? ? }
????public static void main(String[] args) {
????????CassandraSQLContext csc = new CassandraSQLContext(sparkContext);
????????csc.setKeyspace("bi"); // 設(shè)置cassandra的keyspace 啤誊,keyspace有點(diǎn)類(lèi)似數(shù)據(jù)庫(kù)的意思
????????csc.sql("select * from test").show(); // 這里可以寫(xiě)任何spark支持的sql語(yǔ)句
? ?}
}
spark2開(kāi)始spark開(kāi)始使用spark session,而且datasetx將spark-cassandra-connector中的CassandraSQLContext類(lèi)移除了:
先得到spark session:
SparkSession spark = SparkSession.builder().appName(APP_NAME).config("spark.cassandra.connection.host",HOST).master(MASTER).getOrCreate();
然后將spark的配置文件傳遞給CassandraConnector:
CassandraConnector connector = CassandraConnector.apply(spark.sparkContext().conf());
這里可以直接通過(guò)CassandraConnector這個(gè)對(duì)象去對(duì)cassandra建表岳瞭,刪除表:
Sessionsession=connector.openSession();
session.execute("CREATE TABLE mykeyspace.people(id UUID PRIMARY KEY, username TEXT, email TEXT)");
通過(guò)spark得到spark的dataset:
Dataset dataset = spark.read().format("org.apache.spark.sql.cassandra").options(new HashMap() {
{
put("keyspace", "bi"); // cassandra keyspace
put("table", "people"); // cassandra表名
}
}).load();
Dataset dataset2 = spark.read().format("org.apache.spark.sql.cassandra").options(new HashMap() {
{
put("keyspace", "bi");
put("table", "people2");
}
}).load();
上面構(gòu)造了兩個(gè)dataset,分別是dataset和dataset2,我假設(shè)這2個(gè)dataset表結(jié)構(gòu)一樣蚊锹,現(xiàn)在對(duì)他做左連接操作:
dataset.createOrReplaceTempView("usertable");
dataset2.createOrReplaceTempView("usertable2");
Dataset dataset3 = spark.sql("select * from usertable left join usertable2 on usertable.id = usertable2.id");
dataset3.show();
不用spark集群瞳筏,用local模式:
spark是可以不用安裝集群,用local模式來(lái)寫(xiě)出demo的牡昆。
只要將上面的MASTER參數(shù)改為local就行了姚炕。
但是spark2改為local模式的時(shí)候要注意,要在配置中設(shè)置:config("spark.sql.warehouse.dir", "file:///D://333")丢烘;