代码之家  ›  专栏  ›  技术社区  ›  Midiparse

如何将生成的Java代码转储到stdout?

  •  2
  • Midiparse  · 技术社区  · 6 年前

    在apachespark2.+上使用DataFrames,有没有办法获取底层rdd并将生成的Java代码转储到控制台?

    2 回复  |  直到 6 年前
        1
  •  2
  •   huon John U    6 年前

    这可以使用 QueryExecution.debug.codegen . 此值可通过访问Dataframe/Dataset .queryExecution (这是一个“开发人员API”,即不稳定,易被破坏,因此只能用于调试)。这适用于Spark 2.4.0,从代码上看,它应该从2.0.0(或更高版本)开始工作:

    scala> val df = spark.range(1000)
    df: org.apache.spark.sql.Dataset[Long] = [id: bigint]
    
    scala> df.queryExecution.debug.codegen
    Found 1 WholeStageCodegen subtrees.
    == Subtree 1 / 1 ==
    *(1) Range (0, 1000, step=1, splits=12)
    
    Generated code:
    /* 001 */ public Object generate(Object[] references) {
    /* 002 */   return new GeneratedIteratorForCodegenStage1(references);
    /* 003 */ }
    /* 004 */
    /* 005 */ // codegenStageId=1
    /* 006 */ final class GeneratedIteratorForCodegenStage1 extends org.apache.spark.sql.execution.BufferedRowIterator {
    /* 007 */   private Object[] references;
    /* 008 */   private scala.collection.Iterator[] inputs;
    /* 009 */   private boolean range_initRange_0;
    /* 010 */   private long range_number_0;
    /* 011 */   private TaskContext range_taskContext_0;
    /* 012 */   private InputMetrics range_inputMetrics_0;
    /* 013 */   private long range_batchEnd_0;
    /* 014 */   private long range_numElementsTodo_0;
    /* 015 */   private org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter[] range_mutableStateArray_0 = new org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter[1];
    
    ...
    
    /* 104 */       ((org.apache.spark.sql.execution.metric.SQLMetric) references[0] /* numOutputRows */).add(range_nextBatchTodo_0);
    /* 105 */       range_inputMetrics_0.incRecordsRead(range_nextBatchTodo_0);
    /* 106 */
    /* 107 */       range_batchEnd_0 += range_nextBatchTodo_0 * 1L;
    /* 108 */     }
    /* 109 */   }
    /* 110 */
    /* 111 */ }
    
        2
  •  2
  •   Raphael Roth    6 年前

    下面是一种输出生成代码的方法,可能还有其他方法:

    import org.apache.spark.sql.execution.command.ExplainCommand
    
    val explain = ExplainCommand(df.queryExecution.logical, codegen=true)
    spark.sessionState.executePlan(explain).executedPlan.executeCollect().foreach {
      r => println(r.getString(0))
    }