Spark / SPARK-22305

HDFSBackedStateStoreProvider fails with StackOverflowError when attempting to recover state

    Details

    • Type: Bug
    • Status: Resolved
    • Priority: Major
    • Resolution: Fixed
    • Affects Version/s: 2.2.0
    • Fix Version/s: 2.3.0
    • Component/s: Structured Streaming
    • Labels: None

      Description

      Environment:

      Spark: 2.2.0
      Java version: 1.8.0_112
      spark.sql.streaming.minBatchesToRetain: 100
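
      For reference, this retention setting is normally applied on the
      session before the streaming query starts. A minimal sketch
      (assuming a standard SparkSession setup; the app name is
      illustrative):

      import org.apache.spark.sql.SparkSession

      // Sketch only: applies the retention setting listed above when
      // building the session. The app name is a placeholder.
      val spark = SparkSession.builder()
        .appName("stateful-streaming-app")
        .config("spark.sql.streaming.minBatchesToRetain", "100")
        .getOrCreate()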

      After an application failure due to OOM exceptions, restarting the application and recovering the existing state produces the following StackOverflowError (surfaced through the HDFS client as an IOException):

      java.io.IOException: com.google.protobuf.ServiceException: java.lang.StackOverflowError
      	at org.apache.hadoop.ipc.ProtobufHelper.getRemoteException(ProtobufHelper.java:47)
      	at org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolTranslatorPB.getBlockLocations(ClientNamenodeProtocolTranslatorPB.java:260)
      	at sun.reflect.GeneratedMethodAccessor9.invoke(Unknown Source)
      	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
      	at java.lang.reflect.Method.invoke(Method.java:498)
      	at org.apache.hadoop.io.retry.RetryInvocationHandler.invokeMethod(RetryInvocationHandler.java:187)
      	at org.apache.hadoop.io.retry.RetryInvocationHandler.invoke(RetryInvocationHandler.java:102)
      	at com.sun.proxy.$Proxy18.getBlockLocations(Unknown Source)
      	at org.apache.hadoop.hdfs.DFSClient.callGetBlockLocations(DFSClient.java:1240)
      	at org.apache.hadoop.hdfs.DFSClient.getLocatedBlocks(DFSClient.java:1227)
      	at org.apache.hadoop.hdfs.DFSClient.getLocatedBlocks(DFSClient.java:1215)
      	at org.apache.hadoop.hdfs.DFSInputStream.fetchLocatedBlocksAndGetLastBlockLength(DFSInputStream.java:303)
      	at org.apache.hadoop.hdfs.DFSInputStream.openInfo(DFSInputStream.java:269)
      	at org.apache.hadoop.hdfs.DFSInputStream.<init>(DFSInputStream.java:261)
      	at org.apache.hadoop.hdfs.DFSClient.open(DFSClient.java:1540)
      	at org.apache.hadoop.hdfs.DistributedFileSystem$3.doCall(DistributedFileSystem.java:304)
      	at org.apache.hadoop.hdfs.DistributedFileSystem$3.doCall(DistributedFileSystem.java:299)
      	at org.apache.hadoop.fs.FileSystemLinkResolver.resolve(FileSystemLinkResolver.java:81)
      	at org.apache.hadoop.hdfs.DistributedFileSystem.open(DistributedFileSystem.java:312)
      	at org.apache.hadoop.fs.FileSystem.open(FileSystem.java:767)
      	at org.apache.spark.sql.execution.streaming.state.HDFSBackedStateStoreProvider.org$apache$spark$sql$execution$streaming$state$HDFSBackedStateStoreProvider$$readSnapshotFile(HDFSBackedStateStoreProvider.scala:405)
      	at org.apache.spark.sql.execution.streaming.state.HDFSBackedStateStoreProvider$$anonfun$org$apache$spark$sql$execution$streaming$state$HDFSBackedStateStoreProvider$$loadMap$1.apply(HDFSBackedStateStoreProvider.scala:296)
      	at org.apache.spark.sql.execution.streaming.state.HDFSBackedStateStoreProvider$$anonfun$org$apache$spark$sql$execution$streaming$state$HDFSBackedStateStoreProvider$$loadMap$1.apply(HDFSBackedStateStoreProvider.scala:295)
      	at scala.Option.getOrElse(Option.scala:121)
      	at org.apache.spark.sql.execution.streaming.state.HDFSBackedStateStoreProvider.org$apache$spark$sql$execution$streaming$state$HDFSBackedStateStoreProvider$$loadMap(HDFSBackedStateStoreProvider.scala:295)
      	at org.apache.spark.sql.execution.streaming.state.HDFSBackedStateStoreProvider$$anonfun$org$apache$spark$sql$execution$streaming$state$HDFSBackedStateStoreProvider$$loadMap$1$$anonfun$4.apply(HDFSBackedStateStoreProvider.scala:297)
      	at org.apache.spark.sql.execution.streaming.state.HDFSBackedStateStoreProvider$$anonfun$org$apache$spark$sql$execution$streaming$state$HDFSBackedStateStoreProvider$$loadMap$1$$anonfun$4.apply(HDFSBackedStateStoreProvider.scala:296)
      	at scala.Option.getOrElse(Option.scala:121)
      	at org.apache.spark.sql.execution.streaming.state.HDFSBackedStateStoreProvider$$anonfun$org$apache$spark$sql$execution$streaming$state$HDFSBackedStateStoreProvider$$loadMap$1.apply(HDFSBackedStateStoreProvider.scala:296)
      	at org.apache.spark.sql.execution.streaming.state.HDFSBackedStateStoreProvider$$anonfun$org$apache$spark$sql$execution$streaming$state$HDFSBackedStateStoreProvider$$loadMap$1.apply(HDFSBackedStateStoreProvider.scala:295)
      	at scala.Option.getOrElse(Option.scala:121)
      	at org.apache.spark.sql.execution.streaming.state.HDFSBackedStateStoreProvider.org$apache$spark$sql$execution$streaming$state$HDFSBackedStateStoreProvider$$loadMap(HDFSBackedStateStoreProvider.scala:295)
      	at org.apache.spark.sql.execution.streaming.state.HDFSBackedStateStoreProvider$$anonfun$org$apache$spark$sql$execution$streaming$state$HDFSBackedStateStoreProvider$$loadMap$1$$anonfun$4.apply(HDFSBackedStateStoreProvider.scala:297)
      	at org.apache.spark.sql.execution.streaming.state.HDFSBackedStateStoreProvider$$anonfun$org$apache$spark$sql$execution$streaming$state$HDFSBackedStateStoreProvider$$loadMap$1$$anonfun$4.apply(HDFSBackedStateStoreProvider.scala:296)
      	at scala.Option.getOrElse(Option.scala:121)
      

      At the time of failure, the state directory contained 2 snapshot files and 230 delta files.
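
      The repeating loadMap/getOrElse frames in the trace come from
      loadMap recursing once per delta file between the requested
      version and the most recent snapshot. A simplified sketch of the
      failure mode and a constant-stack alternative follows; this is
      illustrative, not the actual Spark source, and the
      readSnapshotFile/updateFromDeltaFile helpers are hypothetical
      stand-ins:

      import scala.collection.mutable

      object StateRecoverySketch {
        type MapType = mutable.HashMap[String, String]

        // Hypothetical stand-ins for the snapshot/delta file readers.
        def readSnapshotFile(version: Long): Option[MapType] = None
        def updateFromDeltaFile(version: Long, map: MapType): Unit = ()

        // Recursive form, mirroring the stack trace: each delta file
        // between the last snapshot and the requested version adds a
        // stack frame, so ~230 deltas can exhaust the default JVM stack.
        def loadMapRecursive(version: Long): MapType = {
          if (version <= 0) return new MapType
          readSnapshotFile(version).getOrElse {
            val map = loadMapRecursive(version - 1) // one frame per delta
            updateFromDeltaFile(version, map)
            map
          }
        }

        // Iterative form: walk back to the newest snapshot with a loop,
        // then replay deltas forward, keeping stack depth constant
        // regardless of how many delta files must be applied.
        def loadMapIterative(version: Long): MapType = {
          var snapshotVersion = version
          var snapshot: Option[MapType] = None
          while (snapshotVersion > 0 && snapshot.isEmpty) {
            snapshot = readSnapshotFile(snapshotVersion)
            if (snapshot.isEmpty) snapshotVersion -= 1
          }
          val map = snapshot.getOrElse(new MapType)
          for (v <- (snapshotVersion + 1) to version) {
            updateFromDeltaFile(v, map)
          }
          map
        }
      }

      An iterative rewrite along these lines keeps recovery bounded by
      heap rather than stack, which is consistent with this issue being
      resolved in 2.3.0.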


            People

            • Assignee: joseph.torres (Jose Torres)
            • Reporter: Yuval.Itzchakov (Yuval Itzchakov)
            • Votes: 0
            • Watchers: 4
