typora-copy-images-to: ipic
[TOC]
配置環(huán)境
首先檢查你的python
版本:
$ python3 -V
Python 3.6.3
安裝aiohttp
:
$ pip3 install aiohttp
查看aiohttp版本號:
$ python3 -c 'import aiohttp; print(aiohttp.__version__)'
3.0.7
項目結(jié)構(gòu)與其他基于python的web項目非常相似:
.
├── README.rst
└── polls
├── Makefile
├── README.rst
├── aiohttpdemo_polls
│ ├── __init__.py
│ ├── __main__.py
│ ├── db.py
│ ├── main.py
│ ├── routes.py
│ ├── templates
│ ├── utils.py
│ └── views.py
├── config
│ └── polls.yaml
├── images
│ └── example.png
├── setup.py
├── sql
│ ├── create_tables.sql
│ ├── install.sh
│ └── sample_data.sql
└── static
└── style.css
開始第一個aiohttp應(yīng)用
這個教程基于Django的投票應(yīng)用教程。
應(yīng)用
所有的aiohttp服務(wù)器都圍繞aiohttp.web.Application實例來構(gòu)建沛硅。用于注冊startup/cleanup信號眼刃,以及連接路由等。
創(chuàng)建一個項目:
vote
├── config
│ └── __init__.py
├── models
│ └── __init__.py
├── static
├── template
└── application
└── __init__.py
目錄vote下面分別創(chuàng)建了config摇肌、models擂红、application、static围小、template篮条。
這里我使用pycharm開發(fā),圖示如下:
創(chuàng)建一個應(yīng)用:
from aiohttp import web
app = web.Application()
web.run_app(app, host='0.0.0.0', port=9000)
保存于vote/main.py并啟動服務(wù)器:
$ python3 /Users/junxi/program/vote/main.py
這里的vote是項目的根目錄吩抓。
你將在命令行中看到如下輸出:
======== Running on http://0.0.0.0:9000 ========
(Press CTRL+C to quit)
在瀏覽器中打開http://localhost:9000/
或者使用命令
$ curl -X GET http://localhost:9000
不過涉茧,對于全部請求現(xiàn)在只會返回404: Not Found
,讓我們創(chuàng)建一個路由和視圖來展示一些更有意義的東西疹娶。
視圖
讓我們從第一個視圖開始伴栓。創(chuàng)建application/views.py
并加入如下代碼:
from aiohttp import web
async def hello(request):
return web.Response(text='Hello Aiohttp!')
現(xiàn)在我們應(yīng)該為這個 index
視圖創(chuàng)建一個路由。 將如下代碼寫入 application/routes.py
(分離視圖雨饺,路由钳垮,模型是種很好的做法。 因為你可能擁有很多這些組件额港,放在不同的地方可以方便地管理代碼):
from .views import hello
def setup_routes(app):
app.router.add_get('/hello', hello)
此外饺窿,我們應(yīng)該在某個地方調(diào)用 setup_routes
函數(shù),最好是在 main.py
中調(diào)用它:
from aiohttp import web
from application.routes import setup_routes
app = web.Application()
setup_routes(app)
web.run_app(app, host='0.0.0.0', port=9000)
再次啟動服務(wù)器. 現(xiàn)在我們打開瀏覽器就可以看見:
$ curl -X GET localhost:9000/hello
Hello Aiohttp!
工作目錄應(yīng)該是像下面這樣:
vote
├── application
│ ├── __init__.py
│ ├── routes.py
│ └── views.py
├── config
│ ├── __init__.py
│ └── settings.py
├── main.py
├── models
│ ├── __init__.py
├── static
└── template
配置文件
aiohttp 的配置是不可知的移斩。 這意味著這個庫不需要任何配置方法肚医,并且也沒有內(nèi)置支持任何配置模式绢馍。
但是請考慮下面這些事實:
99% 的服務(wù)器都有配置文件.
每個產(chǎn)品(除了像 Django 和 Flask 等基于 Python 的解決方案外)都不將配置文件寫入源代碼。
比如 Nginx 默認將自己的配置文件存儲在
/etc/nginx
文件夾下肠套。Mongo 將配置文件存為
/etc/mongodb.conf
舰涌。驗證配置文件是個好主意,充分的檢查可以在產(chǎn)品部署時避免許多愚蠢的錯誤你稚。
因此瓷耙,我們 建議 使用以下方法:
- 將配置存為
yaml
文件(json
或ini
格式也不錯,但是yaml
格式是最好的).- 從預(yù)定位置加載
yaml
配置刁赖。例如./config/app_cfg.yaml
,/etc/app_cfg.yaml
搁痛。- 保持可以通過命令行參數(shù)覆蓋配置文件的能力。例如
./run_app --config=/opt/config/app_cfg.yaml
宇弛。- 對于加載的字典應(yīng)用嚴格的檢查鸡典。 trafaret, colander or JSON schema 是這類型工作的好候選。
加載配置并在應(yīng)用中讀取:
# load config from yaml file in current dir
conf = load_config(str(pathlib.Path('.') / 'config' / 'settings.yaml'))
app['config'] = conf
或者使用py文件當作配置文件:
├── config
│ ├── __init__.py
│ └── settings.py
構(gòu)建數(shù)據(jù)庫
數(shù)據(jù)庫模式
操作MySQL數(shù)據(jù)庫的工具涯肩,之前django項目一直使用本身自帶的orm,tornado項目使用的torndb.py巢钓。其他項目則使用的pymysql庫病苗,pymysql庫的用法在這里。
本文使用MySQL數(shù)據(jù)庫和aiomysql這個異步操作MySQL的庫症汹。
安裝aiomysql
需要依賴pymysql
$ pip3 install pymysql
$ pip3 install aiomysql
我們使用 aiomysql 來描述數(shù)據(jù)庫模式硫朦。
aiomysql官網(wǎng)連接示例
import asyncio
from aiomysql import create_pool
loop = asyncio.get_event_loop()
async def go():
async with create_pool(host='127.0.0.1', port=3306,
user='root', password='',
db='mysql', loop=loop) as pool:
async with pool.get() as conn:
async with conn.cursor() as cur:
await cur.execute("SELECT 42;")
value = await cur.fetchone()
print(value)
loop.run_until_complete(go())
aiomysql官網(wǎng)連接池示例
import asyncio
import aiomysql
async def test_example(loop):
pool = await aiomysql.create_pool(host='127.0.0.1', port=3306,
user='root', password='',
db='mysql', loop=loop)
async with pool.acquire() as conn:
async with conn.cursor() as cur:
await cur.execute("SELECT 42;")
print(cur.description)
(r,) = await cur.fetchone()
assert r == 42
pool.close()
await pool.wait_closed()
loop = asyncio.get_event_loop()
loop.run_until_complete(test_example(loop))
SQLAlchemy可選集成的示例
這里不使用sqlalchemy這個orm,原因:遷移功能不怎么好使背镇,用慣了django的orm咬展,感覺別的不咋好用。寫原生sql練習自己的原生sql編寫能力瞒斩。
import asyncio
import sqlalchemy as sa
from aiomysql.sa import create_engine
metadata = sa.MetaData()
tbl = sa.Table('tbl', metadata,
sa.Column('id', sa.Integer, primary_key=True),
sa.Column('val', sa.String(255)))
async def go(loop):
engine = await create_engine(user='root', db='test_pymysql',
host='127.0.0.1', password='', loop=loop)
async with engine.acquire() as conn:
await conn.execute(tbl.insert().values(val='abc'))
await conn.execute(tbl.insert().values(val='xyz'))
async for row in conn.execute(tbl.select()):
print(row.id, row.val)
engine.close()
await engine.wait_closed()
loop = asyncio.get_event_loop()
loop.run_until_complete(go(loop))
創(chuàng)建數(shù)據(jù)庫表
查看mysql
版本
$ mysql --version
/usr/local/mysql/bin/mysql Ver 14.14 Distrib 5.7.20, for macos10.12 (x86_64) using EditLine wrapper
創(chuàng)建一個數(shù)據(jù)庫vote破婆,并增加授權(quán)用戶
$ mysql -uroot -p123456
mysql> CREATE DATABASE IF NOT EXISTS vote CHARACTER SET utf8 COLLATE utf8_general_ci;
mysql> grant all on vote.* to vote identified by '123456';
創(chuàng)建表user
CREATE TABLE IF NOT EXISTS `user`(
`id` INT(11) NOT NULL AUTO_INCREMENT COMMENT '用戶ID',
`delete_flag` tinyint(1) NOT NULL DEFAULT 0 COMMENT '刪除標志',
`name` VARCHAR(40) CHARACTER SET utf8 COLLATE utf8_general_ci NOT NULL COMMENT '昵稱',
`phone` VARCHAR(11) CHARACTER SET utf8 COLLATE utf8_general_ci NOT NULL COMMENT '電話',
`email` VARCHAR(40) CHARACTER SET utf8 COLLATE utf8_general_ci NOT NULL COMMENT '郵箱',
`password` VARCHAR(16) CHARACTER SET utf8 COLLATE utf8_general_ci NOT NULL COMMENT '密碼',
`create_time` datetime NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT '創(chuàng)建時間',
PRIMARY KEY ( `id` ),
INDEX `email` (`email`) USING BTREE,
INDEX `phone` (`phone`) USING BTREE
)
ENGINE=InnoDB
DEFAULT CHARACTER SET=utf8 COLLATE=utf8_general_ci
ROW_FORMAT=DYNAMIC
;
查看user表結(jié)構(gòu)
+-------------+-------------+------+-----+-------------------+----------------+
| Field | Type | Null | Key | Default | Extra |
+-------------+-------------+------+-----+-------------------+----------------+
| id | int(11) | NO | PRI | NULL | auto_increment |
| delete_flag | tinyint(1) | NO | | 0 | |
| name | varchar(40) | NO | | NULL | |
| phone | varchar(11) | NO | MUL | NULL | |
| email | varchar(40) | NO | MUL | NULL | |
| password | varchar(16) | NO | | NULL | |
| create_time | datetime | NO | | CURRENT_TIMESTAMP | |
+-------------+-------------+------+-----+-------------------+----------------+
創(chuàng)建表question
CREATE TABLE IF NOT EXISTS `question`(
`id` INT(11) NOT NULL AUTO_INCREMENT COMMENT '問題ID',
`delete_flag` tinyint(1) NOT NULL DEFAULT 0 COMMENT '刪除標志',
`user_id` INT(11) NOT NULL COMMENT '用戶ID',
`question_text` VARCHAR(200) CHARACTER SET utf8 COLLATE utf8_general_ci NOT NULL COMMENT '問題內(nèi)容',
`create_time` datetime NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT '創(chuàng)建時間',
PRIMARY KEY ( `id` ),
FOREIGN KEY (`user_id`) REFERENCES `user` (`id`) ON DELETE RESTRICT ON UPDATE RESTRICT,
INDEX `user_id` (`user_id`) USING BTREE
)
ENGINE=InnoDB
DEFAULT CHARACTER SET=utf8 COLLATE=utf8_general_ci
ROW_FORMAT=DYNAMIC
;
查看question表結(jié)構(gòu)
+---------------+--------------+------+-----+-------------------+----------------+
| Field | Type | Null | Key | Default | Extra |
+---------------+--------------+------+-----+-------------------+----------------+
| id | int(11) | NO | PRI | NULL | auto_increment |
| delete_flag | tinyint(1) | NO | | 0 | |
| user_id | int(11) | NO | MUL | NULL | |
| question_text | varchar(200) | NO | | NULL | |
| create_time | datetime | NO | | CURRENT_TIMESTAMP | |
+---------------+--------------+------+-----+-------------------+----------------+
創(chuàng)建表choice
CREATE TABLE IF NOT EXISTS `choice`(
`id` INT(11) NOT NULL AUTO_INCREMENT COMMENT '選擇ID',
`delete_flag` tinyint(1) NOT NULL DEFAULT 0 COMMENT '刪除標志',
`choice_text` VARCHAR(200) CHARACTER SET utf8 COLLATE utf8_general_ci NULL DEFAULT NULL COMMENT '選擇內(nèi)容',
`votes` INT(11) NOT NULL COMMENT '得票數(shù)',
`question_id` INT(11) NOT NULL COMMENT '問題ID',
`create_time` datetime NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT '創(chuàng)建時間',
PRIMARY KEY ( `id` ),
FOREIGN KEY (`question_id`) REFERENCES `question` (`id`) ON DELETE RESTRICT ON UPDATE RESTRICT,
INDEX `question_id` (`question_id`) USING BTREE
)
ENGINE=InnoDB
DEFAULT CHARACTER SET=utf8 COLLATE=utf8_general_ci
ROW_FORMAT=DYNAMIC
;
查看choice表結(jié)構(gòu)
+-------------+--------------+------+-----+-------------------+----------------+
| Field | Type | Null | Key | Default | Extra |
+-------------+--------------+------+-----+-------------------+----------------+
| id | int(11) | NO | PRI | NULL | auto_increment |
| delete_flag | tinyint(1) | NO | | 0 | |
| choice_text | varchar(200) | YES | | NULL | |
| votes | int(11) | NO | | NULL | |
| question_id | int(11) | NO | MUL | NULL | |
| create_time | datetime | NO | | CURRENT_TIMESTAMP | |
+-------------+--------------+------+-----+-------------------+----------------+
創(chuàng)建連接池
我們需要創(chuàng)建一個全局的連接池,每個HTTP請求都可以從連接池中直接獲取數(shù)據(jù)庫連接胸囱。使用連接池的好處是不必頻繁地打開和關(guān)閉數(shù)據(jù)庫連接祷舀,而是能復(fù)用就盡量復(fù)用。
缺省情況下將編碼設(shè)置為utf8
烹笔,自動提交事務(wù):
async def create_pool(loop, **kw):
"""定義mysql全局連接池"""
logging.info('create database connection pool...')
global _mysql_pool
_mysql_pool = await aiomysql.create_pool(host=DATABASES['host'], port=DATABASES['port'], user=DATABASES['user'],
password=DATABASES['password'], db=DATABASES['db'], loop=loop,
charset=kw.get('charset', 'utf8'), autocommit=kw.get('autocommit', True),
maxsize=kw.get('maxsize', 10), minsize=kw.get('minsize', 1))
return _mysql_pool
封裝增刪改查
Web App里面有很多地方都要訪問數(shù)據(jù)庫裳扯。訪問數(shù)據(jù)庫需要創(chuàng)建數(shù)據(jù)庫連接、游標對象谤职,然后執(zhí)行SQL語句饰豺,最后處理異常,清理資源允蜈。這些訪問數(shù)據(jù)庫的代碼如果分散到各個函數(shù)中冤吨,勢必無法維護蒿柳,也不利于代碼復(fù)用。
所以锅很,我們要首先把常用的SELECT其馏、INSERT、UPDATE和DELETE操作用函數(shù)封裝起來爆安。
由于Web框架使用了基于asyncio的aiohttp叛复,這是基于協(xié)程的異步模型。在協(xié)程中扔仓,不能調(diào)用普通的同步IO操作褐奥,因為所有用戶都是由一個線程服務(wù)的,協(xié)程的執(zhí)行速度必須非城檀兀快撬码,才能處理大量用戶的請求。而耗時的IO操作不能在協(xié)程中以同步的方式調(diào)用版保,否則呜笑,等待一個IO操作時,系統(tǒng)無法響應(yīng)任何其他用戶彻犁。
這就是異步編程的一個原則:一旦決定使用異步叫胁,則系統(tǒng)每一層都必須是異步,“開弓沒有回頭箭”汞幢。
幸運的是aiomysql
為MySQL數(shù)據(jù)庫提供了異步IO的驅(qū)動驼鹅。
要執(zhí)行SELECT語句,我們用select
函數(shù)執(zhí)行森篷,需要傳入SQL語句和SQL參數(shù):
async def fetchone(sql, args=(), size=None):
"""封裝select输钩,查詢單個,返回數(shù)據(jù)為字典"""
log(sql, args)
async with _mysql_pool.acquire() as conn:
async with conn.cursor(aiomysql.DictCursor) as cur:
await cur.execute(sql, args)
rs = await cur.fetchone()
return rs
async def select(sql, args=(), size=None):
"""封裝select仲智,查詢多個买乃,返回數(shù)據(jù)為列表"""
log(sql, args)
async with _mysql_pool.acquire() as conn:
async with conn.cursor(aiomysql.DictCursor) as cur:
await cur.execute(sql, args)
if size:
rs = await cur.fetchmany(size)
else:
rs = await cur.fetchall()
logging.info('rows returned: %s' % len(rs))
return rs
注意要始終堅持使用帶參數(shù)的SQL,而不是自己拼接SQL字符串钓辆,這樣可以防止SQL注入攻擊为牍。
注意到yield from
將調(diào)用一個子協(xié)程(也就是在一個協(xié)程中調(diào)用另一個協(xié)程)并直接獲得子協(xié)程的返回結(jié)果。
如果傳入size
參數(shù)岩馍,就通過fetchmany()
獲取最多指定數(shù)量的記錄碉咆,否則,通過fetchall()
獲取所有記錄蛀恩。
Insert, Update, Delete
要執(zhí)行INSERT疫铜、UPDATE、DELETE語句双谆,可以定義一個通用的execute()
函數(shù)壳咕,因為這3種SQL的執(zhí)行都需要相同的參數(shù)席揽,以及返回一個整數(shù)表示影響的行數(shù):
async def execute(sql, args=()):
"""封裝insert, delete, update"""
log(sql, args)
async with _mysql_pool.acquire() as conn:
async with conn.cursor() as cur:
try:
await cur.execute(sql, args)
except BaseException:
await conn.rollback()
return
else:
affected = cur.rowcount
return affected
execute()
函數(shù)和select()
函數(shù)所不同的是,cursor對象不返回結(jié)果集谓厘,而是通過rowcount
返回結(jié)果數(shù)幌羞。
這三個函數(shù)定義在models文件夾下的db.py中(db.py是新創(chuàng)建的文件):
完整代碼如下:
import logging
logging.basicConfig(level=logging.INFO)
import aiomysql
import aioredis
from config.settings import DATABASES, CACHES
def log(sql, args=()):
logging.info('SQL: %s' % sql, *args)
async def create_pool(loop, **kw):
"""定義mysql全局連接池"""
logging.info('create database connection pool...')
global _mysql_pool
_mysql_pool = await aiomysql.create_pool(host=DATABASES['host'], port=DATABASES['port'], user=DATABASES['user'],
password=DATABASES['password'], db=DATABASES['db'], loop=loop,
charset=kw.get('charset', 'utf8'), autocommit=kw.get('autocommit', True),
maxsize=kw.get('maxsize', 10), minsize=kw.get('minsize', 1))
return _mysql_pool
async def fetchone(sql, args=(), size=None):
"""封裝select,查詢單個竟稳,返回數(shù)據(jù)為字典"""
log(sql, args)
async with _mysql_pool.acquire() as conn:
async with conn.cursor(aiomysql.DictCursor) as cur:
await cur.execute(sql, args)
rs = await cur.fetchone()
return rs
async def select(sql, args=(), size=None):
"""封裝select属桦,查詢多個,返回數(shù)據(jù)為列表"""
log(sql, args)
async with _mysql_pool.acquire() as conn:
async with conn.cursor(aiomysql.DictCursor) as cur:
await cur.execute(sql, args)
if size:
rs = await cur.fetchmany(size)
else:
rs = await cur.fetchall()
logging.info('rows returned: %s' % len(rs))
return rs
async def execute(sql, args=()):
"""封裝insert, delete, update"""
log(sql, args)
async with _mysql_pool.acquire() as conn:
async with conn.cursor() as cur:
try:
await cur.execute(sql, args)
except BaseException:
await conn.rollback()
return
else:
affected = cur.rowcount
return affected
把執(zhí)行SQL的函數(shù)導(dǎo)入到models/init.py文件中他爸,方便別的模塊引用:
from .db import *
__all__ = ['create_pool', 'select', 'execute', 'fetchone']
把我們創(chuàng)建表的sql語句保存到models/create_table.sql文件中:
CREATE TABLE IF NOT EXISTS `user`(
`id` INT(11) NOT NULL AUTO_INCREMENT COMMENT '用戶ID',
`delete_flag` tinyint(1) NOT NULL DEFAULT 0 COMMENT '刪除標志',
`name` VARCHAR(40) CHARACTER SET utf8 COLLATE utf8_general_ci NOT NULL COMMENT '昵稱',
`phone` VARCHAR(11) CHARACTER SET utf8 COLLATE utf8_general_ci NOT NULL COMMENT '電話',
`email` VARCHAR(40) CHARACTER SET utf8 COLLATE utf8_general_ci NOT NULL COMMENT '郵箱',
`password` VARCHAR(16) CHARACTER SET utf8 COLLATE utf8_general_ci NOT NULL COMMENT '密碼',
`create_time` datetime NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT '創(chuàng)建時間',
PRIMARY KEY ( `id` ),
INDEX `email` (`email`) USING BTREE,
INDEX `phone` (`phone`) USING BTREE
)
ENGINE=InnoDB
DEFAULT CHARACTER SET=utf8 COLLATE=utf8_general_ci
ROW_FORMAT=DYNAMIC
;
CREATE TABLE IF NOT EXISTS `question`(
`id` INT(11) NOT NULL AUTO_INCREMENT COMMENT '問題ID',
`delete_flag` tinyint(1) NOT NULL DEFAULT 0 COMMENT '刪除標志',
`user_id` INT(11) NOT NULL COMMENT '用戶ID',
`question_text` VARCHAR(200) CHARACTER SET utf8 COLLATE utf8_general_ci NOT NULL COMMENT '問題內(nèi)容',
`create_time` datetime NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT '創(chuàng)建時間',
PRIMARY KEY ( `id` ),
FOREIGN KEY (`user_id`) REFERENCES `user` (`id`) ON DELETE RESTRICT ON UPDATE RESTRICT,
INDEX `user_id` (`user_id`) USING BTREE
)
ENGINE=InnoDB
DEFAULT CHARACTER SET=utf8 COLLATE=utf8_general_ci
ROW_FORMAT=DYNAMIC
;
CREATE TABLE IF NOT EXISTS `choice`(
`id` INT(11) NOT NULL AUTO_INCREMENT COMMENT '選擇ID',
`delete_flag` tinyint(1) NOT NULL DEFAULT 0 COMMENT '刪除標志',
`choice_text` VARCHAR(200) CHARACTER SET utf8 COLLATE utf8_general_ci NULL DEFAULT NULL COMMENT '選擇內(nèi)容',
`votes` INT(11) NOT NULL COMMENT '得票數(shù)',
`question_id` INT(11) NOT NULL COMMENT '問題ID',
`create_time` datetime NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT '創(chuàng)建時間',
PRIMARY KEY ( `id` ),
FOREIGN KEY (`question_id`) REFERENCES `question` (`id`) ON DELETE RESTRICT ON UPDATE RESTRICT,
INDEX `question_id` (`question_id`) USING BTREE
)
ENGINE=InnoDB
DEFAULT CHARACTER SET=utf8 COLLATE=utf8_general_ci
ROW_FORMAT=DYNAMIC
;
models目錄結(jié)構(gòu):
models/
├── __init__.py
└── db.py
編寫配置文件
之前我們說過的配置文件聂宾,我使用py文件當作配置文件,conf/settings.py內(nèi)容如下:
DATABASES = {
'engine': 'mysql',
'db': 'vote',
'user': 'vote',
'password': '123456',
'host': 'localhost',
'port': 3306,
}
插入模擬數(shù)據(jù)
INSERT INTO user(name, phone, email, password) VALUES('露西', '16666666661', 'luxi@qq.com', '123456'), ('南希', '16666666662', 'nanxi@qq.com', '123456'), ('雪靈', '16666666663', 'xueling@qq.com', '123456');
INSERT INTO question(question_text, user_id) VALUES('最受歡迎的計算機語言诊笤?', 1), ('最受歡迎的水果系谐?', 2), ('男人最喜歡女人什么地方?', 3);
INSERT INTO choice(choice_text, question_id, votes) VALUES('python', 1, 3), ('java', 1, 2), ('go', 1, 1);
INSERT INTO choice(choice_text, question_id, votes) VALUES('香蕉', 2, 3), ('蘋果', 2, 2), ('草莓', 2, 1);
INSERT INTO choice(choice_text, question_id, votes) VALUES('漂亮臉蛋', 3, 3), ('大胸', 3, 2), ('大長腿', 3, 1);
基礎(chǔ)視圖類
aiohttp.web
提供django風格的基礎(chǔ)試圖類讨跟。
你可以從 View
類中繼承故黑,并自定義http請求的處理方法:
from aiohttp import web
from models import select
import json
import datetime
import decimal
class RewriteJsonEncoder(json.JSONEncoder):
"""重寫json類篮绰,為了解決datetime類型的數(shù)據(jù)無法被json格式化"""
def default(self, obj):
if isinstance(obj, datetime.datetime):
return obj.strftime('%Y-%m-%d %H:%M:%S')
elif isinstance(obj, datetime.date):
return obj.strftime("%Y-%m-%d")
elif isinstance(obj, decimal.Decimal):
return str(obj)
elif hasattr(obj, 'isoformat'):
# 處理日期類型
return obj.isoformat()
else:
return json.JSONEncoder.default(self, obj)
def json_dumps(obj):
return json.dumps(obj, cls=RewriteJsonEncoder)
async def hello(request):
return web.Response(text='Hello Aiohttp!')
class QuestionChoices(web.View):
"""查看一個問題的可選答案"""
async def get(self):
question_id = self.request.match_info.get('question_id')
result = await select(self.request.app['db'], 'select * from choice where question_id = %s', (question_id,))
return web.json_response(data=result, dumps=json_dumps)
定義路由:
from .views import hello, QuestionChoices
def setup_routes(app):
app.router.add_get('/hello', hello, name='hello')
app.router.add_route('*', '/question/{question_id}/choice', QuestionChoices)
打開瀏覽器或輸入下面命令訪問:
$ curl -X GET http://127.0.0.1:9000/question/1/choice
[{"id": 1, "delete_flag": 0, "choice_text": "python", "votes": 3, "question_id": 1, "create_time": "2018-04-15 19:47:16"}, {"id": 2, "delete_flag": 0, "choice_text": "java", "votes": 2, "question_id": 1, "create_time": "2018-04-15 19:47:16"}, {"id": 3, "delete_flag": 0, "choice_text": "go", "votes": 1, "question_id": 1, "create_time": "2018-04-15 19:47:16"}]j
之前使用django比較多祝懂,個人喜歡使用類視圖暑椰。
裝飾器視圖
路由裝飾器有點像Flask風格:
routes = web.RouteTableDef()
@routes.get('/get')
async def handle_get(request):
...
@routes.post('/post')
async def handle_post(request):
...
app.router.add_routes(routes)
首先是要創(chuàng)建一個 aiohttp.web.RouteTableDef
對象吏颖。
該對象是一個類列表對象男娄,額外提供aiohttp.web.RouteTableDef.get()
震蒋,aiohttp.web.RouteTableDef.post()
這些裝飾器來注冊路由瞬场。
最后調(diào)用add_routes()
添加到應(yīng)用的路由里乾巧。
靜態(tài)文件
處理靜態(tài)文件( 圖片句喜,JavaScripts, CSS文件等)最好的方法是使用反向代理,像是nginx或CDN服務(wù)沟于。
但就開發(fā)來說咳胃,aiohttp
服務(wù)器本身可以很方便的處理靜態(tài)文件。
只需要通過 UrlDispatcher.add_static()
注冊個新的靜態(tài)路由即可:
app.router.add_static('/static', path_to_static_folder)
當訪問靜態(tài)文件的目錄時旷太,默認服務(wù)器會返回 HTTP/403 Forbidden(禁止訪問)展懈。 使用show_index
并將其設(shè)置為True
可以顯示出索引:
app.router.add_static('/static', path_to_static_folder, show_index=True)
當從靜態(tài)文件目錄訪問一個符號鏈接(軟鏈接)時,默認服務(wù)器會響應(yīng) HTTP/404 Not Found(未找到)供璧。使用follow_symlinks
并將其設(shè)置為True
可以讓服務(wù)器使用符號鏈接:
app.router.add_static('/static', path_to_static_folder, follow_symlinks=True)
如果你想允許緩存清除存崖,使用append_version
并設(shè)為True
。
緩存清除會對資源文件像JavaScript 和 CSS文件等的文件名上添加一個hash后的版本睡毒。這樣的好處是我們可以讓瀏覽器無限期緩存這些文件而不用擔心這些文件是否發(fā)布了新版本来惧。
app.router.add_static('/static', path_to_static_folder, append_version=True)
這里我們添加一個靜態(tài)文件的路由
首先在配置文件conf/settings.py中指定項目、靜態(tài)文件演顾、模版HTML路徑:
BASE_DIR = os.path.dirname(os.path.dirname(os.path.abspath(__file__))) # 項目路徑
STATIC_DIR = os.path.join(BASE_DIR, 'static') # 靜態(tài)文件路徑
TEMPLATE_DIR = os.path.join(BASE_DIR, 'template') # 模版HTML路徑
接下里在application/routes.py文件中添加一個靜態(tài)文件路由:
def setup_static_routes(app):
app.router.add_static('/static/', path=STATIC_DIR, name='static')
下載uikit的靜態(tài)文件到static目錄下:
static
├── css
│ ├── uikit-rtl.css
│ ├── uikit-rtl.min.css
│ ├── uikit.css
│ └── uikit.min.css
└── js
├── uikit-icons.js
├── uikit-icons.min.js
├── uikit.js
└── uikit.min.js
把添加靜態(tài)路由的函數(shù)添加到application/main.py文件的init函數(shù)中:
async def init(loop):
mysql_pool = await create_pool(loop)
app = web.Application(loop=loop)
app['db'] = mysql_pool
setup_routes(app)
setup_static_routes(app)
return app
重啟服務(wù)器訪問http://127.0.0.1:9000/static/js/bootstrap.js
$ curl -X GET http://127.0.0.1:9000/static/js/bootstrap.js
/*!
* Bootstrap v4.0.0 (https://getbootstrap.com)
* Copyright 2011-2018 The Bootstrap Authors (https://github.com/twbs/bootstrap/graphs/contributors)
* Licensed under MIT (https://github.com/twbs/bootstrap/blob/master/LICENSE)
*/
供搀。隅居。。葛虐。胎源。
。屿脐。涕蚤。。摄悯。
可以正常訪問赞季,靜態(tài)路由已經(jīng)添加成功了。
模版
aiohttp.web
并不直接提供模板讀取奢驯,不過可以使用第三方庫 aiohttp_jinja2
申钩,該庫是由aiohttp
作者維護的。
使用起來也很簡單瘪阁。首先我們用aiohttp_jinja2.setup()
來設(shè)置下jinja2
環(huán)境撒遣。
安裝aiohttp_jinja2:
$ pip3 install aiohttp_jinja2
在application/routes.py文件中添加一個模版文件路由:
from config.settings import STATIC_DIR, TEMPLATE_DIR
def setup_template_routes(app):
aiohttp_jinja2.setup(app, loader=jinja2.FileSystemLoader(TEMPLATE_DIR))
把添加模版路由的函數(shù)添加到vote/main.py文件的init函數(shù)中:
from application.routes import setup_routes, setup_static_routes, setup_template_routes
async def init(loop):
mysql_pool = await create_pool(loop)
app = web.Application(loop=loop)
app['db'] = mysql_pool
setup_routes(app)
setup_static_routes(app)
setup_template_routes(app)
return app
增加pycharm普通項目對jinja2模版的支持,編輯.idea/vote.iml管跺,在component標簽的同級添加如下內(nèi)容:
<component name="TemplatesService">
<option name="TEMPLATE_CONFIGURATION" value="Jinja2" />
<option name="TEMPLATE_FOLDERS">
<list>
<option value="$MODULE_DIR$/template" />
</list>
</option>
</component>
新建一個模版HTML文件保存到template/index.html中义黎,內(nèi)容如下:
<!DOCTYPE html>
<html lang="zh-CN">
<head>
<meta charset="utf-8">
<meta name="viewport" content="width=device-width, initial-scale=1, shrink-to-fit=no">
{% block title %}
<title>首頁</title>
{% endblock %}
<link rel="stylesheet" href="/static/css/uikit.min.css">
<link rel="stylesheet" href="/static/css/base.css">
<script src="/static/js/uikit.min.js"></script>
</head>
<body>
<nav class="uk-navbar-container uk-margin" uk-navbar>
<div class="uk-navbar-left">
<a class="uk-navbar-item uk-logo" href="#">Primumest</a>
<ul class="uk-navbar-nav">
<li class="uk-active"><a href="#">首頁</a></li>
<li>
<a href="#">編程語言</a>
<div class="uk-navbar-dropdown">
<ul class="uk-nav uk-navbar-dropdown-nav">
<li class="uk-active"><a href="#">python</a></li>
<li><a href="#">go</a></li>
<li><a href="#">c</a></li>
</ul>
</div>
</li>
<li><a href="#">問答</a></li>
</ul>
</div>
<div class="uk-navbar-right content">
<div class="uk-navbar-item">
<form class="uk-search uk-search-default">
<a href="" class="uk-search-icon-flip" uk-search-icon></a>
<input class="uk-input uk-form-width-smal" type="search" placeholder="Search">
</form>
</div>
<ul class="uk-navbar-nav">
<li class="uk-active"><a href="#">登錄</a></li>
<li><a href="{{ url('register') }}">注冊</a></li>
</ul>
</div>
</nav>
{% block content %}
{% endblock %}
</body>
</html>
新建注冊頁面保存到template/register.html中,內(nèi)容如下:
{% extends "index.html" %}
{% block title %}
<title>注冊</title>
{% endblock %}
{% block content %}
<div class="uk-container content">
<form class="uk-form register-box">
<fieldset style="width: 30%">
<legend>注冊賬號</legend>
<div class="uk-form-row">
<span class="uk-form-label">昵稱 </span>
<input type="text" name="name" placeholder="請輸入你的名字" class="uk-form-width-medium uk-form-small">
</div>
<div class="uk-form-row">
<span class="uk-form-label">郵箱 </span>
<input type="text" name="email" placeholder="請輸入你的郵箱" class="uk-form-width-medium uk-form-small">
</div>
<div class="uk-form-row">
<span class="uk-form-label">手機 </span>
<input type="text" name="phone" placeholder="請輸入你的手機號" class="uk-form-width-medium uk-form-small">
</div>
<div class="uk-form-row">
<span class="uk-form-label">密碼 </span>
<input type="password" name="password" placeholder="請輸入你的密碼" class="uk-form-width-medium uk-form-small">
</div>
<button type="submit" class="uk-button-primary">提交</button>
</fieldset>
</form>
</div>
{% endblock %}
頁面用到了jinja2模版的語法豁跑。
創(chuàng)建視圖函數(shù)用來訪問這個模版文件:
@aiohttp_jinja2.template('index.html')
async def index(request):
return
@aiohttp_jinja2.template('register.html')
async def register(request):
return
創(chuàng)建與之對應(yīng)的路由:
def setup_routes(app):
app.router.add_get('/hello', hello, name='hello')
app.router.add_get('/', index, name='index')
app.router.add_get('/register', register, name='register')
app.router.add_route('*', '/question/{question_id}/choice', QuestionChoices, name='QuestionChoices')
重啟服務(wù)器廉涕,瀏覽器訪問http://127.0.0.1:9000
瀏覽器訪問http://127.0.0.1:9000/register
調(diào)試工具箱
開發(fā)aiohttp.web
應(yīng)用項目時,aiohttp_debugtoolbar
是非常好用的一個調(diào)試工具艇拍。
可使用pip進行安裝:
$ pip3 install aiohttp_debugtoolbar
之后將aiohttp_debugtoolba
r中間件添加到aiohttp.web.Applicaiton
中并調(diào)用aiohttp_debugtoolbar.setup()
來部署:
import aiohttp_debugtoolbar
from aiohttp_debugtoolbar import toolbar_middleware_factory
app = web.Application(middlewares=[toolbar_middleware_factory])
aiohttp_debugtoolbar.setup(app)
這里是我們的配置:
import asyncio
import aiohttp_debugtoolbar
from aiohttp import web
from application.routes import setup_routes, setup_static_routes, setup_template_routes
from models import create_pool
from aiohttp_debugtoolbar import toolbar_middleware_factory
async def init(loop):
mysql_pool = await create_pool(loop)
app = web.Application(loop=loop, middlewares=[toolbar_middleware_factory])
app['db'] = mysql_pool
aiohttp_debugtoolbar.setup(app)
setup_routes(app)
setup_static_routes(app)
setup_template_routes(app)
return app
瀏覽器輸入地址http://127.0.0.1:9000/_debugtoolbar
可以看到如下頁面:
開發(fā)工具
aiohttp-devtools
提供幾個簡化開發(fā)的小工具狐蜕。
可以使用pip安裝:
$ pip3 install aiohttp-devtools
* ``runserver`` 提供自動重載,實時重載卸夕,靜態(tài)文件服務(wù)和aiohttp_debugtoolbar_integration层释。
* ``start`` 是一個幫助做繁雜且必須的創(chuàng)建'aiohttp.web'應(yīng)用的命令。
這是我們的項目啟動的例子:
$ adev runserver -v main.py --app-factory init -p 9000 --debug-toolbar --host localhost
這個adev著實難用快集,我們定義的init函數(shù)是個協(xié)程函數(shù)贡羔,但是它命令--app-factory要求必須是個普通函數(shù),并且返回一個aiohttp.web.Application个初。由于我們要使用數(shù)據(jù)庫連接池乖寒,必須使用await協(xié)程語法。所以我放棄使用這個東西了院溺。
創(chuàng)建和運行本地應(yīng)用的文檔和指南請看aiohttp-devtools
楣嘁。
下面準備編寫注冊、登錄的邏輯了,這里先使用session會話機制马澈。以后使用oauth2.0的token認證機制瓢省。
處理session會話
你經(jīng)常想要一個可以通過請求存儲用戶數(shù)據(jù)的倉庫。一般簡稱為會話痊班。
aiohttp.web
沒有內(nèi)置會話勤婚,不過你可以使用第三方庫aiohttp_session
來提供會話支持。
官網(wǎng)例子:
import asyncio
import aioredis
import time
from aiohttp import web
from aiohttp_session import setup, get_session
from aiohttp_session.redis_storage import RedisStorage
async def handler(request):
session = await get_session(request)
last_visit = session['last_visit'] if 'last_visit' in session else None
session['last_visit'] = time.time()
text = 'Last visited: {}'.format(last_visit)
return web.Response(text=text)
async def make_redis_pool():
redis_address = ('127.0.0.1', '6379')
return await aioredis.create_redis_pool(redis_address, timeout=1)
def make_app():
loop = asyncio.get_event_loop()
redis_pool = loop.run_until_complete(make_redis_pool())
storage = RedisStorage(redis_pool)
async def dispose_redis_pool(app):
redis_pool.close()
await redis_pool.wait_closed()
app = web.Application()
setup(app, storage)
app.on_cleanup.append(dispose_redis_pool)
app.router.add_get('/', handler)
return app
web.run_app(make_app())
安裝aiohttp_session
:
$ pip3 install aiohttp_session
session存儲使用redis涤伐,這里使用aioredis連接redis馒胆。
安裝aioredis:
$ pip3 install aioredis
創(chuàng)建redis全局連接池與redis命令簡單封裝,編輯models/db.py:
import aioredis
from config.settings import DATABASES, CACHES
async def create_redis_pool(loop):
"""定義redis全局連接池"""
logging.info('create redis connection pool...')
global _reids_pool
_reids_pool = await aioredis.create_pool(address=CACHES['address'], db=CACHES['db'], password=CACHES['password'],
minsize=CACHES['minsize'], maxsize=CACHES['maxsize'], loop=loop)
return _reids_pool
async def cache_set(*args, **kwargs):
"""redis set 命令封裝"""
with await aioredis.commands.Redis(_reids_pool) as redis:
await redis.set(*args, **kwargs)
async def cache_get(*args, **kwargs):
"""redis get 命令封裝"""
with await aioredis.commands.Redis(_reids_pool) as redis:
return await redis.get(*args, **kwargs)
async def cache_del(*args, **kwargs):
"""redis del 命令封裝"""
with await aioredis.commands.Redis(_reids_pool) as redis:
return await redis.delete(*args, **kwargs)
CACHES
在我們config/settings.py里面定義:
CACHES = {
'engine': 'redis',
'address': ('localhost', 6379),
'password': None,
'db': None,
'minsize': 1,
'maxsize': 10
}
把執(zhí)行redis命令的函數(shù)導(dǎo)入到models/init.py文件中凝果,方便別的模塊引用:
from .db import *
__all__ = ['create_pool', 'select', 'execute', 'fetchone', 'create_redis_pool', 'cache_set', 'cache_get', 'cache_del']
注冊頁面:
{% extends "index.html" %}
{% block title %}
<title>注冊</title>
{% endblock %}
{% block head_js %}
{% endblock %}
{% block content %}
<div class="uk-container content">
<form class="uk-form register-box" method="post" action="{{ url('Register') }}">
<fieldset style="width: 25%; padding: 1rem 0 1rem 5rem">
<legend style="text-align: center">注冊賬號</legend>
<div class="uk-form-row">
<span class="uk-form-label">昵稱 </span>
<input type="text" name="name" placeholder="請輸入你的名字" class="uk-width-1-2 uk-form-small">
</div>
<div class="uk-form-row">
<span class="uk-form-label">郵箱 </span>
<input type="text" name="email" placeholder="請輸入你的郵箱" class="uk-width-1-2 uk-form-small">
</div>
<div class="uk-form-row">
<span class="uk-form-label">手機 </span>
<input type="text" name="phone" placeholder="請輸入你的手機號" class="uk-width-1-2 uk-form-small">
</div>
<div class="uk-form-row">
<span class="uk-form-label">密碼 </span>
<input type="password" name="password" placeholder="請輸入你的密碼" class="uk-width-1-2 uk-form-small">
</div>
<button type="submit" class="uk-button-primary">提交</button>
</fieldset>
</form>
</div>
{% endblock %}
注冊視圖函數(shù):
class Register(web.View):
"""a view handler for register page"""
@aiohttp_jinja2.template('register.html')
async def get(self):
return
async def post(self):
data = await self.request.post()
user = await fetchone('select id from user where email = %s or phone = %s', (data.get('email'), data.get('phone')))
# print(await self.request.multipart())
if user:
msg = {'error_code': 20001, 'error_msg': 'The email or phone has been registered'}
else:
params = (data.get('name'), data.get('email'), data.get('phone'), data.get('password'))
result = await fetchone('INSERT INTO user(name, email, phone, password) VALUES(%s, %s, %s, %s)', params)
if result:
msg = {'error_code': 0, 'error_msg': 'ok'}
else:
msg = {'error_code': 20002, 'error_msg': 'Please try again if registration fails'}
# return web.json_response(data=msg, dumps=json_dumps)
return web.json_response(data=msg, dumps=json_dumps)
登錄頁面:
{% extends "index.html" %}
{% block title %}
<title>登錄</title>
{% endblock %}
{% block head_js %}
{% endblock %}
{% block content %}
<div class="uk-container content">
<form class="uk-form register-box uk-text-center" method="post" action="{{ url('Login') }}" style="margin-top: 2rem;">
<div class="uk-form-row">
<input type="text" name="account" placeholder="請輸入郵箱或手機號" class="uk-width-1-5 uk-form-small">
</div>
<div class="uk-form-row">
<input type="password" name="password" placeholder="請輸入你的密碼" class="uk-width-1-5 uk-form-small">
</div>
<button type="submit" class="uk-width-1-5 uk-button-primary uk-button-small">提交</button>
{% if msg %}
<p class="uk-text-danger">{{ msg.error_msg }}</p>
{% endif %}
</form>
</div>
{% endblock %}
{% block bottom_js %}
{% endblock %}
登錄視圖函數(shù):
class Login(web.View):
"""a view handler for login page"""
async def get(self):
return aiohttp_jinja2.render_template('login.html', self.request, locals())
async def post(self):
data = await self.request.post()
account = data.get('account')
password = data.get('password')
columns = 'id, name, email, phone, password'
if len(account) == 11 and re.match(r'^1[35678]\d{9}', account):
user = await fetchone('select {} from user where phone = %s'.format(columns), (account,))
elif re.match(r'^[\w-]+(\.[\w-]+)*@[\w-]+(\.[\w-]+)+$', account):
user = await fetchone('select {} from user where email = %s'.format(columns), (account,))
else:
msg = {'error_code': 20003, 'error_msg': 'User does not exists'}
return aiohttp_jinja2.render_template('login.html', self.request, locals())
if password != user.get('password'):
msg = {'error_code': 20004, 'error_msg': 'Password mismatch'}
return aiohttp_jinja2.render_template('login.html', self.request, locals())
session = await get_session(self.request)
session['uid'] = user.get('id')
# sessionid = session.identity
return web.Response(status=302, headers={'location': '/'})
給首頁視圖函數(shù)增加個驗證登錄到裝飾器:
from aiohttp_session import get_session
from functools import wraps
def login_required(func): # 用戶登錄狀態(tài)校驗
"""This function applies only to class views."""
@wraps(func)
async def inner(cls, *args, **kwargs):
session = await get_session(cls.request)
uid = session.get("uid")
if uid:
user = await fetchone('select id, name, email, phone from user where id = %s', (uid,))
cls.request.app.userdata = user
return await func(cls, *args, **kwargs)
else:
return web.Response(status=302, headers={'location': '/login'})
return inner
class Index(web.View):
"""a view handler for home page"""
@login_required
async def get(self):
# response.headers['Content-Language'] = 'utf-8'
return aiohttp_jinja2.render_template('index.html', self.request, locals())
這里我把視圖處理函數(shù)全部改為類視圖方式編寫了祝迂。
增加路由:
#!/usr/bin/env python
# _*_ coding:utf-8 _*_
__author__ = 'junxi'
import aiohttp_jinja2
import jinja2
import uuid
from application.views import Hello, Index, Register, Login, QuestionChoices, Questions, hash_sha256
from config.settings import STATIC_DIR, TEMPLATE_DIR
from aiohttp_session import setup
from aiohttp_session.redis_storage import RedisStorage
def setup_session(app, redis_pool):
storage = RedisStorage(redis_pool=redis_pool, cookie_name='sessionid', key_factory=lambda: hash_sha256(uuid.uuid4().hex))
setup(app, storage)
def setup_routes(app):
app.router.add_view('/hello', Hello, name='Hello')
app.router.add_view('', Index, name='Index')
app.router.add_view('/register', Register, name='Register')
app.router.add_view('/login', Login, name='Login')
app.router.add_view('/questions/{question_id}/choice', QuestionChoices, name='QuestionChoices'
main.py
增加session處理:
async def init(loop):
mysql_pool = await create_pool(loop)
redis_pool = await create_redis_pool(loop)
# app = web.Application(loop=loop, middlewares=[toolbar_middleware_factory])
# aiohttp_debugtoolbar.setup(app)
async def dispose_mysql_pool():
mysql_pool.close()
await mysql_pool.wait_closed()
async def dispose_redis_pool():
redis_pool.close()
await redis_pool.wait_closed()
async def dispose_pool(app):
await dispose_mysql_pool()
await dispose_redis_pool()
app = web.Application(loop=loop)
setup_session(app, redis_pool)
setup_routes(app)
setup_static_routes(app)
setup_template_routes(app)
app.on_cleanup.append(dispose_pool)
return app
重新啟動服務(wù)器,輸入地址http://127.0.0.1:9000/
器净, 會跳轉(zhuǎn)到登錄頁面:
輸入賬號密碼登錄:
跳轉(zhuǎn)到首頁型雳,可以看到右上角顯示昵稱,已經(jīng)登錄成功了山害。
增加問答頁面:
{% extends "index.html" %}
{% block title %}
<title>問答</title>
{% endblock %}
{% block head_js %}
{% endblock %}
{% block content %}
<div class="uk-container content">
<div class="uk-child-width-1-2@s" uk-grid>
{% for question in questions %}
<div>
<div class="uk-dark uk-background-muted uk-padding">
<h3 class="uk-text-danger">{{ question.question_text }}</h3>
{% for i in question.question_choice|choice_split %}
<p><label><input class="uk-radio" type="radio" name="radio2" value="{{ i.0 }}"> {{ i.1 }}</label></p>
{% endfor %}
<button class="uk-button-primary uk-button-small">提交</button>
</div>
</div>
{% endfor %}
</div>
</div>
{% endblock %}
{% block bottom_js %}
{% endblock %}
增加問答視圖函數(shù):
class Questions(web.View):
"""a view handler for look at all questions"""
@login_required
async def get(self):
questions = await select('select q.id as qid, q.question_text, (select group_concat(concat_ws("|", c.id, c.choice_text)) from choice c where c.question_id = q.id) as question_choice from question q;')
return aiohttp_jinja2.render_template('questions.html', self.request, locals())
增加路由以及我們自定義的jinja2模版上下文處理函數(shù):
import aiohttp_jinja2
import jinja2
import uuid
from application.views import Hello, Index, Register, Login, QuestionChoices, Questions, hash_sha256
from config.settings import STATIC_DIR, TEMPLATE_DIR
from aiohttp_session import setup
from aiohttp_session.redis_storage import RedisStorage
def setup_session(app, redis_pool):
storage = RedisStorage(redis_pool=redis_pool, cookie_name='sessionid', key_factory=lambda: hash_sha256(uuid.uuid4().hex))
setup(app, storage)
def setup_routes(app):
app.router.add_view('/hello', Hello, name='Hello')
app.router.add_view('', Index, name='Index')
app.router.add_view('/register', Register, name='Register')
app.router.add_view('/login', Login, name='Login')
app.router.add_view('/questions/{question_id}/choice', QuestionChoices, name='QuestionChoices')
app.router.add_view('/questions', Questions, name='Questions')
def setup_static_routes(app):
app.router.add_static('/static/', path=STATIC_DIR, name='static')
def setup_template_routes(app):
aiohttp_jinja2.setup(app, filters={'choice_split': choice_split}, loader=jinja2.FileSystemLoader(TEMPLATE_DIR))
def choice_split(choices):
for i in choices.split(','):
single = i.split('|')
yield single
重啟服務(wù)后查看問答頁面http://127.0.0.1:9000/questions
項目展示
這是完整代碼纠俭。
supervisor部署項目
安裝supervisor:
mkdir ~/supervisor
cd ~/supervisor/
wget https://files.pythonhosted.org/packages/44/60/698e54b4a4a9b956b2d709b4b7b676119c833d811d53ee2500f1b5e96dc3/supervisor-3.3.4.tar.gz
tar zxf supervisor-3.3.4.tar.gz
cd supervisor-3.3.4
sudo python setup.py install
supervisord -v
生成配置文件:
$ echo_supervisord_conf > supervisord.conf
啟動:
$ supervisord -c supervisord.conf
查看 supervisord 是否在運行:
$ ps aux|grep supervisord
junxi 5064 0.0 0.0 4267768 900 s000 S+ 10:37上午 0:00.00 grep --color supervisord
junxi 5059 0.0 0.0 4344312 2196 ?? Ss 10:37上午 0:00.01 /usr/bin/python /usr/local/bin/supervisord -c supervisord.conf
打開配置文件:
vim supervisord.conf
創(chuàng)建aio目錄:
mkdir aio
在配置文件底部,配置include
[include]
files = aio/*.conf
其他參數(shù)配置:
# grep -Ev '^;|^$' supervisord.conf
[unix_http_server]
file=/var/log/supervisor/supervisor.sock ; the path to the socket file
[inet_http_server] ; inet (TCP) server disabled by default
port=127.0.0.1:9001 ; ip_address:port specifier, *:port for all iface
username=user ; default is no username (open server)
password=123 ; default is no password (open server)
[supervisord]
logfile=/var/log/supervisor/supervisord.log ; main log file; default $CWD/supervisord.log
logfile_maxbytes=50MB ; max main logfile bytes b4 rotation; default 50MB
logfile_backups=10 ; # of main logfile backups; 0 means none, default 10
loglevel=info ; log level; default info; others: debug,warn,trace
pidfile=/var/log/supervisor/supervisord.pid ; supervisord pidfile; default supervisord.pid
nodaemon=false ; start in foreground if true; default false
minfds=1024 ; min. avail startup file descriptors; default 1024
minprocs=200 ; min. avail process descriptors;default 200
childlogdir=/var/log/supervisor ; 'AUTO' child log dir, default $TEMP
[include]
files = /Users/junxi/supervisor/aio/*.conf
在aio文件夾下新建vote.conf文件用于啟動我們的vote項目浪慌,內(nèi)容如下:
# vim aio/vote.conf
[program:vote]
numprocs = 4
numprocs_start = 1
process_name = vote_910%(process_num)s
command=python3 /Users/junxi/program/vote/main.py --port=910%(process_num)s
directory=/Users/junxi/program/vote
autostart=true
autorestart=true
redirect_stderr=true
stdout_logfile=/var/log/vote/access.log
loglevel=info
創(chuàng)建存放日志的文件夾:
$ sudo mkdir /var/log/supervisor
$ sudo chown -R junxi:admin /var/log/supervisor
$ sudo mkdir /var/log/vote/
$ sudo chown -R junxi:admin /var/log/vote/
重啟supervisor:
$ kill -Hup `ps -ef|grep supervisord|awk 'NR==1{print $2}'`
或者手動找到pid重啟冤荆。
使用客戶端supervisorctl管理進程的啟動
連接到服務(wù)端:
$ supervisorctl -c supervisord.conf
輸入默認的賬戶user
,密碼123
進入命令行权纤。
查看狀態(tài):
supervisor> help
default commands (type help <topic>):
=====================================
add exit open reload restart start tail
avail fg pid remove shutdown status update
clear maintail quit reread signal stop version
supervisor> status
vote:vote_9101 STOPPED Apr 17 11:00 PM
vote:vote_9102 STOPPED Apr 17 11:00 PM
vote:vote_9103 STOPPED Apr 17 11:00 PM
vote:vote_9104
啟動vote:
supervisor> start all
vote:vote_9101: started
vote:vote_9102: started
vote:vote_9103: started
vote:vote_9104: started
瀏覽器輸入 http://127.0.0.1:9001/
打開web頁面查看supervisor狀態(tài)钓简,就是我們配置文件中的inet_http_server。
瀏覽器輸入4個端口(分別為9101汹想、9102外邓、9103、9104)分別進行訪問測試:
然后再使用nginx做個負載均衡:
proxy_next_upstream error;
upstream votes {
server 127.0.0.1:9101;
server 127.0.0.1:9102;
server 127.0.0.1:9103;
server 127.0.0.1:9104;
}
server {
listen 8008;
server_name localhost;
access_log /var/log/nginx/vote/access.log;
error_log /var/log/nginx/vote/error.log;
proxy_read_timeout 200;
location /static/ {
alias /Users/junxi/program/vote/static/;
}
location / {
proxy_pass_header Server;
proxy_set_header Host $http_host;
proxy_redirect off;
proxy_set_header X-Real-IP $remote_addr;
proxy_set_header X-Scheme $scheme;
proxy_pass http://votes;
}
}
別忘了設(shè)置Nginx的worker_rlimit_nofile欧宜、worker_connections坐榆、worker_processes拴魄。
訪問http://localhost:8008/hello
Nice冗茸。
先寫到這里了。