Spark LSH 近似最近鄰矢量檢索:LinkedInAttic ScANNS項目學(xué)習(xí)和工程使用

摘要:Spark局部敏感哈希矢量檢索碌廓,推薦系統(tǒng)

使用背景

最近有個需求做百萬級別實體的相關(guān)推薦,離線場景算完入庫接口調(diào)用剩盒,數(shù)倉和計算引擎基于Hive和Spark谷婆,一開始設(shè)想直接老套路embedding+LSH(Spark ml下局部敏感哈希),測了幾次都GG了辽聊,十分不好用纪挎,原因有以下:

  • 計算不穩(wěn)定:Spark的LSH動不動卡著不動或者慢或者OOM,主要原因是join步驟相當(dāng)消耗資源和桶內(nèi)數(shù)據(jù)傾斜導(dǎo)致跟匆,然而在傾斜的桶內(nèi)暴力搜索可能是不值得的异袄,因為相似度數(shù)據(jù)對可能也在另一個不傾斜的桶內(nèi)出現(xiàn)了
  • 數(shù)據(jù)丟失:調(diào)用approxSimilarityJoin會莫名其妙的丟失實體,比如輸入1000個實體做最近鄰50個檢索玛臂,最后只輸出了200個實體的top50烤蜕,這個問題不是半徑太小導(dǎo)致的,而是哈希之后沒有任何一條(hash_table迹冤,哈希值)一樣可以join上的數(shù)據(jù)對讽营,這個問題是參數(shù)設(shè)置導(dǎo)致的,LSH調(diào)參比較蛋疼
  • 不能對所有實體輸出TopK:Spark的LSH的approxNearestNeighbors是輸出TopK泡徙,和需求完全切合橱鹏,但是這個API不支持在全表操作,只能輸出一個實體進(jìn)行推薦堪藐,所以只能使用join方法再對join到的數(shù)據(jù)對進(jìn)行排序取topK莉兰,相當(dāng)浪費計算資源
  • 不支持余弦相似度:Spark的BucketedRandomProjectionLSH不支持余弦相似度,這個影響不大礁竞,可以先做一步歸一化然后用歐氏距離糖荒,但是不是很方便

不談了,去Github搜了一個項目LinkedIn ScANNS模捂,LinkedIn機器學(xué)習(xí)團隊出品捶朵,測試了一下相當(dāng)好用


LSH原理概述

簡單而言,LSH就是對高維數(shù)據(jù)使用局部敏感哈希函數(shù)進(jìn)行轉(zhuǎn)換枫绅,映射到多個HashTable中的桶中泉孩,局部敏感哈希的特性是使得原本在高維相似的向量進(jìn)行哈希映射到低維度也相似硼端,及映射到同樣的桶中并淋,不相似的數(shù)據(jù)可能盡量避免在同一個桶,由于相似度的定義不同珍昨,局部敏感哈希函數(shù)也不同县耽,沒有通用的哈希函數(shù)(這個地方哈希函數(shù)和CNN中的卷積核函數(shù)作用類似句喷,是一種提取特征的方式,比如歐氏距離的哈希函數(shù)就是wx+b的形式)兔毙,因此先用哈希降維作為召回唾琼,然后在同一個桶下進(jìn)行線性遍歷求解距離即可,具體原理參考我的另一篇文章http://www.reibang.com/p/bbf142f8ec84


Spark ml是如何實現(xiàn)LSH的

參考https://blog.csdn.net/shenyanasop/article/details/110454273這篇文章澎剥,簡單而言Spark使用LSH先對DataFrame中的Vector列做轉(zhuǎn)化生成N和hash table中的桶值锡溯,然后炸開每個hash table作為一行進(jìn)行join匹配,只要有一個hash table值一樣就匹配上哑姚,最后再對匹配上的數(shù)據(jù)對做兩兩距離計算祭饭,留下在半徑閾值內(nèi)的數(shù)據(jù)對,結(jié)合代碼調(diào)用案例深入理解一下
首先我們導(dǎo)入包叙量,創(chuàng)建一個含有向量的DataFrame

scala> import org.apache.spark.ml.linalg.Vectors.dense
import org.apache.spark.ml.linalg.Vectors.dense

scala> import org.apache.spark.ml.linalg.Vectors.dense
import org.apache.spark.ml.linalg.Vectors.dense

scala> import org.apache.spark.ml.feature.BucketedRandomProjectionLSH
import org.apache.spark.ml.feature.BucketedRandomProjectionLSH

scala> val df = Seq(("a", dense(1, 0, 2)), ("b", dense(-1, 0, 5)), ("c", dense(2, -1, 2)), ("d", dense(3, 3, -1))).toDF("a", "vec")
df: org.apache.spark.sql.DataFrame = [a: string, vec: vector]

然后我們初始化一個LSH模型倡蝙,設(shè)置hash_table 10個,桶長4绞佩,輸出一個新的列hashes寺鸥,并且fit,transform一波

scala> :paste
// Entering paste mode (ctrl-D to finish)

val brp = new BucketedRandomProjectionLSH()
      .setBucketLength(4)
      .setNumHashTables(10)
      .setInputCol("vec")
      .setOutputCol("hashes")

// Exiting paste mode, now interpreting.

brp: org.apache.spark.ml.feature.BucketedRandomProjectionLSH = brp-lsh_ea56cf7270a2
scala> val brpModel = brp.fit(df)
brpModel: org.apache.spark.ml.feature.BucketedRandomProjectionLSHModel = brp-lsh_ea56cf7270a2

scala> val hashDF = brpModel.transform(df)
hashDF: org.apache.spark.sql.DataFrame = [a: string, vec: vector ... 1 more field]

最終看一下進(jìn)行hash轉(zhuǎn)化之后的向量特征長什么樣子品山,可見新生成的hashes列中生成了10個hash_table胆建,每個hash_table中記錄了向量進(jìn)行局部敏感哈希之后的桶值

scala> hashDF.show(false)
+---+--------------+------------------------------------------------------------------------------+
|a  |vec           |hashes                                                                        |
+---+--------------+------------------------------------------------------------------------------+
|a  |[1.0,0.0,2.0] |[[-1.0], [0.0], [0.0], [-1.0], [-1.0], [-1.0], [0.0], [-1.0], [0.0], [-1.0]]  |
|b  |[-1.0,0.0,5.0]|[[-2.0], [-1.0], [0.0], [-2.0], [-2.0], [-1.0], [-1.0], [0.0], [0.0], [-1.0]] |
|c  |[2.0,-1.0,2.0]|[[-1.0], [0.0], [-1.0], [-1.0], [-1.0], [-1.0], [0.0], [-1.0], [-1.0], [-1.0]]|
|d  |[3.0,3.0,-1.0]|[[0.0], [0.0], [-1.0], [0.0], [0.0], [0.0], [0.0], [-1.0], [0.0], [-1.0]]     |
+---+--------------+------------------------------------------------------------------------------+

然后我們應(yīng)用模型的join方法,自身和自身join肘交,找到每個元素的近鄰

scala> val brpDf = brpModel.approxSimilarityJoin(hashDF, hashDF, 2.0, "EuclideanDistance")
brpDf: org.apache.spark.sql.Dataset[_] = [datasetA: struct<a: string, vec: vector ... 1 more field>, datasetB: struct<a: string, hashes: array<vector> ... 1 more field> ... 1 more field]

查看計算結(jié)果

scala> brpDf.show(false)
+---------------------------------------------------------------------------------------------------+---------------------------------------------------------------------------------------------------+------------------+
|datasetA                                                                                           |datasetB                                                                                           |EuclideanDistance |
+---------------------------------------------------------------------------------------------------+---------------------------------------------------------------------------------------------------+------------------+
|[c, [2.0,-1.0,2.0], [[-1.0], [0.0], [-1.0], [-1.0], [-1.0], [-1.0], [0.0], [-1.0], [-1.0], [-1.0]]]|[a, [[-1.0], [0.0], [0.0], [-1.0], [-1.0], [-1.0], [0.0], [-1.0], [0.0], [-1.0]], [1.0,0.0,2.0]]   |1.4142135623730951|
|[d, [3.0,3.0,-1.0], [[0.0], [0.0], [-1.0], [0.0], [0.0], [0.0], [0.0], [-1.0], [0.0], [-1.0]]]     |[d, [[0.0], [0.0], [-1.0], [0.0], [0.0], [0.0], [0.0], [-1.0], [0.0], [-1.0]], [3.0,3.0,-1.0]]     |0.0               |
|[a, [1.0,0.0,2.0], [[-1.0], [0.0], [0.0], [-1.0], [-1.0], [-1.0], [0.0], [-1.0], [0.0], [-1.0]]]   |[a, [[-1.0], [0.0], [0.0], [-1.0], [-1.0], [-1.0], [0.0], [-1.0], [0.0], [-1.0]], [1.0,0.0,2.0]]   |0.0               |
|[b, [-1.0,0.0,5.0], [[-2.0], [-1.0], [0.0], [-2.0], [-2.0], [-1.0], [-1.0], [0.0], [0.0], [-1.0]]] |[b, [[-2.0], [-1.0], [0.0], [-2.0], [-2.0], [-1.0], [-1.0], [0.0], [0.0], [-1.0]], [-1.0,0.0,5.0]] |0.0               |
|[a, [1.0,0.0,2.0], [[-1.0], [0.0], [0.0], [-1.0], [-1.0], [-1.0], [0.0], [-1.0], [0.0], [-1.0]]]   |[c, [[-1.0], [0.0], [-1.0], [-1.0], [-1.0], [-1.0], [0.0], [-1.0], [-1.0], [-1.0]], [2.0,-1.0,2.0]]|1.4142135623730951|
|[c, [2.0,-1.0,2.0], [[-1.0], [0.0], [-1.0], [-1.0], [-1.0], [-1.0], [0.0], [-1.0], [-1.0], [-1.0]]]|[c, [[-1.0], [0.0], [-1.0], [-1.0], [-1.0], [-1.0], [0.0], [-1.0], [-1.0], [-1.0]], [2.0,-1.0,2.0]]|0.0               |
+---------------------------------------------------------------------------------------------------+---------------------------------------------------------------------------------------------------+------------------+

可以看到只有a和c join上了眼坏,其他都是只有join上了自己,因為自己和自己的距離是0酸些,hash值也一模一樣宰译,原因可能是半徑閾值調(diào)小了,我們將半徑調(diào)大到10.0魄懂,重新show一波

scala> val brpDf = brpModel.approxSimilarityJoin(hashDF, hashDF, 10.0, "EuclideanDistance")
brpDf: org.apache.spark.sql.Dataset[_] = [datasetA: struct<a: string, vec: vector ... 1 more field>, datasetB: struct<a: string, hashes: array<vector> ... 1 more field> ... 1 more field]

scala> brpDf.show(false)
+---------------------------------------------------------------------------------------------------+---------------------------------------------------------------------------------------------------+------------------+
|datasetA                                                                                           |datasetB                                                                                           |EuclideanDistance |
+---------------------------------------------------------------------------------------------------+---------------------------------------------------------------------------------------------------+------------------+
|[d, [3.0,3.0,-1.0], [[0.0], [0.0], [-1.0], [0.0], [0.0], [0.0], [0.0], [-1.0], [0.0], [-1.0]]]     |[a, [[-1.0], [0.0], [0.0], [-1.0], [-1.0], [-1.0], [0.0], [-1.0], [0.0], [-1.0]], [1.0,0.0,2.0]]   |4.69041575982343  |
|[d, [3.0,3.0,-1.0], [[0.0], [0.0], [-1.0], [0.0], [0.0], [0.0], [0.0], [-1.0], [0.0], [-1.0]]]     |[c, [[-1.0], [0.0], [-1.0], [-1.0], [-1.0], [-1.0], [0.0], [-1.0], [-1.0], [-1.0]], [2.0,-1.0,2.0]]|5.0990195135927845|
|[d, [3.0,3.0,-1.0], [[0.0], [0.0], [-1.0], [0.0], [0.0], [0.0], [0.0], [-1.0], [0.0], [-1.0]]]     |[b, [[-2.0], [-1.0], [0.0], [-2.0], [-2.0], [-1.0], [-1.0], [0.0], [0.0], [-1.0]], [-1.0,0.0,5.0]] |7.810249675906654 |
|[c, [2.0,-1.0,2.0], [[-1.0], [0.0], [-1.0], [-1.0], [-1.0], [-1.0], [0.0], [-1.0], [-1.0], [-1.0]]]|[a, [[-1.0], [0.0], [0.0], [-1.0], [-1.0], [-1.0], [0.0], [-1.0], [0.0], [-1.0]], [1.0,0.0,2.0]]   |1.4142135623730951|
|[d, [3.0,3.0,-1.0], [[0.0], [0.0], [-1.0], [0.0], [0.0], [0.0], [0.0], [-1.0], [0.0], [-1.0]]]     |[d, [[0.0], [0.0], [-1.0], [0.0], [0.0], [0.0], [0.0], [-1.0], [0.0], [-1.0]], [3.0,3.0,-1.0]]     |0.0               |
|[a, [1.0,0.0,2.0], [[-1.0], [0.0], [0.0], [-1.0], [-1.0], [-1.0], [0.0], [-1.0], [0.0], [-1.0]]]   |[a, [[-1.0], [0.0], [0.0], [-1.0], [-1.0], [-1.0], [0.0], [-1.0], [0.0], [-1.0]], [1.0,0.0,2.0]]   |0.0               |
|[c, [2.0,-1.0,2.0], [[-1.0], [0.0], [-1.0], [-1.0], [-1.0], [-1.0], [0.0], [-1.0], [-1.0], [-1.0]]]|[b, [[-2.0], [-1.0], [0.0], [-2.0], [-2.0], [-1.0], [-1.0], [0.0], [0.0], [-1.0]], [-1.0,0.0,5.0]] |4.358898943540674 |
|[a, [1.0,0.0,2.0], [[-1.0], [0.0], [0.0], [-1.0], [-1.0], [-1.0], [0.0], [-1.0], [0.0], [-1.0]]]   |[b, [[-2.0], [-1.0], [0.0], [-2.0], [-2.0], [-1.0], [-1.0], [0.0], [0.0], [-1.0]], [-1.0,0.0,5.0]] |3.605551275463989 |
|[a, [1.0,0.0,2.0], [[-1.0], [0.0], [0.0], [-1.0], [-1.0], [-1.0], [0.0], [-1.0], [0.0], [-1.0]]]   |[d, [[0.0], [0.0], [-1.0], [0.0], [0.0], [0.0], [0.0], [-1.0], [0.0], [-1.0]], [3.0,3.0,-1.0]]     |4.69041575982343  |
|[b, [-1.0,0.0,5.0], [[-2.0], [-1.0], [0.0], [-2.0], [-2.0], [-1.0], [-1.0], [0.0], [0.0], [-1.0]]] |[c, [[-1.0], [0.0], [-1.0], [-1.0], [-1.0], [-1.0], [0.0], [-1.0], [-1.0], [-1.0]], [2.0,-1.0,2.0]]|4.358898943540674 |
|[b, [-1.0,0.0,5.0], [[-2.0], [-1.0], [0.0], [-2.0], [-2.0], [-1.0], [-1.0], [0.0], [0.0], [-1.0]]] |[b, [[-2.0], [-1.0], [0.0], [-2.0], [-2.0], [-1.0], [-1.0], [0.0], [0.0], [-1.0]], [-1.0,0.0,5.0]] |0.0               |
|[a, [1.0,0.0,2.0], [[-1.0], [0.0], [0.0], [-1.0], [-1.0], [-1.0], [0.0], [-1.0], [0.0], [-1.0]]]   |[c, [[-1.0], [0.0], [-1.0], [-1.0], [-1.0], [-1.0], [0.0], [-1.0], [-1.0], [-1.0]], [2.0,-1.0,2.0]]|1.4142135623730951|
|[c, [2.0,-1.0,2.0], [[-1.0], [0.0], [-1.0], [-1.0], [-1.0], [-1.0], [0.0], [-1.0], [-1.0], [-1.0]]]|[c, [[-1.0], [0.0], [-1.0], [-1.0], [-1.0], [-1.0], [0.0], [-1.0], [-1.0], [-1.0]], [2.0,-1.0,2.0]]|0.0               |
|[b, [-1.0,0.0,5.0], [[-2.0], [-1.0], [0.0], [-2.0], [-2.0], [-1.0], [-1.0], [0.0], [0.0], [-1.0]]] |[d, [[0.0], [0.0], [-1.0], [0.0], [0.0], [0.0], [0.0], [-1.0], [0.0], [-1.0]], [3.0,3.0,-1.0]]     |7.810249675906654 |
|[c, [2.0,-1.0,2.0], [[-1.0], [0.0], [-1.0], [-1.0], [-1.0], [-1.0], [0.0], [-1.0], [-1.0], [-1.0]]]|[d, [[0.0], [0.0], [-1.0], [0.0], [0.0], [0.0], [0.0], [-1.0], [0.0], [-1.0]], [3.0,3.0,-1.0]]     |5.0990195135927845|
|[b, [-1.0,0.0,5.0], [[-2.0], [-1.0], [0.0], [-2.0], [-2.0], [-1.0], [-1.0], [0.0], [0.0], [-1.0]]] |[a, [[-1.0], [0.0], [0.0], [-1.0], [-1.0], [-1.0], [0.0], [-1.0], [0.0], [-1.0]], [1.0,0.0,2.0]]   |3.605551275463989 |
+---------------------------------------------------------------------------------------------------+---------------------------------------------------------------------------------------------------+------------------+

可以半徑設(shè)置大了之后沿侈,笛卡爾積完全展示出來了,結(jié)論就是Spark LSH在先join上之后市栗,再做半徑閾值篩選缀拭,最終計算結(jié)果顯示join且在半徑范圍內(nèi)的,但是某些情況下就算是調(diào)高半徑閾值也會丟失原有數(shù)據(jù)填帽,原因是不滿足join條件蛛淋,join條件是所有hash 桶值必須有至少一個相同的一對才能匹配,比如我們調(diào)小hash_table的數(shù)量篡腌,這樣一條向量的桶的數(shù)量就會減少褐荷,和其他向量碰撞在一個桶的概率降低,我們將hash_table數(shù)量設(shè)置為2

scala> :paste
// Entering paste mode (ctrl-D to finish)

val brp = new BucketedRandomProjectionLSH()
      .setBucketLength(4)
      .setNumHashTables(2)
      .setInputCol("vec")
      .setOutputCol("hashes")

// Exiting paste mode, now interpreting.

brp: org.apache.spark.ml.feature.BucketedRandomProjectionLSH = brp-lsh_8d2fef7fc1f5

重新走一遍流程嘹悼,最終歐式距離閾值依舊是10.0叛甫,結(jié)果如下

scala> val brpDf = brpModel.approxSimilarityJoin(hashDF, hashDF, 10.0, "EuclideanDistance")
brpDf: org.apache.spark.sql.Dataset[_] = [datasetA: struct<a: string, vec: vector ... 1 more field>, datasetB: struct<a: string, hashes: array<vector> ... 1 more field> ... 1 more field]

scala> brpDf.show(false)
+-------------------------------------+-------------------------------------+------------------+
|datasetA                             |datasetB                             |EuclideanDistance |
+-------------------------------------+-------------------------------------+------------------+
|[a, [1.0,0.0,2.0], [[-1.0], [0.0]]]  |[a, [[-1.0], [0.0]], [1.0,0.0,2.0]]  |0.0               |
|[c, [2.0,-1.0,2.0], [[-1.0], [0.0]]] |[d, [[0.0], [0.0]], [3.0,3.0,-1.0]]  |5.0990195135927845|
|[d, [3.0,3.0,-1.0], [[0.0], [0.0]]]  |[c, [[-1.0], [0.0]], [2.0,-1.0,2.0]] |5.0990195135927845|
|[d, [3.0,3.0,-1.0], [[0.0], [0.0]]]  |[d, [[0.0], [0.0]], [3.0,3.0,-1.0]]  |0.0               |
|[c, [2.0,-1.0,2.0], [[-1.0], [0.0]]] |[c, [[-1.0], [0.0]], [2.0,-1.0,2.0]] |0.0               |
|[b, [-1.0,0.0,5.0], [[-2.0], [-1.0]]]|[b, [[-2.0], [-1.0]], [-1.0,0.0,5.0]]|0.0               |
|[a, [1.0,0.0,2.0], [[-1.0], [0.0]]]  |[d, [[0.0], [0.0]], [3.0,3.0,-1.0]]  |4.69041575982343  |
|[c, [2.0,-1.0,2.0], [[-1.0], [0.0]]] |[a, [[-1.0], [0.0]], [1.0,0.0,2.0]]  |1.4142135623730951|
|[d, [3.0,3.0,-1.0], [[0.0], [0.0]]]  |[a, [[-1.0], [0.0]], [1.0,0.0,2.0]]  |4.69041575982343  |
|[a, [1.0,0.0,2.0], [[-1.0], [0.0]]]  |[c, [[-1.0], [0.0]], [2.0,-1.0,2.0]] |1.4142135623730951|
+-------------------------------------+-------------------------------------+------------------+

可見明細(xì)有數(shù)據(jù)沒有匹配上层宫,如果不算自身和自身匹配,b徹底消失其监,a萌腿,b,c互相匹配到了抖苦,再仔細(xì)看一下b為啥沒匹配上毁菱,他的兩個hash值是-2,,1锌历,兩個hash值在同一個位次的hash_table上絕無僅有(其他都是-1,鼎俘,0)因此join不上,更不要說走下一步半徑過濾
接下來我們測試一下另一個重要的參數(shù)BucketLength辩涝,這個數(shù)值越大贸伐,計算的hash值離散程度越低,碰撞的概率越大怔揩,現(xiàn)在我們把這個值調(diào)小

scala> :paste
// Entering paste mode (ctrl-D to finish)

val brp = new BucketedRandomProjectionLSH()
      .setBucketLength(1)
      .setNumHashTables(10)
      .setInputCol("vec")
      .setOutputCol("hashes")

// Exiting paste mode, now interpreting.

brp: org.apache.spark.ml.feature.BucketedRandomProjectionLSH = brp-lsh_70cba14ac181

直接看結(jié)果捉邢,hash值里面尤其是b離散程度非常大,出現(xiàn)了-5商膊,-4伏伐,-3

scala> brpDf.show(false)
+---------------------------------------------------------------------------------------------------+---------------------------------------------------------------------------------------------------+------------------+
|datasetA                                                                                           |datasetB                                                                                           |EuclideanDistance |
+---------------------------------------------------------------------------------------------------+---------------------------------------------------------------------------------------------------+------------------+
|[d, [3.0,3.0,-1.0], [[2.0], [2.0], [-4.0], [1.0], [0.0], [3.0], [3.0], [-4.0], [0.0], [-3.0]]]     |[a, [[-2.0], [0.0], [0.0], [-2.0], [-2.0], [-1.0], [0.0], [-1.0], [0.0], [-2.0]], [1.0,0.0,2.0]]   |4.69041575982343  |
|[c, [2.0,-1.0,2.0], [[-2.0], [1.0], [-1.0], [-3.0], [-2.0], [-2.0], [1.0], [-2.0], [-1.0], [-2.0]]]|[b, [[-5.0], [-2.0], [3.0], [-5.0], [-5.0], [-2.0], [-2.0], [1.0], [2.0], [-4.0]], [-1.0,0.0,5.0]] |4.358898943540674 |
|[a, [1.0,0.0,2.0], [[-2.0], [0.0], [0.0], [-2.0], [-2.0], [-1.0], [0.0], [-1.0], [0.0], [-2.0]]]   |[c, [[-2.0], [1.0], [-1.0], [-3.0], [-2.0], [-2.0], [1.0], [-2.0], [-1.0], [-2.0]], [2.0,-1.0,2.0]]|1.4142135623730951|
|[a, [1.0,0.0,2.0], [[-2.0], [0.0], [0.0], [-2.0], [-2.0], [-1.0], [0.0], [-1.0], [0.0], [-2.0]]]   |[d, [[2.0], [2.0], [-4.0], [1.0], [0.0], [3.0], [3.0], [-4.0], [0.0], [-3.0]], [3.0,3.0,-1.0]]     |4.69041575982343  |
|[b, [-1.0,0.0,5.0], [[-5.0], [-2.0], [3.0], [-5.0], [-5.0], [-2.0], [-2.0], [1.0], [2.0], [-4.0]]] |[c, [[-2.0], [1.0], [-1.0], [-3.0], [-2.0], [-2.0], [1.0], [-2.0], [-1.0], [-2.0]], [2.0,-1.0,2.0]]|4.358898943540674 |
|[c, [2.0,-1.0,2.0], [[-2.0], [1.0], [-1.0], [-3.0], [-2.0], [-2.0], [1.0], [-2.0], [-1.0], [-2.0]]]|[a, [[-2.0], [0.0], [0.0], [-2.0], [-2.0], [-1.0], [0.0], [-1.0], [0.0], [-2.0]], [1.0,0.0,2.0]]   |1.4142135623730951|
|[c, [2.0,-1.0,2.0], [[-2.0], [1.0], [-1.0], [-3.0], [-2.0], [-2.0], [1.0], [-2.0], [-1.0], [-2.0]]]|[c, [[-2.0], [1.0], [-1.0], [-3.0], [-2.0], [-2.0], [1.0], [-2.0], [-1.0], [-2.0]], [2.0,-1.0,2.0]]|0.0               |
|[b, [-1.0,0.0,5.0], [[-5.0], [-2.0], [3.0], [-5.0], [-5.0], [-2.0], [-2.0], [1.0], [2.0], [-4.0]]] |[b, [[-5.0], [-2.0], [3.0], [-5.0], [-5.0], [-2.0], [-2.0], [1.0], [2.0], [-4.0]], [-1.0,0.0,5.0]] |0.0               |
|[a, [1.0,0.0,2.0], [[-2.0], [0.0], [0.0], [-2.0], [-2.0], [-1.0], [0.0], [-1.0], [0.0], [-2.0]]]   |[a, [[-2.0], [0.0], [0.0], [-2.0], [-2.0], [-1.0], [0.0], [-1.0], [0.0], [-2.0]], [1.0,0.0,2.0]]   |0.0               |
|[d, [3.0,3.0,-1.0], [[2.0], [2.0], [-4.0], [1.0], [0.0], [3.0], [3.0], [-4.0], [0.0], [-3.0]]]     |[d, [[2.0], [2.0], [-4.0], [1.0], [0.0], [3.0], [3.0], [-4.0], [0.0], [-3.0]], [3.0,3.0,-1.0]]     |0.0               |
+---------------------------------------------------------------------------------------------------+---------------------------------------------------------------------------------------------------+------------------+

從join結(jié)果來看abcd都沒有完全join全,比如a和b沒有一個hash table的值一樣晕拆,所以結(jié)論是

  • BucketLength越大藐翎,向量映射在同一個桶的概率越大,召回地越多实幕,計算量大吝镣,可以降低假陰,但是也提高了假陽
  • NumHashTables越大昆庇,向量映射到的桶選擇就多末贾,如果只要求有一個桶一樣就召回,則這個值越大召回就越多整吆,計算量就越大拱撵,可以降低假陰,但是也提高了假陽
  • 召回(準(zhǔn)確率)和計算量的取舍:如果需要計算的結(jié)果不漏表蝙,且準(zhǔn)確率高拴测,那么必然要越接近全表掃描,即創(chuàng)造更多的召回府蛇,可以調(diào)大桶長和hash_table的個數(shù)集索,但是計算量會變大消耗資源且容易桶內(nèi)傾斜O(jiān)OM,如果為了降低計算量調(diào)低參數(shù),又容易召回不出算不出近鄰抄谐,索引引入下面的主角ScANNS

ScANNS項目概述

ScANNS是Spark的近鄰搜索庫,它支持在余弦距離扰法、jaccard距離歐幾里德距離空間內(nèi)的離線大批量數(shù)據(jù)進(jìn)行最近鄰搜索蛹含。這個項目主要解決現(xiàn)有的Spark ML下LSH的不足:
(1)不支持余弦相似度
(2)數(shù)據(jù)量大了跑不起來OOM,程序失敗塞颁,就算資源足夠浦箱,現(xiàn)有算法存在各別桶數(shù)據(jù)傾斜的問題
(3)增加全量數(shù)據(jù)各自TopK搜索的功能

ScANNS的算法優(yōu)化

github項目官網(wǎng)上寫了項目相比與Spark ml的優(yōu)化措施



上面這一步是將原始向量映射為多個hash table,以及每個hash table中記錄的bucket值祠锣,然后使用explode炸開每一行形成(hash table索引酷窥,hash值組合)的元組,這個地方是一組hash值而不是之前一個hash值伴网,以這個元組進(jìn)行join蓬推,這個和傳統(tǒng)的Spark ml是一樣的,但是在join階段領(lǐng)英工程師進(jìn)行了如下優(yōu)化

  • 對bucket join列進(jìn)行hash編碼:由于(index澡腾,hash bucket)元組由一個整數(shù)索引和多個散列值組成沸伏,因此要來回移動元組可能會非常昂貴。我們最終關(guān)心的只是索引匹配和散列值匹配动分。我們不關(guān)心索引或散列本身的實際值毅糟。因此,我們使用散列技巧將這個元組簡單地散列為整數(shù)澜公。雖然這保證了相同的元組被映射到相同的整數(shù)姆另,但也可能存在沖突,不相等的哈希桶項可能會得到相同的整數(shù)值坟乾。然而迹辐,這并不影響我們算法的“正確性”,它只是增加了我們在暴力步驟中需要檢查的元素的數(shù)量甚侣。
  • 自定義連接join優(yōu)化:由于LSH的hash函數(shù)具有隨機性右核,以及可能數(shù)據(jù)源本身分布的問題,會導(dǎo)致形成某某些桶分布著大量的數(shù)據(jù)渺绒,形成桶傾斜贺喝,并且里面大量的數(shù)據(jù)可能并不值得暴力搜索。領(lǐng)英做了兩個優(yōu)化宗兼,第一基于budkct id進(jìn)行自定義分區(qū)join躏鱼,第二對于桶傾斜,設(shè)置bucket limit參數(shù)殷绍,對于超過limit的傾斜桶染苛,里面的每個元素只和該桶下隨機bucket limit數(shù)量的實體進(jìn)行距離計算,而不是所有都計算
  • topQueue策略:我們還使用了一個定制的topQueue,它是scala的PriorityQueue的包裝器茶行,可以容納的元素總數(shù)是常量躯概。這用于返回前k個最近鄰居,而不是閾值距離內(nèi)的鄰居
  • 數(shù)據(jù)對處理成迭代器:在一個bucket中畔师,當(dāng)通過蠻力返回候選對象時娶靡,將內(nèi)存中的所有對具體化是一種浪費,因為對的數(shù)量可能相當(dāng)大看锉。相反姿锭,我們通過構(gòu)建一個自定義迭代器來處理bucket,從而在按需的基礎(chǔ)上生成pair伯铣。

ScANNS工程下載

工程下載參考項目的github主頁https://github.com/LinkedInAttic/scanns#understanding-the-model-parameters

git clone git@github.com:linkedin/scanns.git
cd scanns
./gradlew build # jar will be built in build/scanns_2.1*/libs

其間可能會報錯沒有權(quán)限呻此,需要在個人github主頁settings下設(shè)置ssh,本地也要設(shè)置腔寡,過程報錯直接百度焚鲜,項目構(gòu)建完成生成jar包scanns_2.11-1.0.0.jar

root@ubuntu:/home/scanns/build/scanns_2.11/libs# ls
scanns_2.11-1.0.0.jar

將jar包引入Maven pom.xml依賴

<dependency>
    <groupId>com.linkedin.nn</groupId>
    <artifactId>scanns_2.11</artifactId>
    <version>2.11</version>
    <systemPath>/home/scanns/build/scanns_2.11/libs/scanns_2.11-1.0.0.jar</systemPath>
    <scope>system</scope>
</dependency>

IDEA測試導(dǎo)入成功

import com.linkedin.nn.algorithm.CosineSignRandomProjectionNNS

ScANNS輸入要求

算法的輸入是RDD[Long, org.apache.spark.ml.linalg.Vector)]. 使用rdd而不是DataFrame/Dataset的原因是保留了一些較低級別的API,允許對算法中執(zhí)行的連接進(jìn)行更多的控制放前。
一個簡單的符合算法輸入要求的例子

scala> val a  = sc.parallelize(Array((1L, dense(1, 2, 3)), (2L, dense(1, 0, 3))))
a: org.apache.spark.rdd.RDD[(Long, org.apache.spark.ml.linalg.Vector)] = ParallelCollectionRDD[5] at parallelize at <console>:28

也可以從DataFrame轉(zhuǎn)化為算法需要的格式

scala> val df = Seq((1L, dense(1, 2, 3)), (2L, dense(2, 3, 4))).toDF("a", "b")
df: org.apache.spark.sql.DataFrame = [a: bigint, b: vector]

scala> val df2 = df.rdd.map(x => (x.getLong(0), x.get(1).asInstanceOf[org.apache.spark.ml.linalg.Vector]))
df2: org.apache.spark.rdd.RDD[(Long, org.apache.spark.ml.linalg.Vector)] = MapPartitionsRDD[4] at map at <console>:26

總結(jié)一下算法對格式有嚴(yán)格的要求:首先必須就傳入2列恃泪,順序要一致,然后第一列作為實體的標(biāo)識必須是Long類型犀斋,字符串類型不行贝乎,最后必須是RDD


代碼實現(xiàn)部署測試

寫一個簡單的例子實現(xiàn)企業(yè)實體之間的embedding相似推薦,首先看一下輸入數(shù)據(jù)



輸入的數(shù)據(jù)包括id(作為算法的Long類型標(biāo)識列)叽粹,實體名稱(作為推薦結(jié)果)览效,16維度的embedding向量(事先離線算好)存儲為txt格式寫入放在HDFS上,第一步就是Spark讀取數(shù)據(jù)處理算法需要的格式

import org.apache.spark.ml.linalg.Vectors.dense

def loadEntVector(spark: SparkSession, config: Broadcast[Properties]): DataFrame = {
    /*離線訓(xùn)練好詞向量*/
    import spark.implicits._
    val df = spark.read.format("csv").option("sep", " ").option("header", false).load(config.value.getProperty("entVectorPath"))
      .toDF("id", "ent_name", "vector")
      .select($"id".cast("Long"), $"ent_name", $"vector")
      .withColumn("vector", split($"vector", ","))
      .withColumn("vector", udf((vec: scala.collection.mutable.WrappedArray[String]) => dense(vec.map(_.toDouble).toArray)).apply($"vector"))
    return df
  }

以上主要是將向量字符串轉(zhuǎn)化為Array再轉(zhuǎn)化為org.apache.spark.ml.linalg.Vectors.dense
準(zhǔn)備兩張表虫几,一張表作為idMap映射锤灿,一張表作為算法輸入

val vtScore = loadEntVector(spark, configProperties)
val idMapDF = vtScore.select($"id", $"ent_name")
val modelDF = vtScore.select($"id", $"vector").rdd.map(x => (x.getLong(0), x.get(1).asInstanceOf[org.apache.spark.ml.linalg.Vector]))

下一步定義模型

import com.linkedin.nn.algorithm.CosineSignRandomProjectionNNS

val model = new CosineSignRandomProjectionNNS()
      .setNumHashes(300)
      .setSignatureLength(15)
      .setJoinParallelism(200)
      .setBucketLimit(1000)
      .setShouldSampleBuckets(true)
      .setNumOutputPartitions(100)
      .createModel(16)

主要參數(shù)內(nèi)容如下

  • setNumHashes
  • setSignatureLength
  • setJoinParallelism
  • setBucketLimit
  • setShouldSampleBuckets
  • setNumOutputPartitions
  • createModel

接著訓(xùn)練模型,這里調(diào)用getSelfAllNearestNeighbors方法及輸入數(shù)據(jù)的每一個實體和自身整個集合尋找最相似的TopK辆脸,numCandidates參數(shù)就是K值但校,join過程是將Long id替換為實體名稱

val numCandidates = 50
val nbrs = model.getSelfAllNearestNeighbors(modelDF, numCandidates).toDF("id_a", "id_b", "score")
val res = nbrs
      .join(idMapDF.select($"id".as("id_a"), $"ent_name"), Seq("id_a"), "left_outer")
      .join(idMapDF.select($"id".as("id_b"), $"ent_name".as("ent_recommend")), Seq("id_b"), "left_outer")

最后一步將處理好的TopK使用Spark的collect_list算子全部組合在一起形成一個JSON數(shù)組入庫

val res2 = res.withColumn("recommend", udf((x: String, y: Double) => "[" + x + y.toString + "]").apply($"ent_recommend", $"score"))
      .groupBy($"ent_name").agg(collect_list($"recommend").as("recommend"))

任務(wù)提交的時候?qū)ar包掛在spark2-submit后面,如下

sudo -u hdfs spark2-submit \
--class ...
--master ...
--conf ...
--jars scanns_2.11-1.0.0.jar \
myproject.jar

任務(wù)執(zhí)行相當(dāng)快啡氢,200萬實體16維向量尋找tok50状囱,3分鐘跑完
最后到庫里看一下計算結(jié)果


算法調(diào)參

先對算法參數(shù)做一個說明,此處的參數(shù)相比于Spark ml有不同更加復(fù)雜

  • bucketWidth
    這個參數(shù)是針對歐式距離的LSH所需要的參數(shù)倘是,較大的桶長會降低假陰率亭枷,如果輸入向量是標(biāo)準(zhǔn)化的,1-10倍的bucketWidth應(yīng)該是一個合理的值搀崭。
  • numHashes叨粘,signatureLength
    numHashes就是hash table數(shù)量,數(shù)量越大容錯性越好,可以降低模型的不穩(wěn)定性升敲,但是計算量增大答倡, signatureLength越大,假正率越低驴党,同時也會提高假陰率瘪撇,也就是說這個值越大模型對落入同一個桶的要求越嚴(yán)格,這個值越大召回越少鼻弧,后期需要暴力求解的數(shù)據(jù)量就越少
  • joinParallelism
    聯(lián)接的并行性控制每個聯(lián)接分區(qū)/聯(lián)接任務(wù)將處理多少數(shù)據(jù)设江〗踝拢考慮到數(shù)據(jù)集的大小攘轩,您希望這是一個合理的大小。在聯(lián)接中處理的數(shù)據(jù)集是“分解”數(shù)據(jù)集码俩,因此在原始數(shù)據(jù)集本身很大(例如度帮,數(shù)千萬到數(shù)億項)的情況下,您需要將并行度設(shè)置為相當(dāng)大的值稿存,例如幾萬甚至幾十萬笨篷。spark中的任務(wù)創(chuàng)建和管理會有開銷,所以這個值不是越大越好
  • bucketLimit瓣履,shouldSampleBuckets
    bucket limit對于解決前面提到的bucket歪斜問題至關(guān)重要率翅。當(dāng)一個bucket包含的項目超過此參數(shù)設(shè)置的限制時,可以通過適當(dāng)?shù)卦O(shè)置shouldSampleBucket布爾參數(shù)進(jìn)行選擇袖迎。無論哪種情況冕臭,我們都將丟棄桶中的元素。如果shouldSampleBucket設(shè)置為true燕锥,則將從傳入流中對bucketLimit項目數(shù)進(jìn)行采樣辜贵。如果設(shè)置為false,將保留第一個bucketLimit項目數(shù)归形,而忽略其余項目數(shù)托慨。這里的基本原理是,如果我們錯過了這個桶中的高相似度鄰居暇榴,考慮到它們的高相似度厚棵,它們最終在另一個沒有傾斜的桶中匹配的可能性很大
  • numOutputPartitions
    由于連接的并行性可能很高蔼紧,因此連接操作生成的輸出將具有非常多的分區(qū)窟感,即使其大小不大。設(shè)置numOutputPartitions將連接的輸出重新分區(qū)到給定數(shù)量的分區(qū)中歉井,這樣柿祈,如果用戶試圖將這個輸出RDD寫回文件系統(tǒng),那么它將被拆分成的文件數(shù)是可處理的。
最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
  • 序言:七十年代末躏嚎,一起剝皮案震驚了整個濱河市蜜自,隨后出現(xiàn)的幾起案子,更是在濱河造成了極大的恐慌卢佣,老刑警劉巖重荠,帶你破解...
    沈念sama閱讀 211,265評論 6 490
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件,死亡現(xiàn)場離奇詭異虚茶,居然都是意外死亡戈鲁,警方通過查閱死者的電腦和手機,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 90,078評論 2 385
  • 文/潘曉璐 我一進(jìn)店門嘹叫,熙熙樓的掌柜王于貴愁眉苦臉地迎上來婆殿,“玉大人,你說我怎么就攤上這事罩扇∑怕” “怎么了?”我有些...
    開封第一講書人閱讀 156,852評論 0 347
  • 文/不壞的土叔 我叫張陵喂饥,是天一觀的道長消约。 經(jīng)常有香客問我,道長员帮,這世上最難降的妖魔是什么或粮? 我笑而不...
    開封第一講書人閱讀 56,408評論 1 283
  • 正文 為了忘掉前任,我火速辦了婚禮捞高,結(jié)果婚禮上氯材,老公的妹妹穿的比我還像新娘。我一直安慰自己棠枉,他們只是感情好浓体,可當(dāng)我...
    茶點故事閱讀 65,445評論 5 384
  • 文/花漫 我一把揭開白布。 她就那樣靜靜地躺著辈讶,像睡著了一般命浴。 火紅的嫁衣襯著肌膚如雪。 梳的紋絲不亂的頭發(fā)上贱除,一...
    開封第一講書人閱讀 49,772評論 1 290
  • 那天生闲,我揣著相機與錄音,去河邊找鬼月幌。 笑死碍讯,一個胖子當(dāng)著我的面吹牛,可吹牛的內(nèi)容都是我干的扯躺。 我是一名探鬼主播捉兴,決...
    沈念sama閱讀 38,921評論 3 406
  • 文/蒼蘭香墨 我猛地睜開眼蝎困,長吁一口氣:“原來是場噩夢啊……” “哼!你這毒婦竟也來了倍啥?” 一聲冷哼從身側(cè)響起禾乘,我...
    開封第一講書人閱讀 37,688評論 0 266
  • 序言:老撾萬榮一對情侶失蹤,失蹤者是張志新(化名)和其女友劉穎虽缕,沒想到半個月后始藕,有當(dāng)?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體,經(jīng)...
    沈念sama閱讀 44,130評論 1 303
  • 正文 獨居荒郊野嶺守林人離奇死亡氮趋,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點故事閱讀 36,467評論 2 325
  • 正文 我和宋清朗相戀三年伍派,在試婚紗的時候發(fā)現(xiàn)自己被綠了。 大學(xué)時的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片剩胁。...
    茶點故事閱讀 38,617評論 1 340
  • 序言:一個原本活蹦亂跳的男人離奇死亡诉植,死狀恐怖,靈堂內(nèi)的尸體忽然破棺而出摧冀,到底是詐尸還是另有隱情倍踪,我是刑警寧澤系宫,帶...
    沈念sama閱讀 34,276評論 4 329
  • 正文 年R本政府宣布索昂,位于F島的核電站,受9級特大地震影響扩借,放射性物質(zhì)發(fā)生泄漏椒惨。R本人自食惡果不足惜,卻給世界環(huán)境...
    茶點故事閱讀 39,882評論 3 312
  • 文/蒙蒙 一潮罪、第九天 我趴在偏房一處隱蔽的房頂上張望康谆。 院中可真熱鬧,春花似錦嫉到、人聲如沸沃暗。這莊子的主人今日做“春日...
    開封第一講書人閱讀 30,740評論 0 21
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽孽锥。三九已至,卻和暖如春细层,著一層夾襖步出監(jiān)牢的瞬間惜辑,已是汗流浹背。 一陣腳步聲響...
    開封第一講書人閱讀 31,967評論 1 265
  • 我被黑心中介騙來泰國打工疫赎, 沒想到剛下飛機就差點兒被人妖公主榨干…… 1. 我叫王不留盛撑,地道東北人。 一個月前我還...
    沈念sama閱讀 46,315評論 2 360
  • 正文 我出身青樓捧搞,卻偏偏與公主長得像抵卫,于是被迫代替她去往敵國和親狮荔。 傳聞我的和親對象是個殘疾皇子,可洞房花燭夜當(dāng)晚...
    茶點故事閱讀 43,486評論 2 348

推薦閱讀更多精彩內(nèi)容