代码之家  ›  专栏  ›  技术社区  ›  ioreskovic

Lagom服务使用来自Kafka的输入

  •  5
  • ioreskovic  · 技术社区  · 6 年前

    我正在试图弄清楚如何使用Lagom来使用通过Kafka进行通信的外部系统的数据。

    我碰到过这个 section of Lagom documentation ,它描述了Lagom服务如何通过订阅其主题与另一个Lagom服务通信。

    helloService
      .greetingsTopic()
      .subscribe // <-- you get back a Subscriber instance
      .atLeastOnce(
      Flow.fromFunction(doSomethingWithTheMessage)
    )
    

    但是,当您想要订阅包含由某个随机外部系统生成的事件的卡夫卡主题时,适当的配置是什么?

    这个功能需要某种适配器吗?

    object Aggregator {
      val TOPIC_NAME = "my-aggregation"
    }
    
    trait Aggregator extends Service {
      def aggregate(correlationId: String): ServiceCall[Data, Done]
    
      def aggregationTopic(): Topic[DataRecorded]
    
      override final def descriptor: Descriptor = {
        import Service._
    
        named("aggregator")
          .withCalls(
            pathCall("/api/aggregate/:correlationId", aggregate _)
          )
          .withTopics(
            topic(Aggregator.TOPIC_NAME, aggregationTopic())
              .addProperty(
                KafkaProperties.partitionKeyStrategy,
                PartitionKeyStrategy[DataRecorded](_.sessionData.correlationId)
              )
          )
          .withAutoAcl(true)
      }
    }
    

    然而,我希望它能被消费调用 Data

    我想知道是否有这样一种方式可以以类似于此模型的方式配置描述符:

    override final def descriptor: Descriptor = {
      ...
      kafkaTopic("my-input-topic")
        .subscribe(serviceCall(aggregate _)
        .withAtMostOnceDelivery
    }
    

    discussion on Google Groups ,但在老年退休金计划的问题中,我看不出他真的在做什么 EventMessage 他来自哪里 some-topic

    编辑#1:进度更新

    我又增加了两个模块, aggregator-kafka-proxy-api aggregator-kafka-proxy-impl .

    在新的api模块中,我定义了一个新的服务,没有方法,只有一个主题代表我的卡夫卡主题:

    object DataKafkaPublisher {
      val TOPIC_NAME = "data-in"
    }
    
    trait DataKafkaPublisher extends Service {
      def dataInTopic: Topic[DataPublished]
    
      override final def descriptor: Descriptor = {
        import Service._
        import DataKafkaPublisher._
    
        named("data-kafka-in")
          .withTopics(
            topic(TOPIC_NAME, dataInTopic)
              .addProperty(
                KafkaProperties.partitionKeyStrategy,
                PartitionKeyStrategy[SessionDataPublished](_.data.correlationId)
              )
          )
          .withAutoAcl(true)
      }
    }
    

    class DataKafkaPublisherImpl(persistentEntityRegistry: PersistentEntityRegistry) extends DataKafkaPublisher {
      override def dataInTopic: Topic[api.DataPublished] =
        TopicProducer.singleStreamWithOffset {
          fromOffset =>
            persistentEntityRegistry.eventStream(KafkaDataEvent.Tag, fromOffset)
              .map(ev => (convertEvent(ev), ev.offset))
        }
    
      private def convertEvent(evt: EventStreamElement[KafkaDataEvent]): api.DataPublished = {
        evt.event match {
          case DataPublished(data) => api.DataPublished(data)
        }
      }
    }
    

    aggregator-impl 模块中,我添加了一个“订阅者”服务,该服务接收这些事件,并在实体上调用适当的命令。

    class DataKafkaSubscriber(persistentEntityRegistry: PersistentEntityRegistry, kafkaPublisher: DataKafkaPublisher) {
    
      kafkaPublisher.dataInTopic.subscribe.atLeastOnce(
        Flow[DataPublished].mapAsync(1) { sd =>
          sessionRef(sd.data.correlationId).ask(RecordData(sd.data))
        }
      )
    
      private def sessionRef(correlationId: String) =
        persistentEntityRegistry.refFor[Entity](correlationId)
    }
    

    这有效地让我发布了一条关于卡夫卡主题“中的数据”的消息,然后将其代理并转换为 RecordData

    然而,对我来说,这似乎有点骇人。我和卡夫卡是由拉格姆·卡夫卡结合在一起的。我无法轻松交换数据源。例如,如果我愿意,我将如何使用来自RabbitMQ的外部消息? 如果我试图从另一个卡夫卡(与Lagom使用的卡夫卡不同)中消费,该怎么办?

    编辑#2:更多文档

    我发现了一些关于Lagom文档的文章,特别是:

    Consuming Topics from 3rd parties

    你的Lagom项目。该模块将包含一个服务描述符 声明您将从中使用的主题。一旦你有了你的 别致的服务。最后,您可以使用所描述的主题 在第三方服务中,如订阅主题中所述 部分

    1 回复  |  直到 6 年前
        1
  •  1
  •   pme    6 年前

    我不使用 lagom 所以这可能只是一个想法。但作为 akka-streams 拉贡 (至少我认为)-从这个解决方案中得到您需要的东西应该很容易。

    我曾经 akka-stream-kafka 这真的很好(我只做了一个原型)

    当您使用消息时,您会做一些事情:

         Consumer
          .committableSource(
              consumerSettings(..), // config of Kafka
              Subscriptions.topics("kafkaWsPathMsgTopic")) // Topic to subscribe
          .mapAsync(10) { msg =>
            business(msg.record) // do something
          }
    

    检查字迹是否正确 documentation

    PathMsgConsumer

        2
  •  0
  •   ioreskovic    6 年前

    here .

    第1部分:

    如果您仅在业务服务中使用外部Kafka群集 然后,您可以仅使用Lagom Broker API实现这一点。所以你需要 致:

    1. 使用仅包含主题定义的服务描述符创建API(未实现此API)
    2. 在您的业务服务中,根据您的部署配置kafka_native(正如我在前一篇文章中提到的)

    在Lagom Broker API中,处理的是偏移提交

    第2部分:

    Kafka和AMQP消费者实现需要持久的akka 流动因此,您需要处理断开连接。这些工作可以分两步完成 方式:

    1. 通过将其包裹在一名演员中来控制peristant akka流。在actor preStart和pipe stream complete上初始化流 给那个能阻止它的演员。如果流完成或失败,则 我会停下来的。然后使用重启策略将参与者包装为参与者中的参与者退避, 这将在完成或失败的情况下重新启动参与者,并且 重新初始化流

    我通常使用1,但还没有尝试2。

    初始化#1的退避参与者或#2的流可以在 现在使用Lagom代理API订阅)。

    在配置使用者时,请确保设置使用者组以确保 避免重复消费。您可以像Lagom一样使用服务 描述符中的名称作为使用者组名称。

    推荐文章