 .../hadoop/hive/ql/exec/mr/HashTableLoader.java | 21 +++++++++------------
 1 file changed, 9 insertions(+), 12 deletions(-)

diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/HashTableLoader.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/HashTableLoader.java
index f5662f0..5cf0802 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/HashTableLoader.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/HashTableLoader.java
@@ -26,6 +26,7 @@
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
+import org.apache.commons.collections.CollectionUtils;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.filecache.DistributedCache;
 import org.apache.hadoop.fs.FileSystem;
@@ -53,7 +54,7 @@
  */
 public class HashTableLoader implements org.apache.hadoop.hive.ql.exec.HashTableLoader {
 
-  private static final Logger LOG = LoggerFactory.getLogger(MapJoinOperator.class.getName());
+  private static final Logger LOG = LoggerFactory.getLogger(MapJoinOperator.class);
 
   private ExecMapperContext context;
   private Configuration hconf;
@@ -76,7 +77,7 @@ public void load(
       MapJoinTableContainerSerDe[] mapJoinTableSerdes) throws HiveException {
 
     String currentInputPath = context.getCurrentInputPath().toString();
-    LOG.info("******* Load from HashTable for input file: " + currentInputPath);
+    LOG.info("Load from HashTable for input file: {}", currentInputPath);
     MapredLocalWork localWork = context.getLocalWork();
     try {
       if (localWork.getDirectFetchOp() != null) {
@@ -92,9 +93,9 @@ public void load(
           continue;
         }
         Path path = Utilities.generatePath(baseDir, desc.getDumpFilePrefix(), (byte)pos, fileName);
-        LOG.info("\tLoad back 1 hashtable file from tmp file uri:" + path);
+        LOG.info("Load back 1 hashtable file from tmp file uri: {}", path);
         ObjectInputStream in = new ObjectInputStream(new BufferedInputStream(
-            new FileInputStream(path.toUri().getPath()), 4096));
+            new FileInputStream(path.toUri().getPath())));
         try{
           mapJoinTables[pos] = mapJoinTableSerdes[pos].load(in);
         } finally {
@@ -115,12 +116,10 @@ private Path getBaseDir(MapredLocalWork localWork) throws Exception {
       String stageID = localWork.getStageID();
       String suffix = Utilities.generateTarFileName(stageID);
       FileSystem localFs = FileSystem.getLocal(hconf);
-      for (int j = 0; j < localArchives.length; j++) {
-        Path archive = localArchives[j];
-        if (!archive.getName().endsWith(suffix)) {
-          continue;
+      for (Path archive : localArchives) {
+        if (archive.getName().endsWith(suffix)) {
+          return archive.makeQualified(localFs);
         }
-        return archive.makeQualified(localFs);
       }
     }
     return null;
@@ -130,7 +129,7 @@ private void loadDirectly(MapJoinTableContainer[] mapJoinTables, String inputFil
       throws Exception {
     MapredLocalWork localWork = context.getLocalWork();
     List<Operator<? extends OperatorDesc>> directWorks = localWork.getDirectFetchOp().get(joinOp);
-    if (directWorks == null || directWorks.isEmpty()) {
+    if (CollectionUtils.isEmpty(directWorks)) {
      return;
     }
     JobConf job = new JobConf(hconf);
@@ -153,7 +152,5 @@ private void loadDirectly(MapJoinTableContainer[] mapJoinTables, String inputFil
         mapJoinTables[i] = tables[i];
       }
     }
-
-    Arrays.fill(tables, null);
   }
 }
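
For reference, a minimal standalone sketch of the two idioms this patch switches to: SLF4J parameterized logging and the null-safe commons-collections CollectionUtils.isEmpty check. The Demo class, its process method, and the sample list here are hypothetical illustrations, not part of the patch or of Hive.

import java.util.Collections;
import java.util.List;

import org.apache.commons.collections.CollectionUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class Demo {
  // Passing the Class object directly is the usual SLF4J idiom;
  // it is equivalent to passing the class name as a String.
  private static final Logger LOG = LoggerFactory.getLogger(Demo.class);

  static void process(List<String> directWorks, String currentInputPath) {
    // {} placeholders defer message formatting until the log level is enabled,
    // avoiding string concatenation on every call.
    LOG.info("Load from HashTable for input file: {}", currentInputPath);

    // Null-safe emptiness check; replaces "directWorks == null || directWorks.isEmpty()".
    if (CollectionUtils.isEmpty(directWorks)) {
      return;
    }
    LOG.info("Processing {} items", directWorks.size());
  }

  public static void main(String[] args) {
    process(null, "/tmp/input");                            // handled safely by isEmpty()
    process(Collections.singletonList("x"), "/tmp/input");  // non-empty list proceeds
  }
}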