CarbonData / CARBONDATA-1781

(Carbon1.3.0 - Streaming) Select * and select column queries fail but select count(*) succeeds when the .streaming file is removed from HDFS


Details

    • Type: Bug
    • Status: Resolved
    • Priority: Major
    • Resolution: Fixed
    • Affects Version/s: 1.3.0
    • Fix Version/s: 1.3.0
    • Component/s: data-query
    • Environment: 3 node ant cluster

    Description

      Steps:
      Thrift server is started using the command - bin/spark-submit --master yarn-client --executor-memory 10G --executor-cores 5 --driver-memory 5G --num-executors 3 --class org.apache.carbondata.spark.thriftserver.CarbonThriftServer /srv/spark2.2Bigdata/install/spark/sparkJdbc/carbonlib/carbondata_2.11-1.3.0-SNAPSHOT-shade-hadoop2.7.2.jar "hdfs://hacluster/user/hive/warehouse/carbon.store"

      Spark shell is opened using the command - bin/spark-shell --master yarn-client --executor-memory 10G --executor-cores 5 --driver-memory 5G --num-executors 3 --jars /srv/spark2.2Bigdata/install/spark/sparkJdbc/carbonlib/carbondata_2.11-1.3.0-SNAPSHOT-shade-hadoop2.7.2.jar

      From the spark shell the following code is executed -
      import java.io.{File, PrintWriter}
      import java.net.ServerSocket

      import org.apache.spark.sql.{CarbonEnv, SparkSession}
      import org.apache.spark.sql.hive.CarbonRelation
      import org.apache.spark.sql.streaming.{ProcessingTime, StreamingQuery}

      import org.apache.carbondata.core.constants.CarbonCommonConstants
      import org.apache.carbondata.core.util.CarbonProperties
      import org.apache.carbondata.core.util.path.{CarbonStorePath, CarbonTablePath}

      CarbonProperties.getInstance().addProperty(CarbonCommonConstants.CARBON_TIMESTAMP_FORMAT, "yyyy/MM/dd")

      import org.apache.spark.sql.CarbonSession._

      val carbonSession = SparkSession.
      builder().
      appName("StreamExample").
      getOrCreateCarbonSession("hdfs://hacluster/user/hive/warehouse/carbon.store")

      carbonSession.sparkContext.setLogLevel("INFO")

      def sql(sql: String) = carbonSession.sql(sql)

      def writeSocket(serverSocket: ServerSocket): Thread = {
        val thread = new Thread() {
          override def run(): Unit = {
            // wait for the client's connection request and accept it
            val clientSocket = serverSocket.accept()
            val socketWriter = new PrintWriter(clientSocket.getOutputStream())
            var index = 0
            for (_ <- 1 to 1000) {
              // write 101 records per iteration, then pause 2 seconds
              for (_ <- 0 to 100) {
                index = index + 1
                socketWriter.println(index.toString + ",name_" + index + ",city_" + index + "," +
                  (index * 10000.00).toString + ",school_" + index + ":school_" + index + index +
                  "$" + index)
              }
              socketWriter.flush()
              Thread.sleep(2000)
            }
            socketWriter.close()
            System.out.println("Socket closed")
          }
        }
        thread.start()
        thread
      }

      def startStreaming(spark: SparkSession, tablePath: CarbonTablePath, tableName: String, port: Int): Thread = {
        val thread = new Thread() {
          override def run(): Unit = {
            var qry: StreamingQuery = null
            try {
              val readSocketDF = spark.readStream
                .format("socket")
                .option("host", "10.18.98.34")
                .option("port", port)
                .load()
              qry = readSocketDF.writeStream
                .format("carbondata")
                .trigger(ProcessingTime("5 seconds"))
                .option("checkpointLocation", tablePath.getStreamingCheckpointDir)
                .option("tablePath", tablePath.getPath)
                .option("tableName", tableName)
                .start()
              qry.awaitTermination()
            } catch {
              case ex: Throwable =>
                ex.printStackTrace()
                println("Done reading and writing streaming data")
            } finally {
              // guard against NPE when the query never started
              if (qry != null) {
                qry.stop()
              }
            }
          }
        }
        thread.start()
        thread
      }

      val streamTableName = "brinjal"

      sql(s"drop table brinjal").show

      sql(s"create table brinjal (imei string,AMSize string,channelsId string,ActiveCountry string, Activecity string,gamePointId double,deviceInformationId double,productionDate Timestamp,deliveryDate timestamp,deliverycharge double) STORED BY 'org.apache.carbondata.format' TBLPROPERTIES('streaming'='true','table_blocksize'='1')")

      sql(s"LOAD DATA INPATH 'hdfs://hacluster/chetan/vardhandaterestruct.csv' INTO TABLE brinjal OPTIONS('DELIMITER'=',', 'QUOTECHAR'= '','BAD_RECORDS_ACTION'='FORCE','FILEHEADER'= 'imei,deviceInformationId,AMSize,channelsId,ActiveCountry,Activecity,gamePointId,productionDate,deliveryDate,deliverycharge')")

      val carbonTable = CarbonEnv.getInstance(carbonSession).carbonMetastore.
      lookupRelation(Some("default"), streamTableName)(carbonSession).asInstanceOf[CarbonRelation].carbonTable

      val tablePath = CarbonStorePath.getCarbonTablePath(carbonTable.getAbsoluteTableIdentifier)

      val port = 8002
      val serverSocket = new ServerSocket(port)
      val socketThread = writeSocket(serverSocket)
      val streamingThread = startStreaming(carbonSession, tablePath, streamTableName, port)
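
      While the socket and streaming threads run, ingestion can be spot-checked from the same shell. This is an optional verification step, not part of the original report; it reuses the sql() helper defined above:

      // Optional spot checks while the streaming ingest is running.
      sql("show segments for table brinjal").show()
      sql("select count(*) from brinjal").show()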

      From another terminal the user deletes the .streaming file - BLR1000014307:/srv/spark2.2Bigdata/install/hadoop/datanode # bin/hadoop fs -rm -r /user/hive/warehouse/carbon.store/default/brinjal/.streaming
      17/11/20 19:02:11 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
      Deleted /user/hive/warehouse/carbon.store/default/brinjal/.streaming
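
      Optionally, the removal can be confirmed with a directory listing (a verification step not in the original report):

      bin/hadoop fs -ls /user/hive/warehouse/carbon.store/default/brinjal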

      The thrift server spark-submit process is also killed while streaming is in progress.

      From a beeline session the user executes the select *, select column and select count(*) queries, as shown below.
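
      For reference, such a beeline session can be opened against the thrift server as follows (a sketch assuming the standard beeline client shipped with Spark; host and port are taken from the JDBC URL in the output below):

      bin/beeline -u jdbc:hive2://10.18.98.34:23040
      select * from brinjal;
      select imei from brinjal;
      select count(*) from brinjal;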

      Issue: The select * and select column queries fail with an exception, while select count(*) returns the result set.

      0: jdbc:hive2://10.18.98.34:23040> select * from brinjal;
      Error: org.apache.spark.SparkException: Job aborted due to stage failure: Task 4 in stage 17.0 failed 4 times, most recent failure: Lost task 4.3 in stage 17.0 (TID 120, BLR1000014307, executor 12): java.io.EOFException
      at org.apache.carbondata.hadoop.streaming.StreamBlockletReader.readBytesFromStream(StreamBlockletReader.java:182)
      at org.apache.carbondata.hadoop.streaming.StreamBlockletReader.readBlockletData(StreamBlockletReader.java:116)
      at org.apache.carbondata.hadoop.streaming.CarbonStreamRecordReader.scanBlockletAndFillVector(CarbonStreamRecordReader.java:406)
      at org.apache.carbondata.hadoop.streaming.CarbonStreamRecordReader.nextColumnarBatch(CarbonStreamRecordReader.java:317)
      at org.apache.carbondata.hadoop.streaming.CarbonStreamRecordReader.nextKeyValue(CarbonStreamRecordReader.java:298)
      at org.apache.carbondata.spark.rdd.CarbonScanRDD$$anon$1.hasNext(CarbonScanRDD.scala:298)
      at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.scan_nextBatch$(Unknown Source)
      at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.processNext(Unknown Source)
      at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
      at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$8$$anon$1.hasNext(WholeStageCodegenExec.scala:377)
      at org.apache.spark.sql.execution.SparkPlan$$anonfun$2.apply(SparkPlan.scala:231)
      at org.apache.spark.sql.execution.SparkPlan$$anonfun$2.apply(SparkPlan.scala:225)
      at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$25.apply(RDD.scala:826)
      at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$25.apply(RDD.scala:826)
      at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
      at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
      at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
      at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
      at org.apache.spark.scheduler.Task.run(Task.scala:99)
      at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:282)
      at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
      at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
      at java.lang.Thread.run(Thread.java:745)

      Driver stacktrace: (state=,code=0)
      0: jdbc:hive2://10.18.98.34:23040> select imei from brinjal;
      Error: org.apache.spark.SparkException: Job aborted due to stage failure: Task 5 in stage 18.0 failed 4 times, most recent failure: Lost task 5.3 in stage 18.0 (TID 136, BLR1000014307, executor 13): java.io.EOFException
      at org.apache.carbondata.hadoop.streaming.StreamBlockletReader.readBytesFromStream(StreamBlockletReader.java:182)
      at org.apache.carbondata.hadoop.streaming.StreamBlockletReader.readBlockletData(StreamBlockletReader.java:116)
      at org.apache.carbondata.hadoop.streaming.CarbonStreamRecordReader.scanBlockletAndFillVector(CarbonStreamRecordReader.java:406)
      at org.apache.carbondata.hadoop.streaming.CarbonStreamRecordReader.nextColumnarBatch(CarbonStreamRecordReader.java:317)
      at org.apache.carbondata.hadoop.streaming.CarbonStreamRecordReader.nextKeyValue(CarbonStreamRecordReader.java:298)
      at org.apache.carbondata.spark.rdd.CarbonScanRDD$$anon$1.hasNext(CarbonScanRDD.scala:298)
      at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.scan_nextBatch$(Unknown Source)
      at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.processNext(Unknown Source)
      at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
      at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$8$$anon$1.hasNext(WholeStageCodegenExec.scala:377)
      at org.apache.spark.sql.execution.SparkPlan$$anonfun$2.apply(SparkPlan.scala:231)
      at org.apache.spark.sql.execution.SparkPlan$$anonfun$2.apply(SparkPlan.scala:225)
      at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$25.apply(RDD.scala:826)
      at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$25.apply(RDD.scala:826)
      at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
      at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
      at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
      at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
      at org.apache.spark.scheduler.Task.run(Task.scala:99)
      at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:282)
      at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
      at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
      at java.lang.Thread.run(Thread.java:745)

      Driver stacktrace: (state=,code=0)

      0: jdbc:hive2://10.18.98.34:23040> select count(*) from brinjal;
      +-----------+
      | count(1)  |
      +-----------+
      | 11916     |
      +-----------+
      1 row selected (0.545 seconds)

      Expected: The behavior of select * and select column queries should be consistent with the select count(*) behavior on a streaming table when the .streaming file is deleted while streaming is in progress.
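
      As a diagnostic aid (not part of the original report), the presence of the .streaming directory can be checked from the same spark shell session before issuing row-level queries. This is a minimal sketch using the standard Hadoop FileSystem API; the cluster URI and table path are taken from the reproduction steps above:

      import java.net.URI
      import org.apache.hadoop.conf.Configuration
      import org.apache.hadoop.fs.{FileSystem, Path}

      // Check whether the streaming segment metadata still exists before querying.
      // URI and path come from the steps above; adjust for a different cluster or store.
      val fs = FileSystem.get(new URI("hdfs://hacluster"), new Configuration())
      val streamingDir = new Path("/user/hive/warehouse/carbon.store/default/brinjal/.streaming")
      if (fs.exists(streamingDir)) {
        sql("select * from brinjal").show()
      } else {
        // In this state row-level scans hit the EOFException above, while count(*) still succeeds.
        println(".streaming is missing - select * / select column are expected to fail")
      }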

      People

        • Assignee: Unassigned
        • Reporter: Chetan Bhat (chetdb)
      Time Tracking

        • Original Estimate: Not Specified
        • Remaining Estimate: 0h
        • Time Spent: 1h