SQOOP-738

Sqoop is not importing all data in Sqoop 2

    Details

    • Type: Bug
    • Status: Resolved
    • Priority: Blocker
    • Resolution: Fixed
    • Affects Version/s: 2.0.0
    • Fix Version/s: 1.99.1
    • Component/s: None
    • Labels:
      None

      Description

      I've tried to import exactly 408,957 rows (nice round number, right?) in 10 mappers and I've noticed that the mappers will not deliver all the data every time. For example, in one run I got 6 files of the expected size of 10MB, whereas the other 4 random files were completely empty. In another run I got 8 files of 10MB and just 2 empty files. I could not find any logic in how many, and which, files end up empty. We definitely need to address this.

      1. SQOOP-738.patch
        7 kB
        Hari Shreedharan


          Activity

          Jarek Jarcec Cecho added a comment -

          Patch is in: https://git-wip-us.apache.org/repos/asf?p=sqoop.git;a=commit;h=dc81bcf998ac572a35ba5f8b43f8f90026f28ebe

          That was awesome work Hari, thank you very much!

          Jarcec

          Hari Shreedharan added a comment -

          Also, while writing this patch I found an issue in HdfsTextExportExtractor, which is why the two asserts in TestHdfsExtract were commented out. These should be uncommented when SQOOP-761 is fixed.

          Hari Shreedharan added a comment -

          Sorry for the larger than promised "3-line patch." I fixed a couple of issues in the class:

          • readerFinished is now set to true before throwing.
          • Refactored the future.get() call into one method and the exception checking into 2 methods (and renamed the original method).
          • Gave a name to the output format executor's consumer thread.
          • The close method now waits for the consumer thread to finish before returning - the real fix for the issue.
          • The readContent method checks writerFinished only after acquiring the semaphore, which is now also released by the close method.

          As long as the write and close calls come from the same thread (or can be serialized such that the close call comes only after all write calls return, while guaranteeing the write calls are also serial in nature), this patch should fix the issue.
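          The key fix above - having close() wait for the consumer thread before returning - can be sketched as follows. This is a minimal, hypothetical model (a StringBuilder stands in for the HDFS file; class and method names are illustrative, not Sqoop's actual code):

```java
import java.util.concurrent.*;

// Minimal model of the fix: close() blocks on the consumer future,
// so the framework cannot commit before all records are flushed.
public class CloseWaitsDemo {
    private final ExecutorService executor =
        Executors.newSingleThreadExecutor(r -> new Thread(r, "OutputFormatLoader-consumer"));
    private final BlockingQueue<Integer> queue = new LinkedBlockingQueue<>();
    private final StringBuilder sink = new StringBuilder(); // stand-in for the HDFS file
    private Future<?> consumerFuture;

    public void start() {
        consumerFuture = executor.submit(() -> {
            try {
                while (true) {
                    Integer record = queue.take();
                    if (record < 0) {          // poison pill: writer is done
                        return;
                    }
                    Thread.sleep(5);           // simulate a slow write
                    sink.append(record).append('\n');
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
    }

    public void write(int record) throws InterruptedException {
        queue.put(record);
    }

    // The real fix: wait for the consumer to finish before returning.
    public void close() throws Exception {
        queue.put(-1);
        consumerFuture.get(); // blocks until every buffered record is flushed
        executor.shutdown();
    }

    public static int runDemo() throws Exception {
        CloseWaitsDemo demo = new CloseWaitsDemo();
        demo.start();
        for (int i = 0; i < 10; i++) {
            demo.write(i);
        }
        demo.close();
        return demo.sink.toString().split("\n").length; // all 10 records survive
    }
}
```

          Without the consumerFuture.get() call in close(), the caller could return (and a task commit could proceed) while the slow consumer still holds undrained records, which is exactly the empty-file symptom reported in this issue.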
          Hari Shreedharan added a comment -

          I renamed a couple of tests to reflect what they really do, and found that the compressed tests fail once this patch is applied. The counting logic likely has some problem with compressed files.

          Hari Shreedharan added a comment -

          Looks like the compressed tests are failing. I don't think it is due to LocalJobRunner. It is likely due to some other bug.

          Hari Shreedharan added a comment -

          Sorry, it wasn't 3 lines. But I refined some aspects of the class a bit further.

          Jarek Jarcec Cecho added a comment - edited

          My patch that I mentioned earlier on this ticket is failing on exactly the same asserts. I'm assuming that there is something weird going on in the LocalJobRunner that the tests are using, but I'll need some time to investigate this assumption.

          Hari Shreedharan added a comment - edited

          I have a patch that should work. But these 2 lines in TestHdfsExtract.java end up failing:

                assertEquals((1+numbers)*numbers/2, sum);
          
                assertEquals(NUMBER_OF_FILES*NUMBER_OF_ROWS_PER_FILE, index-1);
          
          Jarek Jarcec Cecho added a comment -

          I do have a patch that works on the cluster (with explicit waiting), but the tests fail and I'm not quite sure why. Please feel free to go ahead; if you have a 3-line solution that won't break the tests, I'll be more than happy to commit it!

          Hari Shreedharan added a comment - edited

          Ah, so you mean that the free.release() call in the readContent method unblocks the framework to commit before the filesystem.close method is called? That makes sense, and it explains why this was not noticed until now - it would only be seen on larger workloads. We actually never waited on the completion condition (even before I refactored this class, since we always did the threading from within this class only, and never considered the close condition). That is pretty easy to do: just add a CyclicBarrier. It would be about 3 lines of code.
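          The rendezvous idea can be sketched with java.util.concurrent.CyclicBarrier: two parties, the loader thread and close(), meet at the barrier, so the commit step cannot start before the loader has closed its file. (Hypothetical names; a StringBuilder event log stands in for the real work.)

```java
import java.util.concurrent.CyclicBarrier;

// Two-party barrier: close() and the loader thread rendezvous,
// so "commit" is ordered strictly after "fileClosed".
public class BarrierDemo {
    public static String runDemo() throws Exception {
        final CyclicBarrier done = new CyclicBarrier(2);
        final StringBuilder log = new StringBuilder();
        Thread loader = new Thread(() -> {
            try {
                Thread.sleep(20);           // simulate flushing + closing the file
                log.append("fileClosed;");
                done.await();               // party 1: loader is finished
            } catch (Exception e) {
                throw new RuntimeException(e);
            }
        }, "loader");
        loader.start();
        done.await();                       // party 2: close() blocks here
        log.append("commit;");              // safe: the file is already closed
        loader.join();
        return log.toString();
    }
}
```

          The barrier both blocks close() until the loader arrives and establishes a happens-before edge, so the "commit" side is guaranteed to see everything the loader wrote.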

          Jarek Jarcec Cecho added a comment -

          Quick comment: the following log line from my earlier comment was added by me and was not part of the code that generated the log output I pasted before:

          LOG.info("Closing SqoopOutputFormat RecordWriter");
          

          Please accept my apology for the confusion. I'll be more careful when posting code next time.

          Jarcec

          Jarek Jarcec Cecho added a comment -

          Actually, I totally agree with you: at the close() method we're sure that the consumer has read all the data.

          The problem as I see it is that we're only sure that the consumer has read all the data - but the job of the consumer is not only to read data from the extractor, but also to save it (currently only to HDFS). And it seems to me that we're not waiting on this particular condition anywhere in the code. I noticed that we got an empty file when the loader was interrupted during filewriter.close() (HdfsTextImportLoader.java:94), which was the original hint that led me to this idea.

          I've also dug into the Hadoop source code and, to make a long story short: Hadoop gets a RecordWriter from the OutputFormat (in my case SqoopOutputFormatLoadExecutor::SqoopRecordWriter) and executes the mapper with this writer. After the mapper finishes, Hadoop closes the RecordWriter and commits the task output. Committing the task output in our case basically means taking the intermediate output file and moving it to its final destination. However, because we're doing asynchronous writes, at the time SqoopRecordWriter::close() is called we only know that our loader has read all data from the extractor, not that it managed to save it to HDFS and correctly close the file that it's writing into. And as it happens, I assume that Hadoop was sometimes fast enough to move this intermediate file before the loader was able to finish it.

          I actually already have a working patch; I just need to clean it up and submit it for review.

          Jarcec

          Hari Shreedharan added a comment -

          Also, in the logs you posted above, I don't see a call to close at all. If the close method was called, this line should appear in the logs: Closing SqoopOutputFormat RecordWriter.

          Also, when I changed the close() method, I wrote it with the assumption that it is called from the same thread that called write (which was the assumption in the close method at that time - it basically did a data.wait(), and the notify would come from the consumer thread). Unless that somehow changed, I don't see a problem in the above code.

          Hari Shreedharan added a comment -

          Jarcec,

          Why do you think the consumer is not waited on? It looks like it is. The free.acquire call blocks this thread until the consumer thread has done free.release(), which it does only after processing the data in the pipeline. It looks OK to me.
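          The free/filled handoff being discussed can be modeled with two semaphores; a shared one-element slot stands in for the data pipeline (simplified, hypothetical code, not Sqoop's actual class):

```java
import java.util.concurrent.Semaphore;

// Classic two-semaphore handoff: the producer may only refill the slot
// after the consumer has released "free", i.e. finished processing.
public class SemaphoreHandoffDemo {
    public static int runDemo() throws InterruptedException {
        final Semaphore free = new Semaphore(1);   // slot is empty
        final Semaphore filled = new Semaphore(0); // slot holds data
        final int[] slot = new int[1];
        final int[] sum = new int[1];
        Thread consumer = new Thread(() -> {
            try {
                for (int i = 0; i < 5; i++) {
                    filled.acquire();   // wait until the producer filled the slot
                    sum[0] += slot[0];  // "process the data in the pipeline"
                    free.release();     // only now may the producer continue
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }, "consumer");
        consumer.start();
        for (int i = 1; i <= 5; i++) {
            free.acquire();             // blocks until the consumer released the slot
            slot[0] = i;
            filled.release();
        }
        consumer.join();
        return sum[0]; // 1+2+3+4+5
    }
}
```

          Note what this does and does not guarantee: each free.acquire does wait until the consumer has taken the previous record, but nothing here makes the producer wait for the consumer's final write to be flushed and its file closed, which is the gap the thread goes on to identify.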

          Jarek Jarcec Cecho added a comment -

          I've continued my investigation and I believe that the problem is located in this method of our own RecordWriter instance:

          org.apache.sqoop.job.mr.SqoopOutputFormatLoadExecutor:83
             @Override
              public void close(TaskAttemptContext context) throws InterruptedException {
                LOG.info("Closing SqoopOutputFormat RecordWriter");
                checkConsumerCompletion();
                free.acquire();
                writerFinished = true;
                // This will interrupt only the acquire call in the consumer class,
                // since we have acquired the free semaphore, and close is called from
                // the same thread that writes - so filled has not been released since then
                // so the consumer is definitely blocked on the filled semaphore.
                consumerFuture.cancel(true);
              }
          

          The contract of the RecordWriter::close() method is to finish all writing (flush and close everything) so that Hadoop can continue committing the results. I believe our implementation should wait for the reader thread to finish here in order to fulfill the contract.

          Jarcec

          Jarek Jarcec Cecho added a comment -

          I was able to capture the following mapper log from a task that delivered an empty file:

          2012-12-08 18:35:28,031 WARN mapreduce.Counters: Group org.apache.hadoop.mapred.Task$Counter is deprecated. Use org.apache.hadoop.mapreduce.TaskCounter instead
          2012-12-08 18:35:29,099 WARN org.apache.hadoop.conf.Configuration: session.id is deprecated. Instead, use dfs.metrics.session-id
          2012-12-08 18:35:29,101 INFO org.apache.hadoop.metrics.jvm.JvmMetrics: Initializing JVM Metrics with processName=MAP, sessionId=
          2012-12-08 18:35:39,582 INFO org.apache.hadoop.util.ProcessTree: setsid exited with exit code 0
          2012-12-08 18:35:39,587 INFO org.apache.hadoop.mapred.Task:  Using ResourceCalculatorPlugin : org.apache.hadoop.util.LinuxResourceCalculatorPlugin@543bc20e
          2012-12-08 18:35:48,655 INFO org.apache.hadoop.mapred.Task: Task:attempt_201212071653_0005_m_000004_0 is done. And is in the process of commiting
          2012-12-08 18:35:49,787 INFO org.apache.hadoop.mapred.Task: Task attempt_201212071653_0005_m_000004_0 is allowed to commit now
          2012-12-08 18:35:49,858 INFO org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter: Saved output of task 'attempt_201212071653_0005_m_000004_0' to /user/root/texts
          2012-12-08 18:35:49,864 INFO org.apache.hadoop.mapred.Task: Task 'attempt_201212071653_0005_m_000004_0' done.
          2012-12-08 18:35:51,445 INFO org.apache.hadoop.mapred.TaskLogsTruncater: Initializing logs' truncater with mapRetainSize=-1 and reduceRetainSize=-1
          2012-12-08 18:35:51,490 ERROR org.apache.sqoop.job.mr.SqoopOutputFormatLoadExecutor: Error while loading data out of MR job.
          org.apache.sqoop.common.SqoopException: MAPRED_EXEC_0018:Error occurs during loader run
          	at org.apache.sqoop.job.etl.HdfsTextImportLoader.load(HdfsTextImportLoader.java:98)
          	at org.apache.sqoop.job.mr.SqoopOutputFormatLoadExecutor$ConsumerThread.run(SqoopOutputFormatLoadExecutor.java:193)
          	at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:439)
          	at java.util.concurrent.FutureTask$Sync.innerRun(FutureTask.java:303)
          	at java.util.concurrent.FutureTask.run(FutureTask.java:138)
          	at java.util.concurrent.ThreadPoolExecutor$Worker.runTask(ThreadPoolExecutor.java:886)
          	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:908)
          	at java.lang.Thread.run(Thread.java:662)
          Caused by: org.apache.hadoop.ipc.RemoteException(org.apache.hadoop.hdfs.server.namenode.LeaseExpiredException): No lease on /user/root/texts/_temporary/_attempt_201212071653_0005_m_000004_0/part-m-00004 File does not exist. [Lease.  Holder: DFSClient_NONMAPREDUCE_-243096719_1, pendingcreates: 1]
          	at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.checkLease(FSNamesystem.java:2308)
          	at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.checkLease(FSNamesystem.java:2299)
          	at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.completeFileInternal(FSNamesystem.java:2366)
          	at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.completeFile(FSNamesystem.java:2343)
          	at org.apache.hadoop.hdfs.server.namenode.NameNodeRpcServer.complete(NameNodeRpcServer.java:526)
          	at org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolServerSideTranslatorPB.complete(ClientNamenodeProtocolServerSideTranslatorPB.java:335)
          	at org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos$ClientNamenodeProtocol$2.callBlockingMethod(ClientNamenodeProtocolProtos.java:44084)
          	at org.apache.hadoop.ipc.ProtobufRpcEngine$Server$ProtoBufRpcInvoker.call(ProtobufRpcEngine.java:453)
          	at org.apache.hadoop.ipc.RPC$Server.call(RPC.java:898)
          	at org.apache.hadoop.ipc.Server$Handler$1.run(Server.java:1693)
          	at org.apache.hadoop.ipc.Server$Handler$1.run(Server.java:1689)
          	at java.security.AccessController.doPrivileged(Native Method)
          	at javax.security.auth.Subject.doAs(Subject.java:396)
          	at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1332)
          	at org.apache.hadoop.ipc.Server$Handler.run(Server.java:1687)
          
          	at org.apache.hadoop.ipc.Client.call(Client.java:1160)
          	at org.apache.hadoop.ipc.ProtobufRpcEngine$Invoker.invoke(ProtobufRpcEngine.java:202)
          	at $Proxy10.complete(Unknown Source)
          	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
          	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:39)
          	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:25)
          	at java.lang.reflect.Method.invoke(Method.java:597)
          	at org.apache.hadoop.io.retry.RetryInvocationHandler.invokeMethod(RetryInvocationHandler.java:164)
          	at org.apache.hadoop.io.retry.RetryInvocationHandler.invoke(RetryInvocationHandler.java:83)
          	at $Proxy10.complete(Unknown Source)
          	at org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolTranslatorPB.complete(ClientNamenodeProtocolTranslatorPB.java:329)
          	at org.apache.hadoop.hdfs.DFSOutputStream.completeFile(DFSOutputStream.java:1769)
          	at org.apache.hadoop.hdfs.DFSOutputStream.close(DFSOutputStream.java:1756)
          	at org.apache.hadoop.fs.FSDataOutputStream$PositionCache.close(FSDataOutputStream.java:66)
          	at org.apache.hadoop.fs.FSDataOutputStream.close(FSDataOutputStream.java:99)
          	at sun.nio.cs.StreamEncoder.implClose(StreamEncoder.java:301)
          	at sun.nio.cs.StreamEncoder.close(StreamEncoder.java:130)
          	at java.io.OutputStreamWriter.close(OutputStreamWriter.java:216)
          	at java.io.BufferedWriter.close(BufferedWriter.java:248)
          	at org.apache.sqoop.job.etl.HdfsTextImportLoader.load(HdfsTextImportLoader.java:95)
          	... 7 more
          

          Please notice that Hadoop committed the task (= moved the output file) before the exception, which suggests that we were still writing output at the time the commit was happening. I believe that, due to our asynchronous way of moving data from the mapper (reducer) to the output format, it can happen that the mapper finishes before all data is written to disk. Sometimes when this happens, Hadoop is fast enough to call the task committer, which moves the output file before we finish writing, losing the unflushed data.
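          The race can be reproduced in miniature: a "commit" that snapshots the output without waiting for the asynchronous loader observes an incomplete (here, empty) file. This is a hypothetical model, not Sqoop code:

```java
import java.util.concurrent.*;

// Buggy commit path: snapshot the output file without waiting for the
// asynchronous loader, losing whatever it has not flushed yet.
public class CommitRaceDemo {
    public static int runDemo() throws Exception {
        final StringBuilder file = new StringBuilder(); // stand-in for the task output file
        ExecutorService pool = Executors.newSingleThreadExecutor();
        Future<?> loader = pool.submit(() -> {
            try {
                Thread.sleep(100);                      // loader is still flushing...
                synchronized (file) {
                    file.append("row1\nrow2\n");        // ...the data arrives late
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        int committedLength;
        synchronized (file) {
            committedLength = file.length();            // "commit" happens too early
        }
        loader.get();                                   // the writes land only after the commit
        pool.shutdown();
        return committedLength;
    }
}
```

          Moving the committedLength snapshot after loader.get() models the fix discussed in this thread: close() must wait for the loader before the task output is committed.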


            People

            • Assignee:
              Hari Shreedharan
            • Reporter:
              Jarek Jarcec Cecho
            • Votes:
              0
            • Watchers:
              2
