代码之家  ›  专栏  ›  技术社区  ›  Jörn Horstmann

以可关闭资源作为累加器的Java收集器

  •  4
  • Jörn Horstmann  · 技术社区  · 7 年前

    假设我试图创建一个收集器,将数据聚合到一个资源中,该资源在使用后必须关闭。有什么方法可以实现类似于 finally a中的块 Collector ?在成功的情况下,可以在 finisher 方法,但在异常情况下似乎没有调用任何方法。

    目标是以干净的方式实现如下操作,而不必首先将流收集到内存列表中。

    stream.collect(groupingBy(this::extractFileName, collectToFile()));
    
    2 回复  |  直到 7 年前
        1
  •  1
  •   fps    7 年前

    我认为您满足要求的唯一方法是向 Stream.onClose 方法假设您有以下类:

    class CloseHandler implements Runnable {
        List<Runnable> children = new ArrayList<>();
    
        void add(Runnable ch) { children.add(ch); }
    
        @Override
        public void run() { children.forEach(Runnable::run); }
    }
    

    现在,您需要按如下方式使用流:

    CloseHandler closeAll = new CloseHandler();
    try (Stream<Something> stream = list.stream().onClose(closeAll)) {
        // Now collect
        stream.collect(Collectors.groupingBy(
            this::extractFileName, 
            toFile(closeAll)));
    }
    

    这使用 try-with-resources 构造,以便在使用或发生错误时自动关闭流。请注意,我们正在通过 closeAll 靠近 流动一旦关闭 方法

    下面是下游收集器的草图,它将收集/写入/发送元素到 Closeable 资源(请注意,我们还传递 全部关闭 关闭处理程序):

    static Collector<Something, ?, Void> toFile(CloseHandler closeAll) {
    
        class Acc {
    
            SomeResource resource; // this is your closeable resource
    
            Acc() {
                try {
                    resource = new SomeResource(...); // create closeable resource
                    closeAll.add(this::close);        // this::close is a Runnable
                } catch (IOException e) {
                    throw new UncheckedIOException(e);
                }
            }
    
            void add(Something elem) {
                try {
                    // TODO write/send to closeable resource here
                } catch (IOException e) {
                    throw new UncheckedIOException(e);
                }
            }
    
            Acc merge(Acc another) {
                // TODO left as an exercise
            }
    
            // This is the close handler for this particular closeable resource
            private void close() {
                try {
                    // Here we close our closeable resource
                    if (resource != null) resource.close();
                } catch (IOException ignored) {
                }
            }
        }
        return Collector.of(Acc::new, Acc::add, Acc::merge, a -> null);
    }
    

    因此,它使用一个本地类(名为 Acc )包装可关闭的资源,并将方法声明为 add 流的一个元素到可关闭的资源,也到 merge Acc公司 实例,以防流是平行的(留作练习,以防值得付出努力)。

    Collector.of 用于根据 Acc公司 类的方法,具有返回 null ,因为我们不想在 Collectors.groupingBy

    最后,还有 close 方法,该方法在已创建包装的可关闭资源的情况下关闭该资源。

    当流通过 尝试使用资源 构造,即 CloseHandler.run 方法将自动执行,这将依次执行之前添加的所有子关闭处理程序 Acc公司 实例已创建。

        2
  •  0
  •   HRgiger forhas    7 年前

    好的,我已经看过了 Collectors 实施,您需要 CollectorImpl 创建自定义收集器,但它不是公共的。因此,我使用其副本实现了新的方法(您可能会感兴趣的最后2种方法):

    public class CollectorUtils<T, A, R> implements Collector<T, A, R> {
    
        static final Set<Collector.Characteristics> CH_ID = Collections
                .unmodifiableSet(EnumSet.of(Collector.Characteristics.IDENTITY_FINISH));
    
        private final Supplier<A> supplier;
        private final BiConsumer<A, T> accumulator;
        private final BinaryOperator<A> combiner;
        private final Function<A, R> finisher;
        private final Set<Characteristics> characteristics;
    
        CollectorUtils(Supplier<A> supplier, BiConsumer<A, T> accumulator, BinaryOperator<A> combiner,
                Function<A, R> finisher, Set<Characteristics> characteristics) {
            this.supplier = supplier;
            this.accumulator = accumulator;
            this.combiner = combiner;
            this.finisher = finisher;
            this.characteristics = characteristics;
        }
    
        CollectorUtils(Supplier<A> supplier, BiConsumer<A, T> accumulator, BinaryOperator<A> combiner,
                Set<Characteristics> characteristics) {
            this(supplier, accumulator, combiner, castingIdentity(), characteristics);
        }
    
        @Override
        public BiConsumer<A, T> accumulator() {
            return accumulator;
        }
    
        @Override
        public Supplier<A> supplier() {
            return supplier;
        }
    
        @Override
        public BinaryOperator<A> combiner() {
            return combiner;
        }
    
        @Override
        public Function<A, R> finisher() {
            return finisher;
        }
    
        @Override
        public Set<Characteristics> characteristics() {
            return characteristics;
        }
    
        @SuppressWarnings("unchecked")
        private static <I, R> Function<I, R> castingIdentity() {
            return i -> (R) i;
        }
    
        public static <C extends Collection<File>> Collector<String, ?, C> toFile() {
            return new CollectorUtils<>((Supplier<List<File>>) ArrayList::new, (c, t) -> {
                c.add(toFile(t));
            }, (r1, r2) -> {
                r1.addAll(r2);
                return r1;
            }, CH_ID);
        }
    
        private static File toFile(String fileName) {
            try (Closeable type = () -> System.out.println("Complete! closing file " + fileName);) {
                // stuff
                System.out.println("Converting " + fileName);
    
                return new File(fileName);
            } catch (FileNotFoundException e) {
                // TODO Auto-generated catch block
                e.printStackTrace();
            } catch (IOException e) {
                // TODO Auto-generated catch block
                e.printStackTrace();
            }
            throw new RuntimeException("Failed to create file");
    
        }
    
    }
    

    然后我调用流,如下所示:

    public static void main(String[] args) {
            Stream.of("x.txt", "y.txt","z.txt").collect(CollectorUtils.toFile());
        }
    

    输出:

    Convertingx.txt
    closing filex.txt
    Convertingy.txt
    closing filey.txt
    Convertingz.txt
    closing filez.txt
    
    推荐文章