使用Spark讀取并分析二進(jìn)制文件

需求

客戶希望通過spark來分析二進(jìn)制文件中0和1的數(shù)量以及占比蛀醉。如果要分析的是目錄悬襟,則針對目錄下的每個文件單獨進(jìn)行分析。分析后的結(jié)果保存與被分析文件同名的日志文件中拯刁,內(nèi)容包括0和1字符的數(shù)量與占比脊岳。

要求:如果值換算為二進(jìn)制不足八位,則需要在左側(cè)填充0。

可以在linux下查看二進(jìn)制文件的內(nèi)容割捅。命令:

xxd –b –c 1 filename

-c 1是顯示1列1個字符奶躯,-b是顯示二進(jìn)制

Python版本

代碼

# This Python file uses the following encoding: utf-8

from __future__ import division
import os
import time
import sys
from pyspark import SparkConf, SparkContext

APP_NAME = "Load Bin Files"


def main(spark_context, path):
    file_paths = fetch_files(path)
    for file_path in file_paths:
        outputs = analysis_file_content(spark_context, path + "/" + file_path)
        print_outputs(outputs)
        save_outputs(file_path, outputs)


def fetch_files(path):
    if os.path.isfile(path):
        return [path]
    return os.listdir(path)


def analysis_file_content(spark_context, file_path):
    data = spark_context.binaryRecords(file_path, 1)
    records = data.flatMap(lambda d: list(bin(ord(d)).replace('0b', '').zfill(8)))
    mapped_with_key = records.map(lambda d: ('0', 1) if d == '0' else ('1', 1))
    result = mapped_with_key.reduceByKey(lambda x, y: x + y)

    total = result.map(lambda r: r[1]).sum()
    return result.map(lambda r: format_outputs(r, total)).collect()


def format_outputs(value_with_key, total):
    tu = (value_with_key[0], value_with_key[1], value_with_key[1] / total * 100)
    return "字符{0}的數(shù)量為{1}, 占比為{2:.2f}%".format(*tu)


def print_outputs(outputs):
    for output in outputs:
        print output


def save_outputs(file_path, outputs):
    result_dir = "result"
    if not os.path.exists(result_dir):
        os.mkdir(result_dir)

    output_file_name = "result/" + file_name_with_extension(file_path) + ".output"
    with open(output_file_name, "a") as result_file:
        for output in outputs:
            result_file.write(output + "\n")
        result_file.write("統(tǒng)計于{0}\n\n".format(format_logging_time()))


def format_logging_time():
    return time.strftime('%Y-%m-%d %H:%m:%s', time.localtime(time.time()))


def file_name_with_extension(path):
    last_index = path.rfind("/") + 1
    length = len(path)
    return path[last_index:length]


if __name__ == "__main__":
    conf = SparkConf().setMaster("local[*]")
    conf = conf.setAppName(APP_NAME)
    sc = SparkContext(conf=conf)

    if len(sys.argv) != 2:
        print("請輸入正確的文件或目錄路徑")
    else:
        main(sc, sys.argv[1])

核心邏輯都在analysis_file_content方法中。

運行

python是腳本文件亿驾,無需編譯嘹黔。不過運行的前提是要安裝好pyspark。運行命令為:

./bin/spark-submit /Users/zhangyi/PycharmProjects/spark_binary_files_demo/parse_files_demo.py "files"

遇到的坑

開發(fā)環(huán)境的問題

要在spark下使用python莫瞬,需要事先使用pip安裝pyspark儡蔓。結(jié)果安裝總是失敗。python的第三方庫地址是https://pypi.python.org/simple/疼邀,在國內(nèi)訪問很慢喂江。通過搜索問題,許多文章提到了國內(nèi)的鏡像庫旁振,例如豆瓣的庫开呐,結(jié)果安裝時都提示找不到pyspark。

查看安裝錯誤原因规求,并非不能訪問該庫筐付,僅僅是訪問較慢,下載了不到8%的時候就提示下載失敗阻肿。這實際上是連接超時的原因瓦戚。因而可以修改連接超時值〈运可以在~/.pip/pip.conf下增加:

[global]
timeout = 6000

雖然安裝依然緩慢较解,但至少能保證pyspark安裝完畢。但是在安裝py4j時赴邻,又提示如下錯誤信息(安裝環(huán)境為mac):

OSError: [Errno 1] Operation not permitted: '/System/Library/Frameworks/Python.framework/Versions/2.7/share'

即使這個安裝方式是采用sudo印衔,且在管理員身份下安裝,仍然提示該錯誤姥敛。解決辦法是執(zhí)行如下安裝:

pip install --upgrade pip

sudo pip install numpy --upgrade --ignore-installed

sudo pip install scipy --upgrade --ignore-installed

sudo pip install scikit-learn --upgrade --ignore-installed

然后再重新執(zhí)行sudo pip install pyspark奸焙,安裝正確。

字符編碼的坑

在提示信息以及最后分析的結(jié)果中都包含了中文彤敛。運行代碼時与帆,會提示如下錯誤信息:

SyntaxError: Non-ASCII character '\xe5' in file /Users/zhangyi/PycharmProjects/spark_binary_files_demo/parse_files_demo.py on line 36, but no encoding declared; see http://python.org/dev/peps/pep-0263/ for details

需要在代碼文件的首行添加如下編碼聲明:

# This Python file uses the following encoding: utf-8

SparkConf的坑

初始化SparkContext的代碼如下所示:

conf = SparkConf().setMaster("local[*]")
conf = conf.setAppName(APP_NAME)
sc = SparkContext(conf)

結(jié)果報告運行錯誤:

Error initializing SparkContext.
org.apache.spark.SparkException: Could not parse Master URL: '<pyspark.conf.SparkConf object at 0x106666390>'

根據(jù)錯誤提示,以為是Master的設(shè)置有問題墨榄,實際上是實例化SparkContext有問題玄糟。閱讀代碼,發(fā)現(xiàn)它的構(gòu)造函數(shù)聲明如下所示:

    def __init__(self, master=None, appName=None, sparkHome=None, pyFiles=None,
                environment=None, batchSize=0, serializer=PickleSerializer(), conf=None,
                gateway=None, jsc=None, profiler_cls=BasicProfiler):

而前面的代碼僅僅是簡單的將conf傳遞給SparkContext構(gòu)造函數(shù)袄秩,這就會導(dǎo)致Spark會將conf看做是master參數(shù)的值阵翎,即默認(rèn)為第一個參數(shù)逢并。所以這里要帶名參數(shù):

sc = SparkContext(conf = conf)

sys.argv的坑

我需要在使用spark-submit命令執(zhí)行python腳本文件時,傳入我需要分析的文件路徑郭卫。與scala和java不同筒狠。scala的main函數(shù)參數(shù)argv實際上可以接受命令行傳來的參數(shù)。python不能這樣箱沦,只能使用sys模塊來接收命令行參數(shù)辩恼,即sys.argv

argv是一個list類型谓形,當(dāng)我們通過sys.argv獲取傳遞進(jìn)來的參數(shù)值時灶伊,一定要明白它會默認(rèn)將spark-submit后要執(zhí)行的python腳本文件路徑作為第一個參數(shù),而之后的參數(shù)則放在第二個寒跳。例如命令如下:

./bin/spark-submit /Users/zhangyi/PycharmProjects/spark_binary_files_demo/parse_files_demo.py "files"

則:

  • argv[0]: /Users/zhangyi/PycharmProjects/spark_binary_files_demo/parse_files_demo.py
  • argv[1]: files

因此聘萨,我需要獲得files文件夾名,就應(yīng)該通過argv[1]來獲得童太。

此外米辐,由于argv是一個list,沒有size屬性书释,而應(yīng)該通過len()方法來獲得它的長度翘贮,且期待的長度為2。

整數(shù)參與除法的坑

在python 2.7中爆惧,如果直接對整數(shù)執(zhí)行除法狸页,結(jié)果為去掉小數(shù)。因此4 / 5得到的結(jié)果卻是0扯再。在python 3中芍耘,這種運算會自動轉(zhuǎn)型為浮點型。

要解決這個問題熄阻,最簡單的辦法是導(dǎo)入一個現(xiàn)成的模塊:

from __future__ import division

注意:這個import的聲明應(yīng)該放在所有import聲明前面斋竞。

Scala版本

代碼

package bigdata.demo

import java.io.File
import java.text.SimpleDateFormat
import java.util.Calendar

import com.google.common.io.{Files => GoogleFiles}
import org.apache.commons.io.Charsets
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}

object Main {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("Binary Files").setMaster("local[*]")
    val sc = new SparkContext(conf)

    if (args.size != 1) {
      println("請輸入正確的文件或目錄路徑")
      return
    }

    def analyseFileContent(filePath: String): RDD[String] = {
      val data = sc.binaryRecords(filePath, 1)
      val records = data.flatMap(x => x.flatMap(x => toBinaryStr(byteToShort(x)).toCharArray))
      val mappedWithKey = records.map(i => if (i == '0') ('0', 1L) else ('1', 1L))
      val result = mappedWithKey.reduceByKey(_ + _)

      val sum = result.map(_._2).sum()
      result.map { case (key, count) => formatOutput(key, count, sum)}
    }

    val path = args.head
    val filePaths = fetchFiles(path)
    filePaths.par.foreach { filePath =>
      val outputs = analyseFileContent(filePath)
      printOutputs(outputs)
      saveOutputs(filePath, outputs)
    }
  }

  private def byteToShort(b: Byte): Short =
    if (b < 0) (b + 256).toShort else b.toShort

  private def toBinaryStr(i: Short, digits: Int = 8): String =
    String.format("%" + digits + "s", i.toBinaryString).replace(' ', '0')

  private def printOutputs(outputs: RDD[String]): Unit = {
    outputs.foreach(println)
  }

  private def saveOutputs(filePath: String, outputs: RDD[String]): Unit = {
    val resultDir = new File("result")
    if (!resultDir.exists()) resultDir.mkdir()

    val resultFile = new File("result/" + getFileNameWithExtension(filePath) + ".output")
    outputs.foreach(line => GoogleFiles.append(line + "\n", resultFile, Charsets.UTF_8))
    GoogleFiles.append(s"統(tǒng)計于:${formatLoggingTime()}\n\n", resultFile, Charsets.UTF_8)
  }

  private def formatLoggingTime(): String = {
    val formatter = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss")
    formatter.format(Calendar.getInstance().getTime)
  }

  private def getFileNameWithExtension(filePath: String): String = {
    filePath.substring(filePath.lastIndexOf("/") + 1)
  }

  private def fetchFiles(path: String): List[String] = {
    val fileOrDirectory = new File(path)
    fileOrDirectory.isFile match {
      case true => List(path)
      case false => fileOrDirectory.listFiles().filter(_.isFile).map(_.getPath).toList
    }
  }

  private def formatPercent(number: Double): String = {
    val percent = "%1.2f" format number * 100
    s"${percent}%"
  }

  private def formatOutput(key: Char, count: Long, sum: Double): String = {
    s"字符${key}的數(shù)量為${count}, 占比為${formatPercent(count/sum)}"
  }
}

運行

通過sbt對代碼進(jìn)行編譯、打包后秃殉,生成jar文件坝初。然后在spark主目錄下運行:

$SPARK_HOME/bin/spark-submit --class bigdata.demo.Main --master spark://<ip>  $SPARK_HOME/jars/binaryfilesstastistics_2.11-1.0.jar file:///share/spark-2.2.0-bin-hadoop2.7/derby.log

最后的參數(shù)"file:///share/spark-2.2.0-bin-hadoop2.7/derby.log"就是main函數(shù)接收的參數(shù),即要分析的文件目錄复濒。如果為本地目錄脖卖,需要指定文件協(xié)議file://,如果為HDFS目錄巧颈,則指定協(xié)議hdfs://

遇到的坑

byte類型的值

在Scala中袖扛,Byte類型為8位有符號補碼整數(shù)砸泛。數(shù)值區(qū)間為 -128 到 127十籍。倘若二進(jìn)制值為11111111,通過SparkContext的binaryRecords()方法讀進(jìn)Byte數(shù)據(jù)后唇礁,其值為-1勾栗,而非255。原因就是補碼的緣故盏筐。如果十進(jìn)制為128围俘,轉(zhuǎn)換為Byte類型后,值為-128琢融。

而對于-1界牡,如果執(zhí)行toBinaryString(),則得到的字符串為"11111111111111111111111111111111"漾抬,而非我們期待的"11111111"宿亡。如下圖所示:


執(zhí)行結(jié)果

針對八位的二進(jìn)制數(shù)值,可以編寫一個方法纳令,將Byte類型轉(zhuǎn)為Short類型挽荠,然后再調(diào)用toBinaryString()方法轉(zhuǎn)換為對應(yīng)的二進(jìn)制字符串。

  private def byteToShort(b: Byte): Short =
    if (b < 0) (b + 256).toShort else b.toShort

而對于不足八位的二進(jìn)制數(shù)值平绩,如果直接調(diào)用toBinaryString()方法圈匆,則二進(jìn)制字符串將不到八位∧蟠疲可以利用String的format進(jìn)行格式化:

  private def toBinaryStr(i: Short, digits: Int = 8): String =
    String.format("%" + digits + "s", i.toBinaryString).replace(' ', '0')

當(dāng)然臭脓,可以將這兩個方法定義為Byte與Short的隱式方法。

?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
  • 序言:七十年代末腹忽,一起剝皮案震驚了整個濱河市来累,隨后出現(xiàn)的幾起案子,更是在濱河造成了極大的恐慌窘奏,老刑警劉巖嘹锁,帶你破解...
    沈念sama閱讀 221,198評論 6 514
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件,死亡現(xiàn)場離奇詭異着裹,居然都是意外死亡领猾,警方通過查閱死者的電腦和手機,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 94,334評論 3 398
  • 文/潘曉璐 我一進(jìn)店門骇扇,熙熙樓的掌柜王于貴愁眉苦臉地迎上來摔竿,“玉大人,你說我怎么就攤上這事少孝〖痰停” “怎么了?”我有些...
    開封第一講書人閱讀 167,643評論 0 360
  • 文/不壞的土叔 我叫張陵稍走,是天一觀的道長袁翁。 經(jīng)常有香客問我柴底,道長,這世上最難降的妖魔是什么粱胜? 我笑而不...
    開封第一講書人閱讀 59,495評論 1 296
  • 正文 為了忘掉前任柄驻,我火速辦了婚禮,結(jié)果婚禮上焙压,老公的妹妹穿的比我還像新娘鸿脓。我一直安慰自己,他們只是感情好涯曲,可當(dāng)我...
    茶點故事閱讀 68,502評論 6 397
  • 文/花漫 我一把揭開白布野哭。 她就那樣靜靜地躺著,像睡著了一般掀抹。 火紅的嫁衣襯著肌膚如雪虐拓。 梳的紋絲不亂的頭發(fā)上,一...
    開封第一講書人閱讀 52,156評論 1 308
  • 那天傲武,我揣著相機與錄音蓉驹,去河邊找鬼。 笑死揪利,一個胖子當(dāng)著我的面吹牛态兴,可吹牛的內(nèi)容都是我干的。 我是一名探鬼主播疟位,決...
    沈念sama閱讀 40,743評論 3 421
  • 文/蒼蘭香墨 我猛地睜開眼瞻润,長吁一口氣:“原來是場噩夢啊……” “哼!你這毒婦竟也來了甜刻?” 一聲冷哼從身側(cè)響起绍撞,我...
    開封第一講書人閱讀 39,659評論 0 276
  • 序言:老撾萬榮一對情侶失蹤,失蹤者是張志新(化名)和其女友劉穎得院,沒想到半個月后傻铣,有當(dāng)?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體,經(jīng)...
    沈念sama閱讀 46,200評論 1 319
  • 正文 獨居荒郊野嶺守林人離奇死亡祥绞,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點故事閱讀 38,282評論 3 340
  • 正文 我和宋清朗相戀三年非洲,在試婚紗的時候發(fā)現(xiàn)自己被綠了。 大學(xué)時的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片蜕径。...
    茶點故事閱讀 40,424評論 1 352
  • 序言:一個原本活蹦亂跳的男人離奇死亡两踏,死狀恐怖,靈堂內(nèi)的尸體忽然破棺而出兜喻,到底是詐尸還是另有隱情梦染,我是刑警寧澤,帶...
    沈念sama閱讀 36,107評論 5 349
  • 正文 年R本政府宣布虹统,位于F島的核電站弓坞,受9級特大地震影響隧甚,放射性物質(zhì)發(fā)生泄漏车荔。R本人自食惡果不足惜渡冻,卻給世界環(huán)境...
    茶點故事閱讀 41,789評論 3 333
  • 文/蒙蒙 一、第九天 我趴在偏房一處隱蔽的房頂上張望忧便。 院中可真熱鬧族吻,春花似錦、人聲如沸珠增。這莊子的主人今日做“春日...
    開封第一講書人閱讀 32,264評論 0 23
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽蒂教。三九已至巍举,卻和暖如春,著一層夾襖步出監(jiān)牢的瞬間凝垛,已是汗流浹背懊悯。 一陣腳步聲響...
    開封第一講書人閱讀 33,390評論 1 271
  • 我被黑心中介騙來泰國打工, 沒想到剛下飛機就差點兒被人妖公主榨干…… 1. 我叫王不留梦皮,地道東北人炭分。 一個月前我還...
    沈念sama閱讀 48,798評論 3 376
  • 正文 我出身青樓,卻偏偏與公主長得像剑肯,于是被迫代替她去往敵國和親捧毛。 傳聞我的和親對象是個殘疾皇子,可洞房花燭夜當(dāng)晚...
    茶點故事閱讀 45,435評論 2 359

推薦閱讀更多精彩內(nèi)容