Beam / BEAM-7695

Reading TFRecord files from HDFS throws an exception when the file size is large

Details

    • Type: Bug
    • Status: Open
    • Priority: P3
    • Resolution: Unresolved
    • Affects Versions: 2.2.0, 2.3.0, 2.4.0, 2.5.0, 2.6.0, 2.7.0, 2.8.0, 2.9.0, 2.10.0, 2.11.0, 2.12.0, 2.13.0

    Description

      Reading TFRecord files stored in HDFS fails with the error below.

      • A single TFRecord file is larger than 3 GB.
      • The total input size is larger than 1 TB.
      • Tested with Beam 2.13.0 + FlinkRunner + Java 1.8; versions 2.11.0 and 2.12.0 show the same problem.

      The dependencies (in build.gradle):

      dependencies {
          // This dependency is found on compile classpath of this component and consumers.
          //implementation 'com.google.guava:guava:27.0.1-jre'
          compile 'org.apache.beam:beam-sdks-java-core:2.13.0'
          compile 'org.apache.beam:beam-vendor-guava-20_0:0.1'
          compile 'org.tensorflow:tensorflow-hadoop:1.13.1'
          compile 'org.apache.beam:beam-runners-direct-java:2.13.0'
          //implementation "org.apache.beam:beam-sdks-java-core:2.13.0"
          compile "org.apache.beam:beam-runners-flink_2.11:2.13.0"
          compile "org.apache.beam:beam-sdks-java-io-hadoop-file-system:2.13.0"
          compile "org.apache.hadoop:hadoop-common:2.7.3"
          compile "org.apache.hadoop:hadoop-client:2.7.3"
          compile "org.apache.hadoop:hadoop-mapreduce-client-core:2.7.3"
          compile "org.tensorflow:proto:1.13.1"
          compile "org.apache.beam:beam-sdks-java-extensions-sketching:2.13.0"
          // Use JUnit test framework
          testImplementation 'junit:junit:4.12'
      }

      The error msg:


      ------------------------------------------------------------
      
      The program finished with the following exception:
      
      org.apache.flink.client.program.ProgramInvocationException: The main method caused an error.
      at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:546)
      at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:421)
      at org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:427)
      at org.apache.flink.client.cli.CliFrontend.executeProgram(CliFrontend.java:813)
      at org.apache.flink.client.cli.CliFrontend.runProgram(CliFrontend.java:287)
      at org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:213)
      at org.apache.flink.client.cli.CliFrontend.parseParameters(CliFrontend.java:1050)
      at org.apache.flink.client.cli.CliFrontend.lambda$main$11(CliFrontend.java:1126)
      at java.security.AccessController.doPrivileged(Native Method)
      at javax.security.auth.Subject.doAs(Subject.java:422)
      at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1692)
      at org.apache.flink.runtime.security.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41)
      at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:1126)
      Caused by: java.lang.RuntimeException: Pipeline execution failed
      at org.apache.beam.runners.flink.FlinkRunner.run(FlinkRunner.java:116)
      at org.apache.beam.sdk.Pipeline.run(Pipeline.java:313)
      at org.apache.beam.sdk.Pipeline.run(Pipeline.java:299)
      at avazu.data.transform.App.testTfrecordQIYU(App.java:572)
      at avazu.data.transform.App.main(App.java:744)
      at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
      at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
      at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
      at java.lang.reflect.Method.invoke(Method.java:498)
      at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:529)
      ... 12 more
      Caused by: org.apache.flink.client.program.ProgramInvocationException: Job failed. (JobID: fe4ce5375efbbb55e56967e4c7a975b2)
      at org.apache.flink.client.program.rest.RestClusterClient.submitJob(RestClusterClient.java:268)
      at org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:487)
      at org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:475)
      at org.apache.flink.client.program.ContextEnvironment.execute(ContextEnvironment.java:62)
      at org.apache.beam.runners.flink.FlinkPipelineExecutionEnvironment.executePipeline(FlinkPipelineExecutionEnvironment.java:139)
      at org.apache.beam.runners.flink.FlinkRunner.run(FlinkRunner.java:113)
      ... 21 more
      Caused by: org.apache.flink.runtime.client.JobExecutionException: Job execution failed.
      at org.apache.flink.runtime.jobmaster.JobResult.toJobExecutionResult(JobResult.java:146)
      at org.apache.flink.client.program.rest.RestClusterClient.submitJob(RestClusterClient.java:265)
      ... 26 more
      Caused by: java.io.IOException: Mismatch of length mask when reading a record. Expected 808268081 but received 1769712859.
      at org.apache.beam.sdk.io.TFRecordIO$TFRecordCodec.read(TFRecordIO.java:651)
      at org.apache.beam.sdk.io.TFRecordIO$TFRecordSource$TFRecordReader.readNextRecord(TFRecordIO.java:530)
      at org.apache.beam.sdk.io.CompressedSource$CompressedReader.readNextRecord(CompressedSource.java:431)
      at org.apache.beam.sdk.io.FileBasedSource$FileBasedReader.advanceImpl(FileBasedSource.java:484)
      at org.apache.beam.sdk.io.FileBasedSource$FileBasedReader.startImpl(FileBasedSource.java:479)
      at org.apache.beam.sdk.io.OffsetBasedSource$OffsetBasedReader.start(OffsetBasedSource.java:249)
      at org.apache.beam.runners.flink.metrics.ReaderInvocationUtil.invokeStart(ReaderInvocationUtil.java:51)
      at org.apache.beam.runners.flink.translation.wrappers.SourceInputFormat.open(SourceInputFormat.java:75)
      at org.apache.beam.runners.flink.translation.wrappers.SourceInputFormat.open(SourceInputFormat.java:42)
      at org.apache.flink.runtime.operators.DataSourceTask.invoke(DataSourceTask.java:170)
      at org.apache.flink.runtime.taskmanager.Task.run(Task.java:704)
      at java.lang.Thread.run(Thread.java:745)
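
      The root cause: `ReadableByteChannel.read` is not required to fill the buffer — it may return after delivering any number of bytes, and on HDFS a read can stop short (for example at a block boundary) when files are large. The old code read the 12-byte TFRecord header with a single read call, so a short read left the buffer partially filled and the length/CRC check then failed with the mismatch above. A minimal sketch of the read-until-full pattern used by the fix (the class and helper names here are illustrative, not Beam code):

```java
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.Channels;
import java.nio.channels.ReadableByteChannel;

public class ReadFullyDemo {
  static final int HEADER_LEN = 12; // TFRecord header: 8-byte length + 4-byte masked CRC

  // Loop until the buffer is full or the channel signals EOF; a single
  // read() call may legally return fewer bytes than the buffer can hold.
  static int readFully(ReadableByteChannel channel, ByteBuffer buf) throws IOException {
    int total = 0;
    while (buf.hasRemaining()) {
      int n = channel.read(buf);
      if (n < 0) {
        break; // EOF before the buffer was filled
      }
      total += n;
    }
    return total;
  }

  public static void main(String[] args) throws IOException {
    ReadableByteChannel channel =
        Channels.newChannel(new ByteArrayInputStream(new byte[HEADER_LEN]));
    ByteBuffer header = ByteBuffer.allocate(HEADER_LEN);
    int got = readFully(channel, header);
    System.out.println(got);                   // 12
    System.out.println(header.hasRemaining()); // false
  }
}
```

      With a loop like this, a short read is retried instead of being mistaken for a corrupt record, and only a true EOF mid-record is reported as an error.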
      

      How to fix?

      • I have already fixed the bug with the patch below.
      • I will refine the code and submit it.


      diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TFRecordIO.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TFRecordIO.java
      index 96a753a..484a7cb 100644
      --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TFRecordIO.java
      +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TFRecordIO.java
      @@ -631,8 +631,16 @@ public class TFRecordIO {
             int headerBytes = inChannel.read(header);
             if (headerBytes <= 0) {
               return null;
      +      } else if (headerBytes != HEADER_LEN) {
      +        while (header.hasRemaining() && inChannel.read(header) >= 0) {}
      +        if (header.hasRemaining()) {
      +          throw new IOException(String.format(
      +              "EOF while reading record of length %d. Read only %d bytes. Input might be truncated. Not a valid TFRecord. Fewer than 12 bytes.",
      +              HEADER_LEN, header.position()));
      +        }
      +      } else {
      +
             }
      -      checkState(headerBytes == HEADER_LEN, "Not a valid TFRecord. Fewer than 12 bytes.");

             header.rewind();
             long length = header.getLong();
      @@ -655,7 +663,12 @@ public class TFRecordIO {
             }

             footer.clear();
      -      inChannel.read(footer);
      +      while (footer.hasRemaining() && inChannel.read(footer) >= 0) {}
      +      if (footer.hasRemaining()) {
      +        throw new IOException(String.format(
      +            "EOF while reading record of length %d. Read only %d bytes. Input might be truncated. Footer error.",
      +            FOOTER_LEN, footer.position()));
      +      }
             footer.rewind();

             int maskedCrc32OfData = footer.getInt();
      
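      To see why a single-call read fails and the patched loops recover, one can wrap a channel so it returns at most one byte per call — a stand-in for an HDFS short read (the `TrickleChannel` class below is a hypothetical test helper, not part of Beam):

```java
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.Channels;
import java.nio.channels.ReadableByteChannel;

/** A channel that returns at most one byte per read(), mimicking HDFS short reads. */
class TrickleChannel implements ReadableByteChannel {
  private final ReadableByteChannel inner;

  TrickleChannel(ReadableByteChannel inner) {
    this.inner = inner;
  }

  @Override
  public int read(ByteBuffer dst) throws IOException {
    if (!dst.hasRemaining()) {
      return 0;
    }
    ByteBuffer one = ByteBuffer.allocate(1);
    int n = inner.read(one);
    if (n <= 0) {
      return n; // propagate EOF
    }
    one.flip();
    dst.put(one);
    return 1;
  }

  @Override
  public boolean isOpen() {
    return inner.isOpen();
  }

  @Override
  public void close() throws IOException {
    inner.close();
  }
}

public class ShortReadDemo {
  public static void main(String[] args) throws IOException {
    ReadableByteChannel ch =
        new TrickleChannel(Channels.newChannel(new ByteArrayInputStream(new byte[12])));
    ByteBuffer header = ByteBuffer.allocate(12);
    int first = ch.read(header); // a single read() yields only 1 of the 12 header bytes
    while (header.hasRemaining() && ch.read(header) >= 0) {} // the patched loop
    System.out.println(first);                 // 1
    System.out.println(header.hasRemaining()); // false: the loop recovered all 12 bytes
  }
}
```

      Against such a channel, the old `checkState(headerBytes == HEADER_LEN, ...)` would reject a perfectly valid file after the first partial read, while the looped version keeps reading until the header is complete or a genuine EOF occurs.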

       

      People

        Assignee: Unassigned
        Reporter: silenceli (LI HAO)
        Votes: 0
        Watchers: 3
