在spring云流中,错误处理的不同方法的例子太少了,而部分通过文档提供的少数方法似乎也不起作用。
我有一个测试存储库,尝试了多种错误捕获方法,但没有一种方法能以任何方式工作。
Spring Cloud Streams具有可靠的反序列化和序列化错误处理,但映射、转换和处理器方法的错误处理记录非常少。
样本存储库:
https://github.com/StevenPG/scs-experimentation/tree/main/scs4-error-handling/error-handling
我只有两个主要文件
@SpringBootApplication
public class ErrorHandlingApplication {
public final Random randomNumberGenerator = new Random(System.currentTimeMillis());
public static void main(String[] args) {
SpringApplication.run(ErrorHandlingApplication.class, args);
}
@Bean
public Supplier<Message<String>> randomIntegerPublisher() {
return () -> MessageBuilder
.withPayload(String.valueOf(randomNumberGenerator.nextInt()))
.setHeader(KafkaHeaders.RECEIVED_KEY, 0)
.build();
}
@Bean
public Consumer<KStream<String, String>> errorStream() {
return input -> input
.map((key, value) -> new KeyValue<>(key, Integer.parseInt(value)))
.filter((key, value) -> (value & 1) == 0)
.map((key, value) -> {
throw new RuntimeException("Pushing uncaught error to kill stream!");
}
);
}
@Bean
public Consumer<KStream<String, String>> errorHandledStream() {
return input -> input
.map((key, value) -> new KeyValue<>(key, Integer.parseInt(value)))
.filter((key, value) -> (value & 1) == 0)
.map((key, value) -> {
System.out.println("This should not kill the stream");
throw new RuntimeException("Publishing error to be caught!");
}
);
}
@Bean
public Consumer<ErrorMessage> defaultErrorHandler() {
return v -> {
System.out.println("Caught and handling error");
System.out.println(v.toString());
};
}
@Bean
public CommonErrorHandler defaultCommonErrorHandler() {
return new CommonLoggingErrorHandler();
}
@Bean
public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, String>>
kafkaListenerContainerFactory() {
ConcurrentKafkaListenerContainerFactory<String, String> factory =
new ConcurrentKafkaListenerContainerFactory<>();
factory.setCommonErrorHandler(defaultCommonErrorHandler());
return factory;
}
}
和
spring:
cloud:
function:
definition: randomIntegerPublisher;errorStream;errorHandledStream;defaultErrorHandler
stream:
default:
error-handler-definition: defaultErrorHandler
kafka:
streams:
binder:
deserialization-exception-handler: logandcontinue
bindings:
errorHandledStream-in-0:
error-handler-definition: defaultErrorHandler
consumer:
commonErrorHandlerBeanName: defaultCommonErrorHandler
bindings:
errorHandledStream-in-0:
consumer:
commonErrorHandlerBeanName: defaultCommonErrorHandler
bindings:
randomIntegerPublisher-out-0:
destination: integer-topic
errorStream-in-0:
destination: integer-topic
errorHandledStream-in-0:
destination: integer-topic
error-handler-definition: defaultErrorHandler
几乎所有记录在案的错误处理变体似乎都不能正常工作。
我的第一个流errorStream按预期运行。杀死相关的使用者(尽管全局配置应该捕捉到这一点)。
第二个流errorHandledStream尝试提供捕获错误的配置。
主要的要求是,当map方法中发生异常时(对于本例),能够让一些异常处理程序执行操作,使流不会崩溃并重新启动。
这就是最新的spring云流版本以及以下依赖项的全部内容。
extra["springCloudVersion"] = "2022.0.3"
implementation("org.springframework.cloud:spring-cloud-stream-binder-kafka-streams")
implementation("org.springframework.cloud:spring-cloud-stream-binder-kafka")
使用了以下参考文献:
https://docs.spring.io/spring-cloud-stream/docs/current/reference/html/spring-cloud-stream.html#spring-cloud-stream-overview-error-handling
https://docs.spring.io/spring-cloud-stream/docs/current/reference/html/spring-cloud-stream-binder-kafka.html#_error_handling
我在这里遗漏了什么,和/或我可以使用什么参考资料来审查和实施。或者,是否有一个工作示例发布在任何地方(或可以在这里提供)作为起点?