代码之家  ›  专栏  ›  技术社区  ›  akarnokd

交错并行文件读取比顺序读取慢?

  •  10
  • akarnokd  · 技术社区  · 16 年前

    我已经实现了一个小的IO类,它可以从不同磁盘上的多个和相同的文件中读取(例如,包含相同文件的两个硬盘)。在连续的情况下,两个磁盘在文件上的平均读取速度均为60MB/s,但是当我执行交错操作(例如,4K磁盘1,4K磁盘2然后合并)时,有效读取速度将降低到40MB/s而不是增加?

    上下文:Win 7+JDK 7B70,2GB RAM,2.2GB测试文件。基本上,我尝试模仿Win7的ReadyBoost和RaidX,以一个穷人的方式。

    在心脏中,当向类发出read()时,它会创建两个运行文件,其中包含从特定位置和长度读取预先打开的randomaccessfile的指令。使用执行器服务和future.get()调用,当两者都完成时,数据读取被复制到一个公共缓冲区并返回给调用方。

    我的方法有概念错误吗?(例如,操作系统缓存机制将始终抵消?)

    protected <T> List<T> waitForAll(List<Future<T>> futures) 
    throws MultiIOException {
        MultiIOException mex = null;
        int i = 0;
        List<T> result = new ArrayList<T>(futures.size());
        for (Future<T> f : futures) {
            try {
                result.add(f.get());
            } catch (InterruptedException ex) {
                if (mex == null) {
                    mex = new MultiIOException();
                }
                mex.exceptions.add(new ExceptionPair(metrics[i].file, ex));
            } catch (ExecutionException ex) {
                if (mex == null) {
                    mex = new MultiIOException();
                }
                mex.exceptions.add(new ExceptionPair(metrics[i].file, ex));
            }
            i++;
        }
        if (mex != null) {
            throw mex;
        }
        return result;
    }
    
    public int read(long position, byte[] output, int start, int length) 
    throws IOException {
        if (start < 0 || start + length > output.length) {
            throw new IndexOutOfBoundsException(
            String.format("start=%d, length=%d, output=%d", 
            start, length, output.length));
        }
        // compute the fragment sizes and positions
        int result = 0;
        final long[] positions = new long[metrics.length];
        final int[] lengths = new int[metrics.length];
        double speedSum = 0.0;
        double maxValue = 0.0;
        int maxIndex = 0;
        for (int i = 0; i < metrics.length; i++) {
            speedSum += metrics[i].readSpeed;
            if (metrics[i].readSpeed > maxValue) {
                maxValue = metrics[i].readSpeed;
                maxIndex = i;
            }
        }
        // adjust read lengths
        int lengthSum = length;
        for (int i = 0; i < metrics.length; i++) {
            int len = (int)Math.ceil(length * metrics[i].readSpeed / speedSum);
            lengths[i] = (len > lengthSum) ? lengthSum : len;
            lengthSum -= lengths[i];
        }
        if (lengthSum > 0) {
            lengths[maxIndex] += lengthSum;
        }
        // adjust read positions
        long positionDelta = position;
        for (int i = 0; i < metrics.length; i++) {
            positions[i] = positionDelta;
            positionDelta += (long)lengths[i]; 
        }        
        List<Future<byte[]>> futures = new LinkedList<Future<byte[]>>();
        // read in parallel
        for (int i = 0; i < metrics.length; i++) {
            final int j = i;
            futures.add(exec.submit(new Callable<byte[]>() {
                @Override
                public byte[] call() throws Exception {
                    byte[] buffer = new byte[lengths[j]];
                    long t = System.nanoTime();
                    long t0 = t;
    
                    long currPos = metrics[j].handle.getFilePointer();
                    metrics[j].handle.seek(positions[j]);
                    t = System.nanoTime() - t;
                    metrics[j].seekTime = t * 1024.0 * 1024.0 / 
                        Math.abs(currPos - positions[j]) / 1E9 ;
    
                    int c = metrics[j].handle.read(buffer);
                    t0 = System.nanoTime() - t0;
                    // adjust the read speed if we read something
                    if (c > 0) {
                        metrics[j].readSpeed = (alpha * c * 1E9 / t0 / 1024 / 1024
                        + (1 - alpha) * metrics[j].readSpeed) ;
                    }
                    if (c < 0) {
                        return null;
                    } else
                    if (c == 0) {
                        return EMPTY_BYTE_ARRAY;
                    } else
                    if (c < buffer.length) {
                        return Arrays.copyOf(buffer, c);
                    }
                    return buffer;
                }
            }));
        }
        List<byte[]> data = waitForAll(futures);
        boolean eof = true;
        for (byte[] b : data) {
            if (b != null && b.length > 0) {
                System.arraycopy(b, 0, output, start + result, b.length);
                result += b.length;
                eof = false;
            } else {
                break; // the rest probably reached EOF
            }
        }
        // if there was no data at all, we reached the end of file
        if (eof) {
            return -1;
        }
        sequentialPosition = position + (long)result;
    
        // evaluate the fastest file to read
        double maxSpeed = 0;
        maxIndex = 0;
        for (int i = 0; i < metrics.length; i++) {
            if (metrics[i].readSpeed > maxSpeed) {
                maxSpeed = metrics[i].readSpeed;
                maxIndex = i;
            }
        }
        fastest = metrics[maxIndex];
        return result;
    }
    

    (metrics数组中的filemetrics包含读取速度的测量值,以自适应地确定各种输入通道的缓冲区大小-在我的测试中,alpha=0,read speed=1结果相等分布)

    编辑 我运行了一个非纠缠测试(例如,在单独的线程中独立地读取两个文件),我的总有效速度为110MB/s。

    编辑2 我想我知道为什么会这样。

    当我以并行和顺序读取时,它不是磁盘的顺序读取,而是由于交错(并且可能会被分配表查找所困扰)而导致的读取-跳过-读取-跳过模式。这基本上将每个磁盘的有效读取速度降低到一半或更糟。

    4 回复  |  直到 14 年前
        1
  •  3
  •   Jay Conrod    16 年前

    正如您所说,磁盘上的顺序读取比读取-跳过-读取-跳过模式快得多。硬盘在按顺序读取时能够提供高带宽,但查找时间(延迟)很昂贵。

    不要将文件的副本存储在每个磁盘中,而是尝试将文件的块I存储在磁盘I(mod 2)上。这样,您就可以按顺序从两个磁盘中读取数据,并在内存中重新组合结果。

        2
  •  2
  •   hhawk    16 年前

    如果要进行并行读取,请将读取分为两次连续读取。找到中间点,从第一个文件读取前半部分,从第二个文件读取后半部分。

        3
  •  0
  •   David Rabinowitz    16 年前

    如果您确定每个磁盘执行的读取次数不超过一次(否则会有许多磁盘未命中),则仍然会在计算机的其他部分(总线、RAID控制器(如果存在)等)上创建争用。

        4
  •  0
  •   Jan    14 年前

    也许吧 http://stxxl.sourceforge.net/ 可能对你也有兴趣。