Description
I created a streaming table and loaded data into it by running the following commands in the Spark shell:
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.CarbonSession._
import org.apache.carbondata.core.util.CarbonProperties
import org.apache.spark.sql.streaming._
val carbon = SparkSession.builder().config(sc.getConf) .getOrCreateCarbonSession("hdfs://localhost:54311/newCarbonStore","/tmp")
import org.apache.carbondata.core.constants.CarbonCommonConstants
import org.apache.carbondata.core.util.CarbonProperties
CarbonProperties.getInstance().addProperty(CarbonCommonConstants.CARBON_BAD_RECORDS_ACTION, "FORCE")
carbon.sql("drop table if exists uniqdata_stream")
carbon.sql("create table uniqdata_stream(CUST_ID int,CUST_NAME String,DOB timestamp,DOJ timestamp, BIGINT_COLUMN1 bigint,BIGINT_COLUMN2 bigint,DECIMAL_COLUMN1 decimal(30,10),DECIMAL_COLUMN2 decimal(36,10),Double_COLUMN1 double, Double_COLUMN2 double,INTEGER_COLUMN1 int) STORED BY 'org.apache.carbondata.format' TBLPROPERTIES ('TABLE_BLOCKSIZE'= '256 MB', 'streaming'='true')");
import carbon.sqlContext.implicits._
import org.apache.spark.sql.types._
val uniqdataSch = StructType(
Array(StructField("CUST_ID", IntegerType),StructField("CUST_NAME", StringType),StructField("DOB", TimestampType), StructField("DOJ", TimestampType), StructField("BIGINT_COLUMN1", LongType), StructField("BIGINT_COLUMN2", LongType), StructField("DECIMAL_COLUMN1", org.apache.spark.sql.types.DecimalType(30, 10)), StructField("DECIMAL_COLUMN2", org.apache.spark.sql.types.DecimalType(36,10)), StructField("Double_COLUMN1", DoubleType), StructField("Double_COLUMN2", DoubleType), StructField("INTEGER_COLUMN1", IntegerType)))
val streamDf = carbon.readStream
.schema(uniqdataSch)
.option("sep", ",")
.csv("file:///home/geetika/Downloads/uniqdata")
val qry = streamDf.writeStream.format("carbondata").trigger(ProcessingTime("5 seconds"))
.option("checkpointLocation","/stream/uniq")
.option("dbName", "default")
.option("tableName", "uniqdata_stream")
.start()
qry.awaitTermination()
//Press ctrl+c to terminate
Then start the Spark shell again and run the following commands:
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.CarbonSession._
val carbon = SparkSession.builder().config(sc.getConf) .getOrCreateCarbonSession("hdfs://localhost:54311/newCarbonStore","/tmp")
carbon.sql("show segments for table uniqdata_stream").show
It shows the following output:
+-----------------+---------+--------------------+-------------+---------+-----------+
|SegmentSequenceId|   Status|     Load Start Time|Load End Time|Merged To|File Format|
+-----------------+---------+--------------------+-------------+---------+-----------+
|                0|Streaming|2018-01-05 18:23:...|         null|       NA|     ROW_V1|
+-----------------+---------+--------------------+-------------+---------+-----------+
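The status of the segment is not updated: after the streaming query has been terminated and the shell restarted, segment 0 is still reported with Status "Streaming" and a null Load End Time.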