Details
- Type: Bug
- Status: Closed
- Priority: Major
- Resolution: Duplicate
- Affects Version/s: 1.10.0
- Fix Version/s: None
- Component/s: None
Description
Given the following SQL statement:
tableEnv.sqlQuery("SELECT EVENT_TIME, B, C FROM FOO")
Where FOO is a table originating from a custom `StreamTableSource[Row]` that implements `DefinedRowtimeAttributes.getRowtimeAttributeDescriptors`, the Blink planner fails to mark the selected field as a rowtime attribute.
This happens because the implementation of `TableSourceUtil.getSourceRowType` receives a `None` TableSource from `CatalogSchemaTable.getRowType`, presumably because the catalog has not yet created the underlying TableSource; that creation is deferred to the implementing TableFactory (in this case my own custom one).
This does not reproduce in the old Flink planner, because the old planner uses `TableSourceTable`, which explicitly holds a reference to the underlying `TableSource` and extracts its rowtime attributes.
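For context, a minimal sketch of the kind of table source involved. The class name and the stubbed data stream are illustrative, not my actual code; in my setup the source itself is instantiated lazily by the custom TableFactory, which is why it does not exist yet when the row type is derived:

import java.util
import java.util.Collections

import org.apache.flink.api.common.typeinfo.{TypeInformation, Types}
import org.apache.flink.streaming.api.datastream.DataStream
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment
import org.apache.flink.table.api.{DataTypes, TableSchema}
import org.apache.flink.table.sources.{DefinedRowtimeAttributes, RowtimeAttributeDescriptor, StreamTableSource}
import org.apache.flink.table.sources.tsextractors.ExistingField
import org.apache.flink.table.sources.wmstrategies.AscendingTimestamps
import org.apache.flink.types.Row

// Illustrative stand-in for the custom source behind FOO.
class FooTableSource extends StreamTableSource[Row] with DefinedRowtimeAttributes {

  override def getTableSchema: TableSchema =
    TableSchema.builder()
      .field("EVENT_TIME", DataTypes.TIMESTAMP(3))
      .field("B", DataTypes.STRING())
      .field("C", DataTypes.STRING())
      .build()

  // Declares EVENT_TIME as a rowtime attribute. This is the information the
  // Blink planner never sees, because the source is not consulted.
  override def getRowtimeAttributeDescriptors: util.List[RowtimeAttributeDescriptor] =
    Collections.singletonList(new RowtimeAttributeDescriptor(
      "EVENT_TIME",
      new ExistingField("EVENT_TIME"),
      new AscendingTimestamps))

  override def getReturnType: TypeInformation[Row] =
    Types.ROW_NAMED(
      Array("EVENT_TIME", "B", "C"),
      Types.SQL_TIMESTAMP, Types.STRING, Types.STRING)

  // Stubbed out; any DataStream[Row] matching the return type would do.
  override def getDataStream(execEnv: StreamExecutionEnvironment): DataStream[Row] =
    execEnv.fromCollection(Collections.emptyList[Row](), getReturnType)
}

The query from the description is then run against a table backed by this source.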
Relevant code:
CatalogSchemaTable:
private static RelDataType getRowType(
        RelDataTypeFactory typeFactory,
        CatalogBaseTable catalogBaseTable,
        boolean isStreamingMode) {
    final FlinkTypeFactory flinkTypeFactory = (FlinkTypeFactory) typeFactory;
    TableSchema tableSchema = catalogBaseTable.getSchema();
    final DataType[] fieldDataTypes = tableSchema.getFieldDataTypes();
    if (!isStreamingMode
            && catalogBaseTable instanceof ConnectorCatalogTable
            && ((ConnectorCatalogTable) catalogBaseTable).getTableSource().isPresent()) {
        // If the table source is bounded, materialize the time attributes to normal TIMESTAMP type.
        // Now for ConnectorCatalogTable, there is no way to
        // deduce if it is bounded in the table environment, so the data types in TableSchema
        // always patched with TimeAttribute.
        // See ConnectorCatalogTable#calculateSourceSchema
        // for details.
        // Remove the patched time attributes type to let the TableSourceTable handle it.
        // We should remove this logic if the isBatch flag in ConnectorCatalogTable is fixed.
        // TODO: Fix FLINK-14844.
        for (int i = 0; i < fieldDataTypes.length; i++) {
            LogicalType lt = fieldDataTypes[i].getLogicalType();
            if (lt instanceof TimestampType
                    && (((TimestampType) lt).getKind() == TimestampKind.PROCTIME
                        || ((TimestampType) lt).getKind() == TimestampKind.ROWTIME)) {
                int precision = ((TimestampType) lt).getPrecision();
                fieldDataTypes[i] = DataTypes.TIMESTAMP(precision);
            }
        }
    }
    return TableSourceUtil.getSourceRowType(
        flinkTypeFactory,
        tableSchema,
        scala.Option.empty(),
        isStreamingMode);
}
TableSourceUtil:
def getSourceRowType(
    typeFactory: FlinkTypeFactory,
    tableSchema: TableSchema,
    tableSource: Option[TableSource[_]],
    streaming: Boolean): RelDataType = {

  val fieldNames = tableSchema.getFieldNames
  val fieldDataTypes = tableSchema.getFieldDataTypes

  if (tableSchema.getWatermarkSpecs.nonEmpty) {
    getSourceRowType(typeFactory,
      fieldNames,
      fieldDataTypes,
      tableSchema.getWatermarkSpecs.head,
      streaming)
  } else if (tableSource.isDefined) {
    getSourceRowType(typeFactory,
      fieldNames,
      fieldDataTypes,
      tableSource.get,
      streaming)
  } else {
    val fieldTypes = fieldDataTypes.map(fromDataTypeToLogicalType)
    typeFactory.buildRelNodeRowType(fieldNames, fieldTypes)
  }
}
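To make the failure mode concrete: FOO's schema defines no watermark specs, and `CatalogSchemaTable.getRowType` (above) always passes an empty table source, so the call falls through to the last branch and the descriptors from `DefinedRowtimeAttributes` are never consulted. A sketch of what the call effectively reduces to (variable names illustrative, not an exact excerpt):

// Illustration only: how CatalogSchemaTable invokes the method for FOO.
val rowType = TableSourceUtil.getSourceRowType(
  flinkTypeFactory,     // the FlinkTypeFactory in scope
  fooSchema,            // FOO's TableSchema: no watermark specs
  scala.Option.empty(), // table source not created yet -> last branch taken
  true)                 // streaming mode
// Result: EVENT_TIME comes back as a plain TIMESTAMP(3) column,
// not as a rowtime attribute.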
TableSourceTable:
// We must enrich logical schema from catalog table with physical type coming from table source.
// Schema coming from catalog table might not have proper conversion classes. Those must be
// extracted from produced type, before converting to RelDataType
def getRowType(typeFactory: RelDataTypeFactory): RelDataType = {
  val flinkTypeFactory = typeFactory.asInstanceOf[FlinkTypeFactory]
  val fieldNames = tableSchema.getFieldNames

  val nameMapping: JFunction[String, String] = tableSource match {
    case mapping: DefinedFieldMapping if mapping.getFieldMapping != null =>
      new JFunction[String, String] {
        override def apply(t: String): String = mapping.getFieldMapping.get(t)
      }
    case _ => JFunction.identity()
  }

  val producedDataType = tableSource.getProducedDataType
  val fieldIndexes = TypeMappingUtils.computePhysicalIndicesOrTimeAttributeMarkers(
    tableSource,
    tableSchema.getTableColumns,
    isStreamingMode,
    nameMapping
  )

  val typeInfos = if (LogicalTypeChecks.isCompositeType(producedDataType.getLogicalType)) {
    val physicalSchema = DataTypeUtils.expandCompositeTypeToSchema(producedDataType)
    fieldIndexes.map(mapIndex(_,
      idx => TypeConversions.fromDataTypeToLegacyInfo(physicalSchema.getFieldDataType(idx).get()))
    )
  } else {
    fieldIndexes.map(mapIndex(_, _ => TypeConversions.fromDataTypeToLegacyInfo(producedDataType)))
  }

  flinkTypeFactory.buildLogicalRowType(fieldNames, typeInfos)
}
Issue Links
- is duplicated by FLINK-16160: Schema#proctime and Schema#rowtime don't work in TableEnvironment#connect code path (Closed)