  Flink / FLINK-6996

FlinkKafkaProducer010 doesn't guarantee at-least-once semantic

    Details

    • Type: Bug
    • Status: Closed
    • Priority: Blocker
    • Resolution: Fixed
    • Affects Version/s: 1.2.0, 1.3.0, 1.2.1, 1.3.1
    • Fix Version/s: 1.4.0, 1.3.2
    • Component/s: Kafka Connector
    • Labels: None

      Description

      FlinkKafkaProducer010 doesn't implement the CheckpointedFunction interface. This means that when it is used as a "regular sink function" (option (a) from the Javadoc), it will not flush pending data on "snapshotState" as it is supposed to.
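
      For context, a sink gets its at-least-once guarantee by blocking in snapshotState until every pending Kafka record has been acknowledged. A minimal sketch of the missing piece is shown below; the class and helper names are illustrative only, not the actual FlinkKafkaProducer010 code:

      import org.apache.flink.runtime.state.FunctionInitializationContext;
      import org.apache.flink.runtime.state.FunctionSnapshotContext;
      import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
      import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
      import org.apache.kafka.clients.producer.KafkaProducer;
      import org.apache.kafka.clients.producer.ProducerRecord;

      // Sketch only: a sink that flushes buffered records whenever a checkpoint is taken.
      public class FlushingKafkaSink<T> extends RichSinkFunction<T> implements CheckpointedFunction {

          private transient KafkaProducer<byte[], byte[]> producer;

          @Override
          public void invoke(T value) throws Exception {
              // the Kafka producer is asynchronous; the record may sit in its internal buffer
              producer.send(toRecord(value));
          }

          @Override
          public void snapshotState(FunctionSnapshotContext context) throws Exception {
              // block until every buffered record is acknowledged; otherwise records still
              // buffered when the checkpoint completes can be lost on a failure
              producer.flush();
          }

          @Override
          public void initializeState(FunctionInitializationContext context) throws Exception {
              // nothing to restore for at-least-once; flushing on snapshot is sufficient
          }

          private ProducerRecord<byte[], byte[]> toRecord(T value) {
              // serialization omitted in this sketch
              throw new UnsupportedOperationException("sketch");
          }
      }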

        Issue Links

          Activity

          pnowojski Piotr Nowojski added a comment -

          Those test failures are most likely not caused by an actual bug in Flink but by intermittent tests. I have created a separate issue for fixing those tests: https://issues.apache.org/jira/browse/FLINK-7343. Closing this issue.

          githubbot ASF GitHub Bot added a comment -

          Github user greghogan commented on the issue:

          https://github.com/apache/flink/pull/4456

          +1 this matches the parent `pom.xml`. Wondering if the same change would also fix `flink-hbase`, which always fails for me when running `mvn verify`.

          githubbot ASF GitHub Bot added a comment -

          GitHub user pnowojski opened a pull request:

          https://github.com/apache/flink/pull/4456

          FLINK-6996 [kafka] Increase Xmx for tests

          As reported by @NicoK, sometimes 1000m was not enough memory to run the at-least-once tests with broker failures on Travis. I remember having the same issue in #4239, where I set this same value to `2048`. Hopefully this will solve the problems.

          You can merge this pull request into a Git repository by running:

          $ git pull https://github.com/pnowojski/flink kafka010

          Alternatively you can review and apply these changes as the patch at:

          https://github.com/apache/flink/pull/4456.patch

          To close this pull request, make a commit to your master/trunk branch
          with (at least) the following in the commit message:

          This closes #4456


          commit 8c211b8fd975af441c5a762ee45a62a7dd44f173
          Author: Piotr Nowojski <piotr.nowojski@gmail.com>
          Date: 2017-08-01T13:02:56Z

          [hotfix][docs] Add section in docs about writing unit and integration tests

          commit dd7060497454c2450be3f33a4cf7bdf8cc854f14
          Author: Piotr Nowojski <piotr.nowojski@gmail.com>
          Date: 2017-08-01T14:05:49Z

          FLINK-6996 [kafka] Increase Xmx for tests

          Sometimes 1000m was not enough memory to run at-least-once tests with broker failures on Travis


          NicoK Nico Kruber added a comment -

          I got another incarnation (seen only once) with a different failure. The only change in there is switching from HeapMemorySegment to HybridMemorySegment, but since the memory type was not changed (still on-heap by default), this should not be related.

          09:02:49,616 ERROR org.apache.flink.streaming.connectors.kafka.Kafka010ProducerITCase  - 
          --------------------------------------------------------------------------------
          Test testOneToOneAtLeastOnceCustomOperator(org.apache.flink.streaming.connectors.kafka.Kafka010ProducerITCase) failed with:
          java.lang.OutOfMemoryError: Java heap space
          	at java.nio.HeapByteBuffer.<init>(HeapByteBuffer.java:57)
          	at java.nio.ByteBuffer.allocate(ByteBuffer.java:335)
          	at kafka.log.SkimpyOffsetMap.<init>(OffsetMap.scala:44)
          	at kafka.log.LogCleaner$CleanerThread.<init>(LogCleaner.scala:198)
          	at kafka.log.LogCleaner$$anonfun$2.apply(LogCleaner.scala:89)
          	at kafka.log.LogCleaner$$anonfun$2.apply(LogCleaner.scala:89)
          	at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
          	at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
          	at scala.collection.immutable.Range.foreach(Range.scala:160)
          	at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
          	at scala.collection.AbstractTraversable.map(Traversable.scala:104)
          	at kafka.log.LogCleaner.<init>(LogCleaner.scala:89)
          	at kafka.log.LogManager.<init>(LogManager.scala:72)
          	at kafka.server.KafkaServer.createLogManager(KafkaServer.scala:648)
          	at kafka.server.KafkaServer.startup(KafkaServer.scala:208)
          	at org.apache.flink.streaming.connectors.kafka.KafkaTestEnvironmentImpl.getKafkaServer(KafkaTestEnvironmentImpl.java:433)
          	at org.apache.flink.streaming.connectors.kafka.KafkaTestEnvironmentImpl.restartBroker(KafkaTestEnvironmentImpl.java:181)
          	at org.apache.flink.streaming.connectors.kafka.KafkaProducerTestBase.testOneToOneAtLeastOnce(KafkaProducerTestBase.java:282)
          	at org.apache.flink.streaming.connectors.kafka.KafkaProducerTestBase.testOneToOneAtLeastOnceCustomOperator(KafkaProducerTestBase.java:212)
          	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
          	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
          	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
          	at java.lang.reflect.Method.invoke(Method.java:498)
          	at org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:50)
          	at org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
          	at org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:47)
          	at org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
          	at org.junit.rules.TestWatcher$1.evaluate(TestWatcher.java:55)
          	at org.junit.rules.RunRules.evaluate(RunRules.java:20)
          	at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:325)
          	at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:78)
          	at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:57)
          

          https://transfer.sh/H7pW5/369.3.tar.gz

          till.rohrmann Till Rohrmann added a comment -

          True. I hope this link works now: https://travis-ci.org/tillrohrmann/flink/jobs/258538641
          aljoscha Aljoscha Krettek added a comment -

          Till Rohrmann The log is not accessible (at least for me).

          till.rohrmann Till Rohrmann added a comment -

          There seems to be a test instability with Kafka010ProducerITCase>KafkaProducerTestBase.testOneToOneAtLeastOnceRegularSink.

          https://s3.amazonaws.com/archive.travis-ci.org/jobs/258538641/log.txt

          githubbot ASF GitHub Bot added a comment -

          Github user pnowojski commented on the issue:

          https://github.com/apache/flink/pull/4206

          thanks!

          tzulitai Tzu-Li (Gordon) Tai added a comment -

          Fixed for master via 0e7a53189991f7615891e9a168c747f43d1b13c3.
          Fixed for 1.3 via 6630dfdd748dee9c2fa6a0993497dcf3468a0948.

          githubbot ASF GitHub Bot added a comment -

          Github user asfgit closed the pull request at:

          https://github.com/apache/flink/pull/4206

          githubbot ASF GitHub Bot added a comment -

          Github user tzulitai commented on the issue:

          https://github.com/apache/flink/pull/4206

          LGTM, merging 👍

          githubbot ASF GitHub Bot added a comment -

          Github user pnowojski commented on a diff in the pull request:

          https://github.com/apache/flink/pull/4206#discussion_r125416757

          — Diff: flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaProducerTestBase.java —
          @@ -172,6 +195,144 @@ public void cancel() {
          }
          }

          + /**
          + * Tests the at-least-once semantic for the simple writes into Kafka.
          + */
          + @Test
          + public void testOneToOneAtLeastOnceRegularSink() throws Exception {
          +     testOneToOneAtLeastOnce(true);
          + }

          +
          + /**
          + * Tests the at-least-once semantic for the simple writes into Kafka.
          + */
          + @Test
          + public void testOneToOneAtLeastOnceCustomOperator() throws Exception {
          +     testOneToOneAtLeastOnce(false);
          + }

          +
          + /**
          + * This test sets KafkaProducer so that it will not automatically flush the data and
          + * and fails the broker to check whether FlinkKafkaProducer flushed records manually on snapshotState.
          + */
          + protected void testOneToOneAtLeastOnce(boolean regularSink) throws Exception {
          + final String topic = regularSink ? "oneToOneTopicRegularSink" : "oneToOneTopicCustomOperator";
          + final int partition = 0;
          + final int numElements = 1000;
          + final int failAfterElements = 333;
          +
          + createTestTopic(topic, 1, 1);
          +
          + TypeInformationSerializationSchema<Integer> schema = new TypeInformationSerializationSchema<>(BasicTypeInfo.INT_TYPE_INFO, new ExecutionConfig());
          + KeyedSerializationSchema<Integer> keyedSerializationSchema = new KeyedSerializationSchemaWrapper(schema);
          +
          + StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
          + env.enableCheckpointing(500);
          + env.setParallelism(1);
          + env.setRestartStrategy(RestartStrategies.noRestart());
          + env.getConfig().disableSysoutLogging();
          +
          + Properties properties = new Properties();
          + properties.putAll(standardProps);
          + properties.putAll(secureProps);
          + // decrease timeout and block time from 60s down to 10s - this is how long KafkaProducer will try send pending (not flushed) data on close()
          + properties.setProperty("timeout.ms", "10000");
          + properties.setProperty("max.block.ms", "10000");
          + // increase batch.size and linger.ms - this tells KafkaProducer to batch produced events instead of flushing them immediately
          + properties.setProperty("batch.size", "10240000");
          + properties.setProperty("linger.ms", "10000");
          +
          + int leaderId = kafkaServer.getLeaderToShutDown(topic);
          + BrokerRestartingMapper.resetState();
          +
          + // process exactly failAfterElements number of elements and then shutdown Kafka broker and fail application
          + DataStream<Integer> inputStream = env
          + .fromCollection(getIntegersSequence(numElements))
          + .map(new BrokerRestartingMapper<Integer>(leaderId, failAfterElements));
          +
          + StreamSink<Integer> kafkaSink = kafkaServer.getProducerSink(topic, keyedSerializationSchema, properties, new FlinkKafkaPartitioner<Integer>() {
          + @Override
          + public int partition(Integer record, byte[] key, byte[] value, String targetTopic, int[] partitions) {
          +     return partition;
          + }

          + });
          +
          + if (regularSink) {
          +     inputStream.addSink(kafkaSink.getUserFunction());
          + }

          + else {
          + kafkaServer.produceIntoKafka(inputStream, topic, keyedSerializationSchema, properties, new FlinkKafkaPartitioner<Integer>() {
          + @Override
          + public int partition(Integer record, byte[] key, byte[] value, String targetTopic, int[] partitions) {
          +     return partition;
          + }

          + });
          + }
          +
          + FailingIdentityMapper.failedBefore = false;
          + try {
          +     env.execute("One-to-one at least once test");
          +     fail("Job should fail!");
          + }
          + catch (Exception ex) {
          — End diff –

          FYI the `getCause` exception is of type `java.lang.Exception`, so there is no point in making an assertion on it.
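
          To make the point concrete, the assertion being discussed would look roughly like the snippet below, continuing the quoted test (a sketch, not code from the PR):

          try {
              env.execute("One-to-one at least once test");
              fail("Job should fail!");
          }
          catch (Exception ex) {
              // The injected failure surfaces as a plain java.lang.Exception, so an assertion
              // such as the following is always true and therefore adds nothing:
              // assertTrue(ex.getCause() instanceof Exception);
          }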

          githubbot ASF GitHub Bot added a comment -

          Github user pnowojski commented on the issue:

          https://github.com/apache/flink/pull/4206

          @tzulitai applied changes.

          githubbot ASF GitHub Bot added a comment -

          Github user tzulitai commented on the issue:

          https://github.com/apache/flink/pull/4206

          Thanks for the fixups @pnowojski!
          I have some final minor comments, other than that this LGTM.

          githubbot ASF GitHub Bot added a comment -

          Github user tzulitai commented on a diff in the pull request:

          https://github.com/apache/flink/pull/4206#discussion_r125411974

          — Diff: flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironment.java —
          @@ -80,6 +82,12 @@ public void createTestTopic(String topic, int numberOfPartitions, int replicatio

          public abstract <T> FlinkKafkaConsumerBase<T> getConsumer(List<String> topics, KeyedDeserializationSchema<T> readSchema, Properties props);

          + public abstract <K, V> Collection<ConsumerRecord<K, V>> getAllRecordsFromTopic(
          + Properties properties,
          + String topic,
          + int partition,
          + long timeout);
          — End diff –

          nit: the indentation pattern is inconsistent with the other abstract method declarations here.

          githubbot ASF GitHub Bot added a comment -

          Github user tzulitai commented on a diff in the pull request:

          https://github.com/apache/flink/pull/4206#discussion_r125411707

          — Diff: flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaProducerTestBase.java —
          @@ -172,6 +194,118 @@ public void cancel() {
          }
          }

          + /**
          + * Tests the at-least-once semantic for the simple writes into Kafka.
          + */
          + @Test
          + public void testOneToOneAtLeastOnceRegularSink() throws Exception {
          +     testOneToOneAtLeastOnce(true);
          + }

          +
          + /**
          + * Tests the at-least-once semantic for the simple writes into Kafka.
          + */
          + @Test
          + public void testOneToOneAtLeastOnceCustomOperator() throws Exception {
          +     testOneToOneAtLeastOnce(false);
          + }

          +
          + /**
          + * This test sets KafkaProducer so that it will not automatically flush the data and
          + * and fails the broker to check whether FlinkKafkaProducer flushed records manually on snapshotState.
          + */
          + protected void testOneToOneAtLeastOnce(boolean regularSink) throws Exception {
          + final String topic = regularSink ? "oneToOneTopicRegularSink" : "oneToOneTopicCustomOperator";
          + final int partition = 0;
          + final int numElements = 1000;
          + final int failAfterElements = 333;
          +
          + createTestTopic(topic, 1, 1);
          +
          + TypeInformationSerializationSchema<Integer> schema = new TypeInformationSerializationSchema<>(BasicTypeInfo.INT_TYPE_INFO, new ExecutionConfig());
          + KeyedSerializationSchema<Integer> keyedSerializationSchema = new KeyedSerializationSchemaWrapper(schema);
          +
          + StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
          + env.enableCheckpointing(500);
          + env.setParallelism(1);
          + env.setRestartStrategy(RestartStrategies.noRestart());
          + env.getConfig().disableSysoutLogging();
          +
          + Properties properties = new Properties();
          + properties.putAll(standardProps);
          + properties.putAll(secureProps);
          + // decrease timeout and block time from 60s down to 10s - this is how long KafkaProducer will try send pending (not flushed) data on close()
          + properties.setProperty("timeout.ms", "10000");
          + properties.setProperty("max.block.ms", "10000");
          + // increase batch.size and linger.ms - this tells KafkaProducer to batch produced events instead of flushing them immediately
          + properties.setProperty("batch.size", "10240000");
          + properties.setProperty("linger.ms", "10000");
          +
          + int leaderId = kafkaServer.getLeaderToShutDown(topic);
          + BrokerRestartingMapper.resetState();
          +
          + // process exactly failAfterElements number of elements and then shutdown Kafka broker and fail application
          + DataStream<Integer> inputStream = env
          + .fromCollection(getIntegersSequence(numElements))
          + .map(new BrokerRestartingMapper<Integer>(leaderId, failAfterElements));
          +
          + StreamSink<Integer> kafkaSink = kafkaServer.getProducerSink(topic, keyedSerializationSchema, properties, new FlinkKafkaPartitioner<Integer>() {
          + @Override
          + public int partition(Integer record, byte[] key, byte[] value, String targetTopic, int[] partitions) {
          +     return partition;
          + }

          + });
          +
          + if (regularSink) {
          +     inputStream.addSink(kafkaSink.getUserFunction());
          + }

          + else {
          + kafkaServer.produceIntoKafka(inputStream, topic, keyedSerializationSchema, properties, new FlinkKafkaPartitioner<Integer>() {
          + @Override
          + public int partition(Integer record, byte[] key, byte[] value, String targetTopic, int[] partitions) {
          +     return partition;
          + }

          + });
          + }
          +
          + FailingIdentityMapper.failedBefore = false;
          — End diff –

          I see. Perhaps we can make this more explicit by following the same pattern as `BrokerRestartingMapper.resetState()`?
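
          For illustration, the suggested pattern could look roughly like the sketch below; this is not the actual test utility, and only the names `failedBefore` and `resetState` are taken from the discussion:

          import org.apache.flink.api.common.functions.RichMapFunction;

          // Sketch: give the mapper an explicit reset hook, mirroring BrokerRestartingMapper.resetState().
          public class FailingIdentityMapper<T> extends RichMapFunction<T, T> {

              // static, so the flag survives across test methods running in the same JVM
              public static volatile boolean failedBefore;

              public static void resetState() {
                  failedBefore = false;
              }

              @Override
              public T map(T value) throws Exception {
                  // failure-injection logic elided in this sketch
                  return value;
              }
          }

          The test would then call FailingIdentityMapper.resetState() next to BrokerRestartingMapper.resetState() instead of assigning the static field directly.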

          githubbot ASF GitHub Bot added a comment -

          Github user tzulitai commented on a diff in the pull request:

          https://github.com/apache/flink/pull/4206#discussion_r125411227

          — Diff: flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaProducerTestBase.java —
          @@ -172,6 +195,144 @@ public void cancel() {
          }
          }

          + /**
          + * Tests the at-least-once semantic for the simple writes into Kafka.
          + */
          + @Test
          + public void testOneToOneAtLeastOnceRegularSink() throws Exception {
          +     testOneToOneAtLeastOnce(true);
          + }

          +
          + /**
          + * Tests the at-least-once semantic for the simple writes into Kafka.
          + */
          + @Test
          + public void testOneToOneAtLeastOnceCustomOperator() throws Exception {
          +     testOneToOneAtLeastOnce(false);
          + }

          +
          + /**
          + * This test sets KafkaProducer so that it will not automatically flush the data and
          + * and fails the broker to check whether FlinkKafkaProducer flushed records manually on snapshotState.
          + */
          + protected void testOneToOneAtLeastOnce(boolean regularSink) throws Exception {
          + final String topic = regularSink ? "oneToOneTopicRegularSink" : "oneToOneTopicCustomOperator";
          + final int partition = 0;
          + final int numElements = 1000;
          + final int failAfterElements = 333;
          +
          + createTestTopic(topic, 1, 1);
          +
          + TypeInformationSerializationSchema<Integer> schema = new TypeInformationSerializationSchema<>(BasicTypeInfo.INT_TYPE_INFO, new ExecutionConfig());
          + KeyedSerializationSchema<Integer> keyedSerializationSchema = new KeyedSerializationSchemaWrapper(schema);
          +
          + StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
          + env.enableCheckpointing(500);
          + env.setParallelism(1);
          + env.setRestartStrategy(RestartStrategies.noRestart());
          + env.getConfig().disableSysoutLogging();
          +
          + Properties properties = new Properties();
          + properties.putAll(standardProps);
          + properties.putAll(secureProps);
          + // decrease timeout and block time from 60s down to 10s - this is how long KafkaProducer will try send pending (not flushed) data on close()
          + properties.setProperty("timeout.ms", "10000");
          + properties.setProperty("max.block.ms", "10000");
          + // increase batch.size and linger.ms - this tells KafkaProducer to batch produced events instead of flushing them immediately
          + properties.setProperty("batch.size", "10240000");
          + properties.setProperty("linger.ms", "10000");
          +
          + int leaderId = kafkaServer.getLeaderToShutDown(topic);
          + BrokerRestartingMapper.resetState();
          +
          + // process exactly failAfterElements number of elements and then shutdown Kafka broker and fail application
          + DataStream<Integer> inputStream = env
          + .fromCollection(getIntegersSequence(numElements))
          + .map(new BrokerRestartingMapper<Integer>(leaderId, failAfterElements));
          +
          + StreamSink<Integer> kafkaSink = kafkaServer.getProducerSink(topic, keyedSerializationSchema, properties, new FlinkKafkaPartitioner<Integer>() {
          + @Override
          + public int partition(Integer record, byte[] key, byte[] value, String targetTopic, int[] partitions) {
          +     return partition;
          + }

          + });
          +
          + if (regularSink) {
          +     inputStream.addSink(kafkaSink.getUserFunction());
          + }

          + else {
          + kafkaServer.produceIntoKafka(inputStream, topic, keyedSerializationSchema, properties, new FlinkKafkaPartitioner<Integer>() {
          + @Override
          + public int partition(Integer record, byte[] key, byte[] value, String targetTopic, int[] partitions) {
          +     return partition;
          + }

          + });
          + }
          +
          + FailingIdentityMapper.failedBefore = false;
          + try {
          +     env.execute("One-to-one at least once test");
          +     fail("Job should fail!");
          + }
          + catch (Exception ex) {
          — End diff –

          I think we need a more specific exception here.
          There may be actual exceptions thrown by Flink that would be masked by this assumption.
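
          A sketch of a more targeted check, continuing the quoted test; JobExecutionException is Flink's wrapper for job failures, while ExpectedTestException stands in for whatever marker exception the test actually injects (both usages here are assumptions, not code from the PR):

          try {
              env.execute("One-to-one at least once test");
              fail("Job should fail!");
          }
          catch (JobExecutionException ex) {
              Throwable cause = ex.getCause();
              if (!(cause instanceof ExpectedTestException)) {
                  // rethrow anything the test did not inject itself, so genuine Flink
                  // errors are not masked by a blanket catch
                  throw ex;
              }
          }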

          githubbot ASF GitHub Bot added a comment -

          Github user pnowojski commented on a diff in the pull request:

          https://github.com/apache/flink/pull/4206#discussion_r125306972

          — Diff: flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaProducerTestBase.java —
          @@ -172,6 +194,118 @@ public void cancel() {
          }
          }

          + /**
          + * Tests the at-least-once semantic for the simple writes into Kafka.
          + */
          + @Test
          + public void testOneToOneAtLeastOnceRegularSink() throws Exception {
          +     testOneToOneAtLeastOnce(true);
          + }

          +
          + /**
          + * Tests the at-least-once semantic for the simple writes into Kafka.
          + */
          + @Test
          + public void testOneToOneAtLeastOnceCustomOperator() throws Exception {
          +     testOneToOneAtLeastOnce(false);
          + }

          +
          + /**
          + * This test sets KafkaProducer so that it will not automatically flush the data and
          + * and fails the broker to check whether FlinkKafkaProducer flushed records manually on snapshotState.
          + */
          + protected void testOneToOneAtLeastOnce(boolean regularSink) throws Exception {
          + final String topic = regularSink ? "oneToOneTopicRegularSink" : "oneToOneTopicCustomOperator";
          + final int partition = 0;
          + final int numElements = 1000;
          + final int failAfterElements = 333;
          +
          + createTestTopic(topic, 1, 1);
          +
          + TypeInformationSerializationSchema<Integer> schema = new TypeInformationSerializationSchema<>(BasicTypeInfo.INT_TYPE_INFO, new ExecutionConfig());
          + KeyedSerializationSchema<Integer> keyedSerializationSchema = new KeyedSerializationSchemaWrapper(schema);
          +
          + StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
          + env.enableCheckpointing(500);
          + env.setParallelism(1);
          + env.setRestartStrategy(RestartStrategies.noRestart());
          + env.getConfig().disableSysoutLogging();
          +
          + Properties properties = new Properties();
          + properties.putAll(standardProps);
          + properties.putAll(secureProps);
          + // decrease timeout and block time from 60s down to 10s - this is how long KafkaProducer will try send pending (not flushed) data on close()
          + properties.setProperty("timeout.ms", "10000");
          + properties.setProperty("max.block.ms", "10000");
          + // increase batch.size and linger.ms - this tells KafkaProducer to batch produced events instead of flushing them immediately
          + properties.setProperty("batch.size", "10240000");
          + properties.setProperty("linger.ms", "10000");
          +
          + int leaderId = kafkaServer.getLeaderToShutDown(topic);
          + BrokerRestartingMapper.resetState();
          +
          + // process exactly failAfterElements number of elements and then shutdown Kafka broker and fail application
          + DataStream<Integer> inputStream = env
          + .fromCollection(getIntegersSequence(numElements))
          + .map(new BrokerRestartingMapper<Integer>(leaderId, failAfterElements));
          +
          + StreamSink<Integer> kafkaSink = kafkaServer.getProducerSink(topic, keyedSerializationSchema, properties, new FlinkKafkaPartitioner<Integer>() {
          + @Override
          + public int partition(Integer record, byte[] key, byte[] value, String targetTopic, int[] partitions) {
          +     return partition;
          + }

          + });
          +
          + if (regularSink) {
          +     inputStream.addSink(kafkaSink.getUserFunction());
          + }

          + else {
          + kafkaServer.produceIntoKafka(inputStream, topic, keyedSerializationSchema, properties, new FlinkKafkaPartitioner<Integer>() {
          + @Override
          + public int partition(Integer record, byte[] key, byte[] value, String targetTopic, int[] partitions) {
          +     return partition;
          + }

          + });
          + }
          +
          + FailingIdentityMapper.failedBefore = false;
          — End diff –

          This is a static variable and `FailingIdentityMapper` is used twice: first to test the regular sink and then the custom sink operator. Without resetting this state, the second test run would fail.
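
          In other words, because the flag is static it keeps its value across the two test methods, which run in the same JVM; roughly (comments added for illustration):

          @Test
          public void testOneToOneAtLeastOnceRegularSink() throws Exception {
              testOneToOneAtLeastOnce(true);   // whatever the static flag holds afterwards survives this method
          }

          @Test
          public void testOneToOneAtLeastOnceCustomOperator() throws Exception {
              testOneToOneAtLeastOnce(false);  // would see the stale flag unless it is reset first
          }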

          githubbot ASF GitHub Bot added a comment -

          Github user pnowojski commented on the issue:

          https://github.com/apache/flink/pull/4206

          Sorry, those tests were passing before the rebase; after the rebase I accidentally reverted this previous fixup. I have re-introduced it now as a separate commit.

          githubbot ASF GitHub Bot added a comment -

          Github user tzulitai commented on the issue:

          https://github.com/apache/flink/pull/4206

          I wonder if we can actually replace this validation step with a validating map function after the Kafka producer sink. e.g. use a Flink Kafka consumer to read the results, followed by a validating flat map function?
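
          A rough sketch of that suggestion, reusing the topic, schema and properties from the quoted test (the SuccessException marker and the consumer wiring are assumptions, not code from the PR):

          DataStream<Integer> readBack = env.addSource(
              new FlinkKafkaConsumer010<>(topic, schema, properties));

          readBack.flatMap(new FlatMapFunction<Integer, Integer>() {
              private final BitSet seen = new BitSet(numElements);

              @Override
              public void flatMap(Integer value, Collector<Integer> out) throws Exception {
                  seen.set(value);
                  if (seen.cardinality() == numElements) {
                      // every produced element has been observed at least once
                      throw new SuccessException();
                  }
              }
          });

          In practice the flat map would probably be a named static class so that it stays serializable.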

          githubbot ASF GitHub Bot added a comment -

          Github user tzulitai commented on the issue:

          https://github.com/apache/flink/pull/4206

          It seems like the tests are still failing on Travis:

          >Failed tests:
          Kafka09ProducerITCase>KafkaProducerTestBase.testOneToOneAtLeastOnceRegularSink:202->KafkaProducerTestBase.testOneToOneAtLeastOnce:282->KafkaProducerTestBase.assertAtLeastOnceForTopic:298 expected:<[0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20, 21, 22, 23, 24, 25, 26, 27, 28, 29, 30, 31, 32, 33, 34, 35, 36, 37, 38, 39, 40, 41, 42, 43, 44, 45, 46, 47, 48, 49, 50, 51, 52, 53, 54, 55, 56, 57, 58, 59, 60, 61, 62, 63, 64, 65, 66, 67, 68, 69, 70, 71, 72, 73, 74, 75, 76, 77, 78, 79, 80, 81, 82, 83, 84, 85, 86, 87, 88, 89, 90, 91, 92, 93, 94, 95, 96, 97, 98, 99, 100, 101, 102, 103, 104, 105, 106, 107, 108, 109, 110, 111, 112, 113, 114, 115, 116, 117, 118, 119, 120, 121, 122, 123, 124, 125, 126, 127, 128, 129, 130, 131, 132, 133, 134, 135, 136, 137, 138, 139, 140, 141, 142, 143, 144, 145, 146, 147, 148, 149, 150, 151, 152, 153, 154, 155, 156, 157, 158, 159, 160, 161, 162, 163, 164, 165, 166, 167, 168, 169, 170, 171, 172, 173, 174, 175, 176, 177, 178, 179, 180, 181, 182, 183, 184, 185, 186, 187, 188, 189, 190, 191, 192, 193, 194, 195, 196, 197, 198, 199, 200, 201, 202, 203, 204, 205, 206, 207, 208, 209, 210, 211, 212, 213, 214, 215, 216, 217, 218, 219, 220, 221, 222, 223, 224, 225, 226, 227, 228, 229, 230, 231, 232, 233, 234, 235, 236, 237, 238, 239, 240, 241, 242, 243, 244, 245, 246, 247, 248, 249, 250, 251, 252, 253, 254, 255, 256, 257, 258, 259, 260, 261, 262, 263, 264, 265, 266, 267, 268, 269, 270, 271, 272, 273, 274, 275, 276, 277, 278, 279, 280, 281, 282, 283, 284, 285, 286, 287, 288, 289, 290, 291, 292, 293, 294, 295, 296, 297, 298, 299, 300, 301, 302, 303, 304, 305, 306, 307, 308, 309, 310, 311, 312, 313, 314, 315, 316, 317, 318, 319, 320, 321, 322, 323, 324, 325, 326, 327, 328, 329, 330, 331]> but was:<[]>

          githubbot ASF GitHub Bot added a comment -

          Github user tzulitai commented on a diff in the pull request:

          https://github.com/apache/flink/pull/4206#discussion_r125295964

          — Diff: flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaProducerTestBase.java —
          @@ -172,6 +194,118 @@ public void cancel() {
          }
          }

          + /**
          + * Tests the at-least-once semantic for the simple writes into Kafka.
          + */
          + @Test
          + public void testOneToOneAtLeastOnceRegularSink() throws Exception {
          +     testOneToOneAtLeastOnce(true);
          + }

          +
          + /**
          + * Tests the at-least-once semantic for the simple writes into Kafka.
          + */
          + @Test
          + public void testOneToOneAtLeastOnceCustomOperator() throws Exception {
          +     testOneToOneAtLeastOnce(false);
          + }

          +
          + /**
          + * This test sets KafkaProducer so that it will not automatically flush the data and
          + * and fails the broker to check whether FlinkKafkaProducer flushed records manually on snapshotState.
          + */
          + protected void testOneToOneAtLeastOnce(boolean regularSink) throws Exception {
          + final String topic = regularSink ? "oneToOneTopicRegularSink" : "oneToOneTopicCustomOperator";
          + final int partition = 0;
          + final int numElements = 1000;
          + final int failAfterElements = 333;
          +
          + createTestTopic(topic, 1, 1);
          +
          + TypeInformationSerializationSchema<Integer> schema = new TypeInformationSerializationSchema<>(BasicTypeInfo.INT_TYPE_INFO, new ExecutionConfig());
          + KeyedSerializationSchema<Integer> keyedSerializationSchema = new KeyedSerializationSchemaWrapper(schema);
          +
          + StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
          + env.enableCheckpointing(500);
          + env.setParallelism(1);
          + env.setRestartStrategy(RestartStrategies.noRestart());
          + env.getConfig().disableSysoutLogging();
          +
          + Properties properties = new Properties();
          + properties.putAll(standardProps);
          + properties.putAll(secureProps);
          + // decrease timeout and block time from 60s down to 10s - this is how long KafkaProducer will try send pending (not flushed) data on close()
          + properties.setProperty("timeout.ms", "10000");
          + properties.setProperty("max.block.ms", "10000");
          + // increase batch.size and linger.ms - this tells KafkaProducer to batch produced events instead of flushing them immediately
          + properties.setProperty("batch.size", "10240000");
          + properties.setProperty("linger.ms", "10000");
          +
          + int leaderId = kafkaServer.getLeaderToShutDown(topic);
          + BrokerRestartingMapper.resetState();
          +
          + // process exactly failAfterElements number of elements and then shutdown Kafka broker and fail application
          + DataStream<Integer> inputStream = env
          + .fromCollection(getIntegersSequence(numElements))
          + .map(new BrokerRestartingMapper<Integer>(leaderId, failAfterElements));
          +
          + StreamSink<Integer> kafkaSink = kafkaServer.getProducerSink(topic, keyedSerializationSchema, properties, new FlinkKafkaPartitioner<Integer>() {
          + @Override
          + public int partition(Integer record, byte[] key, byte[] value, String targetTopic, int[] partitions)

          { + return partition; + }

          + });
          +
          + if (regularSink)

          { + inputStream.addSink(kafkaSink.getUserFunction()); + }

          + else {
          + kafkaServer.produceIntoKafka(inputStream, topic, keyedSerializationSchema, properties, new FlinkKafkaPartitioner<Integer>() {
          + @Override
          + public int partition(Integer record, byte[] key, byte[] value, String targetTopic, int[] partitions)

          { + return partition; + }

          + });
          + }
          +
          + FailingIdentityMapper.failedBefore = false;
          — End diff –

          Why do we need this here? I don't see that the `FailingIdentityMapper` is used elsewhere in the pipeline.

          githubbot ASF GitHub Bot added a comment -

          Github user pnowojski commented on the issue:

          https://github.com/apache/flink/pull/4206

          Hmm, this is disturbing. Locally it always works for me. I have rewritten the test so that it should be less prone to intermittent failures (a longer timeout for reading from Kafka). Hopefully that will solve this issue; otherwise we still have some bug.

          githubbot ASF GitHub Bot added a comment -

          Github user tzulitai commented on the issue:

          https://github.com/apache/flink/pull/4206

          @pnowojski related test `Kafka010ProducerITCase>KafkaProducerTestBase.testOneToOneAtLeastOnceRegularSink` is failing in Travis. It seems like the fetched records from Kafka are empty?

          ```
          Failed tests:
          Kafka09ProducerITCase>KafkaProducerTestBase.testOneToOneAtLeastOnceRegularSink:202->KafkaProducerTestBase.testOneToOneAtLeastOnce:282->KafkaProducerTestBase.assertAtLeastOnceForTopic:298 expected:<[0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20, 21, 22, 23, 24, 25, 26, 27, 28, 29, 30, 31, 32, 33, 34, 35, 36, 37, 38, 39, 40, 41, 42, 43, 44, 45, 46, 47, 48, 49, 50, 51, 52, 53, 54, 55, 56, 57, 58, 59, 60, 61, 62, 63, 64, 65, 66, 67, 68, 69, 70, 71, 72, 73, 74, 75, 76, 77, 78, 79, 80, 81, 82, 83, 84, 85, 86, 87, 88, 89, 90, 91, 92, 93, 94, 95, 96, 97, 98, 99, 100, 101, 102, 103, 104, 105, 106, 107, 108, 109, 110, 111, 112, 113, 114, 115, 116, 117, 118, 119, 120, 121, 122, 123, 124, 125, 126, 127, 128, 129, 130, 131, 132, 133, 134, 135, 136, 137, 138, 139, 140, 141, 142, 143, 144, 145, 146, 147, 148, 149, 150, 151, 152, 153, 154, 155, 156, 157, 158, 159, 160, 161, 162, 163, 164, 165, 166, 167, 168, 169, 170, 171, 172, 173, 174, 175, 176, 177, 178, 179, 180, 181, 182, 183, 184, 185, 186, 187, 188, 189, 190, 191, 192, 193, 194, 195, 196, 197, 198, 199, 200, 201, 202, 203, 204, 205, 206, 207, 208, 209, 210, 211, 212, 213, 214, 215, 216, 217, 218, 219, 220, 221, 222, 223, 224, 225, 226, 227, 228, 229, 230, 231, 232, 233, 234, 235, 236, 237, 238, 239, 240, 241, 242, 243, 244, 245, 246, 247, 248, 249, 250, 251, 252, 253, 254, 255, 256, 257, 258, 259, 260, 261, 262, 263, 264, 265, 266, 267, 268, 269]> but was:<[]>
          ```

          githubbot ASF GitHub Bot added a comment -

          Github user pnowojski commented on a diff in the pull request:

          https://github.com/apache/flink/pull/4206#discussion_r124476992

          — Diff: flink-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka08ProducerITCase.java —
          @@ -18,17 +18,19 @@

          package org.apache.flink.streaming.connectors.kafka;

          -import org.junit.Test;
          -
           /**
            * IT cases for the {@link FlinkKafkaProducer08}.
            */
           @SuppressWarnings("serial")
           public class Kafka08ProducerITCase extends KafkaProducerTestBase {

          -	@Test
          -	public void testCustomPartitioning() {
          -		runCustomPartitioningTest();
          +	@Override
          +	public void testOneToOneAtLeastOnceRegularSink() throws Exception {
          +		// TODO: enable this for Kafka 0.8 - now it hangs indefinitely
          — End diff –

          I will not fix this test (I'm pretty sure this is a test issue) within the scope of this ticket. I even think it's not worth the effort to investigate it at all - it is difficult to debug those failing tests and Kafka 0.8 is pretty old.

          githubbot ASF GitHub Bot added a comment -

          Github user pnowojski commented on a diff in the pull request:

          https://github.com/apache/flink/pull/4206#discussion_r124476235

          — Diff: flink-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java —
          @@ -116,6 +120,30 @@ public String getVersion() {
          }

          @Override
          + public <K, V> Collection<ConsumerRecord<K, V>> getAllRecordsFromTopic(Properties properties, String topic, int partition) {
          + ImmutableList.Builder<ConsumerRecord<K, V>> result = ImmutableList.builder();
          + KafkaConsumer<K, V> consumer = new KafkaConsumer<>(properties);
          + consumer.assign(ImmutableList.of(new TopicPartition(topic, partition)));
          +
          + while (true) {
          + boolean processedAtLeastOneRecord = false;
          — End diff –

          No, it's the other way around. We break the loop if, after polling for 1 second for the next records, we get an empty response. Added a comment.
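
          For readers skimming the thread, here is a minimal sketch of the polling pattern described above, written with plain JDK collections as suggested elsewhere in this review. It illustrates the break-on-empty-poll idea only, not the exact code that was merged, and it assumes the consumer `Properties` configure the offset reset behaviour (e.g. `auto.offset.reset=earliest`):

          ```java
          import java.util.ArrayList;
          import java.util.Collection;
          import java.util.Collections;
          import java.util.List;
          import java.util.Properties;

          import org.apache.kafka.clients.consumer.ConsumerRecord;
          import org.apache.kafka.clients.consumer.ConsumerRecords;
          import org.apache.kafka.clients.consumer.KafkaConsumer;
          import org.apache.kafka.common.TopicPartition;

          public class TopicDrainSketch {

          	/** Reads records from one partition until a 1 second poll comes back empty. */
          	public static <K, V> Collection<ConsumerRecord<K, V>> getAllRecordsFromTopic(
          			Properties properties, String topic, int partition) {
          		List<ConsumerRecord<K, V>> result = new ArrayList<>();
          		try (KafkaConsumer<K, V> consumer = new KafkaConsumer<>(properties)) {
          			consumer.assign(Collections.singletonList(new TopicPartition(topic, partition)));

          			while (true) {
          				// poll for up to 1 second; an empty response is taken to mean the topic is
          				// drained, which is the loop-exit condition discussed in this review thread
          				ConsumerRecords<K, V> records = consumer.poll(1000);
          				if (records.isEmpty()) {
          					break;
          				}
          				for (ConsumerRecord<K, V> record : records) {
          					result.add(record);
          				}
          			}
          		}
          		return Collections.unmodifiableList(result);
          	}
          }
          ```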

          githubbot ASF GitHub Bot added a comment -

          Github user tzulitai commented on a diff in the pull request:

          https://github.com/apache/flink/pull/4206#discussion_r124467929

          — Diff: flink-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java —
          @@ -28,6 +28,7 @@
          import org.apache.flink.streaming.util.serialization.KeyedSerializationSchema;
          import org.apache.flink.util.NetUtils;

          +import com.google.common.collect.ImmutableList;
          — End diff –

          Avoid Guava

          githubbot ASF GitHub Bot added a comment -

          Github user tzulitai commented on a diff in the pull request:

          https://github.com/apache/flink/pull/4206#discussion_r124467740

          — Diff: flink-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka09ProducerITCase.java —
          @@ -18,17 +18,13 @@

          package org.apache.flink.streaming.connectors.kafka;

          -import org.junit.Test;
          -
           /**
            * IT cases for the {@link FlinkKafkaProducer09}.
            */
           @SuppressWarnings("serial")
           public class Kafka09ProducerITCase extends KafkaProducerTestBase {
          -
          -	@Test
          -	public void testCustomPartitioning() {
          -		runCustomPartitioningTest();
          +	@Override
          +	public void testOneToOneAtLeastOnceCustomOperator() throws Exception {
          +		// Disable this test since FlinkKafka09Producer doesn't support writing timestamps
          — End diff –

          Same here.

          githubbot ASF GitHub Bot added a comment -

          Github user tzulitai commented on a diff in the pull request:

          https://github.com/apache/flink/pull/4206#discussion_r124467315

          — Diff: flink-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka08ProducerITCase.java —
          @@ -18,17 +18,19 @@

          package org.apache.flink.streaming.connectors.kafka;

          -import org.junit.Test;
          -
           /**
            * IT cases for the {@link FlinkKafkaProducer08}.
            */
           @SuppressWarnings("serial")
           public class Kafka08ProducerITCase extends KafkaProducerTestBase {

          -	@Test
          -	public void testCustomPartitioning() {
          -		runCustomPartitioningTest();
          +	@Override
          +	public void testOneToOneAtLeastOnceRegularSink() throws Exception {
          +		// TODO: enable this for Kafka 0.8 - now it hangs indefinitely
          — End diff –

          Is this a pending fix?

          githubbot ASF GitHub Bot added a comment -

          Github user tzulitai commented on a diff in the pull request:

          https://github.com/apache/flink/pull/4206#discussion_r124466523

          — Diff: flink-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer010.java —
          @@ -411,6 +414,18 @@ public void processElement(StreamRecord<T> element) throws Exception {
           		invokeInternal(element.getValue(), element.getTimestamp());
           	}

          +	@Override
          +	public void snapshotState(FunctionSnapshotContext context) throws Exception {
          +		final FlinkKafkaProducerBase<T> internalProducer = (FlinkKafkaProducerBase<T>) userFunction;
          +		internalProducer.snapshotState(context);
          +	}
          +
          +	@Override
          +	public void initializeState(FunctionInitializationContext context) throws Exception {
          — End diff –

          nit: I would declare `initializeState` before `snapshotState`, just for the sake of a better logic flow.
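
          For illustration, the reordering suggested in this nit would look roughly like the sketch below. Only the `snapshotState` body appears in the diff above; the `initializeState` body shown here is an assumed mirror-image delegation, not code taken from the pull request:

          ```java
          // Sketch only: initializeState declared before snapshotState, as suggested above.
          // 'userFunction' is assumed to be the wrapped FlinkKafkaProducerBase sink, as in the diff.
          @Override
          public void initializeState(FunctionInitializationContext context) throws Exception {
          	final FlinkKafkaProducerBase<T> internalProducer = (FlinkKafkaProducerBase<T>) userFunction;
          	internalProducer.initializeState(context); // assumed delegation, not shown in the diff
          }

          @Override
          public void snapshotState(FunctionSnapshotContext context) throws Exception {
          	final FlinkKafkaProducerBase<T> internalProducer = (FlinkKafkaProducerBase<T>) userFunction;
          	internalProducer.snapshotState(context); // this is where pending records get flushed
          }
          ```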

          githubbot ASF GitHub Bot added a comment -

          Github user tzulitai commented on a diff in the pull request:

          https://github.com/apache/flink/pull/4206#discussion_r124467541

          — Diff: flink-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka08ProducerITCase.java —
          @@ -18,17 +18,19 @@

          package org.apache.flink.streaming.connectors.kafka;

          -import org.junit.Test;
          -
           /**
            * IT cases for the {@link FlinkKafkaProducer08}.
            */
           @SuppressWarnings("serial")
           public class Kafka08ProducerITCase extends KafkaProducerTestBase {

          -	@Test
          -	public void testCustomPartitioning() {
          -		runCustomPartitioningTest();
          +	@Override
          +	public void testOneToOneAtLeastOnceRegularSink() throws Exception {
          +		// TODO: enable this for Kafka 0.8 - now it hangs indefinitely
          +	}
          +
          +	@Override
          +	public void testOneToOneAtLeastOnceCustomOperator() throws Exception {
          +		// Disable this test since FlinkKafka08Producer doesn't support writing timestamps
          — End diff –

          I would perhaps rephrase this comment a bit:
          it's disabled because FlinkKafka08Producer doesn't run in the custom operator mode (to be coherent with the test case name)

          githubbot ASF GitHub Bot added a comment -

          Github user tzulitai commented on a diff in the pull request:

          https://github.com/apache/flink/pull/4206#discussion_r124466733

          — Diff: flink-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java —
          @@ -26,6 +26,7 @@
          import org.apache.flink.streaming.util.serialization.KeyedSerializationSchema;
          import org.apache.flink.util.NetUtils;

          +import com.google.common.collect.ImmutableList;
          — End diff –

          In Flink we usually try to avoid Guava usages. Would it be easy to switch to `Collections.unmodifiableList`?

          githubbot ASF GitHub Bot added a comment -

          Github user tzulitai commented on a diff in the pull request:

          https://github.com/apache/flink/pull/4206#discussion_r124468424

          — Diff: flink-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java —
          @@ -116,6 +120,30 @@ public String getVersion() {
          }

          @Override
          + public <K, V> Collection<ConsumerRecord<K, V>> getAllRecordsFromTopic(Properties properties, String topic, int partition) {
          + ImmutableList.Builder<ConsumerRecord<K, V>> result = ImmutableList.builder();
          + KafkaConsumer<K, V> consumer = new KafkaConsumer<>(properties);
          + consumer.assign(ImmutableList.of(new TopicPartition(topic, partition)));
          +
          + while (true) {
          + boolean processedAtLeastOneRecord = false;
          — End diff –

          I'm a bit confused by this flag.
          The method name is `getAllRecordsFromTopic`, but it seems like we're escaping the loop once some record is fetched.

          githubbot ASF GitHub Bot added a comment -

          GitHub user pnowojski opened a pull request:

          https://github.com/apache/flink/pull/4206

          FLINK-6996 FlinkKafkaProducer010 doesn't guarantee at-least-once semantic

          You can merge this pull request into a Git repository by running:

          $ git pull https://github.com/pnowojski/flink at-least-once

          Alternatively you can review and apply these changes as the patch at:

          https://github.com/apache/flink/pull/4206.patch

          To close this pull request, make a commit to your master/trunk branch
          with (at least) the following in the commit message:

          This closes #4206


          commit b05b72a2baab8656787e2020120750e780b37621
          Author: Piotr Nowojski <piotr.nowojski@gmail.com>
          Date: 2017-06-26T09:28:51Z

          FLINK-6996 Refactor and automaticall inherit KafkaProducer integration tests

          commit 62b553503964230d8af6d7d79054721060da8061
          Author: Piotr Nowojski <piotr.nowojski@gmail.com>
          Date: 2017-06-26T10:20:36Z

          FLINK-6996 Fix formatting in KafkaConsumerTestBase and KafkaProducerTestBase

          commit 34ba4b74f0c5c6b915695ab8bf7bda5b40955d5b
          Author: Piotr Nowojski <piotr.nowojski@gmail.com>
          Date: 2017-06-26T10:36:40Z

          FLINK-6996 Fix at-least-once semantic for FlinkKafkaProducer010

          Add tests coverage for Kafka 0.10 and 0.9


          pnowojski Piotr Nowojski added a comment -

          Tzu-Li (Gordon) Tai I guess we were both wrong. I added tests for this issue for both (a) and (b), and both were failing (not flushing the data) before the fix.

          pnowojski Piotr Nowojski added a comment - - edited

          Yes, thanks, you are correct. Fixed description.

          tzulitai Tzu-Li (Gordon) Tai added a comment - - edited

          I think it's the other way around.
          In approach (b), i.e. FlinkKafkaProducer010.writeToKafkaWithTimestamps(inStream, schema, config), flushing works.
          It's in approach (a), where it's used as a regular sink UDF via stream.addSink(new FlinkKafkaProducer010(...)), that no flushing happens, since it doesn't implement the CheckpointedFunction interface.
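
          To make the (a)/(b) distinction concrete, here is a minimal usage sketch of both styles against the 1.3-era API. The topic name, bootstrap address and schema are placeholders, and the setter names are given as recalled from that API rather than quoted from this ticket, so check them against the FlinkKafkaProducer010 javadoc:

          ```java
          import java.util.Properties;

          import org.apache.flink.streaming.api.datastream.DataStream;
          import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
          import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer010;
          import org.apache.flink.streaming.util.serialization.SimpleStringSchema;

          public class ProducerUsageSketch {

          	public static void main(String[] args) throws Exception {
          		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
          		env.enableCheckpointing(500);

          		DataStream<String> stream = env.fromElements("a", "b", "c");

          		Properties props = new Properties();
          		props.setProperty("bootstrap.servers", "localhost:9092"); // placeholder address

          		// approach (a): used as a regular sink function - the path that did not
          		// flush on snapshotState before the fix tracked by this ticket
          		FlinkKafkaProducer010<String> producer =
          			new FlinkKafkaProducer010<>("my-topic", new SimpleStringSchema(), props);
          		producer.setFlushOnCheckpoint(true);
          		stream.addSink(producer);

          		// approach (b): attached as a custom operator that can also write Kafka timestamps
          		FlinkKafkaProducer010.FlinkKafkaProducer010Configuration<String> config =
          			FlinkKafkaProducer010.writeToKafkaWithTimestamps(
          				stream, "my-topic", new SimpleStringSchema(), props);
          		config.setFlushOnCheckpoint(true);

          		env.execute("FLINK-6996 usage sketch");
          	}
          }
          ```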


            People

            • Assignee:
              pnowojski Piotr Nowojski
              Reporter:
              pnowojski Piotr Nowojski
            • Votes:
              0
              Watchers:
              6
