
How to configure a HornetQ client for a cluster of standalone servers (configured with JGroups over TCP)

  • Pragalathan M  · Tech Community  · 10 years ago

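    I have configured two standalone HornetQ servers in cluster mode using JGroups (TCP), because I cannot use the default UDP. The configuration is shown below.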

    hornetq-configuration.xml:

    <broadcast-groups>
        <broadcast-group name="bg-group1">
            <jgroups-file>jgroups-tcp.xml</jgroups-file>
            <jgroups-channel>hornetq_broadcast_channel</jgroups-channel>
            <connector-ref>netty</connector-ref>
        </broadcast-group>
    </broadcast-groups>
    
    <discovery-groups>
        <discovery-group name="dg-group1">
            <jgroups-file>jgroups-tcp.xml</jgroups-file>
            <jgroups-channel>hornetq_broadcast_channel</jgroups-channel>
            <refresh-timeout>10000</refresh-timeout>
        </discovery-group>
    </discovery-groups>
    

    jgroups-tcp.xml:

    <config xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
        xmlns="urn:org:jgroups"
        xsi:schemaLocation="urn:org:jgroups http://www.jgroups.org/schema/jgroups.xsd">
    <TCP bind_port="7800"
         recv_buf_size="${tcp.recv_buf_size:5M}"
         send_buf_size="${tcp.send_buf_size:5M}"
         max_bundle_size="64K"
         max_bundle_timeout="30"
         use_send_queues="true"
         sock_conn_timeout="300"
    
         timer_type="new3"
         timer.min_threads="4"
         timer.max_threads="10"
         timer.keep_alive_time="3000"
         timer.queue_max_size="500"
    
         thread_pool.enabled="true"
         thread_pool.min_threads="2"
         thread_pool.max_threads="8"
         thread_pool.keep_alive_time="5000"
         thread_pool.queue_enabled="true"
         thread_pool.queue_max_size="10000"
         thread_pool.rejection_policy="discard"
    
         oob_thread_pool.enabled="true"
         oob_thread_pool.min_threads="1"
         oob_thread_pool.max_threads="8"
         oob_thread_pool.keep_alive_time="5000"
         oob_thread_pool.queue_enabled="false"
         oob_thread_pool.queue_max_size="100"
         oob_thread_pool.rejection_policy="discard"/>
    
    <TCPPING
             initial_hosts="${jgroups.tcpping.initial_hosts:hornetq-server1-ip[7800], hornetq-server1-ip[7900], hornetq-server2-ip[7800], hornetq-server2-ip[7900]}"
             port_range="1"/>
    <MERGE3  min_interval="10000"
             max_interval="30000"/>
    <FD_SOCK/>
    <FD timeout="3000" max_tries="3" />
    <VERIFY_SUSPECT timeout="1500"  />
    <BARRIER />
    <pbcast.NAKACK2 use_mcast_xmit="false"
                   discard_delivered_msgs="true"/>
    <UNICAST3 />
    <pbcast.STABLE stability_delay="1000" desired_avg_gossip="50000"
                   max_bytes="4M"/>
    <pbcast.GMS print_local_addr="true" join_timeout="2000"
                view_bundling="true"/>
    <MFC max_credits="2M"
         min_threshold="0.4"/>
    <FRAG2 frag_size="60K"  />
    <!--RSVP resend_interval="2000" timeout="10000"/-->
    <pbcast.STATE_TRANSFER/>
    </config>
    

    The servers work correctly, i.e. if the live server goes down, the backup takes over.

    Client producer:

    List<Configuration> configurations = ... // user defined class
    // Size the array from the list instead of hard-coding 2
    TransportConfiguration[] servers = new TransportConfiguration[configurations.size()];
    for (int i = 0; i < configurations.size(); i++) {
        Map<String, Object> map = new HashMap<>();
        map.put("host", configurations.get(i).getHost());
        map.put("port", configurations.get(i).getPort());
        servers[i] = new TransportConfiguration(NettyConnectorFactory.class.getName(), map);
    }
    ServerLocator locator = HornetQClient.createServerLocatorWithHA(servers);
    locator.setReconnectAttempts(5);
    
    factory = locator.createSessionFactory();
    session = factory.createSession();
    producer = session.createProducer(queueName); 
    

    Client consumer:

    ClientSessionFactory factory = locator.createSessionFactory();
    for (int i = 1; i <= nReceivers; i++) {
        ClientSession session = factory.createSession(true, true, 1);
        sessions.add(session);
        if (i == 1) {
            Thread.sleep(10000); // waiting to download cluster information
        }
        session.start();
        ClientConsumer consumer = session.createConsumer(queueName);
        consumer.setMessageHandler(handler);
    }
    

    Problems:

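    1. If the server the client is connected to fails while messages are being sent, the client (producer) does not fail over automatically.
    2. Sessions created from the same client session factory always connect to a single server (contrary to the documentation: http://docs.jboss.org/hornetq/2.3.0.beta1/docs/user-manual/html/clusters.html#clusters.client.loadbalancing )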

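    So it seems the client never receives the cluster topology. I also cannot find any documentation on configuring a client to use JGroups (is that even required?) to connect to a HornetQ cluster.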
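
    For reference on problem 2: the load-balancing documentation linked above describes a round-robin policy in which the first node is chosen at random and each later node is chosen sequentially. The following is a stand-alone sketch of that selection rule only, assuming two known nodes. It is not HornetQ's own code; the class name `RoundRobinSketch` and the printed host names are illustrative.

```java
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.ThreadLocalRandom;

/** Illustration only: mimics a "random first pick, then sequential" policy. */
final class RoundRobinSketch {
    // -1 means no node has been chosen yet
    private int pos = -1;

    /** Returns an index in [0, max): random on the first call, sequential afterwards. */
    synchronized int select(int max) {
        if (max <= 0) {
            throw new IllegalArgumentException("max must be positive");
        }
        if (pos == -1) {
            pos = ThreadLocalRandom.current().nextInt(max);
        } else {
            pos = (pos + 1) % max;
        }
        return pos;
    }

    public static void main(String[] args) {
        // Hypothetical node names, standing in for the two cluster members
        List<String> servers = Arrays.asList("hornetq-server1-ip", "hornetq-server2-ip");
        RoundRobinSketch policy = new RoundRobinSketch();
        for (int i = 0; i < 4; i++) {
            int index = policy.select(servers.size());
            System.out.println("connection " + i + " -> " + servers.get(index));
        }
    }
}
```

    With two known nodes, successive picks alternate between them. Seeing every connection land on the same server would be consistent with the client knowing about only one node, which matches the suspicion that the topology never arrives.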

    Any help is appreciated.

    1 answer  |  as of 10 years ago
  •   Pragalathan M    10 years ago

    I found that JGroups can be used on the client side as well. The detailed solution can be found here
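
    As a rough idea of what client-side JGroups discovery can look like with the core API, here is a minimal sketch. It is not taken from the linked solution. It assumes HornetQ 2.3/2.4 with the core client and JGroups jars on the classpath, a copy of `jgroups-tcp.xml` visible to the client, and the constructor signatures as I recall them for those versions (worth checking against the Javadoc of the version in use). The class name `JGroupsDiscoveryClient` and the 10-second initial wait are illustrative choices.

```java
import org.hornetq.api.core.DiscoveryGroupConfiguration;
import org.hornetq.api.core.JGroupsBroadcastGroupConfiguration;
import org.hornetq.api.core.client.ClientSession;
import org.hornetq.api.core.client.ClientSessionFactory;
import org.hornetq.api.core.client.HornetQClient;
import org.hornetq.api.core.client.ServerLocator;

public class JGroupsDiscoveryClient {
    public static void main(String[] args) throws Exception {
        // Same JGroups file and channel name as the servers' broadcast/discovery groups
        JGroupsBroadcastGroupConfiguration jgroups =
                new JGroupsBroadcastGroupConfiguration("jgroups-tcp.xml", "hornetq_broadcast_channel");

        // First argument mirrors <refresh-timeout> on the servers;
        // the second (initial wait for discovery) is an assumed value
        DiscoveryGroupConfiguration discovery =
                new DiscoveryGroupConfiguration(10000L, 10000L, jgroups);

        // The locator learns the live/backup nodes through discovery
        // instead of a static TransportConfiguration list
        ServerLocator locator = HornetQClient.createServerLocatorWithHA(discovery);
        locator.setReconnectAttempts(5);

        ClientSessionFactory factory = locator.createSessionFactory();
        try {
            ClientSession session = factory.createSession();
            // ... create producers/consumers as in the question ...
            session.close();
        } finally {
            factory.close();
            locator.close();
        }
    }
}
```

    With this approach the client joins the same JGroups channel as the servers, so it presumably needs network access to the hosts listed in `TCPPING initial_hosts` and a bind port of its own that does not clash with a server on the same machine.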