Spark / SPARK-2468

Netty-based block server / client module

    Details

    • Type: Improvement
    • Status: Resolved
    • Priority: Critical
    • Resolution: Fixed
    • Affects Version/s: None
    • Fix Version/s: 1.2.0
    • Component/s: Shuffle, Spark Core
    • Labels: None
    • Target Version/s:

      Description

      Right now shuffle send goes through the block manager. This is inefficient because it requires loading a block from disk into a kernel buffer, then into a user-space buffer, and then back into a kernel send buffer before it reaches the NIC. It makes multiple copies of the data and context-switches between kernel and user space. It also creates unnecessary buffers in the JVM, which increases GC pressure.

      Instead, we should use FileChannel.transferTo, which handles this in the kernel space with zero-copy. See http://www.ibm.com/developerworks/library/j-zerocopy/
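      For reference, a minimal sketch of the zero-copy path, assuming a plain java.nio FileChannel and an already-connected SocketChannel (the names are illustrative, not Spark's actual classes):

      import java.io.{File, RandomAccessFile}
      import java.nio.channels.SocketChannel

      // Illustrative only: stream a block file straight to a socket with
      // FileChannel.transferTo, letting the kernel copy the bytes (zero-copy)
      // instead of pulling them through JVM user-space buffers.
      def sendBlock(file: File, socket: SocketChannel): Unit = {
        val channel = new RandomAccessFile(file, "r").getChannel
        try {
          var position = 0L
          val size = channel.size()
          while (position < size) {
            // transferTo may send fewer bytes than requested, so loop until done.
            position += channel.transferTo(position, size - position, socket)
          }
        } finally {
          channel.close()
        }
      }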

      One potential solution is to use Netty. Spark already has a Netty based network module implemented (org.apache.spark.network.netty). However, it lacks some functionality and is turned off by default.

        Issue Links

        1. Migrate Netty network module from Java to Scala (Sub-task, Resolved, Reynold Xin)
        2. Support transferring large blocks in Netty network module (Sub-task, Resolved, Reynold Xin)
        3. Add config option to support NIO vs OIO in Netty network module (Sub-task, Resolved, Reynold Xin)
        4. Support fetching in-memory blocks for Netty network module (Sub-task, Resolved, Reynold Xin)
        5. Create config options for Netty sendBufferSize and receiveBufferSize (Sub-task, Resolved, Reynold Xin)
        6. Report error messages back from server to client (Sub-task, Resolved, Reynold Xin)
        7. Support fetching multiple blocks in a single request in Netty network module (Sub-task, Resolved, Reynold Xin)
        8. Use a single FileClient and Netty client thread pool (Sub-task, Resolved, Reynold Xin)
        9. Support SASL authentication in Netty network module (Sub-task, Resolved, Aaron Davidson)
        10. Leverage Hadoop native io's fadvise and read-ahead in Netty transferTo (Sub-task, Closed, Unassigned)
        11. Recycle ByteBufs by using PooledByteBufAllocator (Sub-task, Resolved, Reynold Xin)
        12. Maintains a connection pool and reuse clients in BlockClientFactory (Sub-task, Resolved, Reynold Xin)
        13. Client should be able to put blocks in addition to fetch blocks (Sub-task, Resolved, Reynold Xin)
        14. Implement unit/integration tests for connection failures (Sub-task, Resolved, Reynold Xin)
        15. Release all ManagedBuffers upon task completion/failure (Sub-task, Resolved, Reynold Xin)
        16. Make sure client doesn't block when server/connection has error(s) (Sub-task, Resolved, Reynold Xin)
        17. Refactor Netty module to use BlockTransferService (Sub-task, Resolved, Reynold Xin)
        18. SO_RCVBUF and SO_SNDBUF should be bootstrap childOption, not option (Sub-task, Resolved, Reynold Xin)
        19. Disable thread local cache in PooledByteBufAllocator (Sub-task, Resolved, Reynold Xin)

          Activity

          zzcclp Zhichao Zhang added a comment -

          Ah, Aaron Davidson, with patch #3465 I can now successfully run the previously failed application, and my configuration is the same as before. It's great.

          zzcclp Zhichao Zhang added a comment -

          Great. I will test it later.

          rxin Reynold Xin added a comment -

          Glad that we are able to resolve this!

          lianhuiwang Lianhui Wang added a comment -

          Aaron Davidson, that's great. With patch #3465, I can now successfully run the previously failed application, and my configuration is the same as before.
          Thanks.

          ilikerps Aaron Davidson added a comment -

          Hey guys, I finally got a chance to run a more comprehensive set of tests with constrained containers. In doing so, I found a critical issue which caused us to allocate direct byte buffers proportional to the number of executors times the number of cores, rather than just proportional to the number of cores. With patch #3465, I was able to run a shuffle with Lianhui Wang's configuration of 7GB container with 6GB heap and 2 cores – prior to the patch, it exceeded the container's limits.

          If you guys get a chance, please let me know if this is sufficient to fix your issues with your initial overhead configurations. (Note that while the memory usage was greatly decreased, we still allocate a significant amount of off-heap memory, so it's possible you need to shift some of the heap to off-heap if your off-heap was previously very constrained.)

          zzcclp Zhichao Zhang added a comment -

          Hi, Aaron Davidson, I sent an email to you about the shuffle data performance test. Looking forward to your reply.
          Thanks.

          ilikerps Aaron Davidson added a comment - - edited

          Here is my Spark configuration for the test, 32 cores total (note that this is a test-only configuration to maximize throughput; I would not recommend these settings for real workloads):

          spark.shuffle.io.clientThreads =16,
          spark.shuffle.io.serverThreads =16,
          spark.serializer = "org.apache.spark.serializer.KryoSerializer",
          spark.shuffle.blockTransferService = "netty",
          spark.shuffle.compress = false,
          spark.shuffle.io.maxRetries = 0,
          spark.reducer.maxMbInFlight = 512

          Forgot to mention, but #3155 now automatically sets spark.shuffle.io.clientThreads and spark.shuffle.io.serverThreads based on the number of cores the Executor has allotted to it. You can override them by setting those properties by hand, but ideally the default behavior is sufficient.
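
          As a side note, a minimal sketch of setting these test properties programmatically via SparkConf (values copied from the list above; this mirrors a test-only setup, not a recommended production configuration):

          import org.apache.spark.SparkConf

          // Mirrors the test-only settings listed above; not recommended for real workloads.
          val conf = new SparkConf()
            .set("spark.shuffle.io.clientThreads", "16")
            .set("spark.shuffle.io.serverThreads", "16")
            .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
            .set("spark.shuffle.blockTransferService", "netty")
            .set("spark.shuffle.compress", "false")
            .set("spark.shuffle.io.maxRetries", "0")
            .set("spark.reducer.maxMbInFlight", "512")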

          zzcclp Zhichao Zhang added a comment -

          Aaron Davidson, I see that #3155 has been merged into master, but I cannot find spark.shuffle.io.maxUsableCores?

          zzcclp Zhichao Zhang added a comment -

          Aaron Davidson, thank you for your reply. I will try it again.
          Can you describe your spark.* configuration?

          ilikerps Aaron Davidson added a comment -

          My test was significantly less strict in its memory requirements, which may be the difference with respect to OOMs. I used two 28GB containers on different machines, with 24GB of that given to Spark's heap. Due to the networking of the containers, the maximum throughput was around 7Gb/s (combined directionally), which I was able to saturate using Netty but could only achieve around 3.5Gb/s (combined) using Nio.

          My test was a sort of 50GB of generated data shuffled between the two machines. I tested the sort as a whole as well as a different version where I injected a deserializer which immediately EOFs (this causes us to still read all data but do no computation on the reducer side, maximizing network throughput).

          Here is my full test, including the no-op deserializer:

          import org.apache.spark.SparkConf
          import org.apache.spark.serializer.{Serializer, SerializerInstance, SerializationStream, DeserializationStream}
          import java.io._
          import java.nio.ByteBuffer
          import scala.reflect.ClassTag
          
          class NoOpReadSerializer(conf: SparkConf) extends Serializer with Serializable {
            override def newInstance(): SerializerInstance = {
              new NoOpReadSerializerInstance()
            }
          }
          
          class NoOpReadSerializerInstance()
            extends SerializerInstance {
          
            override def serialize[T: ClassTag](t: T): ByteBuffer = {
              val bos = new ByteArrayOutputStream()
              val out = serializeStream(bos)
              out.writeObject(t)
              out.close()
              ByteBuffer.wrap(bos.toByteArray)
            }
          
            override def deserialize[T: ClassTag](bytes: ByteBuffer): T = {
              null.asInstanceOf[T]
            }
          
            override def deserialize[T: ClassTag](bytes: ByteBuffer, loader: ClassLoader): T = {
              null.asInstanceOf[T]
            }
          
            override def serializeStream(s: OutputStream): SerializationStream = {
              new NoOpSerializationStream(s, 100)
            }
          
            override def deserializeStream(s: InputStream): DeserializationStream = {
              new NoOpDeserializationStream(s, Thread.currentThread().getContextClassLoader)
            }
          
            def deserializeStream(s: InputStream, loader: ClassLoader): DeserializationStream = {
              new NoOpDeserializationStream(s, loader)
            }
          }
          
          class NoOpDeserializationStream(in: InputStream, loader: ClassLoader)
            extends DeserializationStream {
            def readObject[T: ClassTag](): T = throw new EOFException()
            def close() { }
          }
          
          class NoOpSerializationStream(out: OutputStream, counterReset: Int) extends SerializationStream {
            private val objOut = new ObjectOutputStream(out)
            private var counter = 0
          
            def writeObject[T: ClassTag](t: T): SerializationStream = {
              objOut.writeObject(t)
              counter += 1
              if (counterReset > 0 && counter >= counterReset) {
                objOut.reset()
                counter = 0
              }
              this
            }
          
            def flush() { objOut.flush() }
            def close() { objOut.close() }
          }
          
          
          // Test code below:
          implicit val arrayOrdering = Ordering.by((_: Array[Byte]).toIterable)
          def createSort() = sc.parallelize( 0 until 5000000, 320).map { x : Int =>
            val rand = new scala.util.Random(System.nanoTime())
            val bytes = new Array[Byte](10000)
            rand.nextBytes(bytes)
            (bytes, 1)
          }.sortByKey(true, 333)
          
          val x = createSort()
          x.count() // does shuffle + sorting on reduce side
          
          val y = createSort().asInstanceOf[org.apache.spark.rdd.ShuffledRDD[_, _, _]].setSerializer(new NoOpReadSerializer(sc.getConf))
          y.count() // does shuffle with no read-side computation (warning: causes FD leak in Spark!)
          

          Note that if you run that with less memory, you may have to tune the number of partitions or the size of the data to avoid invoking the ExternalSorter. I observed very little GC and no significant heap/process growth in memory after the first run.

          I will try another test where the memory is more constrained to further investigate the OOM problem.

          lianhuiwang Lianhui Wang added a comment -

          Aaron Davidson, yes, I re-ran the test with preferDirectBufs=true and maxUsableCores=2 on YARN. That caused the container to be killed because it was running beyond physical memory limits.
          If I set preferDirectBufs=false, it is OK, but with preferDirectBufs=false Netty's performance is not better than NioBlockTransferService.
          In my test the shuffle data size is 1-2G, executor-memory=7g, executor-cores=2, spark.yarn.executor.memoryOverhead=1024.

          zzcclp Zhichao Zhang added a comment -

          Hi, Aaron Davidson, can you describe your test, including the environment, configuration, and data volume?

          zzcclp Zhichao Zhang added a comment -

          Hi, Aaron Davidson, I am sure that I ran my last test with the patch #3155 applied.
          Configuration:
          spark.shuffle.consolidateFiles true
          spark.storage.memoryFraction 0.2
          spark.shuffle.memoryFraction 0.2
          spark.shuffle.file.buffer.kb 100
          spark.reducer.maxMbInFlight 48
          spark.shuffle.blockTransferService netty
          spark.shuffle.io.mode nio
          spark.shuffle.io.connectionTimeout 120
          spark.shuffle.manager SORT

          spark.shuffle.io.preferDirectBufs true
          spark.shuffle.io.maxRetries 3
          spark.shuffle.io.retryWaitMs 5000
          spark.shuffle.io.maxUsableCores 3

          command:
          --num-executors 17 --executor-memory 12g --executor-cores 3

          If spark.shuffle.io.preferDirectBufs=false, it's OK.

          ilikerps Aaron Davidson added a comment -

          Zhichao Zhang Ah, man, that's not good. Just to be certain, you ran your last test with the patch #3155 applied? We shouldn't be able to allocate more than 96MB of off-heap memory if so, which should be well within the 1GB you had left over between the 12GB Spark heap and 13GB YARN container.

          Lianhui Wang Were you able to re-run the test at any point? I ran a simple benchmark on ec2 and did not see any regressions from earlier, so if you're still seeing perf being worse than NIO, that suggests it may be workload-specific, making it harder to reproduce.

          zzcclp Zhichao Zhang added a comment -

          @Aaron Davidson, I tried with preferDirectBufs=true and maxUsableCores=3, the same as executor-cores, and the OOM still occurs.

          zzcclp Zhichao Zhang added a comment -

          @Aaron Davidson, thank you for the suggestion.
          The configuration and code for comparing Hadoop vs Spark performance are not shown above. It just runs a wordcount on 240G of snappy files and writes 500G of shuffle files. The configuration is as follows:

          command "--driver-memory 10g --num-executors 17 --executor-memory 12g --executor-cores 3 --driver-library-path :/usr/local/hadoop/lib/native/ /opt/wsspark.jar 24G_10_20g_1c 1 100 hdfs://wscluster/zzc_test/in/snappy8/ 100 100 hdfs://wscluster/zzc_test/out/i007"

          Configuration:
          spark.default.parallelism 204
          spark.shuffle.consolidateFiles false
          spark.shuffle.spill.compress true
          spark.shuffle.compress true
          spark.storage.memoryFraction 0.3
          spark.shuffle.memoryFraction 0.5
          spark.shuffle.file.buffer.kb 100
          spark.reducer.maxMbInFlight 48
          spark.shuffle.blockTransferService nio
          spark.shuffle.manager HASH
          spark.scheduler.mode FIFO
          spark.akka.frameSize 10
          spark.akka.timeout 100

          lianhuiwang Lianhui Wang added a comment -

          OK, thanks, Aaron Davidson. I will try to do as you say.

          ilikerps Aaron Davidson added a comment -

          Lianhui Wang Can you try again with preferDirectBufs set to true, and just setting maxUsableCores down to the number of cores each container actually has? It's possible the performance discrepancy you're seeing is simply due to heap byte buffers not being as fast as direct ones. You might also decrease the Java heap size a bit while keeping the container size the same, if any direct memory allocation is causing the container to be killed.

          Zhichao Zhang Same suggestion for you about setting preferDirectBufs to true and setting maxUsableCores down, but I will also perform another round of benchmarking – it's possible we accidentally introduced a performance regression in the last few patches.

          Comparing Hadoop vs Spark performance is a different matter. A few suggestions on your setup: you should set executor-cores to 5, so that each executor is actually using 5 cores instead of just 1. You're losing significant parallelism because of this setting, as Spark will only launch 1 task per core on an executor at any given time. Second, groupBy() is inefficient (its doc was changed recently to reflect this) and should be avoided. I would recommend changing your job to sort the whole RDD using something similar to

          mapR.map { x => ((x._1._1, x._2._1), x) }.sortByKey()

          , which would not require that all values for a single group fit in memory. This would still effectively group by x._1._1, but would sort within each group by x._2._1, and would utilize Spark's efficient sorting machinery.
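
          A minimal sketch of that suggested rewrite, under the assumption that the records are shaped like ((id, url), (flow, count)) as in the test code quoted elsewhere in this thread (the sample data and field names are illustrative, and a SparkContext `sc` is assumed, as in the spark-shell):

          // Illustrative records: ((id, url), (flow, count)).
          val mapR = sc.parallelize(Seq(
            (("a", "/x"), (10L, 1)),
            (("a", "/y"), (30L, 2)),
            (("b", "/z"), (20L, 1))
          ))

          // groupBy approach: every value of a group must fit in memory on one reducer.
          val grouped = mapR.groupBy(_._1._1)
            .mapValues(_.toList.sortBy(_._2._1).reverse)

          // Suggested approach: sort globally by (group key, secondary key); the
          // shuffle machinery does the sorting, so no group is materialized in memory.
          val sorted = mapR.map { x => ((x._1._1, x._2._1), x) }.sortByKey()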

          zzcclp Zhichao Zhang added a comment -

          The performance of Netty is worse than NIO in my test. Why, @Aaron Davidson?

          I want to improve shuffle performance; with 500G of shuffle data, the performance is much worse than Hadoop.

          lianhuiwang Lianhui Wang added a comment - - edited

          Aaron Davidson, yes, with https://github.com/apache/spark/pull/3155/ the beyond-physical-memory-limits problem does not happen in my test. But I found that Netty's performance is not better than NioBlockTransferService, so I need to find out why Netty performs worse than NioBlockTransferService in my test. Can you give me some suggestions? Thanks. And how about your test, Zhichao Zhang?

          zzcclp Zhichao Zhang added a comment -

          Hi, Aaron Davidson, what do you mean by "Is it really the case that each of your executors is only using 1 core for its 20GB of RAM? It seems like 5 would be in line with the portion of memory you're using"?

          I tried setting spark.storage.memoryFraction and spark.shuffle.memoryFraction to values from 0.2 to 0.5 before, and the OOM still occurs.

          ilikerps Aaron Davidson added a comment -

          Zhichao Zhang Thank you for the writeup. Is it really the case that each of your executors is only using 1 core for its 20GB of RAM? It seems like 5 would be in line with the portion of memory you're using. Also, the sum of your storage and shuffle memory fractions exceeds 1, so if you're caching any data and then performing a reduction/groupBy, you could actually see an OOM even without this other issue. I would recommend keeping the shuffle fraction relatively low unless you have a good reason not to, as a high value can lead to increased instability.

          The numbers are relatively close to my expectations, which would estimate netty allocating around 750MB of direct buffer space, thinking that it has 24 cores. With #3155 and maxUsableCores set to 1 (or 5), I hope this issue may be resolved.

          zzcclp Zhichao Zhang added a comment -

          By the way, my test code:

          val mapR = textFile.map(line => {
            ......
            ((value(1) + "_" + date.toString(), url), (flow, 1))
          }).reduceByKey((pair1, pair2) => {
            (pair1._1 + pair2._1, pair1._2 + pair2._2)
          }, 100)

          mapR.persist(StorageLevel.MEMORY_AND_DISK_SER)

          val mapR1 = mapR.groupBy(_._1._1)
            .mapValues(pairs => { pairs.toList.sortBy(_._2._1).reverse })
            .flatMap(values => { values._2 })
            .map(values => { values._1._1 + "\t" + values._1._2 + "\t" + values._2._1.toString() + "\t" + values._2._2.toString() })
            .saveAsTextFile(outputPath + "_1/")

          val mapR2 = mapR.groupBy(_._1._1)
            .mapValues(pairs => { pairs.toList.sortBy(_._2._2).reverse })
            .flatMap(values => { values._2 })
            .map(values => { values._1._1 + "\t" + values._1._2 + "\t" + values._2._1.toString() + "\t" + values._2._2.toString() })
            .saveAsTextFile(outputPath + "_2/")

          zzcclp Zhichao Zhang added a comment -

          Hi, Aaron Davidson, I can't download the logs from the server, so I'll just write the details here:

          There are 3 nodes in the cluster, with 24 cores / 128G per node; YARN can allocate 20 cores and 80G per node.

          I start application with command "--driver-memory 10g --num-executors 10 --executor-memory 20g --executor-cores 1 --driver-library-path :/usr/local/hadoop/lib/native/ /opt/wsspark.jar 24G_10_20g_1c 1 100 hdfs://wscluster/zzc_test/in/snappy8/ 100 100 hdfs://wscluster/zzc_test/out/i007"

          My Spark config:
          spark.default.parallelism 100
          spark.shuffle.consolidateFiles false
          spark.shuffle.spill.compress true
          spark.shuffle.compress true
          spark.storage.memoryFraction 0.6
          spark.shuffle.memoryFraction 0.5
          spark.shuffle.file.buffer.kb 100
          spark.reducer.maxMbInFlight 48
          spark.shuffle.blockTransferService netty
          spark.shuffle.io.mode nio
          spark.shuffle.io.connectionTimeout 120
          spark.shuffle.manager SORT
          spark.shuffle.io.preferDirectBufs false
          spark.shuffle.io.maxRetries 3
          spark.shuffle.io.retryWaitMs 5000
          spark.scheduler.mode FIFO
          spark.akka.frameSize 10
          spark.akka.timeout 100

          There are about 24G of snappy files for input and 14.5G of shuffle write data.

          With the above config, from the AM's log I find that each container needs more than 13G of memory, so the OOM occurs.

          If I set "spark.shuffle.blockTransferService=nio", each container needs about 12G of memory.

          ilikerps Aaron Davidson added a comment -

          Lianhui Wang I have created #3155 (which I will clean up and try to get in tomorrow); it makes the preferDirectBufs config forcefully disable direct byte buffers in both the server and client pools. Additionally, I have added the conf "spark.shuffle.io.maxUsableCores", which should allow you to inform the executor how many cores you're actually using, so it will avoid allocating enough memory for all the machine's cores.

          I hope that simply specifying the maxUsableCores is sufficient to actually fix this issue for you, but the combination should give a higher chance of success.

          ilikerps Aaron Davidson added a comment -

          Looking at the netty code a bit more, it seems that they might unconditionally allocate direct buffers for IO, whether or not direct is "preferred". Additionally, they allocate more memory based on the number of cores in your system. The default settings would be roughly 16MB per core, and this might be multiplied by 2 in our current setup since we have independent client and server pools in the same JVM. I'm not certain how executors running in YARN report "availableProcessors", but is it possible your machines have 32 or greater cores? This could cause an extra allocation of around 1GB of direct buffers.
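
          A rough back-of-the-envelope sketch of that estimate (the 16MB-per-core figure and the factor of 2 come from the comment above; the numbers are illustrative, not measured):

          // Illustrative arithmetic only, based on the rough defaults described above.
          val perCoreMB = 16            // ~16MB of pooled direct buffers per core
          val pools = 2                 // independent client and server pools in one JVM
          def estimatedOffHeapMB(cores: Int): Int = cores * perCoreMB * pools

          estimatedOffHeapMB(24)        // ~768MB, in line with the ~750MB estimate in this thread
          estimatedOffHeapMB(32)        // ~1024MB, i.e. the ~1GB case mentioned above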

          ilikerps Aaron Davidson added a comment -

          Thanks a lot for those diagnostics. Can you confirm that "spark.shuffle.io.preferDirectBufs" does show up in the UI as being set properly? Does your workload mainly involve a large shuffle? How big is each partition/how many are there? In addition to the netty buffers (which should be disabled by the config), we also memory map shuffle blocks larger than 2MB.
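
          A hypothetical illustration of that size threshold (the ~2MB figure comes from the comment above; the helper and its exact behavior are illustrative, not Spark's actual code):

          import java.nio.ByteBuffer
          import java.nio.channels.FileChannel

          // Illustrative only: read small blocks into an on-heap buffer and
          // memory-map blocks larger than ~2MB. A single read is shown for
          // brevity; a real reader would loop until the buffer is full.
          val mmapThreshold: Long = 2L * 1024 * 1024

          def readBlock(channel: FileChannel, offset: Long, length: Long): ByteBuffer = {
            if (length < mmapThreshold) {
              val buf = ByteBuffer.allocate(length.toInt)
              channel.read(buf, offset)
              buf.flip()
              buf
            } else {
              channel.map(FileChannel.MapMode.READ_ONLY, offset, length)
            }
          }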

          lianhuiwang Lianhui Wang added a comment -

          Zhichao Zhang, in the AM's log you can find this entry:
          Exit status: 143. Diagnostics: Container [container-id] is running beyond physical memory limits. Current usage: 8.3 GB of 8 GB physical memory used; 11.0 GB of 16.8 GB virtual memory used. Killing container.
          I already set spark.yarn.executor.memoryOverhead=1024 and the executor's memory is 7G,
          so from the above log I can confirm that the executor uses a lot of non-heap JVM memory.

          ilikerps Aaron Davidson added a comment -

          Yup, that would work.

          zzcclp Zhichao Zhang added a comment -

          aaron@databricks.com?

          ilikerps Aaron Davidson added a comment -

          Zhichao Zhang Yes, please do. What's the memory of your YARN executors/containers? With preferDirectBufs off, we should allocate little to no off-heap memory, so these results are surprising.

          zzcclp Zhichao Zhang added a comment -

          @Lianhui Wang, how do I view the logs associated with "yarn still kill executor's container because it's physical memory beyond allocated memory"? I can't find them.

          lianhuiwang Lianhui Wang added a comment -

          Aaron Davidson, when I set the Spark config "spark.shuffle.io.preferDirectBufs=false", YARN still kills the executor's container because its physical memory is beyond the allocated memory, so I think Netty has used a lot of non-heap JVM memory.

          zzcclp Zhichao Zhang added a comment -

          @Aaron Davidson, the error still occurs when setting the Spark config "spark.shuffle.io.preferDirectBufs=false". Can I send you an email with the error logs and environment details?

          zzcclp Zhichao Zhang added a comment -

          Yes, I see it, and will start to compile and test.

          rxin Reynold Xin added a comment -

          It's been merged.

          zzcclp Zhichao Zhang added a comment -

          Thank you.

          ilikerps Aaron Davidson added a comment -

          Zhichao Zhang I believe it is close to merging. Reynold Xin is finishing up his review.

          zzcclp Zhichao Zhang added a comment -

          @Aaron Davidson, thank you for your recommendation. By the way, can PR #3101 be merged into master today?

          @Lianhui Wang, I haven't tested it.

          ilikerps Aaron Davidson added a comment -

          Zhichao Zhang Use of epoll mode is highly dependent on your environment, and I personally would not recommend it due to known netty bugs which may cause it to be less stable. We have found nio mode to be sufficiently performant in our testing (and netty actually still tries to use epoll if it's available as its selector).

          Lianhui Wang Could you please elaborate on what you mean?

          Show
          ilikerps Aaron Davidson added a comment - Zhichao Zhang Use of epoll mode is highly dependent on your environment, and I personally would not recommend it due to known netty bugs which may cause it to be less stable. We have found nio mode to be sufficiently performant in our testing (and netty actually still tries to use epoll if it's available as its selector). Lianhui Wang Could you please elaborate on what you mean?
          Hide
          lianhuiwang Lianhui Wang added a comment -

          Aaron Davidson, I used your branch and the memory overhead issue on YARN still exists. Zhichao Zhang, how about your test?

          Show
          lianhuiwang Lianhui Wang added a comment - Aaron Davidson i use your branch and memory overhead on yarn is exist. Zhichao Zhang how about your test.
          Hide
          zzcclp Zhichao Zhang added a comment -

          Hi, Aaron Davidson, when I set spark.shuffle.blockTransferService=netty and spark.shuffle.io.mode=nio, it runs on CentOS 5.8 with 12G of files successfully, but when I set spark.shuffle.blockTransferService=netty and spark.shuffle.io.mode=epoll, there is an error:
          Exception in thread "main" java.lang.UnsatisfiedLinkError: /tmp/libnetty-transport-native-epoll7072694982027222413.so: /lib64/libc.so.6: version `GLIBC_2.10' not found

          I only find GLIBC_2.5 on CentOS 5.8 and cannot upgrade it; how can I resolve this?

          zzcclp Zhichao Zhang added a comment -

          Thank you for your reply. I will focus on it.

          ilikerps Aaron Davidson added a comment - - edited

          This could be due to the netty transfer service allocating more off-heap byte buffers, which perhaps is accounted for differently by YARN. PR #3101, which should go in tomorrow, will include a way to avoid allocating off-heap buffers (by setting the spark config "spark.shuffle.io.preferDirectBufs=false"), which should either solve your problem or at least produce the more typical OutOfMemoryError.

          zzcclp Zhichao Zhang added a comment -

          If I use less data, such as 24G of snappy files, it can run successfully,
          but if I use 240G of snappy files, the above error occurs.

          zzcclp Zhichao Zhang added a comment -

          Yes, I am running in YARN client mode, but the application is running, not killed, and then it fails repeatedly.
          Why? I cannot find any other error.

          Show
          zzcclp Zhichao Zhang added a comment - Yes, running on yarn client mode, but application is running, not killed, and then running failed repeatedly. Why? I can not find other error .
          Hide
          rxin Reynold Xin added a comment -

          Are you running on YARN? It seems like YARN just killed your application.

          zzcclp Zhichao Zhang added a comment - - edited

          Hi, Reynold Xin, SPARK-3453 (Netty-based BlockTransferService, extracted from Spark core) was committed yesterday. I compiled the latest code from GitHub master, and when I set spark.shuffle.blockTransferService=netty, there is an error:

          2014-11-04 15:30:27,013 - ERROR - org.apache.spark.util.SignalLoggerHandler.handle(SignalLogger.scala:57) - RECEIVED SIGNAL 15: SIGTERM
          2014-11-04 15:30:28,484 - ERROR - org.apache.spark.network.client.TransportResponseHandler.channelUnregistered(TransportResponseHandler.java:95) - Still have 6 requests outstanding when connection from np04/203.130.48.183:42574 is closed
          2014-11-04 15:30:28,522 - WARN - org.apache.spark.network.server.TransportChannelHandler.exceptionCaught(TransportChannelHandler.java:66) - Exception in connection from /203.130.48.183:39332
          java.io.IOException: Connection reset by peer
          at sun.nio.ch.FileDispatcherImpl.read0(Native Method)
          at sun.nio.ch.SocketDispatcher.read(SocketDispatcher.java:39)
          at sun.nio.ch.IOUtil.readIntoNativeBuffer(IOUtil.java:223)
          at sun.nio.ch.IOUtil.read(IOUtil.java:192)
          at sun.nio.ch.SocketChannelImpl.read(SocketChannelImpl.java:379)
          at io.netty.buffer.PooledUnsafeDirectByteBuf.setBytes(PooledUnsafeDirectByteBuf.java:311)
          at io.netty.buffer.AbstractByteBuf.writeBytes(AbstractByteBuf.java:881)
          at io.netty.channel.socket.nio.NioSocketChannel.doReadBytes(NioSocketChannel.java:225)
          at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:119)
          at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:511)
          at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:468)
          at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:382)
          at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:354)

          When I set spark.shuffle.blockTransferService=nio, it runs successfully.

          In addition, when will the shuffle performance improvement issue be resolved?

          zzcclp Zhichao Zhang added a comment -

          thanks

          rxin Reynold Xin added a comment -

          Take a look here https://cwiki.apache.org/confluence/display/SPARK/Wiki+Homepage

          zzcclp Zhichao Zhang added a comment -

          Hi, Reynold Xin, when will version 1.2 be released, approximately?

          rxin Reynold Xin added a comment -

          Scheduled to go in in 1.2.

          zzcclp Zhichao Zhang added a comment -

          Hi, Reynold Xin, when can this issue be solved?
          I need to improve shuffle performance as soon as possible.

          apachespark Apache Spark added a comment -

          User 'rxin' has created a pull request for this issue:
          https://github.com/apache/spark/pull/1971

          apachespark Apache Spark added a comment -

          User 'rxin' has created a pull request for this issue:
          https://github.com/apache/spark/pull/1907

          rxin Reynold Xin added a comment -

          It's something I'd like to prototype for 1.2. Do you have any thoughts on this?

          colorant Raymond Liu added a comment -

          So, is there anyone working on this?

          mridulm80 Mridul Muralidharan added a comment -

          Ah, small files - those are indeed a problem.

          Btw, we do dispose of mmap'ed blocks as soon as they are done, so we don't need to wait for GC to free them. Also note that the files are closed as soon as they are opened and mmap'ed, so they do not count towards the open file count/ulimit.

          Agree on 1, 3 and 4 - some of these apply to sendfile too btw, so they are not avoidable; but it is the best we have right now.
          Since we use mmap'ed buffers and rarely transfer the same file again, the performance jump might not be the order(s) of magnitude other projects claim - but then even a 10% (or whatever) improvement in our case would be substantial!

          rxin Reynold Xin added a comment - - edited

          We do use mmap for large blocks. However, most of the shuffle blocks are small so a lot of blocks are not mapped. In addition, there are multiple problems with memory mapped files:

          1. Memory mapped blocks are off-heap and are not managed by the JVM, which creates another memory space to tune/manage

          2. Memory mapped blocks cannot be reused and are only released at GC, so it is easy to end up with too many open files.

          3. On Linux machines with Huge Pages configured (which is increasingly more common with large memory), the default behavior is each file will consume 2MB, leading to OOM very soon.

          4. For large blocks that span multiple pages, it creates page faults, which lead to unnecessary context switches

          The last one is probably much less important.

          mridulm80 Mridul Muralidharan added a comment -

          Writing mmap'ed buffers is pretty efficient, btw - it's the second fallback in the transferTo implementation, iirc.

          mridulm80 Mridul Muralidharan added a comment -

          We map the file content and directly write that to the socket (except when the size is below 8k or so, iirc) - are you sure we are copying to user space and back?
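
          For context, a minimal sketch of that map-and-write path in plain java.nio (illustrative only; this is not Spark's actual block manager code):

          import java.io.{File, RandomAccessFile}
          import java.nio.channels.{FileChannel, SocketChannel}

          // Illustrative only: memory-map a block file and write the mapped buffer
          // to the socket, as opposed to the transferTo path sketched in the description.
          def sendMappedBlock(file: File, socket: SocketChannel): Unit = {
            val channel = new RandomAccessFile(file, "r").getChannel
            try {
              val mapped = channel.map(FileChannel.MapMode.READ_ONLY, 0, channel.size())
              while (mapped.hasRemaining) {
                socket.write(mapped) // writes may be partial, so loop until drained
              }
            } finally {
              channel.close()
            }
          }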


            People

             • Assignee: rxin Reynold Xin
             • Reporter: rxin Reynold Xin
             • Votes: 0
             • Watchers: 23
