我正在尝试使用 Java 的 ExecutorService(执行器服务)对超过 100k 个文档进行索引,以加快索引速度。索引 documents_qa 中存有超过 10 万条文件路径(文件位于 d:\drive)。我根据这些文件路径读取实际文件并将其转换为 base64,然后把 base64 内容重新索引到另一个索引 document_attachment_qa 中。
/**
 * Scrolls through every document in the {@code documents_qa} index and submits one
 * {@link WorkerThread} per document that has both a path and a filename. The workers
 * run on a fixed pool of five threads.
 *
 * <p>Each hit gets its own {@link Document} instance. The previous version reused one mutable
 * instance for every hit, so concurrently running workers all saw whatever values the main
 * thread had written last.
 */
public class DocumentIndex {
    private final static String INDEX = "documents_qa";
    private final static String TYPE = "doc";
    // main() logs through `logger`, which this class previously never declared.
    private static final java.util.logging.Logger logger =
            java.util.logging.Logger.getLogger(DocumentIndex.class.getName());

    /**
     * Entry point: runs the scroll query, submits one worker per usable hit, then waits for all
     * workers to finish.
     *
     * @param args unused
     * @throws IOException if the search or scroll request fails
     */
    public static void main(String[] args) throws IOException {
        ExecutorService executor = Executors.newFixedThreadPool(5);
        // Shut the pool down in `finally` so a failed search cannot leave non-daemon pool
        // threads keeping the JVM alive.
        try {
            logger.info("Started Indexing the Document.....");
            SearchRequest searchRequest = new SearchRequest(INDEX);
            searchRequest.types(TYPE);
            final Scroll scroll = new Scroll(TimeValue.timeValueMinutes(60L));
            searchRequest.scroll(scroll);
            SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder();
            QueryBuilder qb = QueryBuilders.matchAllQuery();
            searchSourceBuilder.query(qb);
            searchRequest.source(searchSourceBuilder);
            SearchResponse searchResponse = SearchEngineClient.getInstance3().search(searchRequest);
            String scrollId = searchResponse.getScrollId();
            SearchHit[] searchHits = searchResponse.getHits().getHits();
            while (searchHits != null && searchHits.length > 0) {
                // Handle the page already in hand BEFORE fetching the next one. The previous
                // version scrolled first and so silently dropped the initial page of hits.
                submitPage(executor, searchHits);
                SearchScrollRequest scrollRequest = new SearchScrollRequest(scrollId);
                scrollRequest.scroll(scroll);
                searchResponse = SearchEngineClient.getInstance3().searchScroll(scrollRequest);
                scrollId = searchResponse.getScrollId();
                searchHits = searchResponse.getHits().getHits();
            }
            // NOTE(review): the scroll context is never cleared explicitly; it expires after the
            // 60-minute keep-alive. Consider a ClearScrollRequest here.
        } finally {
            executor.shutdown();
            awaitTermination(executor);
        }
        System.out.println("Finished all threads");
    }

    /** Submits exactly one worker per hit on this page that maps to a usable document. */
    private static void submitPage(ExecutorService executor, SearchHit[] hits) {
        for (SearchHit hit : hits) {
            Document doc = toDocument(hit.getSourceAsMap());
            if (doc != null) {
                executor.execute(new WorkerThread(doc));
            }
        }
    }

    /**
     * Builds a fresh {@link Document} from a hit's source map.
     *
     * @return null when the source is missing or lacks a path or filename. The raw values are
     *     checked because {@code String.valueOf(null)} yields the string "null" and would slip
     *     past a later null check.
     */
    private static Document toDocument(Map<String, Object> source) {
        if (source == null || source.get("path") == null || source.get("filename") == null) {
            return null;
        }
        Document doc = new Document();
        // Number rather than a direct (int) cast: JSON-parsed ids may come back as Long.
        doc.setId(((Number) source.get("id")).intValue());
        doc.setApp_language(String.valueOf(source.get("app_language")));
        doc.setFilename(String.valueOf(source.get("filename")));
        doc.setPath(String.valueOf(source.get("path")));
        return doc;
    }

    /** Blocks until the (already shut down) executor finishes, without busy-spinning. */
    private static void awaitTermination(ExecutorService executor) {
        try {
            while (!executor.awaitTermination(1, java.util.concurrent.TimeUnit.MINUTES)) {
                // Tasks still running; keep waiting.
            }
        } catch (InterruptedException e) {
            executor.shutdownNow();
            Thread.currentThread().interrupt();
        }
    }
}
以下是工作线程(WorkerThread)的代码:
/**
 * Reads the file referenced by one {@link Document}, Base64-encodes it and indexes it into
 * {@code document_attachment_qa} through the ingest pipeline of the same name. It also appends
 * progress lines to {@code d:\All_Files_Path.txt} and {@code d:\Available_Files.txt}.
 *
 * <p>The previous version slept 5 seconds after every file (a leftover "simulate work" call).
 * With a five-thread pool that caps throughput at about one document per second, which defeats
 * the purpose of parallel indexing, so the sleep has been removed.
 */
public class WorkerThread implements Runnable {
    private String command;
    private Document doc;
    private final static String ATTACHMENT = "document_attachment_qa";
    // getStackTrace()[0] is the frame of Thread.getStackTrace itself, so the previous
    // expression named this logger "java.lang.Thread". Use this class's name instead.
    private static final Logger logger = Logger.getLogger(WorkerThread.class.getName());
    Map<String, Object> jsonMap ;
    List<String> filePathList = new ArrayList<String>();

    public WorkerThread(Document doc){
        this.doc=doc;
    }

    /**
     * Logs the document's file path, then encodes and indexes every existing regular file among
     * the collected paths. Failures are printed and the affected file is skipped; a document is
     * never indexed without content.
     */
    @Override
    public void run() {
        File all_files_path = new File("d:\\All_Files_Path.txt");
        File available_files = new File("d:\\Available_Files.txt");
        int totalFilePath = 1;
        int totalAvailableFile = 1;
        if (doc.getPath() != null && doc.getFilename() != null) {
            filePathList.add(doc.getPath().concat(doc.getFilename()));
        }
        // try-with-resources: the previous finally { out.close(); } threw a NullPointerException
        // whenever the log file could not be opened.
        // NOTE(review): several pool threads append to these log files at once; lines may
        // interleave.
        try (PrintWriter out = new PrintWriter(new FileOutputStream(all_files_path, true))) {
            for (String filePath : filePathList) {
                out.println("FilePath Count ---" + totalFilePath + ":::::::ID---> " + doc.getId()
                        + "File Path --->" + filePath);
            }
        } catch (FileNotFoundException e) {
            e.printStackTrace();
        }
        for (String filePath : filePathList) {
            File file = new File(filePath);
            if (!file.exists() || file.isDirectory()) {
                continue;
            }
            // A failure to write this log line no longer aborts encoding and indexing.
            try (PrintWriter out1 = new PrintWriter(new FileOutputStream(available_files, true))) {
                out1.println("Available File Count --->" + totalAvailableFile + ":::::::ID---> "
                        + doc.getId() + "File Path --->" + filePath);
                totalAvailableFile++;
            } catch (FileNotFoundException e) {
                e.printStackTrace();
            }
            String encodedfile;
            try {
                // readAllBytes reads the whole file and closes it. A single InputStream.read call
                // may return fewer bytes than requested, and the previous stream leaked on error.
                byte[] bytes = java.nio.file.Files.readAllBytes(file.toPath());
                encodedfile = Base64.getEncoder().encodeToString(bytes);
            } catch (IOException e) {
                // Skip this file rather than index it with null or stale content.
                e.printStackTrace();
                continue;
            }
            jsonMap = new HashMap<String, Object>();
            jsonMap.put("id", doc.getId());
            jsonMap.put("app_language", doc.getApp_language());
            jsonMap.put("fileContent", encodedfile);
            System.out.println(Thread.currentThread().getName()+" End.");
            String id = Long.toString(doc.getId());
            IndexRequest request = new IndexRequest(ATTACHMENT, "doc", id)
                    .source(jsonMap)
                    .setPipeline(ATTACHMENT);
            try {
                SearchEngineClient.getInstance3().index(request);
            } catch (ElasticsearchException e) {
                e.printStackTrace();
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
    }

    @Override
    public String toString(){
        return this.command;
    }
}