Python:基于 Fabric 部署分布式爬蟲的思考
Fabric 本身是一款用于自動化管理,發(fā)布任務(wù)和布署應(yīng)用的工具宝踪,在自動化運(yùn)維中比較常見
當(dāng)然其他的 連接工具同樣優(yōu)秀渡贾,比如 paramiko ,只是 fabric 封裝的更好撑教,文檔更全叹话,使用也更簡單
Fabric 是一個用 Python 編寫的命令行工具庫偷遗,它可以幫助系統(tǒng)管理員高效地執(zhí)行某些任務(wù)
一個讓通過 SSH 執(zhí)行 Shell 命令更加 容易 、 更符合 Python 風(fēng)格 的命令庫
安裝
sudo pip3 install fabric3
如何運(yùn)行
最終這樣都可以運(yùn)行
fab -f REQUESTS_HTML.py host_type # host_type 是任務(wù)函數(shù)
or
python3 REQUESTS_HTML.py # 運(yùn)行整個文件
實(shí)戰(zhàn)案例
我是一個不怎么愛說話的人(其實(shí)是文筆一般)驼壶,所以直接貼代碼恐怕是最好的分享的方式了
第一個 Demo
from fabric.api import run,cd,env,hosts,execute
env.hosts=['root@2.2.2.2:22']
env.password='pwd2'
def host_type():
with cd('../home'):
run('ls')
run('cd youboy_redis')
run('cd Youboy && cd youboy && ls && python3 run.py')
print(execute(host_type)) # execute 執(zhí)行任務(wù)
可以很清晰的看到函數(shù)執(zhí)行的順序氏豌,進(jìn)入 爬蟲 目錄并運(yùn)行一個爬蟲腳本,就像在本地執(zhí)行命令一樣热凹,這里支持 with 上下文
簡單做一個類封裝
from fabric.api import run,cd,env,hosts,execute
class H():
def __init__(self,host,pwd):
env.hosts=list(hosts)
env.password=pwd
def host_type(self):
with cd('../home'):
run('ls')
def run(self):
print(execute(self.host_type))
h = H('root@2.2.2.2:22','pwd2')
h.run()
嗯泵喘,沒有問題
并行執(zhí)行
我們在介紹執(zhí)行遠(yuǎn)程命令時曾提到過多臺機(jī)器的任務(wù)默認(rèn)情況下是串行執(zhí)行的泪电。
Fabric 支持并行任務(wù),當(dāng)服務(wù)器的任務(wù)之間沒有依賴時纪铺,并行可以有效的加快執(zhí)行速度相速。
怎么開啟并行執(zhí)行呢?
在執(zhí)行”fab”命令時加上”-P”參數(shù)
$ fab -P host_type
或者
設(shè)置 ”env.parallel” 環(huán)境參數(shù)為True
from fabric.api import env
env.parallel = True
如果鲜锚,我們只想對某一任務(wù)做并行的話突诬,我們可以在任務(wù)函數(shù)上加上”@parallel”裝飾器:
from fabric.api import parallel
@parallel
def runs_in_parallel():
pass
def runs_serially():
pass
這樣即便并行未開啟,”runs_in_parallel()”任務(wù)也會并行執(zhí)行芜繁。
反過來旺隙,我們可以在任務(wù)函數(shù)上加上”@serial”裝飾器:
from fabric.api import serial
def runs_in_parallel():
pass
@serial
def runs_serially():
pass
這樣即便并行已經(jīng)開啟,”runs_serially()”任務(wù)也會串行執(zhí)行骏令。
試著做一些事情
配置文件
使用 yaml 格式
[mysql]
host = 127.0.0.1
port = 3306
db = python
user = root
passwd = 123456
charset = utf8
[mongodb]
host = ip
port = 27017
db = QXB
[redis]
host = 127.0.0.1
port = 6379
db = 0
[server]
aliyun1_host = ["公網(wǎng)ip", "ssh密碼", 22]
aliyun2_host = ["公網(wǎng)ip", "ssh密碼", 22]
讀取配置
config.py
from configparser import ConfigParser
import json
config = ConfigParser()
config.read('./conf.yml') # ['conf.ini'] ['conf.cfg]
# 獲取所有的section
# print(config.sections()) # ['mysql', redis]
conf_list = list()
for host in config.options('server'):
str_host = config.get('server', host)
json_host = json.loads(str_host)
conf_list.append(json_host)
print(conf_list)
遠(yuǎn)程連接服務(wù)器以及一些常用操作
import warnings
warnings.filterwarnings("ignore")
import time
from fabric.api import * # run,cd,env,hosts,execute,sudo,settings,hide
from fabric.colors import *
from fabric.contrib.console import confirm
import config
import json
from fabric.tasks import Task
class HA():
def __init__(self):
self.host = "root@{host}:{port}"
self.ssh = "root@{host}:{port}"
self.env = env
self.env.warn_only = True # 這樣寫比較痛快
self.env.hosts = [
self.host.format(host=host[0],port=host[2]) for host in config.conf_list]
self.env.passwords = {
self.ssh.format(host=host[0], port=host[2]):host[1] for host in config.conf_list}
print(self.env["hosts"])
# def Hide_all(self):
# with settings(hide('everything'), warn_only=True): # 關(guān)閉顯示
# result = run('ls')
# print(result) # 命令執(zhí)行的結(jié)果
# print(result.return_code)
# def Show_all(self):
# with settings(show('everything'), warn_only=True): # 顯示所有
# result = run('docker')
# print(str(result.return_code)) # 返回碼蔬捷,0表示正確執(zhí)行,1表示錯誤
# print(str(result.failed))
# @task
# def Prefix(self): # 前綴榔袋,它接受一個命令作為參數(shù)抠刺,表示在其內(nèi)部執(zhí)行的代碼塊,都要先執(zhí)行prefix的命令參數(shù)摘昌。
# with cd('../home'):
# with prefix('echo 123'):
# run('echo caonima')
# def Shell_env(self): # 設(shè)置shell腳本的環(huán)境變量
# with shell_env(HTTP_PROXY='1.1.1.1'):
# run('echo $HTTP_PROXY')
# def Path_env(self): # 配置遠(yuǎn)程服務(wù)器PATH環(huán)境變量,只對當(dāng)前會話有效高蜂,不會影響遠(yuǎn)程服務(wù)器的其他操作聪黎,path的修改支持多種模式
# with path('/tmp', 'prepend'):
# run("echo $PATH")
# run("echo $PATH")
# def Mongo(self): # 嘗試連接mongodb數(shù)據(jù)庫 不知道為什么制定端口就不行了
# # with remote_tunnel(27017):
# run('mongo')
# def Mysql(self): # 嘗試連接mysql數(shù)據(jù)庫
# with remote_tunnel(3306):
# run('mysql -u root -p password')
'''
指定host時,可以同時指定用戶名和端口號: username@hostname:port
通過命令行指定要多哪些hosts執(zhí)行人物:fab mytask:hosts="host1;host2"
通過hosts裝飾器指定要對哪些hosts執(zhí)行當(dāng)前task
通過env.reject_unkown_hosts控制未知host的行為备恤,默認(rèn)True稿饰,類似于SSH的StrictHostKeyChecking的選項(xiàng)設(shè)置為no,不進(jìn)行公鑰確認(rèn)露泊。
'''
# @hosts('root@ip:22')
# @task
# def Get_Ip(self):
# run('ifconfig')
# # return run("ip a")
# @hosts("root@ip:22")
# @runs_once
# def Get_One_Ip(self):
# run('ifconfig')
'''
role是對服務(wù)器進(jìn)行分類的手段喉镰,通過role可以定義服務(wù)器的角色,
以便對不同的服務(wù)器執(zhí)行不同的操作惭笑,Role邏輯上將服務(wù)器進(jìn)行了分類侣姆,
分類以后,我們可以對某一類服務(wù)器指定一個role名即可沉噩。
進(jìn)行task任務(wù)時捺宗,對role進(jìn)行控制。
'''
# @roles('web') # 只對role為db的主機(jī)進(jìn)行操作
# @task
# def Roles_Get_Ip():
# run('ifconfig')
# def Confirm(self): # 有時候我們在某一步執(zhí)行錯誤川蒙,會給用戶提示蚜厉,是否繼續(xù)執(zhí)行時,confirm就非常有用了畜眨,它包含在 fabric.contrib.console中
# result = confirm('Continue Anyway?')
# print(result)
# def run_python(self):
# run("python3 trigger.py")
@task
@parallel
def celery_call(): # 執(zhí)行celery任務(wù)
with cd('../home'):
warn(yellow('----->Celery'))
puts(green('----->puts'))
run('cd ./celery_1 && celery -A Celery worker -l info')
time.sleep(3)
run('python3 run_tasks.py')
# @task
# def update_file(): # 上傳文件到服務(wù)器
# with settings(warn_only=True):
# local("tar -czf test.tar.gz config.py")
# result = put("test.tar.gz", "/home/test.tar.gz")
# if result.failed and not confirm("continue[y/n]?"):
# abort("put test.tar.gz failed")
# with settings(warn_only=True):
# local_file_md5 = local("md5sum test.tar.gz",capture=True).split(" ")[0]
# remote_file_md5 = run("md5sum /home/test.tar.gz").split(" ")[0]
# if local_file_md5 == remote_file_md5:
# print(green("local_file == remote_file"))
# else:
# print(red("local_file != remote"))
# run("mkdir /home/test")
# run("tar -zxf /home/test.tar.gz -C /home/scp")
'''
有一個地方很神奇昼牛,self和@task裝飾器在類中不能共用术瓮,否則會報錯
'''
# @task
# def downloads_file(): # get文件到本地
# with settings(warn_only=True):
# result = get("/home/celery_1", "./")
# if result.failed and not confirm("continue[y/n]?"):
# abort("get test.tar.gz failed")
# local("mkdir ./test")
# local("tar zxf ./hh.tar.gz -C ./test")
# @task
# @parallel
# def scp_docker_file():
# with settings(warn_only=True):
# local("tar -czf docker.tar.gz ../docker")
# result = put("docker.tar.gz", "/home/docker.tar.gz")
# if result.failed and not confirm("continue[y/n]?"):
# abort("put dockerfile failed")
# run("mkdir /home/docker")
# run("tar -zxf /home/docker.tar.gz -C /home")
def Run(self):
execute(self.celery_call)
h = HA()
h.Run()
我盡可能添加一些代碼注釋,更多解釋還請參考 fabric 文檔啊
看看 docker
只要能夠連接到服務(wù)器贰健,那么在這些服務(wù)器上安裝服務(wù)也就在情理之中了胞四,比如 docker
來看具體的代碼實(shí)現(xiàn)
import warnings
warnings.filterwarnings("ignore")
import time
from fabric.api import * # run,cd,env,hosts,execute,sudo,settings,hide
from fabric.colors import *
from fabric.contrib.console import confirm
import config
import json
from fabric.tasks import Task
class HA():
def __init__(self):
self.host = "root@{host}:{port}"
self.ssh = "root@{host}:{port}"
self.env = env
self.env.warn_only = True # 這樣寫比較痛快
self.env.hosts = [
self.host.format(host=host[0],port=host[2]) for host in config.conf_list]
self.env.passwords = {
self.ssh.format(host=host[0], port=host[2]):host[1] for host in config.conf_list}
print(self.env["hosts"])
@task
def get_docker_v(): # 查看docker版本
with cd('../home'):
run('docker version')
@task
def pull_images(images_name):
with settings(warn_only=True):
with cd("../home/"):
try:
run("docker pull {}".format(images_name))
except:
abort("docker pull failed")
@task
def push_images(images_name,username_repository,tag):
with settings(warn_only=True):
with cd("../home/"):
try:
run("docker tag {image_name} {username_repository}:{tag}".format(images_name=images_name,username_repository=username_repository,tag=tag))
run("docker push {username_repository}:{tag}".format(username_repository=username_repository,tag=tag))
except:
abort("docker push failed")
@task
def run_docker_images(images_name_tag):
with settings(warn_only=True):
with cd("../home/"):
try:
run("docker run -p 4000:80 {}".format(images_name_tag))
except:
abort("docker run failed")
@task
@parallel
def execute_docker_compose():
with settings(warn_only=True):
with cd("../home/flask_app"):
run("docker-compose up")
@task
def create_docker_service(service_name,images_name,num=4):
with settings(warn_only=True):
with cd("../home/"):
run("docker service create --name {service_name} -p 4000:80 {images_name}".format(service_name=service_name,images_name=images_name))
run("docker service scale {service_name}={num}".format(service_name=service_name,num=num))
@task
def stop_docker_service(service_name):
with settings(warn_only=True):
with cd("../home/"):
run("docker service rm {}".format(service_name))
def Run(self):
# execute(self.create_docker_service,"demo","3417947630/py:hello")
execute(self.execute_docker_compose)
h = HA()
h.Run()
嘿嘿,挺好
總結(jié)
基于python第三方庫 fabric 實(shí)現(xiàn)遠(yuǎn)程ssh分布式調(diào)度部署應(yīng)用,是一種很不錯的選擇霎烙,那么如果用于部署 爬蟲的應(yīng)用呢撬讽?
如果你是使用 scrapy 框架編寫的爬蟲(或者是其他框架,各種腳本也是一樣)悬垃,那么可以直接運(yùn)行文件上傳的方法把完整目錄拷貝到目標(biāo)服務(wù)器(當(dāng)然是批量的)
然后鍵入爬取的命令游昼,記住 fabric 是支持并行的,就能達(dá)到多機(jī)協(xié)作抓取的目的了
其實(shí)在 scrapy 中也可能用 scrapyd 來打包部署分布式爬蟲尝蠕,但是打包過程略為繁瑣烘豌,而 SSH 連接則比較直接,操作簡單
然鵝說到底 fabric 也只是一種自動化運(yùn)維的工具看彼,本質(zhì)上也只是把代碼拷貝到目標(biāo)服務(wù)器和執(zhí)行相應(yīng)的命令而已廊佩,并沒有像 scrapyd 提供爬蟲管理的可視化界面
所以這樣看來, Fabric 至少算得上是部署分布式爬蟲的一種選擇靖榕,就是因?yàn)椴渴鸷唵?/p>
以上就是我對這個 py 庫的一些看法标锄,它為我們?nèi)蘸蟛渴饝?yīng)用和服務(wù)提供了更多的選擇,多多實(shí)戰(zhàn)吧 W录啤料皇!
歡迎轉(zhuǎn)載,但要聲明出處星压,不然我順著網(wǎng)線過去就是一拳践剂。
個人技術(shù)博客:http://www.gzky.live