CarbonData / CARBONDATA-2198

Streaming data to a table with bad_records_action as IGNORE throws ClassCastException


Details

    • Type: Bug
    • Status: Resolved
    • Priority: Minor
    • Resolution: Fixed
    • Affects Version/s: 1.4.0
    • Fix Version/s: 1.4.0
    • Component/s: data-load
    • Labels: None

    Description

      Steps to reproduce:

/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.carbondata.examples

import java.io.{File, PrintWriter}
import java.net.ServerSocket

import org.apache.spark.sql.{CarbonEnv, SparkSession}
import org.apache.spark.sql.streaming.{ProcessingTime, StreamingQuery}

import org.apache.carbondata.core.constants.CarbonCommonConstants
import org.apache.carbondata.core.util.CarbonProperties
import org.apache.carbondata.core.util.path.{CarbonStorePath, CarbonTablePath}

// scalastyle:off println
object CarbonStructuredStreamingExample {
  def main(args: Array[String]) {

    // setup paths
    val rootPath = new File(this.getClass.getResource("/").getPath
      + "../../../..").getCanonicalPath
    val storeLocation = s"$rootPath/examples/spark2/target/store"
    val warehouse = s"$rootPath/examples/spark2/target/warehouse"
    val metastoredb = s"$rootPath/examples/spark2/target"
    val streamTableName = s"stream_table"

    CarbonProperties.getInstance()
      .addProperty(CarbonCommonConstants.CARBON_TIMESTAMP_FORMAT, "yyyy/MM/dd")

    import org.apache.spark.sql.CarbonSession._
    val spark = SparkSession
      .builder()
      .master("local")
      .appName("CarbonStructuredStreamingExample")
      .config("spark.sql.warehouse.dir", warehouse)
      .getOrCreateCarbonSession(storeLocation, metastoredb)

    spark.sparkContext.setLogLevel("ERROR")

    val requireCreateTable = true
    val useComplexDataType = false

    if (requireCreateTable) {
      // drop table if exists previously
      spark.sql(s"DROP TABLE IF EXISTS ${ streamTableName }")
      // Create target carbon table and populate with initial data
      if (useComplexDataType) {
        spark.sql(
          s"""
             | CREATE TABLE ${ streamTableName }(
             | id INT,
             | name STRING,
             | city STRING,
             | salary FLOAT,
             | file struct<school:array<string>, age:int>
             | )
             | STORED BY 'carbondata'
             | TBLPROPERTIES(
             | 'streaming'='true', 'sort_columns'='name', 'dictionary_include'='city')
             | """.stripMargin)
      } else {
        spark.sql(
          s"""
             | CREATE TABLE ${ streamTableName }(
             | id INT,
             | name STRING,
             | city STRING,
             | salary FLOAT
             | )
             | STORED BY 'carbondata'
             | TBLPROPERTIES(
             | 'streaming'='true', 'sort_columns'='name')
             | """.stripMargin)
      }

      val carbonTable = CarbonEnv.getCarbonTable(Some("default"), streamTableName)(spark)
      val tablePath = CarbonStorePath.getCarbonTablePath(carbonTable.getAbsoluteTableIdentifier)

      // streaming ingest
      val serverSocket = new ServerSocket(7071)
      val thread1 = startStreaming(spark, tablePath)
      val thread2 = writeSocket(serverSocket)

      System.out.println("type enter to interrupt streaming")
      System.in.read()
      thread1.interrupt()
      thread2.interrupt()
      serverSocket.close()
    }

    spark.sql(s"select * from $streamTableName").show
    spark.stop()
    System.out.println("streaming finished")
  }

  def showTableCount(spark: SparkSession, tableName: String): Thread = {
    val thread = new Thread() {
      override def run(): Unit = {
        for (_ <- 0 to 1000) {
          spark.sql(s"select count(*) from $tableName").show(truncate = false)
          Thread.sleep(1000 * 3)
        }
      }
    }
    thread.start()
    thread
  }

  def startStreaming(spark: SparkSession, tablePath: CarbonTablePath): Thread = {
    val thread = new Thread() {
      override def run(): Unit = {
        var qry: StreamingQuery = null
        try {
          val readSocketDF = spark.readStream
            .format("socket")
            .option("host", "localhost")
            .option("port", 7071)
            .load()

          // Write data from socket stream to carbondata file
          qry = readSocketDF.writeStream
            .format("carbondata")
            .trigger(ProcessingTime("5 seconds"))
            .option("checkpointLocation", tablePath.getStreamingCheckpointDir)
            .option("bad_records_action", "ignore")
            .option("dbName", "default")
            .option("tableName", "stream_table")
            .start()

          qry.awaitTermination()
        } catch {
          case ex: Exception =>
            ex.printStackTrace()
            println("Done reading and writing streaming data")
        } finally {
          qry.stop()
        }
      }
    }
    thread.start()
    thread
  }

  def writeSocket(serverSocket: ServerSocket): Thread = {
    val thread = new Thread() {
      override def run(): Unit = {
        // wait for a client connection request and accept it
        val clientSocket = serverSocket.accept()
        val socketWriter = new PrintWriter(clientSocket.getOutputStream())
        var index = 0
        for (_ <- 1 to 1000) {
          // write a batch of records per iteration; the first field (the INT id column)
          // is the literal string "null", so every row is a bad record
          for (_ <- 0 to 1000) {
            index = index + 1
            socketWriter.println("null" + ",name_" + index + ",city_" + index + "," +
              (index * 10000.00).toString + ",school_" + index + ":school_" + index + index +
              "$" + index)
          }
          socketWriter.flush()
          Thread.sleep(1000)
        }
        socketWriter.close()
        System.out.println("Socket closed")
      }
    }
    thread.start()
    thread
  }
}
// scalastyle:on println

In the example above, data is streamed into the table with bad_records_action set to IGNORE; instead of silently skipping the bad records, the streaming load fails with a ClassCastException.
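Every row written by writeSocket carries the literal string "null" in the first field, which maps to the INT id column, so every row is a bad record. The exception text in the logs ("java.lang.String cannot be cast to java.lang.Double") is the plain JVM failure that occurs when a string field is cast to a numeric column type. The following standalone sketch (the object name is illustrative, not CarbonData code) only reproduces that cast pattern; the real cast happens inside CarbonStreamRecordWriter.write, as shown at line 241 of the trace below:

// Standalone sketch: casting a String value to java.lang.Double throws the
// same ClassCastException reported in the logs. This illustrates the
// JVM-level failure only, not CarbonData's internal bad-record handling.
object CastFailureSketch {
  def main(args: Array[String]): Unit = {
    // the socket writer sends the literal string "null" for a numeric column
    val field: Any = "null"
    try {
      // throws: java.lang.ClassCastException: java.lang.String cannot be cast to java.lang.Double
      val d = field.asInstanceOf[java.lang.Double]
      println(d)
    } catch {
      case e: ClassCastException => e.printStackTrace()
    }
  }
}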

       

      Here are the logs:

      18/02/23 16:09:50 ERROR StreamSegment: Executor task launch worker-0 Failed to append batch data to stream segment: /home/geetika/Workspace/incubator-carbondata/examples/spark2/target/store/default/stream_table/Fact/Part0/Segment_0
      java.lang.ClassCastException: java.lang.String cannot be cast to java.lang.Double
      at org.apache.carbondata.hadoop.streaming.CarbonStreamRecordWriter.write(CarbonStreamRecordWriter.java:241)
      at org.apache.carbondata.streaming.segment.StreamSegment.appendBatchData(StreamSegment.java:244)
      at org.apache.spark.sql.execution.streaming.CarbonAppendableStreamSink$$anonfun$writeDataFileTask$1.apply$mcV$sp(CarbonAppendableStreamSink.scala:315)
      at org.apache.spark.sql.execution.streaming.CarbonAppendableStreamSink$$anonfun$writeDataFileTask$1.apply(CarbonAppendableStreamSink.scala:305)
      at org.apache.spark.sql.execution.streaming.CarbonAppendableStreamSink$$anonfun$writeDataFileTask$1.apply(CarbonAppendableStreamSink.scala:305)
      at org.apache.spark.util.Utils$.tryWithSafeFinallyAndFailureCallbacks(Utils.scala:1341)
      at org.apache.spark.sql.execution.streaming.CarbonAppendableStreamSink$.writeDataFileTask(CarbonAppendableStreamSink.scala:317)
      at org.apache.spark.sql.execution.streaming.CarbonAppendableStreamSink$$anonfun$writeDataFileJob$1$$anonfun$apply$mcV$sp$1.apply(CarbonAppendableStreamSink.scala:228)
      at org.apache.spark.sql.execution.streaming.CarbonAppendableStreamSink$$anonfun$writeDataFileJob$1$$anonfun$apply$mcV$sp$1.apply(CarbonAppendableStreamSink.scala:227)
      at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
      at org.apache.spark.scheduler.Task.run(Task.scala:99)
      at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:282)
      at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
      at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
      at java.lang.Thread.run(Thread.java:748)
      18/02/23 16:09:50 ERROR Utils: Aborting task
      java.lang.ClassCastException: java.lang.String cannot be cast to java.lang.Double
      at org.apache.carbondata.hadoop.streaming.CarbonStreamRecordWriter.write(CarbonStreamRecordWriter.java:241)
      at org.apache.carbondata.streaming.segment.StreamSegment.appendBatchData(StreamSegment.java:244)
      at org.apache.spark.sql.execution.streaming.CarbonAppendableStreamSink$$anonfun$writeDataFileTask$1.apply$mcV$sp(CarbonAppendableStreamSink.scala:315)
      at org.apache.spark.sql.execution.streaming.CarbonAppendableStreamSink$$anonfun$writeDataFileTask$1.apply(CarbonAppendableStreamSink.scala:305)
      at org.apache.spark.sql.execution.streaming.CarbonAppendableStreamSink$$anonfun$writeDataFileTask$1.apply(CarbonAppendableStreamSink.scala:305)
      at org.apache.spark.util.Utils$.tryWithSafeFinallyAndFailureCallbacks(Utils.scala:1341)
      at org.apache.spark.sql.execution.streaming.CarbonAppendableStreamSink$.writeDataFileTask(CarbonAppendableStreamSink.scala:317)
      at org.apache.spark.sql.execution.streaming.CarbonAppendableStreamSink$$anonfun$writeDataFileJob$1$$anonfun$apply$mcV$sp$1.apply(CarbonAppendableStreamSink.scala:228)
      at org.apache.spark.sql.execution.streaming.CarbonAppendableStreamSink$$anonfun$writeDataFileJob$1$$anonfun$apply$mcV$sp$1.apply(CarbonAppendableStreamSink.scala:227)
      at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
      at org.apache.spark.scheduler.Task.run(Task.scala:99)
      at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:282)
      at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
      at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
      at java.lang.Thread.run(Thread.java:748)
      18/02/23 16:09:50 ERROR CarbonAppendableStreamSink$: Executor task launch worker-0 Job job_20180223160950_0000 aborted.
      18/02/23 16:09:50 ERROR Executor: Exception in task 0.0 in stage 0.0 (TID 0)
      org.apache.carbondata.streaming.CarbonStreamException: Task failed while writing rows
      at org.apache.spark.sql.execution.streaming.CarbonAppendableStreamSink$.writeDataFileTask(CarbonAppendableStreamSink.scala:324)
      at org.apache.spark.sql.execution.streaming.CarbonAppendableStreamSink$$anonfun$writeDataFileJob$1$$anonfun$apply$mcV$sp$1.apply(CarbonAppendableStreamSink.scala:228)
      at org.apache.spark.sql.execution.streaming.CarbonAppendableStreamSink$$anonfun$writeDataFileJob$1$$anonfun$apply$mcV$sp$1.apply(CarbonAppendableStreamSink.scala:227)
      at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
      at org.apache.spark.scheduler.Task.run(Task.scala:99)
      at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:282)
      at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
      at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
      at java.lang.Thread.run(Thread.java:748)
      Caused by: java.lang.ClassCastException: java.lang.String cannot be cast to java.lang.Double
      at org.apache.carbondata.hadoop.streaming.CarbonStreamRecordWriter.write(CarbonStreamRecordWriter.java:241)
      at org.apache.carbondata.streaming.segment.StreamSegment.appendBatchData(StreamSegment.java:244)
      at org.apache.spark.sql.execution.streaming.CarbonAppendableStreamSink$$anonfun$writeDataFileTask$1.apply$mcV$sp(CarbonAppendableStreamSink.scala:315)
      at org.apache.spark.sql.execution.streaming.CarbonAppendableStreamSink$$anonfun$writeDataFileTask$1.apply(CarbonAppendableStreamSink.scala:305)
      at org.apache.spark.sql.execution.streaming.CarbonAppendableStreamSink$$anonfun$writeDataFileTask$1.apply(CarbonAppendableStreamSink.scala:305)
      at org.apache.spark.util.Utils$.tryWithSafeFinallyAndFailureCallbacks(Utils.scala:1341)
      at org.apache.spark.sql.execution.streaming.CarbonAppendableStreamSink$.writeDataFileTask(CarbonAppendableStreamSink.scala:317)
      ... 8 more
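For reference, with bad_records_action set to IGNORE the expected outcome is that the offending rows are simply not loaded and the streaming query keeps running. A quick check once a batch has been processed (a minimal sketch, assuming the default database and the stream_table name used in the example) would be:

// Verification sketch: after a batch completes, the sink should raise no
// exception and the bad rows should be absent from the table.
spark.sql("SELECT count(*) FROM stream_table").show(truncate = false)
spark.sql("SELECT * FROM stream_table").show(20, truncate = false)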

People

Assignee: anubhav tarar
Reporter: Geetika Gupta

Time Tracking

Original Estimate: Not Specified
Remaining Estimate: 0h
Time Spent: 3.5h