Presto多協(xié)調者實現(xiàn)

寫在前面:
Presto 是facebook開發(fā)的一款開源大數(shù)據(jù)OLAP引擎况木,基于內存計算和作業(yè)分發(fā),直接使用sql分析,不需要數(shù)據(jù)預處理陶贼,類似于impala和sparksql,而且它本身不保存任何數(shù)據(jù)待秃,通過連接數(shù)據(jù)源取數(shù)拜秧。

在Presto的架構中,主要有幾種角色

Coordinator:
Coordinator服務器是用來解析語句章郁,執(zhí)行計劃分析和管理Presto的worker結點枉氮。Presto安裝必須有一個Coordinator和多個worker。
Coordinator跟蹤每個work的活動情況并協(xié)調查詢語句的執(zhí)行暖庄。 Coordinator為每個查詢建立模型聊替,模型包含多個stage,每個stage再轉為task分發(fā)到不同的worker上執(zhí)行培廓。Coordinator再做結果的聚合惹悄。

Worker:
Worker是負責執(zhí)行任務和處理數(shù)據(jù)。

Discovery Server:
通常內嵌于coordinator節(jié)點中肩钠,也可以單獨部署泣港,用于Worker的注冊象缀,讓Coordinator知道Worker有多少,地址是啥

問題:
前面提到爷速,presto集群是基于內存和作業(yè)并行處理來執(zhí)行分析任務央星,對于機器不多的單個集群來講,并發(fā)很低惫东,不過這倒是次要的莉给,很多情況下,會出現(xiàn)cpu和內存打滿的情形廉沮,這就直接影響到了可用性(最主要的原因是窮颓遏,機器的硬件都是低配且機器只有三臺),而最為明顯的瓶頸就首先出現(xiàn)在了Coordinator上滞时,多條查詢語句并發(fā)執(zhí)行時叁幢,有一些直接被阻塞了,而且內存直接打滿坪稽,通過jstat命令查看曼玩,GC就沒停下來過,但是Worker的話窒百,都還好

屏幕截圖 2021-03-31 162921.jpg

開始我的設想是黍判,增加一臺協(xié)調者,使用集群的Discovery Server篙梢,然后使用nginx做負載顷帖,將請求分流到兩臺協(xié)調者上,以減輕Coordinator壓力同時增強并發(fā)能力

nginx.conf

   upstream backend {
      server 10.100.218.110:8090;
      server 10.101.126.93:8090;
    }


    server {
        listen       8060;
        server_name  tt;

        location / {
            root   html;
            index  index.html index.htm;
            proxy_pass http://backend;
            proxy_redirect off;
            proxy_set_header Host $http_host;
            proxy_set_header X-Real-IP $remote_addr;
            proxy_set_header X-Forwarded-For $proxy_add_x_forwarded_for;
        }

        error_page   500 502 503 504  /50x.html;
        location = /50x.html {
            root   html;
        }

    }

Coordinator-1(最開始的協(xié)調者) config:

coordinator=true
node-scheduler.include-coordinator=true
http-server.http.port=8090
discovery-server.enabled=true
discovery.uri=http://10.100.218.110:8090
query.max-memory-per-node=6GB
query.max-total-memory-per-node=8GB
query.low-memory-killer.policy=total-reservation
http-server.max-request-header-size=32MB
http-server.max-response-header-size=32MB
task.concurrency=16

Coordinator-2(新加入的協(xié)調者) config:

coordinator=true
node-scheduler.include-coordinator=false
http-server.http.port=8090
discovery-server.enabled=false
discovery.uri=http://10.100.218.110:8090
query.max-memory-per-node=4GB
query.max-total-memory-per-node=5GB
query.low-memory-killer.policy=total-reservation
http-server.max-request-header-size=32MB
http-server.max-response-header-size=32MB
task.concurrency=16

但是渤滞,出問題了贬墩,所有查詢都失敗,原因是presto-jdbc執(zhí)行查詢請求并不只是發(fā)一個http請求就完事兒妄呕,而是會有多個請求去發(fā)送預處理陶舞,賦值,查詢隊列狀況趴腋,獲取執(zhí)行結果等等吊说。通過Fiddler抓包的結果也驗證了這一點

企業(yè)微信截圖_16171750768073.png

要想解決這個問題就必須將這些請求路由到同一個Coordinator節(jié)點上,我預先設想的是在presto-jdbc驅動發(fā)送請求的時候攔截下來优炬,然后在Header里面強行塞線程id,然后nginx再根據(jù)每個請求的線程id來求模做路由

后面發(fā)現(xiàn)厅贪,在應用端做路由也是可行的,就是注入兩個數(shù)據(jù)源蠢护,獲取連接的時候根據(jù)線程id求模來決定走哪個連接

PrestoRouteDataSource (多數(shù)據(jù)源實現(xiàn))


import com.zaxxer.hikari.HikariDataSource;

import javax.sql.DataSource;
import java.io.PrintWriter;
import java.sql.*;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.logging.Logger;


public class PrestoRouteDataSource implements DataSource {


    private CopyOnWriteArrayList<DataSource> corAddr = new CopyOnWriteArrayList<DataSource>();


    public PrestoRouteDataSource(String addr) {

            String[] split = addr.split(";");
            for (String s : split) {
                HikariDataSource hikariDataSource = new HikariDataSource();
                hikariDataSource.setJdbcUrl(s);
                hikariDataSource.setUsername("root");
                hikariDataSource.setDriverClassName("com.facebook.presto.jdbc.PrestoDriver");
                hikariDataSource.setPoolName("presto:" + s);
                corAddr.add(hikariDataSource);
            }

    }

    private DataSource getPrestoRouteDataSource(){
        long sessionId =  Thread.currentThread().getId();
        String as = String.valueOf(sessionId);
        int subIndex = (as.length()>=4)?(as.length()-3):(as.length()-1);
        int delSessionId =  Integer.parseInt(as.substring(subIndex));
        int index = delSessionId % corAddr.size() ;
        DataSource dataSource = corAddr.get(index);
        return dataSource ;
    }

    @Override
    public Connection getConnection() throws SQLException {


        return getPrestoRouteDataSource().getConnection();
    }

    @Override
    public Connection getConnection(String username, String password) throws SQLException {
        return getPrestoRouteDataSource().getConnection();
    }

    @Override
    public <T> T unwrap(Class<T> iface) throws SQLException {
        throw new UnsupportedOperationException();
    }

    @Override
    public boolean isWrapperFor(Class<?> iface) throws SQLException {
        throw new UnsupportedOperationException();
    }

    @Override
    public PrintWriter getLogWriter() throws SQLException {
        throw new UnsupportedOperationException();
    }

    @Override
    public void setLogWriter(PrintWriter out) throws SQLException {
        throw new UnsupportedOperationException();
    }

    @Override
    public void setLoginTimeout(int seconds) throws SQLException {
        throw new UnsupportedOperationException();
    }

    @Override
    public int getLoginTimeout() throws SQLException {
        throw new UnsupportedOperationException();
    }

    @Override
    public Logger getParentLogger() throws SQLFeatureNotSupportedException {
        throw new UnsupportedOperationException();
    }
}


數(shù)據(jù)源配置


import org.apache.ibatis.session.SqlSessionFactory;
import org.mybatis.spring.SqlSessionFactoryBean;
import org.mybatis.spring.SqlSessionTemplate;
import org.mybatis.spring.annotation.MapperScan;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.core.io.Resource;
import org.springframework.core.io.support.PathMatchingResourcePatternResolver;
import org.springframework.core.io.support.ResourcePatternResolver;
import org.springframework.jdbc.datasource.DataSourceTransactionManager;

import javax.sql.DataSource;
import java.io.IOException;
import java.util.*;


@Configuration
@MapperScan(basePackages = {"lvye.java.datamiddle.dao.presto"}, sqlSessionFactoryRef = "prestoSqlSessionFactory")
public class PerstoDataSourceConfig {

    @Value("${presto.jdbc-url}")
    private String url;

    @Bean(name = "prestoDataSource")
    public DataSource prestoDataSource() {
        return new PrestoRouteDataSource(url);

    }

    @Bean(name = "prestoSqlSessionFactory")
    public SqlSessionFactory mallSqlSessionFactory(@Qualifier("prestoDataSource") DataSource mallDataSource)
            throws Exception {
        final SqlSessionFactoryBean sessionFactory = new SqlSessionFactoryBean();
        sessionFactory.setTypeAliasesPackage("lvye.java.datamiddle.model.entity");
        sessionFactory.setDataSource(mallDataSource);
        sessionFactory.setMapperLocations(resolveMapperLocations());
        try {
            //開啟駝峰命名轉換
            Objects.requireNonNull(sessionFactory.getObject()).getConfiguration().setMapUnderscoreToCamelCase(true);
            sessionFactory.getObject().getConfiguration().setJdbcTypeForNull(null);
        } catch (Exception e) {
            e.printStackTrace();
            throw new RuntimeException(e);
        }
        return sessionFactory.getObject();
    }

    private Resource[] resolveMapperLocations() {
        ResourcePatternResolver resourceResolver = new PathMatchingResourcePatternResolver();
        List<String> mapperLocations = new ArrayList<>();
        mapperLocations.add("classpath*:/mapper/presto/*.xml");
        List<Resource> resources = new ArrayList<>();
        for (String mapperLocation : mapperLocations) {
            try {
                Resource[] mappers = resourceResolver.getResources(mapperLocation);
                resources.addAll(Arrays.asList(mappers));
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
        return resources.toArray(new Resource[0]);
    }


    @Bean(name = "prestoTransactionManager")
    public DataSourceTransactionManager mallTransactionManager(@Qualifier("prestoDataSource") DataSource dataSource) {
        return new DataSourceTransactionManager(dataSource);
    }

    @Bean(name = "prestoTemplate")
    public SqlSessionTemplate prestoJdbcTemplate(@Qualifier("prestoSqlSessionFactory") SqlSessionFactory sqlSessionFactory) {
        return new SqlSessionTemplate(sqlSessionFactory);
    }

}


application.properties

presto.jdbc-url=jdbc:presto://10.100.218.110:8090/kudu/tclydm;jdbc:presto://10.101.126.93:8090/kudu/tclydm

測試1(多協(xié)調者)


multic.jpg

測試2(單協(xié)調者)


single.jpg

性能對比:
(多協(xié)調者)總耗時 37.66s 平均耗時:3.13s 最大耗時:6.67s 最小耗時:764ms

(單協(xié)調者)總耗時 131.01s 平均耗時:10.91s 最大耗時:15.78s 最小耗時:1.6s

寫在后面:presto官方顯然也意識到了這個問題,多協(xié)調者集群目前加入了路線圖养涮,目前版本還沒有發(fā)布

最后編輯于
?著作權歸作者所有,轉載或內容合作請聯(lián)系作者
  • 序言:七十年代末葵硕,一起剝皮案震驚了整個濱河市眉抬,隨后出現(xiàn)的幾起案子,更是在濱河造成了極大的恐慌懈凹,老刑警劉巖蜀变,帶你破解...
    沈念sama閱讀 219,589評論 6 508
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件,死亡現(xiàn)場離奇詭異介评,居然都是意外死亡库北,警方通過查閱死者的電腦和手機,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 93,615評論 3 396
  • 文/潘曉璐 我一進店門们陆,熙熙樓的掌柜王于貴愁眉苦臉地迎上來寒瓦,“玉大人,你說我怎么就攤上這事坪仇≡友” “怎么了?”我有些...
    開封第一講書人閱讀 165,933評論 0 356
  • 文/不壞的土叔 我叫張陵椅文,是天一觀的道長喂很。 經常有香客問我,道長皆刺,這世上最難降的妖魔是什么恤筛? 我笑而不...
    開封第一講書人閱讀 58,976評論 1 295
  • 正文 為了忘掉前任,我火速辦了婚禮芹橡,結果婚禮上毒坛,老公的妹妹穿的比我還像新娘。我一直安慰自己林说,他們只是感情好煎殷,可當我...
    茶點故事閱讀 67,999評論 6 393
  • 文/花漫 我一把揭開白布。 她就那樣靜靜地躺著腿箩,像睡著了一般豪直。 火紅的嫁衣襯著肌膚如雪。 梳的紋絲不亂的頭發(fā)上珠移,一...
    開封第一講書人閱讀 51,775評論 1 307
  • 那天弓乙,我揣著相機與錄音,去河邊找鬼钧惧。 笑死暇韧,一個胖子當著我的面吹牛,可吹牛的內容都是我干的浓瞪。 我是一名探鬼主播懈玻,決...
    沈念sama閱讀 40,474評論 3 420
  • 文/蒼蘭香墨 我猛地睜開眼,長吁一口氣:“原來是場噩夢啊……” “哼乾颁!你這毒婦竟也來了涂乌?” 一聲冷哼從身側響起艺栈,我...
    開封第一講書人閱讀 39,359評論 0 276
  • 序言:老撾萬榮一對情侶失蹤,失蹤者是張志新(化名)和其女友劉穎湾盒,沒想到半個月后湿右,有當?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體,經...
    沈念sama閱讀 45,854評論 1 317
  • 正文 獨居荒郊野嶺守林人離奇死亡罚勾,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內容為張勛視角 年9月15日...
    茶點故事閱讀 38,007評論 3 338
  • 正文 我和宋清朗相戀三年毅人,在試婚紗的時候發(fā)現(xiàn)自己被綠了。 大學時的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片荧库。...
    茶點故事閱讀 40,146評論 1 351
  • 序言:一個原本活蹦亂跳的男人離奇死亡堰塌,死狀恐怖,靈堂內的尸體忽然破棺而出分衫,到底是詐尸還是另有隱情场刑,我是刑警寧澤,帶...
    沈念sama閱讀 35,826評論 5 346
  • 正文 年R本政府宣布蚪战,位于F島的核電站牵现,受9級特大地震影響,放射性物質發(fā)生泄漏邀桑。R本人自食惡果不足惜瞎疼,卻給世界環(huán)境...
    茶點故事閱讀 41,484評論 3 331
  • 文/蒙蒙 一、第九天 我趴在偏房一處隱蔽的房頂上張望壁畸。 院中可真熱鬧贼急,春花似錦、人聲如沸捏萍。這莊子的主人今日做“春日...
    開封第一講書人閱讀 32,029評論 0 22
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽令杈。三九已至走敌,卻和暖如春,著一層夾襖步出監(jiān)牢的瞬間逗噩,已是汗流浹背掉丽。 一陣腳步聲響...
    開封第一講書人閱讀 33,153評論 1 272
  • 我被黑心中介騙來泰國打工, 沒想到剛下飛機就差點兒被人妖公主榨干…… 1. 我叫王不留异雁,地道東北人捶障。 一個月前我還...
    沈念sama閱讀 48,420評論 3 373
  • 正文 我出身青樓,卻偏偏與公主長得像片迅,于是被迫代替她去往敵國和親残邀。 傳聞我的和親對象是個殘疾皇子,可洞房花燭夜當晚...
    茶點故事閱讀 45,107評論 2 356

推薦閱讀更多精彩內容