FLINK-5602 (Flink)

Migration with RocksDB job led to NPE for next checkpoint

    Details

    • Type: Bug
    • Status: Closed
    • Priority: Major
    • Resolution: Fixed
    • Affects Version/s: None
    • Fix Version/s: 1.2.0, 1.3.0
    • Component/s: None
    • Labels:
      None

      Description

      When migrating a job with RocksDB, I got the following exception when the next checkpoint was triggered. This happened only once, and I have not been able to reproduce it since.

      Stefan Richter, maybe we can look over the code and check what could have failed here? Unfortunately, I don't have more of the stack trace available. I don't think this will be very helpful, will it?

      	at org.apache.flink.util.Preconditions.checkNotNull(Preconditions.java:58)
      	at org.apache.flink.runtime.state.KeyedBackendSerializationProxy$StateMetaInfo.<init>(KeyedBackendSerializationProxy.java:126)
      	at org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend$RocksDBSnapshotOperation.writeKVStateMetaData(RocksDBKeyedStateBackend.java:471)
      	at org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend$RocksDBSnapshotOperation.writeDBSnapshot(RocksDBKeyedStateBackend.java:382)
      	at org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend$1.performOperation(RocksDBKeyedStateBackend.java:280)
      	at org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend$1.performOperation(RocksDBKeyedStateBackend.java:262)
      	at org.apache.flink.runtime.io.async.AbstractAsyncIOCallable.call(AbstractAsyncIOCallable.java:72)
      	at java.util.concurrent.FutureTask.run(FutureTask.java:266)
      	at org.apache.flink.util.FutureUtil.runIfNotDoneAndGet(FutureUtil.java:37)
      	... 6 more
      

          Activity

          githubbot ASF GitHub Bot added a comment -

          GitHub user StefanRRichter opened a pull request:

          https://github.com/apache/flink/pull/3200

          FLINK-5602 migration namespace serializer

          This PR fixes FLINK-5602. We introduce an artificial namespace serializer instead of `null` so that checkpoints can run even before the user states are registered.

          This sits on top of PR #3198.

          You can merge this pull request into a Git repository by running:

          $ git pull https://github.com/StefanRRichter/flink FLINK-5602-MigrationNamespaceSerializer

          Alternatively you can review and apply these changes as the patch at:

          https://github.com/apache/flink/pull/3200.patch

          To close this pull request, make a commit to your master/trunk branch
          with (at least) the following in the commit message:

          This closes #3200


          commit a30466009e0431aa884f7d358ecb2f386d4feb01
          Author: Stefan Richter <s.richter@data-artisans.com>
          Date: 2017-01-19T17:08:02Z

          FLINK-5626 Improved resource deallocation in RocksDBKeyedStateBackend

          commit 39fee85f4f05598949929ecb577bd7fd31dd0d10
          Author: Stefan Richter <s.richter@data-artisans.com>
          Date: 2017-01-24T13:27:18Z

          review comments

          commit 49d374f02aec698e672cc98db81049f51e1ba5fc
          Author: Stefan Richter <s.richter@data-artisans.com>
          Date: 2017-01-24T13:35:42Z

          FLINK-5602 Introduce artifical namespace serializer for migration
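          To make the failure mode and the fix concrete, here is a minimal, self-contained sketch. It does not use Flink's real classes: `Serializer`, `StateMetaInfo`, `PlaceholderNamespaceSerializer`, and `NullNamespaceDemo` are hypothetical stand-ins. It assumes, as the stack trace suggests, that the meta-info constructor runs a `checkNotNull`-style precondition on the namespace serializer, so a `null` serializer for a restored state that the user has not re-registered yet throws a `NullPointerException`, whereas a shared placeholder object (the "artificial namespace serializer" idea from the PR description) lets the snapshot proceed.

```java
import java.util.Objects;

/** Hypothetical, simplified serializer abstraction (not Flink's TypeSerializer). */
interface Serializer<T> {
    String name();
}

/** Hypothetical stand-in for the per-state meta info written during a snapshot. */
final class StateMetaInfo {
    final String stateName;
    final Serializer<?> namespaceSerializer;

    StateMetaInfo(String stateName, Serializer<?> namespaceSerializer) {
        this.stateName = Objects.requireNonNull(stateName);
        // Mirrors a checkNotNull precondition: a null serializer fails fast with an NPE.
        this.namespaceSerializer = Objects.requireNonNull(namespaceSerializer, "Namespace serializer");
    }
}

/** Hypothetical placeholder for restored states whose namespace serializer is not known yet. */
final class PlaceholderNamespaceSerializer implements Serializer<Object> {
    static final PlaceholderNamespaceSerializer INSTANCE = new PlaceholderNamespaceSerializer();

    private PlaceholderNamespaceSerializer() {}

    @Override
    public String name() {
        return "migration-placeholder";
    }
}

class NullNamespaceDemo {
    /** Returns true if building the meta info for a restored state succeeds. */
    static boolean snapshotSucceeds(Serializer<?> restoredNamespaceSerializer) {
        try {
            new StateMetaInfo("restored-state", restoredNamespaceSerializer);
            return true;
        } catch (NullPointerException e) {
            return false;
        }
    }

    public static void main(String[] args) {
        System.out.println(snapshotSucceeds(null));                                    // false
        System.out.println(snapshotSucceeds(PlaceholderNamespaceSerializer.INSTANCE)); // true
    }
}
```

          The placeholder is a null object: code paths that expect a non-null serializer keep working, and the real serializer can replace it once the user registers the state.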


          githubbot ASF GitHub Bot added a comment -

          Github user StefanRRichter commented on the issue:

          https://github.com/apache/flink/pull/3200

          cc @uce @aljoscha

          githubbot ASF GitHub Bot added a comment -

          Github user uce commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3200#discussion_r97591922

          --- Diff: flink-runtime/src/main/java/org/apache/flink/migration/MigrationNamespaceSerializerProxy.java ---
          @@ -0,0 +1,108 @@
          +/*
          + * Licensed to the Apache Software Foundation (ASF) under one
          + * or more contributor license agreements. See the NOTICE file
          + * distributed with this work for additional information
          + * regarding copyright ownership. The ASF licenses this file
          + * to you under the Apache License, Version 2.0 (the
          + * "License"); you may not use this file except in compliance
          + * with the License. You may obtain a copy of the License at
          + *
          + * http://www.apache.org/licenses/LICENSE-2.0
          + *
          + * Unless required by applicable law or agreed to in writing, software
          + * distributed under the License is distributed on an "AS IS" BASIS,
          + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
          + * See the License for the specific language governing permissions and
          + * limitations under the License.
          + */
          +
          +package org.apache.flink.migration;
          +
          +import org.apache.flink.api.common.typeutils.TypeSerializer;
          +import org.apache.flink.core.memory.DataInputView;
          +import org.apache.flink.core.memory.DataOutputView;
          +
          +import java.io.IOException;
          +import java.io.Serializable;
          +
          +public class MigrationNamespaceSerializerProxy extends TypeSerializer<Serializable> {
          +
          + public static MigrationNamespaceSerializerProxy INSTANCE = new MigrationNamespaceSerializerProxy();
          +
          + private static final long serialVersionUID = -707800010807094491L;
          +
          + private MigrationNamespaceSerializerProxy() {
          + }
          +
          + @Override
          + public boolean isImmutableType() {
          + return false;
          + }
          +
          + @Override
          + public TypeSerializer<Serializable> duplicate() {
          + return new MigrationNamespaceSerializerProxy();
          --- End diff --

          Return `INSTANCE`?
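          The suggestion works because the proxy has no mutable state, so one shared instance can stand in for any "copy". A minimal sketch of that reasoning, assuming a simplified, hypothetical base class `DuplicableSerializer` (not Flink's `TypeSerializer`) and a hypothetical `StatelessNamespaceSerializer`:

```java
/** Hypothetical minimal serializer contract with a duplicate() hook (not Flink's API). */
abstract class DuplicableSerializer<T> {
    /** Returns a serializer instance that is safe to hand to another thread. */
    abstract DuplicableSerializer<T> duplicate();
}

/** Stateless singleton: sharing one instance is safe, so duplicate() can return it. */
final class StatelessNamespaceSerializer extends DuplicableSerializer<Object> {
    static final StatelessNamespaceSerializer INSTANCE = new StatelessNamespaceSerializer();

    private StatelessNamespaceSerializer() {}

    @Override
    StatelessNamespaceSerializer duplicate() {
        // No mutable fields, so the shared instance is an adequate "copy"
        // and the class stays a true singleton.
        return INSTANCE;
    }
}

class DuplicateDemo {
    public static void main(String[] args) {
        // Prints true: duplicate() hands back the very same shared instance.
        System.out.println(StatelessNamespaceSerializer.INSTANCE.duplicate() == StatelessNamespaceSerializer.INSTANCE);
    }
}
```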

          githubbot ASF GitHub Bot added a comment -

          Github user uce commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3200#discussion_r97593753

          --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/state/RegisteredBackendStateMetaInfo.java ---
          @@ -92,10 +94,9 @@ public boolean isCompatibleWith(RegisteredBackendStateMetaInfo<?, ?> other) {
           return false;
           }
          - return ((namespaceSerializer == null && other.namespaceSerializer == null)
          - || namespaceSerializer == null || other.namespaceSerializer == null
          - || namespaceSerializer.isCompatibleWith(other.namespaceSerializer))
          - && stateSerializer.isCompatibleWith(other.stateSerializer);
          + return (stateSerializer.isCompatibleWith(other.stateSerializer)) &&
          + (namespaceSerializer.isCompatibleWith(other.namespaceSerializer)
          + || other.namespaceSerializer instanceof MigrationNamespaceSerializerProxy);
          --- End diff --

          I would add a short comment explaining that the check for `MigrationNamespaceSerializerProxy` is important. It could even warrant a test; I can see someone removing this line in the future.
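          A regression test along the lines suggested here could look like the following sketch. All class names are hypothetical and simplified (`SimpleSerializer`, `MigrationPlaceholder`, `MetaInfo`, `CompatibilityCheckDemo`); only the shape of the boolean expression is taken from the diff. The first check fails as soon as someone deletes the `instanceof` branch.

```java
/** Hypothetical serializer with a simple type-based compatibility check (not Flink's API). */
class SimpleSerializer {
    final String type;

    SimpleSerializer(String type) {
        this.type = type;
    }

    boolean isCompatibleWith(SimpleSerializer other) {
        return other != null && type.equals(other.type);
    }
}

/** Hypothetical placeholder for a namespace serializer that was unknown at restore time. */
final class MigrationPlaceholder extends SimpleSerializer {
    static final MigrationPlaceholder INSTANCE = new MigrationPlaceholder();

    private MigrationPlaceholder() {
        super("migration-placeholder");
    }
}

/** Hypothetical meta info mirroring the compatibility expression from the diff. */
final class MetaInfo {
    final SimpleSerializer namespaceSerializer;
    final SimpleSerializer stateSerializer;

    MetaInfo(SimpleSerializer namespaceSerializer, SimpleSerializer stateSerializer) {
        this.namespaceSerializer = namespaceSerializer;
        this.stateSerializer = stateSerializer;
    }

    boolean isCompatibleWith(MetaInfo other) {
        // IMPORTANT: the instanceof branch accepts restored states that still carry the
        // placeholder; removing it breaks re-registration after migration.
        return stateSerializer.isCompatibleWith(other.stateSerializer)
                && (namespaceSerializer.isCompatibleWith(other.namespaceSerializer)
                        || other.namespaceSerializer instanceof MigrationPlaceholder);
    }
}

class CompatibilityCheckDemo {
    public static void main(String[] args) {
        MetaInfo registered = new MetaInfo(new SimpleSerializer("VoidNamespace"), new SimpleSerializer("Long"));
        MetaInfo restored = new MetaInfo(MigrationPlaceholder.INSTANCE, new SimpleSerializer("Long"));
        MetaInfo otherNamespace = new MetaInfo(new SimpleSerializer("Window"), new SimpleSerializer("Long"));
        System.out.println(registered.isCompatibleWith(restored));       // true
        System.out.println(registered.isCompatibleWith(otherNamespace)); // false
    }
}
```

          As in the diff, the check is asymmetric: only `other` may carry the placeholder, and the state serializers must still match.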

          githubbot ASF GitHub Bot added a comment -

          Github user uce commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3200#discussion_r97591827

          --- Diff: flink-runtime/src/main/java/org/apache/flink/migration/MigrationNamespaceSerializerProxy.java ---
          @@ -0,0 +1,108 @@
          +/*
          + * Licensed to the Apache Software Foundation (ASF) under one
          + * or more contributor license agreements. See the NOTICE file
          + * distributed with this work for additional information
          + * regarding copyright ownership. The ASF licenses this file
          + * to you under the Apache License, Version 2.0 (the
          + * "License"); you may not use this file except in compliance
          + * with the License. You may obtain a copy of the License at
          + *
          + * http://www.apache.org/licenses/LICENSE-2.0
          + *
          + * Unless required by applicable law or agreed to in writing, software
          + * distributed under the License is distributed on an "AS IS" BASIS,
          + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
          + * See the License for the specific language governing permissions and
          + * limitations under the License.
          + */
          +
          +package org.apache.flink.migration;
          +
          +import org.apache.flink.api.common.typeutils.TypeSerializer;
          +import org.apache.flink.core.memory.DataInputView;
          +import org.apache.flink.core.memory.DataOutputView;
          +
          +import java.io.IOException;
          +import java.io.Serializable;
          +
          +public class MigrationNamespaceSerializerProxy extends TypeSerializer<Serializable> {
          --- End diff --

          I would add a short class-level JavaDoc.

          githubbot ASF GitHub Bot added a comment -

          Github user uce commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3200#discussion_r97591863

          --- Diff: flink-runtime/src/main/java/org/apache/flink/migration/MigrationNamespaceSerializerProxy.java ---
          @@ -0,0 +1,108 @@
          +/*
          + * Licensed to the Apache Software Foundation (ASF) under one
          + * or more contributor license agreements. See the NOTICE file
          + * distributed with this work for additional information
          + * regarding copyright ownership. The ASF licenses this file
          + * to you under the Apache License, Version 2.0 (the
          + * "License"); you may not use this file except in compliance
          + * with the License. You may obtain a copy of the License at
          + *
          + * http://www.apache.org/licenses/LICENSE-2.0
          + *
          + * Unless required by applicable law or agreed to in writing, software
          + * distributed under the License is distributed on an "AS IS" BASIS,
          + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
          + * See the License for the specific language governing permissions and
          + * limitations under the License.
          + */
          +
          +package org.apache.flink.migration;
          +
          +import org.apache.flink.api.common.typeutils.TypeSerializer;
          +import org.apache.flink.core.memory.DataInputView;
          +import org.apache.flink.core.memory.DataOutputView;
          +
          +import java.io.IOException;
          +import java.io.Serializable;
          +
          +public class MigrationNamespaceSerializerProxy extends TypeSerializer<Serializable> {
          +
          + public static MigrationNamespaceSerializerProxy INSTANCE = new MigrationNamespaceSerializerProxy();
          --- End diff --

          Make final?
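          Taken together with the earlier JavaDoc remark, these two review nits amount to something like the sketch below. `PlaceholderSerializer` and `FinalFieldDemo` are hypothetical, simplified names, and the JavaDoc wording paraphrases the PR description rather than the merged code. Declaring the field `static final` means a statement such as `PlaceholderSerializer.INSTANCE = null;` no longer compiles, so the singleton cannot be swapped out at runtime.

```java
import java.lang.reflect.Modifier;

/**
 * Hypothetical placeholder namespace serializer (simplified, not Flink's TypeSerializer).
 *
 * <p>Stands in for the namespace serializer of states restored from an older savepoint,
 * so that checkpoints can run before the user re-registers those states.
 */
final class PlaceholderSerializer {
    /** Shared instance; {@code final} so no code can replace the singleton. */
    public static final PlaceholderSerializer INSTANCE = new PlaceholderSerializer();

    private PlaceholderSerializer() {}
}

class FinalFieldDemo {
    public static void main(String[] args) throws NoSuchFieldException {
        int mods = PlaceholderSerializer.class.getField("INSTANCE").getModifiers();
        // Prints true: the singleton reference cannot be reassigned.
        System.out.println(Modifier.isFinal(mods));
    }
}
```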

          githubbot ASF GitHub Bot added a comment -

          Github user StefanRRichter commented on the issue:

          https://github.com/apache/flink/pull/3200

          Thanks for the review, @uce! I changed my PR according to your suggestions, except that `duplicate()` now only throws an `UnsupportedOperationException` instead of returning `INSTANCE`.

          githubbot ASF GitHub Bot added a comment -

          Github user uce commented on the issue:

          https://github.com/apache/flink/pull/3200

          OK, thanks! The `UnsupportedOperationException` makes sense; I thought `duplicate` might be called somewhere. I'll merge this as soon as Travis gives the green light.
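          The alternative settled on here is to fail fast. A minimal sketch of that variant, assuming (as the exchange implies) that `duplicate()` is not expected to be called on the placeholder; `BaseSerializer`, `PlaceholderNamespaceSerializer`, and `FailFastDemo` are hypothetical, simplified names rather than Flink's actual classes:

```java
/** Hypothetical minimal serializer contract (not Flink's TypeSerializer). */
abstract class BaseSerializer<T> {
    abstract BaseSerializer<T> duplicate();
}

/** Hypothetical placeholder whose duplicate() is never expected to be called. */
final class PlaceholderNamespaceSerializer extends BaseSerializer<Object> {
    static final PlaceholderNamespaceSerializer INSTANCE = new PlaceholderNamespaceSerializer();

    private PlaceholderNamespaceSerializer() {}

    @Override
    BaseSerializer<Object> duplicate() {
        // Failing loudly surfaces any unexpected use instead of silently sharing the placeholder.
        throw new UnsupportedOperationException("The placeholder namespace serializer cannot be duplicated.");
    }
}

class FailFastDemo {
    public static void main(String[] args) {
        try {
            PlaceholderNamespaceSerializer.INSTANCE.duplicate();
        } catch (UnsupportedOperationException e) {
            System.out.println("caught: " + e.getMessage());
        }
    }
}
```

          Compared with returning `INSTANCE`, this trades silent tolerance for an immediate, descriptive error if some code path does start duplicating the placeholder.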

          githubbot ASF GitHub Bot added a comment -

          Github user uce commented on the issue:

          https://github.com/apache/flink/pull/3200

          Verified that this fixes the issue. Thanks! Merging this together with #3198.

          githubbot ASF GitHub Bot added a comment -

          Github user asfgit closed the pull request at:

          https://github.com/apache/flink/pull/3200

          uce Ufuk Celebi added a comment -

          Fixed in 80c9f80 (release-1.2), b9ea4cf (master).


            People

            • Assignee:
              srichter Stefan Richter
              Reporter:
              uce Ufuk Celebi
            • Votes:
              0
              Watchers:
              3
