Electron 進程管理工具開發(fā)日記3:進程池負載均衡、智能啟停

>> 原文鏈接

文中實現(xiàn)的部分工具方法正處于早期/測試階段凤粗,仍在持續(xù)優(yōu)化中酥泛,僅供參考...

在 Ubuntu20.04 上進行開發(fā)/測試,可用于 Electron 項目,測試版本:Electron@8.2.0 / 9.3.5

Contents


├── Contents (you are here!)
│
├── I. 前言
├── II. 架構(gòu)圖
│
├── III.electron-re 可以用來做什么柔袁?
│   ├── 1) 用于 Electron 應(yīng)用
│   └── 2) 用于 Electron/Nodejs 應(yīng)用
│
├── IV. UI 功能介紹
│   ├── 主界面
│   ├── 功能1:Kill 進程
│   ├── 功能2:一鍵開啟 DevTools
│   ├── 功能3:查看進程日志
│   ├── 功能4:查看進程 CPU/Memory 占用趨勢
│   └── 功能5:查看 MessageChannel 請求發(fā)送日志
│
├── V. 新特性:進程池負載均衡
│   ├── 關(guān)于負載均衡
│   ├── 負載均衡策略說明
│   ├── 負載均衡策略的簡易實現(xiàn)
│   ├── 負載均衡器的實現(xiàn)
│   └── 進程池配合 LoadBalancer 來實現(xiàn)負載均衡
│
├── VI. 新特性:子進程智能啟停
│   ├── 使進程休眠的各種方式
│   ├── 生命周期 LifeCycle 的實現(xiàn)
│   └── 進程互斥鎖的雛形
│
├── VII. 存在的已知問題
├── VIII. Next To Do
│
├── IX. 幾個實際使用示例
│   ├── 1) Service/MessageChannel 使用示例
│   ├── 2) 一個實際用于生產(chǎn)項目的例子
│   ├── 3) ChildProcessPool/ProcessHost 使用示例
│   ├── 3) test 測試目錄示例
│   └── 4) github README 說明
│

I. 前言


之前在做 Electron 應(yīng)用開發(fā)的時候呆躲,寫了個 Electron 進程管理工具 electron-re,支持 Electron/Node 多進程管理捶索、service 模擬插掂、進程實時監(jiān)控(UI功能)、Node.js 進程池等特性腥例。已經(jīng)發(fā)布為npm組件辅甥,可以直接安裝(最新特性還沒發(fā)布到線上,需要再進行測試):

>> github地址

$: npm install electron-re --save
# or
$: yarn add electron-re

本主題前面兩篇文章:

  1. 《Electron/Node多進程工具開發(fā)日記》 描述了electron-re的開發(fā)背景院崇、針對的問題場景以及詳細的使用方法肆氓。
  2. 《Electron多進程工具開發(fā)日記2》 介紹了新特性 "多進程管理 UI" 的開發(fā)和使用相關(guān)底瓣。UI 界面基于 electron-re 已有的 BrowserService/MessageChannelChildProcessPool/ProcessHost 基礎(chǔ)架構(gòu)驅(qū)動谢揪,使用 React17 / Babel7 開發(fā)。

這篇文章主要是描述最近支持的進程池模塊新特性 - "進程池負載均衡" 和 "子進程智能啟停"捐凭,以及相關(guān)的基本實現(xiàn)原理拨扶。同時提出自己遇到的一些問題,以及對這些問題的思考茁肠、解決方案患民,對之后版本迭代的一些想法等等。

II. electron-re 架構(gòu)圖


electron-re_arch.png
  • Electron Core:Electron 應(yīng)用的一系列核心功能垦梆,包含了應(yīng)用的主進程匹颤、渲染進程、窗口等等(Electron 自帶)托猩。
  • BrowserWindow:渲染窗口進程印蓖,一般用于UI渲染 (Electron 自帶)。
  • ProcessManager:進程管理器京腥,負責(zé)進程占用資源采集赦肃、異步刷新UI、響應(yīng)和發(fā)出各種進程管理信號公浪,作為一個觀察者對象給其它模塊和UI提供服務(wù) (electron-re 引入)他宛。
  • MessageChannel:適用于主進程、渲染進程欠气、Service 進程的消息發(fā)送工具厅各,基于原生 IPC 封裝,主要服務(wù)于 BrowserService预柒,也可替代原生的 IPC 通信方法 (electron-re 引入)讯检。
  • ChildProcess:由 child_process.fork 方法生成的子進程琐鲁,不過以裝飾器的方式為其添加了簡單的進程休眠和喚醒邏輯 (electron-re 引入)卫旱。
  • ProcessHost:配合進程池使用的工具人灼,我稱它為 "進程事務(wù)中心",封裝了 process.send / process.on 基本邏輯顾翼,提供了 Promise 的調(diào)用方式讓 主進程/子進程 之間 IPC 消息通信更簡單 (electron-re 引入)投放。
  • LoadBalancer:服務(wù)于進程池的負載均衡器 (electron-re 引入)。
  • LifeCycle:服務(wù)于進程池的生命周期 (electron-re 引入)适贸。
  • ChildProcessPool:基于 Node.js - child_process.fork 方法實現(xiàn)的進程池灸芳,內(nèi)部管理多個 ChildProcess 實例對象,支持自定義負載均衡策略拜姿、子進程智能啟停烙样、子進程異常退出后自動重啟等特性 (electron-re 引入)。
  • BrowserService:基于 BrowserWindow 實現(xiàn)的 Service 進程蕊肥,可以看成是一個運行在后臺的隱藏渲染窗口進程谒获,允許 Node 注入,不過僅支持 CommonJs 規(guī)范 (electron-re 引入)壁却。

III. electron-re 可以用來做什么批狱?


1. 用于 Electron 應(yīng)用

  • BrowserService
  • MessageChannel

在 Electron 的一些“最佳實踐”中,建議將占用cpu的代碼放到渲染過程中而不是直接放在主過程中展东,這里先看下 chromium 的架構(gòu)圖:

chromium.jpg

每個渲染進程都有一個全局對象 RenderProcess赔硫,用來管理與父瀏覽器進程的通信,同時維護著一份全局狀態(tài)盐肃。瀏覽器進程為每個渲染進程維護一個 RenderProcessHost 對象爪膊,用來管理瀏覽器狀態(tài)和與渲染進程的通信。瀏覽器進程和渲染進程使用 Chromium 的 IPC 系統(tǒng)進行通信砸王。在 chromium 中推盛,頁面渲染時,UI進程需要和 main process 不斷的進行 IPC 同步处硬,若此時 main process 忙小槐,則 UIprocess 就會在 IPC 時阻塞。所以如果主進程持續(xù)進行消耗 CPU 時間的任務(wù)或阻塞同步 IO 的任務(wù)的話荷辕,就會在一定程度上阻塞凿跳,從而影響主進程和各個渲染進程之間的 IPC 通信,IPC 通信有延遲或是受阻疮方,渲染進程窗口就會卡頓掉幀控嗜,嚴重的話甚至?xí)ㄗ〔粍印?/p>

因此 electron-re 在 Electron 已有的 Main Process 主進程 和 Renderer Process 渲染進程邏輯的基礎(chǔ)上獨立出一個單獨的 Service 概念。Service即不需要顯示界面的后臺進程骡显,它不參與 UI 交互疆栏,單獨為主進程或其它渲染進程提供服務(wù)曾掂,它的底層實現(xiàn)為一個允許 node注入remote調(diào)用隱藏渲染窗口進程

這樣就可以將代碼中耗費 cpu 的操作(比如文件上傳中維護一個數(shù)千個上傳任務(wù)的隊列)編寫成一個單獨的js文件壁顶,然后使用 BrowserService 構(gòu)造函數(shù)以這個 js 文件的地址 path 為參數(shù)構(gòu)造一個 Service 實例珠洗,從而將他們從主進程中分離。如果你說那這部分耗費 cpu 的操作直接放到渲染窗口進程可以嘛若专?這其實取決于項目自身的架構(gòu)設(shè)計许蓖,以及對進程之間數(shù)據(jù)傳輸性能損耗和傳輸時間等各方面的權(quán)衡,創(chuàng)建一個 Service 的簡單示例:

const { BrowserService } = require('electron-re');
const myServcie = new BrowserService('app', path.join(__dirname, 'path/to/app.service.js'));

如果使用了 BrowserService 的話调衰,要想在主進程膊爪、渲染進程、service 進程之間相互發(fā)送消息就要使用 electron-re 提供的 MessageChannel 通信工具嚎莉,它的接口設(shè)計跟 Electron 內(nèi)建的IPC基本一致米酬,底層也是基于原生的 IPC 異步通信原理來實現(xiàn)的,簡單示例如下:

/* ---- main.js ---- */
const { BrowserService } = require('electron-re');
// 主進程中向一個 service 'app' 發(fā)送消息
MessageChannel.send('app', 'channel1', { value: 'test1' });

2. 用于 Electron/Nodejs 應(yīng)用

  • ChildProcessPool
  • ProcessHost

此外趋箩,如果要創(chuàng)建一些不依賴于 Electron 運行時的子進程(相關(guān)參考nodejs child_process)赃额,可以使用 electron-re 提供的專門為 nodejs 運行時編寫的進程池 ChildProcessPool 。因為創(chuàng)建進程本身所需的開銷很大阁簸,使用進程池來重復(fù)利用已經(jīng)創(chuàng)建了的子進程爬早,將多進程架構(gòu)帶來的性能效益最大化,簡單示例如下:

/* --- 主進程中 --- */
const { ChildProcessPool, LoadBalancer } = require('electron-re');

const pool = new ChildProcessPool({
  path: path.join(app.getAppPath(), 'app/services/child.js'), // 子進程執(zhí)行文件路徑
  max: 3, // 最大進程數(shù)
  strategy: LoadBalancer.ALGORITHM.WEIGHTS, // 負載均衡策略 - 權(quán)重
  weights: [1, 2, 3], // 權(quán)重分配
});

pool
  .send('sync-work', params)
  .then(rsp => console.log(rsp));

一般情況下启妹,在我們的子進程執(zhí)行文件中筛严,為了在主進程和子進程之間同步數(shù)據(jù),可以使用 process.send('channel', params)process.on('channel', function) 的方式實現(xiàn)(前提是進程以以 fork 方式創(chuàng)建或者手動開啟了 IPC 通信)饶米。但是這樣在處理業(yè)務(wù)邏輯的同時也強迫我們?nèi)リP(guān)注進程之間的通信桨啃,你需要知道子進程什么時候能處理完畢,然后再使用process.send再將數(shù)據(jù)返回主進程檬输,使用方式繁瑣照瘾。

electron-re 引入了 ProcessHost 的概念,我稱之為"進程事務(wù)中心"丧慈。實際使用時在子進程執(zhí)行文件中只需要將各個任務(wù)函數(shù)通過 ProcessHost.registry('task-name', function) 注冊成多個被監(jiān)聽的事務(wù)析命,然后配合進程池的 ChildProcessPool.send('task-name', params) 來觸發(fā)子進程事務(wù)邏輯的調(diào)用即可,ChildProcessPool.send() 同時會返回一個 Promise 實例以便獲取回調(diào)數(shù)據(jù)逃默,簡單示例如下:

/* --- 子進程中 --- */
const { ProcessHost } = require('electron-re');

ProcessHost
  .registry('sync-work', (params) => {
    return { value: 'task-value' };
  })
  .registry('async-work', (params) => {
    return fetch(params.url);
  });

IV. UI 功能介紹


UI 功能基于 electron-re 基礎(chǔ)架構(gòu)開發(fā)鹃愤,它通過異步 IPC 和主進程的 ProcessManager 進行通信,實時刷新進程狀態(tài)完域。操作者可以通過 UI 手動 Kill 進程娇豫、查看進程 console 數(shù)據(jù)街望、查看進程數(shù) CPU/Memory 占用趨勢以及查看 MessageChannel 工具的請求發(fā)送記錄。

主界面

UI參考 electron-process-manager 設(shè)計

預(yù)覽圖:

process-manager.main.png

主要功能如下:

  1. 展示 Electron 應(yīng)用中所有開啟的進程,包括主進程椎组、普通的渲染進程、Service 進程(electron-re 引入)、ChildProcessPool 創(chuàng)建的子進程(electron-re 引入)。

  2. 進程列表中顯示各個進程進程號备典、進程標識、父進程號虐沥、內(nèi)存占用大小熊经、CPU 占用百分比等,所有進程標識分為:main(主進程)欲险、service(服務(wù)進程)、renderer(渲染進程)匹涮、node(進程池子進程)天试,點擊表格頭可以針對對某項進行遞增/遞減排序。

  3. 選中某個進程后可以 Kill 此進程然低、查看進程控制臺 Console 數(shù)據(jù)喜每、查看1分鐘內(nèi)進程 CPU/Memory 占用趨勢,如果此進程是渲染進程的話還可以通過 DevTools 按鈕一鍵打開內(nèi)置調(diào)試工具雳攘。

  4. ChildProcessPool 創(chuàng)建的子進程暫不支持直接打開 DevTools 進行調(diào)試带兜,不過由于創(chuàng)建子進程時添加了 --inspect 參數(shù),可以使用 chrome 的 chrome://inspect 進行遠程調(diào)試吨灭。

  5. 點擊 Signals 按鈕可以查看 MessageChannel 工具的請求發(fā)送日志刚照,包括簡單的請求參數(shù)、請求名喧兄、請求返回數(shù)據(jù)等无畔。

功能:Kill 進程

kill.gif

功能:一鍵開啟 DevTools

devtools.gif

功能:查看進程日志

console.gif

功能:查看進程 CPU/Memory 占用趨勢

trends.gif

功能:查看 MessageChannel 請求發(fā)送日志

signals.png

V. 新特性:進程池負載均衡


簡化的初版實現(xiàn)

>> 代碼地址

? 關(guān)于負載均衡

“ 負載均衡,英文名稱為 Load Balance吠冤,其含義就是指將負載(工作任務(wù))進行平衡浑彰、分攤到多個操作單元上進行運行,例如 FTP 服務(wù)器拯辙、Web服務(wù)器郭变、企業(yè)核心應(yīng)用服務(wù)器和其它主要任務(wù)服務(wù)器等,從而協(xié)同完成工作任務(wù)涯保。
負載均衡構(gòu)建在原有網(wǎng)絡(luò)結(jié)構(gòu)之上诉濒,它提供了一種透明且廉價有效的方法擴展服務(wù)器和網(wǎng)絡(luò)設(shè)備的帶寬、加強網(wǎng)絡(luò)數(shù)據(jù)處理能力遭赂、增加吞吐量循诉、提高網(wǎng)絡(luò)的可用性和靈活性∑菜” -- 《百度百科》

? 負載均衡策略說明

之前的實現(xiàn)中茄猫,進程池創(chuàng)建好后狈蚤,當使用 pool 發(fā)送請求時,采用兩種方式處理請求發(fā)送策略:

  1. 默認使用輪詢策略選擇一個子進程處理請求划纽,只能保證基本的請求平均分配脆侮。

  2. 另一種使用情況是通過手動指定發(fā)送請求時的額外參數(shù) id:pool.send(channel, params, id),這樣子讓 id 相同的請求發(fā)送到同一個子進程上勇劣。一個適用情景就是:第一次我們向某個子進程發(fā)送請求靖避,該子進程處理請求后在其運行時內(nèi)存空間中存儲了一些處理結(jié)果,之后某個情況下需要將之前那次請求產(chǎn)生的處理結(jié)果再次拿回主進程比默,這時候就需要使用 id 來區(qū)分請求幻捏。

新版本引入了一些負載均衡策略,包括:

  • POLLING - 輪詢:子進程輪流處理請求
  • WEIGHTS - 權(quán)重:子進程根據(jù)設(shè)置的權(quán)重來處理請求
  • RANDOM - 隨機:子進程隨機處理請求
  • SPECIFY - 指定:子進程根據(jù)指定的進程 id 處理請求
  • WEIGHTS_POLLING - 權(quán)重輪詢:權(quán)重輪詢策略與輪詢策略類似命咐,但是權(quán)重輪詢策略會根據(jù)權(quán)重來計算子進程的輪詢次數(shù)篡九,從而穩(wěn)定每個子進程的平均處理請求數(shù)量。
  • WEIGHTS_RANDOM - 權(quán)重隨機:權(quán)重隨機策略與隨機策略類似醋奠,但是權(quán)重隨機策略會根據(jù)權(quán)重來計算子進程的隨機次數(shù)榛臼,從而穩(wěn)定每個子進程的平均處理請求數(shù)量。
  • MINIMUM_CONNECTION - 最小連接數(shù):選擇子進程上具有最小連接活動數(shù)量的子進程處理請求窜司。
  • WEIGHTS_MINIMUM_CONNECTION - 權(quán)重最小連接數(shù):權(quán)重最小連接數(shù)策略與最小連接數(shù)策略類似沛善,不過各個子進程被選中的概率由連接數(shù)和權(quán)重共同決定。

? 負載均衡策略的簡易實現(xiàn)

參數(shù)說明:

  • tasks:任務(wù)數(shù)組塞祈,一個示例:[{id: 11101, weight: 2}, {id: 11102, weight: 1}]金刁。
  • currentIndex: 目前所處的任務(wù)索引,默認為 0织咧,每次調(diào)用時會自動加 1胀葱,超出任務(wù)數(shù)組長度時會自動取模。
  • context:主進程參數(shù)上下文笙蒙,用于動態(tài)更新當前任務(wù)索引和權(quán)重索引抵屿。
  • weightIndex:權(quán)重索引,用于權(quán)重策略捅位,默認為 0轧葛,每次調(diào)用時會自動加 1,超出權(quán)重總和時會自動取模艇搀。
  • weightTotal:權(quán)重總和尿扯,用于權(quán)重策略相關(guān)計算。
  • connectionsMap:各個進程活動連接數(shù)的映射焰雕,用于最小連接數(shù)策略相關(guān)計算衷笋。
1. 輪詢策略(POLLING)

原理:索引值遞增,每次調(diào)用時會自動加 1矩屁,超出任務(wù)數(shù)組長度時會自動取模辟宗,保證平均調(diào)用爵赵。
時間復(fù)雜度 O(n) = 1

/* polling algorithm */
module.exports = function (tasks, currentIndex, context) {
  if (!tasks.length) return null;

  const task = tasks[currentIndex];
  context.currentIndex ++;
  context.currentIndex %= tasks.length;

  return task || null;
};
2. 權(quán)重策略(WEIGHTS)

原理:每個進程根據(jù) (權(quán)重值 + (權(quán)重總和 * 隨機因子)) 生成最終計算值,最終計算值中的最大值被命中泊脐。
時間復(fù)雜度 O(n) = n

/* weight algorithm */
module.exports = function (tasks, weightTotal, context) {

  if (!tasks.length) return null;

  let max = tasks[0].weight, maxIndex = 0, sum;

  for (let i = 0; i < tasks.length; i++) {
    sum = (tasks[i].weight || 0) + Math.random() * weightTotal;
    if (sum >= max) {
      max = sum;
      maxIndex = i;
    }
  }

  context.weightIndex += 1;
  context.weightIndex %= (weightTotal + 1);

  return tasks[maxIndex];
};
3. 隨機策略(RANDOM)

原理:隨機函數(shù)在 [0, length) 中任意選取一個索引即可
時間復(fù)雜度 O(n) = 1

/* random algorithm */
module.exports = function (tasks) {

  const length = tasks.length;
  const target = tasks[Math.floor(Math.random() * length)];

  return target || null;
};
4. 權(quán)重輪詢策略(WEIGHTS_POLLING)

原理:類似輪詢策略空幻,不過輪詢的區(qū)間為:[最小權(quán)重值, 權(quán)重總和],根據(jù)各項權(quán)重累加值進行命中區(qū)間計算容客。每次調(diào)用時權(quán)重索引會自動加 1秕铛,超出權(quán)重總和時會自動取模。
時間復(fù)雜度 O(n) = n

/* weights polling */
module.exports = function (tasks, weightIndex, weightTotal, context) {

  if (!tasks.length) return null;

  let weight = 0;
  let task;

  for (let i = 0; i < tasks.length; i++) {
    weight += tasks[i].weight || 0;
    if (weight >= weightIndex) {
      task = tasks[i];
      break;
    }
  }

  context.weightIndex += 1;
  context.weightIndex %= (weightTotal + 1);

  return task;
};
5. 權(quán)重隨機策略(WEIGHTS_RANDOM)

原理:由 (權(quán)重總和 * 隨機因子) 產(chǎn)生計算值缩挑,將各項權(quán)重值與其相減但两,第一個不大于零的最終值即被命中。
時間復(fù)雜度 O(n) = n

/* weights random algorithm */
module.exports = function (tasks, weightTotal) {
  let task;
  let weight = Math.ceil(Math.random() * weightTotal);

  for (let i = 0; i < tasks.length; i++) {
    weight -= tasks[i].weight || 0;
    if (weight <= 0) {
      task = tasks[i];
      break;
    }
  }

  return task || null;
};
6. 最小連接數(shù)策略(MINIMUM_CONNECTION)

原理:直接選擇當前連接數(shù)最小的項即可调煎。
時間復(fù)雜度 O(n) = n

/* minimum connections algorithm */
module.exports = function (tasks, connectionsMap={}) {
  if (tasks.length < 2) return tasks[0] || null;

  let min = connectionsMap[tasks[0].id];
  let minIndex = 0;

  for (let i = 1; i < tasks.length; i++) {
    const con = connectionsMap[tasks[i].id] || 0;
    if (con <= min) {
      min = con;
      minIndex = i;
    }
  }

  return tasks[minIndex] || null;
};
7. 權(quán)重最小連接數(shù)(WEIGHTS_MINIMUM_CONNECTION)

原理:權(quán)重 + ( 隨機因子 * 權(quán)重總和 ) + ( 連接數(shù)占比 * 權(quán)重總和 ) 三個因子镜遣,計算出最終值,根據(jù)最終值的大小進行比較士袄,最小值所代表項即被命中。
時間復(fù)雜度 O(n) = n

/* weights minimum connections algorithm */
module.exports = function (tasks, weightTotal, connectionsMap, context) {

  if (!tasks.length) return null;

  let min = tasks[0].weight, minIndex = 0, sum;

  const connectionsTotal = tasks.reduce((total, cur) => {
    total += (connectionsMap[cur.id] || 0);
    return total;
  }, 0);

  // algorithm: (weight + connections'weight) + random factor
  for (let i = 0; i < tasks.length; i++) {
    sum =
      (tasks[i].weight || 0) + (Math.random() * weightTotal) +
      (( (connectionsMap[tasks[i].id] || 0) * weightTotal ) / connectionsTotal);
    if (sum <= min) {
      min = sum;
      minIndex = i;
    }
  }

  context.weightIndex += 1;
  context.weightIndex %= (weightTotal + 1);

  return tasks[minIndex];
};

? 負載均衡器的實現(xiàn)

代碼都不復(fù)雜谎僻,有幾點需要說明:

  1. params 對象保存了用于各種策略計算的一些參數(shù)娄柳,比如權(quán)重索引、權(quán)重總和艘绍、連接數(shù)赤拒、CPU/Memory占用等等。
  2. scheduler 對象用于調(diào)用各種策略進行計算诱鞠,scheduler.calculate() 會返回一個命中的進程 id挎挖。
  3. targets 即所有用于計算的目標進程,不過其中僅存放了目標進程 pid 和 其權(quán)重 weight:[{id: [pid], weight: [number]}, ...]航夺。
  4. algorithm 為特定的負載均衡策略蕉朵,默認值為輪詢策略。
  5. ProcessManager.on('refresh', this.refreshParams)阳掐,負載均衡器通過監(jiān)聽 ProcessManager 的 refresh 事件來定時更新各個進程的計算參數(shù)始衅。ProcessManager 中有一個定時器,每隔一段時間就會采集一次各個被監(jiān)聽的進程的資源占用情況缭保,并攜帶采集數(shù)據(jù)觸發(fā)一次 refresh 事件汛闸。
const CONSTS = require("./consts");
const Scheduler = require("./scheduler");
const {
  RANDOM,
  POLLING,
  WEIGHTS,
  SPECIFY,
  WEIGHTS_RANDOM,
  WEIGHTS_POLLING,
  MINIMUM_CONNECTION,
  WEIGHTS_MINIMUM_CONNECTION,
} = CONSTS;
const ProcessManager = require('../ProcessManager');

/* Load Balance Instance */
class LoadBalancer {
  /**
    * @param  {Object} options [ options object ]
    * @param  {Array } options.targets [ targets for load balancing calculation: [{id: 1, weight: 1}, {id: 2, weight: 2}] ]
    * @param  {String} options.algorithm [ strategies for load balancing calculation : RANDOM | POLLING | WEIGHTS | SPECIFY | WEIGHTS_RANDOM | WEIGHTS_POLLING | MINIMUM_CONNECTION | WEIGHTS_MINIMUM_CONNECTION]
    */
  constructor(options) {
    this.targets = options.targets;
    this.algorithm = options.algorithm || POLLING;
    this.params = { // data for algorithm
      currentIndex: 0, // index
      weightIndex: 0, // index for weight alogrithm
      weightTotal: 0, // total weight
      connectionsMap: {}, // connections of each target
      cpuOccupancyMap: {}, // cpu occupancy of each target
      memoryOccupancyMap: {}, // cpu occupancy of each target
    };
    this.scheduler = new Scheduler(this.algorithm);
    this.memoParams = this.memorizedParams();
    this.calculateWeightIndex();
    ProcessManager.on('refresh', this.refreshParams);
  }

  /* params formatter */
  memorizedParams = () => {
    return {
      [RANDOM]: () => [],
      [POLLING]: () => [this.params.currentIndex, this.params],
      [WEIGHTS]: () => [this.params.weightTotal, this.params],
      [SPECIFY]: (id) => [id],
      [WEIGHTS_RANDOM]: () => [this.params.weightTotal],
      [WEIGHTS_POLLING]: () => [this.params.weightIndex, this.params.weightTotal, this.params],
      [MINIMUM_CONNECTION]: () => [this.params.connectionsMap],
      [WEIGHTS_MINIMUM_CONNECTION]: () => [this.params.weightTotal, this.params.connectionsMap, this.params],
    };
  }

  /* refresh params data */
  refreshParams = (pidMap) => { ... }

  /* pick one task from queue */
  pickOne = (...params) => {
    return this.scheduler.calculate(
      this.targets, this.memoParams[this.algorithm](...params)
    );
  }

  /* pick multi task from queue */
  pickMulti = (count = 1, ...params) => {
    return new Array(count).fill().map(
      () => this.pickOne(...params)
    );
  }

  /* calculate weight */
  calculateWeightIndex = () => {
    this.params.weightTotal = this.targets.reduce((total, cur) => total + (cur.weight || 0), 0);
    if (this.params.weightIndex > this.params.weightTotal) {
      this.params.weightIndex = this.params.weightTotal;
    }
  }

  /* calculate index */
  calculateIndex = () => {
    if (this.params.currentIndex >= this.targets.length) {
      this.params.currentIndex = (ths.params.currentIndex - 1 >= 0) ? (this.params.currentIndex - 1) : 0;
    }
  }

  /* clean data of a task or all task */
  clean = (id) => { ... }

  /* add a task */
  add = (task) => {...}

  /* remove target from queue */
  del = (target) => {...}

  /* wipe queue and data */
  wipe = () => {...}

  /* update calculate params */
  updateParams = (object) => {
    Object.entries(object).map(([key, value]) => {
      if (key in this.params) {
        this.params[key] = value;
      }
    });
  }

  /* reset targets */
  setTargets = (targets) => {...}

  /* change algorithm strategy */
  setAlgorithm = (algorithm) => {...}
}

module.exports = Object.assign(LoadBalancer, { ALGORITHM: CONSTS });

? 進程池配合 LoadBalancer 來實現(xiàn)負載均衡

有幾點需要說明:

  1. 當我們使用 pool.send('channel', params) 時,pool 內(nèi)部 getForkedFromPool() 函數(shù)會被調(diào)用艺骂,函數(shù)從進程池中選擇一個進程來執(zhí)行任務(wù)诸老,如果子進程數(shù)未達到最大設(shè)定數(shù),則優(yōu)先創(chuàng)建一個子進程來處理請求钳恕。
  2. 子進程 創(chuàng)建/銷毀/退出 時需要同步更新 LoadBalancer 中監(jiān)聽的 targets别伏,否則已被銷毀的進程 pid 可能會在執(zhí)行負載均衡策略計算后被返回蹄衷。
  3. ForkedProcess 是一個裝飾器類,封裝了 child_process.fork 邏輯畸肆,為其增加了一些額外功能宦芦,如:進程睡眠、喚醒轴脐、綁定事件调卑、發(fā)送請求等基本方法。
const _path = require('path');
const EventEmitter = require('events');

const ForkedProcess = require('./ForkedProcess');
const ProcessLifeCycle = require('../ProcessLifeCycle.class');
const ProcessManager = require('../ProcessManager/index');
const { defaultLifecycle } = require('../ProcessLifeCycle.class');
const LoadBalancer = require('../LoadBalancer');
let { inspectStartIndex } = require('../../conf/global.json');
const { getRandomString, removeForkedFromPool, convertForkedToMap, isValidValue } = require('../utils');
const { UPDATE_CONNECTIONS_SIGNAL } = require('../consts');

const defaultStrategy = LoadBalancer.ALGORITHM.POLLING;

class ChildProcessPool extends EventEmitter {
  constructor({
    path, max=6, cwd, env={},
    weights=[], // weights of processes, the length is equal to max
    strategy=defaultStrategy,
    ...
  }) {
    super();
    this.cwd = cwd || _path.dirname(path);
    this.env = {
      ...process.env,
      ...env
    };
    this.callbacks = {};
    this.pidMap = new Map();
    this.callbacksMap = new Map();
    this.connectionsMap={};
    this.forked = [];
    this.connectionsTimer = null;
    this.forkedMap = {};
    this.forkedPath = path;
    this.forkIndex = 0;
    this.maxInstance = max;
    this.weights = new Array(max).fill().map(
      (_, i) => (isValidValue(weights[i]) ? weights[i] : 1)
    );
    this.LB = new LoadBalancer({
      algorithm: strategy,
      targets: [],
    });

    this.initEvents();
  }

  /* -------------- internal -------------- */

  /* init events */
  initEvents = () => {
    // process exit
    this.on('forked_exit', (pid) => {
      this.onForkedDisconnect(pid);
    });
    ...
  }

  /**
    * onForkedCreate [triggered when a process instance created]
    * @param  {[String]} pid [process pid]
    */
  onForkedCreate = (forked) => {
    const pidsValue = this.forked.map(f => f.pid);
    const length = this.forked.length;

    this.LB.add({
      id: forked.pid,
      weight: this.weights[length - 1],
    });
    ProcessManager.listen(pidsValue, 'node', this.forkedPath);
    ...
  }

  /**
    * onForkedDisconnect [triggered when a process instance disconnect]
    * @param  {[String]} pid [process pid]
    */
   onForkedDisconnect = (pid) => {
    const length = this.forked.length;

    removeForkedFromPool(this.forked, pid, this.pidMap);
    this.LB.del({
      id: pid,
      weight: this.weights[length - 1],
    });
    ProcessManager.unlisten([pid]);
    ...
  }

  /* Get a process instance from the pool */
  getForkedFromPool = (id="default") => {
    let forked;
    if (!this.pidMap.get(id)) {
      // create new process and put it into the pool
      if (this.forked.length < this.maxInstance) {
        inspectStartIndex ++;
        forked = new ForkedProcess(
          this,
          this.forkedPath,
          this.env.NODE_ENV === "development" ? [`--inspect=${inspectStartIndex}`] : [],
          { cwd: this.cwd, env: { ...this.env, id }, stdio: 'pipe' }
        );
        this.forked.push(forked);
        this.onForkedCreate(forked);
      } else {
      // get a process from the pool based on load balancing strategy
        forked = this.forkedMap[this.LB.pickOne().id];
      }
      if (id !== 'default') {
        this.pidMap.set(id, forked.pid);
      }
    } else {
      // pick a special process from the pool
      forked = this.forkedMap[this.pidMap.get(id)];
    }

    if (!forked) throw new Error(`Get forked process from pool failed! the process pid: ${this.pidMap.get(id)}.`);

    return forked;
  }

  /* -------------- caller -------------- */

  /**
  * send [Send request to a process]
  * @param  {[String]} taskName [task name - necessary]
  * @param  {[Any]} params [data passed to process - necessary]
  * @param  {[String]} id [the unique id bound to a process instance - not necessary]
  * @return {[Promise]} [return a Promise instance]
  */
  send = (taskName, params, givenId) => {
    if (givenId === 'default') throw new Error('ChildProcessPool: Prohibit the use of this id value: [default] !')

    const id = getRandomString();
    const forked = this.getForkedFromPool(givenId);
    this.lifecycle.refresh([forked.pid]);

    return new Promise(resolve => {
      this.callbacks[id] = resolve;
      forked.send({action: taskName, params, id });
    });
  }
  ...
}

module.exports = ChildProcessPool;

VI. 新特性:子進程智能啟停


這個特性我也將其稱為 進程生命周期 (lifecycle)大咱。

主要作用是:當子進程一段時間未被調(diào)用恬涧,則自動進入休眠狀態(tài),減少 CPU 占用 (減少內(nèi)存占用很難)碴巾。進入休眠狀態(tài)的時間可以和由創(chuàng)建者控制溯捆,默認為 10 min。當子進程進入休眠后厦瓢,如果有新的請求到來并分發(fā)到該休眠的進程上提揍,則會自動喚醒該進程并繼續(xù)處理當前請求。一段時間閑置后煮仇,將會再次進入休眠狀態(tài)劳跃。

? 使進程休眠的各種方式

1)如果是讓進程暫停的話,可以向進程發(fā)送 SIGSTOP 信號浙垫,發(fā)送 SIGCONT 信號可以恢復(fù)進程刨仑。

Node.js:

process.kill([pid], "SIGSTOP");
process.kill([pid], "SIGCONT");

Unix System (Windows 暫未測試):

kill -STOP [pid]
kill -CONT [pid]

2)Node.js 新的 Atomic.wait API 也可以做到編程控制。該方法會監(jiān)聽一個 Int32Array 對象的給定下標下的值夹姥,若值未發(fā)生改變杉武,則一直等待(阻塞 event loop),直到發(fā)生超時(由 ms 參數(shù)決定)辙售∏岜В可以在主進程中操作這塊共享數(shù)據(jù),然后為子進程解除休眠鎖定圾亏。

const nil = new Int32Array(new SharedArrayBuffer(4));
const array = new Array(100000).fill(0);
setInterval(() => {
console.log(1);
}, 1e3);
Atomics.wait(nil, 0, 0, Number(600e3));

? 生命周期 LifeCycle 的實現(xiàn)

代碼同樣很簡單十拣,有幾點需要說明:

  1. 采用了 標記清除法,子進程觸發(fā)請求時更新調(diào)用時間志鹃,同時使用定時器循環(huán)計算各個被監(jiān)聽子進程的 ( 當前時間 - 上次調(diào)用時間) 差值夭问。如果有超過設(shè)定的時間的進程則發(fā)送 sleep 信號,同時攜帶所有進程 pid曹铃。

  2. 每個 ChildProcessPool 進程池實例都會擁有一個 ProcessLifeCycle 實例對象用于控制當前進程池中的進程的 休眠/喚醒缰趋。ChildProcessPool 會監(jiān)聽 ProcessLifeCycle 對象的 sleep 事件,拿到需要 sleep 的進程 pid 后調(diào)用 ForkedProcesssleep() 方法使其睡眠。下個請求分發(fā)到該進程時秘血,會自動喚醒該進程味抖。

const EventEmitter = require('events');

const defaultLifecycle = {
  expect: 600e3, // default timeout 10 minutes
  internal: 30e3 // default loop check interval 30 seconds
};

class ProcessLifeCycle extends EventEmitter {
  constructor(options) {
    super();
    const {
      expect=defaultLifecycle.expect,
      internal=defaultLifecycle.internal
    } = options;
    this.timer = null;
    this.internal = internal;
    this.expect = expect;
    this.params = {
      activities: new Map()
    };
  }

  /* task check loop */
  taskLoop = () => {
    if (this.timer) return console.warn('ProcessLifeCycle: the task loop is already running');

    this.timer = setInterval(() => {
      const sleepTasks = [];
      const date = new Date();
      const { activities } = this.params;
      ([...activities.entries()]).map(([key, value]) => {
        if (date - value > this.expect) {
          sleepTasks.push(key);
        }
      });
      if (sleepTasks.length) {
        // this.unwatch(sleepTasks);
        this.emit('sleep', sleepTasks);
      }
    }, this.internal);
  }

  /* watch processes */
  watch = (ids=[]) => {
    ids.forEach(id => {
      this.params.activities.set(id, new Date());
    });
  }

  /* unwatch processes */
  unwatch = (ids=[]) => {
    ids.forEach(id => {
      this.params.activities.delete(id);
    });
  }

  /* stop task check loop */
  stop = () => {
    clearInterval(this.timer);
    this.timer = null;
  }

  /* start task check loop */
  start = () => {
    this.taskLoop();
  }

  /* refresh tasks */
  refresh = (ids=[]) => {
    ids.forEach(id => {
      if (this.params.activities.has(id)) {
        this.params.activities.set(id, new Date());
      } else {
        console.warn(`The task with id ${id} is not being watched.`);
      }
    });
  }
}

module.exports = Object.assign(ProcessLifeCycle, { defaultLifecycle });

? 進程互斥鎖的雛形

之前看文章時看到關(guān)于 API - Atomic.wait 的一篇文章,Atomic 除了用于實現(xiàn)進程睡眠灰粮,也能基于它來理解進程互斥鎖的實現(xiàn)原理仔涩。這里有個基本雛形可以作為參考,相關(guān)文檔可以參閱 MDN粘舟。

AsyncLock 對象需要在子進程中引入熔脂,創(chuàng)建 AsyncLock 的構(gòu)造函數(shù)中有一個參數(shù) sab 需要注意。這個參數(shù)是一個 SharedArrayBuffer 共享數(shù)據(jù)塊柑肴,這個共享數(shù)據(jù)快需要在主進程創(chuàng)建霞揉,然后通過 IPC 通信發(fā)送到各個子進程,通常 IPC 通信會序列化一般的諸如 Object / Array 等數(shù)據(jù)晰骑,導(dǎo)致消息接受者和消息發(fā)送者拿到的不是同一個對象适秩,但是經(jīng)由 IPC 發(fā)送的 SharedArrayBuffer 對象卻會指向同一個內(nèi)存塊。

在子進程中使用 SharedArrayBuffer 數(shù)據(jù)創(chuàng)建 AsyncLock 實例后硕舆,任意一個子進程對共享數(shù)據(jù)的修改都會導(dǎo)致其它進程內(nèi)指向這塊內(nèi)存的 SharedArrayBuffer 數(shù)據(jù)內(nèi)容變化秽荞,這就是我們使用它實現(xiàn)進程鎖的基本要點。

先對 Atomic API 做個簡單說明:

  • Atomics.compareExchange(typedArray, index, expectedValue, newValue):Atomics.compareExchange() 靜態(tài)方法會在數(shù)組的值與期望值相等的時候抚官,將給定的替換值替換掉數(shù)組上的值蚂会,然后返回舊值。此原子操作保證在寫上修改的值之前不會發(fā)生其他寫操作耗式。
  • Atomics.waitAsync(typedArray, index, value[, timeout]):靜態(tài)方法 Atomics.wait() 確保了一個在 Int32Array 數(shù)組中給定位置的值沒有發(fā)生變化且仍然是給定的值時進程將會睡眠,直到被喚醒或超時趁猴。該方法返回一個字符串刊咳,值為"ok", "not-equal", 或 "timed-out" 之一。
  • Atomics.notify(typedArray, index[, count]):靜態(tài)方法 Atomics.notify() 喚醒指定數(shù)量的在等待隊列中休眠的進程儡司,不指定 count 時默認喚醒所有娱挨。

AsyncLock 即異步鎖,等待鎖釋放的時候不會阻塞主線程捕犬。主要關(guān)注 executeAfterLocked() 這個方法跷坝,調(diào)用該方法并傳入回調(diào)函數(shù),該回調(diào)函數(shù)會在鎖被獲取后執(zhí)行碉碉,并且在執(zhí)行完畢后自動釋放鎖柴钻。其中一步的關(guān)鍵就是 tryGetLock() 函數(shù),它返回了一個 Promise 對象垢粮,因此我們等待鎖釋放的邏輯在微任務(wù)隊列中執(zhí)行而并不阻塞主線程贴届。

/**
  * @name AsyncLock
  * @description
  *   Use it in child processes, mutex lock logic.
  *   First create SharedArrayBuffer in main process and transfer it to all child processes to control the lock.
  */

class AsyncLock {
  static INDEX = 0;
  static UNLOCKED = 0;
  static LOCKED = 1;

  constructor(sab) {
    this.sab = sab; // data like this: const sab = new SharedArrayBuffer(16);
    this.i32a = new Int32Array(sab);
  }

  lock() {
    while (true) {
      const oldValue = Atomics.compareExchange(
        this.i32a, AsyncLock.INDEX,
        AsyncLock.UNLOCKED, // old
        AsyncLock.LOCKED // new
      );
      if (oldValue == AsyncLock.UNLOCKED) { // success
        return;
      }
      Atomics.wait( // wait
        this.i32a,
        AsyncLock.INDEX,
        AsyncLock.LOCKED // expect
      );
    }
  }

  unlock() {
    const oldValue = Atomics.compareExchange(
      this.i32a, AsyncLock.INDEX,
      AsyncLock.LOCKED,
      AsyncLock.UNLOCKED
    );
    if (oldValue != AsyncLock.LOCKED) { // failed
      throw new Error('Tried to unlock while not holding the mutex');
    }
    Atomics.notify(this.i32a, AsyncLock.INDEX, 1);
  }

  /**
    * executeLocked [async function to acquired the lock and execute callback]
    * @param  {Function} callback [callback function]
    */
  executeAfterLocked(callback) {

    const tryGetLock = async () => {
      while (true) {
        const oldValue = Atomics.compareExchange(
          this.i32a,
          AsyncLock.INDEX,
          AsyncLock.UNLOCKED,
          AsyncLock.LOCKED
        );
        if (oldValue == AsyncLock.UNLOCKED) { // success if AsyncLock.UNLOCKED
          callback();
          this.unlock();
          return;
        }
        const result = Atomics.waitAsync( // wait when AsyncLock.LOCKED
          this.i32a,
          AsyncLock.INDEX,
          AsyncLock.LOCKED
        );
        await result.value; // return a Promise, will not block the main thread
      }
    }

    tryGetLock();
  }
}

VII. 存在的已知問題


  1. 由于使用了 Electron 原生的 remote API,因此 electron-re 部分特性(Service 相關(guān))不支持 Electron 14 以及以上版本(已經(jīng)移除 remote),正考慮近期使用第三方 remote 庫進行替代兼容毫蚓。

  2. 容錯處理做的不夠好占键,這一塊會成為之后的重要優(yōu)化點。

  3. 采集進程池中活動連接數(shù)時采用了"調(diào)用計數(shù)"的方式元潘。這個處理方法不太好畔乙,準確性也不夠高,但是目前還未想到更好的解決方法用于統(tǒng)計子進程中活躍的連接數(shù)翩概。我覺得還是要從底層進行解決牲距,比如:宏任務(wù)和微任務(wù)隊列、V8 虛擬機氮帐、垃圾回收嗅虏、Libuv 底層原理、Node 進程和線程原理...

  4. 暫時沒在 windows 平臺測試進程休眠功能上沐,win 平臺本身不支持進程信號皮服,但是 Node 提供了模擬支持,但是具體表現(xiàn)還需測試参咙。

VIII. Next To Do


  • 讓 Service 支持代碼更新后自動重啟
  • 添加 ChildProcessPool 子進程調(diào)度邏輯
  • 優(yōu)化 ChildProcessPool 多進程console輸出
  • 添加可視化進程管理界面
  • 增強 ChildProcessPool 進程池功能
  • 增強 ProcessHost 事務(wù)中心功能
  • 子進程之間互斥鎖邏輯的實現(xiàn)
  • 使用外部 remote 庫以支持最新版本的 Electron
  • Kill Bugs ??

IX. 幾個實際使用示例


  1. electronux - 我的一個Electron項目龄广,使用了 BrowserService/MessageChannel,并且附帶了ChildProcessPool/ProcessHost使用demo蕴侧。

  2. 暗影襪子-electron - 我的另一個Electron 跨平臺桌面應(yīng)用項目(不提供鏈接择同,可以點擊上面的查看原文),使用 electron-re 進行調(diào)試開發(fā)净宵,并且在生產(chǎn)環(huán)境下可以打開 ProcessManager UI 用于 CPU/Memory 資源占用監(jiān)控和請求日志查看敲才。

  3. file-slice-upload - 一個關(guān)于多文件分片并行上傳的demo,使用了 ChildProcessPool and ProcessHost择葡,基于 Electron@9.3.5開發(fā)紧武。

  4. 也可直接查看 index.test.jstest 目錄下的測試樣例文件,包含了一些使用示例敏储。

  5. 當然 github - README 也有相關(guān)說明項阻星。

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
  • 序言:七十年代末,一起剝皮案震驚了整個濱河市已添,隨后出現(xiàn)的幾起案子妥箕,更是在濱河造成了極大的恐慌,老刑警劉巖更舞,帶你破解...
    沈念sama閱讀 221,273評論 6 515
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件畦幢,死亡現(xiàn)場離奇詭異,居然都是意外死亡疏哗,警方通過查閱死者的電腦和手機呛讲,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 94,349評論 3 398
  • 文/潘曉璐 我一進店門禾怠,熙熙樓的掌柜王于貴愁眉苦臉地迎上來,“玉大人贝搁,你說我怎么就攤上這事吗氏。” “怎么了雷逆?”我有些...
    開封第一講書人閱讀 167,709評論 0 360
  • 文/不壞的土叔 我叫張陵弦讽,是天一觀的道長。 經(jīng)常有香客問我膀哲,道長往产,這世上最難降的妖魔是什么? 我笑而不...
    開封第一講書人閱讀 59,520評論 1 296
  • 正文 為了忘掉前任某宪,我火速辦了婚禮仿村,結(jié)果婚禮上,老公的妹妹穿的比我還像新娘兴喂。我一直安慰自己蔼囊,他們只是感情好,可當我...
    茶點故事閱讀 68,515評論 6 397
  • 文/花漫 我一把揭開白布衣迷。 她就那樣靜靜地躺著畏鼓,像睡著了一般。 火紅的嫁衣襯著肌膚如雪壶谒。 梳的紋絲不亂的頭發(fā)上云矫,一...
    開封第一講書人閱讀 52,158評論 1 308
  • 那天,我揣著相機與錄音汗菜,去河邊找鬼让禀。 笑死,一個胖子當著我的面吹牛陨界,可吹牛的內(nèi)容都是我干的堆缘。 我是一名探鬼主播,決...
    沈念sama閱讀 40,755評論 3 421
  • 文/蒼蘭香墨 我猛地睜開眼普碎,長吁一口氣:“原來是場噩夢啊……” “哼!你這毒婦竟也來了录平?” 一聲冷哼從身側(cè)響起麻车,我...
    開封第一講書人閱讀 39,660評論 0 276
  • 序言:老撾萬榮一對情侶失蹤,失蹤者是張志新(化名)和其女友劉穎斗这,沒想到半個月后动猬,有當?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體,經(jīng)...
    沈念sama閱讀 46,203評論 1 319
  • 正文 獨居荒郊野嶺守林人離奇死亡表箭,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點故事閱讀 38,287評論 3 340
  • 正文 我和宋清朗相戀三年赁咙,在試婚紗的時候發(fā)現(xiàn)自己被綠了。 大學(xué)時的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片。...
    茶點故事閱讀 40,427評論 1 352
  • 序言:一個原本活蹦亂跳的男人離奇死亡彼水,死狀恐怖崔拥,靈堂內(nèi)的尸體忽然破棺而出,到底是詐尸還是另有隱情凤覆,我是刑警寧澤链瓦,帶...
    沈念sama閱讀 36,122評論 5 349
  • 正文 年R本政府宣布,位于F島的核電站盯桦,受9級特大地震影響慈俯,放射性物質(zhì)發(fā)生泄漏。R本人自食惡果不足惜拥峦,卻給世界環(huán)境...
    茶點故事閱讀 41,801評論 3 333
  • 文/蒙蒙 一贴膘、第九天 我趴在偏房一處隱蔽的房頂上張望。 院中可真熱鬧略号,春花似錦刑峡、人聲如沸。這莊子的主人今日做“春日...
    開封第一講書人閱讀 32,272評論 0 23
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽。三九已至随闪,卻和暖如春阳似,著一層夾襖步出監(jiān)牢的瞬間,已是汗流浹背铐伴。 一陣腳步聲響...
    開封第一講書人閱讀 33,393評論 1 272
  • 我被黑心中介騙來泰國打工撮奏, 沒想到剛下飛機就差點兒被人妖公主榨干…… 1. 我叫王不留,地道東北人当宴。 一個月前我還...
    沈念sama閱讀 48,808評論 3 376
  • 正文 我出身青樓畜吊,卻偏偏與公主長得像,于是被迫代替她去往敵國和親户矢。 傳聞我的和親對象是個殘疾皇子玲献,可洞房花燭夜當晚...
    茶點故事閱讀 45,440評論 2 359

推薦閱讀更多精彩內(nèi)容