Uploaded image for project: 'Flink'
  1. Flink
  2. FLINK-19739

CompileException when windowing in batch mode: A method named "replace" is not declared in any enclosing class nor any supertype

    Export: XML | Word | Printable | JSON

Details

    Description

      Example script:

      # Minimal reproduction: a tumbling-window aggregation executed in batch mode
      # with the Blink planner.
      from pyflink.table import EnvironmentSettings, BatchTableEnvironment
      from pyflink.table.window import Tumble
      
      
      # Build batch-mode settings that select the Blink planner.
      env_settings = (
          EnvironmentSettings.new_instance().in_batch_mode().use_blink_planner().build()
      )
      table_env = BatchTableEnvironment.create(environment_settings=env_settings)
      
      # Register `table1` via DDL: a filesystem/CSV-backed table with an INT `amount`
      # column and a TIMESTAMP(3) `ts` column that declares a 5-second watermark.
      table_env.execute_sql(
          """
          CREATE TABLE table1 (
              amount INT,
              ts TIMESTAMP(3),
              WATERMARK FOR ts AS ts - INTERVAL '5' SECOND
          ) WITH (
              'connector.type' = 'filesystem',
              'format.type' = 'csv',
              'connector.path' = '/home/alex/work/test-flink/data1.csv'
          )
      """
      )
      
      
      # Assign rows to 5-day tumbling windows on `ts`, group by that window only
      # (no additional grouping keys), and sum `amount` per window.
      table1 = table_env.from_path("table1")
      table = (
          table1
              .window(Tumble.over("5.days").on("ts").alias("__window"))
              .group_by("__window")
              .select("amount.sum")
      )
      # to_pandas() is the call that raises in the traceback shown under "Output".
      print(table.to_pandas())
      

      Output:

      WARNING: An illegal reflective access operation has occurred
      WARNING: Illegal reflective access by org.apache.flink.api.python.shaded.io.netty.util.internal.ReflectionUtil (file:/home/alex/work/flink/flink-dist/target/flink-1.12-SNAPSHOT-bin/flink-1.12-SNAPSHOT/opt/flink-python_2.11-1.12-SNAPSHOT.jar) to constructor java.nio.DirectByteBuffer(long,int)
      WARNING: Please consider reporting this to the maintainers of org.apache.flink.api.python.shaded.io.netty.util.internal.ReflectionUtil
      WARNING: Use --illegal-access=warn to enable warnings of further illegal reflective access operations
      WARNING: All illegal access operations will be denied in a future release
      /* 1 */
      /* 2 */      public class LocalHashWinAggWithoutKeys$59 extends org.apache.flink.table.runtime.operators.TableStreamOperator
      /* 3 */          implements org.apache.flink.streaming.api.operators.OneInputStreamOperator, org.apache.flink.streaming.api.operators.BoundedOneInput {
      /* 4 */
      /* 5 */        private final Object[] references;
      /* 6 */        
      /* 7 */        private static final org.slf4j.Logger LOG$2 =
      /* 8 */          org.slf4j.LoggerFactory.getLogger("LocalHashWinAgg");
      /* 9 */        
      /* 10 */        private transient org.apache.flink.table.types.logical.LogicalType[] aggMapKeyTypes$5;
      /* 11 */        private transient org.apache.flink.table.types.logical.LogicalType[] aggBufferTypes$6;
      /* 12 */        private transient org.apache.flink.table.runtime.operators.aggregate.BytesHashMap aggregateMap$7;
      /* 13 */        org.apache.flink.table.data.binary.BinaryRowData emptyAggBuffer$9 = new org.apache.flink.table.data.binary.BinaryRowData(1);
      /* 14 */        org.apache.flink.table.data.writer.BinaryRowWriter emptyAggBufferWriterTerm$10 = new org.apache.flink.table.data.writer.BinaryRowWriter(emptyAggBuffer$9);
      /* 15 */        org.apache.flink.table.data.GenericRowData hashAggOutput = new org.apache.flink.table.data.GenericRowData(2);
      /* 16 */        private transient org.apache.flink.table.data.binary.BinaryRowData reuseAggMapKey$17 = new org.apache.flink.table.data.binary.BinaryRowData(1);
      /* 17 */        private transient org.apache.flink.table.data.binary.BinaryRowData reuseAggBuffer$18 = new org.apache.flink.table.data.binary.BinaryRowData(1);
      /* 18 */        private transient org.apache.flink.table.runtime.operators.aggregate.BytesHashMap.Entry reuseAggMapEntry$19 = new org.apache.flink.table.runtime.operators.aggregate.BytesHashMap.Entry(reuseAggMapKey$17, reuseAggBuffer$18);
      /* 19 */        org.apache.flink.table.data.binary.BinaryRowData aggMapKey$3 = new org.apache.flink.table.data.binary.BinaryRowData(1);
      /* 20 */        org.apache.flink.table.data.writer.BinaryRowWriter aggMapKeyWriter$4 = new org.apache.flink.table.data.writer.BinaryRowWriter(aggMapKey$3);
      /* 21 */        private boolean hasInput = false;
      /* 22 */        org.apache.flink.streaming.runtime.streamrecord.StreamRecord element = new org.apache.flink.streaming.runtime.streamrecord.StreamRecord((Object)null);
      /* 23 */        private final org.apache.flink.streaming.runtime.streamrecord.StreamRecord outElement = new org.apache.flink.streaming.runtime.streamrecord.StreamRecord(null);
      /* 24 */
      /* 25 */        public LocalHashWinAggWithoutKeys$59(
      /* 26 */            Object[] references,
      /* 27 */            org.apache.flink.streaming.runtime.tasks.StreamTask task,
      /* 28 */            org.apache.flink.streaming.api.graph.StreamConfig config,
      /* 29 */            org.apache.flink.streaming.api.operators.Output output,
      /* 30 */            org.apache.flink.streaming.runtime.tasks.ProcessingTimeService processingTimeService) throws Exception {
      /* 31 */          this.references = references;
      /* 32 */          aggMapKeyTypes$5 = (((org.apache.flink.table.types.logical.LogicalType[]) references[0]));
      /* 33 */          aggBufferTypes$6 = (((org.apache.flink.table.types.logical.LogicalType[]) references[1]));
      /* 34 */          this.setup(task, config, output);
      /* 35 */          if (this instanceof org.apache.flink.streaming.api.operators.AbstractStreamOperator) {
      /* 36 */            ((org.apache.flink.streaming.api.operators.AbstractStreamOperator) this)
      /* 37 */              .setProcessingTimeService(processingTimeService);
      /* 38 */          }
      /* 39 */        }
      /* 40 */
      /* 41 */        @Override
      /* 42 */        public void open() throws Exception {
      /* 43 */          super.open();
      /* 44 */          aggregateMap$7 = new org.apache.flink.table.runtime.operators.aggregate.BytesHashMap(this.getContainingTask(),this.getContainingTask().getEnvironment().getMemoryManager(),computeMemorySize(), aggMapKeyTypes$5, aggBufferTypes$6);
      /* 45 */          
      /* 46 */          
      /* 47 */          emptyAggBufferWriterTerm$10.reset();
      /* 48 */          
      /* 49 */          
      /* 50 */          if (true) {
      /* 51 */            emptyAggBufferWriterTerm$10.setNullAt(0);
      /* 52 */          } else {
      /* 53 */            emptyAggBufferWriterTerm$10.writeInt(0, ((int) -1));
      /* 54 */          }
      /* 55 */                       
      /* 56 */          emptyAggBufferWriterTerm$10.complete();
      /* 57 */                  
      /* 58 */        }
      /* 59 */
      /* 60 */        @Override
      /* 61 */        public void processElement(org.apache.flink.streaming.runtime.streamrecord.StreamRecord element) throws Exception {
      /* 62 */          org.apache.flink.table.data.RowData in1 = (org.apache.flink.table.data.RowData) element.getValue();
      /* 63 */          
      /* 64 */          org.apache.flink.table.data.binary.BinaryRowData currentAggBuffer$8;
      /* 65 */          int field$11;
      /* 66 */          boolean isNull$11;
      /* 67 */          int field$12;
      /* 68 */          boolean isNull$12;
      /* 69 */          boolean isNull$13;
      /* 70 */          int result$14;
      /* 71 */          org.apache.flink.table.runtime.operators.aggregate.BytesHashMap.LookupInfo lookupInfo$20;
      /* 72 */          org.apache.flink.table.data.TimestampData field$21;
      /* 73 */          boolean isNull$21;
      /* 74 */          boolean isNull$22;
      /* 75 */          long result$23;
      /* 76 */          boolean isNull$24;
      /* 77 */          long result$25;
      /* 78 */          boolean isNull$26;
      /* 79 */          long result$27;
      /* 80 */          boolean isNull$28;
      /* 81 */          long result$29;
      /* 82 */          boolean isNull$30;
      /* 83 */          long result$31;
      /* 84 */          boolean isNull$32;
      /* 85 */          long result$33;
      /* 86 */          boolean isNull$34;
      /* 87 */          boolean result$35;
      /* 88 */          boolean isNull$36;
      /* 89 */          long result$37;
      /* 90 */          boolean isNull$38;
      /* 91 */          long result$39;
      /* 92 */          boolean isNull$40;
      /* 93 */          long result$41;
      /* 94 */          boolean isNull$42;
      /* 95 */          long result$43;
      /* 96 */          boolean isNull$44;
      /* 97 */          long result$45;
      /* 98 */          boolean isNull$46;
      /* 99 */          long result$47;
      /* 100 */          boolean isNull$48;
      /* 101 */          long result$49;
      /* 102 */          boolean isNull$50;
      /* 103 */          long result$51;
      /* 104 */          boolean isNull$52;
      /* 105 */          long result$53;
      /* 106 */          boolean isNull$55;
      /* 107 */          long result$56;
      /* 108 */          boolean isNull$57;
      /* 109 */          long result$58;
      /* 110 */          
      /* 111 */          
      /* 112 */          if (!in1.isNullAt(1)) {
      /* 113 */            hasInput = true;
      /* 114 */            // input field access for group key projection, window/pane assign
      /* 115 */             // and aggregate map update
      /* 116 */            isNull$11 = in1.isNullAt(0);
      /* 117 */          field$11 = -1;
      /* 118 */          if (!isNull$11) {
      /* 119 */            field$11 = in1.getInt(0);
      /* 120 */          }
      /* 121 */          isNull$21 = in1.isNullAt(1);
      /* 122 */          field$21 = null;
      /* 123 */          if (!isNull$21) {
      /* 124 */            field$21 = in1.getTimestamp(1, 3);
      /* 125 */          }
      /* 126 */            // assign timestamp(window or pane)
      /* 127 */            
      /* 128 */          
      /* 129 */          
      /* 130 */          
      /* 131 */          
      /* 132 */          isNull$22 = isNull$21;
      /* 133 */          result$23 = -1L;
      /* 134 */          if (!isNull$22) {
      /* 135 */            
      /* 136 */            result$23 = field$21.getMillisecond();
      /* 137 */            
      /* 138 */          }
      /* 139 */          
      /* 140 */          
      /* 141 */          isNull$24 = isNull$22 || false;
      /* 142 */          result$25 = -1L;
      /* 143 */          if (!isNull$24) {
      /* 144 */            
      /* 145 */            result$25 = (long) (result$23 * ((long) 1L));
      /* 146 */            
      /* 147 */          }
      /* 148 */          
      /* 149 */          isNull$26 = isNull$21;
      /* 150 */          result$27 = -1L;
      /* 151 */          if (!isNull$26) {
      /* 152 */            
      /* 153 */            result$27 = field$21.getMillisecond();
      /* 154 */            
      /* 155 */          }
      /* 156 */          
      /* 157 */          
      /* 158 */          isNull$28 = isNull$26 || false;
      /* 159 */          result$29 = -1L;
      /* 160 */          if (!isNull$28) {
      /* 161 */            
      /* 162 */            result$29 = (long) (result$27 * ((long) 1L));
      /* 163 */            
      /* 164 */          }
      /* 165 */          
      /* 166 */          
      /* 167 */          isNull$30 = isNull$28 || false;
      /* 168 */          result$31 = -1L;
      /* 169 */          if (!isNull$30) {
      /* 170 */            
      /* 171 */            result$31 = (long) (result$29 - ((long) 0L));
      /* 172 */            
      /* 173 */          }
      /* 174 */          
      /* 175 */          
      /* 176 */          isNull$32 = isNull$30 || false;
      /* 177 */          result$33 = -1L;
      /* 178 */          if (!isNull$32) {
      /* 179 */            
      /* 180 */            result$33 = (long) (result$31 % ((long) 432000000L));
      /* 181 */            
      /* 182 */          }
      /* 183 */          
      /* 184 */          
      /* 185 */          isNull$34 = isNull$32 || false;
      /* 186 */          result$35 = false;
      /* 187 */          if (!isNull$34) {
      /* 188 */            
      /* 189 */            result$35 = result$33 < ((int) 0);
      /* 190 */            
      /* 191 */          }
      /* 192 */          
      /* 193 */          long result$54 = -1L;
      /* 194 */          boolean isNull$54;
      /* 195 */          if (result$35) {
      /* 196 */            
      /* 197 */          
      /* 198 */          
      /* 199 */          
      /* 200 */          
      /* 201 */          
      /* 202 */          isNull$36 = isNull$21;
      /* 203 */          result$37 = -1L;
      /* 204 */          if (!isNull$36) {
      /* 205 */            
      /* 206 */            result$37 = field$21.getMillisecond();
      /* 207 */            
      /* 208 */          }
      /* 209 */          
      /* 210 */          
      /* 211 */          isNull$38 = isNull$36 || false;
      /* 212 */          result$39 = -1L;
      /* 213 */          if (!isNull$38) {
      /* 214 */            
      /* 215 */            result$39 = (long) (result$37 * ((long) 1L));
      /* 216 */            
      /* 217 */          }
      /* 218 */          
      /* 219 */          
      /* 220 */          isNull$40 = isNull$38 || false;
      /* 221 */          result$41 = -1L;
      /* 222 */          if (!isNull$40) {
      /* 223 */            
      /* 224 */            result$41 = (long) (result$39 - ((long) 0L));
      /* 225 */            
      /* 226 */          }
      /* 227 */          
      /* 228 */          
      /* 229 */          isNull$42 = isNull$40 || false;
      /* 230 */          result$43 = -1L;
      /* 231 */          if (!isNull$42) {
      /* 232 */            
      /* 233 */            result$43 = (long) (result$41 % ((long) 432000000L));
      /* 234 */            
      /* 235 */          }
      /* 236 */          
      /* 237 */          
      /* 238 */          isNull$44 = isNull$42 || false;
      /* 239 */          result$45 = -1L;
      /* 240 */          if (!isNull$44) {
      /* 241 */            
      /* 242 */            result$45 = (long) (result$43 + ((long) 432000000L));
      /* 243 */            
      /* 244 */          }
      /* 245 */          
      /* 246 */            isNull$54 = isNull$44;
      /* 247 */            if (!isNull$54) {
      /* 248 */              result$54 = result$45;
      /* 249 */            }
      /* 250 */          }
      /* 251 */          else {
      /* 252 */            
      /* 253 */          
      /* 254 */          
      /* 255 */          
      /* 256 */          
      /* 257 */          isNull$46 = isNull$21;
      /* 258 */          result$47 = -1L;
      /* 259 */          if (!isNull$46) {
      /* 260 */            
      /* 261 */            result$47 = field$21.getMillisecond();
      /* 262 */            
      /* 263 */          }
      /* 264 */          
      /* 265 */          
      /* 266 */          isNull$48 = isNull$46 || false;
      /* 267 */          result$49 = -1L;
      /* 268 */          if (!isNull$48) {
      /* 269 */            
      /* 270 */            result$49 = (long) (result$47 * ((long) 1L));
      /* 271 */            
      /* 272 */          }
      /* 273 */          
      /* 274 */          
      /* 275 */          isNull$50 = isNull$48 || false;
      /* 276 */          result$51 = -1L;
      /* 277 */          if (!isNull$50) {
      /* 278 */            
      /* 279 */            result$51 = (long) (result$49 - ((long) 0L));
      /* 280 */            
      /* 281 */          }
      /* 282 */          
      /* 283 */          
      /* 284 */          isNull$52 = isNull$50 || false;
      /* 285 */          result$53 = -1L;
      /* 286 */          if (!isNull$52) {
      /* 287 */            
      /* 288 */            result$53 = (long) (result$51 % ((long) 432000000L));
      /* 289 */            
      /* 290 */          }
      /* 291 */          
      /* 292 */            isNull$54 = isNull$52;
      /* 293 */            if (!isNull$54) {
      /* 294 */              result$54 = result$53;
      /* 295 */            }
      /* 296 */          }
      /* 297 */          isNull$55 = isNull$24 || isNull$54;
      /* 298 */          result$56 = -1L;
      /* 299 */          if (!isNull$55) {
      /* 300 */            
      /* 301 */            result$56 = (long) (result$25 - result$54);
      /* 302 */            
      /* 303 */          }
      /* 304 */          
      /* 305 */          
      /* 306 */          isNull$57 = isNull$55 || false;
      /* 307 */          result$58 = -1L;
      /* 308 */          if (!isNull$57) {
      /* 309 */            
      /* 310 */            result$58 = (long) (result$56 - ((long) 0L));
      /* 311 */            
      /* 312 */          }
      /* 313 */          
      /* 314 */            // process each input
      /* 315 */            
      /* 316 */            // build aggregate map key
      /* 317 */            
      /* 318 */          
      /* 319 */          aggMapKeyWriter$4.reset();
      /* 320 */          
      /* 321 */          
      /* 322 */          if (false) {
      /* 323 */            aggMapKeyWriter$4.setNullAt(0);
      /* 324 */          } else {
      /* 325 */            aggMapKeyWriter$4.writeLong(0, result$58);
      /* 326 */          }
      /* 327 */                       
      /* 328 */          aggMapKeyWriter$4.complete();
      /* 329 */                  
      /* 330 */            // aggregate by each input with assigned timestamp
      /* 331 */            // look up output buffer using current key (grouping keys ..., assigned timestamp)
      /* 332 */          lookupInfo$20 = aggregateMap$7.lookup(aggMapKey$3);
      /* 333 */          currentAggBuffer$8 = lookupInfo$20.getValue();
      /* 334 */          if (!lookupInfo$20.isFound()) {
      /* 335 */            
      /* 336 */            // append empty agg buffer into aggregate map for current group key
      /* 337 */            try {
      /* 338 */              currentAggBuffer$8 =
      /* 339 */                aggregateMap$7.append(lookupInfo$20, emptyAggBuffer$9);
      /* 340 */            } catch (java.io.EOFException exp) {
      /* 341 */              
      /* 342 */          LOG$2.info("BytesHashMap out of memory with {} entries, output directly.", aggregateMap$7.getNumElements());
      /* 343 */           // hash map out of memory, output directly
      /* 344 */          
      /* 345 */          org.apache.flink.util.MutableObjectIterator<org.apache.flink.table.runtime.operators.aggregate.BytesHashMap.Entry> iterator =
      /* 346 */            aggregateMap$7.getEntryIterator();
      /* 347 */          while (iterator.next(reuseAggMapEntry$19) != null) {
      /* 348 */             
      /* 349 */             
      /* 350 */          
      /* 351 */          hashAggOutput.replace(reuseAggMapKey$17, reuseAggBuffer$18);
      /* 352 */                   
      /* 353 */             output.collect(outElement.replace(hashAggOutput));
      /* 354 */          }
      /* 355 */                 
      /* 356 */           // retry append
      /* 357 */          
      /* 358 */           // reset aggregate map retry append
      /* 359 */          aggregateMap$7.reset();
      /* 360 */          lookupInfo$20 = aggregateMap$7.lookup(aggMapKey$3);
      /* 361 */          try {
      /* 362 */            currentAggBuffer$8 =
      /* 363 */              aggregateMap$7.append(lookupInfo$20, emptyAggBuffer$9);
      /* 364 */          } catch (java.io.EOFException e) {
      /* 365 */            throw new OutOfMemoryError("BytesHashMap Out of Memory.");
      /* 366 */          }
      /* 367 */                 
      /* 368 */                    
      /* 369 */            }
      /* 370 */          }
      /* 371 */          // aggregate buffer fields access
      /* 372 */          isNull$12 = currentAggBuffer$8.isNullAt(0);
      /* 373 */          field$12 = -1;
      /* 374 */          if (!isNull$12) {
      /* 375 */            field$12 = currentAggBuffer$8.getInt(0);
      /* 376 */          }
      /* 377 */          // do aggregate and update agg buffer
      /* 378 */          int result$16 = -1;
      /* 379 */          boolean isNull$16;
      /* 380 */          if (isNull$11) {
      /* 381 */            
      /* 382 */            isNull$16 = isNull$12;
      /* 383 */            if (!isNull$16) {
      /* 384 */              result$16 = field$12;
      /* 385 */            }
      /* 386 */          }
      /* 387 */          else {
      /* 388 */            int result$15 = -1;
      /* 389 */          boolean isNull$15;
      /* 390 */          if (isNull$12) {
      /* 391 */            
      /* 392 */            isNull$15 = isNull$11;
      /* 393 */            if (!isNull$15) {
      /* 394 */              result$15 = field$11;
      /* 395 */            }
      /* 396 */          }
      /* 397 */          else {
      /* 398 */            
      /* 399 */          
      /* 400 */          
      /* 401 */          isNull$13 = isNull$12 || isNull$11;
      /* 402 */          result$14 = -1;
      /* 403 */          if (!isNull$13) {
      /* 404 */            
      /* 405 */            result$14 = (int) (field$12 + field$11);
      /* 406 */            
      /* 407 */          }
      /* 408 */          
      /* 409 */            isNull$15 = isNull$13;
      /* 410 */            if (!isNull$15) {
      /* 411 */              result$15 = result$14;
      /* 412 */            }
      /* 413 */          }
      /* 414 */            isNull$16 = isNull$15;
      /* 415 */            if (!isNull$16) {
      /* 416 */              result$16 = result$15;
      /* 417 */            }
      /* 418 */          }
      /* 419 */          if (isNull$16) {
      /* 420 */            currentAggBuffer$8.setNullAt(0);
      /* 421 */          } else {
      /* 422 */            currentAggBuffer$8.setInt(0, result$16);
      /* 423 */          }
      /* 424 */                     
      /* 425 */          }
      /* 426 */        }
      /* 427 */
      /* 428 */        
      /* 429 */        @Override
      /* 430 */        public void endInput() throws Exception {
      /* 431 */          org.apache.flink.table.data.binary.BinaryRowData currentAggBuffer$8;
      /* 432 */        int field$11;
      /* 433 */        boolean isNull$11;
      /* 434 */        int field$12;
      /* 435 */        boolean isNull$12;
      /* 436 */        boolean isNull$13;
      /* 437 */        int result$14;
      /* 438 */        org.apache.flink.table.runtime.operators.aggregate.BytesHashMap.LookupInfo lookupInfo$20;
      /* 439 */        org.apache.flink.table.data.TimestampData field$21;
      /* 440 */        boolean isNull$21;
      /* 441 */        boolean isNull$22;
      /* 442 */        long result$23;
      /* 443 */        boolean isNull$24;
      /* 444 */        long result$25;
      /* 445 */        boolean isNull$26;
      /* 446 */        long result$27;
      /* 447 */        boolean isNull$28;
      /* 448 */        long result$29;
      /* 449 */        boolean isNull$30;
      /* 450 */        long result$31;
      /* 451 */        boolean isNull$32;
      /* 452 */        long result$33;
      /* 453 */        boolean isNull$34;
      /* 454 */        boolean result$35;
      /* 455 */        boolean isNull$36;
      /* 456 */        long result$37;
      /* 457 */        boolean isNull$38;
      /* 458 */        long result$39;
      /* 459 */        boolean isNull$40;
      /* 460 */        long result$41;
      /* 461 */        boolean isNull$42;
      /* 462 */        long result$43;
      /* 463 */        boolean isNull$44;
      /* 464 */        long result$45;
      /* 465 */        boolean isNull$46;
      /* 466 */        long result$47;
      /* 467 */        boolean isNull$48;
      /* 468 */        long result$49;
      /* 469 */        boolean isNull$50;
      /* 470 */        long result$51;
      /* 471 */        boolean isNull$52;
      /* 472 */        long result$53;
      /* 473 */        boolean isNull$55;
      /* 474 */        long result$56;
      /* 475 */        boolean isNull$57;
      /* 476 */        long result$58;
      /* 477 */          
      /* 478 */        org.apache.flink.util.MutableObjectIterator<org.apache.flink.table.runtime.operators.aggregate.BytesHashMap.Entry> iterator =
      /* 479 */          aggregateMap$7.getEntryIterator();
      /* 480 */        while (iterator.next(reuseAggMapEntry$19) != null) {
      /* 481 */           
      /* 482 */           
      /* 483 */        
      /* 484 */        hashAggOutput.replace(reuseAggMapKey$17, reuseAggBuffer$18);
      /* 485 */                 
      /* 486 */           output.collect(outElement.replace(hashAggOutput));
      /* 487 */        }
      /* 488 */               
      /* 489 */        }
      /* 490 */                 
      /* 491 */
      /* 492 */        @Override
      /* 493 */        public void close() throws Exception {
      /* 494 */           super.close();
      /* 495 */          aggregateMap$7.free();
      /* 496 */          
      /* 497 */        }
      /* 498 */
      /* 499 */        
      /* 500 */      }
      /* 501 */    
      
      Traceback (most recent call last):
        File "/home/alex/.config/JetBrains/PyCharm2020.2/scratches/scratch_903.py", line 32, in <module>
          print(table.to_pandas())
        File "/home/alex/work/flink/flink-python/pyflink/table/table.py", line 829, in to_pandas
          if batches.hasNext():
        File "/home/alex/.cache/pypoetry/virtualenvs/relycomply-WA1zLZ2n-py3.8/lib/python3.8/site-packages/py4j/java_gateway.py", line 1285, in __call__
          return_value = get_return_value(
        File "/home/alex/work/flink/flink-python/pyflink/util/exceptions.py", line 147, in deco
          return f(*a, **kw)
        File "/home/alex/.cache/pypoetry/virtualenvs/relycomply-WA1zLZ2n-py3.8/lib/python3.8/site-packages/py4j/protocol.py", line 326, in get_return_value
          raise Py4JJavaError(
      py4j.protocol.Py4JJavaError: An error occurred while calling o51.hasNext.
      : java.lang.RuntimeException: Failed to fetch next result
      	at org.apache.flink.streaming.api.operators.collect.CollectResultIterator.nextResultFromFetcher(CollectResultIterator.java:106)
      	at org.apache.flink.streaming.api.operators.collect.CollectResultIterator.hasNext(CollectResultIterator.java:77)
      	at org.apache.flink.table.planner.sinks.SelectTableSinkBase$RowIteratorWrapper.hasNext(SelectTableSinkBase.java:115)
      	at org.apache.flink.table.api.internal.TableResultImpl$CloseableRowIteratorWrapper.hasNext(TableResultImpl.java:355)
      	at org.apache.flink.table.runtime.arrow.ArrowUtils$1.hasNext(ArrowUtils.java:644)
      	at org.apache.flink.table.runtime.arrow.ArrowUtils$2.hasNext(ArrowUtils.java:666)
      	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
      	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
      	at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
      	at java.base/java.lang.reflect.Method.invoke(Method.java:566)
      	at org.apache.flink.api.python.shaded.py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
      	at org.apache.flink.api.python.shaded.py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
      	at org.apache.flink.api.python.shaded.py4j.Gateway.invoke(Gateway.java:282)
      	at org.apache.flink.api.python.shaded.py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
      	at org.apache.flink.api.python.shaded.py4j.commands.CallCommand.execute(CallCommand.java:79)
      	at org.apache.flink.api.python.shaded.py4j.GatewayConnection.run(GatewayConnection.java:238)
      	at java.base/java.lang.Thread.run(Thread.java:834)
      Caused by: java.io.IOException: Failed to fetch job execution result
      	at org.apache.flink.streaming.api.operators.collect.CollectResultFetcher.getAccumulatorResults(CollectResultFetcher.java:175)
      	at org.apache.flink.streaming.api.operators.collect.CollectResultFetcher.next(CollectResultFetcher.java:126)
      	at org.apache.flink.streaming.api.operators.collect.CollectResultIterator.nextResultFromFetcher(CollectResultIterator.java:103)
      	... 16 more
      Caused by: java.util.concurrent.ExecutionException: org.apache.flink.runtime.client.JobExecutionException: Job execution failed.
      	at java.base/java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:395)
      	at java.base/java.util.concurrent.CompletableFuture.get(CompletableFuture.java:2022)
      	at org.apache.flink.streaming.api.operators.collect.CollectResultFetcher.getAccumulatorResults(CollectResultFetcher.java:172)
      	... 18 more
      Caused by: org.apache.flink.runtime.client.JobExecutionException: Job execution failed.
      	at org.apache.flink.runtime.jobmaster.JobResult.toJobExecutionResult(JobResult.java:147)
      	at org.apache.flink.runtime.minicluster.MiniClusterJobClient.lambda$getJobExecutionResult$2(MiniClusterJobClient.java:119)
      	at java.base/java.util.concurrent.CompletableFuture.uniApplyNow(CompletableFuture.java:680)
      	at java.base/java.util.concurrent.CompletableFuture.uniApplyStage(CompletableFuture.java:658)
      	at java.base/java.util.concurrent.CompletableFuture.thenApply(CompletableFuture.java:2094)
      	at org.apache.flink.runtime.minicluster.MiniClusterJobClient.getJobExecutionResult(MiniClusterJobClient.java:117)
      	... 19 more
      Caused by: org.apache.flink.runtime.JobException: Recovery is suppressed by NoRestartBackoffTimeStrategy
      	at org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.handleFailure(ExecutionFailureHandler.java:116)
      	at org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.getFailureHandlingResult(ExecutionFailureHandler.java:78)
      	at org.apache.flink.runtime.scheduler.DefaultScheduler.handleTaskFailure(DefaultScheduler.java:217)
      	at org.apache.flink.runtime.scheduler.DefaultScheduler.maybeHandleTaskFailure(DefaultScheduler.java:210)
      	at org.apache.flink.runtime.scheduler.DefaultScheduler.updateTaskExecutionStateInternal(DefaultScheduler.java:204)
      	at org.apache.flink.runtime.scheduler.SchedulerBase.updateTaskExecutionState(SchedulerBase.java:526)
      	at org.apache.flink.runtime.jobmaster.JobMaster.updateTaskExecutionState(JobMaster.java:413)
      	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
      	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
      	at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
      	at java.base/java.lang.reflect.Method.invoke(Method.java:566)
      	at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcInvocation(AkkaRpcActor.java:284)
      	at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:199)
      	at org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:74)
      	at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:152)
      	at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:26)
      	at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:21)
      	at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:123)
      	at akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:21)
      	at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:170)
      	at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)
      	at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)
      	at akka.actor.Actor$class.aroundReceive(Actor.scala:517)
      	at akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:225)
      	at akka.actor.ActorCell.receiveMessage(ActorCell.scala:592)
      	at akka.actor.ActorCell.invoke(ActorCell.scala:561)
      	at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:258)
      	at akka.dispatch.Mailbox.run(Mailbox.scala:225)
      	at akka.dispatch.Mailbox.exec(Mailbox.scala:235)
      	at akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
      	at akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
      	at akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
      	at akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
      Caused by: java.lang.RuntimeException: Could not instantiate generated class 'LocalHashWinAggWithoutKeys$59'
      	at org.apache.flink.table.runtime.generated.GeneratedClass.newInstance(GeneratedClass.java:67)
      	at org.apache.flink.table.runtime.operators.CodeGenOperatorFactory.createStreamOperator(CodeGenOperatorFactory.java:40)
      	at org.apache.flink.streaming.api.operators.StreamOperatorFactoryUtil.createOperator(StreamOperatorFactoryUtil.java:70)
      	at org.apache.flink.streaming.runtime.tasks.OperatorChain.createOperator(OperatorChain.java:613)
      	at org.apache.flink.streaming.runtime.tasks.OperatorChain.createOperatorChain(OperatorChain.java:583)
      	at org.apache.flink.streaming.runtime.tasks.OperatorChain.createOutputCollector(OperatorChain.java:526)
      	at org.apache.flink.streaming.runtime.tasks.OperatorChain.createOperatorChain(OperatorChain.java:574)
      	at org.apache.flink.streaming.runtime.tasks.OperatorChain.createOutputCollector(OperatorChain.java:526)
      	at org.apache.flink.streaming.runtime.tasks.OperatorChain.<init>(OperatorChain.java:164)
      	at org.apache.flink.streaming.runtime.tasks.StreamTask.beforeInvoke(StreamTask.java:485)
      	at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:533)
      	at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:722)
      	at org.apache.flink.runtime.taskmanager.Task.run(Task.java:547)
      	at java.base/java.lang.Thread.run(Thread.java:834)
      Caused by: org.apache.flink.util.FlinkRuntimeException: org.apache.flink.api.common.InvalidProgramException: Table program cannot be compiled. This is a bug. Please file an issue.
      	at org.apache.flink.table.runtime.generated.CompileUtils.compile(CompileUtils.java:68)
      	at org.apache.flink.table.runtime.generated.GeneratedClass.compile(GeneratedClass.java:78)
      	at org.apache.flink.table.runtime.generated.GeneratedClass.newInstance(GeneratedClass.java:65)
      	... 13 more
      Caused by: org.apache.flink.shaded.guava18.com.google.common.util.concurrent.UncheckedExecutionException: org.apache.flink.api.common.InvalidProgramException: Table program cannot be compiled. This is a bug. Please file an issue.
      	at org.apache.flink.shaded.guava18.com.google.common.cache.LocalCache$Segment.get(LocalCache.java:2203)
      	at org.apache.flink.shaded.guava18.com.google.common.cache.LocalCache.get(LocalCache.java:3937)
      	at org.apache.flink.shaded.guava18.com.google.common.cache.LocalCache$LocalManualCache.get(LocalCache.java:4739)
      	at org.apache.flink.table.runtime.generated.CompileUtils.compile(CompileUtils.java:66)
      	... 15 more
      Caused by: org.apache.flink.api.common.InvalidProgramException: Table program cannot be compiled. This is a bug. Please file an issue.
      	at org.apache.flink.table.runtime.generated.CompileUtils.doCompile(CompileUtils.java:81)
      	at org.apache.flink.table.runtime.generated.CompileUtils.lambda$compile$1(CompileUtils.java:66)
      	at org.apache.flink.shaded.guava18.com.google.common.cache.LocalCache$LocalManualCache$1.load(LocalCache.java:4742)
      	at org.apache.flink.shaded.guava18.com.google.common.cache.LocalCache$LoadingValueReference.loadFuture(LocalCache.java:3527)
      	at org.apache.flink.shaded.guava18.com.google.common.cache.LocalCache$Segment.loadSync(LocalCache.java:2319)
      	at org.apache.flink.shaded.guava18.com.google.common.cache.LocalCache$Segment.lockedGetOrLoad(LocalCache.java:2282)
      	at org.apache.flink.shaded.guava18.com.google.common.cache.LocalCache$Segment.get(LocalCache.java:2197)
      	... 18 more
      Caused by: org.codehaus.commons.compiler.CompileException: Line 351, Column 33: A method named "replace" is not declared in any enclosing class nor any supertype, nor through a static import
      	at org.codehaus.janino.UnitCompiler.compileError(UnitCompiler.java:12124)
      	at org.codehaus.janino.UnitCompiler.findIMethod(UnitCompiler.java:8997)
      	at org.codehaus.janino.UnitCompiler.compileGet2(UnitCompiler.java:5060)
      	at org.codehaus.janino.UnitCompiler.access$9100(UnitCompiler.java:215)
      	at org.codehaus.janino.UnitCompiler$16.visitMethodInvocation(UnitCompiler.java:4421)
      	at org.codehaus.janino.UnitCompiler$16.visitMethodInvocation(UnitCompiler.java:4394)
      	at org.codehaus.janino.Java$MethodInvocation.accept(Java.java:5062)
      	at org.codehaus.janino.UnitCompiler.compileGet(UnitCompiler.java:4394)
      	at org.codehaus.janino.UnitCompiler.compileGetValue(UnitCompiler.java:5575)
      	at org.codehaus.janino.UnitCompiler.compile2(UnitCompiler.java:3781)
      	at org.codehaus.janino.UnitCompiler.access$5900(UnitCompiler.java:215)
      	at org.codehaus.janino.UnitCompiler$13.visitMethodInvocation(UnitCompiler.java:3760)
      	at org.codehaus.janino.UnitCompiler$13.visitMethodInvocation(UnitCompiler.java:3732)
      	at org.codehaus.janino.Java$MethodInvocation.accept(Java.java:5062)
      	at org.codehaus.janino.UnitCompiler.compile(UnitCompiler.java:3732)
      	at org.codehaus.janino.UnitCompiler.compile2(UnitCompiler.java:2360)
      	at org.codehaus.janino.UnitCompiler.access$1800(UnitCompiler.java:215)
      	at org.codehaus.janino.UnitCompiler$6.visitExpressionStatement(UnitCompiler.java:1494)
      	at org.codehaus.janino.UnitCompiler$6.visitExpressionStatement(UnitCompiler.java:1487)
      	at org.codehaus.janino.Java$ExpressionStatement.accept(Java.java:2871)
      	at org.codehaus.janino.UnitCompiler.compile(UnitCompiler.java:1487)
      	at org.codehaus.janino.UnitCompiler.compileStatements(UnitCompiler.java:1567)
      	at org.codehaus.janino.UnitCompiler.compile2(UnitCompiler.java:1553)
      	at org.codehaus.janino.UnitCompiler.access$1700(UnitCompiler.java:215)
      	at org.codehaus.janino.UnitCompiler$6.visitBlock(UnitCompiler.java:1493)
      	at org.codehaus.janino.UnitCompiler$6.visitBlock(UnitCompiler.java:1487)
      	at org.codehaus.janino.Java$Block.accept(Java.java:2776)
      	at org.codehaus.janino.UnitCompiler.compile(UnitCompiler.java:1487)
      	at org.codehaus.janino.UnitCompiler.compile2(UnitCompiler.java:1842)
      	at org.codehaus.janino.UnitCompiler.access$2200(UnitCompiler.java:215)
      	at org.codehaus.janino.UnitCompiler$6.visitWhileStatement(UnitCompiler.java:1498)
      	at org.codehaus.janino.UnitCompiler$6.visitWhileStatement(UnitCompiler.java:1487)
      	at org.codehaus.janino.Java$WhileStatement.accept(Java.java:3052)
      	at org.codehaus.janino.UnitCompiler.compile(UnitCompiler.java:1487)
      	at org.codehaus.janino.UnitCompiler.compileStatements(UnitCompiler.java:1567)
      	at org.codehaus.janino.UnitCompiler.compile2(UnitCompiler.java:1553)
      	at org.codehaus.janino.UnitCompiler.access$1700(UnitCompiler.java:215)
      	at org.codehaus.janino.UnitCompiler$6.visitBlock(UnitCompiler.java:1493)
      	at org.codehaus.janino.UnitCompiler$6.visitBlock(UnitCompiler.java:1487)
      	at org.codehaus.janino.Java$Block.accept(Java.java:2776)
      	at org.codehaus.janino.UnitCompiler.compile(UnitCompiler.java:1487)
      	at org.codehaus.janino.UnitCompiler.compileTryCatch(UnitCompiler.java:3136)
      	at org.codehaus.janino.UnitCompiler.compileTryCatchFinally(UnitCompiler.java:2966)
      	at org.codehaus.janino.UnitCompiler.compileTryCatchFinallyWithResources(UnitCompiler.java:2770)
      	at org.codehaus.janino.UnitCompiler.compile2(UnitCompiler.java:2742)
      	at org.codehaus.janino.UnitCompiler.access$2300(UnitCompiler.java:215)
      	at org.codehaus.janino.UnitCompiler$6.visitTryStatement(UnitCompiler.java:1499)
      	at org.codehaus.janino.UnitCompiler$6.visitTryStatement(UnitCompiler.java:1487)
      	at org.codehaus.janino.Java$TryStatement.accept(Java.java:3238)
      	at org.codehaus.janino.UnitCompiler.compile(UnitCompiler.java:1487)
      	at org.codehaus.janino.UnitCompiler.compileStatements(UnitCompiler.java:1567)
      	at org.codehaus.janino.UnitCompiler.compile2(UnitCompiler.java:1553)
      	at org.codehaus.janino.UnitCompiler.access$1700(UnitCompiler.java:215)
      	at org.codehaus.janino.UnitCompiler$6.visitBlock(UnitCompiler.java:1493)
      	at org.codehaus.janino.UnitCompiler$6.visitBlock(UnitCompiler.java:1487)
      	at org.codehaus.janino.Java$Block.accept(Java.java:2776)
      	at org.codehaus.janino.UnitCompiler.compile(UnitCompiler.java:1487)
      	at org.codehaus.janino.UnitCompiler.compile2(UnitCompiler.java:2476)
      	at org.codehaus.janino.UnitCompiler.access$1900(UnitCompiler.java:215)
      	at org.codehaus.janino.UnitCompiler$6.visitIfStatement(UnitCompiler.java:1495)
      	at org.codehaus.janino.UnitCompiler$6.visitIfStatement(UnitCompiler.java:1487)
      	at org.codehaus.janino.Java$IfStatement.accept(Java.java:2947)
      	at org.codehaus.janino.UnitCompiler.compile(UnitCompiler.java:1487)
      	at org.codehaus.janino.UnitCompiler.compileStatements(UnitCompiler.java:1567)
      	at org.codehaus.janino.UnitCompiler.compile2(UnitCompiler.java:1553)
      	at org.codehaus.janino.UnitCompiler.access$1700(UnitCompiler.java:215)
      	at org.codehaus.janino.UnitCompiler$6.visitBlock(UnitCompiler.java:1493)
      	at org.codehaus.janino.UnitCompiler$6.visitBlock(UnitCompiler.java:1487)
      	at org.codehaus.janino.Java$Block.accept(Java.java:2776)
      	at org.codehaus.janino.UnitCompiler.compile(UnitCompiler.java:1487)
      	at org.codehaus.janino.UnitCompiler.compile2(UnitCompiler.java:2476)
      	at org.codehaus.janino.UnitCompiler.access$1900(UnitCompiler.java:215)
      	at org.codehaus.janino.UnitCompiler$6.visitIfStatement(UnitCompiler.java:1495)
      	at org.codehaus.janino.UnitCompiler$6.visitIfStatement(UnitCompiler.java:1487)
      	at org.codehaus.janino.Java$IfStatement.accept(Java.java:2947)
      	at org.codehaus.janino.UnitCompiler.compile(UnitCompiler.java:1487)
      	at org.codehaus.janino.UnitCompiler.compileStatements(UnitCompiler.java:1567)
      	at org.codehaus.janino.UnitCompiler.compile(UnitCompiler.java:3388)
      	at org.codehaus.janino.UnitCompiler.compileDeclaredMethods(UnitCompiler.java:1357)
      	at org.codehaus.janino.UnitCompiler.compileDeclaredMethods(UnitCompiler.java:1330)
      	at org.codehaus.janino.UnitCompiler.compile2(UnitCompiler.java:822)
      	at org.codehaus.janino.UnitCompiler.compile2(UnitCompiler.java:432)
      	at org.codehaus.janino.UnitCompiler.access$400(UnitCompiler.java:215)
      	at org.codehaus.janino.UnitCompiler$2.visitPackageMemberClassDeclaration(UnitCompiler.java:411)
      	at org.codehaus.janino.UnitCompiler$2.visitPackageMemberClassDeclaration(UnitCompiler.java:406)
      	at org.codehaus.janino.Java$PackageMemberClassDeclaration.accept(Java.java:1414)
      	at org.codehaus.janino.UnitCompiler.compile(UnitCompiler.java:406)
      	at org.codehaus.janino.UnitCompiler.compileUnit(UnitCompiler.java:378)
      	at org.codehaus.janino.SimpleCompiler.cook(SimpleCompiler.java:237)
      	at org.codehaus.janino.SimpleCompiler.compileToClassLoader(SimpleCompiler.java:465)
      	at org.codehaus.janino.SimpleCompiler.cook(SimpleCompiler.java:216)
      	at org.codehaus.janino.SimpleCompiler.cook(SimpleCompiler.java:207)
      	at org.codehaus.commons.compiler.Cookable.cook(Cookable.java:80)
      	at org.codehaus.commons.compiler.Cookable.cook(Cookable.java:75)
      	at org.apache.flink.table.runtime.generated.CompileUtils.doCompile(CompileUtils.java:78)
      	... 24 more
      

      However, the same windowed aggregation works fine in streaming mode; only the environment setup needs to change:

      env_settings = (
          EnvironmentSettings.new_instance().in_streaming_mode().use_blink_planner().build()
      )
      table_env = StreamTableEnvironment.create(environment_settings=env_settings)
      

      How the table is created seems irrelevant: building it in memory with from_elements, instead of via the filesystem connector, raises the same error in batch mode:

      from datetime import datetime
      
      from pyflink.table import DataTypes, BatchTableEnvironment, EnvironmentSettings
      from pyflink.table.window import Tumble
      
      env_settings = (
          EnvironmentSettings.new_instance().in_batch_mode().use_blink_planner().build()
      )
      table_environment = BatchTableEnvironment.create(environment_settings=env_settings)
      transactions = table_environment.from_elements(
          [
              (1, datetime(2000, 1, 1, 0, 0, 0)),
              (-2, datetime(2000, 1, 2, 0, 0, 0)),
              (3, datetime(2000, 1, 3, 0, 0, 0)),
              (-4, datetime(2000, 1, 4, 0, 0, 0)),
          ],
          DataTypes.ROW(
              [
                  DataTypes.FIELD("amount", DataTypes.BIGINT()),
                  DataTypes.FIELD("ts", DataTypes.TIMESTAMP(3)),
              ]
          ),
      )
      table = (
          transactions
              .window(Tumble.over("5.days").on("ts").alias("__window"))
              .group_by("__window")
              .select("amount.sum")
      )
      print(table.to_pandas())
      

      Attachments

        Activity

          People

            TsReaper Caizhi Weng
            alexmojaki Alex Hall
            Votes:
            0 Vote for this issue
            Watchers:
            6 Start watching this issue

            Dates

              Created:
              Updated:
              Resolved: