Uploaded image for project: 'Flink'
  1. Flink
  2. FLINK-20784

.staging_xxx directory does not exist when inserting into Hive

    XML | Word | Printable | JSON

Details

    Description

      Environment: standalone cluster, batch mode.

      The following SQL is executed periodically with "sql-client.sh"; only the date parameters (dt, hh, mi) change between runs:

      insert overwrite snmpprobehive.snmpprobe.p_port_traffic_5m
      select
        ipp.binaryid as binaryid,
        ipp.id as id,
        'all' as ver,
        p0m.coltime as coltime,
        p5m.ifhcinoctets - p0m.ifhcinoctets as inoctets,
        p5m.ifhcoutoctets - p0m.ifhcoutoctets as outoctets,
        (p5m.ifhcinoctets - p0m.ifhcinoctets) + (p5m.ifhcoutoctets - p0m.ifhcoutoctets) as bi_octets,
        if((p5m.ifhcinoctets - p0m.ifhcinoctets) >= (p5m.ifhcoutoctets - p0m.ifhcoutoctets), (p5m.ifhcinoctets - p0m.ifhcinoctets), (p5m.ifhcoutoctets - p0m.ifhcoutoctets)) as unimax_octets,
        cast((p5m.ifhcinoctets - p0m.ifhcinoctets) as double)/(5*60)*8 as in_speed,
        cast((p5m.ifhcoutoctets - p0m.ifhcoutoctets) as double)/(5*60)*8 as out_speed,
        cast(((p5m.ifhcinoctets - p0m.ifhcinoctets) + (p5m.ifhcoutoctets - p0m.ifhcoutoctets)) as double)/(5*60)*8 as bi_speed,
        cast(if((p5m.ifhcinoctets - p0m.ifhcinoctets) >= (p5m.ifhcoutoctets - p0m.ifhcoutoctets), (p5m.ifhcinoctets - p0m.ifhcinoctets), (p5m.ifhcoutoctets - p0m.ifhcoutoctets)) as double)/(5*60)*8 unimax_speed,
        cast((p5m.ifhcinoctets - p0m.ifhcinoctets) as double)/(5*60)*8/(cast(p0m.ifhighspeed as bigint)*1000000) as in_util,
        cast((p5m.ifhcoutoctets - p0m.ifhcoutoctets) as double)/(5*60)*8/(cast(p0m.ifhighspeed as bigint)*1000000) as out_util,
        cast(((p5m.ifhcinoctets - p0m.ifhcinoctets) + (p5m.ifhcoutoctets - p0m.ifhcoutoctets)) as double)/(5*60)*8/(cast(p0m.ifhighspeed as bigint)*1000000*2) as bi_util,
        cast(if((p5m.ifhcinoctets - p0m.ifhcinoctets) >= (p5m.ifhcoutoctets - p0m.ifhcoutoctets), (p5m.ifhcinoctets - p0m.ifhcinoctets), (p5m.ifhcoutoctets - p0m.ifhcoutoctets)) as double)/(5*60)*8/(p0m.ifhighspeed*1000000) as unimax_util,
        case
          when (p5m.ifhcoutoctets - p0m.ifhcoutoctets) =0 or (p5m.ifhcoutoctets - p0m.ifhcoutoctets) is null then null
          else cast((p5m.ifhcinoctets - p0m.ifhcinoctets) as double)/(p5m.ifhcoutoctets - p0m.ifhcoutoctets)
        end as inout_ratio,
        p0m.ifhighspeed as bandwidth,
        p0m.ip as origin,
        now(),
        p0m.dt, p0m.hh, p0m.mi
      from snmpprobehive.snmpprobe.p_snmp_ifxtable p0m
      inner join snmpprobehive.snmpprobe.p_snmp_ifxtable p5m on p0m.id=p5m.id and p0m.mibindex=p5m.mibindex
      inner join snmpprobehive.snmpprobe.rv_ip_port_hive ipp on ipp.did=p0m.id and ipp.ifindex=p0m.mibindex
      where p5m.dt='2020-12-28' and p5m.hh='15' and p5m.mi='20'
        and p0m.dt='2020-12-28' and p0m.hh='15' and p0m.mi='15'
        and p0m.ifhighspeed > 0
        and ipp.operstatus ='up'

      When inserting into the Hive table, the job randomly fails with a java.io.FileNotFoundException:

      org.apache.flink.runtime.JobException: Recovery is suppressed by NoRestartBackoffTimeStrategy
          at org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.handleFailure(ExecutionFailureHandler.java:116)
          at org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.getGlobalFailureHandlingResult(ExecutionFailureHandler.java:89)
          at org.apache.flink.runtime.scheduler.DefaultScheduler.handleGlobalFailure(DefaultScheduler.java:240)
          at org.apache.flink.runtime.scheduler.UpdateSchedulerNgOnInternalFailuresListener.notifyGlobalFailure(UpdateSchedulerNgOnInternalFailuresListener.java:65)
          at org.apache.flink.runtime.executiongraph.ExecutionGraph.failGlobal(ExecutionGraph.java:1055)
          at org.apache.flink.runtime.executiongraph.ExecutionGraph.vertexFinished(ExecutionGraph.java:1305)
          at org.apache.flink.runtime.executiongraph.ExecutionVertex.executionFinished(ExecutionVertex.java:849)
          at org.apache.flink.runtime.executiongraph.Execution.markFinished(Execution.java:1127)
          at org.apache.flink.runtime.executiongraph.ExecutionGraph.updateStateInternal(ExecutionGraph.java:1512)
          at org.apache.flink.runtime.executiongraph.ExecutionGraph.updateState(ExecutionGraph.java:1485)
          at org.apache.flink.runtime.scheduler.SchedulerBase.updateTaskExecutionState(SchedulerBase.java:604)
          at org.apache.flink.runtime.scheduler.SchedulerNG.updateTaskExecutionState(SchedulerNG.java:89)
          at org.apache.flink.runtime.jobmaster.JobMaster.updateTaskExecutionState(JobMaster.java:419)
          at sun.reflect.GeneratedMethodAccessor56.invoke(Unknown Source)
          at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
          at java.lang.reflect.Method.invoke(Method.java:498)
          at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcInvocation(AkkaRpcActor.java:286)
          at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:201)
          at org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:74)
          at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:154)
          at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:26)
          at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:21)
          at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:123)
          at akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:21)
          at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:170)
          at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)
          at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)
          at akka.actor.Actor$class.aroundReceive(Actor.scala:517)
          at akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:225)
          at akka.actor.ActorCell.receiveMessage(ActorCell.scala:592)
          at akka.actor.ActorCell.invoke(ActorCell.scala:561)
          at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:258)
          at akka.dispatch.Mailbox.run(Mailbox.scala:225)
          at akka.dispatch.Mailbox.exec(Mailbox.scala:235)
          at akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
          at akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
          at akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
          at akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
      Caused by: java.lang.Exception: Failed to finalize execution on master
          ... 33 more
      Caused by: org.apache.flink.table.api.TableException: Exception in finalizeGlobal
          at org.apache.flink.table.filesystem.FileSystemOutputFormat.finalizeGlobal(FileSystemOutputFormat.java:97)
          at org.apache.flink.runtime.jobgraph.InputOutputFormatVertex.finalizeOnMaster(InputOutputFormatVertex.java:131)
          at org.apache.flink.runtime.executiongraph.ExecutionGraph.vertexFinished(ExecutionGraph.java:1299)
          ... 32 more
      Caused by: java.io.FileNotFoundException: File hdfs://service1/user/hive/warehouse/snmpprobe.db/p_port_packet_loss_5m/.staging_1609145792264/cp-0/task-0/dt=2020-12-28/hh=15/mi=50 does not exist.
          at org.apache.hadoop.hdfs.DistributedFileSystem.listStatusInternal(DistributedFileSystem.java:901)
          at org.apache.hadoop.hdfs.DistributedFileSystem.access$600(DistributedFileSystem.java:112)
          at org.apache.hadoop.hdfs.DistributedFileSystem$22.doCall(DistributedFileSystem.java:961)
          at org.apache.hadoop.hdfs.DistributedFileSystem$22.doCall(DistributedFileSystem.java:958)
          at org.apache.hadoop.fs.FileSystemLinkResolver.resolve(FileSystemLinkResolver.java:81)
          at org.apache.hadoop.hdfs.DistributedFileSystem.listStatus(DistributedFileSystem.java:958)
          at org.apache.flink.hive.shaded.fs.hdfs.HadoopFileSystem.listStatus(HadoopFileSystem.java:165)
          at org.apache.flink.table.utils.PartitionPathUtils.listStatusWithoutHidden(PartitionPathUtils.java:194)
          at org.apache.flink.table.filesystem.PartitionLoader.renameFiles(PartitionLoader.java:111)
          at org.apache.flink.table.filesystem.PartitionLoader.overwriteAndRenameFiles(PartitionLoader.java:89)
          at org.apache.flink.table.filesystem.PartitionLoader.loadPartition(PartitionLoader.java:73)
          at org.apache.flink.table.filesystem.FileSystemCommitter.commitSingleCheckpoint(FileSystemCommitter.java:111)
          at org.apache.flink.table.filesystem.FileSystemCommitter.commitUpToCheckpoint(FileSystemCommitter.java:99)
          at org.apache.flink.table.filesystem.FileSystemOutputFormat.finalizeGlobal(FileSystemOutputFormat.java:95)
          ... 34 more

      Attachments

        Issue Links

          Activity

            People

              Unassigned Unassigned
              macdoor615 macdoor615
              Votes:
              0 Vote for this issue
              Watchers:
              4 Start watching this issue

              Dates

                Created:
                Updated:
                Resolved: