diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/SparkHashTableSinkOperator.java ql/src/java/org/apache/hadoop/hive/ql/exec/SparkHashTableSinkOperator.java index 85344fc..676eefa 100644 --- ql/src/java/org/apache/hadoop/hive/ql/exec/SparkHashTableSinkOperator.java +++ ql/src/java/org/apache/hadoop/hive/ql/exec/SparkHashTableSinkOperator.java @@ -47,11 +47,12 @@ public class SparkHashTableSinkOperator extends TerminalOperator implements Serializable { - private static final int MIN_REPLICATION = 10; private static final long serialVersionUID = 1L; private final String CLASS_NAME = this.getClass().getName(); private final PerfLogger perfLogger = SessionState.getPerfLogger(); protected static final Logger LOG = LoggerFactory.getLogger(SparkHashTableSinkOperator.class.getName()); + public static final String DFS_REPLICATION_MAX = "dfs.replication.max"; + private int minReplication = 10; private final HashTableSinkOperator htsOperator; @@ -73,6 +74,9 @@ protected void initializeOp(Configuration hconf) throws HiveException { byte tag = conf.getTag(); inputOIs[tag] = inputObjInspectors[0]; conf.setTagOrder(new Byte[]{ tag }); + int dfsMaxRep = hconf.getInt(DFS_REPLICATION_MAX, minReplication); + if (minReplication > dfsMaxRep) + minReplication = dfsMaxRep; htsOperator.setConf(conf); htsOperator.initialize(hconf, inputOIs); } @@ -151,7 +155,7 @@ protected void flushToFile(MapJoinPersistableTableContainer tableContainer, } // TODO find out numOfPartitions for the big table int numOfPartitions = replication; - replication = (short) Math.max(MIN_REPLICATION, numOfPartitions); + replication = (short) Math.max(minReplication, numOfPartitions); htsOperator.console.printInfo(Utilities.now() + "\tDump the side-table for tag: " + tag + " with group count: " + tableContainer.size() + " into file: " + path); // get the hashtable file and path