diff --git a/pom.xml b/pom.xml index 48d5ef8..313fa5e 100644 --- a/pom.xml +++ b/pom.xml @@ -571,7 +571,7 @@ at revision 1034499 with this hdfs-895 patch: https://issues.apache.org/jira/secure/attachment/12459473/hdfs-895-branch-20-append.txt --> - 0.20-append-r1057313 + 0.22.0-SNAPSHOT 5.5.23 2.1 6.1.26 @@ -673,9 +673,10 @@ org.apache.hadoop - hadoop-core + hadoop-common ${hadoop.version} + hsqldb hsqldb @@ -696,6 +697,86 @@ oro oro + + jdiff + jdiff + + + org.apache.lucene + lucene-core + + + + + org.apache.hadoop + hadoop-hdfs + ${hadoop.version} + + + + hsqldb + hsqldb + + + net.sf.kosmosfs + kfs + + + org.eclipse.jdt + core + + + net.java.dev.jets3t + jets3t + + + oro + oro + + + jdiff + jdiff + + + org.apache.lucene + lucene-core + + + + + org.apache.hadoop + hadoop-mapred + ${hadoop.version} + + + + hsqldb + hsqldb + + + net.sf.kosmosfs + kfs + + + org.eclipse.jdt + core + + + net.java.dev.jets3t + jets3t + + + oro + oro + + + jdiff + jdiff + + + org.apache.lucene + lucene-core + @@ -882,7 +963,19 @@ org.apache.hadoop - hadoop-test + hadoop-common-test + ${hadoop.version} + test + + + org.apache.hadoop + hadoop-hdfs-test + ${hadoop.version} + test + + + org.apache.hadoop + hadoop-mapred-test ${hadoop.version} test diff --git a/src/main/java/org/apache/hadoop/hbase/ipc/ConnectionHeader.java b/src/main/java/org/apache/hadoop/hbase/ipc/ConnectionHeader.java index c2a7649..904c107 100644 --- a/src/main/java/org/apache/hadoop/hbase/ipc/ConnectionHeader.java +++ b/src/main/java/org/apache/hadoop/hbase/ipc/ConnectionHeader.java @@ -33,7 +33,6 @@ import org.apache.hadoop.security.UserGroupInformation; */ class ConnectionHeader implements Writable { private String protocol; - private UserGroupInformation ugi = null; public ConnectionHeader() {} @@ -47,7 +46,6 @@ class ConnectionHeader implements Writable { */ public ConnectionHeader(String protocol, UserGroupInformation ugi) { this.protocol = protocol; - this.ugi = ugi; } @Override @@ -56,26 +54,11 @@ class ConnectionHeader implements Writable { if (protocol.isEmpty()) { protocol = null; } - - boolean ugiUsernamePresent = in.readBoolean(); - if (ugiUsernamePresent) { - String username = in.readUTF(); - ugi.readFields(in); - } else { - ugi = null; - } } @Override public void write(DataOutput out) throws IOException { Text.writeString(out, (protocol == null) ? 
"" : protocol); - if (ugi != null) { - //Send both effective user and real user for simple auth - out.writeBoolean(true); - out.writeUTF(ugi.getUserName()); - } else { - out.writeBoolean(false); - } } public String getProtocol() { @@ -83,10 +66,10 @@ class ConnectionHeader implements Writable { } public UserGroupInformation getUgi() { - return ugi; + return null; } public String toString() { - return protocol + "-" + ugi; + return protocol; } } diff --git a/src/main/java/org/apache/hadoop/hbase/mapreduce/HFileOutputFormat.java b/src/main/java/org/apache/hadoop/hbase/mapreduce/HFileOutputFormat.java index 95407d1..0f2a678 100644 --- a/src/main/java/org/apache/hadoop/hbase/mapreduce/HFileOutputFormat.java +++ b/src/main/java/org/apache/hadoop/hbase/mapreduce/HFileOutputFormat.java @@ -45,12 +45,12 @@ import org.apache.hadoop.hbase.client.Put; import org.apache.hadoop.hbase.io.ImmutableBytesWritable; import org.apache.hadoop.hbase.io.hfile.Compression; import org.apache.hadoop.hbase.io.hfile.HFile; -import org.apache.hadoop.hbase.mapreduce.hadoopbackport.TotalOrderPartitioner; import org.apache.hadoop.hbase.regionserver.StoreFile; import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.io.NullWritable; import org.apache.hadoop.io.SequenceFile; import org.apache.hadoop.mapreduce.Job; +import org.apache.hadoop.mapreduce.Partitioner; import org.apache.hadoop.mapreduce.RecordWriter; import org.apache.hadoop.mapreduce.TaskAttemptContext; import org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter; @@ -271,9 +271,16 @@ public class HFileOutputFormat extends FileOutputFormat topClass; + try { + topClass = getTotalOrderPartitionerClass(); + } catch (ClassNotFoundException e) { + throw new IOException("Failed getting TotalOrderPartitioner", e); + } + job.setPartitionerClass(topClass); job.setOutputKeyClass(ImmutableBytesWritable.class); job.setOutputValueClass(KeyValue.class); job.setOutputFormatClass(HFileOutputFormat.class); @@ -302,10 +309,14 @@ public class HFileOutputFormat extends FileOutputFormat hadoop 0.20, then we want to use the hadoop TotalOrderPartitioner. + * If 0.20, then we want to use the TOP that we have under hadoopbackport. + * This method is about hbase being able to run on different versions of + * hadoop. In 0.20.x hadoops, we have to use the TOP that is bundled with + * hbase. Otherwise, we use the one in Hadoop. + * @return Instance of the TotalOrderPartitioner class + * @throws ClassNotFoundException If can't find a TotalOrderPartitioner. 
+ */ + private static Class getTotalOrderPartitionerClass() + throws ClassNotFoundException { + Class clazz = null; + try { + clazz = (Class) Class.forName("org.apache.hadoop.mapreduce.lib.partition.TotalOrderPartitioner"); + } catch (ClassNotFoundException e) { + clazz = + (Class) Class.forName("org.apache.hadoop.mapreduce.hadoopbackport.TotalOrderPartitioner"); + } + return clazz; + } + /** * Run inside the task to deserialize column family to compression algorithm * map from the diff --git a/src/main/java/org/apache/hadoop/hbase/mapreduce/hadoopbackport/InputSampler.java b/src/main/java/org/apache/hadoop/hbase/mapreduce/hadoopbackport/InputSampler.java index c3e0a68..a09b9b6 100644 --- a/src/main/java/org/apache/hadoop/hbase/mapreduce/hadoopbackport/InputSampler.java +++ b/src/main/java/org/apache/hadoop/hbase/mapreduce/hadoopbackport/InputSampler.java @@ -19,6 +19,8 @@ package org.apache.hadoop.hbase.mapreduce.hadoopbackport; import java.io.IOException; +import java.lang.reflect.Constructor; +import java.lang.reflect.InvocationTargetException; import java.util.ArrayList; import java.util.Arrays; import java.util.List; @@ -132,8 +134,7 @@ public class InputSampler extends Configured implements Tool { int samplesPerSplit = numSamples / splitsToSample; long records = 0; for (int i = 0; i < splitsToSample; ++i) { - TaskAttemptContext samplingContext = new TaskAttemptContext( - job.getConfiguration(), new TaskAttemptID()); + TaskAttemptContext samplingContext = getTaskAttemptContext(job); RecordReader reader = inf.createRecordReader( splits.get(i), samplingContext); reader.initialize(splits.get(i), samplingContext); @@ -152,6 +153,32 @@ public class InputSampler extends Configured implements Tool { } /** + * This method is about making hbase portable, making it so it can run on + * more than just hadoop 0.20. In later hadoops, TaskAttemptContext became + * an Interface. But in hadoops where TAC is an Interface, we shouldn't + * be using the classes that are in this package; we should be using the + * native Hadoop ones (We'll throw a ClassNotFoundException if end up in + * here when we should be using native hadoop TotalOrderPartitioner). + * @param job + * @return + * @throws IOException + */ + public static TaskAttemptContext getTaskAttemptContext(final Job job) + throws IOException { + Constructor c; + try { + c = TaskAttemptContext.class.getConstructor(job.getConfiguration().getClass(), TaskAttemptID.class); + } catch (Exception e) { + throw new IOException("Failed getting constructor", e); + } + try { + return c.newInstance(job.getConfiguration(), new TaskAttemptID()); + } catch (Exception e) { + throw new IOException("Failed creating instance", e); + } + } + + /** * Sample from random points in the input. * General-purpose sampler. Takes numSamples / maxSplitsSampled inputs from * each split. 
@@ -214,8 +241,7 @@ public class InputSampler extends Configured implements Tool { // the target sample keyset for (int i = 0; i < splitsToSample || (i < splits.size() && samples.size() < numSamples); ++i) { - TaskAttemptContext samplingContext = new TaskAttemptContext( - job.getConfiguration(), new TaskAttemptID()); + TaskAttemptContext samplingContext = getTaskAttemptContext(job); RecordReader reader = inf.createRecordReader( splits.get(i), samplingContext); reader.initialize(splits.get(i), samplingContext); @@ -285,8 +311,7 @@ public class InputSampler extends Configured implements Tool { long records = 0; long kept = 0; for (int i = 0; i < splitsToSample; ++i) { - TaskAttemptContext samplingContext = new TaskAttemptContext( - job.getConfiguration(), new TaskAttemptID()); + TaskAttemptContext samplingContext = getTaskAttemptContext(job); RecordReader reader = inf.createRecordReader( splits.get(i), samplingContext); reader.initialize(splits.get(i), samplingContext); diff --git a/src/main/java/org/apache/hadoop/hbase/master/HMaster.java b/src/main/java/org/apache/hadoop/hbase/master/HMaster.java index b8489ac..9c0c540 100644 --- a/src/main/java/org/apache/hadoop/hbase/master/HMaster.java +++ b/src/main/java/org/apache/hadoop/hbase/master/HMaster.java @@ -595,7 +595,7 @@ implements HMasterInterface, HMasterRegionInterface, MasterServices, Server { int port = this.conf.getInt("hbase.master.info.port", 60010); if (port >= 0) { String a = this.conf.get("hbase.master.info.bindAddress", "0.0.0.0"); - this.infoServer = new InfoServer(MASTER, a, port, false); + this.infoServer = new InfoServer(MASTER, a, port, false, this.conf); this.infoServer.addServlet("status", "/master-status", MasterStatusServlet.class); this.infoServer.setAttribute(MASTER, this); this.infoServer.start(); diff --git a/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java b/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java index 6404538..6d51cb5 100644 --- a/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java +++ b/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java @@ -1288,7 +1288,7 @@ public class HRegionServer implements HRegionInterface, HBaseRPCErrorHandler, boolean auto = this.conf.getBoolean("hbase.regionserver.info.port.auto", false); while (true) { try { - this.infoServer = new InfoServer("regionserver", addr, port, false); + this.infoServer = new InfoServer("regionserver", addr, port, false, this.conf); this.infoServer.addServlet("status", "/rs-status", RSStatusServlet.class); this.infoServer.setAttribute(REGIONSERVER, this); this.infoServer.start(); diff --git a/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLog.java b/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLog.java index 0716788..c8dad04 100644 --- a/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLog.java +++ b/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLog.java @@ -48,6 +48,7 @@ import java.util.regex.Pattern; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FSDataOutputStream; import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; @@ -139,8 +140,8 @@ public class HLog implements Syncable { HLog.logReaderClass = null; } - private OutputStream hdfs_out; // OutputStream associated with the current SequenceFile.writer - private int initialReplication; // initial replication factor 
of SequenceFile.writer + private FSDataOutputStream hdfs_out; // FSDataOutputStream associated with the current SequenceFile.writer + private int initialReplication; // initial replication factor of SequenceFile.writer private Method getNumCurrentReplicas; // refers to DFSOutputStream.getNumCurrentReplicas final static Object [] NO_ARGS = new Object []{}; @@ -368,33 +369,42 @@ public class HLog implements Syncable { rollWriter(); // handle the reflection necessary to call getNumCurrentReplicas() - this.getNumCurrentReplicas = null; + this.getNumCurrentReplicas = getGetNumCurrentReplicas(this.hdfs_out); + + logSyncerThread = new LogSyncer(this.optionalFlushInterval); + Threads.setDaemonThreadRunning(logSyncerThread, + Thread.currentThread().getName() + ".logSyncer"); + coprocessorHost = new WALCoprocessorHost(this, conf); + } + + /** + * Find the 'getNumCurrentReplicas' on the passed os stream. + * @return Method or null. + */ + private Method getGetNumCurrentReplicas(final FSDataOutputStream os) { + Method m = null; Exception exception = null; - if (this.hdfs_out != null) { + if (os != null) { try { - this.getNumCurrentReplicas = this.hdfs_out.getClass(). + m = os.getWrappedStream().getClass(). getMethod("getNumCurrentReplicas", new Class []{}); - this.getNumCurrentReplicas.setAccessible(true); + m.setAccessible(true); } catch (NoSuchMethodException e) { // Thrown if getNumCurrentReplicas() function isn't available exception = e; } catch (SecurityException e) { // Thrown if we can't get access to getNumCurrentReplicas() exception = e; - this.getNumCurrentReplicas = null; // could happen on setAccessible() + m = null; // could happen on setAccessible() } } - if (this.getNumCurrentReplicas != null) { + if (m != null) { LOG.info("Using getNumCurrentReplicas--HDFS-826"); } else { LOG.info("getNumCurrentReplicas--HDFS-826 not available; hdfs_out=" + - this.hdfs_out + ", exception=" + exception.getMessage()); + os + ", exception=" + exception.getMessage()); } - - logSyncerThread = new LogSyncer(this.optionalFlushInterval); - Threads.setDaemonThreadRunning(logSyncerThread, - Thread.currentThread().getName() + ".logSyncer"); - coprocessorHost = new WALCoprocessorHost(this, conf); + return m; } public void registerWALActionsListener (final WALObserver listener) { @@ -436,9 +446,15 @@ public class HLog implements Syncable { return logSeqNum.get(); } + /** + * Method used internal to this class and for tests only. + * @return The wrapped stream our writer is using; its not the + * writer's 'out' FSDatoOutputStream but the stream that this 'out' wraps + * (In hdfs its an instance of DFSDataOutputStream). + */ // usage: see TestLogRolling.java OutputStream getOutputStream() { - return this.hdfs_out; + return this.hdfs_out.getWrappedStream(); } /** @@ -482,10 +498,9 @@ public class HLog implements Syncable { // Can we get at the dfsclient outputstream? If an instance of // SFLW, it'll have done the necessary reflection to get at the // protected field name. 
- OutputStream nextHdfsOut = null; + FSDataOutputStream nextHdfsOut = null; if (nextWriter instanceof SequenceFileLogWriter) { - nextHdfsOut = - ((SequenceFileLogWriter)nextWriter).getDFSCOutputStream(); + nextHdfsOut = ((SequenceFileLogWriter)nextWriter).getWriterFSDataOutputStream(); } // Tell our listeners that a new log was created if (!this.listeners.isEmpty()) { @@ -768,6 +783,7 @@ public class HLog implements Syncable { */ public void closeAndDelete() throws IOException { close(); + if (fs.exists(this.dir)) return; FileStatus[] files = fs.listStatus(this.dir); for(FileStatus file : files) { Path p = getHLogArchivePath(this.oldLogDir, file.getPath()); @@ -776,7 +792,7 @@ public class HLog implements Syncable { } } LOG.debug("Moved " + files.length + " log files to " + - FSUtils.getPath(this.oldLogDir)); + FSUtils.getPath(this.oldLogDir)); if (!fs.delete(dir, true)) { LOG.info("Unable to delete " + dir); } @@ -966,8 +982,7 @@ public class HLog implements Syncable { } } - @Override - public void sync() throws IOException { + private void syncer() throws IOException { synchronized (this.updateLock) { if (this.closed) { return; @@ -1027,9 +1042,10 @@ public class HLog implements Syncable { * * @throws Exception */ - int getLogReplication() throws IllegalArgumentException, IllegalAccessException, InvocationTargetException { - if(this.getNumCurrentReplicas != null && this.hdfs_out != null) { - Object repl = this.getNumCurrentReplicas.invoke(this.hdfs_out, NO_ARGS); + int getLogReplication() + throws IllegalArgumentException, IllegalAccessException, InvocationTargetException { + if (this.getNumCurrentReplicas != null && this.hdfs_out != null) { + Object repl = this.getNumCurrentReplicas.invoke(getOutputStream(), NO_ARGS); if (repl instanceof Integer) { return ((Integer)repl).intValue(); } @@ -1042,8 +1058,15 @@ public class HLog implements Syncable { } public void hsync() throws IOException { - // Not yet implemented up in hdfs so just call hflush. - sync(); + syncer(); + } + + public void hflush() throws IOException { + syncer(); + } + + public void sync() throws IOException { + syncer(); } private void requestLogRoll() { @@ -1309,7 +1332,9 @@ public class HLog implements Syncable { public static NavigableSet getSplitEditFilesSorted(final FileSystem fs, final Path regiondir) throws IOException { + NavigableSet filesSorted = new TreeSet(); Path editsdir = getRegionDirRecoveredEditsDir(regiondir); + if (!fs.exists(editsdir)) return filesSorted; FileStatus[] files = fs.listStatus(editsdir, new PathFilter() { @Override public boolean accept(Path p) { @@ -1327,7 +1352,6 @@ public class HLog implements Syncable { return result; } }); - NavigableSet filesSorted = new TreeSet(); if (files == null) return filesSorted; for (FileStatus status: files) { filesSorted.add(status.getPath()); diff --git a/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLogSplitter.java b/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLogSplitter.java index db29e56..68ced55 100644 --- a/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLogSplitter.java +++ b/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLogSplitter.java @@ -530,10 +530,10 @@ public class HLogSplitter { private static List listAll(FileSystem fs, Path dir) throws IOException { List fset = new ArrayList(100); - FileStatus [] files = fs.listStatus(dir); + FileStatus [] files = fs.exists(dir)? 
fs.listStatus(dir): null; if (files != null) { for (FileStatus f : files) { - if (f.isDir()) { + if (f.isDirectory()) { fset.addAll(listAll(fs, f.getPath())); } else { fset.add(f); diff --git a/src/main/java/org/apache/hadoop/hbase/regionserver/wal/SequenceFileLogWriter.java b/src/main/java/org/apache/hadoop/hbase/regionserver/wal/SequenceFileLogWriter.java index 8dc9a5e..63139fb 100644 --- a/src/main/java/org/apache/hadoop/hbase/regionserver/wal/SequenceFileLogWriter.java +++ b/src/main/java/org/apache/hadoop/hbase/regionserver/wal/SequenceFileLogWriter.java @@ -43,13 +43,15 @@ public class SequenceFileLogWriter implements HLog.Writer { private final Log LOG = LogFactory.getLog(this.getClass()); // The sequence file we delegate to. private SequenceFile.Writer writer; - // The dfsclient out stream gotten made accessible or null if not available. - private OutputStream dfsClient_out; - // The syncFs method from hdfs-200 or null if not available. - private Method syncFs; + // This is the FSDataOutputStream instance that is the 'out' instance + // in the SequenceFile.Writer 'writer' instance above. + private FSDataOutputStream writer_out; private Class keyClass; + private Method syncFs = null; + private Method hflush = null; + /** * Default constructor. */ @@ -66,10 +68,10 @@ public class SequenceFileLogWriter implements HLog.Writer { public SequenceFileLogWriter(Class keyClass) { this.keyClass = keyClass; } - + @Override public void init(FileSystem fs, Path path, Configuration conf) - throws IOException { + throws IOException { if (null == keyClass) { keyClass = HLog.getKeyClass(conf); @@ -87,10 +89,63 @@ public class SequenceFileLogWriter implements HLog.Writer { new DefaultCodec(), null, new Metadata()); + + this.writer_out = getSequenceFilePrivateFSDataOutputStreamAccessible(); + this.syncFs = getSyncFs(); + this.hflush = getHFlush(); + String msg = + "syncFs=" + (this.syncFs != null) + ", hflush=" + (this.hflush != null); + if (this.syncFs != null || this.hflush != null) { + LOG.debug(msg); + } else { + LOG.warn("No sync support! " + msg); + } + } - // Get at the private FSDataOutputStream inside in SequenceFile so we can - // call sync on it. Make it accessible. Stash it aside for call up in - // the sync method. + /** + * Now do dirty work to see if syncFs is available on the backing this.writer. + * It will be available in branch-0.20-append and in CDH3. + * @return The syncFs method or null if not available. + * @throws IOException + */ + private Method getSyncFs() + throws IOException { + Method m = null; + try { + // function pointer to writer.syncFs() method; present when sync is hdfs-200. + m = this.writer.getClass().getMethod("syncFs", new Class []{}); + } catch (SecurityException e) { + throw new IOException("Failed test for syncfs", e); + } catch (NoSuchMethodException e) { + // Not available + } + return m; + } + + /** + * See if hflush (0.21 and 0.22 hadoop) is available. + * @return The hflush method or null if not available. + * @throws IOException + */ + private Method getHFlush() + throws IOException { + Method m = null; + try { + Class c = getWriterFSDataOutputStream().getClass(); + m = c.getMethod("hflush", new Class []{}); + } catch (SecurityException e) { + throw new IOException("Failed test for hflush", e); + } catch (NoSuchMethodException e) { + // Ignore + } + return m; + } + + // Get at the private FSDataOutputStream inside in SequenceFile so we can + // call sync on it. Make it accessible. 
+ private FSDataOutputStream getSequenceFilePrivateFSDataOutputStreamAccessible() + throws IOException { + FSDataOutputStream out = null; final Field fields [] = this.writer.getClass().getDeclaredFields(); final String fieldName = "out"; for (int i = 0; i < fields.length; ++i) { @@ -98,34 +153,17 @@ public class SequenceFileLogWriter implements HLog.Writer { try { // Make the 'out' field up in SF.Writer accessible. fields[i].setAccessible(true); - FSDataOutputStream out = - (FSDataOutputStream)fields[i].get(this.writer); - this.dfsClient_out = out.getWrappedStream(); + out = (FSDataOutputStream)fields[i].get(this.writer); break; } catch (IllegalAccessException ex) { throw new IOException("Accessing " + fieldName, ex); + } catch (SecurityException e) { + // TODO Auto-generated catch block + e.printStackTrace(); } } } - - // Now do dirty work to see if syncFs is available. - // Test if syncfs is available. - Method m = null; - boolean append = conf.getBoolean("dfs.support.append", false); - if (append) { - try { - // function pointer to writer.syncFs() - m = this.writer.getClass().getMethod("syncFs", new Class []{}); - } catch (SecurityException e) { - throw new IOException("Failed test for syncfs", e); - } catch (NoSuchMethodException e) { - // Not available - } - } - this.syncFs = m; - LOG.info((this.syncFs != null)? - "Using syncFs -- HDFS-200": - ("syncFs -- HDFS-200 -- not available, dfs.support.append=" + append)); + return out; } @Override @@ -146,6 +184,12 @@ public class SequenceFileLogWriter implements HLog.Writer { } catch (Exception e) { throw new IOException("Reflection", e); } + } else if (this.hflush != null) { + try { + this.hflush.invoke(getWriterFSDataOutputStream(), HLog.NO_ARGS); + } catch (Exception e) { + throw new IOException("Reflection", e); + } } } @@ -158,7 +202,7 @@ public class SequenceFileLogWriter implements HLog.Writer { * @return The dfsclient out stream up inside SF.Writer made accessible, or * null if not available. */ - public OutputStream getDFSCOutputStream() { - return this.dfsClient_out; + public FSDataOutputStream getWriterFSDataOutputStream() { + return this.writer_out; } -} +} \ No newline at end of file diff --git a/src/main/java/org/apache/hadoop/hbase/util/FSUtils.java b/src/main/java/org/apache/hadoop/hbase/util/FSUtils.java index 3409108..145cddc 100644 --- a/src/main/java/org/apache/hadoop/hbase/util/FSUtils.java +++ b/src/main/java/org/apache/hadoop/hbase/util/FSUtils.java @@ -737,9 +737,12 @@ public class FSUtils { } catch (NoSuchMethodException e) { append = false; } - } else { + } + if (!append) { + // Look for the 0.21, 0.22, new-style append evidence. 
try { FSDataOutputStream.class.getMethod("hflush", new Class []{}); + append = true; } catch (NoSuchMethodException e) { append = false; } diff --git a/src/main/java/org/apache/hadoop/hbase/util/InfoServer.java b/src/main/java/org/apache/hadoop/hbase/util/InfoServer.java index 6ed9fe6..fe22ed1 100644 --- a/src/main/java/org/apache/hadoop/hbase/util/InfoServer.java +++ b/src/main/java/org/apache/hadoop/hbase/util/InfoServer.java @@ -20,16 +20,19 @@ package org.apache.hadoop.hbase.util; +import java.io.FileNotFoundException; +import java.io.IOException; +import java.lang.reflect.Method; +import java.net.URL; +import java.util.Map; + +import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.HBaseConfiguration; import org.apache.hadoop.http.HttpServer; import org.mortbay.jetty.handler.ContextHandlerCollection; import org.mortbay.jetty.servlet.Context; import org.mortbay.jetty.servlet.DefaultServlet; -import java.io.IOException; -import java.net.URL; -import java.util.Map; - /** * Create a Jetty embedded server to answer http requests. The primary goal * is to serve up status information for the server. @@ -39,6 +42,8 @@ import java.util.Map; * "/" -> the jsp server code from (src/hbase-webapps/) */ public class InfoServer extends HttpServer { + private final Configuration config; + /** * Create a status server on the given port. * The jsp scripts are taken from src/hbase-webapps/name. @@ -49,15 +54,36 @@ public class InfoServer extends HttpServer { * increment by 1 until it finds a free port. * @throws IOException e */ - public InfoServer(String name, String bindAddress, int port, boolean findPort) + public InfoServer(String name, String bindAddress, int port, boolean findPort, + final Configuration c) throws IOException { super(name, bindAddress, port, findPort, HBaseConfiguration.create()); webServer.addHandler(new ContextHandlerCollection()); + this.config = c; } protected void addDefaultApps(ContextHandlerCollection parent, String appDir) throws IOException { - super.addDefaultApps(parent, appDir); + // Figure which super method to call. If 0.22 hadoop httpservlet, need to + // pass the configuration when I call the super addDefaultApps. + Method [] allMethods = this.getClass().getSuperclass().getDeclaredMethods(); + for (Method m: allMethods) { + if (!m.getName().equals("addDefaultApps")) continue; + Class [] parameterTypes = m.getParameterTypes(); + if (parameterTypes.length == 2) { + try { + m.invoke(this, parent, appDir); + } catch (Exception e) { + throw new IOException("Failed reflection", e); + } + } else if (parameterTypes.length == 3) { + try { + m.invoke(this, parent, appDir, this.config); + } catch (Exception e) { + throw new IOException("Failed reflection", e); + } + } + } // Must be same as up in hadoop. final String logsContextPath = "/logs"; // Now, put my logs in place of hadoops... disable old one first. @@ -83,10 +109,25 @@ public class InfoServer extends HttpServer { } /** + * Get the pathname to the webapps files. + * @param appName eg "secondary" or "datanode" + * @return the pathname as a URL + * @throws FileNotFoundException if 'webapps' directory cannot be found on CLASSPATH. + */ + protected String getWebAppsPath(String appName) throws FileNotFoundException { + // Copied from the super-class. 
+ URL url = getClass().getClassLoader().getResource("hbase-webapps/" + appName); + if (url == null) + throw new FileNotFoundException("webapps/" + appName + + " not found in CLASSPATH"); + String urlString = url.toString(); + return urlString.substring(0, urlString.lastIndexOf('/')); + } + + /** * Get the pathname to the path files. * @return the pathname as a URL */ - @Override protected String getWebAppsPath() throws IOException { // Hack: webapps is not a unique enough element to find in CLASSPATH // We'll more than likely find the hadoop webapps dir. So, instead @@ -95,29 +136,9 @@ public class InfoServer extends HttpServer { // master webapp resides is where we want this InfoServer picking up // web applications. final String master = "master"; - String p = getWebAppDir(master); - // Now strip master + the separator off the end of our context - return p.substring(0, p.length() - (master.length() + 1/* The separator*/)); - } - - private static String getWebAppsPath(final String path) - throws IOException { - URL url = InfoServer.class.getClassLoader().getResource(path); - if (url == null) - throw new IOException("hbase-webapps not found in CLASSPATH: " + path); - return url.toString(); - } - - /** - * Get the path for this web app - * @param webappName web app - * @return path - * @throws IOException e - */ - public static String getWebAppDir(final String webappName) - throws IOException { - String webappDir; - webappDir = getWebAppsPath("hbase-webapps/" + webappName); - return webappDir; + String p = getWebAppsPath(master); + int index = p.lastIndexOf(master); + // Now strip master off the end. + return p.substring(0, index); } -} +} \ No newline at end of file diff --git a/src/test/java/org/apache/hadoop/hbase/HBaseTestingUtility.java b/src/test/java/org/apache/hadoop/hbase/HBaseTestingUtility.java index babd788..7321fca 100644 --- a/src/test/java/org/apache/hadoop/hbase/HBaseTestingUtility.java +++ b/src/test/java/org/apache/hadoop/hbase/HBaseTestingUtility.java @@ -66,6 +66,7 @@ import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher; import org.apache.hadoop.hdfs.DFSClient; import org.apache.hadoop.hdfs.DistributedFileSystem; import org.apache.hadoop.hdfs.MiniDFSCluster; +import org.apache.hadoop.hdfs.server.namenode.FSNamesystem; import org.apache.hadoop.hdfs.server.namenode.NameNode; import org.apache.hadoop.mapred.MiniMRCluster; import org.apache.zookeeper.ZooKeeper; @@ -1233,7 +1234,10 @@ public class HBaseTestingUtility { Field field = this.dfsCluster.getClass().getDeclaredField("nameNode"); field.setAccessible(true); NameNode nn = (NameNode)field.get(this.dfsCluster); - nn.namesystem.leaseManager.setLeasePeriod(100, 50000); + field = nn.getClass().getDeclaredField("namesystem"); + field.setAccessible(true); + FSNamesystem namesystem = (FSNamesystem)field.get(nn); + namesystem.leaseManager.setLeasePeriod(100, 50000); } /** diff --git a/src/test/java/org/apache/hadoop/hbase/mapreduce/TestHFileOutputFormat.java b/src/test/java/org/apache/hadoop/hbase/mapreduce/TestHFileOutputFormat.java index f6a7210..0b6bfa9 100644 --- a/src/test/java/org/apache/hadoop/hbase/mapreduce/TestHFileOutputFormat.java +++ b/src/test/java/org/apache/hadoop/hbase/mapreduce/TestHFileOutputFormat.java @@ -57,14 +57,17 @@ import org.apache.hadoop.hbase.io.hfile.Compression.Algorithm; import org.apache.hadoop.hbase.io.hfile.HFile; import org.apache.hadoop.hbase.io.hfile.HFile.Reader; import org.apache.hadoop.hbase.util.Bytes; +import org.apache.hadoop.hbase.util.Hadoops; import 
org.apache.hadoop.hbase.util.Threads; import org.apache.hadoop.io.NullWritable; +import org.apache.hadoop.mapred.TaskID; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.Mapper; import org.apache.hadoop.mapreduce.RecordWriter; import org.apache.hadoop.mapreduce.TaskAttemptContext; import org.apache.hadoop.mapreduce.TaskAttemptID; import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; +import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl; import org.junit.Before; import org.junit.Test; import org.mockito.Mockito; @@ -174,8 +177,12 @@ public class TestHFileOutputFormat { try { Job job = new Job(conf); FileOutputFormat.setOutputPath(job, dir); - context = new TaskAttemptContext(job.getConfiguration(), - new TaskAttemptID()); + if (Hadoops.isPost020MapReduce()) { + context = new TaskAttemptContextImpl(job.getConfiguration(), + TaskAttemptID.forName("attempt_200707121733_0001_m_000000_0")); + } else { + context = org.apache.hadoop.hbase.mapreduce.hadoopbackport.InputSampler.getTaskAttemptContext(job); + } HFileOutputFormat hof = new HFileOutputFormat(); writer = hof.getRecordWriter(context); final byte [] b = Bytes.toBytes("b"); @@ -484,8 +491,8 @@ public class TestHFileOutputFormat { setupRandomGeneratorMapper(job); HFileOutputFormat.configureIncrementalLoad(job, table); FileOutputFormat.setOutputPath(job, dir); - context = new TaskAttemptContext(job.getConfiguration(), - new TaskAttemptID()); + // FIX + context = org.apache.hadoop.hbase.mapreduce.hadoopbackport.InputSampler.getTaskAttemptContext(job); HFileOutputFormat hof = new HFileOutputFormat(); writer = hof.getRecordWriter(context); diff --git a/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestHLogSplit.java b/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestHLogSplit.java index f1f4c6b..bf06673 100644 --- a/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestHLogSplit.java +++ b/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestHLogSplit.java @@ -173,9 +173,8 @@ public class TestHLogSplit { throws IOException { AtomicBoolean stop = new AtomicBoolean(false); - FileStatus[] stats = fs.listStatus(new Path("/hbase/t1")); - assertTrue("Previous test should clean up table dir", - stats == null || stats.length == 0); + assertFalse("Previous test should clean up table dir", + fs.exists(new Path("/hbase/t1"))); generateHLogs(-1); @@ -967,8 +966,7 @@ public class TestHLogSplit { HLogSplitter.moveRecoveredEditsFromTemp("tmpdir", hbaseDir, oldLogDir, logfile.getPath().toString(), conf); Path tdir = HTableDescriptor.getTableDir(hbaseDir, TABLE_NAME); - FileStatus [] files = this.fs.listStatus(tdir); - assertTrue(files == null || files.length == 0); + assertFalse(fs.exists(tdir)); assertEquals(0, countHLog(fs.listStatus(oldLogDir)[0].getPath(), fs, conf)); } diff --git a/src/test/java/org/apache/hadoop/hbase/util/Hadoops.java b/src/test/java/org/apache/hadoop/hbase/util/Hadoops.java new file mode 100644 index 0000000..285338f --- /dev/null +++ b/src/test/java/org/apache/hadoop/hbase/util/Hadoops.java @@ -0,0 +1,34 @@ +/** + * Copyright 2009 The Apache Software Foundation + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.util;
+
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+
+/**
+ * Utility to help figure what the backing Hadoop provides.
+ */
+public class Hadoops {
+  /**
+   * @return True if the available mapreduce is post-0.20.
+   */
+  public static boolean isPost020MapReduce() {
+    return TaskAttemptContext.class.isInterface();
+  }
+}
\ No newline at end of file
diff --git a/src/test/java/org/apache/hadoop/hbase/util/TestFSUtils.java b/src/test/java/org/apache/hadoop/hbase/util/TestFSUtils.java
index c8fc065..a0df53d 100644
--- a/src/test/java/org/apache/hadoop/hbase/util/TestFSUtils.java
+++ b/src/test/java/org/apache/hadoop/hbase/util/TestFSUtils.java
@@ -34,7 +34,6 @@ public class TestFSUtils {
     HBaseTestingUtility htu = new HBaseTestingUtility();
     htu.getConfiguration().setBoolean("dfs.support.append", false);
     assertFalse(FSUtils.isHDFS(htu.getConfiguration()));
-    assertFalse(FSUtils.isAppendSupported(htu.getConfiguration()));
     htu.getConfiguration().setBoolean("dfs.support.append", true);
     MiniDFSCluster cluster = null;
     try {
@@ -45,4 +44,4 @@ public class TestFSUtils {
       if (cluster != null) cluster.shutdown();
     }
   }
-}
+}
\ No newline at end of file
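
The pattern this patch applies throughout is: detect which Hadoop generation is on the classpath, then reach the version-specific API through reflection, so one HBase build runs against both 0.20 and 0.21/0.22 mapreduce. Below is a minimal sketch of that pattern, separate from the patch; it assumes only stock mapreduce classes (TaskAttemptContext, TaskAttemptID, and 0.21+'s org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl, which the patch also imports), and the TaskAttemptContexts class name is illustrative, not part of the patch.

import java.io.IOException;
import java.lang.reflect.Constructor;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.TaskAttemptID;

/** Illustrative sketch only -- not part of the patch. */
public class TaskAttemptContexts {
  /** @return True when TaskAttemptContext is an interface, i.e. mapreduce is post-0.20. */
  public static boolean isPost020MapReduce() {
    return TaskAttemptContext.class.isInterface();
  }

  /**
   * Build a TaskAttemptContext without compiling against one particular hadoop:
   * on 0.20 the TaskAttemptContext class itself is constructed, on 0.21/0.22 the
   * TaskAttemptContextImpl implementation is loaded by name.
   */
  public static TaskAttemptContext create(final Job job) throws IOException {
    String className = isPost020MapReduce()
      ? "org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl"
      : "org.apache.hadoop.mapreduce.TaskAttemptContext";
    try {
      Class<?> clazz = Class.forName(className);
      Constructor<?> c = clazz.getConstructor(Configuration.class, TaskAttemptID.class);
      return (TaskAttemptContext) c.newInstance(job.getConfiguration(), new TaskAttemptID());
    } catch (Exception e) {
      // Wrap reflective failures the same way the patch's helpers do.
      throw new IOException("Failed making TaskAttemptContext reflectively", e);
    }
  }
}

This is the same lookup InputSampler.getTaskAttemptContext and the TestHFileOutputFormat change perform. One deliberate difference in the sketch: the constructor is resolved with Configuration.class rather than the runtime class of job.getConfiguration(), because getConstructor() matches declared parameter types exactly and the job's configuration can be a JobConf subclass at runtime.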
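
The WAL changes follow the same probe-then-invoke idea for durable sync: branch-0.20-append exposes syncFs() on SequenceFile.Writer, while 0.21/0.22 expose hflush() on FSDataOutputStream. A hedged sketch of the capability probe follows; the WalSyncProbe class is illustrative, while the probed method names are the ones SequenceFileLogWriter and FSUtils.isAppendSupported look for.

import java.lang.reflect.Method;

import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.io.SequenceFile;

/** Illustrative sketch only -- not part of the patch. */
final class WalSyncProbe {
  /** @return the writer's syncFs() Method if present (branch-0.20-append / CDH3), else null. */
  static Method probeSyncFs(final SequenceFile.Writer writer) {
    try {
      return writer.getClass().getMethod("syncFs", new Class<?>[] {});
    } catch (NoSuchMethodException e) {
      return null; // plain 0.20 or 0.21+: no syncFs on the writer
    }
  }

  /** @return the stream's hflush() Method if present (0.21/0.22), else null. */
  static Method probeHflush(final FSDataOutputStream out) {
    try {
      return out.getClass().getMethod("hflush", new Class<?>[] {});
    } catch (NoSuchMethodException e) {
      return null; // pre-0.21: no hflush
    }
  }
}

Whichever Method comes back non-null is invoked at sync time; if both are null there is no working sync and the writer only logs a warning, which matches the "No sync support!" message added above.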