代码之家  ›  专栏  ›  技术社区  ›  Karussell

Apache Ignite队列比LinkedBlockingQueue慢得多

  •  0
  • Karussell  · 技术社区  · 7 年前

    我试图在Ignite中复制一个简单的生产者-消费者场景:

    /**
     * Producer/consumer micro-benchmark over an in-process {@link LinkedBlockingQueue}.
     *
     * <p>One thread puts {@code MAX} random doubles into a queue bounded to 5 elements while a
     * second thread polls them. Each thread prints its own elapsed time once it is done.
     */
    public class QueueExample {
        public static void main(String[] args) {
            new QueueExample().start();
        }

        /**
         * Waits until both benchmark threads have counted the latch down, or the timeout elapses.
         *
         * <p>A timed-out wait is not treated as an error: the caller simply proceeds.
         *
         * @param latch          start-line latch shared by producer and consumer
         * @param timeoutSeconds maximum time to wait, in seconds
         * @throws RuntimeException wrapping the {@link InterruptedException} if the wait is interrupted
         */
        private static void awaitStart(CountDownLatch latch, long timeoutSeconds) {
            try {
                latch.await(timeoutSeconds, TimeUnit.SECONDS);
            } catch (InterruptedException ex) {
                // Keep the interrupt visible to whoever inspects this thread afterwards.
                Thread.currentThread().interrupt();
                throw new RuntimeException(ex);
            }
        }

        /** Starts the producer and the consumer thread and returns without joining them. */
        private void start() {
            final AtomicBoolean finishedTest1 = new AtomicBoolean(false);
            final BlockingQueue<Double> queue = new LinkedBlockingQueue<>(5);
            final CountDownLatch latch = new CountDownLatch(2);
            final int MAX = 1000;

            // Producer ("test1"): blocks in put() whenever the 5-slot queue is full.
            new Thread(() -> {
                System.out.println("test1 before latch");
                latch.countDown();
                // wait until other runnable is able to poll
                awaitStart(latch, 20);
                System.out.println(System.currentTimeMillis() + " start test1");
                double test = 2;
                Random r = new Random();
                StopWatch sw = new StopWatch();
                sw.start();
                for (int i = 0; i < MAX; i++) {
                    try {
                        queue.put(r.nextDouble());
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                        throw new RuntimeException(e);
                    }
                }
                sw.stop();
                finishedTest1.set(true);
                //LoggerFactory.getLogger(getClass()).info
                System.out.println(System.currentTimeMillis() + " end test1. " + test + ", took:" + sw.getTime() / 1000f);
            }).start();

            // Consumer ("test2"): makes MAX poll attempts, each waiting at most one second.
            new Thread(() -> {
                System.out.println("test2 before latch");
                latch.countDown();
                // wait until other runnable is able to poll
                awaitStart(latch, 10);
                System.out.println(System.currentTimeMillis() + " start test2");
                StopWatch sw = new StopWatch();
                sw.start();
                int counter = 0;
                try {
                    for (int i = 0; i < MAX; i++) {
                        Double res = queue.poll(1, TimeUnit.SECONDS);
                        // poll() returns null when the timeout elapses; only count real elements.
                        if (res != null) {
                            counter++;
                        }
                    }
                } catch (InterruptedException e) {
                    // Stop consuming early, but restore the flag instead of swallowing the interrupt.
                    Thread.currentThread().interrupt();
                }
                sw.stop();

                //LoggerFactory.getLogger(getClass()).info
                System.out.println(System.currentTimeMillis() + " end test2. counter " + counter + ", finished:" + finishedTest1.get() + ", took:" + sw.getTime() / 1000f);
            }).start();
        }
    }
    

    为什么这比下面的Ignite代码快100倍(0.02秒 vs <2秒)?

    /**
     * Producer/consumer micro-benchmark over an Apache Ignite {@link IgniteQueue}.
     *
     * <p>Starts two Ignite instances ("test1" and "test2") from this process, submits a producer
     * job through the first and a consumer job through the second, and lets each job print its
     * own elapsed time. Both jobs use the queue named "test" with capacity 5.
     */
    public class MyIgnite {
        public static void main(String[] args) {
            new MyIgnite().start();
        }

        /** Boots both Ignite instances and submits the two jobs asynchronously; does not stop the nodes. */
        private void start() {
            IgniteConfiguration icfg = new IgniteConfiguration();
            icfg.setIgniteInstanceName("test1");
            Ignite ignite1 = Ignition.start(icfg);

            final CountDownLatch latch = new CountDownLatch(2);

            final int queueSize = 5;
            // Number of elements produced and of poll attempts made; shared so both jobs agree.
            final int MAX = 1000;
            CollectionConfiguration queueCfg = new CollectionConfiguration();

            // Producer job ("test1").
            ignite1.compute().runAsync(new IgniteRunnable() {

                @IgniteInstanceResource
                Ignite ignite;

                @Override
                public void run() {
                    IgniteQueue<Double> queue = ignite.queue("test", queueSize, queueCfg);
                    System.out.println("test1 fetched queue");
                    latch.countDown();
                    try {
                        // wait until other runnable is able to poll
                        latch.await(20, TimeUnit.SECONDS);
                    } catch (InterruptedException ex) {
                        // Keep the interrupt visible before aborting the job.
                        Thread.currentThread().interrupt();
                        throw new RuntimeException(ex);
                    }
                    System.out.println("start test1");
                    double test = 2;
                    Random r = new Random();
                    StopWatch sw = new StopWatch();
                    sw.start();
                    for (int i = 0; i < MAX; i++) {
                        queue.put(r.nextDouble());
                    }
                    sw.stop();
                    //LoggerFactory.getLogger(getClass()).info
                    System.out.println("end test1. " + test + " at ignite " + ignite.name() + ", took:" + sw.getTime() / 1000f);
                }
            });

            System.out.println("starting test2");
            icfg = new IgniteConfiguration();
            icfg.setIgniteInstanceName("test2");
            Ignite ignite2 = Ignition.start(icfg);
            // Consumer job ("test2").
            ignite2.compute().runAsync(new IgniteRunnable() {
                @IgniteInstanceResource
                Ignite ignite;

                @Override
                public void run() {
                    IgniteQueue<Double> queue = ignite.queue("test", queueSize, queueCfg);
                    System.out.println("test2 fetched queue");
                    latch.countDown();
                    try {
                        // wait until other runnable is able to poll
                        latch.await(10, TimeUnit.SECONDS);
                    } catch (InterruptedException ex) {
                        // Keep the interrupt visible before aborting the job.
                        Thread.currentThread().interrupt();
                        throw new RuntimeException(ex);
                    }
                    System.out.println("start test2");
                    StopWatch sw = new StopWatch();
                    sw.start();
                    int counter = 0;
                    try {
                        for (int i = 0; i < MAX; i++) {
                            Double res = queue.poll(5, TimeUnit.SECONDS);
                            // A timed-out poll yields null; only count elements actually received.
                            if (res != null) {
                                counter++;
                            }
                        }

                    } catch (IgniteException exc) {
                        System.out.println("Somehow cannot poll. " + exc);
                    }
                    sw.stop();
                    //LoggerFactory.getLogger(getClass()).info
                    System.out.println("end test2. counter " + counter + " at ignite " + ignite.name() + ", took:" + sw.getTime() / 1000f);
                }
            });

            System.out.println("oldest node: " + ignite1.cluster().forOldest().hostNames());
            System.out.println("nodes: " + ignite1.cluster().nodes().size());

            // does it really gracefully shut the nodes down?
    //        Ignition.stop(ignite1.name(), false);
    //        Ignition.stop(ignite2.name(), false);
        }
    }
    

    1 回复  |  直到 7 年前
        1
  •  1
  •   Stanislav Lukyanov    7 年前

    你把航空母舰比作玩具船。

    LinkedBlockingQueue 是只在单个JVM内存中工作的数据结构。

    IgniteQueue 则是分布式队列:它的数据保存在Ignite缓存中,每次操作都可能涉及序列化和节点间通信,因此开销要大得多。