Flink / FLINK-15943

Rowtime field name cannot be the same as the JSON field


    Description

Run the following SQL:

-- sql start
      --source
      CREATE TABLE dwd_user_log (
        id VARCHAR,
        ctime TIMESTAMP,
        sessionId VARCHAR,
        pageId VARCHAR,
        eventId VARCHAR,
  deviceId DECIMAL
      ) WITH (
        'connector.type' = 'kafka',
        'connector.version' = 'universal',
        'connector.topic' = 'dev_dwd_user_log_02',
        'connector.startup-mode' = 'latest-offset',
        'connector.properties.0.key' = 'zookeeper.connect',
        'connector.properties.0.value' = 'node01:2181',
        'connector.properties.1.key' = 'bootstrap.servers',
        'connector.properties.1.value' = 'node01:9092',
        'connector.properties.2.key' = 'group.id',
        'connector.properties.2.value' = 'dev-group',
        'update-mode' = 'append',
        'format.type' = 'json',
        -- 'format.derive-schema' = 'true',
  'format.json-schema' = '{
    "type": "object",
    "properties": {
      "id": { "type": "string" },
      "ctime": { "type": "string", "format": "date-time" },
      "pageId": { "type": "string" },
      "eventId": { "type": "string" },
      "sessionId": { "type": "string" },
      "deviceId": { "type": "number" }
    }
  }',
        'schema.1.rowtime.timestamps.type' = 'from-field',
        'schema.1.rowtime.timestamps.from' = 'ctime',
        'schema.1.rowtime.watermarks.type' = 'periodic-bounded',
        'schema.1.rowtime.watermarks.delay' = '10000'
      );  
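-- Note: the 'schema.1.*' keys use 0-based field indices, so index 1 is the
-- second column, ctime. The rowtime attribute is thus declared under the same
-- name as the physical JSON field "ctime"; that name collision is what the
-- planner fails to resolve below.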

-- sink for pv
      CREATE TABLE dws_pv (
  windowStart TIMESTAMP,
        windowEnd TIMESTAMP,
        pageId VARCHAR,
        viewCount BIGINT
      ) WITH (
        'connector.type' = 'kafka',
        'connector.version' = 'universal',
        'connector.topic' = 'dev_dws_pvuv_02',
        'connector.startup-mode' = 'latest-offset',
        'connector.properties.0.key' = 'zookeeper.connect',
        'connector.properties.0.value' = 'node01:2181',
        'connector.properties.1.key' = 'bootstrap.servers',
        'connector.properties.1.value' = 'node01:9092',
        'connector.properties.2.key' = 'group.id',
        'connector.properties.2.value' = 'dev-group',
        'update-mode' = 'append',
        'format.type' = 'json',
        'format.derive-schema' = 'true'
      );
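
-- Note: in streaming mode TUMBLE() requires its time argument to be a
-- declared time attribute, so the query below depends on ctime being
-- resolved as the rowtime field.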

      -- pv
      INSERT INTO dws_pv
      SELECT
        TUMBLE_START(ctime, INTERVAL '20' SECOND)  AS windowStart,
        TUMBLE_END(ctime, INTERVAL '20' SECOND)  AS windowEnd,
        pageId,
        COUNT(deviceId) AS viewCount
      FROM dwd_user_log
GROUP BY TUMBLE(ctime, INTERVAL '20' SECOND), pageId;
-- sql end
It fails with the following error:

Exception in thread "main" org.apache.flink.table.api.ValidationException: Field 'ctime' could not be resolved by the field mapping.
    at org.apache.flink.table.planner.sources.TableSourceUtil$.org$apache$flink$table$planner$sources$TableSourceUtil$$resolveInputField(TableSourceUtil.scala:357)
    at org.apache.flink.table.planner.sources.TableSourceUtil$$anonfun$org$apache$flink$table$planner$sources$TableSourceUtil$$resolveInputFields$1.apply(TableSourceUtil.scala:388)
    at org.apache.flink.table.planner.sources.TableSourceUtil$$anonfun$org$apache$flink$table$planner$sources$TableSourceUtil$$resolveInputFields$1.apply(TableSourceUtil.scala:388)
    at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
    at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
    at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
    at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:186)
    at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
    at scala.collection.mutable.ArrayOps$ofRef.map(ArrayOps.scala:186)
    at org.apache.flink.table.planner.sources.TableSourceUtil$.org$apache$flink$table$planner$sources$TableSourceUtil$$resolveInputFields(TableSourceUtil.scala:388)
    at org.apache.flink.table.planner.sources.TableSourceUtil$$anonfun$getRowtimeExtractionExpression$1.apply(TableSourceUtil.scala:275)
    at org.apache.flink.table.planner.sources.TableSourceUtil$$anonfun$getRowtimeExtractionExpression$1.apply(TableSourceUtil.scala:270)
    at scala.Option.map(Option.scala:146)
    at org.apache.flink.table.planner.sources.TableSourceUtil$.getRowtimeExtractionExpression(TableSourceUtil.scala:270)
    at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecTableSourceScan.translateToPlanInternal(StreamExecTableSourceScan.scala:117)
    at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecTableSourceScan.translateToPlanInternal(StreamExecTableSourceScan.scala:55)
    at org.apache.flink.table.planner.plan.nodes.exec.ExecNode$class.translateToPlan(ExecNode.scala:54
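
A possible workaround, shown as an untested sketch against the legacy (Flink 1.9/1.10) descriptor properties: give the logical rowtime column a name that differs from the physical JSON field, and let 'rowtime.timestamps.from' point at the JSON field. The column name rtime below is hypothetical:

-- Sketch of a possible workaround (untested): rename the logical rowtime
-- column so it no longer collides with the physical JSON field "ctime";
-- 'from-field' still extracts the timestamp from "ctime".
CREATE TABLE dwd_user_log (
  id VARCHAR,
  rtime TIMESTAMP,  -- hypothetical rowtime attribute name, differs from the JSON field
  sessionId VARCHAR,
  pageId VARCHAR,
  eventId VARCHAR,
  deviceId DECIMAL
) WITH (
  -- same 'connector.*' and 'format.*' properties as in the statement above,
  -- including the explicit 'format.json-schema' (its "ctime" entry stays), then:
  'schema.1.rowtime.timestamps.type' = 'from-field',
  'schema.1.rowtime.timestamps.from' = 'ctime',  -- physical JSON field
  'schema.1.rowtime.watermarks.type' = 'periodic-bounded',
  'schema.1.rowtime.watermarks.delay' = '10000'
);

The windowed query would then use rtime in TUMBLE() and GROUP BY instead of ctime.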
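
Alternatively, a sketch assuming Flink 1.10+: the WATERMARK clause in CREATE TABLE declares the rowtime attribute directly on the column, so the attribute and the JSON field can share the name ctime without going through the 'schema.N.rowtime.*' mapping:

-- Sketch, assuming Flink 1.10+ DDL: rowtime and watermark declared inline,
-- so 'ctime' keeps the same name as the JSON field.
CREATE TABLE dwd_user_log (
  id STRING,
  ctime TIMESTAMP(3),
  sessionId STRING,
  pageId STRING,
  eventId STRING,
  deviceId DECIMAL(10, 0),
  WATERMARK FOR ctime AS ctime - INTERVAL '10' SECOND  -- 10 s bounded delay
) WITH (
  'connector.type' = 'kafka',
  'connector.version' = 'universal',
  'connector.topic' = 'dev_dwd_user_log_02',
  'connector.startup-mode' = 'latest-offset',
  'connector.properties.0.key' = 'bootstrap.servers',
  'connector.properties.0.value' = 'node01:9092',
  'update-mode' = 'append',
  'format.type' = 'json',
  'format.derive-schema' = 'true'
);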
      

People

  Assignee: Unassigned
  Reporter: gkgkgk