Como eu escrevi meu messenger

Uma noite, depois de outro dia frustrante, cheio de tentativas de equilibrar o jogo, decidi que precisava urgentemente de um descanso. Vou mudar para outro projeto, fazê-lo rapidamente, devolver a auto-estima que diminuiu durante o desenvolvimento do jogo e atacar o jogo com vigor renovado! O principal é escolher um projeto agradável e relaxante ... Escreva seu próprio messenger? Ha! Quão difícil isso pode ser?

O código pode ser encontrado aqui .


Breve histórico


Por quase um ano antes de começar a trabalhar no messenger, ele trabalhava no jogo multiplayer online Line Tower Wars. A programação correu bem, todo o resto (equilíbrio e visual em particular) não foi muito bom. De repente, verificou-se que fazer um jogo e fazer um jogo divertido (diversão para alguém que não seja ele mesmo) são duas coisas diferentes. Depois de um ano de provação, eu precisava me distrair, então decidi tentar outra coisa. A escolha recaiu sobre o desenvolvimento móvel, chamado Flutter. Ouvi muitas coisas boas sobre Flutter e gostei do dardo após um breve experimento. Eu decidi escrever meu próprio mensageiro. Em primeiro lugar, é uma boa prática implementar cliente e servidor. Em segundo lugar, haverá algo significativo para colocar no portfólio em busca de trabalho, estou apenas no processo.

Funcionalidade agendada


  • Bate-papo privado e em grupo
  • Enviando texto, imagens e vídeos
  • Chamadas de áudio e vídeo
  • Confirmação de recebimento e leitura (ticks da Votsap)
  • "Imprime ..."
  • Notificações
  • Pesquisa por código QR e geolocalização

No futuro, posso dizer com orgulho (e com alívio) que quase tudo o que foi planejado foi implementado e ainda não foi implementado - será implementado em um futuro próximo.



Seleção de idioma


Não pensei muito tempo com a escolha do idioma. No começo, era tentador usar o dardo para o cliente e o servidor, mas uma inspeção mais detalhada mostrou que não há muitos drivers para dardos disponíveis e aqueles que não inspiram muita confiança. Embora eu não prometa falar sobre o momento atual, a situação pode ter melhorado. Então, minha escolha recaiu sobre C #, com a qual trabalhei no Unity.

Arquitetura


Ele começou pensando sobre arquitetura. Obviamente, considerando que 3 pessoas e meia provavelmente usarão meu messenger, não seria necessário se preocupar com a arquitetura em geral. Você pega e faz como em inúmeros tutoriais. Aqui está o nó, aqui está o mongo, aqui estão os soquetes da web. Feito. E o Firebase está por aqui. Mas não é interessante. Decidi criar um mensageiro que pode ser facilmente escalonado horizontalmente, como se eu esperasse milhões de clientes simultâneos. No entanto, como não tinha experiência nessa área, tive que aprender tudo na prática pelo método dos erros e novamente erros.

A arquitetura final se parece com isso


Não afirmo que essa arquitetura seja super legal e confiável, mas é viável e, em teoria, deve suportar cargas pesadas e escalar horizontalmente, mas não entendo realmente como verificar. E espero não ter perdido um momento óbvio conhecido por todos, exceto eu.

Abaixo está uma descrição detalhada dos componentes individuais.

Servidor front-end


Mesmo antes de começar a fazer o jogo, fiquei fascinado pelo conceito de um servidor de thread único assíncrono. Efetivamente e sem potencial race'ov - o que mais você pode pedir. Para entender como esses servidores são organizados, comecei a me aprofundar no módulo da asynciolinguagem python. A solução que vi parecia muito elegante. Em suma, a solução de pseudo-código se parece com isso.
//  ,      ,    
//       .      socket.Receive
//     , :
var bytesReceived = Completer<object>();
selector.Register(
    socket,
    SocketEvent.Receive,
    () => bytesReceived.Complete(null)
);

await bytesReceived.Future;

int n = socket.Receive(...); //   

// selector -     poll.   
//        (Receive 
//  ), ,    ,  .
//   completer,      ,
//        , ,     .
//     ,       .

Usando esta técnica simples, podemos atender um grande número de soquetes em uma única rosca. Nunca bloqueamos um fluxo enquanto esperamos que os bytes sejam recebidos ou enviados. O fluxo está sempre ocupado com um trabalho útil. Concorrência, em uma palavra.

Servidores front-end são implementados dessa maneira. Eles são todos de thread único e assíncrono. Portanto, para obter o desempenho máximo, você precisa executar o número de servidores em uma máquina que possua núcleos (4 na imagem).

O servidor Frontend lê a mensagem do cliente e, com base no código da mensagem, envia-a para um dos tópicos do Kafka.

Uma pequena nota de rodapé para quem não conhece o kafa
, RabbitMQ. . , ( authentication backend authentication, ). ? - , (partition). , . , , . , ( , , , (headers)).

? ? . (consumer) ( consumer'), ( ) . , , , 2 , . 3 — 2. .

O servidor front-end envia uma mensagem para o kafka sem uma chave (quando não há chave, o kafka simplesmente envia mensagens para a parte). A mensagem é retirada do tópico por um dos servidores de back-end correspondentes. O servidor processa a mensagem e ... o que vem depois? E o que mais depende do tipo de mensagem.

No caso mais comum, ocorre um ciclo de solicitação-resposta. Por exemplo, para uma solicitação de registro, basta responder ao cliente ( Success,EmailAlreadyInUseetc). Mas para uma mensagem que contém um convite para um bate-papo existente de novos membros (Vasya, Emil e Julia), precisamos responder imediatamente com três tipos diferentes de mensagens. O primeiro tipo - você precisa notificar o anfitrião sobre o resultado da operação (de repente ocorreu um erro no servidor). O segundo tipo - você precisa notificar todos os membros atuais do bate-papo que agora existem novos e novos membros no bate-papo. O terceiro é enviar convites para Vasya, Emil e Yulia.

Ok, isso não parece muito difícil, mas para enviar uma mensagem para qualquer cliente, precisamos: 1) descobrir em qual servidor front-end esse cliente está conectado (não escolhemos em qual servidor o cliente se conectará, o balanceador decide por nós); 2) envie uma mensagem do servidor back-end para o servidor front-end desejado; 3) envie uma mensagem para o cliente.

Para implementar os pontos 1 e 2, decidi usar um tópico separado (tópico "servidores frontend"). A separação dos tópicos de autenticação, sessão e chamada em partições serve como um mecanismo de paralelização. Vemos que o servidor de sessões está muito carregado? Apenas adicionamos alguns novos servidores de partição e sessão, e o Kafka redistribuirá a carga para nós, descarregando os servidores de sessão existentes. A separação do tópico "servidores frontend" na partição serve como um mecanismo de roteamento .

Cada servidor frontend corresponde a uma parte do tópico "servidores frontend" (com o mesmo índice que o próprio servidor). Ou seja, servidor 0 - partição 0 e assim por diante. Kafka possibilita a assinatura não apenas de um tópico específico, mas também de uma parte específica de um determinado tópico. Todos os servidores front-end nas start-ups assinam a partição correspondente. Portanto, o servidor back-end pode enviar uma mensagem para um servidor front-end específico enviando uma mensagem para uma partição específica.

Ok, agora quando o cliente ingressa, você só precisa salvar em algum lugar um par de UserId - Frontend Server Index. Em caso de desconexão - excluir. Para esses fins, qualquer um dos muitos bancos de dados de valores-chave na memória funcionará. Eu escolhi um rabanete.

Como fica na prática. Antes de tudo, depois que a conexão é estabelecida, o cliente Andrey envia uma mensagem para o servidor Join. O servidor Frontend recebe a mensagem e a encaminha para o tópico da sessão, adicionando preliminarmente o cabeçalho “Servidor Frontend”: {index}. Um dos servidores de sessão de back-end receberá uma mensagem, lerá o token de autorização, determinará que tipo de usuário ingressou, lerá o índice adicionado pelo servidor de front-end e gravará UserId - Index no rabanete. A partir desse momento, o cliente é considerado online e agora sabemos através de qual servidor front-end (e, consequentemente, através de qual parte do tópico "servidores front-end") podemos "alcançá-lo" quando outros clientes enviam mensagens para Andrey.

* De fato, o processo é um pouco mais complicado do que eu descrevi. Você pode encontrá-lo no código fonte.

Pseudo-código do servidor front-end


// Frontend Server 6
while (true) {
    // Consume from "Frontend Servers" topic, partition 6
    var messageToClient = consumer.Consume();
    if (message != null) {
        relayMessageToClient(messageToClient);
    }

    var callbacks = selector.Poll();
    while (callbacks.TryDequeue(out callback)) {
        callback();
    }

    long now = DateTimeOffset.UtcNow.ToUnixTimeMilliseconds();
    while (!callAtQueue.IsEmpty && callAtQueue.PeekPriority() <= now) {
        callAtQueue.Dequeue()();
    }

    while (messagesToRelayToBackendServers.TryDequeue(out messageFromClient)) {
        // choose topic
        producer.Produce(topic, messageFromClient);
    }
}


Existem alguns truques aqui.
1) relayMessageToClient. Será um erro simplesmente pegar o soquete desejado e começar imediatamente a enviar uma mensagem para ele, porque talvez estamos enviando outra mensagem para o cliente. Se começarmos a enviar bytes sem verificar se o soquete está ocupado no momento, as mensagens serão misturadas. Como em muitos outros lugares onde o processamento ordenado de dados é necessário, o truque é usar uma fila, a saber, uma fila de Completadores ( TaskCompletionSourceem C #).
void async relayMessageToClient(message) {
    // find client
    await client.ReadyToSend();
    await sendMessage(client, message);
    client.CompleteSend();
}

class Client {
    // ...
    sendMessageQueue = new LinkedList<Completer<object>>();

    async Future ReadyToSend() {
        var sendMessage = Completer<object>();
	if (sendMessageQueue.IsEmpty) {
	    sendMessageQueue.AddLast(sendMessage);
	} else {
	    var prevSendMessage = sendMessageQueue.Last;
	    sendMessageQueue.AddLast(sendMessage);
	    await prevSendMessage.Future;
	}
    }

    void CompleteSend() {
        var sendMessage = sendMessageQueue.RemoveFirst();
	sendMessage.Complete(null);
    }
}

Se a fila não estiver vazia, o soquete já estará ocupado no momento. Crie um novo completer, adicione-o à fila e awaitao anterior completer . Assim, quando a mensagem anterior for enviada, ela CompleteSendserá concluída completer, o que fará com que o servidor comece a enviar a próxima mensagem. Essa fila também permite propagar suavemente exceções. Suponha que ocorreu um erro ao enviar uma mensagem para um cliente. Nesse caso, precisamos concluir, com a exceção de enviar não apenas esta mensagem, mas também todas as mensagens que estão atualmente aguardando na fila (aguarde await'ah'). Caso contrário, eles continuarão travando e receberemos um vazamento de memória. Por uma questão de brevidade, o código que faz isso não é mostrado aqui.

2)selector.Poll. Na verdade, não é nem um truque, mas apenas uma tentativa de atenuar as deficiências da implementação do método Socket.Select( selectorapenas um invólucro sobre esse método). Dependendo do sistema operacional, este método usa selectou poll. Mas isso não é importante aqui. O importante é como esse método funciona com as listas que alimentamos na entrada (lista de soquetes para leitura, gravação, verificação de erros). Esse método pega listas, consulta soquetes e deixa apenas os soquetes nas listas que estão prontos para executar a operação necessária. Todos os outros soquetes são lançados das listas. O "chute" ocorre atravésRemoveAt(ou seja, todos os elementos subsequentes são alterados, o que é ineficiente). Além disso, como precisamos pesquisar todos os soquetes registrados a cada iteração do ciclo, essa "limpeza" geralmente é prejudicial, e precisamos preencher as listas todas as vezes. Podemos solucionar todos esses problemas usando um problema personalizado List, RemoveAtcujo método não remove o item da lista, mas simplesmente o marca como excluído. A classe ListForPollingé a minha implementação dessa lista. ListForPollingsó funciona com o método Socket.Selecte não é adequado para mais nada.

3)callAtQueue. Na maioria dos casos, o servidor front-end, tendo enviado a mensagem do cliente para o servidor back-end, espera uma resposta (confirmação de que a operação foi bem-sucedida ou um erro se algo der errado). Se ele não esperar por uma resposta dentro de um período configurável, ele envia um erro ao cliente para que ele não espere por uma resposta que nunca virá. callAtQueueÉ uma fila prioritária. Imediatamente após o servidor enviar a mensagem para Kafka, ele faz algo assim:
long now = DateTimeOffset.UtcNow.ToUnixTimeMilliseconds();
callAtQueue.Enqueue(callback, now + config.WaitForReplyMSec);

No retorno de chamada, a espera de uma resposta é cancelada e o envio do erro do servidor começa. Se uma resposta do servidor back-end for recebida, o retorno de chamada não fará nada. Não há como

usá- await Task.WhenAny(answerReceivedTask, Task.Delay(x))lo, pois o código após a Task.Delayexecução no thread do pool.

Aqui, de fato, tudo sobre servidores front-end. Uma pequena correção é necessária aqui. De fato, o servidor não está totalmenterosca simples. Obviamente, o kafka sob o capô usa threads, mas quero dizer o código do aplicativo. O fato é que o envio de uma mensagem para o tópico kafka (produção) pode não ter êxito. No caso de uma falha, Kafka repete o envio de um certo número configurável de vezes, mas, se saídas repetidas falharem, Kafka abandona esse negócio como inútil. Você pode verificar se a mensagem foi enviada com sucesso ou não na deliveryHandlerqual passamos para o método Produce. Kafka chama esse manipulador no encadeamento de E / S do produtor (o encadeamento que envia mensagens). Devemos garantir que a mensagem tenha sido enviada com êxito e, caso contrário, cancelar a espera de uma resposta do servidor back-end (a resposta não ocorrerá porque a solicitação não foi enviada) e enviar um erro ao cliente. Ou seja, não podemos evitar a interação com outro segmento.

* Ao escrever um artigo, de repente percebi que não podemos passar deliveryHandlerpara o método Produceou simplesmente ignorar todos os erros kafka (o erro ainda será enviado ao cliente pelo tempo limite que eu descrevi anteriormente) - então todo o nosso código será de thread único. Agora estou pensando em como fazer melhor.

Por que, de fato, kafka, não coelho?
, , , , , RabbitMQ? . , , . ? , frontend . , backend , , . , , . , error-prone. , basicGet , , , . . basicGet, , . .


Servidor back-end


Comparado ao servidor front-end, praticamente não há pontos interessantes aqui. Todos os servidores back-end funcionam da mesma maneira. Na inicialização, o servidor se inscreve no tópico (autenticação, sessão ou chamada, dependendo da função), e o kafka atribui uma ou mais partições a ele. O servidor recebe a mensagem da Kafka, processa e geralmente envia uma ou mais mensagens em resposta. Código quase real:
void Run() {
    long lastCommitTime = DateTimeOffset.UtcNow.ToUnixTimeMilliseconds();

    while (true) {
        var consumeResult = consumer.Consume(
            TimeSpan.FromMilliseconds(config.Consumer.PollTimeoutMSec)
        );

        if (consumeResult != null) {
            var workUnit = new WorkUnit() {
                ConsumeResult = consumeResult,
            };

            LinkedList<WorkUnit> workUnits;
            if (partitionToWorkUnits.ContainsKey(consumeResult.Partition)) {
                workUnits = partitionToWorkUnits[consumeResult.Partition];
            } else {
                workUnits = partitionToWorkUnits[consumeResult.Partition] =
                    new LinkedList<WorkUnit>();
            }

            workUnits.AddLast(workUnit);

            handleWorkUnit(workUnit);
        }

	if (
            DateTimeOffset.UtcNow.ToUnixTimeMilliseconds() - lastCommitTime >=
            config.Consumer.CommitIntervalMSec
        ) {
            commitOffsets();
	    lastCommitTime = DateTimeOffset.UtcNow.ToUnixTimeMilliseconds();
	}
    }
}

Que tipo de compensações a serem cometidas?
. — (offset) (0, 1 ). 0. TopicPartitionOffset. (consume) , ConsumeResult, , , TopicPartitionOffset. ?

at least once delivery, , ( ). , (commited) . , consumer 16, , 16 , , , . - consumer' consumer' , 16 + 1 ( + 1). 17 . N , .

Desativei a confirmação automática e me comprometo. Isso é necessário porque handleWorkUnit, onde o processamento da mensagem é realmente realizado, este é um async voidmétodo, portanto, não há garantia de que a mensagem 5 será processada antes da mensagem 6. Kafka armazena apenas um deslocamento confirmado (e não um conjunto de deslocamento), respectivamente, antes de confirmar o deslocamento 6, precisamos garantir que todas as mensagens anteriores também tenham sido processadas. Além disso, um servidor back-end pode consumir mensagens de várias partições ao mesmo tempo e, portanto, deve certificar-se de confirmar o deslocamento correto para a partição correspondente. Para isso, usamos um mapa de hash da partição de formulário: unidades de trabalho. Aqui está a aparência do código commitOffsets(código real desta vez):
private void commitOffsets() {
    foreach (LinkedList<WorkUnit> workUnits in partitionToWorkUnits.Values) {
        WorkUnit lastFinishedWorkUnit = null;
        LinkedListNode<WorkUnit> workUnit;
        while ((workUnit = workUnits.First) != null && workUnit.Value.IsFinished) {
            lastFinishedWorkUnit = workUnit.Value;
            workUnits.RemoveFirst();
        }

        if (lastFinishedWorkUnit != null) {
            offsets.Add(lastFinishedWorkUnit.ConsumeResult.TopicPartitionOffset);
        }
    }

    if (offsets.Count > 0) {
        consumer.Commit(offsets);
        foreach (var offset in offsets) {
            logger.Debug(
                "{Identifier}: Commited offset {TopicPartitionOffset}",
                identifier,
                offset
            );
        }
        offsets.Clear();
    }
}

Como você pode ver, iteramos sobre as unidades, localizamos a última unidade concluída neste momento, após a qual não há unidades incompletas e comprometemos o deslocamento correspondente. Esse loop permite evitar confirmações "holey". Por exemplo, se atualmente tivermos 4 unidades ( 0: Finished, 1: Not Finished, 2: Finished, 3: Finished), podemos confirmar apenas a 0ª unidade, porque se confirmarmos a 3ª imediatamente, isso poderá levar à perda potencial da 1ª, se o servidor morrer agora.
class WorkUnit {
    public ConsumeResult<Null, byte[]> ConsumeResult { get; set; }
    private int finished = 0;

    public bool IsFinished => finished == 1;

    public void Finish() {
        Interlocked.Increment(ref finished);
    }
}


handleWorkUnitcomo foi dito, o async voidmétodo e, consequentemente, está completamente envolvido try-catch-finally. Em tryele chama o serviço necessário, e em finally- workUnit.Finish().

Os serviços são bastante triviais. Aqui, por exemplo, qual código é executado quando o usuário envia uma nova mensagem:
private async Task<ServiceResult> createShareItem(CreateShareItemMessage msg) {
    byte[] message;
    byte[] messageToPals1 = null;
    int?[] partitions1 = null;

    //  UserId  .
    long? userId = hashService.ValidateSessionIdentifier(msg.SessionIdentifier);
    if (userId != null) {
        var shareItem = new ShareItemModel(
            requestIdentifier: msg.RequestIdentifier,
            roomIdentifier: msg.RoomIdentifier,
            creatorId: userId,
            timeOfCreation: null,
            type: msg.ShareItemType,
            content: msg.Content
        );

        //      null,
        //     .
        long? timeOfCreation = await storageService.CreateShareItem(shareItem);
        if (timeOfCreation != null) {
            //      .
            List<long> pals = await inMemoryStorageService.GetRoomPals(
                msg.RoomIdentifier
            );
            if (pals == null) {
            	//     -       .
                pals = await storageService.GetRoomPals(msg.RoomIdentifier);
                await inMemoryStorageService.SaveRoomPals(msg.RoomIdentifier, pals);
            }

            //    ,  .
            pals.Remove(userId.Value);

            if (pals.Count > 0) {
            	//  ack,  ,    
                //    .
                await storageService.CreateAck(
                    msg.RequestIdentifier, userId.Value, msg.RoomIdentifier,
                    timeOfCreation.Value, pals
                );

                // in -  UserId, out -   frontend ,
                //    .  -   -
                //   null.
                partitions1 = await inMemoryStorageService.GetUserPartitions(pals);

                List<long> onlinePals = getOnlinePals(pals, partitions1);

                //    ,       .
                //         .
                if (onlinePals.Count > 0) {
                    messageToPals1 = converterService.EncodeNewShareItemMessage(
                        userId.Value, timeOfCreation.Value, onlinePals, shareItem
                    );
                    nullRepeatedPartitions(partitions1);
                    // -         
                    // frontend ,    null' .
                }
            }

            message = converterService.EncodeSuccessfulShareItemCreationMessage(
                msg.RequestIdentifier, timeOfCreation.Value
            );
        } else {
            message = converterService.EncodeMessage(
                MessageCode.RoomNotFound, msg.RequestIdentifier
            );
        }
    } else {
        message = converterService.EncodeMessage(
            MessageCode.UserNotFound, msg.RequestIdentifier
        );
    }

    return new ServiceResult(
        message: message, //    .
        messageToPals1: messageToPals1, //  -    .
        partitions1: partitions1
    );
}


Base de dados


A maioria das funcionalidades dos serviços chamados pelos servidores de back-end é simplesmente adicionar novos dados ao banco de dados e processar os existentes. Obviamente, como o banco de dados está organizado e como operamos é muito importante para o messenger, e aqui gostaria de dizer que abordei a questão de escolher um banco de dados com muito cuidado depois de estudar cuidadosamente todas as opções, mas não é assim. Acabei de escolher o CockroachDb porque promete muito com o mínimo de esforço e possui sintaxe compatível com o postgres (eu trabalhei com o postgres antes). Havia pensamentos em usar Cassandra, mas no final decidi me debruçar sobre algo familiar. Eu nunca havia trabalhado com Kafka, ou com Rabbit, ou com Flutter e Dart, ou com WebRtc, então decidi não arrastar Cassandra também, porque tinha medo de me afogar em uma série de novas tecnologias para mim.

De todas as partes do meu projeto, o design do banco de dados é o que mais duvido. Não tenho certeza de que as decisões que tomei sejam realmente boas . Tudo funciona, mas poderia ser feito melhor. Por exemplo, existem tabelas ShareRooms (como chamo chats) e ShareItems (como chamo mensagens). Portanto, todos os usuários que entram em uma sala são registrados no campo jsonb dessa sala. Isso é conveniente, mas obviamente muito lento, então provavelmente o refarei usando chaves estrangeiras. Ou, por exemplo, a tabela ShareItems armazena todas as mensagens. O que também é conveniente, mas como o ShareItems é uma das tabelas mais carregadas (persistente selecteinsert), pode valer a pena criar uma nova tabela para cada sala ou algo parecido. O Kokroach dispersa registros em diferentes nós; portanto, é preciso pensar cuidadosamente sobre qual registro irá para obter o máximo desempenho, mas não o fiz. Em geral, como pode ser entendido de todas as opções acima, os bancos de dados não são meu ponto forte. No momento, geralmente estou testando tudo para o postgres, e não para o kokroach, porque há menos carga na minha máquina de trabalho, pois ela é tão pobre em cargas que decolará em breve. Felizmente, o código para postgres e kokroach difere bastante, portanto, mudar não é difícil.

Agora, estou estudando como a cocroach realmente funciona (como o mapeamento ocorre entre SQL e o valor-chave (a cocroach usa o RocksDb sob o capô), como distribui dados entre nós, replica etc.). Certamente, valia a pena estudar o cocroach antes de usá-lo, mas antes tarde do que nunca.

Eu acho que a base sofrerá grandes mudanças quando eu melhorar a compreensão desse problema. No momento, a mesa Acks está me assombrando. Nesta tabela, armazeno dados sobre quem ainda não recebeu e quem ainda não leu a mensagem (para mostrar as marcas de seleção do usuário). É fácil notificar o usuário que sua mensagem foi lida se o usuário estiver on-line agora, mas se não estiver, precisamos salvar essas informações para notificá-lo mais tarde. E como as conversas em grupo estão disponíveis, não basta armazenar a sinalização, você precisa de dados sobre usuários individuais. Então aqui pedimos diretamente o uso de strings de bits (uma linha para usuários que ainda não receberam, a segunda - para aqueles que ainda não leram). Especialmente suporte kokroach bitebit varying. No entanto, nunca descobri como implementar esse negócio, uma vez que a composição dos quartos pode mudar constantemente. Para que as cadeias de bits mantenham seu significado, os usuários na sala devem permanecer na mesma ordem, o que é bastante difícil de fazer quando, por exemplo, algum usuário sai da sala. Existem opções aqui. Talvez valha a pena escrever -1 em vez de excluir o usuário do campo jsonb para que a ordem seja preservada ou usando algum método de controle de versão, para que saibamos que essa cadeia de bits se refere à ordem dos usuários, que era então, e não na ordem atual de usuários. Ainda estou pensando em como implementar melhor esse negócio, mas, por enquanto, aqueles que ainda não receberam e não leram os usuários também são apenas campos jsonb. Dado que a tabela Acks é gravada com cada mensagem, a quantidade de dados é grande.Embora o registro, é claro, seja excluído quando a mensagem for recebida e lida por todos.

Flutter


Durante muito tempo, trabalhei no servidor e usei clientes simples de console para o teste, por isso nem criei um projeto Flutter. E quando eu o criei, pensei que a parte do servidor era uma parte complexa e o aplicativo é assim, lixo, eu vou descobrir isso em alguns dias. Enquanto trabalhava no servidor, criei o Hello Worlds para flutter algumas vezes para ter uma ideia da estrutura e, como o messenger não requer nenhuma interface do usuário complexa, pensei que estava completamente pronto. Portanto, a interface do usuário é realmente um lixo, mas a implementação da funcionalidade me deu problemas (e ainda será entregue, pois nem tudo está pronto).

Gestão estatal


O tópico mais popular. Existem milhares de maneiras de gerenciar sua condição, e a abordagem recomendada é alterada a cada seis meses. Agora, o mainstream é o provedor. Pessoalmente, eu escolhi duas maneiras para mim: bloco e redux. Bloco (Business Logic Component) para gerenciamento de estado local e redux para gerenciamento global.

O bloco não é um tipo de biblioteca (embora, é claro, também exista uma biblioteca que reduz o padrão, mas eu não a uso). O bloco é uma abordagem baseada em fluxo. Em geral, o dardo é uma linguagem bastante agradável, e os fluxos são geralmente muito agradáveis. A essência dessa abordagem é que colocamos toda a lógica de negócios em serviços e nos comunicamos entre a interface do usuário e os serviços por meio de um controlador que nos fornece vários fluxos. O usuário clicou no botão "encontrar contato"? Usandosink(no outro extremo do fluxo), enviamos um evento para o controlador SearchContactsEvent, o controlador chama o serviço desejado, aguarda o resultado e retorna a lista de usuários de volta à interface do usuário através do fluxo também. A interface do usuário aguarda resultados usando StreamBuilder(widget que é reconstruído toda vez que novos dados chegam ao fluxo em que são inscritos). Isso, de fato, é tudo. Em alguns casos, precisamos atualizar a interface do usuário sem nenhum envolvimento do usuário (por exemplo, quando uma nova mensagem chegou), mas isso também é feito facilmente através de fluxos. De fato, um MVC simples com fluxos, sem mágica.

Comparado a algumas outras abordagens, o bloco requer mais clichê, mas, na minha opinião, é melhor usar soluções nativas sem a participação de bibliotecas de terceiros, a menos que o uso de uma solução de terceiros dê alguns significantesvantagens. Quanto mais abstrações, mais difícil é entender qual é o erro quando ocorre um erro. Não considero as vantagens do provedor significativas o suficiente para mudar para ele. Mas como tenho pouca experiência nessa área, é provável que mude de campo no futuro.

Bem, sobre redux, e todo mundo sabe tudo, então não há nada a dizer. Além disso, recortei o aplicativo :) Utilizei-o para gerenciar minha conta, mas, percebendo que, nesse caso, não há vantagens especiais sobre o bloco, recortei-o para não arrastar muito. Mas, em geral, considero o redux uma coisa útil para gerenciar o estado global.

A parte mais excruciante


O que devo fazer se o usuário enviou uma mensagem, mas antes de ser enviada, a conexão com a Internet foi perdida? O que devo fazer se o usuário receber uma confirmação de leitura, mas ele fechar o aplicativo antes da atualização do registro correspondente no banco de dados? O que devo fazer se o usuário convidar seu amigo para a sala, mas antes do envio do convite, sua bateria acaba? Você já fez perguntas semelhantes? Aqui estou. Antes. Mas no processo de desenvolvimento, comecei a me perguntar. Como a conexão pode desaparecer a qualquer momento e o telefone desligar a qualquer momento, tudo deve ser confirmado . Não tem graça. Portanto, a primeira mensagem que o cliente envia ao servidor ( Joinse você se lembra) não é apenas "Olá, estou online" , é"Olá, estou on-line e aqui estão as salas não confirmadas, as ações não confirmadas, as operações de associação de quartos não confirmadas e as últimas mensagens recebidas por sala . " E o servidor responde com uma folha semelhante: “Enquanto você estava offline, essas e tais mensagens foram lidas por esses e tais usuários, e eles também convidaram Petya para esta sala, e Sveta deixou essa sala e você foi convidado para esta sala, mas estes dois quartos têm 40 novos postos . " Eu realmente gostaria de saber como coisas semelhantes são feitas em outros mensageiros, porque minha implementação não brilha com graça.

Imagens


No momento, você pode enviar texto, texto + imagens e apenas imagens. O upload de vídeo ainda não foi implementado. As imagens são compactadas um pouco e salvas no armazenamento do Firebase. A mensagem em si contém links. Após o recebimento da mensagem, o cliente baixa imagens, gera miniaturas e salva tudo no sistema de arquivos. Os caminhos do arquivo são gravados no banco de dados. A propósito, a geração de miniaturas é o único código executado em um encadeamento separado, pois é uma operação de computação pesada. Eu apenas inicio um fluxo de trabalho, alimento uma imagem e, em troca, recebo uma miniatura. O código é extremamente simples, pois o dardo fornece abstrações convenientes para trabalhar com fluxos.

ThumbnailGeneratorService
class ThumbnailGeneratorService {
  SendPort _sendPort;
  final Queue<Completer<Uint8List>> _completerQueue =
      Queue<Completer<Uint8List>>();

  ThumbnailGeneratorService() {
    var receivePort = ReceivePort();
    Isolate.spawn(startWorker, receivePort.sendPort);

    receivePort.listen((data) {
      if (data is SendPort) {
        _sendPort = data;
      } else {
        var completer = _completerQueue.removeFirst();
        completer.complete(data);
      }
    });
  }

  static void startWorker(SendPort sendPort) async {
    var receivePort = ReceivePort();
    sendPort.send(receivePort.sendPort);

    receivePort.listen((imageBytes) {
      Image image = decodeImage(imageBytes);
      Image thumbnail = copyResize(image, width: min(image.width, 200));

      sendPort.send(Uint8List.fromList(encodePng(thumbnail)));
    });
  }

  Future<Uint8List> generate(Uint8List imageBytes) {
    var completer = Completer<Uint8List>();
    _completerQueue.add(completer);
    
    _sendPort.send(imageBytes);

    return completer.future;
  }
}


A autenticação do Firebase também é usada, mas apenas para autorização de acesso ao armazenamento do Firebase (para que o usuário não possa, digamos, preencher a foto do perfil para outra pessoa ). Todas as outras autorizações são feitas através dos meus servidores.

Formato da mensagem


Você provavelmente está horrorizado aqui, pois eu uso matrizes regulares de bytes. Json desaparece porque é necessária eficiência, e eu não sabia sobre o protobuf quando comecei. O uso de matrizes requer muito cuidado, pois um índice está errado e as coisas dão errado.

Os primeiros 4 bytes são o comprimento da mensagem.
O próximo byte é o código da mensagem.
Os próximos 16 bytes são o identificador de solicitação (uuid).
Os próximos 40 bytes são o token de autorização.
O resto da mensagem .

Comprimento da mensagemnecessário, pois não uso soquetes http ou web, ou algum outro protocolo que forneça a separação de uma mensagem da outra. Meus servidores de front-end veem apenas fluxos de bytes e precisam saber onde uma mensagem termina e outra começa. Existem várias maneiras de separar mensagens (por exemplo, use algum tipo de caractere nunca encontrado nas mensagens como separador), mas eu preferi especificar o tamanho, pois esse método é o mais fácil, embora implique sobrecarga, pois muitas mensagens estão ausentes e um byte para indicar o comprimento.

O código da mensagem é apenas um dos membros da enumeraçãoMessageCode. O roteamento é realizado de acordo com o código e, como podemos extrair o código da matriz sem desserialização preliminar, o servidor front-end decide em qual tópico do kafka enviar uma mensagem em vez de delegar essa responsabilidade a outra pessoa.

Identificação do Pedidopresente na maioria das postagens, mas não em todas. Ele executa 2 funções: por esse identificador, o cliente estabelece a correspondência entre a solicitação enviada e a resposta recebida (se o cliente enviou as mensagens A, B, C nesta ordem, isso não significa que as respostas também serão ordenadas). A segunda função é evitar duplicatas. Como mencionado anteriormente, o kafka garante pelo menos uma vez a entrega. Ou seja, em casos raros, as mensagens ainda podem ser duplicadas. Ao adicionar a coluna RequestIdentifier com uma restrição exclusiva à tabela de banco de dados desejada, podemos evitar a inserção de uma duplicata.

Token de autorizaçãoÉ uma assinatura de UserId (8 bytes) + 32 bytes HmacSha256. Eu não acho que vale a pena usar o Jwt aqui. JWT é cerca de 7-8 vezes maior para quê? Meus usuários não têm reivindicações, portanto, uma assinatura hmac simples é adequada. A autorização através de outros serviços não é e não é planejada.

Chamadas de áudio e vídeo


É engraçado adiar deliberadamente a implementação de chamadas de áudio e vídeo, porque tinha certeza de que não seria capaz de resolver os problemas, mas, na verdade, esse foi um dos recursos mais fáceis de implementar. Pelo menos a funcionalidade básica. Em geral, basta adicionar o WebRtc ao aplicativo e obter a primeira sessão de vídeo em apenas algumas horas e, milagrosamente, o primeiro teste foi bem-sucedido. Antes disso, pensei que o código que funcionava pela primeira vez era um mito. Normalmente, o primeiro teste de um novo recurso sempre falha devido a algum tipo de erro estúpido como "adicionou um serviço, mas não o registrou em um contêiner DI".

Não é muito breve sobre o WebRtc para os não iniciados
WebRtc — , peer-to-peer , , peer-to-peer , . - , , . , .

(peer-to-peer), , 3 ( , 3 . 3 ).

— stun . stun , — Source IP Source Port , . ? - . IP . - , , , Source IP Source Port IP - NAT [ Source IP | Source Port | Router External IP | Router Port ]. - , Dest IP Dest Port Router External IP Router Port NAT, , Source IP — Source Port , . , , , , , , NAT . stun NAT . stun Router External IP — Router Port. — . , «» NAT (NAT traversal) , NAT , stun .
* NAT , . , , WebRtc .

— turn. , , peer-to-peer . Fallback . , , , , , peer-to-peer . turn — coturn, .

— . , . , . — . . , , , :) — .

WebRtc 3 : offer, answer candidate. offer , answer, . , , , , . ( ) , .


A própria tecnologia WebRtc estabelece uma conexão e está envolvida na transferência de fluxos para lá e para cá, mas essa não é uma estrutura para a criação de chamadas completas. Por chamada, quero dizer uma sessão de comunicação com a capacidade de cancelar, rejeitar e aceitar a chamada, além de desligar. Além disso, você precisa informar ao chamador se o outro lado já está ocupado. E também para implementar pequenas coisas como "aguarde uma resposta para a chamada N segundos e redefina". Se você simplesmente implementar o WebRtc no aplicativo de forma simples, com uma chamada recebida, a câmera e o vídeo serão ativados espontaneamente, o que, é claro, é inaceitável.

Em sua forma pura, o WebRtc geralmente implica enviar candidatos para a outra parte o mais rápido possível, para que as negociações comecem o mais rápido possível, o que é lógico. Nos meus testes, os candidatos à parte receptora geralmente semprecomeçou a chegar mesmo antes da oferta chegar. Esses candidatos "antecipados" não podem ser descartados, eles devem ser lembrados, para que mais tarde, quando a oferta chegar e RTCPeerConnectionfor criada, adicione-os à conexão. O fato de os candidatos começarem a chegar antes mesmo da oferta, além de outros motivos, torna a implementação de chamadas completas uma tarefa não trivial. O que fazer se vários usuários nos ligar de uma vez? Receberemos candidatos de todos e, embora possamos separar candidatos de um usuário de outro, não fica claro quais candidatos rejeitar, porque não sabemos de quem a oferta será apresentada mais cedo. Também haverá problemas se os candidatos começarem a vir até nós e, em seguida, uma oferta no momento em que nós mesmos ligarmos para alguém.

Depois de testar várias opções com um WebRtc simples, cheguei à conclusão de que, dessa forma, seria problemático e cheio de vazamentos de memória para tentar fazer chamadas, então decidi adicionar outro estágio ao processo de negociação do WebRtc. Eu chamo essa fase Inquire - Grant/Refuse.

A ideia é muito simples, mas demorei um pouco para alcançá-la. O chamador antes mesmo de criar o fluxo e RTCPeerConnection(e geralmente antes de executar qualquer código relacionado ao WebRtc) envia uma mensagem através do servidor de sinal para o outro lado Inquire. No lado de recebimento, é verificado se o usuário está em alguma outra sessão de comunicação no momento ( boolcampo simples ). Se for, uma mensagem será enviada de volta.Refuse, e dessa maneira informamos ao chamador que o usuário está ocupado e ao receptor - que esse e aquele telefone ligaram enquanto ele estava ocupado com outra conversa. Se o usuário estiver atualmente livre, ele será reservado . O Inquireidentificador da sessão é enviado na mensagem e esse identificador é definido como o identificador da sessão atual . Se o usuário estiver reservado, ele rejeitará todas as Inquire/Offer/Candidatemensagens com identificadores de sessão diferentes do atual. Após a reserva, o destinatário envia uma mensagem através do servidor de sinal ao chamador Grant. Vale dizer que esse processo não é visível para o usuário receptor, pois ainda não há chamada. E o principal aqui é não esquecer de desligar o tempo limite no lado receptor. De repente, reservaremos uma sessão e nenhuma oferta se seguirá.

O chamador recebe Grante é aqui que o WebRtc começa com ofertas, candidatos e é para todos. A oferta voa para o destinatário e, após o recebimento, exibe uma tela com os botões Atender / Rejeitar. Mas os candidatos, como sempre, não esperam ninguém. Eles novamente começam a chegar ainda mais cedo que a oferta, porque não há motivo para esperar o usuário atender a chamada. Ele pode não responder, mas rejeitar ou esperar até o tempo limite expirar - então os candidatos serão simplesmente jogados fora.

Status atual e planos futuros


  • Bate-papo privado e em grupo
  • Enviando texto, imagens e vídeos
  • Chamadas de áudio e vídeo
  • Confirmação de recebimento e leitura
  • "Imprime ..."
  • Notificações
  • Pesquisa por código QR e geolocalização


A busca pelo código QR é inesperadamente bastante problemática de implementar, porque quase todos os plugins para a verificação de código que tentei recusar-se a iniciar ou não funcionam corretamente. Mas acho que os problemas serão resolvidos aqui. E para a implementação da busca por geolocalização, ainda não iniciei. Em teoria, não deve haver problemas especiais.

Notificações em andamento, além de enviar vídeos.

O que mais precisa ser feito?


Ah, muito.
Em primeiro lugar, não há testes. Os colegas costumavam escrever testes, então eu relaxei completamente.
Em segundo lugar, não é possível convidar usuários para um bate-papo existente e sair do bate-papo. O código do servidor está pronto para isso, o código do cliente não.
Em terceiro lugar, se o tratamento de erros no servidor for mais ou menos, não haverá tratamento de erros no cliente. Não basta apenas fazer uma entrada de log; você precisa repetir a operação. Agora, por exemplo, o mecanismo para reenviar mensagens não está implementado.
Quarto, o servidor não executa ping no cliente, portanto, a desconexão não é detectada se, por exemplo, o cliente perdeu a Internet. A desconexão é detectada apenas quando o cliente fecha o aplicativo.
Quinto, os índices não são usados ​​no banco de dados.
Sexta, otimização. O código tem um grande número de lugares onde algo como está escrito // @@TODO: Pool. A maioria das matrizes é exatamente newisso. O servidor back-end cria muitas matrizes de comprimento fixo, portanto, aqui você pode e deve usar o pool.
Sétimo, há muitos lugares no cliente onde o código awaittermina, embora isso não seja necessário. O envio de imagens, por exemplo, parece lento porque o códigoawaitEle salva imagens no sistema de arquivos e gera miniaturas antes de exibir a mensagem, embora nada disso precise ser feito. Ou, por exemplo, se você abrir o aplicativo e, durante a sua ausência, receber imagens, a inicialização será lenta, porque novamente todas essas imagens serão baixadas, salvas no sistema, miniaturas são geradas e somente depois que a inicialização for concluída e você for exibido na tela inicial na tela inicial. Todos esses redundantes awaitforam criados para facilitar a depuração, mas é claro que você precisa se livrar da espera desnecessária antes do lançamento.
OitavoA interface do usuário agora está meio pronta, porque ainda não decidi como quero vê-la. Portanto, agora tudo não é intuitivo, metade dos botões não está claro o que está fazendo. E os botões muitas vezes não são pressionados na primeira vez, porque agora são apenas ícones com GestureDetectore sem preenchimento, portanto nem sempre é possível entrar neles. Além disso, em alguns lugares, o excesso de pixels não é fixo.
Nono, agora é impossível fazer login em uma conta, apenas se inscrever. Portanto, se você desinstalar o aplicativo e reinstalá-lo, não poderá fazer login na sua conta :)
Décimo, o código de verificação não será enviado para o correio. Agora, o código geralmente é sempre o mesmo, novamente porque é mais fácil depurar.
Décima primeiraO princípio de responsabilidade única é violado em muitos lugares. Precisa de um refator. As classes responsáveis ​​pela interação com o banco de dados (no cliente e no servidor) geralmente são muito inchadas, porque estão envolvidas em todas as operações do banco de dados.
Em décimo segundo, o servidor front - end agora sempre espera uma resposta do servidor, mesmo que a mensagem não implique o envio de uma resposta (por exemplo, uma mensagem com um código IsTypinge algumas mensagens relacionadas ao WebRtc). Portanto, sem esperar por uma resposta, ele grava um erro no console, embora isso não seja um erro.
Décimo terceiro, imagens completas não abrem na torneira.
Cem milhões de quintosalgumas mensagens que precisam ser enviadas em lotes são enviadas separadamente. O mesmo se aplica a algumas operações de banco de dados. Em vez de executar um único comando, os comandos são executados em um loop com await(brr ..).
Cem milhões de sextos, alguns valores são codificados em vez de serem configuráveis.
Cento e um milhão sétimoso logon no servidor agora é apenas para o console e, em geral, para o cliente, diretamente para o widget. Na tela principal, há uma guia Logs, onde todos os logs na torneira são descartados. O fato é que minha máquina de trabalho se recusa a executar o emulador e tudo o que é necessário para o servidor (kafka, banco de dados, rabanete e todos os servidores). O débito com um dispositivo conectado também não deu certo, tudo ficou pendurado na metade dos casos, porque o computador não suportava as cargas. Portanto, você precisa criar uma compilação toda vez, soltá-la no dispositivo, instalar e testar assim. Para ver os logs, eu os solto direto no widget. Perversão, eu sei, mas não há escolha. Pelo mesmo motivo, muitos métodos retornam FutureeawaitEles são (para capturar a exceção e lançar no widget), embora não devam. Se você olhar o código, verá um _logErrormétodo feio em muitas classes que faz isso. Isso, é claro, também irá para o lixo.
Cem milhões e oito, sem som.
Cem milhões e nono, você precisa usar mais o cache.
Cem milhões décimos, muito código repetitivo. Por exemplo, muitas ações primeiro verificam a validade do token e, se não for válido, elas enviam um erro. Eu acho que você precisa implementar um pipeline de middleware simples.

E muitas pequenas coisas, como concatenar strings em vez de usar StringBuilder'a,Disposenem todo lugar é chamado onde deveria, e assim por diante. Em geral, o estado normal do projeto está em desenvolvimento. Tudo isso é solucionável, mas há um problema fundamental que eu não pensava até o último momento, porque saiu da minha cabeça - o messenger deve funcionar mesmo quando o aplicativo não está aberto e o meu não funciona. Para ser sincero, a solução para esse problema ainda não passou pela minha cabeça. Aqui, aparentemente, você não pode prescindir do código nativo.

Eu classificaria a disponibilidade do projeto em 70%.

Sumário


Seis meses se passaram desde o início dos trabalhos no projeto. Combinado com o trabalho de meio período e fazia longas pausas, mas, mesmo assim, o tempo e o esforço foram decentes. Eu pretendo implementar todos os recursos declarados + adicionar algo incomum como jogo da velha ou rascunhos na sala. Por nenhuma razão, apenas porque é interessante.

Se você tiver alguma dúvida, escreva. O correio está no github.

All Articles