diff --git common/src/java/org/apache/hadoop/hive/conf/HiveConf.java common/src/java/org/apache/hadoop/hive/conf/HiveConf.java index 95c5c0e..ea6a4a1 100644 --- common/src/java/org/apache/hadoop/hive/conf/HiveConf.java +++ common/src/java/org/apache/hadoop/hive/conf/HiveConf.java @@ -2795,6 +2795,11 @@ private static void populateLlapDaemonVarsSet(Set llapDaemonVarsSetLocal "directories that are partition-like but contain unsupported characters. 'throw' (an " + "exception) is the default; 'skip' will skip the invalid directories and still repair the" + " others; 'ignore' will skip the validation (legacy behavior, causes bugs in many cases)"), + HIVE_MSCK_REPAIR_BATCH_SIZE( + "hive.msck.repair.batch.size", 0, + "Batch size for the msck repair command. If the value is greaterthan zero, " + + "it will execute batchwise with the configured batch size. " + + "The defaul value is zero. Zero means it will execute directly (Not batchwise)"), HIVE_SERVER2_LLAP_CONCURRENT_QUERIES("hive.server2.llap.concurrent.queries", -1, "The number of queries allowed in parallel via llap. Negative number implies 'infinite'."), HIVE_TEZ_ENABLE_MEMORY_MANAGER("hive.tez.enable.memory.manager", true, diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/DDLTask.java ql/src/java/org/apache/hadoop/hive/ql/exec/DDLTask.java index b26f09d..4a541c1 100644 --- ql/src/java/org/apache/hadoop/hive/ql/exec/DDLTask.java +++ ql/src/java/org/apache/hadoop/hive/ql/exec/DDLTask.java @@ -1829,12 +1829,28 @@ private int msck(Hive db, MsckDesc msckDesc) { AddPartitionDesc apd = new AddPartitionDesc( table.getDbName(), table.getTableName(), false); try { - for (CheckResult.PartitionResult part : partsNotInMs) { - apd.addPartition(Warehouse.makeSpecFromName(part.getPartitionName()), null); - repairOutput.add("Repair: Added partition to metastore " - + msckDesc.getTableName() + ':' + part.getPartitionName()); + int batch_size = conf.getIntVar(ConfVars.HIVE_MSCK_REPAIR_BATCH_SIZE); + if (batch_size > 0 && partsNotInMs.size() > batch_size) { + int counter = 0; + for (CheckResult.PartitionResult part : partsNotInMs) { + counter++; + apd.addPartition(Warehouse.makeSpecFromName(part.getPartitionName()), null); + repairOutput.add("Repair: Added partition to metastore " + msckDesc.getTableName() + + ':' + part.getPartitionName()); + if (counter == batch_size) { + db.createPartitions(apd); + apd = new AddPartitionDesc(table.getDbName(), table.getTableName(), false); + counter = 0; + } + } + } else { + for (CheckResult.PartitionResult part : partsNotInMs) { + apd.addPartition(Warehouse.makeSpecFromName(part.getPartitionName()), null); + repairOutput.add("Repair: Added partition to metastore " + msckDesc.getTableName() + + ':' + part.getPartitionName()); + } + db.createPartitions(apd); } - db.createPartitions(apd); } catch (Exception e) { LOG.info("Could not bulk-add partitions to metastore; trying one by one", e); repairOutput.clear(); diff --git ql/src/test/queries/clientpositive/msck_repair_batchsize.q ql/src/test/queries/clientpositive/msck_repair_batchsize.q new file mode 100644 index 0000000..60970e2 --- /dev/null +++ ql/src/test/queries/clientpositive/msck_repair_batchsize.q @@ -0,0 +1,19 @@ +set hive.msck.repair.batch.size=1; + +DROP TABLE IF EXISTS repairtable; + +CREATE TABLE repairtable(col STRING) PARTITIONED BY (p1 STRING, p2 STRING); + +MSCK TABLE repairtable; + +dfs ${system:test.dfs.mkdir} ${system:test.warehouse.dir}/repairtable/p1=a/p2=a; +dfs ${system:test.dfs.mkdir} ${system:test.warehouse.dir}/repairtable/p1=b/p2=a; +dfs -touchz ${system:test.warehouse.dir}/repairtable/p1=b/p2=a/datafile; + +MSCK TABLE default.repairtable; + +MSCK REPAIR TABLE default.repairtable; + +MSCK TABLE repairtable; + +DROP TABLE default.repairtable; diff --git ql/src/test/results/clientpositive/msck_repair_batchsize.q.out ql/src/test/results/clientpositive/msck_repair_batchsize.q.out new file mode 100644 index 0000000..86ac031 --- /dev/null +++ ql/src/test/results/clientpositive/msck_repair_batchsize.q.out @@ -0,0 +1,40 @@ +PREHOOK: query: DROP TABLE IF EXISTS repairtable +PREHOOK: type: DROPTABLE +POSTHOOK: query: DROP TABLE IF EXISTS repairtable +POSTHOOK: type: DROPTABLE +PREHOOK: query: CREATE TABLE repairtable(col STRING) PARTITIONED BY (p1 STRING, p2 STRING) +PREHOOK: type: CREATETABLE +PREHOOK: Output: database:default +PREHOOK: Output: default@repairtable +POSTHOOK: query: CREATE TABLE repairtable(col STRING) PARTITIONED BY (p1 STRING, p2 STRING) +POSTHOOK: type: CREATETABLE +POSTHOOK: Output: database:default +POSTHOOK: Output: default@repairtable +PREHOOK: query: MSCK TABLE repairtable +PREHOOK: type: MSCK +POSTHOOK: query: MSCK TABLE repairtable +POSTHOOK: type: MSCK +PREHOOK: query: MSCK TABLE default.repairtable +PREHOOK: type: MSCK +POSTHOOK: query: MSCK TABLE default.repairtable +POSTHOOK: type: MSCK +Partitions not in metastore: repairtable:p1=a/p2=a repairtable:p1=b/p2=a +PREHOOK: query: MSCK REPAIR TABLE default.repairtable +PREHOOK: type: MSCK +POSTHOOK: query: MSCK REPAIR TABLE default.repairtable +POSTHOOK: type: MSCK +Partitions not in metastore: repairtable:p1=a/p2=a repairtable:p1=b/p2=a +Repair: Added partition to metastore default.repairtable:p1=a/p2=a +Repair: Added partition to metastore default.repairtable:p1=b/p2=a +PREHOOK: query: MSCK TABLE repairtable +PREHOOK: type: MSCK +POSTHOOK: query: MSCK TABLE repairtable +POSTHOOK: type: MSCK +PREHOOK: query: DROP TABLE default.repairtable +PREHOOK: type: DROPTABLE +PREHOOK: Input: default@repairtable +PREHOOK: Output: default@repairtable +POSTHOOK: query: DROP TABLE default.repairtable +POSTHOOK: type: DROPTABLE +POSTHOOK: Input: default@repairtable +POSTHOOK: Output: default@repairtable