本文介绍一下某米音箱如何接入目前大火的ChatGPT ,本文的主要内容是整理Github上yihong0618作者的教程而来,经过归纳整理,并做了小幅的改动。这里对作者表示感谢,希望大家都能成功的改造自己的音箱。
某米音箱是一个开发的平台,支持个人和企业开发者定制音频产品和定制化操作,在官网可以查看具体的教程。
下面开始详细介绍如何接入chatgpt
查看你的账户ID
登陆某米官网,登陆你的账号后在菜单栏会显示你的账号ID,这个ID也可以在App的个人信息里找到,记录下你的账户名和密码。
获取你的账户ID
获取你的机器型号
在你的智能音箱的底部的标签上找到你的机器型号,我的机器是Pro 2代,在小米的底部标签上可以找到如下内容
产品名称:智能音箱
型号: L15A
记录下你的型号我这里是L15A
安装Python/ target=_blank class=infotextkey>Python库miservice
建议不要直接使用pip3 install miservice这种形式安装,这种安装方式的版本低一些,少几个查询智能音箱当前状态的函数。
在Github上搜索MiService库,作者是yihong0608,将代码clone下来,使用如下方式安装
sudo pip3 install .
库中介绍了一些查询你的机器信息和使用智能音箱文字转音频的命令,需要的同学,也可以体验一下,取决你的智能音箱型号,可能很多命令是无效的,建议根据这个库的介绍找到你的音箱对应的文字转音频的命令,上面介绍过我的型号是L15A,所以我根据教程输入
python3 micli.py spec xiaomi.wifispeaker.l15a
获取我的音箱命令
我的音箱命令
从音箱命令中找到Intelligent_Speacker后的数字我的是7,找到_Play_Text后的数字我的是3
这样组成了下面代码config.py中的"L15A": "7-3"。所以注意这里一定要找到你的命令代码。
完整代码
#!/usr/bin/env python3
import asyncio
import json
import re
import subprocess
import time
from pathlib import Path
from aiohttp import ClientSession
from miservice import MiAccount, MiNAService
from wxtgptbot import WXTChatBot
from config import (
COOKIE_TEMPLATE,
HARDWARE_COMMAND_DICT,
KEY_word,
LATEST_ASK_API,
MI_ASK_SIMULATE_DATA,
PROMPT,
)
from utils import calculate_tts_elapse, parse_cookie_string
class MiGPT:
def __init__(
self,
hardware,
mi_user,
mi_pass,
openai_key,
cookie="",
use_command=False,
mute_xiaoai=False,
use_gpt3=False,
use_chatgpt_api=False,
api_base=None,
verbose=False,
):
# TODO !!!! refactor so many of this shit
self.mi_token_home = Path.home() / ".mi.token"
self.hardware = hardware
self.mi_user = mi_user
self.mi_pass = mi_pass
self.openai_key = openai_key
self.cookie_string = ""
self.last_timestamp = int(time.time()*1000) # timestamp last call mi speaker
self.session = None
self.chatbot = None # a little slow to init we move it after xiaomi init
self.user_id = ""
self.device_id = ""
self.service_token = ""
self.cookie = cookie
self.use_command = use_command
self.tts_command = HARDWARE_COMMAND_DICT.get(hardware, "7-3")
self.conversation_id = None
self.parent_id = None
self.miboy_account = None
self.mina_service = None
# try to mute xiaoai config
self.mute_xiaoai = mute_xiaoai
# mute xiaomi in runtime
self.this_mute_xiaoai = mute_xiaoai
# if use gpt3 api
self.use_gpt3 = use_gpt3
self.use_chatgpt_api = use_chatgpt_api
self.api_base = api_base
self.verbose = verbose
# this attr can be re set value in cli
self.key_word = KEY_WORD
self.prompt = PROMPT
async def init_all_data(self, session):
await self.login_miboy(session)
await self._init_data_hardware()
with open(self.mi_token_home) as f:
user_data = json.loads(f.read())
self.user_id = user_data.get("userId")
self.service_token = user_data.get("micoapi")[1]
self._init_cookie()
#await self._init_first_data_and_chatbot()
async def login_miboy(self, session):
self.session = session
self.account = MiAccount(
session,
self.mi_user,
self.mi_pass,
str(self.mi_token_home),
)
# Forced login to refresh to refresh token
await self.account.login("micoapi")
self.mina_service = MiNAService(self.account)
async def _init_data_hardware(self):
if self.cookie:
# if use cookie do not need init
return
hardware_data = await self.mina_service.device_list()
for h in hardware_data:
if h.get("hardware", "") == self.hardware:
self.device_id = h.get("deviceID")
print("设备id:",self.device_id)
break
else:
raise Exception(f"we have no hardware: {self.hardware} please check")
'''初始化cookie,调用小米api时需要'''
def _init_cookie(self):
if self.cookie:
self.cookie = parse_cookie_string(self.cookie)
else:
self.cookie_string = COOKIE_TEMPLATE.format(
device_id=self.device_id,
service_token=self.service_token,
user_id=self.user_id,
)
self.cookie = parse_cookie_string(self.cookie_string)
#获取小米音箱的最后一次的回答
async def get_latest_ask_from_xiaoai(self):
r = await self.session.get(
LATEST_ASK_API.format(
hardware=self.hardware, timestamp=str(int(time.time() * 1000))
),
cookies=parse_cookie_string(self.cookie),
)
return await r.json()
def get_last_timestamp_and_record(self, data):
if "data" in data:
d= data.get("data")
records = json.loads(d).get("records")
if not records:
return 0, None
last_record = records[0]
timestamp = last_record.get("time")
return timestamp, last_record
else:
return 0, None
async def do_tts(self, value, wait_for_finish=False):
if not self.use_command:
try:
await self.mina_service.text_to_speech(self.device_id, value)
except:
# do nothing is ok
pass
else:
#使用micli工具
subprocess.check_output(["micli", self.tts_command, value])
if wait_for_finish:
elapse = calculate_tts_elapse(value)
await asyncio.sleep(elapse)
print("elapse:",elapse)
while True:
if not await self.get_if_xiaoai_is_playing():
break
await asyncio.sleep(2)
await asyncio.sleep(2)
print("回答完毕")
#小米是否正在播报
async def get_if_xiaoai_is_playing(self):
#此函数没有被找到
playing_info = await self.mina_service.player_get_status(self.device_id)
# WTF xiaomi api
is_playing = (
json.loads(playing_info.get("data", {}).get("info", "{}")).get("status", -1)
== 1
)
return is_playing
async def run_forever(self):
print(f"Running xiaogpt now, 用`{'/'.join(KEY_WORD)}`开头来提问")
async with ClientSession() as session:
await self.init_all_data(session)
print("开始循环")
while 1:
if self.verbose:
print(
f"Now listening xiaoai new message timestamp: {self.last_timestamp}"
)
try:
r = await self.get_latest_ask_from_xiaoai()
except Exception:
# we try to init all again
await self.init_all_dat(session)
r = await self.get_latest_ask_from_xiaoai()
# spider rule
if not self.mute_xiaoai:
await asyncio.sleep(1)
else:
await asyncio.sleep(0.3)
new_timestamp, last_record = self.get_last_timestamp_and_record(r)
print(new_timestamp, last_record)
if new_timestamp > self.last_timestamp:
self.last_timestamp = new_timestamp
query = last_record.get("query", "")
if query.startswith(tuple(self.key_word)):
# only mute when your clause start's with the keyword
self.this_mute_xiaoai = False
# drop 帮我回答
query = re.sub(rf"^({'|'.join(self.key_word)})", "", query)
print("-" * 20)
print("问题:" + query + "?")
query = f"{query},{PROMPT}"
# waiting for xiaoai speaker done
if not self.mute_xiaoai:
await asyncio.sleep(2)
for i in range(8):
if not await self.get_if_xiaoai_is_playing():
print("小米结束回答")
break
else:
print("小米正在回答")
await asyncio.sleep(2)
await self.do_tts("正在问GPT请耐心等待")
await asyncio.sleep(0.5)
try:
print(
"以下是小爱的回答: ",
last_record.get("answers")[0]
.get("tts", {})
.get("text"),
)
except:
print("小爱没回")
# message = await self.ask_gpt(query)
message="以下是GPT的回答 "
if "清除消息" in query:
message="GPT 清除历史消息"
WXTChatBot.clear()
else:
message+=WXTChatBot.ask({"msg":query})
# tts to xiaoai with ChatGPT answer
print("以下是GPT的回答: " + message)
await self.do_tts(message, wait_for_finish=True)
if self.mute_xiaoai:
self.this_mute_xiaoai = True
else:
if self.verbose:
print("No new xiao ai record")
if __name__=="__main__":
app=MiGPT("型号","你的ID","你的密码","")
asyncio.run(app.run_forever())
这个代码中需要非常注意的代码时
message+=WXTChatBot.ask({"msg":query})
WXTChatBot 这个模块是我封装的访问chatgpt的代码,请按照下面的介绍封装一个你的模块。
辅助模块utils
import re
from http.cookies import SimpleCookie
from requests.utils import cookiejar_from_dict
def parse_cookie_string(cookie_string):
cookie = SimpleCookie()
cookie.load(cookie_string)
cookies_dict = {}
cookiejar = None
for k, m in cookie.items():
cookies_dict[k] = m.value
cookiejar = cookiejar_from_dict(cookies_dict, cookiejar=None, overwrite=True)
return cookiejar
_no_elapse_chars = re.compile(r"([「」『』《》“”'"()()]|(?<!-)-(?!-))", re.UNICODE)
def calculate_tts_elapse(text):
# for simplicity, we use a fixed speed
speed = 4.25 # this value is picked by trial and error
# Exclude quotes and brackets that do not affect the total elapsed time
return len(_no_elapse_chars.sub("", text)) / speed
模块config
LATEST_ASK_API = "https://userprofile.mina.mi.com/device_profile/v2/conversation?source=dialogu&hardware={hardware}×tamp={timestamp}&limit=2"
COOKIE_TEMPLATE = "deviceId={device_id}; serviceToken={service_token}; userId={user_id}"
HARDWARE_COMMAND_DICT = {
"LX06": "5-1",
"L05B": "5-3",
"S12A": "5-1",
"LX01": "5-1",
"L06A": "5-1",
"LX04": "5-1",
"L05C": "5-3",
"L17A": "7-3",
"X08E": "7-3",
"LX05A": "5-1",
"LX5A": "5-1",
"L15A": "7-3",
# add more here
}
MI_USER = ""
MI_PASS = ""
OPENAI_API_KEY = ""
KEY_WORD = ["帮我", "请回答"]
这里需要注意的是,在这个模块中我添加了一个"L15A": "7-3" ,其中L15A 是我的音箱型号,"7-3" 是miservice库需要使用的文字转音频tts的命令,在我上面的介绍中介绍了你如何获取到这个命令码。
代码的主要逻辑是:
1 init_all_data 登陆智能音箱API,根据你的ID,密码获取返回的token
2 根据你的硬件型号找到你的机器
_init_data_hardware 这个函数用来根据我的型号L15A找到了我的智能音箱
3 使用do_tts函数使用智能音箱的文本转语音
4 使用get_if_xiaoai_is_playing这个函数判断小米音箱自己的回答是否结束了
5 在config.py中定义了gpt问话的头,这里定义为”请回答“,所以当你唤醒音箱后,使用”请回答“+你的问题,会触发ChatGPT的回答,其他的像播放音乐啊不会触发GPT回答
入口代码
if __name__=="__main__":
app=MiGPT("型号","你的ID","你的密码","")
asyncio.run(app.run_forever())
入口代码传入你的型号,你的ID,你的密码即可
ChatGPT API
使用如下的代码作为你访问chapgpt的函数,函数中你需要使用你自己的openai_key,openai_key是在openai官网获得的,每个用户有18美元的免费额度。
import openai
from xiaogpt.bot.base_bot import BaseBot
class ChatGPTBot(BaseBot):
def __init__(self, session, openai_key, api_base=None):
self.session = session
self.history = []
self.api_base = api_base
self.openai_key = openai_key
async def ask(self, query):
openai.api_key = self.openai_key
if self.api_base:
openai.api_base = self.api_base
ms = []
for h in self.history:
ms.append({"role": "user", "content": h[0]})
ms.append({"role": "assistant", "content": h[1]})
ms.append({"role": "user", "content": f"{query}"})
completion = openai.ChatCompletion.create(model="gpt-3.5-turbo", messages=ms)
message = (
completion["choices"][0]
.get("message")
.get("content")
.encode("utf8")
.decode()
)
self.history.append([f"{query}", message])
return message
如果无法访问到ChatGPT的服务器
这时你需要要么有一台自己的外网服务器,在这个服务器上搭建一个你的服务,来访问chatgpt的api,就像我做的这样,我定义的WXTChatBot模块,实际上是用post http请求了我的云端服务器的rest api,云端服务器将请求转发到chatgpt获取回答。
如果你没有自己的外网服务器的话,目前国内有很多用户开放了非常多的免费体验的套壳API,你可以使用他们的api封装一下免费使用。我体验过使用了多个免费api效果很不错,可以在头条里搜索几个。
总结:
借助于智能音箱的开发性,我们一起diy试试吧