python多線程多進程讀取大文件

支持python2.7 3.5 3.6, 運用multiprocessing模塊的Pool 異步進程池言缤,分段讀取文件(文件編碼由chardet自動判斷钉汗,需pip install chardet)碳蛋,并統(tǒng)計詞頻胚膊,代碼如下:

# wordcounter.py
#!/usr/bin/env python
# -*- coding: utf-8 -*-
from __future__ import print_function, division, unicode_literals
import sys, re, time, os
import operator
from collections import Counter
from functools import reduce
from multiprocessing import Pool, cpu_count
from datetime import datetime
from utils import humansize, humantime, processbar

def wrap(wcounter,  fn, p1, p2, f_size):
    return wcounter.count_multi(fn, p1, p2, f_size)
    
class WordCounter(object):
    def __init__(self, from_file, to_file=None, workers=None, coding=None,
                    max_direct_read_size=10000000):
        '''根據(jù)設定的進程數(shù)豺撑,把文件from_file分割成大小基本相同娃循,數(shù)量等同與進程數(shù)的文件段,
        來讀取并統(tǒng)計詞頻窖剑,然后把結果寫入to_file中坚洽,當其為None時直接打印在終端或命令行上。
        Args:
        @from_file 要讀取的文件
        @to_file 結果要寫入的文件
        @workers 進程數(shù)西土,為0時直接把文件一次性讀入內存讶舰;為1時按for line in open(xxx)
                讀取需了;>=2時為多進程分段讀忍纭;默認為根據(jù)文件大小選擇0或cpu數(shù)量的64倍
        @coding 文件的編碼方式肋乍,默認為采用chardet模塊讀取前1萬個字符才自動判斷
        @max_direct_read_size 直接讀取的最大值鹅颊,默認為10000000(約10M)
        
        How to use:
        w = WordCounter('a.txt', 'b.txt')
        w.run()        
        '''
        if not os.path.isfile(from_file):
            raise Exception('No such file: 文件不存在')
        self.f1 = from_file
        self.filesize = os.path.getsize(from_file)
        self.f2 = to_file
        if workers is None:
            if self.filesize < int(max_direct_read_size):
                self.workers = 0
            else:
                self.workers = cpu_count() * 64 
        else:
            self.workers = int(workers)
        if coding is None:
            try:
                import chardet
            except ImportError:
                os.system('pip install chardet')
                print('-'*70)
                import chardet
            with open(from_file, 'rb') as f:    
                coding = chardet.detect(f.read(10000))['encoding']            
        self.coding = coding
        self._c = Counter()
        
    def run(self):
        start = time.time()
        if self.workers == 0:
            self.count_direct(self.f1)
        elif self.workers == 1:
            self.count_single(self.f1, self.filesize)
        else:
            pool = Pool(self.workers)
            res_list = []
            for i in range(self.workers):
                p1 = self.filesize * i // self.workers 
                p2 = self.filesize * (i+1) // self.workers 
                args = [self, self.f1, p1, p2, self.filesize]
                res = pool.apply_async(func=wrap, args=args)
                res_list.append(res)
            pool.close()
            pool.join()
            self._c.update(reduce(operator.add, [r.get() for r in res_list]))            
        if self.f2:
            with open(self.f2, 'wb') as f:
                f.write(self.result.encode(self.coding))
        else:
            print(self.result)
        cost = '{:.1f}'.format(time.time()-start)
        size = humansize(self.filesize)
        tip = '\nFile size: {}. Workers: {}. Cost time: {} seconds'     
        print(tip.format(size, self.workers, cost))
        self.cost = cost + 's'
                
    def count_single(self, from_file, f_size):
        '''單進程讀取文件并統(tǒng)計詞頻'''
        start = time.time()
        with open(from_file, 'rb') as f:
            for line in f:
                self._c.update(self.parse(line))
                processbar(f.tell(), f_size, from_file, f_size, start)   

    def count_direct(self, from_file):
        '''直接把文件內容全部讀進內存并統(tǒng)計詞頻'''
        start = time.time()
        with open(from_file, 'rb') as f:
            line = f.read()
        self._c.update(self.parse(line))  
                
    def count_multi(self, fn, p1, p2, f_size):  
        c = Counter()
        with open(fn, 'rb') as f:    
            if p1:  # 為防止字被截斷的,分段處所在行不處理墓造,從下一行開始正式處理
                f.seek(p1-1)
                while b'\n' not in f.read(1):
                    pass
            start = time.time()
            while 1:                           
                line = f.readline()
                c.update(self.parse(line))   
                pos = f.tell()  
                if p1 == 0: #顯示進度
                    processbar(pos, p2, fn, f_size, start)
                if pos >= p2:               
                    return c      
                    
    def parse(self, line):  #解析讀取的文件流
        return Counter(re.sub(r'\s+','',line.decode(self.coding)))
        
    def flush(self):  #清空統(tǒng)計結果
        self._c = Counter()

    @property
    def counter(self):  #返回統(tǒng)計結果的Counter類       
        return self._c
                    
    @property
    def result(self):  #返回統(tǒng)計結果的字符串型式堪伍,等同于要寫入結果文件的內容
        ss = ['{}: {}'.format(i, j) for i, j in self._c.most_common()]
        return '\n'.join(ss)
        
def main():
    if len(sys.argv) < 2:
        print('Usage: python wordcounter.py from_file to_file')
        exit(1)
    from_file, to_file = sys.argv[1:3]
    args = {'coding' : None, 'workers': None, 'max_direct_read_size':10000000}
    for i in sys.argv:
        for k in args:
            if re.search(r'{}=(.+)'.format(k), i):
                args[k] = re.findall(r'{}=(.+)'.format(k), i)[0]

    w = WordCounter(from_file, to_file, **args)
    w.run()
    
if __name__ == '__main__':
    main()        
# utils.py
#coding=utf-8
from __future__ import print_function, division, unicode_literals
import os
import time

def humansize(size):
    """將文件的大小轉成帶單位的形式
    >>> humansize(1024) == '1 KB'
    True
    >>> humansize(1000) == '1000 B'
    True
    >>> humansize(1024*1024) == '1 M'
    True
    >>> humansize(1024*1024*1024*2) == '2 G'
    True
    """
    units = ['B', 'KB', 'M', 'G', 'T']    
    for unit in units:
        if size < 1024:
            break
        size = size // 1024
    return '{} {}'.format(size, unit)

def humantime(seconds):
    """將秒數(shù)轉成00:00:00的形式
    >>> humantime(3600) == '01:00:00'
    True
    >>> humantime(360) == '06:00'
    True
    >>> humantime(6) == '00:06'
    True
    """
    h = m = ''
    seconds = int(seconds)
    if seconds >= 3600:
        h = '{:02}:'.format(seconds // 3600)
        seconds = seconds % 3600
    if 1 or seconds >= 60:
        m = '{:02}:'.format(seconds // 60)
        seconds = seconds % 60
    return '{}{}{:02}'.format(h, m, seconds)
        
def processbar(pos, p2, fn, f_size, start):
    '''打印進度條
    just like:
    a.txt, 50.00% [=====     ] 1/2 [00:01<00:01]
    '''
    percent = min(pos * 10000 // p2, 10000)
    done = '=' * (percent//1000)
    half = '-' if percent // 100 % 10 > 5 else ''
    tobe = ' ' * (10 - percent//1000 - len(half))
    tip = '{}{}, '.format('\33[?25l', os.path.basename(fn))  #隱藏光標          
    past = time.time()-start
    remain = past/(percent+0.01)*(10000-percent)      
    print('\r{}{:.1f}% [{}{}{}] {:,}/{:,} [{}<{}]'.format(tip, 
            percent/100, done, half, tobe, 
            min(pos*int(f_size//p2+0.5), f_size), f_size, 
            humantime(past), humantime(remain)),
        end='')
    if percent == 10000:
        print('\33[?25h', end='')     # 顯示光標  

if __name__ == '__main__':
    import doctest
    doctest.testmod()

github地址:https://github.com/waketzheng/wordcounter
可以直接:

git clone https://github.com/waketzheng/wordcounter
  • 運行結果:
[willie@localhost linuxtools]$ python wordcounter.py test/var/20000thousandlines.txt tmp2.txt 
20000thousandlines.txt, 100.0% [==========] 115,000,000/115,000,000 [06:57<00:00]
File size: 109 M. Workers: 128. Cost time: 417.8 seconds
最后編輯于
?著作權歸作者所有,轉載或內容合作請聯(lián)系作者
  • 序言:七十年代末,一起剝皮案震驚了整個濱河市觅闽,隨后出現(xiàn)的幾起案子帝雇,更是在濱河造成了極大的恐慌,老刑警劉巖蛉拙,帶你破解...
    沈念sama閱讀 221,820評論 6 515
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件尸闸,死亡現(xiàn)場離奇詭異,居然都是意外死亡,警方通過查閱死者的電腦和手機室叉,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 94,648評論 3 399
  • 文/潘曉璐 我一進店門睹栖,熙熙樓的掌柜王于貴愁眉苦臉地迎上來,“玉大人茧痕,你說我怎么就攤上這事∧粘” “怎么了踪旷?”我有些...
    開封第一講書人閱讀 168,324評論 0 360
  • 文/不壞的土叔 我叫張陵,是天一觀的道長豁辉。 經(jīng)常有香客問我令野,道長,這世上最難降的妖魔是什么徽级? 我笑而不...
    開封第一講書人閱讀 59,714評論 1 297
  • 正文 為了忘掉前任气破,我火速辦了婚禮,結果婚禮上餐抢,老公的妹妹穿的比我還像新娘现使。我一直安慰自己,他們只是感情好旷痕,可當我...
    茶點故事閱讀 68,724評論 6 397
  • 文/花漫 我一把揭開白布碳锈。 她就那樣靜靜地躺著,像睡著了一般欺抗。 火紅的嫁衣襯著肌膚如雪售碳。 梳的紋絲不亂的頭發(fā)上,一...
    開封第一講書人閱讀 52,328評論 1 310
  • 那天绞呈,我揣著相機與錄音贸人,去河邊找鬼。 笑死佃声,一個胖子當著我的面吹牛艺智,可吹牛的內容都是我干的。 我是一名探鬼主播秉溉,決...
    沈念sama閱讀 40,897評論 3 421
  • 文/蒼蘭香墨 我猛地睜開眼力惯,長吁一口氣:“原來是場噩夢啊……” “哼!你這毒婦竟也來了?” 一聲冷哼從身側響起迎瞧,我...
    開封第一講書人閱讀 39,804評論 0 276
  • 序言:老撾萬榮一對情侶失蹤肉迫,失蹤者是張志新(化名)和其女友劉穎,沒想到半個月后甲喝,有當?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體,經(jīng)...
    沈念sama閱讀 46,345評論 1 318
  • 正文 獨居荒郊野嶺守林人離奇死亡铛只,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內容為張勛視角 年9月15日...
    茶點故事閱讀 38,431評論 3 340
  • 正文 我和宋清朗相戀三年埠胖,在試婚紗的時候發(fā)現(xiàn)自己被綠了糠溜。 大學時的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片。...
    茶點故事閱讀 40,561評論 1 352
  • 序言:一個原本活蹦亂跳的男人離奇死亡直撤,死狀恐怖非竿,靈堂內的尸體忽然破棺而出,到底是詐尸還是另有隱情谋竖,我是刑警寧澤红柱,帶...
    沈念sama閱讀 36,238評論 5 350
  • 正文 年R本政府宣布,位于F島的核電站蓖乘,受9級特大地震影響锤悄,放射性物質發(fā)生泄漏。R本人自食惡果不足惜嘉抒,卻給世界環(huán)境...
    茶點故事閱讀 41,928評論 3 334
  • 文/蒙蒙 一零聚、第九天 我趴在偏房一處隱蔽的房頂上張望。 院中可真熱鬧些侍,春花似錦隶症、人聲如沸。這莊子的主人今日做“春日...
    開封第一講書人閱讀 32,417評論 0 24
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽。三九已至狈定,卻和暖如春颂龙,著一層夾襖步出監(jiān)牢的瞬間,已是汗流浹背纽什。 一陣腳步聲響...
    開封第一講書人閱讀 33,528評論 1 272
  • 我被黑心中介騙來泰國打工措嵌, 沒想到剛下飛機就差點兒被人妖公主榨干…… 1. 我叫王不留,地道東北人芦缰。 一個月前我還...
    沈念sama閱讀 48,983評論 3 376
  • 正文 我出身青樓企巢,卻偏偏與公主長得像,于是被迫代替她去往敵國和親让蕾。 傳聞我的和親對象是個殘疾皇子浪规,可洞房花燭夜當晚...
    茶點故事閱讀 45,573評論 2 359

推薦閱讀更多精彩內容

  • # Python 資源大全中文版 我想很多程序員應該記得 GitHub 上有一個 Awesome - XXX 系列...
    aimaile閱讀 26,502評論 6 427
  • 可以看我的博客 lmwen.top 或者訂閱我的公眾號 簡介有稍微接觸python的人就會知道,python中...
    ayuLiao閱讀 3,124評論 1 5
  • Spring Cloud為開發(fā)人員提供了快速構建分布式系統(tǒng)中一些常見模式的工具(例如配置管理探孝,服務發(fā)現(xiàn)笋婿,斷路器,智...
    卡卡羅2017閱讀 134,702評論 18 139
  • python學習筆記 聲明:學習筆記主要是根據(jù)廖雪峰官方網(wǎng)站python學習學習的顿颅,另外根據(jù)自己平時的積累進行修正...
    renyangfar閱讀 3,050評論 0 10
  • 我有一個朋友斩跌,男,父親是某銀行的行長捞慌,母親是公務員耀鸦,家境殷實,從小富養(yǎng)啸澡。 高中的時候揭糕,他到了叛逆期,成績一落千丈锻霎,...
    發(fā)財秘術閱讀 482評論 1 1