Introduction
processing is the core flow in which Gnocchi turns incoming measures into aggregated, stored timeseries.
Entry point:
class MetricProcessor(MetricProcessBase):
    name = "processing"

    def __init__(self, worker_id, conf, queue):
        super(MetricProcessor, self).__init__(worker_id, conf, 0)
        self.queue = queue

    def _run_job(self):
        try:
            try:
                metrics = self.queue.get(block=True, timeout=10)
            except six.moves.queue.Empty:
                # NOTE(sileht): Allow the process to exit gracefully every
                # 10 seconds
                return
            self.store.process_background_tasks(self.index, metrics)
        except Exception:
            LOG.error("Unexpected error during measures processing",
                      exc_info=True)
Analysis:
1. metrics is read from the multiprocessing queue `queue`; it is a list of metric ids.
(Pdb) metrics
[u'a08ca3ac-d834-4bf6-b89d-1f8576976287', u'd4bd1d46-8df2-4436-8b39-fba6e732b2b5', u'11065ce7-1baf-425d-916a-4ff8e37423d7', u'9c8310fe-3110-49fe-85c5-49d91c283dcc']
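The worker loop above is a plain multiprocessing consumer: it blocks on the queue for at most 10 seconds so that the process regularly gets a chance to exit gracefully. A minimal standalone sketch of the same pattern (the worker function and sample ids here are illustrative, not Gnocchi code):

import multiprocessing

from six.moves import queue as six_queue


def worker(q):
    # Mirrors MetricProcessor._run_job: block at most 10 seconds on the
    # queue so the loop can notice a shutdown request.
    while True:
        try:
            metric_ids = q.get(block=True, timeout=10)
        except six_queue.Empty:
            continue
        print("would process metric ids: %s" % metric_ids)


if __name__ == "__main__":
    q = multiprocessing.Queue()
    q.put(['a08ca3ac-d834-4bf6-b89d-1f8576976287'])
    p = multiprocessing.Process(target=worker, args=(q,))
    p.daemon = True
    p.start()
    p.join(timeout=2)  # the daemon worker is torn down when the main process exits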
gnocchi.storage.StorageDriver.process_background_tasks
def process_background_tasks(self, index, metrics, sync=False):
    """Process background tasks for this storage.

    This calls :func:`process_new_measures` to process new measures

    :param index: An indexer to be used for querying metrics
    :param block_size: number of metrics to process
    :param sync: If True, then process everything synchronously and raise
                 on error
    :type sync: bool
    """
    LOG.debug("Processing new measures")
    try:
        self.process_new_measures(index, metrics, sync)
    except Exception:
        if sync:
            raise
        LOG.error("Unexpected error during measures processing",
                  exc_info=True)
gnocchi.storage._carbonara.CarbonaraBasedStorage.process_new_measures
def process_new_measures(self, indexer, metrics_to_process,
                         sync=False):
    metrics = indexer.list_metrics(ids=metrics_to_process)
    # This build the list of deleted metrics, i.e. the metrics we have
    # measures to process for but that are not in the indexer anymore.
    deleted_metrics_id = (set(map(uuid.UUID, metrics_to_process))
                          - set(m.id for m in metrics))
    for metric_id in deleted_metrics_id:
        # NOTE(jd): We need to lock the metric otherwise we might delete
        # measures that another worker might be processing. Deleting
        # measurement files under its feet is not nice!
        try:
            with self._lock(metric_id)(blocking=sync):
                self.incoming.delete_unprocessed_measures_for_metric_id(
                    metric_id)
        except coordination.LockAcquireFailed:
            LOG.debug("Cannot acquire lock for metric %s, postponing "
                      "unprocessed measures deletion", metric_id)

    for metric in metrics:
        lock = self._lock(metric.id)
        # Do not block if we cannot acquire the lock, that means some other
        # worker is doing the job. We'll just ignore this metric and may
        # get back later to it if needed.
        if not lock.acquire(blocking=sync):
            continue
        try:
            locksw = timeutils.StopWatch().start()
            LOG.debug("Processing measures for %s", metric)
            with self.incoming.process_measure_for_metric(metric) \
                    as measures:
                self._compute_and_store_timeseries(metric, measures)
            LOG.debug("Metric %s locked during %.2f seconds",
                      metric.id, locksw.elapsed())
        except Exception:
            LOG.debug("Metric %s locked during %.2f seconds",
                      metric.id, locksw.elapsed())
            if sync:
                raise
            LOG.error("Error processing new measures", exc_info=True)
        finally:
            lock.release()
Analysis:
1. Look up the Metric objects for the given metric ids; for metrics that have been deleted from the indexer, drop their still-unprocessed measures from Redis (the incoming driver).
2. self.incoming.process_measure_for_metric(metric) fetches the pending measures for the metric from Redis; each measure is a (timestamp, value) pair, e.g. [(Timestamp('2020-12-25 08:03:34.551648'), 0.0)].
(Pdb) metrics
[<Metric 11065ce7-1baf-425d-916a-4ff8e37423d7>, <Metric 9c8310fe-3110-49fe-85c5-49d91c283dcc>, <Metric a08ca3ac-d834-4bf6-b89d-1f8576976287>, <Metric d4bd1d46-8df2-4436-8b39-fba6e732b2b5>]
(Pdb) metrics[0].__dict__
{'status': u'active', '_sa_instance_state': <sqlalchemy.orm.state.InstanceState object at 0x7fc3cc482a50>, 'name': u'disk.read.requests.rate', 'creator': u'4c90cbb7dfaa47798790c3cd411e4575:7e88c2ceced8412dab3d80ed131cf53a', 'resource_id': UUID('be25a992-e971-488c-a4c9-64bd6f6bb753'), 'archive_policy': <gnocchi.indexer.sqlalchemy_base.ArchivePolicy object at 0x7fc3cc482bd0>, 'archive_policy_name': u'frequency_300s', 'id': UUID('11065ce7-1baf-425d-916a-4ff8e37423d7'), 'unit': u'request/s'}
(Pdb) measures
[(Timestamp('2020-12-25 08:03:34.551648'), 0.0), (Timestamp('2020-12-25 08:08:34.615949'), 0.0), (Timestamp('2020-12-25 08:13:34.586165'), 0.0)]
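The per-metric locking in process_new_measures goes through tooz (the coordination library): acquiring with blocking=False returns immediately when another worker already holds the lock, and the metric is simply skipped for this round. A minimal sketch of that try-lock pattern with tooz (the backend URL, member id and lock name are placeholders, not the names Gnocchi actually builds):

from tooz import coordination

coordinator = coordination.get_coordinator(
    "memcached://127.0.0.1:11211", b"metricd-worker-0")
coordinator.start()

lock = coordinator.get_lock(b"gnocchi-11065ce7-1baf-425d-916a-4ff8e37423d7")
if lock.acquire(blocking=False):
    try:
        pass  # process this metric's pending measures here
    finally:
        lock.release()
else:
    # Another worker holds the lock: skip the metric, it will be
    # picked up again on a later scheduling pass.
    pass

coordinator.stop()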
gnocchi.storage._carbonara.CarbonaraBasedStorage._compute_and_store_timeseries
def _compute_and_store_timeseries(self, metric, measures):
    # NOTE(mnaser): The metric could have been handled by
    #               another worker, ignore if no measures.
    if len(measures) == 0:
        LOG.debug("Skipping %s (already processed)", metric)
        return

    measures = sorted(measures, key=operator.itemgetter(0))

    agg_methods = list(metric.archive_policy.aggregation_methods)
    block_size = metric.archive_policy.max_block_size
    back_window = metric.archive_policy.back_window
    definition = metric.archive_policy.definition

    try:
        ts = self._get_unaggregated_timeserie_and_unserialize(
            metric, block_size=block_size, back_window=back_window)
    except storage.MetricDoesNotExist:
        try:
            self._create_metric(metric)
        except storage.MetricAlreadyExists:
            # Created in the mean time, do not worry
            pass
        ts = None
    except CorruptionError as e:
        LOG.error(e)
        ts = None

    if ts is None:
        # This is the first time we treat measures for this
        # metric, or data are corrupted, create a new one
        ts = carbonara.BoundTimeSerie(block_size=block_size,
                                      back_window=back_window)
        current_first_block_timestamp = None
    else:
        current_first_block_timestamp = ts.first_block_timestamp()

    # NOTE(jd) This is Python where you need such
    # hack to pass a variable around a closure,
    # sorry.
    computed_points = {"number": 0}

    def _map_add_measures(bound_timeserie):
        # NOTE (gordc): bound_timeserie is entire set of
        # unaggregated measures matching largest
        # granularity. the following takes only the points
        # affected by new measures for specific granularity
        tstamp = max(bound_timeserie.first, measures[0][0])
        new_first_block_timestamp = bound_timeserie.first_block_timestamp()
        computed_points['number'] = len(bound_timeserie)
        for d in definition:
            ts = bound_timeserie.group_serie(
                d.granularity, carbonara.round_timestamp(
                    tstamp, d.granularity * 10e8))

            self._map_in_thread(
                self._add_measures,
                ((aggregation, d, metric, ts,
                  current_first_block_timestamp,
                  new_first_block_timestamp)
                 for aggregation in agg_methods))

    with timeutils.StopWatch() as sw:
        ts.set_values(measures,
                      before_truncate_callback=_map_add_measures,
                      ignore_too_old_timestamps=True)

        elapsed = sw.elapsed()
        number_of_operations = (len(agg_methods) * len(definition))
        perf = ""
        if elapsed > 0:
            perf = " (%d points/s, %d measures/s)" % (
                ((number_of_operations * computed_points['number']) /
                    elapsed),
                ((number_of_operations * len(measures)) / elapsed)
            )
        LOG.debug("Computed new metric %s with %d new measures "
                  "in %.2f seconds%s",
                  metric.id, len(measures), elapsed, perf)

    self._store_unaggregated_timeserie(metric, ts.serialize())
Analysis:
1. Gnocchi organizes data in three layers: resource -> metric -> measure. A resource has many metrics (the things being measured), and each metric holds the actual sampled data points.
2. ts is the unaggregated timeserie previously stored in Ceph: the raw (timestamp, value) points plus metadata such as block size and back window. measures holds the new points that now need to be archived (the archive-policy values dumped below are checked in a short arithmetic sketch after the dumps).
(Pdb) metric.__dict__
{'status': u'active', '_sa_instance_state': <sqlalchemy.orm.state.InstanceState object at 0x7fc3cc482a50>, 'name': u'disk.read.requests.rate', 'creator': u'4c90cbb7dfaa47798790c3cd411e4575:7e88c2ceced8412dab3d80ed131cf53a', 'resource_id': UUID('be25a992-e971-488c-a4c9-64bd6f6bb753'), 'archive_policy': <gnocchi.indexer.sqlalchemy_base.ArchivePolicy object at 0x7fc3cc482bd0>, 'archive_policy_name': u'frequency_300s', 'id': UUID('11065ce7-1baf-425d-916a-4ff8e37423d7'), 'unit': u'request/s'}
(Pdb) measures
[(Timestamp('2020-12-25 08:03:34.551648'), 0.0), (Timestamp('2020-12-25 08:08:34.615949'), 0.0), (Timestamp('2020-12-25 08:13:34.586165'), 0.0)]
(Pdb) agg_methods
[u'max', u'mean', u'min']
(Pdb) block_size
86400.0
(Pdb) back_window
0
(Pdb) definition
[{'points': 300, 'granularity': 300.0, 'timespan': 90000.0}, {'points': 100, 'granularity': 900.0, 'timespan': 90000.0}, {'points': 100, 'granularity': 7200.0, 'timespan': 720000.0}, {'points': 200, 'granularity': 86400.0, 'timespan': 17280000.0}]
(Pdb) ts.__dict__
{'back_window': 0, 'block_size': <86400000000000 * Nanos>, 'ts': 2020-12-25 03:59:50.191696 9.739510
2020-12-25 04:04:50.157824 0.120014
2020-12-25 04:09:50.118074 0.000000
2020-12-25 04:14:50.238515 0.000000
2020-12-25 04:19:50.091682 0.000000
2020-12-25 04:24:50.074567 0.000000
2020-12-25 04:29:50.144667 0.046656
2020-12-25 04:34:50.078793 0.000000
2020-12-25 04:39:50.125287 0.000000
2020-12-25 04:44:50.285588 0.000000
2020-12-25 04:49:50.180779 0.003334
2020-12-25 04:54:50.130977 0.000000
2020-12-25 04:59:50.134970 0.000000
2020-12-25 05:04:50.273585 0.000000
2020-12-25 05:09:50.102544 0.003335
2020-12-25 05:14:50.192206 0.000000
2020-12-25 05:19:50.158147 0.000000
2020-12-25 05:24:50.157283 0.000000
2020-12-25 05:29:50.099026 0.000000
2020-12-25 05:34:50.141446 0.000000
2020-12-25 05:39:50.280643 0.000000
2020-12-25 05:43:34.498113 0.000000
2020-12-25 05:48:34.481620 0.000000
2020-12-25 05:53:34.586508 0.000000
2020-12-25 05:58:34.522015 0.000000
2020-12-25 06:03:34.555708 0.000000
2020-12-25 06:08:34.530322 0.000000
2020-12-25 06:13:34.681981 0.000000
2020-12-25 06:18:34.551089 0.000000
2020-12-25 06:23:34.500688 0.000000
2020-12-25 06:28:34.542119 0.000000
2020-12-25 06:33:34.508253 0.000000
2020-12-25 06:39:50.190674 18.369239
2020-12-25 06:44:50.328215 0.663029
2020-12-25 06:48:34.701798 0.000000
2020-12-25 06:53:34.569040 0.000000
2020-12-25 06:58:34.538735 0.000000
2020-12-25 07:03:34.554803 0.106661
2020-12-25 07:08:34.622109 0.000000
2020-12-25 07:13:34.498547 0.000000
2020-12-25 07:18:34.508618 0.000000
2020-12-25 07:23:34.799742 0.000000
2020-12-25 07:28:34.562884 0.000000
2020-12-25 07:33:34.649724 0.000000
2020-12-25 07:38:34.593196 0.000000
2020-12-25 07:43:34.584885 0.000000
2020-12-25 07:48:34.524464 0.000000
2020-12-25 07:53:34.682062 0.000000
2020-12-25 07:58:34.659174 0.000000
dtype: float64}
(Pdb) current_first_block_timestamp
Timestamp('2020-12-25 00:00:00')
_map_add_measures函數(shù)中變量:
(Pdb) tstamp
Timestamp('2020-12-25 08:03:34.551648')
(Pdb) new_first_block_timestamp
Timestamp('2020-12-25 00:00:00')
(Pdb) definition
[{'points': 300, 'granularity': 300.0, 'timespan': 90000.0}, {'points': 100, 'granularity': 900.0, 'timespan': 90000.0}, {'points': 100, 'granularity': 7200.0, 'timespan': 720000.0}, {'points': 200, 'granularity': 86400.0, 'timespan': 17280000.0}]
(Pdb) ts
<gnocchi.carbonara.GroupedTimeSeries object at 0x7fc3cc482150>
(Pdb) ts.__dict__
{'tstamps': array([ 1.60888320e+18, 1.60888350e+18, 1.60888380e+18]), 'counts': array([1, 1, 1]), '_ts': 2020-12-25 08:03:34.551648 0.0
2020-12-25 08:08:34.615949 0.0
2020-12-25 08:13:34.586165 0.0
dtype: float64, 'indexes': array([ 1.60888320e+18, 1.60888350e+18, 1.60888380e+18])}
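Two things in the dumps above are worth checking by hand. First, every archive-policy definition entry is self-consistent: timespan = points × granularity (300 × 300.0 = 90000.0 s, i.e. 25 hours of 5-minute points, and so on). Second, group_serie buckets the new measures on granularity boundaries: for the 300 s granularity, the three new points at 08:03/08:08/08:13 round down to 08:00:00, 08:05:00 and 08:10:00, which are exactly the nanosecond values 1.60888320e+18, 1.60888350e+18 and 1.60888380e+18 shown in tstamps. A small plain-Python check of that arithmetic:

import datetime

definition = [
    {'points': 300, 'granularity': 300.0, 'timespan': 90000.0},
    {'points': 100, 'granularity': 900.0, 'timespan': 90000.0},
    {'points': 100, 'granularity': 7200.0, 'timespan': 720000.0},
    {'points': 200, 'granularity': 86400.0, 'timespan': 17280000.0},
]
for d in definition:
    # Retention per level: number of points times the step between points.
    assert d['points'] * d['granularity'] == d['timespan']

epoch = datetime.datetime(1970, 1, 1)
for ts in (datetime.datetime(2020, 12, 25, 8, 3, 34),
           datetime.datetime(2020, 12, 25, 8, 8, 34),
           datetime.datetime(2020, 12, 25, 8, 13, 34)):
    seconds = (ts - epoch).total_seconds()
    # Round down to the 300 s bucket, as group_serie does.
    print(int(seconds // 300) * 300)
# -> 1608883200, 1608883500, 1608883800 (08:00, 08:05, 08:10 UTC)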
gnocchi.storage._carbonara.CarbonaraBasedStorage._get_unaggregated_timeserie_and_unserialize
Analysis:
- raw_measures is the blob read from Ceph: lz4-compressed data in which one part stores the timestamps and the other part stores the values.
- carbonara.BoundTimeSerie.unserialize decompresses and deserializes that blob.
def _get_unaggregated_timeserie_and_unserialize(
        self, metric, block_size, back_window):
    """Retrieve unaggregated timeserie for a metric and unserialize it.

    Returns a gnocchi.carbonara.BoundTimeSerie object. If the data cannot
    be retrieved, returns None.

    """
    with timeutils.StopWatch() as sw:
        raw_measures = (
            self._get_unaggregated_timeserie(
                metric)
        )
        LOG.debug(
            "Retrieve unaggregated measures "
            "for %s in %.2fs",
            metric.id, sw.elapsed())
    try:
        return carbonara.BoundTimeSerie.unserialize(
            raw_measures, block_size, back_window)
    except ValueError:
        raise CorruptionError(
            "Data corruption detected for %s "
            "unaggregated timeserie" % metric.id)
(Pdb) raw_measures
'\x10\x03\x00\x00\xf1\x04\x80\xa8\x96\xf9\xa3\xd9S\x16\x80\xdf_\xd7E\x00\x00\x00\x90.\x06\x08\x00\xf1=\xa8\x80\x92\xe0E\x00\x00\x00\x989\xa4\xd0E\x00\x00\x00\x88\x90_\xd8E\x00\x00\x00 \\\x92\xddE\x00\x00\x00\xb0\x8fw\xd5E\x00\x00\x000)*\xdcE\x00\x00\x00\xc8\xb7\xf2\xe2E\x00\x00\x00\xd8u%\xd3E\x00\x00\x00\xf0\xccl\xd6E\x00\x00\x00\xa8\xa5\xa1\xd9\x18\x00\xb1\xd0\xa7\xe1E\x00\x00\x00\x18\xd72\xcf8\x00\xa1\xda\xbc\xdeE\x00\x00\x00\x08\x05]p\x00A\x00\x89W\xd9p\x00!\xc9\xebX\x00\xb1 \xff\xeb\xdbE\x00\x00\x00H\xb2\xb08\x00\xb10\x14g44\x00\x00\x008\x0ei\x88\x00\xb1\xc0.\xa5\xdfE\x00\x00\x008\xa2\x8c0\x001H\xd5f0\x001\xf0[\xe1P\x001\xf8\xd9n\x90\x00A w\x97\xd1x\x001\xa9c\xd6\x88\x00"\xe7\xdc(\x00\x12\xf6\xf0\x00\xb1\x08\xd1jxW\x00\x00\x00\x88m\x97h\x00\xb2\x98,\xb5=4\x00\x00\x00\x10\xfez8\x00!M\x96P\x00\xb1\xa0\xe5Y\xdaE\x00\x00\x00\x10\xbag\x00\x01A\xf0O\x07\xd2P\x00!c\xfe\xb8\x00\xf1\x04 \xeb\xbe\xeaE\x00\x00\x00p\x8dF\xcbE\x00\x00\x00\xc0\xca\x91\xe0\x001\x80+\x06x\x001(\xe7\xe5\xb8\x001x\xc4\xca\xb0\x0010y\xc9\xa0\x001\xc0y\x07\x18\x00\xff\x02\xcdfX\x1c\xa1z#@\xf0m\xbeA5\xb9\xbe?\x00\x01\x00\x0c\x7f\x07\xfc\xb5\xafC\xe3\xa7(\x00\x06\x7fu\xe2\x9c"\xf3Pk \x00\x06o\xf51\x88\x96~R \x00\x07\x0f\x02\x00]\xff\x00#\xe2qj\x86^2@\xbbo\x1bV\x897\xe5\xb8\x00\x06\x7f\xabL\xc9\xdc!N\xbb \x00\x06\x0f\x02\x00(P\x00\x00\x00\x00\x00'
gnocchi.storage.ceph.CephStorage._get_unaggregated_timeserie
Analysis:
- _build_unaggregated_timeserie_path builds the Ceph object name, in the format gnocchi_<metric.id>_none_v<version> (here gnocchi_<metric.id>_none_v3).
- The compressed data is then read from Ceph via ioctx.read using that object name.
def _get_unaggregated_timeserie(self, metric, version=3):
    try:
        return self._get_object_content(
            self._build_unaggregated_timeserie_path(metric, version))
    except rados.ObjectNotFound:
        raise storage.MetricDoesNotExist(metric)

@staticmethod
def _build_unaggregated_timeserie_path(metric, version):
    return (('gnocchi_%s_none' % metric.id)
            + ("_v%s" % version if version else ""))
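For the metric inspected in this session that logic yields the following object name (the helper below just re-implements the same string formatting for illustration):

import uuid

def build_unaggregated_timeserie_path(metric_id, version=3):
    # Same formatting as CephStorage._build_unaggregated_timeserie_path.
    return ('gnocchi_%s_none' % metric_id) + ("_v%s" % version if version else "")

print(build_unaggregated_timeserie_path(
    uuid.UUID('11065ce7-1baf-425d-916a-4ff8e37423d7')))
# -> gnocchi_11065ce7-1baf-425d-916a-4ff8e37423d7_none_v3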
gnocchi.storage.ceph.CephStorage._get_object_content
def _get_object_content(self, name):
    offset = 0
    content = b''
    while True:
        data = self.ioctx.read(name, offset=offset)
        if not data:
            break
        content += data
        offset += len(data)
    return content
gnocchi.carbonara.BoundTimeSerie.unserialize
Analysis:
1. The blob is lz4-decompressed and split into a timestamp part and a value part; the timestamps are stored as deltas, so numpy.cumsum rebuilds the absolute nanosecond timestamps.
2. Note that the timestamps cover all of the previously stored unaggregated points, not just the new ones (a small round-trip sketch of this encoding follows the pdb output below).
@classmethod
def unserialize(cls, data, block_size, back_window):
    uncompressed = lz4.loads(data)
    nb_points = (
        len(uncompressed) // cls._SERIALIZATION_TIMESTAMP_VALUE_LEN
    )
    timestamps_raw = uncompressed[
        :nb_points*cls._SERIALIZATION_TIMESTAMP_LEN]
    timestamps = numpy.frombuffer(timestamps_raw, dtype='<Q')
    timestamps = numpy.cumsum(timestamps)
    timestamps = numpy.array(timestamps, dtype='datetime64[ns]')

    values_raw = uncompressed[nb_points*cls._SERIALIZATION_TIMESTAMP_LEN:]
    values = numpy.frombuffer(values_raw, dtype='<d')

    return cls.from_data(
        pandas.to_datetime(timestamps),
        values,
        block_size=block_size,
        back_window=back_window)
(Pdb) uncompressed
'\x80\xa8\x96\xf9\xa3\xd9S\x16\x80\xdf_\xd7E\x00\x00\x00\x90.\x06\xd7E\x00\x00\x00\xa8\x80\x92\xe0E\x00\x00\x00\x989\xa4\xd0E\x00\x00\x00\x88\x90_\xd8E\x00\x00\x00 \\\x92\xddE\x00\x00\x00\xb0\x8fw\xd5E\x00\x00\x000)*\xdcE\x00\x00\x00\xc8\xb7\xf2\xe2E\x00\x00\x00\xd8u%\xd3E\x00\x00\x00\xf0\xccl\xd6E\x00\x00\x00\xa8\xa5\xa1\xd9E\x00\x00\x00\xd8\xd0\xa7\xe1E\x00\x00\x00\x18\xd72\xcfE\x00\x00\x000\xda\xbc\xdeE\x00\x00\x00\x08\x05]\xd7E\x00\x00\x00\x00\x89W\xd9E\x00\x00\x00\x98\xc9\xeb\xd5E\x00\x00\x00 \xff\xeb\xdbE\x00\x00\x00H\xb2\xb0\xe1E\x00\x00\x000\x14g44\x00\x00\x008\x0ei\xd8E\x00\x00\x00\xc0.\xa5\xdfE\x00\x00\x008\xa2\x8c\xd5E\x00\x00\x00H\xd5f\xdbE\x00\x00\x00\xf0[\xe1\xd7E\x00\x00\x00\xf8\xd9n\xe2E\x00\x00\x00 w\x97\xd1E\x00\x00\x00\x18\xa9c\xd6E\x00\x00\x00\xd8\xe7\xdc\xdbE\x00\x00\x00\xf0\xf6_\xd7E\x00\x00\x00\x08\xd1jxW\x00\x00\x00\x88m\x97\xe1E\x00\x00\x00\x98,\xb5=4\x00\x00\x00\x10\xfez\xd1E\x00\x00\x00\x18M\x96\xd7E\x00\x00\x00\xa0\xe5Y\xdaE\x00\x00\x00\x10\xbag\xddE\x00\x00\x00\xf0O\x07\xd2E\x00\x00\x00\xd8c\xfe\xd9E\x00\x00\x00 \xeb\xbe\xeaE\x00\x00\x00p\x8dF\xcbE\x00\x00\x00\xc0\xca\x91\xdeE\x00\x00\x00\x80+\x06\xd6E\x00\x00\x00(\xe7\xe5\xd8E\x00\x00\x00x\xc4\xca\xd5E\x00\x00\x000y\xc9\xe2E\x00\x00\x00\xc0y\x07\xd8E\x00\x00\x00\xcdfX\x1c\xa1z#@\xf0m\xbeA5\xb9\xbe?\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x07\xfc\xb5\xafC\xe3\xa7?\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00u\xe2\x9c"\xf3Pk?\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\xf51\x88\x96~Rk?\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00#\xe2qj\x86^2@\xbbo\x1bV\x897\xe5?\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\xabL\xc9\xdc!N\xbb?\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00'
(Pdb) cls._SERIALIZATION_TIMESTAMP_VALUE_LEN
16
(Pdb) timestamps
array(['2020-12-25T03:59:50.191696000', '2020-12-25T04:04:50.157824000',
'2020-12-25T04:09:50.118074000', '2020-12-25T04:14:50.238515000',
'2020-12-25T04:19:50.091682000', '2020-12-25T04:24:50.074567000',
'2020-12-25T04:29:50.144667000', '2020-12-25T04:34:50.078793000',
'2020-12-25T04:39:50.125287000', '2020-12-25T04:44:50.285588000',
'2020-12-25T04:49:50.180779000', '2020-12-25T04:54:50.130977000',
'2020-12-25T04:59:50.134970000', '2020-12-25T05:04:50.273585000',
'2020-12-25T05:09:50.102544000', '2020-12-25T05:14:50.192206000',
'2020-12-25T05:19:50.158147000', '2020-12-25T05:24:50.157283000',
'2020-12-25T05:29:50.099026000', '2020-12-25T05:34:50.141446000',
'2020-12-25T05:39:50.280643000', '2020-12-25T05:43:34.498113000',
'2020-12-25T05:48:34.481620000', '2020-12-25T05:53:34.586508000',
'2020-12-25T05:58:34.522015000', '2020-12-25T06:03:34.555708000',
'2020-12-25T06:08:34.530322000', '2020-12-25T06:13:34.681981000',
'2020-12-25T06:18:34.551089000', '2020-12-25T06:23:34.500688000',
'2020-12-25T06:28:34.542119000', '2020-12-25T06:33:34.508253000',
'2020-12-25T06:39:50.190674000', '2020-12-25T06:44:50.328215000',
'2020-12-25T06:48:34.701798000', '2020-12-25T06:53:34.569040000',
'2020-12-25T06:58:34.538735000', '2020-12-25T07:03:34.554803000',
'2020-12-25T07:08:34.622109000', '2020-12-25T07:13:34.498547000',
'2020-12-25T07:18:34.508618000', '2020-12-25T07:23:34.799742000',
'2020-12-25T07:28:34.562884000', '2020-12-25T07:33:34.649724000',
'2020-12-25T07:38:34.593196000', '2020-12-25T07:43:34.584885000',
'2020-12-25T07:48:34.524464000', '2020-12-25T07:53:34.682062000',
'2020-12-25T07:58:34.659174000'], dtype='datetime64[ns]')
(Pdb) values_raw
'\xcdfX\x1c\xa1z#@\xf0m\xbeA5\xb9\xbe?\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x07\xfc\xb5\xafC\xe3\xa7?\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00u\xe2\x9c"\xf3Pk?\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\xf51\x88\x96~Rk?\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00#\xe2qj\x86^2@\xbbo\x1bV\x897\xe5?\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\xabL\xc9\xdc!N\xbb?\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00'
(Pdb) values
array([ 9.73951043e+00, 1.20013550e-01, 0.00000000e+00,
0.00000000e+00, 0.00000000e+00, 0.00000000e+00,
4.66557648e-02, 0.00000000e+00, 0.00000000e+00,
0.00000000e+00, 3.33449828e-03, 0.00000000e+00,
0.00000000e+00, 0.00000000e+00, 3.33523487e-03,
0.00000000e+00, 0.00000000e+00, 0.00000000e+00,
0.00000000e+00, 0.00000000e+00, 0.00000000e+00,
0.00000000e+00, 0.00000000e+00, 0.00000000e+00,
0.00000000e+00, 0.00000000e+00, 0.00000000e+00,
0.00000000e+00, 0.00000000e+00, 0.00000000e+00,
0.00000000e+00, 0.00000000e+00, 1.83692385e+01,
6.63029354e-01, 0.00000000e+00, 0.00000000e+00,
0.00000000e+00, 1.06660954e-01, 0.00000000e+00,
0.00000000e+00, 0.00000000e+00, 0.00000000e+00,
0.00000000e+00, 0.00000000e+00, 0.00000000e+00,
0.00000000e+00, 0.00000000e+00, 0.00000000e+00,
0.00000000e+00])
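Putting the two halves of the buffer together: the first nb_points * 8 bytes are unsigned 64-bit nanosecond timestamps stored as deltas (the first one absolute, then differences, hence the cumsum), and the remaining nb_points * 8 bytes are the float64 values; the whole thing is lz4-compressed. Below is a sketch of the inverse direction, building such a blob and decoding it with the same steps as unserialize; it assumes the modern lz4.block API (the code above uses the older lz4.loads) and is not Gnocchi's actual serialize() implementation:

import lz4.block
import numpy
import pandas

# Three points, as in the pdb session above.
index = pandas.to_datetime(['2020-12-25 08:03:34.551648',
                            '2020-12-25 08:08:34.615949',
                            '2020-12-25 08:13:34.586165'])
values = numpy.array([0.0, 0.0, 0.0])

# Delta-encode the nanosecond timestamps: first absolute, then diffs.
ns = index.asi8
deltas = numpy.insert(numpy.diff(ns), 0, ns[0]).astype('<Q')
blob = lz4.block.compress(deltas.tobytes() + values.astype('<d').tobytes())

# Decode again, mirroring BoundTimeSerie.unserialize.
raw = lz4.block.decompress(blob)
nb_points = len(raw) // 16          # _SERIALIZATION_TIMESTAMP_VALUE_LEN
timestamps = numpy.cumsum(numpy.frombuffer(raw[:nb_points * 8], dtype='<Q'))
decoded_values = numpy.frombuffer(raw[nb_points * 8:], dtype='<d')
print(numpy.array(timestamps, dtype='datetime64[ns]'), decoded_values)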
Analysis:
1. set_values merges the new measures from Redis into the bound timeserie; via the before_truncate_callback (_map_add_measures), _add_measures is called to aggregate the affected points and store the results in Ceph.
def set_values(self, values, before_truncate_callback=None,
               ignore_too_old_timestamps=False):
    # NOTE: values must be sorted when passed in.
    if self.block_size is not None and not self.ts.empty:
        first_block_timestamp = self.first_block_timestamp()
        if ignore_too_old_timestamps:
            for index, (timestamp, value) in enumerate(values):
                if timestamp >= first_block_timestamp:
                    values = values[index:]
                    break
            else:
                values = []
        else:
            # Check that the smallest timestamp does not go too much back
            # in time.
            smallest_timestamp = values[0][0]
            if smallest_timestamp < first_block_timestamp:
                raise NoDeloreanAvailable(first_block_timestamp,
                                          smallest_timestamp)
    super(BoundTimeSerie, self).set_values(values)
    if before_truncate_callback:
        before_truncate_callback(self)
    self._truncate()
(Pdb) values
[(Timestamp('2020-12-25 08:03:34.551648'), 0.0), (Timestamp('2020-12-25 08:08:34.615949'), 0.0), (Timestamp('2020-12-25 08:13:34.586165'), 0.0)]
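With back_window=0 and a one-day block_size, first_block_timestamp for this metric is 2020-12-25 00:00:00, so none of the three new values are dropped here. The ignore_too_old_timestamps branch simply strips leading points that are older than the first block; a tiny standalone illustration of that branch with made-up values:

import datetime

first_block_timestamp = datetime.datetime(2020, 12, 25, 0, 0, 0)
values = [
    (datetime.datetime(2020, 12, 24, 23, 55, 0), 1.0),   # older than the block: dropped
    (datetime.datetime(2020, 12, 25, 8, 3, 34), 0.0),
    (datetime.datetime(2020, 12, 25, 8, 8, 34), 0.0),
]

# Same logic as the ignore_too_old_timestamps branch of set_values.
for index, (timestamp, value) in enumerate(values):
    if timestamp >= first_block_timestamp:
        values = values[index:]
        break
else:
    values = []

print(values)  # only the two 2020-12-25 points remain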
def _add_measures(self, aggregation, archive_policy_def,
                  metric, grouped_serie,
                  previous_oldest_mutable_timestamp,
                  oldest_mutable_timestamp):
    ts = carbonara.AggregatedTimeSerie.from_grouped_serie(
        grouped_serie, archive_policy_def.granularity,
        aggregation, max_size=archive_policy_def.points)

    # Don't do anything if the timeserie is empty
    if not ts:
        return

    # We only need to check for rewrite if driver is not in WRITE_FULL mode
    # and if we already stored splits once
    need_rewrite = (
        not self.WRITE_FULL
        and previous_oldest_mutable_timestamp is not None
    )

    if archive_policy_def.timespan or need_rewrite:
        existing_keys = self._list_split_keys_for_metric(
            metric, aggregation, archive_policy_def.granularity)

    # First delete old splits
    if archive_policy_def.timespan:
        oldest_point_to_keep = ts.last - datetime.timedelta(
            seconds=archive_policy_def.timespan)
        oldest_key_to_keep = ts.get_split_key(oldest_point_to_keep)
        oldest_key_to_keep_s = str(oldest_key_to_keep)
        for key in list(existing_keys):
            # NOTE(jd) Only delete if the key is strictly inferior to
            # the timestamp; we don't delete any timeserie split that
            # contains our timestamp, so we prefer to keep a bit more
            # than deleting too much
            if key < oldest_key_to_keep_s:
                self._delete_metric_measures(
                    metric, key, aggregation,
                    archive_policy_def.granularity)
                existing_keys.remove(key)
    else:
        oldest_key_to_keep = carbonara.SplitKey(0, 0)

    # Rewrite all read-only splits just for fun (and compression). This
    # only happens if `previous_oldest_mutable_timestamp' exists, which
    # means we already wrote some splits at some point – so this is not the
    # first time we treat this timeserie.
    if need_rewrite:
        previous_oldest_mutable_key = str(ts.get_split_key(
            previous_oldest_mutable_timestamp))
        oldest_mutable_key = str(ts.get_split_key(
            oldest_mutable_timestamp))

        if previous_oldest_mutable_key != oldest_mutable_key:
            for key in existing_keys:
                if previous_oldest_mutable_key <= key < oldest_mutable_key:
                    LOG.debug(
                        "Compressing previous split %s (%s) for metric %s",
                        key, aggregation, metric)
                    # NOTE(jd) Rewrite it entirely for fun (and later for
                    # compression). For that, we just pass None as split.
                    self._store_timeserie_split(
                        metric, carbonara.SplitKey(
                            float(key), archive_policy_def.granularity),
                        None, aggregation, archive_policy_def,
                        oldest_mutable_timestamp)

    for key, split in ts.split():
        if key >= oldest_key_to_keep:
            LOG.debug(
                "Storing split %s (%s) for metric %s",
                key, aggregation, metric)
            self._store_timeserie_split(
                metric, key, split, aggregation, archive_policy_def,
                oldest_mutable_timestamp)
(Pdb) aggregation
u'max'
(Pdb) archive_policy_def
{'points': 100, 'granularity': 900.0, 'timespan': 90000.0}
(Pdb) metric
<Metric 11065ce7-1baf-425d-916a-4ff8e37423d7>
(Pdb) grouped_serie
<gnocchi.carbonara.GroupedTimeSeries object at 0x7fc3cc492490>
(Pdb) grouped_serie.__dict__
{'tstamps': array([ 1.60888320e+18]), 'counts': array([3]), '_ts': 2020-12-25 08:03:34.551648 0.0
2020-12-25 08:08:34.615949 0.0
2020-12-25 08:13:34.586165 0.0
dtype: float64, 'indexes': array([ 1.60888320e+18, 1.60888320e+18, 1.60888320e+18])}
(Pdb) previous_oldest_mutable_timestamp
Timestamp('2020-12-25 00:00:00')
(Pdb) oldest_mutable_timestamp
Timestamp('2020-12-25 00:00:00')
(Pdb) ts.__dict__
{'aggregation_method': u'max', 'ts': 2020-12-25 08:00:00 0.0
dtype: float64, 'max_size': 100, 'sampling': 900.0}
(Pdb) existing_keys
set([u'1607040000.0'])
(Pdb) oldest_point_to_keep
Timestamp('2020-12-24 07:00:00')
(Pdb) oldest_key_to_keep
<SplitKey: 1607040000.0 / 900.000000s>
(Pdb) oldest_key_to_keep_s
'1607040000.0'
(Pdb) previous_oldest_mutable_key
'1607040000.0'
(Pdb) oldest_mutable_key
'1607040000.0'
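Why the existing key '1607040000.0' survives the retention check: an aggregated timeserie is stored in fixed-size splits, and the split key is the timestamp at which a split starts. Assuming the usual carbonara constant of 3600 points per split (an assumption, not shown in this session), one split at 900 s granularity covers 3600 × 900 = 3,240,000 s. oldest_point_to_keep is ts.last (2020-12-25 08:00:00) minus the 90000 s timespan, i.e. 2020-12-24 07:00:00, and rounding that down to a split boundary gives 1607040000.0 — the very key that already exists. Since only keys strictly smaller than oldest_key_to_keep_s are deleted, the split is kept. The arithmetic:

import datetime

POINTS_PER_SPLIT = 3600                       # assumed carbonara constant
granularity = 900.0                           # from archive_policy_def above
split_span = POINTS_PER_SPLIT * granularity   # 3,240,000 seconds per split

epoch = datetime.datetime(1970, 1, 1)
oldest_point_to_keep = datetime.datetime(2020, 12, 24, 7, 0, 0)
seconds = (oldest_point_to_keep - epoch).total_seconds()

oldest_key_to_keep = (seconds // split_span) * split_span
print(oldest_key_to_keep)                     # -> 1607040000.0, as in the pdb output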