diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/SparkHashTableSinkOperator.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/SparkHashTableSinkOperator.java index 3b358ee..c3b1d0a 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/SparkHashTableSinkOperator.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/SparkHashTableSinkOperator.java @@ -156,14 +156,23 @@ protected void flushToFile(MapJoinPersistableTableContainer tableContainer, replication = (short) Math.max(minReplication, numOfPartitions); htsOperator.console.printInfo(Utilities.now() + "\tDump the side-table for tag: " + tag + " with group count: " + tableContainer.size() + " into file: " + path); - // get the hashtable file and path - OutputStream os = null; - ObjectOutputStream out = null; try { - os = fs.create(path, replication); - out = new ObjectOutputStream(new BufferedOutputStream(os, 4096)); + // get the hashtable file and path + OutputStream os = null; + ObjectOutputStream out = null; MapJoinTableContainerSerDe mapJoinTableSerde = htsOperator.mapJoinTableSerdes[tag]; - mapJoinTableSerde.persist(out, tableContainer); + try { + os = fs.create(path, replication); + out = new ObjectOutputStream(new BufferedOutputStream(os, 4096)); + mapJoinTableSerde.persist(out, tableContainer); + } finally { + if (out != null) { + out.close(); + } else if (os != null) { + os.close(); + } + } + FileStatus status = fs.getFileStatus(path); htsOperator.console.printInfo(Utilities.now() + "\tUploaded 1 File to: " + path + " (" + status.getLen() + " bytes)"); @@ -176,12 +185,6 @@ protected void flushToFile(MapJoinPersistableTableContainer tableContainer, + tag + ", file " + path, ex); } throw e; - } finally { - if (out != null) { - out.close(); - } else if (os != null) { - os.close(); - } } tableContainer.clear(); }