FLINK-13738

NegativeArraySizeException in LongHybridHashTable

Description

      Executing this (meaningless) query:

      INSERT INTO sinkTable (
        SELECT
          CONCAT(CAST(id AS VARCHAR), CAST(COUNT(*) AS VARCHAR)) AS something,
          'const'
        FROM CsvTable, table1
        WHERE sometxt LIKE 'a%' AND id = key
        GROUP BY id
      )

      leads to the following exception:

      Caused by: java.lang.NegativeArraySizeException
       at org.apache.flink.table.runtime.hashtable.LongHybridHashTable.tryDenseMode(LongHybridHashTable.java:216)
       at org.apache.flink.table.runtime.hashtable.LongHybridHashTable.endBuild(LongHybridHashTable.java:105)
       at LongHashJoinOperator$36.endInput1$(Unknown Source)
       at LongHashJoinOperator$36.endInput(Unknown Source)
       at org.apache.flink.streaming.runtime.tasks.OperatorChain.endInput(OperatorChain.java:256)
       at org.apache.flink.streaming.runtime.io.StreamTwoInputSelectableProcessor.checkFinished(StreamTwoInputSelectableProcessor.java:359)
       at org.apache.flink.streaming.runtime.io.StreamTwoInputSelectableProcessor.processInput(StreamTwoInputSelectableProcessor.java:193)
       at org.apache.flink.streaming.runtime.tasks.StreamTask.performDefaultAction(StreamTask.java:276)
       at org.apache.flink.streaming.runtime.tasks.StreamTask.run(StreamTask.java:298)
       at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:403)
       at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:687)
       at org.apache.flink.runtime.taskmanager.Task.run(Task.java:517)
       at java.lang.Thread.run(Thread.java:748)
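
      The stack trace points at the dense-mode path, whose lookup-table size is derived from the span of the build-side keys. A minimal sketch of how such a size computation can go negative (hypothetical names and logic for illustration only, not the actual Flink code): a `long` key range wider than `Integer.MAX_VALUE` turns negative when narrowed to `int`, and allocating with that size throws exactly this exception.

      ```java
      // Hypothetical illustration of the suspected failure mode in
      // LongHybridHashTable.tryDenseMode; names are invented, not Flink's.
      public class DenseModeOverflowSketch {

          /** Size of a dense lookup table covering [minKey, maxKey].
           *  The narrowing cast to int silently overflows for wide key ranges. */
          static int denseArraySize(long minKey, long maxKey) {
              long range = maxKey - minKey + 1;
              return (int) range; // negative once range exceeds Integer.MAX_VALUE
          }

          public static void main(String[] args) {
              int size = denseArraySize(0L, 3_000_000_000L);
              System.out.println("computed size: " + size); // a negative value
              try {
                  long[] table = new long[size];
              } catch (NegativeArraySizeException e) {
                  System.out.println("NegativeArraySizeException, as in the report");
              }
          }
      }
      ```

      Guarding the cast (e.g. falling back to the non-dense path when the range does not fit in an `int`) would avoid the allocation with a negative size.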

      This is the plan:


      == Abstract Syntax Tree ==
      LogicalSink(name=[sinkTable], fields=[f0, f1])
      +- LogicalProject(something=[CONCAT(CAST($0):VARCHAR(2147483647) CHARACTER SET "UTF-16LE", CAST($1):VARCHAR(2147483647) CHARACTER SET "UTF-16LE" NOT NULL)], EXPR$1=[_UTF-16LE'const'])
         +- LogicalAggregate(group=[{0}], agg#0=[COUNT()])
            +- LogicalProject(id=[$1])
               +- LogicalFilter(condition=[AND(LIKE($0, _UTF-16LE'a%'), =($1, CAST($2):BIGINT))])
                  +- LogicalJoin(condition=[true], joinType=[inner])
                     :- LogicalTableScan(table=[[default_catalog, default_database, CsvTable, source: [CsvTableSource(read fields: sometxt, id)]]])
                     +- LogicalTableScan(table=[[default_catalog, default_database, table1, source: [GeneratorTableSource(key, rowtime, payload)]]])
      
      == Optimized Logical Plan ==
      Sink(name=[sinkTable], fields=[f0, f1]): rowcount = 1498810.6659336376, cumulative cost = {4.459964319978008E8 rows, 1.879799762133187E10 cpu, 4.8E9 io, 8.4E8 network, 1.799524266373455E8 memory}
      +- Calc(select=[CONCAT(CAST(id), CAST($f1)) AS something, _UTF-16LE'const' AS EXPR$1]): rowcount = 1498810.6659336376, cumulative cost = {4.444976213318672E8 rows, 1.8796498810665936E10 cpu, 4.8E9 io, 8.4E8 network, 1.799524266373455E8 memory}
         +- HashAggregate(isMerge=[false], groupBy=[id], select=[id, COUNT(*) AS $f1]): rowcount = 1498810.6659336376, cumulative cost = {4.429988106659336E8 rows, 1.8795E10 cpu, 4.8E9 io, 8.4E8 network, 1.799524266373455E8 memory}
            +- Calc(select=[id]): rowcount = 1.575E7, cumulative cost = {4.415E8 rows, 1.848E10 cpu, 4.8E9 io, 8.4E8 network, 1.2E8 memory}
               +- HashJoin(joinType=[InnerJoin], where=[=(id, key0)], select=[id, key0], build=[left]): rowcount = 1.575E7, cumulative cost = {4.2575E8 rows, 1.848E10 cpu, 4.8E9 io, 8.4E8 network, 1.2E8 memory}
                  :- Exchange(distribution=[hash[id]]): rowcount = 5000000.0, cumulative cost = {1.1E8 rows, 8.4E8 cpu, 2.0E9 io, 4.0E7 network, 0.0 memory}
                  :  +- Calc(select=[id], where=[LIKE(sometxt, _UTF-16LE'a%')]): rowcount = 5000000.0, cumulative cost = {1.05E8 rows, 0.0 cpu, 2.0E9 io, 0.0 network, 0.0 memory}
                  :     +- TableSourceScan(table=[[default_catalog, default_database, CsvTable, source: [CsvTableSource(read fields: sometxt, id)]]], fields=[sometxt, id]): rowcount = 1.0E8, cumulative cost = {1.0E8 rows, 0.0 cpu, 2.0E9 io, 0.0 network, 0.0 memory}
                  +- Exchange(distribution=[hash[key0]]): rowcount = 1.0E8, cumulative cost = {3.0E8 rows, 1.68E10 cpu, 2.8E9 io, 8.0E8 network, 0.0 memory}
                     +- Calc(select=[CAST(key) AS key0]): rowcount = 1.0E8, cumulative cost = {2.0E8 rows, 0.0 cpu, 2.8E9 io, 0.0 network, 0.0 memory}
                        +- TableSourceScan(table=[[default_catalog, default_database, table1, source: [GeneratorTableSource(key, rowtime, payload)]]], fields=[key, rowtime, payload]): rowcount = 1.0E8, cumulative cost = {1.0E8 rows, 0.0 cpu, 2.8E9 io, 0.0 network, 0.0 memory}
      
      == Physical Execution Plan ==
      Stage 1 : Data Source
      	content : collect elements with CollectionInputFormat
      
      	Stage 2 : Operator
      		content : CsvTableSource(read fields: sometxt, id)
      		ship_strategy : REBALANCE
      
      		Stage 3 : Operator
      			content : SourceConversion(table=[default_catalog.default_database.CsvTable, source: [CsvTableSource(read fields: sometxt, id)]], fields=[sometxt, id])
      			ship_strategy : FORWARD
      
      			Stage 4 : Operator
      				content : Calc(select=[id], where=[(sometxt LIKE _UTF-16LE'a%')])
      				ship_strategy : FORWARD
      
      Stage 6 : Data Source
      	content : collect elements with CollectionInputFormat
      
      	Stage 7 : Operator
      		content : SourceConversion(table=[default_catalog.default_database.table1, source: [GeneratorTableSource(key, rowtime, payload)]], fields=[key, rowtime, payload])
      		ship_strategy : FORWARD
      
      		Stage 8 : Operator
      			content : Calc(select=[CAST(key) AS key0])
      			ship_strategy : FORWARD
      
      			Stage 10 : Operator
      				content : HashJoin(joinType=[InnerJoin], where=[(id = key0)], select=[id, key0], build=[left])
      				ship_strategy : HASH[id]
      
      				Stage 11 : Operator
      					content : Calc(select=[id])
      					ship_strategy : FORWARD
      
      					Stage 12 : Operator
      						content : HashAggregate(isMerge=[false], groupBy=[id], select=[id, COUNT(*) AS $f1])
      						ship_strategy : FORWARD
      
      						Stage 13 : Operator
      							content : Calc(select=[(CAST(id) CONCAT CAST($f1)) AS something, _UTF-16LE'const' AS EXPR$1])
      							ship_strategy : FORWARD
      
      							Stage 14 : Operator
      								content : SinkConversionToRow
      								ship_strategy : FORWARD
      
      								Stage 15 : Operator
      									content : Map
      									ship_strategy : FORWARD
      
      									Stage 16 : Data Sink
      										content : Sink: CsvTableSink(f0, f1)
      										ship_strategy : FORWARD
      

      People

              Assignee: Jingsong Lee (lzljs3620320)
              Reporter: Robert Metzger (rmetzger)
              Time Spent: 20m