Under the hood of the Yandex.Music bot client

Introduction


Hello, Habr! I am back with a second article about the Yandex.Music API. It was planned and mentioned in the first article.

I finally got around to it. Today I will talk about what I consider the interesting parts of the codebase of my Telegram bot, which positions itself as a full-fledged music client. We will also touch on the Yandex music recognition API.

Before going through the implementation point by point, it is worth getting an idea of the bot itself and what it can do.

Client video demo


In the main part I will talk about the following:

  1. Signing in to an account through a site on GitHub Pages (what for and why).
  2. The data format, its packing, and its use in button data.
  3. Update routing, data versioning, and passing context into handlers.
  4. Services:
    • The service that re-uploads a track to Telegram.
    • The "subscription" service for receiving a track, which also sends the upload status.
  5. The simplest and most elegant implementation of request caching.
  6. Recognizing a track from a voice message, and how this feature ended up in the bot at all.
  7. Small notes.

If at least one of these points interests you, read on.
The bot works both with users who have logged into their account and with those who have not. Everything revolves around the four main entities of the service: album, artist, playlist and track. All data of these entities is supported, and there is pagination with page selection. To work with them, an authorized user can use a personal menu, which includes smart playlists, the "I like" playlist and the user's own playlists. Unauthorized users have search: a search query is sent as a regular message. And now just a dry list of what else there is: getting tracks and other objects by a direct link, getting lyrics, liking or disliking a track or an artist, viewing similar tracks, recognizing a track from a voice message, and more.

Main part


1. Login to account


Initially the bot was not meant to work without authorization, so even at the design stage it was decided how authorization should happen: through a dedicated website. The main arguments were security and transparency. Under no circumstances did I want to accept user logins and passwords in plain-text messages, while authorizing from the user's own computer is trivial. So a site for authorization was written in React. It is an authorization form with a redirect back to the bot. The part that handles authorization and captcha processing was taken from the Python library and rewritten in JavaScript.

For authorization, the received OAuth token is used, which is transferred back to the bot via deep linking.

The final link for redirection looks like this: t.me/music_yandex_bot?start={oauth_token}
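As a rough illustration of the deep-linking step, here is a minimal sketch of building such a link and reading the token back on the bot side. The function names are hypothetical and not taken from the bot. The character set and the 64-character limit are the ones Telegram documents for the start parameter, and the sketch assumes the OAuth token fits within them.

```python
import re

BOT_USERNAME = "music_yandex_bot"  # taken from the link above

# Telegram allows up to 64 characters of A-Z, a-z, 0-9, _ and - in the start parameter.
START_PARAM_RE = re.compile(r"[A-Za-z0-9_-]{1,64}")


def build_deep_link(oauth_token: str) -> str:
    """Build the redirect link that carries the token back to the bot (hypothetical helper)."""
    if not START_PARAM_RE.fullmatch(oauth_token):
        raise ValueError("token does not fit into a Telegram start parameter")
    return f"https://t.me/{BOT_USERNAME}?start={oauth_token}"


def extract_token(message_text: str):
    """Return the token from a '/start <token>' message, or None if there is none."""
    parts = message_text.split(maxsplit=1)
    if len(parts) == 2 and parts[0] == "/start" and START_PARAM_RE.fullmatch(parts[1]):
        return parts[1]
    return None
```

When the user follows the link and presses Start, the bot receives a regular message of the form "/start <token>", which is what extract_token parses.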

I meant well, but users did not trust it. Only every tenth user logged in (back when it was mandatory). So in the following updates I had to explain why the site can be trusted. All the points about HTTPS, about requests being made exclusively from the client side, and about the open source code were published on the authorization site and duplicated in the bot's welcome message. Things got better.

Two other factors behind the low authorization rate turned out to be the inaccessibility of t.me in Russia and the fact that only users with a subscription were supported (more on this in the notes, section 7).

The first problem was solved by using the tg:// URI, with a link to a mirror as a last resort (if the automatic redirect did not work and the button did not help). The second one is, again, covered in section 7.


2. Data format


For this bot I decided to use a NoSQL database for the first time: MongoDB. I figured out when to embed a document and when not to, and I liked working with mongoengine. But I really did not want to store all the button data on my side and give the client only the ID of a record in the database. The amount of data scared me, and I wanted to use a free Mongo server with a 512 MB storage limit. Coming up with an elegant way to reuse records when several buttons carry the same data, and to clean up stale ones, is much harder than storing everything in the buttons themselves. After analyzing the size of the data to be stored, I concluded that it fits easily.

At first I simply used JSON, but I abandoned it very quickly after running into the limit: in Telegram, button data can be no more than 64 bytes in UTF-8.

So, on a friend's advice, I started looking at pack from the struct module. That is how the query types, the primitive format, and the packing and unpacking were born. It is now used absolutely everywhere in the bot.

The format is very simple. The first byte is the type, the second is the version. Everything else is data for the particular type. Types are stored as an Enum and have an ID, which is that first byte. Besides the ID, each type has a format for packing and unpacking its data. For example, the type SHOW_TRACK_MENU has the format "s?", where "s" is the unique identifier of the track and "?" says whether the track has lyrics.
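The ext.query_types module is not shown in the article, so the following is only a guess at what such an Enum could look like. The numeric IDs and the SHOW_ALBUM member are made up for illustration; only SHOW_TRACK_MENU and its "s?" format come from the text.

```python
from enum import Enum


class QueryType(Enum):
    """Each member is declared as (one-byte ID, struct format of its payload)."""

    def __new__(cls, id_, format_):
        member = object.__new__(cls)
        member._value_ = id_      # lets QueryType(1) find a member by its ID byte
        member.format = format_   # struct format characters for the payload
        return member

    SHOW_TRACK_MENU = (1, 's?')   # ID 1 is an assumption; the format is from the article
    SHOW_ALBUM = (2, 'i')         # hypothetical member, for illustration only


track_menu = QueryType(1)
```

With this layout, .value gives the byte that goes first in the button data, and .format tells the packing code how to treat everything after the two header bytes.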

Track IDs use the string type for two reasons: first, a track ID can be the track ID and the album ID joined with a colon, and second, it can be a UUID. Tracks with a UUID are user-uploaded tracks, available only to the user who uploaded them.

Since the data does not always match the format (the same track ID can arrive as a plain number, for example), it has to be cast to the type required by the format before packing, in this case s. So the class has a method that normalizes the passed data for packing, so that you do not have to do it yourself when passing data to the constructor.

Strings are self-sufficient: they record their length when packed and take it into account when unpacked.
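The length-prefix idea can be shown in isolation. This is a simplified sketch, not the bot's code: pack_str and unpack_str are hypothetical helpers, and the payload mimics the "s?" format with a made-up track ID.

```python
import struct


def pack_str(value: str) -> bytes:
    """One signed length byte followed by the UTF-8 bytes of the string."""
    raw = value.encode("utf-8")
    return struct.pack("b", len(raw)) + struct.pack(f"{len(raw)}s", raw)


def unpack_str(buf: bytes, offset: int = 0):
    """Read a length-prefixed string; return it with the offset of the next field."""
    (length,) = struct.unpack_from("b", buf, offset)
    (raw,) = struct.unpack_from(f"{length}s", buf, offset + 1)
    return raw.decode("utf-8"), offset + 1 + length


# Payload for the "s?" format: a track ID string, then a "has lyrics" flag.
payload = pack_str("12345:678") + struct.pack("?", True)

track_id, next_offset = unpack_str(payload)
(has_lyrics,) = struct.unpack_from("?", payload, next_offset)
```

A signed byte caps the string at 127 bytes, which is far more than the 64-byte button limit allows anyway.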

Support for older versions was not planned, so an exception is raised if the versions do not match. When updates are processed (discussed in the next section), the necessary logic is called.

Since Telegram accepts only UTF-8, the packed data is encoded in base85. Yes, I lose some speed here and gain only a little in size compared to base64, but given how small the data is, I consider base85 appropriate.
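To put a number on "a little in size", this small sketch measures how many raw bytes fit under the 64-byte limit with each encoding from the standard base64 module. The helper name is hypothetical.

```python
import base64

LIMIT = 64  # button data limit in bytes, as stated above


def max_raw_bytes(encode) -> int:
    """Largest raw payload whose encoded form still fits into LIMIT."""
    size = 0
    while len(encode(bytes(size + 1))) <= LIMIT:
        size += 1
    return size


b64_capacity = max_raw_bytes(base64.b64encode)
b85_capacity = max_raw_bytes(base64.b85encode)
```

On CPython this gives 48 raw bytes for base64 and 51 for base85, so base85 buys three extra bytes per button.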

Source. File callback_data.py
import struct
import base64

from ext.query_types import QueryType


class BadDataVersion(Exception):
    pass


class CallbackData:
    ACTUAL_VERSION = 7
    BASE_FORMAT = '<BB'

    def __init__(self, type_: QueryType, data=None, version=None):
        self.type = type_
        self.version = version or CallbackData.ACTUAL_VERSION
        self.data = data

        if self.data is not None and not isinstance(self.data, list):
            self.data = [self.data]

        if self.data is not None:
            self.data = self._normalize_data_to_format(self.data)

    def __repr__(self):
        return f'<CallbackData> Type: {self.type} Version: {self.version} Data: {self.data}'

    def _normalize_data_to_format(self, data, bytes_object=False):
        normalized_data = data.copy()
        for i, c in enumerate(self.type.format):
            cast = str
            if c.lower() in 'bhilqn':
                cast = int
            elif c in 'efd':
                cast = float
            elif c == '?':
                cast = bool

            casted = cast(data[i])
            if bytes_object and cast == str:
                casted = casted.encode('utf-8')
            normalized_data[i] = casted

        return normalized_data

    @staticmethod
    def decode_type(callback_data):
        decoded = base64.b85decode(callback_data.encode('utf-8'))
        type_, version = struct.unpack(CallbackData.BASE_FORMAT, decoded[:2])

        if CallbackData.ACTUAL_VERSION != version:
            raise BadDataVersion()

        return QueryType(type_), version, decoded

    @classmethod
    def decode(cls, type_, version, decoded):
        start, data = 2, []

        if start < len(decoded):
            format_iter = iter(type_.format)

            while True:
                if start >= len(decoded):
                    break

                format_ = next(format_iter, type_.format[-1])

                decode_str = format_ in 'ps'
                if decode_str:
                    # struct.calcsize('b') = 1
                    length = list(struct.unpack('b', decoded[start: start + 1]))[0]
                    start += 1

                    format_ = f'{length}{format_}'

                step = struct.calcsize(format_)

                unpacked = list(struct.unpack(f'{format_}', decoded[start: start + step]))
                if decode_str:
                    unpacked[0] = unpacked[0].decode('UTF-8')

                data += unpacked
                start += step

        return cls(type_, data if data else None, version)

    def encode(self):
        encode = struct.pack(self.BASE_FORMAT, self.type.value, self.version)

        if self.data is not None:
            format_iter = iter(self.type.format)
            normalized_data = self._normalize_data_to_format(self.data, bytes_object=True)

            for data in normalized_data:
                format_ = next(format_iter, self.type.format[-1])

                if format_ in 'ps':
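                    # Strings are self-sufficient: store the length first. 'b' (one byte)
                    # is enough, since a string here is never longer than 36 characters.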
                    encode += struct.pack('b', len(data))
                    encode += struct.pack(f'{len(data)}{format_}', data)
                else:
                    encode += struct.pack(f'{format_}', data)

        return base64.b85encode(encode).decode('utf-8')



3. Routing updates and context


The project uses the python-telegram-bot library to work with the Telegram Bot API. It already has a system for registering handlers for particular kinds of incoming updates, with filters for regular expressions, commands and so on. But given my own data format and my own types, I had to inherit from TelegramHandler and implement my own Handler.

The update and the context are passed to every handler as arguments. In my case I have my own context, and it is built in Handler: getting the user from the database and/or adding them to it, checking that the token used to access music is still valid, and initializing the Yandex.Music client depending on the authorization status and on whether the user has a subscription.

More specific handlers derive from my Handler, for example CallbackQueryHandler. With it, a handler is registered for a particular type of update (my type, with its data format). To check whether an update suits the current handler, not all the data is unpacked, only the first two bytes. This is the stage where it is decided whether the callback needs to run. Only if it does is the data unpacked in full and passed to the final handler as kwargs. Analytics data is also sent to ChatBase at this point.

Registration is sequential, and whoever is registered earlier has higher priority (just like routing in Django and in other projects). That is why the handler for outdated versions is registered first among the CallbackQuery handlers.

The logic for handling an outdated version is simple: tell the user about it and send updated data if possible.
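The routing idea can be sketched without python-telegram-bot at all. The Router class below is a hypothetical stand-in for the real handler classes, which the article does not show. It only demonstrates the two points made above: peeking at the first two bytes, and first-registered-wins ordering with the outdated-version handler in front.

```python
import base64
import struct

ACTUAL_VERSION = 7  # same value as CallbackData.ACTUAL_VERSION in the listing above


def peek_header(callback_data: str):
    """Decode the button data but unpack only the type and version bytes."""
    decoded = base64.b85decode(callback_data.encode("utf-8"))
    type_id, version = struct.unpack("<BB", decoded[:2])
    return type_id, version


class Router:
    """Handlers are tried in registration order; the first match wins."""

    def __init__(self):
        self._routes = []  # list of (predicate, callback) pairs

    def register(self, predicate, callback):
        self._routes.append((predicate, callback))

    def dispatch(self, callback_data: str):
        type_id, version = peek_header(callback_data)
        for predicate, callback in self._routes:
            if predicate(type_id, version):
                return callback(callback_data)
        return None


router = Router()
# Registered first, so buttons with a stale version never reach the regular handlers.
router.register(lambda t, v: v != ACTUAL_VERSION, lambda data: "outdated")
# Type ID 1 is a made-up value for illustration.
router.register(lambda t, v: t == 1, lambda data: "show track menu")
```

In the real bot the full unpacking into kwargs would happen inside the matched callback, after the cheap two-byte check has passed.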

4. Services


All services are initialized at bot startup in one controlling class, which is then used everywhere in the bot (DJ).

Each service has its own ThreadPoolExecutor with a certain number of workers, to which tasks are submitted.
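A minimal sketch of this arrangement, using only the standard library. The class names, service names and worker counts are assumptions made for illustration; the article does not show the real ones.

```python
from concurrent.futures import ThreadPoolExecutor


class Service:
    """A service that owns its own pool of workers."""

    def __init__(self, name: str, max_workers: int):
        self.name = name
        self._executor = ThreadPoolExecutor(max_workers=max_workers,
                                            thread_name_prefix=name)

    def submit(self, fn, *args, **kwargs):
        """Queue a task; returns a concurrent.futures.Future."""
        return self._executor.submit(fn, *args, **kwargs)

    def shutdown(self):
        self._executor.shutdown(wait=True)


class ServiceRegistry:
    """Created once at startup and then passed to whoever needs a service."""

    def __init__(self):
        self.reupload = Service("reupload", max_workers=4)
        self.subscriptions = Service("subscriptions", max_workers=2)

    def shutdown(self):
        self.reupload.shutdown()
        self.subscriptions.shutdown()


services = ServiceRegistry()
future = services.reupload.submit(lambda track_id: f"uploaded {track_id}", "12345:678")
result = future.result(timeout=5)
services.shutdown()
```

Separate pools keep a slow service (large uploads, for instance) from starving the others of workers.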

Re-uploading a track to Telegram


This service has not yet been rewritten to use a user bot, which would bypass Telegram's limit on the size of an uploaded file. As it turned out, Yandex.Music does have files larger than 50 MB: podcasts.

The service checks the file size and, if the limit is exceeded, shows the user an alert. Thanks to the caching system described in section 5, the presence of lyrics is checked and the lyrics are fetched here as well. Tracks are cached too: the hash of the file is stored in the database, and if it is there, the audio is sent using the known hash.

If the file is not in the database, a direct link is obtained from Yandex.Music. Users cannot change quality settings at the moment, so everyone has the default values, but the file is still selected by the bitrate and codec from the user's settings.
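The two checks described here, the size limit and the choice of a file by codec and bitrate, could look roughly like this. It is a sketch on plain dictionaries: the key names, the sample values and the exact byte value of the 50 MB limit are assumptions, not the bot's actual code.

```python
MAX_UPLOAD_BYTES = 50 * 1024 * 1024  # the 50 MB limit mentioned above, assumed binary megabytes


def fits_upload_limit(size_in_bytes: int) -> bool:
    """True if the file can be uploaded to Telegram by a regular bot."""
    return size_in_bytes <= MAX_UPLOAD_BYTES


def choose_download(variants, codec: str, bitrate: int):
    """Pick the variant matching the codec and bitrate from the user's settings."""
    for variant in variants:
        if variant["codec"] == codec and variant["bitrate_in_kbps"] == bitrate:
            return variant
    return None


# Made-up list of available files for one track.
variants = [
    {"codec": "aac", "bitrate_in_kbps": 128},
    {"codec": "mp3", "bitrate_in_kbps": 192},
    {"codec": "mp3", "bitrate_in_kbps": 320},
]
chosen = choose_download(variants, codec="mp3", bitrate=192)
```

Returning None when nothing matches leaves the fallback policy (for example, picking the closest bitrate) to the caller.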

The file and its cover are downloaded as tempfile.TemporaryFile(), after which they are uploaded to Telegram. It is worth noting that Telegram does not always detect the track duration correctly, to say nothing of the artist and the title. So this data is taken from Yandex; fortunately, it can be passed to Telegram.

When this service has sent the audio file, finished_callback() is called, signaling to the subscription service that the upload is complete.

Subscription service for receiving tracks and sending the upload status


Tracks are not uploaded instantly, and several users may request the same track. In that case, the user who requested it first is the initiator and the owner of the track. If the re-upload takes longer than a second, the upload status starts being sent: "The user is sending a voice message". The other users who requested the same track are ordinary subscribers. They, like the owner, are sent the upload status roughly once every 4 seconds so that it is not interrupted (the status stays visible for 5 seconds). As soon as the upload for the owner is finished, finished_callback() is called from the service above. After that, all subscribers are removed from the status mailing list and receive the uploaded track.

Architecturally, the owner of the track is also a subscriber, but with a special mark, because the track is sent to them in a different way.
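The owner and subscriber bookkeeping can be sketched as a small class. This is a simplified, single-threaded illustration with hypothetical names (only finished_callback comes from the article). A real version running on a thread pool would need a lock around the dictionary and a timer for the periodic status.

```python
class TrackSubscriptions:
    """Tracks who is waiting for which upload; the first requester is the owner."""

    def __init__(self):
        self._subscribers = {}  # track_id -> list of (chat_id, is_owner)

    def subscribe(self, track_id, chat_id) -> bool:
        """Register a request; return True if this chat becomes the owner."""
        subscribers = self._subscribers.setdefault(track_id, [])
        is_owner = not subscribers
        subscribers.append((chat_id, is_owner))
        return is_owner

    def pending(self, track_id):
        """Chats that should keep receiving the upload status, owner included."""
        return [chat_id for chat_id, _ in self._subscribers.get(track_id, [])]

    def finished_callback(self, track_id):
        """Drop everyone from the status list; return chats that still need the track."""
        subscribers = self._subscribers.pop(track_id, [])
        return [chat_id for chat_id, is_owner in subscribers if not is_owner]


subscriptions = TrackSubscriptions()
first_is_owner = subscriptions.subscribe("12345:678", chat_id=10)
second_is_owner = subscriptions.subscribe("12345:678", chat_id=20)
waiting = subscriptions.pending("12345:678")
to_notify = subscriptions.finished_callback("12345:678")
```

The owner is excluded from the final list because, in this sketch, the owner receives the track from the upload itself, while the others get the already uploaded file.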

5. Query caching


As we remember from my previous article, requests to the Yandex.Music API are very heavy. A list of tracks can weigh 3 or 5 MB. On top of that, there are simply a lot of requests. Every processed update sends at least 2 requests to Yandex: one to initialize the client and one for the specific action. In some places, to gather enough information (for a playlist, for example), you need a request for the playlist, one for its tracks, one for information from the landing page (if it is a smart playlist), and do not forget client initialization. All in all, the number of requests is a quiet horror.

I wanted something truly universal rather than some storage for particular objects, such as the clients.

Since the library lets you supply your own instance for executing requests (on top of requests), I took advantage of that.

The idea is simple. The cache class itself is a singleton. It has only two parameters: the cache lifetime and the size. When a request is executed, a wrapper is called, and that wrapper is overridden. The cache is looked up by the hash of the frozen args and kwargs. Every cache entry stores the time it was added. When checking whether the data needs to be refreshed, either the data is taken from LimitedSizeDict, or a real request is made and its result is added to the cache.

Some requests must not be cached, for example setting a like or dislike. If the user presses like, dislike, like in sequence, the final like would not be set. For such cases, pass the use_cache argument with the value False when sending the request. In fact, this is the only place where the cache is not used.
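The like, dislike, like problem is easy to reproduce with a toy cache. The TtlCache class and the set_like function below are hypothetical and much simpler than the real classes; they only show why a state-changing request has to bypass the cache.

```python
import time


class TtlCache:
    """A tiny time-based cache keyed by the function name and its arguments."""

    def __init__(self, lifetime_seconds: float):
        self.lifetime = lifetime_seconds
        self._storage = {}  # key -> (timestamp, value)

    def call(self, fn, *args, use_cache=True):
        key = (fn.__name__, args)
        cached = self._storage.get(key)
        if use_cache and cached and time.time() - cached[0] <= self.lifetime:
            return cached[1]  # served from the cache, fn is not called
        value = fn(*args)
        if use_cache:
            self._storage[key] = (time.time(), value)
        return value


server_log = []  # stands in for the requests that actually reach the API


def set_like(track_id, liked):
    server_log.append((track_id, liked))
    return "ok"


cache = TtlCache(lifetime_seconds=60)

# With caching, the third press repeats the first request and is served from the cache.
for liked in (True, False, True):
    cache.call(set_like, "12345", liked)
cached_calls = list(server_log)

# With use_cache=False, every press reaches the "server".
server_log.clear()
for liked in (True, False, True):
    cache.call(set_like, "12345", liked, use_cache=False)
uncached_calls = list(server_log)
```

In the cached run the last request the server sees is the dislike, so the track ends up disliked even though the user's last press was a like.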

Thanks to this, I make the fattest requests possible so that they get cached. I do not try to split them into small ones that fetch only what the current page needs. I take everything at once, and switching between pages is extremely fast (compared with the old approach).

To my taste, the cached request class turned out beautiful and was easy to integrate.

Source
import copy
import time

from typing import Union
from collections import OrderedDict

import requests

from yandex_music.utils.request import Request as YandexMusicRequest


class LimitedSizeDict(OrderedDict):
    def __init__(self, *args, **kwargs):
        self.size_limit = kwargs.pop("size_limit", None)
        OrderedDict.__init__(self, *args, **kwargs)
        self._check_size_limit()

    def __setitem__(self, key, value):
        OrderedDict.__setitem__(self, key, value)
        self._check_size_limit()

    def _check_size_limit(self):
        if self.size_limit is not None:
            while len(self) > self.size_limit:
                self.popitem(last=False)


class CachedItem:
    def __init__(self, response: requests.Response):
        self.timestamp = time.time()
        self.response = response


class Cache:
    __singleton: 'Cache' = None

    def __init__(self, lifetime: Union[str, int], size: Union[str, int]):
        Cache.__singleton = self

        self.lifetime = int(lifetime) * 60
        self.size = int(size)

        self.storage: LimitedSizeDict = LimitedSizeDict(size_limit=int(size))

    def get(self, *args, **kwargs):
        hash_ = Cache.get_hash(*args, **kwargs)

        return self.storage[hash_].response

    def update(self, response: requests.Response, *args, **kwargs):
        hash_ = Cache.get_hash(*args, **kwargs)
        self.storage.update({hash_: CachedItem(response)})

    def need_to_fetch(self, *args, **kwargs):
        hash_ = Cache.get_hash(*args, **kwargs)
        cached_item = self.storage.get(hash_)

        if not cached_item:
            return True

        if time.time() - cached_item.timestamp > self.lifetime:
            return True

        return False

    @classmethod
    def get_instance(cls) -> 'Cache':
        if cls.__singleton is not None:
            return cls.__singleton
        else:
            raise RuntimeError(f'{cls.__name__} not initialized')

    @staticmethod
    def freeze_dict(d: dict) -> Union[frozenset, tuple, dict]:
        if isinstance(d, dict):
            return frozenset((key, Cache.freeze_dict(value)) for key, value in d.items())
        elif isinstance(d, list):
            return tuple(Cache.freeze_dict(value) for value in d)

        return d

    @staticmethod
    def get_hash(*args, **kwargs):
        return hash((args, Cache.freeze_dict(kwargs)))


class Request(YandexMusicRequest):
    def _request_wrapper(self, *args, **kwargs):
        use_cache = kwargs.get('use_cache', True)

        if 'use_cache' in kwargs:
            kwargs.pop('use_cache')

        if not use_cache:
            response = super()._request_wrapper(*args, **copy.deepcopy(kwargs))
        elif use_cache and Cache.get_instance().need_to_fetch(*args, **kwargs):
            response = super()._request_wrapper(*args, **copy.deepcopy(kwargs))
            Cache.get_instance().update(response, *args, **kwargs)
        else:
            response = Cache.get_instance().get(*args, **kwargs)

        return response



6. Track recognition by voice message


At first I had no intention of adding this to the bot, but an interesting thing happened. The bot has a chat (it is listed in the bot's description). After a while I noticed that people were joining it and sending voice messages with music. At first I thought it was a new kind of spam, that someone had set up bots and was having fun. But when there were already 10 such people, all doing the same thing, my friend (the same one who suggested struct) guessed that users type Yandex.Music into the search looking for an official music recognition bot from Yandex, see the chat there, and send voice messages to it in all seriousness! It was a brilliant guess, and a correct one. I then said as a joke that it was time to build recognition and add the bot to the chat. Just for fun... After a while, it was done!

Now about the API. Lately Yandex has been using WebSockets more and more. I have seen them used to control Yandex.Module and Yandex.Station. The music recognition service works over them too. I put a minimal working solution into my bot, but did not add an implementation to the library.

The WebSocket is located at wss://voiceservices.yandex.net/uni.ws
We need only two messages: authorization and a Recognize request.
Yandex itself, in its official application, sends short files of a second or three. The response is either a request to send more data or the result: found or not found. If the track is found, its ID is returned. The .ogg files are sent with ENCODER=SpeechKit Mobile SDK v3.28.0. I did not check how it works with other encoders; I simply replace the encoder in the file recorded by Telegram.
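The possible outcomes of a response can be summarized in one small function. The field names (header.name, payload.result, payload.data.match.id) and the values "MusicResult", "success" and "music" are taken from the implementation in this article. Reading "music" as "keep waiting" is my interpretation of that code, not documented protocol behavior, and the sample directives are made up.

```python
def classify(directive: dict) -> str:
    """Map a decoded 'directive' object to one of: found, keep_waiting, stop."""
    header = directive.get("header", {})
    payload = directive.get("payload", {})

    if header.get("name") != "MusicResult":
        return "stop"            # anything unexpected ends the attempt
    if payload.get("result") == "success":
        return "found"           # payload.data.match.id holds the track ID
    if payload.get("result") == "music":
        return "keep_waiting"    # assumed meaning: no final answer yet
    return "stop"


# Made-up directives shaped like the ones the implementation expects.
found = {"header": {"name": "MusicResult"},
         "payload": {"result": "success", "data": {"match": {"id": "12345"}}}}
waiting = {"header": {"name": "MusicResult"}, "payload": {"result": "music"}}
failed = {"header": {"name": "MusicResult"}, "payload": {"result": "not-music"}}

outcomes = [classify(d) for d in (found, waiting, failed)]
track_id = found["payload"]["data"]["match"]["id"]
```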

While reverse engineering the WebSocket, magic sometimes happened. Sometimes the track could not be found, but when I changed the language in the recognition request, it was. Or at first it was found and then it stopped being found, although the file was the same. I assumed that the language of the track is set by their SpeechKit on the client. Since I cannot do that myself, I brute-force the languages.

My quick-and-dirty implementation of track recognition from Telegram voice messages
import uuid

import lomond


def get_auth_data():
    return {
        "event": {
            "header": {
                "messageId": str(uuid.uuid4()),
                "name": "SynchronizeState",
                "namespace": "System"
            },
            "payload": {
                "accept_invalid_auth": True,
                "auth_token": "5983ba91-339e-443c-8452-390fe7d9d308",
                "uuid": str(uuid.uuid4()).replace('-', ''),
            }
        }
    }


def get_asr_data():
    return {
        "event": {
            "header": {
                "messageId": str(uuid.uuid4()),
                "name": "Recognize",
                "namespace": "ASR",
                "streamId": 1
            },
            "payload": {
                "advancedASROptions": {
                    "manual_punctuation": False,
                    "partial_results": False
                },
                "disableAntimatNormalizer": False,
                "format": "audio/opus",
                "music_request2": {
                    "headers": {
                        "Content-Type": "audio/opus"
                    }
                },
                "punctuation": False,
                "tags": "PASS_AUDIO;",
                "topic": "queries"
            }
        }
    }


class Recognition:
    URI = 'wss://voiceservices.yandex.net/uni.ws'
    LANGS = ['', 'ru-RU', 'en-US']
    POLL_DELAY = 0.3

    def __init__(self, binary_data, status_msg):
        self.status_msg = status_msg
        self.websocket = lomond.WebSocket(self.URI)
        self.chunks = self.get_chunks_and_replace_encoder(binary_data)

    def get_track_id(self):
        for lang in Recognition.LANGS:
            asr_data = get_asr_data()
            if lang:
                asr_data['event']['payload'].update({'lang': lang})
                self.status_msg.edit_text(f'Trying language {lang}...')
            else:
                self.status_msg.edit_text('Recognizing...')

            for msg in self.websocket.connect(poll=self.POLL_DELAY):
                if msg.name == 'ready':
                    self.websocket.send_json(get_auth_data())
                    self.websocket.send_json(asr_data)

                    for chunk in self.chunks:
                        self.websocket.send_binary(chunk)

                if msg.name == 'text':
                    response = msg.json.get('directive')

                    if self.is_valid_response(response):
                        self.websocket.close()

                        return self.parse_track_id(response)
                    elif self.is_fatal_error(response):
                        self.websocket.close()

                        break

    def is_valid_response(self, response):
        if response.get('header', {}).get('name') == 'MusicResult' and \
                response.get('payload', {}).get('result') == 'success':
            self.status_msg.edit_text('Found it!')
            return True
        return False

    @staticmethod
    def is_fatal_error(response):
        if response.get('header', {}).get('name') == 'MusicResult' and \
                response.get('payload', {}).get('result') == 'music':
            return False
        return True

    @staticmethod
    def parse_track_id(response):
        return response['payload']['data']['match']['id']

    @staticmethod
    def get_chunks_and_replace_encoder(binary_data):
        chunks = []

        for chunk in binary_data.split(b'OggS')[1:]:
            if b'OpusTags' in chunk:
                pos = chunk.index(b'OpusTags') + 12
                size = len(chunk)
                chunk = chunk[:pos] + b'#\x00\x00\x00\x00ENCODER=SpeechKit Mobile SDK v3.28.0'
                chunk += b"\x00" * (size - len(chunk))

            chunks.append(b'\x00\x00\x00\x01OggS' + chunk)

        return chunks



7. Small notes


Initially only users with a subscription could use the bot, because without a subscription the service is available in a limited number of countries, and the server running the bot is located in Europe. The problem was solved by using a proxy for requests from users without a subscription. The proxy server is located in Moscow.

There is page selection, but it is limited to 100 pages (more buttons cannot be added, a Telegram restriction). Some common queries have many more pages than that.
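A rough sketch of how the page list could be capped. The function names and the per-page value in the example are assumptions; only the limit of 100 buttons comes from the note above.

```python
import math

MAX_PAGE_BUTTONS = 100  # Telegram restriction mentioned above


def page_numbers(total_items: int, per_page: int):
    """Page numbers that can be offered as buttons, capped at the limit."""
    total_pages = math.ceil(total_items / per_page)
    return list(range(1, min(total_pages, MAX_PAGE_BUTTONS) + 1))


def page_slice(items, page: int, per_page: int):
    """Items shown on a 1-based page."""
    start = (page - 1) * per_page
    return items[start:start + per_page]


offered = page_numbers(total_items=5000, per_page=10)  # 500 pages exist, 100 are offered
last_page = page_slice(list(range(25)), page=3, per_page=10)
```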

In Yandex search, the number of items per page is hardcoded: so many tracks, so many playlists. Sometimes it does not even match the amount of data shown on the frontend. When the page changes, the number of items floats, so it jumps in the bot too, which does not look great. And combining their paginator with my own would be a so-so idea. In other places, where a specific number of items per page can be requested, everything is perfect. A voice message recorded after implementing search in the bot: t.me/MarshalC/416.

When audio is forwarded in Telegram, the author is lost and replaced by whoever forwarded it. That is why all tracks are sent with a caption containing the bot's username.

A voice message with everything I ran into after implementing radio in the library: t.me/MarshalC/422 (about the chain of tracks, walking through it while sending a heap of feedback, and batch_id).

Conclusion


Even though this is yet another article about a Telegram bot, you have read this far, most likely because one of the items in the list interested you, and that is wonderful. Thank you very much!

Unfortunately, I have not opened the bot's source code in full (some places need refactoring). A lot has been described in this article, but some aspects, such as virtual keyboards and their generation, were not covered. For the most part, what is not in the article is just work with my library, nothing interesting.

The classes around which everything revolves are shown as they are right now. I admit there may be bugs, but it has all been working for a long time. In some places I like my code, in others I do not, and that is normal. Do not forget that the WebSocket work for recognition is a quick hack. I am ready to read well-argued criticism in the comments.

Although the bot was planned back when I started writing the library, I later gave up on the idea, but apparently I came back to it (I was bored).

Yandex.Music Bot is a project that proves the library is suitable for working with the API in real projects.

Many thanks to Mom, Yana, Sanya and Slava: to some for proofreading, to some for hints without which certain points in this article might not exist, and to some simply for reviewing the article before publication. Thanks to Arthur for the picture for the article and to Lyod for the logo.

P.S. Right now I have an urgent issue with job placement after graduation. If you are ready to submit a request for me, tell me where to send my CV and interview me, please. All my contacts are in my profile. Time is running out, and I am counting on you. By law, I can only do this work placement in the Republic of Belarus.

P.P.S. This was the second article of three, but I have no idea whether I will have time for another project on this topic or whether it is needed. I will reveal its topic publicly: a cross-platform Yandex.Music client.

Source: https://habr.com/ru/post/undefined/

