基于 Fabric 部署分布式爬蟲的思考

Python:基于 Fabric 部署分布式爬蟲的思考

Fabric 本身是一款用于自動化管理,發(fā)布任務(wù)和布署應(yīng)用的工具宝踪,在自動化運(yùn)維中比較常見

當(dāng)然其他的 連接工具同樣優(yōu)秀渡贾,比如 paramiko ,只是 fabric 封裝的更好撑教,文檔更全叹话,使用也更簡單

中文文檔 https://fabric-chs.readthedocs.io/zh_CN/chs/

Fabric 是一個用 Python 編寫的命令行工具庫偷遗,它可以幫助系統(tǒng)管理員高效地執(zhí)行某些任務(wù)
一個讓通過 SSH 執(zhí)行 Shell 命令更加 容易 、 更符合 Python 風(fēng)格 的命令庫

image

安裝

sudo pip3 install fabric3

如何運(yùn)行

最終這樣都可以運(yùn)行

fab -f REQUESTS_HTML.py host_type # host_type 是任務(wù)函數(shù)

or

python3 REQUESTS_HTML.py  # 運(yùn)行整個文件

實(shí)戰(zhàn)案例

我是一個不怎么愛說話的人(其實(shí)是文筆一般)驼壶,所以直接貼代碼恐怕是最好的分享的方式了

第一個 Demo

from fabric.api import run,cd,env,hosts,execute
env.hosts=['root@2.2.2.2:22']
env.password='pwd2'

def host_type():
    with cd('../home'):
        run('ls')
        run('cd youboy_redis')
        run('cd Youboy && cd youboy && ls && python3 run.py')

print(execute(host_type)) # execute 執(zhí)行任務(wù)

可以很清晰的看到函數(shù)執(zhí)行的順序氏豌,進(jìn)入 爬蟲 目錄并運(yùn)行一個爬蟲腳本,就像在本地執(zhí)行命令一樣热凹,這里支持 with 上下文

簡單做一個類封裝

from fabric.api import run,cd,env,hosts,execute
class H():
    def __init__(self,host,pwd):
        env.hosts=list(hosts)
        env.password=pwd

    def host_type(self):
        with cd('../home'):
            run('ls')
    def run(self):
        print(execute(self.host_type))

h = H('root@2.2.2.2:22','pwd2')
h.run()

嗯泵喘,沒有問題

并行執(zhí)行

我們在介紹執(zhí)行遠(yuǎn)程命令時曾提到過多臺機(jī)器的任務(wù)默認(rèn)情況下是串行執(zhí)行的泪电。
Fabric 支持并行任務(wù),當(dāng)服務(wù)器的任務(wù)之間沒有依賴時纪铺,并行可以有效的加快執(zhí)行速度相速。
怎么開啟并行執(zhí)行呢?

在執(zhí)行”fab”命令時加上”-P”參數(shù)

$ fab -P host_type

或者
設(shè)置 ”env.parallel” 環(huán)境參數(shù)為True

from fabric.api import env
env.parallel = True

如果鲜锚,我們只想對某一任務(wù)做并行的話突诬,我們可以在任務(wù)函數(shù)上加上”@parallel”裝飾器:

from fabric.api import parallel
 
@parallel
def runs_in_parallel():
    pass
 
def runs_serially():
    pass

這樣即便并行未開啟,”runs_in_parallel()”任務(wù)也會并行執(zhí)行芜繁。
反過來旺隙,我們可以在任務(wù)函數(shù)上加上”@serial”裝飾器:

from fabric.api import serial
 
def runs_in_parallel():
    pass
 
@serial
def runs_serially():
    pass

這樣即便并行已經(jīng)開啟,”runs_serially()”任務(wù)也會串行執(zhí)行骏令。

試著做一些事情

配置文件

使用 yaml 格式

[mysql]
host = 127.0.0.1
port = 3306
db = python
user = root
passwd = 123456
charset = utf8

[mongodb]
host = ip
port = 27017
db = QXB

[redis]
host = 127.0.0.1
port = 6379
db = 0

[server]
aliyun1_host = ["公網(wǎng)ip", "ssh密碼", 22]
aliyun2_host = ["公網(wǎng)ip", "ssh密碼", 22]

讀取配置

config.py

from configparser import ConfigParser
import json
config = ConfigParser()
config.read('./conf.yml')  # ['conf.ini'] ['conf.cfg]


# 獲取所有的section
# print(config.sections())  # ['mysql', redis]

conf_list = list()
for host in config.options('server'):
    str_host = config.get('server', host)
    json_host = json.loads(str_host)
    conf_list.append(json_host)

print(conf_list)

遠(yuǎn)程連接服務(wù)器以及一些常用操作

import warnings
warnings.filterwarnings("ignore")
import time
from fabric.api import * # run,cd,env,hosts,execute,sudo,settings,hide
from fabric.colors import *
from fabric.contrib.console import confirm
import config
import json
from fabric.tasks import Task

class HA():
    def __init__(self):
        self.host = "root@{host}:{port}"
        self.ssh = "root@{host}:{port}"
        self.env = env
        self.env.warn_only = True # 這樣寫比較痛快
        self.env.hosts = [
            self.host.format(host=host[0],port=host[2]) for host in config.conf_list]
        self.env.passwords = {
            self.ssh.format(host=host[0], port=host[2]):host[1] for host in config.conf_list}

 
        print(self.env["hosts"])

    # def Hide_all(self):
    #     with settings(hide('everything'), warn_only=True):  # 關(guān)閉顯示
    #         result = run('ls')
    #         print(result)  # 命令執(zhí)行的結(jié)果
    #         print(result.return_code)

    # def Show_all(self):
    #     with settings(show('everything'), warn_only=True):  # 顯示所有
    #         result = run('docker')
    #         print(str(result.return_code))  # 返回碼蔬捷,0表示正確執(zhí)行,1表示錯誤
    #         print(str(result.failed))

    # @task
    # def Prefix(self): # 前綴榔袋,它接受一個命令作為參數(shù)抠刺,表示在其內(nèi)部執(zhí)行的代碼塊,都要先執(zhí)行prefix的命令參數(shù)摘昌。
    #     with cd('../home'):
    #         with prefix('echo 123'):
    #             run('echo caonima')


    # def Shell_env(self): # 設(shè)置shell腳本的環(huán)境變量 
    #     with shell_env(HTTP_PROXY='1.1.1.1'):
    #         run('echo $HTTP_PROXY')


    # def Path_env(self): # 配置遠(yuǎn)程服務(wù)器PATH環(huán)境變量,只對當(dāng)前會話有效高蜂,不會影響遠(yuǎn)程服務(wù)器的其他操作聪黎,path的修改支持多種模式
    #     with path('/tmp', 'prepend'):
    #         run("echo $PATH")
    #     run("echo $PATH")


    # def Mongo(self): # 嘗試連接mongodb數(shù)據(jù)庫  不知道為什么制定端口就不行了
    #     # with remote_tunnel(27017):
    #     run('mongo')


    # def Mysql(self):  # 嘗試連接mysql數(shù)據(jù)庫
    #     with remote_tunnel(3306):
    #         run('mysql -u root -p password')

    '''
    指定host時,可以同時指定用戶名和端口號: username@hostname:port
    通過命令行指定要多哪些hosts執(zhí)行人物:fab mytask:hosts="host1;host2"
    通過hosts裝飾器指定要對哪些hosts執(zhí)行當(dāng)前task
    通過env.reject_unkown_hosts控制未知host的行為备恤,默認(rèn)True稿饰,類似于SSH的StrictHostKeyChecking的選項(xiàng)設(shè)置為no,不進(jìn)行公鑰確認(rèn)露泊。
    '''

    # @hosts('root@ip:22')
    # @task
    # def Get_Ip(self):
    #     run('ifconfig') 
    #     # return run("ip a")

    # @hosts("root@ip:22")
    # @runs_once
    # def Get_One_Ip(self):
    #     run('ifconfig')

    '''
    role是對服務(wù)器進(jìn)行分類的手段喉镰,通過role可以定義服務(wù)器的角色,
    以便對不同的服務(wù)器執(zhí)行不同的操作惭笑,Role邏輯上將服務(wù)器進(jìn)行了分類侣姆,
    分類以后,我們可以對某一類服務(wù)器指定一個role名即可沉噩。
    進(jìn)行task任務(wù)時捺宗,對role進(jìn)行控制。
    '''

    # @roles('web')  # 只對role為db的主機(jī)進(jìn)行操作
    # @task
    # def Roles_Get_Ip():
    #     run('ifconfig')
        

    # def Confirm(self): # 有時候我們在某一步執(zhí)行錯誤川蒙,會給用戶提示蚜厉,是否繼續(xù)執(zhí)行時,confirm就非常有用了畜眨,它包含在 fabric.contrib.console中
    #     result = confirm('Continue Anyway?')
    #     print(result)

    # def run_python(self):
    #     run("python3 trigger.py")

    @task
    @parallel
    def celery_call(): # 執(zhí)行celery任務(wù)
        with cd('../home'):
            warn(yellow('----->Celery'))
            puts(green('----->puts'))
            run('cd ./celery_1 && celery -A Celery worker -l info')
            time.sleep(3)
            run('python3 run_tasks.py')
    

    # @task
    # def update_file(): # 上傳文件到服務(wù)器
    #     with settings(warn_only=True):
    #         local("tar -czf test.tar.gz config.py")
    #         result = put("test.tar.gz", "/home/test.tar.gz")
    #     if result.failed and not confirm("continue[y/n]?"):
    #         abort("put test.tar.gz failed")

    #     with settings(warn_only=True):
    #         local_file_md5 = local("md5sum test.tar.gz",capture=True).split(" ")[0]
    #         remote_file_md5 = run("md5sum /home/test.tar.gz").split(" ")[0]
    #     if local_file_md5 == remote_file_md5:
    #         print(green("local_file == remote_file"))
    #     else:
    #         print(red("local_file != remote"))
    #     run("mkdir /home/test")
    #     run("tar -zxf /home/test.tar.gz -C /home/scp")

    '''
    有一個地方很神奇昼牛,self和@task裝飾器在類中不能共用术瓮,否則會報錯
    '''

    # @task
    # def downloads_file(): # get文件到本地
    #     with settings(warn_only=True):
    #         result = get("/home/celery_1", "./")
    #     if result.failed and not confirm("continue[y/n]?"):
    #         abort("get test.tar.gz failed")
    #     local("mkdir ./test")
    #     local("tar zxf ./hh.tar.gz -C ./test")

    # @task
    # @parallel
    # def scp_docker_file():
    #     with settings(warn_only=True):
    #         local("tar -czf docker.tar.gz ../docker")
    #         result = put("docker.tar.gz", "/home/docker.tar.gz")
    #     if result.failed and not confirm("continue[y/n]?"):
    #         abort("put dockerfile failed")
    #     run("mkdir /home/docker")
    #     run("tar -zxf /home/docker.tar.gz -C /home")


    def Run(self):
        execute(self.celery_call)
    

h = HA()
h.Run()

我盡可能添加一些代碼注釋,更多解釋還請參考 fabric 文檔啊

看看 docker

只要能夠連接到服務(wù)器贰健,那么在這些服務(wù)器上安裝服務(wù)也就在情理之中了胞四,比如 docker
來看具體的代碼實(shí)現(xiàn)

import warnings
warnings.filterwarnings("ignore")
import time
from fabric.api import * # run,cd,env,hosts,execute,sudo,settings,hide
from fabric.colors import *
from fabric.contrib.console import confirm
import config
import json
from fabric.tasks import Task

class HA():
    def __init__(self):
        self.host = "root@{host}:{port}"
        self.ssh = "root@{host}:{port}"
        self.env = env
        self.env.warn_only = True # 這樣寫比較痛快
        self.env.hosts = [
            self.host.format(host=host[0],port=host[2]) for host in config.conf_list]
        self.env.passwords = {
            self.ssh.format(host=host[0], port=host[2]):host[1] for host in config.conf_list}


        print(self.env["hosts"])

    
    
    @task
    def get_docker_v(): # 查看docker版本
        with cd('../home'):
            run('docker version')

    @task
    def pull_images(images_name):
        with settings(warn_only=True):
            with cd("../home/"):
                try:
                    run("docker pull {}".format(images_name))
                except:
                    abort("docker pull failed")

    @task
    def push_images(images_name,username_repository,tag):
        with settings(warn_only=True):
            with cd("../home/"):
                try:
                    run("docker tag {image_name} {username_repository}:{tag}".format(images_name=images_name,username_repository=username_repository,tag=tag))
                    run("docker push {username_repository}:{tag}".format(username_repository=username_repository,tag=tag))
                except:
                    abort("docker push failed")

    @task
    def run_docker_images(images_name_tag):
        with settings(warn_only=True):
            with cd("../home/"):
                try:
                    run("docker run -p 4000:80 {}".format(images_name_tag))
                except:
                    abort("docker run failed")


    @task
    @parallel
    def execute_docker_compose():
        with settings(warn_only=True):
            with cd("../home/flask_app"):
                run("docker-compose up")


    @task
    def create_docker_service(service_name,images_name,num=4):
        with settings(warn_only=True):
            with cd("../home/"):
                run("docker service create --name {service_name} -p 4000:80 {images_name}".format(service_name=service_name,images_name=images_name))
                run("docker service scale {service_name}={num}".format(service_name=service_name,num=num))
    
    
    @task
    def stop_docker_service(service_name):
        with settings(warn_only=True):
            with cd("../home/"):
                run("docker service rm {}".format(service_name))

    def Run(self):
        # execute(self.create_docker_service,"demo","3417947630/py:hello")
        execute(self.execute_docker_compose)

h = HA()
h.Run()

嘿嘿,挺好

總結(jié)

基于python第三方庫 fabric 實(shí)現(xiàn)遠(yuǎn)程ssh分布式調(diào)度部署應(yīng)用,是一種很不錯的選擇霎烙,那么如果用于部署 爬蟲的應(yīng)用呢撬讽?
如果你是使用 scrapy 框架編寫的爬蟲(或者是其他框架,各種腳本也是一樣)悬垃,那么可以直接運(yùn)行文件上傳的方法把完整目錄拷貝到目標(biāo)服務(wù)器(當(dāng)然是批量的)
然后鍵入爬取的命令游昼,記住 fabric 是支持并行的,就能達(dá)到多機(jī)協(xié)作抓取的目的了

其實(shí)在 scrapy 中也可能用 scrapyd 來打包部署分布式爬蟲尝蠕,但是打包過程略為繁瑣烘豌,而 SSH 連接則比較直接,操作簡單
然鵝說到底 fabric 也只是一種自動化運(yùn)維的工具看彼,本質(zhì)上也只是把代碼拷貝到目標(biāo)服務(wù)器和執(zhí)行相應(yīng)的命令而已廊佩,并沒有像 scrapyd 提供爬蟲管理的可視化界面

所以這樣看來, Fabric 至少算得上是部署分布式爬蟲的一種選擇靖榕,就是因?yàn)椴渴鸷唵?/p>

以上就是我對這個 py 庫的一些看法标锄,它為我們?nèi)蘸蟛渴饝?yīng)用和服務(wù)提供了更多的選擇,多多實(shí)戰(zhàn)吧 W录啤料皇!

歡迎轉(zhuǎn)載,但要聲明出處星压,不然我順著網(wǎng)線過去就是一拳践剂。
個人技術(shù)博客:http://www.gzky.live

?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
  • 序言:七十年代末,一起剝皮案震驚了整個濱河市娜膘,隨后出現(xiàn)的幾起案子逊脯,更是在濱河造成了極大的恐慌,老刑警劉巖竣贪,帶你破解...
    沈念sama閱讀 212,454評論 6 493
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件军洼,死亡現(xiàn)場離奇詭異,居然都是意外死亡演怎,警方通過查閱死者的電腦和手機(jī)歉眷,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 90,553評論 3 385
  • 文/潘曉璐 我一進(jìn)店門,熙熙樓的掌柜王于貴愁眉苦臉地迎上來颤枪,“玉大人汗捡,你說我怎么就攤上這事。” “怎么了扇住?”我有些...
    開封第一講書人閱讀 157,921評論 0 348
  • 文/不壞的土叔 我叫張陵春缕,是天一觀的道長。 經(jīng)常有香客問我艘蹋,道長锄贼,這世上最難降的妖魔是什么? 我笑而不...
    開封第一講書人閱讀 56,648評論 1 284
  • 正文 為了忘掉前任女阀,我火速辦了婚禮宅荤,結(jié)果婚禮上,老公的妹妹穿的比我還像新娘浸策。我一直安慰自己冯键,他們只是感情好,可當(dāng)我...
    茶點(diǎn)故事閱讀 65,770評論 6 386
  • 文/花漫 我一把揭開白布庸汗。 她就那樣靜靜地躺著惫确,像睡著了一般。 火紅的嫁衣襯著肌膚如雪蚯舱。 梳的紋絲不亂的頭發(fā)上改化,一...
    開封第一講書人閱讀 49,950評論 1 291
  • 那天,我揣著相機(jī)與錄音枉昏,去河邊找鬼陈肛。 笑死,一個胖子當(dāng)著我的面吹牛兄裂,可吹牛的內(nèi)容都是我干的燥爷。 我是一名探鬼主播,決...
    沈念sama閱讀 39,090評論 3 410
  • 文/蒼蘭香墨 我猛地睜開眼懦窘,長吁一口氣:“原來是場噩夢啊……” “哼!你這毒婦竟也來了稚配?” 一聲冷哼從身側(cè)響起畅涂,我...
    開封第一講書人閱讀 37,817評論 0 268
  • 序言:老撾萬榮一對情侶失蹤,失蹤者是張志新(化名)和其女友劉穎道川,沒想到半個月后午衰,有當(dāng)?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體,經(jīng)...
    沈念sama閱讀 44,275評論 1 303
  • 正文 獨(dú)居荒郊野嶺守林人離奇死亡冒萄,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點(diǎn)故事閱讀 36,592評論 2 327
  • 正文 我和宋清朗相戀三年臊岸,在試婚紗的時候發(fā)現(xiàn)自己被綠了。 大學(xué)時的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片尊流。...
    茶點(diǎn)故事閱讀 38,724評論 1 341
  • 序言:一個原本活蹦亂跳的男人離奇死亡帅戒,死狀恐怖,靈堂內(nèi)的尸體忽然破棺而出崖技,到底是詐尸還是另有隱情逻住,我是刑警寧澤钟哥,帶...
    沈念sama閱讀 34,409評論 4 333
  • 正文 年R本政府宣布,位于F島的核電站瞎访,受9級特大地震影響腻贰,放射性物質(zhì)發(fā)生泄漏。R本人自食惡果不足惜扒秸,卻給世界環(huán)境...
    茶點(diǎn)故事閱讀 40,052評論 3 316
  • 文/蒙蒙 一播演、第九天 我趴在偏房一處隱蔽的房頂上張望。 院中可真熱鬧伴奥,春花似錦写烤、人聲如沸。這莊子的主人今日做“春日...
    開封第一講書人閱讀 30,815評論 0 21
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽。三九已至锣吼,卻和暖如春选浑,著一層夾襖步出監(jiān)牢的瞬間,已是汗流浹背玄叠。 一陣腳步聲響...
    開封第一講書人閱讀 32,043評論 1 266
  • 我被黑心中介騙來泰國打工古徒, 沒想到剛下飛機(jī)就差點(diǎn)兒被人妖公主榨干…… 1. 我叫王不留,地道東北人读恃。 一個月前我還...
    沈念sama閱讀 46,503評論 2 361
  • 正文 我出身青樓隧膘,卻偏偏與公主長得像,于是被迫代替她去往敵國和親寺惫。 傳聞我的和親對象是個殘疾皇子疹吃,可洞房花燭夜當(dāng)晚...
    茶點(diǎn)故事閱讀 43,627評論 2 350

推薦閱讀更多精彩內(nèi)容