代码之家  ›  专栏  ›  技术社区  ›  Deepboy

如何将错误从执行器通道路由到错误通道?

  •  1
  • Deepboy  · 技术社区  · 6 年前

    有关设置,请参阅附图。解释如下。

    enter image description here

    有一个接收请求的公共输入通道。从这个输入通道,有两个流:

    1. 流1—将请求存储到数据库中

    我希望流1和流2彼此独立。所以我把流1放在执行器通道上。这样,流1中的错误不会中断流2。

    流程1说明:

    1. 从公共输入通道,代码读取请求并将其放入执行器通道。
    2. 我还有一个错误通道(对于项目中的所有类都是通用的),它将悄悄地记录错误

    我所拥有的:

    相反,如果出现错误,它将被重新发布到“公共输入通道”并重复处理,直到队列填满为止。

    我想在执行器通道上的任何错误被发送到错误通道,在那里它将被悄悄地记录和消息将得到处置。

        @Configuration
    @EnableIntegration
    public class InputChanneltoExecutorChannelConfig {
    
    //DEFINING THE EXECUTOR CHANNEL
        @Bean
        public TaskExecutor taskExecutor() {
            return new SimpleAsyncTaskExecutor();
        }
    
        @Bean(name="executorChannelToDB")
        public ExecutorChannel outboundRequests() {
            return new ExecutorChannel(taskExecutor());
        }
    //DEFINE FAILURE CHANNEL FOR USE IN ExpressionEvaluatingRequestHandlerAdvice
        @Bean(name = "DBFailureChannel")
        public static MessageChannel getFailureChannel() {
            return new DirectChannel();
        }   
    
    //MAIN METHOD THAT READS FROM INPUT CHANNEL AND SENDS TO EXECUTOR CHANNEL
        @Bean
        public IntegrationFlow outboundtoDB() {
            return IntegrationFlows
                    .from("commonInputChannel")
                    /*
                     * We publish the msg to be stored into the DB onto a executor
                     * channel (so that the DB operation is processed on a separate
                     * thread).
                     */
                    .channel("executorChannelToDB").get();
                    /****************************************************************************
                            *********************************************************
                     * How do I route the error from executor channel to error channel over here?
                            **********************************************************
                     ****************************************************************************/
        }
    
        /*
         * Create an advice bean to handle DB errors. In case of failure, send
         * response to a separate channel.
         */
        @Bean
        public ExpressionEvaluatingRequestHandlerAdvice expressionAdvice() {
            ExpressionEvaluatingRequestHandlerAdvice advice = new ExpressionEvaluatingRequestHandlerAdvice();
            advice.setFailureChannelName("DBFailureChannel");
            advice.setOnFailureExpressionString("'##Error while storing request into DB'");
            advice.setTrapException(true);
            return advice;
        }
    
        /*
         * We create a separate flow for DB failure because in future we may need
         * other actions such as retries/notify support in addition to logging.
         */
        @Bean
        public IntegrationFlow failure() {
            return IntegrationFlows.from("DBFailureChannel")
                    .channel("errorChannel").get();
    
        }   
    }
    

    更新:

       @Bean
        public IntegrationFlow outboundtoDB() {
            return IntegrationFlows
                    .from("commonInputChannel")
                    //Setting Headers
                    .enrichHeaders(h -> h.header(MessageHeaders.ERROR_CHANNEL, "errorChannel", true))
                    .enrichHeaders(h -> h.header(MessageHeaders.REPLY_CHANNEL, "DBSuccessChannel", true))
                    .channel("executorChannelToDB").get();
    

    DBSuccess channel设置为处理如下响应:

    @Bean
    public IntegrationFlow success() {
        return IntegrationFlows
                .from("DBSuccessChannel")
                .wireTap(
                        flow -> flow.handle(msg -> logger
                                .info("Response from storing in DB : "
                                        + msg.getPayload()))).get();
    }
    

    但我还是有错误,

    2018-09-26 23:34:47.398错误17186---[SimpleAsyncTaskExecutor-465] org.springframework.messaging.MessageHandlingException:嵌套 无法在索引0处分析“创建时间戳”, failedMessage=常规消息 [有效负载=com.td.sba.iep.schema。InstructionRs@37919153, headers={errorChannel=errorChannel, jms\u destination=公共输入通道 jms\u timestamp=1538018141672,jms\u Solace\u isXML=true, replyChannel=DBSuccessChannel,jms\u redelivered=true, JMS\u Solace\u DeliverToOne=false,JMS\u Solace\u elidingqualified=false, JMS\u Solace\u deadmsgqueuequalified=false, id=ff6c2ea6-b6d6-c67a-7943-6b7db33bb977, 时间戳=1538019287394}]

    在这里,jms\u目的地仍然被设置为输入通道,错误不断地被重新发布到commonInputChannel。

    1 回复  |  直到 6 年前
        1
  •  0
  •   Gary Russell    6 年前

    该建议不会有帮助,因为它只适用于该端点,而不适用于下游流,而且,在任何情况下,即使它应用了,到执行器的切换也会成功,并且任何下游异常都由执行器处理(它被包装在一个 ErrorHandlingTaskExecutor 用一个 MessagePublishingErrorHandler

    尝试用标题充实器替换该组件,并设置 errorChannel 标题。或者您可以使用配置了错误通道的MPEH来包装TE(executor通道将检测到TE已经是EHTE)。

    编辑

    这对我来说很好。。。

    @SpringBootApplication
    public class So52526134Application {
    
        public static void main(String[] args) {
            SpringApplication.run(So52526134Application.class, args);
        }
    
        @Bean
        public IntegrationFlow mainFlow() {
            return IntegrationFlows.from(() -> "foo", e -> e.poller(Pollers.fixedDelay(5000)))
                    .enrichHeaders(h -> h.header(MessageHeaders.ERROR_CHANNEL, "myErrors.input"))
                    .channel(MessageChannels.executor(executor()))
                    .handle((p, h) -> {
                        throw new RuntimeException("foo");
                    })
                    .get();
        }
    
        @Bean
        public IntegrationFlow myErrors() {
            return f -> f.handle((p, h) -> {
                System.out.println("in my error flow");
                return p;
            })
            .handle(System.out::println);
        }
    
        @Bean
        public TaskExecutor executor() {
            return new ThreadPoolTaskExecutor();
        }
    
    }
    

    in my error flow
    ErrorMessage [payload=org.springframework.messaging.MessageHandlingException: ...