diff --git a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java index 8490558..84c89c2 100644 --- a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java +++ b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java @@ -188,11 +188,15 @@ PLAN_SERIALIZATION("hive.plan.serialization.format", "kryo", "Query plan format serialization between client and task nodes. \n" + "Two supported values are : kryo and javaXML. Kryo is default."), - SCRATCHDIR("hive.exec.scratchdir", "/tmp/hive-${system:user.name}", "Scratch space for Hive jobs"), + SCRATCHDIR_PREFIX("hive.exec.scratchdir.prefix", "/tmp/hive-", "HDFS scratch dir prefix"), + SCRATCHDIR("hive.exec.scratchdir", "${hive.exec.scratchdir.prefix}${system:user.name}", "HDFS scratch dir for Hive jobs"), LOCALSCRATCHDIR("hive.exec.local.scratchdir", "${system:java.io.tmpdir}" + File.separator + "${system:user.name}", "Local scratch space for Hive jobs"), SCRATCHDIRPERMISSION("hive.scratch.dir.permission", "700", ""), + DOWNLOADED_RESOURCES_DIR("hive.downloaded.resources.dir", + "${system:java.io.tmpdir}" + File.separator + "${hive.session.id}_resources", + "Temporary local directory for added resources in the remote file system."), SUBMITVIACHILD("hive.exec.submitviachild", false, ""), SUBMITLOCALTASKVIACHILD("hive.exec.submit.local.task.via.child", true, "Determines whether local tasks (typically mapjoin hashtable generation phase) runs in \n" + @@ -256,9 +260,6 @@ "Maximum number of dynamic partitions allowed to be created in each mapper/reducer node."), MAXCREATEDFILES("hive.exec.max.created.files", 100000L, "Maximum number of HDFS files created by all mappers/reducers in a MapReduce job."), - DOWNLOADED_RESOURCES_DIR("hive.downloaded.resources.dir", - "${system:java.io.tmpdir}" + File.separator + "${hive.session.id}_resources", - "Temporary local directory for added resources in the remote file system."), DEFAULTPARTITIONNAME("hive.exec.default.partition.name", "__HIVE_DEFAULT_PARTITION__", "The default partition name in case the dynamic partition column value is null/empty string or any other values that cannot be escaped. \n" + "This value must not contain any special character used in HDFS URI (e.g., ':', '%', '/' etc). 
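The HiveConf hunk above splits the HDFS scratch location into a configurable prefix (hive.exec.scratchdir.prefix) plus the user name, so the directory is derived from the user who opened the session rather than from ${system:user.name} of the HiveServer2 process (which made every user share /tmp/hive-hive, per the removed comment in Utilities below). It also moves DOWNLOADED_RESOURCES_DIR up next to the other scratch-dir settings. A minimal sketch of how the per-user directory is derived at session start, mirroring the SessionState change later in this patch (the user name value is illustrative):

    HiveConf conf = new HiveConf();
    String userName = "alice";  // illustrative; in the patch this comes from the session UGI
    String prefix = HiveConf.getVar(conf, HiveConf.ConfVars.SCRATCHDIR_PREFIX);  // "/tmp/hive-"
    String hdfsScratchDir = prefix + userName;                                   // "/tmp/hive-alice"
    conf.setVar(HiveConf.ConfVars.SCRATCHDIR, hdfsScratchDir);
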
\n" + diff --git a/ql/src/java/org/apache/hadoop/hive/ql/Context.java b/ql/src/java/org/apache/hadoop/hive/ql/Context.java index 668e580..7fcbe3c 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/Context.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/Context.java @@ -18,6 +18,18 @@ package org.apache.hadoop.hive.ql; +import java.io.DataInput; +import java.io.FileNotFoundException; +import java.io.IOException; +import java.net.URI; +import java.text.SimpleDateFormat; +import java.util.Date; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Random; +import java.util.concurrent.ConcurrentHashMap; + import org.antlr.runtime.TokenRewriteStream; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; @@ -30,7 +42,6 @@ import org.apache.hadoop.hive.common.FileUtils; import org.apache.hadoop.hive.conf.HiveConf; import org.apache.hadoop.hive.ql.exec.TaskRunner; -import org.apache.hadoop.hive.ql.exec.Utilities; import org.apache.hadoop.hive.ql.hooks.WriteEntity; import org.apache.hadoop.hive.ql.lockmgr.HiveLock; import org.apache.hadoop.hive.ql.lockmgr.HiveLockManager; @@ -41,18 +52,6 @@ import org.apache.hadoop.hive.shims.ShimLoader; import org.apache.hadoop.util.StringUtils; -import java.io.DataInput; -import java.io.FileNotFoundException; -import java.io.IOException; -import java.net.URI; -import java.text.SimpleDateFormat; -import java.util.Date; -import java.util.HashMap; -import java.util.List; -import java.util.Map; -import java.util.Random; -import java.util.concurrent.ConcurrentHashMap; - /** * Context for Semantic Analyzers. Usage: not reusable - construct a new one for * each query should call clear() at end of use to remove temporary folders @@ -191,7 +190,7 @@ public String getCmd () { * @param scratchDir path of tmp directory */ private Path getScratchDir(String scheme, String authority, - boolean mkdir, String scratchDir) { + boolean mkdir, String scratchDir) { String fileSystem = scheme + ":" + authority; Path dir = fsScratchDirs.get(fileSystem + "-" + TaskRunner.getTaskRunnerID()); @@ -203,11 +202,11 @@ private Path getScratchDir(String scheme, String authority, try { FileSystem fs = dirPath.getFileSystem(conf); dirPath = new Path(fs.makeQualified(dirPath).toString()); - FsPermission fsPermission = new FsPermission(Short.parseShort(scratchDirPermission.trim(), 8)); + FsPermission fsPermission = new FsPermission(scratchDirPermission); - if (!Utilities.createDirsWithPermission(conf, dirPath, fsPermission)) { + if (!fs.mkdirs(dirPath, fsPermission)) { throw new RuntimeException("Cannot make directory: " - + dirPath.toString()); + + dirPath.toString()); } if (isHDFSCleanup) { fs.deleteOnExit(dirPath); @@ -233,7 +232,7 @@ public Path getLocalScratchDir(boolean mkdir) { FileSystem fs = FileSystem.getLocal(conf); URI uri = fs.getUri(); return getScratchDir(uri.getScheme(), uri.getAuthority(), - mkdir, localScratchDir); + mkdir, localScratchDir); } catch (IOException e) { throw new RuntimeException (e); } @@ -257,7 +256,7 @@ public Path getMRScratchDir() { URI uri = dir.toUri(); Path newScratchDir = getScratchDir(uri.getScheme(), uri.getAuthority(), - !explain, uri.getPath()); + !explain, uri.getPath()); LOG.info("New scratch dir is " + newScratchDir); return newScratchDir; } catch (IOException e) { @@ -270,7 +269,7 @@ public Path getMRScratchDir() { private Path getExternalScratchDir(URI extURI) { return getScratchDir(extURI.getScheme(), extURI.getAuthority(), - !explain, nonLocalScratchPath.toUri().getPath()); + 
!explain, nonLocalScratchPath.toUri().getPath()); } /** @@ -283,7 +282,7 @@ private void removeScratchDir() { p.getFileSystem(conf).delete(p, true); } catch (Exception e) { LOG.warn("Error Removing Scratch: " - + StringUtils.stringifyException(e)); + + StringUtils.stringifyException(e)); } } fsScratchDirs.clear(); @@ -305,7 +304,7 @@ private String nextPathId() { */ public boolean isMRTmpFileURI(String uriStr) { return (uriStr.indexOf(executionId) != -1) && - (uriStr.indexOf(MR_PREFIX) != -1); + (uriStr.indexOf(MR_PREFIX) != -1); } /** @@ -315,7 +314,7 @@ public boolean isMRTmpFileURI(String uriStr) { */ public Path getMRTmpPath() { return new Path(getMRScratchDir(), MR_PREFIX + - nextPathId()); + nextPathId()); } /** @@ -343,7 +342,7 @@ public Path getExternalTmpPath(Path path) { return getExtTmpPathRelTo(path.getParent()); } return new Path(getExternalScratchDir(extURI), EXT_PREFIX + - nextPathId()); + nextPathId()); } /** @@ -353,8 +352,8 @@ public Path getExternalTmpPath(Path path) { */ public Path getExtTmpPathRelTo(Path path) { URI uri = path.toUri(); - return new Path (getScratchDir(uri.getScheme(), uri.getAuthority(), !explain, - uri.getPath() + Path.SEPARATOR + "_" + this.executionId), EXT_PREFIX + nextPathId()); + return new Path (getScratchDir(uri.getScheme(), uri.getAuthority(), !explain, + uri.getPath() + Path.SEPARATOR + "_" + this.executionId), EXT_PREFIX + nextPathId()); } /** diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java index 4cf4522..f9eb3ef 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java @@ -92,7 +92,6 @@ import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.PathFilter; -import org.apache.hadoop.fs.permission.FsPermission; import org.apache.hadoop.hive.common.HiveInterruptCallback; import org.apache.hadoop.hive.common.HiveInterruptUtils; import org.apache.hadoop.hive.common.HiveStatsUtils; @@ -430,7 +429,7 @@ public static void setWorkflowAdjacencies(Configuration conf, QueryPlan plan) { @Override protected Expression instantiate(Object oldInstance, Encoder out) { return new Expression(Enum.class, "valueOf", new Object[] {oldInstance.getClass(), - ((Enum) oldInstance).name()}); + ((Enum) oldInstance).name()}); } @Override @@ -774,11 +773,11 @@ public void write(Kryo kryo, Output output, Timestamp ts) { } } - /** Custom Kryo serializer for sql date, otherwise Kryo gets confused between + /** Custom Kryo serializer for sql date, otherwise Kryo gets confused between java.sql.Date and java.util.Date while deserializing */ private static class SqlDateSerializer extends - com.esotericsoftware.kryo.Serializer { + com.esotericsoftware.kryo.Serializer { @Override public java.sql.Date read(Kryo kryo, Input input, Class clazz) { @@ -798,7 +797,7 @@ public CommonToken read(Kryo kryo, Input input, Class clazz) { } @Override - public void write(Kryo kryo, Output output, CommonToken token) { + public void write(Kryo kryo, Output output, CommonToken token) { output.writeInt(token.getType()); output.writeString(token.getText()); } @@ -1076,10 +1075,10 @@ public static TableDesc getTableDesc(Table tbl) { public static TableDesc getTableDesc(String cols, String colTypes) { return (new TableDesc(SequenceFileInputFormat.class, HiveSequenceFileOutputFormat.class, Utilities.makeProperties( - serdeConstants.SERIALIZATION_FORMAT, "" + Utilities.ctrlaCode, - 
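In the Context#getScratchDir hunk above, the permission is now built with FsPermission's String constructor instead of parsing the octal value into a short first, and Utilities.createDirsWithPermission is replaced by a plain FileSystem#mkdirs call (still followed by deleteOnExit when HDFS cleanup is enabled). A small sketch of the two constructions, using the hive.scratch.dir.permission default of "700":

    import org.apache.hadoop.fs.permission.FsPermission;

    String scratchDirPermission = "700";
    // Before: parse the octal string into a short.
    FsPermission oldStyle = new FsPermission(Short.parseShort(scratchDirPermission.trim(), 8));
    // After: let FsPermission parse the mode string directly.
    FsPermission newStyle = new FsPermission(scratchDirPermission);
    // Both represent rwx------ (0700) for the directory owner.
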
serdeConstants.LIST_COLUMNS, cols, - serdeConstants.LIST_COLUMN_TYPES, colTypes, - serdeConstants.SERIALIZATION_LIB,LazySimpleSerDe.class.getName()))); + serdeConstants.SERIALIZATION_FORMAT, "" + Utilities.ctrlaCode, + serdeConstants.LIST_COLUMNS, cols, + serdeConstants.LIST_COLUMN_TYPES, colTypes, + serdeConstants.SERIALIZATION_LIB,LazySimpleSerDe.class.getName()))); } public static PartitionDesc getPartitionDesc(Partition part) throws HiveException { @@ -1352,7 +1351,7 @@ public static String getFileExtension(JobConf jc, boolean isCompressed, */ public static SequenceFile.Writer createSequenceWriter(JobConf jc, FileSystem fs, Path file, Class keyClass, Class valClass, boolean isCompressed, Progressable progressable) - throws IOException { + throws IOException { CompressionCodec codec = null; CompressionType compressionType = CompressionType.NONE; Class codecClass = null; @@ -1362,7 +1361,7 @@ public static String getFileExtension(JobConf jc, boolean isCompressed, codec = (CompressionCodec) ReflectionUtils.newInstance(codecClass, jc); } return (SequenceFile.createWriter(fs, jc, file, keyClass, valClass, compressionType, codec, - progressable)); + progressable)); } @@ -1495,7 +1494,7 @@ public static void rename(FileSystem fs, Path src, Path dst) throws IOException, * @throws IOException */ public static void renameOrMoveFiles(FileSystem fs, Path src, Path dst) throws IOException, - HiveException { + HiveException { if (!fs.exists(dst)) { if (!fs.rename(src, dst)) { throw new HiveException("Unable to move: " + src + " to: " + dst); @@ -1739,7 +1738,7 @@ public static void mvFileToFinalPath(Path specPath, Configuration hconf, */ private static void createEmptyBuckets(Configuration hconf, ArrayList paths, FileSinkDesc conf, Reporter reporter) - throws HiveException, IOException { + throws HiveException, IOException { JobConf jc; if (hconf instanceof JobConf) { @@ -1800,7 +1799,7 @@ public static void removeTempOrDuplicateFiles(FileSystem fs, Path path) throws I for (int i = 0; i < parts.length; ++i) { assert parts[i].isDir() : "dynamic partition " + parts[i].getPath() - + " is not a direcgtory"; + + " is not a direcgtory"; FileStatus[] items = fs.listStatus(parts[i].getPath()); // remove empty directory since DP insert should not generate empty partitions. 
@@ -2196,7 +2195,7 @@ public void interrupt() { try { new Path(path).getFileSystem(ctx.getConf()).close(); } catch (IOException ignore) { - LOG.debug(ignore); + LOG.debug(ignore); } } if (executor != null) { @@ -2436,7 +2435,7 @@ private static void getMRTasks(List> tasks, List fullPartSpec = new LinkedHashMap(partSpec); @@ -2544,7 +2543,7 @@ public static void setColumnTypeList(JobConf jobConf, Operator op, boolean exclu public static Path generatePath(Path basePath, String dumpFilePrefix, Byte tag, String bigBucketFileName) { return new Path(basePath, "MapJoin-" + dumpFilePrefix + tag + - "-" + bigBucketFileName + suffix); + "-" + bigBucketFileName + suffix); } public static String generateFileName(Byte tag, String bigBucketFileName) { @@ -2664,7 +2663,7 @@ public T run(PreparedStatement stmt) throws SQLException { try { Thread.sleep(waitTime); } catch (InterruptedException iex) { - } + } } catch (SQLException e) { // throw other types of SQLExceptions (SQLNonTransientException / SQLRecoverableException) throw e; @@ -2771,8 +2770,8 @@ public static PreparedStatement prepareWithRetry(Connection conn, String stmt, */ public static long getRandomWaitTime(int baseWindow, int failures, Random r) { return (long) ( - baseWindow * failures + // grace period for the last round of attempt - baseWindow * (failures + 1) * r.nextDouble()); // expanding time window for each failure + baseWindow * failures + // grace period for the last round of attempt + baseWindow * (failures + 1) * r.nextDouble()); // expanding time window for each failure } public static final char sqlEscapeChar = '\\'; @@ -2850,7 +2849,7 @@ public static String formatMsecToStr(long msec) { * @return the number of reducers. */ public static int estimateNumberOfReducers(HiveConf conf, ContentSummary inputSummary, - MapWork work, boolean finalMapRed) throws IOException { + MapWork work, boolean finalMapRed) throws IOException { long bytesPerReducer = conf.getLongVar(HiveConf.ConfVars.BYTESPERREDUCER); int maxReducers = conf.getIntVar(HiveConf.ConfVars.MAXREDUCERS); @@ -2863,7 +2862,7 @@ public static int estimateNumberOfReducers(HiveConf conf, ContentSummary inputSu + maxReducers + " estimated totalInputFileSize=" + totalInputFileSize); } else { LOG.info("BytesPerReducer=" + bytesPerReducer + " maxReducers=" - + maxReducers + " totalInputFileSize=" + totalInputFileSize); + + maxReducers + " totalInputFileSize=" + totalInputFileSize); } // If this map reduce job writes final data to a table and bucketing is being inferred, @@ -3030,7 +3029,7 @@ public static double getHighestSamplePercentage (MapWork work) { if (!skipDummy && isEmptyPath(job, path, ctx)) { path = createDummyFileForEmptyPartition(path, job, work, - hiveScratchDir, alias, sequenceNumber++); + hiveScratchDir, alias, sequenceNumber++); } pathsToAdd.add(path); @@ -3195,7 +3194,7 @@ public static void setInputPaths(JobConf job, List pathsToAdd) { */ public static void setInputAttributes(Configuration conf, MapWork mWork) { HiveConf.ConfVars var = HiveConf.getVar(conf, HiveConf.ConfVars.HIVE_EXECUTION_ENGINE).equals("tez") ? 
- HiveConf.ConfVars.HIVETEZINPUTFORMAT : HiveConf.ConfVars.HIVEINPUTFORMAT; + HiveConf.ConfVars.HIVETEZINPUTFORMAT : HiveConf.ConfVars.HIVEINPUTFORMAT; if (mWork.getInputformat() != null) { HiveConf.setVar(conf, var, mWork.getInputformat()); } @@ -3222,7 +3221,7 @@ public static void createTmpDirs(Configuration conf, MapWork mWork) Map> pa = mWork.getPathToAliases(); if (pa != null) { List> ops = - new ArrayList>(); + new ArrayList>(); for (List ls : pa.values()) { for (String a : ls) { ops.add(mWork.getAliasToWork().get(a)); @@ -3247,15 +3246,13 @@ public static void createTmpDirs(Configuration conf, ReduceWork rWork) return; } List> ops - = new LinkedList>(); + = new LinkedList>(); ops.add(rWork.getReducer()); createTmpDirs(conf, ops); } private static void createTmpDirs(Configuration conf, List> ops) throws IOException { - - FsPermission fsPermission = new FsPermission((short)00777); while (!ops.isEmpty()) { Operator op = ops.remove(0); @@ -3265,7 +3262,8 @@ private static void createTmpDirs(Configuration conf, if (tempDir != null) { Path tempPath = Utilities.toTempPath(tempDir); - createDirsWithPermission(conf, tempPath, fsPermission); + FileSystem fs = tempPath.getFileSystem(conf); + fs.mkdirs(tempPath); } } @@ -3327,7 +3325,7 @@ public static File createTempDir(String baseDir){ } } throw new IllegalStateException("Failed to create a temp dir under " - + baseDir + " Giving up after " + MAX_ATTEMPS + " attemps"); + + baseDir + " Giving up after " + MAX_ATTEMPS + " attemps"); } @@ -3400,34 +3398,7 @@ public static int getFooterCount(TableDesc table, JobConf job) throws IOExceptio return footerCount; } - /** - * @param conf the configuration used to derive the filesystem to create the path - * @param mkdir the path to be created - * @param fsPermission ignored if it is hive server session and doAs is enabled - * @return true if successfully created the directory else false - * @throws IOException if hdfs experiences any error conditions - */ - public static boolean createDirsWithPermission(Configuration conf, Path mkdir, - FsPermission fsPermission) throws IOException { - - boolean recursive = false; - if (SessionState.get() != null) { - recursive = SessionState.get().isHiveServerQuery() && - conf.getBoolean(HiveConf.ConfVars.HIVE_SERVER2_ENABLE_DOAS.varname, - HiveConf.ConfVars.HIVE_SERVER2_ENABLE_DOAS.defaultBoolVal); - // we reset the permission in case of hive server and doAs enabled because - // currently scratch directory uses /tmp/hive-hive as the scratch directory. - // However, with doAs enabled, the first user to create this directory would - // own the directory and subsequent users cannot access the scratch directory. - // The right fix is to have scratch dir per user. - fsPermission = new FsPermission((short)00777); - } - - // if we made it so far without exception we are good! 
- return createDirsWithPermission(conf, mkdir, fsPermission, recursive); - } - - private static void resetConfAndCloseFS (Configuration conf, boolean unsetUmask, + private static void resetConfAndCloseFS (Configuration conf, boolean unsetUmask, String origUmask, FileSystem fs) throws IOException { if (unsetUmask) { if (origUmask != null) { @@ -3440,36 +3411,6 @@ private static void resetConfAndCloseFS (Configuration conf, boolean unsetUmask, fs.close(); } - public static boolean createDirsWithPermission(Configuration conf, Path mkdirPath, - FsPermission fsPermission, boolean recursive) throws IOException { - String origUmask = null; - LOG.debug("Create dirs " + mkdirPath + " with permission " + fsPermission + " recursive " + - recursive); - - if (recursive) { - origUmask = conf.get("fs.permissions.umask-mode"); - // this umask is required because by default the hdfs mask is 022 resulting in - // all parents getting the fsPermission & !(022) permission instead of fsPermission - conf.set("fs.permissions.umask-mode", "000"); - } - - FileSystem fs = ShimLoader.getHadoopShims().getNonCachedFileSystem(mkdirPath.toUri(), conf); - boolean retval = false; - try { - retval = fs.mkdirs(mkdirPath, fsPermission); - resetConfAndCloseFS(conf, recursive, origUmask, fs); - } catch (IOException ioe) { - try { - resetConfAndCloseFS(conf, recursive, origUmask, fs); - } - catch (IOException e) { - // do nothing - double failure - } - } - return retval; - } - - /** * Convert path to qualified path. * diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionState.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionState.java index 2959fcc..f947b12 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionState.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionState.java @@ -41,12 +41,8 @@ import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.permission.FsPermission; -import org.apache.hadoop.hdfs.DistributedFileSystem; -import org.apache.hadoop.hive.common.FileUtils; import org.apache.hadoop.hive.conf.HiveConf; import org.apache.hadoop.hive.conf.HiveConf.ConfVars; -import org.apache.hadoop.hive.ql.ErrorMsg; -import org.apache.hadoop.hive.ql.exec.Utilities; import org.apache.hadoop.hive.shims.ShimLoader; import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.yarn.api.records.LocalResource; @@ -81,7 +77,7 @@ private final Set localizedResources = new HashSet(); private static List openSessions - = Collections.synchronizedList(new LinkedList()); + = Collections.synchronizedList(new LinkedList()); /** * Constructor. 
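The two Utilities.createDirsWithPermission overloads removed above existed to work around directory permissions under HiveServer2 with doAs: they forced a 00777 permission and temporarily set fs.permissions.umask-mode to 000 so that newly created parents would not be trimmed to fsPermission & ~022, because all users shared a single scratch directory. With per-user scratch directories that workaround goes away and callers use FileSystem#mkdirs directly. A sketch of the umask effect the old code was fighting, assuming the usual 022 default:

    Configuration conf = new Configuration();
    FsPermission requested = new FsPermission("777");
    FsPermission umask = FsPermission.getUMask(conf);       // typically 022 unless overridden
    FsPermission effective = requested.applyUMask(umask);   // 777 & ~022 = 755

With the default hive.scratch.dir.permission of 700 the umask removes nothing, so a plain mkdirs with the configured permission is sufficient.
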
We do not automatically connect, because we only want to @@ -133,7 +129,7 @@ public void open(HiveConf conf) * @throws TezException */ public void open(HiveConf conf, String[] additionalFiles) - throws IOException, LoginException, IllegalArgumentException, URISyntaxException, TezException { + throws IOException, LoginException, IllegalArgumentException, URISyntaxException, TezException { this.conf = conf; UserGroupInformation ugi; @@ -209,7 +205,7 @@ public void open(HiveConf conf, String[] additionalFiles) } public void refreshLocalResourcesFromConf(HiveConf conf) - throws IOException, LoginException, IllegalArgumentException, URISyntaxException, TezException { + throws IOException, LoginException, IllegalArgumentException, URISyntaxException, TezException { String dir = tezScratchDir.toString(); @@ -223,7 +219,7 @@ public void refreshLocalResourcesFromConf(HiveConf conf) // these are local resources that are set through the mr "tmpjars" property List handlerLr = utils.localizeTempFiles(dir, conf, - additionalFilesNotFromConf.toArray(new String[additionalFilesNotFromConf.size()])); + additionalFilesNotFromConf.toArray(new String[additionalFilesNotFromConf.size()])); if (handlerLr != null) { localizedResources.addAll(handlerLr); @@ -297,14 +293,14 @@ public LocalResource getAppJarLr() { * be used with Tez. Assumes scratchDir exists. */ private Path createTezDir(String sessionId) - throws IOException { + throws IOException { // tez needs its own scratch dir (per session) Path tezDir = new Path(HiveConf.getVar(conf, HiveConf.ConfVars.SCRATCHDIR), TEZ_DIR); tezDir = new Path(tezDir, sessionId); FileSystem fs = tezDir.getFileSystem(conf); - FsPermission fsPermission = new FsPermission((short)00777); - Utilities.createDirsWithPermission(conf, tezDir, fsPermission, true); + FsPermission fsPermission = new FsPermission(HiveConf.getVar(conf, HiveConf.ConfVars.SCRATCHDIRPERMISSION)); + fs.mkdirs(tezDir, fsPermission); // Make sure the path is normalized (we expect validation to pass since we just created it). tezDir = DagUtils.validateTargetDir(tezDir, conf).getPath(); // don't keep the directory around on non-clean exit diff --git a/ql/src/java/org/apache/hadoop/hive/ql/session/SessionState.java b/ql/src/java/org/apache/hadoop/hive/ql/session/SessionState.java index fcfcf42..f9eaa7c 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/session/SessionState.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/session/SessionState.java @@ -65,6 +65,7 @@ import org.apache.hadoop.hive.ql.security.authorization.plugin.HiveMetastoreClientFactoryImpl; import org.apache.hadoop.hive.ql.util.DosToUnix; import org.apache.hadoop.hive.shims.ShimLoader; +import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.util.ReflectionUtils; import com.google.common.base.Preconditions; @@ -182,7 +183,7 @@ private PerfLogger perfLogger; - private final String userName; + private String userName; /** * scratch path to use for all non-local (ie. 
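In the TezSessionState hunk above, createTezDir now takes its permission from hive.scratch.dir.permission instead of a hard-coded 00777 and creates the directory with a plain mkdirs. Reformatted for readability, the new logic is roughly:

    Path tezDir = new Path(HiveConf.getVar(conf, HiveConf.ConfVars.SCRATCHDIR), TEZ_DIR);
    tezDir = new Path(tezDir, sessionId);
    FileSystem fs = tezDir.getFileSystem(conf);
    FsPermission fsPermission =
        new FsPermission(HiveConf.getVar(conf, HiveConf.ConfVars.SCRATCHDIRPERMISSION));
    fs.mkdirs(tezDir, fsPermission);
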
hdfs) file system tmp folders @@ -338,10 +339,10 @@ public static SessionState start(SessionState startSs) { setCurrentSessionState(startSs); - if(startSs.hiveHist == null){ + if (startSs.hiveHist == null){ if (startSs.getConf().getBoolVar(HiveConf.ConfVars.HIVE_SESSION_HISTORY_ENABLED)) { startSs.hiveHist = new HiveHistoryImpl(startSs); - }else { + } else { //Hive history is disabled, create a no-op proxy startSs.hiveHist = HiveHistoryProxyHandler.getNoOpHiveHistoryProxy(); } @@ -359,13 +360,18 @@ public static SessionState start(SessionState startSs) { // Get the following out of the way when you start the session these take a // while and should be done when we start up. try { - //Hive object instance should be created with a copy of the conf object. If the conf is + // Hive object instance should be created with a copy of the conf object. If the conf is // shared with SessionState, other parts of the code might update the config, but // Hive.get(HiveConf) would not recognize the case when it needs refreshing Hive.get(new HiveConf(startSs.conf)).getMSC(); - ShimLoader.getHadoopShims().getUGIForConf(startSs.conf); + UserGroupInformation sessionUGI = ShimLoader.getHadoopShims().getUGIForConf(startSs.conf); + // Set username for this session + startSs.setUserName(sessionUGI.getShortUserName()); FileSystem.get(startSs.conf); - startSs.createSessionPaths(startSs.conf); + // Create scratch dirs for this session + startSs.createSessionScratchDirs(); + // Create session paths for this session + startSs.createSessionPaths(); } catch (Exception e) { // catch-all due to some exec time dependencies on session state // that would cause ClassNoFoundException otherwise @@ -442,21 +448,21 @@ private void dropSessionPaths(Configuration conf) throws IOException { } } - private void createSessionPaths(Configuration conf) throws IOException { - + private void createSessionPaths() throws IOException { + Configuration conf = getConf(); String scratchDirPermission = HiveConf.getVar(conf, HiveConf.ConfVars.SCRATCHDIRPERMISSION); String sessionId = getSessionId(); // local & non-local tmp location is configurable. however it is the same across // all external file systems hdfsSessionPath = - new Path(HiveConf.getVar(conf, HiveConf.ConfVars.SCRATCHDIR), - sessionId); + new Path(HiveConf.getVar(conf, HiveConf.ConfVars.SCRATCHDIR), + sessionId); createPath(conf, hdfsSessionPath, scratchDirPermission); conf.set(HDFS_SESSION_PATH_KEY, hdfsSessionPath.toUri().toString()); localSessionPath = new Path(HiveConf.getVar(conf, HiveConf.ConfVars.LOCALSCRATCHDIR), - sessionId); + sessionId); createPath(conf, localSessionPath, scratchDirPermission); conf.set(LOCAL_SESSION_PATH_KEY, localSessionPath.toUri().toString()); hdfsTmpTableSpace = new Path(hdfsSessionPath, TMP_PREFIX); @@ -464,14 +470,55 @@ private void createSessionPaths(Configuration conf) throws IOException { conf.set(TMP_TABLE_SPACE_KEY, hdfsTmpTableSpace.toUri().toString()); } + /** + * Create dirs for this session: + * 1. HDFS scratch dir + * 2. Local scratch dir + * 3. 
Local downloaded resource dir + * @param conf + * @throws IOException + */ + private void createSessionScratchDirs() throws IOException { + HiveConf conf = getConf(); + String scratchDirPermission = HiveConf.getVar(conf, HiveConf.ConfVars.SCRATCHDIRPERMISSION); + // HDFS scratch dir + String hdfsScratchDir = HiveConf.getVar(conf, HiveConf.ConfVars.SCRATCHDIR_PREFIX) + getUserName(); + conf.setVar(HiveConf.ConfVars.SCRATCHDIR, hdfsScratchDir); + setupScratchDir(hdfsScratchDir, scratchDirPermission, false, conf); + // Local scratch dir + String localScratchDir = HiveConf.getVar(conf, HiveConf.ConfVars.LOCALSCRATCHDIR); + setupScratchDir(localScratchDir, scratchDirPermission, true, conf); + // Download resources dir + String downloadResourceDir = HiveConf.getVar(conf, HiveConf.ConfVars.DOWNLOADED_RESOURCES_DIR); + setupScratchDir(downloadResourceDir, scratchDirPermission, true, conf); + } + + // Create the give Path if doesn't exists and make it writable + private void setupScratchDir(String dirPath, String scratchDirPermission, boolean isLocal, + HiveConf conf) throws IOException { + Path scratchDirPath = new Path(dirPath); + FsPermission fsPermission = new FsPermission(scratchDirPermission); + FileSystem fs; + if (isLocal) { + fs = FileSystem.getLocal(conf); + } else { + fs = scratchDirPath.getFileSystem(conf); + } + if (!fs.exists(scratchDirPath)) { + fs.mkdirs(scratchDirPath, fsPermission); + String dirType = isLocal ? "local" : "HDFS"; + LOG.info("Created " + dirType + " directory: " + scratchDirPath.toString()); + } + } + private void createPath(Configuration conf, Path p, String perm) throws IOException { FileSystem fs = p.getFileSystem(conf); p = new Path(fs.makeQualified(p).toString()); - FsPermission fsPermission = new FsPermission(Short.parseShort(perm.trim(), 8)); + FsPermission fsPermission = new FsPermission(perm); - if (!Utilities.createDirsWithPermission(conf, p, fsPermission)) { + if (!fs.mkdirs(p, fsPermission)) { throw new IOException("Cannot create directory: " - + p.toString()); + + p.toString()); } // best effort to clean up if we don't shut down properly @@ -1113,6 +1160,10 @@ public String getUserName() { return userName; } + public void setUserName(String userName) { + this.userName = userName; + } + /** * If authorization mode is v2, then pass it through authorizer so that it can apply * any security configuration changes. 
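Taken together, the SessionState hunks above move scratch-directory setup from service startup into session startup: start() resolves the session user from the UGI, createSessionScratchDirs() points hive.exec.scratchdir at the per-user path and creates the HDFS scratch dir, local scratch dir and downloaded-resources dir, and createSessionPaths() then creates the per-session subdirectories. A condensed sketch of the resulting order of operations inside SessionState.start, using the names from the hunks above:

    UserGroupInformation sessionUGI = ShimLoader.getHadoopShims().getUGIForConf(startSs.conf);
    startSs.setUserName(sessionUGI.getShortUserName());  // e.g. "alice"
    FileSystem.get(startSs.conf);
    startSs.createSessionScratchDirs();  // /tmp/hive-alice, local scratch dir, resources dir
    startSs.createSessionPaths();        // session-id subdirs under the scratch dirs
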
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/exec/TestUtilities.java b/ql/src/test/org/apache/hadoop/hive/ql/exec/TestUtilities.java index bf3fd88..390a4a6 100644 --- a/ql/src/test/org/apache/hadoop/hive/ql/exec/TestUtilities.java +++ b/ql/src/test/org/apache/hadoop/hive/ql/exec/TestUtilities.java @@ -20,15 +20,12 @@ import static org.apache.hadoop.hive.ql.exec.Utilities.getFileExtension; -import java.io.IOException; import java.sql.Timestamp; import java.util.ArrayList; import java.util.List; import junit.framework.TestCase; -import org.apache.hadoop.fs.Path; -import org.apache.hadoop.fs.permission.FsPermission; import org.apache.hadoop.hive.conf.HiveConf; import org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat; import org.apache.hadoop.hive.ql.metadata.HiveException; @@ -39,7 +36,6 @@ import org.apache.hadoop.hive.ql.udf.generic.GenericUDFFromUtcTimestamp; import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoFactory; import org.apache.hadoop.mapred.JobConf; -import org.junit.Test; public class TestUtilities extends TestCase { @@ -77,9 +73,9 @@ public void testSerializeTimestamp() { List children = new ArrayList(1); children.add(constant); ExprNodeGenericFuncDesc desc = new ExprNodeGenericFuncDesc(TypeInfoFactory.timestampTypeInfo, - new GenericUDFFromUtcTimestamp(), children); + new GenericUDFFromUtcTimestamp(), children); assertEquals(desc.getExprString(), Utilities.deserializeExpression( - Utilities.serializeExpression(desc)).getExprString()); + Utilities.serializeExpression(desc)).getExprString()); } public void testgetDbTableName() throws HiveException{ @@ -109,24 +105,4 @@ public void testgetDbTableName() throws HiveException{ assertEquals("Invalid table name " + tablename, ex.getMessage()); } } - - @Test - public void testFSUmaskReset() throws Exception { - // ensure that FS Umask is not reset (HIVE-7001) - checkFSUMaskReset(true); - checkFSUMaskReset(false); - } - - private void checkFSUMaskReset(boolean recursiveArg) throws IllegalArgumentException, IOException { - final String FS_MASK_PARAM = "fs.permissions.umask-mode"; - final String FS_MASK_VAL = "055"; - HiveConf conf = new HiveConf(); - String dir = System.getProperty("test.tmp.dir") + "/testUtilitiesUMaskReset"; - conf.set(FS_MASK_PARAM, FS_MASK_VAL); - Utilities.createDirsWithPermission(conf, new Path(dir), new FsPermission((short) 00777), - recursiveArg); - assertEquals(conf.get(FS_MASK_PARAM), FS_MASK_VAL); - } - - } diff --git a/service/src/java/org/apache/hive/service/cli/CLIService.java b/service/src/java/org/apache/hive/service/cli/CLIService.java index 80d7b82..b7a3034 100644 --- a/service/src/java/org/apache/hive/service/cli/CLIService.java +++ b/service/src/java/org/apache/hive/service/cli/CLIService.java @@ -30,12 +30,8 @@ import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; -import org.apache.hadoop.fs.FileSystem; -import org.apache.hadoop.fs.Path; -import org.apache.hadoop.fs.permission.FsPermission; import org.apache.hadoop.hive.conf.HiveConf; import org.apache.hadoop.hive.conf.HiveConf.ConfVars; -import org.apache.hadoop.hive.conf.SystemVariables; import org.apache.hadoop.hive.metastore.HiveMetaStoreClient; import org.apache.hadoop.hive.metastore.IMetaStoreClient; import org.apache.hadoop.hive.ql.metadata.Hive; @@ -46,7 +42,6 @@ import org.apache.hive.service.ServiceException; import org.apache.hive.service.auth.HiveAuthFactory; import org.apache.hive.service.cli.operation.Operation; -import org.apache.hive.service.cli.session.HiveSession; import 
org.apache.hive.service.cli.session.SessionManager; import org.apache.hive.service.cli.thrift.TProtocolVersion; @@ -123,16 +118,6 @@ public UserGroupInformation getHttpUGI() { @Override public synchronized void start() { super.start(); - - try { - // make sure that the base scratch directories exists and writable - setupStagingDir(hiveConf.getVar(HiveConf.ConfVars.SCRATCHDIR), false); - setupStagingDir(hiveConf.getVar(HiveConf.ConfVars.LOCALSCRATCHDIR), true); - setupStagingDir(hiveConf.getVar(HiveConf.ConfVars.DOWNLOADED_RESOURCES_DIR), true); - } catch (IOException eIO) { - throw new ServiceException("Error setting stage directories", eIO); - } - try { // Initialize and test a connection to the metastore metastoreClient = new HiveMetaStoreClient(hiveConf); @@ -467,25 +452,6 @@ public synchronized String getDelegationTokenFromMetaStore(String owner) } } - // create the give Path if doesn't exists and make it writable - private void setupStagingDir(String dirPath, boolean isLocal) throws IOException { - Path scratchDir = getStaticPath(new Path(dirPath)); - if (scratchDir == null) { - return; - } - FileSystem fs; - if (isLocal) { - fs = FileSystem.getLocal(hiveConf); - } else { - fs = scratchDir.getFileSystem(hiveConf); - } - if (!fs.exists(scratchDir)) { - fs.mkdirs(scratchDir); - } - FsPermission fsPermission = new FsPermission((short)0777); - fs.setPermission(scratchDir, fsPermission); - } - @Override public String getDelegationToken(SessionHandle sessionHandle, HiveAuthFactory authFactory, String owner, String renewer) throws HiveSQLException { @@ -509,16 +475,4 @@ public void renewDelegationToken(SessionHandle sessionHandle, HiveAuthFactory au sessionManager.getSession(sessionHandle).renewDelegationToken(authFactory, tokenStr); LOG.info(sessionHandle + ": renewDelegationToken()"); } - - // DOWNLOADED_RESOURCES_DIR for example, which is by default ${system:java.io.tmpdir}/${hive.session.id}_resources, - // {system:java.io.tmpdir} would be already evaluated but ${hive.session.id} would be not in here. - // for that case, this returns evaluatd parts only, in this case, "/tmp" - // what for ${hive.session.id}_resources/${system:java.io.tmpdir}? just don't do that. - private Path getStaticPath(Path path) { - Path current = path; - for (; current != null && SystemVariables.containsVar(current.getName()); - current = current.getParent()) { - } - return current; - } } diff --git a/service/src/java/org/apache/hive/service/cli/session/HiveSessionBase.java b/service/src/java/org/apache/hive/service/cli/session/HiveSessionBase.java index 4c3164e..4a4b9a2 100644 --- a/service/src/java/org/apache/hive/service/cli/session/HiveSessionBase.java +++ b/service/src/java/org/apache/hive/service/cli/session/HiveSessionBase.java @@ -18,14 +18,14 @@ package org.apache.hive.service.cli.session; +import java.util.Map; + import org.apache.hadoop.hive.conf.HiveConf; import org.apache.hadoop.hive.ql.session.SessionState; import org.apache.hive.service.cli.SessionHandle; import org.apache.hive.service.cli.operation.OperationManager; import org.apache.hive.service.cli.thrift.TProtocolVersion; -import java.util.Map; - /** * Methods that don't need to be executed under a doAs * context are here. 
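Because the session now creates its own scratch directories, CLIService.start no longer pre-creates them as the HiveServer2 service user, and the getStaticPath helper for partially resolved paths is no longer needed. With doAs enabled the directories are created from inside the session, so they belong to the connecting user rather than to the server. A hypothetical way to observe that effect after a user has opened a session (path, user name and expected output are illustrative only):

    // Hypothetical: after "alice" connects with hive.server2.enable.doAs=true,
    // her scratch dir should exist and be owned by her, not by the HiveServer2 user.
    FileSystem fs = FileSystem.get(hiveConf);
    FileStatus stat = fs.getFileStatus(new Path("/tmp/hive-alice"));
    System.out.println(stat.getOwner() + " " + stat.getPermission());  // expected: alice rwx------
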
Rest of them in HiveSession interface diff --git a/service/src/java/org/apache/hive/service/cli/session/HiveSessionImpl.java b/service/src/java/org/apache/hive/service/cli/session/HiveSessionImpl.java index b39d64d..5b378e4 100644 --- a/service/src/java/org/apache/hive/service/cli/session/HiveSessionImpl.java +++ b/service/src/java/org/apache/hive/service/cli/session/HiveSessionImpl.java @@ -73,7 +73,7 @@ private String username; private final String password; - private final HiveConf hiveConf; + private HiveConf hiveConf; private final SessionState sessionState; private String ipAddress; @@ -206,6 +206,12 @@ public void setOperationManager(OperationManager operationManager) { } @Override + /** + * Opens a new HiveServer2 session for the client connection. + * Note that if doAs is true, this call goes through a proxy object, + * which wraps the method logic in a UserGroupInformation#doAs. + * That is why it is important to call SessionState#start here rather than the constructor. + */ public void open() { SessionState.start(sessionState); } diff --git a/service/src/java/org/apache/hive/service/cli/session/SessionManager.java b/service/src/java/org/apache/hive/service/cli/session/SessionManager.java index c2f0495..0f90bd9 100644 --- a/service/src/java/org/apache/hive/service/cli/session/SessionManager.java +++ b/service/src/java/org/apache/hive/service/cli/session/SessionManager.java @@ -34,7 +34,6 @@ import org.apache.hadoop.hive.ql.metadata.HiveException; import org.apache.hadoop.hive.ql.session.SessionState; import org.apache.hive.service.CompositeService; -import org.apache.hive.service.auth.TSetIpAddressProcessor; import org.apache.hive.service.cli.HiveSQLException; import org.apache.hive.service.cli.SessionHandle; import org.apache.hive.service.cli.operation.OperationManager; @@ -121,11 +120,13 @@ public SessionHandle openSession(TProtocolVersion protocol, String username, Str Map sessionConf, boolean withImpersonation, String delegationToken) throws HiveSQLException { HiveSession session; + // If doAs is set to true for HiveServer2, we will create a proxy object for the session impl. + // Within the proxy object, we wrap the method call in a UserGroupInformation#doAs if (withImpersonation) { - HiveSessionImplwithUGI hiveSessionUgi = new HiveSessionImplwithUGI(protocol, username, password, - hiveConf, ipAddress, delegationToken); - session = HiveSessionProxy.getProxy(hiveSessionUgi, hiveSessionUgi.getSessionUgi()); - hiveSessionUgi.setProxySession(session); + HiveSessionImplwithUGI sessionWithUGI = new HiveSessionImplwithUGI(protocol, username, password, + hiveConf, ipAddress, delegationToken); + session = HiveSessionProxy.getProxy(sessionWithUGI, sessionWithUGI.getSessionUgi()); + sessionWithUGI.setProxySession(session); } else { session = new HiveSessionImpl(protocol, username, password, hiveConf, ipAddress); }