代码之家  ›  专栏  ›  技术社区  ›  SSV

如何在Java中并行处理对象列表

  •  5
  • SSV  · 技术社区  · 7 年前

    我有一个Java中的对象列表,就像列表中的1000个对象一样,我正在迭代每个对象的列表并进行进一步的处理。对每个对象进行相同的Haping处理。这种顺序方法需要花费大量时间进行处理,因此,我想用Java实现并行处理。我检查了Java中的executor框架,但我被卡住了。

    我想到了一种实现我的需求的方法。

    我想实现一些固定数量的最小对象,这些对象将由每个线程处理,以便每个线程能够快速地完成其工作和处理对象。我怎样才能做到这一点?或者,如果有任何其他方法可以实现我的要求,请分享。

    例如:

    List objects=new List();

    For(对象对象:对象){ //为所有人执行一些常见操作 对象

    }

    3 回复  |  直到 7 年前
        1
  •  8
  •   xingbin    7 年前

    您可以使用 ThreadPoolExecutor ,它将负责负载平衡。任务将分布在不同的线程上。

    以下是一个示例:

    import java.util.ArrayList;
    import java.util.List;
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;
    
    public class Test {
    
        public static void main(String[] args) {
    
            // Fixed thread number
            ExecutorService service = Executors.newFixedThreadPool(10);
    
            // Or un fixed thread number
            // The number of threads will increase with tasks
            // ExecutorService service = Executors.newCachedThreadPool(10);
    
            List<Object> objects = new ArrayList<>();
            for (Object o : objects) {
                service.execute(new MyTask(o));
            }
    
            // shutdown
            // this will get blocked until all task finish
            service.shutdown();
            try {
                service.awaitTermination(Long.MAX_VALUE, TimeUnit.MILLISECONDS);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    
        public static class MyTask implements Runnable {
            Object target;
    
            public MyTask(Object target) {
                this.target = target;
            }
    
            @Override
            public void run() {
                // business logic at here
            }
        }
    }
    
        2
  •  6
  •   ernest_k Petronella    7 年前

    并行处理列表有许多选项:

    使用平行流 :

    objects.stream().parallel().forEach(object -> {
        //Your work on each object goes here, using object
    })
    

    使用executor服务 如果要使用线程数大于fork-join池的池,请提交任务:

    ExecutorService es = Executors.newFixedThreadPool(10);
    for(Object o: objects) {
        es.submit(() -> {
            //code here using Object o...
        }
    }
    

    前面的示例本质上与传统的executor服务相同,它在不同的线程上运行任务。

    除此之外,您还可以使用completable future提交:

    //You can also just run a for-each and manually add each
    //feature to a list
    List<CompletableFuture<Void>> futures = 
        objects.stream().map(object -> CompletableFuture.runAsync(() -> {
        //Your work on each object goes here, using object
    })
    

    然后可以使用 futures 对象来检查每次执行的状态(如果需要)。

        3
  •  2
  •   jithin    6 年前

    将列表拆分为多个子列表并使用多线程处理 每个子列表都是平行的。

    public class ParallelProcessListElements {
        public void processList (int numberofthreads,List<Object>tempList, 
                Object obj, Method method){
    
                    final int sizeofList=tempList.size();
                    final int sizeofsublist = sizeofList/numberofthreads;
                    List<Thread> threadlist = new ArrayList<Thread>();
    
                    for(int i=0;i<numberofthreads;i++) {
                        int firstindex = i*sizeofsublist;
                        int lastindex = i*sizeofsublist+sizeofsublist;
                        if(i==numberofthreads-1)
                            lastindex=sizeofList;
    
                        List<Object> subList=tempList.subList(firstindex,lastindex );
    
                        Thread th = new Thread(()->{
                                    try{method.invoke(obj, subList);}catch(Exception e) {e.printStackTrace();}
                                });
    
                        threadlist.add(th);
                    }
    
                    threadlist.forEach(th->{th.start();try{Thread.sleep(10);}catch(Exception e) {}});
        }
    
    }
    
    
    public class Demo {
        public static void main(String[] args) {
    
            List<Object> tempList= new ArrayList<Object>();
            /**
             * Adding values to list... For Demo purpose..
             */
            for(int i=0;i<500;i++)
                tempList.add(i);
    
            ParallelProcessListElements process = new ParallelProcessListElements();
            final int numberofthreads = 5;
            Object obj = new Demo();
            Method method=null;
    
            try{ method=Demo.class.getMethod("printList", List.class);}catch(Exception e) {}
            /**
             * Method Call...
             */
            process.processList(numberofthreads,tempList,obj,method);
        }
    
        public void printList(List<Integer>list) {
            /**
             * Business logic to process the list...
             */
            list.forEach(item->{
                try{Thread.sleep(1000);}catch(Exception e) {}
                System.out.println(item);
                });
        }
    }