<返回更多

如何将智能音箱接入ChatGPT

2023-03-17  今日头条  羞羞的奥特曼
加入收藏

本文介绍一下某米音箱如何接入目前大火的ChatGPT ,本文的主要内容是整理Github上yihong0618作者的教程而来,经过归纳整理,并做了小幅的改动。这里对作者表示感谢,希望大家都能成功的改造自己的音箱。

某米音箱是一个开发的平台,支持个人和企业开发者定制音频产品和定制化操作,在官网可以查看具体的教程。

下面开始详细介绍如何接入chatgpt

查看你的账户ID

登陆某米官网,登陆你的账号后在菜单栏会显示你的账号ID,这个ID也可以在App的个人信息里找到,记录下你的账户名和密码。

获取你的账户ID

获取你的机器型号

在你的智能音箱的底部的标签上找到你的机器型号,我的机器是Pro 2代,在小米的底部标签上可以找到如下内容

产品名称:智能音箱
型号: L15A

记录下你的型号我这里是L15A

安装Python/ target=_blank class=infotextkey>Python库miservice

建议不要直接使用pip3 install miservice这种形式安装,这种安装方式的版本低一些,少几个查询智能音箱当前状态的函数。

在Github上搜索MiService库,作者是yihong0608,将代码clone下来,使用如下方式安装

sudo pip3 install .

库中介绍了一些查询你的机器信息和使用智能音箱文字转音频的命令,需要的同学,也可以体验一下,取决你的智能音箱型号,可能很多命令是无效的,建议根据这个库的介绍找到你的音箱对应的文字转音频的命令,上面介绍过我的型号是L15A,所以我根据教程输入

python3 micli.py spec xiaomi.wifispeaker.l15a

获取我的音箱命令

我的音箱命令

从音箱命令中找到Intelligent_Speacker后的数字我的是7,找到_Play_Text后的数字我的是3

这样组成了下面代码config.py中的"L15A": "7-3"。所以注意这里一定要找到你的命令代码。

完整代码

#!/usr/bin/env python3
import asyncio
import json
import re
import subprocess
import time
from pathlib import Path
from aiohttp import ClientSession
from miservice import MiAccount, MiNAService
from wxtgptbot import  WXTChatBot 
from config import (
    COOKIE_TEMPLATE,
    HARDWARE_COMMAND_DICT,
    KEY_word,
    LATEST_ASK_API,
    MI_ASK_SIMULATE_DATA,
    PROMPT,
)
from utils import calculate_tts_elapse, parse_cookie_string

class MiGPT:
    def __init__(
        self,
        hardware,
        mi_user,
        mi_pass,
        openai_key,
        cookie="",
        use_command=False,
        mute_xiaoai=False,
        use_gpt3=False,
        use_chatgpt_api=False,
        api_base=None,
        verbose=False,
    ):
        # TODO !!!! refactor so many of this shit
        self.mi_token_home = Path.home() / ".mi.token"
        self.hardware = hardware
        self.mi_user = mi_user
        self.mi_pass = mi_pass
        self.openai_key = openai_key
        self.cookie_string = ""
        self.last_timestamp = int(time.time()*1000)  # timestamp last call mi speaker
        self.session = None
        self.chatbot = None  # a little slow to init we move it after xiaomi init
        self.user_id = ""
        self.device_id = ""
        self.service_token = ""
        self.cookie = cookie
        self.use_command = use_command
        self.tts_command = HARDWARE_COMMAND_DICT.get(hardware, "7-3")
        self.conversation_id = None
        self.parent_id = None
        self.miboy_account = None
        self.mina_service = None
        # try to mute xiaoai config
        self.mute_xiaoai = mute_xiaoai
        # mute xiaomi in runtime
        self.this_mute_xiaoai = mute_xiaoai
        # if use gpt3 api
        self.use_gpt3 = use_gpt3
        self.use_chatgpt_api = use_chatgpt_api
        self.api_base = api_base
        self.verbose = verbose
        # this attr can be re set value in cli
        self.key_word = KEY_WORD
        self.prompt = PROMPT

    async def init_all_data(self, session):
        await self.login_miboy(session)
        await self._init_data_hardware()
        with open(self.mi_token_home) as f:
            user_data = json.loads(f.read())
        self.user_id = user_data.get("userId")
        self.service_token = user_data.get("micoapi")[1]
        self._init_cookie()
        #await self._init_first_data_and_chatbot()

    async def login_miboy(self, session):
        self.session = session
        self.account = MiAccount(
            session,
            self.mi_user,
            self.mi_pass,
            str(self.mi_token_home),
        )
        # Forced login to refresh to refresh token
        await self.account.login("micoapi")
        self.mina_service = MiNAService(self.account)

    async def _init_data_hardware(self):
        if self.cookie:
            # if use cookie do not need init
            return
        hardware_data = await self.mina_service.device_list()
        for h in hardware_data:
            if h.get("hardware", "") == self.hardware:
                self.device_id = h.get("deviceID")
                print("设备id:",self.device_id)
                break
        else:
            raise Exception(f"we have no hardware: {self.hardware} please check")
    '''初始化cookie,调用小米api时需要'''
    def _init_cookie(self):
        if self.cookie:
            self.cookie = parse_cookie_string(self.cookie)
        else:
            self.cookie_string = COOKIE_TEMPLATE.format(
                device_id=self.device_id,
                service_token=self.service_token,
                user_id=self.user_id,
            )
            self.cookie = parse_cookie_string(self.cookie_string)

    #获取小米音箱的最后一次的回答
    async def get_latest_ask_from_xiaoai(self):
        r = await self.session.get(
            LATEST_ASK_API.format(
                hardware=self.hardware, timestamp=str(int(time.time() * 1000))
            ),
            cookies=parse_cookie_string(self.cookie),
        )
        return await r.json()

    def get_last_timestamp_and_record(self, data):
        if "data" in data:
            d= data.get("data")
            records = json.loads(d).get("records")
            if not records:
                return 0, None
            last_record = records[0]
            timestamp = last_record.get("time")
            return timestamp, last_record
        else:
             return 0, None
     
    async def do_tts(self, value, wait_for_finish=False):
        if not self.use_command:
            try:
                await self.mina_service.text_to_speech(self.device_id, value)
            except:
                # do nothing is ok
                pass
        else:
            #使用micli工具
            subprocess.check_output(["micli", self.tts_command, value])
        if wait_for_finish:
            elapse = calculate_tts_elapse(value)
            await asyncio.sleep(elapse)
            print("elapse:",elapse)
            while True:
                if not await self.get_if_xiaoai_is_playing():
                    break
                await asyncio.sleep(2)
            await asyncio.sleep(2)
            print("回答完毕")

    
    #小米是否正在播报
    async def get_if_xiaoai_is_playing(self):
        #此函数没有被找到
        playing_info = await self.mina_service.player_get_status(self.device_id)
        # WTF xiaomi api
        is_playing = (
            json.loads(playing_info.get("data", {}).get("info", "{}")).get("status", -1)
            == 1
        )
        return is_playing

    async def run_forever(self):
        print(f"Running xiaogpt now, 用`{'/'.join(KEY_WORD)}`开头来提问")
        async with ClientSession() as session:
            await self.init_all_data(session)
            print("开始循环")
            while 1:
                if self.verbose:
                    print(
                        f"Now listening xiaoai new message timestamp: {self.last_timestamp}"
                    )
                try:
                    r = await self.get_latest_ask_from_xiaoai()
                except Exception:
                    # we try to init all again
                    await self.init_all_dat(session)
                    r = await self.get_latest_ask_from_xiaoai()
                # spider rule
                if not self.mute_xiaoai:
                    await asyncio.sleep(1)
                else:
                    await asyncio.sleep(0.3)
                
                    
                new_timestamp, last_record = self.get_last_timestamp_and_record(r)
                print(new_timestamp, last_record)
                if new_timestamp > self.last_timestamp:
                    self.last_timestamp = new_timestamp
                    query = last_record.get("query", "")
                    if query.startswith(tuple(self.key_word)):
                        # only mute when your clause start's with the keyword
                        self.this_mute_xiaoai = False
                        # drop 帮我回答
                        query = re.sub(rf"^({'|'.join(self.key_word)})", "", query)

                        print("-" * 20)
                        print("问题:" + query + "?")

                        query = f"{query},{PROMPT}"
                        # waiting for xiaoai speaker done
                        if not self.mute_xiaoai:
                            await asyncio.sleep(2)
                        for i in range(8):
                            if not await self.get_if_xiaoai_is_playing():
                                print("小米结束回答")
                                break
                            else:
                                print("小米正在回答")
                                await asyncio.sleep(2)
                        
                        await self.do_tts("正在问GPT请耐心等待")
                        await asyncio.sleep(0.5)
                        try:
                            print(
                                "以下是小爱的回答: ",
                                last_record.get("answers")[0]
                                .get("tts", {})
                                .get("text"),
                            )
                        except:
                            print("小爱没回")
                        # message = await self.ask_gpt(query)
                        message="以下是GPT的回答 "
                        if "清除消息" in query:
                            message="GPT 清除历史消息"
                            WXTChatBot.clear()
                        else:
                            message+=WXTChatBot.ask({"msg":query})
                      
                        # tts to xiaoai with ChatGPT answer
                        print("以下是GPT的回答: " + message)
                        await self.do_tts(message, wait_for_finish=True)
                        if self.mute_xiaoai:
                            self.this_mute_xiaoai = True
                else:
                    if self.verbose:
                        print("No new xiao ai record")                        
if __name__=="__main__":
    app=MiGPT("型号","你的ID","你的密码","")
    asyncio.run(app.run_forever())

这个代码中需要非常注意的代码时

message+=WXTChatBot.ask({"msg":query})

WXTChatBot 这个模块是我封装的访问chatgpt的代码,请按照下面的介绍封装一个你的模块。

辅助模块utils

import re
from http.cookies import SimpleCookie
from requests.utils import cookiejar_from_dict
def parse_cookie_string(cookie_string):
    cookie = SimpleCookie()
    cookie.load(cookie_string)
    cookies_dict = {}
    cookiejar = None
    for k, m in cookie.items():
        cookies_dict[k] = m.value
        cookiejar = cookiejar_from_dict(cookies_dict, cookiejar=None, overwrite=True)
    return cookiejar
_no_elapse_chars = re.compile(r"([「」『』《》“”'"()()]|(?<!-)-(?!-))", re.UNICODE)
def calculate_tts_elapse(text):
    # for simplicity, we use a fixed speed
    speed = 4.25  # this value is picked by trial and error
    # Exclude quotes and brackets that do not affect the total elapsed time
    return len(_no_elapse_chars.sub("", text)) / speed

模块config

LATEST_ASK_API = "https://userprofile.mina.mi.com/device_profile/v2/conversation?source=dialogu&hardware={hardware}×tamp={timestamp}&limit=2"
COOKIE_TEMPLATE = "deviceId={device_id}; serviceToken={service_token}; userId={user_id}"
HARDWARE_COMMAND_DICT = {
    "LX06": "5-1",
    "L05B": "5-3",
    "S12A": "5-1",
    "LX01": "5-1",
    "L06A": "5-1",
    "LX04": "5-1",
    "L05C": "5-3",
    "L17A": "7-3",
    "X08E": "7-3",
    "LX05A": "5-1",  
    "LX5A": "5-1",
    "L15A": "7-3",  
    # add more here
}
MI_USER = ""
MI_PASS = ""
OPENAI_API_KEY = ""
KEY_WORD = ["帮我", "请回答"]

这里需要注意的是,在这个模块中我添加了一个"L15A": "7-3" ,其中L15A 是我的音箱型号,"7-3" 是miservice库需要使用的文字转音频tts的命令,在我上面的介绍中介绍了你如何获取到这个命令码。

代码的主要逻辑是:

1 init_all_data 登陆智能音箱API,根据你的ID,密码获取返回的token

2 根据你的硬件型号找到你的机器

_init_data_hardware  这个函数用来根据我的型号L15A找到了我的智能音箱

3 使用do_tts函数使用智能音箱的文本转语音

4 使用get_if_xiaoai_is_playing这个函数判断小米音箱自己的回答是否结束了

5 在config.py中定义了gpt问话的头,这里定义为”请回答“,所以当你唤醒音箱后,使用”请回答“+你的问题,会触发ChatGPT的回答,其他的像播放音乐啊不会触发GPT回答

入口代码

if __name__=="__main__":
    app=MiGPT("型号","你的ID","你的密码","")
    asyncio.run(app.run_forever())

入口代码传入你的型号,你的ID,你的密码即可

ChatGPT API

使用如下的代码作为你访问chapgpt的函数,函数中你需要使用你自己的openai_key,openai_key是在openai官网获得的,每个用户有18美元的免费额度。

import openai
from xiaogpt.bot.base_bot import BaseBot
class ChatGPTBot(BaseBot):
    def __init__(self, session, openai_key, api_base=None):
        self.session = session
        self.history = []
        self.api_base = api_base
        self.openai_key = openai_key

    async def ask(self, query):
        openai.api_key = self.openai_key
        if self.api_base:
            openai.api_base = self.api_base
        ms = []
        for h in self.history:
            ms.append({"role": "user", "content": h[0]})
            ms.append({"role": "assistant", "content": h[1]})
        ms.append({"role": "user", "content": f"{query}"})
        completion = openai.ChatCompletion.create(model="gpt-3.5-turbo", messages=ms)
        message = (
            completion["choices"][0]
            .get("message")
            .get("content")
            .encode("utf8")
            .decode()
        )
        self.history.append([f"{query}", message])
        return message

如果无法访问到ChatGPT的服务器

这时你需要要么有一台自己的外网服务器,在这个服务器上搭建一个你的服务,来访问chatgpt的api,就像我做的这样,我定义的WXTChatBot模块,实际上是用post http请求了我的云端服务器的rest api,云端服务器将请求转发到chatgpt获取回答。

如果你没有自己的外网服务器的话,目前国内有很多用户开放了非常多的免费体验的套壳API,你可以使用他们的api封装一下免费使用。我体验过使用了多个免费api效果很不错,可以在头条里搜索几个。

总结:

借助于智能音箱的开发性,我们一起diy试试吧

声明:本站部分内容来自互联网,如有版权侵犯或其他问题请与我们联系,我们将立即删除或处理。
▍相关推荐
更多资讯 >>>