Index: hbase-server/src/main/java/org/apache/hadoop/hbase/io/FileLink.java
===================================================================
--- hbase-server/src/main/java/org/apache/hadoop/hbase/io/FileLink.java (revision 0)
+++ hbase-server/src/main/java/org/apache/hadoop/hbase/io/FileLink.java (revision 0)
@@ -0,0 +1,409 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.io;
+
+import java.util.ArrayList;
+import java.util.Collection;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.FileNotFoundException;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.PositionedReadable;
+import org.apache.hadoop.fs.Seekable;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.util.FSUtils;
+
+/**
+ * FileLink describes a link to a file.
+ *
+ * The link checks the original path first; if the file is not present there,
+ * it falls back to the alternative locations.
+ */
+@InterfaceAudience.Private
+public class FileLink {
+  static final Log LOG = LogFactory.getLog(FileLink.class);
+
+  public static final String BACK_REFERENCES_DIRECTORY_PREFIX = ".links-";
+
+  /**
+   * FileLink InputStream that handles the switch between the original path
+   * and the alternative locations when the file is moved.
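+   *
+   * A minimal usage sketch (illustrative only; the stream is obtained via
+   * {@link FileLink#open(FileSystem)} defined below):
+   * <pre>
+   *   FileLink link = new FileLink(originPath, archivePath);
+   *   FSDataInputStream in = link.open(fs);
+   *   in.read();  // keeps working even if the file moves from originPath
+   *               // to archivePath while the stream is open
+   * </pre>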
+   */
+  private static class FileLinkInputStream extends InputStream
+      implements Seekable, PositionedReadable {
+    private FSDataInputStream in = null;
+    private Path currentPath = null;
+    private long pos = 0;
+
+    private final FileLink fileLink;
+    private final int bufferSize;
+    private final FileSystem fs;
+
+    public FileLinkInputStream(final FileSystem fs, final FileLink fileLink)
+        throws IOException {
+      this(fs, fileLink, fs.getConf().getInt("io.file.buffer.size", 4096));
+    }
+
+    public FileLinkInputStream(final FileSystem fs, final FileLink fileLink, int bufferSize)
+        throws IOException {
+      this.bufferSize = bufferSize;
+      this.fileLink = fileLink;
+      this.fs = fs;
+
+      this.in = tryOpen();
+    }
+
+    @Override
+    public int read() throws IOException {
+      int res;
+      try {
+        res = in.read();
+      } catch (FileNotFoundException e) {
+        res = tryOpen().read();
+      } catch (NullPointerException e) { // HDFS 1.x - DFSInputStream.getBlockAt()
+        res = tryOpen().read();
+      } catch (AssertionError e) { // assert in HDFS 1.x - DFSInputStream.getBlockAt()
+        res = tryOpen().read();
+      }
+      // res is the unsigned byte value (0-255), or -1 at EOF;
+      // a zero byte must still advance the position.
+      if (res >= 0) pos += 1;
+      return res;
+    }
+
+    @Override
+    public int read(byte b[]) throws IOException {
+      return read(b, 0, b.length);
+    }
+
+    @Override
+    public int read(byte b[], int off, int len) throws IOException {
+      int n;
+      try {
+        n = in.read(b, off, len);
+      } catch (FileNotFoundException e) {
+        n = tryOpen().read(b, off, len);
+      } catch (NullPointerException e) { // HDFS 1.x - DFSInputStream.getBlockAt()
+        n = tryOpen().read(b, off, len);
+      } catch (AssertionError e) { // assert in HDFS 1.x - DFSInputStream.getBlockAt()
+        n = tryOpen().read(b, off, len);
+      }
+      if (n > 0) pos += n;
+      assert(in.getPos() == pos);
+      return n;
+    }
+
+    @Override
+    public int read(long position, byte[] buffer, int offset, int length) throws IOException {
+      int n;
+      try {
+        n = in.read(position, buffer, offset, length);
+      } catch (FileNotFoundException e) {
+        n = tryOpen().read(position, buffer, offset, length);
+      } catch (NullPointerException e) { // HDFS 1.x - DFSInputStream.getBlockAt()
+        n = tryOpen().read(position, buffer, offset, length);
+      } catch (AssertionError e) { // assert in HDFS 1.x - DFSInputStream.getBlockAt()
+        n = tryOpen().read(position, buffer, offset, length);
+      }
+      return n;
+    }
+
+    @Override
+    public void readFully(long position, byte[] buffer) throws IOException {
+      readFully(position, buffer, 0, buffer.length);
+    }
+
+    @Override
+    public void readFully(long position, byte[] buffer, int offset, int length) throws IOException {
+      try {
+        in.readFully(position, buffer, offset, length);
+      } catch (FileNotFoundException e) {
+        tryOpen().readFully(position, buffer, offset, length);
+      } catch (NullPointerException e) { // HDFS 1.x - DFSInputStream.getBlockAt()
+        tryOpen().readFully(position, buffer, offset, length);
+      } catch (AssertionError e) { // assert in HDFS 1.x - DFSInputStream.getBlockAt()
+        tryOpen().readFully(position, buffer, offset, length);
+      }
+    }
+
+    @Override
+    public long skip(long n) throws IOException {
+      long skipped;
+
+      try {
+        skipped = in.skip(n);
+      } catch (FileNotFoundException e) {
+        skipped = tryOpen().skip(n);
+      } catch (NullPointerException e) { // HDFS 1.x - DFSInputStream.getBlockAt()
+        skipped = tryOpen().skip(n);
+      } catch (AssertionError e) { // assert in HDFS 1.x - DFSInputStream.getBlockAt()
+        skipped = tryOpen().skip(n);
+      }
+
+      if (skipped > 0) pos += skipped;
+      return skipped;
+    }
+
+    @Override
+    public int available() throws IOException {
+      try {
+        return in.available();
+      } catch (FileNotFoundException e) {
+        return tryOpen().available();
+      } catch (NullPointerException e) { // HDFS 1.x - DFSInputStream.getBlockAt()
+        return tryOpen().available();
+      } catch (AssertionError e) { // assert in HDFS 1.x - DFSInputStream.getBlockAt()
+        return tryOpen().available();
+      }
+    }
+
+    @Override
+    public void seek(long pos) throws IOException {
+      try {
+        in.seek(pos);
+      } catch (FileNotFoundException e) {
+        tryOpen().seek(pos);
+      } catch (NullPointerException e) { // HDFS 1.x - DFSInputStream.getBlockAt()
+        tryOpen().seek(pos);
+      } catch (AssertionError e) { // assert in HDFS 1.x - DFSInputStream.getBlockAt()
+        tryOpen().seek(pos);
+      }
+      this.pos = pos;
+    }
+
+    @Override
+    public long getPos() throws IOException {
+      return pos;
+    }
+
+    @Override
+    public boolean seekToNewSource(long targetPos) throws IOException {
+      boolean res;
+      try {
+        res = in.seekToNewSource(targetPos);
+      } catch (FileNotFoundException e) {
+        res = tryOpen().seekToNewSource(targetPos);
+      } catch (NullPointerException e) { // HDFS 1.x - DFSInputStream.getBlockAt()
+        res = tryOpen().seekToNewSource(targetPos);
+      } catch (AssertionError e) { // assert in HDFS 1.x - DFSInputStream.getBlockAt()
+        res = tryOpen().seekToNewSource(targetPos);
+      }
+      if (res) pos = targetPos;
+      return res;
+    }
+
+    @Override
+    public void close() throws IOException {
+      in.close();
+    }
+
+    @Override
+    public synchronized void mark(int readlimit) {
+    }
+
+    @Override
+    public synchronized void reset() throws IOException {
+      throw new IOException("mark/reset not supported");
+    }
+
+    @Override
+    public boolean markSupported() {
+      return false;
+    }
+
+    /**
+     * Try to open the file from one of the available locations.
+     * @return FSDataInputStream stream of the opened file link
+     * @throws IOException on unexpected error, or file not found.
+     */
+    private FSDataInputStream tryOpen() throws IOException {
+      for (Path path: fileLink.getLocations()) {
+        if (path.equals(currentPath)) continue;
+        try {
+          in = fs.open(path, bufferSize);
+          in.seek(pos);
+          assert(in.getPos() == pos) : "Link unable to seek to the right position=" + pos;
+          if (LOG.isTraceEnabled()) {
+            if (currentPath == null) {
+              LOG.trace("link open path=" + path);
+            } else {
+              LOG.trace("link switch from path=" + currentPath + " to path=" + path);
+            }
+          }
+          currentPath = path;
+          return in;
+        } catch (FileNotFoundException e) {
+          // Try another file location
+        }
+      }
+      throw new FileNotFoundException("Unable to open link: " + fileLink);
+    }
+  }
+
+  private Collection<Path> locations = null;
+
+  public FileLink() {
+    this.locations = null;
+  }
+
+  /**
+   * @param originPath Original location of the File Link
+   * @param alternativePaths Alternative locations of the File Link
+   */
+  public FileLink(Path originPath, Path... alternativePaths) {
+    setLocations(originPath, alternativePaths);
+  }
+
+  /**
+   * @param locations Locations of the File Link
+   */
+  public FileLink(Collection<Path> locations) {
+    this.locations = locations;
+  }
+
+  /**
+   * @return the locations of the file link.
+   */
+  public Collection<Path> getLocations() {
+    return locations;
+  }
+
+  @Override
+  public String toString() {
+    StringBuilder str = new StringBuilder(getClass().getName());
+    str.append(" locations=[");
+    int i = 0;
+    for (Path location: locations) {
+      if (i++ > 0) str.append(", ");
+      str.append(location.toString());
+    }
+    str.append("]");
+    return str.toString();
+  }
+
+  /**
+   * Get the path of the first available link location.
+   *
+   * @param fs {@link FileSystem} on which to check the locations
+   * @return the path of the first available link.
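+   * @throws FileNotFoundException if none of the locations exist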
+   */
+  public Path getAvailablePath(FileSystem fs) throws IOException {
+    for (Path path: locations) {
+      if (fs.exists(path)) {
+        return path;
+      }
+    }
+    throw new FileNotFoundException("Unable to open link: " + this);
+  }
+
+  /**
+   * Get the FileStatus of the referenced file.
+   *
+   * @param fs {@link FileSystem} on which to get the file status
+   * @return FileStatus of the first available location of the referenced file
+   * @throws IOException on unexpected error.
+   */
+  public FileStatus getFileStatus(FileSystem fs) throws IOException {
+    for (Path path: locations) {
+      try {
+        return fs.getFileStatus(path);
+      } catch (FileNotFoundException e) {
+        // Try another file location
+      }
+    }
+    throw new FileNotFoundException("Unable to open link: " + this);
+  }
+
+  /**
+   * Open the FileLink for read.
+   * It uses the FileLinkInputStream to be able to switch between the
+   * original path and alternative locations when the file is moved.
+   *
+   * @param fs {@link FileSystem} on which to open the FileLink
+   * @return InputStream for reading the file link.
+   * @throws IOException on unexpected error.
+   */
+  public FSDataInputStream open(final FileSystem fs) throws IOException {
+    return new FSDataInputStream(new FileLinkInputStream(fs, this));
+  }
+
+  /**
+   * Open the FileLink for read.
+   * It uses the FileLinkInputStream to be able to switch between the
+   * original path and alternative locations when the file is moved.
+   *
+   * @param fs {@link FileSystem} on which to open the FileLink
+   * @param bufferSize the size of the buffer to be used.
+   * @return InputStream for reading the file link.
+   * @throws IOException on unexpected error.
+   */
+  public FSDataInputStream open(final FileSystem fs, int bufferSize) throws IOException {
+    return new FSDataInputStream(new FileLinkInputStream(fs, this, bufferSize));
+  }
+
+  /**
+   * NOTE: This method must be used only in the constructor!
+   * It creates a List with the specified locations for the link.
+   */
+  protected void setLocations(Path originPath, Path... alternativePaths) {
+    this.locations = new ArrayList<Path>();
+    this.locations.add(originPath);
+    for (int i = 0; i < alternativePaths.length; i++) {
+      this.locations.add(alternativePaths[i]);
+    }
+  }
+
+  /**
+   * Get the directory to store the link back references
+   *
+   * @param storeDir Root directory for the link reference folder
+   * @param fileName File Name with links
+   * @return Path for the link back references.
+   */
+  public static Path getBackReferencesDir(final Path storeDir, final String fileName) {
+    return new Path(storeDir, BACK_REFERENCES_DIRECTORY_PREFIX + fileName);
+  }
+
+  /**
+   * Get the referenced file name from the reference link directory path.
+   *
+   * @param dirPath Link references directory path
+   * @return Name of the file referenced
+   */
+  public static String getBackReferenceFileName(final Path dirPath) {
+    return dirPath.getName().substring(BACK_REFERENCES_DIRECTORY_PREFIX.length());
+  }
+
+  /**
+   * Checks if the specified directory path is a back reference links folder.
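+   * For example, with the ".links-" prefix defined above, the back reference
+   * folder for an hfile named "abcd" is ".links-abcd" (illustrative name).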
+   *
+   * @param dirPath Directory path to verify
+   * @return True if the specified directory is a link references folder
+   */
+  public static boolean isBackReferencesDir(final Path dirPath) {
+    if (dirPath == null) return false;
+    return dirPath.getName().startsWith(BACK_REFERENCES_DIRECTORY_PREFIX);
+  }
+}
Index: hbase-server/src/main/java/org/apache/hadoop/hbase/io/HFileLink.java
===================================================================
--- hbase-server/src/main/java/org/apache/hadoop/hbase/io/HFileLink.java (revision 0)
+++ hbase-server/src/main/java/org/apache/hadoop/hbase/io/HFileLink.java (revision 0)
@@ -0,0 +1,289 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.io;
+
+import java.io.IOException;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+import java.security.MessageDigest;
+import java.security.NoSuchAlgorithmException;
+
+import org.apache.commons.codec.binary.Hex;
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.HRegionInfo;
+import org.apache.hadoop.hbase.regionserver.HRegion;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.HFileArchiveUtil;
+import org.apache.hadoop.hbase.util.FSUtils;
+
+/**
+ * HFileLink describes a link to an hfile.
+ *
+ * An hfile can be in /hbase/<table>/<region>/<cf>/<hfile>
+ * or can be in /hbase/<archive>/<table>/<region>/<cf>/<hfile>
+ * ("hbase.table.archive.directory" conf property)
+ *
+ * The link checks the original path first; if the hfile is not present there,
+ * it falls back to the archived path.
+ */
+@InterfaceAudience.Private
+public class HFileLink extends FileLink {
+  /** Define the HFile Link name pattern in the form of: hfile-region-table */
+  public static final Pattern LINK_NAME_PARSER =
+    Pattern.compile("^([0-9a-f\\.]+)-([0-9a-f]+)-([a-zA-Z_0-9]+[a-zA-Z0-9_\\-\\.]*)$");
+
+  private final Path archivePath;
+  private final Path originPath;
+
+  /**
+   * @param conf {@link Configuration} from which to extract specific archive locations
+   * @param path The path of the HFile Link.
+   * @throws IOException on unexpected error.
+   */
+  public HFileLink(Configuration conf, Path path) throws IOException {
+    Path hfilePath = getRelativeTablePath(path);
+
+    this.originPath = new Path(FSUtils.getRootDir(conf), hfilePath);
+    this.archivePath = new Path(HFileArchiveUtil.getArchivePath(conf), hfilePath);
+    setLocations(originPath, archivePath);
+  }
+
+  public HFileLink(Path originPath, Path archivePath) {
+    this.originPath = originPath;
+    this.archivePath = archivePath;
+    setLocations(originPath, archivePath);
+  }
+
+  /**
+   * @return the origin path of the hfile.
+   */
+  public Path getOriginPath() {
+    return this.originPath;
+  }
+
+  /**
+   * @return the path of the archived hfile.
+   */
+  public Path getArchivePath() {
+    return this.archivePath;
+  }
+
+  /**
+   * @param path Path to check.
+   * @return True if the path is a HFileLink.
+   */
+  public static boolean isHFileLink(final Path path) {
+    return isHFileLink(path.getName());
+  }
+
+  /**
+   * @param fileName File name to check.
+   * @return True if the name is a HFileLink name.
+   */
+  public static boolean isHFileLink(String fileName) {
+    Matcher m = LINK_NAME_PARSER.matcher(fileName);
+    if (!m.matches()) return false;
+
+    return m.groupCount() > 2 && m.group(2) != null && m.group(3) != null;
+  }
+
+  /**
+   * Get the HFileLink referenced path.
+   *
+   * @param fs {@link FileSystem} on which to check the HFileLink
+   * @param path HFileLink path
+   * @return Referenced path (original path or archived path)
+   * @throws IOException on unexpected error.
+   */
+  public static Path getReferencedPath(FileSystem fs, final Path path) throws IOException {
+    return getReferencedPath(fs.getConf(), fs, path);
+  }
+
+  /**
+   * Get the HFileLink referenced path.
+   * The returned path can be the "original" file path like: /hbase/table/region/cf/hfile
+   * or a path to the archived file like: /hbase/archive/table/region/cf/hfile
+   *
+   * @param conf {@link Configuration} from which to extract specific archive locations
+   * @param fs {@link FileSystem} on which to check the HFileLink
+   * @param path HFileLink path
+   * @return Referenced path (original path or archived path)
+   * @throws IOException on unexpected error.
+   */
+  public static Path getReferencedPath(Configuration conf, FileSystem fs, final Path path)
+      throws IOException {
+    Path hfilePath = getRelativeTablePath(path);
+
+    Path originPath = new Path(FSUtils.getRootDir(conf), hfilePath);
+    if (fs.exists(originPath)) {
+      return originPath;
+    }
+
+    return new Path(HFileArchiveUtil.getArchivePath(conf), hfilePath);
+  }
+
+  /**
+   * Convert a HFileLink path to a table relative path.
+   * e.g. the link: /hbase/test/0123/cf/abcd-4567-testtb
+   *      becomes: testtb/4567/cf/abcd
+   *
+   * @param path HFileLink path
+   * @return Relative table path
+   * @throws IllegalArgumentException if the name is not a valid HFileLink name.
+   */
+  private static Path getRelativeTablePath(final Path path) {
+    // hfile-region-table
+    Matcher m = LINK_NAME_PARSER.matcher(path.getName());
+    if (!m.matches()) {
+      throw new IllegalArgumentException(path.getName() + " is not a valid HFileLink name!");
+    }
+
+    // Convert the HFileLink name into a real table/region/cf/hfile path.
+    String hfileName = m.group(1);
+    String regionName = m.group(2);
+    String tableName = m.group(3);
+    String familyName = path.getParent().getName();
+    return new Path(new Path(tableName, regionName), new Path(familyName, hfileName));
+  }
+
+  public static String getReferencedHFileName(String fileName) {
+    Matcher m = LINK_NAME_PARSER.matcher(fileName);
+    if (!m.matches()) {
+      throw new IllegalArgumentException(fileName + " is not a valid HFileLink name!");
+    }
+    return m.group(1);
+  }
+
+  /**
+   * Create a new HFileLink name
+   *
+   * @param hfileRegionInfo - Linked HFile Region Info
+   * @param hfileName - Linked HFile name
+   * @return file name of the HFile Link
+   */
+  public static String createName(final HRegionInfo hfileRegionInfo, final String hfileName) {
+    return createName(hfileRegionInfo.getTableNameAsString(),
+        hfileRegionInfo.getEncodedName(), hfileName);
+  }
+
+  /**
+   * Create a new HFileLink name
+   *
+   * @param tableName - Linked HFile table name
+   * @param regionName - Linked HFile region name
+   * @param hfileName - Linked HFile name
+   * @return file name of the HFile Link
+   */
+  public static String createName(final String tableName,
+      final String regionName, final String hfileName) {
+    return String.format("%s-%s-%s", hfileName, regionName, tableName);
+  }
+
+  /**
+   * Create a new HFileLink
+   *
+   * @param conf {@link Configuration} to read for the archive directory name
+   * @param fs {@link FileSystem} on which to write the HFileLink
+   * @param dstFamilyPath - Destination path (table/region/cf/)
+   * @param hfileRegionInfo - Linked HFile Region Info
+   * @param hfileName - Linked HFile name
+   * @return true if the link file is created, false if it already exists.
+   * @throws IOException on unexpected error.
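+   *
+   * For example (hypothetical names): linking hfile "abcd" of region "4567"
+   * of table "testtb" into the destination family dir clonetb/8901/cf creates
+   * the link file clonetb/8901/cf/abcd-4567-testtb plus the back reference
+   * "8901.clonetb" inside the ".links-abcd" folder of the archived store.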
+   */
+  public static boolean create(final Configuration conf, final FileSystem fs,
+      final Path dstFamilyPath, final HRegionInfo hfileRegionInfo,
+      final String hfileName) throws IOException {
+    String familyName = dstFamilyPath.getName();
+    String regionName = dstFamilyPath.getParent().getName();
+    String tableName = dstFamilyPath.getParent().getParent().getName();
+
+    String name = createName(hfileRegionInfo, hfileName);
+    String refName = createBackReferenceName(tableName, regionName);
+
+    // Make sure the destination directory exists
+    fs.mkdirs(dstFamilyPath);
+
+    // Make sure the FileLink reference directory exists
+    Path archiveStoreDir = HFileArchiveUtil.getStoreArchivePath(conf,
+          hfileRegionInfo.getTableNameAsString(), hfileRegionInfo.getEncodedName(), familyName);
+    Path backRefsDir = getBackReferencesDir(archiveStoreDir, hfileName);
+    fs.mkdirs(backRefsDir);
+
+    // Create the reference for the link
+    Path backRefPath = new Path(backRefsDir, refName);
+    fs.createNewFile(backRefPath);
+    try {
+      // Create the link
+      return fs.createNewFile(new Path(dstFamilyPath, name));
+    } catch (IOException e) {
+      LOG.error("couldn't create the link=" + name + " for " + dstFamilyPath, e);
+      // Revert the reference if the link creation failed
+      fs.delete(backRefPath, false);
+      throw e;
+    }
+  }
+
+  /**
+   * Create the back reference name
+   */
+  private static String createBackReferenceName(final String tableName, final String regionName) {
+    return regionName + "." + tableName;
+  }
+
+  /**
+   * Get the full path of the HFile referenced by the back reference
+   *
+   * @param rootDir root hbase directory
+   * @param linkRefPath Link Back Reference path
+   * @return full path of the referenced hfile
+   */
+  public static Path getHFileFromBackReference(final Path rootDir, final Path linkRefPath) {
+    int separatorIndex = linkRefPath.getName().indexOf('.');
+    String linkRegionName = linkRefPath.getName().substring(0, separatorIndex);
+    String linkTableName = linkRefPath.getName().substring(separatorIndex + 1);
+    String hfileName = getBackReferenceFileName(linkRefPath.getParent());
+    Path familyPath = linkRefPath.getParent().getParent();
+    Path regionPath = familyPath.getParent();
+    Path tablePath = regionPath.getParent();
+
+    String linkName = createName(tablePath.getName(), regionPath.getName(), hfileName);
+    Path linkTableDir = FSUtils.getTablePath(rootDir, linkTableName);
+    Path regionDir = HRegion.getRegionDir(linkTableDir, linkRegionName);
+    return new Path(new Path(regionDir, familyPath.getName()), linkName);
+  }
+
+  /**
+   * Get the full path of the HFile referenced by the back reference
+   *
+   * @param conf {@link Configuration} to read for the archive directory name
+   * @param linkRefPath Link Back Reference path
+   * @return full path of the referenced hfile
+   * @throws IOException on unexpected error.
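+   *
+   * For example (hypothetical layout): the back reference "8901.clonetb"
+   * under archive/testtb/4567/cf/.links-abcd resolves to the link file
+   * clonetb/8901/cf/abcd-4567-testtb under the hbase root directory.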
+   */
+  public static Path getHFileFromBackReference(final Configuration conf, final Path linkRefPath)
+      throws IOException {
+    return getHFileFromBackReference(FSUtils.getRootDir(conf), linkRefPath);
+  }
+}
Index: hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFile.java
===================================================================
--- hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFile.java (revision 1382316)
+++ hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFile.java (working copy)
@@ -591,6 +591,39 @@
   }

   /**
+   * @param fs A file system
+   * @param path Path to HFile
+   * @param fsdis an open checksummed stream of path's file
+   * @param fsdisNoFsChecksum an open unchecksummed stream of path's file
+   * @param size max size of the trailer.
+   * @param cacheConf Cache configuration for hfile's contents
+   * @param preferredEncodingInCache Preferred in-cache data encoding algorithm.
+   * @param closeIStream boolean for closing the file after getting the reader.
+   * @return A version-specific HFile Reader
+   * @throws IOException If the file is invalid, a CorruptHFileException-flavored
+   *         IOException is thrown
+   */
+  public static Reader createReaderWithEncoding(
+      FileSystem fs, Path path, FSDataInputStream fsdis,
+      FSDataInputStream fsdisNoFsChecksum, long size, CacheConfig cacheConf,
+      DataBlockEncoding preferredEncodingInCache, boolean closeIStream)
+      throws IOException {
+    HFileSystem hfs = null;
+
+    // If the fs is not an instance of HFileSystem, then create an
+    // instance of HFileSystem that wraps over the specified fs.
+    // In this case, we will not be able to avoid checksumming inside
+    // the filesystem.
+    if (!(fs instanceof HFileSystem)) {
+      hfs = new HFileSystem(fs);
+    } else {
+      hfs = (HFileSystem)fs;
+    }
+    return pickReaderVersion(path, fsdis, fsdisNoFsChecksum, size,
+        closeIStream, cacheConf,
+        preferredEncodingInCache, hfs);
+  }
+
+  /**
    *
    * @param fs filesystem
    * @param path Path to file to read
Index: hbase-server/src/main/java/org/apache/hadoop/hbase/master/cleaner/CleanerChore.java
===================================================================
--- hbase-server/src/main/java/org/apache/hadoop/hbase/master/cleaner/CleanerChore.java (revision 1382316)
+++ hbase-server/src/main/java/org/apache/hadoop/hbase/master/cleaner/CleanerChore.java (working copy)
@@ -82,7 +82,10 @@
     if (logCleaners != null) {
       for (String className : logCleaners) {
         T logCleaner = newFileCleaner(className, conf);
-        if (logCleaner != null) this.cleanersChain.add(logCleaner);
+        if (logCleaner != null) {
+          LOG.debug("initialize cleaner=" + className);
+          this.cleanersChain.add(logCleaner);
+        }
       }
     }
   }
@@ -196,7 +199,7 @@
    */
   private void checkAndDelete(Path filePath) throws IOException, IllegalArgumentException {
     if (!validate(filePath)) {
-      LOG.warn("Found a wrongly formatted file: " + filePath.getName() + "deleting it.");
+      LOG.warn("Found a wrongly formatted file: " + filePath.getName() + " deleting it.");
       if (!this.fs.delete(filePath, true)) {
         LOG.warn("Attempted to delete:" + filePath +
             ", but couldn't. Run cleaner chain and attempt to delete on next pass.");
Index: hbase-server/src/main/java/org/apache/hadoop/hbase/master/cleaner/HFileCleaner.java
===================================================================
--- hbase-server/src/main/java/org/apache/hadoop/hbase/master/cleaner/HFileCleaner.java (revision 1382316)
+++ hbase-server/src/main/java/org/apache/hadoop/hbase/master/cleaner/HFileCleaner.java (working copy)
@@ -22,6 +22,7 @@
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hbase.Stoppable;
+import org.apache.hadoop.hbase.io.HFileLink;
 import org.apache.hadoop.hbase.regionserver.StoreFile;
 /**
  * This Chore, every time it runs, will clear the HFiles in the hfile archive
@@ -46,6 +47,9 @@
   @Override
   protected boolean validate(Path file) {
+    if (HFileLink.isBackReferencesDir(file) || HFileLink.isBackReferencesDir(file.getParent())) {
+      return true;
+    }
     return StoreFile.validateStoreFileName(file.getName());
   }
 }
Index: hbase-server/src/main/java/org/apache/hadoop/hbase/master/cleaner/HFileLinkCleaner.java
===================================================================
--- hbase-server/src/main/java/org/apache/hadoop/hbase/master/cleaner/HFileLinkCleaner.java (revision 0)
+++ hbase-server/src/main/java/org/apache/hadoop/hbase/master/cleaner/HFileLinkCleaner.java (revision 0)
@@ -0,0 +1,94 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.master.cleaner;
+
+import java.io.IOException;
+import java.util.LinkedList;
+import java.util.List;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+
+import org.apache.hadoop.hbase.io.HFileLink;
+import org.apache.hadoop.hbase.util.FSUtils;
+import org.apache.hadoop.hbase.util.HFileArchiveUtil;
+import org.apache.hadoop.hbase.master.cleaner.BaseHFileCleanerDelegate;
+
+/**
+ * HFileLink cleaner that determines if an hfile should be deleted.
+ * HFiles can be deleted only if there are no links to them.
+ *
+ * When a HFileLink is created, a back reference file is created in:
+ *   /hbase/archive/table/region/cf/.links-hfile/ref-region.ref-table
+ * To check if an hfile can be deleted, the back references folder must be empty.
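+ *
+ * For example (hypothetical names): the archived hfile
+ * archive/testtb/4567/cf/abcd is deletable only once
+ * archive/testtb/4567/cf/.links-abcd contains no back reference files.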
+ */
+@InterfaceAudience.Private
+public class HFileLinkCleaner extends BaseHFileCleanerDelegate {
+  private static final Log LOG = LogFactory.getLog(HFileLinkCleaner.class);
+
+  private FileSystem fs;
+
+  @Override
+  public synchronized boolean isFileDeletable(Path filePath) {
+    if (!instantiateFS()) return false;
+
+    // An HFile Link is always deletable
+    if (HFileLink.isHFileLink(filePath)) return true;
+
+    // If the file is inside a link references directory, it is a back reference link.
+    // The back reference can be deleted only if the referenced file doesn't exist.
+    Path parentDir = filePath.getParent();
+    if (HFileLink.isBackReferencesDir(parentDir)) {
+      try {
+        Path hfilePath = HFileLink.getHFileFromBackReference(getConf(), filePath);
+        return !fs.exists(hfilePath);
+      } catch (IOException e) {
+        LOG.error("Couldn't verify if the referenced file still exists, keep it just in case", e);
+        return false;
+      }
+    }
+
+    // An hfile is deletable only if it has no links
+    try {
+      Path backRefDir = HFileLink.getBackReferencesDir(parentDir, filePath.getName());
+      return FSUtils.listStatus(fs, backRefDir) == null;
+    } catch (IOException e) {
+      LOG.error("Couldn't get the references, not deleting file, just in case", e);
+      return false;
+    }
+  }
+
+  /**
+   * setup the filesystem, if it hasn't been already
+   */
+  private synchronized boolean instantiateFS() {
+    if (this.fs == null) {
+      try {
+        this.fs = FileSystem.get(this.getConf());
+      } catch (IOException e) {
+        LOG.error("Couldn't instantiate the file system, not deleting file, just in case", e);
+        return false;
+      }
+    }
+    return true;
+  }
+}
Index: hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java
===================================================================
--- hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java (revision 1382316)
+++ hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java (working copy)
@@ -52,6 +52,7 @@
 import org.apache.hadoop.hbase.RemoteExceptionHandler;
 import org.apache.hadoop.hbase.backup.HFileArchiver;
 import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.io.HFileLink;
 import org.apache.hadoop.hbase.fs.HFileSystem;
 import org.apache.hadoop.hbase.io.HeapSize;
 import org.apache.hadoop.hbase.io.hfile.CacheConfig;
@@ -328,8 +329,18 @@
    */
   public static Path getStoreHomedir(final Path tabledir,
       final String encodedName, final byte [] family) {
-    return new Path(tabledir, new Path(encodedName,
-      new Path(Bytes.toString(family))));
+    return getStoreHomedir(tabledir, encodedName, Bytes.toString(family));
+  }
+
+  /**
+   * @param tabledir Table directory.
+   * @param encodedName Encoded region name.
+   * @param family Family name.
+   * @return Path to family/Store home directory.
+   */
+  public static Path getStoreHomedir(final Path tabledir,
+      final String encodedName, final String family) {
+    return new Path(tabledir, new Path(encodedName, new Path(family)));
   }

   /**
@@ -385,7 +396,7 @@
       final Path p = files[i].getPath();
       // Check for empty file. Should never be the case but can happen
       // after data loss in hdfs for whatever reason (upgrade, etc.): HBASE-646
-      if (this.fs.getFileStatus(p).getLen() <= 0) {
+      if (!HFileLink.isHFileLink(p) && this.fs.getFileStatus(p).getLen() <= 0) {
        LOG.warn("Skipping " + p + " because its empty. 
HBASE-646 DATA LOSS?"); continue; } Index: hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFile.java =================================================================== --- hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFile.java (revision 1382316) +++ hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFile.java (working copy) @@ -37,6 +37,7 @@ import org.apache.commons.logging.LogFactory; import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FSDataInputStream; import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; @@ -45,6 +46,8 @@ import org.apache.hadoop.hbase.KeyValue; import org.apache.hadoop.hbase.KeyValue.KVComparator; import org.apache.hadoop.hbase.client.Scan; +import org.apache.hadoop.hbase.io.HFileLink; +import org.apache.hadoop.hbase.fs.HFileSystem; import org.apache.hadoop.hbase.io.HalfStoreFileReader; import org.apache.hadoop.hbase.io.Reference; import org.apache.hadoop.hbase.io.encoding.DataBlockEncoding; @@ -151,6 +154,9 @@ // If this StoreFile references another, this is the other files path. private Path referencePath; + // If this storefile is a link to another, this is the link instance. + private HFileLink link; + // Block cache configuration and reference. private final CacheConfig cacheConf; @@ -245,9 +251,14 @@ this.dataBlockEncoder = dataBlockEncoder == null ? NoOpDataBlockEncoder.INSTANCE : dataBlockEncoder; - if (isReference(p)) { + + if (HFileLink.isHFileLink(p)) { + this.link = new HFileLink(conf, p); + LOG.debug("Store file " + p + " is a link"); + } else if (isReference(p)) { this.reference = Reference.read(fs, p); this.referencePath = getReferredToFile(this.path); + LOG.debug("Store file " + p + " is a reference"); } if (BloomFilterFactory.isGeneralBloomEnabled(conf)) { @@ -292,6 +303,13 @@ } /** + * @return True if this is a StoreFile Link + */ + boolean isLink() { + return this.link != null; + } + + /** * @param p Path to check. * @return True if the path has format of a HStoreFile reference. 
 */
@@ -473,6 +491,7 @@
       Path referencePath = getReferredToFile(p);
       return computeRefFileHDFSBlockDistribution(fs, reference, referencePath);
     } else {
+      if (HFileLink.isHFileLink(p)) p = HFileLink.getReferencedPath(fs, p);
       FileStatus status = fs.getFileStatus(p);
       long length = status.getLen();
       return FSUtils.computeHDFSBlocksDistribution(fs, status, 0, length);
@@ -488,7 +507,12 @@
       this.hdfsBlocksDistribution = computeRefFileHDFSBlockDistribution(
         this.fs, this.reference, this.referencePath);
     } else {
-      FileStatus status = this.fs.getFileStatus(this.path);
+      FileStatus status;
+      if (isLink()) {
+        status = link.getFileStatus(fs);
+      } else {
+        status = this.fs.getFileStatus(path);
+      }
       long length = status.getLen();
       this.hdfsBlocksDistribution = FSUtils.computeHDFSBlocksDistribution(
         this.fs, status, 0, length);
@@ -509,6 +533,10 @@
       this.reader = new HalfStoreFileReader(this.fs, this.referencePath,
         this.cacheConf, this.reference,
         dataBlockEncoder.getEncodingInCache());
+    } else if (isLink()) {
+      long size = link.getFileStatus(fs).getLen();
+      this.reader = new Reader(this.fs, this.path, link, size, this.cacheConf,
+          dataBlockEncoder.getEncodingInCache(), true);
     } else {
       this.reader = new Reader(this.fs, this.path, this.cacheConf,
           dataBlockEncoder.getEncodingInCache());
@@ -867,6 +895,8 @@
    * @return true if the file could be a valid store file, false otherwise
    */
   public static boolean validateStoreFileName(String fileName) {
+    if (HFileLink.isHFileLink(fileName))
+      return true;
     return !fileName.contains("-");
   }
@@ -1256,6 +1286,23 @@
       bloomFilterType = BloomType.NONE;
     }

+    public Reader(FileSystem fs, Path path, HFileLink hfileLink, long size,
+        CacheConfig cacheConf, DataBlockEncoding preferredEncodingInCache,
+        boolean closeIStream) throws IOException {
+      super(path);
+
+      FSDataInputStream in = hfileLink.open(fs);
+      FSDataInputStream inNoChecksum = in;
+      if (fs instanceof HFileSystem) {
+        FileSystem noChecksumFs = ((HFileSystem)fs).getNoChecksumFs();
+        inNoChecksum = hfileLink.open(noChecksumFs);
+      }
+
+      reader = HFile.createReaderWithEncoding(fs, path, in, inNoChecksum,
+          size, cacheConf, preferredEncodingInCache, closeIStream);
+      bloomFilterType = BloomType.NONE;
+    }
+
+    /**
     * ONLY USE DEFAULT CONSTRUCTOR FOR UNIT TESTS
     */
Index: hbase-server/src/main/java/org/apache/hadoop/hbase/util/FSUtils.java
===================================================================
--- hbase-server/src/main/java/org/apache/hadoop/hbase/util/FSUtils.java (revision 1382316)
+++ hbase-server/src/main/java/org/apache/hadoop/hbase/util/FSUtils.java (working copy)
@@ -1112,6 +1112,25 @@
   }

   /**
+   * Given a particular region dir, return all the family dirs inside it
+   *
+   * @param fs A file system for the Path
+   * @param regionDir Path to a specific region directory
+   * @return List of paths to valid family directories in region dir.
+   * @throws IOException
+   */
+  public static List<Path> getFamilyDirs(final FileSystem fs, final Path regionDir) throws IOException {
+    // assumes we are in a region dir.
+    FileStatus[] fds = fs.listStatus(regionDir, new FamilyDirFilter(fs));
+    List<Path> familyDirs = new ArrayList<Path>(fds.length);
+    for (FileStatus fdfs: fds) {
+      Path fdPath = fdfs.getPath();
+      familyDirs.add(fdPath);
+    }
+    return familyDirs;
+  }
+
+  /**
    * Filter for HFiles that excludes reference files.
    */
  public static class HFileFilter implements PathFilter {
@@ -1228,7 +1247,19 @@
     if (status == null || status.length < 1) return null;
     return status;
   }
-
+
+  /**
+   * Calls fs.listStatus() and treats FileNotFoundException as non-fatal.
+   * This accommodates differences between various Hadoop versions.
+   *
+   * @param fs file system
+   * @param dir directory
+   * @return null if dir doesn't exist, otherwise FileStatus array
+   */
+  public static FileStatus[] listStatus(final FileSystem fs, final Path dir) throws IOException {
+    return listStatus(fs, dir, null);
+  }
+
  /**
   * Calls fs.delete() and returns the value returned by the fs.delete()
  *
Index: hbase-server/src/main/java/org/apache/hadoop/hbase/util/HFileArchiveUtil.java
===================================================================
--- hbase-server/src/main/java/org/apache/hadoop/hbase/util/HFileArchiveUtil.java (revision 1382316)
+++ hbase-server/src/main/java/org/apache/hadoop/hbase/util/HFileArchiveUtil.java (working copy)
@@ -42,6 +42,21 @@
   /**
    * Get the directory to archive a store directory
    * @param conf {@link Configuration} to read for the archive directory name
+   * @param tableName table name under which the store currently lives
+   * @param regionName region encoded name under which the store currently lives
+   * @param familyName name of the family in the store
+   * @return {@link Path} to the directory to archive the given store or
+   *         null if it should not be archived
+   */
+  public static Path getStoreArchivePath(final Configuration conf, final String tableName,
+      final String regionName, final String familyName) throws IOException {
+    Path tableArchiveDir = getTableArchivePath(conf, tableName);
+    return HStore.getStoreHomedir(tableArchiveDir, regionName, familyName);
+  }
+
+  /**
+   * Get the directory to archive a store directory
+   * @param conf {@link Configuration} to read for the archive directory name
    * @param region parent region information under which the store currently
    *          lives
    * @param family name of the family in the store
@@ -104,6 +119,19 @@
   }

   /**
+   * Get the path to the table archive directory based on the configured archive directory.
+   * <p>
+ * Assumed that the table should already be archived. + * @param conf {@link Configuration} to read the archive directory property. Can be null + * @param tableName Name of the table to be archived. Cannot be null. + * @return {@link Path} to the archive directory for the table + */ + public static Path getTableArchivePath(final Configuration conf, final String tableName) + throws IOException { + return new Path(getArchivePath(conf), tableName); + } + + /** * Get the archive directory as per the configuration * @param conf {@link Configuration} to read the archive directory from (can be null, in which * case you get the default value). Can be null. Index: hbase-server/src/test/java/org/apache/hadoop/hbase/io/TestFileLink.java =================================================================== --- hbase-server/src/test/java/org/apache/hadoop/hbase/io/TestFileLink.java (revision 0) +++ hbase-server/src/test/java/org/apache/hadoop/hbase/io/TestFileLink.java (revision 0) @@ -0,0 +1,204 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hbase.io; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; + +import org.junit.Test; +import org.junit.AfterClass; +import org.junit.BeforeClass; +import junit.framework.TestCase; +import org.junit.experimental.categories.Category; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.fs.FSDataInputStream; +import org.apache.hadoop.fs.FSDataOutputStream; +import org.apache.hadoop.hdfs.MiniDFSCluster; +import org.apache.hadoop.hbase.HBaseTestingUtility; +import org.apache.hadoop.hbase.MediumTests; +import org.apache.hadoop.hbase.io.FileLink; + +import java.io.FileNotFoundException; +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; + +/** + * Test that FileLink switches between alternate locations + * when the current location moves or gets deleted. 
+ */
+@Category(MediumTests.class)
+public class TestFileLink {
+  private final static HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
+  private static MiniDFSCluster cluster;
+  private static Configuration conf;
+  private static FileSystem fs;
+
+  @BeforeClass
+  public static void setUpBeforeClass() throws Exception {
+    conf = TEST_UTIL.getConfiguration();
+    conf.setInt("dfs.blocksize", 1024 * 1024);
+    conf.setInt("dfs.client.read.prefetch.size", 2 * 1024 * 1024);
+
+    TEST_UTIL.startMiniDFSCluster(1);
+    cluster = TEST_UTIL.getDFSCluster();
+    fs = cluster.getFileSystem();
+  }
+
+  @AfterClass
+  public static void tearDownAfterClass() throws Exception {
+    TEST_UTIL.shutdownMiniCluster();
+  }
+
+  /**
+   * Test that the link is still readable even when the current file gets renamed.
+   */
+  @Test
+  public void testLinkReadDuringRename() throws Exception {
+    Path originalPath = new Path(TEST_UTIL.getDataTestDir(), "test.file");
+    Path archivedPath = new Path(TEST_UTIL.getDataTestDir(), "archived.file");
+    fillFile(fs, originalPath, (byte)2);
+
+    List<Path> files = new ArrayList<Path>();
+    files.add(originalPath);
+    files.add(archivedPath);
+
+    FileLink link = new FileLink(files);
+    FSDataInputStream in = link.open(fs);
+    try {
+      byte[] data = new byte[8192];
+      long size = 0;
+
+      // Read from origin
+      int n = in.read(data);
+      dataVerify(data, n, (byte)2);
+      size += n;
+
+      // Move origin to archive
+      assertFalse(fs.exists(archivedPath));
+      fs.rename(originalPath, archivedPath);
+      assertFalse(fs.exists(originalPath));
+      assertTrue(fs.exists(archivedPath));
+
+      // Try to read to the end
+      while ((n = in.read(data)) > 0) {
+        dataVerify(data, n, (byte)2);
+        size += n;
+      }
+
+      // fillFile writes 2048 * (1 << 17) = 268435456 bytes
+      assertEquals(268435456, size);
+    } finally {
+      in.close();
+      if (fs.exists(originalPath)) fs.delete(originalPath);
+      if (fs.exists(archivedPath)) fs.delete(archivedPath);
+    }
+  }
+
+  /**
+   * Test that the link is still readable even when the current file gets deleted.
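+   * (Each file is deleted after its content has been verified, forcing the
+   * stream to switch to the next available location.)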
+   */
+  @Test
+  public void testLinkReadDuringDelete() throws IOException {
+    List<Path> files = new ArrayList<Path>();
+    for (int i = 0; i < 3; i++) {
+      Path path = new Path(TEST_UTIL.getDataTestDir(), String.format("test-data-%d", i));
+      fillFile(fs, path, (byte)i);
+      files.add(path);
+    }
+
+    FileLink link = new FileLink(files);
+    FSDataInputStream in = link.open(fs);
+    try {
+      byte[] data = new byte[8192];
+      int n;
+
+      // Switch to file 1
+      n = in.read(data);
+      dataVerify(data, n, (byte)0);
+      fs.delete(files.get(0));
+      skipBuffer(in, (byte)0);
+
+      // Switch to file 2
+      n = in.read(data);
+      dataVerify(data, n, (byte)1);
+      fs.delete(files.get(1));
+      skipBuffer(in, (byte)1);
+
+      // Switch to file 3
+      n = in.read(data);
+      dataVerify(data, n, (byte)2);
+      fs.delete(files.get(2));
+      skipBuffer(in, (byte)2);
+
+      // No more files available
+      try {
+        n = in.read(data);
+        assert(n <= 0);
+      } catch (FileNotFoundException e) {
+        assertTrue(true);
+      }
+    } finally {
+      in.close();
+    }
+  }
+
+  private void fillFile(FileSystem fs, Path path, byte v) throws IOException {
+    byte[] data = new byte[1 << 17];
+    for (int i = 0; i < data.length; i++) {
+      data[i] = v;
+    }
+
+    FSDataOutputStream stream = fs.create(path);
+    try {
+      for (int i = 0; i < 2048; i++) {
+        stream.write(data, 0, data.length);
+      }
+    } finally {
+      stream.close();
+    }
+  }
+
+  private void dataVerify(byte[] data, int n, byte v) {
+    for (int i = 0; i < n; ++i) {
+      assertEquals(v, data[i]);
+    }
+  }
+
+  private void skipBuffer(FSDataInputStream in, byte v) throws IOException {
+    byte[] data = new byte[8192];
+    try {
+      int n;
+      while ((n = in.read(data)) == data.length) {
+        for (int i = 0; i < data.length; ++i) {
+          if (data[i] != v)
+            throw new Exception("File changed");
+        }
+      }
+    } catch (Exception e) {
+    }
+  }
+
+  @org.junit.Rule
+  public org.apache.hadoop.hbase.ResourceCheckerJUnitRule cu =
+    new org.apache.hadoop.hbase.ResourceCheckerJUnitRule();
+}
Index: hbase-server/src/test/java/org/apache/hadoop/hbase/master/cleaner/TestHFileLinkCleaner.java
===================================================================
--- hbase-server/src/test/java/org/apache/hadoop/hbase/master/cleaner/TestHFileLinkCleaner.java (revision 0)
+++ hbase-server/src/test/java/org/apache/hadoop/hbase/master/cleaner/TestHFileLinkCleaner.java (revision 0)
@@ -0,0 +1,182 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.master.cleaner;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+import java.io.IOException;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.Server;
+import org.apache.hadoop.hbase.ServerName;
+import org.apache.hadoop.hbase.SmallTests;
+import org.apache.hadoop.hbase.backup.HFileArchiver;
+import org.apache.hadoop.hbase.catalog.CatalogTracker;
+import org.apache.hadoop.hbase.HRegionInfo;
+import org.apache.hadoop.hbase.io.HFileLink;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.FSUtils;
+import org.apache.hadoop.hbase.util.HFileArchiveUtil;
+import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+/**
+ * Test the HFileLink Cleaner.
+ * An hfile cannot be deleted while a link to it is still present.
+ */
+@Category(SmallTests.class)
+public class TestHFileLinkCleaner {
+
+  private final static HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
+
+  @Test
+  public void testHFileLinkCleaning() throws Exception {
+    Configuration conf = TEST_UTIL.getConfiguration();
+    conf.set(HConstants.HBASE_DIR, TEST_UTIL.getDataTestDir().toString());
+    conf.set(HFileCleaner.MASTER_HFILE_CLEANER_PLUGINS,
+      "org.apache.hadoop.hbase.master.cleaner.TimeToLiveHFileCleaner," +
+      "org.apache.hadoop.hbase.master.cleaner.HFileLinkCleaner");
+    Path rootDir = FSUtils.getRootDir(conf);
+    FileSystem fs = FileSystem.get(conf);
+
+    final String tableName = "test-table";
+    final String tableLinkName = "test-link";
+    final String hfileName = "1234567890";
+    final String familyName = "cf";
+
+    HRegionInfo hri = new HRegionInfo(Bytes.toBytes(tableName));
+    HRegionInfo hriLink = new HRegionInfo(Bytes.toBytes(tableLinkName));
+
+    Path archiveDir = HFileArchiveUtil.getArchivePath(conf);
+    Path archiveStoreDir = HFileArchiveUtil.getStoreArchivePath(conf,
+          tableName, hri.getEncodedName(), familyName);
+    Path archiveLinkStoreDir = HFileArchiveUtil.getStoreArchivePath(conf,
+          tableLinkName, hriLink.getEncodedName(), familyName);
+
+    // Create an archived hfile: archive/test-table/<encoded region>/cf/1234567890
+    Path familyPath = getFamilyDirPath(archiveDir, tableName, hri.getEncodedName(), familyName);
+    fs.mkdirs(familyPath);
+    Path hfilePath = new Path(familyPath, hfileName);
+    fs.createNewFile(hfilePath);
+
+    // Create link to hfile
+    Path familyLinkPath = getFamilyDirPath(rootDir, tableLinkName,
+        hriLink.getEncodedName(), familyName);
+    fs.mkdirs(familyLinkPath);
+    HFileLink.create(conf, fs, familyLinkPath, hri, hfileName);
+    Path linkBackRefDir = HFileLink.getBackReferencesDir(archiveStoreDir, hfileName);
+    assertTrue(fs.exists(linkBackRefDir));
+    FileStatus[] backRefs = fs.listStatus(linkBackRefDir);
+    assertEquals(1, backRefs.length);
+    Path linkBackRef = backRefs[0].getPath();
+
+    // Initialize cleaner
+    final long ttl = 1000;
+    conf.setLong(TimeToLiveHFileCleaner.TTL_CONF_KEY, ttl);
+    Server server = new DummyServer();
+    HFileCleaner cleaner = new HFileCleaner(1000, server, conf, fs, archiveDir);
+
+    // Link backref cannot be removed
+    Thread.sleep(ttl * 2);
+    cleaner.chore();
+    assertTrue(fs.exists(linkBackRef));
assertTrue(fs.exists(hfilePath)); + + // Link backref can be removed + fs.rename(new Path(rootDir, tableLinkName), new Path(archiveDir, tableLinkName)); + Thread.sleep(ttl * 2); + cleaner.chore(); + assertFalse("Link should be deleted", fs.exists(linkBackRef)); + + // HFile can be removed + Thread.sleep(ttl * 2); + cleaner.chore(); + assertFalse("HFile should be deleted", fs.exists(hfilePath)); + + // Remove everything + for (int i = 0; i < 4; ++i) { + Thread.sleep(ttl * 2); + cleaner.chore(); + } + assertFalse("HFile should be deleted", fs.exists(new Path(archiveDir, tableName))); + assertFalse("Link should be deleted", fs.exists(new Path(archiveDir, tableLinkName))); + + cleaner.interrupt(); + } + + private static Path getFamilyDirPath (final Path rootDir, final String table, + final String region, final String family) { + return new Path(new Path(new Path(rootDir, table), region), family); + } + + static class DummyServer implements Server { + + @Override + public Configuration getConfiguration() { + return TEST_UTIL.getConfiguration(); + } + + @Override + public ZooKeeperWatcher getZooKeeper() { + try { + return new ZooKeeperWatcher(getConfiguration(), "dummy server", this); + } catch (IOException e) { + e.printStackTrace(); + } + return null; + } + + @Override + public CatalogTracker getCatalogTracker() { + return null; + } + + @Override + public ServerName getServerName() { + return new ServerName("regionserver,60020,000000"); + } + + @Override + public void abort(String why, Throwable e) {} + + @Override + public boolean isAborted() { + return false; + } + + @Override + public void stop(String why) {} + + @Override + public boolean isStopped() { + return false; + } + } + + @org.junit.Rule + public org.apache.hadoop.hbase.ResourceCheckerJUnitRule cu = + new org.apache.hadoop.hbase.ResourceCheckerJUnitRule(); +} Index: hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestStoreFile.java =================================================================== --- hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestStoreFile.java (revision 1382316) +++ hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestStoreFile.java (working copy) @@ -35,9 +35,11 @@ import org.apache.hadoop.fs.Path; import org.apache.hadoop.hbase.HBaseTestCase; import org.apache.hadoop.hbase.HBaseTestingUtility; +import org.apache.hadoop.hbase.HRegionInfo; import org.apache.hadoop.hbase.KeyValue; import org.apache.hadoop.hbase.SmallTests; import org.apache.hadoop.hbase.client.Scan; +import org.apache.hadoop.hbase.io.HFileLink; import org.apache.hadoop.hbase.io.encoding.DataBlockEncoding; import org.apache.hadoop.hbase.io.hfile.BlockCache; import org.apache.hadoop.hbase.io.hfile.CacheConfig; @@ -52,6 +54,7 @@ import org.apache.hadoop.hbase.util.BloomFilterFactory; import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.ChecksumType; +import org.apache.hadoop.hbase.util.FSUtils; import org.junit.experimental.categories.Category; import org.mockito.Mockito; @@ -171,6 +174,39 @@ assertTrue(Bytes.equals(kv.getRow(), finalRow)); } + public void testHFileLink() throws IOException { + HRegionInfo hri = new HRegionInfo(Bytes.toBytes("table-link")); + Path storedir = new Path(new Path(FSUtils.getRootDir(conf), + new Path(hri.getTableNameAsString(), hri.getEncodedName())), "cf"); + + // Make a store file and write data to it. 
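+    // (writeStoreFile fills the file with one row per two-char combination in
+    // [FIRST_CHAR..LAST_CHAR], which is what the row-count assertion below expects)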
+ StoreFile.Writer writer = new StoreFile.WriterBuilder(conf, cacheConf, + this.fs, 8 * 1024) + .withOutputDir(storedir) + .build(); + Path storeFilePath = writer.getPath(); + writeStoreFile(writer); + writer.close(); + + Path dstPath = new Path(FSUtils.getRootDir(conf), new Path("test-region", "cf")); + HFileLink.create(conf, this.fs, dstPath, hri, storeFilePath.getName()); + Path linkFilePath = new Path(dstPath, HFileLink.createName(hri, storeFilePath.getName())); + + // Try to open store file from link + StoreFile hsf = new StoreFile(this.fs, linkFilePath, conf, cacheConf, + StoreFile.BloomType.NONE, NoOpDataBlockEncoder.INSTANCE); + assertTrue(hsf.isLink()); + + // Now confirm that I can read from the link + int count = 1; + HFileScanner s = hsf.createReader().getScanner(false, false); + s.seekTo(); + while (s.next()) { + count++; + } + assertEquals((LAST_CHAR - FIRST_CHAR + 1) * (LAST_CHAR - FIRST_CHAR + 1), count); + } + private void checkHalfHFile(final StoreFile f) throws IOException { byte [] midkey = f.createReader().midkey();