Index: conf/hbase-env.sh
===================================================================
--- conf/hbase-env.sh	(revision 1311171)
+++ conf/hbase-env.sh	(working copy)
@@ -30,6 +30,13 @@
 # The maximum amount of heap to use, in MB. Default is 1000.
 # export HBASE_HEAPSIZE=1000
 
+# The remote directory in HDFS from which to fetch new jar files.
+# e.g. hdfs://XXXX:9000/hbase/.classloader
+# export HBASE_CLASSLOADER_REMOTE_DIRECTORY=
+
+# The local directory into which fetched jar files are written.
+# export HBASE_CLASSLOADER_LOCAL_DIRECTORY=
+
 # Extra Java runtime options.
 # Below are what we set by default. May only work with SUN JVM.
 # For more on why as well as other possible settings,
@@ -39,6 +46,10 @@
 # Uncomment below to enable java garbage collection logging in the .out file.
 # export HBASE_OPTS="$HBASE_OPTS -verbose:gc -XX:+PrintGCDetails -XX:+PrintGCDateStamps $HBASE_GC_OPTS"
 
+# export HBASE_OPTS="$HBASE_OPTS -Djava.system.class.loader=org.apache.hadoop.hbase.util.HdfsClassLoader"
+# export HBASE_OPTS="$HBASE_OPTS -Dorg.apache.hadoop.hbase.util.remoteDirectory=${HBASE_CLASSLOADER_REMOTE_DIRECTORY}"
+# export HBASE_OPTS="$HBASE_OPTS -Dorg.apache.hadoop.hbase.util.localDirectory=${HBASE_CLASSLOADER_LOCAL_DIRECTORY}"
+
 # Uncomment below (along with above GC logging) to put GC information in its own logfile (will set HBASE_GC_OPTS)
 # export HBASE_USE_GC_LOGFILE=true
 
Index: src/main/java/org/apache/hadoop/hbase/client/HBaseAdmin.java
===================================================================
--- src/main/java/org/apache/hadoop/hbase/client/HBaseAdmin.java	(revision 1311171)
+++ src/main/java/org/apache/hadoop/hbase/client/HBaseAdmin.java	(working copy)
@@ -48,6 +48,7 @@
 import org.apache.hadoop.hbase.regionserver.wal.FailedLogCloseException;
 import org.apache.hadoop.hbase.util.Addressing;
 import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.FSUtils;
 import org.apache.hadoop.hbase.util.Pair;
 import org.apache.hadoop.hbase.util.Writables;
 import org.apache.hadoop.ipc.RemoteException;
@@ -55,6 +56,7 @@
 import org.apache.zookeeper.KeeperException;
 
 import java.io.Closeable;
+import java.io.File;
 import java.io.IOException;
 import java.io.InterruptedIOException;
 import java.net.SocketTimeoutException;
@@ -1752,4 +1754,33 @@
       function.master.close();
     }
   }
+
+  /**
+   * Upload a local jar file to a remote HDFS directory.
+   *
+   * @param localFileStr
+   *          Local jar file. The file extension must be ".jar".
+   * @param remoteHdfsDir
+   *          Remote HDFS directory under hbase.rootdir.
+   * @throws IOException
+   */
+  public void uploadLocalJarToHDFS(String localFileStr, String remoteHdfsDir)
+      throws IOException {
+    File localFile = new File(localFileStr);
+    if (!localFile.isFile()
+        || !localFileStr.endsWith(HConstants.JAR_FILE_EXTENSION)) {
+      LOG.warn("Wrong jar file name: " + localFileStr);
+      return;
+    }
+    // Append a timestamp to the jar name in HDFS to avoid overwriting an
+    // existing file with the same name.
+    String baseName = localFile.getName().substring(0,
+        localFile.getName().length() - HConstants.JAR_FILE_EXTENSION.length());
+    StringBuilder remoteName = new StringBuilder(baseName);
+    remoteName.append("-");
+    remoteName.append(System.currentTimeMillis());
+    remoteName.append(HConstants.JAR_FILE_EXTENSION);
+    FSUtils.uploadFile(this.conf, localFileStr, remoteHdfsDir,
+        remoteName.toString());
+  }
 }
Index: src/main/java/org/apache/hadoop/hbase/HConstants.java
===================================================================
--- src/main/java/org/apache/hadoop/hbase/HConstants.java	(revision 1311171)
+++ src/main/java/org/apache/hadoop/hbase/HConstants.java	(working copy)
@@ -653,7 +653,8 @@
   /** Region in Transition metrics threshold time */
   public static final String METRICS_RIT_STUCK_WARNING_THRESHOLD="hbase.metrics.rit.stuck.warning.threshold";
-
+  public static final String JAR_FILE_EXTENSION = ".jar";
+
   private HConstants() {
     // Can't be instantiated with this ctor.
   }
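[Reviewer note - not part of the patch] A minimal usage sketch for the new admin API, assuming a reachable cluster; the jar path and the ".classloader" directory name are hypothetical (the directory is resolved relative to hbase.rootdir by FSUtils.uploadFile):

    Configuration conf = HBaseConfiguration.create();
    HBaseAdmin admin = new HBaseAdmin(conf);
    try {
      // Lands as e.g. my-filters-1333555555555.jar under <hbase.rootdir>/.classloader
      admin.uploadLocalJarToHDFS("/tmp/my-filters.jar", ".classloader");
    } finally {
      admin.close();
    }
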
Index: src/main/java/org/apache/hadoop/hbase/util/ClassPathLocalizer.java
===================================================================
--- src/main/java/org/apache/hadoop/hbase/util/ClassPathLocalizer.java	(revision 0)
+++ src/main/java/org/apache/hadoop/hbase/util/ClassPathLocalizer.java	(revision 0)
@@ -0,0 +1,338 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.util;
+
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.PathFilter;
+import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.hadoop.hbase.HConstants;
+
+import java.io.File;
+import java.io.FileFilter;
+import java.io.IOException;
+import java.net.URI;
+import java.net.URL;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Set;
+import java.util.StringTokenizer;
+
+/**
+ * The classpath localizer knows how to localize (jar) files located in a
+ * hadoop directory to a local directory. It then lists the files and returns
+ * a list of URLs which can be added to a {@link java.net.URLClassLoader}.
+ * <p>
+ * The localizer requires several system properties to be present:
+ * <ol>
+ *   <li>{@link #REMOTE_DIRECTORY_PROPERTY_NAME} (required)</li>
+ *   <li>{@link #LOCAL_DIRECTORY_PROPERTY_NAME} (optional)</li>
+ *   <li>{@link #TRIGGERING_PREFIX_PROPERTY_NAME} (optional)</li>
+ *   <li>{@link #BLOCKED_CLASSES_PROPERTY_NAME} (optional)</li>
+ *   <li>{@link #VERBOSE_PROPERTY_NAME} (optional)</li>
+ *   <li>{@link #CLEAR_PROPERTY_NAME} (optional)</li>
+ * </ol>
+ */
+public class ClassPathLocalizer {
+
+  /**
+   * The source of the classpath directory - on hadoop. e.g.
+   * "hdfs://XXXX:9000/hbase/.classloader"
+   */
+  public static final String REMOTE_DIRECTORY_PROPERTY_NAME = "org.apache.hadoop.hbase.util.remoteDirectory";
+  /**
+   * The base directory on the local machine. Different local directories
+   * should be set if several classloaders are used on the same machine.
+   */
+  public static final String LOCAL_DIRECTORY_PROPERTY_NAME = "org.apache.hadoop.hbase.util.localDirectory";
+  /**
+   * Package prefixes which trigger classloading. Defaults to
+   * "org.apache.hadoop.hbase.filter." and "org.apache.hadoop.hbase.coprocessor.".
+   */
+  public static final String TRIGGERING_PREFIX_PROPERTY_NAME = "org.apache.hadoop.hbase.util.triggeringPackagePrefix";
+  /**
+   * List of blocked classes separated by commas - blocked classes won't
+   * trigger localization even if they match a triggering package prefix.
+   */
+  public static final String BLOCKED_CLASSES_PROPERTY_NAME = "org.apache.hadoop.hbase.util.blockedClasses";
+  /**
+   * Whether to print messages to the console or not. Defaults to true.
+   */
+  public static final String VERBOSE_PROPERTY_NAME = "org.apache.hadoop.hbase.util.verbose";
+  /**
+   * A boolean indicating whether or not to clear the local directory on
+   * startup.
+   */
+  public static final String CLEAR_PROPERTY_NAME = "org.apache.hadoop.hbase.util.clearLocalDirectory";
+
+  /**
+   * The default name of the local directory (leaf name): {@value}.
+   */
+  public static final String DEFAULT_LOCAL_DIRECTORY_NAME = ".hadoopClassLoader";
+
+  private static final String CRC_EXTENSION = ".crc";
+
+  private static boolean verbose = Boolean.valueOf(System.getProperty(
+      VERBOSE_PROPERTY_NAME, "true"));
+
+  private String remoteDirectory;
+  private String[] triggeringPrefixes;
+  private String[] blockedClasses;
+  private File localDirectory;
+
+  /**
+   * Initialize - verify the configuration and clean up previous loading
+   * locations.
+   */
+  public void init() {
+    remoteDirectory = System.getProperty(REMOTE_DIRECTORY_PROPERTY_NAME);
+    if (remoteDirectory == null || remoteDirectory.isEmpty()) {
+      throw new IllegalStateException("Please define a system property named "
+          + REMOTE_DIRECTORY_PROPERTY_NAME);
+    }
+    localDirectory = new File(System.getProperty(LOCAL_DIRECTORY_PROPERTY_NAME,
+        new File(System.getProperty("user.home"), DEFAULT_LOCAL_DIRECTORY_NAME)
+            .getAbsolutePath()));
+    if (!(localDirectory.isDirectory() || localDirectory.mkdirs())) {
+      throw new RuntimeException("Can't access or create " + localDirectory);
+    }
+
+    triggeringPrefixes = toList(
+        System.getProperty(TRIGGERING_PREFIX_PROPERTY_NAME,
+            "org.apache.hadoop.hbase.filter.,org.apache.hadoop.hbase.coprocessor."),
+        ",");
+    blockedClasses = toList(
+        System.getProperty(BLOCKED_CLASSES_PROPERTY_NAME, "BeanInfo"), ",");
+    if (Boolean.valueOf(System.getProperty(CLEAR_PROPERTY_NAME, "false"))) {
+      try {
+        clear(localDirectory);
+      } catch (IOException e) {
+        HdfsClassLoader.logError("Error while clearing " + localDirectory, e);
+      }
+    }
+  }
+
+  /**
+   * Run classpath localization in a thread (to bypass a synchronization
+   * deadlock on the class loading lock).
+   *
+   * @return the thread (already started)
+   */
+  public Thread runInThread() {
+    LocalizerThread thread = new LocalizerThread();
+    thread.start();
+    return thread;
+  }
+
+  /**
+   * Gets all files from hadoop (if required) and lists the local directory to
+   * provide a list of URLs to add to the classpath.
+   *
+   * @return a list of URLs to extend the classpath with
+   *
+   * @throws Exception
+   *           in case of a failure somewhere
+   */
+  public List<URL> run() throws Exception {
+    FileSystem fileSystem = FileSystem.get(new URI(remoteDirectory),
+        HBaseConfiguration.create());
+    fetchInSequence(fileSystem);
+    return getExtendedClasspath(fileSystem);
+  }
+
+  /**
+   * Checks whether a given classname is a trigger for classpath localization.
+   * Not all classes should trigger classpath localization: there are many
+   * cases where classes are not found during jvm/application bootstrap, so
+   * localization should be attempted only when a likely candidate is missed.
+   *
+   * @param name
+   *          the class name to check.
+   *
+   * @return true if the given classname triggers localization
+   */
+  public boolean isATriggeringClass(String name) {
+    if (name == null) {
+      return false;
+    }
+    boolean isATrigger = false;
+    for (String prefix : triggeringPrefixes) {
+      if (name.startsWith(prefix)) {
+        isATrigger = true;
+        for (String blockedClass : blockedClasses) {
+          if (name.endsWith(blockedClass)) {
+            isATrigger = false;
+            break;
+          }
+        }
+        break;
+      }
+    }
+    if (isATrigger) {
+      HdfsClassLoader.log("Triggering loading with class " + name);
+    }
+    return isATrigger;
+  }
+
+  /**
+   * Lists the local directory to which classes were fetched and returns the
+   * result in the form of a list of URLs.
+   *
+   * @param fileSystem
+   *          the file system to use
+   *
+   * @return a list of urls
+   *
+   * @throws IOException
+   *           in case listing the remote directory fails
+   */
+  List<URL> getExtendedClasspath(FileSystem fileSystem) throws IOException {
+    FileStatus[] fileStatusArray = fileSystem.listStatus(new Path(
+        remoteDirectory), new CRCPathFilter());
+    if (fileStatusArray == null || fileStatusArray.length == 0) {
+      return Collections.emptyList();
+    }
+
+    final Set<String> libNames = new HashSet<String>();
+    for (FileStatus fileStatus : fileStatusArray) {
+      if (!fileStatus.isDir()) {
+        libNames.add(fileStatus.getPath().getName());
+      }
+    }
+
+    List<URL> extendedClasspath = new LinkedList<URL>();
+    final File[] files = localDirectory.listFiles(new FileFilter() {
+      @Override
+      public boolean accept(File file) {
+        return file.isFile() && libNames.contains(file.getName())
+            && !file.getName().endsWith(SingleFileCopier.CHECKSUM_EXTENSION)
+            && !file.getName().endsWith(CRC_EXTENSION);
+      }
+    });
+    if (files != null) {
+      for (File file : files) {
+        HdfsClassLoader.log("Adding " + file + " to extended classpath");
+        // For converting files to URLs one should use java.io.File.toURI()
+        // followed by java.net.URI.toURL() instead of just
+        // java.io.File.toURL(), as the latter method has problems with
+        // converting special characters.
+        extendedClasspath.add(file.toURI().toURL());
+      }
+    }
+    return extendedClasspath;
+  }
+
+  /**
+   * Fetch the lib directory, one file at a time.
+   *
+   * @param fileSystem
+   *          the file system to use
+   *
+   * @throws IOException
+   *           in case hadoop interaction fails
+   */
+  void fetchInSequence(FileSystem fileSystem) throws IOException {
+    FileStatus[] fileStatusArray = fileSystem.listStatus(new Path(
+        remoteDirectory), new CRCPathFilter());
+    if (fileStatusArray == null || fileStatusArray.length == 0) {
+      HdfsClassLoader.log("Directory " + remoteDirectory
+          + " is empty or doesn't exist");
+      return;
+    }
+    List<FileStatus> fileStatuses = new ArrayList<FileStatus>(
+        Arrays.asList(fileStatusArray));
+    boolean aFileWasFetched = false;
+    for (FileStatus fileStatus : fileStatuses) {
+      // Only copy jar files.
+      if (!fileStatus.isDir()
+          && fileStatus.getPath().toString()
+              .endsWith(HConstants.JAR_FILE_EXTENSION)) {
+        File file = new File(localDirectory, fileStatus.getPath().getName());
+        new SingleFileCopier(fileSystem, fileStatus, file).copy();
+        aFileWasFetched = true;
+      }
+    }
+    if (!aFileWasFetched) {
+      HdfsClassLoader.log("No files were found at " + remoteDirectory);
+    }
+  }
+
+  private void clear(File localDirectory) throws IOException {
+    HdfsClassLoader.log("Clearing " + localDirectory.getAbsolutePath());
+    if (localDirectory.exists()) {
+      if (localDirectory.isFile()) {
+        if (!localDirectory.delete()) {
+          throw new IOException("could not delete " + localDirectory);
+        }
+      } else {
+        for (File file : localDirectory.listFiles()) {
+          clear(file);
+        }
+      }
+    }
+  }
+
+  private class CRCPathFilter implements PathFilter {
+
+    @Override
+    public boolean accept(Path path) {
+      return !path.getName().endsWith(CRC_EXTENSION);
+    }
+  }
+
+  private String[] toList(String property, String delims) {
+    StringTokenizer tokenizer = new StringTokenizer(property, delims);
+    List<String> result = new ArrayList<String>();
+    while (tokenizer.hasMoreTokens()) {
+      result.add(tokenizer.nextToken().trim());
+    }
+    return result.toArray(new String[result.size()]);
+  }
+
+  /**
+   * A thread used to localize all classes.
+   */
+  public class LocalizerThread extends Thread {
+
+    private List<URL> urls = Collections.emptyList();
+
+    @Override
+    public void run() {
+      HdfsClassLoader.log("Fetching classpath from hadoop - START");
+      try {
+        urls = ClassPathLocalizer.this.run();
+      } catch (Throwable throwable) {
+        Throwable cause = throwable.getCause() != null ? throwable.getCause()
+            : throwable;
+        HdfsClassLoader.logError("Failed to localize classes from hadoop: ",
+            cause);
+      }
+      HdfsClassLoader.log("Fetching classpath from hadoop - DONE");
+    }
+
+    /**
+     * Gets the list of urls.
+     *
+     * @return the list of localized urls.
+     */
+    public List<URL> getUrls() {
+      return urls;
+    }
+  }
+}
\ No newline at end of file

Property changes on: src\main\java\org\apache\hadoop\hbase\util\ClassPathLocalizer.java
___________________________________________________________________
Added: svn:needs-lock
   + *
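[Reviewer note - not part of the patch] A sketch of the triggering semantics under the default properties (class names are hypothetical); only the remote directory property must be set beforehand:

    ClassPathLocalizer localizer = new ClassPathLocalizer();
    localizer.init(); // requires org.apache.hadoop.hbase.util.remoteDirectory

    // Matches a triggering prefix -> localization would be attempted.
    localizer.isATriggeringClass("org.apache.hadoop.hbase.filter.MyCustomFilter");   // true
    // Matches a prefix but ends with the blocked suffix "BeanInfo" -> skipped.
    localizer.isATriggeringClass("org.apache.hadoop.hbase.filter.MyFilterBeanInfo"); // false
    // Outside the triggering prefixes -> skipped.
    localizer.isATriggeringClass("com.example.Unrelated");                           // false
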
Index: src/main/java/org/apache/hadoop/hbase/util/FSUtils.java
===================================================================
--- src/main/java/org/apache/hadoop/hbase/util/FSUtils.java	(revision 1311171)
+++ src/main/java/org/apache/hadoop/hbase/util/FSUtils.java	(working copy)
@@ -19,10 +19,14 @@
  */
 package org.apache.hadoop.hbase.util;
 
+import java.io.BufferedInputStream;
 import java.io.DataInputStream;
 import java.io.EOFException;
+import java.io.FileInputStream;
 import java.io.FileNotFoundException;
 import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
 import java.net.URI;
 import java.net.URISyntaxException;
 import java.util.ArrayList;
@@ -50,6 +54,7 @@
 import org.apache.hadoop.hbase.master.HMaster;
 import org.apache.hadoop.hbase.regionserver.HRegion;
 import org.apache.hadoop.hdfs.DistributedFileSystem;
+import org.apache.hadoop.io.IOUtils;
 import org.apache.hadoop.io.SequenceFile;
 import org.apache.hadoop.util.ReflectionUtils;
 import org.apache.hadoop.util.StringUtils;
@@ -1016,4 +1021,33 @@
     if (status == null || status.length < 1) return null;
     return status;
   }
+
+  /**
+   * Upload a local file to a remote HDFS directory.
+   *
+   * @param conf
+   *          Configuration.
+   * @param localFileStr
+   *          Local file path plus file name.
+   * @param remoteHdfsDir
+   *          The remote directory (under hbase.rootdir) in which to store
+   *          the file.
+   * @param remoteFileName
+   *          The remote file name.
+   * @throws IOException
+   */
+  public static void uploadFile(final Configuration conf,
+      final String localFileStr, final String remoteHdfsDir,
+      final String remoteFileName) throws IOException {
+    Path rootDir = FSUtils.getRootDir(conf);
+    Path remotePath = new Path(rootDir, remoteHdfsDir);
+    FileSystem fs = FileSystem.get(conf);
+    if (!fs.exists(remotePath)) {
+      fs.mkdirs(remotePath);
+    }
+    InputStream in = new BufferedInputStream(new FileInputStream(localFileStr));
+    // Write to the requested file name; IOUtils.copyBytes closes both streams.
+    OutputStream out = fs.create(new Path(remotePath, remoteFileName));
+    IOUtils.copyBytes(in, out, conf);
+  }
 }
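[Reviewer note - not part of the patch] How the path resolution composes, assuming hbase.rootdir is hdfs://namenode:9000/hbase (hypothetical):

    Configuration conf = HBaseConfiguration.create();
    // Creates <hbase.rootdir>/.classloader if needed, then writes
    // hdfs://namenode:9000/hbase/.classloader/my-filters.jar
    FSUtils.uploadFile(conf, "/tmp/my-filters.jar", ".classloader", "my-filters.jar");
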
Index: src/main/java/org/apache/hadoop/hbase/util/HdfsClassLoader.java
===================================================================
--- src/main/java/org/apache/hadoop/hbase/util/HdfsClassLoader.java	(revision 0)
+++ src/main/java/org/apache/hadoop/hbase/util/HdfsClassLoader.java	(revision 0)
@@ -0,0 +1,200 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.util;
+
+import sun.misc.URLClassPath;
+
+import java.lang.reflect.Field;
+import java.lang.reflect.InvocationTargetException;
+import java.lang.reflect.Method;
+import java.net.URL;
+import java.net.URLClassLoader;
+import java.security.AccessControlContext;
+import java.util.List;
+
+/**
+ * This class loader is able to load classes from hadoop. It accepts a
+ * configuration parameter which is the hadoop directory to localize. When the
+ * default loaders can't load a class (and not before that) it localizes the
+ * hadoop directory, loads from it and retries. In case the hadoop directory
+ * has already been localized it uses the local copy.
+ */
+public class HdfsClassLoader extends URLClassLoader {
+  public static final String SYSTEM_CLASSLOADER_NAME = "java.system.class.loader";
+  public static final String VERBOSE_PROPERTY_NAME = "org.apache.hadoop.hbase.classloader.verbose";
+  public static boolean USE_AS_DEFAULT = false;
+
+  static {
+    if (HdfsClassLoader.class.getName().equals(
+        System.getProperty(SYSTEM_CLASSLOADER_NAME, ""))) {
+      USE_AS_DEFAULT = true;
+    }
+  }
+
+  private static final long WAIT_TIME_MILLIS = 50L;
+  private static boolean verbose = Boolean.valueOf(System.getProperty(
+      VERBOSE_PROPERTY_NAME, "true"));
+
+  private Class<?> localizerClass;
+  private Object classPathLocalizer;
+
+  /**
+   * Default constructor.
+   *
+   * @throws Exception
+   *           in case of an error.
+   */
+  public HdfsClassLoader() throws Exception {
+    this(ClassLoader.getSystemClassLoader());
+  }
+
+  /**
+   * Create a new instance of the Hdfs class loader.
+   *
+   * @param parent
+   *          the parent class loader.
+   *
+   * @throws Exception
+   *           in case of an error.
+   */
+  public HdfsClassLoader(java.lang.ClassLoader parent) throws Exception {
+    super(new URL[0], parent);
+    // Take over the parent's URL classpath and access control context, then
+    // point the parent at the bootstrap classpath so lookups fall through to
+    // this loader.
+    Field ucpField = URLClassLoader.class.getDeclaredField("ucp");
+    Field accField = URLClassLoader.class.getDeclaredField("acc");
+    ucpField.setAccessible(true);
+    accField.setAccessible(true);
+
+    URLClassPath ucp = (URLClassPath) ucpField.get(parent);
+    AccessControlContext acc = (AccessControlContext) accField.get(parent);
+
+    ucpField.set(this, ucp);
+    accField.set(this, acc);
+
+    ucpField.set(parent, sun.misc.Launcher.getBootstrapClassPath());
+  }
+
+  /**
+   * Extends the class path on the first attempt to find a class. The class
+   * path is intentionally not extended from the find resource method; the
+   * assumption is that only a transported class may use one of the resources
+   * in the transported jars.
+   *
+   * @param name
+   *          the class name
+   *
+   * @return the class object
+   *
+   * @throws ClassNotFoundException
+   *           in case the class is not found
+   */
+  @SuppressWarnings({ "unchecked" })
+  @Override
+  protected Class<?> findClass(String name) throws ClassNotFoundException {
+    try {
+      return super.findClass(name);
+    } catch (ClassNotFoundException cne) {
+      log("Parent ClassLoader failed to load class " + name);
+      // No need to synchronize here: loadClass already holds the class
+      // loading lock on this loader when findClass is invoked.
+      boolean isATriggeringClass;
+
+      try {
+        initLocalizer();
+        isATriggeringClass = callIsATriggeringClass(localizerClass,
+            classPathLocalizer, name);
+      } catch (Throwable throwable) {
+        Throwable cause = throwable.getCause() != null ? throwable.getCause()
+            : throwable;
+        logError(
+            "Failed initializing or finding out whether the class is a trigger: ",
+            cause);
+        throw new ClassNotFoundException(throwable.getMessage(), cause);
+      }
+      if (isATriggeringClass) {
+        try {
+          log("Trying to load class " + name + " from hadoop");
+
+          Object thread = call(localizerClass, classPathLocalizer,
+              "runInThread");
+          do {
+            // wait() (rather than Thread.sleep()) releases the class loading
+            // lock while the localizer thread runs; nothing notifies, so the
+            // timeout turns this into a polling loop.
+            wait(WAIT_TIME_MILLIS);
+          } while ((Boolean) call(thread.getClass(), thread, "isAlive"));
+
+          for (URL url : (List<URL>) call(thread.getClass(), thread, "getUrls")) {
+            addURL(url);
+          }
+        } catch (Throwable throwable) {
+          Throwable cause = throwable.getCause() != null ? throwable.getCause()
+              : throwable;
+          logError("Failed to localize classes from hadoop: ", cause);
+          throw new ClassNotFoundException(throwable.getMessage(), cause);
+        }
+
+        return super.findClass(name);
+      } else {
+        throw cne;
+      }
+    }
+  }
+
+  private void initLocalizer() throws ClassNotFoundException,
+      InstantiationException, IllegalAccessException,
+      InvocationTargetException, NoSuchMethodException {
+    if (localizerClass == null) {
+      localizerClass = loadClass("org.apache.hadoop.hbase.util.ClassPathLocalizer");
+      classPathLocalizer = localizerClass.getConstructor((Class[]) null)
+          .newInstance((Object[]) null);
+      call(localizerClass, classPathLocalizer, "init");
+    }
+  }
+
+  private static boolean callIsATriggeringClass(Class<?> localizerClass,
+      Object o, String name) throws NoSuchMethodException,
+      IllegalAccessException, InvocationTargetException {
+    Method isATriggeringClassMethod = localizerClass.getMethod(
+        "isATriggeringClass", String.class);
+    return (Boolean) isATriggeringClassMethod.invoke(o, name);
+  }
+
+  private static Object call(Class<?> clz, Object obj, String methodName)
+      throws SecurityException, NoSuchMethodException,
+      IllegalArgumentException, IllegalAccessException,
+      InvocationTargetException {
+    Method method = clz.getMethod(methodName);
+    return method.invoke(obj);
+  }
+
+  public static void log(String message) {
+    if (verbose) {
+      System.out.printf("%s\n", message);
+    }
+  }
+
+  public static void logError(String message, Throwable throwable) {
+    if (verbose) {
+      System.err.printf("%s: %s\n", message, throwable);
+      if (throwable != null) {
+        throwable.printStackTrace();
+      }
+    }
+  }
+}
\ No newline at end of file

Property changes on: src\main\java\org\apache\hadoop\hbase\util\HdfsClassLoader.java
___________________________________________________________________
Added: svn:needs-lock
   + *
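[Reviewer note - not part of the patch] With the hbase-env.sh settings above, the JVM installs HdfsClassLoader as the system class loader at startup; a triggering class then resolves through it without further wiring (the class name is hypothetical):

    // Started with:
    //   -Djava.system.class.loader=org.apache.hadoop.hbase.util.HdfsClassLoader
    //   -Dorg.apache.hadoop.hbase.util.remoteDirectory=hdfs://namenode:9000/hbase/.classloader
    ClassLoader system = ClassLoader.getSystemClassLoader();
    Class<?> filterClass = Class.forName(
        "org.apache.hadoop.hbase.filter.MyCustomFilter", true, system);
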
Index: src/main/java/org/apache/hadoop/hbase/util/SingleFileCopier.java
===================================================================
--- src/main/java/org/apache/hadoop/hbase/util/SingleFileCopier.java	(revision 0)
+++ src/main/java/org/apache/hadoop/hbase/util/SingleFileCopier.java	(revision 0)
@@ -0,0 +1,149 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.util;
+
+import org.apache.hadoop.fs.FileChecksum;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.MD5MD5CRC32FileChecksum;
+import org.apache.hadoop.fs.Path;
+
+import java.io.BufferedInputStream;
+import java.io.BufferedOutputStream;
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.util.concurrent.Callable;
+
+/**
+ * Copies a single file from hadoop to the local file system.
+ */
+public class SingleFileCopier implements Callable<File> {
+
+  static final String CHECKSUM_EXTENSION = ".hcrc";
+  private FileSystem fileSystem;
+  private FileStatus source;
+  private File destination;
+
+  /**
+   * Creates a new single file copier.
+   *
+   * @param fileSystem
+   *          the file system to use.
+   * @param source
+   *          the source (on the file system)
+   * @param destination
+   *          the local destination
+   */
+  public SingleFileCopier(FileSystem fileSystem, FileStatus source,
+      File destination) {
+    this.fileSystem = fileSystem;
+    this.source = source;
+    this.destination = destination;
+  }
+
+  /**
+   * Copies the source to the destination.
+   *
+   * @throws IOException
+   *           in case of failure.
+   */
+  public void copy() throws IOException {
+    this.destination.getParentFile().mkdirs();
+
+    FileChecksum existingChecksum = readChecksum(this.destination);
+    FileChecksum fileChecksum = getFileChecksum();
+    if (fileChecksum == null) {
+      throw new IOException("Null checksum for " + this.source.getPath());
+    }
+    if (this.source.getLen() == this.destination.length()
+        && fileChecksum.equals(existingChecksum)) {
+      HdfsClassLoader.log("Skipping copy of file " + destination.getPath()
+          + " - exists and valid.");
+      return;
+    }
+    Path srcPath = this.source.getPath();
+    Path dstPath = new Path(this.destination.toURI().toURL().toString());
+
+    this.fileSystem.copyToLocalFile(srcPath, dstPath);
+    writeChecksum(fileChecksum, destination);
+    warnOnDifferentLength();
+    HdfsClassLoader.log("Successfully fetched " + destination);
+  }
+
+  private FileChecksum getFileChecksum() throws IOException {
+    return fileSystem.exists(source.getPath()) ? fileSystem
+        .getFileChecksum(source.getPath()) : null;
+  }
+
+  private void writeChecksum(FileChecksum fileChecksum, File destination)
+      throws IOException {
+    File checksumFile = new File(destination.getParent(), destination.getName()
+        + CHECKSUM_EXTENSION);
+    DataOutputStream dataOut = new DataOutputStream(new BufferedOutputStream(
+        new FileOutputStream(checksumFile)));
+    try {
+      fileChecksum.write(dataOut);
+    } finally {
+      dataOut.close();
+    }
+  }
+
+  private FileChecksum readChecksum(File destination) throws IOException {
+    File checksumFile = new File(destination.getParent(), destination.getName()
+        + CHECKSUM_EXTENSION);
+    if (checksumFile.exists()) {
+      FileChecksum fileChecksum = new MD5MD5CRC32FileChecksum();
+      DataInputStream dataIn = new DataInputStream(new BufferedInputStream(
+          new FileInputStream(checksumFile)));
+      try {
+        fileChecksum.readFields(dataIn);
+      } finally {
+        dataIn.close();
+      }
+      return fileChecksum;
+    }
+    return null;
+  }
+
+  private void warnOnDifferentLength() {
+    // Prepare the message up front so the logging path itself can't fail.
+    String message = "HdfsClassLoader: Destination " + destination + ":"
+        + destination.length() + " differs from source length "
+        + source.getPath().getName() + ":" + source.getLen();
+    if (destination.length() != source.getLen()) {
+      HdfsClassLoader.logError(message, null);
+    }
+  }
+
+  /**
+   * {@inheritDoc}
+   * <p>
+   * Returns the destination file, for the caller's convenience.
+   */
+  @Override
+  public File call() throws Exception {
+    copy();
+    return destination;
+  }
+}
\ No newline at end of file

Property changes on: src\main\java\org\apache\hadoop\hbase\util\SingleFileCopier.java
___________________________________________________________________
Added: svn:needs-lock
   + *
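[Reviewer note - not part of the patch] The copier can also be exercised on its own; the .hcrc checksum side-file makes repeated copies cheap. Paths are hypothetical:

    Configuration conf = HBaseConfiguration.create();
    FileSystem fs = FileSystem.get(URI.create("hdfs://namenode:9000"), conf);
    FileStatus src = fs.getFileStatus(new Path("/hbase/.classloader/my-filters.jar"));
    File dst = new File("/tmp/.hadoopClassLoader/my-filters.jar");
    // Skips the copy when the local length and the recorded checksum match.
    new SingleFileCopier(fs, src, dst).copy();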