Apache Doris: Queries

1. Query settings

Increasing memory

By default, a single query may use at most 2 GB of memory on any one BE node. If it exceeds this limit, a Memory limit exceeded error may be reported. To check the memory limit:

mysql> SHOW VARIABLES LIKE "%mem_limit%";
+----------------+------------+
| Variable_name  | Value      |
+----------------+------------+
| exec_mem_limit | 2147483648 |
| load_mem_limit | 0          |
+----------------+------------+
2 rows in set (0.00 sec)

exec_mem_limit is measured in bytes. You can change it with the set command:

set exec_mem_limit = 8589934592;

This only affects the current session. To make the change permanent, add the global keyword:

set global exec_mem_limit = 8589934592;
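exec_mem_limit takes a raw byte count, which makes it easy to mistype. The short Python helper below is a convenience sketch that converts GiB to bytes and builds the set statement shown above. The function names are hypothetical and are not part of Doris. With this conversion, 2 GiB gives the 2147483648 default and 8 GiB gives 8589934592.

```python
def gib_to_bytes(gib):
    # exec_mem_limit is expressed in bytes; 1 GiB = 1024**3 bytes
    return gib * 1024 ** 3

def set_mem_limit_sql(gib, global_scope=False):
    # Hypothetical helper: builds the session or global set statement
    scope = "global " if global_scope else ""
    return f"set {scope}exec_mem_limit = {gib_to_bytes(gib)};"

print(set_mem_limit_sql(8))
print(set_mem_limit_sql(8, global_scope=True))
```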

Changing the timeout

By default, a query may run for at most 300 s. A query that has not finished by then is cancelled. To check the setting:

mysql> SHOW VARIABLES LIKE "%query_timeout%";
+---------------+-------+
| Variable_name | Value |
+---------------+-------+
| query_timeout | 300   |
+---------------+-------+
1 row in set (0.00 sec)

query_timeout is measured in seconds and is changed the same way as exec_mem_limit.

Query retry and high availability

When several FE nodes are deployed, you can put a load-balancing layer in front of them to make Doris highly available.

Application code

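Implement retry and load balancing yourself in the application code. For example, when a connection turns out to be dead, automatically retry on another connection. This approach requires the application to be configured with the addresses of multiple Doris FE nodes.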
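The following is a minimal Python sketch of application-level failover. It assumes a hypothetical run_query(host, port) callable that wraps whatever MySQL driver the application uses and raises ConnectionError when an FE is unreachable. The sketch walks the configured FE list in order and moves on to the next FE when a connection fails:

```python
def query_with_failover(fe_addresses, run_query, max_rounds=2):
    """Try each FE in turn and move to the next one when a connection fails.

    fe_addresses: list of (host, port) pairs for the Doris FE nodes.
    run_query: hypothetical callable (host, port) -> result that raises
               ConnectionError when the FE cannot be reached.
    """
    last_error = None
    for _ in range(max_rounds):              # a few passes over the whole FE list
        for host, port in fe_addresses:
            try:
                return run_query(host, port)
            except ConnectionError as exc:   # dead FE: remember the error, try the next one
                last_error = exc
    raise ConnectionError(f"all FE nodes failed, last error: {last_error}")
```

Blind retry is only safe for idempotent statements such as SELECT. A real application would also add back-off and track which FEs are healthy.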

JDBC connector

If you connect to Doris with the MySQL JDBC connector, you can use JDBC's automatic retry mechanism by listing several hosts in the URL:

jdbc:mysql://[host1][:port1],[host2][:port2][,[host3][:port3]]...[/[database]][?propertyName1=propertyValue1[&propertyName2=propertyValue2]...]
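To make the URL format above concrete, here is a small Python sketch that assembles a multi-host URL from a list of FE addresses. The helper name, the host names fe1 and fe2, and the example property are illustrative assumptions. Check the MySQL Connector/J documentation for the failover-related properties that your driver version supports:

```python
from urllib.parse import urlencode

def build_jdbc_url(hosts, database=None, properties=None):
    """hosts: list of (host, port) pairs for the Doris FE nodes."""
    url = "jdbc:mysql://" + ",".join(f"{host}:{port}" for host, port in hosts)
    if database or properties:
        url += "/" + (database or "")
    if properties:
        url += "?" + urlencode(properties)   # propertyName=propertyValue pairs joined with &
    return url

print(build_jdbc_url([("fe1", 9030), ("fe2", 9030)], "test"))
```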

ProxySQL

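ProxySQL is a flexible and powerful MySQL proxy layer, and a MySQL middleware that is genuinely usable in production. It supports read/write splitting, query routing, dynamic caching of selected SQL statements, dynamic configuration reloading, failover, and some SQL filtering.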

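The Doris FE process accepts user connections and query requests. FE itself scales horizontally and is highly available. However, you need to put a proxy layer in front of the FEs to load-balance connections automatically.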

Step 1: install ProxySQL:

[root@scentos szc]# vim /etc/yum.repos.d/proxysql.repo
[proxysql_repo]
name= ProxySQL YUM repository
baseurl=http://repo.proxysql.com/ProxySQL/proxysql-1.4.x/centos/$releasever
gpgcheck=1
gpgkey=http://repo.proxysql.com/ProxySQL/repo_pub_key

[root@scentos szc]# yum clean all
[root@scentos szc]# yum makecache
[root@scentos szc]# yum -y install proxysql
[root@scentos szc]# proxysql --version
ProxySQL version 1.4.16-23-gf954ef3, codename Truls

Enable ProxySQL at boot and start it:

[root@scentos szc]# systemctl enable proxysql && systemctl start proxysql

Once started, ProxySQL listens on two ports, 6032 and 6033 by default. Port 6032 is the ProxySQL admin port. Port 6033 is the port on which ProxySQL serves clients, that is, the port whose traffic is forwarded to the real backend databases.

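Step 2: configure ProxySQL.
ProxySQL has a configuration file, /etc/proxysql.cnf, and a configuration database, /var/lib/proxysql/proxysql.db. If you want changes to the former to take effect after ProxySQL restarts, delete the latter after editing. In this scenario the file does not need to be changed.
Next, connect to the ProxySQL admin port. The default username and password are both admin: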

[root@scentos szc]# mysql -h 127.0.0.1 -P 6032 -u admin -p
Enter password:
Welcome to the MySQL monitor.  Commands end with ; or \g.
Your MySQL connection id is 1
Server version: 5.5.30 (ProxySQL Admin Module)

Copyright (c) 2000, 2022, Oracle and/or its affiliates.

Oracle is a registered trademark of Oracle Corporation and/or its
affiliates. Other names may be trademarks of their respective
owners.

Type 'help;' or '\h' for help. Type '\c' to clear the current input statement.

mysql>

Register the Doris FE in ProxySQL. If there are several FEs, insert one row for each:

mysql> insert into mysql_servers(hostgroup_id, hostname,port) values(10, 'scentos', 9030);
Query OK, 1 row affected (0.00 sec)
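When there are several FEs, the per-FE insert statements follow a fixed pattern. The small Python helper below generates them from a host list; it is hypothetical and not part of ProxySQL. It reuses hostgroup 10 and port 9030 from the example above, and the hostnames fe1, fe2 and fe3 are placeholders. It does no quoting or escaping, so it is only meant for trusted hostnames:

```python
def mysql_servers_inserts(fe_hosts, hostgroup_id=10, port=9030):
    # One row in ProxySQL's mysql_servers table per Doris FE, all in the same hostgroup
    return [
        f"insert into mysql_servers(hostgroup_id, hostname, port) "
        f"values({hostgroup_id}, '{host}', {port});"
        for host in fe_hosts
    ]

for stmt in mysql_servers_inserts(["fe1", "fe2", "fe3"]):
    print(stmt)
```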

Check the FE status:

mysql> select * from mysql_servers\G
*************************** 1. row ***************************
       hostgroup_id: 10
           hostname: scentos
               port: 9030
             status: ONLINE
             weight: 1
        compression: 0
    max_connections: 1000
max_replication_lag: 0
            use_ssl: 0
     max_latency_ms: 0
            comment:
1 row in set (0.00 sec)

Load the FE server configuration into the runtime and save it to disk:

mysql> load mysql servers to runtime;
Query OK, 0 rows affected (0.00 sec)

mysql> save mysql servers to disk;
Query OK, 0 rows affected (0.03 sec)

To set up monitoring of the Doris FE nodes, first create a monitoring user on the Doris FE master node:

mysql> create user monitor@'192.168.31.%' identified by 'monitor';
Query OK, 0 rows affected (0.01 sec)

mysql> grant ADMIN_PRIV on *.* to monitor@'192.168.31.%';
Query OK, 0 rows affected (0.02 sec)

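Back in ProxySQL, configure the monitoring credentials. Note that mysql-monitor_username and mysql-monitor_password are global variables. They are applied with load mysql variables to runtime and persisted with save mysql variables to disk. The mysql servers commands below only load and save the server list: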

mysql> set mysql-monitor_username='monitor';
Query OK, 1 row affected (0.00 sec)

mysql> set mysql-monitor_password='monitor';
Query OK, 1 row affected (0.00 sec)

mysql> load mysql servers to runtime;
Query OK, 0 rows affected (0.00 sec)

mysql> save mysql servers to disk;
Query OK, 0 rows affected (0.00 sec)

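Check the results. Before the monitoring username and password were configured, the logs contain Access denied errors; afterwards the errors stop. In a fresh environment the read-only log is empty: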

mysql> select * from mysql_server_connect_log;
+----------+------+------------------+-------------------------+------------------------------------------------------------------------+
| hostname | port | time_start_us    | connect_success_time_us | connect_error                                                          |
+----------+------+------------------+-------------------------+------------------------------------------------------------------------+
| scentos  | 9030 | 1652667572521858 | 0                       | Access denied for user 'default_cluster:monitor' (using password: YES) |
| scentos  | 9030 | 1652667632526585 | 0                       | Access denied for user 'default_cluster:monitor' (using password: YES) |
| scentos  | 9030 | 1652667692861809 | 0                       | Access denied for user 'default_cluster:monitor' (using password: YES) |
| scentos  | 9030 | 1652667752870084 | 2403                    | NULL                                                                   |
| scentos  | 9030 | 1652667812873528 | 1959                    | NULL                                                                   |
+----------+------+------------------+-------------------------+------------------------------------------------------------------------+
5 rows in set (0.00 sec)

mysql> select * from mysql_server_ping_log;
+----------+------+------------------+----------------------+------------------------------------------------------------------------+
| hostname | port | time_start_us    | ping_success_time_us | ping_error                                                             |
+----------+------+------------------+----------------------+------------------------------------------------------------------------+
| scentos  | 9030 | 1652667523027559 | 0                    | Access denied for user 'default_cluster:monitor' (using password: YES) |
| scentos  | 9030 | 1652667532947750 | 0                    | Access denied for user 'default_cluster:monitor' (using password: YES) |
| scentos  | 9030 | 1652667543014963 | 0                    | Access denied for user 'default_cluster:monitor' (using password: YES) |
| scentos  | 9030 | 1652667552875834 | 0                    | Access denied for user 'default_cluster:monitor' (using password: YES) |
| scentos  | 9030 | 1652667562914012 | 0                    | Access denied for user 'default_cluster:monitor' (using password: YES) |
| scentos  | 9030 | 1652667572941974 | 0                    | Access denied for user 'default_cluster:monitor' (using password: YES) |
| scentos  | 9030 | 1652667582893764 | 0                    | Access denied for user 'default_cluster:monitor' (using password: YES) |
| scentos  | 9030 | 1652667592959601 | 0                    | Access denied for user 'default_cluster:monitor' (using password: YES) |
| scentos  | 9030 | 1652667603011943 | 0                    | Access denied for user 'default_cluster:monitor' (using password: YES) |
| scentos  | 9030 | 1652667612992504 | 0                    | Access denied for user 'default_cluster:monitor' (using password: YES) |
| scentos  | 9030 | 1652667622958538 | 0                    | Access denied for user 'default_cluster:monitor' (using password: YES) |
| scentos  | 9030 | 1652667632936523 | 0                    | Access denied for user 'default_cluster:monitor' (using password: YES) |
| scentos  | 9030 | 1652667642893430 | 0                    | Access denied for user 'default_cluster:monitor' (using password: YES) |
| scentos  | 9030 | 1652667652948737 | 0                    | Access denied for user 'default_cluster:monitor' (using password: YES) |
| scentos  | 9030 | 1652667662952283 | 0                    | Access denied for user 'default_cluster:monitor' (using password: YES) |
| scentos  | 9030 | 1652667673089193 | 0                    | Access denied for user 'default_cluster:monitor' (using password: YES) |
| scentos  | 9030 | 1652667682946538 | 0                    | Access denied for user 'default_cluster:monitor' (using password: YES) |
| scentos  | 9030 | 1652667693028613 | 0                    | Access denied for user 'default_cluster:monitor' (using password: YES) |
| scentos  | 9030 | 1652667702868941 | 0                    | Access denied for user 'default_cluster:monitor' (using password: YES) |
| scentos  | 9030 | 1652667713002938 | 0                    | Access denied for user 'default_cluster:monitor' (using password: YES) |
| scentos  | 9030 | 1652667722963990 | 0                    | Access denied for user 'default_cluster:monitor' (using password: YES) |
| scentos  | 9030 | 1652667733036470 | 0                    | Access denied for user 'default_cluster:monitor' (using password: YES) |
| scentos  | 9030 | 1652667743059712 | 7171                 | NULL                                                                   |
| scentos  | 9030 | 1652667752944582 | 493                  | NULL                                                                   |
| scentos  | 9030 | 1652667762936507 | 708                  | NULL                                                                   |
| scentos  | 9030 | 1652667772993064 | 412                  | NULL                                                                   |
| scentos  | 9030 | 1652667782935183 | 1395                 | NULL                                                                   |
| scentos  | 9030 | 1652667792877624 | 436                  | NULL                                                                   |
| scentos  | 9030 | 1652667803030817 | 409                  | NULL                                                                   |
| scentos  | 9030 | 1652667812902970 | 643                  | NULL                                                                   |
| scentos  | 9030 | 1652667823086544 | 407                  | NULL                                                                   |
| scentos  | 9030 | 1652667832999904 | 400                  | NULL                                                                   |
| scentos  | 9030 | 1652667843080748 | 375                  | NULL                                                                   |
| scentos  | 9030 | 1652667853084720 | 374                  | NULL                                                                   |
| scentos  | 9030 | 1652667862940854 | 409                  | NULL                                                                   |
| scentos  | 9030 | 1652667872944847 | 432                  | NULL                                                                   |
| scentos  | 9030 | 1652667882914813 | 450                  | NULL                                                                   |
+----------+------+------------------+----------------------+------------------------------------------------------------------------+
37 rows in set (0.00 sec)

mysql> select * from  mysql_server_read_only_log;
Empty set (0.00 sec)
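The time_start_us column in the connect and ping logs above is a raw microsecond timestamp. The Python sketch below assumes the value counts microseconds since the Unix epoch; the values shown decode to mid-May 2022. It converts the timestamp and finds the first successful check, which is a quick way to confirm when the monitor credentials started working:

```python
from datetime import datetime, timedelta, timezone

EPOCH = datetime(1970, 1, 1, tzinfo=timezone.utc)

def from_us(time_start_us):
    # Assumption: time_start_us counts microseconds since the Unix epoch (UTC)
    return EPOCH + timedelta(microseconds=time_start_us)

def first_success(log_rows):
    """log_rows: (time_start_us, error) pairs; error is None when the check succeeded."""
    for time_start_us, error in sorted(log_rows, key=lambda row: row[0]):
        if error is None:
            return from_us(time_start_us)
    return None
```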

mysql> load mysql servers to runtime;
Query OK, 0 rows affected (0.00 sec)

mysql> save mysql servers to disk;
Query OK, 0 rows affected (0.07 sec)

mysql> select hostgroup_id,hostname,port,status,weight from mysql_servers;
+--------------+----------+------+--------+--------+
| hostgroup_id | hostname | port | status | weight |
+--------------+----------+------+--------+--------+
| 10           | scentos  | 9030 | ONLINE | 1      |
+--------------+----------+------+--------+--------+
1 row in set (0.00 sec)

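Next, configure the Doris users. ProxySQL configuration at this level covers the users that send SQL statements, SQL routing rules, SQL query caching, SQL rewriting, and so on. Assuming Doris has a user named root, configure ProxySQL as follows: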

mysql> insert into mysql_users(username,password,default_hostgroup) values('root','root',10);
Query OK, 1 row affected (0.00 sec)

mysql> load mysql users to runtime;
Query OK, 0 rows affected (0.00 sec)

mysql> save mysql users to disk;
Query OK, 0 rows affected (0.00 sec)

Check the result:

mysql> select * from mysql_users\G
*************************** 1. row ***************************
              username: root
              password: root
                active: 1
               use_ssl: 0
     default_hostgroup: 10
        default_schema: NULL
         schema_locked: 0
transaction_persistent: 1
          fast_forward: 0
               backend: 1
              frontend: 1
       max_connections: 10000
1 row in set (0.00 sec)

Make sure the active and transaction_persistent fields are 1, then set the parameters mysql-forward_autocommit and mysql-autocommit_false_is_transaction:

mysql> UPDATE global_variables SET variable_value='true' WHERE variable_name='mysql-forward_autocommit';
Query OK, 1 row affected (0.01 sec)

mysql> UPDATE global_variables SET variable_value='true' WHERE variable_name='mysql-autocommit_false_is_transaction';
Query OK, 1 row affected (0.00 sec)

mysql> load mysql VARIABLES to runtime;
Query OK, 0 rows affected (0.00 sec)

mysql> save mysql VARIABLES to disk;
Query OK, 98 rows affected (0.09 sec)

You can now connect to ProxySQL from a MySQL client with root's username and password.

Step 3: test the setup:

[root@scentos szc]# mysql -uroot -proot -P 6033 -h scentos -e "show databases;"
mysql: [Warning] Using a password on the command line interface can be insecure.
+--------------------+
| Database           |
+--------------------+
| information_schema |
| test               |
+--------------------+

2. Simple queries

Basic queries

mysql> SELECT * FROM example_site_visit LIMIT 3;
+---------+------------+--------+------+------+---------------------+--------------------------+------+----------------+----------------+
| user_id | date       | city   | age  | sex  | last_visit_date     | last_visit_date_not_null | cost | max_dwell_time | min_dwell_time |
+---------+------------+--------+------+------+---------------------+--------------------------+------+----------------+----------------+
| 10004   | 2017-10-01 | 深圳   |   35 |    0 | 2017-10-01 10:00:15 | 2017-10-01 10:00:15      |  100 |              3 |              3 |
| 10004   | 2017-10-03 | 深圳   |   35 |    0 | 2017-10-03 11:22:00 | 2017-10-03 10:20:22      |   55 |             19 |              6 |
| 10000   | 2017-10-01 | 北京   |   20 |    0 | 2017-10-01 07:00:00 | 2017-10-01 07:00:00      |   35 |             10 |              2 |
+---------+------------+--------+------+------+---------------------+--------------------------+------+----------------+----------------+
3 rows in set (0.12 sec)

mysql> SELECT * FROM example_site_visit ORDER BY user_id;
+---------+------------+--------+------+------+---------------------+--------------------------+------+----------------+----------------+
| user_id | date       | city   | age  | sex  | last_visit_date     | last_visit_date_not_null | cost | max_dwell_time | min_dwell_time |
+---------+------------+--------+------+------+---------------------+--------------------------+------+----------------+----------------+
| 10000   | 2017-10-01 | 北京   |   20 |    0 | 2017-10-01 07:00:00 | 2017-10-01 07:00:00      |   35 |             10 |              2 |
| 10001   | 2017-10-01 | 北京   |   30 |    1 | 2017-10-01 17:05:45 | 2017-10-01 07:00:00      |    2 |             22 |             22 |
| 10002   | 2017-10-02 | 上海   |   20 |    1 | 2017-10-02 12:59:12 | NULL                     |  200 |              5 |              5 |
| 10003   | 2017-10-02 | 廣州   |   32 |    0 | 2017-10-02 11:20:00 | 2017-10-02 11:20:00      |   30 |             11 |             11 |
| 10004   | 2017-10-03 | 深圳   |   35 |    0 | 2017-10-03 11:22:00 | 2017-10-03 10:20:22      |   55 |             19 |              6 |
| 10004   | 2017-10-01 | 深圳   |   35 |    0 | 2017-10-01 10:00:15 | 2017-10-01 10:00:15      |  100 |              3 |              3 |
| 10005   | 2017-10-03 | 長沙   |   29 |    1 | 2017-10-03 18:11:02 | 2017-10-03 18:11:02      |    3 |              1 |              1 |
+---------+------------+--------+------+------+---------------------+--------------------------+------+----------------+----------------+
7 rows in set (0.03 sec)

Join queries

mysql> SELECT SUM(example_site_visit.cost) FROM example_site_visit JOIN example_site_visit2 WHERE example_site_visit.user_id = example_site_visit2.user_id;
+----------------------------------+
| sum(`example_site_visit`.`cost`) |
+----------------------------------+
|                              612 |
+----------------------------------+
1 row in set (0.12 sec)

mysql> select example_site_visit.user_id, sum(example_site_visit.cost) from example_site_visit join example_site_visit2 where example_site_visit.user_id = example_site_visit2.user_id group by example_site_visit.user_id;
+---------+----------------------------------+
| user_id | sum(`example_site_visit`.`cost`) |
+---------+----------------------------------+
| 10004   |                              310 |
| 10000   |                               70 |
| 10001   |                                2 |
| 10002   |                              200 |
| 10003   |                               30 |
+---------+----------------------------------+
5 rows in set (0.16 sec)

Subqueries

mysql> SELECT SUM(cost) FROM example_site_visit2 WHERE user_id IN (SELECT user_id FROM example_site_visit WHERE user_id > 10003);
+-------------+
| sum(`cost`) |
+-------------+
|         111 |
+-------------+
1 row in set (0.07 sec)

Join strategies

Broadcast Join

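By default, the system performs a Join as follows:

1. It filters the small table with its conditions.
2. It broadcasts the result to every node that holds the large table and builds an in-memory hash table there.
3. It streams the large table's data through a hash join.

Doris tries Broadcast Join first and switches to Shuffle Join automatically if it estimates that the small table is too large. Note that in this case even an explicitly specified Broadcast Join is switched to Shuffle Join.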
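The build and probe steps described above can be sketched in a few lines of Python. This is a generic in-memory hash join, not Doris code. The large table is modeled as a list of per-node slices, and every slice is probed against the same hash table built from the already filtered small table, which is what broadcasting achieves. Doris's size-based switch to Shuffle Join is not modeled:

```python
from collections import defaultdict

def broadcast_hash_join(small, large_slices, key):
    """small: filtered small table; large_slices: the large table split across nodes."""
    # Build phase: one in-memory hash table from the small table
    table = defaultdict(list)
    for row in small:
        table[key(row)].append(row)
    out = []
    # Broadcast + probe: every node streams its slice against the same hash table
    for node_slice in large_slices:
        for row in node_slice:
            for match in table.get(key(row), []):
                out.append((row, match))
    return out
```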

Broadcast Join is used by default:
mysql> EXPLAIN SELECT SUM(example_site_visit.cost) FROM example_site_visit JOIN example_site_visit2 WHERE example_site_visit.city = example_site_visit2.city;
+---------------------------------------------------------------------------------------+
| Explain String                                                                        |
+---------------------------------------------------------------------------------------+
| PLAN FRAGMENT 0                                                                       |
|  OUTPUT EXPRS:<slot 4> sum(`example_site_visit`.`cost`)                               |
|   PARTITION: UNPARTITIONED                                                            |
|                                                                                       |
|   RESULT SINK                                                                         |
|                                                                                       |
|   6:AGGREGATE (merge finalize)                                                        |
|   |  output: sum(<slot 3> sum(`example_site_visit`.`cost`))                           |
|   |  group by:                                                                        |
|   |  cardinality=-1                                                                   |
|   |                                                                                   |
|   5:EXCHANGE                                                                          |
|                                                                                       |
| PLAN FRAGMENT 1                                                                       |
|  OUTPUT EXPRS:                                                                        |
|   PARTITION: HASH_PARTITIONED: `default_cluster:test`.`example_site_visit`.`user_id`  |
|                                                                                       |
|   STREAM DATA SINK                                                                    |
|     EXCHANGE ID: 05                                                                   |
|     UNPARTITIONED                                                                     |
|                                                                                       |
|   3:AGGREGATE (update serialize)                                                      |
|   |  output: sum(`example_site_visit`.`cost`)                                         |
|   |  group by:                                                                        |
|   |  cardinality=1                                                                    |
|   |                                                                                   |
|   2:HASH JOIN                                                                         |
|   |  join op: INNER JOIN (BROADCAST)                                                  |
|   |  hash predicates:                                                                 |
|   |  colocate: false, reason: Tables are not in the same group                        |
|   |  equal join conjunct: `example_site_visit`.`city` = `example_site_visit2`.`city`  |
|   |  runtime filters: RF000[in] <- `example_site_visit2`.`city`                       |
|   |  cardinality=7                                                                    |
|   |                                                                                   |
|   |----4:EXCHANGE                                                                     |
|   |                                                                                   |
|   0:OlapScanNode                                                                      |
|      TABLE: example_site_visit                                                        |
|      PREAGGREGATION: ON                                                               |
|      runtime filters: RF000[in] -> `example_site_visit`.`city`                        |
|      partitions=1/1                                                                   |
|      rollup: example_site_visit                                                       |
|      tabletRatio=10/10                                                                |
|      tabletList=10231,10233,10235,10237,10239,10241,10243,10245,10247,10249           |
|      cardinality=7                                                                    |
|      avgRowSize=1496.4286                                                             |
|      numNodes=1                                                                       |
|                                                                                       |
| PLAN FRAGMENT 2                                                                       |
|  OUTPUT EXPRS:                                                                        |
|   PARTITION: HASH_PARTITIONED: `default_cluster:test`.`example_site_visit2`.`user_id` |
|                                                                                       |
|   STREAM DATA SINK                                                                    |
|     EXCHANGE ID: 04                                                                   |
|     UNPARTITIONED                                                                     |
|                                                                                       |
|   1:OlapScanNode                                                                      |
|      TABLE: example_site_visit2                                                       |
|      PREAGGREGATION: OFF. Reason: null                                                |
|      partitions=1/1                                                                   |
|      rollup: example_site_visit2                                                      |
|      tabletRatio=10/10                                                                |
|      tabletList=10255,10257,10259,10261,10263,10265,10267,10269,10271,10273           |
|      cardinality=6                                                                    |
|      avgRowSize=1358.0                                                                |
|      numNodes=1                                                                       |
+---------------------------------------------------------------------------------------+
66 rows in set (0.00 sec)

Explicitly specifying Broadcast Join:
mysql> EXPLAIN SELECT SUM(example_site_visit.cost) FROM example_site_visit JOIN [broadcast] example_site_visit2 WHERE example_site_visit.city = example_site_visit2.city;
+---------------------------------------------------------------------------------------+
| Explain String                                                                        |
+---------------------------------------------------------------------------------------+
| PLAN FRAGMENT 0                                                                       |
|  OUTPUT EXPRS:<slot 4> sum(`example_site_visit`.`cost`)                               |
|   PARTITION: UNPARTITIONED                                                            |
|                                                                                       |
|   RESULT SINK                                                                         |
|                                                                                       |
|   6:AGGREGATE (merge finalize)                                                        |
|   |  output: sum(<slot 3> sum(`example_site_visit`.`cost`))                           |
|   |  group by:                                                                        |
|   |  cardinality=-1                                                                   |
|   |                                                                                   |
|   5:EXCHANGE                                                                          |
|                                                                                       |
| PLAN FRAGMENT 1                                                                       |
|  OUTPUT EXPRS:                                                                        |
|   PARTITION: HASH_PARTITIONED: `default_cluster:test`.`example_site_visit`.`user_id`  |
|                                                                                       |
|   STREAM DATA SINK                                                                    |
|     EXCHANGE ID: 05                                                                   |
|     UNPARTITIONED                                                                     |
|                                                                                       |
|   3:AGGREGATE (update serialize)                                                      |
|   |  output: sum(`example_site_visit`.`cost`)                                         |
|   |  group by:                                                                        |
|   |  cardinality=1                                                                    |
|   |                                                                                   |
|   2:HASH JOIN                                                                         |
|   |  join op: INNER JOIN (BROADCAST)                                                  |
|   |  hash predicates:                                                                 |
|   |  colocate: false, reason: Has join hint                                           |
|   |  equal join conjunct: `example_site_visit`.`city` = `example_site_visit2`.`city`  |
|   |  runtime filters: RF000[in] <- `example_site_visit2`.`city`                       |
|   |  cardinality=7                                                                    |
|   |                                                                                   |
|   |----4:EXCHANGE                                                                     |
|   |                                                                                   |
|   0:OlapScanNode                                                                      |
|      TABLE: example_site_visit                                                        |
|      PREAGGREGATION: ON                                                               |
|      runtime filters: RF000[in] -> `example_site_visit`.`city`                        |
|      partitions=1/1                                                                   |
|      rollup: example_site_visit                                                       |
|      tabletRatio=10/10                                                                |
|      tabletList=10231,10233,10235,10237,10239,10241,10243,10245,10247,10249           |
|      cardinality=7                                                                    |
|      avgRowSize=1496.4286                                                             |
|      numNodes=1                                                                       |
|                                                                                       |
| PLAN FRAGMENT 2                                                                       |
|  OUTPUT EXPRS:                                                                        |
|   PARTITION: HASH_PARTITIONED: `default_cluster:test`.`example_site_visit2`.`user_id` |
|                                                                                       |
|   STREAM DATA SINK                                                                    |
|     EXCHANGE ID: 04                                                                   |
|     UNPARTITIONED                                                                     |
|                                                                                       |
|   1:OlapScanNode                                                                      |
|      TABLE: example_site_visit2                                                       |
|      PREAGGREGATION: OFF. Reason: null                                                |
|      partitions=1/1                                                                   |
|      rollup: example_site_visit2                                                      |
|      tabletRatio=10/10                                                                |
|      tabletList=10255,10257,10259,10261,10263,10265,10267,10269,10271,10273           |
|      cardinality=6                                                                    |
|      avgRowSize=1358.0                                                                |
|      numNodes=1                                                                       |
+---------------------------------------------------------------------------------------+
66 rows in set (0.00 sec)

Shuffle Join

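If the filtered small table does not fit in memory, the Join cannot complete. The error usually reported first is that the memory limit was exceeded. In that case you can explicitly specify Shuffle Join, also called Partitioned Join. Both the small and the large table are hashed on the join key, and the join runs in a distributed fashion, so memory consumption is spread across all compute nodes in the cluster: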

mysql> SELECT SUM(example_site_visit.cost) FROM example_site_visit JOIN [shuffle] example_site_visit2 WHERE example_site_visit.city = example_site_visit2.city;
+----------------------------------+
| sum(`example_site_visit`.`cost`) |
+----------------------------------+
|                              651 |
+----------------------------------+
1 row in set (0.16 sec)
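For comparison, here is a minimal Python sketch of a shuffle (partitioned) hash join. Both inputs are hash-partitioned on the join key across num_nodes simulated nodes, so each node builds a hash table from only its share of the right-hand table. Python's built-in hash stands in for Doris's own partitioning hash, which is an assumption:

```python
from collections import defaultdict

def shuffle(rows, key, num_nodes):
    """Hash-partition rows on the join key (built-in hash() is a stand-in)."""
    parts = [[] for _ in range(num_nodes)]
    for row in rows:
        parts[hash(key(row)) % num_nodes].append(row)
    return parts

def shuffle_hash_join(left, right, key, num_nodes):
    out = []
    # Equal keys hash to the same node, so each node joins its pair of parts independently
    for left_part, right_part in zip(shuffle(left, key, num_nodes), shuffle(right, key, num_nodes)):
        table = defaultdict(list)
        for row in right_part:        # build side: only this node's share of the right table
            table[key(row)].append(row)
        for row in left_part:         # probe side
            for match in table.get(key(row), []):
                out.append((row, match))
    return out
```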

3. Colocation Join

Colocation Join was introduced in Doris 0.9 to provide locality optimization for Join queries. It reduces the time spent transferring data between nodes and so speeds up queries.

How it works

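Colocation Join groups a set of tables that share the same CGS into a CG. It guarantees that the corresponding tablets of these tables land on the same BE node. When the tables are joined, the join can then run directly on local data, which cuts the time spent transferring data over the network between nodes.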
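The following is a minimal Python sketch of why colocation makes the join local. It makes three assumptions: the join keys are integers, key % num_buckets stands in for the real bucketing hash, and a hypothetical bucket_to_be mapping is shared by every table in the group. Rows with equal keys always land in the same bucket, and therefore on the same BE, so each BE can join its own rows without exchanging data:

```python
from collections import defaultdict

def place(rows, num_buckets, bucket_to_be):
    """Assign each (key, value) row to the BE that owns its bucket."""
    by_be = defaultdict(list)
    for row in rows:
        bucket = row[0] % num_buckets          # stand-in for the real bucketing hash
        by_be[bucket_to_be[bucket]].append(row)
    return by_be

def colocated_join(left, right, num_buckets, bucket_to_be):
    """Join two colocated tables; each BE only touches rows it already stores."""
    left_by_be = place(left, num_buckets, bucket_to_be)
    right_by_be = place(right, num_buckets, bucket_to_be)
    result = []
    for be, left_rows in left_by_be.items():
        local = defaultdict(list)
        for r in right_by_be.get(be, []):      # right-side rows stored on the same BE
            local[r[0]].append(r)
        for l in left_rows:
            for r in local.get(l[0], []):
                result.append((l, r))
    return result
```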

Colocation Group(CG)

A CG contains one or more tables. All tables in the same group have the same Colocation Group Schema and the same tablet distribution.

Colocation Group Schema(CGS)

The generic, Colocation-related schema information that describes the tables in a CG. It includes the bucketing column types, the number of buckets, and the number of replicas.

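A table's data is placed into a bucket by hashing the bucketing column values and taking the result modulo the number of buckets. If a table has 8 buckets, there are 8 buckets [0, 1, 2, 3, 4, 5, 6, 7]; we call such a sequence a BucketsSequence. Each bucket contains one or more tablets. A single-partition table has exactly one tablet per bucket, and a multi-partition table has several, one per partition.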
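The bucketing rule above can be sketched in Python as follows. zlib.crc32 is only a stand-in here and does not reproduce Doris's actual hash function. The tablet count follows from one tablet per (partition, bucket) pair:

```python
import zlib

def bucket_of(value, num_buckets):
    # Stand-in hash (assumption): CRC32 of the value's text, modulo the bucket count
    return zlib.crc32(str(value).encode("utf-8")) % num_buckets

def tablets(partitions, num_buckets):
    # One tablet per (partition, bucket): a single-partition table has one tablet per bucket
    return [(p, b) for p in partitions for b in range(num_buckets)]

buckets_sequence = list(range(8))                  # the BucketsSequence [0, 1, ..., 7]
print(bucket_of(10004, 8) in buckets_sequence)     # True
print(len(tablets(["p1", "p2"], 8)))               # 16
```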

Restrictions

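  • (1) When creating the tables, their bucketing columns must have exactly the same types and number, and their bucket counts must be equal; only then can the tablets of the tables be distributed in one-to-one correspondence;
  • (2) All partitions of all tables in the same CG must have the same replica count. Otherwise, a replica of some tablet might have no corresponding replica of the other tables' tablets on the same BE;
  • (3) Tables in the same CG are not required to have the same number of partitions, partition ranges, or partition column types.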
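The three rules above can be expressed as a small compatibility check. This is a hypothetical model, not the FE's actual validation code: `TableSpec` and `cgs_violations` are illustrative names, and only the properties the rules mention are modeled.

```python
from dataclasses import dataclass
from typing import List, Tuple


@dataclass
class TableSpec:
    """Hypothetical description of a table's colocation-relevant properties."""
    name: str
    bucket_col_types: Tuple[str, ...]  # e.g. ("int",)
    buckets: int
    partition_replicas: List[int]      # replica count of each partition
    partition_col_type: str = ""       # not part of the CGS, see rule (3)


def cgs_violations(a: TableSpec, b: TableSpec) -> List[str]:
    """Return the reasons two tables cannot share a colocation group."""
    problems = []
    # Rule (1): bucketing column types and count, and bucket number, must match.
    if a.bucket_col_types != b.bucket_col_types:
        problems.append("bucketing column types/count differ")
    if a.buckets != b.buckets:
        problems.append("bucket counts differ")
    # Rule (2): every partition of every table must use the same replica count.
    if len(set(a.partition_replicas) | set(b.partition_replicas)) > 1:
        problems.append("replica counts differ")
    # Rule (3): partition count, ranges and partition column type are not checked.
    return problems


tbl1 = TableSpec("tbl1_j", ("int",), 8, [1, 1], "date")
tbl2 = TableSpec("tbl2_j", ("int",), 8, [1], "datetime")
```

With the two example tables created below (int bucketing column, 8 buckets, one replica), the check passes even though their partitioning differs.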

Usage

Creating tables

First create two tables whose bucketing columns are both of type int, each with 8 buckets:

CREATE TABLE `tbl1_j` (
    `k1` date NOT NULL COMMENT "",
    `k2` int(11) NOT NULL COMMENT "",
    `v1` int(11) SUM NOT NULL COMMENT ""
) ENGINE=OLAP
AGGREGATE KEY(`k1`, `k2`)
PARTITION BY RANGE(`k1`)
(
    PARTITION p1 VALUES LESS THAN ('2019-05-31'),
    PARTITION p2 VALUES LESS THAN ('2019-06-30')
)
DISTRIBUTED BY HASH(`k2`) BUCKETS 8
PROPERTIES (
    "colocate_with" = "group1",
    "replication_num" = "1",
    "storage_medium" = "SSD"
);

CREATE TABLE `tbl2_j` (
    `k1` datetime NOT NULL COMMENT "",
    `k2` int(11) NOT NULL COMMENT "",
    `v1` double SUM NOT NULL COMMENT ""
) ENGINE=OLAP
AGGREGATE KEY(`k1`, `k2`)
DISTRIBUTED BY HASH(`k2`) BUCKETS 8
PROPERTIES (
    "colocate_with" = "group1",
    "replication_num" = "1",
    "storage_medium" = "SSD"
);

Querying

Write a query and inspect its execution plan (note "colocate: true" in the HASH JOIN node):

mysql> explain SELECT * FROM tbl1_j INNER JOIN tbl2_j ON (tbl1_j.k2 = tbl2_j.k2)\G
*************************** 1. row ***************************
Explain String: PLAN FRAGMENT 0
*************************** 2. row ***************************
Explain String:  OUTPUT EXPRS:`default_cluster:test`.`tbl1_j`.`k1` | `default_cluster:test`.`tbl1_j`.`k2` | `default_cluster:test`.`tbl1_j`.`v1` | `default_cluster:test`.`tbl2_j`.`k1` | `default_cluster:test`.`tbl2_j`.`k2` | `default_cluster:test`.`tbl2_j`.`v1`
*************************** 3. row ***************************
Explain String:   PARTITION: HASH_PARTITIONED: `default_cluster:test`.`tbl1_j`.`k2`
*************************** 4. row ***************************
Explain String:
*************************** 5. row ***************************
Explain String:   RESULT SINK
*************************** 6. row ***************************
Explain String:
*************************** 7. row ***************************
Explain String:   2:HASH JOIN
*************************** 8. row ***************************
Explain String:   |  join op: INNER JOIN
*************************** 9. row ***************************
Explain String:   |  hash predicates:
*************************** 10. row ***************************
Explain String:   |  colocate: true
*************************** 11. row ***************************
Explain String:   |  equal join conjunct: (`tbl1_j`.`k2` = `tbl2_j`.`k2`)
*************************** 12. row ***************************
Explain String:   |  runtime filters: RF000[in] <- `tbl2_j`.`k2`
*************************** 13. row ***************************
Explain String:   |  cardinality=0
*************************** 14. row ***************************
Explain String:   |  
*************************** 15. row ***************************
Explain String:   |----1:OlapScanNode
*************************** 16. row ***************************
Explain String:   |       TABLE: tbl2_j
*************************** 17. row ***************************
Explain String:   |       PREAGGREGATION: OFF. Reason: null
*************************** 18. row ***************************
Explain String:   |       partitions=0/1
*************************** 19. row ***************************
Explain String:   |       rollup: null
*************************** 20. row ***************************
Explain String:   |       tabletRatio=0/0
*************************** 21. row ***************************
Explain String:   |       tabletList=
*************************** 22. row ***************************
Explain String:   |       cardinality=0
*************************** 23. row ***************************
Explain String:   |       avgRowSize=28.0
*************************** 24. row ***************************
Explain String:   |       numNodes=1
*************************** 25. row ***************************
Explain String:   |    
*************************** 26. row ***************************
Explain String:   0:OlapScanNode
*************************** 27. row ***************************
Explain String:      TABLE: tbl1_j
*************************** 28. row ***************************
Explain String:      PREAGGREGATION: OFF. Reason: No AggregateInfo
*************************** 29. row ***************************
Explain String:      runtime filters: RF000[in] -> `tbl1_j`.`k2`
*************************** 30. row ***************************
Explain String:      partitions=0/2
*************************** 31. row ***************************
Explain String:      rollup: null
*************************** 32. row ***************************
Explain String:      tabletRatio=0/0
*************************** 33. row ***************************
Explain String:      tabletList=
*************************** 34. row ***************************
Explain String:      cardinality=0
*************************** 35. row ***************************
Explain String:      avgRowSize=24.0
*************************** 36. row ***************************
Explain String:      numNodes=1
36 rows in set (0.01 sec)

Viewing groups

mysql> SHOW PROC '/colocation_group';
+-------------+--------------+--------------+------------+-------------------------+----------+----------+----------+
| GroupId     | GroupName    | TableIds     | BucketsNum | ReplicaAllocation       | DistCols | IsStable | ErrorMsg |
+-------------+--------------+--------------+------------+-------------------------+----------+----------+----------+
| 10003.13036 | 10003_group1 | 13034, 13070 | 8          | tag.location.default: 1 | int(11)  | true     |          |
+-------------+--------------+--------------+------------+-------------------------+----------+----------+----------+
1 row in set (0.00 sec)

Deleting a group

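When the last table in a group is permanently deleted (that is, removed from the recycle bin; by default, a table deleted with DROP TABLE stays in the recycle bin for one day before it is actually deleted), the group is deleted automatically as well.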

Changing a table's colocation group
mysql> ALTER TABLE tbl1_j SET ("colocate_with" = "group2");
Query OK, 0 rows affected (0.00 sec)

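If the table was not previously assigned to a group, the command checks its schema and adds the table to the group (creating the group if it does not exist);
If the table was previously assigned to another group, the command first removes it from the original group and then adds it to the new group (creating the group if it does not exist).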

Removing a table from its colocation group
ALTER TABLE tbl SET ("colocate_with" = "");

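In addition, when you add a partition (ADD PARTITION) to a table with the Colocation property or change its replica count, Doris checks whether the change would violate the Colocation Group Schema and rejects it if it would.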
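The membership rules for `colocate_with` can be sketched as a small registry. This is a hypothetical toy model, not Doris's FE implementation: each group remembers its CGS (simplified here to bucketing column types, bucket count, and replica count), `set_colocate_with` mirrors `ALTER TABLE ... SET ("colocate_with" = ...)`, and a CGS mismatch is rejected before any membership changes.

```python
from typing import Dict, Set, Tuple

# Simplified CGS: (bucketing column types, bucket count, replica count).
CGS = Tuple[Tuple[str, ...], int, int]


class ColocationRegistry:
    """Toy model of colocation group membership (hypothetical, for illustration)."""

    def __init__(self) -> None:
        self.schemas: Dict[str, CGS] = {}
        self.members: Dict[str, Set[str]] = {}
        self.group_of: Dict[str, str] = {}

    def set_colocate_with(self, table: str, cgs: CGS, group: str) -> None:
        # Reject up front if the target group exists with a different CGS.
        if group and group in self.schemas and self.schemas[group] != cgs:
            raise ValueError(f"{table} violates the Colocation Group Schema of {group}")
        old = self.group_of.pop(table, None)
        if old is not None:
            self.members[old].discard(table)  # leave the previous group first
        if not group:
            return  # colocate_with = "" only removes the table from its group
        if group not in self.schemas:
            self.schemas[group] = cgs  # create the group on first use
            self.members[group] = set()
        self.members[group].add(table)
        self.group_of[table] = group


reg = ColocationRegistry()
int8 = (("int",), 8, 1)
reg.set_colocate_with("tbl1_j", int8, "group1")
reg.set_colocate_with("tbl2_j", int8, "group1")
```

Checking before mutating means a rejected change leaves the table in its original group, matching the "reject if it violates the schema" behavior described above.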

4. Bucket Shuffle Join

Bucket Shuffle Join was officially added in Doris 0.14. It provides locality optimization for certain join queries, reducing the time spent transferring data between nodes and thereby speeding up queries.

Principle

The conventional distributed join methods supported by Doris are shuffle join and broadcast join. Both can incur considerable network overhead.

For example, consider a join between tables A and B executed as a HashJoin. The costs of the different join types are:

  • 1. Broadcast Join: if, based on the data distribution, the query plan schedules 3 HashJoinNodes for table A, then table B must be sent in full to all 3 HashJoinNodes, so the network cost is 3B and the memory cost is also 3B;
  • 2. Shuffle Join: Shuffle Join hashes the data of both A and B and distributes it across the nodes of the cluster, so the network cost is A + B and the memory cost is B.

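The FE stores the data distribution of every Doris table. If the join statement hits a table's distribution column, this distribution information can be used to reduce the network and memory cost of the join. This is Bucket Shuffle Join, whose principle is shown in the figure below: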


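Here the SQL statement joins table A with table B, and the equality join expression hits A's distribution column. Bucket Shuffle Join uses A's data distribution to send B's data to the nodes that store and process the corresponding data of A. The costs of Bucket Shuffle Join are: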

  • 1. Network cost: B < min(3B, A + B);
  • 2. Memory cost: B <= min(3B, B).
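The cost comparison above can be written as a tiny model. It is a sketch under the text's simplifying assumptions only: costs are measured in units of table size, `left_instances` is the number of HashJoinNodes planned for A (3 in the example), and real overheads such as serialization are ignored.

```python
def join_costs(a: float, b: float, left_instances: int) -> dict:
    """Network/memory cost estimates from the text, in units of table size.

    a, b: sizes of the left table A and the right table B;
    left_instances: number of HashJoinNodes planned for A.
    """
    return {
        # B is copied in full to every HashJoinNode of A.
        "broadcast": {"network": left_instances * b, "memory": left_instances * b},
        # Both tables are re-hashed across the cluster; B is held once in total.
        "shuffle": {"network": a + b, "memory": b},
        # Only B moves, directly to the nodes holding the matching buckets of A.
        "bucket_shuffle": {"network": b, "memory": b},
    }


costs = join_costs(a=100, b=10, left_instances=3)
```

For A = 100, B = 10 and 3 join instances, Bucket Shuffle Join moves only B over the network while keeping the memory cost of Shuffle Join.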

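Compared with Broadcast Join and Shuffle Join, Bucket Shuffle Join therefore has a clear performance advantage, reducing both the data transferred between nodes and the memory used during the join. Relative to Doris's existing join methods, it offers the following advantages: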

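  • 1. Bucket Shuffle Join reduces network and memory cost and improves the performance of some join queries, especially when the FE can perform partition pruning and bucket pruning on the left table;
  • 2. Unlike Colocate Join, it is not intrusive to how table data is distributed: it is transparent to users, imposes no mandatory requirements on data distribution, and is less likely to cause data skew;
  • 3. It opens up more optimization possibilities for Join Reorder.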

Usage

First, check and set the variable:

mysql> show variables like '%bucket_shuffle_join%';
+----------------------------+-------+
| Variable_name              | Value |
+----------------------------+-------+
| enable_bucket_shuffle_join | true  |
+----------------------------+-------+
1 row in set (0.00 sec)

mysql> set enable_bucket_shuffle_join = true;

When the FE plans a distributed query, the order of preference is Colocate Join -> Bucket Shuffle Join -> Broadcast Join -> Shuffle Join. However, if the user explicitly specifies the join type, this priority no longer applies, for example:

select * from test join [shuffle] baseall on test.k1 = baseall.k1;
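The preference order and the effect of an explicit hint can be sketched as follows. This is a hypothetical model of the rule as stated above, not the FE planner: the strategy names are illustrative labels, "feasible" is supplied by the caller, and any cost-based decisions the real planner may make are ignored.

```python
from typing import Iterable, Optional

# Preference order stated in the text; the names are illustrative labels.
PRIORITY = ["colocate", "bucket_shuffle", "broadcast", "shuffle"]


def choose_join(feasible: Iterable[str], hint: Optional[str] = None) -> str:
    """Pick a distributed join strategy.

    An explicit hint (such as JOIN [shuffle]) overrides the priority;
    otherwise the first feasible strategy in PRIORITY wins.
    """
    if hint is not None:
        return hint
    available = set(feasible)
    for strategy in PRIORITY:
        if strategy in available:
            return strategy
    raise ValueError("no feasible join strategy")


picked = choose_join({"bucket_shuffle", "broadcast", "shuffle"})
forced = choose_join({"bucket_shuffle", "shuffle"}, hint="shuffle")
```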

You can check the join type with EXPLAIN:

mysql> EXPLAIN SELECT SUM(example_site_visit.cost) FROM example_site_visit JOIN example_site_visit2 ON example_site_visit.user_id = example_site_visit2.user_id;
+--------------------------------------------------------------------------------------------+
| Explain String                                                                             |
+--------------------------------------------------------------------------------------------+
| PLAN FRAGMENT 0                                                                            |
|  OUTPUT EXPRS:<slot 4> sum(`example_site_visit`.`cost`)                                    |
|   PARTITION: UNPARTITIONED                                                                 |
|                                                                                            |
|   RESULT SINK                                                                              |
|                                                                                            |
|   6:AGGREGATE (merge finalize)                                                             |
|   |  output: sum(<slot 3> sum(`example_site_visit`.`cost`))                                |
|   |  group by:                                                                             |
|   |  cardinality=-1                                                                        |
|   |                                                                                        |
|   5:EXCHANGE                                                                               |
|                                                                                            |
| PLAN FRAGMENT 1                                                                            |
|  OUTPUT EXPRS:                                                                             |
|   PARTITION: HASH_PARTITIONED: `default_cluster:test`.`example_site_visit`.`user_id`       |
|                                                                                            |
|   STREAM DATA SINK                                                                         |
|     EXCHANGE ID: 05                                                                        |
|     UNPARTITIONED                                                                          |
|                                                                                            |
|   3:AGGREGATE (update serialize)                                                           |
|   |  output: sum(`example_site_visit`.`cost`)                                              |
|   |  group by:                                                                             |
|   |  cardinality=1                                                                         |
|   |                                                                                        |
|   2:HASH JOIN                                                                              |
|   |  join op: INNER JOIN (BUCKET_SHUFFLE)                                                  |
|   |  hash predicates:                                                                      |
|   |  colocate: false, reason: Tables are not in the same group                             |
|   |  equal join conjunct: `example_site_visit`.`user_id` = `example_site_visit2`.`user_id` |
|   |  runtime filters: RF000[in] <- `example_site_visit2`.`user_id`                         |
|   |  cardinality=7                                                                         |
|   |                                                                                        |
|   |----4:EXCHANGE                                                                          |
|   |                                                                                        |
|   0:OlapScanNode                                                                           |
|      TABLE: example_site_visit                                                             |
|      PREAGGREGATION: ON                                                                    |
|      runtime filters: RF000[in] -> `example_site_visit`.`user_id`                          |
|      partitions=1/1                                                                        |
|      rollup: example_site_visit                                                            |
|      tabletRatio=10/10                                                                     |
|      tabletList=10231,10233,10235,10237,10239,10241,10243,10245,10247,10249                |
|      cardinality=7                                                                         |
|      avgRowSize=1496.4286                                                                  |
|      numNodes=1                                                                            |
|                                                                                            |
| PLAN FRAGMENT 2                                                                            |
|  OUTPUT EXPRS:                                                                             |
|   PARTITION: HASH_PARTITIONED: `default_cluster:test`.`example_site_visit2`.`user_id`      |
|                                                                                            |
|   STREAM DATA SINK                                                                         |
|     EXCHANGE ID: 04                                                                        |
|     BUCKET_SHFFULE_HASH_PARTITIONED: `example_site_visit2`.`user_id`                       |
|                                                                                            |
|   1:OlapScanNode                                                                           |
|      TABLE: example_site_visit2                                                            |
|      PREAGGREGATION: OFF. Reason: null                                                     |
|      partitions=1/1                                                                        |
|      rollup: example_site_visit2                                                           |
|      tabletRatio=10/10                                                                     |
|      tabletList=10255,10257,10259,10261,10263,10265,10267,10269,10271,10273                |
|      cardinality=6                                                                         |
|      avgRowSize=1358.0                                                                     |
|      numNodes=1                                                                            |
+--------------------------------------------------------------------------------------------+
66 rows in set (0.00 sec)

Notes

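  • (1) Bucket Shuffle Join only takes effect when the join condition is an equality, for the same reason as Colocate Join: both rely on hashing to compute a deterministic data distribution;
  • (2) When the equality join condition includes the tables' bucketing columns, and in particular when the left table's bucketing column appears in the equality join condition, the join is very likely to be planned as a Bucket Shuffle Join;
  • (3) Because different data types produce different hash values, Bucket Shuffle Join requires the type of the left table's bucketing column to match the type of the right table's equality join column; otherwise it cannot be planned;
  • (4) Bucket Shuffle Join applies only to Doris native OLAP tables; external tables such as ODBC, MySQL, and ES cannot use it when they are the left table;
  • (5) For partitioned tables, each partition may have a different data distribution rule, so Bucket Shuffle Join is only guaranteed to take effect when the left table has a single partition. When writing SQL, use WHERE conditions wherever possible so that partition pruning can take effect;
  • (6) If the left table is a Colocate table, the data distribution rule of every partition is fixed, so Bucket Shuffle Join performs better on Colocate tables.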
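Note (3) can be illustrated with a short sketch. It assumes `zlib.crc32` as a stand-in hash and hypothetical byte encodings for INT (4-byte little-endian) and VARCHAR (UTF-8); Doris's real encodings and hash differ, but the point carries over: the same logical value stored as different types hashes to different bytes, so right-side rows routed by one type's hash generally miss the left-side bucket computed from the other type.

```python
import zlib


def bucket_for_int(value: int, num_buckets: int) -> int:
    # Stand-in hash over a 4-byte little-endian INT encoding (assumption).
    return zlib.crc32(value.to_bytes(4, "little", signed=True)) % num_buckets


def bucket_for_varchar(value: str, num_buckets: int) -> int:
    # Stand-in hash over the UTF-8 bytes of a VARCHAR value (assumption).
    return zlib.crc32(value.encode("utf-8")) % num_buckets


# Keys whose INT and VARCHAR forms land in different buckets out of 8.
mismatches = [v for v in range(100)
              if bucket_for_int(v, 8) != bucket_for_varchar(str(v), 8)]
```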

5. Runtime Filter

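Runtime Filter was officially added in Doris 0.15. For certain join queries it dynamically generates filter conditions at run time, reducing the amount of data scanned and avoiding unnecessary I/O and network transfer, which speeds up the query.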

Principle

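A Runtime Filter is generated during query planning, built in the HashJoinNode, and applied in the ScanNode.
For example, consider a join between tables T1 and T2 executed as a HashJoin, where T1 is a fact table with 100,000 rows and T2 is a dimension table with 2,000 rows. Doris executes the join as follows: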


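Clearly, scanning T2 is much faster than scanning T1. If we deliberately wait before scanning T1, then once T2 has delivered its scanned rows to the HashJoinNode, the HashJoinNode can compute a filter condition from T2's data, such as T2's maximum and minimum values or a Bloom Filter. It then sends this filter to the ScanNode waiting to scan T1, which applies it and passes only the filtered rows to the HashJoinNode, reducing the number of hash-table probes and the network overhead. This filter condition is the Runtime Filter. The effect is as follows: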


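If the filter (Runtime Filter) can be pushed down to the storage engine, in some cases indexes can be used to reduce the amount of scanned data directly, greatly reducing scan time. The effect is as follows: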


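Thus, unlike predicate pushdown and partition pruning, a Runtime Filter is a filter condition generated dynamically at run time: while the query runs, the join on clause is parsed to determine the filter expression, and the expression is broadcast to the ScanNode reading the left table. This reduces the data scanned and, in turn, the number of hash-table probes, avoiding unnecessary I/O and network transfer.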

Runtime Filters are mainly intended to optimize joins against large tables. If the left table is too small, or the right table is too large, a Runtime Filter may not deliver the expected benefit.
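The T1/T2 example above can be simulated in a few lines of Python. This is a sketch with synthetic keys, not Doris code: the build side collects T2's join keys into an exact IN set (a Bloom Filter or min/max would play the same role less precisely), the scan of T1 applies it, and the hash join counts one probe per left row that reaches it.

```python
from collections import Counter
from typing import FrozenSet, Iterable, List, Tuple


def build_in_filter(right_keys: Iterable[int]) -> FrozenSet[int]:
    """Build side (HashJoinNode): collect the join keys seen in the right table."""
    return frozenset(right_keys)


def scan_left(left_keys: Iterable[int], rf: FrozenSet[int]) -> List[int]:
    """Scan side (ScanNode): drop left rows that cannot match before the join."""
    return [k for k in left_keys if k in rf]


def hash_join(left_keys: List[int], right_keys: Iterable[int]) -> Tuple[int, int]:
    """Return (number of joined rows, number of hash-table probes)."""
    table = Counter(right_keys)                # build the hash table from the right table
    joined = sum(table[k] for k in left_keys)  # one probe per left row
    return joined, len(left_keys)


t1 = list(range(100_000))          # synthetic fact table keys
t2 = list(range(0, 100_000, 50))   # synthetic dimension table keys (2,000 rows)
plain = hash_join(t1, t2)
filtered = hash_join(scan_left(t1, build_in_filter(t2)), t2)
```

The join result is identical in both cases; only the number of rows scanned into the join, and therefore probed, shrinks.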

Usage

First, specify the runtime filter types:

mysql> set runtime_filter_type="BLOOM_FILTER,IN,MIN_MAX";
Query OK, 0 rows affected (0.00 sec)

Then create the tables and insert data:

CREATE TABLE test (t1 INT) DISTRIBUTED BY HASH (t1) BUCKETS 2 PROPERTIES("replication_num" = "1", "storage_medium" = "SSD");
INSERT INTO test VALUES (1), (2), (3), (4);

CREATE TABLE test2 (t2 INT) DISTRIBUTED BY HASH (t2) BUCKETS 2 PROPERTIES("replication_num" = "1", "storage_medium" = "SSD");
INSERT INTO test2 VALUES (3), (4), (5);

Inspect the execution plan:

mysql> EXPLAIN SELECT t1 FROM test JOIN test2 where test.t1 = test2.t2;
+---------------------------------------------------------------------------------------------------------------+
| Explain String                                                                                                |
+---------------------------------------------------------------------------------------------------------------+
| PLAN FRAGMENT 0                                                                                               |
|  OUTPUT EXPRS:`t1`                                                                                            |
|   PARTITION: UNPARTITIONED                                                                                    |
|                                                                                                               |
|   RESULT SINK                                                                                                 |
|                                                                                                               |
|   4:EXCHANGE                                                                                                  |
|                                                                                                               |
| PLAN FRAGMENT 1                                                                                               |
|  OUTPUT EXPRS:                                                                                                |
|   PARTITION: HASH_PARTITIONED: `default_cluster:test`.`test`.`t1`                                             |
|                                                                                                               |
|   STREAM DATA SINK                                                                                            |
|     EXCHANGE ID: 04                                                                                           |
|     UNPARTITIONED                                                                                             |
|                                                                                                               |
|   2:HASH JOIN                                                                                                 |
|   |  join op: INNER JOIN (BUCKET_SHUFFLE)                                                                     |
|   |  hash predicates:                                                                                         |
|   |  colocate: false, reason: Tables are not in the same group                                                |
|   |  equal join conjunct: `test`.`t1` = `test2`.`t2`                                                          |
|   |  runtime filters: RF000[in] <- `test2`.`t2`, RF001[bloom] <- `test2`.`t2`, RF002[min_max] <- `test2`.`t2` |
|   |  cardinality=0                                                                                            |
|   |                                                                                                           |
|   |----3:EXCHANGE                                                                                             |
|   |                                                                                                           |
|   0:OlapScanNode                                                                                              |
|      TABLE: test                                                                                              |
|      PREAGGREGATION: ON                                                                                       |
|      runtime filters: RF000[in] -> `test`.`t1`, RF001[bloom] -> `test`.`t1`, RF002[min_max] -> `test`.`t1`    |
|      partitions=1/1                                                                                           |
|      rollup: test                                                                                             |
|      tabletRatio=2/2                                                                                          |
|      tabletList=13092,13094                                                                                   |
|      cardinality=0                                                                                            |
|      avgRowSize=4.0                                                                                           |
|      numNodes=1                                                                                               |
|                                                                                                               |
| PLAN FRAGMENT 2                                                                                               |
|  OUTPUT EXPRS:                                                                                                |
|   PARTITION: HASH_PARTITIONED: `default_cluster:test`.`test2`.`t2`                                            |
|                                                                                                               |
|   STREAM DATA SINK                                                                                            |
|     EXCHANGE ID: 03                                                                                           |
|     BUCKET_SHFFULE_HASH_PARTITIONED: `test2`.`t2`                                                             |
|                                                                                                               |
|   1:OlapScanNode                                                                                              |
|      TABLE: test2                                                                                             |
|      PREAGGREGATION: ON                                                                                       |
|      partitions=1/1                                                                                           |
|      rollup: test2                                                                                            |
|      tabletRatio=2/2                                                                                          |
|      tabletList=13099,13101                                                                                   |
|      cardinality=0                                                                                            |
|      avgRowSize=4.0                                                                                           |
|      numNodes=1                                                                                               |
+---------------------------------------------------------------------------------------------------------------+
56 rows in set (0.00 sec)

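As shown, the HASH JOIN generates an IN predicate with ID RF000, whose key values from test2.t2 are known only at run time; the OlapScanNode uses this IN predicate to filter out unnecessary data while reading test.t1. Because all three types were enabled, RF001 (Bloom Filter) and RF002 (MinMax Filter) are generated in the same way.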

Check the effect through the profile:

mysql> set enable_profile=true;
Query OK, 0 rows affected (0.01 sec)

mysql> SELECT t1 FROM test JOIN test2 where test.t1 = test2.t2;
+------+
| t1   |
+------+
|    3 |
|    4 |
+------+
2 rows in set (0.05 sec)

Then view the execution details at http://scentos:8030/QueryProfile/, as shown below:

The profile shows, for each Runtime Filter, whether it was pushed down, the wait time, and the total time from OLAP_SCAN_NODE prepare until the Runtime Filter was received:

                        RuntimeFilter:in:
                              -  HasPushDownToEngine:  true
                              -  AWaitTimeCost:  0ns
                              -  EffectTimeCost:  6.699ms
                              -

The OLAP_SCAN_NODE section of the profile shows the filtering effect and time cost after the Runtime Filter is pushed down:

                              -  RowsVectorPredFiltered:  0
                              ....
                              -  VectorPredEvalTime:  346ns

Parameter details

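In most cases you only need to adjust the runtime_filter_type option and can leave the others at their defaults. It accepts BLOOM_FILTER, IN, and MIN_MAX (it can also be set as a number). IN is used by default; in some cases using Bloom Filter, MinMax Filter, and IN predicate together performs better. The meaning of each type is: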

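  • (1) Bloom Filter: has a certain false-positive rate, so it filters out slightly less data than expected, but it never makes the final result incorrect. In most cases a Bloom Filter improves performance or has no significant impact, but in some cases it degrades performance.
    Building and applying a Bloom Filter is relatively expensive, so when the filter rate is low or the left table is small, a Bloom Filter may degrade performance;
    Currently, a Bloom Filter can be pushed down to the storage engine only when applied to a Key column of the left table, and tests show that a Bloom Filter that is not pushed down often degrades performance;
    Currently, a Bloom Filter has short-circuit logic only when evaluated as an expression in the ScanNode: when the false-positive rate (cases that are actually false but judged true) is too high, the Bloom Filter stops being used. Once pushed down to the storage engine, there is no short-circuit logic, so a low filter rate may degrade performance.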

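  • (2) MinMax Filter: holds the maximum and minimum values and filters out data below the minimum or above the maximum. Its effectiveness depends on the type of the Key column in the join on clause and on the data distribution of the left and right tables:
    When the Key column in the join on clause is int/bigint/double and similar types, in the extreme case where the left and right tables have the same maximum and minimum it has no effect; conversely, when the right table's maximum is less than the left table's minimum, or the right table's minimum is greater than the left table's maximum, it is most effective;
    When the Key column in the join on clause is varchar or similar, applying a MinMax Filter often degrades performance.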

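  • (3) IN predicate: builds an IN predicate from all values of the join on clause's Key column in the right table and uses it to filter the left table. Compared with a Bloom Filter it is cheaper to build and apply, and it often performs better when the right table is small:
    By default, it is pushed down only when the right table has fewer than 1024 rows (adjustable via the session variable runtime_filter_max_in_num);
    IN predicates currently support merging;
    When IN predicate is specified together with other filters and the number of IN values does not reach runtime_filter_max_in_num, Doris tries to drop the other filters. The reason is that the IN predicate is an exact filter condition that filters efficiently on its own, so the other filters would only do wasted work. Currently, non-IN filters are dropped only when the producer and consumer of the Runtime Filter are in the same fragment.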
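The behavior of the three filter types can be sketched in Python. All names are hypothetical and this is not Doris's implementation: the Bloom Filter uses salted SHA-256 digests as stand-in hash functions, the IN threshold follows the runtime_filter_max_in_num description (no IN predicate when the right side has more rows than the limit), and `effective_filters` is a simplified model of the "drop the other filters" rule.

```python
import hashlib
from typing import Callable, Iterable, List, Optional, Set


class BloomFilter:
    """Minimal Bloom filter: may give false positives, never false negatives."""

    def __init__(self, num_bits: int = 1024, num_hashes: int = 3) -> None:
        self.num_bits = num_bits
        self.num_hashes = num_hashes
        self.bits = bytearray(num_bits)  # one byte per bit, kept simple for clarity

    def _positions(self, key: int) -> List[int]:
        # Salted SHA-256 digests act as num_hashes stand-in hash functions.
        return [
            int.from_bytes(hashlib.sha256(f"{i}:{key}".encode()).digest()[:8], "little")
            % self.num_bits
            for i in range(self.num_hashes)
        ]

    def add(self, key: int) -> None:
        for p in self._positions(key):
            self.bits[p] = 1

    def might_contain(self, key: int) -> bool:
        return all(self.bits[p] for p in self._positions(key))


def min_max_filter(right_keys: Iterable[int]) -> Callable[[int], bool]:
    """Keep only left keys inside [min, max] of the right table's keys."""
    keys = list(right_keys)
    lo, hi = min(keys), max(keys)
    return lambda k: lo <= k <= hi


def in_filter(right_keys: Iterable[int], max_in_num: int = 1024) -> Optional[Set[int]]:
    """Exact IN set, or None when the right side has more rows than max_in_num."""
    rows = list(right_keys)
    if len(rows) > max_in_num:
        return None
    return set(rows)


def effective_filters(requested: List[str], right_rows: int,
                      max_in_num: int = 1024, same_fragment: bool = True) -> List[str]:
    """Which requested filter types end up applied (simplified model)."""
    in_ok = "IN" in requested and right_rows <= max_in_num
    others = [f for f in requested if f != "IN"]
    if in_ok and same_fragment:
        return ["IN"]  # IN is exact, so the other filters would be wasted work
    return (["IN"] if in_ok else []) + others


right = [3, 4, 5]
bf = BloomFilter()
for key in right:
    bf.add(key)
mm = min_max_filter(right)
```

The MinMax case shows why distribution matters: a right table whose range is disjoint from the left table's filters everything, while identical ranges filter nothing.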

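The other query options usually need further tuning only in specific scenarios to reach the best results, typically after performance testing and only for queries that are resource-intensive, run long enough, and run frequently enough:
runtime_filter_mode: the pushdown strategy for Runtime Filters, one of OFF, LOCAL, and GLOBAL; the default is GLOBAL;
runtime_filter_wait_time_ms: how long the left table's ScanNode waits for each Runtime Filter; default 1000 ms;
runtime_filters_max_num: the maximum number of Bloom Filters among the Runtime Filters that can be applied per query; default 10;
runtime_bloom_filter_min_size: the minimum length of a Bloom Filter in a Runtime Filter; default 1048576 (1M);
runtime_bloom_filter_max_size: the maximum length of a Bloom Filter in a Runtime Filter; default 16777216 (16M);
runtime_bloom_filter_size: the default length of a Bloom Filter in a Runtime Filter; default 2097152 (2M);
runtime_filter_max_in_num: if the join's right table has more rows than this value, no IN predicate is generated; default 1024.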
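runtime_filter_wait_time_ms bounds how long the left table's ScanNode waits for a Runtime Filter. The following Python sketch models that hand-off with `threading.Event`; `RuntimeFilterSlot` and `scan` are hypothetical names, and the sketch assumes (as a simplification) that a scan whose filter misses the deadline simply proceeds without it.

```python
import threading
from typing import FrozenSet, Iterable, List, Optional


class RuntimeFilterSlot:
    """Hypothetical hand-off point between a HashJoinNode and a ScanNode."""

    def __init__(self) -> None:
        self._ready = threading.Event()
        self._keys: Optional[FrozenSet[int]] = None

    def publish(self, keys: Iterable[int]) -> None:
        """Called by the join's build side once the filter has been built."""
        self._keys = frozenset(keys)
        self._ready.set()

    def wait(self, wait_time_ms: int) -> Optional[FrozenSet[int]]:
        """Called by the scan: wait up to wait_time_ms, else return None."""
        if self._ready.wait(timeout=wait_time_ms / 1000):
            return self._keys
        return None


def scan(rows: Iterable[int], slot: RuntimeFilterSlot, wait_time_ms: int = 1000) -> List[int]:
    keys = slot.wait(wait_time_ms)
    if keys is None:
        return list(rows)  # filter did not arrive in time: scan everything
    return [r for r in rows if r in keys]
```

A longer wait gives the build side more time to finish but delays the scan; a shorter one risks scanning unfiltered data.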

Notes

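  • (1) Runtime Filters are generated only for equality conditions in the join on clause, excluding null-safe conditions, because those might filter out null values of the join's left table;
  • (2) Pushing Runtime Filters down to the left table of a left outer, full outer, or anti join is not supported;
  • (3) A src expr or target expr that is a constant is not supported;
  • (4) A src expr equal to the target expr is not supported;
  • (5) A src expr of type HLL or BITMAP is not supported;
  • (6) Runtime Filters can currently be pushed down only to OlapScanNode;
  • (7) A target expr containing NULL-checking expressions such as COALESCE/IFNULL/CASE is not supported, because when the join on clause of another join above an outer join contains a NULL-checking expression and generates a Runtime Filter, pushing that Runtime Filter down to the outer join's left table may produce incorrect results;
  • (8) A target expr whose column (slot) has no equivalent column in the original table is not supported;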
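  • (9) Column transitivity is not supported. This covers two cases:
    First, when the join on clause contains A.k = B.k and B.k = C.k, C.k can currently be pushed down only to B.k, not to A.k;
    Second, when the join on clause contains A.a + B.b = C.c, if A.a can be transferred to B.a, that is, A.a and B.a are equivalent columns, then B.a could replace A.a and the Runtime Filter could be pushed down to B (if A.a and B.a are not equivalent, it cannot be pushed down to B, because the target expr must be bound to exactly one left table of the join);
  • (10) The types of the target expr and src expr must be equal, because Bloom Filters are hash-based; if the types differ, Doris tries to cast the target expr to the type of the src expr;
  • (11) Runtime Filters generated from PlanNode.Conjuncts are not pushed down. Unlike those from the HashJoinNode's eqJoinConjuncts and otherJoinConjuncts, Runtime Filters generated from PlanNode.Conjuncts were found in testing to possibly produce wrong results. For example, when an IN subquery is converted into a join, the automatically generated join on clause is stored in PlanNode.Conjuncts, and applying a Runtime Filter there could drop some rows from the result.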
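Restriction (2) can be illustrated with a small sketch, using plain Python lists as stand-in tables (not Doris code): a left outer join must keep unmatched left rows, so pre-filtering the left side with a filter built from the right side removes rows that belong in the result, whereas an inner join is unaffected.

```python
from typing import List, Optional, Tuple


def left_outer_join(left: List[int], right: List[int]) -> List[Tuple[int, Optional[int]]]:
    """Keep every left row; pair it with the matching right key or with None."""
    right_set = set(right)
    return [(k, k if k in right_set else None) for k in left]


def inner_join(left: List[int], right: List[int]) -> List[Tuple[int, int]]:
    right_set = set(right)
    return [(k, k) for k in left if k in right_set]


left = [1, 2, 3, 4]
right = [3, 4, 5]
# Hypothetical mistake: an IN runtime filter built from `right` applied to the left scan.
pruned_left = [k for k in left if k in set(right)]

correct_outer = left_outer_join(left, right)
wrong_outer = left_outer_join(pruned_left, right)
```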

SQL Functions

View the built-in functions:

mysql> show builtin functions in test;
+--------------------------------+
| Function Name                  |
+--------------------------------+
| %element_extract%              |
| abs                            |
| acos                           |
| add                            |
| add_months                     |
| adddate                        |
| aes_decrypt                    |
......
| years_add                      |
| years_diff                     |
| years_sub                      |
| yearweek                       |
+--------------------------------+
302 rows in set (0.00 sec)

View the details of a specific built-in function:

mysql> show full builtin functions in test like 'year';
+----------------+-------------+---------------+-------------------+------------------------------------------------------------------------------------------------------------------------------+
| Signature      | Return Type | Function Type | Intermediate Type | Properties                                                                                                                   |
+----------------+-------------+---------------+-------------------+------------------------------------------------------------------------------------------------------------------------------+
| year(DATETIME) | INT         | Scalar        | NULL              | {"symbol":"_ZN5doris18TimestampFunctions4yearEPN9doris_udf15FunctionContextERKNS1_11DateTimeValE","object_file":"","md5":""} |
+----------------+-------------+---------------+-------------------+------------------------------------------------------------------------------------------------------------------------------+
1 row in set (0.05 sec)

See the official documentation for more.

Reference:
https://blog.csdn.net/qq_37475168/article/details/125754794
