Spark / SPARK-29114

Dataset<Row>.coalesce(10) throws ChunkFetchFailureException when the original Dataset partition size is large


    Details

    • Type: Bug
    • Status: Open
    • Priority: Major
    • Resolution: Unresolved
    • Affects Version/s: 2.3.0
    • Fix Version/s: None
    • Component/s: Block Manager, Spark Core
    • Labels: None

      Description

      Update (15/Nov/19):

      This blog post resolved our confusion:

      http://www.russellspitzer.com/2018/05/10/SparkPartitions/

      --------------------------------------------------------

      Update (15/Nov/19):

      I discussed this issue with my colleagues today. We think Spark hits an integer out-of-bounds (overflow) problem while doing the shuffle.

      The problem may be in the sort-based shuffle stage: when a map task partition is too large and the writerIndex variable is stored as an int, writerIndex can overflow. If that is the case, changing the type of writerIndex from int to long should solve the problem.
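      If this hypothesis is right, the negative writerIndex in the exception is exactly what an int overflow would produce. A minimal sketch (plain Java, not Spark's actual networking code) showing how a byte count just over 2 GiB wraps to the negative value seen in the log:

      ```java
      // Sketch: a byte count tracked in an int silently wraps once a block
      // grows past Integer.MAX_VALUE (2^31 - 1 bytes, i.e. 2 GiB - 1).
      public class WriterIndexOverflow {
          public static void main(String[] args) {
              long realSize = 2_157_812_299L;     // ~2.01 GiB, just over the int limit
              int writerIndex = (int) realSize;   // narrowing conversion wraps to negative
              System.out.println(writerIndex);    // -2137154997, as in the exception message

              // Reading the negative int back as unsigned recovers the true size:
              long recovered = Integer.toUnsignedLong(writerIndex);
              System.out.println(recovered);      // 2157812299
          }
      }
      ```

      Note that Integer.toUnsignedLong(-2137154997) gives 2,157,812,299 bytes ≈ 2.01 GiB, i.e. a block that grew slightly past the 2 GiB limit of a signed 32-bit index, which fits the large-partition scenario below.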

      --------------------------------------------------------

      I create a Dataset<Row> df with 200 partitions and request 100 executors for the job, each with 1 core; driver memory is 8 GB and executor memory is 16 GB. I call df.cache() before df.coalesce(10). When the Dataset<Row> partitions are small, the program works well. But when I increase the partition size, df.coalesce(10) throws ChunkFetchFailureException.

      19/09/17 08:26:44 INFO CoarseGrainedExecutorBackend: Got assigned task 210
      19/09/17 08:26:44 INFO Executor: Running task 0.0 in stage 3.0 (TID 210)
      19/09/17 08:26:44 INFO MapOutputTrackerWorker: Updating epoch to 1 and clearing cache
      19/09/17 08:26:44 INFO TorrentBroadcast: Started reading broadcast variable 1003
      19/09/17 08:26:44 INFO MemoryStore: Block broadcast_1003_piece0 stored as bytes in memory (estimated size 49.4 KB, free 3.8 GB)
      19/09/17 08:26:44 INFO TorrentBroadcast: Reading broadcast variable 1003 took 7 ms
      19/09/17 08:26:44 INFO MemoryStore: Block broadcast_1003 stored as values in memory (estimated size 154.5 KB, free 3.8 GB)
      19/09/17 08:26:44 INFO BlockManager: Found block rdd_1005_0 locally
      19/09/17 08:26:44 INFO BlockManager: Found block rdd_1005_1 locally
      19/09/17 08:26:44 INFO TransportClientFactory: Successfully created connection to /100.76.29.130:54238 after 1 ms (0 ms spent in bootstraps)
      19/09/17 08:26:46 ERROR RetryingBlockFetcher: Failed to fetch block rdd_1005_18, and will not retry (0 retries)
      org.apache.spark.network.client.ChunkFetchFailureException: Failure while fetching StreamChunkId{streamId=69368607002, chunkIndex=0}: readerIndex: 0, writerIndex: -2137154997 (expected: 0 <= readerIndex <= writerIndex <= capacity(-2137154997))
      at org.apache.spark.network.client.TransportResponseHandler.handle(TransportResponseHandler.java:182)
      at org.apache.spark.network.server.TransportChannelHandler.channelRead(TransportChannelHandler.java:120)
      at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:292)
      at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:278)
      at io.netty.handler.timeout.IdleStateHandler.channelRead(IdleStateHandler.java:266)
      at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:292)
      at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:278)
      at io.netty.handler.codec.MessageToMessageDecoder.channelRead(MessageToMessageDecoder.java:103)
      at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:292)
      at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:278)
      at org.apache.spark.network.util.TransportFrameDecoder.channelRead(TransportFrameDecoder.java:85)
      at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:292)
      at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:278)
      at io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:962)
      at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:131)
      at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:528)
      at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:485)
      at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:399)
      at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:371)
      at io.netty.util.concurrent.SingleThreadEventExecutor$2.run(SingleThreadEventExecutor.java:112)
      at io.netty.util.concurrent.DefaultThreadFactory$DefaultRunnableDecorator.run(DefaultThreadFactory.java:137)
      at java.lang.Thread.run(Thread.java:745)
      19/09/17 08:26:46 WARN BlockManager: Failed to fetch block after 1 fetch failures. Most recent failure cause:
      org.apache.spark.SparkException: Exception thrown in awaitResult:
      at org.apache.spark.util.ThreadUtils$.awaitResult(ThreadUtils.scala:205)
      at org.apache.spark.network.BlockTransferService.fetchBlockSync(BlockTransferService.scala:115)
      at org.apache.spark.storage.BlockManager.getRemoteBytes(BlockManager.scala:691)
      at org.apache.spark.storage.BlockManager.getRemoteValues(BlockManager.scala:634)
      at org.apache.spark.storage.BlockManager.get(BlockManager.scala:747)
      at org.apache.spark.storage.BlockManager.getOrElseUpdate(BlockManager.scala:802)
      at org.apache.spark.rdd.RDD.getOrCompute(RDD.scala:335)
      at org.apache.spark.rdd.RDD.iterator(RDD.scala:286)
      at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
      at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
      at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
      at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
      at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
      at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
      at org.apache.spark.rdd.CoalescedRDD$$anonfun$compute$1.apply(CoalescedRDD.scala:100)
      at org.apache.spark.rdd.CoalescedRDD$$anonfun$compute$1.apply(CoalescedRDD.scala:99)
      at scala.collection.Iterator$$anon$12.nextCur(Iterator.scala:434)
      at scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:440)
      at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.agg_doAggregateWithoutKey_0$(Unknown Source)
      at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown Source)
      at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
      at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$10$$anon$1.hasNext(WholeStageCodegenExec.scala:614)
      at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
      at org.apache.spark.shuffle.sort.BypassMergeSortShuffleWriter.write(BypassMergeSortShuffleWriter.java:125)
      at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:96)
      at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:53)
      at org.apache.spark.scheduler.Task.run(Task.scala:109)
      at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:345)
      at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
      at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
      at java.lang.Thread.run(Thread.java:745)
      Caused by: org.apache.spark.network.client.ChunkFetchFailureException: Failure while fetching StreamChunkId{streamId=69368607002, chunkIndex=0}: readerIndex: 0, writerIndex: -2137154997 (expected: 0 <= readerIndex <= writerIndex <= capacity(-2137154997))
      at org.apache.spark.network.client.TransportResponseHandler.handle(TransportResponseHandler.java:182)
      at org.apache.spark.network.server.TransportChannelHandler.channelRead(TransportChannelHandler.java:120)
      at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:292)
      at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:278)
      at io.netty.handler.timeout.IdleStateHandler.channelRead(IdleStateHandler.java:266)
      at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:292)
      at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:278)
      at io.netty.handler.codec.MessageToMessageDecoder.channelRead(MessageToMessageDecoder.java:103)
      at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:292)
      at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:278)
      at org.apache.spark.network.util.TransportFrameDecoder.channelRead(TransportFrameDecoder.java:85)
      at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:292)
      at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:278)
      at io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:962)
      at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:131)
      at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:528)
      at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:485)
      at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:399)
      at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:371)
      at io.netty.util.concurrent.SingleThreadEventExecutor$2.run(SingleThreadEventExecutor.java:112)
      at io.netty.util.concurrent.DefaultThreadFactory$DefaultRunnableDecorator.run(DefaultThreadFactory.java:137)
      ... 1 more
      19/09/17 08:26:46 INFO NewHadoopRDD: Input split: 9.46.2.233:0935a68ad8000000,0935d7fb180999FF
      19/09/17 08:26:46 INFO TorrentBroadcast: Started reading broadcast variable 93
      19/09/17 08:26:46 INFO MemoryStore: Block broadcast_93_piece0 stored as bytes in memory (estimated size 32.5 KB, free 3.8 GB)
      19/09/17 08:26:46 INFO TorrentBroadcast: Reading broadcast variable 93 took 8 ms
      19/09/17 08:26:47 INFO MemoryStore: Block broadcast_93 stored as values in memory (estimated size 372.0 KB, free 3.8 GB)
      19/09/17 08:26:47 INFO RecoverableZooKeeper: Process identifier=hconnection-0x1aa852f0 connecting to ZooKeeper ensemble=ss-hbase-zk-4:2181,ss-hbase-zk-3:2181,ss-hbase-zk-2:2181,ss-hbase-zk-1:2181,ss-hbase-zk-5:2181
      19/09/17 08:26:47 INFO ZooKeeper: Initiating client connection, connectString=ss-hbase-zk-4:2181,ss-hbase-zk-3:2181,ss-hbase-zk-2:2181,ss-hbase-zk-1:2181,ss-hbase-zk-5:2181 sessionTimeout=90000 watcher=hconnection-0x1aa852f0, quorum=ss-hbase-zk-4:2181,ss-hbase-zk-3:2181,ss-hbase-zk-2:2181,ss-hbase-zk-1:2181,ss-hbase-zk-5:2181, baseZNode=/hbase-qq-mp-ss-slave
      19/09/17 08:26:47 INFO ClientCnxn: Opening socket connection to server 10.254.82.84/10.254.82.84:2181. Will not attempt to authenticate using SASL (unknown error)
      19/09/17 08:26:47 INFO ClientCnxn: Socket connection established to 10.254.82.84/10.254.82.84:2181, initiating session
      19/09/17 08:26:47 INFO ClientCnxn: Session establishment complete on server 10.254.82.84/10.254.82.84:2181, sessionid = 0x36c7f371e67307f, negotiated timeout = 90000
      19/09/17 08:26:47 INFO TableInputFormatBase: Input split length: 19.8 G bytes.
      19/09/17 08:41:37 INFO HConnectionManager$HConnectionImplementation: Closing zookeeper sessionid=0x36c7f371e67307f
      19/09/17 08:41:38 INFO ZooKeeper: Session: 0x36c7f371e67307f closed
      19/09/17 08:41:38 INFO ClientCnxn: EventThread shut down
      19/09/17 08:41:38 INFO MemoryStore: Block rdd_1005_18 stored as values in memory (estimated size 1822.9 MB, free 2025.1 MB)
      19/09/17 08:41:38 INFO TransportClientFactory: Successfully created connection to /9.10.29.145:37002 after 0 ms (0 ms spent in bootstraps)
      19/09/17 08:41:39 ERROR RetryingBlockFetcher: Failed to fetch block rdd_1005_32, and will not retry (0 retries)
      org.apache.spark.network.client.ChunkFetchFailureException: Failure while fetching StreamChunkId{streamId=1800856515000, chunkIndex=0}: readerIndex: 0, writerIndex: -2138342822 (expected: 0 <= readerIndex <= writerIndex <= capacity(-2138342822))

       

      Let me explain more experimental details.

      When each partition size is small:

      • Storage Level: Memory Deserialized 1x Replicated
      • Cached Partitions: 200
      • Total Partitions: 200
      • Memory Size: 356.2 GB
      • Disk Size: 0.0 B

        Data Distribution on 100 Executors

       

      The log of the successful task is as follows:

      19/09/17 09:33:17 INFO CoarseGrainedExecutorBackend: Got assigned task 211
      19/09/17 09:33:17 INFO Executor: Running task 3.0 in stage 3.0 (TID 211)
      19/09/17 09:33:17 INFO MapOutputTrackerWorker: Updating epoch to 1 and clearing cache
      19/09/17 09:33:17 INFO TorrentBroadcast: Started reading broadcast variable 1003
      19/09/17 09:33:17 INFO TransportClientFactory: Successfully created connection to /9.10.19.210:51072 after 8 ms (0 ms spent in bootstraps)
      19/09/17 09:33:17 INFO MemoryStore: Block broadcast_1003_piece0 stored as bytes in memory (estimated size 49.4 KB, free 5.5 GB)
      19/09/17 09:33:17 INFO TorrentBroadcast: Reading broadcast variable 1003 took 36 ms
      19/09/17 09:33:18 INFO MemoryStore: Block broadcast_1003 stored as values in memory (estimated size 154.5 KB, free 5.5 GB)
      19/09/17 09:33:18 INFO BlockManager: Found block rdd_1005_6 locally
      19/09/17 09:33:18 INFO TransportClientFactory: Successfully created connection to /100.76.35.94:37220 after 1 ms (0 ms spent in bootstraps)
      19/09/17 09:33:36 INFO BlockManager: Found block rdd_1005_33 remotely
      19/09/17 09:33:37 INFO TransportClientFactory: Successfully created connection to /100.76.35.94:32935 after 10 ms (0 ms spent in bootstraps)
      19/09/17 09:33:50 INFO BlockManager: Found block rdd_1005_40 remotely
      19/09/17 09:33:52 INFO TransportClientFactory: Successfully created connection to /100.76.25.87:45875 after 2 ms (0 ms spent in bootstraps)
      19/09/17 09:34:06 INFO BlockManager: Found block rdd_1005_46 remotely
      19/09/17 09:34:08 INFO TransportClientFactory: Successfully created connection to /9.10.36.96:35134 after 32 ms (0 ms spent in bootstraps)
      19/09/17 09:34:18 INFO BlockManager: Found block rdd_1005_48 remotely
      19/09/17 09:34:20 INFO TransportClientFactory: Successfully created connection to /9.47.25.185:47504 after 1 ms (0 ms spent in bootstraps)
      19/09/17 09:34:42 INFO BlockManager: Found block rdd_1005_49 remotely
      19/09/17 09:34:44 INFO TransportClientFactory: Successfully created connection to /100.76.33.91:35365 after 1 ms (0 ms spent in bootstraps)
      19/09/17 09:34:59 INFO BlockManager: Found block rdd_1005_51 remotely
      19/09/17 09:35:01 INFO TransportClientFactory: Successfully created connection to /9.10.7.26:49383 after 3 ms (0 ms spent in bootstraps)
      19/09/17 09:35:16 INFO BlockManager: Found block rdd_1005_71 remotely
      19/09/17 09:35:18 INFO TransportClientFactory: Successfully created connection to /100.76.72.246:51684 after 2 ms (0 ms spent in bootstraps)
      19/09/17 09:35:28 INFO BlockManager: Found block rdd_1005_75 remotely
      19/09/17 09:35:30 INFO TransportClientFactory: Successfully created connection to /9.47.30.46:51291 after 1 ms (0 ms spent in bootstraps)
      19/09/17 09:35:45 INFO BlockManager: Found block rdd_1005_98 remotely
      19/09/17 09:35:47 INFO TransportClientFactory: Successfully created connection to /9.10.137.17:56554 after 2 ms (0 ms spent in bootstraps)
      19/09/17 09:36:00 INFO BlockManager: Found block rdd_1005_116 remotely
      19/09/17 09:36:02 INFO TransportClientFactory: Successfully created connection to /100.76.35.94:58951 after 2 ms (0 ms spent in bootstraps)
      19/09/17 09:36:16 INFO BlockManager: Found block rdd_1005_121 remotely
      19/09/17 09:36:19 INFO TransportClientFactory: Successfully created connection to /9.10.36.96:50992 after 1 ms (0 ms spent in bootstraps)
      19/09/17 09:36:27 INFO BlockManager: Found block rdd_1005_128 remotely
      19/09/17 09:36:39 INFO BlockManager: Found block rdd_1005_134 remotely
      19/09/17 09:36:42 INFO TransportClientFactory: Successfully created connection to /9.10.7.92:41607 after 73 ms (0 ms spent in bootstraps)
      19/09/17 09:36:54 INFO BlockManager: Found block rdd_1005_153 remotely
      19/09/17 09:37:06 INFO BlockManager: Found block rdd_1005_167 remotely
      19/09/17 09:37:08 INFO BlockManager: Found block rdd_1005_174 locally
      19/09/17 09:37:08 INFO TransportClientFactory: Successfully created connection to /9.10.29.150:43709 after 10 ms (0 ms spent in bootstraps)
      19/09/17 09:37:20 INFO BlockManager: Found block rdd_1005_182 remotely
      19/09/17 09:37:22 INFO TransportClientFactory: Successfully created connection to /9.10.8.84:55958 after 14 ms (0 ms spent in bootstraps)
      19/09/17 09:37:32 INFO BlockManager: Found block rdd_1005_189 remotely
      19/09/17 09:37:34 INFO Executor: Finished task 3.0 in stage 3.0 (TID 211). 1752 bytes result sent to driver

       

      When I increase the size of each partition:

      • Storage Level: Disk Serialized 1x Replicated
      • Cached Partitions: 200
      • Total Partitions: 200
      • Memory Size: 390.8 GB
      • Disk Size: 166.8 GB

        Data Distribution on 100 Executors

       

      In this situation, the cached blocks spill to disk on some executors. Because df.coalesce(10) throws ChunkFetchFailureException, the 10 executors running the coalesced tasks just fetch the data from the original data source again and cache it in the new Dataset<Row>.
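      A rough back-of-envelope check (assuming the data is distributed roughly evenly across partitions, which the storage tab suggests) is consistent with the overflow hypothesis: in the failing run the cached dataset totals about 390.8 GB in memory plus 166.8 GB on disk across 200 partitions, putting the average partition past the ~2.15 GB that a signed 32-bit byte index can address, whereas the successful run (356.2 GB over 200 partitions) stays under it:

      ```java
      // Rough check (assumption: roughly even distribution across partitions).
      public class PartitionSizeCheck {
          public static void main(String[] args) {
              double intLimitGB = 2.147483647;             // 2^31 - 1 bytes, in decimal GB

              double largeCase = (390.8 + 166.8) / 200.0;  // memory + disk, 200 partitions
              System.out.println(largeCase);               // ~2.79 GB per partition: overflows
              System.out.println(largeCase > intLimitGB);  // true

              double smallCase = 356.2 / 200.0;            // successful run
              System.out.println(smallCase);               // ~1.78 GB per partition: fits in an int
              System.out.println(smallCase < intLimitGB);  // true
          }
      }
      ```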

            People

            • Assignee: Unassigned
            • Reporter: maxzxwang ZhanxiongWang
            • Votes: 0
            • Watchers: 2