對于ambari-collector 部分源碼流程的簡單理解

工作中遇到了ambari指標(biāo)的搜集和展示問題惭蹂,因需要增添部分python腳本對平臺的腳本數(shù)據(jù)進(jìn)行匯聚和整理伞插,所以需要理解ambari-collector的部分流程,所以進(jìn)行了簡要的閱讀盾碗,故做此分析媚污,以防止后續(xù)遺忘。以下代碼都存在刪減廷雅,僅供參考耗美。

  • 首先來看一下main.py
from core.controller import Controller
def main(argv=None):
  # Allow Ctrl-C
  stop_handler = bind_signal_handlers()
  server_process_main(stop_handler)
def server_process_main(stop_handler, scmStatus=None):
  if scmStatus is not None:
    scmStatus.reportStartPending()
  config = Configuration()
  _init_logging(config)
  controller = Controller(config, stop_handler)
  logger.info('Starting Server RPC Thread: %s' % ' '.join(sys.argv))
  controller.start()
  print "Server out at: " + SERVER_OUT_FILE
  print "Server log at: " + SERVER_LOG_FILE
  save_pid(os.getpid(), PID_OUT_FILE)
  if scmStatus is not None:
    scmStatus.reportStarted()
  #The controller thread finishes when the stop event is signaled
  controller.join()
  remove_file(PID_OUT_FILE)
  pass

由上述代碼可以看出來,server_process_main 里面實例化了Controller類榜轿,并controller.start()開啟了一個線程,那么我們看一下Controller里面的的代碼(有刪略)

from emitter import Emitter
class Controller(threading.Thread):
  def __init__(self, config, stop_handler):
    # Process initialization code
    threading.Thread.__init__(self)
    self.emitter = Emitter(self.config, self.application_metric_map, stop_handler)
  def run(self):
    self.start_emitter()
    Timer(1, self.addsecond).start()
    while True:
      if (self.event_queue.full()):
        logger.warn('Event Queue full!! Suspending further collections.')
      else:
        self.enqueque_events()
      pass
      if 0 == self._stop_handler.wait(self.sleep_interval):
        logger.info('Shutting down Controller thread')
        break
      if not self._t is None:
        self._t.cancel()
        self._t.join(5)
     self.emitter.join(5)
     pass
  def start_emitter(self):
    self.emitter.start()

run函數(shù)里面執(zhí)行了start_emitter()诬烹,然后對Emitter進(jìn)行實例化幢痘,執(zhí)行了emmitter.start(),接下來我們看一下Emitter的代碼

class Emitter(threading.Thread):
  COLLECTOR_URL = "xxxxx"
  RETRY_SLEEP_INTERVAL = 5
  MAX_RETRY_COUNT = 3
  def __init__(self, config, application_metric_map, stop_handler):
    threading.Thread.__init__(self)
    self.lock = threading.Lock()
    self.collector_address = config.get_server_address()
    self.send_interval = config.get_send_interval()
    self._stop_handler = stop_handler
    self.application_metric_map = application_metric_map
  def run(self):
    while True:
      try:
        self.submit_metrics()
      except Exception, e:
        logger.warn('Unable to emit events. %s' % str(e))
      pass
      if 0 == self._stop_handler.wait(self.send_interval):
        logger.info('Shutting down Emitter thread')
        return
    pass
def submit_metrics(self):
    retry_count = 0
    # This call will acquire lock on the map and clear contents before returning
    # After configured number of retries the data will not be sent to the
    # collector
    json_data = self.application_metric_map.flatten(None, True)
    if json_data is None:
      logger.info("Nothing to emit, resume waiting.")
      return
    pass
    response = None
    while retry_count < self.MAX_RETRY_COUNT:
      try:
        response = self.push_metrics(json_data)
      except Exception, e:
        logger.warn('Error sending metrics to server. %s' % str(e))
      pass
      if response and response.getcode() == 200:
        retry_count = self.MAX_RETRY_COUNT
      else:
        logger.warn("Retrying after {0} ...".format(self.RETRY_SLEEP_INTERVAL))
        retry_count += 1
        #Wait for the service stop event instead of sleeping blindly
        if 0 == self._stop_handler.wait(self.RETRY_SLEEP_INTERVAL):
          return
      pass
    pass
  def push_metrics(self, data):
    headers = {"Content-Type" : "application/json", "Accept" : "*/*"}
    server = self.COLLECTOR_URL.format(self.collector_address.strip())
    logger.info("server: %s" % server)
    logger.debug("message to sent: %s" % data)
    req = urllib2.Request(server, data, headers)
    response = urllib2.urlopen(req, timeout=int(self.send_interval - 10))
    if response:
      logger.debug("POST response from server: retcode = {0}".format(response.getcode()))
      logger.debug(str(response.read()))
    pass
    return response

由上述代碼可以看出來玄妈,run函數(shù)執(zhí)行的時候,執(zhí)行了submit_metrics()函數(shù)瞭郑,重點來了擒权,該函數(shù)的核心就是 json_data = self.application_metric_map.flatten(None, True),當(dāng)前類繼承自ApplicationMetricsMap,讓我們?nèi)ゲ榭匆幌翧pplicationMetricsMap的代碼

def flatten(self, application_id = None, clear_once_flattened = False):
    with self.lock:
      timeline_metrics = { "metrics" : [] }
      local_metric_map = {}
      if application_id:
        if self.app_metric_map.has_key(application_id):
          local_metric_map = { application_id : self.app_metric_map[application_id] }
        else:
          logger.info("application_id: {0}, not present in the map.".format(application_id))
      else:
        local_metric_map = self.app_metric_map.copy()
      pass
      for appId, metrics in local_metric_map.iteritems():
        for metricId, metricData in dict(metrics).iteritems():
          # Create a timeline metric object
          timeline_metric = {
            "hostname" : self.hostname if appId == "HOST" else "",
            "metricname" : metricId,
            #"appid" : "HOST",
            "appid" : appId,
            "instanceid" : "",
            "starttime" : self.get_start_time(appId, metricId),
            "metrics" : metricData
          }
          timeline_metrics[ "metrics" ].append( timeline_metric )
        pass
      pass
      json_data = json.dumps(timeline_metrics) if len(timeline_metrics[ "metrics" ]) > 0 else None
      if clear_once_flattened:
        self.app_metric_map.clear()
      pass
      return json_data
  pass

由此函數(shù)可以看得出來璧尸,該函數(shù)主要就是對數(shù)據(jù)進(jìn)行一些合并,匯聚形成新的數(shù)據(jù)結(jié)構(gòu)蛀序,但是當(dāng)?shù)谝淮卧贑ontroller里面執(zhí)行start_emmiter()時候,該函數(shù)并未執(zhí)行重贺,因為self.app_metric_map的數(shù)據(jù)結(jié)構(gòu)并未生成,讓我們往前看健民,在Controller的run函數(shù)里面有這么一行代碼,self.enqueue_events()崇堵,從字面意思看出來是事件入隊列,讓我們找到該函數(shù)赏廓,最終進(jìn)行相互調(diào)用后是執(zhí)行了process_service_collection_event

 def process_service_collection_event(self, event):
    startTime = int(round(time() * 1000))
    metrics = None
    path = os.path.abspath('.')
    for root, dirs, files in os.walk("%s/libs/" % path):
      appid = event.get_group_name().split('_')[0]
      metricgroup = event.get_group_name().split('_')[1]
      if ("%s_metrics.sh" % appid) in filter(lambda x: ".sh" in x, files):
        metrics = {appid: self.service_info.get_service_metrics(appid, metricgroup)}
      else:
        logger.warn('have no %s modules' % appid)
    if metrics:
      for item in metrics:
        self.application_metric_map.put_metric(item, metrics[item], startTime)
    pass

這段代碼就是執(zhí)行各個服務(wù)的腳本,然后匯聚數(shù)據(jù)驱负,最終生成metrics變量,然后執(zhí)行了self.application_metric_map.put_metric(item, metrics[item], startTime),這個application_metric_map其實就是ApplicationMetricMap類的實例拼缝,其中有一個函數(shù)如下所示:

 def put_metric(self, application_id, metric_id_to_value_map, timestamp):
    with self.lock:
      for metric_name, value in metric_id_to_value_map.iteritems():
        metric_map = self.app_metric_map.get(application_id)
        if not metric_map:
          metric_map = { metric_name : { timestamp : value } }
          self.app_metric_map[ application_id ] = metric_map
        else:
          metric_id_map = metric_map.get(metric_name)
          if not metric_id_map:
            metric_id_map = { timestamp : value }
            metric_map[ metric_name ] = metric_id_map
          else:
            metric_map[ metric_name ].update( { timestamp : value } )
          pass
        pass
  pass

其實這段代碼主要是從腳本中搜集的數(shù)據(jù)任斋,形成最終的app_metric_map數(shù)據(jù),在Controller中一直被無限調(diào)用,只是我們第一次執(zhí)行start_emitter()時候并未執(zhí)行而已趴俘,當(dāng)從腳本中搜集到數(shù)據(jù),才會執(zhí)行真正的調(diào)用疲憋,然后通過requests模塊埃脏,上報到 metrics collector的6188端口中佩谷,最終數(shù)據(jù)落于hbase中谐檀。

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
  • 序言:七十年代末溃肪,一起剝皮案震驚了整個濱河市躺涝,隨后出現(xiàn)的幾起案子,更是在濱河造成了極大的恐慌诱建,老刑警劉巖碟绑,帶你破解...
    沈念sama閱讀 212,816評論 6 492
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件,死亡現(xiàn)場離奇詭異,居然都是意外死亡惭蟋,警方通過查閱死者的電腦和手機(jī)癌佩,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 90,729評論 3 385
  • 文/潘曉璐 我一進(jìn)店門,熙熙樓的掌柜王于貴愁眉苦臉地迎上來,“玉大人铅匹,你說我怎么就攤上這事策精⊙玻” “怎么了哆档?”我有些...
    開封第一講書人閱讀 158,300評論 0 348
  • 文/不壞的土叔 我叫張陵杠巡,是天一觀的道長。 經(jīng)常有香客問我,道長,這世上最難降的妖魔是什么骂束? 我笑而不...
    開封第一講書人閱讀 56,780評論 1 285
  • 正文 為了忘掉前任,我火速辦了婚禮昆汹,結(jié)果婚禮上愚争,老公的妹妹穿的比我還像新娘鞍陨。我一直安慰自己,他們只是感情好,可當(dāng)我...
    茶點故事閱讀 65,890評論 6 385
  • 文/花漫 我一把揭開白布。 她就那樣靜靜地躺著,像睡著了一般。 火紅的嫁衣襯著肌膚如雪。 梳的紋絲不亂的頭發(fā)上,一...
    開封第一講書人閱讀 50,084評論 1 291
  • 那天兢孝,我揣著相機(jī)與錄音窗轩,去河邊找鬼。 笑死,一個胖子當(dāng)著我的面吹牛陵吸,可吹牛的內(nèi)容都是我干的。 我是一名探鬼主播谆构,決...
    沈念sama閱讀 39,151評論 3 410
  • 文/蒼蘭香墨 我猛地睜開眼粱哼,長吁一口氣:“原來是場噩夢啊……” “哼绊含!你這毒婦竟也來了?” 一聲冷哼從身側(cè)響起,我...
    開封第一講書人閱讀 37,912評論 0 268
  • 序言:老撾萬榮一對情侶失蹤眉反,失蹤者是張志新(化名)和其女友劉穎昙啄,沒想到半個月后韧拒,有當(dāng)?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體十性,經(jīng)...
    沈念sama閱讀 44,355評論 1 303
  • 正文 獨居荒郊野嶺守林人離奇死亡,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點故事閱讀 36,666評論 2 327
  • 正文 我和宋清朗相戀三年草雕,在試婚紗的時候發(fā)現(xiàn)自己被綠了。 大學(xué)時的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片。...
    茶點故事閱讀 38,809評論 1 341
  • 序言:一個原本活蹦亂跳的男人離奇死亡纯路,死狀恐怖霹抛,靈堂內(nèi)的尸體忽然破棺而出杯拐,到底是詐尸還是另有隱情霞篡,我是刑警寧澤,帶...
    沈念sama閱讀 34,504評論 4 334
  • 正文 年R本政府宣布端逼,位于F島的核電站朗兵,受9級特大地震影響,放射性物質(zhì)發(fā)生泄漏顶滩。R本人自食惡果不足惜余掖,卻給世界環(huán)境...
    茶點故事閱讀 40,150評論 3 317
  • 文/蒙蒙 一、第九天 我趴在偏房一處隱蔽的房頂上張望礁鲁。 院中可真熱鬧浊吏,春花似錦、人聲如沸救氯。這莊子的主人今日做“春日...
    開封第一講書人閱讀 30,882評論 0 21
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽着憨。三九已至墩衙,卻和暖如春,著一層夾襖步出監(jiān)牢的瞬間甲抖,已是汗流浹背漆改。 一陣腳步聲響...
    開封第一講書人閱讀 32,121評論 1 267
  • 我被黑心中介騙來泰國打工, 沒想到剛下飛機(jī)就差點兒被人妖公主榨干…… 1. 我叫王不留准谚,地道東北人挫剑。 一個月前我還...
    沈念sama閱讀 46,628評論 2 362
  • 正文 我出身青樓,卻偏偏與公主長得像柱衔,于是被迫代替她去往敵國和親樊破。 傳聞我的和親對象是個殘疾皇子愉棱,可洞房花燭夜當(dāng)晚...
    茶點故事閱讀 43,724評論 2 351