代码之家  ›  专栏  ›  技术社区  ›  Valya

rxJava有序(按键)任务执行

  •  0
  • Valya  · 技术社区  · 7 年前

    我有一堆表示一些数据的对象。这些对象可以写入相应的文件。用户可能会要求比以前写入文件的更改更快地进行某些更改。

    我如何在rxJava中做到这一点?

    还有一点。在另一个地方,我希望使用最新的更改运行操作。一种选择是等待所有任务完成。

    是否有合适的RxJava原语/方法有望涵盖这两个用例?

    我不熟悉RxJava,但我希望这是有意义的。 Subjects

    我已经有了使用自定义的实现 Executor .

    public class OrderingExecutor
    implements Executor
    {
        @Delegate
        private final Executor delegate;
        private final Map<Object, Queue<Runnable>> keyedTasks = new HashMap<>();
    
        public OrderingExecutor(
            Executor delegate)
        {
            this.delegate = delegate;
        }
    
        public void execute(
            Runnable task,
            Object key)
        {
            Objects.requireNonNull(key);
    
            boolean first;
            Runnable wrappedTask;
            synchronized (keyedTasks)
            {
                Queue<Runnable> dependencyQueue = keyedTasks.get(key);
                first = (dependencyQueue == null);
                if (dependencyQueue == null)
                {
                    dependencyQueue = new LinkedList<>();
                    keyedTasks.put(key, dependencyQueue);
                }
    
                wrappedTask = wrap(task, dependencyQueue, key);
                if (!first)
                {
                    dependencyQueue.add(wrappedTask);
                }
            }
    
            // execute method can block, call it outside synchronize block
            if (first)
            {
                delegate.execute(wrappedTask);
            }
    
        }
    
        private Runnable wrap(
            Runnable task,
            Queue<Runnable> dependencyQueue,
            Object key)
        {
            return new OrderedTask(task, dependencyQueue, key);
        }
    
        class OrderedTask
        implements Runnable
        {
    
            private final Queue<Runnable> dependencyQueue;
            private final Runnable task;
            private final Object key;
    
            public OrderedTask(
                Runnable task,
                Queue<Runnable> dependencyQueue,
                Object key)
            {
                this.task = task;
                this.dependencyQueue = dependencyQueue;
                this.key = key;
            }
    
            @Override
            public void run()
            {
                try
                {
                    task.run();
                }
                finally
                {
                    Runnable nextTask = null;
                    synchronized (keyedTasks)
                    {
                        if (dependencyQueue.isEmpty())
                        {
                            keyedTasks.remove(key);
                        }
                        else
                        {
                            nextTask = dependencyQueue.poll();
                        }
                    }
                    if (nextTask != null)
                    {
                        delegate.execute(nextTask);
                    }
                }
            }
        }
    }
    

    也许是将其插入rxJava的合理方法?

    1 回复  |  直到 7 年前
        1
  •  0
  •   akarnokd    7 年前

    现在还不完全清楚您试图在这里实现什么,但是您可以在上面分层优先级队列 RxJava的顶部。

    class OrderedTask implements Comparable<OrderedTask> { ... }
    
    PriorityBlockingQueue<OrderedTask> queue = new PriorityBlockingQueue<>();
    
    PublishSubject<Integer> trigger = PublishSubject.create();
    
    trigger.flatMap(v -> {
       OrderedTask t = queue.poll();
       return someAPI.workWith(t);
    }, 1)
    .subscribe(result -> { }, error -> { });
    
    queue.offer(new SomeOrderedTask(1));
    trigger.onNext(1);
    
    queue.offer(new SomeOrderedTask(2));
    trigger.onNext(2);