Spark優(yōu)雅的操作Redis

Spark的優(yōu)勢在于內(nèi)存計(jì)算,然而在計(jì)算中難免會用到一些元數(shù)據(jù)或中間數(shù)據(jù)黍判,有的存在關(guān)系型數(shù)據(jù)庫中豫尽,有的存在HDFS上,有的存在HBase中顷帖,但其讀寫速度都和Spark計(jì)算的速度相差甚遠(yuǎn)美旧,而Redis基于內(nèi)存的讀寫則可以完美解決此類問題渤滞,下面介紹Spark如何與Redis交互。
在Spark計(jì)算的時(shí)候如何加載Redis中的數(shù)據(jù)榴嗅,其實(shí)官方有現(xiàn)成的包和文檔妄呕,文檔是全英文,好在東西不多嗽测,下面介紹如何使用趴腋。

首先把jar包引入工程,在maven上居然找不到這個(gè)包论咏。优炬。。所以使用Maven和SBT的同學(xué)自行解決厅贪。下載地址(打不開的同學(xué)可以嘗試翻墻)

2018-11-06更新: 最新maven依賴已經(jīng)被中央倉庫收錄([https://github.com/RedisLabs/spark-redis](https://github.com/RedisLabs/spark-redis)
)
<dependency>
    <groupId>RedisLabs</groupId>
    <artifactId>spark-redis</artifactId>
    <version>0.3.2</version>
</dependency>

可以看出提供的功能還是挺全面的刁卜,有單獨(dú)的redis分區(qū)伊佃,redisRDD,SQLAPI以及StreamingAPI

下面我們一點(diǎn)一點(diǎn)來做一個(gè)示例:

在這里先看看官方包中的一部分源碼:

/**
    官方提供源碼包中解析Redis配置需要的字段
*/
case class RedisEndpoint(val host: String = Protocol.DEFAULT_HOST,
                         val port: Int = Protocol.DEFAULT_PORT,
                         val auth: String = null,
                         val dbNum: Int = Protocol.DEFAULT_DATABASE,
                         val timeout: Int = Protocol.DEFAULT_TIMEOUT)
  extends Serializable {

/**
    源碼中獲取配置的字段名及來源,可以看出是從SparkConf中讀取到相應(yīng)字段涌献,所以連接redis只需在SparkConf中set相應(yīng)字段即可
*/
  def this(conf: SparkConf) {
      this(
        conf.get("redis.host", Protocol.DEFAULT_HOST),
        conf.getInt("redis.port", Protocol.DEFAULT_PORT),
        conf.get("redis.auth", null),
        conf.getInt("redis.db", Protocol.DEFAULT_DATABASE),
        conf.getInt("redis.timeout", Protocol.DEFAULT_TIMEOUT)
      )
  }

  ···
}

現(xiàn)在我們啟動(dòng)SparkContext

先引入Redis相關(guān)的隱式轉(zhuǎn)換
import com.redislabs.provider.redis._

//這里直接使用yarn-cluster模式
val conf = new SparkConf().setMaster("yarn-cluster").setAppName("sparkRedisTest")
conf.set("redis.host", "10.1.11.70")    //host,隨便一個(gè)節(jié)點(diǎn),自動(dòng)發(fā)現(xiàn)
conf.set("redis.port", "6379")  //端口號鳞溉,不填默認(rèn)為6379
//conf.set("redis.auth","null")  //用戶權(quán)限配置
//conf.set("redis.db","0")  //數(shù)據(jù)庫設(shè)置
//conf.set("redis.timeout","2000")  //設(shè)置連接超時(shí)時(shí)間
val sc = new SparkContext(conf)

之后可以看到IDEA給出的提示弓柱,sc通過導(dǎo)入的隱式轉(zhuǎn)換可以調(diào)出的讀取Redis的方法,都是以fromRedis開頭的悄谐,都是redis可以存儲的數(shù)據(jù)結(jié)構(gòu)介评,這里以常見的KV進(jìn)行示例


還是先扒一下源碼看看:

def fromRedisKV[T](keysOrKeyPattern: T,
                     partitionNum: Int = 3)
                    (implicit redisConfig: RedisConfig = new RedisConfig(new RedisEndpoint(sc.getConf))):
  RDD[(String, String)] = {
    keysOrKeyPattern match {
      case keyPattern: String => fromRedisKeyPattern(keyPattern, partitionNum)(redisConfig).getKV
      case keys: Array[String] => fromRedisKeys(keys, partitionNum)(redisConfig).getKV
      case _ => throw new scala.Exception("KeysOrKeyPattern should be String or Array[String]")
    }
  }

先看傳入的參數(shù):

  1. 泛型類型keysOrKeyPattern
    從的模式匹配代碼中可以看出,這里的T可是是兩種類型爬舰,一個(gè)是String们陆,另一個(gè)是Array[String],如果傳入其他類型則會拋出運(yùn)行時(shí)異常,其中String類型的意思是匹配鍵情屹,這里可以用通配符比如foo*坪仇,所以返回值是一個(gè)結(jié)果集RDD[(String, String)],當(dāng)參數(shù)類型為Array[String]時(shí)是指傳入key的數(shù)組垃你,返回的結(jié)果則為相應(yīng)的的結(jié)果集椅文,RDD的內(nèi)容類型也是KV形式。
  2. Int類型partitionNum
    生成RDD的分區(qū)數(shù)惜颇,默認(rèn)為3皆刺,如果傳入的第一個(gè)參數(shù)類型是Array[String],這個(gè)參數(shù)可以這樣設(shè)置官还,先預(yù)估一下返回結(jié)果集的大小芹橡,使用keyArr.length / num + 1,這樣則保證分區(qū)的合理性望伦,以防發(fā)生數(shù)據(jù)傾斜林说。若第一個(gè)參數(shù)類型為String煎殷,能預(yù)估盡量預(yù)估,如果實(shí)在沒辦法腿箩,比如確實(shí)在這里發(fā)生了數(shù)據(jù)傾斜豪直,可以嘗試考慮使用sc.fromRedisKeys()返回key的集合,提前把握返回結(jié)果集的大小珠移,或者根據(jù)集群機(jī)器數(shù)量弓乙,把握分區(qū)數(shù)。
  3. 柯里化形式隱式參數(shù)redisConfig
    由于我們之前在sparkConf里面set了相應(yīng)的參數(shù)钧惧,這里不傳入這個(gè)參數(shù)即可暇韧。如要調(diào)整,則可以按照源碼中的方式傳入浓瞪,其中RedisEndpoint是一個(gè)case class類懈玻,而且很多參數(shù)都有默認(rèn)值(比如6379的端口號),所以自己建立一個(gè)RedisEndpoint也是非常方便的乾颁。
    了解了參數(shù)之后來繼續(xù)完成測試代碼:
/*這里標(biāo)出了resultSet的類型*/
val resultSet:RDD[(String, String)] = sc.fromRedisKV("to*")
//找出鍵以`to`開頭的鍵值對涂乌,這里就不進(jìn)行計(jì)算了,直接保存到HDFS看結(jié)果如何,同時(shí)合并分區(qū)便于觀察結(jié)果
resultSet.coalesce(1).saveAsTextFile("HDFSpath")

現(xiàn)在往redis里面隨便set幾個(gè)數(shù)據(jù)

Redis shell

打包之后運(yùn)行英岭,命令為:

spark-submit --master yarn --deploy-mode cluster --class test.SparkRedis --jars jedis-2.9.0.jar,spark-redis-0.3.2.jar,/opt/cloudera/parcels/CDH/jars/commons-pool2-2.2.jar --driver-class-path jedis-2.9.0.jar,spark-redis-0.3.2.jar,/opt/cloudera/parcels/CDH/jars/commons-pool2-2.2.jar spark-redis.jar

命令中指明了依賴的資源包:jedis-2.9.0.jar,spark-redis-0.3.2.jar,commons-pool2-2.2.jar其中commons-pool2-2.2.jar是spark-redis依賴的包湾盒,如果集群環(huán)境為CDH發(fā)行版,可在/opt/cloudera/parcels/CDH/jars/commons-pool2-2.2.jar下找到該包诅妹,罚勾,而且yarn的運(yùn)行環(huán)境里面沒有默認(rèn)引入該包;如果為自建環(huán)境漾唉,則需要自行下載該包荧库,Maven上搜索commons-pool2即可。

等待執(zhí)行成功


執(zhí)行UI

再看一下DAG圖

DAG圖

運(yùn)行結(jié)果


運(yùn)行結(jié)果

和預(yù)期的一樣赵刑,以to開頭的數(shù)據(jù)都被找到。

如果傳入的是key數(shù)組

val keys = Array[String]("high", "abc", "together")
sc.fromRedisKV(keys).coalesce(1).saveAsTextFile("hdfs://nameservice1/spark/test/redisResult2")

結(jié)果如下:

運(yùn)行結(jié)果

完整代碼:

import org.apache.spark.{SparkConf, SparkContext}
import com.redislabs.provider.redis._

object SparkRedis extends App {
  val conf = new SparkConf().setMaster("yarn-cluster").setAppName("sparkRedisTest")
  conf.set("redis.host", "10.1.11.70")
  val sc = new SparkContext(conf)
  val keys = Array[String]("high", "abc", "together")
  sc.fromRedisKV(keys).coalesce(1).saveAsTextFile("hdfs://nameservice1/spark/test/redisResult2")
}

下面看如何寫入Redis


還是以常見的KV為例
源碼中是這樣處理的场刑,接收兩個(gè)參數(shù)般此,RDD類型為RDD[(String, String),第二個(gè)為失效時(shí)間

def toRedisKV(kvs: RDD[(String, String)], ttl: Int = 0)
               (implicit redisConfig: RedisConfig = new RedisConfig(new RedisEndpoint(sc.getConf))) {
    kvs.foreachPartition(partition => setKVs(partition, ttl, redisConfig))
  }

測試代碼為:

val data = Seq[(String,String)](("high","111"), ("abc","222"), ("together","333"))
val redisData:RDD[(String,String)] = sc.parallelize(data)
sc.toRedisKV(redisData)

先清空一下redis


redis shell

打包后按相同的命令提交到集群并執(zhí)行成功后即可看到數(shù)據(jù)

redis shell

完整代碼:

import org.apache.spark.{SparkConf, SparkContext}
import com.redislabs.provider.redis._
import org.apache.spark.rdd.RDD

object SparkRedis extends App {
  val conf = new SparkConf().setMaster("yarn-cluster").setAppName("sparkRedisTest")
  conf.set("redis.host", "10.1.11.70")
  val sc = new SparkContext(conf)
  val data = Seq[(String,String)](("high","111"), ("abc","222"), ("together","333"))
  val redisData:RDD[(String,String)] = sc.parallelize(data)
  sc.toRedisKV(redisData)
}

原創(chuàng)文章@貪戀清晨de陽光

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
  • 序言:七十年代末牵现,一起剝皮案震驚了整個(gè)濱河市铐懊,隨后出現(xiàn)的幾起案子,更是在濱河造成了極大的恐慌瞎疼,老刑警劉巖科乎,帶你破解...
    沈念sama閱讀 216,692評論 6 501
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件,死亡現(xiàn)場離奇詭異贼急,居然都是意外死亡茅茂,警方通過查閱死者的電腦和手機(jī)捏萍,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 92,482評論 3 392
  • 文/潘曉璐 我一進(jìn)店門,熙熙樓的掌柜王于貴愁眉苦臉地迎上來空闲,“玉大人令杈,你說我怎么就攤上這事〔昵悖” “怎么了逗噩?”我有些...
    開封第一講書人閱讀 162,995評論 0 353
  • 文/不壞的土叔 我叫張陵,是天一觀的道長跌榔。 經(jīng)常有香客問我异雁,道長,這世上最難降的妖魔是什么僧须? 我笑而不...
    開封第一講書人閱讀 58,223評論 1 292
  • 正文 為了忘掉前任纲刀,我火速辦了婚禮,結(jié)果婚禮上皆辽,老公的妹妹穿的比我還像新娘柑蛇。我一直安慰自己,他們只是感情好驱闷,可當(dāng)我...
    茶點(diǎn)故事閱讀 67,245評論 6 388
  • 文/花漫 我一把揭開白布耻台。 她就那樣靜靜地躺著,像睡著了一般空另。 火紅的嫁衣襯著肌膚如雪盆耽。 梳的紋絲不亂的頭發(fā)上,一...
    開封第一講書人閱讀 51,208評論 1 299
  • 那天扼菠,我揣著相機(jī)與錄音摄杂,去河邊找鬼。 笑死循榆,一個(gè)胖子當(dāng)著我的面吹牛析恢,可吹牛的內(nèi)容都是我干的。 我是一名探鬼主播秧饮,決...
    沈念sama閱讀 40,091評論 3 418
  • 文/蒼蘭香墨 我猛地睜開眼映挂,長吁一口氣:“原來是場噩夢啊……” “哼!你這毒婦竟也來了盗尸?” 一聲冷哼從身側(cè)響起柑船,我...
    開封第一講書人閱讀 38,929評論 0 274
  • 序言:老撾萬榮一對情侶失蹤,失蹤者是張志新(化名)和其女友劉穎泼各,沒想到半個(gè)月后鞍时,有當(dāng)?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體,經(jīng)...
    沈念sama閱讀 45,346評論 1 311
  • 正文 獨(dú)居荒郊野嶺守林人離奇死亡,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點(diǎn)故事閱讀 37,570評論 2 333
  • 正文 我和宋清朗相戀三年逆巍,在試婚紗的時(shí)候發(fā)現(xiàn)自己被綠了及塘。 大學(xué)時(shí)的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片。...
    茶點(diǎn)故事閱讀 39,739評論 1 348
  • 序言:一個(gè)原本活蹦亂跳的男人離奇死亡蒸苇,死狀恐怖磷蛹,靈堂內(nèi)的尸體忽然破棺而出,到底是詐尸還是另有隱情溪烤,我是刑警寧澤味咳,帶...
    沈念sama閱讀 35,437評論 5 344
  • 正文 年R本政府宣布,位于F島的核電站檬嘀,受9級特大地震影響槽驶,放射性物質(zhì)發(fā)生泄漏。R本人自食惡果不足惜鸳兽,卻給世界環(huán)境...
    茶點(diǎn)故事閱讀 41,037評論 3 326
  • 文/蒙蒙 一掂铐、第九天 我趴在偏房一處隱蔽的房頂上張望。 院中可真熱鬧揍异,春花似錦全陨、人聲如沸。這莊子的主人今日做“春日...
    開封第一講書人閱讀 31,677評論 0 22
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽。三九已至戚嗅,卻和暖如春雨涛,著一層夾襖步出監(jiān)牢的瞬間,已是汗流浹背懦胞。 一陣腳步聲響...
    開封第一講書人閱讀 32,833評論 1 269
  • 我被黑心中介騙來泰國打工替久, 沒想到剛下飛機(jī)就差點(diǎn)兒被人妖公主榨干…… 1. 我叫王不留,地道東北人躏尉。 一個(gè)月前我還...
    沈念sama閱讀 47,760評論 2 369
  • 正文 我出身青樓蚯根,卻偏偏與公主長得像,于是被迫代替她去往敵國和親胀糜。 傳聞我的和親對象是個(gè)殘疾皇子稼锅,可洞房花燭夜當(dāng)晚...
    茶點(diǎn)故事閱讀 44,647評論 2 354

推薦閱讀更多精彩內(nèi)容