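Real-time monitoring alerts with django+celery+psutil+channels+elasticsearch: a learning experience sparked by an interview
On 2019-07-25 I went to an interview that lasted about an hour and went into quite a lot of detail. It also exposed some gaps: I hadn't studied certain areas deeply enough, so I couldn't answer the questions about optimizing some big-data components. Overall the interview went reasonably well. The interviewer hadn't given up on me and kept trying to confirm whether I could learn quickly, and before I left he gave me an assignment:
Use the periodic task feature of Celery (http://docs.celeryproject.org/en/latest/index.html), together with the third-party Python module psutil (https://psutil.readthedocs.io/en/latest/), to capture activity information (e.g. CPU usage, memory usage, I/O share) about the processes running in the service's environment (i.e. the local operating system). Store the data from past capture cycles in elasticsearch, and query and display it with Django. A list view rendered with Django templates is enough; no charts are required.
Bonus: for the captured data, use a configuration file to raise real-time alerts for specific processes. When a process's activity data looks abnormal (e.g. CPU usage exceeds a threshold), the Celery worker should push the alert directly to the front end through Django channels.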
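Deadline
One week is suggested; submit the results within two weeks at the latest.
I asked, half-seriously, whether failing to finish would be the end of it. He politely replied that even if I couldn't finish, we could still keep talking. But I knew perfectly well that the assignment was a test of my ability, and failing would be embarrassing! In the interview I had said I'd built an ops management system; if I couldn't do this, I'd have to slink away. How could I face anyone?
Holding the assignment, I felt uneasy, because apart from psutil I had never touched any of these tools. Fear of the unknown hit immediately: on one side the deadline, on the other unfamiliar technology. Agonizing. So I first broke the requirement down into roughly five parts: celery, psutil, elasticsearch, reading es data in django, and channels.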
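That day I did some rough reading on celery. Hard to understand.
On Friday I started studying celery properly, including how to integrate it with django. I got basically nothing done that day; the material online is really misleading, a mixed bag!
On Saturday, under enormous pressure, I stayed home all day instead of spending time with my wife and child, and by evening I finally had celery+psutil collecting data. Because I wasn't familiar with elasticsearch, I used a mysql database as the backend store so it would work with django. In practice, although the django_celery_results_taskresult table was created successfully, no data was ever written to it. So frustrating!!! I eventually found out why: I had changed CELERY_RESULT_BACKEND in settings but had not restarted the celery worker, so the new configuration never took effect. What a trap!
On Sunday I played with my kid all day. In the evening I had some time and looked into it again, without much progress.
On Monday I put a lot of time into experiments, but I just couldn't figure out how to connect that table to the django ORM. When I got off work I asked the owner of a django QQ group, who gave me the answer in a few words:
Import the model directly (from django_celery_results.models import TaskResult). That way there's no need to work out how to associate the django_celery_results_taskresult table with a model of my own; the built-in model, TaskResult, gives access to it. It clicked at once. That evening at home I finished parsing the data, converting it from strings into dicts, and successfully passed it to the front end and displayed it!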
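On Tuesday, building on that success, I kept mysql as the backend and started on channels. I learned that channels is the officially recommended way to implement websockets in Django (configuration details follow later). The tricky part was sending messages to channels from outside a consumer, and for that the credit goes to channel_layer!
On Wednesday, in the task, right before the return, I added an anomaly check on the collected data; abnormal data was sent back to the front end through channel_layer, and the front end read the websocket message and displayed it. I told the interviewer that the main functionality was done, except that the backend database was mysql, because elasticsearch has no ready-made engine like mysql's that plugs straight into django. What I really meant was that it was good enough and there was no need to obsess over which database sat in the backend. But he replied that it had to be elasticsearch. Seeing he wouldn't budge, I mentioned that I'd seen django-elasticsearch-dsl online, and he said that would be fine. All right, I'd keep going. By now I felt confident: at least I wouldn't end up embarrassed for failing to deliver.
On Thursday, following the approach in the django-elasticsearch-dsl documentation, I got a simple model created and stored. At first, though, I couldn't see how to get the data written by the celery backend onto the django page. Luckily I kept a flexible mind: I used kibana to look at the format of the data stored in elasticsearch and learned that index and type correspond roughly to database and table in mysql. So I boldly wrote a model and adjusted the matching documents.py, and in the end it mapped onto the model exactly the way I had worked out the fields in es.
On Friday I put together the documentation, the code, and requirements.txt, and sent them back to the interviewer.
That was the whole journey. When it was done, I was genuinely grateful to myself. From knowing nothing to finishing everything, from fearing the unknown to learning a great deal, in one week I overcame my fear and gained real knowledge. It also confirmed again what I've said before: in hard times, only by saving yourself can you find a way out. If you back off and give up at the first difficulty, there will never be a moment of success; in the face of the unknown and of fear, only continued exploration leads the way.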
Below is a write-up of the actual steps:
1. The celery part
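First, register the apps in settings, including the Celery integration apps (django_celery_results and django_celery_beat, which the later steps rely on):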
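A minimal sketch of this step, assuming the project package is called homework (as in the celery -A homework commands below) and the app that holds the task is agent. django_celery_results provides the 'django-db' result backend and django_celery_beat provides the DatabaseScheduler used later:

```python
# homework/settings.py (excerpt); "agent" is the app containing tasks.py (assumed)
INSTALLED_APPS = [
    "django.contrib.admin",
    "django.contrib.auth",
    "django.contrib.contenttypes",
    "django.contrib.sessions",
    "django.contrib.messages",
    "django.contrib.staticfiles",
    "django_celery_results",  # stores task results in the Django database
    "django_celery_beat",     # database-backed periodic task scheduler
    "agent",
]
```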
Next, create celery.py, which creates the celery instance, with the following contents:
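A minimal sketch of celery.py, following the layout in Celery's "First steps with Django" guide; the package name homework is an assumption taken from the commands later in this post:

```python
# homework/celery.py
import os

from celery import Celery

# Point Celery at the Django settings module before the app is configured
os.environ.setdefault("DJANGO_SETTINGS_MODULE", "homework.settings")

app = Celery("homework")

# Read every setting prefixed with CELERY_ from Django's settings.py
app.config_from_object("django.conf:settings", namespace="CELERY")

# Look for a tasks.py module in each installed app (e.g. agent/tasks.py)
app.autodiscover_tasks()
```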
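Then add the following to the __init__.py in the project's main package, so that celery is loaded automatically when the project starts and the celery command can later be used from the project to start the worker and beat: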
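A sketch of that __init__.py, following the same Celery guide (package name homework assumed):

```python
# homework/__init__.py
# Import the Celery app whenever Django starts, so shared_task decorators bind to it
from .celery import app as celery_app

__all__ = ("celery_app",)
```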
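celery itself is independent of django, i.e. it can run on its own outside a django project. It can also be used together with django, which lets you take full advantage of django: all the configuration goes into settings where it is easy to see. The settings below specify the broker address, the backend address, the beat schedule, the task, and so on: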
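A minimal sketch of those settings. It assumes celery.py loads settings with namespace="CELERY" (so Celery's broker_url becomes CELERY_BROKER_URL, and so on), a Redis broker at a placeholder address, and a 10-second interval; the post does not show the actual broker address or schedule:

```python
# homework/settings.py (excerpt); broker address and interval are placeholders
CELERY_BROKER_URL = "redis://127.0.0.1:6379/0"
CELERY_RESULT_BACKEND = "django-db"  # results go to django_celery_results_taskresult
CELERY_RESULT_SERIALIZER = "json"

CELERY_BEAT_SCHEDULE = {
    "scrapy-data-every-10s": {
        "task": "agent.tasks.scrapy_data",  # module path + function name of the task
        "schedule": 10.0,                   # seconds between runs
    },
}
```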
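Here, the task entry points to the scrapy_data function in agent.tasks, so the task has to be created in that app. This task uses psutil to collect detailed metrics about the server and returns the collected data. "Returns" here means returning it to the backend configured for celery: if that is mysql, the result is stored in mysql; if it is elasticsearch, it is stored in es; it can also be redis, or you can ignore the return value altogether: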
# agent/tasks.py
import datetime
import socket

import psutil
from asgiref.sync import async_to_sync
from channels.layers import get_channel_layer

from homework.celery import app  # the Celery instance from celery.py (project name assumed)


@app.task
def scrapy_data():
    hostname = socket.gethostname()
    # With no interval, compares against the previous call (the very first call returns 0.0)
    cpu = psutil.cpu_percent()
    mem = psutil.virtual_memory().percent
    net = psutil.net_io_counters()  # read the counters once for both values
    net_in = net.bytes_recv
    net_out = net.bytes_sent
    # Check each metric against its threshold
    # CPU threshold: 80; memory threshold: 80
    cpu_limit = 80
    mem_limit = 80
    # Must match the group EchoConsumer joins in connect(), or alerts go nowhere
    group_name = "agent"
    if cpu > cpu_limit:
        current_time = datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S')
        channel_layer = get_channel_layer()
        async_to_sync(channel_layer.group_send)(
            group_name,
            {
                "type": "user.message",
                "text": str(current_time) + " CPU usage has exceeded " + str(cpu_limit) + "%; current CPU usage: " + str(cpu) + "%",
            },
        )
    if mem > mem_limit:
        current_time = datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S')
        channel_layer = get_channel_layer()
        async_to_sync(channel_layer.group_send)(
            group_name,
            {
                "type": "user.message",
                "text": str(current_time) + " Memory usage has exceeded " + str(mem_limit) + "%; current memory usage: " + str(mem) + "%",
            },
        )
    return hostname, cpu, mem, net_in, net_out
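The assignment's bonus item asks for thresholds for specific processes to come from a configuration file, whereas the task above hard-codes 80% for the whole machine. A minimal sketch of that idea, assuming a hypothetical JSON config that maps process names to limits; load_thresholds and check_process are illustrative names, not part of any library:

```python
import json


def load_thresholds(text):
    """Parse a JSON config such as {"nginx": {"cpu": 80, "mem": 50}}."""
    return json.loads(text)


def check_process(name, cpu, mem, thresholds):
    """Return alert messages for one process; empty if it has no rule or is within limits."""
    rule = thresholds.get(name)
    if rule is None:
        return []
    alerts = []
    if "cpu" in rule and cpu > rule["cpu"]:
        alerts.append(f"{name}: CPU usage {cpu}% exceeds {rule['cpu']}%")
    if "mem" in rule and mem > rule["mem"]:
        alerts.append(f"{name}: memory usage {mem}% exceeds {rule['mem']}%")
    return alerts
```

Inside scrapy_data, one way to feed it would be to loop over psutil.process_iter(attrs=["name", "cpu_percent", "memory_percent"]), pass each p.info["name"], p.info["cpu_percent"] and p.info["memory_percent"] to check_process, and group_send every returned message. Per-process cpu_percent is measured since the previous call on the same process, so the first sample is a meaningless 0.0.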
This time I first configured mysql as celery's backend, because mysql works easily with the django ORM and has a ready-made engine. It needs the following configuration in settings:
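A minimal sketch of the MySQL database settings; every connection value is a placeholder, and Django's MySQL backend needs a driver such as mysqlclient installed:

```python
# homework/settings.py (excerpt); all values below are placeholders
DATABASES = {
    "default": {
        "ENGINE": "django.db.backends.mysql",
        "NAME": "homework",
        "USER": "root",
        "PASSWORD": "change-me",
        "HOST": "127.0.0.1",
        "PORT": "3306",
    }
}
```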
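Then specify the backend; 'django-db' means the database configured in django:
CELERY_RESULT_BACKEND = 'django-db'  # result backend
Finally, don't forget to create the database tables:
python manage.py makemigrations
python manage.py migrate
If everything goes well, you'll see the generated tables in the database:
The django_celery_results_taskresult table among them is created by default and stores the results returned by the celery tasks.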
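Next, verify the celery worker and beat.
From the project's root directory,
start the celery worker:
celery -A homework worker -l info
Start celery beat:
celery -A homework beat -l info --scheduler django_celery_beat.schedulers:DatabaseScheduler
If everything goes well, the worker and beat output look like this:
I ran into a problem running celery. At first I wanted to start the worker by running the function in the tasks file directly, but because tasks.py lives inside the django project, it can't be run as a standalone file. So the choice was either to take it out of django or to use it together with django. Outside django, running the task would be simple but running beat would be a hassle, so I preferred to stay within django and use both the celery worker and beat. After some debugging, the solution was to start the worker and beat from the project's root directory, rather than pointing at a specific function in tasks.
One thing to note here: if you change anything in settings or in tasks, you must restart the celery worker, otherwise the new configuration will not take effect. This is exactly where I got burned!!!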
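The data was now being stored in the mysql django_celery_results_taskresult table, but how could it be combined with the django ORM and shown on a page? Another problem. For a model I created myself, mapping it through the ORM is easy, but this table was created automatically by celery, so how could I match it to the model fields I needed? I spent a whole day on this without finding the answer. In the end I asked an expert in the QQ group and got it: django_celery_results ships with its own models, and the model for the django_celery_results_taskresult table is TaskResult:
This way you can get the data in that table with no effort at all:
From that table I only need the data in the result field; the rest is useless to me, so the data needs some tidying. First fetch all the values of the result field. They are strings, so split and similar methods are used to extract each field, which is then converted into a dict; the dicts are collected in a list and passed to the front end: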
# agent/views.py
from django.shortcuts import render
from django.views import View
from django_celery_results.models import TaskResult


# Parsing approach when the database is mysql (class-based view)
class ScrapyDataView(View):
    def get(self, request, *args, **kwargs):
        # Read the result column of django_celery_results_taskresult in mysql
        results = TaskResult.objects.values_list('result', flat=True)
        object_list = []
        for idx, res in enumerate(results, start=1):
            # With the JSON result serializer, res looks like '["web01", 12.5, 40.3, 1024, 2048]'
            data = res.split(',')
            object_list.append({
                "id": idx,
                # Drop the leading '[' and the quotes around the hostname
                "hostname": data[0][1:].strip().strip('"'),
                "cpu": data[1].strip(),
                "mem": data[2].strip(),
                "net_in": data[3].strip(),
                # Drop the trailing ']'
                "net_out": data[4][:-1].strip(),
            })
        return render(request, 'list.html', {'object_list': object_list})
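The view above splits each stored string on commas, which works for this fixed five-value tuple but breaks easily (the hostname, for example, keeps its JSON quotes unless they are stripped). Assuming the default JSON result serializer, each stored string is a JSON array, so json.loads can do the parsing. A sketch of that step as a standalone function that can be tested without a database; rows_to_dicts is a hypothetical name:

```python
import json


def rows_to_dicts(raw_results):
    """Turn TaskResult.result strings into the dicts the list.html template expects.

    Assumes each string is the JSON encoding of the tuple returned by scrapy_data,
    e.g. '["web01", 12.5, 40.3, 1024, 2048]'.
    """
    object_list = []
    for idx, raw in enumerate(raw_results, start=1):
        hostname, cpu, mem, net_in, net_out = json.loads(raw)
        object_list.append({
            "id": idx,
            "hostname": hostname,
            "cpu": cpu,
            "mem": mem,
            "net_in": net_in,
            "net_out": net_out,
        })
    return object_list
```

In the view, object_list = rows_to_dicts(TaskResult.objects.values_list('result', flat=True)) would replace the loop, and the numbers stay numbers instead of strings.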
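The front end only has to display it:
2. The channels part
The official documentation really is the best choice!
https://channels.readthedocs.io/en/latest/installation.html
The reason I spent so long on channels is that I followed too many third-party articles and pieced things together from them; the result was a mess that couldn't possibly run. In the end I went back and configured channels strictly by the official docs!!!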
Channels is available on PyPI - to install it, just run:
pip install -U channels
Once that’s done, you should add channels to your INSTALLED_APPS setting:
INSTALLED_APPS = (
    'django.contrib.auth',
    'django.contrib.contenttypes',
    'django.contrib.sessions',
    'django.contrib.sites',
    ...
    'channels',
)
Then, make a default routing in myproject/routing.py:
from channels.routing import ProtocolTypeRouter

application = ProtocolTypeRouter({
    # Empty for now (http->django views is added by default)
})
And finally, set your ASGI_APPLICATION setting to point to that routing object as your root application:
ASGI_APPLICATION = "myproject.routing.application"
From the command line, cd into a directory where you’d like to store your code, then run the following command:
$ django-admin startproject mysite
This will create a mysite directory in your current directory with the following contents:
mysite/
    manage.py
    mysite/
        __init__.py
        settings.py
        urls.py
        wsgi.py
Creating the Chat app
We will put the code for the chat server in its own app.
Make sure you’re in the same directory as manage.py and type this command:
$ python3 manage.py startapp chat
That’ll create a directory chat, which is laid out like this:
chat/
    __init__.py
    admin.py
    apps.py
    migrations/
        __init__.py
    models.py
    tests.py
    views.py
For the purposes of this tutorial, we will only be working with chat/views.py and chat/__init__.py. So remove all other files from the chat directory.
After removing unnecessary files, the chat directory should look like:
chat/
    __init__.py
    views.py
We need to tell our project that the chat app is installed. Edit the mysite/settings.py file and add 'chat' to the INSTALLED_APPS setting. It’ll look like this:
# mysite/settings.py
INSTALLED_APPS = [
    'chat',
    'django.contrib.admin',
    'django.contrib.auth',
    'django.contrib.contenttypes',
    'django.contrib.sessions',
    'django.contrib.messages',
    'django.contrib.staticfiles',
]
Add the index view
We will now create the first view, an index view that lets you type the name of a chat room to join.
Create a templates directory in your chat directory. Within the templates directory you have just created, create another directory called chat, and within that create a file called index.html to hold the template for the index view.
Your chat directory should now look like:
chat/
    __init__.py
    templates/
        chat/
            index.html
    views.py
Put the following code in chat/templates/chat/index.html:
<!-- chat/templates/chat/index.html -->
<!DOCTYPE html>
<html>
<head>
    <meta charset="utf-8"/>
    <title>Chat Rooms</title>
</head>
<body>
    What chat room would you like to enter?<br/>
    <input id="room-name-input" type="text" size="100"/><br/>
    <input id="room-name-submit" type="button" value="Enter"/>

    <script>
        document.querySelector('#room-name-input').focus();
        document.querySelector('#room-name-input').onkeyup = function(e) {
            if (e.keyCode === 13) {  // enter, return
                document.querySelector('#room-name-submit').click();
            }
        };

        document.querySelector('#room-name-submit').onclick = function(e) {
            var roomName = document.querySelector('#room-name-input').value;
            window.location.pathname = '/chat/' + roomName + '/';
        };
    </script>
</body>
</html>
Create the view function for the index view. Put the following code in chat/views.py:
# chat/views.py
from django.shortcuts import render


def index(request):
    return render(request, 'chat/index.html', {})
To call the view, we need to map it to a URL - and for this we need a URLconf.
To create a URLconf in the chat directory, create a file called urls.py. Your app directory should now look like:
chat/
    __init__.py
    templates/
        chat/
            index.html
    urls.py
    views.py
In the chat/urls.py file include the following code:
# chat/urls.py
from django.conf.urls import url

from . import views

urlpatterns = [
    url(r'^$', views.index, name='index'),
]
The next step is to point the root URLconf at the chat.urls module. In mysite/urls.py, add an import for django.conf.urls.include and insert an include() in the urlpatterns list, so you have:
# mysite/urls.py
from django.conf.urls import include, url
from django.contrib import admin

urlpatterns = [
    url(r'^chat/', include('chat.urls')),
    url(r'^admin/', admin.site.urls),
]
Let’s verify that the index view works. Run the following command:
$ python3 manage.py runserver
You’ll see the following output on the command line:
Performing system checks...
System check identified no issues (0 silenced).
You have 13 unapplied migration(s). Your project may not work properly until you apply the migrations for app(s): admin, auth, contenttypes, sessions.
Run 'python manage.py migrate' to apply them.
February 18, 2018 - 22:08:39
Django version 1.11.10, using settings 'mysite.settings'
Starting development server at http://127.0.0.1:8000/
Quit the server with CONTROL-C.
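At this point I had only followed the official docs to set up an ordinary django project and app, and installed and configured the channels module, but channels was still not hooked into django!
The official docs say so too: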
Integrate the Channels library
So far we’ve just created a regular Django app; we haven’t used the Channels library at all. Now it’s time to integrate Channels.
Let’s start by creating a root routing configuration for Channels. A Channels routing configuration is similar to a Django URLconf in that it tells Channels what code to run when an HTTP request is received by the Channels server.
We’ll start with an empty routing configuration. Create a file mysite/routing.py and include the following code:
# mysite/routing.py
from channels.routing import ProtocolTypeRouter

application = ProtocolTypeRouter({
    # (http->django views is added by default)
})
Now add the Channels library to the list of installed apps. Edit the mysite/settings.py file and add 'channels' to the INSTALLED_APPS setting. It’ll look like this:
# mysite/settings.py
INSTALLED_APPS = [
    'channels',
    'chat',
    'django.contrib.admin',
    'django.contrib.auth',
    'django.contrib.contenttypes',
    'django.contrib.sessions',
    'django.contrib.messages',
    'django.contrib.staticfiles',
]
You’ll also need to point Channels at the root routing configuration. Edit the mysite/settings.py file again and add the following to the bottom of it:
# mysite/settings.py
# Channels
ASGI_APPLICATION = 'mysite.routing.application'
With Channels now in the installed apps, it will take control of the runserver command, replacing the standard Django development server with the Channels development server.
Let’s ensure that the Channels development server is working correctly. Run the following command:
$ python3 manage.py runserver
You’ll see the following output on the command line:
Performing system checks...
System check identified no issues (0 silenced).
You have 13 unapplied migration(s). Your project may not work properly until you apply the migrations for app(s): admin, auth, contenttypes, sessions.
Run 'python manage.py migrate' to apply them.
February 18, 2018 - 22:16:23
Django version 1.11.10, using settings 'mysite.settings'
Starting ASGI/Channels development server at http://127.0.0.1:8000/
Quit the server with CONTROL-C.
2018-02-18 22:16:23,729 - INFO - server - HTTP/2 support not enabled (install the http2 and tls Twisted extras)
2018-02-18 22:16:23,730 - INFO - server - Configuring endpoint tcp:port=8000:interface=127.0.0.1
2018-02-18 22:16:23,731 - INFO - server - Listening on TCP address 127.0.0.1:8000
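Only when the "Starting ASGI/Channels development server" line and the server log lines above appear are django and channels actually connected! That is because ASGI_APPLICATION was added to settings and points at the routing file used in ASGI mode. Without that configuration, the startup message is just:
Starting development server at http://0.0.0.0:8002/
I lost a lot of time here too. My configuration was such a mess that django was never actually hooked up to channels, so of course the websocket couldn't connect! I checked everything and still couldn't find the cause; only after going back to the official docs did I realize that ASGI had never been wired in. After redoing the configuration carefully by the docs, it worked: channels was connected, the routing was loaded, and websocket requests could be routed to consumers:
Create consumers.py in the app (agent) to handle websocket requests: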
# agent/consumers.py
from asgiref.sync import async_to_sync
from channels.generic.websocket import WebsocketConsumer


# WebsocketConsumer
class EchoConsumer(WebsocketConsumer):
    def connect(self):
        # Every connected page joins the "agent" group
        self.group_name = "agent"
        async_to_sync(self.channel_layer.group_add)(self.group_name, self.channel_name)
        # Accept the websocket handshake
        self.accept()

    def receive(self, text_data=None, bytes_data=None):
        # Re-broadcast whatever a browser sends to the whole group
        async_to_sync(self.channel_layer.group_send)(
            self.group_name,
            {
                "type": "user.message",
                "text": text_data,
            },
        )

    def user_message(self, event):
        # Handles group messages whose "type" is "user.message" ("." maps to "_")
        self.send(text_data=event["text"])

    def disconnect(self, close_code):
        async_to_sync(self.channel_layer.group_discard)(self.group_name, self.channel_name)
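Worth pointing out is how websocket messages are handled here: through the channel layer returned by get_channel_layer. The channel layer works like a shared interface: a message sent to a group through it is delivered to every consumer that has joined that group, and each consumer forwards it to its own browser over the websocket. The layer is not private to one consumer but shared by the whole project. That means get_channel_layer can be used not only inside consumers but also outside them to push messages into a group, and every front-end page connected to that group will display the data.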
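To make the group idea concrete, here is a toy, in-process model of the fan-out using asyncio queues. It is not the Channels API (the real layer in this project is channels_redis); ToyChannelLayer and its methods are hypothetical and only mirror the shape of group_add, group_discard and group_send:

```python
import asyncio
from collections import defaultdict


class ToyChannelLayer:
    """Hypothetical in-memory model: each channel has an inbox, a group is a set of channel names."""

    def __init__(self):
        self.inboxes = defaultdict(asyncio.Queue)
        self.groups = defaultdict(set)

    async def group_add(self, group, channel):
        self.groups[group].add(channel)

    async def group_discard(self, group, channel):
        self.groups[group].discard(channel)

    async def group_send(self, group, message):
        # Fan out: every channel currently in the group gets its own copy
        for channel in self.groups[group]:
            await self.inboxes[channel].put(dict(message))

    async def receive(self, channel):
        return await self.inboxes[channel].get()


async def demo():
    layer = ToyChannelLayer()
    # Two open browser tabs = two consumers that joined the "agent" group
    await layer.group_add("agent", "tab-1")
    await layer.group_add("agent", "tab-2")
    # A sender outside any consumer (like the Celery task) broadcasts once
    await layer.group_send("agent", {"type": "user.message", "text": "CPU > 80%"})
    received = [await layer.receive("tab-1"), await layer.receive("tab-2")]
    # After leaving the group, tab-2 gets nothing from later broadcasts
    await layer.group_discard("agent", "tab-2")
    await layer.group_send("agent", {"type": "user.message", "text": "MEM > 80%"})
    return received, layer.inboxes["tab-1"].qsize(), layer.inboxes["tab-2"].qsize()
```

Running asyncio.run(demo()) shows both tabs receiving the first alert, while only tab-1 has the second one waiting. The real Redis-backed layer adds cross-process delivery and message expiry on top of this basic idea.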
For this, settings also needs the following configuration, which uses redis as the channel layer backend; it requires the channels-redis package:
# django-channels configuration
CHANNEL_LAYERS = {
    "default": {
        "BACKEND": "channels_redis.core.RedisChannelLayer",
        "CONFIG": {
            "hosts": [("172.16.2.222", 6379)],
        },
    },
}
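Note that the websocket connection is initiated by JavaScript in the front-end page. The page then keeps a long-lived connection to the back end, waiting for websocket messages from the back end and parsing them: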
<script>
    var socket = new WebSocket('ws://' + window.location.host + '/ws/');
    socket.onopen = function open() {
        console.log('WebSockets connection created.');
        // socket.send("Hello world!")
    };
    socket.onerror = function error() {
        console.log('WebSockets connection error.');
    };
    socket.onmessage = function message(e) {
        console.log('WebSockets connection message.');
        console.log(e.data);
        // Append each alert to the #terms element (requires jQuery)
        $('#terms').append(e.data).append("<br/>");
    };
    if (socket.readyState == WebSocket.OPEN) {
        socket.onopen();
    }
</script>
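When the front-end page is visited, it automatically sends a websocket connection request. Because it is a ws request, it is not resolved through the normal urls but through the routing configured earlier: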
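A minimal sketch of that routing, assuming the project package is homework (so ASGI_APPLICATION = "homework.routing.application") and Channels 2.x, where a consumer class is passed to URLRouter directly (Channels 3 and later would need EchoConsumer.as_asgi()). The ws/ pattern matches the '/ws/' URL the page's JavaScript opens:

```python
# homework/routing.py (Channels 2.x style; module paths are assumptions)
from django.conf.urls import url

from channels.auth import AuthMiddlewareStack
from channels.routing import ProtocolTypeRouter, URLRouter

from agent.consumers import EchoConsumer

application = ProtocolTypeRouter({
    # http -> Django views is added by default in Channels 2
    "websocket": AuthMiddlewareStack(
        URLRouter([
            # Matches new WebSocket('ws://' + window.location.host + '/ws/')
            url(r"^ws/$", EchoConsumer),
        ])
    ),
})
```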
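The request is then resolved to the EchoConsumer class, and the websocket connection is established!
When the task has a threshold set, e.g. cpu above 80, crossing it triggers channels to send an alert to the front-end page. This is sending a message to the websocket from outside a consumer, so get_channel_layer is needed: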
if cpu > cpu_limit:
    current_time = datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S')
    channel_layer = get_channel_layer()
    # Must be the same group name EchoConsumer joins in connect()
    group_name = "agent"
    async_to_sync(channel_layer.group_send)(
        group_name,
        {
            "type": "user.message",  # dispatched to EchoConsumer.user_message
            "text": str(current_time) + " CPU usage has exceeded " + str(cpu_limit) + "%; current CPU usage: " + str(cpu) + "%",
        },
    )
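Finally, the JS on the page receives the websocket messages from the back end and appends them to the page:
3. The elasticsearch part
At this point every requirement except the database was done. But when I wanted to switch django's database engine to elasticsearch, I found that django simply has no elasticsearch engine; its built-in backends cover only sqlite3, mysql, postgresql and oracle. While searching, I found django-elasticsearch-dsl, which connects django to elasticsearch. So I did it in two steps: first install elasticsearch and use kibana to verify that data was being stored; then use django-elasticsearch-dsl to read and write elasticsearch.
Note that the kibana version must match the es version, so I downloaded kibana 6.8.2. After changing the es connection address, it connected normally, and the corresponding index and type were visible in the console: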
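I took a detour here. I first deployed elasticsearch 7.2 with docker, and at first everything went smoothly. I changed CELERY_RESULT_BACKEND in settings to:
CELERY_RESULT_BACKEND = 'elasticsearch://172.16.2.221:9200/elastic4/doc'
But when I ran the index-creation command from the django-elasticsearch-dsl documentation,
./manage.py search_index --rebuild
it failed. Looking into the cause, I found it was the elasticsearch version I was using. The documentation clearly states which Elasticsearch versions are supported:
I was running es 7.2, which is why it broke.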
One more note:
The official django-elasticsearch-dsl page:
https://pypi.org/project/django-elasticsearch-dsl/
Install Django Elasticsearch DSL:
pip install https://github.com/sabricot/django-elasticsearch-dsl/archive/6.4.1.tar.gz
Then add django_elasticsearch_dsl to the INSTALLED_APPS
You must define ELASTICSEARCH_DSL in your django settings.
For example:
ELASTICSEARCH_DSL = {
    'default': {
        'hosts': 'localhost:9200'
    },
}
ELASTICSEARCH_DSL is then passed to elasticsearch-dsl-py's connections.configure.
Then for a model:
# models.py
from django.db import models


class Car(models.Model):
    # CharField requires max_length in Django; these values are arbitrary
    name = models.CharField(max_length=100)
    color = models.CharField(max_length=30)
    description = models.TextField()
    type = models.IntegerField(choices=[
        (1, "Sedan"),
        (2, "Truck"),
        (4, "SUV"),
    ])
To make this model work with Elasticsearch, create a subclass of django_elasticsearch_dsl.Document, create a class Index inside the Document class to define your Elasticsearch indices, names, settings etc and at last register the class using the registry.register_document decorator.
The Document class must be defined in documents.py in your app directory.
# documents.py
from django_elasticsearch_dsl import Document
from django_elasticsearch_dsl.registries import registry

from .models import Car


@registry.register_document
class CarDocument(Document):
    class Index:
        # Name of the Elasticsearch index
        name = 'cars'
        # See Elasticsearch Indices API reference for available settings
        settings = {'number_of_shards': 1,
                    'number_of_replicas': 0}

    class Django:
        model = Car  # The model associated with this Document

        # The fields of the model you want to be indexed in Elasticsearch
        fields = [
            'name',
            'color',
            'description',
            'type',
        ]

        # Ignore auto updating of Elasticsearch when a model is saved
        # or deleted:
        # ignore_signals = True

        # Don't perform an index refresh after every update (overrides global setting):
        # auto_refresh = False

        # Paginate the django queryset used to populate the index with the specified size
        # (by default there is no pagination)
        # queryset_pagination = 5000
To create and populate the Elasticsearch index and mapping use the search_index command:
$ ./manage.py search_index --rebuild
Now, when you do something like:
car = Car(
    name="Car one",
    color="red",
    type=1,
    description="A beautiful car"
)
car.save()
The object will be saved in Elasticsearch too (using a signal handler). To get an elasticsearch-dsl-py Search instance, use:
s = CarDocument.search().filter("term", color="red")
# or
s = CarDocument.search().query("match", description="beautiful")

for hit in s:
    print(
        "Car name : {}, description {}".format(hit.name, hit.description)
    )
The previous example returns a result specific to elasticsearch_dsl, but it is also possible to convert the elasticsearch result into a real django queryset, just be aware that this costs a sql request to retrieve the model instances with the ids returned by the elasticsearch query.
s = CarDocument.search().filter("term", color="blue")[:30]
qs = s.to_queryset()
# qs is just a django queryset and it is called with order_by to keep
# the same order as the elasticsearch result.
for car in qs:
    print(car.name)
Following the approach above, I created a model with just one field, result, because I noticed that the data stored by the celery worker contains only that one field; the model mirrors that es data:
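A minimal sketch of that model, assuming it lives in the agent app's models.py and that a TextField is enough to hold the JSON string the celery backend writes (the post does not show the field type):

```python
# agent/models.py
from django.db import models


class CeleryResult(models.Model):
    # The celery elasticsearch backend stores each task result as a JSON string
    # under the "result" key, so a single text field is enough to map it
    result = models.TextField()
```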
Then create documents.py:
# agent/documents.py
from django_elasticsearch_dsl import Document
from django_elasticsearch_dsl.registries import registry

from .models import CeleryResult  # assumes the model lives in the same app


@registry.register_document
class CeleryResultDocument(Document):
    class Index:
        # Name of the Elasticsearch index (the one named in CELERY_RESULT_BACKEND)
        name = 'elastic4'
        # See Elasticsearch Indices API reference for available settings
        settings = {'number_of_shards': 1,
                    'number_of_replicas': 0}

    class Django:
        model = CeleryResult  # The model associated with this Document
        # The fields of the model you want to be indexed in Elasticsearch
        fields = [
            'result',
        ]
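This associates the es index (elastic4) with the model (CeleryResult).
Once that works, you can rebuild the index:
./manage.py search_index --rebuild
Rebuilding the index wipes all existing data in the corresponding es index! Keep this in mind!
One more point: CELERY_RESULT_BACKEND in settings is set to elasticsearch://172.16.2.221:9200/elastic4/doc
If no type name is given here, a default type is used instead.
This amounts to storing the data directly in the doc type (the "table") of the elastic4 index (the "database") in es. An index can hold only one type: if you configure writes to different types in the same index, es raises an error saying the index cannot have more than one type (Elasticsearch 6.x allows only a single mapping type per index).
Once all of that is sorted out, CeleryResultDocument can be used to query the data. The logic here is no longer the django ORM; following the official documentation, it queries es directly.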
# agent/views.py
import json

from django.shortcuts import render

from .documents import CeleryResultDocument  # assumes documents.py is in the same app


# Parsing approach when the database is elasticsearch (function-based view)
def user_list(request):
    # s = CeleryResultDocument.search().filter("term", _type="doc")
    # Only 10 hits are returned by default; to get more, set from and size here.
    # [:100] means take the first 100 hits, starting from 0
    s = CeleryResultDocument.search().filter("term", _index="elastic4")[:100]
    object_list = []
    for idx, hit in enumerate(s, start=1):
        # Convert the stored JSON string into a dict
        doc = json.loads(hit.result)
        # The value under "result" is what the task returned; that is the part I need
        hostname, cpu, mem, net_in, net_out = doc.get("result")
        # Wrap the values into a dict for the template
        object_list.append({
            'id': idx,
            'hostname': hostname,
            'cpu': cpu,
            'mem': mem,
            'net_in': net_in,
            'net_out': net_out,
        })
    context = {'object_list': object_list}
    return render(request, 'list.html', context)
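This is the standard pattern: the data that comes back is also a string, so it likewise has to be converted to a dict, wrapped into a list, and passed back to the page for display. Note the slicing:
s = CeleryResultDocument.search().filter("term", _index="elastic4")[:100]
Without the slice, the page shows only 10 records, because an Elasticsearch search returns 10 hits by default; slicing the Search object sets from and size on the request, so with [:100] up to 100 records are shown. That really was the last hurdle!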
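Since slicing is how elasticsearch-dsl sets from and size, showing more than one page only requires computing the slice bounds. A sketch; page_bounds is a hypothetical helper, and MAX_RESULT_WINDOW mirrors Elasticsearch's default index.max_result_window of 10,000, above which a from + size request is rejected:

```python
MAX_RESULT_WINDOW = 10000  # Elasticsearch default for index.max_result_window


def page_bounds(page, page_size):
    """Return (start, stop) for slicing a Search, e.g. s[start:stop], for a 1-based page."""
    if page < 1 or page_size < 1:
        raise ValueError("page and page_size must be positive")
    start = (page - 1) * page_size
    stop = start + page_size
    if stop > MAX_RESULT_WINDOW:
        raise ValueError("from + size exceeds index.max_result_window")
    return start, stop
```

For example, start, stop = page_bounds(2, 100) followed by s = s[start:stop] requests hits 100 to 199.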
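The final result looks like this:
The websocket messages shown in the browser:
The data is collected with psutil; celery schedules the periodic collection asynchronously and analyzes the collected data, and anything over a threshold is sent to the front-end page in real time through channels.
Features to add later:
Partial page refresh with jquery, so that data and alerts can be shown together. Right now, refreshing the page clears the earlier alerts and only newly pushed ones appear. With partial data refresh, the two could coexist.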