  Flink / FLINK-32593

DelimitedInputFormat loses records with a multi-byte delimiter when the delimiter is split across two input splits


Details

    • Type: Bug
    • Status: Open
    • Priority: Major
    • Resolution: Unresolved
    • Affects Version/s: 1.16.1, 1.16.2, 1.17.1
    • Fix Version/s: None
    • Component/s: API / Core
    • Labels: None

    Description

      Run the following test to reproduce this bug.

      import org.apache.flink.api.common.functions.MapFunction;
      import org.apache.flink.api.common.io.DelimitedInputFormat;
      import org.apache.flink.streaming.api.datastream.DataStreamSource;
      import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
      import org.junit.Test;
      
      // javax.xml.bind is not part of the JDK from Java 11 on; add a JAXB API dependency there.
      import javax.xml.bind.DatatypeConverter;
      import java.io.IOException;
      
      public class MyTest {
      
        @Test
        public void myTest() throws Exception {
          final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
          // Read the file in parallel splits; with 5parallel.dat one split boundary falls inside a delimiter
          env.setParallelism(5);
      
          String path = MyTest.class.getClassLoader().getResource("5parallel.dat").getPath();
      
          final DelimitedInputFormat<byte[]> inputFormat = new TestInputFormat();
          // The delimiter is 0xB8 0x7E 0x7E 0x7E ("B87E7E7E")
          inputFormat.setDelimiter(new byte[]{(byte) 184, (byte) 126, (byte) 126, (byte) 126});
          // Use a buffer much smaller than the 1 MB default to make debugging easier
          inputFormat.setBufferSize(128);
      
          DataStreamSource<byte[]> source = env.readFile(inputFormat, path);
      
          // Print every record as hex from a single map task
          source.map(new MapFunction<byte[], Object>() {
            @Override
            public Object map(byte[] value) throws Exception {
              System.out.println(DatatypeConverter.printHexBinary(value));
              return value;
            }
          }).setParallelism(1);
      
          env.execute();
        }
      
        // Static so that the serializable input format does not capture the enclosing MyTest instance
        private static class TestInputFormat extends DelimitedInputFormat<byte[]> {
          @Override
          public byte[] readRecord(byte[] reuse, byte[] bytes, int offset, int numBytes) throws IOException {
            final int delimiterLen = this.getDelimiter().length;
      
            if (numBytes > 0) {
              // Prepend the delimiter so that each printed record shows where it started
              byte[] record = new byte[delimiterLen + numBytes];
              System.arraycopy(this.getDelimiter(), 0, record, 0, delimiterLen);
              System.arraycopy(bytes, offset, record, delimiterLen, numBytes);
              return record;
            }
      
            // Empty records (e.g. two adjacent delimiters) become empty arrays
            return new byte[0];
          }
        }
      }

      The actual output is:

      B87E7E7E1A00EB900A4EDC6850160070F6BED4631321ADDC6F06DC137C221E99
      B87E7E7E1A00EB900A4EDC6A51160070F61A8AFE022A3EC67718002A217C2181
      B87E7E7E1A00EB900A4EDC6D5516 

      The expected output is (record order may vary, because the splits are read in parallel):

      B87E7E7E1A00EB900A4EDC6850160070F6BED4631321ADDC6F06DC137C221E99
      B87E7E7E1A00EB900A4EDC6B52150070F6BE468EFD20BEEEB756E03FD7F653D0
      B87E7E7E1A00EB900A4EDC6D5516
      B87E7E7E1A00EB900A4EDC6A51160070F61A8AFE022A3EC67718002A217C2181 

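      The attached screenshot (image-2023-07-15-10-30-03-740.png, not reproduced here) shows a delimiter split across two input splits: its first bytes are at the tail of line 2 and its remaining bytes at the head of line 3.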
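      The failure can be illustrated with a small, Flink-free model. This is a sketch under stated assumptions, not Flink's actual DelimitedInputFormat code; the class and method names (SplitBoundaryDemo, readSplit, readWithSplits) are hypothetical. It assumes that a split [start, end) owns the records whose first byte r satisfies start < r <= end (the split at offset 0 also owns r == 0), that a split keeps reading past end to finish its last record, and that the delimiter cannot overlap itself (true for B87E7E7E). In the naive variant, a split with start > 0 skips ahead to the first delimiter that begins at or after start, so a delimiter straddling the boundary is never seen and the record after it is dropped. Backing up delimiter.length - 1 bytes before searching recovers it.

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

// Simplified model of delimiter-based split reading; NOT Flink's DelimitedInputFormat code.
// Assumes the delimiter cannot overlap itself (e.g. "<|>" or B87E7E7E).
final class SplitBoundaryDemo {

    // Index of the first occurrence of delim in data starting at or after `from`, or -1.
    static int indexOf(byte[] data, byte[] delim, int from) {
        outer:
        for (int i = Math.max(from, 0); i + delim.length <= data.length; i++) {
            for (int j = 0; j < delim.length; j++) {
                if (data[i + j] != delim[j]) {
                    continue outer;
                }
            }
            return i;
        }
        return -1;
    }

    // Records owned by split [start, end): those whose first byte r satisfies start < r <= end
    // (the split at offset 0 also owns r == 0). The split reads past `end` to finish a record.
    static List<String> readSplit(byte[] data, byte[] delim, int start, int end, boolean backUp) {
        List<String> records = new ArrayList<>();
        int pos = 0;
        if (start > 0) {
            // Naive: only delimiters that begin at or after `start` are seen.
            // Back-up: also see a delimiter that begins up to delim.length - 1 bytes earlier,
            // i.e. one that straddles the split boundary.
            int from = backUp ? start - (delim.length - 1) : start;
            int d = indexOf(data, delim, from);
            if (d < 0) {
                return records; // no record starts in this split
            }
            pos = d + delim.length;
        }
        while (pos <= end && pos < data.length) {
            int d = indexOf(data, delim, pos);
            int recordEnd = d < 0 ? data.length : d;
            records.add(new String(data, pos, recordEnd - pos, StandardCharsets.US_ASCII));
            if (d < 0) {
                break;
            }
            pos = d + delim.length;
        }
        return records;
    }

    // Cuts the data into splits of `splitSize` bytes and concatenates the records of all splits.
    static List<String> readWithSplits(byte[] data, byte[] delim, int splitSize, boolean backUp) {
        List<String> all = new ArrayList<>();
        for (int start = 0; start < data.length; start += splitSize) {
            int end = Math.min(start + splitSize, data.length);
            all.addAll(readSplit(data, delim, start, end, backUp));
        }
        return all;
    }

    public static void main(String[] args) {
        byte[] data = "aaa<|>bbb<|>ccc<|>ddd".getBytes(StandardCharsets.US_ASCII);
        byte[] delim = "<|>".getBytes(StandardCharsets.US_ASCII);
        // With 10-byte splits, the delimiter at offsets 9..11 straddles the boundary at 10.
        System.out.println(readWithSplits(data, delim, 10, false)); // [aaa, bbb, ddd]
        System.out.println(readWithSplits(data, delim, 10, true));  // [aaa, bbb, ccc, ddd]
    }
}
```

      In this model, backing up by delimiter.length - 1 bytes is the smallest amount that still finds every straddling delimiter. It never picks up a delimiter that ends exactly at the split start, whose following record belongs to the previous split under the ownership rule assumed here.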

      Attachments

        1. 5parallel.dat (0.1 kB, Zhaofu Liu)
        2. image-2023-07-15-10-30-03-740.png (38 kB, Zhaofu Liu)

          People

            Assignee: Unassigned
            Reporter: Zhaofu Liu (thlzhf00)
            Votes: 0
            Watchers: 2
