代码之家  ›  专栏  ›  技术社区  ›  softveda

如何设计处理到达队列的消息的服务

  •  4
  • softveda  · 技术社区  · 16 年前

    我有一个多线程Windows服务的设计问题,它处理来自多个客户机的消息。 规则是

    • 每个消息都是为一个实体(具有唯一的ID)处理一些东西,并且可以是不同的,即DOA、DOB、DOC等。实体ID在消息的有效负载中。
    • 处理可能需要一些时间(最多几秒钟)。
    • 必须按照每个实体(具有相同ID)到达的顺序处理消息。
    • 但是,可以同时为另一个实体处理消息(即,只要它们不是同一个实体ID)
    • 并发处理的数量是可配置的(通常为8个)
    • 消息不能丢失。如果在处理消息时出错,则必须存储该消息和同一实体的所有其他消息,以便将来手动处理。
    • 消息到达事务性msmq队列。

    您将如何设计服务。我有一个可行的解决方案,但我想知道其他人将如何解决这个问题。

    4 回复  |  直到 15 年前
        1
  •  0
  •   Kiril    15 年前

    我的方法如下:

    1. 使用可配置的线程数创建线程池。
    2. 保留实体ID的映射,并将每个ID与消息队列相关联。
    3. 当您收到一条消息时,将其放入对应实体ID的队列中。
    4. 每个线程将只查看专用于它的实体ID(例如,创建一个初始化为此类服务的类(entity id id))。
    5. 让线程只处理来自其专用实体ID队列的消息。
    6. 处理完给定实体ID的所有消息后,从映射中删除该ID并退出线程循环。
    7. 如果线程池中有空间,则添加一个新线程来处理下一个可用的实体ID。

    您必须管理当时无法处理的消息,包括消息处理失败的情况。创建积压的消息等。

    如果您可以访问并发映射(无锁/无等待映射),那么您可以拥有多个对该映射的读写器,而无需锁定或等待。如果你不能得到一个并发的映射,那么所有的意外情况都将出现在映射上:每当你向映射中的一个队列添加消息或者你添加新的实体ID时,你必须锁定它。最好的做法是将映射包装在一个结构中,该结构提供了使用适当的锁进行读写的方法。

    我认为您不会看到锁对性能的任何显著影响,但是如果您确实开始看到锁,我建议您创建自己的无锁哈希图: http://www.azulsystems.com/events/javaone_2007/2007_LockFreeHash.pdf

    实施这个系统不是一个基本的任务,所以把我的意见作为一个总的指导方针…由工程师来实施适用的想法。

        2
  •  1
  •   dferraro    15 年前

    您要做的第一件事是后退一步,并考虑到这个应用程序的性能有多重要。你…吗 真的? 需要同时处理消息吗?它是任务关键吗?或者你只是 认为 你需要它吗?您是否在您的服务上运行了一个探查器来发现过程的真正瓶颈并优化这些瓶颈?

    我之所以问,是因为你提到你想要8个并发过程-但是,如果你让这个应用程序单线程化,它会 大大地 减少复杂性、开发和测试时间…既然你只想要8个,这似乎不值得…

    其次,因为您只能处理同一实体上的并发消息- 您真正从客户机获得并发请求以处理同一实体的频率是多少 ?对于一个可能不会经常出现的用例,是否值得添加这么多的复杂性层?

    我会亲吻。我将通过WCF使用msmq,并将我的WCF服务保留为单例。现在您拥有了msmq的电源和有序的可靠性,现在您可以满足您的实际需求了。然后我用实际的数据在高负载下测试它,并运行一个分析器来发现瓶颈。 如果 我觉得太慢了。只有这样,我才能通过构建一个更复杂的应用程序来管理仅针对特定用例的并发性,从而解决所有额外的问题…

    一个需要考虑的设计是创建一个中央的“门卫”或“服务总线”服务,它接收来自客户机的所有消息,然后将这些消息传递给实际的工人服务。当他收到一个请求时,他会发现他的另一个客户机是否已经在处理同一个实体的消息——如果是,他会将消息发送到他发送另一个消息的同一个服务。通过这种方式,您可以同时处理给定实体的相同消息,而无需其他任何操作…而且您可以轻松实现无缝的可扩展性…但是,只有在我必须这样做的情况下,我才会这样做,并且通过分析和测试证明了这一点,而不是因为“我们认为我们需要它”(见Yagni Principal:)

        3
  •  0
  •   Joe Caffeine    16 年前

    虽然我的需求与您的不同,但我必须处理来自消息队列的并发处理。我的解决方案是拥有一个服务,它可以查看每个传入的消息,并将其传递给代理进程来使用。该服务有一个设置,控制它可以运行多少个代理。

        4
  •  0
  •   Ian Ringrose    15 年前

    我将研究从单个线程安全队列中读取每个线程的N个线程。然后我将散列entityid以决定是否打开witch队列,并打开一条输入消息。

    有时,一些线程将无事可做,但如果您有更多的线程,而不是CPU,这是一个问题吗?

    (还有你 可以 希望按类型将Entities分组到队列中,以便减少数据库中的锁定冲突数。)