原訊飛平臺 Python 代碼:
# -*- coding: utf-8 -*-
#
# author: yanmeng2
#
# 非實時轉寫調用 demo
import base64
import hashlib
import hmac
import json
import os
import time
import requests
# Base URL of the transcription REST API; endpoint names below are appended to it.
lfasr_host = 'http://raasr.xfyun.cn/api'
# Endpoint names of the individual API calls
api_prepare = '/prepare'
api_upload = '/upload'
api_merge = '/merge'
api_get_progress = '/getProgress'
api_get_result = '/getResult'
# Size of one upload slice in bytes (10 MB)
file_piece_sice = 10485760
# ------------------ configurable transcription parameters ------------------
# The parameters are described on the official site
# (https://doc.xfyun.cn/rest_api/%E8%AF%AD%E9%9F%B3%E8%BD%AC%E5%86%99.html);
# add or change them in gene_params as needed.
# NOTE(review): none of the values below is read by the code visible in this
# file — they only take effect once gene_params is extended to send them.
# Transcription type
lfasr_type = 0
# Whether to enable word segmentation
has_participle = 'false'
# NOTE(review): undocumented in the original; presumably a separation switch — confirm against the API docs.
has_seperate = 'true'
# Number of alternative candidate words
max_alternatives = 0
# Sub-user identifier
suid = ''
class SliceIdGenerator:
    """Generate consecutive 10-character slice ids.

    The ids behave like a base-26 counter over the letters ``a``-``z``:
    the first id returned is ``'aaaaaaaaaa'``, then ``'aaaaaaaaab'`` and so
    on, carrying into the next position to the left after ``'z'``.
    """

    def __init__(self):
        # '`' is the character right before 'a', so the first increment
        # produces 'aaaaaaaaaa'.
        self.__ch = 'aaaaaaaaa`'

    def getNextSliceId(self):
        """Advance the counter by one and return the new slice id."""
        ch = self.__ch
        j = len(ch) - 1
        while j >= 0:
            cj = ch[j]
            if cj != 'z':
                # No carry needed: bump this position and stop.
                ch = ch[:j] + chr(ord(cj) + 1) + ch[j + 1:]
                break
            # 'z' wraps around to 'a' and the carry moves one position left.
            ch = ch[:j] + 'a' + ch[j + 1:]
            j -= 1
        self.__ch = ch
        return self.__ch
class RequestApi(object):
    """Client for the iFlytek non-real-time transcription REST API.

    Works on an audio file stored on disk and runs the sequence
    prepare -> upload (in slices) -> merge -> poll progress -> get result.
    """

    def __init__(self, appid, secret_key, upload_file_path):
        """Store the credentials and the path of the file to transcribe."""
        self.appid = appid
        self.secret_key = secret_key
        self.upload_file_path = upload_file_path

    @staticmethod
    def _sign(appid, secret_key, ts):
        """Return base64(HMAC-SHA1(secret_key, md5_hex(appid + ts))) as text."""
        digest_hex = hashlib.md5((appid + ts).encode('utf-8')).hexdigest()
        mac = hmac.new(secret_key.encode('utf-8'),
                       digest_hex.encode('utf-8'), hashlib.sha1)
        return base64.b64encode(mac.digest()).decode('utf-8')

    @staticmethod
    def _slice_count(total_len, piece_len):
        """Return how many pieces of ``piece_len`` are needed for ``total_len``."""
        # Integer ceiling division; avoids the float rounding of int(a / b).
        return -(-total_len // piece_len)

    def gene_params(self, apiname, taskid=None, slice_id=None):
        """Build the form parameters for the endpoint ``apiname``.

        Only a subset of the parameters the service supports is sent; see
        https://doc.xfyun.cn/rest_api/%E8%AF%AD%E9%9F%B3%E8%BD%AC%E5%86%99.html
        for the full list.

        Args:
            apiname: one of the module-level ``api_*`` endpoint names.
            taskid: task id returned by the prepare call (all calls but prepare).
            slice_id: id of the slice being uploaded (upload call only).

        Returns:
            dict of request parameters; empty for an unknown ``apiname``.
        """
        known = (api_prepare, api_upload, api_merge,
                 api_get_progress, api_get_result)
        if apiname not in known:
            return {}

        ts = str(int(time.time()))
        param_dict = {
            'app_id': self.appid,
            'signa': self._sign(self.appid, self.secret_key, ts),
            'ts': ts,
        }
        if apiname == api_prepare:
            # The file is only inspected where its size is actually needed,
            # so progress polling no longer stats the file on every call.
            file_len = os.path.getsize(self.upload_file_path)
            param_dict['file_len'] = str(file_len)
            param_dict['file_name'] = os.path.basename(self.upload_file_path)
            # For short audio a single slice (slice_num == 1) is enough.
            param_dict['slice_num'] = str(
                self._slice_count(file_len, file_piece_sice))
        elif apiname == api_upload:
            param_dict['task_id'] = taskid
            param_dict['slice_id'] = slice_id
        elif apiname == api_merge:
            param_dict['task_id'] = taskid
            param_dict['file_name'] = os.path.basename(self.upload_file_path)
        else:
            # getProgress / getResult
            param_dict['task_id'] = taskid
        return param_dict

    def gene_request(self, apiname, data, files=None, headers=None):
        """POST to ``apiname`` and return the decoded JSON response.

        The meaning of the response fields is described at
        https://doc.xfyun.cn/rest_api/%E8%AF%AD%E9%9F%B3%E8%BD%AC%E5%86%99.html

        Raises:
            SystemExit: with status 1 when the response's ``ok`` field is not 0
                (the original called ``exit(0)``, which reported success and
                relied on the ``site``-provided builtin).
        """
        response = requests.post(lfasr_host + apiname, data=data,
                                 files=files, headers=headers)
        result = json.loads(response.text)
        if result["ok"] == 0:
            print("{} success:".format(apiname) + str(result))
            return result
        print("{} error:".format(apiname) + str(result))
        raise SystemExit(1)

    def prepare_request(self):
        """Call the prepare endpoint; the response's ``data`` is the task id."""
        return self.gene_request(apiname=api_prepare,
                                 data=self.gene_params(api_prepare))

    def upload_request(self, taskid, upload_file_path):
        """Upload ``upload_file_path`` in slices of ``file_piece_sice`` bytes.

        Returns:
            True when every slice was accepted, False otherwise.
        """
        sig = SliceIdGenerator()
        index = 1
        with open(upload_file_path, 'rb') as file_object:
            while True:
                content = file_object.read(file_piece_sice)
                if not content:
                    break
                # The original also passed a "filename" entry whose value was
                # always None (gene_params was called without a slice_id).
                # Current requests releases skip None-valued file entries and
                # older ones crash on them, so the entry is left out.
                response = self.gene_request(
                    api_upload,
                    data=self.gene_params(api_upload, taskid=taskid,
                                          slice_id=sig.getNextSliceId()),
                    files={"content": content})
                if response.get('ok') != 0:
                    # Uploading this slice failed
                    print('upload slice fail, response: ' + str(response))
                    return False
                print('upload slice ' + str(index) + ' success')
                index += 1
        return True

    def merge_request(self, taskid):
        """Ask the service to merge the uploaded slices of ``taskid``."""
        return self.gene_request(
            api_merge, data=self.gene_params(api_merge, taskid=taskid))

    def get_progress_request(self, taskid):
        """Query the processing progress of ``taskid``."""
        return self.gene_request(
            api_get_progress,
            data=self.gene_params(api_get_progress, taskid=taskid))

    def get_result_request(self, taskid):
        """Fetch the transcription result of ``taskid``."""
        return self.gene_request(
            api_get_result,
            data=self.gene_params(api_get_result, taskid=taskid))

    def all_api_request(self):
        """Run the complete transcription flow for the configured file.

        Returns:
            The getResult response dict, or None when the upload or the task
            failed (previously the result was discarded and None was always
            returned).
        """
        # 1. Prepare
        pre_result = self.prepare_request()
        taskid = pre_result["data"]
        # 2. Upload the slices
        if not self.upload_request(taskid=taskid,
                                   upload_file_path=self.upload_file_path):
            return None
        # 3. Merge
        self.merge_request(taskid=taskid)
        # 4. Poll the task progress every 20 seconds
        while True:
            progress = self.get_progress_request(taskid)
            # 26605 is tolerated alongside 0, as in the original check.
            if progress['err_no'] != 0 and progress['err_no'] != 26605:
                # str() keeps a non-string 'failed' value from raising TypeError.
                print('task error: ' + str(progress['failed']))
                return None
            data = progress['data']
            task_status = json.loads(data)
            if task_status['status'] == 9:
                print('task ' + taskid + ' finished')
                break
            print('The task ' + taskid + ' is in processing, task status: ' + str(data))
            time.sleep(20)
        # 5. Fetch the result
        return self.get_result_request(taskid=taskid)
# NOTE: if the requests module fails with "'NoneType' object has no attribute
# 'read'", upgrade requests to 2.20.0 or later (this demo was tested with 2.20.0).
# Fill in the appid and secret_key from the iFlytek open platform and the path
# of the file to transcribe.
if __name__ == '__main__':
    api = RequestApi(appid="", secret_key="", upload_file_path=r"")
    api.all_api_request()
這裡需要本地文件才可以上傳、識別並返回內容。爬蟲直接就可以獲取二進制字符串，所以在這裡對其進行了改裝，直接傳遞二進制流字符串和名字即可。
# -*- coding: utf-8 -*-
import base64
import hashlib
import hmac
import json
import os
import time
import requests
# Base URL of the transcription REST API; endpoint names below are appended to it.
lfasr_host = 'http://raasr.xfyun.cn/api'
# Endpoint names of the individual API calls
api_prepare = '/prepare'
api_upload = '/upload'
api_merge = '/merge'
api_get_progress = '/getProgress'
api_get_result = '/getResult'
# Size of one upload slice in bytes (10 MB)
file_piece_sice = 10485760
# ------------------ configurable transcription parameters ------------------
# The parameters are described on the official site
# (https://doc.xfyun.cn/rest_api/%E8%AF%AD%E9%9F%B3%E8%BD%AC%E5%86%99.html);
# add or change them in gene_params as needed.
# NOTE(review): none of the values below is read by the code visible in this
# file — they only take effect once gene_params is extended to send them.
# Transcription type
lfasr_type = 0
# Whether to enable word segmentation
has_participle = 'false'
# NOTE(review): undocumented in the original; presumably a separation switch — confirm against the API docs.
has_seperate = 'true'
# Number of alternative candidate words
max_alternatives = 0
# Sub-user identifier
suid = ''
class SliceIdGenerator:
    """Generate consecutive 10-character slice ids.

    The ids behave like a base-26 counter over the letters ``a``-``z``:
    the first id returned is ``'aaaaaaaaaa'``, then ``'aaaaaaaaab'`` and so
    on, carrying into the next position to the left after ``'z'``.
    """

    def __init__(self):
        # '`' is the character right before 'a', so the first increment
        # produces 'aaaaaaaaaa'.
        self.__ch = 'aaaaaaaaa`'

    def getNextSliceId(self):
        """Advance the counter by one and return the new slice id."""
        ch = self.__ch
        j = len(ch) - 1
        while j >= 0:
            cj = ch[j]
            if cj != 'z':
                # No carry needed: bump this position and stop.
                ch = ch[:j] + chr(ord(cj) + 1) + ch[j + 1:]
                break
            # 'z' wraps around to 'a' and the carry moves one position left.
            ch = ch[:j] + 'a' + ch[j + 1:]
            j -= 1
        self.__ch = ch
        return self.__ch
class RequestApi(object):
    """Client for the iFlytek non-real-time transcription REST API.

    Works on audio content already held in memory (for example downloaded by
    a crawler) and runs the sequence
    prepare -> upload (in slices) -> merge -> poll progress -> get result.
    """

    def __init__(self, appid, secret_key, filename, upload_file):
        """Store the credentials, the file name and the raw audio content.

        Args:
            appid: application id issued by the platform.
            secret_key: secret key belonging to ``appid``.
            filename: name reported to the service, including the extension.
            upload_file: the audio content itself; its ``len()`` is reported
                as the file length, so it is expected to be ``bytes``.
        """
        self.appid = appid
        self.secret_key = secret_key
        self.upload_file = upload_file
        self.filename = filename

    @staticmethod
    def _sign(appid, secret_key, ts):
        """Return base64(HMAC-SHA1(secret_key, md5_hex(appid + ts))) as text."""
        digest_hex = hashlib.md5((appid + ts).encode('utf-8')).hexdigest()
        mac = hmac.new(secret_key.encode('utf-8'),
                       digest_hex.encode('utf-8'), hashlib.sha1)
        return base64.b64encode(mac.digest()).decode('utf-8')

    @staticmethod
    def _slice_count(total_len, piece_len):
        """Return how many pieces of ``piece_len`` are needed for ``total_len``."""
        # Integer ceiling division; avoids the float rounding of int(a / b).
        return -(-total_len // piece_len)

    @staticmethod
    def _split(content, piece_len):
        """Yield consecutive pieces of ``content`` of at most ``piece_len`` items."""
        for start in range(0, len(content), piece_len):
            yield content[start:start + piece_len]

    def gene_params(self, apiname, taskid=None, slice_id=None):
        """Build the form parameters for the endpoint ``apiname``.

        Only a subset of the parameters the service supports is sent; see
        https://doc.xfyun.cn/rest_api/%E8%AF%AD%E9%9F%B3%E8%BD%AC%E5%86%99.html
        for the full list.

        Args:
            apiname: one of the module-level ``api_*`` endpoint names.
            taskid: task id returned by the prepare call (all calls but prepare).
            slice_id: id of the slice being uploaded (upload call only).

        Returns:
            dict of request parameters; empty for an unknown ``apiname``.
        """
        known = (api_prepare, api_upload, api_merge,
                 api_get_progress, api_get_result)
        if apiname not in known:
            return {}

        ts = str(int(time.time()))
        param_dict = {
            'app_id': self.appid,
            'signa': self._sign(self.appid, self.secret_key, ts),
            'ts': ts,
        }
        if apiname == api_prepare:
            file_len = len(self.upload_file)
            param_dict['file_len'] = str(file_len)
            param_dict['file_name'] = self.filename
            # For short audio a single slice (slice_num == 1) is enough.
            param_dict['slice_num'] = str(
                self._slice_count(file_len, file_piece_sice))
        elif apiname == api_upload:
            param_dict['task_id'] = taskid
            param_dict['slice_id'] = slice_id
        elif apiname == api_merge:
            param_dict['task_id'] = taskid
            # The original sent os.path.basename(<audio content>) here, i.e. a
            # fragment of the binary payload; the name given to prepare is
            # what belongs in this field.
            param_dict['file_name'] = self.filename
        else:
            # getProgress / getResult
            param_dict['task_id'] = taskid
        return param_dict

    def gene_request(self, apiname, data, files=None, headers=None):
        """POST to ``apiname`` and return the decoded JSON response.

        The meaning of the response fields is described at
        https://doc.xfyun.cn/rest_api/%E8%AF%AD%E9%9F%B3%E8%BD%AC%E5%86%99.html

        Raises:
            SystemExit: with status 1 when the response's ``ok`` field is not 0
                (the original called ``exit(0)``, which reported success and
                relied on the ``site``-provided builtin).
        """
        response = requests.post(lfasr_host + apiname, data=data,
                                 files=files, headers=headers)
        result = json.loads(response.text)
        if result["ok"] == 0:
            print("{} success:".format(apiname) + str(result))
            return result
        print("{} error:".format(apiname) + str(result))
        raise SystemExit(1)

    def prepare_request(self):
        """Call the prepare endpoint; the response's ``data`` is the task id."""
        return self.gene_request(apiname=api_prepare,
                                 data=self.gene_params(api_prepare))

    def upload_request(self, taskid, upload_file):
        """Upload the stored audio content in slices of ``file_piece_sice``.

        The number of slices sent now matches the ``slice_num`` announced by
        the prepare call; the original always sent the whole content as one
        slice, which disagreed with ``slice_num`` for content above 10 MB.

        Args:
            taskid: task id returned by the prepare call.
            upload_file: kept for interface compatibility; as in the original,
                the content stored on the instance is what gets uploaded, so
                it stays consistent with the length sent by prepare.

        Returns:
            True when every slice was accepted, False otherwise.
        """
        sig = SliceIdGenerator()
        pieces = self._split(self.upload_file, file_piece_sice)
        for index, content in enumerate(pieces, start=1):
            # The original also passed a "filename" entry whose value was
            # always None (gene_params was called without a slice_id).
            # Current requests releases skip None-valued file entries and
            # older ones crash on them, so the entry is left out.
            response = self.gene_request(
                api_upload,
                data=self.gene_params(api_upload, taskid=taskid,
                                      slice_id=sig.getNextSliceId()),
                files={"content": content})
            if response.get('ok') != 0:
                # Uploading this slice failed
                print('upload slice fail, response: ' + str(response))
                return False
            print('upload slice ' + str(index) + ' success')
        return True

    def merge_request(self, taskid):
        """Ask the service to merge the uploaded slices of ``taskid``."""
        return self.gene_request(
            api_merge, data=self.gene_params(api_merge, taskid=taskid))

    def get_progress_request(self, taskid):
        """Query the processing progress of ``taskid``."""
        return self.gene_request(
            api_get_progress,
            data=self.gene_params(api_get_progress, taskid=taskid))

    def get_result_request(self, taskid):
        """Fetch the transcription result of ``taskid``."""
        return self.gene_request(
            api_get_result,
            data=self.gene_params(api_get_result, taskid=taskid))

    def all_api_request(self):
        """Run the complete transcription flow for the stored content.

        Returns:
            The getResult response dict, or None when the upload or the task
            failed.
        """
        # 1. Prepare
        pre_result = self.prepare_request()
        taskid = pre_result["data"]
        # 2. Upload the slices
        if not self.upload_request(taskid=taskid, upload_file=self.upload_file):
            return None
        # 3. Merge
        self.merge_request(taskid=taskid)
        # 4. Poll the task progress every 20 seconds
        while True:
            progress = self.get_progress_request(taskid)
            # 26605 is tolerated alongside 0, as in the original check.
            if progress['err_no'] != 0 and progress['err_no'] != 26605:
                # str() keeps a non-string 'failed' value from raising TypeError.
                print('task error: ' + str(progress['failed']))
                return None
            data = progress['data']
            task_status = json.loads(data)
            if task_status['status'] == 9:
                print('task ' + taskid + ' finished')
                break
            print('The task ' + taskid + ' is in processing, task status: ' + str(data))
            time.sleep(20)
        # 5. Fetch the result
        return self.get_result_request(taskid=taskid)
# NOTE: if the requests module fails with "'NoneType' object has no attribute
# 'read'", upgrade requests to 2.20.0 or later (this demo was tested with 2.20.0).
# Fill in the appid and secret_key from the iFlytek open platform below.
def get_content(filename, medium):
    """Transcribe in-memory audio and return the recognized text.

    Args:
        filename: file name including its extension, e.g. ``'demo.mp3'``.
        medium: the raw audio content to upload.

    Returns:
        The ``onebest`` fields of all result entries joined into one string,
        or ``''`` when the transcription flow produced no result.
    """
    api = RequestApi(appid="控制臺獲取的appid", secret_key="控制臺獲取",
                     filename=filename, upload_file=medium)
    result = api.all_api_request()
    if not result:
        # all_api_request returns None when the task reports an error.
        return ''
    return ''.join(data['onebest'] for data in json.loads(result['data']))
# 9.12
# Placeholder value: replace it with the audio content fetched by the crawler.
res = '爬蟲獲取的音頻二進制流字符串'
# Module-level call: passes the content to get_content and prints the text it returns.
print(get_content('demo.mp3',res))
改裝了一下，夠自己用的就行。用人家平臺就是慢，沒辦法，自己又不會寫語音轉文字。