  Spark / SPARK-22558

SparkHiveDynamicPartition fails when trying to write data from Kafka to Hive using Spark Streaming


Details

    • Important

    Description

      I am able to write data from Kafka into a Hive table using Spark Streaming. Batches run successfully for one day, and after some successful runs I get the errors below. Is there a way to resolve this?

      The insert uses dynamic Hive partitioning.

      Job aborted due to stage failure: Task 0 in stage 381.0 failed 4 times, most recent failure: Lost task 0.3 in stage 381.0 (TID 129383, brksvl255.brk.navistar.com, executor 1): org.apache.spark.SparkException: Task failed while writing rows.
      at org.apache.spark.sql.hive.SparkHiveDynamicPartitionWriterContainer.writeToFile(hiveWriterContainers.scala:328)
      at org.apache.spark.sql.hive.execution.InsertIntoHiveTable$$anonfun$saveAsHiveFile$3.apply(InsertIntoHiveTable.scala:210)
      at org.apache.spark.sql.hive.execution.InsertIntoHiveTable$$anonfun$saveAsHiveFile$3.apply(InsertIntoHiveTable.scala:210)
      at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
      at org.apache.spark.scheduler.Task.run(Task.scala:99)
      at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:322)
      at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
      at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
      at java.lang.Thread.run(Thread.java:745)
      Caused by: java.lang.NullPointerException
      at parquet.hadoop.InternalParquetRecordWriter.flushRowGroupToStore(InternalParquetRecordWriter.java:152)
      at parquet.hadoop.InternalParquetRecordWriter.close(InternalParquetRecordWriter.java:111)
      at parquet.hadoop.ParquetRecordWriter.close(ParquetRecordWriter.java:112)
      at org.apache.hadoop.hive.ql.io.parquet.write.ParquetRecordWriterWrapper.close(ParquetRecordWriterWrapper.java:102)
      at org.apache.hadoop.hive.ql.io.parquet.write.ParquetRecordWriterWrapper.close(ParquetRecordWriterWrapper.java:119)
      at org.apache.spark.sql.hive.SparkHiveDynamicPartitionWriterContainer.writeToFile(hiveWriterContainers.scala:320)
      ... 8 more

      I am sure there is some problem with the dynamic partitioning. Here is the query executed inside the DStream:

      insert into bonalab.datapoint_location partition(year, month)
      select vin, utctime, description, descriptionuom, providerdesc,
             islocation, latitude, longitude, speed, value,
             current_timestamp as processed_date,
             1 as version,
             year, month
      from bonalab.datapoint_location
      where year = 2017
        and month = 10
      group by year, month, vin, utctime, description, descriptionuom, providerdesc,
               islocation, latitude, longitude, speed, value, processed_date
      limit 15
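
      For a dynamic-partition insert like this to succeed, the partition columns (year, month) must be declared in the table's PARTITIONED BY clause and must appear last in the SELECT list, in that order. For reference, a minimal sketch of a matching table definition is shown below; the column types and the STORED AS PARQUET clause are assumptions (the Parquet format is suggested by the stack trace), not the actual schema.

      // Hypothetical DDL for the target table; column types are assumptions, not the real schema.
      sparkSession.sql("""
        CREATE TABLE IF NOT EXISTS bonalab.datapoint_location (
          vin            STRING,
          utctime        STRING,
          description    STRING,
          descriptionuom STRING,
          providerdesc   STRING,
          islocation     STRING,
          latitude       DOUBLE,
          longitude      DOUBLE,
          speed          DOUBLE,
          value          STRING,
          processed_date TIMESTAMP,
          version        INT
        )
        PARTITIONED BY (year INT, month INT)
        STORED AS PARQUET
      """)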

      And here is the foreachRDD block in which it is executed:

      val datapointDF = datapointDStream.foreachRDD { rdd =>
        if (!runMode.equalsIgnoreCase("local")) {
          sparkSession.sql("set hive.exec.dynamic.partition.mode=nonstrict")
          sparkSession.sql("SET hive.exec.max.dynamic.partitions.pernode = 400")
          sparkSession.sql("set hive.exec.dynamic.partition = true")
        }

        if (!rdd.isEmpty) {
          // val sparkSession = SparkSession.builder.enableHiveSupport.getOrCreate
          // import sparkSession.implicits._
          val datapointDstreamDF = rdd.toDS
          // println("DataPoint data")
          // datapointDstreamDF.show(1)
          datapointDstreamDF.createOrReplaceTempView("datapoint_tmp")
          sparkSession.sql(HiveDAO.Geofences.insertLocationDataPoints("datapoint_tmp", hiveDBInstance))
        }
      }
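
      For completeness, the commented-out lines in that block hint at the surrounding setup (a SparkSession built with enableHiveSupport and the implicits needed for rdd.toDS). Below is a minimal sketch of such a setup, assuming Spark 2.x with the spark-streaming-kafka-0-10 integration; the broker list, topic name, group id, batch interval, DataPoint case class, and parseDataPoint helper are placeholders, not taken from the actual job.

      import org.apache.kafka.common.serialization.StringDeserializer
      import org.apache.spark.sql.SparkSession
      import org.apache.spark.streaming.{Seconds, StreamingContext}
      import org.apache.spark.streaming.kafka010.KafkaUtils
      import org.apache.spark.streaming.kafka010.LocationStrategies.PreferConsistent
      import org.apache.spark.streaming.kafka010.ConsumerStrategies.Subscribe

      // Hypothetical record type mirroring the columns of bonalab.datapoint_location.
      case class DataPoint(vin: String, utctime: String, description: String,
                           descriptionuom: String, providerdesc: String, islocation: String,
                           latitude: Double, longitude: Double, speed: Double, value: String,
                           year: Int, month: Int)

      // Hive support is required for the dynamic-partition INSERT; the implicits enable rdd.toDS.
      val sparkSession = SparkSession.builder().enableHiveSupport().getOrCreate()
      import sparkSession.implicits._

      val ssc = new StreamingContext(sparkSession.sparkContext, Seconds(60))  // batch interval assumed

      val kafkaParams = Map[String, Object](
        "bootstrap.servers"  -> "broker1:9092",          // placeholder
        "key.deserializer"   -> classOf[StringDeserializer],
        "value.deserializer" -> classOf[StringDeserializer],
        "group.id"           -> "datapoint-consumer",    // placeholder
        "auto.offset.reset"  -> "latest"
      )

      // Direct stream from Kafka.
      val stream = KafkaUtils.createDirectStream[String, String](
        ssc, PreferConsistent, Subscribe[String, String](Seq("datapoint-topic"), kafkaParams))

      // Hypothetical CSV-style parser, only to keep the sketch self-contained.
      def parseDataPoint(line: String): DataPoint = {
        val f = line.split(",")
        DataPoint(f(0), f(1), f(2), f(3), f(4), f(5),
                  f(6).toDouble, f(7).toDouble, f(8).toDouble, f(9), f(10).toInt, f(11).toInt)
      }
      val datapointDStream = stream.map(record => parseDataPoint(record.value()))

      ssc.start()
      ssc.awaitTermination()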


          People

            Assignee: Unassigned
            Reporter: KhajaAsmath Mohammed (khajaasmath786)
            Votes: 0
            Watchers: 2

            Dates

              Created:
              Updated:
              Resolved:

              Time Tracking

                Original Estimate: 12h
                Remaining Estimate: 12h
                Time Spent: Not Specified
