代码之家  ›  专栏  ›  技术社区  ›  Jimoc

始终确保ActiveMQ主题中只有最后10条消息

  •  1
  • Jimoc  · 技术社区  · 16 年前

    我们在ActiveMQ中遇到了一个问题,其中有大量的消息没有从主题中删除。 主题设置为非持久性、非持久性。

    <beans>
    
      <broker xmlns="http://activemq.apache.org/schema/core" useJmx="false" persistent="false">
    
    <!--
        <persistenceAdapter>
          <journaledJDBC journalLogFiles="5" dataDirectory="../data"/>
        </persistenceAdapter>
    -->
    
            <transportConnectors>
                <transportConnector uri="vm://localhost"/>
            </transportConnectors>
    
      </broker>
    
    </beans>
    

    messaging-config.xml中的主题定义是

    <destination id="traceChannel">
    
        <properties>
    
            <network>
            <session-timeout>10</session-timeout>
        </network>
    
            <server>
                <message-time-to-live>10000</message-time-to-live>
                <durable>false</durable>
                <durable-store-manager>flex.messaging.durability.FileStoreManager</durable-store-manager>
            </server>
    
            <jms>
                <destination-type>Topic</destination-type>
                <message-type>javax.jms.ObjectMessage</message-type>
                <connection-factory>ConnectionFactory</connection-factory>
                <destination-jndi-name>dynamicTopics/traceTopic</destination-jndi-name>
                <delivery-mode>NON_PERSISTENT</delivery-mode>
                <message-priority>DEFAULT_PRIORITY</message-priority>
                <acknowledge-mode>AUTO_ACKNOWLEDGE</acknowledge-mode>
                <transacted-sessions>false</transacted-sessions>
                <initial-context-environment>
                    <property>
                        <name>Context.INITIAL_CONTEXT_FACTORY</name>
                        <value>org.apache.activemq.jndi.ActiveMQInitialContextFactory</value>
                    </property>
                    <property>
                        <name>Context.PROVIDER_URL</name>
                        <value>tcp://localhost:61616</value>
                    </property>
                </initial-context-environment>
            </jms>
        </properties>
    
        <channels>
            <channel ref="rtmps" />
        </channels>
    
        <adapter ref="trace" />
    
    </destination>
    

    我试图做到的是,在任何时候,只有最后10条消息是关于主题的,因为让它在一夜之间运行会导致超过15万条关于主题的消息,尽管它应该只包含一个非常小的数字。

    4 回复  |  直到 16 年前
        1
  •  5
  •   Henryk Konsek    16 年前

    据我所知,发送到没有订阅者的非持久主题的消息应该被删除。只有当前注册的消费者才能获得消息副本。

    无论您的非持久主题不应缓存这些150K消息,您都可以使用代理策略限制每个消费者存储的消息量:

    <broker>
    ...    
      <pendingMessageLimitStrategy>
        <constantPendingMessageLimitStrategy limit="10"/>
      </pendingMessageLimitStrategy>
    ...
    </broker>
    
        2
  •  2
  •   srodriguez    16 年前

    我不确定你想要的东西是否可以存档。

    首先,你可以为你的信息设定一个生存时间:

    public ITopicPublisher CreateTopicPublisher(string selector)
            {
                try
                {
                    IMessageProducer producer = m_session.CreateProducer(m_topic);
                    ActiveMQPublisher publisher = new ActiveMQPublisher(producer);
    
                    // here we put a time to live to 1min for eg
                    TimeSpan messageTTL = TimeSpan.FromMilliseconds(60000);
                    publisher.TimeToLive = messageTTL;
    
                    if (!String.IsNullOrEmpty(selector))
                    {
                        publisher.IsSelector = true;
                        publisher.Selector = selector;
                    }
                    return publisher;
                }
                catch (Exception ex)
                {
                    Logger.Exception(ex);
                    throw;
                }
            }
    

    早上你仍然会收到15万条信息,但一旦一个消费者连接到你的主题,过期的信息就会消失。

    Eviction Strategy )

    编辑:我刚刚意识到一件事。你说“让它在一夜之间运行会产生超过15万条关于这个主题的消息,尽管它应该只包含一个非常小的数字。”在一个主题中,你没有必须收集的消息的概念。如果没有人订阅某个主题,则会删除发送到该主题的消息。“消息排队”仍将增加1。

        3
  •  1
  •   Claudio    16 年前
        4
  •  -1
  •   CEJ    11 年前

    • 基于我的分析和尝试添加此。。它需要更多的上下文,所以我在下面为那些不想经历ActiveMQ谷歌之争的人添加了它。。
    • 这就是我的想法 activemq.xml 我根本看不到这项工作,所以我希望我能从其他人那里得到一些见解,他们可能会给我指出正确的方向。

      <destinationPolicy>
        <policyMap>
          <policyEntries>
            <policyEntry topic=">"
                         topicPrefetch="10"/>
            <policyEntry topic=">"
                         producerFlowControl="false"/>
            <policyEntry topic=".>"
                         >
              <messageEvictionStrategy>
                <oldestMessageEvictionStrategy/>
              </messageEvictionStrategy>
      
              <pendingMessageLimitStrategy>
                <constantPendingMessageLimitStrategy limit="10"/>
              </pendingMessageLimitStrategy>
            </policyEntry>
          </policyEntries>
        </policyMap>
      </destinationPolicy>