  Flink / FLINK-13059

Cassandra Connector leaks Semaphore on Exception; hangs on close


Details

    Description

      In CassandraSinkBase the following code is present (comments are mine):

       

      public void invoke(IN value) throws Exception {
          checkAsyncErrors();
          tryAcquire();
          // Semaphore held here

          final ListenableFuture<V> result = send(value);

          Futures.addCallback(result, callback); // Callback releases semaphore
      }

      Any exception thrown inside send(value) leaves the semaphore permit unreleased, because the callback that would release it is never registered. Such exceptions do occur in practice, e.g.:

      com.datastax.driver.core.exceptions.InvalidQueryException: Some partition key parts are missing: hest
      at com.datastax.driver.core.exceptions.InvalidQueryException.copy(InvalidQueryException.java:50)
      at com.datastax.driver.core.DriverThrowables.propagateCause(DriverThrowables.java:37)
      at com.datastax.driver.core.AbstractSession.prepare(AbstractSession.java:98)
      at com.datastax.driver.mapping.Mapper.getPreparedQuery(Mapper.java:118)
      at com.datastax.driver.mapping.Mapper.saveQuery(Mapper.java:201)
      at com.datastax.driver.mapping.Mapper.saveQuery(Mapper.java:163)
      at org.apache.flink.streaming.connectors.cassandra.CassandraPojoSink.send(CassandraPojoSink.java:128)
      at org.apache.flink.streaming.connectors.cassandra.CassandraSinkBase.invoke(CassandraSinkBase.java:131)
      at org.apache.flink.streaming.api.functions.sink.SinkFunction.invoke(SinkFunction.java:52)
      

      Because the permit is never released, when the exception bubbles out and causes the job to close, CassandraSinkBase.flush() is eventually called. flush() then deadlocks trying to acquire config.getMaxConcurrentRequests() permits from the semaphore, which has one fewer than that available.
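
      The leak and the resulting hang can be reproduced with nothing but java.util.concurrent.Semaphore. The sketch below is illustrative only: LeakedPermitDemo, leakyInvoke and flush are hypothetical names, IllegalStateException stands in for the driver's InvalidQueryException, and flush() uses a bounded tryAcquire so the demo terminates, whereas the flush() described above waits without a bound.

```java
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;

/** Minimal, hypothetical reproduction of the permit leak using only java.util.concurrent. */
class LeakedPermitDemo {
    /**
     * Mimics the buggy invoke(): a permit is acquired, then send() throws
     * synchronously before any release callback is attached.
     */
    static void leakyInvoke(Semaphore semaphore) throws InterruptedException {
        semaphore.acquire(); // permit held from here on
        // Stands in for InvalidQueryException thrown from send(value).
        throw new IllegalStateException("send() failed synchronously");
    }

    /**
     * Mimics flush(): wait until all permits are free. Bounded here so the
     * demo returns false instead of hanging forever.
     */
    static boolean flush(Semaphore semaphore, int maxConcurrentRequests, long timeoutMs)
            throws InterruptedException {
        if (semaphore.tryAcquire(maxConcurrentRequests, timeoutMs, TimeUnit.MILLISECONDS)) {
            semaphore.release(maxConcurrentRequests);
            return true;
        }
        return false;
    }

    public static void main(String[] args) throws InterruptedException {
        int maxConcurrentRequests = 4;
        Semaphore semaphore = new Semaphore(maxConcurrentRequests);
        try {
            leakyInvoke(semaphore);
        } catch (IllegalStateException expected) {
            // The exception bubbles out; nothing releases the permit.
        }
        System.out.println(semaphore.availablePermits()); // prints 3
        System.out.println(flush(semaphore, maxConcurrentRequests, 200)); // prints false
    }
}
```

      Each failed send() costs one permit permanently, so after a single failure flush() can never collect all config.getMaxConcurrentRequests() permits.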

      The Flink job is thus left half-closed, but still marked as "RUNNING". Checkpointing, however, fails with:

      INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Discarding checkpoint 201325 of job XXX. org.apache.flink.runtime.checkpoint.decline.CheckpointDeclineTaskNotReadyException: Task Source: XXX (3/4) was not running 
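
      One way to avoid the leak is to release the permit whenever send() (or attaching the callback) throws, so that ownership of the permit passes to the callback only once the callback is actually registered. The sketch below is a simplified, hypothetical stand-in for CassandraSinkBase, not necessarily how the issue was fixed upstream: SafeSink is an invented name, java.util.concurrent.CompletableFuture replaces Guava's ListenableFuture, the send function passed to the constructor replaces the abstract send(), and checkAsyncErrors() is omitted.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;

/** Hypothetical, simplified sink that never leaks a permit when send() throws. */
class SafeSink<IN> {
    private final int maxConcurrentRequests;
    private final Semaphore semaphore;
    private final Function<IN, CompletableFuture<Void>> send;

    SafeSink(int maxConcurrentRequests, Function<IN, CompletableFuture<Void>> send) {
        this.maxConcurrentRequests = maxConcurrentRequests;
        this.semaphore = new Semaphore(maxConcurrentRequests);
        this.send = send;
    }

    void invoke(IN value) throws InterruptedException {
        semaphore.acquire(); // permit held from here on
        boolean callbackAttached = false;
        try {
            CompletableFuture<Void> result = send.apply(value);
            // From here the callback owns the permit, on success or async failure.
            result.whenComplete((ignored, error) -> semaphore.release());
            callbackAttached = true;
        } finally {
            // send() threw synchronously: no callback exists, so release the permit here.
            if (!callbackAttached) {
                semaphore.release();
            }
        }
    }

    /** Waits for all in-flight requests; bounded so a leak would show up as false. */
    boolean flush(long timeoutMs) throws InterruptedException {
        if (!semaphore.tryAcquire(maxConcurrentRequests, timeoutMs, TimeUnit.MILLISECONDS)) {
            return false;
        }
        semaphore.release(maxConcurrentRequests);
        return true;
    }

    int availablePermits() {
        return semaphore.availablePermits();
    }

    public static void main(String[] args) throws InterruptedException {
        CompletableFuture<Void> pending = new CompletableFuture<>();
        SafeSink<String> sink = new SafeSink<>(2, v -> {
            if (v.equals("bad")) {
                throw new IllegalArgumentException("Some partition key parts are missing");
            }
            return pending;
        });
        try {
            sink.invoke("bad");
        } catch (IllegalArgumentException e) {
            System.out.println("send failed: " + e.getMessage());
        }
        System.out.println(sink.availablePermits()); // prints 2: permit was returned
        sink.invoke("ok");
        System.out.println(sink.availablePermits()); // prints 1: request in flight
        pending.completeExceptionally(new RuntimeException("async write failure"));
        System.out.println(sink.flush(200)); // prints true
    }
}
```

      A boolean flag checked in a finally block is used instead of catching RuntimeException, so that an Error thrown from send() also returns the permit before propagating.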

       


            People

              mchro Mads Chr. Olesen
              mchro Mads Chr. Olesen
              Votes: 0
              Watchers: 3


                Time Tracking

                  Estimated: Not Specified
                  Remaining: 0h
                  Logged: 20m