Index: src/test/core/org/apache/hadoop/fs/FileContextBaseTest.java
===================================================================
--- src/test/core/org/apache/hadoop/fs/FileContextBaseTest.java	(revision 0)
+++ src/test/core/org/apache/hadoop/fs/FileContextBaseTest.java	(revision 0)
@@ -0,0 +1,489 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs;
+
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.util.EnumSet;
+
+import junit.framework.TestCase;
+
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.FileContext.CreatOpts;
+import org.apache.hadoop.fs.permission.FsPermission;
+
+/**
+ * <p>
+ * A collection of tests for the {@link FileContext}.
+ * This test should be used for testing an instance of FileContext
+ *  that has been initialized to a specific default FileSystem such a
+ *  LocalFileSystem, HDFS,S3, etc.
+ * </p>
+ * <p>
+ * To test a given {@link FileSystem} implementation create a subclass of this
+ * test and override {@link #setUp()} to initialize the <code>fc</code> 
+ * {@link FileContext} instance variable.
+ * </p>
+ */
+public abstract class FileContextBaseTest extends TestCase {
+  
+  protected FileContext fc;
+  private byte[] data = new byte[getBlockSize() * 2]; // two blocks of data
+  {
+    for (int i = 0; i < data.length; i++) {
+      data[i] = (byte) (i % 10);
+    }
+  }
+  
+  @Override
+  protected void tearDown() throws Exception {
+    fc.delete(path("/test"), true);
+  }
+  
+  protected int getBlockSize() {
+    return 1024;
+  }
+  
+  protected Path getDefaultWorkingDirectory() throws IOException {
+    return path("/user/" + System.getProperty("user.name")).makeQualified(
+              fc.getDefaultFileSystem().getUri(), fc.getWorkingDirectory());
+  }
+
+  protected boolean renameSupported() {
+    return true;
+  }
+
+
+  public void testFsStatus() throws Exception {
+    FsStatus fsStatus = fc.getFSStatus();
+    assertNotNull(fsStatus);
+    //used, free and capacity are non-negative longs
+    assertTrue(fsStatus.getUsed() >= 0);
+    assertTrue(fsStatus.getRemaining() >= 0);
+    assertTrue(fsStatus.getCapacity() >= 0);
+  }
+  
+  public void testWorkingDirectory() throws Exception {
+
+    Path workDir = getDefaultWorkingDirectory();
+    assertEquals(workDir, fc.getWorkingDirectory());
+
+    fc.setWorkingDirectory(path("."));
+    assertEquals(workDir, fc.getWorkingDirectory());
+
+    fc.setWorkingDirectory(path(".."));
+    assertEquals(workDir.getParent(), fc.getWorkingDirectory());
+
+    Path relativeDir = path("hadoop");
+    fc.setWorkingDirectory(relativeDir);
+    assertEquals(relativeDir, fc.getWorkingDirectory());
+    
+    Path absoluteDir = path("/test/hadoop");
+    fc.setWorkingDirectory(absoluteDir);
+    assertEquals(absoluteDir, fc.getWorkingDirectory());
+
+  }
+  
+  public void testMkdirs() throws Exception {
+    Path testDir = path("/test/hadoop");
+    assertFalse(fc.exists(testDir));
+    assertFalse(fc.isFile(testDir));
+
+    assertTrue(fc.mkdirs(testDir, FsPermission.getDefault()));
+
+    assertTrue(fc.exists(testDir));
+    assertFalse(fc.isFile(testDir));
+    
+    assertTrue(fc.mkdirs(testDir, FsPermission.getDefault()));
+
+    assertTrue(fc.exists(testDir));
+    assertFalse(fc.isFile(testDir));
+
+    Path parentDir = testDir.getParent();
+    assertTrue(fc.exists(parentDir));
+    assertFalse(fc.isFile(parentDir));
+
+    Path grandparentDir = parentDir.getParent();
+    assertTrue(fc.exists(grandparentDir));
+    assertFalse(fc.isFile(grandparentDir));
+    
+  }
+  
+  public void testMkdirsFailsForSubdirectoryOfExistingFile() throws Exception {
+    Path testDir = path("/test/hadoop");
+    assertFalse(fc.exists(testDir));
+    assertTrue(fc.mkdirs(testDir, FsPermission.getDefault()));
+    assertTrue(fc.exists(testDir));
+    
+    createFile(path("/test/hadoop/file"));
+    
+    Path testSubDir = path("/test/hadoop/file/subdir");
+    try {
+      fc.mkdirs(testSubDir, FsPermission.getDefault());
+      fail("Should throw IOException.");
+    } catch (IOException e) {
+      // expected
+    }
+    assertFalse(fc.exists(testSubDir));
+    
+    Path testDeepSubDir = path("/test/hadoop/file/deep/sub/dir");
+    try {
+      fc.mkdirs(testDeepSubDir, FsPermission.getDefault());
+      fail("Should throw IOException.");
+    } catch (IOException e) {
+      // expected
+    }
+    assertFalse(fc.exists(testDeepSubDir));
+    
+  }
+  
+  public void testGetFileStatusThrowsExceptionForNonExistentFile() 
+    throws Exception {
+    try {
+      fc.getFileStatus(path("/test/hadoop/file"));
+      fail("Should throw FileNotFoundException");
+    } catch (FileNotFoundException e) {
+      // expected
+    }
+  } 
+  
+  public void testListStatusThrowsExceptionForNonExistentFile() throws Exception {
+    try {
+      fc.listStatus(path("/test/hadoop/file"));
+      fail("Should throw FileNotFoundException");
+    } catch (FileNotFoundException fnfe) {
+      // expected
+    }
+  }
+  
+  public void testListStatus() throws Exception {
+    Path[] testDirs = { path("/test/hadoop/a"),
+                        path("/test/hadoop/b"),
+                        path("/test/hadoop/c/1"), };
+    assertFalse(fc.exists(testDirs[0]));
+
+    for (Path path : testDirs) {
+      assertTrue(fc.mkdirs(path, FsPermission.getDefault()));
+    }
+
+    FileStatus[] paths = fc.listStatus(path("/test"));
+    assertEquals(1, paths.length);
+    assertEquals(path("/test/hadoop"), paths[0].getPath());
+
+    paths = fc.listStatus(path("/test/hadoop"));
+    assertEquals(3, paths.length);
+    assertEquals(path("/test/hadoop/a"), paths[0].getPath());
+    assertEquals(path("/test/hadoop/b"), paths[1].getPath());
+    assertEquals(path("/test/hadoop/c"), paths[2].getPath());
+
+    paths = fc.listStatus(path("/test/hadoop/a"));
+    assertEquals(0, paths.length);
+  }
+  
+  public void testWriteReadAndDeleteEmptyFile() throws Exception {
+    writeReadAndDelete(0);
+  }
+
+  public void testWriteReadAndDeleteHalfABlock() throws Exception {
+    writeReadAndDelete(getBlockSize() / 2);
+  }
+
+  public void testWriteReadAndDeleteOneBlock() throws Exception {
+    writeReadAndDelete(getBlockSize());
+  }
+  
+  public void testWriteReadAndDeleteOneAndAHalfBlocks() throws Exception {
+    writeReadAndDelete(getBlockSize() + (getBlockSize() / 2));
+  }
+  
+  public void testWriteReadAndDeleteTwoBlocks() throws Exception {
+    writeReadAndDelete(getBlockSize() * 2);
+  }
+  
+  private void writeReadAndDelete(int len) throws IOException {
+    Path path = path("/test/hadoop/file");
+    
+    fc.mkdirs(path.getParent(), FsPermission.getDefault());
+
+    FSDataOutputStream out = fc.create(path, FsPermission.getDefault(),
+        EnumSet.of(CreateFlag.CREATE), 
+        CreatOpts.repFac((short) 1), CreatOpts.blockSize(getBlockSize()));
+    out.write(data, 0, len);
+    out.close();
+
+    assertTrue("Exists", fc.exists(path));
+    assertEquals("Length", len, fc.getFileStatus(path).getLen());
+
+    FSDataInputStream in = fc.open(path);
+    byte[] buf = new byte[len];
+    in.readFully(0, buf);
+    in.close();
+
+    assertEquals(len, buf.length);
+    for (int i = 0; i < buf.length; i++) {
+      assertEquals("Position " + i, data[i], buf[i]);
+    }
+    
+    assertTrue("Deleted", fc.delete(path, false));
+    
+    assertFalse("No longer exists", fc.exists(path));
+
+  }
+  
+  public void testOverwrite() throws IOException {
+    Path path = path("/test/hadoop/file");
+    
+    fc.mkdirs(path.getParent(), FsPermission.getDefault());
+
+    createFile(path);
+    
+    assertTrue("Exists", fc.exists(path));
+    assertEquals("Length", data.length, fc.getFileStatus(path).getLen());
+    
+    try {
+      fc.create(path, FsPermission.getDefault(), EnumSet.of(CreateFlag.CREATE));
+      fail("Should throw IOException.");
+    } catch (IOException e) {
+      // Expected
+    }
+    
+    FSDataOutputStream out = fc.create(path, FsPermission.getDefault(),
+        EnumSet.of(CreateFlag.OVERWRITE));
+    out.write(data, 0, data.length);
+    out.close();
+    
+    assertTrue("Exists", fc.exists(path));
+    assertEquals("Length", data.length, fc.getFileStatus(path).getLen());
+    
+  }
+  
+  public void testWriteInNonExistentDirectory() throws IOException {
+    Path path = path("/test/hadoop/file");
+    assertFalse("Parent doesn't exist", fc.exists(path.getParent()));
+    createFile(path);
+    
+    assertTrue("Exists", fc.exists(path));
+    assertEquals("Length", data.length, fc.getFileStatus(path).getLen());
+    assertTrue("Parent exists", fc.exists(path.getParent()));
+  }
+
+  public void testDeleteNonExistentFile() throws IOException {
+    Path path = path("/test/hadoop/file");    
+    assertFalse("Doesn't exist", fc.exists(path));
+    assertFalse("No deletion", fc.delete(path, true));
+  }
+  
+  public void testDeleteRecursively() throws IOException {
+    Path dir = path("/test/hadoop");
+    Path file = path("/test/hadoop/file");
+    Path subdir = path("/test/hadoop/subdir");
+    
+    createFile(file);
+    assertTrue("Created subdir", fc.mkdirs(subdir, FsPermission.getDefault()));
+    
+    assertTrue("File exists", fc.exists(file));
+    assertTrue("Dir exists", fc.exists(dir));
+    assertTrue("Subdir exists", fc.exists(subdir));
+    
+    try {
+      fc.delete(dir, false);
+      fail("Should throw IOException.");
+    } catch (IOException e) {
+      // expected
+    }
+    assertTrue("File still exists", fc.exists(file));
+    assertTrue("Dir still exists", fc.exists(dir));
+    assertTrue("Subdir still exists", fc.exists(subdir));
+    
+    assertTrue("Deleted", fc.delete(dir, true));
+    assertFalse("File doesn't exist", fc.exists(file));
+    assertFalse("Dir doesn't exist", fc.exists(dir));
+    assertFalse("Subdir doesn't exist", fc.exists(subdir));
+  }
+  
+  public void testDeleteEmptyDirectory() throws IOException {
+    Path dir = path("/test/hadoop");
+    assertTrue(fc.mkdirs(dir, FsPermission.getDefault()));
+    assertTrue("Dir exists", fc.exists(dir));
+    assertTrue("Deleted", fc.delete(dir, false));
+    assertFalse("Dir doesn't exist", fc.exists(dir));
+  }
+  
+  public void testRenameNonExistentPath() throws Exception {
+    if (!renameSupported()) return;
+    Path src = path("/test/hadoop/NonExistingPath");
+    Path dst = path("/test/new/newpath");
+    try {
+      boolean res = fc.rename(src, dst);
+      System.out.println("Rename of non-existing file returned " + res);
+      assertTrue("rename of non existing path should have failed", false);
+    } catch (Exception e) {
+      // expected
+    }
+  }
+
+  public void testRenameFileMoveToNonExistentDirectory() throws Exception {
+    if (!renameSupported()) return;
+    
+    Path src = path("/test/hadoop/file");
+    createFile(src);
+    Path dst = path("/test/NonExisting/foo");
+    rename(src, dst, true, true, false);
+  }
+
+  public void testRenameFileMoveToExistingDirectory() throws Exception {
+    if (!renameSupported()) return;
+    
+    Path src = path("/test/hadoop/file");
+    createFile(src);
+    Path dst = path("/test/new/newfile");
+    fc.mkdirs(dst.getParent(), FsPermission.getDefault());
+    rename(src, dst, true, false, true);
+  }
+
+  public void testRenameFileAsExistingFile() throws Exception {
+    if (!renameSupported()) return;
+    
+    Path src = path("/test/hadoop/file");
+    createFile(src);
+    Path dst = path("/test/new/newfile");
+    createFile(dst);
+    rename(src, dst, false, true, true);
+  }
+
+  public void testRenameFileAsExistingDirectory() throws Exception {
+    if (!renameSupported()) return;
+    
+    Path src = path("/test/hadoop/file");
+    createFile(src);
+    Path dst = path("/test/new/newdir");
+    fc.mkdirs(dst, FsPermission.getDefault());
+    rename(src, dst, true, false, true);
+    assertTrue("Destination changed",
+        fc.exists(path("/test/new/newdir/file")));
+  }
+  
+  public void testRenameDirectoryMoveToNonExistentDirectory() 
+    throws Exception {
+    if (!renameSupported()) return;
+    
+    Path src = path("/test/hadoop/dir");
+    fc.mkdirs(src, FsPermission.getDefault());
+    Path dst = path("/test/new/newdir");
+    rename(src, dst, true, true, false);
+  }
+  
+  public void testRenameDirectoryMoveToExistingDirectory() throws Exception {
+    if (!renameSupported()) return;
+    
+    Path src = path("/test/hadoop/dir");
+    fc.mkdirs(src, FsPermission.getDefault());
+    createFile(path("/test/hadoop/dir/file1"));
+    createFile(path("/test/hadoop/dir/subdir/file2"));
+    
+    Path dst = path("/test/new/newdir");
+    fc.mkdirs(dst.getParent(), FsPermission.getDefault());
+    rename(src, dst, true, false, true);
+    
+    assertFalse("Nested file1 exists",
+        fc.exists(path("/test/hadoop/dir/file1")));
+    assertFalse("Nested file2 exists",
+        fc.exists(path("/test/hadoop/dir/subdir/file2")));
+    assertTrue("Renamed nested file1 exists",
+        fc.exists(path("/test/new/newdir/file1")));
+    assertTrue("Renamed nested exists",
+        fc.exists(path("/test/new/newdir/subdir/file2")));
+  }
+  
+  public void testRenameDirectoryAsExistingFile() throws Exception {
+    if (!renameSupported()) return;
+    
+    Path src = path("/test/hadoop/dir");
+    fc.mkdirs(src, FsPermission.getDefault());
+    Path dst = path("/test/new/newfile");
+    createFile(dst);
+    rename(src, dst, false, true, true);
+  }
+  
+  public void testRenameDirectoryAsExistingDirectory() throws Exception {
+    if (!renameSupported()) return;
+    
+    Path src = path("/test/hadoop/dir");
+    fc.mkdirs(src, FsPermission.getDefault());
+    createFile(path("/test/hadoop/dir/file1"));
+    createFile(path("/test/hadoop/dir/subdir/file2"));
+    
+    Path dst = path("/test/new/newdir");
+    fc.mkdirs(dst, FsPermission.getDefault());
+    rename(src, dst, true, false, true);
+    assertTrue("Destination changed",
+        fc.exists(path("/test/new/newdir/dir")));    
+    assertFalse("Nested file1 exists",
+        fc.exists(path("/test/hadoop/dir/file1")));
+    assertFalse("Nested file2 exists",
+        fc.exists(path("/test/hadoop/dir/subdir/file2")));
+    assertTrue("Renamed nested file1 exists",
+        fc.exists(path("/test/new/newdir/dir/file1")));
+    assertTrue("Renamed nested exists",
+        fc.exists(path("/test/new/newdir/dir/subdir/file2")));
+  }
+
+  public void testInputStreamClosedTwice() throws IOException {
+    //HADOOP-4760 according to Closeable#close() closing already-closed 
+    //streams should have no effect. 
+    Path src = path("/test/hadoop/file");
+    createFile(src);
+    FSDataInputStream in = fc.open(src);
+    in.close();
+    in.close();
+  }
+  
+  public void testOutputStreamClosedTwice() throws IOException {
+    //HADOOP-4760 according to Closeable#close() closing already-closed 
+    //streams should have no effect. 
+    Path src = path("/test/hadoop/file");
+    FSDataOutputStream out = fc.create(src, FsPermission.getDefault(),
+        EnumSet.of(CreateFlag.CREATE));
+    out.writeChar('H'); //write some data
+    out.close();
+    out.close();
+  }
+  
+  protected Path path(String pathString) {
+    return fc.makeQualified(new Path(pathString));
+  }
+  
+  protected void createFile(Path path) throws IOException {
+    FSDataOutputStream out = fc.create(path, FsPermission.getDefault(),
+        EnumSet.of(CreateFlag.CREATE));
+    out.write(data, 0, data.length);
+    out.close();
+  }
+  
+  private void rename(Path src, Path dst, boolean renameShouldSucceed,
+      boolean srcExists, boolean dstExists) throws IOException {
+    assertEquals("Rename result", renameShouldSucceed, fc.rename(src, dst));
+    assertEquals("Source exists", srcExists, fc.exists(src));
+    assertEquals("Destination exists", dstExists, fc.exists(dst));
+  }
+}

Property changes on: src/test/core/org/apache/hadoop/fs/FileContextBaseTest.java
___________________________________________________________________
Added: svn:mime-type
   + text/plain

Index: src/test/core/org/apache/hadoop/fs/TestLocalFilesContext.java
===================================================================
--- src/test/core/org/apache/hadoop/fs/TestLocalFilesContext.java	(revision 0)
+++ src/test/core/org/apache/hadoop/fs/TestLocalFilesContext.java	(revision 0)
@@ -0,0 +1,23 @@
+package org.apache.hadoop.fs;
+
+
+import java.io.IOException;
+
+import org.apache.hadoop.conf.Configuration;
+import org.junit.Before;
+
+public class TestLocalFilesContext extends FileContextBaseTest {
+
+  @Before
+  @Override
+  protected void setUp() throws Exception {
+    fc = FileContext.getLocalFSFileContext();
+  }
+  
+  static Path wd = null;
+  protected Path getDefaultWorkingDirectory() throws IOException {
+    if (wd == null)
+      wd = FileSystem.getLocal(new Configuration()).getWorkingDirectory();
+    return wd;
+  }
+}

Property changes on: src/test/core/org/apache/hadoop/fs/TestLocalFilesContext.java
___________________________________________________________________
Added: svn:mime-type
   + text/plain

Index: src/java/org/apache/hadoop/fs/kfs/KosmosFileSystem.java
===================================================================
--- src/java/org/apache/hadoop/fs/kfs/KosmosFileSystem.java	(revision 811216)
+++ src/java/org/apache/hadoop/fs/kfs/KosmosFileSystem.java	(working copy)
@@ -343,7 +343,7 @@
 
     @Override
     public void completeLocalOutput(Path fsOutputFile, Path tmpLocalFile)
-	throws IOException {
-	moveFromLocalFile(tmpLocalFile, fsOutputFile);
+                throws IOException {
+      moveFromLocalFile(tmpLocalFile, fsOutputFile);
     }
 }
Index: src/java/org/apache/hadoop/fs/s3/S3FileSystem.java
===================================================================
--- src/java/org/apache/hadoop/fs/s3/S3FileSystem.java	(revision 811216)
+++ src/java/org/apache/hadoop/fs/s3/S3FileSystem.java	(working copy)
@@ -233,8 +233,25 @@
                             blockSize, progress, bufferSize),
          statistics);
   }
+  
+  @Override
+  protected FSDataOutputStream createAbsPerm(Path f,
+      FsPermission absolutePermission, EnumSet<CreateFlag> flag,
+      int bufferSize, short replication, long blockSize, int bytesPerChecksum)
+      throws IOException {
+    // S3 does not have permissions - no diff between masked and unmasked perms
+    return this.create(f, absolutePermission, flag, bufferSize,
+        replication, blockSize, null);
+  }
 
   @Override
+  protected boolean mkdirsAbsPerm(Path f, FsPermission absolutePermission)
+      throws IOException {
+    // S3 does not have permissions - no diff between masked and unmasked perms
+    return this.mkdirs(f, absolutePermission);
+  }
+
+  @Override
   public FSDataInputStream open(Path path, int bufferSize) throws IOException {
     INode inode = checkFile(path);
     return new FSDataInputStream(new S3InputStream(getConf(), store, inode,
@@ -374,4 +391,6 @@
       return ret == null ? 0L : ret[0].getLength();
     }
   }
+
+
 }
Index: src/java/org/apache/hadoop/fs/FileSystem.java
===================================================================
--- src/java/org/apache/hadoop/fs/FileSystem.java	(revision 811216)
+++ src/java/org/apache/hadoop/fs/FileSystem.java	(working copy)
@@ -42,6 +42,11 @@
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.conf.Configured;
+import org.apache.hadoop.fs.FileContext.BlockSize;
+import org.apache.hadoop.fs.FileContext.BufferSize;
+import org.apache.hadoop.fs.FileContext.BytesPerChecksum;
+import org.apache.hadoop.fs.FileContext.CreatOpts;
+import org.apache.hadoop.fs.FileContext.ReplicationFac;
 import org.apache.hadoop.fs.permission.FsPermission;
 import org.apache.hadoop.io.MultipleIOException;
 import org.apache.hadoop.security.UserGroupInformation;
@@ -536,7 +541,7 @@
    * Opens an FSDataOutputStream at the indicated Path with write-progress
    * reporting.
    * @param f the file name to open.
-   * @param permission
+   * @param permission - will applied agains umask
    * @param flag determines the semantic of this create.
    * @param bufferSize the size of the buffer to be used.
    * @param replication required block replication for the file.
@@ -550,8 +555,45 @@
       EnumSet<CreateFlag> flag, int bufferSize, short replication, long blockSize,
       Progressable progress) throws IOException ;
   
+  /*
+   * This version of the create method assumes that the permission 
+   * of create does not matter
+   * It has been added to support the FileContext that processes the permission
+   * with umask before calling this method.
+   * This a temporary method added to support the transition from FileSystem
+   * to FileContext for user applications.
+   */
+  protected FSDataOutputStream createAbsPerm(Path f,
+     FsPermission absolutePermission, EnumSet<CreateFlag> flag, int bufferSize,
+     short replication, long blockSize, int bytesPerChecksum) throws IOException {
+    
+    // Default impl  assumes that permissions do not matter and hence
+    // calling the regular create is good enough.
+    // FSs that implement permissions should override this.
+    
+    return this.create(f, absolutePermission, flag, bufferSize, replication,
+        blockSize, null);
+    
+  }
+  
 
   /**
+   * This version of the mkdirs method assumes that the permission.
+   * It has been added to support the FileContext that processes the the permission
+   * with umask before calling this method.
+   * This a temporary method added to support the transition from FileSystem
+   * to FileContext for user applications.
+   */
+  protected boolean mkdirsAbsPerm(Path f, FsPermission absolutPermission
+      ) throws IOException {
+    // Default impl is to assume that permissions do not matter and hence
+    // calling the regular mkdirs is good enough.
+    // FSs that implement permissions should override this.
+   return this.mkdirs(f, absolutPermission);
+  }
+
+
+  /**
    * Creates the given Path as a brand-new zero-length file.  If
    * create fails, or if it already existed, return false.
    */
@@ -1141,6 +1183,23 @@
    * @return the directory pathname
    */
   public abstract Path getWorkingDirectory();
+  
+  
+  /**
+   * Note: with the new FilesContext class, getWorkingDirectory()
+   * will be removed. 
+   * The working directory is implemented in FilesContext.
+   * 
+   * Some file systems like LocalFileSystem have an initial workingDir
+   * that we use as the starting workingDir. For other file systems
+   * like HDFS there is no built in notion of an inital workingDir.
+   * 
+   * @return if there is built in notion of workingDir then it
+   * is returned; else a null is returned.
+   */
+  protected Path getInitialWorkingDirectory() {
+    return null;
+  }
 
   /**
    * Call {@link #mkdirs(Path, FsPermission)} with default permission.
Index: src/java/org/apache/hadoop/fs/FilterFileSystem.java
===================================================================
--- src/java/org/apache/hadoop/fs/FilterFileSystem.java	(revision 811216)
+++ src/java/org/apache/hadoop/fs/FilterFileSystem.java	(working copy)
@@ -167,7 +167,11 @@
   public Path getWorkingDirectory() {
     return fs.getWorkingDirectory();
   }
-
+  
+  protected Path getInitialWorkingDirectory() {
+    return fs.getInitialWorkingDirectory();
+  }
+  
   /** {@inheritDoc} */
   @Override
   public FsStatus getStatus(Path p) throws IOException {
@@ -276,4 +280,19 @@
       ) throws IOException {
     fs.setPermission(p, permission);
   }
+
+  @Override
+  protected FSDataOutputStream createAbsPerm(Path f,
+      FsPermission absolutePermission, EnumSet<CreateFlag> flag,
+      int bufferSize, short replication, long blockSize, int bytesPerChecksum)
+      throws IOException {
+    return fs.createAbsPerm(f, absolutePermission, flag,
+        bufferSize, replication, blockSize, bytesPerChecksum);
+  }
+
+  @Override
+  protected boolean mkdirsAbsPerm(Path f, FsPermission abdolutePermission)
+      throws IOException {
+    return fs.mkdirsAbsPerm(f, abdolutePermission);
+  }
 }
Index: src/java/org/apache/hadoop/fs/RawLocalFileSystem.java
===================================================================
--- src/java/org/apache/hadoop/fs/RawLocalFileSystem.java	(revision 811216)
+++ src/java/org/apache/hadoop/fs/RawLocalFileSystem.java	(working copy)
@@ -46,7 +46,7 @@
   private Path workingDir;
   
   public RawLocalFileSystem() {
-    workingDir = new Path(System.getProperty("user.dir")).makeQualified(this);
+    workingDir = getInitialWorkingDirectory();
   }
   
   /** Convert a path to a File. */
@@ -258,10 +258,31 @@
    
     FSDataOutputStream out = create(f,
         flag.contains(CreateFlag.OVERWRITE), bufferSize, replication, blockSize, progress);
-    setPermission(f, permission);
+    setPermission(f, permission.applyUMask(FsPermission.getUMask(getConf())));
     return out;
   }
   
+
+  @Override
+  protected FSDataOutputStream createAbsPerm(Path f,
+      FsPermission absolutePermission, EnumSet<CreateFlag> flag,
+      int bufferSize, short replication, long blockSize, int bytesPerChecksum)
+      throws IOException {
+    
+    if(flag.contains(CreateFlag.APPEND)){
+      if (!exists(f)){
+        if(flag.contains(CreateFlag.CREATE))
+          return create(f, false, bufferSize, replication, blockSize, null);
+      }
+      return append(f, bufferSize, null);
+    }
+ 
+    FSDataOutputStream out = create(f, flag.contains(CreateFlag.OVERWRITE),
+                                    bufferSize, replication, blockSize, null);
+    setPermission(f, absolutePermission);
+    return out;
+  }
+
   public boolean rename(Path src, Path dst) throws IOException {
     if (pathToFile(src).renameTo(pathToFile(dst))) {
       return true;
@@ -322,7 +343,17 @@
     return b;
   }
   
+
   @Override
+  protected boolean mkdirsAbsPerm(Path f, FsPermission absolutePermission)
+      throws IOException {
+    boolean b = mkdirs(f);
+    setPermission(f, absolutePermission);
+    return b;
+  }
+  
+  
+  @Override
   public Path getHomeDirectory() {
     return new Path(System.getProperty("user.home")).makeQualified(this);
   }
@@ -339,6 +370,11 @@
   public Path getWorkingDirectory() {
     return workingDir;
   }
+  
+  @Override
+  protected Path getInitialWorkingDirectory() {
+    return new Path(System.getProperty("user.dir")).makeQualified(this);
+  }
 
   /** {@inheritDoc} */
   @Override
@@ -503,4 +539,5 @@
     String output = Shell.execCommand(args);
     return output;
   }
+
 }
Index: src/java/org/apache/hadoop/fs/FileContext.java
===================================================================
--- src/java/org/apache/hadoop/fs/FileContext.java	(revision 0)
+++ src/java/org/apache/hadoop/fs/FileContext.java	(revision 0)
@@ -0,0 +1,1295 @@
+package org.apache.hadoop.fs;
+
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.net.URI;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.EnumSet;
+import java.util.List;
+import java.util.regex.Pattern;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.permission.FsPermission;
+import org.apache.hadoop.io.IOUtils;
+import org.apache.hadoop.util.Progressable;
+
+/**
+ * The FileContext class provides an interface to the application writer for
+ * using the Hadoop file system.
+ * It provides a set of methods for the usual operation: create, open, 
+ * list, etc 
+ * 
+ * *** Path Names ***
+ * 
+ * The Hadoop file system supports a URI name space and URI names.
+ *   TBD - explain more here.
+ * Two common Hadoop file systems implementations are
+ *   the local file system: file:///path
+ *   the hdfs file system hdfs://nnAddress:nnPort/path
+ *   
+ * While using URI names is very flexible, it requires knowing the name or address
+ * of the server. For convenience one often wants to access the default system
+ * in your environment; to facilitate this Hadoop supports a notion of a
+ * default file system. The user can set his default file system, although
+ * this is typically set up for you in your environment in your default config.
+ * A default filesystem imples a  default scheme and authority;
+ * slash-relative names (such as /for/bar) are resolved relative to that default FS.
+ * Similarly a user can also have working-directory-relative names (i.e. names
+ * not starting with a slash). While the working directory is generally in the
+ * same default FS, the wd can be in a different FS; in particular, changing
+ * the default file system DOES NOT change the working directory,
+ * 
+ *  Hence Hadoop path names can be one of:
+ *       fully qualified URI:  scheme://authority/path
+ *       slash relative names: /path    - relative to the default file system
+ *       wd-relative names:    path        - relative to the working dir
+ *       
+ *       Relative paths with scheme (scheme:foo/bar) are illegal
+ *  
+ *  ****The Role of the FileContext and configuration defaults****
+ *  The FileContext provides file namespace context for resolving file names;
+ *  it also contains the umask for permissions, In that sense it is like the
+ *  per-process file related state in Unix system.
+ *  These in general are obtained from the default configuration file
+ *  in your environment,  (@see xxx for details).
+ *  
+ *  No other configuration parameters are obtained from the default config as 
+ *  far as the file context layer is concerned. All file systems instance
+ *  (ie deployments of file systems) have default properties; we call these are
+ *  server side (SS) defaults. Operation like create allow one to select many 
+ *  properties: you either pass them in explicitly as parameters or 
+ *  one can choose to used the SS properties.
+ *  
+ *  The filsystem related SS defaults are
+ *    - the home directory (default is "/user/<userName>")
+ *    - the initil wd (only for local fs)
+ *    - replication factor
+ *    - block size
+ *    - buffer size
+ *    - bytesPerChecksum (if used).
+ *
+ * 
+ * *** Usage Model for the FileContext class ***
+ * 
+ * Example 1: use the default config read from the $HADOOP_CONFIG/core.xml.
+ *            Unspecified values come from core-defaults.xml in the release jar.
+ *            
+ *     myFiles = getFileContext(); // uses the default config 
+ *     myFiles.create(path, ...);
+ *     myFiles.setWorkingDir(path)
+ *     myFiles.open (path, ...);
+ *     myFiles.setDefaultReplicationFactor(..); // Can change some defaults     
+ *     
+ * Example 2: Use a specific config, ignoring $HADOOP_CONFIG
+ *    configX = someConfigSomeOnePassedToYou.
+ *    myFiles = getFileContext(theConfig); // Copies configX (not a pointer) 
+ *    myFiles.create(path, ...);
+ *    myFiles.setWorkingDir(path)
+ *    myFiles.setDefaultReplicationFactor(..); // Can change some defaults
+ *                                             // But configX is unchanged
+ *                                             
+ * Other ways of creating new FileContexts:
+ *   getLocalFSFileContext(...)  // local filesystem is the default FS
+ *   getLocalFileContext(URI, ...) // where specified URI is default FS.
+ *    
+ */
+
+/**
+ * Also the exceptions thrown by the methods have better specified.
+ */
+public class FileContext {
+  
+  /**
+   * The config variables that define the context in which this FileContext
+   * operates
+   *
+   */   
+  private URI defaultFsURI;
+  private FileSystem defaultFS; // the default FS for this FileContext.
+  private Path workingDir;          // Fully qualified
+  FsPermission umask;
+  private Configuration theConfig;
+
+ 
+  
+  private void initFromConfig(final Configuration conf) throws IOException {
+    // Extract the keys relavant to the FileContext layer;
+    // However keep pointer to the rest of the config since it needs to 
+    // be passed to the lower layer FileSystem for config variables it
+    // needs.
+      theConfig = conf;
+      defaultFsURI = URI.create(FsConfig.getDefaultFS(conf));
+      defaultFS = FileSystem.get(defaultFsURI,  conf);
+      umask = FsPermission.getUMask(conf);
+      
+      /*
+       * WorkingDir is implemented at the FileContext layer 
+       * NOT at the FileSystem layer. 
+       * If the DefaultFS, such as localFilesystem has a notion of
+       *  builtin WD, we use that as the initial WD.
+       *  Otherwise the WD is initialized to the home directory.
+       */
+      workingDir = defaultFS.getInitialWorkingDirectory();
+      if (workingDir == null) {
+        workingDir = defaultFS.getHomeDirectory();
+      }
+  }
+    
+  private Path makeAbsolute(Path f) {
+    if (f.isAbsolute()) {
+      return f;
+    } else {
+      return new Path(workingDir, f);
+    }
+  }
+
+  /**
+   * Pathnames with scheme and relative path are illegal
+   * @param path to be checked
+   * @throws IOException if of type scheme:foo/bar
+   */
+  private static void isNotSchemeWithRelative(final Path path) throws IOException {
+    if (path.toUri().isAbsolute() && !path.isPathComponentAbsolute()) {
+      // path of type scheme:foo/bar are not supported
+      throw new IllegalArgumentException(
+          "Unsupported name: has scheme but relative path-part");
+    }
+  }
+  
+  /**
+   * Get the filesystem of supplied path
+   * @param f
+   * @return the filesystem of the path
+   * @throws IOException
+   */
+  private FileSystem getFSofPath(final Path f) throws IOException {
+    isNotSchemeWithRelative(f);
+    try { 
+      // Is it the default FS for this Fils?
+      defaultFS.checkPath(f);
+      return defaultFS;
+    } catch (Exception e) { // it is different FileSystem
+      return FileSystem.get(f.toUri(), theConfig);
+    }
+  }
+  
+
+
+  /**
+   * This method sets the default file system, overiding what was derived from
+   * the config file.
+   * 
+   * It is deliberated not public -if caller wants a FC with different defaultFS
+   * he simply needs to construct a new FileContext
+   * 
+   * @param uri - the new default file system 
+   *   
+   * @throws IOException if FileSystem denoted by uri cannot be created
+   */
+  private void setDefaultFileSystem(final URI uri) throws IOException {
+    defaultFsURI = uri;
+    defaultFS = FileSystem.get(defaultFsURI,  theConfig);
+  }
+  
+  /**
+   * This method sets the default file system, overiding what was derived from
+   * the config file.
+   * 
+   * It is deliberated not public -if caller wants a FC with different defaultFS
+   * he simply needs to construct a new FileContext
+   * 
+   * @param fs - the new default file system 
+   */
+  private void setDefaultFileSystem(final FileSystem fs) {
+    defaultFsURI = fs.getUri();
+    defaultFS = fs;;
+  }
+  
+  
+  
+  
+  /**
+   * This constructor uses the default config read from the
+   * $HADOOP_CONFIG/core.xml,
+   * Unspecified key-values for config are defaulted from core-defaults.xml
+   * in the release jar.
+   * Unspecified key values for config are defaulted from core-defaults.xml in the release jar.
+   * 
+   * @param conf - the config used.
+   * @throws IOException  if default FileSystem in the config  cannot be created
+   */
+  private FileContext() throws IOException {
+     this(new Configuration());
+  };
+  
+  /**
+   * This constructor uses the passed config. The keys relevant to the
+   * FileContext layer are extracted at time of construction. Changes to the
+   * config after the call are ignore by the FileContext layer. 
+   * The conf is passed to lower layers like FileSystem and HDFS which
+   * pick up their own config variables. 
+   * Unspecified key values for config are defaulted from core-defaults.xml in the release jar.
+   * 
+   * @param conf - the config used.
+   * @throws IOException  if default FileSystem in the config  cannot be created
+   */
+  private FileContext(final Configuration conf) throws IOException {
+    initFromConfig(conf);
+    util = new Util(this); // for the inner class
+  };
+  
+
+  public static final URI LOCAL_FS_URI = URI.create("file:///");
+
+  
+  public static final FsPermission defaultPerm = FsPermission.getDefault();
+  
+  
+  
+  /**
+   * Create a FileContext using the default config read from the
+   * $HADOOP_CONFIG/core.xml,
+   * Unspecified key-values for config are defaulted from core-defaults.xml
+   * in the release jar.
+   * 
+   * @throws IOException if default FileSystem in the config  cannot be created
+   */
+  public static FileContext getFileContext() throws IOException {
+    return new FileContext();
+  }
+  
+  /**
+   * Create a FileContext using the passed config.
+   * The keys relavant to the
+   * FileContext layer are extracted at time of construction. Changes to the
+   * config after the call are ignore by the FileContext layer. 
+   * The conf is passed to lower layers like FileSystem and HDFS which
+   * pick up their own config variables. 
+   * Unspecified key values for config are defaulted from core-defaults.xml
+   * in the release jar.
+   * 
+   * @param conf
+   * @return new FileContext
+   * @throws IOException  if default FileSystem in the config  cannot be created
+   */
+  public static FileContext getFileContext(final Configuration conf) throws IOException {
+    return new FileContext(conf);
+  }
+  
+  /**
+   * 
+   * @return a FileContext for the local file system using the default config.
+   * @throws IOException 
+   */
+  public static FileContext getLocalFSFileContext() throws IOException {
+    return getFileContext(LOCAL_FS_URI);
+  }
+  
+  
+  /**
+   * 
+   * @return a FileContext for the local file system using the specified config.
+   * @throws IOException 
+   */
+  public static FileContext getLocalFSFileContext(final Configuration conf)
+                                                        throws IOException {
+    return getFileContext(LOCAL_FS_URI, conf);
+  }
+  
+  /**
+   * Create a FileContext for specified URI using the default config.
+   * 
+   * @param uri
+   * @return a FileSystem for the specified URI
+   * @throws IOException if the filesysem with specified cannot be created
+   */
+  public static FileContext getFileContext(final URI uri) throws IOException {
+    FileContext fc = new FileContext();
+    fc.setDefaultFileSystem(uri);
+    return fc;
+  }
+  
+  /**
+   * Create a FileContext for specified FileSystem using the default config.
+   * 
+   * @param uri
+   * @return a FileSystem for the specified URI
+   * @throws IOException if the filesysem with specified cannot be created
+   */
+  public static FileContext getFileContext(final FileSystem fs) throws IOException {
+    FileContext fc = new FileContext();
+    fc.setDefaultFileSystem(fs);
+    return fc;
+  }
+ 
+  /**
+   * Create a FileContext for specified URI using the specified config.
+   * 
+   * @param uri
+   * @param config
+   * @return a FileSystem for the specified URI
+   * @throws IOException if the filesysem with specified cannot be created
+   */
+  public static FileContext getFileContext(final URI uri,
+                    final Configuration conf) throws IOException {
+    FileContext fc = new FileContext(conf);
+    fc.setDefaultFileSystem(uri);
+    return fc;
+  }
+
+  
+  public FileSystem getDefaultFileSystem() {
+    return defaultFS;
+  }
+  
+  /**
+   * Set the working directory for wd-relative names (such a "foo/bar")
+   * @param p
+   * @throws IOException
+   * 
+   * newWdir can be one of 
+   *     - relative path:  "foo/bar";
+   *     - absolute without scheme: "/foo/bar"
+   *     - fully qualified with scheme: "xx://auth/foo/bar"
+   *  Illegal WDs:
+   *      - relative with scheme: "xx:foo/bar"
+   *      
+   *   Set the wd to the absolute path.
+   *   This will ensure that the defaultFS can be changed while leaving
+   *   the wd unchanged.   
+   */
+  public void setWorkingDirectory(final Path p) throws IOException {
+    isNotSchemeWithRelative(p);
+    workingDir =  new Path(new Path(defaultFsURI), p);
+  }
+  
+  
+  /**
+   * Gets the working directory for wd-relative names (such a "foo/bar")
+   */
+  public Path getWorkingDirectory() {
+    return workingDir;
+  }
+  
+  /**
+   * Make the path fully qualified if it is isn't. Use the default file system
+   * and working dir in this FileContext to qualify.
+   * @param p
+   * @return qualifed path
+   */
+  public Path makeQualified(final Path path) {
+    return path.makeQualified(defaultFsURI, getWorkingDirectory());
+  } 
+
+  /**
+   *Class to support the varargs for create() options.
+   *
+   */
+  public static class CreatOpts {
+    private CreatOpts() { };
+    public static BlockSize blockSize(long bs) { return new BlockSize(bs);}
+    public static BufferSize bufferSize(short bs) { return new BufferSize(bs);}
+    public static ReplicationFac repFac(short rf) { return new ReplicationFac(rf);}
+    public static BytesPerChecksum bytesPerChecksum(short crc) {
+      return new BytesPerChecksum(crc);
+    }
+  }
+  static class BlockSize extends CreatOpts {
+    final long blockSize;
+    protected BlockSize(long bs) { blockSize = bs; }
+    long getValue() { return blockSize; }
+  }
+  static class ReplicationFac extends CreatOpts {
+    final short replication;
+    protected ReplicationFac(short rf) { replication = rf; }
+    short getValue() { return replication; }
+  }
+  static class BufferSize extends CreatOpts {
+    final int bufferSize;
+    protected BufferSize(short bs) { bufferSize = bs; }
+    int getValue() { return bufferSize; }
+  }
+  static class BytesPerChecksum extends CreatOpts {
+    final int bytesPerChecksum;
+    protected BytesPerChecksum(short bs) { bytesPerChecksum = bs; }
+    int getValue() { return bytesPerChecksum; }
+  }
+  /**
+   * Create or overwrite file on indicated path and returns an output stream
+   *   for writing into the file.
+   * @param f the file name to open
+   * @param permission - permissions is set permission&~umask
+   * @param flag gives the semantics  of create: overwrite, append etc.
+   * @param opts  - varargs of CreateOpt which is Blocksize, ReplicationFactor,
+   *    BufferSize, BytesPerChecksum. The defaults if any are not
+   *    specifed is to use SS defaults for these properties.
+   * @throws IOException
+   * 
+   * @see #setPermission(Path, FsPermission)
+   */
+  public FSDataOutputStream create(final Path f,
+                                    final FsPermission permission,
+                                    final EnumSet<CreateFlag> createFlag,
+                                    CreatOpts... opts)
+                                                  throws IOException {
+    Path abs_f = makeAbsolute(f);
+    FsPermission absPerms = (permission == null ? 
+        FsPermission.getDefault() : permission).applyUMask(umask);
+    
+    // Note this is hack for now to get the tests to run
+    // We need get the SS defaults from the lower layer.
+    // Once HDFS-578 is done this will be fixed. TB Fixed.
+    int bufferSize = FsConfig.getDefaultIOBuffersize(theConfig);
+    short replication = FsConfig.getDefaultReplicationFactor(theConfig);
+    long blockSize = FsConfig.getDefaultBlockSize(theConfig);
+    int bytesPerChecksum = theConfig.getInt("dfs.write.packet.size", 64*1024);
+ 
+    for ( CreatOpts iOpt : opts) {
+      if (BlockSize.class.isInstance(iOpt)) {
+        blockSize = ((BlockSize) iOpt).getValue();
+      }
+      if (BufferSize.class.isInstance(iOpt)) {
+        bufferSize = ((BufferSize) iOpt).getValue();
+      }
+      if (ReplicationFac.class.isInstance(iOpt)) {
+        replication = ((ReplicationFac) iOpt).getValue();
+      }
+      if (BytesPerChecksum.class.isInstance(iOpt)) {
+        bytesPerChecksum = ((BytesPerChecksum) iOpt).getValue();
+      }
+    }
+
+
+    return getFSofPath(abs_f).createAbsPerm(abs_f, absPerms, createFlag,
+                        bufferSize, replication, blockSize, bytesPerChecksum);
+  }
+  
+  /**
+   * Make the given file and all non-existent parents into
+   * directories. Has the semantics of Unix 'mkdir -p'.
+   * Existence of the directory hierarchy is not an error.
+   * 
+   * @param dir - the dir to make
+   * @param permission - permissions is set permission&~umask
+   * @return true if the operation succeeds; false f dir already exists 
+   * @throws IOException when operation fails (e.g. permissions) etc.
+   */
+  public boolean mkdirs(final Path dir, final FsPermission permission)
+                                                      throws IOException {
+    Path abs_dir = makeAbsolute(dir);
+    FsPermission absPerms = (permission == null ? 
+          FsPermission.getDefault() : permission).applyUMask(umask);
+    return getFSofPath(abs_dir).mkdirsAbsPerm(abs_dir, absPerms);
+  }
+
+  /** Delete a file.
+  *
+  * @param f the path to delete.
+  * @param recursive if path is a directory and set to 
+  * true, the directory is deleted else throws an exception. In
+  * case of a file the recursive can be set to either true or false. 
+  * @return  true if delete is successful else false. 
+  * @throws IOException
+  */
+  public boolean delete(final Path f, final boolean recursive) 
+                                                  throws IOException {
+    Path abs_f = makeAbsolute(f);
+    return getFSofPath(abs_f).delete(abs_f, recursive);
+  }
+ 
+  /**
+   * Opens an FSDataInputStream at the indicated Path using
+   * default buffersize
+   * @param f the file name to open
+
+   */
+  public FSDataInputStream open(final Path f)
+    throws IOException {
+    final Path abs_f = makeAbsolute(f);
+      return getFSofPath(abs_f).open(abs_f);
+  }
+  
+  /**
+   * Opens an FSDataInputStream at the indicated Path.
+   * @param f the file name to open
+   * @param bufferSize the size of the buffer to be used.
+   */
+  public FSDataInputStream open(final Path f, int bufferSize)
+    throws IOException {
+    final Path abs_f = makeAbsolute(f);
+      return getFSofPath(abs_f).open(abs_f, bufferSize);
+  }
+  
+ /**
+  * Set replication for an existing file.
+  * 
+  * @param f file name
+  * @param replication new replication
+  * @throws IOException
+  * @return true if successful;
+  *         false if file does not exist or is a directory
+  */
+  public boolean setReplication(final Path f, final short replication)
+   throws IOException {
+    final Path abs_f = makeAbsolute(f);
+    return getFSofPath(abs_f).setReplication(abs_f, replication);
+  }
+
+ /**
+  * Renames Path src to Path dst.  
+   * @param src
+   * @param dst
+   * @return true if the rename suceeds
+   * @throws IOException
+   */
+  public boolean rename(final Path src, final Path dst)
+                                                throws IOException {
+    final Path abs_src  = makeAbsolute(src);
+    final Path abs_dst = makeAbsolute(dst);
+    FileSystem srcFS = getFSofPath(abs_src);
+    FileSystem dstFS = getFSofPath(abs_dst);
+    if(srcFS.getUri().equals(dstFS.getUri())) {
+      return srcFS.rename(abs_src, abs_dst);
+    }
+    throw new IOException("Renames across FileSystems not supported");
+  }
+  
+  /**
+   * Set permission of a path.
+   * @param f
+   * @param permission
+   */
+  public void setPermission(final Path f, final FsPermission permission)
+                                                      throws IOException {
+    final Path abs_f = makeAbsolute(f);
+    getFSofPath(abs_f).setPermission(abs_f, permission);
+  }
+
+  /**
+   * Set owner of a path (i.e. a file or a directory).
+   * The parameters username and groupname cannot both be null.
+   * @param f The path
+   * @param username If it is null, the original username remains unchanged.
+   * @param groupname If it is null, the original groupname remains unchanged.
+   */
+  public void setOwner(final Path f, final String username,
+                        final String groupname) throws IOException {
+    final Path abs_f = makeAbsolute(f);
+    getFSofPath(abs_f).setOwner(abs_f, username, groupname);
+  }
+
+
+  /**
+   * Set access time of a file
+   * @param f The path
+   * @param mtime Set the modification time of this file.
+   *              The number of milliseconds since Jan 1, 1970. 
+   *              A value of -1 means that this call should not set modification time.
+   * @param atime Set the access time of this file.
+   *              The number of milliseconds since Jan 1, 1970. 
+   *              A value of -1 means that this call should not set access time.
+   */
+  public void setTimes(final Path f, final long mtime, final long atime
+                                                      ) throws IOException {
+    final Path abs_f = makeAbsolute(f);
+    getFSofPath(abs_f).setTimes(abs_f, mtime, atime);
+  }
+
+
+  /**
+   * Get the checksum of a file.
+   *
+   * @param f The file path
+   * @return The file checksum.  The default return value is null,
+   *  which indicates that no checksum algorithm is implemented
+   *  in the corresponding FileSystem.
+   */
+  public FileChecksum getFileChecksum(final Path f) throws IOException {
+    final Path abs_f = makeAbsolute(f);
+    return getFSofPath(abs_f).getFileChecksum(abs_f);
+  }
+  
+  /**
+   * Set the verify checksum flag for the default filesystem.
+   *  This is only applicable if the 
+   * corresponding FileSystem supports checksum. By default doesn't do anything.
+   * @param verifyChecksum
+   * @param f - set the verifyChecksum for the Filesystem holding this path
+   */
+  public void setVerifyChecksum(final boolean verifyChecksum) {
+    defaultFS.setVerifyChecksum(verifyChecksum);
+  }
+
+  
+  /**
+   * Set the verify checksum flag for the  filesystem denoted by the path.
+   *  This is only applicable if the 
+   * corresponding FileSystem supports checksum. By default doesn't do anything.
+   * @param verifyChecksum
+   * @param f - set the verifyChecksum for the Filesystem containing this path
+   * @throws IOException 
+   */
+  public void setVerifyChecksum(final boolean verifyChecksum, final Path f)
+                                                          throws IOException {
+    final Path abs_f = makeAbsolute(f);
+    //TBD need to be implemented when add support to find the fs for the path.
+    // This can be different when we add symlinks.
+    throw new IOException("Not implemented yet");
+  }
+
+  /**
+   * Return a file status object that represents the path.
+   * @param f The path we want information from
+   * @return a FileStatus object
+   * @throws FileNotFoundException when the path does not exist;
+   *         IOException see specific implementation
+   */
+  public FileStatus getFileStatus(final Path f) throws IOException {
+    final Path abs_f = makeAbsolute(f);
+    return getFSofPath(abs_f).getFileStatus(abs_f);
+  }
+  
+  
+  
+  /**
+   * Return blockLocation of the given file for the given offset and len
+   *  For a nonexistent file or regions, null will be returned.
+   *
+   * This call is most helpful with DFS, where it returns 
+   * hostnames of machines that contain the given file.
+   * 
+   * @param fileStatus - get blocklocations of this file
+   * @param start position (byte offset)
+   * @param len (in bytes)
+   * @return block locations for given file
+   * @throws IOException
+   */
+  public BlockLocation[] getFileBlockLocations(final FileStatus fileStatus, 
+    final long start, final long len) throws IOException {
+      // TBD - this is wrong - need to make absolute.
+      // Perhaps FileSystem.getBlockLocaltions needs to be fixed.
+      throw new IOException("not implemented yet");
+      //return getFileSystem(fileStatus.getPath()).getFileBlockLocations(fileStatus, start, len);
+  }
+  
+  /**
+   * Returns a status object describing the use and capacity of the
+   * default file system. If the file system has multiple partitions, the
+   * use and capacity of the root partition is reflected.
+   * 
+   * @return a FsStatus object
+   * @throws IOException
+   *           
+   */
+  public FsStatus getFSStatus() throws IOException {
+    return getDefaultFileSystem().getStatus(null);
+  }
+  
+  /**
+   * Returns a status object describing the use and capacity of the
+   * file system denoted by the Parh argument p.
+   * If the file system has multiple partitions, the
+   * use and capacity of the partition pointed to by the specified
+   * path is reflected.
+   * @param p Path for which status should be obtained. null means of the
+   * root partition of the default file system. 
+   * @return a FsStatus object
+   * @throws IOException
+   *           see specific implementation
+   */
+  public FsStatus getFSStatus(final Path f) throws IOException {
+    return getFSofPath(f).getStatus(f);
+  }
+  
+  /**
+   * Does the file exist?
+   * @param f the  file or dir to be checked
+   */
+  public boolean exists(final Path f) throws IOException {
+    final Path abs_f = makeAbsolute(f);
+    return getFSofPath(abs_f).exists(abs_f);
+  }
+
+  /**
+   * Is a directory?
+   * Note: Avoid using this method if you already have FileStatus in hand.
+   * Instead reuse the FileStatus 
+   * returned by getFileStatus() or listStatus() methods.
+   * 
+   * @param f
+   * @return True iff the named path is a directory.
+   * @throws IOException
+   */
+  public boolean isDirectory(final Path f) throws IOException {
+    try {
+      final Path abs_f = makeAbsolute(f);
+      return getFileStatus(abs_f).isDir();
+    } catch (FileNotFoundException e) {
+      return false;               // f does not exist
+    }
+  }
+
+  /** True iff the named path is a regular file.
+   * Note: Avoid using this method  if you already have FileStatus in hand
+   * Instead reuse the FileStatus 
+   * returned by getFileStatus() or listStatus() methods.
+   */
+  public boolean isFile(final Path f) throws IOException {
+    try {
+      final Path abs_f = makeAbsolute(f);
+      return !getFileStatus(abs_f).isDir();
+    } catch (FileNotFoundException e) {
+      return false;               // f does not exist
+    }
+  }
+  
+ 
+  /**
+   * List the statuses of the files/directories in the given path if the path is
+   * a directory.
+   * 
+   * @param f
+   *          given path
+   * @return the statuses of the files/directories in the given path
+   * @throws IOException
+   */
+  public FileStatus[] listStatus(final Path f) throws IOException {
+    final Path abs_f = makeAbsolute(f);
+    return getFSofPath(abs_f).listStatus(abs_f);
+  }
+  
+  /**
+   * Utility methods built over the basic FileContext methods.
+   */
+  private Util util;
+  public Util util() {
+    return util;
+  }
+  
+  public class Util {
+    FileContext thisFC;
+    Util(FileContext fc) {
+      thisFC = fc;
+    }
+
+
+    /**
+     * Return a list of file status objects that corresponds to the list of paths
+     * excluding those non-existent paths.
+     * 
+     * @param paths
+     *          the list of paths we want information from
+     * @return a list of FileStatus objects
+     * @throws IOException
+     *           see specific implementation
+     */
+    private FileStatus[] getFileStatus(final Path[] paths) throws IOException {
+      if (paths == null) {
+        return null;
+      }
+      ArrayList<FileStatus> results = new ArrayList<FileStatus>(paths.length);
+      for (int i = 0; i < paths.length; i++) {
+        try {
+          results.add(thisFC.getFileStatus(paths[i]));
+        } catch (FileNotFoundException e) { // do nothing
+        }
+      }
+      return results.toArray(new FileStatus[results.size()]);
+    }
+    
+    /**
+     * Filter files/directories in the given list of paths using default
+     * path filter.
+     * 
+     * @param files
+     *          a list of paths
+     * @return a list of statuses for the files under the given paths after
+     *         applying the filter default Path filter
+     * @exception IOException
+     */
+    public FileStatus[] listStatus(final Path[] files)
+        throws IOException {
+      return listStatus(files, DEFAULT_FILTER);
+    }
+    
+  
+    /**
+     * Filter files/directories in the given path using the user-supplied path
+     * filter.
+     * 
+     * @param f
+     *          a path name
+     * @param filter
+     *          the user-supplied path filter
+     * @return an array of FileStatus objects for the files under the given path
+     *         after applying the filter
+     * @throws IOException
+     *           if encounter any problem while fetching the status
+     */
+    public FileStatus[] listStatus(final Path f, final PathFilter filter)
+                                                    throws IOException {
+      ArrayList<FileStatus> results = new ArrayList<FileStatus>();
+      listStatus(results, f, filter);
+      return results.toArray(new FileStatus[results.size()]);
+    }
+    
+  
+    /**
+     * Filter files/directories in the given list of paths using user-supplied
+     * path filter.
+     * 
+     * @param files
+     *          a list of paths
+     * @param filter
+     *          the user-supplied path filter
+     * @return a list of statuses for the files under the given paths after
+     *         applying the filter
+     * @exception IOException
+     */
+    public FileStatus[] listStatus(final Path[] files, final PathFilter filter)
+        throws IOException {
+      ArrayList<FileStatus> results = new ArrayList<FileStatus>();
+      for (int i = 0; i < files.length; i++) {
+        listStatus(results, files[i], filter);
+      }
+      return results.toArray(new FileStatus[results.size()]);
+    }
+  
+    /*
+     * Filter files/directories in the given path using the user-supplied path
+     * filter. Results are added to the given array <code>results</code>.
+     */
+    private void listStatus(ArrayList<FileStatus> results, final Path f,
+        PathFilter filter) throws IOException {
+      FileStatus listing[] = thisFC.listStatus(f);
+      if (listing != null) {
+        for (int i = 0; i < listing.length; i++) {
+          if (filter.accept(listing[i].getPath())) {
+            results.add(listing[i]);
+          }
+        }
+      }
+    }
+  
+    /**
+     * <p>Return all the files that match filePattern and are not checksum
+     * files. Results are sorted by their names.
+     * 
+     * <p>
+     * A filename pattern is composed of <i>regular</i> characters and
+     * <i>special pattern matching</i> characters, which are:
+     *
+     * <dl>
+     *  <dd>
+     *   <dl>
+     *    <p>
+     *    <dt> <tt> ? </tt>
+     *    <dd> Matches any single character.
+     *
+     *    <p>
+     *    <dt> <tt> * </tt>
+     *    <dd> Matches zero or more characters.
+     *
+     *    <p>
+     *    <dt> <tt> [<i>abc</i>] </tt>
+     *    <dd> Matches a single character from character set
+     *     <tt>{<i>a,b,c</i>}</tt>.
+     *
+     *    <p>
+     *    <dt> <tt> [<i>a</i>-<i>b</i>] </tt>
+     *    <dd> Matches a single character from the character range
+     *     <tt>{<i>a...b</i>}</tt>.  Note that character <tt><i>a</i></tt> must be
+     *     lexicographically less than or equal to character <tt><i>b</i></tt>.
+     *
+     *    <p>
+     *    <dt> <tt> [^<i>a</i>] </tt>
+     *    <dd> Matches a single character that is not from character set or range
+     *     <tt>{<i>a</i>}</tt>.  Note that the <tt>^</tt> character must occur
+     *     immediately to the right of the opening bracket.
+     *
+     *    <p>
+     *    <dt> <tt> \<i>c</i> </tt>
+     *    <dd> Removes (escapes) any special meaning of character <i>c</i>.
+     *
+     *    <p>
+     *    <dt> <tt> {ab,cd} </tt>
+     *    <dd> Matches a string from the string set <tt>{<i>ab, cd</i>} </tt>
+     *    
+     *    <p>
+     *    <dt> <tt> {ab,c{de,fh}} </tt>
+     *    <dd> Matches a string from the string set <tt>{<i>ab, cde, cfh</i>}</tt>
+     *
+     *   </dl>
+     *  </dd>
+     * </dl>
+     *
+     * @param pathPattern a regular expression specifying a pth pattern
+
+     * @return an array of paths that match the path pattern
+     * @throws IOException
+     */
+    public FileStatus[] globStatus(final Path pathPattern) throws IOException {
+      return globStatus(pathPattern, DEFAULT_FILTER);
+    }
+    
+    /**
+     * Return an array of FileStatus objects whose path names match pathPattern
+     * and is accepted by the user-supplied path filter. Results are sorted by
+     * their path names.
+     * Return null if pathPattern has no glob and the path does not exist.
+     * Return an empty array if pathPattern has a glob and no path matches it. 
+     * 
+     * @param pathPattern
+     *          a regular expression specifying the path pattern
+     * @param filter
+     *          a user-supplied path filter
+     * @return an array of FileStatus objects
+     * @throws IOException if any I/O error occurs when fetching file status
+     */
+    public FileStatus[] globStatus(final Path pathPattern,
+                              final PathFilter filter) throws IOException {
+      String filename = pathPattern.toUri().getPath();
+      List<String> filePatterns = GlobExpander.expand(filename);
+      if (filePatterns.size() == 1) {
+        return globStatusInternal(pathPattern, filter);
+      } else {
+        List<FileStatus> results = new ArrayList<FileStatus>();
+        for (String filePattern : filePatterns) {
+          FileStatus[] files = globStatusInternal(new Path(filePattern), filter);
+          for (FileStatus file : files) {
+            results.add(file);
+          }
+        }
+        return results.toArray(new FileStatus[results.size()]);
+      }
+    }
+
+    private FileStatus[] globStatusInternal(final Path pathPattern,
+                                final PathFilter filter) throws IOException {
+      Path[] parents = new Path[1];
+      int level = 0;
+      String filename = pathPattern.toUri().getPath();
+      
+      // path has only zero component
+      if ("".equals(filename) || Path.SEPARATOR.equals(filename)) {
+        return getFileStatus(new Path[]{pathPattern});
+      }
+
+      // path has at least one component
+      String[] components = filename.split(Path.SEPARATOR);
+      // get the first component
+      if (pathPattern.isAbsolute()) {
+        parents[0] = new Path(Path.SEPARATOR);
+        level = 1;
+      } else {
+        parents[0] = new Path(Path.CUR_DIR);
+      }
+
+      // glob the paths that match the parent path, i.e., [0, components.length-1]
+      boolean[] hasGlob = new boolean[]{false};
+      Path[] parentPaths = globPathsLevel(parents, components, level, hasGlob);
+      FileStatus[] results;
+      if (parentPaths == null || parentPaths.length == 0) {
+        results = null;
+      } else {
+        // Now work on the last component of the path
+        GlobFilter fp = new GlobFilter(components[components.length - 1], filter);
+        if (fp.hasPattern()) { // last component has a pattern
+          // list parent directories and then glob the results
+          results = listStatus(parentPaths, fp);
+          hasGlob[0] = true;
+        } else { // last component does not have a pattern
+          // get all the path names
+          ArrayList<Path> filteredPaths = new ArrayList<Path>(parentPaths.length);
+          for (int i = 0; i < parentPaths.length; i++) {
+            parentPaths[i] = new Path(parentPaths[i],
+              components[components.length - 1]);
+            if (fp.accept(parentPaths[i])) {
+              filteredPaths.add(parentPaths[i]);
+            }
+          }
+          // get all their statuses
+          results = getFileStatus(
+              filteredPaths.toArray(new Path[filteredPaths.size()]));
+        }
+      }
+
+      // Decide if the pathPattern contains a glob or not
+      if (results == null) {
+        if (hasGlob[0]) {
+          results = new FileStatus[0];
+        }
+      } else {
+        if (results.length == 0 ) {
+          if (!hasGlob[0]) {
+            results = null;
+          }
+        } else {
+          Arrays.sort(results);
+        }
+      }
+      return results;
+    }
+
+    /*
+     * For a path of N components, return a list of paths that match the
+     * components [<code>level</code>, <code>N-1</code>].
+     */
+    private Path[] globPathsLevel(Path[] parents, String[] filePattern,
+        int level, boolean[] hasGlob) throws IOException {
+      if (level == filePattern.length - 1)
+        return parents;
+      if (parents == null || parents.length == 0) {
+        return null;
+      }
+      GlobFilter fp = new GlobFilter(filePattern[level]);
+      if (fp.hasPattern()) {
+        parents = FileUtil.stat2Paths(listStatus(parents, fp));
+        hasGlob[0] = true;
+      } else {
+        for (int i = 0; i < parents.length; i++) {
+          parents[i] = new Path(parents[i], filePattern[level]);
+        }
+      }
+      return globPathsLevel(parents, filePattern, level + 1, hasGlob);
+    }
+
+ 
+    /**
+     * Copy file from src to dest
+     * @param src
+     * @param dest
+     * @return true if copy is successful
+     * @throws IOException
+     */
+    public boolean copy(final Path src, final Path dst)  throws IOException {
+       return copy(src, dst, false, false);
+     }
+     
+     /**
+     * Copy from src to dst, optionally deleting src and overwriting dst
+     * @param src
+     * @param dst
+     * @param deleteSource - delete src if true
+     * @param overwrite  overwrite dst if true; throw IOException if dst exists
+     *         and overwrite is false.
+     * @return true if copy is successful
+     * @throws IOException
+     */
+    public boolean copy(final Path src,  final Path dst,
+         boolean deleteSource, 
+         boolean overwrite)
+         throws IOException {
+       isNotSchemeWithRelative(src);
+       isNotSchemeWithRelative(dst);
+       FileSystem dstFs = getFSofPath(dst);
+       FileSystem srcFs = getFSofPath(src);
+       checkDest(src.getName(), dstFs, dst, false);
+       if (srcFs.isDirectory(src)) {
+         checkDependencies(srcFs, src, dstFs, dst);
+         if (!dstFs.mkdirs(dst)) {
+           throw new IOException("Failed to create destination directory `" + dst + "'");
+         }
+         FileStatus contents[] = srcFs.listStatus(src);
+         for (FileStatus content : contents) {
+           copy(content.getPath(), new Path(dst, content.getPath()),
+                deleteSource, overwrite);
+         }
+       } else {
+         InputStream in=null;
+         OutputStream out = null;
+         try {
+           in = srcFs.open(src);
+           out = dstFs.create(dst, overwrite);
+           IOUtils.copyBytes(in, out, theConfig, true);
+         } catch (IOException e) {
+           IOUtils.closeStream(out);
+           IOUtils.closeStream(in);
+           throw e;
+         }
+       }
+       if (deleteSource) {
+         return srcFs.delete(src, true);
+       } else {
+         return true;
+       }
+     }
+  }
+  final private static PathFilter DEFAULT_FILTER = new PathFilter() {
+    public boolean accept(final Path file) {
+      return true;
+    }
+  };
+  
+  /* A class that could decide if a string matches the glob or not */
+  private static class GlobFilter implements PathFilter {
+    private PathFilter userFilter = DEFAULT_FILTER;
+    private Pattern regex;
+    private boolean hasPattern = false;
+      
+    /** Default pattern character: Escape any special meaning. */
+    private static final char  PAT_ESCAPE = '\\';
+    /** Default pattern character: Any single character. */
+    private static final char  PAT_ANY = '.';
+    /** Default pattern character: Character set close. */
+    private static final char  PAT_SET_CLOSE = ']';
+      
+    GlobFilter() {
+    }
+      
+    GlobFilter(final String filePattern) throws IOException {
+      setRegex(filePattern);
+    }
+      
+    GlobFilter(final String filePattern, final PathFilter filter) throws IOException {
+      userFilter = filter;
+      setRegex(filePattern);
+    }
+      
+    private boolean isJavaRegexSpecialChar(char pChar) {
+      return pChar == '.' || pChar == '$' || pChar == '(' || pChar == ')' ||
+             pChar == '|' || pChar == '+';
+    }
+    
+    void setRegex(String filePattern) throws IOException {
+      int len;
+      int setOpen;
+      int curlyOpen;
+      boolean setRange;
+
+      StringBuilder fileRegex = new StringBuilder();
+
+      // Validate the pattern
+      len = filePattern.length();
+      if (len == 0)
+        return;
+
+      setOpen = 0;
+      setRange = false;
+      curlyOpen = 0;
+
+      for (int i = 0; i < len; i++) {
+        char pCh;
+          
+        // Examine a single pattern character
+        pCh = filePattern.charAt(i);
+        if (pCh == PAT_ESCAPE) {
+          fileRegex.append(pCh);
+          i++;
+          if (i >= len)
+            error("An escaped character does not present", filePattern, i);
+          pCh = filePattern.charAt(i);
+        } else if (isJavaRegexSpecialChar(pCh)) {
+          fileRegex.append(PAT_ESCAPE);
+        } else if (pCh == '*') {
+          fileRegex.append(PAT_ANY);
+          hasPattern = true;
+        } else if (pCh == '?') {
+          pCh = PAT_ANY;
+          hasPattern = true;
+        } else if (pCh == '{') {
+          fileRegex.append('(');
+          pCh = '(';
+          curlyOpen++;
+          hasPattern = true;
+        } else if (pCh == ',' && curlyOpen > 0) {
+          fileRegex.append(")|");
+          pCh = '(';
+        } else if (pCh == '}' && curlyOpen > 0) {
+          // End of a group
+          curlyOpen--;
+          fileRegex.append(")");
+          pCh = ')';
+        } else if (pCh == '[' && setOpen == 0) {
+          setOpen++;
+          hasPattern = true;
+        } else if (pCh == '^' && setOpen > 0) {
+        } else if (pCh == '-' && setOpen > 0) {
+          // Character set range
+          setRange = true;
+        } else if (pCh == PAT_SET_CLOSE && setRange) {
+          // Incomplete character set range
+          error("Incomplete character set range", filePattern, i);
+        } else if (pCh == PAT_SET_CLOSE && setOpen > 0) {
+          // End of a character set
+          if (setOpen < 2)
+            error("Unexpected end of set", filePattern, i);
+          setOpen = 0;
+        } else if (setOpen > 0) {
+          // Normal character, or the end of a character set range
+          setOpen++;
+          setRange = false;
+        }
+        fileRegex.append(pCh);
+      }
+        
+      // Check for a well-formed pattern
+      if (setOpen > 0 || setRange || curlyOpen > 0) {
+        // Incomplete character set or character range
+        error("Expecting set closure character or end of range, or }", 
+            filePattern, len);
+      }
+      regex = Pattern.compile(fileRegex.toString());
+    }
+      
+    boolean hasPattern() {
+      return hasPattern;
+    }
+      
+    public boolean accept(final Path path) {
+      return regex.matcher(path.getName()).matches() && userFilter.accept(path);
+    }
+      
+    private void error(final String s, final String pattern, final int pos)
+                                                        throws IOException {
+      throw new IOException("Illegal file pattern: "
+                            +s+ " for glob "+ pattern + " at " + pos);
+    }
+  }
+  
+  //
+  // Check destionation. Throw IOException if destination already exists
+  // and overwrite is not true
+  //
+  private static void checkDest(String srcName, FileSystem dstFS, Path dst,
+       boolean overwrite) throws IOException {
+   if (dstFS.exists(dst)) {
+     FileStatus sdst = dstFS.getFileStatus(dst);
+     if (sdst.isDir()) {
+       // TBD not very clear
+      if (null == srcName) {
+         throw new IOException("Target " + dst + " is a directory");
+       }
+       checkDest(null, dstFS, new Path(dst, srcName), overwrite);
+     } else if (!overwrite) {
+       throw new IOException("Target " + dst + " already exists");
+     }
+   }
+ }
+   
+   //
+  // If the destination is a subdirectory of the source, then
+  // generate exception
+  //
+  private static void checkDependencies(FileSystem srcFS, 
+                                         Path src, 
+                                         FileSystem dstFS, 
+                                         Path dst)
+                                         throws IOException {
+   if (srcFS == dstFS) {
+     String srcq = src.makeQualified(srcFS).toString() + Path.SEPARATOR;
+     String dstq = dst.makeQualified(dstFS).toString() + Path.SEPARATOR;
+     if (dstq.startsWith(srcq)) {
+       if (srcq.length() == dstq.length()) {
+         throw new IOException("Cannot copy " + src + " to itself.");
+       } else {
+         throw new IOException("Cannot copy " + src + " to its subdirectory " +
+                               dst);
+       }
+     }
+   }
+ }
+}
\ No newline at end of file

Property changes on: src/java/org/apache/hadoop/fs/FileContext.java
___________________________________________________________________
Added: svn:mime-type
   + text/plain

Index: src/java/org/apache/hadoop/fs/Path.java
===================================================================
--- src/java/org/apache/hadoop/fs/Path.java	(revision 811216)
+++ src/java/org/apache/hadoop/fs/Path.java	(working copy)
@@ -126,6 +126,13 @@
     initialize(scheme, authority, path);
   }
 
+  /**
+   * Construct a path from a URI
+   */
+  public Path(URI aUri) {
+    uri = aUri;
+  }
+  
   /** Construct a Path from components. */
   public Path(String scheme, String authority, String path) {
     checkPathArg( path );
@@ -175,10 +182,17 @@
     return FileSystem.get(this.toUri(), conf);
   }
 
+  /**
+   *  True if the path component (i.e. directory) of this path is absolute.
+   */
+  public boolean isPathComponentAbsolute() {
+    int start = hasWindowsDrive(uri.getPath(), true) ? 3 : 0;
+    return uri.getPath().startsWith(SEPARATOR, start);
+   }
+  
   /** True if the directory of this path is absolute. */
   public boolean isAbsolute() {
-    int start = hasWindowsDrive(uri.getPath(), true) ? 3 : 0;
-    return uri.getPath().startsWith(SEPARATOR, start);
+     return isPathComponentAbsolute();
   }
 
   /** Returns the final component of this path.*/
@@ -265,29 +279,41 @@
     return depth;
   }
 
+  
+  /**
+   *  Returns a qualified path object.
+   *  
+   *  Deprecated - use {@link #makeQualified(URI, Path)}
+   */
+ 
+  @Deprecated 
+  public Path makeQualified(FileSystem fs) {
+    return makeQualified(fs.getUri(), fs.getWorkingDirectory());
+  }
+  
+  
   /** Returns a qualified path object. */
-  public Path makeQualified(FileSystem fs) {
+  public Path makeQualified(URI defaultUri, Path workingDir ) {
     Path path = this;
     if (!isAbsolute()) {
-      path = new Path(fs.getWorkingDirectory(), this);
+      path = new Path(workingDir, this);
     }
 
     URI pathUri = path.toUri();
-    URI fsUri = fs.getUri();
       
     String scheme = pathUri.getScheme();
     String authority = pathUri.getAuthority();
 
     if (scheme != null &&
-        (authority != null || fsUri.getAuthority() == null))
+        (authority != null || defaultUri.getAuthority() == null))
       return path;
 
     if (scheme == null) {
-      scheme = fsUri.getScheme();
+      scheme = defaultUri.getScheme();
     }
 
     if (authority == null) {
-      authority = fsUri.getAuthority();
+      authority = defaultUri.getAuthority();
       if (authority == null) {
         authority = "";
       }
Index: src/java/org/apache/hadoop/fs/FsConfig.java
===================================================================
--- src/java/org/apache/hadoop/fs/FsConfig.java	(revision 0)
+++ src/java/org/apache/hadoop/fs/FsConfig.java	(revision 0)
@@ -0,0 +1,105 @@
+package org.apache.hadoop.fs;
+
+import java.net.URI;
+
+import org.apache.hadoop.conf.Configuration;
+
+/** 
+ * This class is thin layer to manage the FS related keys in
+ * a configuration object.
+ * It provides convenience static method to set and get the keys from a 
+ * a configuration.
+ * 
+ * An instance object of FsConfig can be used to store and use the Fs config
+ * variables.
+ * 
+ * NOTE: setting a key in a config does not set that key to be the default
+ * for your application; it merely sets the key in the config object.
+ * To set the default for FS use the {@link Files} class.
+ *
+ */
+
+class FsConfig {
+  
+  // Configuration keys  and default values in the config file
+  // TBD note we should deprecate the keys constants elsewhere
+  
+  
+  // The Keys
+  static final String FS_DEFAULT_NAME_KEY = "fs.default.name";
+  static final String FS_HOME_DIR_KEY = "fs.homeDir";
+  static final String FS_REPLICATION_FACTOR_KEY = "dfs.replication";
+  static final String FS_BLOCK_SIZE_KEY = "dfs.block.size";
+  static final String IO_BUFFER_SIZE_KEY ="io.file.buffer.size";
+
+
+  // The default values
+  // Default values of SERVER_DEFAULT(-1) implies use the ones from
+  // the target file system where files are created.
+  static final String FS_DEFAULT_NAME = "file:///";
+  static final String FS_HOME_DIR = "/user"; // relative to FS_DEFAULT
+  static final short FS_DEFAULT_REPLICATION_FACTOR = 3;
+  static final long FS_DEFAULT_BLOCK_SIZE = 32 * 1024 * 1024;
+  static final int IO_BUFFER_SIZE =4096;
+  
+  
+  /**
+   * In addition the fs config has keys for the impl of different
+   * file systems of the form:
+   *    fs.<scheme>.impl
+   *    The value is a string specifying the class name.
+   */
+  
+  
+  public static String getDefaultFS(final Configuration conf) {
+    return conf.get(FS_DEFAULT_NAME_KEY, FS_DEFAULT_NAME);
+  }
+  
+  public static String getHomeDir(final Configuration conf) {
+    return conf.get(FS_HOME_DIR_KEY, FS_HOME_DIR);
+  }
+  
+  public static short getDefaultReplicationFactor(final Configuration conf) {
+    return (short) 
+        conf.getInt(FS_REPLICATION_FACTOR_KEY, FS_DEFAULT_REPLICATION_FACTOR);
+  }
+  
+  public static long getDefaultBlockSize(final Configuration conf) {
+    return conf.getLong(FS_BLOCK_SIZE_KEY, FS_DEFAULT_BLOCK_SIZE);
+  }
+
+  
+  public static int getDefaultIOBuffersize(final Configuration conf) {
+    return conf.getInt(IO_BUFFER_SIZE_KEY, IO_BUFFER_SIZE);
+  }
+  
+  public static Class<?> getImplClass(URI uri, Configuration conf) {
+    return conf.getClass("fs." + uri.getScheme() + ".impl", null);
+  }
+
+  
+  /**
+   * The Setters: see the note on the javdoc for the class above.
+   */
+
+  public static void setDefaultFS(final Configuration conf, String uri) {
+    conf.set(FS_DEFAULT_NAME_KEY, uri);
+  }
+  
+  public static void setHomeDir(final Configuration conf, String path) {
+    conf.set(FS_HOME_DIR_KEY, path);
+  }
+  
+  public static void setDefaultReplicationFactor(final Configuration conf, short rf) {
+        conf.setInt(FS_REPLICATION_FACTOR_KEY, rf);
+  }
+  
+  public static void setDefaultBlockSize(final Configuration conf, long bs) {
+    conf.setLong(FS_BLOCK_SIZE_KEY, bs);
+  }
+  
+  
+  public static void setDefaultIOBuffersize(final Configuration conf, int bs) {
+     conf.setInt(IO_BUFFER_SIZE_KEY, bs);
+  }
+}

Property changes on: src/java/org/apache/hadoop/fs/FsConfig.java
___________________________________________________________________
Added: svn:mime-type
   + text/plain

