Details
- Type: Bug
- Status: Closed
- Priority: Blocker
- Resolution: Fixed
- 1.3.2, 1.4.0
- None
Description
Table API operators and UDFs store a reference to the (slf4j) Logger in an instance field (cf. https://github.com/apache/flink/blob/f37988c19adc30d324cde83c54f2fa5d36efb9e7/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/FlatMapRunner.scala#L39). As a result, the Logger is serialised together with the UDF and shipped to the cluster. This does not sound right in itself, and it leads to problems when the slf4j configuration on the client differs from the one in the cluster environment.
This is an example of a user running into that problem: https://lists.apache.org/thread.html/01dd44007c0122d60c3fd2b2bb04fd2d6d2114bcff1e34d1d2079522@%3Cuser.flink.apache.org%3E. They have Logback on the client, but the Logback classes are not available on the cluster, so deserialisation of the UDFs fails with a ClassNotFoundException.
This is a rough list of the involved classes:
src/main/scala/org/apache/flink/table/catalog/ExternalCatalogSchema.scala:43: private val LOG: Logger = LoggerFactory.getLogger(this.getClass)
src/main/scala/org/apache/flink/table/catalog/ExternalTableSourceUtil.scala:45: private val LOG: Logger = LoggerFactory.getLogger(this.getClass)
src/main/scala/org/apache/flink/table/codegen/calls/BuiltInMethods.scala:28: val LOG10 = Types.lookupMethod(classOf[Math], "log10", classOf[Double])
src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamGroupAggregate.scala:62: private val LOG = LoggerFactory.getLogger(this.getClass)
src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamGroupWindowAggregate.scala:59: private val LOG = LoggerFactory.getLogger(this.getClass)
src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamOverAggregate.scala:51: private val LOG = LoggerFactory.getLogger(this.getClass)
src/main/scala/org/apache/flink/table/plan/nodes/FlinkConventions.scala:28: val LOGICAL: Convention = new Convention.Impl("LOGICAL", classOf[FlinkLogicalRel])
src/main/scala/org/apache/flink/table/plan/rules/FlinkRuleSets.scala:38: val LOGICAL_OPT_RULES: RuleSet = RuleSets.ofList(
src/main/scala/org/apache/flink/table/runtime/aggregate/AggregateAggFunction.scala:36: val LOG = LoggerFactory.getLogger(this.getClass)
src/main/scala/org/apache/flink/table/runtime/aggregate/DataSetAggFunction.scala:43: val LOG = LoggerFactory.getLogger(this.getClass)
src/main/scala/org/apache/flink/table/runtime/aggregate/DataSetFinalAggFunction.scala:44: val LOG = LoggerFactory.getLogger(this.getClass)
src/main/scala/org/apache/flink/table/runtime/aggregate/DataSetPreAggFunction.scala:44: val LOG = LoggerFactory.getLogger(this.getClass)
src/main/scala/org/apache/flink/table/runtime/aggregate/DataSetSessionWindowAggregatePreProcessor.scala:55: val LOG = LoggerFactory.getLogger(this.getClass)
src/main/scala/org/apache/flink/table/runtime/aggregate/DataSetSessionWindowAggReduceGroupFunction.scala:66: val LOG = LoggerFactory.getLogger(this.getClass)
src/main/scala/org/apache/flink/table/runtime/aggregate/DataSetSlideWindowAggReduceGroupFunction.scala:56: val LOG = LoggerFactory.getLogger(this.getClass)
src/main/scala/org/apache/flink/table/runtime/aggregate/DataSetSlideTimeWindowAggReduceGroupFunction.scala:64: val LOG = LoggerFactory.getLogger(this.getClass)
src/main/scala/org/apache/flink/table/runtime/aggregate/DataSetTumbleCountWindowAggReduceGroupFunction.scala:46: val LOG = LoggerFactory.getLogger(this.getClass)
src/main/scala/org/apache/flink/table/runtime/aggregate/DataSetWindowAggMapFunction.scala:52: val LOG = LoggerFactory.getLogger(this.getClass)
src/main/scala/org/apache/flink/table/runtime/aggregate/DataSetTumbleTimeWindowAggReduceGroupFunction.scala:55: val LOG = LoggerFactory.getLogger(this.getClass)
src/main/scala/org/apache/flink/table/runtime/aggregate/GroupAggProcessFunction.scala:48: val LOG: Logger = LoggerFactory.getLogger(this.getClass)
src/main/scala/org/apache/flink/table/runtime/aggregate/ProcTimeBoundedRowsOver.scala:66: val LOG = LoggerFactory.getLogger(this.getClass)
src/main/scala/org/apache/flink/table/runtime/aggregate/ProcTimeBoundedRangeOver.scala:61: val LOG = LoggerFactory.getLogger(this.getClass)
src/main/scala/org/apache/flink/table/runtime/aggregate/ProcTimeUnboundedOver.scala:47: val LOG = LoggerFactory.getLogger(this.getClass)
src/main/scala/org/apache/flink/table/runtime/aggregate/RowTimeBoundedRangeOver.scala:67: val LOG = LoggerFactory.getLogger(this.getClass)
src/main/scala/org/apache/flink/table/runtime/aggregate/RowTimeBoundedRowsOver.scala:72: val LOG = LoggerFactory.getLogger(this.getClass)
src/main/scala/org/apache/flink/table/runtime/aggregate/RowTimeUnboundedOver.scala:60: val LOG = LoggerFactory.getLogger(this.getClass)
src/main/scala/org/apache/flink/table/runtime/CorrelateFlatMapRunner.scala:40: val LOG: Logger = LoggerFactory.getLogger(this.getClass)
src/main/scala/org/apache/flink/table/runtime/CRowCorrelateProcessRunner.scala:45: val LOG: Logger = LoggerFactory.getLogger(this.getClass)
src/main/scala/org/apache/flink/table/runtime/CRowInputMapRunner.scala:41: val LOG = LoggerFactory.getLogger(this.getClass)
src/main/scala/org/apache/flink/table/runtime/CRowInputTupleOutputMapRunner.scala:44: val LOG = LoggerFactory.getLogger(this.getClass)
src/main/scala/org/apache/flink/table/runtime/CRowInputTupleOutputMapRunner.scala:77: val LOG = LoggerFactory.getLogger(this.getClass)
src/main/scala/org/apache/flink/table/runtime/CRowOutputMapRunner.scala:41: val LOG = LoggerFactory.getLogger(this.getClass)
src/main/scala/org/apache/flink/table/runtime/CRowProcessRunner.scala:43: val LOG = LoggerFactory.getLogger(this.getClass)
src/main/scala/org/apache/flink/table/runtime/FlatJoinRunner.scala:37: val LOG = LoggerFactory.getLogger(this.getClass)
src/main/scala/org/apache/flink/table/runtime/FlatMapRunner.scala:39: val LOG = LoggerFactory.getLogger(this.getClass)
src/main/scala/org/apache/flink/table/runtime/io/CRowValuesInputFormat.scala:39: val LOG = LoggerFactory.getLogger(this.getClass)
src/main/scala/org/apache/flink/table/runtime/io/ValuesInputFormat.scala:38: val LOG = LoggerFactory.getLogger(this.getClass)
src/main/scala/org/apache/flink/table/runtime/MapRunner.scala:36: val LOG = LoggerFactory.getLogger(this.getClass)
src/main/scala/org/apache/flink/table/runtime/MapSideJoinRunner.scala:37: val LOG = LoggerFactory.getLogger(this.getClass)
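One possible way to keep the Logger out of the serialised form is to declare it as a `@transient lazy val`, so that it is skipped during serialisation and re-created on first use on the cluster. The sketch below illustrates the idea only and is not necessarily the change that resolved this issue. To stay self-contained it does not depend on slf4j: `FakeLogger`, `EagerRunner`, `LazyRunner` and `roundTrip` are hypothetical names, and `FakeLogger` is deliberately not Serializable, so the eager variant fails with a NotSerializableException here rather than with the ClassNotFoundException from the user report.

```scala
import java.io.{ByteArrayInputStream, ByteArrayOutputStream, ObjectInputStream, ObjectOutputStream}

// Hypothetical stand-in for a logger implementation. It is deliberately NOT
// Serializable, to make it visible when a logger is dragged into serialisation.
class FakeLogger(val name: String) {
  def info(msg: String): Unit = println(s"[$name] $msg")
}

// Pattern from the list above: the logger is a plain instance field,
// so Java serialisation tries to write it together with the function.
class EagerRunner extends Serializable {
  val LOG = new FakeLogger(getClass.getName)
}

// Sketch of a fix: the field is transient (not written) and lazy
// (re-created on first access after deserialisation).
class LazyRunner extends Serializable {
  @transient lazy val LOG = new FakeLogger(getClass.getName)
}

object LoggerSerializationDemo {
  // Serialises an object to bytes and reads it back, roughly what happens
  // when a function object is shipped from the client to the cluster.
  def roundTrip[T <: AnyRef](obj: T): T = {
    val bytes = new ByteArrayOutputStream()
    val out = new ObjectOutputStream(bytes)
    try out.writeObject(obj) finally out.close()
    val in = new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray))
    try in.readObject().asInstanceOf[T] finally in.close()
  }

  def main(args: Array[String]): Unit = {
    // The lazy variant survives the round trip and gets a fresh logger.
    val restored = roundTrip(new LazyRunner)
    restored.LOG.info("logger re-created after deserialisation")

    // The eager variant fails because the logger field is part of the object graph.
    val eagerResult = scala.util.Try(roundTrip(new EagerRunner))
    println(s"eager variant serialisable: ${eagerResult.isSuccess}")
  }
}
```

An alternative under the same assumptions would be to hold the logger in a companion object, since members of a Scala `object` are not part of an instance's serialised state.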