FLINK-25084: Field names must be unique. Found duplicates


Details

    • Type: Bug
    • Status: Closed
    • Priority: Major
    • Resolution: Fixed
    • Affects Version/s: 1.13.2
    • Fix Version/s: 1.15.0, 1.13.6, 1.14.3
    • Component/s: API / DataStream
    • Labels: None
    • Environment: AWS Kinesis Application in Zeppelin
      Apache Flink 1.13, Apache Zeppelin 0.9

    Description

      I am getting a "Field names must be unique. Found duplicates" error when trying to aggregate a column that is also used as the descriptor column in HOP windowing.

      Consider this example: events_table reads from a Kinesis stream (its definition is given below). Running the following SQL in a Kinesis Data Analytics application in Zeppelin fails with "Field names must be unique. Found duplicates: [ts]":

      %flink.ssql(type=update)
      
      -- insert into learn_actions_deduped 
      SELECT window_start, window_end, uuid, event_type, max(ts) as max_event_ts
      FROM TABLE(HOP(TABLE events_table, DESCRIPTOR(ts), INTERVAL '5' SECONDS, INTERVAL '15' MINUTES))
      GROUP BY window_start, window_end, uuid, event_type;
      

      The question is: how can I use the descriptor column in an aggregation without having to duplicate it?
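      A possible interim workaround until upgrading to a fix version (1.13.6, 1.14.3, or 1.15.0, per the Fix Version/s above): keep ts as the descriptor column, but aggregate an explicitly materialized copy of it under a different name, so the planner never produces two fields named ts (the stack trace below shows the colliding CAST(ts) AS ts in the StreamPhysicalCalc). This does still duplicate the column, which is what I hoped to avoid, but it sidesteps the planner failure. An untested sketch; events_with_copy and event_ts are hypothetical names:

      %flink.ssql(type=update)

      -- Hypothetical workaround sketch: materialize the rowtime column under a
      -- second name and aggregate that copy instead of ts itself.
      CREATE TEMPORARY VIEW events_with_copy AS
      SELECT uuid, event_type, ts, CAST(ts AS TIMESTAMP(3)) AS event_ts
      FROM events_table;

      SELECT window_start, window_end, uuid, event_type, max(event_ts) AS max_event_ts
      FROM TABLE(HOP(TABLE events_with_copy, DESCRIPTOR(ts), INTERVAL '5' SECONDS, INTERVAL '15' MINUTES))
      GROUP BY window_start, window_end, uuid, event_type;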

      The error details:
      java.io.IOException: Fail to run stream sql job
      at org.apache.zeppelin.flink.sql.AbstractStreamSqlJob.run(AbstractStreamSqlJob.java:172)
      at org.apache.zeppelin.flink.sql.AbstractStreamSqlJob.run(AbstractStreamSqlJob.java:105)
      at org.apache.zeppelin.flink.FlinkStreamSqlInterpreter.callInnerSelect(FlinkStreamSqlInterpreter.java:89)
      at org.apache.zeppelin.flink.FlinkSqlInterrpeter.callSelect(FlinkSqlInterrpeter.java:503)
      at org.apache.zeppelin.flink.FlinkSqlInterrpeter.callCommand(FlinkSqlInterrpeter.java:266)
      at org.apache.zeppelin.flink.FlinkSqlInterrpeter.runSqlList(FlinkSqlInterrpeter.java:160)
      at org.apache.zeppelin.flink.FlinkSqlInterrpeter.internalInterpret(FlinkSqlInterrpeter.java:112)
      at org.apache.zeppelin.interpreter.AbstractInterpreter.interpret(AbstractInterpreter.java:47)
      at org.apache.zeppelin.interpreter.LazyOpenInterpreter.interpret(LazyOpenInterpreter.java:110)
      at org.apache.zeppelin.interpreter.remote.RemoteInterpreterServer$InterpretJob.jobRun(RemoteInterpreterServer.java:852)
      at org.apache.zeppelin.interpreter.remote.RemoteInterpreterServer$InterpretJob.jobRun(RemoteInterpreterServer.java:744)
      at org.apache.zeppelin.scheduler.Job.run(Job.java:172)
      at org.apache.zeppelin.scheduler.AbstractScheduler.runJob(AbstractScheduler.java:132)
      at org.apache.zeppelin.scheduler.ParallelScheduler.lambda$runJobInScheduler$0(ParallelScheduler.java:46)
      at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
      at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
      at java.base/java.lang.Thread.run(Thread.java:829)
      Caused by: java.lang.RuntimeException: Error while applying rule PullUpWindowTableFunctionIntoWindowAggregateRule, args [rel#1172:StreamPhysicalWindowAggregate.STREAM_PHYSICAL.any.None: 0.[NONE].[NONE](input=RelSubset#1170,groupBy=uuid, event_type,window=HOP(win_start=[window_start], win_end=[window_end], size=[15 min], slide=[5 s]),select=uuid, event_type, MAX(ts) AS max_event_ts, start('w$) AS window_start, end('w$) AS window_end), rel#1179:StreamPhysicalExchange.STREAM_PHYSICAL.hash[2, 3]true.None: 0.[NONE].[NONE](input=RelSubset#1169,distribution=hash[uuid, event_type]), rel#1168:StreamPhysicalCalc.STREAM_PHYSICAL.any.None: 0.[NONE].[NONE](input=RelSubset#1167,select=window_start, window_end, uuid, event_type, CAST(ts) AS ts), rel#1166:StreamPhysicalWindowTableFunction.STREAM_PHYSICAL.any.None: 0.[NONE].[NONE](input=RelSubset#1165,window=HOP(time_col=[ts], size=[15 min], slide=[5 s]))]
      at org.apache.calcite.plan.volcano.VolcanoRuleCall.onMatch(VolcanoRuleCall.java:256)
      at org.apache.calcite.plan.volcano.IterativeRuleDriver.drive(IterativeRuleDriver.java:58)
      at org.apache.calcite.plan.volcano.VolcanoPlanner.findBestExp(VolcanoPlanner.java:510)
      at org.apache.calcite.tools.Programs$RuleSetProgram.run(Programs.java:312)
      at org.apache.flink.table.planner.plan.optimize.program.FlinkVolcanoProgram.optimize(FlinkVolcanoProgram.scala:69)
      at org.apache.flink.table.planner.plan.optimize.program.FlinkChainedProgram.$anonfun$optimize$1(FlinkChainedProgram.scala:62)
      at scala.collection.TraversableOnce.$anonfun$foldLeft$1(TraversableOnce.scala:156)
      at scala.collection.TraversableOnce.$anonfun$foldLeft$1$adapted(TraversableOnce.scala:156)
      at scala.collection.Iterator.foreach(Iterator.scala:937)
      at scala.collection.Iterator.foreach$(Iterator.scala:937)
      at scala.collection.AbstractIterator.foreach(Iterator.scala:1425)
      at scala.collection.IterableLike.foreach(IterableLike.scala:70)
      at scala.collection.IterableLike.foreach$(IterableLike.scala:69)
      at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
      at scala.collection.TraversableOnce.foldLeft(TraversableOnce.scala:156)
      at scala.collection.TraversableOnce.foldLeft$(TraversableOnce.scala:154)
      at scala.collection.AbstractTraversable.foldLeft(Traversable.scala:104)
      at org.apache.flink.table.planner.plan.optimize.program.FlinkChainedProgram.optimize(FlinkChainedProgram.scala:58)
      at org.apache.flink.table.planner.plan.optimize.StreamCommonSubGraphBasedOptimizer.optimizeTree(StreamCommonSubGraphBasedOptimizer.scala:163)
      at org.apache.flink.table.planner.plan.optimize.StreamCommonSubGraphBasedOptimizer.doOptimize(StreamCommonSubGraphBasedOptimizer.scala:83)
      at org.apache.flink.table.planner.plan.optimize.CommonSubGraphBasedOptimizer.optimize(CommonSubGraphBasedOptimizer.scala:77)
      at org.apache.flink.table.planner.delegation.PlannerBase.optimize(PlannerBase.scala:279)
      at org.apache.flink.table.planner.delegation.PlannerBase.translate(PlannerBase.scala:163)
      at org.apache.flink.table.api.internal.TableEnvironmentImpl.translate(TableEnvironmentImpl.java:1518)
      at org.apache.flink.table.api.internal.TableEnvironmentImpl.translateAndClearBuffer(TableEnvironmentImpl.java:1510)
      at org.apache.flink.table.api.internal.TableEnvironmentImpl.execute(TableEnvironmentImpl.java:1460)
      at org.apache.zeppelin.flink.sql.AbstractStreamSqlJob.run(AbstractStreamSqlJob.java:161)
      ... 16 more
      Caused by: java.lang.RuntimeException: Error occurred while applying rule PullUpWindowTableFunctionIntoWindowAggregateRule
      at org.apache.calcite.plan.volcano.VolcanoRuleCall.transformTo(VolcanoRuleCall.java:161)
      at org.apache.calcite.plan.RelOptRuleCall.transformTo(RelOptRuleCall.java:268)
      at org.apache.calcite.plan.RelOptRuleCall.transformTo(RelOptRuleCall.java:283)
      at org.apache.flink.table.planner.plan.rules.physical.stream.PullUpWindowTableFunctionIntoWindowAggregateRule.onMatch(PullUpWindowTableFunctionIntoWindowAggregateRule.scala:143)
      at org.apache.calcite.plan.volcano.VolcanoRuleCall.onMatch(VolcanoRuleCall.java:229)
      ... 42 more
      Caused by: org.apache.flink.table.api.ValidationException: Field names must be unique. Found duplicates: [ts]
      at org.apache.flink.table.types.logical.RowType.validateFields(RowType.java:272)
      at org.apache.flink.table.types.logical.RowType.<init>(RowType.java:157)
      at org.apache.flink.table.types.logical.RowType.of(RowType.java:297)
      at org.apache.flink.table.types.logical.RowType.of(RowType.java:289)
      at org.apache.flink.table.planner.calcite.FlinkTypeFactory$.toLogicalRowType(FlinkTypeFactory.scala:657)
      at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamPhysicalWindowAggregate.aggInfoList$lzycompute(StreamPhysicalWindowAggregate.scala:60)
      at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamPhysicalWindowAggregate.aggInfoList(StreamPhysicalWindowAggregate.scala:59)
      at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamPhysicalWindowAggregate.explainTerms(StreamPhysicalWindowAggregate.scala:86)
      at org.apache.calcite.rel.AbstractRelNode.getDigestItems(AbstractRelNode.java:409)
      at org.apache.calcite.rel.AbstractRelNode.deepHashCode(AbstractRelNode.java:391)
      at org.apache.calcite.rel.AbstractRelNode$InnerRelDigest.hashCode(AbstractRelNode.java:443)
      at java.base/java.util.HashMap.hash(HashMap.java:339)
      at java.base/java.util.HashMap.get(HashMap.java:552)
      at org.apache.calcite.plan.volcano.VolcanoPlanner.registerImpl(VolcanoPlanner.java:1150)
      at org.apache.calcite.plan.volcano.VolcanoPlanner.register(VolcanoPlanner.java:589)
      at org.apache.calcite.plan.volcano.VolcanoPlanner.ensureRegistered(VolcanoPlanner.java:604)
      at org.apache.calcite.plan.volcano.VolcanoRuleCall.transformTo(VolcanoRuleCall.java:148)
      ... 46 more

      The events_table definition:

      CREATE TABLE events_table (
          uuid VARCHAR(36),
          event_type VARCHAR(20),
          ts TIMESTAMP(3),
          WATERMARK FOR ts AS ts - INTERVAL '5' SECOND
      )
      PARTITIONED BY (event_type)
      WITH (
          'connector' = 'kinesis',
          'stream' = 'kinesis-event-stream',
          'aws.region' = 'us-west-2',
          'scan.stream.initpos' = 'TRIM_HORIZON',
          'format' = 'json',
          'scan.stream.recordpublisher' = 'EFO',
          'scan.stream.efo.consumername' = 'learn-actions-efo',
          'scan.stream.efo.registration' = 'LAZY', -- EAGER
          'json.timestamp-format.standard' = 'ISO-8601'
      );
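
      For completeness, a quick sanity check (a minimal sketch; DESCRIBE is standard Flink SQL): ts should be reported as the table's rowtime attribute, i.e. TIMESTAMP(3) *ROWTIME*, carrying the watermark expression from the DDL above.

      %flink.ssql

      -- Confirm the schema: ts should appear as TIMESTAMP(3) *ROWTIME*
      -- with watermark `ts` - INTERVAL '5' SECOND.
      DESCRIBE events_table;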
      


          People

            Assignee: Unassigned
            Reporter: Ivan Budanaev (ibuda)
            Votes: 0
            Watchers: 2
