Uploaded image for project: 'Flink'
  1. Flink
  2. FLINK-6948

EnumValueSerializer cannot handle removed enum values

    Details

    • Type: Bug
    • Status: Resolved
    • Priority: Critical
    • Resolution: Fixed
    • Affects Version/s: 1.3.1, 1.4.0
    • Fix Version/s: 1.3.1
    • Labels:
      None

      Description

      The EnumValueSerializer cannot handle removed enum values or enum values whose ordinal value have been changed. We should try to detect these cases and require migration. The problem is that we take the values of the new enum and iterate over them without checking that all old enum values contained in the config snapshot are actually contained.

        Issue Links

          Activity

          Hide
          githubbot ASF GitHub Bot added a comment -

          GitHub user tillrohrmann opened a pull request:

          https://github.com/apache/flink/pull/4142

          FLINK-6948 Harden EnumValueSerializer to detect changed enum indices

          This PR changes the seriailization format of the ScalaEnumSerializerConfigSnapshot to also include the
          ordinal value of an enum value when being deserialized. This allows to detect if the ordinal values
          have been changed and, thus, if migration is required.

          IMPORTANT: This PR changes the serialization format of ScalaEnumSerializerConfigSnapshot.

          cc @tzulitai

          You can merge this pull request into a Git repository by running:

          $ git pull https://github.com/tillrohrmann/flink hardenEnumValueSerializerTest

          Alternatively you can review and apply these changes as the patch at:

          https://github.com/apache/flink/pull/4142.patch

          To close this pull request, make a commit to your master/trunk branch
          with (at least) the following in the commit message:

          This closes #4142


          commit 80b2d61b58be06f9e3f40b35f0b215421ad86890
          Author: Till Rohrmann <trohrmann@apache.org>
          Date: 2017-06-13T21:26:10Z

          FLINK-6836 [tests] Fix YARNSessionCapacitySchedulerITCase to work with Hadoop 2.6.5, 2.7.3 and 2.8.0

          Due to MNG-5899, maven cannot resolve dependency reduced poms in a multi project build. Therefore,
          flink-yarn-tests pulls in a wrong version of org.apache.httpcomponents.httpclient which does not work
          with Hadoop's ServletUtils together. As a solution we have to move the dependency management for the
          httpclient and httpcore version into the parent pom.xml.

          Another problem is the version of these libraries which has been recently bumped. In 4.4, httpclient
          changed its behaviour such that URLEncodedUtils#parse(String, Charset) now throws a NPE if the first
          parameter is null. In 4.2.6, an empty list was returned instead. Due to this incompatibility, we reverted
          the change and set the version to its previous value.

          Bump httpclient to 4.5.3 and httpcore to 4.4.6

          This closes #4120.

          commit 7c7fe493f2dd766a8ddf3de240f9447f857b423a
          Author: Till Rohrmann <trohrmann@apache.org>
          Date: 2017-06-14T14:53:49Z

          FLINK-6921 [serializer] Allow EnumValueSerializer to deal with appended enum values

          The problem was that we don't check the bounds of the array with the enum names contained
          in the ScalaEnumSerializerConfigSnapshot.

          This PR also adds an Enumeration upgrade test which makes sure that appended fields are
          supported without migration. Moreover, it checks that a field removal and an order change
          leads to a required migration.

          This closes #4126.

          commit 9576530a0c5c3fe866bf87f126b29eb5acb56ba2
          Author: Till Rohrmann <trohrmann@apache.org>
          Date: 2017-06-19T10:49:05Z

          FLINK-6948 Harden EnumValueSerializer to detect changed enum indices

          This PR changes the seriailization format of the ScalaEnumSerializerConfigSnapshot to also include the
          ordinal value of an enum value when being deserialized. This allows to detect if the ordinal values
          have been changed and, thus, if migration is required.

          IMPORTANT: This PR changes the serialization format of ScalaEnumSerializerConfigSnapshot.


          Show
          githubbot ASF GitHub Bot added a comment - GitHub user tillrohrmann opened a pull request: https://github.com/apache/flink/pull/4142 FLINK-6948 Harden EnumValueSerializer to detect changed enum indices This PR changes the seriailization format of the ScalaEnumSerializerConfigSnapshot to also include the ordinal value of an enum value when being deserialized. This allows to detect if the ordinal values have been changed and, thus, if migration is required. IMPORTANT: This PR changes the serialization format of ScalaEnumSerializerConfigSnapshot. cc @tzulitai You can merge this pull request into a Git repository by running: $ git pull https://github.com/tillrohrmann/flink hardenEnumValueSerializerTest Alternatively you can review and apply these changes as the patch at: https://github.com/apache/flink/pull/4142.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #4142 commit 80b2d61b58be06f9e3f40b35f0b215421ad86890 Author: Till Rohrmann <trohrmann@apache.org> Date: 2017-06-13T21:26:10Z FLINK-6836 [tests] Fix YARNSessionCapacitySchedulerITCase to work with Hadoop 2.6.5, 2.7.3 and 2.8.0 Due to MNG-5899 , maven cannot resolve dependency reduced poms in a multi project build. Therefore, flink-yarn-tests pulls in a wrong version of org.apache.httpcomponents.httpclient which does not work with Hadoop's ServletUtils together. As a solution we have to move the dependency management for the httpclient and httpcore version into the parent pom.xml. Another problem is the version of these libraries which has been recently bumped. In 4.4, httpclient changed its behaviour such that URLEncodedUtils#parse(String, Charset) now throws a NPE if the first parameter is null. In 4.2.6, an empty list was returned instead. Due to this incompatibility, we reverted the change and set the version to its previous value. Bump httpclient to 4.5.3 and httpcore to 4.4.6 This closes #4120. commit 7c7fe493f2dd766a8ddf3de240f9447f857b423a Author: Till Rohrmann <trohrmann@apache.org> Date: 2017-06-14T14:53:49Z FLINK-6921 [serializer] Allow EnumValueSerializer to deal with appended enum values The problem was that we don't check the bounds of the array with the enum names contained in the ScalaEnumSerializerConfigSnapshot. This PR also adds an Enumeration upgrade test which makes sure that appended fields are supported without migration. Moreover, it checks that a field removal and an order change leads to a required migration. This closes #4126. commit 9576530a0c5c3fe866bf87f126b29eb5acb56ba2 Author: Till Rohrmann <trohrmann@apache.org> Date: 2017-06-19T10:49:05Z FLINK-6948 Harden EnumValueSerializer to detect changed enum indices This PR changes the seriailization format of the ScalaEnumSerializerConfigSnapshot to also include the ordinal value of an enum value when being deserialized. This allows to detect if the ordinal values have been changed and, thus, if migration is required. IMPORTANT: This PR changes the serialization format of ScalaEnumSerializerConfigSnapshot.
          Hide
          githubbot ASF GitHub Bot added a comment -

          Github user tzulitai commented on a diff in the pull request:

          https://github.com/apache/flink/pull/4142#discussion_r122682551

          — Diff: flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/EnumValueSerializer.scala —
          @@ -150,8 +162,29 @@ object EnumValueSerializer {
          enumClass = InstantiationUtil.deserializeObject(
          inViewWrapper, getUserCodeClassLoader)
          — End diff –

          Since we're changing the serialization format, we might as well also exclude Java serialization here for good now. Otherwise we'll need to bump the version again afterwards.

          Show
          githubbot ASF GitHub Bot added a comment - Github user tzulitai commented on a diff in the pull request: https://github.com/apache/flink/pull/4142#discussion_r122682551 — Diff: flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/EnumValueSerializer.scala — @@ -150,8 +162,29 @@ object EnumValueSerializer { enumClass = InstantiationUtil.deserializeObject( inViewWrapper, getUserCodeClassLoader) — End diff – Since we're changing the serialization format, we might as well also exclude Java serialization here for good now. Otherwise we'll need to bump the version again afterwards.
          Hide
          githubbot ASF GitHub Bot added a comment -

          Github user tzulitai commented on a diff in the pull request:

          https://github.com/apache/flink/pull/4142#discussion_r122684692

          — Diff: flink-scala/src/test/scala/org/apache/flink/api/scala/typeutils/EnumValueSerializerTest.scala —
          @@ -0,0 +1,119 @@
          +/*
          + * Licensed to the Apache Software Foundation (ASF) under one
          + * or more contributor license agreements. See the NOTICE file
          + * distributed with this work for additional information
          + * regarding copyright ownership. The ASF licenses this file
          + * to you under the Apache License, Version 2.0 (the
          + * "License"); you may not use this file except in compliance
          + * with the License. You may obtain a copy of the License at
          + *
          + * http://www.apache.org/licenses/LICENSE-2.0
          + *
          + * Unless required by applicable law or agreed to in writing, software
          + * distributed under the License is distributed on an "AS IS" BASIS,
          + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
          + * See the License for the specific language governing permissions and
          + * limitations under the License.
          + */
          +
          +package org.apache.flink.api.scala.typeutils
          +
          +import java.io._
          +
          +import org.apache.flink.api.scala.typeutils.EnumValueSerializer.ScalaEnumSerializerConfigSnapshot
          +import org.apache.flink.core.testutils.CommonTestUtils
          +import org.apache.flink.runtime.util.

          {DataInputDeserializer, DataOutputSerializer}

          +import org.apache.flink.util.TestLogger
          +import org.junit.

          {Ignore, Test}

          +import org.junit.Assert._
          +import org.scalatest.junit.JUnitSuiteLike
          +
          +class EnumValueSerializerTest extends TestLogger with JUnitSuiteLike {
          +
          + /**
          + * Tests that the snapshot configuration can be created and that the serializer
          + * is compatible when being called with the created serializer snapshot
          + *
          + * FLINK-6914
          — End diff –

          nit: I would use `@see` for this

          Show
          githubbot ASF GitHub Bot added a comment - Github user tzulitai commented on a diff in the pull request: https://github.com/apache/flink/pull/4142#discussion_r122684692 — Diff: flink-scala/src/test/scala/org/apache/flink/api/scala/typeutils/EnumValueSerializerTest.scala — @@ -0,0 +1,119 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.api.scala.typeutils + +import java.io._ + +import org.apache.flink.api.scala.typeutils.EnumValueSerializer.ScalaEnumSerializerConfigSnapshot +import org.apache.flink.core.testutils.CommonTestUtils +import org.apache.flink.runtime.util. {DataInputDeserializer, DataOutputSerializer} +import org.apache.flink.util.TestLogger +import org.junit. {Ignore, Test} +import org.junit.Assert._ +import org.scalatest.junit.JUnitSuiteLike + +class EnumValueSerializerTest extends TestLogger with JUnitSuiteLike { + + /** + * Tests that the snapshot configuration can be created and that the serializer + * is compatible when being called with the created serializer snapshot + * + * FLINK-6914 — End diff – nit: I would use `@see` for this
          Hide
          githubbot ASF GitHub Bot added a comment -

          Github user tzulitai commented on a diff in the pull request:

          https://github.com/apache/flink/pull/4142#discussion_r122687300

          — Diff: flink-scala/src/test/scala/org/apache/flink/api/scala/typeutils/EnumValueSerializerUpgradeTest.scala —
          @@ -0,0 +1,219 @@
          +/*
          + * Licensed to the Apache Software Foundation (ASF) under one
          + * or more contributor license agreements. See the NOTICE file
          + * distributed with this work for additional information
          + * regarding copyright ownership. The ASF licenses this file
          + * to you under the Apache License, Version 2.0 (the
          + * "License"); you may not use this file except in compliance
          + * with the License. You may obtain a copy of the License at
          + *
          + * http://www.apache.org/licenses/LICENSE-2.0
          + *
          + * Unless required by applicable law or agreed to in writing, software
          + * distributed under the License is distributed on an "AS IS" BASIS,
          + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
          + * See the License for the specific language governing permissions and
          + * limitations under the License.
          + */
          +
          +package org.apache.flink.api.scala.typeutils
          +
          +import java.io._
          +import java.net.

          {URL, URLClassLoader}

          +
          +import org.apache.flink.api.common.typeutils.

          {CompatibilityResult, TypeSerializerSerializationUtil}

          +import org.apache.flink.core.memory.

          {DataInputViewStreamWrapper, DataOutputViewStreamWrapper}

          +import org.apache.flink.util.TestLogger
          +import org.junit.rules.TemporaryFolder
          +import org.junit.

          {Rule, Test}

          +import org.junit.Assert._
          +import org.scalatest.junit.JUnitSuiteLike
          +
          +import scala.reflect.NameTransformer
          +import scala.tools.nsc.reporters.ConsoleReporter
          +import scala.tools.nsc.

          {GenericRunnerSettings, Global}

          +
          +class EnumValueSerializerUpgradeTest extends TestLogger with JUnitSuiteLike {
          +
          + private val _tempFolder = new TemporaryFolder()
          +
          + @Rule
          + def tempFolder = _tempFolder
          +
          + val enumName = "EnumValueSerializerUpgradeTestEnum"
          +
          + val enumA =
          + s"""
          + |@SerialVersionUID(1L)
          + |object $enumName extends Enumeration

          { + | val A, B, C = Value + |}

          + """.stripMargin
          +
          + val enumB =
          + s"""
          + |@SerialVersionUID(1L)
          + |object $enumName extends Enumeration

          { + | val A, B, C, D = Value + |}

          + """.stripMargin
          +
          + val enumC =
          + s"""
          + |@SerialVersionUID(1L)
          + |object $enumName extends Enumeration

          { + | val A, C = Value + |}

          + """.stripMargin
          +
          + val enumD =
          + s"""
          + |@SerialVersionUID(1L)
          + |object $enumName extends Enumeration

          { + | val A, C, B = Value + |}

          + """.stripMargin
          +
          + val enumE =
          + s"""
          + |@SerialVersionUID(1L)
          + |object $enumName extends Enumeration

          { + | val A = Value(42) + | val B = Value(5) + | val C = Value(1337) + |}

          + """.stripMargin
          +
          + /**
          + * Check that identical enums don't require migration
          + */
          + @Test
          + def checkIdenticalEnums(): Unit =

          { + assertFalse(checkCompatibility(enumA, enumA).isRequiresMigration) + }

          +
          + /**
          + * Check that appending fields to the enum does not require migration
          + */
          + @Test
          + def checkAppendedField(): Unit =

          { + assertFalse(checkCompatibility(enumA, enumB).isRequiresMigration) + }

          +
          + /**
          + * Check that removing enum fields requires migration
          + */
          + @Test
          + def checkRemovedField(): Unit =

          { + assertTrue(checkCompatibility(enumA, enumC).isRequiresMigration) + }

          +
          + /**
          + * Check that changing the enum field order requires migration
          + */
          + @Test
          + def checkDifferentFieldOrder(): Unit =

          { + assertTrue(checkCompatibility(enumA, enumD).isRequiresMigration) + }

          +
          + /**
          + * Check that changing the enum ids causes a migration
          + */
          + @Test
          + def checkDifferentIds(): Unit =

          { + assertTrue( + "Different ids should cause a migration.", + checkCompatibility(enumA, enumE).isRequiresMigration) + }

          +
          + def checkCompatibility(enumSourceA: String, enumSourceB: String)
          + : CompatibilityResultEnumeration#Value =

          { + import EnumValueSerializerUpgradeTest._ + + val classLoader = compileAndLoadEnum(tempFolder.newFolder(), s"$enumName.scala", enumSourceA) + + val enum = instantiateEnum[Enumeration](classLoader, enumName) + + val enumValueSerializer = new EnumValueSerializer(enum) + val snapshot = enumValueSerializer.snapshotConfiguration() + + val baos = new ByteArrayOutputStream() + val output = new DataOutputViewStreamWrapper(baos) + TypeSerializerSerializationUtil.writeSerializerConfigSnapshot(output, snapshot) + + output.close() + baos.close() + + val bais = new ByteArrayInputStream(baos.toByteArray) + val input= new DataInputViewStreamWrapper(bais) + + val classLoader2 = compileAndLoadEnum(tempFolder.newFolder(), s"$enumName.scala", enumSourceB) + + val snapshot2 = TypeSerializerSerializationUtil.readSerializerConfigSnapshot( + input, + classLoader2) + val enum2 = instantiateEnum[Enumeration](classLoader2, enumName) + + val enumValueSerializer2 = new EnumValueSerializer(enum2) + enumValueSerializer2.ensureCompatibility(snapshot2) + }

          +}
          +
          +object EnumValueSerializerUpgradeTest {
          + def compileAndLoadEnum(root: File, filename: String, source: String): ClassLoader =

          { + val file = writeSourceFile(root, filename, source) + + compileScalaFile(file) + + new URLClassLoader( + Array[URL](root.toURI.toURL), + Thread.currentThread().getContextClassLoader) + }

          +
          + def instantiateEnum[T <: Enumeration](classLoader: ClassLoader, enumName: String): T =

          { + val clazz = classLoader.loadClass(enumName + "$").asInstanceOf[Class[_ <: Enumeration]] + val field = clazz.getField(NameTransformer.MODULE_INSTANCE_NAME) + + field.get(null).asInstanceOf[T] + }

          +
          + def writeSourceFile(root: File, filename: String, source: String): File =

          { + val file = new File(root, filename) + val fileWriter = new FileWriter(file) + + fileWriter.write(source) + + fileWriter.close() + + file + }

          +
          + def compileScalaFile(file: File): Unit = {
          + val in = new BufferedReader(new StringReader(""))
          + val out = new PrintWriter(new BufferedWriter(
          + new OutputStreamWriter(System.out)))
          +
          + val settings = new GenericRunnerSettings(out.println _)
          +
          + val classLoader = Thread.currentThread().getContextClassLoader
          +
          + val urls = classLoader match

          { + case urlClassLoader: URLClassLoader => + urlClassLoader.getURLs + case x => throw new IllegalStateException(s"Not possible to extract URLs " + + s"from class loader $x.") + }

          +
          + settings.classpath.value = urls.map(_.toString).mkString(java.io.File.pathSeparator)
          + settings.outdir.value = file.getParent
          +
          + val reporter = new ConsoleReporter(settings)
          + val global = new Global(settings, reporter)
          + val run = new global.Run
          +
          + run.compile(List(file.getAbsolutePath))
          +
          + reporter.printSummary()
          — End diff –

          Would this add too much unnecessary output to the test log?

          Show
          githubbot ASF GitHub Bot added a comment - Github user tzulitai commented on a diff in the pull request: https://github.com/apache/flink/pull/4142#discussion_r122687300 — Diff: flink-scala/src/test/scala/org/apache/flink/api/scala/typeutils/EnumValueSerializerUpgradeTest.scala — @@ -0,0 +1,219 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.api.scala.typeutils + +import java.io._ +import java.net. {URL, URLClassLoader} + +import org.apache.flink.api.common.typeutils. {CompatibilityResult, TypeSerializerSerializationUtil} +import org.apache.flink.core.memory. {DataInputViewStreamWrapper, DataOutputViewStreamWrapper} +import org.apache.flink.util.TestLogger +import org.junit.rules.TemporaryFolder +import org.junit. {Rule, Test} +import org.junit.Assert._ +import org.scalatest.junit.JUnitSuiteLike + +import scala.reflect.NameTransformer +import scala.tools.nsc.reporters.ConsoleReporter +import scala.tools.nsc. {GenericRunnerSettings, Global} + +class EnumValueSerializerUpgradeTest extends TestLogger with JUnitSuiteLike { + + private val _tempFolder = new TemporaryFolder() + + @Rule + def tempFolder = _tempFolder + + val enumName = "EnumValueSerializerUpgradeTestEnum" + + val enumA = + s""" + |@SerialVersionUID(1L) + |object $enumName extends Enumeration { + | val A, B, C = Value + |} + """.stripMargin + + val enumB = + s""" + |@SerialVersionUID(1L) + |object $enumName extends Enumeration { + | val A, B, C, D = Value + |} + """.stripMargin + + val enumC = + s""" + |@SerialVersionUID(1L) + |object $enumName extends Enumeration { + | val A, C = Value + |} + """.stripMargin + + val enumD = + s""" + |@SerialVersionUID(1L) + |object $enumName extends Enumeration { + | val A, C, B = Value + |} + """.stripMargin + + val enumE = + s""" + |@SerialVersionUID(1L) + |object $enumName extends Enumeration { + | val A = Value(42) + | val B = Value(5) + | val C = Value(1337) + |} + """.stripMargin + + /** + * Check that identical enums don't require migration + */ + @Test + def checkIdenticalEnums(): Unit = { + assertFalse(checkCompatibility(enumA, enumA).isRequiresMigration) + } + + /** + * Check that appending fields to the enum does not require migration + */ + @Test + def checkAppendedField(): Unit = { + assertFalse(checkCompatibility(enumA, enumB).isRequiresMigration) + } + + /** + * Check that removing enum fields requires migration + */ + @Test + def checkRemovedField(): Unit = { + assertTrue(checkCompatibility(enumA, enumC).isRequiresMigration) + } + + /** + * Check that changing the enum field order requires migration + */ + @Test + def checkDifferentFieldOrder(): Unit = { + assertTrue(checkCompatibility(enumA, enumD).isRequiresMigration) + } + + /** + * Check that changing the enum ids causes a migration + */ + @Test + def checkDifferentIds(): Unit = { + assertTrue( + "Different ids should cause a migration.", + checkCompatibility(enumA, enumE).isRequiresMigration) + } + + def checkCompatibility(enumSourceA: String, enumSourceB: String) + : CompatibilityResult Enumeration#Value = { + import EnumValueSerializerUpgradeTest._ + + val classLoader = compileAndLoadEnum(tempFolder.newFolder(), s"$enumName.scala", enumSourceA) + + val enum = instantiateEnum[Enumeration](classLoader, enumName) + + val enumValueSerializer = new EnumValueSerializer(enum) + val snapshot = enumValueSerializer.snapshotConfiguration() + + val baos = new ByteArrayOutputStream() + val output = new DataOutputViewStreamWrapper(baos) + TypeSerializerSerializationUtil.writeSerializerConfigSnapshot(output, snapshot) + + output.close() + baos.close() + + val bais = new ByteArrayInputStream(baos.toByteArray) + val input= new DataInputViewStreamWrapper(bais) + + val classLoader2 = compileAndLoadEnum(tempFolder.newFolder(), s"$enumName.scala", enumSourceB) + + val snapshot2 = TypeSerializerSerializationUtil.readSerializerConfigSnapshot( + input, + classLoader2) + val enum2 = instantiateEnum[Enumeration](classLoader2, enumName) + + val enumValueSerializer2 = new EnumValueSerializer(enum2) + enumValueSerializer2.ensureCompatibility(snapshot2) + } +} + +object EnumValueSerializerUpgradeTest { + def compileAndLoadEnum(root: File, filename: String, source: String): ClassLoader = { + val file = writeSourceFile(root, filename, source) + + compileScalaFile(file) + + new URLClassLoader( + Array[URL](root.toURI.toURL), + Thread.currentThread().getContextClassLoader) + } + + def instantiateEnum [T <: Enumeration] (classLoader: ClassLoader, enumName: String): T = { + val clazz = classLoader.loadClass(enumName + "$").asInstanceOf[Class[_ <: Enumeration]] + val field = clazz.getField(NameTransformer.MODULE_INSTANCE_NAME) + + field.get(null).asInstanceOf[T] + } + + def writeSourceFile(root: File, filename: String, source: String): File = { + val file = new File(root, filename) + val fileWriter = new FileWriter(file) + + fileWriter.write(source) + + fileWriter.close() + + file + } + + def compileScalaFile(file: File): Unit = { + val in = new BufferedReader(new StringReader("")) + val out = new PrintWriter(new BufferedWriter( + new OutputStreamWriter(System.out))) + + val settings = new GenericRunnerSettings(out.println _) + + val classLoader = Thread.currentThread().getContextClassLoader + + val urls = classLoader match { + case urlClassLoader: URLClassLoader => + urlClassLoader.getURLs + case x => throw new IllegalStateException(s"Not possible to extract URLs " + + s"from class loader $x.") + } + + settings.classpath.value = urls.map(_.toString).mkString(java.io.File.pathSeparator) + settings.outdir.value = file.getParent + + val reporter = new ConsoleReporter(settings) + val global = new Global(settings, reporter) + val run = new global.Run + + run.compile(List(file.getAbsolutePath)) + + reporter.printSummary() — End diff – Would this add too much unnecessary output to the test log?
          Hide
          githubbot ASF GitHub Bot added a comment -

          Github user tzulitai commented on a diff in the pull request:

          https://github.com/apache/flink/pull/4142#discussion_r122686133

          — Diff: flink-scala/src/test/scala/org/apache/flink/api/scala/typeutils/EnumValueSerializerTest.scala —
          @@ -0,0 +1,119 @@
          +/*
          + * Licensed to the Apache Software Foundation (ASF) under one
          + * or more contributor license agreements. See the NOTICE file
          + * distributed with this work for additional information
          + * regarding copyright ownership. The ASF licenses this file
          + * to you under the Apache License, Version 2.0 (the
          + * "License"); you may not use this file except in compliance
          + * with the License. You may obtain a copy of the License at
          + *
          + * http://www.apache.org/licenses/LICENSE-2.0
          + *
          + * Unless required by applicable law or agreed to in writing, software
          + * distributed under the License is distributed on an "AS IS" BASIS,
          + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
          + * See the License for the specific language governing permissions and
          + * limitations under the License.
          + */
          +
          +package org.apache.flink.api.scala.typeutils
          +
          +import java.io._
          +
          +import org.apache.flink.api.scala.typeutils.EnumValueSerializer.ScalaEnumSerializerConfigSnapshot
          +import org.apache.flink.core.testutils.CommonTestUtils
          +import org.apache.flink.runtime.util.

          {DataInputDeserializer, DataOutputSerializer}

          +import org.apache.flink.util.TestLogger
          +import org.junit.

          {Ignore, Test}

          +import org.junit.Assert._
          +import org.scalatest.junit.JUnitSuiteLike
          +
          +class EnumValueSerializerTest extends TestLogger with JUnitSuiteLike {
          +
          + /**
          + * Tests that the snapshot configuration can be created and that the serializer
          + * is compatible when being called with the created serializer snapshot
          + *
          + * FLINK-6914
          + */
          + @Test
          + def testEnumValueSerializerEnsureCompatibilityIdempotency()

          { + val enumSerializer = new EnumValueSerializer(Letters) + + val snapshot = enumSerializer.snapshotConfiguration() + + assertFalse(enumSerializer.ensureCompatibility(snapshot).isRequiresMigration) + }

          +
          + @Ignore("This test case is only used to create a " +
          + "TypeSerializerConfigurationSnapshot for the EnumValueSerializer")
          + @Test
          + def createEnumValueSerializerConfigSnapshot(): Unit = {
          + val enumValueSerializer = new EnumValueSerializer(Letters)
          +
          + val snapshot = enumValueSerializer.snapshotConfiguration()
          +
          + val out = new DataOutputSerializer(128)
          +
          + snapshot.write(out)
          + val buffer = out.getCopyOfBuffer
          +
          + val tmpDir = CommonTestUtils.getTempDir
          + val configFile = new File(tmpDir, "EnumValueSerializerConfigSnapshot-1.3.1")
          +
          + log.info(s"Writing EnumValueSerializerConfigSnapshot-1.3.1 to $configFile")
          +
          + val outputStream = new DataOutputStream(new FileOutputStream(configFile))
          +
          + try

          { + outputStream.writeInt(buffer.length) + outputStream.write(buffer) + }

          finally

          { + outputStream.close() + }

          + }
          +
          + /**
          + * Check backwards compatibility between 1.3.1 and 1.3.2 since the
          + * ScalaEnumSerializerConfigSnapshot format changed.
          + *
          + * FLINK-694X
          — End diff –

          nit: I would use `@see` for this.
          Also, directly refer to FLINK-6948, because that's the exact issue that required the format change.

          Show
          githubbot ASF GitHub Bot added a comment - Github user tzulitai commented on a diff in the pull request: https://github.com/apache/flink/pull/4142#discussion_r122686133 — Diff: flink-scala/src/test/scala/org/apache/flink/api/scala/typeutils/EnumValueSerializerTest.scala — @@ -0,0 +1,119 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.api.scala.typeutils + +import java.io._ + +import org.apache.flink.api.scala.typeutils.EnumValueSerializer.ScalaEnumSerializerConfigSnapshot +import org.apache.flink.core.testutils.CommonTestUtils +import org.apache.flink.runtime.util. {DataInputDeserializer, DataOutputSerializer} +import org.apache.flink.util.TestLogger +import org.junit. {Ignore, Test} +import org.junit.Assert._ +import org.scalatest.junit.JUnitSuiteLike + +class EnumValueSerializerTest extends TestLogger with JUnitSuiteLike { + + /** + * Tests that the snapshot configuration can be created and that the serializer + * is compatible when being called with the created serializer snapshot + * + * FLINK-6914 + */ + @Test + def testEnumValueSerializerEnsureCompatibilityIdempotency() { + val enumSerializer = new EnumValueSerializer(Letters) + + val snapshot = enumSerializer.snapshotConfiguration() + + assertFalse(enumSerializer.ensureCompatibility(snapshot).isRequiresMigration) + } + + @Ignore("This test case is only used to create a " + + "TypeSerializerConfigurationSnapshot for the EnumValueSerializer") + @Test + def createEnumValueSerializerConfigSnapshot(): Unit = { + val enumValueSerializer = new EnumValueSerializer(Letters) + + val snapshot = enumValueSerializer.snapshotConfiguration() + + val out = new DataOutputSerializer(128) + + snapshot.write(out) + val buffer = out.getCopyOfBuffer + + val tmpDir = CommonTestUtils.getTempDir + val configFile = new File(tmpDir, "EnumValueSerializerConfigSnapshot-1.3.1") + + log.info(s"Writing EnumValueSerializerConfigSnapshot-1.3.1 to $configFile") + + val outputStream = new DataOutputStream(new FileOutputStream(configFile)) + + try { + outputStream.writeInt(buffer.length) + outputStream.write(buffer) + } finally { + outputStream.close() + } + } + + /** + * Check backwards compatibility between 1.3.1 and 1.3.2 since the + * ScalaEnumSerializerConfigSnapshot format changed. + * + * FLINK-694 X — End diff – nit: I would use `@see` for this. Also, directly refer to FLINK-6948 , because that's the exact issue that required the format change.
          Hide
          githubbot ASF GitHub Bot added a comment -

          Github user tzulitai commented on the issue:

          https://github.com/apache/flink/pull/4142

          The tests are failing due to checkstyle.
          I'll fix that and address my comments, and run tests locally. If green, will proceed to merge this for `master` and `release-1.3`

          Show
          githubbot ASF GitHub Bot added a comment - Github user tzulitai commented on the issue: https://github.com/apache/flink/pull/4142 The tests are failing due to checkstyle. I'll fix that and address my comments, and run tests locally. If green, will proceed to merge this for `master` and `release-1.3`
          Hide
          githubbot ASF GitHub Bot added a comment -

          Github user asfgit closed the pull request at:

          https://github.com/apache/flink/pull/4142

          Show
          githubbot ASF GitHub Bot added a comment - Github user asfgit closed the pull request at: https://github.com/apache/flink/pull/4142
          Hide
          tzulitai Tzu-Li (Gordon) Tai added a comment -

          Fixed for master via 228faf8c2a113de0dea366668e87484dbe7ec8b5.
          Fixed for 1.3.1 via c10fb2bbb42f255dc14ded16d754db395385d441.

          Show
          tzulitai Tzu-Li (Gordon) Tai added a comment - Fixed for master via 228faf8c2a113de0dea366668e87484dbe7ec8b5. Fixed for 1.3.1 via c10fb2bbb42f255dc14ded16d754db395385d441.

            People

            • Assignee:
              till.rohrmann Till Rohrmann
              Reporter:
              till.rohrmann Till Rohrmann
            • Votes:
              0 Vote for this issue
              Watchers:
              4 Start watching this issue

              Dates

              • Created:
                Updated:
                Resolved:

                Development