当我写信使时

在又一个令人沮丧的一天之后的一个晚上,充满了平衡游戏的尝试,我决定我急需休息一下。我将切换到另一个项目,迅速执行,返回在游戏开发过程中逐渐减弱的自尊心,并将以新的活力席卷整个游戏!最主要的是选择一个不错且轻松的项目。写您自己的Messenger?哈!它能有多难?

该代码可以在这里找到


简要背景


在开始从事Messenger之前的将近一年时间里,他一直在从事在线多人在线《线下大战》游戏。编程进行得很顺利,其他所有东西(尤其是平衡感和视觉感)都不是很好。突然发现,制作游戏和制作有趣的游戏(为自己以外的人带来乐趣)是两回事。经过一年的磨难,我需要分心,所以我决定尝试其他事情。选择取决于移动开发,即Flutter。我听到了关于Flutter的很多好消息,经过短暂的实验,我喜欢飞镖。我决定写自己的Messenger。首先,最好同时实施客户端和服务器。其次,我将在投资组合中寻找重要的工作。

预定功能


  • 私人和群聊
  • 发送文本,图像和视频
  • 音频和视频通话
  • 确认收货和阅读(Votsap的提示)
  • “印刷品...”
  • 通知事项
  • 通过QR码和地理位置搜索

展望未来,我可以自豪地(并欣慰地)说,几乎所有已计划的计划都已实施,而尚未实施-将在不久的将来实施。



语言选择


我很久没有选择语言了。最初,在客户端和服务器上都使用dart是很诱人的,但是更详细的检查表明,没有可用的dart驱动程序,而那些驱动程序并没有激发很大的信心。尽管我不保证谈论当前时刻,但情况可能有所改善。因此,我的选择落在了我在Unity中工作过的C#上。

建筑


他开始思考建筑。当然,考虑到三分之二的人最有可能使用我的Messenger,因此一般而言,人们不必去烦恼建筑。您可以按照无数教程中的说明进行操作。这是节点,这是mongo,这是Web套接字。做完了 Firebase就在附近。但这并不有趣。我决定制作一个可以轻松水平扩展的Messenger,好像我希望有数百万个同时进行的客户一样。但是,由于我没有这方面的经验,因此我不得不通过错误和错误的方法来学习实践中的所有内容。

最终的架构看起来像这样


我并不是说这样的体系结构超级酷且可靠,但是它是可行的,并且理论上应该可以承受重负荷并可以水平扩展,但是我并不真正了解如何检查。我希望我不要错过除了我之外每个人都知道的明显时刻。

以下是各个组件的详细说明。

前端服务器


甚至在我开始制作游戏之前,我就对异步单线程服务器的概念着迷。有效且没有潜在的竞赛机会-您还能要求什么。为了了解如何安排这些服务器,我开始深入研究asynciopython语言模块我看到的解决方案看起来非常优雅。简而言之,伪代码解决方案如下所示。
//  ,      ,    
//       .      socket.Receive
//     , :
var bytesReceived = Completer<object>();
selector.Register(
    socket,
    SocketEvent.Receive,
    () => bytesReceived.Complete(null)
);

await bytesReceived.Future;

int n = socket.Receive(...); //   

// selector -     poll.   
//        (Receive 
//  ), ,    ,  .
//   completer,      ,
//        , ,     .
//     ,       .

使用这种简单的技术,我们可以在单个线程中服务大量套接字。我们永远不会在等待接收或发送字节时阻塞流。流总是忙于有用的工作。总之,并发。

前端服务器是通过这种方式实现的。它们都是单线程和异步的。因此,为了获得最佳性能,一台计算机上需要运行具有核心数量的服务器(图中为4个)。

前端服务器从客户端读取消息,然后根据消息代码将其发送到Kafka中的主题之一。

不熟悉kafa的人的小注脚
, RabbitMQ. . , ( authentication backend authentication, ). ? - , (partition). , . , , . , ( , , , (headers)).

? ? . (consumer) ( consumer'), ( ) . , , , 2 , . 3 — 2. .

前端服务器不带密钥就向Kafka发送消息(如果没有密钥,则Kafka只是依次向聚会发送消息)。消息由相应的后端服务器之一从主题中拉出。服务器处理该消息,然后...接下来是什么?还有什么进一步取决于消息的类型。

在最常见的情况下,会发生请求-响应周期。例如,对于注册请求,我们只需要给客户一个答案(SuccessEmailAlreadyInUse等)。但是,对于包含邀请邀请新成员(Vasya,Emil和Julia)进行现有聊天的消息,我们需要立即使用三种不同类型的消息进行响应。第一种类型-您需要将操作的结果通知邀请者(突然发生服务器错误)。第二种类型-您需要通知聊天的所有当前成员,聊天中现在有此类成员。第三是向Vasya,Emil和Yulia发送邀请。

好的,这听起来并不困难,但是为了向任何客户端发送消息,我们需要:1)找出此客户端连接到哪个前端服务器(我们不选择客户端将连接到哪个服务器,平衡器会为我们决定); 2)从后端服务器向所需的前端服务器发送消息; 3)实际上是向客户端发送消息。

为了实现第1点和第2点,我决定使用一个单独的主题(“前端服务器”主题)。将身份验证,会话和呼叫主题分隔为多个分区可充当并行化机制。我们看到会话服务器负载很重吗?只需添加几个新的分区和会话服务器,Kafka就会为我们重新分配负载,卸载现有的会话服务器。将“前端服务器”主题分离到分区中充当路由机制

每个前端服务器对应于“前端服务器”主题的一部分(具有与服务器本身相同的索引)。也就是说,服务器0-分区0,依此类推。 Kafka不仅可以订阅特定主题,还可以订阅特定主题的特定部分。初创企业上的所有前端服务器都订阅相应的分区。因此,后端服务器能够通过将消息发送到特定分区来将消息发送到特定前端服务器。

好的,现在客户端加入时,您只需要在某处保存一对UserId-Frontend Server Index。如果断开连接-删除。为此,许多内存键值数据库都可以使用。我选了萝卜。

在实践中的外观。首先,建立连接后,客户端Andrey向服务器发送一条消息Join前端服务器接收到该消息并将其转发到会话主题,并预先添加了“前端服务器”标头:{index}。其中一个后端会话服务器将收到一条消息,读取授权令牌,确定已加入的用户类型,读取前端服务器添加的索引,并将UserId-索引写入萝卜。从这一刻起,该客户端被认为是联机的,现在我们知道当其他客户端向Andrey发送消息时,可以通过哪个前端服务器(相应地,通过“前端服务器”主题的哪一部分)与该客户端“联系”。

*实际上,该过程比我描述的要复杂一些。您可以在源代码中找到它。

前端服务器伪代码


// Frontend Server 6
while (true) {
    // Consume from "Frontend Servers" topic, partition 6
    var messageToClient = consumer.Consume();
    if (message != null) {
        relayMessageToClient(messageToClient);
    }

    var callbacks = selector.Poll();
    while (callbacks.TryDequeue(out callback)) {
        callback();
    }

    long now = DateTimeOffset.UtcNow.ToUnixTimeMilliseconds();
    while (!callAtQueue.IsEmpty && callAtQueue.PeekPriority() <= now) {
        callAtQueue.Dequeue()();
    }

    while (messagesToRelayToBackendServers.TryDequeue(out messageFromClient)) {
        // choose topic
        producer.Produce(topic, messageFromClient);
    }
}


这里有一些技巧。
1)relayMessageToClient。仅获取所需的套接字并立即开始向其发送消息是一个错误,因为也许我们已经在向客户端发送其他消息。如果我们开始发送字节而不检查套接字当前是否繁忙,则消息将混合在一起。像在许多其他需要顺序处理数据的地方一样,诀窍是使用队列,即来自Completers的队列(TaskCompletionSource在C#中)。
void async relayMessageToClient(message) {
    // find client
    await client.ReadyToSend();
    await sendMessage(client, message);
    client.CompleteSend();
}

class Client {
    // ...
    sendMessageQueue = new LinkedList<Completer<object>>();

    async Future ReadyToSend() {
        var sendMessage = Completer<object>();
	if (sendMessageQueue.IsEmpty) {
	    sendMessageQueue.AddLast(sendMessage);
	} else {
	    var prevSendMessage = sendMessageQueue.Last;
	    sendMessageQueue.AddLast(sendMessage);
	    await prevSendMessage.Future;
	}
    }

    void CompleteSend() {
        var sendMessage = sendMessageQueue.RemoveFirst();
	sendMessage.Complete(null);
    }
}

如果队列不为空,则此刻套接字已被占用。创建一个新的completer,把它添加到队列中,并await前面的一个 completer。因此,发送上一条消息时,它将CompleteSend完成completer,这将导致服务器开始发送下一条消息。这样的队列还允许平稳传播异常。假设在向客户端发送消息时发生错误。在这种情况下,我们不仅需要发送此消息,还需要发送队列中当前正在等待的所有消息(挂在await“ ah”上),因此需要完成。如果我们不这样做,则它们将继续挂起,并且我们将收到内存泄漏。为简便起见,此处未显示执行此操作的代码。

2)selector.Poll。实际上,这甚至不是一个技巧,而只是试图消除方法实现的缺点Socket.Selectselector-只是对该方法的包装)。根据内部操作系统的不同,此方法使用selectpoll。但这在这里并不重要。重要的是此方法如何与我们将其馈送到输入的列表(用于读取,写入,错误检查的套接字列表)一起使用。此方法获取列表,轮询套接字,仅将那些准备好执行所需操作的套接字留在列表中。所有其他套接字都从列表中抛出。 “踢”通过RemoveAt(也就是说,所有后续元素都会移动,这是无效的)。另外,由于我们需要在循环的每个迭代中轮询所有已注册的套接字,所以这种“清理”通常是有害的,因此我们每次都必须重新填充列表。我们可以使用自定义List方法来解决所有这些问题RemoveAt方法不会将项目从列表中删除,而只是将其标记为已删除。该类ListForPolling是我对此类列表的实现。ListForPolling仅适用于该方法Socket.Select,不适用于其他任何方法。

3)callAtQueue。在大多数情况下,已将客户端消息发送到后端服务器的前端服务器期望得到响应(确认操作成功,如果出现问题则返回错误)。如果他没有在可配置的时间段内等待答案,那么他会将错误发送给客户端,这样他就不会等待永远不会出现的答案。callAtQueue是优先级队列。服务器将消息发送到Kafka之后,立即执行以下操作:
long now = DateTimeOffset.UtcNow.ToUnixTimeMilliseconds();
callAtQueue.Enqueue(callback, now + config.WaitForReplyMSec);

在回调中,等待响应被取消,服务器错误发送开始。如果收到来自后端服务器的响应,则回调不执行任何操作。由于它是在池中的线​​程上执行的,因此无法

使用await Task.WhenAny(answerReceivedTask, Task.Delay(x))Task.Delay

实际上,这里是有关前端服务器的所有内容。这里需要进行一些更正。实际上,服务器不完全单线程。当然,幕后的kafka使用线程,但是我的意思是应用程序代码。事实是,将消息发送到kafka(生产)主题可能不会成功。如果发生故障,Kafka会重复发送一定数量的可配置次数,但是,如果重复出发失败,Kafka会毫无希望地放弃这项业务。您可以检查消息是否成功发送,deliveryHandler然后我们将其传递给该方法Produce。 Kafka在生产者的I / O线程(发送消息的线程)中调用此处理程序。我们必须确保成功发送了消息,如果没有成功,则取消等待后端服务器的响应(由于未发送请求,响应不会出现),然后将错误发送给客户端。也就是说,我们无法避免与另一个线程进行交互。

*在写文章时,我突然意识到我们不能传递deliveryHandler给该方法Produce或简单地忽略所有的kafka错误(该错误仍将在我前面描述的超时之前发送给客户端)-然后我们所有的代码都将是单线程的。现在我正在考虑如何做得更好。

为什么实际上是卡夫卡而不是兔子?
, , , , , RabbitMQ? . , , . ? , frontend . , backend , , . , , . , error-prone. , basicGet , , , . . basicGet, , . .


后端服务器


与前端服务器相比,这里几乎没有有趣的地方。所有后端服务器都以相同的方式工作。在启动时,服务器订阅主题(身份验证,会话或呼叫,取决于角色),而kafka为其分配一个或多个分区。服务器从Kafka接收消息,进行处理并通常发送一个或多个消息作为响应。几乎真实的代码:
void Run() {
    long lastCommitTime = DateTimeOffset.UtcNow.ToUnixTimeMilliseconds();

    while (true) {
        var consumeResult = consumer.Consume(
            TimeSpan.FromMilliseconds(config.Consumer.PollTimeoutMSec)
        );

        if (consumeResult != null) {
            var workUnit = new WorkUnit() {
                ConsumeResult = consumeResult,
            };

            LinkedList<WorkUnit> workUnits;
            if (partitionToWorkUnits.ContainsKey(consumeResult.Partition)) {
                workUnits = partitionToWorkUnits[consumeResult.Partition];
            } else {
                workUnits = partitionToWorkUnits[consumeResult.Partition] =
                    new LinkedList<WorkUnit>();
            }

            workUnits.AddLast(workUnit);

            handleWorkUnit(workUnit);
        }

	if (
            DateTimeOffset.UtcNow.ToUnixTimeMilliseconds() - lastCommitTime >=
            config.Consumer.CommitIntervalMSec
        ) {
            commitOffsets();
	    lastCommitTime = DateTimeOffset.UtcNow.ToUnixTimeMilliseconds();
	}
    }
}

要提交哪种补偿?
. — (offset) (0, 1 ). 0. TopicPartitionOffset. (consume) , ConsumeResult, , , TopicPartitionOffset. ?

at least once delivery, , ( ). , (commited) . , consumer 16, , 16 , , , . - consumer' consumer' , 16 + 1 ( + 1). 17 . N , .

我禁用了自动提交并承诺自己。这是必需的,因为handleWorkUnit在实际执行消息处理的地方,这是一种async void方法,因此不能保证在消息6之前将处理消息5。Kafka在提交偏移之前仅分别存储一个提交的偏移(而不是一组偏移)。 6,我们需要确保所有先前的消息也已处理。此外,一台后端服务器可以同时使用来自多个分区的消息,因此,必须确保它对相应的分区提交正确的偏移量。为此,我们使用表单分区的哈希图:工作单位。这是代码的样子commitOffsets(这次实际代码):
private void commitOffsets() {
    foreach (LinkedList<WorkUnit> workUnits in partitionToWorkUnits.Values) {
        WorkUnit lastFinishedWorkUnit = null;
        LinkedListNode<WorkUnit> workUnit;
        while ((workUnit = workUnits.First) != null && workUnit.Value.IsFinished) {
            lastFinishedWorkUnit = workUnit.Value;
            workUnits.RemoveFirst();
        }

        if (lastFinishedWorkUnit != null) {
            offsets.Add(lastFinishedWorkUnit.ConsumeResult.TopicPartitionOffset);
        }
    }

    if (offsets.Count > 0) {
        consumer.Commit(offsets);
        foreach (var offset in offsets) {
            logger.Debug(
                "{Identifier}: Commited offset {TopicPartitionOffset}",
                identifier,
                offset
            );
        }
        offsets.Clear();
    }
}

如您所见,我们遍历单位,找到此刻之前完成的最后一个单位,此后没有不完整的单位,并提交相应的偏移量。这样的循环使我们避免了“漏洞”提交。例如,如果我们当前有4个单位(0: Finished, 1: Not Finished, 2: Finished, 3: Finished),则只能提交第0个单位,因为如果立即提交第三个单位,则可能导致丢失第1个单位(如果服务器立即死亡)。
class WorkUnit {
    public ConsumeResult<Null, byte[]> ConsumeResult { get; set; }
    private int finished = 0;

    public bool IsFinished => finished == 1;

    public void Finish() {
        Interlocked.Increment(ref finished);
    }
}


handleWorkUnit如前所述,该async void方法及其相应方法已完全封装在中try-catch-finally在中,try他呼唤必要的服务,在finally-中workUnit.Finish()

服务非常简单。例如,在这里,当用户发送新消息时执行什么代码:
private async Task<ServiceResult> createShareItem(CreateShareItemMessage msg) {
    byte[] message;
    byte[] messageToPals1 = null;
    int?[] partitions1 = null;

    //  UserId  .
    long? userId = hashService.ValidateSessionIdentifier(msg.SessionIdentifier);
    if (userId != null) {
        var shareItem = new ShareItemModel(
            requestIdentifier: msg.RequestIdentifier,
            roomIdentifier: msg.RoomIdentifier,
            creatorId: userId,
            timeOfCreation: null,
            type: msg.ShareItemType,
            content: msg.Content
        );

        //      null,
        //     .
        long? timeOfCreation = await storageService.CreateShareItem(shareItem);
        if (timeOfCreation != null) {
            //      .
            List<long> pals = await inMemoryStorageService.GetRoomPals(
                msg.RoomIdentifier
            );
            if (pals == null) {
            	//     -       .
                pals = await storageService.GetRoomPals(msg.RoomIdentifier);
                await inMemoryStorageService.SaveRoomPals(msg.RoomIdentifier, pals);
            }

            //    ,  .
            pals.Remove(userId.Value);

            if (pals.Count > 0) {
            	//  ack,  ,    
                //    .
                await storageService.CreateAck(
                    msg.RequestIdentifier, userId.Value, msg.RoomIdentifier,
                    timeOfCreation.Value, pals
                );

                // in -  UserId, out -   frontend ,
                //    .  -   -
                //   null.
                partitions1 = await inMemoryStorageService.GetUserPartitions(pals);

                List<long> onlinePals = getOnlinePals(pals, partitions1);

                //    ,       .
                //         .
                if (onlinePals.Count > 0) {
                    messageToPals1 = converterService.EncodeNewShareItemMessage(
                        userId.Value, timeOfCreation.Value, onlinePals, shareItem
                    );
                    nullRepeatedPartitions(partitions1);
                    // -         
                    // frontend ,    null' .
                }
            }

            message = converterService.EncodeSuccessfulShareItemCreationMessage(
                msg.RequestIdentifier, timeOfCreation.Value
            );
        } else {
            message = converterService.EncodeMessage(
                MessageCode.RoomNotFound, msg.RequestIdentifier
            );
        }
    } else {
        message = converterService.EncodeMessage(
            MessageCode.UserNotFound, msg.RequestIdentifier
        );
    }

    return new ServiceResult(
        message: message, //    .
        messageToPals1: messageToPals1, //  -    .
        partitions1: partitions1
    );
}


数据库


后端服务器调用的服务的大多数功能只是将新数据添加到数据库并处理现有数据。显然,数据库的组织方式和操作方式对于Messenger来说非常重要,在这里我想说的是,在仔细研究了所有选项之后,我非常仔细地研究了选择数据库的问题,但事实并非如此。我之所以选择CockroachDb,是因为它以最小的努力承诺了很多,并且具有与postgres兼容的语法(我之前使用过postgres)。曾经有过使用Cassandra的想法,但是最后我决定专注于一些熟悉的东西。我从未与Kafka,Rabbit,Flutter和Dart或WebRtc一起工作过,所以我决定也不拖累Cassandra,因为我担心自己会淹没很多新技术。

在项目的所有部分中,我最怀疑的是数据库设计。我不知道我所做的决定确实不错的决定。一切正常,但可以做得更好。例如,有表ShareRooms(如我所说的聊天)和ShareItems(如我所说的消息)。因此,进入会议室的所有用户都记录在该会议室的jsonb字段中。这很方便,但显然很慢,因此我可能会使用外键重做。或者,例如,ShareItems表存储所有消息。这也很方便,但是由于ShareItems是负载最大的表之一(持久表selectinsert),则可能需要为每个房间或类似的地方创建一个新表。 Kokroach将记录分散在不同的节点上,因此,您需要仔细考虑要走哪条记录才能获得最佳性能,但是我没有。通常,从以上所有内容可以理解,数据库不是我的强项。现在,我通常会测试所有Postgres而不是kokroach的东西,因为我的工作机器上的负载较少,负载已经很差了,很快就会起飞。幸运的是,postgres和kokroach的代码相差很多,因此切换并不困难。

现在,我正在研究cocroach实际如何工作(SQL和键值之间的映射是如何发生的(cocroach在后台使用RocksDb),如何在节点之间分配数据,复制等等)。当然,在使用cocroach之前值得研究,但是迟到总比没有好。

我认为,当我更好地理解这个问题时,基础将会发生很大的变化。现在,Acks桌子正困扰着我。在此表中,我存储有关谁尚未收到和谁尚未阅读消息的数据(以显示用户选中标记)。如果用户现在在线,很容易通知用户已经阅读了他的消息,但是如果没有,我们需要保存此信息以便以后通知用户。而且由于可以进行群聊,因此仅存储标记是不够的,您需要有关单个用户的数据。因此,在这里,我们直接要求使用位串(对于尚未接收到的用户来说是一行,对于尚未阅读的用户来说是第二行)。特别是kokroach的支持bitbit varying。但是,鉴于房间的组成可以不断变化,我从未想出如何实施这项业务。为了使位串保持其含义,房间中的用户必须保持相同的顺序,这在例如某些用户离开房间时很难做到。这里有选项。也许值得写-1,而不是从jsonb字段中删除用户以保留顺序或使用某种版本控制方法,这样我们就知道此位字符串是指用户的顺序,那时,而不是按照当前的用户顺序。我仍在考虑如何更好地实施这项业务,但是就目前而言,那些尚未收到并且尚未阅读用户的人也只是jsonb字段。假设Acks表是随每条消息写入的,则数据量很大。尽管记录当然会在每个人都收到并读取消息后删除。


长期以来,我在服务器端工作,并使用简单的控制台客户端进行测试,因此我什至没有创建Flutter项目。当我创建它时,我以为服务器部分是一个复杂的部分,而应用程序就是这样,垃圾,我将在几天之内解决它。在服务器上工作时,我创建了Hello Worlds几次,以使他们对框架有所了解,并且由于Messenger不需要任何复杂的UI,所以我认为它已经完全准备就绪。因此,UI确实是垃圾,但是该功能的实现给我带来了问题(并且仍然可以交付,因为还没有准备好一切)。

国家管理


最受欢迎的话题。有上千种方法可以管理您的病情,建议的方法每六个月更改一次。现在主流是提供商。就个人而言,我为自己选择了两种方式:bloc和redux。 Bloc(业务逻辑组件)用于管理本地状态,redux用于管理全局。

Bloc不是某种类型的库(尽管,当然还有一个减少样板的库,但我不使用它)。 Bloc是一种基于流的方法。通常,dart是一种非常不错的语言,并且流通常是如此的甜美。这种方法的本质是,我们将整个业务逻辑推送到服务中,并通过为我们提供各种流的控制器在UI和服务之间进行通信。用户是否单击了“查找联系人”按钮?使用sink(流的另一端),我们向控制器发送事件SearchContactsEvent,控制器将调用所需的服务,等待结果,并通过流将用户列表返回给UI。 UI使用StreamBuilder通过每次订阅的流中新数据到达时都会重建的小部件)等待结果。实际上,仅此而已。在某些情况下,我们需要在没有用户参与的情况下更新UI(例如,当收到新消息时),但这也很容易通过流来完成。实际上,带有流的简单MVC,没有任何魔力。

与其他一些方法相比,bloc需要更多样板,但我认为,最好在没有第三方库参与的情况下使用本机解决方案,除非使用第三方解决方案会带来一些重大变化优点。顶层的抽象越多,发生错误时就越难理解错误是什么。我认为提供者的优势不足以切换到它。但是我在这方面经验不足,因此将来可能会改变阵营。

好吧,关于redux,所以每个人都知道一切,所以无话可说。此外,我从应用程序中删除了它:)我用它来管理我的帐户,但是随后意识到,在这种情况下,该块没有特别的优势,所以我切出了它,以免拖得太多。但总的来说,我认为redux对于管理全局状态很有用。

最令人难忘的部分


如果用户发送了一条消息,但是在发送之前,Internet连接丢失,该怎么办?如果用户收到读取确认,但是在更新数据库中的相应记录之前关闭了应用程序,该怎么办?如果用户邀请他的朋友去房间,但是在发送邀请之前,他的电池没电了,该怎么办?您曾经问过类似的问题吗?我在这里。之前。但是在开发过程中我开始怀疑。由于连接可以随时消失,并且手机可以随时关闭,因此必须确认所有内容。不好玩。因此,客户端发送到服务器的第一条消息(Join如果您还记得的话)不仅是“你好我在线”,而是“您好我在线,这里是未确认的房间,这里是未确认的座位,这里是未确认的房间成员身份操作,这是每个房间最近收到的消息服务器用类似的表格回应:“当您处于脱机状态时,这样的用户读取了这样的消息,他们还邀请Petya到这个房间,而Sveta离开了这个房间,您被邀请到这个房间,但是这两个房间有40个新职位我真的很想知道在其他Messenger中是如何完成类似的事情的,因为我的实现并不优雅。

图片


目前,您可以发送文本,文本+图片以及仅图片。视频上传尚未实现。图像会进行一点压缩并保存在Firebase存储中。消息本身包含链接。收到消息后,客户端下载图像,生成缩略图并将所有内容保存到文件系统。文件路径被写入数据库。顺便说一下,缩略图生成是在单独的线程上执行的唯一代码,因为它是计算量很大的操作。我只是开始一个工作流,向其提供图像,然后返回缩略图。该代码非常简单,因为dart为使用流提供了方便的抽象。

ThumbnailGeneratorService
class ThumbnailGeneratorService {
  SendPort _sendPort;
  final Queue<Completer<Uint8List>> _completerQueue =
      Queue<Completer<Uint8List>>();

  ThumbnailGeneratorService() {
    var receivePort = ReceivePort();
    Isolate.spawn(startWorker, receivePort.sendPort);

    receivePort.listen((data) {
      if (data is SendPort) {
        _sendPort = data;
      } else {
        var completer = _completerQueue.removeFirst();
        completer.complete(data);
      }
    });
  }

  static void startWorker(SendPort sendPort) async {
    var receivePort = ReceivePort();
    sendPort.send(receivePort.sendPort);

    receivePort.listen((imageBytes) {
      Image image = decodeImage(imageBytes);
      Image thumbnail = copyResize(image, width: min(image.width, 200));

      sendPort.send(Uint8List.fromList(encodePng(thumbnail)));
    });
  }

  Future<Uint8List> generate(Uint8List imageBytes) {
    var completer = Completer<Uint8List>();
    _completerQueue.add(completer);
    
    _sendPort.send(imageBytes);

    return completer.future;
  }
}


还使用Firebase身份验证,但仅用于访问Firebase存储的授权(这样,用户就无法将个人资料图片填写给其他人)。所有其他授权都是通过我的服务器完成的。

讯息格式


由于我使用常规字节数组,因此您在这里可能会感到恐惧。 Json消失是因为需要效率,而我刚开始时并不了解protobuf。使用数组需要格外小心,因为一个索引错误并且事情出错了。

前4个字节是消息的长度。
下一个字节是消息代码。
接下来的16个字节是请求标识符(uuid)。
接下来的40个字节是授权令牌。
其余消息

讯息长度是必需的,因为我不使用http或Web套接字,也不使用其他将某条消息与另一条消息分开的协议。我的前端服务器只看到字节流,它们需要知道一条消息在哪里结束而另一条消息在哪里开始。分离消息有几种方法(例如,使用在消息中找不到的某种字符作为分隔符),但我更喜欢指定长度,因为此方法最简单,尽管会带来开销,因为大多数消息都会丢失并且一个字节来指示长度。

消息代码只是枚举的成员之一MessageCode。路由是根据代码执行的,并且由于我们可以在不进行初步反序列化的情况下从数组中提取代码,因此前端服务器决定在kafka的哪个主题上发送消息,而不是将此职责委托给其他人。

要求编号出现在大多数帖子中,但不是全部。它执行2个功能:通过此标识符,客户端在发送的请求和接收的响应之间建立对应关系(如果客户端以该顺序发送消息A,B,C,这并不意味着答案也将依次出现)。第二个功能是避免重复。如前所述,kafka保证至少交货一次。也就是说,在极少数情况下,消息仍然可以被复制。通过将具有唯一约束的RequestIdentifier列添加到所需的数据库表中,我们可以避免插入重复项。

授权令牌是一个UserId(8个字节)+ 32个字节的HmacSha256签名。我认为在这里使用Jwt不值得。Jwt是什么的大约7-8倍?我的用户没有任何要求,因此简单的hmac签名就可以了。通过其他服务进行授权不是,而且也没有计划。

音频和视频通话


我特意推迟了音频和视频通话的实现,这很有趣,因为我确定我无法解决问题,但实际上,它却是实现起来最简单的功能之一。至少是基本功能。通常,仅将WebRtc添加到应用程序中并进行第一次视频会话就只花了几个小时,而且奇迹般地,第一次测试是成功的。在此之前,我认为第一次起作用的代码是一个神话。通常,由于某种愚蠢的错误(例如“添加了服务,但未将其注册到DI容器中”),对新功能的首次测试始终会失败。

对于初学者来说,WebRtc并不是很简短
WebRtc — , peer-to-peer , , peer-to-peer , . - , , . , .

(peer-to-peer), , 3 ( , 3 . 3 ).

— stun . stun , — Source IP Source Port , . ? - . IP . - , , , Source IP Source Port IP - NAT [ Source IP | Source Port | Router External IP | Router Port ]. - , Dest IP Dest Port Router External IP Router Port NAT, , Source IP — Source Port , . , , , , , , NAT . stun NAT . stun Router External IP — Router Port. — . , «» NAT (NAT traversal) , NAT , stun .
* NAT , . , , WebRtc .

— turn. , , peer-to-peer . Fallback . , , , , , peer-to-peer . turn — coturn, .

— . , . , . — . . , , , :) — .

WebRtc 3 : offer, answer candidate. offer , answer, . , , , , . ( ) , .


WebRtc技术本身会建立连接并参与来回传输流,但这不是用于创建完整调用的框架。所谓通话,是指可以取消,拒绝和接受通话以及挂断电话的通信会话。另外,您需要让呼叫者知道对方是否已被占用。而且还可以执行一些小事情,例如“等待呼叫N秒,然后重置”。如果仅以裸露的形式在应用程序中实现WebRtc,则在有来电时,摄像头和视频会自动打开,这当然是不可接受的。

以纯粹的形式,WebRtc通常意味着尽快将候选人发送给另一方,以使谈判尽快开始,这是合乎逻辑的。在我的测试中,接收方的候选人通常总是在要约到来之前就开始出现。这样的“早期”候选者不能被丢弃,必须记住它们,以便稍后,当报价到达并RTCPeerConnection创建时,将它们添加到连接中。候选人甚至可能在要约之前就开始到来的事实以及其他一些原因,使得执行完整的招聘举足轻重。如果有多个用户立即致电我们该怎么办?我们将收到来自所有人的候选人,尽管我们可以将一个用户的候选人与另一个用户分开,但是不清楚哪些候选人被拒绝,因为我们不知道谁的提议会更早提出。如果候选人开始来找我们,然后在我们自己给某人打电话时提出要约,也会有问题

在使用裸露的WebRtc测试了几个选项之后,我得出的结论是,尝试以这种形式进行调用会出现问题并且充满内存泄漏,因此我决定在WebRtc协商过程中增加另一个阶段。我称这个阶段Inquire - Grant/Refuse

这个想法很简单,但是花了我一段时间。甚至在创建流之前RTCPeerConnection(通常在执行与WebRtc相关的任何代码之前),调用方都会通过信号服务器将消息发送到另一端Inquire。在接收方,检查用户此刻是否处于其他通信会话中(简单bool字段)。如果是,那么将发送回一条消息。Refuse,这样,我们就可以让呼叫者知道用户正忙,让接收者知道当他正忙于另一个对话时,某某电话正在打电话。如果用户当前有空,则保留Inquire会话标识符在消息中发送,并且此标识符设置为当前会话的标识符。如果保留该用户,则他将拒绝所有Inquire/Offer/Candidate具有会话标识符(当前标识符除外)的消息。预订之后,接收方通过信号服务器向呼叫方发送一条消息Grant。值得一提的是,此过程对接收用户不可见,因为还没有呼叫。最主要的是不要忘记在接收方挂断超时。突然,我们将保留一个会议,没有报价。

呼叫者会收到Grant,这是WebRtc从要约,候选人开始的地方,这就是每个人都可以使用的地方。报价会飞给接收者,接收者会在接收器上显示一个带有“应答/拒绝”按钮的屏幕。但是候选人像往常一样没有期待任何人。他们甚至早于要约开始到达,因为没有理由等待用户应答呼叫。他可能不会回答,但会拒绝或等到超时时间到期-然后候选人将被直接抛出。

现状和未来计划


  • 私人和群聊
  • 发送文本,图像和视频
  • 音频和视频通话
  • 确认收货和阅读
  • “印刷品...”
  • 通知事项
  • 通过QR码和地理位置搜索


出乎意料的是,通过QR码搜索很难实现,因为我尝试尝试的几乎所有用于代码扫描的插件都无法启动或无法正常工作。但我认为问题将在这里解决。对于实施地理位置搜索,我尚未开始。从理论上讲,不应有任何特殊问题。

正在进行通知以及发送视频。

还有什么需要做的?


哦,很多。
首先,没有测试。同事以前经常写测试,所以我完全放松了。
其次,当前无法邀请用户加入现有聊天。服务器代码已准备就绪,客户端代码尚未准备好。
第三,如果服务器上的错误处理或多或少,则客户端上没有错误处理。仅进行日志输入是不够的;您需要重试该操作。现在,例如,没有实现用于重新发送消息的机制。
第四,服务器不对客户端执行ping操作,因此,例如,如果客户端丢失了Internet,则不会检测到断开连接。仅当客户端关闭应用程序时,才会检测到断开连接。
第五,数据库中不使用索引。
第六,优化。该代码在很多地方都写有类似代码// @@TODO: Pool。大多数数组就是new这样。后端服务器会创建许多固定长度的数组,因此您可以在这里使用池。
第七,客户端上有很多代码await结束的地方,尽管这不是必需的。例如,发送图像似乎很慢,因为代码await尽管不需要执行任何操作,但它会将图片保存到文件系统并在显示消息之前生成缩略图。或者,例如,如果您打开该应用程序,并且在您不在的情况下它们将图像发送给您,则启动会很慢,因为所有这些图像都会再次下载,保存到系统,生成缩略图,并且只有在启动结束并从初始屏幕中退出后,在主屏幕上。所有这些冗余await都是为了简化调试而进行的,但是,当然,您需要在发布之前摆脱不必要的等待。
第八用户界面现在已经准备就绪,因为我还没有决定如何查看它。因此,现在一切都不直观,一半按钮不清楚它们在做什么。而且通常不会第一次按下按钮,因为现在它们只是带有GestureDetector或不带有填充的图标,因此并不总是能够进入它们。加上在某些地方像素溢出不是固定的。
第九,现在什至不可能登录帐户,只能注册。因此,如果您卸载该应用程序并重新安装它,则将无法登录您的帐户:)
第十,验证码不会发送到邮件中。现在,代码通常总是相同的,再次是因为它更易于调试。
第十一在很多地方违反了单一责任原则。需要重构。负责与数据库交互的类(在客户端和服务器上)通常非常庞大,因为它们参与了所有数据库操作。
第十二,前端服务器现在始终期望后端服务器提供响应,即使该消息并不意味着发送响应(例如,带有代码的消息IsTyping以及一些与WebRtc相关的消息)。因此,尽管这不是错误,但他无需等待答案就将错误写入控制台。
第十三,完整的图像无法点击打开。
亿五分之一需要分批发送的某些消息将单独发送。这同样适用于某些数据库操作。而不是执行单个命令,而是使用await(brr ..)循环执行命令
亿分之六,有些值是硬编码的,而不是可配置的。
一百一十分之一现在,仅在控制台上登录服务器,通常在客户端上直接登录到小部件。在主屏幕上,有一个“日志”选项卡,其中放置了所有点击的日志。事实是,我的工作机器拒绝同时运行模拟器和服务器所需的一切(kafka,数据库,萝卜和所有服务器)。连接设备的借记卡也无法解决问题,在一半情况下,一切都紧紧地挂着了,因为计算机无法应付负荷。因此,您必须每次进行构建,然后将其拖放到设备上,进行安装和测试。要查看日志,我将其直接放到小部件中。我知道变态,但别无选择。出于相同的原因,许多方法返回Futureawait它们是(捕获异常并扔到小部件中),尽管它们不应该这样。如果看一下代码,您会_logError在许多类中看到一个丑陋的方法。当然,这也将变成垃圾。
一亿零八,没有声音。
一亿分之一,您需要使用更多的缓存。
十分之一,很多重复的代码。例如,许多操作首先检查令牌的有效性,如果无效,则发送错误。我认为您需要实现一个简单的中间件管道。

还有很多小事情,例如连接字符串而不是使用StringBuilder'a,Dispose并非到处都应该在任何地方调用它,依此类推。通常,项目的正常状态正在开发中。上面所有这些都是可以解决的,但是直到最后一刻我才想到一个基本问题,因为它浮现在脑海了-即使应用程序未打开,并且即使我的应用程序无法运行,Messenger仍然可以工作。老实说,解决这个问题的办法还没有想到。在这里,显然,您不能没有本机代码。

我认为该项目的就绪程度为70%。

摘要


自该项目开始工作以来已经过去了六个月。结合兼职工作,花了很长的时间,但仍然花费了大量的时间和精力。我计划实施所有已声明的功能+在房间里添加井字游戏或草稿等不寻常的内容。无缘无故,只是因为它很有趣。

如有任何疑问,请写下。邮件在github上。

All Articles