摘要:用商品描述為語(yǔ)料庫(kù)訓(xùn)練商品詞向量為例,分享一下用pyspark自帶word2vec+jieba分詞訓(xùn)練詞向量的流程.
工具:python,pyspark,jieba,pandas,numpy
數(shù)據(jù)格式:自定義詞典,語(yǔ)料庫(kù)均為pyspark dataframe,停用辭典不大,直接使用txt.
1 create spark
我的pyspark參數(shù)設(shè)置如下:
def create_spark():
? ? sparkconf = SparkConf('jianwangzhilai') \
? ? ? ? .setAppName("jianwangzhilai") \
? ? ? ? .set("spark.sql.catalogImplementation","hive") \
? ? ? ? .set("spark.dynamicAllocation.enabled", "false") \
? ? ? ? .set("spark.shuffle.service.enabled", "false") \
? ? ? ? .setExecutorEnv("JAVA_HOME", os.environ["JAVA_HOME"]) \
? ? ? ? .setExecutorEnv("HADOOP_HDFS_HOME", os.environ["HADOOP_HOME"]) \
? ? ? ? .setExecutorEnv("LD_LIBRARY_PATH", os.environ["LD_LIBRARY_PATH"] ) \
? ? ? ? .setExecutorEnv("CLASSPATH", os.environ["CLASSPATH"])
? ? sparkconf.set("spark.executor.instances", '64')) \
? ? ? .set("spark.executor.cores", '8' \
? ? ? .set("spark.cores.max",'512') \
? ? ? .set("spark.executor.memory",'10g') \
? ? ? .set("spark.driver.maxResultSize", "4g")
? ? spark=SparkSession.builder.enableHiveSupport()\
? ? ? ? ? ? .config(conf=sparkconf).getOrCreate()
? ? spark.sparkContext.setLogLevel('WARN')
? ? print('spark created...')
? ? return spark
設(shè)置有點(diǎn)瑣碎,但大同小異,唯一需要注意的是,spark.driver.maxResultSize這個(gè)參數(shù)最好設(shè)置大于1g.
2 自定義詞典,udf
此處自定義詞典直接通過(guò)spark讀取,格式為spark的dataframe,只有一列'word'字段,將其直接轉(zhuǎn)化為pandas數(shù)據(jù)框后,使用jieba.add_word逐條加載.
user_dic=spark.sql("select * from user_dict")
dic=user_dic.toPandas()
for i in np.arange(dic.shape[0]):
????jieba.add_word(dic['word'][i].strip())
從pyspark中import如下部分:
from pyspark.sql.types import StringType,ArrayType
from pyspark.sql.functions import udf
定義udf,把jieba分詞包裝起來(lái),返回一個(gè)pyspark可識(shí)別的arraytype,array中的基元素是stringtype的:
def seg(x):
? ? jieba_seg_generator=jieba.cut(x, cut_all=False)
? ? words = []
? ? for word in jieba_seg_generator:
? ? ? ? if? len(word)>1:
? ? ? ? ? ? words.append(word)
? ? return words
seg_udf = udf(seg, ArrayType(StringType()))
3 語(yǔ)料庫(kù)分詞
商品的語(yǔ)料同樣通過(guò)spark.sql讀取
corpus_goods = spark.sql("select * from corpus_goods_description ").cache()
同樣,格式為spark dataframe,包含一個(gè)主鍵商品id和一個(gè)商品描述description.
利用之前打包的udf,對(duì)商品描述進(jìn)行分詞,生成一個(gè)新列seg:
corpus_goods = corpus_goods.withColumn('seg',seg_udf(corpus_goods['description']))
4 停用詞
停用詞因?yàn)檩^少,這里直接保存成了txt格式,讀取成list:
stop_words=open('./stopwords.txt', 'r', encoding='utf_8').readlines()
stop_words = [line.strip() for line in stop_words]
停用詞去除可以自己寫好,一起打包到之前的udf中,只要在seg函數(shù)中稍作改變:
? ? ? ? if? len(word)>1 and word not in stop_words:
? ? ? ? ? ? words.append(word)
也可以通過(guò)pyspark自帶模塊進(jìn)行去除:
from pyspark.ml.feature import StopWordsRemover
remover = StopWordsRemover(inputCol="seg", outputCol="words", stopWords=stop_words)? ? ? ? ? ? ?
corpus_goods = remover.transform(corpus_goods)
這里推薦后一種方法.去除停用詞后,基本可以進(jìn)行訓(xùn)練了,此時(shí)語(yǔ)料庫(kù)是這個(gè)樣:
5 詞向量訓(xùn)練
語(yǔ)料分詞后,直接進(jìn)行如下訓(xùn)練:
from pyspark.ml.feature import Word2Vec
w2v = Word2Vec(vectorSize=100, minCount=3,seed=123, numPartitions=64,inputCol="words", outputCol="result")
model = w2v.fit(corpus_goods)
model.getVectors().head(2)
model.getVectors().count()
##save
path = "./models/word2vec"
model.write().overwrite().save(path)
訓(xùn)練很簡(jiǎn)單,注意numPartitions參數(shù),這個(gè)參數(shù)默認(rèn)是1,如果使用默認(rèn)參數(shù),等于只有一個(gè)job進(jìn)行fit,如果數(shù)據(jù)很大,這個(gè)過(guò)程將會(huì)非常漫長(zhǎng),這里我設(shè)置成和instances相同的大小,也可以設(shè)置成其他合適的大小,具體看機(jī)器配置.
minCount參數(shù)控制了詞頻,詞頻低于這個(gè)字段的將會(huì)被舍棄.vectorSize控制了向量的大小,一般超過(guò)50.
詞向量訓(xùn)練完成后,得到了每個(gè)詞的向量表示,此時(shí)需要把整個(gè)商品的描述也表示成向量,如果自己實(shí)現(xiàn)也可,但是pyspark直接一行搞定,速度飛快:
corpus_goods = model.transform(corpus_goods)
此時(shí),corpus_goods數(shù)據(jù)框中,result字段就是商品描述的文本向量形式了,大工告成.之后可以進(jìn)行相似度計(jì)算或者作為特征進(jìn)入其他模型.