在Yandex.Music机器人客户端的幕后

介绍


哈Ha!再次,我涉及影响Yandex.Music API的第二篇文章。该案已计划并在第一篇文章中提到

伸手,完成。我认为,今天我将谈论我的Telegram机器人代码库中存在的有趣时刻,这些时刻将自己定位为成熟的音乐客户端。我们还将触摸Yandex音乐识别API。

在逐一介绍特定事物的实现之前,有必要对机器人本身及其功能进行一些了解。

客户端视频演示


在主要部分中,我将讨论以下内容:

  1. 通过GitHub Pages上的站点登录您的帐户(为什么和为什么)。
  2. 数据格式,其包装以及在按钮数据中使用。
  3. 更新路由,数据版本控制,将上下文放入处理程序中。
  4. 服务:
    • Telegram中的服务重装轨道。
    • “订阅”服务接收带有发送下载状态的曲目。
  5. 最简单,最优雅的查询缓存实现。
  6. 通过语音消息识别轨道以及它在机器人中的一般显示方式。
  7. 小笔记。

如果您对至少一点感兴趣-欢迎加入。
该漫游器与已登录其帐户的用户一起使用,但没有。一切都围绕四个主要服务类别:专辑,艺术家,播放列表和曲目。支持所有实体数据,页面选择分页。要与他们互动,授权用户可以使用他们的个人菜单,其中包括:智能播放列表,“我喜欢”播放列表以及他们自己的播放列表。对于未经授权的用户,搜索可用-将搜索查询作为常规消息发送。现在只有一堆干货了:通过直接链接接收曲目和其他对象,接收歌词,喜欢/不喜欢曲目和艺术家的能力,查看相似的曲目,通过语音消息识别曲目等等。

主要部分


1.登录到帐户


最初,该机器人没有计划在未经授权的情况下工作,因此,即使在设计过程中,也要通过其网站来决定如何进行授权。主要论点是安全性和透明度。在任何情况下都绝不想接受明文消息中的用户登录名和密码。并且来自用户计算机的授权是基本的。因此,在React上写了一个授权站点。这是一种授权形式,具有重定向回机器人的重定向。用Python从库中提取了一个带有验证码授权和处理的片段,并用JavaScript重写了

为了进行授权,将使用接收到的OAuth令牌,该令牌会通过深层链接传输回机器人

重定向的最终链接如下所示:t.me/music_yandex_bot?start={oauth_token}

我想要最好的,但是用户没有信任。只有十分之一的用户得到了授权(当时是强制性的)。因此,在进行以下更新时,我不得不解释为什么您应该信任。有关HTTPS的所有项目(仅有关客户端请求和开放源代码的项目)已在网站上发布以进行授权,并在bot的欢迎消息中重复出现。好多了。

另外,在授权率较低的因素中,事实证明,俄罗斯t.me的镜像无法访问,并且仅支持订阅用户(在注释的第7段中对此进行了详细介绍)。

第一项使用URItg://进行了解析,并且在紧要关头,已经存在指向镜像的链接(如果自动重定向不起作用并且按钮没有帮助),第二项再次位于项7中。


2.数据格式


对于这个机器人,我第一次决定使用NoSQL DB- MongoDB。理解:嵌入文档时,不嵌入时。喜欢与mongoengine一起工作。但是我真的不想在家里存储所有按钮数据,并且客户端在数据库中只有记录ID。我对数据量感到害怕,但是我想购买一台免费的Mongo服务器,其数据存储限制为512 MB。如果数据在多个按钮中匹配,要想很好地重用记录并清除过时的记录,比将所有内容存储在按钮本身中要困难得多。在分析了要存储的数据的大小之后,我得出结论认为它很容易适应。

起初,我只是使用JSON,但是当他遇到极限时,他很快就拒绝了。在Telegram中,按钮数据的内容在UTF-8中最多不能超过64个字节

因此,在朋友的提示下,我开始struct模块查看pack。因此,查询的类型就诞生了,原始格式,打包和拆包。现在,它在机器人中的绝对位置得到了广泛使用。 格式很简单。第一个字节是类型,第二个字节是版本。其他所有内容都是特定类型的数据。类型存储为Enum,具有ID,这是第一个字节。除了ID之外,每种类型还具有打包和拆包数据的格式。例如:键入SHOW_TRACK_MENU

其格式的值为“ s?”,其中“ s”是轨道的唯一标识符,而“?” -曲目中是否有文字。

在使用的字符串类型的轨道,因为:第一,ID轨道的可以是一个并置ID和专辑的ID轨道通过结肠,其次,它可以是所述UUID。具有UUID的轨道-自加载轨道,仅对下载它们的用户可用。

例如,由于数据并不总是与格式相对应,因此相同的轨道ID可以简单地由数字表示,在打包之前必须将其强制转换为格式的类型。在这种情况下,s因此,在该类中,有一个方法可以对传输的数据进行规范化以进行打包,以免在传递给构造函数时自己做。

字符串是自给自足的,可以在包装时指示其长度,而在拆箱时可以考虑此长度。

没有计划支持较早版本,因此如果版本不匹配,则会引发异常。在处理更新时(将在下一段中讨论),将调用必要的逻辑。

由于Telegram仅使用UTF-8,因此打包的数据在base85中进行编码是的,我在这里失去了速度,并且在不使用base64的情况下节省了最小的空间,但是鉴于数据量较小,我认为使用base85是合适的。

资源。文件callback_data.py
import struct
import base64

from ext.query_types import QueryType


class BadDataVersion(Exception):
    pass


class CallbackData:
    ACTUAL_VERSION = 7
    BASE_FORMAT = '<BB'

    def __init__(self, type_: QueryType, data=None, version=None):
        self.type = type_
        self.version = version or CallbackData.ACTUAL_VERSION
        self.data = data

        if self.data is not None and not isinstance(self.data, list):
            self.data = [self.data]

        if self.data is not None:
            self.data = self._normalize_data_to_format(self.data)

    def __repr__(self):
        return f'<CallbackData> Type: {self.type} Version: {self.version} Data: {self.data}'

    def _normalize_data_to_format(self, data, bytes_object=False):
        normalized_data = data.copy()
        for i, c in enumerate(self.type.format):
            cast = str
            if c.lower() in 'bhilqn':
                cast = int
            elif c in 'efd':
                cast = float
            elif c == '?':
                cast = bool

            casted = cast(data[i])
            if bytes_object and cast == str:
                casted = casted.encode('utf-8')
            normalized_data[i] = casted

        return normalized_data

    @staticmethod
    def decode_type(callback_data):
        decoded = base64.b85decode(callback_data.encode('utf-8'))
        type_, version = struct.unpack(CallbackData.BASE_FORMAT, decoded[:2])

        if CallbackData.ACTUAL_VERSION != version:
            raise BadDataVersion()

        return QueryType(type_), version, decoded

    @classmethod
    def decode(cls, type_, version, decoded):
        start, data = 2, []

        if start < len(decoded):
            format_iter = iter(type_.format)

            while True:
                if start >= len(decoded):
                    break

                format_ = next(format_iter, type_.format[-1])

                decode_str = format_ in 'ps'
                if decode_str:
                    # struct.calcsize('b') = 1
                    length = list(struct.unpack('b', decoded[start: start + 1]))[0]
                    start += 1

                    format_ = f'{length}{format_}'

                step = struct.calcsize(format_)

                unpacked = list(struct.unpack(f'{format_}', decoded[start: start + step]))
                if decode_str:
                    unpacked[0] = unpacked[0].decode('UTF-8')

                data += unpacked
                start += step

        return cls(type_, data if data else None, version)

    def encode(self):
        encode = struct.pack(self.BASE_FORMAT, self.type.value, self.version)

        if self.data is not None:
            format_iter = iter(self.type.format)
            normalized_data = self._normalize_data_to_format(self.data, bytes_object=True)

            for data in normalized_data:
                format_ = next(format_iter, self.type.format[-1])

                if format_ in 'ps':
                    #        .  'b'  
                    # -      ,    > 36 
                    encode += struct.pack('b', len(data))
                    encode += struct.pack(f'{len(data)}{format_}', data)
                else:
                    encode += struct.pack(f'{format_}', data)

        return base64.b85encode(encode).decode('utf-8')



3.路由更新和上下文


该项目使用python-telegram-botTelegram Bot API配合使用它已经有一个系统来注册处理程序,以处理已到达的某些类型的更新,用于正则表达式,命令的过滤器等。但是,鉴于我自己的数据格式和类型,我必须继承自TelegramHandler并实现我的Handler

更新和上下文通过参数传递给每个处理程序。在这种情况下,我有自己的上下文,它在Handler'e中它正在形成,这是:接收用户和/或将用户添加到数据库,检查令牌的相关性以获取对音乐的访问权限,并根据授权状态和订阅的可用性来初始化Yandex.Music客户端。

在我的Handler'a之外,还有更多特定的处理程序,例如CallbackQueryHandler。有了它,就为某个特定类型的更新(我的类型,具有数据格式)注册了处理程序。为了检查此更新是否适合当前的处理程序,不是解压缩所有数据,而是仅解压缩前两个字节。在此阶段,已确认需要启动回调。仅在需要启动回调的情况下-数据才完全解压缩并作为kwargs传输交给最终处理者。立即将分析数据发送到ChatBase

注册是按顺序进行的,并且谁会更早注册的优先级更高(实际上,就像在Django路由中以及在其他项目中一样)。因此,为过时版本注册处理程序是CallBackQuery处理程序中的第一项

处理过时版本的逻辑很简单-告知用户此信息并发送更新的数据(如果可能)。

4.服务


当机器人在一个控制类中启动时,所有服务都将初始化,然后在机器人(DJ中的任何地方普遍使用它

每个服务都有其自己的ThreadPoolExecutor,其中包含一定数量的工作线程,这些任务将提交到该工作线程中。

在电报中重新加载曲目


目前,此服务尚未重写为User Bot,以绕过Telegram中下载文件大小的限制。事实证明,在Yandex.Music中,有超过50 mb的文件-播客。

该服务检查文件大小,如果文件过大,则向用户发出警报。多亏了第5段中描述的缓存系统,这可以检查歌词的可用性和接收情况。曲目也会被缓存。文件的哈希存储在数据库中。如果存在,则发送具有已知缓存的音频。

如果数据库中没有文件,则会从Yandex.Music接收直接链接。尽管目前,用户无法更改质量设置,但所有设置都设置为标准值。通过用户设置中的比特率和编解码器搜索文件。

该文件及其封面将以tempfile.TemporaryFile()的形式下载,然后将其上传到Telegram。值得注意的是,TG并不总是能够正确识别曲目的持续时间,但我通常对艺术家和标题保持沉默。因此,这些数据取自Yandex,幸运的是,可以将它们传输到购物车。

通过此服务发送音频文件时,将调用finish_callback(),通过订阅向服务发出有关下载结束的信号。

订阅服务,用于接收曲目并发送下载状态


曲目不会立即加载。多个用户可能请求相同的曲目。在这种情况下,首先请求曲目的用户是曲目的发起者和所有者。重新加载一秒钟以上时,下载状态开始:“用户发送语音消息”。要求相同曲目的其他用户是普通订户。它们与所有者一样,每〜4秒发送一次下载状态,以便消息不会中断(状态挂起5秒钟)。所有者的曲目下载完成后,就会从上面的服务中调用finish_callback()。之后,所有订户将从状态列表中删除,并接收下载的曲目。

在体系结构解决方案中,轨道的所有者也是订户,但具有一定的标记,因为发送轨道的方式不同。

5.查询缓存


就像我在上一篇文章中提到的那样,对Yandex.Music API的请求非常繁重。曲目列表可以是3或5 mb。而且,只有很多请求。每次更新处理时,至少有2个请求被发送到Yandex:用于初始化客户端和执行特定操作。在某些地方,要收集足够的信息(例如,用于播放列表),您需要请求一个播放列表,接收其曲目,从登录页面获取信息(如果这是一个智能播放列表),并且不要忘记客户端的初始化。通常,在请求数量方面会让人感到恐惧。

我想要一种非常通用的东西,而不是为某些对象(相同的客户端)进行任何类型的存储。

由于该库允许您在请求之外指定自己的实例来执行查询,然后我利用了这一点。

重点很简单。缓存类本身是一个单例。它只有两个参数:缓存生存期,大小。执行请求时,将调用包装器。他被推翻了。检查缓存是通过冻结的arg和夸脱的哈希值进行的。缓存有时间添加。当检查更新数据的必要性时,要么从LimitedSizeDict获得数据,要么发出真实请求并将其添加到缓存中。

有些请求无法缓存,例如,设置“喜欢” /“不喜欢”。如果用户按以下顺序:“喜欢”,“不喜欢”,“喜欢”,则最终不会发送喜欢的图片。在这种情况下,发送请求时,您需要传递use_cache参数,其值等于False实际上,这是唯一不使用缓存的地方。

因此,我提出了最大胆的请求,以便对其进行缓存。我不会尝试将其拆分为小部分,仅在当前页面中才需要。我一次完成所有操作,在页面之间切换时,切换速度非常快(与旧方法相比)。

对于我来说,缓存的请求类非常漂亮,并且被简单地集成了。

资源
import copy
import time

from typing import Union
from collections import OrderedDict

import requests

from yandex_music.utils.request import Request as YandexMusicRequest


class LimitedSizeDict(OrderedDict):
    def __init__(self, *args, **kwargs):
        self.size_limit = kwargs.pop("size_limit", None)
        OrderedDict.__init__(self, *args, **kwargs)
        self._check_size_limit()

    def __setitem__(self, key, value):
        OrderedDict.__setitem__(self, key, value)
        self._check_size_limit()

    def _check_size_limit(self):
        if self.size_limit is not None:
            while len(self) > self.size_limit:
                self.popitem(last=False)


class CachedItem:
    def __init__(self, response: requests.Response):
        self.timestamp = time.time()
        self.response = response


class Cache:
    __singleton: 'Cache' = None

    def __init__(self, lifetime: Union[str, int], size: Union[str, int]):
        Cache.__singleton = self

        self.lifetime = int(lifetime) * 60
        self.size = int(size)

        self.storage: LimitedSizeDict = LimitedSizeDict(size_limit=int(size))

    def get(self, *args, **kwargs):
        hash_ = Cache.get_hash(*args, **kwargs)

        return self.storage[hash_].response

    def update(self, response: requests.Response, *args, **kwargs):
        hash_ = Cache.get_hash(*args, **kwargs)
        self.storage.update({hash_: CachedItem(response)})

    def need_to_fetch(self, *args, **kwargs):
        hash_ = Cache.get_hash(*args, **kwargs)
        cached_item = self.storage.get(hash_)

        if not cached_item:
            return True

        if time.time() - cached_item.timestamp > self.lifetime:
            return True

        return False

    @classmethod
    def get_instance(cls) -> 'Cache':
        if cls.__singleton is not None:
            return cls.__singleton
        else:
            raise RuntimeError(f'{cls.__name__} not initialized')

    @staticmethod
    def freeze_dict(d: dict) -> Union[frozenset, tuple, dict]:
        if isinstance(d, dict):
            return frozenset((key, Cache.freeze_dict(value)) for key, value in d.items())
        elif isinstance(d, list):
            return tuple(Cache.freeze_dict(value) for value in d)

        return d

    @staticmethod
    def get_hash(*args, **kwargs):
        return hash((args, Cache.freeze_dict(kwargs)))


class Request(YandexMusicRequest):
    def _request_wrapper(self, *args, **kwargs):
        use_cache = kwargs.get('use_cache', True)

        if 'use_cache' in kwargs:
            kwargs.pop('use_cache')

        if not use_cache:
            response = super()._request_wrapper(*args, **copy.deepcopy(kwargs))
        elif use_cache and Cache.get_instance().need_to_fetch(*args, **kwargs):
            response = super()._request_wrapper(*args, **copy.deepcopy(kwargs))
            Cache.get_instance().update(response, *args, **kwargs)
        else:
            response = Cache.get_instance().get(*args, **kwargs)

        return response



6.语音信息跟踪识别


刚开始时,没有想到将其添加到机器人中,但是发生了一个有趣的情况。机器人进行聊天(在机器人的说明中指明)。一段时间后,我注意到人们进入其中并发送带有音乐的语音消息。起初,我认为这是一种新的垃圾邮件,有人在这里开玩笑,在开玩笑。但是,当已经有10个人这样的人并且每个人都在做相同的事情时,我的朋友(由struct暗示的那个人)建议用户驱动Yandex.Music在寻找一种可以识别Yandex音乐的官方机器人,他们在那里看到聊天室,并认真地向它发送语音消息!这只是一个聪明而真实的假设。然后我开玩笑地说,是时候进行识别并在聊天中添加一个机器人了。为了好玩……一段时间之后,就完成了!

现在介绍API。最近,Yandex越来越多地使用Web套接字。我遇到了它们在i.module和i.station管理中的使用。音乐识别服务也可以在其上使用。我在机器人中投放了最低限度的解决方案,但未将实现添加到库中。

WS位于以下地址:wss://voiceservices.yandex.net/uni.ws
我们只需要两条消息-授权和识别请求
Yandex本身在其官方应用程序中会在一秒钟或三秒钟内发送短文件。作为响应,您可能需要发送更多数据或结果-是否找到。如果找到结果,则将返回轨道ID。.OGG文件

发送ENCODER = SpeechKit移动SDK v3.28.0我没有检查它如何与其他编码器一起使用,只是在Telegram记录的文件中进行了更改。

使用Web插座反转时,有时会发生魔术。有时我找不到曲目,但是当我通过识别请求更改消息中的语言时,我找到了。或首先找到它,然后停止,尽管文件是相同的。我认为曲目的语言是由客户端上的SpeechKit设置的我没有这样的机会自己做,而是进行暴力搜索。

我通过电报的语音消息通过膝盖进行语音识别
import uuid

import lomond


def get_auth_data():
    return {
        "event": {
            "header": {
                "messageId": str(uuid.uuid4()),
                "name": "SynchronizeState",
                "namespace": "System"
            },
            "payload": {
                "accept_invalid_auth": True,
                "auth_token": "5983ba91-339e-443c-8452-390fe7d9d308",
                "uuid": str(uuid.uuid4()).replace('-', ''),
            }
        }
    }


def get_asr_data():
    return {
        "event": {
            "header": {
                "messageId": str(uuid.uuid4()),
                "name": "Recognize",
                "namespace": "ASR",
                "streamId": 1
            },
            "payload": {
                "advancedASROptions": {
                    "manual_punctuation": False,
                    "partial_results": False
                },
                "disableAntimatNormalizer": False,
                "format": "audio/opus",
                "music_request2": {
                    "headers": {
                        "Content-Type": "audio/opus"
                    }
                },
                "punctuation": False,
                "tags": "PASS_AUDIO;",
                "topic": "queries"
            }
        }
    }


class Recognition:
    URI = 'wss://voiceservices.yandex.net/uni.ws'
    LANGS = ['', 'ru-RU', 'en-US']
    POLL_DELAY = 0.3

    def __init__(self, binary_data, status_msg):
        self.status_msg = status_msg
        self.websocket = lomond.WebSocket(self.URI)
        self.chunks = self.get_chunks_and_replace_encoder(binary_data)

    def get_track_id(self):
        for lang in Recognition.LANGS:
            asr_data = get_asr_data()
            if lang:
                asr_data['event']['payload'].update({'lang': lang})
                self.status_msg.edit_text(f'     {lang}...')
            else:
                self.status_msg.edit_text(f'      ...')

            for msg in self.websocket.connect(poll=self.POLL_DELAY):
                if msg.name == 'ready':
                    self.websocket.send_json(get_auth_data())
                    self.websocket.send_json(asr_data)

                    for chunk in self.chunks:
                        self.websocket.send_binary(chunk)

                if msg.name == 'text':
                    response = msg.json.get('directive')

                    if self.is_valid_response(response):
                        self.websocket.close()

                        return self.parse_track_id(response)
                    elif self.is_fatal_error(response):
                        self.websocket.close()

                        break

    def is_valid_response(self, response):
        if response.get('header', {}).get('name') == 'MusicResult' and \
                response.get('payload', {}).get('result') == 'success':
            self.status_msg.edit_text(f' ,  !')
            return True
        return False

    @staticmethod
    def is_fatal_error(response):
        if response.get('header', {}).get('name') == 'MusicResult' and \
                response.get('payload', {}).get('result') == 'music':
            return False
        return True

    @staticmethod
    def parse_track_id(response):
        return response['payload']['data']['match']['id']

    @staticmethod
    def get_chunks_and_replace_encoder(binary_data):
        chunks = []

        for chunk in binary_data.split(b'OggS')[1:]:
            if b'OpusTags' in chunk:
                pos = chunk.index(b'OpusTags') + 12
                size = len(chunk)
                chunk = chunk[:pos] + b'#\x00\x00\x00\x00ENCODER=SpeechKit Mobile SDK v3.28.0'
                chunk += b"\x00" * (size - len(chunk))

            chunks.append(b'\x00\x00\x00\x01OggS' + chunk)

        return chunks


, .

7.小笔记


最初,只有订阅用户才能使用该漫游器,因为该服务可以在有限的国家/地区使用(无订阅),并且具有漫游器的服务器位于欧洲。通过使用代理执行没有订阅的用户的请求即可解决该问题。代理服务器位于莫斯科。

可以选择页面,但最多可以选择100个页面(不能添加更多按钮,电报限制)。一些常见的页面请求具有更多页面。

在Yandex搜索中,页面上的元素数是硬编码的。多少首曲目,多少个播放列表。有时这甚至与前面显示的数据量不符。页面有变化,元素数量是浮动的。因此,它在机器人中也会跳跃,这不是很漂亮。并结合他们的分页器与他自己的-这样的事情。在其他可能每页请求一定数量元素的地方,一切都很完美。在漫游器中执行搜索后的语音消息为t.me/MarshalC/416

在Telegram中转发音频时,作者迷失了,并分配给进行转发的人。因此,所有曲目均以机器人用户名的签名发送。

实施图书馆无线电后我遇到的所有语音消息-t.me/MarshalC/422(关于轨道链,通过发送大量反馈批处理它,batch_id)。

结论


尽管事实上这是有关Telegram机器人的另一篇文章,您仍然可以在此处阅读,很可能是因为您对剪切中的一个点感兴趣,这很了不起,非常感谢

不幸的是,我没有完全打开该机器人的源代码(因为在某些地方我需要重构)。本文已描述了很多内容,但是某些方面(例如虚拟键盘及其生成方式)不受影响。在大多数情况下,本文中没有的内容只是与我的库一起使用,没有什么有趣的。

我以它们现在的形式展示了围绕一切旋转的课程。我承认有错误,但是所有这些都可以使用很长时间。在某些地方我喜欢我的代码,在某些地方我不喜欢-这是正常的。不要忘记使用WS进行识别是膝上的解决方案。准备阅读评论中的理性批评。

尽管甚至在我开始编写库时就已经计划好该机器人,但是我拒绝了这个想法,但是显然我回来了(这很无聊)。

Yandex.Music Bot-一个项目,证明使用该库与项目中的API配合使用是合适的

非常感谢妈妈,Yana,Sana'a和Glory。有人对错误进行校对,有人对提示进行提示,没有这些,本文中的某些内容可能就不存在了,有些人只是在发表前对文章进行评估。亚瑟(Arthur)为picci撰写文章,莱德(Lyod)为徽标。

言:现在,学习后我对分配感到非常尖锐。如果您准备打电话给我-请告诉我应聘简历的发送地址,请面试。我所有的联系人都在个人资料中。时间在燃烧,我希望对你有帮助。根据法律规定,我只能在白俄罗斯共和国工作。

PPS这是三篇文章的第二篇,但是毫无丝毫想法,我将有时间针对这个主题以及是否需要做另一个项目。我将公开展示他的主题-跨平台客户Yandex.Music。

Source: https://habr.com/ru/post/undefined/


All Articles