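At work I ran into problems with collecting and displaying Ambari metrics. I needed to add some Python scripts that aggregate and organize the platform's script data, so I had to understand part of the ambari-collector flow. I read through the code briefly and wrote this analysis so that I do not forget it later. All of the code below has been trimmed and is for reference only.
- First, let's look at main.py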
from core.controller import Controller

def main(argv=None):
    # Allow Ctrl-C
    stop_handler = bind_signal_handlers()
    server_process_main(stop_handler)

def server_process_main(stop_handler, scmStatus=None):
    if scmStatus is not None:
        scmStatus.reportStartPending()
    config = Configuration()
    _init_logging(config)
    controller = Controller(config, stop_handler)
    logger.info('Starting Server RPC Thread: %s' % ' '.join(sys.argv))
    controller.start()
    print "Server out at: " + SERVER_OUT_FILE
    print "Server log at: " + SERVER_LOG_FILE
    save_pid(os.getpid(), PID_OUT_FILE)
    if scmStatus is not None:
        scmStatus.reportStarted()
    #The controller thread finishes when the stop event is signaled
    controller.join()
    remove_file(PID_OUT_FILE)
    pass
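To make the startup sequence in main.py easier to follow, here is a minimal sketch of the same pattern in Python 3 (the excerpts themselves are Python 2). `Worker`, `run_service` and the PID path are hypothetical names of mine, and a plain `threading.Event` stands in for the stop handler, so treat this as an illustration rather than Ambari's code.

```python
import os
import tempfile
import threading


class Worker(threading.Thread):
    """Hypothetical stand-in for Controller: runs until the stop event is set."""

    def __init__(self, stop_event):
        threading.Thread.__init__(self)
        self.stop_event = stop_event
        self.ticks = 0

    def run(self):
        # Event.wait() returns True once the event is set, False on timeout.
        while not self.stop_event.wait(0.01):
            self.ticks += 1


def run_service(stop_event, pid_path):
    """Start the worker, record the PID, block until it exits, then clean up."""
    worker = Worker(stop_event)
    worker.start()
    with open(pid_path, "w") as handle:
        handle.write(str(os.getpid()))
    worker.join()  # returns only after stop_event has been set
    os.remove(pid_path)
    return worker.ticks


stop_event = threading.Event()
pid_path = os.path.join(tempfile.mkdtemp(), "demo.pid")
# Simulate Ctrl-C arriving 0.1 s after startup.
threading.Timer(0.1, stop_event.set).start()
ticks = run_service(stop_event, pid_path)
pid_file_left_behind = os.path.exists(pid_path)
```

The point is the ordering: start the thread, write the PID file, block in `join()`, and remove the PID file only once the thread has really finished.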
As the main.py excerpt shows, server_process_main instantiates the Controller class and calls controller.start() to start a thread. So let's look at the code in Controller (abridged).
from emitter import Emitter

class Controller(threading.Thread):
    def __init__(self, config, stop_handler):
        # Process initialization code
        threading.Thread.__init__(self)
        self.emitter = Emitter(self.config, self.application_metric_map, stop_handler)

    def run(self):
        self.start_emitter()
        Timer(1, self.addsecond).start()
        while True:
            if (self.event_queue.full()):
                logger.warn('Event Queue full!! Suspending further collections.')
            else:
                self.enqueque_events()
            pass
            if 0 == self._stop_handler.wait(self.sleep_interval):
                logger.info('Shutting down Controller thread')
                break
        if not self._t is None:
            self._t.cancel()
            self._t.join(5)
        self.emitter.join(5)
        pass

    def start_emitter(self):
        self.emitter.start()
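The loop in Controller.run() combines two ideas: it skips a collection cycle when the event queue is full, and it sleeps by waiting on the stop handler so that shutdown is not delayed. Below is a small Python 3 sketch of that loop. `collection_loop` and `make_events` are hypothetical names, and a `threading.Event` (whose `wait()` returns True when set) replaces Ambari's stop handler, which in the excerpt signals a stop by returning 0.

```python
import queue
import threading


def collection_loop(event_queue, stop_event, sleep_interval, make_events):
    """Enqueue events each cycle unless the queue is full; stop when signalled."""
    skipped_cycles = 0
    while True:
        if event_queue.full():
            skipped_cycles += 1  # the original logs a warning here
        else:
            for event in make_events():
                if event_queue.full():
                    break
                event_queue.put_nowait(event)
        # Interruptible sleep: wakes up early if stop_event is set.
        if stop_event.wait(sleep_interval):
            return skipped_cycles


event_queue = queue.Queue(maxsize=2)
stop_event = threading.Event()
# Nobody consumes the queue here, so it fills up in the first cycle.
threading.Timer(0.2, stop_event.set).start()
skipped = collection_loop(event_queue, stop_event, 0.01,
                          lambda: ["cpu", "disk", "net"])
```

Because nothing drains the queue in this demo, every cycle after the first is skipped, which is the situation the 'Event Queue full' warning reports.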
Controller.__init__ instantiates the Emitter, and run() calls start_emitter(), which in turn calls self.emitter.start(). Next, let's look at the Emitter code.
class Emitter(threading.Thread):
    COLLECTOR_URL = "xxxxx"
    RETRY_SLEEP_INTERVAL = 5
    MAX_RETRY_COUNT = 3

    def __init__(self, config, application_metric_map, stop_handler):
        threading.Thread.__init__(self)
        self.lock = threading.Lock()
        self.collector_address = config.get_server_address()
        self.send_interval = config.get_send_interval()
        self._stop_handler = stop_handler
        self.application_metric_map = application_metric_map

    def run(self):
        while True:
            try:
                self.submit_metrics()
            except Exception, e:
                logger.warn('Unable to emit events. %s' % str(e))
            pass
            if 0 == self._stop_handler.wait(self.send_interval):
                logger.info('Shutting down Emitter thread')
                return
        pass

    def submit_metrics(self):
        retry_count = 0
        # This call will acquire lock on the map and clear contents before returning
        # After configured number of retries the data will not be sent to the
        # collector
        json_data = self.application_metric_map.flatten(None, True)
        if json_data is None:
            logger.info("Nothing to emit, resume waiting.")
            return
        pass
        response = None
        while retry_count < self.MAX_RETRY_COUNT:
            try:
                response = self.push_metrics(json_data)
            except Exception, e:
                logger.warn('Error sending metrics to server. %s' % str(e))
            pass
            if response and response.getcode() == 200:
                retry_count = self.MAX_RETRY_COUNT
            else:
                logger.warn("Retrying after {0} ...".format(self.RETRY_SLEEP_INTERVAL))
                retry_count += 1
                #Wait for the service stop event instead of sleeping blindly
                if 0 == self._stop_handler.wait(self.RETRY_SLEEP_INTERVAL):
                    return
            pass
        pass

    def push_metrics(self, data):
        headers = {"Content-Type" : "application/json", "Accept" : "*/*"}
        server = self.COLLECTOR_URL.format(self.collector_address.strip())
        logger.info("server: %s" % server)
        logger.debug("message to sent: %s" % data)
        req = urllib2.Request(server, data, headers)
        response = urllib2.urlopen(req, timeout=int(self.send_interval - 10))
        if response:
            logger.debug("POST response from server: retcode = {0}".format(response.getcode()))
            logger.debug(str(response.read()))
        pass
        return response
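The retry logic in submit_metrics() is easier to see in isolation. The following Python 3 sketch keeps the same constants but shortens the sleep, and replaces the HTTP call with a hypothetical `push` callable that returns a status code. `submit_with_retry` and `make_flaky_push` are illustration names only, not part of Ambari.

```python
import threading

MAX_RETRY_COUNT = 3
RETRY_SLEEP_INTERVAL = 0.01  # seconds; the excerpt uses 5


def submit_with_retry(push, payload, stop_event):
    """Call push(payload) until it returns 200 or the retries run out.

    Returns the number of attempts that were made.
    """
    attempts = 0
    retry_count = 0
    while retry_count < MAX_RETRY_COUNT:
        status = None
        try:
            attempts += 1
            status = push(payload)
        except Exception as error:
            print("Error sending metrics: %s" % error)
        if status == 200:
            break
        retry_count += 1
        # Wait on the stop event instead of sleeping blindly.
        if stop_event.wait(RETRY_SLEEP_INTERVAL):
            break
    return attempts


def make_flaky_push(failures):
    """Build a fake push that raises for the first `failures` calls."""
    state = {"calls": 0}

    def push(payload):
        state["calls"] += 1
        if state["calls"] <= failures:
            raise IOError("collector unreachable")
        return 200

    return push


attempts_flaky = submit_with_retry(make_flaky_push(2), "{}", threading.Event())
attempts_dead = submit_with_retry(make_flaky_push(10), "{}", threading.Event())
already_stopped = threading.Event()
already_stopped.set()
attempts_stopped = submit_with_retry(make_flaky_push(10), "{}", already_stopped)
```

Note the consequence of this design: since flatten(None, True) has already cleared the map, a batch that still fails after the last retry is simply dropped.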
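As the Emitter excerpt shows, run() calls submit_metrics(). Here is the key point: the core of that function is json_data = self.application_metric_map.flatten(None, True). Emitter itself inherits from threading.Thread; application_metric_map is the ApplicationMetricMap instance passed into its constructor. Let's look at the ApplicationMetricMap code.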
def flatten(self, application_id = None, clear_once_flattened = False):
    with self.lock:
        timeline_metrics = { "metrics" : [] }
        local_metric_map = {}
        if application_id:
            if self.app_metric_map.has_key(application_id):
                local_metric_map = { application_id : self.app_metric_map[application_id] }
            else:
                logger.info("application_id: {0}, not present in the map.".format(application_id))
        else:
            local_metric_map = self.app_metric_map.copy()
        pass
        for appId, metrics in local_metric_map.iteritems():
            for metricId, metricData in dict(metrics).iteritems():
                # Create a timeline metric object
                timeline_metric = {
                    "hostname" : self.hostname if appId == "HOST" else "",
                    "metricname" : metricId,
                    #"appid" : "HOST",
                    "appid" : appId,
                    "instanceid" : "",
                    "starttime" : self.get_start_time(appId, metricId),
                    "metrics" : metricData
                }
                timeline_metrics[ "metrics" ].append( timeline_metric )
            pass
        pass
        json_data = json.dumps(timeline_metrics) if len(timeline_metrics[ "metrics" ]) > 0 else None
        if clear_once_flattened:
            self.app_metric_map.clear()
        pass
        return json_data
    pass
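To see what flatten() produces, here is a worked example in Python 3. It is a simplified standalone function, not the class method: locking and clearing are left out, and because get_start_time is not shown in the excerpt I assume the start time is the earliest timestamp of each metric. The host name `node1` and the metric `cpu_user` are made up.

```python
import json


def flatten(app_metric_map, hostname):
    """Turn {app_id: {metric_name: {timestamp: value}}} into a JSON payload."""
    timeline_metrics = {"metrics": []}
    for app_id, metrics in app_metric_map.items():
        for metric_name, datapoints in metrics.items():
            timeline_metrics["metrics"].append({
                "hostname": hostname if app_id == "HOST" else "",
                "metricname": metric_name,
                "appid": app_id,
                "instanceid": "",
                # Assumption: earliest timestamp stands in for get_start_time().
                "starttime": min(datapoints),
                "metrics": datapoints,
            })
    if not timeline_metrics["metrics"]:
        return None  # nothing collected yet
    return json.dumps(timeline_metrics)


sample = {"HOST": {"cpu_user": {1000: 1.5, 2000: 2.0}}}
payload = json.loads(flatten(sample, "node1"))
first = payload["metrics"][0]
empty_result = flatten({}, "node1")
```

Two details are worth noticing: JSON turns the integer timestamps into string keys, and an empty map yields None, which is exactly the case where submit_metrics() logs "Nothing to emit".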
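The flatten function mainly merges and aggregates the data into a new structure. However, when the Emitter first runs after start_emitter() is called in Controller, flatten has nothing to produce: self.app_metric_map is presumably still empty, so it returns None and submit_metrics logs "Nothing to emit". Looking back at Controller.run, there is a call to self.enqueque_events() (spelled that way in the code), which, as the name suggests, puts events on a queue. Following the chain of calls from that function, what eventually runs is process_service_collection_event.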
def process_service_collection_event(self, event):
    startTime = int(round(time() * 1000))
    metrics = None
    path = os.path.abspath('.')
    for root, dirs, files in os.walk("%s/libs/" % path):
        appid = event.get_group_name().split('_')[0]
        metricgroup = event.get_group_name().split('_')[1]
        if ("%s_metrics.sh" % appid) in filter(lambda x: ".sh" in x, files):
            metrics = {appid: self.service_info.get_service_metrics(appid, metricgroup)}
        else:
            logger.warn('have no %s modules' % appid)
    if metrics:
        for item in metrics:
            self.application_metric_map.put_metric(item, metrics[item], startTime)
    pass
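The lookup step in process_service_collection_event can be tried out on its own. This Python 3 sketch splits a group name into appid and metric group the same way and searches a directory tree for `<appid>_metrics.sh`. The function name `find_metrics_script`, the group names `hdfs_capacity` and `yarn_queue`, and the temporary directory are all hypothetical.

```python
import os
import tempfile


def find_metrics_script(libs_dir, group_name):
    """Return (appid, metricgroup, script_path or None) for a group name."""
    parts = group_name.split("_")
    appid, metricgroup = parts[0], parts[1]
    wanted = "%s_metrics.sh" % appid
    for root, dirs, files in os.walk(libs_dir):
        if wanted in files:
            return appid, metricgroup, os.path.join(root, wanted)
    return appid, metricgroup, None


# Build a throwaway libs/ directory containing one service script.
libs_dir = os.path.join(tempfile.mkdtemp(), "libs")
os.makedirs(libs_dir)
with open(os.path.join(libs_dir, "hdfs_metrics.sh"), "w") as handle:
    handle.write("#!/bin/sh\n")

found = find_metrics_script(libs_dir, "hdfs_capacity")
missing = find_metrics_script(libs_dir, "yarn_queue")
```

A group name without an underscore would raise an IndexError both here and in the excerpt, so group names need to follow the `<appid>_<metricgroup>` convention.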
process_service_collection_event runs each service's script and aggregates the results into the metrics variable, then calls self.application_metric_map.put_metric(item, metrics[item], startTime). This application_metric_map is an instance of the ApplicationMetricMap class, which has the following method:
def put_metric(self, application_id, metric_id_to_value_map, timestamp):
    with self.lock:
        for metric_name, value in metric_id_to_value_map.iteritems():
            metric_map = self.app_metric_map.get(application_id)
            if not metric_map:
                metric_map = { metric_name : { timestamp : value } }
                self.app_metric_map[ application_id ] = metric_map
            else:
                metric_id_map = metric_map.get(metric_name)
                if not metric_id_map:
                    metric_id_map = { timestamp : value }
                    metric_map[ metric_name ] = metric_id_map
                else:
                    metric_map[ metric_name ].update( { timestamp : value } )
                pass
            pass
    pass
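The nested structure that put_metric builds is app id, then metric name, then timestamp. As a rough Python 3 illustration, the same bookkeeping can be written with dict.setdefault. `MetricStore` is a hypothetical stand-in class, and the `kafka` metrics are invented sample data.

```python
import threading


class MetricStore(object):
    """Hypothetical, trimmed stand-in for the map class in the excerpts."""

    def __init__(self):
        self.lock = threading.Lock()
        self.app_metric_map = {}

    def put_metric(self, application_id, metric_id_to_value_map, timestamp):
        with self.lock:
            for metric_name, value in metric_id_to_value_map.items():
                # Create the missing levels on demand, then record the point.
                app_metrics = self.app_metric_map.setdefault(application_id, {})
                app_metrics.setdefault(metric_name, {})[timestamp] = value


store = MetricStore()
store.put_metric("kafka", {"lag": 10, "rate": 2.5}, 1000)
store.put_metric("kafka", {"lag": 12}, 2000)
result = store.app_metric_map
```

After the two calls, `lag` holds two data points and `rate` holds one, which is the shape flatten() later walks over.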
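In short, put_metric takes the data collected from the scripts and builds up app_metric_map. The Controller loop keeps triggering it indefinitely; it is just that nothing has been collected yet when start_emitter() first runs. Once the scripts have produced data, the Emitter has something to send, and it reports it with urllib2 (see push_metrics) to port 6188 of the metrics collector. The data finally lands in HBase.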
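For completeness, this is roughly how the POST in push_metrics could be built in Python 3, where urllib2 has become urllib.request. The URL template is a hypothetical placeholder of mine because COLLECTOR_URL is elided in the excerpt, and `collector.example.com` is a made-up host. The request is only constructed here, not sent.

```python
import json
import urllib.request

# Hypothetical placeholder; the real COLLECTOR_URL is elided in the excerpt.
COLLECTOR_URL = "http://{0}/hypothetical/metrics/path"


def build_request(collector_address, json_data):
    """Build (but do not send) the POST request for a JSON payload."""
    headers = {"Content-Type": "application/json", "Accept": "*/*"}
    url = COLLECTOR_URL.format(collector_address.strip())
    # Passing a body makes urllib issue a POST; in Python 3 it must be bytes.
    return urllib.request.Request(url, json_data.encode("utf-8"), headers)


payload = json.dumps({"metrics": []})
request = build_request(" collector.example.com:6188 ", payload)
method = request.get_method()
```

Sending it would be a call to urllib.request.urlopen(request, timeout=...), mirroring the urllib2.urlopen call in the excerpt.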