Flink / FLINK-5101

Test CassandraConnectorITCase instable

    Details

    • Type: Bug
    • Status: Closed
    • Priority: Major
    • Resolution: Fixed
    • Affects Version/s: None
    • Fix Version/s: 1.3.0, 1.4.0
    • Component/s: Cassandra Connector
    • Labels:
      None

      Description

      I observed this test fail on Travis (very rarely):

      Running org.apache.flink.streaming.connectors.cassandra.CassandraConnectorITCase

      Tests run: 7, Failures: 1, Errors: 0, Skipped: 0, Time elapsed: 80.843 sec <<< FAILURE! - in org.apache.flink.streaming.connectors.cassandra.CassandraConnectorITCase
      testCassandraBatchFormats(org.apache.flink.streaming.connectors.cassandra.CassandraConnectorITCase) Time elapsed: 5.82 sec <<< FAILURE!
      java.lang.AssertionError: expected:<40> but was:<20>
      at org.junit.Assert.fail(Assert.java:88)
      at org.junit.Assert.failNotEquals(Assert.java:834)
      at org.junit.Assert.assertEquals(Assert.java:645)
      at org.junit.Assert.assertEquals(Assert.java:631)
      at org.apache.flink.streaming.connectors.cassandra.CassandraConnectorITCase.testCassandraBatchFormats(CassandraConnectorITCase.java:442)

          Activity

          githubbot ASF GitHub Bot added a comment -

          GitHub user zentol opened a pull request:

          https://github.com/apache/flink/pull/2866

          FLINK-5101 Refactor CassandraConnectorITCase

          This PR refactors the CassandraConnectorITCase to be a bit more stable and easier to debug.

          The following changes were made:

          • we no longer run actual Flink jobs; all tests interact with the sink directly to save resources
          • every test uses a different table, preventing race conditions related to truncating the table
          • the at-least-once sinks were modified to track pending updates
            => the POJO sink was modified to use a method that returns an actually useful `Future`
            => since the sink waits in `close()` for pending updates, a test can no longer check a condition prematurely, which improves stability
          • the initial connection is retried for up to 30 seconds, increasing the chance that Cassandra has started before the tests run
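          The pending-update tracking described in the list above can be illustrated without a Cassandra cluster. The following is a minimal sketch, not the PR's code: it assumes `CompletableFuture` in place of Guava's `ListenableFuture`, and `PendingTrackingSink` and `PendingTrackingDemo` are hypothetical names. It shows why a test that calls `close()` can check its results immediately afterwards.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicInteger;

class PendingTrackingDemo {
    // Sends 20 values through a sink whose writes complete asynchronously and
    // returns how many writes had finished by the time close() returned.
    static int runDemo() throws InterruptedException {
        ExecutorService pool = Executors.newFixedThreadPool(4);
        AtomicInteger written = new AtomicInteger();
        PendingTrackingSink<Integer> sink = new PendingTrackingSink<Integer>() {
            @Override
            CompletableFuture<?> send(Integer value) {
                // Simulated slow write that completes on a pool thread.
                return CompletableFuture.runAsync(() -> {
                    try {
                        Thread.sleep(10);
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                    }
                    written.incrementAndGet();
                }, pool);
            }
        };
        for (int i = 0; i < 20; i++) {
            sink.invoke(i);
        }
        sink.close(); // blocks until all 20 writes have completed
        pool.shutdown();
        return written.get();
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(runDemo()); // prints 20
    }
}

// Hypothetical stand-in for an at-least-once sink (not the real CassandraSinkBase).
abstract class PendingTrackingSink<IN> {
    final AtomicInteger updatesPending = new AtomicInteger();

    // One asynchronous write; the real sink returns a Guava ListenableFuture.
    abstract CompletableFuture<?> send(IN value);

    void invoke(IN value) {
        CompletableFuture<?> result = send(value);
        updatesPending.incrementAndGet();
        // If the future is already complete, whenComplete runs the callback
        // right away on this thread, i.e. still after the increment above.
        result.whenComplete((ignored, error) -> {
            synchronized (updatesPending) {
                if (updatesPending.decrementAndGet() == 0) {
                    updatesPending.notifyAll();
                }
            }
        });
    }

    // Waits until every pending write has completed (successfully or not).
    void close() throws InterruptedException {
        synchronized (updatesPending) {
            while (updatesPending.get() > 0) {
                updatesPending.wait();
            }
        }
    }
}
```

          Because the counter is checked and waited on while holding the same lock that the callback uses to notify, a completion that races with `close()` cannot be missed.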

          You can merge this pull request into a Git repository by running:

          $ git pull https://github.com/zentol/flink 4177_cass_test

          Alternatively you can review and apply these changes as the patch at:

          https://github.com/apache/flink/pull/2866.patch

          To close this pull request, make a commit to your master/trunk branch
          with (at least) the following in the commit message:

          This closes #2866


          commit e2cb8b9dc1131422dc97c0c08cbacd39cb747a44
          Author: zentol <chesnay@apache.org>
          Date: 2016-11-23T15:59:22Z

          FLINK-5101 Track pending records in CassandraSinkBase

          commit 4c74937fac3e112f24a1af62d529153ce3aabb68
          Author: zentol <chesnay@apache.org>
          Date: 2016-11-23T16:29:51Z

          FLINK-5101 Refactor CassandraConnectorITCase


          githubbot ASF GitHub Bot added a comment -

          Github user LorenzBuehmann commented on the issue:

          https://github.com/apache/flink/pull/3072

          Hm, ok. One test failed, which is strange since it has nothing to do with Scopt nor the [...]. Moreover, it failed for only one profile. According to https://issues.apache.org/jira/browse/FLINK-5101, it is a random problem with the Cassandra unit test. So, how should I proceed?

          githubbot ASF GitHub Bot added a comment -

          Github user StephanEwen commented on a diff in the pull request:

          https://github.com/apache/flink/pull/2866#discussion_r97310954

          Diff: flink-streaming-connectors/flink-connector-cassandra/src/test/java/org/apache/flink/streaming/connectors/cassandra/CassandraConnectorITCase.java
          @@ -371,70 +351,77 @@ public void testCassandraCommitter() throws Exception {

            @Test
            public void testCassandraTupleAtLeastOnceSink() throws Exception {
          -     StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
          -     env.setParallelism(1);
          +     CassandraTupleSink<Tuple3<String, Integer, Integer>> sink = new CassandraTupleSink<>(injectTableName(INSERT_DATA_QUERY), builder);
          +
          +     sink.open(new Configuration());
          -     DataStream<Tuple3<String, Integer, Integer>> source = env.fromCollection(collection);
          -     source.addSink(new CassandraTupleSink<Tuple3<String, Integer, Integer>>(INSERT_DATA_QUERY, builder));
          +     for (Tuple3<String, Integer, Integer> value : collection) {
          +         sink.send(value);
          +     }
          -     env.execute();
          +     sink.close();
          -     ResultSet rs = session.execute(SELECT_DATA_QUERY);
          +     synchronized (sink.updatesPending) {
          End diff

          Is this needed when `close()` already waits for pending requests?

          githubbot ASF GitHub Bot added a comment -

          Github user StephanEwen commented on a diff in the pull request:

          https://github.com/apache/flink/pull/2866#discussion_r97310432

          Diff: flink-streaming-connectors/flink-connector-cassandra/src/test/java/org/apache/flink/streaming/connectors/cassandra/CassandraConnectorITCase.java
          @@ -173,39 +157,49 @@ public static void startCassandra() throws IOException {
                    cassandra.start();
                }
          -     try {
          -         Thread.sleep(1000 * 10);
          -     } catch (InterruptedException e) { //give cassandra a few seconds to start up
          +     // give cassandra a few seconds to start up
          End diff

          Can we skip this and simply rely on the loop below that tries to connect?

          githubbot ASF GitHub Bot added a comment -

          Github user StephanEwen commented on a diff in the pull request:

          https://github.com/apache/flink/pull/2866#discussion_r97310987

          Diff: flink-streaming-connectors/flink-connector-cassandra/src/test/java/org/apache/flink/streaming/connectors/cassandra/CassandraConnectorITCase.java
          @@ -371,70 +351,77 @@ public void testCassandraCommitter() throws Exception {

            @Test
            public void testCassandraTupleAtLeastOnceSink() throws Exception {
          -     StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
          -     env.setParallelism(1);
          +     CassandraTupleSink<Tuple3<String, Integer, Integer>> sink = new CassandraTupleSink<>(injectTableName(INSERT_DATA_QUERY), builder);
          +
          +     sink.open(new Configuration());
          -     DataStream<Tuple3<String, Integer, Integer>> source = env.fromCollection(collection);
          -     source.addSink(new CassandraTupleSink<Tuple3<String, Integer, Integer>>(INSERT_DATA_QUERY, builder));
          +     for (Tuple3<String, Integer, Integer> value : collection) {
          +         sink.send(value);
          +     }
          -     env.execute();
          +     sink.close();
          -     ResultSet rs = session.execute(SELECT_DATA_QUERY);
          +     synchronized (sink.updatesPending) {
          +         if (sink.updatesPending.get() != 0) {
          +             sink.updatesPending.wait();
          +         }
          +     }
          +
          +     ResultSet rs = session.execute(injectTableName(SELECT_DATA_QUERY));
                Assert.assertEquals(20, rs.all().size());
            }

            @Test
            public void testCassandraPojoAtLeastOnceSink() throws Exception {
          -     StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
          -     env.setParallelism(1);
          -
          -     DataStreamSource<Pojo> source = env
          -         .addSource(new SourceFunction<Pojo>() {
          -
          -             private boolean running = true;
          -             private volatile int cnt = 0;
          -
          -             @Override
          -             public void run(SourceContext<Pojo> ctx) throws Exception {
          -                 while (running) {
          -                     ctx.collect(new Pojo(UUID.randomUUID().toString(), cnt, 0));
          -                     cnt++;
          -                     if (cnt == 20) {
          -                         cancel();
          -                     }
          -                 }
          -             }
          +     session.execute(CREATE_TABLE_QUERY.replace(TABLE_NAME_VARIABLE, "test"));
          -             @Override
          -             public void cancel() {
          -                 running = false;
          -             }
          -         });
          +     CassandraPojoSink<Pojo> sink = new CassandraPojoSink<>(Pojo.class, builder);
          -     source.addSink(new CassandraPojoSink<>(Pojo.class, builder));
          +     sink.open(new Configuration());
          -     env.execute();
          +     for (int x = 0; x < 20; x++) {
          +         sink.send(new Pojo(UUID.randomUUID().toString(), x, 0));
          +     }
          +
          +     sink.close();
          +
          +     synchronized (sink.updatesPending) {
          End diff

          Same as above

          githubbot ASF GitHub Bot added a comment -

          Github user StephanEwen commented on a diff in the pull request:

          https://github.com/apache/flink/pull/2866#discussion_r97310568

          Diff: flink-streaming-connectors/flink-connector-cassandra/src/test/java/org/apache/flink/streaming/connectors/cassandra/CassandraConnectorITCase.java
          @@ -173,39 +157,49 @@ public static void startCassandra() throws IOException {
                    cassandra.start();
                }
          -     try {
          -         Thread.sleep(1000 * 10);
          -     } catch (InterruptedException e) { //give cassandra a few seconds to start up
          +     // give cassandra a few seconds to start up
          +     long start = System.currentTimeMillis();
          +     long deadline = start + 1000 * 10;
          +     while (System.currentTimeMillis() < deadline) {
          +         try {
          +             Thread.sleep(1000);
          +         } catch (InterruptedException ignored) {
          +         }
          +     }
          -     cluster = builder.getCluster();
          -     session = cluster.connect();
          +     // start establishing a connection within 30 seconds
          +     start = System.currentTimeMillis();
          End diff

          I would suggest using `System.nanoTime()`, because `System.currentTimeMillis()` is not stable: it follows the wall clock, which can jump when the system time is adjusted. It seems especially unstable on Travis.

          githubbot ASF GitHub Bot added a comment -

          Github user StephanEwen commented on a diff in the pull request:

          https://github.com/apache/flink/pull/2866#discussion_r97309628

          Diff: flink-streaming-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraSinkBase.java
          @@ -69,17 +87,27 @@ public void onFailure(Throwable t) {

            @Override
            public void invoke(IN value) throws Exception {
          -     if (exception != null) {
          -         throw new IOException("invoke() failed", exception);
          +     Throwable e = exception.get();
          +     if (e != null) {
          +         throw new IOException("Error while sending value.", e);
                }

                ListenableFuture<V> result = send(value);
          +     updatesPending.incrementAndGet();
                Futures.addCallback(result, callback);
            }

            public abstract ListenableFuture<V> send(IN value);

            @Override
            public void close() {
          +     while (updatesPending.get() > 0) {
          +         synchronized (updatesPending) {
          +             try {
          +                 updatesPending.wait();
          +             } catch (InterruptedException e) {
          +             }
          End diff

          Probably even better here: throw an exception, because closing is incomplete when interrupted (and correctness cannot be guaranteed).

          githubbot ASF GitHub Bot added a comment -

          Github user StephanEwen commented on a diff in the pull request:

          https://github.com/apache/flink/pull/2866#discussion_r97310489

          Diff: flink-streaming-connectors/flink-connector-cassandra/src/test/java/org/apache/flink/streaming/connectors/cassandra/CassandraConnectorITCase.java
          @@ -173,39 +157,49 @@ public static void startCassandra() throws IOException {
                    cassandra.start();
                }
          -     try {
          -         Thread.sleep(1000 * 10);
          -     } catch (InterruptedException e) { //give cassandra a few seconds to start up
          +     // give cassandra a few seconds to start up
          +     long start = System.currentTimeMillis();
          +     long deadline = start + 1000 * 10;
          +     while (System.currentTimeMillis() < deadline) {
          +         try {
          +             Thread.sleep(1000);
          +         } catch (InterruptedException ignored) {
          +         }
          +     }
          -     cluster = builder.getCluster();
          -     session = cluster.connect();
          +     // start establishing a connection within 30 seconds
          +     start = System.currentTimeMillis();
          +     deadline = start + 1000 * 30;
          +     while (true) {
          +         try {
          +             cluster = builder.getCluster();
          +             session = cluster.connect();
          +             break;
          +         } catch (Exception e) {
          +             if (System.currentTimeMillis() > deadline) {
          +                 throw e;
          +             }
          +             try {
          +                 Thread.sleep(2000);
          End diff

          How about reducing this to, say, 500 ms, so that it reacts a bit sooner once the connection can be established?
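          The connection loop under review can be factored into a small retry helper. A minimal sketch, assuming a hypothetical `retryUntilDeadline` method and a `Callable` standing in for `builder.getCluster()` and `cluster.connect()`; it polls every 500 ms as suggested and measures elapsed time with `System.nanoTime()`.

```java
import java.time.Duration;
import java.util.concurrent.Callable;
import java.util.concurrent.atomic.AtomicInteger;

class RetryDemo {
    // Calls action until it succeeds. Once the timeout has elapsed, the last
    // failure is rethrown so the original connection error stays visible.
    // Time is measured with the monotonic System.nanoTime() rather than the
    // wall-clock System.currentTimeMillis().
    static <T> T retryUntilDeadline(Callable<T> action, Duration timeout, Duration pollInterval)
            throws Exception {
        long deadline = System.nanoTime() + timeout.toNanos();
        while (true) {
            try {
                return action.call();
            } catch (Exception e) {
                // Compare by subtraction: nanoTime values may overflow.
                if (System.nanoTime() - deadline > 0) {
                    throw e;
                }
                Thread.sleep(pollInterval.toMillis());
            }
        }
    }

    public static void main(String[] args) throws Exception {
        AtomicInteger attempts = new AtomicInteger();
        // Hypothetical "connect" that fails twice before succeeding.
        String session = retryUntilDeadline(() -> {
            if (attempts.incrementAndGet() < 3) {
                throw new IllegalStateException("Cassandra not up yet");
            }
            return "connected";
        }, Duration.ofSeconds(30), Duration.ofMillis(500));
        System.out.println(session + " after " + attempts.get() + " attempts"); // connected after 3 attempts
    }
}
```

          A shorter poll interval only costs a few extra failed connection attempts while Cassandra is still starting, which is usually cheap compared to waiting up to two extra seconds per test run.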

          githubbot ASF GitHub Bot added a comment -

          Github user StephanEwen commented on a diff in the pull request:

          https://github.com/apache/flink/pull/2866#discussion_r97309082

          Diff: flink-streaming-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraSinkBase.java
          @@ -69,17 +87,27 @@ public void onFailure(Throwable t) {

            @Override
            public void invoke(IN value) throws Exception {
          -     if (exception != null) {
          -         throw new IOException("invoke() failed", exception);
          +     Throwable e = exception.get();
          +     if (e != null) {
          +         throw new IOException("Error while sending value.", e);
                }

                ListenableFuture<V> result = send(value);
          +     updatesPending.incrementAndGet();
                Futures.addCallback(result, callback);
            }

            public abstract ListenableFuture<V> send(IN value);

            @Override
            public void close() {
          +     while (updatesPending.get() > 0) {
          +         synchronized (updatesPending) {
          +             try {
          +                 updatesPending.wait();
          +             } catch (InterruptedException e) {
          +             }
          End diff

          Good practice is to set the interruption flag back: `Thread.currentThread().interrupt();`
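          Both suggestions about the empty `catch (InterruptedException e)` block in `close()` (fail instead of returning, and restore the interrupt flag) can be combined. A sketch under the assumption that the waiting logic lives in a hypothetical `awaitPendingUpdates` helper that reports failure as an `IOException`:

```java
import java.io.IOException;
import java.util.concurrent.atomic.AtomicInteger;

class InterruptibleCloseDemo {
    // Hypothetical helper for the waiting part of close(): blocks until no
    // updates are pending. On interruption it restores the interrupt flag and
    // fails, because returning normally would wrongly signal that all writes
    // were flushed.
    static void awaitPendingUpdates(AtomicInteger updatesPending) throws IOException {
        synchronized (updatesPending) {
            while (updatesPending.get() > 0) {
                try {
                    updatesPending.wait();
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt(); // keep the interruption visible to callers
                    throw new IOException("Interrupted while waiting for "
                            + updatesPending.get() + " pending update(s).", e);
                }
            }
        }
    }

    // Interrupts a thread blocked in awaitPendingUpdates and reports
    // {threw IOException, interrupt flag still set}.
    static boolean[] runDemo() throws InterruptedException {
        AtomicInteger pending = new AtomicInteger(1); // a write that never completes
        boolean[] result = new boolean[2];
        Thread closer = new Thread(() -> {
            try {
                awaitPendingUpdates(pending);
            } catch (IOException e) {
                result[0] = true;
                result[1] = Thread.currentThread().isInterrupted();
            }
        });
        closer.start();
        closer.interrupt();
        closer.join(); // join() makes the writes to result visible here
        return result;
    }

    public static void main(String[] args) throws InterruptedException {
        boolean[] r = runDemo();
        System.out.println("threw=" + r[0] + ", interrupted=" + r[1]); // threw=true, interrupted=true
    }
}
```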

          githubbot ASF GitHub Bot added a comment -

          Github user StephanEwen commented on a diff in the pull request:

          https://github.com/apache/flink/pull/2866#discussion_r97309712

          Diff: flink-streaming-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraSinkBase.java
          @@ -94,5 +122,9 @@ public void close() {
                } catch (Exception e) {
                    LOG.error("Error while closing cluster.", e);
                }
          +     Throwable e = exception.get();
          +     if (e != null) {
          +         LOG.error("Error while sending value.", e);
          End diff

          This needs to be rethrown; otherwise `close()` may complete without an exception even though the result is incomplete.
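          A sketch of rethrowing, rather than only logging, a failure recorded by an asynchronous callback. `RethrowOnCloseDemo` is hypothetical and omits the waiting and cluster shutdown that the real `close()` performs; `CompletableFuture` stands in for Guava's future.

```java
import java.io.IOException;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicReference;

class RethrowOnCloseDemo {
    // Failure reported by an asynchronous write callback (hypothetical field,
    // modelled on the `exception` reference in the diff).
    private final AtomicReference<Throwable> exception = new AtomicReference<>();

    void onFailure(Throwable t) {
        exception.compareAndSet(null, t); // keep the first failure
    }

    // In the real sink, waiting for pending updates and closing the session
    // and cluster would happen first; that part is omitted here. The recorded
    // failure is then rethrown instead of only being logged.
    void close() throws IOException {
        Throwable e = exception.get();
        if (e != null) {
            throw new IOException("Error while sending value.", e);
        }
    }

    // Returns true if close() reports a write that failed asynchronously.
    static boolean closeThrows() {
        RethrowOnCloseDemo sink = new RethrowOnCloseDemo();
        CompletableFuture<Void> write = new CompletableFuture<>();
        write.whenComplete((ok, error) -> {
            if (error != null) {
                sink.onFailure(error);
            }
        });
        // Runs the registered callback synchronously on this thread.
        write.completeExceptionally(new RuntimeException("write timed out"));
        try {
            sink.close();
            return false;
        } catch (IOException e) {
            return true;
        }
    }

    public static void main(String[] args) {
        System.out.println(closeThrows()); // prints true
    }
}
```

          With this shape, a job whose last writes failed cannot finish as if it had succeeded, which is the at-least-once guarantee the sink is meant to provide.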

          githubbot ASF GitHub Bot added a comment -

          Github user StephanEwen commented on the issue:

          https://github.com/apache/flink/pull/2866

          Good set of changes. Some comments for small adjustments; otherwise +1 for the approach.

          githubbot ASF GitHub Bot added a comment -

          Github user zentol commented on the issue:

          https://github.com/apache/flink/pull/2866

          @StephanEwen I've addressed your comments and Travis is passing

          githubbot ASF GitHub Bot added a comment -

          Github user StephanEwen commented on a diff in the pull request:

          https://github.com/apache/flink/pull/2866#discussion_r98026925

          Diff: flink-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraSinkBase.java
          @@ -40,26 +42,42 @@
            protected transient Cluster cluster;
            protected transient Session session;

          - protected transient Throwable exception = null;
          + protected transient AtomicReference<Throwable> exception;
          End diff

          Does `exception` have to be an `AtomicReference`, or does a volatile variable suffice?
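          On this question: a `volatile` field already makes a callback's write visible to the task thread, so it suffices if any one error is enough. What `AtomicReference` adds is `compareAndSet`, which lets the first recorded failure win without locking. A sketch with two hypothetical holder classes:

```java
import java.util.concurrent.atomic.AtomicReference;

class ErrorHolderDemo {
    // A volatile field guarantees visibility, but concurrent callbacks simply
    // overwrite each other: the last recorded failure wins.
    static final class VolatileHolder {
        private volatile Throwable error;

        void record(Throwable t) { error = t; }

        Throwable get() { return error; }
    }

    // AtomicReference.compareAndSet only stores the failure if none is set
    // yet, so the first failure (often the root cause) is kept.
    static final class AtomicHolder {
        private final AtomicReference<Throwable> error = new AtomicReference<>();

        void record(Throwable t) { error.compareAndSet(null, t); }

        Throwable get() { return error.get(); }
    }

    public static void main(String[] args) {
        VolatileHolder v = new VolatileHolder();
        AtomicHolder a = new AtomicHolder();
        Throwable first = new RuntimeException("first");
        Throwable second = new RuntimeException("second");
        v.record(first);
        v.record(second);
        a.record(first);
        a.record(second);
        System.out.println(v.get().getMessage() + " / " + a.get().getMessage()); // second / first
    }
}
```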

          githubbot ASF GitHub Bot added a comment -

          Github user StephanEwen commented on a diff in the pull request:

          https://github.com/apache/flink/pull/2866#discussion_r98018710

          Diff: flink-connectors/flink-connector-cassandra/src/test/java/org/apache/flink/streaming/connectors/cassandra/CassandraConnectorITCase.java
          @@ -173,39 +157,39 @@ public static void startCassandra() throws IOException {
                    cassandra.start();
                }
          -     try {
          -         Thread.sleep(1000 * 10);
          -     } catch (InterruptedException e) { //give cassandra a few seconds to start up
          +     // start establishing a connection within 30 seconds
          +     long start = System.nanoTime();
          +     long deadline = start + 1000 * 30;
          +     while (true) {
          +         try {
          +             cluster = builder.getCluster();
          +             session = cluster.connect();
          +             break;
          +         } catch (Exception e) {
          +             if (System.currentTimeMillis() > deadline) {
          End diff

          This should also be `System.nanoTime()`

          githubbot ASF GitHub Bot added a comment -

          Github user StephanEwen commented on a diff in the pull request:

          https://github.com/apache/flink/pull/2866#discussion_r98018596

          — Diff: flink-connectors/flink-connector-cassandra/src/test/java/org/apache/flink/streaming/connectors/cassandra/CassandraConnectorITCase.java —
          @@ -173,39 +157,39 @@ public static void startCassandra() throws IOException

            cassandra.start();
          - try {
          -     Thread.sleep(1000 * 10);
          - } catch (InterruptedException e) { //give cassandra a few seconds to start up
          + // start establishing a connection within 30 seconds
          + long start = System.nanoTime();
          + long deadline = start + 1000 * 30;

          --- End diff ---

          This needs to be `long deadline = start + 30_000_000_000L` (because it's nanos).
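          To make the unit mix-up concrete, a small sketch comparing the offset in the diff with a 30-second budget expressed in nanoseconds. The helper `deadlineNanos` is hypothetical. The literal needs an `L` suffix to compile, because 30,000,000,000 does not fit in an `int`; `TimeUnit.SECONDS.toNanos(30)` is an alternative that avoids counting zeros.

```java
import java.util.concurrent.TimeUnit;

public class DeadlineUnits {

    // Hypothetical helper: a deadline `timeout` units after `startNanos`,
    // where startNanos is a System.nanoTime() reading.
    public static long deadlineNanos(long startNanos, long timeout, TimeUnit unit) {
        return startNanos + unit.toNanos(timeout);
    }

    public static void main(String[] args) {
        long start = System.nanoTime();

        // What the diff computes: 1000 * 30 added to a nanosecond reading
        // is only 30,000 ns, i.e. 30 microseconds rather than 30 seconds.
        long buggy = start + 1000 * 30;
        System.out.println(TimeUnit.NANOSECONDS.toMicros(buggy - start)); // 30

        // The intended 30-second budget. Without the L suffix javac rejects
        // 30_000_000_000 because it exceeds Integer.MAX_VALUE.
        long literal = start + 30_000_000_000L;
        long viaUnit = deadlineNanos(start, 30, TimeUnit.SECONDS);
        System.out.println(literal == viaUnit); // true
    }
}
```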

          githubbot ASF GitHub Bot added a comment -

          Github user StephanEwen commented on the issue:

          https://github.com/apache/flink/pull/2866

          Looks much better; one mix-up is still in the tests.
          The comment about the `AtomicReference` is only a minor suggestion for improvement.

          githubbot ASF GitHub Bot added a comment -

          Github user zentol commented on a diff in the pull request:

          https://github.com/apache/flink/pull/2866#discussion_r99562651

          — Diff: flink-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraSinkBase.java —
          @@ -40,26 +42,42 @@
          protected transient Cluster cluster;
          protected transient Session session;

          - protected transient Throwable exception = null;
          + protected transient AtomicReference<Throwable> exception;

          --- End diff ---

          It can be volatile.
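          For context on the trade-off: both a `volatile` field and an `AtomicReference` make an error recorded by a driver callback thread visible to the sink thread. The difference is that `AtomicReference.compareAndSet(null, t)` can keep only the first failure, while a plain volatile write lets later failures overwrite earlier ones. A minimal sketch; the class and method names `AsyncErrorHolder`, `onFailure` and `checkAndRethrow` are hypothetical, not the connector's actual API.

```java
import java.util.concurrent.atomic.AtomicReference;

public class AsyncErrorHolder {

    // AtomicReference variant: compareAndSet(null, t) keeps only the FIRST failure.
    private final AtomicReference<Throwable> firstError = new AtomicReference<>();

    // volatile variant: the write is visible to the sink thread, but
    // concurrent callbacks overwrite each other, so the LAST failure wins.
    private volatile Throwable lastError;

    // Hypothetical hook, called from asynchronous callback threads.
    public void onFailure(Throwable t) {
        firstError.compareAndSet(null, t);
        lastError = t;
    }

    // Hypothetical check, called from the sink thread (e.g. before the next write).
    public void checkAndRethrow() throws Exception {
        Throwable t = firstError.get();
        if (t != null) {
            throw new Exception("Asynchronous write failed", t);
        }
    }

    public Throwable firstError() { return firstError.get(); }

    public Throwable lastError() { return lastError; }

    public static void main(String[] args) {
        AsyncErrorHolder holder = new AsyncErrorHolder();
        holder.onFailure(new RuntimeException("first"));
        holder.onFailure(new RuntimeException("second"));
        System.out.println(holder.firstError().getMessage()); // first
        System.out.println(holder.lastError().getMessage());  // second
    }
}
```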

          githubbot ASF GitHub Bot added a comment -

          Github user zentol commented on the issue:

          https://github.com/apache/flink/pull/2866

          @StephanEwen I've addressed your comments and rebased the branch.

          githubbot ASF GitHub Bot added a comment -

          Github user asfgit closed the pull request at:

          https://github.com/apache/flink/pull/2866

          Zentol Chesnay Schepler added a comment -

          1.3: a85b1881d184c02075441f57f3364ec80b2d4f4d & 5d6b5a6773a795a3266b6298ae26d4f158ce18c4
          1.4: 948bb9f674a9c4cf95491f3d9a92b38eed6b64e8 & 0be04b454f58d5575bd6fab5755aa1264f363b91


            People

            • Assignee: Chesnay Schepler (zentol)
            • Reporter: Stefan Richter (srichter)