代码之家  ›  专栏  ›  技术社区  ›  Screwtape

为什么我的ConsumerTemplate没有从ActiveMQ主题中读取任何消息?

  •  0
  • Screwtape  · 技术社区  · 7 年前

    我正试图使用持久的订阅服务器端点,不时地使用来自activemq主题的消息作为轮询使用者。

    在我的bean中,我有一个ConsumerTemplate,从中我尝试接收交换,并发送到另一个URI。

    bean方法是:

    public void pollConsumer() throws Exception {
        long count = 0;
        try {
        if ( consumerEndpoint == null ) consumerEndpoint = consumer.getCamelContext().getEndpoint( endpointUri );
        logger.debug( "Consuming: " + consumerEndpoint.getEndpointUri() );
        consumer.start();
        producer.start();
        while ( true ) {
            logger.trace("Awaiting message: " + ++count );
            Exchange exchange = consumer.receive( consumerEndpoint, 1000 );
            if ( exchange == null ) break;
            logger.trace("Processing message: " + count );
            producer.send( exchange );
            consumer.doneUoW( exchange );
            logger.trace("Processed message: " + count );
        }
        producer.stop();
        consumer.stop();
        } catch ( Throwable t ) {
            logger.error("Something went wrong!", t );
            throw t;
        }
    }
    

    当被调用时,记录器以窗体的形式显示“消费”消息。

    activemq://topic:fromQueue.Name?clientId=MyClient&durableSubscriptionName=MyClient&selector=RecordType+IN+%28+%271%27%2C+%272%27+%29+AND+SubType+%3D+%272%27
    

    据我所见,哪个是正确的(选择器应该 RecordType IN ('1', '2') AND SubType = '2' 没有URL编码。

    我只收到一个“等待消息”日志,没有其他内容,所以似乎没有检索到任何内容。

    奇怪的是,它也没有在ActiveMQ上注册为一个持久的订户,所以看起来它根本没有做任何事情,但它也没有注册任何错误,所以我很困惑。

    有人能提出为什么这可能不起作用,或者至少我应该从哪里开始寻找?

    2 回复  |  直到 7 年前
        1
  •  1
  •   pcoates    7 年前

    如果消息必须等待一秒钟以上才能进入队列/主题,则PollConsumer将停止。

    它等待消息1秒钟,然后返回空值,并将中断while循环并停止使用者。

        Exchange exchange = consumer.receive( consumerEndpoint, 1000 );
    ==> if ( exchange == null ) break;
        logger.trace("Processing message: " + count );
        producer.send( exchange );
        consumer.doneUoW( exchange );
    

    仅仅使用阿帕奇骆驼路线来做你描述的事情会更容易。

        2
  •  0
  •   Screwtape    7 年前

    注意到@pcoates answer,并尝试为测试目的延长超时时间,很明显问题是URI上的持久订阅选项没有被执行,并且由于在1秒的等待期间没有关于主题的新消息,所以没有发生任何事情。

    答案 another question 与持久订阅相关的说明您不能使用来自轮询使用者的持久订阅。

    因此,我的解决方案是订阅主题并将消息路由到一个新队列,并在这个新队列上拥有轮询使用者。这不太好,因为我不希望有额外的队列,但是它可以工作,而且比编写新版本的JMSPollingConsumer要省力。