A project of mine on GitHub. It counts as a medium-sized project, so there is quite a lot of code to go through.
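The script crawls a user's timeline from weibo.cn: it reads the profile (nickname, weibo/following/follower counts), walks every page of the user's weibo, writes everything to csv and txt files, and can optionally download the original pictures. A minimal usage sketch (it simply mirrors what main() at the bottom of the file does, with the same example user id):

wb = Weibo(1669879400, filter=0, pic_download=1)  # filter=1 would keep only original posts
wb.start()  # crawl, write the csv/txt files, then download pictures

The full source: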
#!/usr/bin/env python
# -*- coding: UTF-8 -*-
import codecs
import csv
import os
import random
import re
import sys
import traceback
from collections import OrderedDict
from datetime import datetime, timedelta
from time import sleep
import requests
from lxml import etree
from tqdm import tqdm
class Weibo(object):
    cookie = {'Cookie': 'your cookie'}  # replace 'your cookie' with your own cookie
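    # How to get the cookie (the README describes this too): log in at
    # https://weibo.cn in a browser, open the developer tools, and copy the
    # value of the Cookie request header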
def __init__(self, user_id, filter=0, pic_download=0):
"""Weibo類初始化"""
        if not isinstance(user_id, int):
            sys.exit(u'user_id should be a number, please re-enter')
        if filter != 0 and filter != 1:
            sys.exit(u'filter should be 0 or 1, please re-enter')
        if pic_download != 0 and pic_download != 1:
            sys.exit(u'pic_download should be 0 or 1, please re-enter')
        self.user_id = user_id  # user id, the number we have to supply; e.g. the id of the user nicknamed "Dear-迪麗熱巴" is 1669879400
        self.filter = filter  # 0 or 1; the default 0 crawls all of the user's weibo, 1 crawls only original posts
        self.pic_download = pic_download  # 0 or 1; the default 0 skips downloading the original pictures, 1 downloads them
        self.nickname = ''  # user nickname, e.g. "Dear-迪麗熱巴"
        self.weibo_num = 0  # total number of the user's weibo
        self.got_num = 0  # number of weibo crawled so far
        self.following = 0  # number of users this user follows
        self.followers = 0  # number of followers
        self.weibo = []  # stores all the crawled weibo
def deal_html(self, url):
"""處理html"""
try:
html = requests.get(url, cookies=self.cookie).content
selector = etree.HTML(html)
return selector
except Exception as e:
print('Error: ', e)
traceback.print_exc()
def deal_garbled(self, info):
"""處理亂碼"""
try:
info = (info.xpath('string(.)').replace(u'\u200b', '').encode(
sys.stdout.encoding, 'ignore').decode(sys.stdout.encoding))
return info
except Exception as e:
print('Error: ', e)
traceback.print_exc()
def get_nickname(self):
"""獲取用戶昵稱"""
try:
url = 'https://weibo.cn/%d/info' % (self.user_id)
selector = self.deal_html(url)
nickname = selector.xpath('//title/text()')[0]
            # the page title ends with a fixed 3-character suffix, which is
            # stripped here to leave the bare nickname
            self.nickname = nickname[:-3]
            # these truncated titles mean we got the login page instead,
            # i.e. the cookie did not work
            if self.nickname == u'登錄 - 新' or self.nickname == u'新浪':
                sys.exit(u'Cookie is wrong or expired; please get a new one as described in the README')
            print(u'Nickname: ' + self.nickname)
except Exception as e:
print('Error: ', e)
traceback.print_exc()
def get_user_info(self, selector):
"""獲取用戶昵稱、微博數(shù)、關(guān)注數(shù)屿岂、粉絲數(shù)"""
try:
            self.get_nickname()  # get the user's nickname
            # each tip2 entry looks like u'微博[123]', u'關(guān)注[45]' or u'粉絲[67]';
            # [3:-1] strips the two-character label plus the brackets
            user_info = selector.xpath("//div[@class='tip2']/*/text()")
            self.weibo_num = int(user_info[0][3:-1])
            print(u'Weibo count: ' + str(self.weibo_num))
            self.following = int(user_info[1][3:-1])
            print(u'Following: ' + str(self.following))
            self.followers = int(user_info[2][3:-1])
            print(u'Followers: ' + str(self.followers))
            print('*' * 100)
except Exception as e:
print('Error: ', e)
traceback.print_exc()
def get_page_num(self, selector):
"""獲取微博總頁(yè)數(shù)"""
try:
if selector.xpath("http://input[@name='mp']") == []:
page_num = 1
else:
page_num = (int)(
selector.xpath("http://input[@name='mp']")[0].attrib['value'])
return page_num
except Exception as e:
print('Error: ', e)
traceback.print_exc()
def get_long_weibo(self, weibo_link):
"""獲取長(zhǎng)原創(chuàng)微博"""
try:
selector = self.deal_html(weibo_link)
            info = selector.xpath("//div[@class='c']")[1]
wb_content = self.deal_garbled(info)
            wb_time = info.xpath("//span[@class='ct']/text()")[0]
weibo_content = wb_content[wb_content.find(':') +
1:wb_content.rfind(wb_time)]
return weibo_content
except Exception as e:
print('Error: ', e)
traceback.print_exc()
def get_original_weibo(self, info, weibo_id):
"""獲取原創(chuàng)微博"""
try:
weibo_content = self.deal_garbled(info)
            weibo_content = weibo_content[:weibo_content.rfind(u'贊')]  # cut the footer, which starts at u'贊' ('like')
a_text = info.xpath('div//a/text()')
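            # a u'全文' ('full text') link means the post is truncated; the
            # complete text has to be fetched from the post's comment page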
if u'全文' in a_text:
weibo_link = 'https://weibo.cn/comment/' + weibo_id
wb_content = self.get_long_weibo(weibo_link)
if wb_content:
weibo_content = wb_content
return weibo_content
except Exception as e:
print('Error: ', e)
traceback.print_exc()
def get_long_retweet(self, weibo_link):
"""獲取長(zhǎng)轉(zhuǎn)發(fā)微博"""
try:
wb_content = self.get_long_weibo(weibo_link)
            weibo_content = wb_content[:wb_content.rfind(u'原文轉(zhuǎn)發(fā)')]  # cut at the u'原文轉(zhuǎn)發(fā)' ('retweet the original') marker
return weibo_content
except Exception as e:
print('Error: ', e)
traceback.print_exc()
def get_retweet(self, info, weibo_id):
"""獲取轉(zhuǎn)發(fā)微博"""
try:
original_user = info.xpath("div/span[@class='cmt']/a/text()")
if not original_user:
                wb_content = u'The retweeted weibo has been deleted'
return wb_content
else:
original_user = original_user[0]
wb_content = self.deal_garbled(info)
wb_content = wb_content[wb_content.find(':') +
1:wb_content.rfind(u'贊')]
wb_content = wb_content[:wb_content.rfind(u'贊')]
a_text = info.xpath('div//a/text()')
if u'全文' in a_text:
weibo_link = 'https://weibo.cn/comment/' + weibo_id
weibo_content = self.get_long_retweet(weibo_link)
if weibo_content:
wb_content = weibo_content
retweet_reason = self.deal_garbled(info.xpath('div')[-1])
retweet_reason = retweet_reason[:retweet_reason.rindex(u'贊')]
                wb_content = (retweet_reason + '\n' + u'Original user: ' +
                              original_user + '\n' + u'Retweeted content: ' +
                              wb_content)
return wb_content
except Exception as e:
print('Error: ', e)
traceback.print_exc()
def is_original(self, info):
"""判斷微博是否為原創(chuàng)微博"""
is_original = info.xpath("div/span[@class='cmt']")
if len(is_original) > 3:
return False
else:
return True
def get_weibo_content(self, info, is_original):
"""獲取微博內(nèi)容"""
try:
weibo_id = info.xpath('@id')[0][2:]
if is_original:
weibo_content = self.get_original_weibo(info, weibo_id)
else:
weibo_content = self.get_retweet(info, weibo_id)
print(weibo_content)
return weibo_content
except Exception as e:
print('Error: ', e)
traceback.print_exc()
def get_publish_place(self, info):
"""獲取微博發(fā)布位置"""
try:
div_first = info.xpath('div')[0]
a_list = div_first.xpath('a')
            publish_place = u'none'
            for a in a_list:
                if ('place.weibo.com' in a.xpath('@href')[0]
                        and a.xpath('text()')[0] == u'顯示地圖'):  # u'顯示地圖' = 'show map'
weibo_a = div_first.xpath("span[@class='ctt']/a")
if len(weibo_a) >= 1:
publish_place = weibo_a[-1]
                        if (u'視頻' == div_first.xpath(
                                "span[@class='ctt']/a/text()")[-1][-2:]):  # trailing u'視頻' = 'video'
                            if len(weibo_a) >= 2:
                                publish_place = weibo_a[-2]
                            else:
                                publish_place = u'none'
publish_place = self.deal_garbled(publish_place)
break
            print(u'Publish place: ' + publish_place)
return publish_place
except Exception as e:
print('Error: ', e)
traceback.print_exc()
def get_publish_time(self, info):
"""獲取微博發(fā)布時(shí)間"""
try:
str_time = info.xpath("div/span[@class='ct']")
str_time = self.deal_garbled(str_time[0])
publish_time = str_time.split(u'來自')[0]
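            # relative timestamps seen on weibo.cn: u'剛剛' (just now),
            # u'X分鐘前' (X minutes ago), u'今天 HH:MM' (today), u'MM月DD日 HH:MM'
            # (this year), or a full date; each branch below normalizes one form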
if u'剛剛' in publish_time:
publish_time = datetime.now().strftime('%Y-%m-%d %H:%M')
elif u'分鐘' in publish_time:
minute = publish_time[:publish_time.find(u'分鐘')]
minute = timedelta(minutes=int(minute))
publish_time = (datetime.now() -
minute).strftime('%Y-%m-%d %H:%M')
elif u'今天' in publish_time:
today = datetime.now().strftime('%Y-%m-%d')
time = publish_time[3:]
publish_time = today + ' ' + time
elif u'月' in publish_time:
year = datetime.now().strftime('%Y')
month = publish_time[0:2]
day = publish_time[3:5]
time = publish_time[7:12]
publish_time = year + '-' + month + '-' + day + ' ' + time
else:
publish_time = publish_time[:16]
            print(u'Publish time: ' + publish_time)
return publish_time
except Exception as e:
print('Error: ', e)
traceback.print_exc()
def get_publish_tool(self, info):
"""獲取微博發(fā)布工具"""
try:
str_time = info.xpath("div/span[@class='ct']")
str_time = self.deal_garbled(str_time[0])
            if len(str_time.split(u'來自')) > 1:  # u'來自' means 'from'
                publish_tool = str_time.split(u'來自')[1]
            else:
                publish_tool = u'none'
            print(u'Publish tool: ' + publish_tool)
return publish_tool
except Exception as e:
print('Error: ', e)
traceback.print_exc()
def get_weibo_footer(self, info):
"""獲取微博點(diǎn)贊數(shù)腺晾、轉(zhuǎn)發(fā)數(shù)工猜、評(píng)論數(shù)"""
try:
footer = {}
pattern = r'\d+'
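            # the footer text looks like u'贊[12] 轉(zhuǎn)發(fā)[34] 評(píng)論[56]', so the
            # first three numbers are the like, retweet and comment counts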
str_footer = info.xpath('div')[-1]
str_footer = self.deal_garbled(str_footer)
str_footer = str_footer[str_footer.rfind(u'贊'):]
weibo_footer = re.findall(pattern, str_footer, re.M)
            up_num = int(weibo_footer[0])
            print(u'Likes: ' + str(up_num))
            footer['up_num'] = up_num
            retweet_num = int(weibo_footer[1])
            print(u'Retweets: ' + str(retweet_num))
            footer['retweet_num'] = retweet_num
            comment_num = int(weibo_footer[2])
            print(u'Comments: ' + str(comment_num))
            footer['comment_num'] = comment_num
return footer
except Exception as e:
print('Error: ', e)
traceback.print_exc()
def extract_picture_urls(self, info, weibo_id):
"""提取微博原始圖片url"""
try:
a_list = info.xpath('div/a/@href')
first_pic = 'https://weibo.cn/mblog/pic/' + weibo_id + '?rl=0'
all_pic = 'https://weibo.cn/mblog/picAll/' + weibo_id + '?rl=1'
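            # the 'pic' link appears only on posts that contain pictures; the
            # 'picAll' link additionally appears when there is more than one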
if first_pic in a_list:
if all_pic in a_list:
selector = self.deal_html(all_pic)
preview_picture_list = selector.xpath('//img/@src')
picture_list = [
p.replace('/thumb180/', '/large/')
for p in preview_picture_list
]
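                    # swapping /thumb180/ for /large/ turns the preview urls
                    # into full-resolution picture urls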
picture_urls = ','.join(picture_list)
else:
if info.xpath('.//img/@src'):
preview_picture = info.xpath('.//img/@src')[-1]
picture_urls = preview_picture.replace(
'/wap180/', '/large/')
                    else:
                        sys.exit(
                            u"This weibo may have 'don't show pictures' set; "
                            u"please go to 'https://weibo.cn/account/customize/pic' "
                            u"and change the setting to 'show'")
            else:
                picture_urls = 'none'
return picture_urls
except Exception as e:
print('Error: ', e)
traceback.print_exc()
def get_picture_urls(self, info, is_original):
"""獲取微博原始圖片url"""
try:
weibo_id = info.xpath('@id')[0][2:]
picture_urls = {}
if is_original:
original_pictures = self.extract_picture_urls(info, weibo_id)
picture_urls['original_pictures'] = original_pictures
if not self.filter:
                    picture_urls['retweet_pictures'] = 'none'
else:
retweet_url = info.xpath("div/a[@class='cc']/@href")[0]
retweet_id = retweet_url.split('/')[-1].split('?')[0]
retweet_pictures = self.extract_picture_urls(info, retweet_id)
picture_urls['retweet_pictures'] = retweet_pictures
a_list = info.xpath('div[last()]/a/@href')
                original_picture = 'none'
for a in a_list:
if a.endswith(('.gif', '.jpeg', '.jpg', '.png')):
original_picture = a
break
picture_urls['original_pictures'] = original_picture
return picture_urls
except Exception as e:
print('Error: ', e)
traceback.print_exc()
def download_pic(self, url, pic_path):
"""下載單張圖片"""
try:
p = requests.get(url)
with open(pic_path, 'wb') as f:
f.write(p.content)
except Exception as e:
error_file = self.get_filepath(
'img') + os.sep + 'not_downloaded_pictures.txt'
with open(error_file, 'ab') as f:
url = url + '\n'
f.write(url.encode(sys.stdout.encoding))
print('Error: ', e)
traceback.print_exc()
def download_pictures(self):
"""下載微博圖片"""
try:
print(u'即將進(jìn)行圖片下載')
img_dir = self.get_filepath('img')
for w in tqdm(self.weibo, desc=u'圖片下載進(jìn)度'):
if w['original_pictures'] != '無(wú)':
pic_prefix = w['publish_time'][:11].replace(
'-', '') + '_' + w['id']
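                    # picture files are named <date>_<weibo id>[_<n>].<ext>,
                    # with the date taken from the publish time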
if ',' in w['original_pictures']:
w['original_pictures'] = w['original_pictures'].split(
',')
for j, url in enumerate(w['original_pictures']):
pic_suffix = url[url.rfind('.'):]
pic_name = pic_prefix + '_' + str(j +
1) + pic_suffix
pic_path = img_dir + os.sep + pic_name
self.download_pic(url, pic_path)
else:
pic_suffix = w['original_pictures'][
w['original_pictures'].rfind('.'):]
pic_name = pic_prefix + pic_suffix
pic_path = img_dir + os.sep + pic_name
self.download_pic(w['original_pictures'], pic_path)
            print(u'Pictures downloaded; save path:')
print(img_dir)
except Exception as e:
print('Error: ', e)
traceback.print_exc()
def get_one_weibo(self, info):
"""獲取一條微博的全部信息"""
try:
weibo = OrderedDict()
is_original = self.is_original(info)
if (not self.filter) or is_original:
weibo['id'] = info.xpath('@id')[0][2:]
                weibo['content'] = self.get_weibo_content(info,
                                                          is_original)  # weibo content
picture_urls = self.get_picture_urls(info, is_original)
                weibo['original_pictures'] = picture_urls[
                    'original_pictures']  # original picture urls
                if not self.filter:
                    weibo['retweet_pictures'] = picture_urls[
                        'retweet_pictures']  # retweeted picture urls
                    weibo['original'] = is_original  # whether the weibo is original
                weibo['publish_place'] = self.get_publish_place(info)  # publish place
                weibo['publish_time'] = self.get_publish_time(info)  # publish time
                weibo['publish_tool'] = self.get_publish_tool(info)  # publish tool
                footer = self.get_weibo_footer(info)
                weibo['up_num'] = footer['up_num']  # like count
                weibo['retweet_num'] = footer['retweet_num']  # retweet count
                weibo['comment_num'] = footer['comment_num']  # comment count
else:
weibo = None
return weibo
except Exception as e:
print('Error: ', e)
traceback.print_exc()
def get_one_page(self, page):
"""獲取第page頁(yè)的全部微博"""
try:
url = 'https://weibo.cn/u/%d?page=%d' % (self.user_id, page)
selector = self.deal_html(url)
            info = selector.xpath("//div[@class='c']")
is_exist = info[0].xpath("div/span[@class='ctt']")
if is_exist:
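                # the last two div[@class='c'] nodes on a page are not posts
                # (page navigation etc.), hence the len(info) - 2 below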
for i in range(0, len(info) - 2):
weibo = self.get_one_weibo(info[i])
if weibo:
self.weibo.append(weibo)
self.got_num += 1
print('-' * 100)
except Exception as e:
print('Error: ', e)
traceback.print_exc()
def get_filepath(self, type):
"""獲取結(jié)果文件路徑"""
try:
file_dir = os.path.split(os.path.realpath(
__file__))[0] + os.sep + 'weibo' + os.sep + self.nickname
if type == 'img':
file_dir = file_dir + os.sep + 'img'
if not os.path.isdir(file_dir):
os.makedirs(file_dir)
if type == 'img':
return file_dir
file_path = file_dir + os.sep + '%d' % self.user_id + '.' + type
return file_path
except Exception as e:
print('Error: ', e)
traceback.print_exc()
def write_csv(self, wrote_num):
"""將爬取的信息寫入csv文件"""
try:
            result_headers = [
                'weibo id',
                'weibo content',
                'original picture urls',
                'publish place',
                'publish time',
                'publish tool',
                'like count',
                'retweet count',
                'comment count',
            ]
            if not self.filter:
                result_headers.insert(3, 'retweeted original picture urls')
                result_headers.insert(4, 'is original')
result_data = [w.values() for w in self.weibo][wrote_num:]
if sys.version < '3': # python2.x
reload(sys)
sys.setdefaultencoding('utf-8')
with open(self.get_filepath('csv'), 'ab') as f:
f.write(codecs.BOM_UTF8)
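                    # the utf-8 BOM makes Excel recognize the csv's encoding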
writer = csv.writer(f)
if wrote_num == 0:
writer.writerows([result_headers])
writer.writerows(result_data)
else: # python3.x
with open(self.get_filepath('csv'),
'a',
encoding='utf-8-sig',
newline='') as f:
writer = csv.writer(f)
if wrote_num == 0:
writer.writerows([result_headers])
writer.writerows(result_data)
            print(u'%d weibo written to the csv file; save path:' % self.got_num)
print(self.get_filepath('csv'))
except Exception as e:
print('Error: ', e)
traceback.print_exc()
def write_txt(self, wrote_num):
"""將爬取的信息寫入txt文件"""
try:
temp_result = []
if wrote_num == 0:
if self.filter:
result_header = u'\n\n原創(chuàng)微博內(nèi)容: \n'
else:
result_header = u'\n\n微博內(nèi)容: \n'
result_header = (u'用戶信息\n用戶昵稱:' + self.nickname + u'\n用戶id: ' +
str(self.user_id) + u'\n微博數(shù): ' +
str(self.weibo_num) + u'\n關(guān)注數(shù): ' +
str(self.following) + u'\n粉絲數(shù): ' +
str(self.followers) + result_header)
temp_result.append(result_header)
for i, w in enumerate(self.weibo[wrote_num:]):
temp_result.append(
str(wrote_num + i + 1) + ':' + w['content'] + '\n' +
u'微博位置: ' + w['publish_place'] + '\n' + u'發(fā)布時(shí)間: ' +
w['publish_time'] + '\n' + u'點(diǎn)贊數(shù): ' + str(w['up_num']) +
u' 轉(zhuǎn)發(fā)數(shù): ' + str(w['retweet_num']) + u' 評(píng)論數(shù): ' +
str(w['comment_num']) + '\n' + u'發(fā)布工具: ' +
w['publish_tool'] + '\n\n')
result = ''.join(temp_result)
with open(self.get_filepath('txt'), 'ab') as f:
f.write(result.encode(sys.stdout.encoding))
            print(u'%d weibo written to the txt file; save path:' % self.got_num)
print(self.get_filepath('txt'))
except Exception as e:
print('Error: ', e)
traceback.print_exc()
def write_file(self, wrote_num):
"""寫文件"""
if self.got_num > wrote_num:
self.write_csv(wrote_num)
self.write_txt(wrote_num)
def get_weibo_info(self):
"""獲取微博信息"""
try:
url = 'https://weibo.cn/u/%d' % (self.user_id)
selector = self.deal_html(url)
            self.get_user_info(selector)  # get nickname and weibo/following/follower counts
            page_num = self.get_page_num(selector)  # get the total number of pages
wrote_num = 0
page1 = 0
random_pages = random.randint(1, 5)
            for page in tqdm(range(1, page_num + 1), desc=u'Progress'):
                self.get_one_page(page)  # get all the weibo of one page
                if page % 20 == 0:  # write to file every 20 pages
                    self.write_file(wrote_num)
                    wrote_num = self.got_num
                # Random waits are added to avoid being rate-limited: crawling
                # too fast easily gets you blocked by the system (the block
                # lifts automatically after a while), and random waits that
                # mimic human behaviour lower that risk. By default the
                # crawler sleeps 6 to 10 seconds after every 1 to 5 pages; if
                # you still get blocked, increase the sleep time accordingly.
                if page - page1 == random_pages and page < page_num:
                    sleep(random.randint(6, 10))
                    page1 = page
                    random_pages = random.randint(1, 5)
            self.write_file(wrote_num)  # write the remaining (fewer than 20) pages
            if not self.filter:
                print(u'Crawled ' + str(self.got_num) + u' weibo in total')
            else:
                print(u'Crawled ' + str(self.got_num) +
                      u' original weibo in total')
except Exception as e:
print('Error: ', e)
traceback.print_exc()
def start(self):
"""運(yùn)行爬蟲"""
try:
self.get_weibo_info()
            print(u'Information crawling done')
print('*' * 100)
if self.pic_download == 1:
self.download_pictures()
except Exception as e:
print('Error: ', e)
traceback.print_exc()
def main():
try:
        # Example usage: supply a user id and all the information will be
        # stored in the wb instance
        user_id = 1669879400  # change to any valid user id (except the crawler account's own id)
        filter = 1  # 0 crawls all weibo (original + retweets), 1 crawls only original weibo
        pic_download = 1  # 0 skips downloading the original pictures, 1 downloads them
        wb = Weibo(user_id, filter, pic_download)  # create the Weibo instance wb
        wb.start()  # crawl the weibo information
        print(u'Nickname: ' + wb.nickname)
        print(u'Total weibo count: ' + str(wb.weibo_num))
        print(u'Following: ' + str(wb.following))
        print(u'Followers: ' + str(wb.followers))
        if wb.weibo:
            print(u'Latest/pinned weibo: ' + wb.weibo[0]['content'])
            print(u'Latest/pinned weibo place: ' + wb.weibo[0]['publish_place'])
            print(u'Latest/pinned weibo publish time: ' + wb.weibo[0]['publish_time'])
            print(u'Latest/pinned weibo likes: ' + str(wb.weibo[0]['up_num']))
            print(u'Latest/pinned weibo retweets: ' + str(wb.weibo[0]['retweet_num']))
            print(u'Latest/pinned weibo comments: ' + str(wb.weibo[0]['comment_num']))
            print(u'Latest/pinned weibo publish tool: ' + wb.weibo[0]['publish_tool'])
except Exception as e:
print('Error: ', e)
traceback.print_exc()
if __name__ == '__main__':
main()