使用: 如果想監控目錄，做相應邏輯處理(不想大費周章)，相信這會是你很好的選擇
**需求: 實現 Linux 目錄監控，將新增的文件放入處理引擎中執行，Python 3 以上**
注意事項:
由于文件上傳到指定目錄時，會觸發多重監控事件，需要做邏輯處理
watchdog 簡單demo 使用
# -*- coding: utf-8 -*-
# Purpose: watch a directory for changes and invoke the matching handling logic
from watchdog.observers import Observer
from watchdog.events import *
import time
import sys
import os
import logging
import zipfile
# Force UTF-8 as the default encoding on Python 2 only. `reload` is not a
# builtin on Python 3 and `sys.setdefaultencoding` no longer exists there, so
# running these two calls unconditionally raises NameError on Python 3.
if sys.version_info[0] < 3:
    reload(sys)  # noqa: F821 -- Python 2 builtin
    sys.setdefaultencoding('utf8')

# Create the module logger and let every level through.
logger = logging.getLogger('MonitorDir')
logger.setLevel(logging.DEBUG)

# Handler that writes the log records to a file.
fh = logging.FileHandler('E:/testLog.log')
fh.setLevel(logging.DEBUG)

# Second handler that echoes the same records to the console.
ch = logging.StreamHandler()
ch.setLevel(logging.DEBUG)

# Shared output layout: timestamp, thread id, file name, line, level, message.
formatter = logging.Formatter(
    '[%(asctime)s] [%(thread)d] [%(filename)s] [line: %(lineno)d][%(levelname)s] ## %(message)s')
fh.setFormatter(formatter)
ch.setFormatter(formatter)

# Attach both handlers to the logger.
logger.addHandler(fh)
logger.addHandler(ch)
class FileEventHandler(FileSystemEventHandler):
    """Event handler that writes one log line for every file-system event."""

    def __init__(self):
        super(FileEventHandler, self).__init__()

    def on_moved(self, event):
        """Log the source and destination of a moved file or directory."""
        template = ("directory moved from {0} to {1}" if event.is_directory
                    else "file moved from {0} to {1}")
        logger.info(template.format(event.src_path, event.dest_path))

    def on_created(self, event):
        """Log the path of a newly created file or directory."""
        template = ("directory created:{0}" if event.is_directory
                    else "file created:{0}")
        logger.info(template.format(event.src_path))

    def on_deleted(self, event):
        """Log the path of a deleted file or directory."""
        template = ("directory deleted:{0}" if event.is_directory
                    else "file deleted:{0}")
        logger.info(template.format(event.src_path))

    def on_modified(self, event):
        """Log the path of a modified file, or of a modified sub-directory."""
        template = ("directory modified:{0}" if event.is_directory
                    else "file modified:{0}")
        logger.info(template.format(event.src_path))
if __name__ == "__main__":
    observer = Observer()
    event_handler = FileEventHandler()
    # Directory to watch. A raw string keeps the Windows backslash literal
    # ("\T" is an invalid escape sequence in a normal string literal), and
    # `recursive` is passed by keyword so the call also works where the
    # watchdog API accepts it as keyword-only.
    observer.schedule(event_handler, r"E:\TestMonitor", recursive=True)
    observer.start()
    try:
        # Keep the main thread alive; the observer runs in its own thread.
        while True:
            time.sleep(1)
    except KeyboardInterrupt:
        observer.stop()
    # Wait for the observer thread to finish before exiting.
    observer.join()
通過上面的學習，想必已經對 watchdog 有個簡單的了解，下面進入正題，直接上實際使用中的代碼，也會講解使用中遇到的一些問題。多的不說，正式上代碼開始
# -*- coding: utf-8 -*-
# Purpose: watch a directory for changes and invoke the matching handling logic
from watchdog.observers import Observer
from watchdog.events import *
import time
import sys
import os
import logging
import zipfile
import DiffPlatDeal
import json
import requests
# The Python 2 default-encoding override (reload(sys) followed by
# sys.setdefaultencoding('utf8')) is intentionally not executed here.

# Module logger; DEBUG lets every record through to the handlers.
logger = logging.getLogger('MonitorDir')
logger.setLevel(logging.DEBUG)

# File handler: the date suffix is evaluated once, when the module is loaded.
logTime = time.strftime('%Y%m%d')
fh = logging.FileHandler('/home/liuqing/logData/MonitorDirLog_' + logTime + '.log')

# Console handler that mirrors the same records.
ch = logging.StreamHandler()

# Shared output layout: timestamp, thread id, file name, line, level, message.
formatter = logging.Formatter(
    '[%(asctime)s] [%(thread)d] [%(filename)s] [line: %(lineno)d][%(levelname)s] ## %(message)s')

# Configure both handlers identically and attach them to the logger.
for _handler in (fh, ch):
    _handler.setLevel(logging.DEBUG)
    _handler.setFormatter(formatter)
    logger.addHandler(_handler)
class FileEventHandler(FileSystemEventHandler):
    """Watchdog handler that queues modified files and processes them in batches.

    Move, create and delete events are only logged. ``on_modified`` collects
    file paths in the module-level set ``listFile`` and hands the set to
    ``DiffPlatDeal`` when either the module-level counter ``n`` is above 20
    with 1-3 files queued, or at least 4 files are queued. After each batch a
    text message is posted to the notification webhook.

    The module-level names ``logger``, ``listFile``, ``n`` and ``lastFile``
    must exist before the observer starts dispatching events.
    """

    # Webhook used for the DingTalk notifications (placeholder URL).
    _WEBHOOK_URL = "xxxxxxxx"
    _WEBHOOK_HEADERS = {
        "Content-Type": "application/json",
        "charset": "utf-8"
    }
    # Seconds to wait for the webhook before giving up on a notification.
    _WEBHOOK_TIMEOUT = 10
    # File whose last line is read after a failure to find out which files of
    # the batch were handled.
    # NOTE(review): the read path and the append path differ ("/py/") in the
    # original; both are kept as written -- confirm whether that is intended.
    _PROGRESS_FILE_READ = "/xxxxx/py/readExcelList/list.txt"
    _PROGRESS_FILE_APPEND = '/xxxxx/readExcelList/list.txt'

    def __init__(self):
        FileSystemEventHandler.__init__(self)

    def on_moved(self, event):
        """Log the source and destination of a moved file or directory."""
        if event.is_directory:
            logger.info("directory moved from {0} to {1}".format(event.src_path, event.dest_path))
        else:
            logger.info("file moved from {0} to {1}".format(event.src_path, event.dest_path))

    def on_created(self, event):
        """Log the path of a newly created file or directory."""
        if event.is_directory:
            logger.info("directory created:{0}".format(event.src_path))
        else:
            logger.info("file created:{0}".format(event.src_path))

    def on_deleted(self, event):
        """Log the path of a deleted file or directory."""
        if event.is_directory:
            logger.info("directory deleted:{0}".format(event.src_path))
        else:
            logger.info("file deleted:{0}".format(event.src_path))

    def on_modified(self, event):
        """Queue a modified file and run the batch processing when it is due.

        Directories, empty files, ``tmp``/``swp`` files and a repeat of the
        last queued path are only logged; ``zip`` files are extracted next to
        the archive. Any exception raised while handling the event is logged
        and reported through the failure notification.
        """
        global lastFile
        global n
        sourcePath = event.src_path
        # Modification of a directory inside the watched tree.
        if event.is_directory:
            logger.info("directory modified:{0}".format(sourcePath))
            return
        try:
            size = os.path.getsize(sourcePath)
        except OSError as err:
            # The file can disappear between the event and this call.
            logger.warning("cannot stat {0}: {1}".format(sourcePath, err))
            return
        if size == 0:
            logger.info('文件是空的')
            return

        # Bound before the try block so the failure report can always use it;
        # it used to be set only in the last branch, which made the except
        # handler raise NameError when e.g. zip extraction failed.
        nowTime = time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(time.time()))
        filename, ext = os.path.splitext(sourcePath)
        try:
            # Temporary files and archives are not queued.
            if 'tmp' in ext:
                logger.info('文件是臨時文件志群,不做處理')
            elif 'zip' in ext:
                logger.info('需要先解壓文件')
                # The context manager closes the archive after extraction.
                with zipfile.ZipFile(sourcePath, 'r') as archive:
                    archive.extractall(filename + '/zipData')
            elif 'swp' in ext:
                logger.info('文件是swp臨時文件,不做處理')
            elif lastFile == sourcePath:
                logger.info('處理文件重復的問題:{0}'.format(sourcePath))
            else:
                # '.txt' files are never queued; they still reach the batch
                # checks below, so a '.txt' write can trigger a pending batch.
                if ext != ".txt":
                    lastFile = sourcePath
                    logger.info("file modified:{0}".format(sourcePath))
                    listFile.add(sourcePath)
                    logger.info("list:{0}".format(listFile))
                # Counter expired with nothing queued: just restart it.
                if int(n) > 20 and len(listFile) == 0:
                    n = 0
                # Counter expired with 1-3 files queued: process them now.
                if int(n) > 20 and 0 < len(listFile) < 4:
                    logger.info("n:{0};listFile:{1}".format(n, listFile))
                    self._process_batch(DiffPlatDeal.xxxx, nowTime)
                    n = 0
                # Four or more queued files are processed immediately.
                if len(listFile) >= 4:
                    self._process_batch(DiffPlatDeal.xxxxxx, nowTime)
        except Exception as e:
            logger.error(e)
            self._report_failure(e, nowTime)

    def _process_batch(self, deal, nowTime):
        """Run *deal* on the queued files, post a success message, empty the queue."""
        deal(logger, listFile)
        names = [os.path.basename(path) for path in listFile]
        self._notify("【Success: 處理成功】:【"+nowTime+"】\n"+ "; ####\n".join(names)+"\n 【總條數(shù):】"+str(len(names)))
        listFile.clear()

    def _report_failure(self, error, nowTime):
        """Post which queued files were not handled, then reset the queue."""
        # Kept from the original; presumably gives the processing code time to
        # finish writing the progress file -- TODO confirm.
        time.sleep(5)
        last_line = self._read_last_progress_line()
        parts = last_line.split("::")
        if last_line and len(parts) == 2:
            if "," not in parts[1]:
                logger.error('當前批次文件全部有問題')
                pending = listFile
            else:
                spline = parts[1].split(",")
                logger.info('Except當前批次處理的文件:{0}'.format(spline))
                # Report only the queued files missing from the progress line.
                pending = listFile - set(spline)
            names = [os.path.basename(path) for path in pending]
            self._notify("【Fail: 文件沒有處理】:【"+nowTime+"】\n"+ "; ####\n".join(names)+"\n【錯誤原因】:\n"+str(error)+"\n 【總條數(shù):】"+str(len(names)))
        # Append a newline to the progress file after a failure.
        with open(self._PROGRESS_FILE_APPEND, 'a') as f:
            f.write("\n")
        listFile.clear()

    def _read_last_progress_line(self):
        """Return the stripped last line of the progress file, or '' if none."""
        try:
            with open(self._PROGRESS_FILE_READ, 'r') as f:
                lines = f.readlines()
        except OSError as err:
            logger.error("cannot read {0}: {1}".format(self._PROGRESS_FILE_READ, err))
            return ""
        # An empty file used to raise IndexError on lines[-1].
        return lines[-1].strip() if lines else ""

    def _notify(self, content):
        """Post *content* as a text message to the webhook (best effort)."""
        payload = {
            "msgtype": "text",
            "text": {
                "content": content
            }
        }
        body = json.dumps(payload).encode("utf-8")
        try:
            requests.post(url=self._WEBHOOK_URL, data=body,
                          headers=self._WEBHOOK_HEADERS,
                          timeout=self._WEBHOOK_TIMEOUT)
        except requests.RequestException as err:
            # A failed notification must not abort the queue clean-up.
            logger.error("notification failed: {0}".format(err))
if __name__ == "__main__":
    # Paths collected over a period of time; read and cleared by
    # FileEventHandler.on_modified.
    listFile = set()
    # Seconds counter; the handler resets it after it has acted on it.
    n = 0
    # Path of the most recently queued file.
    lastFile = ""
    observer = Observer()
    event_handler = FileEventHandler()
    # `recursive` is passed by keyword so the call also works where the
    # watchdog API accepts it as keyword-only.
    observer.schedule(event_handler, "/xxxxxx/importData", recursive=True)
    observer.start()
    try:
        while True:
            time.sleep(1)
            n += 1
            # Heartbeat: touch a file in the watched directory so that
            # on_modified runs even when nothing is being uploaded. Firing on
            # every multiple of 25 (instead of only when n is exactly 25)
            # keeps the heartbeat alive if the handler did not reset n, or if
            # its reset was lost to the unsynchronised `n += 1` above.
            if n % 25 == 0:
                with open('/xxxxxxx/importData/test.txt', 'w+') as f:
                    f.write("test")
    except KeyboardInterrupt:
        observer.stop()
    # Wait for the observer thread to finish before exiting.
    observer.join()
代碼講解:
- 作用: 用于監控目錄 observer.schedule(event_handler, "/xxxxxx/importData", True)，程序會將上傳文件寫入 listFile = set() 集合當中，觸發邏輯處理條件是: if int(n) > 20 and len(listFile) > 0 and len(listFile) < 4【大于20秒，并且 set 當中文件數量需要小于4；為什么要用 set？因為文件寫入目錄的過程當中會多次觸發 def on_modified(self, event)，用 set 可以避免文件重復】或者 len(listFile) >= 4【文件數量大于等于4個】；處理執行邏輯 DiffPlatDeal.xxxxxx，根據自己業務需求做變化
- 文件處理的過程中【如果上次傳入文件和這次相同，不做任何操作，也是為了避免文件誤傳、多次觸發】 elif lastFile == sourcePath:
logger.info('處理文件重復的問題:{0}'.format(sourcePath))
- 如果文件處理失敗，會記錄下處理到哪個文件了，然后做出對應釘釘告警信息
異常處理邏輯: except Exception as e: xxxxxx