Uploaded image for project: 'Flink'
  1. Flink
  2. FLINK-33361

Add Java 17 compatibility to Flink Kafka connector

Details

    Description

      When currently trying to mvn clean install -Dflink.version=1.18.0 -Dscala-2.12 -Prun-end-to-end-tests -DdistDir=/Users/mvisser/Developer/flink-1.18.0 -Dflink.convergence.phase=install -Dlog4j.configurationFile=tools/ci/log4j.properties this fails with errors like:

      [INFO] 
      [INFO] Results:
      [INFO] 
      [ERROR] Errors: 
      [ERROR] FlinkKafkaConsumerBaseMigrationTest.testRestore
      [ERROR]   Run 1: Exception while creating StreamOperatorStateContext.
      [ERROR]   Run 2: Exception while creating StreamOperatorStateContext.
      [ERROR]   Run 3: Exception while creating StreamOperatorStateContext.
      [ERROR]   Run 4: Exception while creating StreamOperatorStateContext.
      [ERROR]   Run 5: Exception while creating StreamOperatorStateContext.
      [ERROR]   Run 6: Exception while creating StreamOperatorStateContext.
      [ERROR]   Run 7: Exception while creating StreamOperatorStateContext.
      [ERROR]   Run 8: Exception while creating StreamOperatorStateContext.
      [ERROR]   Run 9: Exception while creating StreamOperatorStateContext.
      [INFO] 
      [ERROR]   FlinkKafkaConsumerBaseTest.testExplicitStateSerializerCompatibility:721 » Runtime
      [ERROR]   FlinkKafkaConsumerBaseTest.testScaleDown:742->testRescaling:817 » Checkpoint C...
      [ERROR]   FlinkKafkaConsumerBaseTest.testScaleUp:737->testRescaling:817 » Checkpoint Cou...
      [ERROR]   UpsertKafkaDynamicTableFactoryTest.testBufferedTableSink:243 » UncheckedIO jav...
      

      Example stacktrace:

      Test testBufferedTableSink(org.apache.flink.streaming.connectors.kafka.table.UpsertKafkaDynamicTableFactoryTest) failed with:
      java.io.UncheckedIOException: java.io.IOException: Serializing the source elements failed: java.lang.reflect.InaccessibleObjectException: Unable to make field private final java.lang.Object[] java.util.Arrays$ArrayList.a accessible: module java.base does not "opens java.util" to unnamed module @45b4c3a9
      	at org.apache.flink.streaming.api.functions.source.FromElementsFunction.setOutputType(FromElementsFunction.java:162)
      	at org.apache.flink.streaming.util.functions.StreamingFunctionUtils.trySetOutputType(StreamingFunctionUtils.java:84)
      	at org.apache.flink.streaming.util.functions.StreamingFunctionUtils.setOutputType(StreamingFunctionUtils.java:60)
      	at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.setOutputType(AbstractUdfStreamOperator.java:146)
      	at org.apache.flink.streaming.api.operators.SimpleOperatorFactory.setOutputType(SimpleOperatorFactory.java:118)
      	at org.apache.flink.streaming.api.graph.StreamGraph.addOperator(StreamGraph.java:434)
      	at org.apache.flink.streaming.api.graph.StreamGraph.addOperator(StreamGraph.java:402)
      	at org.apache.flink.streaming.api.graph.StreamGraph.addLegacySource(StreamGraph.java:356)
      	at org.apache.flink.streaming.runtime.translators.LegacySourceTransformationTranslator.translateInternal(LegacySourceTransformationTranslator.java:66)
      	at org.apache.flink.streaming.runtime.translators.LegacySourceTransformationTranslator.translateForStreamingInternal(LegacySourceTransformationTranslator.java:53)
      	at org.apache.flink.streaming.runtime.translators.LegacySourceTransformationTranslator.translateForStreamingInternal(LegacySourceTransformationTranslator.java:40)
      	at org.apache.flink.streaming.api.graph.SimpleTransformationTranslator.translateForStreaming(SimpleTransformationTranslator.java:62)
      	at org.apache.flink.streaming.api.graph.StreamGraphGenerator.translate(StreamGraphGenerator.java:860)
      	at org.apache.flink.streaming.api.graph.StreamGraphGenerator.transform(StreamGraphGenerator.java:590)
      	at org.apache.flink.streaming.api.graph.StreamGraphGenerator.getParentInputIds(StreamGraphGenerator.java:881)
      	at org.apache.flink.streaming.api.graph.StreamGraphGenerator.translate(StreamGraphGenerator.java:839)
      	at org.apache.flink.streaming.api.graph.StreamGraphGenerator.transform(StreamGraphGenerator.java:590)
      	at org.apache.flink.streaming.api.graph.StreamGraphGenerator.generate(StreamGraphGenerator.java:328)
      	at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.getStreamGraph(StreamExecutionEnvironment.java:2289)
      	at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.getStreamGraph(StreamExecutionEnvironment.java:2280)
      	at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.getStreamGraph(StreamExecutionEnvironment.java:2266)
      	at org.apache.flink.streaming.connectors.kafka.table.UpsertKafkaDynamicTableFactoryTest.testBufferedTableSink(UpsertKafkaDynamicTableFactoryTest.java:243)
      	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
      	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:77)
      	at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
      	at java.base/java.lang.reflect.Method.invoke(Method.java:568)
      	at org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:59)
      	at org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
      	at org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:56)
      	at org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
      	at org.junit.rules.ExpectedException$ExpectedExceptionStatement.evaluate(ExpectedException.java:258)
      	at org.apache.flink.util.TestNameProvider$1.evaluate(TestNameProvider.java:45)
      	at org.junit.rules.TestWatcher$1.evaluate(TestWatcher.java:61)
      	at org.junit.runners.ParentRunner$3.evaluate(ParentRunner.java:306)
      	at org.junit.runners.BlockJUnit4ClassRunner$1.evaluate(BlockJUnit4ClassRunner.java:100)
      	at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:366)
      	at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:103)
      	at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:63)
      	at org.junit.runners.ParentRunner$4.run(ParentRunner.java:331)
      	at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:79)
      	at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:329)
      	at org.junit.runners.ParentRunner.access$100(ParentRunner.java:66)
      	at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:293)
      	at org.junit.runners.ParentRunner$3.evaluate(ParentRunner.java:306)
      	at org.junit.runners.ParentRunner.run(ParentRunner.java:413)
      	at org.junit.runner.JUnitCore.run(JUnitCore.java:137)
      	at org.junit.runner.JUnitCore.run(JUnitCore.java:115)
      	at org.junit.vintage.engine.execution.RunnerExecutor.execute(RunnerExecutor.java:42)
      	at org.junit.vintage.engine.VintageTestEngine.executeAllChildren(VintageTestEngine.java:80)
      	at org.junit.vintage.engine.VintageTestEngine.execute(VintageTestEngine.java:72)
      	at org.junit.platform.launcher.core.EngineExecutionOrchestrator.execute(EngineExecutionOrchestrator.java:147)
      	at org.junit.platform.launcher.core.EngineExecutionOrchestrator.execute(EngineExecutionOrchestrator.java:127)
      	at org.junit.platform.launcher.core.EngineExecutionOrchestrator.execute(EngineExecutionOrchestrator.java:90)
      	at org.junit.platform.launcher.core.EngineExecutionOrchestrator.lambda$execute$0(EngineExecutionOrchestrator.java:55)
      	at org.junit.platform.launcher.core.EngineExecutionOrchestrator.withInterceptedStreams(EngineExecutionOrchestrator.java:102)
      	at org.junit.platform.launcher.core.EngineExecutionOrchestrator.execute(EngineExecutionOrchestrator.java:54)
      	at org.junit.platform.launcher.core.DefaultLauncher.execute(DefaultLauncher.java:114)
      	at org.junit.platform.launcher.core.DefaultLauncher.execute(DefaultLauncher.java:86)
      	at org.junit.platform.launcher.core.DefaultLauncherSession$DelegatingLauncher.execute(DefaultLauncherSession.java:86)
      	at org.junit.platform.launcher.core.SessionPerRequestLauncher.execute(SessionPerRequestLauncher.java:53)
      	at org.apache.maven.surefire.junitplatform.JUnitPlatformProvider.lambda$execute$1(JUnitPlatformProvider.java:199)
      	at java.base/java.util.Iterator.forEachRemaining(Iterator.java:133)
      	at org.apache.maven.surefire.junitplatform.JUnitPlatformProvider.execute(JUnitPlatformProvider.java:193)
      	at org.apache.maven.surefire.junitplatform.JUnitPlatformProvider.invokeAllTests(JUnitPlatformProvider.java:154)
      	at org.apache.maven.surefire.junitplatform.JUnitPlatformProvider.invoke(JUnitPlatformProvider.java:120)
      	at org.apache.maven.surefire.booter.ForkedBooter.runSuitesInProcess(ForkedBooter.java:428)
      	at org.apache.maven.surefire.booter.ForkedBooter.execute(ForkedBooter.java:162)
      	at org.apache.maven.surefire.booter.ForkedBooter.run(ForkedBooter.java:562)
      	at org.apache.maven.surefire.booter.ForkedBooter.main(ForkedBooter.java:548)
      Caused by: java.io.IOException: Serializing the source elements failed: java.lang.reflect.InaccessibleObjectException: Unable to make field private final java.lang.Object[] java.util.Arrays$ArrayList.a accessible: module java.base does not "opens java.util" to unnamed module @45b4c3a9
      	at org.apache.flink.streaming.api.functions.source.FromElementsFunction.serializeElements(FromElementsFunction.java:139)
      	at org.apache.flink.streaming.api.functions.source.FromElementsFunction.setOutputType(FromElementsFunction.java:160)
      	... 68 more
      Caused by: java.lang.RuntimeException: java.lang.reflect.InaccessibleObjectException: Unable to make field private final java.lang.Object[] java.util.Arrays$ArrayList.a accessible: module java.base does not "opens java.util" to unnamed module @45b4c3a9
      	at com.twitter.chill.java.ArraysAsListSerializer.<init>(ArraysAsListSerializer.java:69)
      	at org.apache.flink.api.java.typeutils.runtime.kryo.FlinkChillPackageRegistrar.registerSerializers(FlinkChillPackageRegistrar.java:67)
      	at org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.getKryoInstance(KryoSerializer.java:513)
      	at org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.checkKryoInitialized(KryoSerializer.java:522)
      	at org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.serialize(KryoSerializer.java:348)
      	at org.apache.flink.streaming.api.functions.source.FromElementsFunction.serializeElements(FromElementsFunction.java:136)
      	... 69 more
      Caused by: java.lang.reflect.InaccessibleObjectException: Unable to make field private final java.lang.Object[] java.util.Arrays$ArrayList.a accessible: module java.base does not "opens java.util" to unnamed module @45b4c3a9
      	at java.base/java.lang.reflect.AccessibleObject.checkCanSetAccessible(AccessibleObject.java:354)
      	at java.base/java.lang.reflect.AccessibleObject.checkCanSetAccessible(AccessibleObject.java:297)
      	at java.base/java.lang.reflect.Field.checkCanSetAccessible(Field.java:178)
      	at java.base/java.lang.reflect.Field.setAccessible(Field.java:172)
      	at com.twitter.chill.java.ArraysAsListSerializer.<init>(ArraysAsListSerializer.java:67)
      	... 74 more
      ================================================================================
      

      Attachments

        Issue Links

          Activity

            Sergey Nuyanzin Sergey Nuyanzin added a comment - - edited

            martijnvisser i guess this ticket is connected with FLINK-33305 and corresponding PR
            the idea is that it could be fixed in 2 ways
            1. rewrite complete configuration of surefire plugin
            2. allos for shared utils to specify -- add-opens, --add-exports via a dedicated property like it's done for flink modules, and then on connector's level only this property should be adapted

            Sergey Nuyanzin Sergey Nuyanzin added a comment - - edited martijnvisser i guess this ticket is connected with FLINK-33305 and corresponding PR the idea is that it could be fixed in 2 ways 1. rewrite complete configuration of surefire plugin 2. allos for shared utils to specify -- add-opens , --add-exports via a dedicated property like it's done for flink modules, and then on connector's level only this property should be adapted
            Sergey Nuyanzin Sergey Nuyanzin added a comment - Merged as 825052f55754e401176083c121ffaf38362b7a26

            People

              Sergey Nuyanzin Sergey Nuyanzin
              martijnvisser Martijn Visser
              Votes:
              1 Vote for this issue
              Watchers:
              4 Start watching this issue

              Dates

                Created:
                Updated:
                Resolved: