1. 用戶會(huì)話管理
環(huán)境設(shè)置
首先喘沿,確保你安裝了必要的Python庫(kù):
pip install flask redis
示例代碼
-
初始化Flask應(yīng)用和Redis客戶端:
from flask import Flask, session, redirect, url_for, request import redis import os app = Flask(__name__) # 配置Flask應(yīng)用的密鑰 app.secret_key = os.urandom(24) # 配置Redis客戶端 redis_client = redis.StrictRedis(host='localhost', port=6379, db=0, decode_responses=True) # Redis會(huì)話存儲(chǔ)的前綴 SESSION_PREFIX = "session:"
-
設(shè)置會(huì)話管理的輔助函數(shù):
def save_session_to_redis(session_id, session_data): redis_client.hmset(f"{SESSION_PREFIX}{session_id}", session_data) def load_session_from_redis(session_id): return redis_client.hgetall(f"{SESSION_PREFIX}{session_id}") def delete_session_from_redis(session_id): redis_client.delete(f"{SESSION_PREFIX}{session_id}")
-
定義登錄整慎、登出和主頁(yè)路由:
@app.route('/') def index(): if 'username' in session: username = session['username'] return f'Logged in as {username}' return 'You are not logged in' @app.route('/login', methods=['GET', 'POST']) def login(): if request.method == 'POST': session_id = request.form['session_id'] session['username'] = request.form['username'] session_data = {'username': session['username']} save_session_to_redis(session_id, session_data) return redirect(url_for('index')) return ''' <form method="post"> Session ID: <input type="text" name="session_id"><br> Username: <input type="text" name="username"><br> <input type="submit" value="Login"> </form> ''' @app.route('/logout') def logout(): session_id = request.args.get('session_id') session.pop('username', None) delete_session_from_redis(session_id) return redirect(url_for('index'))
-
在應(yīng)用啟動(dòng)時(shí)加載會(huì)話數(shù)據(jù):
@app.before_request def load_user_session(): session_id = request.args.get('session_id') if session_id: session_data = load_session_from_redis(session_id) if session_data: session.update(session_data)
運(yùn)行應(yīng)用
保存上述代碼為一個(gè)Python文件(例如app.py),然后運(yùn)行它:
python app.py
打開瀏覽器性宏,訪問(wèn)
http://localhost:5000/
,你可以看到登錄和登出的頁(yè)面,并且會(huì)話數(shù)據(jù)會(huì)存儲(chǔ)在Redis中七兜。
2. 消息隊(duì)列
環(huán)境設(shè)置
確保你安裝了必要的Python庫(kù):
pip install redis
示例代碼
我們將創(chuàng)建兩個(gè)腳本:一個(gè)生產(chǎn)者(Producer)腳本昌屉,用于將消息放入隊(duì)列钙蒙;一個(gè)消費(fèi)者(Consumer)腳本,用于從隊(duì)列中讀取并處理消息间驮。
-
Producer腳本:
import redis # 配置Redis客戶端 redis_client = redis.StrictRedis(host='localhost', port=6379, db=0, decode_responses=True) # 定義隊(duì)列名稱 QUEUE_NAME = "message_queue" def send_message(message): # 將消息放入隊(duì)列 redis_client.rpush(QUEUE_NAME, message) print(f"Message sent: {message}") if __name__ == "__main__": # 示例消息 messages = ["Hello", "World", "Redis", "Queue"] for message in messages: send_message(message)
-
Consumer腳本:
import redis import time # 配置Redis客戶端 redis_client = redis.StrictRedis(host='localhost', port=6379, db=0, decode_responses=True) # 定義隊(duì)列名稱 QUEUE_NAME = "message_queue" def process_message(): while True: # 從隊(duì)列中獲取消息 message = redis_client.blpop(QUEUE_NAME, timeout=0) if message: print(f"Message received: {message[1]}") # 模擬消息處理 time.sleep(1) if __name__ == "__main__": print("Consumer is running...") process_message()
運(yùn)行示例
-
啟動(dòng)Consumer腳本: 打開一個(gè)終端窗口躬厌,運(yùn)行消費(fèi)者腳本:
python consumer.py
-
運(yùn)行Producer腳本:
打開另一個(gè)終端窗口,運(yùn)行生產(chǎn)者腳本:
python producer.py
你會(huì)看到Consumer腳本從Redis隊(duì)列中讀取消息并處理它們竞帽。
消息處理邏輯
-
Producer腳本使用
rpush
方法將消息放入Redis列表的右端扛施。 -
Consumer腳本使用
blpop
方法從Redis列表的左端讀取消息。blpop
是一個(gè)阻塞操作屹篓,當(dāng)列表為空時(shí)會(huì)等待疙渣,直到有新的消息進(jìn)入。
-
3. 實(shí)時(shí)分析和統(tǒng)計(jì)
環(huán)境設(shè)置
確保你安裝了必要的Python庫(kù):
pip install flask redis
示例代碼
我們將創(chuàng)建一個(gè)簡(jiǎn)單的Flask Web應(yīng)用堆巧,用于統(tǒng)計(jì)和顯示網(wǎng)站訪問(wèn)量妄荔。
-
初始化Flask應(yīng)用和Redis客戶端:
from flask import Flask, request, jsonify import redis import time app = Flask(__name__) # 配置Redis客戶端 redis_client = redis.StrictRedis(host='localhost', port=6379, db=0, decode_responses=True) # 定義統(tǒng)計(jì)數(shù)據(jù)存儲(chǔ)的前綴 STATS_PREFIX = "stats:" # 獲取當(dāng)前日期(用于統(tǒng)計(jì)按天存儲(chǔ)) def get_current_date(): return time.strftime("%Y-%m-%d") # 增加訪問(wèn)量統(tǒng)計(jì) def increment_page_views(page): current_date = get_current_date() redis_client.hincrby(f"{STATS_PREFIX}{current_date}", page, 1) # 獲取某天的訪問(wèn)量統(tǒng)計(jì) def get_page_views(date): return redis_client.hgetall(f"{STATS_PREFIX}{date}") @app.route('/') def home(): increment_page_views("home") return "Welcome to the homepage!" @app.route('/about') def about(): increment_page_views("about") return "Welcome to the about page!" @app.route('/stats') def stats(): date = request.args.get('date', get_current_date()) stats = get_page_views(date) return jsonify(stats) if __name__ == "__main__": app.run(debug=True)
運(yùn)行應(yīng)用
保存上述代碼為一個(gè)Python文件(例如app.py),然后運(yùn)行它:
python app.py
打開瀏覽器恳邀,訪問(wèn)以下URL:
- 訪問(wèn)主頁(yè):http://localhost:5000/
- 訪問(wèn)關(guān)于頁(yè):http://localhost:5000/about
- 查看統(tǒng)計(jì)數(shù)據(jù):http://localhost:5000/stats
代碼解釋
-
increment_page_views(page)
:增加指定頁(yè)面的訪問(wèn)量統(tǒng)計(jì)懦冰。使用Redis的hincrby
命令將訪問(wèn)量存儲(chǔ)在哈希表中,以當(dāng)前日期為鍵谣沸,以頁(yè)面名稱為字段刷钢。 -
get_page_views(date)
:獲取指定日期的頁(yè)面訪問(wèn)量統(tǒng)計(jì)。使用Redis的hgetall
命令讀取哈希表中的所有字段和值乳附。 -
/stats
路由:返回指定日期的訪問(wèn)量統(tǒng)計(jì)内地,默認(rèn)為當(dāng)前日期伴澄。
實(shí)時(shí)統(tǒng)計(jì)功能
通過(guò)上述示例,你可以實(shí)現(xiàn)實(shí)時(shí)的訪問(wèn)量統(tǒng)計(jì)和顯示阱缓。這對(duì)于需要高效處理大量數(shù)據(jù)的應(yīng)用程序來(lái)說(shuō)非常有用非凌,例如實(shí)時(shí)監(jiān)控、分析和報(bào)告荆针。
4. 分布式鎖
環(huán)境設(shè)置
確保你安裝了必要的Python庫(kù):
pip install redis
示例代碼
我們將創(chuàng)建一個(gè)簡(jiǎn)單的分布式鎖類敞嗡,并演示如何在多個(gè)進(jìn)程或線程中使用它。
-
分布式鎖類:
import redis import time import uuid class RedisDistributedLock: def __init__(self, redis_client, lock_key, timeout=10): self.redis_client = redis_client self.lock_key = lock_key self.timeout = timeout self.lock_id = str(uuid.uuid4()) def acquire(self): while True: if self.redis_client.set(self.lock_key, self.lock_id, nx=True, ex=self.timeout): return True time.sleep(0.01) def release(self): lock_value = self.redis_client.get(self.lock_key) if lock_value and lock_value == self.lock_id: self.redis_client.delete(self.lock_key) def __enter__(self): self.acquire() def __exit__(self, exc_type, exc_value, traceback): self.release()
-
示例使用:
import redis import threading # 配置Redis客戶端 redis_client = redis.StrictRedis(host='localhost', port=6379, db=0, decode_responses=True) def critical_section(lock): with lock: print(f"{threading.current_thread().name} has acquired the lock") time.sleep(2) print(f"{threading.current_thread().name} is releasing the lock") if __name__ == "__main__": lock_key = "distributed_lock" lock = RedisDistributedLock(redis_client, lock_key, timeout=5) threads = [] for i in range(5): thread = threading.Thread(target=critical_section, args=(lock,)) threads.append(thread) thread.start() for thread in threads: thread.join()
代碼解釋
-
RedisDistributedLock類:
-
__init__
:初始化分布式鎖航背,設(shè)置Redis客戶端喉悴、鎖鍵、超時(shí)時(shí)間和唯一鎖ID玖媚。 -
acquire
:嘗試獲取鎖箕肃。如果鎖已存在,則等待并重試今魔。使用SET
命令的NX
和EX
參數(shù)確保操作是原子的勺像。 -
release
:釋放鎖。首先檢查當(dāng)前鎖的值是否與自己的鎖ID匹配错森,確保不會(huì)釋放其他客戶端的鎖吟宦。 -
__enter__
和__exit__
:實(shí)現(xiàn)上下文管理協(xié)議,支持使用with
語(yǔ)句问词。
-
-
示例使用:
-
critical_section
:模擬臨界區(qū)代碼督函,在獲取鎖后打印信息并休眠2秒,然后釋放鎖激挪。 - 多線程模擬:創(chuàng)建多個(gè)線程,每個(gè)線程嘗試進(jìn)入臨界區(qū)锋叨,使用分布式鎖確保同一時(shí)間只有一個(gè)線程進(jìn)入臨界區(qū)垄分。
-
運(yùn)行示例
保存上述代碼為一個(gè)Python文件(例如distributed_lock.py),然后運(yùn)行它:
python distributed_lock.py
你會(huì)看到線程按順序獲取和釋放鎖娃磺,確保在分布式環(huán)境中數(shù)據(jù)的一致性薄湿。
結(jié)論
通過(guò)這個(gè)示例,你可以理解如何使用Redis實(shí)現(xiàn)一個(gè)簡(jiǎn)單但有效的分布式鎖系統(tǒng)偷卧。這對(duì)于需要確保多個(gè)進(jìn)程或線程之間數(shù)據(jù)一致性的場(chǎng)景非常有用豺瘤,例如分布式任務(wù)調(diào)度、資源管理等听诸。
擴(kuò)展
self.redis_client.set(self.lock_key, self.lock_id, nx=True, ex=self.timeout)
這一行代碼的解釋如下:代碼背景
這行代碼使用Redis的
SET
命令來(lái)嘗試設(shè)置一個(gè)分布式鎖坐求。我們通過(guò)設(shè)置特定的參數(shù)來(lái)確保操作的原子性和時(shí)效性。參數(shù)解釋
-
self.lock_key
:這是鎖的鍵名(key)晌梨,表示我們?cè)赗edis中使用的鍵桥嗤。例如须妻,可以是"distributed_lock"
。 -
self.lock_id
:這是鎖的值(value)泛领,我們使用一個(gè)唯一的ID來(lái)表示持有鎖的客戶端荒吏。這可以防止誤釋放他人的鎖。通常使用uuid.uuid4()
生成的唯一ID渊鞋。 -
nx=True
:這是SET
命令的NX
選項(xiàng)绰更,表示“僅當(dāng)鍵不存在時(shí)設(shè)置鍵”。這確保了只有當(dāng)鎖鍵不存在時(shí)锡宋,才能成功設(shè)置鎖儡湾。 -
ex=self.timeout
:這是SET
命令的EX
選項(xiàng),表示“設(shè)置鍵的過(guò)期時(shí)間(以秒為單位)”员辩。這確保了鎖在指定的時(shí)間后會(huì)自動(dòng)釋放盒粮,以防止死鎖。
操作解釋
-
原子性:
-
SET
命令與NX
和EX
選項(xiàng)一起使用時(shí)是原子的奠滑,這意味著整個(gè)操作在Redis服務(wù)器上是一次性完成的丹皱,不會(huì)被其他命令打斷。這對(duì)于分布式鎖至關(guān)重要宋税,因?yàn)槲覀冃枰_保在設(shè)置鎖的同時(shí)不會(huì)有其他客戶端同時(shí)成功設(shè)置相同的鎖摊崭。
-
-
確保唯一性:
-
self.lock_id
是一個(gè)唯一的ID(通常使用UUID生成),用于標(biāo)識(shí)持有鎖的客戶端杰赛。即使多個(gè)客戶端嘗試設(shè)置相同的鎖鍵呢簸,它們使用不同的鎖值,確保每個(gè)鎖的持有者是唯一的乏屯。
-
-
防止死鎖:
-
EX
選項(xiàng)設(shè)置鎖的過(guò)期時(shí)間根时,防止持有鎖的客戶端在崩潰或失去連接后一直持有鎖。這樣可以確保在指定的超時(shí)時(shí)間后鎖會(huì)自動(dòng)釋放辰晕,使其他客戶端有機(jī)會(huì)獲取鎖蛤迎。
-
整體邏輯
當(dāng)一個(gè)客戶端嘗試獲取鎖時(shí):
- 它嘗試使用
SET
命令在Redis中設(shè)置一個(gè)鍵(lock_key
),值為lock_id
含友。 - 如果
lock_key
不存在(即鎖當(dāng)前未被占用)替裆,Redis會(huì)設(shè)置鍵和值,并返回True
窘问。 - 如果
lock_key
已存在(即鎖當(dāng)前已被占用)辆童,Redis不會(huì)設(shè)置鍵和值,并返回False
惠赫。
這樣把鉴,我們可以通過(guò)檢查
SET
命令的返回值來(lái)確定是否成功獲取了鎖。代碼片段解釋
def acquire(self): while True: if self.redis_client.set(self.lock_key, self.lock_id, nx=True, ex=self.timeout): return True time.sleep(0.01)
在
acquire
方法中汉形,代碼不斷嘗試獲取鎖:- 如果
SET
命令返回True
纸镊,表示成功獲取鎖倍阐,方法返回True
。 - 如果
SET
命令返回False
逗威,表示鎖已被占用峰搪,代碼等待0.01秒后重試。這種循環(huán)確笨瘢客戶端最終會(huì)成功獲取鎖(除非鎖永遠(yuǎn)無(wú)法釋放)概耻。
-
RedisDistributedLock類:
5. 緩存數(shù)據(jù)庫(kù)查詢結(jié)果
環(huán)境設(shè)置
確保你安裝了必要的Python庫(kù):
pip install flask redis sqlalchemy
假設(shè)我們使用SQLite作為數(shù)據(jù)庫(kù),你可以根據(jù)需要替換為其他數(shù)據(jù)庫(kù)罐呼。
示例代碼
-
設(shè)置Flask應(yīng)用和數(shù)據(jù)庫(kù):
from flask import Flask, request, jsonify from flask_sqlalchemy import SQLAlchemy import redis import json import time app = Flask(__name__) # 配置Flask應(yīng)用的數(shù)據(jù)庫(kù) app.config['SQLALCHEMY_DATABASE_URI'] = 'sqlite:///example.db' app.config['SQLALCHEMY_TRACK_MODIFICATIONS'] = False db = SQLAlchemy(app) # 配置Redis客戶端 redis_client = redis.StrictRedis(host='localhost', port=6379, db=0, decode_responses=True) # 定義數(shù)據(jù)庫(kù)模型 class User(db.Model): id = db.Column(db.Integer, primary_key=True) name = db.Column(db.String(80), unique=True, nullable=False) age = db.Column(db.Integer, nullable=False) # 初始化數(shù)據(jù)庫(kù) with app.app_context(): db.create_all()
-
緩存輔助函數(shù):
CACHE_TIMEOUT = 60 # 緩存超時(shí)時(shí)間鞠柄,以秒為單位 def get_cache(key): cached_data = redis_client.get(key) if cached_data: return json.loads(cached_data) return None def set_cache(key, data): redis_client.setex(key, CACHE_TIMEOUT, json.dumps(data))
-
數(shù)據(jù)庫(kù)查詢和緩存的示例:
@app.route('/user/<int:user_id>', methods=['GET']) def get_user(user_id): cache_key = f"user:{user_id}" # 嘗試從緩存中獲取數(shù)據(jù) cached_data = get_cache(cache_key) if cached_data: return jsonify(cached_data) # 如果緩存中沒(méi)有數(shù)據(jù),從數(shù)據(jù)庫(kù)中查詢 user = User.query.get(user_id) if not user: return jsonify({"error": "User not found"}), 404 user_data = {"id": user.id, "name": user.name, "age": user.age} # 將查詢結(jié)果緩存到Redis set_cache(cache_key, user_data) return jsonify(user_data)
-
插入新用戶的示例:
@app.route('/user', methods=['POST']) def add_user(): data = request.json new_user = User(name=data['name'], age=data['age']) db.session.add(new_user) db.session.commit() return jsonify({"message": "User added", "user": {"id": new_user.id, "name": new_user.name, "age": new_user.age}}), 201
運(yùn)行應(yīng)用
保存上述代碼為一個(gè)Python文件(例如app.py)嫉柴,然后運(yùn)行它:
python app.py
打開瀏覽器厌杜,訪問(wèn)以下URL:
-
添加新用戶:
curl -X POST http://localhost:5000/user -H "Content-Type: application/json" -d '{"name": "John Doe", "age": 30}'
你應(yīng)該看到類似這樣的響應(yīng):
{ "message": "User added", "user": { "id": 1, "name": "John Doe", "age": 30 } }
-
獲取用戶信息:
curl http://localhost:5000/user/1
如果緩存有效,你應(yīng)該看到類似這樣的響應(yīng):
{ "id": 1, "name": "John Doe", "age": 30 }
-
代碼解釋
-
get_cache
和set_cache
:這兩個(gè)函數(shù)用于從Redis緩存中獲取和設(shè)置數(shù)據(jù)计螺。set_cache
使用setex
方法來(lái)設(shè)置帶有超時(shí)時(shí)間的緩存鳖粟。 -
get_user
路由:首先嘗試從緩存中獲取用戶數(shù)據(jù)跛蛋。如果緩存中沒(méi)有數(shù)據(jù)遏乔,則從數(shù)據(jù)庫(kù)中查詢囚枪,并將結(jié)果緩存到Redis。 -
add_user
路由:用于添加新用戶到數(shù)據(jù)庫(kù)陈轿。
通過(guò)這個(gè)示例圈纺,你可以理解如何使用Redis緩存數(shù)據(jù)庫(kù)查詢結(jié)果,以提高應(yīng)用程序的性能和響應(yīng)速度麦射。
6. 排行榜系統(tǒng)
環(huán)境設(shè)置
確保你安裝了必要的Python庫(kù):
pip install flask redis
示例代碼
我們將創(chuàng)建一個(gè)Flask Web應(yīng)用蛾娶,用于管理和顯示用戶的排行榜。
-
初始化Flask應(yīng)用和Redis客戶端:
from flask import Flask, request, jsonify import redis app = Flask(__name__) # 配置Redis客戶端 redis_client = redis.StrictRedis(host='localhost', port=6379, db=0, decode_responses=True) # 定義排行榜鍵 LEADERBOARD_KEY = "leaderboard"
-
增加用戶分?jǐn)?shù)和獲取排行榜的輔助函數(shù):
def add_score(user, score): redis_client.zadd(LEADERBOARD_KEY, {user: score}) def get_leaderboard(top_n): return redis_client.zrevrange(LEADERBOARD_KEY, 0, top_n-1, withscores=True)
-
增加用戶分?jǐn)?shù)和獲取排行榜的API路由:
@app.route('/add_score', methods=['POST']) def add_score_route(): data = request.json user = data['user'] score = data['score'] add_score(user, score) return jsonify({"message": "Score added successfully"}), 201 @app.route('/leaderboard', methods=['GET']) def leaderboard_route(): top_n = int(request.args.get('top_n', 10)) # 默認(rèn)為前10名 leaderboard = get_leaderboard(top_n) return jsonify(leaderboard)
運(yùn)行應(yīng)用
保存上述代碼為一個(gè)Python文件(例如app.py)潜秋,然后運(yùn)行它:
python app.py
打開瀏覽器茫叭,使用以下命令測(cè)試應(yīng)用:
-
增加用戶分?jǐn)?shù):
curl -X POST http://localhost:5000/add_score -H "Content-Type: application/json" -d '{"user": "Alice", "score": 1500}' curl -X POST http://localhost:5000/add_score -H "Content-Type: application/json" -d '{"user": "Bob", "score": 2000}' curl -X POST http://localhost:5000/add_score -H "Content-Type: application/json" -d '{"user": "Charlie", "score": 1200}'
-
獲取排行榜:
curl http://localhost:5000/leaderboard?top_n=3
你應(yīng)該看到類似這樣的響應(yīng):
[ ["Bob", 2000.0], ["Alice", 1500.0], ["Charlie", 1200.0] ]
代碼解釋
-
add_score(user, score)
:使用zadd
命令將用戶和分?jǐn)?shù)添加到Redis的Sorted Set中。Sorted Set中的元素根據(jù)分?jǐn)?shù)自動(dòng)排序半等。 -
get_leaderboard(top_n)
:使用zrevrange
命令獲取排行榜的前top_n
個(gè)用戶,按照分?jǐn)?shù)從高到低排序呐萨。 -
/add_score
路由:接受POST請(qǐng)求杀饵,將用戶和分?jǐn)?shù)添加到排行榜中。 -
/leaderboard
路由:接受GET請(qǐng)求谬擦,返回排行榜的前top_n
個(gè)用戶切距。
通過(guò)這個(gè)示例,你可以理解如何使用Redis的Sorted Set來(lái)實(shí)現(xiàn)一個(gè)簡(jiǎn)單但高效的排行榜系統(tǒng)惨远。這對(duì)于需要實(shí)時(shí)顯示排名的應(yīng)用程序非常有用谜悟,例如游戲排行榜话肖、網(wǎng)站活躍度排行等。
-
7. 實(shí)時(shí)聊天應(yīng)用
環(huán)境設(shè)置
確保你安裝了必要的Python庫(kù):
pip install flask redis gevent
示例代碼
我們將創(chuàng)建一個(gè)Flask Web應(yīng)用葡幸,使用WebSocket來(lái)處理實(shí)時(shí)消息傳遞最筒。Redis將用于消息的發(fā)布和訂閱。
-
初始化Flask應(yīng)用和Redis客戶端:
from flask import Flask, render_template from flask_sockets import Sockets import redis import gevent from geventwebsocket.handler import WebSocketHandler from gevent.pywsgi import WSGIServer app = Flask(__name__) sockets = Sockets(app) # 配置Redis客戶端 redis_client = redis.StrictRedis(host='localhost', port=6379, db=0, decode_responses=True) # 定義頻道 CHANNEL = 'chat'
-
WebSocket處理和Redis Pub/Sub:
class ChatBackend(object): def __init__(self): self.clients = [] self.pubsub = redis_client.pubsub() self.pubsub.subscribe(CHANNEL) def send(self, client, message): try: client.send(message) except: self.clients.remove(client) def send_all(self, message): for client in self.clients: self.send(client, message) def run(self): for message in self.pubsub.listen(): if message['type'] == 'message': self.send_all(message['data']) def start(self): gevent.spawn(self.run) chat_backend = ChatBackend() chat_backend.start()
-
WebSocket路由:
@sockets.route('/chat') def chat(ws): chat_backend.clients.append(ws) while not ws.closed: message = ws.receive() if message: redis_client.publish(CHANNEL, message)
-
HTML模板(保存為
templates/chat.html
):<!DOCTYPE html> <html> <head> <title>Chat</title> </head> <body> <h1>Chat Room</h1> <ul id="messages"></ul> <input id="message" autocomplete="off"><button onclick="sendMessage()">Send</button> <script> var ws = new WebSocket("ws://" + window.location.host + "/chat"); ws.onmessage = function(event) { var messages = document.getElementById('messages'); var message = document.createElement('li'); message.textContent = event.data; messages.appendChild(message); }; function sendMessage() { var input = document.getElementById("message"); ws.send(input.value); input.value = ''; } </script> </body> </html>
-
主頁(yè)路由:
@app.route('/') def index(): return render_template('chat.html')
-
運(yùn)行應(yīng)用:
if __name__ == "__main__": http_server = WSGIServer(("0.0.0.0", 5000), app, handler_class=WebSocketHandler) http_server.serve_forever()
運(yùn)行應(yīng)用
保存上述代碼為一個(gè)Python文件(例如app.py)蔚叨,然后運(yùn)行它:
python app.py
打開瀏覽器床蜘,訪問(wèn) http://localhost:5000/,你會(huì)看到一個(gè)簡(jiǎn)單的聊天界面蔑水。
代碼解釋
-
Redis Pub/Sub:
ChatBackend
類使用Redis的發(fā)布/訂閱功能來(lái)監(jiān)聽聊天消息邢锯。當(dāng)有新的消息發(fā)布到頻道時(shí),所有連接的客戶端都會(huì)收到這條消息搀别。 -
WebSocket處理:
/chat
路由處理WebSocket連接丹擎。每當(dāng)一個(gè)新客戶端連接時(shí),它被添加到客戶端列表中歇父。當(dāng)有新的消息時(shí)蒂培,這些消息會(huì)被發(fā)送到Redis頻道,其他客戶端會(huì)通過(guò)訂閱該頻道收到消息庶骄。 - HTML模板:一個(gè)簡(jiǎn)單的HTML頁(yè)面毁渗,使用JavaScript來(lái)處理WebSocket連接和消息發(fā)送。
通過(guò)這個(gè)示例单刁,你可以理解如何使用Redis的Pub/Sub功能和Flask的WebSocket來(lái)構(gòu)建一個(gè)實(shí)時(shí)聊天應(yīng)用灸异。這對(duì)于需要實(shí)時(shí)通信的應(yīng)用程序非常有用,例如聊天室羔飞、實(shí)時(shí)通知系統(tǒng)等肺樟。
8. 頻率限制
環(huán)境設(shè)置
確保你安裝了必要的Python庫(kù):
pip install flask redis
示例代碼
我們將創(chuàng)建一個(gè)簡(jiǎn)單的Flask Web應(yīng)用,用于演示如何使用Redis進(jìn)行頻率限制逻淌。
-
初始化Flask應(yīng)用和Redis客戶端:
from flask import Flask, request, jsonify import redis import time app = Flask(__name__) # 配置Redis客戶端 redis_client = redis.StrictRedis(host='localhost', port=6379, db=0, decode_responses=True) # 頻率限制設(shè)置 RATE_LIMIT = 5 # 每分鐘最多請(qǐng)求次數(shù) RATE_LIMIT_WINDOW = 60 # 窗口時(shí)間么伯,以秒為單位
-
頻率限制輔助函數(shù):
def is_rate_limited(ip): current_time = int(time.time()) key = f"rate_limit:{ip}" request_count = redis_client.get(key) if request_count is None: redis_client.set(key, 1, ex=RATE_LIMIT_WINDOW) return False elif int(request_count) < RATE_LIMIT: redis_client.incr(key) return False else: return True
-
應(yīng)用頻率限制的API路由:
@app.route('/limited') def limited(): client_ip = request.remote_addr if is_rate_limited(client_ip): return jsonify({"error": "Too many requests, please try again later."}), 429 return jsonify({"message": "Request successful"}), 200
-
啟動(dòng)應(yīng)用:
if __name__ == "__main__": app.run(debug=True)
運(yùn)行應(yīng)用
保存上述代碼為一個(gè)Python文件(例如app.py),然后運(yùn)行它:
python app.py
打開瀏覽器卡儒,訪問(wèn) http://localhost:5000/limited田柔。
代碼解釋
-
is_rate_limited(ip)
:該函數(shù)檢查給定IP地址的請(qǐng)求頻率。如果在限制的時(shí)間窗口內(nèi)請(qǐng)求次數(shù)超過(guò)設(shè)定值骨望,則返回True
硬爆,否則返回False
。- 使用Redis的
get
和set
方法來(lái)存儲(chǔ)和更新每個(gè)IP地址的請(qǐng)求計(jì)數(shù)擎鸠。 - 當(dāng)請(qǐng)求計(jì)數(shù)不存在時(shí)缀磕,設(shè)置計(jì)數(shù)為1,并設(shè)置過(guò)期時(shí)間。
- 當(dāng)請(qǐng)求計(jì)數(shù)存在且小于限制值時(shí)袜蚕,遞增計(jì)數(shù)糟把。
- 當(dāng)請(qǐng)求計(jì)數(shù)超過(guò)限制值時(shí),返回
True
牲剃,表示頻率限制已觸發(fā)遣疯。
- 使用Redis的
-
/limited
路由:處理受頻率限制保護(hù)的請(qǐng)求。它檢查客戶端的IP地址是否被限制颠黎,如果是另锋,則返回429狀態(tài)碼,否則返回成功消息狭归。
測(cè)試頻率限制
-
初次請(qǐng)求:訪問(wèn) http://localhost:5000/limited夭坪,應(yīng)該返回
{"message": "Request successful"}
。 -
超過(guò)限制的請(qǐng)求:快速連續(xù)訪問(wèn)該URL超過(guò)設(shè)定的限制次數(shù)(在這個(gè)例子中是5次)过椎,應(yīng)該返回
{"error": "Too many requests, please try again later."}
室梅,狀態(tài)碼為429。
進(jìn)一步優(yōu)化
- 按用戶頻率限制:如果有用戶登錄系統(tǒng)疚宇,可以按用戶ID進(jìn)行頻率限制亡鼠,而不是IP地址。
- 動(dòng)態(tài)頻率限制:可以根據(jù)用戶的角色或訂閱級(jí)別動(dòng)態(tài)調(diào)整頻率限制參數(shù)敷待。
9. 地理位置服務(wù)
環(huán)境設(shè)置
確保你安裝了必要的Python庫(kù):
pip install flask redis
示例代碼
我們將創(chuàng)建一個(gè)簡(jiǎn)單的Flask Web應(yīng)用间涵,用于管理和查詢用戶的地理位置。
-
初始化Flask應(yīng)用和Redis客戶端:
from flask import Flask, request, jsonify import redis app = Flask(__name__) # 配置Redis客戶端 redis_client = redis.StrictRedis(host='localhost', port=6379, db=0, decode_responses=True) # 定義Geo數(shù)據(jù)類型的鍵 GEO_KEY = "users:locations"
-
添加用戶地理位置的API:
@app.route('/add_location', methods=['POST']) def add_location(): data = request.json user_id = data['user_id'] longitude = data['longitude'] latitude = data['latitude'] redis_client.geoadd(GEO_KEY, (longitude, latitude, user_id)) return jsonify({"message": "Location added successfully"}), 201
-
查找附近用戶的API:
@app.route('/nearby', methods=['GET']) def nearby(): longitude = float(request.args.get('longitude')) latitude = float(request.args.get('latitude')) radius = float(request.args.get('radius', 10)) # 默認(rèn)查找10公里內(nèi)的用戶 unit = request.args.get('unit', 'km') # 默認(rèn)單位是公里 nearby_users = redis_client.georadius(GEO_KEY, longitude, latitude, radius, unit, withdist=True) results = [{"user_id": user[0], "distance": user[1]} for user in nearby_users] return jsonify(results), 200
-
啟動(dòng)應(yīng)用:
if __name__ == "__main__": app.run(debug=True)
運(yùn)行應(yīng)用
保存上述代碼為一個(gè)Python文件(例如app.py)榜揖,然后運(yùn)行它:
python app.py
打開瀏覽器勾哩,使用以下命令測(cè)試應(yīng)用:
-
添加用戶地理位置:
curl -X POST http://localhost:5000/add_location -H "Content-Type: application/json" -d '{"user_id": "user1", "longitude": 13.361389, "latitude": 38.115556}' curl -X POST http://localhost:5000/add_location -H "Content-Type: application/json" -d '{"user_id": "user2", "longitude": 15.087269, "latitude": 37.502669}'
-
查找附近用戶:
curl http://localhost:5000/nearby?longitude=13.361389&latitude=38.115556&radius=200&unit=km
你應(yīng)該看到類似這樣的響應(yīng):
[ {"user_id": "user1", "distance": 0.0}, {"user_id": "user2", "distance": 190.4424} ]
代碼解釋
-
geoadd
命令:將用戶的地理位置(經(jīng)度、緯度)添加到Redis中举哟。GEO_KEY
是用來(lái)存儲(chǔ)用戶位置數(shù)據(jù)的鍵思劳。 -
georadius
命令:查找指定位置(經(jīng)度、緯度)附近一定范圍內(nèi)的用戶妨猩。返回的結(jié)果包含用戶ID和距離潜叛。
API說(shuō)明
-
/add_location
:POST請(qǐng)求,接受JSON格式的用戶ID壶硅、經(jīng)度和緯度數(shù)據(jù)威兜,并將其存儲(chǔ)到Redis中。 -
/nearby
:GET請(qǐng)求庐椒,接受查詢位置的經(jīng)度牡属、緯度、搜索半徑和單位(可選扼睬,默認(rèn)為公里),返回附近的用戶及其距離。
進(jìn)一步優(yōu)化
- 按用戶ID查找位置:可以添加按用戶ID查找位置的API窗宇。
- 動(dòng)態(tài)調(diào)整搜索半徑和單位:允許客戶端動(dòng)態(tài)調(diào)整搜索半徑和單位措伐。
- 批量添加位置:可以實(shí)現(xiàn)批量添加用戶位置的功能,以提高性能军俊。
10. 內(nèi)容管理系統(tǒng)(CMS)緩存
環(huán)境設(shè)置
確保你安裝了必要的Python庫(kù):
pip install flask redis sqlalchemy
假設(shè)我們使用SQLite作為數(shù)據(jù)庫(kù)侥加,你可以根據(jù)需要替換為其他數(shù)據(jù)庫(kù)。
示例代碼
我們將創(chuàng)建一個(gè)簡(jiǎn)單的Flask Web應(yīng)用粪躬,用于管理和緩存內(nèi)容担败。
-
初始化Flask應(yīng)用、數(shù)據(jù)庫(kù)和Redis客戶端:
from flask import Flask, request, jsonify from flask_sqlalchemy import SQLAlchemy import redis import json import time app = Flask(__name__) # 配置Flask應(yīng)用的數(shù)據(jù)庫(kù) app.config['SQLALCHEMY_DATABASE_URI'] = 'sqlite:///cms.db' app.config['SQLALCHEMY_TRACK_MODIFICATIONS'] = False db = SQLAlchemy(app) # 配置Redis客戶端 redis_client = redis.StrictRedis(host='localhost', port=6379, db=0, decode_responses=True) # 定義緩存前綴和超時(shí)時(shí)間 CACHE_PREFIX = "cms:" CACHE_TIMEOUT = 300 # 緩存時(shí)間镰官,以秒為單位
-
定義數(shù)據(jù)庫(kù)模型:
class Content(db.Model): id = db.Column(db.Integer, primary_key=True) title = db.Column(db.String(100), nullable=False) body = db.Column(db.Text, nullable=False) timestamp = db.Column(db.DateTime, default=db.func.current_timestamp())
-
緩存輔助函數(shù):
def get_cache(key): cached_data = redis_client.get(key) if cached_data: return json.loads(cached_data) return None def set_cache(key, data): redis_client.setex(key, CACHE_TIMEOUT, json.dumps(data))
-
API路由:
@app.route('/content/<int:content_id>', methods=['GET']) def get_content(content_id): cache_key = f"{CACHE_PREFIX}{content_id}" # 嘗試從緩存中獲取數(shù)據(jù) cached_data = get_cache(cache_key) if cached_data: return jsonify(cached_data) # 如果緩存中沒(méi)有數(shù)據(jù)提前,從數(shù)據(jù)庫(kù)中查詢 content = Content.query.get(content_id) if not content: return jsonify({"error": "Content not found"}), 404 content_data = { "id": content.id, "title": content.title, "body": content.body, "timestamp": content.timestamp.isoformat() } # 將查詢結(jié)果緩存到Redis set_cache(cache_key, content_data) return jsonify(content_data) @app.route('/content', methods=['POST']) def add_content(): data = request.json new_content = Content(title=data['title'], body=data['body']) db.session.add(new_content) db.session.commit() return jsonify({"message": "Content added", "content": { "id": new_content.id, "title": new_content.title, "body": new_content.body, "timestamp": new_content.timestamp.isoformat() }}), 201
-
初始化數(shù)據(jù)庫(kù):
with app.app_context(): db.create_all()
-
啟動(dòng)應(yīng)用:
if __name__ == "__main__": app.run(debug=True)
運(yùn)行應(yīng)用
保存上述代碼為一個(gè)Python文件(例如app.py),然后運(yùn)行它:
python app.py
打開瀏覽器泳唠,使用以下命令測(cè)試應(yīng)用:
-
添加內(nèi)容:
curl -X POST http://localhost:5000/content -H "Content-Type: application/json" -d '{"title": "Sample Title", "body": "Sample body content."}'
-
獲取內(nèi)容:
curl http://localhost:5000/content/1
代碼解釋
-
get_cache(key)
和set_cache(key, data)
:這兩個(gè)函數(shù)用于從Redis緩存中獲取和設(shè)置數(shù)據(jù)狈网。set_cache
使用setex
方法來(lái)設(shè)置帶有超時(shí)時(shí)間的緩存。 -
get_content(content_id)
路由:首先嘗試從緩存中獲取內(nèi)容數(shù)據(jù)笨腥。如果緩存中沒(méi)有數(shù)據(jù)拓哺,則從數(shù)據(jù)庫(kù)中查詢,并將結(jié)果緩存到Redis脖母。 -
add_content
路由:用于添加新內(nèi)容到數(shù)據(jù)庫(kù)士鸥。
進(jìn)一步優(yōu)化
- 緩存失效:當(dāng)內(nèi)容更新或刪除時(shí),確保緩存同步更新或失效谆级,以避免緩存不一致的問(wèn)題烤礁。
- 緩存層次:可以實(shí)現(xiàn)多級(jí)緩存,例如先在內(nèi)存中緩存哨苛,然后在Redis中緩存鸽凶,進(jìn)一步提高性能。
- 緩存預(yù)加載:在系統(tǒng)啟動(dòng)或特定時(shí)間點(diǎn)預(yù)加載熱點(diǎn)內(nèi)容到緩存建峭,以提高訪問(wèn)速度玻侥。
緩存擴(kuò)展內(nèi)容
1. 緩存失效
緩存失效是一個(gè)重要的問(wèn)題,特別是在內(nèi)容更新或刪除時(shí)亿蒸,確保緩存與數(shù)據(jù)庫(kù)的數(shù)據(jù)一致是關(guān)鍵凑兰。我們可以通過(guò)以下方法來(lái)解決緩存失效問(wèn)題:
- 刪除緩存:在內(nèi)容更新或刪除時(shí),直接刪除對(duì)應(yīng)的緩存鍵边锁。
- 更新緩存:在內(nèi)容更新時(shí)姑食,更新緩存中的數(shù)據(jù)。
- 使用緩存失效策略:如設(shè)置合理的緩存過(guò)期時(shí)間茅坛。
修改后的代碼
讓我們修改之前的代碼音半,添加處理緩存失效的邏輯则拷。
初始化Flask應(yīng)用和Redis客戶端
from flask import Flask, request, jsonify
from flask_sqlalchemy import SQLAlchemy
import redis
import json
import time
app = Flask(__name__)
# 配置Flask應(yīng)用的數(shù)據(jù)庫(kù)
app.config['SQLALCHEMY_DATABASE_URI'] = 'sqlite:///cms.db'
app.config['SQLALCHEMY_TRACK_MODIFICATIONS'] = False
db = SQLAlchemy(app)
# 配置Redis客戶端
redis_client = redis.StrictRedis(host='localhost', port=6379, db=0, decode_responses=True)
# 定義緩存前綴和超時(shí)時(shí)間
CACHE_PREFIX = "cms:"
CACHE_TIMEOUT = 300 # 緩存時(shí)間,以秒為單位
定義數(shù)據(jù)庫(kù)模型
class Content(db.Model):
id = db.Column(db.Integer, primary_key=True)
title = db.Column(db.String(100), nullable=False)
body = db.Column(db.Text, nullable=False)
timestamp = db.Column(db.DateTime, default=db.func.current_timestamp())
緩存輔助函數(shù)
def get_cache(key):
cached_data = redis_client.get(key)
if cached_data:
return json.loads(cached_data)
return None
def set_cache(key, data):
redis_client.setex(key, CACHE_TIMEOUT, json.dumps(data))
def delete_cache(key):
redis_client.delete(key)
API路由
@app.route('/content/<int:content_id>', methods=['GET'])
def get_content(content_id):
cache_key = f"{CACHE_PREFIX}{content_id}"
# 嘗試從緩存中獲取數(shù)據(jù)
cached_data = get_cache(cache_key)
if cached_data:
return jsonify(cached_data)
# 如果緩存中沒(méi)有數(shù)據(jù)曹鸠,從數(shù)據(jù)庫(kù)中查詢
content = Content.query.get(content_id)
if not content:
return jsonify({"error": "Content not found"}), 404
content_data = {
"id": content.id,
"title": content.title,
"body": content.body,
"timestamp": content.timestamp.isoformat()
}
# 將查詢結(jié)果緩存到Redis
set_cache(cache_key, content_data)
return jsonify(content_data)
@app.route('/content', methods=['POST'])
def add_content():
data = request.json
new_content = Content(title=data['title'], body=data['body'])
db.session.add(new_content)
db.session.commit()
# 設(shè)置緩存
cache_key = f"{CACHE_PREFIX}{new_content.id}"
content_data = {
"id": new_content.id,
"title": new_content.title,
"body": new_content.body,
"timestamp": new_content.timestamp.isoformat()
}
set_cache(cache_key, content_data)
return jsonify({"message": "Content added", "content": content_data}), 201
@app.route('/content/<int:content_id>', methods=['PUT'])
def update_content(content_id):
data = request.json
content = Content.query.get(content_id)
if not content:
return jsonify({"error": "Content not found"}), 404
content.title = data['title']
content.body = data['body']
db.session.commit()
# 更新緩存
cache_key = f"{CACHE_PREFIX}{content.id}"
content_data = {
"id": content.id,
"title": content.title,
"body": content.body,
"timestamp": content.timestamp.isoformat()
}
set_cache(cache_key, content_data)
return jsonify({"message": "Content updated", "content": content_data}), 200
@app.route('/content/<int:content_id>', methods=['DELETE'])
def delete_content(content_id):
content = Content.query.get(content_id)
if not content:
return jsonify({"error": "Content not found"}), 404
db.session.delete(content)
db.session.commit()
# 刪除緩存
cache_key = f"{CACHE_PREFIX}{content.id}"
delete_cache(cache_key)
return jsonify({"message": "Content deleted"}), 200
初始化數(shù)據(jù)庫(kù)
with app.app_context():
db.create_all()
啟動(dòng)應(yīng)用
if __name__ == "__main__":
app.run(debug=True)
運(yùn)行應(yīng)用
保存上述代碼為一個(gè)Python文件(例如app.py)煌茬,然后運(yùn)行它:
python app.py
打開命令終端,使用以下命令測(cè)試應(yīng)用:
-
添加內(nèi)容:
curl -X POST http://localhost:5000/content -H "Content-Type: application/json" -d '{"title": "Sample Title", "body": "Sample body content."}'
-
獲取內(nèi)容:
curl http://localhost:5000/content/1
-
更新內(nèi)容:
curl -X PUT http://localhost:5000/content/1 -H "Content-Type: application/json" -d '{"title": "Updated Title", "body": "Updated body content."}'
-
刪除內(nèi)容:
curl -X DELETE http://localhost:5000/content/1
代碼解釋
-
刪除緩存:在內(nèi)容刪除時(shí)彻桃,使用
delete_cache
函數(shù)從Redis中刪除對(duì)應(yīng)的緩存鍵坛善。 -
更新緩存:在內(nèi)容更新時(shí),使用
set_cache
函數(shù)更新Redis中的緩存數(shù)據(jù)邻眷。 - 添加緩存:在內(nèi)容添加時(shí)眠屎,立即將新的內(nèi)容緩存到Redis中。
2. 緩存層次
實(shí)現(xiàn)緩存層次可以提高系統(tǒng)性能肆饶,減少數(shù)據(jù)庫(kù)查詢次數(shù)改衩,進(jìn)一步優(yōu)化應(yīng)用程序的響應(yīng)時(shí)間。常見(jiàn)的緩存層次結(jié)構(gòu)包括內(nèi)存緩存(如memcached
或local cache
)和分布式緩存(如Redis
)抖拴。下面我們將展示如何在Flask應(yīng)用中實(shí)現(xiàn)這種緩存層次結(jié)構(gòu)燎字。
環(huán)境設(shè)置
確保你安裝了必要的Python庫(kù):
pip install flask redis cachetools sqlalchemy
示例代碼
我們將創(chuàng)建一個(gè)簡(jiǎn)單的Flask Web應(yīng)用,使用內(nèi)存緩存和Redis進(jìn)行緩存阿宅。
-
初始化Flask應(yīng)用候衍、數(shù)據(jù)庫(kù)和Redis客戶端:
from flask import Flask, request, jsonify from flask_sqlalchemy import SQLAlchemy import redis import json import time from cachetools import TTLCache app = Flask(__name__) # 配置Flask應(yīng)用的數(shù)據(jù)庫(kù) app.config['SQLALCHEMY_DATABASE_URI'] = 'sqlite:///cms.db' app.config['SQLALCHEMY_TRACK_MODIFICATIONS'] = False db = SQLAlchemy(app) # 配置Redis客戶端 redis_client = redis.StrictRedis(host='localhost', port=6379, db=0, decode_responses=True) # 定義緩存前綴和超時(shí)時(shí)間 CACHE_PREFIX = "cms:" CACHE_TIMEOUT = 300 # 緩存時(shí)間,以秒為單位 # 內(nèi)存緩存 memory_cache = TTLCache(maxsize=100, ttl=CACHE_TIMEOUT)
定義數(shù)據(jù)庫(kù)模型:
class Content(db.Model):
id = db.Column(db.Integer, primary_key=True)
title = db.Column(db.String(100), nullable=False)
body = db.Column(db.Text, nullable=False)
timestamp = db.Column(db.DateTime, default=db.func.current_timestamp())
-
緩存輔助函數(shù):
def get_cache(key): # 嘗試從內(nèi)存緩存中獲取數(shù)據(jù) if key in memory_cache: return memory_cache[key] # 嘗試從Redis緩存中獲取數(shù)據(jù) cached_data = redis_client.get(key) if cached_data: data = json.loads(cached_data) # 將數(shù)據(jù)加載到內(nèi)存緩存 memory_cache[key] = data return data return None def set_cache(key, data): # 設(shè)置內(nèi)存緩存 memory_cache[key] = data # 設(shè)置Redis緩存 redis_client.setex(key, CACHE_TIMEOUT, json.dumps(data)) def delete_cache(key): # 刪除內(nèi)存緩存 if key in memory_cache: del memory_cache[key] # 刪除Redis緩存 redis_client.delete(key)
-
API路由:
@app.route('/content/<int:content_id>', methods=['GET']) def get_content(content_id): cache_key = f"{CACHE_PREFIX}{content_id}" # 嘗試從緩存中獲取數(shù)據(jù) cached_data = get_cache(cache_key) if cached_data: return jsonify(cached_data) # 如果緩存中沒(méi)有數(shù)據(jù)洒放,從數(shù)據(jù)庫(kù)中查詢 content = Content.query.get(content_id) if not content: return jsonify({"error": "Content not found"}), 404 content_data = { "id": content.id, "title": content.title, "body": content.body, "timestamp": content.timestamp.isoformat() } # 將查詢結(jié)果緩存到Redis和內(nèi)存緩存 set_cache(cache_key, content_data) return jsonify(content_data) @app.route('/content', methods=['POST']) def add_content(): data = request.json new_content = Content(title=data['title'], body=data['body']) db.session.add(new_content) db.session.commit() # 設(shè)置緩存 cache_key = f"{CACHE_PREFIX}{new_content.id}" content_data = { "id": new_content.id, "title": new_content.title, "body": new_content.body, "timestamp": new_content.timestamp.isoformat() } set_cache(cache_key, content_data) return jsonify({"message": "Content added", "content": content_data}), 201 @app.route('/content/<int:content_id>', methods=['PUT']) def update_content(content_id): data = request.json content = Content.query.get(content_id) if not content: return jsonify({"error": "Content not found"}), 404 content.title = data['title'] content.body = data['body'] db.session.commit() # 更新緩存 cache_key = f"{CACHE_PREFIX}{content.id}" content_data = { "id": content.id, "title": content.title, "body": content.body, "timestamp": content.timestamp.isoformat() } set_cache(cache_key, content_data) return jsonify({"message": "Content updated", "content": content_data}), 200 @app.route('/content/<int:content_id>', methods=['DELETE']) def delete_content(content_id): content = Content.query.get(content_id) if not content: return jsonify({"error": "Content not found"}), 404 db.session.delete(content) db.session.commit() # 刪除緩存 cache_key = f"{CACHE_PREFIX}{content.id}" delete_cache(cache_key) return jsonify({"message": "Content deleted"}), 200
-
初始化數(shù)據(jù)庫(kù):
with app.app_context(): db.create_all()
-
啟動(dòng)應(yīng)用:
if __name__ == "__main__": app.run(debug=True)
運(yùn)行應(yīng)用
保存上述代碼為一個(gè)Python文件(例如app.py)蛉鹿,然后運(yùn)行它:
python app.py
打開瀏覽器,使用以下命令測(cè)試應(yīng)用:
-
添加內(nèi)容:
curl -X POST http://localhost:5000/content -H "Content-Type: application/json" -d '{"title": "Sample Title", "body": "Sample body content."}'
-
獲取內(nèi)容:
curl http://localhost:5000/content/1
-
更新內(nèi)容:
curl -X PUT http://localhost:5000/content/1 -H "Content-Type: application/json" -d '{"title": "Updated Title", "body": "Updated body content."}'
-
刪除內(nèi)容:
curl -X DELETE http://localhost:5000/content/1
代碼解釋
內(nèi)存緩存(
TTLCache
):TTLCache
提供了一個(gè)具有時(shí)間限制的內(nèi)存緩存往湿。我們首先嘗試從內(nèi)存緩存中獲取數(shù)據(jù)妖异,如果未命中,再嘗試從Redis緩存中獲取领追。-
緩存輔助函數(shù)
:
-
get_cache
:從內(nèi)存緩存中獲取數(shù)據(jù)他膳,如果未命中,再嘗試從Redis緩存中獲取绒窑,并將數(shù)據(jù)加載到內(nèi)存緩存棕孙。 -
set_cache
:將數(shù)據(jù)設(shè)置到內(nèi)存緩存和Redis緩存。 -
delete_cache
:從內(nèi)存緩存和Redis緩存中刪除數(shù)據(jù)些膨。
-
API路由:包含獲取蟀俊、添加、更新和刪除內(nèi)容的API订雾,每個(gè)操作都包括相應(yīng)的緩存邏輯肢预。
3. 緩存預(yù)加載
緩存預(yù)加載是一種優(yōu)化策略,旨在在系統(tǒng)啟動(dòng)或特定時(shí)間點(diǎn)預(yù)加載熱點(diǎn)數(shù)據(jù)到緩存中洼哎,以減少首次訪問(wèn)的延遲烫映。我們可以在系統(tǒng)啟動(dòng)時(shí)沼本,或者通過(guò)定時(shí)任務(wù)定期預(yù)加載數(shù)據(jù)到緩存。
環(huán)境設(shè)置
確保你安裝了必要的Python庫(kù):
pip install flask redis sqlalchemy cachetools apscheduler
示例代碼
我們將創(chuàng)建一個(gè)簡(jiǎn)單的Flask Web應(yīng)用窑邦,包含內(nèi)存緩存擅威、Redis緩存和緩存預(yù)加載功能。
-
初始化Flask應(yīng)用冈钦、數(shù)據(jù)庫(kù)和Redis客戶端:
from flask import Flask, request, jsonify from flask_sqlalchemy import SQLAlchemy import redis import json from cachetools import TTLCache from apscheduler.schedulers.background import BackgroundScheduler app = Flask(__name__) # 配置Flask應(yīng)用的數(shù)據(jù)庫(kù) app.config['SQLALCHEMY_DATABASE_URI'] = 'sqlite:///cms.db' app.config['SQLALCHEMY_TRACK_MODIFICATIONS'] = False db = SQLAlchemy(app) # 配置Redis客戶端 redis_client = redis.StrictRedis(host='localhost', port=6379, db=0, decode_responses=True) # 定義緩存前綴和超時(shí)時(shí)間 CACHE_PREFIX = "cms:" CACHE_TIMEOUT = 300 # 緩存時(shí)間,以秒為單位 # 內(nèi)存緩存 memory_cache = TTLCache(maxsize=100, ttl=CACHE_TIMEOUT)
-
定義數(shù)據(jù)庫(kù)模型:
class Content(db.Model): id = db.Column(db.Integer, primary_key=True) title = db.Column(db.String(100), nullable=False) body = db.Column(db.Text, nullable=False) timestamp = db.Column(db.DateTime, default=db.func.current_timestamp())
-
緩存輔助函數(shù):
def get_cache(key): # 嘗試從內(nèi)存緩存中獲取數(shù)據(jù) if key in memory_cache: return memory_cache[key] # 嘗試從Redis緩存中獲取數(shù)據(jù) cached_data = redis_client.get(key) if cached_data: data = json.loads(cached_data) # 將數(shù)據(jù)加載到內(nèi)存緩存 memory_cache[key] = data return data return None def set_cache(key, data): # 設(shè)置內(nèi)存緩存 memory_cache[key] = data # 設(shè)置Redis緩存 redis_client.setex(key, CACHE_TIMEOUT, json.dumps(data)) def delete_cache(key): # 刪除內(nèi)存緩存 if key in memory_cache: del memory_cache[key] # 刪除Redis緩存 redis_client.delete(key)
-
API路由:
@app.route('/content/<int:content_id>', methods=['GET']) def get_content(content_id): cache_key = f"{CACHE_PREFIX}{content_id}" # 嘗試從緩存中獲取數(shù)據(jù) cached_data = get_cache(cache_key) if cached_data: return jsonify(cached_data) # 如果緩存中沒(méi)有數(shù)據(jù)李请,從數(shù)據(jù)庫(kù)中查詢 content = Content.query.get(content_id) if not content: return jsonify({"error": "Content not found"}), 404 content_data = { "id": content.id, "title": content.title, "body": content.body, "timestamp": content.timestamp.isoformat() } # 將查詢結(jié)果緩存到Redis和內(nèi)存緩存 set_cache(cache_key, content_data) return jsonify(content_data) @app.route('/content', methods=['POST']) def add_content(): data = request.json new_content = Content(title=data['title'], body=data['body']) db.session.add(new_content) db.session.commit() # 設(shè)置緩存 cache_key = f"{CACHE_PREFIX}{new_content.id}" content_data = { "id": new_content.id, "title": new_content.title, "body": new_content.body, "timestamp": new_content.timestamp.isoformat() } set_cache(cache_key, content_data) return jsonify({"message": "Content added", "content": content_data}), 201 @app.route('/content/<int:content_id>', methods=['PUT']) def update_content(content_id): data = request.json content = Content.query.get(content_id) if not content: return jsonify({"error": "Content not found"}), 404 content.title = data['title'] content.body = data['body'] db.session.commit() # 更新緩存 cache_key = f"{CACHE_PREFIX}{content.id}" content_data = { "id": content.id, "title": content.title, "body": content.body, "timestamp": content.timestamp.isoformat() } set_cache(cache_key, content_data) return jsonify({"message": "Content updated", "content": content_data}), 200 @app.route('/content/<int:content_id>', methods=['DELETE']) def delete_content(content_id): content = Content.query.get(content_id) if not content: return jsonify({"error": "Content not found"}), 404 db.session.delete(content) db.session.commit() # 刪除緩存 cache_key = f"{CACHE_PREFIX}{content.id}" delete_cache(cache_key) return jsonify({"message": "Content deleted"}), 200
-
緩存預(yù)加載函數(shù):
def preload_cache(): print("Preloading cache...") contents = Content.query.all() for content in contents: cache_key = f"{CACHE_PREFIX}{content.id}" content_data = { "id": content.id, "title": content.title, "body": content.body, "timestamp": content.timestamp.isoformat() } set_cache(cache_key, content_data) print("Cache preloading completed.")
-
定時(shí)任務(wù)和初始化數(shù)據(jù)庫(kù):
from apscheduler.schedulers.background import BackgroundScheduler if __name__ == "__main__": # 初始化數(shù)據(jù)庫(kù) with app.app_context(): db.create_all() # 啟動(dòng)定時(shí)任務(wù) scheduler = BackgroundScheduler() scheduler.add_job(preload_cache, 'interval', minutes=10) # 每10分鐘預(yù)加載一次緩存 scheduler.start() # 預(yù)加載緩存 preload_cache() # 啟動(dòng)Flask應(yīng)用 app.run(debug=True)
運(yùn)行應(yīng)用
保存上述代碼為一個(gè)Python文件(例如app.py)瞧筛,然后運(yùn)行它:
python app.py
代碼解釋
-
內(nèi)存緩存(
TTLCache
):TTLCache
提供了一個(gè)具有時(shí)間限制的內(nèi)存緩存。我們首先嘗試從內(nèi)存緩存中獲取數(shù)據(jù)导盅,如果未命中较幌,再嘗試從Redis緩存中獲取。 - 緩存輔助函數(shù):
-
get_cache
:從內(nèi)存緩存中獲取數(shù)據(jù)白翻,如果未命中乍炉,再嘗試從Redis緩存中獲取,并將數(shù)據(jù)加載到內(nèi)存緩存滤馍。 -
set_cache
:將數(shù)據(jù)設(shè)置到內(nèi)存緩存和Redis緩存岛琼。 -
delete_cache
:從內(nèi)存緩存和Redis緩存中刪除數(shù)據(jù)。
-
- API路由:包含獲取巢株、添加槐瑞、更新和刪除內(nèi)容的API,每個(gè)操作都包括相應(yīng)的緩存邏輯阁苞。
- 緩存預(yù)加載:
-
preload_cache
:預(yù)加載緩存函數(shù)困檩,從數(shù)據(jù)庫(kù)中讀取所有內(nèi)容,并將其加載到內(nèi)存緩存和Redis緩存那槽。 -
定時(shí)任務(wù):使用
apscheduler
庫(kù)定期執(zhí)行緩存預(yù)加載任務(wù)悼沿。 - 啟動(dòng)時(shí)預(yù)加載:在應(yīng)用啟動(dòng)時(shí)立即執(zhí)行一次緩存預(yù)加載。
-
通過(guò)這些步驟骚灸,你可以實(shí)現(xiàn)一個(gè)多層次的緩存系統(tǒng)糟趾,包括內(nèi)存緩存和分布式緩存,并使用緩存預(yù)加載技術(shù)提高系統(tǒng)性能和響應(yīng)速度逢唤。
-
內(nèi)存緩存(