我正在试图弄清楚如何使用Lagom来使用通过Kafka进行通信的外部系统的数据。
我碰到过这个
section of Lagom documentation
,它描述了Lagom服务如何通过订阅其主题与另一个Lagom服务通信。
helloService
.greetingsTopic()
.subscribe
.atLeastOnce(
Flow.fromFunction(doSomethingWithTheMessage)
)
但是,当您想要订阅包含由某个随机外部系统生成的事件的卡夫卡主题时,适当的配置是什么?
这个功能需要某种适配器吗?
object Aggregator {
val TOPIC_NAME = "my-aggregation"
}
trait Aggregator extends Service {
def aggregate(correlationId: String): ServiceCall[Data, Done]
def aggregationTopic(): Topic[DataRecorded]
override final def descriptor: Descriptor = {
import Service._
named("aggregator")
.withCalls(
pathCall("/api/aggregate/:correlationId", aggregate _)
)
.withTopics(
topic(Aggregator.TOPIC_NAME, aggregationTopic())
.addProperty(
KafkaProperties.partitionKeyStrategy,
PartitionKeyStrategy[DataRecorded](_.sessionData.correlationId)
)
)
.withAutoAcl(true)
}
}
然而,我希望它能被消费调用
Data
我想知道是否有这样一种方式可以以类似于此模型的方式配置描述符:
override final def descriptor: Descriptor = {
...
kafkaTopic("my-input-topic")
.subscribe(serviceCall(aggregate _)
.withAtMostOnceDelivery
}
discussion on Google Groups
,但在老年退休金计划的问题中,我看不出他真的在做什么
EventMessage
他来自哪里
some-topic
编辑#1:进度更新
我又增加了两个模块,
aggregator-kafka-proxy-api
和
aggregator-kafka-proxy-impl
.
在新的api模块中,我定义了一个新的服务,没有方法,只有一个主题代表我的卡夫卡主题:
object DataKafkaPublisher {
val TOPIC_NAME = "data-in"
}
trait DataKafkaPublisher extends Service {
def dataInTopic: Topic[DataPublished]
override final def descriptor: Descriptor = {
import Service._
import DataKafkaPublisher._
named("data-kafka-in")
.withTopics(
topic(TOPIC_NAME, dataInTopic)
.addProperty(
KafkaProperties.partitionKeyStrategy,
PartitionKeyStrategy[SessionDataPublished](_.data.correlationId)
)
)
.withAutoAcl(true)
}
}
class DataKafkaPublisherImpl(persistentEntityRegistry: PersistentEntityRegistry) extends DataKafkaPublisher {
override def dataInTopic: Topic[api.DataPublished] =
TopicProducer.singleStreamWithOffset {
fromOffset =>
persistentEntityRegistry.eventStream(KafkaDataEvent.Tag, fromOffset)
.map(ev => (convertEvent(ev), ev.offset))
}
private def convertEvent(evt: EventStreamElement[KafkaDataEvent]): api.DataPublished = {
evt.event match {
case DataPublished(data) => api.DataPublished(data)
}
}
}
aggregator-impl
模块中,我添加了一个“订阅者”服务,该服务接收这些事件,并在实体上调用适当的命令。
class DataKafkaSubscriber(persistentEntityRegistry: PersistentEntityRegistry, kafkaPublisher: DataKafkaPublisher) {
kafkaPublisher.dataInTopic.subscribe.atLeastOnce(
Flow[DataPublished].mapAsync(1) { sd =>
sessionRef(sd.data.correlationId).ask(RecordData(sd.data))
}
)
private def sessionRef(correlationId: String) =
persistentEntityRegistry.refFor[Entity](correlationId)
}
这有效地让我发布了一条关于卡夫卡主题“中的数据”的消息,然后将其代理并转换为
RecordData
然而,对我来说,这似乎有点骇人。我和卡夫卡是由拉格姆·卡夫卡结合在一起的。我无法轻松交换数据源。例如,如果我愿意,我将如何使用来自RabbitMQ的外部消息?
如果我试图从另一个卡夫卡(与Lagom使用的卡夫卡不同)中消费,该怎么办?
编辑#2:更多文档
我发现了一些关于Lagom文档的文章,特别是:
Consuming Topics from 3rd parties
你的Lagom项目。该模块将包含一个服务描述符
声明您将从中使用的主题。一旦你有了你的
别致的服务。最后,您可以使用所描述的主题
在第三方服务中,如订阅主题中所述
部分