
Saving a DataFrame to an HBase table with the Zeppelin Spark interpreter

  •  vanben  ·  asked 7 years ago

    I need to build an analytics solution with Zeppelin that reads data from Oracle, Hadoop, and HBase. Now I need to save the analysis results back to HBase from the Zeppelin Spark interpreter. I wrote a demo that works in my Spark development environment, but it does not work in Zeppelin.

    Can anyone help me? Thanks.

     %spark
    import org.apache.spark.sql.{DataFrame, Row, SQLContext}
    import org.apache.hadoop.hbase.util.Bytes
    import org.apache.hadoop.hbase.client.Put
    import org.apache.hadoop.hbase.io.ImmutableBytesWritable
    import org.apache.hadoop.hbase.client.Result
    import org.apache.hadoop.hbase.mapreduce.TableOutputFormat
    import org.apache.hadoop.mapreduce.Job
    
    // read the analysis result via Spark SQL
    val sql = "select * from analyticresult"

    val sqlContext = sqlc

    import sqlContext.implicits._
    val list = sqlContext.sql(sql)
    
    
    // build a Hadoop Job configured to write to the given HBase table via TableOutputFormat
    def getJob(tableName: String): Job = {
        sc.hadoopConfiguration.set("hbase.zookeeper.quorum", "dn1.hadoop,dn2.hadoop,dn3.hadoop,dn4.hadoop,dn5.hadoop")
        sc.hadoopConfiguration.set("hbase.zookeeper.property.clientPort", "2181")
        sc.hadoopConfiguration.set(TableOutputFormat.OUTPUT_TABLE, tableName)
    
        val job = Job.getInstance(sc.hadoopConfiguration)
        job.setOutputKeyClass(classOf[ImmutableBytesWritable])
        job.setOutputValueClass(classOf[Result])
        job.setOutputFormatClass(classOf[TableOutputFormat[ImmutableBytesWritable]])
        job
      }
    
    // convert each DataFrame row into an (ImmutableBytesWritable, Put) pair for HBase
    val hbaseRDD = list.mapPartitions( iter => {
          iter.map( it => {
            var rowkey = it.getAs[String](8).getBytes
            var put = new Put(rowkey) //set rowkey
            put.addColumn(Bytes.toBytes("A"), Bytes.toBytes("CI"), it.getAs[String](3).getBytes)
            put.addColumn(Bytes.toBytes("A"), Bytes.toBytes("CT"), it.getAs[String](4).getBytes) 
            //Bytes.toBytes(it.getLong(9))
    
            (new ImmutableBytesWritable, put)
    
          })
        })
    
    // create the output Job and write the RDD to HBase
    var table = getJob("HWBTEST")
    hbaseRDD.saveAsNewAPIHadoopDataset(table.getConfiguration)
    

    The code compiles fine in the Zeppelin console, but it then throws an IllegalStateException:

    sqlContext: org.apache.spark.sql.SQLContext = org.apache.spark.sql.hive.HiveContext@38a0571d
    import sqlContext.implicits._
    list: org.apache.spark.sql.DataFrame = [cardid: string, lineid: string, amount: string, time: string, busline_name: string, vin: string, carno: string, carcolor: string, gpstime: string, systime: string, status: string, alarm: string, lon: string, lat: string, height: string, speed: string, buslineId: string, stype: string, stationSeq: string, stationName: string, stationMark: string, onPersons: string, offPersons: string, buslineName: string, between_time: double, rn: int]
    getJob: (tableName: String)org.apache.hadoop.mapreduce.Job
    hbaseRDD: org.apache.spark.rdd.RDD[(org.apache.hadoop.hbase.io.ImmutableBytesWritable, org.apache.hadoop.hbase.client.Put)] = MapPartitionsRDD[276] at mapPartitions at <console>:137
    java.lang.IllegalStateException: Job in state DEFINE instead of RUNNING
        at org.apache.hadoop.mapreduce.Job.ensureState(Job.java:292)
        at org.apache.hadoop.mapreduce.Job.toString(Job.java:457)
        at scala.runtime.ScalaRunTime$.scala$runtime$ScalaRunTime$$inner$1(ScalaRunTime.scala:324)
        at scala.runtime.ScalaRunTime$.stringOf(ScalaRunTime.scala:329)
        at scala.runtime.ScalaRunTime$.replStringOf(ScalaRunTime.scala:337)
        at .<init>(<console>:10)
        at .<clinit>(<console>)
        at $print(<console>)
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:498)
        at org.apache.spark.repl.SparkIMain$ReadEvalPrint.call(SparkIMain.scala:1065)
        at org.apache.spark.repl.SparkIMain$Request.loadAndRun(SparkIMain.scala:1346)
        at org.apache.spark.repl.SparkIMain.loadAndRunReq$1(SparkIMain.scala:840)
        at org.apache.spark.repl.SparkIMain.interpret(SparkIMain.scala:871)
        at org.apache.spark.repl.SparkIMain.interpret(SparkIMain.scala:819)
        at sun.reflect.GeneratedMethodAccessor210.invoke(Unknown Source)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:498)
        at org.apache.zeppelin.spark.Utils.invokeMethod(Utils.java:38)
        at org.apache.zeppelin.spark.SparkInterpreter.interpret(SparkInterpreter.java:717)
        at org.apache.zeppelin.spark.SparkInterpreter.interpretInput(SparkInterpreter.java:928)
        at org.apache.zeppelin.spark.SparkInterpreter.interpret(SparkInterpreter.java:871)
        at org.apache.zeppelin.spark.SparkInterpreter.interpret(SparkInterpreter.java:864)
        at org.apache.zeppelin.interpreter.LazyOpenInterpreter.interpret(LazyOpenInterpreter.java:94)
        at org.apache.zeppelin.interpreter.remote.RemoteInterpreterServer$InterpretJob.jobRun(RemoteInterpreterServer.java:341)
        at org.apache.zeppelin.scheduler.Job.run(Job.java:176)
        at org.apache.zeppelin.scheduler.FIFOScheduler$1.run(FIFOScheduler.java:139)
        at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
        at java.util.concurrent.FutureTask.run(FutureTask.java:266)
        at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)
        at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
        at java.lang.Thread.run(Thread.java:745)
    
    1 Answer  |  7 years ago

  •   vanben    7 years ago

    hbaseRDD.saveAsNewAPIHadoopDataset(getJob("HWBTEST").getConfiguration)

    This solved the problem. The write itself was fine; the exception came from the Zeppelin REPL trying to print the table variable it had just bound: Job.toString throws an IllegalStateException while the job is still in the DEFINE state (see the ScalaRunTime.replStringOf frames in the stack trace). Calling getJob("HWBTEST").getConfiguration inline, without binding the Job to a top-level variable, means the REPL never has to render the Job.
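
    For anyone hitting the same trap, an equivalent fix is to keep the Job binding but make it local, so the interpreter never echoes it. A minimal sketch, reusing the getJob helper and hbaseRDD defined in the question above:

    // The Job stays local to this block, so the Zeppelin REPL only sees the
    // block's Unit result and never calls Job.toString on the Job itself.
    {
        val job = getJob("HWBTEST")
        hbaseRDD.saveAsNewAPIHadoopDataset(job.getConfiguration)
    }

    Either form works; the write path was never the problem, only the REPL's attempt to render a Job that was still in the DEFINE state.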