Uploaded image for project: 'Flink'
  1. Flink
  2. FLINK-22010

when flink executed union all opeators,exception occured

    XMLWordPrintableJSON

Details

    • Important

    Description

      when I executed job on 1.11.2,the job no exception,when I executed job on 1.12.1 or 1.12.2 ,the job occured some exception.

      code as the following:

      result = result1.union_all(result2)
      result = result.union_all(result3)

      #union_all(result4).union_all(result5).union_all(result5).union_all(result6).union_all(result7)
      result.execute().print()

      above the code, when i comment the code as the following,the code also no exception on flink 1.12.1 :

      result = result1.union_all(result2)
      #result = result.union_all(result3)

      #.union_all(result4).union_all(result5).union_all(result5).union_all(result6).union_all(result7)
      result.execute().print()

      I dont know how to solve the problems, May be someone could help me?

      Excepion as the following:

      py4j.protocol.Py4JJavaError: An error occurred while calling o340.print.
      : java.lang.RuntimeException: Failed to fetch next result
      at org.apache.flink.streaming.api.operators.collect.CollectResultIterator.nextResultFromFetcher(CollectResultIterator.java:109)
      at org.apache.flink.streaming.api.operators.collect.CollectResultIterator.hasNext(CollectResultIterator.java:80)
      at org.apache.flink.table.planner.sinks.SelectTableSinkBase$RowIteratorWrapper.hasNext(SelectTableSinkBase.java:117)
      at org.apache.flink.table.api.internal.TableResultImpl$CloseableRowIteratorWrapper.hasNext(TableResultImpl.java:350)
      at org.apache.flink.table.utils.PrintUtils.printAsTableauForm(PrintUtils.java:149)
      at org.apache.flink.table.api.internal.TableResultImpl.print(TableResultImpl.java:154)
      at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
      at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
      at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
      at java.lang.reflect.Method.invoke(Method.java:498)
      at org.apache.flink.api.python.shaded.py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
      at org.apache.flink.api.python.shaded.py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
      at org.apache.flink.api.python.shaded.py4j.Gateway.invoke(Gateway.java:282)
      at org.apache.flink.api.python.shaded.py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
      at org.apache.flink.api.python.shaded.py4j.commands.CallCommand.execute(CallCommand.java:79)
      at org.apache.flink.api.python.shaded.py4j.GatewayConnection.run(GatewayConnection.java:238)
      at java.lang.Thread.run(Thread.java:748)
      Caused by: java.io.IOException: Failed to fetch job execution result
      at org.apache.flink.streaming.api.operators.collect.CollectResultFetcher.getAccumulatorResults(CollectResultFetcher.java:169)
      at org.apache.flink.streaming.api.operators.collect.CollectResultFetcher.next(CollectResultFetcher.java:118)
      at org.apache.flink.streaming.api.operators.collect.CollectResultIterator.nextResultFromFetcher(CollectResultIterator.java:106)
      ... 16 more
      Caused by: java.util.concurrent.ExecutionException: org.apache.flink.client.program.ProgramInvocationException: Job failed (JobID: 9ba6325f27c97192e42e76bd52d05db8)
      at java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:357)
      at java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1915)
      at org.apache.flink.streaming.api.operators.collect.CollectResultFetcher.getAccumulatorResults(CollectResultFetcher.java:167)
      ... 18 more
      Caused by: org.apache.flink.client.program.ProgramInvocationException: Job failed (JobID: 9ba6325f27c97192e42e76bd52d05db8)
      at org.apache.flink.client.deployment.ClusterClientJobClientAdapter.lambda$null$6(ClusterClientJobClientAdapter.java:125)
      at java.util.concurrent.CompletableFuture.uniApply(CompletableFuture.java:602)
      at java.util.concurrent.CompletableFuture$UniApply.tryFire(CompletableFuture.java:577)
      at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:474)
      at java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:1962)
      at org.apache.flink.client.program.rest.RestClusterClient.lambda$pollResourceAsync$22(RestClusterClient.java:665)
      at java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:760)
      at java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:736)
      at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:474)
      at java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:1962)
      at org.apache.flink.runtime.concurrent.FutureUtils.lambda$retryOperationWithDelay$9(FutureUtils.java:394)
      at java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:760)
      at java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:736)
      at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:474)
      at java.util.concurrent.CompletableFuture.postFire(CompletableFuture.java:561)
      at java.util.concurrent.CompletableFuture$UniCompose.tryFire(CompletableFuture.java:929)
      at java.util.concurrent.CompletableFuture$Completion.run(CompletableFuture.java:442)
      at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
      at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
      ... 1 more
      Caused by: org.apache.flink.runtime.client.JobExecutionException: Job execution failed.
      at org.apache.flink.runtime.jobmaster.JobResult.toJobExecutionResult(JobResult.java:144)
      at org.apache.flink.client.deployment.ClusterClientJobClientAdapter.lambda$null$6(ClusterClientJobClientAdapter.java:123)
      ... 19 more
      Caused by: org.apache.flink.runtime.JobException: Recovery is suppressed by NoRestartBackoffTimeStrategy
      at org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.handleFailure(ExecutionFailureHandler.java:118)
      at org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.getFailureHandlingResult(ExecutionFailureHandler.java:80)
      at org.apache.flink.runtime.scheduler.DefaultScheduler.handleTaskFailure(DefaultScheduler.java:233)
      at org.apache.flink.runtime.scheduler.DefaultScheduler.maybeHandleTaskFailure(DefaultScheduler.java:224)
      at org.apache.flink.runtime.scheduler.DefaultScheduler.updateTaskExecutionStateInternal(DefaultScheduler.java:215)
      at org.apache.flink.runtime.scheduler.SchedulerBase.updateTaskExecutionState(SchedulerBase.java:665)
      at org.apache.flink.runtime.scheduler.SchedulerNG.updateTaskExecutionState(SchedulerNG.java:89)
      at org.apache.flink.runtime.jobmaster.JobMaster.updateTaskExecutionState(JobMaster.java:447)
      at sun.reflect.GeneratedMethodAccessor40.invoke(Unknown Source)
      at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
      at java.lang.reflect.Method.invoke(Method.java:498)
      at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcInvocation(AkkaRpcActor.java:306)
      at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:213)
      at org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:77)
      at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:159)
      at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:26)
      at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:21)
      at scala.PartialFunction.applyOrElse(PartialFunction.scala:123)
      at scala.PartialFunction.applyOrElse$(PartialFunction.scala:122)
      at akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:21)
      at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)
      at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:172)
      at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:172)
      at akka.actor.Actor.aroundReceive(Actor.scala:517)
      at akka.actor.Actor.aroundReceive$(Actor.scala:515)
      at akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:225)
      at akka.actor.ActorCell.receiveMessage(ActorCell.scala:592)
      at akka.actor.ActorCell.invoke(ActorCell.scala:561)
      at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:258)
      at akka.dispatch.Mailbox.run(Mailbox.scala:225)
      at akka.dispatch.Mailbox.exec(Mailbox.scala:235)
      at akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
      at akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
      at akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
      at akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
      Caused by: java.lang.RuntimeException: org.apache.flink.runtime.memory.MemoryAllocationException: Could not allocate 512 pages
      at org.apache.flink.table.runtime.util.LazyMemorySegmentPool.nextSegment(LazyMemorySegmentPool.java:84)
      at org.apache.flink.table.runtime.hashtable.BaseHybridHashTable.getNextBuffer(BaseHybridHashTable.java:254)
      at org.apache.flink.table.runtime.hashtable.BaseHybridHashTable.nextSegment(BaseHybridHashTable.java:313)
      at org.apache.flink.table.runtime.hashtable.LongHashPartition.<init>(LongHashPartition.java:166)
      at org.apache.flink.table.runtime.hashtable.LongHashPartition.<init>(LongHashPartition.java:136)
      at org.apache.flink.table.runtime.hashtable.LongHybridHashTable.createPartitions(LongHybridHashTable.java:276)
      at org.apache.flink.table.runtime.hashtable.LongHybridHashTable.<init>(LongHybridHashTable.java:89)
      at LongHashJoinOperator$1266$LongHashTable$1250.<init>(Unknown Source)
      at LongHashJoinOperator$1266.open(Unknown Source)
      at org.apache.flink.streaming.runtime.tasks.OperatorChain.initializeStateAndOpenOperators(OperatorChain.java:426)
      at org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$beforeInvoke$2(StreamTask.java:535)
      at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.runThrowing(StreamTaskActionExecutor.java:50)
      at org.apache.flink.streaming.runtime.tasks.StreamTask.beforeInvoke(StreamTask.java:525)
      at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:565)
      at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:755)
      at org.apache.flink.runtime.taskmanager.Task.run(Task.java:570)
      at java.lang.Thread.run(Thread.java:748)
      Caused by: org.apache.flink.runtime.memory.MemoryAllocationException: Could not allocate 512 pages
      at org.apache.flink.runtime.memory.MemoryManager.allocatePages(MemoryManager.java:235)
      at org.apache.flink.table.runtime.util.LazyMemorySegmentPool.nextSegment(LazyMemorySegmentPool.java:82)
      ... 16 more
      Caused by: org.apache.flink.runtime.memory.MemoryReservationException: Could not allocate 16777216 bytes, only 0 bytes are remaining. This usually indicates that you are requesting more memory than you have reserved. However, when running an old JVM version it can also be caused by slow garbage collection. Try to upgrade to Java 8u72 or higher if running on an old Java version.
      at org.apache.flink.runtime.memory.UnsafeMemoryBudget.reserveMemory(UnsafeMemoryBudget.java:170)
      at org.apache.flink.runtime.memory.UnsafeMemoryBudget.reserveMemory(UnsafeMemoryBudget.java:84)
      at org.apache.flink.runtime.memory.MemoryManager.allocatePages(MemoryManager.java:232)
      ... 17 more

      Attachments

        Activity

          People

            Unassigned Unassigned
            zhou_yb zhou
            Votes:
            0 Vote for this issue
            Watchers:
            2 Start watching this issue

            Dates

              Created:
              Updated: