Details
-
Bug
-
Status: Resolved
-
Major
-
Resolution: Fixed
-
1.3.0
-
3 node ant cluster
Description
Steps :
Thrift server is started using the command - bin/spark-submit --master yarn-client --executor-memory 10G --executor-cores 5 --driver-memory 5G --num-executors 3 --class org.apache.carbondata.spark.thriftserver.CarbonThriftServer /srv/spark2.2Bigdata/install/spark/sparkJdbc/carbonlib/carbondata_2.11-1.3.0-SNAPSHOT-shade-hadoop2.7.2.jar "hdfs://hacluster/user/hive/warehouse/carbon.store"
Spark shell is opened using the command - bin/spark-shell --master yarn-client --executor-memory 10G --executor-cores 5 --driver-memory 5G --num-executors 3 --jars /srv/spark2.2Bigdata/install/spark/sparkJdbc/carbonlib/carbondata_2.11-1.3.0-SNAPSHOT-shade-hadoop2.7.2.jar
From spark shell the below code is executed -
import java.io._
import java.net.ServerSocket
import org.apache.spark.sql.{CarbonEnv, SparkSession}
import org.apache.spark.sql.hive.CarbonRelation
import org.apache.spark.sql.streaming._
import org.apache.carbondata.core.constants.CarbonCommonConstants
import org.apache.carbondata.core.util.CarbonProperties
import org.apache.carbondata.core.util.path._
CarbonProperties.getInstance().addProperty(CarbonCommonConstants.CARBON_TIMESTAMP_FORMAT, "yyyy/MM/dd")
import org.apache.spark.sql.CarbonSession._
val carbonSession = SparkSession.
builder().
appName("StreamExample").
getOrCreateCarbonSession("hdfs://hacluster/user/hive/warehouse/carbon.store")
carbonSession.sparkContext.setLogLevel("INFO")
def sql(sql: String) = carbonSession.sql(sql)
def writeSocket(serverSocket: ServerSocket): Thread = {
val thread = new Thread() {
override def run(): Unit = {
// wait for client to connection request and accept
val clientSocket = serverSocket.accept()
val socketWriter = new PrintWriter(clientSocket.getOutputStream())
var index = 0
for (_ <- 1 to 1000) {
// write 5 records per iteration
for (_ <- 0 to 100)
socketWriter.flush()
Thread.sleep(2000)
}
socketWriter.close()
System.out.println("Socket closed")
}
}
thread.start()
thread
}
def startStreaming(spark: SparkSession, tablePath: CarbonTablePath, tableName: String, port: Int): Thread = {
val thread = new Thread() {
override def run(): Unit = {
var qry: StreamingQuery = null
try
catch
{ case ex: Throwable => ex.printStackTrace() println("Done reading and writing streaming data") }finally
{ qry.stop() } }
}
thread.start()
thread
}
val streamTableName = "brinjal"
sql(s"drop table brinjal").show
sql(s"create table brinjal (imei string,AMSize string,channelsId string,ActiveCountry string, Activecity string,gamePointId double,deviceInformationId double,productionDate Timestamp,deliveryDate timestamp,deliverycharge double) STORED BY 'org.apache.carbondata.format' TBLPROPERTIES('streaming'='true','table_blocksize'='1')")
sql(s"LOAD DATA INPATH 'hdfs://hacluster/chetan/vardhandaterestruct.csv' INTO TABLE brinjal OPTIONS('DELIMITER'=',', 'QUOTECHAR'= '','BAD_RECORDS_ACTION'='FORCE','FILEHEADER'= 'imei,deviceInformationId,AMSize,channelsId,ActiveCountry,Activecity,gamePointId,productionDate,deliveryDate,deliverycharge')")
val carbonTable = CarbonEnv.getInstance(carbonSession).carbonMetastore.
lookupRelation(Some("default"), streamTableName)(carbonSession).asInstanceOf[CarbonRelation].carbonTable
val tablePath = CarbonStorePath.getCarbonTablePath(carbonTable.getAbsoluteTableIdentifier)
val port = 8002
val serverSocket = new ServerSocket(port)
val socketThread = writeSocket(serverSocket)
val streamingThread = startStreaming(carbonSession, tablePath, streamTableName, port)
From another terminal, the user deletes the .streaming file - BLR1000014307:/srv/spark2.2Bigdata/install/hadoop/datanode # bin/hadoop fs -rm -r /user/hive/warehouse/carbon.store/default/brinjal/.streaming
17/11/20 19:02:11 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Deleted /user/hive/warehouse/carbon.store/default/brinjal/.streaming
Also the thrift server spark submit process is killed when streaming is in progress.
From a beeline session, the user executes the select *, select column and select count queries.
Issue : The select * and select column queries fail and throw an exception, while select count displays a result set.
0: jdbc:hive2://10.18.98.34:23040> select * from brinjal;
Error: org.apache.spark.SparkException: Job aborted due to stage failure: Task 4 in stage 17.0 failed 4 times, most recent failure: Lost task 4.3 in stage 17.0 (TID 120, BLR1000014307, executor 12): java.io.EOFException
at org.apache.carbondata.hadoop.streaming.StreamBlockletReader.readBytesFromStream(StreamBlockletReader.java:182)
at org.apache.carbondata.hadoop.streaming.StreamBlockletReader.readBlockletData(StreamBlockletReader.java:116)
at org.apache.carbondata.hadoop.streaming.CarbonStreamRecordReader.scanBlockletAndFillVector(CarbonStreamRecordReader.java:406)
at org.apache.carbondata.hadoop.streaming.CarbonStreamRecordReader.nextColumnarBatch(CarbonStreamRecordReader.java:317)
at org.apache.carbondata.hadoop.streaming.CarbonStreamRecordReader.nextKeyValue(CarbonStreamRecordReader.java:298)
at org.apache.carbondata.spark.rdd.CarbonScanRDD$$anon$1.hasNext(CarbonScanRDD.scala:298)
at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.scan_nextBatch$(Unknown Source)
at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.processNext(Unknown Source)
at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$8$$anon$1.hasNext(WholeStageCodegenExec.scala:377)
at org.apache.spark.sql.execution.SparkPlan$$anonfun$2.apply(SparkPlan.scala:231)
at org.apache.spark.sql.execution.SparkPlan$$anonfun$2.apply(SparkPlan.scala:225)
at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$25.apply(RDD.scala:826)
at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$25.apply(RDD.scala:826)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
at org.apache.spark.scheduler.Task.run(Task.scala:99)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:282)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)
Driver stacktrace: (state=,code=0)
0: jdbc:hive2://10.18.98.34:23040> select imei from brinjal;
Error: org.apache.spark.SparkException: Job aborted due to stage failure: Task 5 in stage 18.0 failed 4 times, most recent failure: Lost task 5.3 in stage 18.0 (TID 136, BLR1000014307, executor 13): java.io.EOFException
at org.apache.carbondata.hadoop.streaming.StreamBlockletReader.readBytesFromStream(StreamBlockletReader.java:182)
at org.apache.carbondata.hadoop.streaming.StreamBlockletReader.readBlockletData(StreamBlockletReader.java:116)
at org.apache.carbondata.hadoop.streaming.CarbonStreamRecordReader.scanBlockletAndFillVector(CarbonStreamRecordReader.java:406)
at org.apache.carbondata.hadoop.streaming.CarbonStreamRecordReader.nextColumnarBatch(CarbonStreamRecordReader.java:317)
at org.apache.carbondata.hadoop.streaming.CarbonStreamRecordReader.nextKeyValue(CarbonStreamRecordReader.java:298)
at org.apache.carbondata.spark.rdd.CarbonScanRDD$$anon$1.hasNext(CarbonScanRDD.scala:298)
at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.scan_nextBatch$(Unknown Source)
at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.processNext(Unknown Source)
at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$8$$anon$1.hasNext(WholeStageCodegenExec.scala:377)
at org.apache.spark.sql.execution.SparkPlan$$anonfun$2.apply(SparkPlan.scala:231)
at org.apache.spark.sql.execution.SparkPlan$$anonfun$2.apply(SparkPlan.scala:225)
at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$25.apply(RDD.scala:826)
at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$25.apply(RDD.scala:826)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
at org.apache.spark.scheduler.Task.run(Task.scala:99)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:282)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)
Driver stacktrace: (state=,code=0)
0: jdbc:hive2://10.18.98.34:23040> select count(*) from brinjal;
-----------+
count(1) |
-----------+
11916 |
-----------+
1 row selected (0.545 seconds)
Expected : The behavior of select * and select column should be consistent with the select count behavior for a streaming query when the .streaming file is deleted while streaming is in progress.
Attachments
Issue Links
- links to