Details
Description
A NullPointerException is thrown while loading CSV data into a streaming table.
Steps to reproduce:
1) Start spark-shell:
./spark-shell --jars /opt/spark/spark-2.2.1/carbonlib/carbondata_2.11-1.3.0-SNAPSHOT-shade-hadoop2.7.2.jar
2) Execute the following script:
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.CarbonSession._
import org.apache.carbondata.core.util.CarbonProperties
import org.apache.spark.sql.streaming.{ProcessingTime, StreamingQuery}
val carbon = SparkSession.builder().config(sc.getConf) .getOrCreateCarbonSession("hdfs://localhost:54310/newCarbonStore","/tmp")
import org.apache.carbondata.core.constants.CarbonCommonConstants
import org.apache.carbondata.core.util.CarbonProperties
CarbonProperties.getInstance().addProperty(CarbonCommonConstants.CARBON_BAD_RECORDS_ACTION, "FORCE")
carbon.sql("drop table if exists uniqdata_stream")
carbon.sql("create table uniqdata_stream(CUST_ID int,CUST_NAME String,DOB timestamp,DOJ timestamp, BIGINT_COLUMN1 bigint,BIGINT_COLUMN2 bigint,DECIMAL_COLUMN1 decimal(30,10),DECIMAL_COLUMN2 decimal(36,10),Double_COLUMN1 double, Double_COLUMN2 double,INTEGER_COLUMN1 int) STORED BY 'org.apache.carbondata.format' TBLPROPERTIES ('TABLE_BLOCKSIZE'= '256 MB', 'streaming'='true')");
import carbon.sqlContext.implicits._
import org.apache.spark.sql.types._
val uniqdataSch = StructType(
Array(StructField("CUST_ID", IntegerType),StructField("CUST_NAME", StringType),StructField("DOB", TimestampType), StructField("DOJ", TimestampType), StructField("BIGINT_COLUMN1", LongType), StructField("BIGINT_COLUMN2", LongType), StructField("DECIMAL_COLUMN1", org.apache.spark.sql.types.DecimalType(30, 10)), StructField("DECIMAL_COLUMN2", org.apache.spark.sql.types.DecimalType(36,10)), StructField("Double_COLUMN1", DoubleType), StructField("Double_COLUMN2", DoubleType), StructField("INTEGER_COLUMN1", IntegerType)))
val streamDf = carbon.readStream
.schema(uniqdataSch)
.option("sep", ",")
.csv("file:///home/knoldus/Documents/uniqdata")
val qry = streamDf.writeStream.format("carbondata").trigger(ProcessingTime("5 seconds"))
.option("checkpointLocation","/stream/uniq")
.option("dbName", "default")
.option("tableName", "uniqdata_stream")
.start()
3) Error logs:
warning: there was one deprecation warning; re-run with -deprecation for details
uniqdataSch: org.apache.spark.sql.types.StructType = StructType(StructField(CUST_ID,IntegerType,true), StructField(CUST_NAME,StringType,true), StructField(DOB,TimestampType,true), StructField(DOJ,TimestampType,true), StructField(BIGINT_COLUMN1,LongType,true), StructField(BIGINT_COLUMN2,LongType,true), StructField(DECIMAL_COLUMN1,DecimalType(30,10),true), StructField(DECIMAL_COLUMN2,DecimalType(36,10),true), StructField(Double_COLUMN1,DoubleType,true), StructField(Double_COLUMN2,DoubleType,true), StructField(INTEGER_COLUMN1,IntegerType,true))
streamDf: org.apache.spark.sql.DataFrame = [CUST_ID: int, CUST_NAME: string ... 9 more fields]
qry: org.apache.spark.sql.streaming.StreamingQuery = org.apache.spark.sql.execution.streaming.StreamingQueryWrapper@d0e155c
scala> 18/02/08 16:38:53 ERROR StreamSegment: Executor task launch worker for task 5 Failed to append batch data to stream segment: hdfs://localhost:54310/newCarbonStore/default/uniqdata_stream1/Fact/Part0/Segment_0
java.lang.NullPointerException
at org.apache.spark.sql.catalyst.InternalRow.getString(InternalRow.scala:32)
at org.apache.carbondata.streaming.parser.CSVStreamParserImp.parserRow(CSVStreamParserImp.java:40)
at org.apache.spark.sql.execution.streaming.CarbonAppendableStreamSink$InputIterator.next(CarbonAppendableStreamSink.scala:337)
at org.apache.spark.sql.execution.streaming.CarbonAppendableStreamSink$InputIterator.next(CarbonAppendableStreamSink.scala:331)
at org.apache.carbondata.streaming.segment.StreamSegment.appendBatchData(StreamSegment.java:244)
at org.apache.spark.sql.execution.streaming.CarbonAppendableStreamSink$$anonfun$writeDataFileTask$1.apply$mcV$sp(CarbonAppendableStreamSink.scala:315)
at org.apache.spark.sql.execution.streaming.CarbonAppendableStreamSink$$anonfun$writeDataFileTask$1.apply(CarbonAppendableStreamSink.scala:305)
at org.apache.spark.sql.execution.streaming.CarbonAppendableStreamSink$$anonfun$writeDataFileTask$1.apply(CarbonAppendableStreamSink.scala:305)
at org.apache.spark.util.Utils$.tryWithSafeFinallyAndFailureCallbacks(Utils.scala:1371)
at org.apache.spark.sql.execution.streaming.CarbonAppendableStreamSink$.writeDataFileTask(CarbonAppendableStreamSink.scala:317)
at org.apache.spark.sql.execution.streaming.CarbonAppendableStreamSink$$anonfun$writeDataFileJob$1$$anonfun$apply$mcV$sp$1.apply(CarbonAppendableStreamSink.scala:228)
at org.apache.spark.sql.execution.streaming.CarbonAppendableStreamSink$$anonfun$writeDataFileJob$1$$anonfun$apply$mcV$sp$1.apply(CarbonAppendableStreamSink.scala:227)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
at org.apache.spark.scheduler.Task.run(Task.scala:108)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:338)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)
18/02/08 16:38:53 ERROR Utils: Aborting task
java.lang.NullPointerException
at org.apache.spark.sql.catalyst.InternalRow.getString(InternalRow.scala:32)
at org.apache.carbondata.streaming.parser.CSVStreamParserImp.parserRow(CSVStreamParserImp.java:40)
at org.apache.spark.sql.execution.streaming.CarbonAppendableStreamSink$InputIterator.next(CarbonAppendableStreamSink.scala:337)
at org.apache.spark.sql.execution.streaming.CarbonAppendableStreamSink$InputIterator.next(CarbonAppendableStreamSink.scala:331)
at org.apache.carbondata.streaming.segment.StreamSegment.appendBatchData(StreamSegment.java:244)
at org.apache.spark.sql.execution.streaming.CarbonAppendableStreamSink$$anonfun$writeDataFileTask$1.apply$mcV$sp(CarbonAppendableStreamSink.scala:315)
at org.apache.spark.sql.execution.streaming.CarbonAppendableStreamSink$$anonfun$writeDataFileTask$1.apply(CarbonAppendableStreamSink.scala:305)
at org.apache.spark.sql.execution.streaming.CarbonAppendableStreamSink$$anonfun$writeDataFileTask$1.apply(CarbonAppendableStreamSink.scala:305)
at org.apache.spark.util.Utils$.tryWithSafeFinallyAndFailureCallbacks(Utils.scala:1371)
at org.apache.spark.sql.execution.streaming.CarbonAppendableStreamSink$.writeDataFileTask(CarbonAppendableStreamSink.scala:317)
at org.apache.spark.sql.execution.streaming.CarbonAppendableStreamSink$$anonfun$writeDataFileJob$1$$anonfun$apply$mcV$sp$1.apply(CarbonAppendableStreamSink.scala:228)
at org.apache.spark.sql.execution.streaming.CarbonAppendableStreamSink$$anonfun$writeDataFileJob$1$$anonfun$apply$mcV$sp$1.apply(CarbonAppendableStreamSink.scala:227)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
at org.apache.spark.scheduler.Task.run(Task.scala:108)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:338)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)
18/02/08 16:38:53 ERROR CarbonAppendableStreamSink$: Executor task launch worker for task 5 Job job_20180208163853_0005 aborted.
18/02/08 16:38:53 ERROR Executor: Exception in task 0.0 in stage 5.0 (TID 5)
org.apache.carbondata.streaming.CarbonStreamException: Task failed while writing rows
at org.apache.spark.sql.execution.streaming.CarbonAppendableStreamSink$.writeDataFileTask(CarbonAppendableStreamSink.scala:324)
at org.apache.spark.sql.execution.streaming.CarbonAppendableStreamSink$$anonfun$writeDataFileJob$1$$anonfun$apply$mcV$sp$1.apply(CarbonAppendableStreamSink.scala:228)
at org.apache.spark.sql.execution.streaming.CarbonAppendableStreamSink$$anonfun$writeDataFileJob$1$$anonfun$apply$mcV$sp$1.apply(CarbonAppendableStreamSink.scala:227)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
at org.apache.spark.scheduler.Task.run(Task.scala:108)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:338)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)
Caused by: java.lang.NullPointerException
at org.apache.spark.sql.catalyst.InternalRow.getString(InternalRow.scala:32)
at org.apache.carbondata.streaming.parser.CSVStreamParserImp.parserRow(CSVStreamParserImp.java:40)
at org.apache.spark.sql.execution.streaming.CarbonAppendableStreamSink$InputIterator.next(CarbonAppendableStreamSink.scala:337)
at org.apache.spark.sql.execution.streaming.CarbonAppendableStreamSink$InputIterator.next(CarbonAppendableStreamSink.scala:331)
at org.apache.carbondata.streaming.segment.StreamSegment.appendBatchData(StreamSegment.java:244)
at org.apache.spark.sql.execution.streaming.CarbonAppendableStreamSink$$anonfun$writeDataFileTask$1.apply$mcV$sp(CarbonAppendableStreamSink.scala:315)
at org.apache.spark.sql.execution.streaming.CarbonAppendableStreamSink$$anonfun$writeDataFileTask$1.apply(CarbonAppendableStreamSink.scala:305)
at org.apache.spark.sql.execution.streaming.CarbonAppendableStreamSink$$anonfun$writeDataFileTask$1.apply(CarbonAppendableStreamSink.scala:305)
at org.apache.spark.util.Utils$.tryWithSafeFinallyAndFailureCallbacks(Utils.scala:1371)
at org.apache.spark.sql.execution.streaming.CarbonAppendableStreamSink$.writeDataFileTask(CarbonAppendableStreamSink.scala:317)
... 8 more
Attachments
Issue Links
- links to