[Python] 纯文本查看 复制代码
from http import HTTPStatus
import os
import wave
import requests
import time
import base64
from pyaudio import PyAudio, paInt16
from aip import AipSpeech
import pygame.mixer
import dashscope
# 采用在代码中直接设置API密钥的方式,通义api
dashscope.api_key = 'sk-b168bcc****b40faa7b********93c4c'
def call_with_messages(messages):
response = dashscope.Generation.call(
dashscope.Generation.Models.qwen_turbo,
messages=messages,
result_format='message', # 设置结果为 "message" 格式
)
if response.status_code == HTTPStatus.OK:
# 处理并打印返回的message内容
assistant_response = None
for message in response.output['choices']:
if 'content' in message:
print(f"小助手: {message['content']}")
assistant_response = message['content']
elif 'message' in message and 'content' in message['message']:
print(f"小助手:{message['message']['content']}")
assistant_response = message['message']['content']
else:
print(f"Warning: Message object is missing the 'content' key: {message}")
return assistant_response # 返回小助手的回答
else:
print(f'Request id: {response.request_id}, Status code: {response.status_code}, '
f'error code: {response.code}, error message: {response.message}')
return None
framerate = 16000 # 采样率
num_samples = 2000 # 采样点
channels = 1 # 声道
sampwidth = 2 # 采样宽度2bytes
FILEPATH = 'speech.wav'
APP_ID = '4***22'
API_KEY = "mjW9Q5t******qG5PQDp"
SECRET_KEY = "rWNfuHrL*******C1SwXsPNMA"
client = AipSpeech(APP_ID, API_KEY, SECRET_KEY)
### 百du语音识别api
base_url = "https://openapi.baidu.com/oauth/2.0/token?grant_type=client_credentials&client_id=%s&client_secret=%s"
APIKey = "mjW9Q5t******qG5PQDp"
SecretKey = "rWNfuHrL*******C1SwXsPNMA"
HOST = base_url % (APIKey, SecretKey)
def getToken(host):
res = requests.post(host)
return res.json()['access_token']
def save_wave_file(filepath, data):
wf = wave.open(filepath, 'wb')
wf.setnchannels(channels)
wf.setsampwidth(sampwidth)
wf.setframerate(framerate)
wf.writeframes(b''.join(data))
wf.close()
def my_record():
pa = PyAudio()
stream = pa.open(format=paInt16, channels=channels,
rate=framerate, input=True, frames_per_buffer=num_samples)
my_buf = []
t = time.time()
print('正在录音...')
while time.time() < t + 4: # 秒
string_audio_data = stream.read(num_samples)
my_buf.append(string_audio_data)
print('录音结束.')
save_wave_file(FILEPATH, my_buf)
stream.close()
def get_audio(file):
with open(file, 'rb') as f:
data = f.read()
return data
def speech2text(speech_data, token, dev_pid=1537):
FORMAT = 'wav'
RATE = '16000'
CHANNEL = 1
CUID = 'l1aQEcqYSH2BfZe7eCG84nv3qRnErsMA'
SPEECH = base64.b64encode(speech_data).decode('utf-8')
data = {
'format': FORMAT,
'rate': RATE,
'channel': CHANNEL,
'cuid': CUID,
'len': len(speech_data),
'speech': SPEECH,
'token': token,
'dev_pid': dev_pid
}
url = 'https://vop.baidu.com/server_api'
headers = {'Content-Type': 'application/json'}
print('正在识别...')
r = requests.post(url, json=data, headers=headers)
Result = r.json()
if 'result' in Result:
return Result['result'][0]
else:
return Result
def baidu_text_to_speech(text):
result = client.synthesis(text, 'zh', 1, {'vol': 5, 'per': 4})
if not isinstance(result, dict):
with open('output.mp3', 'wb') as f:
f.write(result)
audio_file_path = 'output.mp3'
else:
audio_file_path = None
print('Error:', result['error_msg'])
return audio_file_path
if __name__ == '__main__':
flag = 'y'
while flag.lower() == 'y':
print('请输入数字选择语言:')
devpid = input('1536:普通话(简单英文),1537:普通话(有标点),1737:英语,1637:粤语,1837:四川话\n')
my_record()
TOKEN = getToken(HOST)
speech = get_audio(FILEPATH)
result = speech2text(speech, TOKEN, int(devpid))
print(result)
# 将语音识别结果作为代码1的content
user_message = {'role': 'user', 'content': result}
# 调用代码1的call_with_messages函数
messages = [{'role': 'system', 'content': 'You are a helpful assistant.'}, user_message]
if isinstance(result, str):
user_message = {'role': 'user', 'content': result}
else:
print("识别结果不是字符串类型,无法进行后续处理")
continue
assistant_answer = call_with_messages(messages) # 获取小助手的回答
if assistant_answer is not None:
# 对小助手的回答进行语音播报
audio_file_path = baidu_text_to_speech(assistant_answer)
if audio_file_path is not None and os.path.isfile(audio_file_path):
pygame.mixer.init()
pygame.mixer.music.load(audio_file_path)
pygame.mixer.music.play()
while pygame.mixer.music.get_busy(): # 等待播放完成
time.sleep(0.1) # 可适当调整睡眠时间
pygame.mixer.quit()
os.remove(audio_file_path) # 在播放结束后立即删除临时文件
else:
print("未能成功合成小助手语音,请查看日志中的错误信息")
flag = input('Continue?(y/n):')