
Error when querying data produced by a Java UDF

  • LSG  · 7 years ago

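    First of all, my goal is not to have you understand my UDF code so that I can get it to do what I want (I know it already does), but to find out why I get errors when a later query uses the string it generates.

    I wrote a custom UDF; the code is: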

    import java.util.HashMap;
    
    import org.apache.hadoop.hive.ql.exec.Description;
    import org.apache.hadoop.hive.ql.exec.UDF;
    import org.apache.hadoop.hive.ql.udf.UDFType;
    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.io.Text;
    
    public class Calculate_states extends UDF
    {
    
            HashMap<String, Long> last_ts = 
                    new HashMap<String, Long>();
    
            HashMap<String, Integer> last_val = 
                    new HashMap<String, Integer>();
    
            HashMap<String, Long> ts_last_start = 
                    new HashMap<String, Long>();
    
            HashMap<String, String> start_type = 
                    new HashMap<String, String>();
    
    
      public  String evaluate( Integer bit,  Long ts,  Long next_ts, Integer next_bit, Integer time, String Ut)
      {
    
        Object[] result = new Object[4];
        String estado = new String();
    
        if(bit==null)
        {
            result[0]=new Text("no state");
    
        }
        else 
    
        {
            if(bit==1 && (
    
                        (
                        ( next_ts == null ||  ((next_ts-ts)/1000) > time )
                        &&
                        ( last_ts.get(Ut) == null  || ((ts-last_ts.get(Ut))/1000) > time  ) 
                        )
    
                        ||
                        (
                        (last_val.get(Ut)!=null) && 
                        last_val.get(Ut)==0 && ((ts-last_ts.get(Ut))/1000) <=time && 
                            (next_ts == null ||
                            (next_ts-ts)/1000 > time)
                        )
                        ||
                        (
                                (next_bit!=null) && // Condition needed to avoid problems with nulls
                                (       next_bit==0 && ((next_ts-ts)/1000) <= time && 
                                (   (last_ts.get(Ut) == null ||
                                    ((ts-last_ts.get(Ut))/1000) > time )
    
                                )
                                )
                        )
                        )
                    )
                 { estado= "isolated point";
                result[0]=new Text("isolated point");}
    
            else if 
        (
    
                bit==1 && 
                (
                last_val.get(Ut) != null && // To avoid problems with nulls
                last_val.get(Ut)==0 && ((ts-last_ts.get(Ut))/1000 ) <=time)
        ){  estado= "start";
            result[0]=new Text("start");}
    
        else if 
        (
                bit==0 && 
                ( last_val.get(Ut) != null && // To avoid problems with nulls
                last_val.get(Ut)==1 && ((ts-last_ts.get(Ut))/1000 ) <=time )
        ){estado= "stop";
            result[0]=new Text("stop");}    
        else if 
        (
                bit==1 && (last_ts.get(Ut)==null ||  ((ts-last_ts.get(Ut))/1000 ) > time  )
        ){estado= "no info start";
            result[0]=new Text("no info start");}
         else if 
        (
                bit==1 && (next_bit==null || ((next_ts-ts)/1000 ) > time  )
        ){estado= "no info stop";
            result[0]=new Text("no info stop");} 
        else if
        (bit==1 ){
            result[0]=new Text("working");}
        else if
        (bit==0 ){
            result[0]=new Text("stopped");}
            // Update the stored values
            last_val.put(Ut,bit);
            last_ts.put(Ut,ts);
        }
    
        if (estado.equals("isolated point"))
        { result[1]= new LongWritable(1);
        // Could be the sampling frequency, as a new parameter
        result[2]= new Text("isolated point");
          result[3]= new LongWritable(ts);
        } 
    
        else if ( 
         estado.equals("start") || 
         estado.equals("no info start")
          ){
            ts_last_start.put(Ut,ts);
            start_type.put(Ut,estado);
            //result[2]=null;
            result[3]=new LongWritable(ts);
            }
        else if ( 
                estado.equals("stop") || 
                estado.equals("no info stop")
                  ){
                    result[3]=new LongWritable(ts_last_start.get(Ut));
                    result[1]= new LongWritable((ts-ts_last_start.get(Ut))/1000);
                    result[2]= new Text(start_type.get(Ut)+"-"+estado); 
                    ts_last_start.put(Ut,null);
                    }
        else 
            //result[2]=null;
            if (ts_last_start.get(Ut) == null)
            {
                result[3] =null;
            }
            else
            result[3]=new LongWritable(ts_last_start.get(Ut));
    
        String resultado="";
        for (int i=0;i<4;i++)
        {
        if (i==3)
        resultado=resultado+String.valueOf(result[i]);
        else
            resultado=resultado+String.valueOf(result[i])+";";
        }
    
        return resultado;
      }
    }
    

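    Its goal is to compute the state of a component (when it starts working, when it stops) and to put an identifier on every row between a start and a stop. A bit of 1/0 means the component is working / not working.

    For example, this query: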

    select 
    ut,ts, bit, 
    calculate_states(bit,ts,if (bit is null, null,next_ts),next_bit,1,ut) as states
    from 
    (
    select 
    ut,
    ts,
    bit, -- Means component bit
    last_value(bit ignore nulls) over (partition by ut order by ts desc rows between 1 preceding and 1 preceding) as next_bit,
    min(if (bit is not null, ts, null)) over (partition by ut order by ts desc rows between unbounded preceding and 1 preceding) as next_ts
    from my_table
    order by 1,2
    )b
    order by 1,2;
    

    returns (for this table):

    UT  |  ts  |  bit |   States
    a     1000     0      stopped;null;null;null
    a     2000     0      stopped;null;null;null
    a     3000     0      stopped;null;null;null
    a     4000     1      start;null;null;4000
    a     5000     1      no info stop;2;start-no info stop;4000
    a     6000     null   no state;null;null;null
    a     7000     1      no info start;null;null;7000
    a     8000     1      working;null;null;7000
    a     9000     0      stop;3;no info start-stop;7000
    a     10000    1      start;null;null;10000
    a     11000    1      working;null;null;10000
    a     12000    1      no info stop;3;start-no info stop;10000
    

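    Everything works up to here. Now I just add

    select * from query order by ut

    or create table new_table as query, followed by select * from new_table order by ut, ts

    After that, this error appears in my logs: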

    UDF WARNING: Hive UDF path=hdfs://mypath class=UDFpackImpala.Calculate_states failed due to: ImpalaRuntimeException: UDF::evaluate() ran into a problem.
    CAUSED BY: ImpalaRuntimeException: UDF failed to evaluate
    CAUSED BY: InvocationTargetException: null
    CAUSED BY: NullPointerException: null
    

    and my results change from the ones I showed above to:

    UT  |  ts  |  bit |   States
    a     1000     0      stopped;null;null;null
    a     2000     0      stopped;null;null;null
    a     3000     0      NULL
    a     4000     1      stop;null;null;4000
    a     5000     1      working;null;null;null
    a     6000     null   start;null;null;null
    a     7000     1      working;null;null;null
    a     8000     1      working;null;null;null
    a     9000     0      stop;-1;no info start-stop;10000
    a     10000    1      start;null;null;10000
    a     11000    1      working;null;null;10000
    a     12000    1      isolated point;1;null;12000
    

    Completely random stuff. My question is: why?

    Impala version: 2.9.0-CDH5.12.2

    1 answer

  •   LSG    6 years ago

    All of this happens because Impala does not honor the ORDER BY clause in the first (inner) select unless that clause also includes a LIMIT.
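    Row order matters here because the UDF keeps per-key state in HashMaps between calls, so each result depends on the rows it has already seen. The sketch below is a hypothetical, stripped-down stand-in (the class StartTracker and its method names are not from the original code) that mirrors only the ts_last_start lookup in the UDF's stop branch. Under that assumption, a stop row that arrives after a later start gives a negative duration, much like the -1 in the broken output, and a stop row that arrives before any start makes get() return null, whose unboxing throws NullPointerException. This is one plausible source of the exception, not a confirmed trace of it.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical, simplified stand-in for the UDF's per-key state.
class StartTracker {
    // Mirrors ts_last_start in the UDF: last start timestamp (ms) per key.
    private final Map<String, Long> tsLastStart = new HashMap<>();

    void onStart(String ut, long ts) {
        tsLastStart.put(ut, ts);
    }

    // Mirrors (ts - ts_last_start.get(Ut)) / 1000 in the stop branch.
    // If no start was recorded for this key, get() returns null and
    // unboxing it to long throws NullPointerException.
    long secondsSinceStart(String ut, long ts) {
        return (ts - tsLastStart.get(ut)) / 1000;
    }

    public static void main(String[] args) {
        // Rows in timestamp order: start at 7000, stop at 9000.
        StartTracker ordered = new StartTracker();
        ordered.onStart("a", 7000L);
        System.out.println(ordered.secondsSinceStart("a", 9000L)); // 2

        // Rows out of order: the start at 10000 is seen before the stop at 9000.
        StartTracker late = new StartTracker();
        late.onStart("a", 10000L);
        System.out.println(late.secondsSinceStart("a", 9000L)); // -1

        // Rows out of order: the stop is seen before any start for this key.
        StartTracker shuffled = new StartTracker();
        try {
            shuffled.secondsSinceStart("a", 9000L);
        } catch (NullPointerException e) {
            System.out.println("NullPointerException");
        }
    }
}
```

    Any function that carries state across rows like this is only correct if the engine feeds it rows in the order it expects, which is exactly what the ignored ORDER BY no longer guarantees.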

    If you add limit 9999999999 to the first order by 1,2, the problem is solved.
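    As a sketch of that workaround, the question's query with the LIMIT added to the inner ORDER BY could look like the text below. It is held in a Java string constant only so that the example stays in the same language as the UDF; the holder class StatesQuery is hypothetical, and the SQL is otherwise the asker's own query, reformatted.

```java
// Hypothetical holder for the corrected query text, e.g. for a JDBC client.
class StatesQuery {
    static final String SQL = String.join("\n",
        "select",
        "  ut, ts, bit,",
        "  calculate_states(bit, ts, if(bit is null, null, next_ts), next_bit, 1, ut) as states",
        "from (",
        "  select",
        "    ut, ts, bit,",
        "    last_value(bit ignore nulls) over (partition by ut order by ts desc",
        "      rows between 1 preceding and 1 preceding) as next_bit,",
        "    min(if(bit is not null, ts, null)) over (partition by ut order by ts desc",
        "      rows between unbounded preceding and 1 preceding) as next_ts",
        "  from my_table",
        "  order by 1, 2",
        "  limit 9999999999  -- per the answer, the LIMIT makes Impala keep this ORDER BY",
        ") b",
        "order by 1, 2");

    public static void main(String[] args) {
        // Print the query so it can be pasted into a SQL client.
        System.out.println(SQL);
    }
}
```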