import base64
import hashlib
import hmac
import json
import random
import re
import string
import time
from hashlibimport sha256
from Crypto.Cipherimport AES,PKCS1_v1_5
from Crypto.PublicKeyimport RSA
import binascii
from Crypto.Util.Paddingimport unpad
from cryptography.hazmat.primitives.asymmetricimport rsa
from cryptography.hazmat.primitivesimport serialization
from cryptography.hazmat.primitives.serializationimport Encoding
import os
# jarpath = os.path.join(os.path.abspath('.') + '\\aes.jar')  # the second argument is the path to the jar file
# # jarpath = os.path.join(os.path.abspath('.') + '\\utils.jar')
# opensslpath = os.path.join(os.path.abspath('.') + '\\openssl-0.9.8k_X64\\bin\\openssl.exe ')
#
# starkey = 'a2164ada0026ccf7'.encode('utf-8')
# stariv = '82c91325e74bef0f'.encode('utf-8')
# portalkey = 't5SJPflE47xsysmJ42tGqtHG+oKaDg41'.encode('utf-8')
# portaliv = '5UnBXl4uSVtweMyX'.encode('utf-8')
# stagekey = 't5SJPflE47xsysmJ42tGqtHG+oKaDg41'.encode('utf-8')
# stageiv = 'oCDjWzAFbqBriHEF'.encode('utf-8')
# starsigsecret = '9af2e7b2d7562ad5'.encode('utf-8')
# jvslkey = 'suWD3qOZr01FNDIESjSMeMO7HF/5zVMw'.encode('utf-8')
# jarpath = os.path.join(
# os.path.abspath('') +
# '\\aes.jar')
# opensslpath = os.path.join(os.path.abspath('') + '\\openssl-0.9.8k_X64\\bin\\openssl.exe ')
from Crypto.Utilimport Padding
def IV32(IV='oCDjWzAFbqBriHEF'):
    """Return the lowercase hexadecimal form of an IV string.

    The text is encoded to bytes with the default codec and every byte is
    rendered as two hex digits, so a 16-character ASCII IV produces a
    32-character result.

    Args:
        IV: IV text to convert. The default is a fixed 16-character
            alphanumeric value; per the original author's note, hard-coding
            it is harmless and it normally corresponds to the ``xjv`` field
            of the interface.

    Returns:
        The hex-encoded IV as a ``str``.
    """
    raw_iv = IV.encode()
    hex_bytes = binascii.hexlify(raw_iv)
    return hex_bytes.decode("utf-8")
def IV16(iv):
    """Decode a hexadecimal string back into text.

    Args:
        iv: String of hex digits (two per byte).

    Returns:
        The decoded bytes interpreted as UTF-8 text.

    Raises:
        binascii.Error: If ``iv`` has odd length or non-hex characters.
        UnicodeDecodeError: If the decoded bytes are not valid UTF-8.
    """
    hex_bytes = iv.encode()
    raw_iv = binascii.unhexlify(hex_bytes)
    return raw_iv.decode("utf-8")
#
# def Generate_key_pkcs8():
#? ? """生成pkcs8公鑰"""
#? ? # lujin = "C:/Python36/Lib/site-packages/AesLibrary/openssl-0.9.8k_X64/bin/openssl.exe "
#? ? # 生成pkcs1私鑰
#? ? os.system(
#? ? ? ? opensslpath + 'genrsa -out pkcs1_private.pem 1024')
#? ? # 轉(zhuǎn)成pkcs8私鑰
#? ? os.system(
#
#? ? ? ? opensslpath +
#? ? ? ? 'pkcs8 -topk8 -inform PEM -in pkcs1_private.pem -outform pem -nocrypt -out pkcs8_private.pem')
#? ? # 轉(zhuǎn)成pkcs8公鑰
#? ? os.system(
#
#? ? ? ? opensslpath +
#? ? ? ? 'rsa -in pkcs8_private.pem -pubout -out pkcs8_public.pem')
#? ? with open('C:\\Users\\LCHEN205\\Desktop\\Wallboxapitest\\tests\\Business_scene\\Portal\\pkcs8_public.pem') as fp:
#? ? ? ? publickey_pkcs8 = fp.read()
#? ? return publickey_pkcs8.replace("\n","")
def RSA_keypair():
    """Generate a brand-new RSA key pair.

    The key uses public exponent 65537 and a 1024-bit modulus. Every call
    creates a different key pair; nothing is cached.

    Returns:
        A ``(private_key, public_key)`` tuple of ``cryptography`` key objects.
    """
    generated_private = rsa.generate_private_key(
        public_exponent=65537,  # public exponent
        key_size=1024,  # modulus length in bits
    )
    return generated_private, generated_private.public_key()
def RSA_privatekeyPkcs8():
    """Return a freshly generated RSA private key as unencrypted PKCS#8 PEM.

    A new key pair is generated on every call (via ``RSA_keypair``), so the
    returned key is unrelated to keys produced by any other call.

    Returns:
        The PEM text of the private key.
    """
    private_key, _ = RSA_keypair()
    pem_bytes = private_key.private_bytes(
        encoding=serialization.Encoding.PEM,
        format=serialization.PrivateFormat.PKCS8,  # PKCS#8 private key layout
        encryption_algorithm=serialization.NoEncryption(),  # no passphrase
    )
    return str(pem_bytes, 'utf_8')
def RSA_publickeyPkcs8():
    """Return a freshly generated RSA public key as SubjectPublicKeyInfo PEM.

    A new key pair is generated on every call (via ``RSA_keypair``), so the
    returned key is unrelated to keys produced by any other call.

    Returns:
        The PEM text of the public key, including line breaks.
    """
    _, public_key = RSA_keypair()
    pem_bytes = public_key.public_bytes(
        encoding=serialization.Encoding.PEM,
        format=serialization.PublicFormat.SubjectPublicKeyInfo,  # public key layout
    )
    return str(pem_bytes, 'utf_8')
# Serialize the public key as a PEM-encoded SubjectPublicKeyInfo string.
def Generate_key_pkcs8():
    """Return a freshly generated RSA public key as single-line PEM text.

    The key is serialized as SubjectPublicKeyInfo PEM and every newline is
    then removed, so the BEGIN/END markers and the base64 body run together
    on one line. A new key pair is generated on every call (via
    ``RSA_keypair``).

    Returns:
        The PEM text with all ``\\n`` characters stripped.
    """
    _, public_key = RSA_keypair()
    pem_text = public_key.public_bytes(
        encoding=serialization.Encoding.PEM,
        format=serialization.PublicFormat.SubjectPublicKeyInfo,  # public key layout
    ).decode('utf-8')
    # Drop the line breaks so the key can be passed around as a single line.
    return "".join(pem_text.split("\n"))
? ? # cur_path = os.path.abspath(os.path.dirname(__file__))
? ? # opensslpath = cur_path[
? ? #? ? ? ? ? ? ? :cur_path.find(
? ? #? ? ? ? ? ? ? ? ? 'wallboxapitest')] + 'wallboxapitest/utils/openssl-0.9.8k_X64/bin/openssl.exe '
? ? #
? ? # root_path = cur_path[
? ? #? ? ? ? ? ? :cur_path.find(
? ? #? ? ? ? ? ? ? ? 'wallboxapitest')] + 'wallboxapitest/tests/Business_scene/Portal/pkcs8_public.pem'
? ? # os.system(opensslpath + 'genrsa -out pkcs1_private.pem 1024')
? ? # os.system(
? ? #? ? opensslpath + 'pkcs8 -topk8 -inform PEM -in pkcs1_private.pem -outform pem -nocrypt -out pkcs8_private.pem')
? ? # os.system(opensslpath + 'rsa -in pkcs8_private.pem -pubout -out pkcs8_public.pem')
? ? # with open(root_path) as fp:
? ? #? ? publickey_pkcs8 = fp.read()
? ? # return publickey_pkcs8.replace("\n","")
def RSA_privatekeyPkcs1():
    """Return a freshly generated RSA private key as unencrypted PKCS#1 PEM.

    ``TraditionalOpenSSL`` is the PKCS#1 ("BEGIN RSA PRIVATE KEY") layout,
    not PKCS#8. A new key pair is generated on every call (via
    ``RSA_keypair``).

    Returns:
        The PEM text of the private key.
    """
    private_key, _ = RSA_keypair()
    pem_bytes = private_key.private_bytes(
        encoding=Encoding.PEM,
        format=serialization.PrivateFormat.TraditionalOpenSSL,  # PKCS#1 layout
        encryption_algorithm=serialization.NoEncryption(),  # no passphrase
    )
    return str(pem_bytes, 'utf_8')
def RSA_publickeyPkcs1():
    """Return a freshly generated RSA public key as PKCS#1 PEM.

    A new key pair is generated on every call (via ``RSA_keypair``).

    Returns:
        The PEM text of the public key.
    """
    _, public_key = RSA_keypair()
    pem_bytes = public_key.public_bytes(
        encoding=Encoding.PEM,
        format=serialization.PublicFormat.PKCS1,  # PKCS#1 public key layout
    )
    return str(pem_bytes, 'utf_8')
# def pkcs7padding(text):
#? ? """
#? ? ? ? 明文使用PKCS7填充
#? ? ? ? """
#? ? padded_data = Padding.pad(str(text).encode('utf-8'), AES.block_size, style='pkcs7')
#? ? return padded_data
# bs = 16
# length = len(text)
# bytes_length = len(str(text).encode('utf-8'))
# padding_size = length if (bytes_length == length) else bytes_length
# padding = bs - padding_size % bs
# padding_text = chr(padding) * padding
#
# coding = chr(padding)
# return str(text) + padding_text
def aes_encrypt(content, key, iv):
    """Encrypt ``content`` with AES-CBC and return it base64-encoded.

    Args:
        content: Plaintext. Anything whose type is not exactly ``str`` is
            first serialized with ``json.dumps``.
        key: AES key as text; it is UTF-8 encoded before use.
        iv: Initialization vector as text; it is UTF-8 encoded before use.

    Returns:
        The ciphertext as a standard base64 string.
    """
    aes = AES.new(key.encode('utf-8'), AES.MODE_CBC, iv.encode('utf-8'))
    # Non-string payloads (dicts, lists, numbers...) are sent as JSON text.
    if type(content) is not str:
        content = json.dumps(content)
    # PKCS7-pad the UTF-8 plaintext up to a whole number of AES blocks.
    padded_plaintext = Padding.pad(
        content.encode('utf-8'), AES.block_size, style='pkcs7'
    )
    ciphertext = aes.encrypt(padded_plaintext)
    return base64.b64encode(ciphertext).decode('utf-8')
# def aes_jvsl_encrypt(content):
#? ? """
#? ? ? ? AES加密
#? ? ? ? applogout
#? ? ? ? """
#? ? cipher = AES.new(
#? ? ? ? jvslkey,AES.MODE_CBC,
#? ? ? ? stageiv)
#? ? # 處理明文
#? ? content_padding = \
#? ? ? ? pkcs7padding(content)
#? ? # 加密
#? ? encrypt_bytes = cipher.encrypt(content_padding.encode('utf-8'))
#? ? # 重新編碼
#? ? result = str(base64.b64encode(encrypt_bytes),encoding='utf-8')
#? ? return result
def aes_Decrypt(content, key, iv):
    """Decrypt a base64-encoded AES-CBC ciphertext.

    Args:
        content: Base64 text of the ciphertext.
        key: AES key as text; it is UTF-8 encoded before use.
        iv: Initialization vector as text; it is UTF-8 encoded before use.

    Returns:
        The plaintext, with padding removed and decoded as UTF-8.
    """
    aes = AES.new(key.encode('utf-8'), AES.MODE_CBC, iv.encode('utf-8'))
    ciphertext = base64.b64decode(content.encode('utf-8'))
    padded_plaintext = aes.decrypt(ciphertext)
    plaintext = unpad(padded_plaintext, AES.block_size)
    return plaintext.decode('utf-8')
# def aes_jvsl_Decrypt(content):
#? ? """
#? ? ? ? AES解密
#? ? ? ? """
#? ? cipher = AES.new(
#? ? ? ? jvslkey,AES.MODE_CBC,
#? ? ? ? stageiv)
#? ? base64_decrypted = base64.b64decode(content.encode('utf-8'))
#? ? DataStr = cipher.decrypt(base64_decrypted)
#? ? # print(DataStr)
#? ? try:
#? ? ? ? result = re.compile(
#? ? ? ? ? ? '[\\x00-\\x08\\x0b-\\x0c\\x0e-\\x1f\n\r\t]').sub('',DataStr.decode())
#? ? except BaseException:
#? ? ? ? result = '轉(zhuǎn)碼失敗'
#? ? return result
def notifications_body(params, key, iv, starsigsecret, OperatorID="313744932"):
    """Encrypt and sign a notifications payload, returning the request body.

    According to the original docstring this is the local Python equivalent
    of ``requestEncodeLocal``.

    Args:
        params: Payload to encrypt; passed unchanged to ``aes_encrypt``.
        key: AES key text forwarded to ``aes_encrypt``.
        iv: AES IV text forwarded to ``aes_encrypt``.
        starsigsecret: Secret text used as the HMAC-SHA256 key.
        OperatorID: Operator identifier placed in the body and the signature
            input. The original comment lists ``313744932`` for "727" and
            ``MA21D2RF2`` for "821" (presumably environment names; confirm
            with the API owner).

    Returns:
        A dict with keys ``Sig``, ``Data``, ``OperatorID``, ``TimeStamp``
        and ``Seq``.
    """
    # Local time formatted as YYYYmmddHHMMSS.
    time_stamp = time.strftime("%Y%m%d%H%M%S", time.localtime())
    # Four digits drawn without repetition; only needed as signature input.
    sequence = "".join(random.sample(string.digits, k=4))
    encrypted_payload = aes_encrypt(params, key, iv)

    # Signature input: operator id + encrypted body + timestamp + sequence.
    signing_input = OperatorID + encrypted_payload + str(time_stamp) + str(sequence)
    mac = hmac.new(
        starsigsecret.encode('utf-8'),
        signing_input.encode("utf8"),
        digestmod=sha256,
    )
    signature = mac.hexdigest().upper()

    body = {
        "Sig": signature,
        "Data": encrypted_payload,
        "OperatorID": OperatorID,
        "TimeStamp": time_stamp,
        "Seq": sequence,
    }
    return body
def RSA_pkcs8encrypt(content):
    """Encrypt ``content`` with the embedded portal RSA public key.

    The key is a PEM "PUBLIC KEY" (SubjectPublicKeyInfo) block. The PEM text
    is assembled from clean, unindented lines so that no source-code
    indentation or stray characters end up inside the key material; the
    base64 payload is the one the original literal carried.

    Args:
        content: Text to encrypt (the original comment mentions phone
            numbers). It is UTF-8 encoded before encryption.

    Returns:
        The PKCS#1 v1.5 ciphertext as URL-safe base64 text.
    """
    public_key_pem = (
        "-----BEGIN PUBLIC KEY-----\n"
        "MIIBIjANBgkqhkiG9w0BAQEFAAOCAQ8AMIIBCgKCAQEAsPALr6qX1CKpDYhuvm7FoHr4nTlARjVqpsotXPm639ZMlvDR3rQ8/k+AZKRHxJkHjgdV2Pm6RQonIONYXTv9FRLctxCJAsOHIig0b9yEoKh3hUCxocoP7mAItLcMwXCYhKilEeoEPSWvQTXf6inNrpM/GDcHrgoghU5OT8YZ5l8qlvwGkZZWrXklchMt84ezv9Gb2akFnj/A39wtKI33Jvhk3zbCC/jh9S7XWuLgty1RakmdI64PFGH9j/+OSE9t1ETNOgvju8VxuNfxqf8eh2IP5sYXV8GRmMeyuFNt5GmFZkHKeds15pfhfz9nb8B67wgS6dWi4ZdE6iM5qJ/4XwIDAQAB\n"
        "-----END PUBLIC KEY-----"
    )
    pkcs8key = RSA.importKey(public_key_pem)
    cipher = PKCS1_v1_5.new(pkcs8key)
    ciphertext = cipher.encrypt(content.encode('utf-8'))
    return base64.urlsafe_b64encode(ciphertext).decode('utf-8')
def RSA_pkcs1encrypt(content):
    """Encrypt ``content`` with the embedded PKCS#1 RSA public key.

    Args:
        content: Text to encrypt (the original comment mentions phone
            numbers). It is UTF-8 encoded before encryption.

    Returns:
        The PKCS#1 v1.5 ciphertext as URL-safe base64 text.
    """
    public_key_pem = '''-----BEGIN RSA PUBLIC KEY-----\nMIIBCgKCAQEAsPALr6qX1CKpDYhuvm7FoHr4nTlARjVqpsotXPm639ZMlvDR3rQ8\n/k+AZKRHxJkHjgdV2Pm6RQonIONYXTv9FRLctxCJAsOHIig0b9yEoKh3hUCxocoP\n7mAItLcMwXCYhKilEeoEPSWvQTXf6inNrpM/GDcHrgoghU5OT8YZ5l8qlvwGkZZW\nrXklchMt84ezv9Gb2akFnj/A39wtKI33Jvhk3zbCC/jh9S7XWuLgty1RakmdI64P\nFGH9j/+OSE9t1ETNOgvju8VxuNfxqf8eh2IP5sYXV8GRmMeyuFNt5GmFZkHKeds1\n5pfhfz9nb8B67wgS6dWi4ZdE6iM5qJ/4XwIDAQAB\n-----END RSA PUBLIC KEY-----\n'''
    rsa_public_key = RSA.importKey(public_key_pem)
    encryptor = PKCS1_v1_5.new(rsa_public_key)
    ciphertext = encryptor.encrypt(content.encode('utf-8'))
    encoded = base64.urlsafe_b64encode(ciphertext)
    return encoded.decode('utf-8')
def R2_sign(param, payloadkey, secretKey):
    """Sort the request body by key and compute its SHA-256 signature.

    Steps:
      1. SHA-256 today's local date (``YYYY-MM-DD``).
      2. SHA-256 that hex digest concatenated with ``payloadkey``; the
         result is ``secretKey2``.
      3. Round-trip ``param`` through JSON with sorted keys, then render each
         entry as ``&key=value``. Entries whose value is the empty string
         are left out of the signature.
      4. SHA-256 ``"secretKey2=" + secretKey2 + entries + "&secretKey=" +
         secretKey``.

    Because the date is part of the input, the signature changes every day.

    Args:
        param: JSON-serializable dict, e.g. ``{"pageIndex": "1",
            "pageSize": "20", "xjv": "...", "timestamp": "1638337506000"}``.
        payloadkey: Text mixed into the date hash to derive ``secretKey2``.
        secretKey: Text appended as the final ``&secretKey=`` entry.

    Returns:
        The signature as a lowercase hex string.
    """
    date_digest = hashlib.sha256(
        time.strftime('%Y-%m-%d').encode('utf-8')
    ).hexdigest()
    secretKey2 = hashlib.sha256(
        (date_digest + payloadkey).encode('utf-8')
    ).hexdigest()

    # The JSON round trip sorts keys at every nesting level and normalizes
    # keys/values to plain JSON types; the loaded dict keeps sorted order.
    ordered = json.loads(json.dumps(param, sort_keys=True, ensure_ascii=False))

    pieces = []
    for name, value in ordered.items():
        if value == "":
            # Empty-string values do not take part in the signature.
            continue
        if isinstance(value, (dict, list)):
            # Nested structures are signed as JSON text with every space
            # removed (this also strips spaces inside nested string values).
            value = json.dumps(value, ensure_ascii=False).replace(" ", "")
        pieces.append(f"&{name}={value}")

    signing_input = (
        "secretKey2=" + secretKey2 + ''.join(pieces) + "&secretKey=" + secretKey
    )
    return hashlib.sha256(signing_input.encode('utf-8')).hexdigest()
def portal_signtimestap(params):
    """Sign ``params`` for the portal and return the signature and timestamp.

    Per the original docstring this is the signing scheme of the older
    interfaces and is also used by the share and dkey interfaces.

    The dynamic salt is a 10-character slice of the SHA-256 hex digest of the
    current local year-month (``YYYYMM``), starting at offset ``month * 3``.
    The millisecond timestamp, the year-month and the month offset all come
    from one clock reading, so they cannot disagree when the call straddles
    a month boundary.

    Args:
        params: Text to sign.

    Returns:
        A two-item list ``[sign, timestap]``: the lowercase hex SHA-256 of
        ``params + timestap + salt`` and the millisecond timestamp as text.
    """
    now = time.time()
    timestap = str(round(now * 1000))
    year_month = time.strftime("%Y%m", time.localtime(now))
    month_digest = hashlib.sha256(year_month.encode('utf-8')).hexdigest()
    # Salt window: [month * 3, month * 3 + 10) within the 64-char digest.
    start = int(year_month[-2:]) * 3
    dynamic_salt = month_digest[start:start + 10]
    sign = hashlib.sha256(
        (params + timestap + dynamic_salt).encode('utf-8')
    ).hexdigest()
    return [sign, timestap]
def ms_timestap():
    """Return the current Unix time in milliseconds as a string."""
    millis = round(time.time() * 1000)
    return str(millis)
def sha256guid(guid):
    """Return the lowercase hex SHA-256 digest of ``guid`` (UTF-8 encoded)."""
    digest = hashlib.sha256(guid.encode('utf-8'))
    return digest.hexdigest()
def R1_sign(param, secretKey):
    """Sort the request body by key and compute its SHA-256 signature.

    The body is rendered as ``key=value`` entries joined by ``&`` in sorted
    key order, followed by ``&secretKey=<secretKey>``. Entries whose value
    is the empty string are left out. Leading ``&`` characters of the
    assembled text are stripped before hashing.

    Args:
        param: Either a dict, or the source text of a Python expression that
            evaluates to one (for example ``str(some_dict)``).
        secretKey: Text appended as the final ``secretKey`` entry.

    Returns:
        The signature as a lowercase hex string.
    """
    if isinstance(param, dict):
        body = param
    else:
        # NOTE(security): eval() executes arbitrary Python code. It is kept
        # so that existing callers passing expression text keep working, but
        # it must never be given untrusted input. Prefer passing a dict.
        body = eval(param)

    # The JSON round trip sorts keys at every nesting level and normalizes
    # keys/values to plain JSON types; the loaded dict keeps sorted order.
    ordered = json.loads(json.dumps(body, sort_keys=True, ensure_ascii=False))

    pieces = []
    for name, value in ordered.items():
        if value == "":
            # Empty-string values do not take part in the signature.
            continue
        if isinstance(value, (dict, list)):
            # Nested structures are signed as JSON text with every space
            # removed (this also strips spaces inside nested string values).
            value = json.dumps(value, ensure_ascii=False).replace(" ", "")
        pieces.append(f"&{name}={value}")

    signing_input = (''.join(pieces) + "&secretKey=" + secretKey).lstrip("&")
    return hashlib.sha256(signing_input.encode('utf-8')).hexdigest()