web項(xiàng)目中redis十大經(jīng)典使用場(chǎng)景

1. 用戶會(huì)話管理

環(huán)境設(shè)置

首先喘沿,確保你安裝了必要的Python庫(kù):

pip install flask redis

示例代碼

  1. 初始化Flask應(yīng)用和Redis客戶端

    from flask import Flask, session, redirect, url_for, request
    import redis
    import os
    
    app = Flask(__name__)
    
    # 配置Flask應(yīng)用的密鑰
    app.secret_key = os.urandom(24)
    
    # 配置Redis客戶端
    redis_client = redis.StrictRedis(host='localhost', port=6379, db=0, decode_responses=True)
    
    # Redis會(huì)話存儲(chǔ)的前綴
    SESSION_PREFIX = "session:"
    
  2. 設(shè)置會(huì)話管理的輔助函數(shù)

    def save_session_to_redis(session_id, session_data):
        redis_client.hmset(f"{SESSION_PREFIX}{session_id}", session_data)
    
    def load_session_from_redis(session_id):
        return redis_client.hgetall(f"{SESSION_PREFIX}{session_id}")
    
    def delete_session_from_redis(session_id):
        redis_client.delete(f"{SESSION_PREFIX}{session_id}")
    
  3. 定義登錄整慎、登出和主頁(yè)路由

    @app.route('/')
    def index():
        if 'username' in session:
            username = session['username']
            return f'Logged in as {username}'
        return 'You are not logged in'
    
    @app.route('/login', methods=['GET', 'POST'])
    def login():
        if request.method == 'POST':
            session_id = request.form['session_id']
            session['username'] = request.form['username']
            session_data = {'username': session['username']}
            save_session_to_redis(session_id, session_data)
            return redirect(url_for('index'))
        return '''
            <form method="post">
                Session ID: <input type="text" name="session_id"><br>
                Username: <input type="text" name="username"><br>
                <input type="submit" value="Login">
            </form>
        '''
    
    @app.route('/logout')
    def logout():
        session_id = request.args.get('session_id')
        session.pop('username', None)
        delete_session_from_redis(session_id)
        return redirect(url_for('index'))
    
  4. 在應(yīng)用啟動(dòng)時(shí)加載會(huì)話數(shù)據(jù)

    @app.before_request
    def load_user_session():
        session_id = request.args.get('session_id')
        if session_id:
            session_data = load_session_from_redis(session_id)
            if session_data:
                session.update(session_data)
    

    運(yùn)行應(yīng)用

    保存上述代碼為一個(gè)Python文件(例如app.py),然后運(yùn)行它:

    python app.py

    打開瀏覽器性宏,訪問(wèn) http://localhost:5000/,你可以看到登錄和登出的頁(yè)面,并且會(huì)話數(shù)據(jù)會(huì)存儲(chǔ)在Redis中七兜。

2. 消息隊(duì)列

環(huán)境設(shè)置

確保你安裝了必要的Python庫(kù):

pip install redis

示例代碼

我們將創(chuàng)建兩個(gè)腳本:一個(gè)生產(chǎn)者(Producer)腳本昌屉,用于將消息放入隊(duì)列钙蒙;一個(gè)消費(fèi)者(Consumer)腳本,用于從隊(duì)列中讀取并處理消息间驮。

  1. Producer腳本

    import redis
    
    # 配置Redis客戶端
    redis_client = redis.StrictRedis(host='localhost', port=6379, db=0, decode_responses=True)
    
    # 定義隊(duì)列名稱
    QUEUE_NAME = "message_queue"
    
    def send_message(message):
        # 將消息放入隊(duì)列
        redis_client.rpush(QUEUE_NAME, message)
        print(f"Message sent: {message}")
    
    if __name__ == "__main__":
        # 示例消息
        messages = ["Hello", "World", "Redis", "Queue"]
        for message in messages:
            send_message(message)
    
  2. Consumer腳本

    import redis
    import time
    
    # 配置Redis客戶端
    redis_client = redis.StrictRedis(host='localhost', port=6379, db=0, decode_responses=True)
    
    # 定義隊(duì)列名稱
    QUEUE_NAME = "message_queue"
    
    def process_message():
        while True:
            # 從隊(duì)列中獲取消息
            message = redis_client.blpop(QUEUE_NAME, timeout=0)
            if message:
                print(f"Message received: {message[1]}")
                # 模擬消息處理
                time.sleep(1)
    
    if __name__ == "__main__":
        print("Consumer is running...")
        process_message()
    

    運(yùn)行示例

    1. 啟動(dòng)Consumer腳本: 打開一個(gè)終端窗口躬厌,運(yùn)行消費(fèi)者腳本:

      python consumer.py

    2. 運(yùn)行Producer腳本

      打開另一個(gè)終端窗口,運(yùn)行生產(chǎn)者腳本:python producer.py

    你會(huì)看到Consumer腳本從Redis隊(duì)列中讀取消息并處理它們竞帽。

    消息處理邏輯

    • Producer腳本使用 rpush 方法將消息放入Redis列表的右端扛施。
    • Consumer腳本使用 blpop 方法從Redis列表的左端讀取消息。blpop 是一個(gè)阻塞操作屹篓,當(dāng)列表為空時(shí)會(huì)等待疙渣,直到有新的消息進(jìn)入。

3. 實(shí)時(shí)分析和統(tǒng)計(jì)

環(huán)境設(shè)置

確保你安裝了必要的Python庫(kù):

pip install flask redis

示例代碼

我們將創(chuàng)建一個(gè)簡(jiǎn)單的Flask Web應(yīng)用堆巧,用于統(tǒng)計(jì)和顯示網(wǎng)站訪問(wèn)量妄荔。

  1. 初始化Flask應(yīng)用和Redis客戶端

    from flask import Flask, request, jsonify
    import redis
    import time
    
    app = Flask(__name__)
    
    # 配置Redis客戶端
    redis_client = redis.StrictRedis(host='localhost', port=6379, db=0, decode_responses=True)
    
    # 定義統(tǒng)計(jì)數(shù)據(jù)存儲(chǔ)的前綴
    STATS_PREFIX = "stats:"
    
    # 獲取當(dāng)前日期(用于統(tǒng)計(jì)按天存儲(chǔ))
    def get_current_date():
        return time.strftime("%Y-%m-%d")
    
    # 增加訪問(wèn)量統(tǒng)計(jì)
    def increment_page_views(page):
        current_date = get_current_date()
        redis_client.hincrby(f"{STATS_PREFIX}{current_date}", page, 1)
    
    # 獲取某天的訪問(wèn)量統(tǒng)計(jì)
    def get_page_views(date):
        return redis_client.hgetall(f"{STATS_PREFIX}{date}")
    
    @app.route('/')
    def home():
        increment_page_views("home")
        return "Welcome to the homepage!"
    
    @app.route('/about')
    def about():
        increment_page_views("about")
        return "Welcome to the about page!"
    
    @app.route('/stats')
    def stats():
        date = request.args.get('date', get_current_date())
        stats = get_page_views(date)
        return jsonify(stats)
    
    if __name__ == "__main__":
        app.run(debug=True)
    

    運(yùn)行應(yīng)用

    保存上述代碼為一個(gè)Python文件(例如app.py),然后運(yùn)行它:

    python app.py

    打開瀏覽器恳邀,訪問(wèn)以下URL:

    1. 訪問(wèn)主頁(yè):http://localhost:5000/
    2. 訪問(wèn)關(guān)于頁(yè):http://localhost:5000/about
    3. 查看統(tǒng)計(jì)數(shù)據(jù):http://localhost:5000/stats

    代碼解釋

    • increment_page_views(page):增加指定頁(yè)面的訪問(wèn)量統(tǒng)計(jì)懦冰。使用Redis的 hincrby 命令將訪問(wèn)量存儲(chǔ)在哈希表中,以當(dāng)前日期為鍵谣沸,以頁(yè)面名稱為字段刷钢。
    • get_page_views(date):獲取指定日期的頁(yè)面訪問(wèn)量統(tǒng)計(jì)。使用Redis的 hgetall 命令讀取哈希表中的所有字段和值乳附。
    • /stats 路由:返回指定日期的訪問(wèn)量統(tǒng)計(jì)内地,默認(rèn)為當(dāng)前日期伴澄。

    實(shí)時(shí)統(tǒng)計(jì)功能

    通過(guò)上述示例,你可以實(shí)現(xiàn)實(shí)時(shí)的訪問(wèn)量統(tǒng)計(jì)和顯示阱缓。這對(duì)于需要高效處理大量數(shù)據(jù)的應(yīng)用程序來(lái)說(shuō)非常有用非凌,例如實(shí)時(shí)監(jiān)控、分析和報(bào)告荆针。

4. 分布式鎖

環(huán)境設(shè)置

確保你安裝了必要的Python庫(kù):

pip install redis

示例代碼

我們將創(chuàng)建一個(gè)簡(jiǎn)單的分布式鎖類敞嗡,并演示如何在多個(gè)進(jìn)程或線程中使用它。

  1. 分布式鎖類

    import redis
    import time
    import uuid
    
    class RedisDistributedLock:
        def __init__(self, redis_client, lock_key, timeout=10):
            self.redis_client = redis_client
            self.lock_key = lock_key
            self.timeout = timeout
            self.lock_id = str(uuid.uuid4())
    
        def acquire(self):
            while True:
                if self.redis_client.set(self.lock_key, self.lock_id, nx=True, ex=self.timeout):
                    return True
                time.sleep(0.01)
    
        def release(self):
            lock_value = self.redis_client.get(self.lock_key)
            if lock_value and lock_value == self.lock_id:
                self.redis_client.delete(self.lock_key)
    
        def __enter__(self):
            self.acquire()
    
        def __exit__(self, exc_type, exc_value, traceback):
            self.release()
    
  2. 示例使用

    import redis
    import threading
    
    # 配置Redis客戶端
    redis_client = redis.StrictRedis(host='localhost', port=6379, db=0, decode_responses=True)
    
    def critical_section(lock):
        with lock:
            print(f"{threading.current_thread().name} has acquired the lock")
            time.sleep(2)
            print(f"{threading.current_thread().name} is releasing the lock")
    
    if __name__ == "__main__":
        lock_key = "distributed_lock"
        lock = RedisDistributedLock(redis_client, lock_key, timeout=5)
    
        threads = []
        for i in range(5):
            thread = threading.Thread(target=critical_section, args=(lock,))
            threads.append(thread)
            thread.start()
    
        for thread in threads:
            thread.join()
    

    代碼解釋

    1. RedisDistributedLock類
      • __init__:初始化分布式鎖航背,設(shè)置Redis客戶端喉悴、鎖鍵、超時(shí)時(shí)間和唯一鎖ID玖媚。
      • acquire:嘗試獲取鎖箕肃。如果鎖已存在,則等待并重試今魔。使用 SET 命令的 NXEX 參數(shù)確保操作是原子的勺像。
      • release:釋放鎖。首先檢查當(dāng)前鎖的值是否與自己的鎖ID匹配错森,確保不會(huì)釋放其他客戶端的鎖吟宦。
      • __enter____exit__:實(shí)現(xiàn)上下文管理協(xié)議,支持使用 with 語(yǔ)句问词。
    2. 示例使用
      • critical_section:模擬臨界區(qū)代碼督函,在獲取鎖后打印信息并休眠2秒,然后釋放鎖激挪。
      • 多線程模擬:創(chuàng)建多個(gè)線程,每個(gè)線程嘗試進(jìn)入臨界區(qū)锋叨,使用分布式鎖確保同一時(shí)間只有一個(gè)線程進(jìn)入臨界區(qū)垄分。

    運(yùn)行示例

    保存上述代碼為一個(gè)Python文件(例如distributed_lock.py),然后運(yùn)行它:

    python distributed_lock.py

    你會(huì)看到線程按順序獲取和釋放鎖娃磺,確保在分布式環(huán)境中數(shù)據(jù)的一致性薄湿。

    結(jié)論

    通過(guò)這個(gè)示例,你可以理解如何使用Redis實(shí)現(xiàn)一個(gè)簡(jiǎn)單但有效的分布式鎖系統(tǒng)偷卧。這對(duì)于需要確保多個(gè)進(jìn)程或線程之間數(shù)據(jù)一致性的場(chǎng)景非常有用豺瘤,例如分布式任務(wù)調(diào)度、資源管理等听诸。

    擴(kuò)展

    self.redis_client.set(self.lock_key, self.lock_id, nx=True, ex=self.timeout) 這一行代碼的解釋如下:

    代碼背景

    這行代碼使用Redis的SET命令來(lái)嘗試設(shè)置一個(gè)分布式鎖坐求。我們通過(guò)設(shè)置特定的參數(shù)來(lái)確保操作的原子性和時(shí)效性。

    參數(shù)解釋

    • self.lock_key:這是鎖的鍵名(key)晌梨,表示我們?cè)赗edis中使用的鍵桥嗤。例如须妻,可以是 "distributed_lock"
    • self.lock_id:這是鎖的值(value)泛领,我們使用一個(gè)唯一的ID來(lái)表示持有鎖的客戶端荒吏。這可以防止誤釋放他人的鎖。通常使用 uuid.uuid4() 生成的唯一ID渊鞋。
    • nx=True:這是SET命令的NX選項(xiàng)绰更,表示“僅當(dāng)鍵不存在時(shí)設(shè)置鍵”。這確保了只有當(dāng)鎖鍵不存在時(shí)锡宋,才能成功設(shè)置鎖儡湾。
    • ex=self.timeout:這是SET命令的EX選項(xiàng),表示“設(shè)置鍵的過(guò)期時(shí)間(以秒為單位)”员辩。這確保了鎖在指定的時(shí)間后會(huì)自動(dòng)釋放盒粮,以防止死鎖。

    操作解釋

    1. 原子性
      • SET命令與NXEX選項(xiàng)一起使用時(shí)是原子的奠滑,這意味著整個(gè)操作在Redis服務(wù)器上是一次性完成的丹皱,不會(huì)被其他命令打斷。這對(duì)于分布式鎖至關(guān)重要宋税,因?yàn)槲覀冃枰_保在設(shè)置鎖的同時(shí)不會(huì)有其他客戶端同時(shí)成功設(shè)置相同的鎖摊崭。
    2. 確保唯一性
      • self.lock_id是一個(gè)唯一的ID(通常使用UUID生成),用于標(biāo)識(shí)持有鎖的客戶端杰赛。即使多個(gè)客戶端嘗試設(shè)置相同的鎖鍵呢簸,它們使用不同的鎖值,確保每個(gè)鎖的持有者是唯一的乏屯。
    3. 防止死鎖
      • EX選項(xiàng)設(shè)置鎖的過(guò)期時(shí)間根时,防止持有鎖的客戶端在崩潰或失去連接后一直持有鎖。這樣可以確保在指定的超時(shí)時(shí)間后鎖會(huì)自動(dòng)釋放辰晕,使其他客戶端有機(jī)會(huì)獲取鎖蛤迎。

    整體邏輯

    當(dāng)一個(gè)客戶端嘗試獲取鎖時(shí):

    • 它嘗試使用 SET 命令在Redis中設(shè)置一個(gè)鍵(lock_key),值為 lock_id含友。
    • 如果 lock_key 不存在(即鎖當(dāng)前未被占用)替裆,Redis會(huì)設(shè)置鍵和值,并返回True窘问。
    • 如果 lock_key 已存在(即鎖當(dāng)前已被占用)辆童,Redis不會(huì)設(shè)置鍵和值,并返回False惠赫。

    這樣把鉴,我們可以通過(guò)檢查 SET 命令的返回值來(lái)確定是否成功獲取了鎖。

    代碼片段解釋

    def acquire(self):
        while True:
            if self.redis_client.set(self.lock_key, self.lock_id, nx=True, ex=self.timeout):
                return True
            time.sleep(0.01)
    

    acquire 方法中汉形,代碼不斷嘗試獲取鎖:

    • 如果 SET 命令返回 True纸镊,表示成功獲取鎖倍阐,方法返回 True
    • 如果 SET 命令返回 False逗威,表示鎖已被占用峰搪,代碼等待0.01秒后重試。這種循環(huán)確笨瘢客戶端最終會(huì)成功獲取鎖(除非鎖永遠(yuǎn)無(wú)法釋放)概耻。

5. 緩存數(shù)據(jù)庫(kù)查詢結(jié)果

環(huán)境設(shè)置

確保你安裝了必要的Python庫(kù):

pip install flask redis sqlalchemy

假設(shè)我們使用SQLite作為數(shù)據(jù)庫(kù),你可以根據(jù)需要替換為其他數(shù)據(jù)庫(kù)罐呼。

示例代碼

  1. 設(shè)置Flask應(yīng)用和數(shù)據(jù)庫(kù)

    from flask import Flask, request, jsonify
    from flask_sqlalchemy import SQLAlchemy
    import redis
    import json
    import time
    
    app = Flask(__name__)
    
    # 配置Flask應(yīng)用的數(shù)據(jù)庫(kù)
    app.config['SQLALCHEMY_DATABASE_URI'] = 'sqlite:///example.db'
    app.config['SQLALCHEMY_TRACK_MODIFICATIONS'] = False
    db = SQLAlchemy(app)
    
    # 配置Redis客戶端
    redis_client = redis.StrictRedis(host='localhost', port=6379, db=0, decode_responses=True)
    
    # 定義數(shù)據(jù)庫(kù)模型
    class User(db.Model):
        id = db.Column(db.Integer, primary_key=True)
        name = db.Column(db.String(80), unique=True, nullable=False)
        age = db.Column(db.Integer, nullable=False)
    
    # 初始化數(shù)據(jù)庫(kù)
    with app.app_context():
        db.create_all()
    
  2. 緩存輔助函數(shù)

    CACHE_TIMEOUT = 60  # 緩存超時(shí)時(shí)間鞠柄,以秒為單位
    
    def get_cache(key):
        cached_data = redis_client.get(key)
        if cached_data:
            return json.loads(cached_data)
        return None
    
    def set_cache(key, data):
        redis_client.setex(key, CACHE_TIMEOUT, json.dumps(data))
    
  3. 數(shù)據(jù)庫(kù)查詢和緩存的示例

    @app.route('/user/<int:user_id>', methods=['GET'])
    def get_user(user_id):
        cache_key = f"user:{user_id}"
        # 嘗試從緩存中獲取數(shù)據(jù)
        cached_data = get_cache(cache_key)
        if cached_data:
            return jsonify(cached_data)
    
        # 如果緩存中沒(méi)有數(shù)據(jù),從數(shù)據(jù)庫(kù)中查詢
        user = User.query.get(user_id)
        if not user:
            return jsonify({"error": "User not found"}), 404
    
        user_data = {"id": user.id, "name": user.name, "age": user.age}
        # 將查詢結(jié)果緩存到Redis
        set_cache(cache_key, user_data)
    
        return jsonify(user_data)
    
  4. 插入新用戶的示例

    @app.route('/user', methods=['POST'])
    def add_user():
        data = request.json
        new_user = User(name=data['name'], age=data['age'])
        db.session.add(new_user)
        db.session.commit()
    
        return jsonify({"message": "User added", "user": {"id": new_user.id, "name": new_user.name, "age": new_user.age}}), 201
    

    運(yùn)行應(yīng)用

    保存上述代碼為一個(gè)Python文件(例如app.py)嫉柴,然后運(yùn)行它:

    python app.py

    打開瀏覽器厌杜,訪問(wèn)以下URL:

    1. 添加新用戶:

      curl -X POST http://localhost:5000/user -H "Content-Type: application/json" -d '{"name": "John Doe", "age": 30}'

      你應(yīng)該看到類似這樣的響應(yīng):

      {
          "message": "User added",
          "user": {
              "id": 1,
              "name": "John Doe",
              "age": 30
          }
      }
      
    2. 獲取用戶信息:

      curl http://localhost:5000/user/1

      如果緩存有效,你應(yīng)該看到類似這樣的響應(yīng):

      {
          "id": 1,
          "name": "John Doe",
          "age": 30
      }
      

代碼解釋

  • get_cacheset_cache:這兩個(gè)函數(shù)用于從Redis緩存中獲取和設(shè)置數(shù)據(jù)计螺。set_cache 使用 setex 方法來(lái)設(shè)置帶有超時(shí)時(shí)間的緩存鳖粟。
  • get_user 路由:首先嘗試從緩存中獲取用戶數(shù)據(jù)跛蛋。如果緩存中沒(méi)有數(shù)據(jù)遏乔,則從數(shù)據(jù)庫(kù)中查詢囚枪,并將結(jié)果緩存到Redis。
  • add_user 路由:用于添加新用戶到數(shù)據(jù)庫(kù)陈轿。

通過(guò)這個(gè)示例圈纺,你可以理解如何使用Redis緩存數(shù)據(jù)庫(kù)查詢結(jié)果,以提高應(yīng)用程序的性能和響應(yīng)速度麦射。

6. 排行榜系統(tǒng)

環(huán)境設(shè)置

確保你安裝了必要的Python庫(kù):

pip install flask redis

示例代碼

我們將創(chuàng)建一個(gè)Flask Web應(yīng)用蛾娶,用于管理和顯示用戶的排行榜。

  1. 初始化Flask應(yīng)用和Redis客戶端

    from flask import Flask, request, jsonify
    import redis
    
    app = Flask(__name__)
    
    # 配置Redis客戶端
    redis_client = redis.StrictRedis(host='localhost', port=6379, db=0, decode_responses=True)
    
    # 定義排行榜鍵
    LEADERBOARD_KEY = "leaderboard"
    
  2. 增加用戶分?jǐn)?shù)和獲取排行榜的輔助函數(shù)

    def add_score(user, score):
        redis_client.zadd(LEADERBOARD_KEY, {user: score})
    
    def get_leaderboard(top_n):
        return redis_client.zrevrange(LEADERBOARD_KEY, 0, top_n-1, withscores=True)
    
  3. 增加用戶分?jǐn)?shù)和獲取排行榜的API路由

    @app.route('/add_score', methods=['POST'])
    def add_score_route():
        data = request.json
        user = data['user']
        score = data['score']
        add_score(user, score)
        return jsonify({"message": "Score added successfully"}), 201
    
    @app.route('/leaderboard', methods=['GET'])
    def leaderboard_route():
        top_n = int(request.args.get('top_n', 10))  # 默認(rèn)為前10名
        leaderboard = get_leaderboard(top_n)
        return jsonify(leaderboard)
    

運(yùn)行應(yīng)用

保存上述代碼為一個(gè)Python文件(例如app.py)潜秋,然后運(yùn)行它:

python app.py

打開瀏覽器茫叭,使用以下命令測(cè)試應(yīng)用:

  1. 增加用戶分?jǐn)?shù)

    curl -X POST http://localhost:5000/add_score -H "Content-Type: application/json" -d '{"user": "Alice", "score": 1500}'
    curl -X POST http://localhost:5000/add_score -H "Content-Type: application/json" -d '{"user": "Bob", "score": 2000}'
    curl -X POST http://localhost:5000/add_score -H "Content-Type: application/json" -d '{"user": "Charlie", "score": 1200}'
    
  2. 獲取排行榜

    curl http://localhost:5000/leaderboard?top_n=3

    你應(yīng)該看到類似這樣的響應(yīng):

    [
        ["Bob", 2000.0],
        ["Alice", 1500.0],
        ["Charlie", 1200.0]
    ]
    

    代碼解釋

    • add_score(user, score):使用 zadd 命令將用戶和分?jǐn)?shù)添加到Redis的Sorted Set中。Sorted Set中的元素根據(jù)分?jǐn)?shù)自動(dòng)排序半等。
    • get_leaderboard(top_n):使用 zrevrange 命令獲取排行榜的前top_n個(gè)用戶,按照分?jǐn)?shù)從高到低排序呐萨。
    • /add_score 路由:接受POST請(qǐng)求杀饵,將用戶和分?jǐn)?shù)添加到排行榜中。
    • /leaderboard 路由:接受GET請(qǐng)求谬擦,返回排行榜的前top_n個(gè)用戶切距。

    通過(guò)這個(gè)示例,你可以理解如何使用Redis的Sorted Set來(lái)實(shí)現(xiàn)一個(gè)簡(jiǎn)單但高效的排行榜系統(tǒng)惨远。這對(duì)于需要實(shí)時(shí)顯示排名的應(yīng)用程序非常有用谜悟,例如游戲排行榜话肖、網(wǎng)站活躍度排行等。

7. 實(shí)時(shí)聊天應(yīng)用

環(huán)境設(shè)置

確保你安裝了必要的Python庫(kù):

pip install flask redis gevent

示例代碼

我們將創(chuàng)建一個(gè)Flask Web應(yīng)用葡幸,使用WebSocket來(lái)處理實(shí)時(shí)消息傳遞最筒。Redis將用于消息的發(fā)布和訂閱。

  1. 初始化Flask應(yīng)用和Redis客戶端

    from flask import Flask, render_template
    from flask_sockets import Sockets
    import redis
    import gevent
    from geventwebsocket.handler import WebSocketHandler
    from gevent.pywsgi import WSGIServer
    
    app = Flask(__name__)
    sockets = Sockets(app)
    
    # 配置Redis客戶端
    redis_client = redis.StrictRedis(host='localhost', port=6379, db=0, decode_responses=True)
    
    # 定義頻道
    CHANNEL = 'chat'
    
  2. WebSocket處理和Redis Pub/Sub

    class ChatBackend(object):
        def __init__(self):
            self.clients = []
            self.pubsub = redis_client.pubsub()
            self.pubsub.subscribe(CHANNEL)
    
        def send(self, client, message):
            try:
                client.send(message)
            except:
                self.clients.remove(client)
    
        def send_all(self, message):
            for client in self.clients:
                self.send(client, message)
    
        def run(self):
            for message in self.pubsub.listen():
                if message['type'] == 'message':
                    self.send_all(message['data'])
    
        def start(self):
            gevent.spawn(self.run)
    
    chat_backend = ChatBackend()
    chat_backend.start()
    
  3. WebSocket路由

    @sockets.route('/chat')
    def chat(ws):
        chat_backend.clients.append(ws)
        while not ws.closed:
            message = ws.receive()
            if message:
                redis_client.publish(CHANNEL, message)
    
  4. HTML模板(保存為templates/chat.html):

    <!DOCTYPE html>
    <html>
    <head>
        <title>Chat</title>
    </head>
    <body>
        <h1>Chat Room</h1>
        <ul id="messages"></ul>
        <input id="message" autocomplete="off"><button onclick="sendMessage()">Send</button>
        <script>
            var ws = new WebSocket("ws://" + window.location.host + "/chat");
    
            ws.onmessage = function(event) {
                var messages = document.getElementById('messages');
                var message = document.createElement('li');
                message.textContent = event.data;
                messages.appendChild(message);
            };
    
            function sendMessage() {
                var input = document.getElementById("message");
                ws.send(input.value);
                input.value = '';
            }
        </script>
    </body>
    </html>
    
  5. 主頁(yè)路由

    @app.route('/')
    def index():
        return render_template('chat.html')
    
  6. 運(yùn)行應(yīng)用

    if __name__ == "__main__":
        http_server = WSGIServer(("0.0.0.0", 5000), app, handler_class=WebSocketHandler)
        http_server.serve_forever()
    

運(yùn)行應(yīng)用

保存上述代碼為一個(gè)Python文件(例如app.py)蔚叨,然后運(yùn)行它:

python app.py

打開瀏覽器床蜘,訪問(wèn) http://localhost:5000/,你會(huì)看到一個(gè)簡(jiǎn)單的聊天界面蔑水。

代碼解釋

  • Redis Pub/SubChatBackend類使用Redis的發(fā)布/訂閱功能來(lái)監(jiān)聽聊天消息邢锯。當(dāng)有新的消息發(fā)布到頻道時(shí),所有連接的客戶端都會(huì)收到這條消息搀别。
  • WebSocket處理/chat路由處理WebSocket連接丹擎。每當(dāng)一個(gè)新客戶端連接時(shí),它被添加到客戶端列表中歇父。當(dāng)有新的消息時(shí)蒂培,這些消息會(huì)被發(fā)送到Redis頻道,其他客戶端會(huì)通過(guò)訂閱該頻道收到消息庶骄。
  • HTML模板:一個(gè)簡(jiǎn)單的HTML頁(yè)面毁渗,使用JavaScript來(lái)處理WebSocket連接和消息發(fā)送。

通過(guò)這個(gè)示例单刁,你可以理解如何使用Redis的Pub/Sub功能和Flask的WebSocket來(lái)構(gòu)建一個(gè)實(shí)時(shí)聊天應(yīng)用灸异。這對(duì)于需要實(shí)時(shí)通信的應(yīng)用程序非常有用,例如聊天室羔飞、實(shí)時(shí)通知系統(tǒng)等肺樟。

8. 頻率限制

環(huán)境設(shè)置

確保你安裝了必要的Python庫(kù):

pip install flask redis

示例代碼

我們將創(chuàng)建一個(gè)簡(jiǎn)單的Flask Web應(yīng)用,用于演示如何使用Redis進(jìn)行頻率限制逻淌。

  1. 初始化Flask應(yīng)用和Redis客戶端

    from flask import Flask, request, jsonify
    import redis
    import time
    
    app = Flask(__name__)
    
    # 配置Redis客戶端
    redis_client = redis.StrictRedis(host='localhost', port=6379, db=0, decode_responses=True)
    
    # 頻率限制設(shè)置
    RATE_LIMIT = 5          # 每分鐘最多請(qǐng)求次數(shù)
    RATE_LIMIT_WINDOW = 60  # 窗口時(shí)間么伯,以秒為單位
    
  2. 頻率限制輔助函數(shù)

    def is_rate_limited(ip):
        current_time = int(time.time())
        key = f"rate_limit:{ip}"
        request_count = redis_client.get(key)
        
        if request_count is None:
            redis_client.set(key, 1, ex=RATE_LIMIT_WINDOW)
            return False
        elif int(request_count) < RATE_LIMIT:
            redis_client.incr(key)
            return False
        else:
            return True
    
  3. 應(yīng)用頻率限制的API路由

    @app.route('/limited')
    def limited():
        client_ip = request.remote_addr
        if is_rate_limited(client_ip):
            return jsonify({"error": "Too many requests, please try again later."}), 429
        return jsonify({"message": "Request successful"}), 200
    
  4. 啟動(dòng)應(yīng)用

    if __name__ == "__main__":
        app.run(debug=True)
    

運(yùn)行應(yīng)用

保存上述代碼為一個(gè)Python文件(例如app.py),然后運(yùn)行它:

python app.py

打開瀏覽器卡儒,訪問(wèn) http://localhost:5000/limited田柔。

代碼解釋

  • is_rate_limited(ip):該函數(shù)檢查給定IP地址的請(qǐng)求頻率。如果在限制的時(shí)間窗口內(nèi)請(qǐng)求次數(shù)超過(guò)設(shè)定值骨望,則返回True硬爆,否則返回False
    • 使用Redis的 getset 方法來(lái)存儲(chǔ)和更新每個(gè)IP地址的請(qǐng)求計(jì)數(shù)擎鸠。
    • 當(dāng)請(qǐng)求計(jì)數(shù)不存在時(shí)缀磕,設(shè)置計(jì)數(shù)為1,并設(shè)置過(guò)期時(shí)間。
    • 當(dāng)請(qǐng)求計(jì)數(shù)存在且小于限制值時(shí)袜蚕,遞增計(jì)數(shù)糟把。
    • 當(dāng)請(qǐng)求計(jì)數(shù)超過(guò)限制值時(shí),返回True牲剃,表示頻率限制已觸發(fā)遣疯。
  • /limited 路由:處理受頻率限制保護(hù)的請(qǐng)求。它檢查客戶端的IP地址是否被限制颠黎,如果是另锋,則返回429狀態(tài)碼,否則返回成功消息狭归。

測(cè)試頻率限制

  1. 初次請(qǐng)求:訪問(wèn) http://localhost:5000/limited夭坪,應(yīng)該返回 {"message": "Request successful"}
  2. 超過(guò)限制的請(qǐng)求:快速連續(xù)訪問(wèn)該URL超過(guò)設(shè)定的限制次數(shù)(在這個(gè)例子中是5次)过椎,應(yīng)該返回 {"error": "Too many requests, please try again later."}室梅,狀態(tài)碼為429。

進(jìn)一步優(yōu)化

  • 按用戶頻率限制:如果有用戶登錄系統(tǒng)疚宇,可以按用戶ID進(jìn)行頻率限制亡鼠,而不是IP地址。
  • 動(dòng)態(tài)頻率限制:可以根據(jù)用戶的角色或訂閱級(jí)別動(dòng)態(tài)調(diào)整頻率限制參數(shù)敷待。

9. 地理位置服務(wù)

環(huán)境設(shè)置

確保你安裝了必要的Python庫(kù):

pip install flask redis

示例代碼

我們將創(chuàng)建一個(gè)簡(jiǎn)單的Flask Web應(yīng)用间涵,用于管理和查詢用戶的地理位置。

  1. 初始化Flask應(yīng)用和Redis客戶端

    from flask import Flask, request, jsonify
    import redis
    
    app = Flask(__name__)
    
    # 配置Redis客戶端
    redis_client = redis.StrictRedis(host='localhost', port=6379, db=0, decode_responses=True)
    
    # 定義Geo數(shù)據(jù)類型的鍵
    GEO_KEY = "users:locations"
    
  2. 添加用戶地理位置的API

    @app.route('/add_location', methods=['POST'])
    def add_location():
        data = request.json
        user_id = data['user_id']
        longitude = data['longitude']
        latitude = data['latitude']
        
        redis_client.geoadd(GEO_KEY, (longitude, latitude, user_id))
        return jsonify({"message": "Location added successfully"}), 201
    
  3. 查找附近用戶的API

    @app.route('/nearby', methods=['GET'])
    def nearby():
        longitude = float(request.args.get('longitude'))
        latitude = float(request.args.get('latitude'))
        radius = float(request.args.get('radius', 10))  # 默認(rèn)查找10公里內(nèi)的用戶
        unit = request.args.get('unit', 'km')  # 默認(rèn)單位是公里
        
        nearby_users = redis_client.georadius(GEO_KEY, longitude, latitude, radius, unit, withdist=True)
        results = [{"user_id": user[0], "distance": user[1]} for user in nearby_users]
        
        return jsonify(results), 200
    
  4. 啟動(dòng)應(yīng)用

    if __name__ == "__main__":
        app.run(debug=True)
    

運(yùn)行應(yīng)用

保存上述代碼為一個(gè)Python文件(例如app.py)榜揖,然后運(yùn)行它:

python app.py

打開瀏覽器勾哩,使用以下命令測(cè)試應(yīng)用:

  1. 添加用戶地理位置

    curl -X POST http://localhost:5000/add_location -H "Content-Type: application/json" -d '{"user_id": "user1", "longitude": 13.361389, "latitude": 38.115556}'
    curl -X POST http://localhost:5000/add_location -H "Content-Type: application/json" -d '{"user_id": "user2", "longitude": 15.087269, "latitude": 37.502669}'
    
  2. 查找附近用戶

    curl http://localhost:5000/nearby?longitude=13.361389&latitude=38.115556&radius=200&unit=km

    你應(yīng)該看到類似這樣的響應(yīng):

    [
        {"user_id": "user1", "distance": 0.0},
        {"user_id": "user2", "distance": 190.4424}
    ]
    

代碼解釋

  • geoadd 命令:將用戶的地理位置(經(jīng)度、緯度)添加到Redis中举哟。GEO_KEY 是用來(lái)存儲(chǔ)用戶位置數(shù)據(jù)的鍵思劳。
  • georadius 命令:查找指定位置(經(jīng)度、緯度)附近一定范圍內(nèi)的用戶妨猩。返回的結(jié)果包含用戶ID和距離潜叛。

API說(shuō)明

  • /add_location:POST請(qǐng)求,接受JSON格式的用戶ID壶硅、經(jīng)度和緯度數(shù)據(jù)威兜,并將其存儲(chǔ)到Redis中。
  • /nearby:GET請(qǐng)求庐椒,接受查詢位置的經(jīng)度牡属、緯度、搜索半徑和單位(可選扼睬,默認(rèn)為公里),返回附近的用戶及其距離。

進(jìn)一步優(yōu)化

  • 按用戶ID查找位置:可以添加按用戶ID查找位置的API窗宇。
  • 動(dòng)態(tài)調(diào)整搜索半徑和單位:允許客戶端動(dòng)態(tài)調(diào)整搜索半徑和單位措伐。
  • 批量添加位置:可以實(shí)現(xiàn)批量添加用戶位置的功能,以提高性能军俊。

10. 內(nèi)容管理系統(tǒng)(CMS)緩存

環(huán)境設(shè)置

確保你安裝了必要的Python庫(kù):

pip install flask redis sqlalchemy

假設(shè)我們使用SQLite作為數(shù)據(jù)庫(kù)侥加,你可以根據(jù)需要替換為其他數(shù)據(jù)庫(kù)。

示例代碼

我們將創(chuàng)建一個(gè)簡(jiǎn)單的Flask Web應(yīng)用粪躬,用于管理和緩存內(nèi)容担败。

  1. 初始化Flask應(yīng)用、數(shù)據(jù)庫(kù)和Redis客戶端

    from flask import Flask, request, jsonify
    from flask_sqlalchemy import SQLAlchemy
    import redis
    import json
    import time
    
    app = Flask(__name__)
    
    # 配置Flask應(yīng)用的數(shù)據(jù)庫(kù)
    app.config['SQLALCHEMY_DATABASE_URI'] = 'sqlite:///cms.db'
    app.config['SQLALCHEMY_TRACK_MODIFICATIONS'] = False
    db = SQLAlchemy(app)
    
    # 配置Redis客戶端
    redis_client = redis.StrictRedis(host='localhost', port=6379, db=0, decode_responses=True)
    
    # 定義緩存前綴和超時(shí)時(shí)間
    CACHE_PREFIX = "cms:"
    CACHE_TIMEOUT = 300  # 緩存時(shí)間镰官,以秒為單位
    
  2. 定義數(shù)據(jù)庫(kù)模型

    class Content(db.Model):
        id = db.Column(db.Integer, primary_key=True)
        title = db.Column(db.String(100), nullable=False)
        body = db.Column(db.Text, nullable=False)
        timestamp = db.Column(db.DateTime, default=db.func.current_timestamp())
    
  3. 緩存輔助函數(shù)

    def get_cache(key):
        cached_data = redis_client.get(key)
        if cached_data:
            return json.loads(cached_data)
        return None
    
    def set_cache(key, data):
        redis_client.setex(key, CACHE_TIMEOUT, json.dumps(data))
    
  4. API路由

    @app.route('/content/<int:content_id>', methods=['GET'])
    def get_content(content_id):
        cache_key = f"{CACHE_PREFIX}{content_id}"
        
        # 嘗試從緩存中獲取數(shù)據(jù)
        cached_data = get_cache(cache_key)
        if cached_data:
            return jsonify(cached_data)
        
        # 如果緩存中沒(méi)有數(shù)據(jù)提前,從數(shù)據(jù)庫(kù)中查詢
        content = Content.query.get(content_id)
        if not content:
            return jsonify({"error": "Content not found"}), 404
        
        content_data = {
            "id": content.id,
            "title": content.title,
            "body": content.body,
            "timestamp": content.timestamp.isoformat()
        }
        
        # 將查詢結(jié)果緩存到Redis
        set_cache(cache_key, content_data)
        
        return jsonify(content_data)
    
    @app.route('/content', methods=['POST'])
    def add_content():
        data = request.json
        new_content = Content(title=data['title'], body=data['body'])
        db.session.add(new_content)
        db.session.commit()
        
        return jsonify({"message": "Content added", "content": {
            "id": new_content.id,
            "title": new_content.title,
            "body": new_content.body,
            "timestamp": new_content.timestamp.isoformat()
        }}), 201
    
  5. 初始化數(shù)據(jù)庫(kù)

    with app.app_context():
        db.create_all()
    
  6. 啟動(dòng)應(yīng)用

    if __name__ == "__main__":
        app.run(debug=True)
    

運(yùn)行應(yīng)用

保存上述代碼為一個(gè)Python文件(例如app.py),然后運(yùn)行它:

python app.py

打開瀏覽器泳唠,使用以下命令測(cè)試應(yīng)用:

  1. 添加內(nèi)容

    curl -X POST http://localhost:5000/content -H "Content-Type: application/json" -d '{"title": "Sample Title", "body": "Sample body content."}'

  2. 獲取內(nèi)容

    curl http://localhost:5000/content/1

代碼解釋

  • get_cache(key)set_cache(key, data):這兩個(gè)函數(shù)用于從Redis緩存中獲取和設(shè)置數(shù)據(jù)狈网。set_cache 使用 setex 方法來(lái)設(shè)置帶有超時(shí)時(shí)間的緩存。
  • get_content(content_id) 路由:首先嘗試從緩存中獲取內(nèi)容數(shù)據(jù)笨腥。如果緩存中沒(méi)有數(shù)據(jù)拓哺,則從數(shù)據(jù)庫(kù)中查詢,并將結(jié)果緩存到Redis脖母。
  • add_content 路由:用于添加新內(nèi)容到數(shù)據(jù)庫(kù)士鸥。

進(jìn)一步優(yōu)化

  • 緩存失效:當(dāng)內(nèi)容更新或刪除時(shí),確保緩存同步更新或失效谆级,以避免緩存不一致的問(wèn)題烤礁。
  • 緩存層次:可以實(shí)現(xiàn)多級(jí)緩存,例如先在內(nèi)存中緩存哨苛,然后在Redis中緩存鸽凶,進(jìn)一步提高性能。
  • 緩存預(yù)加載:在系統(tǒng)啟動(dòng)或特定時(shí)間點(diǎn)預(yù)加載熱點(diǎn)內(nèi)容到緩存建峭,以提高訪問(wèn)速度玻侥。

緩存擴(kuò)展內(nèi)容

1. 緩存失效

緩存失效是一個(gè)重要的問(wèn)題,特別是在內(nèi)容更新或刪除時(shí)亿蒸,確保緩存與數(shù)據(jù)庫(kù)的數(shù)據(jù)一致是關(guān)鍵凑兰。我們可以通過(guò)以下方法來(lái)解決緩存失效問(wèn)題:

  1. 刪除緩存:在內(nèi)容更新或刪除時(shí),直接刪除對(duì)應(yīng)的緩存鍵边锁。
  2. 更新緩存:在內(nèi)容更新時(shí)姑食,更新緩存中的數(shù)據(jù)。
  3. 使用緩存失效策略:如設(shè)置合理的緩存過(guò)期時(shí)間茅坛。

修改后的代碼

讓我們修改之前的代碼音半,添加處理緩存失效的邏輯则拷。

初始化Flask應(yīng)用和Redis客戶端

from flask import Flask, request, jsonify
from flask_sqlalchemy import SQLAlchemy
import redis
import json
import time

app = Flask(__name__)

# 配置Flask應(yīng)用的數(shù)據(jù)庫(kù)
app.config['SQLALCHEMY_DATABASE_URI'] = 'sqlite:///cms.db'
app.config['SQLALCHEMY_TRACK_MODIFICATIONS'] = False
db = SQLAlchemy(app)

# 配置Redis客戶端
redis_client = redis.StrictRedis(host='localhost', port=6379, db=0, decode_responses=True)

# 定義緩存前綴和超時(shí)時(shí)間
CACHE_PREFIX = "cms:"
CACHE_TIMEOUT = 300  # 緩存時(shí)間,以秒為單位

定義數(shù)據(jù)庫(kù)模型

class Content(db.Model):
    id = db.Column(db.Integer, primary_key=True)
    title = db.Column(db.String(100), nullable=False)
    body = db.Column(db.Text, nullable=False)
    timestamp = db.Column(db.DateTime, default=db.func.current_timestamp())

緩存輔助函數(shù)

def get_cache(key):
    cached_data = redis_client.get(key)
    if cached_data:
        return json.loads(cached_data)
    return None

def set_cache(key, data):
    redis_client.setex(key, CACHE_TIMEOUT, json.dumps(data))

def delete_cache(key):
    redis_client.delete(key)

API路由

@app.route('/content/<int:content_id>', methods=['GET'])
def get_content(content_id):
    cache_key = f"{CACHE_PREFIX}{content_id}"
    
    # 嘗試從緩存中獲取數(shù)據(jù)
    cached_data = get_cache(cache_key)
    if cached_data:
        return jsonify(cached_data)
    
    # 如果緩存中沒(méi)有數(shù)據(jù)曹鸠,從數(shù)據(jù)庫(kù)中查詢
    content = Content.query.get(content_id)
    if not content:
        return jsonify({"error": "Content not found"}), 404
    
    content_data = {
        "id": content.id,
        "title": content.title,
        "body": content.body,
        "timestamp": content.timestamp.isoformat()
    }
    
    # 將查詢結(jié)果緩存到Redis
    set_cache(cache_key, content_data)
    
    return jsonify(content_data)

@app.route('/content', methods=['POST'])
def add_content():
    data = request.json
    new_content = Content(title=data['title'], body=data['body'])
    db.session.add(new_content)
    db.session.commit()
    
    # 設(shè)置緩存
    cache_key = f"{CACHE_PREFIX}{new_content.id}"
    content_data = {
        "id": new_content.id,
        "title": new_content.title,
        "body": new_content.body,
        "timestamp": new_content.timestamp.isoformat()
    }
    set_cache(cache_key, content_data)
    
    return jsonify({"message": "Content added", "content": content_data}), 201

@app.route('/content/<int:content_id>', methods=['PUT'])
def update_content(content_id):
    data = request.json
    content = Content.query.get(content_id)
    if not content:
        return jsonify({"error": "Content not found"}), 404
    
    content.title = data['title']
    content.body = data['body']
    db.session.commit()
    
    # 更新緩存
    cache_key = f"{CACHE_PREFIX}{content.id}"
    content_data = {
        "id": content.id,
        "title": content.title,
        "body": content.body,
        "timestamp": content.timestamp.isoformat()
    }
    set_cache(cache_key, content_data)
    
    return jsonify({"message": "Content updated", "content": content_data}), 200

@app.route('/content/<int:content_id>', methods=['DELETE'])
def delete_content(content_id):
    content = Content.query.get(content_id)
    if not content:
        return jsonify({"error": "Content not found"}), 404
    
    db.session.delete(content)
    db.session.commit()
    
    # 刪除緩存
    cache_key = f"{CACHE_PREFIX}{content.id}"
    delete_cache(cache_key)
    
    return jsonify({"message": "Content deleted"}), 200

初始化數(shù)據(jù)庫(kù)

with app.app_context():
    db.create_all()

啟動(dòng)應(yīng)用

if __name__ == "__main__":
    app.run(debug=True)

運(yùn)行應(yīng)用

保存上述代碼為一個(gè)Python文件(例如app.py)煌茬,然后運(yùn)行它:

python app.py

打開命令終端,使用以下命令測(cè)試應(yīng)用:

  1. 添加內(nèi)容

    curl -X POST http://localhost:5000/content -H "Content-Type: application/json" -d '{"title": "Sample Title", "body": "Sample body content."}'

  2. 獲取內(nèi)容

    curl http://localhost:5000/content/1

  3. 更新內(nèi)容

    curl -X PUT http://localhost:5000/content/1 -H "Content-Type: application/json" -d '{"title": "Updated Title", "body": "Updated body content."}'

  4. 刪除內(nèi)容

    curl -X DELETE http://localhost:5000/content/1

代碼解釋

  • 刪除緩存:在內(nèi)容刪除時(shí)彻桃,使用 delete_cache 函數(shù)從Redis中刪除對(duì)應(yīng)的緩存鍵坛善。
  • 更新緩存:在內(nèi)容更新時(shí),使用 set_cache 函數(shù)更新Redis中的緩存數(shù)據(jù)邻眷。
  • 添加緩存:在內(nèi)容添加時(shí)眠屎,立即將新的內(nèi)容緩存到Redis中。

2. 緩存層次

實(shí)現(xiàn)緩存層次可以提高系統(tǒng)性能肆饶,減少數(shù)據(jù)庫(kù)查詢次數(shù)改衩,進(jìn)一步優(yōu)化應(yīng)用程序的響應(yīng)時(shí)間。常見(jiàn)的緩存層次結(jié)構(gòu)包括內(nèi)存緩存(如memcachedlocal cache)和分布式緩存(如Redis)抖拴。下面我們將展示如何在Flask應(yīng)用中實(shí)現(xiàn)這種緩存層次結(jié)構(gòu)燎字。

環(huán)境設(shè)置

確保你安裝了必要的Python庫(kù):

pip install flask redis cachetools sqlalchemy

示例代碼

我們將創(chuàng)建一個(gè)簡(jiǎn)單的Flask Web應(yīng)用,使用內(nèi)存緩存和Redis進(jìn)行緩存阿宅。

  1. 初始化Flask應(yīng)用候衍、數(shù)據(jù)庫(kù)和Redis客戶端

    from flask import Flask, request, jsonify
    from flask_sqlalchemy import SQLAlchemy
    import redis
    import json
    import time
    from cachetools import TTLCache
    
    app = Flask(__name__)
    
    # 配置Flask應(yīng)用的數(shù)據(jù)庫(kù)
    app.config['SQLALCHEMY_DATABASE_URI'] = 'sqlite:///cms.db'
    app.config['SQLALCHEMY_TRACK_MODIFICATIONS'] = False
    db = SQLAlchemy(app)
    
    # 配置Redis客戶端
    redis_client = redis.StrictRedis(host='localhost', port=6379, db=0, decode_responses=True)
    
    # 定義緩存前綴和超時(shí)時(shí)間
    CACHE_PREFIX = "cms:"
    CACHE_TIMEOUT = 300  # 緩存時(shí)間,以秒為單位
    
    # 內(nèi)存緩存
    memory_cache = TTLCache(maxsize=100, ttl=CACHE_TIMEOUT)
    
  2. 定義數(shù)據(jù)庫(kù)模型:

class Content(db.Model):
    id = db.Column(db.Integer, primary_key=True)
    title = db.Column(db.String(100), nullable=False)
    body = db.Column(db.Text, nullable=False)
    timestamp = db.Column(db.DateTime, default=db.func.current_timestamp())
  1. 緩存輔助函數(shù)

    def get_cache(key):
        # 嘗試從內(nèi)存緩存中獲取數(shù)據(jù)
        if key in memory_cache:
            return memory_cache[key]
        
        # 嘗試從Redis緩存中獲取數(shù)據(jù)
        cached_data = redis_client.get(key)
        if cached_data:
            data = json.loads(cached_data)
            # 將數(shù)據(jù)加載到內(nèi)存緩存
            memory_cache[key] = data
            return data
        return None
    
    def set_cache(key, data):
        # 設(shè)置內(nèi)存緩存
        memory_cache[key] = data
        # 設(shè)置Redis緩存
        redis_client.setex(key, CACHE_TIMEOUT, json.dumps(data))
    
    def delete_cache(key):
        # 刪除內(nèi)存緩存
        if key in memory_cache:
            del memory_cache[key]
        # 刪除Redis緩存
        redis_client.delete(key)
    
  2. API路由

    @app.route('/content/<int:content_id>', methods=['GET'])
    def get_content(content_id):
        cache_key = f"{CACHE_PREFIX}{content_id}"
        
        # 嘗試從緩存中獲取數(shù)據(jù)
        cached_data = get_cache(cache_key)
        if cached_data:
            return jsonify(cached_data)
        
        # 如果緩存中沒(méi)有數(shù)據(jù)洒放,從數(shù)據(jù)庫(kù)中查詢
        content = Content.query.get(content_id)
        if not content:
            return jsonify({"error": "Content not found"}), 404
        
        content_data = {
            "id": content.id,
            "title": content.title,
            "body": content.body,
            "timestamp": content.timestamp.isoformat()
        }
        
        # 將查詢結(jié)果緩存到Redis和內(nèi)存緩存
        set_cache(cache_key, content_data)
        
        return jsonify(content_data)
    
    @app.route('/content', methods=['POST'])
    def add_content():
        data = request.json
        new_content = Content(title=data['title'], body=data['body'])
        db.session.add(new_content)
        db.session.commit()
        
        # 設(shè)置緩存
        cache_key = f"{CACHE_PREFIX}{new_content.id}"
        content_data = {
            "id": new_content.id,
            "title": new_content.title,
            "body": new_content.body,
            "timestamp": new_content.timestamp.isoformat()
        }
        set_cache(cache_key, content_data)
        
        return jsonify({"message": "Content added", "content": content_data}), 201
    
    @app.route('/content/<int:content_id>', methods=['PUT'])
    def update_content(content_id):
        data = request.json
        content = Content.query.get(content_id)
        if not content:
            return jsonify({"error": "Content not found"}), 404
        
        content.title = data['title']
        content.body = data['body']
        db.session.commit()
        
        # 更新緩存
        cache_key = f"{CACHE_PREFIX}{content.id}"
        content_data = {
            "id": content.id,
            "title": content.title,
            "body": content.body,
            "timestamp": content.timestamp.isoformat()
        }
        set_cache(cache_key, content_data)
        
        return jsonify({"message": "Content updated", "content": content_data}), 200
    
    @app.route('/content/<int:content_id>', methods=['DELETE'])
    def delete_content(content_id):
        content = Content.query.get(content_id)
        if not content:
            return jsonify({"error": "Content not found"}), 404
        
        db.session.delete(content)
        db.session.commit()
        
        # 刪除緩存
        cache_key = f"{CACHE_PREFIX}{content.id}"
        delete_cache(cache_key)
        
        return jsonify({"message": "Content deleted"}), 200
    
  3. 初始化數(shù)據(jù)庫(kù)

    with app.app_context():
        db.create_all()
    
  4. 啟動(dòng)應(yīng)用

    if __name__ == "__main__":
        app.run(debug=True)
    

運(yùn)行應(yīng)用

保存上述代碼為一個(gè)Python文件(例如app.py)蛉鹿,然后運(yùn)行它:

python app.py

打開瀏覽器,使用以下命令測(cè)試應(yīng)用:

  1. 添加內(nèi)容

    curl -X POST http://localhost:5000/content -H "Content-Type: application/json" -d '{"title": "Sample Title", "body": "Sample body content."}'

  2. 獲取內(nèi)容

    curl http://localhost:5000/content/1

  3. 更新內(nèi)容

    curl -X PUT http://localhost:5000/content/1 -H "Content-Type: application/json" -d '{"title": "Updated Title", "body": "Updated body content."}'

  4. 刪除內(nèi)容

    curl -X DELETE http://localhost:5000/content/1

代碼解釋

  • 內(nèi)存緩存(TTLCacheTTLCache 提供了一個(gè)具有時(shí)間限制的內(nèi)存緩存往湿。我們首先嘗試從內(nèi)存緩存中獲取數(shù)據(jù)妖异,如果未命中,再嘗試從Redis緩存中獲取领追。

  • 緩存輔助函數(shù)

    • get_cache:從內(nèi)存緩存中獲取數(shù)據(jù)他膳,如果未命中,再嘗試從Redis緩存中獲取绒窑,并將數(shù)據(jù)加載到內(nèi)存緩存棕孙。
    • set_cache:將數(shù)據(jù)設(shè)置到內(nèi)存緩存和Redis緩存。
    • delete_cache:從內(nèi)存緩存和Redis緩存中刪除數(shù)據(jù)些膨。
  • API路由:包含獲取蟀俊、添加、更新和刪除內(nèi)容的API订雾,每個(gè)操作都包括相應(yīng)的緩存邏輯肢预。

3. 緩存預(yù)加載

緩存預(yù)加載是一種優(yōu)化策略,旨在在系統(tǒng)啟動(dòng)或特定時(shí)間點(diǎn)預(yù)加載熱點(diǎn)數(shù)據(jù)到緩存中洼哎,以減少首次訪問(wèn)的延遲烫映。我們可以在系統(tǒng)啟動(dòng)時(shí)沼本,或者通過(guò)定時(shí)任務(wù)定期預(yù)加載數(shù)據(jù)到緩存。

環(huán)境設(shè)置

確保你安裝了必要的Python庫(kù):

pip install flask redis sqlalchemy cachetools apscheduler

示例代碼

我們將創(chuàng)建一個(gè)簡(jiǎn)單的Flask Web應(yīng)用窑邦,包含內(nèi)存緩存擅威、Redis緩存和緩存預(yù)加載功能。

  1. 初始化Flask應(yīng)用冈钦、數(shù)據(jù)庫(kù)和Redis客戶端

    from flask import Flask, request, jsonify
    from flask_sqlalchemy import SQLAlchemy
    import redis
    import json
    from cachetools import TTLCache
    from apscheduler.schedulers.background import BackgroundScheduler
    
    app = Flask(__name__)
    
    # 配置Flask應(yīng)用的數(shù)據(jù)庫(kù)
    app.config['SQLALCHEMY_DATABASE_URI'] = 'sqlite:///cms.db'
    app.config['SQLALCHEMY_TRACK_MODIFICATIONS'] = False
    db = SQLAlchemy(app)
    
    # 配置Redis客戶端
    redis_client = redis.StrictRedis(host='localhost', port=6379, db=0, decode_responses=True)
    
    # 定義緩存前綴和超時(shí)時(shí)間
    CACHE_PREFIX = "cms:"
    CACHE_TIMEOUT = 300  # 緩存時(shí)間,以秒為單位
    
    # 內(nèi)存緩存
    memory_cache = TTLCache(maxsize=100, ttl=CACHE_TIMEOUT)
    
  2. 定義數(shù)據(jù)庫(kù)模型

    class Content(db.Model):
        id = db.Column(db.Integer, primary_key=True)
        title = db.Column(db.String(100), nullable=False)
        body = db.Column(db.Text, nullable=False)
        timestamp = db.Column(db.DateTime, default=db.func.current_timestamp())
    
  3. 緩存輔助函數(shù)

    def get_cache(key):
        # 嘗試從內(nèi)存緩存中獲取數(shù)據(jù)
        if key in memory_cache:
            return memory_cache[key]
        
        # 嘗試從Redis緩存中獲取數(shù)據(jù)
        cached_data = redis_client.get(key)
        if cached_data:
            data = json.loads(cached_data)
            # 將數(shù)據(jù)加載到內(nèi)存緩存
            memory_cache[key] = data
            return data
        return None
    
    def set_cache(key, data):
        # 設(shè)置內(nèi)存緩存
        memory_cache[key] = data
        # 設(shè)置Redis緩存
        redis_client.setex(key, CACHE_TIMEOUT, json.dumps(data))
    
    def delete_cache(key):
        # 刪除內(nèi)存緩存
        if key in memory_cache:
            del memory_cache[key]
        # 刪除Redis緩存
        redis_client.delete(key)
    
  4. API路由

    @app.route('/content/<int:content_id>', methods=['GET'])
    def get_content(content_id):
        cache_key = f"{CACHE_PREFIX}{content_id}"
        
        # 嘗試從緩存中獲取數(shù)據(jù)
        cached_data = get_cache(cache_key)
        if cached_data:
            return jsonify(cached_data)
        
        # 如果緩存中沒(méi)有數(shù)據(jù)李请,從數(shù)據(jù)庫(kù)中查詢
        content = Content.query.get(content_id)
        if not content:
            return jsonify({"error": "Content not found"}), 404
        
        content_data = {
            "id": content.id,
            "title": content.title,
            "body": content.body,
            "timestamp": content.timestamp.isoformat()
        }
        
        # 將查詢結(jié)果緩存到Redis和內(nèi)存緩存
        set_cache(cache_key, content_data)
        
        return jsonify(content_data)
    
    @app.route('/content', methods=['POST'])
    def add_content():
        data = request.json
        new_content = Content(title=data['title'], body=data['body'])
        db.session.add(new_content)
        db.session.commit()
        
        # 設(shè)置緩存
        cache_key = f"{CACHE_PREFIX}{new_content.id}"
        content_data = {
            "id": new_content.id,
            "title": new_content.title,
            "body": new_content.body,
            "timestamp": new_content.timestamp.isoformat()
        }
        set_cache(cache_key, content_data)
        
        return jsonify({"message": "Content added", "content": content_data}), 201
    
    @app.route('/content/<int:content_id>', methods=['PUT'])
    def update_content(content_id):
        data = request.json
        content = Content.query.get(content_id)
        if not content:
            return jsonify({"error": "Content not found"}), 404
        
        content.title = data['title']
        content.body = data['body']
        db.session.commit()
        
        # 更新緩存
        cache_key = f"{CACHE_PREFIX}{content.id}"
        content_data = {
            "id": content.id,
            "title": content.title,
            "body": content.body,
            "timestamp": content.timestamp.isoformat()
        }
        set_cache(cache_key, content_data)
        
        return jsonify({"message": "Content updated", "content": content_data}), 200
    
    @app.route('/content/<int:content_id>', methods=['DELETE'])
    def delete_content(content_id):
        content = Content.query.get(content_id)
        if not content:
            return jsonify({"error": "Content not found"}), 404
        
        db.session.delete(content)
        db.session.commit()
        
        # 刪除緩存
        cache_key = f"{CACHE_PREFIX}{content.id}"
        delete_cache(cache_key)
        
        return jsonify({"message": "Content deleted"}), 200
    
  5. 緩存預(yù)加載函數(shù)

    def preload_cache():
        print("Preloading cache...")
        contents = Content.query.all()
        for content in contents:
            cache_key = f"{CACHE_PREFIX}{content.id}"
            content_data = {
                "id": content.id,
                "title": content.title,
                "body": content.body,
                "timestamp": content.timestamp.isoformat()
            }
            set_cache(cache_key, content_data)
        print("Cache preloading completed.")
    
  6. 定時(shí)任務(wù)和初始化數(shù)據(jù)庫(kù)

    from apscheduler.schedulers.background import BackgroundScheduler
    
    if __name__ == "__main__":
        # 初始化數(shù)據(jù)庫(kù)
        with app.app_context():
            db.create_all()
    
        # 啟動(dòng)定時(shí)任務(wù)
        scheduler = BackgroundScheduler()
        scheduler.add_job(preload_cache, 'interval', minutes=10)  # 每10分鐘預(yù)加載一次緩存
        scheduler.start()
    
        # 預(yù)加載緩存
        preload_cache()
    
        # 啟動(dòng)Flask應(yīng)用
        app.run(debug=True)
    

    運(yùn)行應(yīng)用

    保存上述代碼為一個(gè)Python文件(例如app.py)瞧筛,然后運(yùn)行它:

    python app.py

    代碼解釋

    • 內(nèi)存緩存(TTLCacheTTLCache 提供了一個(gè)具有時(shí)間限制的內(nèi)存緩存。我們首先嘗試從內(nèi)存緩存中獲取數(shù)據(jù)导盅,如果未命中较幌,再嘗試從Redis緩存中獲取。
    • 緩存輔助函數(shù):
      • get_cache:從內(nèi)存緩存中獲取數(shù)據(jù)白翻,如果未命中乍炉,再嘗試從Redis緩存中獲取,并將數(shù)據(jù)加載到內(nèi)存緩存滤馍。
      • set_cache:將數(shù)據(jù)設(shè)置到內(nèi)存緩存和Redis緩存岛琼。
      • delete_cache:從內(nèi)存緩存和Redis緩存中刪除數(shù)據(jù)。
    • API路由:包含獲取巢株、添加槐瑞、更新和刪除內(nèi)容的API,每個(gè)操作都包括相應(yīng)的緩存邏輯阁苞。
    • 緩存預(yù)加載:
      • preload_cache:預(yù)加載緩存函數(shù)困檩,從數(shù)據(jù)庫(kù)中讀取所有內(nèi)容,并將其加載到內(nèi)存緩存和Redis緩存那槽。
      • 定時(shí)任務(wù):使用 apscheduler 庫(kù)定期執(zhí)行緩存預(yù)加載任務(wù)悼沿。
      • 啟動(dòng)時(shí)預(yù)加載:在應(yīng)用啟動(dòng)時(shí)立即執(zhí)行一次緩存預(yù)加載。

    通過(guò)這些步驟骚灸,你可以實(shí)現(xiàn)一個(gè)多層次的緩存系統(tǒng)糟趾,包括內(nèi)存緩存和分布式緩存,并使用緩存預(yù)加載技術(shù)提高系統(tǒng)性能和響應(yīng)速度逢唤。

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
  • 序言:七十年代末拉讯,一起剝皮案震驚了整個(gè)濱河市,隨后出現(xiàn)的幾起案子鳖藕,更是在濱河造成了極大的恐慌魔慷,老刑警劉巖,帶你破解...
    沈念sama閱讀 218,122評(píng)論 6 505
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件著恩,死亡現(xiàn)場(chǎng)離奇詭異院尔,居然都是意外死亡蜻展,警方通過(guò)查閱死者的電腦和手機(jī),發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 93,070評(píng)論 3 395
  • 文/潘曉璐 我一進(jìn)店門邀摆,熙熙樓的掌柜王于貴愁眉苦臉地迎上來(lái)纵顾,“玉大人,你說(shuō)我怎么就攤上這事栋盹∈┯猓” “怎么了?”我有些...
    開封第一講書人閱讀 164,491評(píng)論 0 354
  • 文/不壞的土叔 我叫張陵例获,是天一觀的道長(zhǎng)汉额。 經(jīng)常有香客問(wèn)我,道長(zhǎng)榨汤,這世上最難降的妖魔是什么蠕搜? 我笑而不...
    開封第一講書人閱讀 58,636評(píng)論 1 293
  • 正文 為了忘掉前任,我火速辦了婚禮收壕,結(jié)果婚禮上妓灌,老公的妹妹穿的比我還像新娘。我一直安慰自己蜜宪,他們只是感情好虫埂,可當(dāng)我...
    茶點(diǎn)故事閱讀 67,676評(píng)論 6 392
  • 文/花漫 我一把揭開白布。 她就那樣靜靜地躺著端壳,像睡著了一般告丢。 火紅的嫁衣襯著肌膚如雪。 梳的紋絲不亂的頭發(fā)上损谦,一...
    開封第一講書人閱讀 51,541評(píng)論 1 305
  • 那天岖免,我揣著相機(jī)與錄音,去河邊找鬼照捡。 笑死颅湘,一個(gè)胖子當(dāng)著我的面吹牛,可吹牛的內(nèi)容都是我干的栗精。 我是一名探鬼主播闯参,決...
    沈念sama閱讀 40,292評(píng)論 3 418
  • 文/蒼蘭香墨 我猛地睜開眼,長(zhǎng)吁一口氣:“原來(lái)是場(chǎng)噩夢(mèng)啊……” “哼悲立!你這毒婦竟也來(lái)了鹿寨?” 一聲冷哼從身側(cè)響起,我...
    開封第一講書人閱讀 39,211評(píng)論 0 276
  • 序言:老撾萬(wàn)榮一對(duì)情侶失蹤薪夕,失蹤者是張志新(化名)和其女友劉穎脚草,沒(méi)想到半個(gè)月后,有當(dāng)?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體原献,經(jīng)...
    沈念sama閱讀 45,655評(píng)論 1 314
  • 正文 獨(dú)居荒郊野嶺守林人離奇死亡馏慨,尸身上長(zhǎng)有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點(diǎn)故事閱讀 37,846評(píng)論 3 336
  • 正文 我和宋清朗相戀三年埂淮,在試婚紗的時(shí)候發(fā)現(xiàn)自己被綠了。 大學(xué)時(shí)的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片写隶。...
    茶點(diǎn)故事閱讀 39,965評(píng)論 1 348
  • 序言:一個(gè)原本活蹦亂跳的男人離奇死亡倔撞,死狀恐怖,靈堂內(nèi)的尸體忽然破棺而出慕趴,到底是詐尸還是另有隱情痪蝇,我是刑警寧澤,帶...
    沈念sama閱讀 35,684評(píng)論 5 347
  • 正文 年R本政府宣布冕房,位于F島的核電站霹俺,受9級(jí)特大地震影響,放射性物質(zhì)發(fā)生泄漏毒费。R本人自食惡果不足惜,卻給世界環(huán)境...
    茶點(diǎn)故事閱讀 41,295評(píng)論 3 329
  • 文/蒙蒙 一愈魏、第九天 我趴在偏房一處隱蔽的房頂上張望觅玻。 院中可真熱鬧,春花似錦培漏、人聲如沸溪厘。這莊子的主人今日做“春日...
    開封第一講書人閱讀 31,894評(píng)論 0 22
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽(yáng)畸悬。三九已至,卻和暖如春珊佣,著一層夾襖步出監(jiān)牢的瞬間蹋宦,已是汗流浹背。 一陣腳步聲響...
    開封第一講書人閱讀 33,012評(píng)論 1 269
  • 我被黑心中介騙來(lái)泰國(guó)打工咒锻, 沒(méi)想到剛下飛機(jī)就差點(diǎn)兒被人妖公主榨干…… 1. 我叫王不留冷冗,地道東北人。 一個(gè)月前我還...
    沈念sama閱讀 48,126評(píng)論 3 370
  • 正文 我出身青樓惑艇,卻偏偏與公主長(zhǎng)得像蒿辙,于是被迫代替她去往敵國(guó)和親。 傳聞我的和親對(duì)象是個(gè)殘疾皇子滨巴,可洞房花燭夜當(dāng)晚...
    茶點(diǎn)故事閱讀 44,914評(píng)論 2 355