Index: src/test/core/org/apache/hadoop/fs/FilesContextBaseTest.java
===================================================================
--- src/test/core/org/apache/hadoop/fs/FilesContextBaseTest.java	(revision 0)
+++ src/test/core/org/apache/hadoop/fs/FilesContextBaseTest.java	(revision 0)
@@ -0,0 +1,488 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs;
+
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.util.EnumSet;
+
+import junit.framework.TestCase;
+
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.permission.FsPermission;
+
+/**
+ * <p>
+ * A collection of tests for the {@link FileContext}.
+ * This test should be used for testing an instance of FileContext
+ *  that has been initialized to a specific default FileSystem such a
+ *  LocalFileSystem, HDFS,S3, etc.
+ * </p>
+ * <p>
+ * To test a given {@link FileSystem} implementation create a subclass of this
+ * test and override {@link #setUp()} to initialize the <code>fc</code> 
+ * {@link FileContext} instance variable.
+ * </p>
+ */
+public abstract class FilesContextBaseTest extends TestCase {
+  
+  protected FileContext fc;
+  private byte[] data = new byte[getBlockSize() * 2]; // two blocks of data
+  {
+    for (int i = 0; i < data.length; i++) {
+      data[i] = (byte) (i % 10);
+    }
+  }
+  
+  @Override
+  protected void tearDown() throws Exception {
+    fc.delete(path("/test"), true);
+  }
+  
+  protected int getBlockSize() {
+    return 1024;
+  }
+  
+  protected Path getDefaultWorkingDirectory() throws IOException {
+    return path("/user/" + System.getProperty("user.name")).makeQualified(
+              fc.getDefaultFileSystem().getUri(), fc.getWorkingDirectory());
+  }
+
+  protected boolean renameSupported() {
+    return true;
+  }
+
+
+  public void testFsStatus() throws Exception {
+    FsStatus fsStatus = fc.getFSStatus();
+    assertNotNull(fsStatus);
+    //used, free and capacity are non-negative longs
+    assertTrue(fsStatus.getUsed() >= 0);
+    assertTrue(fsStatus.getRemaining() >= 0);
+    assertTrue(fsStatus.getCapacity() >= 0);
+  }
+  
+  public void testWorkingDirectory() throws Exception {
+
+    Path workDir = getDefaultWorkingDirectory();
+    assertEquals(workDir, fc.getWorkingDirectory());
+
+    fc.setWorkingDirectory(path("."));
+    assertEquals(workDir, fc.getWorkingDirectory());
+
+    fc.setWorkingDirectory(path(".."));
+    assertEquals(workDir.getParent(), fc.getWorkingDirectory());
+
+    Path relativeDir = path("hadoop");
+    fc.setWorkingDirectory(relativeDir);
+    assertEquals(relativeDir, fc.getWorkingDirectory());
+    
+    Path absoluteDir = path("/test/hadoop");
+    fc.setWorkingDirectory(absoluteDir);
+    assertEquals(absoluteDir, fc.getWorkingDirectory());
+
+  }
+  
+  public void testMkdirs() throws Exception {
+    Path testDir = path("/test/hadoop");
+    assertFalse(fc.exists(testDir));
+    assertFalse(fc.isFile(testDir));
+
+    assertTrue(fc.mkdirs(testDir, FsPermission.getDefault()));
+
+    assertTrue(fc.exists(testDir));
+    assertFalse(fc.isFile(testDir));
+    
+    assertTrue(fc.mkdirs(testDir, FsPermission.getDefault()));
+
+    assertTrue(fc.exists(testDir));
+    assertFalse(fc.isFile(testDir));
+
+    Path parentDir = testDir.getParent();
+    assertTrue(fc.exists(parentDir));
+    assertFalse(fc.isFile(parentDir));
+
+    Path grandparentDir = parentDir.getParent();
+    assertTrue(fc.exists(grandparentDir));
+    assertFalse(fc.isFile(grandparentDir));
+    
+  }
+  
+  public void testMkdirsFailsForSubdirectoryOfExistingFile() throws Exception {
+    Path testDir = path("/test/hadoop");
+    assertFalse(fc.exists(testDir));
+    assertTrue(fc.mkdirs(testDir, FsPermission.getDefault()));
+    assertTrue(fc.exists(testDir));
+    
+    createFile(path("/test/hadoop/file"));
+    
+    Path testSubDir = path("/test/hadoop/file/subdir");
+    try {
+      fc.mkdirs(testSubDir, FsPermission.getDefault());
+      fail("Should throw IOException.");
+    } catch (IOException e) {
+      // expected
+    }
+    assertFalse(fc.exists(testSubDir));
+    
+    Path testDeepSubDir = path("/test/hadoop/file/deep/sub/dir");
+    try {
+      fc.mkdirs(testDeepSubDir, FsPermission.getDefault());
+      fail("Should throw IOException.");
+    } catch (IOException e) {
+      // expected
+    }
+    assertFalse(fc.exists(testDeepSubDir));
+    
+  }
+  
+  public void testGetFileStatusThrowsExceptionForNonExistentFile() 
+    throws Exception {
+    try {
+      fc.getFileStatus(path("/test/hadoop/file"));
+      fail("Should throw FileNotFoundException");
+    } catch (FileNotFoundException e) {
+      // expected
+    }
+  } 
+  
+  public void testListStatusThrowsExceptionForNonExistentFile() throws Exception {
+    try {
+      fc.listStatus(path("/test/hadoop/file"));
+      fail("Should throw FileNotFoundException");
+    } catch (FileNotFoundException fnfe) {
+      // expected
+    }
+  }
+  
+  public void testListStatus() throws Exception {
+    Path[] testDirs = { path("/test/hadoop/a"),
+                        path("/test/hadoop/b"),
+                        path("/test/hadoop/c/1"), };
+    assertFalse(fc.exists(testDirs[0]));
+
+    for (Path path : testDirs) {
+      assertTrue(fc.mkdirs(path, FsPermission.getDefault()));
+    }
+
+    FileStatus[] paths = fc.listStatus(path("/test"));
+    assertEquals(1, paths.length);
+    assertEquals(path("/test/hadoop"), paths[0].getPath());
+
+    paths = fc.listStatus(path("/test/hadoop"));
+    assertEquals(3, paths.length);
+    assertEquals(path("/test/hadoop/a"), paths[0].getPath());
+    assertEquals(path("/test/hadoop/b"), paths[1].getPath());
+    assertEquals(path("/test/hadoop/c"), paths[2].getPath());
+
+    paths = fc.listStatus(path("/test/hadoop/a"));
+    assertEquals(0, paths.length);
+  }
+  
+  public void testWriteReadAndDeleteEmptyFile() throws Exception {
+    writeReadAndDelete(0);
+  }
+
+  public void testWriteReadAndDeleteHalfABlock() throws Exception {
+    writeReadAndDelete(getBlockSize() / 2);
+  }
+
+  public void testWriteReadAndDeleteOneBlock() throws Exception {
+    writeReadAndDelete(getBlockSize());
+  }
+  
+  public void testWriteReadAndDeleteOneAndAHalfBlocks() throws Exception {
+    writeReadAndDelete(getBlockSize() + (getBlockSize() / 2));
+  }
+  
+  public void testWriteReadAndDeleteTwoBlocks() throws Exception {
+    writeReadAndDelete(getBlockSize() * 2);
+  }
+  
+  private void writeReadAndDelete(int len) throws IOException {
+    Path path = path("/test/hadoop/file");
+    
+    fc.mkdirs(path.getParent(), FsPermission.getDefault());
+
+    FSDataOutputStream out = fc.create(path, FsPermission.getDefault(),
+        EnumSet.of(CreateFlag.CREATE), FileContext.CONFIG_DEFAULT,
+        (short) 1, getBlockSize(), null);
+    out.write(data, 0, len);
+    out.close();
+
+    assertTrue("Exists", fc.exists(path));
+    assertEquals("Length", len, fc.getFileStatus(path).getLen());
+
+    FSDataInputStream in = fc.open(path, FileContext.CONFIG_DEFAULT);
+    byte[] buf = new byte[len];
+    in.readFully(0, buf);
+    in.close();
+
+    assertEquals(len, buf.length);
+    for (int i = 0; i < buf.length; i++) {
+      assertEquals("Position " + i, data[i], buf[i]);
+    }
+    
+    assertTrue("Deleted", fc.delete(path, false));
+    
+    assertFalse("No longer exists", fc.exists(path));
+
+  }
+  
+  public void testOverwrite() throws IOException {
+    Path path = path("/test/hadoop/file");
+    
+    fc.mkdirs(path.getParent(), FsPermission.getDefault());
+
+    createFile(path);
+    
+    assertTrue("Exists", fc.exists(path));
+    assertEquals("Length", data.length, fc.getFileStatus(path).getLen());
+    
+    try {
+      fc.create(path, FsPermission.getDefault(), EnumSet.of(CreateFlag.CREATE));
+      fail("Should throw IOException.");
+    } catch (IOException e) {
+      // Expected
+    }
+    
+    FSDataOutputStream out = fc.create(path, FsPermission.getDefault(),
+        EnumSet.of(CreateFlag.OVERWRITE));
+    out.write(data, 0, data.length);
+    out.close();
+    
+    assertTrue("Exists", fc.exists(path));
+    assertEquals("Length", data.length, fc.getFileStatus(path).getLen());
+    
+  }
+  
+  public void testWriteInNonExistentDirectory() throws IOException {
+    Path path = path("/test/hadoop/file");
+    assertFalse("Parent doesn't exist", fc.exists(path.getParent()));
+    createFile(path);
+    
+    assertTrue("Exists", fc.exists(path));
+    assertEquals("Length", data.length, fc.getFileStatus(path).getLen());
+    assertTrue("Parent exists", fc.exists(path.getParent()));
+  }
+
+  public void testDeleteNonExistentFile() throws IOException {
+    Path path = path("/test/hadoop/file");    
+    assertFalse("Doesn't exist", fc.exists(path));
+    assertFalse("No deletion", fc.delete(path, true));
+  }
+  
+  public void testDeleteRecursively() throws IOException {
+    Path dir = path("/test/hadoop");
+    Path file = path("/test/hadoop/file");
+    Path subdir = path("/test/hadoop/subdir");
+    
+    createFile(file);
+    assertTrue("Created subdir", fc.mkdirs(subdir, FsPermission.getDefault()));
+    
+    assertTrue("File exists", fc.exists(file));
+    assertTrue("Dir exists", fc.exists(dir));
+    assertTrue("Subdir exists", fc.exists(subdir));
+    
+    try {
+      fc.delete(dir, false);
+      fail("Should throw IOException.");
+    } catch (IOException e) {
+      // expected
+    }
+    assertTrue("File still exists", fc.exists(file));
+    assertTrue("Dir still exists", fc.exists(dir));
+    assertTrue("Subdir still exists", fc.exists(subdir));
+    
+    assertTrue("Deleted", fc.delete(dir, true));
+    assertFalse("File doesn't exist", fc.exists(file));
+    assertFalse("Dir doesn't exist", fc.exists(dir));
+    assertFalse("Subdir doesn't exist", fc.exists(subdir));
+  }
+  
+  public void testDeleteEmptyDirectory() throws IOException {
+    Path dir = path("/test/hadoop");
+    assertTrue(fc.mkdirs(dir, FsPermission.getDefault()));
+    assertTrue("Dir exists", fc.exists(dir));
+    assertTrue("Deleted", fc.delete(dir, false));
+    assertFalse("Dir doesn't exist", fc.exists(dir));
+  }
+  
+  public void testRenameNonExistentPath() throws Exception {
+    if (!renameSupported()) return;
+    Path src = path("/test/hadoop/NonExistingPath");
+    Path dst = path("/test/new/newpath");
+    try {
+      boolean res = fc.rename(src, dst);
+      System.out.println("Rename of non-existing file returned " + res);
+      assertTrue("rename of non existing path should have failed", false);
+    } catch (Exception e) {
+      // expected
+    }
+  }
+
+  public void testRenameFileMoveToNonExistentDirectory() throws Exception {
+    if (!renameSupported()) return;
+    
+    Path src = path("/test/hadoop/file");
+    createFile(src);
+    Path dst = path("/test/NonExisting/foo");
+    rename(src, dst, true, true, false);
+  }
+
+  public void testRenameFileMoveToExistingDirectory() throws Exception {
+    if (!renameSupported()) return;
+    
+    Path src = path("/test/hadoop/file");
+    createFile(src);
+    Path dst = path("/test/new/newfile");
+    fc.mkdirs(dst.getParent(), FsPermission.getDefault());
+    rename(src, dst, true, false, true);
+  }
+
+  public void testRenameFileAsExistingFile() throws Exception {
+    if (!renameSupported()) return;
+    
+    Path src = path("/test/hadoop/file");
+    createFile(src);
+    Path dst = path("/test/new/newfile");
+    createFile(dst);
+    rename(src, dst, false, true, true);
+  }
+
+  public void testRenameFileAsExistingDirectory() throws Exception {
+    if (!renameSupported()) return;
+    
+    Path src = path("/test/hadoop/file");
+    createFile(src);
+    Path dst = path("/test/new/newdir");
+    fc.mkdirs(dst, FsPermission.getDefault());
+    rename(src, dst, true, false, true);
+    assertTrue("Destination changed",
+        fc.exists(path("/test/new/newdir/file")));
+  }
+  
+  public void testRenameDirectoryMoveToNonExistentDirectory() 
+    throws Exception {
+    if (!renameSupported()) return;
+    
+    Path src = path("/test/hadoop/dir");
+    fc.mkdirs(src, FsPermission.getDefault());
+    Path dst = path("/test/new/newdir");
+    rename(src, dst, true, true, false);
+  }
+  
+  public void testRenameDirectoryMoveToExistingDirectory() throws Exception {
+    if (!renameSupported()) return;
+    
+    Path src = path("/test/hadoop/dir");
+    fc.mkdirs(src, FsPermission.getDefault());
+    createFile(path("/test/hadoop/dir/file1"));
+    createFile(path("/test/hadoop/dir/subdir/file2"));
+    
+    Path dst = path("/test/new/newdir");
+    fc.mkdirs(dst.getParent(), FsPermission.getDefault());
+    rename(src, dst, true, false, true);
+    
+    assertFalse("Nested file1 exists",
+        fc.exists(path("/test/hadoop/dir/file1")));
+    assertFalse("Nested file2 exists",
+        fc.exists(path("/test/hadoop/dir/subdir/file2")));
+    assertTrue("Renamed nested file1 exists",
+        fc.exists(path("/test/new/newdir/file1")));
+    assertTrue("Renamed nested exists",
+        fc.exists(path("/test/new/newdir/subdir/file2")));
+  }
+  
+  public void testRenameDirectoryAsExistingFile() throws Exception {
+    if (!renameSupported()) return;
+    
+    Path src = path("/test/hadoop/dir");
+    fc.mkdirs(src, FsPermission.getDefault());
+    Path dst = path("/test/new/newfile");
+    createFile(dst);
+    rename(src, dst, false, true, true);
+  }
+  
+  public void testRenameDirectoryAsExistingDirectory() throws Exception {
+    if (!renameSupported()) return;
+    
+    Path src = path("/test/hadoop/dir");
+    fc.mkdirs(src, FsPermission.getDefault());
+    createFile(path("/test/hadoop/dir/file1"));
+    createFile(path("/test/hadoop/dir/subdir/file2"));
+    
+    Path dst = path("/test/new/newdir");
+    fc.mkdirs(dst, FsPermission.getDefault());
+    rename(src, dst, true, false, true);
+    assertTrue("Destination changed",
+        fc.exists(path("/test/new/newdir/dir")));    
+    assertFalse("Nested file1 exists",
+        fc.exists(path("/test/hadoop/dir/file1")));
+    assertFalse("Nested file2 exists",
+        fc.exists(path("/test/hadoop/dir/subdir/file2")));
+    assertTrue("Renamed nested file1 exists",
+        fc.exists(path("/test/new/newdir/dir/file1")));
+    assertTrue("Renamed nested exists",
+        fc.exists(path("/test/new/newdir/dir/subdir/file2")));
+  }
+
+  public void testInputStreamClosedTwice() throws IOException {
+    //HADOOP-4760 according to Closeable#close() closing already-closed 
+    //streams should have no effect. 
+    Path src = path("/test/hadoop/file");
+    createFile(src);
+    FSDataInputStream in = fc.open(src, FileContext.CONFIG_DEFAULT);
+    in.close();
+    in.close();
+  }
+  
+  public void testOutputStreamClosedTwice() throws IOException {
+    //HADOOP-4760 according to Closeable#close() closing already-closed 
+    //streams should have no effect. 
+    Path src = path("/test/hadoop/file");
+    FSDataOutputStream out = fc.create(src, FsPermission.getDefault(),
+        EnumSet.of(CreateFlag.CREATE));
+    out.writeChar('H'); //write some data
+    out.close();
+    out.close();
+  }
+  
+  protected Path path(String pathString) {
+    return fc.makeQualified(new Path(pathString));
+  }
+  
+  protected void createFile(Path path) throws IOException {
+    FSDataOutputStream out = fc.create(path, FsPermission.getDefault(),
+        EnumSet.of(CreateFlag.CREATE));
+    out.write(data, 0, data.length);
+    out.close();
+  }
+  
+  private void rename(Path src, Path dst, boolean renameShouldSucceed,
+      boolean srcExists, boolean dstExists) throws IOException {
+    assertEquals("Rename result", renameShouldSucceed, fc.rename(src, dst));
+    assertEquals("Source exists", srcExists, fc.exists(src));
+    assertEquals("Destination exists", dstExists, fc.exists(dst));
+  }
+}

Property changes on: src/test/core/org/apache/hadoop/fs/FilesContextBaseTest.java
___________________________________________________________________
Added: svn:mime-type
   + text/plain

Index: src/test/core/org/apache/hadoop/fs/TestLocalFilesContext.java
===================================================================
--- src/test/core/org/apache/hadoop/fs/TestLocalFilesContext.java	(revision 0)
+++ src/test/core/org/apache/hadoop/fs/TestLocalFilesContext.java	(revision 0)
@@ -0,0 +1,23 @@
+package org.apache.hadoop.fs;
+
+
+import java.io.IOException;
+
+import org.apache.hadoop.conf.Configuration;
+import org.junit.Before;
+
+public class TestLocalFilesContext extends FilesContextBaseTest {
+
+  @Before
+  @Override
+  protected void setUp() throws Exception {
+    fc = FileContext.getLocalFSFileContext();
+  }
+  
+  static Path wd = null;
+  protected Path getDefaultWorkingDirectory() throws IOException {
+    if (wd == null)
+      wd = FileSystem.getLocal(new Configuration()).getWorkingDirectory();
+    return wd;
+  }
+}

Property changes on: src/test/core/org/apache/hadoop/fs/TestLocalFilesContext.java
___________________________________________________________________
Added: svn:mime-type
   + text/plain

Index: src/java/org/apache/hadoop/fs/FileSystem.java
===================================================================
--- src/java/org/apache/hadoop/fs/FileSystem.java	(revision 809454)
+++ src/java/org/apache/hadoop/fs/FileSystem.java	(working copy)
@@ -1141,6 +1141,23 @@
    * @return the directory pathname
    */
   public abstract Path getWorkingDirectory();
+  
+  
+  /**
+   * Note: with the new FilesContext class, getWorkingDirectory()
+   * will be removed. 
+   * The working directory is implemented in FilesContext.
+   * 
+   * Some file systems like LocalFileSystem have an initial workingDir
+   * that we use as the starting workingDir. For other file systems
+   * like HDFS there is no built in notion of an inital workingDir.
+   * 
+   * @return if there is built in notion of workingDir then it
+   * is returned; else a null is returned.
+   */
+  protected Path getInitialWorkingDirectory() {
+    return null;
+  }
 
   /**
    * Call {@link #mkdirs(Path, FsPermission)} with default permission.
Index: src/java/org/apache/hadoop/fs/FilterFileSystem.java
===================================================================
--- src/java/org/apache/hadoop/fs/FilterFileSystem.java	(revision 809454)
+++ src/java/org/apache/hadoop/fs/FilterFileSystem.java	(working copy)
@@ -167,7 +167,11 @@
   public Path getWorkingDirectory() {
     return fs.getWorkingDirectory();
   }
-
+  
+  protected Path getInitialWorkingDirectory() {
+    return fs.getInitialWorkingDirectory();
+  }
+  
   /** {@inheritDoc} */
   @Override
   public FsStatus getStatus(Path p) throws IOException {
Index: src/java/org/apache/hadoop/fs/RawLocalFileSystem.java
===================================================================
--- src/java/org/apache/hadoop/fs/RawLocalFileSystem.java	(revision 809454)
+++ src/java/org/apache/hadoop/fs/RawLocalFileSystem.java	(working copy)
@@ -46,7 +46,7 @@
   private Path workingDir;
   
   public RawLocalFileSystem() {
-    workingDir = new Path(System.getProperty("user.dir")).makeQualified(this);
+    workingDir = getInitialWorkingDirectory();
   }
   
   /** Convert a path to a File. */
@@ -339,6 +339,11 @@
   public Path getWorkingDirectory() {
     return workingDir;
   }
+  
+  @Override
+  protected Path getInitialWorkingDirectory() {
+    return new Path(System.getProperty("user.dir")).makeQualified(this);
+  }
 
   /** {@inheritDoc} */
   @Override
Index: src/java/org/apache/hadoop/fs/FileContext.java
===================================================================
--- src/java/org/apache/hadoop/fs/FileContext.java	(revision 0)
+++ src/java/org/apache/hadoop/fs/FileContext.java	(revision 0)
@@ -0,0 +1,1245 @@
+package org.apache.hadoop.fs;
+
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.net.URI;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.EnumSet;
+import java.util.List;
+import java.util.regex.Pattern;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.permission.FsPermission;
+import org.apache.hadoop.util.Progressable;
+
+/**
+ * The FileContext class provides an interface to the application writer for
+ * using the Hadoop file system.
+ * It provides a set of static methods for the usual operation: create, open, 
+ * list, etc 
+ * 
+ * *** Path Names ***
+ * 
+ * The Hadoop file system supports a URI name space and URI names.
+ *   TBD - explain more here.
+ * Two common Hadoop file systems implementations are
+ *   the local file system: file:///path
+ *   the hdfs file system hdfs://nnAddress:nnPort/path
+ *   
+ * While using URI names is very flexible, it requires knowing the name or address
+ * of the server. For convenience one often wants to access the default system
+ * in your environment; to facilitate this Hadoop supports a notion of a
+ * default file system. The user can set what his default file system, although
+ * this is typically set up for you in your enviroment in your default config.
+ * A default filesystem imples a  default scheme and authority;
+ * slash-relative names (such as /for/bar) are resolved relative to that default FS.
+ * Similarly a user can also have working-directory-relative names (i.e. names
+ * not starting with a slash). While the working directory is generally in the
+ * same default FS, the wd can be in a different FS; in particular, changing
+ * the default file system DOES NOT change the working directory,
+ * 
+ *  Hence Hadoop path names can be one of:
+ *       fully qualified URI:  scheme://authority/path
+ *       slash relative names: /path    - relative to the default file system
+ *       wd-relative names:    path        - relative to the working dir
+ *       
+ *       Relative paths with scheme (scheme:foo/bar) are illegal
+ *  
+ *  
+ *  
+ *  *** Configuration and defaults: ***
+ *  The default configuration is obtained from the application config
+ *  (@see xxx for details).
+ *  The config contains defaults for
+ *    - default file system
+ *    - the home directory (default is "/user/<userName>")
+ *    - replication factor
+ *    - block size
+ *    - buffer size
+ *
+ * These defaults can be overridden by specifying the parameters as part of the
+ * the specific methods (such as create) or by setting them via the appropriate
+ * setDefaultXX methods below.
+ * 
+ * *** Usage Model for the File class ***
+ * 
+ * Example 1: use the default config read from the $HADOOP_CONFIG/core.xml.
+ *            Unspecified values come from core-defaults.xml in the release jar.
+ *            
+ *     myFiles = getFileContext(); // uses the default config 
+ *     myFiles.create(path, ...);
+ *     myFiles.setWorkingDir(path)
+ *     myFiles.open (path, ...);
+ *     myFiles.setDefaultReplicationFactor(..); // Can change some defaults     
+ *     
+ * Example 2: Use a specific config, ignoring $HADOOP_CONFIG
+ *    configX = someConfigSomeOnePassedToYou.
+ *    myFiles = getFileContext(theConfig); // Copies configX (not a pointer) 
+ *    myFiles.create(path, ...);
+ *    myFiles.setWorkingDir(path)
+ *    myFiles.setDefaultReplicationFactor(..); // Can change some defaults
+ *                                             // But configX is unchanged
+ *                                             
+ * Other ways of creating new FileContexts:
+ *   getLocalFSFileContext(...)  // local filesystem is the default FS
+ *   getLocalFileContext(URI, ...) // where specified URI is default FS.
+ *    
+ */
+
+/**
+ * TBD: The methods have not been filled out yet.
+ * Also the exceptions thrown by the methods have better specified.
+ */
+public class FileContext {
+  
+  /**
+   * The config variables that define the context in which this FileContext
+   * operates
+   *
+   */   
+  private URI defaultFsURI;
+  private FileSystem defaultFS; // the default FS for this FileContext.
+  private Path workingDir;          // Fully qualified
+  private long defaultBlockSize;
+  private short defaultReplicationFactor;
+  private int defaultBufferSize;// TBD DOES THIS REALLY BELONG HERE?
+  private Configuration theConfig;
+   
+  
+  private void initConfig(final Configuration conf) throws IOException {
+    // Extract the keys relavant to the FileContext layer;
+    // However keep pointer to the rest of the config since it needs to 
+    // be passed to the lower layer FileSystem for config variables it
+    // needs.
+      theConfig = conf;
+      defaultFsURI = URI.create(FsConfig.getDefaultFS(conf));
+      defaultFS = FileSystem.get(defaultFsURI,  conf);
+      
+      /*
+       * WorkingDir is implemented at the FileContext layer 
+       * NOT at the FileSystem layer. 
+       * If the DefaultFS, such as localFilesystem has a notion of
+       *  builtin WD, we use that as the initial WD.
+       *  Otherwise the WD is initialized to the home directory.
+       */
+      workingDir = defaultFS.getInitialWorkingDirectory();
+      if (workingDir == null) {
+        workingDir = defaultFS.getHomeDirectory();
+      }
+      
+      defaultBlockSize = FsConfig.getDefaultBlockSize(conf);
+      defaultBufferSize = FsConfig.getDefaultIOBuffersize(conf);
+      defaultReplicationFactor = FsConfig.getDefaultReplicationFactor(conf);
+  }
+    
+  private Path makeAbsolute(Path f) {
+    if (f.isAbsolute()) {
+      return f;
+    } else {
+      return new Path(workingDir, f);
+    }
+  }
+
+  /**
+   * Pathnames with scheme and relative path are illegal
+   * @param path to be checked
+   * @throws IOException if of type scheme:foo/bar
+   */
+  private static void isNotSchemeWithRelative(final Path path) throws IOException {
+    if (path.toUri().isAbsolute() && !path.isPathComponentAbsolute()) {
+      // path of type scheme:foo/bar are not supported
+      throw new IllegalArgumentException(
+          "Unsupported name: has scheme but relative path-part");
+    }
+  }
+  
+  /**
+   * Get the filesystem of supplied path
+   * @param f
+   * @return the filesystem of the path
+   * @throws IOException
+   */
+  private FileSystem getFileSystem(final Path f) throws IOException {
+    isNotSchemeWithRelative(f);
+    try { 
+      // Is it the default FS for this Fils?
+      defaultFS.checkPath(f);
+      return defaultFS;
+    } catch (Exception e) { // it is different FileSystem
+      return FileSystem.get(f.toUri(), theConfig);
+    }
+  }
+  /**
+   * This constructor uses the default config read from the
+   * $HADOOP_CONFIG/core.xml,
+   * Unspecified key-values for config are defaulted from core-defaults.xml
+   * in the release jar.
+   * Unspecified key values for config are defaulted from core-defaults.xml in the release jar.
+   * 
+   * @param conf - the config used.
+   * @throws IOException  if default FileSystem in the config  cannot be created
+   */
+  public FileContext() throws IOException {
+     this(new Configuration());
+  };
+  
+  /**
+   * This constructor uses the passed config. The keys relavant to the
+   * FileContext layer are extracted at time of construction. Changes to the
+   * config after the call are ignore by the FileContext layer. 
+   * The conf is passed to lower layers like FileSystem and HDFS which
+   * pick up their own config variables. 
+   * Unspecified key values for config are defaulted from core-defaults.xml in the release jar.
+   * 
+   * @param conf - the config used.
+   * @throws IOException  if default FileSystem in the config  cannot be created
+   */
+  public FileContext(final Configuration conf) throws IOException {
+    initConfig(conf);
+    util = new Util(this); // for the inner class
+  };
+  
+
+  public static final URI LOCAL_FS_URI = URI.create("file:///");
+  
+  /**
+   * To use the server side defaults for Replication factor and block size
+   * specify SERVER_DEFAULT.
+   */
+  public static final short SERVER_DEFAULT = -1;
+  /**
+   * To use the config  defaults for buff size, Replication factor & block size
+   * specify CONFIG_DEFAULT.
+   */
+  public static final short CONFIG_DEFAULT = -2;
+  
+  public static final FsPermission defaultPerm = FsPermission.getDefault();
+  
+  
+  
+  /**
+   * Create a FileContext using the default config read from the
+   * $HADOOP_CONFIG/core.xml,
+   * Unspecified key-values for config are defaulted from core-defaults.xml
+   * in the release jar.
+   * 
+   * @throws IOException if default FileSystem in the config  cannot be created
+   */
+  public static FileContext getFileContext() throws IOException {
+    return new FileContext();
+  }
+  
+  /**
+   * Create a FileContext using the passed config.
+   * The keys relavant to the
+   * FileContext layer are extracted at time of construction. Changes to the
+   * config after the call are ignore by the FileContext layer. 
+   * The conf is passed to lower layers like FileSystem and HDFS which
+   * pick up their own config variables. 
+   * Unspecified key values for config are defaulted from core-defaults.xml
+   * in the release jar.
+   * 
+   * @param conf
+   * @return new FileContext
+   * @throws IOException  if default FileSystem in the config  cannot be created
+   */
+  public static FileContext getFileContext(final Configuration conf) throws IOException {
+    return new FileContext(conf);
+  }
+  
+  /**
+   * 
+   * @return a FileContext for the local file system using the default config.
+   * @throws IOException 
+   */
+  public static FileContext getLocalFSFileContext() throws IOException {
+    return getFileContext(LOCAL_FS_URI);
+  }
+  
+  
+  /**
+   * 
+   * @return a FileContext for the local file system using the specified config.
+   * @throws IOException 
+   */
+  public static FileContext getLocalFSFileContext(final Configuration conf)
+                                                        throws IOException {
+    return getFileContext(LOCAL_FS_URI, conf);
+  }
+  
+  /**
+   * Create a FileContext for specified URI using the default config.
+   * 
+   * @param uri
+   * @return a FileSystem for the specified URI
+   * @throws IOException if the filesysem with specified cannot be created
+   */
+  public static FileContext getFileContext(final URI uri) throws IOException {
+    FileContext fc = new FileContext();
+    fc.setDefaultFileSystem(uri);
+    return fc;
+  }
+ 
+  /**
+   * Create a FileContext for specified URI using the specified config.
+   * 
+   * @param uri
+   * @param config
+   * @return a FileSystem for the specified URI
+   * @throws IOException if the filesysem with specified cannot be created
+   */
+  public static FileContext getFileContext(final URI uri,
+                    final Configuration conf) throws IOException {
+    FileContext fc = new FileContext(conf);
+    fc.setDefaultFileSystem(uri);
+    return fc;
+  }
+
+
+  /**
+   * Slash-relative pathnames are opened relative to the default file system.
+   * 
+   * This method sets the default file system, overiding what was derived from
+   * the config file.
+   * 
+   * @param uri - the new default file system 
+   * 
+   * TBD
+   *   Q: Should the type be string or path instead?
+   *   Q: Shall we call this "setRootFileSystem" instead?
+   *   
+   * @throws IOException if FileSystem denoted by uri cannot be created
+   */
+  public void setDefaultFileSystem(final URI uri) throws IOException {
+    defaultFsURI = uri;
+    defaultFS = FileSystem.get(defaultFsURI,  theConfig);
+  }
+  
+  public FileSystem getDefaultFileSystem() {
+    return defaultFS;
+  }
+  
+  /**
+   * Set the working directory for wd-relative names (such a "foo/bar")
+   * @param p
+   * @throws IOException
+   * 
+   * newWdir can be one of 
+   *     - relative path:  "foo/bar";
+   *     - absolute without scheme: "/foo/bar"
+   *     - fully qualified with scheme: "xx://auth/foo/bar"
+   *  Illegal WDs:
+   *      - relative with scheme: "xx:foo/bar"
+   *      
+   *   Set the wd to the absolute path.
+   *   This will ensure that the defaultFS can be changed while leaving
+   *   the wd unchanged.   
+   */
+  public void setWorkingDirectory(final Path p) throws IOException {
+    isNotSchemeWithRelative(p);
+    workingDir =  new Path(new Path(defaultFsURI), p);
+  }
+  
+  
+  /**
+   * Gets the working directory for wd-relative names (such a "foo/bar")
+   */
+  public Path getWorkingDirectory() {
+    return workingDir;
+  }
+  
+  /**
+   * Sets the default block size for newly created files, overriding what
+   * was derived from the config file.
+   * 
+   * @param blksize  
+   *     value of SERVER_DEFAULT implies use the default block size of the
+   * target file server at which the file is created.
+   * 
+   * 
+   */
+  public void setDefaultBlockSize(final long blksize) {
+    if (blksize >= 1 || blksize == SERVER_DEFAULT)
+      defaultBlockSize = blksize;
+    else 
+      throw new IllegalArgumentException();
+  }
+  
+  /**
+   * Gets the default block size for newly created files.
+   */
+  public long getDefaultBlockSize() {
+    return defaultBlockSize;
+  }
+  
+  /**
+   * Sets the default replication factor newly created files, overriding what
+   * was derived from the config file.
+   * 
+   * @param repFac
+   *     value of SERVER_DEFAULT  means  use the default replication factor
+   * of the target file server at which the file is created.
+   */
+  public void setDefaultReplicationFactor(final short repFac) {
+    if (repFac >= 1 || repFac == SERVER_DEFAULT)
+      defaultReplicationFactor = repFac;
+    else 
+      throw new IllegalArgumentException();
+  }
+  
+  /**
+   * Gets the default replciation factor for newly created files.
+   */
+  public short getDefaultReplicationFactor() {
+    return defaultReplicationFactor;
+  }
+  
+  
+  /**
+   * Sets the default buffer size for streams returned by #create() and #open(),
+   *  overriding what was derived from the config file.
+   * 
+   * @param bufsize  
+   *     value of SERVER_DEFAULT implies use the default block size of the
+   * target file server at which the file is created.
+   */
+  public void setDefaultBufferSize(final int bufsize) {
+
+    if (bufsize >= 1 || bufsize == SERVER_DEFAULT)
+      defaultBufferSize = bufsize;
+    else 
+      throw new IllegalArgumentException();
+  }
+  
+  /**
+   * Gets the default buffer size for newly created files.
+   */
+  public int getDefaultBufferSize() {
+    return defaultBufferSize;
+  }
+  
+  /**
+   * Make the path fully qualified if it is isn't. Use the default file system
+   * and working dir in this FileContext to qualify.
+   * @param p
+   * @return qualifed path
+   */
+  public Path makeQualified(final Path path) {
+    return path.makeQualified(defaultFsURI, getWorkingDirectory());
+  }
+  
+  /**
+   * This method exports the default config.
+   * 
+   * The following keys are exported:
+   * 
+   * 
+   * @param exportIntoThis - the key and values of the default config are
+   *   inserted into exportIntoThis. If it contained the same keys their values
+   *   would be replaced, other wise new keys and values would be inserted.
+   */
+  public void exportConfig(Configuration exportIntoThis) {
+    FsConfig.setDefaultFS(exportIntoThis, defaultFsURI.toString());
+    FsConfig.setDefaultBlockSize(exportIntoThis, defaultBlockSize);
+    FsConfig.setDefaultReplicationFactor(exportIntoThis, defaultReplicationFactor);
+    FsConfig.setDefaultIOBuffersize(exportIntoThis, defaultBufferSize);
+  }
+  
+  
+  /**
+   * Create or overwrite file on indicated path and returns an output stream
+   *   for writing into the file.
+   * @param f the file name to open
+   * @param permission - permissions is set permission&~umask
+   * @param createFlag gives the semantics  of create: overwrite, append etc.
+   * @throws IOException
+   * 
+   * @see #setPermission(Path, FsPermission)
+   */
+  public FSDataOutputStream create(final Path f, final FsPermission permission,
+               final EnumSet<CreateFlag> createFlag) throws IOException {
+    return create(f, permission, createFlag, defaultBufferSize, 
+        getDefaultReplicationFactor(), getDefaultBlockSize(), null);
+  }
+  
+  /**
+   * Create or overwrite file on indicated path and returns an output stream
+   *   for writing into the file.
+   * @param f the file name to open
+   * @param permission - permissions is set permission&~umask
+   * @param flag gives the semantics  of create: overwrite, append etc.
+   * @param bufferSize the size of the buffer;
+   *      value of CONFIG_DEFAULT implies use default in config
+   * @param replication  block replication for file
+   *      value of SERVER_DEFAULT implies use S-Side default
+   *      value of CONFIG_DEFAULT implies use default in config
+   * @param blockSize
+   *      value of SERVER_DEFAULT implies use S-Side default
+   *      value of CONFIG_DEFAULT implies use default in config
+   * @param progress if non null, used to report progress
+   * @throws IOException
+   * 
+   * @see #setPermission(Path, FsPermission)
+   */
+  public FSDataOutputStream create(final Path f,
+                                    final FsPermission permission,
+                                    final EnumSet<CreateFlag> createFlag,
+                                    final int bufferSize,
+                                    final short replication,
+                                    final long blockSize,
+                                    final Progressable progress)
+                                                  throws IOException {
+    Path abs_f = makeAbsolute(f);
+    return getFileSystem(abs_f).create(abs_f, permission, createFlag, 
+        (bufferSize == CONFIG_DEFAULT) ? 
+            getDefaultBufferSize() : bufferSize, 
+        (replication == CONFIG_DEFAULT) ? 
+            getDefaultReplicationFactor() : replication,
+        (blockSize == CONFIG_DEFAULT) ? 
+                getDefaultBlockSize() : blockSize,
+            progress);
+  }
+  
+  /**
+   * Make the given file and all non-existent parents into
+   * directories. Has the semantics of Unix 'mkdir -p'.
+   * Existence of the directory hierarchy is not an error.
+   * 
+   * @param dir - the dir to make
+   * @param permission - permissions is set permission&~umask
+   * @return true if the operation succeeds
+   * @throws IOException
+   */
+  public boolean mkdirs(final Path dir, final FsPermission permission)
+                                                      throws IOException {
+    Path abs_dir = makeAbsolute(dir);
+    return getFileSystem(abs_dir).mkdirs(abs_dir, permission);
+  }
+
+  /** Delete a file.
+  *
+  * @param f the path to delete.
+  * @param recursive if path is a directory and set to 
+  * true, the directory is deleted else throws an exception. In
+  * case of a file the recursive can be set to either true or false. 
+  * @return  true if delete is successful else false. 
+  * @throws IOException
+  */
+  public boolean delete(final Path f, final boolean recursive) 
+                                                  throws IOException {
+    Path abs_f = makeAbsolute(f);
+    return getFileSystem(abs_f).delete(abs_f, recursive);
+  }
+ 
+  
+  /**
+   * Opens an FSDataInputStream at the indicated Path.
+   * @param f the file name to open
+   * @param bufferSize the size of the buffer to be used.
+   *     value of CONFIG_DEFAULT implies use default in config
+   */
+  public FSDataInputStream open(final Path f, final int bufferSize)
+    throws IOException {
+    final Path abs_f = makeAbsolute(f);
+      return getFileSystem(abs_f).open(abs_f, (bufferSize == CONFIG_DEFAULT) ?
+                        getDefaultBufferSize() : bufferSize);
+  }
+  
+ /**
+  * Set replication for an existing file.
+  * 
+  * @param f file name
+  * @param replication new replication
+  * @throws IOException
+  * @return true if successful;
+  *         false if file does not exist or is a directory
+  */
+  public boolean setReplication(final Path f, final short replication)
+   throws IOException {
+    final Path abs_f = makeAbsolute(f);
+    return getFileSystem(abs_f).setReplication(abs_f, replication);
+  }
+
+ /**
+  * Renames Path src to Path dst.  
+   * @param src
+   * @param dst
+   * @return true if the rename suceeds
+   * @throws IOException
+   */
+  public boolean rename(final Path src, final Path dst)
+                                                throws IOException {
+    final Path abs_src  = makeAbsolute(src);
+    final Path abs_dst = makeAbsolute(dst);
+    FileSystem srcFS = getFileSystem(abs_src);
+    FileSystem dstFS = getFileSystem(abs_dst);
+    if(srcFS.getUri().equals(dstFS.getUri())) {
+      return srcFS.rename(abs_src, abs_dst);
+    }
+    throw new IOException("Renames across FileSystems not supported");
+  }
+  
+  /**
+   * Set permission of a path.
+   * @param f
+   * @param permission
+   */
+  public void setPermission(final Path f, final FsPermission permission)
+                                                      throws IOException {
+    final Path abs_f = makeAbsolute(f);
+    getFileSystem(abs_f).setPermission(abs_f, permission);
+  }
+
+  /**
+   * Set owner of a path (i.e. a file or a directory).
+   * The parameters username and groupname cannot both be null.
+   * @param f The path
+   * @param username If it is null, the original username remains unchanged.
+   * @param groupname If it is null, the original groupname remains unchanged.
+   */
+  public void setOwner(final Path f, final String username,
+                        final String groupname) throws IOException {
+    final Path abs_f = makeAbsolute(f);
+    getFileSystem(abs_f).setOwner(abs_f, username, groupname);
+  }
+
+
+  /**
+   * Set access time of a file
+   * @param f The path
+   * @param mtime Set the modification time of this file.
+   *              The number of milliseconds since Jan 1, 1970. 
+   *              A value of -1 means that this call should not set modification time.
+   * @param atime Set the access time of this file.
+   *              The number of milliseconds since Jan 1, 1970. 
+   *              A value of -1 means that this call should not set access time.
+   */
+  public void setTimes(final Path f, final long mtime, final long atime
+                                                      ) throws IOException {
+    final Path abs_f = makeAbsolute(f);
+    getFileSystem(abs_f).setTimes(abs_f, mtime, atime);
+  }
+
+
+  /**
+   * Get the checksum of a file.
+   *
+   * @param f The file path
+   * @return The file checksum.  The default return value is null,
+   *  which indicates that no checksum algorithm is implemented
+   *  in the corresponding FileSystem.
+   */
+  public FileChecksum getFileChecksum(final Path f) throws IOException {
+    final Path abs_f = makeAbsolute(f);
+    return getFileSystem(abs_f).getFileChecksum(abs_f);
+  }
+  
+  /**
+   * Set the verify checksum flag for the default filesystem.
+   *  This is only applicable if the 
+   * corresponding FileSystem supports checksum. By default doesn't do anything.
+   * @param verifyChecksum
+   * @param f - set the verifyChecksum for the Filesystem holding this path
+   */
+  public void setVerifyChecksum(final boolean verifyChecksum) {
+    defaultFS.setVerifyChecksum(verifyChecksum);
+  }
+
+  
+  /**
+   * Set the verify checksum flag for the  filesystem denoted by the path.
+   *  This is only applicable if the 
+   * corresponding FileSystem supports checksum. By default doesn't do anything.
+   * @param verifyChecksum
+   * @param f - set the verifyChecksum for the Filesystem containing this path
+   * @throws IOException 
+   */
+  public void setVerifyChecksum(final boolean verifyChecksum, final Path f)
+                                                          throws IOException {
+    final Path abs_f = makeAbsolute(f);
+    //TBD need to be implemented when we add symlinks.
+    throw new IOException("Not implemented yet");
+  }
+
+  /**
+   * Return a file status object that represents the path.
+   * @param f The path we want information from
+   * @return a FileStatus object
+   * @throws FileNotFoundException when the path does not exist;
+   *         IOException see specific implementation
+   */
+  public FileStatus getFileStatus(final Path f) throws IOException {
+    final Path abs_f = makeAbsolute(f);
+    return getFileSystem(abs_f).getFileStatus(abs_f);
+  }
+  
+  
+  
+  /**
+   * Return block Location of the given file for the given ofgset and len
+   *  For a nonexistent 
+   * file or regions, null will be returned.
+   *
+   * This call is most helpful with DFS, where it returns 
+   * hostnames of machines that contain the given file.
+   * 
+   * @param fileStatus - get blocklocations of this file
+   * @param start
+   * @param len
+   * @return
+   * @throws IOException
+   */
+  public BlockLocation[] getFileBlockLocations(final FileStatus fileStatus, 
+    final long start, final long len) throws IOException {
+      // TBD - this is wrong - need to make absolute.
+      // Perhaps FileSystem.getBlockLocaltions needs to be fixed.
+      throw new IOException("not implemented yet");
+      //return getFileSystem(fileStatus.getPath()).getFileBlockLocations(fileStatus, start, len);
+  }
+  
+  /**
+   * Returns a status object describing the use and capacity of the
+   * default file system. If the file system has multiple partitions, the
+   * use and capacity of the root partition is reflected.
+   * 
+   * @return a FsStatus object
+   * @throws IOException
+   *           
+   */
+  public FsStatus getFSStatus() throws IOException {
+    return getDefaultFileSystem().getStatus(null);
+  }
+  
+  /**
+   * Returns a status object describing the use and capacity of the
+   * file system denoted by the Parh argument p.
+   * If the file system has multiple partitions, the
+   * use and capacity of the partition pointed to by the specified
+   * path is reflected.
+   * @param p Path for which status should be obtained. null means of the
+   * root partition of the default file system. 
+   * @return a FsStatus object
+   * @throws IOException
+   *           see specific implementation
+   */
+  public FsStatus getFSStatus(final Path f) throws IOException {
+    return getFileSystem(f).getStatus(f);
+  }
+  
+  /**
+   * Does the file exist?
+   * @param f the  file or dir to be checked
+   */
+  public boolean exists(final Path f) throws IOException {
+    final Path abs_f = makeAbsolute(f);
+    return getFileSystem(abs_f).exists(abs_f);
+  }
+
+  /**
+   * Is a directory?
+   * Note: Avoid using this method if you already have FileStatus in hand.
+   * Instead reuse the FileStatus 
+   * returned by getFileStatus() or listStatus() methods.
+   * 
+   * @param f
+   * @return True iff the named path is a directory.
+   * @throws IOException
+   */
+  public boolean isDirectory(final Path f) throws IOException {
+    try {
+      final Path abs_f = makeAbsolute(f);
+      return getFileStatus(abs_f).isDir();
+    } catch (FileNotFoundException e) {
+      return false;               // f does not exist
+    }
+  }
+
+  /** True iff the named path is a regular file.
+   * Note: Avoid using this method  if you already have FileStatus in hand
+   * Instead reuse the FileStatus 
+   * returned by getFileStatus() or listStatus() methods.
+   */
+  public boolean isFile(final Path f) throws IOException {
+    try {
+      final Path abs_f = makeAbsolute(f);
+      return !getFileStatus(abs_f).isDir();
+    } catch (FileNotFoundException e) {
+      return false;               // f does not exist
+    }
+  }
+  
+ 
+  /**
+   * List the statuses of the files/directories in the given path if the path is
+   * a directory.
+   * 
+   * @param f
+   *          given path
+   * @return the statuses of the files/directories in the given patch
+   * @throws IOException
+   */
+  public FileStatus[] listStatus(final Path f) throws IOException {
+    final Path abs_f = makeAbsolute(f);
+    return getFileSystem(abs_f).listStatus(abs_f);
+  }
+  
+  /**
+   * Utility methods built over the basic FileContext methods.
+   */
+  private Util util;
+  public Util util() {
+    return util;
+  }
+  
+  public class Util {
+    FileContext thisFC;
+    Util(FileContext fc) {
+      thisFC = fc;
+    }
+
+
+    /**
+     * Return a list of file status objects that corresponds to the list of paths
+     * excluding those non-existent paths.
+     * 
+     * @param paths
+     *          the list of paths we want information from
+     * @return a list of FileStatus objects
+     * @throws IOException
+     *           see specific implementation
+     */
+    private FileStatus[] getFileStatus(final Path[] paths) throws IOException {
+      if (paths == null) {
+        return null;
+      }
+      ArrayList<FileStatus> results = new ArrayList<FileStatus>(paths.length);
+      for (int i = 0; i < paths.length; i++) {
+        try {
+          results.add(thisFC.getFileStatus(paths[i]));
+        } catch (FileNotFoundException e) { // do nothing
+        }
+      }
+      return results.toArray(new FileStatus[results.size()]);
+    }
+    
+    /**
+     * Filter files/directories in the given list of paths using default
+     * path filter.
+     * 
+     * @param files
+     *          a list of paths
+     * @return a list of statuses for the files under the given paths after
+     *         applying the filter default Path filter
+     * @exception IOException
+     */
+    public FileStatus[] listStatus(final Path[] files)
+        throws IOException {
+      return listStatus(files, DEFAULT_FILTER);
+    }
+    
+  
+    /**
+     * Filter files/directories in the given path using the user-supplied path
+     * filter.
+     * 
+     * @param f
+     *          a path name
+     * @param filter
+     *          the user-supplied path filter
+     * @return an array of FileStatus objects for the files under the given path
+     *         after applying the filter
+     * @throws IOException
+     *           if encounter any problem while fetching the status
+     */
+    public FileStatus[] listStatus(final Path f, final PathFilter filter)
+                                                    throws IOException {
+      ArrayList<FileStatus> results = new ArrayList<FileStatus>();
+      listStatus(results, f, filter);
+      return results.toArray(new FileStatus[results.size()]);
+    }
+    
+  
+    /**
+     * Filter files/directories in the given list of paths using user-supplied
+     * path filter.
+     * 
+     * @param files
+     *          a list of paths
+     * @param filter
+     *          the user-supplied path filter
+     * @return a list of statuses for the files under the given paths after
+     *         applying the filter
+     * @exception IOException
+     */
+    public FileStatus[] listStatus(final Path[] files, final PathFilter filter)
+        throws IOException {
+      ArrayList<FileStatus> results = new ArrayList<FileStatus>();
+      for (int i = 0; i < files.length; i++) {
+        listStatus(results, files[i], filter);
+      }
+      return results.toArray(new FileStatus[results.size()]);
+    }
+  
+    /*
+     * Filter files/directories in the given path using the user-supplied path
+     * filter. Results are added to the given array <code>results</code>.
+     */
+    private void listStatus(ArrayList<FileStatus> results, final Path f,
+        PathFilter filter) throws IOException {
+      FileStatus listing[] = thisFC.listStatus(f);
+      if (listing != null) {
+        for (int i = 0; i < listing.length; i++) {
+          if (filter.accept(listing[i].getPath())) {
+            results.add(listing[i]);
+          }
+        }
+      }
+    }
+  
+    /**
+     * <p>Return all the files that match filePattern and are not checksum
+     * files. Results are sorted by their names.
+     * 
+     * <p>
+     * A filename pattern is composed of <i>regular</i> characters and
+     * <i>special pattern matching</i> characters, which are:
+     *
+     * <dl>
+     *  <dd>
+     *   <dl>
+     *    <p>
+     *    <dt> <tt> ? </tt>
+     *    <dd> Matches any single character.
+     *
+     *    <p>
+     *    <dt> <tt> * </tt>
+     *    <dd> Matches zero or more characters.
+     *
+     *    <p>
+     *    <dt> <tt> [<i>abc</i>] </tt>
+     *    <dd> Matches a single character from character set
+     *     <tt>{<i>a,b,c</i>}</tt>.
+     *
+     *    <p>
+     *    <dt> <tt> [<i>a</i>-<i>b</i>] </tt>
+     *    <dd> Matches a single character from the character range
+     *     <tt>{<i>a...b</i>}</tt>.  Note that character <tt><i>a</i></tt> must be
+     *     lexicographically less than or equal to character <tt><i>b</i></tt>.
+     *
+     *    <p>
+     *    <dt> <tt> [^<i>a</i>] </tt>
+     *    <dd> Matches a single character that is not from character set or range
+     *     <tt>{<i>a</i>}</tt>.  Note that the <tt>^</tt> character must occur
+     *     immediately to the right of the opening bracket.
+     *
+     *    <p>
+     *    <dt> <tt> \<i>c</i> </tt>
+     *    <dd> Removes (escapes) any special meaning of character <i>c</i>.
+     *
+     *    <p>
+     *    <dt> <tt> {ab,cd} </tt>
+     *    <dd> Matches a string from the string set <tt>{<i>ab, cd</i>} </tt>
+     *    
+     *    <p>
+     *    <dt> <tt> {ab,c{de,fh}} </tt>
+     *    <dd> Matches a string from the string set <tt>{<i>ab, cde, cfh</i>}</tt>
+     *
+     *   </dl>
+     *  </dd>
+     * </dl>
+     *
+     * @param pathPattern a regular expression specifying a pth pattern
+
+     * @return an array of paths that match the path pattern
+     * @throws IOException
+     */
+    public FileStatus[] globStatus(final Path pathPattern) throws IOException {
+      return globStatus(pathPattern, DEFAULT_FILTER);
+    }
+    
+    /**
+     * Return an array of FileStatus objects whose path names match pathPattern
+     * and is accepted by the user-supplied path filter. Results are sorted by
+     * their path names.
+     * Return null if pathPattern has no glob and the path does not exist.
+     * Return an empty array if pathPattern has a glob and no path matches it. 
+     * 
+     * @param pathPattern
+     *          a regular expression specifying the path pattern
+     * @param filter
+     *          a user-supplied path filter
+     * @return an array of FileStatus objects
+     * @throws IOException if any I/O error occurs when fetching file status
+     */
+    public FileStatus[] globStatus(final Path pathPattern,
+                              final PathFilter filter) throws IOException {
+      String filename = pathPattern.toUri().getPath();
+      List<String> filePatterns = GlobExpander.expand(filename);
+      if (filePatterns.size() == 1) {
+        return globStatusInternal(pathPattern, filter);
+      } else {
+        List<FileStatus> results = new ArrayList<FileStatus>();
+        for (String filePattern : filePatterns) {
+          FileStatus[] files = globStatusInternal(new Path(filePattern), filter);
+          for (FileStatus file : files) {
+            results.add(file);
+          }
+        }
+        return results.toArray(new FileStatus[results.size()]);
+      }
+    }
+
+    private FileStatus[] globStatusInternal(final Path pathPattern,
+                                final PathFilter filter) throws IOException {
+      Path[] parents = new Path[1];
+      int level = 0;
+      String filename = pathPattern.toUri().getPath();
+      
+      // path has only zero component
+      if ("".equals(filename) || Path.SEPARATOR.equals(filename)) {
+        return getFileStatus(new Path[]{pathPattern});
+      }
+
+      // path has at least one component
+      String[] components = filename.split(Path.SEPARATOR);
+      // get the first component
+      if (pathPattern.isAbsolute()) {
+        parents[0] = new Path(Path.SEPARATOR);
+        level = 1;
+      } else {
+        parents[0] = new Path(Path.CUR_DIR);
+      }
+
+      // glob the paths that match the parent path, i.e., [0, components.length-1]
+      boolean[] hasGlob = new boolean[]{false};
+      Path[] parentPaths = globPathsLevel(parents, components, level, hasGlob);
+      FileStatus[] results;
+      if (parentPaths == null || parentPaths.length == 0) {
+        results = null;
+      } else {
+        // Now work on the last component of the path
+        GlobFilter fp = new GlobFilter(components[components.length - 1], filter);
+        if (fp.hasPattern()) { // last component has a pattern
+          // list parent directories and then glob the results
+          results = listStatus(parentPaths, fp);
+          hasGlob[0] = true;
+        } else { // last component does not have a pattern
+          // get all the path names
+          ArrayList<Path> filteredPaths = new ArrayList<Path>(parentPaths.length);
+          for (int i = 0; i < parentPaths.length; i++) {
+            parentPaths[i] = new Path(parentPaths[i],
+              components[components.length - 1]);
+            if (fp.accept(parentPaths[i])) {
+              filteredPaths.add(parentPaths[i]);
+            }
+          }
+          // get all their statuses
+          results = getFileStatus(
+              filteredPaths.toArray(new Path[filteredPaths.size()]));
+        }
+      }
+
+      // Decide if the pathPattern contains a glob or not
+      if (results == null) {
+        if (hasGlob[0]) {
+          results = new FileStatus[0];
+        }
+      } else {
+        if (results.length == 0 ) {
+          if (!hasGlob[0]) {
+            results = null;
+          }
+        } else {
+          Arrays.sort(results);
+        }
+      }
+      return results;
+    }
+
+    /*
+     * For a path of N components, return a list of paths that match the
+     * components [<code>level</code>, <code>N-1</code>].
+     */
+    private Path[] globPathsLevel(Path[] parents, String[] filePattern,
+        int level, boolean[] hasGlob) throws IOException {
+      if (level == filePattern.length - 1)
+        return parents;
+      if (parents == null || parents.length == 0) {
+        return null;
+      }
+      GlobFilter fp = new GlobFilter(filePattern[level]);
+      if (fp.hasPattern()) {
+        parents = FileUtil.stat2Paths(listStatus(parents, fp));
+        hasGlob[0] = true;
+      } else {
+        for (int i = 0; i < parents.length; i++) {
+          parents[i] = new Path(parents[i], filePattern[level]);
+        }
+      }
+      return globPathsLevel(parents, filePattern, level + 1, hasGlob);
+    }
+
+ 
+   
+    
+    
+    /**
+     * Copy file from src to dest
+     * @param src
+     * @param dest
+     * @throws IOException
+     */
+    public void copy(final Path src, final Path dst)  throws IOException {
+      copy(src, dst, false, false);
+    }
+    
+    /**
+     * Copy from src to dst, optionally deleting src and overwriting dst
+     * @param src
+     * @param dst
+     * @param deleteSource - delete src of true
+     * @param overwrite  overwrite dst if true; throw IOException if dst exists
+     *         and overwrite is false.
+     * @throws IOException
+     */
+    public void copy(final Path src,  final Path dst,
+        boolean deleteSource, 
+        boolean overwrite)
+        throws IOException {
+      isNotSchemeWithRelative(src);
+      isNotSchemeWithRelative(src);
+      // TBD
+    }
+  }
+  final private static PathFilter DEFAULT_FILTER = new PathFilter() {
+    public boolean accept(final Path file) {
+      return true;
+    }
+  };
+  
+  /* A class that could decide if a string matches the glob or not */
+  private static class GlobFilter implements PathFilter {
+    private PathFilter userFilter = DEFAULT_FILTER;
+    private Pattern regex;
+    private boolean hasPattern = false;
+      
+    /** Default pattern character: Escape any special meaning. */
+    private static final char  PAT_ESCAPE = '\\';
+    /** Default pattern character: Any single character. */
+    private static final char  PAT_ANY = '.';
+    /** Default pattern character: Character set close. */
+    private static final char  PAT_SET_CLOSE = ']';
+      
+    GlobFilter() {
+    }
+      
+    GlobFilter(final String filePattern) throws IOException {
+      setRegex(filePattern);
+    }
+      
+    GlobFilter(final String filePattern, final PathFilter filter) throws IOException {
+      userFilter = filter;
+      setRegex(filePattern);
+    }
+      
+    private boolean isJavaRegexSpecialChar(char pChar) {
+      return pChar == '.' || pChar == '$' || pChar == '(' || pChar == ')' ||
+             pChar == '|' || pChar == '+';
+    }
+    void setRegex(String filePattern) throws IOException {
+      int len;
+      int setOpen;
+      int curlyOpen;
+      boolean setRange;
+
+      StringBuilder fileRegex = new StringBuilder();
+
+      // Validate the pattern
+      len = filePattern.length();
+      if (len == 0)
+        return;
+
+      setOpen = 0;
+      setRange = false;
+      curlyOpen = 0;
+
+      for (int i = 0; i < len; i++) {
+        char pCh;
+          
+        // Examine a single pattern character
+        pCh = filePattern.charAt(i);
+        if (pCh == PAT_ESCAPE) {
+          fileRegex.append(pCh);
+          i++;
+          if (i >= len)
+            error("An escaped character does not present", filePattern, i);
+          pCh = filePattern.charAt(i);
+        } else if (isJavaRegexSpecialChar(pCh)) {
+          fileRegex.append(PAT_ESCAPE);
+        } else if (pCh == '*') {
+          fileRegex.append(PAT_ANY);
+          hasPattern = true;
+        } else if (pCh == '?') {
+          pCh = PAT_ANY;
+          hasPattern = true;
+        } else if (pCh == '{') {
+          fileRegex.append('(');
+          pCh = '(';
+          curlyOpen++;
+          hasPattern = true;
+        } else if (pCh == ',' && curlyOpen > 0) {
+          fileRegex.append(")|");
+          pCh = '(';
+        } else if (pCh == '}' && curlyOpen > 0) {
+          // End of a group
+          curlyOpen--;
+          fileRegex.append(")");
+          pCh = ')';
+        } else if (pCh == '[' && setOpen == 0) {
+          setOpen++;
+          hasPattern = true;
+        } else if (pCh == '^' && setOpen > 0) {
+        } else if (pCh == '-' && setOpen > 0) {
+          // Character set range
+          setRange = true;
+        } else if (pCh == PAT_SET_CLOSE && setRange) {
+          // Incomplete character set range
+          error("Incomplete character set range", filePattern, i);
+        } else if (pCh == PAT_SET_CLOSE && setOpen > 0) {
+          // End of a character set
+          if (setOpen < 2)
+            error("Unexpected end of set", filePattern, i);
+          setOpen = 0;
+        } else if (setOpen > 0) {
+          // Normal character, or the end of a character set range
+          setOpen++;
+          setRange = false;
+        }
+        fileRegex.append(pCh);
+      }
+        
+      // Check for a well-formed pattern
+      if (setOpen > 0 || setRange || curlyOpen > 0) {
+        // Incomplete character set or character range
+        error("Expecting set closure character or end of range, or }", 
+            filePattern, len);
+      }
+      regex = Pattern.compile(fileRegex.toString());
+    }
+      
+    boolean hasPattern() {
+      return hasPattern;
+    }
+      
+    public boolean accept(final Path path) {
+      return regex.matcher(path.getName()).matches() && userFilter.accept(path);
+    }
+      
+    private void error(final String s, final String pattern, final int pos)
+                                                        throws IOException {
+      throw new IOException("Illegal file pattern: "
+                            +s+ " for glob "+ pattern + " at " + pos);
+    }
+  }
+}
\ No newline at end of file

Property changes on: src/java/org/apache/hadoop/fs/FileContext.java
___________________________________________________________________
Added: svn:mime-type
   + text/plain

Index: src/java/org/apache/hadoop/fs/Path.java
===================================================================
--- src/java/org/apache/hadoop/fs/Path.java	(revision 809454)
+++ src/java/org/apache/hadoop/fs/Path.java	(working copy)
@@ -126,6 +126,13 @@
     initialize(scheme, authority, path);
   }
 
+  /**
+   * Construct a path from a URI
+   */
+  public Path(URI aUri) {
+    uri = aUri;
+  }
+  
   /** Construct a Path from components. */
   public Path(String scheme, String authority, String path) {
     checkPathArg( path );
@@ -175,10 +182,17 @@
     return FileSystem.get(this.toUri(), conf);
   }
 
+  /**
+   *  True if the path component (i.e. directory) of this path is absolute.
+   */
+  public boolean isPathComponentAbsolute() {
+    int start = hasWindowsDrive(uri.getPath(), true) ? 3 : 0;
+    return uri.getPath().startsWith(SEPARATOR, start);
+   }
+  
   /** True if the directory of this path is absolute. */
   public boolean isAbsolute() {
-    int start = hasWindowsDrive(uri.getPath(), true) ? 3 : 0;
-    return uri.getPath().startsWith(SEPARATOR, start);
+     return isPathComponentAbsolute();
   }
 
   /** Returns the final component of this path.*/
@@ -265,29 +279,41 @@
     return depth;
   }
 
+  
+  /**
+   *  Returns a qualified path object.
+   *  
+   *  Deprecated - use {@link #makeQualified(URI, Path)}
+   */
+ 
+  @Deprecated 
+  public Path makeQualified(FileSystem fs) {
+    return makeQualified(fs.getUri(), fs.getWorkingDirectory());
+  }
+  
+  
   /** Returns a qualified path object. */
-  public Path makeQualified(FileSystem fs) {
+  public Path makeQualified(URI defaultUri, Path workingDir ) {
     Path path = this;
     if (!isAbsolute()) {
-      path = new Path(fs.getWorkingDirectory(), this);
+      path = new Path(workingDir, this);
     }
 
     URI pathUri = path.toUri();
-    URI fsUri = fs.getUri();
       
     String scheme = pathUri.getScheme();
     String authority = pathUri.getAuthority();
 
     if (scheme != null &&
-        (authority != null || fsUri.getAuthority() == null))
+        (authority != null || defaultUri.getAuthority() == null))
       return path;
 
     if (scheme == null) {
-      scheme = fsUri.getScheme();
+      scheme = defaultUri.getScheme();
     }
 
     if (authority == null) {
-      authority = fsUri.getAuthority();
+      authority = defaultUri.getAuthority();
       if (authority == null) {
         authority = "";
       }
Index: src/java/org/apache/hadoop/fs/FsConfig.java
===================================================================
--- src/java/org/apache/hadoop/fs/FsConfig.java	(revision 0)
+++ src/java/org/apache/hadoop/fs/FsConfig.java	(revision 0)
@@ -0,0 +1,105 @@
+package org.apache.hadoop.fs;
+
+import java.net.URI;
+
+import org.apache.hadoop.conf.Configuration;
+
+/** 
+ * This class is thin layer to manage the FS related keys in
+ * a configuration object.
+ * It provides convenience static method to set and get the keys from a 
+ * a configuration.
+ * 
+ * An instance object of FsConfig can be used to store and use the Fs config
+ * variables.
+ * 
+ * NOTE: setting a key in a config does not set that key to be the default
+ * for your application; it merely sets the key in the config object.
+ * To set the default for FS use the {@link Files} class.
+ *
+ */
+
+public class FsConfig {
+  
+  // Configuration keys  and default values in the config file
+  // TBD note we should deprecate the keys constants elsewhere
+  
+  
+  // The Keys
+  static final String FS_DEFAULT_NAME_KEY = "fs.default.name";
+  static final String FS_HOME_DIR_KEY = "fs.homeDir";
+  static final String FS_REPLICATION_FACTOR_KEY = "dfs.replication";
+  static final String FS_BLOCK_SIZE_KEY = "dfs.block.size";
+  static final String IO_BUFFER_SIZE_KEY ="io.file.buffer.size";
+
+
+  // The default values
+  // Default values of SERVER_DEFAULT(-1) implies use the ones from
+  // the target file system where files are created.
+  static final String FS_DEFAULT_NAME = "file:///";
+  static final String FS_HOME_DIR = "/user"; // relative to FS_DEFAULT
+  static final short FS_DEFAULT_REPLICATION_FACTOR = FileContext.SERVER_DEFAULT;
+  static final long FS_DEFAULT_BLOCK_SIZE = FileContext.SERVER_DEFAULT;
+  static final int IO_BUFFER_SIZE =4096;
+  
+  
+  /**
+   * In addition the fs config has keys for the impl of different
+   * file systems of the form:
+   *    fs.<scheme>.impl
+   *    The value is a string specifying the class name.
+   */
+  
+  
+  public static String getDefaultFS(final Configuration conf) {
+    return conf.get(FS_DEFAULT_NAME_KEY, FS_DEFAULT_NAME);
+  }
+  
+  public static String getHomeDir(final Configuration conf) {
+    return conf.get(FS_HOME_DIR_KEY, FS_HOME_DIR);
+  }
+  
+  public static short getDefaultReplicationFactor(final Configuration conf) {
+    return (short) 
+        conf.getInt(FS_REPLICATION_FACTOR_KEY, FS_DEFAULT_REPLICATION_FACTOR);
+  }
+  
+  public static long getDefaultBlockSize(final Configuration conf) {
+    return conf.getLong(FS_BLOCK_SIZE_KEY, FS_DEFAULT_BLOCK_SIZE);
+  }
+
+  
+  public static int getDefaultIOBuffersize(final Configuration conf) {
+    return conf.getInt(IO_BUFFER_SIZE_KEY, IO_BUFFER_SIZE);
+  }
+  
+  public static Class<?> getImplClass(URI uri, Configuration conf) {
+    return conf.getClass("fs." + uri.getScheme() + ".impl", null);
+  }
+
+  
+  /**
+   * The Setters: see the note on the javdoc for the class above.
+   */
+
+  public static void setDefaultFS(final Configuration conf, String uri) {
+    conf.set(FS_DEFAULT_NAME_KEY, uri);
+  }
+  
+  public static void setHomeDir(final Configuration conf, String path) {
+    conf.set(FS_HOME_DIR_KEY, path);
+  }
+  
+  public static void setDefaultReplicationFactor(final Configuration conf, short rf) {
+        conf.setInt(FS_REPLICATION_FACTOR_KEY, rf);
+  }
+  
+  public static void setDefaultBlockSize(final Configuration conf, long bs) {
+    conf.setLong(FS_BLOCK_SIZE_KEY, bs);
+  }
+  
+  
+  public static void setDefaultIOBuffersize(final Configuration conf, int bs) {
+     conf.setInt(IO_BUFFER_SIZE_KEY, bs);
+  }
+}

Property changes on: src/java/org/apache/hadoop/fs/FsConfig.java
___________________________________________________________________
Added: svn:mime-type
   + text/plain

