A walkthrough of the Gnocchi processing flow, with debug output of key fields

Introduction

processing is the core flow through which Gnocchi turns incoming measures into stored time series.

Entry point:

class MetricProcessor(MetricProcessBase):
    name = "processing"

    def __init__(self, worker_id, conf, queue):
        super(MetricProcessor, self).__init__(worker_id, conf, 0)
        self.queue = queue

    def _run_job(self):
        try:
            try:
                metrics = self.queue.get(block=True, timeout=10)
            except six.moves.queue.Empty:
                # NOTE(sileht): Allow the process to exit gracefully every
                # 10 seconds
                return
            self.store.process_background_tasks(self.index, metrics)
        except Exception:
            LOG.error("Unexpected error during measures processing",
                      exc_info=True)

Analysis:
1. metrics is a batch pulled from the multiprocessing queue; it is a list of metric ids.

(Pdb) metrics
[u'a08ca3ac-d834-4bf6-b89d-1f8576976287', u'd4bd1d46-8df2-4436-8b39-fba6e732b2b5', u'11065ce7-1baf-425d-916a-4ff8e37423d7', u'9c8310fe-3110-49fe-85c5-49d91c283dcc']
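
To make the handoff concrete, here is a minimal sketch of both sides of the queue; the producer side is illustrative (the real scheduling lives in gnocchi.cli), only the consumer side mirrors _run_job above:

import multiprocessing

import six.moves.queue

q = multiprocessing.Queue()

# Producer (scheduler side): enqueue a batch of metric ids.
q.put([u'a08ca3ac-d834-4bf6-b89d-1f8576976287',
       u'd4bd1d46-8df2-4436-8b39-fba6e732b2b5'])

# Consumer (worker side): block for at most 10 seconds, so the process
# can still exit gracefully when the queue stays empty.
try:
    metrics = q.get(block=True, timeout=10)
except six.moves.queue.Empty:
    metrics = []
print(metrics)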

gnocchi.storage.StorageDriver.process_background_tasks

    def process_background_tasks(self, index, metrics, sync=False):
        """Process background tasks for this storage.

        This calls :func:`process_new_measures` to process new measures

        :param index: An indexer to be used for querying metrics
        :param metrics: list of metric ids to process
        :param sync: If True, then process everything synchronously and raise
                     on error
        :type sync: bool
        """
        LOG.debug("Processing new measures")
        try:
            self.process_new_measures(index, metrics, sync)
        except Exception:
            if sync:
                raise
            LOG.error("Unexpected error during measures processing",
                      exc_info=True)

gnocchi.storage._carbonara.CarbonaraBasedStorage.process_new_measures

    def process_new_measures(self, indexer, metrics_to_process,
                             sync=False):
        metrics = indexer.list_metrics(ids=metrics_to_process)
        # This builds the list of deleted metrics, i.e. the metrics we have
        # measures to process for but that are not in the indexer anymore.
        deleted_metrics_id = (set(map(uuid.UUID, metrics_to_process))
                              - set(m.id for m in metrics))
        for metric_id in deleted_metrics_id:
            # NOTE(jd): We need to lock the metric otherwise we might delete
            # measures that another worker might be processing. Deleting
            # measurement files under its feet is not nice!
            try:
                with self._lock(metric_id)(blocking=sync):
                    self.incoming.delete_unprocessed_measures_for_metric_id(
                        metric_id)
            except coordination.LockAcquireFailed:
                LOG.debug("Cannot acquire lock for metric %s, postponing "
                          "unprocessed measures deletion", metric_id)

        for metric in metrics:
            lock = self._lock(metric.id)
            # Do not block if we cannot acquire the lock, that means some other
            # worker is doing the job. We'll just ignore this metric and may
            # get back later to it if needed.
            if not lock.acquire(blocking=sync):
                continue
            try:
                locksw = timeutils.StopWatch().start()
                LOG.debug("Processing measures for %s", metric)
                with self.incoming.process_measure_for_metric(metric) \
                        as measures:
                    self._compute_and_store_timeseries(metric, measures)
                LOG.debug("Metric %s locked during %.2f seconds",
                          metric.id, locksw.elapsed())
            except Exception:
                LOG.debug("Metric %s locked during %.2f seconds",
                          metric.id, locksw.elapsed())
                if sync:
                    raise
                LOG.error("Error processing new measures", exc_info=True)
            finally:
                lock.release()

Analysis:
1. Look up the Metric objects by id; for ids that are no longer in the indexer (deleted metrics), drop their unprocessed measures from Redis.
2. self.incoming.process_measure_for_metric(metric) fetches the pending measures for a metric from Redis: a list of (timestamp, value) tuples such as [(Timestamp('2020-12-25 08:03:34.551648'), 0.0)].

(Pdb) metrics
[<Metric 11065ce7-1baf-425d-916a-4ff8e37423d7>, <Metric 9c8310fe-3110-49fe-85c5-49d91c283dcc>, <Metric a08ca3ac-d834-4bf6-b89d-1f8576976287>, <Metric d4bd1d46-8df2-4436-8b39-fba6e732b2b5>]
(Pdb) metrics[0].__dict__
{'status': u'active', '_sa_instance_state': <sqlalchemy.orm.state.InstanceState object at 0x7fc3cc482a50>, 'name': u'disk.read.requests.rate', 'creator': u'4c90cbb7dfaa47798790c3cd411e4575:7e88c2ceced8412dab3d80ed131cf53a', 'resource_id': UUID('be25a992-e971-488c-a4c9-64bd6f6bb753'), 'archive_policy': <gnocchi.indexer.sqlalchemy_base.ArchivePolicy object at 0x7fc3cc482bd0>, 'archive_policy_name': u'frequency_300s', 'id': UUID('11065ce7-1baf-425d-916a-4ff8e37423d7'), 'unit': u'request/s'}
(Pdb) measures
[(Timestamp('2020-12-25 08:03:34.551648'), 0.0), (Timestamp('2020-12-25 08:08:34.615949'), 0.0), (Timestamp('2020-12-25 08:13:34.586165'), 0.0)]
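
The deleted-metric detection at the top of process_new_measures is a plain set difference on UUIDs; a worked example with ids from this session, pretending the indexer only knows the first one:

import uuid

metrics_to_process = [u'11065ce7-1baf-425d-916a-4ff8e37423d7',
                      u'9c8310fe-3110-49fe-85c5-49d91c283dcc']
# Suppose indexer.list_metrics only returned the first metric:
indexed_ids = set([uuid.UUID(u'11065ce7-1baf-425d-916a-4ff8e37423d7')])

deleted_metrics_id = set(map(uuid.UUID, metrics_to_process)) - indexed_ids
print(deleted_metrics_id)
# set([UUID('9c8310fe-3110-49fe-85c5-49d91c283dcc')])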

gnocchi.storage._carbonara.CarbonaraBasedStorage._compute_and_store_timeseries

    def _compute_and_store_timeseries(self, metric, measures):
        # NOTE(mnaser): The metric could have been handled by
        #               another worker, ignore if no measures.
        if len(measures) == 0:
            LOG.debug("Skipping %s (already processed)", metric)
            return

        measures = sorted(measures, key=operator.itemgetter(0))

        agg_methods = list(metric.archive_policy.aggregation_methods)
        block_size = metric.archive_policy.max_block_size
        back_window = metric.archive_policy.back_window
        definition = metric.archive_policy.definition

        try:
            ts = self._get_unaggregated_timeserie_and_unserialize(
                metric, block_size=block_size, back_window=back_window)
        except storage.MetricDoesNotExist:
            try:
                self._create_metric(metric)
            except storage.MetricAlreadyExists:
                # Created in the mean time, do not worry
                pass
            ts = None
        except CorruptionError as e:
            LOG.error(e)
            ts = None

        if ts is None:
            # This is the first time we treat measures for this
            # metric, or data are corrupted, create a new one
            ts = carbonara.BoundTimeSerie(block_size=block_size,
                                          back_window=back_window)
            current_first_block_timestamp = None
        else:
            current_first_block_timestamp = ts.first_block_timestamp()

        # NOTE(jd) This is Python where you need such
        # hack to pass a variable around a closure,
        # sorry.
        computed_points = {"number": 0}

        def _map_add_measures(bound_timeserie):
            # NOTE (gordc): bound_timeserie is entire set of
            # unaggregated measures matching largest
            # granularity. the following takes only the points
            # affected by new measures for specific granularity
            tstamp = max(bound_timeserie.first, measures[0][0])
            new_first_block_timestamp = bound_timeserie.first_block_timestamp()
            computed_points['number'] = len(bound_timeserie)
            for d in definition:
                ts = bound_timeserie.group_serie(
                    d.granularity, carbonara.round_timestamp(
                        tstamp, d.granularity * 10e8))

                self._map_in_thread(
                    self._add_measures,
                    ((aggregation, d, metric, ts,
                        current_first_block_timestamp,
                        new_first_block_timestamp)
                        for aggregation in agg_methods))

        with timeutils.StopWatch() as sw:
            ts.set_values(measures,
                          before_truncate_callback=_map_add_measures,
                          ignore_too_old_timestamps=True)

            elapsed = sw.elapsed()
            number_of_operations = (len(agg_methods) * len(definition))
            perf = ""
            if elapsed > 0:
                perf = " (%d points/s, %d measures/s)" % (
                    ((number_of_operations * computed_points['number']) /
                        elapsed),
                    ((number_of_operations * len(measures)) / elapsed)
                )
            LOG.debug("Computed new metric %s with %d new measures "
                      "in %.2f seconds%s",
                      metric.id, len(measures), elapsed, perf)

        self._store_unaggregated_timeserie(metric, ts.serialize())

Analysis:
1. Gnocchi organizes data in three layers: resource -> metric -> measure. A resource owns many metrics, and each metric owns the actual sampled data points.
2. ts holds what was previously stored in Ceph: the (timestamp, value) pairs together with the sampling points, granularity and timespan information. measures holds the new data that still has to be archived.

(Pdb) metric.__dict__
{'status': u'active', '_sa_instance_state': <sqlalchemy.orm.state.InstanceState object at 0x7fc3cc482a50>, 'name': u'disk.read.requests.rate', 'creator': u'4c90cbb7dfaa47798790c3cd411e4575:7e88c2ceced8412dab3d80ed131cf53a', 'resource_id': UUID('be25a992-e971-488c-a4c9-64bd6f6bb753'), 'archive_policy': <gnocchi.indexer.sqlalchemy_base.ArchivePolicy object at 0x7fc3cc482bd0>, 'archive_policy_name': u'frequency_300s', 'id': UUID('11065ce7-1baf-425d-916a-4ff8e37423d7'), 'unit': u'request/s'}
(Pdb) measures
[(Timestamp('2020-12-25 08:03:34.551648'), 0.0), (Timestamp('2020-12-25 08:08:34.615949'), 0.0), (Timestamp('2020-12-25 08:13:34.586165'), 0.0)]
(Pdb) agg_methods
[u'max', u'mean', u'min']
(Pdb) block_size
86400.0
(Pdb) back_window
0
(Pdb) definition
[{'points': 300, 'granularity': 300.0, 'timespan': 90000.0}, {'points': 100, 'granularity': 900.0, 'timespan': 90000.0}, {'points': 100, 'granularity': 7200.0, 'timespan': 720000.0}, {'points': 200, 'granularity': 86400.0, 'timespan': 17280000.0}]
(Pdb) ts.__dict__
{'back_window': 0, 'block_size': <86400000000000 * Nanos>, 'ts': 2020-12-25 03:59:50.191696     9.739510
2020-12-25 04:04:50.157824     0.120014
2020-12-25 04:09:50.118074     0.000000
2020-12-25 04:14:50.238515     0.000000
2020-12-25 04:19:50.091682     0.000000
2020-12-25 04:24:50.074567     0.000000
2020-12-25 04:29:50.144667     0.046656
2020-12-25 04:34:50.078793     0.000000
2020-12-25 04:39:50.125287     0.000000
2020-12-25 04:44:50.285588     0.000000
2020-12-25 04:49:50.180779     0.003334
2020-12-25 04:54:50.130977     0.000000
2020-12-25 04:59:50.134970     0.000000
2020-12-25 05:04:50.273585     0.000000
2020-12-25 05:09:50.102544     0.003335
2020-12-25 05:14:50.192206     0.000000
2020-12-25 05:19:50.158147     0.000000
2020-12-25 05:24:50.157283     0.000000
2020-12-25 05:29:50.099026     0.000000
2020-12-25 05:34:50.141446     0.000000
2020-12-25 05:39:50.280643     0.000000
2020-12-25 05:43:34.498113     0.000000
2020-12-25 05:48:34.481620     0.000000
2020-12-25 05:53:34.586508     0.000000
2020-12-25 05:58:34.522015     0.000000
2020-12-25 06:03:34.555708     0.000000
2020-12-25 06:08:34.530322     0.000000
2020-12-25 06:13:34.681981     0.000000
2020-12-25 06:18:34.551089     0.000000
2020-12-25 06:23:34.500688     0.000000
2020-12-25 06:28:34.542119     0.000000
2020-12-25 06:33:34.508253     0.000000
2020-12-25 06:39:50.190674    18.369239
2020-12-25 06:44:50.328215     0.663029
2020-12-25 06:48:34.701798     0.000000
2020-12-25 06:53:34.569040     0.000000
2020-12-25 06:58:34.538735     0.000000
2020-12-25 07:03:34.554803     0.106661
2020-12-25 07:08:34.622109     0.000000
2020-12-25 07:13:34.498547     0.000000
2020-12-25 07:18:34.508618     0.000000
2020-12-25 07:23:34.799742     0.000000
2020-12-25 07:28:34.562884     0.000000
2020-12-25 07:33:34.649724     0.000000
2020-12-25 07:38:34.593196     0.000000
2020-12-25 07:43:34.584885     0.000000
2020-12-25 07:48:34.524464     0.000000
2020-12-25 07:53:34.682062     0.000000
2020-12-25 07:58:34.659174     0.000000
dtype: float64}
(Pdb) current_first_block_timestamp
Timestamp('2020-12-25 00:00:00')
Variables inside _map_add_measures:
(Pdb) tstamp
Timestamp('2020-12-25 08:03:34.551648')
(Pdb) new_first_block_timestamp
Timestamp('2020-12-25 00:00:00')
(Pdb) definition
[{'points': 300, 'granularity': 300.0, 'timespan': 90000.0}, {'points': 100, 'granularity': 900.0, 'timespan': 90000.0}, {'points': 100, 'granularity': 7200.0, 'timespan': 720000.0}, {'points': 200, 'granularity': 86400.0, 'timespan': 17280000.0}]
(Pdb) ts
<gnocchi.carbonara.GroupedTimeSeries object at 0x7fc3cc482150>
(Pdb) ts.__dict__
{'tstamps': array([  1.60888320e+18,   1.60888350e+18,   1.60888380e+18]), 'counts': array([1, 1, 1]), '_ts': 2020-12-25 08:03:34.551648    0.0
2020-12-25 08:08:34.615949    0.0
2020-12-25 08:13:34.586165    0.0
dtype: float64, 'indexes': array([  1.60888320e+18,   1.60888350e+18,   1.60888380e+18])}

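Note how the group keys come about: in _map_add_measures, d.granularity * 10e8 is just the granularity converted to nanoseconds (10e8 == 1e9), and carbonara.round_timestamp floors the timestamp to a multiple of that frequency. Reproducing the 300s case by hand (a sketch of the rounding, assuming it floors on the raw nanosecond value):

import pandas

tstamp = pandas.Timestamp('2020-12-25 08:03:34.551648')
freq = int(300.0 * 10e8)  # 300s granularity in nanoseconds

print(pandas.Timestamp(tstamp.value // freq * freq))
# 2020-12-25 08:00:00 -- the 1.60888320e+18 ns group key in ts.tstamps above
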
gnocchi.storage._carbonara.CarbonaraBasedStorage._get_unaggregated_timeserie_and_unserialize
Analysis:

  1. raw_measures is the blob read back from Ceph, compressed with lz4; one part of it stores timestamps, the other part stores values.
  2. carbonara.BoundTimeSerie.unserialize decompresses this data and deserializes it back into a BoundTimeSerie.

    def _get_unaggregated_timeserie_and_unserialize(
            self, metric, block_size, back_window):
        """Retrieve unaggregated timeserie for a metric and unserialize it.

        Returns a gnocchi.carbonara.BoundTimeSerie object. If the data cannot
        be retrieved, returns None.

        """
        with timeutils.StopWatch() as sw:
            raw_measures = (
                self._get_unaggregated_timeserie(
                    metric)
            )
            LOG.debug(
                "Retrieve unaggregated measures "
                "for %s in %.2fs",
                metric.id, sw.elapsed())
        try:
            return carbonara.BoundTimeSerie.unserialize(
                raw_measures, block_size, back_window)
        except ValueError:
            raise CorruptionError(
                "Data corruption detected for %s "
                "unaggregated timeserie" % metric.id)
(Pdb) raw_measures
'\x10\x03\x00\x00\xf1\x04\x80\xa8\x96\xf9\xa3\xd9S\x16\x80\xdf_\xd7E\x00\x00\x00\x90.\x06\x08\x00\xf1=\xa8\x80\x92\xe0E\x00\x00\x00\x989\xa4\xd0E\x00\x00\x00\x88\x90_\xd8E\x00\x00\x00 \\\x92\xddE\x00\x00\x00\xb0\x8fw\xd5E\x00\x00\x000)*\xdcE\x00\x00\x00\xc8\xb7\xf2\xe2E\x00\x00\x00\xd8u%\xd3E\x00\x00\x00\xf0\xccl\xd6E\x00\x00\x00\xa8\xa5\xa1\xd9\x18\x00\xb1\xd0\xa7\xe1E\x00\x00\x00\x18\xd72\xcf8\x00\xa1\xda\xbc\xdeE\x00\x00\x00\x08\x05]p\x00A\x00\x89W\xd9p\x00!\xc9\xebX\x00\xb1 \xff\xeb\xdbE\x00\x00\x00H\xb2\xb08\x00\xb10\x14g44\x00\x00\x008\x0ei\x88\x00\xb1\xc0.\xa5\xdfE\x00\x00\x008\xa2\x8c0\x001H\xd5f0\x001\xf0[\xe1P\x001\xf8\xd9n\x90\x00A w\x97\xd1x\x001\xa9c\xd6\x88\x00"\xe7\xdc(\x00\x12\xf6\xf0\x00\xb1\x08\xd1jxW\x00\x00\x00\x88m\x97h\x00\xb2\x98,\xb5=4\x00\x00\x00\x10\xfez8\x00!M\x96P\x00\xb1\xa0\xe5Y\xdaE\x00\x00\x00\x10\xbag\x00\x01A\xf0O\x07\xd2P\x00!c\xfe\xb8\x00\xf1\x04 \xeb\xbe\xeaE\x00\x00\x00p\x8dF\xcbE\x00\x00\x00\xc0\xca\x91\xe0\x001\x80+\x06x\x001(\xe7\xe5\xb8\x001x\xc4\xca\xb0\x0010y\xc9\xa0\x001\xc0y\x07\x18\x00\xff\x02\xcdfX\x1c\xa1z#@\xf0m\xbeA5\xb9\xbe?\x00\x01\x00\x0c\x7f\x07\xfc\xb5\xafC\xe3\xa7(\x00\x06\x7fu\xe2\x9c"\xf3Pk \x00\x06o\xf51\x88\x96~R \x00\x07\x0f\x02\x00]\xff\x00#\xe2qj\x86^2@\xbbo\x1bV\x897\xe5\xb8\x00\x06\x7f\xabL\xc9\xdc!N\xbb \x00\x06\x0f\x02\x00(P\x00\x00\x00\x00\x00'
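
The structure of this blob becomes clear from unserialize below: after lz4 decompression, the first 8 bytes per point are delta-encoded uint64 nanosecond timestamps and the remaining 8 bytes per point are float64 values. A minimal sketch of the forward direction (roughly what BoundTimeSerie.serialize produces; simplified, not the exact gnocchi code):

import lz4  # the old flat lz4 API, matching the lz4.loads call below

import numpy
import pandas

timestamps = pandas.to_datetime(['2020-12-25 08:03:34.551648',
                                 '2020-12-25 08:08:34.615949',
                                 '2020-12-25 08:13:34.586165'])
values = numpy.array([0.0, 0.0, 0.0])

ns = timestamps.asi8                             # absolute ns since epoch
deltas = numpy.insert(numpy.diff(ns), 0, ns[0])  # first absolute, then deltas
raw = deltas.astype('<Q').tobytes() + values.astype('<d').tobytes()
blob = lz4.dumps(raw)                            # what lands in the Ceph object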

gnocchi.storage.ceph.CephStorage._get_unaggregated_timeserie
Analysis:

  1. _build_unaggregated_timeserie_path assembles the object name, in the format
     gnocchi_<metric.id>_none_v3
  2. That name is then passed to ioctx.read to fetch the compressed data from Ceph.

    def _get_unaggregated_timeserie(self, metric, version=3):
        try:
            return self._get_object_content(
                self._build_unaggregated_timeserie_path(metric, version))
        except rados.ObjectNotFound:
            raise storage.MetricDoesNotExist(metric)
    @staticmethod
    def _build_unaggregated_timeserie_path(metric, version):
        return (('gnocchi_%s_none' % metric.id)
                + ("_v%s" % version if version else ""))

gnocchi.storage.ceph.CephStorage._get_object_content

    def _get_object_content(self, name):
        offset = 0
        content = b''
        while True:
            data = self.ioctx.read(name, offset=offset)
            if not data:
                break
            content += data
            offset += len(data)
        return content

gnocchi.carbonara.BoundTimeSerie.unserialize
解析:
1.lz4解壓處理鉴逞,篩選出時(shí)間戳和value值
2.可以看出時(shí)間戳是之前所有的時(shí)間戳

    @classmethod
    def unserialize(cls, data, block_size, back_window):
        uncompressed = lz4.loads(data)
        nb_points = (
            len(uncompressed) // cls._SERIALIZATION_TIMESTAMP_VALUE_LEN
        )
        timestamps_raw = uncompressed[
            :nb_points*cls._SERIALIZATION_TIMESTAMP_LEN]
        timestamps = numpy.frombuffer(timestamps_raw, dtype='<Q')
        timestamps = numpy.cumsum(timestamps)
        timestamps = numpy.array(timestamps, dtype='datetime64[ns]')

        values_raw = uncompressed[nb_points*cls._SERIALIZATION_TIMESTAMP_LEN:]
        values = numpy.frombuffer(values_raw, dtype='<d')

        return cls.from_data(
            pandas.to_datetime(timestamps),
            values,
            block_size=block_size,
            back_window=back_window)
(Pdb) uncompressed
'\x80\xa8\x96\xf9\xa3\xd9S\x16\x80\xdf_\xd7E\x00\x00\x00\x90.\x06\xd7E\x00\x00\x00\xa8\x80\x92\xe0E\x00\x00\x00\x989\xa4\xd0E\x00\x00\x00\x88\x90_\xd8E\x00\x00\x00 \\\x92\xddE\x00\x00\x00\xb0\x8fw\xd5E\x00\x00\x000)*\xdcE\x00\x00\x00\xc8\xb7\xf2\xe2E\x00\x00\x00\xd8u%\xd3E\x00\x00\x00\xf0\xccl\xd6E\x00\x00\x00\xa8\xa5\xa1\xd9E\x00\x00\x00\xd8\xd0\xa7\xe1E\x00\x00\x00\x18\xd72\xcfE\x00\x00\x000\xda\xbc\xdeE\x00\x00\x00\x08\x05]\xd7E\x00\x00\x00\x00\x89W\xd9E\x00\x00\x00\x98\xc9\xeb\xd5E\x00\x00\x00 \xff\xeb\xdbE\x00\x00\x00H\xb2\xb0\xe1E\x00\x00\x000\x14g44\x00\x00\x008\x0ei\xd8E\x00\x00\x00\xc0.\xa5\xdfE\x00\x00\x008\xa2\x8c\xd5E\x00\x00\x00H\xd5f\xdbE\x00\x00\x00\xf0[\xe1\xd7E\x00\x00\x00\xf8\xd9n\xe2E\x00\x00\x00 w\x97\xd1E\x00\x00\x00\x18\xa9c\xd6E\x00\x00\x00\xd8\xe7\xdc\xdbE\x00\x00\x00\xf0\xf6_\xd7E\x00\x00\x00\x08\xd1jxW\x00\x00\x00\x88m\x97\xe1E\x00\x00\x00\x98,\xb5=4\x00\x00\x00\x10\xfez\xd1E\x00\x00\x00\x18M\x96\xd7E\x00\x00\x00\xa0\xe5Y\xdaE\x00\x00\x00\x10\xbag\xddE\x00\x00\x00\xf0O\x07\xd2E\x00\x00\x00\xd8c\xfe\xd9E\x00\x00\x00 \xeb\xbe\xeaE\x00\x00\x00p\x8dF\xcbE\x00\x00\x00\xc0\xca\x91\xdeE\x00\x00\x00\x80+\x06\xd6E\x00\x00\x00(\xe7\xe5\xd8E\x00\x00\x00x\xc4\xca\xd5E\x00\x00\x000y\xc9\xe2E\x00\x00\x00\xc0y\x07\xd8E\x00\x00\x00\xcdfX\x1c\xa1z#@\xf0m\xbeA5\xb9\xbe?\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x07\xfc\xb5\xafC\xe3\xa7?\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00u\xe2\x9c"\xf3Pk?\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\xf51\x88\x96~Rk?\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00#\xe2qj\x86^2@\xbbo\x1bV\x897\xe5?\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\xabL\xc9\xdc!N\xbb?\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00'
(Pdb) cls._SERIALIZATION_TIMESTAMP_VALUE_LEN
16
(Pdb) timestamps
array(['2020-12-25T03:59:50.191696000', '2020-12-25T04:04:50.157824000',
       '2020-12-25T04:09:50.118074000', '2020-12-25T04:14:50.238515000',
       '2020-12-25T04:19:50.091682000', '2020-12-25T04:24:50.074567000',
       '2020-12-25T04:29:50.144667000', '2020-12-25T04:34:50.078793000',
       '2020-12-25T04:39:50.125287000', '2020-12-25T04:44:50.285588000',
       '2020-12-25T04:49:50.180779000', '2020-12-25T04:54:50.130977000',
       '2020-12-25T04:59:50.134970000', '2020-12-25T05:04:50.273585000',
       '2020-12-25T05:09:50.102544000', '2020-12-25T05:14:50.192206000',
       '2020-12-25T05:19:50.158147000', '2020-12-25T05:24:50.157283000',
       '2020-12-25T05:29:50.099026000', '2020-12-25T05:34:50.141446000',
       '2020-12-25T05:39:50.280643000', '2020-12-25T05:43:34.498113000',
       '2020-12-25T05:48:34.481620000', '2020-12-25T05:53:34.586508000',
       '2020-12-25T05:58:34.522015000', '2020-12-25T06:03:34.555708000',
       '2020-12-25T06:08:34.530322000', '2020-12-25T06:13:34.681981000',
       '2020-12-25T06:18:34.551089000', '2020-12-25T06:23:34.500688000',
       '2020-12-25T06:28:34.542119000', '2020-12-25T06:33:34.508253000',
       '2020-12-25T06:39:50.190674000', '2020-12-25T06:44:50.328215000',
       '2020-12-25T06:48:34.701798000', '2020-12-25T06:53:34.569040000',
       '2020-12-25T06:58:34.538735000', '2020-12-25T07:03:34.554803000',
       '2020-12-25T07:08:34.622109000', '2020-12-25T07:13:34.498547000',
       '2020-12-25T07:18:34.508618000', '2020-12-25T07:23:34.799742000',
       '2020-12-25T07:28:34.562884000', '2020-12-25T07:33:34.649724000',
       '2020-12-25T07:38:34.593196000', '2020-12-25T07:43:34.584885000',
       '2020-12-25T07:48:34.524464000', '2020-12-25T07:53:34.682062000',
       '2020-12-25T07:58:34.659174000'], dtype='datetime64[ns]')
(Pdb) values_raw
'\xcdfX\x1c\xa1z#@\xf0m\xbeA5\xb9\xbe?\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x07\xfc\xb5\xafC\xe3\xa7?\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00u\xe2\x9c"\xf3Pk?\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\xf51\x88\x96~Rk?\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00#\xe2qj\x86^2@\xbbo\x1bV\x897\xe5?\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\xabL\xc9\xdc!N\xbb?\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00'
(Pdb) values
array([  9.73951043e+00,   1.20013550e-01,   0.00000000e+00,
         0.00000000e+00,   0.00000000e+00,   0.00000000e+00,
         4.66557648e-02,   0.00000000e+00,   0.00000000e+00,
         0.00000000e+00,   3.33449828e-03,   0.00000000e+00,
         0.00000000e+00,   0.00000000e+00,   3.33523487e-03,
         0.00000000e+00,   0.00000000e+00,   0.00000000e+00,
         0.00000000e+00,   0.00000000e+00,   0.00000000e+00,
         0.00000000e+00,   0.00000000e+00,   0.00000000e+00,
         0.00000000e+00,   0.00000000e+00,   0.00000000e+00,
         0.00000000e+00,   0.00000000e+00,   0.00000000e+00,
         0.00000000e+00,   0.00000000e+00,   1.83692385e+01,
         6.63029354e-01,   0.00000000e+00,   0.00000000e+00,
         0.00000000e+00,   1.06660954e-01,   0.00000000e+00,
         0.00000000e+00,   0.00000000e+00,   0.00000000e+00,
         0.00000000e+00,   0.00000000e+00,   0.00000000e+00,
         0.00000000e+00,   0.00000000e+00,   0.00000000e+00,
         0.00000000e+00])
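
The cumsum step deserves spelling out: the stored uint64s are deltas between consecutive points (only the first is absolute), so a running sum restores the absolute nanosecond timestamps. Reconstructing the first three points of this session by hand:

import numpy

# First entry absolute (2020-12-25 03:59:50.191696 UTC in ns), the rest deltas.
raw = numpy.array([1608868790191696000, 299966128000, 299960250000],
                  dtype='<Q')
print(numpy.cumsum(raw).astype('datetime64[ns]'))
# ['2020-12-25T03:59:50.191696000' '2020-12-25T04:04:50.157824000'
#  '2020-12-25T04:09:50.118074000']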

gnocchi.carbonara.BoundTimeSerie.set_values

Analysis:
1. set_values merges the measures taken from Redis into the unaggregated timeserie; through the before_truncate_callback (_map_add_measures above) it drives _add_measures, which writes the aggregated splits to Ceph.

    def set_values(self, values, before_truncate_callback=None,
                   ignore_too_old_timestamps=False):
        # NOTE: values must be sorted when passed in.
        if self.block_size is not None and not self.ts.empty:
            first_block_timestamp = self.first_block_timestamp()
            if ignore_too_old_timestamps:
                for index, (timestamp, value) in enumerate(values):
                    if timestamp >= first_block_timestamp:
                        values = values[index:]
                        break
                else:
                    values = []
            else:
                # Check that the smallest timestamp does not go too much back
                # in time.
                smallest_timestamp = values[0][0]
                if smallest_timestamp < first_block_timestamp:
                    raise NoDeloreanAvailable(first_block_timestamp,
                                              smallest_timestamp)
        super(BoundTimeSerie, self).set_values(values)
        if before_truncate_callback:
            before_truncate_callback(self)
        self._truncate()
(Pdb) values
[(Timestamp('2020-12-25 08:03:34.551648'), 0.0), (Timestamp('2020-12-25 08:08:34.615949'), 0.0), (Timestamp('2020-12-25 08:13:34.586165'), 0.0)]
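
With back_window=0 and a block_size of 86400s, first_block_timestamp() here is 2020-12-25 00:00:00, so ignore_too_old_timestamps silently drops anything falling before that block. A minimal sketch of the filtering loop (mirroring the code above; values must already be sorted):

import pandas

first_block_timestamp = pandas.Timestamp('2020-12-25 00:00:00')
values = [(pandas.Timestamp('2020-12-24 23:59:00'), 1.0),  # too old, dropped
          (pandas.Timestamp('2020-12-25 08:03:34.551648'), 0.0)]

for index, (timestamp, value) in enumerate(values):
    if timestamp >= first_block_timestamp:
        values = values[index:]
        break
else:
    values = []
print(values)  # only the 2020-12-25 08:03:34 point survives
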
gnocchi.storage._carbonara.CarbonaraBasedStorage._add_measures

    def _add_measures(self, aggregation, archive_policy_def,
                      metric, grouped_serie,
                      previous_oldest_mutable_timestamp,
                      oldest_mutable_timestamp):
        ts = carbonara.AggregatedTimeSerie.from_grouped_serie(
            grouped_serie, archive_policy_def.granularity,
            aggregation, max_size=archive_policy_def.points)

        # Don't do anything if the timeserie is empty
        if not ts:
            return

        # We only need to check for rewrite if driver is not in WRITE_FULL mode
        # and if we already stored splits once
        need_rewrite = (
            not self.WRITE_FULL
            and previous_oldest_mutable_timestamp is not None
        )

        if archive_policy_def.timespan or need_rewrite:
            existing_keys = self._list_split_keys_for_metric(
                metric, aggregation, archive_policy_def.granularity)

        # First delete old splits
        if archive_policy_def.timespan:
            oldest_point_to_keep = ts.last - datetime.timedelta(
                seconds=archive_policy_def.timespan)
            oldest_key_to_keep = ts.get_split_key(oldest_point_to_keep)
            oldest_key_to_keep_s = str(oldest_key_to_keep)
            for key in list(existing_keys):
                # NOTE(jd) Only delete if the key is strictly inferior to
                # the timestamp; we don't delete any timeserie split that
                # contains our timestamp, so we prefer to keep a bit more
                # than deleting too much
                if key < oldest_key_to_keep_s:
                    self._delete_metric_measures(
                        metric, key, aggregation,
                        archive_policy_def.granularity)
                    existing_keys.remove(key)
        else:
            oldest_key_to_keep = carbonara.SplitKey(0, 0)

        # Rewrite all read-only splits just for fun (and compression). This
        # only happens if `previous_oldest_mutable_timestamp' exists, which
        # means we already wrote some splits at some point – so this is not the
        # first time we treat this timeserie.
        if need_rewrite:
            previous_oldest_mutable_key = str(ts.get_split_key(
                previous_oldest_mutable_timestamp))
            oldest_mutable_key = str(ts.get_split_key(
                oldest_mutable_timestamp))

            if previous_oldest_mutable_key != oldest_mutable_key:
                for key in existing_keys:
                    if previous_oldest_mutable_key <= key < oldest_mutable_key:
                        LOG.debug(
                            "Compressing previous split %s (%s) for metric %s",
                            key, aggregation, metric)
                        # NOTE(jd) Rewrite it entirely for fun (and later for
                        # compression). For that, we just pass None as split.
                        self._store_timeserie_split(
                            metric, carbonara.SplitKey(
                                float(key), archive_policy_def.granularity),
                            None, aggregation, archive_policy_def,
                            oldest_mutable_timestamp)

        for key, split in ts.split():
            if key >= oldest_key_to_keep:
                LOG.debug(
                    "Storing split %s (%s) for metric %s",
                    key, aggregation, metric)
                self._store_timeserie_split(
                    metric, key, split, aggregation, archive_policy_def,
                    oldest_mutable_timestamp)
(Pdb) aggregation
u'max'
(Pdb) archive_policy_def
{'points': 100, 'granularity': 900.0, 'timespan': 90000.0}
(Pdb) metric
<Metric 11065ce7-1baf-425d-916a-4ff8e37423d7>
(Pdb) grouped_serie
<gnocchi.carbonara.GroupedTimeSeries object at 0x7fc3cc492490>
(Pdb) grouped_serie.__dict__
{'tstamps': array([  1.60888320e+18]), 'counts': array([3]), '_ts': 2020-12-25 08:03:34.551648    0.0
2020-12-25 08:08:34.615949    0.0
2020-12-25 08:13:34.586165    0.0
dtype: float64, 'indexes': array([  1.60888320e+18,   1.60888320e+18,   1.60888320e+18])}
(Pdb) previous_oldest_mutable_timestamp
Timestamp('2020-12-25 00:00:00')
(Pdb) oldest_mutable_timestamp
Timestamp('2020-12-25 00:00:00')
(Pdb) ts.__dict__
{'aggregation_method': u'max', 'ts': 2020-12-25 08:00:00    0.0
dtype: float64, 'max_size': 100, 'sampling': 900.0}
(Pdb) existing_keys
set([u'1607040000.0'])
(Pdb) oldest_point_to_keep
Timestamp('2020-12-24 07:00:00')
(Pdb) oldest_key_to_keep
<SplitKey: 1607040000.0 / 900.000000s>
(Pdb) oldest_key_to_keep_s
'1607040000.0'
(Pdb) previous_oldest_mutable_key
'1607040000.0'
(Pdb) oldest_mutable_key
'1607040000.0'
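
The split-key values in this dump can be reproduced by hand. Assuming carbonara packs SplitKey.POINTS_PER_SPLIT = 3600 points into each split object (the constant in this gnocchi generation), a 900s granularity gives 900 * 3600 = 3,240,000 seconds per split, and a key is simply the timestamp floored to that span:

import calendar
import datetime

POINTS_PER_SPLIT = 3600              # assumed carbonara constant
sampling = 900.0                     # granularity of this definition
span = sampling * POINTS_PER_SPLIT   # 3,240,000 seconds per split object

oldest_point_to_keep = datetime.datetime(2020, 12, 24, 7, 0, 0)
epoch = calendar.timegm(oldest_point_to_keep.timetuple())  # 1608793200
print(epoch // span * span)
# 1607040000.0 -> matches oldest_key_to_keep in the dump

Since the one existing key (1607040000.0) is not strictly inferior to oldest_key_to_keep, nothing is deleted; and since previous_oldest_mutable_key equals oldest_mutable_key, no read-only split is rewritten -- the new split for 2020-12-25 08:00:00 is simply stored.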