As I wrote my messenger

One evening, after another frustrating day, filled with attempts to balance the game, I decided that I urgently needed a rest. I’ll switch to another project, do it quickly, return the self-esteem that has rolled down during the development of the game, and I will take the game by storm with renewed vigor! The main thing is to choose a nice and relaxing project ... Write your own messenger? Ha! How hard can it be?

The code can be found here .


Brief Background


For almost a year before starting work on the messenger, he had been working on the online multiplayer Line Tower Wars game. Programming went well, everything else (balance and visual in particular) was not very good. Suddenly it turned out that making a game and making a fun game (fun for someone other than himself) are two different things. After a year of ordeal, I needed to get distracted, so I decided to try my hand at something else. The choice fell on mobile development, namely Flutter. I heard a lot of good things about Flutter, and I liked the dart after a short experiment. I decided to write my own messenger. Firstly, it’s good practice to implement both client and server. Secondly, there will be something significant to put in the portfolio to look for work, I'm just in the process.

Scheduled Functionality


  • Private and group chats
  • Sending text, images and videos
  • Audio and video calls
  • Confirmation of receipt and reading (ticks from Votsap)
  • "Prints ..."
  • Notifications
  • Search by QR code and geolocation

Looking ahead, I can proudly (and with relief) say that almost everything planned has been implemented, and that has not yet been implemented - will be implemented in the near future.



Language selection


I did not think for a long time with the choice of language. At first, it was tempting to use the dart for both the client and the server, but a more detailed inspection showed that there are not many drivers for darts available, and those that are not inspire much confidence. Although I will not vouch to talk about the current moment, the situation may have improved. So my choice fell on C #, with which I worked in Unity.

Architecture


He started with thinking over architecture. Of course, considering that 3 and a half people will most likely use my messenger, one would not have to bother with architecture in general. You take and do as in countless tutorials. Here is the node, here is the mongo, here are the web sockets. Done. And Firebase is around here. But it’s not interesting. I decided to make a messenger that can easily scale horizontally, as if I expect millions of simultaneous clients. However, since I had no experience in this area, I had to learn everything in practice by the method of errors and again errors.

The final architecture looks like this


I do not claim that such an architecture is super cool and reliable, but it is viable and in theory should withstand heavy loads and scale horizontally, but I don’t really understand how to check. And I hope that I did not miss some obvious moment that is known to everyone except me.

Below is a detailed description of the individual components.

Frontend server


Even before I started making the game, I was fascinated by the concept of an asynchronous single-threaded server. Effectively and without potential race'ov - what else can you ask for. In order to understand how such servers are arranged, I began to delve into the asynciopython language module . The solution I saw seemed very elegant. In short, the pseudo-code solution looks like this.
//  ,      ,    
//       .      socket.Receive
//     , :
var bytesReceived = Completer<object>();
selector.Register(
    socket,
    SocketEvent.Receive,
    () => bytesReceived.Complete(null)
);

await bytesReceived.Future;

int n = socket.Receive(...); //   

// selector -     poll.   
//        (Receive 
//  ), ,    ,  .
//   completer,      ,
//        , ,     .
//     ,       .

Using this simple technique, we can serve a large number of sockets in a single thread. We never block a stream while waiting for bytes to be received or sent. The stream is always busy with useful work. Concurrency, in a word.

Frontend servers are implemented that way. They are all single-threaded and asynchronous. Therefore, for maximum performance, you need to run as many servers on one machine as it has cores (4 in the picture).

The Frontend server reads the message from the client and, based on the message code, sends it to one of the topics in Kafka.

A small footnote for those who are not familiar with kafa
, RabbitMQ. . , ( authentication backend authentication, ). ? - , (partition). , . , , . , ( , , , (headers)).

? ? . (consumer) ( consumer'), ( ) . , , , 2 , . 3 — 2. .

The frontend server sends a message to the kafka without a key (when there is no key, the kafka simply sends messages to the party in turn). The message is pulled from the topic by one of the corresponding backend servers. The server processes the message and ... what next? And what further depends on the type of message.

In the most common case, a request-response cycle occurs. For example, for a registration request, we just need to give the client an answer ( Success,EmailAlreadyInUse, etc). But to a message containing an invitation to an existing chat of new members (Vasya, Emil and Julia), we need to respond immediately with three different types of messages. The first type - you need to notify the inviter about the outcome of the operation (suddenly a server error occurred). The second type - you need to notify all current members of the chat that there are now such and such new members in the chat. The third is to send invitations to Vasya, Emil and Yulia.

Okay, that doesn’t sound very difficult, but in order to send a message to any client we need to: 1) find out which frontend server this client is connected to (we don’t choose which server the client will connect to, the balancer decides for us); 2) send a message from the backend server to the desired frontend server; 3) in fact, send a message to the client.

To implement points 1 and 2, I decided to use a separate topic ("frontend servers" topic). Separation of authentication, session, and call topics into partitions serves as a parallelization mechanism. We see that the session server is heavily loaded? We just add a couple of new partition and session servers, and Kafka will redistribute the load for us, unloading the existing session servers. Separation of the "frontend servers" topic into the partition serves as a routing mechanism .

Each frontend server corresponds to one part of the "frontend servers" topic (with the same index as the server itself). That is, server 0 - partition 0, and so on. Kafka makes it possible to subscribe not only to a specific topic, but also to a specific part of a certain topic. All frontend servers on start-ups subscribe to the corresponding partition. Thus, the backend server is able to send a message to a specific frontend server by sending a message to a specific partition.

Okay, now when the client joins, you just need to save somewhere a pair of UserId - Frontend Server Index. In case of disconnect - delete. For these purposes, any of the many in-memory key-value databases will do. I chose a radish.

How it looks in practice. First of all, after the connection is established, client Andrey sends a message to the server Join. The Frontend server receives the message and forwards it to the session topic, preliminarily adding the “Frontend Server” header: {index}. One of the backend session servers will receive a message, read the authorization token, determine what kind of user has joined, read the index added by the frontend server and write UserId - Index to the radish. From this moment, the client is considered online, and now we know through which frontend server (and, accordingly, through which part of the "frontend servers" topic) we can "reach out" to it when other clients send messages to Andrey.

* In fact, the process is a little more complicated than I described. You can find it in the source code.

Frontend server pseudo code


// Frontend Server 6
while (true) {
    // Consume from "Frontend Servers" topic, partition 6
    var messageToClient = consumer.Consume();
    if (message != null) {
        relayMessageToClient(messageToClient);
    }

    var callbacks = selector.Poll();
    while (callbacks.TryDequeue(out callback)) {
        callback();
    }

    long now = DateTimeOffset.UtcNow.ToUnixTimeMilliseconds();
    while (!callAtQueue.IsEmpty && callAtQueue.PeekPriority() <= now) {
        callAtQueue.Dequeue()();
    }

    while (messagesToRelayToBackendServers.TryDequeue(out messageFromClient)) {
        // choose topic
        producer.Produce(topic, messageFromClient);
    }
}


There are a few tricks here.
1) relayMessageToClient. It will be a mistake to simply take the socket you want and immediately start sending a message to it, because maybe we are already sending some other message to the client. If we start sending bytes without checking if the socket is currently busy, the messages will be mixed. As in many other places where orderly data processing is required, the trick is to use a queue, namely, a queue from Completers ( TaskCompletionSourcein C #).
void async relayMessageToClient(message) {
    // find client
    await client.ReadyToSend();
    await sendMessage(client, message);
    client.CompleteSend();
}

class Client {
    // ...
    sendMessageQueue = new LinkedList<Completer<object>>();

    async Future ReadyToSend() {
        var sendMessage = Completer<object>();
	if (sendMessageQueue.IsEmpty) {
	    sendMessageQueue.AddLast(sendMessage);
	} else {
	    var prevSendMessage = sendMessageQueue.Last;
	    sendMessageQueue.AddLast(sendMessage);
	    await prevSendMessage.Future;
	}
    }

    void CompleteSend() {
        var sendMessage = sendMessageQueue.RemoveFirst();
	sendMessage.Complete(null);
    }
}

If the queue is not empty, then the socket is already occupied at the moment. Create a new one completer, add it to the queue and awaitthe previous one completer . Thus, when the previous message is sent, it CompleteSendwill complete completer, which will cause the server to start sending the next message. Such a queue also allows smoothly propagate exceptions. Suppose an error occurred while sending a message to a client. In this case, we need to complete, with the exception of sending not only this message, but also all messages that are currently waiting in the queue (hang on await'ah). If we do not, then they will continue to hang, and we will receive a memory leak. For brevity, the code that does this is not shown here.

2)selector.Poll. Actually, it’s not even a trick, but just an attempt to smooth out the shortcomings of the method implementation Socket.Select( selector- just a wrapper over this method). Depending on the OS under the hood, this method uses either selector poll. But this is not important here. The important thing is how this method works with the lists that we feed it to the input (list of sockets for reading, writing, error checking). This method takes lists, polls sockets and leaves only those sockets in the lists that are ready to perform the required operation. All other sockets are thrown from the lists. “Kicking” occurs throughRemoveAt(that is, all subsequent elements are shifted, which is inefficient). Plus, since we need to poll all registered sockets every iteration of the cycle, such a “cleansing” is generally harmful, we have to re-fill the lists every time. We can get around all these problems using a custom one List, RemoveAtwhose method does not remove the item from the list, but simply marks it as deleted. The class ListForPollingis my implementation of such a list. ListForPollingonly works with the method Socket.Selectand is not suitable for anything else.

3)callAtQueue. In most cases, the frontend server, having sent the client message to the backend server, expects a response (confirmation that the operation was successful, or an error if something went wrong). If he does not wait for an answer within a configurable period of time, he sends an error to the client so that he does not wait for an answer that will never come. callAtQueueIs a priority queue. Immediately after the server sends the message to Kafka, it does something like this:
long now = DateTimeOffset.UtcNow.ToUnixTimeMilliseconds();
callAtQueue.Enqueue(callback, now + config.WaitForReplyMSec);

In the callback, waiting for a response is canceled and the server error sending begins. If a response from the backend server is received, the callback does nothing. There is no way to

use await Task.WhenAny(answerReceivedTask, Task.Delay(x))it, since the code after it Task.Delayis executed on the thread from the pool.

Here, in fact, everything about frontend servers. A slight correction is required here. In fact, the server is not fullysingle threaded. Of course, kafka under the hood uses threads, but I mean the application code. The fact is that sending a message to the topic of kafka (produce) may not succeed. In the event of a failure, Kafka repeats sending a certain configurable number of times, but, if repeated departures fail, Kafka abandons this business as hopeless. You can check whether the message was sent successfully or not in deliveryHandlerwhich we pass to the method Produce. Kafka calls this handler in the producer's I / O thread (the thread that sends messages). We must make sure that the message was sent successfully, and if not, cancel waiting for a response from the backend server (the response will not come because the request was not sent) and send an error to the client. That is, we can’t avoid interacting with another thread.

* When writing an article, I suddenly realized that we can not pass deliveryHandlerto the method Produceor simply ignore all the kafka errors (the error will still be sent to the client by the timeout that I described earlier) - then all our code will be single-threaded. Now I’m thinking how to do it better.

Why, in fact, kafka, not rabbit?
, , , , , RabbitMQ? . , , . ? , frontend . , backend , , . , , . , error-prone. , basicGet , , , . . basicGet, , . .


Backend server


Compared to the frontend server, there are practically no interesting points here. All backend servers work the same way. At the startup, the server subscribes to the topic (authentication, session or call depending on the role), and the kafka assigns one or more partitions to it. The server receives the message from Kafka, processes and usually sends one or more messages in response. Almost real code:
void Run() {
    long lastCommitTime = DateTimeOffset.UtcNow.ToUnixTimeMilliseconds();

    while (true) {
        var consumeResult = consumer.Consume(
            TimeSpan.FromMilliseconds(config.Consumer.PollTimeoutMSec)
        );

        if (consumeResult != null) {
            var workUnit = new WorkUnit() {
                ConsumeResult = consumeResult,
            };

            LinkedList<WorkUnit> workUnits;
            if (partitionToWorkUnits.ContainsKey(consumeResult.Partition)) {
                workUnits = partitionToWorkUnits[consumeResult.Partition];
            } else {
                workUnits = partitionToWorkUnits[consumeResult.Partition] =
                    new LinkedList<WorkUnit>();
            }

            workUnits.AddLast(workUnit);

            handleWorkUnit(workUnit);
        }

	if (
            DateTimeOffset.UtcNow.ToUnixTimeMilliseconds() - lastCommitTime >=
            config.Consumer.CommitIntervalMSec
        ) {
            commitOffsets();
	    lastCommitTime = DateTimeOffset.UtcNow.ToUnixTimeMilliseconds();
	}
    }
}

What kind of offsets to commit?
. — (offset) (0, 1 ). 0. TopicPartitionOffset. (consume) , ConsumeResult, , , TopicPartitionOffset. ?

at least once delivery, , ( ). , (commited) . , consumer 16, , 16 , , , . - consumer' consumer' , 16 + 1 ( + 1). 17 . N , .

I disabled auto-commit and commit myself. This is necessary because handleWorkUnit, where the message processing is actually carried out, this is a async voidmethod, therefore there is no guarantee that message 5 will be processed before message 6. Kafka stores only one commited offset (and not a set of offset), respectively, before committing the offset 6, we need to make sure that all previous messages have been processed too. In addition, one backend server can consume messages from several partitions at the same time, and, therefore, must make sure that it commits the correct offset to the corresponding partition. For this, we use a hash map of the form partition: work units. Here's what the code looks like commitOffsets(real code this time):
private void commitOffsets() {
    foreach (LinkedList<WorkUnit> workUnits in partitionToWorkUnits.Values) {
        WorkUnit lastFinishedWorkUnit = null;
        LinkedListNode<WorkUnit> workUnit;
        while ((workUnit = workUnits.First) != null && workUnit.Value.IsFinished) {
            lastFinishedWorkUnit = workUnit.Value;
            workUnits.RemoveFirst();
        }

        if (lastFinishedWorkUnit != null) {
            offsets.Add(lastFinishedWorkUnit.ConsumeResult.TopicPartitionOffset);
        }
    }

    if (offsets.Count > 0) {
        consumer.Commit(offsets);
        foreach (var offset in offsets) {
            logger.Debug(
                "{Identifier}: Commited offset {TopicPartitionOffset}",
                identifier,
                offset
            );
        }
        offsets.Clear();
    }
}

As you can see, we iterate over the units, find the last unit completed by this moment, after which there are no incomplete ones , and commit the corresponding offset. Such a loop allows us to avoid "holey" commits. For example, if we currently have 4 units ( 0: Finished, 1: Not Finished, 2: Finished, 3: Finished), we can commit only the 0th unit, because if we commit the 3rd immediately, this can lead to the potential loss of the 1st, if the server dies right now.
class WorkUnit {
    public ConsumeResult<Null, byte[]> ConsumeResult { get; set; }
    private int finished = 0;

    public bool IsFinished => finished == 1;

    public void Finish() {
        Interlocked.Increment(ref finished);
    }
}


handleWorkUnitas was said, the async voidmethod, and it, accordingly, is completely wrapped in try-catch-finally. In tryhe calls the necessary service, and in finally- workUnit.Finish().

The services are pretty trivial. Here, for example, what code is executed when the user sends a new message:
private async Task<ServiceResult> createShareItem(CreateShareItemMessage msg) {
    byte[] message;
    byte[] messageToPals1 = null;
    int?[] partitions1 = null;

    //  UserId  .
    long? userId = hashService.ValidateSessionIdentifier(msg.SessionIdentifier);
    if (userId != null) {
        var shareItem = new ShareItemModel(
            requestIdentifier: msg.RequestIdentifier,
            roomIdentifier: msg.RoomIdentifier,
            creatorId: userId,
            timeOfCreation: null,
            type: msg.ShareItemType,
            content: msg.Content
        );

        //      null,
        //     .
        long? timeOfCreation = await storageService.CreateShareItem(shareItem);
        if (timeOfCreation != null) {
            //      .
            List<long> pals = await inMemoryStorageService.GetRoomPals(
                msg.RoomIdentifier
            );
            if (pals == null) {
            	//     -       .
                pals = await storageService.GetRoomPals(msg.RoomIdentifier);
                await inMemoryStorageService.SaveRoomPals(msg.RoomIdentifier, pals);
            }

            //    ,  .
            pals.Remove(userId.Value);

            if (pals.Count > 0) {
            	//  ack,  ,    
                //    .
                await storageService.CreateAck(
                    msg.RequestIdentifier, userId.Value, msg.RoomIdentifier,
                    timeOfCreation.Value, pals
                );

                // in -  UserId, out -   frontend ,
                //    .  -   -
                //   null.
                partitions1 = await inMemoryStorageService.GetUserPartitions(pals);

                List<long> onlinePals = getOnlinePals(pals, partitions1);

                //    ,       .
                //         .
                if (onlinePals.Count > 0) {
                    messageToPals1 = converterService.EncodeNewShareItemMessage(
                        userId.Value, timeOfCreation.Value, onlinePals, shareItem
                    );
                    nullRepeatedPartitions(partitions1);
                    // -         
                    // frontend ,    null' .
                }
            }

            message = converterService.EncodeSuccessfulShareItemCreationMessage(
                msg.RequestIdentifier, timeOfCreation.Value
            );
        } else {
            message = converterService.EncodeMessage(
                MessageCode.RoomNotFound, msg.RequestIdentifier
            );
        }
    } else {
        message = converterService.EncodeMessage(
            MessageCode.UserNotFound, msg.RequestIdentifier
        );
    }

    return new ServiceResult(
        message: message, //    .
        messageToPals1: messageToPals1, //  -    .
        partitions1: partitions1
    );
}


Database


Most of the functionality of services called by backend servers is simply adding new data to the database and processing existing ones. Obviously, how the database is organized and how we operate on it is very important for the messenger, and here I would like to say that I approached the issue of choosing a database very carefully after carefully studying all the options, but this is not so. I just chose CockroachDb because it promises a lot with a minimum of effort and has postgres compatible syntax (I worked with postgres before). There were thoughts of using Cassandra, but in the end I decided to dwell on something familiar. I had never worked with Kafka, or with Rabbit, or with Flutter and Dart, or with WebRtc, so I decided not to drag Cassandra as well, because I was afraid to drown in a whole host of new technologies for me.

Of all the parts of my project, database design is the thing that I doubt the most. I’m not sure that the decisions I made are really good decisions. Everything works, but could be done better. For example, there are tables ShareRooms (as I call chats) and ShareItems (as I call messages). So all the users entering a room are recorded in the jsonb field of this room. This is convenient, but obviously very slow, so I will probably redo it using foreign keys. Or, for example, the ShareItems table stores all messages. Which is also convenient, but since ShareItems is one of the most loaded tables (persistent selectandinsert), it might be worth creating a new table for each room or something like that. Kokroach scatters records on different nodes, accordingly, you need to carefully think over which record will go in order to achieve maximum performance, but I did not. In general, as can be understood from all of the above, databases are not my strongest point. Right now I’m generally testing everything for postgres, and not kokroach, because there is less load on my working machine, it is already so poor from loads it will take off soon. Fortunately, the code for postgres and kokroach differs quite a bit, so switching is not difficult.

Now I am in the process of studying how the cocroach actually works (how the mapping occurs between SQL and key-value (the cocroach uses RocksDb under the hood), how it distributes data between nodes, replicates, etc.). It was, of course, worthwhile to study cocroach before using it, but better late than never.

I think that the base will undergo big changes when I become better at understanding this issue. Right now, the Acks table is haunting me. In this table, I store data about who has not yet received and who has not yet read the message (to show the user checkmarks). It is easy to notify the user that his message has been read if the user is online now, but if not, we need to save this information in order to notify the user later. And since group chats are available, it’s not enough just to store the flag, you need data about individual users. So here we directly ask for the use of bit strings (one line for users who have not yet received, the second - for those who have not yet read). Especially kokroach support bitandbit varying. However, I never figured out how to implement this business, given that the composition of the rooms can constantly change. In order for bit strings to retain their meaning, users in the room must remain in the same order, which is quite difficult to do when, for example, some user leaves the room. There are options here. Perhaps it’s worth writing -1 instead of deleting the user from the jsonb field so that order is preserved, or using some versioning method, so that we know that this bit string refers to the order of users, which was then then, and not on the current order of users. I'm still in the process of thinking about how to better implement this business, but for the time being, those who have not yet received and have not read the users are also just jsonb fields. Given that the Acks table is written with each message, the amount of data is large.Although the record, of course, is deleted when the message is received and read by everyone.

Flutter


For a long time I worked on the server side and used simple console clients for the test, so I did not even create a Flutter project. And when I created it, I thought that the server part was a complex part, and the application is like that, garbage, I'll figure it out in a couple of days. While working on the server, I created Hello Worlds for flutter a couple of times to get a feel for the framework, and since the messenger doesn’t need any intricate UI, I thought it was completely ready. So the UI, really, is garbage, but the implementation of the functionality gave me problems (and it will still deliver, since not everything is ready).

State management


The most popular topic. There are a thousand ways to manage your condition, and the recommended approach is changed every six months. Now the mainstream is provider. Personally, I chose 2 ways for myself: bloc and redux. Bloc (Business Logic Component) for managing local state and redux for managing global.

Bloc is not some kind of library (although, of course, there is also a library that reduces boilerplate, but I do not use it). Bloc is a stream-based approach. In general, dart is a pretty nice language, and streams are generally so sweet. The essence of this approach is that we push the entire business logic into services, and we communicate between the UI and services through a controller that provides us with various streams. Did the user click the “find contact” button? Usingsink(the other end of the stream) we send an event to the controller SearchContactsEvent, the controller will call the desired service, wait for the result and return the list of users back to the UI through the stream too. UI waits for results using StreamBuilder(widget that is rebuilt every time new data arrives in the stream it is subscribed to). That, in fact, is all. In some cases, we need to update the UI without any user involvement (for example, when a new message arrived), but this is also easily done through streams. In fact, a simple MVC with streams, no magic.

Compared to some other approaches, bloc requires more boilerplate, but, in my opinion, it is better to use native solutions without the participation of third-party libraries, unless using a third-party solution gives some significantadvantages. The more abstractions on top, the more difficult it is to understand what the error is when an error occurs. I do not consider the advantages of the provider to be significant enough to switch to it. But I have little experience in this area, so it is likely that I will change the camp in the future.

Well, about redux, and so everyone knows everything, so there’s nothing to say. Moreover, I cut it out of the application :) I used it to manage my account, but then, realizing that in this case there are no special advantages over the block, I cut it out so as not to drag too much. But in general I consider redux a useful thing for managing global state.

The most excruciating part


What should I do if the user sent a message, but before it was sent, the Internet connection was lost? What should I do if the user received a read confirmation, but he closed the application before the corresponding record in the database was updated? What should I do if the user invited his friend to the room, but before the invitation was sent, his battery died? Have you ever asked similar questions? Here I am. Before. But in the development process I began to wonder. Since the connection can disappear at any time, and the phone turns off at any time, everything must be confirmed . Not fun. Therefore, the very first message that the client sends to the server ( Joinif you remember) is not just “Hello I am online” , it is"Hello I am online and here are unconfirmed rooms, here are unconfirmed acks, here are unconfirmed room membership operations, and here are last received messages per room . " And the server responds with a similar sheet: “While you were offline, such and such your messages were read by such and such users, and they also invited Petya to this room, and Sveta left this room, and you were invited to this room, but to these two rooms have 40 new posts . I would really like to know how similar things are done in other messengers, because my implementation does not shine with grace.

Images


At the moment, you can send text, text + images and just images. Video upload has not yet been implemented. Images are compressed a bit and saved in Firebase storage. The message itself contains links. Upon receipt of the message, the client downloads images, generates thumbnails and saves everything to the file system. File paths are written to the database. By the way, thumbnail generation is the only code executed on a separate thread, since it is a compute-heavy operation. I just start one worker-stream, feed it an image and in return I get a thumbnail. The code is extremely simple, since dart provides convenient abstractions for working with streams.

ThumbnailGeneratorService
class ThumbnailGeneratorService {
  SendPort _sendPort;
  final Queue<Completer<Uint8List>> _completerQueue =
      Queue<Completer<Uint8List>>();

  ThumbnailGeneratorService() {
    var receivePort = ReceivePort();
    Isolate.spawn(startWorker, receivePort.sendPort);

    receivePort.listen((data) {
      if (data is SendPort) {
        _sendPort = data;
      } else {
        var completer = _completerQueue.removeFirst();
        completer.complete(data);
      }
    });
  }

  static void startWorker(SendPort sendPort) async {
    var receivePort = ReceivePort();
    sendPort.send(receivePort.sendPort);

    receivePort.listen((imageBytes) {
      Image image = decodeImage(imageBytes);
      Image thumbnail = copyResize(image, width: min(image.width, 200));

      sendPort.send(Uint8List.fromList(encodePng(thumbnail)));
    });
  }

  Future<Uint8List> generate(Uint8List imageBytes) {
    var completer = Completer<Uint8List>();
    _completerQueue.add(completer);
    
    _sendPort.send(imageBytes);

    return completer.future;
  }
}


Firebase auth is also used, but only for authorization of access to Firebase storage (so that the user cannot, say, fill in the profile picture to someone else ). All other authorization is done through my servers.

Message format


You are probably horrified here, as I use regular byte arrays. Json disappears because efficiency is required, and I did not know about protobuf when I started. Using arrays requires a lot of care because one index is wrong and things go awry.

The first 4 bytes are the length of the message.
The next byte is the message code.
The next 16 bytes are the request identifier (uuid).
The next 40 bytes is the authorization token.
The rest of the message .

Message Lengthrequired, since I do not use http or web sockets, or some other protocol that provides separation of one message from another. My frontend servers only see byte streams, and they need to know where one message ends and another begins. There are several ways to separate messages (for example, use some kind of character never found in messages as a separator), but I preferred to specify the length, since this method is the easiest, although it entails overhead, as most messages are missing and one byte to indicate the length.

The message code is just one of the members of the enumMessageCode. Routing is carried out according to the code, and since we can extract the code from the array without preliminary deserialization, the frontend server decides in which topic of the kafka to send a message instead of delegating this responsibility to someone else.

Request IDpresent in most posts, but not in all. It performs 2 functions: by this identifier, the client establishes the correspondence between the sent request and the received response (if the client sent messages A, B, C in this order, this does not mean that the answers will also come in order). The second function is to avoid duplicates. As mentioned earlier, kafka guarantees at least once delivery. That is, in rare cases, messages can still be duplicated. By adding the RequestIdentifier column with a unique constraint to the desired database table, we can avoid inserting a duplicate.

Authorization TokenIs a UserId (8 bytes) + 32 bytes HmacSha256 signature. I don't think it's worth using Jwt here. Jwt is about 7-8 times larger for what? My users do not have any claims, so a simple hmac signature is fine. Authorization through other services is not and is not planned.

Audio and video calls


It's funny that I deliberately put off the implementation of audio and video calls, because I was sure that I won’t be able to solve the problems, but in fact it turned out to be one of the easiest features to implement. At least the basic functionality. In general, just adding WebRtc to the application and getting the first video session took only a few hours, and, miraculously, the first test was successful. Before that, I thought that the code that worked the first time was a myth. Usually the first test of a new feature always fails due to some kind of stupid error like “added a service, but did not register it in a DI container”.

Not very brief about WebRtc for the uninitiated
WebRtc — , peer-to-peer , , peer-to-peer , . - , , . , .

(peer-to-peer), , 3 ( , 3 . 3 ).

— stun . stun , — Source IP Source Port , . ? - . IP . - , , , Source IP Source Port IP - NAT [ Source IP | Source Port | Router External IP | Router Port ]. - , Dest IP Dest Port Router External IP Router Port NAT, , Source IP — Source Port , . , , , , , , NAT . stun NAT . stun Router External IP — Router Port. — . , «» NAT (NAT traversal) , NAT , stun .
* NAT , . , , WebRtc .

— turn. , , peer-to-peer . Fallback . , , , , , peer-to-peer . turn — coturn, .

— . , . , . — . . , , , :) — .

WebRtc 3 : offer, answer candidate. offer , answer, . , , , , . ( ) , .


WebRtc technology itself establishes a connection and is engaged in transferring flows back and forth, but this is not a framework for creating full-fledged calls. By call, I mean a communication session with the ability to cancel, reject and accept the call, as well as hang up. Plus, you need to let the caller know if the other side is already taken. And also to implement little things like "wait for an answer to the call N seconds, then reset." If you simply implement WebRtc in the application in a bare form, then with an incoming call, the camera and video will spontaneously turn on, which, of course, is unacceptable.

In its pure form, WebRtc usually means sending candidates to the other party as soon as possible so that negotiations begin as quickly as possible, which is logical. In my tests, candidates to the receiving party generally alwaysstarted to come even before offer comes. Such “early” candidates cannot be discarded, they must be remembered, so that later, when the offer arrives and RTCPeerConnectionis created, add them to the connection. The fact that candidates may begin to arrive even before the offer, as well as some other reasons, make the implementation of full-fledged calls a non-trivial task. What to do if several users call us at once? We will receive candidates from all, and although we can separate candidates from one user from another, it becomes unclear which candidates to reject, because we do not know whose offer will come earlier. There will also be problems if candidates begin to come to us and then an offer at the moment when we ourselves call someone.

After testing several options with a bare WebRtc, I came to the conclusion that in this form it would be problematic and fraught with memory leaks to try to make calls, so I decided to add another stage to the WebRtc negotiation process. I call this stage Inquire - Grant/Refuse.

The idea is very simple, but it took me quite a while to reach it. The caller even before creating the stream and RTCPeerConnection(and generally before executing any code related to WebRtc) sends a message through the signal server to the other side Inquire. On the receiving side, it is checked whether the user is in some other communication session at the moment (simple boolfield). If it is, then a message is sent back.Refuse, and in this way we let the caller know that the user is busy, and the receiver - that such and such a phone called while he was busy with another conversation. If the user is currently free, then it is reserved . The Inquiresession identifier is sent in the message , and this identifier is set as the identifier of the current session. If the user is reserved, he will reject all Inquire/Offer/Candidatemessages with session identifiers other than the current one. After the reservation, the receiver sends a message through the signal server to the caller Grant. It is worth saying that this process is not visible to the receiving user, since there is no call yet. And the main thing here is not to forget to hang up a timeout on the receiving side. Suddenly we will reserve a session, and no offer will follow.

The caller receives Grant, and this is where WebRtc begins with offers, candidates, and this is it for everyone. Offer flies to the receiver, and he, upon receipt, displays a screen with the buttons Answer / Reject. But candidates, as usual, are not expecting anyone. They again begin to arrive even earlier than the offer, because there is no reason to wait for the user to answer the call. He may not answer, but reject or wait until the timeout expires - then the candidates will simply be thrown out.

Current status and future plans


  • Private and group chats
  • Sending text, images and videos
  • Audio and video calls
  • Confirmation of receipt and reading
  • "Prints ..."
  • Notifications
  • Search by QR code and geolocation


Searching by QR code is, unexpectedly, quite problematic to implement, because almost all the plugins for the code scan that I tried refuse to start or do not work correctly. But I think the problems will be solved here. And for the implementation of the search for geolocation, I have not yet taken up. In theory, there should not be any special problems.

Notifications in progress, as well as sending videos.

What else needs to be done?


Oh, a lot.
Firstly, there are no tests. Colleagues used to write tests, so I completely relaxed.
Secondly, inviting users to an existing chat and leaving the chat is currently not possible. Server code is ready for this, client code is not.
Thirdly, if the error handling on the server is more or less, then there is no error handling on the client. It is not enough just to make a log entry; you need to retry the operation. Now, for example, the mechanism for re-sending messages is not implemented.
Fourth, the server does not ping the client, so disconnect is not detected if, for example, the client has lost the Internet. Disconnect is detected only when the client closes the application.
Fifth, indexes are not used in the database.
Sixth, optimization. The code has a huge number of places where something like is written // @@TODO: Pool. Most arrays are just newthat. The backend server creates many arrays of fixed length, so here you can and should use the pool.
Seventh, there are many places on the client where the code awaitends, although this is not necessary. Sending images, for example, therefore seems slow because the codeawaitIt saves pictures to the file system and generates thumbnails before displaying the message, although none of this needs to be done. Or, for example, if you open the application and during your absence they sent images to you, the startup will be slow, because again all these images are downloaded, saved to the system, thumbnails are generated, and only after that the startup ends and you are thrown from the splash screen on home screen. All these redundant await's were made for easier debugging, but, of course, you need to get rid of unnecessary waiting before release.
EighthUI is now half ready, because I have not decided how I want to see it. Therefore, now everything is not intuitive, half of the buttons are unclear what they are doing. And the buttons are often not pressed the first time, because now they are just icons with GestureDetectorand without padding, so it’s not always possible to get into them. Plus in some places pixel overflow is not fixed.
Ninth, now it’s even impossible to Sign-in to an account, only Sign Up. Therefore, if you uninstall the application and reinstall it, you won’t be able to log into your account :)
Tenth, the verification code is not sent to the mail. Now the code is generally always the same, again because it's easier to debug.
EleventhThe single-responsibility principle is violated in many places. Need a refactor. The classes responsible for interacting with the database (both on the client and on the server) are generally very bloated, because they are engaged in all database operations.
Twelfth, the frontend server now always expects a response from the backend server, even if the message does not imply sending a response (for example, a message with a code IsTypingand some WebRtc-related messages). Therefore, without waiting for an answer, he writes an error to the console, although this is not an error.
Thirteenth, full images do not open on tap.
One hundred million fifthssome messages that need to be sent in batches are sent separately. The same applies to some database operations. Instead of executing a single command, the commands are executed in a loop with await(brr ..).
One hundred million sixths, some values ​​are hardcoded, instead of being configurable.
One hundred and one million seventhslogging on the server is now only to the console, and on the client in general, directly to the widget. On the main screen there is a Logs tab, where all the logs on tap are dropped. The fact is that my working machine refuses to run both the emulator and everything necessary for the server (kafka, database, radish and all servers). Debit with a connected device also did not work out, everything just tightly hung in half the cases, because the computer could not cope with the loads. Therefore, you have to make a build every time, drop it onto the device, install and test like this. To see the logs, I drop them right into the widget. Perversion, I know, but there is no choice. For the same reason, many methods return FutureandawaitThey are (to catch the exception and throw into the widget), although they should not. If you look at the code, you will see an ugly _logErrormethod in many classes that does this. This, of course, will also go to the trash.
One hundred million and eight, no sounds.
One hundred million and ninth, you need to use caching more.
One hundred million tenths, a lot of repetitive code. For example, many actions first of all check the validity of the token, and if it is not valid, they send an error. I think you need to implement a simple middleware-pipeline.

And a lot of little things, like concatenating strings instead of using StringBuilder'a,Disposenot everywhere is called where it should, and so on and so on. In general, the normal state of the project is under development. All of the above can be solved, but there is one fundamental problem that I did not think about until the last moment, because it got out of my head - the messenger should work even when the application is not open, and mine does not work. To be honest, the solution to this problem has not yet crossed my mind. Here, apparently, you can not do without the native code.

I would rate the readiness of the project at 70%.

Summary


Six months have passed since the start of work on the project. Combined with part-time work and took long breaks, but still it took a decent amount of time and energy. I plan to implement all the declared features + add something unusual like tic-tac-toe or drafts right in the room. For no reason, just because it's interesting.

If you have any questions, write. Mail is on github.

All Articles