  1. Flume
  2. FLUME-2182

Spooling Directory Source will not ingest data completely when a wide character appears at the edge of a buffer

    Details

    • Type: Bug
    • Status: Resolved
    • Priority: Critical
    • Resolution: Fixed
    • Affects Version/s: v1.4.0
    • Fix Version/s: v1.5.0
    • Component/s: Sinks+Sources
    • Labels:
      None

      Description

      The bug is in ResettableFileInputStream.java, in int readChar().
      If the last bytes of buf are only part of a wide character, readChar() should not return -1 (ResettableFileInputStream.java:186).
      Doing so loses the remaining data in the file.

      I tried to fix it like this:
      public synchronized int readChar() throws IOException {
        // if (!buf.hasRemaining()) {
        if (buf.limit() - buf.position() < 10) {
          refillBuf();
        }

        int start = buf.position();
        charBuf.clear();

        boolean isEndOfInput = false;
        if (position >= fileSize) {
          isEndOfInput = true;
        }

        CoderResult res = decoder.decode(buf, charBuf, isEndOfInput);
        if (res.isMalformed() || res.isUnmappable()) {
          res.throwException();
        }

        int delta = buf.position() - start;

        charBuf.flip();
        if (charBuf.hasRemaining()) {
          char c = charBuf.get();
          // don't increment the persisted location if we are in between a
          // surrogate pair, otherwise we may never recover if we seek() to this
          // location!
          incrPosition(delta, !Character.isHighSurrogate(c));
          return c;

          // there may be a partial character in the decoder buffer
        } else {
          incrPosition(delta, false);
          return -1;
        }
      }

      This avoids the partial character, but it introduces a new issue: sometimes a line of the log file ends up with a repeated character.
      For example:
      original file: 123456
      sink file: 1233456

      1. Fix_for_FLUME-2182.patch
        4 kB
        Sven Meys
      2. ModifiedLineDeserializer.java
        8 kB
        Sven Meys

          Activity

          syntonyliu syntony liu added a comment -

          Did I fix it correctly?
          Why does the HDFS sink file have a repeated character in some lines?

          syntonyliu syntony liu added a comment - - edited

          In addition, I modified the decoder so that it replaces bad input instead of throwing an exception:
          public ResettableFileInputStream(File file, PositionTracker tracker,
              int bufSize, Charset charset)
              throws IOException {
            ......
            this.decoder = charset.newDecoder();
            // implOnMalformedInput/implOnUnmappableCharacter are protected hooks;
            // the public setters are onMalformedInput/onUnmappableCharacter
            this.decoder.onMalformedInput(CodingErrorAction.REPLACE); // added by me
            this.decoder.onUnmappableCharacter(CodingErrorAction.REPLACE); // added by me
            ......
          }
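          To illustrate what CodingErrorAction.REPLACE does on a decoder, here is a minimal standalone sketch. The class ReplaceDecoderDemo and the helper decodeLeniently are hypothetical names made up for this example and are not part of Flume; the sketch only uses the public CharsetDecoder setters onMalformedInput and onUnmappableCharacter.

```java
import java.nio.ByteBuffer;
import java.nio.charset.CharacterCodingException;
import java.nio.charset.CharsetDecoder;
import java.nio.charset.CodingErrorAction;
import java.nio.charset.StandardCharsets;

// Hypothetical demo class, not Flume code.
class ReplaceDecoderDemo {

    // Decodes UTF-8 bytes, substituting U+FFFD for malformed or unmappable input.
    static String decodeLeniently(byte[] input) {
        CharsetDecoder decoder = StandardCharsets.UTF_8.newDecoder()
            .onMalformedInput(CodingErrorAction.REPLACE)
            .onUnmappableCharacter(CodingErrorAction.REPLACE);
        try {
            return decoder.decode(ByteBuffer.wrap(input)).toString();
        } catch (CharacterCodingException e) {
            // Not expected with REPLACE, but decode() declares it.
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        // 0xFF can never appear in well-formed UTF-8.
        byte[] input = {'a', (byte) 0xFF, 'b'};
        String decoded = decodeLeniently(input);
        System.out.println(decoded.length());               // 3 chars: a, U+FFFD, b
        System.out.println((int) decoded.charAt(1) == 0xFFFD);
    }
}
```

          REPLACE only changes how malformed or unmappable bytes are reported. It does not change what the decoder does with a well-formed but incomplete sequence at the end of a buffer.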
          svenmeys Sven Meys added a comment -

          I found the same error.
          It happens when the byte buffer reaches its end, so that only part of a character is read.

          I also tried to solve it using the method described in the bug report, by ensuring there are at least 32 bytes in the buffer at all times. But I found that somewhere among the millions of lines parsed, one extra line is written to the output, containing exactly as many characters as the extra space allowed for the buffer refill.

          To solve this, I commented out the line chan.position(position); in the refillBuf() method.

          However, this may cause a whole series of other problems, which I have no time to research right now.

          svenmeys Sven Meys added a comment -

          Included a workaround that we're using until a permanent fix is created.

          jlord Jeff Lord added a comment -

          I believe this issue is a duplicate of FLUME-2052

          svenmeys Sven Meys added a comment -

          It is related, but not the same.

          In the case of FLUME-2052, they are dealing with corrupt characters that cause the readChar method to throw a MalformedInputException.

          In this specific case no such exception is thrown at all, even when reporting is explicitly turned on in the decoder. Instead, the readChar method returns -1, which denotes end-of-file, and continues to do so on further attempts to read new characters.

          I did some additional investigation. The workaround works as a temporary patch. The fix proposed here, checking for a minimum character width, is also kind of a hack. But I found the real culprit and a fix that makes the code more robust.

          The culprit: delta. When decoder.decode is called on an incomplete character (e.g. two bytes of a three-byte char), the decoder neither consumes the bytes nor advances the position of the byte buffer.

          This results in an empty char buffer, which returns -1 as expected, but also in a delta (number of bytes processed) of zero. At the end of the method, the file pointer (position) is increased by delta (0 in this case).

          Any following call to readChar will return -1, as the byte buffer did not advance its position and won't refill.

          Forcing a call to refillBuf at this point results in strange behavior, as the byte buffer still holds the bytes of the incomplete character and the file pointer is positioned just before that same set of bytes.

          The fix: check whether delta is 0. If so, clear the byte buffer, refill it, and continue.

          Here's the fixed function (note, this might not fix FLUME-2052):

              @Override
              public synchronized int readChar() throws IOException {
                  if (!buf.hasRemaining()) {
                      refillBuf();
                  }
          
                  int start = buf.position();
                  charBuf.clear();
          
                  boolean isEndOfInput = false;
                  if (position >= fileSize) {
                      isEndOfInput = true;
                  }
          
                  CoderResult res = decoder.decode(buf, charBuf, isEndOfInput);
                  int delta = buf.position() - start;
          
                  if(delta == 0 && buf.hasRemaining()) {
                      logger.debug("Incomplete character detected! Reloading buffer");
                      buf.clear();
                      buf.flip();
                      refillBuf();
          
                      start = buf.position();
                      res = decoder.decode(buf, charBuf, isEndOfInput);
                      delta = buf.position() - start;
                  }
          
                  if (res.isMalformed() || res.isUnmappable()) {
                      res.throwException();
                  }
          
                  charBuf.flip();
                  if (charBuf.hasRemaining()) {
                      char c = charBuf.get();
                      // don't increment the persisted location if we are in between a
                      // surrogate pair, otherwise we may never recover if we seek() to this
                      // location!
                      incrPosition(delta, !Character.isHighSurrogate(c));
                      return c;
          
                      // there may be a partial character in the decoder buffer
                  } else {
                      incrPosition(delta, false);
                      return -1;
                  }
              }
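          The zero-delta behavior described above can be reproduced outside Flume. The following is a minimal sketch, assuming UTF-8 and the three-byte euro sign as the wide character; the class PartialDecodeDemo and its method decodePartial are hypothetical names used only for illustration.

```java
import java.nio.ByteBuffer;
import java.nio.CharBuffer;
import java.nio.charset.CharsetDecoder;
import java.nio.charset.CoderResult;
import java.nio.charset.StandardCharsets;

// Hypothetical demo class, not Flume code.
class PartialDecodeDemo {

    // Feeds only the first two bytes of a three-byte character to the decoder
    // and returns {delta, charsProduced, bytesLeftInBuffer, underflow ? 1 : 0}.
    static int[] decodePartial() {
        byte[] euro = "\u20ac".getBytes(StandardCharsets.UTF_8); // E2 82 AC
        ByteBuffer buf = ByteBuffer.wrap(euro, 0, 2);            // truncated input
        CharBuffer charBuf = CharBuffer.allocate(1);
        CharsetDecoder decoder = StandardCharsets.UTF_8.newDecoder();

        int start = buf.position();
        // endOfInput = false: more bytes may follow, so this is not an error.
        CoderResult res = decoder.decode(buf, charBuf, false);
        int delta = buf.position() - start;

        charBuf.flip();
        return new int[] {
            delta, charBuf.remaining(), buf.remaining(), res.isUnderflow() ? 1 : 0
        };
    }

    public static void main(String[] args) {
        int[] r = decodePartial();
        System.out.println("delta=" + r[0] + " chars=" + r[1]
            + " bytesLeft=" + r[2] + " underflow=" + (r[3] == 1));
    }
}
```

          The decoder reports underflow, produces no char, and leaves both bytes in the buffer. A caller that treats "no char produced" as end-of-file, and only refills when the buffer is empty, gets stuck in exactly this state.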
          
          svenmeys Sven Meys added a comment - - edited

          Another, more defensive way to fix this would be to do the following:

          // Add this class variable
          private final int maxCharWidth;
          
          // Add this to the constructor
          this.maxCharWidth = (int)charset.newEncoder().maxBytesPerChar();
          
          
          // Replace this in the readChar() method
          //if (!buf.hasRemaining()) {
          //    refillBuf();
          //}
          if (buf.remaining() < maxCharWidth) {
             buf.clear();
             buf.flip();
             refillBuf();
          }
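          The effect of this guard can be sketched without a file. In the following example a byte array stands in for the file and a four-byte buffer forces a wide character onto the buffer edge. The class RefillSketch and the method readAll are hypothetical and only mimic the position and refill logic discussed in this issue; they are not the Flume implementation.

```java
import java.nio.ByteBuffer;
import java.nio.CharBuffer;
import java.nio.charset.CharsetDecoder;
import java.nio.charset.StandardCharsets;

// Hypothetical stand-in for the file-backed stream, not Flume code.
class RefillSketch {

    // Reads chars one at a time until a read yields nothing (the "-1" case).
    // guarded = false refills only when the buffer is empty;
    // guarded = true refills when fewer than maxCharWidth bytes remain.
    static String readAll(byte[] file, int bufSize, boolean guarded) {
        CharsetDecoder decoder = StandardCharsets.UTF_8.newDecoder();
        int maxCharWidth = (int) StandardCharsets.UTF_8.newEncoder().maxBytesPerChar();
        ByteBuffer buf = ByteBuffer.allocate(bufSize);
        buf.flip();                       // start with an empty buffer
        CharBuffer charBuf = CharBuffer.allocate(1);
        int position = 0;                 // bytes consumed so far
        StringBuilder out = new StringBuilder();

        while (true) {
            boolean needRefill = guarded
                ? buf.remaining() < maxCharWidth
                : !buf.hasRemaining();
            if (needRefill) {
                // Re-read from the consumed position, like a seek plus read.
                buf.clear();
                buf.put(file, position, Math.min(bufSize, file.length - position));
                buf.flip();
            }
            int start = buf.position();
            charBuf.clear();
            decoder.decode(buf, charBuf, position >= file.length);
            position += buf.position() - start;

            charBuf.flip();
            if (!charBuf.hasRemaining()) {
                return out.toString();    // caller would see -1 here
            }
            out.append(charBuf.get());
        }
    }

    public static void main(String[] args) {
        // 'a', 'b', then the three-byte euro sign: 5 bytes, buffer holds 4.
        byte[] file = "ab\u20ac".getBytes(StandardCharsets.UTF_8);
        System.out.println(readAll(file, 4, false).length()); // stops early
        System.out.println(readAll(file, 4, true).length());  // reads everything
    }
}
```

          In this sketch the unguarded reader stops after "ab" because the first two bytes of the euro sign sit at the end of the buffer, while the guarded reader refills early and returns all three characters. One point that may be worth checking separately: for UTF-8, maxBytesPerChar() is 3.0 because it is measured per UTF-16 char, whereas a supplementary character occupies four bytes.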
          
          
          mpercy Mike Percy added a comment -

          Hey Sven, thanks for reporting this! Your patch looks good.

          Can you please add a unit test for this that fails before your patch and passes with your patch?

          mpercy Mike Percy added a comment -

          Also: please add a comment above the "if (buf.remaining() < maxCharWidth) {" block with something to the effect of:

          // The decoder can have issues with partial multi-byte characters,
          // so ensure that we maintain at least maxCharWidth bytes in
          // the buffer up until EOF.

          svenmeys Sven Meys added a comment - - edited

          There ya go. Fix submitted, now with a unit test (for UTF-8) and comments.

          mpercy Mike Percy added a comment -

          +1

          Thanks Sven, looks good! I'm running all the unit tests now and then I will commit this.

          mpercy Mike Percy added a comment -

          I just pushed this to trunk and flume-1.5 branches.

          Removing the relnote... I haven't tried the workaround deserializer but this patch appears to fix the core issue.

          hudson Hudson added a comment -

          FAILURE: Integrated in flume-trunk #502 (See https://builds.apache.org/job/flume-trunk/502/)
          FLUME-2182. Spooling Directory Source will not ingest data completely when a wide character appears at the edge of a buffer (mpercy: http://git-wip-us.apache.org/repos/asf/flume/repo?p=flume.git&a=commit&h=ffa706429186df2cf8ad04fd9dcba37b6a35d7f1)

          • flume-ng-core/src/test/java/org/apache/flume/serialization/TestResettableFileInputStream.java
          • flume-ng-core/src/main/java/org/apache/flume/serialization/ResettableFileInputStream.java

            People

            • Assignee: Sven Meys (svenmeys)
            • Reporter: syntony liu (syntonyliu)
            • Votes: 1
            • Watchers: 6
