Uploaded image for project: 'Flink'
  1. Flink
  2. FLINK-7398

Table API operators/UDFs must not store Logger

    XML | Word | Printable | JSON

Details

    • Bug
    • Status: Closed
    • Blocker
    • Resolution: Fixed
    • 1.3.2, 1.4.0
    • 1.3.4, 1.4.0
    • Table SQL / API
    • None

    Description

      Table API operators and UDFs store a reference to the (slf4j) Logger in an instance field (cf. https://github.com/apache/flink/blob/f37988c19adc30d324cde83c54f2fa5d36efb9e7/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/FlatMapRunner.scala#L39). This means that the Logger is serialised together with the UDF and sent to the cluster. This does not sound right in itself, and it leads to problems when the slf4j configuration on the client differs from the one in the cluster environment.

      This is an example of a user running into that problem: https://lists.apache.org/thread.html/01dd44007c0122d60c3fd2b2bb04fd2d6d2114bcff1e34d1d2079522@%3Cuser.flink.apache.org%3E. Here, they have Logback on the client but the Logback classes are not available on the cluster and so deserialisation of the UDFs fails with a ClassNotFoundException.

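      This is a rough list of the involved classes (raw search output, so it also contains a few incidental matches that are not Logger fields, e.g. LOG10, LOGICAL and LOGICAL_OPT_RULES):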

      src/main/scala/org/apache/flink/table/catalog/ExternalCatalogSchema.scala:43:  private val LOG: Logger = LoggerFactory.getLogger(this.getClass)
      src/main/scala/org/apache/flink/table/catalog/ExternalTableSourceUtil.scala:45:  private val LOG: Logger = LoggerFactory.getLogger(this.getClass)
      src/main/scala/org/apache/flink/table/codegen/calls/BuiltInMethods.scala:28:  val LOG10 = Types.lookupMethod(classOf[Math], "log10", classOf[Double])
      src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamGroupAggregate.scala:62:  private val LOG = LoggerFactory.getLogger(this.getClass)
      src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamGroupWindowAggregate.scala:59:  private val LOG = LoggerFactory.getLogger(this.getClass)
      src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamOverAggregate.scala:51:  private val LOG = LoggerFactory.getLogger(this.getClass)
      src/main/scala/org/apache/flink/table/plan/nodes/FlinkConventions.scala:28:  val LOGICAL: Convention = new Convention.Impl("LOGICAL", classOf[FlinkLogicalRel])
      src/main/scala/org/apache/flink/table/plan/rules/FlinkRuleSets.scala:38:  val LOGICAL_OPT_RULES: RuleSet = RuleSets.ofList(
      src/main/scala/org/apache/flink/table/runtime/aggregate/AggregateAggFunction.scala:36:  val LOG = LoggerFactory.getLogger(this.getClass)
      src/main/scala/org/apache/flink/table/runtime/aggregate/DataSetAggFunction.scala:43:  val LOG = LoggerFactory.getLogger(this.getClass)
      src/main/scala/org/apache/flink/table/runtime/aggregate/DataSetFinalAggFunction.scala:44:  val LOG = LoggerFactory.getLogger(this.getClass)
      src/main/scala/org/apache/flink/table/runtime/aggregate/DataSetPreAggFunction.scala:44:  val LOG = LoggerFactory.getLogger(this.getClass)
      src/main/scala/org/apache/flink/table/runtime/aggregate/DataSetSessionWindowAggregatePreProcessor.scala:55:  val LOG = LoggerFactory.getLogger(this.getClass)
      src/main/scala/org/apache/flink/table/runtime/aggregate/DataSetSessionWindowAggReduceGroupFunction.scala:66:  val LOG = LoggerFactory.getLogger(this.getClass)
      src/main/scala/org/apache/flink/table/runtime/aggregate/DataSetSlideWindowAggReduceGroupFunction.scala:56:  val LOG = LoggerFactory.getLogger(this.getClass)
      src/main/scala/org/apache/flink/table/runtime/aggregate/DataSetSlideTimeWindowAggReduceGroupFunction.scala:64:  val LOG = LoggerFactory.getLogger(this.getClass)
      src/main/scala/org/apache/flink/table/runtime/aggregate/DataSetTumbleCountWindowAggReduceGroupFunction.scala:46:  val LOG = LoggerFactory.getLogger(this.getClass)
      src/main/scala/org/apache/flink/table/runtime/aggregate/DataSetWindowAggMapFunction.scala:52:  val LOG = LoggerFactory.getLogger(this.getClass)
      src/main/scala/org/apache/flink/table/runtime/aggregate/DataSetTumbleTimeWindowAggReduceGroupFunction.scala:55:  val LOG = LoggerFactory.getLogger(this.getClass)
      src/main/scala/org/apache/flink/table/runtime/aggregate/GroupAggProcessFunction.scala:48:  val LOG: Logger = LoggerFactory.getLogger(this.getClass)
      src/main/scala/org/apache/flink/table/runtime/aggregate/ProcTimeBoundedRowsOver.scala:66:  val LOG = LoggerFactory.getLogger(this.getClass)
      src/main/scala/org/apache/flink/table/runtime/aggregate/ProcTimeBoundedRangeOver.scala:61:  val LOG = LoggerFactory.getLogger(this.getClass)
      src/main/scala/org/apache/flink/table/runtime/aggregate/ProcTimeUnboundedOver.scala:47:  val LOG = LoggerFactory.getLogger(this.getClass)
      src/main/scala/org/apache/flink/table/runtime/aggregate/RowTimeBoundedRangeOver.scala:67:  val LOG = LoggerFactory.getLogger(this.getClass)
      src/main/scala/org/apache/flink/table/runtime/aggregate/RowTimeBoundedRowsOver.scala:72:  val LOG = LoggerFactory.getLogger(this.getClass)
      src/main/scala/org/apache/flink/table/runtime/aggregate/RowTimeUnboundedOver.scala:60:  val LOG = LoggerFactory.getLogger(this.getClass)
      src/main/scala/org/apache/flink/table/runtime/CorrelateFlatMapRunner.scala:40:  val LOG: Logger = LoggerFactory.getLogger(this.getClass)
      src/main/scala/org/apache/flink/table/runtime/CRowCorrelateProcessRunner.scala:45:  val LOG: Logger = LoggerFactory.getLogger(this.getClass)
      src/main/scala/org/apache/flink/table/runtime/CRowInputMapRunner.scala:41:  val LOG = LoggerFactory.getLogger(this.getClass)
      src/main/scala/org/apache/flink/table/runtime/CRowInputTupleOutputMapRunner.scala:44:  val LOG = LoggerFactory.getLogger(this.getClass)
      src/main/scala/org/apache/flink/table/runtime/CRowInputTupleOutputMapRunner.scala:77:  val LOG = LoggerFactory.getLogger(this.getClass)
      src/main/scala/org/apache/flink/table/runtime/CRowOutputMapRunner.scala:41:  val LOG = LoggerFactory.getLogger(this.getClass)
      src/main/scala/org/apache/flink/table/runtime/CRowProcessRunner.scala:43:  val LOG = LoggerFactory.getLogger(this.getClass)
      src/main/scala/org/apache/flink/table/runtime/FlatJoinRunner.scala:37:  val LOG = LoggerFactory.getLogger(this.getClass)
      src/main/scala/org/apache/flink/table/runtime/FlatMapRunner.scala:39:  val LOG = LoggerFactory.getLogger(this.getClass)
      src/main/scala/org/apache/flink/table/runtime/io/CRowValuesInputFormat.scala:39:  val LOG = LoggerFactory.getLogger(this.getClass)
      src/main/scala/org/apache/flink/table/runtime/io/ValuesInputFormat.scala:38:  val LOG = LoggerFactory.getLogger(this.getClass)
      src/main/scala/org/apache/flink/table/runtime/MapRunner.scala:36:  val LOG = LoggerFactory.getLogger(this.getClass)
      src/main/scala/org/apache/flink/table/runtime/MapSideJoinRunner.scala:37:  val LOG = LoggerFactory.getLogger(this.getClass)
      

      Attachments

        1. Example.png
          820 kB
          Jacob Park

        Issue Links

          Activity

            People

              wheat9 Haohui Mai
              aljoscha Aljoscha Krettek
              Votes:
              0 Vote for this issue
              Watchers:
              6 Start watching this issue

              Dates

                Created:
                Updated:
                Resolved: