Index: hbase-server/src/main/java/org/apache/hadoop/hbase/io/FileLink.java
===================================================================
--- hbase-server/src/main/java/org/apache/hadoop/hbase/io/FileLink.java	(revision 0)
+++ hbase-server/src/main/java/org/apache/hadoop/hbase/io/FileLink.java	(revision 0)
@@ -0,0 +1,340 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.io;
+
+import java.util.ArrayList;
+import java.util.Collection;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.FileNotFoundException;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.PositionedReadable;
+import org.apache.hadoop.fs.Seekable;
+
+/**
+ * FileLink describes a link to a file.
+ *
+ * The link checks the original path first; if the file is not present
+ * there, it falls back to the alternative locations.
+ */
+@InterfaceAudience.Private
+public class FileLink {
+  static final Log LOG = LogFactory.getLog(FileLink.class);
+
+  /**
+   * FileLink InputStream that handles the switch between the original path
+   * and the alternative locations, when the file is moved.
+   */
+  private static class FileLinkInputStream extends InputStream
+      implements Seekable, PositionedReadable {
+    private FSDataInputStream in = null;
+    private Path currentPath = null;
+    private long pos = 0;
+
+    private final FileLink fileLink;
+    private final int bufferSize;
+    private final FileSystem fs;
+
+    public FileLinkInputStream(final FileSystem fs, final FileLink fileLink)
+        throws IOException {
+      this(fs, fileLink, fs.getConf().getInt("io.file.buffer.size", 4096));
+    }
+
+    public FileLinkInputStream(final FileSystem fs, final FileLink fileLink, int bufferSize)
+        throws IOException {
+      this.bufferSize = bufferSize;
+      this.fileLink = fileLink;
+      this.fs = fs;
+
+      this.in = tryOpen();
+    }
+
+    @Override
+    public int read() throws IOException {
+      int res;
+      try {
+        res = in.read();
+      } catch (FileNotFoundException e) {
+        res = tryOpen().read();
+      }
+      if (res >= 0) pos += 1;  // -1 means EOF; 0 is a valid byte value
+      return res;
+    }
+
+    @Override
+    public int read(byte b[]) throws IOException {
+      return read(b, 0, b.length);
+    }
+
+    @Override
+    public int read(byte b[], int off, int len) throws IOException {
+      int n;
+      try {
+        n = in.read(b, off, len);
+      } catch (FileNotFoundException e) {
+        n = tryOpen().read(b, off, len);
+      }
+      if (n > 0) pos += n;
+      assert(in.getPos() == pos);
+      return n;
+    }
+
+    @Override
+    public int read(long position, byte[] buffer, int offset, int length) throws IOException {
+      int n;
+      try {
+        n = in.read(position, buffer, offset, length);
+      } catch (FileNotFoundException e) {
+        n = tryOpen().read(position, buffer, offset, length);
+      }
+      return n;
+    }
+
+    @Override
+    public void readFully(long position, byte[] buffer) throws IOException {
+      readFully(position, buffer, 0, buffer.length);
+    }
+
+    @Override
+    public void readFully(long position, byte[] buffer, int offset, int length) throws IOException {
+      try {
+        in.readFully(position, buffer, offset, length);
+      } catch (FileNotFoundException e) {
+        tryOpen().readFully(position, buffer, offset, length);
+      }
+    }
+
+    @Override
+    public long skip(long n) throws IOException {
+      long skipped;
+
+      try {
+        skipped = in.skip(n);
+      } catch (FileNotFoundException e) {
+        skipped = tryOpen().skip(n);
+      }
+
+      if (skipped > 0) pos += skipped;
+      return skipped;
+    }
+
+    @Override
+    public int available() throws IOException {
+      try {
+        return in.available();
+      } catch (FileNotFoundException e) {
+        return tryOpen().available();
+      }
+    }
+
+    @Override
+    public void seek(long pos) throws IOException {
+      try {
+        in.seek(pos);
+      } catch (FileNotFoundException e) {
+        tryOpen().seek(pos);
+      }
+      this.pos = pos;
+    }
+
+    @Override
+    public long getPos() throws IOException {
+      return pos;
+    }
+
+    @Override
+    public boolean seekToNewSource(long targetPos) throws IOException {
+      boolean res;
+      try {
+        res = in.seekToNewSource(targetPos);
+      } catch (FileNotFoundException e) {
+        res = tryOpen().seekToNewSource(targetPos);
+      }
+      if (res) pos = targetPos;
+      return res;
+    }
+
+    @Override
+    public void close() throws IOException {
+      in.close();
+    }
+
+    @Override
+    public synchronized void mark(int readlimit) {
+    }
+
+    @Override
+    public synchronized void reset() throws IOException {
+      throw new IOException("mark/reset not supported");
+    }
+
+    @Override
+    public boolean markSupported() {
+      return false;
+    }
+
+    /**
+     * Try to open the file from one of the available locations.
+     *
+     * @return FSDataInputStream stream of the opened file link
+     * @throws IOException on unexpected error, or if the file is not found at any location.
+     */
+    private FSDataInputStream tryOpen() throws IOException {
+      for (Path path: fileLink.getLocations()) {
+        if (path.equals(currentPath)) continue;
+        try {
+          in = fs.open(path, bufferSize);
+          in.seek(pos);
+          assert(in.getPos() == pos) : "Link unable to seek to the right position";
+          if (LOG.isTraceEnabled()) {
+            if (currentPath == null) {
+              LOG.trace("link open path=" + path);
+            } else {
+              LOG.trace("link switch from path=" + currentPath + " to path=" + path);
+            }
+          }
+          currentPath = path;
+          return in;
+        } catch (FileNotFoundException e) {
+          // Try another file location
+        }
+      }
+      throw new FileNotFoundException("Unable to open link: " + fileLink);
+    }
+  }
+
+  private Collection<Path> locations = null;
+
+  public FileLink() {
+    this.locations = null;
+  }
+
+  /**
+   * @param originPath Original location of the File Link
+   * @param alternativePaths Alternative locations of the File Link
+   */
+  public FileLink(Path originPath, Path... alternativePaths) {
+    setLocations(originPath, alternativePaths);
+  }
+
+  /**
+   * @param locations Locations of the File Link
+   */
+  public FileLink(Collection<Path> locations) {
+    this.locations = locations;
+  }
+
+  /**
+   * @return the locations of the file link.
+   */
+  public Collection<Path> getLocations() {
+    return locations;
+  }
+
+  @Override
+  public String toString() {
+    StringBuilder str = new StringBuilder(getClass().getName());
+    str.append(" locations=[");
+    int i = 0;
+    for (Path location: locations) {
+      if (i++ > 0) str.append(", ");
+      str.append(location.toString());
+    }
+    str.append("]");
+    return str.toString();
+  }
+
+  /**
+   * @return the path of the first available link.
+   */
+  public Path getAvailablePath(FileSystem fs) throws IOException {
+    for (Path path: locations) {
+      if (fs.exists(path)) {
+        return path;
+      }
+    }
+    throw new FileNotFoundException("Unable to open link: " + this);
+  }
+
+  /**
+   * Get the FileStatus of the referenced file.
+   *
+   * @param fs {@link FileSystem} on which to get the file status
+   * @return FileStatus of the referenced file.
+   * @throws FileNotFoundException if the file is not found at any location.
+   */
+  public FileStatus getFileStatus(FileSystem fs) throws IOException {
+    for (Path path: locations) {
+      try {
+        return fs.getFileStatus(path);
+      } catch (FileNotFoundException e) {
+        // Try another file location
+      }
+    }
+    throw new FileNotFoundException("Unable to open link: " + this);
+  }
+
+  /**
+   * Open the FileLink for read.
+   * It uses the FileLinkInputStream to be able to switch between the
+   * original path and the alternative locations when the file is moved.
+   *
+   * @param fs {@link FileSystem} on which to open the FileLink
+   * @return InputStream for reading the file link.
+   * @throws IOException on unexpected error.
+   */
+  public FSDataInputStream open(final FileSystem fs) throws IOException {
+    return new FSDataInputStream(new FileLinkInputStream(fs, this));
+  }
+
+  /**
+   * Open the FileLink for read.
+   * It uses the FileLinkInputStream to be able to switch between the
+   * original path and the alternative locations when the file is moved.
+   *
+   * @param fs {@link FileSystem} on which to open the FileLink
+   * @param bufferSize the size of the buffer to be used.
+   * @return InputStream for reading the file link.
+   * @throws IOException on unexpected error.
+   */
+  public FSDataInputStream open(final FileSystem fs, int bufferSize) throws IOException {
+    return new FSDataInputStream(new FileLinkInputStream(fs, this, bufferSize));
+  }
+
+  /**
+   * NOTE: This method must be used only in the constructor!
+   * It creates a List with the specified locations for the link.
+   */
+  protected void setLocations(Path originPath, Path... alternativePaths) {
+    this.locations = new ArrayList<Path>();
+    this.locations.add(originPath);
+    for (int i = 0; i < alternativePaths.length; i++) {
+      this.locations.add(alternativePaths[i]);
+    }
+  }
+}
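
For context, this is how a FileLink is expected to be used: the stream keeps serving data even if the file is renamed to one of the alternative locations mid-read. A minimal usage sketch, not part of the patch itself; the paths, Configuration and FileSystem handle here are illustrative:

    FileSystem fs = FileSystem.get(conf);
    Path origin = new Path("/hbase/table/region/cf/hfile");
    Path archived = new Path("/hbase/.archive/table/region/cf/hfile");

    FileLink link = new FileLink(origin, archived);
    FSDataInputStream in = link.open(fs);
    try {
      byte[] buf = new byte[8192];
      int n;
      while ((n = in.read(buf)) > 0) {
        // Consume data. If the file is moved from origin to archived
        // while reading, FileLinkInputStream reopens the new location
        // at the same offset and the read continues transparently.
      }
    } finally {
      in.close();
    }
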
Index: hbase-server/src/main/java/org/apache/hadoop/hbase/io/HFileLink.java
===================================================================
--- hbase-server/src/main/java/org/apache/hadoop/hbase/io/HFileLink.java	(revision 0)
+++ hbase-server/src/main/java/org/apache/hadoop/hbase/io/HFileLink.java	(revision 0)
@@ -0,0 +1,200 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.io;
+
+import java.io.IOException;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.HRegionInfo;
+import org.apache.hadoop.hbase.util.HFileArchiveUtil;
+import org.apache.hadoop.hbase.util.FSUtils;
+
+/**
+ * HFileLink describes a link to an hfile.
+ *
+ * An hfile can be in /hbase/<table>/<region>/<cf>/<hfile>
+ * or, once archived, in /hbase/.archive/<table>/<region>/<cf>/<hfile>.
+ *
+ * The link checks the original path first; if the file is not present
+ * there, it falls back to the archived path.
+ */
+@InterfaceAudience.Private
+public class HFileLink extends FileLink {
+  /** Define the HFile Link name pattern in the form of: hfile-region-table */
+  public static final Pattern LINK_NAME_PARSER =
+    Pattern.compile("^([0-9a-f\\.]+)-([0-9a-f]+)-([a-zA-Z_0-9][a-zA-Z0-9_\\-\\.]+)$");
+
+  private final Path archivePath;
+  private final Path originPath;
+
+  /**
+   * @param conf {@link Configuration} from which to extract specific archive locations
+   * @param path The path of the HFile Link.
+   * @throws IOException on unexpected error.
+   */
+  public HFileLink(Configuration conf, Path path) throws IOException {
+    Path hfilePath = getRelativeTablePath(path);
+
+    this.originPath = new Path(FSUtils.getRootDir(conf), hfilePath);
+    this.archivePath = new Path(HFileArchiveUtil.getArchivePath(conf), hfilePath);
+    setLocations(originPath, archivePath);
+  }
+
+  public HFileLink(Path originPath, Path archivePath) {
+    this.originPath = originPath;
+    this.archivePath = archivePath;
+    setLocations(originPath, archivePath);
+  }
+
+  /**
+   * @return the origin path of the hfile.
+   */
+  public Path getOriginPath() {
+    return this.originPath;
+  }
+
+  /**
+   * @return the path of the archived hfile.
+   */
+  public Path getArchivePath() {
+    return this.archivePath;
+  }
+
+  /**
+   * @param path Path to check.
+   * @return True if the path is an HFileLink.
+   */
+  public static boolean isHFileLink(final Path path) {
+    return isHFileLink(path.getName());
+  }
+
+  /**
+   * @param fileName File name to check.
+   * @return True if the file name is an HFileLink.
+   */
+  public static boolean isHFileLink(String fileName) {
+    Matcher m = LINK_NAME_PARSER.matcher(fileName);
+    if (!m.matches()) return false;
+
+    return m.groupCount() > 2 && m.group(2) != null && m.group(3) != null;
+  }
+
+  /**
+   * Get the HFileLink referenced path.
+   *
+   * @param fs {@link FileSystem} on which to check the HFileLink
+   * @param path HFileLink path
+   * @return Referenced path (original path or archived path)
+   * @throws IOException on unexpected error.
+   */
+  public static Path getReferencedPath(FileSystem fs, final Path path) throws IOException {
+    return getReferencedPath(fs.getConf(), fs, path);
+  }
+
+  /**
+   * Get the HFileLink referenced path.
+   * The returned path can be the "original" file path like: /hbase/table/region/cf/hfile
+   * or a path to the archived file like: /hbase/.archive/table/region/cf/hfile
+   *
+   * @param conf {@link Configuration} from which to extract specific archive locations
+   * @param fs {@link FileSystem} on which to check the HFileLink
+   * @param path HFileLink path
+   * @return Referenced path (original path or archived path)
+   * @throws IOException on unexpected error.
+   */
+  public static Path getReferencedPath(Configuration conf, FileSystem fs, final Path path)
+      throws IOException {
+    Path hfilePath = getRelativeTablePath(path);
+
+    Path originPath = new Path(FSUtils.getRootDir(conf), hfilePath);
+    if (fs.exists(originPath)) {
+      return originPath;
+    }
+
+    return new Path(HFileArchiveUtil.getArchivePath(conf), hfilePath);
+  }
+
+  /**
+   * Convert an HFileLink path to a table relative path.
+   * e.g. the link: /hbase/test/0123/cf/abcd-4567-testtb
+   *      becomes: testtb/4567/cf/abcd
+   *
+   * @param path HFileLink path
+   * @return Relative table path
+   * @throws IllegalArgumentException if the path is not a valid HFileLink name.
+   */
+  private static Path getRelativeTablePath(final Path path) {
+    // hfile-region-table
+    Matcher m = LINK_NAME_PARSER.matcher(path.getName());
+    if (!m.matches()) {
+      throw new IllegalArgumentException(path.getName() + " is not a valid HFileLink name!");
+    }
+
+    // Convert the HFileLink name into a real table/region/cf/hfile path.
+    String hfileName = m.group(1);
+    String regionName = m.group(2);
+    String tableName = m.group(3);
+    String familyName = path.getParent().getName();
+    return new Path(new Path(tableName, regionName), new Path(familyName, hfileName));
+  }
+
+  /**
+   * Get the HFile name referenced by the link.
+   *
+   * @param fileName HFileLink file name
+   * @return the name of the referenced HFile
+   * @throws IllegalArgumentException if the name is not a valid HFileLink name.
+   */
+  public static String getReferencedHFileName(String fileName) {
+    Matcher m = LINK_NAME_PARSER.matcher(fileName);
+    if (!m.matches()) {
+      throw new IllegalArgumentException(fileName + " is not a valid HFileLink name!");
+    }
+    return m.group(1);
+  }
+
+  /**
+   * Create a new HFileLink name
+   *
+   * @param hfileRegionInfo - Linked HFile Region Info
+   * @param hfileName - Linked HFile name
+   * @return file name of the HFile Link
+   */
+  public static String createName(HRegionInfo hfileRegionInfo, String hfileName) {
+    return String.format("%s-%s-%s", hfileName, hfileRegionInfo.getEncodedName(),
+        hfileRegionInfo.getTableNameAsString());
+  }
+
+  /**
+   * Create a new HFileLink
+   *
+   * @param fs {@link FileSystem} on which to write the HFileLink
+   * @param dstPath - Destination path (table/region/cf/)
+   * @param hfileRegionInfo - Linked HFile Region Info
+   * @param hfileName - Linked HFile name
+   * @return true if the file was created, false if it already exists.
+   * @throws IOException on unexpected error.
+   */
+  public static boolean create(FileSystem fs, final Path dstPath,
+      HRegionInfo hfileRegionInfo, String hfileName) throws IOException {
+    String name = createName(hfileRegionInfo, hfileName);
+    fs.mkdirs(dstPath);
+    return fs.createNewFile(new Path(dstPath, name));
+  }
+}
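
The link name encodes everything needed to locate the referenced file. A short sketch of the round-trip (the hfile, region and table names below are made up; hex-like segments are what LINK_NAME_PARSER expects):

    // Link name format: <hfileName>-<encodedRegionName>-<tableName>
    String linkName = "abcd1234-e45f6789-testtb";

    HFileLink.isHFileLink(linkName);             // true
    HFileLink.getReferencedHFileName(linkName);  // "abcd1234"

    // Given the link /hbase/test/0123/cf/abcd1234-e45f6789-testtb, the
    // referenced file is looked up at <rootdir>/testtb/e45f6789/cf/abcd1234
    // first, then under the archive directory.
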
Index: hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFile.java
===================================================================
--- hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFile.java	(revision 1379715)
+++ hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFile.java	(working copy)
@@ -592,6 +592,39 @@
   }
 
   /**
+   * @param fs A file system
+   * @param path Path to HFile
+   * @param fsdis an open checksummed stream of path's file
+   * @param fsdisNoFsChecksum an open unchecksummed stream of path's file
+   * @param size max size of the trailer.
+   * @param cacheConf Cache configuration for hfile's contents
+   * @param preferredEncodingInCache Preferred in-cache data encoding algorithm.
+   * @param closeIStream whether the streams should be closed when the reader is closed.
+   * @return A version-specific HFile Reader
+   * @throws IOException if the file is invalid (a CorruptHFileException-flavored IOException)
+   */
+  public static Reader createReaderWithEncoding(
+      FileSystem fs, Path path, FSDataInputStream fsdis,
+      FSDataInputStream fsdisNoFsChecksum, long size, CacheConfig cacheConf,
+      DataBlockEncoding preferredEncodingInCache, boolean closeIStream)
+      throws IOException {
+    HFileSystem hfs = null;
+
+    // If the fs is not an instance of HFileSystem, then create an
+    // instance of HFileSystem that wraps over the specified fs.
+    // In this case, we will not be able to avoid checksumming inside
+    // the filesystem.
+    if (!(fs instanceof HFileSystem)) {
+      hfs = new HFileSystem(fs);
+    } else {
+      hfs = (HFileSystem)fs;
+    }
+    return pickReaderVersion(path, fsdis, fsdisNoFsChecksum, size,
+        closeIStream, cacheConf,
+        preferredEncodingInCache, hfs);
+  }
+
+  /**
    *
    * @param fs filesystem
    * @param path Path to file to read
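
The new factory expects the caller to have already opened both streams; wrapping a plain filesystem in HFileSystem only determines whether checksumming can be moved out of HDFS. An illustrative caller, not part of the patch, assuming a valid fs, path and Configuration:

    static HFile.Reader openReader(FileSystem fs, Path path, Configuration conf)
        throws IOException {
      // Checksummed and unchecksummed views of the same file.
      HFileSystem hfs = (fs instanceof HFileSystem) ? (HFileSystem)fs
                                                    : new HFileSystem(fs);
      FSDataInputStream in = hfs.open(path);
      FSDataInputStream inNoChecksum = hfs.getNoChecksumFs().open(path);
      long size = hfs.getFileStatus(path).getLen();

      // closeIStream = true: the reader owns the streams from here on.
      return HFile.createReaderWithEncoding(hfs, path, in, inNoChecksum, size,
          new CacheConfig(conf), DataBlockEncoding.NONE, true);
    }
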
Index: hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java
===================================================================
--- hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java	(revision 1379715)
+++ hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java	(working copy)
@@ -53,6 +53,7 @@
 import org.apache.hadoop.hbase.RemoteExceptionHandler;
 import org.apache.hadoop.hbase.backup.HFileArchiver;
 import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.io.HFileLink;
 import org.apache.hadoop.hbase.fs.HFileSystem;
 import org.apache.hadoop.hbase.io.HeapSize;
 import org.apache.hadoop.hbase.io.hfile.CacheConfig;
@@ -386,7 +387,7 @@
       final Path p = files[i].getPath();
       // Check for empty file. Should never be the case but can happen
       // after data loss in hdfs for whatever reason (upgrade, etc.): HBASE-646
-      if (this.fs.getFileStatus(p).getLen() <= 0) {
+      if (!HFileLink.isHFileLink(p) && this.fs.getFileStatus(p).getLen() <= 0) {
         LOG.warn("Skipping " + p + " because its empty. HBASE-646 DATA LOSS?");
         continue;
       }
Index: hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFile.java
===================================================================
--- hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFile.java	(revision 1379715)
+++ hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFile.java	(working copy)
@@ -38,6 +38,7 @@
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataInputStream;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
@@ -46,6 +47,8 @@
 import org.apache.hadoop.hbase.KeyValue;
 import org.apache.hadoop.hbase.KeyValue.KVComparator;
 import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.io.HFileLink;
+import org.apache.hadoop.hbase.fs.HFileSystem;
 import org.apache.hadoop.hbase.io.HalfStoreFileReader;
 import org.apache.hadoop.hbase.io.Reference;
 import org.apache.hadoop.hbase.io.encoding.DataBlockEncoding;
@@ -152,6 +155,9 @@
   // If this StoreFile references another, this is the other files path.
   private Path referencePath;
 
+  // If this storefile is a link to another, this is the link instance.
+  private HFileLink link;
+
   // Block cache configuration and reference.
   private final CacheConfig cacheConf;
 
@@ -246,9 +252,14 @@
     this.dataBlockEncoder =
         dataBlockEncoder == null ? NoOpDataBlockEncoder.INSTANCE
            : dataBlockEncoder;
-    if (isReference(p)) {
+
+    if (HFileLink.isHFileLink(p)) {
+      this.link = new HFileLink(conf, p);
+      LOG.debug("Store file " + p + " is a link");
+    } else if (isReference(p)) {
       this.reference = Reference.read(fs, p);
       this.referencePath = getReferredToFile(this.path);
+      LOG.debug("Store file " + p + " is a reference");
     }
 
     if (BloomFilterFactory.isGeneralBloomEnabled(conf)) {
@@ -293,6 +304,13 @@
   }
 
   /**
+   * @return True if this is a StoreFile Link
+   */
+  boolean isLink() {
+    return this.link != null;
+  }
+
+  /**
    * @param p Path to check.
    * @return True if the path has format of a HStoreFile reference.
    */
@@ -474,6 +492,7 @@
       Path referencePath = getReferredToFile(p);
       return computeRefFileHDFSBlockDistribution(fs, reference, referencePath);
     } else {
+      if (HFileLink.isHFileLink(p)) p = HFileLink.getReferencedPath(fs, p);
       FileStatus status = fs.getFileStatus(p);
       long length = status.getLen();
       return FSUtils.computeHDFSBlocksDistribution(fs, status, 0, length);
@@ -489,7 +508,12 @@
       this.hdfsBlocksDistribution = computeRefFileHDFSBlockDistribution(
           this.fs, this.reference, this.referencePath);
     } else {
-      FileStatus status = this.fs.getFileStatus(this.path);
+      FileStatus status;
+      if (isLink()) {
+        status = link.getFileStatus(fs);
+      } else {
+        status = this.fs.getFileStatus(path);
+      }
       long length = status.getLen();
       this.hdfsBlocksDistribution = FSUtils.computeHDFSBlocksDistribution(
           this.fs, status, 0, length);
@@ -510,6 +534,10 @@
       this.reader = new HalfStoreFileReader(this.fs, this.referencePath,
           this.cacheConf, this.reference,
           dataBlockEncoder.getEncodingInCache());
+    } else if (isLink()) {
+      long size = link.getFileStatus(fs).getLen();
+      this.reader = new Reader(this.fs, this.path, link, size, this.cacheConf,
+          dataBlockEncoder.getEncodingInCache(), true);
     } else {
       this.reader = new Reader(this.fs, this.path, this.cacheConf,
           dataBlockEncoder.getEncodingInCache());
@@ -868,6 +896,8 @@
    * @return true if the file could be a valid store file, false otherwise
    */
   public static boolean validateStoreFileName(String fileName) {
+    if (HFileLink.isHFileLink(fileName))
+      return true;
     return !fileName.contains("-");
   }
 
@@ -1257,6 +1287,23 @@
       bloomFilterType = BloomType.NONE;
     }
 
+    public Reader(FileSystem fs, Path path, HFileLink hfileLink, long size,
+        CacheConfig cacheConf, DataBlockEncoding preferredEncodingInCache,
+        boolean closeIStream) throws IOException {
+      super(path);
+
+      FSDataInputStream in = hfileLink.open(fs);
+      FSDataInputStream inNoChecksum = in;
+      if (fs instanceof HFileSystem) {
+        FileSystem noChecksumFs = ((HFileSystem)fs).getNoChecksumFs();
+        inNoChecksum = hfileLink.open(noChecksumFs);
+      }
+
+      reader = HFile.createReaderWithEncoding(fs, path, in, inNoChecksum,
+          size, cacheConf, preferredEncodingInCache, closeIStream);
+      bloomFilterType = BloomType.NONE;
+    }
+
     /**
      * ONLY USE DEFAULT CONSTRUCTOR FOR UNIT TESTS
      */
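
Note the interplay with the link name pattern in validateStoreFileName: names containing a dash are normally rejected as store files, so link names must be whitelisted explicitly. For illustration (the names are made up; hex-like segments are what LINK_NAME_PARSER accepts):

    StoreFile.validateStoreFileName("abcd1234ef");                // true: plain hfile name
    StoreFile.validateStoreFileName("abcd1234-e45f6789-testtb");  // true: matches the link pattern
    StoreFile.validateStoreFileName("not-a-link");                // false: dashed, but not a link name
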
Index: hbase-server/src/test/java/org/apache/hadoop/hbase/io/TestFileLink.java
===================================================================
--- hbase-server/src/test/java/org/apache/hadoop/hbase/io/TestFileLink.java	(revision 0)
+++ hbase-server/src/test/java/org/apache/hadoop/hbase/io/TestFileLink.java	(revision 0)
@@ -0,0 +1,195 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.io;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+import org.junit.Test;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.experimental.categories.Category;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.apache.hadoop.hbase.MediumTests;
+
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+@Category(MediumTests.class)
+public class TestFileLink {
+  private final static HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
+  private static MiniDFSCluster cluster;
+  private static Configuration conf;
+  private static FileSystem fs;
+
+  @BeforeClass
+  public static void setUpBeforeClass() throws Exception {
+    conf = TEST_UTIL.getConfiguration();
+    conf.setInt("dfs.blocksize", 1024 * 1024);
+    conf.setInt("dfs.client.read.prefetch.size", 2 * 1024 * 1024);
+
+    TEST_UTIL.startMiniDFSCluster(1);
+    cluster = TEST_UTIL.getDFSCluster();
+    fs = cluster.getFileSystem();
+  }
+
+  @AfterClass
+  public static void tearDownAfterClass() throws Exception {
+    TEST_UTIL.shutdownMiniCluster();
+  }
+
+  @Test
+  public void testLinkReadDuringRename() throws Exception {
+    Path originalPath = new Path("test.file");
+    Path archivedPath = new Path("archived.file");
+    fillFile(fs, originalPath, (byte)2);
+
+    List<Path> files = new ArrayList<Path>();
+    files.add(originalPath);
+    files.add(archivedPath);
+
+    FileLink link = new FileLink(files);
+    FSDataInputStream in = link.open(fs);
+    try {
+      byte[] data = new byte[8192];
+      long size = 0;
+
+      // Read from origin
+      int n = in.read(data);
+      dataVerify(data, n, (byte)2);
+      size += n;
+
+      // Move origin to archive
+      assertFalse(fs.exists(archivedPath));
+      fs.rename(originalPath, archivedPath);
+      assertFalse(fs.exists(originalPath));
+      assertTrue(fs.exists(archivedPath));
+
+      // Try to read to the end
+      while ((n = in.read(data)) > 0) {
+        dataVerify(data, n, (byte)2);
+        size += n;
+      }
+
+      // fillFile() wrote 2048 * 128KB = 256MB
+      assertEquals(268435456, size);
+    } finally {
+      in.close();
+      if (fs.exists(originalPath)) fs.delete(originalPath, true);
+      if (fs.exists(archivedPath)) fs.delete(archivedPath, true);
+    }
+  }
+
+  @Test
+  public void testLinkReadDuringDelete() throws IOException {
+    List<Path> files = new ArrayList<Path>();
+    for (int i = 0; i < 3; i++) {
+      Path path = new Path(String.format("test-data-%d", i));
+      fillFile(fs, path, (byte)i);
+      files.add(path);
+    }
+
+    FileLink link = new FileLink(files);
+    FSDataInputStream in = link.open(fs);
+    try {
+      byte[] data = new byte[8192];
+      int n;
+
+      // Read from file 0; after deletion the link falls back to file 1
+      n = in.read(data);
+      dataVerify(data, n, (byte)0);
+      fs.delete(files.get(0), true);
+      skipBuffer(in, (byte)0);
+
+      // Read from file 1; after deletion the link falls back to file 2
+      n = in.read(data);
+      dataVerify(data, n, (byte)1);
+      fs.delete(files.get(1), true);
+      skipBuffer(in, (byte)1);
+
+      // Read from file 2, then delete it
+      n = in.read(data);
+      dataVerify(data, n, (byte)2);
+      fs.delete(files.get(2), true);
+      skipBuffer(in, (byte)2);
+
+      // No more locations available: the read must fail
+      try {
+        in.read(data);
+        fail("Expected FileNotFoundException");
+      } catch (FileNotFoundException e) {
+        // expected
+      }
+    } finally {
+      in.close();
+    }
+  }
+
+  /**
+   * Fill the given path with 256MB (2048 x 128KB blocks) of the byte v.
+   */
+  private void fillFile(FileSystem fs, Path path, byte v) throws IOException {
+    byte[] data = new byte[1 << 17];  // 128KB
+    for (int i = 0; i < data.length; i++) {
+      data[i] = v;
+    }
+
+    FSDataOutputStream stream = fs.create(path);
+    try {
+      for (int i = 0; i < 2048; i++) {
+        stream.write(data, 0, data.length);
+      }
+    } finally {
+      stream.close();
+    }
+  }
+
+  private void dataVerify(byte[] data, int n, byte v) {
+    for (int i = 0; i < n; ++i) {
+      assertEquals(v, data[i]);
+    }
+  }
+
+  /**
+   * Drain the stream while its content still matches the byte v.
+   * Failures are tolerated on purpose: the file may have been deleted,
+   * and the link may switch to the next location at any point.
+   */
+  private void skipBuffer(FSDataInputStream in, byte v) throws IOException {
+    byte[] data = new byte[8192];
+    try {
+      int n;
+      while ((n = in.read(data)) == data.length) {
+        for (int i = 0; i < data.length; ++i) {
+          if (data[i] != v)
+            throw new Exception("File changed");
+        }
+      }
+    } catch (Exception e) {
+      // Stop draining: either the read failed or the link switched files.
+    }
+  }
+
+  @org.junit.Rule
+  public org.apache.hadoop.hbase.ResourceCheckerJUnitRule cu =
+    new org.apache.hadoop.hbase.ResourceCheckerJUnitRule();
+}
Index: hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestStoreFile.java
===================================================================
--- hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestStoreFile.java	(revision 1379715)
+++ hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestStoreFile.java	(working copy)
@@ -36,9 +36,11 @@
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hbase.HBaseTestCase;
 import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.apache.hadoop.hbase.HRegionInfo;
 import org.apache.hadoop.hbase.KeyValue;
 import org.apache.hadoop.hbase.SmallTests;
 import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.io.HFileLink;
 import org.apache.hadoop.hbase.io.encoding.DataBlockEncoding;
 import org.apache.hadoop.hbase.io.hfile.BlockCache;
 import org.apache.hadoop.hbase.io.hfile.CacheConfig;
@@ -53,6 +55,7 @@
 import org.apache.hadoop.hbase.util.BloomFilterFactory;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.ChecksumType;
+import org.apache.hadoop.hbase.util.FSUtils;
 import org.junit.experimental.categories.Category;
 import org.mockito.Mockito;
 
@@ -172,6 +175,39 @@
     assertTrue(Bytes.equals(kv.getRow(), finalRow));
   }
 
+  public void testHFileLink() throws IOException {
+    HRegionInfo hri = new HRegionInfo(Bytes.toBytes("table-link"));
+    Path storedir = new Path(new Path(FSUtils.getRootDir(conf),
+        new Path(hri.getTableNameAsString(), hri.getEncodedName())), "cf");
+
+    // Make a store file and write data to it.
+    StoreFile.Writer writer = new StoreFile.WriterBuilder(conf, cacheConf,
+        this.fs, 8 * 1024)
+            .withOutputDir(storedir)
+            .build();
+    Path storeFilePath = writer.getPath();
+    writeStoreFile(writer);
+    writer.close();
+
+    Path dstPath = new Path("test-region", "cf");
+    HFileLink.create(this.fs, dstPath, hri, storeFilePath.getName());
+    Path linkFilePath = new Path(dstPath, HFileLink.createName(hri, storeFilePath.getName()));
+
+    // Try to open store file from link
+    StoreFile hsf = new StoreFile(this.fs, linkFilePath, conf, cacheConf,
+        StoreFile.BloomType.NONE, NoOpDataBlockEncoder.INSTANCE);
+    assertTrue(hsf.isLink());
+
+    // Now confirm that I can read from the link
+    int count = 1;  // seekTo() already positions us on the first entry
+    HFileScanner s = hsf.createReader().getScanner(false, false);
+    s.seekTo();
+    while (s.next()) {
+      count++;
+    }
+    assertEquals((LAST_CHAR - FIRST_CHAR + 1) * (LAST_CHAR - FIRST_CHAR + 1), count);
+  }
+
   private void checkHalfHFile(final StoreFile f) throws IOException {
     byte [] midkey = f.createReader().midkey();