Hadoop HDFS / HDFS-14099

Unknown frame descriptor when decompressing multiple frames in ZStandardDecompressor


Details

    • Type: Bug
    • Status: Resolved
    • Priority: Major
    • Resolution: Fixed
    • Affects Version/s: 3.4.0, 3.2.3, 3.3.2
    • Fix Version/s: 3.4.0, 3.2.3, 3.3.2
    • Component/s: compress, io
    • Environment: Hadoop Version: hadoop-3.0.3; Java Version: 1.8.0_144
    • Hadoop Flags: Reviewed
    • Flags: Patch

    Description

      We need to use the ZSTD compression algorithm in Hadoop, so I wrote a simple demo like this for testing:

      while ((size = fsDataInputStream.read(bufferV2)) > 0) {
        countSize += size;
        if (countSize == 65536 * 8) {
          if (!isFinished) {
            // finish a frame in zstd
            cmpOut.finish();
            isFinished = true;
          }
          fsDataOutputStream.flush();
          fsDataOutputStream.hflush();
        }
        if (isFinished) {
          LOG.info("Will resetState. N=" + n);
          // reset the stream and write again
          cmpOut.resetState();
          isFinished = false;
        }
        cmpOut.write(bufferV2, 0, size);
        bufferV2 = new byte[5 * 1024 * 1024];
        n++;
      }
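The loop above produces several compressed frames concatenated back to back in a single file. The same "multiple members in one stream" pattern can be sketched with the JDK's gzip classes (a stand-in for illustration, not Hadoop's zstd codec; the class name is hypothetical): each finish() ends one member, and a compliant decompressor must keep reading across member boundaries instead of stopping at the first one — which is exactly what the zstd decompressor below fails to do.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.util.zip.GZIPInputStream;
import java.util.zip.GZIPOutputStream;

public class ConcatFramesDemo {
    // Compress each chunk as its own gzip member, concatenated back to back,
    // mirroring what finish() + resetState() produces for zstd frames.
    static byte[] compressAsMembers(byte[][] chunks) throws IOException {
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        for (byte[] chunk : chunks) {
            GZIPOutputStream gz = new GZIPOutputStream(out);
            gz.write(chunk);
            gz.finish(); // ends the current member; the next chunk starts a new one
        }
        return out.toByteArray();
    }

    // GZIPInputStream decodes concatenated members as one logical stream;
    // a decompressor that stops after the first member would truncate the data.
    static byte[] decompressAll(byte[] data) throws IOException {
        GZIPInputStream in = new GZIPInputStream(new ByteArrayInputStream(data));
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        byte[] buf = new byte[4096];
        int n;
        while ((n = in.read(buf)) > 0) {
            out.write(buf, 0, n);
        }
        return out.toByteArray();
    }

    public static void main(String[] args) throws IOException {
        byte[] concatenated = compressAsMembers(new byte[][] {
            "first frame ".getBytes(), "second frame".getBytes()
        });
        System.out.println(new String(decompressAll(concatenated)));
    }
}
```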
      

       

      Then I used "hadoop fs -text" to read this file, and it failed. The error is as below:

      Exception in thread "main" java.lang.InternalError: Unknown frame descriptor
      at org.apache.hadoop.io.compress.zstd.ZStandardDecompressor.inflateBytesDirect(Native Method)
      at org.apache.hadoop.io.compress.zstd.ZStandardDecompressor.decompress(ZStandardDecompressor.java:181)
      at org.apache.hadoop.io.compress.DecompressorStream.decompress(DecompressorStream.java:111)
      at org.apache.hadoop.io.compress.DecompressorStream.read(DecompressorStream.java:105)
      at java.io.InputStream.read(InputStream.java:101)
      at org.apache.hadoop.io.IOUtils.copyBytes(IOUtils.java:98)
      at org.apache.hadoop.io.IOUtils.copyBytes(IOUtils.java:66)
      at org.apache.hadoop.io.IOUtils.copyBytes(IOUtils.java:127)
      at org.apache.hadoop.fs.shell.Display$Cat.printToStdout(Display.java:101)
      at org.apache.hadoop.fs.shell.Display$Cat.processPath(Display.java:96)
      at org.apache.hadoop.fs.shell.Command.processPaths(Command.java:331)
      at org.apache.hadoop.fs.shell.Command.processPathArgument(Command.java:303)
      at org.apache.hadoop.fs.shell.Command.processArgument(Command.java:285)
      at org.apache.hadoop.fs.shell.Command.processArguments(Command.java:269)
      at org.apache.hadoop.fs.shell.FsCommand.processRawArguments(FsCommand.java:119)
      at org.apache.hadoop.fs.shell.Command.run(Command.java:176)
      at org.apache.hadoop.fs.FsShell.run(FsShell.java:328)
      at org.apache.hadoop.util.ToolRunner.run(ToolRunner.java:76)
      at org.apache.hadoop.util.ToolRunner.run(ToolRunner.java:90)
      at org.apache.hadoop.fs.FsShell.main(FsShell.java:391)
      

       

      So I had to look into the code, including the JNI part, and then I found this bug.

      ZSTD_initDStream(stream) may be called twice within the same frame.

      The first call is in ZStandardDecompressor.c:

      if (size == 0) {
          (*env)->SetBooleanField(env, this, ZStandardDecompressor_finished, JNI_TRUE);
          size_t result = dlsym_ZSTD_initDStream(stream);
          if (dlsym_ZSTD_isError(result)) {
              THROW(env, "java/lang/InternalError", dlsym_ZSTD_getErrorName(result));
              return (jint) 0;
          }
      }
      

      This call here is correct, but `finished` is never set back to false, even if there is still data (a new frame) in the compressed buffer or the user buffer that needs to be decompressed.
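To make the state problem concrete, here is a toy model (ToyFrameDecoder is hypothetical, not the real ZStandardDecompressor): once `finished` latches true and a reset does not clear it, the decoder refuses the next frame, which surfaces as an error on the second frame.

```java
// Toy model of the `finished`-flag bug; this class is a sketch for
// illustration, not a real Hadoop class.
public class ToyFrameDecoder {
    private boolean finished = false;
    private final boolean clearFinishedOnReset; // true = fixed behavior

    public ToyFrameDecoder(boolean clearFinishedOnReset) {
        this.clearFinishedOnReset = clearFinishedOnReset;
    }

    public boolean finished() { return finished; }

    public void reset() {
        // The fix: a reset between frames must clear `finished`.
        if (clearFinishedOnReset) {
            finished = false;
        }
        // Buggy behavior: `finished` stays true forever.
    }

    // Decode one whole frame; a decoder that believes the stream is
    // finished rejects any further input.
    public String decodeFrame(String frame) {
        if (finished) {
            throw new IllegalStateException("Unknown frame descriptor");
        }
        finished = true; // end of this frame reached
        return frame;
    }

    public static void main(String[] args) {
        ToyFrameDecoder fixed = new ToyFrameDecoder(true);
        for (String frame : new String[] { "frame-1", "frame-2" }) {
            System.out.println(fixed.decodeFrame(frame));
            fixed.reset(); // between frames, as DecompressorStream does
        }

        ToyFrameDecoder buggy = new ToyFrameDecoder(false);
        buggy.decodeFrame("frame-1");
        buggy.reset(); // does not clear `finished`
        try {
            buggy.decodeFrame("frame-2");
        } catch (IllegalStateException e) {
            System.out.println("second frame failed: " + e.getMessage());
        }
    }
}
```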

      The second call happens in org.apache.hadoop.io.compress.DecompressorStream via decompressor.reset(), because `finished` is always true after a frame has been decompressed:

      if (decompressor.finished()) {
        // First see if there was any leftover buffered input from previous
        // stream; if not, attempt to refill buffer.  If refill -> EOF, we're
        // all done; else reset, fix up input buffer, and get ready for next
        // concatenated substream/"member".
        int nRemaining = decompressor.getRemaining();
        if (nRemaining == 0) {
          int m = getCompressedData();
          if (m == -1) {
            // apparently the previous end-of-stream was also end-of-file:
            // return success, as if we had never called getCompressedData()
            eof = true;
            return -1;
          }
          decompressor.reset();
          decompressor.setInput(buffer, 0, m);
          lastBytesSent = m;
        } else {
          // looks like it's a concatenated stream:  reset low-level zlib (or
          // other engine) and buffers, then "resend" remaining input data
          decompressor.reset();
          int leftoverOffset = lastBytesSent - nRemaining;
          assert (leftoverOffset >= 0);
          // this recopies userBuf -> direct buffer if using native libraries:
          decompressor.setInput(buffer, leftoverOffset, nRemaining);
          // NOTE:  this is the one place we do NOT want to save the number
          // of bytes sent (nRemaining here) into lastBytesSent:  since we
          // are resending what we've already sent before, offset is nonzero
          // in general (only way it could be zero is if it already equals
          // nRemaining), which would then screw up the offset calculation
          // _next_ time around.  IOW, getRemaining() is in terms of the
          // original, zero-offset bufferload, so lastBytesSent must be as
          // well.  Cheesy ASCII art:
          //
          //          <------------ m, lastBytesSent ----------->
          //          +===============================================+
          // buffer:  |1111111111|22222222222222222|333333333333|     |
          //          +===============================================+
          //     #1:  <-- off -->|<-------- nRemaining --------->
          //     #2:  <----------- off ----------->|<-- nRem. -->
          //     #3:  (final substream:  nRemaining == 0; eof = true)
          //
          // If lastBytesSent is anything other than m, as shown, then "off"
          // will be calculated incorrectly.
        }
      }
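The offset bookkeeping in the NOTE above can be checked numerically. A small sketch (LeftoverOffsetDemo is a hypothetical name) computes leftoverOffset = lastBytesSent - nRemaining for the three substreams in the ASCII art, assuming lastBytesSent always holds the original bufferload m as the comment requires:

```java
public class LeftoverOffsetDemo {
    // Offset of each "resend", assuming lastBytesSent keeps the original
    // bufferload m and is never overwritten with nRemaining (per the NOTE).
    static int[] offsets(int m, int[] remainingAfterEachSubstream) {
        int lastBytesSent = m;
        int[] off = new int[remainingAfterEachSubstream.length];
        for (int i = 0; i < off.length; i++) {
            // getRemaining() is measured against the zero-offset bufferload,
            // so subtracting it from m yields the next substream's start.
            off[i] = lastBytesSent - remainingAfterEachSubstream[i];
        }
        return off;
    }

    public static void main(String[] args) {
        // A 100-byte bufferload split into substreams of 30, 40, and 30
        // bytes, matching the |111|222|333| picture: after each substream,
        // 70, 30, and 0 bytes remain.
        int[] off = offsets(100, new int[] { 70, 30, 0 });
        for (int o : off) {
            System.out.println("off=" + o);
        }
        // Each offset (30, 70, 100) marks where the next substream starts;
        // had lastBytesSent been overwritten with nRemaining, the second
        // offset would come out wrong.
    }
}
```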

      Attachments

        1. HDFS-14099-trunk-003.patch
          4 kB
          ZanderXu
        2. HDFS-14099-trunk-002.patch
          4 kB
          ZanderXu
        3. HDFS-14099-trunk-001.patch
          4 kB
          ZanderXu


        People

          Assignee: xuzq_zander ZanderXu
          Reporter: xuzq_zander ZanderXu
          Votes: 0
          Watchers: 7


        Time Tracking

          Original Estimate: Not Specified
          Remaining Estimate: 0h
          Time Spent: 1h 10m