Como escribí mi mensajero

Una tarde, después de otro día frustrante, lleno de intentos de equilibrar el juego, decidí que necesitaba un descanso urgente. ¡Cambiaré a otro proyecto, lo haré rápidamente, devolveré la autoestima que se ha reducido durante el desarrollo del juego y lo tomará por asalto con renovado vigor! Lo principal es elegir un proyecto agradable y relajante ... ¿Escribir su propio mensajero? ¡Decir ah! ¿Qué tan difícil puede ser?

El código se puede encontrar aquí .


Breves antecedentes


Durante casi un año antes de comenzar a trabajar en el messenger, había estado trabajando en el juego multijugador en línea Line Tower Wars. La programación fue bien, todo lo demás (equilibrio y visual en particular) no fue muy bueno. De repente resultó que hacer un juego y hacer un juego divertido (divertido para alguien que no sea él mismo) son dos cosas diferentes. Después de un año de prueba, necesitaba distraerme, así que decidí probar algo más. La elección recayó en el desarrollo móvil, a saber, Flutter. Escuché muchas cosas buenas sobre Flutter, y me gustó el dardo después de un breve experimento. Decidí escribir mi propio mensajero. En primer lugar, es una buena práctica implementar tanto el cliente como el servidor. En segundo lugar, habrá algo importante para poner en la cartera para buscar trabajo, solo estoy en el proceso.

Funcionalidad programada


  • Chats privados y grupales
  • Envío de texto, imágenes y videos.
  • Llamadas de audio y video
  • Confirmación de recibo y lectura (ticks de Votsap)
  • "Impresiones ..."
  • Notificaciones
  • Búsqueda por código QR y geolocalización

Mirando hacia el futuro, puedo decir con orgullo (y con alivio) que casi todo lo planeado se ha implementado, y que aún no se ha implementado, se implementará en el futuro cercano.



Selección de idioma


No pensé durante mucho tiempo con la elección del idioma. Al principio, era tentador usar el dardo tanto para el cliente como para el servidor, pero una inspección más detallada mostró que no hay muchos controladores para los dardos disponibles, y aquellos que no inspiran mucha confianza. Aunque no responderé para hablar sobre el momento actual, la situación puede haber mejorado. Entonces mi elección recayó en C #, con el que trabajé en Unity.

Arquitectura


Comenzó pensando en la arquitectura. Por supuesto, considerando que 3 y media personas probablemente usarán mi messenger, uno no tendría que preocuparse por la arquitectura en general. Tomas y haces lo mismo en innumerables tutoriales. Aquí está el nodo, aquí está el mongo, aquí están los sockets web. Hecho. Y Firebase está por aquí. Pero no es interesante. Decidí hacer un mensajero que pueda escalar fácilmente horizontalmente, como si esperara millones de clientes simultáneos. Sin embargo, como no tenía experiencia en esta área, tuve que aprender todo en la práctica por el método de errores y nuevamente errores.

La arquitectura final se ve así


No afirmo que dicha arquitectura sea súper genial y confiable, pero es viable y en teoría debería soportar cargas pesadas y escalar horizontalmente, pero realmente no entiendo cómo verificar. Y espero que no me haya perdido algún momento obvio que todos conozcan, excepto yo.

A continuación se muestra una descripción detallada de los componentes individuales.

Servidor frontend


Incluso antes de comenzar a hacer el juego, me fascinaba el concepto de un servidor asincrónico de un solo subproceso. Efectivamente y sin potencial race'ov: ¿qué más se puede pedir? Para entender cómo se organizan dichos servidores, comencé a profundizar en el módulo de asynciolenguaje python. La solución que vi me pareció muy elegante. En resumen, la solución de pseudocódigo se ve así.
//  ,      ,    
//       .      socket.Receive
//     , :
var bytesReceived = Completer<object>();
selector.Register(
    socket,
    SocketEvent.Receive,
    () => bytesReceived.Complete(null)
);

await bytesReceived.Future;

int n = socket.Receive(...); //   

// selector -     poll.   
//        (Receive 
//  ), ,    ,  .
//   completer,      ,
//        , ,     .
//     ,       .

Usando esta técnica simple, podemos servir una gran cantidad de sockets en un solo hilo. Nunca bloqueamos una secuencia mientras esperamos que se reciban o envíen bytes. La transmisión siempre está ocupada con trabajo útil. Concurrencia, en una palabra.

Los servidores frontend se implementan de esa manera. Todos son de un solo subproceso y asíncronos. Por lo tanto, para obtener el máximo rendimiento, debe ejecutar tantos servidores en una máquina como núcleos (4 en la imagen).

El servidor Frontend lee el mensaje del cliente y, según el código del mensaje, lo envía a uno de los temas en Kafka.

Una pequeña nota al pie para aquellos que no están familiarizados con kafa
, RabbitMQ. . , ( authentication backend authentication, ). ? - , (partition). , . , , . , ( , , , (headers)).

? ? . (consumer) ( consumer'), ( ) . , , , 2 , . 3 — 2. .

El servidor frontend envía un mensaje al kafka sin una clave (cuando no hay clave, el kafka simplemente envía mensajes a la parte por turno). El mensaje es extraído del tema por uno de los servidores de fondo correspondientes. El servidor procesa el mensaje y ... ¿qué sigue? Y lo que más depende del tipo de mensaje.

En el caso más común, se produce un ciclo de solicitud-respuesta. Por ejemplo, para una solicitud de registro, solo necesitamos darle una respuesta al cliente ( Success,EmailAlreadyInUse, etc.) Pero para un mensaje que contiene una invitación a un chat existente de nuevos miembros (Vasya, Emil y Julia), necesitamos responder de inmediato con tres tipos diferentes de mensajes. El primer tipo: debe notificar al invitador sobre el resultado de la operación (de repente se produjo un error del servidor). El segundo tipo: debe notificar a todos los miembros actuales del chat que ahora hay miembros nuevos y nuevos en el chat. El tercero es enviar invitaciones a Vasya, Emil y Yulia.

De acuerdo, eso no parece muy difícil, pero para enviar un mensaje a cualquier cliente necesitamos: 1) averiguar a qué servidor frontend está conectado este cliente (no elegimos a qué servidor se conectará el cliente, el equilibrador decide por nosotros); 2) enviar un mensaje desde el servidor de fondo al servidor de interfaz deseado; 3) de hecho, envíe un mensaje al cliente.

Para implementar los puntos 1 y 2, decidí usar un tema separado (tema "servidores frontend"). La separación de los temas de autenticación, sesión y llamada en particiones sirve como un mecanismo de paralelización. ¿Vemos que el servidor de sesión está muy cargado? Solo agregamos un par de nuevos servidores de partición y sesión, y Kafka redistribuirá la carga por nosotros, descargando los servidores de sesión existentes. La separación del tema "servidores frontend" en la partición sirve como un mecanismo de enrutamiento .

Cada servidor frontend corresponde a una parte del tema "servidores frontend" (con el mismo índice que el propio servidor). Es decir, servidor 0 - partición 0, y así sucesivamente. Kafka hace posible suscribirse no solo a un tema específico, sino también a una parte específica de un tema determinado. Todos los servidores frontend en las nuevas empresas se suscriben a la partición correspondiente. Por lo tanto, el servidor de fondo puede enviar un mensaje a un servidor frontend específico enviando un mensaje a una partición específica.

Bien, ahora cuando el cliente se une, solo necesita guardar en algún lugar un par de UserId - Frontend Server Index. En caso de desconexión - eliminar. Para estos propósitos, cualquiera de las muchas bases de datos clave-valor en memoria funcionará. Elegí un rábano.

Cómo se ve en la práctica. En primer lugar, una vez establecida la conexión, el cliente Andrey envía un mensaje al servidor Join. El servidor Frontend recibe el mensaje y lo reenvía al tema de la sesión, agregando preliminarmente el encabezado "Servidor Frontend": {index}. Uno de los servidores de sesión de fondo recibirá un mensaje, leerá el token de autorización, determinará qué tipo de usuario se ha unido, leerá el índice agregado por el servidor frontend y escribirá UserId - Index en el rábano. A partir de este momento, el cliente se considera en línea, y ahora sabemos a través de qué servidor frontend (y, en consecuencia, a través de qué parte del tema "servidores frontend") podemos "comunicarnos" con él cuando otros clientes envían mensajes a Andrey.

* De hecho, el proceso es un poco más complicado de lo que describí. Puedes encontrarlo en el código fuente.

Seudocódigo del servidor frontend


// Frontend Server 6
while (true) {
    // Consume from "Frontend Servers" topic, partition 6
    var messageToClient = consumer.Consume();
    if (message != null) {
        relayMessageToClient(messageToClient);
    }

    var callbacks = selector.Poll();
    while (callbacks.TryDequeue(out callback)) {
        callback();
    }

    long now = DateTimeOffset.UtcNow.ToUnixTimeMilliseconds();
    while (!callAtQueue.IsEmpty && callAtQueue.PeekPriority() <= now) {
        callAtQueue.Dequeue()();
    }

    while (messagesToRelayToBackendServers.TryDequeue(out messageFromClient)) {
        // choose topic
        producer.Produce(topic, messageFromClient);
    }
}


Hay algunos trucos aquí.
1) relayMessageToClient. Será un error tomar simplemente la toma que desee y de inmediato comenzar a enviar un mensaje a ella, porque a lo mejor estamos ya enviando otro mensaje al cliente. Si comenzamos a enviar bytes sin verificar si el socket está actualmente ocupado, los mensajes se mezclarán. Como en muchos otros lugares donde se requiere el procesamiento ordenado de datos, el truco consiste en utilizar una cola, es decir, una cola de Completers ( TaskCompletionSourceen C #).
void async relayMessageToClient(message) {
    // find client
    await client.ReadyToSend();
    await sendMessage(client, message);
    client.CompleteSend();
}

class Client {
    // ...
    sendMessageQueue = new LinkedList<Completer<object>>();

    async Future ReadyToSend() {
        var sendMessage = Completer<object>();
	if (sendMessageQueue.IsEmpty) {
	    sendMessageQueue.AddLast(sendMessage);
	} else {
	    var prevSendMessage = sendMessageQueue.Last;
	    sendMessageQueue.AddLast(sendMessage);
	    await prevSendMessage.Future;
	}
    }

    void CompleteSend() {
        var sendMessage = sendMessageQueue.RemoveFirst();
	sendMessage.Complete(null);
    }
}

Si la cola no está vacía, el socket ya está ocupado en este momento. Cree uno nuevo completer, agréguelo a la cola y awaital anterior completer . Por lo tanto, cuando se envía el mensaje anterior, se CompleteSendcompletará completer, lo que hará que el servidor comience a enviar el siguiente mensaje. Tal cola también permite propagar excepciones sin problemas. Supongamos que se produce un error al enviar un mensaje a un cliente. En este caso, debemos completar, con la excepción de enviar no solo este mensaje, sino también todos los mensajes que actualmente están esperando en la cola (espere await'ah). Si no lo hacemos, continuarán colgándose y recibiremos una pérdida de memoria. Por brevedad, el código que hace esto no se muestra aquí.

2)selector.Poll. En realidad, ni siquiera es un truco, sino solo un intento de suavizar las deficiencias de la implementación del método Socket.Select( selector- solo una envoltura sobre este método). Dependiendo del sistema operativo bajo el capó, este método utiliza selecto poll. Pero esto no es importante aquí. Lo importante es cómo funciona este método con las listas que lo alimentamos a la entrada (lista de sockets para lectura, escritura, verificación de errores). Este método toma listas, sondea los sockets y deja solo esos sockets en las listas que están listos para realizar la operación requerida. Todos los demás enchufes se eliminan de las listas. La "patada" ocurre a través deRemoveAt(es decir, todos los elementos posteriores se desplazan, lo que es ineficiente). Además, dado que necesitamos sondear todos los enchufes registrados en cada iteración del ciclo, tal "limpieza" es generalmente dañina, tenemos que volver a llenar las listas cada vez. Podemos solucionar todos estos problemas utilizando uno personalizado List, RemoveAtcuyo método no elimina el elemento de la lista, sino que simplemente lo marca como eliminado. La clase ListForPollinges mi implementación de dicha lista. ListForPollingsolo funciona con el método Socket.Selecty no es adecuado para nada más.

3)callAtQueue. En la mayoría de los casos, el servidor frontend, después de haber enviado el mensaje del cliente al servidor back-end, espera una respuesta (confirmación de que la operación fue exitosa o un error si algo salió mal). Si no espera una respuesta dentro de un período de tiempo configurable, envía un error al cliente para que no espere una respuesta que nunca llegará. callAtQueueEs una cola prioritaria. Inmediatamente después de que el servidor envía el mensaje a Kafka, hace algo como esto:
long now = DateTimeOffset.UtcNow.ToUnixTimeMilliseconds();
callAtQueue.Enqueue(callback, now + config.WaitForReplyMSec);

En la devolución de llamada, se cancela la espera de una respuesta y comienza el envío de error del servidor. Si se recibe una respuesta del servidor de fondo, la devolución de llamada no hace nada. No hay forma de

usarlo await Task.WhenAny(answerReceivedTask, Task.Delay(x)), ya que el código después de que Task.Delayse ejecuta en el subproceso del grupo.

Aquí, de hecho, todo sobre los servidores frontend. Se requiere una ligera corrección aquí. De hecho, el servidor no está completamenteUn solo hilo. Por supuesto, kafka debajo del capó usa hilos, pero me refiero al código de la aplicación. El hecho es que enviar un mensaje al tema de kafka (producir) puede no tener éxito. En caso de falla, Kafka repite el envío de un cierto número de veces configurables, pero, si fallan las salidas repetidas, Kafka abandona este negocio como desesperado. Puede verificar si el mensaje se envió con éxito o no en el deliveryHandlerque pasamos al método Produce. Kafka llama a este controlador en el hilo de E / S del productor (el hilo que envía mensajes). Debemos asegurarnos de que el mensaje se haya enviado correctamente y, si no, cancelar la espera de una respuesta del servidor de fondo (la respuesta no llegará porque la solicitud no se envió) y enviar un error al cliente. Es decir, no podemos evitar interactuar con otro hilo.

* Al escribir un artículo, de repente me di cuenta de que no podemos pasar deliveryHandleral método Produceo simplemente ignorar todos los errores de kafka (el error aún se enviará al cliente por el tiempo de espera que describí anteriormente), entonces todo nuestro código será de un solo subproceso. Ahora estoy pensando en cómo hacerlo mejor.

¿Por qué, de hecho, kafka, no conejo?
, , , , , RabbitMQ? . , , . ? , frontend . , backend , , . , , . , error-prone. , basicGet , , , . . basicGet, , . .


Servidor de fondo


En comparación con el servidor frontend, prácticamente no hay puntos interesantes aquí. Todos los servidores de fondo funcionan de la misma manera. Al inicio, el servidor se suscribe al tema (autenticación, sesión o llamada según el rol), y el kafka le asigna una o más particiones. El servidor recibe el mensaje de Kafka, procesa y generalmente envía uno o más mensajes en respuesta. Código casi real:
void Run() {
    long lastCommitTime = DateTimeOffset.UtcNow.ToUnixTimeMilliseconds();

    while (true) {
        var consumeResult = consumer.Consume(
            TimeSpan.FromMilliseconds(config.Consumer.PollTimeoutMSec)
        );

        if (consumeResult != null) {
            var workUnit = new WorkUnit() {
                ConsumeResult = consumeResult,
            };

            LinkedList<WorkUnit> workUnits;
            if (partitionToWorkUnits.ContainsKey(consumeResult.Partition)) {
                workUnits = partitionToWorkUnits[consumeResult.Partition];
            } else {
                workUnits = partitionToWorkUnits[consumeResult.Partition] =
                    new LinkedList<WorkUnit>();
            }

            workUnits.AddLast(workUnit);

            handleWorkUnit(workUnit);
        }

	if (
            DateTimeOffset.UtcNow.ToUnixTimeMilliseconds() - lastCommitTime >=
            config.Consumer.CommitIntervalMSec
        ) {
            commitOffsets();
	    lastCommitTime = DateTimeOffset.UtcNow.ToUnixTimeMilliseconds();
	}
    }
}

¿Qué tipo de compensaciones para comprometer?
. — (offset) (0, 1 ). 0. TopicPartitionOffset. (consume) , ConsumeResult, , , TopicPartitionOffset. ?

at least once delivery, , ( ). , (commited) . , consumer 16, , 16 , , , . - consumer' consumer' , 16 + 1 ( + 1). 17 . N , .

Deshabilité el autocompromiso y el compromiso. Esto es necesario porque handleWorkUnit, cuando el procesamiento del mensaje se lleva a cabo realmente, este es un async voidmétodo, por lo tanto, no hay garantía de que el mensaje 5 se procesará antes del mensaje 6. Kafka almacena solo un desplazamiento comprometido (y no un conjunto de desplazamiento), respectivamente, antes de confirmar el desplazamiento 6, debemos asegurarnos de que todos los mensajes anteriores también se hayan procesado. Además, un servidor de fondo puede consumir mensajes de varias particiones al mismo tiempo y, por lo tanto, debe asegurarse de que confirma el desplazamiento correcto en la partición correspondiente. Para esto, usamos un mapa hash de la partición de formulario: unidades de trabajo. Así es como se ve el código commitOffsets(código real esta vez):
private void commitOffsets() {
    foreach (LinkedList<WorkUnit> workUnits in partitionToWorkUnits.Values) {
        WorkUnit lastFinishedWorkUnit = null;
        LinkedListNode<WorkUnit> workUnit;
        while ((workUnit = workUnits.First) != null && workUnit.Value.IsFinished) {
            lastFinishedWorkUnit = workUnit.Value;
            workUnits.RemoveFirst();
        }

        if (lastFinishedWorkUnit != null) {
            offsets.Add(lastFinishedWorkUnit.ConsumeResult.TopicPartitionOffset);
        }
    }

    if (offsets.Count > 0) {
        consumer.Commit(offsets);
        foreach (var offset in offsets) {
            logger.Debug(
                "{Identifier}: Commited offset {TopicPartitionOffset}",
                identifier,
                offset
            );
        }
        offsets.Clear();
    }
}

Como puede ver, iteramos sobre las unidades, encontramos la última unidad completada en este momento, después de lo cual no hay unidades incompletas , y confirmamos el desplazamiento correspondiente. Tal bucle nos permite evitar cometer "holey". Por ejemplo, si actualmente tenemos 4 unidades ( 0: Finished, 1: Not Finished, 2: Finished, 3: Finished), solo podemos comprometer la unidad 0, ya que si confirmamos la tercera de inmediato, esto puede conducir a la pérdida potencial de la primera si el servidor muere en este momento.
class WorkUnit {
    public ConsumeResult<Null, byte[]> ConsumeResult { get; set; }
    private int finished = 0;

    public bool IsFinished => finished == 1;

    public void Finish() {
        Interlocked.Increment(ref finished);
    }
}


handleWorkUnitcomo se dijo, el async voidmétodo y, en consecuencia, está completamente envuelto try-catch-finally. En tryél llama al servicio necesario, y en finally- workUnit.Finish().

Los servicios son bastante triviales. Aquí, por ejemplo, qué código se ejecuta cuando el usuario envía un nuevo mensaje:
private async Task<ServiceResult> createShareItem(CreateShareItemMessage msg) {
    byte[] message;
    byte[] messageToPals1 = null;
    int?[] partitions1 = null;

    //  UserId  .
    long? userId = hashService.ValidateSessionIdentifier(msg.SessionIdentifier);
    if (userId != null) {
        var shareItem = new ShareItemModel(
            requestIdentifier: msg.RequestIdentifier,
            roomIdentifier: msg.RoomIdentifier,
            creatorId: userId,
            timeOfCreation: null,
            type: msg.ShareItemType,
            content: msg.Content
        );

        //      null,
        //     .
        long? timeOfCreation = await storageService.CreateShareItem(shareItem);
        if (timeOfCreation != null) {
            //      .
            List<long> pals = await inMemoryStorageService.GetRoomPals(
                msg.RoomIdentifier
            );
            if (pals == null) {
            	//     -       .
                pals = await storageService.GetRoomPals(msg.RoomIdentifier);
                await inMemoryStorageService.SaveRoomPals(msg.RoomIdentifier, pals);
            }

            //    ,  .
            pals.Remove(userId.Value);

            if (pals.Count > 0) {
            	//  ack,  ,    
                //    .
                await storageService.CreateAck(
                    msg.RequestIdentifier, userId.Value, msg.RoomIdentifier,
                    timeOfCreation.Value, pals
                );

                // in -  UserId, out -   frontend ,
                //    .  -   -
                //   null.
                partitions1 = await inMemoryStorageService.GetUserPartitions(pals);

                List<long> onlinePals = getOnlinePals(pals, partitions1);

                //    ,       .
                //         .
                if (onlinePals.Count > 0) {
                    messageToPals1 = converterService.EncodeNewShareItemMessage(
                        userId.Value, timeOfCreation.Value, onlinePals, shareItem
                    );
                    nullRepeatedPartitions(partitions1);
                    // -         
                    // frontend ,    null' .
                }
            }

            message = converterService.EncodeSuccessfulShareItemCreationMessage(
                msg.RequestIdentifier, timeOfCreation.Value
            );
        } else {
            message = converterService.EncodeMessage(
                MessageCode.RoomNotFound, msg.RequestIdentifier
            );
        }
    } else {
        message = converterService.EncodeMessage(
            MessageCode.UserNotFound, msg.RequestIdentifier
        );
    }

    return new ServiceResult(
        message: message, //    .
        messageToPals1: messageToPals1, //  -    .
        partitions1: partitions1
    );
}


Base de datos


La mayor parte de la funcionalidad de los servicios llamados por los servidores de back-end es simplemente agregar nuevos datos a la base de datos y procesar los existentes. Obviamente, cómo está organizada la base de datos y cómo operamos en ella es muy importante para el mensajero, y aquí me gustaría decir que abordé el tema de elegir una base de datos con mucho cuidado después de estudiar cuidadosamente todas las opciones, pero esto no es así. Acabo de elegir CockroachDb porque promete mucho con un mínimo de esfuerzo y tiene una sintaxis compatible con postgres (trabajé con postgres antes). Pensé en usar Cassandra, pero al final decidí pensar en algo familiar. Nunca había trabajado con Kafka, o con Rabbit, o con Flutter y Dart, o con WebRtc, así que decidí no arrastrar a Cassandra también, porque tenía miedo de ahogarme en una gran cantidad de nuevas tecnologías para mí.

De todas las partes de mi proyecto, el diseño de la base de datos es lo que más dudo. No estoy seguro de que las decisiones que tomé sean realmente buenas . Todo funciona, pero podría hacerse mejor. Por ejemplo, hay tablas ShareRooms (como llamo chats) y ShareItems (como llamo mensajes). Por lo tanto, todos los usuarios que ingresan a una sala se registran en el campo jsonb de esta sala. Esto es conveniente, pero obviamente muy lento, por lo que probablemente lo volveré a hacer usando claves foráneas. O, por ejemplo, la tabla ShareItems almacena todos los mensajes. Lo cual también es conveniente, pero dado que ShareItems es una de las tablas más cargadas (persistente selectyinsert), podría valer la pena crear una nueva tabla para cada habitación o algo así. Kokroach dispersa los registros en diferentes nodos, por lo tanto, debe pensar cuidadosamente qué registro se utilizará para lograr el máximo rendimiento, pero no lo hice. En general, como se puede entender de todo lo anterior, las bases de datos no son mi punto más fuerte. En este momento, generalmente estoy probando todo para postgres, y no kokroach, porque hay menos carga en mi máquina de trabajo, ya es tan pobre por las cargas que despegará pronto. Afortunadamente, el código para postgres y kokroach difiere bastante, por lo que cambiar no es difícil.

Ahora estoy en el proceso de estudiar cómo funciona realmente el cocroach (cómo se produce el mapeo entre SQL y el valor clave (el cocroach usa RocksDb debajo del capó), cómo distribuye datos entre nodos, réplicas, etc.). Por supuesto, valió la pena estudiar el cocroach antes de usarlo, pero más vale tarde que nunca.

Creo que la base sufrirá grandes cambios cuando llegue a comprender mejor este problema. En este momento, la mesa de Acks me persigue. En esta tabla, almaceno datos sobre quién aún no ha recibido y quién aún no ha leído el mensaje (para mostrar las marcas de verificación del usuario). Es fácil notificar al usuario que su mensaje ha sido leído si el usuario está en línea ahora, pero si no, necesitamos guardar esta información para notificarlo más tarde. Y como los chats grupales están disponibles, no basta con almacenar la bandera, sino que necesita datos sobre usuarios individuales. Entonces, aquí pedimos directamente el uso de cadenas de bits (una línea para usuarios que aún no han recibido, la segunda, para aquellos que aún no han leído). Especialmente soporte de kokroach bitybit varying. Sin embargo, nunca descubrí cómo implementar este negocio, dado que la composición de las habitaciones puede cambiar constantemente. Para que las cadenas de bits conserven su significado, los usuarios en la sala deben permanecer en el mismo orden, lo cual es bastante difícil de hacer cuando, por ejemplo, algún usuario abandona la sala. Hay opciones aquí. Quizás valga la pena escribir -1 en lugar de eliminar al usuario del campo jsonb para que se mantenga el orden, o usar algún método de control de versiones, de modo que sepamos que esta cadena de bits se refiere al orden de los usuarios, que era entonces, y no en el orden actual de los usuarios. Todavía estoy en el proceso de pensar en cómo implementar mejor este negocio, pero por el momento, aquellos que aún no han recibido y no han leído a los usuarios también son solo campos jsonb. Dado que la tabla Acks se escribe con cada mensaje, la cantidad de datos es grande.Aunque el registro, por supuesto, se elimina cuando el mensaje es recibido y leído por todos.

Aleteo


Durante mucho tiempo trabajé en el lado del servidor y utilicé clientes de consola simples para la prueba, por lo que ni siquiera creé un proyecto de Flutter. Y cuando lo creé, pensé que la parte del servidor era una parte compleja , y la aplicación es así, basura, lo resolveré en un par de días. Mientras trabajaba en el servidor, creé Hello Worlds para aletear un par de veces para tener una idea del marco, y dado que el mensajero no necesita ninguna interfaz de usuario compleja, pensé que estaba completamente listo. Entonces, la interfaz de usuario, realmente, es basura, pero la implementación de la funcionalidad me dio problemas (y aún se entregará, ya que no todo está listo).

Administración del Estado


El tema más popular. Hay mil maneras de controlar su condición, y el enfoque recomendado se cambia cada seis meses. Ahora la corriente principal es el proveedor. Personalmente, elegí 2 formas para mí: bloc y redux. Bloc (Business Logic Component) para gestionar el estado local y redux para gestionar el global.

Bloc no es algún tipo de biblioteca (aunque, por supuesto, también hay una biblioteca que reduce la repetitiva, pero no la uso). Bloc es un enfoque basado en la transmisión. En general, el dardo es un lenguaje bastante agradable, y las transmisiones son generalmente tan dulces. La esencia de este enfoque es que empujamos toda la lógica empresarial a los servicios, y nos comunicamos entre la interfaz de usuario y los servicios a través de un controlador que nos proporciona varias corrientes. ¿El usuario hizo clic en el botón "buscar contacto"? Utilizandosink(el otro extremo de la transmisión) enviamos un evento al controlador SearchContactsEvent, el controlador llamará al servicio deseado, esperará el resultado y también devolverá la lista de usuarios a la interfaz de usuario a través de la transmisión. La interfaz de usuario espera los resultados utilizando StreamBuilder(widget que se reconstruye cada vez que llegan datos nuevos a la secuencia a la que está suscrito). Eso, de hecho, es todo. En algunos casos, necesitamos actualizar la interfaz de usuario sin la participación del usuario (por ejemplo, cuando llegó un nuevo mensaje), pero esto también se puede hacer fácilmente a través de las transmisiones. De hecho, un MVC simple con transmisiones, sin magia.

En comparación con otros enfoques, el bloque requiere más repetitivo, pero, en mi opinión, es mejor usar soluciones nativas sin la participación de bibliotecas de terceros, a menos que el uso de una solución de terceros proporcione algo significativoventajas Cuantas más abstracciones en la parte superior, más difícil es comprender cuál es el error cuando se produce un error. No considero que las ventajas del proveedor sean lo suficientemente significativas como para cambiar a él. Pero tengo poca experiencia en esta área, por lo que es probable que cambie el campamento en el futuro.

Bueno, sobre redux, y así todos lo saben todo, así que no hay nada que decir. Además, lo corté de la aplicación :) Lo utilicé para administrar mi cuenta, pero luego, al darme cuenta de que en este caso no hay ventajas especiales sobre el bloque, lo corté para no arrastrar demasiado. Pero en general considero que redux es algo útil para administrar el estado global.

La parte más insoportable


¿Qué debo hacer si el usuario envió un mensaje, pero antes de que se enviara, se perdió la conexión a Internet? ¿Qué debo hacer si el usuario recibió una confirmación de lectura, pero cerró la aplicación antes de que se actualizara el registro correspondiente en la base de datos? ¿Qué debo hacer si el usuario invitó a su amigo a la sala, pero antes de que se enviara la invitación, su batería se agotó? ¿Alguna vez has hecho preguntas similares? Aquí estoy. Antes de. Pero en el proceso de desarrollo comencé a preguntarme. Como la conexión puede desaparecer en cualquier momento y el teléfono se apaga en cualquier momento, todo debe confirmarse . No es divertido. Por lo tanto, el primer mensaje que el cliente envía al servidor ( Joinsi recuerda) no es solo "Hola, estoy en línea" , es"Hola, estoy en línea y aquí hay habitaciones sin confirmar, aquí hay confirmaciones no confirmadas, aquí hay operaciones de membresía de sala sin confirmar y aquí están los últimos mensajes recibidos por habitación" . Y el servidor responde con una hoja similar: “Mientras estabas desconectado, tal y tal tus mensajes fueron leídos por tales y tales usuarios, y también invitaron a Petya a esta sala, y Sveta salió de esta sala, y tú fuiste invitado a esta sala, pero a estas dos habitaciones tienen 40 nuevos puestos " . Realmente me gustaría saber cómo se hacen cosas similares en otros mensajeros, porque mi implementación no brilla con gracia.

Imágenes


Por el momento, puede enviar texto, texto + imágenes y solo imágenes. La carga de video aún no se ha implementado. Las imágenes se comprimen un poco y se guardan en el almacenamiento de Firebase. El mensaje en sí contiene enlaces. Al recibir el mensaje, el cliente descarga imágenes, genera miniaturas y guarda todo en el sistema de archivos. Las rutas de archivo se escriben en la base de datos. Por cierto, la generación de miniaturas es el único código ejecutado en un subproceso separado, ya que es una operación de procesamiento pesado. Acabo de comenzar una secuencia de trabajo, le doy una imagen y, a cambio, obtengo una miniatura. El código es extremadamente simple, ya que dart proporciona abstracciones convenientes para trabajar con flujos.

ThumbnailGeneratorService
class ThumbnailGeneratorService {
  SendPort _sendPort;
  final Queue<Completer<Uint8List>> _completerQueue =
      Queue<Completer<Uint8List>>();

  ThumbnailGeneratorService() {
    var receivePort = ReceivePort();
    Isolate.spawn(startWorker, receivePort.sendPort);

    receivePort.listen((data) {
      if (data is SendPort) {
        _sendPort = data;
      } else {
        var completer = _completerQueue.removeFirst();
        completer.complete(data);
      }
    });
  }

  static void startWorker(SendPort sendPort) async {
    var receivePort = ReceivePort();
    sendPort.send(receivePort.sendPort);

    receivePort.listen((imageBytes) {
      Image image = decodeImage(imageBytes);
      Image thumbnail = copyResize(image, width: min(image.width, 200));

      sendPort.send(Uint8List.fromList(encodePng(thumbnail)));
    });
  }

  Future<Uint8List> generate(Uint8List imageBytes) {
    var completer = Completer<Uint8List>();
    _completerQueue.add(completer);
    
    _sendPort.send(imageBytes);

    return completer.future;
  }
}


La autenticación de Firebase también se usa, pero solo para la autorización de acceso al almacenamiento de Firebase (para que el usuario no pueda, por ejemplo, completar la foto de perfil de otra persona ). Toda otra autorización se realiza a través de mis servidores.

Formato de mensaje


Probablemente estés horrorizado aquí, ya que uso conjuntos de bytes regulares. Json desaparece porque se requiere eficiencia, y no sabía sobre protobuf cuando comencé. El uso de matrices requiere mucho cuidado porque un índice está mal y las cosas salen mal.

Los primeros 4 bytes son la longitud del mensaje.
El siguiente byte es el código del mensaje.
Los siguientes 16 bytes son el identificador de solicitud (uuid).
Los siguientes 40 bytes son el token de autorización.
El resto del mensaje .

Longitud del mensajerequerido, ya que no utilizo sockets http o web, o algún otro protocolo que proporcione la separación de un mensaje de otro. Mis servidores frontend solo ven flujos de bytes, y necesitan saber dónde termina un mensaje y comienza otro. Hay varias formas de separar los mensajes (por ejemplo, para usar algún tipo de carácter que nunca se encuentra en los mensajes como separador), pero preferí especificar la longitud, ya que este método es el más fácil, aunque implica una sobrecarga, ya que faltan la mayoría de los mensajes y un byte para indicar la longitud.

El código del mensaje es solo uno de los miembros de la enumeraciónMessageCode. El enrutamiento se lleva a cabo de acuerdo con el código, y dado que podemos extraer el código de la matriz sin deserialización preliminar, el servidor frontend decide en qué tema del kafka enviar un mensaje en lugar de delegar esta responsabilidad a otra persona.

ID de solicitudpresente en la mayoría de las publicaciones, pero no en todas. Realiza 2 funciones: mediante este identificador, el cliente establece la correspondencia entre la solicitud enviada y la respuesta recibida (si el cliente envió los mensajes A, B, C en este orden, esto no significa que las respuestas también estarán en orden). La segunda función es evitar duplicados. Como se mencionó anteriormente, kafka garantiza al menos una vez la entrega. Es decir, en casos raros, los mensajes aún pueden duplicarse. Al agregar la columna RequestIdentifier con una restricción única a la tabla de base de datos deseada, podemos evitar insertar un duplicado.

Token de autorizaciónEs un UserId (8 bytes) + 32 bytes de firma HmacSha256. No creo que valga la pena usar Jwt aquí. ¿Jwt es aproximadamente 7-8 veces más grande para qué? Mis usuarios no tienen ningún reclamo, por lo que una simple firma hmac está bien. La autorización a través de otros servicios no es ni está planificada.

Llamadas de audio y video


Es curioso que pospuse deliberadamente la implementación de las llamadas de audio y video, porque estaba seguro de que no podría resolver los problemas, pero en realidad resultó ser una de las características más fáciles de implementar. Al menos la funcionalidad básica. En general, solo agregar WebRtc a la aplicación y obtener la primera sesión de video tomó solo unas pocas horas y, milagrosamente, la primera prueba fue exitosa. Antes de eso, pensé que el código que funcionó la primera vez era un mito. Por lo general, la primera prueba de una nueva característica siempre falla debido a algún tipo de error estúpido como "se agregó un servicio, pero no se registró en un contenedor DI".

No es muy breve sobre WebRtc para los no iniciados
WebRtc — , peer-to-peer , , peer-to-peer , . - , , . , .

(peer-to-peer), , 3 ( , 3 . 3 ).

— stun . stun , — Source IP Source Port , . ? - . IP . - , , , Source IP Source Port IP - NAT [ Source IP | Source Port | Router External IP | Router Port ]. - , Dest IP Dest Port Router External IP Router Port NAT, , Source IP — Source Port , . , , , , , , NAT . stun NAT . stun Router External IP — Router Port. — . , «» NAT (NAT traversal) , NAT , stun .
* NAT , . , , WebRtc .

— turn. , , peer-to-peer . Fallback . , , , , , peer-to-peer . turn — coturn, .

— . , . , . — . . , , , :) — .

WebRtc 3 : offer, answer candidate. offer , answer, . , , , , . ( ) , .


La tecnología WebRtc en sí misma establece una conexión y maneja las secuencias de un lado a otro, pero este no es un marco para crear llamadas completas. Por llamada, me refiero a una sesión de comunicación con la capacidad de cancelar, rechazar y aceptar la llamada, así como colgar. Además, debe informar a la persona que llama si el otro lado ya está ocupado. Y también para implementar pequeñas cosas como "esperar una respuesta a la llamada N segundos, luego reiniciar". Si simplemente implementa WebRtc en la aplicación de forma simple, con una llamada entrante, la cámara y el video se encenderán espontáneamente, lo que, por supuesto, es inaceptable.

En su forma pura, WebRtc generalmente implica enviar candidatos a la otra parte lo antes posible para que las negociaciones comiencen lo más rápido posible, lo cual es lógico. En mis pruebas, los candidatos al partido receptor generalmente siemprecomenzó a llegar incluso antes de que llegue la oferta. Dichos candidatos "tempranos" no pueden descartarse, deben recordarse, de modo que más tarde, cuando llegue la oferta y RTCPeerConnectionse cree, agréguelos a la conexión. El hecho de que los candidatos puedan comenzar a llegar incluso antes de la oferta, así como algunas otras razones, hacen que la implementación de llamadas completas sea una tarea no trivial. ¿Qué hacer si varios usuarios nos llaman a la vez? Recibiremos candidatos de todos, y aunque podemos separar a los candidatos de un usuario de otro, no queda claro qué candidatos rechazar, porque no sabemos qué oferta vendrá antes. También habrá problemas si los candidatos comienzan a venir a nosotros y luego una oferta en el momento en que nosotros mismos llamamos a alguien.

Después de probar varias opciones con un WebRtc simple, llegué a la conclusión de que de esta forma sería problemático y lleno de pérdidas de memoria tratar de hacer llamadas, así que decidí agregar otra etapa al proceso de negociación de WebRtc. Yo llamo a esta etapa Inquire - Grant/Refuse.

La idea es muy simple, pero me llevó bastante tiempo alcanzarla. La persona que llama, incluso antes de crear la transmisión y RTCPeerConnection(y generalmente antes de ejecutar cualquier código relacionado con WebRtc) envía un mensaje a través del servidor de señales al otro lado Inquire. En el lado receptor, se verifica si el usuario está en alguna otra sesión de comunicación en este momento ( boolcampo simple ). Si es así, se devuelve un mensaje.Refuse, y de esta manera le informamos a la persona que llama que el usuario está ocupado y al receptor, que tal o cual teléfono llamó mientras estaba ocupado con otra conversación. Si el usuario está actualmente libre, entonces está reservado . El Inquireidentificador de sesión se envía en el mensaje , y este identificador se establece como el identificador de la sesión actual . Si el usuario está reservado, rechazará todos los Inquire/Offer/Candidatemensajes con identificadores de sesión distintos del actual. Después de la reserva, el receptor envía un mensaje a través del servidor de señal a la persona que llama Grant. Vale la pena decir que este proceso no es visible para el usuario receptor, ya que todavía no hay una llamada. Y lo principal aquí es no olvidar colgar un tiempo de espera en el lado receptor. De repente, reservaremos una sesión y no se ofrecerá ninguna oferta.

La persona que llama recibe Grant, y aquí es donde WebRtc comienza con ofertas, candidatos, y esto es para todos. La oferta vuela hacia el receptor y él, al recibirla, muestra una pantalla con los botones Responder / Rechazar. Pero los candidatos, como siempre, no esperan a nadie. De nuevo comienzan a llegar incluso antes de la oferta, porque no hay razón para esperar a que el usuario responda la llamada. Es posible que no responda, pero rechace o espere hasta que expire el tiempo de espera, entonces los candidatos simplemente serán expulsados.

Estado actual y planes futuros


  • Chats privados y grupales
  • Envío de texto, imágenes y videos.
  • Llamadas de audio y video
  • Confirmación de recibo y lectura
  • "Impresiones ..."
  • Notificaciones
  • Búsqueda por código QR y geolocalización


La búsqueda por código QR es, inesperadamente, bastante problemática de implementar, porque casi todos los complementos para el escaneo de código que intenté se niegan a comenzar o no funcionan correctamente. Pero creo que los problemas se resolverán aquí. Y para la implementación de la búsqueda de geolocalización, aún no he retomado. En teoría, no debería haber ningún problema especial.

Notificaciones en progreso, así como el envío de videos.

¿Qué más hay que hacer?


Oh mucho
En primer lugar, no hay pruebas. Los colegas solían escribir exámenes, así que me relajé por completo.
En segundo lugar, invitar a los usuarios a un chat existente y abandonar el chat actualmente no es posible. El código del servidor está listo para esto, el código del cliente no.
En tercer lugar, si el manejo de errores en el servidor es más o menos, entonces no hay manejo de errores en el cliente. No basta con hacer una entrada de registro; debe volver a intentar la operación. Ahora, por ejemplo, el mecanismo para reenviar mensajes no está implementado.
Cuarto, el servidor no hace ping al cliente, por lo que no se detecta la desconexión si, por ejemplo, el cliente ha perdido Internet. La desconexión se detecta solo cuando el cliente cierra la aplicación.
Quinto, los índices no se usan en la base de datos.
Sexto, optimización. El código tiene una gran cantidad de lugares donde se escribe algo así // @@TODO: Pool. La mayoría de las matrices son solo neweso. El servidor de fondo crea muchas matrices de longitud fija, por lo que aquí puede y debe usar el grupo.
Séptimo, hay muchos lugares en el cliente donde awaittermina el código , aunque esto no es necesario. El envío de imágenes, por ejemplo, parece lento porque el códigoawaitGuarda imágenes en el sistema de archivos y genera miniaturas antes de mostrar el mensaje, aunque no es necesario hacer nada de esto. O, por ejemplo, si abre la aplicación y, durante su ausencia, le enviaron imágenes, el inicio será lento, porque nuevamente todas estas imágenes se descargan, se guardan en el sistema, se generan miniaturas, y solo después de eso finaliza el inicio y se le arroja desde la pantalla de inicio en la pantalla de inicio Todos estos redundantes awaitse hicieron para una depuración más fácil, pero, por supuesto, debe deshacerse de las esperas innecesarias antes del lanzamiento.
OctavoLa interfaz de usuario ahora está medio lista, porque no he decidido cómo quiero verla. Por lo tanto, ahora no todo es intuitivo, la mitad de los botones no tienen claro qué están haciendo. Y los botones a menudo no se presionan la primera vez, porque ahora son solo iconos con GestureDetectory sin relleno, por lo que no siempre es posible acceder a ellos. Además, en algunos lugares, el desbordamiento de píxeles no es fijo.
Noveno, ahora es incluso imposible iniciar sesión en una cuenta, solo registrarse. Por lo tanto, si desinstala la aplicación y la reinstala, no podrá iniciar sesión en su cuenta :)
Décimo, el código de verificación no se envía al correo. Ahora el código generalmente es siempre el mismo, nuevamente porque es más fácil de depurar.
UndécimoEl principio de responsabilidad única se viola en muchos lugares. Necesito un refactor. Las clases responsables de interactuar con la base de datos (tanto en el cliente como en el servidor) generalmente están muy hinchadas porque participan en todas las operaciones de la base de datos.
Duodécimo, el servidor frontend ahora siempre espera una respuesta del servidor back-end, incluso si el mensaje no implica enviar una respuesta (por ejemplo, un mensaje con un código IsTypingy algunos mensajes relacionados con WebRtc). Por lo tanto, sin esperar una respuesta, escribe un error en la consola, aunque esto no es un error.
Decimotercero, las imágenes completas no se abren al tocar.
Cien millones de quintosAlgunos mensajes que deben enviarse en lotes se envían por separado. Lo mismo se aplica a algunas operaciones de la base de datos. En lugar de ejecutar un solo comando, los comandos se ejecutan en un bucle con await(brr ..).
Cien millones de sextos, algunos valores están codificados, en lugar de ser configurables.
Ciento un millón de séptimosiniciar sesión en el servidor ahora es solo a la consola, y en el cliente en general, directamente al widget. En la pantalla principal hay una pestaña de Registros, donde se sueltan todos los registros al tocar. El hecho es que mi máquina de trabajo se niega a ejecutar tanto el emulador como todo lo necesario para el servidor (kafka, base de datos, rábano y todos los servidores). El débito con un dispositivo conectado tampoco funcionó, todo simplemente se colgó en la mitad de los casos, porque la computadora no podía hacer frente a las cargas. Por lo tanto, debe realizar una compilación cada vez, soltarla en el dispositivo, instalarla y probarla de esta manera. Para ver los registros, los dejo caer directamente en el widget. Perversión, lo sé, pero no hay otra opción. Por la misma razón, muchos métodos regresan FutureyawaitSon (para atrapar la excepción y tirar al widget), aunque no deberían. Si observa el código, verá un _logErrormétodo feo en muchas clases que hace esto. Esto, por supuesto, también irá a la basura.
Ciento millones y ocho, no hay sonidos.
Ciento millones y noveno, debe usar el almacenamiento en caché más.
Cien millones de décimas, mucho código repetitivo. Por ejemplo, muchas acciones primero comprueban la validez del token y, si no es válido, envían un error. Creo que necesita implementar una tubería de middleware simple.

Y muchas pequeñas cosas, como concatenar cadenas en lugar de usar StringBuilder'a,Disposeno en todas partes se llama donde debería, y así sucesivamente. En general, el estado normal del proyecto está en desarrollo. Todo lo anterior es solucionable, pero hay un problema fundamental en el que no pensé hasta el último momento, porque se me pasó por la cabeza: el mensajero debería funcionar incluso cuando la aplicación no está abierta, y la mía no funciona. Para ser honesto, la solución a este problema aún no se me ha pasado por la cabeza. Aquí, aparentemente, no puede prescindir del código nativo.

Calificaría la preparación del proyecto en un 70%.

Resumen


Han pasado seis meses desde el inicio del trabajo en el proyecto. Combinado con el trabajo a tiempo parcial y tomó largos descansos, pero de todos modos, el tiempo y el esfuerzo fueron decentemente. Planeo implementar todas las características declaradas + agregar algo inusual como tic-tac-toe o borradores en la habitación. Sin ninguna razón, solo porque es interesante.

Si tiene alguna pregunta, escriba. El correo está en github.

All Articles