用Tornado進行API開發(fā)——接收信息
物聯(lián)網(wǎng)設(shè)備定時向API接口發(fā)送數(shù)據(jù)吩翻,存儲數(shù)據(jù)到數(shù)據(jù)庫寇荧。
接收物聯(lián)網(wǎng)設(shè)備POST的請求
訪問量:每隔10秒同時接收65次連接
解決方案一
使用python官方的http.server
里的HTTPServer
和BaseHTTPRequestHandler
#! /usr/bin/env python3
# -*- coding:UTF-8 -*-
from http.server import HTTPServer, BaseHTTPRequestHandler
from socketserver import ThreadingMixIn
import json
import cgi
import pandas as pd
import datetime
from sqlalchemy import create_engine
from sqlalchemy.pool import QueuePool
import smtplib
import sys
host = ('',8001)
engine = create_engine('mysql+pymysql://*****:3306/pointgrab_info',pool_size=50,max_overflow=10,pool_timeout =10,poolclass=QueuePool,pool_recycle=-1)
def data_select_sql(areaId):
sql = " SELECT * FROM pointgrab_count_event_result WHERE capture_area_id ='{0}' ORDER BY id DESC LIMIT 1;".format(areaId)
df = pd.read_sql_query(sql, engine)
return df
def data_into_sql(df):
df.to_sql('pointgrab_count_event_result', engine, index=False, if_exists='append')
return True
class TodoHandler(BaseHTTPRequestHandler):
def do_GET(self):
self.send_error(415, 'Only post is supported')
log.error(self.client_address[0]+" - 415 - Only post is supported")
def do_POST(self):
ctype, pdict = cgi.parse_header(self.headers['content-type'])
# print(ctype, pdict)
token = self.headers['X-Auth-Token']
# print(token)
if token == 'token' and ctype == 'application/json':
path = str(self.path) # 獲取請求的url
if path == '/api/counting/':
# print(path)
length = int(self.headers['content-length']) # 獲取除頭部后的請求參數(shù)的長度
datas = self.rfile.read(length) # 獲取請求參數(shù)數(shù)據(jù),請求數(shù)據(jù)為json字符串
# print(datas)
rjson = json.loads(datas.decode())
# print(rjson,type(rjson))
lastdata = data_select_sql(rjson['areaId'])
if rjson['count'] not in lastdata['capture_counts'].values :
data_into_sql(rjson)
self.send_response(200)
self.send_header('Content-type', 'application/json')
self.end_headers()
self.wfile.write(json.dumps('Counting data is inserted').encode())
else:
self.send_response(200)
self.send_header('Content-type', 'application/json')
self.end_headers()
self.wfile.write(json.dumps('Counting data is duplication').encode())
else:
self.send_error(404, "Not Found")
else:
self.send_error(415, "Only json data is supported.")
# 混合類重寫了process_request
class ThreadingHTTPServer(ThreadingMixIn, HTTPServer):
pass
def main():
global server
server = ThreadingHTTPServer(host, TodoHandler)
# print(time.asctime(),"Starting server, listen at: %s:%s" % host)
log.info("Starting server, listen at: %s:%s" % host)
try:
server.serve_forever()
except KeyboardInterrupt:
server.server_close()
send_mail("Keyboard interrupt received: EXITING")
# print(time.asctime(), "Keyboard interrupt received: EXITING")
except Exception as e:
# print(e)
server.server_close()
# print(time.asctime(), "Server Stopped")
if __name__ == '__main__':
main()
出現(xiàn)問題:TCP連接數(shù)暴漲(Linux下查看tcp連接數(shù)及狀態(tài))齐疙,長時間運行后出現(xiàn)程序在運行,端口無法訪問的狀態(tài)
解決方案:并行程序6個即API接口服務(wù)開6個,使用Nginx代理轉(zhuǎn)發(fā)到6個端口莉兰。
upstream pointgrab {
least_conn;
server 127.0.0.1:8002;
server 127.0.0.1:8003;
server 127.0.0.1:8004;
server 127.0.0.1:8005;
server 127.0.0.1:8006;
server 127.0.0.1:8007;
keepalive 16;
}
server {
listen 8001;
server_name 127.0.0.1;
location / {
proxy_pass http://pointgrab;
proxy_set_header Host $host;
proxy_set_header X-Real-IP $remote_addr;
proxy_set_header X-Forwarded-For $proxy_add_x_forwarded_for;
}
}
新問題:內(nèi)存占用量比較大
解決方案:定時重啟程序
解決方案二
方案一勉強可以用,但穩(wěn)定性稍差
使用web服務(wù)框架Django礁竞、Flask糖荒、Twisted、Tornado等來開發(fā)
本次采用Tornado來實現(xiàn) Tornado官方文檔
# tornado的GET模捂、POST請求示例
import json
import tornado.httpserver
import tornado.ioloop
import tornado.options
import tornado.web
from tornado.options import define, options
import pandas as pd
from sqlalchemy import create_engine
from sqlalchemy.pool import QueuePool
import datetime
#定義端口為8080
define("port", default=8001, help="run on the given port", type=int)
# 數(shù)據(jù)庫連接信息
engine = create_engine('mysql+pymysql://*****:3306/pointgrab_info',pool_size=50,max_overflow=10,pool_timeout =30,poolclass=QueuePool,pool_recycle=-1)
def data_select_sql(areaId):
sql = " SELECT * FROM pointgrab_count_event_result_copytest WHERE capture_area_id ='{0}' ORDER BY id DESC LIMIT 1;".format(areaId)
df = pd.read_sql_query(sql, engine)
return df
def data_into_sql(df):
df.to_sql('pointgrab_count_event_result_copytest', engine, index=False, if_exists='append')
return True
# GET請求
class IndexHandler(tornado.web.RequestHandler):
# get函數(shù)
def get(self):
self.send_error(415, reason="Only post is supported")
# POST請求
# POST請求參數(shù): name, age, city
class CountingHandler(tornado.web.RequestHandler):
# post函數(shù)
async def post(self):
token = self.request.headers['X-Auth-Token']
ctype = self.request.headers['Content-Type']
if token == 'token' and ctype == 'application/json':
path = str(self.request.path) # 獲取請求的url
if path == '/api/counting/':
datas = self.request.body
rjson = json.loads(datas.decode())
# print(rjson)
lastdata = data_select_sql(rjson['areaId'])
if rjson['count'] not in lastdata['capture_counts'].values :
countingdf = counting(rjson)
data_into_sql(countingdf)
self.set_status(200)
self.set_header('Content-type', 'application/json')
self.write(json.dumps('Counting data is inserted').encode())
else:
self.set_status(200)
self.set_header('Content-type', 'application/json')
self.write(json.dumps('Counting data is duplication').encode())
# 主函數(shù)
def main():
tornado.options.parse_command_line()
# 定義app
app = tornado.web.Application(
handlers=[(r'/', IndexHandler), (r'/api/counting/', CountingHandler)], #網(wǎng)頁路徑控制
)
http_server = tornado.httpserver.HTTPServer(app)
http_server.listen(options.port)
tornado.ioloop.IOLoop.instance().start()
main()
代碼解釋
實現(xiàn)web.RequestHandler子類捶朵,重載其中的get()和post()
web.Application對象定義路由映射和監(jiān)聽端口
tornado.ioloop.IOLoop.instance().start()啟動IOLoop,該函數(shù)將一直運行且不退出狂男。
HTTPServerRequest常用對象屬性