
Elasticsearch RestHighLevelClient with the Scroll API throws "Suppressed: org.apache.http.ContentTooLongException: entity content is too long"

  • Karthikeyan  · Tech Community  · 7 years ago

    Fetching 100k documents with RestHighLevelClient and the Scroll API fails with the following error.

    Suppressed: org.apache.http.ContentTooLongException: entity content is too long

    Please find the full RestHighLevelClient error output below.

    D:\Karthikeyan\ElasticSearch\ElasticSearch_Tesing\target>java -jar ElasticSearch
    Utility-1.0.0-SNAPSHOT-jar-with-dependencies.jar
    Jul 13, 2018 3:11:59 PM com.es.utility.DocumentIndex main
    INFO: Started Indexing the Document.....
    ERROR StatusLogger No log4j2 configuration file found. Using default configurati
    on: logging only errors to the console. Set system property 'log4j2.debug' to sh
    ow Log4j2 internal initialization logging.
    Exception in thread "main" java.net.ConnectException: Connection refused: no fur
    ther information
            at sun.nio.ch.SocketChannelImpl.checkConnect(Native Method)
            at sun.nio.ch.SocketChannelImpl.finishConnect(Unknown Source)
            at org.apache.http.impl.nio.reactor.DefaultConnectingIOReactor.processEv
    ent(DefaultConnectingIOReactor.java:171)
            at org.apache.http.impl.nio.reactor.DefaultConnectingIOReactor.processEv
    ents(DefaultConnectingIOReactor.java:145)
            at org.apache.http.impl.nio.reactor.AbstractMultiworkerIOReactor.execute
    (AbstractMultiworkerIOReactor.java:348)
            at org.apache.http.impl.nio.conn.PoolingNHttpClientConnectionManager.exe
    cute(PoolingNHttpClientConnectionManager.java:192)
            at org.apache.http.impl.nio.client.CloseableHttpAsyncClientBase$1.run(Cl
    oseableHttpAsyncClientBase.java:64)
            at java.lang.Thread.run(Unknown Source)
            Suppressed: org.apache.http.ContentTooLongException: entity content is t
    oo long [223328895] for the configured buffer limit [104857600]
                    at org.elasticsearch.client.HeapBufferedAsyncResponseConsumer.on
    EntityEnclosed(HeapBufferedAsyncResponseConsumer.java:76)
                    at org.apache.http.nio.protocol.AbstractAsyncResponseConsumer.re
    sponseReceived(AbstractAsyncResponseConsumer.java:131)
                    at org.apache.http.impl.nio.client.MainClientExec.responseReceiv
    ed(MainClientExec.java:315)
                    at org.apache.http.impl.nio.client.DefaultClientExchangeHandlerI
    mpl.responseReceived(DefaultClientExchangeHandlerImpl.java:147)
                    at org.apache.http.nio.protocol.HttpAsyncRequestExecutor.respons
    eReceived(HttpAsyncRequestExecutor.java:303)
                    at org.apache.http.impl.nio.DefaultNHttpClientConnection.consume
    Input(DefaultNHttpClientConnection.java:255)
                    at org.apache.http.impl.nio.client.InternalIODispatch.onInputRea
    dy(InternalIODispatch.java:81)
                    at org.apache.http.impl.nio.client.InternalIODispatch.onInputRea
    dy(InternalIODispatch.java:39)
                    at org.apache.http.impl.nio.reactor.AbstractIODispatch.inputRead
    y(AbstractIODispatch.java:114)
                    at org.apache.http.impl.nio.reactor.BaseIOReactor.readable(BaseI
    OReactor.java:162)
                    at org.apache.http.impl.nio.reactor.AbstractIOReactor.processEve
    nt(AbstractIOReactor.java:337)
                    at org.apache.http.impl.nio.reactor.AbstractIOReactor.processEve
    nts(AbstractIOReactor.java:315)
                    at org.apache.http.impl.nio.reactor.AbstractIOReactor.execute(Ab
    stractIOReactor.java:276)
                    at org.apache.http.impl.nio.reactor.BaseIOReactor.execute(BaseIO
    Reactor.java:104)
                    at org.apache.http.impl.nio.reactor.AbstractMultiworkerIOReactor
    $Worker.run(AbstractMultiworkerIOReactor.java:588)
                    ... 1 more
    

    So I tried using RestClient instead of RestHighLevelClient, setting the buffer size with static long BUFFER_SIZE = 500 * 1024 * 1024;

    But with RestClient I don't know how to use the Scroll API, and I need to fetch 100k documents from the index.

    Please find my RestHighLevelClient code below.

    public class DocumentIndexRestHighLevelClient {
    
        private final static String INDEX = "documents";  
        private final static String ATTACHMENT = "document_attachment"; 
        private final static String TYPE = "doc";
        private static final Logger logger = Logger.getLogger(Thread.currentThread().getStackTrace()[0].getClassName());
    
        public static void main(String args[]) throws IOException {
    
    
            RestHighLevelClient restHighLevelClient = null;
            Document doc=new Document();
    
            logger.info("Started Indexing the Document.....");
    
            try {
                restHighLevelClient = new RestHighLevelClient(RestClient.builder(new HttpHost("localhost", 9200, "http"),
                        new HttpHost("localhost", 9201, "http")));
            } catch (Exception e) {
                System.out.println(e.getMessage());
            }
    
    
            //Fetching Id, FilePath & FileName from Document Index. 
            SearchRequest searchRequest = new SearchRequest(INDEX); 
            searchRequest.types(TYPE);
            final Scroll scroll = new Scroll(TimeValue.timeValueMinutes(1L)); //part of Scroll API
            searchRequest.scroll(scroll); //part of Scroll API
            SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder();
            QueryBuilder qb = QueryBuilders.matchAllQuery();
            searchSourceBuilder.query(qb);
            searchSourceBuilder.size(120000); 
            searchRequest.source(searchSourceBuilder);
    
            SearchResponse searchResponse = restHighLevelClient.search(searchRequest);
            String scrollId = searchResponse.getScrollId(); //part of Scroll API
            SearchHit[] searchHits = searchResponse.getHits().getHits();
            long totalHits=searchResponse.getHits().totalHits;
            logger.info("Total Hits --->"+totalHits);
    
            //part of Scroll API -- Starts
            while (searchHits != null && searchHits.length > 0) { 
                SearchScrollRequest scrollRequest = new SearchScrollRequest(scrollId); 
                scrollRequest.scroll(scroll);
                searchResponse = restHighLevelClient.searchScroll(scrollRequest);
                scrollId = searchResponse.getScrollId();
                searchHits = searchResponse.getHits().getHits();
    
                File all_files_path = new File("d:\\All_Files_Path.txt");
                File available_files = new File("d:\\Available_Files.txt");
                File missing_files = new File("d:\\Missing_Files.txt");
    
                int totalFilePath=1;
                int totalAvailableFile=1;
                int missingFilecount=1;
    
                Map<String, Object> jsonMap ;
                for (SearchHit hit : searchHits) {
    
                    String encodedfile = null;
                    File file=null;
    
                    Map<String, Object> sourceAsMap = hit.getSourceAsMap();
    
    
                    if(sourceAsMap != null) {  
                        doc.setId((int) sourceAsMap.get("id"));
                        doc.setApp_language(String.valueOf(sourceAsMap.get("app_language")));
    
                    }
    
                    String filepath=doc.getPath().concat(doc.getFilename());
    
                    logger.info("ID---> "+doc.getId()+"File Path --->"+filepath);
    
    
                    try(PrintWriter out = new PrintWriter(new FileOutputStream(all_files_path, true))  ){
                        out.println("FilePath Count ---"+totalFilePath+":::::::ID---> "+doc.getId()+"File Path --->"+filepath);
                    }
    
                    file = new File(filepath);
                    if(file.exists() && !file.isDirectory()) {
                        try {
                              try(PrintWriter out = new PrintWriter(new FileOutputStream(available_files, true))  ){
                                    out.println("Available File Count --->"+totalAvailableFile+":::::::ID---> "+doc.getId()+"File Path --->"+filepath);
                                    totalAvailableFile++;
                                }
                            FileInputStream fileInputStreamReader = new FileInputStream(file);
                            byte[] bytes = new byte[(int) file.length()];
                            fileInputStreamReader.read(bytes);
                            encodedfile = new String(Base64.getEncoder().encodeToString(bytes));
                            fileInputStreamReader.close();
                        } catch (FileNotFoundException e) {
                            e.printStackTrace();
                        }
                    }
                    else
                    {
                        System.out.println("Else block");
                        PrintWriter out = new PrintWriter(new FileOutputStream(missing_files, true));
                        out.println("Available File Count --->"+missingFilecount+":::::::ID---> "+doc.getId()+"File Path --->"+filepath);
                        out.close();
                        missingFilecount++;
                    }
    
                    jsonMap = new HashMap<>();
                    jsonMap.put("id", doc.getId());
                    jsonMap.put("app_language", doc.getApp_language());
                    jsonMap.put("fileContent", encodedfile);
    
                    String id=Long.toString(doc.getId());
    
                    IndexRequest request = new IndexRequest(ATTACHMENT, "doc", id )
                            .source(jsonMap)
                            .setPipeline(ATTACHMENT);
    
                    PrintStream printStream = new PrintStream(new File("d:\\exception.txt"));
                    try {
                        IndexResponse response = restHighLevelClient.index(request);
    
                    } catch(ElasticsearchException e) {
                        if (e.status() == RestStatus.CONFLICT) {
                        }
                        e.printStackTrace(printStream);
                    }
    
                    totalFilePath++;
                }
            }
    
            ClearScrollRequest clearScrollRequest = new ClearScrollRequest(); 
            clearScrollRequest.addScrollId(scrollId);
            ClearScrollResponse clearScrollResponse = restHighLevelClient.clearScroll(clearScrollRequest);
            boolean succeeded = clearScrollResponse.isSucceeded();
            ////part of Scroll API -- Ends
            logger.info("Indexing done.....");
        }
    
    }
    
    1 answer  |  as of 7 years ago
  •   sramalingam24    7 years ago

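    The size parameter should be set to the number of documents to return per scroll request, not the total number of documents. In the question, a size of 120000 asks for everything in a single response, and the trace shows that response (223328895 bytes) exceeding the client's configured buffer limit (104857600 bytes). Start at around 100 and increase it gradually until there is no further performance gain.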
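
    The paging pattern this advice implies can be sketched without a cluster. The following is a minimal, hedged illustration rather than Elasticsearch client code: PageSource and InMemorySource are hypothetical stand-ins, where firstPage plays the role of the initial search call with a small size and nextPage plays the role of each searchScroll call. Note that the loop handles the page it already holds before requesting the next one, whereas the loop in the question requests the next page first, so the hits from the initial search appear never to be processed.

```java
import java.util.ArrayList;
import java.util.List;

public class ScrollPagingSketch {

    /** Hypothetical stand-in for a scroll-capable client; not an Elasticsearch type. */
    interface PageSource {
        List<String> firstPage(int pageSize); // role of the initial search (size = pageSize)
        List<String> nextPage();              // role of each follow-up scroll request
    }

    /** In-memory source that hands out ids "doc-0" .. "doc-(total-1)" one page at a time. */
    static final class InMemorySource implements PageSource {
        private final int total;
        private int cursor = 0;
        private int pageSize = 0;

        InMemorySource(int total) {
            this.total = total;
        }

        @Override
        public List<String> firstPage(int pageSize) {
            this.pageSize = pageSize;
            this.cursor = 0;
            return nextPage();
        }

        @Override
        public List<String> nextPage() {
            int end = Math.min(cursor + pageSize, total);
            List<String> page = new ArrayList<>();
            for (int i = cursor; i < end; i++) {
                page.add("doc-" + i);
            }
            cursor = end;
            return page; // empty once every document has been handed out
        }
    }

    /** Counters collected while draining the source. */
    static final class Stats {
        int documents;   // documents handled in total
        int requests;    // round trips, including the final one that returns no hits
        int largestPage; // upper bound on how many hits any single response carried
    }

    static Stats drain(PageSource source, int pageSize) {
        Stats stats = new Stats();
        List<String> page = source.firstPage(pageSize);
        stats.requests++;
        while (!page.isEmpty()) {
            // Handle the page we already hold BEFORE asking for the next one,
            // so the hits from the initial request are not skipped.
            stats.documents += page.size();
            stats.largestPage = Math.max(stats.largestPage, page.size());
            page = source.nextPage();
            stats.requests++;
        }
        return stats;
    }

    public static void main(String[] args) {
        Stats s = drain(new InMemorySource(100_000), 100);
        System.out.println(s.documents + " documents in " + s.requests
                + " requests, largest page " + s.largestPage);
        // prints "100000 documents in 1001 requests, largest page 100"
    }
}
```

    With a page size of 100, no single response carries more than 100 hits, so response size is bounded by the page size rather than by the size of the index. Raising the page size trades fewer round trips for larger responses, which is the tuning the answer describes.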