Abstract: Spark, locality-sensitive hashing (LSH), vector retrieval, recommender systems
Background
I recently needed to build related-item recommendations for millions of entities: compute offline, load the results into a database, and serve them through an API. The warehouse and compute stack are Hive and Spark, so my first instinct was the usual embedding + LSH routine (locality-sensitive hashing in Spark ML). After several test runs it failed badly, for the following reasons:
- Unstable computation: Spark's LSH frequently hangs, crawls, or OOMs. The main culprits are the very expensive join step and data skew inside buckets. Brute-force search inside a skewed bucket is often not even worth it, because the similar pairs it contains are likely to appear again in some other, non-skewed bucket.
- Data loss: calling approxSimilarityJoin silently drops entities. For example, feeding in 1,000 entities and asking for the 50 nearest neighbors of each may return a top-50 for only 200 of them. This is not caused by the radius being too small; it happens when, after hashing, an entity has no (hash_table, hash value) pair in common with anything else, so it never joins at all. It is a parameter-setting problem, and tuning LSH parameters is painful.
- No TopK output for every entity: Spark's LSH does offer approxNearestNeighbors, which outputs TopK and matches the requirement exactly, but that API cannot operate on a whole table; it only takes a single query item. So you are forced back onto the join method, then sort the joined pairs to take the TopK, which wastes a lot of compute.
- No cosine similarity: Spark's BucketedRandomProjectionLSH does not support cosine similarity. This matters less, since you can normalize first and then use Euclidean distance (a small sketch of this workaround follows right after this list), but it is still inconvenient.
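As a side note on that last point, here is a minimal sketch of the normalize-then-Euclidean workaround (my own illustration, not part of the original pipeline). For L2-normalized vectors, ||x - y||^2 = 2 - 2·cos(x, y), so ranking neighbors by Euclidean distance on normalized vectors is equivalent to ranking by cosine similarity. It assumes a spark-shell session with implicits in scope:

import org.apache.spark.ml.feature.{BucketedRandomProjectionLSH, Normalizer}
import org.apache.spark.ml.linalg.Vectors.dense

// Toy DataFrame with a raw embedding column "vec"
val raw = Seq(("a", dense(1, 0, 2)), ("b", dense(-1, 0, 5))).toDF("id", "vec")

// L2-normalize first; on unit vectors, Euclidean distance is a monotone function of cosine similarity
val normalizer = new Normalizer().setInputCol("vec").setOutputCol("norm_vec").setP(2.0)
val normDF = normalizer.transform(raw)

// Then run the usual Euclidean LSH on the normalized column
val brp = new BucketedRandomProjectionLSH()
  .setBucketLength(4)
  .setNumHashTables(10)
  .setInputCol("norm_vec")
  .setOutputCol("hashes")
val model = brp.fit(normDF)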
Enough of that. I went to GitHub and found ScANNS, a project from LinkedIn's machine learning team, and after testing it, it works remarkably well.
Overview of LSH
In short, LSH transforms high-dimensional data with locality-sensitive hash functions, mapping each vector into buckets across multiple hash tables. The defining property is that vectors which are similar in the original high-dimensional space remain similar after being hashed into the low-dimensional space, i.e. they tend to land in the same bucket, while dissimilar vectors mostly end up in different buckets. Because different similarity measures require different locality-sensitive hash families, there is no universal hash function (the hash function plays a role similar to a convolution kernel in a CNN: it is a way of extracting features; for Euclidean distance it takes the form w·x + b). The idea is therefore to use hashing as a dimensionality-reducing recall step, and then do a linear scan within each bucket to compute exact distances. For details see my other article: http://www.reibang.com/p/bbf142f8ec84
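To make the "w·x + b style hash" concrete, here is a small, self-contained sketch of one Euclidean (p-stable) random-projection hash, h(x) = floor((w·x + b) / r); the names and structure are illustrative only, not Spark's internals:

import scala.util.Random

// One Euclidean LSH hash function: h(x) = floor((w·x + b) / bucketLength).
// Vectors that are close in Euclidean distance are likely to share a bucket value.
case class EuclideanHash(w: Array[Double], b: Double, bucketLength: Double) {
  def hash(x: Array[Double]): Int = {
    val dot = w.zip(x).map { case (wi, xi) => wi * xi }.sum
    math.floor((dot + b) / bucketLength).toInt
  }
}

// w is drawn from a Gaussian, b uniformly from [0, bucketLength)
def randomHash(dim: Int, bucketLength: Double, rng: Random = new Random(42)): EuclideanHash =
  EuclideanHash(Array.fill(dim)(rng.nextGaussian()), rng.nextDouble() * bucketLength, bucketLength)

val h = randomHash(dim = 3, bucketLength = 4.0)
val Seq(ha, hc, hd) = Seq(Array(1.0, 0.0, 2.0), Array(2.0, -1.0, 2.0), Array(30.0, 30.0, -10.0)).map(h.hash)
println(s"$ha $hc $hd")  // the two nearby vectors usually collide; the distant one usually does not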
How Spark ML implements LSH
See also https://blog.csdn.net/shenyanasop/article/details/110454273. In short, Spark's LSH first transforms the Vector column of a DataFrame into bucket values for each of the N hash tables, then explodes each hash table into its own row and joins on it: two items match as long as at least one hash table value is equal. Finally it computes the pairwise distance for every matched pair and keeps only the pairs within the radius threshold. Let's work through the API to see this in practice.
First, import the packages and create a DataFrame containing vectors:
scala> import org.apache.spark.ml.linalg.Vectors.dense
import org.apache.spark.ml.linalg.Vectors.dense
scala> import org.apache.spark.ml.feature.BucketedRandomProjectionLSH
import org.apache.spark.ml.feature.BucketedRandomProjectionLSH
scala> val df = Seq(("a", dense(1, 0, 2)), ("b", dense(-1, 0, 5)), ("c", dense(2, -1, 2)), ("d", dense(3, 3, -1))).toDF("a", "vec")
df: org.apache.spark.sql.DataFrame = [a: string, vec: vector]
Next, initialize an LSH model with 10 hash tables and a bucket length of 4, writing the result to a new column called hashes, then fit and transform:
scala> :paste
// Entering paste mode (ctrl-D to finish)
val brp = new BucketedRandomProjectionLSH()
.setBucketLength(4)
.setNumHashTables(10)
.setInputCol("vec")
.setOutputCol("hashes")
// Exiting paste mode, now interpreting.
brp: org.apache.spark.ml.feature.BucketedRandomProjectionLSH = brp-lsh_ea56cf7270a2
scala> val brpModel = brp.fit(df)
brpModel: org.apache.spark.ml.feature.BucketedRandomProjectionLSHModel = brp-lsh_ea56cf7270a2
scala> val hashDF = brpModel.transform(df)
hashDF: org.apache.spark.sql.DataFrame = [a: string, vec: vector ... 1 more field]
Let's see what the hashed features look like. The new hashes column contains 10 hash tables, each recording the bucket value of the vector under one locality-sensitive hash:
scala> hashDF.show(false)
+---+--------------+------------------------------------------------------------------------------+
|a |vec |hashes |
+---+--------------+------------------------------------------------------------------------------+
|a |[1.0,0.0,2.0] |[[-1.0], [0.0], [0.0], [-1.0], [-1.0], [-1.0], [0.0], [-1.0], [0.0], [-1.0]] |
|b |[-1.0,0.0,5.0]|[[-2.0], [-1.0], [0.0], [-2.0], [-2.0], [-1.0], [-1.0], [0.0], [0.0], [-1.0]] |
|c |[2.0,-1.0,2.0]|[[-1.0], [0.0], [-1.0], [-1.0], [-1.0], [-1.0], [0.0], [-1.0], [-1.0], [-1.0]]|
|d |[3.0,3.0,-1.0]|[[0.0], [0.0], [-1.0], [0.0], [0.0], [0.0], [0.0], [-1.0], [0.0], [-1.0]] |
+---+--------------+------------------------------------------------------------------------------+
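Before calling the model's built-in join, here is a rough conceptual sketch of what approxSimilarityJoin does with this hashDF (my own illustration of the mechanism described above, not Spark's actual source): explode the hashes column into (hash-table index, bucket value) rows, equi-join on that pair, deduplicate the candidate pairs, and only then compute exact distances and filter by the radius.

import org.apache.spark.ml.linalg.Vector
import org.apache.spark.sql.functions._

// Turn each bucket value (a one-element vector here) into a plain string key for the equi-join
val bucketKey = udf((v: Vector) => v.toString)

// 1) explode: one row per (id, hash-table index, bucket key)
val exploded = hashDF
  .select($"a", posexplode($"hashes").as(Seq("table", "bucket")))
  .withColumn("bucket", bucketKey($"bucket"))

// 2) candidate pairs: ids sharing a bucket in at least one table, deduplicated
val candidates = exploded.as("l")
  .join(exploded.as("r"), Seq("table", "bucket"))
  .where($"l.a" < $"r.a")
  .select($"l.a".as("id_a"), $"r.a".as("id_b"))
  .distinct()

// 3) approxSimilarityJoin then computes the exact distance for each candidate pair
//    and keeps only the pairs within the requested radius.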
Now we apply the model's join method, joining the dataset with itself to find each element's neighbors:
scala> val brpDf = brpModel.approxSimilarityJoin(hashDF, hashDF, 2.0, "EuclideanDistance")
brpDf: org.apache.spark.sql.Dataset[_] = [datasetA: struct<a: string, vec: vector ... 1 more field>, datasetB: struct<a: string, hashes: array<vector> ... 1 more field> ... 1 more field]
Inspect the result:
scala> brpDf.show(false)
+---------------------------------------------------------------------------------------------------+---------------------------------------------------------------------------------------------------+------------------+
|datasetA |datasetB |EuclideanDistance |
+---------------------------------------------------------------------------------------------------+---------------------------------------------------------------------------------------------------+------------------+
|[c, [2.0,-1.0,2.0], [[-1.0], [0.0], [-1.0], [-1.0], [-1.0], [-1.0], [0.0], [-1.0], [-1.0], [-1.0]]]|[a, [[-1.0], [0.0], [0.0], [-1.0], [-1.0], [-1.0], [0.0], [-1.0], [0.0], [-1.0]], [1.0,0.0,2.0]] |1.4142135623730951|
|[d, [3.0,3.0,-1.0], [[0.0], [0.0], [-1.0], [0.0], [0.0], [0.0], [0.0], [-1.0], [0.0], [-1.0]]] |[d, [[0.0], [0.0], [-1.0], [0.0], [0.0], [0.0], [0.0], [-1.0], [0.0], [-1.0]], [3.0,3.0,-1.0]] |0.0 |
|[a, [1.0,0.0,2.0], [[-1.0], [0.0], [0.0], [-1.0], [-1.0], [-1.0], [0.0], [-1.0], [0.0], [-1.0]]] |[a, [[-1.0], [0.0], [0.0], [-1.0], [-1.0], [-1.0], [0.0], [-1.0], [0.0], [-1.0]], [1.0,0.0,2.0]] |0.0 |
|[b, [-1.0,0.0,5.0], [[-2.0], [-1.0], [0.0], [-2.0], [-2.0], [-1.0], [-1.0], [0.0], [0.0], [-1.0]]] |[b, [[-2.0], [-1.0], [0.0], [-2.0], [-2.0], [-1.0], [-1.0], [0.0], [0.0], [-1.0]], [-1.0,0.0,5.0]] |0.0 |
|[a, [1.0,0.0,2.0], [[-1.0], [0.0], [0.0], [-1.0], [-1.0], [-1.0], [0.0], [-1.0], [0.0], [-1.0]]] |[c, [[-1.0], [0.0], [-1.0], [-1.0], [-1.0], [-1.0], [0.0], [-1.0], [-1.0], [-1.0]], [2.0,-1.0,2.0]]|1.4142135623730951|
|[c, [2.0,-1.0,2.0], [[-1.0], [0.0], [-1.0], [-1.0], [-1.0], [-1.0], [0.0], [-1.0], [-1.0], [-1.0]]]|[c, [[-1.0], [0.0], [-1.0], [-1.0], [-1.0], [-1.0], [0.0], [-1.0], [-1.0], [-1.0]], [2.0,-1.0,2.0]]|0.0 |
+---------------------------------------------------------------------------------------------------+---------------------------------------------------------------------------------------------------+------------------+
Only a and c joined with each other; everything else only joined with itself, since the self-distance is 0 and the hash values are identical. The likely cause is that the radius threshold is too small, so let's raise it to 10.0 and show the result again:
scala> val brpDf = brpModel.approxSimilarityJoin(hashDF, hashDF, 10.0, "EuclideanDistance")
brpDf: org.apache.spark.sql.Dataset[_] = [datasetA: struct<a: string, vec: vector ... 1 more field>, datasetB: struct<a: string, hashes: array<vector> ... 1 more field> ... 1 more field]
scala> brpDf.show(false)
+---------------------------------------------------------------------------------------------------+---------------------------------------------------------------------------------------------------+------------------+
|datasetA |datasetB |EuclideanDistance |
+---------------------------------------------------------------------------------------------------+---------------------------------------------------------------------------------------------------+------------------+
|[d, [3.0,3.0,-1.0], [[0.0], [0.0], [-1.0], [0.0], [0.0], [0.0], [0.0], [-1.0], [0.0], [-1.0]]] |[a, [[-1.0], [0.0], [0.0], [-1.0], [-1.0], [-1.0], [0.0], [-1.0], [0.0], [-1.0]], [1.0,0.0,2.0]] |4.69041575982343 |
|[d, [3.0,3.0,-1.0], [[0.0], [0.0], [-1.0], [0.0], [0.0], [0.0], [0.0], [-1.0], [0.0], [-1.0]]] |[c, [[-1.0], [0.0], [-1.0], [-1.0], [-1.0], [-1.0], [0.0], [-1.0], [-1.0], [-1.0]], [2.0,-1.0,2.0]]|5.0990195135927845|
|[d, [3.0,3.0,-1.0], [[0.0], [0.0], [-1.0], [0.0], [0.0], [0.0], [0.0], [-1.0], [0.0], [-1.0]]] |[b, [[-2.0], [-1.0], [0.0], [-2.0], [-2.0], [-1.0], [-1.0], [0.0], [0.0], [-1.0]], [-1.0,0.0,5.0]] |7.810249675906654 |
|[c, [2.0,-1.0,2.0], [[-1.0], [0.0], [-1.0], [-1.0], [-1.0], [-1.0], [0.0], [-1.0], [-1.0], [-1.0]]]|[a, [[-1.0], [0.0], [0.0], [-1.0], [-1.0], [-1.0], [0.0], [-1.0], [0.0], [-1.0]], [1.0,0.0,2.0]] |1.4142135623730951|
|[d, [3.0,3.0,-1.0], [[0.0], [0.0], [-1.0], [0.0], [0.0], [0.0], [0.0], [-1.0], [0.0], [-1.0]]] |[d, [[0.0], [0.0], [-1.0], [0.0], [0.0], [0.0], [0.0], [-1.0], [0.0], [-1.0]], [3.0,3.0,-1.0]] |0.0 |
|[a, [1.0,0.0,2.0], [[-1.0], [0.0], [0.0], [-1.0], [-1.0], [-1.0], [0.0], [-1.0], [0.0], [-1.0]]] |[a, [[-1.0], [0.0], [0.0], [-1.0], [-1.0], [-1.0], [0.0], [-1.0], [0.0], [-1.0]], [1.0,0.0,2.0]] |0.0 |
|[c, [2.0,-1.0,2.0], [[-1.0], [0.0], [-1.0], [-1.0], [-1.0], [-1.0], [0.0], [-1.0], [-1.0], [-1.0]]]|[b, [[-2.0], [-1.0], [0.0], [-2.0], [-2.0], [-1.0], [-1.0], [0.0], [0.0], [-1.0]], [-1.0,0.0,5.0]] |4.358898943540674 |
|[a, [1.0,0.0,2.0], [[-1.0], [0.0], [0.0], [-1.0], [-1.0], [-1.0], [0.0], [-1.0], [0.0], [-1.0]]] |[b, [[-2.0], [-1.0], [0.0], [-2.0], [-2.0], [-1.0], [-1.0], [0.0], [0.0], [-1.0]], [-1.0,0.0,5.0]] |3.605551275463989 |
|[a, [1.0,0.0,2.0], [[-1.0], [0.0], [0.0], [-1.0], [-1.0], [-1.0], [0.0], [-1.0], [0.0], [-1.0]]] |[d, [[0.0], [0.0], [-1.0], [0.0], [0.0], [0.0], [0.0], [-1.0], [0.0], [-1.0]], [3.0,3.0,-1.0]] |4.69041575982343 |
|[b, [-1.0,0.0,5.0], [[-2.0], [-1.0], [0.0], [-2.0], [-2.0], [-1.0], [-1.0], [0.0], [0.0], [-1.0]]] |[c, [[-1.0], [0.0], [-1.0], [-1.0], [-1.0], [-1.0], [0.0], [-1.0], [-1.0], [-1.0]], [2.0,-1.0,2.0]]|4.358898943540674 |
|[b, [-1.0,0.0,5.0], [[-2.0], [-1.0], [0.0], [-2.0], [-2.0], [-1.0], [-1.0], [0.0], [0.0], [-1.0]]] |[b, [[-2.0], [-1.0], [0.0], [-2.0], [-2.0], [-1.0], [-1.0], [0.0], [0.0], [-1.0]], [-1.0,0.0,5.0]] |0.0 |
|[a, [1.0,0.0,2.0], [[-1.0], [0.0], [0.0], [-1.0], [-1.0], [-1.0], [0.0], [-1.0], [0.0], [-1.0]]] |[c, [[-1.0], [0.0], [-1.0], [-1.0], [-1.0], [-1.0], [0.0], [-1.0], [-1.0], [-1.0]], [2.0,-1.0,2.0]]|1.4142135623730951|
|[c, [2.0,-1.0,2.0], [[-1.0], [0.0], [-1.0], [-1.0], [-1.0], [-1.0], [0.0], [-1.0], [-1.0], [-1.0]]]|[c, [[-1.0], [0.0], [-1.0], [-1.0], [-1.0], [-1.0], [0.0], [-1.0], [-1.0], [-1.0]], [2.0,-1.0,2.0]]|0.0 |
|[b, [-1.0,0.0,5.0], [[-2.0], [-1.0], [0.0], [-2.0], [-2.0], [-1.0], [-1.0], [0.0], [0.0], [-1.0]]] |[d, [[0.0], [0.0], [-1.0], [0.0], [0.0], [0.0], [0.0], [-1.0], [0.0], [-1.0]], [3.0,3.0,-1.0]] |7.810249675906654 |
|[c, [2.0,-1.0,2.0], [[-1.0], [0.0], [-1.0], [-1.0], [-1.0], [-1.0], [0.0], [-1.0], [-1.0], [-1.0]]]|[d, [[0.0], [0.0], [-1.0], [0.0], [0.0], [0.0], [0.0], [-1.0], [0.0], [-1.0]], [3.0,3.0,-1.0]] |5.0990195135927845|
|[b, [-1.0,0.0,5.0], [[-2.0], [-1.0], [0.0], [-2.0], [-2.0], [-1.0], [-1.0], [0.0], [0.0], [-1.0]]] |[a, [[-1.0], [0.0], [0.0], [-1.0], [-1.0], [-1.0], [0.0], [-1.0], [0.0], [-1.0]], [1.0,0.0,2.0]] |3.605551275463989 |
+---------------------------------------------------------------------------------------------------+---------------------------------------------------------------------------------------------------+------------------+
With the larger radius, the full Cartesian product shows up. The conclusion: Spark LSH first joins, then filters by the radius threshold, and the final output is whatever both joined and fell within the radius. In some cases, however, even raising the radius still loses data, because the join condition is not met: two items join only if at least one pair of hash bucket values is identical. For instance, if we reduce the number of hash tables, each vector gets fewer buckets and the probability of colliding with another vector drops. Let's set the number of hash tables to 2:
scala> :paste
// Entering paste mode (ctrl-D to finish)
val brp = new BucketedRandomProjectionLSH()
.setBucketLength(4)
.setNumHashTables(2)
.setInputCol("vec")
.setOutputCol("hashes")
// Exiting paste mode, now interpreting.
brp: org.apache.spark.ml.feature.BucketedRandomProjectionLSH = brp-lsh_8d2fef7fc1f5
Re-running the whole pipeline, with the Euclidean distance threshold still at 10.0, gives:
scala> val brpDf = brpModel.approxSimilarityJoin(hashDF, hashDF, 10.0, "EuclideanDistance")
brpDf: org.apache.spark.sql.Dataset[_] = [datasetA: struct<a: string, vec: vector ... 1 more field>, datasetB: struct<a: string, hashes: array<vector> ... 1 more field> ... 1 more field]
scala> brpDf.show(false)
+-------------------------------------+-------------------------------------+------------------+
|datasetA |datasetB |EuclideanDistance |
+-------------------------------------+-------------------------------------+------------------+
|[a, [1.0,0.0,2.0], [[-1.0], [0.0]]] |[a, [[-1.0], [0.0]], [1.0,0.0,2.0]] |0.0 |
|[c, [2.0,-1.0,2.0], [[-1.0], [0.0]]] |[d, [[0.0], [0.0]], [3.0,3.0,-1.0]] |5.0990195135927845|
|[d, [3.0,3.0,-1.0], [[0.0], [0.0]]] |[c, [[-1.0], [0.0]], [2.0,-1.0,2.0]] |5.0990195135927845|
|[d, [3.0,3.0,-1.0], [[0.0], [0.0]]] |[d, [[0.0], [0.0]], [3.0,3.0,-1.0]] |0.0 |
|[c, [2.0,-1.0,2.0], [[-1.0], [0.0]]] |[c, [[-1.0], [0.0]], [2.0,-1.0,2.0]] |0.0 |
|[b, [-1.0,0.0,5.0], [[-2.0], [-1.0]]]|[b, [[-2.0], [-1.0]], [-1.0,0.0,5.0]]|0.0 |
|[a, [1.0,0.0,2.0], [[-1.0], [0.0]]] |[d, [[0.0], [0.0]], [3.0,3.0,-1.0]] |4.69041575982343 |
|[c, [2.0,-1.0,2.0], [[-1.0], [0.0]]] |[a, [[-1.0], [0.0]], [1.0,0.0,2.0]] |1.4142135623730951|
|[d, [3.0,3.0,-1.0], [[0.0], [0.0]]] |[a, [[-1.0], [0.0]], [1.0,0.0,2.0]] |4.69041575982343 |
|[a, [1.0,0.0,2.0], [[-1.0], [0.0]]] |[c, [[-1.0], [0.0]], [2.0,-1.0,2.0]] |1.4142135623730951|
+-------------------------------------+-------------------------------------+------------------+
Clearly some rows no longer match. Ignoring self-matches, b has disappeared entirely, while a, c and d still match each other. Looking closer at why b failed: its two hash values are -2 and -1, and neither value appears at the same hash-table position for any other vector (the others are all -1 or 0), so it cannot join at all, let alone survive the radius filter.
Next, let's test another important parameter, BucketLength. The larger it is, the less spread out the computed hash values are and the more likely collisions become. Now let's make it smaller:
scala> :paste
// Entering paste mode (ctrl-D to finish)
val brp = new BucketedRandomProjectionLSH()
.setBucketLength(1)
.setNumHashTables(10)
.setInputCol("vec")
.setOutputCol("hashes")
// Exiting paste mode, now interpreting.
brp: org.apache.spark.ml.feature.BucketedRandomProjectionLSH = brp-lsh_70cba14ac181
Looking directly at the result: the hash values, especially b's, are now much more spread out, with values such as -5, -4 and -3 appearing.
scala> brpDf.show(false)
+---------------------------------------------------------------------------------------------------+---------------------------------------------------------------------------------------------------+------------------+
|datasetA |datasetB |EuclideanDistance |
+---------------------------------------------------------------------------------------------------+---------------------------------------------------------------------------------------------------+------------------+
|[d, [3.0,3.0,-1.0], [[2.0], [2.0], [-4.0], [1.0], [0.0], [3.0], [3.0], [-4.0], [0.0], [-3.0]]] |[a, [[-2.0], [0.0], [0.0], [-2.0], [-2.0], [-1.0], [0.0], [-1.0], [0.0], [-2.0]], [1.0,0.0,2.0]] |4.69041575982343 |
|[c, [2.0,-1.0,2.0], [[-2.0], [1.0], [-1.0], [-3.0], [-2.0], [-2.0], [1.0], [-2.0], [-1.0], [-2.0]]]|[b, [[-5.0], [-2.0], [3.0], [-5.0], [-5.0], [-2.0], [-2.0], [1.0], [2.0], [-4.0]], [-1.0,0.0,5.0]] |4.358898943540674 |
|[a, [1.0,0.0,2.0], [[-2.0], [0.0], [0.0], [-2.0], [-2.0], [-1.0], [0.0], [-1.0], [0.0], [-2.0]]] |[c, [[-2.0], [1.0], [-1.0], [-3.0], [-2.0], [-2.0], [1.0], [-2.0], [-1.0], [-2.0]], [2.0,-1.0,2.0]]|1.4142135623730951|
|[a, [1.0,0.0,2.0], [[-2.0], [0.0], [0.0], [-2.0], [-2.0], [-1.0], [0.0], [-1.0], [0.0], [-2.0]]] |[d, [[2.0], [2.0], [-4.0], [1.0], [0.0], [3.0], [3.0], [-4.0], [0.0], [-3.0]], [3.0,3.0,-1.0]] |4.69041575982343 |
|[b, [-1.0,0.0,5.0], [[-5.0], [-2.0], [3.0], [-5.0], [-5.0], [-2.0], [-2.0], [1.0], [2.0], [-4.0]]] |[c, [[-2.0], [1.0], [-1.0], [-3.0], [-2.0], [-2.0], [1.0], [-2.0], [-1.0], [-2.0]], [2.0,-1.0,2.0]]|4.358898943540674 |
|[c, [2.0,-1.0,2.0], [[-2.0], [1.0], [-1.0], [-3.0], [-2.0], [-2.0], [1.0], [-2.0], [-1.0], [-2.0]]]|[a, [[-2.0], [0.0], [0.0], [-2.0], [-2.0], [-1.0], [0.0], [-1.0], [0.0], [-2.0]], [1.0,0.0,2.0]] |1.4142135623730951|
|[c, [2.0,-1.0,2.0], [[-2.0], [1.0], [-1.0], [-3.0], [-2.0], [-2.0], [1.0], [-2.0], [-1.0], [-2.0]]]|[c, [[-2.0], [1.0], [-1.0], [-3.0], [-2.0], [-2.0], [1.0], [-2.0], [-1.0], [-2.0]], [2.0,-1.0,2.0]]|0.0 |
|[b, [-1.0,0.0,5.0], [[-5.0], [-2.0], [3.0], [-5.0], [-5.0], [-2.0], [-2.0], [1.0], [2.0], [-4.0]]] |[b, [[-5.0], [-2.0], [3.0], [-5.0], [-5.0], [-2.0], [-2.0], [1.0], [2.0], [-4.0]], [-1.0,0.0,5.0]] |0.0 |
|[a, [1.0,0.0,2.0], [[-2.0], [0.0], [0.0], [-2.0], [-2.0], [-1.0], [0.0], [-1.0], [0.0], [-2.0]]] |[a, [[-2.0], [0.0], [0.0], [-2.0], [-2.0], [-1.0], [0.0], [-1.0], [0.0], [-2.0]], [1.0,0.0,2.0]] |0.0 |
|[d, [3.0,3.0,-1.0], [[2.0], [2.0], [-4.0], [1.0], [0.0], [3.0], [3.0], [-4.0], [0.0], [-3.0]]] |[d, [[2.0], [2.0], [-4.0], [1.0], [0.0], [3.0], [3.0], [-4.0], [0.0], [-3.0]], [3.0,3.0,-1.0]] |0.0 |
+---------------------------------------------------------------------------------------------------+---------------------------------------------------------------------------------------------------+------------------+
Judging from the join results, a, b, c and d no longer all join with each other; for example a and b do not share a single hash table value. The conclusions:
- The larger BucketLength is, the higher the probability that vectors map into the same bucket: more candidates are recalled and more computation is needed; false negatives drop, but false positives rise.
- The larger NumHashTables is, the more buckets each vector is mapped into. Since a single matching bucket is enough to recall a pair, a larger value means more recall and more computation; again false negatives drop while false positives rise.
- Trading recall (accuracy) against compute: if the results must be complete and accurate, you inevitably move toward a full table scan, i.e. create more recall by increasing the bucket length and the number of hash tables, but then the computation gets heavy, resources run out, and skewed buckets easily cause OOMs. If instead you lower the parameters to cut the computation, you easily fail to recall and cannot produce neighbors for some entities. Which brings us to the protagonist of this post, ScANNS.
ScANNS project overview
ScANNS is a nearest-neighbor search library for Spark. It supports offline nearest-neighbor search over large batches of data in the cosine, Jaccard, and Euclidean distance spaces. The project mainly addresses the shortcomings of LSH in Spark ML:
(1) no support for cosine similarity;
(2) at large data volumes it fails to run and OOMs, and even with enough resources the existing algorithm suffers from skew in individual buckets;
(3) it adds the ability to search the TopK neighbors of every item in the full dataset.
ScANNS algorithm optimizations
The project's GitHub page describes how it improves on Spark ML. The first step maps the original vector into multiple hash tables, recording the bucket values within each hash table, then uses explode to turn each row into a (hash table index, hash value combination) tuple; note that this is a group of hash values rather than the single value Spark ML uses. The join is performed on this tuple, which so far mirrors Spark ML, but in the join stage the LinkedIn engineers made the following optimizations:
- Hash-encode the bucket join key: since the (index, hash bucket) tuple consists of an integer index plus several hash values, shuffling these tuples around can be expensive. All we ultimately care about is whether the index matches and the hash values match, not their actual values, so the tuple is simply hashed down to an integer using the hashing trick. This guarantees that equal tuples map to the same integer; collisions are possible, i.e. unequal bucket entries may get the same integer, but that does not affect the correctness of the algorithm, it only increases the number of elements that must be checked in the brute-force step.
- Custom join optimization: because LSH hash functions are random, and the source data itself may be unevenly distributed, some buckets end up holding huge numbers of items (bucket skew), and much of what is in them may not be worth brute-forcing. LinkedIn made two optimizations: first, a custom partitioned join keyed on the bucket id; second, a bucket limit parameter for skewed buckets, so that in a bucket over the limit each element is compared against only a random sample of bucket-limit-many entities from that bucket instead of all of them.
- topQueue strategy: a custom topQueue, a wrapper around Scala's PriorityQueue that holds a constant number of elements, is used to return the top k nearest neighbors rather than all neighbors within a threshold distance (a small sketch of this bounded queue follows after this list).
- Candidate pairs as an iterator: within a bucket, materializing all candidate pairs in memory while brute-forcing would be wasteful, since the number of pairs can be very large. Instead, the bucket is processed through a custom iterator that generates pairs on demand.
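To make the topQueue idea concrete, here is a hedged sketch of my own (not ScANNS source code): a fixed-capacity wrapper around Scala's PriorityQueue that keeps only the k closest candidates seen so far, so memory per query point stays O(k).

import scala.collection.mutable

// Bounded top-k queue for (neighborId, distance) candidates.
// A max-heap on distance keeps the current worst candidate at the head, ready to be evicted.
class TopKQueue(k: Int) {
  private val heap = mutable.PriorityQueue.empty[(Long, Double)](Ordering.by[(Long, Double), Double](_._2))

  def offer(id: Long, dist: Double): Unit = {
    if (heap.size < k) heap.enqueue((id, dist))
    else if (dist < heap.head._2) { heap.dequeue(); heap.enqueue((id, dist)) }
  }

  def result(): Seq[(Long, Double)] = heap.toSeq.sortBy(_._2)
}

// Usage: stream candidate distances through the queue while brute-forcing a bucket
val q = new TopKQueue(3)
Seq((1L, 0.9), (2L, 0.1), (3L, 0.5), (4L, 0.3), (5L, 0.8)).foreach { case (id, d) => q.offer(id, d) }
println(q.result())  // the 3 closest candidates: ids 2, 4, 3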
Downloading and building ScANNS
To download and build the project, see its GitHub page: https://github.com/LinkedInAttic/scanns#understanding-the-model-parameters
git clone git@github.com:linkedin/scanns.git
cd scanns
./gradlew build # jar will be built in build/scanns_2.1*/libs
The clone may fail with a permissions error; in that case add an SSH key under your GitHub account settings and configure it locally as well. Once the build finishes, the jar scanns_2.11-1.0.0.jar is produced:
root@ubuntu:/home/scanns/build/scanns_2.11/libs# ls
scanns_2.11-1.0.0.jar
Add the jar to the Maven pom.xml as a system-scoped dependency:
<dependency>
<groupId>com.linkedin.nn</groupId>
<artifactId>scanns_2.11</artifactId>
  <version>1.0.0</version>
<systemPath>/home/scanns/build/scanns_2.11/libs/scanns_2.11-1.0.0.jar</systemPath>
<scope>system</scope>
</dependency>
Verify in IDEA that the import resolves:
import com.linkedin.nn.algorithm.CosineSignRandomProjectionNNS
ScANNS input requirements
The algorithm's input is an RDD[(Long, org.apache.spark.ml.linalg.Vector)]. An RDD is used instead of a DataFrame/Dataset because it preserves some lower-level APIs that allow finer control over the joins performed inside the algorithm.
A simple example that satisfies the input requirement:
scala> val a = sc.parallelize(Array((1L, dense(1, 2, 3)), (2L, dense(1, 0, 3))))
a: org.apache.spark.rdd.RDD[(Long, org.apache.spark.ml.linalg.Vector)] = ParallelCollectionRDD[5] at parallelize at <console>:28
You can also convert from a DataFrame to the required format:
scala> val df = Seq((1L, dense(1, 2, 3)), (2L, dense(2, 3, 4))).toDF("a", "b")
df: org.apache.spark.sql.DataFrame = [a: bigint, b: vector]
scala> val df2 = df.rdd.map(x => (x.getLong(0), x.get(1).asInstanceOf[org.apache.spark.ml.linalg.Vector]))
df2: org.apache.spark.rdd.RDD[(Long, org.apache.spark.ml.linalg.Vector)] = MapPartitionsRDD[4] at map at <console>:26
To sum up, the algorithm is strict about its input format: exactly two columns must be passed, in this order; the first column, the entity identifier, must be of type Long (strings will not work); and the result must be an RDD. (If your identifiers are strings, one way to mint Long ids is sketched below.)
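Since string identifiers are not accepted, a common workaround (a sketch under the assumption that your entities are keyed by string names) is to assign Long ids with zipWithIndex and keep a mapping back to the names for later joins:

import org.apache.spark.ml.linalg.Vectors

// Entities keyed by string name; ScANNS wants RDD[(Long, Vector)], so assign Long ids first
val named = sc.parallelize(Seq(
  ("companyA", Vectors.dense(1.0, 2.0, 3.0)),
  ("companyB", Vectors.dense(2.0, 3.0, 4.0))
))

// zipWithIndex gives each element a stable Long index
val withIds = named.zipWithIndex().map { case ((name, vec), id) => (id, name, vec) }

val idMap   = withIds.map { case (id, name, _) => (id, name) }  // keep for mapping results back to names
val modelIn = withIds.map { case (id, _, vec) => (id, vec) }    // RDD[(Long, Vector)] for the algorithm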
Implementation, deployment, and testing
Let's implement a simple embedding-based similar-entity recommendation for companies. First, the input data: it contains an id (the Long identifier column for the algorithm), the entity name (used as the recommendation result), and a 16-dimensional embedding vector (computed offline in advance), stored as txt on HDFS. Step one is to read the data with Spark and shape it into the format the algorithm needs:
import java.util.Properties
import org.apache.spark.broadcast.Broadcast
import org.apache.spark.ml.linalg.Vectors.dense
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.functions.{split, udf}

def loadEntVector(spark: SparkSession, config: Broadcast[Properties]): DataFrame = {
  /* Embeddings trained offline; each line holds id, entity name and a comma-separated vector, separated by spaces */
  import spark.implicits._
  val df = spark.read.format("csv").option("sep", " ").option("header", false).load(config.value.getProperty("entVectorPath"))
    .toDF("id", "ent_name", "vector")
    .select($"id".cast("Long"), $"ent_name", $"vector")
    .withColumn("vector", split($"vector", ","))  // "0.1,0.2,..." -> Array[String]
    .withColumn("vector", udf((vec: scala.collection.mutable.WrappedArray[String]) => dense(vec.map(_.toDouble).toArray)).apply($"vector"))  // -> ml Vector
  df
}
The main work above is converting the vector string into an Array and then into an org.apache.spark.ml.linalg.Vectors.dense vector.
Prepare two tables: one as the idMap (id-to-name mapping), the other as the algorithm input:
val vtScore = loadEntVector(spark, configProperties)
val idMapDF = vtScore.select($"id", $"ent_name")
val modelDF = vtScore.select($"id", $"vector").rdd.map(x => (x.getLong(0), x.get(1).asInstanceOf[org.apache.spark.ml.linalg.Vector]))
Next, define the model:
import com.linkedin.nn.algorithm.CosineSignRandomProjectionNNS
val model = new CosineSignRandomProjectionNNS()
.setNumHashes(300)
.setSignatureLength(15)
.setJoinParallelism(200)
.setBucketLimit(1000)
.setShouldSampleBuckets(true)
.setNumOutputPartitions(100)
.createModel(16)
The main parameters are:
- setNumHashes: total number of hash functions
- setSignatureLength: number of hashes banded together into one signature (hash table)
- setJoinParallelism: parallelism of the candidate join
- setBucketLimit: cap on the number of items kept per bucket
- setShouldSampleBuckets: whether an over-limit bucket is sampled (true) or truncated (false)
- setNumOutputPartitions: number of partitions of the output RDD
- createModel: takes the dimensionality of the input vectors (16 here)
All of these are discussed in more detail in the tuning section at the end.
Next, run the model. Here we call the getSelfAllNearestNeighbors method, which, for every entity in the input, searches the entire collection for its TopK most similar entities; the numCandidates argument is K. The join step then swaps the Long ids back to entity names:
val numCandidates = 50
val nbrs = model.getSelfAllNearestNeighbors(modelDF, numCandidates).toDF("id_a", "id_b", "score")
val res = nbrs
.join(idMapDF.select($"id".as("id_a"), $"ent_name"), Seq("id_a"), "left_outer")
.join(idMapDF.select($"id".as("id_b"), $"ent_name".as("ent_recommend")), Seq("id_b"), "left_outer")
As a final step, use Spark's collect_list to gather each entity's TopK into a single JSON-style array and write it to the database:
val res2 = res.withColumn("recommend", udf((x: String, y: Double) => "[" + x + "," + y.toString + "]").apply($"ent_recommend", $"score"))  // "[name,score]" pairs
  .groupBy($"ent_name").agg(collect_list($"recommend").as("recommend"))
When submitting the job, attach the jar to spark2-submit with --jars, like so:
sudo -u hdfs spark2-submit \
--class ...
--master ...
--conf ...
--jars scanns_2.11-1.0.0.jar \
myproject.jar
The job runs remarkably fast: finding the top 50 for 2 million entities with 16-dimensional vectors finished in about 3 minutes.
Finally, check the results in the database.
Parameter tuning
First, a description of the parameters; they differ from Spark ML's and are a bit more involved.
- bucketWidth: only needed for the Euclidean-distance LSH. A larger bucket width lowers the false-negative rate; if the input vectors are normalized, a bucketWidth on the order of 1-10 should be a reasonable value.
- numHashes, signatureLength: numHashes is the total number of hash functions; the larger it is, the more fault-tolerant and stable the model, at the cost of more computation. The hashes are banded into signatures of length signatureLength, so the number of hash tables is numHashes / signatureLength (a quick arithmetic check follows after this list). The larger signatureLength is, the lower the false-positive rate but the higher the false-negative rate; in other words, a larger value makes the requirement for two items to fall into the same bucket stricter, so fewer candidates are recalled and less data needs brute-force checking later.
- joinParallelism: controls how much data each join partition/task handles. You want this to be reasonable given the dataset size. The dataset processed in the join is the "exploded" dataset, so when the original dataset itself is large (say tens to hundreds of millions of items) you will want to set the parallelism quite high, on the order of tens or even hundreds of thousands. Task creation and management in Spark has overhead, so bigger is not always better.
- bucketLimit, shouldSampleBuckets: the bucket limit is crucial for dealing with the bucket skew mentioned earlier. When a bucket holds more items than this limit, the behavior is chosen with the shouldSampleBuckets boolean; either way, elements in the bucket are discarded. If shouldSampleBuckets is true, bucketLimit items are sampled from the incoming stream; if false, the first bucketLimit items are kept and the rest ignored. The rationale is that if we miss highly similar neighbors in this bucket, their high similarity makes it very likely they will still match in some other, non-skewed bucket.
- numOutputPartitions: since the join parallelism can be very high, the output of the join may end up with a huge number of partitions even when it is not large. Setting numOutputPartitions repartitions the joined output into the given number of partitions, so that if you write this RDD back to the filesystem, the number of files it is split into stays manageable.
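A quick arithmetic check of the banding, under my reading that numHashes is the total number of hash functions and signatureLength is how many are banded into one signature:

// With the model defined earlier: 300 hashes banded into signatures of length 15
val numHashes = 300
val signatureLength = 15
val numBands = numHashes / signatureLength
println(numBands)  // 20 hash tables ("bands"); a pair is recalled if it matches in any one of them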