
Running LoadIncrementalHFiles from a Java client

  • D. Müller  ·  7 years ago

    I am trying to bulk load HFiles into HBase via the hbase org.apache.hadoop.hbase.mapreduce.LoadIncrementalHFiles /user/myuser/map_data/hfiles mytable approach. When I run it from my Java client, I get the following exception:

    org.apache.hadoop.hbase.io.hfile.CorruptHFileException: Problem reading HFile Trailer from file webhdfs://myserver.de:50070/user/myuser/map_data/hfiles/b/b22db8e263b74a7dbd8e36f9ccf16508
        at org.apache.hadoop.hbase.io.hfile.HFile.pickReaderVersion(HFile.java:477)
        at org.apache.hadoop.hbase.io.hfile.HFile.createReader(HFile.java:520)
        at org.apache.hadoop.hbase.mapreduce.LoadIncrementalHFiles.groupOrSplit(LoadIncrementalHFiles.java:632)
        at org.apache.hadoop.hbase.mapreduce.LoadIncrementalHFiles$3.call(LoadIncrementalHFiles.java:549)
        at org.apache.hadoop.hbase.mapreduce.LoadIncrementalHFiles$3.call(LoadIncrementalHFiles.java:546)
        at java.util.concurrent.FutureTask.run(Unknown Source)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source)
        at java.lang.Thread.run(Unknown Source)
    Caused by: java.lang.RuntimeException: native snappy library not available: this version of libhadoop was built without snappy support.
        at org.apache.hadoop.io.compress.SnappyCodec.checkNativeCodeLoaded(SnappyCodec.java:65)
        at org.apache.hadoop.io.compress.SnappyCodec.getDecompressorType(SnappyCodec.java:193)
        at org.apache.hadoop.io.compress.CodecPool.getDecompressor(CodecPool.java:178)
        at org.apache.hadoop.hbase.io.compress.Compression$Algorithm.getDecompressor(Compression.java:327)
        at org.apache.hadoop.hbase.io.compress.Compression.decompress(Compression.java:422)
        at org.apache.hadoop.hbase.io.encoding.HFileBlockDefaultDecodingContext.prepareDecoding(HFileBlockDefaultDecodingContext.java:90)
        at org.apache.hadoop.hbase.io.hfile.HFileBlock.unpack(HFileBlock.java:529)
        at org.apache.hadoop.hbase.io.hfile.HFileBlock$AbstractFSReader$1.nextBlock(HFileBlock.java:1350)
        at org.apache.hadoop.hbase.io.hfile.HFileBlock$AbstractFSReader$1.nextBlockWithBlockType(HFileBlock.java:1356)
        at org.apache.hadoop.hbase.io.hfile.HFileReaderV2.<init>(HFileReaderV2.java:149)
        at org.apache.hadoop.hbase.io.hfile.HFileReaderV3.<init>(HFileReaderV3.java:77)
        at org.apache.hadoop.hbase.io.hfile.HFile.pickReaderVersion(HFile.java:467)
        ... 8 more
    

    Running the hbase ... command above directly on the console of my Hadoop server works fine. However, when I try to run it from Java code using the HBase/Hadoop client libraries, it fails with the exception shown above!

    Here is the code snippet:

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.hbase.TableName;
    import org.apache.hadoop.hbase.client.Connection;
    import org.apache.hadoop.hbase.client.ConnectionFactory;
    import org.apache.hadoop.hbase.client.RegionLocator;
    import org.apache.hadoop.hbase.client.Table;
    import org.apache.hadoop.hbase.mapreduce.LoadIncrementalHFiles;
    import org.apache.hadoop.mapreduce.Job;

    public static void main(String[] args) {

        try {
            // loginFromKeyTab() is my own Kerberos login helper and HDFS_PATH is a
            // constant (the HFile directory in HDFS); both are defined elsewhere.
            Configuration conf = loginFromKeyTab("REALM.DE", "server.de", "user", "C:/user.keytab");
            conf.set("fs.webhdfs.impl", org.apache.hadoop.hdfs.web.WebHdfsFileSystem.class.getName());
            conf.set("hbase.zookeeper.quorum", "server1.de,server2.de,server3.de");
            conf.set("zookeeper.znode.parent", "/hbase-secure");
            conf.set("hbase.master.kerberos.principal", "hbase/_HOST@REALM.DE");
            conf.set("hbase.regionserver.kerberos.principal", "hbase/_HOST@REALM.DE");
            conf.set("hbase.security.authentication", "kerberos");
    
            Connection connection = ConnectionFactory.createConnection(conf);
            Table table = connection.getTable(TableName.valueOf("mytable"));
    
            RegionLocator locator = connection.getRegionLocator(table.getName());
    
            Job job = Job.getInstance(conf, "Test Bulk Load"); 
    
            //HFileOutputFormat2.configureIncrementalLoad(job, table, locator);     
            //Configuration conf2 = job.getConfiguration();
    
            LoadIncrementalHFiles loader = new LoadIncrementalHFiles(conf);
            loader.doBulkLoad(new Path(HDFS_PATH), connection.getAdmin(), table, locator);
        } catch(Exception e) {
            e.printStackTrace();
        }
    }
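
    The "Caused by" section of the trace points at the client JVM rather than at HBase: the libhadoop native library visible to this main method was built or loaded without Snappy support, while the HFiles are apparently Snappy-compressed. Below is a minimal diagnostic sketch (assuming Hadoop 2.x client libraries on the classpath; the class name NativeSnappyCheck is made up) that runs essentially the same two checks that SnappyCodec.checkNativeCodeLoaded() performs in the trace:

    import org.apache.hadoop.util.NativeCodeLoader;

    public class NativeSnappyCheck {

        public static void main(String[] args) {
            // Roughly the same checks that SnappyCodec.checkNativeCodeLoaded()
            // performs (see the "Caused by" frames in the stack trace above).
            boolean nativeHadoop = NativeCodeLoader.isNativeCodeLoaded();

            // buildSupportsSnappy() is a native method, so only call it
            // once libhadoop itself has been loaded successfully.
            boolean snappy = nativeHadoop && NativeCodeLoader.buildSupportsSnappy();

            System.out.println("libhadoop loaded:  " + nativeHadoop);
            System.out.println("built with Snappy: " + snappy);
        }
    }

    If either value is false, the direct doBulkLoad() call cannot decompress the HFiles from this JVM, which would explain why the same command works on the Hadoop server itself (where the native libraries are installed) but not from the client.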
    

    Do I have to add some dependency to my project? If so, how / where / which version?

    1 Answer  |  7 years ago

  •   D. Müller    7 years ago

    I found another solution for my problem: instead of calling the LoadIncrementalHFiles class itself from my code, I am now using a Process that runs the hbase org.apache.hadoop.hbase.mapreduce.LoadIncrementalHFiles command!

    Here is the code snippet of my solution:

    TreeSet<String> subDirs = getHFileDirectories(new Path(HDFS_OUTPUT_PATH), conf);        // The HDFS_OUTPUT_PATH directory contains many HFile sub-directories
    
    for(String hFileDir : subDirs) {
        String pathToReadFrom = HDFS_OUTPUT_PATH + "/" + hFileDir;
    
        String[] execCode = {"hbase", "org.apache.hadoop.hbase.mapreduce.LoadIncrementalHFiles", "-Dcreate.table=no", pathToReadFrom, "mytable"};       // Important: Separate each parameter here!!!
        ProcessBuilder pb = new ProcessBuilder(execCode);
        pb.redirectErrorStream(true);
        final Process p = pb.start();
    
        new Thread(new Runnable() {
            public void run() {
                BufferedReader input = new BufferedReader(new InputStreamReader(p.getInputStream()));
                String line = null; 
    
                try {
                    while ((line = input.readLine()) != null)
                        System.out.println(line);
                } catch (IOException e) {
                    e.printStackTrace();
                }
            }
        }).start();
    
        p.waitFor();    // Process.waitFor() throws InterruptedException, which the enclosing method must handle
    
        int exitCode = p.exitValue();
        System.out.println(" ==> Exit Code: " + exitCode);
    }
    
    System.out.println("Finished");
    

    If anyone has another solution (e.g. how to use the LoadIncrementalHFiles class directly from the Java client), I would still be interested in it.