FLINK-6883

Serializers for collections of Scala case classes are generated with different anonymous class names in 1.3

    Details

      Description

      In the Scala API, serializers are generated using Scala macros (via the org.apache.flink.streaming.api.scala.createTypeInformation(..) util).
      The generated serializers are inner anonymous classes, so their class names differ depending on when and in what order the serializers are generated.

      From Flink 1.1 / 1.2 to Flink 1.3, the generated class name for a serializer of a collection of case classes (e.g. List[SomeUserCaseClass]) changes. In other words, compiling the exact same user code written in the Scala API with 1.1 / 1.2 and with 1.3 results in different class names.
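
      To make the symptom concrete, here is a minimal sketch (the case class, the demo object name, and the printed suffix are illustrative, not taken from an actual run) showing that the serializer obtained through the macro is an anonymous generated class whose name is compiler-dependent:

      import org.apache.flink.api.common.ExecutionConfig
      import org.apache.flink.streaming.api.scala._

      case class SomeUserCaseClass(id: Long, name: String)

      object SerializerNameDemo {
        def main(args: Array[String]): Unit = {
          // createTypeInformation is the macro mentioned above; expanding it here
          // generates an anonymous serializer class for List[SomeUserCaseClass].
          val typeInfo = createTypeInformation[List[SomeUserCaseClass]]
          val serializer = typeInfo.createSerializer(new ExecutionConfig)
          // Prints a compiler-generated name such as "SerializerNameDemo$$anon$2$$anon$1";
          // the exact suffix depends on the Flink version and on macro expansion order.
          println(serializer.getClass.getName)
        }
      }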

      This is problematic for restoring older savepoints that have Scala case class collections in their state, because the old serializer cannot be recovered (due to the generated classname change).

      For now, I've identified the root cause: in 1.3 the TypeSerializer base class additionally extends the TypeDeserializer interface. Removing this extension resolves the problem. The exact reason why this affects the generated class names is still being investigated.

        Issue Links

          Activity

          githubbot ASF GitHub Bot added a comment -

          GitHub user tzulitai opened a pull request:

          https://github.com/apache/flink/pull/4103

          FLINK-6883 [scala, state] Fix restore of Scala type states

          This PR is based on #4090. Together with #4090 as a whole, this PR fixes restoring savepoints of Scala type states.

            What this fixes

          This PR fixes the problem that the exact same job code, written with the Flink Scala API and using Scala types as state, generates different class names for the anonymous serializer classes of case classes / collection types when compiled against a pre-1.3 version and against Flink 1.3.

          The root cause is that, prior to this PR, the `TypeSerializer` base class in 1.3 additionally implements the `TypeDeserializer` interface. This alters the Scala compiler's generation order of the anonymous serializer classes and therefore results in different generated names.

          To fix this, the `TypeSerializer` base now no longer implements `TypeDeserializer`, while not affecting any user-facing interfaces of the serializer compatibility functionality (i.e. `CompatibilityResult`, `TypeSerializer#snapshotConfiguration`, `TypeSerializer#ensureCompatibility` interfaces are not broken).

          With this fix, we can at least guarantee that Scala jobs with Scala type states can be restored across Flink major versions, as long as 1) the same compiler is used, and 2) the user code remains untouched (the invocation order of the Scala `createTypeInformation` macro remains the same).

            Tests

          This PR also adds tests to guard against future problems like this. Includes:
          1. A `ScalaSerializersMigrationTest` to guard against different generated classnames for anonymous serializers across changes to the codebase. The tested classnames are what they were in Flink 1.1 and 1.2.
          2. A `scala.StatefulJobSavepointITCase` to test end-to-end migration from 1.2.x / 1.3.x for Scala jobs. The 1.2 savepoints were generated under the `release-1.2` branch.

          You can merge this pull request into a Git repository by running:

          $ git pull https://github.com/tzulitai/flink FLINK-6883

          Alternatively you can review and apply these changes as the patch at:

          https://github.com/apache/flink/pull/4103.patch

          To close this pull request, make a commit to your master/trunk branch
          with (at least) the following in the commit message:

          This closes #4103


          commit c9696c20d61ecba26fc19b4a7cdbb16586d30894
          Author: Tzu-Li (Gordon) Tai <tzulitai@apache.org>
          Date: 2017-06-08T06:52:04Z

          [hotfix] [scala] Fix instantiation of Scala serializers' config snapshot classes

          Prior to this commit, the configuration snapshot classes of Scala
          serializers did not have the proper default empty constructor that is
          used for deserializing the configuration snapshot.

          Since some Scala serializers' config snapshots extend the Java
          CompositeTypeSerializerConfigSnapshot, their config snapshot classes were
          also changed to be implemented in Java, because in Scala we can only call a
          single base class constructor from subclasses.
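
          For illustration, a minimal Scala sketch of the constructor limitation referenced above, with made-up class names:

          class Base(val tag: String) {
            def this() = this("default")   // secondary constructor on the base class
          }

          // In Scala only the primary constructor can invoke a superclass constructor;
          // every auxiliary constructor of Sub must delegate to it, so Sub cannot offer
          // one constructor calling Base(tag) and another calling the no-arg Base().
          class Sub(tag: String) extends Base(tag) {
            def this() = this("fixed")
          }

          Java subclasses, by contrast, can define several constructors that each call a different superclass constructor, which is why the config snapshot classes were moved to Java.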

          commit 7e20e6251385e04334ec0f06dbaa5f1f0315b530
          Author: Tzu-Li (Gordon) Tai <tzulitai@apache.org>
          Date: 2017-06-08T13:29:45Z

          FLINK-6869 [scala] Specify serialVersionUID for all Scala serializers

          Previously, Scala serializers did not specify a serialVersionUID, which
          prohibited restoring snapshots from previous Flink versions because the
          serializers' implementations had changed.

          The serialVersionUIDs added in this commit are identical to what they
          were (as generated by Java) in Flink 1.2, so that we can at least
          restore state that was written with the Scala serializers as of 1.2.
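
          For illustration, the mechanism referred to here is Scala's @SerialVersionUID annotation; a minimal sketch with a made-up class name and UID value:

          @SerialVersionUID(-1954826400299848429L)  // illustrative value, not a real one
          class ExampleScalaSerializer extends Serializable

          Pinning the annotation keeps the UID identical to the one Java serialization had generated for the 1.2 class shape, so older snapshots still deserialize.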

          commit bdcf354fa7416f6e1ea1251433b4d97292b219c6
          Author: Tzu-Li (Gordon) Tai <tzulitai@apache.org>
          Date: 2017-06-10T20:41:35Z

          FLINK-6869 [core] Tolerate serialVersionUID mismatches for anonymous classed serializers

          This commit lets the TypeSerializerSerializationProxy tolerate
          serialVersionUID mismatches when reading serializers that are anonymous classes.

          Our Scala case class serializers require this since they are generated
          at compile time by Scala macros, and it is therefore not possible to
          fix a specific serialVersionUID for them.

          This commit also updates the streaming state docs to educate the user to
          avoid using anonymous classes for their state serializers.

          commit 31a2977c62abed0f985fdc79539b90c4152b60f1
          Author: Tzu-Li (Gordon) Tai <tzulitai@apache.org>
          Date: 2017-06-11T09:02:38Z

          [hotfix] [cep] Fix incorrect CompatibilityResult.requiresMigration calls in CEP

          commit b294637c59c27ecef58ad67a123d2cfb401f51d2
          Author: Tzu-Li (Gordon) Tai <tzulitai@apache.org>
          Date: 2017-06-11T13:30:36Z

          FLINK-6883 [core] Refactor TypeSerializer to not implement TypeDeserializer

          The separation of the TypeDeserializer interface from the TypeSerializer
          base class is due to the fact that additionally implementing the
          TypeDeserializer interface alters the generation order of anonymous
          serializer classes for Scala case classes and collections.

          Instead, the TypeDeserializer is now used as a mixin on the
          TypeDeserializerAdapter utility, which now serves as a bridge for
          both directions (i.e. TypeSerializer to TypeDeserializer, and vice
          versa). No user interfaces are broken due to this change.
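
          A simplified sketch of the bridge described above, using placeholder trait and class names rather than the real Flink signatures:

          trait SimplifiedDeserializer[T] {
            def deserialize(source: java.io.DataInput): T
          }

          abstract class SimplifiedSerializer[T] {  // no longer extends the deserializer trait
            def deserialize(source: java.io.DataInput): T
          }

          // The adapter is the mixin point: it wraps a serializer and exposes it as a
          // deserializer, so the two sides stay bridged without the serializer base
          // class extending the deserializer interface itself.
          final class SimplifiedDeserializerAdapter[T](serializer: SimplifiedSerializer[T])
            extends SimplifiedDeserializer[T] {
            override def deserialize(source: java.io.DataInput): T =
              serializer.deserialize(source)
          }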

          commit ff0522f6c7f0d8f645736f2769dca90de50179ae
          Author: Tzu-Li (Gordon) Tai <tzulitai@apache.org>
          Date: 2017-06-11T13:31:42Z

          FLINK-6883 [tests] Add migration tests for Scala jobs

          This commit adds migration ITCases for jobs written using the Scala API.
          An extra concern for migration of Scala jobs is that Scala case classes
          and collections use anonymous generated serializers, which may affect
          state restore.


          githubbot ASF GitHub Bot added a comment -

          Github user aljoscha commented on a diff in the pull request:

          https://github.com/apache/flink/pull/4103#discussion_r121388886

          — Diff: flink-tests/src/test/scala/org/apache/flink/api/scala/migration/StatefulJobSavepointITCase.scala —
          @@ -0,0 +1,298 @@
          +/*
          + * Licensed to the Apache Software Foundation (ASF) under one
          + * or more contributor license agreements. See the NOTICE file
          + * distributed with this work for additional information
          + * regarding copyright ownership. The ASF licenses this file
          + * to you under the Apache License, Version 2.0 (the
          + * "License"); you may not use this file except in compliance
          + * with the License. You may obtain a copy of the License at
          + *
          + * http://www.apache.org/licenses/LICENSE-2.0
          + *
          + * Unless required by applicable law or agreed to in writing, software
          + * distributed under the License is distributed on an "AS IS" BASIS,
          + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
          + * See the License for the specific language governing permissions and
          + * limitations under the License.
          + */
          +
          +package org.apache.flink.api.scala.migration
          +
          +import java.util
          +
          +import org.apache.flink.api.common.accumulators.IntCounter
          +import org.apache.flink.api.common.functions.RichFlatMapFunction
          +import org.apache.flink.api.common.state.{ListState, ListStateDescriptor, ValueState, ValueStateDescriptor}
          +import org.apache.flink.api.java.functions.KeySelector
          +import org.apache.flink.configuration.Configuration
          +import org.apache.flink.runtime.state.memory.MemoryStateBackend
          +import org.apache.flink.streaming.api.TimeCharacteristic
          +import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction
          +import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment
          +import org.apache.flink.streaming.api.functions.sink.RichSinkFunction
          +import org.apache.flink.streaming.api.functions.source.SourceFunction
          +import org.apache.flink.streaming.api.watermark.Watermark
          +import org.apache.flink.test.checkpointing.utils.SavepointMigrationTestBase
          +import org.apache.flink.util.Collector
          +import org.apache.flink.api.java.tuple.Tuple2
          +import org.apache.flink.runtime.state.{AbstractStateBackend, FunctionInitializationContext, FunctionSnapshotContext}
          +import org.apache.flink.api.scala._
          +import org.apache.flink.api.scala.migration.CustomEnum.CustomEnum
          +import org.apache.flink.contrib.streaming.state.RocksDBStateBackend
          +import org.apache.flink.streaming.util.migration.MigrationVersion
          +import org.junit.runner.RunWith
          +import org.junit.runners.Parameterized
          +import org.junit.{Ignore, Test}
          +
          +import scala.util.{Failure, Try}
          +
          +object StatefulJobSavepointITCase {
          +
          +  @Parameterized.Parameters(name = "Migrate Savepoint / Backend: {0}")
          +  def parameters: util.Collection[(MigrationVersion, String)] = {
          +    util.Arrays.asList(
          +      (MigrationVersion.v1_2, AbstractStateBackend.MEMORY_STATE_BACKEND_NAME),
          +      (MigrationVersion.v1_2, AbstractStateBackend.ROCKSDB_STATE_BACKEND_NAME),
          +      (MigrationVersion.v1_3, AbstractStateBackend.MEMORY_STATE_BACKEND_NAME),
          +      (MigrationVersion.v1_3, AbstractStateBackend.ROCKSDB_STATE_BACKEND_NAME))
          +  }
          +
          +  // TODO to generate savepoints for a specific Flink version / backend type,
          +  // TODO change these values accordingly, e.g. to generate for 1.3 with RocksDB,
          +  // TODO set as (MigrationVersion.v1_3, AbstractStateBackend.ROCKSDB_STATE_BACKEND_NAME)
          +  val GENERATE_SAVEPOINT_VER: MigrationVersion = null
          +  val GENERATE_SAVEPOINT_BACKEND_TYPE: String = ""
          +
          +  val NUM_ELEMENTS = 4
          +}
          +
          +/**
          + * ITCase for migration Scala state types across different Flink versions.
          + */
          +@RunWith(classOf[Parameterized])
          +class StatefulJobSavepointITCase(
          +    migrationVersionAndBackend: (MigrationVersion, String))
          +  extends SavepointMigrationTestBase with Serializable {
          +
          +  @Ignore
          +  @Test
          +  def testCreateSavepoint(): Unit = {
          +    val env = StreamExecutionEnvironment.getExecutionEnvironment
          +    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
          +
          +    StatefulJobSavepointITCase.GENERATE_SAVEPOINT_BACKEND_TYPE match {
          +      case AbstractStateBackend.ROCKSDB_STATE_BACKEND_NAME =>
          +        env.setStateBackend(new RocksDBStateBackend(new MemoryStateBackend()))
          +      case AbstractStateBackend.MEMORY_STATE_BACKEND_NAME =>
          +        env.setStateBackend(new MemoryStateBackend())
          +      case _ => throw new UnsupportedOperationException
          +    }
          +
          +    env.setStateBackend(new MemoryStateBackend)
          +    env.enableCheckpointing(500)
          +    env.setParallelism(4)
          +    env.setMaxParallelism(4)
          +
          +    env
          +      .addSource(
          +        new CheckpointedSource(4)).setMaxParallelism(1).uid("checkpointedSource")
          +      .keyBy(
          +        new KeySelector[(Long, Long), Long] {
          +          override def getKey(value: (Long, Long)): Long = value._1
          +        }
          +      )
          +      .flatMap(new StatefulFlatMapper)
          +      .addSink(new AccumulatorCountingSink)
          +
          +    executeAndSavepoint(
          +      env,
          +      "src/test/resources/stateful-scala-udf-migration-itcase-flink"
          +        + StatefulJobSavepointITCase.GENERATE_SAVEPOINT_VER + "-"
          +        + StatefulJobSavepointITCase.GENERATE_SAVEPOINT_BACKEND_TYPE + "-savepoint",
          +      new Tuple2(
          +        AccumulatorCountingSink.NUM_ELEMENTS_ACCUMULATOR,
          +        StatefulJobSavepointITCase.NUM_ELEMENTS
          +      )
          +    )
          +  }
          +
          +  @Test
          +  def testRestoreSavepoint(): Unit = {
          +    val env = StreamExecutionEnvironment.getExecutionEnvironment
          +    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
          +
          +    migrationVersionAndBackend._2 match {
          +      case AbstractStateBackend.ROCKSDB_STATE_BACKEND_NAME =>
          +        env.setStateBackend(new RocksDBStateBackend(new MemoryStateBackend()))
          +      case AbstractStateBackend.MEMORY_STATE_BACKEND_NAME =>
          +        env.setStateBackend(new MemoryStateBackend())
          +      case _ => throw new UnsupportedOperationException
          +    }
          +
          +    env.setStateBackend(new MemoryStateBackend)
          +    env.enableCheckpointing(500)
          +    env.setParallelism(4)
          +    env.setMaxParallelism(4)
          +
          +    env
          +      .addSource(
          +        new CheckpointedSource(4)).setMaxParallelism(1).uid("checkpointedSource")
          +      .keyBy(
          +        new KeySelector[(Long, Long), Long] {
          +          override def getKey(value: (Long, Long)): Long = value._1
          +        }
          +      )
          +      .flatMap(new StatefulFlatMapper)
          +      .addSink(new AccumulatorCountingSink)
          +
          +    restoreAndExecute(
          +      env,
          +      SavepointMigrationTestBase.getResourceFilename(
          +        "stateful-scala-udf-migration-itcase-flink"
          +          + migrationVersionAndBackend._1 + "-"
          +          + migrationVersionAndBackend._2 + "-savepoint"),
          +      new Tuple2(AccumulatorCountingSink.NUM_ELEMENTS_ACCUMULATOR, 4)
          +    )
          +  }
          +
          +  @SerialVersionUID(1L)
          +  private object CheckpointedSource {
          +    var CHECKPOINTED_STRING = "Here be dragons!"
          +  }
          +
          +  @SerialVersionUID(1L)
          +  private class CheckpointedSource(val numElements: Int)
          +    extends SourceFunction[(Long, Long)] with CheckpointedFunction {
          +
          +    private var isRunning = true
          +    private var state: ListState[CustomCaseClass] = _
          +
          +    @throws[Exception]
          +    override def run(ctx: SourceFunction.SourceContext[(Long, Long)]) {
          +      ctx.emitWatermark(new Watermark(0))
          +      ctx.getCheckpointLock synchronized {
          +        var i = 0
          +        while (i < numElements) {
          +          {
          — End diff –

          Are the additional `{}` blocks needed?

          githubbot ASF GitHub Bot added a comment -

          Github user aljoscha commented on a diff in the pull request:

          https://github.com/apache/flink/pull/4103#discussion_r121386625

          — Diff: flink-core/src/main/java/org/apache/flink/api/common/typeutils/CompatibilityResult.java —
          @@ -60,10 +62,10 @@
           	 *
           	 * @return a result that signals migration is necessary, also providing a convert deserializer.
           	 */
          -	public static <T> CompatibilityResult<T> requiresMigration(TypeDeserializer<T> convertDeserializer) {
          +	public static <T> CompatibilityResult<T> requiresMigration(@Nonnull TypeDeserializer<T> convertDeserializer) {
           		Preconditions.checkNotNull(convertDeserializer, "Convert deserializer cannot be null.");
          -		return new CompatibilityResult<>(true, Preconditions.checkNotNull(convertDeserializer));
          +		return new CompatibilityResult<>(true, convertDeserializer);
          — End diff –

          Does `@Nonnull` always ensure that the argument is not null? We could simply keep the additional `Preconditions` check.

          githubbot ASF GitHub Bot added a comment -

          Github user tzulitai commented on a diff in the pull request:

          https://github.com/apache/flink/pull/4103#discussion_r121393465

          — Diff: flink-core/src/main/java/org/apache/flink/api/common/typeutils/CompatibilityResult.java —
          @@ -60,10 +62,10 @@
           	 *
           	 * @return a result that signals migration is necessary, also providing a convert deserializer.
           	 */
          -	public static <T> CompatibilityResult<T> requiresMigration(TypeDeserializer<T> convertDeserializer) {
          +	public static <T> CompatibilityResult<T> requiresMigration(@Nonnull TypeDeserializer<T> convertDeserializer) {
           		Preconditions.checkNotNull(convertDeserializer, "Convert deserializer cannot be null.");
          -		return new CompatibilityResult<>(true, Preconditions.checkNotNull(convertDeserializer));
          +		return new CompatibilityResult<>(true, convertDeserializer);
          — End diff –

          I've removed this `Preconditions.checkNotNull` because it was already done in the factory method (see L66). So this one is just redundant; the removal is not related to the `@Nonnull` annotation.

          githubbot ASF GitHub Bot added a comment -

          Github user tzulitai commented on a diff in the pull request:

          https://github.com/apache/flink/pull/4103#discussion_r121393653

          — Diff: flink-tests/src/test/scala/org/apache/flink/api/scala/migration/StatefulJobSavepointITCase.scala —
          +    @throws[Exception]
          +    override def run(ctx: SourceFunction.SourceContext[(Long, Long)]) {
          +      ctx.emitWatermark(new Watermark(0))
          +      ctx.getCheckpointLock synchronized {
          +        var i = 0
          +        while (i < numElements) {
          +          {
          — End diff –

          Ah, I pasted this part from Java code 😅 will cleanup.

          githubbot ASF GitHub Bot added a comment -

          Github user tzulitai commented on the issue:

          https://github.com/apache/flink/pull/4103

          Fixed failing migration tests when scala-2.11 is used.
          Merging once Travis gives green light!

          githubbot ASF GitHub Bot added a comment -

          Github user asfgit closed the pull request at:

          https://github.com/apache/flink/pull/4103

          tzulitai Tzu-Li (Gordon) Tai added a comment -

          Fixed for 1.3.1 via 39c8270d39684765484fa4b6b2711e5714b81b64.
          Fixed for master via 69fada3d0b4c686f29c356f00eb49039f416879f.


            People

            • Assignee:
              tzulitai Tzu-Li (Gordon) Tai
            • Reporter:
              tzulitai Tzu-Li (Gordon) Tai
            • Votes:
              0
            • Watchers:
              4
