全文約2000字膜廊,閱讀大約需要10分鐘
在日常工作中,我們經(jīng)常會(huì)遇到需要處理大量文件并將數(shù)據(jù)存儲(chǔ)至數(shù)據(jù)庫(kù)或整合到一個(gè)文件的需求淫茵。這個(gè)任務(wù)對(duì)于人力和時(shí)間來(lái)說(shuō)都是一大挑戰(zhàn)爪瓜。幸運(yùn)的是,我們有Python這個(gè)神奇的工具匙瘪,可以幫助我們自動(dòng)化這個(gè)任務(wù)铆铆,省時(shí)又省力!現(xiàn)在辆苔,我將向你展示如何使用Python處理Excel文件并將數(shù)據(jù)存儲(chǔ)到PostgreSQL數(shù)據(jù)庫(kù)中算灸。
先打個(gè)底:以理解為主,不夠嚴(yán)謹(jǐn)驻啤,如果看完還是不會(huì),那一定是我講的不夠好荐吵,千萬(wàn)別影響你們探索Python的興趣骑冗。
在我們的奇妙冒險(xiǎn)中,如果你想將多個(gè)excel文件整合到一個(gè)表中先煎,需要滿足一個(gè)前置條件——每個(gè)excel文件的格式和列對(duì)應(yīng)的含義順序必須一致贼涩。但是,如果表頭不一樣也沒(méi)關(guān)系薯蝎,我們可以用程序來(lái)解決這個(gè)問(wèn)題遥倦。本文將帶你進(jìn)入Python的魔法世界,教你如何處理Excel文件并將數(shù)據(jù)存儲(chǔ)到PostgreSQL數(shù)據(jù)庫(kù)中。在開始之前袒哥,我們需要安裝一些神奇的庫(kù):
- pandas:用于處理Excel文件中的數(shù)據(jù)
- sqlalchemy:用于連接和操作PostgreSQL數(shù)據(jù)庫(kù)
安裝方法這里就不再重點(diǎn)講了了缩筛,直接搜網(wǎng)上的教程安裝即可。
1.日志記錄
開局先送送你一串Python日志記錄的代碼堡称,可在任何場(chǎng)景下復(fù)用瞎抛,它能夠?qū)崟r(shí)監(jiān)測(cè)程序的運(yùn)行狀態(tài),輕松解決測(cè)試和問(wèn)題排查的難題却紧。
注意:log_home
需要改為自己本地路徑
# 定義日志記錄器
log_home = '/home/xusl/log/excel' # 請(qǐng)將此路徑改為你自己的本地路徑
log_level = logging.INFO
log_to_console = True
log_config = {
'version': 1,
'formatters': {
'generic': {
'format': '%(asctime)s %(levelname)-5.5s [%(name)s:%(lineno)s][%(threadName)s] %(message)s',
},
'simple': {
'format': '%(asctime)s %(levelname)-5.5s %(message)s',
},
},
'handlers': {
'console': {
'class': 'logging.StreamHandler',
'formatter': 'generic',
},
'file': {
'class': 'logging.FileHandler',
'filename': os.path.join(log_home, 'excel_to_data.log'),
'encoding': 'utf-8',
'formatter': 'generic',
},
},
'root': {
'level': log_level,
'handlers': ['console', 'file', ] if log_to_console else ['file', ],
}
}
logging.config.dictConfig(log_config)
logger = logging.getLogger(__name__)
2.數(shù)據(jù)庫(kù)連接
接下來(lái)桐臊,我們需要配置自己的數(shù)據(jù)庫(kù)信息。
# 建立與PostgreSQL數(shù)據(jù)庫(kù)的連接 此處需要更改為自己的數(shù)據(jù)庫(kù)配置
db_user = 'dps'
db_password = 'DPS888'
db_host = '10.12.8.88'
db_port = '5432'
db_name = 'dpstest'
def get_conn():
conn_url = 'postgresql+psycopg2://{user}:{password}@{host}:{port}/{database}'
engine = create_engine(conn_url.format(database=db_name, user=db_user, password=db_password, host=db_host, port=db_port),
pool_size=20,
pool_recycle=7200,
connect_args={'connect_timeout': 30})
try:
with engine.connect():
logger.info('成功連接到數(shù)據(jù)庫(kù)')
except Exception as e:
logger.error('無(wú)法連接到數(shù)據(jù)庫(kù):', str(e))
return engine
3.設(shè)計(jì)及創(chuàng)建表結(jié)構(gòu)
根據(jù)文件內(nèi)容來(lái)設(shè)計(jì)和創(chuàng)建表結(jié)構(gòu)晓殊,當(dāng)然你也可以用中文
# 創(chuàng)建存儲(chǔ)數(shù)據(jù)的表
table_name = 'public.excel_data'
ddl = """
DROP TABLE IF EXISTS public.excel_data;
CREATE TABLE IF NOT EXISTS public.excel_data (
file_nm VARCHAR(255),
cust_nm VARCHAR(255),
cert_no VARCHAR(255),
prod_nm VARCHAR(255),
amt numeric(20,2),
crt_dtm timestamp NOT NULL DEFAULT now() -- 創(chuàng)建時(shí)間
);
"""
4.處理數(shù)據(jù)
思路如下:
提取文件名
讀取Excel文件數(shù)據(jù)并提取前4列
列名重命名
根據(jù)條件過(guò)濾末尾的空行
將數(shù)據(jù)存儲(chǔ)到PostgreSQL表中
處理成功后將Excel文件移動(dòng)到end目錄
重點(diǎn)講下to_sql()
函數(shù):
name:SQL 表名
con:與數(shù)據(jù)庫(kù)鏈接的?式断凶,推薦使?sqlalchemy的engine類型
schema:相應(yīng)數(shù)據(jù)庫(kù)的引擎,不設(shè)置則使?數(shù)據(jù)庫(kù)的默認(rèn)引擎巫俺,如mysql中的innodb引擎
if_exists:當(dāng)數(shù)據(jù)庫(kù)中已經(jīng)存在數(shù)據(jù)表時(shí)對(duì)數(shù)據(jù)表的操作认烁,有replace替換、append追加识藤,fail則當(dāng)表存在時(shí)提?
index:對(duì)DataFrame的index索引的處理砚著,為True時(shí)索引也將作為數(shù)據(jù)寫?數(shù)據(jù)表
index_label:當(dāng)上?個(gè)參數(shù)index為True時(shí),設(shè)置寫?數(shù)據(jù)表時(shí)index的列名稱
chunsize:設(shè)置整數(shù)痴昧,如20000稽穆,?次寫?數(shù)據(jù)時(shí)的數(shù)據(jù)?數(shù)量,當(dāng)數(shù)據(jù)量很?時(shí)赶撰,需要設(shè)置舌镶,否則會(huì)鏈接超時(shí)寫?失敗。
dtype:列名到 SQL 類型的字典豪娜,默認(rèn)無(wú);可選地指定列的數(shù)據(jù)類型
完整代碼如下:
import os
import pandas as pd
import logging.config
import shutil
import datetime
from sqlalchemy import create_engine
_tb_nm = 'excel_to_data'
_tb_nm_cn = "excel數(shù)據(jù)入庫(kù)"
_service_code = _tb_nm
# 日志目錄
log_home = '/home/xusl/log/excel'
# 日志level
log_level = logging.INFO
# 日志打印到控制臺(tái)
log_to_console = True
# 配置日志記錄器
log_config = {
'version': 1,
'formatters': {
'generic': {
'format': '%(asctime)s %(levelname)-5.5s [%(name)s:%(lineno)s][%(threadName)s] %(message)s',
},
'simple': {
'format': '%(asctime)s %(levelname)-5.5s %(message)s',
},
},
'handlers': {
'console': {
'class': 'logging.StreamHandler',
'formatter': 'generic',
},
'file': {
'class': 'logging.FileHandler',
'filename': os.path.join(log_home, _tb_nm + '.log'),
'encoding': 'utf-8',
'formatter': 'generic',
},
},
'root': {
'level': log_level,
'handlers': ['console', 'file', ] if log_to_console else ['file', ],
}
}
logging.config.dictConfig(log_config)
logger = logging.getLogger(_tb_nm)
# 建立與PostgreSQL數(shù)據(jù)庫(kù)的連接 39數(shù)據(jù)庫(kù)
db_user = 'dps'
db_password = 'DPS888'
db_host = '10.12.8.88'
db_port = '5432'
db_name = 'dpstest'
def get_conn():
conn_url = 'postgresql+psycopg2://{user}:{password}@{host}:{port}/{database}'
engine = create_engine(conn_url.format(database=db_name, user=db_user, password=db_password, host=db_host, port=db_port),
pool_size=20,
pool_recycle=7200,
connect_args={'connect_timeout': 30})
try:
with engine.connect():
print('成功連接到數(shù)據(jù)庫(kù)')
except Exception as e:
print('無(wú)法連接到數(shù)據(jù)庫(kù):', str(e))
return engine
# engine = create_engine(f'postgresql://{db_user}:{db_password}@{db_host}:{db_port}/{db_name}')
# 創(chuàng)建存儲(chǔ)數(shù)據(jù)的表
table_name = 'public.excel_data'
ddl = """
DROP TABLE IF EXISTS public.excel_data;
CREATE TABLE IF NOT EXISTS public.excel_data (
file_nm VARCHAR(255),
cust_nm VARCHAR(255),
cert_no VARCHAR(255),
prod_nm VARCHAR(255),
amt numeric(20,2),
crt_dtm timestamp NOT NULL DEFAULT now() -- 創(chuàng)建時(shí)間
);
"""
# 遍歷指定目錄下的所有Excel文件
excel_dir = '/home/xusl/data'
src_excel = '/home/xusl/data/src'
end_excel = '/home/xusl/data/end'
src_dir = 'src'
end_dir = 'end'
def deal(conn):
for filename in os.listdir(src_excel):
if not filename.endswith('.xlsx'):
logging.info('沒(méi)有excel文件餐胀!')
continue
else:
logging.info('')
logging.info('')
excel_file = os.path.join(src_excel, filename)
# 提取文件名
file_nm = os.path.basename(excel_file)
func_name = file_nm
logging.info('start %s' % func_name)
logging.info(f'Reading data from {excel_file}')
d0 = datetime.datetime.now()
# 讀取Excel文件數(shù)據(jù)并提取前4列
try:
df = pd.read_excel(excel_file, usecols=[0, 1, 2, 3])
logging.info('df讀取內(nèi)容:%s ' % df)
except Exception as e:
logging.error(f'Error reading file {excel_file}: {str(e)}')
continue
# 修改列名
df.columns = ['cust_nm', 'cert_no', 'prod_nm', 'amt']
logging.info('df修改后內(nèi)容:%s ' % df)
# 根據(jù)條件過(guò)濾末尾的空行
if not df.empty and df.iloc[-1].isnull().all():
df = df[:-1]
logging.debug('df刪減末尾后:%s ' % df)
# 將數(shù)據(jù)存儲(chǔ)到PostgreSQL表中
df['file_nm'] = file_nm
df = df[['file_nm', 'cust_nm', 'cert_no', 'prod_nm', 'amt']]
try:
# 將整個(gè)DF導(dǎo)入數(shù)據(jù)庫(kù)中
df.to_sql(name='excel_data', schema='public', con=conn, if_exists="append", index=False)
d1 = datetime.datetime.now()
s = (d1 - d0).total_seconds()
logging.info('... end %s, 耗時(shí): %s 秒. ' % (func_name, s))
except Exception as e:
logging.error(f'Error inserting data from file {excel_file}: {str(e)}')
continue
# 處理成功后將Excel文件移動(dòng)到end目錄
src_file = os.path.join(src_excel, filename)
end_file = os.path.join(end_excel, filename)
try:
shutil.move(src_file, end_file)
except Exception as e:
logging.error(f'Error moving file {src_file} to {end_file}: {str(e)}')
# 關(guān)閉數(shù)據(jù)庫(kù)連接
# engine.dispose()
if __name__ == '__main__':
engine = get_conn()
deal(engine)