讀取大量數據批量更新

1. 先看結果對比

  • 1.1 測試數據

測試租戶:znfangcadmin

經紀人數據量:199003

做的事情:將經紀人表的mobile_tel_encrypted更新為mobile_tel加密后的字符串破衔,原本mobile_tel_encrypted字段為空黑界。

  • 1.2 優(yōu)化前

CPU使用率和內存使用率:

image.png

可以看到CPU實用率基本在60%徘徊淹仑,內存是在11%徘徊。

耗時:更新工具記錄上一次是執(zhí)行時間:


image.png
  • 1.3優(yōu)化后

image.png

可以看到CPU實用率基本在16%徘徊杠娱,內存是在0.3徘徊。

耗時:測過3次,基本是3分鐘左右


image.png

2. 優(yōu)化思路

首先優(yōu)化前用的是yii2框架的each/batch獲取數據泉手,每次拿到100條數據后就處理更新(更新在foreach里面完成铣口,直接是賦值后$model->save())滤钱,看源碼用的是pdo的fetch方法,正常的話脑题,這個獲取數據是用游標的方式件缸,但是執(zhí)行測試過程中,發(fā)現cpu和內存居高不下叔遂。并且在中途還會報php內存用完的錯:
Allowed memory size of 1073741824 bytes exhausted (tried to allocate 82 bytes)
太變態(tài)了他炊,1g都用完了争剿。。佑稠。秒梅。什么代碼呀,說好的batch能節(jié)省內存的呢舌胶?這里頭肯定有哪個地方出差錯了捆蜀。剛開始覺得有可能是php版本太低(現在公司是php5.6 歷史原因,不會輕易升級的)幔嫂,后來記起來前公司同事之前也試過處理大量權限代碼的辆它,前公司用的是php7也是報內存耗盡。

下午要發(fā)版本履恩,不管了锰茉,先用mysqli的MYSQLI_USE_RESULT,這個之前試過切心,可行的飒筑。

于是優(yōu)化主要方向:

  1. 將pdo的fetch查詢改完mysqli的query(并采用MYSQLI_USE_RESULT);
  2. 一條一條更新改完批量更新绽昏。

于是優(yōu)化后的代碼:
這是自己寫的mysqli的一個trait

<?php
/**
 * Created by PhpStorm.
 * User: zhengxj
 * Date: 2018/12/24
 * Time: 15:23
 */

namespace common\traits;

trait MysqliTrait
{

    /**
     * 解析租戶庫連接字符串协屡,并創(chuàng)建數據庫連接對象返回
     * @return \mysqli
     */
    private function getOrgDbConn(){
        $result = $dbConnection; //你是數據庫連接
        //自己定義讀寫超時常量
        if (!defined('MYSQL_OPT_READ_TIMEOUT')) {
            define('MYSQL_OPT_READ_TIMEOUT',  11);
        }
        if (!defined('MYSQL_OPT_WRITE_TIMEOUT')) {
            define('MYSQL_OPT_WRITE_TIMEOUT', 12);
        }
        $mysqli = mysqli_init();
        $mysqli->options(MYSQL_OPT_READ_TIMEOUT, 10000);
        $mysqli->options(MYSQL_OPT_WRITE_TIMEOUT, 10000);

        //連接數據庫
        $port = isset($result['port'])?$result['port']:null;
        $mysqli->real_connect($result['server'], $result['uid'], $result['pwd'],$result['database'],$port);
        if ($mysqli->connect_errno) {
            return null;
        }
        $mysqli->set_charset("utf8");
        return $mysqli;
    }

    /**
     * 對每一行進行處理的函數,可被重寫
     * @param $row
     * @return array
     */
    public function dealRow(&$row)
    {
        return $row;
    }

    /**
     * 從mysql服務器讀取大量數據并處理數據
     * @param string $sql 執(zhí)行的sql
     * @param int $limit 每次處理多少條數據
     * @param callable $handleFunction 處理方法
     * @throws \Exception
     * @return int
     */
    public function handleData($sql, $limit = 500, callable $handleFunction)
    {
        $db = $this->getOrgDbConn();
        $total = 0;
        try {
            $records = [];
            //讀取大量的數據時使用 MYSQLI_USE_RESULT
            if ($result = $db->query($sql,MYSQLI_USE_RESULT)){
                while ($row = $result->fetch_assoc()) {
                    $total ++;
                    $records[] = $this->dealRow($row);
                    if(count($records) >= $limit){
                        call_user_func_array($handleFunction, [$records]);
                        unset($records);
                        $records = [];
                    }
                }
                if(count($records)> 0){
                    call_user_func_array($handleFunction, [$records]);
                }
                $result->free();
            }else{
                echo "mysql 查詢失斎:errno:".$db->errno.",error:".$db->error;
            }
            return $total;
        }  catch(\Exception $e){
            $db->close();
            throw $e;
        }
    }
}

controller代碼

use MysqliTrait; //使用trait
public function actionTest($action, $mobiles='')
{
            $sql = 'SELECT id,mobile,mobile_encrypted FROM `broker`';
            if($action == 'part') {
                $sql .= ' WHERE mobile != "" and mobile_encrypted  = ""';
            } elseif ($action == 'mobile') {
                if(empty($mobiles)) {
                    die('Error mobiles !');
                }
                $mobilesStr = '("'. implode(',"', $mobiles) .'")';
                $sql .= ' WHERE mobile IN '.$mobilesStr;
            }
            echo '開始處理時間: ' . date('Y-m-d H:i:s', time()) . PHP_EOL;
            $db = BrokerEntity::getDb();
            $logger = $this->logger;
            //回調函數
            $function = function ($updateData) use ($db, $logger) {
                if(empty($updateData)) {
                    return;
                }
                $updateSql = 'Update `broker` set `mobile_encrypted` =  CASE `id`';
                foreach ($updateData as $item) {
                    $updateSql .= " WHEN '{$item['id']}' THEN '{$item['mobile_encrypted']}'";
                }
                $updateSql .= ' END WHERE `id` IN ("'.implode('","', array_column($updateData, 'id')).'")';
                try {
                    $db->createCommand($updateSql)->execute();
                } catch (\Exception $e) {
                    $logger->error('update error:'.$e->getMessage());
                }
            };
            $total = $this->handleData($sql, 1000, $function); //此方法就是用trait的handleData代碼
            echo '完成處理時間: ' . date('Y-m-d H:i:s', time()) . PHP_EOL;
}
  /**
     * 對每一行進行處理的函數肤晓,可被重寫
     * @param $row
     * @return array
     */
    public function dealRow(&$row)
    {
        $row['mobile_encrypted'] = TelSecurity::encrypt($row['mobile']);
        return $row;
    }

20181225更新
昨天完成了優(yōu)化,今天有點空认然,研究一下為什么yii2的each/batch沒有作用补憾。
在網上查了很久,發(fā)現這個:

image.png

直接打開連接:
http://php.net/manual/en/mysqlinfo.concepts.buffering.php

  • 主要解釋:

Buffered and Unbuffered queries ?

Queries are using the buffered mode by default. This means that query results are immediately transferred from the MySQL Server to PHP and then are kept in the memory of the PHP process. This allows additional operations like counting the number of rows, and moving (seeking) the current result pointer. It also allows issuing further queries on the same connection while working on the result set. The downside of the buffered mode is that larger result sets might require quite a lot memory. The memory will be kept occupied till all references to the result set are unset or the result set was explicitly freed, which will automatically happen during request end the latest. The terminology "store result" is also used for buffered mode, as the whole result set is stored at once.

Note:

When using libmysqlclient as library PHP's memory limit won't count the memory used for result sets unless the data is fetched into PHP variables. With mysqlnd the memory accounted for will include the full result set.

Unbuffered MySQL queries execute the query and then return a resource while the data is still waiting on the MySQL server for being fetched. This uses less memory on the PHP-side, but can increase the load on the server. Unless the full result set was fetched from the server no further queries can be sent over the same connection. Unbuffered queries can also be referred to as "use result".

Following these characteristics buffered queries should be used in cases where you expect only a limited result set or need to know the amount of returned rows before reading all rows. Unbuffered mode should be used when you expect larger results.

Because buffered queries are the default, the examples below will demonstrate how to execute unbuffered queries with each API.

我的理解是:默認情況下卷员,查詢是以緩存模式進行盈匾,這意味著mysql服務器查詢的數據返回后會存儲在php的內存中。如果查詢大數據毕骡,就要求給php分配的內存必須足夠大削饵。

緩存模式適用于讀取一個有限集合,或者在讀取所有行之前先讀取一部分數據挺峡。而大批量數據則得用到非緩存模式葵孤。

下面是mysqli和pdo的例子:

  • mysqli (之前優(yōu)化采取的方式就是用這個)
<?php
$mysqli  = new mysqli("localhost", "my_user", "my_password", "world");
$uresult = $mysqli->query("SELECT Name FROM City", MYSQLI_USE_RESULT);

if ($uresult) {
   while ($row = $uresult->fetch_assoc()) {
       echo $row['Name'] . PHP_EOL;
   }
}
$uresult->close();
?>
  • pdo
$pdo = new PDO("mysql:host=localhost;dbname=world", 'my_user', 'my_pass');
$pdo->setAttribute(PDO::MYSQL_ATTR_USE_BUFFERED_QUERY, false);

$uresult = $pdo->query("SELECT Name FROM City");
if ($uresult) {
   while ($row = $uresult->fetch(PDO::FETCH_ASSOC)) {
       echo $row['Name'] . PHP_EOL;
   }
}

我馬上試了一下,測試代碼:

    /**
     * 初始化經紀人加密手機號
     * php yii qdgj/paas-accounts/test-mobile (part/init/mobile) (18812342234,18812343234)  --orgcode=fangzhiadmin_test
     * @param string $action
     * @param string $mobiles 多個,隔開
     * @return mixed
     */
    public function actionTestMobile($action = 'part', $mobiles = '')
    {
        if(!in_array($action, ['part','init','mobile'])) {
            die('Error Params !');
        }
        try {
            /** @var Connection $connection */
            $connection = BrokerEntity::getDb();
            $pdo = new \PDO($connection->dsn, $connection->username, $connection->password);
            $pdo->setAttribute(\PDO::MYSQL_ATTR_USE_BUFFERED_QUERY, false);

            $uresult = $pdo->query('SELECT b_regbrokerId,mobile_tel,capacity_des,mobile_tel_encrypted FROM `b_regbroker` WHERE mobile_tel != "" and mobile_tel_encrypted = ""');
            if ($uresult) {
                while ($row = $uresult->fetch(\PDO::FETCH_ASSOC)) {
                    print_r($row);
                }
            }
        } catch (\Exception $e) {
            var_dump($e->getMessage());
        }
        return true;
    }

cpu和內存使用率果然降下來了:

$ ps aux | grep test-mobile
www      18847  7.4  0.2 336084 23180 pts/1    S+   11:44   0:01 php yii qdgj/paas-accounts/test-mobile part --orgcode=znfangcadmin

回到正題橱赠,yii2的each/batch為啥無效尤仍,因為人家用的pdo默認都是用緩存模式,代碼又沒有設置這個模式狭姨,當然就沒有用了宰啦,如果要生效苏遥,還必須自己設置那個pdo模式。那是否自己設置了那個模式就可以呢赡模?很遺憾田炭,我試了一下,不可以漓柑,因為yii源碼里面教硫,設置模式是在fetch和fetchAll的上層,然而fetchAll是必須是緩存模式辆布,不然會出錯瞬矩,而yii用的過程中,肯定會用到fetchAll的锋玲,比如取一個表的所有字段景用。簡言之,如果直接設置了fetch為非緩存模式惭蹂,那么所有用到fetchAll的都會報錯伞插。

最后編輯于
?著作權歸作者所有,轉載或內容合作請聯系作者
  • 序言:七十年代末,一起剝皮案震驚了整個濱河市盾碗,隨后出現的幾起案子媚污,更是在濱河造成了極大的恐慌,老刑警劉巖置尔,帶你破解...
    沈念sama閱讀 210,914評論 6 490
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件杠步,死亡現場離奇詭異氢伟,居然都是意外死亡榜轿,警方通過查閱死者的電腦和手機,發(fā)現死者居然都...
    沈念sama閱讀 89,935評論 2 383
  • 文/潘曉璐 我一進店門朵锣,熙熙樓的掌柜王于貴愁眉苦臉地迎上來谬盐,“玉大人,你說我怎么就攤上這事诚些》煽” “怎么了?”我有些...
    開封第一講書人閱讀 156,531評論 0 345
  • 文/不壞的土叔 我叫張陵诬烹,是天一觀的道長砸烦。 經常有香客問我,道長绞吁,這世上最難降的妖魔是什么幢痘? 我笑而不...
    開封第一講書人閱讀 56,309評論 1 282
  • 正文 為了忘掉前任,我火速辦了婚禮家破,結果婚禮上颜说,老公的妹妹穿的比我還像新娘购岗。我一直安慰自己,他們只是感情好门粪,可當我...
    茶點故事閱讀 65,381評論 5 384
  • 文/花漫 我一把揭開白布喊积。 她就那樣靜靜地躺著,像睡著了一般玄妈。 火紅的嫁衣襯著肌膚如雪乾吻。 梳的紋絲不亂的頭發(fā)上,一...
    開封第一講書人閱讀 49,730評論 1 289
  • 那天拟蜻,我揣著相機與錄音溶弟,去河邊找鬼。 笑死瞭郑,一個胖子當著我的面吹牛辜御,可吹牛的內容都是我干的。 我是一名探鬼主播屈张,決...
    沈念sama閱讀 38,882評論 3 404
  • 文/蒼蘭香墨 我猛地睜開眼擒权,長吁一口氣:“原來是場噩夢啊……” “哼!你這毒婦竟也來了阁谆?” 一聲冷哼從身側響起碳抄,我...
    開封第一講書人閱讀 37,643評論 0 266
  • 序言:老撾萬榮一對情侶失蹤,失蹤者是張志新(化名)和其女友劉穎场绿,沒想到半個月后剖效,有當地人在樹林里發(fā)現了一具尸體,經...
    沈念sama閱讀 44,095評論 1 303
  • 正文 獨居荒郊野嶺守林人離奇死亡焰盗,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內容為張勛視角 年9月15日...
    茶點故事閱讀 36,448評論 2 325
  • 正文 我和宋清朗相戀三年璧尸,在試婚紗的時候發(fā)現自己被綠了。 大學時的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片熬拒。...
    茶點故事閱讀 38,566評論 1 339
  • 序言:一個原本活蹦亂跳的男人離奇死亡爷光,死狀恐怖,靈堂內的尸體忽然破棺而出澎粟,到底是詐尸還是另有隱情蛀序,我是刑警寧澤,帶...
    沈念sama閱讀 34,253評論 4 328
  • 正文 年R本政府宣布活烙,位于F島的核電站徐裸,受9級特大地震影響,放射性物質發(fā)生泄漏啸盏。R本人自食惡果不足惜重贺,卻給世界環(huán)境...
    茶點故事閱讀 39,829評論 3 312
  • 文/蒙蒙 一、第九天 我趴在偏房一處隱蔽的房頂上張望。 院中可真熱鬧檬姥,春花似錦曾我、人聲如沸。這莊子的主人今日做“春日...
    開封第一講書人閱讀 30,715評論 0 21
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽。三九已至秉犹,卻和暖如春蛉谜,著一層夾襖步出監(jiān)牢的瞬間,已是汗流浹背崇堵。 一陣腳步聲響...
    開封第一講書人閱讀 31,945評論 1 264
  • 我被黑心中介騙來泰國打工型诚, 沒想到剛下飛機就差點兒被人妖公主榨干…… 1. 我叫王不留,地道東北人鸳劳。 一個月前我還...
    沈念sama閱讀 46,248評論 2 360
  • 正文 我出身青樓狰贯,卻偏偏與公主長得像,于是被迫代替她去往敵國和親赏廓。 傳聞我的和親對象是個殘疾皇子涵紊,可洞房花燭夜當晚...
    茶點故事閱讀 43,440評論 2 348

推薦閱讀更多精彩內容

  • pdo類PDO是一個“數據庫訪問抽象層”,作用是統一各種數據庫的訪問接口幔摸,與mysql和mysqli的函數庫相比摸柄,...
    桖辶殤閱讀 859評論 0 0
  • Lua 5.1 參考手冊 by Roberto Ierusalimschy, Luiz Henrique de F...
    蘇黎九歌閱讀 13,763評論 0 38
  • pyspark.sql模塊 模塊上下文 Spark SQL和DataFrames的重要類: pyspark.sql...
    mpro閱讀 9,448評論 0 13
  • 如今隨著互聯網的發(fā)展,數據的量級也是撐指數的增長既忆,從GB到TB到PB驱负。對數據的各種操作也是愈加的困難,傳統的關系性...
    CaesarXia閱讀 11,826評論 1 30
  • 上個星期我們小組完成了一個連接數據庫的小頁面患雇,并且添加了增刪改的功能跃脊。 第一個是要連庫。下面是我連庫的代碼庆亡,其中我...
    諳逸binz閱讀 383評論 1 2