Spark的優(yōu)勢在于內(nèi)存計(jì)算,然而在計(jì)算中難免會用到一些元數(shù)據(jù)或中間數(shù)據(jù)黍判,有的存在關(guān)系型數(shù)據(jù)庫中豫尽,有的存在HDFS上,有的存在HBase中顷帖,但其讀寫速度都和Spark計(jì)算的速度相差甚遠(yuǎn)美旧,而Redis基于內(nèi)存的讀寫則可以完美解決此類問題渤滞,下面介紹Spark如何與Redis交互。
在Spark計(jì)算的時(shí)候如何加載Redis中的數(shù)據(jù)榴嗅,其實(shí)官方有現(xiàn)成的包和文檔妄呕,文檔是全英文,好在東西不多嗽测,下面介紹如何使用趴腋。
首先把jar包引入工程,在maven上居然找不到這個(gè)包论咏。优炬。。所以使用Maven和SBT的同學(xué)自行解決厅贪。下載地址(打不開的同學(xué)可以嘗試翻墻)
2018-11-06更新: 最新maven依賴已經(jīng)被中央倉庫收錄([https://github.com/RedisLabs/spark-redis](https://github.com/RedisLabs/spark-redis)
)
<dependency>
<groupId>RedisLabs</groupId>
<artifactId>spark-redis</artifactId>
<version>0.3.2</version>
</dependency>
可以看出提供的功能還是挺全面的刁卜,有單獨(dú)的redis分區(qū)伊佃,redisRDD,SQLAPI以及StreamingAPI
下面我們一點(diǎn)一點(diǎn)來做一個(gè)示例:
在這里先看看官方包中的一部分源碼:
/**
官方提供源碼包中解析Redis配置需要的字段
*/
case class RedisEndpoint(val host: String = Protocol.DEFAULT_HOST,
val port: Int = Protocol.DEFAULT_PORT,
val auth: String = null,
val dbNum: Int = Protocol.DEFAULT_DATABASE,
val timeout: Int = Protocol.DEFAULT_TIMEOUT)
extends Serializable {
/**
源碼中獲取配置的字段名及來源,可以看出是從SparkConf中讀取到相應(yīng)字段涌献,所以連接redis只需在SparkConf中set相應(yīng)字段即可
*/
def this(conf: SparkConf) {
this(
conf.get("redis.host", Protocol.DEFAULT_HOST),
conf.getInt("redis.port", Protocol.DEFAULT_PORT),
conf.get("redis.auth", null),
conf.getInt("redis.db", Protocol.DEFAULT_DATABASE),
conf.getInt("redis.timeout", Protocol.DEFAULT_TIMEOUT)
)
}
···
}
現(xiàn)在我們啟動(dòng)SparkContext
先引入Redis相關(guān)的隱式轉(zhuǎn)換
import com.redislabs.provider.redis._
//這里直接使用yarn-cluster模式
val conf = new SparkConf().setMaster("yarn-cluster").setAppName("sparkRedisTest")
conf.set("redis.host", "10.1.11.70") //host,隨便一個(gè)節(jié)點(diǎn),自動(dòng)發(fā)現(xiàn)
conf.set("redis.port", "6379") //端口號鳞溉,不填默認(rèn)為6379
//conf.set("redis.auth","null") //用戶權(quán)限配置
//conf.set("redis.db","0") //數(shù)據(jù)庫設(shè)置
//conf.set("redis.timeout","2000") //設(shè)置連接超時(shí)時(shí)間
val sc = new SparkContext(conf)
之后可以看到IDEA給出的提示弓柱,sc通過導(dǎo)入的隱式轉(zhuǎn)換可以調(diào)出的讀取Redis的方法,都是以fromRedis
開頭的悄谐,都是redis可以存儲的數(shù)據(jù)結(jié)構(gòu)介评,這里以常見的KV進(jìn)行示例
還是先扒一下源碼看看:
def fromRedisKV[T](keysOrKeyPattern: T,
partitionNum: Int = 3)
(implicit redisConfig: RedisConfig = new RedisConfig(new RedisEndpoint(sc.getConf))):
RDD[(String, String)] = {
keysOrKeyPattern match {
case keyPattern: String => fromRedisKeyPattern(keyPattern, partitionNum)(redisConfig).getKV
case keys: Array[String] => fromRedisKeys(keys, partitionNum)(redisConfig).getKV
case _ => throw new scala.Exception("KeysOrKeyPattern should be String or Array[String]")
}
}
先看傳入的參數(shù):
- 泛型類型
keysOrKeyPattern
從的模式匹配代碼中可以看出,這里的T
可是是兩種類型爬舰,一個(gè)是String
们陆,另一個(gè)是Array[String]
,如果傳入其他類型則會拋出運(yùn)行時(shí)異常,其中String
類型的意思是匹配鍵情屹,這里可以用通配符比如foo*
坪仇,所以返回值是一個(gè)結(jié)果集RDD[(String, String)]
,當(dāng)參數(shù)類型為Array[String]
時(shí)是指傳入key的數(shù)組垃你,返回的結(jié)果則為相應(yīng)的的結(jié)果集椅文,RDD的內(nèi)容類型也是KV形式。 -
Int
類型partitionNum
生成RDD的分區(qū)數(shù)惜颇,默認(rèn)為3
皆刺,如果傳入的第一個(gè)參數(shù)類型是Array[String]
,這個(gè)參數(shù)可以這樣設(shè)置官还,先預(yù)估一下返回結(jié)果集的大小芹橡,使用keyArr.length / num + 1
,這樣則保證分區(qū)的合理性望伦,以防發(fā)生數(shù)據(jù)傾斜林说。若第一個(gè)參數(shù)類型為String
煎殷,能預(yù)估盡量預(yù)估,如果實(shí)在沒辦法腿箩,比如確實(shí)在這里發(fā)生了數(shù)據(jù)傾斜豪直,可以嘗試考慮使用sc.fromRedisKeys()
返回key
的集合,提前把握返回結(jié)果集的大小珠移,或者根據(jù)集群機(jī)器數(shù)量弓乙,把握分區(qū)數(shù)。 - 柯里化形式隱式參數(shù)
redisConfig
由于我們之前在sparkConf
里面set了相應(yīng)的參數(shù)钧惧,這里不傳入這個(gè)參數(shù)即可暇韧。如要調(diào)整,則可以按照源碼中的方式傳入浓瞪,其中RedisEndpoint
是一個(gè)case class
類懈玻,而且很多參數(shù)都有默認(rèn)值(比如6379
的端口號),所以自己建立一個(gè)RedisEndpoint
也是非常方便的乾颁。
了解了參數(shù)之后來繼續(xù)完成測試代碼:
/*這里標(biāo)出了resultSet的類型*/
val resultSet:RDD[(String, String)] = sc.fromRedisKV("to*")
//找出鍵以`to`開頭的鍵值對涂乌,這里就不進(jìn)行計(jì)算了,直接保存到HDFS看結(jié)果如何,同時(shí)合并分區(qū)便于觀察結(jié)果
resultSet.coalesce(1).saveAsTextFile("HDFSpath")
現(xiàn)在往redis里面隨便set幾個(gè)數(shù)據(jù)
打包之后運(yùn)行英岭,命令為:
spark-submit --master yarn --deploy-mode cluster --class test.SparkRedis --jars jedis-2.9.0.jar,spark-redis-0.3.2.jar,/opt/cloudera/parcels/CDH/jars/commons-pool2-2.2.jar --driver-class-path jedis-2.9.0.jar,spark-redis-0.3.2.jar,/opt/cloudera/parcels/CDH/jars/commons-pool2-2.2.jar spark-redis.jar
命令中指明了依賴的資源包:jedis-2.9.0.jar,spark-redis-0.3.2.jar,commons-pool2-2.2.jar
其中commons-pool2-2.2.jar
是spark-redis依賴的包湾盒,如果集群環(huán)境為CDH發(fā)行版,可在/opt/cloudera/parcels/CDH/jars/commons-pool2-2.2.jar
下找到該包诅妹,罚勾,而且yarn的運(yùn)行環(huán)境里面沒有默認(rèn)引入該包;如果為自建環(huán)境漾唉,則需要自行下載該包荧库,Maven上搜索commons-pool2
即可。
等待執(zhí)行成功
再看一下DAG圖
運(yùn)行結(jié)果
和預(yù)期的一樣赵刑,以to
開頭的數(shù)據(jù)都被找到。
如果傳入的是key數(shù)組
val keys = Array[String]("high", "abc", "together")
sc.fromRedisKV(keys).coalesce(1).saveAsTextFile("hdfs://nameservice1/spark/test/redisResult2")
結(jié)果如下:
完整代碼:
import org.apache.spark.{SparkConf, SparkContext}
import com.redislabs.provider.redis._
object SparkRedis extends App {
val conf = new SparkConf().setMaster("yarn-cluster").setAppName("sparkRedisTest")
conf.set("redis.host", "10.1.11.70")
val sc = new SparkContext(conf)
val keys = Array[String]("high", "abc", "together")
sc.fromRedisKV(keys).coalesce(1).saveAsTextFile("hdfs://nameservice1/spark/test/redisResult2")
}
下面看如何寫入Redis
還是以常見的KV為例
源碼中是這樣處理的场刑,接收兩個(gè)參數(shù)般此,RDD類型為
RDD[(String, String)
,第二個(gè)為失效時(shí)間
def toRedisKV(kvs: RDD[(String, String)], ttl: Int = 0)
(implicit redisConfig: RedisConfig = new RedisConfig(new RedisEndpoint(sc.getConf))) {
kvs.foreachPartition(partition => setKVs(partition, ttl, redisConfig))
}
測試代碼為:
val data = Seq[(String,String)](("high","111"), ("abc","222"), ("together","333"))
val redisData:RDD[(String,String)] = sc.parallelize(data)
sc.toRedisKV(redisData)
先清空一下redis
打包后按相同的命令提交到集群并執(zhí)行成功后即可看到數(shù)據(jù)
完整代碼:
import org.apache.spark.{SparkConf, SparkContext}
import com.redislabs.provider.redis._
import org.apache.spark.rdd.RDD
object SparkRedis extends App {
val conf = new SparkConf().setMaster("yarn-cluster").setAppName("sparkRedisTest")
conf.set("redis.host", "10.1.11.70")
val sc = new SparkContext(conf)
val data = Seq[(String,String)](("high","111"), ("abc","222"), ("together","333"))
val redisData:RDD[(String,String)] = sc.parallelize(data)
sc.toRedisKV(redisData)
}
原創(chuàng)文章@貪戀清晨de陽光