CARBONDATA-2002: Streaming segment status is not getting updated to finished or success

    Details

    • Type: Bug
    • Status: Closed
    • Priority: Major
    • Resolution: Fixed
    • Affects Version/s: 1.3.0
    • Fix Version/s: 1.4.0
    • Component/s: data-load
    • Labels: None
    • Environment: Spark 2.1

    Description

      I created a streaming table and loaded data into it using the following commands in the Spark shell:

      import org.apache.spark.sql.SparkSession
      import org.apache.spark.sql.CarbonSession._
      import org.apache.carbondata.core.util.CarbonProperties
      import org.apache.spark.sql.streaming.{ProcessingTime, StreamingQuery}

      val carbon = SparkSession.builder().config(sc.getConf).getOrCreateCarbonSession("hdfs://localhost:54311/newCarbonStore", "/tmp")

      import org.apache.carbondata.core.constants.CarbonCommonConstants
      CarbonProperties.getInstance().addProperty(CarbonCommonConstants.CARBON_BAD_RECORDS_ACTION, "FORCE")
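      Since DOB and DOJ are timestamp columns, the timestamp format of the CSV data may also need to be configured. A minimal sketch, assuming the attached 2000_UniqData.csv encodes timestamps as yyyy-MM-dd HH:mm:ss (the format string is an assumption, not taken from this report):

      // Assumed format; adjust to match the actual data in 2000_UniqData.csv
      CarbonProperties.getInstance().addProperty(CarbonCommonConstants.CARBON_TIMESTAMP_FORMAT, "yyyy-MM-dd HH:mm:ss")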

      carbon.sql("drop table if exists uniqdata_stream")

      carbon.sql("create table uniqdata_stream(CUST_ID int,CUST_NAME String,DOB timestamp,DOJ timestamp, BIGINT_COLUMN1 bigint,BIGINT_COLUMN2 bigint,DECIMAL_COLUMN1 decimal(30,10),DECIMAL_COLUMN2 decimal(36,10),Double_COLUMN1 double, Double_COLUMN2 double,INTEGER_COLUMN1 int) STORED BY 'org.apache.carbondata.format' TBLPROPERTIES ('TABLE_BLOCKSIZE'= '256 MB', 'streaming'='true')");
      import carbon.sqlContext.implicits._

      import org.apache.spark.sql.types._
      val uniqdataSch = StructType(Array(
        StructField("CUST_ID", IntegerType),
        StructField("CUST_NAME", StringType),
        StructField("DOB", TimestampType),
        StructField("DOJ", TimestampType),
        StructField("BIGINT_COLUMN1", LongType),
        StructField("BIGINT_COLUMN2", LongType),
        StructField("DECIMAL_COLUMN1", DecimalType(30, 10)),
        StructField("DECIMAL_COLUMN2", DecimalType(36, 10)),
        StructField("Double_COLUMN1", DoubleType),
        StructField("Double_COLUMN2", DoubleType),
        StructField("INTEGER_COLUMN1", IntegerType)))

      val streamDf = carbon.readStream
      .schema(uniqdataSch)
      .option("sep", ",")
      .csv("file:///home/geetika/Downloads/uniqdata")
      val qry = streamDf.writeStream.format("carbondata").trigger(ProcessingTime("5 seconds"))
      .option("checkpointLocation","/stream/uniq")
      .option("dbName", "default")
      .option("tableName", "uniqdata_stream")
      .start()

      qry.awaitTermination()

      // Press Ctrl+C to terminate the query
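      For comparison, a gentler alternative to killing the shell is to stop the query through Spark's standard StreamingQuery API; a minimal sketch (the 60-second timeout is an arbitrary choice for illustration):

      // Wait up to 60 s for the query to terminate on its own, then stop it gracefully
      qry.awaitTermination(60 * 1000)
      qry.stop()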

      Start the Spark shell again and check the segment status:

      import org.apache.spark.sql.SparkSession
      import org.apache.spark.sql.CarbonSession._
      val carbon = SparkSession.builder().config(sc.getConf).getOrCreateCarbonSession("hdfs://localhost:54311/newCarbonStore", "/tmp")
      carbon.sql("show segments for table uniqdata_stream").show

      It shows the following output:

      +-----------------+---------+--------------------+-------------+---------+-----------+
      |SegmentSequenceId|Status   |Load Start Time     |Load End Time|Merged To|File Format|
      +-----------------+---------+--------------------+-------------+---------+-----------+
      |0                |Streaming|2018-01-05 18:23:...|null         |NA       |ROW_V1     |
      +-----------------+---------+--------------------+-------------+---------+-----------+

      The status of segment 0 remains Streaming; it is not updated to Finished or Success even after the streaming query has been terminated.
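      As a possible workaround, CarbonData's streaming guide describes a 'close_streaming' compaction that finishes the streaming segment explicitly; whether it is available in a given 1.3.x build is an assumption, so this is only a sketch:

      // Hand off the streaming segment and re-check its status (availability in 1.3.x is assumed)
      carbon.sql("alter table uniqdata_stream compact 'close_streaming'")
      carbon.sql("show segments for table uniqdata_stream").show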

    Attachments

    1. 2000_UniqData.csv (367 kB, attached by Geetika Gupta)


    People

    • Assignee: Unassigned
    • Reporter: Geetika Gupta
    • Votes: 0
    • Watchers: 1
