Index: common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
===================================================================
--- common/src/java/org/apache/hadoop/hive/conf/HiveConf.java (revision 1094342)
+++ common/src/java/org/apache/hadoop/hive/conf/HiveConf.java (working copy)
@@ -381,6 +381,7 @@
HIVE_LOCK_MANAGER("hive.lock.manager", "org.apache.hadoop.hive.ql.lockmgr.zookeeper.ZooKeeperHiveLockManager"),
HIVE_LOCK_NUMRETRIES("hive.lock.numretries", 100),
HIVE_LOCK_SLEEP_BETWEEN_RETRIES("hive.lock.sleep.between.retries", 60),
+ HIVE_LOCK_MAPRED_ONLY("hive.lock.mapred.only.operation", false),
HIVE_ZOOKEEPER_QUORUM("hive.zookeeper.quorum", ""),
HIVE_ZOOKEEPER_CLIENT_PORT("hive.zookeeper.client.port", ""),
@@ -423,7 +424,7 @@
// temporary variable for testing. This is added just to turn off this feature in case of a bug in
// deployment. It has not been documented in hive-default.xml intentionally, this should be removed
// once the feature is stable
- HIVE_MAPPER_CANNOT_SPAN_MULTIPLE_PARTITIONS("hive.mapper.cannot.span.multiple.partitions", false),
+ HIVE_MAPPER_CANNOT_SPAN_MULTIPLE_PARTITIONS("hive.mapper.cannot.span.multiple.partitions", false),
;
Index: conf/hive-default.xml
===================================================================
--- conf/hive-default.xml (revision 1094342)
+++ conf/hive-default.xml (working copy)
@@ -998,5 +998,11 @@
A comma separated list of acceptable URI schemes for import and export.
+
+<property>
+  <name>hive.lock.mapred.only.operation</name>
+  <value>false</value>
+  <description>This param is to control whether or not only do lock on queries that need to execute at least one mapred job.</description>
+</property>
Index: ql/src/java/org/apache/hadoop/hive/ql/Driver.java
===================================================================
--- ql/src/java/org/apache/hadoop/hive/ql/Driver.java (revision 1094342)
+++ ql/src/java/org/apache/hadoop/hive/ql/Driver.java (working copy)
@@ -820,19 +820,47 @@
releaseLocks(ctx.getHiveLocks());
return new CommandProcessorResponse(ret, errorMessage, SQLState);
}
+
+      boolean lockMapredOnly = HiveConf.getBoolVar(conf, HiveConf.ConfVars.HIVE_LOCK_MAPRED_ONLY);
+      boolean requireLock = false;
+      if (lockMapredOnly) {
+        Queue<Task<? extends Serializable>> taskQueue = new LinkedList<Task<? extends Serializable>>();
+ taskQueue.addAll(plan.getRootTasks());
+ while (taskQueue.peek() != null) {
+          Task<? extends Serializable> tsk = taskQueue.remove();
+ requireLock = requireLock || tsk.requireLock();
+ if(requireLock) {
+ break;
+ }
+ if (tsk instanceof ConditionalTask) {
+ taskQueue.addAll(((ConditionalTask)tsk).getListTasks());
+ }
+          if (tsk.getChildTasks() != null) {
+ taskQueue.addAll(tsk.getChildTasks());
+ }
+ // does not add back up task here, because back up task should be the same
+ // type of the original task.
+ }
+ } else {
+ requireLock = true;
+ }
- ret = acquireReadWriteLocks();
- if (ret != 0) {
- releaseLocks(ctx.getHiveLocks());
- return new CommandProcessorResponse(ret, errorMessage, SQLState);
+ if (requireLock) {
+ ret = acquireReadWriteLocks();
+ if (ret != 0) {
+ releaseLocks(ctx.getHiveLocks());
+ return new CommandProcessorResponse(ret, errorMessage, SQLState);
+ }
}
ret = execute();
if (ret != 0) {
+      // if requireLock is false, the release here will do nothing because there is no lock
releaseLocks(ctx.getHiveLocks());
return new CommandProcessorResponse(ret, errorMessage, SQLState);
}
+    // if requireLock is false, the release here will do nothing because there is no lock
releaseLocks(ctx.getHiveLocks());
return new CommandProcessorResponse(ret);
}
Index: ql/src/java/org/apache/hadoop/hive/ql/exec/DDLTask.java
===================================================================
--- ql/src/java/org/apache/hadoop/hive/ql/exec/DDLTask.java (revision 1094342)
+++ ql/src/java/org/apache/hadoop/hive/ql/exec/DDLTask.java (working copy)
@@ -163,6 +163,10 @@
private static String INTERMEDIATE_ORIGINAL_DIR_SUFFIX;
private static String INTERMEDIATE_EXTRACTED_DIR_SUFFIX;
+ public boolean requireLock() {
+ return this.work != null && this.work.getNeedLock();
+ }
+
public DDLTask() {
super();
}
Index: ql/src/java/org/apache/hadoop/hive/ql/exec/ExecDriver.java
===================================================================
--- ql/src/java/org/apache/hadoop/hive/ql/exec/ExecDriver.java (revision 1094342)
+++ ql/src/java/org/apache/hadoop/hive/ql/exec/ExecDriver.java (working copy)
@@ -106,6 +106,10 @@
this.jobExecHelper = new HadoopJobExecHelper(job, console, this, this);
}
+ public boolean requireLock() {
+ return true;
+ }
+
protected static String getResourceFiles(Configuration conf, SessionState.ResourceType t) {
// fill in local files to be added to the task environment
SessionState ss = SessionState.get();
Index: ql/src/java/org/apache/hadoop/hive/ql/exec/MapredLocalTask.java
===================================================================
--- ql/src/java/org/apache/hadoop/hive/ql/exec/MapredLocalTask.java (revision 1094342)
+++ ql/src/java/org/apache/hadoop/hive/ql/exec/MapredLocalTask.java (working copy)
@@ -97,7 +97,9 @@
return sdf.format(cal.getTime());
}
-
+ public boolean requireLock() {
+ return true;
+ }
@Override
public int execute(DriverContext driverContext) {
Index: ql/src/java/org/apache/hadoop/hive/ql/exec/Task.java
===================================================================
--- ql/src/java/org/apache/hadoop/hive/ql/exec/Task.java (revision 1094342)
+++ ql/src/java/org/apache/hadoop/hive/ql/exec/Task.java (working copy)
@@ -472,4 +472,9 @@
public void setLocalMode(boolean isLocalMode) {
this.isLocalMode = isLocalMode;
}
+
+ public boolean requireLock() {
+ return false;
+ }
+
}
Index: ql/src/java/org/apache/hadoop/hive/ql/io/rcfile/merge/BlockMergeTask.java
===================================================================
--- ql/src/java/org/apache/hadoop/hive/ql/io/rcfile/merge/BlockMergeTask.java (revision 1094342)
+++ ql/src/java/org/apache/hadoop/hive/ql/io/rcfile/merge/BlockMergeTask.java (working copy)
@@ -74,6 +74,10 @@
job = new JobConf(conf, BlockMergeTask.class);
jobExecHelper = new HadoopJobExecHelper(job, this.console, this, this);
}
+
+ public boolean requireLock() {
+ return true;
+ }
boolean success = true;
Index: ql/src/java/org/apache/hadoop/hive/ql/parse/DDLSemanticAnalyzer.java
===================================================================
--- ql/src/java/org/apache/hadoop/hive/ql/parse/DDLSemanticAnalyzer.java (revision 1094342)
+++ ql/src/java/org/apache/hadoop/hive/ql/parse/DDLSemanticAnalyzer.java (working copy)
@@ -1175,8 +1175,9 @@
mergeDesc.setOutputDir(outputDir);
addInputsOutputsAlterTable(tableName, partSpec);
-    Task<? extends Serializable> mergeTask = TaskFactory.get(new DDLWork(
-        getInputs(), getOutputs(), mergeDesc), conf);
+ DDLWork ddlWork = new DDLWork(getInputs(), getOutputs(), mergeDesc);
+ ddlWork.setNeedLock(true);
+    Task<? extends Serializable> mergeTask = TaskFactory.get(ddlWork, conf);
tableSpec tablepart = new tableSpec(this.db, conf, tablePartAST);
StatsWork statDesc = new StatsWork(tablepart);
Index: ql/src/java/org/apache/hadoop/hive/ql/plan/DDLWork.java
===================================================================
--- ql/src/java/org/apache/hadoop/hive/ql/plan/DDLWork.java (revision 1094342)
+++ ql/src/java/org/apache/hadoop/hive/ql/plan/DDLWork.java (working copy)
@@ -66,6 +66,8 @@
private RevokeDesc revokeDesc;
private GrantRevokeRoleDDL grantRevokeRoleDDL;
+ boolean needLock = false;
+
/**
* ReadEntitites that are passed to the hooks.
*/
@@ -877,4 +879,12 @@
this.mergeFilesDesc = mergeDesc;
}
+ public boolean getNeedLock() {
+ return needLock;
+ }
+
+ public void setNeedLock(boolean needLock) {
+ this.needLock = needLock;
+ }
+
}