python watchdog 實現(xiàn)文件目錄的監(jiān)控

使用: 如果想監(jiān)控目錄闷叉,做相應邏輯處理(不想大費周章),相信這會是你很好的選擇

**需求: 實現(xiàn)linux 目錄監(jiān)控,將新增的文件放入處理引擎中執(zhí)行, python 3以上 **

注意事項:

由于文件上傳到指定目錄時卫漫,會觸發(fā)多重監(jiān)控事件帅刀,需要做邏輯處理

watchdog 簡單demo 使用

# -*- coding: utf-8 -*-
# 作用: 用于監(jiān)控目錄的變化,調(diào)用對應的處理邏輯

from watchdog.observers import Observer
from watchdog.events import *
import time
import sys
import os
import logging
import zipfile

# 設置系統(tǒng)編碼格式
reload(sys)
sys.setdefaultencoding('utf8')

# 創(chuàng)建一個logger让腹,設置日志
logger = logging.getLogger('MonitorDir')
logger.setLevel(logging.DEBUG)

# 創(chuàng)建一個handler,用于寫入日志文件
fh = logging.FileHandler('E:/testLog.log')
fh.setLevel(logging.DEBUG)

# 再創(chuàng)建一個handler扣溺,用于輸出到控制臺
ch = logging.StreamHandler()
ch.setLevel(logging.DEBUG)

# 定義handler的輸出格式
formatter = logging.Formatter(
    '[%(asctime)s] [%(thread)d] [%(filename)s] [line: %(lineno)d][%(levelname)s] ## %(message)s')
fh.setFormatter(formatter)
ch.setFormatter(formatter)

# 給logger添加handler
logger.addHandler(fh)
logger.addHandler(ch)

class FileEventHandler(FileSystemEventHandler):

    def __init__(self):
        FileSystemEventHandler.__init__(self)

    def on_moved(self, event):
        if event.is_directory:
            logger.info("directory moved from {0} to {1}".format(event.src_path, event.dest_path))
        else:
            logger.info("file moved from {0} to {1}".format(event.src_path, event.dest_path))

    def on_created(self, event):
        if event.is_directory:
            logger.info("directory created:{0}".format(event.src_path))
        else:
            logger.info("file created:{0}".format(event.src_path))

    def on_deleted(self, event):
        if event.is_directory:
            logger.info("directory deleted:{0}".format(event.src_path))
        else:
            logger.info("file deleted:{0}".format(event.src_path))
    # 主要監(jiān)控目錄下有文件修改
    def on_modified(self, event):
        # 監(jiān)控目錄下面的目錄
        if event.is_directory:
            logger.info("directory modified:{0}".format(event.src_path))
        else:
            logger.info("file modified:{0}".format(event.src_path))

if __name__ == "__main__":
    observer = Observer()
    event_handler = FileEventHandler()
    # 監(jiān)控目錄
    observer.schedule(event_handler, "E:\TestMonitor", True)
    observer.start()
    try:
        while True:
            time.sleep(1)
    except KeyboardInterrupt:
        observer.stop()
    observer.join()

通過上面的學習骇窍,想必已經(jīng)對watchdog 有個簡單的了解,下面進入正題锥余,直接上實際使用中的代碼腹纳,也會講解使用中用到的一些問題,多的不說,正式上代碼開始

# -*- coding: utf-8 -*-
# 作用: 用于監(jiān)控目錄的變化,調(diào)用對應的處理邏輯

from watchdog.observers import Observer
from watchdog.events import *
import time
import sys
import os
import logging
import zipfile
import DiffPlatDeal
import json
import requests

# 設置系統(tǒng)編碼格式
# reload(sys)
# sys.setdefaultencoding('utf8')

# 創(chuàng)建一個logger
logger = logging.getLogger('MonitorDir')
logger.setLevel(logging.DEBUG)

# 創(chuàng)建一個handler,用于寫入日志文件
logTime = time.strftime('%Y%m%d', time.localtime(time.time()))
fh = logging.FileHandler('/home/liuqing/logData/MonitorDirLog_' + logTime + '.log')
fh.setLevel(logging.DEBUG)

# 再創(chuàng)建一個handler只估,用于輸出到控制臺
ch = logging.StreamHandler()
ch.setLevel(logging.DEBUG)

# 定義handler的輸出格式
formatter = logging.Formatter(
    '[%(asctime)s] [%(thread)d] [%(filename)s] [line: %(lineno)d][%(levelname)s] ## %(message)s')
fh.setFormatter(formatter)
ch.setFormatter(formatter)

# 給logger添加handler
logger.addHandler(fh)
logger.addHandler(ch)


class FileEventHandler(FileSystemEventHandler):

    def __init__(self):
        FileSystemEventHandler.__init__(self)

    def on_moved(self, event):
        if event.is_directory:
            logger.info("directory moved from {0} to {1}".format(event.src_path, event.dest_path))
        else:
            logger.info("file moved from {0} to {1}".format(event.src_path, event.dest_path))

    def on_created(self, event):
        if event.is_directory:
            logger.info("directory created:{0}".format(event.src_path))
        else:
            logger.info("file created:{0}".format(event.src_path))

    def on_deleted(self, event):
        if event.is_directory:
            logger.info("directory deleted:{0}".format(event.src_path))
        else:
            logger.info("file deleted:{0}".format(event.src_path))

    # 主要監(jiān)控目錄下有文件修改
    def on_modified(self, event):
        sourcePath = event.src_path
        global lastFile
        global n
        # 監(jiān)控目錄下面的目錄
        if event.is_directory:
            logger.info("directory modified:{0}".format(sourcePath))
        else:
            size = os.path.getsize(sourcePath)
            if size == 0:
                logger.info('文件是空的')
            else:
                # 用于釘釘監(jiān)控
                url = "xxxxxxxx"
                header = {
                    "Content-Type": "application/json",
                    "charset": "utf-8"
                }
                try:
                    filename, type = os.path.splitext(sourcePath)
                    # 處理臨時文件以及壓縮文件
                    if 'tmp' in type:
                        logger.info('文件是臨時文件志群,不做處理')
                    elif 'zip' in type:
                        logger.info('需要先解壓文件')
                        f = zipfile.ZipFile(sourcePath, 'r')
                        f.extractall(filename + '/zipData')
                    elif 'swp' in type:
                        logger.info('文件是swp臨時文件,不做處理')
                    elif lastFile == sourcePath:
                        logger.info('處理文件重復的問題:{0}'.format(sourcePath))
                    else:
                        # 在這里面調(diào)用處理邏輯
                        nowTime = time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(time.time()))
                        if type != ".txt":
                            lastFile = sourcePath
                            logger.info("file modified:{0}".format(sourcePath))
                            listFile.add(sourcePath)
                            logger.info("list:{0}".format(listFile))
                        # 時間處理蛔钙,20s 執(zhí)行一次或則list 文件中數(shù)量大于4個直接處理
                        if int(n) > 20 and len(listFile) == 0:
                            n = 0
                        if int(n) > 20 and len(listFile) > 0 and len(listFile) < 4:
                            logger.info("n:{0};listFile:{1}".format(n, listFile))
                            # 調(diào)用處理的方法
                            DiffPlatDeal.xxxx(logger, listFile)
                            sendList = []
                            for file in listFile:
                                sendList.append(os.path.basename(file))

                            dataSucee1 = {
                                "msgtype": "text",
                                "text": {
                                    "content": "【Success: 處理成功】:【"+nowTime+"】\n"+ ";  ####\n".join(sendList)+"\n 【總條數(shù):】"+str(len(sendList))
                                }
                            }
                            sendData1 = json.dumps(dataSucee1).encode("utf-8")
                            requests.post(url=url, data=sendData1, headers=header)
                            n = 0
                            listFile.clear()
                        # 當list中的值大于等于4個后在處理
                        if len(listFile) >= 4:
                            # 調(diào)用處理邏輯然后清空list
                            DiffPlatDeal.xxxxxx(logger, listFile)
                            # 清空list
                            sendList = []
                            for file in listFile:
                                sendList.append(os.path.basename(file))
                            dataSucee = {
                                "msgtype": "text",
                                "text": {
                                    "content": "【Success: 處理成功】:【"+nowTime+"】\n"+  ";  ####\n".join(sendList)+"\n 【總條數(shù):】"+str(len(sendList))
                                }
                            }
                            sendData = json.dumps(dataSucee).encode("utf-8")
                            requests.post(url=url, data=sendData, headers=header)
                            listFile.clear()

                except Exception as e:
                    logger.error(e)
                    time.sleep(5)
                    #  讀取文件锌云,獲取異常文件名發(fā)出通知
                    with open("/xxxxx/py/readExcelList/list.txt", 'r') as f:  # 打開文件
                        lines = f.readlines()  # 讀取所有行
                        last_line = lines[-1].strip()  # 取最后一行
                        # 處理異常
                        if len(last_line) != 0:
                            if len(last_line) != 0 and len(last_line.split("::")) == 2:
                                if "," not in last_line.split("::")[1]:
                                    logger.error('當前批次文件全部有問題')
                                    sendList = []
                                    for file in listFile:
                                        sendList.append(os.path.basename(file))
                                    data = {
                                        "msgtype": "text",
                                        "text": {
                                            "content": "【Fail: 文件沒有處理】:【"+nowTime+"】\n"+  ";  ####\n".join(sendList)+"\n【錯誤原因】:\n"+str(e)+"\n 【總條數(shù):】"+str(len(sendList))
                                        }
                                    }
                                    sendData = json.dumps(data).encode("utf-8")
                                    requests.post(url=url, data=sendData, headers=header)
                                else:
                                    spline = last_line.split("::")[1].split(",")
                                    logger.info('Except當前批次處理的文件:{0}'.format(spline))
                                    tmpset = listFile - set(spline)
                                    sendList = []
                                    for file in tmpset:
                                        sendList.append(os.path.basename(file))
                                    data = {
                                        "msgtype": "text",
                                        "text": {
                                            "content": "【Fail: 文件沒有處理】:【"+nowTime+"】\n"+  ";  ####\n".join(sendList)+"\n【錯誤原因】:\n"+str(e)+"\n 【總條數(shù):】"+str(len(sendList))
                                        }
                                    }
                                    sendData = json.dumps(data).encode("utf-8")
                                    requests.post(url=url, data=sendData, headers=header)
                            # 當文件異常時寫入換行符
                            with open('/xxxxx/readExcelList/list.txt', 'a') as f:
                                f.write("\n")
                    listFile.clear()


if __name__ == "__main__":
    # 定義一個集合,將一段時間內(nèi)的目錄放入這個集合中:
    listFile = set()
    # 定義一個變量吁脱,如果變量達到某個值就執(zhí)行處理操作
    n = 0
    # 用于獲取上次監(jiān)控的文件
    lastFile = ""
    observer = Observer()
    event_handler = FileEventHandler()
    observer.schedule(event_handler, "/xxxxxx/importData", True)
    observer.start()
    try:
        while True:
            time.sleep(1)
            n += 1
            # 為了觸發(fā) def on_modified(self, event) 操作
            if n == 25:
                with open('/xxxxxxx/importData/test.txt', 'w+') as f:
                    f.write("test")
    except KeyboardInterrupt:
        observer.stop()
    observer.join()

代碼講解:

  1. 作用: 用于監(jiān)控目錄 observer.schedule(event_handler, "/xxxxxx/importData", True) , 程序會將上傳文件寫入listFile = set() 集合當中桑涎,觸發(fā)邏輯處理條件是: f int(n) > 20 and len(listFile) > 0 and len(listFile) < 4【大于20秒,并且set當中文件數(shù)量需要小于4兼贡,為什么要用set攻冷,因為文件寫入目錄的過程當作會多次觸發(fā)def on_modified(self, event),避免文件重復】 或則 len(listFile) >= 4【文件數(shù)量大于4個】; 處理執(zhí)行邏輯DiffPlatDeal.xxxxxx遍希,根據(jù)自己業(yè)務需求做變化
  1. 文件處理的過程中【如果上次傳入文件和這次相同等曼,不做任何操作,也是為了避免文件誤傳凿蒜,多次觸發(fā)】 elif lastFile == sourcePath:
    logger.info('處理文件重復的問題:{0}'.format(sourcePath))
  1. 如果文件處理失敗禁谦,會記錄下處理到那個文件了,然后做出對應釘釘告警信息
    異常處理邏輯: except Exception as e: xxxxxx
?著作權歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
  • 序言:七十年代末废封,一起剝皮案震驚了整個濱河市州泊,隨后出現(xiàn)的幾起案子,更是在濱河造成了極大的恐慌漂洋,老刑警劉巖遥皂,帶你破解...
    沈念sama閱讀 216,372評論 6 498
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件,死亡現(xiàn)場離奇詭異刽漂,居然都是意外死亡演训,警方通過查閱死者的電腦和手機,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 92,368評論 3 392
  • 文/潘曉璐 我一進店門爽冕,熙熙樓的掌柜王于貴愁眉苦臉地迎上來仇祭,“玉大人,你說我怎么就攤上這事颈畸∥谄妫” “怎么了?”我有些...
    開封第一講書人閱讀 162,415評論 0 353
  • 文/不壞的土叔 我叫張陵眯娱,是天一觀的道長礁苗。 經(jīng)常有香客問我,道長徙缴,這世上最難降的妖魔是什么试伙? 我笑而不...
    開封第一講書人閱讀 58,157評論 1 292
  • 正文 為了忘掉前任嘁信,我火速辦了婚禮,結(jié)果婚禮上疏叨,老公的妹妹穿的比我還像新娘潘靖。我一直安慰自己,他們只是感情好蚤蔓,可當我...
    茶點故事閱讀 67,171評論 6 388
  • 文/花漫 我一把揭開白布卦溢。 她就那樣靜靜地躺著,像睡著了一般秀又。 火紅的嫁衣襯著肌膚如雪单寂。 梳的紋絲不亂的頭發(fā)上,一...
    開封第一講書人閱讀 51,125評論 1 297
  • 那天吐辙,我揣著相機與錄音宣决,去河邊找鬼。 笑死昏苏,一個胖子當著我的面吹牛尊沸,可吹牛的內(nèi)容都是我干的。 我是一名探鬼主播捷雕,決...
    沈念sama閱讀 40,028評論 3 417
  • 文/蒼蘭香墨 我猛地睜開眼椒丧,長吁一口氣:“原來是場噩夢啊……” “哼!你這毒婦竟也來了救巷?” 一聲冷哼從身側(cè)響起,我...
    開封第一講書人閱讀 38,887評論 0 274
  • 序言:老撾萬榮一對情侶失蹤句柠,失蹤者是張志新(化名)和其女友劉穎浦译,沒想到半個月后,有當?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體溯职,經(jīng)...
    沈念sama閱讀 45,310評論 1 310
  • 正文 獨居荒郊野嶺守林人離奇死亡精盅,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點故事閱讀 37,533評論 2 332
  • 正文 我和宋清朗相戀三年,在試婚紗的時候發(fā)現(xiàn)自己被綠了谜酒。 大學時的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片叹俏。...
    茶點故事閱讀 39,690評論 1 348
  • 序言:一個原本活蹦亂跳的男人離奇死亡,死狀恐怖僻族,靈堂內(nèi)的尸體忽然破棺而出粘驰,到底是詐尸還是另有隱情,我是刑警寧澤述么,帶...
    沈念sama閱讀 35,411評論 5 343
  • 正文 年R本政府宣布蝌数,位于F島的核電站,受9級特大地震影響度秘,放射性物質(zhì)發(fā)生泄漏顶伞。R本人自食惡果不足惜,卻給世界環(huán)境...
    茶點故事閱讀 41,004評論 3 325
  • 文/蒙蒙 一、第九天 我趴在偏房一處隱蔽的房頂上張望唆貌。 院中可真熱鬧滑潘,春花似錦、人聲如沸锨咙。這莊子的主人今日做“春日...
    開封第一講書人閱讀 31,659評論 0 22
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽蓖租。三九已至粱侣,卻和暖如春,著一層夾襖步出監(jiān)牢的瞬間蓖宦,已是汗流浹背齐婴。 一陣腳步聲響...
    開封第一講書人閱讀 32,812評論 1 268
  • 我被黑心中介騙來泰國打工, 沒想到剛下飛機就差點兒被人妖公主榨干…… 1. 我叫王不留稠茂,地道東北人柠偶。 一個月前我還...
    沈念sama閱讀 47,693評論 2 368
  • 正文 我出身青樓,卻偏偏與公主長得像睬关,于是被迫代替她去往敵國和親诱担。 傳聞我的和親對象是個殘疾皇子,可洞房花燭夜當晚...
    茶點故事閱讀 44,577評論 2 353

推薦閱讀更多精彩內(nèi)容