CarbonData / CARBONDATA-4276

writeStream fails when a CSV is copied to the readStream HDFS path in Spark 2.4.5


Details

    • Type: Bug
    • Status: Open
    • Priority: Minor
    • Resolution: Unresolved
    • Affects Version/s: 2.2.0
    • Fix Version/s: None
    • Component/s: data-load
    • Labels: None
    • Environment: Spark 2.4.5

    Description

      On a cluster with CarbonData 2.2.0 and Spark 2.4.5.

      Steps to reproduce:

      On HDFS, execute the following commands:

       cd /opt/HA/C10/install/hadoop/datanode/bin/
      ./hdfs dfs -rm -r /tmp/stream_test/checkpoint_all_data
      ./hdfs dfs -mkdir -p /tmp/stream_test/{checkpoint_all_data,bad_records_all_data}
      ./hdfs dfs -mkdir -p /Priyesh/streaming/csv/
      ./hdfs dfs -cp /chetan/100_olap_C20.csv /Priyesh/streaming/csv/

      ./hdfs dfs -cp /Priyesh/streaming/csv/100_olap_C20.csv /Priyesh/streaming/csv/100_olap_C21.csv
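      Optionally, before starting the stream, one can confirm that both CSV copies landed in the watched directory (a trivial verification step, not part of the original report):

      ./hdfs dfs -ls /Priyesh/streaming/csv/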

       

      From Spark beeline / spark-sql / spark-shell, execute:

      DROP TABLE IF EXISTS all_datatypes_2048;
      create table all_datatypes_2048 (imei string,deviceInformationId int,MAC string,deviceColor string,device_backColor string,modelId string,marketName string,AMSize string,ROMSize string,CUPAudit string,CPIClocked string,series string,productionDate timestamp,bomCode string,internalModels string, deliveryTime string, channelsId string, channelsName string , deliveryAreaId string, deliveryCountry string, deliveryProvince string, deliveryCity string,deliveryDistrict string, deliveryStreet string, oxSingleNumber string, ActiveCheckTime string, ActiveAreaId string, ActiveCountry string, ActiveProvince string, Activecity string, ActiveDistrict string, ActiveStreet string, ActiveOperatorId string, Active_releaseId string, Active_EMUIVersion string, Active_operaSysVersion string, Active_BacVerNumber string, Active_BacFlashVer string, Active_webUIVersion string, Active_webUITypeCarrVer string,Active_webTypeDataVerNumber string, Active_operatorsVersion string, Active_phonePADPartitionedVersions string, Latest_YEAR int, Latest_MONTH int, Latest_DAY Decimal(30,10), Latest_HOUR string, Latest_areaId string, Latest_country string, Latest_province string, Latest_city string, Latest_district string, Latest_street string, Latest_releaseId string, Latest_EMUIVersion string, Latest_operaSysVersion string, Latest_BacVerNumber string, Latest_BacFlashVer string, Latest_webUIVersion string, Latest_webUITypeCarrVer string, Latest_webTypeDataVerNumber string, Latest_operatorsVersion string, Latest_phonePADPartitionedVersions string, Latest_operatorId string, gamePointDescription string,gamePointId double,contractNumber BigInt) stored as carbondata TBLPROPERTIES('table_blocksize'='2048','streaming'='true', 'sort_columns'='imei');
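      Before starting the query, the table definition can optionally be inspected to confirm it was created as a streaming table (a minimal sketch; this assumes DESCRIBE FORMATTED surfaces the 'streaming' table property on carbon tables):

      DESCRIBE FORMATTED all_datatypes_2048;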

       

      From spark-shell, execute:

      import org.apache.spark.sql.streaming._
      import org.apache.spark.sql.streaming.Trigger.ProcessingTime

      val df_j=spark.readStream.text("hdfs://hacluster/Priyesh/streaming/csv/*.csv")

      df_j.writeStream.format("carbondata").option("dbName","ranjan").option("carbon.stream.parser","org.apache.carbondata.streaming.parser.CSVStreamParserImp").option("checkpointLocation", "hdfs://hacluster/tmp/stream_test/checkpoint_all_data").option("bad_records_action","hdfs://hacluster/tmp/stream_test/bad_records_all_data").option("tableName","all_datatypes_2048").trigger(ProcessingTime(6000)).option("carbon.streaming.auto.handoff.enabled","true").option("carbon.streaming.segment.max.size",102400).start

      show segments for table all_datatypes_2048;
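      When run interactively, the StreamingQuery handle returned by .start can also be captured to inspect progress and failures directly, instead of relying only on console warnings (a sketch; the val name query is illustrative, the calls are standard Spark 2.4 StreamingQuery members):

      // e.g. val query = df_j.writeStream.format("carbondata")...<same options as above>.start
      query.status        // current trigger status of the stream
      query.lastProgress  // metrics of the most recent completed micro-batch
      query.exception     // Some(StreamingQueryException) once the stream has failed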

       

      Issue 1:

      When a CSV file is copied into the HDFS folder for the first time after the stream has started, writeStream fails with the following error:

      scala> df_j.writeStream.format("carbondata").option("dbName","ranjan").option("carbon.stream.parser","org.apache.carbondata.streaming.parser.CSVStreamParserImp").option("checkpointLocation", "hdfs://hacluster/tmp/stream_test/checkpoint_all_data").option("bad_records_action","hdfs://hacluster/tmp/stream_test/bad_records_all_data").option("tableName","all_datatypes_2048").trigger(ProcessingTime(6000)).option("carbon.streaming.auto.handoff.enabled","true").option("carbon.streaming.segment.max.size",102400).start
      21/08/26 12:53:11 WARN CarbonProperties: The enable mv value "null" is invalid. Using the default value "true"
      21/08/26 12:53:11 WARN CarbonProperties: The value "LOCALLOCK" configured for key carbon.lock.type is invalid for current file system. Use the default value HDFSLOCK instead.
      21/08/26 12:53:12 WARN HiveConf: HiveConf of name hive.metastore.rdb.password.decode.enable does not exist
      21/08/26 12:53:12 WARN HiveConf: HiveConf of name hive.metastore.db.ssl.enabled does not exist
      21/08/26 12:53:13 WARN HiveConf: HiveConf of name hive.metastore.rdb.password.decode.enable does not exist
      21/08/26 12:53:13 WARN HiveConf: HiveConf of name hive.metastore.db.ssl.enabled does not exist
      21/08/26 12:53:14 WARN ObjectStore: Failed to get database global_temp, returning NoSuchObjectException
      res0: org.apache.spark.sql.streaming.StreamingQuery = org.apache.spark.sql.execution.streaming.StreamingQueryWrapper@ad038f8

      scala> 21/08/26 13:00:49 WARN DFSClient: DataStreamer Exception
      java.io.IOException: Failed to replace a bad datanode on the existing pipeline due to no more good datanodes being available to try. (Nodes: current=[DatanodeInfoWithStorage[7.187.185.158:50076,DS-67d21505-b7fd-4ccc-b7e5-7c83c042f3e3,DISK], DatanodeInfoWithStorage[7.187.185.59:50076,DS-443c5cbf-19e9-477e-afdb-92e8807730e3,DISK]], original=[DatanodeInfoWithStorage[7.187.185.59:50076,DS-443c5cbf-19e9-477e-afdb-92e8807730e3,DISK], DatanodeInfoWithStorage[7.187.185.158:50076,DS-67d21505-b7fd-4ccc-b7e5-7c83c042f3e3,DISK]]). The current failed datanode replacement policy is DEFAULT, and a client may configure this via 'dfs.client.block.write.replace-datanode-on-failure.policy' in its configuration.
      at org.apache.hadoop.hdfs.DFSOutputStream$DataStreamer.findNewDatanode(DFSOutputStream.java:925)
      at org.apache.hadoop.hdfs.DFSOutputStream$DataStreamer.addDatanode2ExistingPipeline(DFSOutputStream.java:988)
      at org.apache.hadoop.hdfs.DFSOutputStream$DataStreamer.setupPipelineForAppendOrRecovery(DFSOutputStream.java:1156)
      at org.apache.hadoop.hdfs.DFSOutputStream$DataStreamer.run(DFSOutputStream.java:454)
      21/08/26 13:00:49 ERROR CarbonUtil: Error while closing stream:java.io.IOException: Failed to replace a bad datanode on the existing pipeline due to no more good datanodes being available to try. (Nodes: current=[DatanodeInfoWithStorage[7.187.185.158:50076,DS-67d21505-b7fd-4ccc-b7e5-7c83c042f3e3,DISK], DatanodeInfoWithStorage[7.187.185.59:50076,DS-443c5cbf-19e9-477e-afdb-92e8807730e3,DISK]], original=[DatanodeInfoWithStorage[7.187.185.59:50076,DS-443c5cbf-19e9-477e-afdb-92e8807730e3,DISK], DatanodeInfoWithStorage[7.187.185.158:50076,DS-67d21505-b7fd-4ccc-b7e5-7c83c042f3e3,DISK]]). The current failed datanode replacement policy is DEFAULT, and a client may configure this via 'dfs.client.block.write.replace-datanode-on-failure.policy' in its configuration.
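      As the exception message itself notes, the failed-datanode replacement behaviour is a client-side setting. On clusters with only a few datanodes, a possible (untested) workaround is to relax that policy when launching the Spark shell; the property name is taken directly from the error above:

      # hypothetical launch option, not verified against this cluster
      spark-shell \
        --conf spark.hadoop.dfs.client.block.write.replace-datanode-on-failure.policy=NEVER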

      Issue 2:

      When a CSV file is copied into the HDFS folder for the second time after the stream has started, writeStream fails with:

       

      21/08/26 13:01:36 ERROR StreamSegment: Failed to append batch data to stream segment: hdfs://hacluster/user/sparkhive/warehouse/ranjan.db/all_datatypes_2048/Fact/Part0/Segment_0
      org.apache.hadoop.ipc.RemoteException(org.apache.hadoop.hdfs.protocol.AlreadyBeingCreatedException): Failed to APPEND_FILE /user/sparkhive/warehouse/ranjan.db/all_datatypes_2048/Fact/Part0/Segment_0/part-0-0_batchno0-0-0-0.snappy.carbondata for DFSClient_NONMAPREDUCE_260546362_1 on 7.187.185.158 because DFSClient_NONMAPREDUCE_260546362_1 is already the current lease holder.
      at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.recoverLeaseInternal(FSNamesystem.java:2540)
      at org.apache.hadoop.hdfs.server.namenode.FSDirAppendOp.appendFile(FSDirAppendOp.java:124)
      at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.appendFile(FSNamesystem.java:2624)
      at org.apache.hadoop.hdfs.server.namenode.NameNodeRpcServer.append(NameNodeRpcServer.java:805)
      at org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolServerSideTranslatorPB.append(ClientNamenodeProtocolServerSideTranslatorPB.java:487)
      at org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos$ClientNamenodeProtocol$2.callBlockingMethod(ClientNamenodeProtocolProtos.java)
      at org.apache.hadoop.ipc.ProtobufRpcEngine$Server$ProtoBufRpcInvoker.call(ProtobufRpcEngine.java:523)
      at org.apache.hadoop.ipc.RPC$Server.call(RPC.java:991)
      at org.apache.hadoop.ipc.Server$RpcCall.run(Server.java:872)
      at org.apache.hadoop.ipc.Server$RpcCall.run(Server.java:818)
      at java.security.AccessController.doPrivileged(Native Method)
      at javax.security.auth.Subject.doAs(Subject.java:422)
      at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1729)
      at org.apache.hadoop.ipc.Server$Handler.run(Server.java:2678)

      at org.apache.hadoop.ipc.Client.call(Client.java:1475)
      at org.apache.hadoop.ipc.Client.call(Client.java:1412)
      at org.apache.hadoop.ipc.ProtobufRpcEngine$Invoker.invoke(ProtobufRpcEngine.java:229)
      at com.sun.proxy.$Proxy17.append(Unknown Source)
      at org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolTranslatorPB.append(ClientNamenodeProtocolTranslatorPB.java:328)
      at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
      at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
      at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
      at java.lang.reflect.Method.invoke(Method.java:497)
      at org.apache.hadoop.io.retry.RetryInvocationHandler.invokeMethod(RetryInvocationHandler.java:191)
      at org.apache.hadoop.io.retry.RetryInvocationHandler.invoke(RetryInvocationHandler.java:102)
      at com.sun.proxy.$Proxy18.append(Unknown Source)
      at org.apache.hadoop.hdfs.DFSClient.callAppend(DFSClient.java:1808)
      at org.apache.hadoop.hdfs.DFSClient.append(DFSClient.java:1877)
      at org.apache.hadoop.hdfs.DFSClient.append(DFSClient.java:1847)
      at org.apache.hadoop.hdfs.DistributedFileSystem$4.doCall(DistributedFileSystem.java:340)
      at org.apache.hadoop.hdfs.DistributedFileSystem$4.doCall(DistributedFileSystem.java:336)
      at org.apache.hadoop.fs.FileSystemLinkResolver.resolve(FileSystemLinkResolver.java:81)
      at org.apache.hadoop.hdfs.DistributedFileSystem.append(DistributedFileSystem.java:348)
      at org.apache.hadoop.hdfs.DistributedFileSystem.append(DistributedFileSystem.java:318)
      at org.apache.hadoop.fs.FileSystem.append(FileSystem.java:1166)
      at org.apache.carbondata.core.datastore.filesystem.AbstractDFSCarbonFile.getDataOutputStreamUsingAppend(AbstractDFSCarbonFile.java:440)
      at org.apache.carbondata.core.datastore.impl.FileFactory.getDataOutputStreamUsingAppend(FileFactory.java:348)
      at org.apache.carbondata.streaming.CarbonStreamRecordWriter.initializeAtFirstRow(CarbonStreamRecordWriter.java:176)
      at org.apache.carbondata.streaming.CarbonStreamRecordWriter.write(CarbonStreamRecordWriter.java:210)
      at org.apache.carbondata.streaming.segment.StreamSegment.appendBatchData(StreamSegment.java:278)
      at org.apache.spark.sql.execution.streaming.CarbonAppendableStreamSink$$anonfun$writeDataFileTask$1.apply$mcV$sp(CarbonAppendableStreamSink.scala:349)
      at org.apache.spark.sql.execution.streaming.CarbonAppendableStreamSink$$anonfun$writeDataFileTask$1.apply(CarbonAppendableStreamSink.scala:338)
      at org.apache.spark.sql.execution.streaming.CarbonAppendableStreamSink$$anonfun$writeDataFileTask$1.apply(CarbonAppendableStreamSink.scala:338)
      at org.apache.spark.util.Utils$.tryWithSafeFinallyAndFailureCallbacks(Utils.scala:1394)
      at org.apache.spark.sql.execution.streaming.CarbonAppendableStreamSink$.writeDataFileTask(CarbonAppendableStreamSink.scala:351)
      at org.apache.spark.sql.execution.streaming.CarbonAppendableStreamSink$$anonfun$liftedTree1$1$1.apply(CarbonAppendableStreamSink.scala:271)
      at org.apache.spark.sql.execution.streaming.CarbonAppendableStreamSink$$anonfun$liftedTree1$1$1.apply(CarbonAppendableStreamSink.scala:270)
      at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
      at org.apache.spark.scheduler.Task.run(Task.scala:123)
      at org.apache.spark.executor.Executor$TaskRunner$$anonfun$10.apply(Executor.scala:408)
      at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1360)
      at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:414)
      at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
      at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
      at java.lang.Thread.run(Thread.java:745)
      21/08/26 13:01:36 ERROR Utils: Aborting task
      org.apache.hadoop.ipc.RemoteException(org.apache.hadoop.hdfs.protocol.AlreadyBeingCreatedException): Failed to APPEND_FILE /user/sparkhive/warehouse/ranjan.db/all_datatypes_2048/Fact/Part0/Segment_0/part-0-0_batchno0-0-0-0.snappy.carbondata for DFSClient_NONMAPREDUCE_260546362_1 on 7.187.185.158 because DFSClient_NONMAPREDUCE_260546362_1 is already the current lease holder.
      at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.recoverLeaseInternal(FSNamesystem.java:2540)
      at org.apache.hadoop.hdfs.server.namenode.FSDirAppendOp.appendFile(FSDirAppendOp.java:124)
      at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.appendFile(FSNamesystem.java:2624)
      at org.apache.hadoop.hdfs.server.namenode.NameNodeRpcServer.append(NameNodeRpcServer.java:805)
      at org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolServerSideTranslatorPB.append(ClientNamenodeProtocolServerSideTranslatorPB.java:487)
      at org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos$ClientNamenodeProtocol$2.callBlockingMethod(ClientNamenodeProtocolProtos.java)
      at org.apache.hadoop.ipc.ProtobufRpcEngine$Server$ProtoBufRpcInvoker.call(ProtobufRpcEngine.java:523)
      at org.apache.hadoop.ipc.RPC$Server.call(RPC.java:991)
      at org.apache.hadoop.ipc.Server$RpcCall.run(Server.java:872)
      at org.apache.hadoop.ipc.Server$RpcCall.run(Server.java:818)
      at java.security.AccessController.doPrivileged(Native Method)
      at javax.security.auth.Subject.doAs(Subject.java:422)
      at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1729)
      at org.apache.hadoop.ipc.Server$Handler.run(Server.java:2678)

      at org.apache.hadoop.ipc.Client.call(Client.java:1475)
      at org.apache.hadoop.ipc.Client.call(Client.java:1412)
      at org.apache.hadoop.ipc.ProtobufRpcEngine$Invoker.invoke(ProtobufRpcEngine.java:229)
      at com.sun.proxy.$Proxy17.append(Unknown Source)
      at org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolTranslatorPB.append(ClientNamenodeProtocolTranslatorPB.java:328)
      at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
      at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
      at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
      at java.lang.reflect.Method.invoke(Method.java:497)
      at org.apache.hadoop.io.retry.RetryInvocationHandler.invokeMethod(RetryInvocationHandler.java:191)
      at org.apache.hadoop.io.retry.RetryInvocationHandler.invoke(RetryInvocationHandler.java:102)
      at com.sun.proxy.$Proxy18.append(Unknown Source)
      at org.apache.hadoop.hdfs.DFSClient.callAppend(DFSClient.java:1808)
      at org.apache.hadoop.hdfs.DFSClient.append(DFSClient.java:1877)
      at org.apache.hadoop.hdfs.DFSClient.append(DFSClient.java:1847)
      at org.apache.hadoop.hdfs.DistributedFileSystem$4.doCall(DistributedFileSystem.java:340)
      at org.apache.hadoop.hdfs.DistributedFileSystem$4.doCall(DistributedFileSystem.java:336)
      at org.apache.hadoop.fs.FileSystemLinkResolver.resolve(FileSystemLinkResolver.java:81)
      at org.apache.hadoop.hdfs.DistributedFileSystem.append(DistributedFileSystem.java:348)
      at org.apache.hadoop.hdfs.DistributedFileSystem.append(DistributedFileSystem.java:318)
      at org.apache.hadoop.fs.FileSystem.append(FileSystem.java:1166)
      at org.apache.carbondata.core.datastore.filesystem.AbstractDFSCarbonFile.getDataOutputStreamUsingAppend(AbstractDFSCarbonFile.java:440)
      at org.apache.carbondata.core.datastore.impl.FileFactory.getDataOutputStreamUsingAppend(FileFactory.java:348)
      at org.apache.carbondata.streaming.CarbonStreamRecordWriter.initializeAtFirstRow(CarbonStreamRecordWriter.java:176)
      at org.apache.carbondata.streaming.CarbonStreamRecordWriter.write(CarbonStreamRecordWriter.java:210)
      at org.apache.carbondata.streaming.segment.StreamSegment.appendBatchData(StreamSegment.java:278)
      at org.apache.spark.sql.execution.streaming.CarbonAppendableStreamSink$$anonfun$writeDataFileTask$1.apply$mcV$sp(CarbonAppendableStreamSink.scala:349)
      at org.apache.spark.sql.execution.streaming.CarbonAppendableStreamSink$$anonfun$writeDataFileTask$1.apply(CarbonAppendableStreamSink.scala:338)
      at org.apache.spark.sql.execution.streaming.CarbonAppendableStreamSink$$anonfun$writeDataFileTask$1.apply(CarbonAppendableStreamSink.scala:338)
      at org.apache.spark.util.Utils$.tryWithSafeFinallyAndFailureCallbacks(Utils.scala:1394)
      at org.apache.spark.sql.execution.streaming.CarbonAppendableStreamSink$.writeDataFileTask(CarbonAppendableStreamSink.scala:351)
      at org.apache.spark.sql.execution.streaming.CarbonAppendableStreamSink$$anonfun$liftedTree1$1$1.apply(CarbonAppendableStreamSink.scala:271)
      at org.apache.spark.sql.execution.streaming.CarbonAppendableStreamSink$$anonfun$liftedTree1$1$1.apply(CarbonAppendableStreamSink.scala:270)
      at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
      at org.apache.spark.scheduler.Task.run(Task.scala:123)
      at org.apache.spark.executor.Executor$TaskRunner$$anonfun$10.apply(Executor.scala:408)
      at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1360)
      at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:414)
      at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
      at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
      at java.lang.Thread.run(Thread.java:745)
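      The AlreadyBeingCreatedException indicates that the HDFS lease on the streaming segment file from the previous (failed) append was never released, so the next micro-batch cannot re-open the same file for append. One possible manual recovery step before retrying (path copied from the log above; this is a workaround, not a fix for the underlying bug) is to force lease recovery on the affected file:

      ./hdfs debug recoverLease -path /user/sparkhive/warehouse/ranjan.db/all_datatypes_2048/Fact/Part0/Segment_0/part-0-0_batchno0-0-0-0.snappy.carbondata -retries 3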



          People

            Assignee: Unassigned
            Reporter: PRIYESH RANJAN (pwx944901)

