代码之家  ›  专栏  ›  技术社区  ›  StevenPG

Spring Cloud Streams错误处理不起作用

  •  0
  • StevenPG  · 技术社区  · 2 年前

    在spring云流中,错误处理的不同方法的例子太少了,而部分通过文档提供的少数方法似乎也不起作用。

    我有一个测试存储库,尝试了多种错误捕获方法,但没有一种方法能以任何方式工作。

    Spring Cloud Streams具有可靠的反序列化和序列化错误处理,但映射、转换和处理器方法的错误处理记录非常少。

    样本存储库: https://github.com/StevenPG/scs-experimentation/tree/main/scs4-error-handling/error-handling

    我只有两个主要文件

    @SpringBootApplication
    public class ErrorHandlingApplication {
    
        public final Random randomNumberGenerator = new Random(System.currentTimeMillis());
    
        public static void main(String[] args) {
            SpringApplication.run(ErrorHandlingApplication.class, args);
        }
    
        @Bean
        public Supplier<Message<String>> randomIntegerPublisher() {
            return () -> MessageBuilder
                    .withPayload(String.valueOf(randomNumberGenerator.nextInt()))
                    .setHeader(KafkaHeaders.RECEIVED_KEY, 0)
                    .build();
        }
    
        @Bean
        public Consumer<KStream<String, String>> errorStream() {
            return input -> input
                    // Remove odd numbers so we throw an exception on every other message
                    .map((key, value) -> new KeyValue<>(key, Integer.parseInt(value)))
                    .filter((key, value) -> (value & 1) == 0)
                    .map((key, value) -> {
                        throw new RuntimeException("Pushing uncaught error to kill stream!");
                    }
            );
        }
    
        @Bean
        public Consumer<KStream<String, String>> errorHandledStream() {
            return input -> input
                    // Remove odd numbers so we throw an exception on ever other message
                    .map((key, value) -> new KeyValue<>(key, Integer.parseInt(value)))
                    .filter((key, value) -> (value & 1) == 0)
                    .map((key, value) -> {
                                System.out.println("This should not kill the stream");
                                throw new RuntimeException("Publishing error to be caught!");
                            }
                    );
        }
    
        @Bean
        // TODO - doesn't seem to be working, is this because we're using kstreams?
        public Consumer<ErrorMessage> defaultErrorHandler() {
            return v -> {
                System.out.println("Caught and handling error");
                System.out.println(v.toString());
            };
        }
    
        @Bean
        // TODO - not working via the config
        /**
         * bindings:
         *   errorHandledStream-in-0:
         *     consumer:
         *       commonErrorHandlerBeanName: defaultCommonErrorHandler
         */
        public CommonErrorHandler defaultCommonErrorHandler() {
            return new CommonLoggingErrorHandler();
        }
    
        /**
         * Also not working
         */
        @Bean
        public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, String>>
        kafkaListenerContainerFactory() {
            ConcurrentKafkaListenerContainerFactory<String, String> factory =
                    new ConcurrentKafkaListenerContainerFactory<>();
            factory.setCommonErrorHandler(defaultCommonErrorHandler());
            return factory;
        }
    }
    

    spring:
      cloud:
        function:
          definition: randomIntegerPublisher;errorStream;errorHandledStream;defaultErrorHandler
        stream:
          default:
            error-handler-definition: defaultErrorHandler
          kafka:
            streams:
              binder:
                deserialization-exception-handler: logandcontinue
              bindings:
                errorHandledStream-in-0:
                  error-handler-definition: defaultErrorHandler
                  consumer:
                    commonErrorHandlerBeanName: defaultCommonErrorHandler
            bindings:
              errorHandledStream-in-0:
                consumer:
                  commonErrorHandlerBeanName: defaultCommonErrorHandler
          bindings:
            randomIntegerPublisher-out-0:
              destination: integer-topic
            errorStream-in-0:
              destination: integer-topic
            errorHandledStream-in-0:
              destination: integer-topic
              error-handler-definition: defaultErrorHandler
    

    几乎所有记录在案的错误处理变体似乎都不能正常工作。

    我的第一个流errorStream按预期运行。杀死相关的使用者(尽管全局配置应该捕捉到这一点)。

    第二个流errorHandledStream尝试提供捕获错误的配置。

    主要的要求是,当map方法中发生异常时(对于本例),能够让一些异常处理程序执行操作,使流不会崩溃并重新启动。

    这就是最新的spring云流版本以及以下依赖项的全部内容。

    extra["springCloudVersion"] = "2022.0.3"
    
        implementation("org.springframework.cloud:spring-cloud-stream-binder-kafka-streams")
        implementation("org.springframework.cloud:spring-cloud-stream-binder-kafka")
    

    使用了以下参考文献:

    https://docs.spring.io/spring-cloud-stream/docs/current/reference/html/spring-cloud-stream.html#spring-cloud-stream-overview-error-handling

    https://docs.spring.io/spring-cloud-stream/docs/current/reference/html/spring-cloud-stream-binder-kafka.html#_error_handling

    我在这里遗漏了什么,和/或我可以使用什么参考资料来审查和实施。或者,是否有一个工作示例发布在任何地方(或可以在这里提供)作为起点?

    推荐文章