Details

    • Type: Improvement
    • Status: Closed
    • Priority: Major
    • Resolution: Fixed
    • Affects Version/s: 1.2.0
    • Fix Version/s: 1.2.0, 1.3.0
    • Component/s: DataStream API
    • Labels:
      None

      Description

      We could allow users to provide (alternative) hashes for operators in a StreamGraph. This can make migration between Flink versions easier in case the automatically produced hashes are incompatible between versions. For example, users could simply copy the old hashes from the web UI into their job.
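
      For illustration, a minimal sketch (in Java) of what this could look like from the user's side. The method name follows the `setUidHash` naming agreed later in this thread; the source/map functions, the hash value, and the job name are made-up placeholders:

      ```java
      import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

      public class MigratedJob {

          public static void main(String[] args) throws Exception {
              StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

              env.addSource(new MySourceFunction())                 // placeholder source
                  // Hash copied from the old job's web UI (hex form of the old JobVertexID).
                  .setUidHash("2e3bef2c1a9d63e4e8454f304e35b5a0")
                  .map(new MyMapFunction())                         // placeholder mapper
                  // Going forward, prefer an explicit uid so future upgrades do not need this workaround.
                  .uid("my-map-operator")
                  .print();

              env.execute("migrated-job");
          }
      }
      ```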

        Issue Links

          Activity

          githubbot ASF GitHub Bot added a comment -

          GitHub user StefanRRichter opened a pull request:

          https://github.com/apache/flink/pull/3117

          FLINK-5480 Introduce user-provided hash for JobVertexes

          This PR allows users to provide (alternative) hashes for operators in a StreamGraph. This can make migration between Flink versions easier in case the automatically produced hashes are incompatible between versions. For example, users can copy the old hashes from the web UI into their job.

          You can merge this pull request into a Git repository by running:

          $ git pull https://github.com/StefanRRichter/flink UserProvidedHash

          Alternatively you can review and apply these changes as the patch at:

          https://github.com/apache/flink/pull/3117.patch

          To close this pull request, make a commit to your master/trunk branch
          with (at least) the following in the commit message:

          This closes #3117


          commit 96ef2c041bfe33462600d72b8ec1472f53c852f2
          Author: Stefan Richter <s.richter@data-artisans.com>
          Date: 2017-01-12T17:57:52Z

          FLINK-5480 Introduce user-provided hash for JobVertexes


          githubbot ASF GitHub Bot added a comment -

          Github user StefanRRichter commented on the issue:

          https://github.com/apache/flink/pull/3117

          cc @uce

          githubbot ASF GitHub Bot added a comment -

          Github user zentol commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3117#discussion_r96220999

          — Diff: flink-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraSink.java —
          @@ -95,6 +95,28 @@ private CassandraSink(SingleOutputStreamOperator<IN> sink) {
          }

          /**
          + * Sets an additional, user provided hash for this operator.
          + *
          + * <p/>
          + * <p>The user provided hash is an alternative to the generated hashes, that is considered when identifying an
          + * operator through the default hash mechanics fails (e.g. because of changes between Flink versions.
          — End diff –

          missing closing brackets.

          githubbot ASF GitHub Bot added a comment -

          Github user zentol commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3117#discussion_r96222455

          — Diff: flink-streaming-java/src/test/java/org/apache/flink/streaming/graph/StreamingJobGraphGeneratorNodeHashTest.java —
          @@ -422,6 +425,43 @@ public void testManualHashAssignmentForStartNodeInInChain() throws Exception {
              env.getStreamGraph().getJobGraph();
          }

          + @Test
          + public void testUserProvidedHashing() {
          + StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironment();
          +
          + List<String> userHashes = Arrays.asList("aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa", "bbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbb");
          +
          + env.addSource(new NoOpSourceFunction(), "src").provideAdditionalNodeHash(userHashes.get(0))
          + .map(new NoOpMapFunction())
          + .filter(new NoOpFilterFunction())
          + .keyBy(new NoOpKeySelector())
          + .reduce(new NoOpReduceFunction()).name("reduce").provideAdditionalNodeHash(userHashes.get(1));
          +
          + StreamGraph streamGraph = env.getStreamGraph();
          + int idx = 1;
          + for (JobVertex jobVertex : streamGraph.getJobGraph().getVertices()) {
          +     Assert.assertEquals(jobVertex.getIdAlternatives().get(1).toString(), userHashes.get(idx));
          +     --idx;
          + }
          + }
          +
          + @Test
          + public void testUserProvidedHashingOnChainNotSupported() {
          + StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironment();
          +
          + env.addSource(new NoOpSourceFunction(), "src").provideAdditionalNodeHash("aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa")
          + .map(new NoOpMapFunction()).provideAdditionalNodeHash("bbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbb")
          — End diff –

          The fact that this fails the job should probably be documented in the Javadocs of ```provideAdditionalNodeHash```.

          githubbot ASF GitHub Bot added a comment -

          Github user zentol commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3117#discussion_r96222946

          — Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraphUserHashHasher.java —
          @@ -0,0 +1,74 @@
          +/*
          + * Licensed to the Apache Software Foundation (ASF) under one
          + * or more contributor license agreements. See the NOTICE file
          + * distributed with this work for additional information
          + * regarding copyright ownership. The ASF licenses this file
          + * to you under the Apache License, Version 2.0 (the
          + * "License"); you may not use this file except in compliance
          + * with the License. You may obtain a copy of the License at
          + *
          + * http://www.apache.org/licenses/LICENSE-2.0
          + *
          + * Unless required by applicable law or agreed to in writing, software
          + * distributed under the License is distributed on an "AS IS" BASIS,
          + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
          + * See the License for the specific language governing permissions and
          + * limitations under the License.
          + */
          +
          +package org.apache.flink.streaming.api.graph;
          +
          +import org.apache.flink.streaming.api.operators.ChainingStrategy;
          +import org.apache.flink.streaming.api.operators.StreamOperator;
          +import org.apache.flink.streaming.runtime.partitioner.ForwardPartitioner;
          +import org.apache.flink.util.StringUtils;
          +
          +import java.util.HashMap;
          +import java.util.Map;
          +
          +/**
          + * StreamGraphHasher that works with user provided hashes.
          + */
          +public class StreamGraphUserHashHasher implements StreamGraphHasher {
          + @Override
          + public Map<Integer, byte[]> traverseStreamGraphAndGenerateHashes(StreamGraph streamGraph) {
          + HashMap<Integer, byte[]> hashResult = new HashMap<>();
          + for (StreamNode streamNode : streamGraph.getStreamNodes()) {
          + String userHash = streamNode.getUserHash();
          + if (null != userHash) {
          +
          + for (StreamEdge inEdge : streamNode.getInEdges()) {
          + if (isChainable(inEdge, streamGraph.isChainingEnabled())) {
          +     throw new UnsupportedOperationException("Cannot assign user-specified hash " +
          +         "to intermediate node in chain. This will be supported in future " +
          +         "versions of Flink. As a work around start new chain at task " +
          +         streamNode.getOperatorName() + ".");
          + }
          + }
          +
          + hashResult.put(streamNode.getId(), StringUtils.hexStringToByte(userHash));
          + }
          + }
          +
          + return hashResult;
          + }
          +
          + private boolean isChainable(StreamEdge edge, boolean isChainingEnabled) {
          — End diff –

          Can't we reuse the ```isChainable``` method in the ```StreamJobGraphGenerator``` instead? Otherwise we risk these conditions going out of sync.
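
          For illustration, a hedged sketch of the shared helper this suggestion points at; the class name is hypothetical and the condition simply mirrors the chaining check quoted elsewhere in this PR, so both callers stay in sync:

          ```java
          package org.apache.flink.streaming.api.graph;

          import org.apache.flink.streaming.api.operators.ChainingStrategy;
          import org.apache.flink.streaming.api.operators.StreamOperator;
          import org.apache.flink.streaming.runtime.partitioner.ForwardPartitioner;

          // Hypothetical single home for the chaining condition, called by both
          // StreamingJobGraphGenerator and StreamGraphUserHashHasher.
          final class ChainingPredicate {

              private ChainingPredicate() {
              }

              static boolean isChainable(StreamEdge edge, boolean chainingEnabled) {
                  StreamNode upStreamVertex = edge.getSourceVertex();
                  StreamNode downStreamVertex = edge.getTargetVertex();

                  StreamOperator<?> headOperator = upStreamVertex.getOperator();
                  StreamOperator<?> outOperator = downStreamVertex.getOperator();

                  return downStreamVertex.getInEdges().size() == 1
                      && outOperator != null
                      && headOperator != null
                      && upStreamVertex.isSameSlotSharingGroup(downStreamVertex)
                      && outOperator.getChainingStrategy() == ChainingStrategy.ALWAYS
                      && (headOperator.getChainingStrategy() == ChainingStrategy.HEAD
                          || headOperator.getChainingStrategy() == ChainingStrategy.ALWAYS)
                      && (edge.getPartitioner() instanceof ForwardPartitioner)
                      && upStreamVertex.getParallelism() == downStreamVertex.getParallelism()
                      && chainingEnabled;
              }
          }
          ```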

          githubbot ASF GitHub Bot added a comment -

          Github user uce commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3117#discussion_r96229201

          — Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/transformations/StreamTransformation.java —
          @@ -205,6 +209,31 @@ public void setMaxParallelism(int maxParallelism) {
          }

          /**
          + * Sets an additional, user provided hash for this operator.
          — End diff –

          Above comments apply to these as well.

          githubbot ASF GitHub Bot added a comment -

          Github user uce commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3117#discussion_r96227219

          — Diff: flink-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraSink.java —
          @@ -95,6 +95,28 @@ private CassandraSink(SingleOutputStreamOperator<IN> sink) {
          }

          /**
          + * Sets an additional, user provided hash for this operator.
          + *
          + * <p/>
          + * <p>The user provided hash is an alternative to the generated hashes, that is considered when identifying an
          + * operator through the default hash mechanics fails (e.g. because of changes between Flink versions.
          + *
          + * <p><strong>Important</strong>: this hash needs to be unique per transformation and job. Otherwise, job
          + * submission will fail.
          + *
          + * @param hash the user provided hash for this operator.
          + * @return The operator with the user provided hash.
          + */
          + public CassandraSink<IN> provideAdditionalNodeHash(String hash) {
          — End diff –

          I think the `provide` prefix gives the impression that it is possible to provide multiple additional node hashes. I would replace this with `set`. What do you think?

          githubbot ASF GitHub Bot added a comment -

          Github user uce commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3117#discussion_r96229220

          — Diff: flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/DataStream.scala —
          @@ -201,6 +201,28 @@ class DataStream[T](stream: JavaStream[T]) {
          }

          /**
          + * Sets an additional, user provided hash for this operator.
          — End diff –

          Above comments apply to these as well.

          githubbot ASF GitHub Bot added a comment -

          Github user uce commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3117#discussion_r96227741

          — Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraphUserHashHasher.java —
          @@ -0,0 +1,74 @@
          +/*
          + * Licensed to the Apache Software Foundation (ASF) under one
          + * or more contributor license agreements. See the NOTICE file
          + * distributed with this work for additional information
          + * regarding copyright ownership. The ASF licenses this file
          + * to you under the Apache License, Version 2.0 (the
          + * "License"); you may not use this file except in compliance
          + * with the License. You may obtain a copy of the License at
          + *
          + * http://www.apache.org/licenses/LICENSE-2.0
          + *
          + * Unless required by applicable law or agreed to in writing, software
          + * distributed under the License is distributed on an "AS IS" BASIS,
          + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
          + * See the License for the specific language governing permissions and
          + * limitations under the License.
          + */
          +
          +package org.apache.flink.streaming.api.graph;
          +
          +import org.apache.flink.streaming.api.operators.ChainingStrategy;
          +import org.apache.flink.streaming.api.operators.StreamOperator;
          +import org.apache.flink.streaming.runtime.partitioner.ForwardPartitioner;
          +import org.apache.flink.util.StringUtils;
          +
          +import java.util.HashMap;
          +import java.util.Map;
          +
          +/**
          + * StreamGraphHasher that works with user provided hashes.
          + */
          +public class StreamGraphUserHashHasher implements StreamGraphHasher {
          + @Override
          — End diff –

          empty lines above missing

          githubbot ASF GitHub Bot added a comment -

          Github user uce commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3117#discussion_r96228294

          — Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamNode.java —
          @@ -272,12 +273,20 @@ public void setStateKeySerializer(TypeSerializer<?> stateKeySerializer) {
              this.stateKeySerializer = stateKeySerializer;
          }

          - public String getTransformationId() {
          -     return transformationId;
          + public String getTransformationUID() {
          +     return transformationUID;
          }

          - void setTransformationId(String transformationId) {
          -     this.transformationId = transformationId;
          + void setTransformationUID(String transformationId) {
          +     this.transformationUID = transformationId;
          + }
          +
          + public String getUserHash() {
          +     return userHash;
          + }
          +
          + public void setUserHash(String userHash) {
          +     this.userHash = userHash;
          — End diff –

          Since this is executed on the client before submitting the job, it might be helpful to do some early sanity checking here. The expected String is a hex representation of a JobVertexID.
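
          For illustration, a minimal sketch of what such a client-side check could look like on the setter; the exact validation and error message are assumptions:

          ```java
          public void setUserHash(String userHash) {
              // Early sanity check on the client: the hash must be the 32-character hex
              // representation of a JobVertexID, so fail fast before job submission.
              if (userHash == null || !userHash.matches("[0-9a-fA-F]{32}")) {
                  throw new IllegalArgumentException(
                      "User-provided hash is not a valid JobVertexID hex string: " + userHash);
              }
              this.userHash = userHash;
          }
          ```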

          githubbot ASF GitHub Bot added a comment -

          Github user uce commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3117#discussion_r96227773

          — Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraphUserHashHasher.java —
          @@ -0,0 +1,74 @@
          +/*
          + * Licensed to the Apache Software Foundation (ASF) under one
          + * or more contributor license agreements. See the NOTICE file
          + * distributed with this work for additional information
          + * regarding copyright ownership. The ASF licenses this file
          + * to you under the Apache License, Version 2.0 (the
          + * "License"); you may not use this file except in compliance
          + * with the License. You may obtain a copy of the License at
          + *
          + * http://www.apache.org/licenses/LICENSE-2.0
          + *
          + * Unless required by applicable law or agreed to in writing, software
          + * distributed under the License is distributed on an "AS IS" BASIS,
          + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
          + * See the License for the specific language governing permissions and
          + * limitations under the License.
          + */
          +
          +package org.apache.flink.streaming.api.graph;
          +
          +import org.apache.flink.streaming.api.operators.ChainingStrategy;
          +import org.apache.flink.streaming.api.operators.StreamOperator;
          +import org.apache.flink.streaming.runtime.partitioner.ForwardPartitioner;
          +import org.apache.flink.util.StringUtils;
          +
          +import java.util.HashMap;
          +import java.util.Map;
          +
          +/**
          + * StreamGraphHasher that works with user provided hashes.
          + */
          +public class StreamGraphUserHashHasher implements StreamGraphHasher {
          + @Override
          + public Map<Integer, byte[]> traverseStreamGraphAndGenerateHashes(StreamGraph streamGraph) {
          + HashMap<Integer, byte[]> hashResult = new HashMap<>();
          + for (StreamNode streamNode : streamGraph.getStreamNodes()) {
          + String userHash = streamNode.getUserHash();
          + if (null != userHash) {
          +
          — End diff –

          empty line

          githubbot ASF GitHub Bot added a comment -

          Github user uce commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3117#discussion_r96227285

          — Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/DataStreamSink.java —
          @@ -77,6 +77,25 @@ protected DataStreamSink(DataStream<T> inputStream, StreamSink<T> operator) {
          }

          /**
          + * Sets an additional, user provided hash for this operator.
          — End diff –

          Above comments apply to these as well.

          githubbot ASF GitHub Bot added a comment -

          Github user uce commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3117#discussion_r96227360

          — Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/SingleOutputStreamOperator.java —
          @@ -87,6 +87,26 @@ public String getName() {
              return this;
          }

          +
          + /**
          + * Sets an additional, user provided hash for this operator.
          — End diff –

          Above comments apply to these as well.

          githubbot ASF GitHub Bot added a comment -

          Github user uce commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3117#discussion_r96229428

          — Diff: flink-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraSink.java —
          @@ -95,6 +95,28 @@ private CassandraSink(SingleOutputStreamOperator<IN> sink) {
          }

          /**
          + * Sets an additional, user provided hash for this operator.
          + *
          + * <p/>
          + * <p>The user provided hash is an alternative to the generated hashes, that is considered when identifying an
          + * operator through the default hash mechanics fails (e.g. because of changes between Flink versions.
          + *
          + * <p><strong>Important</strong>: this hash needs to be unique per transformation and job. Otherwise, job
          + * submission will fail.
          + *
          + * @param hash the user provided hash for this operator.
          + * @return The operator with the user provided hash.
          + */
          — End diff –

          We don't annotate classes outside of some projects, but should we add the `@PublicEvolving` annotation here as well (it is also missing for `uid` in this class)?

          githubbot ASF GitHub Bot added a comment -

          Github user uce commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3117#discussion_r96227716

          — Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraphUserHashHasher.java —
          @@ -0,0 +1,74 @@
          +/*
          + * Licensed to the Apache Software Foundation (ASF) under one
          + * or more contributor license agreements. See the NOTICE file
          + * distributed with this work for additional information
          + * regarding copyright ownership. The ASF licenses this file
          + * to you under the Apache License, Version 2.0 (the
          + * "License"); you may not use this file except in compliance
          + * with the License. You may obtain a copy of the License at
          + *
          + * http://www.apache.org/licenses/LICENSE-2.0
          + *
          + * Unless required by applicable law or agreed to in writing, software
          + * distributed under the License is distributed on an "AS IS" BASIS,
          + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
          + * See the License for the specific language governing permissions and
          + * limitations under the License.
          + */
          +
          +package org.apache.flink.streaming.api.graph;
          +
          +import org.apache.flink.streaming.api.operators.ChainingStrategy;
          +import org.apache.flink.streaming.api.operators.StreamOperator;
          +import org.apache.flink.streaming.runtime.partitioner.ForwardPartitioner;
          +import org.apache.flink.util.StringUtils;
          +
          +import java.util.HashMap;
          +import java.util.Map;
          +
          +/**
          + * StreamGraphHasher that works with user provided hashes.
          — End diff –

          I think we should provide a more detailed description about why we need this etc.

          githubbot ASF GitHub Bot added a comment -

          Github user uce commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3117#discussion_r96226989

          — Diff: flink-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraSink.java —
          @@ -95,6 +95,28 @@ private CassandraSink(SingleOutputStreamOperator<IN> sink) {
          }

          /**
          + * Sets an additional, user provided hash for this operator.
          + *
          + * <p/>
          — End diff –

          Not needed

          githubbot ASF GitHub Bot added a comment -

          Github user uce commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3117#discussion_r96227062

          — Diff: flink-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraSink.java —
          @@ -95,6 +95,28 @@ private CassandraSink(SingleOutputStreamOperator<IN> sink) {
          }

          /**
          + * Sets an additional, user provided hash for this operator.
          + *
          + * <p/>
          + * <p>The user provided hash is an alternative to the generated hashes, that is considered when identifying an
          + * operator through the default hash mechanics fails (e.g. because of changes between Flink versions.
          + *
          + * <p><strong>Important</strong>: this hash needs to be unique per transformation and job. Otherwise, job
          — End diff –

          I think this is not true for the additional hash?

          githubbot ASF GitHub Bot added a comment -

          Github user StefanRRichter commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3117#discussion_r96630784

          — Diff: flink-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraSink.java —
          @@ -95,6 +95,28 @@ private CassandraSink(SingleOutputStreamOperator<IN> sink) {
          }

          /**
          + * Sets an additional, user provided hash for this operator.
          + *
          + * <p/>
          + * <p>The user provided hash is an alternative to the generated hashes, that is considered when identifying an
          + * operator through the default hash mechanics fails (e.g. because of changes between Flink versions.
          + *
          + * <p><strong>Important</strong>: this hash needs to be unique per transformation and job. Otherwise, job
          — End diff –

          If it is not true, then how can state be assigned without ambiguities if the hash is used as basis for JobVertexID?

          githubbot ASF GitHub Bot added a comment -

          Github user StefanRRichter commented on the issue:

          https://github.com/apache/flink/pull/3117

          Thanks a lot for the reviews @zentol and @uce. I made another commit that addresses the review comments.

          githubbot ASF GitHub Bot added a comment -

          Github user zentol commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3117#discussion_r96640591

          — Diff: flink-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraSink.java —
          @@ -96,18 +98,19 @@ private CassandraSink(SingleOutputStreamOperator<IN> sink) {

          /**
          - * Sets an additional, user provided hash for this operator.
          - *
          - * <p/>
          - * <p>The user provided hash is an alternative to the generated hashes, that is considered when identifying an
          - * operator through the default hash mechanics fails (e.g. because of changes between Flink versions.
          — End diff –

          Still missing closing parenthesis (also in other copies of this javadoc)

          githubbot ASF GitHub Bot added a comment -

          Github user zentol commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3117#discussion_r96640105

          — Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraphUserHashHasher.java —
          @@ -18,27 +18,29 @@

          package org.apache.flink.streaming.api.graph;

          -import org.apache.flink.streaming.api.operators.ChainingStrategy;
          -import org.apache.flink.streaming.api.operators.StreamOperator;
          -import org.apache.flink.streaming.runtime.partitioner.ForwardPartitioner;
          import org.apache.flink.util.StringUtils;

          import java.util.HashMap;
          import java.util.Map;

          /**
          - * StreamGraphHasher that works with user provided hashes.
          + * StreamGraphHasher that works with user provided hashes. This us useful in case we want to set (alternative) hashes
          — End diff –

          This us useful -> This is useful

          githubbot ASF GitHub Bot added a comment -

          Github user StefanRRichter commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3117#discussion_r96651445

          — Diff: flink-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraSink.java —
          @@ -96,18 +98,19 @@ private CassandraSink(SingleOutputStreamOperator<IN> sink) {

          /**
          - * Sets an additional, user provided hash for this operator.
          - *
          - * <p/>
          - * <p>The user provided hash is an alternative to the generated hashes, that is considered when identifying an
          - * operator through the default hash mechanics fails (e.g. because of changes between Flink versions.
          — End diff –

          Ok, I misunderstood which closing bracket we were talking about. Fixed those typos.

          githubbot ASF GitHub Bot added a comment -

          Github user uce commented on the issue:

          https://github.com/apache/flink/pull/3117

          Thanks for addressing the comments.

          I had a quick offline chat with Stephan regarding the naming of `setAdditionalNodeHash`. He also agrees that the name leaks internal implementation details into the user API. Furthermore, he raised the point that there is no way for the user to see the relation between `uid` and `setAdditionalNodeHash`.

          He proposed the following, which I liked.

          • Rename `setAdditionalNodeHash` to `setUidHash`. Add a big fat comment that this should only be used as a workaround or for troubleshooting.
          • Add a comment to `uid` saying that the given String will be hashed to the ID that corresponds to the JobVertexID in the logs/web UI, so users know what's happening.
          • Make clear in both Javadocs how the methods relate to each other.

          What do you think?
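
          For illustration, a hedged sketch of how the renamed method and its Javadoc could spell out the relation to `uid`; the wording, the `@PublicEvolving` annotation, and the delegation to the transformation are assumptions modeled on the surrounding discussion:

          ```java
          /**
           * Sets a user-provided hash that is used as the JobVertexID for this operator when
           * matching operator state, as an alternative to the automatically generated hashes.
           *
           * <p><strong>Important:</strong> this is a low-level workaround, intended for
           * troubleshooting and for migrating jobs whose auto-generated hashes can no longer
           * be reproduced. For new jobs, prefer {@link #uid(String)}, whose value is hashed
           * into the JobVertexID shown in the logs and the web UI.
           *
           * @param uidHash hex string (32 characters) of the JobVertexID to assume
           * @return the operator with the user-provided hash set
           */
          @PublicEvolving
          public SingleOutputStreamOperator<T> setUidHash(String uidHash) {
              transformation.setUidHash(uidHash);
              return this;
          }
          ```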

          githubbot ASF GitHub Bot added a comment -

          Github user StefanRRichter commented on the issue:

          https://github.com/apache/flink/pull/3117

          Ok, I changed method names and commented as proposed.

          githubbot ASF GitHub Bot added a comment -

          Github user uce commented on the issue:

          https://github.com/apache/flink/pull/3117

          Thanks! Looks good. I just tested this with a job where I did not set any UIDs. After changing it in an incompatible way (with respect to the auto-generated IDs), I was able to restore my state with `setUidHash`. I further played around with it by adding a completely new operator, which also restored the state of an existing operator in addition to the regular one. I think this will be very powerful once we provide some less low-level plumbing around it. The biggest pain point right now (independent of this PR) is figuring out the old IDs.

          I will go ahead and merge this for `master` and `release-1.2`.

          githubbot ASF GitHub Bot added a comment -

          Github user StefanRRichter commented on the issue:

          https://github.com/apache/flink/pull/3117

          Thanks a lot for the review and such intensive testing, @uce!

          githubbot ASF GitHub Bot added a comment -

          Github user asfgit closed the pull request at:

          https://github.com/apache/flink/pull/3117

          aljoscha Aljoscha Krettek added a comment -

          I think this one can be closed, right Ufuk Celebi and Stefan Richter?

          srichter Stefan Richter added a comment -

          From my side yes.

          uce Ufuk Celebi added a comment -

          Fixed in 35da413 (release-1.2), 0de2bc3 (master).


            People

            • Assignee: srichter Stefan Richter
            • Reporter: srichter Stefan Richter
            • Votes: 0
            • Watchers: 4
