diff --git a/pom.xml b/pom.xml
index 48d5ef8..313fa5e 100644
--- a/pom.xml
+++ b/pom.xml
@@ -571,7 +571,7 @@
       at revision 1034499 with this hdfs-895 patch:
       https://issues.apache.org/jira/secure/attachment/12459473/hdfs-895-branch-20-append.txt
     -->
-    <hadoop.version>0.20-append-r1057313</hadoop.version>
+    <hadoop.version>0.22.0-SNAPSHOT</hadoop.version>
     <jasper.version>5.5.23</jasper.version>
     <jaxb-api.version>2.1</jaxb-api.version>
     <jetty.version>6.1.26</jetty.version>
@@ -673,9 +673,10 @@
     <dependency>
       <groupId>org.apache.hadoop</groupId>
-      <artifactId>hadoop-core</artifactId>
+      <artifactId>hadoop-common</artifactId>
       <version>${hadoop.version}</version>
+
       <exclusions>
         <exclusion>
           <groupId>hsqldb</groupId>
           <artifactId>hsqldb</artifactId>
         </exclusion>
@@ -696,6 +697,86 @@
           <groupId>oro</groupId>
           <artifactId>oro</artifactId>
         </exclusion>
+        <exclusion>
+          <groupId>jdiff</groupId>
+          <artifactId>jdiff</artifactId>
+        </exclusion>
+        <exclusion>
+          <groupId>org.apache.lucene</groupId>
+          <artifactId>lucene-core</artifactId>
+        </exclusion>
+      </exclusions>
+    </dependency>
+
+    <dependency>
+      <groupId>org.apache.hadoop</groupId>
+      <artifactId>hadoop-hdfs</artifactId>
+      <version>${hadoop.version}</version>
+      <exclusions>
+        <exclusion>
+          <groupId>hsqldb</groupId>
+          <artifactId>hsqldb</artifactId>
+        </exclusion>
+        <exclusion>
+          <groupId>net.sf.kosmosfs</groupId>
+          <artifactId>kfs</artifactId>
+        </exclusion>
+        <exclusion>
+          <groupId>org.eclipse.jdt</groupId>
+          <artifactId>core</artifactId>
+        </exclusion>
+        <exclusion>
+          <groupId>net.java.dev.jets3t</groupId>
+          <artifactId>jets3t</artifactId>
+        </exclusion>
+        <exclusion>
+          <groupId>oro</groupId>
+          <artifactId>oro</artifactId>
+        </exclusion>
+        <exclusion>
+          <groupId>jdiff</groupId>
+          <artifactId>jdiff</artifactId>
+        </exclusion>
+        <exclusion>
+          <groupId>org.apache.lucene</groupId>
+          <artifactId>lucene-core</artifactId>
+        </exclusion>
+      </exclusions>
+    </dependency>
+
+    <dependency>
+      <groupId>org.apache.hadoop</groupId>
+      <artifactId>hadoop-mapred</artifactId>
+      <version>${hadoop.version}</version>
+      <exclusions>
+        <exclusion>
+          <groupId>hsqldb</groupId>
+          <artifactId>hsqldb</artifactId>
+        </exclusion>
+        <exclusion>
+          <groupId>net.sf.kosmosfs</groupId>
+          <artifactId>kfs</artifactId>
+        </exclusion>
+        <exclusion>
+          <groupId>org.eclipse.jdt</groupId>
+          <artifactId>core</artifactId>
+        </exclusion>
+        <exclusion>
+          <groupId>net.java.dev.jets3t</groupId>
+          <artifactId>jets3t</artifactId>
+        </exclusion>
+        <exclusion>
+          <groupId>oro</groupId>
+          <artifactId>oro</artifactId>
+        </exclusion>
+        <exclusion>
+          <groupId>jdiff</groupId>
+          <artifactId>jdiff</artifactId>
+        </exclusion>
+        <exclusion>
+          <groupId>org.apache.lucene</groupId>
+          <artifactId>lucene-core</artifactId>
+        </exclusion>
       </exclusions>
     </dependency>
@@ -882,7 +963,19 @@
     <dependency>
       <groupId>org.apache.hadoop</groupId>
-      <artifactId>hadoop-test</artifactId>
+      <artifactId>hadoop-common-test</artifactId>
+      <version>${hadoop.version}</version>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.hadoop</groupId>
+      <artifactId>hadoop-hdfs-test</artifactId>
+      <version>${hadoop.version}</version>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.hadoop</groupId>
+      <artifactId>hadoop-mapred-test</artifactId>
       <version>${hadoop.version}</version>
       <scope>test</scope>
diff --git a/src/main/java/org/apache/hadoop/hbase/ipc/ConnectionHeader.java b/src/main/java/org/apache/hadoop/hbase/ipc/ConnectionHeader.java
index c2a7649..904c107 100644
--- a/src/main/java/org/apache/hadoop/hbase/ipc/ConnectionHeader.java
+++ b/src/main/java/org/apache/hadoop/hbase/ipc/ConnectionHeader.java
@@ -33,7 +33,6 @@ import org.apache.hadoop.security.UserGroupInformation;
  */
 class ConnectionHeader implements Writable {
   private String protocol;
-  private UserGroupInformation ugi = null;
 
   public ConnectionHeader() {}
 
@@ -47,7 +46,6 @@ class ConnectionHeader implements Writable {
    */
   public ConnectionHeader(String protocol, UserGroupInformation ugi) {
     this.protocol = protocol;
-    this.ugi = ugi;
   }
 
   @Override
@@ -56,26 +54,11 @@ class ConnectionHeader implements Writable {
     if (protocol.isEmpty()) {
       protocol = null;
     }
-
-    boolean ugiUsernamePresent = in.readBoolean();
-    if (ugiUsernamePresent) {
-      String username = in.readUTF();
-      ugi.readFields(in);
-    } else {
-      ugi = null;
-    }
   }
 
   @Override
   public void write(DataOutput out) throws IOException {
     Text.writeString(out, (protocol == null) ? "" : protocol);
-    if (ugi != null) {
-      //Send both effective user and real user for simple auth
-      out.writeBoolean(true);
-      out.writeUTF(ugi.getUserName());
-    } else {
-      out.writeBoolean(false);
-    }
   }
 
   public String getProtocol() {
@@ -83,10 +66,10 @@ class ConnectionHeader implements Writable {
   }
 
   public UserGroupInformation getUgi() {
-    return ugi;
+    return null;
   }
 
   public String toString() {
-    return protocol + "-" + ugi;
+    return protocol;
   }
 }
"" : protocol); - if (ugi != null) { - //Send both effective user and real user for simple auth - out.writeBoolean(true); - out.writeUTF(ugi.getUserName()); - } else { - out.writeBoolean(false); - } } public String getProtocol() { @@ -83,10 +66,10 @@ class ConnectionHeader implements Writable { } public UserGroupInformation getUgi() { - return ugi; + return null; } public String toString() { - return protocol + "-" + ugi; + return protocol; } } diff --git a/src/main/java/org/apache/hadoop/hbase/mapreduce/HFileOutputFormat.java b/src/main/java/org/apache/hadoop/hbase/mapreduce/HFileOutputFormat.java index 95407d1..1fd3d24 100644 --- a/src/main/java/org/apache/hadoop/hbase/mapreduce/HFileOutputFormat.java +++ b/src/main/java/org/apache/hadoop/hbase/mapreduce/HFileOutputFormat.java @@ -45,12 +45,12 @@ import org.apache.hadoop.hbase.client.Put; import org.apache.hadoop.hbase.io.ImmutableBytesWritable; import org.apache.hadoop.hbase.io.hfile.Compression; import org.apache.hadoop.hbase.io.hfile.HFile; -import org.apache.hadoop.hbase.mapreduce.hadoopbackport.TotalOrderPartitioner; import org.apache.hadoop.hbase.regionserver.StoreFile; import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.io.NullWritable; import org.apache.hadoop.io.SequenceFile; import org.apache.hadoop.mapreduce.Job; +import org.apache.hadoop.mapreduce.Partitioner; import org.apache.hadoop.mapreduce.RecordWriter; import org.apache.hadoop.mapreduce.TaskAttemptContext; import org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter; @@ -271,9 +271,16 @@ public class HFileOutputFormat extends FileOutputFormat topClass; + try { + topClass = getTotalOrderPartitionerClass(); + } catch (ClassNotFoundException e) { + throw new IOException("Failed getting TotalOrderPartitioner", e); + } + job.setPartitionerClass(topClass); job.setOutputKeyClass(ImmutableBytesWritable.class); job.setOutputValueClass(KeyValue.class); job.setOutputFormatClass(HFileOutputFormat.class); @@ -302,10 +309,14 @@ public class HFileOutputFormat extends FileOutputFormat hadoop 0.20, then we want to use the hadoop TotalOrderPartitioner. + * If 0.20, then we want to use the TOP that we have under hadoopbackport. + * This method is about hbase being able to run on different versions of + * hadoop. In 0.20.x hadoops, we have to use the TOP that is bundled with + * hbase. Otherwise, we use the one in Hadoop. + * @return Instance of the TotalOrderPartitioner class + * @throws ClassNotFoundException If can't find a TotalOrderPartitioner. 
diff --git a/src/main/java/org/apache/hadoop/hbase/mapreduce/hadoopbackport/InputSampler.java b/src/main/java/org/apache/hadoop/hbase/mapreduce/hadoopbackport/InputSampler.java
index c3e0a68..394155d 100644
--- a/src/main/java/org/apache/hadoop/hbase/mapreduce/hadoopbackport/InputSampler.java
+++ b/src/main/java/org/apache/hadoop/hbase/mapreduce/hadoopbackport/InputSampler.java
@@ -19,6 +19,8 @@ package org.apache.hadoop.hbase.mapreduce.hadoopbackport;
 
 import java.io.IOException;
+import java.lang.reflect.Constructor;
+import java.lang.reflect.InvocationTargetException;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.List;
@@ -132,8 +134,7 @@ public class InputSampler<K,V> extends Configured implements Tool {
       int samplesPerSplit = numSamples / splitsToSample;
       long records = 0;
       for (int i = 0; i < splitsToSample; ++i) {
-        TaskAttemptContext samplingContext = new TaskAttemptContext(
-            job.getConfiguration(), new TaskAttemptID());
+        TaskAttemptContext samplingContext = getTaskAttemptContext(job);
         RecordReader<K,V> reader = inf.createRecordReader(
           splits.get(i), samplingContext);
         reader.initialize(splits.get(i), samplingContext);
@@ -152,6 +153,35 @@ public class InputSampler<K,V> extends Configured implements Tool {
   }
 
   /**
+   * This method is about making hbase portable, making it so it can run on
+   * more than just hadoop 0.20.  In later hadoops, TaskAttemptContext became
+   * an Interface.  But in hadoops where TAC is an Interface, we shouldn't
+   * be using the classes that are in this package; we should be using the
+   * native Hadoop ones (We'll throw a ClassNotFoundException if we end up in
+   * here when we should be using native hadoop TotalOrderPartitioner).
+   * @param job
+   * @return
+   * @throws IOException
+   */
+  public static TaskAttemptContext getTaskAttemptContext(final Job job)
+  throws IOException {
+    if (TaskAttemptContext.class.isInterface()) {
+      throw new IOException(new ClassNotFoundException("TaskAttemptContext is an Interface, not a class"));
+    }
+    Constructor<TaskAttemptContext> c;
+    try {
+      c = TaskAttemptContext.class.getConstructor(job.getConfiguration().getClass(), TaskAttemptID.class);
+    } catch (Exception e) {
+      throw new IOException("Failed getting constructor", e);
+    }
+    try {
+      return c.newInstance(job.getConfiguration(), new TaskAttemptID());
+    } catch (Exception e) {
+      throw new IOException("Failed creating instance", e);
+    }
+  }
+
+  /**
    * Sample from random points in the input.
    * General-purpose sampler.  Takes numSamples / maxSplitsSampled inputs from
    * each split.
@@ -214,8 +244,7 @@ public class InputSampler<K,V> extends Configured implements Tool {
       // the target sample keyset
       for (int i = 0; i < splitsToSample ||
                      (i < splits.size() && samples.size() < numSamples); ++i) {
-        TaskAttemptContext samplingContext = new TaskAttemptContext(
-            job.getConfiguration(), new TaskAttemptID());
+        TaskAttemptContext samplingContext = getTaskAttemptContext(job);
         RecordReader<K,V> reader = inf.createRecordReader(
           splits.get(i), samplingContext);
         reader.initialize(splits.get(i), samplingContext);
@@ -285,8 +314,7 @@ public class InputSampler<K,V> extends Configured implements Tool {
       long records = 0;
       long kept = 0;
       for (int i = 0; i < splitsToSample; ++i) {
-        TaskAttemptContext samplingContext = new TaskAttemptContext(
-            job.getConfiguration(), new TaskAttemptID());
+        TaskAttemptContext samplingContext = getTaskAttemptContext(job);
         RecordReader<K,V> reader = inf.createRecordReader(
           splits.get(i), samplingContext);
         reader.initialize(splits.get(i), samplingContext);
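
The getTaskAttemptContext() helper above deliberately refuses to run where TaskAttemptContext has become an interface (Hadoop 0.21+), because the backported sampler should not be used there at all. If one call site had to serve both lines, the usual trick is to pick the concrete context class reflectively. The sketch below assumes the 0.21+ implementation lives at org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl; treat that class name and the TaskAttemptContexts helper as assumptions, not something this patch provides.

    import java.lang.reflect.Constructor;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.mapreduce.TaskAttemptContext;
    import org.apache.hadoop.mapreduce.TaskAttemptID;

    final class TaskAttemptContexts {
      static TaskAttemptContext create(Configuration conf) throws Exception {
        // On 0.20 TaskAttemptContext is itself the concrete class; on 0.21+ it is
        // an interface, so fall back to the (assumed) implementation class.
        Class<?> clazz = TaskAttemptContext.class.isInterface()
            ? Class.forName("org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl")
            : TaskAttemptContext.class;
        Constructor<?> c = clazz.getConstructor(Configuration.class, TaskAttemptID.class);
        return (TaskAttemptContext) c.newInstance(conf, new TaskAttemptID());
      }
    }

Passing Configuration.class (rather than the runtime class of job.getConfiguration()) keeps getConstructor from missing the declared signature when the configuration object is a subclass.
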
diff --git a/src/main/java/org/apache/hadoop/hbase/master/HMaster.java b/src/main/java/org/apache/hadoop/hbase/master/HMaster.java
index b8489ac..9c0c540 100644
--- a/src/main/java/org/apache/hadoop/hbase/master/HMaster.java
+++ b/src/main/java/org/apache/hadoop/hbase/master/HMaster.java
@@ -595,7 +595,7 @@ implements HMasterInterface, HMasterRegionInterface, MasterServices, Server {
     int port = this.conf.getInt("hbase.master.info.port", 60010);
     if (port >= 0) {
       String a = this.conf.get("hbase.master.info.bindAddress", "0.0.0.0");
-      this.infoServer = new InfoServer(MASTER, a, port, false);
+      this.infoServer = new InfoServer(MASTER, a, port, false, this.conf);
       this.infoServer.addServlet("status", "/master-status", MasterStatusServlet.class);
       this.infoServer.setAttribute(MASTER, this);
       this.infoServer.start();
diff --git a/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java b/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
index 6404538..6d51cb5 100644
--- a/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
+++ b/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
@@ -1288,7 +1288,7 @@ public class HRegionServer implements HRegionInterface, HBaseRPCErrorHandler,
     boolean auto = this.conf.getBoolean("hbase.regionserver.info.port.auto", false);
     while (true) {
       try {
-        this.infoServer = new InfoServer("regionserver", addr, port, false);
+        this.infoServer = new InfoServer("regionserver", addr, port, false, this.conf);
         this.infoServer.addServlet("status", "/rs-status", RSStatusServlet.class);
         this.infoServer.setAttribute(REGIONSERVER, this);
         this.infoServer.start();
diff --git a/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLog.java b/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLog.java
index 0716788..e0c8104 100644
--- a/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLog.java
+++ b/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLog.java
@@ -484,8 +484,7 @@ public class HLog implements Syncable {
       // protected field name.
       OutputStream nextHdfsOut = null;
       if (nextWriter instanceof SequenceFileLogWriter) {
-        nextHdfsOut =
-          ((SequenceFileLogWriter)nextWriter).getDFSCOutputStream();
+        nextHdfsOut = ((SequenceFileLogWriter)nextWriter).getDFSClientOutputStream();
       }
       // Tell our listeners that a new log was created
       if (!this.listeners.isEmpty()) {
@@ -966,8 +965,7 @@ public class HLog implements Syncable {
     }
   }
 
-  @Override
-  public void sync() throws IOException {
+  private void syncer() throws IOException {
     synchronized (this.updateLock) {
       if (this.closed) {
         return;
@@ -1042,8 +1040,15 @@ public class HLog implements Syncable {
   }
 
   public void hsync() throws IOException {
-    // Not yet implemented up in hdfs so just call hflush.
-    sync();
+    syncer();
+  }
+
+  public void hflush() throws IOException {
+    syncer();
+  }
+
+  public void sync() throws IOException {
+    syncer();
   }
 
   private void requestLogRoll() {
diff --git a/src/main/java/org/apache/hadoop/hbase/regionserver/wal/SequenceFileLogWriter.java b/src/main/java/org/apache/hadoop/hbase/regionserver/wal/SequenceFileLogWriter.java
index 8dc9a5e..a4f6015 100644
--- a/src/main/java/org/apache/hadoop/hbase/regionserver/wal/SequenceFileLogWriter.java
+++ b/src/main/java/org/apache/hadoop/hbase/regionserver/wal/SequenceFileLogWriter.java
@@ -45,11 +45,12 @@ public class SequenceFileLogWriter implements HLog.Writer {
   private SequenceFile.Writer writer;
   // The dfsclient out stream gotten made accessible or null if not available.
   private OutputStream dfsClient_out;
-  // The syncFs method from hdfs-200 or null if not available.
-  private Method syncFs;
 
   private Class<? extends HLogKey> keyClass;
 
+  private Method syncFs = null;
+  private Method hflush = null;
+
   /**
    * Default constructor.
    */
@@ -69,7 +70,7 @@ public class SequenceFileLogWriter implements HLog.Writer {
 
   @Override
   public void init(FileSystem fs, Path path, Configuration conf)
-      throws IOException {
+  throws IOException {
     if (null == keyClass) {
       keyClass = HLog.getKeyClass(conf);
     }
@@ -87,10 +88,63 @@ public class SequenceFileLogWriter implements HLog.Writer {
         new DefaultCodec(),
         null,
         new Metadata());
+
+    makeSequenceFilePrivateFSDataOutputStreamAccessible();
+    this.syncFs = getSyncFs();
+    this.hflush = getHFlush();
+  }
 
-    // Get at the private FSDataOutputStream inside in SequenceFile so we can
-    // call sync on it.  Make it accessible.  Stash it aside for call up in
-    // the sync method.
+  /**
+   * Now do dirty work to see if syncFs is available on the backing this.writer.
+   * It will be available in branch-0.20-append and in CDH3.
+   * @return The syncFs method or null if not available.
+   * @throws IOException
+   */
+  private Method getSyncFs()
+  throws IOException {
+    Method m = null;
+    try {
+      // function pointer to writer.syncFs()
+      m = this.writer.getClass().getMethod("syncFs", new Class []{});
+    } catch (SecurityException e) {
+      throw new IOException("Failed test for syncfs", e);
+    } catch (NoSuchMethodException e) {
+      // Not available
+    }
+    if (LOG.isDebugEnabled()) {
+      LOG.debug((m != null)? "Using syncFs -- HDFS-200":
+        "syncFs -- HDFS-200 -- not available");
+    }
+    return m;
+  }
+
+  /**
+   * See if hflush (0.21 and 0.22 hadoop) is available.
+   * @return The hflush method or null if not available.
+   * @throws IOException
+   */
+  private Method getHFlush()
+  throws IOException {
+    Method m = null;
+    try {
+      Class<? extends OutputStream> c = getDFSClientOutputStream().getClass();
+      m = c.getMethod("hflush", new Class []{});
+    } catch (SecurityException e) {
+      throw new IOException("Failed test for hflush", e);
+    } catch (NoSuchMethodException e) {
+      // Ignore
+    }
+    if (LOG.isDebugEnabled()) {
+      LOG.debug((m != null)? "Using hflush":
+        "hflush not available");
+    }
+    return m;
+  }
+
+  // Get at the private FSDataOutputStream inside in SequenceFile so we can
+  // call sync on it.  Make it accessible.
+  private void makeSequenceFilePrivateFSDataOutputStreamAccessible()
+  throws IOException {
     final Field fields [] = this.writer.getClass().getDeclaredFields();
     final String fieldName = "out";
     for (int i = 0; i < fields.length; ++i) {
@@ -100,32 +154,13 @@ public class SequenceFileLogWriter implements HLog.Writer {
           fields[i].setAccessible(true);
           FSDataOutputStream out =
             (FSDataOutputStream)fields[i].get(this.writer);
-          this.dfsClient_out = out.getWrappedStream();
+          this.dfsClient_out = out;
           break;
         } catch (IllegalAccessException ex) {
           throw new IOException("Accessing " + fieldName, ex);
         }
       }
     }
-
-    // Now do dirty work to see if syncFs is available.
-    // Test if syncfs is available.
-    Method m = null;
-    boolean append = conf.getBoolean("dfs.support.append", false);
-    if (append) {
-      try {
-        // function pointer to writer.syncFs()
-        m = this.writer.getClass().getMethod("syncFs", new Class []{});
-      } catch (SecurityException e) {
-        throw new IOException("Failed test for syncfs", e);
-      } catch (NoSuchMethodException e) {
-        // Not available
-      }
-    }
-    this.syncFs = m;
-    LOG.info((this.syncFs != null)?
-      "Using syncFs -- HDFS-200":
-      ("syncFs -- HDFS-200 -- not available, dfs.support.append=" + append));
   }
 
   @Override
@@ -146,6 +181,12 @@ public class SequenceFileLogWriter implements HLog.Writer {
       } catch (Exception e) {
        throw new IOException("Reflection", e);
       }
+    } else if (this.hflush != null) {
+      try {
+        this.hflush.invoke(getDFSClientOutputStream(), HLog.NO_ARGS);
+      } catch (Exception e) {
+        throw new IOException("Reflection", e);
+      }
     }
   }
 
@@ -158,7 +199,7 @@ public class SequenceFileLogWriter implements HLog.Writer {
    * @return The dfsclient out stream up inside SF.Writer made accessible, or
    * null if not available.
    */
-  public OutputStream getDFSCOutputStream() {
+  public OutputStream getDFSClientOutputStream() {
     return this.dfsClient_out;
   }
-}
+}
\ No newline at end of file
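
The rewritten init() above probes for two different durability hooks and keeps whichever java.lang.reflect.Method it finds: SequenceFile.Writer.syncFs() from the 0.20-append/CDH3 line (HDFS-200), or hflush() on the underlying dfsclient output stream in 0.21/0.22. A compact sketch of that capability probe, with SyncProbe as an illustrative name only:

    import java.lang.reflect.Method;

    final class SyncProbe {
      // Returns the named no-arg public method if the running Hadoop exposes it
      // on the given object, or null when that capability is absent.
      static Method probe(Object target, String methodName) {
        try {
          return target.getClass().getMethod(methodName);
        } catch (NoSuchMethodException e) {
          return null;
        }
      }
    }

    // Mirrors the patch: prefer syncFs() on the SequenceFile writer, otherwise
    // hflush() on the dfsclient output stream (writer/dfsOut are placeholders):
    //   Method syncFs = SyncProbe.probe(writer, "syncFs");
    //   Method hflush = (syncFs == null) ? SyncProbe.probe(dfsOut, "hflush") : null;
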
diff --git a/src/main/java/org/apache/hadoop/hbase/util/InfoServer.java b/src/main/java/org/apache/hadoop/hbase/util/InfoServer.java
index 6ed9fe6..9d85d45 100644
--- a/src/main/java/org/apache/hadoop/hbase/util/InfoServer.java
+++ b/src/main/java/org/apache/hadoop/hbase/util/InfoServer.java
@@ -20,16 +20,18 @@ package org.apache.hadoop.hbase.util;
 
+import java.io.IOException;
+import java.lang.reflect.Method;
+import java.net.URL;
+import java.util.Map;
+
+import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.HBaseConfiguration;
 import org.apache.hadoop.http.HttpServer;
 import org.mortbay.jetty.handler.ContextHandlerCollection;
 import org.mortbay.jetty.servlet.Context;
 import org.mortbay.jetty.servlet.DefaultServlet;
 
-import java.io.IOException;
-import java.net.URL;
-import java.util.Map;
-
 /**
  * Create a Jetty embedded server to answer http requests.  The primary goal
  * is to serve up status information for the server.
@@ -39,6 +41,8 @@ import java.util.Map;
  * "/" -> the jsp server code from (src/hbase-webapps/<name>)
  */
 public class InfoServer extends HttpServer {
+  private final Configuration config;
+
   /**
    * Create a status server on the given port.
    * The jsp scripts are taken from src/hbase-webapps/<code>name</code>.
@@ -49,15 +53,36 @@ public class InfoServer extends HttpServer {
    * increment by 1 until it finds a free port.
    * @throws IOException e
    */
-  public InfoServer(String name, String bindAddress, int port, boolean findPort)
+  public InfoServer(String name, String bindAddress, int port, boolean findPort,
+      final Configuration c)
   throws IOException {
     super(name, bindAddress, port, findPort, HBaseConfiguration.create());
     webServer.addHandler(new ContextHandlerCollection());
+    this.config = c;
   }
 
   protected void addDefaultApps(ContextHandlerCollection parent, String appDir)
   throws IOException {
-    super.addDefaultApps(parent, appDir);
+    // Figure which super method to call. If 0.22 hadoop httpservlet, need to
+    // pass the configuration when I call the super addDefaultApps.
+    Method [] allMethods = this.getClass().getSuperclass().getDeclaredMethods();
+    for (Method m: allMethods) {
+      if (!m.getName().equals("addDefaultApps")) continue;
+      Class [] parameterTypes = m.getParameterTypes();
+      if (parameterTypes.length == 2) {
+        try {
+          m.invoke(this, parent, appDir);
+        } catch (Exception e) {
+          throw new IOException("Failed reflection", e);
+        }
+      } else if (parameterTypes.length == 3) {
+        try {
+          m.invoke(this, parent, appDir, this.config);
+        } catch (Exception e) {
+          throw new IOException("Failed reflection", e);
+        }
+      }
+    }
     // Must be same as up in hadoop.
     final String logsContextPath = "/logs";
     // Now, put my logs in place of hadoops... disable old one first.
@@ -82,24 +107,6 @@ public class InfoServer extends HttpServer {
     }
   }
 
-  /**
-   * Get the pathname to the path files.
-   * @return the pathname as a URL
-   */
-  @Override
-  protected String getWebAppsPath() throws IOException {
-    // Hack: webapps is not a unique enough element to find in CLASSPATH
-    // We'll more than likely find the hadoop webapps dir.  So, instead
-    // look for the 'master' webapp in the webapps subdir.  That should
-    // get us the hbase context.  Presumption is that place where the
-    // master webapp resides is where we want this InfoServer picking up
-    // web applications.
-    final String master = "master";
-    String p = getWebAppDir(master);
-    // Now strip master + the separator off the end of our context
-    return p.substring(0, p.length() - (master.length() + 1/* The separator*/));
-  }
-
   private static String getWebAppsPath(final String path)
   throws IOException {
     URL url = InfoServer.class.getClassLoader().getResource(path);
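
InfoServer.addDefaultApps() above has to call whichever superclass overload the classpath provides: HttpServer.addDefaultApps(parent, appDir) in 0.20, or the 0.22 variant that also takes a Configuration. That arity-based dispatch can be factored into a small helper; SuperDispatch below is a hypothetical sketch of the idea, not code from the patch (and unlike the patch it calls setAccessible, since it invokes the protected method from outside the subclass).

    import java.io.IOException;
    import java.lang.reflect.Method;

    final class SuperDispatch {
      // Invoke the first superclass method with the given name whose parameter
      // count matches one of the supplied argument lists, e.g.
      //   SuperDispatch.invoke(this, "addDefaultApps",
      //       new Object[] {parent, appDir},
      //       new Object[] {parent, appDir, conf});
      static Object invoke(Object self, String name, Object[]... argLists)
          throws IOException {
        for (Method m : self.getClass().getSuperclass().getDeclaredMethods()) {
          if (!m.getName().equals(name)) continue;
          for (Object[] args : argLists) {
            if (m.getParameterTypes().length != args.length) continue;
            try {
              m.setAccessible(true);
              return m.invoke(self, args);
            } catch (Exception e) {
              throw new IOException("Failed reflection", e);
            }
          }
        }
        throw new IOException("No overload of " + name + " matched");
      }
    }
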
diff --git a/src/test/java/org/apache/hadoop/hbase/HBaseTestingUtility.java b/src/test/java/org/apache/hadoop/hbase/HBaseTestingUtility.java
index babd788..0981553 100644
--- a/src/test/java/org/apache/hadoop/hbase/HBaseTestingUtility.java
+++ b/src/test/java/org/apache/hadoop/hbase/HBaseTestingUtility.java
@@ -66,6 +66,7 @@ import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
 import org.apache.hadoop.hdfs.DFSClient;
 import org.apache.hadoop.hdfs.DistributedFileSystem;
 import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.apache.hadoop.hdfs.server.namenode.FSNamesystem;
 import org.apache.hadoop.hdfs.server.namenode.NameNode;
 import org.apache.hadoop.mapred.MiniMRCluster;
 import org.apache.zookeeper.ZooKeeper;
@@ -1233,7 +1234,9 @@ public class HBaseTestingUtility {
     Field field = this.dfsCluster.getClass().getDeclaredField("nameNode");
     field.setAccessible(true);
     NameNode nn = (NameNode)field.get(this.dfsCluster);
-    nn.namesystem.leaseManager.setLeasePeriod(100, 50000);
+    field = nn.getClass().getDeclaredField("namesystem");
+    FSNamesystem namesystem = (FSNamesystem)field.get(nn);
+    namesystem.leaseManager.setLeasePeriod(100, 50000);
   }
 
   /**
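
The test utility above stops touching nn.namesystem directly because the field is not reachable the same way on 0.22; it re-reads it through reflection, just as it already does for MiniDFSCluster's nameNode field. A tiny helper capturing that pattern (Fields is an illustrative name; error handling kept minimal):

    import java.lang.reflect.Field;

    final class Fields {
      // Read a possibly non-public instance field by name, e.g.
      //   FSNamesystem ns = (FSNamesystem) Fields.read(nameNode, "namesystem");
      static Object read(Object target, String fieldName) throws Exception {
        Field f = target.getClass().getDeclaredField(fieldName);
        f.setAccessible(true);
        return f.get(target);
      }
    }
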
diff --git a/src/test/java/org/apache/hadoop/hbase/mapreduce/TestHFileOutputFormat.java b/src/test/java/org/apache/hadoop/hbase/mapreduce/TestHFileOutputFormat.java
index f6a7210..7fed242 100644
--- a/src/test/java/org/apache/hadoop/hbase/mapreduce/TestHFileOutputFormat.java
+++ b/src/test/java/org/apache/hadoop/hbase/mapreduce/TestHFileOutputFormat.java
@@ -174,8 +174,8 @@ public class TestHFileOutputFormat {
     try {
       Job job = new Job(conf);
       FileOutputFormat.setOutputPath(job, dir);
-      context = new TaskAttemptContext(job.getConfiguration(),
-          new TaskAttemptID());
+      // FIX!!!!
+      context = org.apache.hadoop.hbase.mapreduce.hadoopbackport.InputSampler.getTaskAttemptContext(job);
       HFileOutputFormat hof = new HFileOutputFormat();
       writer = hof.getRecordWriter(context);
       final byte [] b = Bytes.toBytes("b");
@@ -484,8 +484,8 @@ public class TestHFileOutputFormat {
     setupRandomGeneratorMapper(job);
     HFileOutputFormat.configureIncrementalLoad(job, table);
     FileOutputFormat.setOutputPath(job, dir);
-    context = new TaskAttemptContext(job.getConfiguration(),
-        new TaskAttemptID());
+    // FIX
+    context = org.apache.hadoop.hbase.mapreduce.hadoopbackport.InputSampler.getTaskAttemptContext(job);
     HFileOutputFormat hof = new HFileOutputFormat();
     writer = hof.getRecordWriter(context);