Preface:
Presto is an open-source big-data OLAP engine developed by Facebook. It computes in memory, distributes work across nodes, and lets you analyze data directly with SQL, with no preprocessing required, similar to Impala and Spark SQL. Presto itself stores no data; it pulls data from external sources through connectors.
Presto's architecture has a few main roles:
Coordinator:
The coordinator parses statements, plans query execution, and manages the Presto worker nodes. A Presto installation must have one coordinator and can have multiple workers.
The coordinator tracks the activity of each worker and orchestrates query execution. For every query it builds an execution model made up of multiple stages; each stage is broken into tasks that are dispatched to different workers, and the coordinator then aggregates the results.
Worker:
Workers execute tasks and process data.
Discovery Server:
Usually embedded in the coordinator node, though it can be deployed separately. Workers register with it, which is how the coordinator knows how many workers exist and at what addresses.
The problem:
As mentioned above, a Presto cluster runs analytical workloads in memory with parallel task processing. For a single cluster with few machines, concurrency is low, but that turned out to be secondary: in many cases CPU and memory were completely maxed out, which directly hurt availability (the root cause being a tight budget: only three machines, all low-spec). The most obvious bottleneck appeared at the Coordinator. When multiple queries ran concurrently, some were blocked outright, memory filled up entirely, and according to jstat the GC never stopped running. The workers, by contrast, were fine.
My initial idea was to add a second coordinator that shares the cluster's Discovery Server, then put nginx in front as a load balancer to split requests across the two coordinators, easing the pressure on the Coordinator while increasing concurrency.
nginx.conf
upstream backend {
    server 10.100.218.110:8090;
    server 10.101.126.93:8090;
}
server {
    listen       8060;
    server_name  tt;
    location / {
        root   html;
        index  index.html index.htm;
        proxy_pass http://backend;
        proxy_redirect off;
        proxy_set_header Host $http_host;
        proxy_set_header X-Real-IP $remote_addr;
        proxy_set_header X-Forwarded-For $proxy_add_x_forwarded_for;
    }
    error_page   500 502 503 504  /50x.html;
    location = /50x.html {
        root   html;
    }
}
Coordinator-1 (the original coordinator) config:
coordinator=true
node-scheduler.include-coordinator=true
http-server.http.port=8090
discovery-server.enabled=true
discovery.uri=http://10.100.218.110:8090
query.max-memory-per-node=6GB
query.max-total-memory-per-node=8GB
query.low-memory-killer.policy=total-reservation
http-server.max-request-header-size=32MB
http-server.max-response-header-size=32MB
task.concurrency=16
Coordinator-2 (the newly added coordinator) config:
coordinator=true
node-scheduler.include-coordinator=false
http-server.http.port=8090
discovery-server.enabled=false
discovery.uri=http://10.100.218.110:8090
query.max-memory-per-node=4GB
query.max-total-memory-per-node=5GB
query.low-memory-killer.policy=total-reservation
http-server.max-request-header-size=32MB
http-server.max-response-header-size=32MB
task.concurrency=16
但是渤滞,出問題了贬墩,所有查詢都失敗,原因是presto-jdbc執(zhí)行查詢請求并不只是發(fā)一個http請求就完事兒妄呕,而是會有多個請求去發(fā)送預處理陶舞,賦值,查詢隊列狀況趴腋,獲取執(zhí)行結果等等吊说。通過Fiddler抓包的結果也驗證了這一點
To solve this, all of these requests have to be routed to the same Coordinator node. My first idea was to intercept each request as the presto-jdbc driver sends it, force the thread id into a header, and have nginx route by that thread id modulo the number of coordinators.
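For reference, nginx can pin requests by an arbitrary key with its stock hash directive. A sketch, assuming the driver were patched to send a hypothetical X-Route-Key header carrying the thread id (the header name is my own invention, not part of presto-jdbc):

```nginx
upstream backend {
    # consistent hashing on the client-supplied key keeps all requests
    # for one query on the same coordinator
    hash $http_x_route_key consistent;
    server 10.100.218.110:8090;
    server 10.101.126.93:8090;
}
```

This keeps the routing concern out of the application, but it still requires modifying or wrapping the driver so the header is actually sent.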
Later I realized routing can also be done on the application side: register two data sources and, whenever a connection is requested, pick one based on the current thread id modulo the number of data sources.
PrestoRouteDataSource (the multi-data-source implementation)
import com.zaxxer.hikari.HikariDataSource;

import javax.sql.DataSource;
import java.io.PrintWriter;
import java.sql.*;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.logging.Logger;

public class PrestoRouteDataSource implements DataSource {

    // One Hikari pool per coordinator; the ";"-separated JDBC URLs come from configuration.
    private final CopyOnWriteArrayList<DataSource> corAddr = new CopyOnWriteArrayList<>();

    public PrestoRouteDataSource(String addr) {
        for (String s : addr.split(";")) {
            HikariDataSource hikariDataSource = new HikariDataSource();
            hikariDataSource.setJdbcUrl(s);
            hikariDataSource.setUsername("root");
            hikariDataSource.setDriverClassName("com.facebook.presto.jdbc.PrestoDriver");
            hikariDataSource.setPoolName("presto:" + s);
            corAddr.add(hikariDataSource);
        }
    }

    // Route by thread id: keep at most the last three digits of the id, then mod
    // by the pool count, so every connection a given thread opens hits the same coordinator.
    private DataSource getPrestoRouteDataSource() {
        long sessionId = Thread.currentThread().getId();
        String as = String.valueOf(sessionId);
        int subIndex = (as.length() >= 4) ? (as.length() - 3) : (as.length() - 1);
        int delSessionId = Integer.parseInt(as.substring(subIndex));
        int index = delSessionId % corAddr.size();
        return corAddr.get(index);
    }

    @Override
    public Connection getConnection() throws SQLException {
        return getPrestoRouteDataSource().getConnection();
    }

    @Override
    public Connection getConnection(String username, String password) throws SQLException {
        // Credentials are fixed per pool, so the arguments are intentionally ignored.
        return getPrestoRouteDataSource().getConnection();
    }

    @Override
    public <T> T unwrap(Class<T> iface) throws SQLException {
        throw new UnsupportedOperationException();
    }

    @Override
    public boolean isWrapperFor(Class<?> iface) throws SQLException {
        throw new UnsupportedOperationException();
    }

    @Override
    public PrintWriter getLogWriter() throws SQLException {
        throw new UnsupportedOperationException();
    }

    @Override
    public void setLogWriter(PrintWriter out) throws SQLException {
        throw new UnsupportedOperationException();
    }

    @Override
    public void setLoginTimeout(int seconds) throws SQLException {
        throw new UnsupportedOperationException();
    }

    @Override
    public int getLoginTimeout() throws SQLException {
        throw new UnsupportedOperationException();
    }

    @Override
    public Logger getParentLogger() throws SQLFeatureNotSupportedException {
        throw new UnsupportedOperationException();
    }
}
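To make the routing rule concrete, here is a small standalone sketch (the class name RouteIndexDemo is mine, not from the project) that reproduces the thread-id-to-index computation so it can be run and checked in isolation:

```java
public class RouteIndexDemo {

    // Same rule as PrestoRouteDataSource#getPrestoRouteDataSource:
    // keep at most the last three digits of the thread id as a number,
    // then mod by the number of configured coordinators.
    static int routeIndex(long threadId, int poolCount) {
        String as = String.valueOf(threadId);
        int subIndex = (as.length() >= 4) ? (as.length() - 3) : (as.length() - 1);
        return Integer.parseInt(as.substring(subIndex)) % poolCount;
    }

    public static void main(String[] args) {
        // With two coordinators, odd thread ids map to index 1 and even ids
        // to index 0, and a given thread always maps to the same index.
        System.out.println(routeIndex(57, 2));   // 1
        System.out.println(routeIndex(1234, 2)); // 0
    }
}
```

With two data sources this effectively degrades to odd/even thread ids; truncating to the last three digits just keeps the parsed number small for very large ids.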
Data source configuration
import org.apache.ibatis.session.SqlSessionFactory;
import org.mybatis.spring.SqlSessionFactoryBean;
import org.mybatis.spring.SqlSessionTemplate;
import org.mybatis.spring.annotation.MapperScan;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.core.io.Resource;
import org.springframework.core.io.support.PathMatchingResourcePatternResolver;
import org.springframework.core.io.support.ResourcePatternResolver;
import org.springframework.jdbc.datasource.DataSourceTransactionManager;

import javax.sql.DataSource;
import java.io.IOException;
import java.util.*;

@Configuration
@MapperScan(basePackages = {"lvye.java.datamiddle.dao.presto"}, sqlSessionFactoryRef = "prestoSqlSessionFactory")
public class PerstoDataSourceConfig {

    @Value("${presto.jdbc-url}")
    private String url;

    @Bean(name = "prestoDataSource")
    public DataSource prestoDataSource() {
        return new PrestoRouteDataSource(url);
    }

    @Bean(name = "prestoSqlSessionFactory")
    public SqlSessionFactory mallSqlSessionFactory(@Qualifier("prestoDataSource") DataSource mallDataSource)
            throws Exception {
        final SqlSessionFactoryBean sessionFactory = new SqlSessionFactoryBean();
        sessionFactory.setTypeAliasesPackage("lvye.java.datamiddle.model.entity");
        sessionFactory.setDataSource(mallDataSource);
        sessionFactory.setMapperLocations(resolveMapperLocations());
        try {
            // Enable underscore-to-camelCase column mapping
            Objects.requireNonNull(sessionFactory.getObject()).getConfiguration().setMapUnderscoreToCamelCase(true);
            sessionFactory.getObject().getConfiguration().setJdbcTypeForNull(null);
        } catch (Exception e) {
            e.printStackTrace();
            throw new RuntimeException(e);
        }
        return sessionFactory.getObject();
    }

    private Resource[] resolveMapperLocations() {
        ResourcePatternResolver resourceResolver = new PathMatchingResourcePatternResolver();
        List<String> mapperLocations = new ArrayList<>();
        mapperLocations.add("classpath*:/mapper/presto/*.xml");
        List<Resource> resources = new ArrayList<>();
        for (String mapperLocation : mapperLocations) {
            try {
                Resource[] mappers = resourceResolver.getResources(mapperLocation);
                resources.addAll(Arrays.asList(mappers));
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
        return resources.toArray(new Resource[0]);
    }

    @Bean(name = "prestoTransactionManager")
    public DataSourceTransactionManager mallTransactionManager(@Qualifier("prestoDataSource") DataSource dataSource) {
        return new DataSourceTransactionManager(dataSource);
    }

    @Bean(name = "prestoTemplate")
    public SqlSessionTemplate prestoJdbcTemplate(@Qualifier("prestoSqlSessionFactory") SqlSessionFactory sqlSessionFactory) {
        return new SqlSessionTemplate(sqlSessionFactory);
    }
}
application.properties
presto.jdbc-url=jdbc:presto://10.100.218.110:8090/kudu/tclydm;jdbc:presto://10.101.126.93:8090/kudu/tclydm
Test 1 (multiple coordinators)
Test 2 (single coordinator)
Performance comparison:
(multi-coordinator) total: 37.66s, average: 3.13s, max: 6.67s, min: 764ms
(single coordinator) total: 131.01s, average: 10.91s, max: 15.78s, min: 1.6s
Closing note: the Presto team has clearly recognized this limitation as well. Multi-coordinator clusters are now on the roadmap, but no released version ships the feature yet.