Python作為一門程序設(shè)計(jì)語言身弊,在易讀熄捍、易維護(hù)方面有獨(dú)特優(yōu)勢(shì)雾狈,越來越多的人使用 Python 進(jìn)行數(shù)據(jù)分析和處理廓潜,而 Pandas 正是為了解決數(shù)據(jù)分析任務(wù)而創(chuàng)建的,其包含大量能便捷處理數(shù)據(jù)的函數(shù)和方法善榛,使得數(shù)據(jù)處理變得容易辩蛋,它也是使 Python 成為強(qiáng)大而高效的數(shù)據(jù)分析環(huán)境的重要因素之一。
但是 Pandas 是個(gè)內(nèi)存的類庫移盆,用于處理小數(shù)據(jù)(能放入內(nèi)存)沒問題悼院,對(duì)于大數(shù)據(jù)(內(nèi)存放不下)就沒有那么方便了。而我們平時(shí)工作中卻能經(jīng)常碰到這種較大的文件(從數(shù)據(jù)庫或網(wǎng)站下載出來的數(shù)據(jù))味滞,Pandas 無能為力樱蛤,我們就只能自己想辦法,本文就來討論這個(gè)問題剑鞍。
本文所說的大數(shù)據(jù),并不是那種 TB爽醋、PB 級(jí)別的需要分布式處理的大數(shù)據(jù)蚁署,而是指普通 PC 機(jī)內(nèi)存放不下,但可以存在硬盤內(nèi)的 GB 級(jí)別的文件數(shù)據(jù)蚂四,這也是很常見的情況光戈。
由于此類文件不可以一次性讀入內(nèi)存哪痰,所以在數(shù)據(jù)處理的時(shí)候,通常需要采用逐行或者分塊讀取的方式進(jìn)行處理久妆,雖然 Python 和 pandas 在讀取文件時(shí)支持這種方式晌杰,但因?yàn)闆]有游標(biāo)系統(tǒng),使得一些函數(shù)和方法需要分段使用或者函數(shù)和方法本身都需要自己寫代碼來完成筷弦,下面我們就最常見的幾類問題來進(jìn)行介紹肋演,并寫出代碼示例供讀者參考和感受。
一烂琴、??? 聚合
簡單聚合只要遍歷一遍數(shù)據(jù)爹殊,按照聚合目標(biāo)將聚合列計(jì)算一遍即可。如:求和(sum)奸绷,遍歷數(shù)據(jù)時(shí)對(duì)讀取的數(shù)據(jù)進(jìn)行累加梗夸;計(jì)數(shù)(count),遍歷數(shù)據(jù)時(shí)号醉,記錄遍歷數(shù)即可反症;平均(mean),遍歷時(shí)同時(shí)記錄累計(jì)和和遍歷數(shù)畔派,最后相除即可铅碍。這里以求和問題為例進(jìn)行介紹。
設(shè)有如下文件父虑,數(shù)據(jù)片段如下:
現(xiàn)在需要計(jì)算銷售總額(amount 列)
(一)逐行讀取
total=0
with open("orders.txt",'r') ? as f:? ? ? ? ? ? ? ? ? ? ? ? ? ?打開文件
??? ? line=f.readline()? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ??標(biāo)題行
??? ? while True:
??????? line = f.readline()? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ?逐行讀入
??????? if not line:? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ?讀不到內(nèi)容時(shí)結(jié)束
??????????? break
??????? total += ? float(line.split("\t")[4])? ? ? ? ? ? ? ? ? ??累加
print(total)
(二)pandas分塊讀取
使用 pandas 可以分塊讀取了该酗,工作邏輯結(jié)構(gòu)如下圖:
import pandas as pd
chunk_data = ? pd.read_csv("orders.txt",sep="\t",chunksize=100000)? ? ? ?/分段讀取文件,每段 10 萬行
total=0
for chunk in chunk_data:
??? ? total+=chunk['amount'].sum()? ? ? ? ? ? ? ? /累加各段的銷售額? ? ? ? ? ? ? ? ? ? ? ??
print(total)
pandas更擅長以大段讀取的方式進(jìn)行計(jì)算士嚎,理論上 chunksize 越大呜魄,計(jì)算速度越快,但要注意內(nèi)存的限制莱衩。如果 chunksize 設(shè)置成 1爵嗅,就成了逐行讀取,速度會(huì)非常非常慢笨蚁,因此不建議使用 pandas 逐行讀取文件來完成此類任務(wù)睹晒。
二、??? 過濾
過濾流程圖:
過濾和聚合差不多括细,將大文件分成 n 段伪很,對(duì)各段進(jìn)行過濾,最后將每一段的結(jié)果進(jìn)行合并即可奋单。
繼續(xù)以上面數(shù)據(jù)為例锉试,過濾出紐約州的銷售信息
(一)小結(jié)果集
(二)大結(jié)果集
大文件聚合和過濾運(yùn)算的邏輯相對(duì)簡單,但因?yàn)?Python 沒有直接提供游標(biāo)數(shù)據(jù)類型览濒,代碼也要寫很多行呆盖。
三拖云、??? 排序
排序流程圖:
排序要麻煩得多,如上圖所示:
1.? 分段讀取數(shù)據(jù)应又;
2.? 對(duì)每一段進(jìn)行排序宙项;
3.? 將每一段的排序結(jié)果寫出至臨時(shí)文件;
4.? 維護(hù)一個(gè) k 個(gè)元素的列表(k 等于分段數(shù))株扛,每個(gè)臨時(shí)文件將一行數(shù)據(jù)放入該列表尤筐;
5.? 將列表中的記錄的按排序的字段的排序 (與第二步的排序方式相同,升序都升序席里,降序都降序)叔磷;
6.? 將列表的最小或最大記錄寫出至結(jié)果文件 (升序時(shí)最小,降序時(shí)最大)奖磁;
7.? 從寫出記錄的臨時(shí)文件中再讀取一行放入列表改基;
8.? 重復(fù) 6.7 步,直至所有記錄寫出至結(jié)果文件咖为。
繼續(xù)以上面數(shù)據(jù)為例秕狰,用 Python 寫一段完整的外存排序算法,將文件中的數(shù)據(jù)按訂單金額升序排序
import pandas as pd
import os
import time
import shutil
import uuid
import traceback
def parse_type(s):
??? ? if s.isdigit():
??????? return int(s)
??? ? try:
??????? res = float(s)
??????? return res
??? ? except:
??????? return s
??? ?
def pos_by(by,head,sep):
??? ? by_num = 0
??? ? for col in head.split(sep):
??????? if col.strip()==by:
??????????? break
??????? else:
??????????? by_num+=1
??? ? return by_num
?
def ? merge_sort(directory,ofile,by,ascending=True,sep=","):
??? ?
with open(ofile,'w') as ? outfile:
???????
??????? file_list = os.listdir(directory)
???????
??????? file_chunk = [open(directory+"/"+file,'r') ? for file in file_list]
??????? k_row = [file_chunk[i].readline()for ? i in range(len(file_chunk))]
??????? by = pos_by(by,k_row[0],sep)
???????
??????? outfile.write(k_row[0])
??? ? k_row = [file_chunk[i].readline()for i in range(len(file_chunk))]
k_by = ? [parse_type(k_row[i].split(sep)[by].strip())for i in range(len(file_chunk))]
?
with open(ofile,'a') as ? outfile:
???????
??????? while True:
??????????? for i in range(len(k_by)):
??????????????? if i >= len(k_by):
??????????????????? break
???????????????
??????????????? sorted_k_by = sorted(k_by) if ? ascending else sorted(k_by,reverse=True)
??????????????? if k_by[i] == sorted_k_by[0]:
??????????????????? outfile.write(k_row[i])
??????????????????? k_row[i] = file_chunk[i].readline()
??????????????????? if not k_row[i]:
??????????????????????? file_chunk[i].close()
??????????????????????? del(file_chunk[i])
??????????????????????? del(k_row[i])
??????????????????????? del(k_by[i])
??????????????????? else:
??????????????????????? k_by[i] = ? parse_type(k_row[i].split(sep)[by].strip())
??????????? if len(k_by)==0:
??????????????? break? ??
def ? external_sort(file_path,by,ofile,tmp_dir,ascending=True,chunksize=50000,sep=',',usecols=None,index_col=None):
os.makedirs(tmp_dir,exist_ok=True)?
??? ? try:
??????? data_chunk = ? pd.read_csv(file_path,sep=sep,usecols=usecols,index_col=index_col,chunksize=chunksize)
??????? for chunk in data_chunk:
??????????? chunk = ? chunk.sort_values(by,ascending=ascending)
??????????? ? chunk.to_csv(tmp_dir+"/"+"chunk"+str(int(time.time()*10**7))+str(uuid.uuid4())+".csv",index=None,sep=sep)
??????? ? merge_sort(tmp_dir,ofile=ofile,by=by,ascending=ascending,sep=sep)
??? ? except Exception:
??????? print(traceback.format_exc())
??? ? finally:
??????? shutil.rmtree(tmp_dir, ? ignore_errors=True)?
if __name__ == "__main__":
??? ? infile = "D:/python_question_data/orders.txt"
??? ? ofile = "D:/python_question_data/extra_sort_res_py.txt"
??? ? tmp = "D:/python_question_data/tmp"
??? ? external_sort(infile,'amount',ofile,tmp,ascending=True,chunksize=1000000,sep='\t')
這里是用逐行歸并寫出的方式完成外存排序的躁染,由于 pandas 逐行讀取的方式效率非常低鸣哀,所以沒有借助 pandas 完成逐行歸并排序。讀者感興趣的話可以嘗試使用 pandas 按塊歸并吞彤,比較下兩者的效率我衬。
相比于聚合和過濾,這個(gè)代碼相當(dāng)復(fù)雜了饰恕,對(duì)于很多非專業(yè)程序員來講已經(jīng)是不太可能實(shí)現(xiàn)的任務(wù)了挠羔,而且它的運(yùn)算效率也不高。
以上代碼也僅處理了規(guī)范的結(jié)構(gòu)化文件和單列排序埋嵌。如果文件結(jié)構(gòu)不規(guī)范比如不帶表頭破加、各行的分隔符數(shù)量不同、排序列是不規(guī)范的日期格式或者按照多列排序等等情況雹嗦,代碼還會(huì)進(jìn)一步復(fù)雜化范舀。
四、??? 分組
大文件的分組匯總也很麻煩了罪,一個(gè)容易想到的辦法是先將文件按分組列排序锭环,然后再遍歷有序文件,如果分組列值和前一行相同則匯總在同一組內(nèi)泊藕,和前一行不同則新建一組繼續(xù)匯總田藐。如果結(jié)果集過大,還要看情況把計(jì)算好的分組結(jié)果及時(shí)寫出吱七。
這個(gè)算法相對(duì)簡單汽久,但性能很差,需要經(jīng)過大排序的過程踊餐。一般數(shù)據(jù)庫會(huì)使用 Hash 分組的方案景醇,能夠有效地提高速度,但代碼復(fù)雜度要高出幾倍吝岭。普通非專業(yè)人員基本上沒有可能寫出來了三痰。這里也就不再列出代碼了。
通過以上介紹窜管,我們知道散劫,Python 處理大文件還是非常費(fèi)勁的,這主要是因?yàn)樗鼪]有提供為大數(shù)據(jù)服務(wù)的游標(biāo)類型及相關(guān)運(yùn)算幕帆,只能自己寫代碼获搏,不僅繁瑣而且運(yùn)算效率低。
Python不方便失乾,那么還有什么工具適合非專業(yè)程序員來處理大文件呢常熙?
esProc SPL在這方面要要比 Python 方便得多,SPL 是專業(yè)的結(jié)構(gòu)化數(shù)據(jù)處理語言碱茁,提供了比 pandas 更豐富的運(yùn)算裸卫,內(nèi)置有游標(biāo)數(shù)據(jù)類型,解決大文件的運(yùn)算就非常簡單纽竣。比如上面這些例子都可以很容易完成墓贿。
一、??? 聚合
二蜓氨、??? 過濾
三聋袋、??? 排序
四、??? 分組
特別指出语盈,SPL 的分組匯總就是采用前面說過的數(shù)據(jù)庫中常用的 HASH 算法舱馅,效率很高。
SPL中還內(nèi)置了并行計(jì)算刀荒,現(xiàn)在多核 CPU 很常見代嗤,使用并行計(jì)算可以大幅度提高性能,比如分組匯總缠借,只多加一個(gè) @m 就可以變成并行計(jì)算干毅。
而 Python 寫并行計(jì)算的程序就太困難了,網(wǎng)上說啥的都有泼返,就是找不到一個(gè)簡單的辦法硝逢。