Details
Type: Bug
Status: Open
Priority: Major
Resolution: Unresolved
Affects Version/s: 1.16.1, 1.16.2, 1.17.1
Fix Version/s: None
None
Description
Run the following test to reproduce this bug.
```java
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.io.DelimitedInputFormat;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.junit.Test;

import javax.xml.bind.DatatypeConverter;
import java.io.IOException;

public class MyTest {

    @Test
    public void myTest() throws Exception {
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(5);
        String path = MyTest.class.getClassLoader().getResource("5parallel.dat").getPath();

        final DelimitedInputFormat<byte[]> inputFormat = new TestInputFormat();
        // The delimiter is "B87E7E7E"
        inputFormat.setDelimiter(new byte[]{(byte) 184, (byte) 126, (byte) 126, (byte) 126});
        // Use a buffer much smaller than the default (1 MB) to make debugging easier
        inputFormat.setBufferSize(128);

        DataStreamSource<byte[]> source = env.readFile(inputFormat, path);
        source.map(new MapFunction<byte[], Object>() {
            @Override
            public Object map(byte[] value) throws Exception {
                System.out.println(DatatypeConverter.printHexBinary(value));
                return value;
            }
        }).setParallelism(1);

        env.execute();
    }

    // static, so the serialized input format does not capture the enclosing test instance
    private static class TestInputFormat extends DelimitedInputFormat<byte[]> {
        @Override
        public byte[] readRecord(byte[] reuse, byte[] bytes, int offset, int numBytes)
                throws IOException {
            final int delimiterLen = this.getDelimiter().length;
            if (numBytes > 0) {
                // Prepend the delimiter so every printed record starts with B87E7E7E
                byte[] record = new byte[delimiterLen + numBytes];
                System.arraycopy(this.getDelimiter(), 0, record, 0, delimiterLen);
                System.arraycopy(bytes, offset, record, delimiterLen, numBytes);
                return record;
            }
            // Empty records are returned as empty arrays
            return new byte[0];
        }
    }
}
```
The actual output is:
```
B87E7E7E1A00EB900A4EDC6850160070F6BED4631321ADDC6F06DC137C221E99
B87E7E7E1A00EB900A4EDC6A51160070F61A8AFE022A3EC67718002A217C2181
B87E7E7E1A00EB900A4EDC6D5516
```
The expected output should be:
```
B87E7E7E1A00EB900A4EDC6850160070F6BED4631321ADDC6F06DC137C221E99
B87E7E7E1A00EB900A4EDC6B52150070F6BE468EFD20BEEEB756E03FD7F653D0
B87E7E7E1A00EB900A4EDC6D5516
B87E7E7E1A00EB900A4EDC6A51160070F61A8AFE022A3EC67718002A217C2181
```
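The two listings are not in the same order. Five source subtasks feed a single map subtask, so the interleaving of printed lines can vary between runs, and a line-by-line diff is misleading. A small JDK-only sketch (the class and method names are made up for illustration) compares the listings as multisets and reports the expected records that never appeared:

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class CompareOutputs {

    // Hex lines copied from the "actual" listing in this report
    static final List<String> ACTUAL = Arrays.asList(
            "B87E7E7E1A00EB900A4EDC6850160070F6BED4631321ADDC6F06DC137C221E99",
            "B87E7E7E1A00EB900A4EDC6A51160070F61A8AFE022A3EC67718002A217C2181",
            "B87E7E7E1A00EB900A4EDC6D5516");

    // Hex lines copied from the "expected" listing in this report
    static final List<String> EXPECTED = Arrays.asList(
            "B87E7E7E1A00EB900A4EDC6850160070F6BED4631321ADDC6F06DC137C221E99",
            "B87E7E7E1A00EB900A4EDC6B52150070F6BE468EFD20BEEEB756E03FD7F653D0",
            "B87E7E7E1A00EB900A4EDC6D5516",
            "B87E7E7E1A00EB900A4EDC6A51160070F61A8AFE022A3EC67718002A217C2181");

    /** Expected lines with no matching actual line, treating both lists as multisets. */
    static List<String> missing(List<String> expected, List<String> actual) {
        List<String> unmatched = new ArrayList<>(actual);
        List<String> result = new ArrayList<>();
        for (String line : expected) {
            // remove(Object) consumes one equal element, so duplicates are counted correctly
            if (!unmatched.remove(line)) {
                result.add(line);
            }
        }
        return result;
    }

    public static void main(String[] args) {
        System.out.println(missing(EXPECTED, ACTUAL));
        // [B87E7E7E1A00EB900A4EDC6B52150070F6BE468EFD20BEEEB756E03FD7F653D0]
    }
}
```

For the listings in this report it prints a single entry, the record containing `6B5215`, so exactly one record is lost; the other three are only reordered.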
In a view of the file's bytes, one delimiter is divided between two splits: it starts at the tail of line 2 and ends at the head of line 3.
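The report does not show which part of Flink's reading logic drops the record, so the following is only a hypothetical in-memory model of the symptom, not `DelimitedInputFormat` itself. It assumes that a record belongs to the split containing its first byte and that the delimiter cannot overlap itself (true for `B8 7E 7E 7E`). With `lookBack = false`, a split searches only for delimiters that begin at or after its own start, so a delimiter straddling the boundary is skipped and the record after it is lost. With `lookBack = true`, the search starts `delim.length` bytes early and every record is read exactly once. All names here are illustrative.

```java
import java.io.ByteArrayOutputStream;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical model of split-by-split delimited reading; NOT Flink's DelimitedInputFormat.
// Assumes a record belongs to the split containing its first byte and the delimiter cannot overlap itself.
public class SplitBoundaryModel {

    static final byte[] DELIM = {(byte) 0xB8, 0x7E, 0x7E, 0x7E};

    // First index i >= from at which delim occurs in data, or -1
    static int indexOf(byte[] data, byte[] delim, int from) {
        outer:
        for (int i = Math.max(0, from); i <= data.length - delim.length; i++) {
            for (int j = 0; j < delim.length; j++) {
                if (data[i + j] != delim[j]) {
                    continue outer;
                }
            }
            return i;
        }
        return -1;
    }

    // Bytes from start up to (not including) the next delimiter, or to the end of the data
    static byte[] recordAt(byte[] data, byte[] delim, int start) {
        int next = indexOf(data, delim, start);
        return Arrays.copyOfRange(data, start, next < 0 ? data.length : next);
    }

    // lookBack=true: returns the records whose first byte lies in [start, end).
    // lookBack=false: searches only from start, so a delimiter straddling start is missed.
    static List<byte[]> readSplit(byte[] data, byte[] delim, int start, int end, boolean lookBack) {
        List<byte[]> records = new ArrayList<>();
        if (start == 0 && end > 0) {
            records.add(recordAt(data, delim, 0)); // the first record has no leading delimiter
        }
        int from = lookBack ? start - delim.length : start;
        for (int q = indexOf(data, delim, from);
             q >= 0 && q + delim.length < end;
             q = indexOf(data, delim, q + delim.length)) {
            records.add(recordAt(data, delim, q + delim.length));
        }
        return records;
    }

    // Reads every split of the given size in order and concatenates the results
    static List<byte[]> readAll(byte[] data, byte[] delim, int splitSize, boolean lookBack) {
        List<byte[]> all = new ArrayList<>();
        for (int start = 0; start < data.length; start += splitSize) {
            all.addAll(readSplit(data, delim, start, Math.min(start + splitSize, data.length), lookBack));
        }
        return all;
    }

    // "one" at 0..2, delimiter at 3..6, "two" at 7..9, delimiter at 10..13, "three" at 14..18
    static byte[] sampleData() {
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        byte[][] parts = {ascii("one"), DELIM, ascii("two"), DELIM, ascii("three")};
        for (byte[] p : parts) {
            out.write(p, 0, p.length);
        }
        return out.toByteArray();
    }

    static byte[] ascii(String s) {
        return s.getBytes(StandardCharsets.US_ASCII);
    }

    static List<String> asText(List<byte[]> records) {
        List<String> text = new ArrayList<>();
        for (byte[] r : records) {
            text.add(new String(r, StandardCharsets.US_ASCII));
        }
        return text;
    }

    public static void main(String[] args) {
        byte[] data = sampleData();
        // Split size 5 puts a boundary at byte 5, inside the delimiter at bytes 3..6
        System.out.println("naive:      " + asText(readAll(data, DELIM, 5, false))); // [one, three]
        System.out.println("look-back:  " + asText(readAll(data, DELIM, 5, true)));  // [one, two, three]
        System.out.println("whole file: " + asText(readSplit(data, DELIM, 0, data.length, true))); // [one, two, three]
    }
}
```

In this sample the look-back variant returns the same three records for every split size from 1 up to the data length, which is the property the expected output above depends on; the naive variant loses `two`, mirroring the single missing record in the report.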