代码之家  ›  专栏  ›  技术社区  ›  davidalayachew

为什么我的文件有竞争条件,即使我使用了StandardOpenOption。同步?

  •  2
  • davidalayachew  · 技术社区  · 3 月前

    我有一个CSV文件太大,无法放入RAM,而且只能勉强放在硬盘上。我需要将此CSV文件拆分为多个部分。

    我正在使用 BufferedReader.lines() 流式传输文件,现在我想使用 Collector 把零件写出来。我明白这可能并不理想,但说实话,手头的任务并不是重点。

    让我困惑的是 StandardOpenOption.SYNC 。这是我的密码。

    import java.nio.file.*;
    import java.util.function.*;
    import java.util.stream.*;
    import java.util.*;
    
    import static java.nio.file.StandardOpenOption.*;
    
    public class temp2
    {
    
       public static final Path parentFolder = 
          Path
             .of(".")
             .toAbsolutePath()
             .normalize()
             ;
    
       public static void main(final String[] args) throws Exception
       {
       
          System.out.println(parentFolder);
          
          for (int i = 0; i < 10; i++)
          {
          
             Files.deleteIfExists(parentFolder.resolve(String.valueOf(i)));
          
          }
       
          final int LIMIT = 100_000;
          final HashMap<String, List<String>> blahs =
             IntStream
                .range(0, 100_000_000)
                .parallel()
                .mapToObj(String::valueOf)
                .collect
                (
                   Collector
                      .of
                      (
                         HashMap::new,
                         (map, s) ->
                         {
                         
                            final String key = String.valueOf(s.charAt(0));
                            final List<String> list = map.compute(key, (k, v) -> v == null ? new ArrayList<>() : v);
                            list.add(s);
                            
                            if (list.size() > LIMIT)
                            {
                            
                               writeThenClearList(key, list);
                            
                            }
                         
                         },
                         (HashMap<String, List<String>> oldMap, HashMap<String, List<String>> newMap) ->
                         {
                         
                            System.out.println(oldMap.values().stream().flatMap(Collection::stream).count());
                            System.out.println(newMap.values().stream().flatMap(Collection::stream).count());
                            System.out.println("---");
                         
                            oldMap.forEach(temp2::writeThenClearList);
                            newMap.forEach(temp2::writeThenClearList);
                            return new HashMap<>();
                         },
                         (map) ->
                         {
                         
                            map.forEach(temp2::writeThenClearList);
                            
                            return map;
                         
                         }
                      )
                )
                ;
          blahs.entrySet().forEach(System.out::println);
       
       }
       
       private static void writeThenClearList(final String key, final List<String> list)
       {
       
          if (list.isEmpty())
          {
          
             return;
          
          }
       
          try {
             Files
                .write(
                   parentFolder.resolve(key),
                   list,
                   StandardOpenOption.CREATE,
                   StandardOpenOption.WRITE,
                   StandardOpenOption.APPEND,
                   StandardOpenOption.SYNC
                );
          } catch (final Exception e) {
             throw new RuntimeException(e);
          }
       
          list.clear();
       
       }
    
    }
    

    写起来感觉很简单——它只是生成0到1亿之间的所有数字,然后根据起始数字将它们分组到文件中。因此,0进入te 0文件,1进入1文件,10进入1文件、20进入2文件、300进入3文件,以此类推。

    我也特别注意使用 标准开放选项。同步 ,以确保我的写入同步进行。

    因此,为了分析我的结果,我编写并运行了以下代码。我用过 jshell ,但我也得到了与正常文件相同的结果。

    IntStream
        .range(0, 10)
        .mapToObj(String::valueOf)
        .map(Path::of)
        .map
        (
            path -> 
            { 
                try 
                { 
                    return Files.lines(path); 
                } 
    
                catch (final Exception e) 
                { 
                    throw new RuntimeException(e); 
                } 
            }
        )
        .map
        (
            stream -> 
                stream
                    .filter(s -> !s.equals(""))
                    .mapToLong(Long::parseLong)
                    .summaryStatistics()
        )
        .forEach(System.out::println)
        ;
    
    

    所有这些都是打印出来的 LongSummaryStatistics 对于10个文件中的每一个。

    这是我的输出。

    LongSummaryStatistics{count=1, sum=0, min=0, average=0.000000, max=0}
    LongSummaryStatistics{count=11110722, sum=671067271105847657, min=0, average=60398169543.423700, max=1996215711700017}
    LongSummaryStatistics{count=11110807, sum=936118810008566710, min=0, average=84252998905.351040, max=2999558127899730}
    LongSummaryStatistics{count=11110948, sum=726950006539566180, min=0, average=65426461049.009155, max=3699881930345059}
    LongSummaryStatistics{count=11110761, sum=1580564487107336657, min=0, average=142255286303.731720, max=4980250345042342}
    LongSummaryStatistics{count=11110842, sum=1291096623566986753, min=0, average=116201510521.613650, max=5990245950494212}
    LongSummaryStatistics{count=11110683, sum=2140046523919170444, min=0, average=192611608478.000000, max=6999483760545061}
    LongSummaryStatistics{count=11110881, sum=1629411286034487818, min=0, average=146650052865.698760, max=7993937378575107}
    LongSummaryStatistics{count=11110718, sum=3892896980864594155, min=0, average=350373124478.957500, max=8993173987418912}
    LongSummaryStatistics{count=11110795, sum=2930254808993867970, min=0, average=263730435940.350620, max=9996168394101800}
    
    
    num前缀 计数 总和 min 平均的 最大值
    0 1. 0 0 0 0
    1. 11110722 671067271105847657 0 60398169543.423700 1996215711700017
    2. 11110807 936118810008566710 0 84252998905.351040 2999558127899730
    3. 11110948 726950006539566180 0 65426461049.009155 3699881930345059
    4. 11110761 1580564487107336657 0 142255286303.731720 4980250345042342
    5. 11110842 1291096623566986753 0 116201510521.613650 5990245950494212
    6. 11110683 2140046523919170444 0 192611608478.000000 6999483760545061
    7. 11110881 1629411286034487818 0 146650052865.698760 7993937378575107
    8. 11110718 3892896980864594155 0 350373124478.957500 8993173987418912
    9 11110795 2930254808993867970 0 263730435940.350620 9996168394101800

    现在,我立刻想到的是每列的最大值。这太高了。一些人报告的数字达到了千万亿。此外,每个人都有0分。而且,这些计数都不一样。它们应该是一样的,对吧?

    好吧,我们有比赛条件。然而,我不明白的是,为什么 标准开放选项。同步 没有保护我。这不是它的工作吗?

    如果这不是它的工作,那么这个选择对我到底有什么作用呢?

    1 回复  |  直到 3 月前
        1
  •  7
  •   jtahlborn    3 月前

    我没有完全分析你的代码,但我很确定你误解了什么 StandardOpenOption.SYNC 意味着。它与多线程无关。这与写入文件系统时数据的持久性有关。

    现代文件操作对你撒谎。你说“把这个写到磁盘上”,系统说“当然,完成了”。然而,实际上,数据被卡在某个缓冲区中,稍后将写入物理磁盘(因为写入文件系统相对较慢)。也就是说,数据将在稍后的某个时间点“异步”写入磁盘。

    设置 SYNC 文件写入选项表示 真正地 希望在执行写入操作时将数据写入物理磁盘。所以,你说“把这个写到磁盘上”,系统会说“当然,完成了”,你会说“不,真的,我会等”,系统说“很好……好的,真的完成了”。(实际上,这较慢,这就是为什么它不是默认值),即在写入操作期间,数据“同步”写入磁盘。

    如果您在让多个线程写入同一文件时依赖此来保护您,那么您将无法获得所需的结果。你可以使用类似的东西 FileLock 要执行实际的文件锁定,或者您可以在java本身中使用某种同步机制(我的建议)。

    推荐文章
    twenty7  ·  按阵列列表分组
    9 年前