Flink / FLINK-5626

Improve resource release in RocksDBKeyedStateBackend

    Details

      Description

      I found some minor cases in which some resources in `RocksDBKeyedStateBackend` might not get properly released in case of an exception, because their ownership is not completely transferred to the closing entity.
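Below is a minimal sketch of the ownership problem described above. It is not the code from the fix. `NativeResource` is a hypothetical stand-in for RocksDB handles such as `DBOptions`, and `ResourceOwner` is a hypothetical owning object. The owner takes ownership of a native resource only once construction has succeeded. If construction fails, the owner releases the resource itself, so the exception does not leak it.

```java
// Hypothetical stand-in for a native RocksDB handle (e.g. DBOptions); not a Flink or RocksDB class.
class NativeResource implements AutoCloseable {
    static int openCount = 0;
    private boolean closed = false;

    NativeResource() {
        openCount++;
    }

    @Override
    public void close() {
        if (!closed) {
            closed = true;
            openCount--;
        }
    }
}

// Hypothetical owner: only takes ownership of its resources once construction has fully succeeded.
class ResourceOwner implements AutoCloseable {
    private final NativeResource options;
    private final NativeResource db;

    ResourceOwner(boolean failDuringOpen) {
        NativeResource opts = new NativeResource();
        try {
            if (failDuringOpen) {
                throw new IllegalStateException("simulated failure while opening the db");
            }
            this.db = new NativeResource();
        } catch (RuntimeException e) {
            // Ownership was never transferred, so nobody else will close this: release it here.
            opts.close();
            throw e;
        }
        this.options = opts;
    }

    @Override
    public void close() {
        // The owner is now responsible for everything it took ownership of.
        db.close();
        options.close();
    }
}
```

The important part is the `catch` block. Until the constructor returns, no caller holds a reference that could be closed, so the constructor itself is the only place that can release `opts`.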

          Activity

          githubbot ASF GitHub Bot added a comment -

          Github user asfgit closed the pull request at:

          https://github.com/apache/flink/pull/3198

          uce Ufuk Celebi added a comment -

          Fixed in 5a435fc (release-1.2), cd9115f (master).

          githubbot ASF GitHub Bot added a comment -

          Github user StephanEwen commented on the issue:

          https://github.com/apache/flink/pull/3198

          +1 to these changes, very good!

          githubbot ASF GitHub Bot added a comment -

          Github user StefanRRichter commented on the issue:

          https://github.com/apache/flink/pull/3198

          Alright, then I am changing this first.

          githubbot ASF GitHub Bot added a comment -

          Github user aljoscha commented on the issue:

          https://github.com/apache/flink/pull/3198

          Renaming is not part of this PR/Jira Issue, though, so with the changes to ownership of the RocksDB options I would like to merge this quickly. We can then open another issue for the renaming, if it can happen at all.

          githubbot ASF GitHub Bot added a comment -

          Github user StefanRRichter commented on the issue:

          https://github.com/apache/flink/pull/3198

          I would call this `AbstractStateBackendFactory`, because that is what it is...an abstract factory for state backends.

          githubbot ASF GitHub Bot added a comment -

          Github user aljoscha commented on the issue:

          https://github.com/apache/flink/pull/3198

          There is already a `StateBackendFactory`. 😉 Both are used in the "public" APIs: `StateBackendFactory` is used for setting a backend in the config, while `AbstractStateBackend` appears in a public method on `StreamExecutionEnvironment`.

          Bringing in @rmetzger, the API stability czar. 😃

          githubbot ASF GitHub Bot added a comment -

          Github user StefanRRichter commented on the issue:

          https://github.com/apache/flink/pull/3198

          If the caching is optional, then I would also prefer to keep it out of the factory. Different question: if there is no other reason against it, wouldn't that also be an opportunity to rename this to abstract factory?

          githubbot ASF GitHub Bot added a comment -

          Github user tillrohrmann commented on the issue:

          https://github.com/apache/flink/pull/3198

          Aljoscha's proposal sounds reasonable to me

          githubbot ASF GitHub Bot added a comment -

          Github user aljoscha commented on the issue:

          https://github.com/apache/flink/pull/3198

          I think we can avoid having to close the `AbstractStateBackend` by not caching the `dbOptions` and `columnOptions` and instead giving new ones to the `RocksDBKeyedStateBackend`. The keyed backend is not meant to be stateless; it already has a lifecycle and will be disposed. It would simply take ownership of these options as well, which would make responsibilities clearer.

          Sorry for being annoying about this but I would really like to keep the "state backend factory" as a factory and not add lifecycle stuff to it as well.
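The ownership model proposed in this comment can be sketched as follows. `OptionsHandle`, `KeyedBackend` and `BackendFactory` are hypothetical stand-ins for the RocksDB options, `RocksDBKeyedStateBackend` and the state backend factory. They illustrate only the idea: the factory caches nothing, and each keyed backend closes the options it was given.

```java
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical stand-in for RocksDB's DBOptions / ColumnFamilyOptions.
final class OptionsHandle implements AutoCloseable {
    static final AtomicInteger LIVE = new AtomicInteger();
    private boolean closed;

    OptionsHandle() {
        LIVE.incrementAndGet();
    }

    @Override
    public void close() {
        if (!closed) {
            closed = true;
            LIVE.decrementAndGet();
        }
    }
}

// Hypothetical keyed backend: it owns the options it receives and releases them in dispose().
final class KeyedBackend {
    private final OptionsHandle dbOptions;
    private final OptionsHandle columnOptions;

    KeyedBackend(OptionsHandle dbOptions, OptionsHandle columnOptions) {
        this.dbOptions = dbOptions;
        this.columnOptions = columnOptions;
    }

    void dispose() {
        columnOptions.close();
        dbOptions.close();
    }
}

// Hypothetical stateless factory: it caches no options, so it has nothing to close.
final class BackendFactory {
    KeyedBackend createKeyedBackend() {
        // Fresh options for every keyed backend; ownership passes to the backend.
        OptionsHandle dbOptions = new OptionsHandle();
        OptionsHandle columnOptions = new OptionsHandle();
        return new KeyedBackend(dbOptions, columnOptions);
    }
}
```

With this split, the factory needs no `close()` method. The keyed backend, which already has a lifecycle, is the single place where the options are released.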

          githubbot ASF GitHub Bot added a comment -

          Github user StefanRRichter commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3198#discussion_r97558004

          --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/state/AbstractStateBackend.java ---
          @@ -66,4 +66,9 @@ public OperatorStateBackend createOperatorStateBackend(
                       String operatorIdentifier) throws Exception {
                   return new DefaultOperatorStateBackend(env.getUserClassLoader());
               }
          +
          +    @Override
          +    public void close() throws Exception {
          +
          +    }
          --- End diff --

          Releasing some native resources in `RocksDBStateBackend`.

          githubbot ASF GitHub Bot added a comment -

          Github user aljoscha commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3198#discussion_r97554901

          --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/state/AbstractStateBackend.java ---
          @@ -66,4 +66,9 @@ public OperatorStateBackend createOperatorStateBackend(
                       String operatorIdentifier) throws Exception {
                   return new DefaultOperatorStateBackend(env.getUserClassLoader());
               }
          +
          +    @Override
          +    public void close() throws Exception {
          +
          +    }
          --- End diff --

          The reason for not renaming it is that it's used in the (admittedly `@PublicEvolving`) method `StreamExecutionEnvironment.setStateBackend()`. If we didn't have that, I would also prefer renaming this to something like `StateBackendFactory`.

          The fact that this really is a factory (which should be stateless) makes me a bit uneasy about adding a `close()` method to `AbstractStateBackend`. Why is it needed in the first place?

          githubbot ASF GitHub Bot added a comment -

          Github user aljoscha commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3198#discussion_r97553270

          --- Diff: flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java ---
          @@ -773,12 +783,13 @@ private void restoreKVStateData() throws IOException, RocksDBException {
                * we don't restore the individual k/v states, just the global RocksDB data base and the
                * list of column families. When a k/v state is first requested we check here whether we
                * already have a column family for that and return it or create a new one if it doesn't exist.
                *
          +     * <p>
          --- End diff --

          superfluous `<p>`

          githubbot ASF GitHub Bot added a comment -

          Github user StefanRRichter commented on the issue:

          https://github.com/apache/flink/pull/3198

          I changed my PR as suggested by @uce and @tillrohrmann.

          githubbot ASF GitHub Bot added a comment -

          Github user tillrohrmann commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3198#discussion_r97540391

          --- Diff: flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java ---
          @@ -245,14 +235,15 @@ public int getKeyGroupPrefixBytes() {
                   // hold the db lock while operation on the db to guard us against async db disposal
                   synchronized (asyncSnapshotLock) {
          -            if (kvStateInformation.isEmpty()) {
          -                LOG.info("Asynchronous RocksDB snapshot performed on empty keyed state at " + timestamp +
          -                        " . Returning null.");
          +            if (db != null) {
          -                return new DoneFuture<>(null);
          -            }
          +                if (kvStateInformation.isEmpty()) {
          +                    LOG.info("Asynchronous RocksDB snapshot performed on empty keyed state at " + timestamp +
          --- End diff --

          +1

          githubbot ASF GitHub Bot added a comment -

          Github user tillrohrmann commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3198#discussion_r97540340

          --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/state/AbstractStateBackend.java ---
          @@ -66,4 +66,9 @@ public OperatorStateBackend createOperatorStateBackend(
                       String operatorIdentifier) throws Exception {
                   return new DefaultOperatorStateBackend(env.getUserClassLoader());
               }
          +
          +    @Override
          +    public void close() throws Exception {
          +
          +    }
          --- End diff --

          `AbstractStateBackend` has never been part of the public API, so at least we would not break the API stability promise.

          githubbot ASF GitHub Bot added a comment -

          Github user StefanRRichter commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3198#discussion_r97525648

          --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/state/AbstractStateBackend.java ---
          @@ -66,4 +66,9 @@ public OperatorStateBackend createOperatorStateBackend(
                       String operatorIdentifier) throws Exception {
                   return new DefaultOperatorStateBackend(env.getUserClassLoader());
               }
          +
          +    @Override
          +    public void close() throws Exception {
          +
          +    }
          --- End diff --

          yes

          githubbot ASF GitHub Bot added a comment -

          Github user uce commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3198#discussion_r97524989

          --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/state/AbstractStateBackend.java ---
          @@ -66,4 +66,9 @@ public OperatorStateBackend createOperatorStateBackend(
                       String operatorIdentifier) throws Exception {
                   return new DefaultOperatorStateBackend(env.getUserClassLoader());
               }
          +
          +    @Override
          +    public void close() throws Exception {
          +
          +    }
          --- End diff --

          Do you mean for users implementing their own state backends?

          githubbot ASF GitHub Bot added a comment -

          Github user uce commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3198#discussion_r97524568

          --- Diff: flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java ---
          @@ -245,14 +235,15 @@ public int getKeyGroupPrefixBytes() {
                   // hold the db lock while operation on the db to guard us against async db disposal
                   synchronized (asyncSnapshotLock) {
          -            if (kvStateInformation.isEmpty()) {
          -                LOG.info("Asynchronous RocksDB snapshot performed on empty keyed state at " + timestamp +
          -                        " . Returning null.");
          +            if (db != null) {
          -                return new DoneFuture<>(null);
          -            }
          +                if (kvStateInformation.isEmpty()) {
          +                    LOG.info("Asynchronous RocksDB snapshot performed on empty keyed state at " + timestamp +
          --- End diff --

          Wondering whether we should log this on debug. As a user, I don't know how useful this is for me when I see this message in my logs.

          githubbot ASF GitHub Bot added a comment -

          Github user StefanRRichter commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3198#discussion_r97523685

          --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/state/AbstractStateBackend.java ---
          @@ -66,4 +66,9 @@ public OperatorStateBackend createOperatorStateBackend(
                       String operatorIdentifier) throws Exception {
                   return new DefaultOperatorStateBackend(env.getUserClassLoader());
               }
          +
          +    @Override
          +    public void close() throws Exception {
          +
          +    }
          --- End diff --

          I absolutely agree, but I am afraid that this might be a breaking API change. Maybe @aljoscha can give a second opinion about this.

          githubbot ASF GitHub Bot added a comment -

          Github user tillrohrmann commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3198#discussion_r97520420

          --- Diff: flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java ---
          @@ -148,16 +140,14 @@ public RocksDBKeyedStateBackend(
                   ) throws Exception {

                   super(kvStateRegistry, keySerializer, userCodeClassLoader, numberOfKeyGroups, keyGroupRange);

          -        this.operatorIdentifier = operatorIdentifier;
          -        this.jobId = jobId;
                   this.columnOptions = columnFamilyOptions;

                   this.instanceBasePath = instanceBasePath;
                   this.instanceRocksDBPath = new File(instanceBasePath, "db");
          --- End diff --

          Maybe we could add here `Preconditions` checks?
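Here is one possible shape for such checks, sketched on a hypothetical, trimmed-down constructor (`CheckedBackend` is not a Flink class). It uses `java.util.Objects.requireNonNull` so that it runs without Flink on the classpath. Inside Flink, `org.apache.flink.util.Preconditions.checkNotNull` is presumably what "`Preconditions` checks" refers to.

```java
import java.io.File;
import java.util.Objects;

// Hypothetical, trimmed-down version of the constructor in the diff above.
final class CheckedBackend {
    private final Object columnOptions;
    private final File instanceBasePath;
    private final File instanceRocksDBPath;

    CheckedBackend(Object columnFamilyOptions, File instanceBasePath) {
        // Fail fast with a descriptive message instead of a NullPointerException much later.
        this.columnOptions = Objects.requireNonNull(columnFamilyOptions, "columnFamilyOptions");
        this.instanceBasePath = Objects.requireNonNull(instanceBasePath, "instanceBasePath");
        this.instanceRocksDBPath = new File(instanceBasePath, "db");
    }

    File getInstanceRocksDBPath() {
        return instanceRocksDBPath;
    }
}
```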

          githubbot ASF GitHub Bot added a comment -

          Github user tillrohrmann commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3198#discussion_r97520822

          --- Diff: flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java ---
          @@ -148,16 +140,14 @@ public RocksDBKeyedStateBackend(
                   ) throws Exception {
          --- End diff --

          I think this can now be narrowed down to `IOException`.

          githubbot ASF GitHub Bot added a comment -

          Github user tillrohrmann commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3198#discussion_r97522787

          --- Diff: flink-core/src/main/java/org/apache/flink/util/IOUtils.java ---
          @@ -216,9 +215,52 @@ public static void closeSocket(final Socket sock) {
               }

               /**
          +     * Closes all {@link AutoCloseable} objects in the parameter, suppressing exceptions. Exception will be emitted
          +     * after calling close() on every object.
          +     *
          +     * @param closeables iterable with closeables to close.
          +     * @throws Exception collected exceptions that occurred during closing
          +     */
          +    public static void closeAll(Iterable<? extends AutoCloseable> closeables) throws Exception {
          +        if (null != closeables) {
          +
          +            Exception collectedExceptions = null;
          +
          +            for (AutoCloseable closeable : closeables) {
          +                try {
          +                    if (null != closeable) {
          +                        closeable.close();
          +                    }
          +                } catch (Exception e) {
          +                    if (null == collectedExceptions) {
          +                        collectedExceptions = e;
          +                    } else {
          +                        collectedExceptions.addSuppressed(e);
          +                    }
          --- End diff --

          Here you could use `collectedExceptions = ExceptionUtils.firstOrSuppressed(e, collectedExceptions);`
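Here is a sketch of `closeAll` with that suggestion applied. `CloseAllSketch` is a hypothetical class name. To keep the sketch self-contained, `firstOrSuppressed` is re-implemented locally, based on an assumption about how Flink's `ExceptionUtils.firstOrSuppressed` behaves: it keeps the first exception and attaches later ones as suppressed exceptions.

```java
final class CloseAllSketch {

    // Local stand-in for the assumed behaviour of Flink's ExceptionUtils.firstOrSuppressed,
    // so this sketch compiles without Flink on the classpath.
    static <T extends Throwable> T firstOrSuppressed(T newException, T previous) {
        if (previous == null) {
            return newException;
        }
        previous.addSuppressed(newException);
        return previous;
    }

    // Calls close() on every non-null element, even if earlier calls fail. The first failure
    // is rethrown at the end, with all later failures attached as suppressed exceptions.
    static void closeAll(Iterable<? extends AutoCloseable> closeables) throws Exception {
        if (closeables == null) {
            return;
        }
        Exception collected = null;
        for (AutoCloseable closeable : closeables) {
            try {
                if (closeable != null) {
                    closeable.close();
                }
            } catch (Exception e) {
                collected = firstOrSuppressed(e, collected);
            }
        }
        if (collected != null) {
            throw collected;
        }
    }
}
```

The helper replaces the explicit `if (null == collectedExceptions) ... else ...` branch in the diff with a single assignment. The loop still attempts to close every element.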

          githubbot ASF GitHub Bot added a comment -

          Github user tillrohrmann commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3198#discussion_r97523023

          --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/state/AbstractStateBackend.java ---
          @@ -66,4 +66,9 @@ public OperatorStateBackend createOperatorStateBackend(
                       String operatorIdentifier) throws Exception {
                   return new DefaultOperatorStateBackend(env.getUserClassLoader());
               }
          +
          +    @Override
          +    public void close() throws Exception {
          +
          +    }
          --- End diff --

          Does it make sense to not implement the `close` method here? Then all subclasses would be forced to think about what happens when closing the state backend?
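Here is a hypothetical sketch of this suggestion, using made-up class names rather than Flink's. Declaring `close()` abstract, instead of giving it an empty body, forces every concrete backend to state its own close behaviour, even when that behaviour is a no-op. The sketch illustrates only this comment. It does not show what was eventually merged.

```java
// Hypothetical base class: close() is deliberately left abstract.
abstract class SketchStateBackend implements AutoCloseable {
    @Override
    public abstract void close() throws Exception;
}

// A backend without native resources must still opt in to a no-op close() explicitly.
final class InMemorySketchBackend extends SketchStateBackend {
    @Override
    public void close() {
        // Nothing to release, but the decision is now visible in the code.
    }
}

// A backend holding native handles releases them here.
final class NativeSketchBackend extends SketchStateBackend {
    boolean released;

    @Override
    public void close() {
        released = true; // stand-in for freeing native handles
    }
}
```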

          githubbot ASF GitHub Bot added a comment -

          Github user tillrohrmann commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3198#discussion_r97521254

          --- Diff: flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java ---
          @@ -201,12 +191,12 @@ public void dispose() {
                   for (Tuple2<ColumnFamilyHandle, RegisteredBackendStateMetaInfo<?, ?>> column :
                           kvStateInformation.values()) {
          -            column.f0.close();
          +            IOUtils.closeQuietly(column.f0);
                   }

                   kvStateInformation.clear();

          -        db.close();
          +        IOUtils.closeQuietly(db);
                   db = null;
               }
           }
          --- End diff --

          Not visible here, but the very next catch clause swallows the exception it catches.
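As an alternative to silently swallowing exceptions, here is a sketch of a dispose routine that still closes every handle but records and logs each failure. `DisposeSketch` and `closeAndReport` are hypothetical names, and `java.util.logging` stands in for the logger Flink uses. This is not the actual `RocksDBKeyedStateBackend.dispose()`.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.logging.Level;
import java.util.logging.Logger;

final class DisposeSketch {
    private static final Logger LOG = Logger.getLogger(DisposeSketch.class.getName());

    // Failures are kept instead of being discarded, so callers or tests can inspect them.
    final List<Exception> failures = new ArrayList<>();

    // Closes one resource; a failure is recorded and logged, and disposal continues.
    void closeAndReport(AutoCloseable resource, String what) {
        if (resource == null) {
            return;
        }
        try {
            resource.close();
        } catch (Exception e) {
            failures.add(e);
            LOG.log(Level.WARNING, "Failed to close " + what, e);
        }
    }

    // Column family handles first, then the database, mirroring the order in the diff above.
    void dispose(List<? extends AutoCloseable> columnHandles, AutoCloseable db) {
        for (AutoCloseable handle : columnHandles) {
            closeAndReport(handle, "column family handle");
        }
        closeAndReport(db, "RocksDB instance");
    }
}
```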

          githubbot ASF GitHub Bot added a comment -

          Github user tillrohrmann commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3198#discussion_r97521501

          --- Diff: flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java ---
          @@ -345,7 +337,7 @@ public void done(boolean canceled) {
               /**
                * 1) Create a snapshot object from RocksDB.
                *
          -     * @param checkpointId id of the checkpoint for which we take the snapshot
          +     * @param checkpointId id of the checkpoint for which we take the snapshot
          --- End diff --

          I think we don't do this kind of alignment because it increases the maintenance overhead when renaming parameters.

          githubbot ASF GitHub Bot added a comment -

          Github user StefanRRichter commented on the issue:

          https://github.com/apache/flink/pull/3198

          cc @tillrohrmann

          githubbot ASF GitHub Bot added a comment -

          GitHub user StefanRRichter opened a pull request:

          https://github.com/apache/flink/pull/3198

          FLINK-5626 Improved resource deallocation in RocksDBKeyedStateBackend

          This PR improves some cases of resource deallocation for RocksDB under exceptions. On top of that, I gave some exceptions a more appropriate type.

          You can merge this pull request into a Git repository by running:

          $ git pull https://github.com/StefanRRichter/flink RocksHardening

          Alternatively you can review and apply these changes as the patch at:

          https://github.com/apache/flink/pull/3198.patch

          To close this pull request, make a commit to your master/trunk branch
          with (at least) the following in the commit message:

          This closes #3198


          commit a30466009e0431aa884f7d358ecb2f386d4feb01
          Author: Stefan Richter <s.richter@data-artisans.com>
          Date: 2017-01-19T17:08:02Z

          FLINK-5626 Improved resource deallocation in RocksDBKeyedStateBackend



            People

              Assignee: Stefan Richter (srichter)
              Reporter: Stefan Richter (srichter)
              Votes: 0
              Watchers: 3
