代码之家  ›  专栏  ›  技术社区  ›  Karthikeyan

使用 Java ExecutorService 向 ElasticSearch 索引文档

  •  0
  • Karthikeyan  · 技术社区  · 6 年前

    我正在尝试使用 Java 的 ExecutorService 为超过 10 万(100k)个文档建立索引,以便加快索引速度。

    索引 documents_qa 中保存了 10 万多条文件路径,实际文件存放在 d:\drive 下。我根据文件路径读取实际文件并将其转换为 base64,然后把 base64 内容重新索引到另一个索引 document_attachment_qa 中。

    /**
     * Reads every document from the {@code documents_qa} index with the Scroll API and
     * hands each one to a {@link WorkerThread} running on a fixed pool of five threads.
     *
     * <p>Each hit gets its own {@code Document} instance, so workers never observe data
     * belonging to a different hit, and exactly one task is submitted per hit.
     *
     * <p>NOTE(review): the scroll context is never cleared explicitly; it is left to
     * expire with the 60 minute keep-alive. Consider issuing a clear-scroll request
     * once the loop finishes.
     */
    public class DocumentIndex {

        private final static String INDEX = "documents_qa";
        private final static String TYPE = "doc";

        /**
         * Entry point: scrolls through the source index and schedules one worker per hit.
         *
         * @param args unused
         * @throws IOException if a search or scroll request fails
         */
        public static void main(String[] args) throws IOException {
            ExecutorService executor = Executors.newFixedThreadPool(5);
            try {
                logger.info("Started Indexing the Document.....");

                // Fetch id, path and filename of every document via the Scroll API.
                SearchRequest searchRequest = new SearchRequest(INDEX);
                searchRequest.types(TYPE);
                final Scroll scroll = new Scroll(TimeValue.timeValueMinutes(60L));
                searchRequest.scroll(scroll);

                SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder();
                searchSourceBuilder.query(QueryBuilders.matchAllQuery());
                searchRequest.source(searchSourceBuilder);

                SearchResponse searchResponse = SearchEngineClient.getInstance3().search(searchRequest);
                String scrollId = searchResponse.getScrollId();
                SearchHit[] searchHits = searchResponse.getHits().getHits();

                while (searchHits != null && searchHits.length > 0) {
                    // Process the current page BEFORE requesting the next one; otherwise
                    // the hits returned by the initial search are silently dropped.
                    for (SearchHit hit : searchHits) {
                        Document doc = toDocument(hit);
                        if (doc != null) {
                            executor.execute(new WorkerThread(doc));
                        }
                    }

                    SearchScrollRequest scrollRequest = new SearchScrollRequest(scrollId);
                    scrollRequest.scroll(scroll);
                    searchResponse = SearchEngineClient.getInstance3().searchScroll(scrollRequest);
                    scrollId = searchResponse.getScrollId();
                    searchHits = searchResponse.getHits().getHits();
                }
            } finally {
                // Always stop accepting work so the pool threads cannot keep the JVM
                // alive after a failed search; already queued tasks still run.
                executor.shutdown();
            }

            if (awaitCompletion(executor)) {
                System.out.println("Finished all threads");
            }
        }

        /**
         * Builds a fresh {@code Document} from one search hit.
         *
         * @param hit the search hit to convert
         * @return the populated document, or {@code null} when the hit has no source,
         *         no numeric {@code id}, or lacks {@code path} / {@code filename}
         */
        private static Document toDocument(SearchHit hit) {
            Map<String, Object> source = hit.getSourceAsMap();
            if (source == null) {
                return null;
            }
            Object id = source.get("id");
            Object path = source.get("path");
            Object filename = source.get("filename");
            // String.valueOf(null) would produce the literal "null", so check the raw values.
            if (!(id instanceof Number) || path == null || filename == null) {
                return null;
            }
            Object language = source.get("app_language");

            Document doc = new Document();
            doc.setId(((Number) id).intValue());
            doc.setApp_language(language == null ? null : language.toString());
            doc.setFilename(filename.toString());
            doc.setPath(path.toString());
            return doc;
        }

        /**
         * Blocks until every submitted task has finished, without spinning the CPU.
         *
         * @param executor an executor on which {@code shutdown()} was already called
         * @return {@code true} if the executor terminated normally, {@code false} if the
         *         waiting thread was interrupted (remaining tasks are then cancelled)
         */
        private static boolean awaitCompletion(ExecutorService executor) {
            try {
                while (!executor.awaitTermination(1, java.util.concurrent.TimeUnit.MINUTES)) {
                    // Still indexing; keep waiting.
                }
                return true;
            } catch (InterruptedException e) {
                executor.shutdownNow();
                Thread.currentThread().interrupt();
                return false;
            }
        }

    }
    

    以下是工作线程(WorkerThread)的代码:

    /**
     * Task that indexes a single document's file content into the
     * {@code document_attachment_qa} index.
     *
     * <p>For the document it was given, the task records the resolved file path in
     * {@code d:\All_Files_Path.txt}; if the file exists it also records it in
     * {@code d:\Available_Files.txt}, reads the file, Base64-encodes it and sends an
     * index request through the {@code document_attachment_qa} ingest pipeline.
     *
     * <p>Instances are intended to be run by several pool threads at once, so the
     * running counters are atomic and appends to the two log files are serialized.
     */
    public class WorkerThread implements Runnable {

        private static final String ATTACHMENT = "document_attachment_qa";
        // Stack frame 0 of getStackTrace() is Thread.getStackTrace itself, so name the
        // logger after this class directly.
        private static final Logger logger = Logger.getLogger(WorkerThread.class.getName());

        private static final File ALL_FILES_PATH = new File("d:\\All_Files_Path.txt");
        private static final File AVAILABLE_FILES = new File("d:\\Available_Files.txt");

        /** Guards appends to the two log files, which are shared by all pool threads. */
        private static final Object LOG_FILE_LOCK = new Object();

        /** Running totals across all tasks; per-task locals would always print 1. */
        private static final java.util.concurrent.atomic.AtomicInteger FILE_PATH_COUNT =
                new java.util.concurrent.atomic.AtomicInteger();
        private static final java.util.concurrent.atomic.AtomicInteger AVAILABLE_FILE_COUNT =
                new java.util.concurrent.atomic.AtomicInteger();

        private final Document doc;

        /**
         * @param doc the document whose file should be indexed; it must not be modified
         *            by the caller after being handed to this task
         */
        public WorkerThread(Document doc) {
            this.doc = doc;
        }

        /**
         * Logs the file path, then reads, encodes and indexes the file if it exists.
         * Failures are reported and the document is skipped; nothing is rethrown.
         */
        @Override
        public void run() {
            if (doc == null || doc.getPath() == null || doc.getFilename() == null) {
                return;
            }
            String filePath = doc.getPath().concat(doc.getFilename());

            appendLine(ALL_FILES_PATH, "FilePath Count ---" + FILE_PATH_COUNT.incrementAndGet()
                    + ":::::::ID---> " + doc.getId() + "File Path --->" + filePath);

            File file = new File(filePath);
            if (!file.exists() || file.isDirectory()) {
                return;
            }

            appendLine(AVAILABLE_FILES, "Available File Count --->" + AVAILABLE_FILE_COUNT.incrementAndGet()
                    + ":::::::ID---> " + doc.getId() + "File Path --->" + filePath);

            String encodedFile;
            try {
                // readAllBytes reads the complete file and closes it even on failure;
                // a single InputStream.read(byte[]) call may return fewer bytes.
                byte[] bytes = java.nio.file.Files.readAllBytes(file.toPath());
                encodedFile = Base64.getEncoder().encodeToString(bytes);
            } catch (IOException e) {
                // Do not index a document with null content when the file is unreadable.
                e.printStackTrace();
                return;
            }

            Map<String, Object> jsonMap = new HashMap<String, Object>();
            jsonMap.put("id", doc.getId());
            jsonMap.put("app_language", doc.getApp_language());
            jsonMap.put("fileContent", encodedFile);
            System.out.println(Thread.currentThread().getName() + " End.");

            IndexRequest request = new IndexRequest(ATTACHMENT, "doc", String.valueOf(doc.getId()))
                    .source(jsonMap)
                    .setPipeline(ATTACHMENT);

            try {
                SearchEngineClient.getInstance3().index(request);
            } catch (ElasticsearchException e) {
                e.printStackTrace();
            } catch (IOException e) {
                e.printStackTrace();
            }
            // The former processCommand() call (a fixed 5 second sleep after every
            // document) was removed: it did no work and dominated total indexing time.
        }

        /**
         * Appends one line to {@code target}, creating the file if necessary.
         *
         * @param target file to append to
         * @param line   text to write, without a trailing line separator
         */
        private static void appendLine(File target, String line) {
            synchronized (LOG_FILE_LOCK) {
                try (PrintWriter out = new PrintWriter(new FileOutputStream(target, true))) {
                    out.println(line);
                } catch (FileNotFoundException e) {
                    e.printStackTrace();
                }
            }
        }

        /** @return a short description naming the document id handled by this task */
        @Override
        public String toString() {
            if (doc == null) {
                return "WorkerThread[no document]";
            }
            return "WorkerThread[id=" + doc.getId() + "]";
        }

    }
    

    0 回复  |  直到 6 年前