代码之家  ›  专栏  ›  技术社区  ›  kiran reddy

split&aggregate flow中的Spring集成验证

  •  1
  • kiran reddy  · 技术社区  · 7 年前

    我正在尝试添加过滤器以丢弃流,并在失败后继续执行主流,并聚合拆分器。错误和错误的预期类型;成功是一样的。没有特定的聚合器逻辑。

    @Bean
    public IntegrationFlow flow() {
         return f -> f
             .split(Orders.class, Orders::getItems)
             .enrich(e -> e.requestChannel("enrichChannel"))
             .filter(Order.class, c -> c.getId() > 10 ? true : false,
                 e -> e.discardChannel(validationError()))
             .handle(new MyHandler())
             .transform(new MapToObjectTransformer(Order.class))
             .enrich(e -> e.requestChannel("transformChannel"))
             .filter(Order.class, c -> c.getTotal() > 100 ? true : false,
                 e -> e.discardChannel(validationError())).handle( new transformer())
             .aggregate();
     }
                
    @Bean
    public IntegrationFlow validationErrorFlow() {
     return IntegrationFlows.from(validationError())
             .handle(new ValidationHandler())
             .get();
    }
    

    discard通道没有连接回主流以执行拆分中的下一项。

    我可以写路由(&A);子流映射,但将在路由中嵌套过多->子流量->路线->尝试使用过滤器解决此问题的子流。是否有更好的方法来执行验证并继续拆分流中的所有项目。

    更新1:

    .handle(request.class, (p, h) -> validator.validate(p)
    .gateway("filterFlow.input")
    .handle(new MyHandler())
    .enrich(...)
    .handle(...)
    .enrich(...)
    .handle(...)
    .enrich(...)
    .handle(...)
    .aggregate();
    
    
    
    @Bean
        public IntegrationFlow filterFlow() {
            return f -> f
                    .filter(response.class, c -> c.isValidationStatus(), df -> df.discardFlow
                            (flow -> flow.handle(Message.class, (p, h) -> p.getPayload())));
        }
    

    网关能够拦截请求,但执行的流 .handle(new MyHandler()) 而不是 split()

    更新2:(回答)来自Artem

    .handle(request.class, (p, h) -> validator.validate(p))
        .filter(response.class,p -> p.isValidationStatus(), f -> f.discardChannel("aggregatorChannel"))
        .handle(new MyHandler())
        .enrich(...)
        .handle(...)
        .enrich(...)
        .handle(...)
        .enrich(...)
        .handle(...)
        .channel("aggregatorChannel")
        .aggregate();
    

    这将执行条件跳过(&a);继续流动。

    1 回复  |  直到 4 年前
        1
  •  0
  •   Artem Bilan    7 年前

    discard通道没有连接回主流以执行拆分中的下一项。

    这是真的。它就是这样设计的。在大多数情况下,丢弃流类似于JMS中的死信队列。所以,这是短的单向分支。

    如果您真的想回到主流,那么应该考虑在流定义中使用命名通道。我的意思是,在补偿(放弃)流之后,您希望返回的点:

    .filter(Order.class, c -> c.getId() > 10 ? true : false,
                        e -> e.discardFlow(sf -> sf
                                .gateway(validationError())
                                .channel("myHandleChannel")))
                .channel("myHandleChannel")
                .handle(new MyHandler())
    

    我使用 gateway() 因为我们需要discard流的回复才能继续处理。我们需要这个 .channel("myHandleChannel") 在子流的末尾,因为丢弃流是一个分支。

    另一种实现方法是 .gateway() 在主流上:

    .gateway("filterFlow.input")
    .handle(new MyHandler())
    
    ...
    
    @Bean
    public IntegrationFlow filterFlow() {
        return f -> f
                .filter(Order.class, c -> c.getId() > 10 ? true : false,
                        e -> e.discardChannel(validationError()));
    }
    

    我们发送到 discardChannel 相同的请求消息,因此 replyChannel 所述网关的标头仍然存在。您唯一需要的是确保从 .handle(new ValidationHandler()) .