CarbonData / CARBONDATA-2003

Streaming table is not updated on second streaming load


Details

    • Type: Bug
    • Status: Open
    • Priority: Major
    • Resolution: Unresolved
    • Affects Version/s: 1.3.0
    • Fix Version/s: None
    • Component/s: data-load
    • Labels: None
    • Environment: spark2.1

    Description

      I tried the following scenario in the Spark shell:

      import org.apache.spark.sql.SparkSession
      import org.apache.spark.sql.types._
      import org.apache.spark.sql.CarbonSession._
      import org.apache.carbondata.core.util.CarbonProperties
      import org.apache.spark.sql.streaming.{ProcessingTime, StreamingQuery}

      val carbon = SparkSession.builder().config(sc.getConf).getOrCreateCarbonSession("hdfs://localhost:54311/newCarbonStore", "/tmp")

      import org.apache.carbondata.core.constants.CarbonCommonConstants
      import org.apache.carbondata.core.util.CarbonProperties
      CarbonProperties.getInstance().addProperty(CarbonCommonConstants.CARBON_BAD_RECORDS_ACTION, "FORCE")

      carbon.sql("CREATE TABLE uniqdata_stream_8(CUST_ID int,CUST_NAME String,ACTIVE_EMUI_VERSION string, DOB timestamp, DOJ timestamp, BIGINT_COLUMN1 bigint,BIGINT_COLUMN2 bigint,DECIMAL_COLUMN1 decimal(30,10), DECIMAL_COLUMN2 decimal(36,10),Double_COLUMN1 double, Double_COLUMN2 double,INTEGER_COLUMN1 int) STORED BY 'org.apache.carbondata.format' TBLPROPERTIES ('TABLE_BLOCKSIZE'= '256 MB', 'streaming'='true')")

      import carbon.sqlContext.implicits._

      val uniqdataSch = StructType(
      Array(StructField("CUST_ID", IntegerType),StructField("CUST_NAME", StringType),StructField("ACTIVE_EMUI_VERSION", StringType),StructField("DOB", TimestampType), StructField("DOJ", TimestampType), StructField("BIGINT_COLUMN1", LongType), StructField("BIGINT_COLUMN2", LongType), StructField("DECIMAL_COLUMN1", org.apache.spark.sql.types.DecimalType(30, 10)), StructField("DECIMAL_COLUMN2", org.apache.spark.sql.types.DecimalType(36,10)), StructField("Double_COLUMN1", DoubleType), StructField("Double_COLUMN2", DoubleType), StructField("INTEGER_COLUMN1", IntegerType)))

      val streamDf = carbon.readStream
      .schema(uniqdataSch)
      .option("sep", ",")
      .csv("file:///home/geetika/Downloads/uniqdata")

      val dfToWrite = streamDf.map { x =>
        x.get(0) + "," + x.get(1) + "," + x.get(2) + "," + x.get(3) + "," + x.get(4) + "," + x.get(5) + "," + x.get(6) + "," + x.get(7) + "," + x.get(8) + "," + x.get(9) + "," + x.get(10) + "," + x.get(11)
      }

      val qry = dfToWrite.writeStream.format("carbondata").trigger(ProcessingTime("5 seconds"))
      .option("checkpointLocation","/stream/uniq8")
      .option("dbName", "default")
      .option("tableName", "uniqdata_stream_8")
      .start()

      qry.awaitTermination()
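      (As an aside, a minimal alternative to terminating the query by closing the shell, using the standard Spark StreamingQuery API; this is a sketch of my own, not part of the original repro:)

      // Sketch: wait a bounded time for ingestion, then stop the query cleanly,
      // instead of blocking forever in awaitTermination() and killing the shell.
      qry.awaitTermination(30 * 1000) // returns true if terminated within 30 s
      qry.stop()                      // gracefully stops the streaming query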

      Now close the shell and check the record count on the table using:

      import org.apache.spark.sql.SparkSession
      import org.apache.spark.sql.CarbonSession._
      val carbon = SparkSession.builder().config(sc.getConf).getOrCreateCarbonSession("hdfs://localhost:54311/newCarbonStore", "/tmp")
      carbon.sql("select count from uniqdata_stream_8").show
      OUTPUT:
      scala> carbon.sql("select count from uniqdata_stream_8").show
      18/01/08 15:51:53 ERROR CarbonProperties: Executor task launch worker-0 Configured value for property carbon.number.of.cores.while.loading is wrong. Falling back to the default value 2
      +--------+
      |count(1)|
      +--------+
      |    2013|
      +--------+

      Repeat the above scenario and check the count again: it remains the same after the second streaming load, i.e. the newly streamed records are not reflected in the table. (A diagnostic sketch for inspecting the segments is given below.)
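      As a diagnostic sketch (my addition, not part of the original report; it assumes uniqdata_stream_8 lives in the default database), CarbonData's SHOW SEGMENTS command can be used to check whether the second streaming load actually produced a new segment:

      // Diagnostic sketch (assumption: table is in the default database).
      // Lists the table's segments together with their status and load times,
      // which shows whether the second streaming load created a new segment.
      carbon.sql("SHOW SEGMENTS FOR TABLE uniqdata_stream_8").show(false)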

      Attachments

        1. 2000_UniqData.csv (367 kB, Geetika Gupta)


          People

            Assignee: Jatin Demla
            Reporter: Geetika Gupta