FLINK-17600

Blink Planner fails to generate RowtimeAttribute based on TableSource's DefinedRowtimeAttributes implementation


Details

    • Type: Bug
    • Status: Closed
    • Priority: Major
    • Resolution: Duplicate
    • Affects Version/s: 1.10.0
    • Fix Version/s: None
    • Component/s: Table SQL / Planner
    • Labels: None

    Description

      Given the following SQL statement: 

      tableEnv.sqlQuery("SELECT EVENT_TIME, B, C FROM FOO")

      where FOO is a table originating from a custom StreamTableSource[Row] that implements `DefinedRowtimeAttributes.getRowtimeAttributeDescriptors`, the Blink Planner fails to mark the selected field with a `RowtimeAttribute`.
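      For illustration, here is a minimal sketch of the kind of source involved. The class name FooTableSource, the EVENT_TIME/B/C columns and the AscendingTimestamps watermark strategy are assumptions derived from the query above, not the actual implementation:

      import java.util.Collections

      import org.apache.flink.streaming.api.datastream.DataStream
      import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment
      import org.apache.flink.table.api.{DataTypes, TableSchema}
      import org.apache.flink.table.sources.tsextractors.ExistingField
      import org.apache.flink.table.sources.wmstrategies.AscendingTimestamps
      import org.apache.flink.table.sources.{DefinedRowtimeAttributes, RowtimeAttributeDescriptor, StreamTableSource}
      import org.apache.flink.table.types.DataType
      import org.apache.flink.types.Row

      class FooTableSource extends StreamTableSource[Row] with DefinedRowtimeAttributes {

        override def getTableSchema: TableSchema =
          TableSchema.builder()
            .field("EVENT_TIME", DataTypes.TIMESTAMP(3))
            .field("B", DataTypes.STRING())
            .field("C", DataTypes.INT())
            .build()

        override def getProducedDataType: DataType =
          getTableSchema.toRowDataType

        // EVENT_TIME is declared as the rowtime attribute; the planner is expected to mark the
        // selected EVENT_TIME field as a time attribute when planning the query above.
        override def getRowtimeAttributeDescriptors: java.util.List[RowtimeAttributeDescriptor] =
          Collections.singletonList(new RowtimeAttributeDescriptor(
            "EVENT_TIME", new ExistingField("EVENT_TIME"), new AscendingTimestamps))

        // The actual stream is irrelevant to the planning issue and omitted here.
        override def getDataStream(execEnv: StreamExecutionEnvironment): DataStream[Row] = ???
      }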

      This happens because the implementation of `TableSourceUtil.getSourceRowType` receives a `None` TableSource from `CatalogSchemaTable.getRowType`, presumably because the Catalog has yet to create the underlying TableSource, since its creation is deferred to the implementing TableFactory (in this case my own custom one).
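      As a rough sketch of that deferred path (the factory class name, connector identifier and property keys below are placeholders, not the real connector): the catalog entry only carries schema and properties, and the TableSource, together with its rowtime descriptors, only comes into existence once the planner asks the factory for it:

      import java.util.{Arrays, Collections, List => JList, Map => JMap}

      import org.apache.flink.table.factories.StreamTableSourceFactory
      import org.apache.flink.table.sources.StreamTableSource
      import org.apache.flink.types.Row

      class FooTableSourceFactory extends StreamTableSourceFactory[Row] {

        // Placeholder connector identifier used to match the catalog table's properties.
        override def requiredContext(): JMap[String, String] =
          Collections.singletonMap("connector.type", "foo")

        // Placeholder property keys; a real factory lists the connector and schema keys it accepts.
        override def supportedProperties(): JList[String] =
          Arrays.asList("connector.path", "schema.#.name", "schema.#.data-type")

        // Only after this call does a TableSource (and with it DefinedRowtimeAttributes) exist.
        // CatalogSchemaTable#getRowType is evaluated against the catalog entry without it, which
        // appears to be why TableSourceUtil.getSourceRowType ends up receiving Option.empty.
        override def createStreamTableSource(properties: JMap[String, String]): StreamTableSource[Row] =
          new FooTableSource
      }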

      This does not reproduce with the old Flink planner, because the old planner uses `TableSourceTable`, which explicitly holds a reference to the underlying `TableSource` and extracts its rowtime attributes from it.

      Relevant code:

      CatalogSchemaTable:

       

      private static RelDataType getRowType(RelDataTypeFactory typeFactory,
            CatalogBaseTable catalogBaseTable,
            boolean isStreamingMode) {
         final FlinkTypeFactory flinkTypeFactory = (FlinkTypeFactory) typeFactory;
         TableSchema tableSchema = catalogBaseTable.getSchema();
         final DataType[] fieldDataTypes = tableSchema.getFieldDataTypes();
         if (!isStreamingMode
            && catalogBaseTable instanceof ConnectorCatalogTable
            && ((ConnectorCatalogTable) catalogBaseTable).getTableSource().isPresent()) {
            // If the table source is bounded, materialize the time attributes to normal TIMESTAMP type.
            // Now for ConnectorCatalogTable, there is no way to
            // deduce if it is bounded in the table environment, so the data types in TableSchema
            // always patched with TimeAttribute.
            // See ConnectorCatalogTable#calculateSourceSchema
            // for details.
      
            // Remove the patched time attributes type to let the TableSourceTable handle it.
            // We should remove this logic if the isBatch flag in ConnectorCatalogTable is fixed.
            // TODO: Fix FLINK-14844.
            for (int i = 0; i < fieldDataTypes.length; i++) {
               LogicalType lt = fieldDataTypes[i].getLogicalType();
               if (lt instanceof TimestampType
                  && (((TimestampType) lt).getKind() == TimestampKind.PROCTIME
                  || ((TimestampType) lt).getKind() == TimestampKind.ROWTIME)) {
                  int precision = ((TimestampType) lt).getPrecision();
                  fieldDataTypes[i] = DataTypes.TIMESTAMP(precision);
               }
            }
         }
         return TableSourceUtil.getSourceRowType(flinkTypeFactory,
            tableSchema,
            scala.Option.empty(),
            isStreamingMode);
      }
      

      TableSourceUtil:

       

       

      def getSourceRowType(
          typeFactory: FlinkTypeFactory,
          tableSchema: TableSchema,
          tableSource: Option[TableSource[_]],
          streaming: Boolean): RelDataType = {
      
        val fieldNames = tableSchema.getFieldNames
        val fieldDataTypes = tableSchema.getFieldDataTypes
      
        if (tableSchema.getWatermarkSpecs.nonEmpty) {
          getSourceRowType(typeFactory, fieldNames, fieldDataTypes, tableSchema.getWatermarkSpecs.head,
            streaming)
        } else if (tableSource.isDefined) {
          getSourceRowType(typeFactory, fieldNames, fieldDataTypes, tableSource.get,
            streaming)
        } else {
          val fieldTypes = fieldDataTypes.map(fromDataTypeToLogicalType)
          typeFactory.buildRelNodeRowType(fieldNames, fieldTypes)
        }
      }

      TableSourceTable:

       // We must enrich logical schema from catalog table with physical type coming from table source.
        // Schema coming from catalog table might not have proper conversion classes. Those must be
        // extracted from produced type, before converting to RelDataType
        def getRowType(typeFactory: RelDataTypeFactory): RelDataType = {
          val flinkTypeFactory = typeFactory.asInstanceOf[FlinkTypeFactory]
          val fieldNames = tableSchema.getFieldNames

          val nameMapping: JFunction[String, String] = tableSource match {
            case mapping: DefinedFieldMapping if mapping.getFieldMapping != null =>
              new JFunction[String, String] {
                override def apply(t: String): String = mapping.getFieldMapping.get(t)
              }
            case _ => JFunction.identity()
          }

          val producedDataType = tableSource.getProducedDataType
          val fieldIndexes = TypeMappingUtils.computePhysicalIndicesOrTimeAttributeMarkers(
            tableSource,
            tableSchema.getTableColumns,
            isStreamingMode,
            nameMapping
          )

          val typeInfos = if (LogicalTypeChecks.isCompositeType(producedDataType.getLogicalType)) {
            val physicalSchema = DataTypeUtils.expandCompositeTypeToSchema(producedDataType)
            fieldIndexes.map(mapIndex(_,
              idx =>
                TypeConversions.fromDataTypeToLegacyInfo(physicalSchema.getFieldDataType(idx).get()))
            )
          } else {
            fieldIndexes.map(mapIndex(_, _ => TypeConversions.fromDataTypeToLegacyInfo(producedDataType)))
          }

          flinkTypeFactory.buildLogicalRowType(fieldNames, typeInfos)
        }

       

    People

      Assignee: Unassigned
      Reporter: Yuval Itzchakov
      Votes: 0
      Watchers: 5
