Flink / FLINK-4329

Fix Streaming File Source Timestamps/Watermarks Handling

    Details

    • Type: Bug
    • Status: Closed
    • Priority: Major
    • Resolution: Fixed
    • Affects Version/s: 1.1.0
    • Fix Version/s: 1.2.0, 1.1.3
    • Component/s: Streaming Connectors
    • Labels: None

      Description

      The ContinuousFileReaderOperator does not correctly deal with watermarks; they are just passed through. This means that when the ContinuousFileMonitoringFunction closes and emits a Long.MAX_VALUE watermark, that watermark can "overtake" the records that are still to be emitted by the ContinuousFileReaderOperator. Together with the new "allowed lateness" setting in the window operator, this can lead to elements being dropped as late.

      Also, ContinuousFileReaderOperator does not correctly assign ingestion timestamps since it is not technically a source but looks like one to the user.
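
      For illustration, a minimal job of the kind affected by this (a hypothetical sketch against the 1.1 DataStream API; the input path and window sizes are made up):

          import org.apache.flink.streaming.api.TimeCharacteristic;
          import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
          import org.apache.flink.streaming.api.windowing.time.Time;

          public class LateDropExample {
              public static void main(String[] args) throws Exception {
                  StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
                  // ingestion time: the system is expected to stamp records and emit watermarks
                  env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime);

                  // readTextFile() is implemented as a ContinuousFileMonitoringFunction (the source)
                  // plus a ContinuousFileReaderOperator; before the fix, the source's final
                  // Long.MAX_VALUE watermark passed straight through the reader and could overtake
                  // records the reader had not yet emitted
                  env.readTextFile("hdfs:///input")
                      .keyBy(line -> line)
                      .timeWindow(Time.seconds(10))
                      .allowedLateness(Time.seconds(5)) // elements later than this are dropped
                      .reduce((a, b) -> a + "\n" + b)
                      .print();

                  env.execute("FLINK-4329 illustration");
              }
          }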


          Activity

          ASF GitHub Bot added a comment -

          GitHub user kl0u opened a pull request:

          https://github.com/apache/flink/pull/2350

          FLINK-4329 Fix Streaming File Source Timestamps/Watermarks Handling

          You can merge this pull request into a Git repository by running:

          $ git pull https://github.com/kl0u/flink continuous_file_fix

          Alternatively you can review and apply these changes as the patch at:

          https://github.com/apache/flink/pull/2350.patch

          To close this pull request, make a commit to your master/trunk branch
          with (at least) the following in the commit message:

          This closes #2350


          commit 6207a9f5da086d808331afe0e8caf0f03b3fabc5
          Author: kl0u <kkloudas@gmail.com>
          Date: 2016-08-09T12:11:45Z

          FLINK-4329 Fix Streaming File Source Timestamps/Watermarks Handling

          Now the ContinuousFileReaderOperator ignores the watermarks sent by
          the source function and emits its own watermarks in case we are
          operating on Ingestion time. In addition, and for Ingestion time
          only, the reader also assigns the correct timestamps to the elements
          that it reads.


          ASF GitHub Bot added a comment -

          Github user StephanEwen commented on the issue:

          https://github.com/apache/flink/pull/2350

          How does this fix work?

          ASF GitHub Bot added a comment -

          Github user kl0u commented on the issue:

          https://github.com/apache/flink/pull/2350

          The way it works is that the reader now gets a ReaderContext and emits its own watermarks, depending on which timeCharacteristic we are operating on. If it is IngestionTime, which was the original problem, we emit watermarks periodically. In addition, in this case, it assigns timestamps to the emitted elements.
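
          For reference, a rough sketch of that context selection in the reader's open() (names are illustrative, not the exact PR code):

              // pick the ReaderContext based on the job's time characteristic
              switch (timeCharacteristic) {
                  case EventTime:
                      // timestamps/watermarks come from the user program downstream
                      readerContext = new StreamSource.ManualWatermarkContext<>(this, checkpointLock, output);
                      break;
                  case IngestionTime:
                      // stamp elements with the current processing time and emit watermarks periodically
                      readerContext = new StreamSource.AutomaticWatermarkContext<>(
                          this, checkpointLock, output, getExecutionConfig().getAutoWatermarkInterval());
                      break;
                  case ProcessingTime:
                      // no timestamps and no watermarks
                      readerContext = new StreamSource.NonTimestampContext<>(this, checkpointLock, output);
                      break;
                  default:
                      throw new IllegalStateException("Unknown time characteristic: " + timeCharacteristic);
              }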

          ASF GitHub Bot added a comment -

          Github user rmetzger commented on a diff in the pull request:

          https://github.com/apache/flink/pull/2350#discussion_r75297857

          — Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/ContinuousFileReaderOperator.java —

          @@ -179,7 +195,16 @@ public void close() throws Exception
                       // called by the StreamTask while having it.
                       checkpointLock.wait();
                   }
          -        collector.close();
          +
          +        // finally if we are closed normally and we are operating on
          +        // event or ingestion time, emit the max watermark indicating
          +        // the end of the stream, like a normal source would do.
          +
          +        readerContext.emitWatermark(Watermark.MAX_WATERMARK);
          +        if (readerContext != null) {

          — End diff —

          if `readerContext` is null, we'll get an NPE in the line before.
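
          (In other words, the null check has to come first:)

              // guard the emit, so a null readerContext cannot throw an NPE
              if (readerContext != null) {
                  readerContext.emitWatermark(Watermark.MAX_WATERMARK);
              }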

          ASF GitHub Bot added a comment -

          Github user rmetzger commented on a diff in the pull request:

          https://github.com/apache/flink/pull/2350#discussion_r75298028

          — Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamSource.java —

          @@ -146,16 +146,24 @@ void checkAsyncException() {
                   private final Output<StreamRecord<T>> output;
                   private final StreamRecord<T> reuse;

          -        public NonTimestampContext(StreamSource<?, ?> owner, Object lockingObject, Output<StreamRecord<T>> output) {
          -            this.owner = owner;
          +        public NonTimestampContext(AbstractStreamOperator<T> owner, Object lockingObject, Output<StreamRecord<T>> output) {
                       this.lockingObject = lockingObject;
                       this.output = output;
                       this.reuse = new StreamRecord<T>(null);
          +
          +            // if it is a source, then we cast and cache it
          +            // here so that we do not have to do it in every collect(),
          +            // collectWithTimestamp() and emitWatermark()
          +
          +            this.owner = (owner instanceof StreamSource) ?
          +                (StreamSource) owner : null;

          — End diff —

          This looks a bit hacky. How about adding an interface, `AsyncException` or so, that all the classes using the `NonTimestampContext` can implement?

          ASF GitHub Bot added a comment -

          Github user rmetzger commented on a diff in the pull request:

          https://github.com/apache/flink/pull/2350#discussion_r75298087

          — Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamSource.java —

          @@ -146,16 +146,24 @@ void checkAsyncException() {
                   private final Output<StreamRecord<T>> output;
                   private final StreamRecord<T> reuse;

          -        public NonTimestampContext(StreamSource<?, ?> owner, Object lockingObject, Output<StreamRecord<T>> output) {
          -            this.owner = owner;
          +        public NonTimestampContext(AbstractStreamOperator<T> owner, Object lockingObject, Output<StreamRecord<T>> output) {
                       this.lockingObject = lockingObject;
                       this.output = output;
                       this.reuse = new StreamRecord<T>(null);
          +
          +            // if it is a source, then we cast and cache it
          +            // here so that we do not have to do it in every collect(),
          +            // collectWithTimestamp() and emitWatermark()
          +
          +            this.owner = (owner instanceof StreamSource) ?
          +                (StreamSource) owner : null;

          — End diff —

          For the file reader, the method can just be empty
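
          A sketch of the suggested direction (the interface appears later in the discussion as AsyncExceptionChecker; the method name is taken from the diff):

              // the owner passed to the context implements this capability,
              // instead of being cast to StreamSource
              public interface AsyncExceptionChecker {
                  /** Throws if an exception was reported from another thread. */
                  void checkAsyncException();
              }

          For the file reader, checkAsyncException() can then be an empty no-op, as noted above.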

          ASF GitHub Bot added a comment -

          Github user rmetzger commented on a diff in the pull request:

          https://github.com/apache/flink/pull/2350#discussion_r75303417

          — Diff: flink-fs-tests/src/test/java/org/apache/flink/hdfstests/ContinuousFileMonitoringTest.java —

          @@ -106,6 +109,140 @@ public static void destroyHDFS() {
               // TESTS

               @Test
          +    public void testFileReadingOperatorWithIngestionTime() throws Exception {
          +        Set<org.apache.hadoop.fs.Path> filesCreated = new HashSet<>();
          +        Map<Integer, String> expectedFileContents = new HashMap<>();
          +        for (int i = 0; i < NO_OF_FILES; i++) {
          +            Tuple2<org.apache.hadoop.fs.Path, String> file = fillWithData(hdfsURI, "file", i, "This is test line.");
          +            filesCreated.add(file.f0);
          +            expectedFileContents.put(i, file.f1);
          +        }
          +
          +        TextInputFormat format = new TextInputFormat(new Path(hdfsURI));
          +        TypeInformation<String> typeInfo = TypeExtractor.getInputFormatTypes(format);
          +
          +        ContinuousFileReaderOperator<String, ?> reader = new ContinuousFileReaderOperator<>(format);
          +
          +        StreamConfig streamConfig = new StreamConfig(new Configuration());
          +        streamConfig.setTimeCharacteristic(TimeCharacteristic.IngestionTime);
          +
          +        ExecutionConfig executionConfig = new ExecutionConfig();
          +        executionConfig.setAutoWatermarkInterval(100);
          +
          +        TestTimeServiceProvider timeServiceProvider = new TestTimeServiceProvider();
          +        OneInputStreamOperatorTestHarness<FileInputSplit, String> tester =
          +            new OneInputStreamOperatorTestHarness<>(reader, executionConfig, timeServiceProvider, streamConfig);
          +
          +        reader.setOutputType(typeInfo, new ExecutionConfig());

          — End diff —

          You can reuse the ExecutionConfig created a few lines above.

          ASF GitHub Bot added a comment -

          Github user rmetzger commented on a diff in the pull request:

          https://github.com/apache/flink/pull/2350#discussion_r75303846

          — Diff: flink-fs-tests/src/test/java/org/apache/flink/hdfstests/ContinuousFileMonitoringTest.java —

          @@ -106,6 +109,140 @@ public static void destroyHDFS() {
               // TESTS

               @Test
          +    public void testFileReadingOperatorWithIngestionTime() throws Exception {
          +        Set<org.apache.hadoop.fs.Path> filesCreated = new HashSet<>();
          +        Map<Integer, String> expectedFileContents = new HashMap<>();
          +        for (int i = 0; i < NO_OF_FILES; i++) {
          +            Tuple2<org.apache.hadoop.fs.Path, String> file = fillWithData(hdfsURI, "file", i, "This is test line.");
          +            filesCreated.add(file.f0);
          +            expectedFileContents.put(i, file.f1);
          +        }
          +
          +        TextInputFormat format = new TextInputFormat(new Path(hdfsURI));
          +        TypeInformation<String> typeInfo = TypeExtractor.getInputFormatTypes(format);
          +
          +        ContinuousFileReaderOperator<String, ?> reader = new ContinuousFileReaderOperator<>(format);
          +
          +        StreamConfig streamConfig = new StreamConfig(new Configuration());
          +        streamConfig.setTimeCharacteristic(TimeCharacteristic.IngestionTime);
          +
          +        ExecutionConfig executionConfig = new ExecutionConfig();
          +        executionConfig.setAutoWatermarkInterval(100);
          +
          +        TestTimeServiceProvider timeServiceProvider = new TestTimeServiceProvider();
          +        OneInputStreamOperatorTestHarness<FileInputSplit, String> tester =
          +            new OneInputStreamOperatorTestHarness<>(reader, executionConfig, timeServiceProvider, streamConfig);
          +
          +        reader.setOutputType(typeInfo, new ExecutionConfig());
          +        tester.open();
          +
          +        timeServiceProvider.setCurrentTime(0);
          +
          +        long elementTimestamp = 201;
          +        timeServiceProvider.setCurrentTime(elementTimestamp);
          +
          +        // test that a watermark is actually emitted
          +        Assert.assertTrue(tester.getOutput().size() == 1 &&
          +            tester.getOutput().peek() instanceof Watermark &&
          +            ((Watermark) tester.getOutput().peek()).getTimestamp() == 200);

          — End diff —

          You don't need to change it, but I think it's a good idea to test the conditions independently. This allows you to see which condition was false, based on the line number.
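
          For example, split into one assertion per condition:

              // each assertion fails on its own line, so the failing condition is obvious
              Assert.assertEquals(1, tester.getOutput().size());
              Object head = tester.getOutput().peek();
              Assert.assertTrue(head instanceof Watermark);
              Assert.assertEquals(200, ((Watermark) head).getTimestamp());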

          ASF GitHub Bot added a comment -

          Github user rmetzger commented on a diff in the pull request:

          https://github.com/apache/flink/pull/2350#discussion_r75304033

          — Diff: flink-fs-tests/src/test/java/org/apache/flink/hdfstests/ContinuousFileMonitoringTest.java —

          @@ -106,6 +109,140 @@ public static void destroyHDFS() {
               // TESTS

               @Test
          +    public void testFileReadingOperatorWithIngestionTime() throws Exception {
          +        Set<org.apache.hadoop.fs.Path> filesCreated = new HashSet<>();
          +        Map<Integer, String> expectedFileContents = new HashMap<>();
          +        for (int i = 0; i < NO_OF_FILES; i++) {
          +            Tuple2<org.apache.hadoop.fs.Path, String> file = fillWithData(hdfsURI, "file", i, "This is test line.");
          +            filesCreated.add(file.f0);
          +            expectedFileContents.put(i, file.f1);
          +        }
          +
          +        TextInputFormat format = new TextInputFormat(new Path(hdfsURI));
          +        TypeInformation<String> typeInfo = TypeExtractor.getInputFormatTypes(format);
          +
          +        ContinuousFileReaderOperator<String, ?> reader = new ContinuousFileReaderOperator<>(format);
          +
          +        StreamConfig streamConfig = new StreamConfig(new Configuration());
          +        streamConfig.setTimeCharacteristic(TimeCharacteristic.IngestionTime);
          +
          +        ExecutionConfig executionConfig = new ExecutionConfig();
          +        executionConfig.setAutoWatermarkInterval(100);
          +
          +        TestTimeServiceProvider timeServiceProvider = new TestTimeServiceProvider();
          +        OneInputStreamOperatorTestHarness<FileInputSplit, String> tester =
          +            new OneInputStreamOperatorTestHarness<>(reader, executionConfig, timeServiceProvider, streamConfig);
          +
          +        reader.setOutputType(typeInfo, new ExecutionConfig());
          +        tester.open();
          +
          +        timeServiceProvider.setCurrentTime(0);
          +
          +        long elementTimestamp = 201;
          +        timeServiceProvider.setCurrentTime(elementTimestamp);

          — End diff —

          Can you quickly explain how this works?
          Is the `OneInputStreamOperatorTestHarness` starting a thread in the background emitting watermarks?

          ASF GitHub Bot added a comment -

          Github user rmetzger commented on a diff in the pull request:

          https://github.com/apache/flink/pull/2350#discussion_r75304305

          — Diff: flink-fs-tests/src/test/java/org/apache/flink/hdfstests/ContinuousFileMonitoringTest.java —

          @@ -106,6 +109,140 @@ public static void destroyHDFS() {
               // TESTS

               @Test
          +    public void testFileReadingOperatorWithIngestionTime() throws Exception {
          +        Set<org.apache.hadoop.fs.Path> filesCreated = new HashSet<>();
          +        Map<Integer, String> expectedFileContents = new HashMap<>();
          +        for (int i = 0; i < NO_OF_FILES; i++) {
          +            Tuple2<org.apache.hadoop.fs.Path, String> file = fillWithData(hdfsURI, "file", i, "This is test line.");
          +            filesCreated.add(file.f0);
          +            expectedFileContents.put(i, file.f1);
          +        }
          +
          +        TextInputFormat format = new TextInputFormat(new Path(hdfsURI));
          +        TypeInformation<String> typeInfo = TypeExtractor.getInputFormatTypes(format);
          +
          +        ContinuousFileReaderOperator<String, ?> reader = new ContinuousFileReaderOperator<>(format);
          +
          +        StreamConfig streamConfig = new StreamConfig(new Configuration());
          +        streamConfig.setTimeCharacteristic(TimeCharacteristic.IngestionTime);
          +
          +        ExecutionConfig executionConfig = new ExecutionConfig();
          +        executionConfig.setAutoWatermarkInterval(100);
          +
          +        TestTimeServiceProvider timeServiceProvider = new TestTimeServiceProvider();
          +        OneInputStreamOperatorTestHarness<FileInputSplit, String> tester =
          +            new OneInputStreamOperatorTestHarness<>(reader, executionConfig, timeServiceProvider, streamConfig);
          +
          +        reader.setOutputType(typeInfo, new ExecutionConfig());
          +        tester.open();
          +
          +        timeServiceProvider.setCurrentTime(0);
          +
          +        long elementTimestamp = 201;
          +        timeServiceProvider.setCurrentTime(elementTimestamp);
          +
          +        // test that a watermark is actually emitted
          +        Assert.assertTrue(tester.getOutput().size() == 1 &&
          +            tester.getOutput().peek() instanceof Watermark &&
          +            ((Watermark) tester.getOutput().peek()).getTimestamp() == 200);
          +
          +        // create the necessary splits for the test
          +        FileInputSplit[] splits = format.createInputSplits(
          +            reader.getRuntimeContext().getNumberOfParallelSubtasks());
          +
          +        // and feed them to the operator
          +        for (FileInputSplit split : splits) {
          +            tester.processElement(new StreamRecord<>(split));
          +        }
          +
          +        /*
          +         * Given that the reader is multithreaded, the test finishes before the reader thread finishes
          +         * reading. This results in files being deleted by the test before being read, thus throwing an exception.
          +         * In addition, even if file deletion happens at the end, the results are not ready for testing.
          +         * To face this, we wait until all the output is collected or until the waiting time exceeds 1000 ms, or 1s.
          +         */
          +
          +        long start = System.currentTimeMillis();
          +        Queue<Object> output;
          +        do {
          +            output = tester.getOutput();
          +            Thread.sleep(50);
          +        } while ((output == null || output.size() != NO_OF_FILES * LINES_PER_FILE) && (System.currentTimeMillis() - start) < 1000);

          — End diff —

          I wonder if this can lead to unstable tests (for example on Travis).
          What if the output needs more than one second to show up?
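
          (One way to avoid the hard deadline is to wait on the condition itself and let a test-level timeout catch a genuine hang; a sketch:)

              // wait until all expected records have arrived; a JUnit-level timeout,
              // e.g. @Test(timeout = 60000), guards against a real hang
              Queue<Object> output = tester.getOutput();
              while (output.size() < NO_OF_FILES * LINES_PER_FILE) {
                  Thread.sleep(50);
              }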

          ASF GitHub Bot added a comment -

          Github user aljoscha commented on a diff in the pull request:

          https://github.com/apache/flink/pull/2350#discussion_r75488070

          — Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/ContinuousFileReaderOperator.java —

          @@ -103,12 +103,28 @@ public void open() throws Exception {
                   this.format.setRuntimeContext(getRuntimeContext());
                   this.format.configure(new Configuration());

          -        this.collector = new TimestampedCollector<>(output);
                   this.checkpointLock = getContainingTask().getCheckpointLock();

                   Preconditions.checkState(reader == null, "The reader is already initialized.");

          -        this.reader = new SplitReader<>(format, serializer, collector, checkpointLock, readerState);
          +        // set the reader context based on the time characteristic

          — End diff —

          I think both the `SourceContext` subclasses and this instantiation code should be moved out of the sources, since they are now used for more than that.

          ASF GitHub Bot added a comment -

          Github user aljoscha commented on the issue:

          https://github.com/apache/flink/pull/2350

          I made one inline comment about moving the `SourceContext` and the instantiation code.

          Also, the problem with the "async exception check" can be solved by introducing an interface `AsyncExceptionChecker` that is passed to the context. (I think that's what @rmetzger was hinting at.)

          Even better yet, we might be able to get rid of that stuff by using `task.failExternally()` in all places that previously made these async checks necessary. (The file read operator already uses that, btw)
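
          (A sketch of the interface-based variant, with the checker passed in explicitly instead of casting the owner; names assumed from the diffs above:)

              // the context receives exactly the capability it needs,
              // instead of casting its owner to StreamSource
              private final AsyncExceptionChecker checker;

              public NonTimestampContext(
                      AsyncExceptionChecker checker,
                      Object lockingObject,
                      Output<StreamRecord<T>> output) {
                  this.checker = checker;
                  this.lockingObject = lockingObject;
                  this.output = output;
                  this.reuse = new StreamRecord<T>(null);
              }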

          ASF GitHub Bot added a comment -

          Github user aljoscha commented on a diff in the pull request:

          https://github.com/apache/flink/pull/2350#discussion_r75647030

          — Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/ContinuousFileReaderOperator.java —

          @@ -63,22 +66,22 @@
            */
           @Internal
           public class ContinuousFileReaderOperator<OUT, S extends Serializable> extends AbstractStreamOperator<OUT>
          -    implements OneInputStreamOperator<FileInputSplit, OUT>, OutputTypeConfigurable<OUT> {
          +    implements OneInputStreamOperator<FileInputSplit, OUT>, OutputTypeConfigurable<OUT>, AsyncExceptionChecker {

               private static final long serialVersionUID = 1L;

               private static final Logger LOG = LoggerFactory.getLogger(ContinuousFileReaderOperator.class);

          +    @VisibleForTesting

          — End diff —

          This doesn't do anything. It's just a marker interface.

          ASF GitHub Bot added a comment -

          Github user aljoscha commented on the issue:

          https://github.com/apache/flink/pull/2350

          This looks very good now! 👍

          I'm running it one last time on Travis and then I'm merging.

          ASF GitHub Bot added a comment -

          Github user rmetzger commented on a diff in the pull request:

          https://github.com/apache/flink/pull/2350#discussion_r75651032

          — Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamSource.java —

          @@ -214,14 +216,26 @@ public AutomaticWatermarkContext(
                   this.watermarkInterval = watermarkInterval;
                   this.reuse = new StreamRecord<T>(null);

          +        // if it is a source, then we cast and cache it
          +        // here so that we do not have to do it in every collect(),
          +        // collectWithTimestamp() and emitWatermark()
          +
          +        if (!(owner instanceof AsyncExceptionChecker)) {
          +            throw new IllegalStateException("The ManualWatermarkContext can only be used " +
          +                "with sources that implement the AsyncExceptionThrower interface.");
          +        }
          +
          +        this.source = (AsyncExceptionChecker) owner;
          +
                   long now = owner.getCurrentProcessingTime();
                   this.watermarkTimer = owner.registerTimer(now + watermarkInterval,
                       new WatermarkEmittingTask(owner, lockingObjectParam, outputParam));
               }

               @Override
               public void collect(T element) {
          -        owner.checkAsyncException();
          +        if (source != null) {

          — End diff —

          I don't think these null checks are needed, because the `IllegalStateException` is thrown if `owner` is `null`.
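
          (With the constructor check in place, collect() can call through unconditionally; a sketch using the fields from the diff:)

              @Override
              public void collect(T element) {
                  // source is guaranteed non-null: the constructor throws otherwise
                  source.checkAsyncException();
                  synchronized (lockingObject) {
                      output.collect(reuse.replace(element));
                  }
              }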

          ASF GitHub Bot added a comment -

          Github user aljoscha commented on a diff in the pull request:

          https://github.com/apache/flink/pull/2350#discussion_r75652728

          — Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamSource.java —

          @@ -188,18 +189,19 @@ public void close() {}
            */
           public static class AutomaticWatermarkContext<T> implements SourceFunction.SourceContext<T> {

          -    private final StreamSource<?, ?> owner;
          +    private final AbstractStreamOperator<T> owner;

          — End diff —

          This should also be an AsyncExceptionChecker, same for the parameter. For the time handling, this can get a `TimeServiceProvider`; that way, things are cleanly separated.
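
          (A sketch of that separation; the constructor shape is assumed, not taken from the PR:)

              private final AsyncExceptionChecker owner;     // used only for async-exception checks
              private final TimeServiceProvider timeService; // used only for time and timers

              public AutomaticWatermarkContext(
                      AsyncExceptionChecker owner,
                      TimeServiceProvider timeService,
                      Object lockingObject,
                      Output<StreamRecord<T>> output,
                      long watermarkInterval) {
                  this.owner = owner;
                  this.timeService = timeService;
                  this.lockingObject = lockingObject;
                  this.output = output;
                  this.watermarkInterval = watermarkInterval;
                  this.reuse = new StreamRecord<T>(null);
              }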

          ASF GitHub Bot added a comment -

          Github user aljoscha commented on the issue:

          https://github.com/apache/flink/pull/2350

          Ah, seems I was a bit too quick earlier. I added one more inline comment.

          ASF GitHub Bot added a comment -

          Github user kl0u closed the pull request at:

          https://github.com/apache/flink/pull/2350

          ASF GitHub Bot added a comment -

          GitHub user kl0u opened a pull request:

          https://github.com/apache/flink/pull/2546

          FLINK-4329 Fix Streaming File Source Timestamps/Watermarks Handling

          You can merge this pull request into a Git repository by running:

          $ git pull https://github.com/kl0u/flink fix_ingestion_time

          Alternatively you can review and apply these changes as the patch at:

          https://github.com/apache/flink/pull/2546.patch

          To close this pull request, make a commit to your master/trunk branch
          with (at least) the following in the commit message:

          This closes #2546


          commit 1b15b77b80334adf869714937dbfa8d8b7c2e12f
          Author: kl0u <kkloudas@gmail.com>
          Date: 2016-08-25T15:38:49Z

          FLINK-4329 Fix Streaming File Source Timestamps/Watermarks Handling


          ASF GitHub Bot added a comment -

          Github user StephanEwen commented on a diff in the pull request:

          https://github.com/apache/flink/pull/2546#discussion_r80901242

          — Diff: flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/AccumulatingAlignedProcessingTimeWindowOperatorTest.java —

          @@ -201,6 +201,11 @@ public void testWindowTriggerTimeAlignment() throws Exception {
                   assertTrue(op.getNextEvaluationTime() % 1000 == 0);
                   op.dispose();

          +        timerService.shutdownService();

          — End diff —

          Same here

          ASF GitHub Bot added a comment -

          Github user StephanEwen commented on a diff in the pull request:

          https://github.com/apache/flink/pull/2546#discussion_r80901203

          — Diff: flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/AggregatingAlignedProcessingTimeWindowOperatorTest.java —

          @@ -209,6 +209,11 @@ public void testWindowTriggerTimeAlignment() throws Exception {
                   assertTrue(op.getNextEvaluationTime() % 1000 == 0);
                   op.dispose();

          +        timerService.shutdownService();

          — End diff —

          Why does this need to create and shut down a timer service every time?

          ASF GitHub Bot added a comment -

          Github user StephanEwen commented on a diff in the pull request:

          https://github.com/apache/flink/pull/2546#discussion_r80901399

          — Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/DefaultTimeServiceProvider.java —

          @@ -109,9 +110,15 @@ public void run() {
               public static DefaultTimeServiceProvider createForTesting(ScheduledExecutorService executor, Object checkpointLock) {
                   return new DefaultTimeServiceProvider(new AsyncExceptionHandler() {
                       @Override
          -            public void registerAsyncException(AsynchronousException exception) {
          +            public void handleAsyncException(String message, Throwable exception) {
                           exception.printStackTrace();
                       }
                   }, executor, checkpointLock);
               }
          +
          +    @VisibleForTesting
          +    public static DefaultTimeServiceProvider createForTestingWithHandler(

          — End diff —

          Is this the exact same code as the default constructor? Can it be removed?

          ASF GitHub Bot added a comment -

          Github user StephanEwen commented on a diff in the pull request:

          https://github.com/apache/flink/pull/2546#discussion_r80902027

          — Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/AsyncExceptionHandler.java —

          @@ -18,12 +18,14 @@
           package org.apache.flink.streaming.runtime.tasks;

           /**
          - * Interface for reporting exceptions that are thrown in (possibly) a different thread.
          + * An interface marking a task as capable of handling exceptions thrown
          + * by different threads, other than the one executing the task itself.
            */
           public interface AsyncExceptionHandler {

               /**
          -     * Registers the given exception.
          +     * Handles an exception thrown by another thread (e.g. a TriggerTask),
          +     * other than the one executing the main task.
                */
          -    void registerAsyncException(AsynchronousException exception);
          +    void handleAsyncException(String message, Throwable exception);

          — End diff —

          This name change is good!

          ASF GitHub Bot added a comment -

          Github user StephanEwen commented on a diff in the pull request:

          https://github.com/apache/flink/pull/2546#discussion_r80904818

          — Diff: flink-fs-tests/src/test/java/org/apache/flink/hdfstests/ContinuousFileMonitoringTest.java —

          @@ -224,7 +327,7 @@ public void testFilePathFiltering() throws Exception {
                   monitoringFunction.open(new Configuration());
                   monitoringFunction.run(new TestingSourceContext(monitoringFunction, uniqFilesFound));

          -        Assert.assertTrue(uniqFilesFound.size() == NO_OF_FILES);
          +        Assert.assertEquals(uniqFilesFound.size(), NO_OF_FILES);

          — End diff —

          `assertEquals()` takes "expected" first and "actual" second.
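
          (i.e., the corrected call would be:)

              // expected first, actual second
              Assert.assertEquals(NO_OF_FILES, uniqFilesFound.size());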

          ASF GitHub Bot added a comment -

          Github user StephanEwen commented on a diff in the pull request:

          https://github.com/apache/flink/pull/2546#discussion_r80901983

          — Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/DefaultTimeServiceProvider.java —
          @@ -99,7 +100,7 @@ public void run()
                   try {
                       target.trigger(timestamp);
                   } catch (Throwable t) {
                       TimerException asyncException = new TimerException(t);

          — End diff —

          Do we need this extra level of exception wrapping?

          ASF GitHub Bot added a comment -

          Github user StephanEwen commented on the issue:

          https://github.com/apache/flink/pull/2546

          All in all some minor change requests, otherwise this seems good.

          ASF GitHub Bot added a comment -

          Github user StephanEwen commented on the issue:

          https://github.com/apache/flink/pull/2546

          Actually, let me take a step back and understand a few things deeper, first.
          Who actually generates the watermarks (in ingestion time)? The operator that creates the file splits, or the operator that reads the splits?

          If the configuration is set to IngestionTime, will the operator that creates the file splits emit a final LongMax watermark? Is that one passed through by the split-reading operator? Is there a test that covers that specific scenario? (I believe it was the initially reported bug.)

          ASF GitHub Bot added a comment -

          Github user kl0u commented on the issue:

          https://github.com/apache/flink/pull/2546

          Hi @StephanEwen , thanks for the review!

          The watermarks/timestamps are now generated by the reader, not by the operator that creates the splits. The same holds for the LongMax watermark, which is emitted in the close() of the ContinuousFileReaderOperator.

          As for tests, testFileReadingOperatorWithIngestionTime() in ContinuousFileMonitoringTest checks that the last watermark is the LongMax one.

          The original problem was that, for ingestion time, no timestamps were assigned to the elements even though watermarks were emitted (I think it was a PROCESS_ONCE case).
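
          (A sketch of the close() behavior described here, including the null guard from the earlier review round; method names follow the diffs above:)

              @Override
              public void close() throws Exception {
                  super.close();
                  // like a regular source, signal end-of-stream with the max watermark,
                  // but only after the reader has emitted all pending records
                  if (readerContext != null) {
                      readerContext.emitWatermark(Watermark.MAX_WATERMARK);
                      readerContext.close();
                  }
              }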

          ASF GitHub Bot added a comment -

          Github user StephanEwen commented on a diff in the pull request:

          https://github.com/apache/flink/pull/2546#discussion_r80915830

          — Diff: flink-fs-tests/src/test/java/org/apache/flink/hdfstests/ContinuousFileMonitoringTest.java —
          @@ -106,6 +107,117 @@ public static void destroyHDFS() {
          // TESTS

          @Test
+ public void testFileReadingOperatorWithIngestionTime() throws Exception {
+	Set<org.apache.hadoop.fs.Path> filesCreated = new HashSet<>();
+	Map<Integer, String> expectedFileContents = new HashMap<>();
+	for (int i = 0; i < NO_OF_FILES; i++) {
+		Tuple2<org.apache.hadoop.fs.Path, String> file = fillWithData(hdfsURI, "file", i, "This is test line.");
+		filesCreated.add(file.f0);
+		expectedFileContents.put(i, file.f1);
+	}
+
+	TextInputFormat format = new TextInputFormat(new Path(hdfsURI));
+	TypeInformation<String> typeInfo = TypeExtractor.getInputFormatTypes(format);
+
+	ContinuousFileReaderOperator<String, ?> reader = new ContinuousFileReaderOperator<>(format);
+
+	ExecutionConfig executionConfig = new ExecutionConfig();
+	executionConfig.setAutoWatermarkInterval(100);
+
+	TestTimeServiceProvider timeServiceProvider = new TestTimeServiceProvider();
+	OneInputStreamOperatorTestHarness<FileInputSplit, String> tester =
+		new OneInputStreamOperatorTestHarness<>(reader, executionConfig,
+			timeServiceProvider, TimeCharacteristic.IngestionTime);
+
+	reader.setOutputType(typeInfo, executionConfig);
+	tester.open();
+
+	// test that watermarks are correctly emitted
+
+	timeServiceProvider.setCurrentTime(201);
+	timeServiceProvider.setCurrentTime(301);
+	timeServiceProvider.setCurrentTime(401);
+	timeServiceProvider.setCurrentTime(501);
+
+	int i = 0;
+	for (Object line : tester.getOutput()) {
+		if (!(line instanceof Watermark)) {
+			Assert.fail("Only watermarks are expected here ");
+		}
+		Watermark w = (Watermark) line;
+		Assert.assertEquals(w.getTimestamp(), 200 + (i * 100));
+		i++;
+	}
+
+	// clear the output to get the elements only and the final watermark
+	tester.getOutput().clear();
+	Assert.assertEquals(tester.getOutput().size(), 0);
+
+	// create the necessary splits for the test
+	FileInputSplit[] splits = format.createInputSplits(
          — End diff –

What will `getNumberOfParallelSubtasks()` be here? The test does not control the number of splits but leaves this to the implicit behavior of the test harness?

          githubbot ASF GitHub Bot added a comment -

          Github user StephanEwen commented on a diff in the pull request:

          https://github.com/apache/flink/pull/2546#discussion_r80916240

          — Diff: flink-fs-tests/src/test/java/org/apache/flink/hdfstests/ContinuousFileMonitoringTest.java —
          @@ -106,6 +107,117 @@ public static void destroyHDFS() {
          // TESTS

          @Test
+ public void testFileReadingOperatorWithIngestionTime() throws Exception {
+	Set<org.apache.hadoop.fs.Path> filesCreated = new HashSet<>();
+	Map<Integer, String> expectedFileContents = new HashMap<>();
+	for (int i = 0; i < NO_OF_FILES; i++) {
+		Tuple2<org.apache.hadoop.fs.Path, String> file = fillWithData(hdfsURI, "file", i, "This is test line.");
+		filesCreated.add(file.f0);
+		expectedFileContents.put(i, file.f1);
+	}
+
+	TextInputFormat format = new TextInputFormat(new Path(hdfsURI));
+	TypeInformation<String> typeInfo = TypeExtractor.getInputFormatTypes(format);
+
+	ContinuousFileReaderOperator<String, ?> reader = new ContinuousFileReaderOperator<>(format);
+
+	ExecutionConfig executionConfig = new ExecutionConfig();
+	executionConfig.setAutoWatermarkInterval(100);
+
+	TestTimeServiceProvider timeServiceProvider = new TestTimeServiceProvider();
+	OneInputStreamOperatorTestHarness<FileInputSplit, String> tester =
+		new OneInputStreamOperatorTestHarness<>(reader, executionConfig,
+			timeServiceProvider, TimeCharacteristic.IngestionTime);
+
+	reader.setOutputType(typeInfo, executionConfig);
+	tester.open();
+
+	// test that watermarks are correctly emitted
+
+	timeServiceProvider.setCurrentTime(201);
+	timeServiceProvider.setCurrentTime(301);
+	timeServiceProvider.setCurrentTime(401);
+	timeServiceProvider.setCurrentTime(501);
+
+	int i = 0;
+	for (Object line : tester.getOutput()) {
+		if (!(line instanceof Watermark)) {
+			Assert.fail("Only watermarks are expected here ");
+		}
+		Watermark w = (Watermark) line;
+		Assert.assertEquals(w.getTimestamp(), 200 + (i * 100));
+		i++;
+	}
+
+	// clear the output to get the elements only and the final watermark
+	tester.getOutput().clear();
+	Assert.assertEquals(tester.getOutput().size(), 0);
+
+	// create the necessary splits for the test
+	FileInputSplit[] splits = format.createInputSplits(
+		reader.getRuntimeContext().getNumberOfParallelSubtasks());
+
+	// and feed them to the operator
+	for (FileInputSplit split : splits) {
+		tester.processElement(new StreamRecord<>(split));
+	}
+
+	// then close the reader gracefully so that
+	// we wait until all input is read
+	synchronized (tester.getCheckpointLock()) {
+		tester.close();
+	}
+
+	for (org.apache.hadoop.fs.Path file : filesCreated) {
+		hdfs.delete(file, false);
+	}
+
+	// the lines received must be the elements in the files +1 for the Long.MAX_VALUE watermark
+	Assert.assertEquals(tester.getOutput().size(), NO_OF_FILES * LINES_PER_FILE + 1);
+
+	// put the elements read in a map by file they belong to
+	Map<Integer, List<String>> actualFileContents = new HashMap<>();
+	for (Object line : tester.getOutput()) {
+		if (line instanceof StreamRecord) {
+			StreamRecord<String> element = (StreamRecord<String>) line;
+			Assert.assertEquals(element.getTimestamp(), 501);
+
+			int fileIdx = Character.getNumericValue(element.getValue().charAt(0));
+			List<String> content = actualFileContents.get(fileIdx);
+			if (content == null) {
+				content = new ArrayList<>();
+				actualFileContents.put(fileIdx, content);
+			}
+			content.add(element.getValue() + "\n");
+		} else if (line instanceof Watermark) {
+			Assert.assertEquals(((Watermark) line).getTimestamp(), Long.MAX_VALUE);
          — End diff –

Does the test assume that all watermarks emitted by the reader are LongMax? I am confused here; isn't that exactly what should NOT happen? Otherwise all emitted elements would be late?

          githubbot ASF GitHub Bot added a comment -

          Github user StephanEwen commented on the issue:

          https://github.com/apache/flink/pull/2546

I added some more comments. I could not find anywhere in that test a check that elements are not late but are properly interleaved with the watermarks.

Is there a test that checks that the reader does not let LongMax watermarks pass through? Or that the split-generating task does not emit a LongMax watermark on exit?

          githubbot ASF GitHub Bot added a comment -

          Github user aljoscha commented on the issue:

          https://github.com/apache/flink/pull/2546

          Just a quick comment (I didn't review all code): Why does this touch the AlignedWindowOperator tests? I would like to keep this commit as small as possible because we're dealing with sensitive stuff where I'd like to clearly separate things.

          In `OneInputStreamOperatorTestHarness` and `KeyedOneInputStreamOperatorTestHarness`, restricting the time provider parameter to a `TestTimeServiceProvider` does not change anything, right? So I think we can leave it as is. Also in `OneInputStreamOperatorTestHarness` the additional `TimeCharacteristic` parameter is only useful for one specific test so I think it would be better to instead expose the `StreamConfig` and set the parameter there for the one test to keep the number of constructors manageable.
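A tiny self-contained sketch of the design trade-off here (stand-in names, not the real harness API): exposing the config object keeps the constructor count flat while still letting the one test that needs a setting flip it.

// Illustration only: expose the config object instead of adding one
// constructor overload per optional setting (names are hypothetical).
class HarnessSketch {
	static class Config {
		String timeCharacteristic = "ProcessingTime"; // the default
	}

	private final Config config = new Config();

	// one accessor instead of N constructor overloads
	Config getConfig() {
		return config;
	}

	public static void main(String[] args) {
		HarnessSketch harness = new HarnessSketch();
		// only the single test that needs it changes the setting
		harness.getConfig().timeCharacteristic = "IngestionTime";
		System.out.println(harness.getConfig().timeCharacteristic);
	}
}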

          githubbot ASF GitHub Bot added a comment -

          Github user kl0u commented on the issue:

          https://github.com/apache/flink/pull/2546

Hi @aljoscha, the problem with the AlignedWindowOperator tests is that they were using the DefaultTimeServiceProvider without shutting the service down; the previously registered timers would then fire and throw an NPE, because their reference to the operator had been invalidated.

As for the restriction to the TestTimeServiceProvider: the DefaultTimeServiceProvider now needs the checkpointLock, so either we add that as an argument to the same constructor, or we restrict the options to only the TestProvider.

Finally, for the StreamConfig, I agree that it is only needed for one test, so we can just expose it.
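For readers following along, a self-contained sketch of why the timer service wants the checkpoint lock (a simplified illustration of the pattern, not Flink's DefaultTimeServiceProvider): the callback synchronizes on the same lock as element processing, so a timer can never fire in the middle of a record or a checkpoint, and draining the service on shutdown is what keeps stale timers from firing against a disposed operator.

import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

// Minimal sketch: a processing-time trigger that holds the checkpoint lock
// while firing (illustrative pattern, not the actual Flink implementation).
class TimerLockSketch {
	public static void main(String[] args) throws Exception {
		final Object checkpointLock = new Object();
		ScheduledExecutorService timerService = Executors.newSingleThreadScheduledExecutor();

		timerService.schedule(() -> {
			synchronized (checkpointLock) {
				// runs under the same lock as element processing, so it
				// cannot interleave with a record or a checkpoint
				System.out.println("timer fired safely");
			}
		}, 50, TimeUnit.MILLISECONDS);

		// shutdown(): accept no new timers (the already-scheduled one still runs)
		timerService.shutdown();
		// after this returns, no timer can fire against a disposed operator
		timerService.awaitTermination(1, TimeUnit.SECONDS);
	}
}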

          githubbot ASF GitHub Bot added a comment -

          Github user kl0u commented on a diff in the pull request:

          https://github.com/apache/flink/pull/2546#discussion_r81088167

          — Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/DefaultTimeServiceProvider.java —
@@ -99,7 +100,7 @@ public void run()
	try {
		target.trigger(timestamp);
	} catch (Throwable t) {
		TimerException asyncException = new TimerException(t);
          — End diff –

          No. This is just because this is how it was before. I will remove it.

          githubbot ASF GitHub Bot added a comment -

          Github user kl0u commented on a diff in the pull request:

          https://github.com/apache/flink/pull/2546#discussion_r81089031

          — Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/DefaultTimeServiceProvider.java —
@@ -99,7 +100,7 @@ public void run()
	try {
		target.trigger(timestamp);
	} catch (Throwable t) {
		TimerException asyncException = new TimerException(t);
          — End diff –

          Although, now that I think about it, it is good to know that it came from a timer callback. What do you think?

          githubbot ASF GitHub Bot added a comment -

          Github user kl0u commented on the issue:

          https://github.com/apache/flink/pull/2546

          Thanks for the comments @StephanEwen and @aljoscha !
          I integrated most of them.
          Please have a look.

          githubbot ASF GitHub Bot added a comment -

          Github user StephanEwen commented on a diff in the pull request:

          https://github.com/apache/flink/pull/2546#discussion_r81204990

          — Diff: flink-fs-tests/src/test/java/org/apache/flink/hdfstests/ContinuousFileMonitoringTest.java —
@@ -190,12 +213,30 @@ public void testFileReadingOperatorWithIngestionTime() throws Exception {
			}
			content.add(element.getValue() + "\n");
		} else if (line instanceof Watermark) {
-			Assert.assertEquals(((Watermark) line).getTimestamp(), Long.MAX_VALUE);
+			watermarkTimestamps.add(((Watermark) line).getTimestamp());
		} else {
			Assert.fail("Unknown element in the list.");
		}
	}

+	// check if all the input was read
+	Assert.assertEquals(NO_OF_FILES * LINES_PER_FILE, noOfLines);
+
+	// check if the last element is the LongMax watermark
+	Assert.assertTrue(lastElement instanceof Watermark);
+	Assert.assertEquals(Long.MAX_VALUE, ((Watermark) lastElement).getTimestamp());
+
+	System.out.println(watermarkTimestamps.size());
          — End diff –

          Leftover sysout printing.

          githubbot ASF GitHub Bot added a comment -

          Github user StephanEwen commented on a diff in the pull request:

          https://github.com/apache/flink/pull/2546#discussion_r81204828

          — Diff: flink-fs-tests/src/test/java/org/apache/flink/hdfstests/ContinuousFileMonitoringTest.java —
          @@ -440,10 +491,50 @@ public void testFileSplitMonitoringProcessOnce() throws Exception {

private int getLineNo(String line) {
	String[] tkns = line.split("\\s");
-	Assert.assertEquals(tkns.length, 6);
+	Assert.assertEquals(6, tkns.length);
	return Integer.parseInt(tkns[tkns.length - 1]);
}

+ private class TimeUpdatingThread extends Thread {
+
+	private volatile boolean isRunning;
+
+	private final TestTimeServiceProvider timeServiceProvider;
+	private final OneInputStreamOperatorTestHarness testHarness;
+	private final long wmInterval;
+	private final int elementUntilUpdating;
+
+	TimeUpdatingThread(final TestTimeServiceProvider timeServiceProvider,
+			final OneInputStreamOperatorTestHarness testHarness,
+			final long wmInterval,
+			final int elementUntilUpdating) {
+
+		this.timeServiceProvider = timeServiceProvider;
+		this.testHarness = testHarness;
+		this.wmInterval = wmInterval;
+		this.elementUntilUpdating = elementUntilUpdating;
+		this.isRunning = true;
+	}
+
+	@Override
+	public void run() {
+		try {
+			while (isRunning) {
+				if (testHarness.getOutput().size() % elementUntilUpdating == 0) {
+					long now = timeServiceProvider.getCurrentProcessingTime();
+					timeServiceProvider.setCurrentTime(now + wmInterval);
+				}
+			}
+		} catch (Exception e) {
+			e.printStackTrace();
          — End diff –

          This will not result in any meaningful feedback to the test.
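A common way to give the test real feedback from a helper thread is to capture the failure and rethrow it on the main thread; a self-contained sketch of that pattern (illustrative, not the PR's code):

import java.util.concurrent.atomic.AtomicReference;

// Sketch: remember the helper thread's failure and rethrow it on the main
// thread, so the test fails instead of only printing a stack trace.
class FailFromThreadSketch {
	public static void main(String[] args) throws Exception {
		final AtomicReference<Throwable> error = new AtomicReference<>();

		Thread helper = new Thread(() -> {
			try {
				throw new IllegalStateException("simulated failure in helper thread");
			} catch (Throwable t) {
				error.set(t); // instead of e.printStackTrace()
			}
		});
		helper.start();
		helper.join();

		if (error.get() != null) {
			// rethrowing here is what actually fails the test
			throw new AssertionError("helper thread failed", error.get());
		}
	}
}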

          githubbot ASF GitHub Bot added a comment -

          Github user StephanEwen commented on a diff in the pull request:

          https://github.com/apache/flink/pull/2546#discussion_r81204726

          — Diff: flink-fs-tests/src/test/java/org/apache/flink/hdfstests/ContinuousFileMonitoringTest.java —
          @@ -440,10 +491,50 @@ public void testFileSplitMonitoringProcessOnce() throws Exception {

private int getLineNo(String line) {
	String[] tkns = line.split("\\s");
-	Assert.assertEquals(tkns.length, 6);
+	Assert.assertEquals(6, tkns.length);
	return Integer.parseInt(tkns[tkns.length - 1]);
}

+ private class TimeUpdatingThread extends Thread {
+
+	private volatile boolean isRunning;
+
+	private final TestTimeServiceProvider timeServiceProvider;
+	private final OneInputStreamOperatorTestHarness testHarness;
+	private final long wmInterval;
+	private final int elementUntilUpdating;
+
+	TimeUpdatingThread(final TestTimeServiceProvider timeServiceProvider,
+			final OneInputStreamOperatorTestHarness testHarness,
+			final long wmInterval,
+			final int elementUntilUpdating) {
+
+		this.timeServiceProvider = timeServiceProvider;
+		this.testHarness = testHarness;
+		this.wmInterval = wmInterval;
+		this.elementUntilUpdating = elementUntilUpdating;
+		this.isRunning = true;
+	}
+
+	@Override
+	public void run() {
+		try {
+			while (isRunning) {
+				if (testHarness.getOutput().size() % elementUntilUpdating == 0) {
          — End diff –

There is a "race" between the operator emitting elements and this thread. Both run in loops without delays. Only if this condition happens to be evaluated at the exact point in time when the list contains that many result elements will there actually be a time advance.

          githubbot ASF GitHub Bot added a comment -

          Github user kl0u commented on the issue:

          https://github.com/apache/flink/pull/2546

Hi @StephanEwen. If I understand correctly, your suggestion is to make the test do something like the following: 1) put the split in the reader, 2) read the split, 3) when the split finishes, update the time in the provider, and 4) observe the time in the output elements. If so, the problem is that the reader just puts the split in a queue, which is picked up by another thread that reads it. In this setting there is no way of knowing when the reading thread has finished one split and moved on to the next, so step 3) cannot be synchronized correctly. This is why I simply have a thread in the test that tries (without guarantees; this is the race condition you mentioned) to update the time while the reader is still reading. Any suggestions are welcome.
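One deterministic alternative, which the 1.1 backport below ends up using, is to wait on the checkpoint lock until the reader thread has produced the expected number of outputs. A self-contained toy of that wait/notify pattern (stand-in names, not the harness API):

import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;

// Toy demonstration: the test thread blocks on a shared lock until the
// "reader" thread has emitted the expected number of outputs.
class WaitForSplitSketch {
	public static void main(String[] args) throws Exception {
		final Object checkpointLock = new Object();
		final Queue<String> output = new ConcurrentLinkedQueue<>();
		final int expected = 10; // e.g. LINES_PER_FILE + 1 for the watermark

		Thread reader = new Thread(() -> {
			for (int i = 0; i < expected; i++) {
				synchronized (checkpointLock) {
					output.add("element-" + i);
					checkpointLock.notifyAll(); // wake the waiting test thread
				}
			}
		});
		reader.start();

		synchronized (checkpointLock) {
			while (output.size() != expected) {
				checkpointLock.wait(10); // releases the lock so the reader can emit
			}
		}
		System.out.println("split fully read: " + output.size() + " outputs");
		reader.join();
	}
}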

          githubbot ASF GitHub Bot added a comment -

          GitHub user kl0u opened a pull request:

          https://github.com/apache/flink/pull/2593

          FLINK-4329 Fix Streaming File Source Timestamps/Watermarks Handling

This is a quick fix for the FLINK-4329 issue. The fix on master is different, but it contains more changes that are not easy to back-port to 1.1.

          You can merge this pull request into a Git repository by running:

          $ git pull https://github.com/kl0u/flink injestion_fix_1.1

          Alternatively you can review and apply these changes as the patch at:

          https://github.com/apache/flink/pull/2593.patch

          To close this pull request, make a commit to your master/trunk branch
          with (at least) the following in the commit message:

          This closes #2593


          commit 79d48e77b4a9c8a8eaf4e1e3199e2787deebab63
          Author: kl0u <kkloudas@gmail.com>
          Date: 2016-10-04T13:27:59Z

          FLINK-4329 Fix Streaming File Source Timestamps/Watermarks Handling


          githubbot ASF GitHub Bot added a comment -

          Github user mxm commented on a diff in the pull request:

          https://github.com/apache/flink/pull/2593#discussion_r81944208

          — Diff: flink-fs-tests/src/test/java/org/apache/flink/hdfstests/ContinuousFileMonitoringTest.java —
          @@ -106,6 +107,155 @@ public static void destroyHDFS() {
          // TESTS

          @Test
+ public void testFileReadingOperatorWithIngestionTime() throws Exception {
+	Set<org.apache.hadoop.fs.Path> filesCreated = new HashSet<>();
+	Map<Integer, String> expectedFileContents = new HashMap<>();
+
+	for (int i = 0; i < NO_OF_FILES; i++) {
+		Tuple2<org.apache.hadoop.fs.Path, String> file = fillWithData(hdfsURI, "file", i, "This is test line.");
+		filesCreated.add(file.f0);
+		expectedFileContents.put(i, file.f1);
+	}
+
+	TextInputFormat format = new TextInputFormat(new Path(hdfsURI));
+	TypeInformation<String> typeInfo = TypeExtractor.getInputFormatTypes(format);
+
+	final long watermarkInterval = 10;
+	ExecutionConfig executionConfig = new ExecutionConfig();
+	executionConfig.setAutoWatermarkInterval(watermarkInterval);
+
+	ContinuousFileReaderOperator<String, ?> reader = new ContinuousFileReaderOperator<>(format);
+	reader.setOutputType(typeInfo, executionConfig);
+
+	final TestTimeServiceProvider timeServiceProvider = new TestTimeServiceProvider();
+	final OneInputStreamOperatorTestHarness<FileInputSplit, String> tester =
+		new OneInputStreamOperatorTestHarness<>(reader, executionConfig, timeServiceProvider);
+	tester.setTimeCharacteristic(TimeCharacteristic.IngestionTime);
+	tester.open();
+
+	Assert.assertEquals(TimeCharacteristic.IngestionTime, tester.getTimeCharacteristic());
+
+	// test that watermarks are correctly emitted
+
+	timeServiceProvider.setCurrentTime(201);
+	timeServiceProvider.setCurrentTime(301);
+	timeServiceProvider.setCurrentTime(401);
+	timeServiceProvider.setCurrentTime(501);
+
+	int i = 0;
+	for (Object line : tester.getOutput()) {
+		if (!(line instanceof Watermark)) {
+			Assert.fail("Only watermarks are expected here ");
+		}
+		Watermark w = (Watermark) line;
+		Assert.assertEquals(200 + (i * 100), w.getTimestamp());
+		i++;
+	}
+
+	// clear the output to get the elements only and the final watermark
+	tester.getOutput().clear();
+	Assert.assertEquals(0, tester.getOutput().size());
+
+	// create the necessary splits for the test
+	FileInputSplit[] splits = format.createInputSplits(
+		reader.getRuntimeContext().getNumberOfParallelSubtasks());
+
+	// and feed them to the operator
+	Map<Integer, List<String>> actualFileContents = new HashMap<>();
+
+	long lastSeenWatermark = Long.MIN_VALUE;
+	int lineCounter = 0; // counter for the lines read from the splits
+	int watermarkCounter = 0;
+
+	for (FileInputSplit split : splits) {
+
+		// set the next "current processing time".
+		long nextTimestamp = timeServiceProvider.getCurrentProcessingTime() + watermarkInterval;
+		timeServiceProvider.setCurrentTime(nextTimestamp);
+
+		// send the next split to be read and wait until it is fully read.
+		tester.processElement(new StreamRecord<>(split));
+		synchronized (tester.getCheckpointLock()) {
+			while (tester.getOutput().isEmpty() || tester.getOutput().size() != (LINES_PER_FILE + 1)) {
+				tester.getCheckpointLock().wait(10);
+			}
+		}
+
+		// verify that the results are the expected
+		for (Object line : tester.getOutput()) {
+			if (line instanceof StreamRecord) {
+				StreamRecord<String> element = (StreamRecord<String>) line;
          — End diff –

You could add a `@SuppressWarnings("unchecked")` here.

          githubbot ASF GitHub Bot added a comment -

          Github user mxm commented on a diff in the pull request:

          https://github.com/apache/flink/pull/2593#discussion_r81944676

          — Diff: flink-fs-tests/src/test/java/org/apache/flink/hdfstests/ContinuousFileMonitoringTest.java —
          @@ -106,6 +107,155 @@ public static void destroyHDFS() {
          // TESTS

          @Test
+ public void testFileReadingOperatorWithIngestionTime() throws Exception {
+	Set<org.apache.hadoop.fs.Path> filesCreated = new HashSet<>();
+	Map<Integer, String> expectedFileContents = new HashMap<>();
+
+	for (int i = 0; i < NO_OF_FILES; i++) {
+		Tuple2<org.apache.hadoop.fs.Path, String> file = fillWithData(hdfsURI, "file", i, "This is test line.");
+		filesCreated.add(file.f0);
+		expectedFileContents.put(i, file.f1);
+	}
+
+	TextInputFormat format = new TextInputFormat(new Path(hdfsURI));
+	TypeInformation<String> typeInfo = TypeExtractor.getInputFormatTypes(format);
+
+	final long watermarkInterval = 10;
+	ExecutionConfig executionConfig = new ExecutionConfig();
+	executionConfig.setAutoWatermarkInterval(watermarkInterval);
+
+	ContinuousFileReaderOperator<String, ?> reader = new ContinuousFileReaderOperator<>(format);
+	reader.setOutputType(typeInfo, executionConfig);
+
+	final TestTimeServiceProvider timeServiceProvider = new TestTimeServiceProvider();
+	final OneInputStreamOperatorTestHarness<FileInputSplit, String> tester =
+		new OneInputStreamOperatorTestHarness<>(reader, executionConfig, timeServiceProvider);
+	tester.setTimeCharacteristic(TimeCharacteristic.IngestionTime);
+	tester.open();
+
+	Assert.assertEquals(TimeCharacteristic.IngestionTime, tester.getTimeCharacteristic());
+
+	// test that watermarks are correctly emitted
+
+	timeServiceProvider.setCurrentTime(201);
+	timeServiceProvider.setCurrentTime(301);
+	timeServiceProvider.setCurrentTime(401);
+	timeServiceProvider.setCurrentTime(501);
+
+	int i = 0;
+	for (Object line : tester.getOutput()) {
+		if (!(line instanceof Watermark)) {
+			Assert.fail("Only watermarks are expected here ");
+		}
+		Watermark w = (Watermark) line;
+		Assert.assertEquals(200 + (i * 100), w.getTimestamp());
+		i++;
+	}
+
+	// clear the output to get the elements only and the final watermark
+	tester.getOutput().clear();
+	Assert.assertEquals(0, tester.getOutput().size());
+
+	// create the necessary splits for the test
+	FileInputSplit[] splits = format.createInputSplits(
+		reader.getRuntimeContext().getNumberOfParallelSubtasks());
+
+	// and feed them to the operator
+	Map<Integer, List<String>> actualFileContents = new HashMap<>();
+
+	long lastSeenWatermark = Long.MIN_VALUE;
+	int lineCounter = 0; // counter for the lines read from the splits
+	int watermarkCounter = 0;
+
+	for (FileInputSplit split : splits) {
+
+		// set the next "current processing time".
+		long nextTimestamp = timeServiceProvider.getCurrentProcessingTime() + watermarkInterval;
+		timeServiceProvider.setCurrentTime(nextTimestamp);
+
+		// send the next split to be read and wait until it is fully read.
+		tester.processElement(new StreamRecord<>(split));
+		synchronized (tester.getCheckpointLock()) {
+			while (tester.getOutput().isEmpty() || tester.getOutput().size() != (LINES_PER_FILE + 1)) {
+				tester.getCheckpointLock().wait(10);
+			}
+		}
+
+		// verify that the results are the expected
+		for (Object line : tester.getOutput()) {
+			if (line instanceof StreamRecord) {
+				StreamRecord<String> element = (StreamRecord<String>) line;
+				lineCounter++;
+
+				Assert.assertEquals(nextTimestamp, element.getTimestamp());
+
+				int fileIdx = Character.getNumericValue(element.getValue().charAt(0));
+				List<String> content = actualFileContents.get(fileIdx);
+				if (content == null) {
+					content = new ArrayList<>();
+					actualFileContents.put(fileIdx, content);
+				}
+				content.add(element.getValue() + "\n");
+			} else if (line instanceof Watermark) {
+				long watermark = ((Watermark) line).getTimestamp();
+
+				Assert.assertEquals(nextTimestamp - (nextTimestamp % watermarkInterval), watermark);
+				Assert.assertTrue(watermark > lastSeenWatermark);
+				watermarkCounter++;
+
+				lastSeenWatermark = watermark;
+			} else {
+				Assert.fail("Unknown element in the list.");
+			}
+		}
+
+		// clean the output to be ready for the next split
+		tester.getOutput().clear();
+	}
+
+	// now we are processing one split after the other,
+	// so all the elements must be here by now.
+	Assert.assertEquals(NO_OF_FILES * LINES_PER_FILE, lineCounter);
+
+	// because we expect one watermark per split.
+	Assert.assertEquals(NO_OF_FILES, watermarkCounter);
+
+	// then close the reader gracefully so that the Long.MAX watermark is emitted
+	synchronized (tester.getCheckpointLock()) {
+		tester.close();
+	}
+
+	for (org.apache.hadoop.fs.Path file : filesCreated) {
+		hdfs.delete(file, false);
+	}
+
+	// check if the last element is the LongMax watermark (by now this must be the only element)
+	Assert.assertEquals(1, tester.getOutput().size());
+	Assert.assertTrue(tester.getOutput().peek() instanceof Watermark);
+	Assert.assertEquals(Long.MAX_VALUE, ((Watermark) tester.getOutput().poll()).getTimestamp());
+
+	// check if the elements are the expected ones.
+	Assert.assertEquals(expectedFileContents.size(), actualFileContents.size());
+	for (Integer fileIdx : expectedFileContents.keySet()) {
+		Assert.assertTrue("file" + fileIdx + " not found", actualFileContents.keySet().contains(fileIdx));
          — End diff –

          You wouldn't have to use file indexes if you immediately checked the split output.

          githubbot ASF GitHub Bot added a comment -

          Github user mxm commented on a diff in the pull request:

          https://github.com/apache/flink/pull/2593#discussion_r81947121

          — Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamSource.java —
@@ -299,7 +319,7 @@ public void trigger(long timestamp) {
	synchronized (lockingObject) {
		if (currentTime > nextWatermarkTime) {
			output.emitWatermark(new Watermark(watermarkTime));
-			nextWatermarkTime += watermarkInterval;
+			nextWatermarkTime = watermarkTime + watermarkInterval;

— End diff –

          Is this a fix? Does it change the semantics?
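For reference, a small worked example of what the two update rules do when the trigger fires late (a self-contained illustration; it assumes, as in the surrounding code, that watermarkTime is the current time truncated to the interval):

// Worked example: how the next trigger threshold evolves under the old and
// new update rules when the timer fires late (illustration, not Flink code).
class WatermarkAdvanceExample {
	public static void main(String[] args) {
		long interval = 100;
		long nextOld = 100;
		long nextNew = 100;

		// the timer fires late, at processing time 570
		long currentTime = 570;
		long watermarkTime = currentTime - (currentTime % interval); // 500

		if (currentTime > nextOld) {
			// old rule: advance by one interval only, so the threshold (200)
			// stays far behind the watermark just emitted (500); the next
			// firing would emit a watermark for 500 again
			nextOld += interval;
		}
		if (currentTime > nextNew) {
			// new rule: jump just past the watermark that was emitted
			nextNew = watermarkTime + interval;
		}
		System.out.println("old next threshold: " + nextOld); // 200
		System.out.println("new next threshold: " + nextNew); // 600
	}
}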

          githubbot ASF GitHub Bot added a comment -

          Github user mxm commented on a diff in the pull request:

          https://github.com/apache/flink/pull/2593#discussion_r81944268

          — Diff: flink-fs-tests/src/test/java/org/apache/flink/hdfstests/ContinuousFileMonitoringTest.java —
          @@ -106,6 +107,155 @@ public static void destroyHDFS() {
          // TESTS

          @Test
          + public void testFileReadingOperatorWithIngestionTime() throws Exception {
          + Set<org.apache.hadoop.fs.Path> filesCreated = new HashSet<>();
          + Map<Integer, String> expectedFileContents = new HashMap<>();
          +
          + for(int i = 0; i < NO_OF_FILES; i++)

          { + Tuple2<org.apache.hadoop.fs.Path, String> file = fillWithData(hdfsURI, "file", i, "This is test line."); + filesCreated.add(file.f0); + expectedFileContents.put(i, file.f1); + }

          +
          + TextInputFormat format = new TextInputFormat(new Path(hdfsURI));
          + TypeInformation<String> typeInfo = TypeExtractor.getInputFormatTypes(format);
          +
          + final long watermarkInterval = 10;
          + ExecutionConfig executionConfig = new ExecutionConfig();
          + executionConfig.setAutoWatermarkInterval(watermarkInterval);
          +
          + ContinuousFileReaderOperator<String, ?> reader = new ContinuousFileReaderOperator<>(format);
          + reader.setOutputType(typeInfo, executionConfig);
          +
          + final TestTimeServiceProvider timeServiceProvider = new TestTimeServiceProvider();
          + final OneInputStreamOperatorTestHarness<FileInputSplit, String> tester =
          + new OneInputStreamOperatorTestHarness<>(reader, executionConfig, timeServiceProvider);
          + tester.setTimeCharacteristic(TimeCharacteristic.IngestionTime);
          + tester.open();
          +
          + Assert.assertEquals(TimeCharacteristic.IngestionTime, tester.getTimeCharacteristic());
          +
          + // test that watermarks are correctly emitted
          +
          + timeServiceProvider.setCurrentTime(201);
          + timeServiceProvider.setCurrentTime(301);
          + timeServiceProvider.setCurrentTime(401);
          + timeServiceProvider.setCurrentTime(501);
          +
          + int i = 0;
          + for(Object line: tester.getOutput()) {
          + if (!(line instanceof Watermark)) {
          + Assert.fail("Only watermarks are expected here ");
          + }

          + Watermark w = (Watermark) line;
          + Assert.assertEquals(200 + (i * 100), w.getTimestamp());
          + i++;
          + }
          +
          + // clear the output to get the elements only and the final watermark
          + tester.getOutput().clear();
          + Assert.assertEquals(0, tester.getOutput().size());
          +
          + // create the necessary splits for the test
          + FileInputSplit[] splits = format.createInputSplits(
          + reader.getRuntimeContext().getNumberOfParallelSubtasks());
          +
          + // and feed them to the operator
          + Map<Integer, List<String>> actualFileContents = new HashMap<>();
          +
          + long lastSeenWatermark = Long.MIN_VALUE;
          + int lineCounter = 0; // counter for the lines read from the splits
          + int watermarkCounter = 0;
          +
          + for(FileInputSplit split: splits) {
          +
          + // set the next "current processing time".
          + long nextTimestamp = timeServiceProvider.getCurrentProcessingTime() + watermarkInterval;
          + timeServiceProvider.setCurrentTime(nextTimestamp);
          +
          + // send the next split to be read and wait until it is fully read.
          + tester.processElement(new StreamRecord<>(split));
          + synchronized (tester.getCheckpointLock()) {
          + while (tester.getOutput().isEmpty() || tester.getOutput().size() != (LINES_PER_FILE + 1)) {
          + tester.getCheckpointLock().wait(10);
          + }

          + }
          +
          + // verify that the results are the expected
          + for(Object line: tester.getOutput()) {
          + if (line instanceof StreamRecord) {
          + StreamRecord<String> element = (StreamRecord<String>) line;
          + lineCounter++;
          +
          + Assert.assertEquals(nextTimestamp, element.getTimestamp());
          +
          + int fileIdx = Character.getNumericValue(element.getValue().charAt(0));
          + List<String> content = actualFileContents.get(fileIdx);
          + if (content == null) {
          + content = new ArrayList<>();
          + actualFileContents.put(fileIdx, content);
          + }

          + content.add(element.getValue() + "\n");
          + } else if (line instanceof Watermark) {
          + long watermark = ((Watermark) line).getTimestamp();
          +
          + Assert.assertEquals(nextTimestamp - (nextTimestamp % watermarkInterval), watermark);
          + Assert.assertTrue(watermark > lastSeenWatermark);
          + watermarkCounter++;
          +
          + lastSeenWatermark = watermark;
          + } else {
          + Assert.fail("Unknown element in the list.");
          + }

          + }
          +
          + // clean the output to be ready for the next split
          + tester.getOutput().clear();
          + }
          +
          + // now we are processing one split after the other,
          + // so all the elements must be here by now.
          + Assert.assertEquals(NO_OF_FILES * LINES_PER_FILE, lineCounter);
          +
          + // because we expect one watermark per split.
          + Assert.assertEquals(NO_OF_FILES, watermarkCounter);
          — End diff –

          This should be `Assert.assertEquals(splits.length, watermarkCounter);`.

          Hide
          githubbot ASF GitHub Bot added a comment -

          Github user mxm commented on a diff in the pull request:

          https://github.com/apache/flink/pull/2593#discussion_r81944020

          — Diff: flink-fs-tests/src/test/java/org/apache/flink/hdfstests/ContinuousFileMonitoringTest.java —
          @@ -106,6 +107,155 @@ public static void destroyHDFS() {
          // TESTS

          @Test
          + public void testFileReadingOperatorWithIngestionTime() throws Exception {
          + Set<org.apache.hadoop.fs.Path> filesCreated = new HashSet<>();
          + Map<Integer, String> expectedFileContents = new HashMap<>();
          +
          + for(int i = 0; i < NO_OF_FILES; i++) {
          + Tuple2<org.apache.hadoop.fs.Path, String> file = fillWithData(hdfsURI, "file", i, "This is test line.");
          + filesCreated.add(file.f0);
          + expectedFileContents.put(i, file.f1);
          + }

          +
          + TextInputFormat format = new TextInputFormat(new Path(hdfsURI));
          + TypeInformation<String> typeInfo = TypeExtractor.getInputFormatTypes(format);
          +
          + final long watermarkInterval = 10;
          + ExecutionConfig executionConfig = new ExecutionConfig();
          + executionConfig.setAutoWatermarkInterval(watermarkInterval);
          +
          + ContinuousFileReaderOperator<String, ?> reader = new ContinuousFileReaderOperator<>(format);
          + reader.setOutputType(typeInfo, executionConfig);
          +
          + final TestTimeServiceProvider timeServiceProvider = new TestTimeServiceProvider();
          + final OneInputStreamOperatorTestHarness<FileInputSplit, String> tester =
          + new OneInputStreamOperatorTestHarness<>(reader, executionConfig, timeServiceProvider);
          + tester.setTimeCharacteristic(TimeCharacteristic.IngestionTime);
          + tester.open();
          +
          + Assert.assertEquals(TimeCharacteristic.IngestionTime, tester.getTimeCharacteristic());
          +
          + // test that watermarks are correctly emitted
          +
          + timeServiceProvider.setCurrentTime(201);
          + timeServiceProvider.setCurrentTime(301);
          + timeServiceProvider.setCurrentTime(401);
          + timeServiceProvider.setCurrentTime(501);
          +
          + int i = 0;
          + for(Object line: tester.getOutput()) {
          + if (!(line instanceof Watermark)) {
          + Assert.fail("Only watermarks are expected here ");
          + }

          + Watermark w = (Watermark) line;
          + Assert.assertEquals(200 + (i * 100), w.getTimestamp());
          + i++;
          + }
          +
          + // clear the output to get the elements only and the final watermark
          + tester.getOutput().clear();
          + Assert.assertEquals(0, tester.getOutput().size());
          +
          + // create the necessary splits for the test
          + FileInputSplit[] splits = format.createInputSplits(
          + reader.getRuntimeContext().getNumberOfParallelSubtasks());
          +
          + // and feed them to the operator
          + Map<Integer, List<String>> actualFileContents = new HashMap<>();
          +
          + long lastSeenWatermark = Long.MIN_VALUE;
          + int lineCounter = 0; // counter for the lines read from the splits
          + int watermarkCounter = 0;
          +
          + for(FileInputSplit split: splits) {
          +
          + // set the next "current processing time".
          + long nextTimestamp = timeServiceProvider.getCurrentProcessingTime() + watermarkInterval;
          + timeServiceProvider.setCurrentTime(nextTimestamp);
          +
          + // send the next split to be read and wait until it is fully read.
          + tester.processElement(new StreamRecord<>(split));
          + synchronized (tester.getCheckpointLock()) {
          + while (tester.getOutput().isEmpty() || tester.getOutput().size() != (LINES_PER_FILE + 1)) {
          + tester.getCheckpointLock().wait(10);
          — End diff –

          Seems like you don't need to synchronize on the checkpoint lock here; you simply want to `Thread.sleep(10)` to give the SplitReader thread more time to read. Perhaps add a comment to explain the +1.

          ```java
          while (tester.getOutput().size() < (LINES_PER_FILE + 1)) {
              // wait for all lines of this split to be read and the watermark to be emitted
              Thread.sleep(10);
          }

          ```
          should be enough.
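          A bounded variant of that polling loop (a sketch only, not part of the PR; it reuses the test's `tester` and `LINES_PER_FILE`, and the 30-second deadline is an arbitrary choice) would keep a stuck SplitReader from hanging the test indefinitely:

          ```java
          // Poll with a deadline so a stuck SplitReader fails the test instead of blocking it forever.
          long deadline = System.currentTimeMillis() + 30_000; // arbitrary timeout for illustration
          while (tester.getOutput().size() < (LINES_PER_FILE + 1)) {
              if (System.currentTimeMillis() > deadline) {
                  Assert.fail("Timed out waiting for all lines of the split and the watermark.");
              }
              Thread.sleep(10); // give the SplitReader thread time to emit the remaining records
          }
          ```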

          Hide
          githubbot ASF GitHub Bot added a comment -

          Github user mxm commented on a diff in the pull request:

          https://github.com/apache/flink/pull/2593#discussion_r81944308

          — Diff: flink-fs-tests/src/test/java/org/apache/flink/hdfstests/ContinuousFileMonitoringTest.java —
          @@ -106,6 +107,155 @@ public static void destroyHDFS() {
          // TESTS

          @Test
          + public void testFileReadingOperatorWithIngestionTime() throws Exception {
          + Set<org.apache.hadoop.fs.Path> filesCreated = new HashSet<>();
          + Map<Integer, String> expectedFileContents = new HashMap<>();
          +
          + for(int i = 0; i < NO_OF_FILES; i++) {
          + Tuple2<org.apache.hadoop.fs.Path, String> file = fillWithData(hdfsURI, "file", i, "This is test line.");
          + filesCreated.add(file.f0);
          + expectedFileContents.put(i, file.f1);
          + }

          +
          + TextInputFormat format = new TextInputFormat(new Path(hdfsURI));
          + TypeInformation<String> typeInfo = TypeExtractor.getInputFormatTypes(format);
          +
          + final long watermarkInterval = 10;
          + ExecutionConfig executionConfig = new ExecutionConfig();
          + executionConfig.setAutoWatermarkInterval(watermarkInterval);
          +
          + ContinuousFileReaderOperator<String, ?> reader = new ContinuousFileReaderOperator<>(format);
          + reader.setOutputType(typeInfo, executionConfig);
          +
          + final TestTimeServiceProvider timeServiceProvider = new TestTimeServiceProvider();
          + final OneInputStreamOperatorTestHarness<FileInputSplit, String> tester =
          + new OneInputStreamOperatorTestHarness<>(reader, executionConfig, timeServiceProvider);
          + tester.setTimeCharacteristic(TimeCharacteristic.IngestionTime);
          + tester.open();
          +
          + Assert.assertEquals(TimeCharacteristic.IngestionTime, tester.getTimeCharacteristic());
          +
          + // test that watermarks are correctly emitted
          +
          + timeServiceProvider.setCurrentTime(201);
          + timeServiceProvider.setCurrentTime(301);
          + timeServiceProvider.setCurrentTime(401);
          + timeServiceProvider.setCurrentTime(501);
          +
          + int i = 0;
          + for(Object line: tester.getOutput()) {
          + if (!(line instanceof Watermark)) {
          + Assert.fail("Only watermarks are expected here ");
          + }

          + Watermark w = (Watermark) line;
          + Assert.assertEquals(200 + (i * 100), w.getTimestamp());
          + i++;
          + }
          +
          + // clear the output to get the elements only and the final watermark
          + tester.getOutput().clear();
          + Assert.assertEquals(0, tester.getOutput().size());
          +
          + // create the necessary splits for the test
          + FileInputSplit[] splits = format.createInputSplits(
          + reader.getRuntimeContext().getNumberOfParallelSubtasks());
          +
          + // and feed them to the operator
          + Map<Integer, List<String>> actualFileContents = new HashMap<>();
          +
          + long lastSeenWatermark = Long.MIN_VALUE;
          + int lineCounter = 0; // counter for the lines read from the splits
          + int watermarkCounter = 0;
          +
          + for(FileInputSplit split: splits) {
          +
          + // set the next "current processing time".
          + long nextTimestamp = timeServiceProvider.getCurrentProcessingTime() + watermarkInterval;
          + timeServiceProvider.setCurrentTime(nextTimestamp);
          +
          + // send the next split to be read and wait until it is fully read.
          + tester.processElement(new StreamRecord<>(split));
          + synchronized (tester.getCheckpointLock()) {
          + while (tester.getOutput().isEmpty() || tester.getOutput().size() != (LINES_PER_FILE + 1)) {
          + tester.getCheckpointLock().wait(10);
          + }

          + }
          +
          + // verify that the results are the expected
          + for(Object line: tester.getOutput()) {
          + if (line instanceof StreamRecord) {
          + StreamRecord<String> element = (StreamRecord<String>) line;
          + lineCounter++;
          +
          + Assert.assertEquals(nextTimestamp, element.getTimestamp());
          +
          + int fileIdx = Character.getNumericValue(element.getValue().charAt(0));
          + List<String> content = actualFileContents.get(fileIdx);
          + if (content == null) {
          + content = new ArrayList<>();
          + actualFileContents.put(fileIdx, content);
          + }

          + content.add(element.getValue() + "\n");
          + } else if (line instanceof Watermark) {
          + long watermark = ((Watermark) line).getTimestamp();
          +
          + Assert.assertEquals(nextTimestamp - (nextTimestamp % watermarkInterval), watermark);
          + Assert.assertTrue(watermark > lastSeenWatermark);
          + watermarkCounter++;
          +
          + lastSeenWatermark = watermark;
          + } else {
          + Assert.fail("Unknown element in the list.");
          + }

          + }
          +
          + // clean the output to be ready for the next split
          + tester.getOutput().clear();
          + }
          +
          + // now we are processing one split after the other,
          + // so all the elements must be here by now.
          + Assert.assertEquals(NO_OF_FILES * LINES_PER_FILE, lineCounter);
          +
          + // because we expect one watermark per split.
          + Assert.assertEquals(NO_OF_FILES, watermarkCounter);
          +
          + // then close the reader gracefully so that the Long.MAX watermark is emitted
          + synchronized (tester.getCheckpointLock()) {
          + tester.close();
          — End diff –

          I don't think you need to lock on the checkpoint lock here.

          Hide
          githubbot ASF GitHub Bot added a comment -

          Github user mxm commented on a diff in the pull request:

          https://github.com/apache/flink/pull/2593#discussion_r81946646

          — Diff: flink-fs-tests/src/test/java/org/apache/flink/hdfstests/ContinuousFileMonitoringTest.java —
          @@ -106,6 +107,155 @@ public static void destroyHDFS() {
          // TESTS

          @Test
          + public void testFileReadingOperatorWithIngestionTime() throws Exception {
          + Set<org.apache.hadoop.fs.Path> filesCreated = new HashSet<>();
          + Map<Integer, String> expectedFileContents = new HashMap<>();
          +
          + for(int i = 0; i < NO_OF_FILES; i++) {
          + Tuple2<org.apache.hadoop.fs.Path, String> file = fillWithData(hdfsURI, "file", i, "This is test line.");
          + filesCreated.add(file.f0);
          + expectedFileContents.put(i, file.f1);
          + }

          +
          + TextInputFormat format = new TextInputFormat(new Path(hdfsURI));
          + TypeInformation<String> typeInfo = TypeExtractor.getInputFormatTypes(format);
          +
          + final long watermarkInterval = 10;
          + ExecutionConfig executionConfig = new ExecutionConfig();
          + executionConfig.setAutoWatermarkInterval(watermarkInterval);
          +
          + ContinuousFileReaderOperator<String, ?> reader = new ContinuousFileReaderOperator<>(format);
          + reader.setOutputType(typeInfo, executionConfig);
          +
          + final TestTimeServiceProvider timeServiceProvider = new TestTimeServiceProvider();
          + final OneInputStreamOperatorTestHarness<FileInputSplit, String> tester =
          + new OneInputStreamOperatorTestHarness<>(reader, executionConfig, timeServiceProvider);
          + tester.setTimeCharacteristic(TimeCharacteristic.IngestionTime);
          + tester.open();
          +
          + Assert.assertEquals(TimeCharacteristic.IngestionTime, tester.getTimeCharacteristic());
          +
          + // test that watermarks are correctly emitted
          +
          + timeServiceProvider.setCurrentTime(201);
          + timeServiceProvider.setCurrentTime(301);
          + timeServiceProvider.setCurrentTime(401);
          + timeServiceProvider.setCurrentTime(501);
          +
          + int i = 0;
          + for(Object line: tester.getOutput()) {
          + if (!(line instanceof Watermark)) {
          + Assert.fail("Only watermarks are expected here ");
          + }

          + Watermark w = (Watermark) line;
          + Assert.assertEquals(200 + (i * 100), w.getTimestamp());
          + i++;
          + }
          +
          + // clear the output to get the elements only and the final watermark
          + tester.getOutput().clear();
          + Assert.assertEquals(0, tester.getOutput().size());
          +
          + // create the necessary splits for the test
          + FileInputSplit[] splits = format.createInputSplits(
          + reader.getRuntimeContext().getNumberOfParallelSubtasks());
          +
          + // and feed them to the operator
          + Map<Integer, List<String>> actualFileContents = new HashMap<>();
          +
          + long lastSeenWatermark = Long.MIN_VALUE;
          + int lineCounter = 0; // counter for the lines read from the splits
          + int watermarkCounter = 0;
          +
          + for(FileInputSplit split: splits) {
          +
          + // set the next "current processing time".
          + long nextTimestamp = timeServiceProvider.getCurrentProcessingTime() + watermarkInterval;
          + timeServiceProvider.setCurrentTime(nextTimestamp);
          +
          + // send the next split to be read and wait until it is fully read.
          + tester.processElement(new StreamRecord<>(split));
          + synchronized (tester.getCheckpointLock()) {
          + while (tester.getOutput().isEmpty() || tester.getOutput().size() != (LINES_PER_FILE + 1)) {
          + tester.getCheckpointLock().wait(10);
          — End diff –

          Actually, sleeping wouldn't be necessary if you disabled the threaded split processing of the `SplitReader` for this test. You could have a synchronous reader for the test (would require a small change of the operator/reader to enable that).
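          One shape that could take (a sketch under stated assumptions: `SyncTestableReader`, `setSynchronousReads`, and the two read methods are illustrative names, not existing Flink APIs) is a test-only switch that reads the split on the calling thread:

          ```java
          import org.apache.flink.core.fs.FileInputSplit;

          // Illustrative reader with a test-only synchronous mode: when enabled, the split is
          // read and its records emitted on the caller's thread, so tests need no polling.
          class SyncTestableReader {

              private boolean synchronousReads; // test hook, an assumed addition to the operator

              void setSynchronousReads(boolean synchronousReads) {
                  this.synchronousReads = synchronousReads;
              }

              void processSplit(FileInputSplit split) throws Exception {
                  if (synchronousReads) {
                      readSplitInline(split); // emit everything before returning
                  } else {
                      enqueueSplit(split);    // normal path: hand the split to the SplitReader thread
                  }
              }

              private void readSplitInline(FileInputSplit split) { /* open the format, read, emit */ }

              private void enqueueSplit(FileInputSplit split) { /* add to the reader thread's queue */ }
          }
          ```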

          Hide
          githubbot ASF GitHub Bot added a comment -

          Github user mxm commented on a diff in the pull request:

          https://github.com/apache/flink/pull/2593#discussion_r81944479

          — Diff: flink-fs-tests/src/test/java/org/apache/flink/hdfstests/ContinuousFileMonitoringTest.java —
          @@ -106,6 +107,155 @@ public static void destroyHDFS() {
          // TESTS

          @Test
          + public void testFileReadingOperatorWithIngestionTime() throws Exception {
          + Set<org.apache.hadoop.fs.Path> filesCreated = new HashSet<>();
          + Map<Integer, String> expectedFileContents = new HashMap<>();
          +
          + for(int i = 0; i < NO_OF_FILES; i++) {
          + Tuple2<org.apache.hadoop.fs.Path, String> file = fillWithData(hdfsURI, "file", i, "This is test line.");
          + filesCreated.add(file.f0);
          + expectedFileContents.put(i, file.f1);
          + }

          +
          + TextInputFormat format = new TextInputFormat(new Path(hdfsURI));
          + TypeInformation<String> typeInfo = TypeExtractor.getInputFormatTypes(format);
          +
          + final long watermarkInterval = 10;
          + ExecutionConfig executionConfig = new ExecutionConfig();
          + executionConfig.setAutoWatermarkInterval(watermarkInterval);
          +
          + ContinuousFileReaderOperator<String, ?> reader = new ContinuousFileReaderOperator<>(format);
          + reader.setOutputType(typeInfo, executionConfig);
          +
          + final TestTimeServiceProvider timeServiceProvider = new TestTimeServiceProvider();
          + final OneInputStreamOperatorTestHarness<FileInputSplit, String> tester =
          + new OneInputStreamOperatorTestHarness<>(reader, executionConfig, timeServiceProvider);
          + tester.setTimeCharacteristic(TimeCharacteristic.IngestionTime);
          + tester.open();
          +
          + Assert.assertEquals(TimeCharacteristic.IngestionTime, tester.getTimeCharacteristic());
          +
          + // test that watermarks are correctly emitted
          +
          + timeServiceProvider.setCurrentTime(201);
          + timeServiceProvider.setCurrentTime(301);
          + timeServiceProvider.setCurrentTime(401);
          + timeServiceProvider.setCurrentTime(501);
          +
          + int i = 0;
          + for(Object line: tester.getOutput()) {
          + if (!(line instanceof Watermark)) {
          + Assert.fail("Only watermarks are expected here ");
          + }

          + Watermark w = (Watermark) line;
          + Assert.assertEquals(200 + (i * 100), w.getTimestamp());
          + i++;
          + }
          +
          + // clear the output to get the elements only and the final watermark
          + tester.getOutput().clear();
          + Assert.assertEquals(0, tester.getOutput().size());
          +
          + // create the necessary splits for the test
          + FileInputSplit[] splits = format.createInputSplits(
          + reader.getRuntimeContext().getNumberOfParallelSubtasks());
          +
          + // and feed them to the operator
          + Map<Integer, List<String>> actualFileContents = new HashMap<>();
          +
          + long lastSeenWatermark = Long.MIN_VALUE;
          + int lineCounter = 0; // counter for the lines read from the splits
          + int watermarkCounter = 0;
          +
          + for(FileInputSplit split: splits) {
          +
          + // set the next "current processing time".
          + long nextTimestamp = timeServiceProvider.getCurrentProcessingTime() + watermarkInterval;
          + timeServiceProvider.setCurrentTime(nextTimestamp);
          +
          + // send the next split to be read and wait until it is fully read.
          + tester.processElement(new StreamRecord<>(split));
          + synchronized (tester.getCheckpointLock()) {
          + while (tester.getOutput().isEmpty() || tester.getOutput().size() != (LINES_PER_FILE + 1)) {
          + tester.getCheckpointLock().wait(10);
          + }

          + }
          +
          + // verify that the results are the expected
          + for(Object line: tester.getOutput()) {
          + if (line instanceof StreamRecord) {
          + StreamRecord<String> element = (StreamRecord<String>) line;
          + lineCounter++;
          +
          + Assert.assertEquals(nextTimestamp, element.getTimestamp());
          +
          + int fileIdx = Character.getNumericValue(element.getValue().charAt(0));
          + List<String> content = actualFileContents.get(fileIdx);
          + if (content == null) {
          + content = new ArrayList<>();
          + actualFileContents.put(fileIdx, content);
          + }

          + content.add(element.getValue() + "\n");
          + } else if (line instanceof Watermark) {
          + long watermark = ((Watermark) line).getTimestamp();
          +
          + Assert.assertEquals(nextTimestamp - (nextTimestamp % watermarkInterval), watermark);
          + Assert.assertTrue(watermark > lastSeenWatermark);
          + watermarkCounter++;
          +
          + lastSeenWatermark = watermark;
          + } else {
          + Assert.fail("Unknown element in the list.");
          + }

          + }
          +
          + // clean the output to be ready for the next split
          + tester.getOutput().clear();
          + }
          +
          + // now we are processing one split after the other,
          + // so all the elements must be here by now.
          + Assert.assertEquals(NO_OF_FILES * LINES_PER_FILE, lineCounter);
          +
          + // because we expect one watermark per split.
          + Assert.assertEquals(NO_OF_FILES, watermarkCounter);
          +
          + // then close the reader gracefully so that the Long.MAX watermark is emitted
          + synchronized (tester.getCheckpointLock()) {
          + tester.close();
          + }

          +
          + for(org.apache.hadoop.fs.Path file: filesCreated) {
          + hdfs.delete(file, false);
          + }

          +
          + // check if the last element is the LongMax watermark (by now this must be the only element)
          + Assert.assertEquals(1, tester.getOutput().size());
          + Assert.assertTrue(tester.getOutput().peek() instanceof Watermark);
          + Assert.assertEquals(Long.MAX_VALUE, ((Watermark) tester.getOutput().poll()).getTimestamp());
          +
          + // check if the elements are the expected ones.
          + Assert.assertEquals(expectedFileContents.size(), actualFileContents.size());
          + for (Integer fileIdx: expectedFileContents.keySet()) {
          + Assert.assertTrue("file" + fileIdx + " not found", actualFileContents.keySet().contains(fileIdx));
          +
          + List<String> cntnt = actualFileContents.get(fileIdx);
          + Collections.sort(cntnt, new Comparator<String>() {
          — End diff –

          Sorting here wouldn't be necessary if you immediately compared the output of the split after reading it.
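          A sketch of that per-split check (illustrative only; `expectedLinesFor` is a hypothetical helper returning the expected lines of the file backing the split, and it assumes a single split's records are emitted in file order):

          ```java
          // Inside the per-split loop: collect this split's records and compare them directly,
          // with no global map, no file indexes, and no sorting.
          List<String> actualLines = new ArrayList<>();
          for (Object out : tester.getOutput()) {
              if (out instanceof StreamRecord) {
                  actualLines.add(((StreamRecord<String>) out).getValue());
              }
          }
          Assert.assertEquals(expectedLinesFor(split), actualLines);
          tester.getOutput().clear();
          ```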

          Hide
          githubbot ASF GitHub Bot added a comment -

          Github user mxm commented on a diff in the pull request:

          https://github.com/apache/flink/pull/2593#discussion_r81944138

          — Diff: flink-fs-tests/src/test/java/org/apache/flink/hdfstests/ContinuousFileMonitoringTest.java —
          @@ -106,6 +107,155 @@ public static void destroyHDFS() {
          // TESTS

          @Test
          + public void testFileReadingOperatorWithIngestionTime() throws Exception {
          + Set<org.apache.hadoop.fs.Path> filesCreated = new HashSet<>();
          + Map<Integer, String> expectedFileContents = new HashMap<>();
          +
          + for(int i = 0; i < NO_OF_FILES; i++)

          { + Tuple2<org.apache.hadoop.fs.Path, String> file = fillWithData(hdfsURI, "file", i, "This is test line."); + filesCreated.add(file.f0); + expectedFileContents.put(i, file.f1); + }

          +
          + TextInputFormat format = new TextInputFormat(new Path(hdfsURI));
          + TypeInformation<String> typeInfo = TypeExtractor.getInputFormatTypes(format);
          +
          + final long watermarkInterval = 10;
          + ExecutionConfig executionConfig = new ExecutionConfig();
          + executionConfig.setAutoWatermarkInterval(watermarkInterval);
          +
          + ContinuousFileReaderOperator<String, ?> reader = new ContinuousFileReaderOperator<>(format);
          + reader.setOutputType(typeInfo, executionConfig);
          +
          + final TestTimeServiceProvider timeServiceProvider = new TestTimeServiceProvider();
          + final OneInputStreamOperatorTestHarness<FileInputSplit, String> tester =
          + new OneInputStreamOperatorTestHarness<>(reader, executionConfig, timeServiceProvider);
          + tester.setTimeCharacteristic(TimeCharacteristic.IngestionTime);
          + tester.open();
          +
          + Assert.assertEquals(TimeCharacteristic.IngestionTime, tester.getTimeCharacteristic());
          +
          + // test that watermarks are correctly emitted
          +
          + timeServiceProvider.setCurrentTime(201);
          + timeServiceProvider.setCurrentTime(301);
          + timeServiceProvider.setCurrentTime(401);
          + timeServiceProvider.setCurrentTime(501);
          +
          + int i = 0;
          + for(Object line: tester.getOutput()) {
          + if (!(line instanceof Watermark))

          { + Assert.fail("Only watermarks are expected here "); + }

          + Watermark w = (Watermark) line;
          + Assert.assertEquals(200 + (i * 100), w.getTimestamp());
          + i++;
          + }
          +
          + // clear the output to get the elements only and the final watermark
          + tester.getOutput().clear();
          + Assert.assertEquals(0, tester.getOutput().size());
          +
          + // create the necessary splits for the test
          + FileInputSplit[] splits = format.createInputSplits(
          + reader.getRuntimeContext().getNumberOfParallelSubtasks());
          +
          + // and feed them to the operator
          + Map<Integer, List<String>> actualFileContents = new HashMap<>();
          +
          + long lastSeenWatermark = Long.MIN_VALUE;
          + int lineCounter = 0; // counter for the lines read from the splits
          + int watermarkCounter = 0;
          +
          + for(FileInputSplit split: splits) {
          +
          + // set the next "current processing time".
          + long nextTimestamp = timeServiceProvider.getCurrentProcessingTime() + watermarkInterval;
          + timeServiceProvider.setCurrentTime(nextTimestamp);
          +
          + // send the next split to be read and wait until it is fully read.
          + tester.processElement(new StreamRecord<>(split));
          + synchronized (tester.getCheckpointLock()) {
          + while (tester.getOutput().isEmpty() || tester.getOutput().size() != (LINES_PER_FILE + 1))

          { + tester.getCheckpointLock().wait(10); + }

          + }
          +
          + // verify that the results are the expected
          + for(Object line: tester.getOutput()) {
          — End diff –

          Generally, I find

          ```java
          for (Object line : tester.getOutput()) {
          ```

          more readable.

          Show
          githubbot ASF GitHub Bot added a comment - Github user mxm commented on a diff in the pull request: https://github.com/apache/flink/pull/2593#discussion_r81944138 — Diff: flink-fs-tests/src/test/java/org/apache/flink/hdfstests/ContinuousFileMonitoringTest.java — @@ -106,6 +107,155 @@ public static void destroyHDFS() { // TESTS @Test + public void testFileReadingOperatorWithIngestionTime() throws Exception { + Set<org.apache.hadoop.fs.Path> filesCreated = new HashSet<>(); + Map<Integer, String> expectedFileContents = new HashMap<>(); + + for(int i = 0; i < NO_OF_FILES; i++) { + Tuple2<org.apache.hadoop.fs.Path, String> file = fillWithData(hdfsURI, "file", i, "This is test line."); + filesCreated.add(file.f0); + expectedFileContents.put(i, file.f1); + } + + TextInputFormat format = new TextInputFormat(new Path(hdfsURI)); + TypeInformation<String> typeInfo = TypeExtractor.getInputFormatTypes(format); + + final long watermarkInterval = 10; + ExecutionConfig executionConfig = new ExecutionConfig(); + executionConfig.setAutoWatermarkInterval(watermarkInterval); + + ContinuousFileReaderOperator<String, ?> reader = new ContinuousFileReaderOperator<>(format); + reader.setOutputType(typeInfo, executionConfig); + + final TestTimeServiceProvider timeServiceProvider = new TestTimeServiceProvider(); + final OneInputStreamOperatorTestHarness<FileInputSplit, String> tester = + new OneInputStreamOperatorTestHarness<>(reader, executionConfig, timeServiceProvider); + tester.setTimeCharacteristic(TimeCharacteristic.IngestionTime); + tester.open(); + + Assert.assertEquals(TimeCharacteristic.IngestionTime, tester.getTimeCharacteristic()); + + // test that watermarks are correctly emitted + + timeServiceProvider.setCurrentTime(201); + timeServiceProvider.setCurrentTime(301); + timeServiceProvider.setCurrentTime(401); + timeServiceProvider.setCurrentTime(501); + + int i = 0; + for(Object line: tester.getOutput()) { + if (!(line instanceof Watermark)) { + Assert.fail("Only watermarks are expected here "); + } + Watermark w = (Watermark) line; + Assert.assertEquals(200 + (i * 100), w.getTimestamp()); + i++; + } + + // clear the output to get the elements only and the final watermark + tester.getOutput().clear(); + Assert.assertEquals(0, tester.getOutput().size()); + + // create the necessary splits for the test + FileInputSplit[] splits = format.createInputSplits( + reader.getRuntimeContext().getNumberOfParallelSubtasks()); + + // and feed them to the operator + Map<Integer, List<String>> actualFileContents = new HashMap<>(); + + long lastSeenWatermark = Long.MIN_VALUE; + int lineCounter = 0; // counter for the lines read from the splits + int watermarkCounter = 0; + + for(FileInputSplit split: splits) { + + // set the next "current processing time". + long nextTimestamp = timeServiceProvider.getCurrentProcessingTime() + watermarkInterval; + timeServiceProvider.setCurrentTime(nextTimestamp); + + // send the next split to be read and wait until it is fully read. + tester.processElement(new StreamRecord<>(split)); + synchronized (tester.getCheckpointLock()) { + while (tester.getOutput().isEmpty() || tester.getOutput().size() != (LINES_PER_FILE + 1)) { + tester.getCheckpointLock().wait(10); + } + } + + // verify that the results are the expected + for(Object line: tester.getOutput()) { — End diff – Generally, I find ```java for (Object line : tester.getOutput()) { ``` more readable.
          Hide
          githubbot ASF GitHub Bot added a comment -

          Github user mxm commented on a diff in the pull request:

          https://github.com/apache/flink/pull/2593#discussion_r81945670

          — Diff: flink-fs-tests/src/test/java/org/apache/flink/hdfstests/ContinuousFileMonitoringTest.java —
          @@ -106,6 +107,155 @@ public static void destroyHDFS() {
          // TESTS

          @Test
          + public void testFileReadingOperatorWithIngestionTime() throws Exception {
          + Set<org.apache.hadoop.fs.Path> filesCreated = new HashSet<>();
          + Map<Integer, String> expectedFileContents = new HashMap<>();
          +
          + for(int i = 0; i < NO_OF_FILES; i++) {
          + Tuple2<org.apache.hadoop.fs.Path, String> file = fillWithData(hdfsURI, "file", i, "This is test line.");
          + filesCreated.add(file.f0);
          + expectedFileContents.put(i, file.f1);
          + }

          +
          + TextInputFormat format = new TextInputFormat(new Path(hdfsURI));
          + TypeInformation<String> typeInfo = TypeExtractor.getInputFormatTypes(format);
          +
          + final long watermarkInterval = 10;
          + ExecutionConfig executionConfig = new ExecutionConfig();
          + executionConfig.setAutoWatermarkInterval(watermarkInterval);
          +
          + ContinuousFileReaderOperator<String, ?> reader = new ContinuousFileReaderOperator<>(format);
          + reader.setOutputType(typeInfo, executionConfig);
          +
          + final TestTimeServiceProvider timeServiceProvider = new TestTimeServiceProvider();
          + final OneInputStreamOperatorTestHarness<FileInputSplit, String> tester =
          + new OneInputStreamOperatorTestHarness<>(reader, executionConfig, timeServiceProvider);
          + tester.setTimeCharacteristic(TimeCharacteristic.IngestionTime);
          + tester.open();
          +
          + Assert.assertEquals(TimeCharacteristic.IngestionTime, tester.getTimeCharacteristic());
          +
          + // test that watermarks are correctly emitted
          +
          + timeServiceProvider.setCurrentTime(201);
          + timeServiceProvider.setCurrentTime(301);
          + timeServiceProvider.setCurrentTime(401);
          + timeServiceProvider.setCurrentTime(501);
          +
          + int i = 0;
          + for(Object line: tester.getOutput()) {
          + if (!(line instanceof Watermark)) {
          + Assert.fail("Only watermarks are expected here ");
          + }

          + Watermark w = (Watermark) line;
          + Assert.assertEquals(200 + (i * 100), w.getTimestamp());
          + i++;
          — End diff –

          I think this can be more readable if you break it up into:

          ```java
          timeServiceProvider.setCurrentTime(201);
          Assert.assertEquals(200, ((Watermark) tester.getOutput().poll()).getTimestamp());
          // ....
          ```
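
          Spelled out fully, the suggested rewrite might look like the sketch below. This is only an illustration: it assumes, as in the snippet above, that the harness output is a queue exposing `poll()` and that each `setCurrentTime()` call produces exactly one watermark.

          ```java
          // Hypothetical expansion of the suggestion: assert each watermark
          // immediately after advancing the test clock, instead of collecting
          // them all and looping over the output afterwards.
          timeServiceProvider.setCurrentTime(201);
          Assert.assertEquals(200, ((Watermark) tester.getOutput().poll()).getTimestamp());

          timeServiceProvider.setCurrentTime(301);
          Assert.assertEquals(300, ((Watermark) tester.getOutput().poll()).getTimestamp());

          timeServiceProvider.setCurrentTime(401);
          Assert.assertEquals(400, ((Watermark) tester.getOutput().poll()).getTimestamp());

          timeServiceProvider.setCurrentTime(501);
          Assert.assertEquals(500, ((Watermark) tester.getOutput().poll()).getTimestamp());
          ```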

          githubbot ASF GitHub Bot added a comment -

          Github user mxm commented on a diff in the pull request:

          https://github.com/apache/flink/pull/2593#discussion_r81947893

          — Diff: flink-fs-tests/src/test/java/org/apache/flink/hdfstests/ContinuousFileMonitoringTest.java —
          @@ -106,6 +107,155 @@ public static void destroyHDFS() {
          // TESTS

          @Test
          + public void testFileReadingOperatorWithIngestionTime() throws Exception {
          + Set<org.apache.hadoop.fs.Path> filesCreated = new HashSet<>();
          + Map<Integer, String> expectedFileContents = new HashMap<>();
          +
          + for(int i = 0; i < NO_OF_FILES; i++) {
          + Tuple2<org.apache.hadoop.fs.Path, String> file = fillWithData(hdfsURI, "file", i, "This is test line.");
          + filesCreated.add(file.f0);
          + expectedFileContents.put(i, file.f1);
          + }

          +
          + TextInputFormat format = new TextInputFormat(new Path(hdfsURI));
          + TypeInformation<String> typeInfo = TypeExtractor.getInputFormatTypes(format);
          +
          + final long watermarkInterval = 10;
          + ExecutionConfig executionConfig = new ExecutionConfig();
          + executionConfig.setAutoWatermarkInterval(watermarkInterval);
          +
          + ContinuousFileReaderOperator<String, ?> reader = new ContinuousFileReaderOperator<>(format);
          + reader.setOutputType(typeInfo, executionConfig);
          +
          + final TestTimeServiceProvider timeServiceProvider = new TestTimeServiceProvider();
          + final OneInputStreamOperatorTestHarness<FileInputSplit, String> tester =
          + new OneInputStreamOperatorTestHarness<>(reader, executionConfig, timeServiceProvider);
          + tester.setTimeCharacteristic(TimeCharacteristic.IngestionTime);
          + tester.open();
          +
          + Assert.assertEquals(TimeCharacteristic.IngestionTime, tester.getTimeCharacteristic());
          +
          + // test that watermarks are correctly emitted
          +
          + timeServiceProvider.setCurrentTime(201);
          + timeServiceProvider.setCurrentTime(301);
          + timeServiceProvider.setCurrentTime(401);
          + timeServiceProvider.setCurrentTime(501);
          +
          + int i = 0;
          + for(Object line: tester.getOutput()) {
          + if (!(line instanceof Watermark)) {
          + Assert.fail("Only watermarks are expected here ");
          + }

          + Watermark w = (Watermark) line;
          + Assert.assertEquals(200 + (i * 100), w.getTimestamp());
          + i++;
          + }
          +
          + // clear the output to get the elements only and the final watermark
          + tester.getOutput().clear();
          + Assert.assertEquals(0, tester.getOutput().size());
          +
          + // create the necessary splits for the test
          + FileInputSplit[] splits = format.createInputSplits(
          + reader.getRuntimeContext().getNumberOfParallelSubtasks());
          +
          + // and feed them to the operator
          + Map<Integer, List<String>> actualFileContents = new HashMap<>();
          +
          + long lastSeenWatermark = Long.MIN_VALUE;
          + int lineCounter = 0; // counter for the lines read from the splits
          + int watermarkCounter = 0;
          +
          + for(FileInputSplit split: splits) {
          +
          + // set the next "current processing time".
          + long nextTimestamp = timeServiceProvider.getCurrentProcessingTime() + watermarkInterval;
          + timeServiceProvider.setCurrentTime(nextTimestamp);
          +
          + // send the next split to be read and wait until it is fully read.
          + tester.processElement(new StreamRecord<>(split));
          + synchronized (tester.getCheckpointLock()) {
          + while (tester.getOutput().isEmpty() || tester.getOutput().size() != (LINES_PER_FILE + 1)) {
          + tester.getCheckpointLock().wait(10);
          + }

          + }
          +
          + // verify that the results are the expected
          + for(Object line: tester.getOutput()) {
          + if (line instanceof StreamRecord) {
          + StreamRecord<String> element = (StreamRecord<String>) line;
          + lineCounter++;
          +
          + Assert.assertEquals(nextTimestamp, element.getTimestamp());
          +
          + int fileIdx = Character.getNumericValue(element.getValue().charAt(0));
          + List<String> content = actualFileContents.get(fileIdx);
          + if (content == null) {
          + content = new ArrayList<>();
          + actualFileContents.put(fileIdx, content);
          + }

          + content.add(element.getValue() + "\n");
          + } else if (line instanceof Watermark) {
          + long watermark = ((Watermark) line).getTimestamp();
          +
          + Assert.assertEquals(nextTimestamp - (nextTimestamp % watermarkInterval), watermark);
          + Assert.assertTrue(watermark > lastSeenWatermark);
          + watermarkCounter++;
          +
          + lastSeenWatermark = watermark;
          + } else {
          + Assert.fail("Unknown element in the list.");
          + }

          + }
          +
          + // clean the output to be ready for the next split
          + tester.getOutput().clear();
          + }
          +
          + // now we are processing one split after the other,
          + // so all the elements must be here by now.
          + Assert.assertEquals(NO_OF_FILES * LINES_PER_FILE, lineCounter);
          +
          + // because we expect one watermark per split.
          + Assert.assertEquals(NO_OF_FILES, watermarkCounter);
          +
          + // then close the reader gracefully so that the Long.MAX watermark is emitted
          + synchronized (tester.getCheckpointLock()) {
          + tester.close();
          — End diff –

          Never mind. Actually, `close()` expects us to hold the lock.
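
          For reference, the shutdown step under discussion, as it already appears in the test above:

          ```java
          // close() expects the caller to hold the checkpoint lock, so the
          // reader is closed inside a synchronized block; a graceful close is
          // also what makes the operator emit the final Long.MAX_VALUE watermark.
          synchronized (tester.getCheckpointLock()) {
              tester.close();
          }
          ```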

          githubbot ASF GitHub Bot added a comment -

          Github user kl0u commented on the issue:

          https://github.com/apache/flink/pull/2593

          Thanks @mxm and @StephanEwen for the comments. I have updated the PR.

          githubbot ASF GitHub Bot added a comment -

          Github user StephanEwen commented on a diff in the pull request:

          https://github.com/apache/flink/pull/2593#discussion_r81990874

          — Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamSource.java —
          @@ -214,14 +216,26 @@ public AutomaticWatermarkContext(
          this.watermarkInterval = watermarkInterval;
          this.reuse = new StreamRecord<T>(null);

          + // if it is a source, then we cast and cache it
          + // here so that we do not have to do it in every collect(),
          + // collectWithTimestamp() and emitWatermark()
          +
          + if (!(owner instanceof AsyncExceptionChecker)) {
          + throw new IllegalStateException("The ManualWatermarkContext can only be used " +
          + "with sources that implement the AsyncExceptionChecker interface.");
          + }

          + this.source = (AsyncExceptionChecker) owner;
          +
          long now = owner.getCurrentProcessingTime();
          this.watermarkTimer = owner.registerTimer(now + watermarkInterval,
          new WatermarkEmittingTask(owner, lockingObjectParam, outputParam));
          }

          @Override
          public void collect(T element) {

          - owner.checkAsyncException();
          + if (source != null) {
          — End diff –

          source can never be null?

          githubbot ASF GitHub Bot added a comment -

          Github user StephanEwen commented on a diff in the pull request:

          https://github.com/apache/flink/pull/2593#discussion_r81990910

          — Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamSource.java —
          @@ -250,7 +264,9 @@ public void collectWithTimestamp(T element, long timestamp) {

          @Override
          public void emitWatermark(Watermark mark) {

          - owner.checkAsyncException();
          + if (source != null) {
          — End diff –

          same here

          githubbot ASF GitHub Bot added a comment -

          Github user kl0u commented on a diff in the pull request:

          https://github.com/apache/flink/pull/2593#discussion_r81995575

          — Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamSource.java —
          @@ -214,14 +216,26 @@ public AutomaticWatermarkContext(
          this.watermarkInterval = watermarkInterval;
          this.reuse = new StreamRecord<T>(null);

          + // if it is a source, then we cast and cache it
          + // here so that we do not have to do it in every collect(),
          + // collectWithTimestamp() and emitWatermark()
          +
          + if (!(owner instanceof AsyncExceptionChecker)) {
          + throw new IllegalStateException("The ManualWatermarkContext can only be used " +
          + "with sources that implement the AsyncExceptionChecker interface.");
          + }

          + this.source = (AsyncExceptionChecker) owner;
          +
          long now = owner.getCurrentProcessingTime();
          this.watermarkTimer = owner.registerTimer(now + watermarkInterval,
          new WatermarkEmittingTask(owner, lockingObjectParam, outputParam));
          }

          @Override
          public void collect(T element) {

          - owner.checkAsyncException();
          + if (source != null) {
          — End diff –

          You are right. I fixed it!
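
          The fixed code itself is not quoted in this thread, but presumably it just uses the cached reference directly. A minimal standalone sketch of the pattern, with hypothetical names rather than the actual Flink classes:

          ```java
          // Cast once in the constructor and fail fast; afterwards the cached
          // reference can never be null, so no per-call null check is needed.
          interface AsyncExceptionChecker {
              void checkAsyncException();
          }

          final class WatermarkContextSketch {
              private final AsyncExceptionChecker source;

              WatermarkContextSketch(Object owner) {
                  if (!(owner instanceof AsyncExceptionChecker)) {
                      throw new IllegalStateException(
                              "owner must implement AsyncExceptionChecker");
                  }
                  this.source = (AsyncExceptionChecker) owner;
              }

              void collect(Object element) {
                  source.checkAsyncException(); // guaranteed non-null here
                  // ... emit the element as before ...
              }
          }
          ```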

          githubbot ASF GitHub Bot added a comment -

          Github user mxm commented on a diff in the pull request:

          https://github.com/apache/flink/pull/2593#discussion_r82004204

          — Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamSource.java —
          @@ -214,14 +216,24 @@ public AutomaticWatermarkContext(
          this.watermarkInterval = watermarkInterval;
          this.reuse = new StreamRecord<T>(null);

          + // if it is a source, then we cast and cache it
          + // here so that we do not have to do it in every collect(),
          + // collectWithTimestamp() and emitWatermark()
          +
          + if (!(owner instanceof AsyncExceptionChecker)) {
          + throw new IllegalStateException("The ManualWatermarkContext can only be used " +
          — End diff –

          AutomaticWatermarkContext?
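
          Presumably the fix is just correcting the class name in the message, along the lines of:

          ```java
          // Sketch: the check lives in AutomaticWatermarkContext, so the
          // exception message should name that class, not ManualWatermarkContext.
          throw new IllegalStateException("The AutomaticWatermarkContext can only be used " +
                  "with sources that implement the AsyncExceptionChecker interface.");
          ```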

          githubbot ASF GitHub Bot added a comment -

          Github user mxm commented on the issue:

          https://github.com/apache/flink/pull/2593

          Thanks! Merged. Could you close the PR?

          githubbot ASF GitHub Bot added a comment -

          Github user kl0u closed the pull request at:

          https://github.com/apache/flink/pull/2593

          githubbot ASF GitHub Bot added a comment -

          Github user asfgit closed the pull request at:

          https://github.com/apache/flink/pull/2546

          StephanEwen Stephan Ewen added a comment -

          Fixed in

          • 1.2.0 via 8ff451bec58e9f5800eb77c74c1d7457b776cc94
          • 1.1.3 via bab59dfa7cf94f4c392c3205ee180e72e1ad7814

            People

            • Assignee:
              kkl0u Kostas Kloudas
            • Reporter:
              aljoscha Aljoscha Krettek
            • Votes:
              1
            • Watchers:
              6
