代码之家  ›  专栏  ›  技术社区  ›  omerkudat

一种合并队列条目的好方法

  •  3
  • omerkudat  · 技术社区  · 16 年前

    我有一个线程池,这些线程是从队列馈送任务的。通常,少数线程能够保持队列为空。偶尔,一个特别大的突发事件会使队列大小在一段时间内保持在零以上,但不会持续太久。

    我关心的是重复的事件,或者携带过时以前事件的数据。在大容量的时候,这样的事件可以在短时间内在队列中共存。我希望能够把这些合并起来,这样我就可以减少浪费在工作上的时间。

    把这样一个队列混为一谈的好方法是什么?我可以在插入时将其合并,从后面迭代到头部,然后搜索要替换的候选者,但是这似乎太暴力了。如果您有代码或库建议,请记住,我正在使用Java。

    3 回复  |  直到 8 年前
        1
  •  5
  •   ng.    16 年前

    为什么不基于任务实现hashcode()和equals()。然后只需删除任务。例如。

    queue.remove(task);
    queue.offer(task);
    

    那你就没有复制品了。或者。

    if(!queue.contains(task)) {
       queue.offer(task);
    }
    

    如果任务已经在队列中,这将避免将其排队。

        2
  •  2
  •   Dan Breslau    16 年前

    如果你使用 LinkedHashMap ,可以保留条目添加到队列的顺序。

    当匹配条目到达时,我收集到您想要将它的一些数据附加到原始队列条目中。在这种情况下,您可以就地更新哈希对象,或者使用 HashMap.put(key, value) 用新对象替换排队的项。(i 认为 这保留了原始项目的顺序,但我没有对此进行测试。)

    请注意,您的代码需要显式同步对 链接地图 以及里面的数据。您不希望在另一个线程正在获取队列中的项目进行处理的同时更新该项目。最简单的同步方法可能是访问 链接地图 通过 Collections.synchronizedMap() .

        3
  •  2
  •   Mahesh    8 年前

    这个合并者似乎在做你想要做的事情: https://github.com/GuillaumeArnaud/conflator

    根据您的需求,如果合并队列中存在某个事件,则可以将实现更改为合并或用现有事件替换最新事件。

    例如,对于以下内容,每个事件都被实现为一个“勾号”,它定义合并行为。

    public class Tick implements Message<Tick> {
    
        private final String ticker;
    
        public long getInitialQuantity() {
            return initialQuantity;
        }
    
        private final long initialQuantity;
    
        public long getCurrentQuantity() {
            return currentQuantity;
        }
    
        private long currentQuantity;
        private int numberOfMerges;
    
        public String getTicker() {
            return ticker;
        }
    
        public Tick(String ticker, long quantity) {
            this.ticker = ticker;
            this.initialQuantity = quantity;
            this.currentQuantity = quantity;
        }
    
        @Override
        public String key() {
            return this.ticker;
        }
    
        @Override
        public String body() {
            return String.valueOf(currentQuantity);
        }
    
        @Override
        public boolean isMerged() {
            return this.initialQuantity != this.currentQuantity;
        }
    
        @Override
        public int mergesCount() {
            return numberOfMerges;
        }
    
        @Override
        public boolean isValid() {
            return false;
        }
    
        @Override
        public boolean merge(Tick message) {
            if (this.equals(message)) {
                this.currentQuantity += message.currentQuantity;
                numberOfMerges++;
                return true;
            }
            return false;
        }
    
        @Override
        public int hashCode() {
            return ticker.hashCode();
        }
    
        @Override
        public boolean equals(Object obj) {
            if (obj != null && obj instanceof Tick) {
                Tick other = (Tick) obj;
                return this.ticker.equals(other.getTicker());
            }
            return false;
        }
    

    测试用例:

    public class TickMergeTest {
        MultiValuedMapConflator conflator;
    
        @Test
        public void two_unmergeable_ticks_should_be_remain_unmergeable() {
            Tick tick1 = new Tick("GOOG", 100L);
            Tick tick2 = new Tick("AAPL", 120L);
    
            List<Tick> messages = conflator.merge(Lists.newArrayList(tick1, tick2));
    
            assertNotNull(messages);
            assertEquals(messages.size(), 2);
            assertEquals(Long.valueOf(messages.get(0).body()).longValue(), tick1.getCurrentQuantity());
            assertEquals(Long.valueOf(messages.get(1).body()).longValue(), tick2.getCurrentQuantity());
        }
    
        @Test(timeout = 1000)
        public void two_mergeable_ticks_should_be_merged() {
            Tick tick1 = new Tick("GOOG", 100L);
            Tick tick2 = new Tick("GOOG", 120L);
    
            List<Tick> messages = conflator.merge(Lists.newArrayList(tick1, tick2));
    
            assertNotNull(messages);
            assertEquals(messages.size(), 1);
            assertEquals(Long.valueOf(messages.get(0).body()).longValue(), tick1.getInitialQuantity() + tick2.getInitialQuantity());
        }
    
        @Test(timeout = 1000)
        public void should_merge_messages_on_same_key() throws InterruptedException {
            // given
            conflator.put(new Tick("GOOG", 100L));
            conflator.put(new Tick("GOOG", 120L));
    
            // test
            Thread.sleep(300); // waiting the conflation
            Message message = conflator.take();
    
            // check
            assertNotNull(message);
            assertEquals(Long.valueOf(message.body()).longValue(), 220L);
            assertTrue(message.isMerged());
        }
    
        @Test(timeout = 1000)
        public void should_not_merge_messages_on_diff_key() throws InterruptedException {
            // given
            conflator.put(new Tick("GOOG", 100L));
            conflator.put(new Tick("AAPL", 120L));
    
            // test
            Thread.sleep(300); // waiting the conflation
            Message message1 = conflator.take();
            Message message2 = conflator.take();
    
            // check
            assertNotNull(message1);
            assertNotNull(message2);
    
            assertEquals(Long.valueOf(message1.body()).longValue(), 100L);
            assertFalse(message1.isMerged());
    
            assertEquals(Long.valueOf(message2.body()).longValue(), 120L);
            assertFalse(message2.isMerged());
    
        }
        @Before
        public void setUp() {
            conflator = new MultiValuedMapConflator<Tick>(true);
        }
    
        @After
        public void tearDown() {
            conflator.stop();
        }
    }