FLINK-20277

flink-1.11.2 ContinuousFileMonitoringFunction cannot restore from failure

    Details

      Description

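      When a job that consumes a Hive table in streaming mode hits a failure, it cannot recover normally and keeps restarting.

      It repeatedly reports the error: The ContinuousFileMonitoringFunction has already restored from a previous Flink version.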

       

      java.io.FileNotFoundException: File does not exist: hdfs://nameservice1/rawdata/db/bw_hana/sapecc/hepecc_ekko_cut
      at org.apache.hadoop.hdfs.DistributedFileSystem$20.doCall(DistributedFileSystem.java:1270) ~[hadoop-hdfs-2.6.0-cdh5.16.2.jar:?]
      at org.apache.hadoop.hdfs.DistributedFileSystem$20.doCall(DistributedFileSystem.java:1262) ~[hadoop-hdfs-2.6.0-cdh5.16.2.jar:?]
      at org.apache.hadoop.fs.FileSystemLinkResolver.resolve(FileSystemLinkResolver.java:81) ~[hadoop-common-2.6.0-cdh5.16.2.jar:?]
      at org.apache.hadoop.hdfs.DistributedFileSystem.getFileStatus(DistributedFileSystem.java:1262) ~[hadoop-hdfs-2.6.0-cdh5.16.2.jar:?]
      at org.apache.flink.runtime.fs.hdfs.HadoopFileSystem.getFileStatus(HadoopFileSystem.java:85) ~[flink-dist_2.11-1.11.2.jar:1.11.2]
      at org.apache.flink.api.common.io.FileInputFormat.createInputSplits(FileInputFormat.java:588) ~[flink-dist_2.11-1.11.2.jar:1.11.2]
      at org.apache.flink.streaming.api.functions.source.ContinuousFileMonitoringFunction.getInputSplitsSortedByModTime(ContinuousFileMonitoringFunction.java:279) ~[flink-dist_2.11-1.11.2.jar:1.11.2]
      at org.apache.flink.streaming.api.functions.source.ContinuousFileMonitoringFunction.monitorDirAndForwardSplits(ContinuousFileMonitoringFunction.java:251) ~[flink-dist_2.11-1.11.2.jar:1.11.2]
      at org.apache.flink.streaming.api.functions.source.ContinuousFileMonitoringFunction.run(ContinuousFileMonitoringFunction.java:215) ~[flink-dist_2.11-1.11.2.jar:1.11.2]
      at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:100) ~[flink-dist_2.11-1.11.2.jar:1.11.2]
      at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:63) ~[flink-dist_2.11-1.11.2.jar:1.11.2]
      at org.apache.flink.streaming.runtime.tasks.SourceStreamTask$LegacySourceFunctionThread.run(SourceStreamTask.java:213) ~[flink-dist_2.11-1.11.2.jar:1.11.2]

       

       

      2020-11-23 05:00:33,313 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph [] - Split Reader: HiveFileMonitoringFunction -> Sink: Sink(table=[default_catalog.default_database.kafka_hepecc_ekko_cut_json], fields=[mandt, ebeln, bukrs, bstyp, bsart, bsakz, loekz, statu, aedat, ernam, pincr, lponr, lifnr, spras, zterm, zbd1t, zbd2t, zbd3t, zbd1p, zbd2p, ekorg, ekgrp, waers, wkurs, kufix, bedat, kdatb, kdate, bwbdt, angdt, bnddt, gwldt, ausnr, angnr, ihran, ihrez, verkf, telf1, llief, kunnr, konnr, abgru, autlf, weakt, reswk, lblif, inco1, inco2, ktwrt, submi, knumv, kalsm, stafo, lifre, exnum, unsez, logsy, upinc, stako, frggr, frgsx, frgke, frgzu, frgrl, lands, lphis, adrnr, stceg_l, stceg, absgr, addnr, kornr, memory, procstat, rlwrt, revno, scmproc, reason_code, memorytype, rettp, retpc, dptyp, dppct, dpamt, dpdat, msr_id, hierarchy_exists, threshold_exists, legal_contract, description, release_date, force_id, force_cnt, reloc_id, reloc_seq_id, source_logsys, auflg, yxcort, ysyb, ypsfs, yxqlx, yjplx, ylszj, yxdry, yxdrymc, ylbid1, yqybm, ysxpt_order, yy_write_if, fanpfg, yresfg, yrcofg, yretxt, y
      ...skipping...
      java.lang.IllegalArgumentException: The ContinuousFileMonitoringFunction has already restored from a previous Flink version.
      at org.apache.flink.streaming.api.functions.source.ContinuousFileMonitoringFunction.initializeState(ContinuousFileMonitoringFunction.java:176) ~[flink-dist_2.11-1.11.2.jar:1.11.2]
      at org.apache.flink.streaming.util.functions.StreamingFunctionUtils.tryRestoreFunction(StreamingFunctionUtils.java:185) ~[flink-dist_2.11-1.11.2.jar:1.11.2]
      at org.apache.flink.streaming.util.functions.StreamingFunctionUtils.restoreFunctionState(StreamingFunctionUtils.java:167) ~[flink-dist_2.11-1.11.2.jar:1.11.2]
      at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.initializeState(AbstractUdfStreamOperator.java:96) ~[flink-dist_2.11-1.11.2.jar:1.11.2]
      at org.apache.flink.streaming.api.operators.StreamOperatorStateHandler.initializeOperatorState(StreamOperatorStateHandler.java:106) ~[flink-dist_2.11-1.11.2.jar:1.11.2]
      at org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:258) ~[flink-dist_2.11-1.11.2.jar:1.11.2]
      at org.apache.flink.streaming.runtime.tasks.OperatorChain.initializeStateAndOpenOperators(OperatorChain.java:290) ~[flink-dist_2.11-1.11.2.jar:1.11.2]
      at org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$beforeInvoke$0(StreamTask.java:479) ~[flink-dist_2.11-1.11.2.jar:1.11.2]
      at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$SynchronizedStreamTaskActionExecutor.runThrowing(StreamTaskActionExecutor.java:92) ~[flink-dist_2.11-1.11.2.jar:1.11.2]
      at org.apache.flink.streaming.runtime.tasks.StreamTask.beforeInvoke(StreamTask.java:475) ~[flink-dist_2.11-1.11.2.jar:1.11.2]
      at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:528) ~[flink-dist_2.11-1.11.2.jar:1.11.2]
      at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:721) ~[flink-dist_2.11-1.11.2.jar:1.11.2]
      at org.apache.flink.runtime.taskmanager.Task.run(Task.java:546) ~[flink-dist_2.11-1.11.2.jar:1.11.2]
      at java.lang.Thread.run(Thread.java:748) ~[?:1.8.0_251]

       

              People

              • Assignee: godfrey he (godfreyhe)
              • Reporter: 谢波 (hiscat)
              • Votes: 0
              • Watchers: 3

                Dates

                • Created:
                  Updated:
                  Resolved: