diff --git a/service/src/java/org/apache/hive/service/cli/operation/SQLOperation.java b/service/src/java/org/apache/hive/service/cli/operation/SQLOperation.java index a09590a..cec684c 100644 --- a/service/src/java/org/apache/hive/service/cli/operation/SQLOperation.java +++ b/service/src/java/org/apache/hive/service/cli/operation/SQLOperation.java @@ -21,6 +21,7 @@ import java.io.IOException; import java.io.Serializable; import java.io.UnsupportedEncodingException; +import java.security.PrivilegedExceptionAction; import java.sql.SQLException; import java.util.ArrayList; import java.util.List; @@ -49,7 +50,9 @@ import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector; import org.apache.hadoop.hive.serde2.objectinspector.StructField; import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector; +import org.apache.hadoop.hive.shims.ShimLoader; import org.apache.hadoop.io.BytesWritable; +import org.apache.hadoop.security.UserGroupInformation; import org.apache.hive.service.cli.FetchOrientation; import org.apache.hive.service.cli.HiveSQLException; import org.apache.hive.service.cli.OperationState; @@ -165,26 +168,47 @@ private void runInternal(HiveConf sqlOperationConf) throws HiveSQLException { @Override public void run() throws HiveSQLException { setState(OperationState.PENDING); - prepare(getConfigForOperation()); + HiveConf opConfig = getConfigForOperation(); + prepare(opConfig); if (!shouldRunAsync()) { - runInternal(getConfigForOperation()); + runInternal(opConfig); } else { final SessionState parentSessionState = SessionState.get(); + // current Hive object needs to be set in aysnc thread in case of remote metastore. + // The metastore client in Hive is associated with right user final Hive sessionHive = getCurrentHive(); + // current UGI will get used by metastore when metsatore is in embedded mode + // so this needs to get passed to the new async thread + final UserGroupInformation currentUGI = getCurrentUGI(opConfig); + // Runnable impl to call runInternal asynchronously, // from a different thread Runnable backgroundOperation = new Runnable() { + @Override public void run() { - // Storing the current Hive object necessary when doAs is enabled - // User information is part of the metastore client member in Hive - Hive.set(sessionHive); - SessionState.setCurrentSessionState(parentSessionState); + PrivilegedExceptionAction doAsAction = new PrivilegedExceptionAction() { + @Override + public Object run() throws HiveSQLException { + + // Storing the current Hive object necessary when doAs is enabled + // User information is part of the metastore client member in Hive + Hive.set(sessionHive); + SessionState.setCurrentSessionState(parentSessionState); + try { + runInternal(getConfigForOperation()); + } catch (HiveSQLException e) { + setOperationException(e); + LOG.error("Error running hive query: ", e); + } + return null; + } + }; try { - runInternal(getConfigForOperation()); - } catch (HiveSQLException e) { - setOperationException(e); - LOG.error("Error: ", e); + ShimLoader.getHadoopShims().doAs(currentUGI, doAsAction); + } catch (Exception e) { + setOperationException(new HiveSQLException(e)); + LOG.error("Error running hive query as user : " + currentUGI.getShortUserName(), e); } } }; @@ -201,6 +225,14 @@ public void run() { } } + private UserGroupInformation getCurrentUGI(HiveConf opConfig) throws HiveSQLException { + try { + return ShimLoader.getHadoopShims().getUGIForConf(opConfig); + } catch (Exception e) { + throw new HiveSQLException("Unable to get current user", e); + } + } + private Hive getCurrentHive() throws HiveSQLException { try { return Hive.get();