前言
IM全稱是『Instant Messaging』,中文名是即時通訊聋亡。在這個高度信息化的移動互聯(lián)網(wǎng)時代湖饱,生活中IM類產(chǎn)品已經(jīng)成為必備品,比較有名的如釘釘杀捻、微信、QQ等以IM為核心功能的產(chǎn)品蚓庭。當然目前微信已經(jīng)成長為一個生態(tài)型產(chǎn)品致讥,但其核心功能還是IM。還有一些非以IM系統(tǒng)為核心的應用器赞,最典型的如一些在線游戲垢袱、社交應用,IM也是其重要的功能模塊港柜∏肫酰可以說,帶有社交屬性的應用夏醉,IM功能一定是必不可少的爽锥。
IM系統(tǒng)在互聯(lián)網(wǎng)初期即存在飞涂,其基礎技術架構在這十幾年的發(fā)展中更新迭代多次苏揣,從早期的CS、P2P架構撼短,到現(xiàn)在后臺已經(jīng)演變?yōu)橐粋€復雜的分布式系統(tǒng)靶擦,涉及移動端腮考、網(wǎng)絡、安全和存儲等技術的方方面面玄捕。其支撐的規(guī)模也從早期的少量日活踩蔚,到現(xiàn)在微信這個巨頭最新公布的達到9億的日活的體量。
IM系統(tǒng)中最核心的部分是消息系統(tǒng)馅闽,消息系統(tǒng)中最核心的功能是消息的同步和存儲:
- 消息的同步:將消息完整的赌结、快速的從發(fā)送方傳遞到接收方,就是消息的同步柬姚。消息同步系統(tǒng)最重要的衡量指標就是消息傳遞的實時性拟杉、完整性以及能支撐的消息規(guī)模量承。從功能上來說穴店,一般至少要支持在線和離線推送,高級的IM系統(tǒng)還支持『多端同步』拿穴。
- 消息的存儲:消息存儲即消息的持久化保存泣洞,這里不是指消息在客戶端本地的保存默色,而是指云端的保存腿宰,功能上對應的就是『消息漫游』∷Υ欤『消息漫游』的好處是可以實現(xiàn)賬號在任意端登陸查看所有歷史消息椿每,這也是高級IM系統(tǒng)特有的功能之一。
本篇文章內容主要涉及IM系統(tǒng)中的消息系統(tǒng)架構亦渗,會介紹一種基于TableStore構建的消息同步以及存儲系統(tǒng)的架構實現(xiàn)汁尺,能夠支持消息系統(tǒng)中的高級特性『多端同步』以及『消息漫游』均函。在性能和規(guī)模上,能夠做到全量消息云端存儲洛勉,百萬TPS以及毫秒級延遲的消息同步能力如迟。
架構設計
本章主要會介紹基于TableStore的現(xiàn)代IM消息系統(tǒng)的架構設計,在詳細介紹架構設計之前殷勘,會先介紹一種Timeline邏輯模型此再,來抽象和簡化對IM消息同步和存儲模型的理解输拇。理解了Timeline模型后贤斜,會介紹如何基于此模型對消息的同步以及存儲進行建模『锬ǎ基于Timeline模型蟀给,在實現(xiàn)消息同步和存儲時還會有各方面的技術權衡,例如如何對消息同步常見的讀擴散和寫擴散兩種模型進行對比和選擇拍霜,以及針對Timeline模型的特征如何來選擇底層數(shù)據(jù)庫。
傳統(tǒng)架構 vs 現(xiàn)代架構
上圖是消息系統(tǒng)傳統(tǒng)架構與現(xiàn)代架構的簡單對比
傳統(tǒng)架構下汁政,消息是先同步后存儲缀旁。對于在線的用戶并巍,消息會直接實時同步到在線的接收方,消息同步成功后刽射,并不會進行持久化剃执。而對于離線的用戶或者消息無法實時同步成功時肾档,消息會持久化到離線庫,當接收方重新連接后俗慈,會從離線庫拉取所有未讀消息闺阱。當離線庫中的消息成功同步到接收方后舵变,消息會從離線庫中刪除示血。傳統(tǒng)的消息系統(tǒng)难审,服務端的主要工作是維護發(fā)送方和接收方的連接狀態(tài)亿絮,并提供在線消息同步和離線消息緩存的能力派昧,保證消息一定能夠從發(fā)送方傳遞到接收方蒂萎。服務端不會對消息進行持久化,所以也無法支持消息漫游纳寂。
現(xiàn)代架構下泻拦,消息是先存儲后同步争拐。先存儲后同步的好處是,如果接收方確認接收到了消息隘冲,那這條消息一定是已經(jīng)在云端保存了对嚼。并且消息會有兩個庫來保存绳慎,一個是消息存儲庫杏愤,用于全量保存所有會話的消息,主要用于支持消息漫游通殃。另一個是消息同步庫,主要用于接收方的多端同步堕担。消息從發(fā)送方發(fā)出后霹购,經(jīng)過服務端轉發(fā)朋腋,服務端會先將消息保存到消息存儲庫旭咽,后保存到消息同步庫。完成消息的持久化保存后轿塔,對于在線的接收方勾缭,會直接選擇在線推送宗收。但在線推送并不是一個必須路徑混稽,只是一個更優(yōu)的消息傳遞路徑审胚。對于在線推送失敗或者離線的接收方膳叨,會有另外一個統(tǒng)一的消息同步方式。接收方會主動的向服務端拉取所有未同步消息饿自,但接收方何時來同步以及會在哪些端來同步消息對服務端來說是未知的昭雌,所以要求服務端必須保存所有需要同步到接收方的消息健田,這是消息同步庫的主要作用妓局。對于新的同步設備,會有消息漫游的需求局雄,這是消息存儲庫的主要作用哎榴,在消息存儲庫中尚蝌,可以拉取任意會話的全量歷史消息。
以上是傳統(tǒng)架構和現(xiàn)代架構的一個簡單的對比衣形,現(xiàn)代架構上整個消息的同步和存儲流程谆吴,并沒有變復雜太多苛预,但是其能實現(xiàn)多端同步以及消息漫游∪饶常現(xiàn)代架構中最核心的就是兩個消息庫『消息同步庫』和『消息存儲庫』,是消息同步和存儲最核心的基礎筹吐。而本篇文章接下來的部分丘薛,都是圍繞這兩個庫的設計和實現(xiàn)來展開邦危。
Timeline模型
在分析『消息同步庫』和『消息存儲庫』的設計和實現(xiàn)之前倦蚪,在本章會先介紹一個邏輯模型-Timeline审丘。Timeline模型會幫助我們簡化對消息同步和存儲模型的理解,而消息庫的設計和實現(xiàn)也是圍繞Timeline的特性和需求來展開锅知。
如圖是Timeline模型的一個抽象表述售睹,Timeline可以簡單理解為是一個消息隊列昌妹,但這個消息隊列有如下特性:
- 每個消息擁有一個順序ID(SeqId),在隊列后面的消息的SeqId一定比前面的消息的SeqId大烂叔,也就是保證SeqId一定是增長的蒜鸡,但是不要求嚴格遞增逢防。
- 新的消息永遠在尾部添加蒲讯,保證新的消息的SeqId永遠比已經(jīng)存在隊列中的消息都大判帮。
- 可根據(jù)SeqId隨機定位到具體的某條消息進行讀取,也可以任意讀取某個給定范圍內的所有消息导狡。
有了這些特性后,消息的同步可以拿Timeline來很簡單的實現(xiàn)踩麦。圖中的例子中氓癌,消息發(fā)送方是A贪婉,消息接收方是B,同時B存在多個接收端才顿,分別是B1郑气、B2和B3尾组。A向B發(fā)送消息,消息需要同步到B的多個端呵萨,待同步的消息通過一個Timeline來進行交換甘桑。A向B發(fā)送的所有消息跑杭,都會保存在這個Timeline中咆耿,B的每個接收端都是獨立的從這個Timeline中拉取消息萨螺。每個接收端同步完畢后慰技,都會在本地記錄下最新同步到的消息的SeqId,即最新的一個位點掏颊,作為下次消息同步的起始位點乌叶。服務端不會保存各個端的同步狀態(tài)准浴,各個端均可以在任意時間從任意點開始拉取消息捎稚。
消息漫游也是基于Timeline,和消息同步唯一的區(qū)別是谈撒,消息漫游要求服務端能夠對Timeline內的所有數(shù)據(jù)進行持久化啃匿。
基于Timeline蛆楞,從邏輯模型上能夠很簡單的理解在服務端如何去實現(xiàn)消息同步和存儲豹爹,并支持多端同步和消息漫游這些高級功能臂聋。落地到實現(xiàn)的難點主要在如何將邏輯模型映射到物理模型,Timeline的實現(xiàn)對數(shù)據(jù)庫會有哪些要求艾君?我們應該選擇何種數(shù)據(jù)庫去實現(xiàn)冰垄?這些是接下來會討論到的問題虹茶。
消息存儲模型
如圖是基于Timeline的消息存儲模型蝴罪,消息存儲要求每個會話都對應一個獨立的Timeline要门。如圖例子所示尼啡,A與B/C/D/E/F均發(fā)生了會話询微,每個會話對應一個獨立的Timeline撑毛,每個Timeline內存有這個會話中的所有消息,服務端會對每個Timeline進行持久化斩个。服務端能夠對所有會話Timeline中的全量消息進行持久化驯杜,也就擁有了消息漫游的能力鸽心。
消息同步模型
消息同步模型會比消息存儲模型稍復雜一些顽频,消息的同步一般有讀擴散和寫擴散兩種不同的方式糯景,分別對應不同的Timeline物理模型蟀淮。
如圖是讀擴散和寫擴散兩種不同同步模式下對應的不同的Timeline模型怠惶,按圖中的示例甚疟,A作為消息接收者,其與B/C/D/E/F發(fā)生了會話轧拄,每個會話中的新的消息都需要同步到A的某個端檩电,看下讀擴散和寫擴散兩種模式下消息如何做同步俐末。
讀擴散:消息存儲模型中卓箫,每個會話的Timeline中保存了這個會話的全量消息烹卒。讀擴散的消息同步模式下,每個會話中產(chǎn)生的新的消息逢勾,只需要寫一次到其用于存儲的Timeline中溺拱,接收端從這個Timeline中拉取新的消息盟迟。優(yōu)點是消息只需要寫一次攒菠,相比寫擴散的模式歉闰,能夠大大降低消息寫入次數(shù)和敬,特別是在群消息這種場景下昼弟。但其缺點也比較明顯舱痘,接收端去同步消息的邏輯會相對復雜和低效变骡。接收端需要對每個會話都拉取一次才能獲取全部消息,讀被大大的放大芭逝,并且會產(chǎn)生很多無效的讀塌碌,因為并不是每個會話都會有新消息產(chǎn)生。
寫擴散:寫擴散的消息同步模式旬盯,需要有一個額外的Timeline來專門用于消息同步台妆,通常是每個接收端都會擁有一個獨立的同步Timeline,用于存放需要向這個接收端同步的所有消息胖翰。每個會話中的消息,會產(chǎn)生多次寫萨咳,除了寫入用于消息存儲的會話Timeline懊缺,還需要寫入需要同步到的接收端的同步Timeline。在個人與個人的會話中某弦,消息會被額外寫兩次桐汤,除了寫入這個會話的存儲Timeline,還需要寫入?yún)⑴c這個會話的兩個接收者的同步Timeline靶壮。而在群這個場景下怔毛,寫入會被更加的放大,如果這個群擁有N個參與者腾降,那每條消息都需要額外的寫N次拣度。寫擴散同步模式的優(yōu)點是,在接收端消息同步邏輯會非常簡單螃壤,只需要從其同步Timeline中讀取一次即可抗果,大大降低了消息同步所需的讀的壓力。其缺點就是消息寫入會被放大奸晴,特別是針對群這種場景冤馏。
在IM這種應用場景下,通常會選擇寫擴散這種消息同步模式寄啼。IM場景下逮光,一條消息只會產(chǎn)生一次,但是會被讀取多次墩划,是典型的讀多寫少的場景涕刚,消息的讀寫比例大概是10:1。若使用讀擴散同步模式乙帮,整個系統(tǒng)的讀寫比例會被放大到100:1杜漠。一個優(yōu)化的好的系統(tǒng),必須從設計上去平衡這種讀寫壓力察净,避免讀或寫任意一維觸碰到天花板驾茴。所以IM系統(tǒng)這類場景下,通常會應用寫擴散這種同步模式氢卡,來平衡讀和寫沟涨,將100:1的讀寫比例平衡到30:30。當然寫擴散這種同步模式异吻,還需要處理一些極端場景裹赴,例如萬人大群。針對這種極端寫擴散的場景诀浪,會退化到使用讀擴散棋返。一個簡單的IM系統(tǒng),通常會在產(chǎn)品層面限制這種大群的存在雷猪,而對于一個高級的IM系統(tǒng)睛竣,會采用讀寫擴散混合的同步模式,來滿足這類產(chǎn)品的需求求摇。
消息庫設計
基于Timeline模型射沟,以及Timeline模型在消息存儲和消息同步的應用殊者,我們看下消息同步庫和消息存儲庫的設計。
如圖是基于Timeline的消息庫設計验夯。
- 消息同步庫:消息同步庫用于存儲所有用于消息同步的Timeline猖吴,每個Timeline對應一個接收端,主要用作寫擴散模式的消息同步挥转。這個庫不需要永久保留所有需要同步的消息海蔽,因為消息在同步到所有端后其生命周期就可以結束,就可以被回收绑谣。但是如前面所介紹的党窜,一個實現(xiàn)簡單的多端同步消息系統(tǒng),在服務端不會保存有所有端的同步狀態(tài)借宵,而是依賴端自己主動來做同步幌衣。所以服務端不知道消息何時可以回收,通常的做法是為這個庫里的消息設定一個固定的生命周期壤玫,例如一周或者一個月泼掠,生命周期結束可被淘汰。
- 消息存儲庫:消息存儲庫用于存儲所有會話的Timeline垦细,每個Timeline包含了一個會話中的所有消息择镇。這個庫主要用于消息漫游時拉取某個會話的所有歷史消息,也用于讀擴散模式的消息同步括改。
消息同步庫和消息存儲庫腻豌,對數(shù)據(jù)庫有不同的要求,如何對數(shù)據(jù)庫做選型嘱能,在下面會討論吝梅。
數(shù)據(jù)庫選型
消息系統(tǒng)最核心的兩個庫是消息同步庫和消息存儲庫,兩個庫對數(shù)據(jù)庫有不同的要求:
消息同步庫 | 消息存儲庫 | |
---|---|---|
數(shù)據(jù)模型 | Timeline模型 | Timeline模型 |
寫能力 | 高并發(fā)寫惹骂,十萬級TPS | 高并發(fā)寫苏携,少量讀,萬級TPS |
讀能力 | 高并發(fā)范圍讀对粪,十萬級TPS | 少量范圍讀右冻,千級TPS |
存儲規(guī)模 | 保存一段時間內的同步消息,TB級著拭。保留千萬級的Timeline規(guī)模纱扭。 | 保存全量消息,百TB級儡遮。保留億級的Timeline規(guī)模乳蛾。 |
總結下來,對數(shù)據(jù)庫的要求有如下幾點:
1. 表結構設計能夠滿足Timeline模型的功能要求:不要求關系模型,能夠實現(xiàn)隊列模型肃叶,并能夠支持生成自增的SeqId蹂随。
2. 能夠支持高并發(fā)寫和范圍讀,規(guī)模在十萬級TPS因惭。
3. 能夠保存海量數(shù)據(jù)岳锁,百TB級。
4. 能夠為數(shù)據(jù)定義生命周期筛欢。
阿里云表格存儲(TableStore)是基于LSM存儲引擎的分布式NoSQL數(shù)據(jù)庫浸锨,支持百萬TPS高并發(fā)讀寫唇聘,PB級數(shù)據(jù)存儲版姑,數(shù)據(jù)支持TTL,能夠很好的滿足以上需求迟郎,并且支持自增列剥险,能夠非常完美的設計和實現(xiàn)Timeline的物理模型。
架構實現(xiàn)
本章會以一段非常精簡的代碼宪肖,來展示如何基于TableStore實現(xiàn)Timeline模型表制,并基于Timeline模型進行消息存儲和推送。
這篇文章中給出的代碼控乾,主要目的是為了演示如何能夠實現(xiàn)一個精簡Timeline的最基本功能么介。馬上我們會推出一個完整的Timeline Library,來將基于Timeline進行消息存儲和推送的代碼的開發(fā)變得無比簡單蜕衡。
所有示例代碼基于如下SDK版本:
<dependency>
<groupId>com.aliyun.openservices</groupId>
<artifactId>tablestore</artifactId>
<version>4.3.1</version>
</dependency>[大數(shù)據(jù)/運維/java架構]:649917651
表結構設計
主鍵列名 | 類型 | 描述 |
---|---|---|
timeline_id | String | 用于唯一標識Timeline的ID壤短,在消息同步庫中,通常以接收者的ID作為Timeine ID慨仿。在消息存儲庫中久脯,通常以會話ID作為Timeline ID。在TableStore中镰吆,主鍵第一列作為分區(qū)鍵帘撰,取值需要盡量的散列。 |
seq_id | AUTO_INCREMENT | 利用TableStore自增列的特性万皿,可以很好的用作Timeline的SeqId摧找。 |
public static void main(String[] args) {
String endpoint = "<endpoint>";
String accessId = "<access_id>";
String accessKey = "<access_key>";
String instanceName = "<instance_name>";
SyncClient client = new SyncClient(endpoint, accessId, accessKey, instanceName);
String pushTable = "PushTable";
String storeTable = "StoreTable";
createTimelineTable(client, pushTable);
createTimelineTable(client, storeTable);
client.shutdown();
}
[大數(shù)據(jù)/運維/java架構]:649917651
private static void createTimelineTable(SyncClient client, String tableName) {
TableMeta tableMeta = new TableMeta(tableName);
tableMeta.addPrimaryKeyColumn("timeline_id", PrimaryKeyType.STRING);
tableMeta.addAutoIncrementPrimaryKeyColumn("seq_id");
TableOptions options = new TableOptions();
options.setMaxVersions(1);
options.setTimeToLive(-1); // 配置消息永久保留
CreateTableRequest request = new CreateTableRequest(tableMeta, options);
client.createTable(request);
}
以上是創(chuàng)建Timeline表的示例代碼,總共需要創(chuàng)建兩張表牢硅,一張表作為消息同步庫慰于,名稱為『PushTable』,另一張表作為消息存儲庫唤衫,名稱為『StoreTable』婆赠。
推送和存儲實現(xiàn)
public static void main(String[] args) {
String endpoint = "<endpoint>";
String accessId = "<access_id>";
String accessKey = "<access_key>";
String instanceName = "<instance_name>";
SyncClient client = new SyncClient(endpoint, accessId, accessKey, instanceName);
String pushTable = "PushTable";
String storeTable = "StoreTable";
String groupName = "TableStore(釘釘號:11789671)";
List<String> groupMembers = Arrays.asList("A", "B", "C", "D", "E");
// 群產(chǎn)生新的消息,并且推送給所有的群成員
pushGroupMessages(client, pushTable, storeTable, groupName, groupMembers, "Hello World!");
pushGroupMessages(client, pushTable, storeTable, groupName, groupMembers, "Hello Alibaba!");
pushGroupMessages(client, pushTable, storeTable, groupName, groupMembers, "Hello Aliyun!");
pushGroupMessages(client, pushTable, storeTable, groupName, groupMembers, "Hello TableStore!");
pushGroupMessages(client, pushTable, storeTable, groupName, groupMembers, "Bye!");
client.shutdown();
}[大數(shù)據(jù)/運維/java架構]:649917651
private static void pushGroupMessages(SyncClient client, String pushTable, String storeTable, String groupName, List<String> groupMembers, String message) {
// 先將群消息持久化到存儲Timeline
writeMessage(client, storeTable, groupName, message);
// 通過寫擴散的模式將群消息同步到所有的群成員
for (String groupMember : groupMembers) {
writeMessage(client, pushTable, groupMember, message);
}
}
private static void writeMessage(SyncClient client, String timelineTable, String timelineId, String message) {
PrimaryKey primaryKey = PrimaryKeyBuilder.createPrimaryKeyBuilder()
.addPrimaryKeyColumn("timeline_id", PrimaryKeyValue.fromString(timelineId))
.addPrimaryKeyColumn("seq_id", PrimaryKeyValue.AUTO_INCREMENT).build();
RowPutChange rowChange = new RowPutChange(timelineTable, primaryKey);
rowChange.addColumn("message", ColumnValue.fromString(message));
PutRowRequest request = new PutRowRequest(rowChange);
client.putRow(request);
}
以上是模擬一個群內消息同步和存儲的示例代碼。群名稱為『TableStore』休里,群內成員有『A, B, C, D, E』蛆挫。群內新的消息,需要先存儲到群的存儲Timeline(Timeline ID為群名稱)妙黍,之后需要以寫擴散的模式推送到群內每個成員的同步Timeline(以群成員名稱作為Timeline ID)悴侵。
public static void main(String[] args) {
String endpoint = "<endpoint>";
String accessId = "<access_id>";
String accessKey = "<access_key>";
String instanceName = "<instance_name>";
SyncClient client = new SyncClient(endpoint, accessId, accessKey, instanceName);
[大數(shù)據(jù)/運維/java架構]:649917651
String pushTable = "PushTable";
String storeTable = "StoreTable";
String groupName = "TableStore(釘釘號:11789671)";
List<String> groupMembers = Arrays.asList("A", "B", "C", "D", "E");
// 某個群成員同步群消息
List<String> messages = syncMessages(client, pushTable, "A", 0);
for (String message : messages) {
System.out.println(message);
}
// 某個群成員查看該群所有的歷史消息
messages = syncMessages(client, storeTable, groupName, 0);
for (String message : messages) {
System.out.println(message);
}
client.shutdown();
}
private static List<String> syncMessages(SyncClient client, String timelineTable, String timelineId, long seqId) {
RangeIteratorParameter param = new RangeIteratorParameter(timelineTable);
PrimaryKey startKey = PrimaryKeyBuilder.createPrimaryKeyBuilder()
.addPrimaryKeyColumn("timeline_id", PrimaryKeyValue.fromString(timelineId))
.addPrimaryKeyColumn("seq_id", PrimaryKeyValue.fromLong(seqId)).build();
param.setInclusiveStartPrimaryKey(startKey);
PrimaryKey endKey = PrimaryKeyBuilder.createPrimaryKeyBuilder()
.addPrimaryKeyColumn("timeline_id", PrimaryKeyValue.fromString(timelineId))
.addPrimaryKeyColumn("seq_id", PrimaryKeyValue.INF_MAX).build();
param.setExclusiveEndPrimaryKey(endKey);
param.setMaxVersions(1);
Iterator<Row> iter = client.createRangeIterator(param);
List<String> messages = new ArrayList<String>();
while (iter.hasNext()) {
Row row = iter.next();
messages.add(row.getLatestColumn("message").getValue().asString());
}
return messages;
}
以上是拉取群內歷史消息以及某個群成員進行消息同步的示例代碼,主要邏輯在syncMessages函數(shù)內拭嫁。示例代碼中可免,拉取消息都是從seq_id為0開始,0為TableStore自增列中最小值做粤,所以代表了從最小的一個位點開始拉取消息浇借,即拉取全量消息。
后記
這篇文章主要介紹了現(xiàn)代IM系統(tǒng)中消息推送和存儲架構的實現(xiàn)怕品,基于邏輯的Timeline模型妇垢,我們可以很清晰明了的理解整個消息推送和存儲的架構∪饪担基于TableStore闯估,可以非常簡單的實現(xiàn)Timeline模型,其中自增列功能吼和,完美的匹配了Timeline模型中所需要的最關鍵的SeqId自增涨薪。
TableStore(表格存儲)是阿里云自主研發(fā)的專業(yè)級分布式NoSQL數(shù)據(jù)庫,是基于共享存儲的高性能炫乓、低成本刚夺、易擴展、全托管的半結構化數(shù)據(jù)存儲平臺厢岂,支撐互聯(lián)網(wǎng)和物聯(lián)網(wǎng)數(shù)據(jù)的高效計算與分析光督。IM系統(tǒng)的消息推送和存儲場景,是TableStore在社交領域的重要應用之一塔粒。
基于Timeline的消息存儲和推送模型结借,將不光應用在IM消息系統(tǒng)中,還可應用在例如Feeds流卒茬、實時消息同步船老、直播彈幕等場景。