用Golang處理每分鐘百萬級(jí)請(qǐng)求

翻譯原文鏈接 轉(zhuǎn)帖/轉(zhuǎn)載請(qǐng)注明出處

原文鏈接@medium.com 發(fā)表于2017/08/30

我在防垃圾郵件霹琼,防病毒和防惡意軟件領(lǐng)域已經(jīng)工作了15年趟章,前后在好幾個(gè)公司任職染厅。我知道這些系統(tǒng)最后都會(huì)因?yàn)橐幚砗A康臄?shù)據(jù)而變得非常復(fù)雜咳焚。

我現(xiàn)在是smsjunk.com的CEO并且是KnowBe4的首席架構(gòu)師。這兩個(gè)公司在網(wǎng)絡(luò)安全領(lǐng)域都非澄庠埽活躍张抄。

有趣的是,在過去10年里作為一個(gè)碼農(nóng)洼怔,所有我經(jīng)歷過的網(wǎng)站后臺(tái)開發(fā)用的幾乎都是用Ruby on Rails署惯。不要誤解,我很喜歡Ruby on Rails并且認(rèn)為它是一個(gè)非常棒的開發(fā)環(huán)境镣隶。往往在一段時(shí)間后极谊,你開始以ruby的方式來設(shè)計(jì)系統(tǒng)。這時(shí)你會(huì)忘記利用多線程安岂,并行轻猖,快速執(zhí)行(fast executions)和較小的內(nèi)存開銷(small memory overhead),軟件的架構(gòu)會(huì)變得簡(jiǎn)單而高效域那。很多年來咙边,我一直是C/C++DelphiC#的開發(fā)者次员。我開始意識(shí)到使用正確的工具败许,工作會(huì)變得簡(jiǎn)單很多。

我對(duì)語言和框架并不是很熱衷淑蔚。我相信效率檐束,產(chǎn)出和代碼的可維護(hù)性取決于你如何架構(gòu)一個(gè)簡(jiǎn)潔地解決方案。

問題

在開發(fā)我們的匿名遙測(cè)和分析系統(tǒng)時(shí)束倍,我們的目標(biāo)是能夠處理從上百萬個(gè)端點(diǎn)發(fā)來的大量POST請(qǐng)求被丧。HTTP請(qǐng)求處理函數(shù)會(huì)收到包含很多載荷(payloads)的JSON文檔。這些載荷(payloads)需要被寫到Amazon S3上绪妹,接著由map-reduce系統(tǒng)來處理甥桂。

通常我們會(huì)創(chuàng)建一個(gè)worker池架構(gòu)(worker-tier architecture)。利用如下的一些工具:

然后設(shè)置兩個(gè)集群邮旷,一個(gè)用作處理HTTP請(qǐng)求黄选,另外一個(gè)用作workers。這樣我們能夠根據(jù)處理的后臺(tái)工作量進(jìn)行擴(kuò)容婶肩。

從一開始我們小組就覺得應(yīng)該用Go來實(shí)現(xiàn)办陷,因?yàn)樵谟懻撾A段我們估計(jì)這可能會(huì)是一個(gè)處理非常大流量的系統(tǒng)。我已經(jīng)使用Go語言兩年并用它在工作中開發(fā)了一些系統(tǒng)律歼,但它們都沒有處理過這么大的負(fù)載(load)民镜。

我們首先創(chuàng)建了幾個(gè)結(jié)構(gòu)來定義HTTP請(qǐng)求的載荷。我們通過POST請(qǐng)求接收這些載荷险毁,然后用一個(gè)函數(shù)上傳到S3 bucket制圈。

type PayloadCollection struct {
    WindowsVersion  string    `json:"version"`
    Token           string    `json:"token"`
    Payloads        []Payload `json:"data"`
}

type Payload struct {
    // [redacted]
}

func (p *Payload) UploadToS3() error {
    // the storageFolder method ensures that there are no name collision in
    // case we get same timestamp in the key name
    storage_path := fmt.Sprintf("%v/%v", p.storageFolder, time.Now().UnixNano())

    bucket := S3Bucket

    b := new(bytes.Buffer)
    encodeErr := json.NewEncoder(b).Encode(payload)
    if encodeErr != nil {
        return encodeErr
    }

    // Everything we post to the S3 bucket should be marked 'private'
    var acl = s3.Private
    var contentType = "application/octet-stream"

    return bucket.PutReader(storage_path, b, int64(b.Len()), contentType, acl, s3.Options{})
}

簡(jiǎn)單地使用Goroutines

一開始我們用了最簡(jiǎn)單的方法來實(shí)現(xiàn)POST請(qǐng)求的處理函數(shù)们童。我們嘗試通過goroutine來并行處理請(qǐng)求。

func payloadHandler(w http.ResponseWriter, r *http.Request) {

    if r.Method != "POST" {
        w.WriteHeader(http.StatusMethodNotAllowed)
        return
    }

    // Read the body into a string for json decoding
    var content = &PayloadCollection{}
    err := json.NewDecoder(io.LimitReader(r.Body, MaxLength)).Decode(&content)
    if err != nil {
        w.Header().Set("Content-Type", "application/json; charset=UTF-8")
        w.WriteHeader(http.StatusBadRequest)
        return
    }

    // Go through each payload and queue items individually to be posted to S3
    for _, payload := range content.Payloads {
        go payload.UploadToS3()   // <----- DON'T DO THIS
    }

    w.WriteHeader(http.StatusOK)
}

對(duì)于適量的負(fù)載鲸鹦,這個(gè)方案應(yīng)該沒有問題慧库。但是負(fù)載增加以后這個(gè)方法就不能很好地工作。當(dāng)我們把這個(gè)版本部署到生產(chǎn)環(huán)境中后馋嗜,我們遇到了比預(yù)期大一個(gè)數(shù)量級(jí)的請(qǐng)求量齐板。我們完全低估了流量。

這個(gè)方法有些不盡如人意葛菇。它無法控制創(chuàng)建goroutine的數(shù)量覆积。因?yàn)槲覀兠糠昼娛盏搅艘话偃f個(gè)POST請(qǐng)求,上面的代碼很快就奔潰了熟呛。

再次嘗試

我們需要一個(gè)不同的解決方案宽档。在一開始,我們就討論到需要把HTTP請(qǐng)求處理函數(shù)寫的簡(jiǎn)潔庵朝,然后把處理工作轉(zhuǎn)移到后臺(tái)吗冤。當(dāng)然,這是你在Ruby on Rails世界里必須做的九府,否則你會(huì)阻塞所有worker的工作(例如puma椎瘟,unicorn,passenger等等侄旬,我們這里就不繼續(xù)討論JRuby了)肺蔚。我們需要用到Resque,Sidekiq儡羔,SQS等常用的解決方案宣羊。這個(gè)列表可以很長(zhǎng),因?yàn)橛性S多方法來完成這項(xiàng)任務(wù)汰蜘。

第二個(gè)版本是創(chuàng)建帶緩沖的channel仇冯。這樣我們可以把工作任務(wù)放到隊(duì)列里然后再上傳到S3。因?yàn)榭梢钥刂脐?duì)列的長(zhǎng)度并且有充足的內(nèi)存族操,我們覺得把工作任務(wù)緩存在channel隊(duì)列里應(yīng)該沒有問題苛坚。

var Queue chan Payload

func init() {
    Queue = make(chan Payload, MAX_QUEUE)
}

func payloadHandler(w http.ResponseWriter, r *http.Request) {
    ...
    // Go through each payload and queue items individually to be posted to S3
    for _, payload := range content.Payloads {
        Queue <- payload
    }
    ...
}

然后我們需要從隊(duì)列里提取工作任務(wù)并進(jìn)行處理。代碼下圖所示:

func StartProcessor() {
    for {
        select {
        case job := <-Queue:
            job.payload.UploadToS3()  // <-- STILL NOT GOOD
        }
    }
}

坦白的說色难,我不知道我們當(dāng)時(shí)在想什么泼舱。這肯定是熬夜喝紅牛的結(jié)果。這個(gè)方法并沒有給我們帶來任何幫助枷莉。隊(duì)列僅僅是將問題延后了娇昙。我們的處理函數(shù)(processor)一次僅上傳一個(gè)載荷(payload),而接收請(qǐng)求的速率比一個(gè)處理函數(shù)上傳S3的能力大太多了依沮,帶緩沖的channel很快就到達(dá)了它的極限涯贞。從而阻塞了HTTP請(qǐng)求處理函數(shù)往隊(duì)列里添加更多的工作任務(wù)。

我們僅僅是延緩了問題的觸發(fā)危喉。系統(tǒng)在倒計(jì)時(shí)宋渔,最后還是崩潰了。在這個(gè)有問題的版本被部署之后辜限,系統(tǒng)的延遲以恒定速度在不停地增長(zhǎng)皇拣。

0_latency.png

更好的解決辦法

我們決定使用Go channel的常用編程模式。使用一個(gè)兩級(jí)channel系統(tǒng)薄嫡,一個(gè)用來存放任務(wù)隊(duì)列氧急,另一個(gè)用來控制處理任務(wù)隊(duì)列的并發(fā)量。

這里的想法是根據(jù)一個(gè)可持續(xù)的速率將S3上傳并行化毫深。這個(gè)速率不會(huì)使機(jī)器變慢或者導(dǎo)致S3的連接錯(cuò)誤吩坝。我們選擇了一個(gè)Job/Worker模式。如果你們對(duì)Java哑蔫,C#等語言熟悉的話钉寝,可以把它想象成是Go語言用channel來實(shí)現(xiàn)的工作線程池。

var (
    MaxWorker = os.Getenv("MAX_WORKERS")
    MaxQueue  = os.Getenv("MAX_QUEUE")
)

// Job represents the job to be run
type Job struct {
    Payload Payload
}

// A buffered channel that we can send work requests on.
var JobQueue chan Job

// Worker represents the worker that executes the job
type Worker struct {
    WorkerPool  chan chan Job
    JobChannel  chan Job
    quit        chan bool
}

func NewWorker(workerPool chan chan Job) Worker {
    return Worker{
        WorkerPool: workerPool,
        JobChannel: make(chan Job),
        quit:       make(chan bool)}
}

// Start method starts the run loop for the worker, listening for a quit channel in
// case we need to stop it
func (w Worker) Start() {
    go func() {
        for {
            // register the current worker into the worker queue.
            w.WorkerPool <- w.JobChannel

            select {
            case job := <-w.JobChannel:
                // we have received a work request.
                if err := job.Payload.UploadToS3(); err != nil {
                    log.Errorf("Error uploading to S3: %s", err.Error())
                }

            case <-w.quit:
                // we have received a signal to stop
                return
            }
        }
    }()
}

// Stop signals the worker to stop listening for work requests.
func (w Worker) Stop() {
    go func() {
        w.quit <- true
    }()
}

我們修改了HTTP請(qǐng)求處理函數(shù)來創(chuàng)建一個(gè)含有載荷(payload)的Job結(jié)構(gòu)闸迷,然后將它送到一個(gè)叫JobQueue的channel嵌纲。worker會(huì)對(duì)它們進(jìn)行處理。

func payloadHandler(w http.ResponseWriter, r *http.Request) {

    if r.Method != "POST" {
        w.WriteHeader(http.StatusMethodNotAllowed)
        return
    }

    // Read the body into a string for json decoding
    var content = &PayloadCollection{}
    err := json.NewDecoder(io.LimitReader(r.Body, MaxLength)).Decode(&content)
    if err != nil {
        w.Header().Set("Content-Type", "application/json; charset=UTF-8")
        w.WriteHeader(http.StatusBadRequest)
        return
    }

    // Go through each payload and queue items individually to be posted to S3
    for _, payload := range content.Payloads {

        // let's create a job with the payload
        work := Job{Payload: payload}

        // Push the work onto the queue.
        JobQueue <- work
    }

    w.WriteHeader(http.StatusOK)
}

在初始化服務(wù)的時(shí)候腥沽,我們創(chuàng)建了一個(gè)Dispatcher并且調(diào)用了Run()函數(shù)來創(chuàng)建worker池逮走。這些worker會(huì)監(jiān)聽JobQueue上是否有新的任務(wù)并進(jìn)行處理。

dispatcher := NewDispatcher(MaxWorker)
dispatcher.Run()

下面是我們的dispatcher實(shí)現(xiàn)代碼:

type Dispatcher struct {
    // A pool of workers channels that are registered with the dispatcher
    WorkerPool chan chan Job
}

func NewDispatcher(maxWorkers int) *Dispatcher {
    pool := make(chan chan Job, maxWorkers)
    return &Dispatcher{WorkerPool: pool}
}

func (d *Dispatcher) Run() {
    // starting n number of workers
    for i := 0; i < d.maxWorkers; i++ {
        worker := NewWorker(d.pool)
        worker.Start()
    }

    go d.dispatch()
}

func (d *Dispatcher) dispatch() {
    for {
        select {
        case job := <-JobQueue:
            // a job request has been received
            go func(job Job) {
                // try to obtain a worker job channel that is available.
                // this will block until a worker is idle
                jobChannel := <-d.WorkerPool

                // dispatch the job to the worker job channel
                jobChannel <- job
            }(job)
        }
    }
}

這里我們提供了創(chuàng)建worker的最大數(shù)目作為參數(shù)今阳,并把這些worker加入到worker池里师溅。因?yàn)槲覀円呀?jīng)在docker化的Go環(huán)境里使用了Amazon的Elasticbeanstalk并且嚴(yán)格按照12-factor方法來配置我們的生產(chǎn)環(huán)境,這些參數(shù)值可以從環(huán)境變量里獲得盾舌。我們可以方便地控制worker數(shù)目和任務(wù)隊(duì)列的長(zhǎng)度险胰。我們可以快速地調(diào)整這些值而不需要重新部署整個(gè)集群。

var (
  MaxWorker = os.Getenv("MAX_WORKERS")
  MaxQueue  = os.Getenv("MAX_QUEUE")
)

部署了新版本之后矿筝,我們看到系統(tǒng)延遲一下子就降到了可以忽略的量級(jí)起便。同時(shí)處理請(qǐng)求的能力也大幅攀升。

1_latency.png

Elastic Load Balancers熱身后幾分鐘窖维,我們看到Elasticbeanstalk應(yīng)用開始處理將近每分鐘一百萬個(gè)請(qǐng)求榆综。我們的流量通常在早上的時(shí)候會(huì)攀升至超過每分鐘一百萬個(gè)請(qǐng)求。同時(shí)铸史,我們也將服務(wù)器的數(shù)目從100臺(tái)縮減到了20臺(tái)鼻疮。

2_host.png

通過合理地配置集群和auto-scaling,我們能夠做到只配置4臺(tái)EC2 c4.Large實(shí)例琳轿。然后當(dāng)CPU使用率持續(xù)5分鐘在90%以上時(shí)用Elastic Auto-Scaling來創(chuàng)建新的實(shí)例判沟。

3_util.png

結(jié)束語

對(duì)我來說簡(jiǎn)潔(simplicity)是第一位的耿芹。我們可以利用無數(shù)隊(duì)列,很多后臺(tái)worker以及復(fù)雜的部署來設(shè)計(jì)一個(gè)復(fù)雜系統(tǒng)挪哄,最終我們還是使用了Elasticbeanstalk auto-scaling的強(qiáng)大功能和Go語言提供的應(yīng)對(duì)并發(fā)的簡(jiǎn)單方法吧秕。用僅僅4臺(tái)機(jī)器(可能還不如我的MacBook Pro強(qiáng)大)來處理每分鐘一百萬次POST請(qǐng)求對(duì)Amazon S3進(jìn)行寫操作。

每項(xiàng)任務(wù)都有對(duì)應(yīng)的正確工具迹炼。當(dāng)你的Ruby on Rails系統(tǒng)需要一個(gè)很強(qiáng)大的HTTP請(qǐng)求處理器砸彬,可以嘗試看看ruby生態(tài)系統(tǒng)以外的其它更強(qiáng)大的選項(xiàng)。

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
  • 序言:七十年代末斯入,一起剝皮案震驚了整個(gè)濱河市砂碉,隨后出現(xiàn)的幾起案子,更是在濱河造成了極大的恐慌刻两,老刑警劉巖增蹭,帶你破解...
    沈念sama閱讀 206,311評(píng)論 6 481
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件,死亡現(xiàn)場(chǎng)離奇詭異磅摹,居然都是意外死亡沪铭,警方通過查閱死者的電腦和手機(jī),發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 88,339評(píng)論 2 382
  • 文/潘曉璐 我一進(jìn)店門偏瓤,熙熙樓的掌柜王于貴愁眉苦臉地迎上來杀怠,“玉大人,你說我怎么就攤上這事厅克∨馔耍” “怎么了?”我有些...
    開封第一講書人閱讀 152,671評(píng)論 0 342
  • 文/不壞的土叔 我叫張陵证舟,是天一觀的道長(zhǎng)硕旗。 經(jīng)常有香客問我,道長(zhǎng)女责,這世上最難降的妖魔是什么漆枚? 我笑而不...
    開封第一講書人閱讀 55,252評(píng)論 1 279
  • 正文 為了忘掉前任,我火速辦了婚禮抵知,結(jié)果婚禮上墙基,老公的妹妹穿的比我還像新娘。我一直安慰自己刷喜,他們只是感情好残制,可當(dāng)我...
    茶點(diǎn)故事閱讀 64,253評(píng)論 5 371
  • 文/花漫 我一把揭開白布。 她就那樣靜靜地躺著掖疮,像睡著了一般初茶。 火紅的嫁衣襯著肌膚如雪。 梳的紋絲不亂的頭發(fā)上浊闪,一...
    開封第一講書人閱讀 49,031評(píng)論 1 285
  • 那天恼布,我揣著相機(jī)與錄音螺戳,去河邊找鬼。 笑死折汞,一個(gè)胖子當(dāng)著我的面吹牛倔幼,可吹牛的內(nèi)容都是我干的。 我是一名探鬼主播字支,決...
    沈念sama閱讀 38,340評(píng)論 3 399
  • 文/蒼蘭香墨 我猛地睜開眼凤藏,長(zhǎng)吁一口氣:“原來是場(chǎng)噩夢(mèng)啊……” “哼奸忽!你這毒婦竟也來了堕伪?” 一聲冷哼從身側(cè)響起,我...
    開封第一講書人閱讀 36,973評(píng)論 0 259
  • 序言:老撾萬榮一對(duì)情侶失蹤栗菜,失蹤者是張志新(化名)和其女友劉穎欠雌,沒想到半個(gè)月后,有當(dāng)?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體疙筹,經(jīng)...
    沈念sama閱讀 43,466評(píng)論 1 300
  • 正文 獨(dú)居荒郊野嶺守林人離奇死亡富俄,尸身上長(zhǎng)有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點(diǎn)故事閱讀 35,937評(píng)論 2 323
  • 正文 我和宋清朗相戀三年,在試婚紗的時(shí)候發(fā)現(xiàn)自己被綠了而咆。 大學(xué)時(shí)的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片霍比。...
    茶點(diǎn)故事閱讀 38,039評(píng)論 1 333
  • 序言:一個(gè)原本活蹦亂跳的男人離奇死亡,死狀恐怖暴备,靈堂內(nèi)的尸體忽然破棺而出悠瞬,到底是詐尸還是另有隱情,我是刑警寧澤涯捻,帶...
    沈念sama閱讀 33,701評(píng)論 4 323
  • 正文 年R本政府宣布浅妆,位于F島的核電站,受9級(jí)特大地震影響障癌,放射性物質(zhì)發(fā)生泄漏凌外。R本人自食惡果不足惜,卻給世界環(huán)境...
    茶點(diǎn)故事閱讀 39,254評(píng)論 3 307
  • 文/蒙蒙 一涛浙、第九天 我趴在偏房一處隱蔽的房頂上張望康辑。 院中可真熱鬧,春花似錦轿亮、人聲如沸晾捏。這莊子的主人今日做“春日...
    開封第一講書人閱讀 30,259評(píng)論 0 19
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽惦辛。三九已至,卻和暖如春仓手,著一層夾襖步出監(jiān)牢的瞬間胖齐,已是汗流浹背玻淑。 一陣腳步聲響...
    開封第一講書人閱讀 31,485評(píng)論 1 262
  • 我被黑心中介騙來泰國(guó)打工, 沒想到剛下飛機(jī)就差點(diǎn)兒被人妖公主榨干…… 1. 我叫王不留呀伙,地道東北人补履。 一個(gè)月前我還...
    沈念sama閱讀 45,497評(píng)論 2 354
  • 正文 我出身青樓,卻偏偏與公主長(zhǎng)得像剿另,于是被迫代替她去往敵國(guó)和親箫锤。 傳聞我的和親對(duì)象是個(gè)殘疾皇子,可洞房花燭夜當(dāng)晚...
    茶點(diǎn)故事閱讀 42,786評(píng)論 2 345

推薦閱讀更多精彩內(nèi)容