代码之家  ›  专栏  ›  技术社区  ›  Yoni Gibbs

如何在当前消息后关闭Spring Boot Kafka流处理应用程序

  •  0
  • Yoni Gibbs  · 技术社区  · 4 年前

    在使用springcloud函数的Spring-Boot Kafka流处理应用程序中,如何在响应当前消息之后关闭应用程序(以停止它接收任何进一步的消息)?

    场景是这样的:我们有一个应用程序处理一条消息,其中包含对某个文件的引用。我们通过使用一些第三方库询问给定文件的内容并用从文件中提取的一些数据进行响应来处理这个问题。在某些罕见的情况下,某些文件会导致外部库挂起。所以我们在后台线程上调用这些库,如果需要的时间太长,就会超时。我们需要回复Kafka这个消息(用一些JSON详细说明错误),这样Kafka就不会将它发送到我们应用程序的任何其他实例(因为它可能会导致该实例挂起)。但是我们希望SpringBoot应用程序的这个实例关闭,因为我们无法从第三方库中的挂起中完全恢复(否则我们可能会导致资源或内存泄漏)。另一个实例将由Kubernetes或Docker Swarm或其他什么自动产生。

    我不确定它是否相关,但我们使用的是Spring云函数,我们加入了两个流:“file”流,其中每个消息都包含对要处理的文件的引用,以及带有一些配置信息的GlobalKTable。代码如下(在Kotlin中):

        // The service we use to run the process on a background thread
        private val executorService = Executors.newFixedThreadPool(1)
    
        @Bean
        fun process() = BiFunction<KStream<String, FileInfo>,
            GlobalKTable<String, Config>,
            KStream<String, FileInterrogationResult>> { fileStream, configTable ->
    
            requestStream.join(
                configTable,
                { _, file -> file.configId },
                { file, config ->
                    try {
                        // Process the file using the 3rd party libraries.
                        val result = executorService.submit(theThirdPartyLibExtractionFunction)
                            .get(someTimeout, TimeUnit.MILLISECONDS)
    
                        // Success: return some FileInterrogationResult object wrapping the result from above.
                    } catch (e: TimeoutException) {
                        // Return some FileInterrogationResult object with error details.
                        // TODO: Here we know that after this function completes the app should shut down. How do we do this?
                    } catch (e: Throwable) {
                        // Return some FileInterrogationResult object with error details.
                    }
                }
            )
    
    0 回复  |  直到 4 年前
        1
  •  0
  •   Oleg Zhurakousky    4 年前

    您可以使用执行器端点来停止/启动/暂停/恢复以及监视单个绑定。 curl 管理绑定。 例如

    curl -d '{"state":"STOPPED"}' -H "Content-Type: application/json" -X POST http://<host>:<port>/actuator/bindings/myBindingName
    

    关于如何做这件事的确切说明 here