python登錄認證后端接口開發(fā)

實現(xiàn)基于ldap認證登錄認證以及token認證:

主要庫:flask ,mongo英支,redis跃脊,ldap

1.代碼

1.1目錄結(jié)構(gòu)
auth-manage
├── bin
│   ├── start.sh
│   └── stop.sh
├── conf
│   └── conf.ini
├── data
├── logs
│   └── auth-manage.log
└── project
    ├── auth_api.py
    └── auth_manage.py
1.2啟停腳本

1.2.1 啟動腳本start.sh

#!/bin/sh
basepath=$(cd `dirname $0`; pwd)
echo $basepath
nohup python3 $basepath/../project/auth_api.py 2>&1 /dev/null &

1.2.2停止腳本stop.sh

#!/bin/bash
name='auth_api.py'
pid=$(ps -ef|grep "../project/$name"|grep -v grep|awk '{print $2}')
if [ -z "$pid" ];then
    echo 'process is not alived'
else
    echo 'kill process'
    kill -9 $pid
    pid=$(ps -ef|grep "../project/$name"|grep -v grep|awk '{print $2}')
    if [ -z "$pid" ];then
        echo 'kill process success'
    else
        echo 'kill process error'
    fi
fi

1.3配置文件conf.ini
[log]
loglevel = debug
logname = auth-manage.log
interval = 1
backupCount = 7
when = MIDNIGHT
encoding = utf-8

[mongodb]
host = xxxxx
dbname = devops
user = xxxxx
password = xxxx

[ldap]
host = xxxx.com
port = 389
base_dn = ou=xxx,dc=xxxx,dc=com
user_dn = cn=xxx,ou=Public,dc=xxxx,dc=com
password = xxxxx

[redis]
host = xxxxxx
port = 6379
db = 0
redis_expire = 3600
password = xxxxxx

1.4工程代碼auth_manage.py
# -*- coding: UTF-8 -*-

import os
import sys
import time
import logging
import configparser
import re
from logging.handlers import TimedRotatingFileHandler
import json
import requests
from multiprocessing import Process,Lock
import threading
import hashlib
import datetime
import pymongo
from pymongo import MongoClient, DESCENDING
from bs4 import BeautifulSoup
from ldap3 import (
    Server,
    Connection,
    SUBTREE,
    LEVEL,
    ALL_ATTRIBUTES,
    RESTARTABLE,
    SAFE_RESTARTABLE,
    MODIFY_REPLACE,
    MODIFY_DELETE,
)
from ldap3.protocol.microsoft import show_deleted_control
from ldap3.utils.dn import safe_rdn
from ldap3.core.exceptions import LDAPException, LDAPInvalidCredentialsResult
import redis
import base64






BASE_DIR = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
CONF_PATH = os.path.join(os.path.dirname(os.path.dirname(os.path.abspath(__file__))),'conf','conf.ini')


def getNowTime():
    return time.strftime(r'%Y%m%d%H%M%S',time.localtime(time.time()))


class param():
    def __init__(self,confPath):
#        self.conf = configparser.ConfigParser()
        self.conf = configparser.RawConfigParser()
        self.conf.read(confPath)
        self.log_init()
        self.mongodb_init()
        self.ldap_init()
        self.redis_init()

    def log_init(self):
        self.log = {}
        self.log['loglevel'] = self.conf.get('log','loglevel')
        self.log['logname'] = self.conf.get('log','logname')
        self.log['interval'] = self.conf.getint('log','interval')
        self.log['backupCount'] = self.conf.getint('log','backupCount')
        self.log['when'] = self.conf.get('log','when')
        self.log['encoding'] = self.conf.get('log','encoding')

    def mongodb_init(self):
        self.mongodb = {}
        self.mongodb['host'] = self.conf.get('mongodb','host').split(',')
        self.mongodb['dbname'] = self.conf.get('mongodb','dbname')
        self.mongodb['user'] = self.conf.get('mongodb','user')
        self.mongodb['password'] = self.conf.get('mongodb','password')

    def ldap_init(self):
        self.ldap = {}
        self.ldap['host'] = self.conf.get('ldap','host')
        self.ldap['port'] = int(self.conf.get('ldap','port'))
        self.ldap['base_dn'] = self.conf.get('ldap','base_dn')
        self.ldap['user_dn'] = self.conf.get('ldap','user_dn')
        self.ldap['password'] = self.conf.get('ldap','password')

    def redis_init(self):
        self.redis = {}
        self.redis['host'] = self.conf.get('redis','host')
        self.redis['port'] = self.conf.get('redis','port')
        self.redis['db'] = self.conf.get('redis','db')
        self.redis['password'] = self.conf.get('redis','password')
        self.redis['redis_expire'] = self.conf.get('redis','redis_expire')


class Auth():
    def __init__(self,conf_path):
        self.param = param(conf_path)
        # log
        self.param.log['logname'] = os.path.join(BASE_DIR,'logs',self.param.log['logname'])
        self.logger_init(**self.param.log)
        self.mongodb_init(**self.param.mongodb)
        self.ldap_init(**self.param.ldap)
        self.redis_init(**self.param.redis)

    # 初始化部分-------
    def logger_init(self,loglevel,logname,interval,backupCount,when,encoding):
        logger = logging.getLogger('log')
        logger.setLevel(logging.DEBUG)

        # 移除掉所有的handler
        while logger.hasHandlers():
            for i in logger.handlers:
                logger.removeHandler(i)

        # file log
        formatter = logging.Formatter('%(asctime)s - %(levelname)s - %(message)s')
        fh = TimedRotatingFileHandler(logname, when=when, interval=interval, backupCount=backupCount,encoding=encoding)
        fh.setLevel(logging.DEBUG)
        fh.setFormatter(formatter)
        logger.addHandler(fh)

        # console log
        self.logger = logger

    def redis_init(self,host,port,db,password,redis_expire):
        self.logger.debug("begin init redis connection")
        try:
            self.redis_expire = int(redis_expire)
            self.redis_cli = redis.Redis(host=host, port=port, db=db,password=password,decode_responses=True)
#            self.redis_cli = redis.Redis(host=host, port=port, db=db,password=password)
        except Exception as e:
            self.logger.error("Failed to connect to redis!")
            self.logger.error(str(e))
            raise e
        self.logger.debug("connect to redis success!")

    def ldap_init(self,host, port, user_dn, password, base_dn):
        self.logger.debug("begin init ldap connection")
        try:
#            self.server = Server(host=host, port=port, use_ssl=True)
            self.server = Server(host=host, port=port)
            self.base_dn = base_dn
            self.conn = Connection(
                self.server,
                user_dn,
                password,
                check_names=True,
                auto_bind=False,
                raise_exceptions=True,
            )
        except Exception as e:
            self.logger.error("Failed to connect to ldap!")
            self.logger.error(str(e))
            raise e
        self.logger.debug("connect to ldap success!")


    def mongodb_init(self,host,dbname,user,password):
        self.logger.debug("begin init mongodb connection")
        try:
            self.client = pymongo.MongoClient(host)
            self.db = self.client[dbname]
            self.db.authenticate(user,password)
        except Exception as e:
            self.logger.error("Failed to connect to MongoDB!")
            self.logger.error(str(e))
            raise e
        self.logger.debug("connect to MongoDB success!")


    def auth_by_ldap(self, name, password):
        allowed_page = []
        try:
            self.conn.rebind()
            search_res = self.conn.search(
                                     search_base=self.base_dn,
                                     search_filter="(&(objectClass=person)(sAMAccountName={}))".format(name),
                                     search_scope=SUBTREE,
                                     attributes=["objectGUID", "sAMAccountName"],
                                     )
            if search_res:
                entry = self.conn.response[0]
            else:
                entry = ""

            self.conn.unbind()

        except Exception as e:
            self.logger.error(f"search res error as:  {e}")
            search_res = ""
            entry = ""
        if search_res:
            user_dn = entry["dn"]

            try:
                # 這個connect是通過你的用戶名和密碼還有上面搜到的入口搜索來查詢的

                user_conn = Connection(
                                          self.server,
                                          user=user_dn,
                                          password=password,
                                          check_names=True,
                                          auto_bind=False,
                                          raise_exceptions=True,
                                          )
                self.logger.debug(str(user_conn))
                user_conn.rebind()
                user_conn.unbind()

            except LDAPInvalidCredentialsResult as e:
                self.logger.error(str(e))
                error_txt = ""
                if "52e" in e.message:
                    error_txt = "密碼不正確!"
                elif "775" in e.message:
                    error_txt = "賬號已鎖定, 請聯(lián)系管理員或等待自動解鎖!"
                elif "533" in e.message:
                    error_txt = "賬號已禁用!"
                elif "773" in e.message:
                    error_txt = "用戶登陸前必須修改密碼!"
                else:
                    error_txt = "認證失敗, 請聯(lián)系管理員檢查該賬號!"
                self.logger.error(error_txt)
                return(error_txt,allowed_page)



                # 正確-success 不正確-invalidCredentials
            if user_conn.result["description"] == "success":
                if "attributes" in entry:
                    rs = self.db.devopsuser.find_one({'username':name})
                    if rs:
                        result = self.db.devopsrole.find_one({'role':rs['role']})
                        normalpages = result['normalpage']
                        for np in normalpages:
                            if np:
                                if np["permission"] != "":
                                    allowed_page.append(np['nid'])
                        return('CORRECT',allowed_page)
                    else:
                        self.db.devopsuser.insert_one({'username':name,'role':'guest'})
                        result = self.db.devopsrole.find_one({'role':'guest'})
                        normalpages = result['normalpage']
                        for np in normalpages:
                            if np:
                                if np["permission"] != "":
                                    allowed_page.append(np['nid'])
                        return('CORRECT',allowed_page)

            else:
                self.logger.error('user_conn result data abnormal')
                return('user_conn result data abnormal',allowed_page)
        else:
            self.logger.error('search res faild')
            return('search res faild',allowed_page)


    def login(self,user_id):
        self.user_id = user_id
        result = generate_token()
        token = result[0]
        create_time = result[1]
        expire_time = result[2]

        if self.user_id == 'guest':
            self.redis_cli.set(self.user_id, token)
        else:
            self.redis_cli.set(self.user_id, token, ex=self.redis_expire)
        return token,create_time,expire_time

    def check(self, user_id,token):
        self.user_id = user_id
        if self.redis_cli.exists(self.user_id):
            timetoken = self.redis_cli.get(self.user_id)
            if timetoken == token:
                status = checkout_token(timetoken)
                if status == 1:
                    new_result = generate_token()
                    new_token = new_result[0]
                    create_time = new_result[1]
                    expire_time = new_result[2]
                    self.redis_cli.set(self.user_id, new_token,ex=self.redis_expire)
                    return("SUCCESS",new_token,create_time,expire_time)
                else:
                    return ("SESSION-EXPIRED","0","0","0")
            else:
                return ("INVALID-TOKEN","0","0","0")
        else:
            return ("FAILURE","0","0","0")


def md5(arg):
    md5_pwd = hashlib.md5(bytes('demaxiya', encoding='utf-8'))
    md5_pwd.update(bytes(arg, encoding='utf-8'))
    return md5_pwd.hexdigest()

def generate_token(aging=3600):
    sha1_token = hashlib.sha1(os.urandom(24)).hexdigest()
    create_time = int(time.time())
    time_group = str(create_time)+":"+str(aging)
    time_token = base64.urlsafe_b64encode(time_group.encode("utf-8")).decode("utf-8").strip().lstrip().rstrip('=')
    token = sha1_token+time_token
    expire_time = create_time + 3600
    return token,create_time,expire_time

def checkout_token(token):
    result = {}
    decode_time = safe_b64decode(token[40:]).decode('utf-8')
    decode_split_time = decode_time.split(':')
    decode_create_time = decode_split_time[0]
    decode_aging_time = decode_split_time[1]
    now_time = int(time.time())
    difference_time = now_time-int(decode_create_time)
    if difference_time > int(decode_aging_time):
        result = 0
    else:
        result = 1
    return result

def safe_b64decode(s):
    length = len(s) % 4
    for i in range(length):
        s = s+'='
    return base64.b64decode(s)

1.5 flask代碼auth_api.py
# -*- coding: UTF-8 -*-

from flask import Flask, abort, request, make_response, jsonify, Response, render_template
from flask_cors import CORS
import json
import auth_manage
import os
import re


app = Flask(__name__)
UM = auth_manage.Auth(auth_manage.CONF_PATH)


@app.errorhandler(404)
def not_found(error):
    return make_response(jsonify({'error': 'Not found'}), 404)

@app.route('/')
def index():
    return render_template('index.html')

@app.route('/api/auth/login', methods=['GET', 'POST'])
def user_login():
    info = {}
    r_dict = {}
    token = "0"
    token_expire_time = "0"
    if request.method == 'POST':
        if request.json:
            userid = request.json['userid']
            passwd = request.json['passwd']
        else:
            userid = request.form['userid']
            passwd = request.form['passwd']
        result = UM.auth_by_ldap(userid, passwd)

        if result[0] == "CORRECT":
            result_token = UM.login(userid)
            token = result_token[0]
            token_expire_time = result_token[2]
#            r_dict['result'] = result[0]
            r_dict['token'] = token
            r_dict['page'] = result[1]
            r_dict['token_expire_time'] = str(token_expire_time)
            r_dict['userid'] = userid

            info['status'] = 200
            info['result'] = ""
            info['data'] = r_dict
            return json.dumps(info)
        else:
            info['status'] = 500
            info['result'] = result[0]
            info['data'] = ""
            return json.dumps(info)

    else:
        info['status'] = 500
        info['result'] = "please use POST request"
        info['data'] = ""
        return json.dumps(info)


@app.route('/api/auth/tokenauth', methods=['GET', 'POST'])
def token_check():
    info = {}
    r_dict = {}

    if request.method == 'POST':
        if request.json:
            userid = request.json['userid']
            token = request.json['token']
        else:
            userid = request.form['userid']
            token = request.form['token']

        result = UM.check(userid,token)
        if result[0] == "SUCCESS":
            r_dict['token'] = result[1]
            r_dict['token_expire_time'] = str(result[3])
            r_dict['userid'] = userid
            info['status'] = 200
            info['result'] = ""
            info['data'] = r_dict
            return json.dumps(info)
        else:
            info['status'] = 500
            info['result'] = result[0]
            info['data'] = ""
            return json.dumps(info)
    else:
        info['status'] = 500
        info['result'] = "please use POST request"
        info['data'] = ""
        return json.dumps(info)


if __name__ == '__main__':
    CORS(app, supports_credentials=True)
    app.run(host='0.0.0.0', port="10086",debug=True)




2.數(shù)據(jù)庫表結(jié)構(gòu)

2.1 表devopsuser 一條數(shù)據(jù)
{
    "_id" : ObjectId("62bd3f8bacf7357197acf178"),
    "role" : "admin",
    "username" : "zhangsan"
}
2.2 表devopsrole一條數(shù)據(jù)
{
    "_id" : ObjectId("62bc14c7029deb74beb1b7ba"),
    "role" : "admin",
    "normalpage" : [
        {
            "name" : "xxx",
            "nid" : "yyy",
            "permission" : "W"
        },
        {

        }
    ]
}
最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
  • 序言:七十年代末杖挣,一起剝皮案震驚了整個濱河市慢洋,隨后出現(xiàn)的幾起案子摘投,更是在濱河造成了極大的恐慌盟庞,老刑警劉巖,帶你破解...
    沈念sama閱讀 222,252評論 6 516
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件粉捻,死亡現(xiàn)場離奇詭異弧轧,居然都是意外死亡雪侥,警方通過查閱死者的電腦和手機,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 94,886評論 3 399
  • 文/潘曉璐 我一進店門精绎,熙熙樓的掌柜王于貴愁眉苦臉地迎上來速缨,“玉大人,你說我怎么就攤上這事代乃⊙” “怎么了?”我有些...
    開封第一講書人閱讀 168,814評論 0 361
  • 文/不壞的土叔 我叫張陵搁吓,是天一觀的道長原茅。 經(jīng)常有香客問我,道長堕仔,這世上最難降的妖魔是什么擂橘? 我笑而不...
    開封第一講書人閱讀 59,869評論 1 299
  • 正文 為了忘掉前任,我火速辦了婚禮摩骨,結(jié)果婚禮上通贞,老公的妹妹穿的比我還像新娘。我一直安慰自己恼五,他們只是感情好昌罩,可當(dāng)我...
    茶點故事閱讀 68,888評論 6 398
  • 文/花漫 我一把揭開白布。 她就那樣靜靜地躺著灾馒,像睡著了一般茎用。 火紅的嫁衣襯著肌膚如雪。 梳的紋絲不亂的頭發(fā)上,一...
    開封第一講書人閱讀 52,475評論 1 312
  • 那天轨功,我揣著相機與錄音旭斥,去河邊找鬼。 笑死夯辖,一個胖子當(dāng)著我的面吹牛琉预,可吹牛的內(nèi)容都是我干的董饰。 我是一名探鬼主播蒿褂,決...
    沈念sama閱讀 41,010評論 3 422
  • 文/蒼蘭香墨 我猛地睜開眼,長吁一口氣:“原來是場噩夢啊……” “哼卒暂!你這毒婦竟也來了啄栓?” 一聲冷哼從身側(cè)響起,我...
    開封第一講書人閱讀 39,924評論 0 277
  • 序言:老撾萬榮一對情侶失蹤也祠,失蹤者是張志新(化名)和其女友劉穎昙楚,沒想到半個月后,有當(dāng)?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體诈嘿,經(jīng)...
    沈念sama閱讀 46,469評論 1 319
  • 正文 獨居荒郊野嶺守林人離奇死亡堪旧,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點故事閱讀 38,552評論 3 342
  • 正文 我和宋清朗相戀三年,在試婚紗的時候發(fā)現(xiàn)自己被綠了奖亚。 大學(xué)時的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片淳梦。...
    茶點故事閱讀 40,680評論 1 353
  • 序言:一個原本活蹦亂跳的男人離奇死亡,死狀恐怖昔字,靈堂內(nèi)的尸體忽然破棺而出爆袍,到底是詐尸還是另有隱情,我是刑警寧澤作郭,帶...
    沈念sama閱讀 36,362評論 5 351
  • 正文 年R本政府宣布陨囊,位于F島的核電站,受9級特大地震影響夹攒,放射性物質(zhì)發(fā)生泄漏蜘醋。R本人自食惡果不足惜,卻給世界環(huán)境...
    茶點故事閱讀 42,037評論 3 335
  • 文/蒙蒙 一咏尝、第九天 我趴在偏房一處隱蔽的房頂上張望压语。 院中可真熱鬧,春花似錦状土、人聲如沸无蜂。這莊子的主人今日做“春日...
    開封第一講書人閱讀 32,519評論 0 25
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽斥季。三九已至,卻和暖如春,著一層夾襖步出監(jiān)牢的瞬間酣倾,已是汗流浹背舵揭。 一陣腳步聲響...
    開封第一講書人閱讀 33,621評論 1 274
  • 我被黑心中介騙來泰國打工, 沒想到剛下飛機就差點兒被人妖公主榨干…… 1. 我叫王不留躁锡,地道東北人午绳。 一個月前我還...
    沈念sama閱讀 49,099評論 3 378
  • 正文 我出身青樓,卻偏偏與公主長得像映之,于是被迫代替她去往敵國和親拦焚。 傳聞我的和親對象是個殘疾皇子,可洞房花燭夜當(dāng)晚...
    茶點故事閱讀 45,691評論 2 361