.../hadoop/hive/ql/exec/SparkHashTableSinkOperator.java | 17 +++++++---------- 1 file changed, 7 insertions(+), 10 deletions(-) diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/SparkHashTableSinkOperator.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/SparkHashTableSinkOperator.java index c3b1d0a..50dcdb5 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/SparkHashTableSinkOperator.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/SparkHashTableSinkOperator.java @@ -49,11 +49,13 @@ private final String CLASS_NAME = this.getClass().getName(); private final transient PerfLogger perfLogger = SessionState.getPerfLogger(); protected static final Logger LOG = LoggerFactory.getLogger(SparkHashTableSinkOperator.class.getName()); - public static final String DFS_REPLICATION_MAX = "dfs.replication.max"; - private int minReplication = 10; + private static final String MAPRED_FILE_REPLICATION = "mapreduce.client.submit.file.replication"; + private static final int DEFAULT_REPLICATION = 1; private final HashTableSinkOperator htsOperator; + private short numReplication; + /** Kryo ctor. */ protected SparkHashTableSinkOperator() { super(); @@ -72,9 +74,7 @@ protected void initializeOp(Configuration hconf) throws HiveException { byte tag = conf.getTag(); inputOIs[tag] = inputObjInspectors[0]; conf.setTagOrder(new Byte[]{ tag }); - int dfsMaxReplication = hconf.getInt(DFS_REPLICATION_MAX, minReplication); - // minReplication value should not cross the value of dfs.replication.max - minReplication = Math.min(minReplication, dfsMaxReplication); + numReplication = (short) hconf.getInt(MAPRED_FILE_REPLICATION, DEFAULT_REPLICATION); htsOperator.setConf(conf); htsOperator.initialize(hconf, inputOIs); } @@ -136,7 +136,6 @@ protected void flushToFile(MapJoinPersistableTableContainer tableContainer, String dumpFilePrefix = conf.getDumpFilePrefix(); Path path = Utilities.generatePath(tmpURI, dumpFilePrefix, tag, fileName); FileSystem fs = path.getFileSystem(htsOperator.getConfiguration()); - short replication = fs.getDefaultReplication(path); fs.mkdirs(path); // Create the folder and its parents if not there while (true) { @@ -151,9 +150,7 @@ protected void flushToFile(MapJoinPersistableTableContainer tableContainer, // No problem, use a new name } } - // TODO find out numOfPartitions for the big table - int numOfPartitions = replication; - replication = (short) Math.max(minReplication, numOfPartitions); + htsOperator.console.printInfo(Utilities.now() + "\tDump the side-table for tag: " + tag + " with group count: " + tableContainer.size() + " into file: " + path); try { @@ -162,7 +159,7 @@ protected void flushToFile(MapJoinPersistableTableContainer tableContainer, ObjectOutputStream out = null; MapJoinTableContainerSerDe mapJoinTableSerde = htsOperator.mapJoinTableSerdes[tag]; try { - os = fs.create(path, replication); + os = fs.create(path, numReplication); out = new ObjectOutputStream(new BufferedOutputStream(os, 4096)); mapJoinTableSerde.persist(out, tableContainer); } finally {