代码之家  ›  专栏  ›  技术社区  ›  learner

使用可重入锁的生产者消费者不工作

  •  1
  • learner  · 技术社区  · 7 年前

    我试图使用 ReentrantLock 如下所示:

    class Producer implements Runnable {
    
        private List<String> data;
        private ReentrantLock lock;
    
        Producer(List<String> data,ReentrantLock lock)
        {
            this.data = data;
            this.lock = lock;
        }
    
        @Override
        public void run() {
            int counter = 0;
            synchronized (lock)
            {
                while (true)
                {
    
                    if ( data.size() < 5)
                    {
                        counter++;
                        data.add("writing:: "+counter);
                    }
                    else
                    {
                        try {
                            lock.wait();
                        } catch (InterruptedException e) {
                            e.printStackTrace();
                        }
                    }
                }
            }
    
        }
    }
    
    class Consumer implements Runnable{
    
        private List<String> data;
        private ReentrantLock lock;
    
        Consumer(List<String> data,ReentrantLock lock)
        {
            this.data = data;
            this.lock = lock;
        }
    
        @Override
        public void run() {
            int counter = 0;
            synchronized (lock)
            {
                while (true)
                {
    
                    if ( data.size() > 0)
                    {
                        System.out.println("reading:: "+data.get(data.size()-1));
                        data.remove(data.size()-1);
                    }
                    else
                    {
                        System.out.println("Notifying..");
                        lock.notify();
                    }
                }
            }
    
        }
    }
    
    public class ProducerConsumer  {
    
        public static void main(String[] args) {
            List<String> str = new LinkedList<>();
            ReentrantLock lock= new ReentrantLock();
            Thread t1 = new Thread(new Producer(str,lock));
            Thread t2 = new Thread(new Consumer(str,lock));
            t1.start();
            t2.start();
        }
    }
    

    因此,它只向列表中写入一次,然后消费者无限期地等待。为什么会这样?为什么制作人不去拿锁呢?

    4 回复  |  直到 7 年前
        1
  •  1
  •   Andrew    7 年前

    我想指出你犯了两个错误:

    1. 错误使用 ReentrantLock 是的。

    每个对象都可以用于一个内部锁,因此不需要查找特定的 Lock 上课。从每个 synchronized 块由一个方法限定,您不需要 any enhanced means 我是说, 重入锁 这里是多余的。

    1. 错误的位置 同步的 阻碍。

    一旦你进入一个同步块,没有人可以进入那里,直到你离开它。显然,你永远不会退出,因为 while(true) 是的。

    我建议你把 重入锁 打开并执行同步 data 是的。

    @Override
    public void run() {
        int counter = 0;
        while (true) {
            synchronized (data) {
                if (data.size() < 5) {
                    data.add("writing:: " + ++counter);
                } else {
                    try {
                        data.wait();
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
            }
            // we are out of the synchornized block here
            // to let others use data as a monitor somewhere else
        }
    }
    
        2
  •  2
  •   Simeon G    7 年前

    @反义词是对的。您没有正确使用ReentrantLock,而是可以用一个对象替换它。如果您想改用ReentrantLock(它更为流行),那么我建议如下:

    package Multithreading;
    
    
    import java.util.LinkedList;
    import java.util.List;
    import java.util.concurrent.TimeUnit;
    import java.util.concurrent.locks.ReentrantLock;
    
    class Producer implements Runnable{
    
      private List<String> data;
      private ReentrantLock lock;
    
      Producer(List<String> data,ReentrantLock lock)
      {
        this.data = data;
        this.lock = lock;
      }
    
      @Override
      public void run() {
        int counter = 0;
        while (true)
        {
    
    
            try {
              lock.lock();
              if ( data.size() < 5)
              {
                counter++;
                data.add("writing:: " + counter);
              }
            }finally {
              lock.unlock();
            }
    
          try {
            TimeUnit.SECONDS.sleep(1);
          } catch (InterruptedException e) {
            e.printStackTrace();
          }
        }
      }
    
    }
    
    class Consumer implements Runnable{
    
      private List<String> data;
      private ReentrantLock lock;
    
      Consumer(List<String> data,ReentrantLock lock)
      {
        this.data = data;
        this.lock = lock;
      }
    
      @Override
      public void run() {
        int counter = 0;
        while (true)
        { 
          try {
            lock.lock();
              if ( data.size() > 0) 
              {
                System.out.println("reading:: "+data.get(data.size()-1));
                data.remove(data.size()-1);
              }
           }finally {
             lock.unlock();
           }
           try 
           {
              TimeUnit.SECONDS.sleep(1);
           } catch (InterruptedException e) {
             e.printStackTrace();
           }
          }
        }
      }
    }
    
    public class ProducerConsumer  {
    
      public static void main(String[] args) {
        List<String> str = new LinkedList<>();
        ReentrantLock lock= new ReentrantLock();
        Thread t1 = new Thread(new Producer(str,lock));
        Thread t2 = new Thread(new Consumer(str,lock));
        t1.start();
        t2.start();
      }
    }
    

    我删除了notify调用,但是如果您真的需要一次引入一个线程,只需使用lock.notify()

        3
  •  1
  •   Antoniossss    7 年前

    如你所愿 Producer 不停地创造价值,以及 Consumer 要不停地使用这些值,并最终等待由于没有值而产生的值,请在同步和锁定时跳过并使用 LinkedBlockingQueue

    在制作过程中,您只需:

    queue.put(value)
    

    而在消费者中

    value=queue.take();
    

    瞧,消费者会接受任何值,并等待值队列为空。

    至于你的代码:

    1. 你没有使用 ReentrantLock 完全。你可以换成 new Object 输出也一样。 wait notify Object 的方法。
    2. 为什么你只能创造单一的价值,就在于你的消费者。实际上你有这样的东西:

      synchronized(lock){ // aquire lock's monitor
      
      while(true){
      
        lock.notify();
       }
      
      } // release lock's monitor - never doing that, thread never leaves this block
      

    这里的问题是你的执行 从不离开同步块 是的。所以你打电话来 通知 但你不能释放 lock 退出监听 synchronized 封锁。生产商被“通知”,但为了继续执行,它必须要求 的监视器-但可能是因为消费者从未发布过。这里几乎是典型的僵局。

    你可以想象房间里有几个PPL,只有一个拿着棍子的能说话。所以第一个得到了 syncronized(stick) 是的。做它必须做的事并决定它必须 等待 为了别人所以他打电话 等待 把棍子传回来。现在第二个人可以说话,做他的工作,并决定谁给他棍子可以继续现在。他打电话来 通知 -现在他必须离开 synchronized(stick) 封锁。如果他不这样做,第一个人就不能继续——这就是你的情况。

        4
  •  1
  •   David Conrad    7 年前

    你获得并释放 ReentrantLock 具有 lock() unlock() ,而不是 synchronized .

    正如antoniosss所指出的,这是一个死锁,其中一个线程正在等待另一个线程永远不会放弃的锁(而两个线程正试图协调该锁)。以下是一个解决方案:

    import static java.util.Objects.requireNonNull;
    
    import java.util.LinkedList;
    import java.util.List;
    
    class Producer implements Runnable {
        private final List<String> data;
    
        Producer(List<String> data) {
            this.data = requireNonNull(data);
        }
    
        @Override
        public void run() {
            int counter = 0;
            while (true) {
                synchronized (data) {
                    if (data.size() < 5) {
                        counter++;
                        data.add("writing:: " + counter);
                    } else {
                        try {
                            data.wait();
                        } catch (InterruptedException e) {
                            return;
                        }
                    }
                }
            }
        }
    }
    
    class Consumer implements Runnable {
        private final List<String> data;
    
        Consumer(List<String> data) {
            this.data = requireNonNull(data);
        }
    
        @Override
        public void run() {
            while (true) {
                synchronized (data) {
                    if (data.size() > 0) {
                        System.out.println("reading:: " + data.get(data.size() - 1));
                        data.remove(data.size() - 1);
                    }
                    data.notify();
                }
            }
        }
    }
    
    public class ProducerConsumer {
        public static void main(String[] args) {
            List<String> data = new LinkedList<>();
            Thread t1 = new Thread(new Producer(data));
            Thread t2 = new Thread(new Consumer(data));
            t1.start();
            t2.start();
        }
    }
    

    我们已经去掉了lock对象并在列表中进行了同步。producer在while循环中进行同步,其效果是一旦列表中至少有5个项,producer将等待,放弃列表中的监视器。

    使用者还可以在循环内进行同步,这一点非常关键,因为在您的代码中,一旦获取了锁,它就永远不会放弃锁上的监视器。事实上,如果你先开始消费(或者很不走运),根本就不会生产任何东西。当离开一个同步块或方法时,或者当一个线程持有监视器时,监视器被释放 wait() s 在上面,但是 notify() notifyAll() 不要松开显示器。

    使用者读取最后一个项目(如果有),然后立即通知生产者并释放锁。两个注释:

    首先,还不清楚你对商品的要求是什么。你希望生产商生产5个,消费者消费5个,等等,或者你希望5个只是一个限制,太大的积压无法形成(这是好的,它被称为背压),但消费者消费项目急切时,他们有空吗?这种实现实现实现了后者。

    其次,消费者一旦发布了清单上的监视器,就会试图获取它。这是一种 忙着等待 ,然后消费者和生产者竞相获取锁,可能是消费者经常赢得这场比赛,一旦列表为空,这将变得毫无意义。打个电话给 onSpinWait 在同步块之外,但在使用者的while循环内,在java 9或更高版本中。在早期版本中, yield 可能是合适的。但是在我的测试中,没有这两个代码都可以。

    antoniosss提出了另一个建议,即使用linkedblockingqueue,但目前的代码总是使用最后一个项,而使用queue将改变这种行为。相反,我们可以使用deque(一个双端队列),将项目放在末尾,并从末尾获取它们。下面是它的样子:

    import static java.util.Objects.requireNonNull;
    
    import java.util.concurrent.BlockingDeque;
    import java.util.concurrent.LinkedBlockingDeque;
    
    class Producer implements Runnable {
        private final BlockingDeque<String> data;
    
        Producer(BlockingDeque<String> data) {
            this.data = requireNonNull(data);
        }
    
        @Override
        public void run() {
            int counter = 0;
            while (true) {
                counter++;
                try {
                    data.put("writing:: " + counter);
                } catch (InterruptedException e) {
                    break;
                }
            }
        }
    }
    
    class Consumer implements Runnable {
        private final BlockingDeque<String> data;
    
        Consumer(BlockingDeque<String> data) {
            this.data = requireNonNull(data);
        }
    
        @Override
        public void run() {
            while (true) {
                try {
                    System.out.println("reading:: " + data.takeLast());
                } catch (InterruptedException e) {
                    break;
                }
            }
        }
    }
    
    public class ProducerConsumer {
        public static void main(String[] args) {
            BlockingDeque<String> data = new LinkedBlockingDeque<>(5);
            Thread t1 = new Thread(new Producer(data));
            Thread t2 = new Thread(new Consumer(data));
            t1.start();
            t2.start();
        }
    }
    

    因为 LinkedBlockingDeque 是一个并发数据结构,我们不需要任何同步块,也不需要在这里等待或通知。我们可以试着 put takeLast 从deque,如果deque分别是满的或空的,它将阻塞。德克的生产能力是5,所以如果生产商能像原来那样遥遥领先,它就会向生产商施加反压力。

    没有什么可以阻止生产者尽可能快地生产出消费者能够消费的元素,这意味着第一个元素可能需要等待任意长的时间才能被消费。我不清楚这是否是你代码的意图。你可以通过介绍 等待() 通知() 再次,通过使用 Semaphore 或者其他方式,但我还是不说了,因为你还不清楚自己想要什么。

    最后一点说明 InterruptedException . 如果有人打电话来 interrupt() 在线程上,但唯一保留对生产者线程和使用者线程的引用的是 main() 方法,它从不打断他们。所以例外不应该发生在这里,但如果它不知何故,我只是有生产者或消费者退出。在一个更复杂的场景中,中断线程可以作为一种方式来表示它是否正在休眠或处于阻塞方法中(或者甚至在一个方法之外,如果它显式地检查它的话),但是我们这里没有使用它。