前言
從Servlet3規(guī)范出來以后绢涡,利用Servlet3支持的異步特性,我們創(chuàng)建異步上下文asyncContext之后將它保存下來遣疯,同時不釋放雄可,那么這樣就達到了長連接的目的。同時在配合tomcat nio的使用缠犀,利用Servlet3構(gòu)建一個http長連接推送系統(tǒng)就有了支持基礎(chǔ)数苫,本篇文章將重點介紹基于Servlet3構(gòu)建http長連接推送系統(tǒng)的實踐。有關(guān)Servlet3異步的詳細介紹可以參看《servlet3異步原理與實踐》辨液。
一虐急、WEB網(wǎng)絡結(jié)構(gòu)及配置
1.1、網(wǎng)絡結(jié)構(gòu)
用戶訪問vip-->vip發(fā)布在lvs上-->lvs將請求轉(zhuǎn)發(fā)給后端的haproxy-->haproxy再把請求代理轉(zhuǎn)發(fā)給后端的nginx滔迈。vip實際路由發(fā)布在lvs上止吁,但是vip配置屬性在haproxy上(比如ACL, 域名,規(guī)則之類)
這里lvs轉(zhuǎn)發(fā)給后端的haproxy燎悍,用戶請求經(jīng)過lvs敬惦,但是響應是haproxy直接反饋給客戶端的,這也就是lvs的dr模式谈山。
1.2俄删、基本配置
我們知道http連接的特點就是一個request,一個response,然后關(guān)閉連接畴椰。這個過程包括建立連接和關(guān)閉連接臊诊。再往深處說就是調(diào)用了TCP/IP協(xié)議的三次握手,TCP協(xié)議多次傳輸斜脂,以及關(guān)閉連接的時候四次握手抓艳。頻繁的做這些操作肯定很耗費系統(tǒng)的資源。從HTTP1.1以后秽褒,開始支持keepalive 壶硅,比如瀏覽器一旦與服務器建立連接后,會保持住一段時間销斟,也就是減少了上面的握手和傳輸?shù)拇螖?shù),在這個時間段內(nèi)傳輸數(shù)據(jù)都是復用同一個連接椒舵。當客戶端主動告知關(guān)閉蚂踊,或者達到了TCP關(guān)閉的條件,TCP/IP再關(guān)閉笔宿。那么通過HTTP keepalive 機制就可以讓TCP連接保持住犁钟,具體保持多長時間可以通過參數(shù)來設置,下文會有介紹泼橘。
如果要保持長連接涝动,那么根據(jù)上圖的結(jié)構(gòu),瀏覽器與haproxy之間保持長連接(timeout http-keep-alive),haproxy與nginx之間保持長連接,nginx與tomcat之間保持長連接炬灭。我們的web應用架構(gòu)一般都是如上圖所示醋粟,會包含LVS、轉(zhuǎn)發(fā)重归、反向代理米愿。但簡單起來說就是nginx+tomcat,也就是虛線框內(nèi)標識的鼻吮,其實我們研發(fā)人員能接觸到的也是這兩層育苟,其余由運維和網(wǎng)絡組的同學來維護。那么我重點介紹一下nginx層的配置參數(shù)椎木。
http {
//...
keepalive_timeout 3600s; //Nginx 默認是支持 keepalive的违柏,是通過 keepalive_timeout 設置的,默認值是75s香椎。它表示在長連接開啟的情況下漱竖,在75s內(nèi)如果沒有 http 請求,則關(guān)閉長連接(其實就是關(guān)閉 tcp)
keepalive_requests 800; //此值容易被忽略士鸥,它是值在 keepalive_timeout 的時間范圍內(nèi)闲孤,一個長連接最大允許的請求次數(shù),如果超過此值,也會關(guān)閉此長連接讼积。默認值為100肥照。
gzip off; //這個在1.3中敘述
//...
upstream TEST_BACKEND {
server 192.168.1.1:8080 weight=1 max_fails=2 fail_timeout=30s;
server 192.168.1.2:8080 weight=1 max_fails=2 fail_timeout=30s;
keepalive 1000; //此處keepalive的含義不是開啟、關(guān)閉長連接的開關(guān)勤众;也不是用來設置超時的timeout舆绎;更不是設置長連接池最大連接數(shù);而是連接程池中最大空閑連接的數(shù)量
}
server {
listen 8080 default_server;
server_name "";
location / {
proxy_pass http://TEST_BACKEND;
//...
proxy_http_version 1.1; //指定 HTTP 版本,防止 1.0 版本導致 keepalive 無效们颜。
proxy_set_header Connection ""; //清空將客戶端的一些設置吕朵,防止導致 keepalive 無效
//...
}
}
}
1.3、Transfer-Encoding: chunked
普通短連接的時候瀏覽器根據(jù)連接關(guān)閉的狀態(tài)來寫response的內(nèi)容窥突。在長連接下努溃,一段時間內(nèi)傳輸?shù)膬?nèi)容,連接都是不關(guān)閉的阻问。因此如果沒有一種機制來告知什么節(jié)點吐出內(nèi)容梧税,瀏覽器就只能一直等待后面是否還有數(shù)據(jù),則遲遲不會寫response的內(nèi)容称近。那么我們可以想到利用Content-Length在傳輸之前標識一個包的大小第队,但是對于動態(tài)輸出的內(nèi)容,傳輸之前就不太好判斷Content-Length的長度刨秆。在HTTP1.1最新的規(guī)范中定義了一種傳輸方式凳谦,就是chunked,分塊編碼衡未。請求頭部加入 Transfer-Encoding: chunked 之后尸执,就代表這個報文采用了分塊編碼。報文中的實體需要改為用一系列分塊來傳輸眠屎。每個分塊包含十六進制的長度值和數(shù)據(jù)剔交,長度值獨占一行,長度不包括它結(jié)尾的 CRLF(\r\n)改衩,也不包括分塊數(shù)據(jù)結(jié)尾的 CRLF岖常。最后一個分塊長度值必須為 0,對應的分塊數(shù)據(jù)沒有內(nèi)容葫督,表示實體結(jié)束竭鞍。這樣在長連接下動態(tài)輸出內(nèi)容的時候瀏覽器就能夠判斷當前這次報文結(jié)束的位置了。
在1.2中我們留了一個gzip沒有介紹橄镜,我們知道開啟gzip偎快,在文本傳輸?shù)那闆r下,所需流量大約會降至1/4-1/3洽胶。在gzip關(guān)閉的情況下晒夹,以前長連接沒有任何問題,但是如果gzip打開,長連接則會失效丐怯。這是因為整個壓縮過程在內(nèi)存中完成喷好,是流式的。也就是說读跷,Nginx 不會等文件 gzip 完成再返回響應梗搅,而是邊壓縮邊響應,這樣可以顯著提高 TTFB(Time To First Byte效览,首字節(jié)時間无切,WEB 性能優(yōu)化重要指標)。這樣唯一的問題是丐枉,Nginx 開始返回響應時哆键,它無法知道將要傳輸?shù)奈募罱K有多大,
也就是無法給出 Content-Length 這個響應頭部瘦锹。因此根據(jù)chunked傳輸方式原理洼哎,解決了既可壓縮傳輸也能支持長連接方式傳輸了。
二沼本、HTTP長連接系統(tǒng)組成結(jié)構(gòu)
2.1、SESSION管理
SESSION是客戶端到服務端的一次會話或者說是連接會話锭沟,會話信息中保存了用戶PIN抽兆、連接創(chuàng)建時間、這次request產(chǎn)生的AsyncContext上下文信息族淮。我們會將會話信息保存到內(nèi)存一份辫红,
private Map<String, Session> sessions = new ConcurrentHashMap<String, Session>(); MAP的key為用戶PIN。同時把這份HASH數(shù)據(jù)也保存到redis一份祝辣,并設置好過期時間贴妻,具體設置多久沒有固定的標準,我們設置是8小時蝙斜。這個在心跳邏輯中名惩,如果沒有心跳會將SESSION信息刪除。
2.2孕荠、心跳
心跳的目的是判斷連接客戶端是否還活著娩鹉,隔一段時間比如5s發(fā)一次心跳包,一般是從客戶端往服務端發(fā)送心跳包稚伍,我們現(xiàn)在HTTP長連接是從服務端往客戶端發(fā)送弯予,當初的想法是節(jié)省客戶端資源。心跳的邏輯是從當前服務器內(nèi)存中輪詢出所有的會話信息个曙,在發(fā)送心跳包后如果收到錯誤信息則標記會失敗锈嫩,關(guān)閉上下文asyncContext.complete();this.asyncContext = null;同時從會話列表中刪除,內(nèi)存和redis中都要刪除。
2.3呼寸、消息接收
消息推送系統(tǒng)負責消息會話的創(chuàng)建艳汽、保持、心跳等舔、通知推送骚灸。另外一部分就是通過MQ接收業(yè)務變更信息,通過MQ的廣播機制保證每臺推送系統(tǒng)服務器都能夠收到業(yè)務變更信息慌植。
2.4甚牲、消息推送
利用了MQ的廣播所有的服務器都會收到消息,那么推送的時候是如何找到需要哪一臺服務器來負責推送任務呢蝶柿,在創(chuàng)建會話的時候我們將用戶會話信息保存到了本臺服務器的內(nèi)存中丈钙,那么只需要判斷消息中的USERPIN是否在本機內(nèi)存中即可。如果不在本機內(nèi)存直接丟棄該條消息交汤。通過MQ接收到業(yè)務信息雏赦,解析出USERPIN,再根據(jù)USERPIN找到會話芙扎,拿到asyncContext星岗,然后將通知包發(fā)送給客戶端。
2.5戒洼、消息追蹤
整個消息推送鏈相對比較長俏橘,需要做到對每個環(huán)節(jié)的埋點和跟蹤,便與后續(xù)問題的跟蹤處理圈浇。在業(yè)務中是通過kafka+hbase的方式寥掐,系統(tǒng)中把埋點數(shù)據(jù)寫到本地,由采集器將數(shù)據(jù)發(fā)送到kafka磷蜀,進而消費kafka插入到hbase集群召耘。
三、HTTP長連接系統(tǒng)時序調(diào)用
結(jié)合第二節(jié)和本節(jié)的時序圖我們清楚的知道實現(xiàn)一個推送系統(tǒng)主要包含會話維護褐隆、心跳污它、消息接收、消息推送妓灌,這其中共涉及以下三個數(shù)據(jù)包
創(chuàng)建會話連接包:{"protocol":1,"time":1510210650650,"state":"registered"}
心跳包:{"protocol":0,"time":1510211080780}
發(fā)送通知包:{"protocol":2,"time":1448610190241,"cmd":110001}
接下來看下重要環(huán)節(jié)的代碼實現(xiàn):
3.1轨蛤、創(chuàng)建會話(連接)
public Session createSession(String sessionId, HttpServletRequest request, HttpServletResponse response) {
//省略代碼...
try {
//省略代碼...
session = new HttpStreamingSession();
session.setSessionId(sessionId);
session.setValid(true);
session.setMaxInactiveInterval(this.getMaxInactiveInterval());
session.setCreationTime(System.currentTimeMillis());
session.setLastAccessedTime(System.currentTimeMillis());
session.setSessionManager(this);
session.setConnection(createHttpConnection(session, request, response));
//省略代碼...
return session;
} catch (Exception e) {
//省略代碼...
} finally {
//省略代碼...
}
return null;
}
public void connect(){
//省略代碼...
if (isClosed()) {
PushException e = new PushException("use a closed connection " + connectionId);
this.fireError(e);
}
try {
AsyncContext ac = request.startAsync();//開啟上下文
ac.setTimeout(this.asyncTimeout);
ac.addListener(new AsyncAdapter() {
/**
*
* @param asyncevent
*
**/
@Override
public void onError(AsyncEvent asyncevent) throws IOException {
session.close();
}
/**
*
* @param asyncevent
*
**/
@Override
public void onTimeout(AsyncEvent asyncevent) throws IOException {
session.close();
}
});
this.asyncContext = ac;//保存上下文
} catch (Exception e) {
this.fireError(new PushException("StartAsync exception! May be the servlet or filter is not async.", e));
} finally {
//省略代碼...
}
}
3.2、心跳邏輯
public void run() {//線程循環(huán)發(fā)送
while (!this.stop) {
try {
Thread.sleep(getCheckPeriod());//停5秒
} catch (InterruptedException e) {
}
if(this.stop)
break;
//省略代碼...
try {
//省略代碼...
Map<String, Set<String>> result = heartbeatBroadcast(MessageProtocol.generateHeartBeat());//調(diào)用心跳方法
//省略代碼...
} catch (Exception e) {
//省略代碼...
_logger.error("check destination! ", e);
} finally {
//省略代碼...
}
}
}
protected Map<String, Set<String>> heartbeatBroadcast(String msg) {
if(isEmpty())
return null;
Map<String, Set<String>> result = new HashMap<String, Set<String>>(2);
//省略代碼...
for(Iterator<String> it = httpSessionManager.getSessionKeys().iterator(); it.hasNext(); ) {
try {
identity = it.next();
session = httpSessionManager.getSession(identity);
if(session.expire()) {//只有 session 過期后才發(fā)送心跳
_logger.info("--befor hear beat --SessionId:"+session.getSessionId());
session.getConnection().send(msg);
session.access();
//省略代碼...
}
} catch (Exception e) {
//省略代碼...
}
}
return result;
}
3.3虫埂、消息接收
public void onMessage(List<Message> messages) throws Exception {
if (messages == null || messages.isEmpty()) {
return;
}
for (Message message : messages) {
//省略代碼...
//處理消息
}
}
3.4祥山、消息推送
public void sendMessage(String key,String context) throws DispatchException, PushException {
?
//獲取USERPIN
String userPin = mem.hget(key,SessionProtocol.SESSION_FIELD_LOCALHOST);
if(!localhostUserPin.equals(localhostRedis)){//如果消息中的USERPIN不在當前主機內(nèi)存中則直接丟棄該消息,由其它主機來消費發(fā)送
return ;
}
Session session = httpSessionManager.getSession(key);
if (session == null) {
_logger.info("session " + key + " no exist!");
return;
}
try {
//省略代碼...
session.getConnection().send(context);
session.access();
} catch (PushException e) {
session.close();
throw new PushException(e);
} catch (Exception e) {
session.close();
throw new PushException(e);
}
}
四掉伏、半推半拉
4.1缝呕、消息存儲
消息實體保存到redis集群澳窑,根據(jù)每個UERPIN組成N個HASH結(jié)構(gòu)的數(shù)據(jù)體,如上圖所示數(shù)據(jù)結(jié)構(gòu)供常。因為USERPIN的數(shù)量很大摊聋,會均勻的散落到redis集群里,大量用戶訪問不會造成熱點問題栈暇。不過有些大用戶數(shù)據(jù)量會比較大麻裁,訪問頻率又比較高的,可以做二次HASH源祈。
4.2煎源、拉取方式
我們在長連接中推送的是消息通知,并不是消息實體香缺。在第三節(jié)中當瀏覽器收到通知后會發(fā)送一次http請求帶上CMD標識手销,服務器接收到USERPIN+CMD標識到對應的redis集群中查詢數(shù)據(jù),返回給客戶端图张。這也就是我們說的半推半拉方式锋拖,那么我們?yōu)槭裁床恢苯影严嶓w推送過去呢?推送一個簡短的通知命令字祸轮,只是告訴客戶端有數(shù)據(jù)變化兽埃,那么用戶很有可能是不去看的,這種情況下如果直接推送實體數(shù)據(jù)适袜,則會浪費數(shù)據(jù)傳輸讲仰。其實這個類似我們的公眾號,比如我們收到的是一個標題和概要痪蝇。如果我不去點擊則不會發(fā)生文章大量內(nèi)容的數(shù)據(jù)傳輸。
五冕房、系統(tǒng)優(yōu)化
5.1躏啰、NIO
長連接推送系統(tǒng)的最大特點就是服務器要HOLD住大量的連接,這個時候我們首先要考慮的IO模型就是要使用基于I/O復用模型的NIO耙册「基于事件驅(qū)動利用Selector機制使用少量的線程保持住大量的連接是NIO擅長的能力。如果你使用的是tomcat7以下版本详拙,在Connector節(jié)點配置protocol="org.apache.coyote.http11.Http11NioProtocol"帝际,以便啟用Http11NioProtocol協(xié)議。該協(xié)議下默認最大連接數(shù)是10000饶辙,可以重新修改maxConnections的值蹲诀。有關(guān)tomcat nio詳細介紹請參看《深度解讀Tomcat中的NIO模型》。
5.2弃揽、參數(shù)優(yōu)化
一臺Linux服務器可以負載多少個連接脯爪?首先我們來看如何標識一個TCP連接则北?系統(tǒng)是通過一個四元組來識別,(src_ip,src_port,dst_ip,dst_port)即源IP痕慢、源端口尚揣、目標IP、目標端口掖举。比如我們有一臺服務192.168.0.1快骗,開啟端口80.那么所有的客戶端都會連接到這臺服務的80端口上面。有一種誤解塔次,就是我們常說一臺機器有65536個端口方篮,那么承載的連接數(shù)就是65536個,這個說法是極其錯誤的俺叭,這就混淆了源端口和訪問目標端口恭取。我們做壓測的時候,利用壓測客戶端熄守,這個客戶端的連接數(shù)是受到端口數(shù)的限制蜈垮,但是服務器上面的連接數(shù)可以達到成千上萬個,一般可以達到百萬(4C8G配置)裕照,至于上限是多少攒发,需要看優(yōu)化的程度。最重要的一步是修改文件句柄數(shù)量限制晋南。
查看當前用戶允許TCP打開的文件句柄最大數(shù)
ulimit -n
修改文件句柄
vim /etc/security/limits.conf
soft nofile 655350
hard nofile 655350
修改后惠猿,退出終端窗口,重新登錄(不需要重啟服務器)负间,就能看到最新的結(jié)果了偶妖。
還有其他有關(guān)TCP參數(shù)的修改,請參看
《一臺Linux服務器可以負載多少個連接政溃?》
六趾访、測試
在做http長連接測試的時候,無論使用chrome還是Firefox瀏覽器董虱,都因為緩存的原因測試不出長連接下通過web服務動態(tài)吐內(nèi)容的效果扼鞋,所以我們自己寫一個client。
public class HttpConnectionTest {
public static final String URL = "http://push.test.com/async?pin=123";
public static void main(String[] args) throws Exception {
ExecutorService es = Executors.newFixedThreadPool(1);
for(int i=0;i<1;i++){
es.submit(new Runnable() {
public void run() {
String URL=URL+"&client_id="+UUID.randomUUID().toString();
connection(URL);
}
});
}
}
static void connection(String url) {
InputStream is = null;
URLConnection conn = null;
byte[] buf = new byte[1024];
try {
URL a = new URL(url);
conn = a.openConnection();
is = conn.getInputStream();
int ret = 0;
while ((ret = is.read(buf)) > 0) {
processBuf(buf, ret);
}
// close the inputstream
is.close();
} catch (IOException e) {
try {
int respCode = ((HttpURLConnection) conn).getResponseCode();
InputStream es = ((HttpURLConnection) conn).getErrorStream();
int ret = 0;
// read the response body
while ((ret = es.read(buf)) > 0) {
processBuf(buf, ret);
}
// close the errorstream
es.close();
} catch (IOException ex) {
e.printStackTrace();
}
}
}
static void processBuf(byte[] buf, int length) {
System.out.println(new String(buf, 0, length));
}
}
七愤诱、總結(jié)
在這篇文章里我們從web系統(tǒng)的部署結(jié)構(gòu)云头,http1.1和nginx的配置,再到實現(xiàn)一個http長連接系統(tǒng)的組成部分淫半,推送系統(tǒng)的流程時序關(guān)系溃槐,最后說到系統(tǒng)參數(shù)調(diào)整如何來支持海量的連接。當然實現(xiàn)一個類似http長連接推送系統(tǒng)的方式還有其他比如websocket等技術(shù)科吭,但是長連接推送系統(tǒng)的組成部分基本不會變也就是會話連接竿痰、心跳邏輯脆粥、消息接收、消息存儲影涉、消息推送变隔。那么servlet3異步+tomcat nio給我們提供了一個實現(xiàn)http長連接推送的基礎(chǔ)支持與實踐參考。
轉(zhuǎn)載請注明作者及出處蟹倾,并附上鏈接http://www.reibang.com/p/b060bb158631
參考資料:
http://nginx.org/en/docs/http/ngx_http_upstream_module.html#keepalive
https://www.w3.org/Protocols/rfc2616/rfc2616-sec3.html#sec3.6.1