Details

    • Type: Bug
    • Status: Resolved
    • Priority: Critical
    • Resolution: Won't Fix
    • Affects Version/s: 2.0.2
    • Fix Version/s: None
    • Component/s: Input/Output, PySpark
    • Labels:
      None
    • Environment:

      Ubuntu 14.04 LTS on ARM 7.1

      Description

      Note: Updated the ticket title to reflect what was found to be the underlying issue.

      When I create a data frame in PySpark with a small row count (fewer than the number of executors), write it to a Parquet file, load that Parquet file into a new data frame, and then do any sort of read against the newly loaded data frame, Spark fails with an ExecutorLostFailure.

      Example code to replicate this issue:

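      from pyspark.sql.types import StructType, StructField, StringType, IntegerType, DoubleType

      # `sc` and `spark` are assumed to be predefined, as in the PySpark shell.
      rdd = sc.parallelize([('row1', 1, 4.33, 'name'), ('row2', 2, 3.14, 'string')])
      my_schema = StructType([
          StructField("id", StringType(), True),
          StructField("value1", IntegerType(), True),
          StructField("value2", DoubleType(), True),  # the double column
          StructField("name", StringType(), True)
      ])
      df = spark.createDataFrame(rdd, schema=my_schema)
      # Write the two rows out as Parquet.
      df.write.parquet('hdfs://master:9000/user/michael/test_data', mode='overwrite')

      # Read the Parquet output back; the take() call is where the failure occurs.
      newdf = spark.read.parquet('hdfs://master:9000/user/michael/test_data/')
      newdf.take(1)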
      

      The error I get when the take step runs is:

      ---------------------------------------------------------------------------
      Py4JJavaError                             Traceback (most recent call last)
      <ipython-input-2-a3aa06c0c511> in <module>()
            1 newdf = spark.read.parquet('hdfs://master:9000/user/michael/test_data/')
      ----> 2 newdf.take(1)
      
      /usr/local/spark/python/pyspark/sql/dataframe.py in take(self, num)
          346         [Row(age=2, name=u'Alice'), Row(age=5, name=u'Bob')]
          347         """
      --> 348         return self.limit(num).collect()
          349 
          350     @since(1.3)
      
      /usr/local/spark/python/pyspark/sql/dataframe.py in collect(self)
          308         """
          309         with SCCallSiteSync(self._sc) as css:
      --> 310             port = self._jdf.collectToPython()
          311         return list(_load_from_socket(port, BatchedSerializer(PickleSerializer())))
          312 
      
      /usr/local/spark/python/lib/py4j-0.10.3-src.zip/py4j/java_gateway.py in __call__(self, *args)
         1131         answer = self.gateway_client.send_command(command)
         1132         return_value = get_return_value(
      -> 1133             answer, self.gateway_client, self.target_id, self.name)
         1134 
         1135         for temp_arg in temp_args:
      
      /usr/local/spark/python/pyspark/sql/utils.py in deco(*a, **kw)
           61     def deco(*a, **kw):
           62         try:
      ---> 63             return f(*a, **kw)
           64         except py4j.protocol.Py4JJavaError as e:
           65             s = e.java_exception.toString()
      
      /usr/local/spark/python/lib/py4j-0.10.3-src.zip/py4j/protocol.py in get_return_value(answer, gateway_client, target_id, name)
          317                 raise Py4JJavaError(
          318                     "An error occurred while calling {0}{1}{2}.\n".
      --> 319                     format(target_id, ".", name), value)
          320             else:
          321                 raise Py4JError(
      
      Py4JJavaError: An error occurred while calling o54.collectToPython.
      : org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 2.0 failed 4 times, most recent failure: Lost task 0.3 in stage 2.0 (TID 6, 10.10.10.4): ExecutorLostFailure (executor 2 exited caused by one of the running tasks) Reason: Remote RPC client disassociated. Likely due to containers exceeding thresholds, or network issues. Check driver logs for WARN messages.
      Driver stacktrace:
      	at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1454)
      	at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1442)
      	at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1441)
      	at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
      	at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
      	at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1441)
      	at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:811)
      	at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:811)
      	at scala.Option.foreach(Option.scala:257)
      	at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:811)
      	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:1667)
      	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1622)
      	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1611)
      	at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
      	at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:632)
      	at org.apache.spark.SparkContext.runJob(SparkContext.scala:1873)
      	at org.apache.spark.SparkContext.runJob(SparkContext.scala:1886)
      	at org.apache.spark.SparkContext.runJob(SparkContext.scala:1899)
      	at org.apache.spark.sql.execution.SparkPlan.executeTake(SparkPlan.scala:347)
      	at org.apache.spark.sql.execution.CollectLimitExec.executeCollect(limit.scala:39)
      	at org.apache.spark.sql.Dataset$$anonfun$collectToPython$1.apply$mcI$sp(Dataset.scala:2526)
      	at org.apache.spark.sql.Dataset$$anonfun$collectToPython$1.apply(Dataset.scala:2523)
      	at org.apache.spark.sql.Dataset$$anonfun$collectToPython$1.apply(Dataset.scala:2523)
      	at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:57)
      	at org.apache.spark.sql.Dataset.withNewExecutionId(Dataset.scala:2546)
      	at org.apache.spark.sql.Dataset.collectToPython(Dataset.scala:2523)
      	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
      	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
      	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
      	at java.lang.reflect.Method.invoke(Method.java:498)
      	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:237)
      	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
      	at py4j.Gateway.invoke(Gateway.java:280)
      	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
      	at py4j.commands.CallCommand.execute(CallCommand.java:79)
      	at py4j.GatewayConnection.run(GatewayConnection.java:214)
      	at java.lang.Thread.run(Thread.java:745)
      

      The stdout log of a failed executor contains:

      #
      # A fatal error has been detected by the Java Runtime Environment:
      #
      #  SIGBUS (0x7) at pc=0xb68f92e0, pid=1424, tid=0x612ae460
      #
      # JRE version: Java(TM) SE Runtime Environment (8.0_101-b13) (build 1.8.0_101-b13)
      # Java VM: Java HotSpot(TM) Client VM (25.101-b13 mixed mode linux-arm )
      # Problematic frame:
      # V  [libjvm.so+0x4e72e0]  Unsafe_GetDouble+0x6c
      #
      # Failed to write core dump. Core dumps have been disabled. To enable core dumping, try "ulimit -c unlimited" before starting Java again
      #
      # An error report file with more information is saved as:
      # /opt/spark-2.0.2-bin-hadoop2.7/work/app-20161211093349-0000/3/hs_err_pid1424.log
      
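      When many executors fail, it can help to pull the signal and the problematic frame out of each crash header automatically. The following is a minimal sketch, not part of the original report; the function name summarize_fatal_error and the trimmed SAMPLE text are illustrative, and the regular expressions assume the header layout shown above.

```python
import re

# Trimmed copy of the crash header from the executor stdout shown above.
SAMPLE = """\
#
# A fatal error has been detected by the Java Runtime Environment:
#
#  SIGBUS (0x7) at pc=0xb68f92e0, pid=1424, tid=0x612ae460
#
# Problematic frame:
# V  [libjvm.so+0x4e72e0]  Unsafe_GetDouble+0x6c
"""


def summarize_fatal_error(text):
    """Extract the signal name, pid, and problematic frame from a crash header.

    Hypothetical helper: it assumes the header format shown in this ticket.
    Fields that cannot be found are returned as None.
    """
    signal = re.search(
        r"^#\s+(SIG[A-Z]+)\s+\((0x[0-9a-f]+)\)\s+at pc=(0x[0-9a-f]+), pid=(\d+)",
        text,
        re.MULTILINE,
    )
    frame = re.search(r"^# Problematic frame:\n#\s*(.+)$", text, re.MULTILINE)
    return {
        "signal": signal.group(1) if signal else None,
        "pid": int(signal.group(4)) if signal else None,
        "frame": frame.group(1).strip() if frame else None,
    }


if __name__ == "__main__":
    print(summarize_fatal_error(SAMPLE))
```

      A SIGBUS raised inside Unsafe_GetDouble is consistent with a misaligned memory access rather than with a resource limit or a network problem.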

      The stderr of a failed executor is:

      Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
      16/12/11 09:33:51 INFO CoarseGrainedExecutorBackend: Started daemon with process name: 1424@slave2
      16/12/11 09:33:51 INFO SignalUtils: Registered signal handler for TERM
      16/12/11 09:33:51 INFO SignalUtils: Registered signal handler for HUP
      16/12/11 09:33:51 INFO SignalUtils: Registered signal handler for INT
      16/12/11 09:33:54 INFO SecurityManager: Changing view acls to: hduser
      16/12/11 09:33:54 INFO SecurityManager: Changing modify acls to: hduser
      16/12/11 09:33:54 INFO SecurityManager: Changing view acls groups to: 
      16/12/11 09:33:54 INFO SecurityManager: Changing modify acls groups to: 
      16/12/11 09:33:54 INFO SecurityManager: SecurityManager: authentication disabled; ui acls disabled; users  with view permissions: Set(hduser); groups with view permissions: Set(); users  with modify permissions: Set(hduser); groups with modify permissions: Set()
      16/12/11 09:33:55 INFO TransportClientFactory: Successfully created connection to /10.10.10.1:44389 after 342 ms (0 ms spent in bootstraps)
      16/12/11 09:33:57 INFO SecurityManager: Changing view acls to: hduser
      16/12/11 09:33:57 INFO SecurityManager: Changing modify acls to: hduser
      16/12/11 09:33:57 INFO SecurityManager: Changing view acls groups to: 
      16/12/11 09:33:57 INFO SecurityManager: Changing modify acls groups to: 
      16/12/11 09:33:57 INFO SecurityManager: SecurityManager: authentication disabled; ui acls disabled; users  with view permissions: Set(hduser); groups with view permissions: Set(); users  with modify permissions: Set(hduser); groups with modify permissions: Set()
      16/12/11 09:33:57 INFO TransportClientFactory: Successfully created connection to /10.10.10.1:44389 after 15 ms (0 ms spent in bootstraps)
      16/12/11 09:33:58 INFO DiskBlockManager: Created local directory at /data/spark/spark-161cf7dc-377b-4f40-94d9-b1928f124966/executor-517734a6-11d3-4ad1-94a0-cf5642a0ff22/blockmgr-dbef9ae3-3249-4455-8eec-3dae57798c8c
      16/12/11 09:33:58 INFO MemoryStore: MemoryStore started with capacity 516.0 MB
      16/12/11 09:33:58 INFO CoarseGrainedExecutorBackend: Connecting to driver: spark://CoarseGrainedScheduler@10.10.10.1:44389
      16/12/11 09:33:58 INFO WorkerWatcher: Connecting to worker spark://Worker@10.10.10.3:45672
      16/12/11 09:33:58 INFO TransportClientFactory: Successfully created connection to /10.10.10.3:45672 after 9 ms (0 ms spent in bootstraps)
      16/12/11 09:33:59 INFO WorkerWatcher: Successfully connected to spark://Worker@10.10.10.3:45672
      16/12/11 09:33:59 INFO CoarseGrainedExecutorBackend: Successfully registered with driver
      16/12/11 09:33:59 INFO Executor: Starting executor ID 3 on host 10.10.10.3
      16/12/11 09:33:59 INFO Utils: Successfully started service 'org.apache.spark.network.netty.NettyBlockTransferService' on port 43844.
      16/12/11 09:33:59 INFO NettyBlockTransferService: Server created on 10.10.10.3:43844
      16/12/11 09:33:59 INFO BlockManagerMaster: Registering BlockManager BlockManagerId(3, 10.10.10.3, 43844)
      16/12/11 09:33:59 INFO BlockManagerMaster: Registered BlockManager BlockManagerId(3, 10.10.10.3, 43844)
      16/12/11 09:34:44 INFO CoarseGrainedExecutorBackend: Got assigned task 2
      16/12/11 09:34:44 INFO Executor: Running task 0.0 in stage 1.0 (TID 2)
      16/12/11 09:34:45 INFO TorrentBroadcast: Started reading broadcast variable 1
      16/12/11 09:34:45 INFO TransportClientFactory: Successfully created connection to /10.10.10.1:37106 after 5 ms (0 ms spent in bootstraps)
      16/12/11 09:34:45 INFO MemoryStore: Block broadcast_1_piece0 stored as bytes in memory (estimated size 25.8 KB, free 516.0 MB)
      16/12/11 09:34:46 INFO TorrentBroadcast: Reading broadcast variable 1 took 543 ms
      16/12/11 09:34:46 WARN SizeEstimator: Failed to check whether UseCompressedOops is set; assuming yes
      16/12/11 09:34:46 INFO MemoryStore: Block broadcast_1 stored as values in memory (estimated size 71.4 KB, free 515.9 MB)
      SLF4J: Failed to load class "org.slf4j.impl.StaticLoggerBinder".
      SLF4J: Defaulting to no-operation (NOP) logger implementation
      SLF4J: See http://www.slf4j.org/codes.html#StaticLoggerBinder for further details.
      16/12/11 09:34:50 INFO Executor: Finished task 0.0 in stage 1.0 (TID 2). 2135 bytes result sent to driver
      16/12/11 09:35:03 INFO CoarseGrainedExecutorBackend: Got assigned task 4
      16/12/11 09:35:03 INFO Executor: Running task 0.1 in stage 2.0 (TID 4)
      16/12/11 09:35:03 INFO TorrentBroadcast: Started reading broadcast variable 3
      16/12/11 09:35:03 INFO MemoryStore: Block broadcast_3_piece0 stored as bytes in memory (estimated size 4.4 KB, free 516.0 MB)
      16/12/11 09:35:03 INFO TorrentBroadcast: Reading broadcast variable 3 took 102 ms
      16/12/11 09:35:03 INFO MemoryStore: Block broadcast_3 stored as values in memory (estimated size 9.0 KB, free 516.0 MB)
      16/12/11 09:35:05 INFO CodeGenerator: Code generated in 958.630042 ms
      16/12/11 09:35:05 INFO FileScanRDD: Reading File path: hdfs://master:9000/user/michael/test_data/part-r-00001-b802e900-dfaa-4fb7-aa2f-fb07d122d033.snappy.parquet, range: 0-889, partition values: [empty row]
      16/12/11 09:35:05 INFO TorrentBroadcast: Started reading broadcast variable 2
      16/12/11 09:35:05 INFO MemoryStore: Block broadcast_2_piece0 stored as bytes in memory (estimated size 24.9 KB, free 516.0 MB)
      16/12/11 09:35:05 INFO TorrentBroadcast: Reading broadcast variable 2 took 57 ms
      16/12/11 09:35:05 INFO MemoryStore: Block broadcast_2 stored as values in memory (estimated size 349.5 KB, free 515.6 MB)
      16/12/11 09:35:05 INFO CodecPool: Got brand-new decompressor [.snappy]
      

      I have tested this against HDFS 2.7 and QFS 1.2 on an ARM v7.1 based cluster. Both produce the same result. Note that I have verified this issue does not occur on x86 platforms. The Java version installed is Oracle's 1.8.0_101.
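
      Because the behavior differs by architecture, it is useful to record the platform alongside any reproduction attempt. The sketch below is an illustration only and not part of the original report; describe_platform is a hypothetical helper, and it describes the Python interpreter on the node, which may differ from the JVM the executors run.

```python
import platform
import struct


def describe_platform():
    """Report the machine type and pointer width seen by this Python process.

    Hypothetical helper for attaching to a bug report. The pointer width is
    that of the Python interpreter, not necessarily that of the JVM.
    """
    return {
        "machine": platform.machine(),
        "pointer_bits": struct.calcsize("P") * 8,
        "python": platform.python_version(),
    }


if __name__ == "__main__":
    print(describe_platform())
```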

      I initially discovered this when processing larger files whose individual Parquet part files contained a single row. The same problem manifested then.

          Activity

          kamprath Michael Kamprath added a comment -

          Per the discussion in this ticket's associated pull request, this issue will not be resolved for the core Spark project because it does not affect the platforms targeted by the Spark project.

          For those who are running Spark on other platforms, such as 32-bit ARM, I have created a patch to the Spark project which ensures that direct memory manipulation of double types is done at 8-byte aligned addresses. The patch is available here.

          apachespark Apache Spark added a comment -

          User 'michaelkamprath' has created a pull request for this issue:
          https://github.com/apache/spark/pull/16403

          kamprath Michael Kamprath added a comment -

          I was able to fix this issue by changing org.apache.spark.unsafe.Platform.getDouble() to the following:

            public static double getDouble(Object object, long offset) {
              byte[] dblBytes = new byte[8];
              Platform.copyMemory(object, offset, dblBytes, 0, 8);
              return _UNSAFE.getDouble(dblBytes, 0l);
            }
          

          I will look into formalizing this change, more thoroughly testing, and issuing a pull request. Any advice on a better approach would be appreciated.
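
          The idea behind the change above is to copy the eight bytes into a fresh buffer before interpreting them, so that the read no longer depends on the alignment of the source offset. The following Python sketch illustrates that copy-then-read pattern only; it is not the Spark patch, read_double_via_copy is a hypothetical name, and the "=d" format is chosen because the struct module documents it as native byte order with no alignment requirement.

```python
import struct


def read_double_via_copy(buf, offset):
    """Read an 8-byte double at an arbitrary offset by copying it out first.

    Illustrative only: mirrors the copy-then-read idea, not Spark's code.
    """
    chunk = bytes(buf[offset:offset + 8])  # copy into a separate buffer
    if len(chunk) != 8:
        raise ValueError("not enough bytes for a double at offset %d" % offset)
    # "=d" means native byte order, standard size, no alignment padding.
    return struct.unpack("=d", chunk)[0]


if __name__ == "__main__":
    # Three leading bytes place the double at offset 3, which is not 8-byte aligned.
    packed = b"\x01\x02\x03" + struct.pack("=d", 4.33)
    print(read_double_via_copy(packed, 3))
```

          The cost of this approach is an extra allocation and copy per read, which is one reason a broader review of the approach was requested.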

          kamprath Michael Kamprath added a comment -

          One more note: this issue only arises when doubles are in the Parquet file. This code runs just fine in the ARM71 environment:

          from pyspark.sql.types import StructType, StructField, StringType, IntegerType

          # Same shape as the failing example, but with no double column.
          rdd2 = sc.parallelize([('row3', 1, 5, 'name'), ('row4', 2, 6, 'string')])
          my_schema2 = StructType([
              StructField("id", StringType(), True),
              StructField("value1", IntegerType(), True),
              StructField("value2", IntegerType(), True),  # integer instead of double
              StructField("name", StringType(), True)
          ])
          df2 = spark.createDataFrame(rdd2, schema=my_schema2)
          # coalesce(1) writes a single part file.
          df2.coalesce(1).write.parquet('hdfs://master:9000/user/michael/test_data2', mode='overwrite')

          # Reading this data back succeeds.
          newdf2 = spark.read.parquet('hdfs://master:9000/user/michael/test_data2/')
          newdf2.take(1)
          

          ARM71 requires doubles to be 8-byte aligned. This is the first time I am digging into the Spark code. Is SPARK-16962 a similar issue? I see that it didn't address double alignment.
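
          For reference, an address or offset is 8-byte aligned when it is a multiple of 8. The sketch below shows that check and the usual round-up calculation; the helper names and the sample offsets are illustrative assumptions and do not describe Spark's actual memory layout.

```python
def is_aligned(offset, alignment=8):
    """Return True when offset is a multiple of alignment."""
    return offset % alignment == 0


def align_up(offset, alignment=8):
    """Round offset up to the next multiple of alignment."""
    return (offset + alignment - 1) // alignment * alignment


if __name__ == "__main__":
    # Sample offsets are arbitrary and only demonstrate the arithmetic.
    for offset in (0, 4, 8, 12, 16):
        print(offset, is_aligned(offset), align_up(offset))
```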

          kamprath Michael Kamprath added a comment -

          Possibly. I can dump the file created using parquet-tools on the ARM machines using the same Java installation. I am assuming that this at least rules out the JVM, but not necessarily the Parquet library, because I am using the latest snapshot of Parquet to do the dump (which might not be the same version as in Spark 2.0.2). The fact that this problem arises with both HDFS and QFS as the file system rules out the file system itself, though not necessarily the Spark interface to it.

          If this is not enough, I'll see what I can do to isolate it more.

          srowen Sean Owen added a comment -

          Surely, this is specific to ARM if it doesn't occur on x86? I doubt it has anything to do with Parquet per se.
          I have no particular reason to believe ARM doesn't work, but also doubt it's been tested or is supported.
          This still just contains the driver stack trace, which says "something went wrong over there". It's not even clear the failure is from Spark.

          kamprath Michael Kamprath added a comment - edited

          Sure. I updated the description above.

          srowen Sean Owen added a comment -

          This doesn't say anything about the underlying error though. Without that I think this would have to be closed as unactionable. Any more detail?


            People

            • Assignee:
              Unassigned
              Reporter:
              kamprath Michael Kamprath
            • Votes:
              0
              Watchers:
              4
