ambari-agent心跳流程分析

ambari-agent啟動(dòng)流程

??ambari-agent啟動(dòng)通過(guò)命令ambari-agent start戳鹅,命令實(shí)際執(zhí)行nohup $PYTHON $AMBARI_AGENT_PY_SCRIPTpython AmbariAgent.py

AMBARI_AGENT_PY_SCRIPT=/usr/lib/ambari-agent/lib/ambari_agent/AmbariAgent.py

??AmbariAgent.py通過(guò)subprocess子進(jìn)程執(zhí)行main.py,該進(jìn)程會(huì)會(huì)從配置文件中獲取相關(guān)需要的配置信息昏兆,數(shù)據(jù)文件清理(err,auto,output*等文件)枫虏,通過(guò)啟動(dòng)參數(shù)來(lái)重啟或關(guān)閉agent,監(jiān)聽(tīng)端口亮垫,獲取server的地址模软,然后和server建立連接,一旦建立連接饮潦,調(diào)用run-thread方法開(kāi)始agent和server的通信過(guò)程燃异,在run_threads中啟動(dòng)了Controller線程。
??Ambari-agent 通過(guò)Controller.py的方法與Ambari-server 中的AgentResource(org.apache.ambari.server.agent.rest) 進(jìn)行交互(獲得集群配置變更继蜡,報(bào)告節(jié)點(diǎn)屬性回俐,以及節(jié)點(diǎn)上運(yùn)行服務(wù)運(yùn)行狀態(tài)),并通過(guò)HTTP Response返回ambari-server投遞過(guò)來(lái)的狀態(tài)操作到操作隊(duì)列ActionQueue稀并。ActionQueue默認(rèn)是使用并行模式執(zhí)行command

Post請(qǐng)求 注冊(cè)關(guān)于主機(jī)的信息
Post請(qǐng)求 心跳連接(更新節(jié)點(diǎn)的狀態(tài))
Get 請(qǐng)求 檢索用于集群上的組件映射

基礎(chǔ)參數(shù)釋義

ExecuteCommand:對(duì)服務(wù)組件執(zhí)行INSTALL/START/STOP等操作仅颇。

StatusCommand:對(duì)服務(wù)組件執(zhí)行死活檢查(由Server定期下發(fā))。需要server確定

CancelCommand:取消其他已經(jīng)下發(fā)的Task(當(dāng)Stage中的某個(gè)Task失敗時(shí))碘举。

RegistrationCommand:要求Agent向Server重新注冊(cè)(當(dāng)發(fā)現(xiàn)Server維護(hù)的心跳序號(hào)與Agent上報(bào)的不一致時(shí))

心跳流程源碼分析

heartbeatWithServer方法屬性釋義

self.DEBUG_HEARTBEAT_RETRIES = 0 # debug心跳重試次數(shù)
self.DEBUG_SUCCESSFULL_HEARTBEATS = 0 # debug心跳成功次數(shù) 
retry = False # 是否重試,心跳開(kāi)始的時(shí)候默認(rèn)False
certVerifFailed = False 
state_interval = int(self.config.get('heartbeat', 'state_interval_seconds', '60')) # 日志記錄間隔 默認(rèn)60s
 # last time when state was successfully sent to server
last_state_timestamp = 0.0 # 上一次記錄日志的時(shí)間
# 為了確保心跳的正常運(yùn)行,我們會(huì)記錄一些日志,但是為了避免高頻次的日志記錄,引用了state_interval來(lái)間隔記錄日志
heartbeat_running_msg_timestamp = 0.0 # 

# 通過(guò)只在特定的時(shí)間間隔內(nèi)進(jìn)行日志記錄來(lái)防止過(guò)度日志記錄
getrecoverycommands_timestamp = 0.0 # 獲取recoverycommands的時(shí)間戳
getrecoverycommands_interval = self.netutil.HEARTBEAT_IDLE_INTERVAL_DEFAULT_MAX_SEC #獲取recoverycommands時(shí)間間隔默認(rèn)為10s
heartbeat_interval = self.netutil.HEARTBEAT_IDLE_INTERVAL_DEFAULT_MAX_SEC # 心跳的默認(rèn)時(shí)間間隔是10s

出于測(cè)試目的

DEBUG_HEARTBEAT_RETRIES = 0
DEBUG_SUCCESSFULL_HEARTBEATS = 0
DEBUG_STOP_HEARTBEATING = False

非測(cè)試環(huán)境下初始化心跳進(jìn)入while循環(huán)

while not self.DEBUG_STOP_HEARTBEATING:

while循環(huán)體內(nèi)部解析:

  1. 默認(rèn)的日志級(jí)別為DEBUG,如果當(dāng)前時(shí)間 - 心跳日志輸出時(shí)間 > 日志輸出間隔 則將日志級(jí)別改成INFO,將心跳日志輸出時(shí)間設(shè)為當(dāng)前時(shí)間
while not self.DEBUG_STOP_HEARTBEATING:
    current_time = time.time()
    logging_level = logging.DEBUG
    if current_time - heartbeat_running_msg_timestamp > state_interval:
        # log more steps every minute or so
        logging_level = logging.INFO
        heartbeat_running_msg_timestamp = current_time
  1. 發(fā)送心跳前的數(shù)據(jù)準(zhǔn)備
  • 日志記錄當(dāng)前的responseId,注冊(cè)系統(tǒng)時(shí)返回的或者默認(rèn)的-1
  • send_state 是否發(fā)送系統(tǒng)狀態(tài)信息(通過(guò)shell命令獲取主機(jī)信息) ,默認(rèn)不發(fā)送
  • 判斷是否重試retry,如果為true,則DEBUG_HEARTBEAT_RETRIES +1 否則判斷當(dāng)前時(shí)間 - 上一次發(fā)送系統(tǒng)狀態(tài)信息的時(shí)間 > state_interval(10s) 如果為true,則send_state 置為true,意味著需要發(fā)送系統(tǒng)狀態(tài)信息給ambari-server端
  • 組織需要發(fā)送給ambari-server的json數(shù)據(jù)
  • 根據(jù)當(dāng)前設(shè)置的日志級(jí)別 進(jìn)行日志記錄.如果是INFO級(jí)別的則會(huì)記錄下當(dāng)前發(fā)出的json數(shù)據(jù)
try:
    logger.log(logging_level, "Heartbeat (response id = %s) with server is running...", self.responseId)
    send_state = False
    if not retry:
        if current_time - last_state_timestamp > state_interval:
            send_state = True
        logger.log(logging_level, "Building heartbeat message")
        data = json.dumps(self.heartbeat.build(self.responseId, send_state, self.hasMappedComponents))
    else:
        self.DEBUG_HEARTBEAT_RETRIES += 1
    if logger.isEnabledFor(logging.DEBUG):
        logger.log(logging_level, "Sending Heartbeat (id = %s): %s", self.responseId, data)
    else:
        logger.log(logging_level, "Sending Heartbeat (id = %s)", self.responseId)
  1. 與ambari-server建立通信并解析response
    3.1 發(fā)送請(qǐng)求至ambari-server,并接受server傳過(guò)來(lái)的response
    • exitstatus==0則表示本次通信成功, 不為0則拋出異常
    • 獲取responseId,并記錄日志
    • 或者集群節(jié)點(diǎn)數(shù),來(lái)及時(shí)調(diào)整心跳的頻率,如果集群節(jié)點(diǎn)>0 則 集群節(jié)點(diǎn)//HEARTBEAT_IDLE_INTERVAL_DEFAULT_MAX_SEC(10s)取整除 - 返回商的整數(shù)部分(向下取整)
    # HEARTBEAT_IDLE_INTERVAL_DEFAULT_MAX_SEC = 10
    # HEARTBEAT_IDLE_INTERVAL_DEFAULT_MIN_SEC = 1
    if(0 < cluster_size and cluster_size < 9 ) 
        cluster_size // HEARTBEAT_IDLE_INTERVAL_DEFAULT_MAX_SEC = 0 ==> 則心跳頻率為1s
    if(cluster_size > 9 and cluster_size <=100)
        cluster_size // HEARTBEAT_IDLE_INTERVAL_DEFAULT_MAX_SEC 整除取返回商部分
    if(cluster_size > 100 )
        cluster_size // HEARTBEAT_IDLE_INTERVAL_DEFAULT_MAX_SEC =10===>則心跳頻率為10s
    
    • 日志記錄新的心跳頻率
response = self.sendRequest(self.heartbeatUrl, data)
exitStatus = 0
if 'exitstatus' in response.keys():
    exitStatus = int(response['exitstatus'])
if exitStatus != 0:
    raise Exception(response)
serverId = int(response['responseId'])
logger.log(logging_level, 'Heartbeat response received (id = %s)', serverId)
cluster_size = int(response['clusterSize']) if 'clusterSize' in response.keys() else -1
# TODO: this needs to be revised if hosts can be shared across multiple clusters
heartbeat_interval = self.get_heartbeat_interval(cluster_size) \
    if cluster_size > 0 \
    else self.netutil.HEARTBEAT_IDLE_INTERVAL_DEFAULT_MAX_SEC
logger.log(logging_level, "Heartbeat interval is %s seconds", heartbeat_interval)
  1. 根據(jù)ambari-server 返回的值來(lái)調(diào)整部分設(shè)置
    4.1 是否有映射的組件信息-----主要是心跳發(fā)送數(shù)據(jù)的時(shí)候如果無(wú)組件映射的話,會(huì)執(zhí)行一些比較耗費(fèi)性能的操作,根據(jù)response返回的信息來(lái)決定是否執(zhí)行該操作
    4.2 是否有處于等待的task,如果有的話,會(huì)暫停 自動(dòng)恢復(fù)管理機(jī)(recovery_manager)的操作
    4.3 是否存在注冊(cè)命令,如果存在的話,則退出心跳,將設(shè)置isRegistered = False 以及repeatRegistration = True 并再次進(jìn)行注冊(cè)操作,意味著從頭再來(lái)一次
if 'hasMappedComponents' in response.keys():
    self.hasMappedComponents = response['hasMappedComponents'] is not False
if 'hasPendingTasks' in response.keys():
    has_pending_tasks = bool(response['hasPendingTasks'])
    self.recovery_manager.set_paused(has_pending_tasks)
if 'registrationCommand' in response.keys():
    # check if the registration command is None. If none skip
    if response['registrationCommand'] is not None:
        logger.info("RegistrationCommand received - repeat agent registration")
        self.isRegistered = False
        self.repeatRegistration = True
        return
  1. 根據(jù)指標(biāo)來(lái)決定是否需要重啟agent
    5.1 獲取當(dāng)前進(jìn)程使用的內(nèi)存大小(單位KB)/1000 = MB
    默認(rèn)的軟性指標(biāo)為400MB 硬性指標(biāo)為1000GB 可通過(guò)ambari-agent.ini配置
    • 當(dāng)超過(guò)軟性指標(biāo)時(shí)且沒(méi)有正在處理的任務(wù)時(shí),進(jìn)行agent重啟
    • 當(dāng)大于等于硬性指標(biāo)時(shí),則強(qiáng)制進(jìn)行agent重啟

5.2 ambari-server返回的responseId與agent當(dāng)前記錄的responseId+1做比較,如果不相符則進(jìn)入重啟反之則更新最新的responseId,并更新上一次獲取主機(jī)狀態(tài)信息的時(shí)間last_state_timestamp=current_time

used_ram = get_used_ram() / 1000
# dealing with a possible memory leaks
if self.max_ram_soft and used_ram >= self.max_ram_soft and not self.actionQueue.tasks_in_progress_or_pending():
    logger.error(
        AGENT_RAM_OVERUSE_MESSAGE.format(used_ram=used_ram, config_name="memory_threshold_soft_mb",
                                         max_ram=self.max_ram_soft))
    self.restartAgent()
if self.max_ram_hard and used_ram >= self.max_ram_hard:
    logger.error(
        AGENT_RAM_OVERUSE_MESSAGE.format(used_ram=used_ram, config_name="memory_threshold_hard_mb",
                                         max_ram=self.max_ram_hard))
    self.restartAgent()
if serverId != self.responseId + 1:
    logger.error(
        "Error in responseId sequence - received responseId={0} from server while expecting {1} - restarting..."
        .format(serverId, self.responseId + 1))
    self.restartAgent()
else:
    self.responseId = serverId
    if send_state:
        last_state_timestamp = current_time
  1. 通過(guò)心跳返回信息更新agent配置
# if the response contains configurations, update the in-memory and
# disk-based configuration cache (execution and alert commands have this)
logger.log(logging_level, "Updating configurations from heartbeat")
self.cluster_configuration.update_configurations_from_heartbeat(response)

7.根據(jù)ambari-server返回的不同command來(lái)分別執(zhí)行相應(yīng)的操作

  • 因?yàn)樾枰∠腃ommands可能會(huì)在其它類型Commands之前進(jìn)行,這可能導(dǎo)致actionQueue執(zhí)行操作的混亂,所以為了避免命令執(zhí)行失敗,所以會(huì)將actionQueue進(jìn)行原子性操作,
  • 先將cancelCommand進(jìn)行移除(從actionQueue中remove假如它還沒(méi)執(zhí)行的話,或者直接kill假如它已經(jīng)在執(zhí)行中)
  • executionCommands存在的話,則recovery_manager會(huì)根據(jù)執(zhí)行的操作來(lái)動(dòng)態(tài)調(diào)整預(yù)期狀態(tài)方便之后進(jìn)行recover,緊接著將executionCommands放入actionQueue
  • statusCommands存在的話,recovery_manager會(huì)執(zhí)行相應(yīng)的操作,并放入statusCommandsExecutor通過(guò)線程去執(zhí)行
  • 通過(guò)時(shí)間的間隔查詢來(lái)定期生成recovery_commands
# there's case when canceled task can be processed in
# Action Queue.execute before adding rescheduled task to queue
# this can cause command failure instead result suppression
# so canceling and putting rescheduled commands should be executed atomically
if 'cancelCommands' in response_keys or 'executionCommands' in response_keys:
    logger.log(logging_level, "Adding cancel/execution commands")
with self.actionQueue.lock:
    if 'cancelCommands' in response_keys:
        self.cancelCommandInQueue(response['cancelCommands'])

    if 'executionCommands' in response_keys:
        execution_commands = response['executionCommands']
        self.recovery_manager.process_execution_commands(execution_commands)
        self.addToQueue(execution_commands)

if 'statusCommands' in response_keys:
    # try storing execution command details and desired state
    self.addToStatusQueue(response['statusCommands'])

if current_time - getrecoverycommands_timestamp > getrecoverycommands_interval:
    getrecoverycommands_timestamp = current_time
    if not self.actionQueue.tasks_in_progress_or_pending():
        logger.log(logging_level, "Adding recovery commands")
        recovery_commands = self.recovery_manager.get_recovery_commands()
        for recovery_command in recovery_commands:
            logger.info("Adding recovery command %s for component %s",
                        recovery_command['roleCommand'], recovery_command['role'])
            self.addToQueue([recovery_command])

if 'alertDefinitionCommands' in response_keys:
    logger.log(logging_level, "Updating alert definitions")
    self.alert_scheduler_handler.update_definitions(response)

if 'alertExecutionCommands' in response_keys:
    logger.log(logging_level, "Executing alert commands")
    self.alert_scheduler_handler.execute_alert(response['alertExecutionCommands'])

  1. server下發(fā)restart命令的話則進(jìn)行agent重啟,如果response中存在recoveryConfig則進(jìn)行配置更新
if "true" == response['restartAgent']:
    logger.error("Received the restartAgent command")
    self.restartAgent()
else:
    logger.debug("No commands sent from %s", self.serverHostname)

if retry:
    logger.info("Reconnected to %s", self.heartbeatUrl)

if "recoveryConfig" in response:
    # update the list of components enabled for recovery
    logger.log(logging_level, "Updating recovery config")
    self.recovery_manager.update_configuration_from_registration(response)

retry = False
certVerifFailed = False
self.DEBUG_SUCCESSFULL_HEARTBEATS += 1
self.DEBUG_HEARTBEAT_RETRIES = 0
self.heartbeat_stop_callback.reset_heartbeat()
  1. heartbeat_stop_callback 這個(gè)方法使用了python的threading,每一次心跳結(jié)束后進(jìn)入阻塞,thread.wait(timeout)后才會(huì)進(jìn)入下一次心跳,特殊情況,一旦非statusCommand類型的命令執(zhí)行完成,也會(huì)立即發(fā)送心跳,觸發(fā)threading.event.set(),立即進(jìn)入下一次心跳

相關(guān)博客

ambari-agent 源碼梳理

如果有相關(guān)問(wèn)題的話,可以給我留言,歡迎一起探討ambari-agent !!

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
  • 序言:七十年代末忘瓦,一起剝皮案震驚了整個(gè)濱河市,隨后出現(xiàn)的幾起案子引颈,更是在濱河造成了極大的恐慌耕皮,老刑警劉巖,帶你破解...
    沈念sama閱讀 216,692評(píng)論 6 501
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件蝙场,死亡現(xiàn)場(chǎng)離奇詭異凌停,居然都是意外死亡,警方通過(guò)查閱死者的電腦和手機(jī)售滤,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 92,482評(píng)論 3 392
  • 文/潘曉璐 我一進(jìn)店門罚拟,熙熙樓的掌柜王于貴愁眉苦臉地迎上來(lái),“玉大人完箩,你說(shuō)我怎么就攤上這事赐俗。” “怎么了嗜憔?”我有些...
    開(kāi)封第一講書人閱讀 162,995評(píng)論 0 353
  • 文/不壞的土叔 我叫張陵秃励,是天一觀的道長(zhǎng)。 經(jīng)常有香客問(wèn)我吉捶,道長(zhǎng)夺鲜,這世上最難降的妖魔是什么? 我笑而不...
    開(kāi)封第一講書人閱讀 58,223評(píng)論 1 292
  • 正文 為了忘掉前任呐舔,我火速辦了婚禮币励,結(jié)果婚禮上,老公的妹妹穿的比我還像新娘珊拼。我一直安慰自己食呻,他們只是感情好,可當(dāng)我...
    茶點(diǎn)故事閱讀 67,245評(píng)論 6 388
  • 文/花漫 我一把揭開(kāi)白布澎现。 她就那樣靜靜地躺著仅胞,像睡著了一般。 火紅的嫁衣襯著肌膚如雪剑辫。 梳的紋絲不亂的頭發(fā)上干旧,一...
    開(kāi)封第一講書人閱讀 51,208評(píng)論 1 299
  • 那天,我揣著相機(jī)與錄音妹蔽,去河邊找鬼椎眯。 笑死,一個(gè)胖子當(dāng)著我的面吹牛胳岂,可吹牛的內(nèi)容都是我干的编整。 我是一名探鬼主播,決...
    沈念sama閱讀 40,091評(píng)論 3 418
  • 文/蒼蘭香墨 我猛地睜開(kāi)眼乳丰,長(zhǎng)吁一口氣:“原來(lái)是場(chǎng)噩夢(mèng)啊……” “哼掌测!你這毒婦竟也來(lái)了?” 一聲冷哼從身側(cè)響起产园,我...
    開(kāi)封第一講書人閱讀 38,929評(píng)論 0 274
  • 序言:老撾萬(wàn)榮一對(duì)情侶失蹤汞斧,失蹤者是張志新(化名)和其女友劉穎,沒(méi)想到半個(gè)月后淆两,有當(dāng)?shù)厝嗽跇?shù)林里發(fā)現(xiàn)了一具尸體断箫,經(jīng)...
    沈念sama閱讀 45,346評(píng)論 1 311
  • 正文 獨(dú)居荒郊野嶺守林人離奇死亡,尸身上長(zhǎng)有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點(diǎn)故事閱讀 37,570評(píng)論 2 333
  • 正文 我和宋清朗相戀三年秋冰,在試婚紗的時(shí)候發(fā)現(xiàn)自己被綠了仲义。 大學(xué)時(shí)的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片。...
    茶點(diǎn)故事閱讀 39,739評(píng)論 1 348
  • 序言:一個(gè)原本活蹦亂跳的男人離奇死亡剑勾,死狀恐怖埃撵,靈堂內(nèi)的尸體忽然破棺而出,到底是詐尸還是另有隱情虽另,我是刑警寧澤暂刘,帶...
    沈念sama閱讀 35,437評(píng)論 5 344
  • 正文 年R本政府宣布,位于F島的核電站捂刺,受9級(jí)特大地震影響谣拣,放射性物質(zhì)發(fā)生泄漏募寨。R本人自食惡果不足惜,卻給世界環(huán)境...
    茶點(diǎn)故事閱讀 41,037評(píng)論 3 326
  • 文/蒙蒙 一森缠、第九天 我趴在偏房一處隱蔽的房頂上張望拔鹰。 院中可真熱鬧,春花似錦贵涵、人聲如沸。這莊子的主人今日做“春日...
    開(kāi)封第一講書人閱讀 31,677評(píng)論 0 22
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽(yáng)。三九已至跨晴,卻和暖如春欧聘,著一層夾襖步出監(jiān)牢的瞬間,已是汗流浹背坟奥。 一陣腳步聲響...
    開(kāi)封第一講書人閱讀 32,833評(píng)論 1 269
  • 我被黑心中介騙來(lái)泰國(guó)打工树瞭, 沒(méi)想到剛下飛機(jī)就差點(diǎn)兒被人妖公主榨干…… 1. 我叫王不留,地道東北人爱谁。 一個(gè)月前我還...
    沈念sama閱讀 47,760評(píng)論 2 369
  • 正文 我出身青樓晒喷,卻偏偏與公主長(zhǎng)得像,于是被迫代替她去往敵國(guó)和親访敌。 傳聞我的和親對(duì)象是個(gè)殘疾皇子凉敲,可洞房花燭夜當(dāng)晚...
    茶點(diǎn)故事閱讀 44,647評(píng)論 2 354

推薦閱讀更多精彩內(nèi)容

  • Spring Cloud為開(kāi)發(fā)人員提供了快速構(gòu)建分布式系統(tǒng)中一些常見(jiàn)模式的工具(例如配置管理,服務(wù)發(fā)現(xiàn)寺旺,斷路器爷抓,智...
    卡卡羅2017閱讀 134,652評(píng)論 18 139
  • feisky云計(jì)算、虛擬化與Linux技術(shù)筆記posts - 1014, comments - 298, trac...
    不排版閱讀 3,844評(píng)論 0 5
  • 在我搭建基于Spring Cloud的微服務(wù)體系應(yīng)用的時(shí)候所需要或者是常用的屬性配置文件阻塑,還有這些屬性的用途蓝撇,此配...
    StrongManAlone閱讀 4,020評(píng)論 0 18
  • 小時(shí)候看星爺?shù)摹断矂≈酢罚幻靼诪槭裁催@是星爺?shù)慕?jīng)典陈莽,巔峰之作渤昌,也不明白“跑龍?zhí)椎摹焙汀八琅荦執(zhí)椎摹钡膮^(qū)別...
    徐言亂語(yǔ)閱讀 883評(píng)論 1 3
  • 《2018年独柑,孫晶最好的一年》 計(jì)劃書 1 3個(gè)成功 Q:在過(guò)去的一年,你的3個(gè)成功(收獲)是什么私植?從中學(xué)到的價(jià)值...
    黃靜芬閱讀 183評(píng)論 0 0