HBASE-20295

TableOutputFormat.checkOutputSpecs throws NullPointerException

    Details

    • Type: Bug
    • Status: Resolved
    • Priority: Major
    • Resolution: Fixed
    • Affects Version/s: 1.4.0
    • Fix Version/s: 2.0.0
    • Component/s: mapreduce
    • Labels:
      None
    • Environment:

      Spark 2.2.1, HBase 1.4.0

    • Hadoop Flags:
      Reviewed

      Description

      I am using Spark to write data to HBase through the RDD.saveAsNewAPIHadoopDataset function. It works fine with HBase 1.3.1, but after updating the HBase dependency to 1.4.0 in pom.xml it throws java.lang.NullPointerException. The cause is a logic error in the TableOutputFormat.checkOutputSpecs function; details follow.

      First, let's take a look at the SparkHadoopMapReduceWriter.write function in SparkHadoopMapReduceWriter.scala:

      // SparkHadoopMapReduceWriter.write (org.apache.spark.internal.io.SparkHadoopMapReduceWriter.scala)
      
      def write[K, V: ClassTag](
          rdd: RDD[(K, V)],
          hadoopConf: Configuration): Unit = {
        // Extract context and configuration from RDD.
        val sparkContext = rdd.context
        val stageId = rdd.id
        val sparkConf = rdd.conf
        val conf = new SerializableConfiguration(hadoopConf)
      
        // Set up a job.
        val jobTrackerId = SparkHadoopWriterUtils.createJobTrackerID(new Date())
        val jobAttemptId = new TaskAttemptID(jobTrackerId, stageId, TaskType.MAP, 0, 0)
        val jobContext = new TaskAttemptContextImpl(conf.value, jobAttemptId)
        val format = jobContext.getOutputFormatClass
      
        if (SparkHadoopWriterUtils.isOutputSpecValidationEnabled(sparkConf)) {
          // FileOutputFormat ignores the filesystem parameter
          val jobFormat = format.newInstance
          jobFormat.checkOutputSpecs(jobContext)
        }
      
        val committer = FileCommitProtocol.instantiate(
          className = classOf[HadoopMapReduceCommitProtocol].getName,
          jobId = stageId.toString,
          outputPath = conf.value.get("mapreduce.output.fileoutputformat.outputdir"),
          isAppend = false).asInstanceOf[HadoopMapReduceCommitProtocol]
        committer.setupJob(jobContext)
      ...

      In the "write" function, if output spec validation is enabled, it calls the checkOutputSpecs function of the TableOutputFormat class. However, the job format is created simply by "val jobFormat = format.newInstance", which does NOT initialize the "conf" member variable of TableOutputFormat. Next, let's look at the checkOutputSpecs function in the TableOutputFormat class:

       

      // TableOutputFormat.checkOutputSpecs (org.apache.hadoop.hbase.mapreduce.TableOutputFormat.java) HBASE 1.4.0
      @Override
      public void checkOutputSpecs(JobContext context) throws IOException,
          InterruptedException {
      
        try (Admin admin = ConnectionFactory.createConnection(getConf()).getAdmin()) {
          TableName tableName = TableName.valueOf(this.conf.get(OUTPUT_TABLE));
          if (!admin.tableExists(tableName)) {
            throw new TableNotFoundException("Can't write, table does not exist:" +
                tableName.getNameAsString());
          }
      
          if (!admin.isTableEnabled(tableName)) {
            throw new TableNotEnabledException("Can't write, table is not enabled: " +
                tableName.getNameAsString());
          }
        }
      }
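
      To illustrate the point above, here is a minimal self-contained sketch of the failure mode. It is not HBase or Spark code: FakeConf, FakeTableOutputFormat, and the "demo.output.table" key are hypothetical stand-ins, and the sketch only assumes that the format keeps its configuration in a field that nothing sets when the instance is created by reflection alone.

```java
import java.util.HashMap;
import java.util.Map;

// Self-contained sketch; FakeConf and FakeTableOutputFormat are hypothetical
// stand-ins, not Hadoop or HBase classes.
class UninitializedConfDemo {

    /** Hypothetical stand-in for a Hadoop-style configuration object. */
    static class FakeConf {
        private final Map<String, String> values = new HashMap<>();

        FakeConf set(String key, String value) {
            values.put(key, value);
            return this;
        }

        String get(String key) {
            return values.get(key);
        }
    }

    /** Hypothetical output format that stores its configuration in a field. */
    public static class FakeTableOutputFormat {
        private FakeConf conf; // only populated when setConf() is called

        public FakeTableOutputFormat() { }

        public void setConf(FakeConf conf) {
            this.conf = conf;
        }

        public FakeConf getConf() {
            return conf;
        }

        /** Mimics the reported logic: ignores the context and reads the field. */
        public String checkOutputSpecs(FakeConf contextConf) {
            // Throws NullPointerException when setConf() was never called.
            return getConf().get("demo.output.table");
        }
    }

    /** Creates the format by reflection only, without calling setConf(). */
    static FakeTableOutputFormat newInstanceWithoutConf() {
        try {
            return FakeTableOutputFormat.class.getDeclaredConstructor().newInstance();
        } catch (ReflectiveOperationException e) {
            throw new IllegalStateException(e);
        }
    }

    /** Returns true if validation on an unconfigured instance fails with an NPE. */
    static boolean failsWithNpe(FakeConf contextConf) {
        try {
            newInstanceWithoutConf().checkOutputSpecs(contextConf);
            return false;
        } catch (NullPointerException e) {
            return true;
        }
    }

    public static void main(String[] args) {
        FakeConf contextConf = new FakeConf().set("demo.output.table", "demo_table");
        System.out.println("conf field after newInstance: "
                + newInstanceWithoutConf().getConf());
        System.out.println("checkOutputSpecs fails with NPE: "
                + failsWithNpe(contextConf));
    }
}
```

      Note that the context passed to checkOutputSpecs carries a usable configuration in this sketch, but the method never consults it, which mirrors the situation described in this report.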
      

      The connection is created with "ConnectionFactory.createConnection(getConf())". As mentioned above, the "conf" class member is not initialized, so getConf() returns null, and the subsequent UserProvider instantiation throws the NullPointerException (see the partial stack trace at the end). What is confusing is that the context passed in as the function parameter has been properly constructed and contains a Configuration object, yet it is never used. I therefore suggest the code below as a partial fix for this issue:

       

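      // Proposed fix: TableOutputFormat.checkOutputSpecs
      @Override
      public void checkOutputSpecs(JobContext context) throws IOException,
          InterruptedException {
        // Prefer the configuration carried by the job context; fall back to the member.
        Configuration hConf = context.getConfiguration();
        if (hConf == null) {
          hConf = this.conf;
        }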
      
        try (Admin admin = ConnectionFactory.createConnection(hConf).getAdmin()) {
          TableName tableName = TableName.valueOf(hConf.get(OUTPUT_TABLE));
          if (!admin.tableExists(tableName)) {
            throw new TableNotFoundException("Can't write, table does not exist:" +
                    tableName.getNameAsString());
          }
      
          if (!admin.isTableEnabled(tableName)) {
            throw new TableNotEnabledException("Can't write, table is not enabled: " +
                    tableName.getNameAsString());
          }
        }
      }
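
      The fallback order used in the suggested fix can be tried in isolation with the sketch below. It is only an illustration under stated assumptions: FakeConf, FakeJobContext, FixedOutputFormat, resolveOutputTable, and the "demo.output.table" key are hypothetical names, and no HBase connection is opened.

```java
import java.util.HashMap;
import java.util.Map;

// Self-contained sketch of "context configuration first, member second".
// All types here are hypothetical stand-ins, not Hadoop or HBase classes.
class ContextConfFallbackDemo {

    /** Hypothetical stand-in for a Hadoop-style configuration object. */
    static class FakeConf {
        private final Map<String, String> values = new HashMap<>();

        FakeConf set(String key, String value) {
            values.put(key, value);
            return this;
        }

        String get(String key) {
            return values.get(key);
        }
    }

    /** Hypothetical stand-in for a job context that carries a configuration. */
    static class FakeJobContext {
        private final FakeConf conf;

        FakeJobContext(FakeConf conf) {
            this.conf = conf;
        }

        FakeConf getConfiguration() {
            return conf;
        }
    }

    /** Hypothetical output format using the fallback order from the suggested fix. */
    static class FixedOutputFormat {
        private FakeConf conf; // may be null if setConf() was never called

        void setConf(FakeConf conf) {
            this.conf = conf;
        }

        /** Reads the table name from the context's conf, else from the member. */
        String resolveOutputTable(FakeJobContext context) {
            FakeConf hConf = context.getConfiguration();
            if (hConf == null) {
                hConf = this.conf;
            }
            return hConf.get("demo.output.table");
        }
    }

    public static void main(String[] args) {
        // Case 1: instance never configured, context has a configuration.
        FakeConf contextConf = new FakeConf().set("demo.output.table", "from_context");
        FixedOutputFormat unconfigured = new FixedOutputFormat();
        System.out.println(unconfigured.resolveOutputTable(new FakeJobContext(contextConf)));

        // Case 2: context has no configuration, member was set explicitly.
        FixedOutputFormat configured = new FixedOutputFormat();
        configured.setConf(new FakeConf().set("demo.output.table", "from_member"));
        System.out.println(configured.resolveOutputTable(new FakeJobContext(null)));
    }
}
```

      If both the context configuration and the member are null, this sketch still fails with a NullPointerException, which is one reason the suggestion above is described as only a partial fix.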
      

      HBase 1.3.1 is not affected by this issue because checkOutputSpecs has an empty method body in that version.

       

       

      Part of stack trace:

      Exception in thread "main" java.lang.NullPointerException
      at org.apache.hadoop.hbase.security.UserProvider.instantiate(UserProvider.java:122)
      at org.apache.hadoop.hbase.client.ConnectionFactory.createConnection(ConnectionFactory.java:214)
      at org.apache.hadoop.hbase.client.ConnectionFactory.createConnection(ConnectionFactory.java:119)
      at org.apache.hadoop.hbase.mapreduce.TableOutputFormat.checkOutputSpecs(TableOutputFormat.java:177)
      at org.apache.spark.internal.io.SparkHadoopMapReduceWriter$.write(SparkHadoopMapReduceWriter.scala:76)
      at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsNewAPIHadoopDataset$1.apply$mcV$sp(PairRDDFunctions.scala:1085)
      at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsNewAPIHadoopDataset$1.apply(PairRDDFunctions.scala:1085)
      at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsNewAPIHadoopDataset$1.apply(PairRDDFunctions.scala:1085)
      at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
      at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
      at org.apache.spark.rdd.RDD.withScope(RDD.scala:362)
      at org.apache.spark.rdd.PairRDDFunctions.saveAsNewAPIHadoopDataset(PairRDDFunctions.scala:1084)

       

       

        Attachments

        1. HBASE-20295.branch-1.4.001.patch
          2 kB
          Michael Jin
        2. HBASE-20295.master.001.patch
          2 kB
          Michael Jin
        3. HBASE-20295.master.002.patch
          2 kB
          Michael Jin
        4. HBASE-20295.master.003.patch
          2 kB
          Michael Jin

              People

              • Assignee: Michael Jin (menjin)
              • Reporter: Michael Jin (menjin)
              • Votes: 0
              • Watchers: 7

              Time Tracking

              • Original Estimate: 168h
              • Remaining Estimate: 168h
              • Time Spent: Not Specified