diff --git metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java
index 4441c2f..2c77d38 100644
--- metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java
+++ metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java
@@ -1267,7 +1267,7 @@ private synchronized void setupJdbcConnectionPool(String driverUrl) throws SQLEx
     BoneCPConfig config = new BoneCPConfig();
     config.setJdbcUrl(driverUrl);
     config.setMaxConnectionsPerPartition(10);
-    config.setPartitionCount(1);
+    config.setPartitionCount(4);
     connPool = new BoneCP(config);
   }
 
diff --git ql/src/java/org/apache/hadoop/hive/ql/Driver.java ql/src/java/org/apache/hadoop/hive/ql/Driver.java
index 7dbb8be..182a208 100644
--- ql/src/java/org/apache/hadoop/hive/ql/Driver.java
+++ ql/src/java/org/apache/hadoop/hive/ql/Driver.java
@@ -1227,89 +1227,100 @@ public int execute() throws CommandNeedRetryException {
       driverCxt.addToRunnable(tsk);
     }
 
-    perfLogger.PerfLogEnd(CLASS_NAME, PerfLogger.TIME_TO_SUBMIT);
-    perfLogger.PerfLogBegin(CLASS_NAME, PerfLogger.RUN_TASKS);
-    // Loop while you either have tasks running, or tasks queued up
-    while (!destroyed && driverCxt.isRunning()) {
-
-      // Launch upto maxthreads tasks
-      Task<? extends Serializable> task;
-      while ((task = driverCxt.getRunnable(maxthreads)) != null) {
-        perfLogger.PerfLogBegin(CLASS_NAME, PerfLogger.TASK + task.getName() + "." + task.getId());
-        TaskRunner runner = launchTask(task, queryId, noName, jobname, jobs, driverCxt);
-        if (!runner.isRunning()) {
-          break;
+    // Set up a thread to send heartbeats while we're running
+    Heartbeatter heartbeatter = new Heartbeatter(conf, ctx.getHiveTxnManager());
+    heartbeatter.start();
+    try {
+      perfLogger.PerfLogEnd(CLASS_NAME, PerfLogger.TIME_TO_SUBMIT);
+      perfLogger.PerfLogBegin(CLASS_NAME, PerfLogger.RUN_TASKS);
+      // Loop while you either have tasks running, or tasks queued up
+      while (!destroyed && driverCxt.isRunning()) {
+        // Launch up to maxthreads tasks
+        Task<? extends Serializable> task;
+        while ((task = driverCxt.getRunnable(maxthreads)) != null) {
+          perfLogger.PerfLogBegin(CLASS_NAME, PerfLogger.TASK + task.getName() + "."
+              + task.getId());
+          TaskRunner runner = launchTask(task, queryId, noName, jobname, jobs, driverCxt);
+          if (!runner.isRunning()) {
+            break;
+          }
         }
-      }
-      // poll the Tasks to see which one completed
-      TaskRunner tskRun = driverCxt.pollFinished();
-      if (tskRun == null) {
-        continue;
-      }
-      hookContext.addCompleteTask(tskRun);
-
-      Task<? extends Serializable> tsk = tskRun.getTask();
-      TaskResult result = tskRun.getTaskResult();
-
-      int exitVal = result.getExitVal();
-      if (exitVal != 0) {
-        if (tsk.ifRetryCmdWhenFail()) {
-          driverCxt.shutdown();
-          // in case we decided to run everything in local mode, restore the
-          // the jobtracker setting to its initial value
-          ctx.restoreOriginalTracker();
-          throw new CommandNeedRetryException();
+        // poll the Tasks to see which one completed
+        TaskRunner tskRun = driverCxt.pollFinished();
+        if (tskRun == null) {
+          continue;
         }
-        Task<? extends Serializable> backupTask = tsk.getAndInitBackupTask();
-        if (backupTask != null) {
-          setErrorMsgAndDetail(exitVal, result.getTaskError(), tsk);
-          console.printError(errorMessage);
-          errorMessage = "ATTEMPT: Execute BackupTask: " + backupTask.getClass().getName();
-          console.printError(errorMessage);
-
-          // add backup task to runnable
-          if (DriverContext.isLaunchable(backupTask)) {
-            driverCxt.addToRunnable(backupTask);
+        hookContext.addCompleteTask(tskRun);
+
+        Task<? extends Serializable> tsk = tskRun.getTask();
+        TaskResult result = tskRun.getTaskResult();
+
+        int exitVal = result.getExitVal();
+        if (exitVal != 0) {
+          if (tsk.ifRetryCmdWhenFail()) {
+            driverCxt.shutdown();
+            // in case we decided to run everything in local mode, restore the
+            // the jobtracker setting to its initial value
+            ctx.restoreOriginalTracker();
+            throw new CommandNeedRetryException();
           }
-          continue;
+          Task<? extends Serializable> backupTask = tsk.getAndInitBackupTask();
+          if (backupTask != null) {
+            setErrorMsgAndDetail(exitVal, result.getTaskError(), tsk);
+            console.printError(errorMessage);
+            errorMessage = "ATTEMPT: Execute BackupTask: " + backupTask.getClass().getName();
+            console.printError(errorMessage);
+
+            // add backup task to runnable
+            if (DriverContext.isLaunchable(backupTask)) {
+              driverCxt.addToRunnable(backupTask);
+            }
+            continue;
 
-        } else {
-          hookContext.setHookType(HookContext.HookType.ON_FAILURE_HOOK);
-          // Get all the failure execution hooks and execute them.
-          for (Hook ofh : getHooks(HiveConf.ConfVars.ONFAILUREHOOKS)) {
-            perfLogger.PerfLogBegin(CLASS_NAME, PerfLogger.FAILURE_HOOK + ofh.getClass().getName());
+          } else {
+            hookContext.setHookType(HookContext.HookType.ON_FAILURE_HOOK);
+            // Get all the failure execution hooks and execute them.
+            for (Hook ofh : getHooks(ConfVars.ONFAILUREHOOKS)) {
+              perfLogger.PerfLogBegin(CLASS_NAME, PerfLogger.FAILURE_HOOK + ofh.getClass().getName());
 
-            ((ExecuteWithHookContext) ofh).run(hookContext);
+              ((ExecuteWithHookContext) ofh).run(hookContext);
 
-            perfLogger.PerfLogEnd(CLASS_NAME, PerfLogger.FAILURE_HOOK + ofh.getClass().getName());
+              perfLogger.PerfLogEnd(CLASS_NAME, PerfLogger.FAILURE_HOOK + ofh.getClass().getName());
+            }
+            setErrorMsgAndDetail(exitVal, result.getTaskError(), tsk);
+            SQLState = "08S01";
+            console.printError(errorMessage);
+            driverCxt.shutdown();
+            // in case we decided to run everything in local mode, restore the
+            // the jobtracker setting to its initial value
+            ctx.restoreOriginalTracker();
+            return exitVal;
           }
-          setErrorMsgAndDetail(exitVal, result.getTaskError(), tsk);
-          SQLState = "08S01";
-          console.printError(errorMessage);
-          driverCxt.shutdown();
-          // in case we decided to run everything in local mode, restore the
-          // the jobtracker setting to its initial value
-          ctx.restoreOriginalTracker();
-          return exitVal;
         }
-      }
-      driverCxt.finished(tskRun);
+        driverCxt.finished(tskRun);
 
-      if (SessionState.get() != null) {
-        SessionState.get().getHiveHistory().setTaskProperty(queryId, tsk.getId(),
-            Keys.TASK_RET_CODE, String.valueOf(exitVal));
-        SessionState.get().getHiveHistory().endTask(queryId, tsk);
-      }
+        if (SessionState.get() != null) {
+          SessionState.get().getHiveHistory().setTaskProperty(queryId, tsk.getId(),
+              Keys.TASK_RET_CODE, String.valueOf(exitVal));
+          SessionState.get().getHiveHistory().endTask(queryId, tsk);
+        }
 
-      if (tsk.getChildTasks() != null) {
-        for (Task<? extends Serializable> child : tsk.getChildTasks()) {
-          if (DriverContext.isLaunchable(child)) {
-            driverCxt.addToRunnable(child);
+        if (tsk.getChildTasks() != null) {
+          for (Task<? extends Serializable> child : tsk.getChildTasks()) {
+            if (DriverContext.isLaunchable(child)) {
+              driverCxt.addToRunnable(child);
+            }
           }
         }
       }
+    } finally {
+      // We don't explicitly interrupt the thread because doing so can screw up any db
+      // transactions it's in the middle of. We just tell it to stop and it will exit eventually.
+      LockException lockException = heartbeatter.timeToStop();
+      // If the lock was aborted, throw an error. This is not ideal;
+      // it would be better if we stopped the query while it was still in progress.
+      if (lockException != null) throw lockException;
     }
     perfLogger.PerfLogEnd(CLASS_NAME, PerfLogger.RUN_TASKS);
@@ -1618,4 +1629,59 @@ public String getErrorMsg() {
     return errorMessage;
   }
 
+  private static class Heartbeatter extends Thread {
+
+    private volatile boolean timeToStop = false;
+    private volatile LockException exception = null;
+    private HiveConf conf;
+    private HiveTxnManager txnManager;
+
+    Heartbeatter(HiveConf conf, HiveTxnManager txnManager) {
+      this.conf = conf;
+      this.txnManager = txnManager;
+    }
+
+    @Override
+    public void run() {
+      // This action doesn't have any transaction manager, so don't mess with heartbeating.
+      if (txnManager == null) return;
+      long lastHeartbeat = 0;
+      // Convert the time to milliseconds, but cut it in half so we make sure we're heartbeating
+      // often enough.
+      final long heartbeatInterval = HiveConf.getIntVar(conf, ConfVars.HIVE_TXN_TIMEOUT) * 500;
+
+      while (!timeToStop) {
+        // Send heartbeat to transaction manager so it knows we're still alive and running.
+        long now = System.currentTimeMillis();
+        if (now - lastHeartbeat > heartbeatInterval) {
+          try {
+            txnManager.heartbeat();
+          } catch (LockException e) {
+            LOG.warn("Received a lock exception, marking for abort " +
+                org.apache.hadoop.util.StringUtils.stringifyException(e));
+            exception = e;
+            return;
+          }
+          lastHeartbeat = now;
+        }
+        try {
+          Thread.sleep(10);
+        } catch (InterruptedException e) {
+          LOG.debug("Received interrupt, exiting");
+          return;
+        }
+      }
+    }
+
+    /**
+     * Tell the heartbeating thread that it is time to stop.
+     * @return any exception received by the heartbeating thread, or null if no error
+     * was encountered.
+     */
+    LockException timeToStop() {
+      timeToStop = true;
+      return exception;
+    }
+  }
+
 }
diff --git ql/src/java/org/apache/hadoop/hive/ql/lockmgr/DbLockManager.java ql/src/java/org/apache/hadoop/hive/ql/lockmgr/DbLockManager.java
index 535912f..36dd8f1 100644
--- ql/src/java/org/apache/hadoop/hive/ql/lockmgr/DbLockManager.java
+++ ql/src/java/org/apache/hadoop/hive/ql/lockmgr/DbLockManager.java
@@ -20,6 +20,7 @@
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.hive.metastore.HiveMetaStoreClient;
+import org.apache.hadoop.hive.metastore.IMetaStoreClient;
 import org.apache.hadoop.hive.metastore.api.*;
 import org.apache.hadoop.hive.ql.ErrorMsg;
 import org.apache.thrift.TException;
@@ -42,10 +43,10 @@
   private static final long MAX_SLEEP = 15000;
   private HiveLockManagerCtx context;
   private Set<DbHiveLock> locks;
-  private HiveMetaStoreClient client;
+  private IMetaStoreClient client;
   private long nextSleep = 50;
 
-  DbLockManager(HiveMetaStoreClient client) {
+  DbLockManager(IMetaStoreClient client) {
     locks = new HashSet<DbHiveLock>();
     this.client = client;
   }
diff --git ql/src/java/org/apache/hadoop/hive/ql/lockmgr/DbTxnManager.java ql/src/java/org/apache/hadoop/hive/ql/lockmgr/DbTxnManager.java
index 7773f66..2f74310 100644
--- ql/src/java/org/apache/hadoop/hive/ql/lockmgr/DbTxnManager.java
+++ ql/src/java/org/apache/hadoop/hive/ql/lockmgr/DbTxnManager.java
@@ -30,6 +30,8 @@
 import org.apache.hadoop.hive.ql.hooks.Entity;
 import org.apache.hadoop.hive.ql.hooks.ReadEntity;
 import org.apache.hadoop.hive.ql.hooks.WriteEntity;
+import org.apache.hadoop.hive.ql.metadata.Hive;
+import org.apache.hadoop.hive.ql.metadata.HiveException;
 import org.apache.hadoop.hive.ql.metadata.Table;
 import org.apache.thrift.TException;
 
@@ -45,7 +47,7 @@
   static final private Log LOG = LogFactory.getLog(CLASS_NAME);
 
   private DbLockManager lockMgr = null;
-  private HiveMetaStoreClient client = null;
+  private IMetaStoreClient client = null;
   private long txnId = 0;
 
   DbTxnManager() {
@@ -306,10 +308,13 @@ private void init() throws LockException {
           "methods.");
     }
     try {
-      client = new HiveMetaStoreClient(conf);
+      // Don't create the client directly; go through Hive so that we get a
+      // retrying client.
+      client = Hive.get(conf).getMSC();
     } catch (MetaException e) {
-      throw new LockException(ErrorMsg.METASTORE_COULD_NOT_INITIATE.getMsg(),
-          e);
+      throw new LockException(ErrorMsg.METASTORE_COULD_NOT_INITIATE.getMsg(), e);
+    } catch (HiveException e) {
+      throw new LockException(ErrorMsg.METASTORE_COULD_NOT_INITIATE.getMsg(), e);
     }
   }
 }
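Reviewer's note (not part of the patch): the sketch below is a minimal, self-contained restatement of the stop-flag heartbeat pattern that the new Heartbeatter thread in Driver.execute() uses, for readers who want to see the idea outside of Hive. HeartbeatSource, Heartbeater, and stopAndGetError are illustrative stand-ins introduced here; they are not Hive APIs, and the timing constants are assumptions.

import java.util.concurrent.atomic.AtomicReference;

public class HeartbeatLoopSketch {

  /** Hypothetical stand-in for the txnManager.heartbeat() call used in the patch. */
  interface HeartbeatSource {
    void heartbeat() throws Exception;
  }

  static class Heartbeater extends Thread {
    private volatile boolean timeToStop = false;               // set by the query thread
    private final AtomicReference<Exception> error = new AtomicReference<>();
    private final HeartbeatSource source;
    private final long intervalMs;

    Heartbeater(HeartbeatSource source, long timeoutMs) {
      this.source = source;
      this.intervalMs = timeoutMs / 2;  // beat twice per timeout window, as the patch does
      setDaemon(true);
    }

    @Override
    public void run() {
      long lastBeat = 0;
      while (!timeToStop) {
        long now = System.currentTimeMillis();
        if (now - lastBeat > intervalMs) {
          try {
            source.heartbeat();
            lastBeat = now;
          } catch (Exception e) {
            error.set(e);   // remember the failure for the caller, then stop beating
            return;
          }
        }
        try {
          Thread.sleep(10);
        } catch (InterruptedException e) {
          return;
        }
      }
    }

    /** Ask the thread to stop; returns any error seen while heartbeating, or null. */
    Exception stopAndGetError() {
      timeToStop = true;
      return error.get();
    }
  }

  public static void main(String[] args) throws Exception {
    Heartbeater hb = new Heartbeater(() -> System.out.println("beat"), 2000);
    hb.start();
    Thread.sleep(3000);                      // stand-in for running the query's tasks
    Exception failure = hb.stopAndGetError();
    if (failure != null) throw failure;
  }
}

Polling a volatile flag with a short sleep keeps the shutdown path simple: the caller never interrupts the thread, so an in-flight metastore or JDBC call cannot be broken mid-operation, which is the same reasoning given in the patch's finally block.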