Index: src/main/java/org/apache/hama/bsp/BSPJobClient.java
===================================================================
--- src/main/java/org/apache/hama/bsp/BSPJobClient.java (revision 1180743)
+++ src/main/java/org/apache/hama/bsp/BSPJobClient.java (working copy)
@@ -21,8 +21,7 @@
 import java.io.IOException;
 import java.util.Map;
 import java.util.Random;
-
-import javax.security.auth.login.LoginException;
+import java.util.StringTokenizer;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -34,7 +33,7 @@
 import org.apache.hadoop.fs.permission.FsPermission;
 import org.apache.hadoop.ipc.RPC;
 import org.apache.hadoop.net.NetUtils;
-import org.apache.hadoop.security.UnixUserGroupInformation;
+import org.apache.hadoop.util.Shell;
 import org.apache.hadoop.util.Tool;
 import org.apache.hadoop.util.ToolRunner;
 import org.apache.hama.HamaConfiguration;
@@ -200,8 +199,8 @@
     if (masterAdress != null && !masterAdress.equals("local")) {
       this.jobSubmitClient = (JobSubmissionProtocol) RPC.getProxy(
           JobSubmissionProtocol.class, JobSubmissionProtocol.versionID,
-          BSPMaster.getAddress(conf), conf,
-          NetUtils.getSocketFactory(conf, JobSubmissionProtocol.class));
+          BSPMaster.getAddress(conf), conf, NetUtils.getSocketFactory(conf,
+              JobSubmissionProtocol.class));
     } else {
       LOG.debug("Using local BSP runner.");
       this.jobSubmitClient = new LocalBSPRunner(conf);
@@ -249,18 +248,6 @@
     return jobSubmitClient.jobsToComplete();
   }
 
-  private UnixUserGroupInformation getUGI(Configuration conf)
-      throws IOException {
-    UnixUserGroupInformation ugi = null;
-    try {
-      ugi = UnixUserGroupInformation.login(conf, true);
-    } catch (LoginException e) {
-      throw (IOException) (new IOException(
-          "Failed to get the current user's information.").initCause(e));
-    }
-    return ugi;
-  }
-
   /**
    * Submit a job to the BSP system. This returns a handle to the
    * {@link RunningJob} which can be used to track the running-job.
@@ -287,12 +274,6 @@
     Path submitJobFile = new Path(submitJobDir, "job.xml");
     LOG.debug("BSPJobClient.submitJobDir: " + submitJobDir);
 
-    /*
-     * set this user's id in job configuration, so later job files can be
-     * accessed using this user's id
-     */
-    UnixUserGroupInformation ugi = getUGI(job.getConf());
-
     ClusterStatus clusterStatus = getClusterStatus(true);
 
     // check the number of BSP tasks
@@ -300,8 +281,9 @@
     int maxTasks = clusterStatus.getMaxTasks();
 
     if (tasks <= 0 || tasks > maxTasks) {
-      LOG.warn("The number of tasks you've entered was invalid. Using default value of "
-          + maxTasks + "!");
+      LOG
+          .warn("The number of tasks you've entered was invalid. Using default value of "
+              + maxTasks + "!");
       job.setNumBspTask(maxTasks);
     }
 
@@ -333,10 +315,11 @@
     }
 
     // Set the user's name and working directory
-    job.setUser(ugi.getUserName());
-    if (ugi.getGroupNames().length > 0) {
-      job.set("group.name", ugi.getGroupNames()[0]);
+    job.setUser(getUnixUserName());
+    String[] unixGroups = getUnixGroups();
+    if (unixGroups.length > 0) {
+      job.set("group.name", unixGroups[0]);
     }
     if (job.getWorkingDirectory() == null) {
       job.setWorkingDirectory(fs.getWorkingDirectory());
     }
@@ -354,8 +337,8 @@
     //
     // Now, actually submit the job (using the submit name)
     //
-    JobStatus status = jobSubmitClient.submitJob(jobId,
-        submitJobFile.makeQualified(fs).toString());
+    JobStatus status = jobSubmitClient.submitJob(jobId, submitJobFile
+        .makeQualified(fs).toString());
     if (status != null) {
       return new NetworkedJob(status);
     } else {
@@ -583,9 +566,8 @@
         System.out.println("Job name: " + job.getJobName());
         System.out.printf("States are:\n\tRunning : 1\tSucceded : 2"
             + "\tFailed : 3\tPrep : 4\n");
-        System.out.printf("%s\t%d\t%d\t%s\n", jobStatus.getJobID(),
-            jobStatus.getRunState(), jobStatus.getStartTime(),
-            jobStatus.getUsername());
+        System.out.printf("%s\t%d\t%d\t%s\n", jobStatus.getJobID(), jobStatus
+            .getRunState(), jobStatus.getStartTime(), jobStatus.getUsername());
 
         exitCode = 0;
       }
@@ -680,4 +662,60 @@
     int res = ToolRunner.run(new BSPJobClient(), args);
     System.exit(res);
   }
+
+  /**
+   * Get the current user's group list from Unix by running the
+   * {@code groups} command.
+   *
+   * @return the groups list that the current user belongs to
+   * @throws IOException if an error occurs while running the command
+   */
+  private static String[] getUnixGroups() throws IOException {
+    return executeShellCommand(Shell.getGROUPS_COMMAND());
+  }
+
+  /**
+   * Get the current user's name from Unix by running the whoami command.
+   *
+   * @return the current user's name
+   * @throws IOException if the command fails or prints other than one token
+   */
+  static String getUnixUserName() throws IOException {
+    String[] result = executeShellCommand(new String[] { Shell.USER_NAME_COMMAND });
+    if (result.length != 1) {
+      throw new IOException("Expect one token as the result of "
+          + Shell.USER_NAME_COMMAND + ": " + toString(result));
+    }
+    return result[0];
+  }
+
+  /* Run a command and return its whitespace-separated output tokens. */
+  private static String[] executeShellCommand(String[] command)
+      throws IOException {
+    String output = Shell.execCommand(command);
+    StringTokenizer tokenizer = new StringTokenizer(output);
+    int numOfTokens = tokenizer.countTokens();
+    String[] tokens = new String[numOfTokens];
+    for (int i = 0; tokenizer.hasMoreTokens(); i++) {
+      tokens[i] = tokenizer.nextToken();
+    }
+
+    return tokens;
+  }
+
+  /*
+   * Return a string representation of a string array. Two strings are separated
+   * by a blank. A null or empty array yields the empty string.
+   */
+  private static String toString(String[] strArray) {
+    if (strArray == null || strArray.length == 0) {
+      return "";
+    }
+    StringBuilder buf = new StringBuilder(strArray[0]);
+    for (int i = 1; i < strArray.length; i++) {
+      buf.append(' ');
+      buf.append(strArray[i]);
+    }
+    return buf.toString();
+  }
 }
Index: src/main/java/org/apache/hama/bsp/GroomServer.java
===================================================================
--- src/main/java/org/apache/hama/bsp/GroomServer.java (revision 1180743)
+++ src/main/java/org/apache/hama/bsp/GroomServer.java (working copy)
@@ -39,7 +39,6 @@
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.filecache.DistributedCache;
 import org.apache.hadoop.fs.FSError;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
@@ -326,8 +325,6 @@
     this.groomServerName = "groomd_" + this.rpcServer.replace(':', '_');
     LOG.info("Starting groom: " + this.rpcServer);
 
-    DistributedCache.purgeCache(this.conf);
-
     // establish the communication link to bsp master
     this.masterClient = (MasterProtocol) RPC.waitForProxy(MasterProtocol.class,
         MasterProtocol.versionID, bspMasterAddr, conf);