代码之家  ›  专栏  ›  技术社区  ›  Raffaele Rossi

Java并行流填充数组

  •  -2
  • Raffaele Rossi  · 技术社区  · 6 年前

    我有一个非常大的hashmap,它充满了素数。

    var mapA = new HashMap<Integer, Long>();
    

    var res = new ArrayList<Integer();
    
    mapA.entrySet()
            .parallelStream()
            .forEach( x -> {
    
                var values = mapA.entrySet()
                                        .parallelStream()
                                        .filter( /*conditions*/ )
                                        .map(y -> y.getKey())
                                        .toArray();                 
    
                Arrays.stream(values)
                          .parallel()
                          .sorted()
                          .forEach(val -> {
    
                               synchronized (this) {
                                    res.add(x.getKey());
                                    res.add((Integer) val);
                               }
    
                          });
    
    
            });
    

    正如你所看到的,有 res 这是一个超出流范围的数组。我需要循环是并行的,否则计算可能需要几分钟。需要这个吗?

    .forEach(val -> {
    
        synchronized (this) {
            res.add(x.getKey());
            res.add((Integer) val);
        }
    
    });
    

    synchronized 因为由于流是并行运行的,所以我不希望在2个或更多线程正在添加数据的情况下出现争用条件 物件 同时

    非常感谢。


    import java.util.*;
    import java.util.stream.Collectors;
    import java.util.stream.IntStream;
    
    public class DiffieHellman {
    
        private static final int LIMIT = 65536;
    
        private final long p;
        private final long g;
    
        public DiffieHellman(long p, long g) {
            this.p = p;
            this.g = g;
        }
    
        public List<Integer> tryBruteForce(long publicA, long publicB) {
            List<Integer> res = new ArrayList<Integer>();
    
            var mapA = new HashMap<Integer, Long>(
                    IntStream
                            .rangeClosed(0, LIMIT)
                            .parallel()
                            .boxed()
                            .collect(
                                    Collectors.toMap(x -> x, x -> DiffieHellmanUtils.modPow(publicB, x, p))
                            )
            );
    
            var mapB = new HashMap<Integer, Long>(
                    IntStream
                            .rangeClosed(0, LIMIT)
                            .parallel()
                            .boxed()
                            .collect(
                                    Collectors.toMap(x -> x, x -> DiffieHellmanUtils.modPow(publicB, x, p))
                            )
            );
    
            mapA.entrySet()
                        .parallelStream()
                        .forEach( x -> {
    
                            var values = mapB.entrySet()
                                            .parallelStream()
                                            .filter( y -> y.getValue().equals(x.getValue()))
                                            .map(Map.Entry::getKey)
                                            .toArray(Integer[]::new);
    
                            Arrays.stream(values)
                                    .parallel()
                                    .sorted()
                                    .forEach(val -> {
                                            res.add(x.getKey());
                                            res.add((Integer) val);
                                    });
    
    
                        });
    
            return res;
        }
    
    }
    
    1 回复  |  直到 6 年前
        1
  •  1
  •   Grzegorz Piwowarek    6 年前

    相反,您可以通过习惯性地使用流API以稍微不同的方式来解决这个问题。

    首先,嵌套操作可以在单个流管道中完成:

    mapB.entrySet()
                .parallelStream()
                .filter(y -> y.getValue().equals(x.getValue()))
                .map(y -> y.getKey())
                .sorted()
                .forEach(val -> {
    
                    synchronized (this) {
                        res.add(x.getKey());
                        res.add((Integer) val);
                    }
                });
    

    其次,为了避免并发问题,最简单的方法是放弃命令式方法并利用流API的声明性。

    for-each 然后 add

    这里要做的是通过替换 mapA 具有自定义序列的entrySet():

    List<Integer> res = mapA.entrySet()
          .parallelStream()
          .flatMap(x -> mapB.entrySet().stream()
             .filter(y -> y.getValue().equals(x.getValue()))
             .map(Map.Entry::getKey)
             .sorted()
             .flatMap(v -> Stream.of(x.getKey(), v)))
          .collect(Collectors.toList());
    

    嵌套 parallelStream 可以省略,因为 flatMap sequential() 反正是在小溪上。

    推荐文章