Ways to implement multitasking
Multithreading
Multiprocessing
Coroutines
Multithreading + multiprocessing
Why is multitasking possible at all?
Parallelism: tasks start at the same time and execute at the same time (multiprocessing); a process is the unit the CPU allocates resources to.
Concurrency: tasks start at the same time but execute one at a time (threading).
In Python, multithreading cannot truly run in parallel: the CPython interpreter has a global interpreter lock (the GIL) that guarantees only one thread executes Python bytecode at any given moment.
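A minimal sketch (added here, not part of the original notes) of what the GIL means in practice: a CPU-bound function gains nothing from running in two threads, because the GIL serializes bytecode execution.

import threading, time

def count(n):
    # Pure CPU-bound work: no I/O, so the GIL is never released for long
    while n > 0:
        n -= 1

N = 10000000

start = time.time()
count(N); count(N)  # run twice, sequentially
print('sequential:', time.time() - start)

start = time.time()
t1 = threading.Thread(target=count, args=(N,))
t2 = threading.Thread(target=count, args=(N,))
t1.start(); t2.start()
t1.join(); t2.join()  # two threads, but the GIL serializes them
print('two threads:', time.time() - start)  # roughly the same, often worse

I/O-bound work (network requests, disk reads, time.sleep) does release the GIL, which is why the crawler examples below still benefit from threads.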
Threads:
A thread is the basic unit of CPU execution. It occupies very few resources, the threads within a process share that process's resources, and a thread can only exist inside a process. Multithreading is generally suited to I/O-bound work, and threads execute in no guaranteed order.
Creating and using threads
from threading import Thread
import threading, time

data = []

def download_image(url, num):
    """
    Download an image
    :param url:
    :param num:
    :return:
    """
    global data
    time.sleep(2)
    print(url, num)
    data.append(num)

def read_data():
    global data
    for i in data:
        print(i)

if __name__ == '__main__':
    # Get the current thread's name: threading.currentThread().name
    print('main thread start', threading.currentThread().name)
    # Create child threads
    """
    target=None: the target function the thread will execute
    name=None: the thread's name, given at creation time
    args=(): arguments for the target function, as a tuple
    """
    thread_sub1 = Thread(
        target=download_image,
        name='download thread',
        args=('https://f10.baidu.com/it/u=3931984114,750350835&fm=72', 1))
    thread_sub2 = Thread(
        target=read_data,
        name='read thread'
    )
    # Whether to make the thread a daemon (see the small sketch after this example)
    # daemon = False: when the main thread finishes, it checks whether child threads are done;
    #                 an unfinished child thread is allowed to complete its task normally
    # daemon = True: a child thread whose task is unfinished exits together with the main thread
    # thread_sub1.daemon = True
    # Start the threads
    thread_sub1.start()
    # join(): block until the child thread's task finishes, then continue in the main thread
    thread_sub1.join()
    thread_sub2.start()
    thread_sub2.join()
    print('main thread end', threading.currentThread().name)
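To make the daemon flag concrete, a small sketch (my addition): with daemon=True the program does not wait for the child, so the child is abandoned mid-task when the main thread exits.

import threading, time

def slow_task():
    time.sleep(2)
    print('task finished')  # never printed when daemon=True

t = threading.Thread(target=slow_task)
t.daemon = True  # set to False and 'task finished' is printed after ~2s
t.start()
print('main thread end')  # the program exits here, abandoning the daemon thread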
Queues
# Queues
import queue

# Create a queue with a maximum capacity
dataqueue = queue.Queue(maxsize=40)
for i in range(0, 50):
    # Only put a value if the queue is not yet full
    if not dataqueue.full():
        dataqueue.put(i)

# Check whether the queue is empty
dataqueue.empty()
# Check whether the queue is full
dataqueue.full()
# Current length
dataqueue.qsize()
# Get a value. FIFO: first in, first out, so values come out in the order they went in
dataqueue.get()
# Thread locks: protect shared state that several threads modify
lock = threading.Lock()
lock.acquire()  # acquire the lock
lock.release()  # release the lock
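Why the lock matters, in a minimal sketch (my addition): two threads incrementing a shared counter lose updates without the lock, because counter += 1 is not an atomic operation.

import threading

counter = 0
lock = threading.Lock()

def increment(n):
    global counter
    for _ in range(n):
        with lock:  # equivalent to lock.acquire()/lock.release(), but exception-safe
            counter += 1  # read-modify-write: unsafe without the lock

threads = [threading.Thread(target=increment, args=(100000,)) for _ in range(2)]
for t in threads:
    t.start()
for t in threads:
    t.join()
print(counter)  # 200000 with the lock; usually less if the with-block is removed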
A simple crawler built on queues: jobbole
import queue, requests, threading, json
from lxml.html import etree

# Note: queues are how threads exchange data; queue.Queue is thread-safe
"""
1. Create a task queue holding the URLs to crawl
2. Create crawl threads that execute the downloads
3. Create a data queue holding the page source the crawl threads fetch
4. Create parse threads that parse the HTML, extract the target data, and persist it
"""
# Fetch jobbole's article list
# http://blog.jobbole.com/all-posts/page/1/
# http://blog.jobbole.com/all-posts/page/2/

def download_page_data(taskQueue, dataQueue):
    """
    Execute a download task
    :param taskQueue: queue to take tasks from
    :param dataQueue: queue to store the fetched page source in
    :return:
    """
    while not taskQueue.empty():
        page = taskQueue.get()
        print('downloading page ' + str(page), threading.currentThread().name)
        full_url = 'http://blog.jobbole.com/all-posts/page/{}/'.format(str(page))
        req_header = {
            'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64; rv:64.0) Gecko/20100101 Firefox/64.0'
        }
        response = requests.get(full_url, headers=req_header)
        if response.status_code == 200:
            # Store the fetched page source in the dataQueue
            dataQueue.put(response.text)
        else:
            # Put the task back so another thread can retry it
            taskQueue.put(page)

def parse_data(dataQueue, lock):
    """
    Parse the data taken from dataQueue
    :param dataQueue:
    :return:
    """
    while not dataQueue.empty():
        print('parsing', threading.currentThread().name)
        html = dataQueue.get()
        html_element = etree.HTML(html)
        articles = html_element.xpath('//div[@class="post floated-thumb"]')
        for article in articles:
            articleInfo = {}
            # Title
            articleInfo['title'] = article.xpath('.//a[@class="archive-title"]/text()')[0]
            # Cover image
            img_element = article.xpath('.//div[@class="post-thumb"]/a/img')
            if len(img_element) > 0:
                articleInfo['coverImage'] = img_element[0].xpath('./@src')[0]
            else:
                articleInfo['coverImage'] = 'no image'
            p_as = article.xpath('.//div[@class="post-meta"]/p[1]//a')
            if len(p_as) > 2:
                # Tag
                articleInfo['tag'] = p_as[1].xpath('./text()')[0]
                # Comment count
                articleInfo['commentNum'] = p_as[2].xpath('./text()')[0]
            else:
                # Tag (assumes at least two links are present)
                articleInfo['tag'] = p_as[1].xpath('./text()')[0]
                # Comment count
                articleInfo['commentNum'] = '0'
            # Excerpt
            articleInfo['content'] = article.xpath('.//span[@class="excerpt"]/p/text()')
            # Publish time; //text() would return all text under the tag, including child tags
            articleInfo['publishTime'] = ''.join(
                article.xpath('.//div[@class="post-meta"]/p[1]/text()')
            ).replace('\n', '').replace(' ', '').replace('\r', '').replace('.', '')
            # lock.acquire()  # acquire the lock
            # with open('jobbole.json', 'a+', encoding='utf-8') as file:
            #     json_str = json.dumps(articleInfo, ensure_ascii=False) + '\n'
            #     file.write(json_str)
            # lock.release()  # release the lock
            # print(articleInfo)

if __name__ == '__main__':
    # Create the task queue
    taskQueue = queue.Queue()
    for i in range(1, 10):
        taskQueue.put(i)
    # Create the data queue
    dataQueue = queue.Queue()
    # Create threads to execute the download tasks
    threadName = ['download thread 1', 'download thread 2', 'download thread 3', 'download thread 4']
    crawl_thread = []
    for name in threadName:
        # Create the thread
        thread_crawl = threading.Thread(target=download_page_data,
                                        name=name,
                                        args=(taskQueue, dataQueue)
                                        )
        crawl_thread.append(thread_crawl)
        # Start the thread
        thread_crawl.start()
    # Wait for all crawl threads to finish before continuing in the main thread
    for thread in crawl_thread:
        thread.join()
    # Create a thread lock
    lock = threading.Lock()
    # Create parse threads that take page source from dataQueue and parse it
    threadName = ['parse thread 1', 'parse thread 2', 'parse thread 3', 'parse thread 4']
    parse_thread = []
    for name in threadName:
        # Create the thread
        thread_parse = threading.Thread(target=parse_data,
                                        name=name,
                                        args=(dataQueue, lock)
                                        )
        parse_thread.append(thread_parse)
        # Start the thread
        thread_parse.start()
    # Wait for all parse threads to finish before continuing in the main thread
    for thread in parse_thread:
        thread.join()
Thread pools
from concurrent.futures import ThreadPoolExecutor

def download_done(futures):
    # result() returns the finished task's return value
    print(futures.result())

# max_workers: the number of threads in the pool
pool = ThreadPoolExecutor(max_workers=1000)
# Add a task to the pool (target_function and arg are placeholders)
handler = pool.submit(target_function, arg)
# Set a callback that runs when the task finishes
handler.add_done_callback(download_done)
# Like join(): wait for all tasks in the pool to finish
pool.shutdown()
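A runnable end-to-end version of the same API (my sketch, with a made-up work function), including Executor.map, which is simpler when you only need the results back in submission order:

from concurrent.futures import ThreadPoolExecutor
import time

def work(n):
    time.sleep(0.1)  # simulate I/O
    return n * n

with ThreadPoolExecutor(max_workers=4) as pool:  # the with-block calls shutdown() for us
    futures = [pool.submit(work, i) for i in range(5)]
    print([f.result() for f in futures])  # [0, 1, 4, 9, 16]
    print(list(pool.map(work, range(5))))  # map returns results in submission order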
Thread pool crawler
from concurrent.futures import ThreadPoolExecutor
import requests, threading, json
from lxml.html import etree

# Purpose of a thread pool: create a pool holding a fixed number of threads and let them execute tasks

def download_data(page):
    print('downloading page ' + str(page), threading.currentThread().name)
    full_url = 'http://blog.jobbole.com/all-posts/page/{}/'.format(str(page))
    req_header = {
        'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64; rv:64.0) Gecko/20100101 Firefox/64.0'
    }
    response = requests.get(full_url, headers=req_header)
    if response.status_code == 200:
        print('request succeeded')
        return response.text, response.status_code

def download_done(futures):
    # The callback receives the finished future; result() returns download_data's return value
    print(futures.result())
    # Parsing can be done here
    html = futures.result()[0]
    html_element = etree.HTML(html)
    articles = html_element.xpath('//div[@class="post floated-thumb"]')
    for article in articles:
        articleInfo = {}
        # Title
        articleInfo['title'] = article.xpath('.//a[@class="archive-title"]/text()')[0]
        # Cover image
        img_element = article.xpath('.//div[@class="post-thumb"]/a/img')
        if len(img_element) > 0:
            articleInfo['coverImage'] = img_element[0].xpath('./@src')[0]
        else:
            articleInfo['coverImage'] = 'no image'
        p_as = article.xpath('.//div[@class="post-meta"]/p[1]//a')
        if len(p_as) > 2:
            # Tag
            articleInfo['tag'] = p_as[1].xpath('./text()')[0]
            # Comment count
            articleInfo['commentNum'] = p_as[2].xpath('./text()')[0]
        else:
            # Tag (assumes at least two links are present)
            articleInfo['tag'] = p_as[1].xpath('./text()')[0]
            # Comment count
            articleInfo['commentNum'] = '0'
        # Excerpt
        articleInfo['content'] = article.xpath('.//span[@class="excerpt"]/p/text()')
        # Publish time; //text() would return all text under the tag, including child tags
        articleInfo['publishTime'] = ''.join(
            article.xpath('.//div[@class="post-meta"]/p[1]/text()')
        ).replace('\n', '').replace(' ', '').replace('\r', '').replace('.', '')
        with open('jobbole.json', 'a+', encoding='utf-8') as file:
            json_str = json.dumps(articleInfo, ensure_ascii=False) + '\n'
            file.write(json_str)

if __name__ == '__main__':
    # Create the thread pool
    # max_workers: the number of threads in the pool
    pool = ThreadPoolExecutor(max_workers=10)
    for i in range(1, 201):
        # Add a task to the pool
        handler = pool.submit(download_data, i)
        # Set the callback to run when the task finishes
        handler.add_done_callback(download_done)
    # shutdown() waits internally for all tasks, like join()
    pool.shutdown()
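An alternative to callbacks (my addition, reusing download_data from above): concurrent.futures.as_completed yields each future as it finishes, which keeps the parsing in the main thread and surfaces exceptions where they are easy to handle.

from concurrent.futures import ThreadPoolExecutor, as_completed

with ThreadPoolExecutor(max_workers=10) as pool:
    futures = [pool.submit(download_data, i) for i in range(1, 201)]
    for future in as_completed(futures):  # yields futures in completion order
        result = future.result()  # re-raises any exception from the task
        if result is not None:  # download_data returns None on a non-200 response
            html, status = result
            # ...parse html here, as in download_done above...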
Processes
Queues
from multiprocessing import Process, Queue
import os

# maxsize: the maximum number of elements the queue can hold
data_queue = Queue(maxsize=10)

def write_data(num, data_queue):
    print(num)
    for i in range(0, num):
        data_queue.put(i)
    print(os.getpid(), data_queue.full())

def read_data(data_queue):
    print('reading', os.getpid())
    print(data_queue.qsize())
    for i in range(0, data_queue.qsize()):
        print(data_queue.get())

if __name__ == '__main__':
    # os.getpid() returns the process id
    print('main process start', os.getpid())
    # Create child processes
    """
    target=None: the function the process will execute
    name=None: the process's name
    args=(): arguments for the function (tuple)
    kwargs={}: arguments for the function (dict)
    """
    process1 = Process(target=write_data, args=(10, data_queue))
    # Start the process with start()
    process1.start()
    # join(timeout=5) would cap the blocking time at 5 seconds
    process1.join()
    process2 = Process(target=read_data, args=(data_queue,))
    # Start the process with start()
    process2.start()
    # join(timeout=5) would cap the blocking time at 5 seconds
    process2.join()
    print('main process end', os.getpid())
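The example above runs the writer before the reader. A sketch (my addition) of the more typical pattern, where producer and consumer run at the same time and a sentinel value signals the end of the stream:

from multiprocessing import Process, Queue

def producer(q):
    for i in range(5):
        q.put(i)
    q.put(None)  # sentinel: tells the consumer to stop

def consumer(q):
    while True:
        item = q.get()  # blocks until an item is available
        if item is None:
            break
        print('consumed', item)

if __name__ == '__main__':
    q = Queue()
    p1 = Process(target=producer, args=(q,))
    p2 = Process(target=consumer, args=(q,))
    p1.start(); p2.start()
    p1.join(); p2.join()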
Queue-based crawler
"""
1.創(chuàng)建任務隊列
2.創(chuàng)建爬取進程,執(zhí)行爬取任務
3.創(chuàng)建數(shù)據(jù)隊列
4.創(chuàng)建解析線程,解析獲取的數(shù)據(jù)
"""
# 案例網(wǎng)站:世紀家園
# 武漢地區(qū)的活動:(第一頁數(shù)據(jù)是靜態(tài)頁面,第二頁之后是動態(tài)加載的)
# http://date.jiayuan.com/eventslist_new.php?
# page=1&city_id=4201&shop_id=33∪迨俊(第一頁)
# http://date.jiayuan.com/eventslist_new.php?
# page=2&city_id=4201&shop_id=33〉闹埂(第二頁)
# http://date.jiayuan.com/eventslist_new.php?
# page=3&city_id=4201&shop_id=33 (第三頁)
"""
_gscu_1380850711=43812116hs5dyy11; accessID=20181222071935501079;
jy_refer=www.baidu.com; _gscbrs_1380850711=1;
PHPSESSID=9202a7e752f801a49a5747832520f1da;
plat=date_pc; DATE_FROM=daohang;
SESSION_HASH=61e963462c6b312ee1ffacf151ffaa028477217d;
user_access=1; uv_flag=124.64.18.38;
DATE_SHOW_LOC=4201; DATE_SHOW_SHOP=33
"""
# http://date.jiayuan.com/eventslist_new.php?
# page=2&city_id=31&shop_id=15
"""
_gscu_1380850711=43812116hs5dyy11; accessID=20181222071935501079;
jy_refer=www.baidu.com; _gscbrs_1380850711=1;
PHPSESSID=9202a7e752f801a49a5747832520f1da;
plat=date_pc; DATE_FROM=daohang;
SESSION_HASH=61e963462c6b312ee1ffacf151ffaa028477217d;
user_access=1; uv_flag=124.64.18.38;
DATE_SHOW_LOC=31; DATE_SHOW_SHOP=15
"""
from multiprocessing import Process, Queue
import requests, re, json
from lxml.html import etree
import time

def down_load_page_data(taskQueue, dataQueue):
    """
    Execute the download tasks
    :param taskQueue:
    :param dataQueue:
    :return:
    """
    sumTime = 0
    isContinue = True
    while isContinue:
        if not taskQueue.empty():
            sumTime = 0
            url = taskQueue.get()
            # download_page_data returns None on failure; assumed to succeed here
            response, cur_page = download_page_data(url)
            data_dict = {'data': response.text, 'page': cur_page}
            dataQueue.put(data_dict)
            # Queue the next page
            if cur_page != 1:
                print('====', cur_page)
                if isinstance(response.json(), list):
                    next_page = cur_page + 1
                    next_url = re.sub(r'page=\d+', 'page=' + str(next_page), url)
                    taskQueue.put(next_url)
                else:
                    print('reached page ' + str(cur_page), 'no more data', response.json())
            elif cur_page == 1:
                # Page 1 is HTML, not JSON, so just queue page 2 directly
                next_page = cur_page + 1
                next_url = re.sub(r'page=\d+', 'page=' + str(next_page), url)
                taskQueue.put(next_url)
        else:
            # No tasks left in the queue: wait a while, then give up
            time.sleep(0.001)
            sumTime = sumTime + 1
            if sumTime > 5000:
                print('leaving the loop')
                isContinue = False
                break

def download_page_data(url):
    """
    Download one page of listings
    :param url: the URL of one page
    :return:
    """
    # http://date.jiayuan.com/eventslist_new.php?
    #     page=1&city_id=4201&shop_id=33
    pattern = re.compile(r'.*?page=(\d+)&city_id=(\d+)&shop_id=(\d+)')
    result = re.findall(pattern, url)[0]
    cur_page = result[0]
    DATE_SHOW_LOC = result[1]
    DATE_SHOW_SHOP = result[2]
    print(cur_page, DATE_SHOW_SHOP, DATE_SHOW_LOC)
    cookie = """_gscu_1380850711=43812116hs5dyy11; accessID=20181222071935501079; jy_refer=www.baidu.com; _gscbrs_1380850711=1; PHPSESSID=9202a7e752f801a49a5747832520f1da; plat=date_pc; DATE_FROM=daohang; SESSION_HASH=61e963462c6b312ee1ffacf151ffaa028477217d; user_access=1; uv_flag=124.64.18.38; DATE_SHOW_LOC=%s; DATE_SHOW_SHOP=%s""" % (DATE_SHOW_LOC, DATE_SHOW_SHOP)
    # print(cookie)
    req_header = {
        'User-Agent': 'Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/68.0.3440.106 Safari/537.36',
        'Cookie': cookie,
        'Referer': 'http://date.jiayuan.com/eventslist.php',
    }
    # cookie_dict = {sub_str.split('=')[0]: sub_str.split('=')[1] for sub_str in cookie.split('; ')}
    # print(cookie_dict)
    # requests also accepts cookies= (a cookiejar object or dict)
    response = requests.get(url, headers=req_header)
    if response.status_code == 200:
        print('page ' + cur_page + ' fetched', DATE_SHOW_SHOP, DATE_SHOW_LOC)
        return response, int(cur_page)

def parse_page_data(dataQueue):
    """
    The parse processes take data from dataQueue and parse it
    :param dataQueue:
    :return:
    """
    while not dataQueue.empty():
        data = dataQueue.get()
        page = data['page']
        html = data['data']
        if page == 1:
            print('parsing page 1, a static page')
            html_element = etree.HTML(html)
            hot_active = html_element.xpath('//div[@class="hot_detail fn-clear"]')
            for hot_div in hot_active:
                # URL of the event detail page
                full_detail_url = 'http://date.jiayuan.com' + hot_div.xpath('.//h2[@class="hot_title"]/a/@href')[0]
                response = download_detail_data(full_detail_url)
                parse_detail_data(response)
            more_active = html_element.xpath('//ul[@class="review_detail fn-clear t-activiUl"]/li')
            for more_li in more_active:
                # URL of the event detail page
                full_detail_url = 'http://date.jiayuan.com' + more_li.xpath('.//a[@class="review_link"]/@href')[0]
                response = download_detail_data(full_detail_url)
                parse_detail_data(response)
        else:
            print('parsing page ' + str(page), 'dynamically loaded')
            # json.loads() converts the JSON string into a Python value
            json_obj = json.loads(html)
            if isinstance(json_obj, list):
                # A list means we received valid data
                print('parsing data')
                for sub_dict in json_obj:
                    id = sub_dict['id']
                    # http://date.jiayuan.com/activityreviewdetail.php?id=11706
                    full_detail_url = 'http://date.jiayuan.com/activityreviewdetail.php?id=%s' % id
                    response = download_detail_data(full_detail_url)
                    parse_detail_data(response)

def download_detail_data(url):
    """
    Request an event detail page by its URL
    :param url:
    :return:
    """
    req_header = {
        'User-Agent': 'Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/68.0.3440.106 Safari/537.36',
        'Cookie': '_gscu_1380850711=43812116hs5dyy11; accessID=20181222071935501079; jy_refer=www.baidu.com; _gscbrs_1380850711=1; PHPSESSID=9202a7e752f801a49a5747832520f1da; plat=date_pc; DATE_FROM=daohang; SESSION_HASH=61e963462c6b312ee1ffacf151ffaa028477217d; user_access=1; uv_flag=124.64.18.38; DATE_SHOW_LOC=50; DATE_SHOW_SHOP=5',
        'Referer': 'http://date.jiayuan.com/eventslist.php',
    }
    response = requests.get(url, headers=req_header)
    if response.status_code == 200:
        print('detail page fetched', response.url)
        return response

def parse_detail_data(response):
    """
    Parse an event detail page
    :param response:
    :return:
    """
    html_element = etree.HTML(response.text)
    # A dict to hold the extracted data
    item = {}
    # Event title
    item['title'] = ''.join(html_element.xpath('//h1[@class="detail_title"]/text()'))
    # Event time
    item['time'] = ','.join(
        html_element.xpath('//div[@class="detail_right fn-left"]/ul[@class="detail_info"]/li[1]//text()'))
    # Event address
    item['adress'] = html_element.xpath('//ul[@class="detail_info"]/li[2]/text()')[0]
    # Number of participants
    item['joinnum'] = html_element.xpath('//ul[@class="detail_info"]/li[3]/span[1]/text()')[0]
    # Number of reservations
    item['yuyue'] = html_element.xpath('//ul[@class="detail_info"]/li[3]/span[2]/text()')[0]
    # Introduction
    item['intreduces'] = html_element.xpath(
        '//div[@class="detail_act fn-clear"][1]//p[@class="info_word"]/span[1]/text()')[0]
    # Notes
    item['point'] = html_element.xpath('//div[@class="detail_act fn-clear"][2]//p[@class="info_word"]/text()')[0]
    # Introduction of the store
    item['introductionStore'] = ''.join(
        html_element.xpath('//div[@class="detail_act fn-clear"][3]//p[@class="info_word"]/text()'))
    # Image link
    item['coverImage'] = html_element.xpath('//div[@class="detail_left fn-left"]/img/@data-original')[0]
    with open('shijijiyua.json', 'a+', encoding='utf-8') as file:
        json_str = json.dumps(item, ensure_ascii=False) + '\n'
        file.write(json_str)

if __name__ == '__main__':
    # Create the task queue
    taskQueue = Queue()
    # Seed the starting tasks
    taskQueue.put('http://date.jiayuan.com/eventslist_new.php?page=1&city_id=4201&shop_id=33')
    taskQueue.put('http://date.jiayuan.com/eventslist_new.php?page=1&city_id=31&shop_id=15')
    taskQueue.put('http://date.jiayuan.com/eventslist_new.php?page=1&city_id=3702&shop_id=42')
    taskQueue.put('http://date.jiayuan.com/eventslist_new.php?page=1&city_id=50&shop_id=5')
    # Create the data queue
    dataQueue = Queue()
    # Create crawl processes
    for i in range(0, 3):
        process_crawl = Process(
            target=down_load_page_data,
            args=(taskQueue, dataQueue)
        )
        process_crawl.start()
    # Give the crawl processes a head start so dataQueue is not empty
    time.sleep(10)
    # Create parse processes
    for i in range(0, 3):
        process_parse = Process(
            target=parse_page_data,
            args=(dataQueue,)
        )
        process_parse.start()
Process pools
from concurrent.futures import ProcessPoolExecutor
import os

# Way one: concurrent.futures.ProcessPoolExecutor
"""
def download_page_data(page):
    print(page, os.getpid())
    return 'download finished ' + str(page), page

def download_done(futures):
    result = futures.result()
    print(result)
    next_page = int(result[1]) + 1
    handler = pool.submit(download_page_data, next_page)
    handler.add_done_callback(download_done)

if __name__ == '__main__':
    # Create the process pool
    pool = ProcessPoolExecutor(4)
    for page in range(0, 200):
        handler = pool.submit(download_page_data, page)
        # Setting a callback is optional
        handler.add_done_callback(download_done)
    # The callback keeps submitting new tasks, so shutdown() here would raise
    # "cannot schedule new futures after shutdown"
    # pool.shutdown()
"""
# Way two: multiprocessing.Pool
from multiprocessing import Pool

def download_page_data(page):
    print(page, os.getpid())
    return 'download finished ' + str(page), page

def done(result):
    # apply_async's callback receives the task's return value directly
    print(result)

if __name__ == '__main__':
    # Create the process pool
    pool = Pool(4)
    for page in range(0, 200):
        # pool.apply_async(): add a task asynchronously (non-blocking)
        # pool.apply(): add a task synchronously
        # func: the function to execute
        # args=(): arguments for the function
        # callback=None: called with the result on success
        # error_callback=None: called on error
        pool.apply_async(download_page_data, args=(page,), callback=done)
    pool.close()  # after close(), no more tasks can be added
    pool.join()
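When the tasks are independent and you want the results back in order, Pool.map is the shortest route; a small sketch (my addition, with a made-up square function):

from multiprocessing import Pool
import os

def square(n):
    return n * n, os.getpid()

if __name__ == '__main__':
    with Pool(4) as pool:  # the with-block terminates the pool on exit
        results = pool.map(square, range(10))  # blocks until all results arrive, in order
    print(results)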
Process pool crawler
from concurrent.futures import ProcessPoolExecutor
import requests
import time, re, json
from lxml.html import etree

def down_load_page_data(url):
    """
    Execute the download task
    :param url:
    :return:
    """
    # download_page_data returns None on failure; assumed to succeed here
    response, cur_page = download_page_data(url)
    data_dict = {'data': response.text, 'page': cur_page}
    # Work out the next page
    if cur_page != 1:
        if isinstance(response.json(), list):
            next_page = cur_page + 1
            next_url = re.sub(r'page=\d+', 'page=' + str(next_page), url)
        else:
            print('reached page ' + str(cur_page), 'no more data', response.json())
            next_url = None
    elif cur_page == 1:
        # Page 1 is HTML, not JSON, so just move on to page 2
        next_page = cur_page + 1
        next_url = re.sub(r'page=\d+', 'page=' + str(next_page), url)
        print('====', cur_page)
    return data_dict, next_url

def download_page_data(url):
    """
    Download one page of listings
    :param url: the URL of one page
    :return:
    """
    # http://date.jiayuan.com/eventslist_new.php?
    #     page=1&city_id=4201&shop_id=33
    pattern = re.compile(r'.*?page=(\d+)&city_id=(\d+)&shop_id=(\d+)')
    result = re.findall(pattern, url)[0]
    cur_page = result[0]
    DATE_SHOW_LOC = result[1]
    DATE_SHOW_SHOP = result[2]
    print(cur_page, DATE_SHOW_SHOP, DATE_SHOW_LOC)
    cookie = """_gscu_1380850711=43812116hs5dyy11; accessID=20181222071935501079; jy_refer=www.baidu.com; _gscbrs_1380850711=1; PHPSESSID=9202a7e752f801a49a5747832520f1da; plat=date_pc; DATE_FROM=daohang; SESSION_HASH=61e963462c6b312ee1ffacf151ffaa028477217d; user_access=1; uv_flag=124.64.18.38; DATE_SHOW_LOC=%s; DATE_SHOW_SHOP=%s""" % (DATE_SHOW_LOC, DATE_SHOW_SHOP)
    # print(cookie)
    req_header = {
        'User-Agent': 'Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/68.0.3440.106 Safari/537.36',
        'Cookie': cookie,
        'Referer': 'http://date.jiayuan.com/eventslist.php',
    }
    # cookie_dict = {sub_str.split('=')[0]: sub_str.split('=')[1] for sub_str in cookie.split('; ')}
    # print(cookie_dict)
    # requests also accepts cookies= (a cookiejar object or dict)
    response = requests.get(url, headers=req_header)
    if response.status_code == 200:
        print('page ' + cur_page + ' fetched', DATE_SHOW_SHOP, DATE_SHOW_LOC)
        return response, int(cur_page)

def parse_page_data(futures):
    """
    Step 1: take the next page's URL and keep adding tasks to the process pool
    Step 2: take the fetched page source and parse the data
    (done-callbacks run in the submitting process, so page_pool is available here)
    :param futures:
    :return:
    """
    result = futures.result()
    data = result[0]
    next_page_url = result[1]
    print(data, next_page_url)
    if next_page_url:
        print('adding task', next_page_url)
        handler = page_pool.submit(down_load_page_data, next_page_url)
        handler.add_done_callback(parse_page_data)
    page = data['page']
    html = data['data']
    # Create a process pool for fetching the event detail pages
    detail_pool = ProcessPoolExecutor(3)
    if page == 1:
        print('parsing page 1, a static page')
        html_element = etree.HTML(html)
        hot_active = html_element.xpath('//div[@class="hot_detail fn-clear"]')
        for hot_div in hot_active:
            # URL of the event detail page
            full_detail_url = 'http://date.jiayuan.com' + hot_div.xpath('.//h2[@class="hot_title"]/a/@href')[0]
            detail_handler = detail_pool.submit(download_detail_data, full_detail_url)
            detail_handler.add_done_callback(parse_detail_data)
        more_active = html_element.xpath('//ul[@class="review_detail fn-clear t-activiUl"]/li')
        for more_li in more_active:
            # URL of the event detail page
            full_detail_url = 'http://date.jiayuan.com' + more_li.xpath('.//a[@class="review_link"]/@href')[0]
            detail_handler = detail_pool.submit(download_detail_data, full_detail_url)
            detail_handler.add_done_callback(parse_detail_data)
    else:
        print('parsing page ' + str(page), 'dynamically loaded')
        # json.loads() converts the JSON string into a Python value
        json_obj = json.loads(html)
        if isinstance(json_obj, list):
            # A list means we received valid data
            print('parsing data')
            for sub_dict in json_obj:
                id = sub_dict['id']
                # http://date.jiayuan.com/activityreviewdetail.php?id=11706
                full_detail_url = 'http://date.jiayuan.com/activityreviewdetail.php?id=%s' % id
                detail_handler = detail_pool.submit(download_detail_data, full_detail_url)
                detail_handler.add_done_callback(parse_detail_data)

def download_detail_data(url):
    """
    Request an event detail page by its URL
    :param url:
    :return:
    """
    req_header = {
        'User-Agent': 'Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/68.0.3440.106 Safari/537.36',
        'Cookie': '_gscu_1380850711=43812116hs5dyy11; accessID=20181222071935501079; jy_refer=www.baidu.com; _gscbrs_1380850711=1; PHPSESSID=9202a7e752f801a49a5747832520f1da; plat=date_pc; DATE_FROM=daohang; SESSION_HASH=61e963462c6b312ee1ffacf151ffaa028477217d; user_access=1; uv_flag=124.64.18.38; DATE_SHOW_LOC=50; DATE_SHOW_SHOP=5',
        'Referer': 'http://date.jiayuan.com/eventslist.php',
    }
    response = requests.get(url, headers=req_header)
    if response.status_code == 200:
        print('detail page fetched', response.url)
        return response

def parse_detail_data(futures):
    """
    Parse an event detail page
    :param futures:
    :return:
    """
    response = futures.result()
    html_element = etree.HTML(response.text)
    # A dict to hold the extracted data
    item = {}
    # Event title
    item['title'] = ''.join(html_element.xpath('//h1[@class="detail_title"]/text()'))
    # Event time
    item['time'] = ','.join(
        html_element.xpath('//div[@class="detail_right fn-left"]/ul[@class="detail_info"]/li[1]//text()'))
    # Event address
    item['adress'] = html_element.xpath('//ul[@class="detail_info"]/li[2]/text()')[0]
    # Number of participants
    item['joinnum'] = html_element.xpath('//ul[@class="detail_info"]/li[3]/span[1]/text()')[0]
    # Number of reservations
    item['yuyue'] = html_element.xpath('//ul[@class="detail_info"]/li[3]/span[2]/text()')[0]
    # Introduction
    item['intreduces'] = html_element.xpath(
        '//div[@class="detail_act fn-clear"][1]//p[@class="info_word"]/span[1]/text()')[0]
    # Notes
    item['point'] = html_element.xpath('//div[@class="detail_act fn-clear"][2]//p[@class="info_word"]/text()')[0]
    # Introduction of the store
    item['introductionStore'] = ''.join(
        html_element.xpath('//div[@class="detail_act fn-clear"][3]//p[@class="info_word"]/text()'))
    # Image link
    item['coverImage'] = html_element.xpath('//div[@class="detail_left fn-left"]/img/@data-original')[0]
    with open('shijijiyua.json', 'a+', encoding='utf-8') as file:
        json_str = json.dumps(item, ensure_ascii=False) + '\n'
        file.write(json_str)

if __name__ == '__main__':
    # Create a process pool that downloads the listing pages
    page_pool = ProcessPoolExecutor(4)
    start_urls = [
        'http://date.jiayuan.com/eventslist_new.php?page=1&city_id=4201&shop_id=33',
        'http://date.jiayuan.com/eventslist_new.php?page=1&city_id=31&shop_id=15',
        'http://date.jiayuan.com/eventslist_new.php?page=1&city_id=3702&shop_id=42',
        'http://date.jiayuan.com/eventslist_new.php?page=1&city_id=50&shop_id=5',
    ]
    for url in start_urls:
        handler = page_pool.submit(down_load_page_data, url)
        handler.add_done_callback(parse_page_data)