Spark / SPARK-10358

Spark-sql throws IOException on exit when using HDFS to store event log.


Details

    • Type: Bug
    • Status: Resolved
    • Priority: Minor
    • Resolution: Duplicate
    • Affects Version/s: 1.3.1
    • Fix Version/s: None
    • Component/s: SQL
    • Labels: None
    • Environment:
      • spark-1.3.1-bin-hadoop2.6
      • hadoop-2.6.0
      • Red Hat 2.6.32-504.el6.x86_64

    Description

      Summary

      In Spark 1.3.1, if HDFS is used to store the event log, spark-sql throws a "java.io.IOException: Filesystem closed" on exit.

      How to reproduce

      1. Enable the event log mechanism and point it at HDFS.
      You can do this by setting these two properties in spark-defaults.conf:
      spark.eventLog.enabled true
      spark.eventLog.dir hdfs://xxxxx:xxxxx/spark-events
      2. Start spark-sql, and type exit; once it is up.

       
      spark-sql> exit; 
      15/08/14 06:29:20 ERROR scheduler.LiveListenerBus: Listener EventLoggingListener threw an exception 
      java.lang.reflect.InvocationTargetException 
      at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) 
      at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:39) 
      at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:25) 
      at java.lang.reflect.Method.invoke(Method.java:597) 
      at org.apache.spark.scheduler.EventLoggingListener$$anonfun$logEvent$3.apply(EventLoggingListener.scala:144) 
      at org.apache.spark.scheduler.EventLoggingListener$$anonfun$logEvent$3.apply(EventLoggingListener.scala:144) 
      at scala.Option.foreach(Option.scala:236) 
      at org.apache.spark.scheduler.EventLoggingListener.logEvent(EventLoggingListener.scala:144) 
      at org.apache.spark.scheduler.EventLoggingListener.onApplicationEnd(EventLoggingListener.scala:181) 
      at org.apache.spark.scheduler.SparkListenerBus$class.onPostEvent(SparkListenerBus.scala:54) 
      at org.apache.spark.scheduler.LiveListenerBus.onPostEvent(LiveListenerBus.scala:31) 
      at org.apache.spark.scheduler.LiveListenerBus.onPostEvent(LiveListenerBus.scala:31) 
      at org.apache.spark.util.ListenerBus$class.postToAll(ListenerBus.scala:53) 
      at org.apache.spark.util.AsynchronousListenerBus.postToAll(AsynchronousListenerBus.scala:36) 
      at org.apache.spark.util.AsynchronousListenerBus$$anon$1$$anonfun$run$1.apply$mcV$sp(AsynchronousListenerBus.scala:76) 
      at org.apache.spark.util.AsynchronousListenerBus$$anon$1$$anonfun$run$1.apply(AsynchronousListenerBus.scala:61) 
      at org.apache.spark.util.AsynchronousListenerBus$$anon$1$$anonfun$run$1.apply(AsynchronousListenerBus.scala:61) 
      at org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:1678) 
      at org.apache.spark.util.AsynchronousListenerBus$$anon$1.run(AsynchronousListenerBus.scala:60) 
      Caused by: java.io.IOException: Filesystem closed 
      at org.apache.hadoop.hdfs.DFSClient.checkOpen(DFSClient.java:795) 
      at org.apache.hadoop.hdfs.DFSOutputStream.flushOrSync(DFSOutputStream.java:1985) 
      at org.apache.hadoop.hdfs.DFSOutputStream.hflush(DFSOutputStream.java:1946) 
      at org.apache.hadoop.fs.FSDataOutputStream.hflush(FSDataOutputStream.java:130) 
      ... 19 more 
      15/08/14 06:29:20 INFO handler.ContextHandler: stopped o.s.j.s.ServletContextHandler{/metrics/json,null} 
      15/08/14 06:29:20 INFO handler.ContextHandler: stopped o.s.j.s.ServletContextHandler{/stages/stage/kill,null} 
      15/08/14 06:29:20 INFO handler.ContextHandler: stopped o.s.j.s.ServletContextHandler{/,null} 
      15/08/14 06:29:20 INFO handler.ContextHandler: stopped o.s.j.s.ServletContextHandler{/static,null} 
      15/08/14 06:29:20 INFO handler.ContextHandler: stopped o.s.j.s.ServletContextHandler{/executors/threadDump/json,null} 
      15/08/14 06:29:20 INFO handler.ContextHandler: stopped o.s.j.s.ServletContextHandler{/executors/threadDump,null} 
      15/08/14 06:29:20 INFO handler.ContextHandler: stopped o.s.j.s.ServletContextHandler{/executors/json,null} 
      15/08/14 06:29:20 INFO handler.ContextHandler: stopped o.s.j.s.ServletContextHandler{/executors,null} 
      15/08/14 06:29:20 INFO handler.ContextHandler: stopped o.s.j.s.ServletContextHandler{/environment/json,null} 
      15/08/14 06:29:20 INFO handler.ContextHandler: stopped o.s.j.s.ServletContextHandler{/environment,null} 
      15/08/14 06:29:20 INFO handler.ContextHandler: stopped o.s.j.s.ServletContextHandler{/storage/rdd/json,null} 
      15/08/14 06:29:20 INFO handler.ContextHandler: stopped o.s.j.s.ServletContextHandler{/storage/rdd,null} 
      15/08/14 06:29:20 INFO handler.ContextHandler: stopped o.s.j.s.ServletContextHandler{/storage/json,null} 
      15/08/14 06:29:20 INFO handler.ContextHandler: stopped o.s.j.s.ServletContextHandler{/storage,null} 
      15/08/14 06:29:20 INFO handler.ContextHandler: stopped o.s.j.s.ServletContextHandler{/stages/pool/json,null} 
      15/08/14 06:29:20 INFO handler.ContextHandler: stopped o.s.j.s.ServletContextHandler{/stages/pool,null} 
      15/08/14 06:29:20 INFO handler.ContextHandler: stopped o.s.j.s.ServletContextHandler{/stages/stage/json,null} 
      15/08/14 06:29:20 INFO handler.ContextHandler: stopped o.s.j.s.ServletContextHandler{/stages/stage,null} 
      15/08/14 06:29:20 INFO handler.ContextHandler: stopped o.s.j.s.ServletContextHandler{/stages/json,null} 
      15/08/14 06:29:20 INFO handler.ContextHandler: stopped o.s.j.s.ServletContextHandler{/stages,null} 
      15/08/14 06:29:20 INFO handler.ContextHandler: stopped o.s.j.s.ServletContextHandler{/jobs/job/json,null} 
      15/08/14 06:29:20 INFO handler.ContextHandler: stopped o.s.j.s.ServletContextHandler{/jobs/job,null} 
      15/08/14 06:29:20 INFO handler.ContextHandler: stopped o.s.j.s.ServletContextHandler{/jobs/json,null} 
      15/08/14 06:29:20 INFO handler.ContextHandler: stopped o.s.j.s.ServletContextHandler{/jobs,null} 
      15/08/14 06:29:20 INFO ui.SparkUI: Stopped Spark web UI at http://9.111.251.177:4040 
      15/08/14 06:29:20 INFO scheduler.DAGScheduler: Stopping DAGScheduler 
      15/08/14 06:29:20 INFO cluster.SparkDeploySchedulerBackend: Shutting down all executors 
      15/08/14 06:29:20 INFO cluster.SparkDeploySchedulerBackend: Asking each executor to shut down 
      Exception in thread "Thread-4" java.io.IOException: Filesystem closed 
      at org.apache.hadoop.hdfs.DFSClient.checkOpen(DFSClient.java:795) 
      at org.apache.hadoop.hdfs.DFSClient.getFileInfo(DFSClient.java:1986) 
      at org.apache.hadoop.hdfs.DistributedFileSystem$18.doCall(DistributedFileSystem.java:1118) 
      at org.apache.hadoop.hdfs.DistributedFileSystem$18.doCall(DistributedFileSystem.java:1114) 
      at org.apache.hadoop.fs.FileSystemLinkResolver.resolve(FileSystemLinkResolver.java:81) 
      at org.apache.hadoop.hdfs.DistributedFileSystem.getFileStatus(DistributedFileSystem.java:1114) 
      at org.apache.hadoop.fs.FileSystem.exists(FileSystem.java:1400) 
      at org.apache.spark.scheduler.EventLoggingListener.stop(EventLoggingListener.scala:198) 
      at org.apache.spark.SparkContext$$anonfun$stop$4.apply(SparkContext.scala:1391) 
      at org.apache.spark.SparkContext$$anonfun$stop$4.apply(SparkContext.scala:1391) 
      at scala.Option.foreach(Option.scala:236) 
      at org.apache.spark.SparkContext.stop(SparkContext.scala:1391) 
      at org.apache.spark.sql.hive.thriftserver.SparkSQLEnv$.stop(SparkSQLEnv.scala:66) 
      at org.apache.spark.sql.hive.thriftserver.SparkSQLCLIDriver$$anon$1.run(SparkSQLCLIDriver.scala:107) 
      

      Analysis

      With DEBUG-level logging enabled, we can see more information:

       
      15/08/14 09:32:34 DEBUG SessionState: Removing resource dir /tmp/a9d3b5b8-3329-4e93-aee2-0a3496d661fa_resources 
      15/08/14 09:32:34 DEBUG SparkSQLEnv: Shutting down Spark SQL Environment 
      15/08/14 09:32:34 DEBUG DiskBlockManager: Shutdown hook called 
      15/08/14 09:32:34 DEBUG DFSClient: DFSClient writeChunk allocating new packet seqno=2, src=/spark/events/app-20150814093225-0002.snappy.inprogress, packetSize=65532, chunksPerPacket=127,bytesCurBlock=2048 
      15/08/14 09:32:34 DEBUG DFSClient: Queued packet 2 
      15/08/14 09:32:34 DEBUG DFSClient: Queued packet 3 
      15/08/14 09:32:34 DEBUG DFSClient: Waiting for ack for: 3 
      15/08/14 09:32:34 DEBUG Utils: Shutdown hook called 
      15/08/14 09:32:34 DEBUG DFSClient: DataStreamer block BP-456748518-127.0.0.1-1439547535043:blk_1073741833_1010 sending packet packet seqno:2 offsetInBlock:2048 lastPacketInBlock:false lastByteOffsetInBlock: 2399 
      15/08/14 09:32:34 INFO SparkContext: END POST APPLICATION END 
      15/08/14 09:32:34 DEBUG DFSClient: DFSClient seqno: 2 status: SUCCESS downstreamAckTimeNanos: 0 
      15/08/14 09:32:34 DEBUG DFSClient: DataStreamer block BP-456748518-127.0.0.1-1439547535043:blk_1073741833_1010 sending packet packet seqno:3 offsetInBlock:2399 lastPacketInBlock:true lastByteOffsetInBlock: 2399 
      15/08/14 09:32:34 DEBUG DFSClient: DFSClient seqno: 3 status: SUCCESS downstreamAckTimeNanos: 0 
      15/08/14 09:32:34 DEBUG AbstractLifeCycle: stopping org.spark-project.jetty.server.Server@489bb457 
      15/08/14 09:32:34 DEBUG DFSClient: Closing old block BP-456748518-127.0.0.1-1439547535043:blk_1073741833_1010 
      15/08/14 09:32:34 DEBUG AbstractLifeCycle: stopping SelectChannelConnector@0.0.0.0:4040 
      15/08/14 09:32:34 DEBUG AbstractLifeCycle: stopping org.spark-project.jetty.server.nio.SelectChannelConnector$ConnectorSelectorManager@15e3d24a 
      15/08/14 09:32:34 DEBUG nio: Stopped Thread[qtp1717362942-45 Selector0,5,main] on org.spark-project.jetty.io.nio.SelectorManager$1@25964fe8 
      15/08/14 09:32:34 DEBUG Client: IPC Client (123257783) connection to /9.111.251.177:9000 from root sending #6 
      15/08/14 09:32:34 DEBUG Client: IPC Client (123257783) connection to /9.111.251.177:9000 from root got value #6 
      15/08/14 09:32:34 DEBUG ProtobufRpcEngine: Call: complete took 4ms 
      15/08/14 09:32:34 DEBUG nio: Stopped Thread[qtp1717362942-46 Selector1,5,main] on org.spark-project.jetty.io.nio.SelectorManager$1@2f581b9f 
      15/08/14 09:32:34 DEBUG Client: stopping client from cache: org.apache.hadoop.ipc.Client@3aca5e2 
      15/08/14 09:32:34 DEBUG Client: removing client from cache: org.apache.hadoop.ipc.Client@3aca5e2 
      15/08/14 09:32:34 DEBUG Client: stopping actual client because no more references remain: org.apache.hadoop.ipc.Client@3aca5e2 
      15/08/14 09:32:34 DEBUG Client: Stopping client 
      15/08/14 09:32:34 ERROR LiveListenerBus: Listener EventLoggingListener threw an exception 
      java.lang.reflect.InvocationTargetException 
      at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) 
      at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:39) 
      at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:25) 
      at java.lang.reflect.Method.invoke(Method.java:597) 
      at org.apache.spark.scheduler.EventLoggingListener$$anonfun$logEvent$3.apply(EventLoggingListener.scala:144) 
      at org.apache.spark.scheduler.EventLoggingListener$$anonfun$logEvent$3.apply(EventLoggingListener.scala:144) 
      at scala.Option.foreach(Option.scala:236) 
      

      From the log, we can see the "Shutdown hook called" of org.apache.spark.util.Utils, then HDFS's own shutdown hook, in which the DFSClient is stopped, and finally SparkSQLEnv.stop, where the exception is thrown.
      Clearly, Spark is trying to write new event log messages after the HDFS client has been closed, which causes the exception.
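      The failure mode can be reproduced in isolation with a stream that rejects writes after close, the way DFSOutputStream fails via DFSClient.checkOpen(). The sketch below is illustrative Scala, not Spark or Hadoop code; CheckedStream is a hypothetical name.

```scala
import java.io.{ByteArrayOutputStream, FilterOutputStream, IOException, OutputStream}

// Minimal model of the failure: a stream that rejects writes after close(),
// analogous to DFSClient.checkOpen() guarding every HDFS operation.
class CheckedStream(out: OutputStream) extends FilterOutputStream(out) {
  @volatile private var open = true
  override def write(b: Int): Unit = {
    if (!open) throw new IOException("Filesystem closed") // what DFSClient reports
    out.write(b)
  }
  override def close(): Unit = { open = false; out.close() }
}
```

      Once close() has run (the HDFS shutdown hook), any late write (the event-log listener flushing SparkListenerApplicationEnd) can only fail.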

      After checking org.apache.spark.sql.hive.thriftserver.SparkSQLCLIDriver, we found that SparkSQLEnv.stop() is invoked from a plain Java runtime shutdown hook:

       
          // Clean up after we exit 
          Runtime.getRuntime.addShutdownHook( 
            new Thread() { 
              override def run() { 
                SparkSQLEnv.stop() 
              } 
            } 
          ) 
      

      A hook registered this way carries no ordering guarantee relative to HDFS's shutdown hook, so it can run after the DFSClient has already been closed.

      Solution

      We fixed the problem by registering the hook through Hadoop's ShutdownHookManager with a higher priority than HDFS's own shutdown hook, so that SparkSQLEnv.stop() runs before the filesystem is closed.
      The pull request is https://github.com/apache/spark/pull/8530#issuecomment-136236733
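      The idea behind the fix can be sketched with a toy priority-ordered hook manager. MiniHookManager below is a hypothetical illustration of the semantics of org.apache.hadoop.util.ShutdownHookManager (higher priority runs first), not the Hadoop implementation.

```scala
import scala.collection.mutable.ArrayBuffer

// Toy priority-ordered shutdown hooks: higher priority runs first,
// mirroring the contract of Hadoop's ShutdownHookManager.
object MiniHookManager {
  private val hooks = ArrayBuffer.empty[(Int, () => Unit)]

  def addShutdownHook(priority: Int)(body: => Unit): Unit =
    hooks += ((priority, () => body))

  // In Hadoop this is driven by a single JVM shutdown hook; exposed
  // directly here so the ordering is easy to see.
  def runHooks(): Unit =
    hooks.sortBy(-_._1).foreach(_._2())
}
```

      With this ordering, registering SparkSQLEnv.stop() at a priority above the filesystem close hook (in Hadoop, FileSystem's close hook runs at a fixed, fairly low priority) guarantees the event log is flushed before the DFSClient shuts down.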

      People

        Assignee: Unassigned
        Reporter: Sioa Yanis Song
