代码之家  ›  专栏  ›  技术社区  ›  Roman

Spring集成:刷新/重新加载动态注册的IntegrationFlow

  •  1
  • Roman  · 技术社区  · 8 年前

    我从“dynConfigFlows”目录加载xml配置文件,然后动态创建IntegrationFlows。
    每个xml文件一个IntegrationFlow。如果编辑了配置文件,则应更新IntegrationFlow。IntegrationFlow的注册id设置为配置文件名。
    因此,应平稳地停止电流(不再有输入消息,且应完成当前处理)。在此之后,应将其删除,并注册流的更新版本。
    现在问题是:
    a) 在下面的JUnit测试中,我能够用相同的ID注册两个流,以防我不调用注册。删除(id);这算是行为吗?ID不应该是唯一的吗?
    b) 调用remove(…)后我尝试向删除的InputChannel再发送一条消息。我收到一个错误,但仍然可以看到从工作删除流中的日志输出。

    我应该如何实现动态IntegrationFlows的刷新/重新加载功能?

    @Test
    public void testRegisterUnregister() {
        final String FLOW_ID = "regFlow";
        Flux<Message<?>> messageFlux = Flux.just("1,2,3,4").map(v -> v.split(",")).flatMapIterable(Arrays::asList)
                .map(Integer::parseInt).map(GenericMessage<Integer>::new);
    
        QueueChannel resultChannel = new QueueChannel();
    
        IntegrationFlow regFlow = createFlow("first", messageFlux, resultChannel);
    
        IntegrationFlowRegistration register1 = this.flowContext.registration(regFlow).id(FLOW_ID).register();
        final String FIRST_ID = register1.getId();
        System.out.println("!!!!!!!!!!!!!!!! register flow: "+FIRST_ID);
        // validate incoming of 4 integers
        assertThat(resultChannel.getQueueSize()).as("queueSize").isEqualTo(4);
        // save reference to first InputChannel
        MessageChannel input1 = register1.getInputChannel();
    
        // stop and remove first flow
        register1.stop();
        this.flowContext.remove(FIRST_ID);
        // wait for the case if flows shutdown needs some time
        try {
            Thread.sleep(2000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    
        // create second flow
        regFlow = createFlow("second", messageFlux, resultChannel);
        IntegrationFlowRegistration register2 = this.flowContext.registration(regFlow).id(FLOW_ID).register();
        System.out.println("!!!!!!!!!!!!!!!! register flow: "+register2.getId());
        // send extra message to the second flow
        register2.getInputChannel().send(MessageBuilder.withPayload("!!!!!!!!!!! new registered flow message").build());
        try {
            // send extra message to the first flow. It should fail because first flow is removed.
            // It fails but we still see "!!!!!!!!!!! new registered flow message" from first flow. It should't happen!
            input1.send(MessageBuilder.withPayload("!!!!!!!!!!! new registered flow message").build());
            Assert.fail("we should get an exception because first flow is removed");
        } catch (MessageDeliveryException e) {
            e.printStackTrace();
        }
    
        assertThat(resultChannel.getQueueSize()).as("queueSize=2 x 4 + 1").isEqualTo(9);
    }
    
    private IntegrationFlow createFlow(String markerText, Flux<Message<?>> messageFlux, QueueChannel resultChannel) {
        return IntegrationFlows
                .from(messageFlux)
                .log(l -> "!!!!!!!!!!!!!!!!!!!!!!! "+markerText+"="+l)
                .channel(resultChannel)
                .get();
    }
    
    1 回复  |  直到 8 年前
        1
  •  1
  •   Artem Bilan    8 年前

    第一个问题肯定是一个bug: https://jira.spring.io/browse/INT-4413 .

    我们可以覆盖应用程序上下文中的现有bean,因为默认情况下允许:

    if (oldBeanDefinition != null) {
            if (!isAllowBeanDefinitionOverriding()) {
                throw new BeanDefinitionStoreException(beanDefinition.getResourceDescription(), beanName,
                        "Cannot register bean definition [" + beanDefinition + "] for bean '" + beanName +
                        "': There is already [" + oldBeanDefinition + "] bound.");
            }
    

    然而,从 IntegrationFlowContext . 我们将在上述JIRA中拒绝该国。

    第二个问题与停止或销毁无关。这是关于 ChannelInterceptor 这正是我们所拥有的 .log() 操作员: https://docs.spring.io/spring-integration/docs/5.0.2.RELEASE/reference/html/java-dsl.html#java-dsl-log . 换句话说,它不是频道的订户。这就是为什么从 .日志() ,但仍然失败 doesn't have subscribers to accept messages ,因为流已被破坏。

    另一方面,这让您有点困惑,因为我们没有bean,但我们仍然有对象。要完全移除它,我们应该 null 我们在这件事上的所有变量:-)。