翻譯原文鏈接 轉(zhuǎn)帖/轉(zhuǎn)載請(qǐng)注明出處
原文鏈接@medium.com 發(fā)表于2017/08/30
我在防垃圾郵件霹琼,防病毒和防惡意軟件領(lǐng)域已經(jīng)工作了15年趟章,前后在好幾個(gè)公司任職染厅。我知道這些系統(tǒng)最后都會(huì)因?yàn)橐幚砗A康臄?shù)據(jù)而變得非常復(fù)雜咳焚。
我現(xiàn)在是smsjunk.com的CEO并且是KnowBe4的首席架構(gòu)師。這兩個(gè)公司在網(wǎng)絡(luò)安全領(lǐng)域都非澄庠埽活躍张抄。
有趣的是,在過去10年里作為一個(gè)碼農(nóng)洼怔,所有我經(jīng)歷過的網(wǎng)站后臺(tái)開發(fā)用的幾乎都是用Ruby on Rails
署惯。不要誤解,我很喜歡Ruby on Rails
并且認(rèn)為它是一個(gè)非常棒的開發(fā)環(huán)境镣隶。往往在一段時(shí)間后极谊,你開始以ruby的方式來設(shè)計(jì)系統(tǒng)。這時(shí)你會(huì)忘記利用多線程安岂,并行轻猖,快速執(zhí)行(fast executions)和較小的內(nèi)存開銷(small memory overhead),軟件的架構(gòu)會(huì)變得簡(jiǎn)單而高效域那。很多年來咙边,我一直是C/C++
,Delphi
和C#
的開發(fā)者次员。我開始意識(shí)到使用正確的工具败许,工作會(huì)變得簡(jiǎn)單很多。
我對(duì)語言和框架并不是很熱衷淑蔚。我相信效率檐束,產(chǎn)出和代碼的可維護(hù)性取決于你如何架構(gòu)一個(gè)簡(jiǎn)潔地解決方案。
問題
在開發(fā)我們的匿名遙測(cè)和分析系統(tǒng)時(shí)束倍,我們的目標(biāo)是能夠處理從上百萬個(gè)端點(diǎn)發(fā)來的大量POST請(qǐng)求被丧。HTTP請(qǐng)求處理函數(shù)會(huì)收到包含很多載荷(payloads)的JSON文檔。這些載荷(payloads)需要被寫到Amazon S3上绪妹,接著由map-reduce系統(tǒng)來處理甥桂。
通常我們會(huì)創(chuàng)建一個(gè)worker池架構(gòu)(worker-tier architecture)。利用如下的一些工具:
然后設(shè)置兩個(gè)集群邮旷,一個(gè)用作處理HTTP請(qǐng)求黄选,另外一個(gè)用作workers。這樣我們能夠根據(jù)處理的后臺(tái)工作量進(jìn)行擴(kuò)容婶肩。
從一開始我們小組就覺得應(yīng)該用Go來實(shí)現(xiàn)办陷,因?yàn)樵谟懻撾A段我們估計(jì)這可能會(huì)是一個(gè)處理非常大流量的系統(tǒng)。我已經(jīng)使用Go語言兩年并用它在工作中開發(fā)了一些系統(tǒng)律歼,但它們都沒有處理過這么大的負(fù)載(load)民镜。
我們首先創(chuàng)建了幾個(gè)結(jié)構(gòu)來定義HTTP請(qǐng)求的載荷。我們通過POST請(qǐng)求接收這些載荷险毁,然后用一個(gè)函數(shù)上傳到S3 bucket制圈。
type PayloadCollection struct {
WindowsVersion string `json:"version"`
Token string `json:"token"`
Payloads []Payload `json:"data"`
}
type Payload struct {
// [redacted]
}
func (p *Payload) UploadToS3() error {
// the storageFolder method ensures that there are no name collision in
// case we get same timestamp in the key name
storage_path := fmt.Sprintf("%v/%v", p.storageFolder, time.Now().UnixNano())
bucket := S3Bucket
b := new(bytes.Buffer)
encodeErr := json.NewEncoder(b).Encode(payload)
if encodeErr != nil {
return encodeErr
}
// Everything we post to the S3 bucket should be marked 'private'
var acl = s3.Private
var contentType = "application/octet-stream"
return bucket.PutReader(storage_path, b, int64(b.Len()), contentType, acl, s3.Options{})
}
簡(jiǎn)單地使用Goroutines
一開始我們用了最簡(jiǎn)單的方法來實(shí)現(xiàn)POST請(qǐng)求的處理函數(shù)们童。我們嘗試通過goroutine來并行處理請(qǐng)求。
func payloadHandler(w http.ResponseWriter, r *http.Request) {
if r.Method != "POST" {
w.WriteHeader(http.StatusMethodNotAllowed)
return
}
// Read the body into a string for json decoding
var content = &PayloadCollection{}
err := json.NewDecoder(io.LimitReader(r.Body, MaxLength)).Decode(&content)
if err != nil {
w.Header().Set("Content-Type", "application/json; charset=UTF-8")
w.WriteHeader(http.StatusBadRequest)
return
}
// Go through each payload and queue items individually to be posted to S3
for _, payload := range content.Payloads {
go payload.UploadToS3() // <----- DON'T DO THIS
}
w.WriteHeader(http.StatusOK)
}
對(duì)于適量的負(fù)載鲸鹦,這個(gè)方案應(yīng)該沒有問題慧库。但是負(fù)載增加以后這個(gè)方法就不能很好地工作。當(dāng)我們把這個(gè)版本部署到生產(chǎn)環(huán)境中后馋嗜,我們遇到了比預(yù)期大一個(gè)數(shù)量級(jí)的請(qǐng)求量齐板。我們完全低估了流量。
這個(gè)方法有些不盡如人意葛菇。它無法控制創(chuàng)建goroutine的數(shù)量覆积。因?yàn)槲覀兠糠昼娛盏搅艘话偃f個(gè)POST請(qǐng)求,上面的代碼很快就奔潰了熟呛。
再次嘗試
我們需要一個(gè)不同的解決方案宽档。在一開始,我們就討論到需要把HTTP請(qǐng)求處理函數(shù)寫的簡(jiǎn)潔庵朝,然后把處理工作轉(zhuǎn)移到后臺(tái)吗冤。當(dāng)然,這是你在Ruby on Rails
世界里必須做的九府,否則你會(huì)阻塞所有worker的工作(例如puma椎瘟,unicorn,passenger等等侄旬,我們這里就不繼續(xù)討論JRuby了)肺蔚。我們需要用到Resque,Sidekiq儡羔,SQS等常用的解決方案宣羊。這個(gè)列表可以很長(zhǎng),因?yàn)橛性S多方法來完成這項(xiàng)任務(wù)汰蜘。
第二個(gè)版本是創(chuàng)建帶緩沖的channel仇冯。這樣我們可以把工作任務(wù)放到隊(duì)列里然后再上傳到S3。因?yàn)榭梢钥刂脐?duì)列的長(zhǎng)度并且有充足的內(nèi)存族操,我們覺得把工作任務(wù)緩存在channel隊(duì)列里應(yīng)該沒有問題苛坚。
var Queue chan Payload
func init() {
Queue = make(chan Payload, MAX_QUEUE)
}
func payloadHandler(w http.ResponseWriter, r *http.Request) {
...
// Go through each payload and queue items individually to be posted to S3
for _, payload := range content.Payloads {
Queue <- payload
}
...
}
然后我們需要從隊(duì)列里提取工作任務(wù)并進(jìn)行處理。代碼下圖所示:
func StartProcessor() {
for {
select {
case job := <-Queue:
job.payload.UploadToS3() // <-- STILL NOT GOOD
}
}
}
坦白的說色难,我不知道我們當(dāng)時(shí)在想什么泼舱。這肯定是熬夜喝紅牛的結(jié)果。這個(gè)方法并沒有給我們帶來任何幫助枷莉。隊(duì)列僅僅是將問題延后了娇昙。我們的處理函數(shù)(processor)一次僅上傳一個(gè)載荷(payload),而接收請(qǐng)求的速率比一個(gè)處理函數(shù)上傳S3的能力大太多了依沮,帶緩沖的channel很快就到達(dá)了它的極限涯贞。從而阻塞了HTTP請(qǐng)求處理函數(shù)往隊(duì)列里添加更多的工作任務(wù)。
我們僅僅是延緩了問題的觸發(fā)危喉。系統(tǒng)在倒計(jì)時(shí)宋渔,最后還是崩潰了。在這個(gè)有問題的版本被部署之后辜限,系統(tǒng)的延遲以恒定速度在不停地增長(zhǎng)皇拣。
更好的解決辦法
我們決定使用Go channel的常用編程模式。使用一個(gè)兩級(jí)channel系統(tǒng)薄嫡,一個(gè)用來存放任務(wù)隊(duì)列氧急,另一個(gè)用來控制處理任務(wù)隊(duì)列的并發(fā)量。
這里的想法是根據(jù)一個(gè)可持續(xù)的速率將S3上傳并行化毫深。這個(gè)速率不會(huì)使機(jī)器變慢或者導(dǎo)致S3的連接錯(cuò)誤吩坝。我們選擇了一個(gè)Job/Worker模式。如果你們對(duì)Java
哑蔫,C#
等語言熟悉的話钉寝,可以把它想象成是Go語言用channel來實(shí)現(xiàn)的工作線程池。
var (
MaxWorker = os.Getenv("MAX_WORKERS")
MaxQueue = os.Getenv("MAX_QUEUE")
)
// Job represents the job to be run
type Job struct {
Payload Payload
}
// A buffered channel that we can send work requests on.
var JobQueue chan Job
// Worker represents the worker that executes the job
type Worker struct {
WorkerPool chan chan Job
JobChannel chan Job
quit chan bool
}
func NewWorker(workerPool chan chan Job) Worker {
return Worker{
WorkerPool: workerPool,
JobChannel: make(chan Job),
quit: make(chan bool)}
}
// Start method starts the run loop for the worker, listening for a quit channel in
// case we need to stop it
func (w Worker) Start() {
go func() {
for {
// register the current worker into the worker queue.
w.WorkerPool <- w.JobChannel
select {
case job := <-w.JobChannel:
// we have received a work request.
if err := job.Payload.UploadToS3(); err != nil {
log.Errorf("Error uploading to S3: %s", err.Error())
}
case <-w.quit:
// we have received a signal to stop
return
}
}
}()
}
// Stop signals the worker to stop listening for work requests.
func (w Worker) Stop() {
go func() {
w.quit <- true
}()
}
我們修改了HTTP請(qǐng)求處理函數(shù)來創(chuàng)建一個(gè)含有載荷(payload)的Job
結(jié)構(gòu)闸迷,然后將它送到一個(gè)叫JobQueue
的channel嵌纲。worker會(huì)對(duì)它們進(jìn)行處理。
func payloadHandler(w http.ResponseWriter, r *http.Request) {
if r.Method != "POST" {
w.WriteHeader(http.StatusMethodNotAllowed)
return
}
// Read the body into a string for json decoding
var content = &PayloadCollection{}
err := json.NewDecoder(io.LimitReader(r.Body, MaxLength)).Decode(&content)
if err != nil {
w.Header().Set("Content-Type", "application/json; charset=UTF-8")
w.WriteHeader(http.StatusBadRequest)
return
}
// Go through each payload and queue items individually to be posted to S3
for _, payload := range content.Payloads {
// let's create a job with the payload
work := Job{Payload: payload}
// Push the work onto the queue.
JobQueue <- work
}
w.WriteHeader(http.StatusOK)
}
在初始化服務(wù)的時(shí)候腥沽,我們創(chuàng)建了一個(gè)Dispatcher
并且調(diào)用了Run()
函數(shù)來創(chuàng)建worker池逮走。這些worker會(huì)監(jiān)聽JobQueue
上是否有新的任務(wù)并進(jìn)行處理。
dispatcher := NewDispatcher(MaxWorker)
dispatcher.Run()
下面是我們的dispatcher實(shí)現(xiàn)代碼:
type Dispatcher struct {
// A pool of workers channels that are registered with the dispatcher
WorkerPool chan chan Job
}
func NewDispatcher(maxWorkers int) *Dispatcher {
pool := make(chan chan Job, maxWorkers)
return &Dispatcher{WorkerPool: pool}
}
func (d *Dispatcher) Run() {
// starting n number of workers
for i := 0; i < d.maxWorkers; i++ {
worker := NewWorker(d.pool)
worker.Start()
}
go d.dispatch()
}
func (d *Dispatcher) dispatch() {
for {
select {
case job := <-JobQueue:
// a job request has been received
go func(job Job) {
// try to obtain a worker job channel that is available.
// this will block until a worker is idle
jobChannel := <-d.WorkerPool
// dispatch the job to the worker job channel
jobChannel <- job
}(job)
}
}
}
這里我們提供了創(chuàng)建worker的最大數(shù)目作為參數(shù)今阳,并把這些worker加入到worker池里师溅。因?yàn)槲覀円呀?jīng)在docker化的Go環(huán)境里使用了Amazon的Elasticbeanstalk并且嚴(yán)格按照12-factor方法來配置我們的生產(chǎn)環(huán)境,這些參數(shù)值可以從環(huán)境變量里獲得盾舌。我們可以方便地控制worker數(shù)目和任務(wù)隊(duì)列的長(zhǎng)度险胰。我們可以快速地調(diào)整這些值而不需要重新部署整個(gè)集群。
var (
MaxWorker = os.Getenv("MAX_WORKERS")
MaxQueue = os.Getenv("MAX_QUEUE")
)
部署了新版本之后矿筝,我們看到系統(tǒng)延遲一下子就降到了可以忽略的量級(jí)起便。同時(shí)處理請(qǐng)求的能力也大幅攀升。
在Elastic Load Balancers熱身后幾分鐘窖维,我們看到Elasticbeanstalk應(yīng)用開始處理將近每分鐘一百萬個(gè)請(qǐng)求榆综。我們的流量通常在早上的時(shí)候會(huì)攀升至超過每分鐘一百萬個(gè)請(qǐng)求。同時(shí)铸史,我們也將服務(wù)器的數(shù)目從100臺(tái)縮減到了20臺(tái)鼻疮。
通過合理地配置集群和auto-scaling,我們能夠做到只配置4臺(tái)EC2 c4.Large實(shí)例琳轿。然后當(dāng)CPU使用率持續(xù)5分鐘在90%以上時(shí)用Elastic Auto-Scaling來創(chuàng)建新的實(shí)例判沟。
結(jié)束語
對(duì)我來說簡(jiǎn)潔(simplicity)是第一位的耿芹。我們可以利用無數(shù)隊(duì)列,很多后臺(tái)worker以及復(fù)雜的部署來設(shè)計(jì)一個(gè)復(fù)雜系統(tǒng)挪哄,最終我們還是使用了Elasticbeanstalk auto-scaling的強(qiáng)大功能和Go語言提供的應(yīng)對(duì)并發(fā)的簡(jiǎn)單方法吧秕。用僅僅4臺(tái)機(jī)器(可能還不如我的MacBook Pro強(qiáng)大)來處理每分鐘一百萬次POST請(qǐng)求對(duì)Amazon S3進(jìn)行寫操作。
每項(xiàng)任務(wù)都有對(duì)應(yīng)的正確工具迹炼。當(dāng)你的Ruby on Rails
系統(tǒng)需要一個(gè)很強(qiáng)大的HTTP請(qǐng)求處理器砸彬,可以嘗試看看ruby生態(tài)系統(tǒng)以外的其它更強(qiáng)大的選項(xiàng)。