Details
Type: Bug
Status: Open
Priority: Major
Resolution: Unresolved
Affects Version/s: 1.4.0
Fix Version/s: None
Component/s: None
Environment: spark 2.1, spark 2.2
Description
Executing the streaming example from the streaming guide documentation fails with a compilation error.
Steps to reproduce:
1) Run spark-shell with the latest build:
./spark-shell --jars /home/knoldus/Desktop/CARBONDATA/carbondata/assembly/target/scala-2.11-carbondata-1.4.0-SNAPSHOT-bin-spark2.1.0-hadoop2.7.2.jar
2) Execute the example:
:paste
// Entering paste mode (ctrl-D to finish)
import java.io.File
import org.apache.spark.sql.{CarbonEnv, SparkSession}
import org.apache.spark.sql.CarbonSession._
import org.apache.spark.sql.streaming.{ProcessingTime, StreamingQuery}
import org.apache.carbondata.core.util.path.CarbonStorePath
val warehouse = new File("./warehouse").getCanonicalPath
val metastore = new File("./metastore").getCanonicalPath
val spark = SparkSession
  .builder()
  .master("local")
  .appName("StreamExample")
  .config("spark.sql.warehouse.dir", warehouse)
  .getOrCreateCarbonSession(warehouse, metastore)
spark.sparkContext.setLogLevel("ERROR")
// drop table if exists previously
spark.sql(s"DROP TABLE IF EXISTS carbon_table")
// Create target carbon table and populate with initial data
spark.sql(
  s"""
     | CREATE TABLE carbon_table (
     | col1 INT,
     | col2 STRING
     | )
     | STORED BY 'carbondata'
     | TBLPROPERTIES('streaming'='true')""".stripMargin)
val carbonTable = CarbonEnv.getCarbonTable(Some("default"), "carbon_table")(spark)
val tablePath = CarbonStorePath.getCarbonTablePath(carbonTable.getAbsoluteTableIdentifier)
// batch load
var qry: StreamingQuery = null
val readSocketDF = spark.readStream
  .format("socket")
  .option("host", "localhost")
  .option("port", 9099)
  .load()
// Write data from socket stream to carbondata file
qry = readSocketDF.writeStream
  .format("carbondata")
  .trigger(ProcessingTime("5 seconds"))
  .option("checkpointLocation", tablePath.getStreamingCheckpointDir)
  .option("dbName", "default")
  .option("tableName", "carbon_table")
  .start()
// start new thread to show data
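new Thread() {
  override def run(): Unit = {
    // Loop body restored from the streaming guide example; it was lost in the pasted text
    do {
      spark.sql("select * from carbon_table").show(false)
      Thread.sleep(10000)
    } while (true)
  }
}.start()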
qry.awaitTermination()
Expected Result: The example should execute successfully.
Actual Result:
// Exiting paste mode, now interpreting.
<console>:27: error: object CarbonStorePath is not a member of package org.apache.carbondata.core.util.path
import org.apache.carbondata.core.util.path.CarbonStorePath
^
<console>:54: error: not found: value CarbonStorePath
val tablePath = CarbonStorePath.getCarbonTablePath(carbonTable.getAbsoluteTableIdentifier)
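The first error can be cross-checked from the same spark-shell session by asking the class loader directly whether the class is present in the assembly jar. The snippet below is only a minimal sketch and not part of the original report: `classOnClasspath` is a hypothetical helper name, and `CarbonTablePath` is listed purely as a second class from the same package to compare against.

```scala
import scala.util.Try

// Hypothetical helper (not part of CarbonData): reports whether a class
// can be loaded from the current classpath, without initializing it.
def classOnClasspath(name: String): Boolean =
  Try(Class.forName(name, false, Thread.currentThread().getContextClassLoader)).isSuccess

// Class names to probe; the first is the one the compiler could not find.
val candidates = Seq(
  "org.apache.carbondata.core.util.path.CarbonStorePath",
  "org.apache.carbondata.core.util.path.CarbonTablePath"
)

candidates.foreach { name =>
  val status = if (classOnClasspath(name)) "found" else "missing"
  println(s"$name -> $status")
}
```

If the first name is reported as missing while other classes from the same package are found, the jar itself is on the classpath and the example is referring to a class that this build does not contain.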