from tkinter import Tk, constants, Scrollbar, Canvas, Label, Button, Frame, Entry, Spinbox, messagebox, StringVar, \
IntVar, BooleanVar, Checkbutton
from tkinter.ttk import Progressbar
from tkinter.filedialog import askdirectory
from uuid import uuid4
from os import path, mkdir, startfile, walk
import sys
import time
import random
from selenium import webdriver
from selenium.webdriver.common.by import By
from selenium.webdriver.edge.options import Options
from threading import Thread, Lock
from concurrent.futures import ThreadPoolExecutor
from urllib.request import urlretrieve
from logging import basicConfig, getLogger
from datetime import datetime
from time import strftime, localtime
from PIL import Image
from PIL.ImageTk import PhotoImage
import imghdr
from requests import post, request
from urllib.parse import quote
from base64 import b64encode
import os
"""
1. 多線程和鎖
2. tkinter
3. selenium
4. 打包exe文件
"""
def clear_constants():
global button_running
global progressbar
global progressbar_percent
global keyword_urls_complete_num
global keyword_progress_text
global logfile_name
global upload_progress_text_dict
global upload_img_complete_num
# 定義全局變量
button_running = None
progressbar = None
progressbar_percent = None
keyword_urls_complete_num = {}
keyword_progress_text = {}
logfile_name = ''
upload_progress_text_dict = {}
upload_img_complete_num = {}
def initGui():
# window.iconbitmap(path_to_icon + '/lighting64.ico')
win_width = 550
win_height = 140
window.geometry(f"{win_width}x{win_height}")
frame = Frame(window, relief=constants.GROOVE, bd=0)
frame.pack(side=constants.TOP, fill=constants.BOTH)
def startDownload():
# 清空頁面
frame.grid_forget()
frame.pack_forget()
startDownloadGui()
# try:
# startDownloadGui()
# except Exception as exception:
# messagebox.showinfo("提示", str(exception))
button_spider = Button(frame, text="爬取圖片", height=20, width=12, anchor='center', relief=constants.GROOVE,
font=('楷體', 20), state=constants.NORMAL, command=startDownload)
button_spider.pack(side=constants.LEFT, fill=constants.BOTH, pady=15, padx=(30, 15))
def startUpload():
# 清空頁面
frame.grid_forget()
frame.pack_forget()
startUploadGui()
# try:
# startUploadGui()
# except Exception as exception:
# messagebox.showinfo("提示", str(exception))
button_upload = Button(frame, text="上傳圖片", height=20, width=12, anchor='center', relief=constants.GROOVE,
font=('楷體', 20), state=constants.NORMAL, command=startUpload)
button_upload.pack(side=constants.RIGHT, fill=constants.BOTH, pady=15, padx=(15, 30))
def startUploadGui():
global win_width
global win_height
global read_path
global username
global password
global upload_progress_text_dict
win_width = 550
win_height = 470
# g_screenwidth = int((window.winfo_screenwidth() - win_width) / 2)
# g_screenheight = int((window.winfo_screenheight() - win_height) / 2)
window.geometry(f"{win_width}x{win_height}")
frame = Frame(window, relief=constants.GROOVE, bd=0)
frame.pack(side=constants.TOP, fill=constants.BOTH, expand=constants.TRUE)
frame2 = Frame(window, relief=constants.GROOVE, bd=0)
frame2.pack(side=constants.BOTTOM, fill=constants.X, expand=constants.TRUE)
# 新建字典娃循,存儲文件夾和文件夾下的圖片
dir_pic_dict = {}
# 添加滾動條
# 為了滾動條創(chuàng)建畫布
canvas = Canvas(frame)
# canvas.grid(column=0, row=0, sticky=constants.NSEW)
canvas.pack(side=constants.LEFT, fill=constants.BOTH, expand=constants.TRUE)
sub_frame = Frame(canvas, bg="white")
# sub_frame.grid(column=1, row=0, sticky=constants.NSEW)
sub_frame.pack(side=constants.TOP, fill=constants.BOTH, expand=constants.TRUE)
# 窗口展示的是frame的左上角
canvas.create_window(0, 0, window=sub_frame, anchor=constants.NW)
# 為最外層的Frame創(chuàng)建滾動條,用于滾動畫布中的元素
scrollV = Scrollbar(frame, orient=constants.VERTICAL, command=canvas.yview)
# scrollV.grid(column=1, row=0, sticky=constants.NSEW)
scrollV.pack(side=constants.RIGHT, fill=constants.Y)
def scroll_bar(event):
canvas.configure(yscrollcommand=scrollV.set, scrollregion=canvas.bbox("all"))
def process_wheel(event):
canvas.yview_scroll(-1 * (round(event.delta / 60)), "units")
# 綁定滾動條
sub_frame.bind("<Configure>", scroll_bar)
# 綁定滾輪事件
sub_frame.bind("<MouseWheel>", process_wheel)
# 第一行
lbl00 = Label(frame2, text='讀取路徑:')
lbl00.grid(row=0, column=0)
var00 = StringVar()
if read_path is not None and read_path != '':
var00.set(read_path)
entry_path = Entry(frame2, width=44, textvariable=var00, state=constants.DISABLED)
entry_path.grid(row=0, column=1, columnspan=4, sticky=constants.W + constants.E)
def select_dir():
global read_path
global all_dir_dict
global upload_select_dict
read_path = askdirectory(initialdir=path.abspath('.'))
var00.set(read_path)
all_dir_dict.clear()
upload_select_dict.clear()
def all_chk():
for value in upload_select_dict.values():
value.set(constants.TRUE)
def non_chk():
for value in upload_select_dict.values():
value.set(constants.FALSE)
def inv_chk():
for value in upload_select_dict.values():
value.set(1 - value.get())
# 顯示選擇按鈕
all_chk_btn = Button(sub_frame, text="全選", width='10', font=("SimSun", 8), command=all_chk, bg='#D2CDCD')
all_chk_btn.grid(row=0, column=0, sticky=constants.W, padx="2")
non_chk_btn = Button(sub_frame, text="全不選", width='10', font=("SimSun", 8), command=non_chk, bg='#D2CDCD')
non_chk_btn.grid(row=0, column=1, sticky=constants.W, padx="2")
inv_btn = Button(sub_frame, text="反選", width='10', font=("SimSun", 8), command=inv_chk, bg='#D2CDCD')
inv_btn.grid(row=0, column=2, sticky=constants.W, padx="2")
# 為了下面排版斗蒋,擴充一個空白單元格
blank_label = Label(sub_frame, width=18, bg='white')
blank_label.grid(row=0, column=3)
# 分兩種情況:1.如果文件夾下還有文件夾捌斧,則將所有文件夾下的文件上傳;2.如果文件夾下沒有文件夾泉沾,則將該文件夾下文件上傳
## 遍歷文件夾下所有的文件
global img_type_list
upload_row_index = 0
for root, dirs, files in walk(read_path):
# 篩掉不符合命名規(guī)范的
if len(path.basename(root).split('-')) != 3:
continue
file_list = []
for file in files:
if imghdr.what(path.join(root, file)) in img_type_list:
file_list.append(file)
if len(file_list) > 0:
all_dir_dict[root] = file_list
# 顯示選擇的文件夾和文件夾下的圖片數(shù)
upload_select_dict[root] = BooleanVar(sub_frame)
upload_select_dict[root].set(constants.TRUE)
chk = Checkbutton(sub_frame, text=root[len(path.dirname(read_path)) + 1:], var=upload_select_dict[root],
bg="white")
chk.grid(row=upload_row_index + 2, column=0, columnspan=10, sticky=constants.W, padx=(10, 10))
# 用于填充白色
Label(sub_frame, bg="white").grid(row=upload_row_index + 2, column=10, sticky=constants.EW,
ipadx=win_width)
# 顯示進度文本
progress_text = f'0/{len(file_list)}'
progress_label = Label(sub_frame, text=progress_text, width=12, anchor=constants.CENTER, fg='black',
bg='white')
progress_label.grid(row=upload_row_index + 2, column=5)
upload_progress_text_dict[root] = progress_label
upload_row_index += 1
button = Button(frame2, text='選擇路徑', command=select_dir)
button.grid(row=0, column=5, sticky=constants.E, padx=(3, 1))
# 第二行
lbl0 = Label(frame2, text='Key:')
lbl0.grid(row=1, column=0)
var10 = StringVar()
var10.set('')
if username is not None:
var10.set(username)
entry_username = Entry(frame2, width=30, textvariable=var10)
entry_username.grid(row=1, column=1, columnspan=4, sticky=constants.W + constants.E)
entry_username.focus()
# 第三行
lbl0 = Label(frame2, text='Secret:')
lbl0.grid(row=2, column=0)
var20 = StringVar()
var20.set('')
if password is not None:
var20.set(password)
entry_password = Entry(frame2, width=30, textvariable=var20, show='*')
entry_password.grid(row=2, column=1, columnspan=4, sticky=constants.W + constants.E)
entry_password.focus()
def eye_transfor():
if entry_password['show'] == '*':
entry_password['show'] = ''
button_eye['image'] = img_eye_close
else:
entry_password['show'] = '*'
button_eye['image'] = img_eye_open
button_eye = Button(frame2, image=img_eye_open, command=eye_transfor)
button_eye.grid(row=2, column=5, sticky=constants.W)
# 第四行
def click_run():
global read_path
global username
global password
read_path = entry_path.get()
username = entry_username.get()
password = entry_password.get()
# 對參數(shù)進行校驗
# if not path.exists(read_path):
# messagebox.showinfo("提示", "該路徑不存在!")
# return
if username == '':
messagebox.showinfo("提示", "請輸入key!")
return
if password == '':
messagebox.showinfo("提示", "請輸入secret!")
return
"""
使用 AK捞蚂,SK 生成鑒權簽名(Access Token),返回 access_token,或是None(如果錯誤)
"""
global access_token
url = "https://aip.baidubce.com/oauth/2.0/token"
params = {"grant_type": "client_credentials", "client_id": username, "client_secret": password}
try:
access_token = str(post(url, params=params).json().get("access_token"))
except Exception as e:
logger.error(repr(e))
messagebox.showinfo("異常", repr(e))
if access_token == 'None':
messagebox.showinfo("提示", "獲取用戶token異常")
return
if not path.exists(read_path):
messagebox.showinfo("提示", "該路徑不存在")
return
# 獲取勾選上傳的文件
upload_chosen_dict = {}
for root, value in upload_select_dict.items():
if not value.get():
continue
upload_chosen_dict[root] = all_dir_dict[root]
if len(upload_chosen_dict) == 0:
messagebox.showinfo("提示", "不存在上傳文件")
return
# 先清理再跳轉
for widget in sub_frame.winfo_children():
widget.destroy()
frame2.grid_forget()
frame2.pack_forget()
for widget in frame2.winfo_children():
widget.destroy()
executeUploadGui(frame, sub_frame, upload_chosen_dict)
button_run = Button(frame2, text='開始', command=click_run)
button_run.grid(row=3, column=2, columnspan=4, sticky=constants.W + constants.E, pady=10)
def click_back():
# 先清理再跳轉
frame.grid_forget()
frame.pack_forget()
frame2.grid_forget()
frame2.pack_forget()
clear_constants()
initGui()
button_back = Button(frame2, text='返回', command=click_back)
button_back.grid(row=3, column=0, columnspan=2, sticky=constants.W + constants.E, pady=10, padx=5)
def executeUploadGui(frame, sub_frame, upload_chosen_dict):
global read_path
# 修改UI
frame3 = Frame(window, relief=constants.GROOVE, bd=0)
frame3.pack(side=constants.BOTTOM, fill=constants.X, expand=constants.TRUE)
upload_index = 0
for root, img_list in upload_chosen_dict.items():
name_label = Label(sub_frame, text=root[len(path.dirname(read_path)) + 1:], width=45, anchor=constants.W,
compound=constants.LEFT, bg="white")
name_label.grid(column=0, columnspan=3, row=upload_index, sticky=constants.W)
# 初始化 - 修改GUI界面
progress_text = f'0/{len(img_list)}'
progress_label = Label(sub_frame, text=progress_text, width=12, anchor=constants.CENTER, fg='black', bg='white')
progress_label.grid(column=4, row=upload_index)
upload_progress_text_dict[root] = progress_label
# 用于填充白色
Label(sub_frame, bg="white").grid(row=upload_index, column=10, sticky=constants.EW, ipadx=win_width)
upload_index += 1
# 設置進度條
global progressbar
global progressbar_percent
progressbar_percent = Label(frame3, text="0%", font=("Arial Bold", 10))
progressbar_percent["text"] = "0%"
progressbar_percent.pack(side=constants.TOP, pady=(0, 0))
# progressbar = Progressbar(window, length=200, mode="determinate", maximum=100, name="完成進度",
# orient=constants.HORIZONTAL, value=0, variable=0)
progressbar = Progressbar(frame3, length=200, mode="determinate", maximum=100, name="完成進度",
orient=constants.HORIZONTAL, value=0, variable=0)
progressbar.pack(side=constants.TOP, fill=constants.X, pady=(0, 12), padx=5)
progressbar["value"] = 0
progressbar_percent.pack(side=constants.TOP, fill=constants.X, pady=(0, 12))
progressbar_percent["text"] = "0%"
progressbar_percent.pack(side=constants.TOP, pady=(12, 0))
# 日志按鈕
global logfile_name
logfile_name = f'Upload_{int(time.time())}.log'
def open_log():
startfile(path.join(read_path, logfile_name))
# button_log = Button(window, text='查看日志', width=15, state=constants.NORMAL, command=open_log)
button_log = Button(frame3, text='查看日志', state=constants.NORMAL, width=25, command=open_log)
button_log.pack(side=constants.LEFT, fill=constants.X, anchor=constants.CENTER, padx=5)
# 運行按鈕
def jump_init():
# 先清理再跳轉
frame.pack_forget()
frame3.pack_forget()
frame.grid_forget()
frame3.grid_forget()
clear_constants()
startUploadGui()
global button_running
# button_running = Button(window, text='運行中...', width=15, state=constants.NORMAL)
button_running = Button(frame3, text='運行中...', state=constants.NORMAL, width=25, command=jump_init)
button_running["state"] = constants.DISABLED
button_running.pack(side=constants.RIGHT, fill=constants.X, anchor=constants.CENTER, padx=5)
args = [access_token, upload_chosen_dict]
th = Thread(target=executeUpload, args=(args))
th.setDaemon(True)
th.start()
def executeUpload(access_token, upload_chosen_dict):
# access_token = args[0]
# upload_chosen_dict = args[1]
global lock
global upload_img_complete_num
global upload_progress_text_dict
# tags = "大類ID,小類ID" brief = {"name": "食材名稱", "id": UUID}
url = "https://aip.baidubce.com/rest/2.0/image-classify/v1/realtime_search/similar/add?access_token=" + access_token
with open(path.join(read_path, logfile_name), 'w', encoding='utf-8') as log:
log.buffer.write(f'任務開始 -- {strftime("%Y-%m-%d %H:%M:%S", localtime())} \n'.encode())
log.buffer.write(f'上傳文件夾為: [{upload_chosen_dict.keys()}] \n'.encode())
# 選擇文件夾
def uploadImg():
total_task_num = 0
for value in upload_chosen_dict.values():
total_task_num += len(value)
for root, img_list in upload_chosen_dict.items():
# 開始執(zhí)行 - 進度文字改為橙色
global upload_progress_text_dict
progress_label = upload_progress_text_dict.get(root)
progress_label['fg'] = 'orange'
fclass, sclass, name = path.basename(root).split('-')
for img_name in img_list:
try:
img = open(path.join(root, img_name), "rb")
image = str(b64encode(img.read()), 'utf-8')
# brief = '{"name":"' + name + '","id":"' + img_name.split('.')[0] + '"}'
brief = f'{name}'
tags = f'{fclass},{sclass}'
payload = f'image={quote(image)}&brief={quote(brief)}&tags={quote(tags)}'
headers = {'Content-Type': 'application/x-www-form-urlencoded'}
response = request("POST", url, headers=headers, data=payload)
if response.status_code != 200:
with open(path.join(read_path, logfile_name), 'a', encoding='utf-8') as log:
log.buffer.write(
f'Error: 上傳圖片失敗 [{path.join(root, img_name)}] 狀態(tài)碼[{response.status_code}] 原因[{response.reason}] \n'.encode())
progress_label = upload_progress_text_dict.get(root)
progress_label['fg'] = 'red'
else:
(success_num, complete_num) = upload_img_complete_num.get(root, (0, 0))
success_num = success_num + 1
with open(path.join(read_path, logfile_name), 'a', encoding='utf-8') as log:
log.buffer.write(
f'Success: 上傳圖片成功 [{path.join(root, img_name)}] 狀態(tài)碼[{response.status_code}] 原因[{response.reason}] \n'.encode())
progress_label = upload_progress_text_dict.get(root)
except Exception as exception:
logger.error(f"Error: {exception}")
with open(path.join(read_path, logfile_name), 'a', encoding='utf-8') as log:
log.buffer.write(f'上傳圖片失敗 [{path.join(root, img_name)}] 原因[{repr(exception)}] \n'.encode())
progress_label = upload_progress_text_dict.get(root)
progress_label['fg'] = 'red'
(success_num, complete_num) = upload_img_complete_num.get(root, (0, 0))
complete_num += 1
upload_img_complete_num[root] = (success_num, complete_num)
progress_label['text'] = f'{success_num}/{len(upload_chosen_dict[root])}'
if success_num == len(upload_chosen_dict[root]):
progress_label['fg'] = 'green'
# 修改進度條
complete_task_num = 0
for value in upload_img_complete_num.values():
complete_task_num += value[1]
progressbar_percent['text'] = str(round(100 * complete_task_num / total_task_num)) + '%'
progressbar['value'] = 100 * complete_task_num / total_task_num
uploadImg()
global button_running
button_running['text'] = '完成'
button_running["state"] = constants.NORMAL
with open(path.join(read_path, logfile_name), 'a', encoding='utf-8') as log:
log.buffer.write(f'任務完成 -- {strftime("%Y-%m-%d %H:%M:%S", localtime())} \n'.encode())
def startDownloadGui():
global keywords
global cnt_start
global cnt_end
global win_width
global win_height
global save_path
global max_sleep_ms
global thread_num
win_width = 550
win_height = 170
# g_screenwidth = int((window.winfo_screenwidth() - win_width) / 2)
# g_screenheight = int((window.winfo_screenheight() - win_height) / 2)
window.geometry(f"{win_width}x{win_height}")
frame = Frame(window, relief=constants.GROOVE, bd=0)
frame.pack(side=constants.TOP, fill=constants.BOTH, expand=constants.TRUE)
# 第一行
lbl0 = Label(frame, text='輸入關鍵字:')
lbl0.grid(row=0, column=0)
var00 = StringVar()
var00.set('')
if keywords is not None:
var00.set(keywords)
entry_keyword = Entry(frame, width=30, textvariable=var00)
entry_keyword.grid(row=0, column=1, columnspan=5, sticky=constants.W + constants.E)
entry_keyword.focus()
# 第二行
lbl10 = Label(frame, text='開始位置:')
lbl10.grid(row=1, column=0, sticky=constants.E)
var10 = IntVar()
var10.set(1)
if cnt_start is not None:
var10.set(cnt_start)
spin_start = Spinbox(frame, from_=0, to=10000, textvariable=var10)
spin_start.grid(row=1, column=1, columnspan=2, sticky=constants.W + constants.E)
lbl11 = Label(frame, text='結束位置:')
lbl11.grid(row=1, column=3, sticky=constants.E)
var01 = IntVar()
var01.set(20)
if cnt_start is not None:
var01.set(cnt_end)
spin_end = Spinbox(frame, from_=0, to=10000, textvariable=var01)
spin_end.grid(row=1, column=4, columnspan=2, sticky=constants.W + constants.E)
# 第三行
lbl20 = Label(frame, text='保存路徑:')
lbl20.grid(row=2, column=0, sticky=constants.E)
var20 = StringVar()
var20.set(path.abspath('.'))
if save_path is not None:
var20.set(save_path)
entry_path = Entry(frame, width=42, textvariable=var20)
entry_path.grid(row=2, column=1, columnspan=4, sticky=constants.W + constants.E)
def click_path():
file = askdirectory(initialdir=path.abspath('.'))
var20.set(file)
button = Button(frame, text='選擇路徑', command=click_path)
button.grid(row=2, column=5, sticky=constants.E)
# 第四行
lbl30 = Label(frame, text='睡眠(ms):')
lbl30.grid(row=3, column=0, sticky=constants.E)
var30 = IntVar()
var30.set(500)
if max_sleep_ms is not None:
var30.set(max_sleep_ms)
spin_max_sleep_ms = Spinbox(frame, from_=0, to=10000, textvariable=var30)
spin_max_sleep_ms.grid(row=3, column=1, columnspan=2, sticky=constants.W + constants.E)
lbl31 = Label(frame, text='線程數(shù):')
lbl31.grid(row=3, column=3, sticky=constants.E)
var31 = IntVar()
var31.set(8)
if thread_num is not None:
var31.set(thread_num)
spin_thread_num = Spinbox(frame, from_=0, to=10000, textvariable=var31)
spin_thread_num.grid(row=3, column=4, columnspan=2, sticky=constants.W)
# 第五行
def click_run():
global cnt_start
global cnt_end
global save_path
global keywords
global max_sleep_ms
global thread_num
keywords = entry_keyword.get()
save_path = entry_path.get()
# 對參數(shù)進行校驗
if entry_keyword.get() == '':
messagebox.showinfo("提示", "請輸入搜索關鍵詞")
return
if not path.exists(save_path):
messagebox.showinfo("提示", "該路徑不存在")
return
try:
cnt_start = int(spin_start.get())
cnt_end = int(spin_end.get())
max_sleep_ms = int(spin_max_sleep_ms.get())
thread_num = int(spin_thread_num.get())
except Exception as exception:
messagebox.showinfo("提示", "信息需要填寫整數(shù)")
return
# 先清理再跳轉
frame.grid_forget()
frame.pack_forget()
clear_constants()
execDownloadGui()
button_run = Button(frame, text='開始', command=click_run)
button_run.grid(row=4, column=2, columnspan=4, sticky=constants.W + constants.E, pady=10, padx=5)
def click_back():
# 先清理再跳轉
frame.grid_forget()
frame.pack_forget()
clear_constants()
initGui()
button_back = Button(frame, text='返回', command=click_back)
button_back.grid(row=4, column=0, columnspan=2, sticky=constants.W + constants.E, pady=10, padx=5)
def execDownloadGui():
global win_width
global win_height
win_width = 502
win_height = 480
# g_screenwidth = int((window.winfo_screenwidth() - win_width) / 2)
# g_screenheight = int((window.winfo_screenheight() - win_height) / 2)
window.geometry(f"{win_width}x{win_height}")
frame = Frame(window, relief=constants.GROOVE, bd=0)
frame.pack(side=constants.TOP, fill=constants.BOTH, expand=constants.TRUE)
frame2 = Frame(window, relief=constants.GROOVE, bd=0)
frame2.pack(side=constants.BOTTOM, fill=constants.X, expand=constants.TRUE)
# 新建字典跷究,存儲文件名和label
files_label_dict = {}
# 為了滾動條創(chuàng)建畫布
canvas = Canvas(frame)
# canvas.grid(column=0, row=0, sticky=constants.NSEW)
canvas.pack(side=constants.LEFT, fill=constants.BOTH, expand=constants.TRUE)
sub_frame = Frame(canvas, bg="white")
# sub_frame.grid(column=1, row=0, sticky=constants.NSEW)
sub_frame.pack(side=constants.TOP, fill=constants.BOTH, expand=constants.TRUE)
# 窗口展示的是frame的左上角
canvas.create_window(0, 0, window=sub_frame, anchor=constants.NW)
# 為最外層的Frame創(chuàng)建滾動條姓迅,用于滾動畫布中的元素
scrollV = Scrollbar(frame, orient=constants.VERTICAL, command=canvas.yview)
# scrollV.grid(column=1, row=0, sticky=constants.NSEW)
scrollV.pack(side=constants.RIGHT, fill=constants.Y)
def scroll_bar(event):
canvas.configure(yscrollcommand=scrollV.set, scrollregion=canvas.bbox("all"))
def process_wheel(event):
canvas.yview_scroll(-1 * (round(event.delta / 60)), "units")
# 綁定滾動條
sub_frame.bind("<Configure>", scroll_bar)
# 綁定滾輪事件
sub_frame.bind("<MouseWheel>", process_wheel)
# 添加進度條
global progressbar_percent
global progressbar
# progressbar_percent = Label(window, text="0%", font=("Arial Bold", 10))
progressbar_percent = Label(frame2, text="0%", font=("Arial Bold", 10))
progressbar_percent["text"] = "0%"
progressbar_percent.pack(side=constants.TOP, pady=(0, 0))
progressbar = Progressbar(frame2, length=200, mode="determinate", maximum=100, name="完成進度",
orient=constants.HORIZONTAL, value=0, variable=0)
progressbar.pack(side=constants.TOP, fill=constants.X, pady=(0, 12), padx=5)
# 日志按鈕
def open_log():
startfile(path.join(save_path, 'Imgs', logfile_name))
# button_log = Button(window, text='查看日志', width=15, state=constants.NORMAL, command=open_log)
button_log = Button(frame2, text='查看日志', state=constants.NORMAL, width=25, command=open_log)
button_log.pack(side=constants.LEFT, fill=constants.X, anchor=constants.CENTER, padx=5)
# 運行按鈕
def jump_init():
# 先清理再跳轉
frame.pack_forget()
frame2.pack_forget()
frame.grid_forget()
frame2.grid_forget()
clear_constants()
startDownloadGui()
global button_running
# button_running = Button(window, text='運行中...', width=15, state=constants.NORMAL)
button_running = Button(frame2, text='運行中...', state=constants.NORMAL, width=25, command=jump_init)
button_running["state"] = constants.DISABLED
button_running.pack(side=constants.RIGHT, fill=constants.X, anchor=constants.CENTER, padx=5)
# 執(zhí)行任務
for index, keyword in enumerate(keywords.split(';')):
name_label = Label(sub_frame, text=keyword, width=40, anchor=constants.W, compound=constants.LEFT, bg="white")
name_label.grid(column=0, row=index, sticky=constants.W)
# 初始化 - 修改GUI界面
progress_text = f'0/{cnt_end - cnt_start + 1}'
progress_label = Label(sub_frame, text=progress_text, width=12, anchor=constants.CENTER, fg='black', bg='white')
progress_label.grid(column=5, row=index)
global keyword_progress_text
keyword_progress_text[keyword] = progress_label
th = Thread(target=executeDownload, args=())
th.setDaemon(True)
th.start()
# 寫入日志
global logfile_name
logfile_name = f'Download_{int(time.time())}.log'
logger.info("=" * 15 + f" 任 務 開 始 ({str(datetime.now())}) " + "=" * 15)
imgs_path = path.join(save_path, 'Imgs')
if not path.exists(imgs_path):
mkdir(imgs_path)
with open(path.join(save_path, 'Imgs', logfile_name), 'w', encoding='utf-8') as log:
log.buffer.write(f'任務開始 -- {strftime("%Y-%m-%d %H:%M:%S", localtime())} \n'.encode())
log.buffer.write(f'關鍵字為: [{keywords}] 范圍為: [{cnt_start}-{cnt_end}] \n'.encode())
def executeDownload():
with ThreadPoolExecutor(max_workers=8) as executor:
def getImgUrls(keyword):
# logger.info(f'正在處理:{file}')
keyword = keyword[0]
# 開始執(zhí)行 - 進度文字改為藍色
global keyword_progress_text
progress_label = keyword_progress_text.get(keyword)
progress_label['fg'] = 'orange'
keyword_url_encode = quote(keyword)
web_url = search_ori_url + keyword_url_encode
# 爬蟲程序
try:
# 打開chrome無頭瀏覽器
edge_options = Options()
edge_options.add_argument('--headless')
edge_options.add_argument('--disable-gpu')
# 反偵測,開啟開發(fā)者模式
edge_options.add_experimental_option('excludeSwitches', ['enable-automation'])
# 禁用啟動Blink運行時功能
edge_options.add_argument('--disable-blink-features=AutomationControlled')
driver = webdriver.Edge(options=edge_options)
executor_url = driver.command_executor._url
session_id = driver.session_id
# 將打開的瀏覽區(qū)url和session_id存儲起來,提供給下一次應用
# file = open('browserMsg.txt', 'w')
# file.writelines([executor_url, 'n', session_id])
# file.close()
driver.implicitly_wait(20)
driver.set_window_size(1000, 800)
driver.get(web_url)
# 如果圖片數(shù)量不夠丁存,則向下滾動一頁
current_num = 0
img_urls = []
while current_num < int(cnt_end):
elements = driver.find_elements(by=By.CLASS_NAME, value='main_img.img-hover')
print('獲取到' + keyword + '元素個數(shù)為 ', len(elements), ' 線程號為:' + str(os.getpid()))
current_num = len(elements)
if current_num < cnt_end:
# js = 'return document.body.scrollHeight;'
driver.execute_script('window.scrollTo(0, document.body.scrollHeight)')
# time.sleep(random.randint(1, 5) / 10)
else:
for element in elements:
img_urls.append(element.get_attribute('data-imgurl'))
img_urls = img_urls[cnt_start - 1:cnt_end]
return keyword, img_urls
except Exception as exception:
# traceback.print_exc()
logger.error(f"Error: {exception}")
with open(path.join(save_path, 'Imgs', logfile_name), 'a', encoding='utf-8') as log:
log.buffer.write(f'Error: 爬取圖片失敗 [{keyword}] 錯誤信息 [{repr(exception)}] \n'.encode())
# 爬取頁面失敗肩杈,進度文字改為紅色
progress_label = keyword_progress_text.get(keyword)
progress_label['fg'] = 'red'
def downloadImgs(future):
keyword, img_urls = future.result()
def downloadImg(args3):
keyword, img_url, index = args3
imgs_path = path.join(save_path, 'Imgs')
if not path.exists(imgs_path):
mkdir(imgs_path)
if not path.exists(path.join(imgs_path, keyword)):
mkdir(path.join(imgs_path, keyword))
img_name = f'{uuid4().hex}.png'
img_abspath = path.join(path.join(imgs_path, keyword), img_name)
# 添加異常處理
global lock
global keyword_urls_complete_num
global keyword_progress_text
try:
# TODO 設置睡眠時間
# time.sleep(random.randint(1, 5) / 10)
print(f'關鍵字:[{keyword}] 序號:[{index + 1}] 網址:{img_url}')
urlretrieve(img_url, img_abspath)
# 設置公共變量詞典,key為keyword解寝,value為tuple類型扩然,存儲 (成功數(shù),處理數(shù))
lock.acquire()
(success_num, complete_num) = keyword_urls_complete_num.get(keyword, (0, 0))
success_num = success_num + 1
except Exception as exception:
# traceback.print_exc()
logger.error(f"Error: {exception}")
with open(path.join(save_path, 'Imgs', logfile_name), 'a', encoding='utf-8') as log:
log.buffer.write(f'Error: 圖片下載失敗 [{keyword}] [{img_url}] 錯誤信息 {repr(exception)} \n'.encode())
# 下載圖片失敗 - 修改進度文字顏色為紅色
progress_label = keyword_progress_text.get(keyword)
progress_label['fg'] = 'red'
(success_num, complete_num) = keyword_urls_complete_num.get(keyword, (0, 0))
# 修改
complete_num = complete_num + 1
keyword_urls_complete_num[keyword] = (success_num, complete_num)
# 執(zhí)行中 - 修改進度文字文本
progress_label = keyword_progress_text.get(keyword)
progress_label['text'] = f'{success_num}/{cnt_end - cnt_start + 1}'
lock.release()
if success_num == cnt_end - cnt_start + 1:
progress_label['fg'] = 'green'
# 修改進度條
complete_task_num = 0
for value in keyword_urls_complete_num.values():
complete_task_num += value[1]
total_task_num = len(keyword_progress_text.keys()) * (cnt_end - cnt_start + 1)
progressbar_percent["text"] = str(round(100 * complete_task_num / total_task_num)) + "%"
progressbar["value"] = 100 * complete_task_num / total_task_num
with open(path.join(save_path, 'Imgs', logfile_name), 'a', encoding='utf-8') as log:
log.buffer.write(f'Success: 處理完成 關鍵字:[{keyword}] 序號:[{index + 1}] 網址:{img_url} \n'.encode())
global thread_num
with ThreadPoolExecutor(max_workers=thread_num) as executor2:
for index, img_url in enumerate(img_urls):
global max_sleep_ms
time.sleep(random.randint(1, max_sleep_ms) / 1000)
args3 = [keyword, img_url, index]
executor2.submit(downloadImg, args3)
# downloadImg(keyword,img_url)
for keyword in keywords.split(';'):
args = [keyword]
get_url_task = executor.submit(getImgUrls, args)
get_url_task.add_done_callback(downloadImgs)
# # 通過回調add_done_callback來處理結果
# for index, img_url in enumerate(img_urls):
# args = [keyword, img_url]
# tasks = executor.submit(downloadImgs, args)
global button_running
button_running['text'] = '完成'
button_running["state"] = constants.NORMAL
with open(path.join(save_path, 'Imgs', logfile_name), 'a', encoding='utf-8') as log:
log.buffer.write(f'任務完成 -- {strftime("%Y-%m-%d %H:%M:%S", localtime())} \n'.encode())
if __name__ == '__main__':
basicConfig(level="INFO")
logger = getLogger()
# 下載功能的全局變量
search_ori_url = f'https://image.baidu.com/search/index?tn=baiduimage&ipn=r&ct=201326592&cl=2&lm=-1&st=-1&fm=result&fr=&sf=1&fmq=1683777177601_R&pv=&ic=&nc=1&z=&hd=&latest=©right=&se=1&showtab=0&fb=0&width=&height=&face=0&istype=2&dyTabStr=MCwxLDYsMyw0LDUsMiw3LDgsOQ%3D%3D&ie=utf-8&sid=&word='
save_path = None
cnt_start = 1
cnt_end = 20
keywords = None
keyword_urls_complete_num = {}
keyword_progress_text = {}
thread_num = 10
max_sleep_ms = 500
# 上傳功能的全局變量
read_path = None
all_dir_dict = {}
upload_select_dict = {}
upload_progress_text_dict = {}
upload_img_complete_num = {}
username = ''
password = ''
access_token = None
img_type_list = {'jpg', 'bmp', 'png', 'jpeg', 'jfif', 'webp'}
# 公共全局變量
lock = Lock()
logfile_name = ''
button_running = None
progressbar = None
progressbar_percent = None
# 引入圖片
bundle_dir = getattr(sys, '_MEIPASS', path.abspath(path.dirname(__file__)))
path_to_icon = path.join(bundle_dir, 'ppicon')
window = Tk()
window.title("Tool For PaddlePaddle Authored By CJ")
img_eye_open = PhotoImage(Image.open(path_to_icon + "/eye_open.png").resize((16, 16)))
img_eye_close = PhotoImage(Image.open(path_to_icon + "/eye_close.png").resize((16, 16)))
# startDownloadGui()
win_width = 550
win_height = 140
g_screenwidth = int((window.winfo_screenwidth() - win_width) / 2)
g_screenheight = int((window.winfo_screenheight() - win_height) / 2)
window.geometry(f"{win_width}x{win_height}+{g_screenwidth}+{g_screenheight}")
window.iconbitmap(path_to_icon + '/favicon.ico')
try:
initGui()
except Exception as e:
messagebox.showinfo("提示", str(e))
window.mainloop()
需要將 edge的無頭瀏覽器(msedgedriver.exe) 程序放到系統(tǒng)路徑中
打包
安裝打包工具 pip install
pyinstaller -F -w -i C:\Users\CJ\Downloads\python_downimg-master\com\cj\downimg\ppicon\favicon.ico --add-data C:\Users\CJ\Downloads\python_downimg-master\com\cj\downimg\ppicon;ppicon --hidden-import=_cffi_backend C:\Users\CJ\Downloads\python_downimg-master\com\cj\downimg\ToolForPP.py