Hadoop Map/Reduce
MAPREDUCE-2212

MapTask and ReduceTask should only compress/decompress the final map output file

    Details

    • Type: Improvement
    • Status: Closed
    • Priority: Major
    • Resolution: Won't Fix
    • Affects Version/s: 0.23.0
    • Fix Version/s: 0.23.0
    • Component/s: task
    • Labels:
      None

      Description

      Currently, if we set mapred.map.output.compression.codec:
      1. MapTask will compress every spill, decompress every spill, then merge and compress the final map output file.
      2. ReduceTask will decompress, merge, and compress every map output file, and repeat the compression/decompression on every pass.

      This causes all the data to be compressed/decompressed many times.
      The reason we need mapred.map.output.compression.codec is network traffic.
      We should not compress/decompress the data again and again during the merge sort.

      We should only compress the final map output file that will be transmitted over the network.
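
      For reference, map output compression is normally turned on with the old mapred API roughly as follows (a minimal sketch; the codec choice here is only an example):

      import org.apache.hadoop.io.compress.GzipCodec;
      import org.apache.hadoop.mapred.JobConf;

      public class MapOutputCompressionExample {
        public static void main(String[] args) {
          JobConf conf = new JobConf();
          // Sets mapred.compress.map.output=true: compress the map output.
          conf.setCompressMapOutput(true);
          // Sets mapred.map.output.compression.codec: the codec applied to spills,
          // merges, and the final map output file (the behavior this issue discusses).
          conf.setMapOutputCompressorClass(GzipCodec.class);
        }
      }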

        Activity

        Transition            Time In Source Status   Execution Times   Last Executer    Last Execution Date
        Open -> Resolved      9d 3m                   1                 Scott Chen       16/Dec/10 19:53
        Resolved -> Closed    333d 4h 55m             1                 Arun C Murthy    15/Nov/11 00:49
        Arun C Murthy made changes -
        Status: Resolved [ 5 ] -> Closed [ 6 ]
        Scott Chen made changes -
        Status: Open [ 1 ] -> Resolved [ 5 ]
        Resolution: Won't Fix [ 2 ]
        Scott Chen added a comment -

        I am closing this now because I think there is not much benefit in doing this.
        It would also increase the complexity of the code.

        Scott Chen added a comment -

        I think maybe we should leave it the way it is right now, since there is no huge difference.
        Making more changes would increase the complexity.

        What do you guys think?

        Scott Chen added a comment -

        I have done some experiments on the latency.
        In the experiment, 500 MB of data is read from disk, compressed, and written back to disk.
        It shows that the throughput of LZO is slightly worse than no codec, but they are very close.

        I think for latency there is not much difference.
        The question here is about the trade-off between disk IO and CPU.
        Using LZO costs more CPU (I don't have numbers for this) but can cut disk IO to about 50%.

        ================================================
        Initialize codec lzo 
        Finished. Time: 10278 ms
        File size: 239.19908142089844MB Compression ratio: 0.501636832
        Throughput: 47.50741875851333MB/s
        ================================================
        Initialize codec gz
        Finished. Time: 38132 ms
        File size: 161.91629219055176MB Compression ratio: 0.339563076
        Throughput: 12.805025962446239MB/s
        ================================================
        Initialize codec none
        Finished. Time: 8783 ms
        File size: 476.837158203125MB Compression ratio: 1.0
        Throughput: 55.59390299442104MB/s
        ================================================
        

        Here is a simple example that produces these numbers.

        import java.io.BufferedInputStream;
        import java.io.BufferedOutputStream;
        import java.io.File;
        import java.io.FileInputStream;
        import java.io.FileOutputStream;
        import java.io.InputStream;
        import java.io.OutputStream;

        import junit.framework.TestCase;

        import org.apache.commons.logging.Log;
        import org.apache.commons.logging.LogFactory;
        import org.apache.hadoop.io.compress.CompressionCodec;

        // Compression.Algorithm is the lzo/gz/none codec helper provided by the test
        // environment; it is not shown here.
        public class TestCodecDiskIO extends TestCase {

          Log LOG = LogFactory.getLog(TestCodecDiskIO.class);

          static {
            System.setProperty(Compression.Algorithm.CONF_LZO_CLASS,
                "com.hadoop.compression.lzo.LzoCodec");
          }

          public void testCodecWrite() throws Exception {
            File dataFile = new File("/home/schen/data/test_data");
            print("Data file:" + dataFile.getName());
            InputStream in = new BufferedInputStream(new FileInputStream(dataFile));
            // ~500 MB of input, matching the experiment and the measured file size above.
            int dataLength = 500 * 1000 * 1000;
            byte[] buff = new byte[dataLength];
            print("Start reading file. Read length = " + dataLength);
            long start = now();
            in.read(buff);
            long timeSpent = now() - start;
            in.close();
            print("Reading time: " + timeSpent);

            // Baseline: time a plain in-memory copy of the same data.
            byte[] buff2 = new byte[dataLength];
            start = now();
            System.arraycopy(buff, 0, buff2, 0, buff.length);
            timeSpent = now() - start;
            print("Memory copy time: " + timeSpent);

            int count = 3;

            for (int i = 0; i < count; ++i) {
              for (Compression.Algorithm algo : Compression.Algorithm.values()) {
                print("================================================");
                print("Initialize codec " + algo.getName());
                CompressionCodec codec = algo.getCodec();
                File temp = File.createTempFile("test", "", new File("/tmp"));
                temp.deleteOnExit();
                FileOutputStream fout = new FileOutputStream(temp);
                BufferedOutputStream bout = new BufferedOutputStream(fout);
                OutputStream out;
                if (codec != null) {
                  out = codec.createOutputStream(bout);
                } else {
                  // "none" has no codec; write uncompressed.
                  out = bout;
                }
                print("Start writing");
                start = now();
                out.write(buff);
                out.flush();
                fout.getFD().sync(); // force the bytes to disk before stopping the clock
                out.close();
                timeSpent = now() - start;
                print("Finished. Time: " + timeSpent + " ms");
                print("File size: " + (temp.length() / 1024.0 / 1024.0) + "MB" +
                    " Compression ratio: " + temp.length() / (double) dataLength);
                // bytes per millisecond divided by 1024: approximately MB/s
                print("Throughput: " + dataLength / (double) timeSpent / 1024.0 + "MB/s");
              }
            }
            print("================================================");
          }

          private void print(String s) {
            System.out.println(s);
          }

          private long now() {
            return System.currentTimeMillis();
          }
        }
        
        Joydeep Sen Sarma added a comment -

        Todd - do you know for sure whether the benefit is due to the compression of the final spill or to the compression of the intermediate sort runs? I am thinking that if the experiment is just to turn compression on/off and run some benchmark, then it wouldn't be clear whether any win comes from lower network latencies (from map->reduce) or from faster mappers (if they were disk bound without compression).

        In general I have seen that the map-reduce stack consumes data at a very low rate (it's CPU bound by the time it gets to 10-20 MBps). (Obviously this is a very loose statement and depends a lot on what the mappers are doing, etc.) So even with 6 disks (say a total of 300 MBps streaming read/write bandwidth) and 8 cores (say about 200 MBps processing bandwidth), it would seem that we would be CPU bound before we would be disk-throughput bound. It would be nice to get more accurate numbers along these lines.

        Todd Lipcon added a comment -

        We've found that even on single-rack clusters (where bandwidth is usually not the bottleneck), LZO intermediate compression almost always helps. That indicates that, at least in many workloads, we're more intermediate-IO bound than CPU bound. This is consistent with what we see on most clusters with 4-6 disks. Clusters with 12 local disks are more often bound on network or CPU.

        Scott Chen added a comment -

        In our case, the resource is usually CPU or network bound.

        Let me take this back. I don't have numbers for this one.

        I think the intuition is that latency should be better if we do LZO compression for the intermediate data.
        For throughput, it varies. If the TT is constantly running at 100% CPU, we should probably trade some disk IO for CPU.

        Scott Chen added a comment -

        In our case, the resource is usually CPU or network bound.
        I like Joydeep's idea. It would be nice to have two separate codec options: one for intermediate compression (for disk IO) and one for final output compression (for network traffic).
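
        A rough illustration of what such a split could look like in a job configuration; the intermediate-compression property name below is purely hypothetical, while the map-output settings are the ones that exist today:

        import org.apache.hadoop.io.compress.DefaultCodec;
        import org.apache.hadoop.mapred.JobConf;

        public class SplitCompressionConfigSketch {
          public static void main(String[] args) {
            JobConf conf = new JobConf();
            // Existing knobs: compress the final map output that is shuffled over the network.
            conf.setCompressMapOutput(true);
            conf.setMapOutputCompressorClass(DefaultCodec.class);
            // Hypothetical knob (invented name): a separate codec for the on-disk
            // intermediate spill/merge runs, so disk IO and network compression can differ.
            conf.set("mapred.map.intermediate.compression.codec",
                "com.hadoop.compression.lzo.LzoCodec");
          }
        }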

        Chris Douglas added a comment -

        Todd's point on disk bandwidth matches some benchmarks we did a couple years ago. Compressing the intermediate data improved the spill and merge times. It would be interesting to see if those results hold today, and for which codecs.

        In the case where no records are collected after the soft spill, the intermediate output will either need to be rewritten (since the reduce is expecting compressed output) or the shuffle will need to handle mixed segments. It's a rare case, but one the framework would need to handle.

        Joydeep Sen Sarma added a comment -

        Makes sense - I hadn't thought about backwards compatibility. So that would imply an additional (new) option to turn off intermediate-run compression.

        Mahadev konar added a comment -

        Joydeep,
        Shouldn't compressing the on-disk version be the default if JobConf.setCompressMapOutput() is set to true? The jobs that have been running with this property set should keep the same disk/network footprint they used to have, no?

        Joydeep Sen Sarma added a comment -

        We should have a separate option for compressing intermediate runs (to optimize disk bandwidth for the folks who need it).

        Todd Lipcon added a comment -

        Do we have data to confirm that intermediate compression is only useful for reducing network traffic? It seems we're also reducing disk IO, which can be a bottleneck, especially when the core:disk ratio is high.

        Scott Chen made changes -
        Field: Description
        Original Value:
        Currently if we set mapred.map.output.compression.codec
        1. MapTask will compress every spill, decompress every spill, merge and compress the final map output file
        2. ReduceTask will decompress, merge and compress every map output file. And repeat the compression/decompression every pass.

        This cause all the data being compressed/decompressed many times.
        The reason we need mapred.map.output.compression.codec is for network traffic.
        We should not compress/decompress the data again and again during merge sort.

        We should do the compression only for the final map output file that is been transmit over the network.
        New Value:
        Currently if we set mapred.map.output.compression.codec
        1. MapTask will compress every spill, decompress every spill, merge and compress the final map output file
        2. ReduceTask will decompress, merge and compress every map output file. And repeat the compression/decompression every pass.

        This causes all the data being compressed/decompressed many times.
        The reason we need mapred.map.output.compression.codec is for network traffic.
        We should not compress/decompress the data again and again during merge sort.

        We should only compress the final map output file that will be transmitted over the network.
        Scott Chen created issue -

          People

          • Assignee: Scott Chen
          • Reporter: Scott Chen
          • Votes: 0
          • Watchers: 9
