Details
Description
We need to use the ZSTD compression algorithm in Hadoop, so I wrote a simple demo like the following for testing.
```java
while ((size = fsDataInputStream.read(bufferV2)) > 0) {
    countSize += size;
    if (countSize == 65536 * 8) {
        if (!isFinished) {
            // finish a frame in zstd
            cmpOut.finish();
            isFinished = true;
        }
        fsDataOutputStream.flush();
        fsDataOutputStream.hflush();
    }
    if (isFinished) {
        LOG.info("Will resetState. N=" + n);
        // reset the stream and write again
        cmpOut.resetState();
        isFinished = false;
    }
    cmpOut.write(bufferV2, 0, size);
    bufferV2 = new byte[5 * 1024 * 1024];
    n++;
}
```
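The demo finishes one frame and then starts a new one on the same output stream, so the resulting file holds concatenated frames that a reader must decode one after another. As an analogy only, the sketch below swaps zstd for the JDK's gzip classes (java.util.zip), whose GZIPInputStream reads concatenated members through a single stream. The class and method names are hypothetical, and readAllBytes assumes Java 9 or later.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.util.zip.GZIPInputStream;
import java.util.zip.GZIPOutputStream;

public class ConcatenatedMembers {
    // Compresses one chunk as a self-contained gzip member (standing in for one zstd frame).
    static byte[] member(String text) throws IOException {
        ByteArrayOutputStream bos = new ByteArrayOutputStream();
        try (GZIPOutputStream gz = new GZIPOutputStream(bos)) {
            gz.write(text.getBytes(StandardCharsets.UTF_8));
        } // closing finishes the member, similar in spirit to cmpOut.finish()
        return bos.toByteArray();
    }

    // Writes two members back to back, then reads both through one input stream.
    static String roundTrip() {
        try {
            ByteArrayOutputStream file = new ByteArrayOutputStream();
            file.write(member("first frame;"));
            file.write(member("second frame"));
            try (GZIPInputStream in =
                     new GZIPInputStream(new ByteArrayInputStream(file.toByteArray()))) {
                return new String(in.readAllBytes(), StandardCharsets.UTF_8);
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println(roundTrip()); // prints "first frame;second frame"
    }
}
```

The report implies that "hadoop fs -text" should read the demo's ZSTD output in the same way, continuing from one frame into the next.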
Then I used "hadoop fs -text" to read this file, and it failed with the following error:
Exception in thread "main" java.lang.InternalError: Unknown frame descriptor
at org.apache.hadoop.io.compress.zstd.ZStandardDecompressor.inflateBytesDirect(Native Method)
at org.apache.hadoop.io.compress.zstd.ZStandardDecompressor.decompress(ZStandardDecompressor.java:181)
at org.apache.hadoop.io.compress.DecompressorStream.decompress(DecompressorStream.java:111)
at org.apache.hadoop.io.compress.DecompressorStream.read(DecompressorStream.java:105)
at java.io.InputStream.read(InputStream.java:101)
at org.apache.hadoop.io.IOUtils.copyBytes(IOUtils.java:98)
at org.apache.hadoop.io.IOUtils.copyBytes(IOUtils.java:66)
at org.apache.hadoop.io.IOUtils.copyBytes(IOUtils.java:127)
at org.apache.hadoop.fs.shell.Display$Cat.printToStdout(Display.java:101)
at org.apache.hadoop.fs.shell.Display$Cat.processPath(Display.java:96)
at org.apache.hadoop.fs.shell.Command.processPaths(Command.java:331)
at org.apache.hadoop.fs.shell.Command.processPathArgument(Command.java:303)
at org.apache.hadoop.fs.shell.Command.processArgument(Command.java:285)
at org.apache.hadoop.fs.shell.Command.processArguments(Command.java:269)
at org.apache.hadoop.fs.shell.FsCommand.processRawArguments(FsCommand.java:119)
at org.apache.hadoop.fs.shell.Command.run(Command.java:176)
at org.apache.hadoop.fs.FsShell.run(FsShell.java:328)
at org.apache.hadoop.util.ToolRunner.run(ToolRunner.java:76)
at org.apache.hadoop.util.ToolRunner.run(ToolRunner.java:90)
at org.apache.hadoop.fs.FsShell.main(FsShell.java:391)
So I looked into the code, including the JNI part, and found this bug.
ZSTD_initDStream(stream) may be called twice within the same frame.
The first call is in ZStandardDecompressor.c:
```c
if (size == 0) {
    (*env)->SetBooleanField(env, this, ZStandardDecompressor_finished, JNI_TRUE);
    size_t result = dlsym_ZSTD_initDStream(stream);
    if (dlsym_ZSTD_isError(result)) {
        THROW(env, "java/lang/InternalError", dlsym_ZSTD_getErrorName(result));
        return (jint) 0;
    }
}
```
This call is correct, but finished is never set back to false afterwards, even when the compressed buffer or the user buffer still holds data (a new frame) that needs to be decompressed.
The second call comes from decompressor.reset() in org.apache.hadoop.io.compress.DecompressorStream, which is reached because finished stays true once a frame has been decompressed:
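A minimal Java model of just the flag handling in the C block above. It assumes that size is the value returned by ZSTD_decompressStream, which is 0 only once a frame is fully decoded and flushed and a positive hint otherwise; the class and method names are hypothetical. It shows that once the next frame begins, nothing sets finished back to false.

```java
public class FinishedFlagModel {
    boolean finished;
    int initCalls;

    // Stands in for dlsym_ZSTD_initDStream(stream).
    void initDStream() {
        initCalls++;
    }

    // Mirrors the `if (size == 0) { ... }` block of the native decompress step.
    void afterNativeCall(long size) {
        if (size == 0) {
            finished = true;
            initDStream();
        }
        // No branch ever sets finished back to false.
    }

    public static void main(String[] args) {
        FinishedFlagModel m = new FinishedFlagModel();
        m.afterNativeCall(0);  // frame 1 fully decoded
        m.afterNativeCall(12); // now inside frame 2: more input still expected
        System.out.println("finished=" + m.finished + ", initCalls=" + m.initCalls);
    }
}
```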
```java
if (decompressor.finished()) {
    // First see if there was any leftover buffered input from previous
    // stream; if not, attempt to refill buffer. If refill -> EOF, we're
    // all done; else reset, fix up input buffer, and get ready for next
    // concatenated substream/"member".
    int nRemaining = decompressor.getRemaining();
    if (nRemaining == 0) {
        int m = getCompressedData();
        if (m == -1) {
            // apparently the previous end-of-stream was also end-of-file:
            // return success, as if we had never called getCompressedData()
            eof = true;
            return -1;
        }
        decompressor.reset();
        decompressor.setInput(buffer, 0, m);
        lastBytesSent = m;
    } else {
        // looks like it's a concatenated stream: reset low-level zlib (or
        // other engine) and buffers, then "resend" remaining input data
        decompressor.reset();
        int leftoverOffset = lastBytesSent - nRemaining;
        assert (leftoverOffset >= 0);
        // this recopies userBuf -> direct buffer if using native libraries:
        decompressor.setInput(buffer, leftoverOffset, nRemaining);
        // NOTE: this is the one place we do NOT want to save the number
        // of bytes sent (nRemaining here) into lastBytesSent: since we
        // are resending what we've already sent before, offset is nonzero
        // in general (only way it could be zero is if it already equals
        // nRemaining), which would then screw up the offset calculation
        // _next_ time around. IOW, getRemaining() is in terms of the
        // original, zero-offset bufferload, so lastBytesSent must be as
        // well. Cheesy ASCII art:
        //
        //          <------------ m, lastBytesSent ----------->
        //          +===============================================+
        // buffer:  |1111111111|22222222222222222|333333333333|     |
        //          +===============================================+
        //     #1:  <-- off -->|<-------- nRemaining --------->
        //     #2:  <----------- off ----------->|<-- nRem. -->
        //     #3:  (final substream: nRemaining == 0; eof = true)
        //
        // If lastBytesSent is anything other than m, as shown, then "off"
        // will be calculated incorrectly.
    }
}
```
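To make the interaction between the two calls concrete, here is a self-contained toy simulation. Everything in it is hypothetical: the frame format (one magic byte, one length byte, then the payload), the class names, and the 2-byte input chunk per "native call" stand in for zstd and Hadoop and are not their real code. It follows the sequence described above: frame 1 ends, so finished is set and the decoder is re-initialized; the next call consumes only frame 2's header and produces no output while finished is still true; the stream loop therefore calls reset(), which re-initializes the decoder in the middle of frame 2, and the next payload byte is rejected as an unknown frame descriptor. The fix switch shows one possible remedy inside the toy model (clearing finished when a new frame header is read), not the change made in Hadoop.

```java
import java.io.ByteArrayOutputStream;
import java.nio.charset.StandardCharsets;

public class DoubleInitSimulation {
    // Toy frame format (NOT real zstd): MAGIC, payload length (1..255), payload bytes.
    static final int MAGIC = 0x28;

    static byte[] frame(String payload) {
        byte[] p = payload.getBytes(StandardCharsets.US_ASCII);
        byte[] f = new byte[p.length + 2];
        f[0] = (byte) MAGIC;
        f[1] = (byte) p.length;
        System.arraycopy(p, 0, f, 2, p.length);
        return f;
    }

    static final class ToyDecompressor {
        private final byte[] in;
        private final boolean clearFinishedOnNewFrame; // hypothetical fix switch
        private int pos;
        private int state; // 0 = expect magic, 1 = expect length, 2 = inside payload
        private int left;  // payload bytes still to decode in the current frame
        boolean finished;

        ToyDecompressor(byte[] in, boolean fix) {
            this.in = in;
            this.clearFinishedOnNewFrame = fix;
        }

        void initDStream() { state = 0; }                 // stands in for ZSTD_initDStream
        void reset() { initDStream(); finished = false; } // reached via DecompressorStream
        boolean hasInput() { return pos < in.length; }

        // One "native call": decode up to `chunk` input bytes, stopping at a frame end.
        int decompress(ByteArrayOutputStream out, int chunk) {
            int produced = 0;
            for (int i = 0; i < chunk && pos < in.length; i++) {
                int b = in[pos++] & 0xFF;
                if (state == 0) {
                    if (b != MAGIC) throw new IllegalStateException("Unknown frame descriptor");
                    if (clearFinishedOnNewFrame) finished = false;
                    state = 1;
                } else if (state == 1) {
                    left = b;
                    state = 2;
                } else {
                    out.write(b);
                    produced++;
                    if (--left == 0) { // frame fully decoded (size == 0 in the C code)
                        finished = true;
                        initDStream(); // first init
                        break;
                    }
                }
            }
            return produced;
        }
    }

    // Mirrors the shape of the DecompressorStream logic: reset() when nothing
    // was produced and finished() is true.
    static String run(boolean fix) {
        byte[] a = frame("abc");
        byte[] b = frame("defg");
        byte[] file = new byte[a.length + b.length];
        System.arraycopy(a, 0, file, 0, a.length);
        System.arraycopy(b, 0, file, a.length, b.length);

        ToyDecompressor d = new ToyDecompressor(file, fix);
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        while (d.hasInput()) {
            int n = d.decompress(out, 2);
            if (n == 0 && d.finished) {
                d.reset(); // second init: lands inside frame 2 in the buggy variant
            }
        }
        return new String(out.toByteArray(), StandardCharsets.US_ASCII);
    }

    public static void main(String[] args) {
        System.out.println("with fix: " + run(true));
        try {
            run(false);
        } catch (IllegalStateException e) {
            System.out.println("without fix: " + e.getMessage());
        }
    }
}
```

In this model, two init calls with no input consumed between them would be harmless; the failure needs part of the next frame to be consumed between the two calls, which is exactly what the still-true finished flag allows.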
Attachments
Issue Links
- relates to: HADOOP-17096 Fix ZStandardCompressor input buffer offset (Resolved)
- links to