Spark實現(xiàn)海量新聞文本聚類

背景介紹

在和實驗室導師討論構(gòu)建旅游文本倉庫的時候濒旦,老師的一記操作讓我很吃驚...

wget --mirror some ip

這個操作老師稱此為一鍋端,是將某個網(wǎng)址域名下的所有網(wǎng)址內(nèi)容都遞歸wget到...先不考慮反爬蟲措施塔次,假設真的能夠?qū)⑦@個旅游網(wǎng)站的所有游記文本都順利拿到,這些文本數(shù)據(jù)必定是海量的名秀,在這成千上外的文本數(shù)據(jù)中励负,如何構(gòu)建我們自己需要的文本倉庫,涉及到一級一級嚴謹高效的pipeline,以后有空會把這其中的構(gòu)思寫成博文分享匕得。

在這里继榆,首先第一步,必定是對這海量的文本進行文本聚類的操作汁掠,聚類后的文本略吨,能夠輔助文本倉庫的打標簽工作。之前筆者曾經(jīng)做過python文本聚類分析的若干實驗考阱,感興趣的讀者可以前往文本聚類頁面翠忠。面對海量文本數(shù)據(jù),python多進程似乎可以解決效率的問題羔砾,但考慮到資源分配负间、同步異步偶妖、異常處理等實際操作中會遇到的問題姜凄,憑空造python的分布式輪子耗時過久。此時趾访,Spark工具就進入了筆者的考慮范圍了态秧。

spark

在大數(shù)據(jù)開發(fā)領(lǐng)域,Spark的大名如雷貫耳扼鞋,其RDD(彈性分布式數(shù)據(jù)集)/DataFrame的內(nèi)存數(shù)據(jù)結(jié)構(gòu)申鱼,在機器學習“迭代”算法的場景下,速度明顯優(yōu)于Hadoop磁盤落地的方式云头,此外捐友,Spark豐富的生態(tài)圈也使得使用它為核心能夠構(gòu)建一整套大數(shù)據(jù)開發(fā)系統(tǒng)。

本文將采用Spark溃槐,利用tf-idf作為文本特征匣砖,k-means算法進行聚類,各工具版本信息如下:

Spark   2.0.0
scala   2.11.8
java    1.8
hanlp   1.5.3

實現(xiàn)流程

參考里面的博客所采用的數(shù)據(jù)集是已經(jīng)預處理過的昏滴,每個類別的文件都按照1,2,3這樣的數(shù)據(jù)開頭猴鲫,這里的1,2,3就代表類別1,類別2,類別3.這樣會遇到一個問題,也是該博客實現(xiàn)過程中的一個bug谣殊,類別10的開頭第一個字母也是‘1’拂共,導致類別1的判定是存在爭議的。但為了省事姻几,筆者這里就只用其中的9類文本作為聚類文本宜狐,由已知標簽势告,從而判斷聚類效果。

參考中的博客采用的Spark版本偏老抚恒,為Spark1.6,現(xiàn)在Spark的版本已經(jīng)邁進了2代培慌,很多使用方法都不建議了,比如SQLContext,HiveContext和java2scala的一些數(shù)據(jù)結(jié)構(gòu)轉(zhuǎn)換柑爸。本文立足2.0版本的spark,將其中過時的地方代替吵护,更加適合新手入門上手。

開發(fā)環(huán)境

開發(fā)環(huán)境采用idea+maven(雖然SBT在spark業(yè)界更加流行)

下面是筆者的maven配置表鳍,放在pom.xml文件中:

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>HanLP</groupId>
    <artifactId>myHanLP</artifactId>
    <version>1.0-SNAPSHOT</version>

    <properties>
        <spark.version>2.0.0</spark.version>
        <scala.version>2.11</scala.version>
    </properties>

    <dependencies>
        <!-- scala環(huán)境,有了spark denpendencies后可以省略 -->
        <dependency>
            <groupId>org.scala-lang</groupId>
            <artifactId>scala-library</artifactId>
            <version>2.11.8</version>
        </dependency>
        <dependency>
            <groupId>org.scala-lang</groupId>
            <artifactId>scala-compiler</artifactId>
            <version>2.11.8</version>
        </dependency>
        <dependency>
            <groupId>org.scala-lang</groupId>
            <artifactId>scala-reflect</artifactId>
            <version>2.11.8</version>
        </dependency>
        <!-- 日志框架 -->
        <dependency>
            <groupId>log4j</groupId>
            <artifactId>log4j</artifactId>
            <version>1.2.12</version>
        </dependency>
        <!-- 中文分詞框架 -->
        <dependency>
            <groupId>com.hankcs</groupId>
            <artifactId>hanlp</artifactId>
            <version>portable-1.5.3</version>
        </dependency>
        <!-- Spark dependencies -->
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-core_${scala.version}</artifactId>
            <version>${spark.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-streaming_${scala.version}</artifactId>
            <version>${spark.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-sql_${scala.version}</artifactId>
            <version>${spark.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-hive_${scala.version}</artifactId>
            <version>${spark.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-mllib_${scala.version}</artifactId>
            <version>${spark.version}</version>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <plugin>
                <groupId>org.scala-tools</groupId>
                <artifactId>maven-scala-plugin</artifactId>
                <version>2.15.2</version>
                <executions>
                    <execution>
                        <goals>
                            <goal>compile</goal>
                            <goal>testCompile</goal>
                        </goals>
                    </execution>
                </executions>
            </plugin>
            <plugin>
                <artifactId>maven-compiler-plugin</artifactId>
                <version>3.6.0</version>
                <configuration>
                    <source>1.8</source>
                    <target>1.8</target>
                </configuration>
            </plugin>

            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-surefire-plugin</artifactId>
                <version>2.19</version>
                <configuration>
                    <skip>true</skip>
                </configuration>
            </plugin>
        </plugins>
    </build>


</project>

其中需要注意的有兩個地方馅而,第一個地方是scala.version,不要具體寫到2.11.8,這樣的話是找不到合適的spark依賴的譬圣,直接寫2.11就好瓮恭。第二個地方是maven-scala-plugin,這個地方主要是為了使得項目中java代碼和scala代碼共存的,畢竟它們倆是不一樣的語言厘熟,雖然都能在jvm中跑屯蹦,但編譯器不一樣呀...所以這個地方非常重要.

java目錄功能介紹

java目錄下的文件主要有兩個功能:

  • 測試Hanlp
  • 轉(zhuǎn)換編碼、合并文件

測試hanlp工具绳姨,這是個開源的java版本分詞工具登澜,文件中分別測試了不同的分詞功能。另一個是將所有文件從GBK編碼模式轉(zhuǎn)換成UTF-8飘庄,再將這些小文件寫到一個大文件中脑蠕。轉(zhuǎn)換編碼是為了文件讀取順利不報編碼的錯誤。大文件是為了提高Spark或Hadoop這類工具的效率跪削,這里涉及到它們的一些實現(xiàn)原理谴仙,簡單來說,文件輸入到Spark中還會有分塊碾盐、切片的操作晃跺,大文件在這些操作時,效率更高毫玖。

scala目錄功能介紹

scala目錄下總共有4個子目錄掀虎,分別是用來測試scala編譯運行是否成功,調(diào)用Spark MLlib計算tf-idf孕豹,計算TF-IDF再利用K-means聚類涩盾,工具類。這里的工具類是原博客作者設計的励背,設計的目的是確定Spark是在本地測試春霍,還是在集群上火力全來跑,并且適用于Window系統(tǒng)叶眉。因為我去掉了其封裝的SQLContext(已不建議使用),所以這個工具類在我Linux操作系統(tǒng)下意義也不是很大...

求TF-IDF

求TF-IDF采用SparkSession替代SparkContext,如下:

package test_tfidf

import org.apache.spark.ml.feature.{HashingTF, IDF, Tokenizer}
import org.apache.spark.sql.SparkSession
//import utils.SparkUtils
/**
  *測試Spark MLlib的tf-idf
  * Created by zcy on 18-1-4.
  */
object TFIDFDemo {
  def main(args: Array[String]) {
    val spark_session = SparkSession.builder().appName("tf-idf").master("local[4]").getOrCreate()
    import spark_session.implicits._ // 隱式轉(zhuǎn)換
    val sentenceData = spark_session.createDataFrame(Seq(
      (0, "Hi I heard about Spark"),
      (0, "I wish Java could use case classes"),
      (1, "Logistic regression models are neat")
    )).toDF("label", "sentence")

    // 分詞
    val tokenizer = new Tokenizer().setInputCol("sentence").setOutputCol("words")
    println("wordsData----------------")
    val wordsData = tokenizer.transform(sentenceData)
    wordsData.show(3)
    // 求TF
    println("featurizedData----------------")
    val hashingTF = new HashingTF()
      .setInputCol("words").setOutputCol("rawFeatures").setNumFeatures(2000) // 設置哈希表的桶數(shù)為2000址儒,即特征維度
    val featurizedData = hashingTF.transform(wordsData)
    featurizedData.show(3)
    // 求IDF
    println("recaledData----------------")
    val idf = new IDF().setInputCol("rawFeatures").setOutputCol("features")
    val idfModel = idf.fit(featurizedData)
    val rescaledData = idfModel.transform(featurizedData)
    rescaledData.show(3)
    println("----------------")
    rescaledData.select("features", "label").take(3).foreach(println)
  }
}

上面TF轉(zhuǎn)換特征向量的代碼設置了桶數(shù)芹枷,即特征向量的維度,這里將每個文本用2000個特征向量表示莲趣。

這里有一個非常好的博文鸳慈,詳細的介紹了使用Spark MLlib計算TF-IDF,傳送門在這喧伞,我就不多介紹辣:

Spark 2.1.0 入門:特征抽取 -- TF-IDF

調(diào)用K-means模型

這里和參考博客一樣走芋,參考官網(wǎng)教程即可:

// Trains a k-means model.
println("creating kmeans model ...")
val kmeans = new KMeans().setK(k).setSeed(1L)
val model = kmeans.fit(rescaledData)
// Evaluate clustering by computing Within Set Sum of Squared Errors.
println("calculating wssse ...")
val WSSSE = model.computeCost(rescaledData)
println(s"Within Set Sum of Squared Errors = $WSSSE")
評價方式

假設最終得到的文件和預測結(jié)果如下:

val t = List(
    ("121.txt",0),("122.txt",0),("123.txt",3),("124.txt",0),("125.txt",0),("126.txt",1),
    ("221.txt",3),("222.txt",4),("223.txt",3),("224.txt",3),("225.txt",3),("226.txt",1),
    ("421.txt",4),("422.txt",4),("4.txt",3),("41.txt",3),("43.txt",4),("426.txt",1)

文件名的第一個字符是否和聚類類別一致,統(tǒng)計結(jié)果來判斷潘鲫,是否聚類成功翁逞,最終得到整體的聚類準確率,這里提供demo例子如下:

package test_scala

import org.apache.spark.Partitioner
import utils.SparkUtils
/**
  * Created by zcy on 18-1-4.
  */
object TestPartition {
  def main(args: Array[String]): Unit ={
    val t = List(
      ("121.txt",0),("122.txt",0),("123.txt",3),("124.txt",0),("125.txt",0),("126.txt",1),
      ("221.txt",3),("222.txt",4),("223.txt",3),("224.txt",3),("225.txt",3),("226.txt",1),
      ("421.txt",4),("422.txt",4),("4.txt",3),("41.txt",3),("43.txt",4),("426.txt",1)
    ) // 文檔開頭代表類別,后一個數(shù)字代表預測類型
    val sc = SparkUtils.getSparkContext("test partitioner",true) //本地測試:true

    val data = sc.parallelize(t)
    val file_index = data.map(_._1.charAt(0)).distinct.zipWithIndex().collect().toMap
    println("file_index: " + file_index) // key:begin of txt, value:index
    val partitionData = data.partitionBy(MyPartitioner(file_index))

    val tt = partitionData.mapPartitionsWithIndex((index: Int, it: Iterator[(String,Int)]) => it.toList.map(x => (index,x)).toIterator)
    println("map partitions with index:")
    tt.collect().foreach(println(_)) // like this: (0,(421.txt,4))
    // firstCharInFileName , firstCharInFileName - predictType

    val combined = partitionData.map(x =>( (x._1.charAt(0), Integer.parseInt(x._1.charAt(0)+"") - x._2),1) )
      .mapPartitions{f => var aMap = Map[(Char,Int),Int]();
        for(t <- f){
          if (aMap.contains(t._1)){
            aMap = aMap.updated(t._1,aMap.getOrElse(t._1,0)+1)
          }else{
            aMap = aMap + t
          }
        }
        val aList = aMap.toList
        val total= aList.map(_._2).sum
        val total_right = aList.map(_._2).max
        List((aList.head._1._1,total,total_right)).toIterator
        //       aMap.toIterator //打印各個partition的總結(jié)
      }
    val result = combined.collect()
    println("results: ")
    result.foreach(println(_)) // (4,6,3) 類別4溉仑,總共6個班利,3個正確
    for(re <- result ){
      println("文檔"+re._1+"開頭的 文檔總數(shù):"+ re._2+",分類正確的有:"+re._3+",分類正確率是:"+(re._3*100.0/re._2)+"%")
    }
    val averageRate = result.map(_._3).sum *100.0 / result.map(_._2).sum
    println("平均正確率為:"+averageRate+"%")
    sc.stop()
  }
}

case class MyPartitioner(file_index:Map[Char,Long]) extends Partitioner{
  override def getPartition(key: Any): Int = key match {
    case _ => file_index.getOrElse(key.toString.charAt(0),0L).toInt //將value轉(zhuǎn)換成int
  }
  override def numPartitions: Int = file_index.size
}

結(jié)果展示

最終酥艳,在筆者本地Spark偽集群環(huán)境下,用4個進程模擬4臺主機气忠,輸出結(jié)果如下:

文檔4開頭的 文檔總數(shù):214,分類正確的有:200,分類正確率是:93.45794392523365%
文檔8開頭的 文檔總數(shù):249,分類正確的有:221,分類正確率是:88.75502008032129%
文檔6開頭的 文檔總數(shù):325,分類正確的有:258,分類正確率是:79.38461538461539%
文檔2開頭的 文檔總數(shù):248,分類正確的有:170,分類正確率是:68.54838709677419%
文檔7開頭的 文檔總數(shù):204,分類正確的有:200,分類正確率是:98.03921568627452%
文檔5開頭的 文檔總數(shù):200,分類正確的有:185,分類正確率是:92.5%
文檔9開頭的 文檔總數(shù):505,分類正確的有:504,分類正確率是:99.8019801980198%
文檔3開頭的 文檔總數(shù):220,分類正確的有:114,分類正確率是:51.81818181818182%
文檔1開頭的 文檔總數(shù):450,分類正確的有:448,分類正確率是:99.55555555555556%
平均正確率為:87.95411089866157%

這里已經(jīng)排除了參考博文中類別1與類別11的影響宾尚,還有某一類別中有一個文件開頭不是數(shù)字的尷尬問題..初學者可以直接用我github庫中的data文件夾绣张,參考博客的有一些無傷大雅的小問題雅宾。

從整個運行結(jié)果來看唆迁,正確率還是很高的,值得信賴吩案,但和參考博客比棚赔,某些類別還是不夠準確帝簇,畢竟k-means算法有一定的隨機性徘郭,這種誤差我們還是可以接受的。并且從整體運行時間上來說丧肴,真的非巢腥啵快(估計在十幾秒),這個時間還包括了啟動Spark芋浮,初始化等等過程抱环,和python處理相比,不僅高效纸巷,還更加可靠镇草。強推...

寫了這么多,不知道導師的一鍋端操作搞定沒有...搞定了的話瘤旨,正好練練手梯啤,打成jar包,submit到實驗室的spark集群上去跑跑...

  • 參考資料

Spark應用HanLP對中文語料進行文本挖掘--聚類

詳細代碼見筆者的github:文本聚類Spark版本

××××××××××××××××××××××××××××××××××××××××××

本文屬于筆者(EdwardChou)原創(chuàng)

轉(zhuǎn)載請注明出處

××××××××××××××××××××××××××××××××××××××××××

?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
  • 序言:七十年代末存哲,一起剝皮案震驚了整個濱河市因宇,隨后出現(xiàn)的幾起案子七婴,更是在濱河造成了極大的恐慌,老刑警劉巖察滑,帶你破解...
    沈念sama閱讀 206,378評論 6 481
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件打厘,死亡現(xiàn)場離奇詭異,居然都是意外死亡贺辰,警方通過查閱死者的電腦和手機户盯,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 88,356評論 2 382
  • 文/潘曉璐 我一進店門,熙熙樓的掌柜王于貴愁眉苦臉地迎上來饲化,“玉大人先舷,你說我怎么就攤上這事∽沂蹋” “怎么了蒋川?”我有些...
    開封第一講書人閱讀 152,702評論 0 342
  • 文/不壞的土叔 我叫張陵,是天一觀的道長撩笆。 經(jīng)常有香客問我捺球,道長,這世上最難降的妖魔是什么夕冲? 我笑而不...
    開封第一講書人閱讀 55,259評論 1 279
  • 正文 為了忘掉前任氮兵,我火速辦了婚禮,結(jié)果婚禮上歹鱼,老公的妹妹穿的比我還像新娘泣栈。我一直安慰自己,他們只是感情好弥姻,可當我...
    茶點故事閱讀 64,263評論 5 371
  • 文/花漫 我一把揭開白布南片。 她就那樣靜靜地躺著,像睡著了一般庭敦。 火紅的嫁衣襯著肌膚如雪疼进。 梳的紋絲不亂的頭發(fā)上,一...
    開封第一講書人閱讀 49,036評論 1 285
  • 那天秧廉,我揣著相機與錄音伞广,去河邊找鬼。 笑死疼电,一個胖子當著我的面吹牛嚼锄,可吹牛的內(nèi)容都是我干的。 我是一名探鬼主播蔽豺,決...
    沈念sama閱讀 38,349評論 3 400
  • 文/蒼蘭香墨 我猛地睜開眼区丑,長吁一口氣:“原來是場噩夢啊……” “哼!你這毒婦竟也來了茫虽?” 一聲冷哼從身側(cè)響起刊苍,我...
    開封第一講書人閱讀 36,979評論 0 259
  • 序言:老撾萬榮一對情侶失蹤既们,失蹤者是張志新(化名)和其女友劉穎,沒想到半個月后正什,有當?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體啥纸,經(jīng)...
    沈念sama閱讀 43,469評論 1 300
  • 正文 獨居荒郊野嶺守林人離奇死亡,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點故事閱讀 35,938評論 2 323
  • 正文 我和宋清朗相戀三年婴氮,在試婚紗的時候發(fā)現(xiàn)自己被綠了斯棒。 大學時的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片。...
    茶點故事閱讀 38,059評論 1 333
  • 序言:一個原本活蹦亂跳的男人離奇死亡主经,死狀恐怖荣暮,靈堂內(nèi)的尸體忽然破棺而出,到底是詐尸還是另有隱情罩驻,我是刑警寧澤穗酥,帶...
    沈念sama閱讀 33,703評論 4 323
  • 正文 年R本政府宣布,位于F島的核電站惠遏,受9級特大地震影響砾跃,放射性物質(zhì)發(fā)生泄漏。R本人自食惡果不足惜节吮,卻給世界環(huán)境...
    茶點故事閱讀 39,257評論 3 307
  • 文/蒙蒙 一抽高、第九天 我趴在偏房一處隱蔽的房頂上張望。 院中可真熱鬧透绩,春花似錦翘骂、人聲如沸。這莊子的主人今日做“春日...
    開封第一講書人閱讀 30,262評論 0 19
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽。三九已至志鞍,卻和暖如春瞭亮,著一層夾襖步出監(jiān)牢的瞬間,已是汗流浹背固棚。 一陣腳步聲響...
    開封第一講書人閱讀 31,485評論 1 262
  • 我被黑心中介騙來泰國打工, 沒想到剛下飛機就差點兒被人妖公主榨干…… 1. 我叫王不留仙蚜,地道東北人此洲。 一個月前我還...
    沈念sama閱讀 45,501評論 2 354
  • 正文 我出身青樓,卻偏偏與公主長得像委粉,于是被迫代替她去往敵國和親呜师。 傳聞我的和親對象是個殘疾皇子,可洞房花燭夜當晚...
    茶點故事閱讀 42,792評論 2 345

推薦閱讀更多精彩內(nèi)容