起因
先說(shuō)下為什么要做這個(gè)事。做的圖片站的圖片來(lái)源為很多美女圖片站,自然地疯淫,會(huì)有很多重復(fù)的圖片,而我的目標(biāo)就是要把重復(fù)的圖片找出來(lái)戳玫,剔除掉或者是做其他處理熙掺。
什么樣的圖片屬于相同圖片呢?因?yàn)闀?huì)存在一些有水印的圖片(如下圖)咕宿,或者是略微變形的圖片(如1024 * 720 與1020 * 720的圖片)
phash
phash全稱是感知哈希算法(Perceptual hash algorithm)币绩,使用這玩意兒可以對(duì)每個(gè)圖片生成一個(gè)值蜡秽,如上面兩個(gè)圖分別是2582314446007581403 與 2582314446141799129 (轉(zhuǎn)為二進(jìn)制再比較),然后計(jì)算他們的hamming distance缆镣,簡(jiǎn)單的說(shuō)就是數(shù)一數(shù)二進(jìn)制之后有幾位不同芽突。整個(gè)處理流程有點(diǎn)像對(duì)文章去重時(shí)先算simhash再算hamming distance,很多東西都可以直接套用過(guò)來(lái)董瞻。
phash具體的實(shí)現(xiàn)可以很多地方都有了诉瓦,可以搜到很多差不多的內(nèi)容,在這我也就簡(jiǎn)單的記錄下力细,具體可以去谷歌或者百度搜下。
- 縮小尺寸 為了后邊的步驟計(jì)算簡(jiǎn)單些
- 簡(jiǎn)化色彩 將圖片轉(zhuǎn)化成灰度圖像固额,進(jìn)一步簡(jiǎn)化計(jì)算量
- 計(jì)算DCT 計(jì)算圖片的DCT變換眠蚂,得到32*32的DCT系數(shù)矩陣。
- 縮小DCT 雖然DCT的結(jié)果是32*32大小的矩陣斗躏,但我們只要保留左上角的8*8的矩陣逝慧,這部分呈現(xiàn)了圖片中的最低頻率。
- 計(jì)算平均值 如同均值哈希一樣啄糙,計(jì)算DCT的均值笛臣。
-
計(jì)算hash值 根據(jù)8*8的DCT矩陣,設(shè)置0或1的64位的hash值隧饼,大于等于DCT均值的設(shè)為”1”沈堡,小于DCT均值的設(shè)為“0”。組合在一起燕雁,就構(gòu)成了一個(gè)64位的整數(shù)诞丽,這就是這張圖片的指紋。
python 版本的實(shí)現(xiàn)
# -*- coding: utf-8 -*-
from compiler.ast import flatten
import cv2
import numpy as np
def pHash(imgfile):
# 加載并調(diào)整圖片為32x32灰度圖片
img = cv2.imread(imgfile, 0)
img = cv2.resize(img, (32, 32), interpolation=cv2.INTER_CUBIC)
# 創(chuàng)建二維列表
h, w = img.shape[:2]
vis0 = np.zeros((h, w), np.float32)
vis0[:h, :w] = img # 填充數(shù)據(jù)
# 二維Dct變換
vis1 = cv2.dct(cv2.dct(vis0))
# 拿到左上角的8 * 8
vis1 = vis1[0:8, 0:8]
# 把二維list變成一維list
img_list = flatten(vis1.tolist())
# 計(jì)算均值
avg = sum(img_list) * 1. / len(img_list)
avg_list = ['0' if i < avg else '1' for i in img_list]
# 得到哈希值
return ''.join(['%x' % int(''.join(avg_list[x:x + 4]), 2) for x in range(0, 8 * 8, 4)])
這段代碼是網(wǎng)上找來(lái)做測(cè)試用的拐格,當(dāng)時(shí)有個(gè)坑僧免,他沒有vis1 = vis1[0:8,0:8]
這一步,然后出來(lái)的結(jié)果就很奇葩捏浊,而且指紋長(zhǎng)的可怕(32 * 32位)懂衩,準(zhǔn)確率和召回率都低的驚人。這段代碼也很簡(jiǎn)單金踪,幾乎和白話一樣浊洞,就是把上面phash的流程給翻譯了一遍。
然鵝热康,我并沒有使用上面的python版的沛申,出于兩個(gè)原因,一是我上邊說(shuō)的坑姐军,當(dāng)時(shí)并沒有發(fā)現(xiàn)铁材,二是畢竟是python尖淘,雖說(shuō)大部分計(jì)算的部分是用c寫的(opencv),但還是覺得會(huì)慢著觉。找到的是一個(gè)純c的村生,來(lái)自 phash.org (沒錯(cuò),就是這么官方)饼丘。安裝啥的網(wǎng)站里邊都有趁桃,附上一個(gè)python調(diào)用的腳本。
class pHash(object):
def __init__(self):
self._lib = ctypes.CDLL('/opt/local/lib/libpHash.dylib', use_errno=True)
def dct_imagehash(self, path):
phash = ctypes.c_uint64()
if self._lib.ph_dct_imagehash(path, ctypes.pointer(phash)):
errno_ = ctypes.get_errno()
err, err_msg = (errno.errorcode[errno_], os.strerror(errno_)) \
if errno_ else ('none', 'errno was set to 0')
print(('Failed to get image hash'
' ({!r}): [{}] {}').format(path, err, err_msg), file=sys.stderr)
return None
return phash.value
def hamming_distance(self, hash1, hash2):
return self._lib.ph_hamming_distance(
*map(ctypes.c_uint64, [hash1, hash2]))
非常貼心的還附贈(zèng)了海明距的計(jì)算肄鸽。
因?yàn)槲业膱D片都是存在云端卫病,為了速度更快,我會(huì)直接用云端圖像處理把圖片先縮小典徘,壓縮后再處理蟀苛。我本機(jī)測(cè)試的結(jié)果是一千張圖生成phash耗時(shí)1.5s,相當(dāng)快了逮诲。(有個(gè)很驚悚的發(fā)現(xiàn)帜平,上頭那個(gè)python版本千張耗時(shí)0.7s...驚呆了...可能實(shí)現(xiàn)不太一樣吧...)
大量數(shù)據(jù)hamming distance 計(jì)算
如標(biāo)題所述,較大規(guī)模圖片梅鹦,我這邊的大概是百萬(wàn)級(jí)別裆甩,但是即便是千萬(wàn)級(jí)別應(yīng)該還是差不多的方式,億級(jí)別的數(shù)據(jù)可能我的小破開發(fā)機(jī)就受不了了(沒錯(cuò)...沒用服務(wù)器...)
先說(shuō)說(shuō)海明距齐唆,咱們上邊不是生成了一段64位的數(shù)呢嗤栓?海明距就是數(shù)一數(shù)兩個(gè)hash值有多少位的差異,一般小于5的都算近似箍邮,就是這么簡(jiǎn)單:)
假設(shè)有1000萬(wàn)已經(jīng)處理完的phash值吧抛腕,現(xiàn)在來(lái)了一個(gè)新的phash,如何找出所有可能和他重復(fù)的圖呢媒殉?
最簡(jiǎn)單粗暴的担敌,直接遍歷一次...即遍歷1000萬(wàn)次....那么耗時(shí)大概...不用算了,肯定是個(gè)很夸張的值廷蓉,不靠譜全封。
這邊我采用的是一種內(nèi)存換速度的方式,64位的的hash值桃犬,分為八組刹悴,每組八位。建立八個(gè)dict攒暇,每個(gè)dict代表一組土匀,以每組的值作為key,value是一個(gè)list形用,存放key相同的hash值就轧。查找的時(shí)候证杭,把hash值分成八個(gè),分別在八個(gè)map里邊查找妒御,如果有key相同的解愤,取出key相同的所有hash值進(jìn)行遍歷。
說(shuō)的相當(dāng)?shù)膩y乎莉,下邊是代碼送讲。
split_count = 8 # 每個(gè)64位的phash值分為八段,每段8位
def split(key, split_count):
pre_length = 64 / split_count
return [key[i * pre_length: (i + 1) * pre_length] for i in range(split_count)]
class ImageManager(object):
def __init__(self):
self.phash = pHash() # 就是上面那個(gè)pHash類
self.phash_cache = [defaultdict(list) for i in range(split_count)] #
self.init_phash_map()
def init_phash_map(self):
#我是把所有的phash存在sqlite里邊惋啃,這邊取出所有的Image
for image in Image.select():
self.add_to_image_cache(image)
def add_to_image_cache(self, image):
# 將hash值分割為8段
key_split = split(bin(int(image.phash))[2:].rjust(64, '0'), split_count)
for index, k in enumerate(key_split):
self.phash_cache[index][k].append(image)
def has_same(self, ori_image):
phash = ori_image.phash
key_split = split(bin(int(phash))[2:].rjust(64, '0'), split_count)
result = set()
for index, k in enumerate(key_split):
if k in self.phash_cache[index]:
for image in self.phash_cache[index][k]:
distance = self.distance(int(phash), int(image.phash))
if distance < 5 and ori_image.key != image.key:
result.add(image)
if result:
return True,list(result)
return False,[]