commit 257d04d37c3ddbfd81ea0585217dbe4be8ab0b62 Author: Enis Soztutar Date: Fri Jul 29 13:55:32 2016 -0700 HBASE-16475 Remove SequenceFile based WAL diff --git hbase-server/src/main/java/org/apache/hadoop/hbase/coordination/ZkCoordinatedStateManager.java hbase-server/src/main/java/org/apache/hadoop/hbase/coordination/ZkCoordinatedStateManager.java index 3e89be7..5f82ba5 100644 --- hbase-server/src/main/java/org/apache/hadoop/hbase/coordination/ZkCoordinatedStateManager.java +++ hbase-server/src/main/java/org/apache/hadoop/hbase/coordination/ZkCoordinatedStateManager.java @@ -38,7 +38,6 @@ public class ZkCoordinatedStateManager extends BaseCoordinatedStateManager { this.watcher = server.getZooKeeper(); splitLogWorkerCoordination = new ZkSplitLogWorkerCoordination(this, watcher); splitLogManagerCoordination = new ZKSplitLogManagerCoordination(this, watcher); - } @Override @@ -49,7 +48,8 @@ public class ZkCoordinatedStateManager extends BaseCoordinatedStateManager { @Override public SplitLogWorkerCoordination getSplitLogWorkerCoordination() { return splitLogWorkerCoordination; - } + } + @Override public SplitLogManagerCoordination getSplitLogManagerCoordination() { return splitLogManagerCoordination; diff --git hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/ReaderBase.java hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/ReaderBase.java index 2114cc4..ed51db5 100644 --- hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/ReaderBase.java +++ hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/ReaderBase.java @@ -131,6 +131,11 @@ public abstract class ReaderBase implements AbstractFSWALProvider.Reader { seekOnFs(pos); } + @Override + public long getFileLength() throws IOException { + return fileLength; + } + /** * Initializes the log reader with a particular stream (may be null). * Reader assumes ownership of the stream if not null and may use it. Called once. diff --git hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/SequenceFileLogReader.java hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/SequenceFileLogReader.java deleted file mode 100644 index e41e1c3..0000000 --- hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/SequenceFileLogReader.java +++ /dev/null @@ -1,309 +0,0 @@ -/** - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
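/*
 * A minimal sketch, not part of this patch: it shows how the getFileLength() accessor added to
 * ReaderBase above (and to the WAL.Reader interface later in this patch) can be combined with the
 * existing getPosition() to describe read progress. The WalReadProgress class and describe()
 * method are hypothetical names used only for illustration.
 */
import java.io.IOException;

import org.apache.hadoop.hbase.wal.WAL;

public final class WalReadProgress {
  private WalReadProgress() {}

  /** Returns a short description of how far the reader has advanced through its WAL file. */
  public static String describe(WAL.Reader reader) throws IOException {
    long pos = reader.getPosition();
    long len = reader.getFileLength();
    if (len <= 0) {
      return "read " + pos + " bytes (file length unknown)";
    }
    return String.format("read %d of %d bytes (%.1f%%)", pos, len, 100.0 * pos / len);
  }
}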
- */ - -package org.apache.hadoop.hbase.regionserver.wal; - -import java.io.IOException; -import java.lang.reflect.Field; -import java.util.NavigableMap; - -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.fs.FSDataInputStream; -import org.apache.hadoop.fs.FileSystem; -import org.apache.hadoop.fs.Path; -import org.apache.hadoop.hbase.HBaseInterfaceAudience; -import org.apache.hadoop.hbase.classification.InterfaceAudience; -import org.apache.hadoop.hbase.wal.WAL.Entry; -import org.apache.hadoop.hdfs.client.HdfsDataInputStream; -import org.apache.hadoop.io.SequenceFile; -import org.apache.hadoop.io.SequenceFile.Metadata; -import org.apache.hadoop.io.Text; - -@InterfaceAudience.LimitedPrivate({HBaseInterfaceAudience.COPROC, HBaseInterfaceAudience.PHOENIX, - HBaseInterfaceAudience.CONFIG}) -public class SequenceFileLogReader extends ReaderBase { - private static final Log LOG = LogFactory.getLog(SequenceFileLogReader.class); - - // Legacy stuff from pre-PB WAL metadata. - private static final Text WAL_VERSION_KEY = new Text("version"); - // Let the version be 1. Let absence of a version meta tag be old, version 0. - // Set this version '1' to be the version that introduces compression, - // the COMPRESSION_VERSION. - private static final int COMPRESSION_VERSION = 1; - private static final Text WAL_COMPRESSION_TYPE_KEY = new Text("compression.type"); - private static final Text DICTIONARY_COMPRESSION_TYPE = new Text("dictionary"); - - /** - * Hack just to set the correct file length up in SequenceFile.Reader. - * See HADOOP-6307. The below is all about setting the right length on the - * file we are reading. fs.getFileStatus(file).getLen() is passed down to - * a private SequenceFile.Reader constructor. This won't work. Need to do - * the available on the stream. The below is ugly. It makes getPos, the - * first time its called, return length of the file -- i.e. tell a lie -- just - * so this line up in SF.Reader's constructor ends up with right answer: - * - * this.end = in.getPos() + length; - * - */ - private static class WALReader extends SequenceFile.Reader { - - WALReader(final FileSystem fs, final Path p, final Configuration c) - throws IOException { - super(fs, p, c); - } - - @Override - protected FSDataInputStream openFile(FileSystem fs, Path file, - int bufferSize, long length) - throws IOException { - return new WALReaderFSDataInputStream(super.openFile(fs, file, - bufferSize, length), length); - } - - /** - * Override just so can intercept first call to getPos. - */ - static class WALReaderFSDataInputStream extends FSDataInputStream { - private boolean firstGetPosInvocation = true; - private long length; - - WALReaderFSDataInputStream(final FSDataInputStream is, final long l) - throws IOException { - super(is); - this.length = l; - } - - // This section can be confusing. It is specific to how HDFS works. - // Let me try to break it down. This is the problem: - // - // 1. HDFS DataNodes update the NameNode about a filename's length - // on block boundaries or when a file is closed. Therefore, - // if an RS dies, then the NN's fs.getLength() can be out of date - // 2. this.in.available() would work, but it returns int & - // therefore breaks for files > 2GB (happens on big clusters) - // 3. DFSInputStream.getFileLength() gets the actual length from the DNs - // 4. 
DFSInputStream is wrapped 2 levels deep : this.in.in - // - // So, here we adjust getPos() using getFileLength() so the - // SequenceFile.Reader constructor (aka: first invocation) comes out - // with the correct end of the file: - // this.end = in.getPos() + length; - @Override - public long getPos() throws IOException { - if (this.firstGetPosInvocation) { - this.firstGetPosInvocation = false; - long adjust = 0; - HdfsDataInputStream hdfsDataInputStream = null; - try { - if (this.in.getClass().getName().endsWith("HdfsDataInputStream") - || this.in.getClass().getName().endsWith("DFSInputStream")) { - hdfsDataInputStream = (HdfsDataInputStream) this.getWrappedStream(); - long realLength = hdfsDataInputStream.getVisibleLength(); - assert(realLength >= this.length); - adjust = realLength - this.length; - } else { - LOG.info( - "Input stream class: " + this.in.getClass().getName() + ", not adjusting length"); - } - } catch (Exception e) { - LOG.warn("Error while trying to get accurate file length. " - + "Truncation / data loss may occur if RegionServers die.", - e); - throw new IOException(e); - } - return adjust + super.getPos(); - } - return super.getPos(); - } - } - } - - // Protected for tests. - protected SequenceFile.Reader reader; - long entryStart = 0; // needed for logging exceptions - - public SequenceFileLogReader() { - super(); - } - - @Override - public void close() throws IOException { - try { - if (reader != null) { - this.reader.close(); - this.reader = null; - } - } catch (IOException ioe) { - throw addFileInfoToException(ioe); - } - } - - @Override - public long getPosition() throws IOException { - return reader != null ? reader.getPosition() : 0; - } - - @Override - public void reset() throws IOException { - // Resetting the reader lets us see newly added data if the file is being written to - // We also keep the same compressionContext which was previously populated for this file - reader = new WALReader(fs, path, conf); - } - - @Override - protected String initReader(FSDataInputStream stream) throws IOException { - // We don't use the stream because we have to have the magic stream above. - if (stream != null) { - stream.close(); - } - reset(); - return null; - } - - @Override - protected void initAfterCompression(String cellCodecClsName) throws IOException { - // Nothing to do here - } - - @Override - protected void initAfterCompression() throws IOException { - // Nothing to do here - } - - @Override - protected boolean hasCompression() { - return isWALCompressionEnabled(reader.getMetadata()); - } - - @Override - protected boolean hasTagCompression() { - // Tag compression not supported with old SequenceFileLog Reader/Writer - return false; - } - - /** - * Call this method after init() has been executed - * @return whether WAL compression is enabled - */ - static boolean isWALCompressionEnabled(final Metadata metadata) { - // Check version is >= VERSION? - Text txt = metadata.get(WAL_VERSION_KEY); - if (txt == null || Integer.parseInt(txt.toString()) < COMPRESSION_VERSION) { - return false; - } - // Now check that compression type is present. Currently only one value. - txt = metadata.get(WAL_COMPRESSION_TYPE_KEY); - return txt != null && txt.equals(DICTIONARY_COMPRESSION_TYPE); - } - - - /** - * fill in the passed entry with teh next key/value. - * Note that because this format deals with our legacy storage, the provided - * Entery MUST use an {@link HLogKey} for the key. - * @return boolean indicating if the contents of Entry have been filled in. 
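/*
 * Sketch only, assuming an HDFS-backed stream; not part of this patch. It isolates the
 * visible-length lookup that the deleted WALReaderFSDataInputStream above performed inside
 * getPos(): the NameNode may not yet know about the last partial block of a WAL a dead
 * RegionServer was writing, so the client-side stream is asked for the length it can actually
 * serve. The WalLengthProbe name is hypothetical.
 */
import java.io.IOException;

import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.hdfs.client.HdfsDataInputStream;

final class WalLengthProbe {
  private WalLengthProbe() {}

  static long visibleLength(FSDataInputStream in, long lengthFromNameNode) throws IOException {
    if (in instanceof HdfsDataInputStream) {
      // The DFS client knows how many bytes the DataNodes will actually serve,
      // even when the NameNode has not yet learned about the last partial block.
      return ((HdfsDataInputStream) in).getVisibleLength();
    }
    return lengthFromNameNode; // non-HDFS streams: trust the reported length
  }
}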
- */ - @Override - protected boolean readNext(Entry e) throws IOException { - try { - if (!(e.getKey() instanceof HLogKey)) { - final IllegalArgumentException exception = new IllegalArgumentException( - "SequenceFileLogReader only works when given entries that have HLogKey for keys. This" + - " one had '" + e.getKey().getClass() + "'"); - LOG.error("We need to use the legacy SequenceFileLogReader to handle a " + - " pre-0.96 style WAL, but HBase internals failed to use the deprecated HLogKey class." + - " This is a bug; please file an issue or email the developer mailing list. You will " + - "need the following exception details when seeking help from the HBase community.", - exception); - throw exception; - } - boolean hasNext = this.reader.next((HLogKey)e.getKey(), e.getEdit()); - if (!hasNext) return false; - // Scopes are probably in WAL edit, move to key - NavigableMap scopes = e.getEdit().getAndRemoveScopes(); - if (scopes != null) { - e.getKey().readOlderScopes(scopes); - } - return true; - } catch (IOException ioe) { - throw addFileInfoToException(ioe); - } - } - - @Override - protected void seekOnFs(long pos) throws IOException { - try { - reader.seek(pos); - } catch (IOException ioe) { - throw addFileInfoToException(ioe); - } - } - - protected IOException addFileInfoToException(final IOException ioe) - throws IOException { - long pos = -1; - try { - pos = getPosition(); - } catch (IOException e) { - LOG.warn("Failed getting position to add to throw", e); - } - - // See what SequenceFile.Reader thinks is the end of the file - long end = Long.MAX_VALUE; - try { - Field fEnd = SequenceFile.Reader.class.getDeclaredField("end"); - fEnd.setAccessible(true); - end = fEnd.getLong(this.reader); - } catch(NoSuchFieldException nfe) { - /* reflection failure, keep going */ - if (LOG.isTraceEnabled()) LOG.trace(nfe); - } catch(IllegalAccessException iae) { - /* reflection failure, keep going */ - if (LOG.isTraceEnabled()) LOG.trace(iae); - } catch(Exception e) { - /* All other cases. Should we handle it more aggressively? */ - LOG.warn("Unexpected exception when accessing the end field", e); - } - - String msg = (this.path == null? "": this.path.toString()) + - ", entryStart=" + entryStart + ", pos=" + pos + - ((end == Long.MAX_VALUE) ? "" : ", end=" + end) + - ", edit=" + this.edit; - - // Enhance via reflection so we don't change the original class type - try { - return (IOException) ioe.getClass() - .getConstructor(String.class) - .newInstance(msg) - .initCause(ioe); - } catch(NoSuchMethodException nfe) { - /* reflection failure, keep going */ - if (LOG.isTraceEnabled()) LOG.trace(nfe); - } catch(IllegalAccessException iae) { - /* reflection failure, keep going */ - if (LOG.isTraceEnabled()) LOG.trace(iae); - } catch(Exception e) { - /* All other cases. Should we handle it more aggressively? 
*/ - LOG.warn("Unexpected exception when accessing the end field", e); - } - return ioe; - } -} diff --git hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WAL.java hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WAL.java index 79321b3..3d74a97 100644 --- hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WAL.java +++ hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WAL.java @@ -230,6 +230,7 @@ public interface WAL { void seek(long pos) throws IOException; long getPosition() throws IOException; void reset() throws IOException; + long getFileLength() throws IOException; } /** diff --git hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALFactory.java hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALFactory.java index 8ed9bfb..50972fc 100644 --- hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALFactory.java +++ hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALFactory.java @@ -44,7 +44,6 @@ import org.apache.hadoop.hbase.util.LeaseNotRecoveredException; // imports for things that haven't moved from regionserver.wal yet. import org.apache.hadoop.hbase.regionserver.wal.MetricsWAL; import org.apache.hadoop.hbase.regionserver.wal.ProtobufLogReader; -import org.apache.hadoop.hbase.regionserver.wal.SequenceFileLogReader; import org.apache.hadoop.hbase.regionserver.wal.WALActionsListener; /** @@ -312,7 +311,12 @@ public class WALFactory { boolean isPbWal = (stream.read(magic) == magic.length) && Arrays.equals(magic, ProtobufLogReader.PB_WAL_MAGIC); - reader = isPbWal ? new ProtobufLogReader() : new SequenceFileLogReader(); + if (!isPbWal) { + throw new IOException("Cannot read non-PB WAL file. If this is an upgrade from before" + + "HBase-1.0 (0.92, 0.94) please make sure that all WAL files are cleaned up and " + + "cluster is shutdown gracefully before upgrade."); + } + reader = new ProtobufLogReader(); reader.init(fs, path, conf, stream); return reader; } @@ -399,7 +403,7 @@ public class WALFactory { // For now, first Configuration object wins. Practically this just impacts the reader/writer class private static final AtomicReference singleton = new AtomicReference(); private static final String SINGLETON_ID = WALFactory.class.getName(); - + // public only for FSHLog public static WALFactory getInstance(Configuration configuration) { WALFactory factory = singleton.get(); diff --git hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALSplitter.java hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALSplitter.java index 3e27834..beccb04 100644 --- hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALSplitter.java +++ hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALSplitter.java @@ -251,7 +251,7 @@ public class WALSplitter { if (logfiles != null && logfiles.length > 0) { for (FileStatus logfile: logfiles) { WALSplitter s = new WALSplitter(factory, conf, rootDir, fs, null, null, - RecoveryMode.LOG_SPLITTING); + RecoveryMode.LOG_SPLITTING); if (s.splitLogFile(logfile, null)) { finishSplitLogFile(rootDir, oldLogDir, logfile.getPath(), conf); if (s.outputSink.splits != null) { @@ -385,8 +385,7 @@ public class WALSplitter { throw iie; } catch (CorruptedLogFileException e) { LOG.warn("Could not parse, corrupted log file " + logPath, e); - csm.getSplitLogWorkerCoordination().markCorrupted(rootDir, - logfile.getPath().getName(), fs); + ZKSplitLog.markCorrupted(rootDir, logfile.getPath().getName(), fs); isCorrupted = true; } catch (IOException e) { e = e instanceof RemoteException ? 
((RemoteException) e).unwrapRemoteException() : e; diff --git hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/FaultyLogReader.java hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/FaultyLogReader.java new file mode 100644 index 0000000..00e70dc --- /dev/null +++ hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/FaultyLogReader.java @@ -0,0 +1,118 @@ +/** + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hbase.regionserver.wal; + +import java.io.IOException; +import java.util.LinkedList; +import java.util.Queue; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.hbase.wal.WAL.Entry; +import org.apache.hadoop.hbase.wal.WALKey; + +public class FaultyLogReader extends ProtobufLogReader { + + private static final Log LOG = LogFactory.getLog(FaultyLogReader.class); + + // public until class relocates to o.a.h.h.wal + public enum FailureType { + BEGINNING, MIDDLE, END, NONE + } + + Queue nextQueue = new LinkedList(); + int numberOfFileEntries = 0; + + FailureType getFailureType() { + return FailureType.valueOf(conf.get("faultysequencefilelogreader.failuretype", "NONE")); + } + + @Override + public Entry next(Entry reuse) throws IOException { + long entryStart = this.getPosition(); + boolean b = true; + + if (nextQueue.isEmpty()) { // Read the whole thing at once and fake reading + while (b == true) { + Entry e = new Entry(new WALKey(), new WALEdit()); + if (compressionContext != null) { + e.setCompressionContext(compressionContext); + } + b = readNext(e); + nextQueue.offer(e); + numberOfFileEntries++; + } + } + + if (nextQueue.size() == this.numberOfFileEntries + && getFailureType() == FailureType.BEGINNING) { + throw this.addFileInfoToException(entryStart, new IOException("fake Exception")); + } else if (nextQueue.size() == this.numberOfFileEntries / 2 + && getFailureType() == FailureType.MIDDLE) { + throw this.addFileInfoToException(entryStart, new IOException("fake Exception")); + } else if (nextQueue.size() == 1 && getFailureType() == FailureType.END) { + throw this.addFileInfoToException(entryStart, new IOException("fake Exception")); + } + + if (nextQueue.peek() != null) { + edit++; + } + + Entry e = nextQueue.poll(); + + if (e.getEdit().isEmpty()) { + return null; + } + return e; + } + + private IOException addFileInfoToException(long entryStart, final IOException ioe) + throws IOException { + long pos = -1; + try { + pos = getPosition(); + } catch (IOException e) { + LOG.warn("Failed getting position to add to throw", e); + } + + String msg = (this.path == null? 
"": this.path.toString()) + + ", entryStart=" + entryStart + ", pos=" + pos + + ", fileLength=" + getFileLength() + + ", edit=" + this.edit; + + // Enhance via reflection so we don't change the original class type + try { + return (IOException) ioe.getClass() + .getConstructor(String.class) + .newInstance(msg) + .initCause(ioe); + } catch(NoSuchMethodException nfe) { + /* reflection failure, keep going */ + if (LOG.isTraceEnabled()) LOG.trace(nfe); + } catch(IllegalAccessException iae) { + /* reflection failure, keep going */ + if (LOG.isTraceEnabled()) LOG.trace(iae); + } catch(Exception e) { + /* All other cases. Should we handle it more aggressively? */ + LOG.warn("Unexpected exception when accessing the end field", e); + } + return ioe; + } +} diff --git hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/FaultySequenceFileLogReader.java hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/FaultySequenceFileLogReader.java deleted file mode 100644 index a0e4490..0000000 --- hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/FaultySequenceFileLogReader.java +++ /dev/null @@ -1,80 +0,0 @@ -/** - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.hadoop.hbase.regionserver.wal; - -import java.io.IOException; -import java.util.LinkedList; -import java.util.Queue; - -import org.apache.hadoop.hbase.wal.WAL.Entry; - -public class FaultySequenceFileLogReader extends SequenceFileLogReader { - - // public until class relocates to o.a.h.h.wal - public enum FailureType { - BEGINNING, MIDDLE, END, NONE - } - - Queue nextQueue = new LinkedList(); - int numberOfFileEntries = 0; - - FailureType getFailureType() { - return FailureType.valueOf(conf.get("faultysequencefilelogreader.failuretype", "NONE")); - } - - @Override - public Entry next(Entry reuse) throws IOException { - this.entryStart = this.getPosition(); - boolean b = true; - - if (nextQueue.isEmpty()) { // Read the whole thing at once and fake reading - while (b == true) { - Entry e = new Entry(new HLogKey(), new WALEdit()); - if (compressionContext != null) { - e.setCompressionContext(compressionContext); - } - b = readNext(e); - nextQueue.offer(e); - numberOfFileEntries++; - } - } - - if (nextQueue.size() == this.numberOfFileEntries - && getFailureType() == FailureType.BEGINNING) { - throw this.addFileInfoToException(new IOException("fake Exception")); - } else if (nextQueue.size() == this.numberOfFileEntries / 2 - && getFailureType() == FailureType.MIDDLE) { - throw this.addFileInfoToException(new IOException("fake Exception")); - } else if (nextQueue.size() == 1 && getFailureType() == FailureType.END) { - throw this.addFileInfoToException(new IOException("fake Exception")); - } - - if (nextQueue.peek() != null) { - edit++; - } - - Entry e = nextQueue.poll(); - - if (e.getEdit().isEmpty()) { - return null; - } - return e; - } -} diff --git hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/SequenceFileLogWriter.java hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/SequenceFileLogWriter.java deleted file mode 100644 index 101758e..0000000 --- hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/SequenceFileLogWriter.java +++ /dev/null @@ -1,239 +0,0 @@ -/** - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
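/*
 * Sketch, not part of this patch: the reflective re-wrap used by addFileInfoToException in the
 * new FaultyLogReader above (and in the readers this patch deletes), extracted as a generic
 * helper. Re-creating the exception through its (String) constructor keeps the concrete
 * IOException subtype, so callers that catch specific types still match. The ExceptionContext
 * class and withContext() names are hypothetical.
 */
import java.io.IOException;

final class ExceptionContext {
  private ExceptionContext() {}

  static IOException withContext(IOException ioe, String context) {
    try {
      // Rebuild the exception as its own concrete class so the enriched message
      // travels with the same type, then chain the original as the cause.
      return (IOException) ioe.getClass()
          .getConstructor(String.class)
          .newInstance(context)
          .initCause(ioe);
    } catch (Exception reflectionFailure) {
      // No accessible (String) constructor: return the original, without the extra context.
      return ioe;
    }
  }
}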
- */ - -package org.apache.hadoop.hbase.regionserver.wal; - -import java.io.IOException; -import java.lang.reflect.Field; -import java.lang.reflect.InvocationTargetException; -import java.util.TreeMap; - -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; -import org.apache.hadoop.hbase.classification.InterfaceAudience; -import org.apache.hadoop.hbase.io.util.LRUDictionary; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.fs.FSDataOutputStream; -import org.apache.hadoop.fs.FileSystem; -import org.apache.hadoop.fs.Path; -import org.apache.hadoop.hbase.HBaseInterfaceAudience; -import org.apache.hadoop.hbase.HConstants; -import org.apache.hadoop.hbase.util.FSUtils; -import org.apache.hadoop.hbase.wal.FSHLogProvider; -import org.apache.hadoop.hbase.wal.WAL; -import org.apache.hadoop.io.SequenceFile; -import org.apache.hadoop.io.SequenceFile.CompressionType; -import org.apache.hadoop.io.SequenceFile.Metadata; -import org.apache.hadoop.io.Text; -import org.apache.hadoop.io.compress.CompressionCodec; -import org.apache.hadoop.io.compress.DefaultCodec; - -/** - * Implementation of {@link WALProvider.Writer} that delegates to - * SequenceFile.Writer. Legacy implementation only used for compat tests. - * - * Note that because this class writes to the legacy hadoop-specific SequenceFile - * format, users of it must write {@link HLogKey} keys and not arbitrary - * {@link WALKey}s because the latter are not Writables (nor made to work with - * Hadoop serialization). - */ -@InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.CONFIG) -public class SequenceFileLogWriter implements FSHLogProvider.Writer { - private static final Log LOG = LogFactory.getLog(SequenceFileLogWriter.class); - // The sequence file we delegate to. - private SequenceFile.Writer writer; - // This is the FSDataOutputStream instance that is the 'out' instance - // in the SequenceFile.Writer 'writer' instance above. - private FSDataOutputStream writer_out; - - private CompressionContext compressionContext; - - // Legacy stuff from pre-PB WAL metadata. - private static final Text WAL_VERSION_KEY = new Text("version"); - private static final Text WAL_COMPRESSION_TYPE_KEY = new Text("compression.type"); - private static final Text DICTIONARY_COMPRESSION_TYPE = new Text("dictionary"); - - /** - * Default constructor. - */ - public SequenceFileLogWriter() { - super(); - } - /** - * Create sequence file Metadata for our WAL file with version and compression - * type (if any). - * @param conf - * @param compress - * @return Metadata instance. - */ - private static Metadata createMetadata(final Configuration conf, - final boolean compress) { - TreeMap metaMap = new TreeMap(); - metaMap.put(WAL_VERSION_KEY, new Text("1")); - if (compress) { - // Currently we only do one compression type. 
- metaMap.put(WAL_COMPRESSION_TYPE_KEY, DICTIONARY_COMPRESSION_TYPE); - } - return new Metadata(metaMap); - } - - private boolean initializeCompressionContext(Configuration conf, Path path) throws IOException { - boolean doCompress = conf.getBoolean(HConstants.ENABLE_WAL_COMPRESSION, false); - if (doCompress) { - try { - this.compressionContext = new CompressionContext(LRUDictionary.class, - FSUtils.isRecoveredEdits(path), conf.getBoolean( - CompressionContext.ENABLE_WAL_TAGS_COMPRESSION, true)); - } catch (Exception e) { - throw new IOException("Failed to initiate CompressionContext", e); - } - } - return doCompress; - } - - @Override - public void init(FileSystem fs, Path path, Configuration conf, boolean overwritable) - throws IOException { - boolean compress = initializeCompressionContext(conf, path); - // Create a SF.Writer instance. - try { - // reflection for a version of SequenceFile.createWriter that doesn't - // automatically create the parent directory (see HBASE-2312) - this.writer = (SequenceFile.Writer) SequenceFile.class - .getMethod("createWriter", new Class[] {FileSystem.class, - Configuration.class, Path.class, Class.class, Class.class, - Integer.TYPE, Short.TYPE, Long.TYPE, Boolean.TYPE, - CompressionType.class, CompressionCodec.class, Metadata.class}) - .invoke(null, new Object[] {fs, conf, path, HLogKey.class, WALEdit.class, - Integer.valueOf(FSUtils.getDefaultBufferSize(fs)), - Short.valueOf((short) - conf.getInt("hbase.regionserver.hlog.replication", - FSUtils.getDefaultReplication(fs, path))), - Long.valueOf(conf.getLong("hbase.regionserver.hlog.blocksize", - FSUtils.getDefaultBlockSize(fs, path))), - Boolean.valueOf(false) /*createParent*/, - SequenceFile.CompressionType.NONE, new DefaultCodec(), - createMetadata(conf, compress) - }); - } catch (InvocationTargetException ite) { - // function was properly called, but threw it's own exception - throw new IOException(ite.getCause()); - } catch (Exception e) { - // ignore all other exceptions. related to reflection failure - } - - // if reflection failed, use the old createWriter - if (this.writer == null) { - LOG.debug("new createWriter -- HADOOP-6840 -- not available"); - this.writer = SequenceFile.createWriter(fs, conf, path, - HLogKey.class, WALEdit.class, - FSUtils.getDefaultBufferSize(fs), - (short) conf.getInt("hbase.regionserver.hlog.replication", - FSUtils.getDefaultReplication(fs, path)), - conf.getLong("hbase.regionserver.hlog.blocksize", - FSUtils.getDefaultBlockSize(fs, path)), - SequenceFile.CompressionType.NONE, - new DefaultCodec(), - null, - createMetadata(conf, compress)); - } else { - if (LOG.isTraceEnabled()) LOG.trace("Using new createWriter -- HADOOP-6840"); - } - - this.writer_out = getSequenceFilePrivateFSDataOutputStreamAccessible(); - if (LOG.isTraceEnabled()) LOG.trace("Path=" + path + ", compression=" + compress); - } - - // Get at the private FSDataOutputStream inside in SequenceFile so we can - // call sync on it. Make it accessible. - private FSDataOutputStream getSequenceFilePrivateFSDataOutputStreamAccessible() - throws IOException { - FSDataOutputStream out = null; - final Field fields [] = this.writer.getClass().getDeclaredFields(); - final String fieldName = "out"; - for (int i = 0; i < fields.length; ++i) { - if (fieldName.equals(fields[i].getName())) { - try { - // Make the 'out' field up in SF.Writer accessible. 
- fields[i].setAccessible(true); - out = (FSDataOutputStream)fields[i].get(this.writer); - break; - } catch (IllegalAccessException ex) { - throw new IOException("Accessing " + fieldName, ex); - } catch (SecurityException e) { - LOG.warn("Does not have access to out field from FSDataOutputStream", - e); - } - } - } - return out; - } - - @Override - public void append(WAL.Entry entry) throws IOException { - entry.setCompressionContext(compressionContext); - try { - this.writer.append(entry.getKey(), entry.getEdit()); - } catch (NullPointerException npe) { - // Concurrent close... - throw new IOException(npe); - } - } - - @Override - public void close() throws IOException { - if (this.writer != null) { - try { - this.writer.close(); - } catch (NullPointerException npe) { - // Can get a NPE coming up from down in DFSClient$DFSOutputStream#close - LOG.warn(npe); - } - this.writer = null; - } - } - - @Override - public void sync() throws IOException { - try { - this.writer.syncFs(); - } catch (NullPointerException npe) { - // Concurrent close... - throw new IOException(npe); - } - } - - @Override - public long getLength() throws IOException { - try { - return this.writer.getLength(); - } catch (NullPointerException npe) { - // Concurrent close... - throw new IOException(npe); - } - } - - /** - * @return The dfsclient out stream up inside SF.Writer made accessible, or - * null if not available. - */ - public FSDataOutputStream getWriterFSDataOutputStream() { - return this.writer_out; - } -} diff --git hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestReadOldRootAndMetaEdits.java hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestReadOldRootAndMetaEdits.java deleted file mode 100644 index 8bdb33c..0000000 --- hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestReadOldRootAndMetaEdits.java +++ /dev/null @@ -1,161 +0,0 @@ -/** - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
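/*
 * Sketch, not part of this patch: the reflective private-field read that
 * getSequenceFilePrivateFSDataOutputStreamAccessible above performed to reach
 * SequenceFile.Writer's internal 'out' stream, shown as a generic helper. Note it only inspects
 * fields declared on the target's own class, as the deleted code did. The PrivateFieldAccess
 * class and read() names are hypothetical.
 */
import java.io.IOException;
import java.lang.reflect.Field;

final class PrivateFieldAccess {
  private PrivateFieldAccess() {}

  static Object read(Object target, String fieldName) throws IOException {
    try {
      Field field = target.getClass().getDeclaredField(fieldName);
      field.setAccessible(true); // the field is private, so make it readable first
      return field.get(target);
    } catch (ReflectiveOperationException e) {
      throw new IOException("Could not read field '" + fieldName + "' from "
          + target.getClass().getName(), e);
    }
  }
}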
- */ -package org.apache.hadoop.hbase.regionserver.wal; - -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertNotNull; -import static org.junit.Assert.assertNull; -import static org.junit.Assert.assertTrue; - -import java.io.IOException; -import java.util.ArrayList; -import java.util.List; - -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.fs.FileSystem; -import org.apache.hadoop.fs.Path; -import org.apache.hadoop.hbase.HBaseTestingUtility; -import org.apache.hadoop.hbase.HConstants; -import org.apache.hadoop.hbase.HRegionInfo; -import org.apache.hadoop.hbase.KeyValue; -import org.apache.hadoop.hbase.testclassification.MediumTests; -import org.apache.hadoop.hbase.testclassification.RegionServerTests; -import org.apache.hadoop.hbase.TableName; -import org.apache.hadoop.hbase.util.Bytes; -import org.apache.hadoop.hbase.wal.WAL; -import org.apache.hadoop.hbase.wal.WALFactory; -import org.apache.hadoop.hbase.wal.WALKey; -import org.apache.hadoop.hbase.wal.WALProvider; -import org.junit.AfterClass; -import org.junit.BeforeClass; -import org.junit.Test; -import org.junit.experimental.categories.Category; - -/** - * Tests to read old ROOT, Meta edits. - */ -@Category({RegionServerTests.class, MediumTests.class}) - -public class TestReadOldRootAndMetaEdits { - - private final static Log LOG = LogFactory.getLog(TestReadOldRootAndMetaEdits.class); - private final static HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility(); - private static Configuration conf; - private static FileSystem fs; - private static Path dir; - - @BeforeClass - public static void setupBeforeClass() throws Exception { - conf = TEST_UTIL.getConfiguration(); - conf.setClass("hbase.regionserver.hlog.writer.impl", - SequenceFileLogWriter.class, WALProvider.Writer.class); - fs = TEST_UTIL.getTestFileSystem(); - dir = new Path(TEST_UTIL.createRootDir(), "testReadOldRootAndMetaEdits"); - fs.mkdirs(dir); - - } - @AfterClass - public static void tearDownAfterClass() throws Exception { - } - - /** - * Inserts three waledits in the wal file, and reads them back. The first edit is of a regular - * table, second waledit is for the ROOT table (it will be ignored while reading), - * and last waledit is for the hbase:meta table, which will be linked to the new system:meta table. - * @throws IOException - */ - @Test - public void testReadOldRootAndMetaEdits() throws IOException { - LOG.debug("testReadOldRootAndMetaEdits"); - // kv list to be used for all WALEdits. - byte[] row = Bytes.toBytes("row"); - KeyValue kv = new KeyValue(row, row, row, row); - List kvs = new ArrayList(); - kvs.add(kv); - - WALProvider.Writer writer = null; - WAL.Reader reader = null; - // a regular table - TableName t = TableName.valueOf("t"); - HRegionInfo tRegionInfo = null; - int logCount = 0; - long timestamp = System.currentTimeMillis(); - Path path = new Path(dir, "t"); - try { - tRegionInfo = new HRegionInfo(t, HConstants.EMPTY_START_ROW, HConstants.EMPTY_END_ROW); - WAL.Entry tEntry = createAEntry(new HLogKey(tRegionInfo.getEncodedNameAsBytes(), t, - ++logCount, timestamp, HConstants.DEFAULT_CLUSTER_ID), kvs); - - // create a old root edit (-ROOT-). - WAL.Entry rootEntry = createAEntry(new HLogKey(Bytes.toBytes(TableName.OLD_ROOT_STR), - TableName.OLD_ROOT_TABLE_NAME, ++logCount, timestamp, - HConstants.DEFAULT_CLUSTER_ID), kvs); - - // create a old meta edit (hbase:meta). 
- WAL.Entry oldMetaEntry = createAEntry(new HLogKey(Bytes.toBytes(TableName.OLD_META_STR), - TableName.OLD_META_TABLE_NAME, ++logCount, timestamp, - HConstants.DEFAULT_CLUSTER_ID), kvs); - - // write above entries - writer = WALFactory.createWALWriter(fs, path, conf); - writer.append(tEntry); - writer.append(rootEntry); - writer.append(oldMetaEntry); - - // sync/close the writer - writer.sync(); - writer.close(); - - // read the log and see things are okay. - reader = WALFactory.createReader(fs, path, conf); - WAL.Entry entry = reader.next(); - assertNotNull(entry); - assertTrue(entry.getKey().getTablename().equals(t)); - assertEquals(Bytes.toString(entry.getKey().getEncodedRegionName()), - Bytes.toString(tRegionInfo.getEncodedNameAsBytes())); - - // read the ROOT waledit, but that will be ignored, and hbase:meta waledit will be read instead. - entry = reader.next(); - assertEquals(entry.getKey().getTablename(), TableName.META_TABLE_NAME); - // should reach end of log - assertNull(reader.next()); - } finally { - if (writer != null) { - writer.close(); - } - if (reader != null) { - reader.close(); - } - } -} - /** - * Creates a WALEdit for the passed KeyValues and returns a WALProvider.Entry instance composed of - * the WALEdit and passed WALKey. - * @return WAL.Entry instance for the passed WALKey and KeyValues - */ - private WAL.Entry createAEntry(WALKey walKey, List kvs) { - WALEdit edit = new WALEdit(); - for (KeyValue kv : kvs ) - edit.add(kv); - return new WAL.Entry(walKey, edit); - } - -} diff --git hbase-server/src/test/java/org/apache/hadoop/hbase/wal/TestWALFactory.java hbase-server/src/test/java/org/apache/hadoop/hbase/wal/TestWALFactory.java index 74445cf..3ee851b 100644 --- hbase-server/src/test/java/org/apache/hadoop/hbase/wal/TestWALFactory.java +++ hbase-server/src/test/java/org/apache/hadoop/hbase/wal/TestWALFactory.java @@ -18,11 +18,9 @@ */ package org.apache.hadoop.hbase.wal; -import static org.junit.Assert.assertArrayEquals; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertNotNull; -import static org.junit.Assert.assertNull; import static org.junit.Assert.assertTrue; import static org.junit.Assert.fail; @@ -54,9 +52,6 @@ import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.coprocessor.CoprocessorHost; import org.apache.hadoop.hbase.coprocessor.SampleRegionWALObserver; import org.apache.hadoop.hbase.regionserver.MultiVersionConcurrencyControl; -import org.apache.hadoop.hbase.regionserver.wal.HLogKey; -import org.apache.hadoop.hbase.regionserver.wal.SequenceFileLogReader; -import org.apache.hadoop.hbase.regionserver.wal.SequenceFileLogWriter; import org.apache.hadoop.hbase.regionserver.wal.WALActionsListener; import org.apache.hadoop.hbase.regionserver.wal.WALCoprocessorHost; import org.apache.hadoop.hbase.regionserver.wal.WALEdit; @@ -454,6 +449,7 @@ public class TestWALFactory { class RecoverLogThread extends Thread { public Exception exception = null; + @Override public void run() { try { FSUtils.getInstance(fs, rlConf) @@ -675,74 +671,6 @@ public class TestWALFactory { assertNotNull(c); } - /** - * @throws IOException - */ - @Test - public void testReadLegacyLog() throws IOException { - final int columnCount = 5; - final int recordCount = 5; - final TableName tableName = - TableName.valueOf("tablename"); - final byte[] row = Bytes.toBytes("row"); - long timestamp = System.currentTimeMillis(); - Path path = new Path(dir, "tempwal"); - SequenceFileLogWriter sflw = null; - 
WAL.Reader reader = null; - try { - HRegionInfo hri = new HRegionInfo(tableName, - HConstants.EMPTY_START_ROW, HConstants.EMPTY_END_ROW); - HTableDescriptor htd = new HTableDescriptor(tableName); - fs.mkdirs(dir); - // Write log in pre-PB format. - sflw = new SequenceFileLogWriter(); - sflw.init(fs, path, conf, false); - for (int i = 0; i < recordCount; ++i) { - WALKey key = new HLogKey( - hri.getEncodedNameAsBytes(), tableName, i, timestamp, HConstants.DEFAULT_CLUSTER_ID); - WALEdit edit = new WALEdit(); - for (int j = 0; j < columnCount; ++j) { - if (i == 0) { - htd.addFamily(new HColumnDescriptor("column" + j)); - } - String value = i + "" + j; - edit.add(new KeyValue(row, row, row, timestamp, Bytes.toBytes(value))); - } - sflw.append(new WAL.Entry(key, edit)); - } - sflw.sync(); - sflw.close(); - - // Now read the log using standard means. - reader = wals.createReader(fs, path); - assertTrue(reader instanceof SequenceFileLogReader); - for (int i = 0; i < recordCount; ++i) { - WAL.Entry entry = reader.next(); - assertNotNull(entry); - assertEquals(columnCount, entry.getEdit().size()); - assertArrayEquals(hri.getEncodedNameAsBytes(), entry.getKey().getEncodedRegionName()); - assertEquals(tableName, entry.getKey().getTablename()); - int idx = 0; - for (Cell val : entry.getEdit().getCells()) { - assertTrue(Bytes.equals(row, 0, row.length, val.getRowArray(), val.getRowOffset(), - val.getRowLength())); - String value = i + "" + idx; - assertArrayEquals(Bytes.toBytes(value), CellUtil.cloneValue(val)); - idx++; - } - } - WAL.Entry entry = reader.next(); - assertNull(entry); - } finally { - if (sflw != null) { - sflw.close(); - } - if (reader != null) { - reader.close(); - } - } - } - static class DumbWALActionsListener extends WALActionsListener.Base { int increments = 0; diff --git hbase-server/src/test/java/org/apache/hadoop/hbase/wal/TestWALSplit.java hbase-server/src/test/java/org/apache/hadoop/hbase/wal/TestWALSplit.java index 3136416..6162c38 100644 --- hbase-server/src/test/java/org/apache/hadoop/hbase/wal/TestWALSplit.java +++ hbase-server/src/test/java/org/apache/hadoop/hbase/wal/TestWALSplit.java @@ -65,7 +65,8 @@ import org.apache.hadoop.hbase.protobuf.ProtobufUtil; import org.apache.hadoop.hbase.protobuf.generated.WALProtos; import org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos.SplitLogTask.RecoveryMode; import org.apache.hadoop.hbase.regionserver.HRegion; -import org.apache.hadoop.hbase.regionserver.wal.FaultySequenceFileLogReader; +import org.apache.hadoop.hbase.regionserver.wal.FaultyLogReader; +import org.apache.hadoop.hbase.regionserver.wal.FaultyLogReader.FailureType; import org.apache.hadoop.hbase.regionserver.wal.InstrumentedLogWriter; import org.apache.hadoop.hbase.regionserver.wal.ProtobufLogReader; import org.apache.hadoop.hbase.regionserver.wal.WALEdit; @@ -539,8 +540,11 @@ public class TestWALSplit { @Test (timeout=300000) public void testCorruptedFileGetsArchivedIfSkipErrors() throws IOException { conf.setBoolean(HBASE_SKIP_ERRORS, true); - for (FaultySequenceFileLogReader.FailureType failureType : - FaultySequenceFileLogReader.FailureType.values()) { + for (FaultyLogReader.FailureType failureType : + FaultyLogReader.FailureType.values()) { + if (failureType.equals(FailureType.NONE)) { + continue; + } final Set walDirContents = splitCorruptWALs(failureType); final Set archivedLogs = new HashSet(); final StringBuilder archived = new StringBuilder("Archived logs in CORRUPTDIR:"); @@ -558,7 +562,7 @@ public class TestWALSplit { * @return set of wal names 
present prior to split attempt. * @throws IOException if the split process fails */ - private Set splitCorruptWALs(final FaultySequenceFileLogReader.FailureType failureType) + private Set splitCorruptWALs(final FaultyLogReader.FailureType failureType) throws IOException { Class backupClass = conf.getClass("hbase.regionserver.hlog.reader.impl", Reader.class); @@ -566,7 +570,7 @@ public class TestWALSplit { try { conf.setClass("hbase.regionserver.hlog.reader.impl", - FaultySequenceFileLogReader.class, Reader.class); + FaultyLogReader.class, Reader.class); conf.set("faultysequencefilelogreader.failuretype", failureType.name()); // Clean up from previous tests or previous loop try { @@ -604,7 +608,7 @@ public class TestWALSplit { public void testTrailingGarbageCorruptionLogFileSkipErrorsFalseThrows() throws IOException { conf.setBoolean(HBASE_SKIP_ERRORS, false); - splitCorruptWALs(FaultySequenceFileLogReader.FailureType.BEGINNING); + splitCorruptWALs(FaultyLogReader.FailureType.BEGINNING); } @Test (timeout=300000) @@ -612,7 +616,7 @@ public class TestWALSplit { throws IOException { conf.setBoolean(HBASE_SKIP_ERRORS, false); try { - splitCorruptWALs(FaultySequenceFileLogReader.FailureType.BEGINNING); + splitCorruptWALs(FaultyLogReader.FailureType.BEGINNING); } catch (IOException e) { LOG.debug("split with 'skip errors' set to 'false' correctly threw"); } @@ -784,6 +788,7 @@ public class TestWALSplit { someOldThread.setDaemon(true); someOldThread.start(); final Thread t = new Thread("Background-thread-dumper") { + @Override public void run() { try { Threads.threadDumpingIsAlive(someOldThread); @@ -852,6 +857,7 @@ public class TestWALSplit { "Blocklist for " + OLDLOGDIR + " has changed"}; private int count = 0; + @Override public FSDataInputStream answer(InvocationOnMock invocation) throws Throwable { if (count < 3) { throw new IOException(errors[count++]); @@ -881,6 +887,7 @@ public class TestWALSplit { FileSystem spiedFs = Mockito.spy(fs); Mockito.doAnswer(new Answer() { + @Override public FSDataInputStream answer(InvocationOnMock invocation) throws Throwable { Thread.sleep(1500); // Sleep a while and wait report status invoked return (FSDataInputStream)invocation.callRealMethod();
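/*
 * Sketch, not part of this patch: the same magic-byte probe that WALFactory.createReader now
 * performs to reject pre-protobuf WALs (see the hunk above), packaged as a standalone check one
 * could run against a WAL file before upgrading. The PbWalCheck class and isProtobufWal() method
 * names are hypothetical; ProtobufLogReader.PB_WAL_MAGIC is the constant the patch itself uses.
 */
import java.io.IOException;
import java.util.Arrays;

import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.regionserver.wal.ProtobufLogReader;

public final class PbWalCheck {
  private PbWalCheck() {}

  /** Returns true if the file starts with the protobuf WAL magic bytes. */
  public static boolean isProtobufWal(FileSystem fs, Path walFile) throws IOException {
    byte[] magic = new byte[ProtobufLogReader.PB_WAL_MAGIC.length];
    try (FSDataInputStream in = fs.open(walFile)) {
      return in.read(magic) == magic.length
          && Arrays.equals(magic, ProtobufLogReader.PB_WAL_MAGIC);
    }
  }
}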