CarbonData / CARBONDATA-2147

Exception is thrown while loading data with streaming


Details

    • Type: Bug
    • Status: Resolved
    • Priority: Minor
    • Resolution: Fixed
    • Affects Version/s: 1.3.0
    • Fix Version/s: 1.4.0, 1.3.1
    • Component/s: data-load
    • Labels: None
    • Environment: spark 2.1, spark 2.2.1

    Description

      An exception is thrown while loading data into a streaming table.

      Steps to reproduce:

      1) Start spark-shell with the CarbonData jar:

      ./spark-shell --jars /opt/spark/spark-2.2.1/carbonlib/carbondata_2.11-1.3.0-SNAPSHOT-shade-hadoop2.7.2.jar

      2) Execute the following script:

      import org.apache.spark.sql.SparkSession
      import org.apache.spark.sql.CarbonSession._
      import org.apache.spark.sql.streaming.{ProcessingTime, StreamingQuery}
      import org.apache.carbondata.core.constants.CarbonCommonConstants
      import org.apache.carbondata.core.util.CarbonProperties

      val carbon = SparkSession.builder().config(sc.getConf).getOrCreateCarbonSession("hdfs://localhost:54310/newCarbonStore", "/tmp")

      CarbonProperties.getInstance().addProperty(CarbonCommonConstants.CARBON_BAD_RECORDS_ACTION, "FORCE")
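      // FORCE above makes bad-record handling store unparseable values as null instead of failing the load.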

      carbon.sql("drop table if exists uniqdata_stream")

      carbon.sql("create table uniqdata_stream(CUST_ID int,CUST_NAME String,DOB timestamp,DOJ timestamp, BIGINT_COLUMN1 bigint,BIGINT_COLUMN2 bigint,DECIMAL_COLUMN1 decimal(30,10),DECIMAL_COLUMN2 decimal(36,10),Double_COLUMN1 double, Double_COLUMN2 double,INTEGER_COLUMN1 int) STORED BY 'org.apache.carbondata.format' TBLPROPERTIES ('TABLE_BLOCKSIZE'= '256 MB', 'streaming'='true')");
      import carbon.sqlContext.implicits._

      import org.apache.spark.sql.types._
      val uniqdataSch = StructType(Array(
        StructField("CUST_ID", IntegerType),
        StructField("CUST_NAME", StringType),
        StructField("DOB", TimestampType),
        StructField("DOJ", TimestampType),
        StructField("BIGINT_COLUMN1", LongType),
        StructField("BIGINT_COLUMN2", LongType),
        StructField("DECIMAL_COLUMN1", DecimalType(30, 10)),
        StructField("DECIMAL_COLUMN2", DecimalType(36, 10)),
        StructField("Double_COLUMN1", DoubleType),
        StructField("Double_COLUMN2", DoubleType),
        StructField("INTEGER_COLUMN1", IntegerType)))

      val streamDf = carbon.readStream
        .schema(uniqdataSch)
        .option("sep", ",")
        .csv("file:///home/knoldus/Documents/uniqdata")

      val qry = streamDf.writeStream.format("carbondata")
        .trigger(ProcessingTime("5 seconds"))
        .option("checkpointLocation", "/stream/uniq")
        .option("dbName", "default")
        .option("tableName", "uniqdata_stream")
        .start()
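
      Note: the default carbondata stream parser (CSVStreamParserImp, visible in the logs below) appears to expect each incoming row to carry a single CSV-formatted text column rather than a fully typed row. A minimal sketch of that input shape, not part of the original run and using a hypothetical separate checkpoint path ("/stream/uniq_raw"), would read the source as raw text lines instead of applying the schema at the source:

      // Sketch only: feed the sink one string column per row, which is the
      // shape the default CSV stream parser consumes; the Carbon table schema
      // still drives the parsed layout.
      val rawDf = carbon.readStream
        .format("text")
        .load("file:///home/knoldus/Documents/uniqdata")
      val rawQry = rawDf.writeStream.format("carbondata")
        .trigger(ProcessingTime("5 seconds"))
        .option("checkpointLocation", "/stream/uniq_raw")
        .option("dbName", "default")
        .option("tableName", "uniqdata_stream")
        .start()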

       

      3) Error logs:

      warning: there was one deprecation warning; re-run with -deprecation for details
      uniqdataSch: org.apache.spark.sql.types.StructType = StructType(StructField(CUST_ID,IntegerType,true), StructField(CUST_NAME,StringType,true), StructField(DOB,TimestampType,true), StructField(DOJ,TimestampType,true), StructField(BIGINT_COLUMN1,LongType,true), StructField(BIGINT_COLUMN2,LongType,true), StructField(DECIMAL_COLUMN1,DecimalType(30,10),true), StructField(DECIMAL_COLUMN2,DecimalType(36,10),true), StructField(Double_COLUMN1,DoubleType,true), StructField(Double_COLUMN2,DoubleType,true), StructField(INTEGER_COLUMN1,IntegerType,true))
      streamDf: org.apache.spark.sql.DataFrame = [CUST_ID: int, CUST_NAME: string ... 9 more fields]
      qry: org.apache.spark.sql.streaming.StreamingQuery = org.apache.spark.sql.execution.streaming.StreamingQueryWrapper@d0e155c

      scala> 18/02/08 16:38:53 ERROR StreamSegment: Executor task launch worker for task 5 Failed to append batch data to stream segment: hdfs://localhost:54310/newCarbonStore/default/uniqdata_stream1/Fact/Part0/Segment_0
      java.lang.NullPointerException
      at org.apache.spark.sql.catalyst.InternalRow.getString(InternalRow.scala:32)
      at org.apache.carbondata.streaming.parser.CSVStreamParserImp.parserRow(CSVStreamParserImp.java:40)
      at org.apache.spark.sql.execution.streaming.CarbonAppendableStreamSink$InputIterator.next(CarbonAppendableStreamSink.scala:337)
      at org.apache.spark.sql.execution.streaming.CarbonAppendableStreamSink$InputIterator.next(CarbonAppendableStreamSink.scala:331)
      at org.apache.carbondata.streaming.segment.StreamSegment.appendBatchData(StreamSegment.java:244)
      at org.apache.spark.sql.execution.streaming.CarbonAppendableStreamSink$$anonfun$writeDataFileTask$1.apply$mcV$sp(CarbonAppendableStreamSink.scala:315)
      at org.apache.spark.sql.execution.streaming.CarbonAppendableStreamSink$$anonfun$writeDataFileTask$1.apply(CarbonAppendableStreamSink.scala:305)
      at org.apache.spark.sql.execution.streaming.CarbonAppendableStreamSink$$anonfun$writeDataFileTask$1.apply(CarbonAppendableStreamSink.scala:305)
      at org.apache.spark.util.Utils$.tryWithSafeFinallyAndFailureCallbacks(Utils.scala:1371)
      at org.apache.spark.sql.execution.streaming.CarbonAppendableStreamSink$.writeDataFileTask(CarbonAppendableStreamSink.scala:317)
      at org.apache.spark.sql.execution.streaming.CarbonAppendableStreamSink$$anonfun$writeDataFileJob$1$$anonfun$apply$mcV$sp$1.apply(CarbonAppendableStreamSink.scala:228)
      at org.apache.spark.sql.execution.streaming.CarbonAppendableStreamSink$$anonfun$writeDataFileJob$1$$anonfun$apply$mcV$sp$1.apply(CarbonAppendableStreamSink.scala:227)
      at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
      at org.apache.spark.scheduler.Task.run(Task.scala:108)
      at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:338)
      at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
      at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
      at java.lang.Thread.run(Thread.java:745)
      18/02/08 16:38:53 ERROR Utils: Aborting task
      java.lang.NullPointerException
      at org.apache.spark.sql.catalyst.InternalRow.getString(InternalRow.scala:32)
      at org.apache.carbondata.streaming.parser.CSVStreamParserImp.parserRow(CSVStreamParserImp.java:40)
      at org.apache.spark.sql.execution.streaming.CarbonAppendableStreamSink$InputIterator.next(CarbonAppendableStreamSink.scala:337)
      at org.apache.spark.sql.execution.streaming.CarbonAppendableStreamSink$InputIterator.next(CarbonAppendableStreamSink.scala:331)
      at org.apache.carbondata.streaming.segment.StreamSegment.appendBatchData(StreamSegment.java:244)
      at org.apache.spark.sql.execution.streaming.CarbonAppendableStreamSink$$anonfun$writeDataFileTask$1.apply$mcV$sp(CarbonAppendableStreamSink.scala:315)
      at org.apache.spark.sql.execution.streaming.CarbonAppendableStreamSink$$anonfun$writeDataFileTask$1.apply(CarbonAppendableStreamSink.scala:305)
      at org.apache.spark.sql.execution.streaming.CarbonAppendableStreamSink$$anonfun$writeDataFileTask$1.apply(CarbonAppendableStreamSink.scala:305)
      at org.apache.spark.util.Utils$.tryWithSafeFinallyAndFailureCallbacks(Utils.scala:1371)
      at org.apache.spark.sql.execution.streaming.CarbonAppendableStreamSink$.writeDataFileTask(CarbonAppendableStreamSink.scala:317)
      at org.apache.spark.sql.execution.streaming.CarbonAppendableStreamSink$$anonfun$writeDataFileJob$1$$anonfun$apply$mcV$sp$1.apply(CarbonAppendableStreamSink.scala:228)
      at org.apache.spark.sql.execution.streaming.CarbonAppendableStreamSink$$anonfun$writeDataFileJob$1$$anonfun$apply$mcV$sp$1.apply(CarbonAppendableStreamSink.scala:227)
      at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
      at org.apache.spark.scheduler.Task.run(Task.scala:108)
      at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:338)
      at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
      at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
      at java.lang.Thread.run(Thread.java:745)
      18/02/08 16:38:53 ERROR CarbonAppendableStreamSink$: Executor task launch worker for task 5 Job job_20180208163853_0005 aborted.
      18/02/08 16:38:53 ERROR Executor: Exception in task 0.0 in stage 5.0 (TID 5)
      org.apache.carbondata.streaming.CarbonStreamException: Task failed while writing rows
      at org.apache.spark.sql.execution.streaming.CarbonAppendableStreamSink$.writeDataFileTask(CarbonAppendableStreamSink.scala:324)
      at org.apache.spark.sql.execution.streaming.CarbonAppendableStreamSink$$anonfun$writeDataFileJob$1$$anonfun$apply$mcV$sp$1.apply(CarbonAppendableStreamSink.scala:228)
      at org.apache.spark.sql.execution.streaming.CarbonAppendableStreamSink$$anonfun$writeDataFileJob$1$$anonfun$apply$mcV$sp$1.apply(CarbonAppendableStreamSink.scala:227)
      at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
      at org.apache.spark.scheduler.Task.run(Task.scala:108)
      at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:338)
      at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
      at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
      at java.lang.Thread.run(Thread.java:745)
      Caused by: java.lang.NullPointerException
      at org.apache.spark.sql.catalyst.InternalRow.getString(InternalRow.scala:32)
      at org.apache.carbondata.streaming.parser.CSVStreamParserImp.parserRow(CSVStreamParserImp.java:40)
      at org.apache.spark.sql.execution.streaming.CarbonAppendableStreamSink$InputIterator.next(CarbonAppendableStreamSink.scala:337)
      at org.apache.spark.sql.execution.streaming.CarbonAppendableStreamSink$InputIterator.next(CarbonAppendableStreamSink.scala:331)
      at org.apache.carbondata.streaming.segment.StreamSegment.appendBatchData(StreamSegment.java:244)
      at org.apache.spark.sql.execution.streaming.CarbonAppendableStreamSink$$anonfun$writeDataFileTask$1.apply$mcV$sp(CarbonAppendableStreamSink.scala:315)
      at org.apache.spark.sql.execution.streaming.CarbonAppendableStreamSink$$anonfun$writeDataFileTask$1.apply(CarbonAppendableStreamSink.scala:305)
      at org.apache.spark.sql.execution.streaming.CarbonAppendableStreamSink$$anonfun$writeDataFileTask$1.apply(CarbonAppendableStreamSink.scala:305)
      at org.apache.spark.util.Utils$.tryWithSafeFinallyAndFailureCallbacks(Utils.scala:1371)
      at org.apache.spark.sql.execution.streaming.CarbonAppendableStreamSink$.writeDataFileTask(CarbonAppendableStreamSink.scala:317)
      ... 8 more
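
      The trace points at CSVStreamParserImp.parserRow (line 40), which calls InternalRow.getString on the incoming row; in Spark 2.2 getString is getUTF8String(ordinal).toString, so it throws NullPointerException when the column value is null. That suggests the first column of some row came through as null (for example, a value that failed to parse as int under the supplied schema), and more generally that the default CSV parser is being handed typed rows it does not expect. A possible workaround, sketched here under the assumption that the CarbonStreamParser row-parser constants are available in the fixed releases (1.3.1/1.4.0), is to select the row-format stream parser instead of the default CSV parser:

      // Sketch only: ask the sink to convert typed rows directly;
      // "/stream/uniq_typed" is a hypothetical checkpoint path.
      import org.apache.carbondata.streaming.parser.CarbonStreamParser
      val typedQry = streamDf.writeStream.format("carbondata")
        .trigger(ProcessingTime("5 seconds"))
        .option("checkpointLocation", "/stream/uniq_typed")
        .option("dbName", "default")
        .option("tableName", "uniqdata_stream")
        .option(CarbonStreamParser.CARBON_STREAM_PARSER,
          CarbonStreamParser.CARBON_STREAM_PARSER_ROW_PARSER)
        .start()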


            People

              Assignee: Unassigned
              Reporter: Vandana Yadav (Vandana7)
              Votes: 0
              Watchers: 2


                Time Tracking

                  Estimated: Not Specified
                  Remaining: 0h
                  Logged: 10m