Actually, I totally agree with you: in the close() method we're sure that the consumer has read all the data.
The problem, as I see it, is that we're only sure the consumer has read all the data - but the consumer's job is not only to read data from the extractor, but also to save it (currently only to HDFS). As far as I can tell, we're not waiting on that particular condition anywhere in the code. I noticed that we ended up with an empty file when the loader was interrupted during filewriter.close() (HdfsTextImportLoader.java:94), which was the original hint that led me to this idea.
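To make that failure mode concrete, here is a minimal sketch of what such a buffered write path looks like; the class, the DataReader interface, and the method names are my own illustration under those assumptions, not the actual Sqoop code:

```java
import java.io.BufferedWriter;
import java.io.OutputStreamWriter;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

// Hypothetical sketch loosely modeled on a text loader writing to HDFS.
public class SketchLoader {
  interface DataReader {
    // Returns null once the extractor has produced all records.
    String readTextRecord() throws Exception;
  }

  public void load(DataReader reader, Configuration conf, String file) throws Exception {
    FileSystem fs = FileSystem.get(conf);
    BufferedWriter filewriter = new BufferedWriter(
        new OutputStreamWriter(fs.create(new Path(file)), "UTF-8"));
    String line;
    while ((line = reader.readTextRecord()) != null) {
      filewriter.write(line);
      filewriter.newLine();
    }
    // If the loader thread is interrupted around this close(), the buffered
    // data is never flushed and the output file on HDFS can end up empty.
    filewriter.close();
  }
}
```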
I've also dug into the Hadoop source code, and to make a long story short: Hadoop gets a RecordWriter from the OutputFormat (in my case SqoopOutputFormatLoadExecutor::SqoopRecordWriter) and executes the mapper with that writer. After the mapper finishes, Hadoop closes the RecordWriter and commits the task output. Committing the task output in our case basically means taking the intermediate output file and moving it to its final destination. However, because we're doing asynchronous writes, at the time SqoopRecordWriter::close() is called we only know that our loader has read all data from the extractor, not that it has managed to save it to HDFS and correctly close the file it's writing into. And as it happens, I assume Hadoop was sometimes fast enough to move the intermediate file before the loader was able to finish writing it.
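Here is a minimal sketch of the kind of synchronization I mean, assuming the loader runs on its own thread; the class and method names are illustrative, not Sqoop's actual API or my patch:

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

// Hypothetical sketch: make the RecordWriter's close path block until the
// asynchronous loader has actually finished writing, not merely until the
// consumer has read all records.
public class SketchExecutor {
  private final ExecutorService executor = Executors.newSingleThreadExecutor();
  private Future<?> loaderFuture;

  public void start(Runnable loaderTask) {
    // The loader runs asynchronously, consuming records and writing to HDFS.
    loaderFuture = executor.submit(loaderTask);
  }

  public void close() throws Exception {
    // Without this wait, returning from close() lets Hadoop commit the task
    // output (move the intermediate file) while the loader may still be
    // flushing and closing it.
    loaderFuture.get();  // blocks until done and propagates loader failures
    executor.shutdown();
  }
}
```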
I actually already have a working patch; I just need to clean it up and submit it for review.